分享

大数据量增量同步方案杂谈【面试+工作】

 Java帮帮 2020-01-02


 大数据量增量同步方案杂谈【面试+工作】

阿里大数据量增量同步

目前做的项目使用阿里 DataX 作为不同数据源数据同步的实现工具。数据的批量一次性导入比较简单,对于增量数据需要对不同场景设计不同的方案。

会变的数据增量同步

每天全量同步

如人员表、订单表一类的会发生变化的数据,根据数据仓库的4个特点里的反映历史变化的这个特点的要求,我们建议每天对数据进行全量同步。也就是说每天保存的都是数据的全量数据,这样历史的数据和当前的数据都可以很方便地获得。

设定日分区,每天同步全量数据。

--全量同步

create table ods_user_full(

    uid bigint,

    uname string,

    deptno bigint,

    gender string,

    optime DATETIME 

) partitioned by (ds string);

查询全量用 where 分区 语句 如 where ds = "2017-10-19"

每天增量同步

真实场景中因为某些特殊情况,需要每天只做增量同步。又因为目前流行的大数据平台都不支持 Update 语句进行修改数据,只能用其他方法来实现。

两个表,结果表和增量表,用 full outer join 合并 + insert overwrite(阿里巴巴大数据实践中阿里平台使用方案)

操作如下

--结果表create table dw_user_inc(

    uid bigint,

    uname string,

    deptno bigint,

    gender string,

    optime DATETIME 

);--增量记录表create table ods_user_inc(

    uid bigint,

    uname string,

    deptno bigint,

    gender string,

    optime DATETIME 

)

insert overwrite table dw_user_inc 

select 

--所有select操作,如果ODS表有数据,说明发生了变动,以ODS表为准

case when b.uid is not null then b.uid else a.uid end as uid,

case when b.uid is not null then b.uname else a.uname end as uname,

case when b.uid is not null then b.deptno else a.deptno end as deptno,

case when b.uid is not null then b.gender else a.gender end as gender,

case when b.uid is not null then b.optime else a.optime end as optime

from 

dw_user_inc a 

full outer join ods_user_inc b

on a.uid  = b.uid ;

对比以上两种同步方式,可以很清楚看到两种同步方法的区别和优劣。第二种方法的优点是同步的增量数据量比较小,但是带来的缺点有可能有数据不一致的风险,而且还需要用额外的计算进行数据合并。如无必要,会变化的数据就使用方法一即可。如无必要,会变化的数据就使用方法一即可。如果对历史数据希望只保留一定的时间,超出时间的做自动删除,可以设置Lifecycle。

不变的数据增量同步

这个场景,由于数据生成后就不会发生变化,因此可以很方便地根据数据的生成规律进行分区,较常见的是根据日期进行分区,比如每天一个分区。 
做法是按日期字段 where 过滤所需日期,增量 insert。

阿里官方增量数据同步

增量数据同步是数据同步业务层面的实现,底层数据的同步分为批量Snapshot同步和变更操作日志同步。

数据集成Job通过数据存储数据API接口完成批量数据同步功能,因此数据集成Job本身不支持获取数据存储系统层面的变化过程。例如数据集成无法通过获取解析RDS BinLog日志数据变化量并同步到目的数据源。

