首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >多路复用RabbitMQ消息

多路复用RabbitMQ消息
EN

Stack Overflow用户
提问于 2020-10-06 20:12:43
回答 1查看 396关注 0票数 1

例如,我有4个来源,发表计量学。我想在一个队列/交换中复用/合并所有这些消息。

代码语言:javascript
复制
--------+----+----+----+----+        -------+---------+----+---------+---------+
Source1 | M1 | M2 | M3 |    |   =>   Result | M1 | M4 | M2 | M3 | M6 | M5 | M7 |
Source2 | M4 |    |    | M5 |
Source3 |    |    | M6 |    |
Source4 |    |    |    | M7 |
代码语言:javascript
复制
For each queue:
 * Read one message
 * Publish message to the Result queue

在RabbitMQ中是否有一种“本地”的方法来实现这一点,或者我应该编写自己的消费者/出版者?

编辑1

一些需要澄清的例子,比方说过了一段时间后

代码语言:javascript
复制
     Processing "window"
        +-+
Source1 |X|XXXXXXXXXXXXX
Source2 |Y|YYYYYYY
Source3 |Z|ZZZZZZZZZZ
Source4 |W|WW
        +-+

然后以后

代码语言:javascript
复制
     Processing "window"
           +-+
Source1 XXX|X|XXXXXXXXXX
Source2 YYY|Y|YYYY
Source3 ZZZ|Z|ZZZZZZZ
Source4 WWW| |
           +-+

然后以后

代码语言:javascript
复制
     Processing "window"
                 +-+
Source1 XXXXXXXXX|X|XXXX
Source2 YYYYYYYY | |
Source3 ZZZZZZZZZ|Z|Z
Source4 WWW      | |
                 +-+

结果消费顺序是:X Y Z W X Y Z W X Y Z W X Y Z X Y Z X Y Z X Y Z X Y Z X Z X Z X Z X X X

X,Y,Z,W然后X,Y,Z.X Z ..。

这样,即使一个源是“垃圾邮件”来自其他来源的所有其他消息有机会被消费。

由于技术/财务原因,我一次只需要消耗一条消息。

消费者比生产者慢得多,但是生产者出版了很多东西,但偶尔也会出版。

如果每个源发布到绑定到同一个队列的交换,则结果可能是XXXXXXXXXXXXXX YYYYYYYY ZZZZZZZZZZZ WWWXXXXX Y XXXXX YYY XXX YYYY ZZZZZZZZZZZ WWW (取决于每个源的发布速率)。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-10-07 20:34:44

我认为,只要运行一个订阅所有队列的脚本,就可以实现您想要的结果。

关键的要求是使用单个应用程序线程来处理所有消息,而不管它们来自哪个队列。您使用的语言和客户端库会有所不同--如果您使用的是PHP,那么您就必须不使用单线程,但是可能有一些客户端库假设每个回调位于单独的工作线程上,您将需要一些共享资源来阻止它们。

就实际的RabbitMQ方面而言,您需要:

  • 为服务器注册订阅,以便使用basic.consume将消息推送给您;与使用basic.get进行显式轮询相比,通常建议这样做。
  • 对所有basic.consume调用使用单个“通道”
  • 使用人工致谢,以便消息保持在队列中,直到进程完成。
  • 每个队列预取限制设置为1的basic.qos

如果有4个队列,A、B、C和D,它们在启动使用者时有不同数量的消息:

  1. 当您第一次订阅时,预取限制将意味着来自每个队列的一条消息将被发送到通道;将它们称为A1、B1、C1和D1。
  2. 客户端库将依次在应用程序中引发一个异步事件。
  3. 您的单个辅助线程将处理其中的第一个事件,并开始处理消息A1。
  4. 除非您手动确认该消息,否则不会有其他消息到达。
  5. 一旦确认了第一条消息(A1),就可以从该队列(A2)预取一条新消息。
  6. 同时,您的辅助线程将解除阻塞并处理下一个已经引发的事件,用于消息B1。
  7. 只有在处理了B1、C1和D1的挂起事件之后,工作线程才会看到消息A2的事件
  8. 只要队列中有消息等待,它们就会以循环的方式被处理。即使除了一个队列之外的所有队列都变成空的,它们也会在消息到达时立即返回到轮转状态,因为只有一条来自繁忙队列的消息将被预取,其余的将只在RabbitMQ服务器上等待。
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64233179

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档