分享

使用更改跟踪实现数据同步

 Mike Lee 2010-10-15
使用更改跟踪实现数据同步 收藏
SQL Server 2008 引入了更改跟踪,这是一种轻量型解决方案,它为应用程序提供了一种有效的更改跟踪机制。通常,若要使应用程序能够查询对数据库中的数据所做的更改和访问与这些更改相关的信息,应用程序开发人员必须实现自定义更改跟踪机制。创建这些机制通常涉及多项工作,并且常常涉及使用触发器、timestamp 列和新表组合来存储跟踪信息,同时还会涉及使用自定义清除过程。
通过更改跟踪,可以很容易地编写同步数据的应用,下面是一个使用更改跟踪实现单向数据同步的示例。
1. 建立示例环境
-- ====================================================
-- 测试的数据库
USE master;
GO
CREATE DATABASE DB_test;
GO
-- 启用更改跟踪
ALTER DATABASE DB_test SET
    CHANGE_TRACKING = ON(
           AUTO_CLEANUP = ON,          -- 打开自动清理选项
           CHANGE_RETENTION = 1 HOURS  -- 数据保存期为1 时
    );
 
ALTER DATABASE DB_test SET
    ALLOW_SNAPSHOT_ISOLATION ON;       -- 允许在测试数据库中使用 SNAPSHOT 事务隔离级别
GO
 
-- ====================================================
-- 测试的表
USE DB_test;
GO
-- a. 同步的源表
CREATE TABLE dbo.tb_source(
    pk_id int IDENTITY
       PRIMARY KEY,
    col1 int,
    col2 varchar(10),
    col3 nvarchar(max),
    col4 xml
);
GO
-- 启用更改跟踪
ALTER TABLE dbo.tb_source
    ENABLE CHANGE_TRACKING
       WITH(
           TRACK_COLUMNS_UPDATED = ON  -- 记录UPDATE 的列信息
       );
GO
 
-- b. 同步的目录表
CREATE TABLE dbo.tb_Target(
    pk_id int
       PRIMARY KEY,
    col1 int,
    col2 varchar(10),
    col3 nvarchar(max),
    col4 xml
);
GO
 
-- 记录同步情况的表
CREATE TABLE dbo.tb_Change_Tracking(
    id int IDENTITY
       PRIMARY KEY,
    object_name sysname
       UNIQUE,
    last_sync_version bigint,
    last_update_date datetime
);
GO
 
2. 实现同步处理的存储过程
-- ====================================================
-- 数据同步处理的存储过程
USE DB_test;
GO
-- 数据同步的存储过程- 同步未更新的数据
-- 单次更新,更新完成后退出
CREATE PROC dbo.p_SyncChangeData_tb_Srouce_Target
    @last_sync_version bigint = NULL OUTPUT,
    @min_valid_version bigint = NULL OUTPUT
AS
SET NOCOUNT ON;
-- ========================================
-- TRY...CATCH 中的标准事务处理模块
-- a. 当前的事务数
DECLARE
    @__trancount int;
SELECT
    @__trancount = @@TRANCOUNT;
 
