首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Flink CEP事件不触发

Flink CEP事件不触发
EN

Stack Overflow用户
提问于 2020-07-06 07:01:59
回答 1查看 402关注 0票数 0

我已经在Flink中实现了CEP模式,它正在按预期工作,连接到本地Kafka broker。但是当我连接到基于集群的云kafka设置时,Flink CEP并没有触发。

代码语言:javascript
复制
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    //saves checkpoint
    env.getCheckpointConfig().enableExternalizedCheckpoints(
            CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

我在用AscendingTimestampExtractor,

代码语言:javascript
复制
consumer.assignTimestampsAndWatermarks(
    new AscendingTimestampExtractor<ObjectNode>() {
      @Override
      public long extractAscendingTimestamp(ObjectNode objectNode) {
        long timestamp;
        Instant instant = Instant.parse(objectNode.get("value").get("timestamp").asText());
        timestamp = instant.toEpochMilli();
        return timestamp;
      }
    });

我还收到了警告信息,

增加时间戳提取器:140-时间戳单调违反: 1594017872227 < 1594017873133

我也试过使用AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks,没有一个在工作

我已经附加了Flink控制台屏幕截图,其中水印没有分配。更新的flink控制台屏幕截图

有人能帮忙吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-07-06 11:31:24

CEP必须首先对输入流进行排序,这是基于水印的。所以问题可能是水印,但你还没有向我们展示足够的原因来调试。一个常见的问题是有一个空闲源,它可以阻止水印的前进。

但还有其他可能的原因。为了调试这种情况,我建议您查看一些度量标准,无论是在Flink debug中还是在度量系统中(如果有连接的话)。首先,通过查看管道不同阶段的numRecordsInnumRecordsOutnumRecordsInPerSecondnumRecordsOutPerSecond,检查记录是否正在流动。

如果存在事件,那么查看工作中的不同任务中的currentOutputWatermark,以查看事件时间是否提前。

更新:

似乎您正在调用assignTimestampsAndWatermarks的卡夫卡消费者,这将导致每个分区的水印。在这种情况下,如果您有一个空闲分区,该分区将不会产生任何水印,这将阻碍整个水印。试着在源代码生成的assignTimestampsAndWatermarks上调用DataStream,看看是否修复了问题。(当然,如果没有每个分区的水印,您将无法使用AscendingTimestampExtractor,因为流不会有序。)

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62750834

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档