首页
产品
CLup:PostgreSQL高可用集群平台CData高性能数据库云一体机CBackup数据库备份恢复云平台CPDA高性能双子星数据库机
解决方案
数据库专业技术服务全栈式PostgreSQL解决方案Oracle分布式存储化数据库云
文章
客户及伙伴
中启开源
关于我们
公司简介 联系我们
中启开源

1. 背景

某天,某个金融客户给我们反馈,查询pg_stat_activity中只有1400个连接,但是数据库配置的最大连接数是3000,但这时数据库的日志中就报不能建新连接的错误。

这个数据库的版本是PostgreSQL9.5。

2. 问题分析

经过中启乘数的PostgreSQL专家分析,这是当CPU 100%时,在PostgreSQL9.5版本下,真实的连接数无法反映到视图pg_stat_activity中导致。

在PostgreSQL9.5中,对每个连接在共享内存中放了一个ProcArray的结构,每次事务提交或回滚时需要更新这个结构,更新时需要加“ProcArrayLock”的底层锁,当大并发下时,这个锁竞争很激烈,导致新建连接慢以及真实的连接信息不能反映到pg_stat_activity视图中。而PostgreSQL9.6对这个锁进行了优化,官方文档对此有说明,如下所示:
减少ProcArrayLock的争用

要触发这个问题,需要以下几个前提:

这样就有可能触发此问题。而且,在PosgreSQL9.5及更低的版本下,在大量的DML事务和CPU达到100%时,即使停掉了一些连接,只要占用CPU 100%的查询不停掉,还是无法建新连接(报连接数满的错误)。而PostgreSQL9.6、PostgreSQL10、PostgreSQL 11等更高版本,则之前出现过连接数满的情况,但当连接数下来后,即使CPU一直是 100%,仍然可以建新连接。

而再PostgreSQL9.5和更低版本下,当连接数没有满时,但有大量的DML事务和大量占用CPU高的查询(把CPU占满)时,即使连接数没有满,建新连接时也会hang住。

如果要重新此问题,一定是需要并发的DML事务,如果事务数低,则这个问题不能重现,如在我的测试中并发的事务调整到100时,问题不出来,但当调整为200时,问题才出现。

如果要防止这个问题,可以通过升级数据库版本来解决这个问题。如果暂时不能升级数据库版本,当CPU 100%时,可以把占用CPU高的查询给取消掉,也可以避免这个这个问题。

3. 重现此问题

把数据库的最大连接数参数max_connections设置为300。

造一张测试表:

  1. create user u01 password 'u01';
  2. create table test01(id int primary key, t text);
  3. alter table test01 owner to u01;
  4. insert into test01 select seq, seq||'xxxa;ldksljadflfa;kdfsa;lsdkj;alkdsfj;asdklfja;sdfkl;jasdflkajsdf' from generate_series(1, 30000) as seq;

上面的SQL中我们新建了一个u01的用户,然后就在此用户u01下做测试。

3.1 测试的思路

用golang语言写一个压测程序godbtest(此压测程序的源码在后面),开800个并发的更新:

  1. UPDATE test01 set t = 'xxx%dxx' WHERE id=:val

其中val值是一个从0~30000的随机数。

开并发200个耗CPU的查询:

  1. select count(*) from test01;

然后开2000个简单查询把连接数给挤爆:

  1. SELECT * FROM test01 WHERE id=10

3.2 实际操作过程

开始时,数据库中基本没有啥连接:

开始时数据库无压力

CPU也很空闲:
开始时CPU很空闲

运行并发800个更新的命令:

  1. ./godbtest -u 800

如下图:
godbtest启动800个并发更新

另一个窗口运行并发200个耗CPU的select count(*) form test01的命令:

  1. ./godbtest -S 200 -u 0

如下图:

运行200个并发count()查询

这时CPU 100%了,如下图所示:

CPU 100%的图

另一个窗口运行并发2000个简单查询把连接数挤爆:

  1. ./godbtest -s 2000 -t 4

上面命令中“-t 4“是表示建连接之后执行” SELECT * FROM test01 WHERE id=10”,休眠4ms后在重复执行。

如下图所示:
运行并发2000个简单查询把连接数挤爆

统计连接数,发现只有1700多,没有3000个,如下图:

统计连接数,发现只有1700多,没有3000个

而我们的程序不停的一直连接数据库,实际连接肯定是到达了3000,但在pg_stat_activity视图中只能看到1700多个连接,这就说明了pg_stat_activity在这种情况下无法反映实际的连接数。

另开一个窗口,手工psql连接,发现确实是连接不上去了:

手工psql连接发现确实是连接不上去

把连接挤爆简单查询的停下来:

把连接挤爆简单查询的停下来

发现仍然无法连接上去:
发现仍然无法连接上去

这时的连接数已经降到1500多了:
这时的连接数已经降到1500多

只有把耗CPU的select count(*) form test01停下来之后,才可以对数据库建新连接:
把耗CPU的count停下来之后

这时再用psql连接测试,就可以连接上去了:
再用psql连接测试,发现过一会,就可以连接上去

从上图可以看出把占CPU高的查询select count(*) from test01一停下来,就立即可以建新连接了。

4. 附压力测试程序

