首页
产品
CLup:PostgreSQL高可用集群平台CData高性能数据库云一体机CBackup数据库备份恢复云平台CPDA高性能双子星数据库机
解决方案
数据库专业技术服务全栈式PostgreSQL解决方案Oracle分布式存储化数据库云
文章
客户及伙伴
中启开源
关于我们
公司简介 联系我们
中启开源
往前插入
往后插入
删除

安装过程

1. 创建cminer的用户

新建cminer需要的用户:

  1. groupadd -g 581 cminer1
  2. useradd -m -g cminer1 -u 581 cminer1

如果这台机器上要创建多套cminer,则需要见不同的用户,如cminer2、cminer3等等。例如建cminer2用户的命令如下

  1. groupadd -g 582 cminer2
  2. useradd -m -g cminer2 -u 582 cminer2

把cminer的安装包 cminer_pg14.tar.xz 拷贝到/home/cminer1,以用户cminer1登陆机器,解压cminer.tar.xz:

  1. cd /home/cminer1
  2. tar -xf cminer2.0_pg14.tar.gz

如果有多个cminer2、cminer3等用户,需要分别在cminer2和cminer3等用户下分别执行上面的动作。

ls 查看

  1. [cminer1@cbbackup_21 ~]$ pwd
  2. /home/cminer1
  3. [cminer2@cbbackup_21 ~]$ ls -l
  4. drwxr-xr-x. 2 cminer1 cminer1 198 May 11 17:37 bin
  5. drwxrwxr-x. 2 cminer1 cminer1 36 May 11 22:13 log
  6. drwx------. 20 cminer1 cminer1 4096 May 2 22:14 pgdata14.2_16
  7. drwx------. 20 cminer1 cminer1 4096 May 12 13:49 pgdata14.2_8
  8. drwxr-xr-x. 6 cminer1 cminer1 56 Apr 23 00:59 pgsql-14.2_16
  9. drwxr-xr-x. 6 cminer1 cminer1 56 Apr 23 01:08 pgsql-14.2_8
  10. 说明 : 如果目标库的块大小(show block_size;查看)为8K , 则使用 pgdata14.2_8/pgsql-14.2_8
  11. 如果目标库的块大小为16K ,使用pgdata14.2_16/pgsql-14.2_16
  12. 通过配置环境变量 ~/.bashrc 来使用不同的库

检查用户cminer1的.bashrc:

  1. 这里目标库块大小为8K
  2. vi ~/.bashrc
  3. # User specific aliases and functions
  4. export PGDATA=~/pgdata14.2_8
  5. export PATH=~/pgsql-14.2_8/bin:$PATH
  6. export LD_LIBRARY_PATH=~/pgsql-14.2_8/lib:$LD_LIBRARY_PATH
  7. export PGHOST=/tmp
  8. export PGUSER=cminer2
  9. export PGDATABASE=postgres
  10. export LANG=en_US.UTF-8
  11. export PGPORT=2400

程序默认的端口为2400,如果想配置为其他端口,请修改上面文件中的export PGPORT=2400以及文件“~/pgdata/postgresql.conf”中的端口配置。

查看~/.pgpass密码文件,ip地址是要解码的源库的IP地址,用户密码是要解码的源库中的一个用户,通常是需要在要解码的源库中创建的用户。database名称固定填写replication,CMiner程序是通过流复制协议上面配置的IP地址以及用户密码去连接源库,把WAL日志拉导本地进行解码的。

  1. 192.168.0.64:5432:replication:cminer:cminer

如果有多个cminer2、cminer3等用户,需要分别在cminer2和cminer3等用户下分别执行上面的操作。

另还需要看压缩包中是否有readme.md,如果有请按readme.md中的内容进行完善安装需要的其他步骤。

2 安装python环境

cminer的高可用切换使用到了python,而cminer提供了自己的python安装包,并使用自己的python环境。所以cminer并不依赖操作系统上的python环境。

在/opt目录下解压cminer的python软件包:

  1. cd /opt
  2. tar xvf cminer_python3.6.tar.xz

3 在主数据库上的操作

需要在主库上建一个用户,让cminer程序可以通过这个用户到主库上拉取WAL日志和能读取一些系统表:

  1. create user cminer password 'cminer' replication;

