RocketMQ Rebalance负载均衡触发机制浅析

想必你已经了解RocketMQ的设计哲学是,同一时刻同一个Queue只能被一个Consumer持有,
但同一个Consumer可以同时消费多个队列,为了订阅、消费模型的高效,Rocket总是希望将Queue分配的足够均匀,
日常使用时,Consumer的上下线,Queue的动态扩缩容,都可能会破坏分配均衡性,
故而Rocket提供了一套完整的Rebalance机制针对上述状况。

触发条件

总结下来Rebalance一共三个触发条件,两个主动,一个被动。满足任意一个都会触发

 1.Consumer启动之时执行start方法主动执行负载均衡逻辑;
 2.定时任务触发;
 3.Broker下发通知告知Client需要进行负载均衡;

 今天重新翻阅代码的时候发现
 很巧合三个触发条件或多或少跟DefaultMQPushConsumerImpl.start()都有关系;

DefaultMQPushConsumerImpl.start()

DefaultMQPushConsumerImpl创建实例时,会初始化rebalanceImpl成员变量 private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this); 此时此刻这个rebalanceImpl对象没有一点作用,因为ta的关键成员属性尚且为null,下文中的start肩负起了赋值重任。
下面是摘录start()主要代码:

public synchronized void start() throws MQClientException { switch (this.serviceState) { case CREATE_JUST: /* 检查配置 */ this.checkConfig(); /* 构建 Topic 订阅信息——SubscriptionData,并添加至 RebalanceImpl 的订阅信息中 */ this.copySubscription(); /* 初始化 MQClientInstance */ this.mQClientFactory = MQClientManager.getInstance()
                .getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook); /**
            * 丰富 rebalanceImpl 对象属性,注意到了吗之前初始化的对象充血了
            * 之前产生的 rebalanceImpl 对象直到此刻才算真正意义上的初始化完成
            * rebalanceImpl就是负载均衡的相关实现
            */ this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup()); this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel()); this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy()); this.rebalanceImpl.setmQClientFactory(this.mQClientFactory); /**
             * 向 MQClientInstance 注册消费者,并启动 MQClientInstance
             * 一个 JVM 中的所有消费者、生产者持有同一个 MQClientInstance,MQClientInstance 只会启动一次
             */ boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); break; case ...; default: break;  
    } /* Consumer启动成功,立即向所有Broker发送心跳 */ this.mQClientFactory.sendHeartbeatToAllBrokerWithLock(); /*
     * 注意到了嘛,Consumer上线会立即触发一次负载均衡 
     * 但是这里并不是调用一下负载均衡的实现那么简单,这里其实是唤醒了相关服务线程
     * 下文笔者会着重介绍
     */ this.mQClientFactory.rebalanceImmediately();
}

RebalanceImpl

见名知意,Consumer负载均衡相关的操作全部都委托给RebalanceImpl对象。 每一个Consumer的对象都持有一个RebalanceImpl实例,每个RebalanceImpl实例也只服务于一个Consumer。
二者是一个相互持有,循环引用的关系。
我们来看一下这个对象的关键成员属性:

RebalancePushImpl extends RebalanceImpl { protected final ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = new ConcurrentHashMap<>(64); /* ConcurrentMap<topic, Set<MessageQueue>>, Topic与分给自己的MessageQueue信息 */ protected final ConcurrentMap<String, Set<MessageQueue>> topicSubscribeInfoTable = new ConcurrentHashMap<>(); /* ConcurrentMap<topic,  SubscriptionData>, Topic与订阅数据 */ protected final ConcurrentMap<String, SubscriptionData> subscriptionInner = new ConcurrentHashMap<>(); /* 负载算法的具体实现,究竟如何分配就是由这个总指挥决定的 */ protected AllocateMessageQueueStrategy allocateMessageQueueStrategy; /* Consumer实例 */ private final DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;
}

其中有一个ProcessQueue对象尤为瞩目,因为我没做任何注释,如果你有阅读Rocket源码应该知道他是 Consumer消费Message过程中极其重要的一环,举足轻重,你可以认为ta是Client端的消息承载者。 因为跟负载时机关系不大所以此处不再赘述。
负责进行均衡负载的就是doRebalance(),实际上真正执行负载逻辑的是rebalanceByTopic();

RebalanceByTopic()

rebalanceByTopic()是负载均衡的最终落脚点,即系统中所有需要负载的调用最后都会走到这里来。
集群消费模式下的实现 摘录关键代码:

