瀏覽代碼

HADOOP-1818. Fix MultiFileInputFormat so that it does not return
empty splits. Contributed by Thomas Friol.


git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@574546 13f79535-47bb-0310-9956-ffa450edef68

Enis Soztutar 17 年之前
父節點
當前提交
cc61308137

+ 3 - 0
CHANGES.txt

@@ -108,6 +108,9 @@ Trunk (unreleased changes)
     HADOOP-1853.  Fix contrib/streaming to accept multiple -cacheFile
     options.  (Prachi Gupta via cutting)
 
+    HADOOP-1818. Fix MultiFileInputFormat so that it does not return 
+    empty splits when numPaths < numSplits.  (Thomas Friol via enis)
+
   IMPROVEMENTS
 
     HADOOP-1266. Remove dependency of package org.apache.hadoop.net on 

+ 31 - 24
src/java/org/apache/hadoop/mapred/MultiFileInputFormat.java

@@ -19,6 +19,8 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -42,35 +44,40 @@ public abstract class MultiFileInputFormat<K extends WritableComparable,
   public InputSplit[] getSplits(JobConf job, int numSplits) 
     throws IOException {
     
-    MultiFileSplit[] splits = new MultiFileSplit[numSplits];
     Path[] paths = listPaths(job);
-    long[] lengths = new long[paths.length];
-    long totLength = 0;
-    for(int i=0; i<paths.length; i++) {
-      FileSystem fs = paths[i].getFileSystem(job);
-      lengths[i] = fs.getContentLength(paths[i]);
-      totLength += lengths[i];
-    }
-    float avgLengthPerSplit = ((float)totLength) / numSplits;
-    long cumulativeLength = 0;
+    List<MultiFileSplit> splits = new ArrayList<MultiFileSplit>(Math.min(numSplits, paths.length));
+    if (paths.length != 0) {
+      // HADOOP-1818: Manage splits only if there are paths
+      long[] lengths = new long[paths.length];
+      long totLength = 0;
+      for(int i=0; i<paths.length; i++) {
+        FileSystem fs = paths[i].getFileSystem(job);
+        lengths[i] = fs.getContentLength(paths[i]);
+        totLength += lengths[i];
+      }
+      float avgLengthPerSplit = ((float)totLength) / numSplits;
+      long cumulativeLength = 0;
 
-    int startIndex = 0;
+      int startIndex = 0;
 
-    for(int i=0; i<numSplits; i++) {
-      int splitSize = findSize(i, avgLengthPerSplit, cumulativeLength
-          , startIndex, lengths);
-      Path[] splitPaths = new Path[splitSize];
-      long[] splitLengths = new long[splitSize];
-      System.arraycopy(paths, startIndex, splitPaths , 0, splitSize);
-      System.arraycopy(lengths, startIndex, splitLengths , 0, splitSize);
-      splits[i] = new MultiFileSplit(job, splitPaths, splitLengths);
-      startIndex += splitSize;
-      for(long l: splitLengths) {
-        cumulativeLength += l;
+      for(int i=0; i<numSplits; i++) {
+        int splitSize = findSize(i, avgLengthPerSplit, cumulativeLength
+            , startIndex, lengths);
+        if (splitSize != 0) {
+          // HADOOP-1818: Manage split only if split size is not equals to 0
+          Path[] splitPaths = new Path[splitSize];
+          long[] splitLengths = new long[splitSize];
+          System.arraycopy(paths, startIndex, splitPaths , 0, splitSize);
+          System.arraycopy(lengths, startIndex, splitLengths , 0, splitSize);
+          splits.add(new MultiFileSplit(job, splitPaths, splitLengths));
+          startIndex += splitSize;
+          for(long l: splitLengths) {
+            cumulativeLength += l;
+          }
+        }
       }
     }
-    return splits;
-    
+    return splits.toArray(new MultiFileSplit[splits.size()]);    
   }
 
   private int findSize(int splitIndex, float avgLengthPerSplit

+ 19 - 3
src/test/org/apache/hadoop/mapred/TestMultiFileInputFormat.java

@@ -33,6 +33,7 @@ import junit.framework.TestCase;
 public class TestMultiFileInputFormat extends TestCase{
 
   private static JobConf job = new JobConf();
+
   private static final Log LOG = LogFactory.getLog(TestMultiFileInputFormat.class);
   
   private static final int MAX_SPLIT_COUNT  = 10000;
@@ -53,7 +54,7 @@ public class TestMultiFileInputFormat extends TestCase{
     }
   }
   
-  private Path initFiles(FileSystem fs, int numFiles) throws IOException{
+  private Path initFiles(FileSystem fs, int numFiles, int numBytes) throws IOException{
     Path dir = new Path(System.getProperty("test.build.data",".") + "/mapred");
     Path multiFileDir = new Path(dir, "test.multifile");
     fs.delete(multiFileDir);
@@ -62,7 +63,9 @@ public class TestMultiFileInputFormat extends TestCase{
     for(int i=0; i<numFiles ;i++) {
       Path path = new Path(multiFileDir, "file_" + i);
        FSDataOutputStream out = fs.create(path);
-       int numBytes = rand.nextInt(MAX_BYTES);
+       if (numBytes == -1) {
+         numBytes = rand.nextInt(MAX_BYTES);
+       }
        for(int j=0; j< numBytes; j++) {
          out.write(rand.nextInt());
        }
@@ -92,7 +95,7 @@ public class TestMultiFileInputFormat extends TestCase{
     for(int numFiles = 1; numFiles< MAX_NUM_FILES ; 
       numFiles+= (NUM_FILES_INCR / 2) + rand.nextInt(NUM_FILES_INCR / 2)) {
       
-      Path dir = initFiles(fs, numFiles);
+      Path dir = initFiles(fs, numFiles, -1);
       BitSet bits = new BitSet(numFiles);
       for(int i=1;i< MAX_SPLIT_COUNT ;i+= rand.nextInt(SPLIT_COUNT_INCR) + 1) {
         LOG.info("Running for Num Files=" + numFiles + ", split count=" + i);
@@ -121,6 +124,19 @@ public class TestMultiFileInputFormat extends TestCase{
     LOG.info("Test Finished");
   }
   
+  public void testFormatWithLessPathsThanSplits() throws Exception {
+    MultiFileInputFormat format = new DummyMultiFileInputFormat();
+    FileSystem fs = FileSystem.getLocal(job);     
+    
+    // Test with no path
+    initFiles(fs, 0, -1);    
+    assertEquals(0, format.getSplits(job, 2).length);
+    
+    // Test with 2 path and 4 splits
+    initFiles(fs, 2, 500);
+    assertEquals(2, format.getSplits(job, 4).length);
+  }
+  
   public static void main(String[] args) throws Exception{
     TestMultiFileInputFormat test = new TestMultiFileInputFormat();
     test.testFormat();