Browse Source

Merge -c 1424815 from branch-1 to branch-1.1 to fix MAPREDUCE-4888. Fixed NLineInputFormat one-off error which dropped data. Contributed by Vinod K V.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.1@1424817 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 12 years ago
parent
commit
f6d70385ac

+ 3 - 0
CHANGES.txt

@@ -65,6 +65,9 @@ Release 1.1.2 - 2012.12.07
 
     MAPREDUCE-4859. Fixed TestRecoveryManager. (acmurthy) 
 
+    MAPREDUCE-4888. Fixed NLineInputFormat one-off error which dropped data.
+    (vinodkv via acmurthy) 
+
 Release 1.1.1 - 2012.11.18
 
   INCOMPATIBLE CHANGES

+ 19 - 13
src/mapred/org/apache/hadoop/mapred/lib/NLineInputFormat.java

@@ -97,25 +97,14 @@ public class NLineInputFormat extends FileInputFormat<LongWritable, Text>
           numLines++;
           length += num;
           if (numLines == N) {
-            // NLineInputFormat uses LineRecordReader, which always reads (and
-            // consumes) at least one character out of its upper split
-            // boundary. So to make sure that each mapper gets N lines, we
-            // move back the upper split limits of each split by one character 
-            // here.
-            if (begin == 0) {
-              splits.add(new FileSplit(fileName, begin, length - 1,
-                new String[] {}));
-            } else {
-              splits.add(new FileSplit(fileName, begin - 1, length,
-                new String[] {}));
-            }
+            splits.add(createFileSplit(fileName, begin, length));
             begin += length;
             length = 0;
             numLines = 0;
           }
         }
         if (numLines != 0) {
-          splits.add(new FileSplit(fileName, begin, length, new String[]{}));
+          splits.add(createFileSplit(fileName, begin, length));
         }
    
       } finally {
@@ -127,6 +116,23 @@ public class NLineInputFormat extends FileInputFormat<LongWritable, Text>
     return splits.toArray(new FileSplit[splits.size()]);
   }
 
+  /**
+   * NLineInputFormat uses LineRecordReader, which always reads
+   * (and consumes) at least one character out of its upper split
+   * boundary. So to make sure that each mapper gets N lines, we
+   * move back the upper split limits of each split 
+   * by one character here.
+   * @param fileName  Path of file
+   * @param begin  the position of the first byte in the file to process
+   * @param length  number of bytes in InputSplit
+   * @return  FileSplit
+   */
+  protected static FileSplit createFileSplit(Path fileName, long begin, long length) {
+    return (begin == 0) 
+    ? new FileSplit(fileName, begin, length - 1, new String[] {})
+    : new FileSplit(fileName, begin - 1, length, new String[] {});
+  }
+
   public void configure(JobConf conf) {
     N = conf.getInt("mapred.line.input.format.linespermap", 1);
   }

+ 22 - 10
src/test/org/apache/hadoop/mapred/lib/TestLineInputFormat.java

@@ -48,9 +48,6 @@ public class TestLineInputFormat extends TestCase {
     JobConf job = new JobConf();
     Path file = new Path(workDir, "test.txt");
 
-    int seed = new Random().nextInt();
-    Random random = new Random(seed);
-
     localFs.delete(workDir, true);
     FileInputFormat.setInputPaths(job, workDir);
     int numLinesPerMap = 5;
@@ -58,7 +55,8 @@ public class TestLineInputFormat extends TestCase {
 
     // for a variety of lengths
     for (int length = 0; length < MAX_LENGTH;
-         length += random.nextInt(MAX_LENGTH/10) + 1) {
+         length += 1) {
+      System.out.println("Processing file of length "+length);
       // create a file with length entries
       Writer writer = new OutputStreamWriter(localFs.create(file));
       try {
@@ -69,14 +67,21 @@ public class TestLineInputFormat extends TestCase {
       } finally {
         writer.close();
       }
-      checkFormat(job, numLinesPerMap);
+      int lastN = 0;
+      if (length != 0) {
+        lastN = length % numLinesPerMap;
+        if (lastN == 0) {
+          lastN = numLinesPerMap;
+        }
+      }
+      checkFormat(job, numLinesPerMap, lastN);
     }
   }
 
   // A reporter that does nothing
   private static final Reporter voidReporter = Reporter.NULL;
   
-  void checkFormat(JobConf job, int expectedN) throws IOException{
+  void checkFormat(JobConf job, int expectedN, int lastN) throws IOException{
     NLineInputFormat format = new NLineInputFormat();
     format.configure(job);
     int ignoredNumSplits = 1;
@@ -84,7 +89,8 @@ public class TestLineInputFormat extends TestCase {
 
     // check all splits except last one
     int count = 0;
-    for (int j = 0; j < splits.length -1; j++) {
+    for (int j = 0; j < splits.length; j++) {
+      System.out.println("Processing split "+splits[j]);
       assertEquals("There are no split locations", 0,
                    splits[j].getLocations().length);
       RecordReader<LongWritable, Text> reader =
@@ -102,16 +108,22 @@ public class TestLineInputFormat extends TestCase {
       try {
         count = 0;
         while (reader.next(key, value)) {
+          System.out.println("Got "+key+" "+value+" at count "+count+" of split "+j);
           count++;
         }
       } finally {
         reader.close();
       }
-      assertEquals("number of lines in split is " + expectedN ,
-                   expectedN, count);
+      if ( j == splits.length - 1) {
+        assertEquals("number of lines in split(" + j + ") is wrong" ,
+                     lastN, count);
+      } else {
+        assertEquals("number of lines in split(" + j + ") is wrong" ,
+                     expectedN, count);
+      }
     }
   }
-  
+
   public static void main(String[] args) throws Exception {
     new TestLineInputFormat().testFormat();
   }