分享

Maxwell:binlog 解析器,轻松同步 MySQL 数据

 古明地觉O_o 2022-12-08 发布于北京

楔子

先来思考一个问题:首先我们的业务数据都存储在 MySQL 中,现在我们要使用大数据框架(如 Hive、Spark)来对 MySQL 的数据进行分析,那么最关键的一步是什么呢?

没错,就是数据同步,我们首先要将 MySQL 的数据同步到 HDFS 上,而搞过数仓的小伙伴肯定知道可以采用 Sqoop 进行同步。通过调度工具在每天的凌晨将前一天的数据拷贝到 HDFS 上,所以 Sqoop 用于离线处理,也就是 T + 1 模式,当天同步前一天的数据。

但如果我们要做实时的数据分析呢?就意味着我们要实时地同步新写入 MySQL 的数据,那么 Sqoop 就不能满足我们的需求了,这时就需要其它的组件。比如阿里的 Canal、Flink 里面的 CDC,它们可以实时地抓取 MySQL 的数据。

如果你的公司已经使用了 Flink,那么推荐直接使用 Flink CDC,但如果没有使用 Flink,那么只是为了单存的同步 MySQL 数据而引入 Flink 就有点大材小用了。此时可以选择阿里的 Canal,不过这里我们要介绍的不是 Canal,而是另外一个组件 Maxwell。它和 Canal、Flink CDC 一样,都是通过监听 MySQL 的 binlog,来实时地同步 MySQL 的数据,但是 Maxwell 更加的轻量级,当然后面我们也会对比 Canal  和 Maxwell  的区别。

通过 Maxwell,我们可以将数据从 MySQL 同步到 Kafka。因为Maxwell不能直接同步到 HDFS,而同步到 Kafka 之后,我们可以再借助于 Flume(或者手动编写代码)同步到 HDFS。

以上就实现了 MySQL 业务数据到 HDFS 的实时同步,进而支持实时的数据分析。那么下面我们就来介绍一下 Maxwell。


什么是 Maxwell

Maxwell 是由美国 Zendesk 开源,使用 Java 编写的 MySQL 实时抓取软件,会实时读取 MySQL  binlog 并生成 JSON 格式的消息,然后作为生产者发送给 Kafka, Kinesis, RabbitMQ, Redis, Google Cloud Pub/Sub, 文件系统或其它平台的应用程序。

下面我们来下载 Maxwell,首先进入官网,页面中会出现一个 Download,点击下面的 URL 即可下载,但这下载的是最新版本。而我们需要的是 1.29.2,为什么需要这个版本呢?因为 1.29.2 之后的版本不支持 JDK8,我们点击导航栏的 Changelog 可以查看所有的历史版本,点击 v1.29.2 进行下载即可。

官网:https:///

下载完之后,先别急着安装,我们先了解一下 Maxwell 相关的工作原理。

MySQL 主从复制过程

简单来讲就是 MySQL 在新增、修改、删除一条记录时,会将逻辑写入到二进制文件(binlog)中。

主从同步的时候,从库向 MySQL 主库发送 dump 协议,主库会开启一个 dump thread 将 binlog 发送给从库,从库开启 I/O thread 负责接收,接收之后将 binlog 拷贝到 relay log(中继日志)。然后从库再开启 SQL thread 读取 relay log 中的事件,将数据变更同步过来,也就是所谓的 binlog 回放。

所以 Maxwell 的工作原理应该不难猜了,就是把自己伪装成 MySQL 的一个 slave,然后以 slave 的身份从 master 复制 binlog。

MySQL 的 binlog

既然聊到了 binlog,那么我们需要简单地介绍一下。binlog(二进制日志)可以说是 MySQL 中最重要的日志,它是 Server 层产生的,不管你用的什么存储引擎,都会有 binlog。

它记录了所有的 DDL 和 DML(除了数据查询)语句,以事件形式记录,还包含执行的消耗时间。并且 MySQL 的二进制日志是事务安全的,因为事务在 binlog 中也可以被还原出来。

一般来说开启 binlog 大概会有 1% 的性能损耗,但有两个最重要的使用场景,导致我们必须开启:

  • MySQL 在主从同步时采用的就是 binlog,通过 binlog 达到 master-salve 数据一致;

  • 数据恢复,通过使用 mysqlbinlog 工具来恢复数据;

