我们使用只有一次交付的处理器(通过生产者提交消费者偏移量),并且需要了解当从kafka-cluster-1中的主题消费消息并生成到kafka-cluster-2上的主题(反之亦然)时,是否可能发生这种情况。
这是事务处理器中的一段代码:
messageProducer.beginTransaction(partitionId)
resultPublisher.publish(partitionId, resultTopic, messageRecord.key(), result)
val offsetAndMetadata = messageConsumer.getUncommittedOffsets(listenTopic, messageRecord)
messageProducer.sendOffsetsToTransaction(partitionId, offsetAndMetadata, consumerGroupId)
messageProducer.commitTransaction(partitionId)我的理解是,生产者将尝试在同一集群中的消费者主题上提交偏移量。
我做了一些研究,但找不到任何与多个集群相关的东西。
这是完全可能的吗?
发布于 2019-02-20 20:58:45
您可以将偏移量“手动”发送到同一集群上您自己的主题,生成的消息将发送到该集群。这样,您就可以使用事务提供的保证。
您需要为偏移量创建自己的主题,类似于Kafka的内部__consumer_offsets,您应该使用groupId、topic、partition作为键,并使用最近的偏移量(已经读取或要读取)作为值。记住使用日志压缩。
AFAIK不可能有跨两个不同集群的事务。
https://stackoverflow.com/questions/54784800
复制相似问题