首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Alpakka kafka消费者偏移量

Alpakka kafka消费者偏移量
EN

Stack Overflow用户
提问于 2020-04-03 08:07:11
回答 1查看 503关注 0票数 1

我在scala中使用Alpakka-kafka来消费Kafka主题。下面是我的代码:

代码语言:javascript
复制
    val kafkaConsumerSettings: ConsumerSettings[String, String] =
      ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
        .withBootstrapServers(kafkaConfig.server)
        .withGroupId(kafkaConfig.group)
        .withProperties(
          ConsumerConfig.MAX_POLL_RECORDS_CONFIG       -> "100",
          ConsumerConfig.AUTO_OFFSET_RESET_CONFIG      -> "earliest",
          CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> "SSL"
        )

    Consumer
        .plainSource(kafkaConsumerSettings, Subscriptions.topics(kafkaConfig.topic))
        .runWith(Sink.foreach(println))

但是,消费者只从topic中第一条未提交的消息开始轮询。我希望总是从偏移量0开始,而不管提交的消息是什么。使用Alpakka消费者,如何手动指定偏移量?

EN

回答 1

Stack Overflow用户

发布于 2020-04-17 20:35:15

我认为您需要添加几个配置项:

  1. ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> False,这样你的工作永远不会保存任何offset
  2. ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",所以你的工作从头开始。

如果您的作业在过去已经提交了偏移量,则可能需要将其偏移量重置为最早。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61003164

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档