为啥RocketMQ会发送重复的消息
前言
最近发现数据库里的数据经常会重复,就想定位一下这个问题。数据的insert操作只会在接收到RocketMQ消息时,处理消息的时候落库。这时就在想是不是RocketMQ消息重复发送了?拉取日志分析一下。
不难看出,同一个messageId竟然收到了五次,并且随着时间的推移还在继续发送。不禁让人思考,RocketMQ会在什么情况下,重复发送消息呢?
消息重复的场景
消息重复的场景如下:
1)发送时消息重复 当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且Message ID也相同的消息。
2)投递时消息重复 消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。为了保证消息至少被消费一次,消息队列RocketMQ版的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且Message ID也相同的消息。
3)负载均衡时消息重复(包括但不限于网络抖动、Broker重启以及消费者应用重启) 当消息队列RocketMQ版的Broker或客户端重启、扩容或缩容时,会触发Rebalance,此时消费者可能会收到重复消息。
那我这属于哪一类情况呢?首先在收到重复消息的时候,我们的服务是正常的,并且RocketMQ也是一直稳定的。所以排出了1、3点。在看2点,当客户端给服务端反馈应答的时候网络闪断,意思是客户端处理完业务没有反馈应答,而网络闪断只是其中原因之一。换个思路,如果客户端一直在处理业务,没有及时反馈应答,那RocketMQ为做啥?
RocketMQ在设置的超时时间后没有收到反馈应答,会认为是一次失败的消息消费。为了保证消息至少被消费一次,RocketMQ自然会再次发送Message ID相同的消息出来的。而我这的问题,也正是业务处理时间过长,导致的。平均处理时间在两分钟左右。而RocketMQ默认消息消费超时时间是15分钟,是由ConsumeTimeout参数决定的。从这段代码里可以看出来。
/**
* 设置每条消息消费的最大超时时间,超过这个时间,这条消息将会被视为消费失败,等下次重新投递再次消费. 每个业务需要设置一个合理的值. 单位(分钟)
*/
public static final String ConsumeTimeout = "consumeTimeout";
复制代码
而我们封装的RocketMQ将消息消费超时时间默认成了1分钟,导致了这个了问题
@Data
public static class Customize {
/**
* 消息监听类
*
* @see AbstractMessageListener
*/
private String listenerClassName = "tech.baoyun.starter.ons.comsume.DefaultMessageListener";
/**
* @see com.aliyun.openservices.ons.api.PropertyKeyConst#ConsumeTimeout
*/
private Integer consumeTimeout = 1;
/**
* @see com.aliyun.openservices.ons.api.PropertyKeyConst#MaxReconsumeTimes
*/
private Integer maxReconsumeTimes = 5;
/**
* @see com.aliyun.openservices.ons.api.PropertyKeyConst#ConsumeThreadNums
*/
private Integer consumeThreadNums = 10;
/**
* @see com.aliyun.openservices.ons.api.PropertyKeyConst#SuspendTimeMillis
*/
private Integer suspendTimeMillis = 60 * 1000;
}
作者:Fun_
链接:https://juejin.cn/post/7031719273773039647
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。