我在RabbitMQ中使用MassTransit。按照自定义中间件的official documentation page上的示例,我尝试在消息消费管道上创建一个过滤器,该过滤器将根据特定条件过滤出一些消息。我的过滤器看起来像这样:
public class MyCustomFilter<T> : IFilter<T>
where T : class, ConsumeContext
{
public void Probe(ProbeContext context) { }
public async Task Send(T context, IPipe<T> next)
{
if (/* certain condition */)
{
await next.Send(context);
}
}
}问题在于,当消息未沿管道向下传递时(即未调用await next.Send(context) ),消息将在_skipped消费者RabbitMQ队列中结束。有没有办法防止消息进入该队列?
发布于 2020-03-17 00:15:24
skipped (死信)队列通过DeadLetterFilter调用获取消息。代码如下:
async Task IFilter<ReceiveContext>.Send(ReceiveContext context, IPipe<ReceiveContext> next)
{
await next.Send(context).ConfigureAwait(false);
if (context.IsDelivered || context.IsFaulted)
return;
context.LogSkipped();
await _deadLetterPipe.Send(context).ConfigureAwait(false);
}因此,您可以想象,如果上下文将IsDelivered或IsFaulted设置为true,则消息将不会进入死信队列。
如果您加入了过滤器,那么您的消息最终会进入有毒(error)队列,所以我猜这不是一个选择。
您可以通过对筛选器中的过滤邮件执行以下操作来模拟正在传递的邮件:
public Task Send(T context, IPipe<T> next)
=> condition
? next.Send(context)
: context.NotifyConsumed(context as ConsumeContext<MyMessage>, TimeSpan.Zero, "Filtered");https://stackoverflow.com/questions/60707712
复制相似问题