Go Redis 客户端源码阅读(2)连接池初始化
初始化连接池的核心代码如下
client.pool = &redis.Pool{MaxIdle: client.MaxIdle,IdleTimeout: time.Duration(client.IdleTimeoutS) * time.Second,MaxActive: client.MaxActive,Dial: func() (redis.Conn, error) {var c redis.Connvar err errorfor i := 0; i < len(client.Servers)+1; i++ {//随机挑选一个IPindex := common.RandIntn(len(client.Servers))client.current_index = indexc, err = redis.DialTimeout("tcp", client.Servers[index],time.Duration(client.ConnTimeoutMs)*time.Millisecond,time.Duration(client.ReadTimeoutMs)*time.Millisecond,time.Duration(client.WriteTimeoutMs)*time.Millisecond)}}}
除了超时和最大活跃连接数最大空闲连接数外,最重要的就是指定连接函数。
连接函数只是定义,在用的时候才连接,连接的调用链路如下:
conn.go
redis.DialTimeout(){}
func Dial(network, address string, options ...DialOption) (Conn, error) {
然后调用
net.Dial的dial函数进行tcp连接,接着
"AUTH"验证和db选择
"SELECT"
返回一个连接
连接池的使用:
conn := client.pool.Get()defer conn.Close()_, err := conn.Do("SET", key, value)
1,从池中捞一个链接
2,发送
3,放回池子
连接池的定义
type Pool struct {// Dial is an application supplied function for creating and configuring a// connection.//// The connection returned from Dial must not be in a special state// (subscribed to pubsub channel, transaction started, ...).Dial func() (Conn, error) //连接函数// TestOnBorrow is an optional application supplied function for checking// the health of an idle connection before the connection is used again by// the application. Argument t is the time that the connection was returned// to the pool. If the function returns an error, then the connection is// closed.TestOnBorrow func(c Conn, t time.Time) error//每次从连接池取出连接的时候,检查连接的健康度,如果放回错误,则释放这个连接// Maximum number of idle connections in the pool.MaxIdle int //最大空闲连接// Maximum number of connections allocated by the pool at a given time.// When zero, there is no limit on the number of connections in the pool.MaxActive int //如果是0 无限制,非0 一定时间端内,池子则最大的连接数// Close connections after remaining idle for this duration. If the value// is zero, then idle connections are not closed. Applications should set// the timeout to a value less than the server's timeout.IdleTimeout time.Duration //如果是0 连接不关闭,非0 ,剩余关闭时间// If Wait is true and the pool is at the MaxActive limit, then Get() waits// for a connection to be returned to the pool before returning.Wait bool //当Wait 为true 时,并且池子则最大活跃连接数达到最大限制,获取连接的方法需要等待,有连接被放回池子,才能使用// mu protects fields defined below.mu sync.Mutexcond *sync.Condclosed boolactive int// Stack of idleConn with most recently used at the front.idle list.List //存放空闲连接的链表}
获取可用连接函数(放回的连接用完后,需要用户自己释放)
其实这里返回的连接不是最原始的连接,而是池化连接
type pooledConnection struct {p *Poolc Connstate int}
即对原始连接进行了包装
// Get gets a connection. The application must close the returned connection.// This method always returns a valid connection so that applications can defer// error handling to the first use of the connection. If there is an error// getting an underlying connection, then the connection Err, Do, Send, Flush// and Receive methods return that error.func (p *Pool) Get() Conn {c, err := p.get()if err != nil {return errorConnection{err}}return &pooledConnection{p: p, c: c}}//释放func (pc *pooledConnection) Close() error {c := pc.cif _, ok := c.(errorConnection); ok {return nil}pc.c = errorConnection{errConnClosed}if pc.state&internal.MultiState != 0 {c.Send("DISCARD")pc.state &^= (internal.MultiState | internal.WatchState)} else if pc.state&internal.WatchState != 0 {c.Send("UNWATCH")pc.state &^= internal.WatchState}if pc.state&internal.SubscribeState != 0 {c.Send("UNSUBSCRIBE")c.Send("PUNSUBSCRIBE")// To detect the end of the message stream, ask the server to echo// a sentinel value and read until we see that value.sentinelOnce.Do(initSentinel)c.Send("ECHO", sentinel)c.Flush()for {p, err := c.Receive()if err != nil {break}if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) {pc.state &^= internal.SubscribeStatebreak}}}c.Do("")pc.p.put(c, pc.state != 0)return nil}
// get prunes stale connections and returns a connection from the idle list or// creates a new connection.func (p *Pool) get() (Conn, error) {p.mu.Lock()// Prune stale connections.if timeout := p.IdleTimeout; timeout > 0 {for i, n := 0, p.idle.Len(); i < n; i++ {e := p.idle.Back()if e == nil {//没有空闲连接了break}ic := e.Value.(idleConn)if ic.t.Add(timeout).After(nowFunc()) {//连接已经超时break}p.idle.Remove(e) //从空闲连接中移除p.release()//1,空闲连接数减一 2,给所有等待获取连接的协程发信号p.mu.Unlock()ic.c.Close()//以下几种,状态特殊处理,//最后将连接 重新放回连接池头部 ,如果达到最大连接数,则挤掉尾部连接,并放回//const (//WatchState = 1 << iota//MultiState//SubscribeState//MonitorState//)p.mu.Lock()}}for {// Get idle connection.for i, n := 0, p.idle.Len(); i < n; i++ {e := p.idle.Front()if e == nil {break}ic := e.Value.(idleConn)p.idle.Remove(e)//从空闲连接中取出test := p.TestOnBorrow //检查连接是否可用p.mu.Unlock()if test == nil || test(ic.c, ic.t) == nil {return ic.c, nil //可用就直接放回}ic.c.Close() //关闭不可用连接,(放回链表头部)p.mu.Lock()p.release()}// Check for pool closed before dialing a new connection.if p.closed {p.mu.Unlock()return nil, errors.New("redigo: get on closed pool")}// Dial new connection if under limit.if p.MaxActive == 0 || p.active < p.MaxActive {dial := p.Dial // 没有达到最大活跃连接数,重新生成一个连接,并返回p.active += 1p.mu.Unlock()c, err := dial()if err != nil {p.mu.Lock()p.release()p.mu.Unlock()c = nil}return c, err}if !p.Wait {p.mu.Unlock()return nil, ErrPoolExhausted}if p.cond == nil {p.cond = sync.NewCond(&p.mu)}p.cond.Wait() //循环中等待事件发生}}func (p *Pool) put(c Conn, forceClose bool) error {err := c.Err()p.mu.Lock()if !p.closed && err == nil && !forceClose {p.idle.PushFront(idleConn{t: nowFunc(), c: c})if p.idle.Len() > p.MaxIdle {//达到最大空闲连接数,将队列尾部的连接,弹出c = p.idle.Remove(p.idle.Back()).(idleConn).c} else {c = nil}}if c == nil {if p.cond != nil {p.cond.Signal() //没有达到最大空闲连接数,发信号,重新生成一个连接}p.mu.Unlock()return nil}p.release()p.mu.Unlock()return c.Close() //达到最大连接数,关闭连接}
推荐阅读
喜欢本文的朋友,欢迎关注“Go语言中文网”:
