我正在读关于SQS和消息队列的文章,我想知道当有一个快速的生产者和缓慢的消费者时会发生什么。会发生什么?消息堆积存储在哪里?它是否在内存堆栈上,最终发生堆栈溢出,服务器崩溃?这就是内存被用作有限队列的问题吗?
发布于 2020-01-13 05:00:48
如果持续使用消息的速度比生成消息的速度慢,则SQS (或您选择使用的任何其他消息系统)中的消息通常都会过期并丢失。如果生产者和消费者在相同的内存流中,那么缓慢的消费者可能会导致内存问题,在这种情况下,你可能会耗尽内存并崩溃,但在基于消息的体系结构中,这种情况不太典型。更常见的情况是,消息将存储在磁盘上,但通常仍有一个有限的保留期。
编辑
如果您使用内存中的无界队列,您可能会遇到问题,请使用以下示例:
from threading import Thread
import time
from multiprocessing import Queue
class SlowConsumer(Thread):
def __init__(self, queue: Queue):
self.queue = queue
Thread.__init__(self)
def run(self):
while True:
v = self.queue.get()
print(f"Processing #{v}")
time.sleep(1)
q = Queue()
c = SlowConsumer(q)
c.start()
i=0
while True:
q.put(i)
i+=1
time.sleep(.1)消费者总是会越来越落后于生产者。最后,该进程将耗尽可用内存。这是一种应该避免的病理情况--通常通过使用有界队列来避免。注意:实际上,同样的问题也可能存在于支持无限保留的外部队列中,比如Kafka,但我们不会耗尽内存,而是会耗尽磁盘。对于磁盘上的队列,最好也设置一个保留期,如果有一个合理的保留期的话。
https://stackoverflow.com/questions/59707536
复制相似问题