首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >应用程序显示,每次重新启动时重置分区事件-x偏移量为0。

应用程序显示,每次重新启动时重置分区事件-x偏移量为0。
EN

Stack Overflow用户
提问于 2019-07-19 08:46:47
回答 1查看 3.5K关注 0票数 2

我有一个应用程序,它从一个主题(事件)中读取并执行一个简单的处理:

代码语言:javascript
复制
@Configuration
class EventKStreamConfiguration {

    private val logger = LoggerFactory.getLogger(javaClass)

    @StreamListener
    fun process(@Input("event") eventStream: KStream<String, EventReceived>) {

        eventStream.foreach { key, value ->
            logger.info("--------> Processing Event {}", value)
            // Save in DB
        }
    }
}

这个应用程序使用的是来自Cloud的Kafka环境,其中有一个事件主题,有6个分区。完整的配置是:

代码语言:javascript
复制
spring:
  application:
    name: events-processor
  cloud:
    stream:
      schema-registry-client:
        endpoint: ${schema-registry-url:http://localhost:8081}
      kafka:
        streams:
          binder:
            brokers: ${kafka-brokers:localhost}
            configuration:
              application:
                id: ${spring.application.name}
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              schema:
                registry:
                  url: ${spring.cloud.stream.schema-registry-client.endpoint}
              value:
                subject:
                  name:
                    strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
              processing:
                guarantee: exactly_once
          bindings:
            event:
              consumer:
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
      bindings:
        event:
          destination: event

  data:
    mongodb:
      uri: ${mongodb-uri:mongodb://localhost/test}

server:
  port: 8085

logging:
  level:
    org.springframework.kafka.config: debug

---

spring:
  profiles: confluent-cloud
  cloud:
    stream:
      kafka:
        streams:
          binder:
            autoCreateTopics: false
            configuration:
              retry:
                backoff:
                  ms: 500
              security:
                protocol: SASL_SSL
              sasl:
                mechanism: PLAIN
                jaas:
                  config: xxx
              basic:
                auth:
                  credentials:
                    source: USER_INFO
              schema:
                registry:
                  basic:
                    auth:
                      user:
                        info: yyy

消息正在由KStream正确处理。--如果我重新启动应用程序--它们不会重新处理。注意:我不希望它们被重新处理,所以这种行为是可以的。

但是,启动日志显示了一些奇怪的信息:

  1. 首先,它显示还原使用者客户端的创建。自动偏移复位无:
代码语言:javascript
复制
2019-07-19 10:20:17.120  INFO 82473 --- [           main] o.a.k.s.p.internals.StreamThread         : stream-thread [events-processor-9a8069c4-3fb6-4d76-a207-efbbadd52b8f-StreamThread-1] Creating restore consumer client
2019-07-19 10:20:17.123  INFO 82473 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = none
  1. 然后,它创建了一个消费者客户端的自动偏移复位最早。
代码语言:javascript
复制
2019-07-19 10:20:17.235  INFO 82473 --- [           main] o.a.k.s.p.internals.StreamThread         : stream-thread [events-processor-9a8069c4-3fb6-4d76-a207-efbbadd52b8f-StreamThread-1] Creating consumer client
2019-07-19 10:20:17.241  INFO 82473 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = earliest
  1. 启动日志的最后跟踪显示偏移重置为0。这发生在应用程序每次重新启动时:
代码语言:javascript
复制
2019-07-19 10:20:31.577  INFO 82473 --- [-StreamThread-1] o.a.k.s.p.internals.StreamThread         : stream-thread [events-processor-9a8069c4-3fb6-4d76-a207-efbbadd52b8f-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
2019-07-19 10:20:31.578  INFO 82473 --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams    : stream-client [events-processor-9a8069c4-3fb6-4d76-a207-efbbadd52b8f] State transition from REBALANCING to RUNNING
2019-07-19 10:20:31.669  INFO 82473 --- [events-processor] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=events-processor-9a8069c4-3fb6-4d76-a207-efbbadd52b8f-StreamThread-1-consumer, groupId=events-processor] Resetting offset for partition event-3 to offset 0.
2019-07-19 10:20:31.669  INFO 82473 --- [events-processor] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=events-processor-9a8069c4-3fb6-4d76-a207-efbbadd52b8f-StreamThread-1-consumer, groupId=events-processor] Resetting offset for partition event-0 to offset 0.
2019-07-19 10:20:31.669  INFO 82473 --- [events-processor] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=events-processor-9a8069c4-3fb6-4d76-a207-efbbadd52b8f-StreamThread-1-consumer, groupId=events-processor] Resetting offset for partition event-1 to offset 0.
2019-07-19 10:20:31.669  INFO 82473 --- [events-processor] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=events-processor-9a8069c4-3fb6-4d76-a207-efbbadd52b8f-StreamThread-1-consumer, groupId=events-processor] Resetting offset for partition event-5 to offset 0.
2019-07-19 10:20:31.670  INFO 82473 --- [events-processor] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=events-processor-9a8069c4-3fb6-4d76-a207-efbbadd52b8f-StreamThread-1-consumer, groupId=events-processor] Resetting offset for partition event-4 to offset 0.
  1. 配置两个使用者的原因是什么?
  2. 为什么第二个有auto.offset.reset = earliest,而我还没有显式地配置它,而且Kafka的默认值是最新的?
  3. 我想要默认(auto.offset.reset =最新)行为,而且它似乎运行良好。然而,这与我在日志中看到的不相矛盾吗?

更新:

我会这样重新表述第三个问题:为什么日志显示分区在每次重新启动时被重新设置为0,尽管如此,没有消息被重新传递到KStream?

更新2:

我简化了场景,这次是使用本地Kafka Streams应用程序。这种行为与所观察到的完全相同。但是,检查使用者组和分区(我发现)是有意义的。

