首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spark是否支持melt和dcast

Spark是否支持melt和dcast
EN

Stack Overflow用户
提问于 2016-04-07 20:16:27
回答 3查看 3.7K关注 0票数 4

我们使用melt和dcast将数据从宽->长和长->宽格式转换。有关更多详细信息,请参阅http://seananderson.ca/2013/10/19/reshape.html

scala或SparkR都可以。

我已经经历了blogscala functionsR API。我看不到有类似功能的函数。

Spark中有没有等价的函数?如果没有,在Spark中有没有其他的方法呢?

EN

回答 3

Stack Overflow用户

发布于 2016-05-10 17:24:24

Reshaping Data with Pivot in Spark支持使用pivot进行整形。我知道melt大致与pivot相反,也被称为unpivot。我对Spark比较陌生。根据我的知识,我尝试实现了熔化操作。

代码语言:javascript
复制
    def melt(df: DataFrame, columns: List[String]): DataFrame ={

    val restOfTheColumns =  df.columns.filterNot(columns.contains(_))
    val baseDF = df.select(columns.head, columns.tail: _*)
    val newStructure =StructType(baseDF.schema.fields ++ List(StructField("variable", StringType, true), StructField("value", StringType, true)))
    var newdf  = sqlContext.createDataFrame(sqlContext.sparkContext.emptyRDD[Row], newStructure)

    for(variableCol <- restOfTheColumns){
      val colValues = df.select(variableCol).map(r=> r(0).toString)
      val colRdd=baseDF.rdd.zip(colValues).map(tuple => Row.fromSeq(tuple._1.toSeq.:+(variableCol).:+(tuple._2.toString)))
      var colDF =sqlContext.createDataFrame(colRdd, newStructure)
      newdf =newdf.unionAll(colDF)
    }
    newdf
  }

它起作用了。但我对效率不是很确定。

代码语言:javascript
复制
+-----+---+---+----------+------+
| name|sex|age|    street|weight|
+-----+---+---+----------+------+
|Alice|  f| 34| somewhere|    70|
|  Bob|  m| 63|   nowhere|   -70|
|Alice|  f|612|nextstreet|    23|
|  Bob|  m|612|      moon|     8|
+-----+---+---+----------+------+

可用作

代码语言:javascript
复制
melt(df, List("name", "sex"))

结果如下:

代码语言:javascript
复制
+-----+---+--------+----------+
| name|sex|variable|     value|
+-----+---+--------+----------+
|Alice|  f|     age|        34|
|  Bob|  m|     age|        63|
|Alice|  f|     age|       612|
|  Bob|  m|     age|       612|
|Alice|  f|  street| somewhere|
|  Bob|  m|  street|   nowhere|
|Alice|  f|  street|nextstreet|
|  Bob|  m|  street|      moon|
|Alice|  f|  weight|        70|
|  Bob|  m|  weight|       -70|
|Alice|  f|  weight|        23|
|  Bob|  m|  weight|         8|
+-----+---+--------+----------+

我希望它是有用的,并感谢您的意见,如果有改进的空间。

票数 10
EN

Stack Overflow用户

发布于 2016-10-24 07:56:37

这是一个只使用数据集操作的spark.ml.Transformer (没有RDD的东西)

代码语言:javascript
复制
case class Melt(meltColumns: String*) extends Transformer{

  override def transform(in: Dataset[_]): DataFrame = {
    val nonMeltColumns =  in.columns.filterNot{ meltColumns.contains }
    val newDS = in
      .select(nonMeltColumns.head,meltColumns:_*)
      .withColumn("variable", functions.lit(nonMeltColumns.head))
      .withColumnRenamed(nonMeltColumns.head,"value")

    nonMeltColumns.tail
      .foldLeft(newDS){ case (acc,col) =>
        in
          .select(col,meltColumns:_*)
          .withColumn("variable", functions.lit(col))
          .withColumnRenamed(col,"value")
          .union(acc)
      }
      .select(meltColumns.head,meltColumns.tail ++ List("variable","value") : _*)
  }

  override def copy(extra: ParamMap): Transformer = defaultCopy(extra)

  @DeveloperApi
  override def transformSchema(schema: StructType): StructType = ???

  override val uid: String = Identifiable.randomUID("Melt")
}

下面是一个使用它的测试

代码语言:javascript
复制
"spark" should "melt a dataset" in {
    import spark.implicits._
    val schema = StructType(
      List(StructField("Melt1",StringType),StructField("Melt2",StringType)) ++
      Range(3,10).map{ i => StructField("name_"+i,DoubleType)}.toList)

    val ds = Range(1,11)
      .map{ i => Row("a" :: "b" :: Range(3,10).map{ j => Math.random() }.toList :_ *)}
      .|>{ rows => spark.sparkContext.parallelize(rows) }
      .|>{ rdd => spark.createDataFrame(rdd,schema) }

    val newDF = ds.transform{ df =>
      Melt("Melt1","Melt2").transform(df) }

    assert(newDF.count() === 70)
  }

.|>是scalaZ管道运算符

票数 0
EN

Stack Overflow用户

发布于 2017-04-24 15:47:40

Spark DataFrame具有提供R melt功能的explode方法。在Spark 1.6.1中工作的示例:

代码语言:javascript
复制
// input df has columns (anyDim, n1, n2)
case class MNV(measureName: String, measureValue: Integer);
val dfExploded = df.explode(col("n1"), col("n2")) {
  case Row(n1: Int, n2: Int) =>
  Array(MNV("n1", n1), MNV("n2", n2))
}
// dfExploded has columns (anyDim, n1, n2, measureName, measureValue)
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/36475914

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档