Pārlūkot izejas kodu

HADOOP-8521. Port StreamInputFormat to new Map Reduce API (madhukara phatak via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1360238 13f79535-47bb-0310-9956-ffa450edef68
Robert Joseph Evans 13 gadi atpakaļ
vecāks
revīzija
9c87911c4a

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -175,6 +175,9 @@ Trunk (unreleased changes)
     HADOOP-8584. test-patch.sh should not immediately exit when no
     tests are added or modified. (Colin Patrick McCabe via eli)
 
+    HADOOP-8521. Port StreamInputFormat to new Map Reduce API (madhukara
+    phatak via bobby)
+
   OPTIMIZATIONS
 
     HADOOP-7761. Improve the performance of raw comparisons. (todd)

+ 1 - 1
hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/StreamBaseRecordReader.java

@@ -126,7 +126,7 @@ public abstract class StreamBaseRecordReader implements RecordReader<Text, Text>
     }
     String unqualSplit = split_.getPath().getName() + ":" +
                          split_.getStart() + "+" + split_.getLength();
-    String status = "HSTR " + StreamUtil.HOST + " " + numRec_ + ". pos=" + pos + " " + unqualSplit
+    String status = "HSTR " + StreamUtil.getHost() + " " + numRec_ + ". pos=" + pos + " " + unqualSplit
       + " Processing record=" + recStr;
     status += " " + splitName_;
     return status;

+ 7 - 2
hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/StreamUtil.java

@@ -168,12 +168,17 @@ public class StreamUtil {
   }
 
   static private Environment env;
-  static String HOST;
+  private static String host;
 
