首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >spring在处理海量信息时丢失消息

spring在处理海量信息时丢失消息
EN

Stack Overflow用户
提问于 2016-03-04 04:04:00
回答 1查看 438关注 0票数 1

我使用spring,我的流如下所示,并在3个节点容器上运行测试,其中一个是以兔子为传输对象的管理节点。

代码语言:javascript
复制
aws-s3|processor1|http-client|processor2>queue:readyQueue

我在下面创建了tap。

代码语言:javascript
复制
tap1  aws-s3>s3Queue


tap2  processor1>processorQueue1

tap3  http-client>httpQueue

我在测试中运行以下场景:

Scenario1:200k的5个文件=100万条记录-客户端=70和processor2=30的并发性

我看到了900 K的消息s3Queue

我看到了889k消息processorQueue1

我看到了886 k消息httpQueue

我看到883 K消息processorQueue2消息到处都丢失了,而且它是随机的

代码语言:javascript
复制
Scenario2:

5个200k文件=100万条记录和所有模块concurrency=1

我看到了998800条消息s3Queue

我看到了998760条消息processorQueue1

我看到了997540条消息httpQueue

我看到了997530条消息processorQueue2

即使这个数字也是随机的,而且不一致。

代码语言:javascript
复制
Scenario3

我更改了以下的流和concurrency=1以及200k =100万记录的5个文件

代码语言:javascript
复制
aws-s3 >testQueue

我收到了我所有的消息,我运行了3次,没有问题,我收到了我所有的100万条信息

代码语言:javascript
复制
scenario4

我更改了以下流和concurrency=1 5文件(200 k=100万条记录)

代码语言:javascript
复制
aws-s3 |processor1 >testQueue2

我收到了我所有的消息,我运行了3次,没有问题,我收到了我所有的100万条信息

在scenario4和scenarion 3中,数据摄入速度更快,处理速度更快5分钟,在兔子传输队列中吞食速度更快,例如每秒5k味精。

在场景1中,数据摄入速度较慢,甚至s3模块也在以每秒300至1000 msg的速度将数据拖慢。

在场景2中,s3提取数据的速度更快,但http客户端的速度大约为每秒100 msg,而aws-s3则以每秒3-4k msg的速度快速提取数据。

我在想,就像看到xd线程正在引发问题一样,我正在失去messages.Please,您能帮我解决这个问题吗?

更新

代码语言:javascript
复制
Scenario 5 

我在http客户机中将reply-timeout更改为-1,然后只丢失了37条msg。

现在,我再次运行第二次迭代,我丢失了25000毫希,当这发生时,我看到了咆哮的容器日志。

代码语言:javascript
复制
2016-03-04T03:42:04-0500 1.2.1.RELEASE ERROR task-scheduler-7 handler.LoggingHandler - org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint@b6700b1]; nested exception is org.springframework.amqp.AmqpIOException: java.io.IOException
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
        at org.springframework.xd.dirt.integration.rabbit.RabbitMessageBus$SendingHandler.handleMessageInternal(RabbitMessageBus.java:891)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:287)
        at org.springframework.integration.channel.interceptor.WireTap.preSend(WireTap.java:129)
        at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:392)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:282)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:245)
        at sun.reflect.GeneratedMethodAccessor204.invoke(Unknown Source)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:317)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:190)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
        at org.springframework.integration.monitor.DirectChannelMetrics.monitorSend(DirectChannelMetrics.java:114)
        at org.springframework.integration.monitor.DirectChannelMetrics.doInvoke(DirectChannelMetrics.java:98)
        at org.springframework.integration.monitor.DirectChannelMetrics.invoke(DirectChannelMetrics.java:92)
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
        at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:207)
        at com.sun.proxy.$Proxy1537.send(Unknown Source)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:95)
        at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231)
        at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154)
        at org.springframework.integration.splitter.AbstractMessageSplitter.produceOutput(AbstractMessageSplitter.java:157)
        at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105)

