我正在尝试设置一个队列,该队列将由一个轮询线程轮询,并将其内容传递给通过dispatcher & taskExecutor调用的某个服务。下面的代码是我想出的
<int:channel id="dataInQueue">
<int:priority-queue capacity="100" />
</int:channel>
<int:bridge input-channel="dataInQueue" output-channel="dataInProcesingQueue">
<int:poller receive-timeout="5000" fixed-rate="500" task-executor="taskScheduler" />
</int:bridge>
<int:router input-channel="dataInProcesingQueue" expression="payload.runType.id">
<int:mapping value="1" channel="processingQ1"/>
</int:router>
<int:channel id="processingQ1" >
<int:dispatcher task-executor="taskExecutor"/>
</int:channel>
<int:chain input-channel="processingQ1" output-channel="outChannel">
<int:service-activator ref="myService" />
</int:chain>
<bean id="taskExecutor"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="maxPoolSize" value="20" />
<property name="corePoolSize" value="20" />
<property name="threadNamePrefix" value="My-TaskExecutor" />
</bean>
<bean id="taskScheduler"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
<property name="poolSize" value="20" />
<property name="threadNamePrefix" value="My-TaskScheduler" />
</bean> 不幸的是这不起作用。如果我把一条消息放在队列上,我就不会看到它正在被处理。另一方面,如果我用org.springframework.core.task.SimpleAsyncTaskExecutor实现代替taskExecutor & taskScheduler,那么一切都开始工作了。看起来问题就在我的线程池配置的某个地方,但是我看不到任何错误。
发布于 2015-01-31 15:56:54
只需删除这个task-executor="taskScheduler"即可。
计票器已经在内部使用内置的taskScheduler;task-executor属性用于当您想立即向另一个线程传递时。
目前还不清楚为什么这会导致你的应用程序不工作,但它是多余的调度程序移交给自己。
只要移除它,调度器就会在路由器之后交给您的执行者。
或者,在轮询器上设置task-executor="taskExecutor",在processingQ1上删除调度器--您不需要两个切换。
编辑:
尽管如此,我刚刚尝试了你的方案,这对我来说很好(我看到了双重切换).
11:05:43.623 DEBUG [My-TaskScheduler4][org.springframework.integration.endpoint.SourcePollingChannelAdapter] Poll resulted in Message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.628 DEBUG [My-TaskScheduler15][org.springframework.integration.channel.PriorityChannel] postReceive on channel 'dataInQueue', message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.628 DEBUG [My-TaskScheduler4][org.springframework.integration.channel.PriorityChannel] postSend (sent=true) on channel 'dataInQueue', message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.629 DEBUG [My-TaskScheduler15][org.springframework.integration.endpoint.PollingConsumer] Poll resulted in Message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.629 DEBUG [My-TaskScheduler15][org.springframework.integration.handler.BridgeHandler] org.springframework.integration.handler.BridgeHandler#0 received message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.630 DEBUG [My-TaskScheduler15][org.springframework.integration.channel.ExecutorChannel] postSend (sent=true) on channel 'toRabbit', message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.631 DEBUG [My-TaskExecutor1][org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint] org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint#0 received message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]如果在我的建议之后它仍然不起作用,我建议您打开调试日志记录(包括线程名)并跟踪切换。
https://stackoverflow.com/questions/28252817
复制相似问题