peewee连接池使用指南
peewee是一个小巧的ORM模型,相对于SQLAlchemy而言,优点如下
语法简洁
更好的性能
丰富的文档
使用SQLAlchemy,可以处理各种复杂的查询和表间关系,有更多的高级特性,但是弄明白这些特性可能需要几倍于peewee的时间,相对而言,peewee提供组件化、易于理解和学习的API;更重要的是有丰富的文档和示例。这对于快速构建一个项目是非常关键的。我们在技术选型时,不仅要考虑功能性,更多的要考虑易用性、可维护性、项目的周期等等。
为什么要使用DB连接池?
限制对DB的并发访问,减少高并发导致的DB连接数达到最大值
可以控制可用连接的数量,可以提前创建连接,连接可复用且减少创建连接的性能开销
是否需要提前创建连接?
可以在服务启动之前预先创建多个连接,避免在运行时连接创建开销,在连接失效时,主动ping()进行重连
如果服务连接多个DB,需要知道连接哪些数据库,配置信息需要提前注册
如果不在意第一次创建连接的开销,可以在有新的请求时再创建新的连接,连接释放时放到空闲队列
风险是在扩容时,会导致很快db连接数的最大值
分析peewee连接池的设计
这里分析peewee-3.13.3的源码,以PooledMySQLDatabase为例
peewee/playhouse/pool.py
class PooledDatabase(object):
def __init__(self, database, max_connections=20, stale_timeout=None,
timeout=None, **kwargs):
self._max_connections = make_int(max_connections)
self._stale_timeout = make_int(stale_timeout)
self._wait_timeout = make_int(timeout)
if self._wait_timeout == 0:
self._wait_timeout = float('inf')
# Available / idle connections stored in a heap, sorted oldest first.
self._connections = []
# Mapping of connection id to PoolConnection. Ordinarily we would want
# to use something like a WeakKeyDictionary, but Python typically won't
# allow us to create weak references to connection objects.
self._in_use = {}
# Use the memory address of the connection as the key in the event the
# connection object is not hashable. Connections will not get
# garbage-collected, however, because a reference to them will persist
# in "_in_use" as long as the conn has not been closed.
self.conn_key = id
super(PooledDatabase, self).__init__(database, **kwargs)
def _connect(self):
while True:
try:
# Remove the oldest connection from the heap.
ts, conn = heapq.heappop(self._connections)
key = self.conn_key(conn)
except IndexError:
ts = conn = None
logger.debug('No connection available in pool.')
break
else:
if self._is_closed(conn):
# This connecton was closed, but since it was not stale
# it got added back to the queue of available conns. We
# then closed it and marked it as explicitly closed, so
# it's safe to throw it away now.
# (Because Database.close() calls Database._close()).
logger.debug('Connection %s was closed.', key)
ts = conn = None
elif self._stale_timeout and self._is_stale(ts):
# If we are attempting to check out a stale connection,
# then close it. We don't need to mark it in the "closed"
# set, because it is not in the list of available conns
# anymore.
logger.debug('Connection %s was stale, closing.', key)
self._close(conn, True)
ts = conn = None
else:
break
if conn is None:
if self._max_connections and (
len(self._in_use) >= self._max_connections):
raise MaxConnectionsExceeded('Exceeded maximum connections.')
conn = super(PooledDatabase, self)._connect()
ts = time.time() - random.random() / 1000
key = self.conn_key(conn)
logger.debug('Created new connection %s.', key)
self._in_use[key] = PoolConnection(ts, conn, time.time())
return conn
def _close(self, conn, close_conn=False):
key = self.conn_key(conn)
if close_conn:
super(PooledDatabase, self)._close(conn)
elif key in self._in_use:
pool_conn = self._in_use.pop(key)
if self._stale_timeout and self._is_stale(pool_conn.timestamp):
logger.debug('Closing stale connection %s.', key)
super(PooledDatabase, self)._close(conn)
elif self._can_reuse(conn):
logger.debug('Returning %s to pool.', key)
heapq.heappush(self._connections, (pool_conn.timestamp, conn))
else:
logger.debug('Closed %s.', key)
class PooledMySQLDatabase(PooledDatabase, MySQLDatabase):
def _is_closed(self, conn):
try:
conn.ping(False)
except:
return True
else:
return False
self._max_connections: 存放空闲的DB连接,是一个优先队列
self._in_use = {}:存放正在使用的DB连接,表示一个正在使用队列
首先从空闲队列中取一个空闲连接
如果空闲连接不存在,则创建一个新的DB连接,并将其放到self._in_use,如果超出了最大连接数,抛出MaxConnectionsExceeded异常
如果空闲连接存在
判断空闲连接是否可用,通过ping()操作实现,如果连接已关闭,则创建一个新的DB连接
判断空闲连接是否超时,如果连接的存活时间超出指定阈值,则关闭连接,重新创建一个新的DB连接
否则将空闲连接加入到self._in_use
根据上面的分析,并未发现如何将self._in_use重新加入到self._max_connections空闲队列。
我们在_close()中发现了入队操作
heapq.heappush(self._connections, (pool_conn.timestamp, conn))
说明在DB连接关闭时,才会将连接重新放入到空闲队列,这就需要我们在代码中显示调用关闭连接的方法,否则很快就会出现MaxConnectionsExceeded的异常。
如何显示关闭连接?
If you want to manage transactions separately, you can use the Database.connection_context() context manager.
with db.connection_context():
# db connection is open.
pass
...
# db connection is closed. db.is_closed()
True
The connection_context() method can also be used as a decorator:
def prepare_database():
# DB connection will be managed by the decorator, which opens
# a connection, calls function, and closes upon returning.
db.create_tables(MODELS) # Create schema.
load_fixture_data(db)