首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Apache Flink -事件时间窗口

Apache Flink -事件时间窗口
EN

Stack Overflow用户
提问于 2018-08-29 19:39:38
回答 2查看 1.2K关注 0票数 0

我想在Apache flink中创建键控窗口,以便每个键的窗口在该键的第一个事件到达n分钟后执行。是否可以使用事件时间特征来完成(因为处理时间取决于系统时钟,并且不确定第一个事件何时到达)。如果可能,请解释事件时间和水印的分配,并解释如何在n分钟后调用进程窗口函数。

下面是代码的一部分,它可以让你了解我目前正在做的事情:

代码语言:javascript
复制
            //Make keyed events so as to start a window for a key
            KeyedStream<SourceData, Tuple> keyedEvents = 
                    env.addSource(new MySource(configData),"JSON Source")
                    .assignTimestampsAndWatermarks(new MyTimeStamps())
                    .setParallelism(1)
                    .keyBy("service");


            //Start a window for windowTime time
            DataStream<ResultData> resultData=
                    keyedEvents
                    .timeWindow(Time.minutes(winTime))
                    .process(new ProcessEventWindow(configData))
                    .name("Event Collection Window")
                    .setParallelism(25);

那么,我如何分配事件时间和水印,使窗口跟随第一个事件的事件时间作为起始点,并在10分钟后执行(第一个事件的开始时间可能因键不同而不同)。任何帮助都将不胜感激。

代码语言:javascript
复制
        /------------ ( window of 10 minutes )
Streams          |------------ ( window of 10 minutes )
            \------------ ( window of 10 minutes )

Edit :我用来分配时间戳和水印的类

代码语言:javascript
复制
public class MyTimeStamps implements AssignerWithPeriodicWatermarks<SourceData> {

    @Override
    public long extractTimestamp(SourceData element, long previousElementTimestamp) {

          //Will return epoch of currentTime
        return GlobalUtilities.getCurrentEpoch();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // TODO Auto-generated method stub
        //Will return epoch of currentTime + 10 minutes
        return new Watermark(GlobalUtilities.getTimeShiftNMinutesEpoch(10));
    }

}
EN

回答 2

Stack Overflow用户

发布于 2018-08-30 17:25:33

我认为对于您的用例,最好使用ProcessFunction。您可以做的是在第一个事件到来时注册一个EventTimeTimer。而不是在onTimer方法中发出结果。

类似于:

代码语言:javascript
复制
public class ProcessFunctionImpl extends ProcessFunction<SourceData, ResultData> {

    @Override
    public void processElement(SourceData value, Context ctx, Collector<ResultData> out)
        throws Exception {

        // retrieve the current aggregate
        ResultData current = state.value();
        if (current == null) {
            // first event arrived
            current = new ResultData();
            // register end of window
            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10 * 60 * 1000 /* 10 minutes */);
        }

        // update the state's aggregate
        current += value;

        // write the state back
        state.update(current);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<ResultData> out)
        throws Exception {

        // get the state for the key that scheduled the timer
        ResultData result = state.value();

        out.collect(result);

        // reset the window state
        state.clear();
    }
}
票数 1
EN

Stack Overflow用户

发布于 2018-08-30 03:37:15

关于事件时间窗口,我不久前也有过类似的问题。下面是我的流的样子

代码语言:javascript
复制
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

//Consumer Setup

val stream = env.addSource(consumer)
  .assignTimestampsAndWatermarks(new WMAssigner)

// Additional Setup here

stream
  .keyBy { data => data.findValue("service") }
  .window(TumblingEventTimeWindows.of(Time.minutes(10)))
  .process { new WindowProcessor }

  //Sinks go here

我的WMAssigner类看起来像这样(注意:这允许1分钟的乱序事件发生,如果你不想延迟,你可以扩展一个不同的时间戳提取器):

代码语言:javascript
复制
class WMAssigner extends BoundedOutOfOrdernessTimestampExtractor[ObjectNode] (Time.seconds(60)) {
  override def extractTimestamp(element: ObjectNode): Long = {
    val tsStr = element.findValue("data").findValue("ts").toString replaceAll("\"", "")
    tsStr.toLong
  }
}

我想要用于水印的时间戳是data.ts字段。

我的WindowProcessor:

代码语言:javascript
复制
class WindowProcessor extends ProcessWindowFunction[ObjectNode,String,String,TimeWindow] {
  override def process(key: String, context: Context, elements: Iterable[ObjectNode], out: Collector[String]): Unit = {
    val out = ""
    elements.foreach( value => {
      out = value.findValue("data").findValue("outData")
    }
    out.collect(out)
  }
}

如果有什么不清楚的地方,请告诉我

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52076584

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档