首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >MQTT与SpringBoot集成连接丢失

MQTT与SpringBoot集成连接丢失
EN

Stack Overflow用户
提问于 2020-09-04 08:43:46
回答 1查看 2.4K关注 0票数 1

我目前在SpringBoot中有一个API,我想添加一个MQTT客户机来订阅一个或多个主题。我尝试了几个Paho,Hive客户端,但没有成功,我目前使用的是SpringBoot的默认MQTT,它使用Paho,但即使使用基本配置也无法使它工作。我一启动应用程序就会出现“连接丢失”错误.你能告诉我一个解决办法或者其他什么有用的方法吗?谢谢!

Maven:

代码语言:javascript
复制
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>           
        </dependency>
代码语言:javascript
复制
....

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

import lombok.extern.slf4j.Slf4j;
import springfox.documentation.swagger2.annotations.EnableSwagger2;

@Slf4j
@SpringBootApplication
@EnableSwagger2
public class MainApiSpring {

    public static void main(String[] args) {

        SpringApplication.run(MainApiSpring.class, args);
        log.trace("L'application a correctement été démarrée.");

    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883",
                "test/topic");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);

        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {

            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                System.out.println(message.getPayload());
            }

        };
    }
}

运行中的错误:

代码语言:javascript
复制
2020-09-04 10:31:39.099 ERROR 4244 --- [           main] .m.i.MqttPahoMessageDrivenChannelAdapter : Exception while connecting and subscribing, retrying

org.eclipse.paho.client.mqttv3.MqttException: Connexion perdue
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:197) ~[org.eclipse.paho.client.mqttv3-1.2.4.jar:na]
    at java.base/java.lang.Thread.run(Thread.java:830) ~[na:na]
Caused by: java.io.EOFException: null
    at java.base/java.io.DataInputStream.readByte(DataInputStream.java:272) ~[na:na]
    at org.eclipse.paho.client.mqttv3.internal.wire.MqttInputStream.readMqttWireMessage(MqttInputStream.java:92) ~[org.eclipse.paho.client.mqttv3-1.2.4.jar:na]
    at org.eclipse.paho.client.mqttv3.internal.CommsReceiver.run(CommsReceiver.java:137) ~[org.eclipse.paho.client.mqttv3-1.2.4.jar:na]
    ... 1 common frames omitted

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-09-05 13:24:04

答:这适用于MqttOptions定义的!

代码语言:javascript
复制
    @Bean
    public MqttConnectOptions getReceiverMqttConnectOptions() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setCleanSession(true);
        mqttConnectOptions.setConnectionTimeout(30);
        mqttConnectOptions.setKeepAliveInterval(60);
        mqttConnectOptions.setAutomaticReconnect(true);

//      mqttConnectOptions.setUserName("myemail");
        String password = "mypassword!";
//      String hostUrl = "tcp://maqiatto.com:1883";
        String hostUrl = "tcp://localhost:1883";
//      mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(new String[] { hostUrl });
        return mqttConnectOptions;
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getReceiverMqttConnectOptions());
        return factory;
    }

    @Bean
    public MessageProducer inbound() {
        String clientId2 = "uuid-" + UUID.randomUUID().toString();
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(clientId2,
//              mqttClientFactory(), "myemail/test");
                mqttClientFactory(), "test", "test/paho");
        adapter.setCompletionTimeout(20000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(2);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63737765

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档