首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >每秒发送消息的实现机制

每秒发送消息的实现机制
EN

Stack Overflow用户
提问于 2021-06-18 14:47:08
回答 1查看 221关注 0票数 0

我需要每秒钟向用户发送来自rabbitMQ的消息。为此,我使用两个线程。第一个接收来自rabbitMQ的消息并将它们排队。第二个线程从队列中获取消息,对它们进行处理,并通过web套接字将它们发送给用户。我的问题是如何最好地实施这一机制。现在我的代码如下所示:

代码语言:javascript
复制
def __init__(self):
    self.data = {}
    self.queue = Queue()

def download_data(self):
    started_time = time.perf_counter()
    while True:
        if time.perf_counter() - started_time >= 1:
            started_time = time.perf_counter()
            for _ in range(self.queue.qsize()):
                self.append_data(self.queue.get())
            self.sync_send_data_to_user(self.data)
            self.data = {}


def message_handle(self, ch, method, properties, body):
    message = json.loads(body)
    self.queue.put(message)
    ch.basic_ack(delivery_tag=method.delivery_tag)

def start_consuming(self):
    connection = pika.BlockingConnection(pika.ConnectionParameters(RABBITMQ_HOST))
    channel = connection.channel()
    rabbit_queue = channel.queue_declare("to_client")
    channel.basic_consume(on_message_callback=self.message_handle, queue="to_client")
    threading.Thread(target=channel.start_consuming, args=[]).start()

此代码工作,但有时它发送的消息超过1次每秒。第一个线程是rabbitMQ使用者回调函数message_handle,第二个线程是无限循环函数download_datamessage_handle检查是否超过1秒,如果发生这种情况,message_handle将阻塞使用队列的线程,直到检索队列中的所有项为止。

更新:--我想我让自己变得更难了,最后我改变了一些逻辑。现在我有两个线程,其中一个处理来自rabbitmq的消息并将它们发送到队列,函数message_handle。python中的队列是线程安全的,所以它应该可以工作。第二个线程检查是否已过1秒,如果已从队列中检索所有数据并将其发送给用户,则为download_data函数。

EN

回答 1

Stack Overflow用户

发布于 2021-06-20 07:38:06

  1. 你把AsyncIO (协同线)和线程混合在一起。这两种方法是完全不同的并发方式,产生不同的行为。更合理的方法是只使用这两种方法中的一种来开发您的解决方案。我会选择一个基于异步的解决方案,并在可能的情况下确保我的await,而不是阻止事件循环。
  2. Python受到吉尔问题的困扰。因此,即使您同时执行任务,您实际上在任何给定时间都绑定到单个线程执行。如果我添加上一节(创建多个线程),您可能会遇到意外的计时问题。Python确实不适合完美的计时(它还深受本地执行环境/加载的影响)
  3. 如果您关心的是确保真正的并发执行,则可以查看python多处理。同样,结果可能会有所不同。
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/68037241

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档