binlog 文件包含两类文件:二进制索引文件(后缀名为 .index)和二进制日志文件(后缀名为 .00000*)。

  • 索引文件用于记录相关二进制日志文件的路径;

  • 日志文件用于记录所有的 DDL 和 DML(除查询操作)语句,我们后面会实际查看一下这两个文件。

然后再来看看 binlog 如何开启,注意:MySQL 的安装这里就不说了,我安装的是 MySQL 8,打开 /etc/my.cnf:

如果 /etc 下不存在 my.cnf,可以通过 locate my.cnf 查找其位置。

以上就是 my.cnf 的全部内容,如果我们想开启 binlog,那么只需要在 mysqld 区域下面添加 log-bin=mysql-bin 即可。这个表示我们要开启 binlog,并且 binlog 日志的前缀叫 mysql-bin,当然你也可以起别的名字,但最好见名知意。

然后第一次生成的 binlog 文件就叫 mysql-bin.000001,当 MySQL 重启或者单个文件的大小达到阈值时(默认是 1073741824 字节,即 1 G,可以通过 max_binlog_size 配置),会按照顺序编号生成新的文件(即 mysql-bin.000002)。

然后 MySQL binlog 记录的内容的格式也有三种,分别是 row, statement, mixed,可以通过配置 binlog_format 进行设置,这几种格式的区别如下:

row

设置 binlog_format=row,那么 binlog 会记录每次操作后每一行记录的变化,它的优点是保持数据的绝对一致性,因为不管是什么 SQL,引用什么函数,它只记录执行后的结果。

但缺点也很明显,采用 row 模式,binlog 会占用很大空间。假设我们执行了一个 update 语句,修改了 100 万行记录,那么 binlog 就会记录这 100 万行变化之后的结果,很明显这会导致 binlog 文件的膨胀。

statement

设置 binlog_format=statement,那么 binlog 会记录每次操作的 SQL 语句,假设还是执行 update,但不管这个 update 修改了多少行的记录,binlog 只会记录这对应的一条 update 语句。

因此它的好处就是节省空间,而缺点则更加致命,因为它可能导致数据不一致。假设我们使用了 now(), uuid(), rand() 等函数,那么 binlog 记录的也只是这几个函数,但很明显从库在进行 binlog 回放时生成的结果就会和主库不一致。

mixed

混合级别,statement 的升级版,一定程度上解决了 statement 模式因为一些特殊情况而造成的数据不一致问题。它的默认行为还是 statement,但在某些特殊情况下,比如出现了重复执行生成的结果不一致的函数(uuid, rand, now 等)时、包含 AUTO_INCREMENT 字段的表被更新时、执行 INSERT DELAYED 语句时、使用 UDF 时,会按照 row 模式进行处理,也就是保存执行后的数据。


所以 mixed 相当于 row 和 statement 的结合版,特点是节省空间的同时还兼顾了一定的一致性,当然在极端情况下还是可能会造成数据不一致。但是 statement 和 mixed 对于需要 binlog 监控的情况是不方便的,因为这两种模式的 binlog 里面会包含 SQL 语句,而 Maxwell 将 SQL 语句传给 Kafka 没有任何意义,不可能说在 Kafka 端执行 SQL 语句还原数据。

因此综上所述,Maxwell 想做监控分析,还是要选择 row 模式比较合适。因为 Maxwell 毕竟不是真正意义上的 MySQL 从库,它只能拿到 binlog 日志,但无法对里面的 SQL 语句进行回放,所以如果想让 Maxwell 进行监控,那么 binlog 里面记录就应该是纯数据才可以。

Maxwell 和 Canal 的对比

Canal 是阿里开源的一款专门用于订阅 MySQL binlog 的组件,那么 Maxwell 和 binlog 之间有什么区别呢?

  • Canal 和 Maxwell 都是基于 Java 语言编写;

  • 数据格式的话 Canal 支持自定制,Maxwell 只支持 JSON,但 JSON 通用性非常广,所以只支持 JSON 也还可以接受;

  • 采集数据模式,Canal 只支持增量采集,比如 Canal 在订阅 binlog 的时候数据表已经有 100 条数据了,那么这 100 条 Canal 是无法同步的,它只能同步订阅之后新产生的数据。而 Maxwell 有一个初始化的功能,所以它既支持增量采集、又支持全量采集;

  • 数据落地的话,Canal 支持定制,但是比较麻烦。而 Maxwell 虽然不支持定制,但是它已经将一些主流的平台都支持了,所以相对要更简便一些;

  • 最后是 HA(高可用),两者都是支持的。

