vlambda博客
学习文章列表

Golang 连接池的几种实现案例

因为TCP的三次握手等等原因,建立一个连接是一件成本比较高的行为。所以在一个需要多次与特定实体交互的程序中,就需要维持一个连接池,里面有可以复用的连接可供重复使用。

而维持一个连接池,最基本的要求就是要做到:thread safe(线程安全),尤其是在Golang这种特性是goroutine的语言中。

实现简单的连接池

 
   
   
 
  1. type Pool struct {

  2. m sync.Mutex //保证多个goroutine访问时候,closed的线程安全

  3. res chan io.Closer //连接存储的chan

  4. factory func() (io.Closer,error) //新建连接的工厂方法

  5. closed bool //连接池关闭标志

  6. }

这个简单的连接池,我们利用chan来存储池里的连接。而新建结构体的方法也比较简单:

 
   
   
 
  1. func New(fn func() (io.Closer, error), size uint) (*Pool, error) {

  2. if size <= 0 {

  3. return nil, errors.New("size的值太小了。")

  4. }

  5. return &Pool{

  6. factory: fn,

  7. res: make(chan io.Closer, size),

  8. }, nil

  9. }

只需要提供对应的工厂函数和连接池的大小就可以了。

获取连接

那么我们要怎么从中获取资源呢?因为我们内部存储连接的结构是chan,所以只需要简单的select就可以保证线程安全:

 
   
   
 
  1. //从资源池里获取一个资源

  2. func (p *Pool) Acquire() (io.Closer,error) {

  3. select {

  4. case r,ok := <-p.res:

  5. log.Println("Acquire:共享资源")

  6. if !ok {

  7. return nil,ErrPoolClosed

  8. }

  9. return r,nil

  10. default:

  11. log.Println("Acquire:新生成资源")

  12. return p.factory()

  13. }

  14. }

我们先从连接池的res这个chan里面获取,如果没有的话我们就利用我们早已经准备好的工厂函数进行构造连接。同时我们在从res获取连接的时候利用ok先确定了这个连接池是否已经关闭。如果已经关闭的话我们就返回早已经准备好的连接已关闭错误。

关闭连接池

那么既然提到关闭连接池,我们是怎么样关闭连接池的呢?

 
   
   
 
  1. //关闭资源池,释放资源

  2. func (p *Pool) Close() {

  3. p.m.Lock()

  4. defer p.m.Unlock()


  5. if p.closed {

  6. return

  7. }


  8. p.closed = true


  9. //关闭通道,不让写入了

  10. close(p.res)


  11. //关闭通道里的资源

  12. for r:=range p.res {

  13. r.Close()

  14. }

  15. }

这边我们需要先进行p.m.Lock()*上锁操作,这么做是因为我们需要对结构体里面的*closed进行读写。需要先把这个标志位设定后,关闭res这个chan,使得Acquire方法无法再获取新的连接。我们再对res这个chan里面的连接进行Close操作。

释放连接

释放连接首先得有个前提,就是连接池还没有关闭。如果连接池已经关闭再往res里面送连接的话就好触发panic。

 
   
   
 
  1. func (p *Pool) Release(r io.Closer){

  2. //保证该操作和Close方法的操作是安全的

  3. p.m.Lock()

  4. defer p.m.Unlock()


  5. //资源池都关闭了,就省这一个没有释放的资源了,释放即可

  6. if p.closed {

  7. r.Close()

  8. return

  9. }


  10. select {

  11. case p.res <- r:

  12. log.Println("资源释放到池子里了")

  13. default:

  14. log.Println("资源池满了,释放这个资源吧")

  15. r.Close()

  16. }

  17. }

以上就是一个简单且线程安全的连接池实现方式了。我们可以看到的是,现在连接池虽然已经实现了,但是还有几个小缺点:

  1. 我们对连接最大的数量没有限制,如果线程池空的话都我们默认就直接新建一个连接返回了。一旦并发量高的话将会不断新建连接,很容易(尤其是MySQL)造成 too many connections的报错发生。

  2. 既然我们需要保证最大可获取连接数量,那么我们就不希望数量定的太死。希望空闲的时候可以维护一定的空闲连接数量idleNum,但是又希望我们能限制最大可获取连接数量maxNum。

  3. 第一种情况是并发过多的情况,那么如果并发量过少呢?现在我们在新建一个连接并且归还后,我们很长一段时间不再使用这个连接。那么这个连接很有可能在几个小时甚至更长时间之前就已经建立的了。长时间闲置的连接我们并没有办法保证它的可用性。便有可能我们下次获取的连接是已经失效的连接。

