
在vscode中使用git方法
git clone https://github.com/apache/flink-training.git
该项目使用 Gradle 构建。Gradle 和 Maven 类似,但是更新,实现的功能更多,也更简洁。
./gradlew test shadowJar
每个项目里有 exercise 和 solution,solution 是已经实现的方法。
package org.apache.flink.training.solutions.ridecleansing;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
import org.apache.flink.training.exercises.common.utils.GeoUtils;
/**
* Solution to the Ride Cleansing exercise from the Flink training.
*
* <p>The task of this exercise is to filter a data stream of taxi ride records to keep only rides
* that both start and end within New York City. The resulting stream should be printed.
*/
public class RideCleansingSolution {
private final SourceFunction<TaxiRide> source;
private final SinkFunction<TaxiRide> sink;
/**
 * Creates a job that uses the given source and sink.
 *
 * @param source the function the pipeline reads taxi rides from
 * @param sink the function the filtered taxi rides are handed to
 */
public RideCleansingSolution(SourceFunction<TaxiRide> source, SinkFunction<TaxiRide> sink) {
    this.sink = sink;
    this.source = source;
}
/**
 * Program entry point: builds a job from a {@link TaxiRideGenerator} source and a
 * {@link PrintSinkFunction} sink, then executes it.
 *
 * @param args command-line arguments (not used)
 * @throws Exception which occurs during job execution.
 */
public static void main(String[] args) throws Exception {
    new RideCleansingSolution(new TaxiRideGenerator(), new PrintSinkFunction<>()).execute();
}
/**
 * Creates and executes the ride cleansing pipeline: source, then {@link NYCFilter}, then sink.
 *
 * @return the {@link JobExecutionResult} of the job named "Taxi Ride Cleansing"
 * @throws Exception which occurs during job execution.
 */
public JobExecutionResult execute() throws Exception {
    // obtain the streaming execution environment for this job
    final StreamExecutionEnvironment environment =
            StreamExecutionEnvironment.getExecutionEnvironment();

    // wire the pipeline: read rides, drop those rejected by NYCFilter, emit the rest
    environment
            .addSource(source)
            .filter(new NYCFilter())
            .addSink(sink);

    // submit the job and hand its result back to the caller
    return environment.execute("Taxi Ride Cleansing");
}
/** Keeps only those rides that both start and end in NYC. */
public static class NYCFilter implements FilterFunction<TaxiRide> {
    /**
     * Decides whether a ride is kept.
     *
     * @param taxiRide the ride to check
     * @return {@code true} if {@link GeoUtils#isInNYC} accepts both the start and the end
     *     coordinates of the ride, {@code false} otherwise
     */
    @Override
    public boolean filter(TaxiRide taxiRide) {
        // Reject early: the end point is only checked when the start point is in NYC.
        if (!GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat)) {
            return false;
        }
        return GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat);
    }
}
}
从代码中可以看到,source 和 sink 已经实现,框架也已经实现好,需要做的只是实现 filter 方法,
也就是这里的内容
/** Keeps only those rides that both start and end in NYC. */
public static class NYCFilter implements FilterFunction<TaxiRide> {
    /**
     * Decides whether a ride is kept.
     *
     * @param taxiRide the ride to check
     * @return {@code true} if {@link GeoUtils#isInNYC} accepts both the start and the end
     *     coordinates of the ride, {@code false} otherwise
     */
    @Override
    public boolean filter(TaxiRide taxiRide) {
        // Reject early: the end point is only checked when the start point is in NYC.
        if (!GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat)) {
            return false;
        }
        return GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat);
    }
}
}
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。
原创声明:本文系作者授权腾讯云开发者社区发表,未经许可,不得转载。
如有侵权,请联系 cloudcommunity@tencent.com 删除。