我对Hadoop和MapReduce编程完全陌生,我正在尝试用公共爬行的数据编写我的第一个MapReduce程序。
我想阅读来自AWS的2015年4月的所有数据。例如,如果我想在命令行中下载2015年4月的所有数据,则需要:
s3://aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2015-18/segments/1429246633512.41/wat/*.warc.wat.gz s3cmd get
这个命令行可以工作,但我不想下载2015年4月的所有数据,我只想读取所有的"warc.wat.gz“文件(以分析数据)。
我试着创造我的工作,看起来是这样的:
public class FirstJob extends Configured implements Tool {
private static final Logger LOG = Logger.getLogger(FirstJob.class);
/**
* Main entry point that uses the {@link ToolRunner} class to run the Hadoop
* job.
*/
public static void main(String[] args) throws Exception {
int res = ToolRunner.run(new Configuration(), new FirstJob(), args);
System.out.println("done !!");
System.exit(res);
}
/**
* Builds and runs the Hadoop job.
*
* @return 0 if the Hadoop job completes successfully and 1 otherwise.
*/
public int run(String[] arg0) throws Exception {
Configuration conf = getConf();
//
Job job = new Job(conf);
job.setJarByClass(FirstJob.class);
job.setNumReduceTasks(1);
//String inputPath = "data/*.warc.wat.gz";
String inputPath = "s3n://aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2015-18/segments/1429246633512.41/wat/*.warc.wat.gz";
LOG.info("Input path: " + inputPath);
FileInputFormat.addInputPath(job, new Path(inputPath));
String outputPath = "/tmp/cc-firstjob/";
FileSystem fs = FileSystem.newInstance(conf);
if (fs.exists(new Path(outputPath))) {
fs.delete(new Path(outputPath), true);
}
FileOutputFormat.setOutputPath(job, new Path(outputPath));
job.setInputFormatClass(WARCFileInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setMapperClass(FirstJobUrlTypeMap.ServerMapper.class);
job.setReducerClass(LongSumReducer.class);
if (job.waitForCompletion(true)) {
return 0;
} else {
return 1;
}
}但我有个错误:
线程“java.lang.IllegalArgumentException”中的异常: AWS访问密钥ID和秘密访问密钥必须分别指定为s3n URL的用户名或密码,或者通过设置fs.s3n.awsAccessKeyId或fs.s3n.awsSecretAccessKey属性来指定。
我怎样才能解决我的问题?提前谢谢,
发布于 2015-07-10 10:35:50
我解决了我的问题。在代码中,更改:
Configuration conf = getConf();
//
Job job = new Job(conf);至
Configuration conf = new Configuration();
conf.set("fs.s3n.awsAccessKeyId", "your_key");
conf.set("fs.s3n.awsSecretAccessKey", "your_key");
Job job = new Job(conf);https://stackoverflow.com/questions/31287956
复制相似问题