Problem when emptying a queue

Stack Overflow user
Asked 2021-04-15 11:49:43

Question

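I'm new to multiprocessing, and everything I've tried has gotten me nowhere. Every time I think I've figured something out, I hit a new obstacle. My goal is to load a queue using multiple processes, then use multiple processes to pull data from the queue and process it. I tried going back to basic queue handling, but as soon as I introduce multiple processes I can't get anything out of the queue. What am I missing?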

Code

from multiprocessing import Process, Lock
from queue import Queue
import os

q = Queue(5)


def get_from_q():
    print('trying to get')
    print(q.get())


if __name__ == '__main__':

    # put items at the end of the queue
    for x in range(6):
        print('adding ', x)
        q.put(x)

    PROCESSOR_COUNT = os.cpu_count()
    processes = []
    for p in range(PROCESSOR_COUNT):
        print('spawning process')
        p = Process(target=get_from_q)
        processes.append(p)

    for p in processes:
        print('starting')
        p.start()

    for p in processes:
        print('joining')
        p.join()

Result:

    adding 0
    adding 1
    adding 2
    adding 3
    adding 4
    adding 5

Expected result:

    adding 0
    adding 1
    adding 2
    adding 3
    adding 4
    adding 5
    spawning process
    spawning process
    spawning process
    spawning process
    starting
    starting
    starting
    starting
    trying to get 
    0
    trying to get 
    1 
    trying to get 
    2 
    trying to get 
    3 
    trying to get 
    4
    trying to get 
    5
    joining
    joining
    joining
    joining

1 Answer

Stack Overflow user

Answered 2021-04-16 01:16:32

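If you are running on a platform that uses spawn to create new processes, a new process does not inherit the main process's address space. Instead, its address space is initialized by re-executing the program's module-level code from the top (everything outside the if __name__ == '__main__': block). This means anything you define at global scope is executed again, for example, in your code: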

q = Queue(5)

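Every process you create executes this line, so each process ends up with its own copy of q. That cannot work. You need to create q once and pass it to each process as an argument. Note that the code below also imports Queue from multiprocessing rather than from queue, because queue.Queue is meant for threads inside a single process. I also added flush=True to the print calls to reduce the chance of output from different processes being interleaved.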

from multiprocessing import Process, Lock, Queue
import os


def get_from_q(q):
    print('trying to get', q.get(), flush=True)


if __name__ == '__main__':
    PROCESSOR_COUNT = os.cpu_count()

    q = Queue(PROCESSOR_COUNT) # or put no size limitation on this

    # put items at the end of the queue
    for x in range(PROCESSOR_COUNT):
        print('adding ', x)
        q.put(x)

    processes = []
    for p in range(PROCESSOR_COUNT):
        print('spawning process')
        p = Process(target=get_from_q, args=(q,))
        processes.append(p)

    for p in processes:
        print('starting', flush=True)
        p.start()

    for p in processes:
        print('joining', flush=True)
        p.join()

Prints:

adding  0
adding  1
adding  2
adding  3
adding  4
adding  5
adding  6
adding  7
spawning process
spawning process
spawning process
spawning process
spawning process
spawning process
spawning process
spawning process
starting
starting
starting
starting
starting
starting
starting
starting
joining
trying to get 0
trying to get 1
trying to get 2
trying to get 3
trying to get 4
trying to get 5
trying to get 6
joining
joining
joining
trying to get 7
joining
joining
joining
joining
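
A separate detail likely explains why the question's output stops right after `adding 5`: that queue was created with a capacity of 5, and `put()` blocks by default once a bounded queue is full, so the sixth `put()` waits forever because no consumer has been started yet. The sketch below illustrates this with the standard-library `queue.Queue`. `fill_bounded_queue` is a hypothetical helper name, and it uses `put_nowait()` only so that the demonstration fails fast instead of hanging.

```python
import queue


def fill_bounded_queue(capacity, item_count):
    """Put items into a bounded queue; return how many fit before it was full.

    Hypothetical helper, for illustration only.
    """
    q = queue.Queue(capacity)
    added = 0
    for x in range(item_count):
        try:
            # put_nowait() raises queue.Full instead of blocking.
            # A plain q.put(x) would block here forever with no consumer.
            q.put_nowait(x)
        except queue.Full:
            break
        added += 1
    return added


if __name__ == '__main__':
    print(fill_bounded_queue(5, 6))  # only 5 of the 6 items fit
```

The answer's version avoids this by sizing the queue to the number of items it puts; leaving the queue unbounded would work as well.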

Using a process pool

Here the queue is hidden by the pool implementation:

from multiprocessing import Pool, cpu_count


def worker(x):
    print('x =', x, flush=True)
    return x ** 2


if __name__ == '__main__':
    PROCESSOR_COUNT = cpu_count()

    # The with-block shuts the pool down once map() has returned.
    with Pool(PROCESSOR_COUNT) as pool:
        print(pool.map(worker, range(PROCESSOR_COUNT)))

Prints:

x = 0
x = 1
x = 2
x = 3
x = 4
x = 5
x = 6
x = 7
[0, 1, 4, 9, 16, 25, 36, 49]
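
For the goal stated in the question (several processes loading a queue and several others draining it), the queue can also stay explicit instead of being hidden inside a pool. The following is a minimal sketch, not the answerer's code: `producer`, `consumer`, `run`, the squaring step, and the `None` sentinel are all illustrative assumptions. Each consumer loops on `q.get()` until it receives a sentinel, and the parent sends one sentinel per consumer after all producers have finished.

```python
from multiprocessing import Process, Queue

SENTINEL = None  # assumed marker meaning "no more work"


def producer(q, start, count):
    # Load `count` consecutive integers into the shared queue.
    for x in range(start, start + count):
        q.put(x)


def consumer(q, results):
    # Pull items until the sentinel arrives, then stop.
    while True:
        item = q.get()
        if item is SENTINEL:
            break
        results.put(item * item)


def run(n_producers=2, n_consumers=2, items_each=3):
    q = Queue()        # created once in the parent and passed to every child
    results = Queue()
    producers = [Process(target=producer, args=(q, i * items_each, items_each))
                 for i in range(n_producers)]
    consumers = [Process(target=consumer, args=(q, results))
                 for _ in range(n_consumers)]
    for p in producers + consumers:
        p.start()
    for p in producers:
        p.join()
    # All real items are queued now; send one sentinel per consumer.
    for _ in consumers:
        q.put(SENTINEL)
    # Collect results before joining the consumers, so no consumer is left
    # waiting to flush data it still has buffered for the results queue.
    total = n_producers * items_each
    collected = sorted(results.get() for _ in range(total))
    for c in consumers:
        c.join()
    return collected


if __name__ == '__main__':
    print(run())  # [0, 1, 4, 9, 16, 25]
```

The results are sorted because the order in which consumers finish individual items is not deterministic.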
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/67102004
