首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >python多处理队列未正确并行化

python多处理队列未正确并行化
EN

Stack Overflow用户
提问于 2020-02-06 17:22:54
回答 1查看 132关注 0票数 0

我很难让我的代码正确地并行运行。我需要的是:

  • I有4种磁盘代码,我需要在一定的时间内进行进化,在离散时间步骤中,
  • ,我想在并行的

中进化每一种代码。

由于代码的性质以及我需要存储的数据,在尝试了进程池之后,我决定我最好的方法是使用队列。我试过两种不同的密码。在这个例子中,disk_codes只是数字,但在我的实际代码中,它们是另一个代码的单独实例,用于求解磁盘中的运动方程。

代码1:

代码语言:javascript
复制
import multiprocessing

try:
    import queue
except:
    import Queue as queue

def evolve_single_disk(queue, dt):
    print "Empty queue? ", queue.empty()
    code = queue.get()
    print "Evolving disk {0} in {1}".format(code, multiprocessing.current_process().name)
    queue.task_done()

if __name__ == '__main__':
    ncores = 4
    code_queue = queue.Queue()
    disk_codes = range(ncores)

    for disk in disk_codes:
        code_queue.put(disk)

    dt = 1
    t_end = 10
    t = 0

    # Evolve!
    while t < t_end:
        print "t=", t

        processes = multiprocessing.Process(target=evolve_single_disk, args=(code_queue, dt, ))

        processes.start()
        processes.join()

        disk_codes = code_queue.get()
        print "disk codes: ", disk_codes


        t += dt

这导致:

代码语言:javascript
复制
t= 0
Empty queue?  False
Evolving disk 0 in Process-1
disk codes:  0

t= 1
Empty queue?  False
Evolving disk 1 in Process-2
disk codes:  1

t= 2
Empty queue?  False
Evolving disk 2 in Process-3
disk codes:  2

t= 3
Empty queue?  False
Evolving disk 3 in Process-4
disk codes:  3

t= 4
Empty queue?  True

因此,在每一个时间步骤中,其中一个磁盘是“进化”的。这不是我想要的,因为我希望所有四个磁盘在同一时间步骤中并行进化。

然后我试了一下:

代码2:

代码语言:javascript
复制
import multiprocessing

try:
    import queue
except:
    import Queue as queue


def evolve_single_disk(queue, dt):
    print "Empty queue? ", queue.empty()
    code = queue.get()  
    print "Evolving disk {0} in {1}".format(code, multiprocessing.current_process().name)
    queue.task_done()


if __name__ == '__main__':
    ncores = 4
    code_queue = queue.Queue()
    disk_codes = range(ncores)

    for disk in disk_codes:
        code_queue.put(disk)

    dt = 1
    t_end = 10
    t = 0

    # Evolve!
    while t < t_end:
        print ""
        print "t=", t

        processes = [multiprocessing.Process(target=evolve_single_disk, args=(code_queue, dt, )) for x in range(ncores)]

        for p in processes:
            p.start()
            p.join()

        disk_codes = [code_queue.get() for p in processes]
        print "disk codes: ", disk_codes

        t += dt

其结果是:

代码语言:javascript
复制
t= 0
Empty queue?  False
Evolving disk 0 in Process-1
Empty queue?  False
Evolving disk 0 in Process-2
Empty queue?  False
Evolving disk 0 in Process-3
Empty queue?  False
Evolving disk 0 in Process-4
disk codes:  [0, 1, 2, 3]

t= 1
Empty queue?  True

...and,那么代码就挂起了。在这里,我在每个时间步骤中启动了4个进程,但是每个进程都接收到了完全相同的磁盘。

我如何才能正确地做到这一点,以便在每一时间步骤上有4个进程,并且每个进程都演化成一个磁盘?我读过文档和许多教程,所以我的问题/答案,但我仍然非常困惑。

编辑:

我尝试使用multiprocessing队列,但是当我尝试将磁盘代码放入队列时,我得到了一个TypeError。不幸的是,磁盘代码也是不可挑选的。当使用带有磁盘代码的multiprocessing队列时,跟踪:

代码语言:javascript
复制
t= 0
<multiprocessing.queues.Queue object at 0x7fee4a2d3950>
Traceback (most recent call last):
  File "/home/fran/anaconda2/lib/python2.7/multiprocessing/queues.py", line 268, in _feed
    send(obj)
TypeError: expected string or Unicode object, NoneType found
Traceback (most recent call last):
  File "/home/fran/anaconda2/lib/python2.7/multiprocessing/queues.py", line 268, in _feed
    send(obj)
TypeError: expected string or Unicode object, NoneType found
Traceback (most recent call last):
  File "/home/fran/anaconda2/lib/python2.7/multiprocessing/queues.py", line 268, in _feed
    send(obj)
TypeError: expected string or Unicode object, NoneType found
Traceback (most recent call last):
  File "/home/fran/anaconda2/lib/python2.7/multiprocessing/queues.py", line 268, in _feed
    send(obj)
TypeError: expected string or Unicode object, NoneType found
Empty queue?  True
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-02-06 17:38:34

将其移到while循环中:

代码语言:javascript
复制
    for disk in disk_codes:
        code_queue.put(disk)

以下是完整的代码:

代码语言:javascript
复制
import multiprocessing

def evolve_single_disk(queue, result, dt):
    print "Empty queue? ", queue.empty()
    code = queue.get()
    print "Evolving disk {0} in {1}".format(code, multiprocessing.current_process().name)
    result.put(code)


if __name__ == '__main__':
    ncores = 4
    code_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    disk_codes = range(ncores)

    dt = 1
    t_end = 10
    t = 0

    # Evolve!
    while t < t_end:
        for disk in disk_codes:
            code_queue.put(disk)

        print ""
        print "t=", t

        process_list = list()
        for x in range(ncores):
            process = multiprocessing.Process(target=evolve_single_disk, args=(code_queue, result_queue, dt))
            process_list.append(process)

        for p in process_list:
            p.start()
            p.join()

        disk_codes = [result_queue.get() for p in process_list]
        print "disk codes: ", disk_codes

        t += dt

输出

代码语言:javascript
复制
t= 0
Empty queue?  False
Evolving disk 0 in Process-1
Empty queue?  False
Evolving disk 1 in Process-2
Empty queue?  False
Evolving disk 2 in Process-3
Empty queue?  False
Evolving disk 3 in Process-4
disk codes:  [0, 1, 2, 3]

t= 1
Empty queue?  False
Evolving disk 0 in Process-5
Empty queue?  False
Evolving disk 1 in Process-6
Empty queue?  False
Evolving disk 2 in Process-7
Empty queue?  False
Evolving disk 3 in Process-8
disk codes:  [0, 1, 2, 3]

...

t= 9
Empty queue?  False
Evolving disk 0 in Process-37
Empty queue?  False
Evolving disk 1 in Process-38
Empty queue?  False
Evolving disk 2 in Process-39
Empty queue?  False
Evolving disk 3 in Process-40
disk codes:  [0, 1, 2, 3]
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60100434

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档