那么我们可以从已经成熟使用的MySQL连接池库和Redis连接池库中看看,它们是怎么解决这些问题的。

Golang标准库的Sql连接池

Golang的连接池实现在标准库 database/sql/sql.go下。当我们运行:

 
   
   
 
  1. db, err := sql.Open("mysql", "xxxx")

的时候,就会打开一个连接池。我们可以看看返回的db的结构体:

 
   
   
 
  1. type DB struct {

  2. waitDuration int64 // Total time waited for new connections.

  3. mu sync.Mutex // protects following fields

  4. freeConn []*driverConn

  5. connRequests map[uint64]chan connRequest

  6. nextRequest uint64 // Next key to use in connRequests.

  7. numOpen int // number of opened and pending open connections


  8. // Used to signal the need for new connections

  9. // a goroutine running connectionOpener() reads on this chan and

  10. // maybeOpenNewConnections sends on the chan (one send per needed connection)

  11. // It is closed during db.Close(). The close tells the connectionOpener

  12. // goroutine to exit.

  13. openerCh chan struct{}

  14. closed bool

  15. maxIdle int // zero means defaultMaxIdleConns; negative means 0

  16. maxOpen int // <= 0 means unlimited

  17. maxLifetime time.Duration // maximum amount of time a connection may be reused

  18. cleanerCh chan struct{}

  19. waitCount int64 // Total number of connections waited for.

  20. maxIdleClosed int64 // Total number of connections closed due to idle.

  21. maxLifetimeClosed int64 // Total number of connections closed due to max free limit.

  22. }

上面省去了一些暂时不需要关注的field。我们可以看的,DB这个连接池内部存储连接的结构freeConn,并不是我们之前使用的chan,而是[]*driverConn,一个连接切片。同时我们还可以看到,里面有maxIdle等相关变量来控制空闲连接数量。值得注意的是,DB的初始化函数Open函数并没有新建数据库连接。而新建连接在哪个函数呢?我们可以在Query方法一路往回找,我们可以看到这个函数: func(db*DB)conn(ctx context.Context,strategy connReuseStrategy)(*driverConn,error)。而我们从连接池获取连接的方法,就从这里开始:

获取连接

 
   
   
 
  1. // conn returns a newly-opened or cached *driverConn.

  2. func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {

  3. // 先判断db是否已经关闭。

  4. db.mu.Lock()

  5. if db.closed {

  6. db.mu.Unlock()

  7. return nil, errDBClosed

  8. }

  9. // 注意检测context是否已经被超时等原因被取消。

  10. select {

  11. default:

  12. case <-ctx.Done():

  13. db.mu.Unlock()

  14. return nil, ctx.Err()

  15. }

  16. lifetime := db.maxLifetime


  17. // 这边如果在freeConn这个切片有空闲连接的话,就left pop一个出列。注意的是,这边因为是切片操作,所以需要前面需要加锁且获取后进行解锁操作。同时判断返回的连接是否已经过期。

  18. numFree := len(db.freeConn)

  19. if strategy == cachedOrNewConn && numFree > 0 {

  20. conn := db.freeConn[0]

  21. copy(db.freeConn, db.freeConn[1:])

  22. db.freeConn = db.freeConn[:numFree-1]

  23. conn.inUse = true

  24. db.mu.Unlock()

  25. if conn.expired(lifetime) {

  26. conn.Close()

  27. return nil, driver.ErrBadConn

  28. }

  29. // Lock around reading lastErr to ensure the session resetter finished.

  30. conn.Lock()

  31. err := conn.lastErr

  32. conn.Unlock()

  33. if err == driver.ErrBadConn {

  34. conn.Close()

  35. return nil, driver.ErrBadConn

  36. }

  37. return conn, nil

  38. }


  39. // 这边就是等候获取连接的重点了。当空闲的连接为空的时候,这边将会新建一个request(的等待连接 的请求)并且开始等待

  40. if db.maxOpen > 0 && db.numOpen >= db.maxOpen {

  41. // 下面的动作相当于往connRequests这个map插入自己的号码牌。

  42. // 插入号码牌之后这边就不需要阻塞等待继续往下走逻辑。

  43. req := make(chan connRequest, 1)

  44. reqKey := db.nextRequestKeyLocked()

  45. db.connRequests[reqKey] = req

  46. db.waitCount++

  47. db.mu.Unlock()


  48. waitStart := time.Now()


  49. // Timeout the connection request with the context.

  50. select {

  51. case <-ctx.Done():

  52. // context取消操作的时候,记得从connRequests这个map取走自己的号码牌。

  53. db.mu.Lock()

  54. delete(db.connRequests, reqKey)

  55. db.mu.Unlock()


  56. atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))


  57. select {

  58. default:

  59. case ret, ok := <-req:

  60. // 这边值得注意了,因为现在已经被context取消了。但是刚刚放了自己的号码牌进去排队里面。意思是说不定已经发了连接了,所以得注意归还!

  61. if ok && ret.conn != nil {

  62. db.putConn(ret.conn, ret.err, false)

  63. }

  64. }

  65. return nil, ctx.Err()

  66. case ret, ok := <-req:

  67. // 下面是已经获得连接后的操作了。检测一下获得连接的状况。因为有可能已经过期了等等。

  68. atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))


  69. if !ok {

  70. return nil, errDBClosed

  71. }

  72. if ret.err == nil && ret.conn.expired(lifetime) {

  73. ret.conn.Close()

  74. return nil, driver.ErrBadConn

  75. }

  76. if ret.conn == nil {

  77. return nil, ret.err

  78. }

  79. ret.conn.Lock()

  80. err := ret.conn.lastErr

  81. ret.conn.Unlock()

  82. if err == driver.ErrBadConn {

  83. ret.conn.Close()

  84. return nil, driver.ErrBadConn

  85. }

  86. return ret.conn, ret.err

  87. }

  88. }

  89. // 下面就是如果上面说的限制情况不存在,可以创建先连接时候,要做的创建连接操作了。

  90. db.numOpen++ // optimistically

  91. db.mu.Unlock()

  92. ci, err := db.connector.Connect(ctx)

  93. if err != nil {

  94. db.mu.Lock()

  95. db.numOpen-- // correct for earlier optimism

  96. db.maybeOpenNewConnections()

  97. db.mu.Unlock()

  98. return nil, err

  99. }

  100. db.mu.Lock()

  101. dc := &driverConn{

  102. db: db,

  103. createdAt: nowFunc(),

  104. ci: ci,

  105. inUse: true,

  106. }

  107. db.addDepLocked(dc, dc)

  108. db.mu.Unlock()

  109. return dc, nil

  110. }

  111. 复制代码

简单来说,DB结构体除了用的是slice来存储连接,还加了一个类似排队机制的connRequests来解决获取等待连接的过程。同时在判断连接健康性都有很好的兼顾。那么既然有了排队机制,归还连接的时候是怎么做的呢?

释放连接

我们可以直接找到 func(db*DB)putConnDBLocked(dc*driverConn,err error)bool这个方法。就像注释说的,这个方法主要的目的是:

Satisfy a connRequest or put the driverConn in the idle pool and return true or return false.

我们主要来看看里面重点那几行:

 
   
   
 
  1. ...

  2. // 如果已经超过最大打开数量了,就不需要在回归pool了

  3. if db.maxOpen > 0 && db.numOpen > db.maxOpen {

  4. return false

  5. }

  6. // 这边是重点了,基本来说就是从connRequest这个map里面随机抽一个在排队等着的请求。取出来后发给他。就不用归还池子了。

  7. if c := len(db.connRequests); c > 0 {

  8. var req chan connRequest

  9. var reqKey uint64

  10. for reqKey, req = range db.connRequests {

  11. break

  12. }

  13. delete(db.connRequests, reqKey) // 删除这个在排队的请求。

  14. if err == nil {

  15. dc.inUse = true

  16. }

  17. // 把连接给这个正在排队的连接。

  18. req <- connRequest{

  19. conn: dc,

  20. err: err,

  21. }

  22. return true

  23. } else if err == nil && !db.closed {

  24. // 既然没人排队,就看看到了最大连接数目没有。没到就归还给freeConn。

  25. if db.maxIdleConnsLocked() > len(db.freeConn) {

  26. db.freeConn = append(db.freeConn, dc)

  27. db.startCleanerLocked()

  28. return true

  29. }

  30. db.maxIdleClosed++

  31. }

  32. ...