若您有增量业务的需求,不仅可通过系统级别增量变化日志解析实现数据增量同步,也可通过以下方案进行解决。

  • 时间戳方式:它是一种基于快照比较的变化数据捕获方式,在源表上增加一个时间戳字段,系统中更新修改表数据的时候,同时修改时间戳字段的值。

    当进行数据抽取时,通过比较上次抽取时间与时间戳字段的值来决定抽取哪些数据。有的数据库的时间戳支持自动更新,即表的其它字段的数据发生改变时,自动更新时间戳字段的值。有的数据库不支持时间戳的自动更新,这就要求业务系统在更新业务数据时,手工更新时间戳字段。

    在阿里数据库使用实践中,所有的在线业务系统按惯例都默认添加了create_time、modify_time,对于在线业务写入均自动修改上述两个字段,记录修改时间。数据集成在进行数据同步过程中,仅需要在MySQL抽取环节配置where过滤条件做增量抽取。

  • 触发器方式:抽取的表上建立需要的触发器,一般要建立插入、修改、删除三个触发器,每当源表中的数据发生变化,就被相应的触发器将变化的数据写入一个临时表,抽取线程从临时表中抽取数据,临时表中抽取过的数据被标记或删除。对于此类应用方式,数据集成在进行数据同步时仅需指定抽取触发器写入表即可。

  • 表方式:在业务系统中添加系统日志表,当业务数据发生变化时,更新维护日志表内容,当作ETL加载时,通过读日志表数据决定加载那些数据及如何加载。对于此类应用方式,数据集成同样使用Where过滤条件做增量抽取。

  • 日志表方式:在业务系统中添加系统日志表,当业务数据发生变化时,更新维护日志表内容,当作ETL加载时,通过读日志表数据决定加载那些数据及如何加载。对于此类应用方式,数据集成同样使用Where过滤条件做增量抽取。

参考

      阿里帮助文档

       https://help.aliyun.com/document_detail/28243.html

  1. 阿里大数据平台文档

  2. https://helpcdn.aliyun.com/document_detail/50346.html?spm=5176.doc54070.6.577.8FjZc7

  3. 大数据之路-阿里巴巴大数据实践pdf书籍

  4. https://pan.baidu.com/s/1BC60mMniV7ZR_m1Y2xqhxQ


两个oracle数据库之间实现数据增量同步

在两个数据库中实现数据增量同步,令数据库之间的数据能够同步更新。

Oracle数据库IP:192.168.0.1(源库)、192.168.0.2(目标库)

1.在源库创建测试表TEST

create table TEST(  ID          NUMBER not null,  NAME        VARCHAR2(200));

2.插入一条数据

INSERT INTO TEST(ID,NAME) VALUES (1,'1111');commit;

3.在源库创建物化视图日志表

create materialized view log on TEST with rowid;

4.在目标库创建一个DBLink链接

create database link DBLINK_TESTconnect TO username identified by "123456"  using '(DESCRIPTION =    (ADDRESS = (PROTOCOL = TCP)(HOST = 192.168.0.1)(PORT = 1521))    (CONNECT_DATA =      (SERVER = DEDICATED)      (SERVICE_NAME = orcl)    )  )' ;

5.在目标库创建针对源库中TEST表的物化视图表MV_TEST

create materialized view MV_TESTRefresh fast  on demand    with rowid    as SELECT * fromTEST@DBLINK_TEST;该表创建的同时,就会把源表中的数据同步过来;

6.手工执行同步

call dbms_mview.refresh('MV_TEST');


两个数据库之间数据同步,案例解释

这个类似电商网站的数据同步

存在2个数据库服务器,A和B(Oracle)

存在2个应用服务器C和D

欧洲部署:C+A

深圳部署:D+B

也就是:

欧洲的同学访问欧洲的服务器C,C连接A数据库

深圳的同学访问深圳的服务器D,D连接B数据库

欧洲的同学改动的是表T1,T2

深圳的同学可以改动所有的数据,包括T1,T2

然后问题来了,这2边的人的访问需要获取一致的数据,如何实现?

方案一:

通过在应用的业务逻辑代码中硬编码数据同步的逻辑,通过JMS,消息代理服务器为ActiveMQ,但是这样做的缺点现在已经体现的非常明显,就是需要针对于每一个实施了增删改的方法进行硬编码,开发工作量非常大

方案二:

建议的方式是使用专门的数据同步中间件,因为跨地域、机房的同步涉及到很多的问题:网络、安全、传输速度、数据冲突等。 

