分享

rocketmq问题汇总

 hh3755 2016-07-06

FAQ: Producer、Consumer服务状态不正确 #43

遇到过一次,原因是因为PRODUCER启动失败了,启动失败的原因,是新创建了两个producer.因为当时是按每个topic创建一个producer这样的想法来的。现在想来有点傻。

 


以下只针对集群模式:

1 producer

  1. 默认情况下不需要设置instanceName,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示
  2. 如果同一个jvm中,不同的producer需要往不同的rocketmq集群发送消息,需要设置不同的instanceName
    1. 原因如下:如果不设置instanceName,那么会使用ip@pid作为producer唯一标识,那么会导致多个producer内部只有一个MQClientInstance(与mq交互)实例,从而导致只往一个集群发消息。

2 consumer

  1. 默认情况下不需要设置instanceName,rocketmq会使用ip@pid作为instanceName(pid代表jvm名字)
  2. 如果设置instanceName,rocketmq会使用ip@instanceName作为consumer的唯一标示,此时需要注意instanceName需要不同。

3 consumer设置上instanceName后,无法集群消费的问题调查

应用场景:

一台机器上的多个consumer jvm进程消费整个集群的消息

问题说明:

由于集群模式下我们希望consumer能够平均消费整个集群的消息,但是设置上instanceName后,发现每个consumer都消费整个集群的消息。

查看开发指南,该参数说明如下:

参数名
默认值
说明
instanceName DEFAULT

客户端实例名称,客户端创建的多个 Producer、
Consumer 实际是共用一个内部实例(这个实例包含
网络连接、线程资源等)

问题调查:

一开始怀疑是cosumer的MessageModel设置为广播消费的原因导致的:

广播消费:一条消息被多个 Consumer 消费, 即使返些 Consumer 属亍同一个 Consumer Group,消息也会被 Consumer 
Group 中的每个 Consumer 都消费一次

但是DefaultMQPushConsumer中MessageModel默认就是集群模式,故排除。

后来经过走查DefaultMQPushConsumer源码发现是由于rebalance策略问题导致的,consumer的rebalance会在首次启动时和之后每10秒一次,做rebalance操作,代码调用链如下:

首次启动rebalance代码如下:

DefaultMQPushConsumer.start -> DefaultMQPushConsumerImpl.start -> MQClientInstance.rebalanceImmediately -> RebalanceService.wakeup

之后每10秒一次的代码如下:

DefaultMQPushConsumer.start -> DefaultMQPushConsumerImpl.start -> MQClientInstance.start -> RebalanceService.start,启动rebalance线程

查看DefaultMQPushConsumer的rebalance策略,默认是AllocateMessageQueueAveragely,该策略是均衡消息队列到consumer,既然有rebalance,那为何没有做到平衡呢?

继续跟踪源码:

RebalanceService.start -> MQClientInstance.doRebalance -> DefaultMQPushConsumerImpl.doRebalance -> RebalanceImpl.doRebalance -> RebalanceImpl.rebalanceByTopic -> AllocateMessageQueueAveragely.allocate 此时才到真正的rebalance。

该方法参数说明如下:

/**
  * Allocating by consumer id
  *
  * @param consumerGroup current consumer group
  * @param currentCID    current consumer id
  * @param mqAll         message queue set in current topic
  * @param cidAll        consumer set in current consumer group
  * @return
  */
 publicList<MessageQueue> allocate(finalString consumerGroup,
                                    finalString currentCID,
                                    finalList<MessageQueue> mqAll,
                                    finalList<String> cidAll
 );

通过debug allocate方法发现,第二个参数需要MQClientInstance.clientId,这个由ClientConfig.buildMQClientId产生,产生规则是ip@instanceName

而instanceName即为consumer设置的,如果设置上这个参数,启动多个jvm进程,则currentCID都一样,而计算rebalance时如下代码导致每次将所有的queue分配到一个consumer上:

int index = cidAll.indexOf(currentCID);


假如不设置instanceName,ClientConfig.changeInstanceNameToPID会获取RuntimeMXBean.getName作为instanceName,而这个值对于多个jvm是不一样的,api解释如下:

RuntimeMXBean.getName:返回表示正在运行的 Java 虚拟机的名称。返回的名称字符串可以为任何任意字符串,Java 虚拟机实现可以选择在返回的名称字符串中嵌入特定于平台的有用信息。每个正在的运行的虚拟机可以具有不同的名称。

这样rebalance时就会平均分配到consumer上。

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多