在编写mapreduce功能时,我遇到了一些问题。我想解决以下问题:
我有一个带有1mio JSONObject的JSON文件,如下所示:
{"_id":3951,"title":"Two Family House (2000)","genres":["Drama"],"ratings":[{"userId":173,"rating":5},{"userId":195,"rating":5},{"userId":411,"rating":4},{"userId":593,"rating":2},{"userId":629,"rating":3},{"userId":830,"rating":3},{"userId":838,"rating":5},{"userId":850,"rating":4},{"userId":856,"rating":4},{"userId":862,"rating":5},{"userId":889,"rating":1},{"userId":928,"rating":5},{"userId":986,"rating":4},{"userId":1001,"rating":5},{"userId":1069,"rating":3},{"userId":1168,"rating":3},{"userId":1173,"rating":2},{"userId":1242,"rating":3},{"userId":1266,"rating":5},{"userId":1331,"rating":5},{"userId":1417,"rating":5},{"userId":1470,"rating":4},{"userId":1474,"rating":5},{"userId":1615,"rating":3},{"userId":1625,"rating":4},{"userId":1733,"rating":4},{"userId":1799,"rating":4},{"userId":1865,"rating":5},{"userId":1877,"rating":5},{"userId":1897,"rating":5},{"userId":1946,"rating":4},{"userId":2031,"rating":4},{"userId":2129,"rating":2},{"userId":2353,"rating":4},{"userId":2986,"rating":4},{"userId":3940,"rating":4},{"userId":3985,"rating":3},{"userId":4025,"rating":5},{"userId":4727,"rating":3},{"userId":5333,"rating":3}]}还有更多..。
一个JSON对象是一个Movie,它包含一个数组分级。我要计算JSON文件中的所有评级。
我在IntelliJ中创建了一个Maven Proct,它依赖于Hadoop和JSON。我的MapReduce课程是:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import java.io.IOException;
import java.util.Iterator;
public class RatingCounter {
public static class RatingMapper extends Mapper<JSONObject, Text, Text, Text>{
private Text id = new Text();
private Text ratingAnzahl = new Text();
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException{
JSONParser parser = new JSONParser();
try {
Object obj = parser.parse(value.toString());
JSONObject jsonObject = (JSONObject) obj;
String movieId = (String) jsonObject.get("_id");
int count = 0;
// loop array
JSONArray ratings = (JSONArray) jsonObject.get("ratings");
Iterator<String> iterator = ratings.iterator();
while (iterator.hasNext()) {
count++;
}
} catch (ParseException e) {
e.printStackTrace();
}
}
}
public static class RatingReducer extends Reducer<Text, Text, Text, Text> {
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Text resultValue = new Text();
int allRatings = 0;
while (values.hasNext()){
allRatings += Integer.parseInt(values.toString());
}
resultValue.set(""+allRatings);
context.write(key, resultValue);
}
}
public static void main (String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf, "ratings count");
job.setJarByClass(RatingCounter.class);
job.setMapperClass(RatingMapper.class);
job.setReducerClass(RatingReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}我不知道如何用Mapper和Reducer编写函数。有人能帮我吗?
发布于 2017-06-12 08:45:53
我对你的地图和减速器做了一些修改。
首先,对于映射器来说,在扩展Mapper类的同时,不需要在任何地方编写输出和语法(可以说)。任何映射器的第一个输入是行的LongWritable (或Object类型)偏移量。您可以注意到下面的更改
public static class RatingMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
public void map(LongWritable key, Text value, Context context) throws IOException, ParseException{
JSONParser parser = new JSONParser();
Object obj = parser.parse(value.toString());
JSONObject jsonObject = (JSONObject) obj;
String movieId = (String) jsonObject.get("_id");
JSONArray ratings = (JSONArray) jsonObject.get("ratings");
context.write(new Text(movieId), new IntWritable(ratings.size()) );
}
}请注意,地图的输出是使用context.write编写的。
现在,进入您的Reducer,一些事情会因为我在映射器中所做的更改而改变。此外,由于您的评级编号将始终是一个整数,您不需要将其转换为Text,使用parseInt,然后再转换为Text。
public static class RatingReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int allRatings = 0;
while (values.hasNext()){
allRatings += value.get();
}
context.write(key, new IntWritable(resultValue));
}
}https://stackoverflow.com/questions/44398273
复制相似问题