+  public static String getHost(){
+    return host;
+  }
+ 
+   
   static {
     try {
       env = new Environment();
-      HOST = env.getHost();
+      host = env.getHost();
     } catch (IOException io) {
       io.printStackTrace();
     }

+ 9 - 9
hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/StreamXmlRecordReader.java

@@ -64,7 +64,7 @@ public class StreamXmlRecordReader extends StreamBaseRecordReader {
     init();
   }
 
-  public void init() throws IOException {
+  public final void init() throws IOException {
     LOG.info("StreamBaseRecordReader.init: " + " start_=" + start_ + " end_=" + end_ + " length_="
              + length_ + " start_ > in_.getPos() =" + (start_ > in_.getPos()) + " " + start_ + " > "
              + in_.getPos());
@@ -185,14 +185,14 @@ public class StreamXmlRecordReader extends StreamBaseRecordReader {
   }
 
   // states
-  final static int CDATA_IN = 10;
-  final static int CDATA_OUT = 11;
-  final static int CDATA_UNK = 12;
-  final static int RECORD_ACCEPT = 13;
+  private final static int CDATA_IN = 10;
+  private final static int CDATA_OUT = 11;
+  private final static int CDATA_UNK = 12;
+  private final static int RECORD_ACCEPT = 13;
   // inputs
-  final static int CDATA_BEGIN = 20;
-  final static int CDATA_END = 21;
-  final static int RECORD_MAYBE = 22;
+  private final static int CDATA_BEGIN = 20;
+  private final static int CDATA_END = 21;
+  private final static int RECORD_MAYBE = 22;
 
   /* also updates firstMatchStart_;*/
   int nextState(int state, int input, int bufPos) {
@@ -293,7 +293,7 @@ public class StreamXmlRecordReader extends StreamBaseRecordReader {
   BufferedInputStream bin_; // Wrap FSDataInputStream for efficient backward seeks 
   long pos_; // Keep track on position with respect encapsulated FSDataInputStream  
 
-  final static int NA = -1;
+  private final static int NA = -1;
   int firstMatchStart_ = 0; // candidate record boundary. Might just be CDATA.
   int firstMatchEnd_ = 0;
 

+ 153 - 0
hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/mapreduce/StreamBaseRecordReader.java

@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.streaming.StreamUtil;
+
+/**
+ * Shared functionality for hadoopStreaming formats. A custom reader can be
+ * defined to be a RecordReader with the constructor below and is selected with
+ * the option bin/hadoopStreaming -inputreader ...
+ * 
+ * @see StreamXmlRecordReader
+ */
+public abstract class StreamBaseRecordReader extends RecordReader<Text, Text> {
+
+  protected static final Log LOG = LogFactory
+      .getLog(StreamBaseRecordReader.class.getName());
+
+  // custom JobConf properties for this class are prefixed with this namespace
+  final static String CONF_NS = "stream.recordreader.";
+
+  public StreamBaseRecordReader(FSDataInputStream in, FileSplit split,
+      TaskAttemptContext context, Configuration conf, FileSystem fs)
+      throws IOException {
+    in_ = in;
+    split_ = split;
+    start_ = split_.getStart();
+    length_ = split_.getLength();
+    end_ = start_ + length_;
+    splitName_ = split_.getPath().getName();
+    this.context_ = context;
+    conf_ = conf;
+    fs_ = fs;
+
+    statusMaxRecordChars_ = conf.getInt(CONF_NS + "statuschars", 200);
+  }
+
+  // / RecordReader API
+
+  /**
+   * Read a record. Implementation should call numRecStats at the end
+   */
+  public abstract boolean next(Text key, Text value) throws IOException;
+
+  /** Returns the current position in the input. */
+  public synchronized long getPos() throws IOException {
+    return in_.getPos();
+  }
+
+  /** Close this to future operations. */
+  public synchronized void close() throws IOException {
+    in_.close();
+  }
+
+  public float getProgress() throws IOException {
+    if (end_ == start_) {
+      return 1.0f;
+    } else {
+      return ((float) (in_.getPos() - start_)) / ((float) (end_ - start_));
+    }
+  }
+
+  public Text createKey() {
+    return new Text();
+  }
+
+  public Text createValue() {
+    return new Text();
+  }
+
+  // / StreamBaseRecordReader API
+
+  /**
+   * Implementation should seek forward in_ to the first byte of the next
+   * record. The initial byte offset in the stream is arbitrary.
+   */
+  public abstract void seekNextRecordBoundary() throws IOException;
+
+  void numRecStats(byte[] record, int start, int len) throws IOException {
+    numRec_++;
+    if (numRec_ == nextStatusRec_) {
+      String recordStr = new String(record, start, Math.min(len,
+          statusMaxRecordChars_), "UTF-8");
+      nextStatusRec_ += 100;// *= 10;
+      String status = getStatus(recordStr);
+      LOG.info(status);
+      context_.setStatus(status);
+    }
+  }
+
+  long lastMem = 0;
+
+  String getStatus(CharSequence record) {
+    long pos = -1;
+    try {
+      pos = getPos();
+    } catch (IOException io) {
+    }
+    String recStr;
+    if (record.length() > statusMaxRecordChars_) {
+      recStr = record.subSequence(0, statusMaxRecordChars_) + "...";
+    } else {
+      recStr = record.toString();
+    }
+    String unqualSplit = split_.getPath().getName() + ":" + split_.getStart()
+        + "+" + split_.getLength();
+    String status = "HSTR " + StreamUtil.getHost() + " " + numRec_ + ". pos="
+        + pos + " " + unqualSplit + " Processing record=" + recStr;
+    status += " " + splitName_;
+    return status;
+  }
+
+  FSDataInputStream in_;
+  FileSplit split_;
+  long start_;
+  long end_;
+  long length_;
+  String splitName_;
+  TaskAttemptContext context_;
+  Configuration conf_;
+  FileSystem fs_;
+  int numRec_ = 0;
+  int nextStatusRec_ = 1;
+  int statusMaxRecordChars_;
+
+}

+ 94 - 0
hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/mapreduce/StreamInputFormat.java

@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming.mapreduce;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
+import org.apache.hadoop.streaming.StreamUtil;
+
+/**
+ * An input format that selects a RecordReader based on a JobConf property. This
+ * should be used only for non-standard record reader such as
+ * StreamXmlRecordReader. For all other standard record readers, the appropriate
+ * input format classes should be used.
+ */
+public class StreamInputFormat extends KeyValueTextInputFormat {
+
+  @Override
+  public RecordReader<Text, Text> createRecordReader(InputSplit genericSplit,
+      TaskAttemptContext context) throws IOException {
+
+    Configuration conf = context.getConfiguration();
+
+    String c = conf.get("stream.recordreader.class");
+    if (c == null || c.indexOf("LineRecordReader") >= 0) {
+      return super.createRecordReader(genericSplit, context);
+    }
+
+    // handling non-standard record reader (likely StreamXmlRecordReader)
+    FileSplit split = (FileSplit) genericSplit;
+    // LOG.info("getRecordReader start.....split=" + split);
+    context.setStatus(split.toString());
+    context.progress();
+
+    // Open the file and seek to the start of the split
+    FileSystem fs = split.getPath().getFileSystem(conf);
+    FSDataInputStream in = fs.open(split.getPath());
+
+    // Factory dispatch based on available params..
+    Class readerClass;
+
+    {
+      readerClass = StreamUtil.goodClassOrNull(conf, c, null);
+      if (readerClass == null) {
+        throw new RuntimeException("Class not found: " + c);
+      }
+    }
+    Constructor ctor;
+
+    try {
+      ctor = readerClass.getConstructor(new Class[] { FSDataInputStream.class,
+          FileSplit.class, TaskAttemptContext.class, Configuration.class,
+          FileSystem.class });
+    } catch (NoSuchMethodException nsm) {
+      throw new RuntimeException(nsm);
+    }
+
+    RecordReader<Text, Text> reader;
+    try {
+      reader = (RecordReader<Text, Text>) ctor.newInstance(new Object[] { in,
+          split, context, conf, fs });
+    } catch (Exception nsm) {
+      throw new RuntimeException(nsm);
+    }
+    return reader;
+
+  }
+
+}

+ 340 - 0
hadoop-tools/hadoop-streaming/src/main/java/org/apache/hadoop/streaming/mapreduce/StreamXmlRecordReader.java

@@ -0,0 +1,340 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming.mapreduce;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.streaming.StreamUtil;
+
+/**
+ * A way to interpret XML fragments as Mapper input records. Values are XML
+ * subtrees delimited by configurable tags. Keys could be the value of a certain
+ * attribute in the XML subtree, but this is left to the stream processor
+ * application.
+ * 
+ * The name-value properties that StreamXmlRecordReader understands are: String
+ * begin (chars marking beginning of record) String end (chars marking end of
+ * record) int maxrec (maximum record size) int lookahead(maximum lookahead to
+ * sync CDATA) boolean slowmatch
+ */
+public class StreamXmlRecordReader extends StreamBaseRecordReader {
+
+  private Text key;
+  private Text value;
+
+  public StreamXmlRecordReader(FSDataInputStream in, FileSplit split,
+      TaskAttemptContext context, Configuration conf, FileSystem fs)
+  throws IOException {
+    super(in, split, context, conf, fs);
+
+    beginMark_ = checkJobGet(CONF_NS + "begin");
+    endMark_ = checkJobGet(CONF_NS + "end");
+
+    maxRecSize_ = conf_.getInt(CONF_NS + "maxrec", 50 * 1000);
+    lookAhead_ = conf_.getInt(CONF_NS + "lookahead", 2 * maxRecSize_);
+    synched_ = false;
+
+    slowMatch_ = conf_.getBoolean(CONF_NS + "slowmatch", false);
+    if (slowMatch_) {
+      beginPat_ = makePatternCDataOrMark(beginMark_);
+      endPat_ = makePatternCDataOrMark(endMark_);
+    }
+    init();
+  }
+
+  public final void init() throws IOException {
+    LOG.info("StreamBaseRecordReader.init: " + " start_=" + start_ + " end_="
+        + end_ + " length_=" + length_ + " start_ > in_.getPos() ="
+        + (start_ > in_.getPos()) + " " + start_ + " > " + in_.getPos());
+    if (start_ > in_.getPos()) {
+      in_.seek(start_);
+    }
+    pos_ = start_;
+    bin_ = new BufferedInputStream(in_);
+    seekNextRecordBoundary();
+  }
+
+  int numNext = 0;
+
+  public synchronized boolean next(Text key, Text value) throws IOException {
+    numNext++;
+    if (pos_ >= end_) {
+      return false;
+    }
+
+    DataOutputBuffer buf = new DataOutputBuffer();
+    if (!readUntilMatchBegin()) {
+      return false;
+    }
+    if (pos_ >= end_ || !readUntilMatchEnd(buf)) {
+      return false;
+    }
+
+    // There is only one elem..key/value splitting is not done here.
+    byte[] record = new byte[buf.getLength()];
+    System.arraycopy(buf.getData(), 0, record, 0, record.length);
+
+    numRecStats(record, 0, record.length);
+
+    key.set(record);
+    value.set("");
+
+    return true;
+  }
+
+  public void seekNextRecordBoundary() throws IOException {
+    readUntilMatchBegin();
+  }
+
+  boolean readUntilMatchBegin() throws IOException {
+    if (slowMatch_) {
+      return slowReadUntilMatch(beginPat_, false, null);
+    } else {
+      return fastReadUntilMatch(beginMark_, false, null);
+    }
+  }
+
+  private boolean readUntilMatchEnd(DataOutputBuffer buf) throws IOException {
+    if (slowMatch_) {
+      return slowReadUntilMatch(endPat_, true, buf);
+    } else {
+      return fastReadUntilMatch(endMark_, true, buf);
+    }
+  }
+
+  private boolean slowReadUntilMatch(Pattern markPattern, boolean includePat,
+      DataOutputBuffer outBufOrNull) throws IOException {
+    byte[] buf = new byte[Math.max(lookAhead_, maxRecSize_)];
+    int read = 0;
+    bin_.mark(Math.max(lookAhead_, maxRecSize_) + 2); // mark to invalidate if
+    // we read more
+    read = bin_.read(buf);
+    if (read == -1)
+      return false;
+
+    String sbuf = new String(buf, 0, read, "UTF-8");
+    Matcher match = markPattern.matcher(sbuf);
+
+    firstMatchStart_ = NA;
+    firstMatchEnd_ = NA;
+    int bufPos = 0;
+    int state = synched_ ? CDATA_OUT : CDATA_UNK;
+    int s = 0;
+
+    while (match.find(bufPos)) {
+      int input;
+      if (match.group(1) != null) {
+        input = CDATA_BEGIN;
+      } else if (match.group(2) != null) {
+        input = CDATA_END;
+        firstMatchStart_ = NA; // |<DOC CDATA[ </DOC> ]]> should keep it
+      } else {
+        input = RECORD_MAYBE;
+      }
+      if (input == RECORD_MAYBE) {
+        if (firstMatchStart_ == NA) {
+          firstMatchStart_ = match.start();
+          firstMatchEnd_ = match.end();
+        }
+      }
+      state = nextState(state, input, match.start());
+      if (state == RECORD_ACCEPT) {
+        break;
+      }
+      bufPos = match.end();
+      s++;
+    }
+    if (state != CDATA_UNK) {
+      synched_ = true;
+    }
+    boolean matched = (firstMatchStart_ != NA)
+    && (state == RECORD_ACCEPT || state == CDATA_UNK);
+    if (matched) {
+      int endPos = includePat ? firstMatchEnd_ : firstMatchStart_;
+      bin_.reset();
+
+      for (long skiplen = endPos; skiplen > 0;) {
+        skiplen -= bin_.skip(skiplen); // Skip succeeds as we have read this
+        // buffer
+      }
+
+      pos_ += endPos;
+      if (outBufOrNull != null) {
+        outBufOrNull.writeBytes(sbuf.substring(0, endPos));
+      }
+    }
+    return matched;
+  }
+
+  // states
+  private final static int CDATA_IN = 10;
+  private final static int CDATA_OUT = 11;
+  private final static int CDATA_UNK = 12;
+  private final static int RECORD_ACCEPT = 13;
+  // inputs
+  private final static int CDATA_BEGIN = 20;
+  private final static int CDATA_END = 21;
+  private final static int RECORD_MAYBE = 22;
+
+  /* also updates firstMatchStart_; */
+  int nextState(int state, int input, int bufPos) {
+    switch (state) {
+    case CDATA_UNK:
+    case CDATA_OUT:
+      switch (input) {
+      case CDATA_BEGIN:
+        return CDATA_IN;
+      case CDATA_END:
+        if (state == CDATA_OUT) {
+          // System.out.println("buggy XML " + bufPos);
+        }
+        return CDATA_OUT;
+      case RECORD_MAYBE:
+        return (state == CDATA_UNK) ? CDATA_UNK : RECORD_ACCEPT;
+      }
+      break;
+    case CDATA_IN:
+      return (input == CDATA_END) ? CDATA_OUT : CDATA_IN;
+    }
+    throw new IllegalStateException(state + " " + input + " " + bufPos + " "
+        + splitName_);
+  }
+
+  Pattern makePatternCDataOrMark(String escapedMark) {
+    StringBuffer pat = new StringBuffer();
+    addGroup(pat, StreamUtil.regexpEscape("CDATA[")); // CDATA_BEGIN
+    addGroup(pat, StreamUtil.regexpEscape("]]>")); // CDATA_END
+    addGroup(pat, escapedMark); // RECORD_MAYBE
+    return Pattern.compile(pat.toString());
+  }
+
+  void addGroup(StringBuffer pat, String escapedGroup) {
+    if (pat.length() > 0) {
+      pat.append("|");
+    }
+    pat.append("(");
+    pat.append(escapedGroup);
+    pat.append(")");
+  }
+
+  boolean fastReadUntilMatch(String textPat, boolean includePat,
+      DataOutputBuffer outBufOrNull) throws IOException {
+    byte[] cpat = textPat.getBytes("UTF-8");
+    int m = 0;
+    boolean match = false;
+    int msup = cpat.length;
+    int LL = 120000 * 10;
+
+    bin_.mark(LL); // large number to invalidate mark
+    while (true) {
+      int b = bin_.read();
+      if (b == -1)
+        break;
+
+      byte c = (byte) b; // this assumes eight-bit matching. OK with UTF-8
+      if (c == cpat[m]) {
+        m++;
+        if (m == msup) {
+          match = true;
+          break;
+        }
+      } else {
+        bin_.mark(LL); // rest mark so we could jump back if we found a match
+        if (outBufOrNull != null) {
+          outBufOrNull.write(cpat, 0, m);
+          outBufOrNull.write(c);
+        }
+        pos_ += m + 1; // skip m chars, +1 for 'c'
+        m = 0;
+      }
+    }
+    if (!includePat && match) {
+      bin_.reset();
+    } else if (outBufOrNull != null) {
+      outBufOrNull.write(cpat);
+      pos_ += msup;
+    }
+    return match;
+  }
+
+  String checkJobGet(String prop) throws IOException {
+    String val = conf_.get(prop);
+    if (val == null) {
+      throw new IOException("JobConf: missing required property: " + prop);
+    }
+    return val;
+  }
+
+  String beginMark_;
+  String endMark_;
+
+  Pattern beginPat_;
+  Pattern endPat_;
+
+  boolean slowMatch_;
+  int lookAhead_; // bytes to read to try to synch CDATA/non-CDATA. Should be
+  // more than max record size
+  int maxRecSize_;
+
+  BufferedInputStream bin_; // Wrap FSDataInputStream for efficient backward
+  // seeks
+  long pos_; // Keep track on position with respect encapsulated
+  // FSDataInputStream
+
+  private final static int NA = -1;
+  int firstMatchStart_ = 0; // candidate record boundary. Might just be CDATA.
+  int firstMatchEnd_ = 0;
+
+  boolean synched_;
+
+  @Override
+  public Text getCurrentKey() throws IOException, InterruptedException {
+    return key;
+  }
+
+  @Override
+  public Text getCurrentValue() throws IOException, InterruptedException {
+    return value;
+  }
+
+  @Override
+  public void initialize(InputSplit arg0, TaskAttemptContext arg1)
+  throws IOException, InterruptedException {
+
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    key = createKey();
+    value = createValue();
+    return next(key, value);
+  }
+
+}

+ 144 - 0
hadoop-tools/hadoop-streaming/src/test/java/org/apache/hadoop/streaming/mapreduce/TestStreamXmlRecordReader.java

@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.streaming.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * This class tests StreamXmlRecordReader The test creates an XML file, uses
+ * StreamXmlRecordReader and compares the expected output against the generated
+ * output
+ */
+public class TestStreamXmlRecordReader {
+
+  private File INPUT_FILE;
+  private String input;
+  private String outputExpect;
+  Path OUTPUT_DIR;
+  FileSystem fs;
+
+  public TestStreamXmlRecordReader() throws IOException {
+    INPUT_FILE = new File("target/input.xml");
+    input = "<xmltag>\t\nroses.are.red\t\nviolets.are.blue\t\n"
+        + "bunnies.are.pink\t\n</xmltag>\t\n";
+    outputExpect = input;
+  }
+
+  protected void assertOutput(String expectedOutput, String output)
+      throws IOException {
+    String[] words = expectedOutput.split("\t\n");
+    Set<String> expectedWords = new HashSet<String>(Arrays.asList(words));
+    words = output.split("\t\n");
+    Set<String> returnedWords = new HashSet<String>(Arrays.asList(words));
+    assertTrue(returnedWords.containsAll(expectedWords));
+  }
+
+  protected void checkOutput() throws IOException {
+    File outFile = new File(OUTPUT_DIR.toString());
+    Path outPath = new Path(outFile.getAbsolutePath(), "part-r-00000");
+    String output = slurpHadoop(outPath, fs);
+    fs.delete(outPath, true);
+    outputExpect = "<PATTERN>\n" + outputExpect + "</PATTERN>";
+    System.err.println("outEx1=" + outputExpect);
+    System.err.println("  out1=" + output);
+    assertOutput(outputExpect, output);
+  }
+
+  private String slurpHadoop(Path p, FileSystem fs) throws IOException {
+    int len = (int) fs.getFileStatus(p).getLen();
+    byte[] buf = new byte[len];
+    FSDataInputStream in = fs.open(p);
+    String contents = null;
+    try {
+      in.readFully(in.getPos(), buf);
+      contents = new String(buf, "UTF-8");
+    } finally {
+      in.close();
+    }
+    return contents;
+  }
+
+  @Before
+  public void createInput() throws IOException {
+    FileOutputStream out = new FileOutputStream(INPUT_FILE.getAbsoluteFile());
+    String dummyXmlStartTag = "<PATTERN>\n";
+    String dummyXmlEndTag = "</PATTERN>\n";
+    out.write(dummyXmlStartTag.getBytes("UTF-8"));
+    out.write(input.getBytes("UTF-8"));
+    out.write(dummyXmlEndTag.getBytes("UTF-8"));
+    out.close();
+  }
+
+  @Test
+  public void testStreamXmlRecordReader() throws Exception {
+
+    Job job = new Job();
+    Configuration conf = job.getConfiguration();
+    job.setJarByClass(TestStreamXmlRecordReader.class);
+    job.setMapperClass(Mapper.class);
+    conf.set("stream.recordreader.class",
+        "org.apache.hadoop.streaming.mapreduce.StreamXmlRecordReader");
+    conf.set("stream.recordreader.begin", "<PATTERN>");
+    conf.set("stream.recordreader.end", "</PATTERN>");
+    job.setInputFormatClass(StreamInputFormat.class);
+    job.setMapOutputKeyClass(Text.class);
+    job.setMapOutputValueClass(Text.class);
+    job.setOutputKeyClass(Text.class);
+    job.setOutputValueClass(Text.class);
+    FileInputFormat.addInputPath(job, new Path("target/input.xml"));
+    OUTPUT_DIR = new Path("target/output");
+    fs = FileSystem.get(conf);
+    if (fs.exists(OUTPUT_DIR)) {
+      fs.delete(OUTPUT_DIR, true);
+    }
+    FileOutputFormat.setOutputPath(job, OUTPUT_DIR);
+    boolean ret = job.waitForCompletion(true);
+
+    assertEquals(true, ret);
+    checkOutput();
+
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    fs.delete(OUTPUT_DIR, true);
+  }
+
+}