首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何将项目放回queue.Queue

如何将项目放回queue.Queue
EN

Stack Overflow用户
提问于 2017-04-15 12:02:56
回答 2查看 3.2K关注 0票数 6

如何将项目返回到queue.Queue?如果任务失败,这在线程或多处理中很有用,这样任务就不会丢失。

docs for queue.Queue.get()说函数可以“从队列中移除和返回项目”,但我相信这里使用的“返回”一词指的是将项目返回给调用线程的函数,而不是将其放回项目队列中。下面的示例代码演示了这一点,它只是无限地阻塞主线程的第二个queue.Queue.get()调用,而不是在线程中进行print()调用。

代码语言:javascript
复制
import time
import threading
import queue


def threaded_func():
    thread_task = myqueue.get()
    print('thread_task: ' + thread_task)

myqueue = queue.Queue()
myqueue.put('some kind of task')
main_task = myqueue.get()
print('main_task: ' + main_task)

t = threading.Thread(target=threaded_func)
t.daemon = True
t.start()

time.sleep(5)
myqueue.get()   # This blocks indefinitely

我必须相信有一种简单的方法可以把任务放回去,那么它是什么呢?在两个操作中调用task_done(),然后调用put(),并在两个操作中将其放回队列中,这不是原子操作,因此可能会导致物品丢失。

一种可能的,但笨重的解决方案是尝试再次执行任务,但然后您必须添加几行额外的行来处理这种复杂性,我甚至不确定所有失败的任务是否一定能以这种方式恢复。

EN

回答 2

Stack Overflow用户

发布于 2017-04-15 13:11:46

并非所有失败的任务都可以恢复。除非有理由认为它们会在以后通过,否则您不应该重试它们。例如,如果您的工作项是一个URL,并且连接失败计数,那么您可以实现某种最大重试次数。

你最大的问题是你还没有实现一个可行的工作者模型。您需要2个队列才能与worker进行双向对话。一个用于发布工作项,另一个用于接收状态。一旦您有了它,接收者总是可以决定将该消息塞回到工作队列中。这是一个懒惰的工作者的例子,它只是传递它所告诉的。

代码语言:javascript
复制
import threading
import queue

def worker(in_q, out_q):
    while True:
        try:
            task, data = in_q.get()
            print('worker', task, data)
            if task == "done":
                return
            elif task == "pass this":
                out_q.put(("pass", data))
            else:
                out_q.put(("fail", data))
        except Exception as e:
            print('worker exception', e)
            out_q.put("exception", data)

in_que = queue.Queue()
out_que = queue.Queue()

work_thread = threading.Thread(target=worker, args=(in_que, out_que))
work_thread.start()

# lets make every other task a fail
in_que.put(('pass this', 0))
in_que.put(('fail this', 1))
in_que.put(('pass this', 2))
in_que.put(('fail this', 3))
in_que.put(('pass this', 4))
in_que.put(('fail this', 5))

pending_tasks = 6

while pending_tasks:
    status, data = out_que.get()
    if status == "pass":
        pending_tasks -= 1
    else:
        # make failing tast pass
        in_que.put(('pass this', data))

in_que.put(("done", None))
work_thread.join()
print('done')
票数 3
EN

Stack Overflow用户

发布于 2020-10-16 15:24:03

您可以使用PriorityQueue,其中条目按优先级顺序返回。

代码语言:javascript
复制
from tornado.queues import PriorityQueue

q = PriorityQueue()
q.put((2, 'item 1'))
q.put((2, 'item 2'))

q.get()  # item 1
q.put((1, 'item 1'))  # put item 1 back on the queue

q.get()  # item 1
q.get()  # item 2

请注意,从get()返回的也是优先级数和项的元组。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/43422071

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档