首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用桥轮询程序和任务执行器安装时未处理的队列

使用桥轮询程序和任务执行器安装时未处理的队列
EN

Stack Overflow用户
提问于 2015-01-31 14:39:05
回答 1查看 145关注 0票数 1

我正在尝试设置一个队列,该队列将由一个轮询线程轮询,并将其内容传递给通过dispatcher & taskExecutor调用的某个服务。下面的代码是我想出的

代码语言:javascript
复制
<int:channel id="dataInQueue">
    <int:priority-queue capacity="100" />
</int:channel>

<int:bridge input-channel="dataInQueue" output-channel="dataInProcesingQueue">
    <int:poller receive-timeout="5000" fixed-rate="500" task-executor="taskScheduler" />
</int:bridge>   

<int:router input-channel="dataInProcesingQueue" expression="payload.runType.id">
    <int:mapping value="1" channel="processingQ1"/>
</int:router>   

<int:channel id="processingQ1" >
    <int:dispatcher task-executor="taskExecutor"/>
</int:channel>

<int:chain input-channel="processingQ1" output-channel="outChannel">
    <int:service-activator ref="myService"  />
</int:chain>    

<bean id="taskExecutor"
        class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="maxPoolSize" value="20" />
        <property name="corePoolSize" value="20" />
        <property name="threadNamePrefix" value="My-TaskExecutor" />                
</bean>
<bean id="taskScheduler"
        class="org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler">
        <property name="poolSize" value="20" />
        <property name="threadNamePrefix" value="My-TaskScheduler" />
</bean> 

不幸的是这不起作用。如果我把一条消息放在队列上,我就不会看到它正在被处理。另一方面,如果我用org.springframework.core.task.SimpleAsyncTaskExecutor实现代替taskExecutor & taskScheduler,那么一切都开始工作了。看起来问题就在我的线程池配置的某个地方,但是我看不到任何错误。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2015-01-31 15:56:54

只需删除这个task-executor="taskScheduler"即可。

计票器已经在内部使用内置的taskSchedulertask-executor属性用于当您想立即向另一个线程传递时。

目前还不清楚为什么这会导致你的应用程序不工作,但它是多余的调度程序移交给自己。

只要移除它,调度器就会在路由器之后交给您的执行者。

或者,在轮询器上设置task-executor="taskExecutor",在processingQ1上删除调度器--您不需要两个切换。

编辑:

尽管如此,我刚刚尝试了你的方案,这对我来说很好(我看到了双重切换).

代码语言:javascript
复制
11:05:43.623 DEBUG [My-TaskScheduler4][org.springframework.integration.endpoint.SourcePollingChannelAdapter] Poll resulted in Message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.628 DEBUG [My-TaskScheduler15][org.springframework.integration.channel.PriorityChannel] postReceive on channel 'dataInQueue', message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.628 DEBUG [My-TaskScheduler4][org.springframework.integration.channel.PriorityChannel] postSend (sent=true) on channel 'dataInQueue', message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.629 DEBUG [My-TaskScheduler15][org.springframework.integration.endpoint.PollingConsumer] Poll resulted in Message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.629 DEBUG [My-TaskScheduler15][org.springframework.integration.handler.BridgeHandler] org.springframework.integration.handler.BridgeHandler#0 received message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.630 DEBUG [My-TaskScheduler15][org.springframework.integration.channel.ExecutorChannel] postSend (sent=true) on channel 'toRabbit', message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]
11:05:43.631 DEBUG [My-TaskExecutor1][org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint] org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint#0 received message: GenericMessage [payload=foo, headers={id=a23d369b-a7c7-50d6-2209-6df83e51f380, timestamp=1422720343623}]

如果在我的建议之后它仍然不起作用,我建议您打开调试日志记录(包括线程名)并跟踪切换。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/28252817

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档