Spark Streaming for time series processing (splitting the data by time interval)

Stack Overflow user
Asked on 2016-04-05 08:16:00
Answers: 2 · Views: 907 · Followers: 0 · Score: 3

I receive a stream of data (online nginx logs) from a UDP socket; the data structure is:

date                | ip       | mac   | objectName | rate | size
2016-04-05 11:17:34 | 10.0.0.1 | e1:e2 | book1      | 10   | 121
2016-04-05 11:17:34 | 10.0.0.2 | a5:a8 | book2351   | 8    | 2342
2016-04-05 11:17:34 | 10.0.0.3 | d1:b56| bookA5     | 10   | 12

2016-04-05 11:17:35 | 10.0.0.1 | e1:e2 | book67     | 10   | 768
2016-04-05 11:17:35 | 10.0.0.2 | a5:a8 | book2351   | 8    | 897
2016-04-05 11:17:35 | 10.0.0.3 | d1:b56| bookA5     | 9    | 34
2016-04-05 11:17:35 | 10.0.0.4 | c7:c2 | book99     | 9    | 924
...
2016-04-05 11:18:01 | 10.0.0.1 | e1:e2 | book-10    | 8    | 547547
2016-04-05 11:18:17 | 10.0.0.4 | c7:c2 | book99     | 10   | 23423
2016-04-05 11:18:18 | 10.0.0.3 | d1:b56| bookA5     | 10   | 1138

I have to:

  • Aggregate the data by minute: one result row per (minute, ip, mac).
  • objectName can change within one minute and I have to select the first one, i.e. at 2016-04-05 11:17 for 10.0.0.1 | e1:e2 it changes from book1 to book67, and book1 must be selected.
  • Count the changes of rate.
  • Compute the difference between sizes (the previous and the current value within the group), i.e. for 2016-04-05 11:17 | 10.0.0.1 | e1:e2 it is 768 - 121.

So, the result (size not calculated out):

date                | ip       | mac   | objectName | changes | size
2016-04-05 11:17:00 | 10.0.0.1 | e1:e2 | book1      | 0       | 768 - 121
2016-04-05 11:17:00 | 10.0.0.2 | a5:a8 | book2351   | 0       | 897 - 2342
2016-04-05 11:17:00 | 10.0.0.3 | d1:b56| bookA5     | 1       | 34 - 12    
2016-04-05 11:17:00 | 10.0.0.4 | c7:c2 | book99     | 0       | 924
...
2016-04-05 11:18:00 | 10.0.0.1 | e1:e2 | book-10    | 0       | 547547
2016-04-05 11:18:00 | 10.0.0.4 | c7:c2 | book99     | 0       | 23423
2016-04-05 11:18:00 | 10.0.0.3 | d1:b56| bookA5     | 0       | 1138
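
To pin the rules down, here is a minimal plain-Java sketch of the per-(minute, ip, mac) reduction (it reuses the Raw and Value classes from the code snapshot below; the aggregate helper itself is illustrative, not part of the original code):

// Illustrative only: rows is one (minute, ip, mac) group, sorted by time.
static Value aggregate(List<Raw> rows) {
    Value v = new Value();
    v.objectName = rows.get(0).objectName; // keep the first objectName of the minute
    for (int i = 1; i < rows.size(); i++) {
        if (rows.get(i).rate != rows.get(i - 1).rate) v.changes++; // count rate changes
    }
    v.size = rows.size() == 1
            ? rows.get(0).size // a single record keeps its own size (book99 above)
            : rows.get(rows.size() - 1).size - rows.get(0).size; // last minus first
    return v;
}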

Here is a snapshot of my code. I know about updateStateByKey and window, but I don't understand how to flush the data to a DB or the file system when the period (the minute) changes:

private static final Duration SLIDE_INTERVAL = Durations.seconds(10);
private static final String nginxLogHost = "localhost";
private static final int nginxLogPort = 9999;

private static class Raw {
    LocalDateTime time; // full time, with seconds
    String ip;
    String mac;
    String objectName;
    int rate;
    int size;
}

private static class Key {
    LocalDateTime time; // time truncated to the minute (seconds = 00)
    String ip;
    String mac;
}

private static class RawValue {
    LocalDateTime time; // full time, with seconds
    String objectName;
    int rate;
    int size;
}

private static class Value {
    String objectName;
    int changes;
    int size;
}

public static void main(String[] args) {
    SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("TestNginxLog");
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, SLIDE_INTERVAL);
    jssc.checkpoint("/tmp");

    JavaReceiverInputDStream<Raw> logRecords =
            jssc.receiverStream(new NginxUDPReceiver(nginxLogHost, nginxLogPort));

    // key = (minute, ip, mac); value = the remaining fields plus the exact timestamp
    PairFunction<Raw, Key, RawValue> pairFunction = (PairFunction<Raw, Key, RawValue>) rawLine -> {
        LocalDateTime time = rawLine.getDateTime();
        // truncate to the minute so that all records of one minute share the same key
        Key k = new Key(time.truncatedTo(ChronoUnit.MINUTES), rawLine.getIp(), rawLine.getMac());
        RawValue v = new RawValue(time, rawLine.getObjectName(), rawLine.getRate(), rawLine.getSize());
        return new Tuple2<>(k, v);
    };
    JavaPairDStream<Key, RawValue> logDStream = logRecords.mapToPair(pairFunction);

2 Answers

Stack Overflow user

Accepted answer

Posted on 2016-05-14 10:11:24

So, I'm closing this question with my own answer. The example function below can be used as the argument to updateStateByKey. The key pieces in this code are: Optional.absent() to eliminate data that has already been saved, Optional.of(...) for the grouped data, and setAggregateReady(true) to flag rows whose aggregation group is finished.

The last flag lets you save the data to an external target (a DB or the file system) by filtering on getAggregateReady() and applying one of the Spark Streaming output operations (e.g. foreachRDD).

After that, the data enters updateStateByKey again with the next batch and is removed by removeIf(T::isAggregateReady).
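
For completeness, a minimal sketch of that save-and-clean cycle (the stream name state and the row type T are illustrative; the update function itself follows right after):

// Keep only rows flagged ready and persist them with an output operation.
// On the next batch, updateStateByKey drops them via removeIf(T::isAggregateReady).
JavaPairDStream<Key, List<T>> readyRows = state
        .mapValues(rows -> rows.stream()
                .filter(T::isAggregateReady)
                .collect(Collectors.toList()))
        .filter(pair -> !pair._2().isEmpty());

readyRows.foreachRDD(rdd -> {
    if (rdd.count() > 0) {
        rdd.saveAsTextFile("/tmp/aggregated/" + System.currentTimeMillis()); // or write to a DB
    }
});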

/**
 * Aggregates data across batches.
 * <p>
 * currentBatchValues   - values received in the current batch
 * previousBatchesState - values accumulated over all previous batches
 * You have to clear data (return Optional.absent() for it) to eliminate it from the DStream.
 * First batch: data is checked for aggregateReady.
 * Second batch: data flagged aggregateReady=true is removed from the DStream
 * (you have to save it to a DB or another target before this cleanup).
 */
protected Function2<List<Iterable<T>>, Optional<List<T>>, Optional<List<T>>> updateDataRowsFunc = (currentBatchValues, previousBatchesState) -> {

    Optional<List<T>> res;

    //log.debug("previousBatchesState isPresent {}", previousBatchesState.isPresent());
    //log.debug("previousBatchesState {}", previousBatchesState);
    //log.debug("currentBatchValues isEmpty {}", currentBatchValues.isEmpty());
    //log.debug("currentBatchValues {}", currentBatchValues);

    // previous data flagged aggregateReady has already been saved
    if (previousBatchesState.isPresent()) {
        log.debug("count before remove = {}", previousBatchesState.get().size());
        previousBatchesState.get().removeIf(T::isAggregateReady);
        // drop the previous state entirely if all of its data has already been aggregated
        int cntAfterRemove = previousBatchesState.get().size();
        if (cntAfterRemove == 0) previousBatchesState = Optional.absent();
    }

    // warn: the comparator can't be declared outside this function, otherwise a "Task not serializable" error occurs
    Comparator<T> dataRowByAggGroupComparator = (o1, o2) -> o1.getAggregateGroup().compareTo(o2.getAggregateGroup());

    // no data was collected at previous batches && data exists in current batch
    if (!previousBatchesState.isPresent() && !currentBatchValues.isEmpty()) {

        log.debug("algorithm 1");

        // the list currentBatchValues contains only 1 element (an Iterable of 1-to-N NginxDataRow records),
        // so we take it as an Iterable and convert it to a List
        // warn: there may be another way to compare Iterable elements without using a List
        List<T> listDataRow = new ArrayList<>();
        currentBatchValues.get(0).forEach(listDataRow::add);

        // in one batch we can get data for 2 aggregateGroups, if the batch was split between groups
        LocalDateTime minAggGroup = listDataRow.stream().min(dataRowByAggGroupComparator).get().getAggregateGroup();
        LocalDateTime maxAggGroup = listDataRow.stream().max(dataRowByAggGroupComparator).get().getAggregateGroup();

        // batch was split between groups
        if (!minAggGroup.equals(maxAggGroup)) {
            log.debug("batch was split between groups {} and {}", minAggGroup, maxAggGroup);
            // set ready to aggregate for previous group of data, because aggregate group was changed
            listDataRow.stream().filter(z -> z.getAggregateGroup().equals(minAggGroup)).forEach(z -> z.setAggregateReady(true));
        }

        res = Optional.of(listDataRow);
        //log.debug("agg res = {}", res);

        // data exists in both the previous and the current batch
    } else if (previousBatchesState.isPresent() && !currentBatchValues.isEmpty()) {

        log.debug("algorithm 2");

        List<T> listCurrentBatchDataRow = new ArrayList<>();
        currentBatchValues.get(0).forEach(listCurrentBatchDataRow::add);

        LocalDateTime previousBatchAggGroup = previousBatchesState.get().stream().findFirst().get().getAggregateGroup();

        // in one batch we can get data for 2 aggregateGroups, if the batch was split between groups
        LocalDateTime minCurrentBatchAggGroup = listCurrentBatchDataRow.stream().min(dataRowByAggGroupComparator).get().getAggregateGroup();
        LocalDateTime maxCurrentBatchAggGroup = listCurrentBatchDataRow.stream().max(dataRowByAggGroupComparator).get().getAggregateGroup();

        // previous and current data in different groups
        if (!previousBatchAggGroup.equals(maxCurrentBatchAggGroup)) {

            log.debug("previous batch needed to save, because agg group was changed from {} to {}", previousBatchAggGroup, maxCurrentBatchAggGroup);
            // set ready to aggregate for previous group of data, because aggregate group was changed
            previousBatchesState.get().stream().forEach(z -> z.setAggregateReady(true));

            // batch was split between groups
            if (!minCurrentBatchAggGroup.equals(maxCurrentBatchAggGroup)) {

                log.debug("batch was split between groups {} and {}", minCurrentBatchAggGroup, maxCurrentBatchAggGroup);
                listCurrentBatchDataRow.stream().filter(z -> z.getAggregateGroup().equals(minCurrentBatchAggGroup)).forEach(z -> z.setAggregateReady(true));

            }
        }

        // union previous and current batches data
        previousBatchesState.get().addAll(listCurrentBatchDataRow);

        res = Optional.of(previousBatchesState.get());
        //log.debug("agg res = {}", res);

        // data exists in the previous batch but the current batch is empty
    } else if (previousBatchesState.isPresent() && currentBatchValues.isEmpty()) {

        log.debug("algorithm 3");

        res = previousBatchesState;
        //log.debug("agg res = {}", res);

        // all previous data was aggregated already and the state is absent()
    } else if (!previousBatchesState.isPresent() && currentBatchValues.isEmpty()) {

        log.debug("algorithm 4");

        res = Optional.absent();

    } else {

        log.error("Strange situation, you have to check log-file");
        res = null;

    }

    // if abonent data was received within one minute and the abonent then shut the connection down,
    // it would stay in the DStream forever; after some period, force-save it
    if (res != null && res.isPresent()) {
        res.get().stream().filter(z -> Math.abs(java.time.Duration.between(z.getAggregateGroup(), LocalDateTime.now()).getSeconds() / 60) > FORCED_SAVE_INTERVAL).forEach(z -> z.setAggregateReady(true));
    }

    return res;
};
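
A short sketch of how this function plugs in, assuming the pairing and grouping from the question's code (variable names are illustrative):

// group raw rows by (minute, ip, mac), then maintain state across batches
JavaPairDStream<Key, List<T>> state = logDStream
        .groupByKey()                       // Key -> Iterable<T>
        .updateStateByKey(updateDataRowsFunc);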
Score: 0

Stack Overflow user

Posted on 2016-04-08 13:40:55

This is a partial answer, but the problem is not fully solved yet. After mapToPair I use:

    // 1 key - N values
    JavaPairDStream<Key, Iterable<Value>> abonentConnects = logDStream.groupByKey();

    // Accumulate data
    Function2<List<Iterable<Value>>, Optional<List<Value>>, Optional<List<Value>>> updateFunc = (Function2<List<Iterable<Value>>, Optional<List<Value>>, Optional<List<Value>>>) (values, previousState) -> {
        List<Value> sum = previousState.or(new ArrayList<>());
        for (Iterable<Value> v : values) {
            v.forEach(sum::add);
        }
        return Optional.of(sum);
    };
    JavaPairDStream<Key, List<Value>> state = abonentConnects.updateStateByKey(updateFunc);

    // filter data (previous minute)
    Function<Tuple2<Key, List<Value>>, Boolean> filterFunc = (Function<Tuple2<Key, List<Value>>, Boolean>) v1 -> {
        LocalDateTime previousTime = LocalDateTime.now().minusMinutes(1).withSecond(0).withNano(0);
        LocalDateTime valueTime = v1._1().getTime();
        return valueTime.compareTo(previousTime) == 0;
    };
    JavaPairDStream<Key, List<Value>> filteredRecords = state.filter(filterFunc);

    // save data
    filteredRecords.foreachRDD(x -> {
        if (x.count() > 0) {
            x.saveAsTextFile("/tmp/xxx/grouped/" + LocalDateTime.now().toString().replace(":", "-").replace(".", "-"));
        }
    });

    streamingContext.start();
    streamingContext.awaitTermination();

This produces the result data, but since the operation runs every 5 seconds, I get the same duplicated data every 5 seconds.

I understand that I have to use Optional.absent() to clear saved data from the stream. I tried to use it, but I could not combine the two things in one snippet: saving the data to the file system and immediately clearing it from the state.
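
Roughly, what I'm after is something like this (just a sketch; the isSaved flag on Value is hypothetical and not in my code above):

// Hypothetical sketch: Value carries an isSaved flag set for rows that the
// foreachRDD output operation already persisted in a previous batch.
Function2<List<Iterable<Value>>, Optional<List<Value>>, Optional<List<Value>>> updateFunc2 =
        (values, previousState) -> {
            List<Value> sum = previousState.or(new ArrayList<>());
            sum.removeIf(Value::isSaved); // clear rows that were already saved
            for (Iterable<Value> v : values) {
                v.forEach(sum::add);
            }
            // returning absent() removes the key from the state entirely
            return sum.isEmpty() ? Optional.<List<Value>>absent() : Optional.of(sum);
        };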

Question: how can I do that?

Score: 0
Original content from Stack Overflow. Source:

https://stackoverflow.com/questions/36420725
