我有一个例子,我想在同一个‘主’线程中运行DefaultMessageListenerContainer。现在,它使用SimpleAsyncTaskExecutor,每次收到消息时都会生成新线程。
我们有一个测试用例,它连接到不同的分布式系统,并进行处理,最后它断言了很少的东西。当DefaultMessageListenerContainer在单独的线程中运行时,主线程返回并开始执行断言,然后DefaultMessageListenerContainer才能完成。这将导致测试用例的失败。随着工作的进行,我们已经使主线程睡了几秒钟。
样例配置
<int-jms:message-driven-channel-adapter
id="mq.txbus.publisher.channel.adapter"
container="defaultMessageListenerContainer"
channel="inbound.endpoint.publisher"
acknowledge="transacted"
extract-payload="true" />
<beans:bean id="defaultMessageListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<beans:property name="connectionFactory" ref="mockConnectionFactory"/>
<beans:property name="destination" ref="publisherToTxmQueue"/>
<beans:property name="taskExecutor" ref="taskExecutor"/>
<beans:property name="maxMessagesPerTask" value="10"/>
<beans:property name="sessionTransacted" value="true"/>
</beans:bean>
<beans:bean id="taskExecutor" class="org.springframework.scheduling.timer.TimerTaskExecutor" />我在这里尝试使用TimerTaskExecutor,因为它会创建单个线程,但是该线程与主线程是分开的,所以问题没有得到解决。我试过使用SyncTaskExecutor,但这也不起作用(或者可能是我提供了正确的属性值?)。
答案:
我们用SimpleMessageListenerContainer解决了这个问题。这是新的配置
<int-jms:message-driven-channel-adapter
id="mq.txbus.publisher.channel.adapter"
container="messageListenerContainer"
channel="inbound.endpoint.publisher"
acknowledge="transacted"
extract-payload="true" />
<beans:bean id="messageListenerContainer" class="org.springframework.jms.listener.SimpleMessageListenerContainer">
<beans:property name="connectionFactory" ref="mockConnectionFactory"/>
<beans:property name="destination" ref="publisherToTxmQueue"/>
<beans:property name="sessionTransacted" value="true"/>
<beans:property name="exposeListenerSession" value="false"/>
</beans:bean> 发布于 2012-05-07 16:26:26
首先,您必须理解jms本质上是异步的,不阻塞。这意味着,一旦您向队列发送消息,它将由另一个线程处理,可能在另一台机器上处理,如果使用者停机,可能几分钟或数小时后。
阅读您的测试用例的描述,看起来您正在进行一些系统/集成测试。不幸的是,除了等待之外,您没有什么可以做的,但是您不应该盲目地等待,因为这会使您的测试变得缓慢,但也不会非常稳定--无论您等待了多长时间,在繁忙的系统上,还是在某个冗长的GC过程中,即使没有错误,您的测试也可能会超时。
所以,不要睡上固定的秒--例如,睡眠100毫秒,检查一些只有在消息处理完成时才能满足的条件。例如,如果处理消息,将某些记录插入数据库,则定期检查数据库。
更优雅的方法(无需等待)是实现请求/修复模式,有关实现细节,请参阅http://activemq.apache.org/how-should-i-implement-request-response-with-jms.html。基本上,在发送消息时,您定义了一个应答队列,并阻止等待该队列中的消息。在处理原始消息时,使用者应该向定义的队列发送回复消息。当您收到该消息时-执行所有断言。
发布于 2012-05-08 19:58:34
如果是出于测试目的,那么为什么不使用一个在jms活动完成后运行断言的CyclicBarrier。
https://stackoverflow.com/questions/10479547
复制相似问题