我在做选型的时候看过这样几个: 
1、tungsten-replicator:有点类似于MySQL的主从同步,双向同步可能是不支持的; 
2、SymmetricDS:基于触发器&SQL方式监控变更; 
3、阿里的Otter&Canal:基于BinLog或RedoLog的方式进行数据复制,功能比较强大,但是最好阅读下源代码再用。 

对比了之后还是觉得Otter更好用一些,No.1和.2自己都需要做很多额外的工作,并且Otter的同步效率比较高(大概是MySQL主从同步效率的十几倍)。监控、管理、最小同步单位可以控制到列、对数据冲突处理也有相应的支持(但是这块没有测试过)、异构数据库也是可以的。 但是开源的版本貌似是不支持Oracle的。。。。。 

总体上看有两种思路: 
1、框架思路:即在框架上做文章,应用A使用框架X的时候,X同时负责发送一份数据给目标节点; 
2、数据库同步思路:即框架是不限制的,只是控制数据库层面的复制工作 

如果No.2有成形的产品,还是No.2简单一些,No.1需要处理的工作更复杂,且对应用有一定的侵入,但是好处是适应性比较好(比如可以同步数据库以外的数据,如缓存)

方案三:

如果你不想改动数据的话。 

你可以在数据库表上部署触发器,通过触发器将CUD操作的数据捕获并保存到到临时中间表,每条记录需要有操作属性,Insert  Delete  Update . 

然后通过间隔调度任务读取中间表变化数据 封装为JMS消息,进行传输。 

如果你连触发器和扫描增量表的的任务代码你都不愿意写。 

那我推荐你使用kettle+ActiveMQ 组合方案。 

kettle开源的ETL工具,负责增强变化数据提取,并通过内置的调度任务执行,封装数据为JMS消息。。。交给ActiveMQ 的队列通道进行路由传输


多个系统之间大数据量增量同步解决方案

一、背景介绍

上海立邦TU报销系统,是上海立邦集团针对内部的报销的业务,编写的一套系统,此系统主要特点是和Web、SAP和Notes等系统实现无缝对接,从而完成整体业务的流转,目前立邦已经存在SAP报销系统、Notes系统,情况如下:

  • SAP报销系统:实现上海立邦主要业务的实现,例如:供应商、客户、员工、人员等信息的维护及控制

  • Notes:主要通过邮件实现对业务流转中的具体单据进行审核以及日常的业务管理

  • TU报销系统:主要通过同步SAP报销系统中的供应商、客户、员工、项目等主数据信息,编写报销单,递交到Notes系统进行数据流转,并且递交数据到SAP,从而完成整个业务处理

二、需求说明

在SAP报销系统中的主数据据达到百万级的数据,需要要将其中的供应商、客户、员工、项目等信息同步到本地数据库中:

1) SAP每天每种类型的主数据大约会增加2000条以上的数据量

2) 要求每天同步数据一次,在紧急状态下,可以实现手动同步

3) 5年只能不能因为数据量太大,导致同步业务失败,引起代码重构

三、概要设计

由于数据量太大,决定采用增量同步的方式进行数据同步;为了满足用户需求,同时设计手动同步的模块,实现手动及时同步。

四、详细设计

1. SAP的数据获取

采用PI方式获取SAP数据,通过WebService方式发布数据,由于是采用增量同步,因此有2个必备参数为:开始时间、中止时间,数据获取逻辑如下

  • 在返回数据列表中存在DataStatesMark字段

  • 新增的数据在DataStatesMark中为“I”

  • 编辑的数据在DataStatesMark中为“U”

  • 删除的数据在DataStatesMark中为“D”

2.  数据定时同步的设计

采用WindowsService的方式进行处理,增加定时器,通过设置启动时间,判断当前时间和启动时间的小时一致的话,直接运行程序;程序调用逻辑:

  • Window Service调用Web Service获取SAP中的增量数据

  • Windows Service调用本地同步方法,以获取到的增量数据为参数,同步增量数据到数据库中。