Caused by: org.springframework.amqp.AmqpIOException: java.io.IOException
        at org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:63)
        at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:51)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:758)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access$300(CachingConnectionFactory.java:747)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:419)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:395)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:364)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:357)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1100(CachingConnectionFactory.java:75)
        at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:763)
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils$1.createChannel(ConnectionFactoryUtils.java:85)
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.doGetTransactionalResourceHolder(ConnectionFactoryUtils.java:134)
        at org.springframework.amqp.rabbit.connection.ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactoryUtils.java:67)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1035)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1028)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:540)
        at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:635)
        at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.send(AmqpOutboundEndpoint.java:331)
        at org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint.handleRequestMessage(AmqpOutboundEndpoint.java:323)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
        ... 93 more
Caused by: java.io.IOException
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:106)
        at com.rabbitmq.client.impl.AMQChannel.wrap(AMQChannel.java:102)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:124)
        at com.rabbitmq.client.impl.ChannelN.open(ChannelN.java:125)
        at com.rabbitmq.client.impl.ChannelManager.createChannel(ChannelManager.java:134)
        at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:499)
        at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:44)
        ... 112 more
Caused by: com.rabbitmq.client.ShutdownSignalException: connection error
        at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
        at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
        at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:348)
        at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:221)
        at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:118)
        ... 116 more
Caused by: com.rabbitmq.client.impl.UnknownChannelException: Unknown channel number 23364
        at com.rabbitmq.client.impl.ChannelManager.getChannel(ChannelManager.java:80)
        at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:552)
        ... 1 more

2016-03-04T03:42:05-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:5672 connection.CachingConnectionFactory - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'xdbus.tap-s3.tap:stream:stream.batch-aws-s3-source.0' in vhost '/', class-id=50, method-id=20)


2016-03-04T03:53:13-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:5672 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-03-04T03:53:13-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:5672 connection.CachingConnectionFactory - Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'xdbus.tap-s3.tap:stream:stream.batch-aws-s3-source.0' in vhost '/', class-id=50, method-id=20)
~                                                                                                                                                                                 


2016-03-04T02:57:54-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:8080 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-03-04T02:57:55-0500 1.2.1.RELEASE ERROR AMQP Connection xxx:8080 connection.CachingConnectionFactory - Channel shutdown: connection error
2016-03-04T03:42:04-0500 1.2.1.RELEASE ERROR AMQP Connection yyy:5672 connection.CachingConnectionFactory - Channel shutdown: connection error

已更新

当这个异常发生时,我发现了消息丢失的问题,我看到了很多msg lost.This模式,我测试了多个time.Everytime,这个异常发生了,我看到msg lost.Also出现并发,这使得这个问题经常发生。

代码语言:javascript
复制
2016-03-05T13:59:41-0500 1.2.1.RELEASE ERROR AMQP Connection host1:5672 connection.CachingConnectionFactory - Channel shutdown: connection error

兔形

代码语言:javascript
复制
spring:
  rabbitmq:
   addresses: host1:5672,host2:5672,host3:5672
   adminAddresses: http://host1:15672,http://host2:15672,http://host3:15672
   nodes: rabbit@host1.test.com,rabbit@host2.test.com,rabbit@host2.test.com
   username: test
   password: test
   virtual_host: /
   useSSL: false
   sslProperties:

随着缓存大小增加到200而更新

我添加了您提供的xml,并将缓存大小增加到200。这是在处理100万和80k messages.Only时发生的情况,我的http客户机并发性是100,其他的是1 .Slowly处理停止的msg仍然存在,在我指定的通道中的http-客户机队列和相同的count.But msg计数以每分钟10 msg的速度缓慢增长,但它的.Slowly非常慢。

Msg在http 186174之前不会在队列中衰老,但是msg正在慢慢地进入batchCacheQueue。

模拟测试用例:

1)我使用了spring集成aws-s3源,其中包含一个复合模块的分配器,如xml解析、具有并发性的http-client、并发性100 >命名通道。

2)我认为文件源也可能会work.Create百万条记录的单个文件,并尝试从文件中提取这个文件。

3)在大约4到5次运行后,我们看到这个异常正在发生

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-03-05 20:20:15

原因: com.rabbitmq.client.impl.UnknownChannelException:未知信道编号23364

我们发现了一个问题:当通道经常被搅动时,您需要在兔子缓存连接工厂中增加通道缓存大小。

这是一个解决问题的方法

我打开了一个JIRA问题,这样下一个版本的Spring将在servers.yml中公开这个设置,这样您就不必覆盖总线配置文件。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/35787785

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档