再谈Rocket MQ消费进度问题
- 消费进度上报至Broker并非完全实时(5000ms/次)
- Broker接收到消费进度后并不会立即持久化
但随着对Rocket MQ研究的深入,我发现有些情况并不能笼统地归结于上述三种原因,这也是本篇文章的由来。
/**
 * Reconciles processQueueTable with the queues assigned to this consumer by the latest
 * rebalance of {@code topic}. This is an article excerpt; "......" marks omitted code.
 *
 * @param topic   topic being rebalanced; only entries of this topic are examined
 * @param mqSet   queues assigned to this consumer after the rebalance
 * @param isOrder whether consumption is orderly (not used in the visible excerpt)
 * @return true if the visible part removed at least one entry from processQueueTable
 *         (the omitted part may also set it)
 */
private boolean updateProcessQueueTableInRebalance(String topic, Set<MessageQueue> mqSet, boolean isOrder) {
    boolean changed = false;
    /* Handle the queues that must be removed as a result of this rebalance */
    Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<MessageQueue, ProcessQueue> next = it.next();
        MessageQueue mq = next.getKey();
        ProcessQueue pq = next.getValue();
        /**
         * mq/pq are what this Consumer is currently consuming.
         * If the queue set assigned by the rebalance no longer contains this mq: steps ①, ②, ③.
         */
        if (mq.getTopic().equals(topic)) {
            if (!mqSet.contains(mq)) {
                /* ①: mark this pq as dropped; processConsumeResult skips updateOffset for a dropped pq */
                pq.setDropped(true);
                /* ②: persist the consume progress of the MessageQueue being removed;
                 * if this returns false the entry is kept, although pq is already marked dropped */
                if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                    /* ③: remove the mq/pq entry from processQueueTable */
                    it.remove();
                    changed = true;
                    log.info("doRebalance, {}, remove unnecessary mq, {}", consumerGroup, mq);
                }
            }
            /* (System.currentTimeMillis() - this.lastPullTimestamp) > PULL_MAX_IDLE_TIME */
            else if (pq.isPullExpired()) {
                /* Pulling on a still-assigned queue has been idle too long; per the log message
                 * this is treated as a stalled pull and, for CONSUME_PASSIVELY, the queue is
                 * dropped and removed the same way as above. Other consume types leave it alone. */
                switch (this.consumeType()) {
                    case CONSUME_PASSIVELY:
                        pq.setDropped(true);
                        if (this.removeUnnecessaryMessageQueue(mq, pq)) {
                            it.remove();
                            changed = true;
                            log.error("[BUG]doRebalance, {}, remove unnecessary mq, {}, because pull is pause, so try to fixed it", consumerGroup, mq);
                        }
                        break;
                    case CONSUME_ACTIVELY:
                    default:
                        break;
                }
            }
        }
    }
    ......
    return changed;
}
/**
 * Handles the result of a concurrently consumed batch. This is an article excerpt;
 * "......" marks omitted code. The visible tail hands the batch to the ProcessQueue and
 * reports the returned offset as consume progress, unless the ProcessQueue has been dropped.
 *
 * @param status         consume status returned by the listener (used in the omitted part)
 * @param context        consume context (used in the omitted part)
 * @param consumeRequest the batch that was consumed, with its ProcessQueue and MessageQueue
 */
public void processConsumeResult(
    ConsumeConcurrentlyStatus status,
    ConsumeConcurrentlyContext context,
    ConsumeRequest consumeRequest
) {
    ......
    /* Per the original note: this yields the smallest message offset currently in
     * ProcessQueue#msgTreeMap, and that offset is reported as the consume progress */
    long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
    /* Progress is only reported for a live queue: once the ProcessQueue is dropped
     * (e.g. its queue was taken away by a rebalance) this batch's progress is NOT recorded.
     * NOTE(review): third argument of updateOffset presumably means "increase only" — confirm */
    if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
        defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
    }
}
消费开始时,Consumer会启动一个本地过期消息清理定时任务:消息在Consumer端保留的时间如果超过固定时长(consumeTimeout),就会触发重新投递逻辑。该任务的初始延迟与执行间隔相同,都等于consumeTimeout(单位为分钟),默认15分钟。
/**
 * Registers the periodic expired-message clean-up task.
 * Both the delay before the first run and the interval between runs equal the
 * consumer's consumeTimeout, interpreted in minutes.
 */
