分享

Spring Boot 集成 Seata 解决分布式事务问题

 瑶疏影 2021-01-12

seata 简介

Seata 是 阿里巴巴2019年开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。在 Seata 开源之前,Seata 对应的内部版本在阿里内部一直扮演着分布式一致性中间件的角色,帮助阿里度过历年的双11,对各业务进行了有力的支撑。经过多年沉淀与积累,2019.1 Seata 正式宣布对外开源 。目前 Seata 1.0 已经 GA。

微服务中的分布式事务问题

让我们想象一下传统的单片应用程序,它的业务由3个模块组成,他们使用单个本地数据源。自然,本地事务将保证数据的一致性。

微服务架构已发生了变化。上面提到的3个模块被设计为3种服务。本地事务自然可以保证每个服务中的数据一致性。但是整个业务逻辑范围如何?

Seata怎么办?

我们说,分布式事务是由一批分支事务组成的全局事务,通常分支事务只是本地事务。

Seata有3个基本组成部分:

  • 事务协调器(TC):维护全局事务和分支事务的状态,驱动全局提交或回滚。

  • 事务管理器TM:定义全局事务的范围:开始全局事务,提交或回滚全局事务。

  • 资源管理器(RM):管理正在处理的分支事务的资源,与TC对话以注册分支事务并报告分支事务的状态,并驱动分支事务的提交或回滚。

Seata管理的分布式事务的典型生命周期:

  1. TM要求TC开始一项新的全局事务。TC生成代表全局事务的XID。

  2. XID通过微服务的调用链传播。

  3. RM将本地事务注册为XID到TC的相应全局事务的分支。

  4. TM要求TC提交或回退相应的XID全局事务。

  5. TC驱动XID的相应全局事务下的所有分支事务以完成分支提交或回滚。

快速开始

用例

用户购买商品的业务逻辑。整个业务逻辑由3个微服务提供支持:

  • 仓储服务:对给定的商品扣除仓储数量。

  • 订单服务:根据采购需求创建订单。

  • 账户服务:从用户帐户中扣除余额。

环境准备

步骤 1:建立数据库

  1. # db_seata

  2. DROP SCHEMA IF EXISTS db_seata;

  3. CREATE SCHEMA db_seata;

  4. USE db_seata;

  5. # Account

  6. CREATE TABLE `account_tbl` (

  7. `id` INT(11) NOT NULL AUTO_INCREMENT,

  8. `user_id` VARCHAR(255) DEFAULT NULL,

  9. `money` INT(11) DEFAULT 0,

  10. PRIMARY KEY (`id`)

  11. ) ENGINE = InnoDB DEFAULT CHARSET = utf8;

  12. INSERT INTO account_tbl (id, user_id, money)

  13. VALUES (1, '1001', 10000);

  14. INSERT INTO account_tbl (id, user_id, money)

  15. VALUES (2, '1002', 10000);

  16. # Order

  17. CREATE TABLE `order_tbl`

  18. (

  19. `id` INT(11) NOT NULL AUTO_INCREMENT,

  20. `user_id` VARCHAR(255) DEFAULT NULL,

  21. `commodity_code` VARCHAR(255) DEFAULT NULL,

  22. `count` INT(11) DEFAULT '0',

  23. `money` INT(11) DEFAULT '0',

  24. PRIMARY KEY (`id`)

  25. ) ENGINE = InnoDB DEFAULT CHARSET = utf8;

  26. # Storage

  27. CREATE TABLE `storage_tbl` (

  28. `id` INT(11) NOT NULL AUTO_INCREMENT,

  29. `commodity_code` VARCHAR(255) DEFAULT NULL,

  30. `count` INT(11) DEFAULT '0',

  31. PRIMARY KEY (`id`),

  32. UNIQUE KEY `commodity_code` (`commodity_code`)

  33. ) ENGINE = InnoDB DEFAULT CHARSET = utf8;

  34. INSERT INTO storage_tbl (id, commodity_code, count)

  35. VALUES (1, '2001', 1000);

  36. CREATE TABLE `undo_log` (

  37. `id` bigint(20) NOT NULL AUTO_INCREMENT,

  38. `branch_id` bigint(20) NOT NULL,

  39. `xid` varchar(100) NOT NULL,

  40. `context` varchar(128) NOT NULL,

  41. `rollback_info` longblob NOT NULL,

  42. `log_status` int(11) NOT NULL,

  43. `log_created` datetime NOT NULL,

  44. `log_modified` datetime NOT NULL,

  45. PRIMARY KEY (`id`),

  46. UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)

  47. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

seata AT 模式需要 undo_log 表,另外三张是业务表。

步骤 2: 启动 Seata Server

Server端存储模式(store.mode)现有file、db两种(后续将引入raft),file模式无需改动,直接启动即可。db模式需要导入用于存储全局事务回话信息的三张表。

注:file模式为单机模式,全局事务会话信息内存中读写并持久化本地文件root.data,性能较高; db模式为高可用模式,全局事务会话信息通过db共享,相应性能差些

可以直接通过bash 脚本启动 Seata Server,也可以通过 Docker 镜像启动,但是 Docker 方式目前只支持使用 file 模式,不支持将 Seata-Server 注册到 Eureka 或 Nacos 等注册中心。

