首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >星火SumProduct DataFrame

星火SumProduct DataFrame
EN

Stack Overflow用户
提问于 2015-11-10 19:56:12
回答 3查看 1.5K关注 0票数 0

我想从本质上创建一个跨越星火DataFrame中列的求和产品。我有一个看起来像这样的DataFrame:

代码语言:javascript
复制
id    val1   val2   val3   val4
123   10     5      7      5

我还有一张地图,看起来像:

代码语言:javascript
复制
val coefficents = Map("val1" -> 1, "val2" -> 2, "val3" -> 3, "val4" -> 4)

我想取DataFrame的每一列中的值,乘以映射中的相应值,然后在新列中返回结果,因此本质上是:

代码语言:javascript
复制
(10*1) + (5*2) + (7*3) + (5*4) = 61

我试过这个:

代码语言:javascript
复制
val myDF1 = myDF.withColumn("mySum", {var a:Double = 0.0; for ((k,v) <- coefficients) a + (col(k).cast(DoubleType)*coefficients(k));a})

但是得到一个错误,即"+“方法被重载了。即使我解决了这个问题,我也不确定这会成功。有什么想法吗?我总是可以以文本字符串的形式动态地构建SQL查询,并这样做,但我希望能有更好的效果。

任何想法都会受到赞赏。

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2015-11-11 04:16:30

代码的问题是您试图向Column添加一个Doublecast(DoubleType)只影响存储值的类型,而不影响列本身的类型。因为Double没有提供*(x: org.apache.spark.sql.Column): org.apache.spark.sql.Column方法,所以一切都会失败。

例如,要使它发挥作用,您可以这样做:

代码语言:javascript
复制
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit}

val df = sc.parallelize(Seq(
    (123, 10, 5, 7, 5), (456,  1, 1, 1, 1)
)).toDF("k", "val1", "val2", "val3", "val4")

val coefficients = Map("val1" -> 1, "val2" -> 2, "val3" -> 3, "val4" -> 4)

val dotProduct: Column = coefficients
  // To be explicit you can replace
  // col(k) * v with col(k) * lit(v)
  // but it is not required here
  // since we use * f Column.* method not Int.*
  .map{ case (k, v) => col(k) * v }  // * -> Column.*
  .reduce(_ + _)  // + -> Column.+

df.withColumn("mySum", dotProduct).show
// +---+----+----+----+----+-----+
// |  k|val1|val2|val3|val4|mySum|
// +---+----+----+----+----+-----+
// |123|  10|   5|   7|   5|   61|
// |456|   1|   1|   1|   1|   10|
// +---+----+----+----+----+-----+
票数 2
EN

Stack Overflow用户

发布于 2015-11-10 20:55:53

看起来问题是你实际上并没有对a做任何事情

代码语言:javascript
复制
for((k, v) <- coefficients) a + ...

你可能是说a += ...

此外,关于清理withColumn调用中的代码块的一些建议:

您不需要调用coefficients(k),因为您已经从for((k,v) <- coefficients)获得了它在v中的价值

Scala很擅长做一行,但是如果你必须在一行中加分号,那就有点作弊了:P,我建议把和计算部分分解成一个表达式的一行。

和表达式可以重写为fold,避免使用var (惯用Scala通常避免var)。

代码语言:javascript
复制
import org.apache.spark.sql.functions.lit

coefficients.foldLeft(lit(0.0)){ 
  case (sumSoFar, (k,v)) => col(k).cast(DoubleType) * v + sumSoFar
}
票数 2
EN

Stack Overflow用户

发布于 2015-11-10 23:22:20

我不确定这在DataFrame API中是否可行,因为您只能使用列,而不能使用任何预定义的闭包(例如,参数映射)。

我在下面概述了一种使用DataFrame的底层RDD的方法:

代码语言:javascript
复制
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// Initializing your input example.
val df1 = sc.parallelize(Seq((123, 10, 5, 7, 5))).toDF("id", "val1", "val2", "val3", "val4")

// Return column names as an array
val names = df1.columns

// Grab underlying RDD and zip elements with column names
val rdd1 = df1.rdd.map(row => (0 until row.length).map(row.getInt(_)).zip(names))

// Tack on accumulated total to the existing row
val rdd2 = rdd0.map { seq => Row.fromSeq(seq.map(_._1) :+ seq.map { case (value: Int, name: String) => value * coefficents.getOrElse(name, 0) }.sum) }

// Create output schema (with total)
val totalSchema = StructType(df1.schema.fields :+ StructField("total", IntegerType))

// Apply schema to create output dataframe
val df2 = sqlContext.createDataFrame(rdd1, totalSchema)

// Show output:
df2.show()
...
+---+----+----+----+----+-----+
| id|val1|val2|val3|val4|total|
+---+----+----+----+----+-----+
|123|  10|   5|   7|   5|   61|
+---+----+----+----+----+-----+
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/33638424

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档