首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >队列不为空时queue.get()超时

队列不为空时queue.get()超时
EN

Stack Overflow用户
提问于 2022-08-26 19:39:09
回答 2查看 263关注 0票数 1

我试图实现一个多处理队列,该队列是从第二个进程函数中填充的。通过测试这个函数,它显示多进程工作正常,并且队列似乎正在填充。通过打印数据,确认了worker函数中的数据是正确的。此外,我已经证实,在运行queue.put(ret)后,qsize会增加。但是,运行queue.get()会导致代码挂起和超时。这似乎与我正在放入队列中的数据有关,因为我的worker函数中的一个简单的queue.put("hello")工作正常。知道为什么会发生这种事吗?我已附上有关守则如下。

代码语言:javascript
复制
def worker(queue):
    print("Running worker")
    ret = odrive.find_any()
    queue.put(ret)
    #print(ret)

def findODrives():
    M = []
    foundAll = False
    queue = Queue()
    print("Starting findODrives")
    while foundAll == False:
        doneFinding = False
        while doneFinding == False:
            print("Finding ODrive")
            p = multiprocessing.Process(target=worker, name="ODriveFind", args=(queue,))
            p.start()
            p.join(10)
            if p.is_alive():
                print("worker is running... let's kill it...")
                # Terminate worker
                p.terminate()
                p.join()
                doneFinding = True
            elif(queue.qsize() != 0):
                print("Queue is not zero")
                print((queue.qsize()))
                try:
                    obj = queue.get(timeout=10)
                except:
                    print("Failed to get data in queue")
                print(obj)
                M.append(obj)
                print("test")
                print("Found ODrive with serial number: " + str(M[-1].serial_number))

def main():
    while(True):
        inp = input("Please press enter to continue with an operation, or QUIT to exit...")
        if inp == "QUIT":
            exit()
        MList = findODrives()

if __name__ == "__main__":
    main()

编辑:忘记包含导入语句。我使用的是多处理队列,如下所示:

代码语言:javascript
复制
import odrive
from odrive.enums import *
from odrive.utils import *
import multiprocessing
from multiprocessing import Queue
EN

回答 2

Stack Overflow用户

发布于 2022-08-26 19:52:09

将此用于队列

代码语言:javascript
复制
import multiprocessing

manager = multiprocessing.Manager()
queue = manager.Queue()
票数 0
EN

Stack Overflow用户

发布于 2022-08-26 20:05:56

这并不直接回答这个问题,而是演示了如何使用队列在子进程之间进行通信。在这种情况下,一种客户机/服务器策略。

真正重要的是,您必须有某种方式告诉服务器,它不会接收更多的数据,以便它能够优雅地终止。所以,我们在这里要做的是向队列发送一些整数。当我们完成任务后,我们就什么也不派了。服务器接收到的任何非无的信息都会被打印出来。如果没有收到,它就终止。

希望这将帮助您在良好实践的基础上更进一步:

代码语言:javascript
复制
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager

def server(q):
    while (v := q.get()) is not None:
        print(v)

def client(q):
    for i in range(10):
        q.put(i)
    q.put(None)

def process():
    with Manager() as manager:
        q = manager.Queue()
        with ProcessPoolExecutor() as executor:
            wait(executor.submit(func, q) for func in (server, client))
                
if __name__ == '__main__':
    process()

输出:

代码语言:javascript
复制
0
1
2
3
4
5
6
7
8
9
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/73505788

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档