首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >流经常重新创建商店。

流经常重新创建商店。
EN

Stack Overflow用户
提问于 2019-03-01 09:42:48
回答 1查看 277关注 0票数 0

在流应用程序中,我使用交互式查询和状态器,以便更快地扩展和使用来自主题的数据。然而,我经常在日志中看到警告:

代码语言:javascript
复制
anomaly-timeline-3                    | 2019-03-01 08:43:58,177 INFO 
anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2 org.apache.kafka.streams.processor.internals.StreamThread stream-thread [anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2] Reinitializing StandbyTask TaskId: 1_0
anomaly-timeline-3                    |         ProcessorTopology:
anomaly-timeline-3                    |                 KSTREAM-SOURCE-0000000012:
anomaly-timeline-3                    |                         topics:         [anomaly-timeline-two-minutes-error-score-repartition]
anomaly-timeline-3                    |                         children:       [KSTREAM-REDUCE-0000000009]
anomaly-timeline-3                    |                 KSTREAM-REDUCE-0000000009:
anomaly-timeline-3                    |                         states:         [two-minutes-error-score]
anomaly-timeline-3                    | Partitions [anomaly-timeline-two-minutes-error-score-repartition-0]
anomaly-timeline-3                    |  from changelogs [anomaly-timeline-two-minutes-error-score-changelog-0]
anomaly-timeline-3                    | 2019-03-01 08:43:58,474 INFO anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2 org.apache.kafka.clients.consumer.internals.Fetcher [Consumer clientId=anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2-restore-consumer, groupId=] Resetting offset for partition anomaly-timeline-two-minutes-error-score-changelog-0 to offset 14787709.
anomaly-timeline-3                    | 2019-03-01 08:48:57,991 WARN anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2 org.apache.kafka.streams.processor.internals.StreamThread stream-thread [anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2] Updating StandbyTasks failed. Deleting StandbyTasks stores to recreate from scratch. org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {anomaly-timeline-one-hour-error-score-changelog-0=14818811}
anomaly-timeline-3                    |         at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:1002)
anomaly-timeline-3                    |         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:508)
anomaly-timeline-3                    |         at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
anomaly-timeline-3                    |         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
anomaly-timeline-3                    |         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
anomaly-timeline-3                    |         at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1099)
anomaly-timeline-3                    |         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:846)
anomaly-timeline-3                    |         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
anomaly-timeline-3                    |         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
anomaly-timeline-3                    |
anomaly-timeline-3                    | org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {anomaly-timeline-one-hour-error-score-changelog-0=14818811}
anomaly-timeline-3                    |         at org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:1002)
anomaly-timeline-3                    |         at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:508)
anomaly-timeline-3                    |         at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1259)
anomaly-timeline-3                    |         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1187)
anomaly-timeline-3                    |         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
anomaly-timeline-3                    |         at org.apache.kafka.streams.processor.internals.StreamThread.maybeUpdateStandbyTasks(StreamThread.java:1099)
anomaly-timeline-3                    |         at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:846)
anomaly-timeline-3                    |         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:767)
anomaly-timeline-3                    |         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:736)
anomaly-timeline-3                    | 2019-03-01 08:48:57,995 INFO anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2 org.apache.kafka.streams.processor.internals.StreamThread stream-thread [anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2] Reinitializing StandbyTask TaskId: 3_0
anomaly-timeline-3                    |         ProcessorTopology:
anomaly-timeline-3                    |                 KSTREAM-SOURCE-0000000022:
anomaly-timeline-3                    |                         topics:         [anomaly-timeline-one-hour-error-score-repartition]
anomaly-timeline-3                    |                         children:       [KSTREAM-REDUCE-0000000019]
anomaly-timeline-3                    |                 KSTREAM-REDUCE-0000000019:
anomaly-timeline-3                    |                         states:         [one-hour-error-score]
anomaly-timeline-3                    | Partitions [anomaly-timeline-one-hour-error-score-repartition-0]
anomaly-timeline-3                    |  from changelogs [anomaly-timeline-one-hour-error-score-changelog-0]
anomaly-timeline-3                    | 2019-03-01 08:48:58,303 INFO anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2 org.apache.kafka.clients.consumer.internals.Fetcher [Consumer clientId=anomaly-timeline-a3b6b7d6-3bd8-40a6-b070-874964bed3ee-StreamThread-2-restore-consumer, groupId=] Resetting offset for partition anomaly-timeline-one-hour-error-score-changelog-0 to offset 14818854.

因此,似乎是因为某种原因,卡夫卡重新初始化了一个备用任务,然后在更新中失败。如果我理解日志记录,这可能会导致从零开始重新创建存储。

所以我的问题是:

  • 尽管这些都是警告,但卡夫卡似乎并没有像它应该的那样运行。这个假设正确吗?
  • 为什么这个StandbyTask失败了?
  • 它是在删除我的州商店吗?
  • 我应该以及如何为这个流线程配置重置策略吗?
  • 为什么它要重置这个变化的偏移量?
EN

回答 1

Stack Overflow用户

发布于 2019-03-23 01:28:56

尽管这些都是警告,但卡夫卡似乎并没有像它应该的那样运行。这个假设正确吗?

是。

为什么这个StandbyTask失败了?

StandbyTask似乎是从无效的偏移量中提取的。不过,这并不是真的失败。

它是在删除我的州商店吗?

在这种情况下,只有本地存储被删除,changelog主题不受影响。本地存储被删除,因为它与changelog主题不同步。这样就可以从头开始重新创建商店。

我应该以及如何为这个流线程配置重置策略吗?

无法为还原使用者配置重置策略。如果发生上述情况,Kafka将删除本地存储和变更主题上的seeksToBeginning(),以便从头开始重新创建存储。

为什么它要重置这个变化的偏移量?

也许StandbyTask落后了?

您可以尝试为org.apache.kafka.streams.processor.internals.ProcessorStateManager启用跟踪日志记录。在本地检查点文件中跟踪StandyTasks的偏移量,该文件是在提交时编写的。偏移将登录到commit上:

代码语言:javascript
复制
log.trace("Writing checkpoint: {}", this.checkpointableOffsets);

这应该有助于找出一个StandbyTask是否落后。在这种情况下,您可能需要更多的线程或更多的实例来避免这种情况。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54941869

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档