分享

Spring

 甘甘灰 2018-08-18

一、何为事务


定义:事务是指多个操作单元组成的合集,多个单元操作是整体不可分割的,要么都操作成功,要么都不成功。

其必须遵循的四个原则(ACID):


原子性(Atomicity -- 美 [ˌætəˈmɪsɪti]):事务是不可分割的最小单位,事务内的操作要么全成功(commit),要么一个失败全失败(rollback)

一致性(Consistency --  美 [kənˈsɪstənsi]):在事务执行前,数据库的数据处于正确的状态,而事务执行完成后,数据库的数据还应该是处于正确的状态,即数据完整性约束没有被破坏; 比如A向B转了10元钱,涉及的操作有,B+10,A-10,在转钱这个操作(两个action)最终成功的进行事务的提交后,必须保证A的账户金额确实-10,而不是B+10拿到钱后,就不管A-10了(万一A没扣钱,岂不是赚了一笔),否则数据的完整性无法达到两边甚至N边一致。

隔离性(Isolation -- 美 [ˌaɪsəˈleʃən]):并发事务执行之间互不影响,在一个事务内部的操作对其他事务是不会产生影响的,这需要事务隔离级别来指定隔离性;

       |

       |

      V

    --   五大隔离级别(不算默认的话,就是四大):


① Isolation.DEFAULT使用数据库设置的隔离级别 ( 默认 ) ,由 DBA 默认的设置来决定隔离级别 .

② Isolation.READ_UNCOMMITTED这是事务最低的隔离级别,它充许别外一个事务可以拿到这个事务未提交的数据。这种隔离级别会产生脏读,不可重复读和幻读

③ Isolation.READ_COMMITTED保证一个事务修改的数据提交后才能被另外一个事务读取。另外一个事务不能读取该事务未提交的数据。这种事务隔离级别可以避免脏读出现,但是可能会出现不可重复读和幻读

④ Isolation.REPEATABLE_READ: 这种事务隔离级别可以防止脏读,不可重复读。但是可能出现幻读。它除了保证一个事务不能读取另一个事务未提交的数据外,还保证了避免下面的情况产生(不可重复读)。

⑤ Isolation.SERIALIZABLE :级别最高,花费的代价最高,但却是最可靠的事务隔离级别。事务被处理为顺序执行。除了防止脏读,不可重复读外,还避免了幻读。


       |

       |

      V


脏读: 一个事务读取了另一个事务未提交的数据     ---> 危险、可怕

               A 向 B 转100元钱 ==   B+100 and A-100  两个操作

              如果在上述的操作中,B对应的事务还未提交(此时数据已经update进去了),假设另一端B正在查询自己的账户,会发现:"咦,昨天卡里就100,怎么今天突然成了200",还没等B高兴过来,A那边的ATM机坏掉了,于是所有事务回滚,回到起点,假设B这时候又查了一次(怀疑中),会发现:"咦,奇了怪了,刚才200,怎么现在....." ,再来个大胆的假设,假设A转账的过程非常慢,而B查到了自己账户多出100元,于是乎就兴冲冲的取出来200,就去嗨皮了;结果A那边在漫长的等待后,ATM机子还是坏掉了,所有事务回滚,完了,那100块钱怎么处理?


不可重复读:一个事务读取表中的某条数据,多次读取发现结果不一样 (读取了另一个事务已提交的数据)

                假设A的账户里面有100元,A的朋友B想找A借钱,于是就去银行柜台想通过工作人员查询A的账户里有多少钱,在工作人员多次查询的情况下(假设这在一个事务内),巧的是,在工作人员第一次查询的时候,A账户金额还是显示的100,这个时候,另一端的A在ATM机上成功的给自己的账户里面存了100元,于是乎,工作人员第二次查询的时候,电脑屏幕上显示的A的账户金额为200元,这个时候,工作人员在没有确认A是通过ATM机给自己存了100元的前提下,是无法确认这两次查询到底哪一次是正确的,也有可能第一次查询系统出错了,也有可能是第二次查询的时候系统出错了,哈哈,说到这,我感觉很有意思了,总之,不可重复读区别于脏读,脏读读的是未提交的数据,而这个读的是提交的数据。


 幻读:一个事务在插入数据时先检测到记录不存在,于是乎准备进行插入,这时候却惊奇的发现刚才检测的不存在的记录居然存在了,这时候第一个事务肯定插不进去了,我们猜测一种情况就是主键冲突,怎么回事呢?原因就在于,事务在插入的时候,另一个事务已经将数据更新,造成了前一个事务有一种见了鬼的感觉。


持久性(Durability -- 美 [ˌdjʊrəˈbɪlətɪ]):事务一旦执行成功,它对数据库的数据的改变必须是永久的,不会因比如遇到系统故障或断电造成数据不一致或丢失。




二、事务分类


1. 数据库事务分为   -- 本地事务   -- 全局事务

       本地事务:普通事务,独立一个数据库(Connection),能保证在该数据库上操作的ACID

       全局事务(分布式事务):涉及两个或多个数据库源的事务,即跨越多台同类或异类数据库的事务(由每台数据库的本地事务组成的),分布式事务旨在保证这些本地事务的所有操作的ACID,使事务可以跨越多台数据库;


