在Spark中,我使用sc.binaryFiles从s3下载了多个文件。生成的RDD将键作为文件名,值包含文件的内容。我已经解压了文件内容,csv对其进行了解析,并将其转换为数据帧。所以,现在我有了一个PairRDDString,DataFrame。我遇到的问题是,我想使用密钥作为文件名将文件保存到HDFS,并将该值保存为拼接文件,如果该文件已经存在,则覆盖该文件。这就是我到目前为止所得到的。
val files = sc.binaryFiles(lFiles.mkString(","), 250).mapValues(stream => sc.parallelize(readZipStream(new ZipInputStream(stream.open))))
val tables = files.mapValues(file => {
val header = file.first.split(",")
val schema = StructType(header.map(fieldName => StructField(fieldName, StringType, true)))
val lines = file.mapPartitionsWithIndex { (idx, iter) => if (idx == 0) iter.drop(1) else iter }.flatMap(x => x.split("\n"))
val rowRDD = lines.map(x => Row.fromSeq(x.split(",")))
sqlContext.createDataFrame(rowRDD, schema)
})如果你有任何建议,请让我知道。我将不胜感激。
谢谢,本
发布于 2016-04-06 10:22:52
在spark中将文件保存到HDFS的方法与hadoop相同。所以你需要创建一个扩展MultipleTextOutputFormat的类,在自定义类中你可以定义输出文件名yourself.the示例如下:
class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = {
"realtime-" + new SimpleDateFormat("yyyyMMddHHmm").format(new Date()) + "00-" + name
}
}调用代码如下:
RDD.rddToPairRDDFunctions(rdd.map { case (key, list) =>
(NullWritable.get, key)
}).saveAsHadoopFile(input, classOf[NullWritable], classOf[String], classOf[RDDMultipleTextOutputFormat])https://stackoverflow.com/questions/36437174
复制相似问题