注意实际安装时需要把上面的密码“cminer”改成一个安全的不容易被其他人猜测出来的密码。注意同时需要修改在CMiner机器上的cminer1用户下的.pgpass中的密码。

一般情况下,创建的用户都用查询系统表pg_class、pg_attribute、pg_database、pg_namespace、pg_index这些表的权限。如果你的数据库不是官方发布的版本,做过特殊的设置,导致这个用户无法查询这些系统表,则需要给这个用户赋权。
检查cminer用户是否可以查询上面这些系统表:

  1. psql -h 127.0.0.1 -Ucminer postgres

在psql中执行下面的SQL检查cminer用户是否有查询系统表的权限:

  1. select count(*) from pg_class;
  2. select count(*) from pg_attribute;
  3. select * from pg_database;
  4. select * from pg_namespace;
  5. select count(*) pg_index;

如果上面的查询可以正常运行,则不需要额外赋权。
如果不能查询,请给这个用户赋权,保证这个用户可以查询pg_class、pg_attribute、pg_database、pg_namespace、pg_index。

4. 快速配置和运行

4.1 配置

启动CMiner,CMiner本身是一个PostgreSQL数据库,启动这个数据库:

  1. su - cminer1
  2. pg_ctl start

用psql进入到数据库中,可以看到一些以cminer开头的表:

  1. postgres=# \d
  2. List of relations
  3. Schema | Name | Type | Owner
  4. --------+------------------------+-------+----------
  5. public | cminer_checkpoint | table | postgres
  6. public | cminer_content | table | postgres
  7. public | cminer_decode_position | table | postgres
  8. public | cminer_kafka_topic | table | postgres
  9. public | cminer_pg_attribute | table | postgres
  10. public | cminer_pg_class | table | postgres
  11. public | cminer_pg_database | table | postgres
  12. public | cminer_pg_namespace | table | postgres
  13. public | cminer_pkdef | table | postgres
  14. public | cminer_settings | table | postgres
  15. public | cminer_toast_attr | table | postgres
  16. public | cminer_toast_data | table | postgres
  17. public | cminer_transaction | table | postgres

运行之前需要对表cminer_settings做配置:

  1. postgres=# select * from cminer_settings ;
  2. key | val
  3. ----------------------------+----------------------------
  4. wal_path | /home/cminer1/waldata
  5. db_host | 127.0.0.1
  6. db_port | 5432
  7. db_user | repl
  8. db_password | repl
  9. db_name | postgres
  10. max_rows_per_commit | 100
  11. max_bytes_per_commit | 131072
  12. out_path | /home/cminer1/outdata
  13. middle_path | /home/cminer1/middle_data
  14. output_mode | 1
  15. wal_segment_size | 16777216
  16. kafka.max_bytes_per_commit | 131072
  17. kafka.max_rows_per_commit | 1000
  18. kafka.bootstrap_servers | 127.0.0.1:9092
  19. kafka.security_protocol | sasl_plaintext
  20. kafka.sasl_mechanisms | PLAIN
  21. kafka.sasl_username |
  22. kafka.sasl_password |
  23. kafka.api_verison_request | true
  24. source_name | 127.0.0.1:5432
  25. (21 rows)

上面的配置项说明:

修改相应的配置可以使用update语句例如:

  1. update cminer_settings set val='/home/cminer1/waldata' where key='wal_path';
  2. update cminer_settings set val='/home/cminer1/outdata' where key='out_path';
  3. update cminer_settings set val='/home/cminer1/middledata' where key='middle_path';
  4. update cminer_settings set val='192.168.0.64' where key='db_host';
  5. update cminer_settings set val='5432' where key='db_port';
  6. update cminer_settings set val='192.168.0.64:5432' where key='source_name';

通常我们在主库上建立一个有流复制权限的用户:

  1. create user cminer password 'cminer' replication;

把表cminer_settings中的这些连接参数改成实际的值。

然后把要同步的表配置到表cminer_decode_table_def,如要同步itpuxdb数据库中schema名称为public下的表 itpux_yg,则执行下面的SQL:

  1. insert into cminer_decode_table_def(datname,nspname,relname) values('itpuxdb', 'public','itpux_yg');

注意要同步的表必须有主键,否则无法同步。

