首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >multiprocessing.queue无法在枪手工作人员超时后获取数据

multiprocessing.queue无法在枪手工作人员超时后获取数据
EN

Stack Overflow用户
提问于 2017-10-10 02:29:43
回答 1查看 1.4K关注 0票数 0

我有一个烧瓶应用程序,并使用古尼康(同步模式)作为网络服务器。要异步推送信息,我使用“gunicorn服务器钩子”启动维护进程(multiprocessing.Process()),并使用multiprocessing.Queue() (实际上是logging.handlers.QueueHandler(队列)与python兼容)发送消息。但我发现,如果持枪工人在“关键工人超时”时重新启动,维护过程将不会从持枪工人发送的队列中获取消息( queue.qsize()而不是0,根据日志,它成功地将消息放入队列,但Queue.get(超时)会引发空异常),但可以从主进程获得消息。我的日志:

代码语言:javascript
复制
  34 pid:24831 wechatlog : 2017-10-10 06:32:43,552 wechat_middle.py[line:34] DEBUG recive <LogRecord: wechat, 40, /www_upload/src/api_server.py, 543, "{'tag_list': 1, 'msg': 'company_test sid:8607550100000080 id: 8607550100000080 his: 1', 'lastsend': 'serial_error'}">
  35 pid:23930 wechat    : 2017-10-10 06:38:56,805 api_server.py[line:543] ERROR {'tag_list': 1, 'msg': 'company_test sid:8607550100000080 id: 8607550100000080 his: 1', 'lastsend': 'serial_error'}
  36 pid:24831 wechatlog : 2017-10-10 06:38:56,807 wechat_middle.py[line:34] DEBUG recive <LogRecord: wechat, 40, /www_upload/src/api_server.py, 543, "{'tag_list': 1, 'msg': 'company_test sid:8607550100000080 id: 8607550100000080 his: 1', 'lastsend': 'serial_error'}">
  37 pid:24887 wechat    : 2017-10-10 07:07:50,904 api_server.py[line:543] ERROR {'tag_list': 1, 'msg': 'company_test sid:8607550100000080 id: 8607550100000080 his: 1', 'lastsend': 'serial_error'}
  38 pid:24831 wechatlog : 2017-10-10 07:07:51,810 maintain_task.py[line:274] INFO current qsize: 1, debug_size: 0
  39 pid:24831 wechatlog : 2017-10-10 07:07:55,813 maintain_task.py[line:274] INFO current qsize: 1, debug_size: 1
  40 pid:24831 wechatlog : 2017-10-10 07:07:57,813 wechat_middle.py[line:25] INFO in debug mode, queue id 139972199063056, size 1
  41 pid:24831 wechatlog : 2017-10-10 07:07:59,816 wechat_middle.py[line:31] ERROR in debug mode, queue get nothing.
  42 pid:24831 wechatlog : 2017-10-10 07:07:59,816 maintain_task.py[line:274] INFO current qsize: 1, debug_size: 1
  43 pid:24831 wechatlog : 2017-10-10 07:08:00,817 maintain_task.py[line:281] ERROR queue is empty
  44 pid:24831 wechatlog : 2017-10-10 07:08:00,818 maintain_task.py[line:283] ERROR the message block the queue: None

在2017-10:10 06:38:56至2017-10-10 07:50之间,枪手的日志报告如下:

代码语言:javascript
复制
 [2017-10-10 06:41:08 +0800] [23906] [CRITICAL] WORKER TIMEOUT (pid:24838)

我的代码:

代码语言:javascript
复制
maintain_task.py
def wechat_push_thread(queue):
    we = wechat_middler_ware(queue=queue)
    wechat_log_logger = configs.make_logger_handler('wechatlog', filename='wechat')
    wechat_log_logger.info(f'queue id: {id(queue)}')
    debug_size = 0
    while True:
        try:
            we.listen(2)
        except Exception as e:
            wechat_log_logger.exception(e)
        # for debug
        if queue.qsize() > 0:
            wechat_log_logger.info(f'current qsize: {queue.qsize()}, debug_size: {debug_size}')
            if debug_size == queue.qsize():
                if we.debug_flag:
                    try:
                        msg = queue.get(timeout=1)
                    except Empty:
                        msg = None
                        wechat_log_logger.error(f'queue is empty')
                    wechat_log_logger.error(f'the message block the queue: {msg}')
                we.debug_flag = True
            debug_size = queue.qsize()
        else:
            we.debug_flag = False
            debug_size = 0
        # endfor debug
        if quit_event.wait(timeout=2):
            break
    logger.info('wechat_push_thread clean env')

wechat_middle.py
class wechat_middler_ware:
    def __init__(self, queue):
        self.q = queue
        self.logger = configs.make_logger_handler('wechatlog', filename='wechat')
        self.push_api = Push_Server(logger=self.logger)
        self.debug_flag = False

    def listen(self, timeout):
        while True:
            if self.debug_flag:
                self.logger.info(f'in debug mode, queue id {id(self.q)}, size {self.q.qsize()}')
            try:
                msg = self.q.get(timeout=timeout)
                self.logger.debug(f'recive {msg}')
            except Empty:
                if self.debug_flag:
                    self.logger.error(f'in debug mode, queue get nothing.')
                break
            else:
                ...
EN

回答 1

Stack Overflow用户

发布于 2017-10-20 07:02:27

根据py医生的说法:

警告如果在关联进程使用管道或队列时使用此方法,则管道或队列可能会损坏,其他进程可能无法使用。类似地,如果进程获得了锁或信号量等,则终止它可能会导致其他进程陷入死锁。

枪手的主人杀死了“超时”的工作人员,因此队列被其他进程所无法使用。现在,我使用multiprocessing.manager.queue而不是multiprocessing.queue。它可以很好地工作,即使工人被主人杀死。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46657799

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档