我需要处理流到S3文件夹中的xml文件。目前,我已经实现了如下内容。
首先,使用Spark的fileStream读取文件
val data = ssc.fileStream[LongWritable, Text, TextInputFormat]("s3://myfolder/",(t: org.apache.hadoop.fs.Path) => true, newFilesOnly = true, hadoopConf).map(_._2.toString())
对于每个RDD,检查是否已读取任何文件
if (data.count() !=0)将字符串写入新的HDFS目录
data.coalesce(1).saveAsTextFile(sdir);从上述HDFS目录创建Dataframe读取
val loaddata = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "Trans").load(sdir)对Dataframe进行一些处理并另存为JSON
loaddata.write.mode("append").json("s3://mybucket/somefolder")不知何故,我觉得上面的方法是非常低效的,坦率地说,很像学生时代的孩子。有没有更好的解决方案?任何帮助都将不胜感激。
接下来的问题:如何操作数据帧中的字段(而不是列)?我有一个非常复杂的嵌套xml,当我使用上面描述的方法时,我得到了一个包含9列和50多个内部Struct数组的Dataframe。这是很好的,除了需要修剪某些字段名称。有没有一种方法可以在不爆炸数据帧的情况下实现这一点,因为我需要再次构造相同的结构?
发布于 2016-11-18 23:41:45
如果你使用Spark 2.0,你也许能够让它与结构化流媒体一起工作:
val inputDF = spark.readStream.format("com.databricks.spark.xml")
.option("rowTag", "Trans")
.load(path)https://stackoverflow.com/questions/40679817
复制相似问题