
Merged src/core, src/test/core, src/contrib/eclipse-plugin, and
src/contrib/ec2 from trunk 776174:784663


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/HADOOP-4687/core@784965 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 16 years ago
parent commit bcd64325a1
37 changed files with 1293 additions and 506 deletions
  1. + 9 - 1      src/contrib/ec2/bin/hadoop-ec2-init-remote.sh
  2. + 3 - 1      src/contrib/ec2/bin/launch-hadoop-slaves
  3. + 2 - 1      src/contrib/eclipse-plugin/build.xml
  4. + 17 - 0     src/java/core-default.xml
  5. + 23 - 38    src/java/org/apache/hadoop/filecache/DistributedCache.java
  6. + 66 - 0     src/java/org/apache/hadoop/fs/CreateFlag.java
  7. + 68 - 19    src/java/org/apache/hadoop/fs/FileSystem.java
  8. + 3 - 2      src/java/org/apache/hadoop/fs/FilterFileSystem.java
  9. + 101 - 55   src/java/org/apache/hadoop/fs/FsShell.java
  10. + 12 - 2    src/java/org/apache/hadoop/fs/RawLocalFileSystem.java
  11. + 15 - 1    src/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java
  12. + 0 - 1     src/java/org/apache/hadoop/fs/kfs/IFSImpl.java
  13. + 0 - 1     src/java/org/apache/hadoop/fs/kfs/KFSImpl.java
  14. + 0 - 1     src/java/org/apache/hadoop/fs/kfs/KFSInputStream.java
  15. + 0 - 1     src/java/org/apache/hadoop/fs/kfs/KFSOutputStream.java
  16. + 12 - 4    src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java
  17. + 15 - 2    src/java/org/apache/hadoop/fs/s3/S3FileSystem.java
  18. + 43 - 66   src/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java
  19. + 2 - 4     src/java/org/apache/hadoop/fs/s3native/NativeFileSystemStore.java
  20. + 154 - 104 src/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java
  21. + 5 - 3     src/java/org/apache/hadoop/http/HttpServer.java
  22. + 0 - 60    src/java/org/apache/hadoop/io/DeprecatedUTF8.java
  23. + 22 - 10   src/java/org/apache/hadoop/io/IOUtils.java
  24. + 12 - 9    src/java/org/apache/hadoop/metrics/ContextFactory.java
  25. + 3 - 0     src/java/org/apache/hadoop/net/NetUtils.java
  26. + 158 - 84  src/java/org/apache/hadoop/util/ProcessTree.java
  27. + 89 - 11   src/java/org/apache/hadoop/util/ProcfsBasedProcessTree.java
  28. + 103 - 8   src/java/org/apache/hadoop/util/Progress.java
  29. + 2 - 2     src/java/org/apache/hadoop/util/RunJar.java
  30. + 2 - 1     src/java/org/apache/hadoop/util/StringUtils.java
  31. + 9 - 0     src/test/core/org/apache/hadoop/filecache/TestDistributedCache.java
  32. + 0 - 1     src/test/core/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java
  33. + 0 - 1     src/test/core/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java
  34. + 12 - 0    src/test/core/org/apache/hadoop/fs/s3/S3FileSystemContractBaseTest.java
  35. + 7 - 12    src/test/core/org/apache/hadoop/fs/s3native/InMemoryNativeFileSystemStore.java
  36. + 90 - 0    src/test/core/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java
  37. + 234 - 0   src/test/core/org/apache/hadoop/util/TestProcfsBasedProcessTree.java

+ 9 - 1
src/contrib/ec2/bin/hadoop-ec2-init-remote.sh

@@ -17,7 +17,9 @@ MASTER_HOST=%MASTER_HOST% # Interpolated before being sent to EC2 node
 SECURITY_GROUPS=`wget -q -O - http://169.254.169.254/latest/meta-data/security-groups`
 IS_MASTER=`echo $SECURITY_GROUPS | awk '{ a = match ($0, "-master$"); if (a) print "true"; else print "false"; }'`
 if [ "$IS_MASTER" == "true" ]; then
- MASTER_HOST=`wget -q -O - http://169.254.169.254/latest/meta-data/local-hostname`
+ # Use the public hostname for the master. The private hostname can be used by substituting:
+ # MASTER_HOST=`wget -q -O - http://169.254.169.254/latest/meta-data/local-hostname`
+ MASTER_HOST=`wget -q -O - 'http://169.254.169.254/latest/meta-data/public-hostname'`
 fi
 
 HADOOP_HOME=`ls -d /usr/local/hadoop-*`
@@ -78,6 +80,12 @@ cat > $HADOOP_HOME/conf/hadoop-site.xml <<EOF
   <value>3</value>
 </property>
 
+<property>
+  <name>hadoop.rpc.socket.factory.class.default</name>
+  <value>org.apache.hadoop.net.StandardSocketFactory</value>
+  <final>true</final>
+</property>
+
 </configuration>
 EOF
 

+ 3 - 1
src/contrib/ec2/bin/launch-hadoop-slaves

@@ -42,7 +42,9 @@ fi
 
 # Finding Hadoop image
 AMI_IMAGE=`ec2-describe-images -a | grep $S3_BUCKET | grep $HADOOP_VERSION | grep $ARCH |grep available | awk '{print $2}'`
-MASTER_HOST=`cat $MASTER_PRIVATE_IP_PATH`
+# To use the private master hostname, substitute the line below with:
+# MASTER_HOST=`cat $MASTER_PRIVATE_IP_PATH`
+MASTER_HOST=`cat $MASTER_IP_PATH`
 MASTER_ZONE=`cat $MASTER_ZONE_PATH`
 
 # Substituting master hostname

+ 2 - 1
src/contrib/eclipse-plugin/build.xml

@@ -67,7 +67,8 @@
   <target name="jar" depends="compile" unless="skip.contrib">
     <mkdir dir="${build.dir}/lib"/>
     <copy file="${hadoop.root}/build/hadoop-${version}-core.jar" tofile="${build.dir}/lib/hadoop-core.jar" verbose="true"/>
-    <copy file="${hadoop.root}/lib/commons-cli-2.0-SNAPSHOT.jar" todir="${build.dir}/lib" verbose="true"/>
+    <copy file="${hadoop.root}/build/ivy/lib/Hadoop/common/commons-cli-${commons-cli.version}.jar"  todir="${build.dir}/lib" verbose="true"/>
+    <copy file="${hadoop.root}/build/ivy/lib/Hadoop/common/commons-cli-${commons-cli2.version}.jar"  todir="${build.dir}/lib" verbose="true"/>
     <jar
       jarfile="${build.dir}/hadoop-${version}-${name}.jar"
       manifest="${root}/META-INF/MANIFEST.MF">

+ 17 - 0
src/java/core-default.xml

@@ -247,6 +247,23 @@
 </property>
 
 
+<property>
+  <name>fs.automatic.close</name>
+  <value>true</value>
+  <description>By default, FileSystem instances are automatically closed at program
+  exit using a JVM shutdown hook. Setting this property to false disables this
+  behavior. This is an advanced option that should only be used by server applications
+  requiring a more carefully orchestrated shutdown sequence.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3n.block.size</name>
+  <value>67108864</value>
+  <description>Block size to use when reading files using the native S3
+  filesystem (s3n: URIs).</description>
+</property>
+
 <property>
   <name>local.cache.size</name>
   <value>10737418240</value>
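
A server that opts out of the shutdown hook takes responsibility for closing filesystems itself. A minimal sketch, assuming only the Hadoop core classes on the classpath (class name and path are illustrative):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class OrchestratedShutdown {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Opt out of the automatic close performed by the JVM shutdown hook.
        conf.setBoolean("fs.automatic.close", false);

        FileSystem fs = FileSystem.get(conf);
        fs.mkdirs(new Path("/tmp/example"));

        // ... finish in-flight work first, then close all cached filesystems.
        FileSystem.closeAll();
      }
    }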

+ 23 - 38
src/java/org/apache/hadoop/filecache/DistributedCache.java

@@ -34,11 +34,11 @@ import java.net.URI;
  * framework to cache files (text, archives, jars etc.) needed by applications.
  * </p>
  * 
- * <p>Applications specify the files, via urls (hdfs:// or http://) to be 
- * cached via the org.apache.hadoop.mapred.JobConf.
- * The <code>DistributedCache</code> assumes that the
- * files specified via hdfs:// urls are already present on the 
- * {@link FileSystem} at the path specified by the url.</p>
+ * <p>Applications specify the files, via urls (hdfs:// or http://) to be cached 
+ * via the org.apache.hadoop.mapred.JobConf. The
+ * <code>DistributedCache</code> assumes that the files specified via urls are
+ * already present on the {@link FileSystem} at the path specified by the url
+ * and are accessible by every machine in the cluster.</p>
  * 
  * <p>The framework will copy the necessary files on to the slave node before 
  * any tasks for the job are executed on that node. Its efficiency stems from 
@@ -127,9 +127,7 @@ public class DistributedCache {
    * previously cached (and valid) or copy it from the {@link FileSystem} now.
    * 
    * @param cache the cache to be localized, this should be specified as 
-   * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema 
-   * or hostname:port is provided the file is assumed to be in the filesystem
-   * being used in the Configuration
+   * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
   * @param conf The Configuration file which contains the filesystem
   * @param baseDir The base cache Dir where you want to localize the files/archives
    * @param fileStatus The file status on the dfs.
@@ -160,9 +158,7 @@ public class DistributedCache {
    * previously cached (and valid) or copy it from the {@link FileSystem} now.
    * 
    * @param cache the cache to be localized, this should be specified as 
-   * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema 
-   * or hostname:port is provided the file is assumed to be in the filesystem
-   * being used in the Configuration
+   * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
   * @param conf The Configuration file which contains the filesystem
   * @param baseDir The base cache Dir where you want to localize the files/archives
    * @param fileStatus The file status on the dfs.
@@ -229,9 +225,7 @@ public class DistributedCache {
    * previously cached (and valid) or copy it from the {@link FileSystem} now.
    * 
    * @param cache the cache to be localized, this should be specified as 
-   * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema 
-   * or hostname:port is provided the file is assumed to be in the filesystem
-   * being used in the Configuration
+   * new URI(scheme://scheme-specific-part/absolute_path_to_file#LINKNAME).
   * @param conf The Configuration file which contains the filesystem
   * @param baseDir The base cache Dir where you want to localize the files/archives
    * @param isArchive if the cache is an archive or a file. In case it is an 
@@ -348,7 +342,7 @@ public class DistributedCache {
     if(cache.getFragment() == null) {
     	doSymlink = false;
     }
-    FileSystem fs = getFileSystem(cache, conf);
+    FileSystem fs = FileSystem.get(cache, conf);
     String link = currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
     File flink = new File(link);
     if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
@@ -531,14 +525,6 @@ public class DistributedCache {
     }  
   }
   
-  private static FileSystem getFileSystem(URI cache, Configuration conf)
-    throws IOException {
-    if ("hdfs".equals(cache.getScheme()))
-      return FileSystem.get(cache, conf);
-    else
-      return FileSystem.get(conf);
-  }
-
   /**
    * Set the configuration with the given set of archives
    * @param archives The list of archives that need to be localized
@@ -695,7 +681,7 @@ public class DistributedCache {
     throws IOException {
     String classpath = conf.get("mapred.job.classpath.files");
     conf.set("mapred.job.classpath.files", classpath == null ? file.toString()
-             : classpath + System.getProperty("path.separator") + file.toString());
+             : classpath + "," + file.toString());
     FileSystem fs = FileSystem.get(conf);
     URI uri = fs.makeQualified(file).toUri();
 
@@ -708,14 +694,14 @@ public class DistributedCache {
    * @param conf Configuration that contains the classpath setting
    */
   public static Path[] getFileClassPaths(Configuration conf) {
-    String classpath = conf.get("mapred.job.classpath.files");
-    if (classpath == null)
-      return null;
-    ArrayList list = Collections.list(new StringTokenizer(classpath, System
-                                                          .getProperty("path.separator")));
+    ArrayList<String> list = (ArrayList<String>)conf.getStringCollection(
+                                "mapred.job.classpath.files");
+    if (list.size() == 0) { 
+      return null; 
+    }
     Path[] paths = new Path[list.size()];
     for (int i = 0; i < list.size(); i++) {
-      paths[i] = new Path((String) list.get(i));
+      paths[i] = new Path(list.get(i));
     }
     return paths;
   }
@@ -731,8 +717,7 @@ public class DistributedCache {
     throws IOException {
     String classpath = conf.get("mapred.job.classpath.archives");
     conf.set("mapred.job.classpath.archives", classpath == null ? archive
-             .toString() : classpath + System.getProperty("path.separator")
-             + archive.toString());
+             .toString() : classpath + "," + archive.toString());
     FileSystem fs = FileSystem.get(conf);
     URI uri = fs.makeQualified(archive).toUri();
 
@@ -745,14 +730,14 @@ public class DistributedCache {
    * @param conf Configuration that contains the classpath setting
    */
   public static Path[] getArchiveClassPaths(Configuration conf) {
-    String classpath = conf.get("mapred.job.classpath.archives");
-    if (classpath == null)
-      return null;
-    ArrayList list = Collections.list(new StringTokenizer(classpath, System
-                                                          .getProperty("path.separator")));
+    ArrayList<String> list = (ArrayList<String>)conf.getStringCollection(
+                                "mapred.job.classpath.archives");
+    if (list.size() == 0) { 
+      return null; 
+    }
     Path[] paths = new Path[list.size()];
     for (int i = 0; i < list.size(); i++) {
-      paths[i] = new Path((String) list.get(i));
+      paths[i] = new Path(list.get(i));
     }
     return paths;
   }
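
Classpath entries are now joined with commas, the separator Configuration.getStringCollection() splits on, rather than the platform path separator. A round-trip sketch (the jar paths are made up):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.filecache.DistributedCache;
    import org.apache.hadoop.fs.Path;

    public class ClassPathRoundTrip {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Each call appends to mapred.job.classpath.files, comma separated.
        DistributedCache.addFileToClassPath(new Path("/user/alice/lib/a.jar"), conf);
        DistributedCache.addFileToClassPath(new Path("/user/alice/lib/b.jar"), conf);

        // getFileClassPaths() reads the same property back as a collection.
        for (Path p : DistributedCache.getFileClassPaths(conf)) {
          System.out.println(p);
        }
      }
    }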

+ 66 - 0
src/java/org/apache/hadoop/fs/CreateFlag.java

@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+/****************************************************************
+ * CreateFlag specifies the file creation semantics. Users can combine flags like:<br>
+ * <code>
+ * EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND)
+ * </code>
+ * and pass it to {@link org.apache.hadoop.fs.FileSystem#create(Path f, FsPermission permission,
+ * EnumSet<CreateFlag> flag, int bufferSize, short replication, long blockSize,
+ * Progressable progress)}.
+ * 
+ * <p>
+ * Combining {@link #OVERWRITE} with either {@link #CREATE} 
+ * or {@link #APPEND} does the same as using 
+ * {@link #OVERWRITE} alone. <br>
+ * Combining {@link #CREATE} with {@link #APPEND} has these semantics:
+ * <ol>
+ * <li> create the file if it does not exist;
+ * <li> append the file if it already exists.
+ * </ol>
+ *****************************************************************/
+public enum CreateFlag {
+
+  /**
+   * create the file if it does not exist, and throw an IOException if it
+   * already exists
+   */
+  CREATE((short) 0x01),
+
+  /**
+   * create the file if it does not exist, if it exists, overwrite it.
+   */
+  OVERWRITE((short) 0x02),
+
+  /**
+   * append to a file, and throw an IOException if it does not exist
+   */
+  APPEND((short) 0x04);
+
+  private short mode;
+
+  private CreateFlag(short mode) {
+    this.mode = mode;
+  }
+
+  short getMode() {
+    return mode;
+  }
+}
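
A short usage sketch of the flag-based create (the path, buffer size, replication, and block size are illustrative only):

    import java.util.EnumSet;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CreateFlag;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;

    public class CreateOrAppend {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        // CREATE | APPEND: create the file if absent, otherwise append to it.
        FSDataOutputStream out = fs.create(new Path("/tmp/log.txt"),
            FsPermission.getDefault(),
            EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND),
            4096, (short) 1, 64 * 1024 * 1024, null);
        out.writeBytes("another line\n");
        out.close();
      }
    }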

+ 68 - 19
src/java/org/apache/hadoop/fs/FileSystem.java

@@ -24,7 +24,9 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -65,12 +67,12 @@ import org.apache.hadoop.util.ReflectionUtils;
  * implementation is DistributedFileSystem.
  *****************************************************************/
 public abstract class FileSystem extends Configured implements Closeable {
-  private static final String FS_DEFAULT_NAME_KEY = "fs.default.name";
+  public static final String FS_DEFAULT_NAME_KEY = "fs.default.name";
 
   public static final Log LOG = LogFactory.getLog(FileSystem.class);
 
   /** FileSystem cache */
-  private static final Cache CACHE = new Cache();
+  static final Cache CACHE = new Cache();
 
   /** The key this instance is stored under in the cache. */
   private Cache.Key key;
@@ -224,17 +226,6 @@ public abstract class FileSystem extends Configured implements Closeable {
     return (LocalFileSystem)newInstance(LocalFileSystem.NAME, conf);
   }
 
-  private static class ClientFinalizer extends Thread {
-    public synchronized void run() {
-      try {
-        FileSystem.closeAll();
-      } catch (IOException e) {
-        LOG.info("FileSystem.closeAll() threw an exception:\n" + e);
-      }
-    }
-  }
-  private static final ClientFinalizer clientFinalizer = new ClientFinalizer();
-
   /**
    * Close all cached filesystems. Be sure those filesystems are not
    * used anymore.
@@ -516,6 +507,7 @@ public abstract class FileSystem extends Configured implements Closeable {
   /**
    * Opens an FSDataOutputStream at the indicated Path with write-progress
    * reporting.
+   * @deprecated Consider using {@link #create(Path, FsPermission, EnumSet, int, short, long, Progressable)} instead.
    * @param f the file name to open
    * @param permission
    * @param overwrite if a file with this name already exists, then if true,
@@ -527,13 +519,36 @@ public abstract class FileSystem extends Configured implements Closeable {
    * @throws IOException
    * @see #setPermission(Path, FsPermission)
    */
-  public abstract FSDataOutputStream create(Path f,
+  public FSDataOutputStream create(Path f,
       FsPermission permission,
       boolean overwrite,
       int bufferSize,
       short replication,
       long blockSize,
-      Progressable progress) throws IOException;
+      Progressable progress) throws IOException {
+    return create(f, permission, overwrite ? EnumSet.of(CreateFlag.OVERWRITE)
+        : EnumSet.of(CreateFlag.CREATE), bufferSize, replication, blockSize,
+        progress);
+  }
+  
+  /**
+   * Opens an FSDataOutputStream at the indicated Path with write-progress
+   * reporting.
+   * @param f the file name to open.
+   * @param permission
+   * @param flag determines the semantic of this create.
+   * @param bufferSize the size of the buffer to be used.
+   * @param replication required block replication for the file.
+   * @param blockSize
+   * @param progress
+   * @throws IOException
+   * @see #setPermission(Path, FsPermission)
+   * @see CreateFlag
+   */
+  public abstract FSDataOutputStream create(Path f, FsPermission permission,
+      EnumSet<CreateFlag> flag, int bufferSize, short replication, long blockSize,
+      Progressable progress) throws IOException;
+  
 
   /**
    * Creates the given Path as a brand-new zero-length file.  If
@@ -1409,7 +1424,10 @@ public abstract class FileSystem extends Configured implements Closeable {
 
   /** Caching FileSystem objects */
   static class Cache {
+    private final ClientFinalizer clientFinalizer = new ClientFinalizer();
+
     private final Map<Key, FileSystem> map = new HashMap<Key, FileSystem>();
+    private final Set<Key> toAutoClose = new HashSet<Key>();
 
     /** A variable that makes all objects in the cache unique */
     private static AtomicLong unique = new AtomicLong(1);
@@ -1434,6 +1452,10 @@ public abstract class FileSystem extends Configured implements Closeable {
         }
         fs.key = key;
         map.put(key, fs);
+
+        if (conf.getBoolean("fs.automatic.close", true)) {
+          toAutoClose.add(key);
+        }
       }
       return fs;
     }
@@ -1441,6 +1463,7 @@ public abstract class FileSystem extends Configured implements Closeable {
     synchronized void remove(Key key, FileSystem fs) {
       if (map.containsKey(key) && fs == map.get(key)) {
         map.remove(key);
+        toAutoClose.remove(key);
         if (map.isEmpty() && !clientFinalizer.isAlive()) {
           if (!Runtime.getRuntime().removeShutdownHook(clientFinalizer)) {
             LOG.info("Could not cancel cleanup thread, though no " +
@@ -1451,11 +1474,27 @@ public abstract class FileSystem extends Configured implements Closeable {
     }
 
     synchronized void closeAll() throws IOException {
+      closeAll(false);
+    }
+
+    /**
+     * Close all FileSystem instances in the Cache.
+     * @param onlyAutomatic only close those that are marked for automatic closing
+     */
+    synchronized void closeAll(boolean onlyAutomatic) throws IOException {
       List<IOException> exceptions = new ArrayList<IOException>();
-      for(; !map.isEmpty(); ) {
-        Map.Entry<Key, FileSystem> e = map.entrySet().iterator().next();
-        final Key key = e.getKey();
-        final FileSystem fs = e.getValue();
+
+      // Make a copy of the keys in the map since we'll be modifying
+      // the map while iterating over it, which isn't safe.
+      List<Key> keys = new ArrayList<Key>();
+      keys.addAll(map.keySet());
+
+      for (Key key : keys) {
+        final FileSystem fs = map.get(key);
+
+        if (onlyAutomatic && !toAutoClose.contains(key)) {
+          continue;
+        }
 
         //remove from cache
         remove(key, fs);
@@ -1475,6 +1514,16 @@ public abstract class FileSystem extends Configured implements Closeable {
       }
     }
 
+    private class ClientFinalizer extends Thread {
+      public synchronized void run() {
+        try {
+          closeAll(true);
+        } catch (IOException e) {
+          LOG.info("FileSystem.Cache.closeAll() threw an exception:\n" + e);
+        }
+      }
+    }
+
     /** FileSystem.Cache.Key */
     static class Key {
       final String scheme;
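
The cache keys instances by scheme and authority, so repeated FileSystem.get() calls share one object, while newInstance() bypasses sharing via the unique counter. A sketch of the observable behavior (assuming a stock local configuration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class CacheDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        FileSystem a = FileSystem.get(conf);
        FileSystem b = FileSystem.get(conf);
        System.out.println(a == b);   // true: served from the cache

        FileSystem c = FileSystem.newInstance(FileSystem.getDefaultUri(conf), conf);
        System.out.println(a == c);   // false: a fresh, uniquely keyed instance

        c.close();             // removes only c's entry from the cache
        FileSystem.closeAll(); // closes whatever remains
      }
    }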

+ 3 - 2
src/java/org/apache/hadoop/fs/FilterFileSystem.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.fs;
 
 import java.io.*;
 import java.net.URI;
+import java.util.EnumSet;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -106,10 +107,10 @@ public class FilterFileSystem extends FileSystem {
   /** {@inheritDoc} */
   @Override
   public FSDataOutputStream create(Path f, FsPermission permission,
-      boolean overwrite, int bufferSize, short replication, long blockSize,
+      EnumSet<CreateFlag> flag, int bufferSize, short replication, long blockSize,
       Progressable progress) throws IOException {
     return fs.create(f, permission,
-        overwrite, bufferSize, replication, blockSize, progress);
+        flag, bufferSize, replication, blockSize, progress);
   }
 
   /**

+ 101 - 55
src/java/org/apache/hadoop/fs/FsShell.java

@@ -61,6 +61,7 @@ public class FsShell extends Configured implements Tool {
   static final String COPYTOLOCAL_SHORT_USAGE = GET_SHORT_USAGE.replace(
       "-get", "-copyToLocal");
   static final String TAIL_USAGE="-tail [-f] <file>";
+  static final String DU_USAGE="-du [-s] [-h] <paths...>";
 
   /**
    */
@@ -670,58 +671,98 @@ public class FsShell extends Configured implements Tool {
 
   /**
    * Show the size of all files that match the file pattern <i>src</i>
-   * @param src a file pattern specifying source files
+   * @param cmd the command and its arguments
+   * @param pos ignore anything before this position in cmd
    * @throws IOException  
    * @see org.apache.hadoop.fs.FileSystem#globStatus(Path)
    */
-  void du(String src) throws IOException {
-    Path srcPath = new Path(src);
-    FileSystem srcFs = srcPath.getFileSystem(getConf());
-    Path[] pathItems = FileUtil.stat2Paths(srcFs.globStatus(srcPath), 
-                                           srcPath);
-    FileStatus items[] = srcFs.listStatus(pathItems);
-    if ((items == null) || ((items.length == 0) && 
-        (!srcFs.exists(srcPath)))){
-      throw new FileNotFoundException("Cannot access " + src
-            + ": No such file or directory.");
-    } else {
-      System.out.println("Found " + items.length + " items");
-      int maxLength = 10;
-      
-      long length[] = new long[items.length];
-      for (int i = 0; i < items.length; i++) {
-        length[i] = items[i].isDir() ?
-          srcFs.getContentSummary(items[i].getPath()).getLength() :
-          items[i].getLen();
-        int len = String.valueOf(length[i]).length();
-        if (len > maxLength) maxLength = len;
+  void du(String[] cmd, int pos) throws IOException {
+    CommandFormat c = new CommandFormat(
+      "du", 0, Integer.MAX_VALUE, "h", "s");
+    List<String> params;
+    try {
+      params = c.parse(cmd, pos);
+    } catch (IllegalArgumentException iae) {
+      System.err.println("Usage: java FsShell " + DU_USAGE);
+      throw iae;
+    }
+    boolean humanReadable = c.getOpt("h");
+    boolean summary = c.getOpt("s");
+
+    // Default to cwd
+    if (params.isEmpty()) {
+      params.add(".");
+    }
+
+    List<UsagePair> usages = new ArrayList<UsagePair>();
+
+    for (String src : params) {
+      Path srcPath = new Path(src);
+      FileSystem srcFs = srcPath.getFileSystem(getConf());
+      FileStatus globStatus[] = srcFs.globStatus(srcPath);
+      FileStatus statusToPrint[];
+
+      if (summary) {
+        statusToPrint = globStatus;
+      } else {
+        Path statPaths[] = FileUtil.stat2Paths(globStatus, srcPath);
+        statusToPrint = srcFs.listStatus(statPaths);
       }
-      for(int i = 0; i < items.length; i++) {
-        System.out.printf("%-"+ (maxLength + BORDER) +"d", length[i]);
-        System.out.println(items[i].getPath());
+      if ((statusToPrint == null) || ((statusToPrint.length == 0) &&
+                                      (!srcFs.exists(srcPath)))){
+        throw new FileNotFoundException("Cannot access " + src
+                                        + ": No such file or directory.");
+      }
+
+      if (!summary) {
+        System.out.println("Found " + statusToPrint.length + " items");
+      }
+
+      for (FileStatus stat : statusToPrint) {
+        long length;
+        if (summary || stat.isDir()) {
+          length = srcFs.getContentSummary(stat.getPath()).getLength();
+        } else {
+          length = stat.getLen();
+        }
+
+        usages.add(new UsagePair(String.valueOf(stat.getPath()), length));
       }
     }
+    printUsageSummary(usages, humanReadable);
   }
     
   /**
    * Show the summary disk usage of each dir/file 
    * that matches the file pattern <i>src</i>
-   * @param src a file pattern specifying source files
+   * @param cmd the command and its arguments
+   * @param pos ignore anything before this position in cmd
    * @throws IOException  
    * @see org.apache.hadoop.fs.FileSystem#globStatus(Path)
    */
-  void dus(String src) throws IOException {
-    Path srcPath = new Path(src);
-    FileSystem srcFs = srcPath.getFileSystem(getConf());
-    FileStatus status[] = srcFs.globStatus(new Path(src));
-    if (status==null || status.length==0) {
-      throw new FileNotFoundException("Cannot access " + src + 
-          ": No such file or directory.");
+  void dus(String[] cmd, int pos) throws IOException {
+    String newcmd[] = new String[cmd.length + 1];
+    System.arraycopy(cmd, 0, newcmd, 0, cmd.length);
+    newcmd[cmd.length] = "-s";
+    du(newcmd, pos);
+  }
+
+  private void printUsageSummary(List<UsagePair> usages,
+                                 boolean humanReadable) {
+    int maxColumnWidth = 0;
+    for (UsagePair usage : usages) {
+      String toPrint = humanReadable ?
+        StringUtils.humanReadableInt(usage.bytes) : String.valueOf(usage.bytes);
+      if (toPrint.length() > maxColumnWidth) {
+        maxColumnWidth = toPrint.length();
+      }
     }
-    for(int i=0; i<status.length; i++) {
-      long totalSize = srcFs.getContentSummary(status[i].getPath()).getLength();
-      String pathStr = status[i].getPath().toString();
-      System.out.println(("".equals(pathStr)?".":pathStr) + "\t" + totalSize);
+
+    for (UsagePair usage : usages) {
+      String toPrint = humanReadable ?
+        StringUtils.humanReadableInt(usage.bytes) : String.valueOf(usage.bytes);
+      System.out.printf("%-"+ (maxColumnWidth + BORDER) +"s", toPrint);
+      System.out.println(usage.path);
     }
   }
 
@@ -1129,10 +1170,13 @@ public class FsShell extends Configured implements Tool {
 
     while (true) {
       FSDataInputStream in = srcFs.open(path);
-      in.seek(offset);
-      IOUtils.copyBytes(in, System.out, 1024, false);
-      offset = in.getPos();
-      in.close();
+      try {
+        in.seek(offset);
+        IOUtils.copyBytes(in, System.out, 1024);
+        offset = in.getPos();
+      } finally {
+        in.close();
+      }
       if (!foption) {
         break;
       }
@@ -1558,10 +1602,6 @@ public class FsShell extends Configured implements Tool {
           delete(argv[i], true);
         } else if ("-df".equals(cmd)) {
           df(argv[i]);
-        } else if ("-du".equals(cmd)) {
-          du(argv[i]);
-        } else if ("-dus".equals(cmd)) {
-          dus(argv[i]);
         } else if (Count.matches(cmd)) {
           new Count(argv, i, getConf()).runAll();
         } else if ("-ls".equals(cmd)) {
@@ -1809,17 +1849,9 @@ public class FsShell extends Configured implements Tool {
           df(null);
         }
       } else if ("-du".equals(cmd)) {
-        if (i < argv.length) {
-          exitCode = doall(cmd, argv, i);
-        } else {
-          du(".");
-        }
+        du(argv, i);
       } else if ("-dus".equals(cmd)) {
-        if (i < argv.length) {
-          exitCode = doall(cmd, argv, i);
-        } else {
-          dus(".");
-        }         
+        dus(argv, i);
       } else if (Count.matches(cmd)) {
         exitCode = new Count(argv, i, getConf()).runAll();
       } else if ("-mkdir".equals(cmd)) {
@@ -1922,4 +1954,18 @@ public class FsShell extends Configured implements Tool {
           throw new IOException("Multiple IOExceptions: " + exceptions);
     }
   }
+
+
+  /**
+   * Utility class for a line of du output
+   */
+  private static class UsagePair {
+    public String path;
+    public long bytes;
+
+    public UsagePair(String path, long bytes) {
+      this.path = path;
+      this.bytes = bytes;
+    }
+  }
 }
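
The reworked commands run through the ordinary Tool entry point, so the new options can also be driven programmatically. A sketch (the target path is arbitrary):

    import org.apache.hadoop.fs.FsShell;
    import org.apache.hadoop.util.ToolRunner;

    public class DuSummary {
      public static void main(String[] args) throws Exception {
        // Equivalent to: hadoop fs -du -s -h /tmp
        int exitCode = ToolRunner.run(new FsShell(),
            new String[] {"-du", "-s", "-h", "/tmp"});
        System.exit(exitCode);
      }
    }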

+ 12 - 2
src/java/org/apache/hadoop/fs/RawLocalFileSystem.java

@@ -28,6 +28,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.net.URI;
 import java.nio.ByteBuffer;
+import java.util.EnumSet;
 import java.util.StringTokenizer;
 
 import org.apache.hadoop.conf.Configuration;
@@ -244,10 +245,19 @@ public class RawLocalFileSystem extends FileSystem {
   /** {@inheritDoc} */
   @Override
   public FSDataOutputStream create(Path f, FsPermission permission,
-      boolean overwrite, int bufferSize, short replication, long blockSize,
+      EnumSet<CreateFlag> flag, int bufferSize, short replication, long blockSize,
       Progressable progress) throws IOException {
+
+    if (flag.contains(CreateFlag.APPEND)) {
+      if (!exists(f) && flag.contains(CreateFlag.CREATE)) {
+        return create(f, false, bufferSize, replication, blockSize, progress);
+      }
+      return append(f, bufferSize, progress);
+    }
+
     FSDataOutputStream out = create(f,
-        overwrite, bufferSize, replication, blockSize, progress);
+        flag.contains(CreateFlag.OVERWRITE), bufferSize, replication, blockSize, progress);
     setPermission(f, permission);
     return out;
   }

+ 15 - 1
src/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java

@@ -21,6 +21,8 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
+import java.util.EnumSet;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.net.ftp.FTP;
@@ -28,6 +30,7 @@ import org.apache.commons.net.ftp.FTPClient;
 import org.apache.commons.net.ftp.FTPFile;
 import org.apache.commons.net.ftp.FTPReply;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -193,19 +196,30 @@ public class FTPFileSystem extends FileSystem {
    */
   @Override
   public FSDataOutputStream create(Path file, FsPermission permission,
-      boolean overwrite, int bufferSize, short replication, long blockSize,
+      EnumSet<CreateFlag> flag, int bufferSize, short replication, long blockSize,
       Progressable progress) throws IOException {
     final FTPClient client = connect();
     Path workDir = new Path(client.printWorkingDirectory());
     Path absolute = makeAbsolute(workDir, file);
+    
+    boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
+    boolean create = flag.contains(CreateFlag.CREATE);
+    boolean append = flag.contains(CreateFlag.APPEND);
+    
     if (exists(client, file)) {
       if (overwrite) {
         delete(client, file);
+      } else if(append){
+        return append(file, bufferSize, progress);
       } else {
         disconnect(client);
         throw new IOException("File already exists: " + file);
       }
+    } else {
+      if (append && !create)
+        throw new FileNotFoundException("File does not exist: " + file);
     }
+    
     Path parent = absolute.getParent();
     if (parent == null || !mkdirs(client, parent, FsPermission.getDefault())) {
       parent = (parent == null) ? new Path("/") : parent;

+ 0 - 1
src/java/org/apache/hadoop/fs/kfs/IFSImpl.java

@@ -12,7 +12,6 @@
  * implied. See the License for the specific language governing
  * permissions and limitations under the License.
  *
- * @author: Sriram Rao (Kosmix Corp.)
  * 
  * We need to provide the ability to the code in fs/kfs without really
  * having a KFS deployment.  In particular, the glue code that wraps

+ 0 - 1
src/java/org/apache/hadoop/fs/kfs/KFSImpl.java

@@ -12,7 +12,6 @@
  * implied. See the License for the specific language governing
  * permissions and limitations under the License.
  *
- * @author: Sriram Rao (Kosmix Corp.)
  * 
  * Provide the implementation of KFS which turn into calls to KfsAccess.
  */

+ 0 - 1
src/java/org/apache/hadoop/fs/kfs/KFSInputStream.java

@@ -12,7 +12,6 @@
  * implied. See the License for the specific language governing
  * permissions and limitations under the License.
  *
- * @author: Sriram Rao (Kosmix Corp.)
  * 
  * Implements the Hadoop FSInputStream interfaces to allow applications to read
  * files in Kosmos File System (KFS).

+ 0 - 1
src/java/org/apache/hadoop/fs/kfs/KFSOutputStream.java

@@ -12,7 +12,6 @@
  * implied. See the License for the specific language governing
  * permissions and limitations under the License.
  *
- * @author: Sriram Rao (Kosmix Corp.)
  * 
  * Implements the Hadoop FSOutputStream interfaces to allow applications to write to
  * files in Kosmos File System (KFS).

+ 12 - 4
src/java/org/apache/hadoop/fs/kfs/KosmosFileSystem.java

@@ -12,7 +12,6 @@
  * implied. See the License for the specific language governing
  * permissions and limitations under the License.
  *
- * @author: Sriram Rao (Kosmix Corp.)
  * 
  * Implements the Hadoop FS interfaces to allow applications to store
  *files in Kosmos File System (KFS).
@@ -23,9 +22,11 @@ package org.apache.hadoop.fs.kfs;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
+import java.util.EnumSet;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -120,7 +121,6 @@ public class KosmosFileSystem extends FileSystem {
     }
 
     @Override
-    @Deprecated
     public boolean isDirectory(Path path) throws IOException {
 	Path absolute = makeAbsolute(path);
         String srep = absolute.toUri().getPath();
@@ -131,7 +131,6 @@ public class KosmosFileSystem extends FileSystem {
     }
 
     @Override
-    @Deprecated
     public boolean isFile(Path path) throws IOException {
 	Path absolute = makeAbsolute(path);
         String srep = absolute.toUri().getPath();
@@ -186,16 +185,25 @@ public class KosmosFileSystem extends FileSystem {
 
     @Override
     public FSDataOutputStream create(Path file, FsPermission permission,
-                                     boolean overwrite, int bufferSize,
+                                     EnumSet<CreateFlag> flag, int bufferSize,
 				     short replication, long blockSize, Progressable progress)
 	throws IOException {
 
+      boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
+      boolean create = flag.contains(CreateFlag.CREATE);
+      boolean append = flag.contains(CreateFlag.APPEND);
+      
         if (exists(file)) {
             if (overwrite) {
                 delete(file, true);
+            } else if (append) {
+                return append(file, bufferSize, progress);
             } else {
                 throw new IOException("File already exists: " + file);
             }
+        } else {
+          if (append && !create)
+            throw new FileNotFoundException("File does not exist: " + file);
         }
 
 	Path parent = file.getParent();

+ 15 - 2
src/java/org/apache/hadoop/fs/s3/S3FileSystem.java

@@ -22,12 +22,14 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -201,18 +203,24 @@ public class S3FileSystem extends FileSystem {
    */
   @Override
   public FSDataOutputStream create(Path file, FsPermission permission,
-      boolean overwrite, int bufferSize,
+      EnumSet<CreateFlag> flag, int bufferSize,
       short replication, long blockSize, Progressable progress)
     throws IOException {
 
     INode inode = store.retrieveINode(makeAbsolute(file));
     if (inode != null) {
-      if (overwrite) {
+      if (flag.contains(CreateFlag.OVERWRITE)) {
         delete(file, true);
+      } else if (flag.contains(CreateFlag.APPEND)){
+        return append(file, bufferSize, progress);
       } else {
         throw new IOException("File already exists: " + file);
       }
     } else {
+      
+      if (flag.contains(CreateFlag.APPEND) && !flag.contains(CreateFlag.CREATE))
+        throw new FileNotFoundException("File does not exist: " + file);
+      
       Path parent = file.getParent();
       if (parent != null) {
         if (!mkdirs(parent)) {
@@ -324,6 +332,11 @@ public class S3FileSystem extends FileSystem {
     }
     return new S3FileStatus(f.makeQualified(this), inode);
   }
+  
+  @Override
+  public long getDefaultBlockSize() {
+    return getConf().getLong("fs.s3.block.size", 64 * 1024 * 1024);
+  }
 
   // diagnostic methods
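
Since the block size is now read from configuration rather than hard coded, it can be tuned per deployment. A sketch (the 128 MB figure is arbitrary):

    import org.apache.hadoop.conf.Configuration;

    public class S3BlockSize {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Overrides the 64 MB default consulted by S3FileSystem.getDefaultBlockSize().
        conf.setLong("fs.s3.block.size", 128 * 1024 * 1024);
        System.out.println(conf.getLong("fs.s3.block.size", 64 * 1024 * 1024));
      }
    }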
 

+ 43 - 66
src/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java

@@ -24,6 +24,7 @@ import java.io.BufferedInputStream;
 import java.io.ByteArrayInputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
@@ -53,10 +54,7 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
             s3Credentials.getSecretAccessKey());
       this.s3Service = new RestS3Service(awsCredentials);
     } catch (S3ServiceException e) {
-      if (e.getCause() instanceof IOException) {
-        throw (IOException) e.getCause();
-      }
-      throw new S3Exception(e);
+      handleServiceException(e);
     }
     bucket = new S3Bucket(uri.getHost());
   }
@@ -76,10 +74,7 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
       }
       s3Service.putObject(bucket, object);
     } catch (S3ServiceException e) {
-      if (e.getCause() instanceof IOException) {
-        throw (IOException) e.getCause();
-      }
-      throw new S3Exception(e);
+      handleServiceException(e);
     } finally {
       if (in != null) {
         try {
@@ -99,10 +94,7 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
       object.setContentLength(0);
       s3Service.putObject(bucket, object);
     } catch (S3ServiceException e) {
-      if (e.getCause() instanceof IOException) {
-        throw (IOException) e.getCause();
-      }
-      throw new S3Exception(e);
+      handleServiceException(e);
     }
   }
   
@@ -116,10 +108,8 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
       if (e.getMessage().contains("ResponseCode=404")) {
         return null;
       }
-      if (e.getCause() instanceof IOException) {
-        throw (IOException) e.getCause();
-      }
-      throw new S3Exception(e);
+      handleServiceException(e);
+      return null; //never returned - keep compiler happy
     }
   }
   
@@ -128,13 +118,8 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
       S3Object object = s3Service.getObject(bucket, key);
       return object.getDataInputStream();
     } catch (S3ServiceException e) {
-      if ("NoSuchKey".equals(e.getS3ErrorCode())) {
-        return null;
-      }
-      if (e.getCause() instanceof IOException) {
-        throw (IOException) e.getCause();
-      }
-      throw new S3Exception(e);
+      handleServiceException(key, e);
+      return null; //never returned - keep compiler happy
     }
   }
   
@@ -145,32 +130,22 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
                                             null, byteRangeStart, null);
       return object.getDataInputStream();
     } catch (S3ServiceException e) {
-      if ("NoSuchKey".equals(e.getS3ErrorCode())) {
-        return null;
-      }
-      if (e.getCause() instanceof IOException) {
-        throw (IOException) e.getCause();
-      }
-      throw new S3Exception(e);
+      handleServiceException(key, e);
+      return null; //never returned - keep compiler happy
     }
   }
 
   public PartialListing list(String prefix, int maxListingLength)
     throws IOException {
-    return list(prefix, maxListingLength, null);
+    return list(prefix, maxListingLength, null, false);
   }
   
-  public PartialListing list(String prefix, int maxListingLength,
-      String priorLastKey) throws IOException {
+  public PartialListing list(String prefix, int maxListingLength, String priorLastKey,
+      boolean recurse) throws IOException {
 
-    return list(prefix, PATH_DELIMITER, maxListingLength, priorLastKey);
+    return list(prefix, recurse ? null : PATH_DELIMITER, maxListingLength, priorLastKey);
   }
 
-  public PartialListing listAll(String prefix, int maxListingLength,
-      String priorLastKey) throws IOException {
-
-    return list(prefix, null, maxListingLength, priorLastKey);
-  }
 
   private PartialListing list(String prefix, String delimiter,
       int maxListingLength, String priorLastKey) throws IOException {
@@ -191,10 +166,8 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
       return new PartialListing(chunk.getPriorLastKey(), fileMetadata,
           chunk.getCommonPrefixes());
     } catch (S3ServiceException e) {
-      if (e.getCause() instanceof IOException) {
-        throw (IOException) e.getCause();
-      }
-      throw new S3Exception(e);
+      handleServiceException(e);
+      return null; //never returned - keep compiler happy
     }
   }
 
@@ -202,36 +175,27 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
     try {
       s3Service.deleteObject(bucket, key);
     } catch (S3ServiceException e) {
-      if (e.getCause() instanceof IOException) {
-        throw (IOException) e.getCause();
-      }
-      throw new S3Exception(e);
+      handleServiceException(key, e);
     }
   }
   
-  public void rename(String srcKey, String dstKey) throws IOException {
+  public void copy(String srcKey, String dstKey) throws IOException {
     try {
-      s3Service.moveObject(bucket.getName(), srcKey, bucket.getName(),
+      s3Service.copyObject(bucket.getName(), srcKey, bucket.getName(),
           new S3Object(dstKey), false);
     } catch (S3ServiceException e) {
-      if (e.getCause() instanceof IOException) {
-        throw (IOException) e.getCause();
-      }
-      throw new S3Exception(e);
+      handleServiceException(srcKey, e);
     }
   }
 
   public void purge(String prefix) throws IOException {
     try {
       S3Object[] objects = s3Service.listObjects(bucket, prefix, null);
-      for (int i = 0; i < objects.length; i++) {
-        s3Service.deleteObject(bucket, objects[i].getKey());
+      for (S3Object object : objects) {
+        s3Service.deleteObject(bucket, object.getKey());
       }
     } catch (S3ServiceException e) {
-      if (e.getCause() instanceof IOException) {
-        throw (IOException) e.getCause();
-      }
-      throw new S3Exception(e);
+      handleServiceException(e);
     }
   }
 
@@ -240,16 +204,29 @@ class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
     sb.append(bucket.getName()).append("\n");
     try {
       S3Object[] objects = s3Service.listObjects(bucket);
-      for (int i = 0; i < objects.length; i++) {
-        sb.append(objects[i].getKey()).append("\n");
+      for (S3Object object : objects) {
+        sb.append(object.getKey()).append("\n");
       }
     } catch (S3ServiceException e) {
-      if (e.getCause() instanceof IOException) {
-        throw (IOException) e.getCause();
-      }
-      throw new S3Exception(e);
+      handleServiceException(e);
     }
     System.out.println(sb);
   }
-  
+
+  private void handleServiceException(String key, S3ServiceException e) throws IOException {
+    if ("NoSuchKey".equals(e.getS3ErrorCode())) {
+      throw new FileNotFoundException("Key '" + key + "' does not exist in S3");
+    } else {
+      handleServiceException(e);
+    }
+  }
+
+  private void handleServiceException(S3ServiceException e) throws IOException {
+    if (e.getCause() instanceof IOException) {
+      throw (IOException) e.getCause();
+    }
+    else {
+      throw new S3Exception(e);
+    }
+  }
 }

+ 2 - 4
src/java/org/apache/hadoop/fs/s3native/NativeFileSystemStore.java

@@ -42,14 +42,12 @@ interface NativeFileSystemStore {
   InputStream retrieve(String key, long byteRangeStart) throws IOException;
   
   PartialListing list(String prefix, int maxListingLength) throws IOException;
-  PartialListing list(String prefix, int maxListingLength, String priorLastKey)
+  PartialListing list(String prefix, int maxListingLength, String priorLastKey, boolean recursive)
     throws IOException;
-  PartialListing listAll(String prefix, int maxListingLength,
-      String priorLastKey) throws IOException;
   
   void delete(String key) throws IOException;
 
-  void rename(String srcKey, String dstKey) throws IOException;
+  void copy(String srcKey, String dstKey) throws IOException;
   
   /**
    * Delete all keys with the given prefix. Used for testing.
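
Callers page through listings with priorLastKey until it comes back null. A minimal consumer sketch of the revised interface (ListAllKeys is hypothetical, and it would have to live in org.apache.hadoop.fs.s3native since PartialListing and FileMetadata are package-local helper types):

    import java.io.IOException;

    class ListAllKeys {
      // Walk an entire key space, one page at a time.
      static void printAll(NativeFileSystemStore store, String prefix) throws IOException {
        String priorLastKey = null;
        do {
          // recursive=true flattens the listing, replacing the removed listAll().
          PartialListing listing = store.list(prefix, 1000, priorLastKey, true);
          for (FileMetadata file : listing.getFiles()) {
            System.out.println(file.getKey());
          }
          priorLastKey = listing.getPriorLastKey();
        } while (priorLastKey != null);
      }
    }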

+ 154 - 104
src/java/org/apache/hadoop/fs/s3native/NativeS3FileSystem.java

@@ -30,6 +30,7 @@ import java.security.DigestOutputStream;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -41,6 +42,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BufferedFSInputStream;
+import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FSInputStream;
@@ -61,6 +63,17 @@ import org.apache.hadoop.util.Progressable;
  * Unlike {@link org.apache.hadoop.fs.s3.S3FileSystem} this implementation
  * stores files on S3 in their
  * native form so they can be read by other S3 tools.
+ *
+ * A note about directories. S3 of course has no "native" support for them.
+ * The idiom we choose then is: for any directory created by this class,
+ * we use an empty object "#{dirpath}_$folder$" as a marker.
+ * Further, to interoperate with other S3 tools, we also accept the following:
+ *  - an object "#{dirpath}/" denoting a directory marker
+ *  - if there exist any objects with the prefix "#{dirpath}/", then the
+ *    directory is said to exist
+ *  - if both a file with the name of a directory and a marker for that
+ *    directory exist, then the *file masks the directory*, and the directory
+ *    is never returned.
  * </p>
  * @see org.apache.hadoop.fs.s3.S3FileSystem
  */
@@ -70,7 +83,6 @@ public class NativeS3FileSystem extends FileSystem {
     LogFactory.getLog(NativeS3FileSystem.class);
   
   private static final String FOLDER_SUFFIX = "_$folder$";
-  private static final long MAX_S3_FILE_SIZE = 5 * 1024 * 1024 * 1024L;
   static final String PATH_DELIMITER = Path.SEPARATOR;
   private static final int S3_MAX_LISTING_LENGTH = 1000;
   
@@ -85,6 +97,7 @@ public class NativeS3FileSystem extends FileSystem {
       this.key = key;
     }
     
+    @Override
     public synchronized int read() throws IOException {
       int result = in.read();
       if (result != -1) {
@@ -95,6 +108,7 @@ public class NativeS3FileSystem extends FileSystem {
       }
       return result;
     }
+    @Override
     public synchronized int read(byte[] b, int off, int len)
       throws IOException {
       
@@ -108,18 +122,23 @@ public class NativeS3FileSystem extends FileSystem {
       return result;
     }
 
+    @Override
     public void close() throws IOException {
       in.close();
     }
 
+    @Override
     public synchronized void seek(long pos) throws IOException {
       in.close();
+      LOG.info("Opening key '" + key + "' for reading at position '" + pos + "'");
       in = store.retrieve(key, pos);
       this.pos = pos;
     }
+    @Override
     public synchronized long getPos() throws IOException {
       return pos;
     }
+    @Override
     public boolean seekToNewSource(long targetPos) throws IOException {
       return false;
     }
@@ -140,6 +159,7 @@ public class NativeS3FileSystem extends FileSystem {
       this.conf = conf;
       this.key = key;
       this.backupFile = newBackupFile();
+      LOG.info("OutputStream for key '" + key + "' writing to tempfile '" + this.backupFile + "'");
       try {
         this.digest = MessageDigest.getInstance("MD5");
         this.backupStream = new BufferedOutputStream(new DigestOutputStream(
@@ -174,6 +194,7 @@ public class NativeS3FileSystem extends FileSystem {
       }
 
       backupStream.close();
+      LOG.info("OutputStream for key '" + key + "' closed. Now beginning upload");
       
       try {
         byte[] md5Hash = digest == null ? null : digest.digest();
@@ -185,7 +206,7 @@ public class NativeS3FileSystem extends FileSystem {
         super.close();
         closed = true;
       } 
-
+      LOG.info("OutputStream for key '" + key + "' upload complete");
     }
 
     @Override
@@ -197,8 +218,6 @@ public class NativeS3FileSystem extends FileSystem {
     public void write(byte[] b, int off, int len) throws IOException {
       backupStream.write(b, off, len);
     }
-    
-    
   }
   
   private URI uri;
@@ -242,6 +261,7 @@ public class NativeS3FileSystem extends FileSystem {
     Map<String, RetryPolicy> methodNameToPolicyMap =
       new HashMap<String, RetryPolicy>();
     methodNameToPolicyMap.put("storeFile", methodPolicy);
+    methodNameToPolicyMap.put("rename", methodPolicy);
     
     return (NativeFileSystemStore)
       RetryProxy.create(NativeFileSystemStore.class, store,
@@ -249,10 +269,19 @@ public class NativeS3FileSystem extends FileSystem {
   }
   
   private static String pathToKey(Path path) {
+    if (path.toUri().getScheme() != null && "".equals(path.toUri().getPath())) {
+      // allow uris without trailing slash after bucket to refer to root,
+      // like s3n://mybucket
+      return "";
+    }
     if (!path.isAbsolute()) {
       throw new IllegalArgumentException("Path must be absolute: " + path);
     }
-    return path.toUri().getPath().substring(1); // remove initial slash
+    String ret = path.toUri().getPath().substring(1); // remove initial slash
+    if (ret.endsWith("/") && (ret.indexOf("/") != ret.length() - 1)) {
+      ret = ret.substring(0, ret.length() - 1);
+    }
+    return ret;
   }
   
   private static Path keyToPath(String key) {
@@ -267,6 +296,7 @@ public class NativeS3FileSystem extends FileSystem {
   }
 
   /** This optional operation is not yet supported. */
+  @Override
   public FSDataOutputStream append(Path f, int bufferSize,
       Progressable progress) throws IOException {
     throw new IOException("Not supported");
@@ -274,12 +304,21 @@ public class NativeS3FileSystem extends FileSystem {
   
   @Override
   public FSDataOutputStream create(Path f, FsPermission permission,
-      boolean overwrite, int bufferSize, short replication, long blockSize,
+      EnumSet<CreateFlag> flag, int bufferSize, short replication, long blockSize,
       Progressable progress) throws IOException {
 
-    if (exists(f) && !overwrite) {
-      throw new IOException("File already exists:"+f);
+    if (exists(f)) {
+      if (flag.contains(CreateFlag.APPEND)) {
+        return append(f, bufferSize, progress);
+      } else if (!flag.contains(CreateFlag.OVERWRITE)) {
+        throw new IOException("File already exists: " + f);
+      }
+    } else {
+      if (flag.contains(CreateFlag.APPEND) && !flag.contains(CreateFlag.CREATE))
+        throw new FileNotFoundException("File does not exist: " + f);
     }
+    
+    LOG.debug("Creating new file '" + f + "' in S3");
     Path absolutePath = makeAbsolute(f);
     String key = pathToKey(absolutePath);
     return new FSDataOutputStream(new NativeS3FsOutputStream(getConf(), store,
@@ -287,27 +326,41 @@ public class NativeS3FileSystem extends FileSystem {
   }
   
   @Override
-  public boolean delete(Path f, boolean recursive) throws IOException {
+  public boolean delete(Path f, boolean recurse) throws IOException {
     FileStatus status;
     try {
       status = getFileStatus(f);
     } catch (FileNotFoundException e) {
+      LOG.debug("Delete called for '" + f + "' but file does not exist, so returning false");
       return false;
     }
     Path absolutePath = makeAbsolute(f);
     String key = pathToKey(absolutePath);
     if (status.isDir()) {
-      FileStatus[] contents = listStatus(f);
-      if (!recursive && contents.length > 0) {
-        throw new IOException("Directory " + f.toString() + " is not empty.");
+      if (!recurse && listStatus(f).length > 0) {
+        throw new IOException("Can not delete " + f + " at is a not empty directory and recurse option is false");
       }
-      for (FileStatus p : contents) {
-        if (!delete(p.getPath(), recursive)) {
-          return false;
+
+      createParent(f);
+
+      LOG.debug("Deleting directory '" + f  + "'");
+      String priorLastKey = null;
+      do {
+        PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, true);
+        for (FileMetadata file : listing.getFiles()) {
+          store.delete(file.getKey());
         }
+        priorLastKey = listing.getPriorLastKey();
+      } while (priorLastKey != null);
+
+      try {
+        store.delete(key + FOLDER_SUFFIX);
+      } catch (FileNotFoundException e) {
+        //this is fine, we don't require a marker
       }
-      store.delete(key + FOLDER_SUFFIX);
     } else {
+      LOG.debug("Deleting file '" + f + "'");
+      createParent(f);
       store.delete(key);
     }
     return true;
@@ -315,7 +368,6 @@ public class NativeS3FileSystem extends FileSystem {
 
   @Override
   public FileStatus getFileStatus(Path f) throws IOException {
-    
     Path absolutePath = makeAbsolute(f);
     String key = pathToKey(absolutePath);
     
@@ -323,23 +375,28 @@ public class NativeS3FileSystem extends FileSystem {
       return newDirectory(absolutePath);
     }
     
+    LOG.debug("getFileStatus retrieving metadata for key '" + key + "'");
     FileMetadata meta = store.retrieveMetadata(key);
     if (meta != null) {
+      LOG.debug("getFileStatus returning 'file' for key '" + key + "'");
       return newFile(meta, absolutePath);
     }
     if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) {
+      LOG.debug("getFileStatus returning 'directory' for key '" + key + "' as '"
+          + key + FOLDER_SUFFIX + "' exists");
       return newDirectory(absolutePath);
     }
     
+    LOG.debug("getFileStatus listing key '" + key + "'");
     PartialListing listing = store.list(key, 1);
     if (listing.getFiles().length > 0 ||
         listing.getCommonPrefixes().length > 0) {
+      LOG.debug("getFileStatus returning 'directory' for key '" + key + "' as it has contents");
       return newDirectory(absolutePath);
     }
     
-    throw new FileNotFoundException(absolutePath +
-        ": No such file or directory.");
-    
+    LOG.debug("getFileStatus could not find key '" + key + "'");
+    throw new FileNotFoundException("No such file or directory '" + absolutePath + "'");
   }
 
   @Override
@@ -372,16 +429,20 @@ public class NativeS3FileSystem extends FileSystem {
     Set<FileStatus> status = new TreeSet<FileStatus>();
     String priorLastKey = null;
     do {
-      PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, 
-          priorLastKey);
+      PartialListing listing = store.list(key, S3_MAX_LISTING_LENGTH, priorLastKey, false);
       for (FileMetadata fileMetadata : listing.getFiles()) {
         Path subpath = keyToPath(fileMetadata.getKey());
         String relativePath = pathUri.relativize(subpath.toUri()).getPath();
-        if (relativePath.endsWith(FOLDER_SUFFIX)) {
-          status.add(newDirectory(new Path(absolutePath,
-              relativePath.substring(0,
-                  relativePath.indexOf(FOLDER_SUFFIX)))));
-        } else {
+
+        if (fileMetadata.getKey().equals(key + "/")) {
+          // this is just the directory we have been asked to list
+        }
+        else if (relativePath.endsWith(FOLDER_SUFFIX)) {
+          status.add(newDirectory(new Path(
+              absolutePath,
+              relativePath.substring(0, relativePath.indexOf(FOLDER_SUFFIX)))));
+        }
+        else {
           status.add(newFile(fileMetadata, subpath));
         }
       }
@@ -398,17 +459,16 @@ public class NativeS3FileSystem extends FileSystem {
       return null;
     }
     
-    return status.toArray(new FileStatus[0]);
+    return status.toArray(new FileStatus[status.size()]);
   }
   
   private FileStatus newFile(FileMetadata meta, Path path) {
-    return new FileStatus(meta.getLength(), false, 1, MAX_S3_FILE_SIZE,
+    return new FileStatus(meta.getLength(), false, 1, getDefaultBlockSize(),
         meta.getLastModified(), path.makeQualified(this));
   }
   
   private FileStatus newDirectory(Path path) {
-    return new FileStatus(0, true, 1, MAX_S3_FILE_SIZE, 0,
-        path.makeQualified(this));
+    return new FileStatus(0, true, 1, 0, 0, path.makeQualified(this));
   }
 
   @Override
@@ -432,10 +492,11 @@ public class NativeS3FileSystem extends FileSystem {
       FileStatus fileStatus = getFileStatus(f);
       if (!fileStatus.isDir()) {
         throw new IOException(String.format(
-            "Can't make directory for path %s since it is a file.", f));
+            "Can't make directory for path '%s' since it is a file.", f));
 
       }
     } catch (FileNotFoundException e) {
+      LOG.debug("Making dir '" + f + "' in S3");
       String key = pathToKey(f) + FOLDER_SUFFIX;
       store.storeEmptyFile(key);    
     }
@@ -444,9 +505,11 @@ public class NativeS3FileSystem extends FileSystem {
 
   @Override
   public FSDataInputStream open(Path f, int bufferSize) throws IOException {
-    if (!exists(f)) {
-      throw new FileNotFoundException(f.toString());
+    FileStatus fs = getFileStatus(f); // will throw if the file doesn't exist
+    if (fs.isDir()) {
+      throw new IOException("'" + f + "' is a directory");
     }
+    LOG.info("Opening '" + f + "' for reading");
     Path absolutePath = makeAbsolute(f);
     String key = pathToKey(absolutePath);
     return new FSDataInputStream(new BufferedFSInputStream(
@@ -456,47 +519,16 @@ public class NativeS3FileSystem extends FileSystem {
   // rename() and delete() use this method to ensure that the parent directory
   // of the source does not vanish.
   private void createParent(Path path) throws IOException {
-      Path parent = path.getParent();
-      if (parent != null) {
-          String key = pathToKey(makeAbsolute(parent));
-          if (key.length() > 0) {
-              store.storeEmptyFile(key + FOLDER_SUFFIX);
-          }
+    Path parent = path.getParent();
+    if (parent != null) {
+      String key = pathToKey(makeAbsolute(parent));
+      if (key.length() > 0) {
+          store.storeEmptyFile(key + FOLDER_SUFFIX);
       }
+    }
   }
   
-  private boolean existsAndIsFile(Path f) throws IOException {
-    
-    Path absolutePath = makeAbsolute(f);
-    String key = pathToKey(absolutePath);
-    
-    if (key.length() == 0) {
-        return false;
-    }
-    
-    FileMetadata meta = store.retrieveMetadata(key);
-    if (meta != null) {
-        // S3 object with given key exists, so this is a file
-        return true;
-    }
     
-    if (store.retrieveMetadata(key + FOLDER_SUFFIX) != null) {
-        // Signifies empty directory
-        return false;
-    }
-    
-    PartialListing listing = store.list(key, 1, null);
-    if (listing.getFiles().length > 0 ||
-        listing.getCommonPrefixes().length > 0) {
-        // Non-empty directory
-        return false;
-    }
-    
-    throw new FileNotFoundException(absolutePath +
-        ": No such file or directory");
-}
-
-
   @Override
   public boolean rename(Path src, Path dst) throws IOException {
 
@@ -507,60 +539,79 @@ public class NativeS3FileSystem extends FileSystem {
       return false;
     }
 
+    final String debugPreamble = "Renaming '" + src + "' to '" + dst + "' - ";
+
     // Figure out the final destination
     String dstKey;
     try {
-      boolean dstIsFile = existsAndIsFile(dst);
+      boolean dstIsFile = !getFileStatus(dst).isDir();
       if (dstIsFile) {
-        // Attempting to overwrite a file using rename()
+        LOG.debug(debugPreamble + "returning false as dst is an already existing file");
         return false;
       } else {
-        // Move to within the existent directory
+        LOG.debug(debugPreamble + "using dst as output directory");
         dstKey = pathToKey(makeAbsolute(new Path(dst, src.getName())));
       }
     } catch (FileNotFoundException e) {
-      // dst doesn't exist, so we can proceed
+      LOG.debug(debugPreamble + "using dst as output destination");
       dstKey = pathToKey(makeAbsolute(dst));
       try {
         if (!getFileStatus(dst.getParent()).isDir()) {
-          return false; // parent dst is a file
+          LOG.debug(debugPreamble + "returning false as dst parent exists and is a file");
+          return false;
         }
       } catch (FileNotFoundException ex) {
-        return false; // parent dst does not exist
+        LOG.debug(debugPreamble + "returning false as dst parent does not exist");
+        return false;
       }
     }
 
+    boolean srcIsFile;
     try {
-      boolean srcIsFile = existsAndIsFile(src);
-      if (srcIsFile) {
-        store.rename(srcKey, dstKey);
-      } else {
-        // Move the folder object
-        store.delete(srcKey + FOLDER_SUFFIX);
-        store.storeEmptyFile(dstKey + FOLDER_SUFFIX);
-
-        // Move everything inside the folder
-        String priorLastKey = null;
-        do {
-          PartialListing listing = store.listAll(srcKey, S3_MAX_LISTING_LENGTH,
-              priorLastKey);
-          for (FileMetadata file : listing.getFiles()) {
-            store.rename(file.getKey(), dstKey
-                + file.getKey().substring(srcKey.length()));
-          }
-          priorLastKey = listing.getPriorLastKey();
-        } while (priorLastKey != null);
-      }
-
-      createParent(src);
-      return true;
-
+      srcIsFile = !getFileStatus(src).isDir();
     } catch (FileNotFoundException e) {
-      // Source file does not exist;
+      LOG.debug(debugPreamble + "returning false as src does not exist");
       return false;
     }
-  }
+    if (srcIsFile) {
+      LOG.debug(debugPreamble + "src is file, so doing copy then delete in S3");
+      store.copy(srcKey, dstKey);
+      store.delete(srcKey);
+    } else {
+      LOG.debug(debugPreamble + "src is directory, so copying contents");
+      store.storeEmptyFile(dstKey + FOLDER_SUFFIX);
+
+      List<String> keysToDelete = new ArrayList<String>();
+      String priorLastKey = null;
+      do {
+        PartialListing listing = store.list(srcKey, S3_MAX_LISTING_LENGTH, priorLastKey, true);
+        for (FileMetadata file : listing.getFiles()) {
+          keysToDelete.add(file.getKey());
+          store.copy(file.getKey(), dstKey + file.getKey().substring(srcKey.length()));
+        }
+        priorLastKey = listing.getPriorLastKey();
+      } while (priorLastKey != null);
+
+      LOG.debug(debugPreamble + "all files in src copied, now removing src files");
+      for (String key: keysToDelete) {
+        store.delete(key);
+      }
+
+      try {
+        store.delete(srcKey + FOLDER_SUFFIX);
+      } catch (FileNotFoundException e) {
+        //this is fine, we don't require a marker
+      }
+      LOG.debug(debugPreamble + "done");
+    }
 
+    return true;
+  }
+  
+  @Override
+  public long getDefaultBlockSize() {
+    return getConf().getLong("fs.s3n.block.size", 64 * 1024 * 1024);
+  }
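
Since the block size is now read from configuration at runtime, it can be tuned per job. A minimal sketch, assuming a reachable S3N bucket (the bucket name and the 128 MB value are placeholders):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class S3nBlockSizeExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Default is 64 MB per getDefaultBlockSize() above; raise it to 128 MB.
        conf.setLong("fs.s3n.block.size", 128 * 1024 * 1024);
        // "example-bucket" is a placeholder bucket name.
        FileSystem fs = FileSystem.get(URI.create("s3n://example-bucket"), conf);
        System.out.println("block size = " + fs.getDefaultBlockSize());
      }
    }
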
 
   /**
    * Set the working directory to the given directory.
@@ -574,5 +625,4 @@ public class NativeS3FileSystem extends FileSystem {
   public Path getWorkingDirectory() {
     return workingDir;
   }
-
 }

+ 5 - 3
src/java/org/apache/hadoop/http/HttpServer.java

@@ -238,13 +238,15 @@ public class HttpServer implements FilterContainer {
   }
 
   /**
-   * Add an internal servlet in the server.
+   * Add an internal servlet in the server. 
+   * Note: This method is to be used for adding servlets that facilitate
+   * internal communication and not for user-facing functionality. For
+   * servlets added using this method, filters are not enabled. 
+   * 
    * @param name The name of the servlet (can be passed as null)
    * @param pathSpec The path spec for the servlet
    * @param clazz The servlet class
-   * @deprecated this is a temporary method
    */
-  @Deprecated
   public void addInternalServlet(String name, String pathSpec,
       Class<? extends HttpServlet> clazz) {
     ServletHolder holder = new ServletHolder(clazz);
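
A minimal sketch of the now-undeprecated API: a trivial servlet for internal traffic (the servlet class, path, and server variable are hypothetical):

    import java.io.IOException;
    import javax.servlet.http.HttpServlet;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;

    // Hypothetical servlet used for internal communication only; as the
    // javadoc above notes, servlets added this way bypass the filters.
    public class PingServlet extends HttpServlet {
      @Override
      public void doGet(HttpServletRequest req, HttpServletResponse resp)
          throws IOException {
        resp.getWriter().print("pong");
      }
    }

    // Registration, assuming an already-constructed HttpServer 'server':
    //   server.addInternalServlet(null, "/ping", PingServlet.class);
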

+ 0 - 60
src/java/org/apache/hadoop/io/DeprecatedUTF8.java

@@ -1,60 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.io;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-/**
- * Wrapper for {@link UTF8}.
- * This class should be used only when it is absolutely necessary
- * to use {@link UTF8}. The only difference is that using this class
- * does not require "@SuppressWarning" annotation to avoid javac warning. 
- * Instead the deprecation is implied in the class name.
- */
-@SuppressWarnings("deprecation")
-public class DeprecatedUTF8 extends UTF8 {
-  
-  public DeprecatedUTF8() {
-    super();
-  }
-
-  /** Construct from a given string. */
-  public DeprecatedUTF8(String string) {
-    super(string);
-  }
-
-  /** Construct from a given string. */
-  public DeprecatedUTF8(DeprecatedUTF8 utf8) {
-    super(utf8);
-  }
-  
-  /* The following two are the mostly commonly used methods.
-   * wrapping them so that editors do not complain about the deprecation.
-   */
-  
-  public static String readString(DataInput in) throws IOException {
-    return UTF8.readString(in);
-  }
-  
-  public static int writeString(DataOutput out, String s) throws IOException {
-    return UTF8.writeString(out, s);
-  }
-}

+ 22 - 10
src/java/org/apache/hadoop/io/IOUtils.java

@@ -41,17 +41,8 @@ public class IOUtils {
   public static void copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) 
     throws IOException {
 
-    PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
-    byte buf[] = new byte[buffSize];
     try {
-      int bytesRead = in.read(buf);
-      while (bytesRead >= 0) {
-        out.write(buf, 0, bytesRead);
-        if ((ps != null) && ps.checkError()) {
-          throw new IOException("Unable to write to output stream.");
-        }
-        bytesRead = in.read(buf);
-      }
+      copyBytes(in, out, buffSize);
     } finally {
       if(close) {
         out.close();
@@ -60,6 +51,27 @@ public class IOUtils {
     }
   }
   
+  /**
+   * Copies from one stream to another.
+   * 
+   * @param in InputStream to read from
+   * @param out OutputStream to write to
+   * @param buffSize the size of the buffer 
+   */
+  public static void copyBytes(InputStream in, OutputStream out, int buffSize) 
+    throws IOException {
+
+    PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
+    byte buf[] = new byte[buffSize];
+    int bytesRead = in.read(buf);
+    while (bytesRead >= 0) {
+      out.write(buf, 0, bytesRead);
+      if ((ps != null) && ps.checkError()) {
+        throw new IOException("Unable to write to output stream.");
+      }
+      bytesRead = in.read(buf);
+    }
+  }
+
   /**
    * Copies from one stream to another. <strong>closes the input and output streams 
    * at the end</strong>.
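
The point of the new overload is that it leaves both streams open for reuse, unlike the closing variants. A hedged usage sketch (the file paths are hypothetical):

    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import org.apache.hadoop.io.IOUtils;

    public class CopyBytesExample {
      public static void main(String[] args) throws IOException {
        InputStream in = new FileInputStream("/tmp/in.dat");     // hypothetical path
        OutputStream out = new FileOutputStream("/tmp/out.dat"); // hypothetical path
        try {
          // New overload: copies but leaves both streams open for further use.
          IOUtils.copyBytes(in, out, 4096);
        } finally {
          in.close();
          out.close();
        }
      }
    }
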

+ 12 - 9
src/java/org/apache/hadoop/metrics/ContextFactory.java

@@ -188,16 +188,19 @@ public class ContextFactory {
   private void setAttributes() throws IOException {
     InputStream is = getClass().getResourceAsStream(PROPERTIES_FILE);
     if (is != null) {
-      Properties properties = new Properties();
-      properties.load(is);
-      //for (Object propertyNameObj : properties.keySet()) {
-      Iterator it = properties.keySet().iterator();
-      while (it.hasNext()) {
-        String propertyName = (String) it.next();
-        String propertyValue = properties.getProperty(propertyName);
-        setAttribute(propertyName, propertyValue);
+      try {
+        Properties properties = new Properties();
+        properties.load(is);
+        //for (Object propertyNameObj : properties.keySet()) {
+        Iterator it = properties.keySet().iterator();
+        while (it.hasNext()) {
+          String propertyName = (String) it.next();
+          String propertyValue = properties.getProperty(propertyName);
+          setAttribute(propertyName, propertyValue);
+        }
+      } finally {
+        is.close();
       }
-      is.close();
     }
   }
     

+ 3 - 0
src/java/org/apache/hadoop/net/NetUtils.java

@@ -132,6 +132,9 @@ public class NetUtils {
    */
   public static InetSocketAddress createSocketAddr(String target,
                                                    int defaultPort) {
+    if (target == null) {
+      throw new IllegalArgumentException("Target address cannot be null.");
+    }
     int colonIndex = target.indexOf(':');
     if (colonIndex < 0 && defaultPort == -1) {
       throw new RuntimeException("Not a host:port pair: " + target);
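
By way of example, the guarded method parses host:port targets as follows (host names and ports are illustrative):

    import java.net.InetSocketAddress;
    import org.apache.hadoop.net.NetUtils;

    public class SocketAddrExample {
      public static void main(String[] args) {
        // An explicit port wins; the default fills in when the target has none.
        InetSocketAddress a = NetUtils.createSocketAddr("nn.example.com:8020", 9000);
        InetSocketAddress b = NetUtils.createSocketAddr("nn.example.com", 9000);
        System.out.println(a.getPort() + " " + b.getPort()); // prints: 8020 9000
        // With this change, a null target fails fast with IllegalArgumentException.
      }
    }
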

+ 158 - 84
src/java/org/apache/hadoop/util/ProcessTree.java

@@ -54,73 +54,24 @@ public class ProcessTree {
   }
 
   /**
-   * Kills the process(OR process group) by sending the signal SIGKILL
-   * in the current thread
-   * @param pid Process id(OR process group id) of to-be-deleted-process
-   * @param isProcessGroup Is pid a process group id of to-be-deleted-processes
-   * @param sleepTimeBeforeSigKill wait time before sending SIGKILL after
-   *  sending SIGTERM
-   */
-  private static void sigKillInCurrentThread(String pid, boolean isProcessGroup,
-      long sleepTimeBeforeSigKill) {
-    // Kill the subprocesses of root process(even if the root process is not
-    // alive) if process group is to be killed.
-    if (isProcessGroup || ProcessTree.isAlive(pid)) {
-      try {
-        // Sleep for some time before sending SIGKILL
-        Thread.sleep(sleepTimeBeforeSigKill);
-      } catch (InterruptedException i) {
-        LOG.warn("Thread sleep is interrupted.");
-      }
-
-      ShellCommandExecutor shexec = null;
-
-      try {
-        String pid_pgrpid;
-        if(isProcessGroup) {//kill the whole process group
-          pid_pgrpid = "-" + pid;
-        }
-        else {//kill single process
-          pid_pgrpid = pid;
-        }
-        
-        String[] args = { "kill", "-9", pid_pgrpid };
-        shexec = new ShellCommandExecutor(args);
-        shexec.execute();
-      } catch (IOException ioe) {
-        LOG.warn("Error executing shell command " + ioe);
-      } finally {
-        if(isProcessGroup) {
-          LOG.info("Killing process group" + pid + " with SIGKILL. Exit code "
-            + shexec.getExitCode());
-        }
-        else {
-          LOG.info("Killing process " + pid + " with SIGKILL. Exit code "
-                    + shexec.getExitCode());
-        }
-      }
-    }
-  }
-
-  /** Kills the process(OR process group) by sending the signal SIGKILL
-   * @param pid Process id(OR process group id) of to-be-deleted-process
-   * @param isProcessGroup Is pid a process group id of to-be-deleted-processes
+   * Destroy the process-tree.
+   * @param pid process id of the root process of the subtree of processes
+   *            to be killed
    * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
    *                               after sending SIGTERM
+   * @param isProcessGroup pid is a process group leader or not
    * @param inBackground Process is to be killed in the back ground with
    *                     a separate thread
    */
-  private static void sigKill(String pid, boolean isProcessGroup,
-                        long sleeptimeBeforeSigkill, boolean inBackground) {
-
-    if(inBackground) { // use a separate thread for killing
-      SigKillThread sigKillThread = new SigKillThread(pid, isProcessGroup,
-                                                      sleeptimeBeforeSigkill);
-      sigKillThread.setDaemon(true);
-      sigKillThread.start();
+  public static void destroy(String pid, long sleeptimeBeforeSigkill,
+                             boolean isProcessGroup, boolean inBackground) {
+    if(isProcessGroup) {
+      destroyProcessGroup(pid, sleeptimeBeforeSigkill, inBackground);
     }
     else {
-      sigKillInCurrentThread(pid, isProcessGroup, sleeptimeBeforeSigkill);
+      //TODO: Destroy all the processes in the subtree in this case also.
+      // For the time being, killing only the root process.
+      destroyProcess(pid, sleeptimeBeforeSigkill, inBackground);
     }
   }
 
@@ -133,6 +84,29 @@ public class ProcessTree {
    */
   protected static void destroyProcess(String pid, long sleeptimeBeforeSigkill,
                                     boolean inBackground) {
+    terminateProcess(pid);
+    sigKill(pid, false, sleeptimeBeforeSigkill, inBackground);
+  }
+
+  /** Destroy the process group.
+   * @param pgrpId Process group id of to-be-killed-processes
+   * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
+   *                               after sending SIGTERM
+   * @param inBackground Process group is to be killed in the back ground with
+   *                     a separate thread
+   */
+  protected static void destroyProcessGroup(String pgrpId,
+                       long sleeptimeBeforeSigkill, boolean inBackground) {
+    terminateProcessGroup(pgrpId);
+    sigKill(pgrpId, true, sleeptimeBeforeSigkill, inBackground);
+  }
+
+  /**
+   * Sends terminate signal to the process, allowing it to gracefully exit.
+   * 
+   * @param pid pid of the process to be sent SIGTERM
+   */
+  public static void terminateProcess(String pid) {
     ShellCommandExecutor shexec = null;
     try {
       String[] args = { "kill", pid };
@@ -144,19 +118,15 @@ public class ProcessTree {
       LOG.info("Killing process " + pid +
                " with SIGTERM. Exit code " + shexec.getExitCode());
     }
-    
-    sigKill(pid, false, sleeptimeBeforeSigkill, inBackground);
   }
-  
-  /** Destroy the process group.
-   * @param pgrpId Process group id of to-be-killed-processes
-   * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
-   *                               after sending SIGTERM
-   * @param inBackground Process group is to be killed in the back ground with
-   *                     a separate thread
+
+  /**
+   * Sends terminate signal to all the processes belonging to the passed
+   * process group, allowing the group to gracefully exit.
+   * 
+   * @param pgrpId process group id
    */
-  protected static void destroyProcessGroup(String pgrpId,
-                       long sleeptimeBeforeSigkill, boolean inBackground) {
+  public static void terminateProcessGroup(String pgrpId) {
     ShellCommandExecutor shexec = null;
     try {
       String[] args = { "kill", "--", "-" + pgrpId };
@@ -168,37 +138,115 @@ public class ProcessTree {
       LOG.info("Killing all processes in the process group " + pgrpId +
                " with SIGTERM. Exit code " + shexec.getExitCode());
     }
-    
-    sigKill(pgrpId, true, sleeptimeBeforeSigkill, inBackground);
   }
 
   /**
-   * Destroy the process-tree.
-   * @param pid process id of the root process of the subtree of processes
-   *            to be killed
+   * Kills the process (or process group) by sending the signal SIGKILL
+   * in the current thread.
+   * @param pid Process id (or process group id) of the to-be-killed process
+   * @param isProcessGroup Is pid a process group id of to-be-killed processes
+   * @param sleepTimeBeforeSigKill wait time before sending SIGKILL after
+   *  sending SIGTERM
+   */
+  private static void sigKillInCurrentThread(String pid, boolean isProcessGroup,
+      long sleepTimeBeforeSigKill) {
+    // Kill the subprocesses of the root process (even if the root process
+    // is not alive) if the process group is to be killed.
+    if (isProcessGroup || ProcessTree.isAlive(pid)) {
+      try {
+        // Sleep for some time before sending SIGKILL
+        Thread.sleep(sleepTimeBeforeSigKill);
+      } catch (InterruptedException i) {
+        LOG.warn("Thread sleep is interrupted.");
+      }
+      if(isProcessGroup) {
+        killProcessGroup(pid);
+      } else {
+        killProcess(pid);
+      }
+    }  
+  }
+  
+
+  /** Kills the process (or process group) by sending the signal SIGKILL
+   * @param pid Process id (or process group id) of the to-be-killed process
+   * @param isProcessGroup Is pid a process group id of to-be-killed processes
    * @param sleeptimeBeforeSigkill The time to wait before sending SIGKILL
    *                               after sending SIGTERM
-   * @param isProcessGroup pid is a process group leader or not
    * @param inBackground Process is to be killed in the back ground with
    *                     a separate thread
    */
-  public static void destroy(String pid, long sleeptimeBeforeSigkill,
-                             boolean isProcessGroup, boolean inBackground) {
-    if(isProcessGroup) {
-      destroyProcessGroup(pid, sleeptimeBeforeSigkill, inBackground);
+  private static void sigKill(String pid, boolean isProcessGroup,
+                        long sleeptimeBeforeSigkill, boolean inBackground) {
+
+    if(inBackground) { // use a separate thread for killing
+      SigKillThread sigKillThread = new SigKillThread(pid, isProcessGroup,
+                                                      sleeptimeBeforeSigkill);
+      sigKillThread.setDaemon(true);
+      sigKillThread.start();
     }
     else {
-      //TODO: Destroy all the processes in the subtree in this case also.
-      // For the time being, killing only the root process.
-      destroyProcess(pid, sleeptimeBeforeSigkill, inBackground);
+      sigKillInCurrentThread(pid, isProcessGroup, sleeptimeBeforeSigkill);
+    }
+  }
+
+  /**
+   * Sends kill signal to process, forcefully terminating the process.
+   * 
+   * @param pid process id
+   */
+  public static void killProcess(String pid) {
+
+    //If process tree is not alive then return immediately.
+    if(!ProcessTree.isAlive(pid)) {
+      return;
+    }
+    String[] args = { "kill", "-9", pid };
+    ShellCommandExecutor shexec = new ShellCommandExecutor(args);
+    try {
+      shexec.execute();
+    } catch (IOException e) {
+      LOG.warn("Error sending SIGKILL to process "+ pid + " ."+ 
+          StringUtils.stringifyException(e));
+    } finally {
+      LOG.info("Killing process " + pid + " with SIGKILL. Exit code "
+          + shexec.getExitCode());
     }
   }
 
+  /**
+   * Sends kill signal to all process belonging to same process group,
+   * forcefully terminating the process group.
+   * 
+   * @param pgrpId process group id
+   */
+  public static void killProcessGroup(String pgrpId) {
+
+    //If process tree is not alive then return immediately.
+    if(!ProcessTree.isProcessGroupAlive(pgrpId)) {
+      return;
+    }
 
+    String[] args = { "kill", "-9", "-"+pgrpId };
+    ShellCommandExecutor shexec = new ShellCommandExecutor(args);
+    try {
+      shexec.execute();
+    } catch (IOException e) {
+      LOG.warn("Error sending SIGKILL to process group "+ pgrpId + " ."+ 
+          StringUtils.stringifyException(e));
+    } finally {
+      LOG.info("Killing process group" + pgrpId + " with SIGKILL. Exit code "
+          + shexec.getExitCode());
+    }
+  }
+  
   /**
    * Is the process with PID pid still alive?
    * This method assumes that isAlive is called on a pid that was alive not
    * too long ago, and hence assumes no chance of pid-wrapping-around.
+   * 
+   * @param pid pid of the process to check.
+   * @return true if process is alive.
    */
   public static boolean isAlive(String pid) {
     ShellCommandExecutor shexec = null;
@@ -215,6 +263,32 @@ public class ProcessTree {
     }
     return (shexec.getExitCode() == 0 ? true : false);
   }
+  
+  /**
+   * Is the process group with pgrpId still alive?
+   * 
+   * This method assumes that isAlive is called on a pid that was alive not
+   * too long ago, and hence assumes no chance of pid-wrapping-around.
+   * 
+   * @param pgrpId process group id
+   * @return true if any of process in group is alive.
+   */
+  public static boolean isProcessGroupAlive(String pgrpId) {
+    ShellCommandExecutor shexec = null;
+    try {
+      String[] args = { "kill", "-0", "-"+pgrpId };
+      shexec = new ShellCommandExecutor(args);
+      shexec.execute();
+    } catch (ExitCodeException ee) {
+      return false;
+    } catch (IOException ioe) {
+      LOG.warn("Error executing shell command "
+          + Arrays.toString(shexec.getExecString()) + ioe);
+      return false;
+    }
+    return shexec.getExitCode() == 0;
+  }
+  
 
   /**
    * Helper thread class that kills process-tree with SIGKILL in background
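
A sketch of how the refactored entry points fit together from a caller's perspective (the pid and timings are illustrative; behavior depends on the platform's kill(1)):

    import org.apache.hadoop.util.ProcessTree;

    public class ProcessTreeExample {
      public static void main(String[] args) {
        String pid = "12345"; // illustrative pid
        // One-shot: SIGTERM now, SIGKILL after 5s, treat pid as a group
        // leader, and do the SIGKILL wait in a background thread.
        ProcessTree.destroy(pid, 5000L, true, true);
        // ...or the individual steps this change makes public:
        ProcessTree.terminateProcess(pid);  // SIGTERM, graceful exit
        ProcessTree.killProcess(pid);       // SIGKILL, only if still alive
      }
    }
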

+ 89 - 11
src/java/org/apache/hadoop/util/ProcfsBasedProcessTree.java

@@ -47,6 +47,10 @@ public class ProcfsBasedProcessTree extends ProcessTree {
   private static final Pattern PROCFS_STAT_FILE_FORMAT = Pattern
       .compile("^([0-9-]+)\\s([^\\s]+)\\s[^\\s]\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+\\s){16}([0-9]+)(\\s[0-9-]+){16}");
 
+  // to enable testing, using this variable which can be configured
+  // to a test directory.
+  private String procfsDir;
+  
   private Integer pid = -1;
   private boolean setsidUsed = false;
   private long sleeptimeBeforeSigkill = DEFAULT_SLEEPTIME_BEFORE_SIGKILL;
@@ -59,11 +63,29 @@ public class ProcfsBasedProcessTree extends ProcessTree {
 
   public ProcfsBasedProcessTree(String pid, boolean setsidUsed,
                                 long sigkillInterval) {
+    this(pid, setsidUsed, sigkillInterval, PROCFS);
+  }
+
+  /**
+   * Build a new process tree rooted at the pid.
+   * 
+   * This method is provided mainly for testing purposes, where
+   * the root of the proc file system can be adjusted.
+   * 
+   * @param pid root of the process tree
+   * @param setsidUsed true, if setsid was used for the root pid
+   * @param sigkillInterval how long to wait between a SIGTERM and SIGKILL 
+   *                        when killing a process tree
+   * @param procfsDir the root of a proc file system - only used for testing. 
+   */
+  public ProcfsBasedProcessTree(String pid, boolean setsidUsed,
+                                long sigkillInterval, String procfsDir) {
     this.pid = getValidPID(pid);
     this.setsidUsed = setsidUsed;
     sleeptimeBeforeSigkill = sigkillInterval;
+    this.procfsDir = procfsDir;
   }
-
+  
   /**
    * Sets SIGKILL interval
    * @deprecated Use {@link ProcfsBasedProcessTree#ProcfsBasedProcessTree(
@@ -108,13 +130,17 @@ public class ProcfsBasedProcessTree extends ProcessTree {
       List<Integer> processList = getProcessList();
 
       Map<Integer, ProcessInfo> allProcessInfo = new HashMap<Integer, ProcessInfo>();
+      
+      // cache the processTree to get the age for processes
+      Map<Integer, ProcessInfo> oldProcs = 
+              new HashMap<Integer, ProcessInfo>(processTree);
       processTree.clear();
 
       ProcessInfo me = null;
       for (Integer proc : processList) {
         // Get information for each process
         ProcessInfo pInfo = new ProcessInfo(proc);
-        if (constructProcessInfo(pInfo) != null) {
+        if (constructProcessInfo(pInfo, procfsDir) != null) {
           allProcessInfo.put(proc, pInfo);
           if (proc.equals(this.pid)) {
             me = pInfo; // cache 'me'
@@ -150,6 +176,16 @@ public class ProcfsBasedProcessTree extends ProcessTree {
         pInfoQueue.addAll(pInfo.getChildren());
       }
 
+      // update age values.
+      for (Map.Entry<Integer, ProcessInfo> procs : processTree.entrySet()) {
+        ProcessInfo oldInfo = oldProcs.get(procs.getKey());
+        if (oldInfo != null) {
+          if (procs.getValue() != null) {
+            procs.getValue().updateAge(oldInfo);  
+          }
+        }
+      }
+
       if (LOG.isDebugEnabled()) {
         // Log.debug the ProcfsBasedProcessTree
         LOG.debug(this.toString());
@@ -269,15 +305,29 @@ public class ProcfsBasedProcessTree extends ProcessTree {
    * @return cumulative virtual memory used by the process-tree in bytes.
    */
   public long getCumulativeVmem() {
+    // include all processes; all processes will be older than 0.
+    return getCumulativeVmem(0);
+  }
+
+  /**
+   * Get the cumulative virtual memory used by all the processes in the
+   * process-tree that are older than the passed in age.
+   * 
+   * @param olderThanAge processes above this age are included in the
+   *                      memory addition
+   * @return cumulative virtual memory used by the process-tree in bytes,
+   *          for processes older than this age.
+   */
+  public long getCumulativeVmem(int olderThanAge) {
     long total = 0;
     for (ProcessInfo p : processTree.values()) {
-      if (p != null) {
+      if ((p != null) && (p.getAge() > olderThanAge)) {
         total += p.getVmem();
       }
     }
     return total;
   }
-
+  
   private static Integer getValidPID(String pid) {
     Integer retPid = -1;
     try {
@@ -295,13 +345,13 @@ public class ProcfsBasedProcessTree extends ProcessTree {
    * Get the list of all processes in the system.
    */
   private List<Integer> getProcessList() {
-    String[] processDirs = (new File(PROCFS)).list();
+    String[] processDirs = (new File(procfsDir)).list();
     List<Integer> processList = new ArrayList<Integer>();
 
     for (String dir : processDirs) {
       try {
         int pd = Integer.parseInt(dir);
-        if ((new File(PROCFS + dir)).isDirectory()) {
+        if ((new File(procfsDir, dir)).isDirectory()) {
           processList.add(Integer.valueOf(pd));
         }
       } catch (NumberFormatException n) {
@@ -319,12 +369,29 @@ public class ProcfsBasedProcessTree extends ProcessTree {
    * same. Returns null on failing to read from procfs,
    */
   private static ProcessInfo constructProcessInfo(ProcessInfo pinfo) {
+    return constructProcessInfo(pinfo, PROCFS);
+  }
+
+  /**
+   * Construct the ProcessInfo using the process' PID and procfs rooted at the
+   * specified directory and return the same. It is provided mainly to assist
+   * testing purposes.
+   * 
+   * Returns null on failing to read from procfs.
+   *
+   * @param pinfo ProcessInfo that needs to be updated
+   * @param procfsDir root of the proc file system
+   * @return updated ProcessInfo, null on errors.
+   */
+  private static ProcessInfo constructProcessInfo(ProcessInfo pinfo, 
+                                                    String procfsDir) {
     ProcessInfo ret = null;
-    // Read "/proc/<pid>/stat" file
+    // Read "procfsDir/<pid>/stat" file - typically /proc/<pid>/stat
     BufferedReader in = null;
     FileReader fReader = null;
     try {
-      fReader = new FileReader(PROCFS + pinfo.getPid() + "/stat");
+      File pidDir = new File(procfsDir, String.valueOf(pinfo.getPid()));
+      fReader = new FileReader(new File(pidDir, "/stat"));
       in = new BufferedReader(fReader);
     } catch (FileNotFoundException f) {
       // The process vanished in the interim!
@@ -338,7 +405,7 @@ public class ProcfsBasedProcessTree extends ProcessTree {
       boolean mat = m.find();
       if (mat) {
         // Set ( name ) ( ppid ) ( pgrpId ) (session ) (vsize )
-        pinfo.update(m.group(2), Integer.parseInt(m.group(3)), Integer
+        pinfo.updateProcessInfo(m.group(2), Integer.parseInt(m.group(3)), Integer
             .parseInt(m.group(4)), Integer.parseInt(m.group(5)), Long
             .parseLong(m.group(7)));
       }
@@ -365,7 +432,6 @@ public class ProcfsBasedProcessTree extends ProcessTree {
 
     return ret;
   }
-
   /**
    * Returns a string printing PIDs of process present in the
    * ProcfsBasedProcessTree. Output format : [pid pid ..]
@@ -391,10 +457,14 @@ public class ProcfsBasedProcessTree extends ProcessTree {
     private Integer ppid; // parent process-id
     private Integer sessionId; // session-id
     private Long vmem; // virtual memory usage
+    // how many times has this process been seen alive
+    private int age; 
     private List<ProcessInfo> children = new ArrayList<ProcessInfo>(); // list of children
 
     public ProcessInfo(int pid) {
       this.pid = Integer.valueOf(pid);
+      // seeing this the first time.
+      this.age = 1;
     }
 
     public Integer getPid() {
@@ -421,6 +491,10 @@ public class ProcfsBasedProcessTree extends ProcessTree {
       return vmem;
     }
 
+    public int getAge() {
+      return age;
+    }
+    
     public boolean isParent(ProcessInfo p) {
       if (pid.equals(p.getPpid())) {
         return true;
@@ -428,7 +502,7 @@ public class ProcfsBasedProcessTree extends ProcessTree {
       return false;
     }
 
-    public void update(String name, Integer ppid, Integer pgrpId,
+    public void updateProcessInfo(String name, Integer ppid, Integer pgrpId,
         Integer sessionId, Long vmem) {
       this.name = name;
       this.ppid = ppid;
@@ -437,6 +511,10 @@ public class ProcfsBasedProcessTree extends ProcessTree {
       this.vmem = vmem;
     }
 
+    public void updateAge(ProcessInfo oldInfo) {
+      this.age = oldInfo.age + 1;
+    }
+    
     public boolean addChild(ProcessInfo p) {
       return children.add(p);
     }

+ 103 - 8
src/java/org/apache/hadoop/util/Progress.java

@@ -20,19 +20,32 @@ package org.apache.hadoop.util;
 
 import java.util.ArrayList;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 /** Utility to assist with generation of progress reports.  Applications build
  * a hierarchy of {@link Progress} instances, each modelling a phase of
  * execution.  The root is constructed with {@link #Progress()}.  Nodes for
  * sub-phases are created by calling {@link #addPhase()}.
  */
 public class Progress {
+  private static final Log LOG = LogFactory.getLog(Progress.class);
   private String status = "";
   private float progress;
   private int currentPhase;
   private ArrayList<Progress> phases = new ArrayList<Progress>();
   private Progress parent;
-  private float progressPerPhase;
 
+  // Each phase can have different progress weightage. For example, in
+  // Map Task, map phase accounts for 66.7% and sort phase for 33.3%.
+  // User needs to give weightages as parameters to all phases(when adding
+  // phases) in a Progress object, if he wants to give weightage to any of the
+  // phases. So when nodes are added without specifying weightage, it means 
+  // fixed weightage for all phases.
+  private boolean fixedWeightageForAllPhases = false;
+  private float progressPerPhase = 0.0f;
+  private ArrayList<Float> progressWeightagesForPhases = new ArrayList<Float>();
+  
   /** Creates a new root node. */
   public Progress() {}
 
@@ -43,15 +56,73 @@ public class Progress {
     return phase;
   }
 
-  /** Adds a node to the tree. */
+  /** Adds a node to the tree. Gives equal weightage to all phases */
   public synchronized Progress addPhase() {
+    Progress phase = addNewPhase();
+    // set equal weightage for all phases
+    progressPerPhase = 1.0f / (float)phases.size();
+    fixedWeightageForAllPhases = true;
+    return phase;
+  }
+  
+  /** Adds a new phase. Caller needs to set progress weightage */
+  private synchronized Progress addNewPhase() {
     Progress phase = new Progress();
     phases.add(phase);
     phase.setParent(this);
-    progressPerPhase = 1.0f / (float)phases.size();
     return phase;
   }
 
+  /** Adds a named node with a specified progress weightage to the tree. */
+  public Progress addPhase(String status, float weightage) {
+    Progress phase = addPhase(weightage);
+    phase.setStatus(status);
+
+    return phase;
+  }
+
+  /** Adds a node with a specified progress weightage to the tree. */
+  public synchronized Progress addPhase(float weightage) {
+    Progress phase = new Progress();
+    progressWeightagesForPhases.add(weightage);
+    phases.add(phase);
+    phase.setParent(this);
+
+    // Ensure that the sum of weightages does not cross 1.0
+    float sum = 0;
+    for (int i = 0; i < phases.size(); i++) {
+      sum += progressWeightagesForPhases.get(i);
+    }
+    if (sum > 1.0) {
+      LOG.warn("Sum of weightages can not be more than 1.0; But sum = " + sum);
+    }
+
+    return phase;
+  }
+
+  /** Adds n nodes to the tree. Gives equal weightage to all phases */
+  public synchronized void addPhases(int n) {
+    for (int i = 0; i < n; i++) {
+      addNewPhase();
+    }
+    // set equal weightage for all phases
+    progressPerPhase = 1.0f / (float)phases.size();
+    fixedWeightageForAllPhases = true;
+  }
+
+  /**
+   * Returns the progress weightage of the given phase.
+   * @param phaseNum the phase number of the phase (child node) for which we
+   *                 need the progress weightage
+   * @return the progress weightage of the specified phase
+   */
+  float getProgressWeightage(int phaseNum) {
+    if (fixedWeightageForAllPhases) {
+      return progressPerPhase; // all phases are of equal weightage
+    }
+    return progressWeightagesForPhases.get(phaseNum);
+  }
+
   synchronized Progress getParent() { return parent; }
   synchronized void setParent(Progress parent) { this.parent = parent; }
   
@@ -89,8 +160,8 @@ public class Progress {
   }
 
   /** Returns the overall progress of the root. */
-  // this method probably does not need to be synchronized as getINternal() is synchronized 
-  // and the node's parent never changes. Still, it doesn't hurt. 
+  // this method probably does not need to be synchronized as getInternal() is
+  // synchronized and the node's parent never changes. Still, it doesn't hurt. 
   public synchronized float get() {
     Progress node = this;
     while (node.getParent() != null) {                 // find the root
@@ -99,13 +170,37 @@ public class Progress {
     return node.getInternal();
   }
 
+  /**
+   * Returns progress in this node. get() would give overall progress of the
+   * root node(not just given current node).
+   */
+  public synchronized float getProgress() {
+    return getInternal();
+  }
+  
   /** Computes progress in this node. */
   private synchronized float getInternal() {
     int phaseCount = phases.size();
     if (phaseCount != 0) {
-      float subProgress =
-        currentPhase < phaseCount ? phase().getInternal() : 0.0f;
-      return progressPerPhase*(currentPhase + subProgress);
+      float subProgress = 0.0f;
+      float progressFromCurrentPhase = 0.0f;
+      if (currentPhase < phaseCount) {
+        subProgress = phase().getInternal();
+        progressFromCurrentPhase =
+          getProgressWeightage(currentPhase) * subProgress;
+      }
+      
+      float progressFromCompletedPhases = 0.0f;
+      if (fixedWeightageForAllPhases) { // same progress weightage for each phase
+        progressFromCompletedPhases = progressPerPhase * currentPhase;
+      }
+      else {
+        for (int i = 0; i < currentPhase; i++) {
+          // progress weightages of phases could be different. Add them
+          progressFromCompletedPhases += getProgressWeightage(i);
+        }
+      }
+      return progressFromCompletedPhases + progressFromCurrentPhase;
     } else {
       return progress;
     }
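
To illustrate the weighted accounting, a Progress tree could be assembled like this (the phase names and weightages are made up, mirroring the map/sort split mentioned in the comments above):

    import org.apache.hadoop.util.Progress;

    public class WeightedProgressExample {
      public static void main(String[] args) {
        Progress root = new Progress();
        // Weightages must sum to at most 1.0, as checked in addPhase above.
        Progress map  = root.addPhase("map",  0.667f);
        Progress sort = root.addPhase("sort", 0.333f);

        map.set(0.5f);                  // halfway through the map phase
        System.out.println(root.get()); // ~0.333 overall: 0.667 * 0.5
      }
    }
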

+ 2 - 2
src/java/org/apache/hadoop/util/RunJar.java

@@ -108,7 +108,7 @@ public class RunJar {
 
     File tmpDir = new File(new Configuration().get("hadoop.tmp.dir"));
     boolean b = tmpDir.mkdirs();
-    if (!b || !tmpDir.isDirectory()) { 
+    if (!b && !tmpDir.isDirectory()) { 
       System.err.println("Mkdirs failed to create " + tmpDir);
       System.exit(-1);
     }
@@ -119,7 +119,7 @@ public class RunJar {
       System.exit(-1);
     }
     b = workDir.mkdirs();
-    if (!b || !workDir.isDirectory()) {
+    if (!b && !workDir.isDirectory()) {
       System.err.println("Mkdirs failed to create " + workDir);
       System.exit(-1);
     }
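
The corrected check encodes the usual mkdirs idiom: mkdirs() returns false both on real failure and when the directory already exists, so it is only an error when the directory is also absent afterwards. A minimal sketch with an illustrative path:

    import java.io.File;

    public class MkdirsIdiom {
      public static void main(String[] args) {
        File dir = new File("/tmp/hadoop-unjar-example"); // illustrative path
        // "Already exists" counts as success; only fail when the directory
        // could not be created and is still missing.
        if (!dir.mkdirs() && !dir.isDirectory()) {
          System.err.println("Mkdirs failed to create " + dir);
        }
      }
    }
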

+ 2 - 1
src/java/org/apache/hadoop/util/StringUtils.java

@@ -88,7 +88,8 @@ public class StringUtils {
     double result = number;
     String suffix = "";
     if (absNumber < 1024) {
-      // nothing
+      // since no division has occurred, don't format with a decimal point
+      return String.valueOf(number);
     } else if (absNumber < 1024 * 1024) {
       result = number / 1024.0;
       suffix = "k";

+ 9 - 0
src/test/core/org/apache/hadoop/filecache/TestDistributedCache.java

@@ -55,6 +55,15 @@ public class TestDistributedCache extends TestCase {
     assertTrue("DistributedCache failed deleting old cache when the cache store is full.",
         dirStatuses.length > 1);
   }
+  
+  public void testFileSystemOtherThanDefault() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set("fs.fakefile.impl", conf.get("fs.file.impl"));
+    Path fileToCache = new Path("fakefile:///" + firstCacheFile.toUri().getPath());
+    Path result = DistributedCache.getLocalCache(fileToCache.toUri(), conf, new Path(TEST_CACHE_BASE_DIR), 
+        false, System.currentTimeMillis(), new Path(TEST_ROOT_DIR));
+    assertNotNull("DistributedCache cached file on non-default filesystem.", result);
+  }
 
   private void createTempFile(FileSystem fs, Path p) throws IOException {
     FSDataOutputStream out = fs.create(p);

+ 0 - 1
src/test/core/org/apache/hadoop/fs/kfs/KFSEmulationImpl.java

@@ -12,7 +12,6 @@
  * implied. See the License for the specific language governing
  * permissions and limitations under the License.
  *
- * @author: Sriram Rao (Kosmix Corp.)
  * 
  * We need to provide the ability to the code in fs/kfs without really
  * having a KFS deployment.  For this purpose, use the LocalFileSystem

+ 0 - 1
src/test/core/org/apache/hadoop/fs/kfs/TestKosmosFileSystem.java

@@ -12,7 +12,6 @@
  * implied. See the License for the specific language governing
  * permissions and limitations under the License.
  *
- * @author: Sriram Rao (Kosmix Corp.)
  * 
  * Unit tests for testing the KosmosFileSystem API implementation.
  */

+ 12 - 0
src/test/core/org/apache/hadoop/fs/s3/S3FileSystemContractBaseTest.java

@@ -23,6 +23,7 @@ import java.net.URI;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystemContractBaseTest;
+import org.apache.hadoop.fs.Path;
 
 public abstract class S3FileSystemContractBaseTest
   extends FileSystemContractBaseTest {
@@ -45,4 +46,15 @@ public abstract class S3FileSystemContractBaseTest
     super.tearDown();
   }
   
+  public void testBlockSize() throws Exception {
+    
+    long newBlockSize = fs.getDefaultBlockSize() * 2;
+    fs.getConf().setLong("fs.s3.block.size", newBlockSize);
+    
+    Path file = path("/test/hadoop/file");
+    createFile(file);
+    assertEquals("Double default block size", newBlockSize,
+        fs.getFileStatus(file).getBlockSize());
+  }
+  
 }

+ 7 - 12
src/test/core/org/apache/hadoop/fs/s3native/InMemoryNativeFileSystemStore.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.s3native;
 
 import static org.apache.hadoop.fs.s3native.NativeS3FileSystem.PATH_DELIMITER;
+
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.ByteArrayOutputStream;
@@ -122,19 +123,13 @@ class InMemoryNativeFileSystemStore implements NativeFileSystemStore {
 
   public PartialListing list(String prefix, int maxListingLength)
       throws IOException {
-    return list(prefix, maxListingLength, null);
+    return list(prefix, maxListingLength, null, false);
   }
 
   public PartialListing list(String prefix, int maxListingLength,
-      String priorLastKey) throws IOException {
-
-    return list(prefix, PATH_DELIMITER, maxListingLength, priorLastKey);
-  }
-
-  public PartialListing listAll(String prefix, int maxListingLength,
-      String priorLastKey) throws IOException {
+      String priorLastKey, boolean recursive) throws IOException {
 
-    return list(prefix, null, maxListingLength, priorLastKey);
+    return list(prefix, recursive ? null : PATH_DELIMITER, maxListingLength, priorLastKey);
   }
 
   private PartialListing list(String prefix, String delimiter,
@@ -174,9 +169,9 @@ class InMemoryNativeFileSystemStore implements NativeFileSystemStore {
     dataMap.remove(key);
   }
 
-  public void rename(String srcKey, String dstKey) throws IOException {
-    metadataMap.put(dstKey, metadataMap.remove(srcKey));
-    dataMap.put(dstKey, dataMap.remove(srcKey));
+  public void copy(String srcKey, String dstKey) throws IOException {
+    metadataMap.put(dstKey, metadataMap.get(srcKey));
+    dataMap.put(dstKey, dataMap.get(srcKey));
   }
   
   public void purge(String prefix) throws IOException {

+ 90 - 0
src/test/core/org/apache/hadoop/fs/s3native/NativeS3FileSystemContractBaseTest.java

@@ -56,4 +56,94 @@ public abstract class NativeS3FileSystemContractBaseTest
     assertEquals(path("/test"), paths[0].getPath());
   }
   
+  public void testNoTrailingBackslashOnBucket() throws Exception {
+    assertTrue(fs.getFileStatus(new Path(fs.getUri().toString())).isDir());
+  }
+
+  private void createTestFiles(String base) throws IOException {
+    store.storeEmptyFile(base + "/file1");
+    store.storeEmptyFile(base + "/dir/file2");
+    store.storeEmptyFile(base + "/dir/file3");
+  }
+
+  public void testDirWithDifferentMarkersWorks() throws Exception {
+
+    for (int i = 0; i < 4; i++) {
+      String base = "test/hadoop" + i;
+      Path path = path("/" + base);
+
+      createTestFiles(base);
+
+      if (i == 0) {
+        //do nothing, we are testing correctness with no markers
+      }
+      else if (i == 1) {
+        // test for _$folder$ marker
+        store.storeEmptyFile(base + "_$folder$");
+        store.storeEmptyFile(base + "/dir_$folder$");
+      }
+      else if (i == 2) {
+        // test the end slash file marker
+        store.storeEmptyFile(base + "/");
+        store.storeEmptyFile(base + "/dir/");
+      }
+      else if (i == 3) {
+        // test both markers
+        store.storeEmptyFile(base + "_$folder$");
+        store.storeEmptyFile(base + "/dir_$folder$");
+        store.storeEmptyFile(base + "/");
+        store.storeEmptyFile(base + "/dir/");
+      }
+
+      assertTrue(fs.getFileStatus(path).isDir());
+      assertEquals(2, fs.listStatus(path).length);
+    }
+  }
+
+  public void testDeleteWithNoMarker() throws Exception {
+    String base = "test/hadoop";
+    Path path = path("/" + base);
+
+    createTestFiles(base);
+
+    fs.delete(path, true);
+
+    path = path("/test");
+    assertTrue(fs.getFileStatus(path).isDir());
+    assertEquals(0, fs.listStatus(path).length);
+  }
+
+  public void testRenameWithNoMarker() throws Exception {
+    String base = "test/hadoop";
+    Path dest = path("/test/hadoop2");
+
+    createTestFiles(base);
+
+    fs.rename(path("/" + base), dest);
+
+    Path path = path("/test");
+    assertTrue(fs.getFileStatus(path).isDir());
+    assertEquals(1, fs.listStatus(path).length);
+    assertTrue(fs.getFileStatus(dest).isDir());
+    assertEquals(2, fs.listStatus(dest).length);
+  }
+
+  public void testEmptyFile() throws Exception {
+    store.storeEmptyFile("test/hadoop/file1");
+    fs.open(path("/test/hadoop/file1")).close();
+  }
+  
+  public void testBlockSize() throws Exception {
+    Path file = path("/test/hadoop/file");
+    createFile(file);
+    assertEquals("Default block size", fs.getDefaultBlockSize(),
+        fs.getFileStatus(file).getBlockSize());
+
+    // Block size is determined at read time
+    long newBlockSize = fs.getDefaultBlockSize() * 2;
+    fs.getConf().setLong("fs.s3n.block.size", newBlockSize);
+    assertEquals("Double default block size", newBlockSize,
+        fs.getFileStatus(file).getBlockSize());
+  }
+
 }

+ 234 - 0
src/test/core/org/apache/hadoop/util/TestProcfsBasedProcessTree.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.util;
 
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileReader;
@@ -29,6 +30,7 @@ import java.util.Vector;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.util.Shell.ExitCodeException;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
@@ -231,4 +233,236 @@ public class TestProcfsBasedProcessTree extends TestCase {
     }
     return pid;
   }
+  
+  public static class ProcessStatInfo {
+    // sample stat in a single line : 3910 (gpm) S 1 3910 3910 0 -1 4194624 
+    // 83 0 0 0 0 0 0 0 16 0 1 0 7852 2408448 88 4294967295 134512640 
+    // 134590050 3220521392 3220520036 10975138 0 0 4096 134234626 
+    // 4294967295 0 0 17 1 0 0
+    String pid;
+    String name;
+    String ppid;
+    String pgrpId;
+    String session;
+    String vmem;
+    
+    public ProcessStatInfo(String[] statEntries) {
+      pid = statEntries[0];
+      name = statEntries[1];
+      ppid = statEntries[2];
+      pgrpId = statEntries[3];
+      session = statEntries[4];
+      vmem = statEntries[5];
+    }
+    
+    // construct a line that mimics the procfs stat file.
+    // all unused numerical entries are set to 0.
+    public String getStatLine() {
+      return String.format("%s (%s) S %s %s %s 0 0 0" +
+                      " 0 0 0 0 0 0 0 0 0 0 0 0 0 %s 0 0 0" +
+                      " 0 0 0 0 0 0 0 0" +
+                      " 0 0 0 0 0", 
+                      pid, name, ppid, pgrpId, session, vmem);
+    }
+  }
+  
+  /**
+   * A basic test that creates a few process directories and writes
+   * stat files. Verifies that the virtual memory is correctly  
+   * computed.
+   * @throws IOException if there was a problem setting up the
+   *                      fake procfs directories or files.
+   */
+  public void testVirtualMemoryForProcessTree() throws IOException {
+
+    // test processes
+    String[] pids = { "100", "200", "300", "400" };
+    // create the fake procfs root directory. 
+    File procfsRootDir = new File(TEST_ROOT_DIR, "proc");
+
+    try {
+      setupProcfsRootDir(procfsRootDir);
+      setupPidDirs(procfsRootDir, pids);
+      
+      // create stat objects.
+      // assuming processes 100, 200, 300 are in tree and 400 is not.
+      ProcessStatInfo[] procInfos = new ProcessStatInfo[4];
+      procInfos[0] = new ProcessStatInfo(new String[] 
+                                  {"100", "proc1", "1", "100", "100", "100000"});
+      procInfos[1] = new ProcessStatInfo(new String[] 
+                                  {"200", "proc2", "100", "100", "100", "200000"});
+      procInfos[2] = new ProcessStatInfo(new String[] 
+                                  {"300", "proc3", "200", "100", "100", "300000"});
+      procInfos[3] = new ProcessStatInfo(new String[] 
+                                  {"400", "proc4", "1", "400", "400", "400000"});
+      
+      writeStatFiles(procfsRootDir, pids, procInfos);
+      
+      // crank up the process tree class.
+      ProcfsBasedProcessTree processTree = 
+          new ProcfsBasedProcessTree("100", true, 100L, 
+                                  procfsRootDir.getAbsolutePath());
+      // build the process tree.
+      processTree.getProcessTree();
+      
+      // verify cumulative memory
+      assertEquals("Cumulative memory does not match", 
+              Long.parseLong("600000"), processTree.getCumulativeVmem());
+    } finally {
+      FileUtil.fullyDelete(procfsRootDir);
+    }
+  }
+  
+  /**
+   * Tests that cumulative memory is computed only for
+   * processes older than a given age.
+   * @throws IOException if there was a problem setting up the
+   *                      fake procfs directories or files.
+   */
+  public void testVMemForOlderProcesses() throws IOException {
+    // initial list of processes
+    String[] pids = { "100", "200", "300", "400" };
+    // create the fake procfs root directory. 
+    File procfsRootDir = new File(TEST_ROOT_DIR, "proc");
+
+    try {
+      setupProcfsRootDir(procfsRootDir);
+      setupPidDirs(procfsRootDir, pids);
+      
+      // create stat objects.
+      // assuming 100, 200 and 400 are in tree, 300 is not.
+      ProcessStatInfo[] procInfos = new ProcessStatInfo[4];
+      procInfos[0] = new ProcessStatInfo(new String[] 
+                                  {"100", "proc1", "1", "100", "100", "100000"});
+      procInfos[1] = new ProcessStatInfo(new String[] 
+                                  {"200", "proc2", "100", "100", "100", "200000"});
+      procInfos[2] = new ProcessStatInfo(new String[] 
+                                  {"300", "proc3", "1", "300", "300", "300000"});
+      procInfos[3] = new ProcessStatInfo(new String[] 
+                                  {"400", "proc4", "100", "100", "100", "400000"});
+      
+      writeStatFiles(procfsRootDir, pids, procInfos);
+      
+      // crank up the process tree class.
+      ProcfsBasedProcessTree processTree = 
+          new ProcfsBasedProcessTree("100", true, 100L, 
+                                  procfsRootDir.getAbsolutePath());
+      // build the process tree.
+      processTree.getProcessTree();
+      
+      // verify cumulative memory
+      assertEquals("Cumulative memory does not match", 
+              Long.parseLong("700000"), processTree.getCumulativeVmem());
+      
+      // write one more process as child of 100.
+      String[] newPids = { "500" };
+      setupPidDirs(procfsRootDir, newPids);
+      
+      ProcessStatInfo[] newProcInfos = new ProcessStatInfo[1];
+      newProcInfos[0] = new ProcessStatInfo(new String[]
+                             {"500", "proc5", "100", "100", "100", "500000"});
+      writeStatFiles(procfsRootDir, newPids, newProcInfos);
+      
+      // check vmem includes the new process.
+      processTree.getProcessTree();
+      assertEquals("Cumulative memory does not include new process",
+              Long.parseLong("1200000"), processTree.getCumulativeVmem());
+      
+      // however processes older than 1 iteration will retain the older value
+      assertEquals("Cumulative memory shouldn't have included new process",
+              Long.parseLong("700000"), processTree.getCumulativeVmem(1));
+      
+      // one more process
+      newPids = new String[]{ "600" };
+      setupPidDirs(procfsRootDir, newPids);
+      
+      newProcInfos = new ProcessStatInfo[1];
+      newProcInfos[0] = new ProcessStatInfo(new String[]
+                                     {"600", "proc6", "100", "100", "100", "600000"});
+      writeStatFiles(procfsRootDir, newPids, newProcInfos);
+
+      // refresh process tree
+      processTree.getProcessTree();
+      
+      // processes older than 2 iterations should be the same as before.
+      assertEquals("Cumulative memory shouldn't have included new processes",
+          Long.parseLong("700000"), processTree.getCumulativeVmem(2));
+      
+      // processes older than 1 iteration should not include the new
+      // process (600), but should include process 500
+      assertEquals("Cumulative memory shouldn't have included new processes",
+          Long.parseLong("1200000"), processTree.getCumulativeVmem(1));
+      
+      // no process is older than 3 iterations, so this should be 0
+      assertEquals("Getting non-zero vmem for processes older than 3 iterations",
+                    0L, processTree.getCumulativeVmem(3));
+    } finally {
+      FileUtil.fullyDelete(procfsRootDir);
+    }
+  }
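+
+  /**
+   * Illustrative sketch only, not used by the tests above: the age-filtered
+   * summation that the assertions imply getCumulativeVmem(olderThanAge)
+   * performs. The assumption here (consistent with this file, but not
+   * confirmed by it) is that each tracked process carries an age that starts
+   * at 1 when it is first seen and grows by 1 with every later
+   * getProcessTree() refresh it survives, and that only processes whose age
+   * is strictly greater than olderThanAge are counted. The parallel arrays
+   * are hypothetical stand-ins for the tree's per-process state.
+   */
+  private static long sumVmemOlderThan(long[] vmems, int[] ages,
+                                       int olderThanAge) {
+    long total = 0;
+    for (int i = 0; i < vmems.length; i++) {
+      // e.g. right after process 500 is first seen it has age 1, so it is
+      // skipped for olderThanAge = 1, matching the 700000 assertion above.
+      if (ages[i] > olderThanAge) {
+        total += vmems[i];
+      }
+    }
+    return total;
+  }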
+
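+  // The helpers below build the fake procfs fixture used by the tests:
+  // setupProcfsRootDir(root) creates an empty root, setupPidDirs(root, pids)
+  // adds one directory per pid, and writeStatFiles(root, pids, infos) fills
+  // in each <root>/<pid>/stat file.
+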
+  /**
+   * Create a directory to mimic the procfs file system's root.
+   * @param procfsRootDir root directory to create.
+   * @throws IOException if an existing procfs root directory could not
+   *                      be deleted
+   */
+  public static void setupProcfsRootDir(File procfsRootDir) 
+                                        throws IOException { 
+    // clean up any existing procfs root dir.
+    if (procfsRootDir.exists()) {
+      assertTrue(FileUtil.fullyDelete(procfsRootDir));  
+    }
+
+    // create afresh
+    assertTrue(procfsRootDir.mkdirs());
+  }
+
+  /**
+   * Create PID directories under the specified procfs root directory.
+   * @param procfsRootDir root directory of procfs file system
+   * @param pids the PID directories to create.
+   * @throws IOException if the PID dirs could not be created
+   */
+  public static void setupPidDirs(File procfsRootDir, String[] pids) 
+                      throws IOException {
+    for (String pid : pids) {
+      File pidDir = new File(procfsRootDir, pid);
+      pidDir.mkdir();
+      if (!pidDir.exists()) {
+        throw new IOException("couldn't make process directory " + pidDir +
+            " under fake procfs");
+      } else {
+        LOG.info("created pid dir " + pidDir);
+      }
+    }
+  }
+  
+  /**
+   * Write stat files under the specified pid directories with the data
+   * set up in the corresponding ProcessStatInfo objects.
+   * @param procfsRootDir root directory of the procfs file system
+   * @param pids the PID directories under which to create the stat files
+   * @param procs corresponding ProcessStatInfo objects whose data should be
+   *              written to the stat files.
+   * @throws IOException if the stat files could not be written
+   */
+  public static void writeStatFiles(File procfsRootDir, String[] pids, 
+                              ProcessStatInfo[] procs) throws IOException {
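+    // Each file mimics /proc/<pid>/stat; the exact line layout is whatever
+    // ProcessStatInfo.getStatLine() produces for the fields above.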
+    for (int i=0; i<pids.length; i++) {
+      File statFile = new File(new File(procfsRootDir, pids[i]), "stat");
+      BufferedWriter bw = null;
+      try {
+        FileWriter fw = new FileWriter(statFile);
+        bw = new BufferedWriter(fw);
+        bw.write(procs[i].getStatLine());
+        LOG.info("wrote stat file for " + pids[i] + 
+                  " with contents: " + procs[i].getStatLine());
+      } finally {
+        // not catching exceptions here - any IOException will propagate
+        // and fail the test.
+        if (bw != null) {
+          bw.close();
+        }
+      }
+    }
+  }
 }