peewee连接池使用指南
peewee是一个小巧的ORM模型,相对于SQLAlchemy而言,优点如下
语法简洁
更好的性能
丰富的文档
使用SQLAlchemy,可以处理各种复杂的查询和表间关系,有更多的高级特性,但是弄明白这些特性可能需要几倍于peewee的时间,相对而言,peewee提供组件化、易于理解和学习的API;更重要的是有丰富的文档和示例。这对于快速构建一个项目是非常关键的。我们在技术选型时,不仅要考虑功能性,更多的要考虑易用性、可维护性、项目的周期等等。
为什么要使用DB连接池?
限制对DB的并发访问,减少高并发导致的DB连接数达到最大值
可以控制可用连接的数量,可以提前创建连接,连接可复用且减少创建连接的性能开销
是否需要提前创建连接?
可以在服务启动之前预先创建多个连接,避免在运行时连接创建开销,在连接失效时,主动ping()进行重连
如果服务连接多个DB,需要知道连接哪些数据库,配置信息需要提前注册
如果不在意第一次创建连接的开销,可以在有新的请求时再创建新的连接,连接释放时放到空闲队列
风险是在扩容时,会导致很快db连接数的最大值
分析peewee连接池的设计
这里分析peewee-3.13.3的源码,以PooledMySQLDatabase为例
peewee/playhouse/pool.py
class PooledDatabase(object):def __init__(self, database, max_connections=20, stale_timeout=None,timeout=None, **kwargs):self._max_connections = make_int(max_connections)self._stale_timeout = make_int(stale_timeout)self._wait_timeout = make_int(timeout)if self._wait_timeout == 0:self._wait_timeout = float('inf')# Available / idle connections stored in a heap, sorted oldest first.self._connections = []# Mapping of connection id to PoolConnection. Ordinarily we would want# to use something like a WeakKeyDictionary, but Python typically won't# allow us to create weak references to connection objects.self._in_use = {}# Use the memory address of the connection as the key in the event the# connection object is not hashable. Connections will not get# garbage-collected, however, because a reference to them will persist# in "_in_use" as long as the conn has not been closed.self.conn_key = idsuper(PooledDatabase, self).__init__(database, **kwargs)def _connect(self):while True:try:# Remove the oldest connection from the heap.ts, conn = heapq.heappop(self._connections)key = self.conn_key(conn)except IndexError:ts = conn = Nonelogger.debug('No connection available in pool.')breakelse:if self._is_closed(conn):# This connecton was closed, but since it was not stale# it got added back to the queue of available conns. We# then closed it and marked it as explicitly closed, so# it's safe to throw it away now.# (Because Database.close() calls Database._close()).logger.debug('Connection %s was closed.', key)ts = conn = Noneelif self._stale_timeout and self._is_stale(ts):# If we are attempting to check out a stale connection,# then close it. We don't need to mark it in the "closed"# set, because it is not in the list of available conns# anymore.logger.debug('Connection %s was stale, closing.', key)self._close(conn, True)ts = conn = Noneelse:breakif conn is None:if self._max_connections and (len(self._in_use) >= self._max_connections):raise MaxConnectionsExceeded('Exceeded maximum connections.')conn = super(PooledDatabase, self)._connect()ts = time.time() - random.random() / 1000key = self.conn_key(conn)logger.debug('Created new connection %s.', key)self._in_use[key] = PoolConnection(ts, conn, time.time())return conndef _close(self, conn, close_conn=False):key = self.conn_key(conn)if close_conn:super(PooledDatabase, self)._close(conn)elif key in self._in_use:pool_conn = self._in_use.pop(key)if self._stale_timeout and self._is_stale(pool_conn.timestamp):logger.debug('Closing stale connection %s.', key)super(PooledDatabase, self)._close(conn)elif self._can_reuse(conn):logger.debug('Returning %s to pool.', key)heapq.heappush(self._connections, (pool_conn.timestamp, conn))else:logger.debug('Closed %s.', key)
class PooledMySQLDatabase(PooledDatabase, MySQLDatabase):def _is_closed(self, conn):try:conn.ping(False)except:return Trueelse:return False
self._max_connections: 存放空闲的DB连接,是一个优先队列
self._in_use = {}:存放正在使用的DB连接,表示一个正在使用队列
首先从空闲队列中取一个空闲连接
如果空闲连接不存在,则创建一个新的DB连接,并将其放到self._in_use,如果超出了最大连接数,抛出MaxConnectionsExceeded异常
如果空闲连接存在
判断空闲连接是否可用,通过ping()操作实现,如果连接已关闭,则创建一个新的DB连接
判断空闲连接是否超时,如果连接的存活时间超出指定阈值,则关闭连接,重新创建一个新的DB连接
否则将空闲连接加入到self._in_use
根据上面的分析,并未发现如何将self._in_use重新加入到self._max_connections空闲队列。
我们在_close()中发现了入队操作
heapq.heappush(self._connections, (pool_conn.timestamp, conn))
说明在DB连接关闭时,才会将连接重新放入到空闲队列,这就需要我们在代码中显示调用关闭连接的方法,否则很快就会出现MaxConnectionsExceeded的异常。
如何显示关闭连接?
If you want to manage transactions separately, you can use the Database.connection_context() context manager.with db.connection_context():# db connection is open.pass...db.is_closed() # db connection is closed.TrueThe connection_context() method can also be used as a decorator:def prepare_database():# DB connection will be managed by the decorator, which opens# a connection, calls function, and closes upon returning.db.create_tables(MODELS) # Create schema.load_fixture_data(db)
