我有一个简单的mapper,减速机和组合器的地图还原代码。映射器的输出被传递给组合器。但是给减速器,而不是来自组合器的输出,来自映射器的输出被传递。
请帮帮忙
代码:
package Combiner;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class AverageSalary
{
public static class Map extends Mapper<LongWritable, Text, Text, DoubleWritable>
{
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException
{
String[] empDetails= value.toString().split(",");
Text unit_key = new Text(empDetails[1]);
DoubleWritable salary_value = new DoubleWritable(Double.parseDouble(empDetails[2]));
context.write(unit_key,salary_value);
}
}
public static class Combiner extends Reducer<Text,DoubleWritable, Text,Text>
{
public void reduce(final Text key, final Iterable<DoubleWritable> values, final Context context)
{
String val;
double sum=0;
int len=0;
while (values.iterator().hasNext())
{
sum+=values.iterator().next().get();
len++;
}
val=String.valueOf(sum)+":"+String.valueOf(len);
try {
context.write(key,new Text(val));
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static class Reduce extends Reducer<Text,Text, Text,Text>
{
public void reduce (final Text key, final Text values, final Context context)
{
//String[] sumDetails=values.toString().split(":");
//double average;
//average=Double.parseDouble(sumDetails[0]);
try {
context.write(key,values);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public static void main(String args[])
{
Configuration conf = new Configuration();
try
{
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length != 2) {
System.err.println("Usage: Main <in> <out>");
System.exit(-1); }
Job job = new Job(conf, "Average salary");
//job.setInputFormatClass(KeyValueTextInputFormat.class);
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
job.setJarByClass(AverageSalary.class);
job.setMapperClass(Map.class);
job.setCombinerClass(Combiner.class);
job.setReducerClass(Reduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
System.exit(job.waitForCompletion(true) ? 0 : -1);
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}}
发布于 2013-11-26 10:16:52
似乎您忘记了组合器的重要性质:
键/值的输入类型和键/值的输出类型需要相同。
您不能接受一个Text/DoubleWritable并返回一个Text/Text。我建议您使用Text而不是DoubleWritable,并在Combiner中进行适当的解析。
发布于 2013-11-26 15:01:23
组合器的第一条规则是:不假定组合器将运行。将组合器仅视为优化。
不能保证Combiner会运行您的所有数据。在某些情况下,当数据不需要溢出到磁盘时,MapReduce将完全跳过使用Combiner。还请注意,Combiner可能在数据子集上运行多次!每次泄漏都会运行一次。
在你的情况下,你是在做这个错误的假设。您应该在Combiner和Reducer中进行求和。
此外,您还应该遵循@user987339的答案。组合器的输入和输出必须相同(文本、双->文本、双文本),并且需要与映射器的输出和减速器的输入相匹配。
发布于 2015-04-23 13:07:30
如果使用组合函数,则其形式与Reducer函数(并且是Reducer的实现)相同,但它的输出类型是中间键和值类型(K2和V2),因此它们可以为减少函数提供如下信息: map:(K1,V1)→list(K2,V2)组合:(K2,list( V2) ) V3 list(K2,V2) reduce:(K2,list(V2)) K3 list(K3,V3)通常是相同的,在这种情况下,K3与K2相同,V3和V2是一样的。
https://stackoverflow.com/questions/20212884
复制相似问题