我总是为卡夫卡担心1.重复2.遗失记录
我在spring Kafka 2.2.2版本中做了以下修改来解决上面的问题。有人能确认一下这是不是正确的。
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;另外请确认,我是否需要在ConcurrentKafkaListenerContainerFactory中实现ConsumerRebalanceListener。
如果需要如何实现。
factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
//TODO code
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
//TODO code
}
});发布于 2019-11-18 18:42:30
Kafka提供3种交付保证:最多一次,至少一次,恰好一次。
如果您的数据仅位于Kafka中,且您的处理结果是以Kafka写入的,则您可以获得精确的一次投递保证(参见此处的Kafka Streams库)。
如果您使用外部系统,例如其他数据库,Kafka最多只能保证一次,至少一次。在此场景下,您的应用程序必须确保从Kafka收到的消息不会被处理两次。你的应用程序可以通过将所有已处理的消息保存在数据库中来实现这一点,当收到新消息时,应用程序将首先检查该消息是否已处理。
有关此内容的更多信息,请阅读:
编辑:
MAX_POLL_INTERVAL_MS_CONFIG非常高。如果您的消息将被处理并保存在数据库中,30秒就足够了。
https://stackoverflow.com/questions/58907332
复制相似问题