我的正常脚本在20秒内处理了大约30,000条记录。考虑到我必须处理的数据量(超过5000万条记录),我认为使用python的多处理是明智的。
在我的过程结束时,我使用sqlalchemy core更新数据库,其中我以50,000条为一批来更新处理过的记录。SQLAlchemy Core requires that you pass it a list for it to do a bulk update or even insert。我将这个列表命名为py_list
对于Python的多进程,我将通过一个我称之为mp_list的multiprocessing.manager.list()来捕获进程的结果。
在我将mp_list传递给SQLAlchemy bulk update语句之前,一切都很正常。此操作将失败,并显示错误AttributeError: 'list' object has no attribute 'keys'。Googling将我带到一个question on SO,它指出multiprocessing.manager.list()甚至multiprocessing.manager.dict()是/不是真正的python列表/字典。
那么问题是,如何将multiprocessing.manager.list转换为真正的python列表。
mp_list的填充方式如下:
import multiprocessing
manager = multiprocessing.Manager()
mp_list = manager.list()
def populate_mp_list(pid, is_processed):
'''Mark the record as having been processed'''
dict = {}
dict['b_id'] = pid
dict['is_processed'] = is_processed
mp_list.append(dict)抛出错误的SQLALchemy代码如下:
CONN = Engine.connect()
trans = CONN.begin()
stmt = mytable.update().where(mytable.c.id == bindparam('b_id')).\
values(is_processed=bindparam('is_processed'))
CONN.execute(stmt, mp_list)
trans.commit(我尝试过将mp_list转换为真正的python列表。创建的新列表可以工作,但其创建的时间损失会抵消多处理中节省的所有时间。
如果我对返回的mp_list进行循环并创建一个新列表。
y = []
for x in mp_list:
y.append(x)另外,如果我对mp_list进行“复制”,每次复制都会增加3秒!平均罚点球,这并不酷。
y = mp_list[0:len(mp_list)]那么,将multiprocessing.manager.list转换为SQLAlchemy核心可用的列表的最快方法是什么?
发布于 2019-05-14 11:57:30
希望我没迟到。
这不管用吗?
pythonlist = list(mp_list)同样的事情也适用于dict:-
pythondict = dict(mp_dict)发布于 2013-12-19 05:53:00
以下各项的性能如何:
Y=x,表示mp_list中的x
发布于 2019-07-04 18:34:04
使用list可以获得简单的解决方案。
result_list = list(proxy_list)https://stackoverflow.com/questions/20664695
复制相似问题