我有一个消息网关,它在输入通道接收http Json请求。
我想发送两个事件给卡夫卡作为部分相同的请求,一个事件的接收和处理一个事件。
用实现这一目标的最佳方法是什么?
我就是这样做的,但不确定是否有更好的方法:
@Bean
public IntegrationFlow processMessage() {
return IntegrationFlows
.from("inputChannel")
.routeToRecipients(r -> r.recipient("inputChannel2")
.recipient("inputChannel3"))
.get();
}
@Bean
public IntegrationFlow sendReceived(MessageTransformer messageTransformer) {
return IntegrationFlows
.from("inputChannel2")
.transform(messageTransformer)
.handle( this.kafkaMessageHandler() )
.get();
}
@Bean
public IntegrationFlow sendProcessed(MessageTransformer2 messageTransformer) {
return IntegrationFlows
.from("inputChannel3")
.transform(messageTransformer)
.handle( this.kafkaMessageHandler() )
.get();
}谢谢。
发布于 2019-11-25 23:39:22
好吧,你做的一切都没问题。实现相同目标的另一种方法是使用poublishSubscribeChannel来代替:
https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-subflows
此外,您还可以考虑为Kafka消息处理程序配置一个单独的配置,并从这两个分发流中为其重用一个通道。
发布于 2019-11-26 02:12:59
这是它完成工作的另一种方式:
@Bean
public IntegrationFlow subscribersFlow(MessageTransformer messageTransformer, MessageTransformer2 messageTransformer2) {
return IntegrationFlows
.from("inputChannel")
.publishSubscribeChannel(s -> s
.subscribe(f -> f
.transform(messageTransformer)
.handle( this.kafkaMessageHandler() )
)
.subscribe(f -> f
.transform(messageTransformer2)
.handle( this.kafkaMessageHandler() )
)
)
.get();
}https://stackoverflow.com/questions/59041379
复制相似问题