我已经配置了一个用于接收批量消息的使用者。在发生任何错误时,使用者也被配置为重试和重发。批次消费者工作得很好。但是,正如我注意到的,如果批处理消息中发生任何错误,整个批处理就会出错。
如下面的示例所示,让我们假设Message2出现故障,那么整个批处理似乎会在出现故障之前再次重试/重传。
我的查询:是否有任何方法可以配置使用者,以便只在批处理中错误消息,尝试重传或出错,以及批处理中的其他消息将被恢复。
public class MyConsumer : IConsumer<Batch<MyClass>>, IConsumer<Fault<MyClass>>
{
public async Task Consume(ConsumeContext<Batch<MyClass>> context)
{
for (int i = 0; i < context.Message.Length; i++)
{
if (i == 2)
{
throw new Exception();
}
}
}
public async Task Consume(ConsumeContext<Fault<MyClass>> context)
{
Console.WriteLine($"Error in Context. Name :{context.Message.Message.Name}");
}
}发布于 2021-09-22 22:02:09
如果配置正确,整个批将作为一个单元重新尝试,这意味着您需要以另一种方式跟踪单个项目。一种方法是让每个项目都是一个独立的幂等运算,但此时真正的问题是:“为什么要使用批处理进行离散操作?”
如果要单独处理批处理中的每条消息,只需使用常规的使用者,省去复杂性和麻烦。
如果出于任何原因必须使用批处理,请考虑捕获单个消息的异常,并发布某种类型的事件指示单个项失败。
https://stackoverflow.com/questions/69289367
复制相似问题