首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >DataSet/DataStream类接口

DataSet/DataStream类接口
EN

Stack Overflow用户
提问于 2020-04-13 17:52:18
回答 1查看 371关注 0票数 2

我只是尝试在Flink中使用Scala类型的类。我定义了以下类型的类接口:

代码语言:javascript
复制
trait LikeEvent[T] {
    def timestamp(payload: T): Int
}

现在,我想考虑一下这样的DataSet of LikeEvent[_]

代码语言:javascript
复制
// existing classes that need to be adapted/normalized (without touching them)
case class Log(ts: Int, severity: Int, message: String)
case class Metric(ts: Int, name: String, value: Double)

// create instances for the raw events
object EventInstance {

    implicit val logEvent = new LikeEvent[Log] {
        def timestamp(log: Log): Int = log.ts
    }

    implicit val metricEvent = new LikeEvent[Metric] {
        def timestamp(metric: Metric): Int = metric.ts
    }
}

// add ops to the raw event classes (regular class)
object EventSyntax {

    implicit class Event[T: LikeEvent](val payload: T) {
        val le = implicitly[LikeEvent[T]]
        def timestamp: Int = le.timestamp(payload)
    }
}

下面的应用程序运行得很好:

代码语言:javascript
复制
// set up the execution environment
val env = ExecutionEnvironment.getExecutionEnvironment

// underlying (raw) events
val events: DataSet[Event[_]] = env.fromElements(
  Metric(1586736000, "cpu_usage", 0.2),
  Log(1586736005, 1, "invalid login"),
  Log(1586736010, 1, "invalid login"),
  Log(1586736015, 1, "invalid login"),
  Log(1586736030, 2, "valid login"),
  Metric(1586736060, "cpu_usage", 0.8),
  Log(1586736120, 0, "end of world"),
)

// count events per hour
val eventsPerHour = events
  .map(new GetMinuteEventTuple())
  .groupBy(0).reduceGroup { g =>
    val gl = g.toList
    val (hour, count) = (gl.head._1, gl.size)
    (hour, count)
  }

eventsPerHour.print()

打印预期输出

代码语言:javascript
复制
(0,5)
(1,1)
(2,1)

但是,如果我像这样修改语法对象:

代码语言:javascript
复制
// couldn't make it work with Flink!
// add ops to the raw event classes (case class)
object EventSyntax2 {

  case class Event[T: LikeEvent](payload: T) {
    val le = implicitly[LikeEvent[T]]
    def timestamp: Int = le.timestamp(payload)
  }

  implicit def fromPayload[T: LikeEvent](payload: T): Event[T] = Event(payload)  
}

我得到以下错误:

代码语言:javascript
复制
type mismatch;
found   : org.apache.flink.api.scala.DataSet[Product with Serializable]
required: org.apache.flink.api.scala.DataSet[com.salvalcantara.fp.EventSyntax2.Event[_]]

因此,在消息的指导下,我做了以下更改:

代码语言:javascript
复制
val events: DataSet[Event[_]] = env.fromElements[Event[_]](...)

在此之后,错误更改为:

代码语言:javascript
复制
could not find implicit value for evidence parameter of type org.apache.flink.api.common.typeinfo.TypeInformation[com.salvalcantara.fp.EventSyntax2.Event[_]]

我不明白为什么EventSyntax2会导致这些错误,而EventSyntax则编译和运行良好。为什么在EventSyntax2中使用case类包装器比在EventSyntax中使用常规类更有问题?

总之,我的问题有两个:

如何用EventSyntax2

  • What解决我的问题将是实现我的目标的最简单的方法?在这里,我只是为了学习而尝试类型类模式,但在我看来,一种更面向对象的方法(基于子类型)看起来更简单。就像这样:

代码语言:javascript
复制
// Define trait
trait Event {
    def timestamp: Int
    def payload: Product with Serializable // Any case class
}

// Metric adapter (similar for Log)
object MetricAdapter {

    implicit class MetricEvent(val payload: Metric) extends Event {
        def timestamp: Int = payload.ts
    }
}

然后简单地使用val events: DataSet[Event] = env.fromElements(...)

List 注意到 提出了类似的问题,但它考虑的是一个简单的Scala ,而不是Flink (或 DataStream**). )。我问题的重点是使用Flink中的类型类模式,以某种方式考虑 streams/datasets,以及它是否真的有意义,或者在这种情况下应该明确地支持常规特性,并继承上面概述的特性。

顺便说一下,您可以在这里找到代码:https://github.com/salvalcantara/flink-events-and-polymorphism

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-04-18 15:15:43

简短的回答: Flink不能在scala中导出通配符类型的TypeInformation

很长的答案:你的两个问题都在问:什么是TypeInformation,它是如何使用的,以及它是如何派生出来的。

TypeInformation是Flink的内部类型系统,当数据在网络中被洗牌并存储在一个状态后端(当使用DataStream api时)时,它使用它来序列化数据。

序列化是数据处理中的一个主要性能问题,因此Flink包含针对常见数据类型和模式的专用序列化程序。在Java堆栈中,它支持所有JVM原语、Pojo、Flink元组、一些常见的集合类型和avro。类的类型是使用反射来确定的,如果它与已知类型不匹配,它将返回给Kryo。

在scala中,类型信息是使用is派生的。scala DataSet和DataStream api上的所有方法都为隐式类型类注释了它们的泛型参数。

代码语言:javascript
复制
def map[T: TypeInformation] 

TypeInformation可以像任何类型类一样手动提供,也可以使用从flink导入的宏派生。

代码语言:javascript
复制
import org.apache.flink.api.scala._

这个宏通过对scala元组、scala案例类和一些常见scala库类型的支持来装饰java类型堆栈。我之所以说装饰器,是因为如果您的类不是这些类型之一,那么它可以并且将返回到java堆栈。

那么,为什么第1版能工作呢?

因为它是类型堆栈无法匹配的普通类,因此它将其解析为泛型类型,并返回基于kryo的序列化程序。您可以从控制台测试它,并看到它返回一个泛型类型。

代码语言:javascript
复制
> scala> implicitly[TypeInformation[EventSyntax.Event[_]]]
res2: org.apache.flink.api.common.typeinfo.TypeInformation[com.salvalcantara.fp.EventSyntax.Event[_]] = GenericType<com.salvalcantara.fp.EventSyntax.Event>

版本2无法工作,因为它将类型识别为case类,然后用于递归地为每个成员派生TypeInformation实例。这对于通配符类型是不可能的,通配符类型与Any不同,因此派生失败。

通常,您不应该在异构类型中使用Flink,因为它将无法为您的工作负载派生高效的序列化器。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61193662

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档