Flink streaming event-time window

Stack Overflow user
Asked 2017-12-19 15:39:22
Answers: 1 · Views: 2K · Followers: 0 · Votes: 1

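I am running a simple example to test windowing based on EventTime. I can generate output with processing time, but when I switch to EventTime, no output appears. Please help me understand what I am doing wrong.

I am creating a SlidingWindow of size 10 seconds that slides every 5 seconds. At the end of each window, the system should emit the number of messages received during that period.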

input :
a,1513695853 (generated at 13th second, received at 13th second) 
a,1513695853 (generated at 13th second, received at 13th second) 
a,1513695856 (generated at 16th second, received at 19th second) 
a,1513695859 (generated at 19th second, received at 19th second)

The second field is the event timestamp; the values fall on seconds 13, 13, 16, and 19 of the minute.
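
For reference, here is a minimal sketch of which sliding windows each of these events would fall into for a 10-second window sliding every 5 seconds. It is plain Java, not the Flink API; the class and method names (`SlidingWindowAssignment`, `windowStarts`) are hypothetical. It assumes windows aligned to the epoch with no offset, non-negative timestamps, and timestamps converted from the epoch seconds shown above to milliseconds.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowAssignment {

    /**
     * Returns the start time (ms) of every sliding window [start, start + sizeMs)
     * that contains the given timestamp, newest window first.
     * Assumes windows are aligned to the epoch (no offset) and timestampMs >= 0.
     */
    static List<Long> windowStarts(long timestampMs, long sizeMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        // Start of the most recent window that begins at or before the timestamp.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Event timestamps from the question, in epoch seconds.
        long[] eventSeconds = {1513695853L, 1513695853L, 1513695856L, 1513695859L};
        for (long seconds : eventSeconds) {
            long timestampMs = seconds * 1000L; // assumption: convert seconds to milliseconds
            System.out.println(seconds + " -> window starts (ms): "
                + windowStarts(timestampMs, 10_000L, 5_000L));
        }
    }
}
```

Each event lands in two overlapping windows, because the window size is twice the slide.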

If I use a processing-time window:

Output :
(a,1)
(a,3)
(a,2)

But when I use event time, nothing is printed. Please help me understand what is going wrong.

package org.apache.flink.window.training;

import java.io.InputStream;
import java.util.Properties;

import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

import com.fasterxml.jackson.databind.ObjectMapper;

public class SocketStream {


  private static Properties properties = new Properties();

  public static void main(String args[]) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    InputStream inputStream =
        SocketStream.class.getClassLoader().getResourceAsStream("local-kafka-server.properties");

    properties.load(inputStream);

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    FlinkKafkaConsumer010<String> consumer =
        new FlinkKafkaConsumer010<>("test-topic", new SimpleStringSchema(), properties);

    DataStream<Element> socketStockStream =
        env.addSource(consumer).map(new MapFunction<String, Element>() {
          @Override
          public Element map(String value) throws Exception {

            String split[] = value.split(",");
            Element element = new Element(split[0], Long.parseLong(split[1]));

            return element;
          }
        }).assignTimestampsAndWatermarks(new TimestampExtractor());

    socketStockStream.map(new MapFunction<Element, Tuple2<String, Integer>>() {

      @Override
      public Tuple2<String, Integer> map(Element value) throws Exception {

        return new Tuple2<String, Integer>(value.getId(), 1);
      }
    }).keyBy(0)
        .timeWindow(Time.seconds(10), Time.seconds(5))
        .sum(1)
        .print();

    env.execute();
  }

  public static class TimestampExtractor implements AssignerWithPunctuatedWatermarks<Element> {

    private static final long serialVersionUID = 1L;

    @Override
    public long extractTimestamp(Element element, long previousElementTimestamp) {

      return element.getTimestamp();
    }

    @Override
    public Watermark checkAndGetNextWatermark(Element lastElement, long extractedTimestamp) {
      // TODO Auto-generated method stub
      return null;
    }
  }
}

1 Answer

Stack Overflow user

Accepted answer

Posted 2017-12-19 16:34:36

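Event-time processing requires that both timestamps and watermarks are generated correctly.

The TimestampExtractor in your code does not generate any watermarks: its checkAndGetNextWatermark method always returns null. Without watermarks, event time never advances, so the windows never fire.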
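
To illustrate the point, here is a simplified plain-Java simulation, not Flink code, of a 10-second window sliding every 5 seconds. The class and method names (`WatermarkSimulation`, `run`) are hypothetical. It assumes in-order events, timestamps in milliseconds, and that a window fires once the watermark reaches the window end minus one millisecond. With `emitWatermarks` set to `false` it models an assigner that always returns null; with `true` it models one that emits a watermark equal to the highest timestamp seen so far.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class WatermarkSimulation {

    static final long SIZE_MS = 10_000L;
    static final long SLIDE_MS = 5_000L;

    /**
     * Counts events per sliding window and returns a description of every window
     * that fired. Assumes in-order, non-negative timestamps; late events are not handled.
     */
    static List<String> run(long[] timestampsMs, boolean emitWatermarks) {
        TreeMap<Long, Integer> countsByWindowStart = new TreeMap<>();
        List<String> fired = new ArrayList<>();
        long watermark = Long.MIN_VALUE; // event time has not advanced yet

        for (long ts : timestampsMs) {
            // Add the event to every window [start, start + SIZE_MS) that contains it.
            long lastStart = ts - (ts % SLIDE_MS);
            for (long start = lastStart; start > ts - SIZE_MS; start -= SLIDE_MS) {
                countsByWindowStart.merge(start, 1, Integer::sum);
            }
            // An assigner that returns null never advances the watermark.
            if (emitWatermarks) {
                watermark = Math.max(watermark, ts);
            }
            // Fire every window whose last covered millisecond is at or before the watermark.
            while (!countsByWindowStart.isEmpty()
                    && countsByWindowStart.firstKey() + SIZE_MS - 1 <= watermark) {
                Map.Entry<Long, Integer> window = countsByWindowStart.pollFirstEntry();
                long start = window.getKey();
                fired.add("[" + start + "," + (start + SIZE_MS) + ") count=" + window.getValue());
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        // Timestamps from the question, converted from epoch seconds to milliseconds.
        long[] events = {1513695853000L, 1513695853000L, 1513695856000L, 1513695859000L};
        System.out.println("no watermarks:   " + run(events, false));
        System.out.println("with watermarks: " + run(events, true));
    }
}
```

In the question's code, the corresponding change would likely be to return `new Watermark(extractedTimestamp)` from `checkAndGetNextWatermark` instead of null. Note also that the sample timestamps look like epoch seconds, while Flink interprets timestamps as milliseconds, so `extractTimestamp` would probably need to multiply by 1000; both points are suggestions based on the code shown, not part of the original answer.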

Votes: 4
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/47890401