运行之前需要建数据字典,这是因为WAL日志中没有记录表的名称和列的名称,只记录了一个表名称和列名称的内部数字ID,我们的解码程序需要通过这个字典把这些数字翻译成真实的表名和列名,在psql中:

  1. select cminer_build_dictionary();
  2. DETAIL: connection to server at "192.168.0.64", port 5432 failed: FATAL: no pg_hba.conf entry for host "192.168.0.66", user "cminer", database "itpuxdb", no encryption
  3. CONTEXT: PL/pgSQL function cminer_build_dictionary() line 29 at assignment
  4. 连接如果报错 , 需要在主库配置cminer机器192.168.0.66 , pg_hba.conf 配置
  5. ---------------------------------------------
  6. host all cminer 192.168.0.66/24 md5
  7. host replication cminer 192.168.0.66/24 md5
  8. ---------------------------------------------
  9. pg_ctl reload
  10. 再次执行如果报错
  11. ERROR: duplicate key value violates unique constraint "cminer_pg_database_pkey"
  12. 说明之前拉取过数据字典 , 需要先清空
  13. truncate table cminer_pg_attribute,cminer_pg_class,cminer_pg_database,cminer_pg_namespace,cminer_pkdef;
  14. select cminer_build_dictionary();
  15. NOTICE: connect: hostaddr=192.168.0.64 port=5432 dbname=itpuxdb user=cminer password=cminer
  16. cminer_build_dictionary
  17. -------------------------
  18. (1 row)
  19. -- 返回这个结果说明成功

4.2 启动从主库实时拉WAL日志的程序

启动这个程序之前请保证连接到主库的密码已经配置到了.pgpass文件中,同时在配置表cminer_settings中的主库的IP地址、用户名、密码以及存放WAL的路径都配置正确了,如果没有配置正确,请修改,例如用下面的SQL修改存放WAL的路径:

  1. update cminer_settings set val='/home/cminer1/waldata' where key='wal_path';

这个程序实际上是一个python脚本,它最终调用pg_receivewal命令去主库拉WAL,可以先用pg_receivewal测试是否可以从主库上拉WAL日志:

  1. -- 先新建所需的目录
  2. mkdir -p /home/cminer1/waldata
  3. mkdir -p /home/cminer1/outdata
  4. mkdir -p /home/cminer1/middle_data
  5. pg_receivewal -h 192.168.0.64 -p 5432 -Ucminer -D /home/cminer1/waldata

如果用pg_receivewal命令拉日志没有问题了,就可以启动这个程序了,启动方法是运行脚本cminer_start_receive_wal.sh:

  1. cminer_start_receive_wal.sh

注意运行上面这个脚本之前需要修改

  1. vi ~/bin/cminer_receive_wal.py
  2. ------------------------------------------
  3. g_pg_root = '/home/cminer1/pgsql-14.2_8'
  4. def connect_db():
  5. db_name = 'postgres'
  6. db_host = '192.168.0.66'
  7. db_port = 2400
  8. db_user = 'cminer2'
  9. db_pass = 'cminer2' #这里的是cminer库的超级用户和密码
  10. ------------------------------------------

这个程序是以后台的方式运行。用ps命令看进程是否运行起来了:

  1. ps -ef|grep cminer_receive_wal |grep -v grep
  2. ps -ef|grep pg_receivewal |grep -v grep

如果不满足两个进程都正常运行,说明运行错误,检查日志 ~/log/cminer_receive_wal.log

实际可以用pg_receivewal直接收WAL日志,但我们提供的这个接受日志的程序提供了主库HA切换后,自动修改新主库的解码点,自动开始解码的功能。
通常数据库的HA功能是通过流复制搭建了1主多备的数据库+VIP的模式实现,当数据库发生HA切换后,把一台备库激活为新主库,VIP也漂移到新主库上,这是新主库的WAL的日志点会变化,而我们的这个程序可以自动的让解码进程从新主库的WAL日志点继续解码。

4.3 启动解码程序

程序默认的解码程序是把解码后的数据放入一个文件:

  1. [postgres@pg01 ~]$ ls -l /home/postgres/outdata
  2. total 31748
  3. -rw------- 1 postgres postgres 1652845 Sep 28 11:22 cmd_00000000000000000001
  4. -rw------- 1 postgres postgres 1852716 Sep 28 12:03 cmd_00000000000000000002
  5. -rw------- 1 postgres postgres 17910079 Sep 28 12:04 cmd_00000000000000000003
  6. -rw------- 1 postgres postgres 882112 Sep 28 11:22 cmi_00000000000000000001
  7. -rw------- 1 postgres postgres 959024 Sep 28 12:03 cmi_00000000000000000002
  8. -rw------- 1 postgres postgres 9239824 Sep 28 12:04 cmi_00000000000000000003

