我试图使用与DSL和lambda实现以下功能:
给出一条消息,将其发送给N消费者(通过publish-subscribe)。等待有限的时间,并返回在此间隔内到达表单使用者(<= N)的所有结果。
下面是我到目前为止拥有的一个示例配置:
@Configuration
@EnableIntegration
@IntegrationComponentScan
@ComponentScan
public class ExampleConfiguration {
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
return Pollers.fixedRate(1000).maxMessagesPerPoll(1).get();
}
@Bean
public MessageChannel publishSubscribeChannel() {
return MessageChannels.publishSubscribe(splitterExecutorService()).applySequence(true).get();
}
@Bean
public ThreadPoolTaskExecutor splitterExecutorService() {
final ThreadPoolTaskExecutor executorService = new ThreadPoolTaskExecutor();
executorService.setCorePoolSize(3);
executorService.setMaxPoolSize(10);
return executorService;
}
@Bean
public DirectChannel errorChannel() {
return new DirectChannel();
}
@Bean
public DirectChannel requestChannel() {
return new DirectChannel();
}
@Bean
public DirectChannel channel1() {
return new DirectChannel();
}
@Bean
public DirectChannel channel2() {
return new DirectChannel();
}
@Bean
public DirectChannel collectorChannel() {
return new DirectChannel();
}
@Bean
public TransformerChannel1 transformerChannel1() {
return new TransformerChannel1();
}
@Bean
public TransformerChannel2 transformerChannel2() {
return new TransformerChannel2();
}
@Bean
public IntegrationFlow errorFlow() {
return IntegrationFlows.from(errorChannel())
.handle(m -> System.err.println("[" + Thread.currentThread().getName() + "] " + m.getPayload()))
.get();
}
@Bean
public IntegrationFlow channel1Flow() {
return IntegrationFlows.from(publishSubscribeChannel())
.transform("1: "::concat)
.transform(transformerChannel1())
.channel(collectorChannel())
.get();
}
@Bean
public IntegrationFlow channel2Flow() {
return IntegrationFlows.from(publishSubscribeChannel())
.transform("2: "::concat)
.transform(transformerChannel2())
.channel(collectorChannel())
.get();
}
@Bean
public IntegrationFlow splitterFlow() {
return IntegrationFlows.from(requestChannel())
.channel(publishSubscribeChannel())
.get();
}
@Bean
public IntegrationFlow collectorFlow() {
return IntegrationFlows.from(collectorChannel())
.resequence(r -> r.releasePartialSequences(true),
null)
.aggregate(a ->
a.sendPartialResultOnExpiry(true)
.groupTimeout(500)
, null)
.get();
}
}TransformerChannel1和TransformerChannel2是示例消费者,它们只是通过睡眠来模拟延迟。
消息流是:
splitterFlow -> channel1Flow \
-> channel2Flow / -> collectorFlow一切似乎都如期而至,但我看到的警告如下:
已收到回复消息,但接收线程已收到答复。
这是意料之中的,考虑到返回了部分结果。
问题:
errorChannel,但不确定在哪里指定。发布于 2015-03-11 20:59:51
是的,解决方案看起来不错。我想它适合Scatter-Gather模式。该实现是从版本4.1提供的。
另一方面,aggregator还有更多的选项,因为该版本也是- expire-groups-upon-timeout,默认情况下它是聚合器的true。使用此选项作为false,您将能够实现丢弃所有这些延迟消息的要求。不幸的是,DSL不支持它现在还没有。因此,即使您将项目升级到使用SpringIntegration4.1,也于事无补。
对于那些“收到回复消息但接收线程已经收到答复”的spring.integraton.messagingTemplate.throwExceptionOnLateReply = true选项,另一个选项是在jar的一个jar的spring.integration.properties文件中使用META-INF文件。
无论如何,我认为Scatter-Gather是用例的最佳解决方案。您可以从这里找到如何从JavaConfig中配置它。
更新
异常和错误通道呢?
既然您已经与throwExceptionOnLateReply达成了协议,那么我想您将通过@MessagingGateway向requestChannel发送一条消息。最后一个选项是errorChannel。另一方面,PublishSubscribeChannel有errorHandler选项,您可以将MessagePublishingErrorHandler与您的errorChannel一起用作默认选项。
顺便说一句,别忘了框架为errorChannel提供了endpoint bean和LoggingHandler的endpoint。所以,请想一想,如果你真的需要覆盖那些东西。默认的errorChannel是PublishSubscribeChannel,因此您可以简单地向它添加自己的订阅服务器。
https://stackoverflow.com/questions/28996232
复制相似问题