-- TRY...CATCH 处理
BEGIN TRY
    -- ========================================
    -- 源表信息
    DECLARE
       @object_name sysname,
       @object_id int;
    SELECT
       @object_name = N'dbo.tb_source',
        @object_id = OBJECT_ID(@object_name);
 
    -- ========================================
    -- 最后一次同步的版本
    IF @last_sync_version IS NULL
    BEGIN
       SELECT
           @last_sync_version = last_sync_version
       FROM dbo.tb_Change_Tracking
       WHERE object_name = @object_name;
 
       IF @@ROWCOUNT = 0
       BEGIN
           SET @last_sync_version = CHANGE_TRACKING_MIN_VALID_VERSION(@object_id);
           INSERT dbo.tb_Change_Tracking(
              object_name, last_sync_version)
           VALUES(
              @object_name, @last_sync_version);
       END;
    END;
 
    -- ========================================
    -- TRY...CATCH 中的标准事务处理模块
    -- b. 开启事务, 或者设置事务保存点
    SET TRANSACTION ISOLATION LEVEL SNAPSHOT; -- 使用快照隔离级别的事务
    IF @__trancount = 0
       BEGIN TRAN;
    ELSE
       SAVE TRAN __TRAN_SavePoint;
 
    -- ========================================
    -- 版本验证
    -- a. 验证是否有数据变更(如果上次同步的版本号= 当前数据库的最大版本号,则视为无数据变化)
    IF @last_sync_version = CHANGE_TRACKING_CURRENT_VERSION()
       GOTO lb_Return;
 
    -- b. 验证同步的版本号是否有效(如果上次同步的版本号< 当前可用的最小版本号,则视为无效)
    IF @last_sync_version < CHANGE_TRACKING_MIN_VALID_VERSION(@object_id)
    BEGIN
       SET @min_valid_version = CHANGE_TRACKING_MIN_VALID_VERSION(@object_id);     
       GOTO lb_Return;
    END;
 
    -- c. 验证同步的版本号是否有效(如果上次同步的版本号> 当前数据库的最大版本号,则视为无效)
    IF @last_sync_version > CHANGE_TRACKING_CURRENT_VERSION()
    BEGIN
       SET @last_sync_version = NULL;
       GOTO lb_Return;
    END;
 
    -- ========================================
    -- 同步数据
    -- a. 插入
    WITH
    CHG AS(
       SELECT
           DATA.*
       FROM dbo.tb_source DATA
           INNER JOIN CHANGETABLE(CHANGES dbo.tb_source, @last_sync_version) CHG
              ON CHG.pk_id = DATA.pk_id
       WHERE CHG.SYS_CHANGE_OPERATION = N'I'
    )
    INSERT dbo.tb_Target
    SELECT * FROM CHG;
 
    -- b. 删除
    WITH
    CHG AS(
       SELECT
           CHG.*
       FROM CHANGETABLE(CHANGES dbo.tb_source, @last_sync_version) CHG
       WHERE CHG.SYS_CHANGE_OPERATION = N'D'
    )
    DELETE DATA
    FROM dbo.tb_Target DATA
       INNER JOIN CHG
           ON CHG.pk_id = DATA.pk_id;
 
    -- c. 更新
    WITH
    COL AS(
       SELECT
           *
       FROM(
           SELECT
              name, column_id
           FROM sys.columns
           WHERE object_id = @object_id
       )DATA
           PIVOT(
              MAX(column_id)
              FOR name IN(  -- 源表的所有列的列表
                  [col1], [col2], [col3], [col4])
           )P               
    ),
    CHG AS(
       SELECT
           DATA.*,
           __SYS_CHANGE_COLUMNS = CHG.SYS_CHANGE_COLUMNS
       FROM dbo.tb_source DATA
           INNER JOIN CHANGETABLE(CHANGES dbo.tb_source, @last_sync_version) CHG
              ON CHG.pk_id = DATA.pk_id
       WHERE CHG.SYS_CHANGE_OPERATION = N'U'
    )
    UPDATE DATA SET             -- 判断列是否需要更新(也可以不判断,直接更新所有的列)
       [col1] = CASE
                  WHEN CHG.__SYS_CHANGE_COLUMNS IS NULL OR CHANGE_TRACKING_IS_COLUMN_IN_MASK(COL.[col1], CHG.__SYS_CHANGE_COLUMNS) = 1
                     THEN CHG.[col1]
                  ELSE DATA.[col1]
              END,
       [col2] = CASE
                  WHEN CHG.__SYS_CHANGE_COLUMNS IS NULL OR CHANGE_TRACKING_IS_COLUMN_IN_MASK(COL.[col2], CHG.__SYS_CHANGE_COLUMNS) = 1
                     THEN CHG.[col2]
                  ELSE DATA.[col2]
              END,
       [col3] = CASE
                  WHEN CHG.__SYS_CHANGE_COLUMNS IS NULL OR CHANGE_TRACKING_IS_COLUMN_IN_MASK(COL.[col3], CHG.__SYS_CHANGE_COLUMNS) = 1
                     THEN CHG.[col3]
                  ELSE DATA.[col3]
              END,
       [col4] = CASE
                  WHEN CHG.__SYS_CHANGE_COLUMNS IS NULL OR CHANGE_TRACKING_IS_COLUMN_IN_MASK(COL.[col4], CHG.__SYS_CHANGE_COLUMNS) = 1
                     THEN CHG.[col4]
                  ELSE DATA.[col4]
              END
    FROM dbo.tb_Target DATA
       INNER JOIN CHG
           ON CHG.pk_id = DATA.pk_id
       CROSS JOIN COL;
   
    -- ========================================
    -- 更新最后一次同步的版本号
    UPDATE dbo.tb_Change_Tracking SET
       @last_sync_version = CHANGE_TRACKING_CURRENT_VERSION(),
 
       last_sync_version = @last_sync_version,
       last_update_date = GETDATE();
 
    -- ========================================
    -- TRY...CATCH 中的标准事务处理模块
    -- c. 提交事务
    --    有可提交的事务, 并且事务是在当前模块中开启的情况下, 才提交事务
    IF XACT_STATE() = 1 AND @__trancount = 0
       COMMIT;
 
