首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >spring-integration-mqtt与多个Mqtt服务器进行订阅

spring-integration-mqtt与多个Mqtt服务器进行订阅
EN

Stack Overflow用户
提问于 2018-01-27 10:13:52
回答 1查看 1.3K关注 0票数 3

我使用spring的spring集成- Mqtt和我可以连接到单个Mqtt服务器,并且可以接收订阅主题上的消息,现在我想要创建一个应用程序,它可以连接到多个Mqtt服务器,并且可以从每个连接接收数据,我想将它作为动态管理,在这里我可以从数据库或文本文件中添加更多的Mqtt服务器。

用于订阅的单个Mqtt连接的简单bean如下所示

代码语言:javascript
复制
@Bean
public MessageProducer inbound() {

    MqttPahoMessageDrivenChannelAdapter adapter2 =
            new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.1:1883","mqtt_virtual_received_sus_2",
                                             "DATA/#", "LD/#","CONF/#","CONFIG/#");
    adapter2.setCompletionTimeout(0);
    adapter2.setConverter(new DefaultPahoMessageConverter());
    adapter2.setQos(2);

    adapter2.setOutputChannel(mqttInputChannel());
    return adapter2;

}

上面的代码为mqtt服务器创建了一个连接,可以接收消息,如果我为第二个服务器复制两次相同的代码,具有不同的Mqtt ip地址,我可以连接到两个Mqtt服务器,如下所示

代码语言:javascript
复制
@Bean
public MessageProducer inbound() {

    MqttPahoMessageDrivenChannelAdapter adapter2 =
            new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.1:1883","mqtt_virtual_received_sus_2",
                                             "DATA/#", "LD/#","CONF/#","CONFIG/#");
    adapter2.setCompletionTimeout(0);
    adapter2.setConverter(new DefaultPahoMessageConverter());
    adapter2.setQos(2);

    adapter2.setOutputChannel(mqttInputChannel());
    return adapter2;

}

@Bean
public MessageProducer inbound2() {

    MqttPahoMessageDrivenChannelAdapter adapter2 =
            new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.14:1883","mqtt_virtual_received_sus_1",
                                             "DATA/#", "LD/#","CONF/#","CONFIG/#");
    adapter2.setCompletionTimeout(0);
    adapter2.setConverter(new DefaultPahoMessageConverter());
    adapter2.setQos(2);

    adapter2.setOutputChannel(mqttInputChannel());
    return adapter2;

}

上面的代码也很好,我可以从两个Mqtt服务器接收消息,但是有什么方法可以像下面这样动态地管理它吗?我将bean的返回类型更改为list,但没有工作:

代码语言:javascript
复制
  @Bean
  public List<MqttPahoMessageDrivenChannelAdapter> getAdapter () {
      List<MqttPahoMessageDrivenChannelAdapter > logConfList=new ArrayList<MqttPahoMessageDrivenChannelAdapter>();
      MqttPahoMessageDrivenChannelAdapter adapter2 =
              new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.1:1883","mqtt_virtual_received_sus_2",
                                               "DATA/#", "LD/#","CONF/#","CONFIG/#");
      adapter2.setCompletionTimeout(0);
      adapter2.setConverter(new DefaultPahoMessageConverter());
      adapter2.setQos(2);

      adapter2.setOutputChannel(mqttInputChannel() );

      MqttPahoMessageDrivenChannelAdapter adapter =
              new MqttPahoMessageDrivenChannelAdapter("tcp://192.168.100.14:1883","mqtt_virtual_received_sus_1",
                                               "DATA/#", "LD/#","CONF/#","CONFIG/#");
      adapter.setCompletionTimeout(0);
      adapter.setConverter(new DefaultPahoMessageConverter());
      adapter.setQos(2);

      adapter.setOutputChannel(mqttInputChannel() );
      logConfList.add(adapter);
      logConfList.add(adapter2);

      return logConfList;

  }

有什么方法可以动态地管理这些bean吗?在这里,我可以从文本文件和for循环中获取mqtt服务器的详细信息,或者我可以管理多个连接。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-01-27 17:35:02

动态和运行时集成流

代码语言:javascript
复制
@Autowired
private IntegrationFlowContext flowContext;

private IntegrationFlowRegistration addAnAdapter(String uri, String clientId, MessageChannel channel,
        String... topics) {
    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(uri, clientId, topics);
    // more adapter configuration
    IntegrationFlow flow = IntegrationFlows.from(adapter)
        .channel(channel)
        .get();
    return this.flowContext.registration(flow).register();
}

private void removeAdapter(IntegrationFlowRegistration flowReg) {
    this.flowContext.remove(flowReg.getId());
}
票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48474720

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档