分布式事务
1、什么是分布式事务,分布式事务与本地事务的区别?
本地事务,就是只对本项目模块事务进行管理的,对于其他的数据库事务不做任何的改变,在项目编写中,经常使用@Transactional(rollBackFor=Exception.class)实现当前方法如果出现任何的异常就会使用数据库的redo.log日志实现事务的回滚。
分布式事务,它是在全局的角度看待问题,在多个模块之间发挥自己的作用,如果其中的一个模块出现了问题,那么就会将涉及到的多个模块的事务都会进行回滚,他是跨数据库的,跨项目模块的。
2、分布式事务的解决方案?
分布式事务的解决方案可以使用Seata实现。
什么是Seata?
Seata是一个开源的分布式事务的解决方案,它主要提供一个高性能和简单实用的分布式事务服务。
Seata为用户提供了四种模式:AT、TCC、SAGA、XA模式。
后面。将主要使用XA模式实现分布式事务问题。
3、实战案例一(微服务系统,不使用分布式事务)
CREATE DATABASE BANK1; CREATE TABLE `account_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `account_name` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '户主姓名', `account_no` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '银行卡号', `account_password` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '帐户密码', `account_balance` double DEFAULT NULL COMMENT '帐户余额', PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;第二、创建数据库bank2
CREATE DATABASE BANK2; CREATE TABLE `account_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `account_name` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '户主姓名', `account_no` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '银行卡号', `account_password` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '帐户密码', `account_balance` double DEFAULT NULL COMMENT '帐户余额', PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;第三、分别向数据库bank1和bank2插入数据
BANK1: INSERT INTO ACCOUNT_INFO VALUES(2,"张三",1,123456,1000); BANK2: INSERT INTO ACCOUNT_INFO VALUES(3,"李四",2,123456,0);第四、搭建项目环境(BANK1)
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.hui</groupId> <artifactId>bank1</artifactId> <version>1.0-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.3</version> </parent> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <!-- web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- mysql-connector --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.28</version> </dependency> <!-- mybatis-plus --> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.3.0</version> </dependency> <!-- nacos --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> <version>2021.0.1.0</version> </dependency> <!-- open-feign --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> <version>3.1.0</version> </dependency> <!-- lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.22</version> </dependency> <!-- loadbanancer --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-loadbalancer</artifactId> <version>3.1.2</version> </dependency> <!-- 逆向工程数据库 --> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-generator</artifactId> <version>3.5.2</version> </dependency> <dependency> <groupId>org.apache.velocity</groupId> <artifactId>velocity-engine-core</artifactId> <version>2.0</version> </dependency> </dependencies> </project>Controller
/** * @Description: 用户转账操作 * @Author: huidou 惠豆 * @CreateTime: 2022/6/14 23:19 */ @Controller @RequestMapping("/bank1") public class AccountInfoController { @Autowired private IAccountInfoService iAccountInfoService; /** * 向李四赚钱 * @param accountNo * @param amount * @return */ @ResponseBody @RequestMapping("/transfer") public String account(String accountNo, Double amount) { Integer accountBalance = iAccountInfoService.updateAccountBalance(accountNo, amount); if (accountBalance >= 1) { return "转账成功,向李四转账成功"; } else { return "转账失败,没能向李四转账成功"; } } }service
public interface IAccountInfoService extends IService<AccountInfo> { /** * 向李四转账 * @param accountNo 银行卡号 * @param amount 金额 */ Integer updateAccountBalance(String accountNo, Double amount); } @Slf4j @Service public class AccountInfoServiceImpl extends ServiceImpl<AccountInfoMapper, AccountInfo> implements IAccountInfoService { @Autowired private AccountInfoMapper accountInfoMapper; // 远程服务接口 @Autowired private Bank2Client bank2Client; /** * 向李四转账 * 从异常捕获来分析,bank1和bank2都设置了本地事务,这是后就会出现两种情况: * 1、bank1在调用bank2接口之后出现了异常。 * 2、bank1正常调用bank2,但是bank2出现了异常。 * * 结果分别是: * 第一、bank1出现了异常会根据本地事务的特点,遇见异常就进行回滚,但是bank2是正常的,就会出现bank1没有扣钱,但是bank2已经成功的加钱了。 * 第二、bank1没有出现问题,但是bank2出现了问题,由于bank1调用bank2,benk2出现了Exception就会抛给bank1,所以最后bank1和bank2都会根据本地事务进行回滚。 * @param accountNo 银行卡号 * @param amount 金额 */ @Override @Transactional(rollbackFor = Exception.class) public Integer updateAccountBalance(String accountNo, Double amount) { // 1、首先查询自己的账户信息 AccountInfo accountInfo = accountInfoMapper.selectById(2); if (accountInfo == null) { return 0; } // 2、判断自己的余额信息 if (accountInfo.getAccountBalance() < amount) { return 0; } // 3、修改自己的余额信息 accountInfo.setAccountBalance(accountInfo.getAccountBalance() - amount); int update = accountInfoMapper.updateById(accountInfo); // 4、测试以下远程调用的异常会不会引起当前方法的异常 try { bank2Client.transfer(accountNo,amount); } catch (Exception e) { System.out.println("bank2出现错误了"); System.out.println(e.getMessage()); } return 1; } }mapper
@Repository public interface AccountInfoMapper extends BaseMapper<AccountInfo> { }pojo
@Data @TableName("account_info") public class AccountInfo implements Serializable { private static final long serialVersionUID = 1L; @TableId(value = "id", type = IdType.AUTO) private Long id; private String accountName; private String accountNo; private String accountPassword; private Double accountBalance; }openfeign
/** * @Description: openfeign 远程调用服务接口 * @Author: huidou 惠豆 * @CreateTime: 2022/6/14 23:49 */ @Component @FeignClient(value = "seata-bank2") public interface Bank2Client { // 调用bank2的服务接口,实现向李四转账 @RequestMapping("/bank2/transfer") String transfer(@RequestParam("accountNo") String accountNo,@RequestParam("amount") Double amount); }搭建项目环境(BANK2)
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.hui</groupId> <artifactId>bank2</artifactId> <version>1.0-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.3</version> </parent> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <!-- web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- mysql-connector --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.28</version> </dependency> <!-- mybatis-plus --> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.3.0</version> </dependency> <!-- nacos --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> <version>2021.0.1.0</version> </dependency> <!-- lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.22</version> </dependency> <!-- loadbanaer --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-loadbalancer</artifactId> <version>3.1.2</version> </dependency> <!-- 逆向工程数据库 --> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-generator</artifactId> <version>3.5.2</version> </dependency> <dependency> <groupId>org.apache.velocity</groupId> <artifactId>velocity-engine-core</artifactId> <version>2.0</version> </dependency> </dependencies> </project>controller
/** * @Description: 用户转账操作 * @Author: huidou 惠豆 * @CreateTime: 2022/6/14 23:19 */ @Controller @RequestMapping("/bank2") public class AccountInfoController { @Autowired private IAccountInfoService iAccountInfoService; /** * 接收来自张三的钱 * @param accountNo * @param amount * @return */ @ResponseBody @RequestMapping("/transfer") public String account(String accountNo, Double amount) { iAccountInfoService.updateAccountBalance(accountNo, amount); return "转账成功,接收来自张三的钱"; } }service
public interface IAccountInfoService extends IService<AccountInfo> { /** * 李四账户增加金额 * @param accountNo 银行卡号 * @param amount 金额 */ void updateAccountBalance(String accountNo, Double amount); }
@Slf4j @Service public class AccountInfoServiceImpl extends ServiceImpl<AccountInfoMapper, AccountInfo> implements IAccountInfoService { @Autowired private AccountInfoMapper accountInfoMapper; /** * 李四增加金额 * @param accountNo 银行卡号 * @param amount 金额 */ @Override @Transactional(rollbackFor = Exception.class) public void updateAccountBalance(String accountNo, Double amount) { try { System.out.println(10/0); // 1、根据账户ID,查询对应的账号信息(目前只有一人,就给死ID) QueryWrapper<AccountInfo> queryWrapper = new QueryWrapper<>(); queryWrapper.eq("account_no",accountNo); AccountInfo accountInfo = accountInfoMapper.selectOne(queryWrapper); // 2、判断该账户是不是为Null if (accountInfo == null) { return; } // 3、给该账户添加钱(来自张三的) accountInfo.setAccountBalance(accountInfo.getAccountBalance() + amount); // 4、更新数据 int updateById = accountInfoMapper.updateById(accountInfo); } catch (Exception e) { e.printStackTrace(); throw new RuntimeException("出现了10/0的异常"); } } }pojo
@Data @TableName("account_info") public class AccountInfo implements Serializable { private static final long serialVersionUID = 1L; @TableId(value = "id", type = IdType.AUTO) private Long id; private String accountName; private String accountNo; private String accountPassword; private Double accountBalance; }项目的配置文件
server: port: 6001 spring: application: name: seata-bank1 cloud: nacos: discovery: server-addr: 192.168.205.150:8848 datasource: username: root password: root url: jdbc:mysql://localhost:3306/bank1?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC driver-class-name: com.mysql.cj.jdbc.Driver server: port: 6002 spring: application: name: seata-bank2 cloud: nacos: discovery: server-addr: 192.168.205.150:8848 datasource: username: root password: root url: jdbc:mysql://localhost:3306/bank2?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC driver-class-name: com.mysql.cj.jdbc.DriverNacos服务列表
-
项目测试
1、两个项目中都不加@Transactional(rollbackFor = Exception.class),逻辑正常调用。
2、两个项目中都加上@Transactional(rollbackFor = Exception.class),逻辑正常调用。
3、两个项目中都加上@Transactional(rollbackFor = Exception.class),bank1在调用bank2接口之后设置异常查看是否出现数据不一致(bank2逻辑正常)。
4、两个项目中都加上@Transactional(rollbackFor = Exception.class),bank1在调用bank2接口之后(bank1逻辑正常),但是bank2逻辑设置错误。
结果:
1、对于第一种情况,逻辑是正确的,数据符合一致性。
2、对于第二种情况,逻辑是正确的,数据符合一致性。
3、对于第三种情况,由于bank1在调用完bank2之后出现了异常,bank2的执行逻辑正确,再加上都有
@Transactional(rollbackFor = Exception.class)注解,所以导致bank1数据回滚,但是bank2数据还是修改了,因为这个注解是本地注解,不能管到其他的事务模块中。
4、对于第四种情况容易出现误解,由于bank1的执行逻辑是正确的,但是bank2的逻辑是错误的,是会报出异常的,加上bank1调用了bank2,这时bank1就会接收到bank2抛出的异常,由于两个接口都有@Transactional(rollbackFor = Exception.class)注解,所以最后都会进行回滚,这个时候,不清楚异常接收的原理的时候,还以为两个本地事务注解就能实现分布式事务,其实不是的,要真是,那还要分布式事务的理由在那里,第三种情况出现了又该如何处理?
4、实战案例二(微服务系统,使用分布式事务XA模式)
第一、添加Seata依赖<!-- seata --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> <version>2021.0.1.0</version> </dependency>第二、数据库添加Undo_log
CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) NOT NULL, `context` varchar(128) NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime NOT NULL, `log_modified` datetime NOT NULL, `ext` varchar(100) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;第三、BANK1(在业务层调用BANK2接口之后设置异常)
service
@Slf4j @Service public class AccountInfoServiceImpl extends ServiceImpl<AccountInfoMapper, AccountInfo> implements IAccountInfoService { @Autowired private AccountInfoMapper accountInfoMapper; // 远程服务接口 @Autowired private Bank2Client bank2Client; /** * 向李四转账 * 从异常捕获来分析,bank1和bank2都设置了本地事务,这是后就会出现两种情况: * 1、bank1在调用bank2接口之后出现了异常。 * 2、bank1正常调用bank2,但是bank2出现了异常。 * * 结果分别是: * 第一、bank1出现了异常会根据本地事务的特点,遇见异常就进行回滚,但是bank2是正常的,就会出现bank1没有扣钱,但是bank2已经成功的加钱了。 * 第二、bank1没有出现问题,但是bank2出现了问题,由于bank1调用bank2,benk2出现了Exception就会抛给bank1,所以最后bank1和bank2都会根据本地事务进行回滚。 * @param accountNo 银行卡号 * @param amount 金额 */ @GlobalTransactional // 这个注解只需要在其中的一个模块方法上添加即可,因为是全局,所以一个就够了 @Override @Transactional(rollbackFor = Exception.class) public Integer updateAccountBalance(String accountNo, Double amount) { // 1、首先查询自己的账户信息 AccountInfo accountInfo = accountInfoMapper.selectById(2); if (accountInfo == null) { return 0; } // 2、判断自己的余额信息 if (accountInfo.getAccountBalance() < amount) { return 0; } // 3、修改自己的余额信息 accountInfo.setAccountBalance(accountInfo.getAccountBalance() - amount); int update = accountInfoMapper.updateById(accountInfo); // 4、测试以下远程调用的异常会不会引起当前方法的异常 bank2Client.transfer(accountNo,amount); /** * 在这里设置错误,检测是否出现不一致性 * 错误情况就是:bank1数据回滚,不改变,但是bank2数据改变 * 正常情况就是:出现异常,大家都不改变。 */ System.out.println(10/0); return 1; } }第四、BANK2(业务层逻辑正确)
@Slf4j @Service public class AccountInfoServiceImpl extends ServiceImpl<AccountInfoMapper, AccountInfo> implements IAccountInfoService { @Autowired private AccountInfoMapper accountInfoMapper; /** * 李四增加金额 * @param accountNo 银行卡号 * @param amount 金额 */ @Override @Transactional(rollbackFor = Exception.class) public void updateAccountBalance(String accountNo, Double amount) { // 1、根据账户ID,查询对应的账号信息(目前只有一人,就给死ID) QueryWrapper<AccountInfo> queryWrapper = new QueryWrapper<>(); queryWrapper.eq("account_no",accountNo); AccountInfo accountInfo = accountInfoMapper.selectOne(queryWrapper); // 2、判断该账户是不是为Null if (accountInfo == null) { return; } // 3、给该账户添加钱(来自张三的) accountInfo.setAccountBalance(accountInfo.getAccountBalance() + amount); // 4、更新数据 int updateById = accountInfoMapper.updateById(accountInfo); } }
结果:
数据库初始数据图:
数据库执行之后的结果还是一样的,所以说明分布式事务Seata生效了。
5、使用分布式事务解决的原理
分布式事务的核心思想其实就是一个投票的过程,对于一个决定,如果没有一个人有问题,那么就会执行,反之,只要有一个人有问题,就会进行回退取消。
从上面的图可以看出,当用户服务的事务管理器TM会向全局事务控制申请开启分布式事务,这时会产生一个全局唯一的XID,然后,用户服务会向全局事务进行注册自己的本地事务,注册完之后,会进行执行自己的逻辑,之后在条用积分服务,积分服务也是一样的会进行注册本地事务,由于两个服务都有自己的本地服务,倘若都成功了,就都会向TC提交一个投票也就是Commit,这时TC就会向两个本地事务发送Commit的命令;但是若有一个出现了问题那么@Transactional(rollbackFor = Exception.class)注解就会进行回滚,由于TC管辖了该本地事务,就会知道该事务想自己提交了rollback的请求,这时TC就会向所有的本地事务发送rollback的命令。
6、基于Atomikos实现XA强一致性分布式事务实战
是哦也能够Atomikos实现XA模式的强一致性,模拟实现跨库的转账实现,在一个项目模块中,使用多数据源(两个数据库)实现不同数据库的数据改变。
原理图如下:
使用Atomikos框架技术的好处在于:项目不需要直接与数据库进行交互进行数据的改变,而是让Atomikos技术对数据库进行封装,由于这个Atomikos技术内部实现了分布式事务的XA模式,所以对于开发人员来说,我不需要像之前那样不但要添加依赖,添加注解,还要与本地事务做出关联,现在就不需要了,只需要简简单单的添加依赖与注解即可,注解也是常用的@Transactional即可,只需要关注业务逻辑实现。
开始一个项目实战案例:
项目需求:在一个项目中,配置多个数据源,用于实现转账功能,一个数据库扣减金额;一个数据库添加金额
第一、创建数据库CREATE DATABASE TX-XA-01 CREATE TABLE `user_account` ( `account_no` varchar(64) COLLATE utf8_bin NOT NULL COMMENT '账户编号', `account_name` varchar(64) COLLATE utf8_bin DEFAULT NULL COMMENT '账户\r\n名称', `account_balance` decimal(10,2) DEFAULT NULL COMMENT '账户余额', PRIMARY KEY (`account_no`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;
CREATE DATABASE TX-XA-02 CREATE TABLE `user_account` ( `account_no` varchar(64) COLLATE utf8_bin NOT NULL COMMENT '账户编号', `account_name` varchar(64) COLLATE utf8_bin DEFAULT NULL COMMENT '账户\r\n名称', `account_balance` decimal(10,2) DEFAULT NULL COMMENT '账户余额', PRIMARY KEY (`account_no`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;第二、向数据库插入数据
CREATE DATABASE TX-XA-01 INSERT INTO USER_ACCOUNT VALUES("1001","张三",10000); CREATE DATABASE TX-XA-02 INSERT INTO USER_ACCOUNT VALUES("1002","李四",10000);第三、创建项目,配置项目环境
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.hui</groupId> <artifactId>atomikos-xa</artifactId> <version>1.0-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.3</version> </parent> <dependencies> <!-- web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- durid数据源 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.1.22</version> </dependency> <!-- mysql-connector --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <!-- mybatis-plus --> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.3.0</version> </dependency> <!-- atomikos --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jta-atomikos</artifactId> </dependency> <!-- lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!-- 逆向工程数据库 --> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-generator</artifactId> <version>3.5.2</version> </dependency> <dependency> <groupId>org.apache.velocity</groupId> <artifactId>velocity-engine-core</artifactId> <version>2.0</version> </dependency> </dependencies> </project>第四、多数据源配置
/** * @Description: 第一个数据库信息的绑定 * @Author: huidou 惠豆 * @CreateTime: 2022/6/15 17:09 */ @Data @ConfigurationProperties(prefix = "spring.datasource.dynamic.datasource.master") public class DBConfig1 { private String url; private String username; private String password; private String driverClassName; }DBConfig2
/** * @Description: 第二个数据库信息的绑定 * @Author: huidou 惠豆 * @CreateTime: 2022/6/15 17:09 */ @Data @ConfigurationProperties(prefix = "spring.datasource.dynamic.datasource.slave") public class DBConfig2 { private String url; private String username; private String password; private String driverClassName; }MybatisConfig1、MybatisConfig2
/** * @Description: 配置第一个数据源,将其与Atomikos进行整合 * @Author: huidou 惠豆 * @CreateTime: 2022/6/15 17:09 */ @Configuration @MapperScan(basePackages = "com.hui.mapper1",sqlSessionTemplateRef = "masterSqlSessionTemplate") public class MybatisConfig1 { /** * 创建一个DataSource数据源 * @param dbConfig1 * @return */ @Bean(name = "masterDataSource") public DataSource getDataSource(DBConfig1 dbConfig1) { // 1. 将本地事务注册到 Atomikos全局事务中 AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean(); // 2. 设置rm供应商 sourceBean.setUniqueResourceName("masterDataSource"); sourceBean.setXaDataSourceClassName(dbConfig1.getDriverClassName()); // 3. 设置testquery sourceBean.setTestQuery("select 1"); // 4. 设置超时时间 sourceBean.setBorrowConnectionTimeout(3); // 设置Mysql链接 MysqlXADataSource dataSource = new MysqlXADataSource(); dataSource.setUrl(dbConfig1.getUrl()); dataSource.setUser(dbConfig1.getUsername()); dataSource.setPassword(dbConfig1.getPassword()); sourceBean.setXaDataSource(dataSource); return sourceBean; } /** * SqlSessionFactory是mybaits 重要的对象 * @return */ @Bean(name = "masterSqlSessionFactory") public SqlSessionFactory getSqlSessionFactory(@Qualifier("masterDataSource") DataSource dataSource) throws Exception { MybatisSqlSessionFactoryBean sessionFactoryBean = new MybatisSqlSessionFactoryBean(); sessionFactoryBean.setDataSource(dataSource); return sessionFactoryBean.getObject(); } /** * 负责管理mybatis 的sqlsession sql * SqlSessionTemplate 替换默认的mybaits 实现的defalutsqlsession不能参与spring事务不能注入 线程不安全 * @return */ @Bean(name = "masterSqlSessionTemplate") public SqlSessionTemplate getSqlSessionTemplate(@Qualifier("masterSqlSessionFactory") SqlSessionFactory sqlSessionFactory){ return new SqlSessionTemplate(sqlSessionFactory); } }
/** * @Description: 配置第二个数据源,将其与Atomikos进行整合 * @Author: huidou 惠豆 * @CreateTime: 2022/6/15 17:09 */ @Configuration @MapperScan(basePackages = "com.hui.mapper2",sqlSessionTemplateRef = "slaveSqlSessionTemplate") public class MybatisConfig2 { /** * 创建一个DataSource数据源 * @param dbConfig2 * @return */ @Bean(name = "slaveDataSource") public DataSource getDataSource(DBConfig2 dbConfig2) { // 1. 将本地事务注册到 Atomikos全局事务中 AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean(); // 2. 设置rm供应商 sourceBean.setUniqueResourceName("slaveDataSource"); sourceBean.setXaDataSourceClassName(dbConfig2.getDriverClassName()); // 3. 设置testquery sourceBean.setTestQuery("select 1"); // 4. 设置超时时间 sourceBean.setBorrowConnectionTimeout(3); // 设置Mysql链接 MysqlXADataSource dataSource = new MysqlXADataSource(); dataSource.setUrl(dbConfig2.getUrl()); dataSource.setUser(dbConfig2.getUsername()); dataSource.setPassword(dbConfig2.getPassword()); sourceBean.setXaDataSource(dataSource); return sourceBean; } /** * SqlSessionFactory是mybaits 重要的对象 * @return */ @Bean(name = "slaveSqlSessionFactory") public SqlSessionFactory getSqlSessionFactory(@Qualifier("slaveDataSource") DataSource dataSource) throws Exception { MybatisSqlSessionFactoryBean sessionFactoryBean = new MybatisSqlSessionFactoryBean(); sessionFactoryBean.setDataSource(dataSource); return sessionFactoryBean.getObject(); } /** * 负责管理mybatis 的sqlsession sql * SqlSessionTemplate 替换默认的mybaits 实现的defalutsqlsession不能参与spring事务不能注入 线程不安全 * @return */ @Bean(name = "slaveSqlSessionTemplate") public SqlSessionTemplate getSqlSessionTemplate(@Qualifier("slaveSqlSessionFactory") SqlSessionFactory sqlSessionFactory){ return new SqlSessionTemplate(sqlSessionFactory); } }逻辑层实现
/** * 实现转账操作 */ @RestController @RequestMapping("/userAccount") public class UserAccountController { @Autowired private IUserAccountService iUserAccountService; /** * 实现转账操作 * @param sourceAccountNo 源账户 * @param targetAccountNo 目标账户 * @param amount 转账金额 * @return */ @RequestMapping("/transfer") public String transfer(String sourceAccountNo, String targetAccountNo, BigDecimal amount) { Integer transfer = iUserAccountService.transfer(sourceAccountNo, targetAccountNo, amount); if (transfer == 1) { return "转账成功!"; } return "转账失败!"; } }service
@Service public class UserAccountServiceImpl implements IUserAccountService { // 原账户所在数据库 @Autowired private UserAccountMapper1 userAccountMapper1; // 目标账户所在数据库 @Autowired private UserAccountMapper2 userAccountMapper2; /** * 实现转账操作 * @param sourceAccountNo 源账户 * @param targetAccountNo 目标账户 * @param amount 转账金额 * @return */ @Override public Integer transfer(String sourceAccountNo, String targetAccountNo, BigDecimal amount) { // 1、查询原账户 UserAccount sourceAccount = userAccountMapper1.selectById(sourceAccountNo); // 2、查询目标账户 UserAccount targetAccount = userAccountMapper2.selectById(targetAccountNo); // 3、判断原账户与目标账户是否存在 if (sourceAccount != null && targetAccount != null) { // 4、判断原账户余额是否不足 if (sourceAccount.getAccountBalance().compareTo(amount) < 0) { throw new RuntimeException("原账户金额不足,无法实现转账操作"); } // 5、原账户扣减余额,并更新 sourceAccount.setAccountBalance(sourceAccount.getAccountBalance().subtract(amount)); int updateSourceAccount = userAccountMapper1.updateById(sourceAccount); // 6、目标账户增加余额,并更新 targetAccount.setAccountBalance(targetAccount.getAccountBalance().add(amount)); int updateTargetAccount = userAccountMapper2.updateById(targetAccount); if (updateSourceAccount == 1 && updateTargetAccount == 1) { return 1; } } return 0; } }mapper
@Repository public interface UserAccountMapper1 extends BaseMapper<UserAccount> {} @Repository public interface UserAccountMapper2 extends BaseMapper<UserAccount> {}application.yml
server: port: 6003 spring: autoconfigure: #停用druid连接池的自动配置 exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure datasource: #选用druid的XADataSource数据源,因为这个数据源支持分布式事务管理 type: com.alibaba.druid.pool.xa.DruidXADataSource #以下是自定义字段 dynamic: primary: master datasource: master: url: jdbc:mysql://localhost:3306/tx-xa-01?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&autoReconnect=true username: root password: root driver-class-name: com.mysql.cj.jdbc.Driver slave: url: jdbc:mysql://localhost:3306/tx-xa-02?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&autoReconnect=true username: root password: root driver-class-name: com.mysql.cj.jdbc.Driver validation-query: SELCET 1 logging: pattern: console: logging.pattern.console=%d{MM/dd HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n业务测试
发送请求:
http://localhost:6003/userAccount/transfer?sourceAccountNo=1001&targetAccountNo=1002&amount=1000
正常情况下,可以实现跨数据库对数据进行修改,但是如果我恶意做一个操作,代码如下:
/** * 实现转账操作 * @param sourceAccountNo 源账户 * @param targetAccountNo 目标账户 * @param amount 转账金额 * @return */ @Override public Integer transfer(String sourceAccountNo, String targetAccountNo, BigDecimal amount) { // 1、查询原账户 UserAccount sourceAccount = userAccountMapper1.selectById(sourceAccountNo); // 2、查询目标账户 UserAccount targetAccount = userAccountMapper2.selectById(targetAccountNo); // 3、判断原账户与目标账户是否存在 if (sourceAccount != null && targetAccount != null) { // 4、判断原账户余额是否不足 if (sourceAccount.getAccountBalance().compareTo(amount) < 0) { throw new RuntimeException("原账户金额不足,无法实现转账操作"); } // 5、原账户扣减余额,并更新 sourceAccount.setAccountBalance(sourceAccount.getAccountBalance().subtract(amount)); int updateSourceAccount = userAccountMapper1.updateById(sourceAccount); System.out.println(10/0); // 6、目标账户增加余额,并更新 targetAccount.setAccountBalance(targetAccount.getAccountBalance().add(amount)); int updateTargetAccount = userAccountMapper2.updateById(targetAccount); if (updateSourceAccount == 1 && updateTargetAccount == 1) { return 1; } } return 0; }
从代码上可以看出来,我们在对数据库1做完操作之后进行的一个异常,这时就会造成报错,第一个数据库扣减成功,第二个数据库因异常无法执行,则不能添加,形成数据不一致。
如何解决?
使用Atomikos实现分布式事务数据XA模式的强一致性。
代码上如何修改:很简单,由于我们之前把多数据源已经都与Atomikos框架整合在一起了,所以这个时候,只需要在方法上添加一个注解@Transactional,该注解是import org.springframework.transaction.annotation.Transactional;下的,即可,该注解就包含了Atomikos对事务的处理,然后在主启动类上添加一个注解@EnableTransactionManager,即可实现。
结果:
发送请求:
http://localhost:6003/userAccount/transfer?sourceAccountNo=1001&targetAccountNo=1002&amount=1000
实现数据一致性。