滴滴面试:Rocketmq消息0丢失(怎样是让人跪服的回答)

这是最近一位小伙伴分享的面试题,面试时自我感觉答得还不错。但我听了他的语音录播后,分析得出从一定程度上还有很多欠缺的点没有答出来,不成体系今天给大家好好解析下这道题。

消息的发送流程

Rocketmq和KafKa类似(实质上,最早的Rocketmq 就是KafKa 的Java版本),一条消息从生产到被消费,将会经历三个阶段:

  • 生产阶段,Producer 新建消息,而后经过网络将消息投递给 MQ Broker。这个发送可能会发生丢失,比如网络延迟不可达等。
  • 存储阶段,消息将会存储在 Broker 端磁盘中,Broker 根据刷盘策略持久化到硬盘中,刚收到Producer的消息在内存中了,但是如果Broker 异常宕机了,导致消息丢失。
  • 消费阶段, Consumer 将会从 Broker 拉取消息

以上任一阶段, 都可能会丢失消息,只要这三个阶段0丢失,就能够完全解决消息丢失的问题。

生产阶段如何实现0丢失方式

生产阶段有三种send方法:

  • 同步发送
  • 异步发送
  • 单向发送
/**
 * {@link org.apache.rocketmq.client.producer.DefaultMQProducer}
 */

// 同步发送
public SendResult send(Message msg) throws MQClientException, RemotingException,      MQBrokerException, InterruptedException {}

// 异步发送,sendCallback作为回调
public void send(Message msg,SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException {}

// 单向发送,不关心发送结果,最不靠谱
public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException {}

produce要想发消息时保证消息不丢失,可以采用同步发送的方式去发消息,send消息方法只要不抛出异常,就代表发送成功。发送成功会有多个SendResult 状态,以下对每个状态进行说明:

  • SEND_OK:消息发送成功,Broker刷盘、主从同步成功
  • FLUSH_DISK_TIMEOUT:消息发送成功,但是服务器同步刷盘(默认为异步刷盘)超时(默认超时时间5秒)
  • FlUSH_SLAVE_TIMEOUT:消息发送成功,但是服务器同步复制(默认为异步复制)到Slave时超时(默认超时时间5秒)
  • SLAVE_NOT_AVAILABLE:Broker从节点不存在

注意:同步发送只要返回以上四种状态,就代表该消息在生产阶段消息正确的投递到了RocketMq,生产阶段没有丢失。

如果业务要求严格,可以使用同步发送,并且只取SEND_OK标识消息发送成功,其他返回值类型的数据,需要在业务维度的 终极0丢失保护措施:本地消息表+定时扫描重试。

是同步发送还是异步发送

AP 和 CP 是天然的矛盾, 到底是 CP 还是 AP的 需要权衡

  • 同步发送的方式 是 CP ,高可靠,但是性能低。
  • 异步发送的方式 是 AP ,低可靠,但是性能高。

为了高可靠(CP),可以采取同步发送的方式进行发送消息,发消息的时候会同步阻塞等待broker返回的结果,如果没成功,则不会收到SendResult,这种是最可靠的。其次是异步发送,再回调方法里可以得知是否发送成功。最后,单向发送(OneWay)是最不靠谱的一种发送方式,我们无法保证消息真正可达。当然,具体的如何选择高可用方案,还是要看业务。为了确保万无一失,可以选择异步发送 + 业务维度的 终极0丢失保护措施 , 实现消息的0丢失。

生产端的失败重试策略

发送消息如果失败或者超时了,则会自动重试。

同步发送默认是重试三次,可以根据api进行更改,比如改为10次:

producer.setRetryTimesWhenSendFailed(10);

其他模式是重试1次,具体请参见源码

/**
 * {@link org.apache.rocketmq.client.producer.DefaultMQProducer#sendDefaultImpl(Message, CommunicationMode, SendCallback, long)}
 */

// 自动重试次数,this.defaultMQProducer.getRetryTimesWhenSendFailed()默认为2,如果是同步发送,默认重试3次,否则重试1次
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;

int times = 0;
for (; times < timesTotal; times++) {
      // 选择发送的消息queue
    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
    if (mqSelected != null) {
        try {
            // 真正的发送逻辑,sendKernelImpl。
            sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
            switch (communicationMode) {
                case ASYNC:
                    return null;
                case ONEWAY:
                    return null;
                case SYNC:
                    // 如果发送失败了,则continue,意味着还会再次进入for,继续重试发送
                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                        if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                            continue;
                        }
                    }
                    // 发送成功的话,将发送结果返回给调用者
                    return sendResult;
                default:
                    break;
            }
        } catch (RemotingException e) {
            continue;
        } catch (...) {
            continue;
        }
    }
}

上面的核心逻辑中,调用sendKernelImpl真正的去发送消息通过核心的发送逻辑,可以看出如下:

  • 同步发送场景的重试次数是1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() =3,其他方式默认1次。
  • this.defaultMQProducer.getRetryTimesWhenSendFailed()默认是2,我们可以手动设置producer.setRetryTimesWhenSendFailed(10);
  • 如果是同步发送sync,且发送失败了,则continue,意味着还会再次进入for,继续重试发送

