我想从一个kafka集群消费消息并发布到另一个kafka集群。想知道如何使用spring-kafka进行配置吗?
发布于 2021-01-08 23:00:59
只需使用不同的bootstrap.servers属性配置消费者和生产者工厂。
如果使用的是Spring Boot,请参见
和
如果要创建自己的工厂@Bean,请在其中设置属性。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#connecting
发布于 2021-01-09 00:15:19
您可以使用spring cloud stream kafka绑定器。
创建两个流,一个用于消费,另一个用于生产。
针对消费者
public interface InputStreamExample{
String INPUT = "consumer-in";
@Input(INPUT)
MessageChannel readFromKafka();
}对于生产者
public interface ProducerStreamExample{
String OUTPUT = "produce-out";
@Output(OUTPUT)
MessageChannel produceToKafka();
}对于消费消息的使用:
@StreamListener(value = ItemStream.INPUT)
public void processMessage(){
/*
code goes here
*/
}用于生产
//here producerStreamExample is instance of ProducerStreamExample
producerStreamExample.produceToKafka().send(/*message goes here*/);现在使用绑定器配置消费者集群和生产者集群,消费者集群可以使用consumer-in,生产集群可以使用producing out。
属性文件
spring.cloud.stream.binders.kafka-a.environment.spring.cloud.stream.kafka.binder.brokers:<consumer cluster>
#other properties for this binders
#bind kafka-a to consumer-in
spring.cloud.stream.bindings.consumer-in.binder=kafka-a #kafka-a binding to consumer-in
#similary other properties of consumer-in, like
spring.cloud.stream.bindings.consumer-in.destination=<topic>
spring.cloud.stream.bindings.consumer-in.group=<consumer group>
#now configure cluster to produce
spring.cloud.stream.binders.kafka-b.environment.spring.cloud.stream.kafka.binder.brokers:<cluster where to produce>
spring.cloud.stream.bindings.produce-out.binder=kafka-b #here kafka-b, binding to produce-out
#similary you can do other configuration like topic
spring.cloud.stream.bindings.produce-out.destination=<topic>有关更多配置,请参阅:https://cloud.spring.io/spring-cloud-stream-binder-kafka/spring-cloud-stream-binder-kafka.html
https://stackoverflow.com/questions/65624819
复制相似问题