3. 分布式事务解决方案之2PC(两阶段提交)针对不同的分布式场景业界常见的解决方案有2PC、TCC、可靠消息最终一致性、最大努力通知这几种。 3.1. 什么是2PC2PC即两阶段提交协议,是将整个事务流程分为两个阶段,准备阶段(Prepare phase)、提交阶段(commit phase),2是指两阶段,P是指准备阶段,C是提交阶段。 在这里插入图片描述 失败情况 : 在这里插入图片描述 3.2. 解决方案3.2.1 XA方案2PC的传统方案是在数据库层面实现的,如Oracle、MySQL都支持2PC协议,为了统一标准减少行业内不必要的对接成本,需要制定标准化的处理模型及接口标准,国际开放标准组织Open Group定义分布式事务处理模型DTP(Distributed Transaction Processing Reference Model)。 在这里插入图片描述 执行流程如下 : 1、应用程序(AP)持有用户库和积分库两个数据源。 2、应用程序(AP)通过TM通知用户库RM新增用户,同时通知积分库RM为该用户新增积分,RM此时并未提交事务,此时用户和积分资源锁定。 3、TM收到执行回复,只要有一方失败则分别向其他RM发起回滚事务,回滚完毕,资源锁释放。 4、TM收到执行回复,全部成功,此时向所有RM发起提交事务,提交完毕,资源锁释放。 DTP模型定义如下角色 :
3.2.2 Seata方案Seata是阿里中间件团队发起的开源项目Fescar,后更名Seata,它是一个是开源的分布式事务框架。传统2PC的问题在Seata中得到了解决,它通过对本地关系数据库的分支事务的协调来驱动完成全局事务,是工作在应用层的中间件。主要优点是性能较好,且不长时间占用连接资源,它以高效并且对业务0入侵的方式解决微服务场景下面临的分布式事务问题,它目前提供AT模式(即2PC)及TCC模式的分布式事务解决方案。 在这里插入图片描述 与传统2PC的模型类似,Seata定义了三个组件来协议分布式事务的处理过程 : 在这里插入图片描述
Seata实现2PC与传统2PC的差别 : 3.3. Seata实现2PC事务3.3.1. 业务说明本实例通过Seata中间件实现分布式事务,模拟两个账户的转账交易过程。两个账户在两个不同的银行(张三在bank1、李四在bank2),bank1和bank2是两个微服务。交易过程中,张三给李四转账制定金额。 在这里插入图片描述 3.3.2.程序组成部分本实例程序组成 部分如下 : 在这里插入图片描述 交互流程如下 : 3.3.3.创建数据库bank1库,包含张三账户 CREATE DATABASE /*!32312 IF NOT EXISTS*/`bank1` /*!40100 DEFAULT CHARACTER SET utf8 */;USE `bank1`;/*Table structure for table `account_info` */DROP TABLE IF EXISTS `account_info`;CREATE TABLE `account_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `account_name` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '户主姓名', `account_no` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '银行卡号', `account_password` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '帐户密码', `account_balance` double DEFAULT NULL COMMENT '帐户余额', PRIMARY KEY (`id`) USING BTREE) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;/*Data for the table `account_info` */insert into `account_info`(`id`,`account_name`,`account_no`,`account_password`,`account_balance`) values (2,'张三','1',NULL,1000);/*Table structure for table `de_duplication` */DROP TABLE IF EXISTS `de_duplication`;CREATE TABLE `de_duplication` ( `tx_no` varchar(64) COLLATE utf8_bin NOT NULL, `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;/*Data for the table `de_duplication` *//*Table structure for table `local_cancel_log` */DROP TABLE IF EXISTS `local_cancel_log`;CREATE TABLE `local_cancel_log` ( `tx_no` varchar(64) NOT NULL COMMENT '事务id', `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;/*Data for the table `local_cancel_log` *//*Table structure for table `local_confirm_log` */DROP TABLE IF EXISTS `local_confirm_log`;CREATE TABLE `local_confirm_log` ( `tx_no` varchar(64) NOT NULL COMMENT '事务id', `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;/*Data for the table `local_confirm_log` *//*Table structure for table `local_trade_log` */DROP TABLE IF EXISTS `local_trade_log`;CREATE TABLE `local_trade_log` ( `tx_no` bigint(20) NOT NULL, `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;DROP TABLE IF EXISTS `local_try_log`;CREATE TABLE `local_try_log` ( `tx_no` varchar(64) NOT NULL COMMENT '事务id', `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;/*Data for the table `local_try_log` *//*Table structure for table `undo_log` */DROP TABLE IF EXISTS `undo_log`;CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) NOT NULL, `context` varchar(128) NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime NOT NULL, `log_modified` datetime NOT NULL, `ext` varchar(100) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)) ENGINE=InnoDB AUTO_INCREMENT=167 DEFAULT CHARSET=utf8;/*Data for the table `undo_log` */insert into `undo_log`(`id`,`branch_id`,`xid`,`context`,`rollback_info`,`log_status`,`log_created`,`log_modified`,`ext`) values (166,2019228885,'192.168.1.101:8888:2019228047','serializer=jackson','{}',1,'2019-08-11 15:16:43','2019-08-11 15:16:43',NULL); bank2库,包含李四账户 CREATE DATABASE /*!32312 IF NOT EXISTS*/`bank2` /*!40100 DEFAULT CHARACTER SET utf8 */;USE `bank2`;/*Table structure for table `account_info` */DROP TABLE IF EXISTS `account_info`;CREATE TABLE `account_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `account_name` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '户主姓名', `account_no` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '银行卡号', `account_password` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '帐户密码', `account_balance` double DEFAULT NULL COMMENT '帐户余额', PRIMARY KEY (`id`) USING BTREE) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;/*Data for the table `account_info` */insert into `account_info`(`id`,`account_name`,`account_no`,`account_password`,`account_balance`) values (3,'李四的账户','2',NULL,0);/*Table structure for table `de_duplication` */DROP TABLE IF EXISTS `de_duplication`;CREATE TABLE `de_duplication` ( `tx_no` varchar(64) COLLATE utf8_bin NOT NULL, `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;/*Data for the table `de_duplication` *//*Table structure for table `local_cancel_log` */DROP TABLE IF EXISTS `local_cancel_log`;CREATE TABLE `local_cancel_log` ( `tx_no` varchar(64) NOT NULL COMMENT '事务id', `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;/*Data for the table `local_cancel_log` *//*Table structure for table `local_confirm_log` */DROP TABLE IF EXISTS `local_confirm_log`;CREATE TABLE `local_confirm_log` ( `tx_no` varchar(64) NOT NULL COMMENT '事务id', `create_time` datetime DEFAULT NULL) ENGINE=InnoDB DEFAULT CHARSET=utf8;/*Data for the table `local_confirm_log` *//*Table structure for table `local_trade_log` */DROP TABLE IF EXISTS `local_trade_log`;CREATE TABLE `local_trade_log` ( `tx_no` bigint(20) NOT NULL, `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;DROP TABLE IF EXISTS `local_try_log`;CREATE TABLE `local_try_log` ( `tx_no` varchar(64) NOT NULL COMMENT '事务id', `create_time` datetime DEFAULT NULL, PRIMARY KEY (`tx_no`)) ENGINE=InnoDB DEFAULT CHARSET=utf8;/*Data for the table `local_try_log` *//*Table structure for table `undo_log` */DROP TABLE IF EXISTS `undo_log`;CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) NOT NULL, `context` varchar(128) NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime NOT NULL, `log_modified` datetime NOT NULL, `ext` varchar(100) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)) ENGINE=InnoDB DEFAULT CHARSET=utf8; 3.3.4.启动TC(事务协调器)(1)下载seata服务器 在这里插入图片描述 如上图出现“Server started。。。“的字样则表示启动成功。 3.3.5 discover-serverdiscover-server是服务注册中心,测试工程将自己注册至discover-server。 3.3.6 创建dtx-seata-demodtx-seata-demo是seata的测试工程,根据业务需求需要创建两个dex-seata-demo工程。 在这里插入图片描述 在dtx-seata-demo父工程中指定了spring-cloud-alibaba-dependencies的版本。 在这里插入图片描述 (3)配置seata 在src/main/resource中,新增registry.conf、file.conf文件,内容可拷贝seata-server-0.7.1中的配置文件子。 在registry.conf中registry.type使用file: 在这里插入图片描述 在file.conf中更改service.vgroup_mapping.[springcloud服务名]-fescar-service-group = "default",并修改 service.default.grouplist =[seata服务端地址] 在这里插入图片描述 关于vgroup_mapping的配置: @Configurationpublic class DatabaseConfiguration { @Bean @ConfigurationProperties(prefix = "spring.datasource.ds0") public DruidDataSource ds0() { DruidDataSource druidDataSource = new DruidDataSource(); return druidDataSource;} @Primary @Bean public DataSource dataSource(DruidDataSource ds0) { DataSourceProxy pds0 = new DataSourceProxy(ds0); return pds0; }} 3.3.7 Seata执行流程1、正常提交流程 在这里插入图片描述 2、回滚流程 在这里插入图片描述 要点说明 : 1、每个RM使用DataSourceProxy连接数据库,其目的是使用ConnectionProxy,使用数据源和数据连接代理的目的就是第一阶段将undo_log和业务数据放在一个本地事务提交,这样就保存了只要有业务操作就一定有undo_log. 2、在第一阶段undo_log中存放了数据修改前和修改后的值,为事务回滚作好准备,所以第一阶段完成就已经将分支事务提交,也就释放了锁资源。 3.3.8 dtx-seata-demo-bank1dtx-seata-demo-bank1实现如下功能: @Mapper@Componentpublic interface AccountInfoDao { //更新账户金额@Update("update account_info set account_balance = account_balance + #{amount} where account_no = #{accountNo}")int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);} (2) FeignClient @FeignClient(value = "seata‐demo‐bank2",fallback = Bank2ClientFallback.class) public interface Bank2Client {@GetMapping("/bank2/transfer")String transfer(@RequestParam("amount") Double amount); }@Componentpublic class Bank2ClientFallback implements Bank2Client{@Overridepublic String transfer(Double amount) {return "fallback"; }} (3)Service @Servicepublic class AccountInfoServiceImpl implements AccountInfoService {private Logger logger = LoggerFactory.getLogger(AccountInfoServiceImpl.class);@AutowiredAccountInfoDao accountInfoDao;@AutowiredBank2Client bank2Client;//张三转账@Override@GlobalTransactional@Transactionalpublic void updateAccountBalance(String accountNo, Double amount) { logger.info("******** Bank1 Service Begin ... xid: {}" , RootContext.getXID()); //张三扣减金额 accountInfoDao.updateAccountBalance(accountNo,amount*‐1); //向李四转账 String remoteRst = bank2Client.transfer(amount); //远程调用失败 if(remoteRst.equals("fallback")){ throw new RuntimeException("bank1 下游服务异常"); } //人为制造错误 if(amount==3){ throw new RuntimeException("bank1 make exception 3"); } } } 将@GlobalTransactional注解标注在全局事务发起的Service实现方法上,开启全局事务 : @RestControllerpublic class Bank1Controller {@AutowiredAccountInfoService accountInfoService;//转账@GetMapping("/transfer")public String transfer(Double amount){accountInfoService.updateAccountBalance("1",amount);return "bank1"+amount; }} 3.3.9 dtx-seata-demo-bank2dtx-seata-demo-bank2实现如下功能: @Mapper@Componentpublic interface AccountInfoDao {//向李四转账@Update("UPDATE account_info SET account_balance = account_balance + #{amount} WHERE account_no = #{accountNo}")int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);} (2)Service @Servicepublic class AccountInfoServiceImpl implements AccountInfoService {private Logger logger = LoggerFactory.getLogger(AccountInfoServiceImpl.class);@AutowiredAccountInfoDao accountInfoDao;@Override @Transactionalpublic void updateAccountBalance(String accountNo, Double amount) { logger.info("******** Bank2 Service Begin ... xid: {}" , RootContext.getXID()); //李四增加金额accountInfoDao.updateAccountBalance(accountNo,amount);//制造异常if(amount==2){throw new RuntimeException("bank1 make exception 2"); }} } (3)Controller @RestControllerpublic class Bank2Controller {@AutowiredAccountInfoService accountInfoService;@GetMapping("/transfer")public String transfer(Double amount){accountInfoService.updateAccountBalance("2",amount);return "bank2"+amount; }} 3.3.10 测试场景
3.4. 小结传统2PC(基于数据库XA协议)和Seata实现2PC的两种2PC方案,由于Seata的零入侵并且解决了传统2PC长期锁资源的问题,所以推荐采用Seata实现2PC。 |
|
来自: liang1234_ > 《分布式事物》