Custom XML Record Reader for Hadoop

XML is a common format for MapReduce input files. Unfortunately, Hadoop does not provide a standard XML reader. A blog post [1] gives a good example of writing one, but it has several shortcomings:
• It handles only a single tag, while in practice a user may want XML snippets for multiple tags.
• It does not support the “no closing tag” XML syntax. A well-formed XML element can take either of two forms:
<foo> ………………</foo>
or

<foo …………………/>

Both are valid XML, and while the code supports the former, it does not support the latter.
• It does not support compressed (zipped/gzipped) files. Compression is a common way to reduce file size. Although a compressed file cannot be split, which is a reason to avoid it in MapReduce processing, compressed files are still commonly used in multi-file MapReduce jobs.
• It is sometimes desirable to process only a starting tag. A typical case is the starting tag of a document, which carries the document’s metadata in the form of attributes. If this element is processed as a whole, the snippet subsumes the entire document and the other tags are never found.
Our implementation, presented here, overcomes all of these shortcomings.
Implementation
The implementation consists of four main classes: XMLInputFormat, XMLReader, XMLIngester and KeyString. Each class is described in detail below.
XMLInputFormat
This class (Listing 1) extends FileInputFormat and implements two main methods:
• createRecordReader creates a new XMLReader.
• isSplitable checks whether the file is compressed and, based on that, decides whether it can be split.

package com.navteq.hadoop;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class XMLInputFormat extends FileInputFormat<Text, Text> {

    // Every split is read by an XMLReader
    @Override
    public RecordReader<Text, Text> createRecordReader(
            InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        return new XMLReader();
    }

    // A file can be split only if no compression codec matches its name
    @Override
    protected boolean isSplitable(JobContext context, Path filename) {
        CompressionCodec codec =
                new CompressionCodecFactory(context.getConfiguration()).getCodec(filename);
        return codec == null;
    }
}

Listing 1 XMLInputFormat class
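Hadoop's CompressionCodecFactory picks a codec from the file name suffix, so the splittability decision comes down to a suffix check. The sketch below illustrates that decision without Hadoop on the classpath. The class name SplitCheck and its fixed suffix list are assumptions made for illustration; on a real cluster the set of recognized suffixes depends on the configured codecs.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the isSplitable decision; not part of Hadoop.
final class SplitCheck {

    // Assumed suffixes, for illustration only.
    private static final List<String> COMPRESSED_SUFFIXES =
            Arrays.asList(".gz", ".bz2", ".deflate");

    // Returns false when a compression suffix matches, true for plain files.
    static boolean isSplitable(String fileName) {
        for (String suffix : COMPRESSED_SUFFIXES) {
            if (fileName.endsWith(suffix)) {
                return false; // a codec would be found: read the file as a single split
            }
        }
        return true; // no codec: the file can be divided into several splits
    }
}
```

Under these assumptions, SplitCheck.isSplitable("feed.xml") is true and SplitCheck.isSplitable("feed.xml.gz") is false.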
XMLReader
The XMLReader class (Listing 2) extends the RecordReader class provided by the Hadoop framework:

package com.navteq.hadoop;

import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class XMLReader extends RecordReader<Text, Text> {

public static final String ELEMENT_NAME = "elementName";
// private DataInputStream _fileIn;
private InputStream _fileIn;
private long _start;
private long _end;
private long _current;
private boolean _eof = false;
private boolean _firstOnly = false;
private KeyString[] _keyString;
private Text _key;
private String _value;
private byte[][] _startTag;
private byte[][] _endTag;
private int[] _matchingStartTag;
private int[] _matchingEndTag;

@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext context) throws IOException, InterruptedException {

FileSplit split = (FileSplit) inputSplit;
Configuration configuration = context.getConfiguration();
String key = configuration.get(ELEMENT_NAME);

long s = split.getStart();
long e = s + split.getLength();
final Path file = split.getPath();
CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(configuration);
final CompressionCodec codec = compressionCodecs.getCodec(file);

// open the file and seek to the start of the split
final FileSystem fs = file.getFileSystem(configuration);
FSDataInputStream fileIn = fs.open(file);
if (codec != null) {
init(0, Long.MAX_VALUE, key, codec.createInputStream(fileIn));
} else {
init(s, e, key, fileIn);
}
}

@Override
public void close() throws IOException {

if (_fileIn != null) {
_fileIn.close();
}
}

@Override
public Text getCurrentKey() throws IOException, InterruptedException {

return _key;
}

@Override
public Text getCurrentValue() throws IOException, InterruptedException {

return new Text(_value);
}

@Override
public float getProgress() throws IOException, InterruptedException {

if (_start == _end) {
return 0.0f;
} else {
return Math.min(1.0f, (_current - _start) / (float) (_end - _start));
}
}

@Override
public boolean nextKeyValue() throws IOException, InterruptedException {

if (_eof || _current >= _end)
return false;

_firstOnly = false;
if (readUntilMatch(_startTag, _matchingStartTag, false)) {
return true;
}

if (_eof) {
return false;
}

String endtag = "</" + _key.toString() + ">";
if (_firstOnly) {
_value += endtag;
return true;
}

_endTag[0] = endtag.getBytes();
readUntilMatch(_endTag, _matchingEndTag, true);
if (_eof) {
return false;
}
return true;
}

private boolean readUntilMatch(byte[][] match, int[] matchingTag, boolean withinBlock) throws IOException {

char currChar = ' ', prevChar = ' ';
boolean matched = false;
for (int i = 0; i < matchingTag.length; i++)
matchingTag[i] = 0;
for (;; _current++) {
try {
int rv = _fileIn.read();
if (rv < 0) {
// end of file:
_eof = true;
return false;
}
currChar = (char) rv;
} catch (Exception e) {
// end of file:
_eof = true;
return false;
}

// Save the value
if (withinBlock || matched) {
_value += currChar;
}

if (!matched) {
for (int j = 0; j < match.length; j++) {
if (currChar == match[j][matchingTag[j]]) {
matchingTag[j]++;
if (matchingTag[j] >= match[j].length) {
matched = true;
if (withinBlock) {
return false;
}
long st = _current - match[j].length;
if (st > _end) {
// end of block:
_eof = true;
return false;
}
_value = new String(match[j]);
if (j >= _keyString.length)
j -= _keyString.length;
_key = new Text(_keyString[j].getString());
_firstOnly = _keyString[j].isFirstOnly();
if (_value.endsWith(">")) {
return false;
}
break;
}
} else {
matchingTag[j] = 0;
}
}
prevChar = currChar;
continue;
}
// We are matched, go till the end of the tag

if (currChar == '>') {
if (!withinBlock && (prevChar == '/'))
return true;
return false;
}
prevChar = currChar;
}
}

private void init(long s, long e, String keys, InputStream in) throws IOException {

_start = s;
_end = e;
_current = _start;
StringTokenizer st = new StringTokenizer(keys, ",");
int kLen = st.countTokens();
_keyString = new KeyString[kLen];
_startTag = new byte [2*kLen][];
_endTag = new byte [1][];
for (int i = 0; i < kLen; i++) {
String str = st.nextToken();
boolean firstOnly = false;
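if (str.startsWith("+")) {
firstOnly = true;
str = str.substring(1);
}
_keyString[i] = new KeyString(str, firstOnly);
// Two opening forms per tag: "<name>" and "<name " (followed by attributes)
_startTag[i] = ("<" + str + ">").getBytes();
_startTag[kLen + i] = ("<" + str + " ").getBytes();
}
_matchingStartTag = new int[2 * kLen];
_matchingEndTag = new int[1];
_fileIn = in;
if (_start > 0) {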
if (_fileIn instanceof FSDataInputStream) {
((FSDataInputStream) _fileIn).seek(_start);
} else {
_fileIn.skip(_start);
}
}
}

// This is a test method
public static void main(String[] args) throws Exception {

// Parameters
// args[0] - file
// args[1] - tag names; comma separated
// args[2] - start
// args[3] - end

DataInputStream in = new DataInputStream(new FileInputStream(new File(args[0])));
long start = Long.parseLong(args[2]);
long end = Long.parseLong(args[3]);
XMLReader reader = new XMLReader();
reader.init(start, end, args[1], in);
// Count how many times each tag was found
Map<String, Integer> occurrences = new HashMap<String, Integer>();
while (reader.nextKeyValue()) {
String key = reader.getCurrentKey().toString();
System.out.println("key " + key);
Integer count = occurrences.get(key);
if (count == null)
count = 1;
else
count++;
occurrences.put(key, count);
}
for (Map.Entry<String, Integer> entry : occurrences.entrySet())
System.out.println("key " + entry.getKey() + " found " + entry.getValue() + " times");

}
}

Listing 2 XMLReader class
The close method is very simple: it just closes the input stream.
The getCurrentKey and getCurrentValue methods just return the corresponding values pre-calculated by the nextKeyValue method.
The getProgress method calculates the map progress so far, based on the number of bytes read.
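The progress formula can be checked in isolation. The sketch below copies the expression from the listing into a static helper; the class name ProgressSketch is hypothetical.

```java
// Hypothetical helper that isolates the progress formula used by getProgress.
final class ProgressSketch {

    // Fraction of the split consumed so far, clamped to 1.0.
    static float progress(long start, long end, long current) {
        if (start == end) {
            return 0.0f; // empty split: avoid dividing by zero
        }
        return Math.min(1.0f, (current - start) / (float) (end - start));
    }
}
```

For a split covering bytes 100 to 200, a reader positioned at byte 150 reports 0.5. For compressed input the reader is initialized with an end of Long.MAX_VALUE, so the reported progress stays close to zero for the whole task.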
The nextKeyValue method prepares the values returned by subsequent calls to getCurrentKey and getCurrentValue. It returns true if these values were prepared and false otherwise. The method first checks whether it should continue: if the end of file has been detected or the reader has advanced past the end byte, it returns false. It then tries to find one of the specified opening tags using the readUntilMatch method. If readUntilMatch returns true, meaning that a complete element was found, nextKeyValue returns true. If the end-of-file flag was set during the tag search, the method returns false; otherwise it builds the closing tag. It then checks whether only the opening tag is needed for the given tag; if so, it appends the closing tag to the value and returns true. Finally, it uses readUntilMatch to find the closing tag and returns false if the end-of-file flag was set during the search, or true otherwise.
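The same control flow can be illustrated on an in-memory string. The sketch below is a simplification rather than the reader itself: it uses String.indexOf instead of byte-by-byte matching, handles a single tag, and ignores split boundaries and nested elements of the same name. The class name SnippetFlowSketch and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, in-memory illustration of the nextKeyValue control flow.
final class SnippetFlowSketch {

    // Returns every snippet found in xml for the given tag.
    static List<String> extract(String xml, String tag, boolean firstOnly) {
        List<String> out = new ArrayList<>();
        String closing = "</" + tag + ">";
        int pos = 0;
        while (true) {
            int open = findOpening(xml, tag, pos);
            if (open < 0) break;                  // no more opening tags
            int gt = xml.indexOf('>', open);
            if (gt < 0) break;                    // truncated tag: treat as end of input
            String openingTag = xml.substring(open, gt + 1);
            if (openingTag.endsWith("/>")) {      // self-closing element is already complete
                out.add(openingTag);
                pos = gt + 1;
            } else if (firstOnly) {               // opening tag only, closing tag appended
                out.add(openingTag + closing);
                pos = gt + 1;
            } else {                              // read up to and including the closing tag
                int close = xml.indexOf(closing, gt + 1);
                if (close < 0) break;             // input ended before the closing tag
                out.add(xml.substring(open, close + closing.length()));
                pos = close + closing.length();
            }
        }
        return out;
    }

    // Finds "<tag>" or "<tag " at or after from; returns -1 if neither occurs.
    private static int findOpening(String xml, String tag, int from) {
        int a = xml.indexOf("<" + tag + ">", from);
        int b = xml.indexOf("<" + tag + " ", from);
        if (a < 0) return b;
        if (b < 0) return a;
        return Math.min(a, b);
    }
}
```

Because a first-only match resumes scanning right after the opening tag, elements nested inside that tag can still be found on later calls.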
The reader’s workhorse is the readUntilMatch method, which reads the input stream trying to find one of the requested tags. It takes an array of byte arrays holding the tag values, an array of search indexes and a flag specifying whether the search is for an opening or a closing tag. The method is implemented as an infinite loop that starts by reading the next character from the input stream. If the end of file is reached during the read, the end-of-file flag is set and false is returned. Otherwise, if we are looking for a closing tag or already have a match, the new character is appended to the value string. The code then checks whether a match has already occurred, and if not, it tries to match the new character against each of the requested arrays. If the character does not match the array’s value at the current index, the index for that array is reset to 0. If it matches, the index for that array is incremented, and once the index reaches the tag’s length the tag is considered matched.
What happens on a match depends on whether we are searching for an opening or a closing tag. For a closing tag, once the match is found we are done and return false. For an opening tag, we first check whether it starts within the current block. If it does not, we set the end-of-file flag and return false. Otherwise we store the name of the found tag in the key and start building the value. If the matched string ends with “>” we return false; otherwise we continue reading from the input stream until we encounter “>”. Once it is reached, we check whether the previous character was “/”; if it was, we return true, otherwise we return false.
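The matching technique, one index per pattern advanced or reset on every byte, can be sketched on its own. The code below is an illustration and not the reader's method: it only reports which pattern matched first, and on a mismatch it lets the current byte start a new attempt instead of simply resetting the index to 0. That restart rule is sufficient for patterns whose first byte does not recur inside them, such as tags starting with "<". The class name MultiPatternScan is hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Illustrative multi-pattern scan over a byte stream.
final class MultiPatternScan {

    // Reads until one pattern has been seen in full; returns its index, or -1 at end of stream.
    // Patterns are assumed to be non-empty.
    static int scan(InputStream in, byte[][] patterns) throws IOException {
        int[] matched = new int[patterns.length]; // bytes matched so far, one counter per pattern
        int b;
        while ((b = in.read()) >= 0) {
            for (int j = 0; j < patterns.length; j++) {
                byte[] p = patterns[j];
                if (b == (p[matched[j]] & 0xff)) {
                    matched[j]++;
                    if (matched[j] == p.length) {
                        return j;
                    }
                } else {
                    // Mismatch: restart, but let the current byte begin a new attempt
                    matched[j] = (b == (p[0] & 0xff)) ? 1 : 0;
                }
            }
        }
        return -1;
    }

    // Convenience wrapper for in-memory text.
    static int scan(String text, String... tags) {
        byte[][] patterns = new byte[tags.length][];
        for (int i = 0; i < tags.length; i++) {
            patterns[i] = tags[i].getBytes(StandardCharsets.UTF_8);
        }
        try {
            return scan(new ByteArrayInputStream(text.getBytes(StandardCharsets.UTF_8)), patterns);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Scanning the text "xx<<bar a='1'/>" for the patterns "<foo>" and "<bar " returns index 1, because the second "<" restarts the attempt rather than discarding it.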
Additionally, the class implements a main method, which can be used for testing the class.
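XMLReader relies on KeyString only through a two-argument constructor and the getString and isFirstOnly methods. Judging from that usage alone, a minimal version of the class could look like the sketch below. The field names and the TagSpecParser helper, which mirrors the comma-separated tag list parsing done in init, are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

// Hypothetical KeyString, inferred from the calls XMLReader makes on it.
class KeyString {
    private final String string;      // tag name without the "+" prefix
    private final boolean firstOnly;  // true when only the opening tag is wanted

    KeyString(String string, boolean firstOnly) {
        this.string = string;
        this.firstOnly = firstOnly;
    }

    String getString() {
        return string;
    }

    boolean isFirstOnly() {
        return firstOnly;
    }
}

// Hypothetical helper mirroring the tag list parsing in XMLReader.init.
final class TagSpecParser {

    // Splits a comma-separated tag list; a leading "+" marks a first-only tag.
    static List<KeyString> parse(String spec) {
        List<KeyString> keys = new ArrayList<>();
        StringTokenizer st = new StringTokenizer(spec, ",");
        while (st.hasMoreTokens()) {
            String name = st.nextToken();
            boolean firstOnly = name.startsWith("+");
            if (firstOnly) {
                name = name.substring(1);
            }
            keys.add(new KeyString(name, firstOnly));
        }
        return keys;
    }
}
```

With this convention, a tag list such as "+document,record" asks for only the opening tag of document (with a closing tag appended) and for complete record elements.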
XMLIngester
This class (Listing 4) extends the Mapper class provided by the Hadoop framework.

package com.navteq.hadoop;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

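// Input key and value are the tag name and the XML fragment;
// the output types here are an assumption based on the imports
public class XMLIngester extends Mapper<Text, Text, NullWritable, Text> {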

@Override
protected void setup(Context context){

// Any Setup goes here
}

@Override
public void map(Text key, Text value, Context context){

String tag = key.toString();
String fragment = value.toString();

// Processing goes here
}

@Override
protected void cleanup(Context context){

// Cleanup here
}

}

Listing 4 XMLIngester class
Only a skeleton of the class is shown here, indicating what goes into each method. The actual implementation depends on what has to be done during the map phase.
References
1. Xml Processing in Hadoop. http://xmlandhadoop.blogspot.com/

Comments

  1. andresdigi25,

    Hi,
    How does the input split work in your code?
    Does it split the XML by line?
    I have a similar project, but I need to split HTML files, and I am having problems with the input split part.
    Thanks
