我试图与PyGreSQL和多进程并行运行多个查询,但是下面的代码挂起而不返回:
from pg import DB
from multiprocessing import Pool
from functools import partial
def create_query(table_name):
return f"""create table {table_name} (id integer);
CREATE INDEX ON {table_name} USING BTREE (id);"""
my_queries = [ create_query('foo'), create_query('bar'), create_query('baz') ]
def execute_query(conn_string, query):
con = DB(conn_string)
con.query(query)
con.close()
rs_conn_string = "host=localhost port=5432 dbname=postgres user=postgres password="
pool = Pool(processes=len(my_queries))
pool.map(partial(execute_query,rs_conn_string), my_queries)有什么办法让它起作用吗?另外,在一个查询失败而另一个查询被回滚的情况下,是否有可能在同一个“事务”中运行3个查询?
发布于 2020-05-30 22:43:18
一个明显的问题是,您总是运行pool.map,不仅是在主进程中,而且在并行子进程中使用的解释器导入脚本时也是如此。你应该这样做,而不是:
def run_all():
with Pool(processes=len(my_queries)) as pool:
pool.map(partial(execute_query,rs_conn_string), my_queries)
if __name__ == '__main__':
run_all()关于您的第二个问题,这是不可能的,因为事务是每个连接,如果您这样做的话,事务在单独的进程中存在。
发布于 2022-03-04 13:43:14
PyGreSql使用connection.poll()方法添加了异步。至于池,我喜欢覆盖MySQL.connectors池包装器来处理pgdb连接对象。有几个“可选的”连接方法调用会失败,您必须注释掉这些调用(例如,检查连接状态等等,如果需要,可以在Pgdb连接对象级别上实现这些调用,但是调用与MySQL.connectors接口不匹配)。可能有一些低级的bug,因为lib只是抽象的,但是这个解决方案已经运行了几个月了,没有任何问题。
https://stackoverflow.com/questions/62092577
复制相似问题