vlambda博客
学习文章列表

go-redis 连接池实现思想

本着 ”拿来“ 主义,写了这篇文章,主要是用来做记录日常工作,只是拿来别人的思想,提高自己处理问题的能力,和扩展自己解决问题和搬砖的思路,也希望可以给别人带来一些思路。




为什么会有redis连接池的概念


    在传统的C/S模型中,客户端和服务端需要通信的话就要建立连接,在这里每次连接都需要花费不少的资源,例如常说的TCP三次握手等。随着网络和互联网应用越来越庞大和集中,单个请求对应单个链接的模式已经满足不了高并发的场景了,所以连接池的概念就出现了,基本上网络相关的连接池都是在客户端这边维护。

    连接池则可以实现在客户端建立多个链接并且不释放,当需要使用链接的时候通过一定的算法获取已经建立的链接,使用完了以后则还给连接池,这就免去了TCP建立连接所带来的系统调用所消耗的时间。上面都是瞎BB,不想看的直接跳到下面看正文。


go-redis 连接池设计思路


    go-redis也提供了三种对应服务端的客户端模式,集群,哨兵,和单机模式, 这里我们主要从连接池方面了解下,所以三种模式在连接池这一块都是公用的。

    先列一下基本的代码

https://github.com/go-redis/redis/blob/master/internal/pool/pool.go

type Options struct { //建立连接函数 Dialer func() (net.Conn, error) //在链接关闭的时候的回调 OnClose func(*Conn) error //连接池中的链接的最大数量 PoolSize int //最小的空闲链接的数量 MinIdleConns int //单个链接的最大生命周期 MaxConnAge time.Duration //从连接池获取链接的超时时间 PoolTimeout time.Duration //链接空闲的超时时间 IdleTimeout time.Duration //检查过期的链接的周期频率 IdleCheckFrequency time.Duration} 
// 包装net.conn的type Conn struct { netConn net.Conn
rd *proto.Reader rdLocked bool wr *proto.Writer //该链接是否初始化,比如如果需要执行命令之前需要执行的auth,select db 等的标识,代表已经auth,select过 Inited bool // 是否放进连接池中 pooled bool // 创建的时间,超过maxconnage的链接需要淘汰 createdAt time.Time // 该链接执行命令的时候所记录的时间,就是上次用过这个链接的时间点 usedAt atomic.Value} // 连接池的接口type Pooler interface { NewConn() (*Conn, error) CloseConn(*Conn) error
Get() (*Conn, error) Put(*Conn) Remove(*Conn, error)
Len() int IdleLen() int Stats() *Stats
Close() error} // 实现上面Pooler接口的实体连接池type ConnPool struct { opt *Options
dialErrorsNum uint32 // atomic
lastDialErrorMu sync.RWMutex lastDialError error // 一个带容量(poolsize)的阻塞队列, queue chan struct{} // 锁 connsMu sync.Mutex // 连接池中所有的链接 conns []*Conn // 连接池中空闲的链接 idleConns []*Conn // 池容量,如果没有可用的话要等待 poolSize int // 连接池空闲链接的数量 idleConnsLen int // 连接池的状态信息,业务可以获取到作为告警等用处 stats Stats // 连接池是否关闭 _closed uint32 // atomic}

 

 一般的网络应用包中,Client包封装都是将 connpool直接封在Client结构里,用的时候直接GET,用完PUT

下面我们从两个方面去看下go-redis对连接池的思路


1. 一个是服务启动连接池初始化


