首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spring集成中发布订阅的时间限制聚合

Spring集成中发布订阅的时间限制聚合
EN

Stack Overflow用户
提问于 2015-03-11 20:01:42
回答 1查看 314关注 0票数 1

我试图使用与DSL和lambda实现以下功能:

给出一条消息,将其发送给N消费者(通过publish-subscribe)。等待有限的时间,并返回在此间隔内到达表单使用者(<= N)的所有结果。

下面是我到目前为止拥有的一个示例配置:

代码语言:javascript
复制
@Configuration
@EnableIntegration
@IntegrationComponentScan
@ComponentScan
public class ExampleConfiguration {

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller() {
        return Pollers.fixedRate(1000).maxMessagesPerPoll(1).get();
    }

    @Bean
    public MessageChannel publishSubscribeChannel() {
        return MessageChannels.publishSubscribe(splitterExecutorService()).applySequence(true).get();
    }

    @Bean
    public ThreadPoolTaskExecutor splitterExecutorService() {
        final ThreadPoolTaskExecutor executorService = new ThreadPoolTaskExecutor();

        executorService.setCorePoolSize(3);
        executorService.setMaxPoolSize(10);

        return executorService;
    }

    @Bean
    public DirectChannel errorChannel() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel requestChannel() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel channel1() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel channel2() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel collectorChannel() {
        return new DirectChannel();
    }

    @Bean
    public TransformerChannel1 transformerChannel1() {
        return new TransformerChannel1();
    }

    @Bean
    public TransformerChannel2 transformerChannel2() {
        return new TransformerChannel2();
    }

    @Bean
    public IntegrationFlow errorFlow() {
        return IntegrationFlows.from(errorChannel())
                .handle(m -> System.err.println("[" + Thread.currentThread().getName() + "] " + m.getPayload()))
                .get();
    }

    @Bean
    public IntegrationFlow channel1Flow() {
        return IntegrationFlows.from(publishSubscribeChannel())
                .transform("1: "::concat)
                .transform(transformerChannel1())
                .channel(collectorChannel())
                .get();
    }

    @Bean
    public IntegrationFlow channel2Flow() {
        return IntegrationFlows.from(publishSubscribeChannel())
                .transform("2: "::concat)
                .transform(transformerChannel2())
                .channel(collectorChannel())
                .get();
    }

    @Bean
    public IntegrationFlow splitterFlow() {
        return IntegrationFlows.from(requestChannel())
                .channel(publishSubscribeChannel())
                .get();
    }

    @Bean
    public IntegrationFlow collectorFlow() {
        return IntegrationFlows.from(collectorChannel())
                .resequence(r -> r.releasePartialSequences(true),
                        null)
                .aggregate(a ->
                        a.sendPartialResultOnExpiry(true)
                                .groupTimeout(500)
                        , null)
                .get();
    }

}

TransformerChannel1TransformerChannel2是示例消费者,它们只是通过睡眠来模拟延迟。

消息流是:

代码语言:javascript
复制
 splitterFlow -> channel1Flow \
              -> channel2Flow / -> collectorFlow

一切似乎都如期而至,但我看到的警告如下:

已收到回复消息,但接收线程已收到答复。

这是意料之中的,考虑到返回了部分结果。

问题:

  • 总的来说,这是一个好办法吗?
  • 什么是正确的方式来优雅地服务或丢弃那些延迟的消息?
  • 如何处理例外情况?理想情况下,我希望将它们发送到errorChannel,但不确定在哪里指定。
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2015-03-11 20:59:51

是的,解决方案看起来不错。我想它适合Scatter-Gather模式。该实现是从版本4.1提供的。

另一方面,aggregator还有更多的选项,因为该版本也是- expire-groups-upon-timeout,默认情况下它是聚合器的true。使用此选项作为false,您将能够实现丢弃所有这些延迟消息的要求。不幸的是,DSL不支持它现在还没有。因此,即使您将项目升级到使用SpringIntegration4.1,也于事无补。

对于那些“收到回复消息但接收线程已经收到答复”的spring.integraton.messagingTemplate.throwExceptionOnLateReply = true选项,另一个选项是在jar的一个jar的spring.integration.properties文件中使用META-INF文件。

无论如何,我认为Scatter-Gather是用例的最佳解决方案。您可以从这里找到如何从JavaConfig中配置它。

更新

异常和错误通道呢?

既然您已经与throwExceptionOnLateReply达成了协议,那么我想您将通过@MessagingGatewayrequestChannel发送一条消息。最后一个选项是errorChannel。另一方面,PublishSubscribeChannelerrorHandler选项,您可以将MessagePublishingErrorHandler与您的errorChannel一起用作默认选项。

顺便说一句,别忘了框架为errorChannel提供了endpoint bean和LoggingHandlerendpoint。所以,请想一想,如果你真的需要覆盖那些东西。默认的errorChannelPublishSubscribeChannel,因此您可以简单地向它添加自己的订阅服务器。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/28996232

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档