我想将Kaa通知与Kaa数据收集功能结合使用。
如何实现获取日志的方案?!
场景是:
1-服务器将通知发送到端点(使用端点ID),然后端点答复发送具有数据收集功能的数据。
2-服务器稍等片刻,检查端点(按端点ID)的上一个记录的时间戳(我尝试使用MongoDB日志附录)。
3-“添加一个听所有通知的通知侦听器:
kaaClient.addNotificationListener(new NotificationListener() {
@Override
public void onNotification(long id, SecurityAlert sampleNotification) {
LOG.info("Notification from the topic with id [{}] and name [{}] received.", id, getTopic(id).getName());
LOG.info("Notification body: {} \n", sampleNotification.getAlertMessage());
LOG.info("Notification alert type: {} \n", sampleNotification.getAlertType());
inputTopicIdMessage();
}
});日志记录的需求设置:
1-添加了配置模式:
{
"type": "record",
"name": "Configuration",
"namespace": "org.kaaproject.kaa.schema.sample",
"fields": [
{
"name": "samplingPeriod",
"type": "int",
"by_default": "1"
}
]}2-添加了日志模式:
{
"type":"record",
"name":"Data",
"namespace":"org.kaaproject.kaa.scheme.sample",
"fields":[
{
"name":"topicId",
"type":"int"
},
{
"name":"timeStamp",
"type":"long"
}
],
"displayName":"Logging scheme"}
3-在MongoDB中添加了日志附录。
在此之后,我希望使用以下命令启用“查看日志”:
db.logs_ApplicationToken.find();更新(2017-12-03):
我运行了以下代码,类似于Kaa通知代码的Kaa数据收集代码:
private static class MeasureSender implements Runnable {
KaaClient kaaClient;
MeasureSender(KaaClient kaaClient) {
this.kaaClient = kaaClient;
}
@Override
public void run() {
sentRecordsCount.incrementAndGet();
DataLogging record = generateTopicId();
RecordFuture future = kaaClient.addLogRecord(record); // submit log record for sending to Kaa node
LOG.info("Log record {} submitted for sending", record.toString());
try {
RecordInfo recordInfo = future.get(); // wait for log record delivery error
BucketInfo bucketInfo = recordInfo.getBucketInfo();
LOG.info("Received log record delivery info. Bucket Id [{}]. Record delivery time [{} ms].",
bucketInfo.getBucketId(), recordInfo.getRecordDeliveryTimeMs());
confirmationsCount.incrementAndGet();
} catch (Exception e) {
LOG.error("Exception was caught while waiting for log's delivery report.", e);
}
}
}
private static DataLogging generateTopicId() {
//TODO: Logic for get topicId
return new DataLogging(topicId, System.currentTimeMillis());
}在运行应用程序之后,当我在Kaa服务器上运行以下命令时:
db.logs_18693008741969774929.find();我得到了结果:
{ "_id" : ObjectId("5a228679ef540e07f3e73cd6"), "header" : { "endpointKeyHash" :{ "string" : "dXhbOD271Qtg9+FhxHXfrjE9bw4=" }, "applicationToken" : { "string" : "18693008741969774929" }, "headerVersion" : { "int" : 1 }, "timestamp" : { "long": NumberLong("1512212089541") }, "logSchemaVersion" : { "int" : 5 } }, "event": { "topicId" : 0, "timeStamp": 0 } }结果表明,"topicId“尚未收到。因为它等于0。
正如您在上面的方法private static DataLogging generateTopicId(){}中所看到的,
我做这件事需要一些逻辑。
更新(2017-12-06):
public class NotificationDemo {
private static final Logger LOG = LoggerFactory.getLogger(NotificationDemo.class);
private static KaaClient kaaClient;
private static final int LOGS_DEFAULT_THRESHOLD = 1;
private static int samplePeriodInSeconds = 1;
private static volatile AtomicInteger sentRecordsCount = new AtomicInteger(0);
private static volatile AtomicInteger confirmationsCount = new AtomicInteger(0);
private JsonObjectParserImpl jsonObjectParser = new JsonObjectParserImpl();
private String StatusOfDevices = String.valueOf(jsonObjectParser.getGetStatusOfDevices());
private static ScheduledExecutorService executor;
private static ScheduledFuture<?> executorHandle;
/**
* The list of all available notification
* <p>
* private static int samplePeriodInSeconds = 1;
* private static volatile AtomicInteger sentRecordsCount = new AtomicInteger0t;
* private static volatile AtomicInteger confirmationsCount = new AtomicInteger(0);
* <p>
* private static Random rand = new Random(o;pics.
*/
private static List<Topic> topics;
/**
* Topics client subscribed
*/
private static List<Topic> subscribedTopics = new ArrayList<Topic>();
InputStreamReader inputStreamReader = new InputStreamReader(System.in);
BufferedReader keyboardInput = new BufferedReader(inputStreamReader);
public NotificationDemo() throws IOException, JSONException {
}
public static void main(String[] args) throws IOException, JSONException {
NotificationDemo main = new NotificationDemo();
main.config();
}
public void config() {
LOG.info("Notification demo started");
//kaaClient = Kaa.newClient(new DesktopKaaPlatformContext(), new SimpleKaaClientStateListener(), true);
KaaClient kaaClient = Kaa.newClient(new DesktopKaaPlatformContext(), new SimpleKaaClientStateListener() {
@Override
public void onStarted() {
LOG.info("--= Kaa client started =--");
}
@Override
public void onStopped() {
LOG.info("--= Kaa client stopped =--");
}
}, true);
/*
* Set record count strategy for uploading every log record as soon as it is created.
*/
kaaClient.setLogUploadStrategy(new RecordCountLogUploadStrategy(LOGS_DEFAULT_THRESHOLD));
/*
* A listener that listens to the notification topic list updates.
*/
kaaClient.addConfigurationListener(new ConfigurationListener() {
@Override
public void onConfigurationUpdate(Configuration configuration) {
LOG.info("--= Endpoint configuration was updated =--");
displayConfiguration(configuration);
Integer newSamplePeriod = configuration.getSamplingPeriod();
if ((newSamplePeriod != null) && (newSamplePeriod > 0)) {
changeMeasurementPeriod(kaaClient, newSamplePeriod);
} else {
LOG.warn("Sample period value (= {} in updated configuration is wrong, so ignore it.", newSamplePeriod);
}
}
});
NotificationTopicListListener topicListListener = new BasicNotificationTopicListListener();
kaaClient.addTopicListListener(topicListListener);
/*
* Add a notification listener that listens to all notifications.
*/
kaaClient.addNotificationListener(new NotificationListener() {
@Override
public void onNotification(long id, Notification notification) {
LOG.info("Notification from the topic with id [{}] and name [{}] received.", id, getTopic(id).getName());
LOG.info("Notification body: {} \n", notification.getMessage());
String commands = (notification.getMessage());
if (commands.equals("arm")) {
System.out.println("The Status of Devices:" + StatusOfDevices);
}
inputTopicIdMessage();
}
});
/*
* Start the Kaa client and connect it to the Kaa server.
*/
kaaClient.start();
topics = kaaClient.getTopics();
/*
* List the obtained notification topics.
*/
showTopics();
Scanner scanner = new Scanner(System.in);
while (scanner.hasNextLong())
{
long topicId = scanner.nextLong();
if (getTopic(topicId) != null) {
LOG.info("Subscribing to optional topic {}", topicId);
subscribeTopic(topicId);
} else {
LOG.info("There is no input topic id. Please, input existing topic id.");
}
}
/*
* Stop listening to the notification topic list updates.
*/
kaaClient.removeTopicListListener(topicListListener);
unsubscribeOptionalTopics();
/*
* Stop the Kaa client and release all the resources which were in use.
*/
kaaClient.stop();
LOG.info("Notification demo stopped");
}
private static void changeMeasurementPeriod(KaaClient kaaClient, Integer newPeriod) {
if (executorHandle != null) {
executorHandle.cancel(false);
}
samplePeriodInSeconds = newPeriod;
executorHandle = executor.scheduleAtFixedRate(new MeasureSender(kaaClient), 0, samplePeriodInSeconds, TimeUnit.SECONDS);
LOG.info("Set new sample period = {} seconds.", samplePeriodInSeconds);
}
private static class MeasureSender implements Runnable {
KaaClient kaaClient;
MeasureSender(KaaClient kaaClient) {
this.kaaClient = kaaClient;
}
@Override
public void run() {
sentRecordsCount.incrementAndGet();
DataLogging record = generateTopicId();
RecordFuture future = kaaClient.addLogRecord(record); // submit log record for sending to Kaa node
LOG.info("Log record {} submitted for sending", record.toString());
try {
RecordInfo recordInfo = future.get(); // wait for log record delivery error
BucketInfo bucketInfo = recordInfo.getBucketInfo();
LOG.info("Received log record delivery info. Bucket Id [{}]. Record delivery time [{} ms].",
bucketInfo.getBucketId(), recordInfo.getRecordDeliveryTimeMs());
confirmationsCount.incrementAndGet();
} catch (Exception e) {
LOG.error("Exception was caught while waiting for log's delivery report.", e);
}
}
}
private static DataLogging generateTopicId() {
Integer topicId = generateTopicId().getTopicId();
return new DataLogging(topicId, System.currentTimeMillis());
}
private static void inputTopicIdMessage() {
LOG.info("\nPlease, type topic ID in order to subscribe to ones or type any text to exit: \n");
}
private static void displayConfiguration(org.kaaproject.kaa.schema.sample.Configuration configuration) {
LOG.info("Configuration = {}", configuration.toString());
}
private static void showTopics() {
if (topics == null || topics.isEmpty()) {
LOG.info("Topic list is empty");
return;
}
LOG.info("Available topics:");
for (Topic topic : topics) {
LOG.info("Topic id: {}, name: {}, type: {}", topic.getId(), topic.getName(), topic.getSubscriptionType());
}
LOG.info("Subscribed on topics:");
for (Topic t : getOneTypeTopics(SubscriptionType.MANDATORY_SUBSCRIPTION)) {
LOG.info("Topic id: {}, name: {}, type: {}", t.getId(), t.getName(), t.getSubscriptionType().name());
}
/*
* Optional topics
*/
if (!subscribedTopics.isEmpty()) {
for (Topic t : subscribedTopics) {
LOG.info("Topic id: {}, name: {}, type: {}", t.getId(), t.getName(), t.getSubscriptionType().name());
}
}
inputTopicIdMessage();
}
private static List<Topic> getOneTypeTopics(SubscriptionType type) {
List<Topic> res = new ArrayList<>();
for (Topic t : NotificationDemo.topics) {
if (t.getSubscriptionType() == type) {
res.add(t);
}
}
return res;
}
private static void subscribeTopic(long topicId) {
try {
subscribedTopics.add(getTopic(topicId));
kaaClient.subscribeToTopic(topicId, true);
} catch (UnavailableTopicException e) {
e.printStackTrace();
}
inputTopicIdMessage();
}
private static Topic getTopic(long id) {
for (Topic t : topics)
if (t.getId() == id)
return t;
return null;
}
private static void sleepForSeconds(int seconds) {
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private static void unsubscribeOptionalTopics() {
List<Topic> topics = getOneTypeTopics(SubscriptionType.OPTIONAL_SUBSCRIPTION);
for (Topic t : subscribedTopics) {
try {
kaaClient.unsubscribeFromTopic(t.getId());
} catch (UnavailableTopicException e) {
// if not subscribe
}
}
}
private static void waitForAnyInput() {
try {
System.in.read();
} catch (IOException e) {
LOG.warn("Error happens when waiting for user input.", e);
}
}
/**
* A listener that tracks the notification topic list updates
* and subscribes the Kaa client to every new topic available.
*/
private static class BasicNotificationTopicListListener implements NotificationTopicListListener {
@Override
public void onListUpdated(List<Topic> list) {
LOG.info("Topic list was updated:");
topics.clear();
topics.addAll(list);
showTopics();
}
}}
发布于 2017-12-14 12:26:45
虽然我不能完全确定,但我还是会尽力回答的。(目前还不清楚你想要实现什么。)
首先,我必须说,数据日志记录和通知是完全正交的特性。这意味着它们不以任何方式相互作用。
结果表明,"topicId“尚未收到。因为它等于0。
这很可能是因为您的应用程序根本没有设置topicId。(例如,不带参数地调用new DataLogging()。)
从Kaa的角度来看,这个topicId只是一个整数字段,服务器并不关心--它会保存在mongo中,不管您发送什么。如果您的目标是收集任何数据,只需在其中放置一个随机整数即可。
private static DataLogging generateTopicId() {
Integer topicId = 42;
return new DataLogging(topicId, System.currentTimeMillis());
}https://stackoverflow.com/questions/47608165
复制相似问题