我正在使用RabbitMQ和MassTransit库在多服务体系结构中开发一个服务。
该服务通过消费者接收交易。根据过滤规则(在配置json文件中设置并通过选项导入到服务),确定需要发送事务信息的地址,并将项发布到单独的队列中以供将来发送。
在用于发送的队列使用者中,我只将数据发送到为该事务指定的地址。
现在需要分批发送数据。在这里,批处理消费者的MassTransit功能可能会有所帮助。
但是调度上也有困难。例如,使用者接收4项交易。其中两个需要送到一个地址,另两个要送到另一个地址。在代码中,我为每个地址创建了两个带有事务的数组,并尝试发送。如果两个数组都发送成功,那么一切都很好。如果两个数组都收到错误,则整个批处理将进行重试,这也很好。但是,如果其中一个数组发送成功,而另一个数组未发送,则整个批处理将重复。
实际的问题是,是否可以为一个实体创建两个单独的队列(使用一个接口)并按照规则分别向每个队列发送数据?或者有另一种方法来解决这个问题,根据发送地址将事务划分成批处理?
发布于 2022-10-16 08:25:56
可以为一个实体创建两个单独的队列吗?
我想请你简化这个过程。如果体系结构如此混乱以至于读者需要30分钟才能理解这个问题,那么它就太复杂了。考虑在12个月内支持此代码。
但是,一个选项是使用发送到批处理的批处理。
第一批读取自定义消息头(例如__filterby),将消息分成两个不同的队列(端点)。
然后,根据逻辑将代码重新批处理到专用端点/使用者。这意味着有一个端点/队列。这里有一些伪代码来解释我的意思。
public async Task Consume(ConsumeContext<Batch<OrderAudit>> context)
{
var arraya = Contect.Messages(m => m?.Headers?.filterby == 'arraya';
ConsumeContext<IArrayA> a = arraya;
// Send
var arrayb = Contect.Messages(m => m?.Headers?.filterby == 'arrayb';
ConsumeContext<IArrayB> b = arrayb;
// send
}而且,这种感觉接近于基于主题/路由_键将RabbitMQ交换直接传输到多个队列。您可以重新设计解决方案来修复此模式。
可能有用的参考资料
https://masstransit-project.com/troubleshooting/common-gotchas.html#sharing-a-queue
https://masstransit-project.com/usage/producers.html#message-initializers
https://www.rabbitmq.com/tutorials/tutorial-five-python.html
https://stackoverflow.com/questions/74051881
复制相似问题