我已经在Flink中实现了CEP模式,它正在按预期工作,连接到本地Kafka broker。但是当我连接到基于集群的云kafka设置时,Flink CEP并没有触发。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//saves checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);我在用AscendingTimestampExtractor,
consumer.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<ObjectNode>() {
@Override
public long extractAscendingTimestamp(ObjectNode objectNode) {
long timestamp;
Instant instant = Instant.parse(objectNode.get("value").get("timestamp").asText());
timestamp = instant.toEpochMilli();
return timestamp;
}
});我还收到了警告信息,
增加时间戳提取器:140-时间戳单调违反: 1594017872227 < 1594017873133
我也试过使用AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks,没有一个在工作
我已经附加了Flink控制台屏幕截图,其中水印没有分配。更新的flink控制台屏幕截图
有人能帮忙吗?
发布于 2020-07-06 11:31:24
CEP必须首先对输入流进行排序,这是基于水印的。所以问题可能是水印,但你还没有向我们展示足够的原因来调试。一个常见的问题是有一个空闲源,它可以阻止水印的前进。
但还有其他可能的原因。为了调试这种情况,我建议您查看一些度量标准,无论是在Flink debug中还是在度量系统中(如果有连接的话)。首先,通过查看管道不同阶段的numRecordsIn、numRecordsOut或numRecordsInPerSecond和numRecordsOutPerSecond,检查记录是否正在流动。
如果存在事件,那么查看工作中的不同任务中的currentOutputWatermark,以查看事件时间是否提前。
更新:
似乎您正在调用assignTimestampsAndWatermarks的卡夫卡消费者,这将导致每个分区的水印。在这种情况下,如果您有一个空闲分区,该分区将不会产生任何水印,这将阻碍整个水印。试着在源代码生成的assignTimestampsAndWatermarks上调用DataStream,看看是否修复了问题。(当然,如果没有每个分区的水印,您将无法使用AscendingTimestampExtractor,因为流不会有序。)
https://stackoverflow.com/questions/62750834
复制相似问题