
How do we save a huge PySpark DataFrame?

Stack Overflow user
Asked on 2019-07-23 10:24:12
1 answer · 4.4K views · 0 followers · 7 votes

I have a big pyspark DataFrame that I want to save into myfile (a .tsv file) for future use. To do so, I defined the following code:

import csv

with open(myfile, "a") as csv_file:
    writer = csv.writer(csv_file, delimiter='\t')
    # header row
    writer.writerow(["vertex", "id_source", "id_target", "similarity"])

    for part_id in range(joinDesrdd_df.rdd.getNumPartitions()):
        # keep only the rows of the current partition, then pull them to the driver
        part_rdd = joinDesrdd_df.rdd.mapPartitionsWithIndex(make_part_filter(part_id), True)
        data_from_part_rdd = part_rdd.collect()
        vertex_list = set()

        for row in data_from_part_rdd:
            writer.writerow([....])

    csv_file.flush()
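The make_part_filter helper is not shown in the question; a minimal sketch of what such a function presumably looks like (an assumption, not the asker's actual code) is:

def make_part_filter(index_to_keep):
    # Hypothetical helper: builds a function for mapPartitionsWithIndex that
    # yields rows only from the partition with the requested index.
    def part_filter(split_index, iterator):
        if split_index == index_to_keep:
            for row in iterator:
                yield row
    return part_filter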

My code does not get past this step; it raises an exception:

1. In the worker logs:
19/07/22 08:58:57 INFO Worker: Executor app-20190722085320-0000/2 finished with state KILLED exitStatus 143
14: 19/07/22 08:58:57 INFO ExternalShuffleBlockResolver: Application app-20190722085320-0000 removed, cleanupLocalDirs = true
14: 19/07/22 08:58:57 INFO Worker: Cleaning up local directories for application app-20190722085320-0000
 5: 19/07/22 08:58:57 INFO Worker: Executor app-20190722085320-0000/1 finished with state KILLED exitStatus 143
 7: 19/07/22 08:58:57 INFO Worker: Executor app-20190722085320-0000/14 finished with state KILLED exitStatus 143
...

2. In the job execution log:

Traceback (most recent call last):
  File "/project/6008168/tamouze/RWLastVersion2207/module1.py", line 306, in <module>
    for part_id in range(joinDesrdd_df.rdd.getNumPartitions()):
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 88, in rdd
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/cvmfs/soft.computecanada.ca/easybuild/software/2017/Core/spark/2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o528.javaToPython.
: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(id_source#263, id_target#292, similarity#258, 1024)
+- *(11) HashAggregate(keys=[id_source#263, id_target#292, similarity#258], functions=[], output=[id_source#263, id_target#292, similarity#258])

I don't know why this code raises an exception. Note that on small data the execution is fine, but on large data it is not.

Also, what is the best way to save a pyspark dataframe for future use?

Update: I tried replacing the code above with the following:

from pyspark.sql.functions import col

joinDesrdd_df.withColumn("par_id", col('id_source') % 50) \
    .repartition(50, 'par_id') \
    .write.format('parquet') \
    .partitionBy("par_id") \
    .save("/project/6008168/bib/RWLastVersion2207/randomWalkParquet/candidate.parquet")

It also produces a similar exception:

19/07/22 21:10:18 INFO TaskSetManager: Finished task 653.0 in stage 11.0 (TID 2257) in 216940 ms on 172.16.140.237 (executor 14) (1017/1024)
19/07/22 21:11:32 ERROR FileFormatWriter: Aborting job null.
org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(par_id#328, 50)
+- *(12) HashAggregate(keys=[id_source#263, id_target#292, similarity#258], functions=[], output=[id_source#263, id_target#292, similarity#258, par_id#328])
   +- Exchange hashpartitioning(id_source#263, id_target#292, similarity#258, 1024)
      +- *(11) HashAggregate(keys=[id_source#263, id_target#292, similarity#258], functions=[], output=[id_source#263, id_target#292, similarity#258])
         +- *(11) Project [id_source#263, id_target#292, similarity#258]
            +- *(11) BroadcastHashJoin [instance_target#65], [instance#291], Inner, BuildRight

1 Answer

Stack Overflow user

Answered on 2019-07-23 10:33:15

I suggest using Spark's native write functionality:

joinDesrdd_df.write.format('csv').option("header", "true").save("path/to/the/output/csv/folder")

Spark saves each partition of the dataframe as a separate CSV file into the specified path. You can control the number of files with the repartition method, which in turn gives you some control over how much data each file contains.
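For example, the following sketch writes the result as a fixed number of tab-separated files (the tab separator matches the .tsv goal from the question; the partition count of 10 is only illustrative):

# Reduce the number of partitions so the output folder contains 10 part files,
# each written as a tab-separated CSV with a header row.
joinDesrdd_df.repartition(10) \
    .write.format('csv') \
    .option("header", "true") \
    .option("sep", "\t") \
    .mode("overwrite") \
    .save("path/to/the/output/csv/folder")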

I would also suggest using the ORC or Parquet format for big datasets, since they are definitely better suited for storing large amounts of data.

For example, with Parquet:

from pyspark.sql.functions import col

joinDesrdd_df.withColumn("par_id", col('id_source') % 50). \
 repartition(50, 'par_id').write.format('parquet'). \
 save("/project/6008168/bib/RWLastVersion2207/randomWalkParquet/candidate.parquet")

To read it back into a dataframe:

df = spark.read. \
 parquet("/project/6008168/bib/RWLastVersion2207/randomWalkParquet/candidate.parquet")
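An ORC version follows the same pattern (a sketch; only the format changes, and the output path here is illustrative):

# Writing and reading ORC works the same way as Parquet; both are columnar,
# compressed formats that handle large datasets well.
joinDesrdd_df.write.format('orc') \
    .save("/project/6008168/bib/RWLastVersion2207/randomWalkParquet/candidate.orc")

df = spark.read.orc("/project/6008168/bib/RWLastVersion2207/randomWalkParquet/candidate.orc")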
5 votes
Original content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/57155855
