I'm new to Hadoop. My MapReduce code runs, but it doesn't produce any output. Here is the job's output:
16/09/20 13:11:40 INFO mapred.JobClient: Job complete: job_201609081210_0078
16/09/20 13:11:40 INFO mapred.JobClient: Counters: 28
16/09/20 13:11:40 INFO mapred.JobClient: Map-Reduce Framework
16/09/20 13:11:40 INFO mapred.JobClient: Spilled Records=0
16/09/20 13:11:40 INFO mapred.JobClient: Map output materialized bytes=1362
16/09/20 13:11:40 INFO mapred.JobClient: Reduce input records=0
16/09/20 13:11:40 INFO mapred.JobClient: Virtual memory (bytes) snapshot=466248720384
16/09/20 13:11:40 INFO mapred.JobClient: Map input records=852032443
16/09/20 13:11:40 INFO mapred.JobClient: SPLIT_RAW_BYTES=29964
16/09/20 13:11:40 INFO mapred.JobClient: Map output bytes=0
16/09/20 13:11:40 INFO mapred.JobClient: Reduce shuffle bytes=1362
16/09/20 13:11:40 INFO mapred.JobClient: Physical memory (bytes) snapshot=57472311296
16/09/20 13:11:40 INFO mapred.JobClient: Reduce input groups=0
16/09/20 13:11:40 INFO mapred.JobClient: Combine output records=0
16/09/20 13:11:40 INFO mapred.JobClient: Reduce output records=0
16/09/20 13:11:40 INFO mapred.JobClient: Map output records=0
16/09/20 13:11:40 INFO mapred.JobClient: Combine input records=0
16/09/20 13:11:40 INFO mapred.JobClient: CPU time spent (ms)=2375210
16/09/20 13:11:40 INFO mapred.JobClient: Total committed heap usage (bytes)=47554494464
16/09/20 13:11:40 INFO mapred.JobClient: File Input Format Counters
16/09/20 13:11:40 INFO mapred.JobClient: Bytes Read=15163097088
16/09/20 13:11:40 INFO mapred.JobClient: FileSystemCounters
16/09/20 13:11:40 INFO mapred.JobClient: HDFS_BYTES_READ=15163127052
16/09/20 13:11:40 INFO mapred.JobClient: FILE_BYTES_WRITTEN=13170190
16/09/20 13:11:40 INFO mapred.JobClient: FILE_BYTES_READ=6
16/09/20 13:11:40 INFO mapred.JobClient: Job Counters
16/09/20 13:11:40 INFO mapred.JobClient: Launched map tasks=227
16/09/20 13:11:40 INFO mapred.JobClient: Launched reduce tasks=1
16/09/20 13:11:40 INFO mapred.JobClient: SLOTS_MILLIS_REDUCES=759045
16/09/20 13:11:40 INFO mapred.JobClient: Total time spent by all reduces waiting after reserving slots (ms)=0
16/09/20 13:11:40 INFO mapred.JobClient: SLOTS_MILLIS_MAPS=1613259
16/09/20 13:11:40 INFO mapred.JobClient: Total time spent by all maps waiting after reserving slots (ms)=0
16/09/20 13:11:40 INFO mapred.JobClient: Data-local map tasks=227
16/09/20 13:11:40 INFO mapred.JobClient: File Output Format Counters
16/09/20 13:11:40 INFO mapred.JobClient: Bytes Written=0
Here is the code that launches the MapReduce job:
import java.io.File;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class mp{
public static void main(String[] args) throws Exception {
Job job1 = new Job();
job1.setJarByClass(mp.class);
FileInputFormat.addInputPath(job1, new Path(args[0]));
String oFolder = args[0] + "/output";
FileOutputFormat.setOutputPath(job1, new Path(oFolder));
job1.setMapperClass(TransMapper1.class);
job1.setReducerClass(TransReducer1.class);
job1.setMapOutputKeyClass(LongWritable.class);
job1.setMapOutputValueClass(DnaWritable.class);
job1.setOutputKeyClass(LongWritable.class);
job1.setOutputValueClass(Text.class);
}
}
Here is the mapper class (TransMapper1):
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class TransMapper1 extends Mapper<LongWritable, Text, LongWritable, DnaWritable> {
@Override
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
StringTokenizer tokenizer = new StringTokenizer(line);
LongWritable bamWindow = new LongWritable(Long.parseLong(tokenizer.nextToken()));
LongWritable read = new LongWritable(Long.parseLong(tokenizer.nextToken()));
LongWritable refWindow = new LongWritable(Long.parseLong(tokenizer.nextToken()));
IntWritable chr = new IntWritable(Integer.parseInt(tokenizer.nextToken()));
DoubleWritable dist = new DoubleWritable(Double.parseDouble(tokenizer.nextToken()));
DnaWritable dnaW = new DnaWritable(bamWindow,read,refWindow,chr,dist);
context.write(bamWindow,dnaW);
}
}
Here is the reducer class (TransReducer1):
import java.io.IOException;
import java.util.ArrayList;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class TransReducer1 extends Reducer<LongWritable, DnaWritable, LongWritable, Text> {
@Override
public void reduce(LongWritable key, Iterable<DnaWritable> values, Context context) throws IOException, InterruptedException {
ArrayList<DnaWritable> list = new ArrayList<DnaWritable>();
double minDist = Double.MAX_VALUE;
for (DnaWritable value : values) {
long bamWindow = value.getBamWindow().get();
long read = value.getRead().get();
long refWindow = value.getRefWindow().get();
int chr = value.getChr().get();
double dist = value.getDist().get();
if (dist > minDist)
continue;
else
if (dist < minDist)
list.clear();
list.add(new DnaWritable(bamWindow,read,refWindow,chr,dist));
minDist = Math.min(minDist, value.getDist().get());
}
for(int i = 0; i < list.size(); i++){
context.write(new LongWritable(list.get(i).getRead().get()),new Text(new DnaWritable(list.get(i).getBamWindow(),list.get(i).getRead(),list.get(i).getRefWindow(),list.get(i).getChr(),list.get(i).getDist()).toString()));
}
}
}
Here is the DnaWritable class (I left out the imports to keep it short):
public class DnaWritable implements Writable {
LongWritable bamWindow;
LongWritable read;
LongWritable refWindow;
IntWritable chr;
DoubleWritable dist;
public DnaWritable(LongWritable bamWindow, LongWritable read, LongWritable refWindow, IntWritable chr, DoubleWritable dist){
this.bamWindow = bamWindow;
this.read = read;
this.refWindow = refWindow;
this.chr = chr;
this.dist = dist;
}
public DnaWritable(long bamWindow, long read, long refWindow, int chr, double dist){
this.bamWindow = new LongWritable(bamWindow);
this.read = new LongWritable(read);
this.refWindow = new LongWritable(refWindow);
this.chr = new IntWritable(chr);
this.dist = new DoubleWritable(dist);
}
@Override
public void write(DataOutput dataOutput) throws IOException {
bamWindow.write(dataOutput);
read.write(dataOutput);
refWindow.write(dataOutput);
chr.write(dataOutput);
dist.write(dataOutput);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
bamWindow.readFields(dataInput);
read.readFields(dataInput);
refWindow.readFields(dataInput);
chr.readFields(dataInput);
dist.readFields(dataInput);
}
}
Any help would be greatly appreciated. Thanks.
Posted on 2016-09-20 18:21:32
Could you change your DnaWritable class to the following and run the same test? (This version avoids the NPE.)
package com.hadoop.intellipaat;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class DnaWritable implements Writable {
private Long bamWindow;
private Long read;
private Long refWindow;
private Integer chr;
private Double dist;
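// Hadoop creates Writable instances by reflection before calling readFields,
// so a public no-argument constructor is required
public DnaWritable() {
}
public DnaWritable(Long bamWindow, Long read, Long refWindow, Integer chr, Double dist) {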
super();
this.bamWindow = bamWindow;
this.read = read;
this.refWindow = refWindow;
this.chr = chr;
this.dist = dist;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(bamWindow);
out.writeLong(read);
out.writeLong(refWindow);
out.writeInt(chr);
out.writeDouble(dist);
}
@Override
public void readFields(DataInput in) throws IOException {
this.bamWindow = in.readLong();
this.read = in.readLong();
this.refWindow = in.readLong();
this.chr = in.readInt();
this.dist = in.readDouble();
}
}
Posted on 2016-09-21 07:25:12
I don't think you are submitting your job to the cluster at all. Your main class never calls job1.submit() or job1.waitForCompletion(true).
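// submit the job to Hadoop and wait for it to finish
if (!job1.waitForCompletion(true))
return;
Your main method also needs a correction:
Job job1 = new Job(); // the new Job() constructor is deprecated
Here is the correct way to create the Job object:
// requires: import org.apache.hadoop.conf.Configuration;
Configuration conf = new Configuration();
Job job1 = Job.getInstance(conf, "Your Program name");
Posted on 2016-09-20 17:54:29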
I think you have not implemented the write(DataOutput out) and readFields(DataInput in) methods correctly in your DnaWritable class.
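A Writable has to meet two requirements. First, readFields must consume exactly what write produced, with the same types in the same order. Second, Hadoop instantiates key/value classes by reflection before it calls readFields, so the class needs a public no-argument constructor. The sketch below illustrates both points in plain Java, with no Hadoop dependency. `DnaRecord` and `SerializationDemo` are hypothetical names. `DnaRecord` mirrors DnaWritable's five fields and its write/readFields pair, but it uses primitive fields and does not implement `org.apache.hadoop.io.Writable`. `deserialize` only approximates what the framework does on the reduce side.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical plain-Java stand-in for DnaWritable (not a real Hadoop Writable).
class DnaRecord {
    long bamWindow;
    long read;
    long refWindow;
    int chr;
    double dist;

    // The object is created reflectively first and filled in by readFields,
    // so this no-arg constructor must exist.
    public DnaRecord() {
    }

    public DnaRecord(long bamWindow, long read, long refWindow, int chr, double dist) {
        this.bamWindow = bamWindow;
        this.read = read;
        this.refWindow = refWindow;
        this.chr = chr;
        this.dist = dist;
    }

    public void write(DataOutput out) throws IOException {
        out.writeLong(bamWindow);
        out.writeLong(read);
        out.writeLong(refWindow);
        out.writeInt(chr);
        out.writeDouble(dist);
    }

    // Reads the same types in the same order as write().
    public void readFields(DataInput in) throws IOException {
        bamWindow = in.readLong();
        read = in.readLong();
        refWindow = in.readLong();
        chr = in.readInt();
        dist = in.readDouble();
    }
}

class SerializationDemo {
    static byte[] serialize(DnaRecord record) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            record.write(out);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Instantiate through the no-arg constructor (NoSuchMethodException if it
    // is missing), then populate the fields with readFields.
    static DnaRecord deserialize(byte[] data) {
        try {
            DnaRecord copy = DnaRecord.class.getDeclaredConstructor().newInstance();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(data)));
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot instantiate DnaRecord", e);
        }
    }

    // A deliberately wrong reader that reads chr one field too early,
    // so it gets the high 4 bytes of refWindow instead.
    static int readChrTooEarly(byte[] data) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
            in.readLong(); // bamWindow
            in.readLong(); // read
            return in.readInt(); // refWindow should have been read first
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] data = serialize(new DnaRecord(10L, 42L, 7L, 3, 0.25));
        DnaRecord copy = deserialize(data);
        System.out.println(copy.read + " " + copy.chr + " " + copy.dist); // prints "42 3 0.25"
        System.out.println(readChrTooEarly(data)); // prints "0"
    }
}
```

The mismatched read throws no exception. It silently returns a wrong value, which is why an asymmetric write/readFields pair can be hard to spot.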
https://stackoverflow.com/questions/39600399