分布式事务

1、什么是分布式事务,分布式事务与本地事务的区别?

本地事务,就是只对本项目模块事务进行管理的,对于其他的数据库事务不做任何的改变,在项目编写中,经常使用@Transactional(rollBackFor=Exception.class)实现当前方法如果出现任何的异常就会使用数据库的redo.log日志实现事务的回滚。

分布式事务,它是在全局的角度看待问题,在多个模块之间发挥自己的作用,如果其中的一个模块出现了问题,那么就会将涉及到的多个模块的事务都会进行回滚,他是跨数据库的,跨项目模块的。

2、分布式事务的解决方案?

分布式事务的解决方案可以使用Seata实现。

什么是Seata?

Seata是一个开源的分布式事务的解决方案,它主要提供一个高性能和简单实用的分布式事务服务。

Seata为用户提供了四种模式:AT、TCC、SAGA、XA模式。

后面。将主要使用XA模式实现分布式事务问题。

3、实战案例一(微服务系统,不使用分布式事务)

第一、创建数据库bank1
CREATE DATABASE BANK1;

CREATE TABLE `account_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `account_name` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '户主姓名',
  `account_no` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '银行卡号',
  `account_password` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '帐户密码',
  `account_balance` double DEFAULT NULL COMMENT '帐户余额',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;
第二、创建数据库bank2
CREATE DATABASE BANK2;

CREATE TABLE `account_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `account_name` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '户主姓名',
  `account_no` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '银行卡号',
  `account_password` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '帐户密码',
  `account_balance` double DEFAULT NULL COMMENT '帐户余额',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;
第三、分别向数据库bank1和bank2插入数据
BANK1:

INSERT INTO ACCOUNT_INFO VALUES(2,"张三",1,123456,1000);

BANK2:

INSERT INTO ACCOUNT_INFO VALUES(3,"李四",2,123456,0);
第四、搭建项目环境(BANK1)
pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hui</groupId>
    <artifactId>bank1</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.3</version>
    </parent>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!--  web  -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--  mysql-connector  -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.28</version>
        </dependency>
        <!--  mybatis-plus  -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.3.0</version>
        </dependency>
        <!--  nacos  -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
            <version>2021.0.1.0</version>
        </dependency>
        <!--  open-feign  -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-openfeign</artifactId>
            <version>3.1.0</version>
        </dependency>
        <!--  lombok  -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
        <!--  loadbanancer  -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-loadbalancer</artifactId>
            <version>3.1.2</version>
        </dependency>
        <!--  逆向工程数据库  -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-generator</artifactId>
            <version>3.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.velocity</groupId>
            <artifactId>velocity-engine-core</artifactId>
            <version>2.0</version>
        </dependency>
    </dependencies>
</project>
Controller
/**
 * @Description: 用户转账操作
 * @Author: huidou 惠豆
 * @CreateTime: 2022/6/14 23:19
 */
@Controller
@RequestMapping("/bank1")
public class AccountInfoController {
    @Autowired
    private IAccountInfoService iAccountInfoService;
    /**
     * 向李四赚钱
     * @param accountNo
     * @param amount
     * @return
     */
    @ResponseBody
    @RequestMapping("/transfer")
    public String account(String accountNo, Double amount) {
        Integer accountBalance = iAccountInfoService.updateAccountBalance(accountNo, amount);
        if (accountBalance >= 1) {
            return "转账成功,向李四转账成功";
        } else {
            return "转账失败,没能向李四转账成功";
        }
    } 
}
service
public interface IAccountInfoService extends IService<AccountInfo> {
    /**
     * 向李四转账
     * @param accountNo 银行卡号
     * @param amount 金额
     */
    Integer updateAccountBalance(String accountNo, Double amount);
}

