分享

腾讯大牛教你ClickHouse实时同步MySQL数据

 头号码甲 2022-02-25

作者 史鹏宙 CSIG云与智慧产业事业群研发工程师

ClickHouse作为OLAP分析引擎已经被广泛使用,数据的导入导出是用户面临的第一个问题。由于ClickHouse本身无法很好地支持单条大批量的写入,因此在实时同步数据方面需要借助其他服务协助。本文给出一种结合Canal+Kafka的方案,并且给出在多个MySQL实例分库分表的场景下,如何将多张MySQL数据表写入同一张ClickHouse表的方法,欢迎大家批评指正。

首先来看看我们的需求背景:

  1. 实时同步多个MySQL实例数据到ClickHouse,每天规模500G,记录数目亿级别,可以接受分钟级别的同步延迟;

  2. 某些数据库表存在分库分表的操作,用户需要跨MySQL实例跨数据库的表同步到ClickHouse的一张表中;

  3. 现有的MySQL binlog开源组件(Canal),无法做到多张源数据表到一张目的表的映射关系。

基本原理

一、使用JDBC方式同步

  1. 使用Canal组件完成binlog的解析和数据同步;

  2. Canal-Server进程会伪装成MySQL的slave,使用MySQL的binlog同步协议完成数据同步;

  3. Canal-Adapter进程负责从canal-server获取解析后的binlog,并且通过jdbc接口写入到ClickHouse;

image.png

优点:

  1. Canal组件原生支持;

缺点:

  1. Canal-Adpater写入时源表和目的表一一对应,灵活性不足;

  2. 需要维护两个Canal组件进程;

二、Kafka+ClickHouse物化视图方式同步

  1. Canal-Server完成binlog的解析,并且将解析后的json写入Kafka;

  2. Canal-Server可以根据正则表达式过滤数据库和表名,并且根据规则写入Kafka的topic;

  3. ClickHouse使用KafkaEngine和Materialized View完成消息消费,并写入本地表;

image.png

优点:

  1. Kafka支持水平扩展,可以根据数据规模调整partition数目;

  2. Kafka引入后将写入请求合并,防止ClickHouse生成大量的小文件,从而影响查询性能;

  3. Canal-Server支持规则过滤,可以灵活配置上游的MySQL实例的数据库名和表名,并且指明写入的Kafka topic名称;

缺点:

  1. 需要维护Kafka和配置规则;

  2. ClickHouse需要新建相关的视图、Kafka Engine的外表等;

具体步骤

一、准备工作

  1. 如果使用TencentDB,则在控制台确认binlog_format为ROW,无需多余操作。

image.png

如果是自建MySQL,则在客户端中查询变量:

>   show variables like '%binlog%';

+-----------------------------------------+----------------------+

| Variable_name                           | Value                |

+-----------------------------------------+----------------------+

| binlog_format                           | ROW                  |

+-----------------------------------------+----------------------+

 

> show variables like '%log_bin%';

+---------------------------------+--------------------------------------------+

| Variable_name                   | Value                                      |

+---------------------------------+--------------------------------------------+

| log_bin                         | ON                                         |

| log_bin_basename                |  /data/mysql_root/log/20146/mysql-bin        |

| log_bin_index                   |  /data/mysql_root/log/20146/mysql-bin.index |

+---------------------------------+--------------------------------------------+
  1. 创建账号canal,用于同步binlog

CREATE USER canal IDENTIFIED BY 'canal';

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'canal'@'%';

FLUSH PRIVILEGES;

二、Canal组件部署

前置条件:

Canal组件部署的机器需要跟ClickHouse服务和MySQL网络互通;

需要在机器上部署java8,配置JAVA_HOME、PATH等环境变量;

基本概念:

image.png

1. Canal-Server组件部署

Canal-Server的主要作用是订阅binlog信息并解析和定义instance相关信息,建议每个Canal-Server进程对应一个MySQL实例;

