7. 分布式消息队列-RabbitMQ

7.1-消息队列技术选型

应用场景

  • 服务解耦:服务拆分之后如何通信?强依赖还是弱依赖?

  • 强依赖:服务之间互相调用

  • 弱依赖:就使用 mq 来通知调用

  • 削峰填谷:流量很大的场景下,如何对应用服务进行抗压;削峰填谷的意思是:将高峰和低峰时期的速率做一个均衡,如下游服务处理缓慢,就可以将请求缓存起来,慢速的去消费

  • 异步化缓冲:有些业务逻辑可以异步化操作,只需要做到 最终一致性 即可

应用思考点

  • 生产端可靠性投递:消息发出去了,一定要与数据库保证一个原子性

  • 消费端冥等:生产端要做到可靠性投递,可能会投递多次

以上两点,笔者在这里目前无法理解是个什么意思

另外是 mq 本身的问题:

  • 高可用

  • 低延迟:在高压情况下,消息低延迟

  • 消息可靠性:投递到了消息队列中,如何保证消息不丢失;如磁盘发生损坏,有没有其他的解决方案

  • 堆积能力:在高峰情况下,消息能堆积多少

  • 可扩展性:是否支持天然的无感知扩容

在选型的时候,需要针对您当前的业务需求来考虑,以上的各种点,是否能满足您的业务需求

业界主流的分布式消息队列

  • Active:阿帕奇旗下的

  • Rabbit:

  • Rocket:阿里巴巴捐给阿帕奇的,支持分布式事务

  • Kafka:搞吞吐量、海量数据的存储

如何进行技术选型

  1. 各个 MQ 的性能、优缺点、相应的业务场景

    比如:Rabbit:镜像队列满足高可用,数据不丢失,横向扩展不太好

  2. 集群架构模式:分布式、可扩展、搞可用、可维护性

  3. 综合成本问题,集群规模,人员成本

  4. 未来的方向、规划、思考

7.2-RocketMQ 集群架构与原理解析

RocketMQ 是一款分布式、队列模型的消息中间件,由阿里巴巴自主研发的一款适用于高并发、高可靠性、海量数据场景的消息中间件。早期开源 2.x 版本名为MetaMQ ,迭代 3.x 版本,更名为 RocketMQ,16 年开始贡献到 Apache,经过 1 年多的孵化,最终成为 Apache 顶级的开源项目,更新非常频繁,社区活跃度也非常高;RocketMQ 参考借鉴了优秀的开源消息中间件 Apache Kafka(这也是我们后面课程中重点要讲解的内容哦),其消息的路由、集群划分都借鉴了Kafka 优秀的设计思路,并结合自身的 双十一 场景进行了合理的扩展和 API 丰富。

RocketMQ 中文文档

优秀的能力与支持:

  • 支持集群模型、负载均衡、水平扩展能力

  • 亿级别的消息堆积能力

  • 采用零拷贝的原理、顺序写盘、随机读(索引文件)

  • 丰富的 API 使用

  • 代码优秀,底层通信框架采用 Netty NIO 框架

  • NameServer 代替 Zookeeper

  • 强调集群无单点,可扩展,任意一点高可用,水平可扩展

  • 消息失败重试机制、消息可查询

  • 开源社区活跃度、是否足够成熟(经过双十一考验)

专业术语:

  • Producer:消息生产者,负责产生消息,一般由业务系统负责产生消息。

  • Consumer:消息消费者,负责消费消息,一般是后台系统负责异步消费。

  • Push Consumer:Consumer 的一种,需要向 Consumer 对象注册监听。

  • Pull Consumer:Consumer 的一种,需要主动请求 Broker 拉取消息。

  • Producer Group:生产者集合,一般用于发送一类消息。

  • Consumer Group:消费者集合,一般用于接受一类消息进行消费。

  • Broker : MQ 消息服务(中转角色,用于消息存储与生产消费转发)。

7.3-RabbitMQ 集群架构模型与原理

RabbitMQ 四种集群架构

  • 主备模式

  • 远程模式:早期版本提供的多活的一种架构,主要做异地多活、数据备份、容灾的功能;配置非常复杂

  • 镜像模式:业界使用广泛的一种模式

  • 多活模式:与远程模式差不多

主备模式

一般在并发和数据量不高的情况下,这种模型非常的好用且简单,主备模式也称为 Warren 模式。

  • 主:提供所有的服务
  • 备:不提供服务,只用于备用;

当主节点产生故障时,备用节点顶上;原来的故障节点启动后加入进来,不会改变角色,会成为备份节点;

备用节点可以是多个节点。

它其实就是 兔子窝(warren)模式,与 activemq 利用 zookeeper 做协调实现主备节点切换一样。

从上图可以看到,利用了 HaProxy 来做负载均衡的高可用(它支持 4 层 TCP 和 7 层 HTTP 转发),下面是它的核心配置(后续会详细讲解)

listen rabbitmq_cluster
bind 0.0.0.0:5672
# 配置为 tcp 模式
mode tcp
# 简单的轮询配置
balance roundrobin
# 主节点
# inter:每 n 毫秒 对 mq 集群做健康检查
# rise:n 次正确证明服务器可用
# faill:n 次失败证明服务器不可用,并配置主备机制
server bhz76 192.168.11.76:5672 check inter 5000 rise 2 fall 2
# 备用节点
server bhz77 192.168.11.77:5672 backup check inter 5000 rise 2 fall 2