2. Java事务类型分为 -- JDBC事务 跟 -- JTA事务

       JDBC事务:即为上面说的数据库事务中的本地事务,通过connection对象控制管理

       JTA(Java Transaction API)事务:Java事务API,是Java EE数据库事务规范, JTA只提供了事物管理接口,由应用程序服务器厂商(如WebSphere Application Server)提供实现,JTA事务比JDBC更强大,支持分布式事务


3. 编程式事务和声明式事务

       编程式事务:通过代码在业务执行时根据需要自行实现事务的commit和rollback,粒度更小,可作用在代码块上,缺点:不可复用,重复的代码太多

       声明式事务:繁琐的有XML配置,简单粗暴的直接使用@Transactional注解实现



三、什么是Atomikos(以下摘自搜狗百科)


全称:Atomikos TransactionsEssentials 是一个为Java平台提供增值服务的并且开源类事务管理器,以下是包括在这个开源版本中的一些功能:

l 全面崩溃 / 重启恢复

l 兼容标准的SUN公司JTA API

l 嵌套事务

l 为XA和非XA提供内置的JDBC适配器


注释:XA:XA协议由Tuxedo首先提出的,并交给X/Open组织,作为资源管理器(数据库)与事务管理器的接口标准。目前,Oracle、Informix、DB2、Sybase、MySql、免费开源的Postgresql等各大数据库厂家都提供对XA的支持。XA协议采用两阶段提交方式来管理分布式事务。XA接口提供资源管理器与事务管理器之间进行通信的标准接口。XA协议包括两套函数,以xa_开头的及以ax_开头的。

有人说 XA 是 eXtended Architecture扩充体系结构) 的缩写, 其实我觉得这仅仅是一种巧合. eXtended Architecture 是一种CD ROM的驱动架构.


以下的函数使事务管理器可以对资源管理器进行的操作:

1)xa_open,xa_close:建立和关闭与资源管理器的连接。

2)xa_start,xa_end:开始和结束一个本地事务。

3)xa_prepare,xa_commit,xa_rollback:预提交、提交和回滚一个本地事务。

4)xa_recover:回滚一个已进行预提交的事务。

5)ax_开头的函数使资源管理器可以动态地在事务管理器中进行注册,并可以对XID(TRANSACTION IDS)进行操作。

6)ax_reg,ax_unreg;允许一个资源管理器在一个TMS(TRANSACTION MANAGER SERVER)中动态注册或撤消注册。



mysql数据库驱动实现 XADataSource接口





postgresql数据库驱动实现 XADataSource接口







四、Spring-Boot+Atomikos+MySql实现多库的分布式事务管理



(1)项目目录结构图




(2)Pom.xml


  1. <project xmlns="http://maven./POM/4.0.0" xmlns:xsi="http://www./2001/XMLSchema-instance"
  2. xsi:schemaLocation="http://maven./POM/4.0.0 http://maven./xsd/maven-4.0.0.xsd">
  3. <modelVersion>4.0.0</modelVersion>
  4. <groupId>com.appleyk</groupId>
  5. <artifactId>spring-boot-atomikos</artifactId>
  6. <version>0.0.1-SNAPSHOT</version>
  7. <packaging>war</packaging>
  8. <name>atomikos</name>
  9. <description>跨库的分布式事务统一管理</description>
  10. <!-- 继承官网最新父POM【假设当前项目不再继承其他POM】 -->
  11. <!-- http://projects./spring-boot/#quick-start -->
  12. <parent>
  13. <groupId>org.springframework.boot</groupId>
  14. <artifactId>spring-boot-starter-parent</artifactId>
  15. <version>1.5.9.RELEASE</version>
  16. </parent>
  17. <!-- 使用Java8,嘗試使用新特新【stream和lambda】 -->
  18. <properties>
  19. <java.version>1.8</java.version>
  20. </properties>
  21. <!-- Starter POMs是可以包含到应用中的一个方便的依赖关系描述符集合 -->
  22. <!-- 该Starters包含很多你搭建项目, 快速运行所需的依赖, 并提供一致的, 管理的传递依赖集。 -->
  23. <!-- 大多数的web应用都使用spring-boot-starter-web模块进行快速搭建和运行。 -->
  24. <!-- spring-boot-starter-web -->
  25. <!-- 对全栈web开发的支持, 包括Tomcat和 spring-webmvc -->
  26. <dependencies>
  27. <dependency>
  28. <groupId>org.springframework.boot</groupId>
  29. <artifactId>spring-boot-starter-web</artifactId>
  30. </dependency>
  31. <!-- 添加Mybatis、Spring-Mybatis依赖 -->
  32. <!-- mybatis-spring-boot-starter继承树那是相当全面 -->
  33. <dependency>
  34. <groupId>org.mybatis.spring.boot</groupId>
  35. <artifactId>mybatis-spring-boot-starter</artifactId>
  36. <version>1.1.1</version>
  37. </dependency>
  38. <!-- MySql驱动依赖 -->
  39. <dependency>
  40. <groupId>mysql</groupId>
  41. <artifactId>mysql-connector-java</artifactId>
  42. </dependency>
  43. <!-- https:///artifact/org.postgresql/postgresql -->
  44. <!-- PostGresQl驱动依赖 -->
  45. <dependency>
  46. <groupId>org.postgresql</groupId>
  47. <artifactId>postgresql</artifactId>
  48. </dependency>
  49. <dependency>
  50. <groupId>org.springframework.boot</groupId>
  51. <artifactId>spring-boot-starter-jta-atomikos</artifactId>
  52. </dependency>
  53. <!-- 添加热部署 devtools:监听文件变动 -->
  54. <!-- 当Java文件改动时,Spring-boo会快速重新启动 -->
  55. <!-- 最简单的测试,就是随便找一个文件Ctrl+S一下,就可以看到效果 -->
  56. <dependency>
  57. <groupId>org.springframework.boot</groupId>
  58. <artifactId>spring-boot-devtools</artifactId>
  59. <!-- optional=true,依赖不会传递 -->
  60. <!-- 本项目依赖devtools;若依赖本项目的其他项目想要使用devtools,需要重新引入 -->
  61. <optional>true</optional>
  62. </dependency>
  63. <!-- Spring 单元测试 -->
  64. <dependency>
  65. <groupId>org.springframework.boot</groupId>
  66. <artifactId>spring-boot-starter-test</artifactId>
  67. <scope>test</scope>
  68. </dependency>
  69. <!-- https:///artifact/tk.mybatis/mapper-spring-boot-starter -->
  70. <!-- mybatis通用mapper -->
  71. <dependency>
  72. <groupId>tk.mybatis</groupId>
  73. <artifactId>mapper-spring-boot-starter</artifactId>
  74. <version>1.1.5</version>
  75. </dependency>
  76. </dependencies>
  77. </project>



