首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Camel rabbitmq + convertSendAndReceive():无法转换内容类型为[null]的传入消息

Camel rabbitmq + convertSendAndReceive():无法转换内容类型为[null]的传入消息
EN

Stack Overflow用户
提问于 2015-06-09 15:37:17
回答 3查看 9.3K关注 0票数 6

我有一个组件,它向等待返回结果的员工服务发送消息。

代码语言:javascript
复制
@Autowired
private RabbitTemplate rabbit;
[...]
Object response = rabbit.convertSendAndReceive("testQ", ...);

工人服务是用Apache Camel rabbitmq路由实现的:

代码语言:javascript
复制
from("rabbitmq://localhost/myExchange?declare=false&routingKey=testQ&queue=testQ")
        .routeId("myCamelRoute")
        .process(myProcessor)
        .to("log:myLog");

myProcessor处理消息并注销Camel消息头:

代码语言:javascript
复制
__TypeId__=...
breadcrumbId=...
rabbitmq.CONTENT_ENCODING=UTF-8
rabbitmq.CONTENT_TYPE=application/json
rabbitmq.CORRELATIONID=7e390b6b-d30f-4f26-ba44-33fb887db0e8
rabbitmq.DELIVERY_TAG=4
rabbitmq.EXCHANGE_NAME=
rabbitmq.PRIORITY=0
rabbitmq.REPLY_TO=amq.rabbitmq.reply-to.g2dkABNyYWJiaXRAOWU5ZjkxNDI4ZWRiAAAJgwAAADUC.5+kPXXxaXhoYo7A4T0HSZQ==
rabbitmq.ROUTING_KEY=testQ

消息头显然在工作端包含rabbitmq.CONTENT_TYPE=application/json,但是当响应消息返回时,这个信息似乎会“丢失”:

o.s.a.s.c.Jackson2JsonMessageConverter:无法转换内容类型为null的传入消息。

你知道这是怎么回事吗?

EN

回答 3

Stack Overflow用户

发布于 2016-03-18 09:33:03

在使用Header字段时,我在使用RabbitMQ管理控制台时也看到了同样的错误。将"content_type":"application/json“作为消息属性传递很好。

票数 12
EN

Stack Overflow用户

发布于 2020-08-11 18:21:00

编辑:实际上,似乎铬自动完成没有正常工作。我手动键入属性,并且工作也很好。

我也面临着同样的问题。这似乎是RabbitMQ管理控制台和Spring使用者之间的一个问题。

基于this,我重写了来自Jackson2JsonMessageConverter的fromMessage方法,并将contentType强制为application/json,并且工作良好

我的堆栈是:

  • org.springframework: 4.3.6
  • org.springframework.amqp :1.7.14
  • com.fasterxml.jackson.core: 2.11.2

代码语言:javascript
复制
import org.springframework.amqp.core.Message;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;

public class JacksonMessageConverter extends Jackson2JsonMessageConverter {
    public JacksonMessageConverter() {
        super();
    }
    
    @Override
    public Object fromMessage(Message message) {
        message.getMessageProperties().setContentType("application/json");
        return super.fromMessage(message);
    }
}

然后我用了我的兔子配置

代码语言:javascript
复制
    @Bean
public MessageConverter messageConverter() {
    JacksonMessageConverter jsonMessageConverter = new JacksonMessageConverter();
    jsonMessageConverter.setClassMapper(classMapper());
    
    return jsonMessageConverter;
}

其他的解决方案是在GO中创建一个基本的应用程序来发布消息,添加内容类型,并且运行良好,因为问题似乎出现在RabbitMQ管理控制台中。

我的剧本开始了:

代码语言:javascript
复制
// MqFactory - Creates a connection with mq
func MqFactory() *amqp.Connection {
    mqURI, err := amqp.ParseURI(createAmqpURI())
    if err != nil {
        fmt.Printf("Failed on parse mq uri: %s", err)
    }
    conn, err := amqp.Dial(mqURI.String())
    if err != nil {
        failOnError(err, "Failed to connect to MQ")
    } else {
        fmt.Println("Connection established with MQ")
    }

    sendMessage(conn)

    return conn
}

func sendMessage(conn *amqp.Connection) {
    channel, err := conn.Channel()
    if err != nil {
        failOnError(err, "Failed 1")
    }

    err2 := channel.Publish(
        "<exchange>",
        "<routin_key>",
        false,
        false,
        amqp.Publishing{
            Headers: amqp.Table{
                "__TypeId__": "<type_id>", // If needed
            },
            ContentType: "application/json",
            Body:        []byte("<body>"),
        },
    )

    if err2 != nil {
        failOnError(err2, "Failed 2")
    }
}

func createAmqpURI() string {
    host := os.Getenv("MQ_HOST")
    port := os.Getenv("MQ_PORT")
    usr := os.Getenv("MQ_USR")
    pwd := os.Getenv("MQ_PWD")
    return "amqp://" + usr + ":" + pwd + "@" + host + ":" + port
}

func failOnError(err error, msg string) {
    println(msg)
    if err != nil {
        fmt.Printf("%s: %s", msg, err)
    }
}

票数 2
EN

Stack Overflow用户

发布于 2021-01-29 10:26:49

如果春季-amqp:

代码语言:javascript
复制
@Bean
public CachingConnectionFactory cachingConnectionFactory() {
    final CachingConnectionFactory factory = new CachingConnectionFactory();
    factory.setUsername(username);
    factory.setPassword(password);
    factory.setHost(host);
    factory.setPort(port);
    return factory;
}

@Bean
@DependsOn("cachingConnectionFactory")
public RabbitTemplate rabbitTemplate(CachingConnectionFactory cachingConnectionFactory) {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);
    rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
    return rabbitTemplate;
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/30736670

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档