Pārlūkot izejas kodu

MAPREDUCE-5756. CombineFileInputFormat.getSplits() including directories in its results. Contributed by Jason Dere

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1612400 13f79535-47bb-0310-9956-ffa450edef68
Jason Darrell Lowe 10 gadi atpakaļ
vecāks
revīzija
c2174a5536

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -172,6 +172,9 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-5957. AM throws ClassNotFoundException with job classloader
     enabled if custom output format/committer is used (Sangjin Lee via jlowe)
 
+    MAPREDUCE-5756. CombineFileInputFormat.getSplits() including directories
+    in its results (Jason Dere via jlowe)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/CombineFileInputFormat.java

@@ -579,7 +579,7 @@ public abstract class CombineFileInputFormat<K, V>
         blocks = new OneBlockInfo[0];
       } else {
 
-        if(locations.length == 0) {
+        if(locations.length == 0 && !stat.isDirectory()) {
           locations = new BlockLocation[] { new BlockLocation() };
         }
 

+ 55 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestCombineFileInputFormat.java

@@ -1274,6 +1274,61 @@ public class TestCombineFileInputFormat extends TestCase {
     fileSys.delete(file.getParent(), true);
   }
 
+  /**
+   * Test that directories do not get included as part of getSplits()
+   */
+  @Test
+  public void testGetSplitsWithDirectory() throws Exception {
+    MiniDFSCluster dfs = null;
+    try {
+      Configuration conf = new Configuration();
+      dfs = new MiniDFSCluster.Builder(conf).racks(rack1).hosts(hosts1)
+          .build();
+      dfs.waitActive();
+
+      dfs = new MiniDFSCluster.Builder(conf).racks(rack1).hosts(hosts1)
+          .build();
+      dfs.waitActive();
+
+      FileSystem fileSys = dfs.getFileSystem();
+
+      // Set up the following directory structure:
+      // /dir1/: directory
+      // /dir1/file: regular file
+      // /dir1/dir2/: directory
+      Path dir1 = new Path("/dir1");
+      Path file = new Path("/dir1/file1");
+      Path dir2 = new Path("/dir1/dir2");
+      if (!fileSys.mkdirs(dir1)) {
+        throw new IOException("Mkdirs failed to create " + dir1.toString());
+      }
+      FSDataOutputStream out = fileSys.create(file);
+      out.write(new byte[0]);
+      out.close();
+      if (!fileSys.mkdirs(dir2)) {
+        throw new IOException("Mkdirs failed to create " + dir2.toString());
+      }
+
+      // split it using a CombinedFile input format
+      DummyInputFormat inFormat = new DummyInputFormat();
+      Job job = Job.getInstance(conf);
+      FileInputFormat.setInputPaths(job, "/dir1");
+      List<InputSplit> splits = inFormat.getSplits(job);
+
+      // directories should be omitted from getSplits() - we should only see file1 and not dir2
+      assertEquals(1, splits.size());
+      CombineFileSplit fileSplit = (CombineFileSplit) splits.get(0);
+      assertEquals(1, fileSplit.getNumPaths());
+      assertEquals(file.getName(), fileSplit.getPath(0).getName());
+      assertEquals(0, fileSplit.getOffset(0));
+      assertEquals(0, fileSplit.getLength(0));
+    } finally {
+      if (dfs != null) {
+        dfs.shutdown();
+      }
+    }
+  }
+
   /**
    * Test when input files are from non-default file systems
    */