首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >多处理循环

多处理循环
EN

Stack Overflow用户
提问于 2022-10-24 19:11:23
回答 1查看 56关注 0票数 0

我有一个脚本,可以循环处理熊猫的数据,并根据一些搜索和几何操作将地理信息系统数据输出到地质公园。当我使用for循环时,它可以工作,但是对于超过4k的记录,它需要一段时间。由于它是作为自己的函数构建的,它基于行迭代返回所需的内容,因此我尝试使用以下多进程运行它:

代码语言:javascript
复制
import pandas as pd, bwe_mapping

from multiprocessing import Pool
    
#Sample dataframe
bwes = [['id', 7216],['item_id', 3277841], ['Date', '2019-01-04T00:00:00.000Z'], ['start_lat', -56.92], ['start_lon', 45.87], ['End_lat', -59.87], ['End_lon', 44.67]]

bwedf = pd.read_csv(bwes)

geopackage = "datalocation\geopackage.gpkg"
tracklayer = "tracks"

if __name__=='__main__':
    def task(item):
        bwe_mapping.map_bwe(item, geopackage, tracklayer)
    pool = Pool()
    for index, row in bwedf.iterrows():
        task(row)
    with Pool() as pool:
         for results in pool.imap_unordered(task, bwedf.iterrows()):
            print(results)

当我运行这个任务时,我的任务管理器会填充16个新的python任务,但没有迹象表明正在执行任何操作。如果使用numpy.array.split()将我的熊猫df分解成4或8个较小的,并在bwedf.iterrows()中运行for,行:对于它自己的处理器上的每个数据,会更好吗?不需要按任何顺序进行任何处理;只要我能够将输出(即位势数据)存储到一个列表中,在最后连接到地质层。我是否应该将for循环放在函数中,并将整个数据帧和gis数据传递给它搜索呢?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-10-24 20:43:12

如果在windows/macOS上运行,那么它将使用spawn来创建工作人员,这意味着任何子都必须在导入主脚本时找到要执行的函数。

您的代码在您的if __name__=='__main__':中有函数定义,因此子程序无法访问它。

只需将函数def移到if __name__=='__main__':之前即可使其工作。

正在发生的情况是,每个子程序在尝试运行一个函数时都会崩溃,因为它从未看到它的定义。

重现问题的最小代码:

代码语言:javascript
复制
from multiprocessing import Pool

if __name__ == '__main__':
    def task(item):
        print(item)
        return item

    pool = Pool()

    with Pool() as pool:
        for results in pool.imap_unordered(task, range(10)):
            print(results)

解决方案是将函数定义移到if __name__=='__main__':行之前。

编辑:现在要在dataframe中的行上迭代,这个简单的示例演示了如何进行迭代,注意迭代行返回一个索引和一个行,这就是为什么它是解压缩的。

代码语言:javascript
复制
import os
import pandas as pd
from multiprocessing import Pool
import time

# Sample dataframe
bwes = [['id', 7216], ['item_id', 3277841], ['Date', '2019-01-04T00:00:00.000Z'], ['start_lat', -56.92],
        ['start_lon', 45.87], ['End_lat', -59.87], ['End_lon', 44.67]]

bwef = pd.DataFrame(bwes)

def task(item):
    time.sleep(1)
    index, row = item
    # print(os.getpid(), tuple(row))
    return str(os.getpid()) + " " + str(tuple(row))


if __name__ == '__main__':

    with Pool() as pool:
        for results in pool.imap_unordered(task, bwef.iterrows()):
            print(results)

time.sleep(1)只存在于其中,因为只有少量的工作,一个工人可能会抢占所有的工作,所以我强迫每个工人等待其他的工作,您应该删除它,结果如下:

代码语言:javascript
复制
13228 ('id', 7216)
11376 ('item_id', 3277841)
15580 ('Date', '2019-01-04T00:00:00.000Z')
10712 ('start_lat', -56.92)
11376 ('End_lat', -59.87)
13228 ('start_lon', 45.87)
10712 ('End_lon', 44.67)

您的“示例”数据have似乎已被转换,但您只需正确构造dataframe,我建议您在跨多个核运行它之前,先使用迭代行顺序运行代码。

显然,将数据发送给工人并从他们那里返回需要时间,所以确保每个工作人员都在做大量的计算工作,而不仅仅是将其发送回父进程。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/74185743

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档