我不想使用@KafkaListener或@StreamListener,但我想手动投票kafka。我使用的是春云启动流卡夫卡库,我有以下卡夫卡制作人
@Autowired
private KafkaTemplate<byte[], byte[]> template;
public void sendMessages() {
IntStream.range(2)
.forEach(val -> {
template.send("kafka-topic", "hello".getBytes());
});
}我想手动投票相同的卡夫卡主题使用春季-卡夫卡。我尝试了以下的消费者
@Autowired
private ConsumerFactory consumerFactory;
public void processKafkaRecords() throws InterruptedException {
Consumer<byte[], byte[]> consumer = consumerFactory.createConsumer("0", "consumer-1");
consumer.subscribe(Arrays.asList("kafka-topic"));
ConsumerRecords<byte[], byte[]> poll = consumer.poll(Duration.ofMillis(1000));
poll.forEach(record -> {
log.info("record {}", record);
});
}application.properties
spring.cloud.stream.bindings.pollableInput.destination=kafka-topic
spring.cloud.stream.bindings.pollableInput.group=kafka-topic
spring.cloud.stream.bindings.pollableInput.consumer.batch-mode=true
spring.cloud.stream.bindings.pollableInput.consumer.header-mode=none
spring.cloud.stream.bindings.pollableInput.consumer.use-native-decoding=true
spring.cloud.stream.kafka.bindings.pollableInput.consumer.autoCommitOffset=false然而,消费者永远得不到生产者发送的任何记录。有什么想法,如何手动投票一个卡夫卡的主题?
发布于 2020-02-06 03:32:44
可能有几个原因:
Duration.ofMillis(1000) -尝试增加时间,1在某些情况下可能太低,除非你的客户端和卡夫卡都在同一台机器上运行。因为poll(Duration)的文档表示,如果超时过期,空记录集将是--首先启动生产者,然后启动使用者,并且没有将偏移复位策略设置为最早,这样就看不到任何消息,因为默认情况下,使用者将从最新的偏移量中消费。因此,尝试设置来自同一个使用者组的以下auto.offset.reset=earliest。
https://stackoverflow.com/questions/60083681
复制相似问题