首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >:Splitter异常导致后续消息中止

:Splitter异常导致后续消息中止
EN

Stack Overflow用户
提问于 2014-11-12 17:30:09
回答 2查看 1.8K关注 0票数 0

我有以下配置,在这里,我试图处理由拆分器拆分的消息列表。我面临的问题是,单个消息处理中的异常会导致所有后续消息都不处理。我做错什么了吗?

代码语言:javascript
复制
<int:chain input-channel="exceptionTestChannel">
  <int:splitter/>
  <int:header-enricher>
    <int:error-channel ref="myErrorChannel"/>
  </int:header-enricher>
  <int:service-activator id="testExceptionService"
    ref="testExceptionService" method="throwException"></int:service-activator>
  <int:aggregator></int:aggregator>
</int:chain>

<int:channel id="myErrorChannel">
  <int:interceptors>
    <int:wire-tap channel="loggerChannel" />
  </int:interceptors>
</int:channel>

<int:channel id="exceptionTestChannel">
  <int:interceptors>
    <int:wire-tap channel="loggerChannel" />
  </int:interceptors>
</int:channel>


<int:transformer ref="errorUnwrapper" input-channel="myErrorChannel"  />

例外情况如下

代码语言:javascript
复制
2014-11-12 09:56:30,076 ERROR [com.test.transform.ErrorUnwrapper] - [Payload=org.springframework.integration.MessageHandlingException: com.test.TestServiceException: System Error. Trial test exception][Headers={timestamp=1415814990076, id=a2f50a6d-4de6-3785-e8d8-a5ef9f46c27f}]
org.springframework.integration.MessageHandlingException: com.test.TestServiceException: System Error. Trial test exception
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:76)
    at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:67)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
    at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:148)
    at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:330)
    at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:169)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:228)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:212)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:177)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:171)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:149)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
    at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:148)
    at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:330)
    at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:169)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendMessage(AbstractReplyProducingMessageHandler.java:228)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:212)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:177)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:167)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:149)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
    at org.springframework.integration.handler.MessageHandlerChain.handleMessageInternal(MessageHandlerChain.java:131)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:102)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:178)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:149)
    at org.springframework.integration.core.MessagingTemplate.doSend(MessagingTemplate.java:330)
    at org.springframework.integration.core.MessagingTemplate.send(MessagingTemplate.java:169)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:97)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:199)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:51)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:143)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:141)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:273)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:49)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:268)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:53)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:452)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:314)
    at java.util.concurrent.FutureTask.run(FutureTask.java:149)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:109)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:218)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:883)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:919)
    at java.lang.Thread.run(Thread.java:736)
Caused by: com.test.TestServiceException: System Error. Trial test exception
    at com.test.TestServiceException.getSystemErrorInstance(TestServiceException.java:31)
    at com.test.TestExceptionService.throwException(TestExceptionService.java:13)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:60)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:37)
    at java.lang.reflect.Method.invoke(Method.java:611)
    at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:69)
    at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:122)
    at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:44)
    at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:258)
    at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:82)
    at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:102)
    at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:103)
    at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:144)
    at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:268)
    at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:142)
    at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:73)
    ... 50 more
2014-11-12 09:56:30,076 WARN [org.springframework.integration.channel.MessagePublishingErrorHandler] - Error message was not delivered.
org.springframework.integration.support.channel.ChannelResolutionException: no output-channel or replyChannel header available
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.sendReplyMessage(AbstractReplyProducingMessageHandler.java:218)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.produceReply(AbstractReplyProducingMessageHandler.java:177)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleResult(AbstractReplyProducingMessageHandler.java:171)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:149)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:102)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:178)
    at org.springframework.integration.channel.MessagePublishingErrorHandler.handleError(MessagePublishingErrorHandler.java:83)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:49)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:49)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:268)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:53)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:452)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:314)
    at java.util.concurrent.FutureTask.run(FutureTask.java:149)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:109)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:218)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:883)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:919)
    at java.lang.Thread.run(Thread.java:736)
EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2014-11-12 18:22:53

单个消息处理中的异常将导致所有后续消息不处理。

您应该同意这是任何单线程进程的正常行为:当异常在某个地方引起时,所有其他代码都不会被调用。

简单的split看起来像for (Object o : collection)

要实现您的需求,您需要使用ExecutorChannel (或QueueChannel)作为<splitter>output-channel

当然,在这种情况下,您应该将splitter拉出<chain>之外。

通过这种线程转换,您的消息不会相互影响。

票数 0
EN

Stack Overflow用户

发布于 2014-11-12 20:23:09

@Artem是正确的,但另一种选择是,如果您希望在同一个线程上运行所有东西,请插入一个中间流网关。

只是增加一个错误通道头是行不通的.

代码语言:javascript
复制
<chain>
    <splitter />
    <service-activator ref="gw" />
<chain>

<gateway id="gw" request-channel="foo" error-channel="errors" 
      reply-timeout="0" />

<logging-channel-adapter channel="errors" /> 

<chain input-channel="foo">
    <int:service-activator id="testExceptionService"
        ref="testExceptionService" method="throwException"></int:service-activator>
    <int:aggregator></int:aggregator>
</chain>

然而,删除这样的拆分将导致默认聚合器永远不会完成组,因此您需要一个自定义发布策略,即使缺少了分区,也会释放组。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/26892941

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档