Python 中创建 PostgreSQL 数据库连接池
征战 Java 多年,习惯于使用数据库之前都必须创建一个连接池,即使是单线程的应用,只要有多个方法中需用到数据库连接,建立一两个连接的也会考虑先池化他们。连接池的好处多多,1) 如果反复创建连接相当耗时,2) 对于单个连接一路用到底的应用,有连接池时避免了数据库连接对象传来传去,3) 忘记关连接了,连接池幸许还能帮忙在一定时长后关掉,当然密集取连接的应用势将耗尽连接,3) 一个应用打开连接的数量是可控的
接触到 Python 后,在使用 PostgreSQL 也自然而然的考虑创建连接池,使用时从池中取,用完后还回去,而不是每次需要连接时创建一个物理的。Python 连接 PostgreSQL 是主要有两个包,py-postgresql 和 psycopg2, 而本文的实例将使用后者。
Psycopg 在 psycopg2.pool 模块中提供了两个连接池的实现在,它们都继承自 psycopg2.pool.AbstractConnectionPool, 该抽象类的基本方法是
两个连接池的实现类是
所以最安全保险的做法还是使用 ThreadedConnectionPool, 在单线程应用中, SimpleConnectionPool 也不见得比 ThreadedConnectionPool 效率高多少。
下面来看一个具体的连接池实现,其中用到了 Context Manager, 使用时结合
db_helper.py
几点说明:
使用方式
如果不用事物
如果需要用到事物
在写作本文时,查看 psycopg 的官网时,发现 Psycopg 3.0 正式版在 2021-10-13 日发布了(Psycopg 3.0 released), 更好的支持 async。在 Psycopg2 2.2 版本时就开始支持异步了。而且还注意到 Psycopg 的主要部分是用 C 实现的,才使得它效率比较高,也难怪经常用
在创建连接池时加上参数 keepalivesXxx 能让服务器及时断掉死链接,否则在 Linux 下默认要 2 个小时后才断开。死链接的情况发生在客户端异常退出(如断电)时先前建立的链接就变为死链接了。
[2022-05-04] 小小改进
根据条件来返回 cursor 或 (cursor, connection), 使用的时候就不需要
链接:
[版权声明]
本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。
接触到 Python 后,在使用 PostgreSQL 也自然而然的考虑创建连接池,使用时从池中取,用完后还回去,而不是每次需要连接时创建一个物理的。Python 连接 PostgreSQL 是主要有两个包,py-postgresql 和 psycopg2, 而本文的实例将使用后者。
Psycopg 在 psycopg2.pool 模块中提供了两个连接池的实现在,它们都继承自 psycopg2.pool.AbstractConnectionPool, 该抽象类的基本方法是
- getconn(key=None): 获取连接
- putconn(conn, key=None, close=False): 归还连接
- closeall(): 关闭连接池中的所有连接
两个连接池的实现类是
- psycopg2.pool.SimpleConnectionPool(minconn, maxconn, *args, **kwars): 给单线程应用用的
- psycopg2.pool.ThreadedConnectionPool(minconn, maxconn, *args, **kwars): 多线程时更安全,其实就是在 getconn() 和 putconn() 时加了锁来控制
所以最安全保险的做法还是使用 ThreadedConnectionPool, 在单线程应用中, SimpleConnectionPool 也不见得比 ThreadedConnectionPool 效率高多少。
下面来看一个具体的连接池实现,其中用到了 Context Manager, 使用时结合
with 键字更方便,用完后不用显式的调用 putconn() 归还连接db_helper.py
1from psycopg2 import pool
2from psycopg2.extras import RealDictCursor
3from contextlib import contextmanager
4import atexit
5
6
7class DBHelper:
8 def __init__(self):
9 self._connection_pool = None
10
11 def initialize_connection_pool(self):
12 db_dsn = 'postgresql://admin:password@localhost/testdb?connect_timeout=5'
13 self._connection_pool = pool.ThreadedConnectionPool(1, 3,db_dsn)
14
15 @contextmanager
16 def get_resource(self, autocommit=True):
17 if self._connection_pool is None:
18 self.initialize_connection_pool()
19
20 conn = self._connection_pool.getconn()
21 conn.autocommit = autocommit
22 cursor = conn.cursor(cursor_factory=RealDictCursor)
23 try:
24 yield cursor, conn
25 finally:
26 cursor.close()
27 self._connection_pool.putconn(conn)
28
29 def shutdown_connection_pool(self):
30 if self._connection_pool is not None:
31 self._connection_pool.closeall()
32
33
34db_helper = DBHelper()
35
36
37@atexit.register
38def shutdown_connection_pool():
39 db_helper.shutdown_connection_pool()几点说明:
- 只在第一次调用
get_resource()时创建连接池,而不是在from db_helper import db_helper引用时就创建连接池 - Context Manager 返回了两个对象,cursor 和 connection, 需要用 connection 管理事物时用它
- 默认时 cursor 返回的记录是字典,而非数组
- 默认时连接为自动提交
- 最后的 @atexit.register 那个 ShutdownHook 可能有点多余,在进程退出时连接也被关闭,TIME_WAIT 时间应该会稍长些
使用方式
如果不用事物
1from db_helper import db_helper
2
3
4with db_helper.get_resource() as (cursor, _):
5 cursor.execute('select * from users')
6 for record in cursor.fetchall():
7 ... process record, record['name'] ... 如果需要用到事物
1with db_helper.get_resource(autocommit=False) as (cursor, _):
2 try:
3 cursor.execute('update users set name = %s where id = %s', ('new_name', 1))
4 cursor.execute('delete from orders where user_id = %s', (1,))
5 conn.commit()
6 except:
7 conn.rollback()在写作本文时,查看 psycopg 的官网时,发现 Psycopg 3.0 正式版在 2021-10-13 日发布了(Psycopg 3.0 released), 更好的支持 async。在 Psycopg2 2.2 版本时就开始支持异步了。而且还注意到 Psycopg 的主要部分是用 C 实现的,才使得它效率比较高,也难怪经常用
pip install psycopg2 安装不成功,而要用 pip install psycopg2-binary 来安装的原因。在创建连接池时加上参数 keepalivesXxx 能让服务器及时断掉死链接,否则在 Linux 下默认要 2 个小时后才断开。死链接的情况发生在客户端异常退出(如断电)时先前建立的链接就变为死链接了。
pool.ThreadedConnectionPool(1, 3, db_dsn, keepalives=1, keepalives_idle=30, keepalives_interval=10, keepalives_count=5)PostgreSQL 服务端会对连接在空闲 tcp_keepalives_idle 秒后,主动发送tcp_keepalives_count 个 tcp_keeplive 侦测包,每个侦探包在 tcp_keepalives_interval 秒内都没有回应,就认为是死连接,于是切断它。
[2022-05-04] 小小改进
1@contextmanager
2def get_resource(self, autocommit=True) -> Union[RealDictCursor, tuple[RealDictCursor, connection]]:
3 ......
4 try:
5 if autocommit:
6 yield cursor
7 else:
8 yield cursor, conn
9 ......根据条件来返回 cursor 或 (cursor, connection), 使用的时候就不需要
_ 了1with get_resource() as cursor:
2 ......
3
4with get_resource(false) as (cursor, connection):
5 ......链接:
- PostgreSql连接池(psycopg2.pool)
- Should PostgreSQL connections be pooled in a Python web app, or create a new connection per request?
- Simple Connection Pooling with psycopg2
[版权声明]
本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。