3. 同步增量数据逻辑的设计

  • 在数据库中设置同步数据的临时表,增加DataStatusMark字段,将通过WebService中获取到的数据全部插入到临时表中

  • 编写同步的存储过程,实现临时表到正式表的同步。

4. WindowsService数据同步步骤

1) 调用WebService获取增量的SAP端数据

2) 调用方法,将获取到的增量的SAP端数据插入到临时表中

3) 调用存储过程,实现将临时表中的数据,同步到正式表中。

五、同步存储过程技术说明

1. 传入参数 outmsg输出参数、Error 错误输出参数

2. 循环临时表中的数据判断DataStatusMark字段

3. 如果DataStatusMark为“D”直接实现将Enable字段修改为“D”,代表已经将数据删除

4. 如果DataStatusMark为“I”或者为“U”,根据业务主键字段,在正式表中判断数据是否存在,存在的话,直接插入,否则更新。

5. 增加错误判断机制,如果出错,直接抛出异常给Error参数

6. 执行的每一步,必须实现数据数据,通过OutMsg参数接收。


Sqlserver2008-CDC 实现数据同步,增量更新

在Sqlserver2008上利用CDC实现了数据更新的跟踪,比以往的利用时间戳,触发器实现更加方便快捷.

实现步骤如下:

1.配置cdc

-- 开启cdc
USE db1
GO
EXEC sys.sp_cdc_enable_db

--验证
--0 :未开启cdc 1:开启cdc
SELECT is_cdc_enabled  FROM sys.databases  WHERE database_id=DB_ID()

--表开启cdc
USE db1;
GO
EXEC sys.sp_cdc_enable_table
      @source_schema ='dbo'
      ,@source_name='t_cdc_ta'
      ,@role_name=null
      ,@capture_instance=NULL
      ,@supports_net_changes=1
      ,@index_name=null
      ,@captured_column_list=null
      ,@filegroup_name=default
      ,@allow_partition_switch=1

/*
开启之后会生成cdc构架,并生成查询函数和变更数据表
cdc.captured_columns
cdc.change_tables
cdc.ddl_history
cdc.index_columns
cdc.lsn_time_mapping
dbo.systranschemas
cdc.dbo_t_cdc_ta_CT 以构架名和表名组合的变更数据表
*/

--表结构
CREATE TABLE [t_cdc_ta]
(
   [id] [int] IDENTITY(1,1) PRIMARY KEY NOT NULL,
   [name] [varchar](20) NULL,
   [addr] [varchar](20) NULL,
   [ttime] [datetime] NULL
)

2.跟踪变更数据

当往源表t_cdc_ta中新增,插入,删除数据时,可以在变更数据表[cdc].[dbo_t_cdc_ta_CT]中看到如下数据

__$operation:

1-删除 ,2-新增,4-更新

3.根据变更数据,利用ETL可以实现数据的增量更新

脚本如下:

USE [db1]
GO

CREATE TABLE [dbo].[cdc_capture_log](
   [cdc_capture_log_id] [int] IDENTITY(1,1) NOT NULL,
   [capture_instance] [nvarchar](50) NOT NULL,
   [start_time] [datetime] NOT NULL,
   [min_lsn] [binary](10) NOT NULL,
   [max_lsn] [binary](10) NOT NULL,
   [end_time] [datetime] NULL,
   [status_code] [int] NOT NULL DEFAULT(0)
)



CREATE PROCEDURE [dbo].[usp_init_cdc_capture_log]
   @capture_instance NVARCHAR(50)