(3)application.properties配置多数据


  1. server.port=8088
  2. server.session.timeout=10
  3. server.tomcat.uri-encoding=utf8
  4. #主数据源 -- Master
  5. mysql.datasource.master.url = jdbc\:mysql\://localhost\:3306/master?useUnicode\=true&characterEncoding\=utf-8
  6. mysql.datasource.master.username = root
  7. mysql.datasource.master.password = root
  8. mysql.datasource.master.minPoolSize = 3
  9. mysql.datasource.master.maxPoolSize = 25
  10. mysql.datasource.master.maxLifetime = 20000
  11. mysql.datasource.master.borrowConnectionTimeout = 30
  12. mysql.datasource.master.loginTimeout = 30
  13. mysql.datasource.master.maintenanceInterval = 60
  14. mysql.datasource.master.maxIdleTime = 60
  15. mysql.datasource.master.testQuery = select 1
  16. #从数据源 -- Slave
  17. mysql.datasource.slave.url =jdbc\:mysql\://localhost\:3306/slave?useUnicode\=true&characterEncoding\=utf-8
  18. mysql.datasource.slave.username =root
  19. mysql.datasource.slave.password =root
  20. mysql.datasource.slave.minPoolSize = 3
  21. mysql.datasource.slave.maxPoolSize = 25
  22. mysql.datasource.slave.maxLifetime = 20000
  23. mysql.datasource.slave.borrowConnectionTimeout = 30
  24. mysql.datasource.slave.loginTimeout = 30
  25. mysql.datasource.slave.maintenanceInterval = 60
  26. mysql.datasource.slave.maxIdleTime = 60
  27. mysql.datasource.slave.testQuery = select 1
  28. #在application.properties文件中引入日志配置文件
  29. #===================================== log =============================
  30. logging.config=classpath:logback-boot.xml



