以下是场景
Reducer1
/
Mapper - - Reducer2
\
ReducerN 在reducer中,我想将数据写到不同的文件中,假设reducer看起来像这样
def reduce():
for line in sys.STDIN:
if(line == type1):
create_type_1_file(line)
if(line == type2):
create_type_2_file(line)
if(line == type3):
create_type3_file(line)
... and so on
def create_type_1_file(line):
# writes to file1
def create_type2_file(line):
# writes to file2
def create_type_3_file(line):
# write to file 3 考虑如下所示的路径:
file1 = /home/user/data/file1
file2 = /home/user/data/file2
file3 = /home/user/data/file3 当我在pseudo-distributed mode(machine with one node and hdfs daemons running)中运行时,情况很好,因为所有的守护进程都将写入同一组文件
问:-如果我在1000台机器的集群中运行它,它们会写入相同的文件集吗?在这种情况下,我是writing to local filesystem,有没有更好的方法在hadoop streaming中执行此操作
发布于 2011-10-11 09:38:31
通常,reduce的o/p被写入像HDFS这样的可靠存储系统,因为如果节点中的一个发生故障,那么与该节点相关联的reduce数据就会丢失。不可能在Hadoop框架上下文之外再次运行该特定reduce任务。此外,一旦作业完成,来自1000个节点的o/p必须针对不同的输入类型进行合并。
在HDFS中,并发写入是not supported。在HDFS中,可能存在多个reducers正在写入同一文件的情况,这可能会损坏该文件。当多个reduce任务在单个节点上运行时,在写入单个本地文件时,并发性也可能是一个问题。
其中一种解决方案是使用reduce task specific file name,然后组合特定输入类型的所有文件。
发布于 2016-07-06 15:58:12
可以使用MultipleOutputs将减速器的输出写入多个位置。class.You可以将file1、file2和file3视为三个文件夹,并将1000个减速器的输出数据分别写入这些文件夹。
作业提交的使用模式:
Job job = new Job();
FileInputFormat.setInputPath(job, inDir);
//outDir is the root path, in this case, outDir="/home/user/data/"
FileOutputFormat.setOutputPath(job, outDir);
//You have to assign the output formatclass.Using MultipleOutputs in this way will still create zero-sized default output, eg part-00000. To prevent this use LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); instead of job.setOutputFormatClass(TextOutputFormat.class); in your Hadoop job configuration.
LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setMapperClass(MOMap.class);
job.setReducerClass(MOReduce.class);
...
job.waitForCompletion(true);Reducer中的用法:
private MultipleOutputs out;
public void setup(Context context) {
out = new MultipleOutputs(context);
...
}
public void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException {
//'/' characters in baseOutputPath will be translated into directory levels in your file system. Also, append your custom-generated path with "part" or similar, otherwise your output will be -00000, -00001 etc. No call to context.write() is necessary.
for (Text line : values) {
if(line == type1)
out.write(key, new Text(line),"file1/part");
else if(line == type2)
out.write(key, new Text(line),"file2/part");
else if(line == type3)
out.write(key, new Text(line),"file3/part");
}
}
protected void cleanup(Context context) throws IOException, InterruptedException {
out.close();
}https://stackoverflow.com/questions/7715865
复制相似问题