我使用的是spring,如果我不设置ConcurrentKafkaListenerContainerFactory的并发性,那么一切都正常,当我将它设置为大于1的数字时,我得到了一个例外:
消费者:type=app-info,id=client-3
我的配置:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new
ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConcurrency(kafkaConfig.getConcurrency());
factory.getContainerProperties().setAckMode(AckMode.MANUAL);
factory.setConsumerFactory(consumerFactory());
return factory;
}物业:
kafka.enable-auto-commit=false
kafka.client-id=client-1
kafka.concurrency=2发布于 2017-04-25 12:00:38
我开了一个在github上发表这篇文章。当前不支持为每个线程设置不同的client.id。
作为一个解决方案,您可以为每个KafkaMessageListenerContainer启动一个单独的ConcurrentMessageListenerContainer (这是ConcurrentMessageListenerContainer内部所做的)。
编辑
虽然不理想,但您可以省略client.id,卡夫卡客户端将为每个客户端生成一个(consumer-1、consumer-2等)
https://stackoverflow.com/questions/43601004
复制相似问题