分享

Mysql 数据实时同步hbase

 码农书馆 2020-08-26

一、前言

要实时同步数据,首先要能实时地监控到数据库数据的变化,可以使用 canal、Maxwell 等工具完成。我选用 canal,因为它更灵活,更符合我的项目需求。

二、通过canal监控数据库数据变化

Canal安装教程:https://www.aliyun.com/jiaocheng/1117575.html

三、项目整体架构

项目整体架构、离线同步:https://blog.csdn.net/beyond_qjm/article/details/83623738

四、主要代码

  1. import com.alibaba.otter.canal.client.CanalConnector;
  2. import com.alibaba.otter.canal.client.CanalConnectors;
  3. import com.alibaba.otter.canal.protocol.CanalEntry;
  4. import com.alibaba.otter.canal.protocol.CanalEntry.*;
  5. import com.alibaba.otter.canal.protocol.Message;
  6. import org.apache.commons.logging.Log;
  7. import org.apache.commons.logging.LogFactory;
  8. import org.apache.hadoop.hbase.client.Delete;
  9. import org.apache.hadoop.hbase.client.Get;
  10. import org.apache.hadoop.hbase.util.Bytes;
  11. import qjm.data.synch.hbase.HbaseSerialization;
  12. import qjm.data.synch.hbase.HbaseUtils;
  13. import qjm.data.synch.modle.Employee;
  14. import qjm.data.synch.service.SqlDataService;

  15. import java.io.IOException;
  16. import java.net.InetSocketAddress;
  17. import java.util.List;

  18. /**
  19. * 实时同步数据
  20. */
  21. public class OnlineSynch {
  22. static final Log LOG = LogFactory.getLog(OnlineSynch.class);

  23. SqlDataService sqlDataService = new SqlDataService("SqlMapConfig.xml");
  24. HbaseUtils hbaseUtils = new HbaseUtils();

  25. /**
  26. * 从关系型数据库同步数据到hbase
  27. */
  28. public void synchToHbase(){
  29. // 创建链接
  30. CanalConnector connector = CanalConnectors.newSingleConnector(
  31. new InetSocketAddress("192.168.135.132",
  32. 11111),
  33. "example",
  34. "",
  35. ""
  36. );
  37. int batchSize = 1000;
  38. Long batchId = null;
  39. try {
  40. connector.connect();
  41. //指定监听数据库
  42. connector.subscribe("grg_hr\\..*");
  43. connector.rollback();
  44. while (true) {
  45. // 获取指定数量的数据
  46. Message message = connector.getWithoutAck(batchSize);
  47. batchId = message.getId();
  48. int size = message.getEntries().size();
  49. if (batchId == -1 || size == 0) {
  50. LOG.info("waitting...");
  51. try {
  52. Thread.sleep(1000);
  53. } catch (InterruptedException e) {
  54. }
  55. } else {
  56. LOG.info(String.format("\nmessage[batchId=%s,size=%s]", batchId, size));
  57. handleEntry(message.getEntries());
  58. }

  59. connector.ack(batchId); // 提交确认
  60. }
  61. } catch (Exception e) {
  62. // 处理失败, 回滚数据
  63. if (batchId != null) connector.rollback(batchId);

  64. LOG.error("Error: " + e.getMessage());
  65. throw new RuntimeException(e);
  66. } finally {
  67. connector.disconnect();
  68. }
  69. }

  70. /**
  71. * 处理
  72. * @param entries
  73. */
  74. private void handleEntry(List<Entry> entries) {
  75. //循环事件
  76. for (Entry entry : entries) {
  77. if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
  78. continue;
  79. }

  80. RowChange rowChange = null;
  81. try {
  82. rowChange = RowChange.parseFrom(entry.getStoreValue());
  83. } catch (Exception e) {
  84. throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
  85. }

  86. //输出事件信息
  87. CanalEntry.EventType eventType = rowChange.getEventType();
  88. Header header = entry.getHeader();
  89. LOG.info(String.format("\n================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
  90. header.getLogfileName(), header.getLogfileOffset(),
  91. header.getSchemaName(), header.getTableName(),
  92. eventType));

  93. //解析事件
  94. for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
  95. if (eventType == EventType.DELETE) {
  96. LOG.info("\n-------> delete");
  97. deleteData(header.getTableName(), rowData.getBeforeColumnsList());
  98. } else if (eventType == EventType.INSERT) {
  99. LOG.info("\n-------> insert");
  100. updateData(header.getTableName(), rowData.getAfterColumnsList());
  101. } else if (eventType == EventType.UPDATE) {
  102. //LOG.info("\n-------> before");
  103. //printColumn(rowData.getBeforeColumnsList());
  104. LOG.info("\n-------> after");
  105. updateData(header.getTableName(),rowData.getAfterColumnsList());
  106. }
  107. }
  108. }
  109. }

  110. /**
  111. * 更新数据
  112. */
  113. private void updateData(String tableName, List<Column> columns){
  114. /**
  115. * 1. 获取主键
  116. * 2. 根据主键查询
  117. * 3. 更新到hbase
  118. */
  119. //获取主键
  120. Long key = getKey(columns);

  121. HbaseSerialization serialization = null;
  122. //根据不同表做处理
  123. if(tableName.equals("hr_employee")){
  124. serialization = sqlDataService.getEmployeeById(key);
  125. }

  126. if (serialization != null){
  127. try {
  128. Employee employee = hbaseUtils.getData(new Get(Bytes.toBytes(key)), Employee.class);
  129. LOG.info("before : \n"+employee);

  130. hbaseUtils.putData(serialization);

  131. employee = hbaseUtils.getData(new Get(Bytes.toBytes(key)), Employee.class);
  132. LOG.info("before : \n"+employee);
  133. } catch (Exception e) {
  134. LOG.error(e.getMessage());
  135. }
  136. }
  137. }

  138. /**
  139. * 删除数据
  140. */
  141. private void deleteData(String tableName, List<Column> columns){
  142. /**
  143. * 1. 获取主键
  144. * 2. 根据主键删除hbase数据
  145. */
  146. //获取主键
  147. Long key = getKey(columns);

  148. Class clazz = null;
  149. //根据不同表做处理
  150. if(tableName.equals("hr_employee")){
  151. clazz = Employee.class;
  152. }

  153. try {
  154. Employee employee = hbaseUtils.getData(new Get(Bytes.toBytes(key)), Employee.class);
  155. LOG.info("before : \n"+employee);

  156. hbaseUtils.deleteData(clazz, new Delete(Bytes.toBytes(key)));

  157. employee = hbaseUtils.getData(new Get(Bytes.toBytes(key)), Employee.class);
  158. LOG.info("after : \n"+employee);
  159. } catch (Exception e) {
  160. LOG.error(e.getMessage());
  161. }

  162. }

  163. /**
  164. * 获取主键
  165. * @return
  166. */
  167. private Long getKey(List<Column> columns){
  168. try{
  169. for (Column column : columns) {
  170. if(column.getName().equals("id")){
  171. return Long.valueOf(column.getValue());
  172. }
  173. }
  174. }catch (Exception e){
  175. e.printStackTrace();
  176. throw new RuntimeException("Not found primary key !");
  177. }
  178. throw new RuntimeException("Not found primary key !");
  179. }

  180. }

五、项目代码

GitHub:https://github.com/beyondQjm/data-synch.git

CSDN:https://download.csdn.net/download/beyond_qjm/10758716

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多