AS
   BEGIN
       SET nocount ON ;
       DECLARE @start_lsn BINARY(10),
           @end_lsn BINARY(10),
           @prev_max_lsn BINARY(10)
       --get the max LSN for the capture instance from --the last extract
       SELECT  @prev_max_lsn = MAX(max_lsn)
       FROM    dbo.cdc_capture_log
       WHERE   capture_instance = @capture_instance
        -- if no row found in cdc_capture_log get the min lsn -- for the capture instance
       IF @prev_max_lsn IS NULL
           SET @start_lsn = sys.fn_cdc_get_min_lsn(@capture_instance)
       ELSE
           SET @start_lsn = sys.fn_cdc_increment_lsn(@prev_max_lsn)
                   
       -- get the max lsn
       SET @end_lsn = sys.fn_cdc_get_max_lsn()
       
       IF @start_lsn>=@end_lsn            
        SET   @start_lsn=@end_lsn
       
       
       INSERT  INTO dbo.cdc_capture_log
               (
                 capture_instance,
                 start_time,
                 min_lsn,
                 max_lsn
               )
       VALUES  (
                 @capture_instance,
                 GETDATE(),
                 @start_lsn,
                 @end_lsn
               )
       SELECT  CAST(SCOPE_IDENTITY() AS INT) cdc_capture_log_id
   END

GO



create procedure [dbo].[usp_extract_userm_capture_log]
@cdc_capture_log_id INT
AS
BEGIN
set nocount on;
DECLARE @start_lsn binary(10),@end_lsn binary(10)-- get the lsn range to process
SELECT @start_lsn = min_lsn,@end_lsn = max_lsn from dbo.cdc_capture_log
where cdc_capture_log_id = @cdc_capture_log_id

-- extract and return the changes
select  m.tran_end_time modified_ts,
       x.*
from    cdc.fn_cdc_get_all_changes_dbo_t_cdc_ta(@start_lsn, @end_lsn, 'all') x
       join cdc.lsn_time_mapping m on m.start_lsn = x.__$start_lsn ;
end


GO



CREATE PROCEDURE [dbo].[usp_end_cdc_capture_log]
   @cdc_capture_log_id INT
AS
   BEGIN
       SET nocount ON ;
       UPDATE  dbo.cdc_capture_log
       SET     end_time = GETDATE(),
               status_code = 1
       WHERE   cdc_capture_log_id = @cdc_capture_log_id
   END

GO


--在另一个库上建一个相同的结构的表作为同步数据测试用表

USE montior
GO
CREATE TABLE [dbo].[t_cdc_ta](
   [id] [int] PRIMARY KEY  NOT NULL,
   [name] [varchar](20) NULL,
   [addr] [varchar](20) NULL,
   [ttime] [datetime] NULL
)

GO

CREATE  PROC  [dbo].[p_merge]
@oper INT,
@id INT,
@name VARCHAR(20),
@addr VARCHAR(20),
@ttime DATETIME
AS

  -- 删除
  IF @oper=1
  BEGIN
    DELETE FROM  dbo.t_cdc_ta
    WHERE id=@id
  END
  ELSE IF @oper=2  --  新增
  BEGIN
     INSERT INTO  dbo.t_cdc_ta(id,NAME,addr,ttime)
     VALUES(@id,@name,@addr,@ttime)
  END
  ELSE  IF @oper=4   -- 更新
  BEGIN
     UPDATE  dbo.t_cdc_ta
     SET  NAME=@name,addr=@addr,ttime=@ttime
     WHERE id=@id    
  END
 
GO

停用cdc

EXEC sp_cdc_disable_table
EXEC sp_cdc_disable_db

这样能实现一个定时的同步更新,利用作业来不断的读取新增加的lsn来更新目的数据表,当然同步的时间一定要大于数据变更的清理作业的时间,

默认配置cdc的时候会配置两个job

cdc.db1_capture :捕获变更的作业

cdc.db1_cleanup : 数据清理作业 ,每天凌晨两天清理

之前看到一个哥们在同步数据的时候用的 SSIS的条件拆分组件,我测试了下这个数据变更是有先后顺序的,不能直接拆分数据集直接执行,

这里我时显得方式是利用循环组件一条一条数据处理,希望能有更好的办法

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多