KStream:

代码语言:javascript
复制
fun main() {

    val props = Properties()
    props[StreamsConfig.APPLICATION_ID_CONFIG] = "streams-wordcount"
    props[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
    props[StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG] = 0
    props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name
    props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] = Serdes.String().javaClass.name

    val builder = StreamsBuilder()

    val source = builder.stream<String, String>("streams-plaintext-input")

    source.foreach { key, value -> println("$key $value") }

    val streams = KafkaStreams(builder.build(), props)
    val latch = CountDownLatch(1)

    // attach shutdown handler to catch control-c
    Runtime.getRuntime().addShutdownHook(object : Thread("streams-wordcount-shutdown-hook") {
        override fun run() {
            streams.close()
            latch.countDown()
        }
    })

    try {
        streams.start()
        latch.await()
    } catch (e: Throwable) {
        exitProcess(1)
    }

    exitProcess(0)
}

这就是我所看到的:

1)对于空主题,启动显示重置所有分区以偏移0:

代码语言:javascript
复制
07:55:03.885 [streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-2 to offset 0.
07:55:03.886 [streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-3 to offset 0.
07:55:03.886 [streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-0 to offset 0.
07:55:03.886 [streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-1 to offset 0.
07:55:03.886 [streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-4 to offset 0.
07:55:03.886 [streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-3549a54e-49db-4490-bd9f-7156e972021a-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-5 to offset 0

2)我在主题中添加了一条消息,并检查了使用者组,发现记录在第4分区中:

代码语言:javascript
复制
TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                                                                         HOST            CLIENT-ID
streams-plaintext-input 0          -               0               -               streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer-905a307a-4c49-4d8b-ac2e-5525ba2e8a8e /127.0.0.1      streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer
streams-plaintext-input 5          -               0               -               streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer-905a307a-4c49-4d8b-ac2e-5525ba2e8a8e /127.0.0.1      streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer
streams-plaintext-input 1          -               0               -               streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer-905a307a-4c49-4d8b-ac2e-5525ba2e8a8e /127.0.0.1      streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer
streams-plaintext-input 2          -               0               -               streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer-905a307a-4c49-4d8b-ac2e-5525ba2e8a8e /127.0.0.1      streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer
streams-plaintext-input 3          -               0               -               streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer-905a307a-4c49-4d8b-ac2e-5525ba2e8a8e /127.0.0.1      streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer
streams-plaintext-input 4          1               1               0               streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer-905a307a-4c49-4d8b-ac2e-5525ba2e8a8e /127.0.0.1      streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer

3)重新启动应用程序。现在,重置只影响空分区(0、1、2、3、5):

代码语言:javascript
复制
07:57:39.477 [streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-2 to offset 0.
07:57:39.478 [streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-3 to offset 0.
07:57:39.478 [streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-0 to offset 0.
07:57:39.479 [streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-1 to offset 0.
07:57:39.479 [streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-b1565eca-7d80-4550-97d2-e78ead62a840-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-5 to offset 0.

4)我插入另一条消息,检查使用者组状态,结果相同:记录在分区2中,当重新启动应用程序时,它只重置空分区(0、1、3、5):

代码语言:javascript
复制
TOPIC                   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                                                                         HOST            CLIENT-ID
streams-plaintext-input 0          -               0               -               streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer-cb04e2bd-598f-455f-b913-1370b4144dd6 /127.0.0.1      streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer
streams-plaintext-input 5          -               0               -               streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer-cb04e2bd-598f-455f-b913-1370b4144dd6 /127.0.0.1      streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer
streams-plaintext-input 1          -               0               -               streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer-cb04e2bd-598f-455f-b913-1370b4144dd6 /127.0.0.1      streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer
streams-plaintext-input 2          1               1               0               streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer-cb04e2bd-598f-455f-b913-1370b4144dd6 /127.0.0.1      streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer
streams-plaintext-input 3          -               0               -               streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer-cb04e2bd-598f-455f-b913-1370b4144dd6 /127.0.0.1      streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer
streams-plaintext-input 4          1               1               0               streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer-cb04e2bd-598f-455f-b913-1370b4144dd6 /127.0.0.1      streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer
代码语言:javascript
复制
08:00:42.313 [streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-3 to offset 0.
08:00:42.314 [streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-0 to offset 0.
08:00:42.314 [streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-1 to offset 0.
08:00:42.314 [streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=streams-wordcount-addb08ed-62ce-47f9-a446-f2ee0592c53d-StreamThread-1-consumer, groupId=streams-wordcount] Resetting offset for partition streams-plaintext-input-5 to offset 0.
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-07-19 10:49:28

  1. 配置两个使用者的原因是什么?

Restore Consumer Client是一个致力于容错和状态管理的专用用户。它负责从变更主题中恢复状态。它是从应用程序使用者客户端单独显示的。您可以在这里找到更多信息:https://docs.confluent.io/current/streams/monitoring.html#kafka-restore-consumer-client-id

  1. 当我还没有显式地配置auto.offset.reset =,而且Kafka的默认值是最新的,为什么第二个是最早的呢?

你说得对,auto.offset.reset默认值是卡夫卡消费者中的latest。但是在Spring中,使用者startOffset的默认值是earliest。因此,它在第二个消费者中显示了earliest。此外,它还依赖于spring.cloud.stream.bindings.<channelName>.group绑定。如果它是显式设置的,那么startOffset设置为earliest,否则它将设置为anonymous使用者的latest

参考资料:泉云流卡夫卡消费者属性

  1. 我想要默认(auto.offset.reset =最新)行为,而且它似乎运行良好。然而,这与我在日志中看到的不相矛盾吗?

对于anonymous使用者组,startOffset的默认值将是latest

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/57108767

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档