我们的应用程序是使用Spring集成框架设计的。完整的消息操作流程从侦听队列开始,在该队列中使用了JMS消息驱动适配器,然后定义了基于通道的队列端点,并且每个端点都由Service-Activator处理。
我们目前正在进入性能阶段,我们正在产生200个消息请求。最初,我们观察到消息不是并行执行的,在做了一些阅读之后,我们发现通过向JMS消息驱动侦听器适配器添加并发消费者和最大并发消费者属性,将有助于启用多线程模式。确实,这很有帮助,但我仍然看到了单线程效果。这是由于端点的定义方式造成的吗?向每个端点添加队列容量的优势是什么?您是否认为将队列容量添加到每个通道端点定义将再次有助于在多线程模式下运行。
请求的设计快照:

发布于 2012-06-27 04:28:24
查看您的频道的确切定义会很有帮助。
默认情况下,Spring通道在发送者的线程中使用其消息。换句话说,它是同步的。如果希望通道异步使用消息,则必须指定一个TaskExecutor。请参阅http://static.springsource.org/spring/docs/3.0.5.RELEASE/reference/scheduling.html
发布于 2012-06-27 09:00:02
看看你的流程图,它看起来像是流有很多单线程的元素,并且可以优化为更多的并发,希望有更高的吞吐量。
首先,使用消息驱动通道适配器(您还没有显示该适配器的配置),可以将其配置为具有多个默认消费者,并且可以使其在每个消费周期中使用合理数量的消息。
通过消息驱动通道适配器,将消息放入直接通道1的线程将不幸地运行流的其余部分,因为在其他任何地方都没有缓冲,因此当您的消息被放入“直接通道1”时,它将立即调用同一线程中的路由器,然后调用同一线程中的服务激活器和邮件适配器或JMS出站通道适配器。这里的变化可能是引入了队列通道,而不是直接通道1,这样消费消息的线程只是将消息放在队列通道中,然后就可以使用它了。
除了直接通道1(更改为队列通道1)之外,我认为它可以是单线程的,这取决于您的流有多快或有多慢,如果说邮件适配器很慢,那么直接通道4可以成为队列通道,也可以是,与直接通道5相同
您是否可以查看这些更改是否以粗体突出显示,以帮助改善流程
发布于 2016-03-12 06:32:46
为了提高性能,我建议使用带有任务执行器的executorchannel,在其中控制线程池大小的数量。在这种情况下,当消息到达jms队列时,使用者获取消息,并在单独的线程中处理流。请记住,在这种配置中,多线程工作是由任务执行器通道执行的,它将在一个单独的线程中执行消息的接收。因此,您必须仔细考虑您想要的多线程级别。
对于队列消息通道,您确实需要一个轮询器,它在通道上轮询以执行接收,队列容量是幕后原子队列的容量。
您可以在xml中以这种方式配置am executor通道。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:si="http://www.springframework.org/schema/integration"
xmlns:tx="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<tx:executor id="taskExecutor" pool-size="10" queue-capacity="10"/>
<si:channel id="ch" >
<si:dispatcher task-executor="taskExecutor"/>
</si:channel>
</beans>或者以这种方式在java-dsl中使用。
@Bean
public IntegrationFlow storeBookPageByPage(ConnectionFactory connectionFactory,
@Qualifier("createBookQueue") Destination createBookQueue,
@Qualifier("createBookResultQueue") Destination createBookResultQueue,
PdfBookMasterRepository pdfBookMasterRepository,
BookRepository bookRepository){
String tempFilePathBaseDir = environment.getProperty("bookService.storeBookPageByPage.tempFilePathBaseDir");
return IntegrationFlows.from(Jms.messageDriverChannelAdapter(connectionFactory)
.destination(createBookQueue)
.errorChannel(storeBookPageByPageErrorChannel()))
.channel(channels -> channels.executor(Executors.newScheduledThreadPool(5)))
.....
}https://stackoverflow.com/questions/11183940
复制相似问题