首页
学习
活动
专区
圈层
工具
发布

MQTT确认
EN

Stack Overflow用户
提问于 2015-07-13 19:13:33
回答 2查看 5.7K关注 0票数 2

如果我使用QoS级别1,代理会持续向订阅者重发消息,直到收到确认(ack)为止。我应该如何设置或返回这个ack?有人了解这方面的内容吗?

这是我的源代码:

代码语言:java
复制
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Properties;
import java.util.Vector;

import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.UTF8Buffer;
import org.fusesource.mqtt.client.Callback;
import org.fusesource.mqtt.client.CallbackConnection;
import org.fusesource.mqtt.client.Listener;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.QoS;
import org.fusesource.mqtt.client.Topic;

import com.adventnet.management.log.Log;
import com.adventnet.nms.util.NmsLogMgr;
public class DefaultMqttListener implements IMqttListener,Runnable{

    long count = 0;
    long start = System.currentTimeMillis();
    private HashMap serverDetailsHash;
    public DefaultMqttListener(HashMap serverProp)
    {
        this.serverDetailsHash = serverProp;
    }
    CallbackConnection myconnection;
    @Override
    public void init() {
        MQTT mqtt = new MQTT();
        String user = env("APOLLO_USER", (String)serverDetailsHash.get("userName"));    //No I18N
        String password = env("APOLLO_PASSWORD", (String)serverDetailsHash.get("password"));    //No I18N
        String host = env("APOLLO_HOST", (String)serverDetailsHash.get("hostName"));    //No I18N
        int port = Integer.parseInt(env("APOLLO_PORT", (String)serverDetailsHash.get("port")));
        try {
            mqtt.setHost(host, port);
            mqtt.setUserName(user);
            mqtt.setPassword(password);
            final CallbackConnection connection = mqtt.callbackConnection();
            myconnection = connection;
            connection.listener(new org.fusesource.mqtt.client.Listener() {
                public void onConnected() {
                }
                public void onDisconnected() {
                }
                public void onFailure(Throwable value) {
                    value.printStackTrace();
                    System.exit(-2);
                }
                public void onPublish(UTF8Buffer topic, Buffer msg, Runnable ack) {
                        long time =  System.currentTimeMillis();
                        callback( topic,  msg,  ack,connection,time);
                }
            });
            connection.connect(new Callback<Void>() {
                @Override
                public void onSuccess(Void value) {
                    NmsLogMgr.M2MERR.log("MQTT Listener connected in ::::", Log.SUMMARY);
                    ArrayList getTopics = (ArrayList)serverDetailsHash.get("Topics");
                    for(int i=0;i<getTopics.size();i++)
                    {
                        HashMap getTopic = (HashMap)getTopics.get(i);
                        String topicName = (String) getTopic.get("topicName");
                        String qosType = (String) getTopic.get("qosType");
                        Topic[] topic = {new Topic(topicName, getQosType(qosType))};
                        connection.subscribe(topic, new Callback<byte[]>() {
                            public void onSuccess(byte[] qoses) {
                            }
                            public void onFailure(Throwable value) {
                                value.printStackTrace();
                                System.exit(-2);
                            }
                        });
                    }
                    //Topic[] topics = {new Topic("adminTest", QoS.AT_LEAST_ONCE),new Topic("adminTest1", QoS.AT_LEAST_ONCE)};
                }
                @Override
                public void onFailure(Throwable value) {
                    value.printStackTrace();
                    System.exit(-2);
                }
            });

            // Wait forever..
            synchronized (Listener.class) {
                while(true){
                    Listener.class.wait();}

            }
        } catch (URISyntaxException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }
        catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    private static String env(String key, String defaultValue) {
        String rc = System.getenv(key);
        if( rc== null ){
            return defaultValue;}
        return rc;
    }

    @Override
    public void callback(UTF8Buffer topic, Buffer msg, Runnable ack, CallbackConnection connection, long time) {
        // TODO Auto-generated method stub
        try {
            String Message = msg.utf8().toString();
            MQTTMessage mqttMsg = new MQTTMessage();
            mqttMsg.setMQTTMessage(Message);
            mqttMsg.setTime(time);
            mqttMsg.setTopic(topic);
            mqttMsg.sethostName((String) serverDetailsHash.get("hostName"));
            MQTTCacheManager.mgr.addToCache(mqttMsg);
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

    }

    @Override
    public void close() {
        // TODO Auto-generated method stub
            NmsLogMgr.M2MERR.log("myconnection closed", Log.SUMMARY);
            myconnection.disconnect(new Callback<Void>() {
            @Override
            public void onSuccess(Void value) {
                System.exit(0);
            }
            @Override
            public void onFailure(Throwable value) {
                value.printStackTrace();
                System.exit(-2);
            }
        });

    }

    @Override
    public void run() {
        this.init();
        // TODO Auto-generated method stub
    }
    public QoS getQosType(String name)
    {
        Properties qosContainer = new Properties();
        qosContainer.put("0", QoS.AT_MOST_ONCE);
        qosContainer.put("1", QoS.AT_LEAST_ONCE);
        qosContainer.put("2", QoS.EXACTLY_ONCE);
        QoS qosName = (QoS) qosContainer.get(name);
        return qosName;
    }
}
EN

回答 2

Stack Overflow用户

发布于 2015-07-13 19:39:43

您根本不需要在代码中发送确认,所有这些都应该由您正在使用的MQTT库来处理。

QoS握手先在发布者与代理之间进行,然后在代理与各个订阅者之间分别单独进行。

票数 2
EN

Stack Overflow用户

发布于 2015-07-13 20:14:37

我没有用过这个Java库,但您需要在订阅主题时指定QoS级别1(至少一次交付)或QoS级别2(恰好一次交付)。在这些情况下,底层库会负责把ACK数据包发送给代理。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/31381856

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档