首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Spark:将DataFrame编写为压缩的JSON

Spark:将DataFrame编写为压缩的JSON
EN

Stack Overflow用户
提问于 2015-08-11 12:08:58
回答 3查看 20.1K关注 0票数 21

Apache Spark的DataFrameReader.json()可以自动处理JSONlines压缩文件,但是似乎没有办法让DataFrameWriter.json()编写压缩的JSONlines文件。额外的网络I/O在云中是非常昂贵的。

有没有办法绕过这个问题?

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2015-09-22 18:46:22

下面的解决方案使用pyspark,但我假设Scala中的代码应该是类似的。

第一个选项是在初始化SparkConf时设置以下内容:

代码语言:javascript
复制
conf = SparkConf()
conf.set("spark.hadoop.mapred.output.compress", "true")
conf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
conf.set("spark.hadoop.mapred.output.compression.type", "BLOCK")

使用上面的代码,您使用该sparkContext生成的任何文件都会使用gzip自动压缩。

第二个选项,如果只想在上下文中压缩选定的文件。假设"df“是你的数据帧,文件名是你的目的地:

代码语言:javascript
复制
df_rdd = self.df.toJSON() 
df_rdd.saveAsTextFile(filename,compressionCodecClass="org.apache.hadoop.io.compress.GzipCodec")
票数 15
EN

Stack Overflow用户

发布于 2017-06-28 16:22:11

使用Spark 2.X (可能在更早的时候,我没有测试)有一种更简单的方法来编写压缩的JSON,它不需要更改配置:

代码语言:javascript
复制
val df: DataFrame = ...
df.write.option("compression", "gzip").json("/foo/bar")

这也适用于CSV和Parquet,只需在设置压缩选项后使用.csv()和.parquet()而不是.json()来写入文件。

可能的编解码器有: none、bzip2、deflate、gzip、lz4和snappy。

票数 30
EN

Stack Overflow用户

发布于 2018-12-19 13:58:45

作为公认的答案,在SparkConf上设置压缩选项并不是一个好的做法。它改变了全局的行为,而不是在每个文件的基础上指示设置。事实是,explicit总是比implicit更好。也有一些情况下,用户不能轻松地操作上下文配置,例如spark-shell或设计为另一个子模块的代码。

正确的方式

从Spark1.4开始支持使用压缩来写入DataFrame。实现这一目标的几种方法:

代码语言:javascript
复制
df.write.json("filename.json", compression="gzip")

就这样!只需随心所欲地使用DataFrameWriter.json()即可。

魔力隐藏在代码pyspark/sql/readwriter.py

代码语言:javascript
复制
@since(1.4)
def json(self, path, mode=None, compression=None, dateFormat=None, timestampFormat=None):
    """Saves the content of the :class:`DataFrame` in JSON format
    (`JSON Lines text format or newline-delimited JSON <http://jsonlines.org/>`_) at the
    specified path.

    :param path: the path in any Hadoop supported file system
    :param mode: ...

    :param compression: compression codec to use when saving to file. This can be one of the
                        known case-insensitive shorten names (none, bzip2, gzip, lz4,
                        snappy and deflate).
    :param dateFormat: ...
    :param timestampFormat: ...

    >>> df.write.json(os.path.join(tempfile.mkdtemp(), 'data'))
    """
    self.mode(mode)
    self._set_opts(
        compression=compression, dateFormat=dateFormat, timestampFormat=timestampFormat)
    self._jwrite.json(path)

支持的压缩格式有bzip2、gzip、lz4、snappy和deflate,不区分大小写。

scala API应该是相同的。

另一个

代码语言:javascript
复制
df.write.options(compression="gzip").json("filename.json")

类似于上面的。可以支持更多选项作为关键字参数。从Spark 1.4开始可用。

第三

代码语言:javascript
复制
df.write.option("compression", "gzip").json("filename.json")

DataFrameWriter.option()是从Spark1.5开始添加的。一次只能添加一个参数。

票数 15
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/31933053

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档