@Slf4j
@Service
public class AccountInfoServiceImpl extends ServiceImpl<AccountInfoMapper, AccountInfo> implements IAccountInfoService {
    @Autowired
    private AccountInfoMapper accountInfoMapper;
    // 远程服务接口
    @Autowired
    private Bank2Client bank2Client;
    /**
     * 向李四转账
     * 从异常捕获来分析,bank1和bank2都设置了本地事务,这是后就会出现两种情况:
     * 1、bank1在调用bank2接口之后出现了异常。
     * 2、bank1正常调用bank2,但是bank2出现了异常。
     *
     * 结果分别是:
     * 第一、bank1出现了异常会根据本地事务的特点,遇见异常就进行回滚,但是bank2是正常的,就会出现bank1没有扣钱,但是bank2已经成功的加钱了。
     * 第二、bank1没有出现问题,但是bank2出现了问题,由于bank1调用bank2,benk2出现了Exception就会抛给bank1,所以最后bank1和bank2都会根据本地事务进行回滚。
     * @param accountNo 银行卡号
     * @param amount 金额
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public Integer updateAccountBalance(String accountNo, Double amount) {
        // 1、首先查询自己的账户信息
        AccountInfo accountInfo = accountInfoMapper.selectById(2);
        if (accountInfo == null) {
            return 0;
        }
        // 2、判断自己的余额信息
        if (accountInfo.getAccountBalance() < amount) {
            return 0;
        }
        // 3、修改自己的余额信息
        accountInfo.setAccountBalance(accountInfo.getAccountBalance() - amount);
        int update = accountInfoMapper.updateById(accountInfo);
        // 4、测试以下远程调用的异常会不会引起当前方法的异常
        try {
            bank2Client.transfer(accountNo,amount);
        } catch (Exception e) {
            System.out.println("bank2出现错误了");
            System.out.println(e.getMessage());
        }
        return 1;
    }
}
mapper
@Repository
public interface AccountInfoMapper extends BaseMapper<AccountInfo> {
}
pojo
@Data
@TableName("account_info")
public class AccountInfo implements Serializable {

    private static final long serialVersionUID = 1L;

    @TableId(value = "id", type = IdType.AUTO)
    private Long id;

    private String accountName;

    private String accountNo;

    private String accountPassword;

    private Double accountBalance;
}
openfeign
/**
 * @Description: openfeign 远程调用服务接口
 * @Author: huidou 惠豆
 * @CreateTime: 2022/6/14 23:49
 */
@Component
@FeignClient(value = "seata-bank2")
public interface Bank2Client {
    // 调用bank2的服务接口,实现向李四转账
    @RequestMapping("/bank2/transfer")
    String transfer(@RequestParam("accountNo") String accountNo,@RequestParam("amount") Double amount);
}
搭建项目环境(BANK2)
pom
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.hui</groupId>
    <artifactId>bank2</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.3</version>
    </parent>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!--  web  -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--  mysql-connector  -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.28</version>
        </dependency>
        <!--  mybatis-plus  -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.3.0</version>
        </dependency>
        <!--  nacos  -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
            <version>2021.0.1.0</version>
        </dependency>
        <!--  lombok  -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.22</version>
        </dependency>
        <!--  loadbanaer  -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-loadbalancer</artifactId>
            <version>3.1.2</version>
        </dependency>
        <!--  逆向工程数据库  -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-generator</artifactId>
            <version>3.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.velocity</groupId>
            <artifactId>velocity-engine-core</artifactId>
            <version>2.0</version>
        </dependency>
    </dependencies>

</project>
controller
/**
 * @Description: 用户转账操作
 * @Author: huidou 惠豆
 * @CreateTime: 2022/6/14 23:19
 */
@Controller
@RequestMapping("/bank2")
public class AccountInfoController {
    @Autowired
    private IAccountInfoService iAccountInfoService;
    /**
     * 接收来自张三的钱
     * @param accountNo
     * @param amount
     * @return
     */
    @ResponseBody
    @RequestMapping("/transfer")
    public String account(String accountNo, Double amount) {
        iAccountInfoService.updateAccountBalance(accountNo, amount);
        return "转账成功,接收来自张三的钱";
    }
}
service
public interface IAccountInfoService extends IService<AccountInfo> {

