RocketMQ Rebalance负载均衡触发机制浅析
想必你已经了解RocketMQ的设计哲学是,同一时刻同一个Queue只能被一个Consumer持有,
但同一个Consumer可以同时消费多个队列,为了订阅、消费模型的高效,Rocket总是希望将Queue分配的足够均匀,
日常使用时,Consumer的上下线,Queue的动态扩缩容,都可能会破坏分配均衡性,
故而Rocket提供了一套完整的Rebalance机制针对上述状况。
触发条件
总结下来Rebalance一共三个触发条件,两个主动,一个被动。满足任意一个都会触发 1.Consumer启动之时执行start方法主动执行负载均衡逻辑; 2.定时任务触发; 3.Broker下发通知告知Client需要进行负载均衡; 今天重新翻阅代码的时候发现 很巧合三个触发条件或多或少跟DefaultMQPushConsumerImpl.start()都有关系;
DefaultMQPushConsumerImpl.start()
DefaultMQPushConsumerImpl创建实例时,会初始化rebalanceImpl成员变量 private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this); 此时此刻这个rebalanceImpl对象没有一点作用,因为ta的关键成员属性尚且为null,下文中的start肩负起了赋值重任。
下面是摘录start()主要代码:
public synchronized void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: /* 检查配置 */ this.checkConfig(); /* 构建 Topic 订阅信息——SubscriptionData,并添加至 RebalanceImpl 的订阅信息中 */ this.copySubscription(); /* 初始化 MQClientInstance */ this.mQClientFactory = MQClientManager.getInstance() .getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); /** * 丰富 rebalanceImpl 对象属性,注意到了吗之前初始化的对象充血了 * 之前产生的 rebalanceImpl 对象直到此刻才算真正意义上的初始化完成 * rebalanceImpl就是负载均衡的相关实现 */ this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()); this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); /** * 向 MQClientInstance 注册消费者,并启动 MQClientInstance * 一个 JVM 中的所有消费者、生产者持有同一个 MQClientInstance,MQClientInstance 只会启动一次 */ boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); break; case ...; default: break; } /* Consumer启动成功,立即向所有Broker发送心跳 */ this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); /* * 注意到了嘛,Consumer上线会立即触发一次负载均衡 * 但是这里并不是调用一下负载均衡的实现那么简单,这里其实是唤醒了相关服务线程 * 下文笔者会着重介绍 */ this.mQClientFactory.rebalanceImmediately(); }
RebalanceImpl
见名知意,Consumer负载均衡相关的操作全部都委托给RebalanceImpl对象。 每一个Consumer的对象都持有一个RebalanceImpl实例,每个RebalanceImpl实例也只服务于一个Consumer。
二者是一个相互持有,循环引用的关系。
我们来看一下这个对象的关键成员属性:
RebalancePushImpl extends RebalanceImpl { protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<>(64); /* ConcurrentMap<topic, Set<MessageQueue>>, Topic与分给自己的MessageQueue信息 */ protected final ConcurrentMap<String, Set<MessageQueue>> topicSubscribeInfoTable = new ConcurrentHashMap<>(); /* ConcurrentMap<topic, SubscriptionData>, Topic与订阅数据 */ protected final ConcurrentMap<String, SubscriptionData> subscriptionInner = new ConcurrentHashMap<>(); /* 负载算法的具体实现,究竟如何分配就是由这个总指挥决定的 */ protected AllocateMessageQueueStrategy allocateMessageQueueStrategy; /* Consumer实例 */ private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl; }
其中有一个ProcessQueue对象尤为瞩目,因为我没做任何注释,如果你有阅读Rocket源码应该知道他是 Consumer消费Message过程中极其重要的一环,举足轻重,你可以认为ta是Client端的消息承载者。 因为跟负载时机关系不大所以此处不再赘述。
负责进行均衡负载的就是doRebalance(),实际上真正执行负载逻辑的是rebalanceByTopic();
RebalanceByTopic()
rebalanceByTopic()是负载均衡的最终落脚点,即系统中所有需要负载的调用最后都会走到这里来。
集群消费模式下的实现 摘录关键代码:
private void rebalanceByTopic(String topic, boolean isOrder) { /* 获取到该Topic下的所有队列 */ Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); /* 获取该Topic下ConsumerGroup此消费组所有的消费者Id */ List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); if (mqSet != null && cidAll != null) { List<MessageQueue> mqAll = new ArrayList<>(mqSet); /* 这两个排序极其关键 */ Collections.sort(mqAll); Collections.sort(cidAll); /* 负载均衡算法 */ AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; List<MessageQueue> allocateResult; /* 调用具体的算法实现进行队列分配 */ allocateResult = strategy.allocate( this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll ); } }
看似轻描淡写实则举足轻重的两句代码:
Collections.sort(mqAll);
Collections.sort(cidAll);
这两句代码意义在于该ConsumeGroup下的所有Consumer得到的队列顺序、消费者Id顺序都是一致的。在分配视图一致性得到保证的前提下,分配算法又是相同的,这样一来尽管各个Consumer在负载均衡的时候不进行任何信息交换,但是却可以互不干扰有条不紊的将队列均衡的分配完毕。
如何分配的具体细节在allocateMessageQueueStrategy中,RocketMQ也默认支持多种分配算法,比较简单,笔者不想赘述。
负载均衡的实现讲完了,那谁会调用ta,如何调用ta,在什么时机下会调用ta,一个个问题萦绕心头,分析上述问题之前我们总是绕不开RebalanceService对象。
RebalanceService
RocketMQ中有一类对象地位超然,特立独行,一个对象主宰一个领域,ta们往往只受操作系统的约束。 似乎操作系统也格外偏爱他们,因为会给他们分配时间片,直接调度他们的运行(其实原因很简单,后面会有答案)。 ta就是传说中的ServiceThread,江湖中人往往称呼ta们为服务线程。
public abstract class ServiceThread implements Runnable { protected boolean isDaemon = false; /* 这能不被钟爱吗,直接持有一个独立线程 */ private Thread thread; /* 执行start的时候申请一个线程 */ public void start() { /* 只允许申请一次 */ if (!started.compareAndSet(false, true)) { return; } stopped = false; this.thread = new Thread(this, getServiceName()); /* 设置为非守护线程 */ this.thread.setDaemon(isDaemon); this.thread.start(); } }
ServiceThread家族兴旺,除了专门负责负载均衡的RebalanceService,还有一众兄弟姐妹:
FlushRealTimeService:异步刷盘服务线程
CommitRealTimeService:异步刷盘服务线程
GroupCommitService: 同步刷盘服务线程
......
每一个都是Rocket能够稳定运行的背后功臣,上述三个其实就是刷盘相关的服务线程。 (涉及的面太广,如果有人想看,我尝试分析一下)
有了上面的铺垫,RebalanceService应该很好理解了,单独持有一个线程进行负载均衡,当然ta也不是无休止的一直进行负载处理。
public class RebalanceService extends ServiceThread { /* 负载均衡时间间隔,默认20s,支持配置 */ private final static long waitInterval =Long.parseLong( System.getProperty("rocketmq.client.rebalance.waitInterval", "20000") ); private final MQClientInstance mqClientFactory; public void run() { /* 只要该线程未终止就会一直执行 */ while (!this.isStopped()) { /* 喜欢才会放肆,但爱是克制,休息20s */ this.waitForRunning(waitInterval); /* 执行负载均衡 */ this.mqClientFactory.doRebalance(); } } }
定时任务
理解了上面的RebalanceService,应该就理解了定时触发的逻辑,只需要定时唤醒服务线程即可 每个Java服务单点只会启动一个RebalanceService服务实例,同时也只会启动一个mqClientFactory实例 单点内所有的Consumer实例都会共用该实例对象。 每次定时触发mqClientFactory.doRebalance()都会对该JVM下持有的所有Consumer进行负载均衡
/** * RebalanceService 线程默认每隔20s调用该方法 * ⚠️:每个 Java 服务单点只会启动一个 MQClientInstance 实例,单点内所有的 Consumer 实例都会持有该实例对象 * @see #registerConsumer Consumer 对象会将自己注册进 MQClientInstance * @see #consumerTable Consumer对象注册表 * * ⚠️:一个 Java 服务单点只有一个 RebalanceService 服务线程 * ⚠️:但每个 Consumer 实例都持有一个 RebalanceImpl 对象 */ public void doRebalance() { for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) { MQConsumerInner impl = entry.getValue(); if (impl != null) { try { impl.doRebalance(); } catch (Throwable e) { log.error("doRebalance exception", e); } } } }
主动触发
写这篇记录之时我还在思考,为什么刚刚那个定时任务不是交给JDK中的ScheduledExecutorService 事实上,RocketMQ中的很多定时任务也都是这么做的。 可是直到刚刚我才明白,因为RebalanceService支持主动唤醒,提前执行任务。 Consumer上线时候触发主动负载均衡就是因为唤醒了RebalanceService线程, start()最后会调用rebalanceImmediately() public void rebalanceImmediately() { this.rebalanceService.wakeup(); }
Broker通知
Broker下发通知指挥Consumer需要负载均衡则明显复杂很多,但万变不离其宗,无非是多几次Rpc调用而已,无非是网络传输了一遭而已。
每当DefaultMQPushConsumerImpl实例,调用start之后,总是会向Broker发送一个心跳 调用栈如下: DefaultMQPushConsumerImpl.start() -> MQClientInstance.sendHeartbeatToAllBrokerWithLock() -> MQClientInstance.sendHeartbeatToAllBroker() -> MQClientAPIImpl.sendHearbeat()
Consumer启动之后会立即发出一个心跳包告知Broker。
public int sendHearbeat( String addr, HeartbeatData heartbeatData, long timeoutMillis ) throws RemotingException, MQBrokerException, InterruptedException { /* 又是一次 Rpc 远程调用 */ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null); request.setLanguage(clientConfig.getLanguage()); request.setBody(heartbeatData.encode()); RemotingCommand response = this.remotingClient.invokeSync( addr, request, timeoutMillis ); }
根据RequestCode.HEART_BEAT得知此次Rpc的处理器为ClientManageProcessor ClientManageProcessor.heartBeat() -> ConsumerManager.registerConsumer()
仅摘录关键代码:
public boolean registerConsumer(String group, ClientChannelInfo clientChannelInfo, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable ) { /* 有新的Consumer上线则有新的SocketChannel建立 */ boolean r1 = consumerGroupInfo.updateChannel( clientChannelInfo, consumeType, messageModel, consumeFromWhere ); /* 判断订阅信息是否发生变化 */ boolean r2 = consumerGroupInfo.updateSubscription(subList); if (r1 || r2) { if (isNotifyConsumerIdsChangedEnable) { /* 触发ConsumerGroupEvent.CHANGE事件 */ this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); } } }
调用进行到DefaultConsumerIdsChangeListener.handle()
可以看到如果是CHANGE事件则调用Broker2Client.notifyConsumerIdsChanged()
public void notifyConsumerIdsChanged(Channel channel, String consumerGroup) { /* 构造Rpc请求头 */ NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader(); requestHeader.setConsumerGroup(consumerGroup); /* 构造Rpc请求对象 */ RemotingCommand request = RemotingCommand.createRequestCommand( RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader ); /* 又是一次RPC */ this.brokerController.getRemotingServer().invokeOneway(channel, request, 10); }
这个Rpc请求最终会交给ClientRemotingProcessor.notifyConsumerIdsChanged()处理
public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx, RemotingCommand request) { NotifyConsumerIdsChangedRequestHeader requestHeader =(NotifyConsumerIdsChangedRequestHeader) request .decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class); /* 调用负载均衡逻辑 */ this.mqClientFactory.rebalanceImmediately(); }#java#