我们可以看到,当归还连接时候,如果有在排队轮候的请求就不归还给池子直接发给在轮候的人了。

现在基本就解决前面说的小问题了。不会出现连接太多导致无法控制too many connections的情况。也很好了维持了连接池的最小数量。同时也做了相关对于连接健康性的检查操作。

值得注意的是,作为标准库的代码,相关注释和代码都非常完美,真的可以看的神清气爽。

redis Golang实现的Redis客户端

而它的连接池结构如下

 
   
   
 
  1. type ConnPool struct {

  2. ...

  3. queue chan struct{}


  4. connsMu sync.Mutex

  5. conns []*Conn

  6. idleConns []*Conn

  7. poolSize int

  8. idleConnsLen int


  9. stats Stats


  10. _closed uint32 // atomic

  11. closedCh chan struct{}

  12. }

我们可以看到里面存储连接的结构还是slice。但是我们可以重点看看 queueconnsidleConns这几个变量,后面会提及到。但是值得注意的是!我们可以看到,这里有两个[]*Conn结构:connsidleConns,那么问题来了:

到底连接存在哪里?

新建连接池连接

我们先从新建连接池连接开始看:

 
   
   
 
  1. func NewConnPool(opt *Options) *ConnPool {

  2. ....

  3. p.checkMinIdleConns()


  4. if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {

  5. go p.reaper(opt.IdleCheckFrequency)

  6. }

  7. ....

  8. }

初始化连接池的函数有个和前面两个不同的地方。

  1. checkMinIdleConns方法,在连接池初始化的时候就会往连接池填满空闲的连接。

  2. go p.reaper(opt.IdleCheckFrequency)则会在初始化连接池的时候就会起一个go程,周期性的淘汰连接池里面要被淘汰的连接。

获取连接

 
   
   
 
  1. func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {

  2. if p.closed() {

  3. return nil, ErrClosed

  4. }


  5. //这边和前面sql获取连接函数的流程不同。sql是先看看连接池有没有空闲连接,有的话先获取不到再排队。这边是直接先排队获取令牌,排队函数后面会分析。

  6. err := p.waitTurn(ctx)

  7. if err != nil {

  8. return nil, err

  9. }

  10. //前面没出error的话,就已经排队轮候到了。接下来就是获取的流程。

  11. for {

  12. p.connsMu.Lock()

  13. //从空闲连接里面先获取一个空闲连接。

  14. cn := p.popIdle()

  15. p.connsMu.Unlock()


  16. if cn == nil {

  17. // 没有空闲连接时候直接跳出循环。

  18. break

  19. }

  20. // 判断是否已经过时,是的话close掉了然后继续取出。

  21. if p.isStaleConn(cn) {

  22. _ = p.CloseConn(cn)

  23. continue

  24. }


  25. atomic.AddUint32(&p.stats.Hits, 1)

  26. return cn, nil

  27. }


  28. atomic.AddUint32(&p.stats.Misses, 1)


  29. // 如果没有空闲连接的话,这边就直接新建连接了。

  30. newcn, err := p.newConn(ctx, true)

  31. if err != nil {

  32. // 归还令牌。

  33. p.freeTurn()

  34. return nil, err

  35. }


  36. return newcn, nil

  37. }

我们可以试着回答开头那个问题:连接到底存在哪里?答案是从 cn:=p.popIdle()这句话可以看出,获取连接这个动作,是从 idleConns里面获取的,而里面的函数也证明了这一点。同时我的理解是:

  1. sql的排队意味着我对连接池申请连接后,把自己的编号告诉连接池。连接那边一看到有空闲了,就叫我的号。我答应了一声,然后连接池就直接给个连接给我。我如果不归还,连接池就一直不叫下一个号。

  2. redis这边的意思是,我去和连接池申请的不是连接而是令牌。我就一直排队等着,连接池给我令牌了,我才去仓库里面找空闲连接或者自己新建一个连接。用完了连接除了归还连接外,还得归还令牌。当然了,如果我自己新建连接出错了,我哪怕拿不到连接回家,我也得把令牌给回连接池,不然连接池的令牌数少了,最大连接数也会变小。

