瀏覽代碼

MAPREDUCE-7086. Add config to allow FileInputFormat to ignore directories when recursive=false. Contributed by Sergey Shelukhin

(cherry picked from commit 68c6ec719da8e79ada31c8f3a82124f90b9a71fd)
Jason Lowe 7 年之前
父節點
當前提交
daa739fa13

+ 18 - 7
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java

@@ -78,10 +78,13 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
 
   public static final String NUM_INPUT_FILES =
     org.apache.hadoop.mapreduce.lib.input.FileInputFormat.NUM_INPUT_FILES;
-  
+
   public static final String INPUT_DIR_RECURSIVE = 
     org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE;
 
+  public static final String INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS =
+    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS;
+
 
   private static final double SPLIT_SLOP = 1.1;   // 10% slop
 
@@ -319,16 +322,24 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
   public InputSplit[] getSplits(JobConf job, int numSplits)
     throws IOException {
     StopWatch sw = new StopWatch().start();
-    FileStatus[] files = listStatus(job);
-    
+    FileStatus[] stats = listStatus(job);
+
     // Save the number of input files for metrics/loadgen
-    job.setLong(NUM_INPUT_FILES, files.length);
+    job.setLong(NUM_INPUT_FILES, stats.length);
     long totalSize = 0;                           // compute total size
-    for (FileStatus file: files) {                // check we have valid files
+    boolean ignoreDirs = !job.getBoolean(INPUT_DIR_RECURSIVE, false)
+      && job.getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
+
+    List<FileStatus> files = new ArrayList<>(stats.length);
+    for (FileStatus file: stats) {                // check we have valid files
       if (file.isDirectory()) {
-        throw new IOException("Not a file: "+ file.getPath());
+        if (!ignoreDirs) {
+          throw new IOException("Not a file: "+ file.getPath());
+        }
+      } else {
+        files.add(file);
+        totalSize += file.getLen();
       }
-      totalSize += file.getLen();
     }
 
     long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);

+ 8 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java

@@ -76,6 +76,8 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
     "mapreduce.input.fileinputformat.numinputfiles";
   public static final String INPUT_DIR_RECURSIVE =
     "mapreduce.input.fileinputformat.input.dir.recursive";
+  public static final String INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS =
+    "mapreduce.input.fileinputformat.input.dir.nonrecursive.ignore.subdirs";
   public static final String LIST_STATUS_NUM_THREADS =
       "mapreduce.input.fileinputformat.list-status.num-threads";
   public static final int DEFAULT_LIST_STATUS_NUM_THREADS = 1;
@@ -392,7 +394,13 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
     // generate splits
     List<InputSplit> splits = new ArrayList<InputSplit>();
     List<FileStatus> files = listStatus(job);
+
+    boolean ignoreDirs = !getInputDirRecursive(job)
+      && job.getConfiguration().getBoolean(INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, false);
     for (FileStatus file: files) {
+      if (ignoreDirs && file.isDirectory()) {
+        continue;
+      }
       Path path = file.getPath();
       long length = file.getLen();
       if (length != 0) {

+ 16 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java

@@ -102,7 +102,22 @@ public class TestFileInputFormat {
         1, mockFs.numListLocatedStatusCalls);
     FileSystem.closeAll();
   }
-  
+
+  @Test
+  public void testIgnoreDirs() throws Exception {
+    Configuration conf = getConfiguration();
+    conf.setBoolean(FileInputFormat.INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, true);
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+    conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, "test:///a1");
+    MockFileSystem mockFs = (MockFileSystem) new Path("test:///").getFileSystem(conf);
+    JobConf job = new JobConf(conf);
+    TextInputFormat fileInputFormat = new TextInputFormat();
+    fileInputFormat.configure(job);
+    InputSplit[] splits = fileInputFormat.getSplits(job, 1);
+    Assert.assertEquals("Input splits are not correct", 1, splits.length);
+    FileSystem.closeAll();
+  }
+
   @Test
   public void testSplitLocationInfo() throws Exception {
     Configuration conf = getConfiguration();

+ 12 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java

@@ -123,6 +123,18 @@ public class TestFileInputFormat {
     verifySplits(Lists.newArrayList("test:/a1/a2", "test:/a1/file1"), splits);
   }
 
+  @Test
+  public void testNumInputFilesIgnoreDirs() throws Exception {
+    Configuration conf = getConfiguration();
+    conf.setInt(FileInputFormat.LIST_STATUS_NUM_THREADS, numThreads);
+    conf.setBoolean(FileInputFormat.INPUT_DIR_NONRECURSIVE_IGNORE_SUBDIRS, true);
+    Job job = Job.getInstance(conf);
+    FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
+    List<InputSplit> splits = fileInputFormat.getSplits(job);
+    Assert.assertEquals("Input splits are not correct", 1, splits.size());
+    verifySplits(Lists.newArrayList("test:/a1/file1"), splits);
+  }
+
   @Test
   public void testListLocatedStatus() throws Exception {
     Configuration conf = getConfiguration();