lb_Return:
    -- ========================================
    -- TRY...CATCH 中的标准事务处理模块
    -- d. 为了防止TRY 块中有遗漏的事务处理, 在TRY 模块的结束部分做最终的事务处理
    IF @__trancount = 0
    BEGIN
       IF XACT_STATE() = -1
           ROLLBACK TRAN;
       ELSE
       BEGIN     
           WHILE @@TRANCOUNT > 0
              COMMIT TRAN;
       END;
    END;
END TRY
BEGIN CATCH
    -- ========================================
    -- TRY...CATCH 中的标准事务处理模块
    -- e. CATCH 块的事务回滚
    IF XACT_STATE() <> 0
    BEGIN
       IF @__trancount = 0
           ROLLBACK TRAN;
       -- XACT_STATE 为-1 时, 不能回滚到事务保存点, 这种情况留给外层调用者做统一的事务回滚
       -- 通过@@TRANCOUNT > @__trancount 的判断, 即使在TRY 模块中没有设置事务保存点的情况下跳到此步骤, 也不会出错
       ELSE IF XACT_STATE() = 1 AND @@TRANCOUNT > @__trancount
           ROLLBACK TRAN __TRAN_SavePoint;
    END;
 
    -- ========================================
    -- 错误消息处理
    -- a. 获取错误信息
    --    这提提取了错误相关的全部信息, 可以根据实际需要调整
    DECLARE
       @__error_number int,
       @__error_message nvarchar(2048),
       @__error_severity int,
       @__error_state int,
       @__error_line int,
       @__error_procedure nvarchar(126),
       @__user_name nvarchar(128),
       @__host_name nvarchar(128);
 
    SELECT
       @__error_number = ERROR_NUMBER(),
       @__error_message = ERROR_MESSAGE(),
       @__error_severity = ERROR_SEVERITY(),
       @__error_state = ERROR_STATE(),
       @__error_line = ERROR_LINE(),
       @__error_procedure = ERROR_PROCEDURE(),
       @__user_name = SUSER_SNAME(),
       @__host_name = HOST_NAME();
 
    -- c. 如果没有打算在CATCH 模块中对错误进行处理, 则应该抛出错误给调用者
    RAISERROR(
       N'User: %s, Host: %s, Procedure: %s, Error %d, Level %d, State %d, Line %d, Message: %s ',
       @__error_severity,
       1,
       @__user_name,
       @__host_name,
       @__error_procedure,
       @__error_number,
       @__error_severity,
       @__error_state,
       @__error_line,
       @__error_message);
END CATCH;
GO
 
-- 数据同步的存储过程- 同步未更新的数据
-- 循环更新,直到手工结束
-- 可设置一个在SQL Agent服务启动时工作的JOB 来调用这个存储过程,这样就可以实现自动同步
CREATE PROC dbo.p_SyncChangeData_Circle_tb_Srouce_Target
    @interval int = 1 -- 循环检测之间相隔的秒数(指定负数或NULL值会调整为)
AS
SET NOCOUNT ON;
 
IF @interval IS NULL OR @interval < 0
    SET @interval = 1;
 
DECLARE
    @last_sync_version bigint,
    @min_valid_version bigint,
    @date_delay datetime;
 
WHILE 1 = 1
BEGIN
    EXEC dbo.p_SyncChangeData_tb_Srouce_Target
       @last_sync_version = @last_sync_version OUTPUT,
       @min_valid_version = @min_valid_version OUTPUT;
 
    IF @min_valid_version IS NOT NULL
    BEGIN
       RAISERROR(N'某些未同步的数据已经丢失', 16, 1);
       BREAK;
    END;
    IF @last_sync_version IS NULL
    BEGIN
       RAISERROR(N'源数据异常,同步版本号小于已经同步的版本号', 16, 1);
       BREAK;
    END;
   
    SET @date_delay = DATEADD(Second, @interval, GETDATE());
    WAITFOR TIME @date_delay;
END
GO
 
3.同步测试
-- ====================================================
-- 测试
USE DB_test;
GO
-- A.01. 变更源表数据
INSERT dbo.tb_source(
    col1)
VALUES(
    1);
 
INSERT dbo.tb_source(
    col1)
VALUES(
    2),
(
    3);
 