1)下载canal.deployer-1.1.4.tar.gz,解压

2)修改配置文件conf/canal.properties,需要关注的配置如下:

...

# 端口相关信息,如果同一台机器部署多个进程需要修改

canal.port = 11111

canal.metrics.pull.port = 11112

canal.admin.port = 11110

...

# 服务模式

canal.serverMode = tcp

...

# Kafka地址

canal.mq.servers = 172.21.48.11:9092

# 使用消息队列时 这两个值必须为true

canal.mq.flatMessage = true

canal.mq.flatMessage.onlyData = true

...

# instance列表,conf目录下必须有同名的目录

canal.destinations = example,example2

3)配置instance

可以参照example新增新的instance,主要修改配置文件conf/${instance_name}/instance.properties文件。

样例1: 同步某个数据库的以XX前缀开头的表

订阅 172.21.48.35的MySQL的testdb数据库中的以tb_开头的表的数据变更(例如tb_20200801 、 tb_20200802等),主要的步骤如下:

步骤1:创建example2实例:cddeployer/conf && cp -r example example2

步骤2:修改deployer/conf/example2/instance.properties文件

...

# 上游MySQL实例地址

canal.instance.master.address=172.21.48.35:3306

...

# 同步账户信息

canal.instance.dbUsername=canal

canal.instance.dbPassword=canal

...

# 过滤数据库名称和表名

canal.instance.filter.regex=testdb\\.tb_.*,

步骤3:在conf/canal.properties中修改 canal.destinations ,新增example2

样例2: 同步多个数据库的以XX前缀开头的表,且输出到Kafka

订阅 172.21.48.35的MySQL的empdb_0数据库的employees_20200801表,empdb_1数据库的employees_20200802表,并且数据写入Kafka;

步骤1:创建example2实例:cddeployer/conf && cp -r example example3

步骤2:修改deployer/conf/example3/instance.properties文件

...

# 上游MySQL实例地址

canal.instance.master.address=172.21.48.35:3306

...

# 同步账户信息

canal.instance.dbUsername=canal

canal.instance.dbPassword=canal

...

# 过滤数据库名称和表名

canal.instance.filter.regex=empdb_.*\\.employees_.*

...

# Kafka的topic名称和匹配的规则

canal.mq.dynamicTopic=employees_topic:empdb_.*\\.employees_.*

canal.mq.partition=0

 

# Kafka topic的分区数目(即partition数目)

canal.mq.partitionsNum=3

 

# 根据employees_开头的表中的 emp_no字段来进行数据hash,分布到不同的partition

canal.mq.partitionHash=empdb_.*\\.employees_.*:emp_no

步骤3:在Kafka中新建topic employees_topic,指定分区数目为3

步骤4:在conf/canal.properties中修改 canal.destinations ,新增example3;修改服务模式为kafka,配置kafka相关信息;

# 服务模式

canal.serverMode = kafka

...

# Kafka地址

canal.mq.servers = 172.21.48.11:9092

# 使用消息队列时 这两个值必须为true

canal.mq.flatMessage = true

canal.mq.flatMessage.onlyData = true

...

# instance列表,conf目录下必须有同名的目录

canal.destinations =  example,example2,example3

2. Canal-Adapter组件部署(只针对方案一)

Canal-Adapter的主要作用是通过JDBC接口写入ClickHouse数据,可以配置多个表的写入;

1)下载canal.adapter-1.1.4.tar.gz,解压;

2)在lib目录下新增clickhouse驱动jar包及httpclient的jar包 httpcore-4.4.13.jar、httpclient-4.3.3.jar、clickhouse-jdbc-0.2.4.jar;

3)修改配置文件conf/application.yml文件,修改canalServerHost、srcDataSources、canalAdapters的配置;

server:

   port: 8081

spring:

   jackson:

     date-format: yyyy-MM-dd HH

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多