CLup+PL/Proxy实现微服务及读写分离的解决方案
1. 方案介绍
PL/Proxy 是一个数据路由的插件,可以方便的实现数据路由的功能。在一台数据库中安装PL/Proxy后,此台数据库就变成了一个中间件,此台中间件的数据库本身并不存储数据,而是后端的一些数据库中存储数据。当请求发送到PL/Proxy后,PL/Proxy根据设定的数据路由规则把请求转发到后端的数据库中,架构如下:
PL/Proxy转发的请求不能是普通的SQL,必须是一个函数。
我们以一个简单的例子来说明,假设我们有一个业务有如下操作:
- 增加用户,用户信息有 用户名称和email地址
- 根据用户名称查询用户的email地址。
我们建立这张用户表:
CREATE TABLE users(username text primary key, email text);
按PL/Proxy的要求,业务“新建用户”的逻辑需要放到一个函数中:
CREATE OR REPLACE FUNCTION insert_user(i_username text, i_emailaddress text)
RETURNS integer AS $$
INSERT INTO users (username, email) VALUES ($1,$2);
SELECT 1;
$$ LANGUAGE SQL;
根据用户名查询用户的email也放到一个函数中:
CREATE OR REPLACE FUNCTION get_user_email(i_username text)
RETURNS TABLE (
email text
) AS
$$
BEGIN
RETURN QUERY
SELECT email FROM users WHERE username = i_username;
END;
$$ LANGUAGE plpgsql;
假设我们数据库架构是一主两备的架构,函数insert_user只能在主库上执行,而get_user_email函数我们希望是负载均衡到各个备库上执行,这个对不同函数的路由到不同的主备库上的功能我们可以用PL/Proxy来实现,实现架构如下:
上面的是架构师的一个逻辑部署图,后续的实际例子中实际我们并没有把PL/Proxy插件安装在单独的数据库中,而是直接安装在存数据的数据库实例中。
在PL/Proxy中,我们需要把主库编一个组叫write_cluster:
CREATE SERVER write_cluster FOREIGN DATA WRAPPER plproxy
OPTIONS (
connection_lifetime '1800',
disable_binary '1',
p0 'dbname=businessdb host=192.168.56.79'
);
上面的192.168.56.79是主库的VIP地址。
把2个备库也编一个组叫read_cluster
CREATE SERVER read_cluster FOREIGN DATA WRAPPER plproxy
OPTIONS (
connection_lifetime '1800',
disable_binary '1',
p0 'dbname=businessdb host=192.168.56.72',
p1 'dbname=businessdb host=192.168.56.73',
);
insert_user的请求发送到组write_cluster中。而get_user_email的请求发送到组read_cluster中。
我们需要在PL/Proxy所在的数据库中建同名的PL/Proxy的函数insert_user和get_user_email:
CREATE OR REPLACE FUNCTION insert_user(i_username text, i_emailaddress text)
RETURNS integer AS $$
CLUSTER 'write_cluster';
RUN ON ANY;
$$ LANGUAGE plproxy;
CREATE OR REPLACE FUNCTION get_user_email(i_username text)
RETURNS SETOF text AS $$
CLUSTER 'read_cluster';
RUN ON ANY;
$$ LANGUAGE plproxy;
注意路由函数的名称、函数参数、返回值与原先的业务函数完全相同,但语言类型是plproxy,这样当我们调用plproxy数据库中的路由函数,路由函数会把请求根据路由规则转发到后端数据库中的实际函数。
路由函数的函数体的内容都是路由规则:
- CLUSTER ‘write_cluster’; 把请求转发到write_cluster组中
- CLUSTER ‘read_cluster’; 把请求转发到read_cluster组中
- RUN ON ANY;转发到组中的任意一台机器上
更多的路由规则的语法请见:https://plproxy.github.io/syntax.html
路由函数insert_user中“CLUSTER ‘write_cluster’;”表明这个请求只发到主库中。
路由函数get_user_email中“CLUSTER ‘read_cluster’;”表明这个请求负载均衡的只发送到各个备库。
当然PL/Proxy还有很多其它的路由规则。可以详细见PL/Proxy的官方的手册。
2. 我们使用一个实际的例子来讲解完整的搭建过程
2.1 plproxy的示例环境说明
一台主库: 192.168.56.71 二台备库: 192.168.56.72~73 VIP为:192.168.56.79
这三台机器的主机名分别为:pg01,pg02,pg03
此例子中实际我们并没有把PL/Proxy插件安装在单独的数据库机器中,而是直接安装在存数据的数据库实例中。
2.2 安装plproxy
需要在每台机器上安装plproxy插件,到https://plproxy.github.io/downloads/files/2.10.0/plproxy-2.10.0.tar.gz
下载安装包。
为了让普通用户可以创建plproxy的路由函数,安装完plproxy后,把plproxy的sql脚本(源码包中sql/plproxy_lang.sql文件)中创建plproxy language的方式改成TRUSTED。因为对于非安全的语言只能是超级用户才可以创建此language的函数,原先的内容为:
CREATE OR REPLACE LANGUAGE plproxy HANDLER plproxy_call_handler VALIDATOR plproxy_validator;
把上面的“CREATE OR REPLACE TRUSTED LANGUAGE plproxy”改成“CREATE OR REPLACE LANGUAGE plproxy”:
CREATE OR REPLACE TRUSTED LANGUAGE plproxy HANDLER plproxy_call_handler VALIDATOR plproxy_validator;
2.3 配置plproxy
用于业务的数据库用户为u01,相应建用户的语句为:
CREATE USER u01 password 'u01pwd';
存业务数据的数据库叫businessdb:
CREATE DATABASE businessdb owner u01;
连接businessdb,把businessdb中的public这个schema的属主改为u01:
alter schema public owner to u01;
建一个plproxy的代理库,名叫proxydb:
CREATE DATABASE proxydb owner u01;
应用就连接这个proxydb。
用超级用户连接proxydb,装载plproxy的扩展:
CREATE EXTENSION plproxy;
然后在proxydb中执行:
GRANT ALL ON FOREIGN DATA WRAPPER plproxy to u01;
GRANT USAGE ON LANGUAGE plproxy TO u01;
然后用u01用户连接数据库proxydb,然后在其中建读和写的plproxy的cluster:
CREATE SERVER read_cluster FOREIGN DATA WRAPPER plproxy
OPTIONS (
connection_lifetime '1800',
disable_binary '1',
p0 'dbname=businessdb host=192.168.56.72',
p1 'dbname=businessdb host=192.168.56.73'
);
CREATE SERVER write_cluster FOREIGN DATA WRAPPER plproxy
OPTIONS (
connection_lifetime '1800',
disable_binary '1',
p0 'dbname=businessdb host=192.168.56.79'
);
上面内容中的“192.168.56.79”是主库的vip,但如果CLup没有启动,机器192.168.56.71上还没有这个vip,可以通过下面的命令把vip加上:
ip addr add 192.168.56.79/32 dev eth0
上面命令中“eth0”是你的网卡名,如果实际的网卡名不是“eth0”,需要换成实际的网卡名。
建用户映射,以便plproxy能访问底层的数据节点:
CREATE USER MAPPING FOR public SERVER read_cluster OPTIONS (user 'u01', password 'u01pwd');
CREATE USER MAPPING FOR public SERVER write_cluster OPTIONS (user 'u01', password 'u01pwd');
下面假设有一张业务表users,此表记录了用户名,和用户的email
CREATE TABLE users(username text primary key, email text);
业务的需求是:
- 插入数据到users表中。
- 根据用户名(username)查询出用户的email。
则我们应该在businessdb中把表users建立起来,在其中建一个函数insert_user来完成第一个需求:插入数据的功能:
CREATE OR REPLACE FUNCTION insert_user(i_username text, i_emailaddress text)
RETURNS integer AS $$
INSERT INTO users (username, email) VALUES ($1,$2);
SELECT 1;
$$ LANGUAGE SQL;
在proxydb中建以下两个函数:
--插数据的函数:
CREATE OR REPLACE FUNCTION insert_user(i_username text, i_emailaddress text)
RETURNS integer AS $$
CLUSTER 'write_cluster';
RUN ON ANY;
$$ LANGUAGE plproxy;
--查询数据的函数
CREATE OR REPLACE FUNCTION get_user_email(i_username text)
RETURNS SETOF text AS $$
CLUSTER 'read_cluster';
RUN ON ANY;
SELECT email FROM users WHERE username = i_username;
$$ LANGUAGE plproxy;
其中函数insert_user使用的是plproxy的“write_cluster”, 因为“write_cluster”中只有一台主库,所以插入数据的请求只会发到主库上。而“write_cluster”中用的是主库vip,所以不管哪台机器变成主库,请求都会发到这台机器上。
而函数get_user_email使用的是plproxy的‘read_cluster’,则请求会负载均衡的分发到“read_cluster”集群中的某一台机器上。
使用情况如下:
proxydb=> select insert_user('osdba', 'osdba@163.com');
insert_user
-------------
1
(1 row)
proxydb=> select insert_user('chengfen', 'cf@163.com');
insert_user
-------------
1
(1 row)
proxydb=> select get_user_email('chengfen');
get_user_email
----------------
cf@163.com
(1 row)
proxydb=> select get_user_email('osdba');
get_user_email
----------------
osdba@163.com
(1 row)
3. 与CLup的高可用功能集成
我们知道当主库坏掉后,一台备库会激活成主库,原先PL/Proxy的读负载均衡到两台备库上的,这时就需要改成读请求到剩下的一台备库上。
CLup可以在切换时回调一个数据库中的函数,我们写这么一个函数,然后在这个函数中修改PL/Proxy的切换后的路由即可。
回调函数为csha_update_plp_server:
CREATE OR REPLACE FUNCTION csha_update_plp_server(in_type int, in_msg text, in_opr_ip text, in_before_cluster text, in_after_cluster text)
RETURNS text AS
$BODY$
DECLARE
v_ddl text;
v_srv_opt text;
v_ok_stb_db_array text[];
v_plp_db_array text[];
v_ok_stb_cnt int;
v_plp_db_cnt int;
v_db_list jsonb;
v_curr_db_dict jsonb;
v_pri_ip text;
i int;
k int;
x text;
rec RECORD;
BEGIN
v_srv_opt := '';
FOR rec IN (select split_part(opt, '=', 1) as part from (select unnest(srvoptions) as opt from pg_foreign_server
where srvname='read_cluster') t where t.opt like 'p%') LOOP
IF length(v_srv_opt) =0 THEN
v_srv_opt := 'drop '||rec.part;
ELSE
v_srv_opt := 'drop '||rec.part||','||v_srv_opt;
END IF;
END LOOP;
v_db_list := (in_after_cluster::jsonb)->'db_list';
select array_agg(db_dict->>'host') into v_ok_stb_db_array from (
select value as db_dict from jsonb_array_elements(v_db_list) as t
) as t2
where db_dict->>'state' = '1' and db_dict->>'is_primary' = '0';
--RAISE WARNING 'v_db_list: %', v_db_list;
--RAISE WARNING 'v_ok_stb_db_array: %', v_ok_stb_db_array;
select db_dict->>'host' into v_pri_ip from (
select value as db_dict from jsonb_array_elements(v_db_list) as t) as t2
where db_dict->>'state' = '1' and db_dict->>'is_primary' = '1';
select db_dict into v_curr_db_dict from (
select value as db_dict from jsonb_array_elements(v_db_list) as t) as t2
where db_dict->>'host' = in_opr_ip;
v_ok_stb_cnt := array_length(v_ok_stb_db_array, 1);
-- 因为plproxy中后端数必须是2的n次方,所以必须把传进来的db_list进行循环补齐,以满足2的n次方
v_plp_db_cnt := ceil(log(2, v_ok_stb_cnt));
v_plp_db_cnt := 1 << v_plp_db_cnt;
k := 1;
FOR i IN 1..v_plp_db_cnt LOOP
v_plp_db_array := v_plp_db_array || v_ok_stb_db_array[k];
k := k + 1;
IF k > v_ok_stb_cnt THEN
k := 1;
END IF;
END LOOP;
-- 如果没有任何一个备库还是正常的,只把查询指向主库
IF array_length(v_plp_db_array, 1) = 0 THEN
v_plp_db_array := v_plp_db_array || v_pri_ip;
END IF;
i := 0;
FOREACH x IN ARRAY v_plp_db_array
LOOP
v_srv_opt = v_srv_opt || format(',add p%s ''dbname=businessdb host=%s''', i, v_plp_db_array[i+1]);
i := i + 1;
END LOOP;
v_ddl = 'ALTER SERVER read_cluster OPTIONS('|| v_srv_opt ||');';
EXECUTE v_ddl;
return v_ddl;
END;
$BODY$
LANGUAGE 'plpgsql';
此函数的参数说明如下:
- in_type: 类型,当集群上线时也会调用此函数,但此时参数传入的值为0;当发现是备库故障时,此参数传入为1;当发现是主库故障时,此参数传入为2;当是手工切换数据库,此值输入的为3,当是把坏的节点加回时,此参数传入的是4
- in_msg: 故障一些信息
- in_opr_ip:如果是故障切换,此值为故障节点的ip,如果是人工切换,则是旧节点IP,如果是把节点加入集群是这个节点的ip
- in_before_cluster: 切换前的集群信息
- in_after_cluster: 切换后的集群信息
实际使用是请把函数定义中的where srvname=’read_cluster’中的plproxy中的只读服务名换成实际的服务名,把dbname=businessdb中的“businessdb”换成实际的业务数据库的名称。
上面的in_before_cluster、in_after_cluster的集群信息是一个json串,格式大致如下:
{
"cluster_id": 19,
"cluster_type": 1,
"state": 0,
"lock_time": 0,
"cluster_name": "srcluster01",
"vip": "192.168.56.79",
"port": 5432,
"pgdata": "/home/postgres/pgdata",
"remark": "sr cluster",
"cstlb_list": "",
"read_vip_host": "",
"read_vip": "",
"trigger_db_name": "proxydb",
"trigger_db_func": "csha_update_plp_server",
"ha_db_user": "postgres",
"ha_db_pass": "postgres",
"probe_db_name": "cs_sys_ha",
"probe_interval": "10",
"probe_timeout": "10",
"db_repl_user": "postgres",
"db_repl_pass": "postgres",
"probe_pri_sql": "UPDATE cs_sys_heartbeat SET hb_time = now()",
"probe_stb_sql": "select 1",
"db_list": [{
"id": 1,
"host": "192.168.56.71",
"port": 5432,
"state": 1,
"pgdata": "/home/postgres/pgdata",
"cluster_id": 19,
"is_primary": 1,
"repl_app_name": "stb71"
}, {
"id": 2,
"host": "192.168.56.72",
"port": 5432,
"state": 1,
"pgdata": "/home/postgres/pgdata",
"cluster_id": 19,
"is_primary": 0,
"repl_app_name": "stb72"
}, {
"id": 3,
"host": "192.168.56.73",
"port": 5432,
"state": 1,
"pgdata": "/home/postgres/pgdata",
"cluster_id": 19,
"is_primary": 0,
"repl_app_name": "stb73"
}]
}
我们到CLup管理界面的菜单HA集群
->集群定义
中,点集群列表中的编辑
按钮,到流复制集群的配置界面中:
上图中把配置项“触发DB名称”设置为“proxydb”,把触发函数设置为“csha_update_plp_server”,这样当发生故障切换后,clup会调用此函数csha_update_plp_server去更新plproxy中的配置。