首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >multiprocessing.Pipe甚至比multiprocessing.Queue慢?

multiprocessing.Pipe甚至比multiprocessing.Queue慢?
EN

Stack Overflow用户
提问于 2018-01-20 07:09:00
回答 3查看 5.4K关注 0票数 11

我试图从Pipe包中对Queue的速度进行基准测试。我不认为Pipe会更快,因为Queue在内部使用Pipe

奇怪的是,在发送大型numpy数组时,PipeQueue慢。我在这里错过了什么?

管道:

代码语言:javascript
复制
import sys
import time
from multiprocessing import Process, Pipe
import numpy as np

NUM = 1000


def worker(conn):
    for task_nbr in range(NUM):
        conn.send(np.random.rand(400, 400, 3))
    sys.exit(1)


def main():
    parent_conn, child_conn = Pipe(duplex=False)
    Process(target=worker, args=(child_conn,)).start()
    for num in range(NUM):
        message = parent_conn.recv()


if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = NUM / duration

    print "Duration: %s" % duration
    print "Messages Per Second: %s" % msg_per_sec

# Took 10.86s.

队列

代码语言:javascript
复制
import sys
import time
from multiprocessing import Process
from multiprocessing import Queue
import numpy as np

NUM = 1000

def worker(q):
    for task_nbr in range(NUM):
        q.put(np.random.rand(400, 400, 3))
    sys.exit(1)

def main():
    recv_q = Queue()
    Process(target=worker, args=(recv_q,)).start()
    for num in range(NUM):
        message = recv_q.get()

if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = NUM / duration

    print "Duration: %s" % duration
    print "Messages Per Second: %s" % msg_per_sec

# Took 6.86s.
EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2018-01-23 04:48:37

您可以做一个实验,并将以下内容放入上面的管道代码中。

代码语言:javascript
复制
def worker(conn):
    for task_nbr in range(NUM):
        data = np.random.rand(400, 400, 3)
    sys.exit(1)

def main():
    parent_conn, child_conn = Pipe(duplex=False)
    p = Process(target=worker, args=(child_conn,))
    p.start()
    p.join()

这给了您为测试创建数据所需的时间。在我的系统中,这大约需要2.9秒。

在外壳下,queue对象实现一个缓冲区和一个线程发送。线程仍然处于相同的进程中,但是通过使用它,数据创建不必等待系统IO完成。它有效地并行化了操作。尝试使用一些简单的线程实现来修改管道代码(免责声明,这里的代码仅供测试,还没有准备好生产)。

代码语言:javascript
复制
import sys
import time
import threading
from multiprocessing import Process, Pipe, Lock
import numpy as np
import copy

NUM = 1000

def worker(conn):
    _conn = conn
    _buf = []
    _wlock = Lock()
    _sentinel = object() # signal that we're done
    def thread_worker():
        while 1:
            if _buf:
                _wlock.acquire()
                obj = _buf.pop(0)
                if obj is _sentinel: return
                _conn.send(data)
                _wlock.release()
    t = threading.Thread(target=thread_worker)
    t.start()
    for task_nbr in range(NUM):
        data = np.random.rand(400, 400, 3)
        data[0][0][0] = task_nbr    # just for integrity check
        _wlock.acquire()
        _buf.append(data)
        _wlock.release()
    _wlock.acquire()
    _buf.append(_sentinel)
    _wlock.release()
    t.join()
    sys.exit(1)

def main():
    parent_conn, child_conn = Pipe(duplex=False)
    Process(target=worker, args=(child_conn,)).start()
    for num in range(NUM):
        message = parent_conn.recv()
        assert num == message[0][0][0], 'Data was corrupted'        

if __name__ == "__main__":
    start_time = time.time()
    main()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = NUM / duration

    print "Duration: %s" % duration
    print "Messages Per Second: %s" % msg_per_sec

在我的机器上,这需要3.4秒才能运行,几乎与上面的队列代码完全相同。

来自https://docs.python.org/2/library/threading.html

在Cython中,由于全局解释器锁,只有一个线程可以同时执行Python代码.但是,如果要同时运行多个I/O绑定任务,线程仍然是一个合适的模型。

queuepipe之间的差异肯定是一个奇怪的实现细节,除非您深入了解它。

