首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何解决spring kafka中的重平衡或重复记录问题

如何解决spring kafka中的重平衡或重复记录问题
EN

Stack Overflow用户
提问于 2019-11-18 10:41:26
回答 1查看 174关注 0票数 0

我总是为卡夫卡担心1.重复2.遗失记录

我在spring Kafka 2.2.2版本中做了以下修改来解决上面的问题。有人能确认一下这是不是正确的。

代码语言:javascript
复制
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20);
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;

另外请确认,我是否需要在ConcurrentKafkaListenerContainerFactory中实现ConsumerRebalanceListener。

如果需要如何实现。

代码语言:javascript
复制
   factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            //TODO code 
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            //TODO code
        }
    });
EN

回答 1

Stack Overflow用户

发布于 2019-11-18 18:42:30

Kafka提供3种交付保证:最多一次,至少一次,恰好一次。

如果您的数据仅位于Kafka中,且您的处理结果是以Kafka写入的,则您可以获得精确的一次投递保证(参见此处的Kafka Streams库)。

如果您使用外部系统,例如其他数据库,Kafka最多只能保证一次,至少一次。在此场景下,您的应用程序必须确保从Kafka收到的消息不会被处理两次。你的应用程序可以通过将所有已处理的消息保存在数据库中来实现这一点,当收到新消息时,应用程序将首先检查该消息是否已处理。

有关此内容的更多信息,请阅读:

Message Reliability In Kafka

Kafka Consumer

编辑:

MAX_POLL_INTERVAL_MS_CONFIG非常高。如果您的消息将被处理并保存在数据库中,30秒就足够了。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58907332

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档