我在生产者-消费者模式中有两个线程。当消费者收到数据时,它会调用一个耗时的函数expensive(),然后进入for循环。
但是,如果消费者正在工作时有新数据到达,它应该中止当前工作(退出循环),并从新数据开始。
我尝试使用类似这样的queue.Queue:
q = queue.Queue()
def producer():
while True:
...
q.put(d)
def consumer():
while True:
d = q.get()
expensive(d)
for i in range(10000):
...
if not q.empty():
break但此代码的问题是,如果生产者将数据放得太快,并且队列中有许多项,消费者将执行expensive(d)调用加上一个循环迭代,然后对每个项进行中止,这是非常耗时的。代码应该可以工作,但并未优化。
发布于 2021-04-09 09:52:13
在不修改expensive代码的情况下,一种解决方案是将其作为单独的进程运行,这将为您提供过早终止它的能力。但是,由于没有提到expensive运行了多长时间,这可能会更省时,也可能不会。
import multiprocessing as mp
q = queue.Queue()
def producer():
while True:
...
q.put(d)
def consumer():
while True:
d = q.get()
exp = mp.Thread(target=expensive, args=(d,))
for i in range(10000):
...
if not q.empty():
exp.terminate() # or exp.kill()
break发布于 2021-04-09 15:23:29
好吧,一种方法是使用队列设计,它可以保留等待和工作线程的内部列表。然后,您可以创建几个使用者线程来等待队列,并在工作到达时设置一个已知的使用者线程来完成工作。当线程结束时,它调用队列来从工作列表中删除自己,并将自己添加到等待列表中。
每个消费者线程都有一个“中止”原子,它可以通知线程提前结束。当线程执行内部循环时,会有一些延迟,但这并不重要。
如果新的工作从生产者到达队列,并且工作队列不为空,则可以设置工作线程的“中止”布尔值,并将它们的优先级设置为最小。然后,可以将新工作分派到池中的一个等待线程上,从而使其正常工作。
等待线程将需要一个'start‘函数,该函数用信号通知事件/sema/condvar等待thread..well..waits开启。这允许提供工作的生产者设置特定的线程运行,而不是“通常”的做法,即池中的任何线程都可以接手工作。
这样的设计允许“立即”开始新的工作,通过取消优先级来使以前的工作线程变得无关紧要,并避免线程/进程终止的开销。
https://stackoverflow.com/questions/67013844
复制相似问题