首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用spring的kafka集群间的事务传输消息

使用spring的kafka集群间的事务传输消息
EN

Stack Overflow用户
提问于 2019-04-26 18:28:22
回答 2查看 331关注 0票数 0

我有两个kafka集群。我需要使用kafka-spring实现它们之间的同步。

代码语言:javascript
复制
[cluster A, topic A]  <-- [spring app] --> [cluster B, topic B]

我创建了注释为@Transactional的侦听器,它使用kafkaTemplate发布消息。当有到两个集群的连接时,这可以完美地工作。当与目标群集的连接丢失时-监听程序似乎仍会确认新消息,但不会发布这些消息。我尝试了手动破解listener,禁用自动提交等,但它们似乎不像我认为的那样工作。当连接重新联机时,消息永远不会被传递。我需要你的帮助。

代码语言:javascript
复制
    @KafkaListener(topics = "A", containerFactory = "syncLocalListenerFactory")
    public void consumeLocal(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, @Payload SyncEvent message, Acknowledgment ack) {
        kafkaSyncRemoteTemplate.send("B", key, message);
        ack.acknowledge();
    }

我正在获取日志:

代码语言:javascript
复制
2019-04-26 12:11:40.808  WARN 21304 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Connection to node 1001 could not be established. Broker may not be available.
2019-04-26 12:11:40.828  WARN 21304 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-2, groupId=app-sync] Connection to node 1001 could not be established. Broker may not be available.
2019-04-26 12:11:47.829 ERROR 21304 --- [ad | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='...' and payload='...' to topic B:

org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for sync-2: 30002 ms has passed since batch creation plus linger time

2019-04-26 12:11:47.829 ERROR 21304 --- [ad | producer-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='...' and payload='...' to topic B:

org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for sync-2: 30002 ms has passed since batch creation plus linger time

-编辑

这里的kafkaProperties是从application.properties文件读取的默认kafka-spring属性,但在本例中它们都是默认的

代码语言:javascript
复制
    @Bean
    public ConsumerFactory<String, SyncEvent> syncLocalConsumerFactory() {
        Map<String, Object> config = kafkaProperties.buildConsumerProperties();

        config.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getStreams().getApplicationId() + "-sync");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        config.put(JsonDeserializer.VALUE_DEFAULT_TYPE, SyncEvent.class);
        config.put(JsonDeserializer.TRUSTED_PACKAGES, "app.structures");

        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        DefaultKafkaConsumerFactory<String, SyncEvent> cf = new DefaultKafkaConsumerFactory<>(config);
        cf.setValueDeserializer(new JsonDeserializer<>(SyncEvent.class, objectMapper));
        return cf;
    }

    @Bean(name = "syncLocalListenerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, SyncEvent> kafkaSyncLocalListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, SyncEvent> factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(syncLocalConsumerFactory());
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setAckOnError(false);
        factory.setErrorHandler(new SeekToCurrentErrorHandler(0));
        return factory;
    }
EN

回答 2

Stack Overflow用户

发布于 2019-04-26 18:47:58

This website描述了如何设置错误处理程序(使用SeekToCurrentErrorHandler),它可能会对您有所帮助。从Spring documentation

SeekToCurrentErrorHandler:一个错误处理程序,用于查找剩余记录中每个主题的当前偏移量。用于在消息失败后对分区进行倒带,以便可以重放。

票数 0
EN

Stack Overflow用户

发布于 2019-10-27 18:14:26

这是因为kafka事务不能跨集群。您的@Transactional注释没有任何意义,因此无论发布到集群B是否成功,偏移量都会提交给集群A。

您目前可以为跨群集流实现的最佳保证是“至少一次”处理,您可以通过确保只有在来自群集B的目标代理确认消息之后才将偏移量提交到群集A来达到它。

有关更多信息,请参阅我的博客帖子- https://medium.com/@harelopler/kafka-cross-cluster-stream-reaching-at-least-once-semantics-c74ed0eb1a54

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55865666

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档