CLup产品文档

往前插入
往后插入
删除

CLup+PL/Proxy实现微服务及读写分离的解决方案

1. 方案介绍

PL/Proxy 是一个数据路由的插件,可以方便的实现数据路由的功能。在一台数据库中安装PL/Proxy后,此台数据库就变成了一个中间件,此台中间件的数据库本身并不存储数据,而是后端的一些数据库中存储数据。当请求发送到PL/Proxy后,PL/Proxy根据设定的数据路由规则把请求转发到后端的数据库中,架构如下:

架构

PL/Proxy转发的请求不能是普通的SQL,必须是一个函数。

我们以一个简单的例子来说明,假设我们有一个业务有如下操作:

  • 增加用户,用户信息有 用户名称和email地址
  • 根据用户名称查询用户的email地址。

我们建立这张用户表:

  1. CREATE TABLE users(username text primary key, email text);

按PL/Proxy的要求,业务“新建用户”的逻辑需要放到一个函数中:

  1. CREATE OR REPLACE FUNCTION insert_user(i_username text, i_emailaddress text)
  2. RETURNS integer AS $$
  3. INSERT INTO users (username, email) VALUES ($1,$2);
  4. SELECT 1;
  5. $$ LANGUAGE SQL;

根据用户名查询用户的email也放到一个函数中:

  1. CREATE OR REPLACE FUNCTION get_user_email(i_username text)
  2. RETURNS TABLE (
  3. email text
  4. ) AS
  5. $$
  6. BEGIN
  7. RETURN QUERY
  8. SELECT email FROM users WHERE username = i_username;
  9. END;
  10. $$ LANGUAGE plpgsql;

假设我们数据库架构是一主两备的架构,函数insert_user只能在主库上执行,而get_user_email函数我们希望是负载均衡到各个备库上执行,这个对不同函数的路由到不同的主备库上的功能我们可以用PL/Proxy来实现,实现架构如下:

架构

上面的是架构师的一个逻辑部署图,后续的实际例子中实际我们并没有把PL/Proxy插件安装在单独的数据库中,而是直接安装在存数据的数据库实例中。

在PL/Proxy中,我们需要把主库编一个组叫write_cluster:

  1. CREATE SERVER write_cluster FOREIGN DATA WRAPPER plproxy
  2. OPTIONS (
  3. connection_lifetime '1800',
  4. disable_binary '1',
  5. p0 'dbname=businessdb host=192.168.56.79'
  6. );

上面的192.168.56.79是主库的VIP地址。

把2个备库也编一个组叫read_cluster

  1. CREATE SERVER read_cluster FOREIGN DATA WRAPPER plproxy
  2. OPTIONS (
  3. connection_lifetime '1800',
  4. disable_binary '1',
  5. p0 'dbname=businessdb host=192.168.56.72',
  6. p1 'dbname=businessdb host=192.168.56.73',
  7. );

insert_user的请求发送到组write_cluster中。而get_user_email的请求发送到组read_cluster中。

我们需要在PL/Proxy所在的数据库中建同名的PL/Proxy的函数insert_user和get_user_email:

  1. CREATE OR REPLACE FUNCTION insert_user(i_username text, i_emailaddress text)
  2. RETURNS integer AS $$
  3. CLUSTER 'write_cluster';
  4. RUN ON ANY;
  5. $$ LANGUAGE plproxy;
  1. CREATE OR REPLACE FUNCTION get_user_email(i_username text)
  2. RETURNS SETOF text AS $$
  3. CLUSTER 'read_cluster';
  4. RUN ON ANY;
  5. $$ LANGUAGE plproxy;

注意路由函数的名称、函数参数、返回值与原先的业务函数完全相同,但语言类型是plproxy,这样当我们调用plproxy数据库中的路由函数,路由函数会把请求根据路由规则转发到后端数据库中的实际函数。

路由函数的函数体的内容都是路由规则:

  • CLUSTER ‘write_cluster’; 把请求转发到write_cluster组中
  • CLUSTER ‘read_cluster’; 把请求转发到read_cluster组中
  • RUN ON ANY;转发到组中的任意一台机器上

更多的路由规则的语法请见:https://plproxy.github.io/syntax.html

路由函数insert_user中“CLUSTER ‘write_cluster’;”表明这个请求只发到主库中。
路由函数get_user_email中“CLUSTER ‘read_cluster’;”表明这个请求负载均衡的只发送到各个备库。

