基于MQ的分布式事务实现方案

1. 前言

在与不同业务系统之间进行交互时,怎么保证发送的消息对方一定能收到,可能有人说RocketMQ就能做到,如果贵公司用到的消息队列是kafka、rabbitmq、activemq怎么保证不丢?

这里分享一下基于消息的分布式事务解决方案,此种方案是最终一致性的解决方案,不挑MQ,但是前提MQ本身要支持接收到的消息不能丢失。



2. MQ的配置建议

如果要保证MQ接收到的消息不丢,就要配置相关的同步策略或者刷盘策略

主从同步策略

建议主从同步建议设置为主从同步策略为主从同步完再响应,这样单个节点如果挂了,另一个节点的数据还会存在

刷盘策略

消息中间件为了提高效率,默认接收到消息不会立即刷盘,如果要主从同步策略是主节点接收到消息以后立即响应,这会正好主节点宕机,就会导致消息丢失,所以要特别注意下,虽然可以设置成同步刷盘,但是效率就会降低,所以还是建议设置主从同步策略

3. 生产方设计

生产者的职责是必须要保证本地事务提交成功消息一定要发送出去,或者业务处理失败就不发送。

3.1 消息持久化

生产方方案如下,首先需要在业务库中创建一张表,字段大致为:

  • 消息id
  • 业务id
  • 业务方名称(如果一个库是多个子系统在用就需要这个字段)
  • topic(发送消息主题)
  • 分区
  • 消息体
  • 状态(0未发送 1已发送)
  • 是否失败重试( 0不重试 1重试)

与本地业务表使用同一个事务,提交则一起提交,回滚则一起回滚,因为使用的同一个事务所以是强一致的,再事务提交以后进行消息数据的发送,发送成功以后则更改消息状态为已发送,具体流程请查看



这里可能还有一点还要考虑,就是在图1的第2步、第3步、第4步会出现失败,具体描述如下:

  1. 如插入本地库成功,但是发送MQ失败
  2. 消息发送成功,但是响应失败,比如超时,其实这会MQ已经接收发送方的消息了,但是发送方不清楚
  3. 消息发送响应都成功了,但是更改本地表状态为已发送失败了。

持久化相关代码


图3 消息数据保存

图4,集成Spring的事务管理器,重写事务提交后发送消息





3.2 消息补偿设计

以上这三个问题就需要引入补偿任务来处理了,具体查看图5,补偿任务会根据发送状态查询对应的数据,然后进行发送,这里有一点特别注意,消费方要必须做幂等处理,因为图1的第3步、第4步消息都已经发送到MQ了,只是发送方不清楚,所以还会重复发送,另外99.9%的场景是能立即发送成功的,只有很小部分需要做补偿,


补偿代码


查询待发送的数据,这里为1分钟之前的,定时任务用的是elastic-job,用其他定时任务也可以

至此整个发送方设计就完成了,下面看看部分

4. 消费方设计

消费方相对比较简单,主要有两点要求

  1. 保证消息不会重复消费
  2. 记录消息便于消息对账,对账主要是极端情况下,那些消息没收到,便于重新投递

以下是消费表的设计

  • 消息id
  • 业务id
  • Topic
  • 分区
  • 偏移量
  • 消息体
  • 状态( 0未消费成功 1消费成功 2消费失败 )
  • 异常信息(消费失败会记录异常信息)
  • 业务方名称(如果一个库是多个子系统在用就需要这个字段)

此表也要与业务表处于同一个事务,如果不是一个事务,会出现业务表操作成功、消息表插入失败,如果出现消息重复发送就会出现重复消费的问题,具体查看图6


消费方代码

这里是kafka的消费代码,通过动态代理,封装KafkaListener类,在处理前进行消息重复判断,在处理后进行消费表的插入,这里需要特别注意一点,业务处理不能把异常自己吃掉,否则上层捕获不到,会认为业务处理成功,从而插入脏数据



5. 历史数据清理

通过前面介绍,我们创建了2张表,分别为消息发送表、消息消费表,这两张表要特别注意下,如果业务量比较大,数据量会快速增长,所以需要删除已经处理成功的数据,通过配置两个定时任务,保留一定的时间数据,其他时间的数据就可以删除了,代码如下




#Java##程序员#
全部评论
感谢大佬的分享,有用
点赞 回复 分享
发布于 2022-09-18 18:00 陕西

相关推荐

10-09 09:39
门头沟学院 C++
HHHHaos:这也太虚了,工资就一半是真的
点赞 评论 收藏
分享
点赞 收藏 评论
分享
牛客网
牛客企业服务