go-redis 连接池实现思想
本着 ”拿来“ 主义,写了这篇文章,主要是用来做记录日常工作,只是拿来别人的思想,提高自己处理问题的能力,和扩展自己解决问题和搬砖的思路,也希望可以给别人带来一些思路。
为什么会有redis连接池的概念
在传统的C/S模型中,客户端和服务端需要通信的话就要建立连接,在这里每次连接都需要花费不少的资源,例如常说的TCP三次握手等。随着网络和互联网应用越来越庞大和集中,单个请求对应单个链接的模式已经满足不了高并发的场景了,所以连接池的概念就出现了,基本上网络相关的连接池都是在客户端这边维护。
连接池则可以实现在客户端建立多个链接并且不释放,当需要使用链接的时候通过一定的算法获取已经建立的链接,使用完了以后则还给连接池,这就免去了TCP建立连接所带来的系统调用所消耗的时间。上面都是瞎BB,不想看的直接跳到下面看正文。
go-redis 连接池设计思路
go-redis也提供了三种对应服务端的客户端模式,集群,哨兵,和单机模式, 这里我们主要从连接池方面了解下,所以三种模式在连接池这一块都是公用的。
先列一下基本的代码
https://github.com/go-redis/redis/blob/master/internal/pool/pool.go
type Options struct {
//建立连接函数
Dialer func() (net.Conn, error)
//在链接关闭的时候的回调
OnClose func(*Conn) error
//连接池中的链接的最大数量
PoolSize int
//最小的空闲链接的数量
MinIdleConns int
//单个链接的最大生命周期
MaxConnAge time.Duration
//从连接池获取链接的超时时间
PoolTimeout time.Duration
//链接空闲的超时时间
IdleTimeout time.Duration
//检查过期的链接的周期频率
IdleCheckFrequency time.Duration
}
// 包装net.conn的
type Conn struct {
netConn net.Conn
rd *proto.Reader
rdLocked bool
wr *proto.Writer
//该链接是否初始化,比如如果需要执行命令之前需要执行的auth,select db 等的标识,代表已经auth,select过
Inited bool
// 是否放进连接池中
pooled bool
// 创建的时间,超过maxconnage的链接需要淘汰
createdAt time.Time
// 该链接执行命令的时候所记录的时间,就是上次用过这个链接的时间点
usedAt atomic.Value
}
// 连接池的接口
type Pooler interface {
NewConn() (*Conn, error)
CloseConn(*Conn) error
Get() (*Conn, error)
Put(*Conn)
Remove(*Conn, error)
Len() int
IdleLen() int
Stats() *Stats
Close() error
}
// 实现上面Pooler接口的实体连接池
type ConnPool struct {
opt *Options
dialErrorsNum uint32 // atomic
lastDialErrorMu sync.RWMutex
lastDialError error
// 一个带容量(poolsize)的阻塞队列,
queue chan struct{}
// 锁
connsMu sync.Mutex
// 连接池中所有的链接
conns []*Conn
// 连接池中空闲的链接
idleConns []*Conn
// 池容量,如果没有可用的话要等待
poolSize int
// 连接池空闲链接的数量
idleConnsLen int
// 连接池的状态信息,业务可以获取到作为告警等用处
stats Stats
// 连接池是否关闭
_closed uint32 // atomic
}
一般的网络应用包中,Client包封装都是将 connpool直接封在Client结构里,用的时候直接GET,用完PUT
下面我们从两个方面去看下go-redis对连接池的思路
1. 一个是服务启动连接池初始化
// NewClient returns a client to the Redis Server specified by Options.
func NewClient(opt *Options) *Client {
opt.init()
c := Client{
baseClient: baseClient{
opt: opt,
connPool: newConnPool(opt),
},
}
c.baseClient.init()
c.init()
return &c
}
func newConnPool(opt *Options) *pool.ConnPool {
return pool.NewConnPool(&pool.Options{
Dialer: opt.Dialer,
PoolSize: opt.PoolSize,
MinIdleConns: opt.MinIdleConns,
MaxConnAge: opt.MaxConnAge,
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency,
})
}
// 服务启动的时候,redis客户端会在这里执行连接池的初始化
func NewConnPool(opt *Options) *ConnPool {
p := &ConnPool{
opt: opt,
// client获取链接之前,会向这个chan写入数据,如果能够写入说明不用等待就可以获取链接,
否则需要等待其他地方从这个chan取走数据才可以获取链接,假如获取链接成功的话,在用完的时候,要向这个chan取出一个struct{}
不然的话就会让别人一直阻塞,go-redis用这种方法保证池中的链接数量不会超过poolsize
queue: make(chan struct{}, opt.PoolSize),
conns: make([]*Conn, 0, opt.PoolSize),
idleConns: make([]*Conn, 0, opt.PoolSize),
}
for i := 0; i < opt.MinIdleConns; i++ {
// 按照配置的最小空闲链接数,往池中add MinIdleConns个链接
// 开启协程addIdleConn初始化链接的
p.checkMinIdleConns()
}
if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
// 修复链接池,无限循环判断链接是否过期,过期的链接清理掉
// 开启协程后台循环执行
go p.reaper(opt.IdleCheckFrequency)
}
return p
}
// 往池中加新链接
func (p *ConnPool) addIdleConn() {
cn, err := p.newConn(true)
if err != nil {
return
}
// conns idleConns 是共享资源,需要加锁控制
p.connsMu.Lock()
p.conns = append(p.conns, cn)
p.idleConns = append(p.idleConns, cn)
p.connsMu.Unlock()
}
func NewConn(netConn net.Conn) *Conn {
cn := &Conn{
netConn: netConn,
// 链接的出生时间点
createdAt: time.Now(),
}
cn.rd = proto.NewReader(netConn)
cn.wr = proto.NewWriter(netConn)
// 链接上次使用的时间点
cn.SetUsedAt(time.Now())
return cn
}
// 修复链接池,无限循环判断链接是否过期,过期的链接清理掉
func (p *ConnPool) reaper(frequency time.Duration) {
ticker := time.NewTicker(frequency)
defer ticker.Stop()
for range ticker.C {
if p.closed() {
break
}
n, err := p.ReapStaleConns()
if err != nil {
internal.Logf("ReapStaleConns failed: %s", err)
continue
}
atomic.AddUint32(&p.stats.StaleConns, uint32(n))
}
}
// 无限循环判断链接是否过期
func (p *ConnPool) ReapStaleConns() (int, error) {
var n int
for {
// 需要向queue chan写进数据才能往下执行,否则就会阻塞,等queue有容量
p.getTurn()
p.connsMu.Lock()
cn := p.reapStaleConn()
p.connsMu.Unlock()
if cn != nil {
p.removeConn(cn)
}
// 用完之后,就要从queue chan读取出你放进去的数据,让queue有容量写入
p.freeTurn()
if cn != nil {
p.closeConn(cn)
n++
} else {
break
}
}
return n, nil
}
// 每次总idleConns的切片头部取出一个来判断是否过期,如果过期的话,更新idleConns,并且关闭过期链接
func (p *ConnPool) reapStaleConn() *Conn {
if len(p.idleConns) == 0 {
return nil
}
cn := p.idleConns[0]
if !p.isStaleConn(cn) {
return nil
}
p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)
p.idleConnsLen--
return cn
}
// 根据链接的出生时间点和上次使用的时间点,判断该链接是否过期
func (p *ConnPool) isStaleConn(cn *Conn) bool {
if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 {
return false
}
now := time.Now()
if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout {
return true
}
if p.opt.MaxConnAge > 0 && now.Sub(cn.createdAt) >= p.opt.MaxConnAge {
return true
}
return false
}
至此go-redis连接池的初始化就完成了,主要工作有几个
1. 初始化配置的空闲链接数
2. 单独开个协程定时循环检查空闲链接池中的链接是否过期
2. 一个是redis-client执行命令之前获取链接和用完之后归还链接
func (c *baseClient) _getConn() (*pool.Conn, error) {
cn, err := c.connPool.Get()
if err != nil {
return nil, err
}
//这里主要工作是当配置配了密码和DB的时候,这个链接之前命令之前要执行auth和select db命令
err = c.initConn(cn)
if err != nil {
c.connPool.Remove(cn, err)
if err := internal.Unwrap(err); err != nil {
return nil, err
}
return nil, err
}
return cn, nil
}
// Get returns existed connection from the pool or creates a new one.
func (p *ConnPool) Get() (*Conn, error) {
if p.closed() {
return nil, ErrClosed
}
// 如果能够写入说明不用等待就可以获取链接,
否则需要等待其他地方从这个chan取走数据才可以获取链接,假如获取链接成功的话,在用完的时候,要向这个chan取出一个struct{}
不然的话就会让别人一直阻塞(如果在pooltimeout时间内没有等待到,就会超时返回),go-redis用这种方法保证池中的链接数量
不会超过poolsize
err := p.waitTurn()
if err != nil {
return nil, err
}
for {
// 共享资源,操作要加锁
p.connsMu.Lock()
cn := p.popIdle()
p.connsMu.Unlock()
if cn == nil {
break
}
// 判断从空闲链接切片中拿出来的链接是否过期,兜底
if p.isStaleConn(cn) {
_ = p.CloseConn(cn)
continue
}
// 命中统计
atomic.AddUint32(&p.stats.Hits, 1)
return cn, nil
}
// 如果没有空闲链接的话,就重新拨号建一个
atomic.AddUint32(&p.stats.Misses, 1)
newcn, err := p._NewConn(true)
if err != nil {
// 获取链接后,要释放掉开始往queue队列里面放的数据
p.freeTurn()
return nil, err
}
return newcn, nil
}
// queue这里的功能固定数量的令牌桶(获取conn链接的令牌),用之前拿,用完之后放回,不会增加令牌数量也不会减少。
func (p *ConnPool) getTurn() {
p.queue <- struct{}{}
}
// 等候获取queue中的令牌
func (p *ConnPool) waitTurn() error {
select {
case p.queue <- struct{}{}:
return nil
default:
timer := timers.Get().(*time.Timer)
timer.Reset(p.opt.PoolTimeout)
select {
case p.queue <- struct{}{}:
if !timer.Stop() {
<-timer.C
}
timers.Put(timer)
return nil
case <-timer.C:
timers.Put(timer)
atomic.AddUint32(&p.stats.Timeouts, 1)
return ErrPoolTimeout
}
}
}
// 放回令牌
func (p *ConnPool) freeTurn() {
<-p.queue
}
// 链接用完之后(获取服务端响应后),要放回Pool中,最后放回令牌
// 一般链接用完之后都是放回空闲链接切片里
func (p *ConnPool) Put(cn *Conn) {
if !cn.pooled {
p.Remove(cn, nil)
return
}
p.connsMu.Lock()
p.idleConns = append(p.idleConns, cn)
p.idleConnsLen++
p.connsMu.Unlock()
p.freeTurn()
}
至此go-redis的client执行命令的时候,主要做了下面3个工作
1. 从连接池中获取链接
2. 构造命令协议,往conn链接中写入命令协议
3. 获取redis服务端响应后,把链接放回连接池
go-redis 优秀的方面
1. 针对单个链接有最大的生存时间概念
2. 针对单个链接有距离上次活跃的时间,最大的空闲时间概念
3. 单独有协程会清理上面2种过期的链接,这样可以让pool链接都保持最新( go-mysql中的连接池还没有这个功能,mysql服务端那边会把超过一定空闲的连接断掉,这也算是go-redis的一个比较好的有点 )
4. queue chan conn,用这样一个固定容量的信道,可以把它比作一个固定容量的令牌桶,用来限制控制池中最大的链接数量
5. go-redis把字符串转成[]byte,用的指针转换的方式,减少不必要的对象资源的分配减少GC的数量。
// BytesToString converts byte slice to string.
func BytesToString(b []byte) string {
return *(*string)(unsafe.Pointer(&b))
}
// StringToBytes converts string to byte slice.
func StringToBytes(s string) []byte {
return *(*[]byte)(unsafe.Pointer(
&struct {
string
Cap int
}{s, len(s)},
))
}
go-redis 不足的地方
本着优点也是缺点的论点,我还是提个不足的点
虽然清理那些超过生命周期的链接是好的方面,但是还是可以从这里在优化一下,例如虽然这个链接超过了业务自定义的生存的生命周期,但是如果这个链接本身还是有效的,这样就不是浪费一个有效的链接,而且有必要的话还要创建一个新的链接,这样的话是不是有些浪费系统资源
单纯从不释放掉有效的链接方面的解决方案
对每个链接都定时去ping一下服务端,记录下记录,剔除掉已经断开的链接。(grpc维护的连接池是这样实现的)
至此,文章到这里也结束了,如有说的不对的地方,希望大家指出和交流。