首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用密钥作为文件名,将值作为内容,将文件保存在Spark PairRDD中?

如何使用密钥作为文件名,将值作为内容,将文件保存在Spark PairRDD中?
EN

Stack Overflow用户
提问于 2016-04-06 05:14:25
回答 1查看 1.5K关注 0票数 1

在Spark中,我使用sc.binaryFiles从s3下载了多个文件。生成的RDD将键作为文件名,值包含文件的内容。我已经解压了文件内容,csv对其进行了解析,并将其转换为数据帧。所以,现在我有了一个PairRDDString,DataFrame。我遇到的问题是,我想使用密钥作为文件名将文件保存到HDFS,并将该值保存为拼接文件,如果该文件已经存在,则覆盖该文件。这就是我到目前为止所得到的。

代码语言:javascript
复制
val files = sc.binaryFiles(lFiles.mkString(","), 250).mapValues(stream => sc.parallelize(readZipStream(new ZipInputStream(stream.open))))
val tables = files.mapValues(file => {
    val header = file.first.split(",")
    val schema = StructType(header.map(fieldName => StructField(fieldName, StringType, true)))
    val lines = file.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }.flatMap(x => x.split("\n"))
    val rowRDD = lines.map(x => Row.fromSeq(x.split(",")))
    sqlContext.createDataFrame(rowRDD, schema)
})

如果你有任何建议,请让我知道。我将不胜感激。

谢谢,本

EN

回答 1

Stack Overflow用户

发布于 2016-04-06 10:22:52

在spark中将文件保存到HDFS的方法与hadoop相同。所以你需要创建一个扩展MultipleTextOutputFormat的类,在自定义类中你可以定义输出文件名yourself.the示例如下:

代码语言:javascript
复制
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
    override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {
        "realtime-" + new SimpleDateFormat("yyyyMMddHHmm").format(new Date()) + "00-" + name
    }
}

调用代码如下:

代码语言:javascript
复制
RDD.rddToPairRDDFunctions(rdd.map { case (key, list) =>
    (NullWritable.get, key)
}).saveAsHadoopFile(input, classOf[NullWritable], classOf[String], classOf[RDDMultipleTextOutputFormat])
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/36437174

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档