首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在python中正确实现生产者和消费者

如何在python中正确实现生产者和消费者
EN

Stack Overflow用户
提问于 2021-04-09 09:22:31
回答 2查看 1.4K关注 0票数 2

我在生产者-消费者模式中有两个线程。当消费者收到数据时,它会调用一个耗时的函数expensive(),然后进入for循环。

但是,如果消费者正在工作时有新数据到达,它应该中止当前工作(退出循环),并从新数据开始。

我尝试使用类似这样的queue.Queue:

代码语言:javascript
复制
q = queue.Queue()

def producer():
    while True:
        ...
        q.put(d)
      
def consumer():
    while True:
        d = q.get()
        expensive(d)
        for i in range(10000):
            ...
            if not q.empty():
                break

但此代码的问题是,如果生产者将数据放得太快,并且队列中有许多项,消费者将执行expensive(d)调用加上一个循环迭代,然后对每个项进行中止,这是非常耗时的。代码应该可以工作,但并未优化。

EN

回答 2

Stack Overflow用户

发布于 2021-04-09 09:52:13

在不修改expensive代码的情况下,一种解决方案是将其作为单独的进程运行,这将为您提供过早终止它的能力。但是,由于没有提到expensive运行了多长时间,这可能会更省时,也可能不会。

代码语言:javascript
复制
import multiprocessing as mp

q = queue.Queue()


def producer():
    while True:
        ...
        q.put(d)
  
def consumer():
    while True:
        d = q.get()
        exp = mp.Thread(target=expensive, args=(d,))
        for i in range(10000):
            ...
            if not q.empty():
                exp.terminate() # or exp.kill()
                break
票数 0
EN

Stack Overflow用户

发布于 2021-04-09 15:23:29

好吧,一种方法是使用队列设计,它可以保留等待和工作线程的内部列表。然后,您可以创建几个使用者线程来等待队列,并在工作到达时设置一个已知的使用者线程来完成工作。当线程结束时,它调用队列来从工作列表中删除自己,并将自己添加到等待列表中。

每个消费者线程都有一个“中止”原子,它可以通知线程提前结束。当线程执行内部循环时,会有一些延迟,但这并不重要。

如果新的工作从生产者到达队列,并且工作队列不为空,则可以设置工作线程的“中止”布尔值,并将它们的优先级设置为最小。然后,可以将新工作分派到池中的一个等待线程上,从而使其正常工作。

等待线程将需要一个'start‘函数,该函数用信号通知事件/sema/condvar等待thread..well..waits开启。这允许提供工作的生产者设置特定的线程运行,而不是“通常”的做法,即池中的任何线程都可以接手工作。

这样的设计允许“立即”开始新的工作,通过取消优先级来使以前的工作线程变得无关紧要,并避免线程/进程终止的开销。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67013844

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档