
Spark SQL freeze
Stack Overflow user

Asked 2018-08-10 21:37:45
1 answer · 719 views · 0 followers · Score: 1

I have a problem with Spark SQL. I read some data from CSV files, then perform groupBy and join operations, and the finished job writes the joined data to a file. My problem is the time gap, which I have marked with a blank line in the log below.

18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 1069
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 1003
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 965
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 1073
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 1038
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 900
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 903
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 938
18/08/07 23:39:40 INFO storage.BlockManagerInfo: Removed broadcast_84_piece0 on 10.4.110.24:36423 in memory (size: 32.8 KB, free: 4.1 GB)
18/08/07 23:39:40 INFO storage.BlockManagerInfo: Removed broadcast_84_piece0 on omm104.in.nawras.com.om:43133 in memory (size: 32.8 KB, free: 4.1 GB)
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 969
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 1036
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 970
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 1006
18/08/07 23:39:40 INFO spark.ContextCleaner: Cleaned accumulator 1039
18/08/07 23:39:47 WARN util.Utils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.debug.maxToStringFields' in SparkEnv.conf.
18/08/07 23:39:54 INFO parquet.ParquetFileFormat: Using default output committer for Parquet: org.apache.parquet.hadoop.ParquetOutputCommitter

18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Pruning directories with: 
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Post-Scan Filters: 
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Output Data Schema: struct<_c0: string, _c1: string, _c2: string, _c3: string, _c4: string ... 802 more fields>
18/08/08 00:14:22 INFO execution.FileSourceScanExec: Pushed Filters: 
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Pruning directories with: 
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Post-Scan Filters: 
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Output Data Schema: struct<_c0: string, _c1: string, _c2: string, _c3: string, _c4: string ... 802 more fields>
18/08/08 00:14:22 INFO execution.FileSourceScanExec: Pushed Filters: 
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Pruning directories with: 
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Post-Scan Filters: 
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Output Data Schema: struct<_c0: string, _c1: string, _c2: string, _c3: string, _c4: string ... 802 more fields>
18/08/08 00:14:22 INFO execution.FileSourceScanExec: Pushed Filters: 
18/08/08 00:14:22 INFO datasources.FileSourceStrategy: Pruning directories with: 

The DataFrames are quite small: about 5,000 records and 800 columns. I used the following code:

import org.apache.spark.sql.functions.{col, collect_list, struct}

val parentDF = ...
val childADF = ...
val childBDF = ...

val aggregatedAColName = "CHILD_A"
val aggregatedBColName = "CHILD_B"

val columns = List("key_col_0", "key_col_1", "key_col_2", "key_col_3", "val_0")
val keyColumns = List("key_col_0", "key_col_1", "key_col_2", "key_col_3")

// Nest each child A row into a struct, collect the structs into a list per key,
// then left-join the aggregate onto the parent.
val nestedAColumns = keyColumns.map(x => col(x)) :+ struct(columns.map(col): _*).alias(aggregatedAColName)
val childADataFrame = childADF
  .select(nestedAColumns: _*)
  .repartition(keyColumns.map(col): _*)
  .groupBy(keyColumns.map(col): _*)
  .agg(collect_list(aggregatedAColName).alias(aggregatedAColName))
val joinedWithA = parentDF.join(childADataFrame, keyColumns, "left")

// Same aggregation for child B, joined onto the result of the first join.
val nestedBColumns = keyColumns.map(x => col(x)) :+ struct(columns.map(col): _*).alias(aggregatedBColName)
val childBDataFrame = childBDF
  .select(nestedBColumns: _*)
  .repartition(keyColumns.map(col): _*)
  .groupBy(keyColumns.map(col): _*)
  .agg(collect_list(aggregatedBColName).alias(aggregatedBColName))
val joinedWithB = joinedWithA.join(childBDataFrame, keyColumns, "left")

The processing time for 30 files (about 85k records in total) is surprisingly high: roughly 38 minutes. Have you seen a similar problem?
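
For debugging the gap, the full query plan over the 800-column schema can be made visible in the logs by raising the limit named in the WARN line above. A minimal sketch, assuming a Spark 2.x session and a hypothetical app name:

import org.apache.spark.sql.SparkSession

// Raise the limit from the WARN message (default is 25 fields) so wide plans
// are logged in full. The key must be set before the SparkContext is created,
// because it is read from the underlying Spark conf.
val spark = SparkSession.builder()
  .appName("spark-sql-freeze-debug") // hypothetical app name
  .config("spark.debug.maxToStringFields", "1000")
  .getOrCreate()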


1 Answer

Stack Overflow user

Answered 2018-08-10 21:59:17

Try to avoid calling repartition(); it causes unnecessary movement of data between the nodes.

According to Learning Spark:

Keep in mind that repartitioning your data is a fairly expensive operation. Spark also has an optimized version of repartition() called coalesce() that allows avoiding data movement, but only if you are decreasing the number of RDD partitions.

Simply put, coalesce() only reduces the number of partitions; it does not shuffle the data, it merely compacts existing partitions.
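
To make the difference concrete, here is a minimal sketch with a hypothetical DataFrame df: repartition(n) triggers a full shuffle and can grow or shrink the partition count, while coalesce(n) merges existing partitions on the same executors and can only shrink it.

import org.apache.spark.sql.DataFrame

// df is a hypothetical DataFrame that starts with, say, 200 partitions.
def compactPartitions(df: DataFrame): Unit = {
  // repartition: full shuffle; every row may move to another node.
  val reshuffled = df.repartition(50)
  println(reshuffled.rdd.getNumPartitions) // 50, paid for with a shuffle

  // coalesce: no shuffle; existing partitions are merged in place,
  // so it can only decrease the partition count.
  val compacted = df.coalesce(50)
  println(compacted.rdd.getNumPartitions) // 50, without cross-node movement
}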

Score: -1
Original content provided by Stack Overflow.
Source: https://stackoverflow.com/questions/51787802