所以相比 Canal,这里更建议使用 Maxwell,因为使用起来更简单。


安装 Maxwell

我们将刚才下载的 Maxwell 安装包上传到服务器,然后解压到 /opt 目录下,这里我的操作系统是 CentOS 7。

结构还是很简单的,bin 目录里面是一些启动脚本;而 config.properties.example 就是配置文件,当然这是一个模板,我们拷贝一份并将结尾的 .example 给去掉即可;lib 目录里面就是一些依赖的 jar 包;

我们说 Maxwell 采用 Java 编写,所以我们还要安装 JDK。另外 Maxwell 是用来订阅 MySQL 的 binlog,所以我们还要安装 MySQL,开启 binlog 并将 binlog 格式设置为 row。

[mysqld]
# 开启 binlog 可能意味着主从复制
# 因此最好每个节点要有一个 id
# 当然单机的话没有也无所谓
server_id=1  
# 二进制日志文件名
log-bin=mysql-bin 
# binlog 格式
binlog_format=row  
# 默认 MySQL 会对所有库开启 binlog 监控
# 但很多时候我们不需要监控所有库
# 因此可以通过 binlog-do-db 监控指定的库
# 如果想监控多个库,那么就指定多个 binlog-do-db 即可
binlog-do-db=little_cloud  

整个安装过程比较简单,细节我就不演示了,记得修改完配置文件之后重启 MySQL。

整个过程是没有问题的,可以查看一下生成的 binlog 文件,虽然这个时候我们还没有创建 little_cloud 这个数据库,但是重启之后 binlog 文件已经生成了,位于 /var/lib/mysql 目录下。

对于 MySQL 生成的 binlog 二进制日志文件,其初始是不为空的(有一部分固定内容),然后前缀由 my.cnf 中的 log-bin 参数指定。除了 binlog 二进制日志文件之外,MySQL 还会产生一个二进制索引文件来记录使用的 binlog 二进制文件的路径。

如果没有特殊说明,那么为了描述方便,我们会用 binlog 文件默认代指 binlog 二进制日志文件。

然后创建数据库 little_cloud,因为我们只对这个库进行 binlog 监控,执行 create database little_cloud,执行之后查看 binlog 文件。

虽然这是一个二进制文件,但仔细看的话还是能看到期望的信息的。然后我们在 little_cloud 这个库下面创建一张表叫 girl,写入一些数据。

