我一直在使用Apache编写一个原型应用程序。在此过程中,我选择将org.apache.flink.streaming.api.functions.windowing.WindowFunction用于特定的用例。但是,在编写application ()函数的主体时,我遇到了这个错误(下面的代码不是来自我正在编写的应用程序--我的数据类型不同--它与Flink文档站点中提供的示例代码不同):
import scala.collection.Iterable
import scala.collection.Map
import org.apache.flink.streaming.api.functions.windowing.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.{TimeWindow}
import org.apache.flink.util.Collector
import scala.collection.JavaConversions._
class MyWindowFunction extends WindowFunction[(String, Long), String, String, TimeWindow] {
def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
var count = 0L
for (in <- input) {
count = count + 1
}
out.collect(s"Window $window count: $count")
}
}编译器正在抱怨:
Error:(16, 7) class MyWindowFunction needs to be abstract, since method apply in trait WindowFunction of type
(x$1: String, x$2: org.apache.flink.streaming.api.windowing.windows.TimeWindow,
x$3: Iterable[(String, Long)],
x$4: org.apache.flink.util.Collector[String])Unit is not defined
class MyWindowFunction extends WindowFunction[(String, Long), String, String, TimeWindow] {我检查了apply()中参数的顺序;它们似乎是正确的。
由于某些原因,我找不到错误的确切来源。有人能帮我找到解决办法吗?
发布于 2016-09-09 07:54:03
我发现了这个错误的原因。
我不清楚的是,Apache的API期望的是一个java.lang.Iterable,而不是它的Scala对应的:
class MyWindowFunction extends
WindowFunction[(String, Long), String, String, TimeWindow] {
override
def apply(
key: String,
w: TimeWindow,
iterable: Iterable[(String, Long)], // from java.lang.Iterable
collector: Collector[String]): Unit = {
// ....
}
}因此,我必须适当地导入:
import java.lang.Iterable // From Java
import java.util.Map // From Java
import org.apache.flink.streaming.api.functions.windowing.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import scala.collection.JavaConversions._ // Implicit conversions
class MyWindowFunction
extends WindowFunction[(String, Long), String, String, TimeWindow] {
override
def apply(
key: String,
w: TimeWindow,
iterable: Iterable[(String, Long)],
collector: Collector[String]): Unit = {
// ....
}
}一切都很好!
https://stackoverflow.com/questions/39405724
复制相似问题