同步模式下,可以设置严格的消息重试机制,比如设置 RetryTimes为一个较大的值如10。当出现网络的瞬时抖动时,消息发送可能会失败,retries 较大,能够自动重试消息发送,避免消息丢失。

Broker端保证消息不丢失的方法:

要 Broker 在正常运行,就不会出现丢失消息的问题。但是如果 Broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。如果确保万无一失,实现Broker端保证消息不丢失,有两板斧:

  • Broker端第一板斧:设置严格的副本同步机制
  • Broker端第二板斧:设置严格的消息刷盘机制

第一板斧:设置严格的副本同步机制

RocketMQ 通过多副本机制来解决的高可用,核心思想也挺简单的:如果数据保存在一台机器上你觉得可靠性不够,那么我就把相同的数据保存到多台机器上,某台机器宕机了可以由其它机器提供相同的服务和数据。

首先,Broker需要集群部署,通过主从模式包括 topic 数据的高可用。

为了消息0丢失,可以配置设置严格的副本同步机制,等Master 把消息同步给 Slave后,才去通知Producer说消息ok。

设置严格的副本同步机制 , RocketMQ 修改broker刷盘配置如下:所以我们还可以配置不仅是等Master刷完盘就通知Producer,而是等Master和Slave都刷完盘后才去通知Producer说消息ok了。

第二板斧:设置严格的消息刷盘机制

RocketMQ持久化消息分为两种:同步刷盘和异步刷盘。RocketMQ和kafka一样的,刷盘的方式有同步刷盘和异步刷盘两种。

  • 同步刷盘指的是:生产者消息发过来时,只有持久化到磁盘,RocketMQ、kafka的存储端Broker才返回一个成功的ACK响应,这就是同步刷盘。它保证消息不丢失,但是影响了性能。
  • 异步刷盘指的是:消息写入PageCache缓存,就返回一个成功的ACK响应,不管消息有没有落盘,就返回一个成功的ACK响应。这样提高了MQ的性能,但是如果这时候机器断电了,就会丢失消息。

同步刷盘和异步刷盘的区别如下:

  • 同步刷盘:当数据写到内存中之后立刻刷盘(同步),在保证刷盘成功的前提下响应client。
  • 异步刷盘:数据写入内存后,直接响应client。异步将内存中的数据持久化到磁盘上。

同步刷盘和异步输盘的优劣:

  • 同步刷盘保证了数据的可靠性,保证数据不会丢失。
  • 同步刷盘效率较低,因为client获取响应需要等待刷盘时间,为了提升效率,通常采用批量输盘的方式,每次刷盘将会flush内存中的所有数据。(若底层的存储为mmap,则每次刷盘将刷新所有的dirty页)
  • 异步刷盘不能保证数据的可靠性.
  • 异步刷盘可以提高系统的吞吐量.
  • 常见的异步刷盘方式有两种,分别是定时刷盘和触发式刷盘。定时刷盘可设置为如每1s刷新一次内存.触发刷盘为当内存中数据到达一定的值,会触发异步刷盘程序进行刷盘。

Broker端第二板斧:设置严格的消息刷盘机制,设置为Kafka同步刷盘。RocketMQ默认情况是异步刷盘,Broker收到消息后会先存到cache里,然后通知Producer说消息我收到且存储成功。Broker起个线程异步的去持久化到磁盘中,但是Broker还没持久化到磁盘就宕机的话,消息就丢失了。

Consumer保证消息不丢失的方法:

如果要保证 Consumer(消费者)0 丢失, Consumer 端的策略是啥呢?

普通的情况下,rocketMq拉取消息后,执行业务逻辑。

一旦Consumer执行成功,将会返回一个ACK响应给 Broker,这时MQ就会修改offset,将该消息标记为已消费,不再往其他消费者推送消息。如果出现消费超时(默认15分钟)、拉取消息后消费者服务宕机等消费失败的情况,此时的Broker由于没有等到消费者返回的ACK,会向同一个消费者组中的其他消费者间隔性的重发消息,直到消息返回成功(默认是重复发送16次,若16次还是没有消费成功,那么该消息会转移到死信队列,人工处理或是单独写服务处理这些死信消息)但是 消费者,也有两种消费模式:

  • 同步消费,消费线程完成业务操作
  • 异步消息 ,独立业务线程池 完成业务操作

一般大家在rocketMq 在并发消费模式下,这个模式,默认有20个消费线程:

如何保证客户端的高可用,两种场景:

  • 同步消费场景,业务代码手动发送CONSUME_SUCCESS ,保证 消息者的0丢失
  • 异步消费场景,需要通过业务维度的 终极0丢失保护措施:本地消息表+定时扫描 ,保证 消息者的0丢失

同步消费发送CONSUME_SUCCESS

同步消费指的是拉取消息的线程,先把消息拉取到本地,然后进行业务逻辑,业务逻辑完成后手动进行ack确认,这时候才会真正的代表消费完成。举个例子