远程模式

远距离通信和复制,可以实现 双活 的一种模式,简称 Shovel 模式。这种方式现在已经不怎么使用了

所谓 Shovel 就是把消息进行不同数据中心的复制工作,可以跨地域的让两个 mq 集群互联。

Shove 架构模型

有消息来的时候,可以通过路由到远端集群中去处理

在使用了 shovel 插件之后,模型变成了近端同步确认,远端异步确认的方式,大大提高了订单确认速度,并且还能保住可靠性。

当第一个集群处理不过来或挂掉的时候,可以转移到第二个集群。

这个配置太复杂,所以就被淘汰了,这里笔者也不记录了,基本上不会使用。

镜像模式

业界主流的使用模式。经典队列镜像模式(Mirroring),但是在 3.8.0 开始,比经典镜像更好的一种方式 仲裁队列模式(quorum-queues) 替代了镜像模式

镜像模式能够保证 100% 数据不丢失,在实际工作中用的最多,并且实现集群非常的简单,一般互联网大厂都会构建这种镜像集群模式。

能够保证:

  • 高可靠:通过复制 来备份数据,就算其中一个挂掉,还有备份的顶上

  • 数据同步:

  • 3 节点:奇数节点,防止脑裂

集群架构图

三个节点保存的数据都是一致的,应用通过 VIP 路由到 master 上。

它有一个天然的缺陷,不能支持横向扩展,因为每个节点的数据都是一样的,这就会存在单节点数据容量的问题

多活模式

这种模式也是实现 异地数据复制 的主流模式,因为 Shovel 模式配置比较复杂,所以一般来说实现异地集群都是使用这种双活或则多活模型来实现

这种模型需要依赖 RabbitMQ 的 federation 插件,可以实现持续的可靠的 AMQP 数据通信,多活模式实际配置与应用非常简单

RabbitMQ 部署架构采用双中心模式(多中心),在两套(或多套)数据中心中各部署一套 RabbitMQ 集群,各中心的 RabbitMQ 服务除了需要为业务提供正常的消息服务外,中心之间还需要实现部分队列消息共享

Federation 插件是一个不需要构建 Cluster,而在 Brokers 之间传输消息的高性能插件,Federation 插件可以在 Brokers 或则 Cluster 之间传输消息,连接的双方可以使用不同的 users 和 virtual hosts,双方也可以使用版本不同的 RabbitMQ 和 Erlang。Federation 插件使用 AMQP 协议通讯,可以接受不连续的传输。

Federation Exchanges,可以看成 Downstream 从 Upstream 主动拉取消息,但并不是拉取所有消息,必须是在 Downstream 已经明确定义 Bindings 关系的 Exchange,也就是有实际的物理 Queue 来接收消息,才会从 Upstream 拉取消息到 Downstream。使用 AMQP 协议实施代理间通信,Downstream 会将绑定关系组合在一起,绑定/解除绑定命令将发送到 Upstream 交换机。因此,Federation Exchange 只接收具有订阅的消息;

Federation 和之前的远程模式功能差不多,但是远程模式只能在配置文件中配置,无法动态修改增加,而 Federation 插件可以动态控制哪些 Exchange 进行相互通信

本处贴出官方图来说明:

下游需要和上游 Exchange 进行绑定,才能进行互相通信。

7.4-Kafka 集群架构模型与原理

Kafka 是 Linkedin 开源的分布式消息系统,目前归属于 Apache 顶级项目。

Kafka 主要特点是基于 Pull(拉) 的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。

0.8 版本开始支持复制,不支持事物,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网的数据收集业务。想要实现消息重复、丢失这些缺点也是可以支持的,但是就丢失了 kafka 高吞吐量的优点

Kafka 有哪些特点

  • 分布式:消息支持分区

  • 跨平台:支持很多语言的客户端

  • 实时性:就算你消息堆积了上亿,但是你的存储是足够的,就不会影响 kafka 的性能

  • 伸缩性:支持水平扩展

Kafka 高性能的原因是什么?

  • 顺序写,Page Cache:空中接力,高效读写

顺序写入磁盘,消费者顺序读,并不删除已经消费过的数据(避免随机写操作)。顺序写之后,到顺序读时,如何知道自己消费到了哪一条消息?这就需要有一个 offset 来记录顺序读到了哪一个位置,通过这个 offset 来定位。不删除已消费过的数据,就是为了保证 offset 不变化(假设你删除了某一条数据,那么这条消息后面的所有消息位置都往前变化了;虽然这个还不太能想明白,大体就是这么个意思)

Page Cache:可以在廉价的机器上实现 100K 的消息吞吐量,使用操作系统的 Page Cache 顺序的读消息,来达到这个目的

  • 高性能、高吞吐
  • 后台异步、主动 Flush

一些碎片整理功能、大部分操作也是利用 Page Cache,假设你的生产和消费速率一样,基本上就在 Page Cache 级别完成了,甚至都不需要经过物理磁盘

操作系统级别的 Page Cache,即使服务器重启也还在

  • 预读策略,IO 调度

Page Cache 与 zero copy 原理解析

Page Cache 是操作系统主要实现的一种 磁盘缓存 的机制,目的是减少对磁盘 IO 的操作,简单说:就是将磁盘中的数据缓存到内存中,把对磁盘的访问变成对内存的访问

