SpringCloud3(...+Seata+Redis高级+多级缓存+rabbitmq高级)
一、Seata分布式事务
1.什么是分布式事务?
在传统数据库事务中,事务需要满足ACID(原子性、一致性、隔离性、持久性)。传统数据库事务是单个服务或单数据源下产生的事务,而分布式事务是指跨服务或跨数据源的事务,比如常见的下单付款案例,包括下面几个行为:
- 创建新订单
- 扣减商品库存
- 从用户账户余额扣除金额
订单的创建、库存的扣减、账户扣款在各自服务和数据库内是一个本地事务,可以保证 ACID 原则。 但是当我们把三件事情看做一个"业务",要满足保证“业务”的原子性(同成功同失败),这就是分布式系统下的事务。此时的 ACID 就难以满足了,这就是分布式事务要解决的问题。
2.CAP定理
分布式系统有三个指标:Consistency(一致性)、Availability(可用性)、Partition tolerance (分区容错性),这三个指标不可能同时达到,这个结论就叫做 CAP 定理。
(1)Consistency一致性
用户访问分布式系统中的任意节点,得到的数据必须一致。
- 比如现在包含两个节点,其中的初始数据是一致的
- 当我们修改其中一个节点的数据时,两者的数据产生了差异
-
要想保住一致性,就必须实现 node01 到 node02 的数据同步
(2)Availability可用性
用户访问集群中的任意健康节点,必须能得到响应,而不是超时或拒绝。
- 如图,有三个节点的集群,访问任何一个都可以及时得到响应
-
当有部分节点因为网络故障或其它原因无法访问时,代表节点不可用
(3)Partition Tolerance分区容错
Partition(分区):因为网络故障或其它原因导致分布式系统中的部分节点与其它节点失去连接(没有宕机),形成独立分区。
Tolerance(容错):在集群出现分区时,整个系统也要持续对外提供服务。
- 在分布式系统中,系统间的网络不能 100% 保证健康,一定会有故障的时候,而服务有必须对外保证服务。因此 Partition Tolerance 不可避免。当节点接收到新的数据变更时,就会出现问题了。
- 如果此时要保证一致性,就必须等待网络恢复,完成数据同步后,整个集群才对外提供服务,但服务处于阻塞状态,不可用。
- 如果此时要保证可用性,就不能等待网络恢复,那 node01、node02 与 node03 之间就会出现数据不一致。
- 综上:在 P 一定会出现的情况下,A 和 C 之间只能实现一个。
3.BASE理论
BASE 理论是对 CAP 的一种解决思路,包含三个思想:
- Basically Available(基本可用):分布式系统在出现故障时,允许损失部分可用性,来保证核心可用。
- Soft State(软状态):在一定时间内,允许出现中间状态,比如临时的不一致状态。
- Eventually Consistent(最终一致性):虽然无法保证强一致性,但是在软状态结束后,最终达到数据一致。
- AP 模式:各子事务分别执行和提交,允许临时出现结果不一致,然后采用弥补措施恢复数据即可,实现数据最终一致。
- CP 模式:各个子事务执行后互相等待,同时提交,同时回滚,达成强一致。但事务等待过程中,处于弱可用状态。
解决分布式事务就要求各个子系统之间必须能感知彼此的事务状态,这样才能保证状态一致。因此需要一个事务协调者(TC)来协调每一个事务的参与者。
【tips】这里的子系统事务,称为分支事务;有关联的各个分支事务在一起称为全局事务,即整个分布式事务。
4.Seata简介
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。
(1)seata架构
Seata 事务管理中有三个重要的角色
- TC (Transaction Coordinator) - 事务协调者:维护全局和分支事务的状态,协调全局事务提交或回滚。
- TM (Transaction Manager) - 事务管理器:定义全局事务的范围、开始全局事务、提交或回滚全局事务。
- RM (Resource Manager) - 资源管理器:管理分支事务处理的资源,与 TC 交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
Seata 基于上述架构提供了四种不同的分布式事务解决方案:
- XA 模式:强一致性分阶段事务模式,牺牲了一定的可用性,无业务侵入
- TCC 模式:最终一致的分阶段事务模式,有业务侵入
- AT 模式:最终一致的分阶段事务模式,无业务侵入,也是 Seata 的默认模式
- SAGA 模式:长事务模式,有业务侵入
- 【tips】以上无论哪种方案,都离不开 TC 的协调。
(2)部署TC服务
- 下载 seata-server 包:https://seata.io/zh-cn/blog/download.html
-
修改 conf 目录下的 registry.conf 文件
registry { # tc服务的注册中心类,这里选择nacos,也可以是eureka、zookeeper等 type = "nacos" nacos { # seata tc 服务注册到 nacos的服务名称,可以自定义 application = "seata-tc-server" serverAddr = "127.0.0.1:8848" group = "DEFAULT_GROUP" namespace = "" cluster = "SH" username = "nacos" password = "nacos" } } config { # 读取tc服务端的配置文件的方式,这里是从nacos配置中心读取,这样如果tc是集群,可以共享配置 type = "nacos" # 配置nacos地址等信息 nacos { serverAddr = "127.0.0.1:8848" namespace = "" group = "DEFAULT_GROUP" username = "nacos" password = "nacos" dataId = "seataServer.properties" } }
- 在nacos中添加seataServer.properties配置:为了让 TC 服务的集群可以共享配置,我们选择了 Nacos 作为统一配置中心。因此服务端配置文件 seataServer.properties 文件需要在Nacos 中配好。在 Nacos 后台新建一个配置文件:http://localhost:8848/nacos/
# 数据存储方式,db代表数据库 store.mode=db store.db.datasource=druid store.db.dbType=mysql # 这是MySQL8的驱动,MySQL5使用的是com.mysql.jdbc.Driver store.db.driverClassName=com.mysql.cj.jdbc.Driver store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useSSL=false&useUnicode=true&characterEncoding=utf-8&serverTimezone=GMT%2B8 store.db.user=root store.db.password=123456 store.db.minConn=5 store.db.maxConn=30 store.db.globalTable=global_table store.db.branchTable=branch_table store.db.queryLimit=100 store.db.lockTable=lock_table store.db.maxWait=5000 # 事务、日志等配置 server.recovery.committingRetryPeriod=1000 server.recovery.asynCommittingRetryPeriod=1000 server.recovery.rollbackingRetryPeriod=1000 server.recovery.timeoutRetryPeriod=1000 server.maxCommitRetryTimeout=-1 server.maxRollbackRetryTimeout=-1 server.rollbackRetryTimeoutUnlockEnable=false server.undo.logSaveDays=7 server.undo.logDeletePeriod=86400000 # 客户端与服务端传输方式 transport.serialization=seata transport.compressor=none # 关闭metrics功能,提高性能 metrics.enabled=false metrics.registryType=compact metrics.exporterList=prometheus metrics.exporterPrometheusPort=9898
-
TC 服务在管理分布式事务时,需要记录事务相关数据到数据库中,因此需要提前创建好这些表。新建一个名为 seata 的数据库,运行 SQL 。这些表主要记录全局事务、分支事务、全局锁信息。
SET NAMES utf8mb4; SET FOREIGN_KEY_CHECKS = 0; -- ---------------------------- -- Table structure for branch_table -- ---------------------------- DROP TABLE IF EXISTS `branch_table`; CREATE TABLE `branch_table` ( `branch_id` bigint(20) NOT NULL, `xid` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL, `transaction_id` bigint(20) NULL DEFAULT NULL, `resource_group_id` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `resource_id` varchar(256) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `branch_type` varchar(8) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `status` tinyint(4) NULL DEFAULT NULL, `client_id` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `application_data` varchar(2000) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `gmt_create` datetime(6) NULL DEFAULT NULL, `gmt_modified` datetime(6) NULL DEFAULT NULL, PRIMARY KEY (`branch_id`) USING BTREE, INDEX `idx_xid`(`xid`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact; -- ---------------------------- -- Records of branch_table -- ---------------------------- -- ---------------------------- -- Table structure for global_table -- ---------------------------- DROP TABLE IF EXISTS `global_table`; CREATE TABLE `global_table` ( `xid` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL, `transaction_id` bigint(20) NULL DEFAULT NULL, `status` tinyint(4) NOT NULL, `application_id` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `transaction_service_group` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `transaction_name` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `timeout` int(11) NULL DEFAULT NULL, `begin_time` bigint(20) NULL DEFAULT NULL, `application_data` varchar(2000) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `gmt_create` datetime NULL DEFAULT NULL, `gmt_modified` datetime NULL DEFAULT NULL, PRIMARY KEY (`xid`) USING BTREE, INDEX `idx_gmt_modified_status`(`gmt_modified`, `status`) USING BTREE, INDEX `idx_transaction_id`(`transaction_id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact; -- ---------------------------- -- Records of global_table -- ---------------------------- -- ---------------------------- -- Records of lock_table -- ---------------------------- SET FOREIGN_KEY_CHECKS = 1;
- 启动TC服务:进入 bin 目录,运行其中的 seata-server.bat 即可
-
启动成功后,在 Nacos 的服务列表页面,可以看到 seata-tc-server 的信息
(3)微服务整合Seata
-
在每个微服务中导入seata依赖
<dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> <exclusions> <!--版本较低,1.3.0,因此排除--> <exclusion> <artifactId>seata-spring-boot-starter</artifactId> <groupId>io.seata</groupId> </exclusion> </exclusions> </dependency> <!--seata starter 采用1.4.2版本--> <dependency> <groupId>io.seata</groupId> <artifactId>seata-spring-boot-starter</artifactId> <version>${seata.version}</version> </dependency>
-
修改配置文件:配置 TC 服务信息,通过Nacos注册中心,结合服务名称获取 TC 地址
seata: registry: # TC服务注册中心的配置,微服务根据这些信息去注册中心获取tc服务地址 type: nacos # 注册中心类型 nacos nacos: server-addr: 127.0.0.1:8848 # nacos地址 namespace: "" # namespace,默认为空 group: DEFAULT_GROUP # 分组,默认是DEFAULT_GROUP application: seata-tc-server # seata服务名称 username: nacos password: nacos tx-service-group: seata-demo # 事务组名称 service: vgroup-mapping: # 事务组与cluster的映射关系 seata-demo: SH
-
【tips】微服务如何根据上面这些配置寻找 TC 呢?
-
注册到 Nacos 中的微服务,确定一个具体实例需要四个信息:namespace+group+application+cluster
- namespace:命名空间,为空就是默认的public
- group:分组
- application:服务名
- cluster:集群名
- 微服务通过配置文件中的这四个信息就能找到我们部署的TC服务:
-
注册到 Nacos 中的微服务,确定一个具体实例需要四个信息:namespace+group+application+cluster
- 重启微服务,可以在seata的shell控制台看到三个微服务的RM和TM都已成功注册:
(4)XA模式解决分布式事务
1)XA模式原理
XA 规范是 X/Open 组织定义的分布式事务处理(DTP,Distributed Transaction Processing)标准,描述了全局的 TM 与局部的 RM 之间的接口。几乎所有主流数据库都对 XA 规范提供了支持,这种实现的原理都是基于两阶段提交。
- 正常情况:
- 异常情况:
-
第1阶段:
- 1.1 事务协调者通知每个事务参与者执行本地事务
-
1.2 本地事务执行完成后报告事务执行状态给事务协调者,此时事务不提交,继续持有数据库锁
-
第2阶段:事务协调者基于第1阶段的报告状态来判断下一步操作:
- 如果第1阶段执行本地事务都成功,则通知所有事务参与者提交事务
- 如果第1阶段任意参与者失败,则通知所有事务参与者回滚事务
2)Seata对XA模式的改进
Seata 对上述原始 XA 模式做了简单的封装和改造,以适应seata的事务模型(TC、TM、RM),基本架构如下图:
-
第1阶段:
-
RM的工作:
- 1.3:注册分支事务到TC
- 1.4:执行分支业务 sql 但不提交
- 1.5:报告sql执行状态到 TC
-
RM的工作:
-
第2阶段:
-
TC的工作:
- 2.2:TC 检查各分支事务执行状态
- 2.3:根据执行状态做出决定:如果都成功,通知所有 RM 提交事务;如果有失败,通知所有 RM 回滚事务
-
RM的工作:
- 接收 TC 指令,提交或回滚事务
-
TC的工作:
3)XA模式的优缺点
优点:
- 事务的强一致性,满足 ACID 原则。
- 常用数据库都支持XA模式,实现简单,并且没有代码侵入。
- 因为一阶段需要锁定数据库资源,等待二阶段结束才释放,资源锁定周期过长,性能较差,牺牲了一定的可用性。
- 依赖关系型数据库实现事务。
由于 Seata 的 starter 已经完成了 XA 模式的自动装配,因此XA模式实现起来非常简单:
-
修改每一个参与事务的微服务的 application.yml 文件,开启XA模式:
seata: data-source-proxy-mode: XA
- 给发起全局事务的入口方法(本例中是 OrderServiceImpl 中的 create 方法)添加 @GlobalTransactional 注解:
- 重启 order-service,再次测试:发现无论出现什么异常情况,三个微服务都能成功回滚。
(5)AT模式
1)原理框架图
AT 模式和XA 模式一样,都是分阶段提交的事务模型,但是AT模式弥补了XA模型中资源锁定周期过长的缺陷。
-
第1阶段:
-
RM 的工作:
- 注册分支事务
- 记录 undo-log(数据快照)
- 执行业务 SQL 并提交事务
- 报告事务状态
-
RM 的工作:
-
第2阶段:
- TC的工作同XA模式
-
RM的工作:根据TC的通知来决定提交/回滚:
- 提交时 RM 的工作:删除 undo-log 即可
- 回滚时 RM 的工作:根据 undo-log 恢复数据到更新前,并删除undo-log
2)AT与XA的区别
- XA 模式第一阶段不提交事务,锁定资源;AT 模式第一阶段直接提交,不锁定资源。
- XA 模式依赖数据库机制实现回滚;AT 模式利用数据快照实现数据回滚。
- XA 模式强一致;AT 模式最终一致。
3)多线程环境下AT模式的脏写问题
在多线程并发访问 AT 模式的分布式事务时,有可能出现脏写问题。
- 如图,当事务 1 因为某些原因需要回滚并恢复快照时,另一个线程的事务 2 白白更新了一次数据,出现了脏写问题。
- 解决方案——全局锁:在提交事务之前,先去获取全局锁,避免同一时刻有另外一个事务在操作当前数据,拿不到全局锁的事务超过一定时间则回滚。如图,这样一来事务 2 就更新失败了,此时事务 1 恢复数据不会被另一个线程的事务 2 造成脏读的问题。
4)AT模式的优缺点
-
优点:
- 一阶段完成直接提交事务,释放数据库资源,性能比较好
- 利用全局锁实现读写隔离
- 没有代码侵入,框架自动完成回滚和提交
-
缺点:
- 两阶段之间属于软状态,属于最终一致
- 框架的快照功能影响性能,但比XA模式要好很多
5)AT模式的实现
AT 模式中的快照生成、回滚等动作都是由框架自动完成,没有任何代码侵入,因此实现非常简单。 但AT模式需要一张表lock_table来记录全局锁、另一张表undo_log来记录数据快照 undo_log。
【tips】lock_table表要放在与TC有关的数据库中,undo_log表要放在与微服务有关的数据库中。
-
创建两张表
-- ---------------------------- -- Table structure for lock_table -- ---------------------------- DROP TABLE IF EXISTS `lock_table`; CREATE TABLE `lock_table` ( `row_key` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL, `xid` varchar(96) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `transaction_id` bigint(20) NULL DEFAULT NULL, `branch_id` bigint(20) NOT NULL, `resource_id` varchar(256) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `table_name` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `pk` varchar(36) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL, `gmt_create` datetime NULL DEFAULT NULL, `gmt_modified` datetime NULL DEFAULT NULL, PRIMARY KEY (`row_key`) USING BTREE, INDEX `idx_branch_id`(`branch_id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Compact;
-- ---------------------------- -- Table structure for undo_log -- ---------------------------- DROP TABLE IF EXISTS `undo_log`; CREATE TABLE `undo_log` ( `branch_id` bigint(20) NOT NULL COMMENT 'branch transaction id', `xid` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'global transaction id', `context` varchar(128) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT 'undo_log context,such as serialization', `rollback_info` longblob NOT NULL COMMENT 'rollback info', `log_status` int(11) NOT NULL COMMENT '0:normal status,1:defense status', `log_created` datetime(6) NOT NULL COMMENT 'create datetime', `log_modified` datetime(6) NOT NULL COMMENT 'modify datetime', UNIQUE INDEX `ux_undo_log`(`xid`, `branch_id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci COMMENT = 'AT transaction mode undo table' ROW_FORMAT = Compact;
-
修改每一个参与事务的微服务的 application.yml 文件,开启AT模式(默认):
seata: data-source-proxy-mode: AT
- 重启,测试。
(6)TCC模式
1)原理及架构图
TCC 模式与 AT 模式非常相似,每阶段都是独立事务,不同的是 TCC 模式通过人工编码来实现数据恢复。需要实现三个方法:
- Try:资源的检测和预留(冻结);
- Confirm:业务执行和提交;要求 Try 成功 Confirm 一定要能成功。
- Cancel:预留资源的释放(回滚),可以理解为 try 的反向操作。
- 阶段一(Try):检查余额是否充足,如果充足则冻结金额(资源预留)增加 30 元,可用余额扣除 30,此时用户余额 = 冻结金额 + 可用金额,数量依然是 100 不变。事务直接提交无需等待其它事务:
- 阶段二(Confirm):如果要提交(Confirm),则扣减 30 确认,可以提交。由于Try阶段已冻结金额30,所以直接将冻结金额减掉即可
- 阶段二 (Canncel):如果要回滚(Cancel),则无法扣减 30,那么就要释放冻结金额,恢复可用金额
【架构图】
2)优缺点
-
优点:
- 一阶段完成直接提交事务,释放数据库资源,性能好
- 相比 AT 模型,无需生成快照,无需使用全局锁,性能最强
- 不依赖数据库事务,而是依赖补偿操作,可以用于非事务型数据库
-
缺点:
- 有代码侵入,需要人为编写 try、Confirm 和 Cancel 接口,太麻烦
- 软状态,事务是最终一致
- 需要考虑 Confirm 和 Cancel 的失败情况,做好幂等处理
3)TCC中的空回滚和业务悬挂问题
- 空回滚:当某分支事务的 try 阶段阻塞时,可能导致全局事务超时而触发二阶段的 cancel 操作。即在未执行 try 操作时先执行了 cancel 操作,这时cancel 不能做回滚,就是空回滚。因此在执行 cancel 操作时,应当判断 try 是否已经执行,如果尚未执行,则应该空回滚。
- 业务悬挂:空回滚后出现一个新问题:对于已经空回滚的业务,之前被阻塞的 try 操作恢复,继续执行 try,但此时整个业务都已经结束了(cancel操作都结束了),现在我们不能让其再去走 confirm 或 cancel ,因此该事务一直处于中间状态,这就是业务悬挂。我们应当去避免出现业务悬挂,因此执行 try 操作时,应当判断 cancel 是否已经执行过了,如果已经执行,应当阻止空回滚后的 try 操作,避免悬挂。
4)实现扣减余额的案例
-
设计数据库表:实现 TCC 模式需要去记录冻结状态,因此需要一个数据表。
- xid:全局事务 id
- freeze_money:记录用户冻结金额
- state:记录事务状态
CREATE TABLE `account_freeze_tbl` ( `xid` varchar(128) NOT NULL, `user_id` varchar(255) DEFAULT NULL COMMENT '用户id', `freeze_money` int(11) unsigned DEFAULT '0' COMMENT '冻结金额', `state` int(1) DEFAULT NULL COMMENT '事务状态,0:try,1:confirm,2:cancel', PRIMARY KEY (`xid`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 ROW_FORMAT=COMPACT;
-
分析T、C、C
-
Try 业务:
- 记录冻结金额和事务状态到 account_freeze 表
- 扣减 account 表可用金额
-
Confirm 业务:
- 根据 xid 删除 account_freeze 表的冻结记录
-
Cancel 业务:
- 修改 account_freeze 表,冻结金额为 0,state 为 2
- 修改 account 表,恢复可用金额
- 如何判断是否空回滚——cancel 业务中,根据 xid 查询 account_freeze,如果为 null 则说明 try 还没做,需要空回滚
- 如何避免业务悬挂 try 业务中——根据 xid 查询 account_freeze,如果已经存在则证明 Cancel 已经执行,拒绝执行 try 业务
-
Try 业务:
- 声明TCC接口:TCC 的 Try、Confirm、Cancel 方法都需要在接口中基于注解来声明,首先是接口上要用 @LocalTCC,try 逻辑方法用注解@TwoPhaseBusinessAction(name="try 方法名", commitMethod="confirm 方法名", rollbackMethod="cancel 方法名") 注明 ,在该方法参数上加入 @BusinessActionContextParameter(paramName="try 方法的参数"),可以使得该参数传入 BusinessActionContext 类,便于 confirm 和 cancel 读取。
【tips】Seata 全局事务的 id 可以通过 RootContext.getXID() 获取, 也可以通过 BusinessActionContext 参数的 getXid() 方法获取。
- 编写实现类,实现TCC的三个业务方法
(7)SAGA模式
1)简介
SAGA 模式是 Seata提供的长事务解决方案。
Seata 官网对于 Saga 的指南:https://seata.io/zh-cn/docs/user/saga.html
分布式事务执行过程中,依次执行各参与者的正向操作,如果所有正向操作均执行成功,那么分布式事务提交。如果任意一个正向操作执行失败,那么分布式事务会去退回去执行前面各参与者的逆向回滚操作,回滚已提交的参与者,使分布式事务回到初始状态。
SAGA 模式也分为两个阶段:
- 一阶段:直接提交本地事务
- 二阶段:成功则什么都不做;失败则通过编写补偿业务来回滚
-
优点:
- 事务参与者可以基于事件驱动实现异步调用,吞吐高
- 一阶段直接提交事务,无锁,性能好
- 不用编写 TCC 中的三个阶段,实现简单
-
缺点:
- 软状态持续时间不确定,时效性差 没有锁,没有事务隔离,会有脏写
二、Redis分布式缓存
1.单机Redis存在的问题
2.Redis数据持久化
(1)RDB持久化
RDB 全称 Redis Database Backup file(Redis数据备份文件),也被叫做 Redis 数据快照。简单来说就是把内存中的所有数据都记录到磁盘中。当 Redis 实例故障重启后,从磁盘读取快照文件,恢复数据。快照文件称为 RDB 文件,默认是保存在当前运行目录。
1)RDB 持久化的四种情况
-
执行 save 命令
-
客户端连接redis服务后执行save命令,会立即执行一次RDB,进行数据备份。但 save 命令会导致主进程执行 RDB,这个过程中其它所有命令都会被阻塞。只有在数据迁移时可能用到。
-
客户端连接redis服务后执行save命令,会立即执行一次RDB,进行数据备份。但 save 命令会导致主进程执行 RDB,这个过程中其它所有命令都会被阻塞。只有在数据迁移时可能用到。
-
执行 bgsave 命令
- 执行bgsave命令后会开启独立进程异步完成 RDB,主进程可以持续处理用户请求,不受影响。
-
Redis 正常停机时
- Redis 停机时会在退出前自动执行一次 save 命令,实现 RDB 持久化。但宕机时不会自动执行save命令实现持久化。
-
触发 RDB 条件时
-
Redis 内部有触发 RDB 的机制,可以在 redis.conf 文件中配置:
# 900秒内,如果至少有1个key被修改,则执行 bgsave # save "" 则表示禁用RDB save 900 1 save 300 10 save 60 10000
-
RDB 的其它配置也可以在 redis.conf 文件中配置:
# 是否压缩 ,建议不开启,压缩也会消耗cpu,磁盘的话不值钱 rdbcompression yes # RDB文件名称 dbfilename dump.rdb # 文件保存的路径目录 dir ./
-
Redis 内部有触发 RDB 的机制,可以在 redis.conf 文件中配置:
2)bgsave的执行流程
bgsave 开始时会 fork(复刻) 主进程得到子进程,子进程共享主进程的内存数据(页表映射)。完成 fork 后读取内存数据并写入 RDB 文件。
fork 采用的是 copy-on-write 技术:
- 当主进程执行读操作时,访问共享内存
- 当主进程执行写操作时,则会拷贝一份数据,再执行写操作
3)RDB的缺点
- 如果 RDB 执行间隔时间长,两次 RDB 之间写入数据有丢失的风险
- fork 子进程、压缩RDB文件、写出 RDB 文件都比较耗时
(2)AOF持久化
AOF 全称为 Append Only File(追加文件),Redis 处理的每一个写命令都会记录在 AOF 文件中,因此AOF文件也可以看做是命令的日志文件。
1)AOF的使用
-
AOF 默认是关闭的,需要修改 redis.conf 配置文件来开启 AOF:
# 是否开启AOF功能,默认是no appendonly yes # AOF文件的名称 appendfilename "appendonly.aof"
-
AOF 下命令记录的频率也可以通过 redis.conf 文件来配置:
# 表示每执行一次写命令,立即记录到AOF文件 appendfsync always # 写命令执行完先放入AOF缓冲区,然后表示每隔1秒将缓冲区数据写到AOF文件,是默认方案 appendfsync everysec # 写命令执行完先放入AOF缓冲区,由操作系统决定何时将缓冲区内容写回磁盘 appendfsync no
三种命令记录策略的比较:
2)AOF文件重写
- 由于AOF文件是记录命令,所以会比 RDB 文件大的多。而且 AOF 会记录对同一个 key 的多次写操作,但其实只有最后一次写操作才有意义。可以通过执行 bgrewriteaof 命令,让 AOF 文件执行重写功能,用最少的命令达到相同效果。
如图,AOF文件中原本有三个命令,但是有两个都是对 num 的操作,第二次会覆盖第一次的值,因此第一个命令记录下来没有意义。 所以重写命令后,AOF文件内容就是:mset name jack num 666
-
Redis 会在触发阈值时自动重写 AOF 文件。阈值也可以在 redis.conf 中配置
# AOF文件比上次文件 增长超过多少百分比则触发重写 auto-aof-rewrite-percentage 100 # AOF文件体积最小多大以上才触发重写 auto-aof-rewrite-min-size 64mb
(3)RDB和AOF的对比
RDB 和 AOF 各有自己的优缺点,如果对数据安全性要求较高,在实际开发中往往会结合两者来使用。Redis 支持同时开启 RDB 和 AOF,在这种情况下当 Redis 重启的时候会优先载入 AOF 文件来恢复原始的数据,因为在通常情况下 AOF 文件保存的数据集要比 RDB 文件保存的数据集完整。
2.Redis主从复制
单节点 Redis 的并发能力是有上限的,要进一步提高 Redis 的并发能力,就需要搭建主从(master-slave)集群,实现读写分离。
(1)搭建Redis主从架构
这里演示上图一主两从的三节点Redis集群。
- 在同一台虚拟机中开启 3 个 Redis 实例,模拟主从集群,信息如下:
IP | PORT | 角色 |
192.168.152.100 | 7001 | master |
7002 | slave | |
7003 | slave |
- 要在同一台虚拟机开启3个Redis实例,需要准备3个不同的配置文件和目录
- 将配置文件拷贝到以上三个目录
-
修改每个实例的端口、工作目录(配置文件所在的目录)、声明ip
port 700x dir /usr/local/redis-master-slave/700x/ replica-announce-ip 192.168.152.100
-
分别启动三个实例,并开启主从关系
# 连接 7002 redis-cli -p 7002 # 执行slaveof slaveof 192.168.152.100 7001
# 连接 7003 redis-cli -p 7003 # 执行slaveof slaveof 192.168.152.100 7001
-
连接 7001 节点,查看集群状态:
# 连接 7001 redis-cli -p 7001 # 查看状态 info replication
(2)主从数据同步原理
1)第一次同步:全量同步,将 master 节点的所有数据都拷贝给 slave 节点,流程如下
【tips】①master如何判断slave是不是第一次来同步数据?
首先要了解两个概念:
Replication Id:简称 replid,是数据集的标记,id 一致则说明是同一数据集。每一个 master 都有唯一的replid,slave 则会继承 master 节点的 replid。
offset:偏移量,随着记录在 repl_baklog 中的数据增多而逐渐增大。slave 完成同步时也会记录当前同步的offset,即 slave 的 offset 永远小于等于 master 的 offset;当 slave 的 offset 小于 master 的 offset,说明 slave 数据落后于 master,需要更新。
因此 slave 做数据同步,必须向 master 声明自己的 replid 和 offset,master 才可以判断到底需要同步哪些数据。而 slave 原本也是一个 master,有自己的 replid 和 offset,当第一次与 master 建立连接变成 slave时,发送的 replid 和 offset 是自己的 replid 和 offset。master 判断 slave 发送来的 replid 与自己的不一致,说明这是一个全新的 slave,就知道要做全量同步了。master 会将自己的 replid 和 offset 都发送给这个 slave,slave 保存这些信息。以后 slave 的replid 就与 master 一致了。因此master判断一个节点是否是第一次同步的依据,就是看 replid 是否一致。
②全量同步完整流程描述:
- slave 节点请求增量同步
- master 节点判断 replid,发现不一致,拒绝增量同步,选择全量同步
- master 将完整内存数据生成 RDB,发送 RDB 到 slave
- slave 清空本地数据,加载 master 的 RDB
- master 将 RDB 期间的命令记录在 repl_baklog,并持续将 log 中的命令发送给 slave
- slave 执行接收到的命令,保持与 master 之间的同步
全量同步需要先做 RDB,然后将 RDB 文件通过网络传输给 slave,成本较高。因此除了主从第一次连接做全量同步,其它大多数时候 slave 与 master 都是做增量同步。
【tips】master 怎么知道 slave 与自己的数据差异在哪里?——repl_baklog 的原理
repl_backlog 文件是一个固定大小的环形数组,环形数组头部的数据可能会被覆盖。repl_baklog 中会记录 Redis 处理过的命令日志及 offset,包括 master 当前的 offset 和 slave 已经拷贝到的 offset,slave 与 master 的 offset 之间的差(如左图)就是 salve 需要增量拷贝的数据。如果 slave 出现网络阻塞,导致 master 的 offset 远远超过了 slave 的 offset(如右图),此时 master 继续写入新数据,其 offset 就会覆盖旧的数据,直到将 slave 现在的 offset 也覆盖了,此时如果 slave 恢复,需要同步,却发现自己的 offset 都没有了,无法完成增量同步了,只能做全量同步。
(3)主从同步的优化
可以从以下几个方面来优化 Redis 主从集群:
- 在 master 中配置 "repl-diskless-sync yes" 启用无磁盘复制,避免全量同步时的磁盘 IO
- Redis 单节点上的内存占用不要太大,减少 RDB 导致的过多磁盘IO
- 适当提高 repl_baklog 的大小,发现 slave 宕机时尽快实现故障恢复,尽可能避免全量同步
- 限制一个 master 上的 slave 节点数量,如果实在是太多 slave,则可以采用主-从-从链式结构,减少 master 压力
(4)全量同步和增量同步的区别
|
全量同步 | 增量同步 |
执行流程 |
master 将完整内存数据生成 RDB,发送 RDB 到 slave。
后续命令则记录在 repl_baklog,逐个发送给slave
|
slave 提交自己的 offset 到 master,master 获取
repl_baklog 中从该 offset 之后的命令给slave
|
执行时机 |
|
|
3.Redis哨兵(sentinel)机制
(1)哨兵的作用
Redis 提供了哨兵机制来实现主从集群的故障自动恢复。哨兵的结构如图:
- 监控:Sentinel 会不断检查 master 和 slave 是否按预期工作。
- 自动故障恢复:如果 master 故障,Sentinel 会将一个 slave 提升为 master。当故障实例恢复后也以新的 master 为主。
- 通知:Sentinel 充当 Redis 客户端的服务发现来源,当集群发生故障转移时,会将主从集群变更的最新信息推送给 Redis 客户端。
(2)如何判断实例是否健康?——监控的原理
Sentinel 基于心跳机制监测服务状态,每隔 1 秒向集群的每个实例发送 ping 命令:
- 主观下线:如果某 Sentinel 节点发现某实例未在规定时间响应,则认为该实例主观下线。(Sentinel主观地认为实例下线了,实际下没下线不一定。)
- 客观下线:若超过指定数量(quorum) 的 Sentinel 都认为该实例主观下线,则该实例客观下线。quorum 值最好超过 Sentinel 实例数量的一半。
(3)自动故障恢复原理
一旦发现 master 故障,sentinel 需要在 salve 中选择一个作为新的 master,选择规则如下:
- 首先会判断 slave 节点与 master 节点的断开时间长短,如果超过指定值(down-after-milliseconds * 10),则会排除该 slave 节点(断开时间太长了,不适合当master)
- 然后判断 slave 节点的 slave-priority 值(越小优先级越高,如果是 0 则永不参与选举);如果 slave-prority 一样,则判断 slave 节点的 offset 值(越大说明数据越新,优先级越高)
- 最后是判断 slave 节点的运行 id 大小(越小优先级越高)
- sentinel 给备选的 slave1 节点发送 slaveof no one 命令,让该节点成为 master
- sentinel 给所有其它 slave 发送 slaveof 192.168.150.101 7002 之类的命令,让这些 slave 成为新 master 的从节点,开始从新的 master 上同步数据
- 最后,sentinel 将故障节点(原master)标记为 slave,当故障节点恢复后会自动成为新的 master 的 slave 节点
(4)搭建哨兵集群
这里我们在同一台虚拟机搭建一个三节点形成的 Sentinel 集群,来监控之前搭建的 Redis 主从集群。三个 sentinel 实例信息如下:
sentinel节点 | IP | PORT |
s1 |
192.168.152.100 |
27001 |
s2 |
27002 | |
s3 | 27003 |
-
在同一台虚拟机开启 3 个实例,必须准备三份不同的配置文件和目录。创建三个文件夹,名字分别叫 s1、s2、s3
# 进入/tmp目录 cd /tmp # 创建目录 mkdir s1 s2 s3
-
在 s1 目录创建一个 sentinel.conf 文件,添加下面的内容
port 27001 sentinel announce-ip 192.168.152.100 sentinel monitor mymaster 192.168.152.100 7001 2 sentinel down-after-milliseconds mymaster 5000 sentinel failover-timeout mymaster 60000 dir "/tmp/s1"
sentinel monitor mymaster 192.168.152.100 7001 2:指定主节点信息
mymaster:主节点名称,自定义
192.168.152.100 7001:主节点的 ip 和 端口
2:选举 master 时的 quorum 值
-
将 s1/sentinel.conf 文件拷贝到 s2、s3 两个目录中(在 /tmp 目录执行下列命令),并修改 s2、s3 两个文件夹内的配置文件,修改端口和工作目录
# 方式一:逐个拷贝 cp s1/sentinel.conf s2 cp s1/sentinel.conf s3 # 方式二:管道组合命令,一键拷贝 echo s2 s3 | xargs -t -n 1 cp s1/sentinel.conf sed -i -e 's/27001/27002/g' -e 's/s1/s2/g' s2/sentinel.conf sed -i -e 's/27001/27003/g' -e 's/s1/s3/g' s3/sentinel.conf
-
分别启动 3 个 redis 实例
# 第1个 redis-sentinel s1/sentinel.conf # 第2个 redis-sentinel s2/sentinel.conf # 第3个 redis-sentinel s3/sentinel.conf
- 测试:尝试让 master 节点 7001 宕机,查看 sentinel 日志
- 查看 7003 的日志
- 查看 7002 的日志
(5)RedisTemplate的哨兵模式
在 Sentinel 集群监管下的 Redis 主从集群,其节点会因为自动故障转移而发生变化,Redis 客户端必须感知这种变化,及时更新连接信息。
Spring 中 RedisTemplate 底层利用 lettuce 实现了节点的感知和自动切换。
我们通过一个测试(资料中有)来实现 RedisTemplate 集成哨兵机制。
-
导入Redis的starter依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
-
在配置文件 application.yml 中配置 redis 的 sentinel 信息
spring: redis: sentinel: master: mymaster # 指定master名称 nodes: # 指定redis-sentinel集群信息 - 192.168.150.101:27001 - 192.168.150.101:27002 - 192.168.150.101:27003
【tips】这里不需要配置 redis 集群地址,因为在 sentinel 模式下主从地址是可能变更的,所以不能把他写死,也不需要写。所以上面直接光配置 sentinel 即可。 -
配置读写分离(★):在项目的启动类中,添加一个新的 bean
@Bean public LettuceClientConfigurationBuilderCustomizer clientConfigurationBuilderCustomizer(){ return clientConfigurationBuilder -> clientConfigurationBuilder.readFrom(ReadFrom.REPLICA_PREFERRED); }
【tips】其中 ReadFrom 是配置 Redis 读写策略的,是一个枚举,包括四种:- MASTER:从主节点读取
- MASTER_PREFERRED:优先从 master 节点读取,master 不可用才读取 slave(replica)
- REPLICA:从 slave(replica) 节点读取
- REPLICA _PREFERRED:优先从 slave(replica) 节点读取,所有的 slave(replica) 都不可用才读取 master
- 启动项目
4.Redis分片集群
主从和哨兵可以解决高可用、高并发读的问题,但是依然有两个问题没有解决:海量数据存储和高并发写的问题。
使用分片集群可以解决上述问题。
(1)分片集群特征和结构
- 集群中有多个master,每个master保存不同数据
- 每个master都可以有多个slave节点
- master之间通过ping监测彼此健康状态
- 客户端请求可以访问集群任意节点,最终都会被转发到正确节点
(2)搭建Redis分片集群
我们在一台虚拟机搭建一主一从的3个主从集群,共6个节点:
IP | PORT | 角色 |
192.168.152.100 | 7001 |
|
7002 |
master |
|
7003 | ||
8001 |
slave |
|
8002 | ||
8003 |
-
准备上面的6个节点实例
-
在/tmp下准备一个新的redis.conf文件,内容如下:
# 端口要改成对应实例的端口 port 6379 # 开启集群功能 cluster-enabled yes # 集群的配置文件名称,不需要我们创建,由redis自己维护 cluster-config-file /tmp/6379/nodes.conf # 这里的6379端口也要改成对应实例的端口 # 节点心跳失败的超时时间 cluster-node-timeout 5000 # 持久化文件存放目录 dir /tmp/6379 # 绑定地址 bind 0.0.0.0 # 让redis后台运行 daemonize yes # 注册的实例ip replica-announce-ip 192.168.150.101 # 保护模式 protected-mode no # 数据库数量 databases 1 # 日志 logfile /tmp/6379/run.log # 端口同理
-
将上述文件拷贝到每个目录下:
# 进入/tmp目录 cd /tmp # 执行拷贝 echo 7001 7002 7003 8001 8002 8003 | xargs -t -n 1 cp redis.conf
-
修改各实例目录下的redis.conf:将6379端口改为对应实例的端口:
# 进入/tmp目录 cd /tmp # 修改配置文件 printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t sed -i 's/6379/{}/g' {}/redis.conf
-
在/tmp下准备一个新的redis.conf文件,内容如下:
-
启动服务
# 进入/tmp目录 cd /tmp # 一键启动所有服务 printf '%s\n' 7001 7002 7003 8001 8002 8003 | xargs -I{} -t redis-server {}/redis.conf
-
创建主从集群:虽然服务启动了,但是目前每个服务之间都是独立的,没有任何关联。我们需要执行命令来创建集群。在Redis5.0之前创建集群比较麻烦,5.0之后集群管理命令都集成到了redis-cli中。
-
5.0之前:Redis5.0之前集群命令都是用redis安装包下的src/redis-trib.rb来实现的。因为redis-trib.rb是有ruby语言编写的所以需要安装ruby环境。
# 安装依赖 yum -y install zlib ruby rubygems gem install redis
通过命令来管理集群:# 进入redis的src目录 cd /tmp/redis-6.2.4/src # 创建集群 ./redis-trib.rb create --replicas 1 192.168.150.101:7001 192.168.150.101:7002 192.168.150.101:7003 192.168.150.101:8001 192.168.150.101:8002 192.168.150.101:8003
-
5.0以后:集群管理以及集成到了redis-cli中,直接执行以下命令创建集群:
redis-cli --cluster create --cluster-replicas 1 192.168.150.101:7001 192.168.150.101:7002 192.168.150.101:7003 192.168.150.101:8001 192.168.150.101:8002 192.168.150.101:8003
【tips】命令说明: - redis-cli --cluster 或者 ./redis-trib.rb:代表集群操作命令
- create:代表是创建集群
- --replicas 1 或者 --cluster-replicas 1 :指定集群中每个master的副本(slave)个数为1,此时 节点总数 ÷ (replicas + 1) = master的数量。因此节点列表中的前n个就是master;其它节点都是slave节点,随机分配到不同master
-
5.0之前:Redis5.0之前集群命令都是用redis安装包下的src/redis-trib.rb来实现的。因为redis-trib.rb是有ruby语言编写的所以需要安装ruby环境。
- 执行创建集群命令后会提示创建的主从节点信息,如果同意这种创建就输入yes:
-
通过以下命令查看集群状态:
redis-cli -p 7001 cluster nodes
redis-cli -c -p 7001
(3)散列插槽
Redis会把每一个master节点映射到0~16383共16384个插槽(hash slot)上(上面查看集群信息时也能看到)。
数据key不是与节点绑定,而是与插槽绑定。redis会根据key的有效部分计算插槽值,计算方式是利用CRC16算法得到一个hash值,然后对16384取余,得到的结果就是slot值。有效部分分两种情况:
- key中包含"{}",且“{}”中至少包含1个字符,“{}”中的部分是有效部分
- key中不包含“{}”,整个key都是有效部分
- 例如:key是num,那么就根据num计算,如果是key是{itcast}num,则根据itcast计算。
【tips】为什么数据key不是与节点绑定,而是与插槽绑定呢?——因为节点可能因为某种情况而宕机,丢失数据;而key与插槽绑定,即使节点宕机,也可以随插槽转移到其他正常节点。
如何将同一类数据固定保存在某一个Redis实例中?
——让这些数据key的有效部分相同,可以使用{}规定其有效部分,这样根据{}内有效部分计算出的插槽值就都一样了,因此也就实现了固定保存。
(4)集群伸缩
redis-cli --cluster提供了很多操作集群的命令,可以通过以下命令查看:
- 向集群中添加新节点(add-node)
【tips】--cluster-slave:添加的节点是slave节点(默认不加,代表添加的是master节点)
--cluster-master-id:添加的节点是slave节点,并指明该slave节点的master节点id
-
为指定节点分配(reshard)插槽
redis-cli --cluster reshard 指定节点的ip:port
(5)故障转移
Redis分片集群不需要哨兵,可以自动实现故障转移的主从切换。
1)当集群中有一个master(以7002为例)宕机会发生什么呢?
- 首先7002实例与其它实例失去连接
- 然后是7002疑似宕机(fail?):
- 最后是确定下线(fail),自动将一个slave(这里是8001)作为新的master:
2)数据迁移(failover)——手动故障转移
利用cluster failover命令可以手动让集群中的某个master宕机,切换到执行cluster failover命令的这个slave节点,实现无感知的数据迁移。其流程如下图所示:
手动的故障转移支持三种不同模式:
- 缺省值(推荐):默认的流程,如图1~6歩
- force:省略了对offset的一致性校验(如图3、4步),不管数据一不一致
- takeover:直接执行第5歩,忽略数据一致性、忽略master状态和其它master的意见
(6)RedisTemplate访问分片集群
RedisTemplate底层同样基于lettuce实现了分片集群的支持,而使用的步骤与哨兵模式基本一致:
- 引入redis的starter依赖
-
配置分片集群地址:与哨兵模式相比,其中只有分片集群的配置方式略有差异,如下:
spring: redis: cluster: nodes: # 指定分片集群的每一个节点信息 - 192.168.150.101:7001 - 192.168.150.101:7002 - 192.168.150.101:7003 - 192.168.150.101:8001 - 192.168.150.101:8002 - 192.168.150.101:8003
- 配置读写分离
三、多级缓存
缓存就是把访问量较高的热点数据从传统的关系型数据库中加载到内存中,当用户再次访问热点数据时直接从内存中加载,减少了对数据库的访问量。同时缓存存储在内存中,数据读取速度非常快,能大量减少对数据库的访问,减少数据库的压力。
我们把缓存分为两类:
-
分布式缓存,例如Redis:
- 优点:存储容量更大、可靠性更好、可以在集群间共享
- 缺点:访问缓存有网络开销
- 场景:缓存数据量较大、可靠性要求较高、需要在集群间共享
-
进程本地缓存,例如HashMap、GuavaCache:
- 优点:读取本地内存,没有网络开销,速度更快
- 缺点:存储容量有限、可靠性较低、无法共享
- 场景:性能要求较高,缓存数据量较小
1.Redis缓存存在的问题
传统的缓存策略一般是请求到达Tomcat后,先查询Redis,如果未命中则查询数据库。
存在下面的问题:
- 请求要经过Tomcat处理,Tomcat的性能成为整个系统的瓶颈
- 当Redis缓存失效时,会对数据库产生冲击
2.多级缓存方案
多级缓存就是充分利用请求处理的每个环节,分别添加缓存,减轻Tomcat压力,提升服务性能。用作缓存的Nginx是业务Nginx,需要部署为集群,再有专门的Nginx用来做反向代理:
3.JVM进程缓存
(1)准备商品案例
-
在Linux中安装MySQL:后面数据同步需要用到MySQL的主从功能,所以需要在虚拟机中利用Docker来运行一个MySQL容器。
-
准备两个目录,用于挂载容器的数据和配置文件目录:
# 进入/tmp目录 cd /tmp # 创建文件夹 mkdir mysql # 进入mysql目录 cd mysql
-
进入mysql目录后,执行下面的Docker命令:
docker run \ -p 3306:3306 \ --name mysql_multi \ -v $PWD/conf:/etc/mysql/conf.d \ -v $PWD/logs:/logs \ -v $PWD/data:/var/lib/mysql \ -e MYSQL_ROOT_PASSWORD=123 \ --privileged \ -d \ mysql:5.7.25
-
修改配置在/tmp/mysql/conf目录添加一个my.cnf文件,作为mysql_multi的配置文件
# 创建文件 touch /my.cnf # 配置文件内容 [mysqld] skip-name-resolve character_set_server=utf8 datadir=/var/lib/mysql server-id=1000
-
修改配置后重启mysql_multi容器使其生效
#重启mysql_multi容器 docker restart mysql_multi
- 测试连接,创建数据库表
-
准备两个目录,用于挂载容器的数据和配置文件目录:
- 导入Demo工程,项目结构如下:
- 导入商品查询页面:商品查询是购物页面,与商品管理的页面是分离的。 部署方式如图:
-
- 我们需要准备一个反向代理的nginx服务器,如上图红框所示,将静态商品页面放到nginx目录中。 页面需要的数据通过ajax向服务端(nginx业务集群)查询。
- 配置nginx反向代理
(2)初识Caffeine
1)Caffeine的使用
- 创建缓存对象
- 存储数据put
-
取数据,不存在时的两种处理方案:
- 不存在则返回null——getIfPresent
- 不存在则根据key去数据库查询——get
2)Caffeine缓存驱逐(淘汰)策略
Caffeine提供了三种缓存驱逐策略:
-
基于容量:设置缓存的数量上限
// 创建缓存对象 Cache<String, String> cache = Caffeine.newBuilder() .maximumSize(1) // 设置缓存大小上限为 1 .build();
-
基于时间:设置缓存的有效时间
// 创建缓存对象 Cache<String, String> cache = Caffeine.newBuilder() .expireAfterWrite(Duration.ofSeconds(10)) // 设置缓存有效期为 10 秒,从最后一次写入开始计时 .build();
- 基于引用:设置缓存为软引用或弱引用,利用GC来回收缓存数据。性能较差,不建议使用。
【tips】在默认情况下,当一个缓存元素过期时,Caffeine不会自动立即将其清理和驱逐,而是在一次读或写操作后,或者在空闲时间完成对失效数据的驱逐。
3)实现进程本地缓存
利用Caffeine实现商品查询的本地进程缓存,需求如下:
- 缓存初始大小为100
- 缓存上限为10000
-
给 根据id查询商品 的业务添加缓存,缓存未命中时查询数据库
- 定义商品缓存的Bean:
-
- 给根据id查询商品的方法findById添加缓存:
-
-
给 根据id查询商品库存 的业务添加缓存,缓存未命中时查询数据库
- 定义库存缓存的Bean:
-
- 给根据id查询库存的方法findStockById添加缓存:
-
4.Lua语法基础
进程本地缓存(黄框)是在Tomcat中使用Java进行编码,静态资源的本地缓存是在nginx业务集群(绿框)中使用Lua进行编码。
(1)初始Lua
Lua 是一种轻量小巧的脚本语言,用标准C语言编写并以源代码形式开放, 其设计目的是为了嵌入应用程序中,从而为应用程序提供灵活的扩展和定制功能。官网:https://www.lua.org/
【HelloWorld案例】
-
在Linux虚拟机的任意目录下,新建一个hello.lua文件
touch hello.lua
-
在文件中添加下面的内容
print("Hello World!")
-
运行命令lua xx.lua打算
lua hello.lua
(2)变量
- 数据类型
可以利用type函数获取给定变量或者值的数据类型:
-
变量声明——Lua声明变量时不需要指定变量的数据类型
-- 声明字符串 local str = 'hello' -- lua中用..来拼接字符串 local str1 = 'hello' .. 'world' -- 声明数字 local num = 21 -- 声明布尔类型 local flag = true -- 声明数组,key为索引的table local arr = {'java', 'python', 'lua'} -- 声明table,类似java的map local map = {name='Jack', age=21}
【tips】前面的local表示声明的是局部变量,不加local表示声明的是全局变量。 -
访问table中的元素
-
数组table
-- 访问数组 print(arr[1])
【tips】lua数组下标从1开始。 -
map table
-- 访问map table,两种访问方式 print(map['name']) print(map.name)
-
数组table
(3)循环
table可以用for in循环来遍历:
-
遍历数组table
-- 声明数组 key为索引的 table local arr = {'java', 'python', 'lua'} -- 循环遍历数组:for ... in ipairs(...) do ... end for index,value in ipairs(arr) do print(index, value) end
【tips】index和value是我们自定义的键值对变量;ipairs(arr)表示解析arr;do和end可以理解为java中的{},表示从do开始到end结束。 -
遍历map table
-- 声明map,也就是table local map = {name='Jack', age=21} -- 遍历table for key,value in pairs(map) do print(key, value) end
【tips】pairs(map)表示解析map table,这是与遍历数组table的一个区别。
(4)函数
-
定义函数
function 函数名( argument1, argument2..., argumentn) -- 函数体 return 返回值 end
【tips】lua中函数以end结束;不需要指定返回值类型;函数体不需要用{}包裹,。 -
举例:定义一个打印数组的函数:
function printArr(arr) for index, value in ipairs(arr) do print(value) end end
(5)条件控制
-
lua中的条件控制类似java,也有if和else:
if(布尔表达式) then --[ 布尔表达式为 true 时执行该语句块 --] else --[ 布尔表达式为 false 时执行该语句块 --] end
【tips】以then开始,以end结束。
- 与java不同,lua布尔表达式中的逻辑运算是基于英文单词:
5.实现多级缓存
(1)OpenResty的安装和使用
OpenResty是一个基于 Nginx的高性能 Web 平台,用于方便地搭建能够处理超高并发、扩展性极高的动态 Web 应用、Web 服务和***关。官方网站: https://openresty.org/cn/
具备下列特点
- 具备Nginx的完整功能
- 基于Lua语言进行扩展,集成了大量精良的 Lua 库、第三方模块
- 允许使用Lua自定义业务逻辑、自定义库
Linux安装OpenResty
-
安装OpenResty的依赖开发库,执行命令
yum install -y pcre-devel openssl-devel gcc --skip-broken
-
添加 OpenResty 仓库,便于未来安装或更新软件包(通过yum check-update命令)。运行下面的命令添加OpenResty仓库:
yum-config-manager --add-repo https://openresty.org/package/centos/openresty.repo
-
安装OpenResty:
yum install -y openresty
-
安装opm工具(opm是OpenResty的一个管理工具,可以帮助安装一个第三方的Lua模块):
yum install -y openresty-opm
- OpenResty安装完毕,查看目录结构:OpenResty默认安装目录是/usr/local/openresty
【tips】可以直接把OR当nginx来用。
-
配置nginx的环境变量:
# 打开配置文件 vim /etc/profile # 在最下面加入两行 export NGINX_HOME=/usr/local/openresty/nginx export PATH=${NGINX_HOME}/sbin:$PATH # 然后让配置生效: source /etc/profile
-
启动和运行OR——OR底层是基于Nginx的,从OR目录下的nginx目录可以看出,结构与windows中安装的nginx基本一致,所以OR的运行方式与nginx基本一致:
# 启动nginx nginx # 重新加载配置 nginx -s reload # 停止 nginx -s stop
- 浏览器访问nginx
(2)OR快速入门
1)实现商品详情页(item.html)数据查询
商品详情页面目前展示的是假数据,在浏览器的控制台可以看到查询商品信息的请求:
而这个请求最终被反向代理到虚拟机的OpenResty集群:
【需求】:在OpenResty中接收这个请求,并返回一段商品的假数据。
-
在nginx.conf的http下面,添加对OpenResty的Lua模块的加载:
# 加载lua 模块 lua_package_path "/usr/local/openresty/lualib/?.lua;;"; # 加载c模块 lua_package_cpath "/usr/local/openresty/lualib/?.so;;";
- 在OR下的nginx目录下创建 lua/item.lua文件,用来写业务逻辑
-
在item.lua中返回假数据
-- 返回假数据,这里的ngx.say()函数是写数据到Response中 ngx.say('{"id":10001,"name":"SALSA AIR}')
-
在nginx.conf的server下面,添加对/api/item这个路径的监听,将监听到的请求交给lua目录下的item.lua处理:
location /api/item { # 响应类型,这里返回json default_type application/json; # 响应数据由lua/item.lua这个文件来决定,我们在该文件中编写业务逻辑即可 content_by_lua_file lua/item.lua; }
-
重新加载nginx配置:
nginx -s reload
2)获取请求参数
OpenResty提供了各种API用来获取不同类型的请求参数:
3)从OR向Tomcat发送请求
nginx内部提供了api用来发送http请求:
local resp = ngx.location.capture("/path",{ -- 请求路径 method = ngx.HTTP_GET, -- 请求方式 args = {a=1,b=2}, -- get方式传参数 body = "c=3&d=4" -- post方式传参数 })
返回的响应内容resp包括:
- resp.status:响应状态码
- resp.header:响应头,是一个table
- resp.body:响应体,就是响应数据
location /path { # 这里是windows电脑的ip和Java服务的端口,需要确保windows***处于关闭状态 proxy_pass http://192.168.152.1:8081; }
【案例】获取请求路径中的商品id信息(上面的路径占位符格式),根据id向Tomcat查询商品信息。修改item.lua,满足下面的需求:
-
获取请求参数中的id
location ~ /api/item/(\d+){ default_type application/json; content_by_lua_file lua/item.lua; }
-
根据id向Tomcat服务发送请求,
-
反向代理请求路径到Tomcat
location /item { proxy_pass http://192.168.152.1:8081; }
-
封装http查询的函数,放到OR函数库中:在/usr/local/openresty/lualib下创建common.lua文件,在该文件中封装http查询的函数
-- 封装函数,发送http请求,并解析响应 local function read_http(path,params) local resp = ngx.location.capture(path,{ method = ngx.HTTP_GET, args = params, }) if not resp then -- 记录错误信息,返回404 ngx.log(ngx.ERR, "http not found, path: ", path , ", args: ", args) ngx.exit(404) end return resp.body end -- 将方法导出,得到的是table类型 local _M = { read_http = read_http } return _M
-
根据id向Tomcat服务发送请求,查询库存信息
--导入common函数库和cjson local common = require('common') local read_http = common.read_http local cjson = require('cjson') --获取路径参数 local id = ngx.var[1] --查询商品信息 local itemJson = read_http("/item/"..id,nil) --查询库存信息 local stockJson = read_http("/item/stock/"..id,nil)
-
反向代理请求路径到Tomcat
-
组装商品信息、库存信息,序列化为JSON格式并返回
--封装数据 --将Json转为Lua中的table local item = cjson.decode(itemJson) local stock = cjson.decode(stockJson) item.stock = stock.stock item.sold = stock.sold --table -> Json local resJson = cjson.encode(item) --返回结果 ngx.say(resJson)
【tips】OR提供了一个cjson模块用来处理JSON的序列化和反序列化:
-
序列化(table -> json)encode:
local obj = { name = 'jack', age = 21 } local json = cjson.encode(obj)
-
反序列化(json -> table)decode:
local json = '{"name": "jack", "age": 21}' -- 反序列化 local obj = cjson.decode(json); print(obj.name)
4)Tomcat集群的负载均衡
Tomcat一般部署成集群。由于进程缓存在Tomcat集群中是不被共享的,即一个请求的缓存数据只能在固定一台Tomcat中查到。我们知道负载均衡默认采取轮询的方式,此时如果我们要多次查询某个数据,需要轮询一遍所有的Tomcat才能找到,命中率太低,也不合理。因此需要修改Tomcat集群负载均衡的策略,采用基于请求路径的hash算法策略,使请求路径与服务器一一对应。
-
配置负载均衡
upstream tomcat-cluster{ hash $request_uri; # 基于request_uri的hash策略 server 192.168.152.1:8081; server 192.168.152.1:8082; }
-
将反向代理修改为Tomcat集群
location /item{ proxy_pass http://tomcat-cluster; }
5)添加Redis缓存
-
缓存预热(这里我们把商品和库存所有数据都提前放入Redis)
-
利用Docker安装Redis
docker run --name redis -p 6379:6379 -d redis redis-server --appendonly yes
-
在item-service服务中引入Redis依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
-
配置Redis地址
spring: redis: host: 192.168.152.100
-
编写初始化类,实现缓存预热
@Component public class RedisHandler implements InitializingBean { @Autowired private StringRedisTemplate redisTemplate; @Autowired private ItemService itemService; @Autowired private ItemStockService stockService; @Override public void afterPropertiesSet() throws Exception { //初始化缓存 //1.查询商品信息 List<Item> itemList = itemService.list(); //2.放入缓存 for (Item item : itemList) { //序列化为json String json = JSON.toJSONString(item); //存入Redis redisTemplate.opsForValue().set("item:id:"+ item.getId(),json); } //3.查询库存信息 List<ItemStock> stockList = stockService.list(); //4.放入缓存 for (ItemStock itemStock: stockList) { //序列化为json String json = JSON.toJSONString(itemStock); //存入Redis redisTemplate.opsForValue().set("itemStock:id:"+ itemStock.getId(),json); } } }
-
利用Docker安装Redis
-
让OR优先查询Redis缓存:OR提供了操作Redis的模块redis.lua,只要引入该模块就能使用OR操作Redis:
-
引入Redis模块,并初始化Redis对象
-- 引入redis模块 local redis = require("resty.redis") -- 初始化Redis对象 local red = redis:new() -- 设置Redis超时时间 red:set_timeouts(1000, 1000, 1000)
-
封装函数,用来释放Redis连接(不是真的关闭Redis,其实是放入连接池)
-- 关闭redis连接的工具方法,其实是放入连接池 local function close_redis(red) local pool_max_idle_time = 10000 -- 连接的空闲时间,单位是毫秒 local pool_size = 100 --连接池大小 local ok, err = red:set_keepalive(pool_max_idle_time, pool_size) if not ok then ngx.log(ngx.ERR, "放入Redis连接池失败: ", err) end end
-
封装函数,从Redis读数据并返回
-- 查询redis的方法 ip和port是redis地址,key是查询的key local function read_redis(ip, port, key) -- 获取一个连接 local ok, err = red:connect(ip, port) if not ok then ngx.log(ngx.ERR, "连接redis失败 : ", err) return nil end -- 查询redis local resp, err = red:get(key) -- 查询失败处理 if not resp then ngx.log(ngx.ERR, "查询Redis失败: ", err, ", key = " , key) end --得到的数据为空处理 if resp == ngx.null then resp = nil ngx.log(ngx.ERR, "查询Redis数据为空, key = ", key) end close_redis(red) return resp end
-
引入Redis模块,并初始化Redis对象
-
修改item.lua,封装一个函数read_data,实现先查询Redis,如果未命中,再查询tomcat。查询商品和库存时都调用read_data这个函数
-- 封装函数,先查询redis,再查询http local function read_data(key, path, params) -- 查询redis local resp = read_redis("127.0.0.1", 6379, key) -- 判断redis是否命中 if not resp then -- Redis查询失败,查询http resp = read_http(path, params) end return resp end
6.Nginx本地缓存
OpenResty为Nginx提供了shard dict (分享字典)的功能,可以在nginx的多个worker之间共享数据,实现缓存功能。
-
在nginx.conf的http下添加配置,开启共享字典:
# 共享字典,也就是本地缓存,名称叫做:item_cache,大小150mb lua_shared_dict item_cache 150m;
-
操作共享字典(数据):
-- 获取本地缓存对象 local item_cache = ngx.shared.item_cache -- 存储,指定key、value、过期时间,单位s,默认为0代表永不过期 item_cache:set('key', 'value', 1000) -- 读取 local val = item_cache:get('key')
7.缓存同步问题
(1)数据同步策略
缓存数据同步的常见方式有三种:
-
设置有效期:给缓存设置有效期,到期后自动删除。再次查询时更新
- 优势:简单、方便
- 缺点:时效性差,缓存过期之前可能不一致
- 场景:更新频率较低,时效性要求低的业务
-
同步双写:在修改数据库的同时直接修改缓存
- 优势:时效性强,缓存与数据库强一致
- 缺点:有代码侵入,耦合度高
- 场景:对一致性、时效性要求较高的缓存数据
-
异步通知:修改数据库时发送事件通知,相关服务监听到通知后修改缓存数据
- 优势:低耦合,可以同时通知多个缓存服务
- 缺点:时效性一般,可能存在中间不一致状态
- 场景:时效性要求一般,有多个服务需要同步
- 基于MQ的异步通知:可以看到使用MQ还是会对业务代码有一定的侵入,至少要有 发布消息 的代码。
- 基于Canal的异步通知:代码0侵入
(2)基于Canal的异步通知
Canal是阿里巴巴旗下的一款开源项目,基于Java开发,基于数据库增量日志解析,提供增量数据订阅&消费。
Canal是基于mysql主从同步来实现的。Canal就是伪装成MySQL的一个slave节点,从而监听master的binary log变化,再把得到的变化信息通知给Canal的客户端,进而完成对其它数据库的同步。
1)安装和配置Canal
-
开启MySQL主从同步:Canal是基于MySQL的主从同步功能,因此必须先开启MySQL的主从功能才可以。 这里以之前Docker运行的mysql为例:
-
开启binlog:修改mysql容器挂载的日志文件:
vim /tmp/mysql/conf/my.cnf # 添加内容 # 设置binary log文件的存放地址/var/lib/mysql/和文件名叫做mysql-bin log-bin=/var/lib/mysql/mysql-bin # 指定对哪个database记录binary log events,这里记录heima这个库 binlog-do-db=heima - `binlog-do-db=heima
-
设置用户权限:这里仅提供对heima这个库的操作权限,在数据库中新建查询,执行以下命令:
create user canal@'%' IDENTIFIED by 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%' identified by 'canal'; FLUSH PRIVILEGES;
- 重启mysql容器
-
测试主从同步设置是否成功:在Navicat中的查询中输入以下命令,看到如图信息表示主从同步设置成功:
show master status;
-
开启binlog:修改mysql容器挂载的日志文件:
-
安装Canal
-
创建网络,将MySQL、Canal、MQ的容器放到同一个Docker网络中:
# 创建名为heima的网络 docker network create heima # 让mysql容器加入heima网络 docker network connect heima mysql
-
安装Canal:上传Canal镜像到虚拟机,然后加载镜像:
docker load -i canal.tar
-
运行命令创建Canal容器:
docker run -p 11111:11111 --name canal \ -e canal.destinations=heima \ -e canal.instance.master.address=mysql:3306 \ # master数据库地址,因为canal和mysql在同一网络,所以这里用容器名互联 -e canal.instance.dbUsername=canal \ -e canal.instance.dbPassword=canal \ -e canal.instance.connectionCharset=UTF-8 \ -e canal.instance.tsdb.enable=true \ -e canal.instance.gtidon=false \ -e canal.instance.filter.regex=heima\\..* \ --network heima \ -d canal/canal-server:v1.1.5
【命令说明】: - -p 11111:11111:这是canal的默认监听端口
- -e canal.instance.master.address=mysql:3306:数据库地址和端口,如果不知道mysql容器地址,可以通过 docker inspect 容器id 来查看
- -e canal.instance.dbUsername=canal:数据库用户名
- -e canal.instance.dbPassword=canal :数据库密码
- -e canal.instance.filter.regex=:要监听的表名称,这里我们只监听heima数据库下的所有表。
- 表名称监听支持的语法:mysql 数据解析关注的表,Perl正则表达式。多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
- 常见例子:
- 1. 所有表:.* or .*\\..*
- 2. canal schema下所有表: canal\\..*
- 3. canal下的以canal打头的表:canal\\.canal.*
- 4. canal schema下的一张表:canal.test1
- 5. 多个规则组合使用然后以逗号隔开:canal\\..*,mysql.test1,mysql.test2
-
创建网络,将MySQL、Canal、MQ的容器放到同一个Docker网络中:
Canal提供了各种语言的客户端,当Canal监听到binlog变化时,会通知Canal客户端。
这里我们使用GitHub上的第三方开源的canal客户端canal-starter。地址:https://github.com/NormanGyllenhaal/canal-client
-
导入依赖
<!--canal--> <dependency> <groupId>top.javatool</groupId> <artifactId>canal-spring-boot-starter</artifactId> <version>1.2.1-RELEASE</version> </dependency>
-
配置canal信息
canal: destination: heima # canal实例名称,要跟canal-server运行时设置的destination一致,如下图 server: 192.168.150.101:11111 # canal地址
- 编写监听器,监听canal的消息
- 修改pojo实体类:Canal推送给canal-client的是被修改的这一行数据(row),canal-client则会自动把行数据封装到Item实体类中,封装过程中需要知道数据库与实体的映射关系,因此要在实体类中添加注释:
四、RabbitMQ高级特性
1.MQ的常见问题
- 消息可靠性
- 死信交换机——实现消息延迟投递
- 惰性队列——解决消息堆积
- MQ集群——解决高可用问题
2.★消息可靠性——生产者确认机制+消息持久化+消费者确认机制+失败重试机制
消息从生产者发送到exchange,再到queue,再到消费者,有哪些导致消息丢失的可能性?
-
publisher发送时丢失:
- 生产者发送的消息未送达exchange
- 消息到达exchange但未到达queue
- 由于MQ是内存存储,所以MQ宕机,queue将消息丢失;
- consumer接收到消息后未消费就宕机。
因此,要保证消息的可靠性,首先需要保证生产者将消息成功发送到交换机并路由到队列(生产者消息确认),其次要保证消息的持久化,然后要保证消息成功被消费者接收(消费者确认)。
(1)生产者消息确认机制——保证消息顺利发送到mq
1)生产者确认机制的原理
RabbitMQ提供了publisher confirm机制来避免消息在发送到MQ的过程中丢失。
消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种情况:
-
publisher-confirm,发送者确认
- 消息成功投递到交换机,返回ACK
- 消息未投递到交换机,返回NACK
-
publisher-return,发送者回执
- 消息投递到交换机了但是没有路由到队列,调用ReturnCallback,返回ACK及路由失败原因
-
ConfirmCallback和ReturnCallback:
ConfirmCallback是成功抵达exchange确认,ReturnCallback是是否抵达mq确认,没有抵达mq才出发ReturnCallback。
- 导入demo工程,项目结构比较简单
-
在publisher微服务的application.yml中添加配置:
spring: rabbitmq: publisher-confirm-type: correlated publisher-returns: true template: mandatory: true
【配置说明】-
publish-confirm-type:开启publisher-confirm机制,这里支持两种类型:
- simple(不推荐):同步等待confirm结果,直到超时
-
correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
- publisher-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
- template.mandatory:定义消息路由失败时的策略:true,则调用ReturnCallback;false:则直接丢弃消息
-
publish-confirm-type:开启publisher-confirm机制,这里支持两种类型:
-
每个RabbitTemplate只能配置一个ReturnCallback,因此需要在项目启动过程中配置全局ReturnCallback:
@Slf4j @Configuration public class CommonConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 获取RabbitTemplate RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); // 设置ReturnCallback rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { log.info("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",replyCode, replyText, exchange, routingKey, message.toString()); }); } }
-
发送消息,指定消息id、消息ConfirmCallback
@Test public void testSendMessage2SimpleQueue() throws InterruptedException { // 1.准备消息 String message = "hello, spring amqp!"; // 消息ID,需要封装到CorrelationData中 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 添加callback到correctationData correlationData.getFuture().addCallback( result -> { if(result.isAck()){ // ack,消息成功 log.debug("消息发送成功, ID:{}", correlationData.getId()); }else{ // nack,消息失败 log.error("消息发送失败, ID:{}, 原因{}",correlationData.getId(), result.getReason()); } }, ex -> log.error("消息发送异常, ID:{}, 原因{}",correlationData.getId(),ex.getMessage()) ); // 2.发送消息 rabbitTemplate.convertAndSend("amq.direct", "simple", message, correlationData); }
(2)消息持久化——保证mq消息不丢失
MQ默认是内存存储消息,开启持久化功能可以确保缓存在MQ中的消息不丢失。
-
交换机持久化
@Bean public DirectExchange simpleExchange(){ // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除 return new DirectExchange("simple.direct", true, false); }
-
队列持久化
@Bean public Queue simpleQueue(){ // 使用QueueBuilder构建队列,durable就是持久化的 return QueueBuilder.durable("simple.queue").build(); }
-
消息持久化:SpringAMQP中消息默认是持久的,可以通过MessageProperties中的DeliveryMode来指定
Message msg = MessageBuilder .withBody(message.getBytes(StandardCharsets.UTF_8)) // 消息体 .setDeliveryMode(MessageDeliveryMode.PERSISTENT) // 持久化 .build();
【tips】SpringAMQP中交换机、队列和消息默认都是持久的。
(3)消费者消息确认——保证消费者成功消费后才删除mq的消息
RabbitMQ支持消费者确认机制:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。
SpringAMQP则允许配置三种消费者确认模式:
-
manual:手动ack,需要在业务代码结束后,调用api发送ack。
只要没有手动发送ack,mq中的消息就一直是unacked状态,即使消费者程序宕机了,消息也不会丢失,会都变成ready状态,当消费者重新启动后,可以再次接收这些消息。
-
★auto:自动ack,由spring监测消费者监听类listener代码(即消费者的业务逻辑)是否出现异常,没有异常则返回ack;抛出异常则返回nack。
如果消费者程序宕机,会导致mq中的消息丢失。
- none:关闭ack,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除。
spring: rabbitmq: listener: simple: prefetch: 1 acknowledge-mode: none # none,关闭ack;manual,手动ack;auto:自动ack
(4)消费失败重试机制
当消费者出现异常后,消息会不断requeue(重新入队)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力。
1)实现失败重试机制
可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。在消费者的yml文件中配置开启失败重试机制:
2)失败消息处理策略
在开启失败重试机制后,当重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,该接口包含三种不同的处理策略:
- RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认。
- ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队。
- RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机,由该交换机将失败消息转发给指定队列。推荐。
-
定义接收失败消息的交换机、队列及其绑定关系:
- 定义RepublishMessageRecoverer:
3.死信交换机DLX
(1)死信与死信交换机
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消费者使用basic.reject或 basic.nack声明消费失败,并且消息的requeue参数设置为false(不重新入队)
- 消息是一个过期消息,超时无人消费
- 要投递的队列消息堆积满了,最早的消息可能成为死信
(2)存活时间TTL(Time-To-Live)
如果一个队列中的消息TTL结束仍未消费,则会变为死信,ttl超时分为两种情况:
- 消息所在的队列设置了存活时间
- 消息本身设置了存活时间
【案例】基于队列TTL实现发送一个延迟消息,让10秒后消费者才收到该消息。
- 声明一组死信交换机dl.direct及与之绑定的死信队列dl.queue:
【tips】消费者监听的不是目标队列,而是死信队列!
-
给目标队列ttl.queue设置超时时间,即在声明该队列时配置x-message-ttl属性,并将目标队列与死信交换机绑定:
- 向目标队列发送消息(也可以在发送消息时给消息本身设置TTL,如下)
(3)延迟队列
利用TTL结合死信交换机,我们实现了消息发出后,消费者延迟收到消息的效果。这种消息模式就称为延迟队列(Delay Queue)模式。
RabbitMQ官方推出了一个DelayExchange插件,可以原生支持延迟队列效果,避免了手动声明、绑定死信交换机和队列的繁琐步骤。
【tips】虽然是延迟队列(DelayQueue),但是由于该插件是基于交换机实现的,所以叫DelayExchange插件。
1)安装和使用插件
- 下载插件:RabbitMQ有一个官方的插件社区,地址为:https://www.rabbitmq.com/community-plugins.html 其中包含各种各样的插件。
-
查看RabbitMQ插件目录对应的数据卷:之前设定的RabbitMQ的数据卷名称为mq-plugins,使用以下命令查看数据卷:
docker volume inspect mq-plugins
- 上传插件至以上目录
-
进入mq容器内部:
docker exec -it mq bash
-
进入容器内部后,执行下面命令开启插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
插件启动完成:
- 退出mq容器
-
插件的使用:DelayExchange插件的原理是对官方原生交换机做了改进,一是将DelayExchange接收到的消息暂存在内存中(原生交换机无法存储消息);二是在DelayExchange中计时,超时后再投递消息。
- 在RabbitMQ管理平台添加一个DelayExchange并指定路由方式:
-
- 消息的延迟时间需要在发送消息时指定:
-
2)SpringAMQP使用延迟队列插件
DelayExchange的本质就是官方的三种交换机,只是添加了延迟功能。因此使用时只需要声明一个交换机,交换机的类型可以是任意类型,然后设定delayed属性为true即可。然后向这个delay为true的交换机中发送消息,要给消息添加一个header:x-delay,值为延迟的时间,单位为毫秒。
- 基于注解的方式声明交换机:
- 基于Bean的方式声明交换机:
- 发送消息
4.惰性队列
(1)消息堆积问题
当生产者发送消息的速度超过了消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储消息达到上限。最早接收到的消息,可能就会成为死信,会被丢弃,这就是消息堆积问题。
解决消息堆积有三种种思路:
- 增加更多消费者,提高消费速度
- 在消费者内开启线程池加快单个消费者的消息处理速度
- 扩大队列容积,提高堆积上限——惰性队列
(2)惰性队列Lazy Queues
1)惰性队列的特征及优缺
特征:
- 接收到消息后直接存入磁盘而非内存
- 消费者要消费消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储
-
优点:
- 基于磁盘存储,消息上限高
- 没有间歇性的page-out,性能比较稳定
-
缺点:
- 基于磁盘存储,消息时效性会降低
- 性能受限于磁盘的IO
-
将一个运行中的队列修改为惰性队列:
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
-
用SpringAMQP声明惰性队列分两种方式:要设置一个队列为惰性队列,只需要在声明队列时,指定x-queue-mode属性为lazy即可。
- 基于注解:
-
- 基于Bean:
5.MQ集群
(1)集群分类
RabbitMQ的集群有两种模式:
- 普通集群:是一种分布式集群,将队列分散到集群的各个节点,从而提高整个集群的并发能力。但一旦某个节点出现问题,该节点上的队列也就不能用了,所以存在一定的数据可用问题。
- 镜像集群:是一种主从集群,在普通集群的基础上,添加了主从备份功能,提高集群的数据可用性。
(2)普通集群
1)特征
普通集群,或者叫标准集群(classic cluster),具备下列特征:
- 会在集群的各个节点间共享部分数据,包括:交换机、队列元信息(队列的描述信息,如队列的名字、位于哪个节点等),但不包含队列中的消息
- 当访问集群某节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回
- 若队列所在节点宕机,队列中的消息就会丢失
-
计划部署3节点的mq集群,节点信息如下表:
主机名 控制台端口 通信端口 mq1 8081 ---> 15672 8071 ---> 5672
mq2 8082 ---> 15672
8072 ---> 5672
mq3 8083 ---> 15672
8073---> 5672
-
获取cookie:集群模式中的每个MQ节点使用 cookie 来确定彼此是否被允许相互通信。 要使两个节点能够通信,二者必须具有相同的共享秘密,称为Erlang cookie,因此每个集群节点必须具有相同的cookie。实例之间也需要使用cookie来相互通信。可以先在之前启动的mq容器中获取一个cookie值,作为集群的cookie。执行下面的命令:
docker exec -it mq cat /var/lib/rabbitmq/.erlang.cookie
可以看到cookie值如下:
- 获取到cookie后停止并删除现有的mq,准备搭建mq集群
-
准备集群配置:在/tmp目录下新建一个配置文件 rabbitmq.conf:
loopback_users.guest = false listeners.tcp.default = 5672 # 通信端口 cluster_formation.peer_discovery_backend = rabbit_peer_discovery_classic_config # 集群中的信息 cluster_formation.classic_config.nodes.1 = rabbit@mq1 cluster_formation.classic_config.nodes.2 = rabbit@mq2 cluster_formation.classic_config.nodes.3 = rabbit@mq3
再创建一个文件.erlang.cookie记录cookie:cd /tmp # 创建cookie文件 touch .erlang.cookie # 写入cookie echo "ZEAYNIMRHOIEYVPQEBKL" > .erlang.cookie # 修改cookie文件的权限为只读 chmod 600 .erlang.cookie
-
准备集群:创建三个目录,分别将 rabbitmq.conf和.erlang.cookie拷贝到目录下:
cd /tmp # 创建目录 mkdir mq1 mq2 mq3 # 拷贝 cp rabbitmq.conf mq1 cp rabbitmq.conf mq2 cp rabbitmq.conf mq3 cp .erlang.cookie mq1 cp .erlang.cookie mq2 cp .erlang.cookie mq3
-
创建网络mq-net:
docker network create mq-net
-
启动集群:分别运行mq1/2/3:
docker run -d --net mq-net \ -v ${PWD}/mq1/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \ -v ${PWD}/.erlang.cookie:/var/lib/rabbitmq/.erlang.cookie \ -e RABBITMQ_DEFAULT_USER=itcast \ -e RABBITMQ_DEFAULT_PASS=123321 \ --name mq1 \ --hostname mq1 \ -p 8071:5672 \ -p 8081:15672 \ rabbitmq:3.8-management
【tips】运行mq2、mq3记得改名称和端口映射。 - 测试
(3)镜像集群
1)特征
镜像集群:本质是主从模式,具备下面的特征:
- 交换机、队列、队列中的消息都会在各个mq的镜像节点之间同步备份。
- 创建队列的节点被称为该队列的主节点,备份到的其它节点叫做该队列的镜像节点。一个队列的主节点可能是另一个队列的镜像节点。
- 所有操作都是主节点完成,然后同步给镜像节点。
- 主节点宕机后,镜像节点会替代成新的主节点。
ha-mode |
ha-params |
效果 |
准确模式exactly |
队列的副本量count |
集群中队列副本(主服务器和镜像服务器之和)的数量。
换句话说:count = 镜像数量 + 1。如果群集中的节点数少于count,则该队列将镜像到所有节点。如果有集群总数大于count+1,并且包含镜像的节点出现故障,则将在另一个节点上创建一个新的镜像。
|
all |
(none) |
队列在群集中的所有节点之间进行镜像。队列将镜像到任何新加入的节点。镜像到所有节点将对所有群集节点施加额外的压力,包括网络I / O,磁盘I / O和磁盘空间使用情况。
推荐使用exactly,设置副本数为(N / 2 +1)。
|
nodes |
node names |
指定队列创建到哪些节点,如果指定的节点全部不存在,则会出现异常。如果指定的节点在集群中存在,但是暂时不可用,会创建节点到当前客户端连接到的节点。 |
这里以rabbitmqctl命令为案例来讲解配置语法。需要注意的是,运行rabbitmqctl命令需要进入某个mq容器去运行。
-
exactly模式
rabbitmqctl set_policy ha-two "^two\." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
- rabbitmqctl set_policy:固定写法
- ha-two:策略名称,自定义
- "^two\.":匹配队列的正则表达式,符合命名规则的队列才生效,这里是任何以two.开头的队列名称
- '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}': 策略内容
- "ha-mode":"exactly":策略模式,此处是exactly模式,指定副本数量
- "ha-params":2:策略参数,这里是2,即副本数量count为2,1主1镜像
- "ha-sync-mode":"automatic":同步策略,默认是manual,即新加入的镜像节点不会同步旧的消息。如果设置为automatic,则新加入的镜像节点会把主节点中所有消息都同步,会带来额外的网络开销
-
all模式
rabbitmqctl set_policy ha-all "^all\." '{"ha-mode":"all"}'
- ha-all:策略名称,自定义
- "^all\.":匹配所有以all.开头的队列名
- '{"ha-mode":"all"}':策略内容
- "ha-mode":"all":策略模式,此处是all模式,即所有节点都会称为镜像节点
-
nodes模式:
rabbitmqctl set_policy ha-nodes "^nodes\." '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'
- ha-nodes:策略名称,自定义
- "^nodes\.":匹配队列的正则表达式,符合命名规则的队列才生效,这里是任何以nodes.开头的队列名称
- '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}': 策略内容
- "ha-mode":"nodes":策略模式,此处是nodes模式
- "ha-params":["rabbit@mq1", "rabbit@mq2"]:策略参数,这里指定副本所在节点名称
(4)仲裁队列
1)特征
仲裁队列是RabbitMQ3.8版本以后才有的新功能,用来替代镜像集群,具备下列特征:
- 与镜像队列一样,都是主从模式,支持主从数据同步
- 使用非常简单,没有复杂的配置
- 主从同步基于Raft协议,强一致
2)搭建仲裁队列集群
- 在集群中的任意控制台添加一个队列,将队列类型选为Quorum类型。
- 在任意控制台查看队列:可以看到,仲裁队列的 + 2字样,代表这个队列有2个镜像节点。 因为仲裁队列默认的镜像数为5,所以如果集群有7个节点,那么镜像数肯定是5;而我们集群只有3个节点,因此镜像数量就是3。
3)SpringAMQP搭建仲裁队列集群
- 使用AMQP连接集群需要改一下配置文件:之前连接单节点时,只配置了host和port;连接多节点集群的话就不能这么配了,需要配置addresses:
- 声明仲裁队列