我有一个脚本,可以循环处理熊猫的数据,并根据一些搜索和几何操作将地理信息系统数据输出到地质公园。当我使用for循环时,它可以工作,但是对于超过4k的记录,它需要一段时间。由于它是作为自己的函数构建的,它基于行迭代返回所需的内容,因此我尝试使用以下多进程运行它:
import pandas as pd, bwe_mapping
from multiprocessing import Pool
#Sample dataframe
bwes = [['id', 7216],['item_id', 3277841], ['Date', '2019-01-04T00:00:00.000Z'], ['start_lat', -56.92], ['start_lon', 45.87], ['End_lat', -59.87], ['End_lon', 44.67]]
bwedf = pd.read_csv(bwes)
geopackage = "datalocation\geopackage.gpkg"
tracklayer = "tracks"
if __name__=='__main__':
def task(item):
bwe_mapping.map_bwe(item, geopackage, tracklayer)
pool = Pool()
for index, row in bwedf.iterrows():
task(row)
with Pool() as pool:
for results in pool.imap_unordered(task, bwedf.iterrows()):
print(results)当我运行这个任务时,我的任务管理器会填充16个新的python任务,但没有迹象表明正在执行任何操作。如果使用numpy.array.split()将我的熊猫df分解成4或8个较小的,并在bwedf.iterrows()中运行for,行:对于它自己的处理器上的每个数据,会更好吗?不需要按任何顺序进行任何处理;只要我能够将输出(即位势数据)存储到一个列表中,在最后连接到地质层。我是否应该将for循环放在函数中,并将整个数据帧和gis数据传递给它搜索呢?
发布于 2022-10-24 20:43:12
如果在windows/macOS上运行,那么它将使用spawn来创建工作人员,这意味着任何子都必须在导入主脚本时找到要执行的函数。
您的代码在您的if __name__=='__main__':中有函数定义,因此子程序无法访问它。
只需将函数def移到if __name__=='__main__':之前即可使其工作。
正在发生的情况是,每个子程序在尝试运行一个函数时都会崩溃,因为它从未看到它的定义。
重现问题的最小代码:
from multiprocessing import Pool
if __name__ == '__main__':
def task(item):
print(item)
return item
pool = Pool()
with Pool() as pool:
for results in pool.imap_unordered(task, range(10)):
print(results)解决方案是将函数定义移到if __name__=='__main__':行之前。
编辑:现在要在dataframe中的行上迭代,这个简单的示例演示了如何进行迭代,注意迭代行返回一个索引和一个行,这就是为什么它是解压缩的。
import os
import pandas as pd
from multiprocessing import Pool
import time
# Sample dataframe
bwes = [['id', 7216], ['item_id', 3277841], ['Date', '2019-01-04T00:00:00.000Z'], ['start_lat', -56.92],
['start_lon', 45.87], ['End_lat', -59.87], ['End_lon', 44.67]]
bwef = pd.DataFrame(bwes)
def task(item):
time.sleep(1)
index, row = item
# print(os.getpid(), tuple(row))
return str(os.getpid()) + " " + str(tuple(row))
if __name__ == '__main__':
with Pool() as pool:
for results in pool.imap_unordered(task, bwef.iterrows()):
print(results)time.sleep(1)只存在于其中,因为只有少量的工作,一个工人可能会抢占所有的工作,所以我强迫每个工人等待其他的工作,您应该删除它,结果如下:
13228 ('id', 7216)
11376 ('item_id', 3277841)
15580 ('Date', '2019-01-04T00:00:00.000Z')
10712 ('start_lat', -56.92)
11376 ('End_lat', -59.87)
13228 ('start_lon', 45.87)
10712 ('End_lon', 44.67)您的“示例”数据have似乎已被转换,但您只需正确构造dataframe,我建议您在跨多个核运行它之前,先使用迭代行顺序运行代码。
显然,将数据发送给工人并从他们那里返回需要时间,所以确保每个工作人员都在做大量的计算工作,而不仅仅是将其发送回父进程。
https://stackoverflow.com/questions/74185743
复制相似问题