// NewClient returns a client to the Redis Server specified by Options.func NewClient(opt *Options) *Client { opt.init()
c := Client{ baseClient: baseClient{ opt: opt, connPool: newConnPool(opt), }, } c.baseClient.init() c.init()
return &c}
func newConnPool(opt *Options) *pool.ConnPool { return pool.NewConnPool(&pool.Options{ Dialer: opt.Dialer, PoolSize: opt.PoolSize, MinIdleConns: opt.MinIdleConns, MaxConnAge: opt.MaxConnAge, PoolTimeout: opt.PoolTimeout, IdleTimeout: opt.IdleTimeout, IdleCheckFrequency: opt.IdleCheckFrequency, })}// 服务启动的时候,redis客户端会在这里执行连接池的初始化func NewConnPool(opt *Options) *ConnPool { p := &ConnPool{ opt: opt, // client获取链接之前,会向这个chan写入数据,如果能够写入说明不用等待就可以获取链接,否则需要等待其他地方从这个chan取走数据才可以获取链接,假如获取链接成功的话,在用完的时候,要向这个chan取出一个struct{}不然的话就会让别人一直阻塞,go-redis用这种方法保证池中的链接数量不会超过poolsize queue: make(chan struct{}, opt.PoolSize), conns: make([]*Conn, 0, opt.PoolSize), idleConns: make([]*Conn, 0, opt.PoolSize), }
for i := 0; i < opt.MinIdleConns; i++ { // 按照配置的最小空闲链接数,往池中add MinIdleConns个链接 // 开启协程addIdleConn初始化链接的 p.checkMinIdleConns() }
if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 { // 修复链接池,无限循环判断链接是否过期,过期的链接清理掉 // 开启协程后台循环执行 go p.reaper(opt.IdleCheckFrequency) }
return p} // 往池中加新链接func (p *ConnPool) addIdleConn() { cn, err := p.newConn(true) if err != nil { return } // conns idleConns 是共享资源,需要加锁控制 p.connsMu.Lock() p.conns = append(p.conns, cn) p.idleConns = append(p.idleConns, cn) p.connsMu.Unlock()}
func NewConn(netConn net.Conn) *Conn { cn := &Conn{ netConn: netConn, // 链接的出生时间点 createdAt: time.Now(), } cn.rd = proto.NewReader(netConn) cn.wr = proto.NewWriter(netConn) // 链接上次使用的时间点 cn.SetUsedAt(time.Now()) return cn}// 修复链接池,无限循环判断链接是否过期,过期的链接清理掉func (p *ConnPool) reaper(frequency time.Duration) { ticker := time.NewTicker(frequency) defer ticker.Stop()
for range ticker.C { if p.closed() { break } n, err := p.ReapStaleConns() if err != nil { internal.Logf("ReapStaleConns failed: %s", err) continue } atomic.AddUint32(&p.stats.StaleConns, uint32(n)) }}// 无限循环判断链接是否过期func (p *ConnPool) ReapStaleConns() (int, error) { var n int for { // 需要向queue chan写进数据才能往下执行,否则就会阻塞,等queue有容量 p.getTurn()
p.connsMu.Lock() cn := p.reapStaleConn() p.connsMu.Unlock()
if cn != nil { p.removeConn(cn) } // 用完之后,就要从queue chan读取出你放进去的数据,让queue有容量写入 p.freeTurn()
if cn != nil { p.closeConn(cn) n++ } else { break } } return n, nil}// 每次总idleConns的切片头部取出一个来判断是否过期,如果过期的话,更新idleConns,并且关闭过期链接func (p *ConnPool) reapStaleConn() *Conn { if len(p.idleConns) == 0 { return nil }
cn := p.idleConns[0] if !p.isStaleConn(cn) { return nil }
p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...) p.idleConnsLen--
return cn}
// 根据链接的出生时间点和上次使用的时间点,判断该链接是否过期func (p *ConnPool) isStaleConn(cn *Conn) bool { if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 { return false }
now := time.Now() if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout { return true } if p.opt.MaxConnAge > 0 && now.Sub(cn.createdAt) >= p.opt.MaxConnAge { return true }
return false}
 

 至此go-redis连接池的初始化就完成了,主要工作有几个

1. 初始化配置的空闲链接数

2. 单独开个协程定时循环检查空闲链接池中的链接是否过期


2. 一个是redis-client执行命令之前获取链接和用完之后归还链接


