首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在类中使用multiprocess.Pool.map

在类中使用multiprocess.Pool.map
EN

Stack Overflow用户
提问于 2021-11-24 08:39:06
回答 2查看 825关注 0票数 0
代码语言:javascript
复制
from multiprocessing import Pool

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        result = pool.map(self.run, [1]*30)
        pool.close()
        pool.join()

    def run(self, i):
        self.count += i
        return self.count

a = Acc()
a.multiprocess()
print(a.count)

我想输出应该是30,但它是。我不知道multiprocess.Pool.map是如何工作的,它是如何与类协作的。请详细告诉我。

顺便说一下,如果我在里面打印self.count

代码语言:javascript
复制
    def run(self, i):
        print(self.count)
        self.count += i
        return self.count

它给

代码语言:javascript
复制
0
1
0
1
00

1
10

1
00

11

0
1
00

1001



11

0
10

10

1

更令人困惑的是,为什么有混合0和1。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2021-11-24 12:30:22

首先,通过将flush=True添加到print语句中,使打印输出更加有序,以便每个打印输出占据自己的行:

代码语言:javascript
复制
from multiprocessing import Pool

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        result = pool.map(self.run, [1]*30)
        pool.close()
        pool.join()

    def run(self, i):
        print('i =', self.count, flush=True)
        self.count += i
        return self.count

if __name__ == '__main__':
    a = Acc()
    a.multiprocess()
    print('a.count =', a.count)

指纹:

代码语言:javascript
复制
i = 0
i = 1
i = 0
i = 1
i = 0
i = 1
i = 0
i = 0
i = 1
i = 0
i = 1
i = 0
i = 1
i = 0
i = 0
i = 1
i = 0
i = 1
i = 1
i = 0
i = 1
i = 0
i = 0
i = 1
i = 0
i = 1
i = 1
i = 0
i = 1
i = 1
a.count = 0

分析

现在让我们分析一下正在发生的事情。a = Acc()的创建是由主进程完成的。正在执行的多处理池进程是一个不同的地址空间,因此当它们执行辅助函数self.run时,对象a必须序列化/反序列化到将执行辅助函数的进程的地址空间。在这个新的地址空间中,self.count的初始值为0,这个值是打印出来的,然后增加到1并返回。同时,对象a正在被序列化/反序列化3次,因此其他3个进程也可以进行相同的处理,它们也将打印0并返回值1。但是由于所有这些增量都发生在存在于主进程地址空间以外的地址空间中的a副本上,所以主进程中的原始a保持不变。因此,当map函数继续执行并且a进一步从主进程复制到处理池时,它总是与self.count = 0一起执行。

那么问题是为什么i = 1有时会被打印而不是i = 0呢?

当您像这里一样使用可迭代指定30个元素来执行map时,默认情况下,这30个任务将根据您提供的块大小参数划分为“块”。由于我们采用了默认的chunksize=None,所以map函数根据可迭代长度和池大小计算默认的chunksize值:

代码语言:javascript
复制
chunksize, remainder = divmod(len(iterable), 4 * pool_size)
if remainder:
    chunksize += 1

在这种情况下,池大小为4,因此chunksize将被计算为2。这意味着多处理池中的每个进程一次处理两个任务队列的任务,因此他们用不同的i值处理同一个对象两次(忽略了这个值)。

如果我们指定一个1的块大小,以便每个进程一次只处理一个对象,那么我们有:

代码语言:javascript
复制
from multiprocessing import Pool

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        result = pool.map(self.run, [1]*30, chunksize=1)
        pool.close()
        pool.join()

    def run(self, i):
        print('i =', self.count, flush=True)
        self.count += i
        return self.count

if __name__ == '__main__':
    a = Acc()
    a.multiprocess()
    print('a.count =', a.count)

指纹;

代码语言:javascript
复制
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
i = 0
a.count = 0

如果我们指定的块大小为30,以便单个进程针对单个对象处理所有任务:

代码语言:javascript
复制
from multiprocessing import Pool

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        result = pool.map(self.run, [1]*30, chunksize=30)
        pool.close()
        pool.join()

    def run(self, i):
        print('i =', self.count, flush=True)
        self.count += i
        return self.count

if __name__ == '__main__':
    a = Acc()
    a.multiprocess()
    print('a.count =', a.count)

指纹:

代码语言:javascript
复制
i = 0
i = 1
i = 2
i = 3
i = 4
i = 5
i = 6
i = 7
i = 8
i = 9
i = 10
i = 11
i = 12
i = 13
i = 14
i = 15
i = 16
i = 17
i = 18
i = 19
i = 20
i = 21
i = 22
i = 23
i = 24
i = 25
i = 26
i = 27
i = 28
i = 29
a.count = 0

当然,在最后一种情况下,没有发生多处理,因为多处理池的单个进程处理了所有提交的任务。

票数 0
EN

Stack Overflow用户

发布于 2021-11-24 10:25:52

如果强制使用多重处理,我将遵循以下方法。由于我们希望并行运行代码,所以我不希望在map中传递实例方法。

我将把run转换成一个函数而不是一个方法。它将接受和争论,并返回同样的。

代码语言:javascript
复制
def run(i):
    return i

然后在multiprocess方法中,循环pool.map的返回值,然后将其添加到self.count中。

代码语言:javascript
复制
def multiprocess(self):
    pool = Pool(processes=4)
    for r_value in pool.map(run, [1]*30):
        self.count += r_value
    pool.close()
    pool.join()

它将输出作为

代码语言:javascript
复制
30

Process finished with exit code 0

完整代码:

代码语言:javascript
复制
from multiprocessing import Pool

def run(i):
    return i

class Acc:
    def __init__(self):
        self.count = 0

    def multiprocess(self):
        pool = Pool(processes=4)
        for r_value in pool.map(run, [1]*30):
            self.count += r_value
        pool.close()
        pool.join()



if __name__ =='__main__':
    a = Acc()
    a.multiprocess()
    print(a.count)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70092963

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档