Browse Source

HADOOP-3144. Improve robustness of LineRecordReader by defining a maximum
line length (mapred.linerecordreader.maxlength), thereby avoiding reading
too far into the following split. Contributed by Zheng Shao.



git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@653906 13f79535-47bb-0310-9956-ffa450edef68

Christopher Douglas 17 years ago
parent
commit
f46fefbebf

+ 4 - 0
CHANGES.txt

@@ -85,6 +85,10 @@ Trunk (unreleased changes)
     HADOOP-3345. Enhance the hudson-test-patch target to cleanup messages,
     fix minor defects, and add eclipse plugin and python unit tests. (nigel)
 
+    HADOOP-3144. Improve robustness of LineRecordReader by defining a maximum
+    line length (mapred.linerecordreader.maxlength), thereby avoiding reading
+    too far into the following split. (Zheng Shao via cdouglas)
+
   OPTIMIZATIONS
 
     HADOOP-3274. The default constructor of BytesWritable creates empty 

+ 67 - 11
src/java/org/apache/hadoop/mapred/LineRecordReader.java

@@ -30,16 +30,22 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
 
 /**
  * Treats keys as offset in file and value as line. 
  */
 public class LineRecordReader implements RecordReader<LongWritable, Text> {
+  private static final Log LOG
+    = LogFactory.getLog(LineRecordReader.class.getName());
+
   private CompressionCodecFactory compressionCodecs = null;
   private long start;
   private long pos;
   private long end;
   private LineReader in;
+  int maxLineLength;
 
   /**
    * A class that provides a line reader from an input stream.
@@ -100,15 +106,19 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
     /**
      * Read from the InputStream into the given Text.
      * @param str the object to store the given line
+     * @param maxLineLength the maximum number of bytes to store into str.
+     * @param maxBytesToConsume the maximum number of bytes to consume in this call.
      * @return the number of bytes read including the newline
      * @throws IOException if the underlying stream throws
      */
-    public int readLine(Text str) throws IOException {
+    public int readLine(Text str, int maxLineLength,
+                        int maxBytesToConsume) throws IOException {
       str.clear();
       boolean hadFinalNewline = false;
       boolean hadFinalReturn = false;
       boolean hitEndOfFile = false;
       int startPosn = bufferPosn;
+      long bytesConsumed = 0;
       outerLoop: while (true) {
         if (bufferPosn >= bufferLength) {
           if (!backfill()) {
@@ -125,7 +135,7 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
             break outerLoop;
           case '\r':
             if (hadFinalReturn) {
-              // leave this \n in the stream, so we'll get it next time
+              // leave this \r in the stream, so we'll get it next time
               break outerLoop;
             }
             hadFinalReturn = true;
@@ -136,24 +146,55 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
             }
           }        
         }
+        bytesConsumed += bufferPosn - startPosn;
         int length = bufferPosn - startPosn - (hadFinalReturn ? 1 : 0);
+        length = (int)Math.min(length, maxLineLength - str.getLength());
         if (length >= 0) {
           str.append(buffer, startPosn, length);
         }
+        if (bytesConsumed >= maxBytesToConsume) {
+          return (int)Math.min(bytesConsumed, (long)Integer.MAX_VALUE);
+        }
       }
       int newlineLength = (hadFinalNewline ? 1 : 0) + (hadFinalReturn ? 1 : 0);
       if (!hitEndOfFile) {
+        bytesConsumed += bufferPosn - startPosn;
         int length = bufferPosn - startPosn - newlineLength;
+        length = (int)Math.min(length, maxLineLength - str.getLength());
         if (length > 0) {
           str.append(buffer, startPosn, length);
         }
       }
-      return str.getLength() + newlineLength;
+      return (int)Math.min(bytesConsumed, (long)Integer.MAX_VALUE);
+    }
+
+    /**
+     * Read from the InputStream into the given Text.
+     * @param str the object to store the given line
+     * @param maxLineLength the maximum number of bytes to store into str.
+     * @return the number of bytes read including the newline
+     * @throws IOException if the underlying stream throws
+     */
+    public int readLine(Text str, int maxLineLength) throws IOException {
+      return readLine(str, maxLineLength, Integer.MAX_VALUE);
+  }
+
+    /**
+     * Read from the InputStream into the given Text.
+     * @param str the object to store the given line
+     * @return the number of bytes read including the newline
+     * @throws IOException if the underlying stream throws
+     */
+    public int readLine(Text str) throws IOException {
+      return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
     }
+
   }
 
   public LineRecordReader(Configuration job, 
                           FileSplit split) throws IOException {
+    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
+                                    Integer.MAX_VALUE);
     start = split.getStart();
     end = start + split.getLength();
     final Path file = split.getPath();
