安装过程
1. 创建cminer的用户
新建cminer需要的用户:
groupadd -g 581 cminer1
useradd -m -g cminer1 -u 581 cminer1
如果这台机器上要创建多套cminer,则需要见不同的用户,如cminer2、cminer3等等。例如建cminer2用户的命令如下
groupadd -g 582 cminer2
useradd -m -g cminer2 -u 582 cminer2
把cminer的安装包 cminer_pg14.tar.xz 拷贝到/home/cminer1,以用户cminer1登陆机器,解压cminer.tar.xz:
cd /home/cminer1
tar -xf cminer2.0_pg14.tar.gz
如果有多个cminer2、cminer3等用户,需要分别在cminer2和cminer3等用户下分别执行上面的动作。
ls 查看
[cminer1@cbbackup_21 ~]$ pwd
/home/cminer1
[cminer2@cbbackup_21 ~]$ ls -l
drwxr-xr-x. 2 cminer1 cminer1 198 May 11 17:37 bin
drwxrwxr-x. 2 cminer1 cminer1 36 May 11 22:13 log
drwx------. 20 cminer1 cminer1 4096 May 2 22:14 pgdata14.2_16
drwx------. 20 cminer1 cminer1 4096 May 12 13:49 pgdata14.2_8
drwxr-xr-x. 6 cminer1 cminer1 56 Apr 23 00:59 pgsql-14.2_16
drwxr-xr-x. 6 cminer1 cminer1 56 Apr 23 01:08 pgsql-14.2_8
说明 : 如果目标库的块大小(show block_size;查看)为8K , 则使用 pgdata14.2_8/pgsql-14.2_8
如果目标库的块大小为16K ,使用pgdata14.2_16/pgsql-14.2_16
通过配置环境变量 ~/.bashrc 来使用不同的库
检查用户cminer1的.bashrc:
这里目标库块大小为8K
vi ~/.bashrc
# User specific aliases and functions
export PGDATA=~/pgdata14.2_8
export PATH=~/pgsql-14.2_8/bin:$PATH
export LD_LIBRARY_PATH=~/pgsql-14.2_8/lib:$LD_LIBRARY_PATH
export PGHOST=/tmp
export PGUSER=cminer2
export PGDATABASE=postgres
export LANG=en_US.UTF-8
export PGPORT=2400
程序默认的端口为2400,如果想配置为其他端口,请修改上面文件中的export PGPORT=2400
以及文件“~/pgdata/postgresql.conf”中的端口配置。
查看~/.pgpass密码文件,ip地址是要解码的源库的IP地址,用户密码是要解码的源库中的一个用户,通常是需要在要解码的源库中创建的用户。database名称固定填写replication,CMiner程序是通过流复制协议上面配置的IP地址以及用户密码去连接源库,把WAL日志拉导本地进行解码的。
192.168.0.64:5432:replication:cminer:cminer
如果有多个cminer2、cminer3等用户,需要分别在cminer2和cminer3等用户下分别执行上面的操作。
另还需要看压缩包中是否有readme.md,如果有请按readme.md中的内容进行完善安装需要的其他步骤。
2 安装python环境
cminer的高可用切换使用到了python,而cminer提供了自己的python安装包,并使用自己的python环境。所以cminer并不依赖操作系统上的python环境。
在/opt目录下解压cminer的python软件包:
cd /opt
tar xvf cminer_python3.6.tar.xz
3 在主数据库上的操作
需要在主库上建一个用户,让cminer程序可以通过这个用户到主库上拉取WAL日志和能读取一些系统表:
create user cminer password 'cminer' replication;
注意实际安装时需要把上面的密码“cminer”改成一个安全的不容易被其他人猜测出来的密码。注意同时需要修改在CMiner机器上的cminer1用户下的.pgpass中的密码。
一般情况下,创建的用户都用查询系统表pg_class、pg_attribute、pg_database、pg_namespace、pg_index这些表的权限。如果你的数据库不是官方发布的版本,做过特殊的设置,导致这个用户无法查询这些系统表,则需要给这个用户赋权。
检查cminer用户是否可以查询上面这些系统表:
psql -h 127.0.0.1 -Ucminer postgres
在psql中执行下面的SQL检查cminer用户是否有查询系统表的权限:
select count(*) from pg_class;
select count(*) from pg_attribute;
select * from pg_database;
select * from pg_namespace;
select count(*) pg_index;
如果上面的查询可以正常运行,则不需要额外赋权。
如果不能查询,请给这个用户赋权,保证这个用户可以查询pg_class、pg_attribute、pg_database、pg_namespace、pg_index。
4. 快速配置和运行
4.1 配置
启动CMiner,CMiner本身是一个PostgreSQL数据库,启动这个数据库:
su - cminer1
pg_ctl start
用psql进入到数据库中,可以看到一些以cminer开头的表:
postgres=# \d
List of relations
Schema | Name | Type | Owner
--------+------------------------+-------+----------
public | cminer_checkpoint | table | postgres
public | cminer_content | table | postgres
public | cminer_decode_position | table | postgres
public | cminer_kafka_topic | table | postgres
public | cminer_pg_attribute | table | postgres
public | cminer_pg_class | table | postgres
public | cminer_pg_database | table | postgres
public | cminer_pg_namespace | table | postgres
public | cminer_pkdef | table | postgres
public | cminer_settings | table | postgres
public | cminer_toast_attr | table | postgres
public | cminer_toast_data | table | postgres
public | cminer_transaction | table | postgres
运行之前需要对表cminer_settings做配置:
postgres=# select * from cminer_settings ;
key | val
----------------------------+----------------------------
wal_path | /home/cminer1/waldata
db_host | 127.0.0.1
db_port | 5432
db_user | repl
db_password | repl
db_name | postgres
max_rows_per_commit | 100
max_bytes_per_commit | 131072
out_path | /home/cminer1/outdata
middle_path | /home/cminer1/middle_data
output_mode | 1
wal_segment_size | 16777216
kafka.max_bytes_per_commit | 131072
kafka.max_rows_per_commit | 1000
kafka.bootstrap_servers | 127.0.0.1:9092
kafka.security_protocol | sasl_plaintext
kafka.sasl_mechanisms | PLAIN
kafka.sasl_username |
kafka.sasl_password |
kafka.api_verison_request | true
source_name | 127.0.0.1:5432
(21 rows)
上面的配置项说明:
- source_name: 为这个数据源起一个名字,在生成的每一条数据中会加上这一项,当接收到数据的程序知道数据是从哪儿来的。
- wal_path: 是我们从主库把WAL日志拉过来的目录,默认为“/home/postgres/waldata”。如果刚安装后的环境中此目录不为空,由一些文件,可能是打包CMiner软件之前环境的数据,可以把此目录清空。
- out_path:当我们把输出模式output_mode设置为1时,解码后的数据不再放到数据库中,而是生成到文件中,这些文件的目录就是out_path。如果刚安装后的环境中此目录不为空,由一些文件,可能是打包CMiner软件之前环境的数据,可以把此目录清空。
- middle_path: 程序运行时的一些临时文件的目录。如果刚安装后的环境中此目录不为空,由一些文件,可能是打包CMiner软件之前环境的数据,可以把此目录清空。
- decode_mode: 解码模式,如果没有此项,默认是1, 表示从FPI中找旧行的数据,此时源库的wal_level的级别不是logical,也可以完成解码。当wal_level的级别是logical时,可以把此解码模式设置为2,在此模式下,wal_level必须时logical。
- 其他的参数都是连接参数,如连接主库的IP、端口、用户名、密码、数据库名。这些连接参数主要是为连接主库拉数据字典信息。
- one_trans_spill_size=4194304 : 单位:字节 , 代表内存中一个事务超过4M 就写入outdata
- all_trans_spill_size=134217728 : 内存中的所有未提交的事务 超过128M 就写入outdata
修改相应的配置可以使用update语句例如:
update cminer_settings set val='/home/cminer1/waldata' where key='wal_path';
update cminer_settings set val='/home/cminer1/outdata' where key='out_path';
update cminer_settings set val='/home/cminer1/middledata' where key='middle_path';
update cminer_settings set val='192.168.0.64' where key='db_host';
update cminer_settings set val='5432' where key='db_port';
update cminer_settings set val='192.168.0.64:5432' where key='source_name';
通常我们在主库上建立一个有流复制权限的用户:
create user cminer password 'cminer' replication;
把表cminer_settings中的这些连接参数改成实际的值。
然后把要同步的表配置到表cminer_decode_table_def
,如要同步itpuxdb数据库中schema名称为public下的表 itpux_yg,则执行下面的SQL:
insert into cminer_decode_table_def(datname,nspname,relname) values('itpuxdb', 'public','itpux_yg');
注意要同步的表必须有主键,否则无法同步。
运行之前需要建数据字典,这是因为WAL日志中没有记录表的名称和列的名称,只记录了一个表名称和列名称的内部数字ID,我们的解码程序需要通过这个字典把这些数字翻译成真实的表名和列名,在psql中:
select cminer_build_dictionary();
DETAIL: connection to server at "192.168.0.64", port 5432 failed: FATAL: no pg_hba.conf entry for host "192.168.0.66", user "cminer", database "itpuxdb", no encryption
CONTEXT: PL/pgSQL function cminer_build_dictionary() line 29 at assignment
连接如果报错 , 需要在主库配置cminer机器192.168.0.66 , pg_hba.conf 配置
---------------------------------------------
host all cminer 192.168.0.66/24 md5
host replication cminer 192.168.0.66/24 md5
---------------------------------------------
pg_ctl reload
再次执行如果报错
ERROR: duplicate key value violates unique constraint "cminer_pg_database_pkey"
说明之前拉取过数据字典 , 需要先清空
truncate table cminer_pg_attribute,cminer_pg_class,cminer_pg_database,cminer_pg_namespace,cminer_pkdef;
select cminer_build_dictionary();
NOTICE: connect: hostaddr=192.168.0.64 port=5432 dbname=itpuxdb user=cminer password=cminer
cminer_build_dictionary
-------------------------
(1 row)
-- 返回这个结果说明成功
4.2 启动从主库实时拉WAL日志的程序
启动这个程序之前请保证连接到主库的密码已经配置到了.pgpass文件中,同时在配置表cminer_settings中的主库的IP地址、用户名、密码以及存放WAL的路径都配置正确了,如果没有配置正确,请修改,例如用下面的SQL修改存放WAL的路径:
update cminer_settings set val='/home/cminer1/waldata' where key='wal_path';
这个程序实际上是一个python脚本,它最终调用pg_receivewal命令去主库拉WAL,可以先用pg_receivewal测试是否可以从主库上拉WAL日志:
-- 先新建所需的目录
mkdir -p /home/cminer1/waldata
mkdir -p /home/cminer1/outdata
mkdir -p /home/cminer1/middle_data
pg_receivewal -h 192.168.0.64 -p 5432 -Ucminer -D /home/cminer1/waldata
如果用pg_receivewal命令拉日志没有问题了,就可以启动这个程序了,启动方法是运行脚本cminer_start_receive_wal.sh
:
cminer_start_receive_wal.sh
注意运行上面这个脚本之前需要修改
vi ~/bin/cminer_receive_wal.py
------------------------------------------
g_pg_root = '/home/cminer1/pgsql-14.2_8'
def connect_db():
db_name = 'postgres'
db_host = '192.168.0.66'
db_port = 2400
db_user = 'cminer2'
db_pass = 'cminer2' #这里的是cminer库的超级用户和密码
------------------------------------------
这个程序是以后台的方式运行。用ps命令看进程是否运行起来了:
ps -ef|grep cminer_receive_wal |grep -v grep
ps -ef|grep pg_receivewal |grep -v grep
如果不满足两个进程都正常运行,说明运行错误,检查日志 ~/log/cminer_receive_wal.log
实际可以用pg_receivewal直接收WAL日志,但我们提供的这个接受日志的程序提供了主库HA切换后,自动修改新主库的解码点,自动开始解码的功能。
通常数据库的HA功能是通过流复制搭建了1主多备的数据库+VIP的模式实现,当数据库发生HA切换后,把一台备库激活为新主库,VIP也漂移到新主库上,这是新主库的WAL的日志点会变化,而我们的这个程序可以自动的让解码进程从新主库的WAL日志点继续解码。
4.3 启动解码程序
程序默认的解码程序是把解码后的数据放入一个文件:
[postgres@pg01 ~]$ ls -l /home/postgres/outdata
total 31748
-rw------- 1 postgres postgres 1652845 Sep 28 11:22 cmd_00000000000000000001
-rw------- 1 postgres postgres 1852716 Sep 28 12:03 cmd_00000000000000000002
-rw------- 1 postgres postgres 17910079 Sep 28 12:04 cmd_00000000000000000003
-rw------- 1 postgres postgres 882112 Sep 28 11:22 cmi_00000000000000000001
-rw------- 1 postgres postgres 959024 Sep 28 12:03 cmi_00000000000000000002
-rw------- 1 postgres postgres 9239824 Sep 28 12:04 cmi_00000000000000000003
这种模式是配置output_mode=1。
然后cminer_kafka进程从这个文件中读取数据,然后发送到kafka中。
还有一种解码模式,是把解码后的数据放到数据库中,这种模式是配置output_mode=0,主要做为调试使用。修改模式的方法如下:
update cminer_settings set val='0' where key='output_mode';
启动解码程序需要配置一些参数,实例参数如下:
postgres=# select * from cminer_settings ;
output_mode | 1
db_user | cminer
db_password | cminer
max_rows_per_commit | 100
max_bytes_per_commit | 131072
kafka.max_bytes_per_commit | 131072
kafka.max_rows_per_commit | 1000
kafka.bootstrap_servers | 127.0.0.1:9092
kafka.security_protocol | sasl_plaintext
kafka.sasl_mechanisms | PLAIN
kafka.sasl_username |
kafka.sasl_password |
kafka.api_verison_request | true
lic |
wal_segment_size | 16777216
wal_path | /home/cminer1/waldata
out_path | /home/cminer1/out_path
middle_path | /home/cminer1/middle_path
db_host | 192.168.0.64
db_port | 5432
source_name | 192.168.0.64:5432
db_name | itpuxdb
(20 rows)
-- 需要配置 lic
select cminer_id();
cminer_id
--------------------------
05656e733333000c29d43ff2
-- 根据生成的cminer_id从公司获取license
update cminer_settings set val='BJH1TSyeft37D4FQNA9O7NjIuoxKYwKf7Ni1uIQvq2p8iXfi4DeY5s59dZLljp6MoaU+j2dBCRqY34CjlPufJiBpcnSRiSR5vk1A3JVlnx/6zUAHboxKz2opfFPqz1Q+jRTkLEAw+cfYZLl4u8Zlix1fadz9rJlsk9ZWXSvaxufyEUfVu2FwE4tGpEU/NjaEp4/McvOcFArBsFh7ZycxV29Rfp/Ytz9OA5kejXipQJQWPpk1IY2mrI0EktSuc9TJZ5wqfrPh+G6exwx3ryWUYOilBs9gxVNxEFB8UV64Jkbq35yfDs6sYDK1NhS5vqwVDO992F15QTqKkWAjgfSUyByntMRGoKb2oGTLgw==' where key = 'lic';
第一次启动解码程序时,需要知道从哪儿开始解码,这个信息是配置在表cminer_decode_position中的,也可以手工配置。
开始时解码位置应该为空:
postgres=# select * from cminer_decode_position;
checkpoint_lsn | checkpoint_timeline | decode_lsn | decode_timeline | oldest_trans_begin_lsn | oldest_trans_timeline
----------------+---------------------+------------+-----------------+------------------------+-----------------------
| | | | |
(1 row)
注意cminer_decode_postion表是有一行记录的,千万不能删除掉此记录。第一次解码时,我们需要从WAL日志中的一个checkpoint点开始解码,当把这条记录的各个值都设置为空的时候,然后再允许CMiner提供的一个函数cminer_init_decode_lsn,就可以把这个初始的解码位置记录导此表中。
如果第一次解码,此行各列的值如果不为空,可以用下面的SQL,把各列的值设置为空:
UPDATE cminer_decode_position set checkpoint_lsn=null,checkpoint_timeline=null,decode_lsn=null,decode_timeline=null,oldest_trans_begin_lsn=null,oldest_trans_timeline=null;
设置为空后,在psql中运行函数cminer_init_decode_lsn就可以初始化解码的位置:
select cminer_init_decode_lsn();
运行完后,我们再查看cminer_decode_position表,可以看到初始化了一个解码的位置:
postgres=# select * from cminer_decode_position;
checkpoint_lsn | checkpoint_timeline | decode_lsn | decode_timeline | oldest_trans_begin_lsn | oldest_trans_timeline
----------------+---------------------+------------+-----------------+------------------------+-----------------------
0/1000028 | 1 | 0/1000028 | 1 | |
(1 row)
初始化完解码位置后,我们就可以启动解码进程了:
postgres=# select cminer_start_decode();
cminer_start_decode
---------------------
24651
(1 row)
上面的24651是解码程序的进程号,可以查询cminer_worker的视图看到正在运行的解码程序:
postgres=# select * from cminer_worker;
pid | application_name | backend_start | state | query
-------+----------------------+-------------------------------+-------+-----------------
13947 | cminer_decode_worker | 2022-05-11 22:34:37.936902+08 | idle | lsn: 0/01CAC508
当然也可以查询pg_stat_activity:
postgres=# select pid, application_name, state, query from pg_stat_activity;
pid | application_name | state | query
-------+----------------------+--------+-------------------------------------------------------------------
8852 | | |
8854 | | |
13645 | psql | active | select pid, application_name, state, query from pg_stat_activity;
13947 | cminer_decode_worker | idle | lsn: 0/01CAC508
8850 | | |
8849 | | |
8851 |
停止这个程序的方法是:
postgres=# select cminer_stop_decode();
对于output_mode=0的模式下,解码完的数据放在cminer_content:
postgres=# select * from cminer_content;
dboid | lsn | xid | opr | reloid | plain_text
-------+-----------+-----+-----+--------+---------------------------------------------------------------------------------
13212 | 0/2003E10 | 560 | I | | {"schema":"public","table":"test01","opr": "insert","new":{"id":1 ,"t":"1111"}}
(1 row)
对于output_mode=1的模式下,解码完的数据生成在放在参数out_path指定的目录:
[postgres@cbbackup_21 outdata]$ ls -l
-- 此时,我们可以在主库中插入几条新数据,测试解码是否可以成功
insert into itpux_yg values(18001,'itpux10001',29,'女','2010-01-04 15:35:32','陕西省','13184658741','itpux10001@itpux.com','技术工程师',12000,'技术部','天天2号','oxhdil ambyw oiv lub xzdm
epb dozu okepvxn wks tjspw bdvzdh nhss yqrm gyxv fcxv kktne etnv fpaoe kb');
insert into itpux_yg values(18002,'itpux10001',29,'女','2010-01-04 15:35:32','陕西省','13184658741','itpux10001@itpux.com','技术工程师',12000,'技术部','天天2号','oxhdil ambyw oiv lub xzdm
epb dozu okepvxn wks tjspw bdvzdh nhss yqrm gyxv fcxv kktne etnv fpaoe kb');
insert into itpux_yg values(18003,'itpux10001',29,'女','2010-01-04 15:35:32','陕西省','13184658741','itpux10001@itpux.com','技术工程师',12000,'技术部','天天2号','oxhdil ambyw oiv lub xzdm
epb dozu okepvxn wks tjspw bdvzdh nhss yqrm gyxv fcxv kktne etnv fpaoe kb');
解析出的内容:
[postgres@cbbackup_21 outdata]$ more cmd_00000000000000000001
{"source_name":"127.0.0.1.5431","dbname":"postgres","schema":"public","table":"test01","opr": "insert","new":{"id":1, "note":"3c4c4d9594fc961ae01d1
55a1cbe4bfd"}, "pk":{"id":1},"ts_ms":1650822901911, "timeline":1, "lsn":"0/40000148"}
{"source_name":"127.0.0.1.5431","dbname":"postgres","schema":"public","table":"test01","opr": "insert","new":{"id":2, "note":"0f78ceebfced16ba6ae32
44e4e18c100"}, "pk":{"id":2},"ts_ms":1650822901911, "timeline":1, "lsn":"0/40000248"}
查看解码的延迟可以用下面的SQL语句:
select cminer_decode_delay();
结果是延迟的WAL字节数,如果这个值不大(如没有超过1MB),说明基本没有问题。
4.4 把数据同步到kafka
把数据同步到kafka中,需要安装另一个插件cminer_kafka,同时启动cminer_kafka进程。
创建cminer_kafka:
create extension cminer_kafka;
注意需要安装包librdkafka的包,否则会报下面的错误:
postgres=# create extension cminer_kafka;
ERROR: could not load library "/home/postgres/pgsql-10/lib/cminer_kafka.so": librdkafka.so.1: cannot open shared object file: No such file or directory
yum install librdkafka.x86_64
按照完之后,可以在cminer_settings中看到kafka的配置:
postgres=# select * from cminer_settings;
key | val
----------------------------+------------------------
wal_path | /home/postgres/waldata
db_host | 127.0.0.1
db_port | 5432
db_user | repl
db_password | repl
db_name | postgres
max_rows_per_commit | 100
max_bytes_per_commit | 131072
kafka.max_bytes_per_commit | 131072
kafka.bootstrap_servers | 127.0.0.1:9092
kafka.security_protocol | sasl_plaintext
kafka.sasl_mechanisms | PLAIN
kafka.sasl_username |
kafka.sasl_password |
kafka.api_verison_request | true
(15 rows)
如果kafka的服务没有配置验证,只配置“kafka.bootstrap_servers”,把“ kafka.sasl_username”保持为NULL就可以了,如果配置了验证需要配置其他的配置项。
同时会多一张kafka的topic的配置表,数据库同步的表和kafka映射关系:
postgres=# \d cminer_kafka_topic;
Table "public.cminer_kafka_topic"
Column | Type | Collation | Nullable | Default
--------+------+-----------+----------+---------
dboid | oid | | |
nsoid | oid | | |
reloid | oid | | |
topic | text | | |
Indexes:
"idx_cminer_kafka_topic" btree (dboid, nsoid, reloid)
我们需要在这样配置表中配置各个要同步的表与kafka的topic的映射关系,例如我们配置:
insert into cminer_kafka_topic values(13212, 2200, 16386, 'topic_01');
insert into cminer_kafka_topic values(13212, 2200, 16404, 'topic_01');
如果不在cminer_kafka_topic中的表的,会直接跳过,在kafka中将不会产生数据。
启动kafka 插件
select cminer_start_kafka_replay();
可以查询视图cminer_worker获得cminer_kafka的状态:
postgres=# select * from cminer_worker;
pid | application_name | backend_start | state | query
-------+----------------------+-------------------------------+--------+------------------------------
23451 | cminer_decode_worker | 2020-10-15 20:11:32.892015+08 | active | lsn: A/8B0264F0
22159 | cminer_kafka_replay | 2020-10-15 19:56:51.438767+08 | active | lsn: A/8A2C5B48, kafka error
(2 rows)
如果上面的query字段中“kafka error”,如下所示:
lsn: A/8A2C5B48, kafka error
说明连接到kafka失败了。
如果是临时的kafka,用下面的命令可以消费kafka的数据:
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic topic_01
监控kafka消费情况
/home/postgres/pgdata/cminer_stats
tail-f kafka_stats_yymmddhhmiss.log
监控cmienr同步性能情况
tail -f cminert_stats_20200915182926.log
用下面的SQL可以查看发送到kafka数据的位置:
select * from cminer_kafka_replay_position;
附录:kafka的一些命令:
启动kafka
/usr/local/kafka_2.12-2.6.0/bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
/usr/local/kafka_2.12-2.6.0/bin/kafka-server-start.sh config/server.properties
查看kafka消费的数据信息
/usr/local/kafka_2.12-2.6.0/bin/kafka-console-consumer.sh -bootstrap-server 127.0.0.1:9092 --topic topic_01