private void rebalanceByTopic(String topic, boolean isOrder) { /* 获取到该Topic下的所有队列 */ Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); /* 获取该Topic下ConsumerGroup此消费组所有的消费者Id */ List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); if (mqSet != null && cidAll != null) {
        List<MessageQueue> mqAll = new ArrayList<>(mqSet); /* 这两个排序极其关键 */ Collections.sort(mqAll);
        Collections.sort(cidAll); /* 负载均衡算法 */ AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
        List<MessageQueue> allocateResult; /* 调用具体的算法实现进行队列分配 */ allocateResult = strategy.allocate( this.consumerGroup, this.mQClientFactory.getClientId(),
            mqAll,
            cidAll
        );
    }
}

看似轻描淡写实则举足轻重的两句代码:
Collections.sort(mqAll);
Collections.sort(cidAll);
这两句代码意义在于该ConsumeGroup下的所有Consumer得到的队列顺序、消费者Id顺序都是一致的。在分配视图一致性得到保证的前提下,分配算法又是相同的,这样一来尽管各个Consumer在负载均衡的时候不进行任何信息交换,但是却可以互不干扰有条不紊的将队列均衡的分配完毕。
如何分配的具体细节在allocateMessageQueueStrategy中,RocketMQ也默认支持多种分配算法,比较简单,笔者不想赘述。
负载均衡的实现讲完了,那谁会调用ta,如何调用ta,在什么时机下会调用ta,一个个问题萦绕心头,分析上述问题之前我们总是绕不开RebalanceService对象。

RebalanceService

RocketMQ中有一类对象地位超然,特立独行,一个对象主宰一个领域,ta们往往只受操作系统的约束。 似乎操作系统也格外偏爱他们,因为会给他们分配时间片,直接调度他们的运行(其实原因很简单,后面会有答案)。 ta就是传说中的ServiceThread,江湖中人往往称呼ta们为服务线程。

public abstract class ServiceThread implements Runnable { protected boolean isDaemon = false; /* 这能不被钟爱吗,直接持有一个独立线程 */ private Thread thread; /* 执行start的时候申请一个线程 */ public void start() { /* 只允许申请一次 */ if (!started.compareAndSet(false, true)) { return;
        }
        stopped = false; this.thread = new Thread(this, getServiceName()); /* 设置为非守护线程 */ this.thread.setDaemon(isDaemon); this.thread.start();
    }
}

ServiceThread家族兴旺,除了专门负责负载均衡的RebalanceService,还有一众兄弟姐妹:
FlushRealTimeService:异步刷盘服务线程
CommitRealTimeService:异步刷盘服务线程
GroupCommitService: 同步刷盘服务线程
......
每一个都是Rocket能够稳定运行的背后功臣,上述三个其实就是刷盘相关的服务线程。 (涉及的面太广,如果有人想看,我尝试分析一下)

有了上面的铺垫,RebalanceService应该很好理解了,单独持有一个线程进行负载均衡,当然ta也不是无休止的一直进行负载处理。

public class RebalanceService extends ServiceThread { /* 负载均衡时间间隔,默认20s,支持配置 */ private final static long waitInterval =Long.parseLong(
        System.getProperty("rocketmq.client.rebalance.waitInterval", "20000")
    ); private final MQClientInstance mqClientFactory; public void run() { /* 只要该线程未终止就会一直执行 */ while (!this.isStopped()) { /* 喜欢才会放肆,但爱是克制,休息20s */ this.waitForRunning(waitInterval); /* 执行负载均衡 */ this.mqClientFactory.doRebalance();
        }
    }
}

定时任务

理解了上面的RebalanceService,应该就理解了定时触发的逻辑,只需要定时唤醒服务线程即可
每个Java服务单点只会启动一个RebalanceService服务实例,同时也只会启动一个mqClientFactory实例  
单点内所有的Consumer实例都会共用该实例对象。
每次定时触发mqClientFactory.doRebalance()都会对该JVM下持有的所有Consumer进行负载均衡
/**
 * RebalanceService 线程默认每隔20s调用该方法
 * ⚠️:每个 Java 服务单点只会启动一个 MQClientInstance 实例,单点内所有的 Consumer 实例都会持有该实例对象
 * @see #registerConsumer Consumer 对象会将自己注册进 MQClientInstance
 * @see #consumerTable Consumer对象注册表
 *
 * ⚠️:一个 Java 服务单点只有一个 RebalanceService 服务线程
 * ⚠️:但每个 Consumer 实例都持有一个 RebalanceImpl 对象
 */ public void doRebalance() { for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {
        MQConsumerInner impl = entry.getValue(); if (impl != null) { try {
                impl.doRebalance();
            } catch (Throwable e) {
                log.error("doRebalance exception", e);
            }
        }
    }
}

