首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >为什么flink无法从保存点恢复

为什么flink无法从保存点恢复
EN

Stack Overflow用户
提问于 2021-05-26 13:52:10
回答 1查看 184关注 0票数 0

版本flink 1.7

我正在尝试从一个保存点(或检查点)恢复flink作业,该作业所做的是从kafka ->读取数据,并对kafka执行30分钟窗口聚合(如计数器) ->接收器。

我使用rocksdb并启用了检查点。

现在我尝试手动触发一个保存点。每个聚合的期望值为30(每分钟1个数据)。但是,当我从保存点恢复( flink run -d -s {-s})时,聚合值不是30(小于30,这取决于我取消flink作业并恢复的时间)。当作业正常运行时,它会得到30。

我不知道为什么有些数据可能会丢失?

日志中显示"No restore state for FlinkKafkaConsumer“

主代码:

代码语言:javascript
复制
        source.flatMap(new FlatMapFunction<String, Model>() {
        private static final long serialVersionUID = 5814342517597371470L;

        @Override
        public void flatMap(String value, Collector<Model> out) throws Exception {
            LOGGER.info("----> catch value: " + value);
            Model model =  JSONObject.parseObject(value, Model.class);
            out.collect(model);
        }
    }).uid("flatmap-1").name("flatmap-1").assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Model>() {

        private static final long serialVersionUID = -1742071229344039681L;

        @Override
        public long extractTimestamp(Model element, long previousElementTimestamp) {
            return element.getTime();
        }

        @Nullable
        @Override
        public Watermark checkAndGetNextWatermark(Model lastElement, long extractedTimestamp) {
            return new Watermark(extractedTimestamp);
        }
    }).setParallelism(1).keyBy(Model::getDim).window(new DynamicWindowAssigner()).aggregate(new AggregateFunction<Model, Model, Model>() {
        @Override
        public Model createAccumulator() {
            return new Model();
        }

        @Override
        public Model add(Model value, Model accumulator) {
            init(value, accumulator);
            accumulator.setValue(accumulator.getValue() + 1);
            return accumulator;
        }

        @Override
        public Model getResult(Model accumulator) {
            return accumulator;
        }

        @Override
        public Model merge(Model a, Model b) {
            return null;
        }

        private void init(Model value, Model accumulator){
            if(accumulator.getTime() == 0L){
                accumulator.setValue(0);
                accumulator.setDim(value.getDim());
                accumulator.setTime(value.getTime());
            }
        }
    }).uid("agg-1").name("agg-1").map(new MapFunction<Model, String>() {
        private static final long serialVersionUID = -1742071229344039681L;

        @Override
        public String map(Model value) throws Exception {
            value.setTime(TimeWindow.getWindowStartWithOffset(value.getTime(), 0, TimeUnit.MINUTES.toMillis(30)));
            return JSONObject.toJSONString(value);
        }
    }).uid("flatmap-2").name("flatmap-2").setParallelism(4).addSink(metricProducer).uid("sink").name("sink").setParallelism(2);

检查点设置:

代码语言:javascript
复制
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.enableCheckpointing(60000);
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    env.getCheckpointConfig().setCheckpointTimeout(120000);
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(50000);
    StateBackend stateBackend = new RocksDBStateBackend(${path}, true);
    env.setStateBackend(stateBackend);
    env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.getConfig().disableSysoutLogging();
EN

回答 1

Stack Overflow用户

发布于 2021-05-28 19:02:41

最后,我发现我应该使用flink run -s {savepoint} -d xxx.jar instesad of flink run -d xxx.jar -s {savepoint},如果"-d“标志在"-s”标志之前,则flink忽略"-s“

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67699206

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档