使用cminer解码源库的某张表
1.源库环境
假设源库环境如下,下面的sql操作如果如果没有特别的说明,均在cminer数据库中执行
项 | 说明 |
---|---|
ip | 10.198.170.89 |
端口 | 1921 |
数据库用户 | cminer |
数据库用户密码 | cminer |
数据库名 | postgres |
源名称 | 10.198.170.89:1921 |
想要解码的表 | test |
2.配置cminer
2.1 配置cminer_settings
在使用cminer时,需要先配置cminer_settings表,将相关信息配置进去。
key | val | 说明 |
---|---|---|
output_mode | 1 | 输出模式,0:将解码后的数据写到数据库,1:将解码后的数据写到out_path目录下的文件里 |
wal_path | /home/cminer/waldata | 拉取的源库wal日志的存放位置,用于后续解码使用 |
db_host | 10.198.170.89 | 源库host |
db_port | 1921 | 源库port |
db_user | cminer | 源库数据库用户名 |
db_password | cminer | 源库数据库用户密码 |
db_name | postgres | 源库数据库名称 |
max_rows_per_commit | 100 | 保持默认值不修改 |
max_bytes_per_commit | 131072 | 保持默认值不修改 |
out_path | /home/cminer/outdata | 输出目录,当output_mode设为1时,解码后的数据会输出到该目录下的文件 |
middle_path | /home/cminer/middle_data | 中间值目录 |
source_name | 10.198.170.89:1921 | 源库名称 |
lic | 许可证 |
select * from cminer_settings;
输出如下
key | val
----------------------+---------------------------
output_mode | 1
wal_path | /home/cminer/waldata
db_host | 10.198.170.89
db_port | 1921
db_user | cminer
db_password | cminer
db_name | postgres
max_rows_per_commit | 100
max_bytes_per_commit | 131072
out_path | /home/cminer/outdata
middle_path | /home/cminer/middle_data
source_name | 127.0.0.1:1921
lic |
(13 rows)
检查一下上面配置的wal_path
,out_path
,middle_path
这三个目录是否存在且具有权限。如果要更新目录位置,可以使用下面的命令
-- 更新wal_path参数
update cminer_settings set val='/home/postgres/waldata' where key='wal_path';
-- 更新out_path参数
update cminer_settings set val='/home/postgres/outdata' where key='out_path';
-- 更新middle_path参数
update cminer_settings set val='/home/postgres/middle_data' where key='middle_path';
2.2 在cminer库中配置要解码的表
将要解码的表信息插入cminer_decode_table_def表,如果有多张表,一张表对应一行。
postgres=# select * from cminer_decode_table_def;
datname | nspname | relname
---------+---------+---------
(0 rows)
字段 | 说明 |
---|---|
datname | 这个表所在的数据库名称 |
nspname | 这个表所在的schema名称 |
relname | 表名称 |
insert into cminer_decode_table_def(datname,nspname,relname) values('postgres', 'public', 'test');
2.3 拉取解码需要的字典信息
将解码时需要的表结构字典信息拉取过来,如果执行报错,说明之前拉取过数据字典,需要先清空
清空字典信息
truncate table cminer_pg_attribute,cminer_pg_class,cminer_pg_database,cminer_pg_namespace,cminer_pkdef;
拉取字典信息
select cminer_build_dictionary();
2.4 拉取wal日志
使用pg_receivewal把要解码的源库的wal日志实时的拉到wal_path
的目录/home/cminer/waldata
,后续的解码程序会读此目录下的wal文件进行解码
# 让这个命令一直运行,后续的操作在另一个窗口进行,或让上面的命令以nohup的方式运行
pg_receivewal -h 10.198.170.89 -Ucminer -D /home/postgres/waldata
登录源库,在源库刷新检查点并切换wal日志
checkpoint;
select pg_switch_wal();
2.5 初始化解码位置
如果表cminer_decode_position
中第一行各列的值不为空,则用下面的sql将各列的值设置为空
UPDATE cminer_decode_position set checkpoint_lsn=null,checkpoint_timeline=null,decode_lsn=null,decode_timeline=null,oldest_trans_begin_lsn=null,oldest_trans_timeline=null;
初始化解码
select cminer_init_decode_lsn();
wal_path目录下需要有有效的wal日志且能找到checkpoint才可以初始化解码
3.启动解码
启动解码进程
select cminer_start_decode();
查看解码进程
postgres=# select * from cminer_worker;
pid | application_name | backend_start | state | query
------+----------------------+-------------------------------+-------+-----------------
1982 | cminer_decode_worker | 2023-07-02 22:05:41.871801+08 | idle | lsn: 0/00000000
(1 row)
4.测试解码
在源库做一个测试,插入、更新、删除一些行
insert into test values(1, '1');
insert into test values(2, '2');
insert into test values(3, '3');
update test set t='11' where id=1;
update test set t='22' where id=2;
delete from test where id=3;
到cminer所在主机的out_path
目录下查看解码出来的数据
```
[cminer@cminer:/]$cd /home/cminer/outdata
[cminer@cminer:/home/cminer/outdata]$ls
cmd_00000000000000000001 cmi_00000000000000000001
[cminer@cminer:/home/cminer/outdata]$ls -l
total 0
-rw------- 1 halo halo 0 Jul 2 22:05 cmd_00000000000000000001
-rw------- 1 halo halo 0 Jul 2 22:05 cmi_00000000000000000001
查看解出来的内容:
[cminer@cminer:/home/cminer/outdata]$cat cmd_00000000000000000001
{"source_name":"10.198.170.89:1921","dbname":"postgres","schema":"public","table":"test","opr": "insert","new":{"id":6, "t":"666"}, "pk":{"id":6},"ts_ms":1688307886985, "timeline":138, "lsn":"0/22000060"}
{"source_name":"10.198.170.89:1921","dbname":"postgres","schema":"public","table":"test","opr": "delete","old":{"id":1, "t":null}, "pk":{"id":1},"ts_ms":1688308204190, "timeline":138, "lsn":"0/220004E8"}
{"source_name":"10.198.170.89:1921","dbname":"postgres","schema":"public","table":"test","opr": "delete","old":{"id":2, "t":null}, "pk":{"id":2},"ts_ms":1688308204190, "timeline":138, "lsn":"0/22000528"}
{"source_name":"10.198.170.89:1921","dbname":"postgres","schema":"public","table":"test","opr": "delete","old":{"id":3, "t":null}, "pk":{"id":3},"ts_ms":1688308204190, "timeline":138, "lsn":"0/22000568"}
{"source_name":"10.198.170.89:1921","dbname":"postgres","schema":"public","table":"test","opr": "delete","old":{"id":4, "t":null}, "pk":{"id":4},"ts_ms":1688308204190, "timeline":138, "lsn":"0/220005A8"}
{"source_name":"10.198.170.89:1921","dbname":"postgres","schema":"public","table":"test","opr": "delete","old":{"id":5, "t":null}, "pk":{"id":5},"ts_ms":1688308204190, "timeline":138, "lsn":"0/220005E8"}
{"source_name":"10.198.170.89:1921","dbname":"postgres","schema":"public","table":"test","opr": "delete","old":{"id":6, "t":null}, "pk":{"id":6},"ts_ms":1688308204190, "timeline":138, "lsn":"0/22000628"}
{"source_name":"10.198.170.89:1921","dbname":"postgres","schema":"public","table":"test","opr": "insert","new":{"id":1, "t":"111"}, "pk":{"id":1},"ts_ms":1688308222518, "timeline":138, "lsn":"0/220006D0"}
{"source_name":"10.198.170.89:1921","dbname":"postgres","schema":"public","table":"test","opr": "update","old":{"id":1, "t":"111"},"new":{"id":1, "t":"aaa"}, "pk":{"id":1},"ts_ms":1688308253341, "timeline":138, "lsn":"0/22000890"}
目录