CDC 实现数据同步,增量更新
在Sqlserver2008上利用CDC实现了数据更新的跟踪,比以往的利用时间戳,触发器实现更加方便快捷.
参考资料:
http://www./tip.asp?tip=1755
http://blog.csdn.net/ldslove/archive/2010/05/20/5612248.aspx
实现步骤如下:
1.配置cdc -- 开启cdc USE db1 GO EXEC sys.sp_cdc_enable_db
--验证 --0 :未开启cdc 1:开启cdc SELECT is_cdc_enabled FROM sys.databases WHERE database_id=DB_ID()
--表开启cdc USE db1; GO EXEC sys.sp_cdc_enable_table @source_schema ='dbo' ,@source_name='t_cdc_ta' ,@role_name=null ,@capture_instance=NULL ,@supports_net_changes=1 ,@index_name=null ,@captured_column_list=null ,@filegroup_name=default ,@allow_partition_switch=1
/* 开启之后会生成cdc构架,并生成查询函数和变更数据表 cdc.captured_columns cdc.change_tables cdc.ddl_history cdc.index_columns cdc.lsn_time_mapping dbo.systranschemas cdc.dbo_t_cdc_ta_CT 以构架名和表名组合的变更数据表 */
--表结构 CREATE TABLE [t_cdc_ta] ( [id] [int] IDENTITY(1,1) PRIMARY KEY NOT NULL, [name] [varchar](20) NULL, [addr] [varchar](20) NULL, [ttime] [datetime] NULL )
2.跟踪变更数据
当往源表t_cdc_ta中新增,插入,删除数据时,可以在变更数据表[cdc].[dbo_t_cdc_ta_CT]中看到如下数据
__$operation:
1-删除 ,2-新增,4-更新
3.根据变更数据,利用ETL可以实现数据的增量更新
脚本如下: USE [db1] GO
CREATE TABLE [dbo].[cdc_capture_log]( [cdc_capture_log_id] [int] IDENTITY(1,1) NOT NULL, [capture_instance] [nvarchar](50) NOT NULL, [start_time] [datetime] NOT NULL, [min_lsn] [binary](10) NOT NULL, [max_lsn] [binary](10) NOT NULL, [end_time] [datetime] NULL, [status_code] [int] NOT NULL DEFAULT(0) )
CREATE PROCEDURE [dbo].[usp_init_cdc_capture_log] @capture_instance NVARCHAR(50) AS BEGIN SET nocount ON ; DECLARE @start_lsn BINARY(10), @end_lsn BINARY(10), @prev_max_lsn BINARY(10) --get the max LSN for the capture instance from --the last extract SELECT @prev_max_lsn = MAX(max_lsn) FROM dbo.cdc_capture_log WHERE capture_instance = @capture_instance -- if no row found in cdc_capture_log get the min lsn -- for the capture instance IF @prev_max_lsn IS NULL SET @start_lsn = sys.fn_cdc_get_min_lsn(@capture_instance) ELSE SET @start_lsn = sys.fn_cdc_increment_lsn(@prev_max_lsn) -- get the max lsn SET @end_lsn = sys.fn_cdc_get_max_lsn() IF @start_lsn>=@end_lsn SET @start_lsn=@end_lsn INSERT INTO dbo.cdc_capture_log ( capture_instance, start_time, min_lsn, max_lsn ) VALUES ( @capture_instance, GETDATE(), @start_lsn, @end_lsn ) SELECT CAST(SCOPE_IDENTITY() AS INT) cdc_capture_log_id END
GO
create procedure [dbo].[usp_extract_userm_capture_log] @cdc_capture_log_id INT AS BEGIN set nocount on; DECLARE @start_lsn binary(10),@end_lsn binary(10)-- get the lsn range to process SELECT @start_lsn = min_lsn,@end_lsn = max_lsn from dbo.cdc_capture_log where cdc_capture_log_id = @cdc_capture_log_id
-- extract and return the changes select m.tran_end_time modified_ts, x.* from cdc.fn_cdc_get_all_changes_dbo_t_cdc_ta(@start_lsn, @end_lsn, 'all') x join cdc.lsn_time_mapping m on m.start_lsn = x.__$start_lsn ; end
GO
CREATE PROCEDURE [dbo].[usp_end_cdc_capture_log] @cdc_capture_log_id INT AS BEGIN SET nocount ON ; UPDATE dbo.cdc_capture_log SET end_time = GETDATE(), status_code = 1 WHERE cdc_capture_log_id = @cdc_capture_log_id END
GO
--在另一个库上建一个相同的结构的表作为同步数据测试用表
USE montior GO CREATE TABLE [dbo].[t_cdc_ta]( [id] [int] PRIMARY KEY NOT NULL, [name] [varchar](20) NULL, [addr] [varchar](20) NULL, [ttime] [datetime] NULL )
GO
CREATE PROC [dbo].[p_merge] @oper INT, @id INT, @name VARCHAR(20), @addr VARCHAR(20), @ttime DATETIME AS -- 删除 IF @oper=1 BEGIN DELETE FROM dbo.t_cdc_ta WHERE id=@id END ELSE IF @oper=2 -- 新增 BEGIN INSERT INTO dbo.t_cdc_ta(id,NAME,addr,ttime) VALUES(@id,@name,@addr,@ttime) END ELSE IF @oper=4 -- 更新 BEGIN UPDATE dbo.t_cdc_ta SET NAME=@name,addr=@addr,ttime=@ttime WHERE id=@id END GO
停用cdc EXEC sp_cdc_disable_table EXEC sp_cdc_disable_db
这样能实现一个定时的同步更新,利用作业来不断的读取新增加的lsn来更新目的数据表,当然同步的时间一定要大于数据变更的清理作业的时间,
默认配置cdc的时候会配置两个job
cdc.db1_capture :捕获变更的作业
cdc.db1_cleanup : 数据清理作业 ,每天凌晨两天清理
之前看到一个哥们在同步数据的时候用的 SSIS的条件拆分组件,我测试了下这个数据变更是有先后顺序的,不能直接拆分数据集直接执行,
这里我时显得方式是利用循环组件一条一条数据处理,希望能有更好的办法。。。
|