func (c *baseClient) _getConn() (*pool.Conn, error) { cn, err := c.connPool.Get() if err != nil { return nil, err } //这里主要工作是当配置配了密码和DB的时候,这个链接之前命令之前要执行auth和select db命令 err = c.initConn(cn) if err != nil { c.connPool.Remove(cn, err) if err := internal.Unwrap(err); err != nil { return nil, err } return nil, err }
return cn, nil}
// Get returns existed connection from the pool or creates a new one.func (p *ConnPool) Get() (*Conn, error) { if p.closed() { return nil, ErrClosed } // 如果能够写入说明不用等待就可以获取链接,否则需要等待其他地方从这个chan取走数据才可以获取链接,假如获取链接成功的话,在用完的时候,要向这个chan取出一个struct{}不然的话就会让别人一直阻塞(如果在pooltimeout时间内没有等待到,就会超时返回),go-redis用这种方法保证池中的链接数量不会超过poolsize err := p.waitTurn() if err != nil { return nil, err }
for { // 共享资源,操作要加锁 p.connsMu.Lock() cn := p.popIdle() p.connsMu.Unlock()
if cn == nil { break } // 判断从空闲链接切片中拿出来的链接是否过期,兜底 if p.isStaleConn(cn) { _ = p.CloseConn(cn) continue } // 命中统计 atomic.AddUint32(&p.stats.Hits, 1) return cn, nil } // 如果没有空闲链接的话,就重新拨号建一个 atomic.AddUint32(&p.stats.Misses, 1)
newcn, err := p._NewConn(true) if err != nil { // 获取链接后,要释放掉开始往queue队列里面放的数据 p.freeTurn() return nil, err }
return newcn, nil}// queue这里的功能固定数量的令牌桶(获取conn链接的令牌),用之前拿,用完之后放回,不会增加令牌数量也不会减少。func (p *ConnPool) getTurn() { p.queue <- struct{}{}}// 等候获取queue中的令牌func (p *ConnPool) waitTurn() error { select { case p.queue <- struct{}{}: return nil default: timer := timers.Get().(*time.Timer) timer.Reset(p.opt.PoolTimeout)
select { case p.queue <- struct{}{}: if !timer.Stop() { <-timer.C } timers.Put(timer) return nil case <-timer.C: timers.Put(timer) atomic.AddUint32(&p.stats.Timeouts, 1) return ErrPoolTimeout } }}// 放回令牌func (p *ConnPool) freeTurn() { <-p.queue}
// 链接用完之后(获取服务端响应后),要放回Pool中,最后放回令牌// 一般链接用完之后都是放回空闲链接切片里func (p *ConnPool) Put(cn *Conn) { if !cn.pooled { p.Remove(cn, nil) return }
p.connsMu.Lock() p.idleConns = append(p.idleConns, cn) p.idleConnsLen++ p.connsMu.Unlock() p.freeTurn()}

 至此go-redis的client执行命令的时候,主要做了下面3个工作

1. 从连接池中获取链接

2. 构造命令协议,往conn链接中写入命令协议

3. 获取redis服务端响应后,把链接放回连接池


go-redis 优秀的方面


1. 针对单个链接有最大的生存时间概念

2. 针对单个链接有距离上次活跃的时间,最大的空闲时间概念

3. 单独有协程会清理上面2种过期的链接,这样可以让pool链接都保持最新( go-mysql中的连接池还没有这个功能,mysql服务端那边会把超过一定空闲的连接断掉,这也算是go-redis的一个比较好的有点 )

4. queue chan conn,用这样一个固定容量的信道,可以把它比作一个固定容量的令牌桶,用来限制控制池中最大的链接数量

5.  go-redis把字符串转成[]byte,用的指针转换的方式,减少不必要的对象资源的分配减少GC的数量。

// BytesToString converts byte slice to string.func BytesToString(b []byte) string { return *(*string)(unsafe.Pointer(&b))}
// StringToBytes converts string to byte slice.func StringToBytes(s string) []byte { return *(*[]byte)(unsafe.Pointer( &struct { string Cap int }{s, len(s)}, ))}

 

go-redis 不足的地方


本着优点也是缺点的论点,我还是提个不足的点

虽然清理那些超过生命周期的链接是好的方面,但是还是可以从这里在优化一下,例如虽然这个链接超过了业务自定义的生存的生命周期,但是如果这个链接本身还是有效的,这样就不是浪费一个有效的链接,而且有必要的话还要创建一个新的链接,这样的话是不是有些浪费系统资源

单纯从不释放掉有效的链接方面的解决方案

对每个链接都定时去ping一下服务端,记录下记录,剔除掉已经断开的链接。(grpc维护的连接池是这样实现的)


至此,文章到这里也结束了,如有说的不对的地方,希望大家指出和交流。