读取文件:

  1. 操作系统读取文件,会先从 Page Cache 中 去检查,如果存在就返回,不存在则去读取

  2. 读取是先放在 Page Cache 中,再返回到用户缓冲区

上图一共经过了 4 此 copy:

  1. 磁盘文件 copy 到 内核缓冲区

  2. 内核缓冲区 copy 到用户缓冲区

  3. 用户缓冲区 copy 到内核空间缓冲区

  4. 内核空间缓冲区 copy 到 socket 缓冲区

而使用 零拷贝 就能节省 3 次 copy:

与应用程序无关联,只是将文件 copy 到内核缓冲区,网卡直接从内核缓冲区获取数据发出

Kafak 集群模型

7.5-RabbitMQ 基础

初始 RabbitMQ

RabbitMQ 是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ 使用 Erlang 语言编写,并且 RabbitMQ 是基于 AMQP 协议的。

哪些大厂在用 RabbitMQ,为什么?滴滴、美团、头条、去哪儿、艺龙... 都在使用,原因是:

  • 开源、性能优秀、稳定性保障

  • 提供可靠性消息投递模式(confirm)、返回模式(return)

  • 与 SpringAMQP 完美的整合、API 丰富

  • 集群模式丰富、表达式配置、HA 模式、镜像队列模型

  • 保证数据不丢失的前提做到高可靠性、可用性

高性能的原因

  • Erlang 语言最初在于交换机领域的架构模式,这样使得 RabbitMQ 在 Broker 之间进行数据交互的性能是非常优秀的

  • Erlang 的优点:Erlang 有着和原生 Socket 一样的延迟

什么是 AMQP 高级消息队列协议?

AMQP 0-9-1 模型解释,官方文档

AMQP 全称:Advanced Message Queuing Protocol,翻译为:高级消息队列协议。

**AMQP 定义:**是具有现代特征的 二进制协议。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计

下图是 AMQP 协议模型

发布消息:

  1. 找到 servier

  2. 找到 Virtual host

  3. 找到 Exchange,将消息投递进去

消费消息:监听 Message Queue

那么在这里可以看到,缺少了一些细节的描述,比如消息是投递到 Exchange 中的,但是监听了 MessageQueue,这是怎么回事?下面就来学习下 AMQP 的核心概念

AMQP 的核心概念

  • Server:又称 Broker,接受客户端的链接,实现 AMQP 实体服务

  • Connection:连接,应用程序与 Broker 的网络连接

  • Channel:网络信道,几乎所有的操作都在 Channel 中进行,Channel 是进行消息读写的通道。客户端可建立多个Channel,每个 Chann 代表一个会话任务。

  • Message:消息,服务器和应用程序之间传送的数据,由 Properties 和 Body 组成。Properties 可以对消息进行修饰,比如消息的优先级、延迟等高级特性;Body 则就是消息体内容。

  • Virtual host:虚拟地址,用于进行 逻辑隔离,最上层的消息路由。一个 Virtual Host 里面可以有若干个 Exchange 和 Queue,同一个 Virtual host 不能有相同名称的 Exchange 或 Queue。

  • Exchange:交换机,接收消息,根据路由键转发消息到绑定的队列。

  • Binding:Exchange 和 Queue 之间的虚拟连接, binding 中可以包含 routing key。

  • Routing key: 一个路由规则,虚拟机可用它来确定如何路由一个特定消息。

  • Queue:也称为 Message Queue,消息队列,保存消息并将他们转发给消费者

RabbitMQ 的整体架构

  • 蓝色的:生产者和消费者

  • 绿色的:Exchange,将消息过滤、路由到指定队列

  • 红色的:Queue

从图上可以看出,生产者只需要知道 Exchange,消费者只需要知道 Queue 即可,他们之间没有耦合。

RabbitMQ 消息是如何流转的?

一条消息,投递到 Exchange,通过路由 key,路由到指定 Queue 上,消费者只对 Queue 监听即可。

7.6-RabbitMQ 核心概念

本文翻译自官方文档

Exchange 接收生产者的消息,并根据路由键转发消息到所绑定的队列。这是 RabbitMQ(AMQP)非常重要的概念,上图黄框中部分就是 Exchange 和 Queue 的关系,可以看见他们的一个 路由 的关系

交换机属性

  • Name:交换机名称

  • Type:交换机类型 direct、topic、fanout、headers

  • Durability:是否持久化,true 为持久化。如果是 false,那么在 RabbitMQ 服务重启之后,交换机就会没有了。

  • Auto Delete:当最后一个绑定到 Exchange 上的队列删除后,自动删除该 Exchange。即当 Exchange 没有队列的时候自动删除。实际生产环境中是不会设置自动删除的。

  • Internal:当前 Exchange 是否用于 RabbitMQ 内部使用,默认为 false。

  • Arguments:扩展参数,用于扩展 AMQP 协议自制定化使用。

Exchange 和 queue 是多对多的关系,即一个 Exchange 可以发送消息到多个队列,一个队列也可以接受多个 exchange 发送的消息,推荐在实际生产环境中,一个 Exchange 对应多个队列,尽量不使用多个 Exchange 对一个队列。

在实际的生产时,不推荐在代码中声明 exchange、queue 以及 exchange 和 queue的绑定关系,最好是在控制台就做好这些声明,在代码中直接使用。如果非要在代码中声明,一定要在生产者端和消费者端同时声明,以免启动时报错(找不到 exchange 和 queue 而报错)。