UPDATE dbo.tb_source SET
    col2 = 'update'
WHERE pk_id = 2;
 
-- A.02. 同步源表数据,并查询同步结果
DECLARE
    @last_sync_version bigint,
    @min_valid_version bigint;
 
EXEC dbo.p_SyncChangeData_tb_Srouce_Target
    @last_sync_version = @last_sync_version OUTPUT,
    @min_valid_version = @min_valid_version OUTPUT;
IF @min_valid_version IS NOT NULL
    RAISERROR(N'某些未同步的数据已经丢失', 16, 1);
IF @last_sync_version IS NULL
    RAISERROR(N'源数据异常,同步版本号小于已经同步的版本号', 16, 1);
 
SELECT * FROM dbo.tb_source;
SELECT * FROM dbo.tb_Target;
SELECT * FROM dbo.tb_Change_Tracking;
 
-- B.01. 变更源表数据
INSERT dbo.tb_source(
    col1)
VALUES(
    11);
 
UPDATE dbo.tb_source SET
    col2 = 'update.1'
WHERE pk_id = 2;
 
UPDATE dbo.tb_source SET
    col2 = 'update 3'
WHERE pk_id = 3;
 
DELETE FROM dbo.tb_source
WHERE pk_id < 3;
 
SET IDENTITY_INSERT dbo.tb_source ON;
INSERT dbo.tb_source(
    pk_id, col1, col2)
VALUES(
    1, 11, 'insert');
SET IDENTITY_INSERT dbo.tb_source OFF;
 
-- B.02. 同步源表数据,并查询同步结果
EXEC dbo.p_SyncChangeData_tb_Srouce_Target
    @last_sync_version = @last_sync_version OUTPUT,
    @min_valid_version = @min_valid_version OUTPUT;
IF @min_valid_version IS NOT NULL
    RAISERROR(N'某些未同步的数据已经丢失', 16, 1);
IF @last_sync_version IS NULL
    RAISERROR(N'源数据异常,同步版本号小于已经同步的版本号', 16, 1);
 
SELECT * FROM dbo.tb_source;
SELECT * FROM dbo.tb_Target;
SELECT * FROM dbo.tb_Change_Tracking;
GO
 
4.删除示例测试环境
-- ====================================================
-- /* -- 删除测试环境
USE DB_test;
GO
IF OBJECT_ID(N'dbo.p_SyncChangeData_tb_Srouce_Target') IS NOT NULL
    DROP PROC dbo.p_SyncChangeData_tb_Srouce_Target;
 
IF OBJECT_ID(N'dbo.tb_source') IS NOT NULL
    DROP TABLE dbo.tb_source;
 
IF OBJECT_ID(N'dbo.tb_Target') IS NOT NULL
    DROP TABLE dbo.tb_Target;
 
IF OBJECT_ID(N'dbo.tb_Change_Tracking') IS NOT NULL
    DROP TABLE dbo.tb_Change_Tracking;
GO
--*/
/*-- 删除测试数据库
USE master;
GO
ALTER DATABASE DB_test SET
    SINGLE_USER
    WITH
       ROLLBACK AFTER 0;
GO
DROP DATABASE DB_test;
--*/
 
附:
关于使用更改跟踪实现数据同步的一些补充说明:
1.  使用 SNAPSHOT 事务隔离级别
在进行同步的处理中,需要记录当前已经处理的最后一个版本号;而在查询更改跟踪信息时,无法指定要查询的更改跟踪信息的结束版本号。这就要求同步处理过程中,无论是查询更改的最后一个版本号,还是查询更改跟踪信息,都能保证是同一个变更版本的结束版本号。把所有的处理放在事务中,并使用SNAPSHOT事务隔离级别可以保证这点。
2.  自动同步
更改跟踪的信息只能被查询,所以在示例中,是在数据变更后,手动运行同步的存储过程实现数据同步。如果要自动同步,则只能使用定时查询的方式。本示例提供了一个名为“dbo.p_SyncChangeData_Circle_tb_Srouce_Target”的存储过程,可以使用Job或者其他程序在SQL Server启动时,调用这个存储过程,则这个存储过程会定时检查数据变更,并实现数据变更的自动同步。
 
本文来自CSDN博客,转载请标明出处:http://blog.csdn.net/zjcxc/archive/2009/02/23/3924959.aspx

    本站是提供个人知识管理的网络存储空间,所有内容均由用户发布,不代表本站观点。请注意甄别内容中的联系方式、诱导购买等信息,谨防诈骗。如发现有害或侵权内容,请点击一键举报。
    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多