首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spring集成+向kafka发送2个事件

Spring集成+向kafka发送2个事件
EN

Stack Overflow用户
提问于 2019-11-25 23:03:54
回答 2查看 105关注 0票数 0

我有一个消息网关,它在输入通道接收http Json请求。

我想发送两个事件给卡夫卡作为部分相同的请求,一个事件的接收和处理一个事件。

用实现这一目标的最佳方法是什么?

我就是这样做的,但不确定是否有更好的方法:

代码语言:javascript
复制
@Bean
public IntegrationFlow processMessage() {
    return IntegrationFlows
            .from("inputChannel")
            .routeToRecipients(r -> r.recipient("inputChannel2")
                                    .recipient("inputChannel3"))
            .get();
}

@Bean
public IntegrationFlow sendReceived(MessageTransformer messageTransformer) {
    return IntegrationFlows
            .from("inputChannel2")
            .transform(messageTransformer)
            .handle( this.kafkaMessageHandler() )
            .get();
}

@Bean
public IntegrationFlow sendProcessed(MessageTransformer2 messageTransformer) {
    return IntegrationFlows
            .from("inputChannel3")
            .transform(messageTransformer)
            .handle( this.kafkaMessageHandler() )
            .get();
}

谢谢。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-11-25 23:39:22

好吧,你做的一切都没问题。实现相同目标的另一种方法是使用poublishSubscribeChannel来代替:

https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel-implementations-publishsubscribechannel

https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-subflows

此外,您还可以考虑为Kafka消息处理程序配置一个单独的配置,并从这两个分发流中为其重用一个通道。

票数 0
EN

Stack Overflow用户

发布于 2019-11-26 02:12:59

这是它完成工作的另一种方式:

代码语言:javascript
复制
@Bean
public IntegrationFlow subscribersFlow(MessageTransformer messageTransformer, MessageTransformer2 messageTransformer2) {
    return IntegrationFlows
            .from("inputChannel")
            .publishSubscribeChannel(s -> s
                    .subscribe(f -> f
                            .transform(messageTransformer)
                            .handle(  this.kafkaMessageHandler() )
                    )
                    .subscribe(f -> f
                            .transform(messageTransformer2)
                            .handle( this.kafkaMessageHandler() )
                    )
            )
            .get();
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59041379

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档