有一个消息到达的Kafka主题。我需要阅读一条消息,对其进行处理,然后继续下一条消息。消息处理可能会失败,如果发生这种情况,处理必须重试几次(比方说10次),然后才能继续处理下一条消息。如果处理失败10次,则需要删除该消息,我们应该继续处理下一条消息。
我们使用reactor-kafka,所有的处理都需要反应式的。
下面是我尝试解决这个问题的方法:
Flux.defer(receiver::receive)
.concatMap(this::processRecord)
.retryBackoff(10, ofMillis(500))
.concatMap(record -> record.receiverOffset().commit())
.subscribe();(这里receiver是一个KafkaReceiver<String, String>)。
这适用于没有任何异常的情况,如果有异常,processRecord()将重试10次。这里的问题是,如果在允许的10次尝试后仍然失败,则偏移量不会被提交(当然),因此下次从Kafka读取相同的偏移量,因此,有效地,处理将永远停留在“错误”偏移量上。
我尝试实现以下显而易见的想法:如果异常比retryBackoff()操作符“传递得更远”,则提交当前偏移量。要提交偏移量,我们需要一个ReceiverRecord,所以我在ExceptionWithRecord中添加了异常的包装和当前记录:
// in processRecord()
.onErrorMap(ex -> new ExceptionWithRecord(record, ex))和
Flux.defer(receiver::receive)
.concatMap(this::processRecord)
.retryBackoff(10, ofMillis(500))
.concatMap(record -> record.receiverOffset().commit())
.onErrorResume(this::extractRecordAndMaybeCommit)
.subscribe();extractRecordAndMaybeCommit()从给定的异常中提取ReceiverRecord并提交它:
return record.receiverOffset().commit();这种传递记录并在重试耗尽后提交该记录的方法起作用,并调用.commit()方法。但它没有任何效果。
事实证明,当任何异常进入上面的反应式管道时,都会调用DefaultKafkaReceiver.dispose(),因此任何后续的提交尝试都会被忽略。因此,事实证明,一旦发布者“看到”任何异常,就不可能立即提交偏移量。
如何在仍然使用reactor-kafka的情况下实现“出现N个错误后提交”行为
发布于 2020-04-17 01:30:53
我找不到一个“合适的”和简单的方法来解决这个任务,所以我不得不求助于状态和副作用的蛮力:手动计算重试次数,当尝试次数超过限制时停止重试。
这是柜台:
public class RetryCounter {
private final Map<TopicPartition, OffsetAttempts> partitionOffsets = new ConcurrentHashMap<>();
public void onRecord(PartitionOffset partitionOffset) {
var offsetAttempts = offsetAttemptsFor(partitionOffset);
offsetAttempts.increaseAttemptNumber(partitionOffset.offset());
offsetAttempts.pruneTooAncientFor(partitionOffset.offset());
}
public long currentAttemptFor(PartitionOffset partitionOffset) {
var offsetAttempts = offsetAttemptsFor(partitionOffset);
long result = offsetAttempts.currentAttemptFor(partitionOffset.offset());
return result;
}
private OffsetAttempts offsetAttemptsFor(PartitionOffset partitionOffset) {
return partitionOffsets.computeIfAbsent(partitionOffset.topicPartition(), key -> new OffsetAttempts());
}
private static class OffsetAttempts {
private final NavigableMap<Long, Long> offsetAttempts = new ConcurrentSkipListMap<>();
// this must exceed your Kafka batch size
private static final int ANTIQUITY_SPREAD_THRESHOLD = 10000;
public void increaseAttemptNumber(long offset) {
offsetAttempts.merge(offset, 0L, (oldValue, value) -> oldValue + 1);
}
public long currentAttemptFor(long offset) {
return offsetAttempts.getOrDefault(offset, 0L);
}
@Override
public String toString() {
return offsetAttempts.toString();
}
public void pruneTooAncientFor(long offset) {
long antiquityThreshold = offset - ANTIQUITY_SPREAD_THRESHOLD;
offsetAttempts.headMap(antiquityThreshold).clear();
}
}
}然后我们计算每个偏移量的重试次数(对于每个分区独立),并在超过重试次数时停止处理:
RetryCounter counter = new RetryCounter();
Flux.defer(receiver::receive)
.concatMap(record -> {
counter.onRecord(record);
if (counter.currentAttemptFor(record) >= 10) {
// we tried 10 times, it's 11th, so let's log the error and return
// to avoid calling processRecord() so that there is no error
// in the reactive pipeline and we are able to commit
logFinalError(record);
return Mono.just(record).flatMap(this::commitRecord);
} else {
return processRecord(record).thenReturn(record).flatMap(this::commitRecord);
}
})
.retryBackoff(Long.MAX_VALUE, ofMillis(500))
.subscribe();https://stackoverflow.com/questions/61124782
复制相似问题