使用cminer解码源库的某张表
1.源库环境
假设源库环境如下,下面的sql操作如果如果没有特别的说明,均在cminer数据库中执行
| 项 | 说明 |
|---|---|
| ip | 10.198.170.89 |
| 端口 | 1921 |
| 数据库用户 | cminer |
| 数据库用户密码 | cminer |
| 数据库名 | postgres |
| 源名称 | 10.198.170.89:1921 |
| 想要解码的表 | test |
2.配置cminer
2.1 配置cminer_settings
在使用cminer时,需要先配置cminer_settings表,将相关信息配置进去。
| key | val | 说明 |
|---|---|---|
| output_mode | 1 | 输出模式,0:将解码后的数据写到数据库,1:将解码后的数据写到out_path目录下的文件里 |
| wal_path | /home/cminer/waldata | 拉取的源库wal日志的存放位置,用于后续解码使用 |
| db_host | 10.198.170.89 | 源库host |
| db_port | 1921 | 源库port |
| db_user | cminer | 源库数据库用户名 |
| db_password | cminer | 源库数据库用户密码 |
| db_name | postgres | 源库数据库名称 |
| max_rows_per_commit | 100 | 保持默认值不修改 |
| max_bytes_per_commit | 131072 | 保持默认值不修改 |
| out_path | /home/cminer/outdata | 输出目录,当output_mode设为1时,解码后的数据会输出到该目录下的文件 |
| middle_path | /home/cminer/middle_data | 中间值目录 |
| source_name | 10.198.170.89:1921 | 源库名称 |
| lic | 许可证 |
select * from cminer_settings;
输出如下
key | val----------------------+---------------------------output_mode | 1wal_path | /home/cminer/waldatadb_host | 10.198.170.89db_port | 1921db_user | cminerdb_password | cminerdb_name | postgresmax_rows_per_commit | 100max_bytes_per_commit | 131072out_path | /home/cminer/outdatamiddle_path | /home/cminer/middle_datasource_name | 127.0.0.1:1921lic |(13 rows)
检查一下上面配置的wal_path,out_path,middle_path这三个目录是否存在且具有权限。如果要更新目录位置,可以使用下面的命令
-- 更新wal_path参数update cminer_settings set val='/home/postgres/waldata' where key='wal_path';-- 更新out_path参数update cminer_settings set val='/home/postgres/outdata' where key='out_path';-- 更新middle_path参数update cminer_settings set val='/home/postgres/middle_data' where key='middle_path';
2.2 在cminer库中配置要解码的表
将要解码的表信息插入cminer_decode_table_def表,如果有多张表,一张表对应一行。
postgres=# select * from cminer_decode_table_def;datname | nspname | relname---------+---------+---------(0 rows)
| 字段 | 说明 |
|---|---|
| datname | 这个表所在的数据库名称 |
| nspname | 这个表所在的schema名称 |
| relname | 表名称 |
insert into cminer_decode_table_def(datname,nspname,relname) values('postgres', 'public', 'test');
2.3 拉取解码需要的字典信息
将解码时需要的表结构字典信息拉取过来,如果执行报错,说明之前拉取过数据字典,需要先清空
清空字典信息
truncate table cminer_pg_attribute,cminer_pg_class,cminer_pg_database,cminer_pg_namespace,cminer_pkdef;
拉取字典信息
select cminer_build_dictionary();
2.4 拉取wal日志
使用pg_receivewal把要解码的源库的wal日志实时的拉到wal_path的目录/home/cminer/waldata,后续的解码程序会读此目录下的wal文件进行解码
# 让这个命令一直运行,后续的操作在另一个窗口进行,或让上面的命令以nohup的方式运行pg_receivewal -h 10.198.170.89 -Ucminer -D /home/postgres/waldata
登录源库,在源库刷新检查点并切换wal日志
checkpoint;select pg_switch_wal();
2.5 初始化解码位置
如果表cminer_decode_position中第一行各列的值不为空,则用下面的sql将各列的值设置为空
UPDATE cminer_decode_position set checkpoint_lsn=null,checkpoint_timeline=null,decode_lsn=null,decode_timeline=null,oldest_trans_begin_lsn=null,oldest_trans_timeline=null;
初始化解码
select cminer_init_decode_lsn();
wal_path目录下需要有有效的wal日志且能找到checkpoint才可以初始化解码
3.启动解码
启动解码进程
select cminer_start_decode();
查看解码进程
postgres=# select * from cminer_worker;pid | application_name | backend_start | state | query------+----------------------+-------------------------------+-------+-----------------1982 | cminer_decode_worker | 2023-07-02 22:05:41.871801+08 | idle | lsn: 0/00000000(1 row)
4.测试解码
在源库做一个测试,插入、更新、删除一些行
insert into test values(1, '1');insert into test values(2, '2');insert into test values(3, '3');update test set t='11' where id=1;update test set t='22' where id=2;delete from test where id=3;
到cminer所在主机的out_path目录下查看解码出来的数据
```[cminer@cminer:/]$cd /home/cminer/outdata[cminer@cminer:/home/cminer/outdata]$lscmd_00000000000000000001 cmi_00000000000000000001[cminer@cminer:/home/cminer/outdata]$ls -ltotal 0-rw------- 1 halo halo 0 Jul 2 22:05 cmd_00000000000000000001-rw------- 1 halo halo 0 Jul 2 22:05 cmi_00000000000000000001
查看解出来的内容:
[cminer@cminer:/home/cminer/outdata]$cat cmd_00000000000000000001{"source_name":"10.198.170.89:1921","dbname":"postgres","schema":"public","table":"test","opr": "insert","new":{"id":6, "t":"666"}, "pk":{"id":6},"ts_ms":1688307886985, "timeline":138, "lsn":"0/22000060"}{"source_name":"10.198.170.89:1921","dbname":"postgres","schema":"public","table":"test","opr": "delete","old":{"id":1, "t":null}, "pk":{"id":1},"ts_ms":1688308204190, "timeline":138, "lsn":"0/220004E8"}{"source_name":"10.198.170.89:1921","dbname":"postgres","schema":"public","table":"test","opr": "delete","old":{"id":2, "t":null}, "pk":{"id":2},"ts_ms":1688308204190, "timeline":138, "lsn":"0/22000528"}{"source_name":"10.198.170.89:1921","dbname":"postgres","schema":"public","table":"test","opr": "delete","old":{"id":3, "t":null}, "pk":{"id":3},"ts_ms":1688308204190, "timeline":138, "lsn":"0/22000568"}{"source_name":"10.198.170.89:1921","dbname":"postgres","schema":"public","table":"test","opr": "delete","old":{"id":4, "t":null}, "pk":{"id":4},"ts_ms":1688308204190, "timeline":138, "lsn":"0/220005A8"}{"source_name":"10.198.170.89:1921","dbname":"postgres","schema":"public","table":"test","opr": "delete","old":{"id":5, "t":null}, "pk":{"id":5},"ts_ms":1688308204190, "timeline":138, "lsn":"0/220005E8"}{"source_name":"10.198.170.89:1921","dbname":"postgres","schema":"public","table":"test","opr": "delete","old":{"id":6, "t":null}, "pk":{"id":6},"ts_ms":1688308204190, "timeline":138, "lsn":"0/22000628"}{"source_name":"10.198.170.89:1921","dbname":"postgres","schema":"public","table":"test","opr": "insert","new":{"id":1, "t":"111"}, "pk":{"id":1},"ts_ms":1688308222518, "timeline":138, "lsn":"0/220006D0"}{"source_name":"10.198.170.89:1921","dbname":"postgres","schema":"public","table":"test","opr": "update","old":{"id":1, "t":"111"},"new":{"id":1, "t":"aaa"}, "pk":{"id":1},"ts_ms":1688308253341, "timeline":138, "lsn":"0/22000890"}
目录