一个 exchange 和一个 queue 可以存在多个绑定关系,比如 exchangeA 和 queueA 可以同时有两个绑定关系:log.info.*log.warn.*

交换机类型

默认交换

它是一个「空字符串」的直接交换机,它有一个特殊的属性,使它对简单的应用程序非常有用:创建的每个队列都会自动绑定到一个与队列名称相同(完全匹配,不支持通配符)的路由键

比如:当你声明了一个名为 「abc」的队列时,将会使用 「abc」作为路由键(也称绑定建),将「abc」队列与「空字符串」的交换机进行绑定,因此使用路由键「abc」发送消息到默认交换机时,消息将被路由到「abc」队列

直接交换 Direct

直接交换器根据消息路由键将消息传递到队列。直接交换是消息 单播路由 的理想选择(尽管它们也可用于多播路由)。下面是它的工作原理:

  • 一个队列使用路由密钥 K 绑定到交换机
  • 当具有路由密钥 R 的新消息到达直接交换时,如果 K = R,则交换将其路由到队列

直接交换通常用于以循环方式在多个工作人员(同一应用程序的实例)之间分配任务。这样做时,重要的是要了解,在 AMQP 0-9-1 中,消息在 使用者之间而不是队列之间 进行负载平衡。

扇出交换 Fanout

扇出交换将消息路由到绑定到它的所有队列,并且 忽略路由键。如果 N 个队列绑定到一个扇出交换器,当新消息发布到该交换器时,该消息的副本将传递到所有 N 个队列。扇出交换是 消息广播路由 的理想选择。

因为扇出交换向绑定到它的每个队列传递消息的副本,所以它的用例非常相似:

  • 大型多人在线 (MMO) 游戏可将其用于排行榜更新或其他全球事件

  • 体育新闻网站可以使用扇出交换将比分更新近乎实时地分发给移动客户端

  • 分布式系统可以广播各种状态和配置更新

  • 群聊可以使用扇出交换在参与者之间分发消息(虽然 AMQP 没有内置的在线概念,所以 XMPP 可能是更好的选择)

主题交换 Topic

根据 消息路由键(RouteKey) 和绑定关系的 RouteKey 进行模糊匹配(支持通配符模式) 将消息路由到一个或多个队列。主题交换类型通常用于实现各种 发布/订阅模式 变体。主题交换通常用于消息的 多播路由

主题交换有非常广泛的用例。每当一个问题涉及 多个消费者/应用程序有选择地选择他们想要接收的消息类型时,就应该考虑使用主题交换。

示例用途:

  • 分发与特定地理位置相关的数据,例如销售点

  • 由多个工作人员完成的后台任务处理,每个工作人员都能够处理特定的任务集

  • 股票价格更新(以及其他类型财务数据的更新)

  • 涉及分类或标记的新闻更新(例如,仅针对特定运动或团队)

  • 云中不同类型服务的编排

  • 分布式架构/特定于操作系统的软件构建或打包,其中每个构建器只能处理一种架构或操作系统

标头交换 headers

标头交换设计用于在 多个属性上进行路由,这些属性比路由键更容易表示为消息标头。头交换 忽略路由键属性。相反,用于路由的属性取自 headers 属性。如果头的值等于绑定时指定的值,则认为消息匹配。

可以使用多个标头进行匹配,将队列绑定到标头交换。在这种情况下,代理需要来自应用程序开发人员的另一条信息,即它是否应该考虑与任何标头匹配的消息,还是所有标头?这就是 x-match 绑定参数的用途。当 x-match 参数设置为any 时,只有一个匹配的标头值就足够了。或者,将 x-match 设置为 all 要求所有值都必须匹配。

因为它们基于标头值进行路由,所以它们可以用作路由键不必是字符串的直接交换;例如,它可以是整数或散列(字典)。

请注意,以字符串 x- 开头的标头 不会用于评估匹配项。

队列

队列与交换共享一些属性,但也有一些额外的属性:

  • Name:队列名称
  • Durable :是否持久化,true 持久化,否则 MQ 重启后,该队列消失
  • Exclusive :仅由一个链接使用,该链接关闭时该队列被删除
  • Auto-delete:自动删除,当最后一个消费者取消订阅时,被删除
  • Arguments:扩展参数,可选的,由插件和特定的功能使用,比如消息的 TTL、队列长度限制等

在使用队列之前,必须先声明它。如果队列不存在,则声明队列将导致它被创建。如果队列已经存在并且其属性与声明中的属性相同,则声明将不起作用。当现有队列属性与声明中的属性不同时,将引发代码为 406 ( PRECONDITION_FAILED )的通道级异常。

队列名称

应用程序可以选择队列名称或要求 MQ 为它们生成一个名称。队列名称最多可以包含 255 个字节的 UTF-8 字符。

AMQP 0-9-1 可以代表应用程序生成唯一的队列名称。要使用此功能,请将空字符串作为队列名称参数传递。生成的名称将与队列声明响应一起返回给客户端。

amq 开头的队列名称。保留给 MQ 内部使用。尝试使用违反此规则的名称声明队列将导致通道级异常,回复代码为 403 ( ACCESS_REFUSED )。

队列持久性

