Go Redis 客户端源码阅读(2)连接池初始化
初始化连接池的核心代码如下
client.pool = &redis.Pool{
MaxIdle: client.MaxIdle,
IdleTimeout: time.Duration(client.IdleTimeoutS) * time.Second,
MaxActive: client.MaxActive,
Dial: func() (redis.Conn, error) {
var c redis.Conn
var err error
for i := 0; i < len(client.Servers)+1; i++ {
//随机挑选一个IP
index := common.RandIntn(len(client.Servers))
client.current_index = index
c, err = redis.DialTimeout("tcp", client.Servers[index],
time.Duration(client.ConnTimeoutMs)*time.Millisecond,
time.Duration(client.ReadTimeoutMs)*time.Millisecond,
time.Duration(client.WriteTimeoutMs)*time.Millisecond)
}
}
}
除了超时和最大活跃连接数最大空闲连接数外,最重要的就是指定连接函数。
连接函数只是定义,在用的时候才连接,连接的调用链路如下:
conn.go
redis.DialTimeout(){}
func Dial(network, address string, options ...DialOption) (Conn, error) {
然后调用
net.Dial的dial函数进行tcp连接,接着
"AUTH"验证和db选择
"SELECT"
返回一个连接
连接池的使用:
conn := client.pool.Get()
defer conn.Close()
_, err := conn.Do("SET", key, value)
1,从池中捞一个链接
2,发送
3,放回池子
连接池的定义
type Pool struct {
// Dial is an application supplied function for creating and configuring a
// connection.
//
// The connection returned from Dial must not be in a special state
// (subscribed to pubsub channel, transaction started, ...).
Dial func() (Conn, error) //连接函数
// TestOnBorrow is an optional application supplied function for checking
// the health of an idle connection before the connection is used again by
// the application. Argument t is the time that the connection was returned
// to the pool. If the function returns an error, then the connection is
// closed.
TestOnBorrow func(c Conn, t time.Time) error
//每次从连接池取出连接的时候,检查连接的健康度,如果放回错误,则释放这个连接
// Maximum number of idle connections in the pool.
MaxIdle int //最大空闲连接
// Maximum number of connections allocated by the pool at a given time.
// When zero, there is no limit on the number of connections in the pool.
MaxActive int //如果是0 无限制,非0 一定时间端内,池子则最大的连接数
// Close connections after remaining idle for this duration. If the value
// is zero, then idle connections are not closed. Applications should set
// the timeout to a value less than the server's timeout.
IdleTimeout time.Duration //如果是0 连接不关闭,非0 ,剩余关闭时间
// If Wait is true and the pool is at the MaxActive limit, then Get() waits
// for a connection to be returned to the pool before returning.
Wait bool //当Wait 为true 时,并且池子则最大活跃连接数达到最大限制,获取连接的方法需要等待,有连接被放回池子,才能使用
// mu protects fields defined below.
mu sync.Mutex
cond *sync.Cond
closed bool
active int
// Stack of idleConn with most recently used at the front.
idle list.List //存放空闲连接的链表
}
获取可用连接函数(放回的连接用完后,需要用户自己释放)
其实这里返回的连接不是最原始的连接,而是池化连接
type pooledConnection struct {
p *Pool
c Conn
state int
}
即对原始连接进行了包装
// Get gets a connection. The application must close the returned connection.
// This method always returns a valid connection so that applications can defer
// error handling to the first use of the connection. If there is an error
// getting an underlying connection, then the connection Err, Do, Send, Flush
// and Receive methods return that error.
func (p *Pool) Get() Conn {
c, err := p.get()
if err != nil {
return errorConnection{err}
}
return &pooledConnection{p: p, c: c}
}
//释放
func (pc *pooledConnection) Close() error {
c := pc.c
if _, ok := c.(errorConnection); ok {
return nil
}
pc.c = errorConnection{errConnClosed}
if pc.state&internal.MultiState != 0 {
c.Send("DISCARD")
pc.state &^= (internal.MultiState | internal.WatchState)
} else if pc.state&internal.WatchState != 0 {
c.Send("UNWATCH")
pc.state &^= internal.WatchState
}
if pc.state&internal.SubscribeState != 0 {
c.Send("UNSUBSCRIBE")
c.Send("PUNSUBSCRIBE")
// To detect the end of the message stream, ask the server to echo
// a sentinel value and read until we see that value.
sentinelOnce.Do(initSentinel)
c.Send("ECHO", sentinel)
c.Flush()
for {
p, err := c.Receive()
if err != nil {
break
}
if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) {
pc.state &^= internal.SubscribeState
break
}
}
}
c.Do("")
pc.p.put(c, pc.state != 0)
return nil
}
// get prunes stale connections and returns a connection from the idle list or
// creates a new connection.
func (p *Pool) get() (Conn, error) {
p.mu.Lock()
// Prune stale connections.
if timeout := p.IdleTimeout; timeout > 0 {
for i, n := 0, p.idle.Len(); i < n; i++ {
e := p.idle.Back()
if e == nil {//没有空闲连接了
break
}
ic := e.Value.(idleConn)
if ic.t.Add(timeout).After(nowFunc()) {//连接已经超时
break
}
p.idle.Remove(e) //从空闲连接中移除
p.release()//1,空闲连接数减一 2,给所有等待获取连接的协程发信号
p.mu.Unlock()
ic.c.Close()//以下几种,状态特殊处理,
//最后将连接 重新放回连接池头部 ,如果达到最大连接数,则挤掉尾部连接,并放回
//const (
//WatchState = 1 << iota
//MultiState
//SubscribeState
//MonitorState
//)
p.mu.Lock()
}
}
for {
// Get idle connection.
for i, n := 0, p.idle.Len(); i < n; i++ {
e := p.idle.Front()
if e == nil {
break
}
ic := e.Value.(idleConn)
p.idle.Remove(e)//从空闲连接中取出
test := p.TestOnBorrow //检查连接是否可用
p.mu.Unlock()
if test == nil || test(ic.c, ic.t) == nil {
return ic.c, nil //可用就直接放回
}
ic.c.Close() //关闭不可用连接,(放回链表头部)
p.mu.Lock()
p.release()
}
// Check for pool closed before dialing a new connection.
if p.closed {
p.mu.Unlock()
return nil, errors.New("redigo: get on closed pool")
}
// Dial new connection if under limit.
if p.MaxActive == 0 || p.active < p.MaxActive {
dial := p.Dial // 没有达到最大活跃连接数,重新生成一个连接,并返回
p.active += 1
p.mu.Unlock()
c, err := dial()
if err != nil {
p.mu.Lock()
p.release()
p.mu.Unlock()
c = nil
}
return c, err
}
if !p.Wait {
p.mu.Unlock()
return nil, ErrPoolExhausted
}
if p.cond == nil {
p.cond = sync.NewCond(&p.mu)
}
p.cond.Wait() //循环中等待事件发生
}
}
func (p *Pool) put(c Conn, forceClose bool) error {
err := c.Err()
p.mu.Lock()
if !p.closed && err == nil && !forceClose {
p.idle.PushFront(idleConn{t: nowFunc(), c: c})
if p.idle.Len() > p.MaxIdle {//达到最大空闲连接数,将队列尾部的连接,弹出
c = p.idle.Remove(p.idle.Back()).(idleConn).c
} else {
c = nil
}
}
if c == nil {
if p.cond != nil {
p.cond.Signal() //没有达到最大空闲连接数,发信号,重新生成一个连接
}
p.mu.Unlock()
return nil
}
p.release()
p.mu.Unlock()
return c.Close() //达到最大连接数,关闭连接
}
推荐阅读
喜欢本文的朋友,欢迎关注“Go语言中文网”: