我让一个消费者消费一条消息,进行一些转换,然后创建一个新的Pojo并将其传递给生产者。生产者使用JmsTemplate在队列中发送消息。生产者应该将原始消息的头部(如(JMSType,JMSCorrelationID,JMSExpiration,JMSDeliveryMode) )设置为要发送的新消息。但是生产者应该更改原始消息的replyTo目的地。我还没有找到创建目的地并设置为JMSReplyTo的方法。有些人知道我如何做到这一点?
也许JmsTemplate不是做这件事的正确类。
public class Producer {
private final JmsTemplate jmsTemplate;
@Value("${jms-destination}")
private String destination;
public Producer(@Autowired JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public void send(final MessageHeaders headers, final Pojo pojo) {
Validate.notNull(order);
jmsTemplate.convertAndSend(destination, pojo, (message) -> {
final Destination replyToDestination = ???;
message.setJMSReplyTo(replyToDestination);
message.setJMSType((String) headers.get(JmsHeaders.TYPE));
message.setJMSCorrelationID((String) headers.get(JmsHeaders.CORRELATION_ID));
message.setJMSExpiration((long) headers.get(JmsHeaders.EXPIRATION));
message.setJMSDeliveryMode((int) headers.get(JmsHeaders.DELIVERY_MODE));
return message;
});
}
}我发现只有这种方法可以做到,但我不喜欢也不确定这会带来副作用:
public class Producer {
private final JmsTemplate jmsTemplate;
@Value("${jms-destination}")
private String destination;
@Value("${jms-replyTo}")
private String replyTo;
public Producer(@Autowired JmsTemplate jmsTemplate) {
this.jmsTemplate = jmsTemplate;
}
public void send(final MessageHeaders headers, Pojo pojo) {
Validate.notNull(order);
jmsTemplate.convertAndSend(destination, order, (message) -> {
final Destination replyToDestination = buildReplyTo();
message.setJMSReplyTo(replyToDestination);
message.setJMSType((String) headers.get(JmsHeaders.TYPE));
message.setJMSCorrelationID((String) headers.get(JmsHeaders.CORRELATION_ID));
message.setJMSExpiration((long) headers.get(JmsHeaders.EXPIRATION));
message.setJMSDeliveryMode((int) headers.get(JmsHeaders.DELIVERY_MODE));
return message;
});
}
private Destination buildReplyTo() throws JMSException {
final Session session = jmsTemplate.getConnectionFactory().createConnection()
.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Destination queue =
jmsTemplate.getDestinationResolver().resolveDestinationName(session, replyTo, false);
return queue;
}
}发布于 2019-08-19 20:39:06
您的解决方案创建了不闭合的侧向连接。您应该使用已有的session对象,通过send API手动发送您的pojo。使用getRequiredMessageConverter转换您的pojo。
public void send(final MessageHeaders headers, Pojo pojo) {
Validate.notNull(order);
final String responseQueue = "responseQ";
jmsTemplate.send(destination,
session -> {
Message message = getRequiredMessageConverter().toMessage(message, session);
message.setJMSReplyTo(session.createQueue(responseQueue)); //fill queue
//any other setters
return message;
});
}
// based on Spring JmsTemplate library
private MessageConverter getRequiredMessageConverter() throws IllegalStateException {
MessageConverter converter = jmsTemplate.getMessageConverter();
if (converter == null) {
throw new IllegalStateException("No 'messageConverter' specified. Check configuration of JmsTemplate.");
} else {
return converter;
}
} https://stackoverflow.com/questions/46216326
复制相似问题