在 AMQP 0-9-1 中,队列可以声明为持久的或临时的。持久队列的元数据 存储在磁盘上,而临时队列的元数据尽可能 存储在内存中

绑定

绑定是交换使用(除其他外)将消息路由到队列的规则。为了指示交换机 E 将消息路由到队列 Q,Q 必须绑定到 E。绑定可能具有一些交换类型使用的可选 路由键 属性。路由键的目的是选择某些发布到交换机的消息,以便路由到绑定队列。换句话说,路由键 就像一个过滤器。

打个比方:

  • 队列就像你在纽约市的目的地

  • 交换就像肯尼迪机场

  • 绑定是从肯尼迪机场到目的地的路线。可以有零种或多种方式到达它

拥有这一间接层可以实现使用直接发布到队列不可能或很难实现的路由方案,并且还消除了应用程序开发人员必须做的一定数量的重复工作。

如果一条消息不能被路由到任何队列(例如,因为它被发布到的交换没有绑定),它要么被 丢弃,要么返回给发布者,这取决于发布者设置的消息属性。

消费者

消息仅存储在队列中是没有用的,所以需要消费者来消费队列中的消息。在 AMQP 0-9-1 模型中,应用程序有两种方法可以做到这一点:

  • 订阅 API:MQ 会将消息推送给订阅者,这是推荐的选项
  • 轮询 API:这种方式非常低效,在大多数情况下应该避免

使用订阅 API 时,应用程序必须表明有兴趣来自特定队列的消息,简单的说,订阅了一个队列。每个队列可能有多个消费者,或则一个独占消费者(在消费时,在队列中排除其他消费者)

每个消费者(订阅)都有一个称为 消费者标签 的标识符(一个字符串),它用于取消订阅消息

消息确认

消费者应用程序(即接收和处理消息的应用程序)可能偶尔无法处理单个消息或有时会崩溃。网络问题也有可能导致问题。这就提出了一个问题:**MQ 何时应该从队列中删除消息?**AMQP 0-9-1 规范让消费者对此进行控制,有两种确认模式:

  • 在 MQ 向应用程序发送消息之后(使用 basic.deliver 或 basic.get-ok 方法)。

  • 在应用程序发回确认之后(使用 basic.ack 方法)。

前者称为 自动确认模型,后者称为 显式确认模型。使用显式模型,应用程序选择何时发送确认。它可以是在接收到消息之后,或者在处理之前将它持久化到数据存储之后,或者在完全处理消息之后(例如,成功获取一个网页,将其处理并存储到某个持久性数据存储中)。

如果消费者在 没有发送确认的情况下死亡,则 MQ 将其重新交付给另一个消费者,或者,如果当时没有可用的消费者,则代理将等到至少有一个消费者注册到同一队列,然后再尝试重新交付。

拒绝消息

当消费者应用程序收到一条消息时,对该消息的处理可能会成功,也可能不会。应用程序可以 通过拒绝消息向 MQ 指示消息处理失败(或当时无法完成)。拒绝消息时,应用程序可以要求 MQ 丢弃或重新排队。当队列中只有一个消费者时,请确保您不会通过一遍又一遍地拒绝和重新排队来自同一消费者的消息来创建无限的消息传递循环。

拒绝确认

使用 basic.reject 方法拒绝消息。basic.reject 有一个限制:无法像使用确认那样拒绝多条消息。但是,如果您使用的是 RabbitMQ,则有一个解决方案。RabbitMQ 提供了一个 AMQP 0-9-1 扩展,称为拒绝确认或 nacks

消息预取

对于多个消费者 共享 一个队列的情况,能够指定每个消费者在 发送下一个确认之前可以一次发送多少消息 很有用。这可以用作简单的负载平衡技术,或者如果消息倾向于批量发布,则可以提高吞吐量。例如,如果生产应用程序由于其正在执行的工作的性质而每分钟发送一次消息。

请注意,RabbitMQ 仅支持通道级预取计数,不支持基于连接或大小的预取。

消息属性和有效负载

AMQP 0-9-1 模型中的消息具有属性。有些属性非常常见,以至于 AMQP 0-9-1 规范定义了它们,应用程序开发人员不必考虑确切的属性名称。一些例子是

  • 内容类型

  • 内容编码

  • 路由键

  • 交付模式(持续与否)

  • 消息优先级

  • 消息发布时间戳

  • 有效期

  • 发布者应用程序 ID

AMQP 使用某些属性,但大多数属性都可以由接收它们的应用程序解释。一些属性是可选的,称为 headers。它们类似于 HTTP 中的 X-Header。消息属性是在发布消息时设置的。

消息还有一个有效载荷(它们携带的数据),AMQP 代理将其视为一个不透明的字节数组。代理不会检查或修改有效负载。消息可能只包含属性而没有负载。通常使用 JSON、Thrift、Protocol Buffers 和 MessagePack 等序列化格式来序列化结构化数据,以便将其作为消息负载发布。协议对等方通常使用 内容类型内容编码 字段来传达此信息,但这只是按照惯例。

消息可以作为持久性发布,这使得 MQ 将它们持久化到磁盘。如果服务器重新启动,系统会确保接收到的持久消息不会丢失。简单地将消息发布到持久交换或者它被路由到的队列是持久的这一事实不会使消息持久:这完全取决于消息本身的持久模式。将消息发布为持久性会影响性能(就像数据存储一样,持久性以一定的性能成本为代价)。

