我刚开始学习Flink中的窗口函数。我有一个定制的源码来产生数字,我的目标是计算偶数和奇数的和。
下面是代码(Flink:1.12 Scala:2.11.8)。
object ProcessWindowFunc {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val source = env.addSource(new CustomSource())
source
.keyBy(x=>x%2)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // error message here
.reduce(_+_)
.print()
env.execute("sum")
}
}
class CustomSource extends SourceFunction[Int]{
var running = true
var count = 0
def run(ctx: SourceFunction.SourceContext[Int]) = {
while(running) {
ctx.collect(count)
count += 1
Thread.sleep(800)
}
}
override def cancel() = {
this.running = false
}
}构建失败,控制台输出如下所示。
type mismatch;
found: org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
required: org.apache.flink.streaming.api.windowing.assigners.WindowAssigner[_ >: Int, ?]
Note: Object <: Any (and org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows <: org.apache.flink.streaming.api.windowing.assigners.WindowAssigner[Object,org.apache.flink.streaming.api.windowing.windows.TimeWindow]),
but Java-defined class WindowAssigner is invariant in type T.
You may wish to investigate a wildcard type such as `_ <: Any`. (SLS 3.2.10)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))我不太理解这里的错误消息。我试着用Java实现同样的逻辑,它是有效的。因此,我猜测Java和Scala之间可能存在泛型冲突。但是我仍然不知道如何解决这个问题。
如有任何帮助和建议,我们将非常感谢!这个问题让我困惑了一整天。
发布于 2021-03-03 17:43:21
第一个猜测是你没有
import org.apache.flink.streaming.api.scala._https://stackoverflow.com/questions/66453216
复制相似问题