MapReduce Custom Input Formats - Reading Paragraphs as Input Records
If you work with Hadoop MapReduce or AWS EMR, you may have a use case where each input record should be a whole paragraph rather than a single line (think of analyzing reader comments on news articles). If you need to process a complete paragraph as one record instead of one line at a time, how do you achieve that in MapReduce?
To do this, we need to change the default behavior of TextInputFormat, which reads each line as a separate record, so that it reads a complete paragraph as one input key-value pair for further processing in MapReduce jobs.
This requires a custom record reader, which we create by implementing the RecordReader interface of the older org.apache.hadoop.mapred API. The next() method is where the reader fetches a paragraph instead of a single line: it reads lines through a wrapped LineRecordReader and appends them to the value until it reaches an empty line or the end of the input split. Here is the implementation:
```java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;

/**
 * Reads one paragraph (consecutive non-empty lines, ended by an empty line
 * or the end of the split) as a single record. Lines are joined with a space.
 */
public class ParagraphRecordReader implements RecordReader<LongWritable, Text> {
    private final LineRecordReader lineRecord;
    private final LongWritable lineKey;
    private final Text lineValue;

    public ParagraphRecordReader(JobConf conf, FileSplit split) throws IOException {
        lineRecord = new LineRecordReader(conf, split);
        lineKey = lineRecord.createKey();
        lineValue = lineRecord.createValue();
    }

    @Override
    public void close() throws IOException {
        lineRecord.close();
    }

    @Override
    public LongWritable createKey() {
        return new LongWritable();
    }

    @Override
    public Text createValue() {
        return new Text();
    }

    @Override
    public float getProgress() throws IOException {
        // Progress must be a fraction in [0, 1], not a byte position.
        return lineRecord.getProgress();
    }

    @Override
    public long getPos() throws IOException {
        return lineRecord.getPos();
    }

    @Override
    public synchronized boolean next(LongWritable key, Text value) throws IOException {
        byte[] space = {' '};
        boolean inParagraph = false;
        value.clear();

        while (lineRecord.next(lineKey, lineValue)) {
            if (lineValue.getLength() == 0) {
                if (inParagraph) {
                    return true;              // an empty line ends the paragraph
                }
                continue;                     // skip extra empty lines between paragraphs
            }
            if (!inParagraph) {
                key.set(lineKey.get());       // key = byte offset of the first line
                inParagraph = true;
            } else {
                value.append(space, 0, 1);    // join lines with a single space
            }
            value.append(lineValue.getBytes(), 0, lineValue.getLength());
        }
        return inParagraph;                   // end of split: emit the last paragraph, if any
    }
}
```
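The paragraph-grouping logic can be tried without a cluster. The following is a plain-Java sketch, not Hadoop code: the class ParagraphGrouper and its nested Paragraph type are hypothetical names, and it assumes UTF-8 text with '\n' line endings (LineRecordReader also accepts '\r\n', which this sketch does not handle). It skips empty lines between paragraphs, joins each paragraph's lines with a single space and uses the byte offset of the paragraph's first line as its key.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Plain-Java sketch (no Hadoop): groups text into paragraph records.
 * Assumes UTF-8 text with '\n' line endings; a zero-length line ends a paragraph.
 */
public class ParagraphGrouper {

    /** One record: byte offset of the paragraph's first line and the joined text. */
    public static final class Paragraph {
        public final long offset;
        public final String text;

        Paragraph(long offset, String text) {
            this.offset = offset;
            this.text = text;
        }
    }

    public static List<Paragraph> group(String input) {
        List<Paragraph> result = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        long paragraphStart = -1; // -1 means "not inside a paragraph"
        long pos = 0;             // byte offset where the next line starts

        for (String line : input.split("\n", -1)) {
            long lineStart = pos;
            pos += line.getBytes(StandardCharsets.UTF_8).length + 1; // +1 for '\n'

            if (line.isEmpty()) {
                if (paragraphStart >= 0) {          // an empty line closes the paragraph
                    result.add(new Paragraph(paragraphStart, current.toString()));
                    current.setLength(0);
                    paragraphStart = -1;
                }
                continue;                           // extra empty lines are skipped
            }
            if (paragraphStart < 0) {
                paragraphStart = lineStart;         // key: offset of the first line
            } else {
                current.append(' ');                // join lines with a single space
            }
            current.append(line);
        }
        if (paragraphStart >= 0) {                  // end of input closes the last paragraph
            result.add(new Paragraph(paragraphStart, current.toString()));
        }
        return result;
    }

    public static void main(String[] args) {
        String input = "This is a good article sharing an useful perspective on customizing default behavior.\n"
                + "\n"
                + "You could use highlight blocks for showing code.\n";
        for (Paragraph p : group(input)) {
            System.out.println(p.offset + " : " + p.text);
        }
    }
}
```

Running main on the two-paragraph sample used later in this post prints two records, at byte offsets 0 and 87: the first line is 85 bytes, followed by its newline and one empty line.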
With ParagraphRecordReader in place, we create a custom InputFormat by extending TextInputFormat and overriding only its getRecordReader() method, so that it returns a ParagraphRecordReader instead of the default line reader.
Our new class, ParagraphInputFormat, looks like this:
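```java
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class ParagraphInputFormat extends TextInputFormat {
    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split, JobConf conf, Reporter reporter)
            throws IOException {
        reporter.setStatus(split.toString());
        // Hand each file split to the paragraph reader instead of the line reader.
        return new ParagraphRecordReader(conf, (FileSplit) split);
    }
}
```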
Finally, the job configuration must tell MapReduce to use the custom input format. This is as simple as setting the input format class to ParagraphInputFormat on the JobConf:
```java
conf.setInputFormat(ParagraphInputFormat.class);
```
With these changes in place, a MapReduce program reads each paragraph as one input record.
To see the effect, assume the input file contains the following two paragraphs, separated by an empty line:
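One caveat: for a large file, Hadoop creates several input splits and gives each its own record reader, so a paragraph that straddles a split boundary would come out as two records. The plain-Java sketch below simulates this in a simplified way (no Hadoop; the class name, the sample lines and the boundary position are hypothetical) by grouping the lines on each side of an assumed boundary separately.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical illustration (no Hadoop): when each split is grouped into
 * paragraphs on its own, a paragraph that crosses the boundary is cut in two.
 */
public class SplitBoundaryDemo {

    /** Joins consecutive non-empty lines with a space; empty lines separate paragraphs. */
    public static List<String> paragraphs(List<String> lines) {
        List<String> result = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        for (String line : lines) {
            if (line.isEmpty()) {
                if (current.length() > 0) {      // close the open paragraph
                    result.add(current.toString());
                    current.setLength(0);
                }
                continue;
            }
            if (current.length() > 0) {
                current.append(' ');
            }
            current.append(line);
        }
        if (current.length() > 0) {              // end of this split's lines
            result.add(current.toString());
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> file = List.of("first line", "second line", "", "third line");

        // A single reader sees the whole file: two paragraphs, as intended.
        System.out.println(paragraphs(file));

        // Assume a split boundary falls between "first line" and "second line":
        // each reader groups only its own lines, so the first paragraph is split.
        System.out.println(paragraphs(file.subList(0, 1)));
        System.out.println(paragraphs(file.subList(1, file.size())));
    }
}
```

Running main prints [first line second line, third line], then [first line] and [second line, third line]. If this matters for your data, one option is to override TextInputFormat's protected isSplitable(FileSystem, Path) method in ParagraphInputFormat to return false, so each file is read by a single reader, at the cost of parallelism within a file.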
```
This is a good article sharing an useful perspective on customizing default behavior.

You could use highlight blocks for showing code.
```
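A simple mapper that just prints each record looks like this:
```java
// Inside a mapper class implementing Mapper<LongWritable, Text, Text, Text> (old mapred API)
@Override
public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {
    // In local mode this appears on the console; on a cluster it goes to the task's stdout log.
    System.out.println(key + " : " + value);
}
```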
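Running the job in local mode, the mapper prints one line per paragraph. The key is the byte offset at which each paragraph starts; 87 assumes Unix (\n) line endings and a single empty line between the two paragraphs:
```
0 : This is a good article sharing an useful perspective on customizing default behavior.
87 : You could use highlight blocks for showing code.
```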
We hope this helps you extend the default input format behavior in MapReduce. If you like what we do, connect with us at cloud [at] minjar [dot] com; we are always hiring smart people :)
Author: This blog post is contributed by Amarkant, TechLead - BigData team at Minjar.