首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何将Flink中的时间窗口保存为文本文件?

如何将Flink中的时间窗口保存为文本文件?
EN

Stack Overflow用户
提问于 2019-11-08 22:30:40
回答 1查看 296关注 0票数 0

我开始在Java的ApacheFlink中工作。

我的目标是在一分钟的时间窗口中使用一个ApacheKafka主题,这将应用非常基本的信息,并将每个窗口的结果记录在一个文件中。

到目前为止,我成功地将文本转换简化应用于我接收的内容,我应该使用apply或process来写入文件,但窗口的结果我有点迷失。

到目前为止,这是我的代码

代码语言:javascript
复制
package myflink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.time.ZoneId;
import java.util.Date;
import java.util.Properties;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.shaded.akka.org.jboss.netty.channel.ExceptionEvent;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import scala.util.parsing.json.JSONObject;
public class BatchJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment  env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "test");
        properties.setProperty("auto.offset.reset", "latest");
        FlinkKafkaConsumer consumer = new FlinkKafkaConsumer("topic-basic-test", new SimpleStringSchema(), properties);
        DataStream<String> data = env.addSource(consumer);
        data.flatMap(new JSONparse()).timeWindowAll(Time.minutes(1))."NEXT ??" .print()
        System.out.println("Hola usuario 2");
        env.execute("Flink Batch Java API Skeleton");
    }
    public static class JSONparse implements FlatMapFunction<String, Tuple2<String, String>> {
        @Override
        public void flatMap(String s, Collector<Tuple2<String, String>> collector) throws Exception {
            System.out.println(s);
            s = s + "ACA PODES JUGAR NDEAH";
            collector.collect(new Tuple2<String,String>("M",s));
        }
    }
}
EN

回答 1

Stack Overflow用户

发布于 2019-11-12 04:35:17

如果您希望将每个一分钟窗口的结果保存到它自己的文件中,那么可以考虑将StreamingFileSink与一分钟存储桶一起使用--这应该可以完成您正在寻找的任务,或者非常接近。

我认为您最终会为每个窗口创建一个目录,其中包含该窗口的每个并行实例中的一个文件--但是当您使用不并行操作的timeWindowAll时,每个存储桶将只有一个文件,除非结果太大而导致文件翻转。

顺便说一句,在FlatMap中进行JSON解析的性能会相当差,因为这将最终为每个事件实例化一个新的解析器,这反过来又会导致相当多的GC活动。最好使用RichFlatMap并在open()方法中创建一个解析器,您可以为每个事件重用该解析器。更好的是,使用JSONKeyValueDeserializationSchema而不是SimpleStringSchema,并让kafka连接器为您处理json解析。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58768492

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档