使用Flink CEP时,Context具有currentProcessingTime属性。使用System.currentTimeMillis()和这种方法有区别吗?这里有一个小实验,看起来这些值几乎是相等的(至少在这个简单的场景中是这样)。
import org.apache.flink.api.scala._
val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = env.fromCollection((1 to 100000).map(_ => 1).toList)
val pattern = Pattern.begin[Int]("start")
.where(new IterativeCondition[Int] {
override def filter(value: Int, ctx: IterativeCondition.Context[Int]): Boolean = {
ctx.currentProcessingTime == System.currentTimeMillis
}
})
CEP.pattern(source, pattern)
.process[Int]((m, ctx, out) => m.get("start").forEach(out.collect(_)))
.keyBy(_ => "")
.reduce(_ + _)
.print() // sometimes less than 100000
env.execute()我能想到的唯一原因是currentProcessingTimestamp在许多机器/并行操作符实例中以某种方式保持一致?
发布于 2019-12-29 16:15:41
currentProcessingTime只是System.currentTimeMillis的一个包装器。它并没有做任何聪明的事情。
https://stackoverflow.com/questions/59514502
复制相似问题