首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在几次尝试处理失败后将偏移量提交到reactor-kafka中的Kafka

在几次尝试处理失败后将偏移量提交到reactor-kafka中的Kafka
EN

Stack Overflow用户
提问于 2020-04-09 23:34:34
回答 1查看 1K关注 0票数 2

有一个消息到达的Kafka主题。我需要阅读一条消息,对其进行处理,然后继续下一条消息。消息处理可能会失败,如果发生这种情况,处理必须重试几次(比方说10次),然后才能继续处理下一条消息。如果处理失败10次,则需要删除该消息,我们应该继续处理下一条消息。

我们使用reactor-kafka,所有的处理都需要反应式的。

下面是我尝试解决这个问题的方法:

代码语言:javascript
复制
Flux.defer(receiver::receive)
        .concatMap(this::processRecord)
        .retryBackoff(10, ofMillis(500))
        .concatMap(record -> record.receiverOffset().commit())
        .subscribe();

(这里receiver是一个KafkaReceiver<String, String>)。

这适用于没有任何异常的情况,如果有异常,processRecord()将重试10次。这里的问题是,如果在允许的10次尝试后仍然失败,则偏移量不会被提交(当然),因此下次从Kafka读取相同的偏移量,因此,有效地,处理将永远停留在“错误”偏移量上。

我尝试实现以下显而易见的想法:如果异常比retryBackoff()操作符“传递得更远”,则提交当前偏移量。要提交偏移量,我们需要一个ReceiverRecord,所以我在ExceptionWithRecord中添加了异常的包装和当前记录:

代码语言:javascript
复制
// in processRecord()
.onErrorMap(ex -> new ExceptionWithRecord(record, ex))

代码语言:javascript
复制
Flux.defer(receiver::receive)
        .concatMap(this::processRecord)
        .retryBackoff(10, ofMillis(500))
        .concatMap(record -> record.receiverOffset().commit())
        .onErrorResume(this::extractRecordAndMaybeCommit)
        .subscribe();

extractRecordAndMaybeCommit()从给定的异常中提取ReceiverRecord并提交它:

代码语言:javascript
复制
return record.receiverOffset().commit();

这种传递记录并在重试耗尽后提交该记录的方法起作用,并调用.commit()方法。但它没有任何效果。

事实证明,当任何异常进入上面的反应式管道时,都会调用DefaultKafkaReceiver.dispose(),因此任何后续的提交尝试都会被忽略。因此,事实证明,一旦发布者“看到”任何异常,就不可能立即提交偏移量。

如何在仍然使用reactor-kafka的情况下实现“出现N个错误后提交”行为

EN

回答 1

Stack Overflow用户

发布于 2020-04-17 01:30:53

我找不到一个“合适的”和简单的方法来解决这个任务,所以我不得不求助于状态和副作用的蛮力:手动计算重试次数,当尝试次数超过限制时停止重试。

这是柜台:

代码语言:javascript
复制
public class RetryCounter {
    private final Map<TopicPartition, OffsetAttempts> partitionOffsets = new ConcurrentHashMap<>();

    public void onRecord(PartitionOffset partitionOffset) {
        var offsetAttempts = offsetAttemptsFor(partitionOffset);
        offsetAttempts.increaseAttemptNumber(partitionOffset.offset());
        offsetAttempts.pruneTooAncientFor(partitionOffset.offset());
    }

    public long currentAttemptFor(PartitionOffset partitionOffset) {
        var offsetAttempts = offsetAttemptsFor(partitionOffset);
        long result = offsetAttempts.currentAttemptFor(partitionOffset.offset());

        return result;
    }

    private OffsetAttempts offsetAttemptsFor(PartitionOffset partitionOffset) {
        return partitionOffsets.computeIfAbsent(partitionOffset.topicPartition(), key -> new OffsetAttempts());
    }

    private static class OffsetAttempts {
        private final NavigableMap<Long, Long> offsetAttempts = new ConcurrentSkipListMap<>();

        // this must exceed your Kafka batch size
        private static final int ANTIQUITY_SPREAD_THRESHOLD = 10000;

        public void increaseAttemptNumber(long offset) {
            offsetAttempts.merge(offset, 0L, (oldValue, value) -> oldValue + 1);
        }

        public long currentAttemptFor(long offset) {
            return offsetAttempts.getOrDefault(offset, 0L);
        }

        @Override
        public String toString() {
            return offsetAttempts.toString();
        }

        public void pruneTooAncientFor(long offset) {
            long antiquityThreshold = offset - ANTIQUITY_SPREAD_THRESHOLD;

            offsetAttempts.headMap(antiquityThreshold).clear();
        }
    }
}

然后我们计算每个偏移量的重试次数(对于每个分区独立),并在超过重试次数时停止处理:

代码语言:javascript
复制
RetryCounter counter = new RetryCounter();
Flux.defer(receiver::receive)
        .concatMap(record -> {
            counter.onRecord(record);
            if (counter.currentAttemptFor(record) >= 10) {
                // we tried 10 times, it's 11th, so let's log the error and return
                // to avoid calling processRecord() so that there is no error
                // in the reactive pipeline and we are able to commit
                logFinalError(record);
                return Mono.just(record).flatMap(this::commitRecord);
            } else {
                return processRecord(record).thenReturn(record).flatMap(this::commitRecord);
            }
        })
        .retryBackoff(Long.MAX_VALUE, ofMillis(500))
        .subscribe();
票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61124782

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档