Bladeren bron

MAPREDUCE-4470. Fix TestCombineFileInputFormat.testForEmptyFile (ikatsov via tgraves)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1441494 13f79535-47bb-0310-9956-ffa450edef68
Thomas Graves 12 jaren geleden
bovenliggende
commit
86325f69da

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -31,6 +31,9 @@ Release 0.23.7 - UNRELEASED
     CombinefileInputFormat.getSplits() returns 0 split. (Bhallamudi 
     Venkata Siva Kamesh via tgraves)
 
+    MAPREDUCE-4470. Fix TestCombineFileInputFormat.testForEmptyFile (ikatsov 
+    via tgraves)
+
 Release 0.23.6 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 4 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java

@@ -499,6 +499,10 @@ public abstract class CombineFileInputFormat<K, V>
         blocks = new OneBlockInfo[0];
       } else {
 
+        if(locations.length == 0) {
+          locations = new BlockLocation[] { new BlockLocation() };
+        }
+
         if (!isSplitable) {
           // if the file is not splitable, just create the one block with
           // full file length

+ 39 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java

@@ -33,7 +33,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobContext;
@@ -136,6 +138,43 @@ public class TestFileInputFormat {
     }
   }
 
+  /**
+   * Test when the input file's length is 0.
+   */
+  @Test
+  public void testForEmptyFile() throws Exception {
+      Configuration conf = new Configuration();
+      FileSystem fileSys = FileSystem.get(conf);
+      Path file = new Path("test" + "/file");
+      FSDataOutputStream out = fileSys.create(file, true,
+              conf.getInt("io.file.buffer.size", 4096), (short) 1, (long) 1024);
+      out.write(new byte[0]);
+      out.close();
+
+      // split it using a File input format
+      DummyInputFormat inFormat = new DummyInputFormat();
+      Job job = Job.getInstance(conf);
+      FileInputFormat.setInputPaths(job, "test");
+      List<InputSplit> splits = inFormat.getSplits(job);
+      assertEquals(1, splits.size());
+      FileSplit fileSplit = (FileSplit) splits.get(0);
+      assertEquals(0, fileSplit.getLocations().length);
+      assertEquals(file.getName(), fileSplit.getPath().getName());
+      assertEquals(0, fileSplit.getStart());
+      assertEquals(0, fileSplit.getLength());
+
+      fileSys.delete(file.getParent(), true);
+  }
+
+  /** Dummy class to extend FileInputFormat*/
+  private class DummyInputFormat extends FileInputFormat<Text, Text> {
+    @Override
+    public RecordReader<Text,Text> createRecordReader(InputSplit split,
+        TaskAttemptContext context) throws IOException {
+      return null;
+    }
+  }
+
   private class FileInputFormatForTest<K, V> extends FileInputFormat<K, V> {
 
     long splitSize;