请有人帮助我理解这个配置中的问题所在:版本:
@Configuration
public class MqttConfig {
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { "tcp://localhost:1883" });
return factory;
}
@Bean
public MqttPahoMessageDrivenChannelAdapter inboundAdapter(MqttPahoClientFactory clientFactory) {
return new MqttPahoMessageDrivenChannelAdapter("MyApp", clientFactory, "ReplyTopic");
}
@Bean
IntegrationFlow inboundFlow(MqttPahoMessageDrivenChannelAdapter inboundAdapter) {
return IntegrationFlows.from(inboundAdapter)
.bridge()
.channel("replyChannel")
.get();
}
@Bean
public MessageChannel replyChannel() {
return MessageChannels.publishSubscribe().get();;
}
@Bean
public MqttPahoMessageHandler outboundAdapter(MqttPahoClientFactory clientFactory) {
return new MqttPahoMessageHandler("MyApp", clientFactory);
}
@Bean
public IntegrationFlow outboundFlow(MqttPahoMessageHandler outboundAdapter) {
return IntegrationFlows.from("requestChannel")
.handle(outboundAdapter).get()
}
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "requestChannel", replyChannel = "replyChannel")
String send(String request, @Header(MqttHeaders.TOPIC) String requestTopic);
}
}客户端代码
@RestController
public class MyController {
@Autowired
private MyGateway myGateway;
@GetMapping("/sendRequest")
public String sendRequest() {
var response = myGateway.send("Hello", "MyTopic");
return response;
}
}用法:
curl http://localhost:8080/sendRequest来自mqtt代理(HiveMQ)的手动响应
docker exec -it hivemq mqtt pub -t ReplyTopic -m "World" --debug
CLIENT mqttClient-MQTT_5_0-9ecded84-8416-4baa-a8f3-d593c692bc65: acknowledged PUBLISH: 'World' for PUBLISH to Topic: ReplyTopic但是我不知道为什么在Spring应用程序输出上有这样的消息
2022-10-25 18:04:33.171 ERROR 17069 --- [T Call: MyApp] .m.i.MqttPahoMessageDrivenChannelAdapter : Unhandled exception for GenericMessage [payload=World, headers={mqtt_receivedRetained=false, mqtt_id=0, mqtt_duplicate=false, id=9dbd5e14-66ed-5dc8-6cea-6d04ef19c6cc, mqtt_receivedTopic=ReplyTopic, mqtt_receivedQos=0, timestamp=1666713873170}]
org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.handler.BridgeHandler@6f63903c]; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available有人能解释一下为什么我会有这个吗?
no output-channel or replyChannel header available发布于 2022-10-25 17:02:30
我认为您所面临的问题与您的bridge()配置无关。
这来自于MessagingGatewaySupport及其由replyChannel = "replyChannel"激活的replyMessageCorrelator特性。
真正的问题是,您正在尝试用MQTT v3做一些不可能的事情。没有通过MQTT传输消息头来进行网关发起者(一个相关密钥)所需的TemporaryReplyChannel。在关于网关的文档中可以看到更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#gateway。
换句话说:独立于网关上的replyChannel配置,replyChannel报头必须出现在回复消息中。这就是网关将请求与回复联系起来的方式。
您必须查看聚合器,以并行发送请求消息,并保留所述的TemporaryReplyChannel头。然后,当您收到回复(inboundAdapter)时,将其发送到此聚合器。您需要确保来自请求和应答有效负载的一些相关键,这样它们就可以匹配并完成组,以便将应答发送回网关。
请参阅docs中的更多信息:https://docs.spring.io/spring-integration/docs/current/reference/html/message-routing.html#aggregator
https://stackoverflow.com/questions/74197479
复制相似问题