首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >TypedDataset隐式编码器及其在Scala中的类型界

TypedDataset隐式编码器及其在Scala中的类型界
EN

Stack Overflow用户
提问于 2019-04-11 16:35:02
回答 1查看 1.1K关注 0票数 1

我的目标是创建一个MyDataFrame类,它将知道如何在给定的路径上获取数据,但我希望提供类型安全性。我在使用对远程数据进行类型限制的frameless.TypedDataset时遇到了一些问题。例如

代码语言:javascript
复制
sealed trait Schema
final case class TableA(id: String) extends Schema
final case class TableB(id: String) extends Schema

class MyDataFrame[T <: Schema](path: String, implicit val spark: SparkSession) {
  def read = TypedDataset.create(spark.read.parquet(path)).as[T]
} 

但我一直在找could not find implicit value for evidence parameter of type frameless.TypedEncoder[org.apache.spark.sql.Row]。我知道TypedDataset.create需要一个Injection才能工作。但我不知道如何为一个通用的T编写这个文件。我认为编译器可以推断出,由于Schema的所有子类型都是case classes,所以它可以工作。

有人碰到过这个吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-04-11 16:44:57

所有隐式参数都应该在最后一个参数列表中,并且这个参数列表应该与非隐式参数列表分开。

如果您试图编译

代码语言:javascript
复制
class MyDataFrame[T <: Schema](path: String)(implicit spark: SparkSession) {
  def read = TypedDataset.create(spark.read.parquet(path)).as[T]
}

你会看到错误

代码语言:javascript
复制
Error:(11, 35) could not find implicit value for evidence parameter of type frameless.TypedEncoder[org.apache.spark.sql.Row]
    def read = TypedDataset.create(spark.read.parquet(path)).as[T]

因此,我们只需添加相应的隐式参数

代码语言:javascript
复制
class MyDataFrame[T <: Schema](path: String)(implicit spark: SparkSession, te: TypedEncoder[Row]) {
  def read = TypedDataset.create(spark.read.parquet(path)).as[T]
}

我们会有错误

代码语言:javascript
复制
Error:(11, 64) could not find implicit value for parameter as: frameless.ops.As[org.apache.spark.sql.Row,T]
    def read = TypedDataset.create(spark.read.parquet(path)).as[T]

因此,让我们再添加一个隐式参数

代码语言:javascript
复制
import frameless.ops.As
import frameless.{TypedDataset, TypedEncoder}
import org.apache.spark.sql.{Row, SparkSession}

class MyDataFrame[T <: Schema](path: String)(implicit spark: SparkSession, te: TypedEncoder[Row], as: As[Row, T]) {
  def read = TypedDataset.create(spark.read.parquet(path)).as[T]
}

或者用放映机

代码语言:javascript
复制
class MyDataFrame[T <: Schema : As[Row, ?]](path: String)(implicit spark: SparkSession, te: TypedEncoder[Row]) {
  def read = TypedDataset.create(spark.read.parquet(path)).as[T]
}

您可以创建自定义类型类。

代码语言:javascript
复制
  trait Helper[T] {
    implicit def te: TypedEncoder[Row]
    implicit def as: As[Row, T]
  }

  object Helper {
    implicit def mkHelper[T](implicit te0: TypedEncoder[Row], as0: As[Row, T]): Helper[T] = new Helper[T] {
      override implicit def te: TypedEncoder[Row] = te0
      override implicit def as: As[Row, T] = as0
    }
  }

  class MyDataFrame[T <: Schema : Helper](path: String)(implicit spark: SparkSession) {
    val h = implicitly[Helper[T]]
    import h._
    def read = TypedDataset.create(spark.read.parquet(path)).as[T]
  }

代码语言:javascript
复制
  class MyDataFrame[T <: Schema](path: String)(implicit spark: SparkSession, h: Helper[T]) {
    import h._
    def read = TypedDataset.create(spark.read.parquet(path)).as[T]
  }

代码语言:javascript
复制
  trait Helper[T] {
    def create(dataFrame: DataFrame): TypedDataset[T]
  }

  object Helper {
    implicit def mkHelper[T](implicit te: TypedEncoder[Row], as: As[Row, T]): Helper[T] =
      (dataFrame: DataFrame) => TypedDataset.create(dataFrame).as[T]
  }

  class MyDataFrame[T <: Schema : Helper](path: String)(implicit spark: SparkSession) {
    def read = implicitly[Helper[T]].create(spark.read.parquet(path))
  }

代码语言:javascript
复制
  class MyDataFrame[T <: Schema](path: String)(implicit spark: SparkSession, h: Helper[T]) {
    def read = h.create(spark.read.parquet(path))
  }

修正版:

代码语言:javascript
复制
import org.apache.spark.sql.Encoder
import frameless.{TypedDataset, TypedEncoder}

class MyDataFrame[T <: Schema](path: String)(implicit
  spark: SparkSession,
  e: Encoder[T],
  te: TypedEncoder[T]
) {
  def read: TypedDataset[T] = TypedDataset.create[T](spark.read.parquet(path).as[T])
}

或使用上下文边界

代码语言:javascript
复制
class MyDataFrame[T <: Schema : Encoder : TypedEncoder](path: String)(implicit
  spark: SparkSession
) {
  def read: TypedDataset[T] = TypedDataset.create[T](spark.read.parquet(path).as[T])
}

测试:我将一个json文件converted {"id": "xyz"}转换成拼花文件,然后

代码语言:javascript
复制
sealed trait Schema
final case class TableA(id: String) extends Schema
final case class TableB(id: String) extends Schema

import org.apache.spark.sql.SparkSession

implicit val spark: SparkSession = SparkSession.builder
  .master("local")
  .appName("Spark SQL basic example")
  .getOrCreate()

import spark.implicits._
import frameless.syntax._

val res: TypedDataset[TableA] = new MyDataFrame[TableA]("path/to/parquet/file").read
println(res) // [id: string]
res.foreach(println).run() // TableA(xyz)
票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55637342

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档