通过脚本启动

在 https://github.com/seata/seata/releases 下载相应版本的 Seata Server,解压后执行以下命令启动,这里使用 file 配置

通过 Docker 启动
  1. docker run --name seata-server -p 8091:8091 seataio/seata-server:latest

项目介绍

项目名地址说明
sbm-account-service127.0.0.1:8081账户服务
sbm-order-service127.0.0.1:8082订单服务
sbm-storage-service127.0.0.1:8083仓储服务
sbm-business-service127.0.0.1:8084主业务
seata-server172.16.2.101:8091seata-server

核心代码

为了不让篇幅太长,这里只给出部分代码,详细代码文末会给出源码地址

maven 引入 seata 的依赖 eata-spring-boot-starter

  1. <dependency>

  2. <groupId>io.seata</groupId>

  3. <artifactId>seata-spring-boot-starter</artifactId>

  4. <version>1.0.0</version>

  5. </dependency>

仓储服务

application.properties
  1. spring.application.name=account-service

  2. server.port=8081

  3. spring.datasource.url=jdbc:mysql://172.16.2.101:3306/db_seata?useSSL=false&serverTimezone=UTC

  4. spring.datasource.username=root

  5. spring.datasource.password=123456

  6. seata.tx-service-group=my_test_tx_group

  7. mybatis.mapper-locations=classpath*:mapper/*Mapper.xml

  8. seata.service.grouplist=172.16.2.101:8091

  9. logging.level.io.seata=info

  10. logging.level.io.seata.samples.account.persistence.AccountMapper=debug

StorageService
  1. public interface StorageService {

  2. /**

  3. * 扣除存储数量

  4. */

  5. void deduct(String commodityCode, int count);

  6. }

订单服务

  1. public interface OrderService {

  2. /**

  3. * 创建订单

  4. */

  5. Order create(String userId, String commodityCode, int orderCount);

  6. }

账户服务

  1. public interface AccountService {

  2. /**

  3. * 从用户账户中借出

  4. */

  5. void debit(String userId, int money);

  6. }

主要业务逻辑

只需要使用一个 @GlobalTransactional 注解在业务方法上。

  1. @GlobalTransactional

  2. public void purchase(String userId, String commodityCode, int orderCount) {

  3. LOGGER.info('purchase begin ... xid: ' + RootContext.getXID());

  4. storageClient.deduct(commodityCode, orderCount);

  5. orderClient.create(userId, commodityCode, orderCount);

  6. }

XID 的传递

全局事务ID的跨服务传递,需要我们自己实现,这里通过拦截器的方式。每个服务都需要添加下面两个类。

SeataFilter
  1. @Component

  2. public class SeataFilter implements Filter {

  3. @Override

  4. public void init(FilterConfig filterConfig) throws ServletException {

  5. }

  6. @Override

  7. public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException {

  8. HttpServletRequest req = (HttpServletRequest) servletRequest;

  9. String xid = req.getHeader(RootContext.KEY_XID.toLowerCase());

  10. boolean isBind = false;

  11. if (StringUtils.isNotBlank(xid)) {

  12. RootContext.bind(xid);

  13. isBind = true;

  14. }

  15. try {

  16. filterChain.doFilter(servletRequest, servletResponse);

  17. } finally {

  18. if (isBind) {

  19. RootContext.unbind();

  20. }

  21. }

  22. }

  23. @Override

  24. public void destroy() {

  25. }

  26. }

SeataRestTemplateAutoConfiguration
  1. @Configuration

  2. public class SeataRestTemplateAutoConfiguration {

  3. @Autowired(

  4. required = false

  5. )

  6. private Collection<RestTemplate> restTemplates;

  7. @Autowired

  8. private SeataRestTemplateInterceptor seataRestTemplateInterceptor;

  9. public SeataRestTemplateAutoConfiguration() {

  10. }

  11. @Bean

  12. public SeataRestTemplateInterceptor seataRestTemplateInterceptor() {

  13. return new SeataRestTemplateInterceptor();

  14. }

  15. @PostConstruct

  16. public void init() {

  17. if (this.restTemplates != null) {

  18. Iterator var1 = this.restTemplates.iterator();

  19. while (var1.hasNext()) {

  20. RestTemplate restTemplate = (RestTemplate) var1.next();

  21. List<ClientHttpRequestInterceptor> interceptors = new ArrayList(restTemplate.getInterceptors());

  22. interceptors.add(this.seataRestTemplateInterceptor);

  23. restTemplate.setInterceptors(interceptors);

  24. }

  25. }

  26. }

  27. }

测试

测试成功场景:

  1. curl -X POST http://127.0.0.1:8084/api/business/purchase/commit

此时返回结果为:true

测试失败场景:

UserId 为1002 的用户下单,sbm-account-service会抛出异常,事务会回滚

  1. http://127.0.0.1:8084/api/business/purchase/rollback

此时返回结果为:false

查看 undo_log 的日志或者主键,可以看到在执行过程中有保存数据。如查看主键自增的值,在执行前后的值会发生变化,在执行前是 1,执行后是 7 。

源码地址

https://github.com/gf-huanchupk/SpringBootLearning/tree/master/springboot-seata

参考

http:///zh-cn/docs/overview/what-is-seata.html

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多