7.7-高级特性 - 消息如何保障 100% 投递成功

什么是生产端的可靠性投递?

  • 保障消息的成功发出

  • 保障 MQ 节点的成功接收

  • 发送端收到 MQ 节点(Broker)确认应答

  • 完善的消息进行补偿机制

在极端情况下,比如:发送端在接收 MQ 节点的确认应答时,网络闪断,MQ 是收到了消息,但是发送端会认为没有收到

生产端可靠性投递

BAT/TMD 互联网大厂的解决方案:

  • 第一种:消息落库,对消息状态进行打标

发送到 MQ 前入数据库 ,并给一个状态如 发送中,收到确认应答后修改状态为 发送成功

对于那些长期在发送中的消息,进行一个轮询重试发送到 MQ

  • 第二种:消息的延迟投递,做二次确认,回调检查

方式一:消息落库,对消息状态进行打标

上图中没有加事物,你有可能会说业务(BIZ)数据入库成功了,消息(MSG)入库失败了,小规模你可以加事物,但是大规模系统,大流量系统,是不会加事物的

  1. 发送消息前,将 业务数据消息 存入数据库中:并给消息一个状态,如 发送中

  2. 将消息发送到 MQ

  3. MQ 收到消息后,回调发送端,告知 确认收到消息

  4. 发送端监听到 确认收到消息,改变消息的状态,如 发送完成

上面是正常的 4 个步骤,比如在第 3 步到第 4 步之间,MQ 确认收到消息,这个时候网络闪断,发送端没有收到这个确认事件,数据库中的这一条消息 永远都是发送中状态

所以就需要一个定时任务,去扫描这些长时间(比如 5 分钟内,设置一个状态变更时间,对这个变更时间进行对比,超过 5 分钟则表示该消息出问题了)还在发送中状态的任务,将他们进行重新发送。

为什么要有一个重试上限?这个就很简单了,有以下几个原因:

  1. 此时整个网络有问题,投递不出去

  2. 消息有问题,路由不出去

  3. 业务有问题等

所以需要设置一个上限。


方式二:消息的延迟投递,做二次确认,回调检查

方式一在高并发场景下是否合适?有两次数据持久化:

  1. 业务数据
  2. 消息数据

在高并发情况下,数据库压力过大。而方式二则能解决这种情况,不敢说 100%,至少 99.9% 能保证可靠性投递

  • 蓝色的 Upstream:上游服务

  • 红色的 Downstream:下游服务

  • 灰色的 Callback service:回调服务

下面来看看方式二的原理:

  1. 第一次消息发送:业务数据入库后,发送一个 业务消息 到 MQ
  2. 接着发送一个 延迟检查的消息 到 MQ(专门的队列,不是和业务消息一个队列)
  3. 下游服务监听业务消息队列,
  4. 下游服务消费一个业务消息后,发送一个 消息确认事件 到 MQ(同样是专用队列)
  5. 回调服务监听 消息确认事件 队列,并落库
  6. 回调服务监听 延迟检查的消息 队列

由于是延迟消息,正常情况下,接受到该消息的时候,业务消息已经发送完成,在数据库中检查该消息的状态

然后通过 RPC 回调将结果告知上游服务,就算是下游服务繁忙,回调服务将消息异常的结果返回给了上游服务

上游服务也可以通过重试机制,发送延迟检查的信息,再次检查消息状态。这里重试机制就和方式一的类似。

如上一来,虽然麻烦了很多,但是避开了方式一需要同时写 业务数据和业务消息 的耗时,在高并发下,性能提高了。

7.8-幂等性概念

幂等性是什么?

可以借鉴数据库的乐观锁机制:比如执行一条更新库存的 SQL 语句

update t_reps set count = count - 1, version = versuion + 1 where version = 1;

简单说:就像上面这一条语句一样无论执行多少次,结果都是一样的。因为 version 不是 1 的话,执行就不会生效

消费端 - 幂等性保障

在海量订单产生的业务高峰期,如何避免消息的重复消费问题?

消费端实现了幂等性,就意味着消息永远不会消费多次,即使收到了多条一样的消息。

业界主流的幂等性操作:

  1. 唯一 ID + 指纹码机制,利用数据库主键去重

  2. 利用 Redis 原子性去实现

唯一 ID + 指纹码 机制

  • 唯一 ID:全局的唯一 ID
  • 指纹码:不一定是我们系统产生的,有可能是外部产生的

说实话这里视频中说得很模糊,明白不了这个到底是个什么东西,大概说的就是个什么业务规则产生的

利用数据库主键去重 SQL

select count(1) from t_order where id = 唯一 ID + 指纹码 查询出来,看有没有,有则失败,没有则写入。

好处:实现简单

坏处:高并发下有数据库写入的性能瓶颈;将 ID 进行分库分表存储,分摊单库压力

Redis 原子性实现

Redis 有一些操作是原子性的,比如自增、是否存在某个 key。可以将这个当成锁来使用。需要考虑的问题:

  • 是否要对数据进行落库,如果落库的话,关键解决的问题是数据库和缓存如何做到原子性?
  1. 消费一条消息,先 set 订单 ID 在 redis 中
  2. 如果 set 返回 0,则表示是一条未处理的消息,处理后入库

这两个操作要保证是原子性的,别 redis 标记成功,后面处理失败了。

  • 如果不进行落库,都存储在缓存中,如何设置定时同步的策略?

