我有一个DirectChannel(CHANNEL-ABC),多个生产者向它发送消息,而该通道只有一个使用者。
我的应用程序轮询其中一个文件夹中的文件,每个文件调用CHANNEL-1。
生产者如下.
IntegrationFlows.from("CHANNEL-1")
.handle(
...
logger.info("CHANNEL-1 flow started")
...
)
.split()
.handle(
...
)
.aggregate(...)
.handle(
...
logger.info("Aggregated")
)
.channel("CHANNEL-ABC")
.handle(
...
logger.info("CHANNEL-1 flow ended")
...
)
.get();
IntegrationFlows.from("CHANNEL-2")
.handle(
...
logger.info("CHANNEL-2 flow started")
...
)
.handle(
...
)
.channel("CHANNEL-ABC")
.handle(
...
logger.info("CHANNEL-2 flow ended")
...
)
.get();
IntegrationFlows.from("CHANNEL-3")
.handle(
...
logger.info("CHANNEL-3 flow started")
...
)
.channel("CHANNEL-ABC")
.handle(
...
logger.info("CHANNEL-3 flow ended")
...
)
.get();注:所有的CHANNEL-1,CHANNEL-2和CHANNEL-3都是DirectChannel。
使用者如下.
IntegrationFlows.from("CHANNEL-ABC")
.handle(
...
logger.info("CHANNEL-ABC flow started")
...
)
.handle(....)
.handle(
...
logger.info("CHANNEL-3 flow ended")
...
)
.get();我在轮询文件夹中放置了多个文件,并观察到了如下日志。
文件-1 //CHANNEL-ABC called but route after channel calling not executed
CHANNEL-1 flow started
Aggregated
CHANNEL-ABC flow started
CHANNEL-ABC flow ended用于文件-2 //CHANNEL-ABC flow not executed and routed to another rout after that
CHANNEL-1 flow started
Aggregated
CHANNEL-2 flow ended文件-3 //CHANNEL-ABC flow not executed but remaining flow does get executed
CHANNEL-1 flow started
Aggregated
CHANNEL-1 flow ended用于文件-4 //CHANNEL-ABC flow not executed also remaining flow doesn't get executed
CHANNEL-1 flow started
Aggregated有时它跳过调用DirectChannel,有时它调用,但将流返回到其他信道。我希望在单线程中执行我的流,如果不是DirectChannle,那么对于上述场景,最佳的通道选择是什么?而且上面提到的行为是预期的行为,如果是的话,我怎样才能阻止它呢?
更新:我添加.log只是为了解释,现在我已经更新了流以及上面观察到的结果。
发布于 2022-05-26 15:01:19
您刚刚在流结束时遇到了log()操作符的问题。
请参阅docs中的更多内容:https://docs.spring.io/spring-integration/docs/current/reference/html/dsl.html#java-dsl-log
当在流的末尾使用这个操作符时,它是一个单向处理程序,并且流结束。
这意味着您的CHANNEL-ABC将有多个订阅者,所有这些配置都会导致DirectChannel:https://docs.spring.io/spring-integration/docs/current/reference/html/core.html#channel-implementations-directchannel的默认循环分发策略。
有关更多细节,请参见GH问题:https://github.com/spring-projects/spring-integration/issues/3615
要解决这个问题,只需删除流中的所有.log()操作符即可。CHANNEL-ABC流中的一个很好。
更新
接收方列表路由器方法:
.routeToRecipients(r -> r
.recipient("CHANNEL-ABC")
.recipient("CHANNEL-3-POST-PROCESS"))
.channel("CHANNEL-3-POST-PROCESS")https://stackoverflow.com/questions/72390404
复制相似问题