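In my Kafka Streams application I use interactive queries and standby tasks so that I can scale out faster and resume consuming data from the topics more quickly. However, I often see the following warning in the logs: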
anomaly-timeline-3 | 2019-03-01 08:43:58,177 INFO anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2 org.apache.kafka.streams.processor.internals.StreamThread stream-thread [anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2] Reinitializing StandbyTask TaskId: 1_0
anomaly-timeline-3 | ProcessorTopology:
anomaly-timeline-3 | KSTREAM-SOURCE-0000000012:
anomaly-timeline-3 | topics: [anomaly-timeline-two-minutes-error-score-repartition]
anomaly-timeline-3 | children: [KSTREAM-REDUCE-0000000009]
anomaly-timeline-3 | KSTREAM-REDUCE-0000000009:
anomaly-timeline-3 | states: [two-minutes-error-score]
anomaly-timeline-3 | Partitions [anomaly-timeline-two-minutes-error-score-repartition-0]
anomaly-timeline-3 | from changelogs [anomaly-timeline-two-minutes-error-score-changelog-0]
anomaly-timeline-3 | 2019-03-01 08:43:58,474 INFO anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2 org.apache.kafka.clients.consumer.internals.Fetcher [Consumer clientId=anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2-restore-consumer, groupId=] Resetting offset for partition anomaly-timeline-two-minutes-error-score-changelog-0 to offset 14787709.
anomaly-timeline-3 | 2019-03-01 08:48:57,991 WARN anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2 org.apache.kafka.streams.processor.internals.StreamThread stream-thread [anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2] Updating StandbyTasks failed. Deleting StandbyTasks stores to recreate from scratch. org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {anomaly-timeline-one-hour-error-score-changelog-0=14818811}
anomaly-timeline-3 | at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:1002)
anomaly-timeline-3 | at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:508)
anomaly-timeline-3 | at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
anomaly-timeline-3 | at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
anomaly-timeline-3 | at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
anomaly-timeline-3 | at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1099)
anomaly-timeline-3 | at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:846)
anomaly-timeline-3 | at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
anomaly-timeline-3 | at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
anomaly-timeline-3 |
anomaly-timeline-3 | org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {anomaly-timeline-one-hour-error-score-changelog-0=14818811}
anomaly-timeline-3 | at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:1002)
anomaly-timeline-3 | at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:508)
anomaly-timeline-3 | at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
anomaly-timeline-3 | at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
anomaly-timeline-3 | at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
anomaly-timeline-3 | at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1099)
anomaly-timeline-3 | at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:846)
anomaly-timeline-3 | at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
anomaly-timeline-3 | at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
anomaly-timeline-3 | 2019-03-01 08:48:57,995 INFO anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2 org.apache.kafka.streams.processor.internals.StreamThread stream-thread [anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2] Reinitializing StandbyTask TaskId: 3_0
anomaly-timeline-3 | ProcessorTopology:
anomaly-timeline-3 | KSTREAM-SOURCE-0000000022:
anomaly-timeline-3 | topics: [anomaly-timeline-one-hour-error-score-repartition]
anomaly-timeline-3 | children: [KSTREAM-REDUCE-0000000019]
anomaly-timeline-3 | KSTREAM-REDUCE-0000000019:
anomaly-timeline-3 | states: [one-hour-error-score]
anomaly-timeline-3 | Partitions [anomaly-timeline-one-hour-error-score-repartition-0]
anomaly-timeline-3 | from changelogs [anomaly-timeline-one-hour-error-score-changelog-0]
anomaly-timeline-3 | 2019-03-01 08:48:58,303 INFO anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2 org.apache.kafka.clients.consumer.internals.Fetcher [Consumer clientId=anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2-restore-consumer, groupId=] Resetting offset for partition anomaly-timeline-one-hour-error-score-changelog-0 to offset 14818854.
So it seems that, for some reason, Kafka reinitializes a StandbyTask and then fails while updating it. If I read the logs correctly, this can cause the store to be recreated from scratch.
So my questions are:
Answered on 2019-03-23 01:28:56
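Even though these are only warnings, Kafka does not seem to be behaving as it should. Is this assumption correct?
Yes.
Why did this StandbyTask fail?
The StandbyTask appears to have fetched from an invalid offset. It did not really fail, though.
Is it deleting my state store?
In this case only the local store is deleted; the changelog topic is not affected. The local store is deleted because it is out of sync with the changelog topic, which allows the store to be recreated from scratch.
Should I configure a reset policy for this stream thread, and if so, how?
It is not possible to configure a reset policy for the restore consumer. When the situation above occurs, Kafka Streams deletes the local store and calls seekToBeginning() on the changelog topic so that the store is recreated from scratch.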
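The recovery path described in this answer can be pictured with a small toy model. This is only a sketch in plain Java and uses no Kafka classes: `StandbyModel`, `earliestOffset`, and `endOffset` are hypothetical names, and the end offset passed in `main` is made up. It also assumes that the requested offset had fallen below the earliest offset still retained in the changelog, which is one reading of the logs (14818811 was requested, and the consumer was then reset to 14818854), not something the answer confirms.

```java
import java.util.HashMap;
import java.util.Map;

public class StandbyModel {
    // Hypothetical stand-in for the local state store of a standby task.
    private final Map<String, Long> localStore = new HashMap<>();
    // Next changelog offset the standby intends to fetch.
    private long position;

    public StandbyModel(long checkpointedOffset) {
        this.position = checkpointedOffset;
        // Pretend some state had already been restored before the failure.
        localStore.put("restored-key", 1L);
    }

    /**
     * Returns true if the local store had to be wiped because the position
     * was outside [earliestOffset, endOffset] of the changelog partition.
     */
    public boolean update(long earliestOffset, long endOffset) {
        if (position < earliestOffset || position > endOffset) {
            localStore.clear();        // local store is out of sync: drop it
            position = earliestOffset; // replay the changelog from its beginning
            return true;
        }
        position = endOffset;          // in range: catch up to the end
        return false;
    }

    public long position() {
        return position;
    }

    public int storeSize() {
        return localStore.size();
    }

    public static void main(String[] args) {
        // Offsets taken from the log excerpt; the end offset is invented.
        StandbyModel standby = new StandbyModel(14818811L);
        boolean wiped = standby.update(14818854L, 14900000L);
        System.out.println("wiped=" + wiped + ", position=" + standby.position());
    }
}
```

The point of the model is that only the local copy is discarded; the changelog itself is never modified, so the store can always be rebuilt from it.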
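Why is it resetting the offset of this changelog?
Perhaps the StandbyTask is lagging behind?
You could try enabling trace logging for org.apache.kafka.streams.processor.internals.ProcessorStateManager. The offsets of StandbyTasks are tracked in a local checkpoint file, which is written on commit. The offsets are logged on commit:
log.trace("Writing checkpoint: {}", this.checkpointableOffsets);
This should help you find out whether a StandbyTask is lagging behind. If it is, you may need more threads or more instances to avoid this.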
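If the standbys do turn out to be lagging, adding threads is a configuration change. The following is a minimal sketch built with plain `java.util.Properties`. `num.stream.threads` and `num.standby.replicas` are standard Kafka Streams configuration keys, but the `StreamsTuning` class, the chosen values, and the broker address are illustrative assumptions. The `application.id` is inferred from the changelog topic names in the logs.

```java
import java.util.Properties;

public class StreamsTuning {

    // Builds a Kafka Streams configuration with explicit parallelism settings.
    public static Properties build(int streamThreads, int standbyReplicas) {
        Properties props = new Properties();
        // Inferred from topic names such as anomaly-timeline-...-changelog.
        props.setProperty("application.id", "anomaly-timeline");
        // Placeholder address; replace with the real brokers.
        props.setProperty("bootstrap.servers", "localhost:9092");
        // More threads per instance spread the tasks over more consumers.
        props.setProperty("num.stream.threads", Integer.toString(streamThreads));
        // Number of standby copies kept for each state store.
        props.setProperty("num.standby.replicas", Integer.toString(standbyReplicas));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(4, 1);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting `Properties` object would be passed to the `KafkaStreams` constructor along with the topology. Running more instances needs no configuration change beyond starting another process with the same `application.id`.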
https://stackoverflow.com/questions/54941869