(4)日志文件logback-boot.xml配置 (设置日志级别为error,方便输出查看)


  1. <configuration>
  2. <!-- %m输出的信息,%p日志级别,%t线程名,%d日期,%c类的全名,%i索引【从数字0开始递增】,,, -->
  3. <!-- appender是configuration的子节点,是负责写日志的组件。 -->
  4. <!-- ConsoleAppender:把日志输出到控制台 -->
  5. <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
  6. <encoder>
  7. <pattern>%d %p (%file:%line\)- %m%n</pattern>
  8. <!-- 控制台也要使用UTF-8,不要使用GBK,否则会中文乱码 -->
  9. <charset>UTF-8</charset>
  10. </encoder>
  11. </appender>
  12. <!-- RollingFileAppender:滚动记录文件,先将日志记录到指定文件,当符合某个条件时,将日志记录到其他文件 -->
  13. <!-- 以下的大概意思是:1.先按日期存日志,日期变了,将前一天的日志文件名重命名为XXX%日期%索引,新的日志仍然是sys.log -->
  14. <!-- 2.如果日期没有发生变化,但是当前日志的文件大小超过1KB时,对当前日志进行分割 重命名-->
  15. <appender name="syslog"
  16. class="ch.qos.logback.core.rolling.RollingFileAppender">
  17. <!-- <File>log/sys.log</File> -->
  18. <File>opt/spring-boot-web/logs/sys.log</File>
  19. <!-- rollingPolicy:当发生滚动时,决定 RollingFileAppender 的行为,涉及文件移动和重命名。 -->
  20. <!-- TimeBasedRollingPolicy: 最常用的滚动策略,它根据时间来制定滚动策略,既负责滚动也负责出发滚动 -->
  21. <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
  22. <!-- 活动文件的名字会根据fileNamePattern的值,每隔一段时间改变一次 -->
  23. <!-- 文件名:log/sys.2017-12-05.0.log -->
  24. <fileNamePattern>log/sys.%d.%i.log</fileNamePattern>
  25. <!-- 每产生一个日志文件,该日志文件的保存期限为30天 -->
  26. <maxHistory>30</maxHistory>
  27. <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
  28. <!-- maxFileSize:这是活动文件的大小,默认值是10MB,本篇设置为1KB,只是为了演示 -->
  29. <maxFileSize>10MB</maxFileSize>
  30. </timeBasedFileNamingAndTriggeringPolicy>
  31. </rollingPolicy>
  32. <encoder>
  33. <!-- pattern节点,用来设置日志的输入格式 -->
  34. <pattern>
  35. %d %p (%file:%line\)- %m%n
  36. </pattern>
  37. <!-- 记录日志的编码 -->
  38. <charset>UTF-8</charset> <!-- 此处设置字符集 -->
  39. </encoder>
  40. </appender>
  41. <!-- 控制台输出日志级别 -->
  42. <root level="error">
  43. <appender-ref ref="STDOUT" />
  44. </root>
  45. <!-- 指定项目中某个包,当有日志操作行为时的日志记录级别 -->
  46. <!-- com.appley为根包,也就是只要是发生在这个根包下面的所有日志操作行为的权限都是DEBUG -->
  47. <!-- 级别依次为【从高到低】:FATAL > ERROR > WARN > INFO > DEBUG > TRACE -->
  48. <logger name="com.appleyk" level="error">
  49. <appender-ref ref="syslog" />
  50. </logger>
  51. </configuration>


(5)mysql数据库



A. 结构 (数据库引擎 InnoDB)





B. sql脚本


