
Fix HADOOP-38: Add FileSystem.getBlockSize() method and use it as the maximum split size. Also change FileSystem to implement Configurable, and improve some javadoc, using inherited comments where possible and removing implementation details from public javadoc.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@378058 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 years ago
parent
commit
7d9ac1edb9

+ 11 - 48
src/java/org/apache/hadoop/dfs/DistributedFileSystem.java

@@ -29,9 +29,6 @@ import org.apache.hadoop.conf.Configuration;
  * This object is the way end-user code interacts with a Hadoop
  * DistributedFileSystem.
  *
- * It's substantially a wrapper around the DFSClient class, with
- * a few extra functions.
- *
  * @author Mike Cafarella
  *****************************************************************/
 public class DistributedFileSystem extends FileSystem {
@@ -43,9 +40,7 @@ public class DistributedFileSystem extends FileSystem {
 
     DFSClient dfs;
 
-    /**
-     * Create the ShareSet automatically, and then go on to
-     * the regular constructor.
+    /** Construct a client for the filesystem at <code>namenode</code>.
      */
     public DistributedFileSystem(InetSocketAddress namenode, Configuration conf) throws IOException {
       super(conf);
@@ -90,27 +85,19 @@ public class DistributedFileSystem extends FileSystem {
         return dfs.delete(getPath(f));
     }
 
-    /**
-     */
     public boolean exists(File f) throws IOException {
         return dfs.exists(getPath(f));
     }
 
-    /**
-     */
     public boolean isDirectory(File f) throws IOException {
         return dfs.isDirectory(getPath(f));
     }
 
-    /**
-     */
     public long getLength(File f) throws IOException {
         DFSFileInfo info[] = dfs.listFiles(getPath(f));
         return info[0].getLen();
     }
 
-    /**
-     */
     public File[] listFilesRaw(File f) throws IOException {
         DFSFileInfo info[] = dfs.listFiles(getPath(f));
         if (info == null) {
@@ -124,36 +111,22 @@ public class DistributedFileSystem extends FileSystem {
         }
     }
 
-    /**
-     */
     public void mkdirs(File f) throws IOException {
         dfs.mkdirs(getPath(f));
     }
 
-    /**
-     * Obtain a filesystem lock at File f.
-     */
     public void lock(File f, boolean shared) throws IOException {
         dfs.lock(getPath(f), ! shared);
     }
 
-    /**
-     * Release a held lock
-     */
     public void release(File f) throws IOException {
         dfs.release(getPath(f));
     }
 
-    /**
-     * Remove the src when finished.
-     */
     public void moveFromLocalFile(File src, File dst) throws IOException {
         doFromLocalFile(src, dst, true);
     }
 
-    /**
-     * keep the src when finished.
-     */
     public void copyFromLocalFile(File src, File dst) throws IOException {
         doFromLocalFile(src, dst, false);
     }
@@ -177,7 +150,7 @@ public class DistributedFileSystem extends FileSystem {
                 doFromLocalFile(contents[i], new File(dst, contents[i].getName()), deleteSource);
             }
         } else {
-            byte buf[] = new byte[this.conf.getInt("io.file.buffer.size", 4096)];
+            byte buf[] = new byte[getConf().getInt("io.file.buffer.size", 4096)];
             InputStream in = new BufferedInputStream(new FileInputStream(src));
             try {
                 OutputStream out = create(dst);
@@ -198,10 +171,6 @@ public class DistributedFileSystem extends FileSystem {
             src.delete();
     }
 
-    /**
-     * Takes a hierarchy of files from the FS system and writes to
-     * the given local target.
-     */
     public void copyToLocalFile(File src, File dst) throws IOException {
         if (dst.exists()) {
             if (! dst.isDirectory()) {
@@ -222,10 +191,10 @@ public class DistributedFileSystem extends FileSystem {
                 copyToLocalFile(contents[i], new File(dst, contents[i].getName()));
             }
         } else {
-            byte buf[] = new byte[this.conf.getInt("io.file.buffer.size", 4096)];
+            byte buf[] = new byte[getConf().getInt("io.file.buffer.size", 4096)];
             InputStream in = open(src);
             try {
-                OutputStream out = FileSystem.getNamed("local", this.conf).create(dst);
+                OutputStream out = FileSystem.getNamed("local", getConf()).create(dst);
                 try {
                     int bytesRead = in.read(buf);
                     while (bytesRead >= 0) {
@@ -241,10 +210,6 @@ public class DistributedFileSystem extends FileSystem {
         }
     }
 
-    /**
-     * Output will go to the tmp working area.  There may be some source
-     * material that we obtain first.
-     */
     public File startLocalOutput(File fsOutputFile, File tmpLocalFile) throws IOException {
         if (exists(fsOutputFile)) {
             copyToLocalFile(fsOutputFile, tmpLocalFile);
@@ -272,25 +237,18 @@ public class DistributedFileSystem extends FileSystem {
      */
     public void completeLocalInput(File localFile) throws IOException {
         // Get rid of the local copy - we don't need it anymore.
-        FileUtil.fullyDelete(localFile, this.conf);
+        FileUtil.fullyDelete(localFile, getConf());
     }
 
-    /**
-     * Shut down the FS.  Not necessary for regular filesystem.
-     */
     public void close() throws IOException {
         dfs.close();
     }
 
-    /**
-     */
     public String toString() {
         return "DFS[" + dfs + "]";
     }
 
-    /**
-     */
-    public DFSClient getClient() {
+    DFSClient getClient() {
         return dfs;
     }
     
@@ -321,4 +279,9 @@ public class DistributedFileSystem extends FileSystem {
       // directory on their datanode, and then re-replicate the blocks, so that
       // no data is lost. a task may fail, but on retry it should succeed.
     }
+
+    public long getBlockSize() {
+      return dfs.BLOCK_SIZE;
+    }
+
 }

+ 18 - 12
src/java/org/apache/hadoop/fs/FileSystem.java

@@ -43,7 +43,7 @@ import org.apache.hadoop.util.LogFormatter;
  * implementation is {@link DistributedFileSystem}.
  * @author Mike Cafarella
  *****************************************************************/
-public abstract class FileSystem {
+public abstract class FileSystem extends Configured {
     public static final Logger LOG = LogFormatter.getLogger("org.apache.hadoop.dfs.DistributedFileSystem");
 
     private static final HashMap NAME_TO_FS = new HashMap();
@@ -87,7 +87,6 @@ public abstract class FileSystem {
       return getNamed(conf.get("fs.default.name", "local"), conf);
     }
 
-    protected Configuration conf;
     /** Returns a name for this filesystem, suitable to pass to {@link
      * FileSystem#getNamed(String,Configuration)}.*/
     public abstract String getName();
@@ -96,7 +95,6 @@ public abstract class FileSystem {
      * host:port pair, naming an DFS name server.*/
     public static FileSystem getNamed(String name, Configuration conf) throws IOException {
       FileSystem fs = (FileSystem)NAME_TO_FS.get(name);
-      int ioFileBufferSize = conf.getInt("io.file.buffer.size", 4096);
       if (fs == null) {
         if ("local".equals(name)) {
           fs = new LocalFileSystem(conf);
@@ -122,10 +120,9 @@ public abstract class FileSystem {
     ///////////////////////////////////////////////////////////////
     // FileSystem
     ///////////////////////////////////////////////////////////////
-    /**
-     */
-    public FileSystem(Configuration conf) {
-        this.conf = conf;
+
+    protected FileSystem(Configuration conf) {
+      super(conf);
     }
 
     /**
@@ -146,7 +143,7 @@ public abstract class FileSystem {
      * @param bufferSize the size of the buffer to be used.
      */
     public FSDataInputStream open(File f, int bufferSize) throws IOException {
-      return new FSDataInputStream(this, f, bufferSize, this.conf);
+      return new FSDataInputStream(this, f, bufferSize, getConf());
     }
     
     /**
@@ -154,7 +151,7 @@ public abstract class FileSystem {
      * @param f the file to open
      */
     public FSDataInputStream open(File f) throws IOException {
-      return new FSDataInputStream(this, f, conf);
+      return new FSDataInputStream(this, f, getConf());
     }
 
     /**
@@ -168,7 +165,7 @@ public abstract class FileSystem {
      * Files are overwritten by default.
      */
     public FSDataOutputStream create(File f) throws IOException {
-      return create(f, true,this.conf.getInt("io.file.buffer.size", 4096));
+      return create(f, true, getConf().getInt("io.file.buffer.size", 4096));
     }
 
     /**
@@ -180,7 +177,7 @@ public abstract class FileSystem {
      */
     public FSDataOutputStream create(File f, boolean overwrite,
                                       int bufferSize) throws IOException {
-      return new FSDataOutputStream(this, f, overwrite, this.conf);
+      return new FSDataOutputStream(this, f, overwrite, getConf());
     }
 
     /** Opens an OutputStream at the indicated File.
@@ -256,8 +253,10 @@ public abstract class FileSystem {
      */
     public abstract boolean exists(File f) throws IOException;
 
+    /** True iff the named path is a directory. */
     public abstract boolean isDirectory(File f) throws IOException;
 
+    /** True iff the named path is a regular file. */
     public boolean isFile(File f) throws IOException {
         if (exists(f) && ! isDirectory(f)) {
             return true;
@@ -266,8 +265,10 @@ public abstract class FileSystem {
         }
     }
     
+    /** The number of bytes in a file. */
     public abstract long getLength(File f) throws IOException;
 
+    /** List files in a directory. */
     public File[] listFiles(File f) throws IOException {
       return listFiles(f, new FileFilter() {
           public boolean accept(File file) {
@@ -276,8 +277,10 @@ public abstract class FileSystem {
         });
     }
 
+    /** List files in a directory. */
     public abstract File[] listFilesRaw(File f) throws IOException;
 
+    /** Filter files in a directory. */
     public File[] listFiles(File f, FileFilter filter) throws IOException {
         Vector results = new Vector();
         File listing[] = listFilesRaw(f);
@@ -311,7 +314,6 @@ public abstract class FileSystem {
      * The src file is on the local disk.  Add it to FS at
      * the given dst name and the source is kept intact afterwards
      */
-    // not implemneted yet
     public abstract void copyFromLocalFile(File src, File dst) throws IOException;
 
     /**
@@ -383,4 +385,8 @@ public abstract class FileSystem {
                                                long start, long length,
                                                int crc);
 
+    /** Return the number of bytes that large input files should be optimally
+     * be split into to minimize i/o time. */
+    public abstract long getBlockSize();
+
 }
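
For reference, a minimal sketch (not part of the commit) of what a FileSystem subclass looks like once FileSystem extends Configured: the Configuration is no longer kept in a conf field but retrieved with getConf(), and the new abstract getBlockSize() must be provided. The class name and the "fs.inmem.block.size" key below are hypothetical.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

/** Hypothetical FileSystem subclass illustrating the post-commit API. */
public abstract class InMemoryFileSystem extends FileSystem {

    protected InMemoryFileSystem(Configuration conf) {
        super(conf);                                   // stored by Configured
    }

    /** Block size is read from the inherited Configuration via getConf(). */
    public long getBlockSize() {
        // "fs.inmem.block.size" is a made-up key for this sketch
        return getConf().getLong("fs.inmem.block.size", 1024 * 1024);
    }
}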

+ 20 - 62
src/java/org/apache/hadoop/fs/LocalFileSystem.java

@@ -25,9 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.UTF8;
 
 /****************************************************************
- * Implement the FileSystem interface for the local disk.
- * This is pretty easy.  The interface exists so we can use either
- * remote or local Files very easily.
+ * Implement the FileSystem API for the native filesystem.
  *
  * @author Mike Cafarella
  *****************************************************************/
@@ -38,8 +36,7 @@ public class LocalFileSystem extends FileSystem {
     // by default use copy/delete instead of rename
     boolean useCopyForRename = true;
     
-    /**
-     */
+    /** Construct a local filesystem client. */
     public LocalFileSystem(Configuration conf) throws IOException {
         super(conf);
         // if you find an OS which reliably supports non-POSIX
@@ -111,9 +108,6 @@ public class LocalFileSystem extends FileSystem {
         public long skip(long n) throws IOException { return fis.skip(n); }
     }
     
-    /**
-     * Open the file at f
-     */
     public FSInputStream openRaw(File f) throws IOException {
         if (! f.exists()) {
             throw new FileNotFoundException(f.toString());
@@ -169,58 +163,39 @@ public class LocalFileSystem extends FileSystem {
         return new LocalFSFileOutputStream(f);
     }
 
-    /**
-     * Rename files/dirs
-     */
     public boolean renameRaw(File src, File dst) throws IOException {
         if (useCopyForRename) {
-            FileUtil.copyContents(this, src, dst, true, conf);
+            FileUtil.copyContents(this, src, dst, true, getConf());
             return fullyDelete(src);
         } else return src.renameTo(dst);
     }
 
-    /**
-     * Get rid of File f, whether a true file or dir.
-     */
     public boolean deleteRaw(File f) throws IOException {
         if (f.isFile()) {
             return f.delete();
         } else return fullyDelete(f);
     }
 
-    /**
-     */
     public boolean exists(File f) throws IOException {
         return f.exists();
     }
 
-    /**
-     */
     public boolean isDirectory(File f) throws IOException {
         return f.isDirectory();
     }
 
-    /**
-     */
     public long getLength(File f) throws IOException {
         return f.length();
     }
 
-    /**
-     */
     public File[] listFilesRaw(File f) throws IOException {
         return f.listFiles();
     }
 
-    /**
-     */
     public void mkdirs(File f) throws IOException {
         f.mkdirs();
     }
 
-    /**
-     * Obtain a filesystem lock at File f.
-     */
     public synchronized void lock(File f, boolean shared) throws IOException {
         f.createNewFile();
 
@@ -237,9 +212,6 @@ public class LocalFileSystem extends FileSystem {
         lockObjSet.put(f, lockObj);
     }
 
-    /**
-     * Release a held lock
-     */
     public synchronized void release(File f) throws IOException {
         FileLock lockObj = (FileLock) lockObjSet.get(f);
         FileInputStream sharedLockData = (FileInputStream) sharedLockDataSet.get(f);
@@ -263,71 +235,51 @@ public class LocalFileSystem extends FileSystem {
         }
     }
 
-    /**
-     * In the case of the local filesystem, we can just rename the file.
-     */
+    // In the case of the local filesystem, we can just rename the file.
     public void moveFromLocalFile(File src, File dst) throws IOException {
         if (! src.equals(dst)) {
             if (useCopyForRename) {
-                FileUtil.copyContents(this, src, dst, true, this.conf);
+                FileUtil.copyContents(this, src, dst, true, getConf());
                 fullyDelete(src);
             } else src.renameTo(dst);
         }
     }
 
-    /**
-     * Similar to moveFromLocalFile(), except the source is kept intact.
-     */
+    // Similar to moveFromLocalFile(), except the source is kept intact.
     public void copyFromLocalFile(File src, File dst) throws IOException {
         if (! src.equals(dst)) {
-            FileUtil.copyContents(this, src, dst, true, this.conf);
+            FileUtil.copyContents(this, src, dst, true, getConf());
         }
     }
 
-    /**
-     * We can't delete the src file in this case.  Too bad.
-     */
+    // We can't delete the src file in this case.  Too bad.
     public void copyToLocalFile(File src, File dst) throws IOException {
         if (! src.equals(dst)) {
-            FileUtil.copyContents(this, src, dst, true, this.conf);
+            FileUtil.copyContents(this, src, dst, true, getConf());
         }
     }
 
-    /**
-     * We can write output directly to the final location
-     */
+    // We can write output directly to the final location
     public File startLocalOutput(File fsOutputFile, File tmpLocalFile) throws IOException {
         return fsOutputFile;
     }
 
-    /**
-     * It's in the right place - nothing to do.
-     */
+    // It's in the right place - nothing to do.
     public void completeLocalOutput(File fsWorkingFile, File tmpLocalFile) throws IOException {
     }
 
-    /**
-     * We can read directly from the real local fs.
-     */
+    // We can read directly from the real local fs.
     public File startLocalInput(File fsInputFile, File tmpLocalFile) throws IOException {
         return fsInputFile;
     }
 
-    /**
-     * We're done reading.  Nothing to clean up.
-     */
+    // We're done reading.  Nothing to clean up.
     public void completeLocalInput(File localFile) throws IOException {
         // Ignore the file, it's at the right destination!
     }
 
-    /**
-     * Shut down the FS.  Not necessary for regular filesystem.
-     */
-    public void close() throws IOException {
-    }
+    public void close() throws IOException {}
 
-    /**
-     */
     public String toString() {
         return "LocalFS";
     }
@@ -392,4 +344,10 @@ public class LocalFileSystem extends FileSystem {
       }
     }
 
+    public long getBlockSize() {
+      // default to 32MB: large enough to minimize the impact of seeks
+      return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
+    }
+
+
 }
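
The 32 MB local default above can be tuned through the new "fs.local.block.size" key. A quick usage sketch, where only the key and its default come from the diff; the demo class and the 64 MB value are illustrative:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class LocalBlockSizeDemo {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // override the local block size (64 MB here); getBlockSize() falls
        // back to 32 MB when the key is unset
        conf.set("fs.local.block.size", Long.toString(64L * 1024 * 1024));

        FileSystem local = FileSystem.getNamed("local", conf);
        System.out.println("local block size: " + local.getBlockSize()); // 67108864
    }
}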

+ 11 - 1
src/java/org/apache/hadoop/mapred/InputFormatBase.java

@@ -105,7 +105,17 @@ public abstract class InputFormatBase implements InputFormat {
       totalSize += fs.getLength(files[i]);
     }
 
-    long bytesPerSplit = Math.max(totalSize / numSplits, minSplitSize);
+    long bytesPerSplit = totalSize / numSplits;   // start w/ desired num splits
+
+    long fsBlockSize = fs.getBlockSize();
+    if (bytesPerSplit > fsBlockSize) {            // no larger than fs blocks
+      bytesPerSplit = fsBlockSize;
+    }
+
+    if (bytesPerSplit < minSplitSize) {           // no smaller than min size
+      bytesPerSplit = minSplitSize;
+    }
+
     long maxPerSplit = bytesPerSplit + (long)(bytesPerSplit*SPLIT_SLOP);
 
     //LOG.info("bytesPerSplit = " + bytesPerSplit);
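
For illustration only, a standalone restatement of the new split-size rule in InputFormatBase: start from totalSize / numSplits, cap the result at the filesystem block size, then floor it at the configured minimum split size (so the minimum still wins if it exceeds the block size, as in the hunk above). The helper class is hypothetical.

public class SplitSizeSketch {
    static long bytesPerSplit(long totalSize, int numSplits,
                              long fsBlockSize, long minSplitSize) {
        long bytesPerSplit = totalSize / numSplits;   // desired number of splits
        if (bytesPerSplit > fsBlockSize) {            // no larger than fs blocks
            bytesPerSplit = fsBlockSize;
        }
        if (bytesPerSplit < minSplitSize) {           // no smaller than min size
            bytesPerSplit = minSplitSize;
        }
        return bytesPerSplit;
    }

    public static void main(String[] args) {
        // e.g. 1 GB of input, 4 requested splits, 32 MB blocks, 1-byte minimum:
        // 1 GB / 4 = 256 MB exceeds the block size, so splits are capped at 32 MB.
        long mb = 1024 * 1024;
        System.out.println(bytesPerSplit(1024 * mb, 4, 32 * mb, 1)); // 33554432
    }
}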