我同时启动了生产者和消费者。6小时后,生产者产生了大约6条crores消息进入队列,并在6小时后停止了生产者,但消费者仍在连续运行,即使在运行18小时后,仍有4条crores消息在队列中。谁能让我知道为什么消费者的表现很慢?
提前感谢!
@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueNames(this.queueName);
container.setMessageListener(new MessageListenerAdapter(new TestMessageHandler(), new JsonMessageConverter()));
return container;
}
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(
"localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(new JsonMessageConverter());
template.setRoutingKey(this.queueName);
template.setQueue(this.queueName);
return template;
}
public class TestMessageHandler {
// receive messages
public void handleMessage(MessageBeanTest msgBean) {
// Storing bean data into CSV file
}
}发布于 2016-08-11 02:37:30
根据Gary的建议,您可以按如下方式设置它们。查看@RabbitListener
@Bean
public SimpleRabbitListenerContainerFactory listenerContainer( {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(baseConfig.connectionFactory());
factory.setConcurrentConsumers(7); // choose a value
factory.setPrefetchCount(1); // how many messages per consumer at a time
factory.setMaxConcurrentConsumers(10); // choose a value
factory.setDefaultRequeueRejected(false); // if you want to deadletter
return factory;
}发布于 2013-09-11 23:03:03
根据WikiPedia的数据,crore ==是1000万,所以你的意思是6000万。
容器处理消息的速度只能和侦听器一样快--您需要分析您对每条消息所做的操作。
您还需要尝试容器并发设置(concurrentConsumers)、预取等,以获得最佳性能,但最终它仍然是您的侦听器,它占用了大部分处理时间;容器的开销非常小。如果您的侦听器构造得不好,那么增加并发数将没有任何帮助。
如果您使用的是事务,这将显著降低消费速度。
尝试使用不处理消息的侦听器。
最后,在提出这样的问题时,您应该始终显示配置。
https://stackoverflow.com/questions/18732134
复制相似问题