一、前言
要实时同步数据，首先要能实时地监控到数据库数据的变化，可以使用 Canal、Maxwell 等工具完成。我选用 Canal，因为它更灵活，更符合我的项目需求。
二、通过canal监控数据库数据变化
Canal安装教程:https://www.aliyun.com/jiaocheng/1117575.html
三、项目整体架构
项目整体架构、离线同步:https://blog.csdn.net/beyond_qjm/article/details/83623738
四、主要代码
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.util.Bytes;
import qjm.data.synch.hbase.HbaseSerialization;
import qjm.data.synch.hbase.HbaseUtils;
import qjm.data.synch.modle.Employee;
import qjm.data.synch.service.SqlDataService;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
public class OnlineSynch { static final Log LOG = LogFactory.getLog(OnlineSynch.class);
SqlDataService sqlDataService = new SqlDataService("SqlMapConfig.xml"); HbaseUtils hbaseUtils = new HbaseUtils();
public void synchToHbase(){ CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress("192.168.135.132", connector.subscribe("grg_hr\\..*"); Message message = connector.getWithoutAck(batchSize); batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { } catch (InterruptedException e) { LOG.info(String.format("\nmessage[batchId=%s,size=%s]", batchId, size)); handleEntry(message.getEntries());
connector.ack(batchId); // 提交确认 if (batchId != null) connector.rollback(batchId);
LOG.error("Error: " + e.getMessage()); throw new RuntimeException(e);
private void handleEntry(List<Entry> entries) { for (Entry entry : entries) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
RowChange rowChange = null; rowChange = RowChange.parseFrom(entry.getStoreValue()); throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
CanalEntry.EventType eventType = rowChange.getEventType(); Header header = entry.getHeader(); LOG.info(String.format("\n================> binlog[%s:%s] , name[%s,%s] , eventType : %s", header.getLogfileName(), header.getLogfileOffset(), header.getSchemaName(), header.getTableName(),
for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) { if (eventType == EventType.DELETE) { LOG.info("\n-------> delete"); deleteData(header.getTableName(), rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { LOG.info("\n-------> insert"); updateData(header.getTableName(), rowData.getAfterColumnsList()); } else if (eventType == EventType.UPDATE) { //LOG.info("\n-------> before"); //printColumn(rowData.getBeforeColumnsList()); LOG.info("\n-------> after"); updateData(header.getTableName(),rowData.getAfterColumnsList());
private void updateData(String tableName, List<Column> columns){ Long key = getKey(columns);
HbaseSerialization serialization = null; if(tableName.equals("hr_employee")){ serialization = sqlDataService.getEmployeeById(key);
if (serialization != null){ Employee employee = hbaseUtils.getData(new Get(Bytes.toBytes(key)), Employee.class); LOG.info("before : \n"+employee);
hbaseUtils.putData(serialization);
employee = hbaseUtils.getData(new Get(Bytes.toBytes(key)), Employee.class); LOG.info("before : \n"+employee); LOG.error(e.getMessage());
private void deleteData(String tableName, List<Column> columns){ Long key = getKey(columns);
if(tableName.equals("hr_employee")){
Employee employee = hbaseUtils.getData(new Get(Bytes.toBytes(key)), Employee.class); LOG.info("before : \n"+employee);
hbaseUtils.deleteData(clazz, new Delete(Bytes.toBytes(key)));
employee = hbaseUtils.getData(new Get(Bytes.toBytes(key)), Employee.class); LOG.info("after : \n"+employee); LOG.error(e.getMessage());
private Long getKey(List<Column> columns){ for (Column column : columns) { if(column.getName().equals("id")){ return Long.valueOf(column.getValue()); throw new RuntimeException("Not found primary key !"); throw new RuntimeException("Not found primary key !");
五、项目代码
GitHub: https://github.com/beyondQjm/data-synch.git
CSDN : https://download.csdn.net/download/beyond_qjm/10758716
|