vlambda blog

peewee Connection Pool Usage Guide

peewee is a small, lightweight ORM. Compared with SQLAlchemy, it has the following advantages:

  • Concise syntax

  • Better performance

  • Rich documentation


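SQLAlchemy can handle all kinds of complex queries and table relationships and offers more advanced features, but understanding those features can take several times longer than learning peewee. peewee, by contrast, provides a modular API that is easy to understand and learn, and, more importantly, it comes with rich documentation and examples. That matters a great deal when you need to build a project quickly. When choosing a technology, we should weigh not only functionality but, even more, ease of use, maintainability, the project timeline, and so on.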


Why use a DB connection pool?

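  • It limits concurrent access to the DB, so that high concurrency is less likely to push the number of DB connections up to the server's maximum

  • It controls how many connections are available: connections can be created in advance and reused, which cuts the cost of repeatedly opening new ones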
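Both points can be illustrated with a minimal, single-threaded sketch. It assumes Python's built-in sqlite3 module as the driver, and TinyPool, acquire() and release() are hypothetical names for illustration, not peewee's API (only the exception name is borrowed from peewee). The cap on checked-out connections gives the first point; handing released connections to the next caller gives the second.

```python
import sqlite3


class MaxConnectionsExceeded(Exception):
    """Raised when the cap is hit (name borrowed from peewee for readability)."""


class TinyPool:
    """Hypothetical, single-threaded pool: caps concurrent connections and reuses idle ones."""

    def __init__(self, database, max_connections=2):
        self.database = database
        self.max_connections = max_connections
        self.idle = []       # connections ready to be reused
        self.in_use = {}     # id(conn) -> conn for connections handed out
        self.created = 0     # how many real connections were opened

    def acquire(self):
        if self.idle:
            conn = self.idle.pop()  # reuse: no connect() cost
        elif len(self.in_use) >= self.max_connections:
            raise MaxConnectionsExceeded("pool exhausted")
        else:
            conn = sqlite3.connect(self.database)
            self.created += 1
        self.in_use[id(conn)] = conn
        return conn

    def release(self, conn):
        # Only connections that were actually checked out go back to the idle list.
        if self.in_use.pop(id(conn), None) is not None:
            self.idle.append(conn)


pool = TinyPool(":memory:", max_connections=2)
for _ in range(5):                      # five sequential "requests"
    conn = pool.acquire()
    conn.execute("SELECT 1")
    pool.release(conn)
print(pool.created)                     # 1
```

Five sequential requests open only one real connection. A real pool shared between threads would also need a lock around acquire() and release().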


Should connections be created in advance?

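  • You can create several connections before the service starts, which avoids connection-setup overhead at runtime; when a connection goes bad, call ping() proactively to reconnect

  • If the service talks to several DBs, it must know which databases to connect to, so their configuration has to be registered in advance

  • If the cost of the first connection does not matter, you can create connections on demand as new requests arrive, and put each one into the idle queue when it is released

  • The risk is that when the service scales out, the number of DB connections can quickly hit the server's maximum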
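The two creation strategies, warming connections up at startup versus opening them on demand, can be contrasted in a hedged stdlib sketch. WarmablePool and its warm_up parameter are hypothetical, sqlite3 stands in for a real server, and unlike peewee (which caps only in-use connections) this sketch caps idle plus in-use connections. The scaling risk is simple multiplication: if, hypothetically, 10 service instances each hold 20 connections, the database sees 10 × 20 = 200 connections.

```python
import sqlite3


class WarmablePool:
    """Hypothetical pool that can open connections eagerly (warm_up) or lazily."""

    def __init__(self, database, max_connections=4, warm_up=0):
        self.database = database
        self.max_connections = max_connections
        self.idle = []      # idle queue
        self.in_use = {}    # id(conn) -> conn
        # Eager strategy: pay the connect cost once, before serving requests.
        for _ in range(min(warm_up, max_connections)):
            self.idle.append(sqlite3.connect(database))

    def total(self):
        return len(self.idle) + len(self.in_use)

    def acquire(self):
        if self.idle:
            conn = self.idle.pop()
        elif self.total() < self.max_connections:
            # Lazy strategy: connect only when a request actually needs it.
            conn = sqlite3.connect(self.database)
        else:
            raise RuntimeError("pool exhausted")
        self.in_use[id(conn)] = conn
        return conn

    def release(self, conn):
        # A released connection goes to the idle queue instead of being closed.
        if self.in_use.pop(id(conn), None) is not None:
            self.idle.append(conn)


eager = WarmablePool(":memory:", warm_up=3)
lazy = WarmablePool(":memory:")
print(eager.total(), lazy.total())   # 3 0
```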


Analyzing the design of peewee's connection pool

Here we analyze the source code of peewee 3.13.3, using PooledMySQLDatabase as the example:

peewee/playhouse/pool.py

class PooledDatabase(object):
    def __init__(self, database, max_connections=20, stale_timeout=None,
                 timeout=None, **kwargs):
        self._max_connections = make_int(max_connections)
        self._stale_timeout = make_int(stale_timeout)
        self._wait_timeout = make_int(timeout)
        if self._wait_timeout == 0:
            self._wait_timeout = float('inf')

        # Available / idle connections stored in a heap, sorted oldest first.
        self._connections = []

        # Mapping of connection id to PoolConnection. Ordinarily we would want
        # to use something like a WeakKeyDictionary, but Python typically won't
        # allow us to create weak references to connection objects.
        self._in_use = {}

        # Use the memory address of the connection as the key in the event the
        # connection object is not hashable. Connections will not get
        # garbage-collected, however, because a reference to them will persist
        # in "_in_use" as long as the conn has not been closed.
        self.conn_key = id

        super(PooledDatabase, self).__init__(database, **kwargs)

    def _connect(self):
        while True:
            try:
                # Remove the oldest connection from the heap.
                ts, conn = heapq.heappop(self._connections)
                key = self.conn_key(conn)
            except IndexError:
                ts = conn = None
                logger.debug('No connection available in pool.')
                break
            else:
                if self._is_closed(conn):
                    # This connection was closed, but since it was not stale
                    # it got added back to the queue of available conns. We
                    # then closed it and marked it as explicitly closed, so
                    # it's safe to throw it away now.
                    # (Because Database.close() calls Database._close()).
                    logger.debug('Connection %s was closed.', key)
                    ts = conn = None
                elif self._stale_timeout and self._is_stale(ts):
                    # If we are attempting to check out a stale connection,
                    # then close it. We don't need to mark it in the "closed"
                    # set, because it is not in the list of available conns
                    # anymore.
                    logger.debug('Connection %s was stale, closing.', key)
                    self._close(conn, True)
                    ts = conn = None
                else:
                    break

        if conn is None:
            if self._max_connections and (
                    len(self._in_use) >= self._max_connections):
                raise MaxConnectionsExceeded('Exceeded maximum connections.')
            conn = super(PooledDatabase, self)._connect()
            ts = time.time() - random.random() / 1000
            key = self.conn_key(conn)
            logger.debug('Created new connection %s.', key)

        self._in_use[key] = PoolConnection(ts, conn, time.time())
        return conn

    def _close(self, conn, close_conn=False):
        key = self.conn_key(conn)
        if close_conn:
            super(PooledDatabase, self)._close(conn)
        elif key in self._in_use:
            pool_conn = self._in_use.pop(key)
            if self._stale_timeout and self._is_stale(pool_conn.timestamp):
                logger.debug('Closing stale connection %s.', key)
                super(PooledDatabase, self)._close(conn)
            elif self._can_reuse(conn):
                logger.debug('Returning %s to pool.', key)
                heapq.heappush(self._connections, (pool_conn.timestamp, conn))
            else:
                logger.debug('Closed %s.', key)


class PooledMySQLDatabase(PooledDatabase, MySQLDatabase):
    def _is_closed(self, conn):
        try:
            conn.ping(False)
        except:
            return True
        else:
            return False
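PooledMySQLDatabase treats a connection as closed when conn.ping(False) raises. The same liveness check for Python's built-in sqlite3 module, which has no ping(), could look like the sketch below. is_closed is a hypothetical helper, and running a trivial SELECT 1 in place of ping() is an assumption of this sketch.

```python
import sqlite3


def is_closed(conn):
    """Return True if `conn` can no longer be used.

    Hypothetical helper: sqlite3 connections have no ping(), so a trivial
    query stands in for MySQL's conn.ping(False) here.
    """
    try:
        conn.execute("SELECT 1")
    except sqlite3.Error:   # a closed connection raises sqlite3.ProgrammingError
        return True
    return False


c = sqlite3.connect(":memory:")
print(is_closed(c))   # False
c.close()
print(is_closed(c))   # True
```

Unlike the bare `except:` in the peewee source, this version catches only sqlite3.Error, so unrelated exceptions such as KeyboardInterrupt are not mistaken for a dead connection.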

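self._connections = []: holds the idle DB connections as a priority queue (a heap of (timestamp, conn) tuples, oldest first)

self._in_use = {}: holds the DB connections currently checked out, as a dict keyed by id(conn)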
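The heap behaviour of self._connections can be checked with Python's heapq module. FakeConn is a hypothetical stand-in for a driver connection. The line `ts = time.time() - random.random() / 1000` from the source is copied as-is. One reading, not stated in the source, is that this jitter keeps two timestamps from tying: on a tie, the heap would fall back to comparing connection objects, which usually define no ordering.

```python
import heapq
import random
import time


class FakeConn:
    """Hypothetical stand-in for a driver connection; it defines no ordering."""


idle = []            # plays the role of self._connections
base = time.time()
for age in (5.0, 20.0, 10.0):
    # Same jitter as the peewee source: subtract up to 1 ms of randomness.
    ts = base - age - random.random() / 1000
    heapq.heappush(idle, (ts, FakeConn()))

oldest_ts, conn = heapq.heappop(idle)   # smallest timestamp == oldest connection
print(round(base - oldest_ts))          # 20


def push_with_tie():
    heap = []
    heapq.heappush(heap, (1.0, FakeConn()))
    # Equal timestamps force Python to compare the FakeConn objects themselves.
    heapq.heappush(heap, (1.0, FakeConn()))


try:
    push_with_tie()
except TypeError as exc:
    print(type(exc).__name__)           # TypeError
```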

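  • First, pop the oldest idle connection from the idle queue

  • If no idle connection is available, create a new DB connection and record it in self._in_use; if the number of in-use connections has already reached max_connections, raise MaxConnectionsExceeded instead

  • If an idle connection is available:

    • Check whether it is still usable (for PooledMySQLDatabase, via ping()); if it has been closed, discard it and try the next idle connection, creating a new DB connection only when no usable one remains

    • Check whether it is stale: if its age, measured from when it was created, exceeds stale_timeout, close it and likewise move on to the next idle connection or create a new one

    • Otherwise, record the idle connection in self._in_use and return it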
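The checkout steps above can be modelled in a simplified, stdlib-only re-implementation. This is not peewee's code. CheckoutSketch, FakeConn, checkout() and checkin() are hypothetical. Two changes are deliberate: a sequence counter replaces peewee's random jitter as the heap tie-breaker, and the current time can be passed in so the behaviour is deterministic.

```python
import heapq
import itertools
import time


class MaxConnectionsExceeded(Exception):
    pass


class FakeConn:
    """Hypothetical connection with an id and a closed flag."""
    _ids = itertools.count()

    def __init__(self):
        self.id = next(FakeConn._ids)
        self.closed = False


class CheckoutSketch:
    """Simplified model of the checkout steps above; not peewee's code."""

    def __init__(self, max_connections=2, stale_timeout=None):
        self.max_connections = max_connections
        self.stale_timeout = stale_timeout
        self.idle = []                 # heap of (created_ts, seq, conn), oldest first
        self.in_use = {}               # conn.id -> (created_ts, conn)
        self._seq = itertools.count()  # tie-breaker instead of peewee's random jitter

    def checkout(self, now=None):
        now = time.time() if now is None else now
        conn = ts = None
        while self.idle:
            ts, _, candidate = heapq.heappop(self.idle)
            if candidate.closed:
                continue                          # dead: discard, try the next one
            if self.stale_timeout and now - ts > self.stale_timeout:
                candidate.closed = True           # stale: close, try the next one
                continue
            conn = candidate                      # usable idle connection
            break
        if conn is None:
            if self.max_connections and len(self.in_use) >= self.max_connections:
                raise MaxConnectionsExceeded("Exceeded maximum connections.")
            conn, ts = FakeConn(), now            # no usable idle conn: create one
        self.in_use[conn.id] = (ts, conn)
        return conn

    def checkin(self, conn):
        # Back onto the heap with the original creation timestamp, as peewee's _close() does.
        ts, _ = self.in_use.pop(conn.id)
        heapq.heappush(self.idle, (ts, next(self._seq), conn))
```

For example, with stale_timeout=60, a connection created at t=10 and checked in is reused at t=20 but closed and replaced at t=1000.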


From the analysis above, _connect() never moves a connection out of self._in_use and back into the self._connections idle queue.


The enqueue operation turns up in _close():

heapq.heappush(self._connections, (pool_conn.timestamp, conn))

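So a connection goes back into the idle queue only when it is closed. Our code must therefore close connections explicitly (for example with db.close(), which calls _close()); otherwise MaxConnectionsExceeded will soon be raised.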
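The consequence is easy to reproduce with a toy model. MiniPool, handle_request() and requests_served() are hypothetical, and plain object() instances stand in for real connections. As in peewee, close() is the only path back to the idle queue, so requests that never close their connection exhaust a 3-connection pool after three requests.

```python
class MaxConnectionsExceeded(Exception):
    pass


class MiniPool:
    """Hypothetical minimal pool: connections return to `idle` only via close()."""

    def __init__(self, max_connections):
        self.max_connections = max_connections
        self.idle = []
        self.in_use = set()

    def connect(self):
        if self.idle:
            conn = self.idle.pop()
        elif len(self.in_use) >= self.max_connections:
            raise MaxConnectionsExceeded("Exceeded maximum connections.")
        else:
            conn = object()              # stand-in for a real DB connection
        self.in_use.add(conn)
        return conn

    def close(self, conn):
        self.in_use.discard(conn)
        self.idle.append(conn)           # the only path back to the idle queue


def handle_request(pool, release):
    conn = pool.connect()
    try:
        pass                             # ... run queries with conn ...
    finally:
        if release:
            pool.close(conn)


def requests_served(release, attempts=10):
    pool = MiniPool(max_connections=3)
    served = 0
    for _ in range(attempts):
        try:
            handle_request(pool, release)
        except MaxConnectionsExceeded:
            break
        served += 1
    return served


print(requests_served(release=False))   # 3
print(requests_served(release=True))    # 10
```

Closing in a finally clause means the connection is returned even if a query raises.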


How do we close connections explicitly?

If you want to manage transactions separately, you can use the Database.connection_context() context manager.
>>> with db.connection_context():
...     # db connection is open.
...     pass
...
>>> db.is_closed()  # db connection is closed.
True

The connection_context() method can also be used as a decorator:

@db.connection_context()
def prepare_database():
    # DB connection will be managed by the decorator, which opens
    # a connection, calls function, and closes upon returning.
    db.create_tables(MODELS)  # Create schema.
    load_fixture_data(db)
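The pattern behind connection_context() (take a connection for the block, always give it back) can be mimicked with the standard library's contextlib. This sketch does not show peewee's implementation. SimplePool and connection_context here are hypothetical, and sqlite3 stands in for the real driver. Objects produced by contextlib.contextmanager are also usable as decorators, which mirrors the second form above.

```python
import sqlite3
from contextlib import contextmanager


class SimplePool:
    """Hypothetical pool with explicit acquire/release (not peewee's API)."""

    def __init__(self, database):
        self.database = database
        self.idle = []
        self.checked_out = 0

    def acquire(self):
        self.checked_out += 1
        return self.idle.pop() if self.idle else sqlite3.connect(self.database)

    def release(self, conn):
        self.checked_out -= 1
        self.idle.append(conn)


pool = SimplePool(":memory:")


@contextmanager
def connection_context():
    """Check a connection out for the block, and always hand it back."""
    conn = pool.acquire()
    try:
        yield conn
    finally:
        pool.release(conn)      # runs even if the block raises


# As a context manager:
with connection_context() as conn:
    conn.execute("SELECT 1")


# As a decorator: a fresh context is entered on every call,
# so the connection is held only while the function runs.
@connection_context()
def prepare_database():
    return pool.checked_out


print(prepare_database(), pool.checked_out)   # 1 0
```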