(interface), Line RecordReader(class)的关系 FileInputFormat implements InputFormat TextInputFormat extends FileInputFormat TextInputFormat.get RecordReader calls Line RecordReader Line RecordReader (即由LineRecorderReader类来实现 " 将每个s plit解析成records, 再依次将record解析成<K,V>对" ),该方法实现了接口RecordReader public interface RecordReader<K, V> { boolean next(K key, V value) throws IOException; K 类似于LineRecordReader的类,该类的核心也正是重写接口RecordReader中的几大方法, 定义一个InputFormat的核心是定义一个类似于LineRecordReader的,
先让我们确定程序的核心机制: 自定义一个InputFormat 改写RecordReader,实现一次读取一个完整文件封装为KV 在输出时使用SequenceFileOutPutFormat输出合并文件 custom_recordReader = new Custom_RecordReader(); custom_recordReader.initialize(split,context ); return custom_recordReader; } } 自定义RecordReader /** * * RecordReader的核心工作逻辑: * 通过 * 通过getCurrentKey 和 getCurrentValue来返回上面构造好的key和value * * * @author * */ public class Custom_RecordReader extends RecordReader<NullWritable, BytesWritable> { private FileSplit fileSplit; private Configuration
MapReduce框架依赖InputFormat进行输入数据分片以及提供读取分片数据的RecordReader实例对象。 每一个InputFormat类都会有一个对应的RecordReader类, RecordReader类主要作用是将输入数据转换为键值对, 传输给mapper阶段的map方法。 createRecordReader:创建一个具体读取数据并构造key/value键值对的RecordReader实例对象。 RecordReader详解 全称:org.apache.hadoop.mapreduce.RecordReader 方法详解: initialize:根据对应的分片信息进行初始化操作。
MapTask的void runNewMapper(...)中(请先阅读此方法的源代码,以便理解下文),就会创建各种Mapper要用到的参数,包括Mapper、InputFormat、InputSplit、RecordReader 、MapContext,之后会运行: input.initialize(split, mapperContext); // input类型是RecordReader mapper.run(mapperContext Mapper初始化 每个Mapper会被分配到一个Split,在runNewMapper中可以包装成RecordReader。 new WrappedMapper<INKEY, INVALUE, OUTKEY, OUTVALUE>().getMapContext( mapContext); RecordReader 所以最终调用的是RecordReader::nextKeyValue()。
Task的个数即Mapper的个数,在MapReduce框架中,一个split就意味着需要一个Map Task; 2)为Mapper提供输入数据,即给定一个split,(使用其中的RecordReader 对象,实际上是由RecordReader对象将 //split解析成一个个的key/value对儿 public RecordReader<K,V> getRecordReader(InputSplit InputFormat使用的RecordReader将从FileSplit中获取信息,解析FileSplit对象从而获得需要的数据的起始位置、长度和节点位置。 RecordReader 对于getRecordReader(…)方法,它返回一个RecordReader对象,该对象可以讲输入的split分片解析成一个个的key/value对儿 ()获取RecordReader<K,V>对象, //并由RecordReader对象解析其中的input(split)… K1 key = input.createKey(); V1 value
(2)自定义一个类继承RecordReader,实现一次读取一个完整文件,将文件名为key,文件内容为value。 (3)在输出时使用SequenceFileOutPutFormat输出合并文件。 (2)重写createRecordReader(),返回自定义的RecordReader对象 自定义一个类继承RecordReader 在RecordReader中,nextKeyValue() 读取切片的内容封装到bytes作为value */ public class MyInputFormat extends FileInputFormat { @Override public RecordReader boolean isSplitable(JobContext context, Path filename) { return false; } } MyRecordReader.java /* * RecordReader * XXXContext都是Job的上下文,通过XXXContext可以获取Job的配置Configuration对象 */ public class MyRecordReader extends RecordReader
org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader isSplitable(JobContext context, Path filename) { return false; } @Override public RecordReader throws IOException, InterruptedException { return new WholeFileRecordReader(); } } 2)自定义RecordReader–WholeFileRecordReader org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader ,处理一个文件,把这个文件直接读成 一个KV值 */ public class WholeFileRecordReader extends RecordReader<Text, BytesWritable
重写createRecordReader方法,返回自定义RecordReader。 创建自定义RecordReader,继承RecordReader类,并重写方法。
RecordReader。主要用于将一个文件中的数据记录分拆成具体的键值对,传送给Map过程作为键值对输入参数。每一个数据输入格式都有一个默认的RecordReader。 TextInputFormat的默认RecordReader是LineRecordReader,而KeyValueTextInputFormat的默认RecordReader是KeyValueLineRecordReader 当然肯定还有很多数据输入格式和对应的默认RecordReader 这里就不接受了,有需要的可以去官网看看 数据输出格式(OutputFormat)用于描述MapReduce作业的数据输出规范 与数据输入格式中的RecordReader类似,数据输出格式也提供一个对应的RecordWriter,以便系统明确输出结果写入到文件中的具体格式。 当然同样肯定还有很多数据输出格式和对应的默认RecordWriter 对于自定义数据输入格式 可以参考已有的数据输入格式,继承自它即可,只要重写GetRecordReader方法得到一个自己写的RecordReader
network.init(); network.setListeners(new ScoreIterationListener(10)); ImageRecordReader recordReader MultipleEpochsIterator trainIter; log.info("Train model...."); // 用原始图像来训练 recordReader.initialize (trainData, null); dataIter = new RecordReaderDataSetIterator(recordReader, batchSize, 1, numLabels (trainData, transform); dataIter = new RecordReaderDataSetIterator(recordReader, batchSize (testData); dataIter = new RecordReaderDataSetIterator(recordReader, batchSize, 1, numLabels)
说明调用的是RecordReader中的方法,而具体是RecordReader中的哪个实现类呢?继续往下。 4.FileInputFormat 我们在启动类中设置了输入输出路径。 说明nextKeyValue()其实执行的是RecordReader中的nextKeyValue方法。 ? 读取split文件中每行数据的方法。
自定义类 在编写MapReduce的时候,自带的输入格式有时候满足不了我们的需求,这就需要自己定义InputFormat,InputSplit和RecordReader。 org.apache.hadoop.io.IntWritable; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader import org.apache.hadoop.mapreduce.TaskAttemptContext; public class FindMaxValueRecordReader extends RecordReader org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader InputFormat<IntWritable, ArrayWritable>{ public static float [] floatvalues; @Override public RecordReader
自定义的类必须符合MR的Mapper的规范 在MR中,只能处理key-value格式的数据 KEYIN, VALUEIN: mapper输入的k-v类型,由当前Job的InputFormat的RecordReader 封装输入的key-value由RecordReader自动进行,不可自定义。 : mapper输出的k-v类型,可自定义 InputFormat的作用: ①验证输入目录中的文件格式,是否符合当前Job的要求 ②生成切片,每个切片都会交给一个MapTask处理 ③提供RecordReader ,由RR从切片中读取记录,交给Mapper进行处理 方法: List<InputSplit> getSplits: 切片 RecordReader<K,V> createRecordReader: 创建RecordReader 默认hadoop使用的是TextInputFormat,TextInputFormat使用LineRecordReader 在Hadoop中,如果有Reduce阶段。
org.datavec.api.split.FileSplit;import org.datavec.api.split.InputSplit;import org.datavec.api.records.reader.RecordReader "train" : "test");ImageRecordReader recordReader = new ImageRecordReader(height, width, channels, new "/%d.png", 0, numLabels));DataNormalization scaler = new NormalizerStandardize(); scaler.fit(recordReader ); recordReader.setPreProcessor(scaler);DataSetIterator dataIter = new RecordReaderDataSetIterator (recordReader, batchSize, numLabels, numLabels);return dataIter; }}构建CNN模型接下来,我们将使用DL4J构建CNN模型。
com.bolingcavalry.commons.utils.DownloaderUtility; import lombok.extern.slf4j.Slf4j; import org.datavec.api.records.reader.RecordReader int numLinesToSkip = 0; // 分隔符 char delimiter = ','; // CSV读取工具 RecordReader recordReader = new CSVRecordReader(numLinesToSkip,delimiter); // 下载并解压后,得到文件的位置 String DownloaderUtility.IRISDATA.Download(); log.info("鸢尾花数据已下载并解压至 : {}", dataPathLocal); // 读取下载后的文件 recordReader.initialize data sets) // 加载到数据集迭代器中 DataSetIterator iterator = new RecordReaderDataSetIterator(recordReader
熟悉Hadoop/MapReduce的朋友肯定知道它的输入用InputFormat来确定具体的InputSplit和RecordReader。 DataVec也有自己FileSplit和RecordReader,并且对于不同的数据类型(文本、CSV、音频、图像、视频等),有不同的RecordReader,下面是一个图像的例子。 RecordReader是DataVec中的一个类,ImageRecordReader是RecordReader中的一个子类,这样就可以将输入图像转成向量化的带有索引的数据。
splitIndex.getStartOffset()); LOG.info("Processing split: " + split); org.apache.hadoop.mapreduce.RecordReader 解析3源码: static class NewTrackingRecordReader<K,V> extends org.apache.hadoop.mapreduce.RecordReader <K,V> { private final org.apache.hadoop.mapreduce.RecordReader<K,V> real; private final org.apache.hadoop.mapreduce.Counter 源码 public class TextInputFormat extends FileInputFormat<LongWritable, Text> { @Override public RecordReader } 解析4源码: public MapContextImpl(Configuration conf, TaskAttemptID taskid, RecordReader
OrcInputFormat protected OrcInputFormat<OrcStruct> orcInputFormat = new OrcInputFormat(); public RecordReader (conf, reader, start, length); } protected static class TextRecordReaderWrapper implements RecordReader org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> { @SuppressWarnings({ "rawtypes", "unchecked" }) public RecordReader
there are no more records */ Record next() throws IOException; ... } public interface RecordReader 这里简单看下json的JsonPathReader的代码实现,JsonPathReader里 @Override public RecordReader createRecordReader(final JsonPathRowRecordReader(jsonPaths, schema, bufferedIn, logger, dateFormat, timeFormat, timestampFormat); } 核心的RecordReader
输入格式: InputFormat类定义了如何分割和读取输入文件,它提供有下面的几个功能: 选择作为输入的文件或对象; 定义把文件划分到任务的InputSplits; 为RecordReader读取文件提供了一个工厂方法 记录读取器(RecordReader) InputSplit定义了如何切分工作,但是没有描述如何去访问它。 RecordReader类则是实际的用来加载数据并把数据转换为适合mapper读取的键值对。 RecordReader会在输入块上被重复的调用直到整个输入块被处理完毕,每一次调用RecordReader都会调用Mapper的map()方法。 5. RecordWriter:这个跟InputFormat中通过RecordReader读取单个记录的实现很相似,OutputFormat类是RecordWriter对象的工厂方法,用来把单个的记录写到文件中