    /**
     * 李四账户增加金额
     * @param accountNo 银行卡号
     * @param amount 金额
     */
    void updateAccountBalance(String accountNo, Double amount);
}
@Slf4j
@Service
public class AccountInfoServiceImpl extends ServiceImpl<AccountInfoMapper, AccountInfo> implements IAccountInfoService {

    @Autowired
    private AccountInfoMapper accountInfoMapper;

    /**
     * 李四增加金额
     * @param accountNo 银行卡号
     * @param amount 金额
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void updateAccountBalance(String accountNo, Double amount) {
        try {
            System.out.println(10/0);
            // 1、根据账户ID,查询对应的账号信息(目前只有一人,就给死ID)
            QueryWrapper<AccountInfo> queryWrapper = new QueryWrapper<>();
            queryWrapper.eq("account_no",accountNo);
            AccountInfo accountInfo = accountInfoMapper.selectOne(queryWrapper);
            // 2、判断该账户是不是为Null
            if (accountInfo == null) {
                return;
            }
            // 3、给该账户添加钱(来自张三的)
            accountInfo.setAccountBalance(accountInfo.getAccountBalance() + amount);
            // 4、更新数据
            int updateById = accountInfoMapper.updateById(accountInfo);
        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException("出现了10/0的异常");
        }
    }
}
pojo
@Data
@TableName("account_info")
public class AccountInfo implements Serializable {

    private static final long serialVersionUID = 1L;

    @TableId(value = "id", type = IdType.AUTO)
    private Long id;

    private String accountName;

    private String accountNo;

    private String accountPassword;

    private Double accountBalance;
}
项目的配置文件
server:
  port: 6001
spring:
  application:
    name: seata-bank1
  cloud:
    nacos:
      discovery:
        server-addr: 192.168.205.150:8848
  datasource:
    username: root
    password: root
    url: jdbc:mysql://localhost:3306/bank1?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC
    driver-class-name: com.mysql.cj.jdbc.Driver


server:
  port: 6002
spring:
  application:
    name: seata-bank2
  cloud:
    nacos:
      discovery:
        server-addr: 192.168.205.150:8848
  datasource:
    username: root
    password: root
    url: jdbc:mysql://localhost:3306/bank2?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC
    driver-class-name: com.mysql.cj.jdbc.Driver
Nacos服务列表

  • 项目测试

    1、两个项目中都不加@Transactional(rollbackFor = Exception.class),逻辑正常调用。

    2、两个项目中都加上@Transactional(rollbackFor = Exception.class),逻辑正常调用。

    3、两个项目中都加上@Transactional(rollbackFor = Exception.class),bank1在调用bank2接口之后设置异常查看是否出现数据不一致(bank2逻辑正常)。

    4、两个项目中都加上@Transactional(rollbackFor = Exception.class),bank1在调用bank2接口之后(bank1逻辑正常),但是bank2逻辑设置错误。

    结果:

    1、对于第一种情况,逻辑是正确的,数据符合一致性。

    2、对于第二种情况,逻辑是正确的,数据符合一致性。

    3、对于第三种情况,由于bank1在调用完bank2之后出现了异常,bank2的执行逻辑正确,再加上都有

    @Transactional(rollbackFor = Exception.class)注解,所以导致bank1数据回滚,但是bank2数据还是修改了,因为这个注解是本地注解,不能管到其他的事务模块中。

    4、对于第四种情况容易出现误解,由于bank1的执行逻辑是正确的,但是bank2的逻辑是错误的,是会报出异常的,加上bank1调用了bank2,这时bank1就会接收到bank2抛出的异常,由于两个接口都有@Transactional(rollbackFor = Exception.class)注解,所以最后都会进行回滚,这个时候,不清楚异常接收的原理的时候,还以为两个本地事务注解就能实现分布式事务,其实不是的,要真是,那还要分布式事务的理由在那里,第三种情况出现了又该如何处理?

4、实战案例二(微服务系统,使用分布式事务XA模式)

第一、添加Seata依赖
		<!--  seata  -->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
            <version>2021.0.1.0</version>
        </dependency>
第二、数据库添加Undo_log
CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT
CHARSET=utf8;
第三、BANK1(在业务层调用BANK2接口之后设置异常)
service
@Slf4j
@Service
public class AccountInfoServiceImpl extends ServiceImpl<AccountInfoMapper, AccountInfo> implements IAccountInfoService {
    @Autowired
    private AccountInfoMapper accountInfoMapper;
    // 远程服务接口
    @Autowired
    private Bank2Client bank2Client;
    /**
     * 向李四转账
     * 从异常捕获来分析,bank1和bank2都设置了本地事务,这是后就会出现两种情况:
     * 1、bank1在调用bank2接口之后出现了异常。
     * 2、bank1正常调用bank2,但是bank2出现了异常。
     *
     * 结果分别是:
     * 第一、bank1出现了异常会根据本地事务的特点,遇见异常就进行回滚,但是bank2是正常的,就会出现bank1没有扣钱,但是bank2已经成功的加钱了。
     * 第二、bank1没有出现问题,但是bank2出现了问题,由于bank1调用bank2,benk2出现了Exception就会抛给bank1,所以最后bank1和bank2都会根据本地事务进行回滚。
     * @param accountNo 银行卡号
     * @param amount 金额
     */
    @GlobalTransactional // 这个注解只需要在其中的一个模块方法上添加即可,因为是全局,所以一个就够了
    @Override
    @Transactional(rollbackFor = Exception.class)
    public Integer updateAccountBalance(String accountNo, Double amount) {
        // 1、首先查询自己的账户信息
        AccountInfo accountInfo = accountInfoMapper.selectById(2);
        if (accountInfo == null) {
            return 0;
        }
        // 2、判断自己的余额信息
        if (accountInfo.getAccountBalance() < amount) {
            return 0;
        }
        // 3、修改自己的余额信息
        accountInfo.setAccountBalance(accountInfo.getAccountBalance() - amount);
        int update = accountInfoMapper.updateById(accountInfo);
        // 4、测试以下远程调用的异常会不会引起当前方法的异常
        bank2Client.transfer(accountNo,amount);
        /**
         * 在这里设置错误,检测是否出现不一致性
         * 错误情况就是:bank1数据回滚,不改变,但是bank2数据改变
         * 正常情况就是:出现异常,大家都不改变。
         */
        System.out.println(10/0);
        return 1;
    }
}
第四、BANK2(业务层逻辑正确)
service
@Slf4j
@Service
public class AccountInfoServiceImpl extends ServiceImpl<AccountInfoMapper, AccountInfo> implements IAccountInfoService {
    @Autowired
    private AccountInfoMapper accountInfoMapper;
    /**
     * 李四增加金额
     * @param accountNo 银行卡号
     * @param amount 金额
     */
    @Override
    @Transactional(rollbackFor = Exception.class)
    public void updateAccountBalance(String accountNo, Double amount) {
        // 1、根据账户ID,查询对应的账号信息(目前只有一人,就给死ID)
        QueryWrapper<AccountInfo> queryWrapper = new QueryWrapper<>();
        queryWrapper.eq("account_no",accountNo);
        AccountInfo accountInfo = accountInfoMapper.selectOne(queryWrapper);
        // 2、判断该账户是不是为Null
        if (accountInfo == null) {
            return;
        }
        // 3、给该账户添加钱(来自张三的)
        accountInfo.setAccountBalance(accountInfo.getAccountBalance() + amount);
        // 4、更新数据
        int updateById = accountInfoMapper.updateById(accountInfo);
    }
}

结果:

数据库初始数据图:


数据库执行之后的结果还是一样的,所以说明分布式事务Seata生效了。

5、使用分布式事务解决的原理


分布式事务的核心思想其实就是一个投票的过程,对于一个决定,如果没有一个人有问题,那么就会执行,反之,只要有一个人有问题,就会进行回退取消。

从上面的图可以看出,当用户服务的事务管理器TM会向全局事务控制申请开启分布式事务,这时会产生一个全局唯一的XID,然后,用户服务会向全局事务进行注册自己的本地事务,注册完之后,会进行执行自己的逻辑,之后在条用积分服务,积分服务也是一样的会进行注册本地事务,由于两个服务都有自己的本地服务,倘若都成功了,就都会向TC提交一个投票也就是Commit,这时TC就会向两个本地事务发送Commit的命令;但是若有一个出现了问题那么@Transactional(rollbackFor = Exception.class)注解就会进行回滚,由于TC管辖了该本地事务,就会知道该事务想自己提交了rollback的请求,这时TC就会向所有的本地事务发送rollback的命令。

6、基于Atomikos实现XA强一致性分布式事务实战

是哦也能够Atomikos实现XA模式的强一致性,模拟实现跨库的转账实现,在一个项目模块中,使用多数据源(两个数据库)实现不同数据库的数据改变。

原理图如下:



使用Atomikos框架技术的好处在于:项目不需要直接与数据库进行交互进行数据的改变,而是让Atomikos技术对数据库进行封装,由于这个Atomikos技术内部实现了分布式事务的XA模式,所以对于开发人员来说,我不需要像之前那样不但要添加依赖,添加注解,还要与本地事务做出关联,现在就不需要了,只需要简简单单的添加依赖与注解即可,注解也是常用的@Transactional即可,只需要关注业务逻辑实现。

开始一个项目实战案例:

项目需求:在一个项目中,配置多个数据源,用于实现转账功能,一个数据库扣减金额;一个数据库添加金额

第一、创建数据库
CREATE DATABASE TX-XA-01

CREATE TABLE `user_account` (
  `account_no` varchar(64) COLLATE utf8_bin NOT NULL COMMENT '账户编号',
  `account_name` varchar(64) COLLATE utf8_bin DEFAULT NULL COMMENT '账户\r\n名称',
  `account_balance` decimal(10,2) DEFAULT NULL COMMENT '账户余额',
  PRIMARY KEY (`account_no`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;
CREATE DATABASE TX-XA-02

CREATE TABLE `user_account` (
  `account_no` varchar(64) COLLATE utf8_bin NOT NULL COMMENT '账户编号',
  `account_name` varchar(64) COLLATE utf8_bin DEFAULT NULL COMMENT '账户\r\n名称',
  `account_balance` decimal(10,2) DEFAULT NULL COMMENT '账户余额',
  PRIMARY KEY (`account_no`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;
第二、向数据库插入数据
CREATE DATABASE TX-XA-01
INSERT INTO USER_ACCOUNT VALUES("1001","张三",10000);

CREATE DATABASE TX-XA-02
INSERT INTO USER_ACCOUNT VALUES("1002","李四",10000);
第三、创建项目,配置项目环境
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.hui</groupId>
    <artifactId>atomikos-xa</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.3</version>
    </parent>

    <dependencies>
        <!--  web  -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!--  durid数据源  -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.1.22</version>
        </dependency>
        <!--  mysql-connector  -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <!--  mybatis-plus  -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.3.0</version>
        </dependency>
        <!--  atomikos  -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jta-atomikos</artifactId>
        </dependency>
        <!--  lombok  -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--  逆向工程数据库  -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-generator</artifactId>
            <version>3.5.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.velocity</groupId>
            <artifactId>velocity-engine-core</artifactId>
            <version>2.0</version>
        </dependency>
    </dependencies>
</project>
第四、多数据源配置
DBConfig1
/**
 * @Description: 第一个数据库信息的绑定
 * @Author: huidou 惠豆
 * @CreateTime: 2022/6/15 17:09
 */
@Data
@ConfigurationProperties(prefix = "spring.datasource.dynamic.datasource.master")
public class DBConfig1 {

    private String url;
    private String username;
    private String password;
    private String driverClassName;

}
DBConfig2
/**
 * @Description: 第二个数据库信息的绑定
 * @Author: huidou 惠豆
 * @CreateTime: 2022/6/15 17:09
 */
@Data
@ConfigurationProperties(prefix = "spring.datasource.dynamic.datasource.slave")
public class DBConfig2 {

    private String url;
    private String username;
    private String password;
    private String driverClassName;

}
MybatisConfig1、MybatisConfig2
/**
 * @Description: 配置第一个数据源,将其与Atomikos进行整合
 * @Author: huidou 惠豆
 * @CreateTime: 2022/6/15 17:09
 */
@Configuration
@MapperScan(basePackages = "com.hui.mapper1",sqlSessionTemplateRef = "masterSqlSessionTemplate")
public class MybatisConfig1 {

    /**
     * 创建一个DataSource数据源
     * @param dbConfig1
     * @return
     */
    @Bean(name = "masterDataSource")
    public DataSource getDataSource(DBConfig1 dbConfig1) {
        // 1. 将本地事务注册到 Atomikos全局事务中
        AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();
        // 2. 设置rm供应商
        sourceBean.setUniqueResourceName("masterDataSource");
        sourceBean.setXaDataSourceClassName(dbConfig1.getDriverClassName());
        // 3. 设置testquery
        sourceBean.setTestQuery("select 1");
        // 4. 设置超时时间
        sourceBean.setBorrowConnectionTimeout(3);
        // 设置Mysql链接
        MysqlXADataSource dataSource = new MysqlXADataSource();
        dataSource.setUrl(dbConfig1.getUrl());
        dataSource.setUser(dbConfig1.getUsername());
        dataSource.setPassword(dbConfig1.getPassword());
        sourceBean.setXaDataSource(dataSource);
        return sourceBean;
    }

    /**
     * SqlSessionFactory是mybaits 重要的对象
     * @return
     */
    @Bean(name = "masterSqlSessionFactory")
    public SqlSessionFactory getSqlSessionFactory(@Qualifier("masterDataSource") DataSource dataSource) throws Exception {
        MybatisSqlSessionFactoryBean sessionFactoryBean = new MybatisSqlSessionFactoryBean();
        sessionFactoryBean.setDataSource(dataSource);
        return sessionFactoryBean.getObject();
    }

    /**
     * 负责管理mybatis 的sqlsession sql
     * SqlSessionTemplate 替换默认的mybaits 实现的defalutsqlsession不能参与spring事务不能注入 线程不安全
     * @return
     */
    @Bean(name = "masterSqlSessionTemplate")
    public SqlSessionTemplate getSqlSessionTemplate(@Qualifier("masterSqlSessionFactory") SqlSessionFactory sqlSessionFactory){
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}
/**
 * @Description: 配置第二个数据源,将其与Atomikos进行整合
 * @Author: huidou 惠豆
 * @CreateTime: 2022/6/15 17:09
 */
@Configuration
@MapperScan(basePackages = "com.hui.mapper2",sqlSessionTemplateRef = "slaveSqlSessionTemplate")
public class MybatisConfig2 {

    /**
     * 创建一个DataSource数据源
     * @param dbConfig2
     * @return
     */
    @Bean(name = "slaveDataSource")
    public DataSource getDataSource(DBConfig2 dbConfig2) {
        // 1. 将本地事务注册到 Atomikos全局事务中
        AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();
        // 2. 设置rm供应商
        sourceBean.setUniqueResourceName("slaveDataSource");
        sourceBean.setXaDataSourceClassName(dbConfig2.getDriverClassName());
        // 3. 设置testquery
        sourceBean.setTestQuery("select 1");
        // 4. 设置超时时间
        sourceBean.setBorrowConnectionTimeout(3);
        // 设置Mysql链接
        MysqlXADataSource dataSource = new MysqlXADataSource();
        dataSource.setUrl(dbConfig2.getUrl());
        dataSource.setUser(dbConfig2.getUsername());
        dataSource.setPassword(dbConfig2.getPassword());
        sourceBean.setXaDataSource(dataSource);
        return sourceBean;
    }

    /**
     * SqlSessionFactory是mybaits 重要的对象
     * @return
     */
    @Bean(name = "slaveSqlSessionFactory")
    public SqlSessionFactory getSqlSessionFactory(@Qualifier("slaveDataSource") DataSource dataSource) throws Exception {
        MybatisSqlSessionFactoryBean sessionFactoryBean = new MybatisSqlSessionFactoryBean();
        sessionFactoryBean.setDataSource(dataSource);
        return sessionFactoryBean.getObject();
    }

    /**
     * 负责管理mybatis 的sqlsession sql
     * SqlSessionTemplate 替换默认的mybaits 实现的defalutsqlsession不能参与spring事务不能注入 线程不安全
     * @return
     */
    @Bean(name = "slaveSqlSessionTemplate")
    public SqlSessionTemplate getSqlSessionTemplate(@Qualifier("slaveSqlSessionFactory") SqlSessionFactory sqlSessionFactory){
        return new SqlSessionTemplate(sqlSessionFactory);
    }
}
逻辑层实现
controller
/**
 * 实现转账操作
 */
@RestController
@RequestMapping("/userAccount")
public class UserAccountController {

    @Autowired
    private IUserAccountService iUserAccountService;

    /**
     * 实现转账操作
     * @param sourceAccountNo 源账户
     * @param targetAccountNo 目标账户
     * @param amount          转账金额
     * @return
     */
    @RequestMapping("/transfer")
    public String transfer(String sourceAccountNo, String targetAccountNo, BigDecimal amount) {
        Integer transfer = iUserAccountService.transfer(sourceAccountNo, targetAccountNo, amount);
        if (transfer == 1) {
            return "转账成功!";
        }
        return "转账失败!";
    }
}
service
@Service
public class UserAccountServiceImpl  implements IUserAccountService {

    // 原账户所在数据库
    @Autowired
    private UserAccountMapper1 userAccountMapper1;

    // 目标账户所在数据库
    @Autowired
    private UserAccountMapper2 userAccountMapper2;

    /**
     * 实现转账操作
     * @param sourceAccountNo 源账户
     * @param targetAccountNo 目标账户
     * @param amount          转账金额
     * @return
     */
    @Override
    public Integer transfer(String sourceAccountNo, String targetAccountNo, BigDecimal amount) {
        // 1、查询原账户
        UserAccount sourceAccount = userAccountMapper1.selectById(sourceAccountNo);
        // 2、查询目标账户
        UserAccount targetAccount = userAccountMapper2.selectById(targetAccountNo);
        // 3、判断原账户与目标账户是否存在
        if (sourceAccount != null && targetAccount != null) {
            // 4、判断原账户余额是否不足
            if (sourceAccount.getAccountBalance().compareTo(amount) < 0) {
                throw new RuntimeException("原账户金额不足,无法实现转账操作");
            }
            // 5、原账户扣减余额,并更新
            sourceAccount.setAccountBalance(sourceAccount.getAccountBalance().subtract(amount));
            int updateSourceAccount = userAccountMapper1.updateById(sourceAccount);
            // 6、目标账户增加余额,并更新
            targetAccount.setAccountBalance(targetAccount.getAccountBalance().add(amount));
            int updateTargetAccount = userAccountMapper2.updateById(targetAccount);
            if (updateSourceAccount == 1 && updateTargetAccount == 1) {
                return 1;
            }
        }
        return 0;
    }
}
mapper
@Repository
public interface UserAccountMapper1 extends BaseMapper<UserAccount> {}

@Repository
public interface UserAccountMapper2 extends BaseMapper<UserAccount> {}
application.yml
server:
  port: 6003
spring:
  autoconfigure:
    #停用druid连接池的自动配置
    exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
  datasource:
    #选用druid的XADataSource数据源,因为这个数据源支持分布式事务管理
    type: com.alibaba.druid.pool.xa.DruidXADataSource
    #以下是自定义字段
    dynamic:
      primary: master
      datasource:
        master:
          url: jdbc:mysql://localhost:3306/tx-xa-01?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&autoReconnect=true
          username: root
          password: root
          driver-class-name: com.mysql.cj.jdbc.Driver
        slave:
          url: jdbc:mysql://localhost:3306/tx-xa-02?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&autoReconnect=true
          username: root
          password: root
          driver-class-name: com.mysql.cj.jdbc.Driver
        validation-query: SELCET 1

logging:
  pattern:
    console: logging.pattern.console=%d{MM/dd HH:mm:ss.SSS} %clr(%-5level) ---  [%-15thread] %cyan(%-50logger{50}):%msg%n
业务测试

发送请求:

http://localhost:6003/userAccount/transfer?sourceAccountNo=1001&targetAccountNo=1002&amount=1000

正常情况下,可以实现跨数据库对数据进行修改,但是如果我恶意做一个操作,代码如下:

/**
     * 实现转账操作
     * @param sourceAccountNo 源账户
     * @param targetAccountNo 目标账户
     * @param amount          转账金额
     * @return
     */
    @Override
    public Integer transfer(String sourceAccountNo, String targetAccountNo, BigDecimal amount) {
        // 1、查询原账户
        UserAccount sourceAccount = userAccountMapper1.selectById(sourceAccountNo);
        // 2、查询目标账户
        UserAccount targetAccount = userAccountMapper2.selectById(targetAccountNo);
        // 3、判断原账户与目标账户是否存在
        if (sourceAccount != null && targetAccount != null) {
            // 4、判断原账户余额是否不足
            if (sourceAccount.getAccountBalance().compareTo(amount) < 0) {
                throw new RuntimeException("原账户金额不足,无法实现转账操作");
            }
            // 5、原账户扣减余额,并更新
            sourceAccount.setAccountBalance(sourceAccount.getAccountBalance().subtract(amount));
            int updateSourceAccount = userAccountMapper1.updateById(sourceAccount);
            System.out.println(10/0);
            // 6、目标账户增加余额,并更新
            targetAccount.setAccountBalance(targetAccount.getAccountBalance().add(amount));
            int updateTargetAccount = userAccountMapper2.updateById(targetAccount);
            if (updateSourceAccount == 1 && updateTargetAccount == 1) {
                return 1;
            }
        }
        return 0;
    }

从代码上可以看出来,我们在对数据库1做完操作之后进行的一个异常,这时就会造成报错,第一个数据库扣减成功,第二个数据库因异常无法执行,则不能添加,形成数据不一致。

如何解决?

使用Atomikos实现分布式事务数据XA模式的强一致性。

代码上如何修改:很简单,由于我们之前把多数据源已经都与Atomikos框架整合在一起了,所以这个时候,只需要在方法上添加一个注解@Transactional,该注解是import org.springframework.transaction.annotation.Transactional;下的,即可,该注解就包含了Atomikos对事务的处理,然后在主启动类上添加一个注解@EnableTransactionManager,即可实现。

结果:

发送请求:

http://localhost:6003/userAccount/transfer?sourceAccountNo=1001&targetAccountNo=1002&amount=1000

实现数据一致性。




























全部评论

相关推荐

offerboyyyy:之前看到降温完收到offer了的呢佬,可以签保底等
点赞 评论 收藏
分享
评论
点赞
1
分享
牛客网
牛客企业服务