public void start() {
    final Runnable cleanTask = this::cleanExpireMsg;
    final long initialDelay = this.defaultMQPushConsumer.getConsumeTimeout();
    final long period = this.defaultMQPushConsumer.getConsumeTimeout();
    this.cleanExpireMsgExecutors.scheduleAtFixedRate(cleanTask, initialDelay, period, TimeUnit.MINUTES);
}
- 如果清理线程选中的超时消息恰好在被选中之后才消费完毕,RocketMQ依然感知不到该消息已经被消费,仍会将其重新投递。
- 如果清理线程选中的超时消息在重新投递之后、执行移除逻辑之前,恰好被消费线程消费完毕,此时重投已经发生,覆水难收。
/**
 * Sends expired head-of-queue messages back to the broker for redelivery, then removes
 * them from the local msgTreeMap.
 *
 * <p>Does nothing for orderly consumption. At most 16 messages are examined per call, and
 * scanning stops at the first head message that is not expired, i.e. whose consume start
 * time is absent or not older than {@code consumeTimeout} minutes.
 *
 * <p>There is an inherent race here. A message may finish consuming after it is chosen or
 * after {@code sendMessageBack}, and it is still redelivered, causing a duplicate. The
 * offset comparison before removal only prevents removing a message that is no longer the
 * head of the queue.
 *
 * @param pushConsumer consumer supplying the timeout and used to send messages back
 */