这个机制是,数据存储在缓存中,但是使用要刷新到数据库中去的,如何设置同步策略。

对于这个问题,笔者没有明白冲突点在哪里?唯一能想到的时,依赖于数据库中这条数据状态的业务如何搞定这里延迟更改的问题。这些业务如何处理缓存中已经成功,数据库中不成功的问题。

对于第一种来说:有一种简单的方案是搞一个定时任务,比如定时检查过去 20 分钟的数据状态,去 redis 中对比,如果不一致,就删除 redis 中的问题,通知生产端重新发送消息或则做一些补偿机制。

7.9-RabbitMQ 整合 Spring Boot 2.x

基本知识

生产端核心配置

spring:
	rabbitmq:
  	# 消息确认
  	publisher-confirms: true
    # 消息回调返回
    publisher-returns: true
    template:
    	mandatory: true

消费端核心配置

spring:
	rabbitmq:
  	listener:
    	simple:
      	# 消息确认方式,这里选择手动确认
      	acknowledge-mode: MANUAL
        # 并发数量,默认 1
        concurrency: 1
        # 最大并发 5 
        max-concurrency: 5

消费端监听使用 @RabbitListener 注解,它是一个复合注解,注解使用以下三个注解进行配置

@QueueBinding
@Queue
@Exchange

比如下面这一个例子

@RabbitListener(
	bindings = @QueueBinding(
        // 声明绑定的队列和队列信息
    	value = @Queue(value = "queue-1",durable = "true"),
        // 交换机信息
        exchange = @Exchange(
            value =  "exchange-1",
            durable = "true", 
            type = "topic",
            ignoreDeclarationExceptions = "true",
            key = "springboot.*"
        )
    )
)
@RabbitHandler
public void onMessage(Message message, Channel channel) throws Exception
// 通过 channel 可以获取通道的一些信息

由于这个配置是写在代码里面的,属于硬编码,强烈建议使用配置文件的方式;

对于使用 注解来创建 队列和交换机信息还是 手动先创建配置 好队列和交换机的信息,一般来说是选择先手动配置好,而不是使用程序的注解来创建

生产端项目

项目基础环境

添加新的项目,由于这个老师和之前的老师不是一个,习惯也不一致,这里会新开项目,不会继承之前的项目。

项目仓库地址,只是一个简单的 boot web 项目

添加 RabbitMQ 的 boot 依赖包

 <!-- rabbitmq -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

编写 application.yml 配置文件

server:
  port: 9010
  tomcat:
    uri-encoding: utf-8
  max-http-header-size: 80KB

spring:
  http:
    encoding:
      charset: UTF-8
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    # 仅包含非空的属性值
    default-property-inclusion: non_null

  rabbitmq:
    # 集群的话,用英文逗号分隔
    addresses: 127.0.0.1:5672
    username: guest
    password: guest
    # 默认是 / ,可以按不同的项目来分
    virtual-host: /
    # 链接超时
    connection-timeout: 15s
    # 消息确认
    # publisher-confirms 已过期,源码中设置为 true 会设置为 correlated 类型
    # publisher-confirms: true
    publisher-confirm-type: correlated
    # 如果使用返回消息的话,需要配合 template.mandatory 一起使用
    # publisher-returns: true
    # template:
      # mandatory: true

生产端 RabbitMQ 代码

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.UUID;

/**
 * 消息发送
 */
@Component
public class RabbitSender {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * <pre>
     * 消息确认,程序将消息投递到 MQ 后,MQ 响应一个成功事件;
     * 用于确认 MQ 是否已经收到消息
     * </pre>
     */
    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        /**
         *
         * @param correlationData 作为一个唯一的消息标识
         * @param ack broker 是否落盘成功,true 成功
         * @param cause 对应的失败异常信息
         */
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.err.println("confirm:correlationData:" + correlationData + " ; ack=" + ack + " ; cause=" + cause);
        }
    };

    /**
     * 对外发送消息的方法
     *
     * @param message    具体的消息内容
     * @param properties 额外的附加属性
     */
    public void send(Object message, Map<String, Object> properties) {
        // 创建消息头
        MessageHeaders mhs = new MessageHeaders(properties);
        // 创建消息
        Message<Object> msg = MessageBuilder.createMessage(message, mhs);

        // 设置  生产者回调,注意这里:不是每次都需要设置
        // 里面源码逻辑是:回调对象为 null 或则 此次设置的回调对象必须是上一次的回调对象 才会成功,否则会抛出异常
        rabbitTemplate.setConfirmCallback(confirmCallback);

        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException {
                // 暂时没有搞懂这个 processor 的作用
                System.err.println("---> post todo:message:" + message);
                return message;
            }
        };
        rabbitTemplate.convertAndSend(
                // 发往的 exchange
                "exchange-1",
                // routingKey
                "springboot.rabbitmq",
                // 消息
                msg,
                messagePostProcessor,
                // 消息唯一 ID: 指定业务的唯一 ID
                correlationData);
    }
}

spring.rabbitmq 中的配置很重要,RabbitTemplate 里面设置的是 org.springframework.amqp.rabbit.connection.ConnectionFactory ,而 publisher-confirm-typepublisher-returns 配置是 连接工厂里面的配置,如果连接工厂都不开启 confirm 的话,就算在程序里面设置了 setConfirmCallback 这个确认回调也不会生效,最后也不会回调的