这种模式是配置output_mode=1。

然后cminer_kafka进程从这个文件中读取数据,然后发送到kafka中。

还有一种解码模式,是把解码后的数据放到数据库中,这种模式是配置output_mode=0,主要做为调试使用。修改模式的方法如下:

  1. update cminer_settings set val='0' where key='output_mode';

启动解码程序需要配置一些参数,实例参数如下:

  1. postgres=# select * from cminer_settings ;
  2. output_mode | 1
  3. db_user | cminer
  4. db_password | cminer
  5. max_rows_per_commit | 100
  6. max_bytes_per_commit | 131072
  7. kafka.max_bytes_per_commit | 131072
  8. kafka.max_rows_per_commit | 1000
  9. kafka.bootstrap_servers | 127.0.0.1:9092
  10. kafka.security_protocol | sasl_plaintext
  11. kafka.sasl_mechanisms | PLAIN
  12. kafka.sasl_username |
  13. kafka.sasl_password |
  14. kafka.api_verison_request | true
  15. lic |
  16. wal_segment_size | 16777216
  17. wal_path | /home/cminer1/waldata
  18. out_path | /home/cminer1/out_path
  19. middle_path | /home/cminer1/middle_path
  20. db_host | 192.168.0.64
  21. db_port | 5432
  22. source_name | 192.168.0.64:5432
  23. db_name | itpuxdb
  24. (20 rows)
  25. -- 需要配置 lic
  26. select cminer_id();
  27. cminer_id
  28. --------------------------
  29. 05656e733333000c29d43ff2
  30. -- 根据生成的cminer_id从公司获取license
  31. update cminer_settings set val='BJH1TSyeft37D4FQNA9O7NjIuoxKYwKf7Ni1uIQvq2p8iXfi4DeY5s59dZLljp6MoaU+j2dBCRqY34CjlPufJiBpcnSRiSR5vk1A3JVlnx/6zUAHboxKz2opfFPqz1Q+jRTkLEAw+cfYZLl4u8Zlix1fadz9rJlsk9ZWXSvaxufyEUfVu2FwE4tGpEU/NjaEp4/McvOcFArBsFh7ZycxV29Rfp/Ytz9OA5kejXipQJQWPpk1IY2mrI0EktSuc9TJZ5wqfrPh+G6exwx3ryWUYOilBs9gxVNxEFB8UV64Jkbq35yfDs6sYDK1NhS5vqwVDO992F15QTqKkWAjgfSUyByntMRGoKb2oGTLgw==' where key = 'lic';

第一次启动解码程序时,需要知道从哪儿开始解码,这个信息是配置在表cminer_decode_position中的,也可以手工配置。
开始时解码位置应该为空:

  1. postgres=# select * from cminer_decode_position;
  2. checkpoint_lsn | checkpoint_timeline | decode_lsn | decode_timeline | oldest_trans_begin_lsn | oldest_trans_timeline
  3. ----------------+---------------------+------------+-----------------+------------------------+-----------------------
  4. | | | | |
  5. (1 row)

注意cminer_decode_postion表是有一行记录的,千万不能删除掉此记录。第一次解码时,我们需要从WAL日志中的一个checkpoint点开始解码,当把这条记录的各个值都设置为空的时候,然后再允许CMiner提供的一个函数cminer_init_decode_lsn,就可以把这个初始的解码位置记录导此表中。

如果第一次解码,此行各列的值如果不为空,可以用下面的SQL,把各列的值设置为空:

  1. UPDATE cminer_decode_position set checkpoint_lsn=null,checkpoint_timeline=null,decode_lsn=null,decode_timeline=null,oldest_trans_begin_lsn=null,oldest_trans_timeline=null;

设置为空后,在psql中运行函数cminer_init_decode_lsn就可以初始化解码的位置:

  1. select cminer_init_decode_lsn();

