首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在Kafka中使用实时消息

在Kafka中使用实时消息
EN

Stack Overflow用户
提问于 2016-09-30 17:41:07
回答 2查看 3.1K关注 0票数 3

我已经启动了我的zookeeper和Kafka服务器。我启动了我的Kafka制作器,它发送了10条主题为'xxx‘的消息。然后叫住了我的卡夫卡制作人。现在我启动了我的Kafka用户,并订阅了主题'xxx‘。我的消费者使用我的Kafka生成器发送的10条消息,该生成器现在没有运行。我需要我的Kafka消费者应该只消费运行Kafka服务器的消息。有什么方法可以做到这一点吗?以下是我的消费者属性中的内容。

代码语言:javascript
复制
props.put("bootstrap.servers", "localhost:9092");
    String consumeGroup = "cg1";
    props.put("group.id", consumeGroup);
    props.put("enable.auto.commit", "true");
    props.put("auto.offset.reset", "earliest");
    props.put("auto.commit.interval.ms", "100");
    props.put("heartbeat.interval.ms", "3000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
EN

回答 2

Stack Overflow用户

发布于 2016-09-30 17:47:19

设置以下属性:

代码语言:javascript
复制
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

它告诉消费者只阅读最新的消息,即在消费者启动后发布的消息。

票数 2
EN

Stack Overflow用户

发布于 2020-12-15 14:11:56

请新建一个主题,并将属性ConsumerConfig.AUTO_OFFSET_RESET_CONFIG保留为"latest“。确保不提交偏移量。也就是说,我们不应该使用commitSync()

默认情况下,接收器从每个已分配分区的上次提交的偏移量开始使用记录。如果提交的偏移量不可用,则使用为KafkaConsumer配置的偏移量重置策略ConsumerConfig#AUTO_OFFSET_RESET_CONFIG将起始偏移量设置为分区上最早或最晚的偏移量。

我认为在你的例子中,你正在提交偏移量,或者对于给定的主题,有一个可用的提交偏移量。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/39788414

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档