use little_cloud;
create table girl 
(
    id int primary key auto_increment, 
    name varchar(200)
);
insert into girl(name
values ('satori'), ('koishi');

然后再查看 binlog 文件:

我们说过当 binlog 写满了(大小由 max_binlog_size 配置,默认是 1G)或者 MySQL 重启了,那么 binlog 会发生滚动。这里重启 MySQL,然后再写入一条数据:

use little_cloud;
insert into girl(name
values ('scarlet');

此时会生成 mysql-bin.000002,查看一下里面的内容:

我们看到重启之后再写入数据时,日志会同步到 mysql-bin.000002 中。然后 mysql-bin.index 记录了使用的日志文件的路径,此时有两条记录,最后一条是正在使用的日志文件。

初始化 Maxwell 元数据库

在 MySQL 中建立一个数据库用于存储 Maxwell 元数据,因为 Maxwell 也是有很多元数据需要存储的:

-- 就叫 maxwell_metadata
create database maxwell_metadata;  

-- 还要给 Maxwell 准备一个用户
-- 用户名:maxwell,密码:123456
create user 'maxwell'@'%' identified 
with mysql_native_password by '123456';

-- 赋予其操作权限
-- 对 maxwell_metadata 数据库具备所有权限
-- 对其它数据库只有读权限、复制权限
grant all privileges on maxwell_metadata.* 
to 'maxwell'@'%' with grant option;

grant selectreplication slavereplication client
on *.* to 'maxwell'@'%' with grant option;

-- 刷新权限
flush privileges;

如果 MySQL 告诉你密码不符合要求,那么可以通过如下命令设置:

  • set global validate_password_length=4;

  • set global validate_password_policy=0;

将最小长度设置为 4,安全级别设置为 0。

Maxwell 启动方式

上面我们算是完成所有的准备工作,下面我们来看如何启动 Maxwell,而启动也有两种方式,分别是命令参数启动和指定配置文件启动。

命令行参数启动:

# 需要先配置环境变量,否则需要指定完整路径
maxwell --user='maxwell' --password='123456' \
        --host='localhost' --port=3306 \
        --schema_database=maxwell_metadata \
        --producer=stdout

比较简单,其中的 schema_database 指的是存储元数据的库,这里指定为刚才创建的 maxwell_metadata,一旦执行会自动在库里面创建相应的表。

而 producer 表示将数据发送给谁,stdout 是控制台,当然还可以选择 kafka, rabbitMQ,不过此时需要指定相应的参数(比如 kafka, rabbitMQ 的 IP、发送到哪一个 Topic 等)。因此将 producer 指定为 stdout 一般用于测试,在生产上我们更倾向于使用配置文件启动。

指定配置文件启动:

将 config.properties.example 拷贝一份,起名为 config.properties,然后修改里面的配置,启动时通过 --config 指定。config.properties 里面有大量的配置,基本都是见名知意,而且还有很多注释,所以这里不做过多解释了。

maxwell --config $MAXWELL_HOME/config.properties

Maxwell 案例演示

知道了如何启动 Maxwell,那么下面就来实际操作一下,先让 Maxwell 监控数据并输出到控制台,看看生成的 JSON 长什么样,然后再来了解一下如何将监控的数据对接到 kafka 中,那么下面就开始吧。

监控 MySQL 数据并输出到控制台

以命令行模式启动 Maxwell:

此时的 Maxwell 就启动成功了,然后我们再启动一个终端连接 MySQL,创建一张表、往表里面写入一些数据:

我们看到总共产生了 6 行变化,那么理论上 Maxwell 会打印 6 条 JSON。另外创建表虽然也会写入 binlog,但对于 Maxwell 来说,因为没有涉及到数据的变化,所以虽然会告诉我们创建了表,但是不会生成 JSON。我们来看一下 Maxwell 控制台生成了什么数据吧。

打印了 6 条 JSON,我们先看第 1 条,里面的 database 指的就是操作的数据库,table 表示操作的表,type 表示执行的哪一种操作,ts 表示执行操作时的时间戳,xid 表示事务 ID,commit 表示事务是否正确提交,data 显然就是我们插入的数据。

再看第 2 条和第 3 条,因为我们一次性插入了两条数据,所以会打印两条 JSON,显然这两条数据应该在同一个事务中,因此它们的 xid 是一样的。而 xoffset 表示的就是数据进行操作所做的偏移,或者你理解为当出现了 xoffset 就表示等待提交,当操作完最后一条数据之后,就真正提交了。

最后的 update 也是类似的,只不过 update 除了 data 之外,还包含了一个 old,表示修改前的数据。

再来看看 delete 语句:

delete from students 
where id > 1;

相信你已经猜到 Maxwell 输出的 JSON 了,我们来看一下:

此时打印的 JSON 中的 type 为 delete,data 就是删除的行。

监控 MySQL 的数据并输出到 kafka

下面我们再来看看如何将变更后的数据输出到 kafka 中,不过既然要输出到 kafka,那么首先要安装 kafka。但由于 kafka 不是我们的重点,所以这里就不介绍如何安装了,因为 kafka 现在应用的非常广泛,所以这里默认你有 kafka 的基础,如果没有的话强烈建议私下了解一下,kafka 在工作中使用是非常广泛的。

当输出到 kafka 中,那么我们采用配置文件的方式启动会更方便一些,首先 Ctrl + C 终止 Maxwell 进程,然后打开 config.properties 修改如下内容:

# MySQL 的配置
host=localhost
port=3306
user=maxwell
password=123456
schema_database=maxwell_metadata

# 输出到 kafka 中,然后指定地址,这两个是默认值
# 所以 Maxwell 默认就是输出到 kafka 中的
# 如果你的 Maxwell 和 kafka 不在一个节点上
# 那么这里就需要将 localhost 改成 kafka 所在的主机的地址
# 这里我为了方便,就都使用同一个节点
producer=kafka  
kafka.bootstrap.servers=localhost:9092

# 也是默认值,不过是被注释掉了,我们将其打开,或者重新添加一行也可以
# 表示输出到 maxwell 这个主题中,我们也可以改成别的名字
# 主题可以不存在,会自动创建
kafka_topic=maxwell

我们指定配置文件启动:

显示启动成功了,然后我们再来启动 kafka 的一个消费者,监听 maxwell 这个 topic。

那么最后我们来往表里面写入几条数据,看看 kafka 的消费者控制台能否打印出消息。

insert into students(name, age) 
values('mashiro'17);
insert into students(name, age)
values('nanami'18), ('sorata'17);

执行 SQL 语句,查看消费者控制台:

我们看到确实发送到了 kafka 中,至于修改、删除都是类似的。然后我们关注一下分区的问题,这里写入了几个分区呢?我们查看一下:

我们看到只写入了一个分区,但是在实际生产中我们为了提高并发度,往往会写入多个分区。不仅如此,实现多分区也能更好地实现隔离,因为 Maxwell 可以监控多个库、每个库也可以有多个表,那么对于我们当前而言,所有库中的所有表的变化都会写入主题 maxwell 的 0 号分区。显然这是不方便的,会造成混乱,因此如何控制 Maxwell 将数据写入多个分区就变得至关重要。

那么如何写入多分区呢?首先我们需要手动通过 kafka-topics 创建一个具有多分区的主题,这一步必须由我们手动来做,Maxwell 创建的主题默认只有 1 个分区。然后打开配置文件,里面有一个参数叫 producer_partition_by,它有多个可选值,最常用的是 database 和 table。如果指定为 database,那么不同的数据库下表的变化就会写入不同的分区,如果指定为 table,那么不同表的变化会写入不同的分区。

监控 MySQL 的指定表数据

在生产上,可能对 MySQL 的多个库都开启了 binlog,但我们只需要对某一个库下面的表、或者只对某张表进行监听,那么 Maxwell 可不可以进行过滤呢?答案是可以的,举个例子:

maxwell --user='maxwell' --password='123456' \
        --host='localhost' --port=3306 \
        --schema_database=maxwell_metadata \
        --producer=stdout \
        --filter='exclude: *.*, include: little_cloud.students'

我们看到命令行参数中多了一个 --filter,里面的 exclude 表示排除哪些表,include 表示包含哪些表。先使用 exclude 排除所有,然后再使用 include 包含指定的表。当然 include 和 exclude 都可以指定多个,之间用逗号分割,比如:

--filter='exclude: *.*, include: foo.*, include: bar.baz'

如果是配置文件的话,那么就修改 filter 选项即可。

监控 MySQL 指定表的全量数据

Maxwell 默认只能监控新增的数据,如果监控的表里面已经存在了部分数据,那么是否可以把这些已存在的数据也同步过去呢?答案是可以的,Maxwell 是支持数据初始化的,可以通过修改 Maxwell 的元数据来对 MySQL 的某张表进行数据初始化,也就是我们常说的全量同步。

具体做法也很简单,首先我们说 Maxwell 会有自身的元数据,这些元数据我们存放在 maxwell_metadata 库中,在这个库下面有一张表 bootstrap,我们只需要将全量同步的表的表名和所在的库名插入进去即可。举个栗子:我们将 little_cloud 下面的 students 表做一个全量同步的话,那么可以这么做:

-- 先停止 Maxwell 进程
insert into maxwell_metadata.bootstrap(database_name, table_name)
values ('little_cloud''students');

该表里面还有很多其它字段我们不需要关心,当启动 Maxwell 的时候就会自动同步,同步完之后,bootstrap 表中的is_complete字段由0变成1表示同步完成,started_at 和 completed_at 从 NULL 变成具体的时间。那么下面重启 Maxwell 进程:

我们看到数据就同步完成了,需要注意的是里面的 type 字段,前面会多一个 bootstrap- 前缀,表示这是全量同步生成的。而一旦同步完成,那么表中元数据就会修改,下一次就不会再同步了,除非你再插入一条记录。另外,关于元数据不止 bootstrap 这一张表,还有很多其它的表,可以点进去看一看,看看 Maxwell 都记录了哪些东西,甚至你也可以模仿 Maxwell 实现一个属于自己的 binlog 解析器。

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多