我有一个Kafka流应用程序,每当我重新启动它时,它所消耗的主题的偏移就会被重置。因此,对于所有分区,延迟都会增加,应用程序需要重新处理所有数据。
更新:输出主题正在接收在应用程序重新启动后已经处理的大量事件,并不是像我在上一段中所说的那样,输入主题偏移正在被重置。但是,内部主题(KTABLE-SUPPRESS-STATE-STORE)偏移正在重置,请参见下面的注释。
在重新启动之前,我已经确保每个分区的滞后为1(这是输出主题)。属于该消费者组id (app-id)的所有使用者都是活动的。重新启动是立即的,大约需要30秒。
该应用程序仅使用一次作为处理保证。
我读过这个答案,对于Apache消费者群体,补偿是如何过期的?。
我尝试过auto.offset.reset =最新的和auto.offset.reset =最早的。
这些主题的抵消似乎没有得到有效的承诺(但我对此并不确定)。
我假设在重新启动后,应用程序应该从该消费者组最新提交的偏移量中提取。
(KTABLE-SUPPRESS-STATE-STORE)更新:--我假设这是内部主题
是否确保在关闭之前提交所有消耗的偏移量?(在调用streams.close())之后
我很想知道这件事的任何线索。
更新
这是应用程序执行的代码:
final StreamsBuilder builder = new StreamsBuilder();
final KStream<..., ...> events = builder
.stream(inputTopicNames, Consumed.with(..., ...)
.withTimestampExtractor(...);
events
.filter((k, v) -> ...)
.flatMapValues(v -> ...)
.flatMapValues(v -> ...)
.selectKey((k, v) -> v)
.groupByKey(Grouped.with(..., ...))
.windowedBy(
TimeWindows.of(Duration.ofSeconds(windowSizeInSecs))
.advanceBy(Duration.ofSeconds(windowSizeInSecs))
.grace(Duration.ofSeconds(windowSizeGraceInSecs)))
.reduce((agg, new) -> {
...
return agg;
})
.suppress(Suppressed.untilWindowCloses(
Suppressed.BufferConfig.unbounded()))
.toStream()
.to(outPutTopicNameOfGroupedData, Produced.with(..., ...));偏移复位只是和总是发生(在重新启动后)与KTABLE-SUPPRESS-STATE-STORE内部主题创建的卡夫卡流API。
我已经尝试过处理保证,确切地说是一次和,至少一次。
再一次,我将非常感谢任何有关这方面的线索。
更新:--这已经在2.2.1 (https://issues.apache.org/jira/browse/KAFKA-7895)版本中解决了
发布于 2019-01-16 23:57:00
偏移复位只是和总是发生(在重新启动)与KTABLE-抑制状态存储内部主题创建的Kafka。
这是当前(版本2.1)所期望的行为,因为suppress()运算符仅在内存中工作。因此,在重新启动处理之前,必须从changelog主题重新创建抑制缓冲区。
注意,计划在将来的版本中让suppress()写入磁盘(cf )。https://issues.apache.org/jira/browse/KAFKA-7224)。这将避免从changelog主题重新创建缓冲区的开销。
发布于 2019-01-25 00:16:07
我认为@Matthias J. Sax的回复涵盖了压制的大部分内部内容。不过,我需要澄清一件事:当你说“重启应用程序”时,你到底做了什么?你是否优雅地关闭了整个应用程序,然后重新启动它?
发布于 2019-01-11 11:27:25
提交频率由参数commit.interval.ms控制。检查您的偏移量是否确实已提交。默认情况下,偏移每100毫秒或30秒提交一次,具体取决于处理保证配置。检查这
https://stackoverflow.com/questions/54145281
复制相似问题