vlambda博客
学习文章列表

Go Redis 客户端源码阅读(2)连接池初始化

初始化连接池的核心代码如下

 client.pool = &redis.Pool{    MaxIdle:     client.MaxIdle,    IdleTimeout: time.Duration(client.IdleTimeoutS) * time.Second,    MaxActive:   client.MaxActive,    Dial: func() (redis.Conn, error) { var c redis.Conn var err error for i := 0; i < len(client.Servers)+1; i++ { //随机挑选一个IP        index := common.RandIntn(len(client.Servers)) client.current_index = index c, err = redis.DialTimeout("tcp", client.Servers[index], time.Duration(client.ConnTimeoutMs)*time.Millisecond, time.Duration(client.ReadTimeoutMs)*time.Millisecond, time.Duration(client.WriteTimeoutMs)*time.Millisecond)          }        }        }

除了超时和最大活跃连接数最大空闲连接数外,最重要的就是指定连接函数。

连接函数只是定义,在用的时候才连接,连接的调用链路如下:

conn.go

redis.DialTimeout(){}

func Dial(network, address string, options ...DialOption) (Conn, error) {

然后调用

 net.Dial的dial函数进行tcp连接,接着
"AUTH"验证和db选择
"SELECT"

返回一个连接


连接池的使用:

conn := client.pool.Get()  defer conn.Close()  _, err := conn.Do("SET", key, value)

1,从池中捞一个链接

2,发送

3,放回池子

连接池的定义

type Pool struct {
// Dial is an application supplied function for creating and configuring a // connection. // // The connection returned from Dial must not be in a special state // (subscribed to pubsub channel, transaction started, ...).  Dial func() (Conn, error) //连接函数
// TestOnBorrow is an optional application supplied function for checking // the health of an idle connection before the connection is used again by // the application. Argument t is the time that the connection was returned // to the pool. If the function returns an error, then the connection is // closed.  TestOnBorrow func(c Conn, t time.Time) error   //每次从连接池取出连接的时候,检查连接的健康度,如果放回错误,则释放这个连接
// Maximum number of idle connections in the pool.  MaxIdle int  //最大空闲连接
// Maximum number of connections allocated by the pool at a given time. // When zero, there is no limit on the number of connections in the pool.  MaxActive int  //如果是0 无限制,非0 一定时间端内,池子则最大的连接数
// Close connections after remaining idle for this duration. If the value // is zero, then idle connections are not closed. Applications should set // the timeout to a value less than the server's timeout.  IdleTimeout time.Duration //如果是0 连接不关闭,非0 ,剩余关闭时间
// If Wait is true and the pool is at the MaxActive limit, then Get() waits // for a connection to be returned to the pool before returning.  Wait bool  //当Wait 为true 时,并且池子则最大活跃连接数达到最大限制,获取连接的方法需要等待,有连接被放回池子,才能使用
// mu protects fields defined below. mu sync.Mutex cond *sync.Cond closed bool active int
// Stack of idleConn with most recently used at the front. idle list.List //存放空闲连接的链表}

获取可用连接函数(放回的连接用完后,需要用户自己释放)

其实这里返回的连接不是最原始的连接,而是池化连接

type pooledConnection struct { p *Pool c Conn state int}

即对原始连接进行了包装

// Get gets a connection. The application must close the returned connection.// This method always returns a valid connection so that applications can defer// error handling to the first use of the connection. If there is an error// getting an underlying connection, then the connection Err, Do, Send, Flush// and Receive methods return that error.func (p *Pool) Get() Conn { c, err := p.get() if err != nil { return errorConnection{err} } return &pooledConnection{p: p, cc}}
//释放func (pc *pooledConnection) Close() error { c := pc.c if _, ok := c.(errorConnection); ok { return nil } pc.c = errorConnection{errConnClosed}
if pc.state&internal.MultiState != 0 { c.Send("DISCARD") pc.state &^= (internal.MultiState | internal.WatchState) } else if pc.state&internal.WatchState != 0 { c.Send("UNWATCH") pc.state &^= internal.WatchState } if pc.state&internal.SubscribeState != 0 { c.Send("UNSUBSCRIBE") c.Send("PUNSUBSCRIBE") // To detect the end of the message stream, ask the server to echo // a sentinel value and read until we see that value. sentinelOnce.Do(initSentinel) c.Send("ECHO", sentinel) c.Flush() for {      p, err := c.Receive() if err != nil { break } if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) { pc.state &^= internal.SubscribeState break } } } c.Do("") pc.p.put(c, pc.state != 0) return nil}

// get prunes stale connections and returns a connection from the idle list or// creates a new connection.func (p *Pool) get() (Conn, error) { p.mu.Lock()
// Prune stale connections.
if timeout := p.IdleTimeout; timeout > 0 { for i, n := 0, p.idle.Len(); i < n; i++ {      e := p.idle.Back()      if e == nil {//没有空闲连接了 break }      ic := e.Value.(idleConn)      if ic.t.Add(timeout).After(nowFunc()) {//连接已经超时 break }      p.idle.Remove(e) //从空闲连接中移除      p.release()//1,空闲连接数减一 2,给所有等待获取连接的协程发信号 p.mu.Unlock()      ic.c.Close()//以下几种,状态特殊处理,      //最后将连接 重新放回连接池头部 ,如果达到最大连接数,则挤掉尾部连接,并放回       //const ( //WatchState = 1 << iota //MultiState //SubscribeState //MonitorState //) p.mu.Lock() } }
for {
// Get idle connection.
for i, n := 0, p.idle.Len(); i < n; i++ {      e := p.idle.Front() if e == nil { break }      ic := e.Value.(idleConn)      p.idle.Remove(e)//从空闲连接中取出      test := p.TestOnBorrow //检查连接是否可用 p.mu.Unlock() if test == nil || test(ic.c, ic.t) == nil {        return ic.cnil  //可用就直接放回 }      ic.c.Close()  //关闭不可用连接,(放回链表头部) p.mu.Lock() p.release() }
// Check for pool closed before dialing a new connection.
if p.closed { p.mu.Unlock() return nil, errors.New("redigo: get on closed pool") }
// Dial new connection if under limit.
if p.MaxActive == 0 || p.active < p.MaxActive {      dial := p.Dial // 没有达到最大活跃连接数,重新生成一个连接,并返回 p.active += 1 p.mu.Unlock() c, err := dial() if err != nil { p.mu.Lock() p.release() p.mu.Unlock() c = nil } return c, err }
if !p.Wait { p.mu.Unlock() return nil, ErrPoolExhausted }
if p.cond == nil { p.cond = sync.NewCond(&p.mu) }    p.cond.Wait() //循环中等待事件发生 }}
func (p *Pool) put(c Conn, forceClose bool) error {  err := c.Err() p.mu.Lock() if !p.closed && err == nil && !forceClose { p.idle.PushFront(idleConn{t: nowFunc(), cc}) if p.idle.Len() > p.MaxIdle {//达到最大空闲连接数,将队列尾部的连接,弹出 c = p.idle.Remove(p.idle.Back()).(idleConn).c } else { c = nil } }
if c == nil { if p.cond != nil {      p.cond.Signal()   //没有达到最大空闲连接数,发信号,重新生成一个连接 } p.mu.Unlock() return nil }
p.release() p.mu.Unlock()   return c.Close() //达到最大连接数,关闭连接}



推荐阅读




喜欢本文的朋友,欢迎关注“Go语言中文网