我正在搭建一个 Spring Integration 流程:从 MQ 消费消息,根据每条 MQ 消息构造请求,并以并行方式调用多个 web 服务。
该 Spring Integration 流程中,每个并行分支包含以下步骤:
1. Webservice Client to make a webservice Call (Service Activator)
2. Transformer to convert response to Entity Object
3. Handler to save data to DB.
我已经能够通过 scatter-gather(分散-聚集)模式并行发起 web 服务调用,但看不到聚合发生——也就是说,流程始终没有进入 gatherer(聚合器)类。
我尝试把带任务执行器(task executor)的发布-订阅通道用作 scatter-gather 的输入通道。从日志看,两个 web 服务调用确实由任务执行器并行执行,但调用完成后,消息始终没有到达 gatherer。
<!-- Service activator: takes messages from "transformedEntity", delegates to the
     "incidentHandler" bean, and forwards the result to "outputChannelFromMQ". -->
<si:service-activator
    input-channel="transformedEntity"
    ref="incidentHandler"
    output-channel="outputChannelFromMQ"/>
<!-- Scatter-gather endpoint fed from "outputChannelFromMQ".
     The recipient-list scatterer (apply-sequence enabled) sends each message to
     "distributionChannel1" and "distributionChannel2". Replies are expected on
     "gatherChannel", and the gathered result is sent to "gatherResponseOutputChannel".
     gather-timeout is 4000 (milliseconds, per the Spring Integration reference). -->
<si:scatter-gather
    input-channel="outputChannelFromMQ"
    requires-reply="false"
    output-channel="gatherResponseOutputChannel"
    gather-channel="gatherChannel"
    gather-timeout="4000">
  <si:scatterer apply-sequence="true">
    <si:recipient channel="distributionChannel1"/>
    <si:recipient channel="distributionChannel2"/>
  </si:scatterer>
</si:scatter-gather>
<!-- Publish-subscribe channel "outputChannelFromMQ": apply-sequence is enabled and
     message delivery is handed to the "taskExecutor" bean. -->
<si:publish-subscribe-channel
    id="outputChannelFromMQ"
    apply-sequence="true"
    task-executor="taskExecutor"/>
<!-- Thread pool "taskExecutor": pool-size range 10-10 (lower and upper bound both 10)
     with a queue capacity of 25. -->
<task:executor
    id="taskExecutor"
    queue-capacity="25"
    pool-size="10-10"/>
<!-- Chain listening on "distributionChannel1": invokes webServiceClient1.getResponse,
     then serviceHandler1.saveToDB; the chain's final output goes to "gatherChannel".
     NOTE(review): the id "planngedBagsChain" looks like a misspelling of "planned";
     it is left unchanged because other configuration may refer to it. -->
<si:chain id="planngedBagsChain"
          input-channel="distributionChannel1"
          output-channel="gatherChannel">
  <si:service-activator ref="webServiceClient1" method="getResponse"/>
  <si:service-activator ref="serviceHandler1" method="saveToDB"/>
</si:chain>
<!-- Chain listening on "distributionChannel2": invokes webServiceClient2.getResponse,
     passes the result through the "transformer" bean, then calls
     serviceHandler2.saveToDB; the chain's final output goes to "gatherChannel". -->
<si:chain id="bagHistoryChain"
          input-channel="distributionChannel2"
          output-channel="gatherChannel">
  <si:service-activator ref="webServiceClient2" method="getResponse"/>
  <si:transformer ref="transformer"/>
  <si:service-activator ref="serviceHandler2" method="saveToDB"/>
</si:chain>
<!-- Service activator: consumes from "gatherResponseOutputChannel" (the output channel
     of the scatter-gather), invokes responseTransformer.receiveResponse, and sends
     the reply to "toRabbitMQ". -->
<si:service-activator
    input-channel="gatherResponseOutputChannel"
    ref="responseTransformer"
    method="receiveResponse"
    output-channel="toRabbitMQ"/>
发布于 2019-07-11 17:14:15
启用调试日志后,可以看到消息被发送到的所有通道。使用上面的配置时,聚合无法正确进行;改成下面的配置后可以正常工作。
<!-- Service activator: takes messages from "transformedEntity", delegates to the
     "incidentHandler" bean, and forwards the result to "outputChannelFromMQ". -->
<si:service-activator
    input-channel="transformedEntity"
    ref="incidentHandler"
    output-channel="outputChannelFromMQ"/>
<!-- Explicit declaration of "outputChannelFromMQ" as a plain channel with no
     sub-elements (no queue, no task executor). -->
<si:channel id="outputChannelFromMQ"/>
<!-- Scatter-gather endpoint fed from "outputChannelFromMQ".
     Messages are scattered by sending them to "scatterInputChannel"; replies are
     expected on "gatherChannel". The gatherer delegates to the "responseTransformer"
     bean and releases a group once it holds two messages (size() == 2).
     The gathered result is sent to "toRabbit"; gather-timeout is 4000
     (milliseconds, per the Spring Integration reference). -->
<si:scatter-gather
    input-channel="outputChannelFromMQ"
    requires-reply="false"
    scatter-channel="scatterInputChannel"
    output-channel="toRabbit"
    gather-channel="gatherChannel"
    gather-timeout="4000">
  <si:gatherer
      id="responseGatherer"
      ref="responseTransformer"
      release-strategy-expression="size() == 2"/>
</si:scatter-gather>
<!-- Publish-subscribe channel "scatterInputChannel": apply-sequence is enabled and
     message delivery is handed to the "taskExecutor" bean. -->
<si:publish-subscribe-channel
    id="scatterInputChannel"
    apply-sequence="true"
    task-executor="taskExecutor"/>
<!-- Thread pool "taskExecutor": pool-size range 10-10 (lower and upper bound both 10)
     with a queue capacity of 25. -->
<task:executor
    id="taskExecutor"
    queue-capacity="25"
    pool-size="10-10"/>
<!-- Chain subscribed to "scatterInputChannel": invokes webServiceClient1.getResponse,
     then serviceHandler1.saveToDB; the chain's final output goes to "gatherChannel".
     NOTE(review): the id "planngedBagsChain" looks like a misspelling of "planned";
     it is left unchanged because other configuration may refer to it. -->
<si:chain id="planngedBagsChain"
          input-channel="scatterInputChannel"
          output-channel="gatherChannel">
  <si:service-activator ref="webServiceClient1" method="getResponse"/>
  <si:service-activator ref="serviceHandler1" method="saveToDB"/>
</si:chain>
<!-- Chain subscribed to "scatterInputChannel": invokes webServiceClient2.getResponse,
     passes the result through the "transformer" bean, then calls
     serviceHandler2.saveToDB; the chain's final output goes to "gatherChannel". -->
<si:chain id="bagHistoryChain"
          input-channel="scatterInputChannel"
          output-channel="gatherChannel">
  <si:service-activator ref="webServiceClient2" method="getResponse"/>
  <si:transformer ref="transformer"/>
  <si:service-activator ref="serviceHandler2" method="saveToDB"/>
</si:chain>
https://stackoverflow.com/questions/56961801
复制相似问题