首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >为什么我的Kafka流应用程序的消费者组(app-id)的偏移在应用程序重新启动后会被重置?

为什么我的Kafka流应用程序的消费者组(app-id)的偏移在应用程序重新启动后会被重置?
EN

Stack Overflow用户
提问于 2019-01-11 11:05:32
回答 3查看 2.3K关注 0票数 6

我有一个Kafka流应用程序,每当我重新启动它时,它所消耗的主题的偏移就会被重置。因此,对于所有分区,延迟都会增加,应用程序需要重新处理所有数据。

更新:输出主题正在接收在应用程序重新启动后已经处理的大量事件,并不是像我在上一段中所说的那样,输入主题偏移正在被重置。但是,内部主题(KTABLE-SUPPRESS-STATE-STORE)偏移正在重置,请参见下面的注释。

在重新启动之前,我已经确保每个分区的滞后为1(这是输出主题)。属于该消费者组id (app-id)的所有使用者都是活动的。重新启动是立即的,大约需要30秒。

该应用程序仅使用一次作为处理保证。

我读过这个答案,对于Apache消费者群体,补偿是如何过期的?

我尝试过auto.offset.reset =最新的auto.offset.reset =最早的

这些主题的抵消似乎没有得到有效的承诺(但我对此并不确定)。

我假设在重新启动后,应用程序应该从该消费者组最新提交的偏移量中提取。

(KTABLE-SUPPRESS-STATE-STORE)更新:--我假设这是内部主题

是否确保在关闭之前提交所有消耗的偏移量?(在调用streams.close())之后

我很想知道这件事的任何线索。

更新

这是应用程序执行的代码:

代码语言:javascript
复制
final StreamsBuilder builder = new StreamsBuilder();
final KStream<..., ...> events = builder
        .stream(inputTopicNames, Consumed.with(..., ...)
        .withTimestampExtractor(...);

events
    .filter((k, v) -> ...)
    .flatMapValues(v -> ...)
    .flatMapValues(v -> ...)
    .selectKey((k, v) -> v)
    .groupByKey(Grouped.with(..., ...))
    .windowedBy(
        TimeWindows.of(Duration.ofSeconds(windowSizeInSecs))              
            .advanceBy(Duration.ofSeconds(windowSizeInSecs))
            .grace(Duration.ofSeconds(windowSizeGraceInSecs)))
    .reduce((agg, new) -> {
        ...
        return agg;
    })
    .suppress(Suppressed.untilWindowCloses(
                  Suppressed.BufferConfig.unbounded()))
    .toStream()
    .to(outPutTopicNameOfGroupedData, Produced.with(..., ...));

偏移复位只是和总是发生(在重新启动后)与KTABLE-SUPPRESS-STATE-STORE内部主题创建的卡夫卡流API。

我已经尝试过处理保证,确切地说是一次,至少一次

再一次,我将非常感谢任何有关这方面的线索。

更新:--这已经在2.2.1 (https://issues.apache.org/jira/browse/KAFKA-7895)版本中解决了

EN

回答 3

Stack Overflow用户

发布于 2019-01-16 23:57:00

偏移复位只是和总是发生(在重新启动)与KTABLE-抑制状态存储内部主题创建的Kafka。

这是当前(版本2.1)所期望的行为,因为suppress()运算符仅在内存中工作。因此,在重新启动处理之前,必须从changelog主题重新创建抑制缓冲区。

注意,计划在将来的版本中让suppress()写入磁盘(cf )。https://issues.apache.org/jira/browse/KAFKA-7224)。这将避免从changelog主题重新创建缓冲区的开销。

票数 2
EN

Stack Overflow用户

发布于 2019-01-25 00:16:07

我认为@Matthias J. Sax的回复涵盖了压制的大部分内部内容。不过,我需要澄清一件事:当你说“重启应用程序”时,你到底做了什么?你是否优雅地关闭了整个应用程序,然后重新启动它?

票数 0
EN

Stack Overflow用户

发布于 2019-01-11 11:27:25

提交频率由参数commit.interval.ms控制。检查您的偏移量是否确实已提交。默认情况下,偏移每100毫秒或30秒提交一次,具体取决于处理保证配置。检查

票数 -2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54145281

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档