I am trying a simple sorting example with TotalOrderPartitioner. The input is a sequence file with IntWritable keys and NullWritable values, and I want to sort by key. The output is also a sequence file with IntWritable keys and NullWritable values. I am running this job on a cluster. Here is my driver class:
public class SortDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = this.getConf();
        Job job = Job.getInstance(conf);
        job.setJobName("SORT-WITH-TOTAL-ORDER-PARTITIONER");
        job.setJarByClass(SortDriver.class);

        job.setInputFormatClass(SequenceFileInputFormat.class);
        SequenceFileInputFormat.setInputPaths(job, new Path("/user/client/seq-input"));

        job.setMapOutputKeyClass(IntWritable.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setMapperClass(SortMapper.class);
        job.setReducerClass(SortReducer.class);

        job.setPartitionerClass(TotalOrderPartitioner.class);
        TotalOrderPartitioner.setPartitionFile(conf, new Path("/user/client/partition.lst"));

        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setCompressOutput(job, true);
        SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);
        SequenceFileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);
        SequenceFileOutputFormat.setOutputPath(job, new Path("/user/client/sorted-output"));

        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(NullWritable.class);

        job.setNumReduceTasks(3);

        InputSampler.Sampler<IntWritable, NullWritable> sampler =
                new InputSampler.RandomSampler<>(0.1, 200);
        InputSampler.writePartitionFile(job, sampler);

        boolean res = job.waitForCompletion(true);
        return res ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new Configuration(), new SortDriver(), args));
    }
}

The Mapper class:
public class SortMapper extends Mapper<IntWritable, NullWritable, IntWritable, NullWritable> {

    @Override
    protected void map(IntWritable key, NullWritable value, Context context)
            throws IOException, InterruptedException {
        context.write(key, value);
    }
}

The Reducer class:
public class SortReducer extends Reducer<IntWritable, NullWritable, IntWritable, NullWritable> {

    @Override
    protected void reduce(IntWritable key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        context.write(key, NullWritable.get());
    }
}

When I run this job, I get:
Error: java.lang.IllegalArgumentException: Can't read partitions file
at org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner.setConf(TotalOrderPartitioner.java:116)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:678)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:747)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:340)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:167)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1557)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Caused by: java.io.FileNotFoundException: File file:/grid/hadoop/yarn/local/usercache/client/appcache/application_1406784047304_0002/container_1406784047304_0002_01_000003/_partition.lst does not exist
at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:511)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:724)
at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:501)
at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:397)
at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1749)
at org.apache.hadoop.io.SequenceFile$Reader.<init>(SequenceFile.java:1773)
at org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner.readPartitions(TotalOrderPartitioner.java:301)
at org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner.setConf(TotalOrderPartitioner.java:88)
	... 10 more

I found a partition file named _partition.lst in my home directory (/user/client). Its name does not match the one set in the code: TotalOrderPartitioner.setPartitionFile(conf, new Path("/user/client/partition.lst"));. Can anyone help me with this problem? I am using Hadoop 2.4 on the HDP 2.1 distribution.
Posted on 2014-07-31 12:45:05
I think the problem is right here:

TotalOrderPartitioner.setPartitionFile(conf, new Path("/user/client/partition.lst"));

You have to replace it with:

TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path("/user/client/partition.lst"));

because you are using:

InputSampler.writePartitionFile(job, sampler);

Otherwise, try replacing the last line with:

InputSampler.writePartitionFile(conf, sampler);

but I'm not sure it works like that with the new API.

Hope that helps! Good luck!
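To make the reason concrete: Job.getInstance(conf) takes a copy of the Configuration, so setting the partition file on the original conf afterwards never reaches the tasks, and the partitioner falls back to the default _partition.lst in the task working directory (exactly the file named in the FileNotFoundException). A minimal sketch of the corrected portion of run(), assuming the same paths as in the question:

```java
// Inside run(): the Job holds its own copy of conf, so all settings
// that the tasks must see have to go through job.getConfiguration().
Job job = Job.getInstance(conf);
// ... input/output, mapper, reducer and compression setup as before ...

job.setPartitionerClass(TotalOrderPartitioner.class);
// Set the partition file on the JOB's configuration, not on conf,
// so the map tasks know where to load it from.
TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),
        new Path("/user/client/partition.lst"));

job.setNumReduceTasks(3);

// writePartitionFile(job, sampler) reads the target path back out of
// job.getConfiguration(); with the line above it now writes to
// /user/client/partition.lst instead of the default _partition.lst.
InputSampler.Sampler<IntWritable, NullWritable> sampler =
        new InputSampler.RandomSampler<>(0.1, 200);
InputSampler.writePartitionFile(job, sampler);
```

This snippet only reorders calls from the question's own driver; it needs the same Hadoop imports and a running cluster, so it is illustrative rather than standalone-runnable.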
Posted on 2015-01-14 08:14:45
I also ran into this error when the MapReduce service had not yet been installed and started. After installing MapReduce and starting it, the exception went away.
Posted on 2015-03-24 00:41:46
When I got this error, I had job.setNumReduceTasks(3); and was running my code in standalone mode.
Changing it to job.setNumReduceTasks(1) made it work fine in standalone mode.
https://stackoverflow.com/questions/25051671