public void cleanExpiredMsg(DefaultMQPushConsumer pushConsumer) {
    if (pushConsumer.getDefaultMQPushConsumerImpl().isConsumeOrderly()) {
        return;
    }
    // consumeTimeout is expressed in minutes; use long arithmetic for the millisecond value.
    final long timeoutMillis = pushConsumer.getConsumeTimeout() * 60L * 1000L;
    int loop = Math.min(msgTreeMap.size(), 16);
    for (int i = 0; i < loop; i++) {
        MessageExt msg = null;
        try {
            this.lockTreeMap.readLock().lockInterruptibly();
            try {
                if (!msgTreeMap.isEmpty()) {
                    MessageExt head = msgTreeMap.firstEntry().getValue();
                    String startTimestamp = MessageAccessor.getConsumeStartTimeStamp(head);
                    // The start timestamp may be absent (presumably the message has not begun
                    // consuming yet — confirm). Long.parseLong(null) throws NumberFormatException,
                    // which previously escaped this method; an absent value means "not expired".
                    if (startTimestamp != null && !startTimestamp.isEmpty()
                            && System.currentTimeMillis() - Long.parseLong(startTimestamp) > timeoutMillis) {
                        msg = head;
                    }
                }
            } finally {
                this.lockTreeMap.readLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("getExpiredMsg exception", e);
            // Previously execution continued with msg == null and called sendMessageBack(null, ...).
            Thread.currentThread().interrupt();
            return;
        }
        if (msg == null) {
            // Queue empty or head not expired: later entries are not examined.
            break;
        }
        try {
            pushConsumer.sendMessageBack(msg, 3);
            /* Removal happens only after the resend, so the message may already have been
             * consumed in between — this is one of the duplicate-consumption scenarios. */
            log.info("send expire msg back. topic={}, msgId={}, storeHost={}, queueId={}, queueOffset={}", msg.getTopic(), msg.getMsgId(), msg.getStoreHost(), msg.getQueueId(), msg.getQueueOffset());
            try {
                this.lockTreeMap.writeLock().lockInterruptibly();
                try {
                    // Remove only if msg is still the head (compared by offset): that means it has
                    // not been consumed and removed in the meantime. This shows the author was
                    // aware of the second scenario.
                    if (!msgTreeMap.isEmpty() && msg.getQueueOffset() == msgTreeMap.firstKey()) {
                        try {
                            removeMessage(Collections.singletonList(msg));
                        } catch (Exception e) {
                            log.error("send expired msg exception", e);
                        }
                    }
                } finally {
                    this.lockTreeMap.writeLock().unlock();
                }
            } catch (InterruptedException e) {
                log.error("getExpiredMsg exception", e);
                Thread.currentThread().interrupt();
                return;
            }
        } catch (Exception e) {
            log.error("send expired msg exception", e);
        }
    }
}
Broker异常退出后,Rocket MQ重启时会根据checkpoint文件中的时间点确定可信的CommitLog文件,并从该文件开始重新构建ConsumeQueue、IndexFile文件。这一过程中完全有可能出现某些消息的索引构建动作执行了两次的情况,这些消息自然会被消费两次,在业务看来这同样会造成重复消费。
/**
 * Recovers the CommitLog after an abnormal shutdown. This is an article excerpt; "......"
 * marks omitted code.
 *
 * <p>Walks the mapped files from newest to oldest. It picks the first file accepted by
 * isMappedFileMatchedRecover (per the original note, decided from the checkpoint file) and
 * falls back to the oldest file if none matches. It then reads messages forward from that
 * file and dispatches each valid one. Everything from the chosen file onward is dispatched
 * again, so entries that were already dispatched before the crash may be dispatched a
 * second time.
 *
 * @param maxPhyOffsetOfConsumeQueue not used in the visible excerpt
 */
public void recoverAbnormally(long maxPhyOffsetOfConsumeQueue) {
    /* recover by the minimum time stamp */
    boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
    List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();
    if (!mappedFiles.isEmpty()) {
        /* Looking beginning to recover from which file */
        int index = mappedFiles.size() - 1;
        MappedFile mappedFile = null;
        for (; index >= 0; index--) {
            mappedFile = mappedFiles.get(index);
            /* Use the checkpoint file to decide the trusted file to start recovery from */
            if (this.isMappedFileMatchedRecover(mappedFile)) {
                log.info("recover from this mapped file " + mappedFile.getFileName());
                break;
            }
        }
        /* No file matched: start from the oldest file */
        if (index < 0) {
            index = 0;
            mappedFile = mappedFiles.get(index);
        }
        ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
        long processOffset = mappedFile.getFileFromOffset();
        long mappedFileOffset = 0;
        while (true) {
            DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);
            int size = dispatchRequest.getMsgSize();
            if (dispatchRequest.isSuccess()) {
                if (size > 0) {
                    mappedFileOffset += size;
                    if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {
                        /* Trigger the message index building logic (dispatch); with duplication
                         * enabled, only messages below the confirm offset are dispatched */
                        if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {
                            this.defaultMessageStore.doDispatch(dispatchRequest);
                        }
                    } else {
                        this.defaultMessageStore.doDispatch(dispatchRequest);
                    }
                }
                // Come the end of the file, switch to the next file
                // A size of 0 means the blank at the end of the file was met; it is not
                // included in the truncate offset
                else if (size == 0) {
                    index++;
                    if (index >= mappedFiles.size()) {
                        /* The current branch under normal circumstances should not happen */
                        log.info("recover physics file over, last mapped file " + mappedFile.getFileName());
                        break;
                    } else {
                        mappedFile = mappedFiles.get(index);
                        byteBuffer = mappedFile.sliceByteBuffer();
                        processOffset = mappedFile.getFileFromOffset();
                        mappedFileOffset = 0;
                        log.info("recover next physics file, " + mappedFile.getFileName());
                    }
                }
            } else {
                /* Message check failed: recovery scanning stops here */
                log.info("recover physics file end, " + mappedFile.getFileName() + " pos=" + byteBuffer.position());
                break;
            }
        }
        ......
    }
    ......
}
正如Rocket MQ官方定位的"At least once"语义,作者并没有刻意处理上述并发场景以及恢复之后的索引一致性问题,因此消息可能被重复拉取、多次投递、多次构建索引……这些场景都会导致业务上出现重复消费,所以消息的幂等保障必不可少。