首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >春季云流手册Poller Kafka

春季云流手册Poller Kafka
EN

Stack Overflow用户
提问于 2020-02-05 20:32:27
回答 1查看 1.1K关注 0票数 0

我不想使用@KafkaListener或@StreamListener,但我想手动投票kafka。我使用的是春云启动流卡夫卡库,我有以下卡夫卡制作人

代码语言:javascript
复制
  @Autowired
  private KafkaTemplate<byte[], byte[]> template;

  public void sendMessages() {
    IntStream.range(2)
             .forEach(val -> {
               template.send("kafka-topic", "hello".getBytes());
             });
  }

我想手动投票相同的卡夫卡主题使用春季-卡夫卡。我尝试了以下的消费者

代码语言:javascript
复制
 @Autowired
  private ConsumerFactory consumerFactory;

  public void processKafkaRecords() throws InterruptedException {
    Consumer<byte[], byte[]> consumer = consumerFactory.createConsumer("0", "consumer-1");
    consumer.subscribe(Arrays.asList("kafka-topic"));
    ConsumerRecords<byte[], byte[]> poll = consumer.poll(Duration.ofMillis(1000));
    poll.forEach(record -> {
      log.info("record {}", record);
    });
  }

application.properties

代码语言:javascript
复制
spring.cloud.stream.bindings.pollableInput.destination=kafka-topic
spring.cloud.stream.bindings.pollableInput.group=kafka-topic
spring.cloud.stream.bindings.pollableInput.consumer.batch-mode=true
spring.cloud.stream.bindings.pollableInput.consumer.header-mode=none
spring.cloud.stream.bindings.pollableInput.consumer.use-native-decoding=true

spring.cloud.stream.kafka.bindings.pollableInput.consumer.autoCommitOffset=false

然而,消费者永远得不到生产者发送的任何记录。有什么想法,如何手动投票一个卡夫卡的主题?

EN

回答 1

Stack Overflow用户

发布于 2020-02-06 03:32:44

可能有几个原因:

  1. Duration.ofMillis(1000) -尝试增加时间,1在某些情况下可能太低,除非你的客户端和卡夫卡都在同一台机器上运行。因为poll(Duration)的文档表示,如果超时过期,空记录集将是--首先启动生产者,然后启动使用者,并且没有将偏移复位策略设置为最早,这样就看不到任何消息,因为默认情况下,使用者将从最新的偏移量中消费。因此,尝试设置来自同一个使用者组的以下auto.offset.reset=earliest
  2. Another使用者可能正在运行,并且只有一个分区,或者使用者组已经处于最后一个偏移量。在本例中,您可以尝试更改使用者组id.

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60083681

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档