运行完后,我们再查看cminer_decode_position表,可以看到初始化了一个解码的位置:

  1. postgres=# select * from cminer_decode_position;
  2. checkpoint_lsn | checkpoint_timeline | decode_lsn | decode_timeline | oldest_trans_begin_lsn | oldest_trans_timeline
  3. ----------------+---------------------+------------+-----------------+------------------------+-----------------------
  4. 0/1000028 | 1 | 0/1000028 | 1 | |
  5. (1 row)

初始化完解码位置后,我们就可以启动解码进程了:

  1. postgres=# select cminer_start_decode();
  2. cminer_start_decode
  3. ---------------------
  4. 24651
  5. (1 row)

上面的24651是解码程序的进程号,可以查询cminer_worker的视图看到正在运行的解码程序:

  1. postgres=# select * from cminer_worker;
  2. pid | application_name | backend_start | state | query
  3. -------+----------------------+-------------------------------+-------+-----------------
  4. 13947 | cminer_decode_worker | 2022-05-11 22:34:37.936902+08 | idle | lsn: 0/01CAC508

当然也可以查询pg_stat_activity:

  1. postgres=# select pid, application_name, state, query from pg_stat_activity;
  2. pid | application_name | state | query
  3. -------+----------------------+--------+-------------------------------------------------------------------
  4. 8852 | | |
  5. 8854 | | |
  6. 13645 | psql | active | select pid, application_name, state, query from pg_stat_activity;
  7. 13947 | cminer_decode_worker | idle | lsn: 0/01CAC508
  8. 8850 | | |
  9. 8849 | | |
  10. 8851 |

停止这个程序的方法是:

  1. postgres=# select cminer_stop_decode();

对于output_mode=0的模式下,解码完的数据放在cminer_content:

  1. postgres=# select * from cminer_content;
  2. dboid | lsn | xid | opr | reloid | plain_text
  3. -------+-----------+-----+-----+--------+---------------------------------------------------------------------------------
  4. 13212 | 0/2003E10 | 560 | I | | {"schema":"public","table":"test01","opr": "insert","new":{"id":1 ,"t":"1111"}}
  5. (1 row)

