首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >为什么CoMap返回“带有可序列化产品的DataStream”而不是只返回DataStream?

为什么CoMap返回“带有可序列化产品的DataStream”而不是只返回DataStream?
EN

Stack Overflow用户
提问于 2018-02-14 13:59:00
回答 1查看 282关注 0票数 1

我需要理解为什么eventStream.connect(otherStream).map(_ => Right(2), _ => Left("2"))不生成DataStream[Either[String, Int]],而是生成DataStream[Either[String, Int]] with Product with Serializable。我正在使用一些API,它确实接受DataStream[T],如果我传递给他们一个DataStream[T] with Product with Serializable,我就会得到一个编译时错误。有人能解释一下或者给我点提示吗?

我给你们举个例子:

代码语言:javascript
复制
class FlinkFoo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Silly int source
    val eventStream: DataStream[Int] = env.addSource((sc: SourceContext[Int]) => {
      while (true) sc.collect(1)
    })

    // Silly String source
    val otherStream: DataStream[String] = env.addSource((sc: SourceContext[String]) => {
      while (true) sc.collect("1")
    })

    // I need to connect two stream and then flatten them
    val connectedStream2: DataStream[Either[String, Int] with Product with Serializable] = eventStream.connect(otherStream).map(_ => Right(2), _ => Left("2"))

    /* Compile time error !!!!
     * found   : org.apache.flink.streaming.api.scala.DataStream[Either[String,Int] with Product with Serializable]
     * [error]  required: org.apache.flink.streaming.api.scala.DataStream[Either[?,?]]
     * [error] Note: Either[String,Int] with Product with Serializable <: Either[?,?], but class DataStream is invariant in type T.
     * [error] You may wish to define T as +T instead. (SLS 4.5)
     * [error]     fooMethod(connectedStream2)
     * [error]               ^
     **/
    fooMethod(connectedStream2)
  }

  def fooMethod[T, P](dataStream: DataStream[Either[T, P]]): Unit = {
    // do something
  }
}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-02-14 14:25:18

您可以尝试将Flink scala隐式序列化程序和TypeInformation添加到您的范围中,如下所示

代码语言:javascript
复制
import org.apache.flink.streaming.api.scala._

上面导入的包对象调用TypeUtils 对象;它们为Either提供序列化程序和所需的类型信息,只要对许多其他实体都是如此。

在Flink泛型类型解析之后,您需要这些转换来解析Either类型,并且您可以显式地将返回类型添加到您的辅助中,以实现该转换。

代码语言:javascript
复制
val yourEitherStream: DataStream[Either[String, Int]] =
  eventStream
    .connect(otherStream)
    .map(_ => Right(2), _ => Left("2"))

with Product with Serializable混搭是Scala 2.11号的一种拒绝,由2.12来解决(但您不能将它与Flink 现在就来一起使用)。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48788998

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档