@@ -176,12 +217,15 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
       in = new LineReader(fileIn, job);
     }
     if (skipFirstLine) {  // skip first line and re-establish "start".
-      start += in.readLine(new Text());
+      start += in.readLine(new Text(), 0,
+                           (int)Math.min((long)Integer.MAX_VALUE, end - start));
     }
     this.pos = start;
   }
   
-  public LineRecordReader(InputStream in, long offset, long endOffset) {
+  public LineRecordReader(InputStream in, long offset, long endOffset,
+                          int maxLineLength) {
+    this.maxLineLength = maxLineLength;
     this.in = new LineReader(in, LineReader.DEFAULT_BUFFER_SIZE);
     this.start = offset;
     this.pos = offset;
@@ -191,6 +235,8 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
   public LineRecordReader(InputStream in, long offset, long endOffset, 
                           Configuration job) 
     throws IOException{
+    this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
+                                    Integer.MAX_VALUE);
     this.in = new LineReader(in, job);
     this.start = offset;
     this.pos = offset;
@@ -208,15 +254,25 @@ public class LineRecordReader implements RecordReader<LongWritable, Text> {
   /** Read a line. */
   public synchronized boolean next(LongWritable key, Text value)
     throws IOException {
-    if (pos >= end)
-      return false;
 
-    key.set(pos);           // key is position
-    int newSize = in.readLine(value);
-    if (newSize > 0) {
+    while (pos < end) {
+      key.set(pos);
+
+      int newSize = in.readLine(value, maxLineLength,
+                                Math.max((int)Math.min(Integer.MAX_VALUE, end-pos),
+                                         maxLineLength));
+      if (newSize == 0) {
+        return false;
+      }
       pos += newSize;
-      return true;
+      if (newSize < maxLineLength) {
+        return true;
+      }
+
+      // line too long. try again
+      LOG.info("Skipped line of size " + newSize + " at pos " + (pos - newSize));
     }
+
     return false;
   }
 

+ 18 - 0
src/test/org/apache/hadoop/mapred/TestTextInputFormat.java

@@ -162,6 +162,24 @@ public class TestTextInputFormat extends TestCase {
     assertEquals("end of file", 0, in.readLine(out));
   }
   
+  public void testMaxLineLength() throws Exception {
+    LineReader in = makeStream("a\nbb\n\nccc\rdddd\r\neeeee");
+    Text out = new Text();
+    in.readLine(out, 1);
+    assertEquals("line1 length", 1, out.getLength());
+    in.readLine(out, 1);
+    assertEquals("line2 length", 1, out.getLength());
+    in.readLine(out, 1);
+    assertEquals("line3 length", 0, out.getLength());
+    in.readLine(out, 3);
+    assertEquals("line4 length", 3, out.getLength());
+    in.readLine(out, 10);
+    assertEquals("line5 length", 4, out.getLength());
+    in.readLine(out, 8);
+    assertEquals("line5 length", 5, out.getLength());
+    assertEquals("end of file", 0, in.readLine(out));
+  }
+
   private static void writeFile(FileSystem fs, Path name, 
                                 CompressionCodec codec,
                                 String contents) throws IOException {