为了让这件事成功,我挣扎了几天。我想要完成的是调用不同的子流(即集成流),从主流开始,基于消息内容,在子流完成后返回到主流。它同样授权给一个特定的类来完成某件事情并返回主流。这种责任也可能需要一些步骤,因此它也可以作为流来实现。以下是我的主要流程:
public IntegrationFlow processingFlow(
MessageChannel eventIn,
MessageChannel eventOut,
ChangedEventsLoader changedEventsLoader,
CalculatorRouter calculatorRouter) {
return IntegrationFlows.from(eventIn)
.handle(changedEventsLoader)
.route(
CalculatorRouter::getSportId,
CalculatorRouter::routeCalculation)
.channel(eventOut)
.get();}
以下是路由器的实现:
@Service
@AllArgsConstructor
public class CalculatorRouter {
private final MessageChannel eventOut;
public RouterSpec<Integer, MethodInvokingRouter> routeCalculation(
RouterSpec<Integer, MethodInvokingRouter> mapping) {
return mapping
.channelMapping(1, "subflowCalculationChannel")
.defaultOutputToParentFlow();
}
public Integer getSportId(Event event) {
return 1;
}
@Bean
public MessageChannel subflowCalculationChannel() {
return MessageChannels.direct().get();
}
}下面是一个子流的例子:
@Configuration
@AllArgsConstructor
public class CalculatorExample {
@Bean
public IntegrationFlow calculateProbabilities(MessageChannel subflowCalculationChannel) {
return IntegrationFlows.from(subflowCalculationChannel)
.<Event>handle((p, m) -> p * 2)
.get();
}
}问题是,子流与主流缺少某种连接。我试图通过在路由部分使用defaultOutputToParentFlow()来解决这个问题,但这还不够。
发布于 2018-10-04 12:32:11
从某些版本开始,我们决定将Java路由器行为与使用注释或XML的标准配置相一致。因此,如果我们发送到路由器,我们不能期望从那里的答复。我们只能继续使用通道作为子流的输出。
在您的示例中,主流中有一个.channel(eventOut)。因此,您的所有路由子流都应该准确地响应此通道:
.<Event>handle((p, m) -> corners1H2HCustomBet.getCalculation(p))
.channel(eventOut)
.get();我认为.defaultOutputToParentFlow();没有为您提供任何东西,因为您没有默认的映射。这已经有点不同了:它对其他映射没有任何影响。
还要注意这个JavaDoc:
/**
* Add a subflow as an alternative to a {@link #channelMapping(Object, String)}.
* {@link #prefix(String)} and {@link #suffix(String)} cannot be used when subflow
* mappings are used.
* <p> If subflow should refer to the external {@link IntegrationFlow} bean and
* there is a requirement to expect reply from there, such a reference should be
* wrapped with a {@code .gateway()}:
* <pre class="code">
* {@code
* .subFlowMapping(false, sf -> sf.gateway(evenFlow())))
* }
* </pre>
* @param key the key.
* @param subFlow the subFlow.
* @return the router spec.
*/
public RouterSpec<K, R> subFlowMapping(K key, IntegrationFlow subFlow) {与您的基于信道的路由配置无关,但可能在将来有用.
更新
下面是subFlowMapping的一个示例(Kotlin)并返回到主流:
@Bean
fun splitRouteAggregate() =
IntegrationFlow { f ->
f.split()
.route<Int, Boolean>({ o -> o % 2 == 0 },
{ m ->
m.subFlowMapping(true) { sf -> sf.gateway(oddFlow()) }
.subFlowMapping(false) { sf -> sf.gateway(evenFlow()) }
})
.aggregate()
}
@Bean
fun oddFlow() =
IntegrationFlow { flow ->
flow.handle<Any> { _, _ -> "odd" }
}
@Bean
fun evenFlow() =
IntegrationFlow { flow ->
flow.handle<Any> { _, _ -> "even" }
}https://stackoverflow.com/questions/52646564
复制相似问题