首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spring拒绝监听器外部的消息

Spring拒绝监听器外部的消息
EN

Stack Overflow用户
提问于 2018-11-22 11:17:19
回答 2查看 1.3K关注 0票数 1

该应用程序使用java 10、spring和rabbitmq。

系统有一个死信队列,我们在其中发送一些消息(由于数据库不可用,无法按预期的方式处理它们)。

目前,数据库可用性每X秒检查一次,如果仅可用,则将消息重新队列到它们的原始队列。否则,我们什么也不做,消息停留在死信队列中。

当重新排队到原始队列时,消息可以再次返回死信队列,并看到x-死亡报头计数增加。

出于某些原因,我们希望处理带有计数>= 5(例如)的死信消息,并将其他消息重新排到死信队列中。

我需要基本密码的消息,首先检查x-死亡计数头,然后发送到原来的队列,如果计数足够大,否则重新队列中的死信队列。

我无法设法将队列重新队列为死信队列,因为基本的get未进入侦听器:抛出AmqpRejectAndDontRequeueException不起作用,因为在rabbitmq侦听器对象中不会抛出异常。

我尝试在receiveAndCallback方法中抛出异常,但这似乎不是更好的方法:

代码语言:javascript
复制
rabbitTemplate.receiveAndReply(queueName, new ReceiveAndReplyCallback<Message, Object>() {

        @Override
        public Object handle(Message message) {
            Long messageXdeathCount = null;
            if (null != message.getMessageProperties() && null != message.getMessageProperties().getHeaders()) {
                List<Map<String, ?>> xdeathHeader =
                        (List<Map<String, ?>>) message.getMessageProperties().getHeaders().get(
                        "x-death");
                if (null != xdeathHeader && null != xdeathHeader.get(0)) {
                    messageXdeathCount = (Long) xdeathHeader.get(0).get("count");
                }
            }
            if (messageXdeathCount == null) {
                messageXdeathCount = 0L;
            }
            if (messageXdeathCount >= 5) {
                resendsMessage(message);
            } else {
                // this does not reject the message
                throw new AmqpRejectAndDontRequeueException("rejected");
            }
            return null;
        }
    });
    return receive;
}

在此方法执行之后,消息不会如我所期望的那样被拒绝,并且离开队列(它已经被添加)。

以下是交换和队列声明:

代码语言:javascript
复制
@Bean
public Exchange exchange() {
    TopicExchange exchange = new TopicExchange(EXCHANGE, true, false);
    admin().declareExchange(exchange);
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-dead-letter-exchange", EXCHANGE);
    Queue queue = new Queue("queueName", true, false, false, args);
    admin().declareQueue(queue);
    Binding binding = BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
    admin().declareBinding(binding);
    return exchange;
}

如何在不使用AmqpRejectAndDontRequeueException的情况下拒绝死信队列中的消息?交换是否有可能将x-死信交换设置为self?

谢谢你的帮忙

更新

我尝试了另一种方法,使用频道get和reject:

代码语言:javascript
复制
// exchange creation
@Bean
public Exchange exchange() throws IOException {
    Connection connection = connectionFactory().createConnection();
    Channel channel = channel();
    channel.exchangeDeclare(EXCHANGE, ExchangeTypes.TOPIC, true, false, null);
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-dead-letter-exchange", EXCHANGE);
    channel.queueDeclare("queueName", true, false, false, args);
    channel.queueBind("queueName", EXCHANGE, routingKey);
    return exchange;
}

消息获取和攻击或拒绝:

代码语言:javascript
复制
GetResponse response = channel.basicGet(queueName, false);
Long messageXdeathCount = null;
if(null != response.getProps() && null != response.getProps().getHeaders()) {
    List<Map<String, ?>> xdeathHeader =
            (List<Map<String, ?>>) response.getProps().getHeaders().get("x-death");
    if(null != xdeathHeader && null != xdeathHeader.get(0)) {
        messageXdeathCount = (Long) xdeathHeader.get(0).get("count");
    }
}
if (messageXdeathCount == null) {
    messageXdeathCount = 0L;
}
if (messageXdeathCount >= 5) {
    MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
    MessageProperties messageProps =
            messagePropertiesConverter.toMessageProperties(response.getProps(),
response.getEnvelope(), "UTF-8");
    resendsMessage(new Message(response.getBody(), messageProps));
    channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
} else {
    if(response.getProps().getHeaders().get("x-death") == null) {
        response.getProps().getHeaders().put("x-death", new ArrayList<>());
    }
    if(((List<Map<String, Object>>) response.getProps().getHeaders().get("x-death")).get(0) == null) {
        ((List<Map<String, Object>>)response.getProps().getHeaders().get("x-death")).add(new HashMap<>());
    }
    ((List<Map<String, Object>>) response.getProps().getHeaders().get("x-death")).get(0).put(
            "count", messageXdeathCount + 1);
    channel.basicReject(response.getEnvelope().getDeliveryTag(), true);
}

首先,我意识到这是非常丑陋的,然后消息不能更新之间的get和拒绝。有没有一种使用channel.basicReject和更新x-死亡计数头的方法?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2018-11-22 15:44:24

receiveAndReply()方法目前不提供对接收消息的确认的控制。请随意打开一个新特征请求

您可以使用侦听器容器来获得所需的灵活性。

编辑

你可以下到狂犬病API..。

代码语言:javascript
复制
rabbitTemplate.execute(channel -> {
    // basicGet, basicPublish, ack/nack etc here
});
票数 2
EN

Stack Overflow用户

发布于 2018-11-23 13:43:48

我可以使用频道的基本方法:

代码语言:javascript
复制
GetResponse response = channel.basicGet(queueName, false);
Long messageXdeathCount = 0L;
if(null != response.getProps() && null != response.getProps().getHeaders()) {
    List<Map<String, ?>> xdeathHeader =
            (List<Map<String, ?>>) response.getProps().getHeaders().get("x-death");
    if(null != xdeathHeader && null != xdeathHeader.get(0)) {
        for (Map<String, ?> map : xdeathHeader) {
            Long count = (Long) map.get("count");
            messageXdeathCount += count == null ? 0L : count;
        }
    }
}
if (messageXdeathCount >= 5) {
    MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
    MessageProperties messageProps = messagePropertiesConverter.toMessageProperties(response.getProps(), response.getEnvelope(), "UTF-8");
    resendsMessage(new Message(response.getBody(), messageProps));
    channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
} else {
    channel.basicReject(response.getEnvelope().getDeliveryTag(), false);
}

我问题的更新部分的问题在最后一行:

代码语言:javascript
复制
channel.basicGet(queueName, true);

布尔值指示消息是否应该被请求:如果不请求,它将按照预期的方式进入交换字母和递增计数x-death报头。布尔值更新为false,修复了该问题。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/53429782

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档