首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Flink检查点模式ExactlyOnce没有按预期工作

Flink检查点模式ExactlyOnce没有按预期工作
EN

Stack Overflow用户
提问于 2020-10-27 11:25:49
回答 1查看 415关注 0票数 0

如果我的理解是错误的,我将向flink道歉,我正在构建一个数据流应用程序,并且该流包含多个数据流,这些数据流检查传入的DataStream中是否存在所需的字段。我的应用程序验证传入的数据,如果数据被成功验证,它应该将数据附加到给定的文件中,如果数据已经存在的话。我试图模拟在一个DataStream中是否发生任何异常,其他数据流不应该受到影响,因为我在其中一个流中显式抛出一个异常。在下面的示例中,为了简单起见,我使用windows文本文件来追加数据。

注意:我的流没有状态,因为我没有任何东西可以存储在状态

代码语言:javascript
复制
public class ExceptionTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // start a checkpoint every 1000 ms
        env.enableCheckpointing(1000);

       // env.setParallelism(1);

        //env.setStateBackend(new RocksDBStateBackend("file:///C://flinkCheckpoint", true));

        // to set minimum progress time to happen between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

        // checkpoints have to complete within 5000 ms, or are discarded
        env.getCheckpointConfig().setCheckpointTimeout(5000);

        // set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);  
        
        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // enable externalized checkpoints which are retained after job cancellation
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);  // DELETE_ON_CANCELLATION

        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, // number of restart attempts
                Time.of(10, TimeUnit.SECONDS) // delay
        ));

        DataStream<String> input1 = env.fromElements("hello");
        
        DataStream<String> input2 = env.fromElements("hello");


        DataStream<String> output1 = input.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                //out.collect(value.concat(" world"));
                throw new Exception("=====================NO VALUE TO CHECK=================");
            }
        });


        DataStream<String> output2 = input.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                out.collect(value.concat(" world"));
            }
        });

       output2.addSink(new SinkFunction<String>() {
           @Override
           public void invoke(String value) throws Exception {
               try {
                File myObj = new File("C://flinkOutput//filename.txt");
                if (myObj.createNewFile()) {
                    System.out.println("File created: " + myObj.getName());
                    BufferedWriter out = new BufferedWriter(
                            new FileWriter("C://flinkOutput//filename.txt", true));
                    out.write(value);
                    out.close();
                    System.out.println("Successfully wrote to the file.");
                } else {
                    System.out.println("File already exists.");
                    BufferedWriter out = new BufferedWriter(
                            new FileWriter("C://flinkOutput//filename.txt", true));
                    out.write(value);
                    out.close();
                    System.out.println("Successfully wrote to the file.");
                }
            } catch (IOException e) {
                System.out.println("An error occurred.");
                e.printStackTrace();
            }
           }
       });

        env.execute();

    }

以下我几乎没有疑问。

  1. 当我在output1流中抛出异常时,即使在遇到异常并将数据写入本地文件之后,第二个流output2仍在运行,但当我检查该文件时,输出如下 你好世界你好世界
  2. 根据我从flink文档中得到的理解,如果我使用检查点模式作为EXACTLY_ONCE,它不应该将数据写入文件的时间不超过一次,因为这个过程已经完成,并且将数据写入文件。但这在我的情况下不会发生,如果我做错了什么,我也不会得到。

请帮助我消除我对检查点的疑虑,以及如何实现我在flink中读到的EXACTLY_ONCE机制,但我没有得到如何实现它的任何示例。

正如@Mikalai建议的那样,我在下面实现了StreamingSinkFunction

与StreamingSinkFunction

代码语言:javascript
复制
public class ExceptionTest {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // start a checkpoint every 1000 ms
        env.enableCheckpointing(1000);

       // env.setParallelism(1);

        //env.setStateBackend(new RocksDBStateBackend("file:///C://flinkCheckpoint", true));

        // to set minimum progress time to happen between checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

        // checkpoints have to complete within 5000 ms, or are discarded
        env.getCheckpointConfig().setCheckpointTimeout(5000);

        // set mode to exactly-once (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);  
        
        // allow only one checkpoint to be in progress at the same time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // enable externalized checkpoints which are retained after job cancellation
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);  // DELETE_ON_CANCELLATION

        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, // number of restart attempts
                Time.of(10, TimeUnit.SECONDS) // delay
        ));

        DataStream<String> input1 = env.fromElements("hello");
        
        DataStream<String> input2 = env.fromElements("hello");


        DataStream<String> output1 = input.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                //out.collect(value.concat(" world"));
                throw new Exception("=====================NO VALUE TO CHECK=================");
            }
        });
        
        
        DataStream<String> output2 = input.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                out.collect(value.concat(" world"));
            }
        });
        
        
        String outputPath = "C://flinkCheckpoint";

        final StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(new Path(outputPath), new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .withMaxPartSize(1)
                                .build())
                .build();
                
        
        output2.addSink(sink);

       
       });

        env.execute();

    }

但是,当我检查检查点文件夹时,我可以看到它创建了四个正在进行中的部分文件,如下所示

有什么是我正在做的,因为它创建多部分文件吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-10-27 13:52:32

为了保证端到端精确地一次记录传递(除了精确地-一次状态语义),数据接收器需要参与检查点机制(以及数据源)。

如果要将数据写入文件,则可以使用StreamingFileSink,它向桶内的FileSystem文件发出输入元素。这与检查点机制集成在一起,提供精确的一次语义跳出框.

如果要实现自己的接收器,则接收器函数必须实现CheckpointedFunction接口,并正确实现在请求检查点快照并刷新当前应用程序状态时调用的snapshotState(FunctionSnapshotContext context)方法。此外,我建议在完成分布式检查点后通知CheckpointListener接口。

Flink已经提供了一个抽象的TwoPhaseCommitSinkFunction,它是所有打算精确实现一次语义的SinkFunction的推荐基类。它是通过在CheckpointedFunctionCheckpointListener之上实现两阶段提交算法来实现的。例如,您可以查看FlinkKafkaProducer.java源代码。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/64553439

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档