首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >将转换从配置单元sql查询移动到Spark

将转换从配置单元sql查询移动到Spark
EN

Stack Overflow用户
提问于 2016-08-22 17:20:20
回答 2查看 1.1K关注 0票数 0
代码语言:javascript
复制
val temp = sqlContext.sql(s"SELECT A, B, C, (CASE WHEN (D) in (1,2,3) THEN ((E)+0.000)/60 ELSE 0 END) AS Z from TEST.TEST_TABLE")
val temp1 = temp.map({ temp => ((temp.getShort(0), temp.getString(1)), (USAGE_TEMP.getDouble(2), USAGE_TEMP.getDouble(3)))})
.reduceByKey((x, y) => ((x._1+y._1),(x._2+y._2)))

我希望在scala中完成转换,而不是上面在hive层上进行计算(案例评估)的代码。我该怎么做呢?

在Map中填充数据时,是否可以执行相同的操作?

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2016-08-22 17:46:43

代码语言:javascript
复制
val temp = sqlContext.sql(s"SELECT A, B, C, D, E from TEST.TEST_TABLE")

val tempTransform = temp.map(row => {
  val z = List[Double](1, 2, 3).contains(row.getDouble(3)) match {
    case true => row.getDouble(4) / 60
    case _ => 0
  }
  Row(row.getShort(0), Row.getString(1), Row.getDouble(2), z)
})

val temp1 = tempTransform.map({ temp => ((temp.getShort(0), temp.getString(1)), (USAGE_TEMP.getDouble(2), USAGE_TEMP.getDouble(3)))})
  .reduceByKey((x, y) => ((x._1+y._1),(x._2+y._2)))
票数 1
EN

Stack Overflow用户

发布于 2016-09-22 17:02:45

您也可以使用此语法

代码语言:javascript
复制
new_df = old_df.withColumn('target_column', udf(df.name))

由本example引用

代码语言:javascript
复制
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._ // for `toDF` and $""
import org.apache.spark.sql.functions._ // for `when`

val df = sc.parallelize(Seq((4, "blah", 2), (2, "", 3), (56, "foo", 3), (100, null, 5)))
    .toDF("A", "B", "C")

val newDf = df.withColumn("D", when($"B".isNull or $"B" === "", 0).otherwise(1))

在您的示例中,执行数据帧形式的sql,如下面的val temp = sqlContext.sql(s"SELECT A, B, C, D, E from TEST.TEST_TABLE")

并将withColumn与case或when otherwise一起应用,或者在需要时应用spark udf

,调用scala函数逻辑而不是hiveudf

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/39075682

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档