我在scala中使用Alpakka-kafka来消费Kafka主题。下面是我的代码:
val kafkaConsumerSettings: ConsumerSettings[String, String] =
ConsumerSettings(actorSystem, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(kafkaConfig.server)
.withGroupId(kafkaConfig.group)
.withProperties(
ConsumerConfig.MAX_POLL_RECORDS_CONFIG -> "100",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",
CommonClientConfigs.SECURITY_PROTOCOL_CONFIG -> "SSL"
)
Consumer
.plainSource(kafkaConsumerSettings, Subscriptions.topics(kafkaConfig.topic))
.runWith(Sink.foreach(println))但是,消费者只从topic中第一条未提交的消息开始轮询。我希望总是从偏移量0开始,而不管提交的消息是什么。使用Alpakka消费者,如何手动指定偏移量?
发布于 2020-04-17 20:35:15
我认为您需要添加几个配置项:
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> False,这样你的工作永远不会保存任何offsetConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "earliest",所以你的工作从头开始。如果您的作业在过去已经提交了偏移量,则可能需要将其偏移量重置为最早。
https://stackoverflow.com/questions/61003164
复制相似问题