
svn merge -c 1499125 FIXES: MAPREDUCE-3193. FileInputFormat doesn't read files recursively in the input path dir. Contributed by Devaraj K

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1499131 13f79535-47bb-0310-9956-ffa450edef68
Jason Darrell Lowe 12 years ago
parent
commit
46d99e05d6

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -12,6 +12,9 @@ Release 0.23.10 - UNRELEASED
 
   BUG FIXES
 
+    MAPREDUCE-3193. FileInputFormat doesn't read files recursively in the
+    input path dir (Devaraj K via jlowe)
+
 Release 0.23.9 - 2013-07-08
 
   INCOMPATIBLE CHANGES

+ 5 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java

@@ -64,6 +64,10 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
 
   public static final String NUM_INPUT_FILES =
     org.apache.hadoop.mapreduce.lib.input.FileInputFormat.NUM_INPUT_FILES;
+  
+  public static final String INPUT_DIR_RECURSIVE = 
+    org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR_RECURSIVE;
+
 
   private static final double SPLIT_SLOP = 1.1;   // 10% slop
 
@@ -187,7 +191,7 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
     TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, job);
     
     // Whether we need to look recursively into the directory structure
-    boolean recursive = job.getBoolean("mapred.input.dir.recursive", false);
+    boolean recursive = job.getBoolean(INPUT_DIR_RECURSIVE, false);
     
     List<FileStatus> result = new ArrayList<FileStatus>();
     List<IOException> errors = new ArrayList<IOException>();
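
Note: with this change, old-API (org.apache.hadoop.mapred) jobs can refer to the named constant instead of the bare "mapred.input.dir.recursive" string. A minimal sketch of enabling recursive input listing; the job class and input path below are illustrative, not part of this commit:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

public class OldApiRecursiveInput {
  public static JobConf configure() {
    JobConf job = new JobConf();
    job.setInputFormat(TextInputFormat.class);
    // Same effect as job.setBoolean("mapred.input.dir.recursive", true),
    // but via the constant this patch adds to the old-API FileInputFormat.
    job.setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, true);
    // Hypothetical input directory containing nested sub-directories.
    FileInputFormat.setInputPaths(job, new Path("/data/input"));
    return job;
  }
}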

+ 56 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java

@@ -64,6 +64,8 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
     "mapreduce.input.pathFilter.class";
   public static final String NUM_INPUT_FILES =
     "mapreduce.input.fileinputformat.numinputfiles";
+  public static final String INPUT_DIR_RECURSIVE =
+    "mapreduce.input.fileinputformat.input.dir.recursive";
 
   private static final Log LOG = LogFactory.getLog(FileInputFormat.class);
 
@@ -97,6 +99,27 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
       return true;
     }
   }
+  
+  /**
+   * @param job
+   *          the job to modify
+   * @param inputDirRecursive
+   *          whether the input directories should be scanned recursively
+   */
+  public static void setInputDirRecursive(Job job,
+      boolean inputDirRecursive) {
+    job.getConfiguration().setBoolean(INPUT_DIR_RECURSIVE,
+        inputDirRecursive);
+  }
+ 
+  /**
+   * @param job
+   *          the job to look at.
+   * @return whether the files in the input paths should be read recursively
+   */
+  public static boolean getInputDirRecursive(JobContext job) {
+    return job.getConfiguration().getBoolean(INPUT_DIR_RECURSIVE,
+        false);
+  }
 
   /**
    * Get the lower bound on split size imposed by the format.
@@ -205,6 +228,9 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
     TokenCache.obtainTokensForNamenodes(job.getCredentials(), dirs, 
                                         job.getConfiguration());
 
+    // Whether we need to look recursively into the directory structure
+    boolean recursive = getInputDirRecursive(job);
+    
     List<IOException> errors = new ArrayList<IOException>();
     
     // creates a MultiPathFilter with the hiddenFileFilter and the
@@ -230,7 +256,11 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
           if (globStat.isDirectory()) {
             for(FileStatus stat: fs.listStatus(globStat.getPath(),
                 inputFilter)) {
-              result.add(stat);
+              if (recursive && stat.isDirectory()) {
+                addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
+              } else {
+                result.add(stat);
+              }
             }          
           } else {
             result.add(globStat);
@@ -246,6 +276,31 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
     return result;
   }
   
+  /**
+   * Add files in the input path recursively into the results.
+   * @param result
+   *          The List to store all files.
+   * @param fs
+   *          The FileSystem.
+   * @param path
+   *          The input path.
+   * @param inputFilter
+   *          The input filter that can be used to filter files/dirs. 
+   * @throws IOException
+   */
+  protected void addInputPathRecursively(List<FileStatus> result,
+      FileSystem fs, Path path, PathFilter inputFilter) 
+      throws IOException {
+    for(FileStatus stat: fs.listStatus(path, inputFilter)) {
+      if (stat.isDirectory()) {
+        addInputPathRecursively(result, fs, stat.getPath(), inputFilter);
+      } else {
+        result.add(stat);
+      }
+    }          
+  }
+  
+  
   /**
    * A factory that makes the split for this class. It can be overridden
    * by sub-classes to make sub-types
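
On the new API, the setter/getter pair added above avoids touching the configuration key at all. A minimal sketch, again with a hypothetical input path:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class NewApiRecursiveInput {
  public static Job configure() throws Exception {
    Job job = Job.getInstance(new Configuration());
    // With recursion enabled, sub-directories under each input path are
    // descended into instead of being returned as input "files".
    FileInputFormat.setInputDirRecursive(job, true);
    FileInputFormat.addInputPath(job, new Path("/data/input"));
    return job;
  }
}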

+ 3 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/util/ConfigUtil.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
 
@@ -523,6 +524,8 @@ public class ConfigUtil {
       MRJobConfig.MAPREDUCE_JOB_USER_CLASSPATH_FIRST);
     Configuration.addDeprecation(JTConfig.JT_MAX_JOB_SPLIT_METAINFO_SIZE,
         MRJobConfig.SPLIT_METAINFO_MAXSIZE);
+    Configuration.addDeprecation("mapred.input.dir.recursive",
+        FileInputFormat.INPUT_DIR_RECURSIVE);
   }
 }
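
The deprecation entry keeps existing jobs that set the old key working unchanged. A small round-trip sketch (assuming the MapReduce deprecation table has been registered, which ConfigUtil.loadResources() does when the job classes are first loaded); the commit's testNumInputFilesRecursively test exercises the same mapping:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class DeprecatedRecursiveKey {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Old-style key; the mapping above forwards it to
    // mapreduce.input.fileinputformat.input.dir.recursive.
    conf.setBoolean("mapred.input.dir.recursive", true);
    Job job = Job.getInstance(conf);
    System.out.println(FileInputFormat.getInputDirRecursive(job)); // prints true
  }
}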
 

+ 120 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java

@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.lib.input;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.List;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.junit.Test;
+
+public class TestFileInputFormat {
+
+  @Test
+  public void testNumInputFilesRecursively() throws Exception {
+    Configuration conf = getConfiguration();
+    conf.set(FileInputFormat.INPUT_DIR_RECURSIVE, "true");
+    Job job = Job.getInstance(conf);
+    FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
+    List<InputSplit> splits = fileInputFormat.getSplits(job);
+    Assert.assertEquals("Input splits are not correct", 3, splits.size());
+    Assert.assertEquals("test:/a1/a2/file2", ((FileSplit) splits.get(0))
+        .getPath().toString());
+    Assert.assertEquals("test:/a1/a2/file3", ((FileSplit) splits.get(1))
+        .getPath().toString());
+    Assert.assertEquals("test:/a1/file1", ((FileSplit) splits.get(2)).getPath()
+        .toString());
+    
+    // Using the deprecated configuration
+    conf = getConfiguration();
+    conf.set("mapred.input.dir.recursive", "true");
+    job = Job.getInstance(conf);
+    splits = fileInputFormat.getSplits(job);
+    Assert.assertEquals("Input splits are not correct", 3, splits.size());
+    Assert.assertEquals("test:/a1/a2/file2", ((FileSplit) splits.get(0))
+        .getPath().toString());
+    Assert.assertEquals("test:/a1/a2/file3", ((FileSplit) splits.get(1))
+        .getPath().toString());
+    Assert.assertEquals("test:/a1/file1", ((FileSplit) splits.get(2)).getPath()
+        .toString());
+  }
+
+  @Test
+  public void testNumInputFilesWithoutRecursively() throws Exception {
+    Configuration conf = getConfiguration();
+    Job job = Job.getInstance(conf);
+    FileInputFormat<?, ?> fileInputFormat = new TextInputFormat();
+    List<InputSplit> splits = fileInputFormat.getSplits(job);
+    Assert.assertEquals("Input splits are not correct", 2, splits.size());
+    Assert.assertEquals("test:/a1/a2", ((FileSplit) splits.get(0)).getPath()
+        .toString());
+    Assert.assertEquals("test:/a1/file1", ((FileSplit) splits.get(1)).getPath()
+        .toString());
+  }
+
+  private Configuration getConfiguration() {
+    Configuration conf = new Configuration();
+    conf.set("fs.test.impl.disable.cache", "true");
+    conf.setClass("fs.test.impl", MockFileSystem.class, FileSystem.class);
+    conf.set(FileInputFormat.INPUT_DIR, "test:///a1");
+    return conf;
+  }
+
+  static class MockFileSystem extends RawLocalFileSystem {
+
+    @Override
+    public FileStatus[] listStatus(Path f) throws FileNotFoundException,
+        IOException {
+      if (f.toString().equals("test:/a1")) {
+        return new FileStatus[] {
+            new FileStatus(10, true, 1, 150, 150, new Path("test:/a1/a2")),
+            new FileStatus(10, false, 1, 150, 150, new Path("test:/a1/file1")) };
+      } else if (f.toString().equals("test:/a1/a2")) {
+        return new FileStatus[] {
+            new FileStatus(10, false, 1, 150, 150,
+                new Path("test:/a1/a2/file2")),
+            new FileStatus(10, false, 1, 151, 150,
+                new Path("test:/a1/a2/file3")) };
+      }
+      return new FileStatus[0];
+    }
+
+    @Override
+    public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
+        throws IOException {
+      return new FileStatus[] { new FileStatus(10, true, 1, 150, 150,
+          pathPattern) };
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path f, PathFilter filter)
+        throws FileNotFoundException, IOException {
+      return this.listStatus(f);
+    }
+  }
+}

+ 1 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java

@@ -189,7 +189,7 @@ public class TestFileInputFormat extends TestCase {
         + "directory with directories inside.", exceptionThrown);
 
     // Enable multi-level/recursive inputs
-    job.setBoolean("mapred.input.dir.recursive", true);
+    job.setBoolean(FileInputFormat.INPUT_DIR_RECURSIVE, true);
     InputSplit[] splits = inFormat.getSplits(job, 1);
     assertEquals(splits.length, 2);
   }