godbtest的源码如下:

  1. package main
  2. import (
  3. "database/sql"
  4. "flag"
  5. "fmt"
  6. _ "github.com/lib/pq"
  7. "log"
  8. "math/rand"
  9. "os"
  10. "time"
  11. )
  12. var g_db *sql.DB
  13. func connectDB() *sql.DB {
  14. url := "host=192.168.160.22 port=5432 user=u01 password=u01 dbname=postgres sslmode=disable"
  15. db, err := sql.Open("postgres", url)
  16. if err != nil {
  17. log.Println("Connect to db failed:", err)
  18. return nil
  19. } else {
  20. return db
  21. }
  22. }
  23. func runSelectCount(sleepMs int) {
  24. var strSql string
  25. var errMsg string
  26. //var rowNum int64
  27. for {
  28. strSql = "SELECT count(*) FROM test01"
  29. _, err := g_db.Exec(strSql)
  30. if err != nil {
  31. errMsg = "执行SQL(prepare阶段)出错:" + err.Error()
  32. log.Println(errMsg)
  33. //time.Sleep(time.Duration(2)*time.Second)
  34. continue
  35. }
  36. }
  37. }
  38. func runSelect(sleepMs int) {
  39. var strSql string
  40. var errMsg string
  41. //var rowNum int64
  42. for {
  43. strSql = "SELECT * FROM test01 WHERE id=10"
  44. _, err := g_db.Exec(strSql)
  45. if err != nil {
  46. errMsg = "执行SQL(prepare阶段)出错:" + err.Error()
  47. log.Println(errMsg)
  48. //time.Sleep(time.Duration(2)*time.Second)
  49. continue
  50. }
  51. time.Sleep(time.Duration(sleepMs)*time.Millisecond)
  52. }
  53. }
  54. func runUpdate(sleepMs int) {
  55. var strSql string
  56. var errMsg string
  57. //var rowNum int64
  58. rand.Seed(time.Now().Unix())
  59. for {
  60. tx, err := g_db.Begin()
  61. if err != nil {
  62. errMsg = "Begin transaction 出错:" + err.Error()
  63. log.Println(errMsg)
  64. time.Sleep(time.Duration(2)*time.Second)
  65. continue
  66. }
  67. strSql = fmt.Sprintf("UPDATE test01 set t = 'xxx%dxx' WHERE id=%d", rand.Intn(30000), rand.Intn(30000))
  68. stmt, err := g_db.Prepare(strSql)
  69. if err != nil {
  70. tx.Rollback()
  71. errMsg = "执行SQL(prepare阶段)出错:" + err.Error()
  72. log.Println(errMsg)
  73. time.Sleep(time.Duration(2)*time.Second)
  74. continue
  75. }
  76. var res sql.Result
  77. //bind_data := make([]interface{}, 0)
  78. //var i int = 0
  79. //var plh int = 1
  80. //for k, v := range row {
  81. // if i > 0 {
  82. // sqlBuf.WriteString(", ")
  83. // }
  84. // sqlBuf.WriteString(k + "=$" + strconv.Itoa(plh))
  85. // bind_data = append(bind_data, v)
  86. // i++
  87. // plh++
  88. //}
  89. //res, err = stmt.Exec(bind_data...)
  90. res, err = stmt.Exec()
  91. if err != nil {
  92. tx.Rollback()
  93. stmt.Close()
  94. errMsg = "执行SQL出错:" + err.Error()
  95. log.Println(errMsg)
  96. continue
  97. }
  98. //rowNum, err = res.RowsAffected()
  99. _, err = res.RowsAffected()
  100. //log.Printf("RowsAffected=%d\n", rowNum)
  101. stmt.Close()
  102. time.Sleep(time.Duration(sleepMs)*time.Millisecond)
  103. tx.Commit()
  104. }
  105. }
  106. func main() {
  107. var updateCnt int
  108. var selectCnt int
  109. var selectCountCnt int
  110. var sleepMs int
  111. var h bool
  112. flag.BoolVar(&h, "h", false, "查看帮助")
  113. flag.IntVar(&updateCnt,"u",0,"UPDATE的并发连接数,默认0")
  114. flag.IntVar(&selectCnt,"s",0,"SELECT的并发连接数,默认0")
  115. flag.IntVar(&selectCountCnt,"S",0,"SELECT count(*) FROM TEST01 并发连接数,默认0")
  116. flag.IntVar(&sleepMs,"t",100,"sleep的毫秒数,默认100")
  117. flag.Parse()
  118. if h {
  119. flag.Usage()
  120. os.Exit(0)
  121. }
  122. g_db = connectDB()
  123. g_db.SetMaxOpenConns(5000)
  124. g_db.SetMaxIdleConns(1000)
  125. g_db.Ping()
  126. for i:=0; i< selectCnt; i++ {
  127. go runSelect(sleepMs)
  128. }
  129. for i:=0; i< selectCountCnt; i++ {
  130. go runSelectCount(sleepMs)
  131. }
  132. for i:=0; i< updateCnt; i++ {
  133. go runUpdate(sleepMs)
  134. }
  135. for {
  136. time.Sleep(time.Duration(10)*time.Second)
  137. }
  138. }