consumer.registerMessageListener(new MessageListenerConcurrently() {
     @Override
     public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
         for (MessageExt msg : msgs) {
             String str = new String(msg.getBody());
             
             // 消费者 线程 同步进行  业务处理
             System.out.println(str);
         }
         // ack,只有等上面一系列逻辑都处理完后,
         // 发 CONSUME_SUCCESS才会通知broker说消息消费完成,
         // 如果上面发生异常没有走到这步ack,则消息还是未消费状态。
         return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
     }
 });

异步消费场景,如何保证0丢失:本地消息表+定时扫描

1、设计一个本地消息表,可以存储在DB里,或者其它存储引擎里,用户保存消息的消费状态

2、Producer 发送消息之前,首先保证消息的发生状态,并且初始化为待发送;

3、如果消费者(如库存服务)完成的消费,则通过RPC,调用Producer 去更新一下消息状态;

4、Producer 利用定时任务扫描 过期的消息(比如10分钟到期),再次进行发送。

在这里想说的是:本地消息表+定时扫描 的架构方案 ,是业务层通过额外的机制来保证消息数据发送的完整性,是一种很重的方案。这个方案的两个特点:

  • CP 不是 AP,性能低
  • 需要 做好幂等性设计

如何降低业务维度的 终极0丢失保护措施带来的性能耗损?

可以减少本地消息表的规模,对于正常投递的消息不做跟踪,只把生产端发送失败的消息、消费端消费失败的消息记录到数据库,并启动一个定时任务,扫描发送失败的消息,重新发送直到超过阈值(如10次),超过之后,发送邮件或短信通知人工介入处理。

我是@后端进阶 博主,专注计算机后端技术及经验分享,可提供简历修改、面试辅导、技术答疑等服务,关注我不迷路。

全部评论

相关推荐

挣K存W养DOG:我记得好多人说这个公司就是白嫖方案的,现在有大体方案要让你给他展示实现细节了,也是无敌了
点赞 评论 收藏
分享
03-15 12:48
门头沟学院 Java
牛牛要早起:这个一般就跟你说有高薪,然后叫你买车,之后血亏
点赞 评论 收藏
分享
其实本来打算等lastday的时候再写的,但是现在提笔写下这篇总结完全是出于自己的想法,今天上午自己被学校发的签到吵醒时才突然想明白了很多事情,遂决定写下本文进行总结,虽然现在顶多算2.5个月,但也大差不差喵。回看这段时间的日常实习,我的关键词是:遗憾,焦虑。当然也有快乐的时候,不过大部分时间都是前面这两种情绪主导。为了避免后人再次踩坑,我将在本文详细解释我遇到的困难&nbsp;+&nbsp;产生的原因&nbsp;+&nbsp;应对的措施。同时总结新人实习时除了业务本身,还有如何处理生活与工作上的平衡,调控自身的情绪,让自己恢复到最好的工作状态。本文不会教你实习怎么去做产出,因为有产出的前提是你的心态足够健康,且在工作之余还有时间去...
wuwuwuoow:你的经历跟挺像,但我实力远没你强,现在只能干外包。但解决焦虑这块我应该比你更有经验,因为我曾经也非常迷茫和焦虑: 1.规律作息。无论节假日,都必须在同一时间点睡觉,同一时间点起床。放假睡的多,工作睡的少,这就是典型的作息不规律。将直接干扰前额叶皮层功能,导致情绪波动(易怒、焦虑)。无论上班还是周末,我都是 11:30 睡,7 点起床。7.5h 睡眠,完全足够了。 2.运动。缓解压力,强身健体,提高免疫力。不要觉得每天没有时间锻炼,都是懒惰的借口。 3.冥想。长期练习会增厚前额叶皮层(理性决策区),缩小杏仁核体积(减少情绪过敏反应,核心),增强情绪调控能力。 方法很简单,任何时候都能做。就是闭上眼睛,只专注自己的呼吸,不去想其他任何事情。你可以尝试一下,你会发现非常难只专注呼吸,会有大量的想法涌现出来(什么走马灯),不要去压抑它们,而是放平心态,把注意力继续放在呼吸上面。 而且最重要的是,冥想让你学会“活在当下”。因为处于冥想的你,除了专注呼吸你还能做什么呢?你什么都做不了。生活也是这样,我们无法改变过去,无法预知未来会发生什么,我们能做的只有手头的事情,除此之外什么都别想,因为你无法去改变它们。 4.工作与生活分离。工作不是生活的全部,生活可不是只有工作。像我放假的时候,从不带电脑回去。放假该玩就玩吧。 上面要是都能做到,其实完全解决不了你工作上的问题,完不成的需求还是完不成,面试该挂还是得挂。不过呢,当你再次迷茫,再次焦虑的时候,你会发现,诶,还好,没这么难受。比如面试挂了,可能以前的你会感觉非常难受。但如果你做到以上 4 点,你还是会难受的,但其实又没这么难受,可能你会这样想:既然挂了我还能怎么样?这公司不要我,有的是公司要我!
投递腾讯等公司10个岗位 >
点赞 评论 收藏
分享
评论
5
30
分享

创作者周榜

更多
牛客网
牛客企业服务