对于output_mode=1的模式下,解码完的数据生成在放在参数out_path指定的目录:

  1. [postgres@cbbackup_21 outdata]$ ls -l
  2. -- 此时,我们可以在主库中插入几条新数据,测试解码是否可以成功
  3. insert into itpux_yg values(18001,'itpux10001',29,'女','2010-01-04 15:35:32','陕西省','13184658741','itpux10001@itpux.com','技术工程师',12000,'技术部','天天2号','oxhdil ambyw oiv lub xzdm
  4. epb dozu okepvxn wks tjspw bdvzdh nhss yqrm gyxv fcxv kktne etnv fpaoe kb');
  5. insert into itpux_yg values(18002,'itpux10001',29,'女','2010-01-04 15:35:32','陕西省','13184658741','itpux10001@itpux.com','技术工程师',12000,'技术部','天天2号','oxhdil ambyw oiv lub xzdm
  6. epb dozu okepvxn wks tjspw bdvzdh nhss yqrm gyxv fcxv kktne etnv fpaoe kb');
  7. insert into itpux_yg values(18003,'itpux10001',29,'女','2010-01-04 15:35:32','陕西省','13184658741','itpux10001@itpux.com','技术工程师',12000,'技术部','天天2号','oxhdil ambyw oiv lub xzdm
  8. epb dozu okepvxn wks tjspw bdvzdh nhss yqrm gyxv fcxv kktne etnv fpaoe kb');

解析出的内容:

  1. [postgres@cbbackup_21 outdata]$ more cmd_00000000000000000001
  2. {"source_name":"127.0.0.1.5431","dbname":"postgres","schema":"public","table":"test01","opr": "insert","new":{"id":1, "note":"3c4c4d9594fc961ae01d1
  3. 55a1cbe4bfd"}, "pk":{"id":1},"ts_ms":1650822901911, "timeline":1, "lsn":"0/40000148"}
  4. {"source_name":"127.0.0.1.5431","dbname":"postgres","schema":"public","table":"test01","opr": "insert","new":{"id":2, "note":"0f78ceebfced16ba6ae32
  5. 44e4e18c100"}, "pk":{"id":2},"ts_ms":1650822901911, "timeline":1, "lsn":"0/40000248"}

查看解码的延迟可以用下面的SQL语句:

  1. select cminer_decode_delay();

结果是延迟的WAL字节数,如果这个值不大(如没有超过1MB),说明基本没有问题。

4.4 把数据同步到kafka

把数据同步到kafka中,需要安装另一个插件cminer_kafka,同时启动cminer_kafka进程。

创建cminer_kafka:

  1. create extension cminer_kafka;

注意需要安装包librdkafka的包,否则会报下面的错误:

  1. postgres=# create extension cminer_kafka;
  2. ERROR: could not load library "/home/postgres/pgsql-10/lib/cminer_kafka.so": librdkafka.so.1: cannot open shared object file: No such file or directory
  1. yum install librdkafka.x86_64

按照完之后,可以在cminer_settings中看到kafka的配置:

  1. postgres=# select * from cminer_settings;
  2. key | val
  3. ----------------------------+------------------------
  4. wal_path | /home/postgres/waldata
  5. db_host | 127.0.0.1
  6. db_port | 5432
  7. db_user | repl
  8. db_password | repl
  9. db_name | postgres
  10. max_rows_per_commit | 100
  11. max_bytes_per_commit | 131072
  12. kafka.max_bytes_per_commit | 131072
  13. kafka.bootstrap_servers | 127.0.0.1:9092
  14. kafka.security_protocol | sasl_plaintext
  15. kafka.sasl_mechanisms | PLAIN
  16. kafka.sasl_username |
  17. kafka.sasl_password |
  18. kafka.api_verison_request | true
  19. (15 rows)

如果kafka的服务没有配置验证,只配置“kafka.bootstrap_servers”,把“ kafka.sasl_username”保持为NULL就可以了,如果配置了验证需要配置其他的配置项。

同时会多一张kafka的topic的配置表,数据库同步的表和kafka映射关系:

  1. postgres=# \d cminer_kafka_topic;
  2. Table "public.cminer_kafka_topic"
  3. Column | Type | Collation | Nullable | Default
  4. --------+------+-----------+----------+---------
  5. dboid | oid | | |
  6. nsoid | oid | | |
  7. reloid | oid | | |
  8. topic | text | | |
  9. Indexes:
  10. "idx_cminer_kafka_topic" btree (dboid, nsoid, reloid)

我们需要在这样配置表中配置各个要同步的表与kafka的topic的映射关系,例如我们配置:

  1. insert into cminer_kafka_topic values(13212, 2200, 16386, 'topic_01');
  2. insert into cminer_kafka_topic values(13212, 2200, 16404, 'topic_01');

如果不在cminer_kafka_topic中的表的,会直接跳过,在kafka中将不会产生数据。

启动kafka 插件

  1. select cminer_start_kafka_replay();

可以查询视图cminer_worker获得cminer_kafka的状态:

  1. postgres=# select * from cminer_worker;
  2. pid | application_name | backend_start | state | query
  3. -------+----------------------+-------------------------------+--------+------------------------------
  4. 23451 | cminer_decode_worker | 2020-10-15 20:11:32.892015+08 | active | lsn: A/8B0264F0
  5. 22159 | cminer_kafka_replay | 2020-10-15 19:56:51.438767+08 | active | lsn: A/8A2C5B48, kafka error
  6. (2 rows)

如果上面的query字段中“kafka error”,如下所示:

  1. lsn: A/8A2C5B48, kafka error

说明连接到kafka失败了。

如果是临时的kafka,用下面的命令可以消费kafka的数据:

  1. bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic topic_01

监控kafka消费情况

  1. /home/postgres/pgdata/cminer_stats
  2. tail-f kafka_stats_yymmddhhmiss.log

监控cmienr同步性能情况

  1. tail -f cminert_stats_20200915182926.log

用下面的SQL可以查看发送到kafka数据的位置:

  1. select * from cminer_kafka_replay_position;

附录:kafka的一些命令:

  1. 启动kafka
  2. /usr/local/kafka_2.12-2.6.0/bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
  3. /usr/local/kafka_2.12-2.6.0/bin/kafka-server-start.sh config/server.properties
  4. 查看kafka消费的数据信息
  5. /usr/local/kafka_2.12-2.6.0/bin/kafka-console-consumer.sh -bootstrap-server 127.0.0.1:9092 --topic topic_01