如何读取HDFS上某个目录中的所有文件并使用scalding进行处理。对于本地文件系统,我使用以下代码
import com.twitter.scalding._
import com.twitter.scalding.JsonLine
import java.io._
class ParseJsonJob(args: Args) extends Job(args) {
val fileList = new File(args("input")).listFiles
val fields = ('device_guid
,'service_name
,'event_type
)
fileList.map {
fileName =>
JsonLine(fileName.toString, fields)
.read
.filter ('service_name) { name: String => name == "myservice" }
.write(Tsv(args("output") + fileName.toString.split("/").last))
}
}这不适用于HDFS。除了文件之外,TextLine或JsonLine还会读取目录吗?
发布于 2015-02-23 01:00:04
您将获得一个Hadoop并使用FileSystem.liststatus原语来扫描HDFS目录,如下所示:
...
val hadoopConf= implicitly[Mode] match {
case Hdfs(_, conf) => conf
}
val fs= FileSystem.get(hadoopConf)
for(status <- fs.listStatus(new Path(args("input")))) {
JsonLine(status.getPath.toString.toString, fields)
.read
.filter ('service_name) { name: String => name == "myservice" }
.write(Tsv(args("output") + fileName.toString.split("/").last))
}发布于 2015-03-14 04:17:27
导入com.twitter.scalding._
导入com.twitter.scalding.JsonLine导入java.io._
类字段(args: Args)扩展作业(Args){ val ParseJsonJob= ('device_guid,'service_name,'event_type)
JsonLine(args("input"),.read ).read .filter ('service_name) { name: String => name == "myservice“} .write(Tsv(args("output") )}}
这对你来说是可行的。如果不是,让我知道。
https://stackoverflow.com/questions/28620732
复制相似问题