首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Mqtt- Spring Boot中的集成

Mqtt- Spring Boot中的集成
EN

Stack Overflow用户
提问于 2019-03-20 16:18:16
回答 1查看 5.6K关注 0票数 1

我尝试将mqtt集成到我的Spring Boot Java项目中。

我的依赖:

代码语言:javascript
复制
<dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-mqtt</artifactId>
        <scope>compile</scope>
        <exclusions>
            <exclusion>
                <artifactId>jackson-module-kotlin</artifactId>
                <groupId>com.fasterxml.jackson.module</groupId>
            </exclusion>
        </exclusions>
    </dependency>

我创建了一个名为MqttClient的Java类。我在这里尝试一下:

代码语言:javascript
复制
IMqttClient publisher = new org.eclipse.paho.client.mqttv3.MqttClient("pfad", publisherId);

我在这里得到一个错误: org.eclipse.paho.client.mqttv3.MqttClient

错误:未处理的异常: org.eclipse.paho.client.mqttv3.MqttException

你知道出什么问题了吗?

EN

回答 1

Stack Overflow用户

发布于 2019-03-20 17:20:03

找不到问题出在哪里?我确实用下面的方式解决了同样的问题。看看它对你是否有效。我有一个mqtt网关类,定义如下。

代码语言:javascript
复制
import org.springframework.integration.annotation.MessagingGateway;

@MessagingGateway(defaultRequestChannel = "mqttPromiseOutboundChannel")
public interface MqttPromiseGateway {

    void sendToMqtt(String data);
}

当我想要向Mqtt发送一条消息时,我会自动连接已定义的网关类。

代码语言:javascript
复制
@Autowired
private MqttPromiseGateway mqttPromiseGateway;

现在,我可以使用该网关通过Mqtt通道发送消息。

代码语言:javascript
复制
 mqttPromiseGateway.sendToMqtt(content);

我们还为MqttProducer定义了一个服务。

代码语言:javascript
复制
@Service
public class DemoMqttProducer {

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { "tcp://localhost:1883" });
        options.setUserName("myusername");
        options.setPassword("mypassword#".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }


    @Bean
    @ServiceActivator(inputChannel = "mqttPromiseOutboundChannel")
    public MessageHandler mqttPromiseOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("testClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic("Promise");
        return messageHandler;
    }

    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
    @Bean
    public MessageChannel mqttPromiseOutboundChannel() {
        return new DirectChannel();
    }
    @Bean
    public IntegrationFlow mqttInFlow() {
        return IntegrationFlows.from(mqttInbound())
                .transform(p -> p + ", received from MQTT")
                .handle(logger())
                .get();
    }

    @Bean
    public MessageProducerSupport mqttInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("siSampleConsumer",
                mqttClientFactory(), "Promise");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        return adapter;
    }    

}

方法mqttInflow将获取传入的消息并将其发送到记录器。如果您想以不同的方式处理它,则需要更改该方法。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55256254

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档