master_a.sql

  1. --
  2. -- Table structure for table `a`
  3. --
  4. DROP TABLE IF EXISTS `a`;
  5. CREATE TABLE `a` (
  6. `id` int(11) NOT NULL AUTO_INCREMENT,
  7. `name` varchar(45) DEFAULT NULL,
  8. `sex` char(2) DEFAULT NULL,
  9. `age` int(11) DEFAULT NULL,
  10. PRIMARY KEY (`id`)
  11. ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;


slave_b.sql

  1. --
  2. -- Table structure for table `b`
  3. --
  4. DROP TABLE IF EXISTS `b`;
  5. CREATE TABLE `b` (
  6. `id` int(11) NOT NULL AUTO_INCREMENT,
  7. `name` varchar(45) DEFAULT NULL,
  8. `sex` char(2) DEFAULT NULL,
  9. `age` int(11) DEFAULT NULL,
  10. PRIMARY KEY (`id`)
  11. ) ENGINE=InnoDB DEFAULT CHARSET=utf8;


为了简单,特意将从库的b表设计的和主库的a表一样,不同的只是数据库不一样和表名不一样



(6)加载主库数据源的属性(参数)




MasterConfig.java


  1. package com.appleyk.config;
  2. import org.springframework.boot.context.properties.ConfigurationProperties;
  3. @ConfigurationProperties(prefix="mysql.datasource.master")
  4. public class MasterConfig {
  5. private String url;
  6. private String username;
  7. private String password;
  8. private int minPoolSize;
  9. private int maxPoolSize;
  10. private int maxLifetime;
  11. private int borrowConnectionTimeout;
  12. private int loginTimeout;
  13. private int maintenanceInterval;
  14. private int maxIdleTime;
  15. private String testQuery;
  16. public String getUrl() {
  17. return url;
  18. }
  19. public void setUrl(String url) {
  20. this.url = url;
  21. }
  22. public String getUsername() {
  23. return username;
  24. }
  25. public void setUsername(String username) {
  26. this.username = username;
  27. }
  28. public String getPassword() {
  29. return password;
  30. }
  31. public void setPassword(String password) {
  32. this.password = password;
  33. }
  34. public int getMinPoolSize() {
  35. return minPoolSize;
  36. }
  37. public void setMinPoolSize(int minPoolSize) {
  38. this.minPoolSize = minPoolSize;
  39. }
  40. public int getMaxPoolSize() {
  41. return maxPoolSize;
  42. }
  43. public void setMaxPoolSize(int maxPoolSize) {
  44. this.maxPoolSize = maxPoolSize;
  45. }
  46. public int getMaxLifetime() {
  47. return maxLifetime;
  48. }
  49. public void setMaxLifetime(int maxLifetime) {
  50. this.maxLifetime = maxLifetime;
  51. }
  52. public int getBorrowConnectionTimeout() {
  53. return borrowConnectionTimeout;
  54. }
  55. public void setBorrowConnectionTimeout(int borrowConnectionTimeout) {
  56. this.borrowConnectionTimeout = borrowConnectionTimeout;
  57. }
  58. public int getLoginTimeout() {
  59. return loginTimeout;
  60. }
  61. public void setLoginTimeout(int loginTimeout) {
  62. this.loginTimeout = loginTimeout;
  63. }
  64. public int getMaintenanceInterval() {
  65. return maintenanceInterval;
  66. }
  67. public void setMaintenanceInterval(int maintenanceInterval) {
  68. this.maintenanceInterval = maintenanceInterval;
  69. }
  70. public int getMaxIdleTime() {
  71. return maxIdleTime;
  72. }
  73. public void setMaxIdleTime(int maxIdleTime) {
  74. this.maxIdleTime = maxIdleTime;
  75. }
  76. public String getTestQuery() {
  77. return testQuery;
  78. }
  79. public void setTestQuery(String testQuery) {
  80. this.testQuery = testQuery;
  81. }
  82. }



(7)加载从库数据源的属性(参数)




SlaveConfig.java


  1. package com.appleyk.config;
  2. import org.springframework.boot.context.properties.ConfigurationProperties;
  3. @ConfigurationProperties(prefix="mysql.datasource.slave")
  4. public class SlaveConfig {
  5. private String url;
  6. private String username;
  7. private String password;
  8. private int minPoolSize;
  9. private int maxPoolSize;
  10. private int maxLifetime;
  11. private int borrowConnectionTimeout;
  12. private int loginTimeout;
  13. private int maintenanceInterval;
  14. private int maxIdleTime;
  15. private String testQuery;
  16. public String getUrl() {
  17. return url;
  18. }
  19. public void setUrl(String url) {
  20. this.url = url;
  21. }
  22. public String getUsername() {
  23. return username;
  24. }
  25. public void setUsername(String username) {
  26. this.username = username;
  27. }
  28. public String getPassword() {
  29. return password;
  30. }
  31. public void setPassword(String password) {
  32. this.password = password;
  33. }
  34. public int getMinPoolSize() {
  35. return minPoolSize;
  36. }
  37. public void setMinPoolSize(int minPoolSize) {
  38. this.minPoolSize = minPoolSize;
  39. }
  40. public int getMaxPoolSize() {
  41. return maxPoolSize;
  42. }
  43. public void setMaxPoolSize(int maxPoolSize) {
  44. this.maxPoolSize = maxPoolSize;
  45. }
  46. public int getMaxLifetime() {
  47. return maxLifetime;
  48. }
  49. public void setMaxLifetime(int maxLifetime) {
  50. this.maxLifetime = maxLifetime;
  51. }
  52. public int getBorrowConnectionTimeout() {
  53. return borrowConnectionTimeout;
  54. }
  55. public void setBorrowConnectionTimeout(int borrowConnectionTimeout) {
  56. this.borrowConnectionTimeout = borrowConnectionTimeout;
  57. }
  58. public int getLoginTimeout() {
  59. return loginTimeout;
  60. }
  61. public void setLoginTimeout(int loginTimeout) {
  62. this.loginTimeout = loginTimeout;
  63. }
  64. public int getMaintenanceInterval() {
  65. return maintenanceInterval;
  66. }
  67. public void setMaintenanceInterval(int maintenanceInterval) {
  68. this.maintenanceInterval = maintenanceInterval;
  69. }
  70. public int getMaxIdleTime() {
  71. return maxIdleTime;
  72. }
  73. public void setMaxIdleTime(int maxIdleTime) {
  74. this.maxIdleTime = maxIdleTime;
  75. }
  76. public String getTestQuery() {
  77. return testQuery;
  78. }
  79. public void setTestQuery(String testQuery) {
  80. this.testQuery = testQuery;
  81. }
  82. }



(8)配置主数据源





MasterDBSource.java

  1. package com.appleyk.datasource;
  2. import java.sql.SQLException;
  3. import javax.sql.DataSource;
  4. import org.apache.ibatis.session.SqlSessionFactory;
  5. import org.mybatis.spring.SqlSessionFactoryBean;
  6. import org.mybatis.spring.SqlSessionTemplate;
  7. import org.mybatis.spring.annotation.MapperScan;
  8. import org.springframework.beans.factory.annotation.Qualifier;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. import org.springframework.context.annotation.Primary;
  12. import com.appleyk.config.MasterConfig;
  13. import com.atomikos.jdbc.AtomikosDataSourceBean;
  14. import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;
  15. /**
  16. * SqlSessionFactoryBuilder:build方法创建SqlSessionFactory实例。
  17. * SqlSessionFactory:创建SqlSession实例的工厂。
  18. * SqlSession:用于执行持久化操作的对象,类似于jdbc中的Connection。
  19. * SqlSessionTemplate:MyBatis提供的持久层访问模板化的工具,线程安全,可通过构造参数或依赖注入SqlSessionFactory实例
  20. *
  21. * 主库的数据源模板,应用在主库所对应的Dao层上(扫描对应的mapper),实现主数据源的指定+增删改查
  22. * @author yukun24@126.com
  23. * @blob http://blog.csdn.net/appleyk
  24. * @date 2018年3月16日-下午1:08:53
  25. */
  26. @Configuration // ---> 标注此注解,Spring—Boot启动时,会自动进行相应的主数据源配置 -->注入Bean
  27. @MapperScan(basePackages = "com.appleyk.mapper.master", sqlSessionTemplateRef = "masterSqlSessionTemplate")
  28. public class MasterDBSource {
  29. // 配置主数据源
  30. @Primary
  31. @Bean(name = "MasterDB")
  32. public DataSource testDataSource(MasterConfig masterConfig) throws SQLException {
  33. /**
  34. * MySql数据库驱动 实现 XADataSource接口
  35. */
  36. MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
  37. mysqlXaDataSource.setUrl(masterConfig.getUrl());
  38. mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
  39. mysqlXaDataSource.setPassword(masterConfig.getPassword());
  40. mysqlXaDataSource.setUser(masterConfig.getUsername());
  41. mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
  42. // /**
  43. // * Postgresql数据库驱动 实现 XADataSource
  44. // * 包 --> org.postgresql.xa.PGXADataSource;
  45. // */
  46. // PGXADataSource pgxaDataSource = new PGXADataSource();
  47. // pgxaDataSource.setUrl(masterConfig.getUrl());
  48. //
  49. /**
  50. * 设置分布式-- 主数据源
  51. */
  52. AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
  53. xaDataSource.setXaDataSource(mysqlXaDataSource);
  54. xaDataSource.setUniqueResourceName("MasterDB");
  55. xaDataSource.setMinPoolSize(masterConfig.getMinPoolSize());
  56. xaDataSource.setMaxPoolSize(masterConfig.getMaxPoolSize());
  57. xaDataSource.setMaxLifetime(masterConfig.getMaxLifetime());
  58. xaDataSource.setBorrowConnectionTimeout(masterConfig.getBorrowConnectionTimeout());
  59. xaDataSource.setLoginTimeout(masterConfig.getLoginTimeout());
  60. xaDataSource.setMaintenanceInterval(masterConfig.getMaintenanceInterval());
  61. xaDataSource.setMaxIdleTime(masterConfig.getMaxIdleTime());
  62. xaDataSource.setTestQuery(masterConfig.getTestQuery());
  63. System.err.println("主数据源注入成功.....");
  64. return xaDataSource;
  65. }
  66. @Bean(name = "masterSqlSessionFactory")
  67. public SqlSessionFactory masterSqlSessionFactory(@Qualifier("MasterDB") DataSource dataSource) throws Exception {
  68. SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
  69. bean.setDataSource(dataSource);
  70. return bean.getObject();
  71. }
  72. @Bean(name = "masterSqlSessionTemplate")
  73. public SqlSessionTemplate masterSqlSessionTemplate(
  74. @Qualifier("masterSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
  75. return new SqlSessionTemplate(sqlSessionFactory);
  76. }
  77. }


(9)配置从数据源 (同上,只是改了个名字)



SlaveDBSource.java

  1. package com.appleyk.datasource;
  2. import java.sql.SQLException;
  3. import javax.sql.DataSource;
  4. import org.apache.ibatis.session.SqlSessionFactory;
  5. import org.mybatis.spring.SqlSessionFactoryBean;
  6. import org.mybatis.spring.SqlSessionTemplate;
  7. import org.mybatis.spring.annotation.MapperScan;
  8. import org.springframework.beans.factory.annotation.Qualifier;
  9. import org.springframework.context.annotation.Bean;
  10. import org.springframework.context.annotation.Configuration;
  11. import com.appleyk.config.SlaveConfig;
  12. import com.atomikos.jdbc.AtomikosDataSourceBean;
  13. import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;
  14. /**
  15. * SqlSessionFactoryBuilder:build方法创建SqlSessionFactory实例。
  16. * SqlSessionFactory:创建SqlSession实例的工厂。
  17. * SqlSession:用于执行持久化操作的对象,类似于jdbc中的Connection。
  18. * SqlSessionTemplate:MyBatis提供的持久层访问模板化的工具,线程安全,可通过构造参数或依赖注入SqlSessionFactory实例
  19. *
  20. * 从库的数据源模板,应用在从库所对应的Dao层上(扫描对应的mapper),实现从数据源的指定+增删改查
  21. *
  22. * @author yukun24@126.com
  23. * @blob http://blog.csdn.net/appleyk
  24. * @date 2018年3月16日-下午1:08:53
  25. */
  26. @Configuration // ---> 标注此注解,Spring—Boot启动时,会自动进行相应的从数据源配置 -->注入Bean
  27. @MapperScan(basePackages = "com.appleyk.mapper.slave", sqlSessionTemplateRef = "slaveSqlSessionTemplate")
  28. public class SlaveDBSource {
  29. // 配置从数据源
  30. @Bean(name = "SlaveDB")
  31. public DataSource testDataSource(SlaveConfig slaveConfig) throws SQLException {
  32. MysqlXADataSource mysqlXaDataSource = new MysqlXADataSource();
  33. mysqlXaDataSource.setUrl(slaveConfig.getUrl());
  34. mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
  35. mysqlXaDataSource.setPassword(slaveConfig.getPassword());
  36. mysqlXaDataSource.setUser(slaveConfig.getUsername());
  37. mysqlXaDataSource.setPinGlobalTxToPhysicalConnection(true);
  38. /**
  39. * 设置分布式 -- 从数据源
  40. */
  41. AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
  42. xaDataSource.setXaDataSource(mysqlXaDataSource);
  43. xaDataSource.setUniqueResourceName("SlaveDB");
  44. /**
  45. * 连接池配置
  46. */
  47. xaDataSource.setMinPoolSize(slaveConfig.getMinPoolSize());
  48. xaDataSource.setMaxPoolSize(slaveConfig.getMaxPoolSize());
  49. xaDataSource.setMaxLifetime(slaveConfig.getMaxLifetime());
  50. xaDataSource.setBorrowConnectionTimeout(slaveConfig.getBorrowConnectionTimeout());
  51. xaDataSource.setLoginTimeout(slaveConfig.getLoginTimeout());
  52. xaDataSource.setMaintenanceInterval(slaveConfig.getMaintenanceInterval());
  53. xaDataSource.setMaxIdleTime(slaveConfig.getMaxIdleTime());
  54. xaDataSource.setTestQuery(slaveConfig.getTestQuery());
  55. System.err.println("从数据源注入成功.....");
  56. return xaDataSource;
  57. }
  58. @Bean(name = "slaveSqlSessionFactory")
  59. public SqlSessionFactory masterSqlSessionFactory(@Qualifier("SlaveDB") DataSource dataSource) throws Exception {
  60. SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
  61. bean.setDataSource(dataSource);
  62. return bean.getObject();
  63. }
  64. @Bean(name = "slaveSqlSessionTemplate")
  65. public SqlSessionTemplate slaveSqlSessionTemplate(
  66. @Qualifier("slaveSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
  67. return new SqlSessionTemplate(sqlSessionFactory);
  68. }
  69. }


看似没有配置事务管理器,其实atomikos已经在暗处给我们提供了一个全局性的分布式事务管理器,无需担心,好吧


(10)Spring-Boot全局启动入口


Application.java


  1. package com.appleyk;
  2. import org.springframework.boot.SpringApplication;
  3. import org.springframework.boot.autoconfigure.SpringBootApplication;
  4. import org.springframework.boot.context.properties.EnableConfigurationProperties;
  5. import com.appleyk.config.MasterConfig;
  6. import com.appleyk.config.SlaveConfig;
  7. @SpringBootApplication // same as @Configuration @EnableAutoConfiguration @ComponentScan
  8. @EnableConfigurationProperties(value = { MasterConfig.class, SlaveConfig.class })
  9. public class Application {
  10. public static void main(String[] args) {
  11. SpringApplication.run(Application.class, args);
  12. }
  13. }



启动Spring-Boot





(11)准备DAO层


利用mybatis的通用mapper,先为主库master和从库slave的表a和表b分别进行Java实体映射




A.java


  1. package com.appleyk.entity;
  2. import javax.persistence.Table;
  3. @Table(name = "a")
  4. public class A {
  5. private Integer id;
  6. private String name;
  7. private String sex;
  8. private Integer age;
  9. public A(){
  10. }
  11. public Integer getId() {
  12. return id;
  13. }
  14. public void setId(Integer id) {
  15. this.id = id;
  16. }
  17. public String getName() {
  18. return name;
  19. }
  20. public void setName(String name) {
  21. this.name = name;
  22. }
  23. public String getSex() {
  24. return sex;
  25. }
  26. public void setSex(String sex) {
  27. this.sex = sex;
  28. }
  29. public Integer getAge() {
  30. return age;
  31. }
  32. public void setAge(Integer age) {
  33. this.age = age;
  34. }
  35. }

这种映射很简单,就是和表a的字段一一对应



B.java 的内容和 A.java的内容一模一样,不同的是下面这个地方





利用mybatis的通用mapper,再为主库master和从库slave的表a和表b分别进行mapper接口的增删改查实现




AMapper.java


  1. package com.appleyk.mapper.master;
  2. import com.appleyk.entity.A;
  3. import tk.mybatis.mapper.common.Mapper;
  4. public interface AMapepr extends Mapper<A>{
  5. }

是的,你没看错,里面一句增删改查的代码都没有,就是这么通用好使,B的mapper和A的一样,如下





Dao层布置完,准备Service层,走业务逻辑



(12)准备Service层





分布式事务应用场景:


有一个数据,格式为json串,序列化后实则为一个对象,假设是A,现master库需要存储A,而slave库由于业务需要也要存储这个对象A(通过转化A对象为B对象),于是乎,我们定义一个ObjectService,作为整个存储操作的入口服务



ObjectService.java
  1. package com.appleyk.service;
  2. import com.appleyk.entity.A;
  3. public interface ObjectService {
  4. boolean Save(A a) throws Exception;
  5. }


master库存储A对象的接口为


AService.java

  1. package com.appleyk.service;
  2. import com.appleyk.entity.A;
  3. public interface AService {
  4. boolean SaveA(A a);
  5. }


其实现为


AServiceImpl.java

  1. package com.appleyk.service.Impl;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.context.annotation.Primary;
  4. import org.springframework.stereotype.Service;
  5. import com.appleyk.entity.A;
  6. import com.appleyk.mapper.master.AMapepr;
  7. import com.appleyk.service.AService;
  8. @Service
  9. @Primary
  10. public class AServiceImpl implements AService {
  11. @Autowired
  12. private AMapepr aMapper;
  13. @Override
  14. public boolean SaveA(A a) {
  15. return aMapper.insert(a) > 0;
  16. }
  17. }


slave库存储B对象的接口为


BService.java

  1. package com.appleyk.service;
  2. import com.appleyk.entity.B;
  3. public interface BService {
  4. boolean SaveB(B b) throws Exception;
  5. }

其实现为


BServiceImpl.java

  1. package com.appleyk.service.Impl;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.context.annotation.Primary;
  4. import org.springframework.stereotype.Service;
  5. import com.appleyk.entity.B;
  6. import com.appleyk.mapper.slave.BMapepr;
  7. import com.appleyk.service.BService;
  8. @Service
  9. @Primary
  10. public class BServiceImpl implements BService {
  11. @Autowired
  12. private BMapepr bMapper;
  13. @Override
  14. public boolean SaveB(B b) throws Exception{
  15. int count = bMapper.insert(b);
  16. if(b.getName().length()>5){
  17. System.err.println("B事务回滚");
  18. throw new Exception("名称超过5");
  19. }
  20. System.err.println("B事务提交");
  21. return count >0;
  22. }
  23. }





放大招了,放大招了,我们看ObjectService的实现


ObjectServiceImpl.java


  1. package com.appleyk.service.Impl;
  2. import java.sql.SQLException;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.context.annotation.Primary;
  5. import org.springframework.stereotype.Service;
  6. import org.springframework.transaction.annotation.Transactional;
  7. import com.appleyk.entity.A;
  8. import com.appleyk.entity.B;
  9. import com.appleyk.service.AService;
  10. import com.appleyk.service.BService;
  11. import com.appleyk.service.ObjectService;
  12. @Service
  13. @Primary
  14. public class ObjectServiceImpl implements ObjectService {
  15. @Autowired
  16. private AService aService;
  17. @Autowired
  18. private BService bService;
  19. @Override
  20. @Transactional(rollbackFor = { Exception.class, SQLException.class })
  21. public boolean Save(A a) throws Exception {
  22. if (!aService.SaveA(a)) {
  23. return false;
  24. }
  25. //int i = 1 / 0;
  26. B b = new B(a);
  27. try {
  28. if (!bService.SaveB(b)) {
  29. return false;
  30. }
  31. } catch (Exception e) {
  32. System.err.println("A事务回滚");
  33. throw new Exception("我的错,保存B异常");
  34. }
  35. System.err.println("A事务提交");
  36. return true;
  37. }
  38. }





(13)Controller层对外提供Restful风格的API接口



ObjectController.java


  1. package com.appleyk.controller;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.web.bind.annotation.PostMapping;
  4. import org.springframework.web.bind.annotation.RequestBody;
  5. import org.springframework.web.bind.annotation.RequestMapping;
  6. import org.springframework.web.bind.annotation.RestController;
  7. import com.appleyk.entity.A;
  8. import com.appleyk.result.ResponseMessage;
  9. import com.appleyk.result.ResponseResult;
  10. import com.appleyk.service.ObjectService;
  11. @RestController
  12. @RequestMapping("/rest/v1.0.1/object")
  13. public class ObjectController {
  14. @Autowired
  15. private ObjectService objService;
  16. @PostMapping("/save")
  17. public ResponseResult SaveObject(@RequestBody A a) throws Exception {
  18. if (objService.Save(a)) {
  19. return new ResponseResult(ResponseMessage.OK);
  20. }
  21. return new ResponseResult(ResponseMessage.INTERNAL_SERVER_ERROR);
  22. }
  23. }


(14)测试前,看一眼mysql



主库的a表数据集空



从库的b表数据集也空






(15)利用Insomnia进行API测试



json数据


  1. {
  2. "name": "appleyk",
  3. "sex": "F",
  4. "age":27
  5. }


启动项目





测试(异常的)





保存对象的时候提示了异常,别慌,我们看一下后台输出的内容是不是按照我们假定的方式走的





mysql控制台验证一把



测试(正常的)  --  我们传入name的时候,长度设置小点,比如 name = kobe





后台输出





mysql可视化工具验证走一把









掉个头,我们来让A对象存储的时候,发生异常,而且抛出的异常还是未做检查的








测试(异常的)





后台输出






由于ArithmeticException继承Exception(异常的基类),而我们又设置了



所以,整个分布式事务会进行回滚,A对象和B对象都将无法正确的进行存储


mysql控制台进行验证

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多