票数 10
EN

Stack Overflow用户

发布于 2018-01-28 22:46:17

根据您的print命令,我假定您使用的是Python2。但是,这种奇怪的行为不能用Python3复制,因为Pipe实际上比Queue快。

代码语言:javascript
复制
import sys
import time
from multiprocessing import Process, Pipe, Queue
import numpy as np

NUM = 20000


def worker_pipe(conn):
    for task_nbr in range(NUM):
        conn.send(np.random.rand(40, 40, 3))
    sys.exit(1)


def main_pipe():
    parent_conn, child_conn = Pipe(duplex=False)
    Process(target=worker_pipe, args=(child_conn,)).start()
    for num in range(NUM):
        message = parent_conn.recv()


def pipe_test():
    start_time = time.time()
    main_pipe()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = NUM / duration
    print("Pipe")
    print("Duration: " + str(duration))
    print("Messages Per Second: " + str(msg_per_sec))

def worker_queue(q):
    for task_nbr in range(NUM):
        q.put(np.random.rand(40, 40, 3))
    sys.exit(1)

def main_queue():
    recv_q = Queue()
    Process(target=worker_queue, args=(recv_q,)).start()
    for num in range(NUM):
        message = recv_q.get()

def queue_test():
    start_time = time.time()
    main_queue()
    end_time = time.time()
    duration = end_time - start_time
    msg_per_sec = NUM / duration
    print("Queue")
    print("Duration: " + str(duration))
    print("Messages Per Second: " + str(msg_per_sec))


if __name__ == "__main__":
    for i in range(2):
        queue_test()
        pipe_test()

在以下方面的成果:

代码语言:javascript
复制
Queue
Duration: 3.44321894646
Messages Per Second: 5808.51822408
Pipe
Duration: 2.69065594673
Messages Per Second: 7433.13169575
Queue
Duration: 3.45295906067
Messages Per Second: 5792.13354361
Pipe
Duration: 2.78426194191
Messages Per Second: 7183.23218766


------------------
(program exited with code: 0)
Press return to continue
票数 6
EN

Stack Overflow用户

发布于 2019-09-08 15:03:59

在我的系统中,Pipe(duplex=False)Pipe(duplex=True)慢(是Pipe(duplex=True)的两倍,或者一半)。对于任何寻找性能的人来说,这里是一个并行的比较:

代码语言:javascript
复制
from time import time
from multiprocessing import Process, Queue, Pipe

n = 1000
buffer = b'\0' * (1000*1000) # 1 megabyte

def print_elapsed(name, start):
    elapsed = time() - start
    spi = elapsed / n
    ips = n / elapsed
    print(f'{name}: {spi*1000:.3f} ms/item, {ips:.0f} item/sec')

def producer(q):
    start = time()
    for i in range(n):
        q.put(buffer)
    print_elapsed('producer', start)

def consumer(q):
    start = time()
    for i in range(n):
        out = q.get()
    print_elapsed('consumer', start)

class PipeQueue():
    def __init__(self, **kwargs):
        self.out_pipe, self.in_pipe = Pipe(**kwargs)
    def put(self, item):
        self.in_pipe.send_bytes(item)
    def get(self):
        return self.out_pipe.recv_bytes()
    def close(self):
        self.out_pipe.close()
        self.in_pipe.close()

print('duplex=True')
q = PipeQueue(duplex=True)
producer_process = Process(target=producer, args=(q,))
consumer_process = Process(target=consumer, args=(q,))
consumer_process.start()
producer_process.start()
consumer_process.join()
producer_process.join()
q.close()

print('duplex=False')
q = PipeQueue(duplex=False)
producer_process = Process(target=producer, args=(q,))
consumer_process = Process(target=consumer, args=(q,))
consumer_process.start()
producer_process.start()
consumer_process.join()
producer_process.join()
q.close()

结果:

代码语言:javascript
复制
duplex=True
consumer: 0.301 ms/item, 3317 item/sec
producer: 0.298 ms/item, 3358 item/sec
duplex=False
consumer: 0.673 ms/item, 1486 item/sec
producer: 0.669 ms/item, 1494 item/sec

我认为这必须归结到CPython socket.socketpair,但我不确定。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48353601

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档