mydata = event.result.NewDataSet.Table as ArrayCollection; mydata.filterFunction
;import org.apache.flink.api.common.JobExecutionResult;import org.apache.flink.api.common.functions.FilterFunction Keep only those rides and both start and end in NYC. */ public static class NYCFilter implements FilterFunction Keep only those rides and both start and end in NYC. */ public static class NYCFilter implements FilterFunction
dataStream.filter(new FilterFunction<Integer>() { @Override public boolean filter(Integer value iterationBody = iteration.map (/*do something*/); DataStream<Long> feedback = iterationBody.filter(new FilterFunction value > 0; } }); iteration.closeWith(feedback); DataStream<Long> output = iterationBody.filter(new FilterFunction
+ described path Path string // handler处理函数 Function RouteFunction // 拦截器 Filters []FilterFunction string httpMethod string // required function RouteFunction // required filters []FilterFunction produces []string consumes []string pathParameters []*Parameter filters []FilterFunction WebService ServeMux *http.ServeMux isRegisteredOnRoot bool containerFilters []FilterFunction http.NewServeMux(), isRegisteredOnRoot: false, containerFilters: []FilterFunction
+ described path Path string // handler处理函数 Function RouteFunction // 拦截器 Filters []FilterFunction string httpMethod string // required function RouteFunction // required filters []FilterFunction produces []string consumes []string pathParameters []*Parameter filters []FilterFunction WebService ServeMux *http.ServeMux isRegisteredOnRoot bool containerFilters []FilterFunction http.NewServeMux(), isRegisteredOnRoot: false, containerFilters: []FilterFunction
java.util.ArrayList;import java.util.Arrays;import java.util.List;import org.apache.flink.api.common.functions.FilterFunction Integer, Integer>() {public Integer map(Integer value) throws Exception {return value + 1;}}).filter(new FilterFunction value > 5;}});sink.print();//1> 10//14> 7//16> 9//13> 6//2> 11//15> 8}// lambda实现public static void filterFunction2 > 5);sink.print();//12> 7//15> 10//11> 6//13> 8//14> 9//16> 11}// 查询user id大于3的记录public static void filterFunction3 throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();filterFunction3
+ described path Path string // handler处理函数 Function RouteFunction // 拦截器 Filters []FilterFunction string httpMethod string // required function RouteFunction // required filters []FilterFunction produces []string consumes []string pathParameters []*Parameter filters []FilterFunction WebService ServeMux *http.ServeMux isRegisteredOnRoot bool containerFilters []FilterFunction http.NewServeMux(), isRegisteredOnRoot: false, containerFilters: []FilterFunction
import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.FilterFunction } } }); DataStream<String> filtedDS = wordsDS.filter(new FilterFunction import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.FilterFunction Transformation //下面的操作相当于将数据随机分配一下,有可能出现数据倾斜 DataStream<Long> filterDS = longDS.filter(new FilterFunction
filter转换需要传入的参数需要实现FilterFunction接口,而FilterFunction内要实现filter()方法,就相当于一个返回布尔类型的条件表达式。 WaterSensor("sensor_2", 2, 2), new WaterSensor("sensor_3", 3, 3) ); // 方式一:传入匿名类实现FilterFunction stream.filter(new FilterFunction<WaterSensor>() { @Override public boolean filter throws Exception { return e.id.equals("sensor_1"); } }).print(); // 方式二:传入FilterFunction stream.filter(new UserFilter()).print(); env.execute(); } public static class UserFilter implements FilterFunction
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.FilterFunction // 对map后的数据流进行filter操作 DataStream<Integer> filteredStream = mappedStream.filter(new FilterFunction
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.FilterFunction // 对map后的数据流进行filter操作 DataStream<Integer> filteredStream = mappedStream.filter(new FilterFunction
com.javaedge.flink.basic; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.functions.FilterFunction word.toLowerCase().trim()); } } }).filter(new FilterFunction
iterationBody = iteration.map (/*do something*/); DataStream<Long> feedback = iterationBody.filter(new FilterFunction value > 0; } }); iteration.closeWith(feedback); DataStream<Long> output = iterationBody.filter(new FilterFunction
1、函数类(Function Classes) Flink暴露了所有UDF函数的接口,具体实现方式为接口或者抽象类,例如MapFunction、FilterFunction、ReduceFunction 因此之前写过实现MapFunction、FilterFunction、ReduceFunction的自定义函数,且此类函数用处不大,这里不过多赘述。
.addSource(consumer); DataStream<Tuple3<Integer, String, Integer>> sourceStream = stream.filter((FilterFunction
env .addSource(consumer); DataStream<Tuple3<Integer, String, Integer>> sourceStream = stream.filter((FilterFunction
env.addSource(new VideoOrderSource()); DataStream<VideoOrder> filterDS = videoOrderDS.filter(new FilterFunction
integer) throws Exception { return integer * integer; } }) // Leave only even numbers .filter(new FilterFunction movieName, new HashSet<>(Arrays.asList(genres))); } }); DataSet<Movie> filteredMovies = movies.filter(new FilterFunction 现在,当我们有一个电影数据集时,我们可以实现算法的核心部分并过滤出所有的动作电影: DataSet<Movie> filteredMovies = movies.filter(new FilterFunction
创建指定范围内的数字型的DataSource: package com.bolingcavalry.api; import org.apache.flink.api.common.functions.FilterFunction dataStream = env.generateSequence(1, 10); //做一次过滤,只保留偶数,然后打印 dataStream.filter(new FilterFunction
filter 转换需要传入的参数需要实现 FilterFunction 接口,而FilterFunction 内要实现 filter()方法,就相当于一个返回布尔类型的条件表达式。 // -Filter-1 传入匿名类实现FilterFunction接口 stream.filter(new FilterFunction<Event>() { @Override public filter(Event e) throws Exception { return e.user.equals("Mary"); } }); // -Filter-2 传入FilterFunction 例如: MapFunction、FilterFunction、ReduceFunction 等。 } } } 匿名函数实现 // -1 函数类型 -匿名函数 SingleOutputStreamOperator<Event> data = stream01.filter(new FilterFunction