我有一个包含分片队列的rabbit集群(3个节点)。每个分片驻留在不同的rabbit代理节点上。我使用spring rabbit模块来使用分片队列中的消息,但是,使用者总是从单个rabbit节点(因此是分片)连接(并使用)。
我已经将缓存模式设置为“connection”,所以可以打开多个连接,我使用spring.rabbitmq.addresses属性来传递spring连接工厂的多个地址,但仍然可以获得单个节点(列表中的第一个)的连接。
这是我的spring配置:
@Bean
DirectRabbitListenerContainerFactory directRabbitListenerContainerFactory(
DirectRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory,
ConditionalRejectingErrorHandler errorHandler) {
DirectRabbitListenerContainerFactory factory = new DirectRabbitListenerContainerFactory();
factory.setErrorHandler(errorHandler);
Advice[] adviceChain = { retryInterceptor() };
factory.setAdviceChain(adviceChain);
configurer.configure(factory, connectionFactory);
return factory;
}
@Bean
public RetryOperationsInterceptor retryInterceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(retriesOnError)
.build();
}这是我的application.yaml:
spring:
rabbitmq:
addresses: node1:5672,node2:5672,node3:5672
username: ${RABBITMQ_USERNAME}
password: ${RABBITMQ_PASSWORD}
listener:
cache:
connection:
mode: connection
type: direct
direct:
acknowledge-mode: auto
consumers-per-queue: ${RABBITMQ_CONSUMERS_PER_QUEUE}这是我的兔子听众:
@Component
@RequiredArgsConstructor
@Slf4j
public class RabbitMQConsumer {
private final MessageAction postProcessAction;
private final ComponentService componentService;
private final Coercion coercion;
@RabbitListener(queues = "${spring.rabbitmq.queueName}", containerFactory = "directRabbitListenerContainerFactory")
public void listen(@Payload MyObject in) {
///do something
}任何帮助都将不胜感激。
发布于 2020-08-17 20:54:28
您可以在rabbitmq-users Google Group上询问,但我不认为这是Spring的限制,而是amqp-client本身。
根据我的快速阅读,我的理解是您从“伪队列”消费,而不是从单个分片消费。所以只有一个消费者,而不是3个。
您可以配置3个连接工厂、3个侦听器容器工厂,并向该方法添加3个@RabbitListener。
编辑
另请参阅:
/**
* When {@link #setAddresses(String) addresses} are provided and there is more than
* one, set to true to shuffle the list before opening a new connection so that the
* connection to the broker will be attempted in random order.
* @param shuffleAddresses true to shuffle the list.
* @since 2.1.8
* @see Collections#shuffle(List)
*/
public void setShuffleAddresses(boolean shuffleAddresses) {
this.shuffleAddresses = shuffleAddresses;
}在连接工厂上。再加上缓存模式连接和监听程序上的多个并发,您应该可以访问所有代理。
https://stackoverflow.com/questions/63435504
复制相似问题