一、何为事务
定义:事务是指多个操作单元组成的合集,多个单元操作是整体不可分割的,要么都操作成功,要么都不成功。 其必须遵循的四个原则(ACID):
原子性(Atomicity -- 美 [ˌætəˈmɪsɪti]):事务是不可分割的最小单位,事务内的操作要么全成功(commit),要么一个失败全失败(rollback)
一致性(Consistency -- 美 [kənˈsɪstənsi]):在事务执行前,数据库的数据处于正确的状态,而事务执行完成后,数据库的数据还应该是处于正确的状态,即数据完整性约束没有被破坏; 比如A向B转了10元钱,涉及的操作有,B+10,A-10,在转钱这个操作(两个action)最终成功的进行事务的提交后,必须保证A的账户金额确实-10,而不是B+10拿到钱后,就不管A-10了(万一A没扣钱,岂不是赚了一笔),否则数据的完整性无法达到两边甚至N边一致。
隔离性(Isolation -- 美 [ˌaɪsəˈleʃən]):并发事务执行之间互不影响,在一个事务内部的操作对其他事务是不会产生影响的,这需要事务隔离级别来指定隔离性;
| | V -- 五大隔离级别(不算默认的话,就是四大):
① Isolation.DEFAULT:使用数据库设置的隔离级别 ( 默认 ) ,由 DBA 默认的设置来决定隔离级别 .
② Isolation.READ_UNCOMMITTED:这是事务最低的隔离级别,它充许别外一个事务可以拿到这个事务未提交的数据。这种隔离级别会产生脏读,不可重复读和幻读
③ Isolation.READ_COMMITTED:保证一个事务修改的数据提交后才能被另外一个事务读取。另外一个事务不能读取该事务未提交的数据。这种事务隔离级别可以避免脏读出现,但是可能会出现不可重复读和幻读
④ Isolation.REPEATABLE_READ: 这种事务隔离级别可以防止脏读,不可重复读。但是可能出现幻读。它除了保证一个事务不能读取另一个事务未提交的数据外,还保证了避免下面的情况产生(不可重复读)。 ⑤ Isolation.SERIALIZABLE :级别最高,花费的代价最高,但却是最可靠的事务隔离级别。事务被处理为顺序执行。除了防止脏读,不可重复读外,还避免了幻读。
| | V
脏读: 一个事务读取了另一个事务未提交的数据 ---> 危险、可怕 A 向 B 转100元钱 == B+100 and A-100 两个操作 如果在上述的操作中,B对应的事务还未提交(此时数据已经update进去了),假设另一端B正在查询自己的账户,会发现:"咦,昨天卡里就100,怎么今天突然成了200",还没等B高兴过来,A那边的ATM机坏掉了,于是所有事务回滚,回到起点,假设B这时候又查了一次(怀疑中),会发现:"咦,奇了怪了,刚才200,怎么现在....." ,再来个大胆的假设,假设A转账的过程非常慢,而B查到了自己账户多出100元,于是乎就兴冲冲的取出来200,就去嗨皮了;结果A那边在漫长的等待后,ATM机子还是坏掉了,所有事务回滚,完了,那100块钱怎么处理?
不可重复读:一个事务读取表中的某条数据,多次读取发现结果不一样 (读取了另一个事务已提交的数据)
假设A的账户里面有100元,A的朋友B想找A借钱,于是就去银行柜台想通过工作人员查询A的账户里有多少钱,在工作人员多次查询的情况下(假设这在一个事务内),巧的是,在工作人员第一次查询的时候,A账户金额还是显示的100,这个时候,另一端的A在ATM机上成功的给自己的账户里面存了100元,于是乎,工作人员第二次查询的时候,电脑屏幕上显示的A的账户金额为200元,这个时候,工作人员在没有确认A是通过ATM机给自己存了100元的前提下,是无法确认这两次查询到底哪一次是正确的,也有可能第一次查询系统出错了,也有可能是第二次查询的时候系统出错了,哈哈,说到这,我感觉很有意思了,总之,不可重复读区别于脏读,脏读读的是未提交的数据,而这个读的是提交的数据。
幻读:一个事务在插入数据时先检测到记录不存在,于是乎准备进行插入,这时候却惊奇的发现刚才检测的不存在的记录居然存在了,这时候第一个事务肯定插不进去了,我们猜测一种情况就是主键冲突,怎么回事呢?原因就在于,事务在插入的时候,另一个事务已经将数据更新,造成了前一个事务有一种见了鬼的感觉。
持久性(Durability -- 美 [ˌdjʊrəˈbɪlətɪ]):事务一旦执行成功,它对数据库的数据的改变必须是永久的,不会因比如遇到系统故障或断电造成数据不一致或丢失。
二、事务分类
1. 数据库事务分为 -- 本地事务 -- 全局事务 本地事务:普通事务,独立一个数据库(Connection),能保证在该数据库上操作的ACID 全局事务(分布式事务):涉及两个或多个数据库源的事务,即跨越多台同类或异类数据库的事务(由每台数据库的本地事务组成的),分布式事务旨在保证这些本地事务的所有操作的ACID,使事务可以跨越多台数据库;
2. Java事务类型分为 -- JDBC事务 跟 -- JTA事务 JDBC事务:即为上面说的数据库事务中的本地事务,通过connection对象控制管理 JTA(Java Transaction API)事务:Java事务API,是Java EE数据库事务规范, JTA只提供了事物管理接口,由应用程序服务器厂商(如WebSphere Application Server)提供实现,JTA事务比JDBC更强大,支持分布式事务。
3. 编程式事务和声明式事务 编程式事务:通过代码在业务执行时根据需要自行实现事务的commit和rollback,粒度更小,可作用在代码块上,缺点:不可复用,重复的代码太多 声明式事务:繁琐的有XML配置,简单粗暴的直接使用@Transactional注解实现
三、什么是Atomikos(以下摘自搜狗百科)
全称:Atomikos TransactionsEssentials 是一个为Java平台提供增值服务的并且开源类事务管理器,以下是包括在这个开源版本中的一些功能: l 全面崩溃 / 重启恢复 l 兼容标准的SUN公司JTA API l 嵌套事务 l 为XA和非XA提供内置的JDBC适配器
注释:XA:XA协议由Tuxedo首先提出的,并交给X/Open组织,作为资源管理器(数据库)与事务管理器的接口标准。目前,Oracle、Informix、DB2、Sybase、MySql、免费开源的Postgresql等各大数据库厂家都提供对XA的支持。XA协议采用两阶段提交方式来管理分布式事务。XA接口提供资源管理器与事务管理器之间进行通信的标准接口。XA协议包括两套函数,以xa_开头的及以ax_开头的。 有人说 XA 是 eXtended Architecture(扩充体系结构) 的缩写, 其实我觉得这仅仅是一种巧合. eXtended Architecture 是一种CD ROM的驱动架构.
以下的函数使事务管理器可以对资源管理器进行的操作: 1)xa_open,xa_close:建立和关闭与资源管理器的连接。 2)xa_start,xa_end:开始和结束一个本地事务。 3)xa_prepare,xa_commit,xa_rollback:预提交、提交和回滚一个本地事务。 4)xa_recover:回滚一个已进行预提交的事务。 5)ax_开头的函数使资源管理器可以动态地在事务管理器中进行注册,并可以对XID(TRANSACTION IDS)进行操作。 6)ax_reg,ax_unreg;允许一个资源管理器在一个TMS(TRANSACTION MANAGER SERVER)中动态注册或撤消注册。
mysql数据库驱动实现 XADataSource接口
postgresql数据库驱动实现 XADataSource接口
四、Spring-Boot+Atomikos+MySql实现多库的分布式事务管理
(1)项目目录结构图
(2)Pom.xml
<project xmlns="http://maven./POM/4.0.0" xmlns:xsi="http://www./2001/XMLSchema-instance" xsi:schemaLocation="http://maven./POM/4.0.0 http://maven./xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.appleyk</groupId> <artifactId>spring-boot-atomikos</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>war</packaging> <description>跨库的分布式事务统一管理</description> <!-- 继承官网最新父POM【假设当前项目不再继承其他POM】 --> <!-- http://projects./spring-boot/#quick-start --> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.9.RELEASE</version> <!-- 使用Java8,嘗試使用新特新【stream和lambda】 --> <java.version>1.8</java.version> <!-- Starter POMs是可以包含到应用中的一个方便的依赖关系描述符集合 --> <!-- 该Starters包含很多你搭建项目, 快速运行所需的依赖, 并提供一致的, 管理的传递依赖集。 --> <!-- 大多数的web应用都使用spring-boot-starter-web模块进行快速搭建和运行。 --> <!-- spring-boot-starter-web --> <!-- 对全栈web开发的支持, 包括Tomcat和 spring-webmvc --> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <!-- 添加Mybatis、Spring-Mybatis依赖 --> <!-- mybatis-spring-boot-starter继承树那是相当全面 --> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <artifactId>mysql-connector-java</artifactId> <!-- https:///artifact/org.postgresql/postgresql --> <groupId>org.postgresql</groupId> <artifactId>postgresql</artifactId> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jta-atomikos</artifactId> <!-- 添加热部署 devtools:监听文件变动 --> <!-- 当Java文件改动时,Spring-boo会快速重新启动 --> <!-- 最简单的测试,就是随便找一个文件Ctrl+S一下,就可以看到效果 --> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <!-- optional=true,依赖不会传递 --> <!-- 本项目依赖devtools;若依赖本项目的其他项目想要使用devtools,需要重新引入 --> <optional>true</optional> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <!-- https:///artifact/tk.mybatis/mapper-spring-boot-starter --> <groupId>tk.mybatis</groupId> <artifactId>mapper-spring-boot-starter</artifactId>
(3)application.properties配置多数据
server.session.timeout=10 server.tomcat.uri-encoding=utf8 mysql.datasource.master.url = jdbc\:mysql\://localhost\:3306/master?useUnicode\=true&characterEncoding\=utf-8 mysql.datasource.master.username = root mysql.datasource.master.password = root mysql.datasource.master.minPoolSize = 3 mysql.datasource.master.maxPoolSize = 25 mysql.datasource.master.maxLifetime = 20000 mysql.datasource.master.borrowConnectionTimeout = 30 mysql.datasource.master.loginTimeout = 30 mysql.datasource.master.maintenanceInterval = 60 mysql.datasource.master.maxIdleTime = 60 mysql.datasource.master.testQuery = select 1 mysql.datasource.slave.url =jdbc\:mysql\://localhost\:3306/slave?useUnicode\=true&characterEncoding\=utf-8 mysql.datasource.slave.username =root mysql.datasource.slave.password =root mysql.datasource.slave.minPoolSize = 3 mysql.datasource.slave.maxPoolSize = 25 mysql.datasource.slave.maxLifetime = 20000 mysql.datasource.slave.borrowConnectionTimeout = 30 mysql.datasource.slave.loginTimeout = 30 mysql.datasource.slave.maintenanceInterval = 60 mysql.datasource.slave.maxIdleTime = 60 mysql.datasource.slave.testQuery = select 1 #在application.properties文件中引入日志配置文件 #===================================== log ============================= logging.config=classpath:logback-boot.xml
(4)日志文件logback-boot.xml配置 (设置日志级别为error,方便输出查看)
<!-- %m输出的信息,%p日志级别,%t线程名,%d日期,%c类的全名,%i索引【从数字0开始递增】,,, --> <!-- appender是configuration的子节点,是负责写日志的组件。 --> <!-- ConsoleAppender:把日志输出到控制台 --> <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> <pattern>%d %p (%file:%line\)- %m%n</pattern> <!-- 控制台也要使用UTF-8,不要使用GBK,否则会中文乱码 --> <!-- RollingFileAppender:滚动记录文件,先将日志记录到指定文件,当符合某个条件时,将日志记录到其他文件 --> <!-- 以下的大概意思是:1.先按日期存日志,日期变了,将前一天的日志文件名重命名为XXX%日期%索引,新的日志仍然是sys.log --> <!-- 2.如果日期没有发生变化,但是当前日志的文件大小超过1KB时,对当前日志进行分割 重命名--> class="ch.qos.logback.core.rolling.RollingFileAppender"> <!-- <File>log/sys.log</File> --> <File>opt/spring-boot-web/logs/sys.log</File> <!-- rollingPolicy:当发生滚动时,决定 RollingFileAppender 的行为,涉及文件移动和重命名。 --> <!-- TimeBasedRollingPolicy: 最常用的滚动策略,它根据时间来制定滚动策略,既负责滚动也负责出发滚动 --> <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"> <!-- 活动文件的名字会根据fileNamePattern的值,每隔一段时间改变一次 --> <!-- 文件名:log/sys.2017-12-05.0.log --> <fileNamePattern>log/sys.%d.%i.log</fileNamePattern> <!-- 每产生一个日志文件,该日志文件的保存期限为30天 --> <maxHistory>30</maxHistory> <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"> <!-- maxFileSize:这是活动文件的大小,默认值是10MB,本篇设置为1KB,只是为了演示 --> <maxFileSize>10MB</maxFileSize> </timeBasedFileNamingAndTriggeringPolicy> <!-- pattern节点,用来设置日志的输入格式 --> %d %p (%file:%line\)- %m%n <charset>UTF-8</charset> <!-- 此处设置字符集 --> <appender-ref ref="STDOUT" /> <!-- 指定项目中某个包,当有日志操作行为时的日志记录级别 --> <!-- com.appley为根包,也就是只要是发生在这个根包下面的所有日志操作行为的权限都是DEBUG --> <!-- 级别依次为【从高到低】:FATAL > ERROR > WARN > INFO > DEBUG > TRACE --> <logger name="com.appleyk" level="error"> <appender-ref ref="syslog" />
(5)mysql数据库
A. 结构 (数据库引擎 InnoDB)
B. sql脚本
master_a.sql
-- Table structure for table `a` DROP TABLE IF EXISTS `a`; `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(45) DEFAULT NULL, `sex` char(2) DEFAULT NULL, `age` int(11) DEFAULT NULL, ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
slave_b.sql
-- Table structure for table `b` DROP TABLE IF EXISTS `b`; `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(45) DEFAULT NULL, `sex` char(2) DEFAULT NULL, `age` int(11) DEFAULT NULL, ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
为了简单,特意将从库的b表设计的和主库的a表一样,不同的只是数据库不一样和表名不一样
(6)加载主库数据源的属性(参数)
MasterConfig.java
package com.appleyk.config; import org.springframework.boot.context.properties.ConfigurationProperties; @ConfigurationProperties(prefix="mysql.datasource.master") public class MasterConfig { private int borrowConnectionTimeout; private int loginTimeout; private int maintenanceInterval; private String testQuery; public void setUrl(String url) { public String getUsername() { public void setUsername(String username) { this.username = username; public String getPassword() { public void setPassword(String password) { this.password = password; public int getMinPoolSize() { public void setMinPoolSize(int minPoolSize) { this.minPoolSize = minPoolSize; public int getMaxPoolSize() { public void setMaxPoolSize(int maxPoolSize) { this.maxPoolSize = maxPoolSize; public int getMaxLifetime() { public void setMaxLifetime(int maxLifetime) { this.maxLifetime = maxLifetime; public int getBorrowConnectionTimeout() { return borrowConnectionTimeout; public void setBorrowConnectionTimeout(int borrowConnectionTimeout) { this.borrowConnectionTimeout = borrowConnectionTimeout; public int getLoginTimeout() { public void setLoginTimeout(int loginTimeout) { this.loginTimeout = loginTimeout; public int getMaintenanceInterval() { return maintenanceInterval; public void setMaintenanceInterval(int maintenanceInterval) { this.maintenanceInterval = maintenanceInterval; public int getMaxIdleTime() { public void setMaxIdleTime(int maxIdleTime) { this.maxIdleTime = maxIdleTime; public String getTestQuery() { public void setTestQuery(String testQuery) { this.testQuery = testQuery;
(7)加载从库数据源的属性(参数)
SlaveConfig.java
package com.appleyk.config; import org.springframework.boot.context.properties.ConfigurationProperties; @ConfigurationProperties(prefix="mysql.datasource.slave") public class SlaveConfig { private int borrowConnectionTimeout; private int loginTimeout; private int maintenanceInterval; private String testQuery; public void setUrl(String url) { public String getUsername() { public void setUsername(String username) { this.username = username; public String getPassword() { public void setPassword(String password) { this.password = password; public int getMinPoolSize() { public void setMinPoolSize(int minPoolSize) { this.minPoolSize = minPoolSize; public int getMaxPoolSize() { public void setMaxPoolSize(int maxPoolSize) { this.maxPoolSize = maxPoolSize; public int getMaxLifetime() { public void setMaxLifetime(int maxLifetime) { this.maxLifetime = maxLifetime; public int getBorrowConnectionTimeout() { return borrowConnectionTimeout; public void setBorrowConnectionTimeout(int borrowConnectionTimeout) { this.borrowConnectionTimeout = borrowConnectionTimeout; public int getLoginTimeout() { public void setLoginTimeout(int loginTimeout) { this.loginTimeout = loginTimeout; public int getMaintenanceInterval() { return maintenanceInterval; public void setMaintenanceInterval(int maintenanceInterval) { this.maintenanceInterval = maintenanceInterval; public int getMaxIdleTime() { public void setMaxIdleTime(int maxIdleTime) { this.maxIdleTime = maxIdleTime; public String getTestQuery() { public void setTestQuery(String testQuery) { this.testQuery = testQuery;
(8)配置主数据源
MasterDBSource.java
package com.appleyk.datasource; import java.sql.SQLException; import javax.sql.DataSource; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.SqlSessionFactoryBean; import org.mybatis.spring.SqlSessionTemplate; import org.mybatis.spring.annotation.MapperScan; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Primary; import com.appleyk.config.MasterConfig; import com.atomikos.jdbc.AtomikosDataSourceBean; import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource; * SqlSessionFactoryBuilder:build方法创建SqlSessionFactory实例。 * SqlSessionFactory:创建SqlSession实例的工厂。 * SqlSession:用于执行持久化操作的对象,类似于jdbc中的Connection。 * SqlSessionTemplate:MyBatis提供的持久层访问模板化的工具,线程安全,可通过构造参数或依赖注入SqlSessionFactory实例 * 主库的数据源模板,应用在主库所对应的Dao层上(扫描对应的mapper),实现主数据源的指定+增删改查 * @author yukun24@126.com * @blob http://blog.csdn.net/appleyk * @date 2018年3月16日-下午1:08:53 @Configuration // ---> 标注此注解,Spring—Boot启动时,会自动进行相应的主数据源配置 -->注入Bean @MapperScan(basePackages = "com.appleyk.mapper.master", sqlSessionTemplateRef = "masterSqlSessionTemplate") public class MasterDBSource { public DataSource testDataSource(MasterConfig masterConfig) throws SQLException { * MySql数据库驱动 实现 XADataSource接口 MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource(); mysqlXaDataSource.setUrl(masterConfig.getUrl()); mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true); mysqlXaDataSource.setPassword(masterConfig.getPassword()); mysqlXaDataSource.setUser(masterConfig.getUsername()); mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true); // * Postgresql数据库驱动 实现 XADataSource // * 包 --> org.postgresql.xa.PGXADataSource; // PGXADataSource pgxaDataSource = new PGXADataSource(); // pgxaDataSource.setUrl(masterConfig.getUrl()); AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean(); xaDataSource.setXaDataSource(mysqlXaDataSource); xaDataSource.setUniqueResourceName("MasterDB"); xaDataSource.setMinPoolSize(masterConfig.getMinPoolSize()); xaDataSource.setMaxPoolSize(masterConfig.getMaxPoolSize()); xaDataSource.setMaxLifetime(masterConfig.getMaxLifetime()); xaDataSource.setBorrowConnectionTimeout(masterConfig.getBorrowConnectionTimeout()); xaDataSource.setLoginTimeout(masterConfig.getLoginTimeout()); xaDataSource.setMaintenanceInterval(masterConfig.getMaintenanceInterval()); xaDataSource.setMaxIdleTime(masterConfig.getMaxIdleTime()); xaDataSource.setTestQuery(masterConfig.getTestQuery()); System.err.println("主数据源注入成功....."); @Bean(name = "masterSqlSessionFactory") public SqlSessionFactory masterSqlSessionFactory(@Qualifier("MasterDB") DataSource dataSource) throws Exception { SqlSessionFactoryBean bean = new SqlSessionFactoryBean(); bean.setDataSource(dataSource); @Bean(name = "masterSqlSessionTemplate") public SqlSessionTemplate masterSqlSessionTemplate( @Qualifier("masterSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception { return new SqlSessionTemplate(sqlSessionFactory);
(9)配置从数据源 (同上,只是改了个名字)
SlaveDBSource.java
package com.appleyk.datasource; import java.sql.SQLException; import javax.sql.DataSource; import org.apache.ibatis.session.SqlSessionFactory; import org.mybatis.spring.SqlSessionFactoryBean; import org.mybatis.spring.SqlSessionTemplate; import org.mybatis.spring.annotation.MapperScan; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import com.appleyk.config.SlaveConfig; import com.atomikos.jdbc.AtomikosDataSourceBean; import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource; * SqlSessionFactoryBuilder:build方法创建SqlSessionFactory实例。 * SqlSessionFactory:创建SqlSession实例的工厂。 * SqlSession:用于执行持久化操作的对象,类似于jdbc中的Connection。 * SqlSessionTemplate:MyBatis提供的持久层访问模板化的工具,线程安全,可通过构造参数或依赖注入SqlSessionFactory实例 * 从库的数据源模板,应用在从库所对应的Dao层上(扫描对应的mapper),实现从数据源的指定+增删改查 * @author yukun24@126.com * @blob http://blog.csdn.net/appleyk * @date 2018年3月16日-下午1:08:53 @Configuration // ---> 标注此注解,Spring—Boot启动时,会自动进行相应的从数据源配置 -->注入Bean @MapperScan(basePackages = "com.appleyk.mapper.slave", sqlSessionTemplateRef = "slaveSqlSessionTemplate") public class SlaveDBSource { public DataSource testDataSource(SlaveConfig slaveConfig) throws SQLException { MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource(); mysqlXaDataSource.setUrl(slaveConfig.getUrl()); mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true); mysqlXaDataSource.setPassword(slaveConfig.getPassword()); mysqlXaDataSource.setUser(slaveConfig.getUsername()); mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true); AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean(); xaDataSource.setXaDataSource(mysqlXaDataSource); xaDataSource.setUniqueResourceName("SlaveDB"); xaDataSource.setMinPoolSize(slaveConfig.getMinPoolSize()); xaDataSource.setMaxPoolSize(slaveConfig.getMaxPoolSize()); xaDataSource.setMaxLifetime(slaveConfig.getMaxLifetime()); xaDataSource.setBorrowConnectionTimeout(slaveConfig.getBorrowConnectionTimeout()); xaDataSource.setLoginTimeout(slaveConfig.getLoginTimeout()); xaDataSource.setMaintenanceInterval(slaveConfig.getMaintenanceInterval()); xaDataSource.setMaxIdleTime(slaveConfig.getMaxIdleTime()); xaDataSource.setTestQuery(slaveConfig.getTestQuery()); System.err.println("从数据源注入成功....."); @Bean(name = "slaveSqlSessionFactory") public SqlSessionFactory masterSqlSessionFactory(@Qualifier("SlaveDB") DataSource dataSource) throws Exception { SqlSessionFactoryBean bean = new SqlSessionFactoryBean(); bean.setDataSource(dataSource); @Bean(name = "slaveSqlSessionTemplate") public SqlSessionTemplate slaveSqlSessionTemplate( @Qualifier("slaveSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception { return new SqlSessionTemplate(sqlSessionFactory);
看似没有配置事务管理器,其实atomikos已经在暗处给我们提供了一个全局性的分布式事务管理器,无需担心,好吧
(10)Spring-Boot全局启动入口
Application.java
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import com.appleyk.config.MasterConfig; import com.appleyk.config.SlaveConfig; @SpringBootApplication // same as @Configuration @EnableAutoConfiguration @ComponentScan @EnableConfigurationProperties(value = { MasterConfig.class, SlaveConfig.class }) public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args);
启动Spring-Boot
(11)准备DAO层
利用mybatis的通用mapper,先为主库master和从库slave的表a和表b分别进行Java实体映射
A.java
package com.appleyk.entity; import javax.persistence.Table; public void setId(Integer id) { public String getName() { public void setName(String name) { public void setSex(String sex) { public Integer getAge() { public void setAge(Integer age) {
这种映射很简单,就是和表a的字段一一对应
B.java 的内容和 A.java的内容一模一样,不同的是下面这个地方
利用mybatis的通用mapper,再为主库master和从库slave的表a和表b分别进行mapper接口的增删改查实现
AMapper.java
package com.appleyk.mapper.master; import com.appleyk.entity.A; import tk.mybatis.mapper.common.Mapper; public interface AMapepr extends Mapper<A>{
是的,你没看错,里面一句增删改查的代码都没有,就是这么通用好使,B的mapper和A的一样,如下
Dao层布置完,准备Service层,走业务逻辑
(12)准备Service层
分布式事务应用场景:
有一个数据,格式为json串,序列化后实则为一个对象,假设是A,现master库需要存储A,而slave库由于业务需要也要存储这个对象A(通过转化A对象为B对象),于是乎,我们定义一个ObjectService,作为整个存储操作的入口服务
ObjectService.javapackage com.appleyk.service; import com.appleyk.entity.A; public interface ObjectService { boolean Save(A a) throws Exception;
master库存储A对象的接口为
AService.java package com.appleyk.service; import com.appleyk.entity.A; public interface AService {
其实现为
AServiceImpl.java package com.appleyk.service.Impl; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Service; import com.appleyk.entity.A; import com.appleyk.mapper.master.AMapepr; import com.appleyk.service.AService; public class AServiceImpl implements AService { public boolean SaveA(A a) { return aMapper.insert(a) > 0;
slave库存储B对象的接口为
BService.java package com.appleyk.service; import com.appleyk.entity.B; public interface BService { boolean SaveB(B b) throws Exception;
其实现为
BServiceImpl.java package com.appleyk.service.Impl; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Service; import com.appleyk.entity.B; import com.appleyk.mapper.slave.BMapepr; import com.appleyk.service.BService; public class BServiceImpl implements BService { public boolean SaveB(B b) throws Exception{ int count = bMapper.insert(b); if(b.getName().length()>5){ System.err.println("B事务回滚"); throw new Exception("名称超过5"); System.err.println("B事务提交");
放大招了,放大招了,我们看ObjectService的实现
ObjectServiceImpl.java
package com.appleyk.service.Impl; import java.sql.SQLException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Primary; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import com.appleyk.entity.A; import com.appleyk.entity.B; import com.appleyk.service.AService; import com.appleyk.service.BService; import com.appleyk.service.ObjectService; public class ObjectServiceImpl implements ObjectService { private AService aService; private BService bService; @Transactional(rollbackFor = { Exception.class, SQLException.class }) public boolean Save(A a) throws Exception { if (!aService.SaveA(a)) { if (!bService.SaveB(b)) { System.err.println("A事务回滚"); throw new Exception("我的错,保存B异常"); System.err.println("A事务提交");
(13)Controller层对外提供Restful风格的API接口
ObjectController.java
package com.appleyk.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import com.appleyk.entity.A; import com.appleyk.result.ResponseMessage; import com.appleyk.result.ResponseResult; import com.appleyk.service.ObjectService; @RequestMapping("/rest/v1.0.1/object") public class ObjectController { private ObjectService objService; public ResponseResult SaveObject(@RequestBody A a) throws Exception { if (objService.Save(a)) { return new ResponseResult(ResponseMessage.OK); return new ResponseResult(ResponseMessage.INTERNAL_SERVER_ERROR);
主库的a表数据集空
从库的b表数据集也空
(15)利用Insomnia进行API测试
json数据
启动项目
测试(异常的)
保存对象的时候提示了异常,别慌,我们看一下后台输出的内容是不是按照我们假定的方式走的
mysql控制台验证一把
测试(正常的) -- 我们传入name的时候,长度设置小点,比如 name = kobe
后台输出
mysql可视化工具验证走一把
掉个头,我们来让A对象存储的时候,发生异常,而且抛出的异常还是未做检查的
测试(异常的)
后台输出
由于ArithmeticException继承Exception(异常的基类),而我们又设置了
所以,整个分布式事务会进行回滚,A对象和B对象都将无法正确的进行存储
mysql控制台进行验证
|