CLup+PL/Proxy实现微服务及读写分离的解决方案
1. 方案介绍
PL/Proxy 是一个数据路由的插件,可以方便的实现数据路由的功能。在一台数据库中安装PL/Proxy后,此台数据库就变成了一个中间件,此台中间件的数据库本身并不存储数据,而是后端的一些数据库中存储数据。当请求发送到PL/Proxy后,PL/Proxy根据设定的数据路由规则把请求转发到后端的数据库中,架构如下:
PL/Proxy转发的请求不能是普通的SQL,必须是一个函数。
我们以一个简单的例子来说明,假设我们有一个业务有如下操作:
- 增加用户,用户信息有 用户名称和email地址
- 根据用户名称查询用户的email地址。
我们建立这张用户表:
CREATE TABLE users(username text primary key, email text);
按PL/Proxy的要求,业务“新建用户”的逻辑需要放到一个函数中:
CREATE OR REPLACE FUNCTION insert_user(i_username text, i_emailaddress text)RETURNS integer AS $$INSERT INTO users (username, email) VALUES ($1,$2);SELECT 1;$$ LANGUAGE SQL;
根据用户名查询用户的email也放到一个函数中:
CREATE OR REPLACE FUNCTION get_user_email(i_username text)RETURNS TABLE (email text) AS$$BEGINRETURN QUERYSELECT email FROM users WHERE username = i_username;END;$$ LANGUAGE plpgsql;
假设我们数据库架构是一主两备的架构,函数insert_user只能在主库上执行,而get_user_email函数我们希望是负载均衡到各个备库上执行,这个对不同函数的路由到不同的主备库上的功能我们可以用PL/Proxy来实现,实现架构如下:
上面的是架构师的一个逻辑部署图,后续的实际例子中实际我们并没有把PL/Proxy插件安装在单独的数据库中,而是直接安装在存数据的数据库实例中。
在PL/Proxy中,我们需要把主库编一个组叫write_cluster:
CREATE SERVER write_cluster FOREIGN DATA WRAPPER plproxyOPTIONS (connection_lifetime '1800',disable_binary '1',p0 'dbname=businessdb host=192.168.56.79');
上面的192.168.56.79是主库的VIP地址。
把2个备库也编一个组叫read_cluster
CREATE SERVER read_cluster FOREIGN DATA WRAPPER plproxyOPTIONS (connection_lifetime '1800',disable_binary '1',p0 'dbname=businessdb host=192.168.56.72',p1 'dbname=businessdb host=192.168.56.73',);
insert_user的请求发送到组write_cluster中。而get_user_email的请求发送到组read_cluster中。
我们需要在PL/Proxy所在的数据库中建同名的PL/Proxy的函数insert_user和get_user_email:
CREATE OR REPLACE FUNCTION insert_user(i_username text, i_emailaddress text)RETURNS integer AS $$CLUSTER 'write_cluster';RUN ON ANY;$$ LANGUAGE plproxy;
CREATE OR REPLACE FUNCTION get_user_email(i_username text)RETURNS SETOF text AS $$CLUSTER 'read_cluster';RUN ON ANY;$$ LANGUAGE plproxy;
注意路由函数的名称、函数参数、返回值与原先的业务函数完全相同,但语言类型是plproxy,这样当我们调用plproxy数据库中的路由函数,路由函数会把请求根据路由规则转发到后端数据库中的实际函数。
路由函数的函数体的内容都是路由规则:
- CLUSTER ‘write_cluster’; 把请求转发到write_cluster组中
- CLUSTER ‘read_cluster’; 把请求转发到read_cluster组中
- RUN ON ANY;转发到组中的任意一台机器上
更多的路由规则的语法请见:https://plproxy.github.io/syntax.html
路由函数insert_user中“CLUSTER ‘write_cluster’;”表明这个请求只发到主库中。
路由函数get_user_email中“CLUSTER ‘read_cluster’;”表明这个请求负载均衡的只发送到各个备库。
当然PL/Proxy还有很多其它的路由规则。可以详细见PL/Proxy的官方的手册。
2. 我们使用一个实际的例子来讲解完整的搭建过程
2.1 plproxy的示例环境说明
一台主库: 192.168.56.71 二台备库: 192.168.56.72~73 VIP为:192.168.56.79
这三台机器的主机名分别为:pg01,pg02,pg03
此例子中实际我们并没有把PL/Proxy插件安装在单独的数据库机器中,而是直接安装在存数据的数据库实例中。
2.2 安装plproxy
需要在每台机器上安装plproxy插件,到https://plproxy.github.io/downloads/files/2.10.0/plproxy-2.10.0.tar.gz
下载安装包。
为了让普通用户可以创建plproxy的路由函数,安装完plproxy后,把plproxy的sql脚本(源码包中sql/plproxy_lang.sql文件)中创建plproxy language的方式改成TRUSTED。因为对于非安全的语言只能是超级用户才可以创建此language的函数,原先的内容为:
CREATE OR REPLACE LANGUAGE plproxy HANDLER plproxy_call_handler VALIDATOR plproxy_validator;
把上面的“CREATE OR REPLACE TRUSTED LANGUAGE plproxy”改成“CREATE OR REPLACE LANGUAGE plproxy”:
CREATE OR REPLACE TRUSTED LANGUAGE plproxy HANDLER plproxy_call_handler VALIDATOR plproxy_validator;
2.3 配置plproxy
用于业务的数据库用户为u01,相应建用户的语句为:
CREATE USER u01 password 'u01pwd';
存业务数据的数据库叫businessdb:
CREATE DATABASE businessdb owner u01;
连接businessdb,把businessdb中的public这个schema的属主改为u01:
alter schema public owner to u01;
建一个plproxy的代理库,名叫proxydb:
CREATE DATABASE proxydb owner u01;
应用就连接这个proxydb。
用超级用户连接proxydb,装载plproxy的扩展:
CREATE EXTENSION plproxy;
然后在proxydb中执行:
GRANT ALL ON FOREIGN DATA WRAPPER plproxy to u01;GRANT USAGE ON LANGUAGE plproxy TO u01;
然后用u01用户连接数据库proxydb,然后在其中建读和写的plproxy的cluster:
CREATE SERVER read_cluster FOREIGN DATA WRAPPER plproxyOPTIONS (connection_lifetime '1800',disable_binary '1',p0 'dbname=businessdb host=192.168.56.72',p1 'dbname=businessdb host=192.168.56.73');CREATE SERVER write_cluster FOREIGN DATA WRAPPER plproxyOPTIONS (connection_lifetime '1800',disable_binary '1',p0 'dbname=businessdb host=192.168.56.79');
上面内容中的“192.168.56.79”是主库的vip,但如果CLup没有启动,机器192.168.56.71上还没有这个vip,可以通过下面的命令把vip加上:
ip addr add 192.168.56.79/32 dev eth0
上面命令中“eth0”是你的网卡名,如果实际的网卡名不是“eth0”,需要换成实际的网卡名。
建用户映射,以便plproxy能访问底层的数据节点:
CREATE USER MAPPING FOR public SERVER read_cluster OPTIONS (user 'u01', password 'u01pwd');CREATE USER MAPPING FOR public SERVER write_cluster OPTIONS (user 'u01', password 'u01pwd');
下面假设有一张业务表users,此表记录了用户名,和用户的email
CREATE TABLE users(username text primary key, email text);
业务的需求是:
- 插入数据到users表中。
- 根据用户名(username)查询出用户的email。
则我们应该在businessdb中把表users建立起来,在其中建一个函数insert_user来完成第一个需求:插入数据的功能:
CREATE OR REPLACE FUNCTION insert_user(i_username text, i_emailaddress text)RETURNS integer AS $$INSERT INTO users (username, email) VALUES ($1,$2);SELECT 1;$$ LANGUAGE SQL;
在proxydb中建以下两个函数:
--插数据的函数:CREATE OR REPLACE FUNCTION insert_user(i_username text, i_emailaddress text)RETURNS integer AS $$CLUSTER 'write_cluster';RUN ON ANY;$$ LANGUAGE plproxy;
--查询数据的函数CREATE OR REPLACE FUNCTION get_user_email(i_username text)RETURNS SETOF text AS $$CLUSTER 'read_cluster';RUN ON ANY;SELECT email FROM users WHERE username = i_username;$$ LANGUAGE plproxy;
其中函数insert_user使用的是plproxy的“write_cluster”, 因为“write_cluster”中只有一台主库,所以插入数据的请求只会发到主库上。而“write_cluster”中用的是主库vip,所以不管哪台机器变成主库,请求都会发到这台机器上。
而函数get_user_email使用的是plproxy的‘read_cluster’,则请求会负载均衡的分发到“read_cluster”集群中的某一台机器上。
使用情况如下:
proxydb=> select insert_user('osdba', 'osdba@163.com');insert_user-------------1(1 row)proxydb=> select insert_user('chengfen', 'cf@163.com');insert_user-------------1(1 row)proxydb=> select get_user_email('chengfen');get_user_email----------------cf@163.com(1 row)proxydb=> select get_user_email('osdba');get_user_email----------------osdba@163.com(1 row)
3. 与CLup的高可用功能集成
我们知道当主库坏掉后,一台备库会激活成主库,原先PL/Proxy的读负载均衡到两台备库上的,这时就需要改成读请求到剩下的一台备库上。
CLup可以在切换时回调一个数据库中的函数,我们写这么一个函数,然后在这个函数中修改PL/Proxy的切换后的路由即可。
回调函数为csha_update_plp_server:
CREATE OR REPLACE FUNCTION csha_update_plp_server(in_type int, in_msg text, in_opr_ip text, in_before_cluster text, in_after_cluster text)RETURNS text AS$BODY$DECLAREv_ddl text;v_srv_opt text;v_ok_stb_db_array text[];v_plp_db_array text[];v_ok_stb_cnt int;v_plp_db_cnt int;v_db_list jsonb;v_curr_db_dict jsonb;v_pri_ip text;i int;k int;x text;rec RECORD;BEGINv_srv_opt := '';FOR rec IN (select split_part(opt, '=', 1) as part from (select unnest(srvoptions) as opt from pg_foreign_serverwhere srvname='read_cluster') t where t.opt like 'p%') LOOPIF length(v_srv_opt) =0 THENv_srv_opt := 'drop '||rec.part;ELSEv_srv_opt := 'drop '||rec.part||','||v_srv_opt;END IF;END LOOP;v_db_list := (in_after_cluster::jsonb)->'db_list';select array_agg(db_dict->>'host') into v_ok_stb_db_array from (select value as db_dict from jsonb_array_elements(v_db_list) as t) as t2where db_dict->>'state' = '1' and db_dict->>'is_primary' = '0';--RAISE WARNING 'v_db_list: %', v_db_list;--RAISE WARNING 'v_ok_stb_db_array: %', v_ok_stb_db_array;select db_dict->>'host' into v_pri_ip from (select value as db_dict from jsonb_array_elements(v_db_list) as t) as t2where db_dict->>'state' = '1' and db_dict->>'is_primary' = '1';select db_dict into v_curr_db_dict from (select value as db_dict from jsonb_array_elements(v_db_list) as t) as t2where db_dict->>'host' = in_opr_ip;v_ok_stb_cnt := array_length(v_ok_stb_db_array, 1);-- 因为plproxy中后端数必须是2的n次方,所以必须把传进来的db_list进行循环补齐,以满足2的n次方v_plp_db_cnt := ceil(log(2, v_ok_stb_cnt));v_plp_db_cnt := 1 << v_plp_db_cnt;k := 1;FOR i IN 1..v_plp_db_cnt LOOPv_plp_db_array := v_plp_db_array || v_ok_stb_db_array[k];k := k + 1;IF k > v_ok_stb_cnt THENk := 1;END IF;END LOOP;-- 如果没有任何一个备库还是正常的,只把查询指向主库IF array_length(v_plp_db_array, 1) = 0 THENv_plp_db_array := v_plp_db_array || v_pri_ip;END IF;i := 0;FOREACH x IN ARRAY v_plp_db_arrayLOOPv_srv_opt = v_srv_opt || format(',add p%s ''dbname=businessdb host=%s''', i, v_plp_db_array[i+1]);i := i + 1;END LOOP;v_ddl = 'ALTER SERVER read_cluster OPTIONS('|| v_srv_opt ||');';EXECUTE v_ddl;return v_ddl;END;$BODY$LANGUAGE 'plpgsql';
此函数的参数说明如下:
- in_type: 类型,当集群上线时也会调用此函数,但此时参数传入的值为0;当发现是备库故障时,此参数传入为1;当发现是主库故障时,此参数传入为2;当是手工切换数据库,此值输入的为3,当是把坏的节点加回时,此参数传入的是4
- in_msg: 故障一些信息
- in_opr_ip:如果是故障切换,此值为故障节点的ip,如果是人工切换,则是旧节点IP,如果是把节点加入集群是这个节点的ip
- in_before_cluster: 切换前的集群信息
- in_after_cluster: 切换后的集群信息
实际使用是请把函数定义中的where srvname=’read_cluster’中的plproxy中的只读服务名换成实际的服务名,把dbname=businessdb中的“businessdb”换成实际的业务数据库的名称。
上面的in_before_cluster、in_after_cluster的集群信息是一个json串,格式大致如下:
{"cluster_id": 19,"cluster_type": 1,"state": 0,"lock_time": 0,"cluster_name": "srcluster01","vip": "192.168.56.79","port": 5432,"pgdata": "/home/postgres/pgdata","remark": "sr cluster","cstlb_list": "","read_vip_host": "","read_vip": "","trigger_db_name": "proxydb","trigger_db_func": "csha_update_plp_server","ha_db_user": "postgres","ha_db_pass": "postgres","probe_db_name": "cs_sys_ha","probe_interval": "10","probe_timeout": "10","db_repl_user": "postgres","db_repl_pass": "postgres","probe_pri_sql": "UPDATE cs_sys_heartbeat SET hb_time = now()","probe_stb_sql": "select 1","db_list": [{"id": 1,"host": "192.168.56.71","port": 5432,"state": 1,"pgdata": "/home/postgres/pgdata","cluster_id": 19,"is_primary": 1,"repl_app_name": "stb71"}, {"id": 2,"host": "192.168.56.72","port": 5432,"state": 1,"pgdata": "/home/postgres/pgdata","cluster_id": 19,"is_primary": 0,"repl_app_name": "stb72"}, {"id": 3,"host": "192.168.56.73","port": 5432,"state": 1,"pgdata": "/home/postgres/pgdata","cluster_id": 19,"is_primary": 0,"repl_app_name": "stb73"}]}
我们到CLup管理界面的菜单HA集群->集群定义中,点集群列表中的编辑按钮,到流复制集群的配置界面中:
上图中把配置项“触发DB名称”设置为“proxydb”,把触发函数设置为“csha_update_plp_server”,这样当发生故障切换后,clup会调用此函数csha_update_plp_server去更新plproxy中的配置。