首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Flink CEP中的Context#currentProcessingTime

Flink CEP中的Context#currentProcessingTime
EN

Stack Overflow用户
提问于 2019-12-29 04:11:45
回答 1查看 92关注 0票数 0

使用Flink CEP时,Context具有currentProcessingTime属性。使用System.currentTimeMillis()和这种方法有区别吗?这里有一个小实验,看起来这些值几乎是相等的(至少在这个简单的场景中是这样)。

代码语言:javascript
复制
import org.apache.flink.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

val source = env.fromCollection((1 to 100000).map(_ => 1).toList)

val pattern = Pattern.begin[Int]("start")
    .where(new IterativeCondition[Int] {
      override def filter(value: Int, ctx: IterativeCondition.Context[Int]): Boolean = {
        ctx.currentProcessingTime == System.currentTimeMillis
      }
    })

CEP.pattern(source, pattern)
    .process[Int]((m, ctx, out) => m.get("start").forEach(out.collect(_)))
    .keyBy(_ => "")
    .reduce(_ + _)
    .print() // sometimes less than 100000

env.execute()

我能想到的唯一原因是currentProcessingTimestamp在许多机器/并行操作符实例中以某种方式保持一致?

EN

回答 1

Stack Overflow用户

发布于 2019-12-29 16:15:41

currentProcessingTime只是System.currentTimeMillis的一个包装器。它并没有做任何聪明的事情。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59514502

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档