我使用python中的多线程和队列创建了简单的代码。我有一个主线程,它不断地在队列中添加数据(队列maxsize是2000),将有5个不同的线程从队列中取出并在某个特定的通道上发布到redis。
代码工作得很好,但是5到6个小时后,发布机制变得很慢。当用于从队列中删除数据的线程变慢时,并且当队列大小达到maxsize时,开始在流错误上抛出缓冲区。将数据添加到队列的速度与开始时相同。
此问题在不同配置的Linux系统上出现的情况有所不同。如何识别它抛出的是哪种错误?如何调试问题。
如上所述-代码非常简单,主线程需要一个接一个地添加队列中的数据,其他5个线程可以逐个从队列中取出数据。
共享代码
import redis
import logging
import sys
logging.basicConfig(level = logging.DEBUG)
redisPub = redis.StrictRedis(host='127.0.0.1', port=6379)
def main():
try:
recv_sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_UDP)
recv_sock.bind(("", 8000))
recv_sock.setblocking(0)
recv_sock.settimeout(5)
except Exception,e:
exc_type, exc_obj, exc_tb = sys.exc_info()
logging.error("Socket Connection Unsuccessful")
print "Program halted."
sys.exit()
sendThEvent = threading.Event()
sendThEvent.clear()
sendPktQ = Queue.Queue(maxsize=2000)
for i in range(0,5):
thSend = threading.Thread(name = 'sendThread', target = sendThread , args = (sendPktQ,sendThEvent))
thSend.setDaemon(True)
thSend.start()
packetsCounting = 0
while True:
try:
recvData, recvAddr = recv_sock.recvfrom(2048)
sendPktQ.put(recvData)
packetsCounting += 1
else:
logging.error("There is some error")
except socket.timeout:
continue
except Exception,e:
exc_type, exc_obj, exc_tb = sys.exc_info()
logging.error(e)
def sendThread(sendPktQ,listenEvent):
if sendPktQ == None:
pass
else:
while True:
instanceSentCnt += 1
if sendPktQ.qsize() < 1:
event_is_set = listenEvent.wait(0)
packetDict = sendPktQ.get()
data = redisPub.publish('chnlName', packetToSend)
logging.info("packet size reached to ============================================ %s ------------ %s"%(len(packetToSend),data))
if __name__ == "__main__":
main()任何人的意见都将受到高度赞赏。
发布于 2020-04-19 17:02:29
我看到一个sendPktQ.get(),但没有看到sendPktQ.task_done(),例如:
async def _dispatch_packet(self, destination=None) -> None:
"""Send a command unless in listen_only mode."""
if not self.command_queue.empty():
cmd = self.command_queue.get()
if not (destination is None or self.config.get("listen_only")):
destination.write(bytearray(f"{cmd}\r\n".encode("ascii")))
await asyncio.sleep(0.05)
self.command_queue.task_done()https://stackoverflow.com/questions/60601192
复制相似问题