主动触发

写这篇记录之时我还在思考,为什么刚刚那个定时任务不是交给JDK中的ScheduledExecutorService
事实上,RocketMQ中的很多定时任务也都是这么做的。

可是直到刚刚我才明白,因为RebalanceService支持主动唤醒,提前执行任务。

Consumer上线时候触发主动负载均衡就是因为唤醒了RebalanceService线程,
start()最后会调用rebalanceImmediately() public void rebalanceImmediately() { this.rebalanceService.wakeup();
}

Broker通知

Broker下发通知指挥Consumer需要负载均衡则明显复杂很多,但万变不离其宗,无非是多几次Rpc调用而已,无非是网络传输了一遭而已。

每当DefaultMQPushConsumerImpl实例,调用start之后,总是会向Broker发送一个心跳
调用栈如下: DefaultMQPushConsumerImpl.start() -> MQClientInstance.sendHeartbeatToAllBrokerWithLock() -> MQClientInstance.sendHeartbeatToAllBroker() -> MQClientAPIImpl.sendHearbeat()

Consumer启动之后会立即发出一个心跳包告知Broker。

public int sendHearbeat(
        String addr, HeartbeatData heartbeatData, long timeoutMillis
    ) throws RemotingException, MQBrokerException, InterruptedException { /* 又是一次 Rpc 远程调用 */ RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
        request.setLanguage(clientConfig.getLanguage());
        request.setBody(heartbeatData.encode());
        RemotingCommand response = this.remotingClient.invokeSync(
            addr, request, timeoutMillis
        );
    }

根据RequestCode.HEART_BEAT得知此次Rpc的处理器为ClientManageProcessor ClientManageProcessor.heartBeat() -> ConsumerManager.registerConsumer()

仅摘录关键代码:

public boolean registerConsumer(String group, ClientChannelInfo clientChannelInfo,
    ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere,
    Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable
) { /* 有新的Consumer上线则有新的SocketChannel建立 */ boolean r1 = consumerGroupInfo.updateChannel(
       clientChannelInfo, consumeType, messageModel, consumeFromWhere
   ); /* 判断订阅信息是否发生变化 */ boolean r2 = consumerGroupInfo.updateSubscription(subList); if (r1 || r2) { if (isNotifyConsumerIdsChangedEnable) { /* 触发ConsumerGroupEvent.CHANGE事件 */ this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
       }
   } 
}

调用进行到DefaultConsumerIdsChangeListener.handle()
可以看到如果是CHANGE事件则调用Broker2Client.notifyConsumerIdsChanged()

public void notifyConsumerIdsChanged(Channel channel, String consumerGroup) { /* 构造Rpc请求头 */ NotifyConsumerIdsChangedRequestHeader requestHeader = new NotifyConsumerIdsChangedRequestHeader();
    requestHeader.setConsumerGroup(consumerGroup); /* 构造Rpc请求对象 */ RemotingCommand request = RemotingCommand.createRequestCommand(
        RequestCode.NOTIFY_CONSUMER_IDS_CHANGED, requestHeader
    ); /* 又是一次RPC */ this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
}

这个Rpc请求最终会交给ClientRemotingProcessor.notifyConsumerIdsChanged()处理

public RemotingCommand notifyConsumerIdsChanged(ChannelHandlerContext ctx,
    RemotingCommand request) {
    NotifyConsumerIdsChangedRequestHeader requestHeader =(NotifyConsumerIdsChangedRequestHeader) request
        .decodeCommandCustomHeader(NotifyConsumerIdsChangedRequestHeader.class); /* 调用负载均衡逻辑 */ this.mqClientFactory.rebalanceImmediately();
}
#java#
全部评论

相关推荐

09-27 00:29
东北大学 Java
伟大的麻辣烫:查看图片
阿里巴巴稳定性 75人发布 投递阿里巴巴等公司10个岗位
点赞 评论 收藏
分享
10-11 17:45
门头沟学院 Java
走吗:别怕 我以前也是这么认为 虽然一面就挂 但是颇有收获!
点赞 评论 收藏
分享
点赞 收藏 评论
分享
牛客网
牛客企业服务