
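This article explains in detail a method for incrementally synchronizing table data from Oracle to PostgreSQL. The method requires no custom program; it only needs the PostgreSQL extension oracle_fdw, which makes it convenient and easy to operate.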

1. Theoretical basis of incremental data migration

Let us first look at the typical overall data migration process:

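The highlight of this method is that, when recording increments, it does not log all of the changed data. It logs only the rowid or primary key of each changed row. During an incremental sync, the changed rows are then looked up in the source database in batches and "merged" into the target database, which completes one round of incremental synchronization. Because only the rowid or primary key of the changed rows is recorded, the impact on the write performance of the source database is small.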

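Because the "merge" is done in batches, it is faster overall than synchronizing changes one row at a time. In addition, with row-by-row incremental synchronization, the data of successive sync runs must not overlap, otherwise errors may occur.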

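Although we call it a batch "merge", the database itself (PostgreSQL before version 15, which added the MERGE statement) has no single batch "merge" that covers inserts, deletes, and updates, so the work has to be split into insert, delete, and update.
When the same record is inserted, deleted, or updated several times, a row-by-row sync method must take the order of the operations into account. This method instead looks at the net change between the table's state before and after the incremental sync, which avoids the problem. The principle is explained in more detail here: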
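
The idea can be sketched with local stand-in tables. All table names here (src_emp, dst_emp, changed_keys) are hypothetical, and the sketch uses INSERT ... ON CONFLICT (PostgreSQL 9.5 or later) for brevity rather than the UPDATE-then-INSERT form used by the functions later in this article. It only illustrates that the order and number of logged changes do not matter, because each key is resolved against the current state of the source:

```sql
-- Hypothetical stand-ins: src_emp plays the Oracle table, dst_emp the PG copy,
-- changed_keys the log of primary keys touched since the last sync.
CREATE TEMP TABLE src_emp (empno int PRIMARY KEY, ename text);
CREATE TEMP TABLE dst_emp (empno int PRIMARY KEY, ename text);
CREATE TEMP TABLE changed_keys (empno int);

INSERT INTO dst_emp VALUES (1, 'SMITH'), (2, 'ALLEN');

-- Since the last sync, in the source: row 1 was updated twice,
-- row 2 was deleted, row 3 was inserted and then updated.
INSERT INTO src_emp VALUES (1, 'SMITH-v3'), (3, 'WARD-v2');
INSERT INTO changed_keys VALUES (1), (1), (2), (3), (3);

-- Logged keys that still exist in the source: insert or update.
INSERT INTO dst_emp (empno, ename)
SELECT s.empno, s.ename
  FROM src_emp s
 WHERE EXISTS (SELECT 1 FROM changed_keys c WHERE c.empno = s.empno)
ON CONFLICT (empno) DO UPDATE SET ename = EXCLUDED.ename;

-- Logged keys that no longer exist in the source: delete.
DELETE FROM dst_emp d
 WHERE EXISTS (SELECT 1 FROM changed_keys c WHERE c.empno = d.empno)
   AND NOT EXISTS (SELECT 1 FROM src_emp s WHERE s.empno = d.empno);

-- dst_emp now holds (1, 'SMITH-v3') and (3, 'WARD-v2').
SELECT * FROM dst_emp ORDER BY empno;
```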

The actual procedure is described below:

2. Recording the incremental log in Oracle
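
The functions in section 4 rely on an Oracle materialized view log to record which rows changed. A minimal sketch for the emp table, assuming it lives in the SCOTT schema and has a primary key on empno (a log WITH PRIMARY KEY cannot be created on a table without one):

```sql
-- Run in Oracle: start logging the primary keys of changed rows.
CREATE MATERIALIZED VIEW LOG ON scott.emp WITH PRIMARY KEY;

-- Oracle maintains the log in a table named MLOG$_EMP. Each change adds a row
-- carrying the primary key plus bookkeeping columns such as DMLTYPE$$.
SELECT empno, dmltype$$, old_new$$
  FROM scott.mlog$_emp;
```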

3. The incremental synchronization method in detail

We use a table named emp as the example. emp is a practice table in Oracle that can be created by running the Oracle-supplied script "$ORACLE_HOME/rdbms/admin/scott.sql".

To explain in detail how the data is looked up in the source database, we break the lookup logic into several steps:

3.1 How the reverse lookup works
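
As an illustration, this is roughly the Oracle view that the add_table_to_replica function in section 4 would generate for emp. It assumes the standard emp columns and a primary key on empno; treat it as an expansion of that function's template rather than the exact statement used in the manual procedure:

```sql
-- Run in Oracle. x____action: 1 = row to merge, 2 = row to delete,
-- 3 = rowid of a log entry (used later to clean the log).
CREATE VIEW vw____emp AS
SELECT 1 AS x____action, NULL AS x____row_id,
       a.empno, a.ename, a.job, a.mgr, a.hiredate, a.sal, a.comm, a.deptno
  FROM emp a
 WHERE EXISTS (SELECT 1 FROM mlog$_emp b WHERE a.empno = b.empno)
UNION ALL
-- Keys that are in the log but no longer in the table were deleted.
SELECT DISTINCT 2 AS x____action, NULL AS x____row_id,
       empno, NULL, NULL, NULL, NULL, NULL, NULL, NULL
  FROM mlog$_emp a
 WHERE NOT EXISTS (SELECT 1 FROM emp b WHERE a.empno = b.empno)
UNION ALL
SELECT 3 AS x____action, ROWIDTOCHAR(rowid) AS x____row_id,
       NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL
  FROM mlog$_emp;
```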

3.2 Creating oracle_fdw

Create the table to be synchronized in PostgreSQL.
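
A minimal sketch of this setup follows. The server name oradb matches the one used by the functions in section 4; the connect string, the credentials, the local schema name scott, and the column type mapping are assumptions to adapt to your environment:

```sql
-- Run in PostgreSQL.
CREATE EXTENSION IF NOT EXISTS oracle_fdw;

CREATE SERVER oradb FOREIGN DATA WRAPPER oracle_fdw
    OPTIONS (dbserver '//oracle-host:1521/ORCL');   -- hypothetical connect string

CREATE USER MAPPING FOR CURRENT_USER SERVER oradb
    OPTIONS (user 'scott', password 'tiger');       -- hypothetical credentials

-- Local target table; the primary key is what the sync functions join on.
CREATE SCHEMA IF NOT EXISTS scott;
CREATE TABLE scott.emp (
    empno    numeric(4,0) PRIMARY KEY,
    ename    varchar(10),
    job      varchar(9),
    mgr      numeric(4,0),
    hiredate timestamp,      -- Oracle DATE also carries a time part
    sal      numeric(7,2),
    comm     numeric(7,2),
    deptno   numeric(2,0)
);
```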

3.3 Steps of the incremental synchronization
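
For orientation, these are approximately the statements that the refresh_increment function in section 4 builds for emp. The sketch assumes a local table scott.emp with the standard emp columns and primary key empno, and a foreign table vw____emp mapped to the Oracle incremental view:

```sql
-- 1. Snapshot the remote increment into a local temp table, so that every
--    later step sees the same data.
CREATE TEMP TABLE IF NOT EXISTS x____tmp_scott_emp (
    x____action int, x____row_id text,
    empno numeric(4,0), ename varchar(10), job varchar(9), mgr numeric(4,0),
    hiredate timestamp, sal numeric(7,2), comm numeric(7,2), deptno numeric(2,0));

INSERT INTO x____tmp_scott_emp
SELECT x____action, x____row_id, empno, ename, job, mgr, hiredate, sal, comm, deptno
  FROM vw____emp;

-- 2. "Merge": update the rows that already exist, insert the rest.
WITH upsert AS (
    UPDATE scott.emp AS m
       SET ename = t.ename, job = t.job, mgr = t.mgr, hiredate = t.hiredate,
           sal = t.sal, comm = t.comm, deptno = t.deptno
      FROM x____tmp_scott_emp t
     WHERE t.x____action = 1 AND m.empno = t.empno
    RETURNING m.*)
INSERT INTO scott.emp
SELECT empno, ename, job, mgr, hiredate, sal, comm, deptno
  FROM x____tmp_scott_emp a
 WHERE a.x____action = 1
   AND NOT EXISTS (SELECT 1 FROM upsert b WHERE a.empno = b.empno);

-- 3. Remove the rows that were deleted in the source.
DELETE FROM scott.emp a
 WHERE EXISTS (SELECT 1 FROM x____tmp_scott_emp b
                WHERE b.x____action = 2 AND a.empno = b.empno);
```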

3.4 Cleaning up incremental logs that have already been synchronized

4. Improving the incremental synchronization method

The whole procedure above is fairly complex. If many tables need to be synchronized, doing it by hand is tedious, so it can be optimized:

Idea: wrap the manual steps above in functions, and make it possible to run the DDL operations for the Oracle database from within PG.

    CREATE FOREIGN TABLE fdw_replica_cmd (
        cmd varchar(4000)
    ) SERVER oradb OPTIONS ( schema 'SCOTT', table 'REPLICA_CMD');

    INSERT INTO fdw_replica_cmd values('CREATE TABLE test01(id number)');

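The Oracle side of this command table is not shown above. One way it could work, offered purely as an assumption, is a trigger on REPLICA_CMD that executes each inserted string. The trigger name trg_replica_cmd is hypothetical:

```sql
-- Run in Oracle. Assumed counterpart of the fdw_replica_cmd foreign table.
CREATE TABLE scott.replica_cmd (cmd VARCHAR2(4000));

CREATE OR REPLACE TRIGGER scott.trg_replica_cmd
BEFORE INSERT ON scott.replica_cmd
FOR EACH ROW
DECLARE
    -- DDL commits implicitly, which is not allowed in the triggering
    -- transaction, so the command runs in an autonomous transaction.
    PRAGMA AUTONOMOUS_TRANSACTION;
BEGIN
    EXECUTE IMMEDIATE :NEW.cmd;
    COMMIT;   -- needed when the command is DML, such as a DELETE on the log
END;
/
```

With such a design the command takes effect in Oracle even if the PostgreSQL transaction that issued the INSERT later rolls back, and anyone who can insert into the table can run arbitrary SQL as its owner, so access to it would need to be restricted.
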
    CREATE OR REPLACE FUNCTION add_table_to_replica(arg_schema_name text, arg_table_name text)
    RETURNS text AS
    $BODY$
    DECLARE
        full_table_name text := arg_schema_name||'.'||arg_table_name;
        r1 RECORD;
        r2 RECORD;
        cols_name_list text[];
        cols_type_list text[];
        pk_name_list text[];
        cols_name_str text;
        cols_name_type_str text;
        create_tmp_table_sql text;
        a_cols_name_str text;
        ab_join_cond_str text;
        mt_join_cond_str text;
        pk_null_str text;
        cols_null_str text;
        item text;
        va text;
        i int;
        out_info text;
        local_sql text;
        ora_sql text;
    BEGIN
        -- Collect the column names and types of the local table
        FOR r1 IN
            SELECT a.attname, pg_catalog.format_type(a.atttypid, a.atttypmod) as coltype, a.attnum
              FROM pg_catalog.pg_attribute a
             WHERE a.attrelid = full_table_name::regclass
               AND a.attnum > 0
               AND NOT a.attisdropped
             ORDER BY a.attnum
        LOOP
            cols_name_list := cols_name_list || r1.attname::text;
            cols_type_list := cols_type_list || r1.coltype::text;
        END LOOP;

        -- Collect the primary key columns of the local table
        FOR r2 IN
            SELECT
                pg_attribute.attname,
                format_type(pg_attribute.atttypid, pg_attribute.atttypmod)
              FROM pg_index, pg_class, pg_attribute
             WHERE
                pg_class.oid = full_table_name::regclass AND
                indrelid = pg_class.oid AND
                pg_attribute.attrelid = pg_class.oid AND
                pg_attribute.attnum = any(pg_index.indkey)
                AND indisprimary
        LOOP
            pk_name_list := pk_name_list || r2.attname::text;
        END LOOP;

        -- Build cols_name_type_str as: col1 type1, col2 type2, ...
        i := 1;
        FOREACH item IN ARRAY cols_name_list
        LOOP
            IF i = 1 THEN
                cols_name_type_str := cols_name_list[i] ||' '||cols_type_list[i];
            ELSE
                cols_name_type_str := cols_name_type_str || ', ' || cols_name_list[i] ||' '||cols_type_list[i];
            END IF;
            i := i+1;
        END LOOP;

        -- Build the join conditions on the primary key columns
        i := 1;
        FOREACH item IN ARRAY pk_name_list
        LOOP
            IF i = 1 THEN
                ab_join_cond_str := 'a.'||item ||' = b.'||item;
                mt_join_cond_str := 'm.'||item ||' = t.'||item;
            ELSE
                ab_join_cond_str := ab_join_cond_str || ' AND a.' || item ||' = b.'||item;
                mt_join_cond_str := mt_join_cond_str || ' AND m.' || item ||' = t.'||item;
            END IF;
            i := i+1;
        END LOOP;

        -- Build a_cols_name_str as: a.col1, a.col2, a.col3 ...
        i := 1;
        FOREACH item IN ARRAY cols_name_list
        LOOP
            IF i = 1 THEN
                a_cols_name_str := 'a.'||item;
            ELSE
                a_cols_name_str := a_cols_name_str||', a.'||item;
            END IF;
            i := i + 1;
        END LOOP;

        -- Build pk_null_str and cols_null_str:
        -- pk_null_str keeps the primary key column names and uses null for every other column;
        -- cols_null_str uses null for every column
        pk_null_str = '';
        i := 1;
        FOREACH item IN ARRAY cols_name_list
        LOOP
            IF item = ANY(pk_name_list) THEN
                va := item;
            ELSE
                va := 'null';
            END IF;
            IF i = 1 THEN
                pk_null_str := va;
                cols_null_str := 'null';
            ELSE
                pk_null_str := pk_null_str ||', '||va;
                cols_null_str := cols_null_str || ', null';
            END IF;
            i := i + 1;
        END LOOP;

        -- Create the materialized view log for the table in Oracle
        ora_sql := format('CREATE MATERIALIZED VIEW LOG ON %s.%s WITH PRIMARY KEY', arg_schema_name, arg_table_name);
        INSERT INTO fdw_replica_cmd values( ora_sql);
        out_info := format(E'Run in oracle: %s\n', ora_sql);

        -- Create the view used for synchronization in Oracle
        -- (x____action: 1 = merge, 2 = delete, 3 = rowid of a log entry)
        ora_sql := format( E'CREATE VIEW vw____%s AS \n'||
            E'SELECT 1 as x____action, null as x____row_id, %s FROM %s a \n'||
            E' WHERE EXISTS (SELECT 1 FROM mlog$_%s b WHERE %s)\n' ||
            E'UNION ALL \n'||
            E'SELECT DISTINCT 2 AS x____action, null AS x____row_id, %s FROM mlog$_%s a \n'||
            E' WHERE NOT EXISTS (SELECT 1 FROM %s b WHERE %s) \n'||
            E'UNION ALL \n'||
            E'SELECT 3 AS x____action, rowidtochar(rowid) AS x____row_id, %s FROM mlog$_%s',
            arg_table_name, a_cols_name_str, arg_table_name, arg_table_name, ab_join_cond_str,
            pk_null_str, arg_table_name, arg_table_name, ab_join_cond_str,
            cols_null_str, arg_table_name
        );
        INSERT INTO fdw_replica_cmd values( ora_sql);
        out_info := out_info || format(E'Run in oracle: %s\n', ora_sql);

        -- Map the incremental-data view in Oracle to a local foreign table
        local_sql := format ( E'CREATE FOREIGN TABLE vw____%s ( \n'||
            E'x____action smallint NOT NULL, \n' ||
            E'x____row_id text, \n' ||
            E'%s ) ' ||
            E'SERVER oradb OPTIONS ( schema \'%s\', table \'VW____%s\')',
            arg_table_name,
            cols_name_type_str,
            upper(arg_schema_name),
            upper(arg_table_name)
        );
        EXECUTE local_sql;
        out_info := out_info || format(E'Run: %s\n', local_sql);

        -- Map the remote Oracle table to a local foreign table
        local_sql := format ( E'CREATE FOREIGN TABLE fdw____%s ( \n'||
            E'%s ) ' ||
            E'SERVER oradb OPTIONS ( schema \'%s\', table \'%s\')',
            arg_table_name,
            cols_name_type_str,
            upper(arg_schema_name),
            upper(arg_table_name)
        );
        EXECUTE local_sql;
        out_info := out_info || format(E'Run: %s\n', local_sql);
        return out_info;
    END;
    $BODY$
    LANGUAGE plpgsql VOLATILE;

    CREATE OR REPLACE FUNCTION remove_table_from_replica(arg_schema_name text, arg_table_name text)
    RETURNS text AS
    $BODY$
    DECLARE
        out_info text;
        local_sql text;
        ora_sql text;
        v_detail text;
    BEGIN
        -- Drop the materialized view log of the table in Oracle
        ora_sql := format('DROP MATERIALIZED VIEW LOG ON %s.%s', arg_schema_name, arg_table_name);
        BEGIN
            INSERT INTO fdw_replica_cmd values( ora_sql);
            out_info := format(E'Run in oracle: %s\n', ora_sql);
        EXCEPTION
            WHEN OTHERS THEN
                GET STACKED DIAGNOSTICS v_detail = PG_EXCEPTION_DETAIL;
                out_info := format(E'Run in oracle failed: %s;\n SQLSTATE=%s, %s\n %s\n', ora_sql, SQLSTATE, SQLERRM, v_detail);
        END;

        -- Drop the synchronization view in Oracle
        ora_sql := format( E'DROP VIEW %s.vw____%s', arg_schema_name, arg_table_name);
        BEGIN
            INSERT INTO fdw_replica_cmd values( ora_sql);
            out_info := out_info || format(E'Run in oracle: %s\n', ora_sql);
        EXCEPTION
            WHEN OTHERS THEN
                GET STACKED DIAGNOSTICS v_detail = PG_EXCEPTION_DETAIL;
                out_info := out_info || format(E'Run in oracle failed: %s;\n SQLSTATE=%s, %s\n %s\n', ora_sql, SQLSTATE, SQLERRM, v_detail);
        END;

        -- Drop the local foreign table mapped to the incremental-data view in Oracle
        local_sql := format ('DROP FOREIGN TABLE %s.vw____%s', arg_schema_name, arg_table_name);
        BEGIN
            EXECUTE local_sql;
            out_info := out_info || format(E'Run: %s\n', local_sql);
        EXCEPTION
            WHEN OTHERS THEN
                -- This statement runs locally, so report local_sql rather than ora_sql
                out_info := out_info || format(E'Run failed: %s;\n SQLSTATE=%s, %s\n', local_sql, SQLSTATE, SQLERRM);
        END;

        -- Drop the local foreign table mapped to the remote Oracle table
        local_sql := format ( 'DROP FOREIGN TABLE %s.fdw____%s', arg_schema_name, arg_table_name);
        BEGIN
            EXECUTE local_sql;
            out_info := out_info || format(E'Run: %s\n', local_sql);
        EXCEPTION
            WHEN OTHERS THEN
                out_info := out_info || format(E'Run failed: %s;\n SQLSTATE=%s, %s\n', local_sql, SQLSTATE, SQLERRM);
        END;
        return out_info;
    END;
    $BODY$
    LANGUAGE plpgsql VOLATILE;

    CREATE OR REPLACE FUNCTION refresh_increment(arg_schema_name text, arg_table_name text)
    RETURNS text AS
    $BODY$
    DECLARE
        full_table_name text := arg_schema_name||'.'||arg_table_name;
        tmp_table_name text := 'x____tmp_'||arg_schema_name||'_'||arg_table_name;
        r1 RECORD;
        r2 RECORD;
        cols_name_list text[];
        cols_type_list text[];
        pk_name_list text[];
        cols_name_str text;
        cols_name_type_str text;
        create_tmp_table_sql text;
        a_cols_name_str text;
        ab_join_cond_str text;
        mt_join_cond_str text;
        pk_null_str text;
        cols_null_str text;
        up_set_str text;
        item text;
        va text;
        i int;
        insert_sql text;
        merge_sql text;
        delete_sql text;
        tj_sql text;
        merge_nums int;
        delete_nums int;
    BEGIN
        -- Collect the column names and types of the local table
        FOR r1 IN
            SELECT a.attname, pg_catalog.format_type(a.atttypid, a.atttypmod) as coltype, a.attnum
              FROM pg_catalog.pg_attribute a
             WHERE a.attrelid = full_table_name::regclass
               AND a.attnum > 0
               AND NOT a.attisdropped
             ORDER BY a.attnum
        LOOP
            cols_name_list := cols_name_list || r1.attname::text;
            cols_type_list := cols_type_list || r1.coltype::text;
        END LOOP;

        -- Collect the primary key columns of the local table
        FOR r2 IN
            SELECT
                pg_attribute.attname,
                format_type(pg_attribute.atttypid, pg_attribute.atttypmod)
              FROM pg_index, pg_class, pg_attribute
             WHERE
                pg_class.oid = full_table_name::regclass AND
                indrelid = pg_class.oid AND
                pg_attribute.attrelid = pg_class.oid AND
                pg_attribute.attnum = any(pg_index.indkey)
                AND indisprimary
        LOOP
            pk_name_list := pk_name_list || r2.attname::text;
        END LOOP;

        -- Build cols_name_type_str as: col1 type1, col2 type2, ...
        i := 1;
        FOREACH item IN ARRAY cols_name_list
        LOOP
            IF i = 1 THEN
                cols_name_type_str := cols_name_list[i] ||' '||cols_type_list[i];
            ELSE
                cols_name_type_str := cols_name_type_str || ', ' || cols_name_list[i] ||' '||cols_type_list[i];
            END IF;
            i := i+1;
        END LOOP;

        -- Build the join conditions on the primary key columns
        i := 1;
        FOREACH item IN ARRAY pk_name_list
        LOOP
            IF i = 1 THEN
                ab_join_cond_str := 'a.'||item ||' = b.'||item;
                mt_join_cond_str := 'm.'||item ||' = t.'||item;
            ELSE
                ab_join_cond_str := ab_join_cond_str || ' AND a.' || item ||' = b.'||item;
                mt_join_cond_str := mt_join_cond_str || ' AND m.' || item ||' = t.'||item;
            END IF;
            i := i+1;
        END LOOP;

        cols_name_str := array_to_string(cols_name_list, ',');

        -- Build a_cols_name_str as: a.col1, a.col2, a.col3 ...
        i := 1;
        FOREACH item IN ARRAY cols_name_list
        LOOP
            IF i = 1 THEN
                a_cols_name_str := 'a.'||item;
            ELSE
                a_cols_name_str := a_cols_name_str||', a.'||item;
            END IF;
            i := i + 1;
        END LOOP;

        -- Build pk_null_str (primary key columns kept, null elsewhere)
        -- and cols_null_str (null for every column)
        pk_null_str = '';
        i := 1;
        FOREACH item IN ARRAY cols_name_list
        LOOP
            IF item = ANY(pk_name_list) THEN
                va := item;
            ELSE
                va := 'null';
            END IF;
            IF i = 1 THEN
                pk_null_str := va;
                cols_null_str := 'null';
            ELSE
                pk_null_str := pk_null_str ||', '||va;
                cols_null_str := cols_null_str || ', null';
            END IF;
            i := i + 1;
        END LOOP;

        -- Build the "set col1 = v1, col2 = v2" string of the update statement
        -- (primary key columns are skipped)
        i := 1;
        FOREACH item IN ARRAY cols_name_list
        LOOP
            IF item = ANY(pk_name_list) THEN
                CONTINUE;
            END IF;
            IF i = 1 THEN
                up_set_str := item ||' = t.'||item;
            ELSE
                up_set_str := up_set_str || ',' || item || ' = t.' || item;
            END IF;
            i := i + 1;
        END LOOP;

        -- The incremental data is read several times. Reading the remote incremental
        -- view repeatedly could return different data each time, so consistency could
        -- not be guaranteed. The remote incremental data is therefore saved into a
        -- local temp table.
        -- Create the temp table that holds the incremental data of this run
        create_tmp_table_sql := format(
            E'CREATE TEMP TABLE IF NOT EXISTS \n'||
            E'%s(x____action int, x____row_id text, %s)',
            tmp_table_name, cols_name_type_str);

        -- Copy the remote Oracle incremental data (exposed as a local foreign table)
        -- into the local temp table
        insert_sql := format(
            E'INSERT INTO %s\n'||
            E'SELECT x____action, x____row_id, %s FROM vw____%s\n',
            tmp_table_name, cols_name_str, arg_table_name);

        -- SQL that merges the incremental data
        merge_sql := format(
            E'WITH upsert as \n'||
            E'(UPDATE %s AS m SET %s\n'||
            E' FROM %s t \n'||
            E' WHERE t.x____action = 1 AND %s \n'||
            E'RETURNING m.*)\n'||
            E'INSERT INTO %s \n'||
            E'SELECT %s FROM %s a \n'||
            E' WHERE a.x____action = 1 \n'||
            E' AND NOT EXISTS(SELECT 1 FROM upsert b WHERE %s)',
            full_table_name, up_set_str,
            tmp_table_name, mt_join_cond_str,
            full_table_name, cols_name_str,
            tmp_table_name, ab_join_cond_str);

        -- SQL that applies the deletes in the incremental data
        delete_sql := format(
            E'DELETE FROM %s a WHERE EXISTS(SELECT 1 FROM %s b WHERE x____action=2 AND %s)',
            full_table_name, tmp_table_name, ab_join_cond_str);

        EXECUTE create_tmp_table_sql;
        EXECUTE insert_sql;
        EXECUTE merge_sql;
        EXECUTE delete_sql;

        -- Count the merged and deleted rows for the result message
        tj_sql := 'SELECT sum(case x____action when 1 then 1 else 0 end), '||
                  'sum(case x____action when 2 then 1 else 0 end) FROM '||tmp_table_name;
        EXECUTE tj_sql into merge_nums, delete_nums;
        RETURN format('merge %s rows, delete %s rows.', merge_nums, delete_nums);
    END;
    $BODY$
    LANGUAGE plpgsql VOLATILE;

    CREATE OR REPLACE FUNCTION clean_current_increment_log(arg_schema_name text, arg_table_name text)
    RETURNS int AS
    $BODY$
    DECLARE
        -- Number of rowids cleaned per batch. Each quoted 18-character rowid takes
        -- about 22 characters, so the batch is kept small enough for the generated
        -- DELETE to fit in the varchar(4000) cmd column.
        rowid_count_in_loop int := 150;
        full_table_name text := arg_schema_name||'.'||arg_table_name;
        tmp_table_name text := 'x____tmp_'||arg_schema_name||'_'||arg_table_name;
        ref refcursor;
        rowid text;
        all_rowid_str text;
        ora_sql text;
        clean_sql text;
        clean_tmp_table_sql text;
        i int;
        total int;
    BEGIN
        i := 0;
        total := 0;
        -- Loop over the temp table that holds the increment and collect the
        -- rowids of the log entries captured at that time
        OPEN ref FOR EXECUTE 'SELECT x____row_id FROM '|| quote_ident(tmp_table_name)||' WHERE x____action=3';
        LOOP
            FETCH ref INTO rowid;
            EXIT WHEN not found;
            IF i = 0 THEN
                all_rowid_str := ''''||rowid||'''';
            ELSE
                all_rowid_str := all_rowid_str || ', '''||rowid||'''';
            END IF;
            i := i + 1;
            -- Flush a full batch to Oracle
            IF i >= rowid_count_in_loop THEN
                ora_sql = format('DELETE FROM mlog$_%s WHERE rowid in (%s)', arg_table_name, all_rowid_str);
                INSERT INTO fdw_replica_cmd values( ora_sql);
                total := total + i;
                i := 0;
                all_rowid_str := '';
            END IF;
        END LOOP;
        CLOSE ref;

        -- Flush the last, partial batch
        IF i > 0 THEN
            ora_sql = format('DELETE FROM mlog$_%s WHERE rowid in (%s)', arg_table_name, all_rowid_str);
            INSERT INTO fdw_replica_cmd values( ora_sql);
            total := total + i;
        END IF;

        -- Empty the temp table once its log entries have been cleaned
        IF total > 0 THEN
            clean_tmp_table_sql = 'TRUNCATE TABLE '||quote_ident(tmp_table_name);
            EXECUTE clean_tmp_table_sql;
        END IF;
        return total;
    END;
    $BODY$
    LANGUAGE plpgsql VOLATILE;
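
Put together, a sync cycle for emp with the four functions above might look like the following sketch. It assumes a local table scott.emp with a primary key already exists in PostgreSQL, that the foreign table fdw_replica_cmd is in place, and that the local schema is on the search_path, since the foreign tables are created without a schema qualifier:

```sql
-- One-time setup: create the Oracle log and view plus the local foreign tables.
SELECT add_table_to_replica('scott', 'emp');

-- Repeat for every sync round. Both calls must run in the same session,
-- because the increment is kept in a session-local temp table.
SELECT refresh_increment('scott', 'emp');            -- merge and delete locally
SELECT clean_current_increment_log('scott', 'emp');  -- purge the applied log rows

-- When the table no longer needs to be synchronized.
SELECT remove_table_from_replica('scott', 'emp');
```

Cleaning by the rowids captured during refresh_increment means that log entries written to the Oracle log after the snapshot are left in place and are picked up by the next round.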