我的目录结构如下:
/数据/模型a
/数据/模式B
/数据/模型C。
这些文件中的每一个都有格式的数据(id,记分),我必须分别为它们做以下工作-
1)按分数分组并按降序排序(DF_1:得分,计数)
2)从DF_1中计算出每组分数的累积频率(DF_2: score,count,cumFreq)
3)从DF_2中选择5-10之间的累积频率(DF_3: score,cumFreq)
4)从DF_3中选择最小得分(DF_4: score)
5)从文件中选择在DF_4中得分大于得分的所有id并保存
我能够这样做:将目录读取为wholeTextFile并为所有模型创建一个通用的数据格式,然后使用group模型。
我想做的 -
val scores_file = sc.wholeTextFiles("/data/*/")
val scores = scores_file.map{ line =>
//step 1
//step 2
//step 3
//step 4
//step 5 : save as line._1
} 这将有助于单独处理每个文件,并避免按组处理。
发布于 2017-07-17 09:25:15
假设您的模型是离散的值,并且您知道,那么您可以将所有模型定义为一个列表。
val model = List("modelA", "modelB", "modelC", ... )您可以采用以下方法:
model.forEach( model => {
val scoresPerModel = sc.textFile(model);
scoresPerModel.map { line =>
// business logic here
}
})如果您在计算业务逻辑之前不知道模型,则必须使用Hadoop文件系统API读取该模型并从中提取模型。
private val fs = {
val conf = new org.apache.hadoop.conf.Configuration()
FileSystem.get(conf)
}
fs.listFiles(new Path(hdfsPath)) https://stackoverflow.com/questions/45139913
复制相似问题