当然PL/Proxy还有很多其它的路由规则。可以详细见PL/Proxy的官方的手册。

2. 我们使用一个实际的例子来讲解完整的搭建过程

2.1 plproxy的示例环境说明

一台主库: 192.168.56.71 二台备库: 192.168.56.72~73 VIP为:192.168.56.79

这三台机器的主机名分别为:pg01,pg02,pg03

此例子中实际我们并没有把PL/Proxy插件安装在单独的数据库机器中,而是直接安装在存数据的数据库实例中。

2.2 安装plproxy

需要在每台机器上安装plproxy插件,到https://plproxy.github.io/downloads/files/2.10.0/plproxy-2.10.0.tar.gz
下载安装包。

为了让普通用户可以创建plproxy的路由函数,安装完plproxy后,把plproxy的sql脚本(源码包中sql/plproxy_lang.sql文件)中创建plproxy language的方式改成TRUSTED。因为对于非安全的语言只能是超级用户才可以创建此language的函数,原先的内容为:

  1. CREATE OR REPLACE LANGUAGE plproxy HANDLER plproxy_call_handler VALIDATOR plproxy_validator;

把上面的“CREATE OR REPLACE TRUSTED LANGUAGE plproxy”改成“CREATE OR REPLACE LANGUAGE plproxy”:

  1. CREATE OR REPLACE TRUSTED LANGUAGE plproxy HANDLER plproxy_call_handler VALIDATOR plproxy_validator;

2.3 配置plproxy

用于业务的数据库用户为u01,相应建用户的语句为:

  1. CREATE USER u01 password 'u01pwd';

存业务数据的数据库叫businessdb:

  1. CREATE DATABASE businessdb owner u01;

连接businessdb,把businessdb中的public这个schema的属主改为u01:

  1. alter schema public owner to u01;

建一个plproxy的代理库,名叫proxydb:

  1. CREATE DATABASE proxydb owner u01;

应用就连接这个proxydb。

用超级用户连接proxydb,装载plproxy的扩展:

  1. CREATE EXTENSION plproxy;

然后在proxydb中执行:

  1. GRANT ALL ON FOREIGN DATA WRAPPER plproxy to u01;
  2. GRANT USAGE ON LANGUAGE plproxy TO u01;

然后用u01用户连接数据库proxydb,然后在其中建读和写的plproxy的cluster:

  1. CREATE SERVER read_cluster FOREIGN DATA WRAPPER plproxy
  2. OPTIONS (
  3. connection_lifetime '1800',
  4. disable_binary '1',
  5. p0 'dbname=businessdb host=192.168.56.72',
  6. p1 'dbname=businessdb host=192.168.56.73'
  7. );
  8. CREATE SERVER write_cluster FOREIGN DATA WRAPPER plproxy
  9. OPTIONS (
  10. connection_lifetime '1800',
  11. disable_binary '1',
  12. p0 'dbname=businessdb host=192.168.56.79'
  13. );

上面内容中的“192.168.56.79”是主库的vip,但如果CLup没有启动,机器192.168.56.71上还没有这个vip,可以通过下面的命令把vip加上:

  1. ip addr add 192.168.56.79/32 dev eth0

上面命令中“eth0”是你的网卡名,如果实际的网卡名不是“eth0”,需要换成实际的网卡名。

建用户映射,以便plproxy能访问底层的数据节点:

  1. CREATE USER MAPPING FOR public SERVER read_cluster OPTIONS (user 'u01', password 'u01pwd');
  2. CREATE USER MAPPING FOR public SERVER write_cluster OPTIONS (user 'u01', password 'u01pwd');

下面假设有一张业务表users,此表记录了用户名,和用户的email

  1. CREATE TABLE users(username text primary key, email text);

业务的需求是:

  • 插入数据到users表中。
  • 根据用户名(username)查询出用户的email。

则我们应该在businessdb中把表users建立起来,在其中建一个函数insert_user来完成第一个需求:插入数据的功能:

  1. CREATE OR REPLACE FUNCTION insert_user(i_username text, i_emailaddress text)
  2. RETURNS integer AS $$
  3. INSERT INTO users (username, email) VALUES ($1,$2);
  4. SELECT 1;
  5. $$ LANGUAGE SQL;

