我需要理解为什么eventStream.connect(otherStream).map(_ => Right(2), _ => Left("2"))不生成DataStream[Either[String, Int]],而是生成DataStream[Either[String, Int]] with Product with Serializable。我正在使用一些API,它确实接受DataStream[T],如果我传递给他们一个DataStream[T] with Product with Serializable,我就会得到一个编译时错误。有人能解释一下或者给我点提示吗?
我给你们举个例子:
class FlinkFoo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Silly int source
val eventStream: DataStream[Int] = env.addSource((sc: SourceContext[Int]) => {
while (true) sc.collect(1)
})
// Silly String source
val otherStream: DataStream[String] = env.addSource((sc: SourceContext[String]) => {
while (true) sc.collect("1")
})
// I need to connect two stream and then flatten them
val connectedStream2: DataStream[Either[String, Int] with Product with Serializable] = eventStream.connect(otherStream).map(_ => Right(2), _ => Left("2"))
/* Compile time error !!!!
* found : org.apache.flink.streaming.api.scala.DataStream[Either[String,Int] with Product with Serializable]
* [error] required: org.apache.flink.streaming.api.scala.DataStream[Either[?,?]]
* [error] Note: Either[String,Int] with Product with Serializable <: Either[?,?], but class DataStream is invariant in type T.
* [error] You may wish to define T as +T instead. (SLS 4.5)
* [error] fooMethod(connectedStream2)
* [error] ^
**/
fooMethod(connectedStream2)
}
def fooMethod[T, P](dataStream: DataStream[Either[T, P]]): Unit = {
// do something
}
}发布于 2018-02-14 14:25:18
您可以尝试将Flink scala隐式序列化程序和TypeInformation添加到您的范围中,如下所示
import org.apache.flink.streaming.api.scala._上面导入的包对象调用TypeUtils 对象;它们为Either提供序列化程序和所需的类型信息,只要对许多其他实体都是如此。
在Flink泛型类型解析之后,您需要这些转换来解析Either类型,并且您可以显式地将返回类型添加到您的辅助中,以实现该转换。
val yourEitherStream: DataStream[Either[String, Int]] =
eventStream
.connect(otherStream)
.map(_ => Right(2), _ => Left("2"))with Product with Serializable混搭是Scala 2.11号的一种拒绝,由2.12来解决(但您不能将它与Flink 现在就来一起使用)。
https://stackoverflow.com/questions/48788998
复制相似问题