消费端项目

项目基础环境

项目仓库地址,直接复制前面生产端的简单修改。

编写 application.yml 配置文件

server:
  port: 9011
  tomcat:
    uri-encoding: utf-8
  max-http-header-size: 80KB

spring:
  http:
    encoding:
      charset: UTF-8
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    # 仅包含非空的属性值
    default-property-inclusion: non_null

  rabbitmq:
    # 集群的话,用英文逗号分隔
    addresses: 127.0.0.1:5672
    username: guest
    password: guest
    # 默认是 / ,可以按不同的项目来分
    virtual-host: /
    # 链接超时
    connection-timeout: 15s

    listener:
      # 这里配置的是一个工厂属性,boot 封装的
      # 在 @RabbitListener 注解中,可以指定你自己的 containerFactory 工厂
      # 然后有一些属性可以覆盖默认工厂的属性,比如 concurrency
      # 有了这个认知之后,你就可以针对每个消费者队列配置不一样的消费者线程
      simple:
        # 消费者成功消费消息后,需要手工做 ack 操作。默认为 auto
        acknowledge-mode: manual
        # 消费者并发线程数量,
        concurrency: 5
        # 最大消费并发线程数量
        max-concurrency: 10
        # 消息预取数量
        prefetch: 1

消费端 RabbitMQ 代码

import com.rabbitmq.client.Channel;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;
import java.util.UUID;

/**
 * 消息消费者
 */
@Component
public class RabbitReceive {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitHandler
    @RabbitListener(
            // 队列与 exchange 的绑定关系,与一些持久化配置等
            bindings = @QueueBinding(
                    value = @Queue(value = "queue-1", durable = "true"),
                    exchange = @Exchange(
                            name = "exchange-1",
                            durable = "true",
                            type = ExchangeTypes.TOPIC,
                            // 忽略声明异常
                            ignoreDeclarationExceptions = "true"),
                    // routingKey: 这里由于是 topic 模式,所以支持通配符
                    key = "springboot.*"
            )
    )
    // 如果说 上面的这些名称,写到配置文件中的话,这里如何赋值?直接使用 boot 提供的 ${xxx} 就可以了
    // @RabbitListener(bindings = @QueueBinding(@Queue(value = "${my.rabbit.queue-1}", durable = "true")))
    public void onMessage(Message message, Channel channel) throws IOException {
        // 1. 收到消息以后进行业务端消费处理
        System.err.println("--------------------------");
        System.err.println("消费消息:" + message.getPayload());

        // 2. 处理手动 ack 操作
        // 需要手工 ack, 单条消息确认
        // 消息到达 MQ 之后,会给每一条消息一个唯一的 deliveryTag
        long deliveryTag = (long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
        channel.basicAck(deliveryTag, false);

    }
}

消费端启动测试

在启动之前,关注下 [http://localhost:15672/](http://localhost:15672/#/) 中的信息

由于笔者这里使用的单节点 MQ,这里的 Nodes 就只有一个节点。

exchanges 中也都是 mq 自带的 交换器。我们要看的是待会启动消费者项目后,我们在代码中生命的交换机、队列这些是否会自动创建。

启动消费者项目,再查看

可以看到 exchange 和 queue 还有 routing key 都正常的绑定上了。

生产端/消费端联调测试

编写生产端的消息发送测试类

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.util.HashMap;
import java.util.concurrent.TimeUnit;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitSenderTest {
    @Autowired
    private RabbitSender rabbitSender;

    @Test
    public void testSender() {
        HashMap<String, Object> properties = new HashMap<>();
        properties.put("attr1", "12345");
        properties.put("attr2", "abcd1");
        rabbitSender.send("hello rabbitmq!", properties);

        // 休眠 15 秒,因为有一些事件回调,等待他们的回调
        try {
            TimeUnit.SECONDS.sleep(15);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

然后运行这个测试,观看控制台信息

---> post todo:message:(Body:'[B@13ed066e(byte[552])' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=552, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])
confirm:correlationData:CorrelationData [id=e8c8f8d9-b38a-4451-930b-cf0c8bf9ea50] ; ack=true ; cause=null

消费者控制台

--------------------------
消费消息:hello rabbitmq!

关于那个 MessagePostProcessor 的作用,目前还不知道是什么,经过 debug 调试,只得到以下信息

MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException {
                // 暂时没有搞懂这个 processor 的作用
                // 经过 convertAndSend 的源码调试
                // 在发送前会调用这个方法,但是这里 message 中的 headers 是空的,就算我们发送消息的时候传递了 headers,也不会在该对象中
                // 大概看了下源码:这里返回的 headers 是 new 了一个空的,原来我们自己发送的消息和 headers 被序列化成了消息传递出去
                System.err.println("---> post todo:message:" + message);
                return message;
            }
        };
全部评论
来给鹏哥点个赞
点赞 回复 分享
发布于 2022-02-15 05:00

相关推荐

牛客583549203号:腾讯还好,况且实习而已,实习生流动性很大,属于正常现象,记得和HR委婉解释
点赞 评论 收藏
分享
03-26 15:18
已编辑
华北水利水电大学 Java
点赞 评论 收藏
分享
点赞 评论 收藏
分享
评论
1
1
分享

创作者周榜

更多
牛客网
牛客企业服务