某天,某个金融客户给我们反馈,查询pg_stat_activity中只有1400个连接,但是数据库配置的最大连接数是3000,但这时数据库的日志中就报不能建新连接的错误。
这个数据库的版本是PostgreSQL9.5。
经过中启乘数的PostgreSQL专家分析,这是当CPU 100%时,在PostgreSQL9.5版本下,真实的连接数无法反映到视图pg_stat_activity中导致。
在PostgreSQL9.5中,对每个连接在共享内存中放了一个ProcArray的结构,每次事务提交或回滚时需要更新这个结构,更新时需要加“ProcArrayLock”的底层锁,当大并发下时,这个锁竞争很激烈,导致新建连接慢以及真实的连接信息不能反映到pg_stat_activity视图中。而PostgreSQL9.6对这个锁进行了优化,官方文档对此有说明,如下所示:
要触发这个问题,需要以下几个前提:
这样就有可能触发此问题。而且,在PosgreSQL9.5及更低的版本下,在大量的DML事务和CPU达到100%时,即使停掉了一些连接,只要占用CPU 100%的查询不停掉,还是无法建新连接(报连接数满的错误)。而PostgreSQL9.6、PostgreSQL10、PostgreSQL 11等更高版本,则之前出现过连接数满的情况,但当连接数下来后,即使CPU一直是 100%,仍然可以建新连接。
而再PostgreSQL9.5和更低版本下,当连接数没有满时,但有大量的DML事务和大量占用CPU高的查询(把CPU占满)时,即使连接数没有满,建新连接时也会hang住。
如果要重新此问题,一定是需要并发的DML事务,如果事务数低,则这个问题不能重现,如在我的测试中并发的事务调整到100时,问题不出来,但当调整为200时,问题才出现。
如果要防止这个问题,可以通过升级数据库版本来解决这个问题。如果暂时不能升级数据库版本,当CPU 100%时,可以把占用CPU高的查询给取消掉,也可以避免这个这个问题。
把数据库的最大连接数参数max_connections设置为300。
造一张测试表:
create user u01 password 'u01';
create table test01(id int primary key, t text);
alter table test01 owner to u01;
insert into test01 select seq, seq||'xxxa;ldksljadflfa;kdfsa;lsdkj;alkdsfj;asdklfja;sdfkl;jasdflkajsdf' from generate_series(1, 30000) as seq;
上面的SQL中我们新建了一个u01的用户,然后就在此用户u01下做测试。
用golang语言写一个压测程序godbtest(此压测程序的源码在后面),开800个并发的更新:
UPDATE test01 set t = 'xxx%dxx' WHERE id=:val
其中val值是一个从0~30000的随机数。
开并发200个耗CPU的查询:
select count(*) from test01;
然后开2000个简单查询把连接数给挤爆:
SELECT * FROM test01 WHERE id=10
开始时,数据库中基本没有啥连接:
CPU也很空闲:
运行并发800个更新的命令:
./godbtest -u 800
如下图:
另一个窗口运行并发200个耗CPU的select count(*) form test01
的命令:
./godbtest -S 200 -u 0
如下图:
这时CPU 100%了,如下图所示:
另一个窗口运行并发2000个简单查询把连接数挤爆:
./godbtest -s 2000 -t 4
上面命令中“-t 4“是表示建连接之后执行” SELECT * FROM test01 WHERE id=10”,休眠4ms后在重复执行。
如下图所示:
统计连接数,发现只有1700多,没有3000个,如下图:
而我们的程序不停的一直连接数据库,实际连接肯定是到达了3000,但在pg_stat_activity视图中只能看到1700多个连接,这就说明了pg_stat_activity在这种情况下无法反映实际的连接数。
另开一个窗口,手工psql连接,发现确实是连接不上去了:
把连接挤爆简单查询的停下来:
发现仍然无法连接上去:
这时的连接数已经降到1500多了:
只有把耗CPU的select count(*) form test01
停下来之后,才可以对数据库建新连接:
这时再用psql连接测试,就可以连接上去了:
从上图可以看出把占CPU高的查询select count(*) from test01
一停下来,就立即可以建新连接了。
godbtest的源码如下:
package main
import (
"database/sql"
"flag"
"fmt"
_ "github.com/lib/pq"
"log"
"math/rand"
"os"
"time"
)
var g_db *sql.DB
func connectDB() *sql.DB {
url := "host=192.168.160.22 port=5432 user=u01 password=u01 dbname=postgres sslmode=disable"
db, err := sql.Open("postgres", url)
if err != nil {
log.Println("Connect to db failed:", err)
return nil
} else {
return db
}
}
func runSelectCount(sleepMs int) {
var strSql string
var errMsg string
//var rowNum int64
for {
strSql = "SELECT count(*) FROM test01"
_, err := g_db.Exec(strSql)
if err != nil {
errMsg = "执行SQL(prepare阶段)出错:" + err.Error()
log.Println(errMsg)
//time.Sleep(time.Duration(2)*time.Second)
continue
}
}
}
func runSelect(sleepMs int) {
var strSql string
var errMsg string
//var rowNum int64
for {
strSql = "SELECT * FROM test01 WHERE id=10"
_, err := g_db.Exec(strSql)
if err != nil {
errMsg = "执行SQL(prepare阶段)出错:" + err.Error()
log.Println(errMsg)
//time.Sleep(time.Duration(2)*time.Second)
continue
}
time.Sleep(time.Duration(sleepMs)*time.Millisecond)
}
}
func runUpdate(sleepMs int) {
var strSql string
var errMsg string
//var rowNum int64
rand.Seed(time.Now().Unix())
for {
tx, err := g_db.Begin()
if err != nil {
errMsg = "Begin transaction 出错:" + err.Error()
log.Println(errMsg)
time.Sleep(time.Duration(2)*time.Second)
continue
}
strSql = fmt.Sprintf("UPDATE test01 set t = 'xxx%dxx' WHERE id=%d", rand.Intn(30000), rand.Intn(30000))
stmt, err := g_db.Prepare(strSql)
if err != nil {
tx.Rollback()
errMsg = "执行SQL(prepare阶段)出错:" + err.Error()
log.Println(errMsg)
time.Sleep(time.Duration(2)*time.Second)
continue
}
var res sql.Result
//bind_data := make([]interface{}, 0)
//var i int = 0
//var plh int = 1
//for k, v := range row {
// if i > 0 {
// sqlBuf.WriteString(", ")
// }
// sqlBuf.WriteString(k + "=$" + strconv.Itoa(plh))
// bind_data = append(bind_data, v)
// i++
// plh++
//}
//res, err = stmt.Exec(bind_data...)
res, err = stmt.Exec()
if err != nil {
tx.Rollback()
stmt.Close()
errMsg = "执行SQL出错:" + err.Error()
log.Println(errMsg)
continue
}
//rowNum, err = res.RowsAffected()
_, err = res.RowsAffected()
//log.Printf("RowsAffected=%d\n", rowNum)
stmt.Close()
time.Sleep(time.Duration(sleepMs)*time.Millisecond)
tx.Commit()
}
}
func main() {
var updateCnt int
var selectCnt int
var selectCountCnt int
var sleepMs int
var h bool
flag.BoolVar(&h, "h", false, "查看帮助")
flag.IntVar(&updateCnt,"u",0,"UPDATE的并发连接数,默认0")
flag.IntVar(&selectCnt,"s",0,"SELECT的并发连接数,默认0")
flag.IntVar(&selectCountCnt,"S",0,"SELECT count(*) FROM TEST01 并发连接数,默认0")
flag.IntVar(&sleepMs,"t",100,"sleep的毫秒数,默认100")
flag.Parse()
if h {
flag.Usage()
os.Exit(0)
}
g_db = connectDB()
g_db.SetMaxOpenConns(5000)
g_db.SetMaxIdleConns(1000)
g_db.Ping()
for i:=0; i< selectCnt; i++ {
go runSelect(sleepMs)
}
for i:=0; i< selectCountCnt; i++ {
go runSelectCount(sleepMs)
}
for i:=0; i< updateCnt; i++ {
go runUpdate(sleepMs)
}
for {
time.Sleep(time.Duration(10)*time.Second)
}
}