在proxydb中建以下两个函数:

  1. --插数据的函数:
  2. CREATE OR REPLACE FUNCTION insert_user(i_username text, i_emailaddress text)
  3. RETURNS integer AS $$
  4. CLUSTER 'write_cluster';
  5. RUN ON ANY;
  6. $$ LANGUAGE plproxy;
  1. --查询数据的函数
  2. CREATE OR REPLACE FUNCTION get_user_email(i_username text)
  3. RETURNS SETOF text AS $$
  4. CLUSTER 'read_cluster';
  5. RUN ON ANY;
  6. SELECT email FROM users WHERE username = i_username;
  7. $$ LANGUAGE plproxy;

其中函数insert_user使用的是plproxy的“write_cluster”, 因为“write_cluster”中只有一台主库,所以插入数据的请求只会发到主库上。而“write_cluster”中用的是主库vip,所以不管哪台机器变成主库,请求都会发到这台机器上。

而函数get_user_email使用的是plproxy的‘read_cluster’,则请求会负载均衡的分发到“read_cluster”集群中的某一台机器上。

使用情况如下:

  1. proxydb=> select insert_user('osdba', 'osdba@163.com');
  2. insert_user
  3. -------------
  4. 1
  5. (1 row)
  6. proxydb=> select insert_user('chengfen', 'cf@163.com');
  7. insert_user
  8. -------------
  9. 1
  10. (1 row)
  11. proxydb=> select get_user_email('chengfen');
  12. get_user_email
  13. ----------------
  14. cf@163.com
  15. (1 row)
  16. proxydb=> select get_user_email('osdba');
  17. get_user_email
  18. ----------------
  19. osdba@163.com
  20. (1 row)

3. 与CLup的高可用功能集成

我们知道当主库坏掉后,一台备库会激活成主库,原先PL/Proxy的读负载均衡到两台备库上的,这时就需要改成读请求到剩下的一台备库上。

CLup可以在切换时回调一个数据库中的函数,我们写这么一个函数,然后在这个函数中修改PL/Proxy的切换后的路由即可。

回调函数为csha_update_plp_server:

  1. CREATE OR REPLACE FUNCTION csha_update_plp_server(in_type int, in_msg text, in_opr_ip text, in_before_cluster text, in_after_cluster text)
  2. RETURNS text AS
  3. $BODY$
  4. DECLARE
  5. v_ddl text;
  6. v_srv_opt text;
  7. v_ok_stb_db_array text[];
  8. v_plp_db_array text[];
  9. v_ok_stb_cnt int;
  10. v_plp_db_cnt int;
  11. v_db_list jsonb;
  12. v_curr_db_dict jsonb;
  13. v_pri_ip text;
  14. i int;
  15. k int;
  16. x text;
  17. rec RECORD;
  18. BEGIN
  19. v_srv_opt := '';
  20. FOR rec IN (select split_part(opt, '=', 1) as part from (select unnest(srvoptions) as opt from pg_foreign_server
  21. where srvname='read_cluster') t where t.opt like 'p%') LOOP
  22. IF length(v_srv_opt) =0 THEN
  23. v_srv_opt := 'drop '||rec.part;
  24. ELSE
  25. v_srv_opt := 'drop '||rec.part||','||v_srv_opt;
  26. END IF;
  27. END LOOP;
  28. v_db_list := (in_after_cluster::jsonb)->'db_list';
  29. select array_agg(db_dict->>'host') into v_ok_stb_db_array from (
  30. select value as db_dict from jsonb_array_elements(v_db_list) as t
  31. ) as t2
  32. where db_dict->>'state' = '1' and db_dict->>'is_primary' = '0';
  33. --RAISE WARNING 'v_db_list: %', v_db_list;
  34. --RAISE WARNING 'v_ok_stb_db_array: %', v_ok_stb_db_array;
  35. select db_dict->>'host' into v_pri_ip from (
  36. select value as db_dict from jsonb_array_elements(v_db_list) as t) as t2
  37. where db_dict->>'state' = '1' and db_dict->>'is_primary' = '1';
  38. select db_dict into v_curr_db_dict from (
  39. select value as db_dict from jsonb_array_elements(v_db_list) as t) as t2
  40. where db_dict->>'host' = in_opr_ip;
  41. v_ok_stb_cnt := array_length(v_ok_stb_db_array, 1);
  42. -- 因为plproxy中后端数必须是2n次方,所以必须把传进来的db_list进行循环补齐,以满足2n次方
  43. v_plp_db_cnt := ceil(log(2, v_ok_stb_cnt));
  44. v_plp_db_cnt := 1 << v_plp_db_cnt;
  45. k := 1;
  46. FOR i IN 1..v_plp_db_cnt LOOP
  47. v_plp_db_array := v_plp_db_array || v_ok_stb_db_array[k];
  48. k := k + 1;
  49. IF k > v_ok_stb_cnt THEN
  50. k := 1;
  51. END IF;
  52. END LOOP;
  53. -- 如果没有任何一个备库还是正常的,只把查询指向主库
  54. IF array_length(v_plp_db_array, 1) = 0 THEN
  55. v_plp_db_array := v_plp_db_array || v_pri_ip;
  56. END IF;
  57. i := 0;
  58. FOREACH x IN ARRAY v_plp_db_array
  59. LOOP
  60. v_srv_opt = v_srv_opt || format(',add p%s ''dbname=businessdb host=%s''', i, v_plp_db_array[i+1]);
  61. i := i + 1;
  62. END LOOP;
  63. v_ddl = 'ALTER SERVER read_cluster OPTIONS('|| v_srv_opt ||');';
  64. EXECUTE v_ddl;
  65. return v_ddl;
  66. END;
  67. $BODY$
  68. LANGUAGE 'plpgsql';

