
HADOOP-3095. Speed up split generation in the FileInputSplit,
especially for non-HDFS file systems. Deprecates
InputFormat.validateInput. Contributed by Tom White.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@663370 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 17 years ago
parent
commit
3a6ec076a1
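
In practice, the commit moves InputFormat implementors from Path-based listings to FileStatus-based ones, so split generation reuses the metadata returned by the directory listing instead of issuing one getFileStatus() call per input file. A minimal sketch of a subclass written against the new API (the class name and the length filter are illustrative, not part of this commit):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;

// Hypothetical subclass: override listStatus, not the deprecated listPaths,
// so per-file metadata comes straight from the listing and no extra
// NameNode round trips are needed during split generation.
public class NonEmptyTextInputFormat extends TextInputFormat {
  @Override
  protected FileStatus[] listStatus(JobConf job) throws IOException {
    List<FileStatus> nonEmpty = new ArrayList<FileStatus>();
    for (FileStatus stat : super.listStatus(job)) {
      if (stat.getLen() > 0) {       // length is already in the FileStatus
        nonEmpty.add(stat);
      }
    }
    return nonEmpty.toArray(new FileStatus[nonEmpty.size()]);
  }
}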

+ 4 - 0
CHANGES.txt

@@ -248,6 +248,10 @@ Trunk (unreleased changes)
     HADOOP-1702. Reduce buffer copies when data is written to DFS. 
     DataNodes take 30% less CPU while writing data. (rangadi)
 
+    HADOOP-3095. Speed up split generation in the FileInputSplit,
+    especially for non-HDFS file systems. Deprecates
+    InputFormat.validateInput. (tomwhite via omalley)
+
   BUG FIXES
 
     HADOOP-2905. 'fsck -move' triggers NPE in NameNode. 

+ 13 - 6
src/java/org/apache/hadoop/dfs/DistributedFileSystem.java

@@ -114,9 +114,12 @@ public class DistributedFileSystem extends FileSystem {
   }
   
 
-  public BlockLocation[] getFileBlockLocations(Path f, long start,
+  public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
       long len) throws IOException {
-    return dfs.getBlockLocations(getPathName(f), start, len);
+    if (file == null) {
+      return null;
+    }
+    return dfs.getBlockLocations(getPathName(file.getPath()), start, len);
   }
 
   public void setVerifyChecksum(boolean verifyChecksum) {
@@ -352,11 +355,15 @@ public class DistributedFileSystem extends FileSystem {
       DfsPath p = (DfsPath) f;
       return p.info;
     }
-    FileStatus fs = dfs.getFileInfo(getPathName(f));
-    if (fs != null)
-      return fs;
-    else
+    DFSFileInfo fi = dfs.getFileInfo(getPathName(f));
+    if (fi != null) {
+      return new FileStatus(fi.getLen(), fi.isDir(), fi.getReplication(),
+          fi.getBlockSize(), fi.getModificationTime(),
+          fi.getPermission(), fi.getOwner(), fi.getGroup(),
+          new DfsPath(fi, this)); // fully-qualify path;
+    } else {
       throw new FileNotFoundException("File does not exist: " + f);
+    }
   }
 
   /** {@inheritDoc }*/

+ 4 - 2
src/java/org/apache/hadoop/dfs/HftpFileSystem.java

@@ -161,11 +161,13 @@ public class HftpFileSystem extends FileSystem {
               Long.valueOf(attrs.getValue("blocksize")).longValue(),
               modif, FsPermission.valueOf(attrs.getValue("permission")),
               attrs.getValue("owner"), attrs.getValue("group"),
-              new Path(getUri().toString(), attrs.getValue("path")))
+              new Path(getUri().toString(), attrs.getValue("path"))
+                .makeQualified(HftpFileSystem.this))
         : new FileStatus(0L, true, 0, 0L,
               modif, FsPermission.valueOf(attrs.getValue("permission")),
               attrs.getValue("owner"), attrs.getValue("group"),
-              new Path(getUri().toString(), attrs.getValue("path")));
+              new Path(getUri().toString(), attrs.getValue("path"))
+                .makeQualified(HftpFileSystem.this));
       fslist.add(fs);
     }
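
The same .makeQualified(...) fix recurs in the InMemoryFileSystem, FTPFileSystem, KosmosFileSystem, and S3FileSystem hunks below: because getSplits now takes paths directly from FileStatus objects, each path must carry its scheme and authority for Path.getFileSystem(conf) to resolve the right FileSystem. A small sketch of the distinction (paths and hostname invented):

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class QualifyDemo {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    Path bare = new Path("/user/alice/data");  // no scheme or authority
    Path qualified = bare.makeQualified(fs);   // e.g. hdfs://namenode:8020/user/alice/data
    System.out.println(qualified);
    // FileInputFormat.getSplits calls file.getPath().getFileSystem(job);
    // an unqualified path would resolve against the default FileSystem
    // rather than the one that produced the listing.
  }
}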
 

+ 23 - 9
src/java/org/apache/hadoop/fs/FileSystem.java

@@ -326,21 +326,35 @@ public abstract class FileSystem extends Configured implements Closeable {
    * hostnames of machines that contain the given file.
    *
    * The FileSystem will simply return an elt containing 'localhost'.
+   * @deprecated use {@link #getFileBlockLocations(FileStatus, long, long)}
    */
+  @Deprecated
   public BlockLocation[] getFileBlockLocations(Path f, 
     long start, long len) throws IOException {
-    if (!exists(f)) {
+  
+    return getFileBlockLocations(getFileStatus(f), start, len);
+  }
+  
+  /**
+   * Return an array containing hostnames, offset and size of 
+   * portions of the given file.  For a nonexistent 
+   * file or regions, null will be returned.
+   *
+   * This call is most helpful with DFS, where it returns 
+   * hostnames of machines that contain the given file.
+   *
+   * The FileSystem will simply return an elt containing 'localhost'.
+   */
+  public BlockLocation[] getFileBlockLocations(FileStatus file, 
+      long start, long len) throws IOException {
+    if (file == null) {
       return null;
-    } else {
-      BlockLocation result[] = new BlockLocation[1];
-      String[] name = new String[1];
-      name[0] = "localhost:50010";
-      String[] host = new String[1];
-      host[0] = "localhost";
-      result[0] = new BlockLocation(name, host, 0, len);
-      return result;
     }
+    String[] name = { "localhost:50010" };
+    String[] host = { "localhost" };
+    return new BlockLocation[] { new BlockLocation(name, host, 0, len) };
   }
+  
   /**
    * Opens an FSDataInputStream at the indicated Path.
    * @param f the file name to open
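
A caller-side sketch of the new overload (class name and argument handling are illustrative): a FileStatus obtained once from a lookup or listing replaces the exists() check the deprecated Path overload performed on every call. Note that the deprecated overload now delegates through getFileStatus, so on most implementations a missing file surfaces as a FileNotFoundException rather than the null the old code returned.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockLocationsDemo {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    Path path = new Path(args[0]);
    FileSystem fs = path.getFileSystem(conf);
    FileStatus stat = fs.getFileStatus(path);  // one metadata lookup
    BlockLocation[] blocks =
        fs.getFileBlockLocations(stat, 0, stat.getLen());
    for (BlockLocation b : blocks) {
      System.out.println(b.getOffset() + "+" + b.getLength());
    }
  }
}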

+ 5 - 0
src/java/org/apache/hadoop/fs/FilterFileSystem.java

@@ -96,6 +96,11 @@ public class FilterFileSystem extends FileSystem {
       return fs.getFileBlockLocations(f, start, len);
   }
 
+  public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
+    long len) throws IOException {
+      return fs.getFileBlockLocations(file, start, len);
+  }
+  
   /**
    * Opens an FSDataInputStream at the indicated Path.
    * @param f the file name to open

+ 1 - 25
src/java/org/apache/hadoop/fs/InMemoryFileSystem.java

@@ -81,30 +81,6 @@ public class InMemoryFileSystem extends ChecksumFileSystem {
       return uri;
     }
 
-    /**
-     * Return an array containing hostnames, offset and size.
-     *
-     * This call is most helpful with DFS, where it returns
-     * hostnames for machines that contain the given file.
-     
-     * The InMemoryFileSystem will simply return an elt 
-     * containing 'inmemory'
-     */ 
-    public BlockLocation[] getFileBlockLocations (Path f, 
-      long start, long len) throws IOException {
-      if (!exists(f)) {
-        return null;
-      } else {
-        BlockLocation result[] = new BlockLocation[1];
-        String[] name = new String[1];
-        name[0] = "inmemory:50010";
-        String[] host = new String[1];
-        host[0] = "inmemory";
-        result[0] = new BlockLocation(name, host, 0, len);
-        return result;
-      }
-    }
-    
     private class InMemoryInputStream extends FSInputStream {
       private DataInputBuffer din = new DataInputBuffer();
       private FileAttributes fAttr;
@@ -309,7 +285,7 @@ public class InMemoryFileSystem extends ChecksumFileSystem {
         if (attr==null) {
           throw new FileNotFoundException("File " + f + " does not exist.");
         }
-        return new InMemoryFileStatus(f, attr);
+        return new InMemoryFileStatus(f.makeQualified(this), attr);
       }
     }
   

+ 2 - 2
src/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java

@@ -403,7 +403,7 @@ public class FTPFileSystem extends FileSystem {
       long modTime = -1; // Modification time of root dir not known.
       Path root = new Path("/");
       return new FileStatus(length, isDir, blockReplication, blockSize,
-          modTime, root);
+          modTime, root.makeQualified(this));
     }
     String pathName = parentPath.toUri().getPath();
     FTPFile[] ftpFiles = client.listFiles(pathName);
@@ -443,7 +443,7 @@ public class FTPFileSystem extends FileSystem {
     String group = ftpFile.getGroup();
     Path filePath = new Path(parentPath, ftpFile.getName());
     return new FileStatus(length, isDir, blockReplication, blockSize, modTime,
-        permission, user, group, filePath);
+        permission, user, group, filePath.makeQualified(this));
   }
 
   @Override

+ 9 - 6
src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java

@@ -173,13 +173,14 @@ public class KosmosFileSystem extends FileSystem {
         }
         if (kfsImpl.isDirectory(srep)) {
             // System.out.println("Status of path: " + path + " is dir");
-            return new FileStatus(0, true, 1, 0, 0, path);
+            return new FileStatus(0, true, 1, 0, 0, path.makeQualified(this));
         } else {
             // System.out.println("Status of path: " + path + " is file");
             return new FileStatus(kfsImpl.filesize(srep), false, 
                                   kfsImpl.getReplication(srep),
                                   getDefaultBlockSize(),
-                                  kfsImpl.getModificationTime(srep), path);
+                                  kfsImpl.getModificationTime(srep),
+                                  path.makeQualified(this));
         }
     }
     
@@ -308,12 +309,14 @@ public class KosmosFileSystem extends FileSystem {
      * Return null if the file doesn't exist; otherwise, get the
      * locations of the various chunks of the file file from KFS.
      */
-    public BlockLocation[] getBlockLocations(Path f, long start, long len
-                                             ) throws IOException {
-      if (!exists(f)) {
+    @Override
+    public BlockLocation[] getFileBlockLocations(FileStatus file, long start,
+        long len) throws IOException {
+
+      if (file == null) {
         return null;
       }
-      String srep = makeAbsolute(f).toUri().getPath();
+      String srep = makeAbsolute(file.getPath()).toUri().getPath();
       String[][] hints = kfsImpl.getDataLocation(srep, start, len);
       BlockLocation[] result = new BlockLocation[hints.length];
       long blockSize = getDefaultBlockSize();

+ 4 - 2
src/java/org/apache/hadoop/fs/s3/S3FileSystem.java

@@ -170,7 +170,9 @@ public class S3FileSystem extends FileSystem {
       return null;
     }
     if (inode.isFile()) {
-      return new FileStatus[] { new S3FileStatus(f, inode) };
+      return new FileStatus[] {
+        new S3FileStatus(f.makeQualified(this), inode)
+      };
     }
     ArrayList<FileStatus> ret = new ArrayList<FileStatus>();
     for (Path p : store.listSubPaths(absolutePath)) {
@@ -314,7 +316,7 @@ public class S3FileSystem extends FileSystem {
     if (inode == null) {
       throw new FileNotFoundException(f + ": No such file or directory.");
     }
-    return new S3FileStatus(f, inode);
+    return new S3FileStatus(f.makeQualified(this), inode);
   }
 
   // diagnostic methods

+ 113 - 67
src/java/org/apache/hadoop/mapred/FileInputFormat.java

@@ -18,18 +18,18 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
@@ -38,9 +38,9 @@ import org.apache.hadoop.util.StringUtils;
  * A base class for file-based {@link InputFormat}.
  * 
  * <p><code>FileInputFormat</code> is the base class for all file-based 
- * <code>InputFormat</code>s. This provides generic implementations of
- * {@link #validateInput(JobConf)} and {@link #getSplits(JobConf, int)}.
- * Implementations fo <code>FileInputFormat</code> can also override the 
+ * <code>InputFormat</code>s. This provides a generic implementation of
+ * {@link #getSplits(JobConf, int)}.
+ * Subclasses of <code>FileInputFormat</code> can also override the 
  * {@link #isSplitable(FileSystem, Path)} method to ensure input-files are
  * not split-up and are processed as a whole by {@link Mapper}s.
  */
@@ -121,7 +121,7 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
    * @return the PathFilter instance set for the job, NULL if none has been set.
    */
   public static PathFilter getInputPathFilter(JobConf conf) {
-    Class filterClass = conf.getClass("mapred.input.pathFilter.class", null,
+    Class<?> filterClass = conf.getClass("mapred.input.pathFilter.class", null,
         PathFilter.class);
     return (filterClass != null) ?
         (PathFilter) ReflectionUtils.newInstance(filterClass, conf) : null;
@@ -132,18 +132,18 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
    * expression. 
    * 
    * @param job the job to list input paths for
-   * @return array of Path objects
+   * @return array of FileStatus objects
    * @throws IOException if zero items.
    */
-  protected Path[] listPaths(JobConf job)
-    throws IOException {
+  protected FileStatus[] listStatus(JobConf job) throws IOException {
     Path[] dirs = getInputPaths(job);
     if (dirs.length == 0) {
       throw new IOException("No input paths specified in job");
     }
 
-    List<Path> result = new ArrayList<Path>();
-
+    List<FileStatus> result = new ArrayList<FileStatus>();
+    List<IOException> errors = new ArrayList<IOException>();
+    
     // creates a MultiPathFilter with the hiddenFileFilter and the
     // user provided one (if any).
     List<PathFilter> filters = new ArrayList<PathFilter>();
@@ -156,73 +156,119 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
 
     for (Path p: dirs) {
       FileSystem fs = p.getFileSystem(job); 
-      FileStatus[] matches = fs.listStatus(FileUtil.stat2Paths(fs.globStatus(p,
-                                     inputFilter)), inputFilter);
-
-      for (FileStatus match: matches) {
-        result.add(fs.makeQualified(match.getPath()));
-      }
-    }
-
-    return result.toArray(new Path[result.size()]);
-  }
-
-  public void validateInput(JobConf job) throws IOException {
-    Path[] inputDirs = getInputPaths(job);
-    if (inputDirs.length == 0) {
-      throw new IOException("No input paths specified in input"); 
-    }
-    
-    List<IOException> result = new ArrayList<IOException>();
-    int totalFiles = 0; 
-    for (Path p: inputDirs) {
-      FileSystem fs = p.getFileSystem(job);
-      if (fs.exists(p)) {
-        // make sure all paths are files to avoid exception
-        // while generating splits
-        Path[] subPaths = FileUtil.stat2Paths(fs.listStatus(p, 
-                                              hiddenFileFilter));
-        for (Path subPath : subPaths) {
-          FileSystem subFS = subPath.getFileSystem(job); 
-          if (!subFS.exists(subPath)) {
-            result.add(new IOException(
-                                       "Input path does not exist: " + subPath)); 
-          } else {
-            totalFiles++; 
-          }
-        }
+      FileStatus[] matches = fs.globStatus(p, inputFilter);
+      if (matches == null) {
+        errors.add(new IOException("Input path does not exist: " + p));
+      } else if (matches.length == 0) {
+        errors.add(new IOException("Input Pattern " + p + " matches 0 files"));
       } else {
-        Path [] paths = FileUtil.stat2Paths(fs.globStatus(p, 
-                                                          hiddenFileFilter), p);
-        if (paths.length == 0) {
-          result.add(
-                     new IOException("Input Pattern " + p + " matches 0 files")); 
-        } else {
-          // validate globbed paths 
-          for (Path gPath : paths) {
-            FileSystem gPathFS = gPath.getFileSystem(job); 
-            if (!gPathFS.exists(gPath)) {
-              result.add(
-                         new FileNotFoundException(
-                                                   "Input path doesnt exist : " + gPath)); 
-            }
+        for (FileStatus globStat: matches) {
+          if (globStat.isDir()) {
+            for(FileStatus stat: fs.listStatus(globStat.getPath(),
+                inputFilter)) {
+              result.add(stat);
+            }          
+          } else {
+            result.add(globStat);
           }
-          totalFiles += paths.length; 
         }
       }
     }
-    if (!result.isEmpty()) {
-      throw new InvalidInputException(result);
+
+    if (!errors.isEmpty()) {
+      throw new InvalidInputException(errors);
     }
-    // send output to client. 
-    LOG.info("Total input paths to process : " + totalFiles); 
+    LOG.info("Total input paths to process : " + result.size()); 
+    return result.toArray(new FileStatus[result.size()]);
+  }
+  
+  /** List input directories.
+   * Subclasses may override to, e.g., select only files matching a regular
+   * expression. 
+   * 
+   * @param job the job to list input paths for
+   * @return array of Path objects
+   * @throws IOException if zero items.
+   * @deprecated Use {@link #listStatus(JobConf)} instead.
+   */
+  @Deprecated
+  protected Path[] listPaths(JobConf job)
+    throws IOException {
+    return FileUtil.stat2Paths(listStatus(job));
+  }
+  
+  @Deprecated
+  public void validateInput(JobConf job) throws IOException {
+    // handled by getSplits
   }
 
-  /** Splits files returned by {@link #listPaths(JobConf)} when
+  /** Splits files returned by {@link #listStatus(JobConf)} when
    * they're too big.*/ 
+  @SuppressWarnings("deprecation")
   public InputSplit[] getSplits(JobConf job, int numSplits)
     throws IOException {
-    Path[] files = listPaths(job);
+    FileStatus[] files = listStatus(job);
+    
+    // Applications may have overridden listPaths so we need to check if
+    // it returns a different set of paths to listStatus.
+    // If it does we revert to the old behavior using Paths not FileStatus
+    // objects.
+    // When listPaths is removed, this check can be removed too.
+    Path[] paths = listPaths(job);
+    if (!Arrays.equals(paths, FileUtil.stat2Paths(files))) {
+      LOG.warn("FileInputFormat#listPaths is deprecated, override listStatus " +
+      		"instead.");
+      return getSplitsForPaths(job, numSplits, paths);
+    }
+    long totalSize = 0;                           // compute total size
+    for (FileStatus file: files) {                // check we have valid files
+      if (file.isDir()) {
+        throw new IOException("Not a file: "+ file.getPath());
+      }
+      totalSize += file.getLen();
+    }
+
+    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
+    long minSize = Math.max(job.getLong("mapred.min.split.size", 1),
+                            minSplitSize);
+
+    // generate splits
+    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
+    for (FileStatus file: files) {
+      Path path = file.getPath();
+      FileSystem fs = path.getFileSystem(job);
+      long length = file.getLen();
+      BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
+      if ((length != 0) && isSplitable(fs, path)) { 
+        long blockSize = file.getBlockSize();
+        long splitSize = computeSplitSize(goalSize, minSize, blockSize);
+
+        long bytesRemaining = length;
+        while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
+          int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
+          splits.add(new FileSplit(path, length-bytesRemaining, splitSize, 
+                                   blkLocations[blkIndex].getHosts()));
+          bytesRemaining -= splitSize;
+        }
+        
+        if (bytesRemaining != 0) {
+          splits.add(new FileSplit(path, length-bytesRemaining, bytesRemaining, 
+                     blkLocations[blkLocations.length-1].getHosts()));
+        }
+      } else if (length != 0) {
+        splits.add(new FileSplit(path, 0, length, blkLocations[0].getHosts()));
+      } else { 
+        //Create empty hosts array for zero length files
+        splits.add(new FileSplit(path, 0, length, new String[0]));
+      }
+    }
+    LOG.debug("Total # of splits: " + splits.size());
+    return splits.toArray(new FileSplit[splits.size()]);
+  }
+  
+  @Deprecated
+  private InputSplit[] getSplitsForPaths(JobConf job, int numSplits,
+      Path[] files) throws IOException {
     long totalSize = 0;                           // compute total size
     for (int i = 0; i < files.length; i++) {      // check we have valid files
       Path file = files[i];
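
For reference, the split size the rewritten loop produces follows from goalSize, minSize, and the block size. A worked sketch with invented numbers, assuming computeSplitSize keeps the usual max(minSize, min(goalSize, blockSize)) form and SPLIT_SLOP its usual value of 1.1 (neither body appears in this hunk):

public class SplitSizeDemo {
  public static void main(String[] args) {
    long totalSize = 10L * 1024 * 1024 * 1024;  // 10 GB of input files
    int numSplits = 100;                        // requested map tasks
    long goalSize = totalSize / numSplits;      // ~102 MB per split
    long minSize = 1;                           // mapred.min.split.size default
    long blockSize = 64L * 1024 * 1024;         // 64 MB blocks
    // Assumed shape of computeSplitSize: clamp the goal to one block.
    long splitSize = Math.max(minSize, Math.min(goalSize, blockSize));
    System.out.println(splitSize);              // 67108864, i.e. one block
    // With SPLIT_SLOP = 1.1, a 134 MB file then becomes two splits of
    // 64 MB and 70 MB: the 70 MB tail is within 1.1 * splitSize, so the
    // loop keeps it whole instead of emitting a tiny trailing split.
  }
}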

+ 2 - 0
src/java/org/apache/hadoop/mapred/InputFormat.java

@@ -74,6 +74,8 @@ public interface InputFormat<K, V> {
    * 
    * @param job job configuration.
    * @throws InvalidInputException if the job does not have valid input
+   * @deprecated getSplits is called in the client and can perform any
+   * necessary validation of the input 
    */
   void validateInput(JobConf job) throws IOException;
   

+ 3 - 2
src/java/org/apache/hadoop/mapred/JobClient.java

@@ -672,7 +672,8 @@ public class JobClient extends Configured implements MRConstants, Tool  {
     
     
     // Check the input specification 
-    job.getInputFormat().validateInput(job);
+    InputFormat inFormat = job.getInputFormat();
+    inFormat.validateInput(job);
 
     // Check the output specification
     job.getOutputFormat().checkOutputSpecs(fs, job);
@@ -680,7 +681,7 @@ public class JobClient extends Configured implements MRConstants, Tool  {
     // Create the splits for the job
     LOG.debug("Creating splits at " + fs.makeQualified(submitSplitFile));
     InputSplit[] splits = 
-      job.getInputFormat().getSplits(job, job.getNumMapTasks());
+      inFormat.getSplits(job, job.getNumMapTasks());
     // sort the splits into order based on size, so that the biggest
     // go first
     Arrays.sort(splits, new Comparator<InputSplit>() {