如果我的理解是错误的,我将向flink道歉,我正在构建一个数据流应用程序,并且该流包含多个数据流,这些数据流检查传入的DataStream中是否存在所需的字段。我的应用程序验证传入的数据,如果数据被成功验证,它应该将数据附加到给定的文件中,如果数据已经存在的话。我试图模拟在一个DataStream中是否发生任何异常,其他数据流不应该受到影响,因为我在其中一个流中显式抛出一个异常。在下面的示例中,为了简单起见,我使用windows文本文件来追加数据。
注意:我的流没有状态,因为我没有任何东西可以存储在状态
public class ExceptionTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// env.setParallelism(1);
//env.setStateBackend(new RocksDBStateBackend("file:///C://flinkCheckpoint", true));
// to set minimum progress time to happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoints have to complete within 5000 ms, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(5000);
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // DELETE_ON_CANCELLATION
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(10, TimeUnit.SECONDS) // delay
));
DataStream<String> input1 = env.fromElements("hello");
DataStream<String> input2 = env.fromElements("hello");
DataStream<String> output1 = input.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
//out.collect(value.concat(" world"));
throw new Exception("=====================NO VALUE TO CHECK=================");
}
});
DataStream<String> output2 = input.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
out.collect(value.concat(" world"));
}
});
output2.addSink(new SinkFunction<String>() {
@Override
public void invoke(String value) throws Exception {
try {
File myObj = new File("C://flinkOutput//filename.txt");
if (myObj.createNewFile()) {
System.out.println("File created: " + myObj.getName());
BufferedWriter out = new BufferedWriter(
new FileWriter("C://flinkOutput//filename.txt", true));
out.write(value);
out.close();
System.out.println("Successfully wrote to the file.");
} else {
System.out.println("File already exists.");
BufferedWriter out = new BufferedWriter(
new FileWriter("C://flinkOutput//filename.txt", true));
out.write(value);
out.close();
System.out.println("Successfully wrote to the file.");
}
} catch (IOException e) {
System.out.println("An error occurred.");
e.printStackTrace();
}
}
});
env.execute();
}以下我几乎没有疑问。
请帮助我消除我对检查点的疑虑,以及如何实现我在flink中读到的EXACTLY_ONCE机制,但我没有得到如何实现它的任何示例。
正如@Mikalai建议的那样,我在下面实现了StreamingSinkFunction
与StreamingSinkFunction
public class ExceptionTest {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// env.setParallelism(1);
//env.setStateBackend(new RocksDBStateBackend("file:///C://flinkCheckpoint", true));
// to set minimum progress time to happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoints have to complete within 5000 ms, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(5000);
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // DELETE_ON_CANCELLATION
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
3, // number of restart attempts
Time.of(10, TimeUnit.SECONDS) // delay
));
DataStream<String> input1 = env.fromElements("hello");
DataStream<String> input2 = env.fromElements("hello");
DataStream<String> output1 = input.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
//out.collect(value.concat(" world"));
throw new Exception("=====================NO VALUE TO CHECK=================");
}
});
DataStream<String> output2 = input.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
out.collect(value.concat(" world"));
}
});
String outputPath = "C://flinkCheckpoint";
final StreamingFileSink<String> sink = StreamingFileSink
.forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
.withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
.withMaxPartSize(1)
.build())
.build();
output2.addSink(sink);
});
env.execute();
}但是,当我检查检查点文件夹时,我可以看到它创建了四个正在进行中的部分文件,如下所示

有什么是我正在做的,因为它创建多部分文件吗?
发布于 2020-10-27 13:52:32
为了保证端到端精确地一次记录传递(除了精确地-一次状态语义),数据接收器需要参与检查点机制(以及数据源)。
如果要将数据写入文件,则可以使用StreamingFileSink,它向桶内的FileSystem文件发出输入元素。这与检查点机制集成在一起,提供精确的一次语义跳出框.
如果要实现自己的接收器,则接收器函数必须实现CheckpointedFunction接口,并正确实现在请求检查点快照并刷新当前应用程序状态时调用的snapshotState(FunctionSnapshotContext context)方法。此外,我建议在完成分布式检查点后通知CheckpointListener接口。
Flink已经提供了一个抽象的TwoPhaseCommitSinkFunction,它是所有打算精确实现一次语义的SinkFunction的推荐基类。它是通过在CheckpointedFunction和CheckpointListener之上实现两阶段提交算法来实现的。例如,您可以查看FlinkKafkaProducer.java源代码。
https://stackoverflow.com/questions/64553439
复制相似问题