
Spark dataframe: convert row values into column names

Stack Overflow user
Asked on 2019-08-30 13:15:22
2 answers · 2.5K views · 0 following · Score 3

Using Spark, I need to convert row values into column names, partition the data by user id, and create CSV files.

val someDF = Seq(
  ("user1", "math","algebra-1","90"),
  ("user1", "physics","gravity","70"),
  ("user3", "biology","health","50"),
  ("user2", "biology","health","100"),
  ("user1", "math","algebra-1","40"),
  ("user2", "physics","gravity-2","20")
).toDF("user_id", "course_id","lesson_name","score")

someDF.show(false)

+-------+---------+-----------+-----+
|user_id|course_id|lesson_name|score|
+-------+---------+-----------+-----+
|  user1|     math|  algebra-1|   90|
|  user1|  physics|    gravity|   70|
|  user3|  biology|     health|   50|
|  user2|  biology|     health|  100|
|  user1|     math|  algebra-1|   40|
|  user2|  physics|  gravity-2|   20|
+-------+---------+-----------+-----+

val result = someDF.groupBy("user_id", "course_id").pivot("lesson_name").agg(first("score"))

result.show(false)

+-------+---------+---------+-------+---------+------+
|user_id|course_id|algebra-1|gravity|gravity-2|health|
+-------+---------+---------+-------+---------+------+
|  user3|  biology|     null|   null|     null|    50|
|  user1|     math|       90|   null|     null|  null|
|  user2|  biology|     null|   null|     null|   100|
|  user2|  physics|     null|   null|       20|  null|
|  user1|  physics|     null|     70|     null|  null|
+-------+---------+---------+-------+---------+------+
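What `groupBy` + `pivot` computes above can be illustrated with plain Scala collections (no Spark required) — a sketch, not the Spark implementation: group rows by `(user_id, course_id)`, then spread each `lesson_name` into its own column, keeping the first score seen per lesson (mirroring `agg(first("score"))`):

```scala
// Plain-Scala illustration of groupBy("user_id","course_id").pivot("lesson_name").agg(first("score"))
val rows = Seq(
  ("user1", "math", "algebra-1", "90"),
  ("user1", "physics", "gravity", "70"),
  ("user3", "biology", "health", "50"),
  ("user2", "biology", "health", "100"),
  ("user1", "math", "algebra-1", "40"),
  ("user2", "physics", "gravity-2", "20")
)

// The pivot columns are the distinct lesson names across the whole dataset.
val lessons = rows.map(_._3).distinct.sorted

// One output row per (user_id, course_id) group; each lesson column holds
// the first matching score, or None where the group has no such lesson.
val pivoted = rows.groupBy(r => (r._1, r._2)).map { case ((user, course), grp) =>
  val scores = lessons.map(l => l -> grp.find(_._3 == l).map(_._4)).toMap
  (user, course, scores)
}
```

Note that the `None` cells correspond exactly to the `null` cells in the Spark output: every group carries a column for every lesson in the dataset, which is the root of the per-course CSV problem below.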

Using the code above, I can convert the row values (lesson_name) into column names. But I need to save the output as one CSV per course.

The expected CSV output should look like this:

biology.csv // Expected Output

+-------+---------+------+
|user_id|course_id|health|
+-------+---------+------+
|  user3|  biology|  50  |
|  user2|  biology| 100  |
+-------+---------+------+

physics.csv // Expected Output

+-------+---------+---------+-------+
|user_id|course_id|gravity-2|gravity|
+-------+---------+---------+-------+
|  user2|  physics|       20|   null|
|  user1|  physics|     null|     70|
+-------+---------+---------+-------+

**Note: each course's CSV should contain only that course's own lesson names; it should not contain lesson names from unrelated courses.

What I actually get in the CSV is the format below:**

result.write
  .partitionBy("course_id")
  .mode("overwrite")
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save(somepath)
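For context on why the output below goes wrong: `partitionBy("course_id")` only splits the *rows* into Hadoop-style `course_id=<value>` subdirectories under the output path; every part file still carries the full pivoted schema, including lesson columns that are entirely null for that course. A plain-string sketch of the resulting layout (`somepath` stands in for the actual output path):

```scala
// Directory layout produced by .partitionBy("course_id"): one
// "course_id=<value>" subfolder per distinct course under the base path.
val courses = Seq("biology", "math", "physics")
val dirs = courses.map(c => s"somepath/course_id=$c")
dirs.foreach(println)
// somepath/course_id=biology
// somepath/course_id=math
// somepath/course_id=physics
```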

Example:

biology.csv // Wrong output: it contains non-relevant course lessons (algebra-1, gravity, gravity-2)
+-------+---------+---------+-------+---------+------+
|user_id|course_id|algebra-1|gravity|gravity-2|health|
+-------+---------+---------+-------+---------+------+
|  user3|  biology|     null|   null|     null|    50|
|  user2|  biology|     null|   null|     null|   100|
+-------+---------+---------+-------+---------+------+

Can anyone help solve this?


2 Answers

Stack Overflow user

Answered on 2019-08-30 13:57:42

Just filter by course before pivoting:

val result = someDF.filter($"course_id" === "physics").groupBy("user_id", "course_id").pivot("lesson_name").agg(first("score"))

+-------+---------+-------+---------+
|user_id|course_id|gravity|gravity-2|
+-------+---------+-------+---------+
|user2  |physics  |null   |20       |
|user1  |physics  |70     |null     |
+-------+---------+-------+---------+

Score 1

Stack Overflow user

Answered on 2019-09-01 09:34:29

I assume you mean you want to save the data into a separate directory for each course_id. You can use the approach below.

import org.apache.spark.sql.functions._
import spark.implicits._

val someDF = Seq(
  ("user1", "math", "algebra-1", "90"),
  ("user1", "physics", "gravity", "70"),
  ("user3", "biology", "health", "50"),
  ("user2", "biology", "health", "100"),
  ("user1", "math", "algebra-1", "40"),
  ("user2", "physics", "gravity-2", "20")
).toDF("user_id", "course_id", "lesson_name", "score")

val result = someDF.groupBy("user_id", "course_id").pivot("lesson_name").agg(first("score"))

// Collect the distinct course ids to drive the per-course loop.
val eventNames = result.select($"course_id").distinct().collect()
val eventlist = eventNames.map(x => x(0).toString)

for (eventName <- eventlist) {
  val course = result.where($"course_id" === lit(eventName))

  // Find which columns hold at least one non-null value for this course:
  // map every cell to 0/1, then take the per-column max.
  val row = course
    .select(course.columns.map(c => when(col(c).isNull, 0).otherwise(1).as(c)): _*)
    .groupBy().max(course.columns.map(c => c): _*)
    .first

  // Keep only the columns whose max is 1, i.e. not entirely null.
  val colKeep = row.getValuesMap[Int](row.schema.fieldNames)
    .collect { case (name, 1) => name }
    .toArray

  // groupBy().max renames each column to "max(<name>)"; strip that wrapper
  // to select the original columns from the course dataframe.
  val final_df = course.select(row.schema.fieldNames.intersect(colKeep)
    .map(c => col(c.drop(4).dropRight(1))): _*)

  final_df.show()

  final_df.coalesce(1).write.mode("overwrite").format("csv")
    .option("header", "true").save(s"${eventName}")
}
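One non-obvious step in the loop is `c.drop(4).dropRight(1)`: `groupBy().max(...)` names each output column `max(<name>)`, so the original column name is recovered by stripping the leading `max(` (4 characters) and the trailing `)`. A quick plain-Scala check of that string manipulation:

```scala
// "max(user_id)" -> drop the leading "max(" (4 chars) and trailing ")".
val aggregated = "max(user_id)"
val original = aggregated.drop(4).dropRight(1)
println(original)  // user_id
```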


+-------+---------+------+
|user_id|course_id|health|
+-------+---------+------+
|  user3|  biology|    50|
|  user2|  biology|   100|
+-------+---------+------+

+-------+---------+-------+---------+
|user_id|course_id|gravity|gravity-2|
+-------+---------+-------+---------+
|  user2|  physics|   null|       20|
|  user1|  physics|     70|     null|
+-------+---------+-------+---------+

+-------+---------+---------+
|user_id|course_id|algebra-1|
+-------+---------+---------+
|  user1|     math|       90|
+-------+---------+---------+

If it serves your purpose, please accept the answer. Happy Hadooping!

Score 0
Original content provided by Stack Overflow; translation supported by Tencent Cloud's IT-domain translation engine.
Original link:

https://stackoverflow.com/questions/57727480
