例如,我有4个来源,发表计量学。我想在一个队列/交换中复用/合并所有这些消息。
--------+----+----+----+----+ -------+---------+----+---------+---------+
Source1 | M1 | M2 | M3 | | => Result | M1 | M4 | M2 | M3 | M6 | M5 | M7 |
Source2 | M4 | | | M5 |
Source3 | | | M6 | |
Source4 | | | | M7 |For each queue:
* Read one message
* Publish message to the Result queue在RabbitMQ中是否有一种“本地”的方法来实现这一点,或者我应该编写自己的消费者/出版者?
编辑1
一些需要澄清的例子,比方说过了一段时间后
Processing "window"
+-+
Source1 |X|XXXXXXXXXXXXX
Source2 |Y|YYYYYYY
Source3 |Z|ZZZZZZZZZZ
Source4 |W|WW
+-+然后以后
Processing "window"
+-+
Source1 XXX|X|XXXXXXXXXX
Source2 YYY|Y|YYYY
Source3 ZZZ|Z|ZZZZZZZ
Source4 WWW| |
+-+然后以后
Processing "window"
+-+
Source1 XXXXXXXXX|X|XXXX
Source2 YYYYYYYY | |
Source3 ZZZZZZZZZ|Z|Z
Source4 WWW | |
+-+结果消费顺序是:X Y Z W X Y Z W X Y Z W X Y Z X Y Z X Y Z X Y Z X Y Z X Z X Z X Z X X X
X,Y,Z,W然后X,Y,Z.X Z ..。
这样,即使一个源是“垃圾邮件”来自其他来源的所有其他消息有机会被消费。
由于技术/财务原因,我一次只需要消耗一条消息。
消费者比生产者慢得多,但是生产者出版了很多东西,但偶尔也会出版。
如果每个源发布到绑定到同一个队列的交换,则结果可能是XXXXXXXXXXXXXX YYYYYYYY ZZZZZZZZZZZ WWW或XXXXX Y XXXXX YYY XXX YYYY ZZZZZZZZZZZ WWW (取决于每个源的发布速率)。
发布于 2020-10-07 20:34:44
我认为,只要运行一个订阅所有队列的脚本,就可以实现您想要的结果。
关键的要求是使用单个应用程序线程来处理所有消息,而不管它们来自哪个队列。您使用的语言和客户端库会有所不同--如果您使用的是PHP,那么您就必须不使用单线程,但是可能有一些客户端库假设每个回调位于单独的工作线程上,您将需要一些共享资源来阻止它们。
就实际的RabbitMQ方面而言,您需要:
basic.consume将消息推送给您;与使用basic.get进行显式轮询相比,通常建议这样做。basic.consume调用使用单个“通道”basic.qos如果有4个队列,A、B、C和D,它们在启动使用者时有不同数量的消息:
https://stackoverflow.com/questions/64233179
复制相似问题