而:

 
   
   
 
  1. func (p *ConnPool) freeTurn() {

  2. <-p.queue

  3. }

  4. func (p *ConnPool) waitTurn(ctx context.Context) error {

  5. ...

  6. case p.queue <- struct{}{}:

  7. return nil

  8. ...

  9. }

就是在靠queue这个chan来维持令牌数量。

那么 conns的作用是什么呢?我们可以来看看新建连接这个函数:

新建连接

 
   
   
 
  1. func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {

  2. cn, err := p.dialConn(ctx, pooled)

  3. if err != nil {

  4. return nil, err

  5. }


  6. p.connsMu.Lock()

  7. p.conns = append(p.conns, cn)

  8. if pooled {

  9. // 如果连接池满了,会在后面移除。

  10. if p.poolSize >= p.opt.PoolSize {

  11. cn.pooled = false

  12. } else {

  13. p.poolSize++

  14. }

  15. }

  16. p.connsMu.Unlock()

  17. return cn, nil

  18. }

基本逻辑出来了。就是如果新建连接的话,我并不会直接放在 idleConns里面,而是先放 conns里面。同时先看池子满了没有。满的话后面归还的时候会标记,后面会删除。那么这个后面会删除,指的是什么时候呢?那就是下面说的归还连接的时候了。

归还连接

 
   
   
 
  1. func (p *ConnPool) Put(cn *Conn) {

  2. if cn.rd.Buffered() > 0 {

  3. internal.Logger.Printf("Conn has unread data")

  4. p.Remove(cn, BadConnError{})

  5. return

  6. }

  7. //这就是我们刚刚说的后面了,前面标记过不要入池的,这边就删除了。当然了,里面也会进行freeTurn操作。

  8. if !cn.pooled {

  9. p.Remove(cn, nil)

  10. return

  11. }


  12. p.connsMu.Lock()

  13. p.idleConns = append(p.idleConns, cn)

  14. p.idleConnsLen++

  15. p.connsMu.Unlock()

  16. //我们可以看到很明显的这个归还号码牌的动作。

  17. p.freeTurn()

  18. }

其实归还的过程,就是从 conns转移到 idleConns的过程。当然了,如果新建这个连接时候发现已经 超卖了,后面归还时候就不转移,直接删除了。

等等,上面的逻辑似乎有点不对?我们来理一下获取连接流程:

  1. 先 waitTurn,拿到令牌。而令牌数量是根据pool里面的 queue决定的。

  2. 拿到令牌了,去库房 idleConns里面拿空闲的连接。没有的话就自己 newConn一个,并且把他记录到 conns里面。

  3. 用完了,就调用 put归还:也就是从 conns转移到 idleConns。归还的时候就检查在 newConn时候是不是已经做了超卖标记了。是的话就不转移到 idleConns

我当时疑惑了好久,既然始终都需要获得令牌才能得到连接,令牌数量是定的。为什么还会超卖呢?翻了一下源码,我的答案是:

虽然 Get方法获取连接是 newConn这个私用方法,受到令牌管制导致不会出现超卖。但是这个方法接受传参:pooledbool。所以我猜是担心其他人调用这个方法时候,不管三七二十一就传了true,导致poolSize越来越大。

总的来说,redis这个连接池的连接数控制,还是在 queue这个我称为令牌的chan进行操作。

总结

上面可以看到,连接池的最基本的保证,就是获取连接时候的线程安全。但是在实现诸多额外特性时候却又从不同角度来实现。还是非常有意思的。但是不管存储结构是用chan还是还是slice,都可以很好的实现这一点。如果像sql或者redis那样用slice来存储连接,就得维护一个结构来表示排队等候的效果。

首发于掘金,点击阅读原文直达


推荐阅读




喜欢本文的朋友,欢迎关注“Go语言中文网