
Composite key trouble in Hadoop

Stack Overflow user
Asked 2013-08-22 13:21:36

I am using Hadoop to analyze GSOD data (ftp://ftp.ncdc.noaa.gov/pub/data/gsod/). I chose five years for my experiments (2005-2009). I set up a small cluster and ran a simple MapReduce program that finds the maximum temperature for a year.

Now I have to write a new MR program that counts, for each station, the occurrences of each phenomenon across all those years.

The files I have to analyze have this structure:

```
STN--- ...  FRSHTT
722115      110001
722115      011001
722110      111000
722110      001000
722000      001000
```

The STN column is the station number, and FRSHTT stands for the phenomena: F - fog, R - rain or drizzle, S - snow or ice pellets, H - hail, T - thunder, O - tornado or funnel cloud.

A value of 1 means the phenomenon occurred on that day; 0 means it did not.
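For illustration, here is a minimal plain-Java sketch of that flag layout (the class and helper names are made up, not part of the GSOD spec): each character of the six-character FRSHTT field maps, in order, to one phenomenon letter.

```java
import java.util.ArrayList;
import java.util.List;

public class FrshttDemo {
    // Phenomenon letters, in the fixed FRSHTT column order.
    static final char[] LETTERS = {'F', 'R', 'S', 'H', 'T', 'O'};

    // Hypothetical helper: takes the 6-character FRSHTT field (e.g. "110001")
    // and returns the letters of the phenomena that occurred that day.
    static List<Character> parseFlags(String frshtt) {
        List<Character> present = new ArrayList<>();
        for (int i = 0; i < LETTERS.length; i++) {
            if (frshtt.charAt(i) == '1') {
                present.add(LETTERS[i]);
            }
        }
        return present;
    }

    public static void main(String[] args) {
        // "110001" -> fog, rain/drizzle, tornado/funnel cloud
        System.out.println(parseFlags("110001")); // prints [F, R, O]
    }
}
```

So the two 722115 rows in the sample above contribute F, R, O and R, S, O respectively, which is where the desired totals below come from.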

I need to produce results like this:

```
722115: F = 1, R = 2, S = 1, O = 2
722110: F = 1, R = 1, S = 2
722000: S = 1
```

I can run the MR program, but the results are wrong; it produces this output:

```
722115 F, 1
722115 R, 1
722115 R, 1
722115 S, 1
722115 O, 1
722115 O, 1
722110 F, 1
722110 R, 1
722110 S, 1
722110 S, 1
722000 S, 1
```

Here is the code I used:

Mapper.java

```java
public class Mapper extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, StationPhenomenun, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, org.apache.hadoop.mapreduce.Mapper.Context context) throws IOException, InterruptedException {
        String line = value.toString();
        // Every file starts with a field description line, so I ignore this line
        if (!line.startsWith("STN---")) {
            // First field of the line is the station code where the data was collected
            String station = line.substring(0, 6);
            String fog = line.substring(132, 133);
            String rainOrDrizzle = line.substring(133, 134);
            String snowOrIcePellets = line.substring(134, 135);
            String hail = line.substring(135, 136);
            String thunder = line.substring(136, 137);
            String tornadoOrFunnelCloud = line.substring(137, 138);

            if (fog.equals("1"))
                context.write(new StationPhenomenun(station, "F"), new IntWritable(1));
            if (rainOrDrizzle.equals("1"))
                context.write(new StationPhenomenun(station, "R"), new IntWritable(1));
            if (snowOrIcePellets.equals("1"))
                context.write(new StationPhenomenun(station, "S"), new IntWritable(1));
            if (hail.equals("1"))
                context.write(new StationPhenomenun(station, "H"), new IntWritable(1));
            if (thunder.equals("1"))
                context.write(new StationPhenomenun(station, "T"), new IntWritable(1));
            if (tornadoOrFunnelCloud.equals("1"))
                context.write(new StationPhenomenun(station, "O"), new IntWritable(1));
        }
    }
}
```

Reducer.java

```java
public class Reducer extends org.apache.hadoop.mapreduce.Reducer<StationPhenomenun, IntWritable, StationPhenomenun, IntWritable> {

    protected void reduce(StationPhenomenun key, Iterable<IntWritable> values, org.apache.hadoop.mapreduce.Reducer.Context context) throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable value : values) {
            count++;
        }

        String station = key.getStation().toString();
        String occurence = key.getPhenomenun().toString();

        StationPhenomenun textPair = new StationPhenomenun(station, occurence);
        context.write(textPair, new IntWritable(count));
    }
}
```

StationPhenomenun.java

```java
public class StationPhenomenun implements WritableComparable<StationPhenomenun> {
    private String station;
    private String phenomenun;

    public StationPhenomenun(String station, String phenomenun) {
        this.station = station;
        this.phenomenun = phenomenun;
    }

    public StationPhenomenun() {
    }

    public String getStation() {
        return station;
    }

    public String getPhenomenun() {
        return phenomenun;
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        station = in.readUTF();
        phenomenun = in.readUTF();
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(station);
        out.writeUTF(phenomenun);
    }

    @Override
    public int compareTo(StationPhenomenun t) {
        int cmp = this.station.compareTo(t.station);
        if (cmp != 0) {
            return cmp;
        }
        return this.phenomenun.compareTo(t.phenomenun);
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        final StationPhenomenun other = (StationPhenomenun) obj;
        if (this.station != other.station && (this.station == null || !this.station.equals(other.station))) {
            return false;
        }
        if (this.phenomenun != other.phenomenun && (this.phenomenun == null || !this.phenomenun.equals(other.phenomenun))) {
            return false;
        }
        return true;
    }

    @Override
    public int hashCode() {
        return this.station.hashCode() * 163 + this.phenomenun.hashCode();
    }
}
```

NcdcJob.java

```java
public class NcdcJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setJarByClass(NcdcJob.class);
        FileInputFormat.addInputPath(job, new Path("/user/hadoop/input"));
        FileOutputFormat.setOutputPath(job, new Path("/user/hadoop/station"));
        job.setMapperClass(Mapper.class);
        job.setReducerClass(Reducer.class);
        job.setMapOutputKeyClass(StationPhenomenun.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(StationPhenomenun.class);
        job.setOutputValueClass(IntWritable.class);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```

Has anyone done something similar?

P.S.: I tried this solution (Hadoop composite key), but it did not work for me.


1 Answer

Stack Overflow user

Accepted answer

Answered 2013-08-23 16:38:46

Just check whether the following two classes actually refer to your custom implementations:

```java
job.setMapperClass(Mapper.class);
job.setReducerClass(Reducer.class);
```

With the following changes I was able to get the desired result:

```java
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

protected void reduce(StationPhenomenun key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
```

I also renamed the classes to MyMapper and MyReducer, so they no longer clash with Hadoop's own Mapper and Reducer class names.

```
722115,1,1,0,0,0,1
722115,0,1,1,0,0,1
722110,1,1,1,0,0,0
722110,0,0,1,0,0,0
722000,0,0,1,0,0,0
```

For this input set, I get the following results:

```
StationPhenomenun [station=722000, phenomenun=S]    1
StationPhenomenun [station=722110, phenomenun=F]    1
StationPhenomenun [station=722110, phenomenun=R]    1
StationPhenomenun [station=722110, phenomenun=S]    2
StationPhenomenun [station=722115, phenomenun=F]    1
StationPhenomenun [station=722115, phenomenun=O]    2
StationPhenomenun [station=722115, phenomenun=R]    2
StationPhenomenun [station=722115, phenomenun=S]    1
```

The computation is the same; you only need to customize how the output is displayed.
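As a plain-Java sketch of that display step (outside Hadoop; the class and helper names here are hypothetical, introduced only for illustration), you could aggregate the comma-separated test records and print each station in the question's desired "station: letter = count" format:

```java
import java.util.Map;
import java.util.StringJoiner;
import java.util.TreeMap;

public class StationReport {
    // Phenomenon letters, in the fixed FRSHTT column order.
    static final char[] LETTERS = {'F', 'R', 'S', 'H', 'T', 'O'};

    // Hypothetical helper: aggregate "station,F,R,S,H,T,O" records into
    // per-station phenomenon counts (TreeMap keeps stations and letters sorted).
    static Map<String, Map<Character, Integer>> aggregate(String[] records) {
        Map<String, Map<Character, Integer>> counts = new TreeMap<>();
        for (String record : records) {
            String[] fields = record.split(",");
            Map<Character, Integer> perStation =
                    counts.computeIfAbsent(fields[0], s -> new TreeMap<>());
            for (int i = 0; i < LETTERS.length; i++) {
                if (fields[i + 1].equals("1")) {
                    perStation.merge(LETTERS[i], 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    // Format one station's counts, e.g. "722000: S = 1"
    // (letters come out in alphabetical order, not FRSHTT order).
    static String format(String station, Map<Character, Integer> perStation) {
        StringJoiner joiner = new StringJoiner(", ", station + ": ", "");
        perStation.forEach((letter, n) -> joiner.add(letter + " = " + n));
        return joiner.toString();
    }

    public static void main(String[] args) {
        String[] input = {
            "722115,1,1,0,0,0,1", "722115,0,1,1,0,0,1",
            "722110,1,1,1,0,0,0", "722110,0,0,1,0,0,0",
            "722000,0,0,1,0,0,0",
        };
        aggregate(input).forEach((station, m) ->
                System.out.println(format(station, m)));
        // prints:
        // 722000: S = 1
        // 722110: F = 1, R = 1, S = 2
        // 722115: F = 1, O = 2, R = 2, S = 1
    }
}
```

In the Hadoop job itself the equivalent place to do this would be the key's toString() (or a custom OutputFormat), but the counting logic is unchanged.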

Original page content provided by Stack Overflow. Original link:

https://stackoverflow.com/questions/18381684