此函数的参数说明如下:

  • in_type: 类型,当集群上线时也会调用此函数,但此时参数传入的值为0;当发现是备库故障时,此参数传入为1;当发现是主库故障时,此参数传入为2;当是手工切换数据库,此值输入的为3,当是把坏的节点加回时,此参数传入的是4
  • in_msg: 故障一些信息
  • in_opr_ip:如果是故障切换,此值为故障节点的ip,如果是人工切换,则是旧节点IP,如果是把节点加入集群是这个节点的ip
  • in_before_cluster: 切换前的集群信息
  • in_after_cluster: 切换后的集群信息

实际使用是请把函数定义中的where srvname=’read_cluster’中的plproxy中的只读服务名换成实际的服务名,把dbname=businessdb中的“businessdb”换成实际的业务数据库的名称。

上面的in_before_cluster、in_after_cluster的集群信息是一个json串,格式大致如下:

  1. {
  2. "cluster_id": 19,
  3. "cluster_type": 1,
  4. "state": 0,
  5. "lock_time": 0,
  6. "cluster_name": "srcluster01",
  7. "vip": "192.168.56.79",
  8. "port": 5432,
  9. "pgdata": "/home/postgres/pgdata",
  10. "remark": "sr cluster",
  11. "cstlb_list": "",
  12. "read_vip_host": "",
  13. "read_vip": "",
  14. "trigger_db_name": "proxydb",
  15. "trigger_db_func": "csha_update_plp_server",
  16. "ha_db_user": "postgres",
  17. "ha_db_pass": "postgres",
  18. "probe_db_name": "cs_sys_ha",
  19. "probe_interval": "10",
  20. "probe_timeout": "10",
  21. "db_repl_user": "postgres",
  22. "db_repl_pass": "postgres",
  23. "probe_pri_sql": "UPDATE cs_sys_heartbeat SET hb_time = now()",
  24. "probe_stb_sql": "select 1",
  25. "db_list": [{
  26. "id": 1,
  27. "host": "192.168.56.71",
  28. "port": 5432,
  29. "state": 1,
  30. "pgdata": "/home/postgres/pgdata",
  31. "cluster_id": 19,
  32. "is_primary": 1,
  33. "repl_app_name": "stb71"
  34. }, {
  35. "id": 2,
  36. "host": "192.168.56.72",
  37. "port": 5432,
  38. "state": 1,
  39. "pgdata": "/home/postgres/pgdata",
  40. "cluster_id": 19,
  41. "is_primary": 0,
  42. "repl_app_name": "stb72"
  43. }, {
  44. "id": 3,
  45. "host": "192.168.56.73",
  46. "port": 5432,
  47. "state": 1,
  48. "pgdata": "/home/postgres/pgdata",
  49. "cluster_id": 19,
  50. "is_primary": 0,
  51. "repl_app_name": "stb73"
  52. }]
  53. }

我们到CLup管理界面的菜单HA集群->集群定义中,点集群列表中的编辑按钮,到流复制集群的配置界面中:

上图中把配置项“触发DB名称”设置为“proxydb”,把触发函数设置为“csha_update_plp_server”,这样当发生故障切换后,clup会调用此函数csha_update_plp_server去更新plproxy中的配置。