首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >python和队列中的线程会导致内存泄漏或内存错误

python和队列中的线程会导致内存泄漏或内存错误
EN

Stack Overflow用户
提问于 2020-03-09 21:04:51
回答 1查看 585关注 0票数 0

我使用python中的多线程和队列创建了简单的代码。我有一个主线程,它不断地在队列中添加数据(队列maxsize是2000),将有5个不同的线程从队列中取出并在某个特定的通道上发布到redis。

代码工作得很好,但是5到6个小时后,发布机制变得很慢。当用于从队列中删除数据的线程变慢时,并且当队列大小达到maxsize时,开始在流错误上抛出缓冲区。将数据添加到队列的速度与开始时相同。

此问题在不同配置的Linux系统上出现的情况有所不同。如何识别它抛出的是哪种错误?如何调试问题。

如上所述-代码非常简单,主线程需要一个接一个地添加队列中的数据,其他5个线程可以逐个从队列中取出数据。

共享代码

代码语言:javascript
复制
   import redis
   import logging
   import sys
   logging.basicConfig(level = logging.DEBUG)
   redisPub = redis.StrictRedis(host='127.0.0.1', port=6379)

def main():
    try:
        recv_sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_UDP)
        recv_sock.bind(("", 8000))
        recv_sock.setblocking(0)
        recv_sock.settimeout(5)
    except Exception,e:
        exc_type, exc_obj, exc_tb = sys.exc_info()
        logging.error("Socket Connection Unsuccessful")
        print "Program halted."
        sys.exit()

    sendThEvent     = threading.Event()
    sendThEvent.clear()
    sendPktQ = Queue.Queue(maxsize=2000)
    for i in range(0,5):
        thSend = threading.Thread(name = 'sendThread', target = sendThread , args = (sendPktQ,sendThEvent))
        thSend.setDaemon(True)
        thSend.start()

    packetsCounting = 0
    while True:
        try:
            recvData, recvAddr = recv_sock.recvfrom(2048)
            sendPktQ.put(recvData)
            packetsCounting += 1

            else:
                logging.error("There is some error")
        except socket.timeout:
            continue
        except Exception,e:
            exc_type, exc_obj, exc_tb = sys.exc_info()
            logging.error(e)



def sendThread(sendPktQ,listenEvent):
    if sendPktQ == None:
        pass
    else:
        while True:
            instanceSentCnt += 1
            if sendPktQ.qsize() < 1:
                event_is_set = listenEvent.wait(0)

            packetDict = sendPktQ.get()
            data = redisPub.publish('chnlName', packetToSend)
            logging.info("packet size reached to ============================================ %s ------------ %s"%(len(packetToSend),data))



if __name__ == "__main__":
    main()

任何人的意见都将受到高度赞赏。

EN

回答 1

Stack Overflow用户

发布于 2020-04-19 17:02:29

我看到一个sendPktQ.get(),但没有看到sendPktQ.task_done(),例如:

代码语言:javascript
复制
async def _dispatch_packet(self, destination=None) -> None:
    """Send a command unless in listen_only mode."""
    if not self.command_queue.empty():
        cmd = self.command_queue.get()

        if not (destination is None or self.config.get("listen_only")):
            destination.write(bytearray(f"{cmd}\r\n".encode("ascii")))
            await asyncio.sleep(0.05)

        self.command_queue.task_done()

请参阅queue library docs

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60601192

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档