Browse Source

HADOOP-3187. Quotas for namespace management. Contributed by Hairong Kuang.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@663485 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 17 years ago
parent
commit
65235b7e7e

+ 2 - 0
CHANGES.txt

@@ -139,6 +139,8 @@ Trunk (unreleased changes)
     HADOOP-1328. Implement user counters in streaming. (tomwhite via
     omalley)
 
+    HADOOP-3187. Quotas for namespace management. (Hairong Kuang via ddas)
+
   IMPROVEMENTS
    
     HADOOP-2928. Remove deprecated FileSystem.getContentLength().

+ 28 - 2
src/java/org/apache/hadoop/dfs/ClientProtocol.java

@@ -37,9 +37,10 @@ interface ClientProtocol extends VersionedProtocol {
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 34 : remove abandonFileInProgress(...)
+   * 35 : Quota-related RPCs are introduced: getQuota, clearQuota;
+   * Besides, getContentSummary also returns the quota of the directory.
    */
-  public static final long versionID = 34L;
+  public static final long versionID = 35L;
   
   ///////////////////////////////////////
   // File contents
@@ -93,6 +94,8 @@ interface ClientProtocol extends VersionedProtocol {
    * @throws AccessControlException if permission to create file is 
    * denied by the system. As usually on the client side the exception will 
    * be wrapped into {@link org.apache.hadoop.ipc.RemoteException}.
+   * @throws QuotaExceededException if the file creation violates 
+   *                                any quota restriction
    * @throws IOException if other errors occur.
    */
   public void create(String src, 
@@ -190,6 +193,8 @@ interface ClientProtocol extends VersionedProtocol {
    * @return true if successful, or false if the old name does not exist
    * or if the new name already belongs to the namespace.
    * @throws IOException if the new name is invalid.
+   * @throws QuotaExceededException if the rename would violate 
+   *                                any quota restriction
    */
   public boolean rename(String src, String dst) throws IOException;
 
@@ -227,6 +232,8 @@ interface ClientProtocol extends VersionedProtocol {
    * @throws {@link AccessControlException} if permission to create file is 
    * denied by the system. As usually on the client side the exception will 
    * be wraped into {@link org.apache.hadoop.ipc.RemoteException}.
+   * @throws QuotaExceededException if the operation would violate 
+   *                                any quota restriction.
    */
   public boolean mkdirs(String src, FsPermission masked) throws IOException;
 
@@ -410,6 +417,25 @@ interface ClientProtocol extends VersionedProtocol {
    */
   public ContentSummary getContentSummary(String path) throws IOException;
 
+  /**
+   * Set the quota for a directory.
+   * @param path  The string representation of the path to the directory
+   * @param quota The limit of the number of names in the tree rooted 
+   *              at the directory
+   * @throws FileNotFoundException if the path is a file or 
+   *                               does not exist 
+   * @throws QuotaExceededException if the directory size 
+   *                                is greater than the given quota
+   */
+  public void setQuota(String path, long quota) throws IOException;
+  
+  /**
+   * Remove the quota for a directory
+   * @param path The string representation of the path to the directory
+   * @throws FileNotFoundException if the path is not a directory
+   */
+  public void clearQuota(String path) throws IOException;
+  
   /**
    * Write all metadata for this file into persistent storage.
    * The file must be currently open for writing.

+ 135 - 11
src/java/org/apache/hadoop/dfs/DFSAdmin.java

@@ -18,11 +18,16 @@
 package org.apache.hadoop.dfs;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.dfs.DistributedFileSystem.DiskStatus;
 import org.apache.hadoop.dfs.FSConstants.UpgradeAction;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsShell;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.shell.Command;
+import org.apache.hadoop.fs.shell.CommandFormat;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.ToolRunner;
@@ -32,6 +37,106 @@ import org.apache.hadoop.util.ToolRunner;
  */
 public class DFSAdmin extends FsShell {
 
+  /**
+   * An abstract class for the execution of a file system command
+   */
+  abstract private static class DFSAdminCommand extends Command {
+    /** Constructor */
+    public DFSAdminCommand(FileSystem fs) {
+      super(fs);
+      if (!(fs instanceof DistributedFileSystem)) {
+        throw new IllegalArgumentException("FileSystem " + fs.getUri() + 
+            " is not a distributed file system");
+      }
+    }
+  }
+  
+  /** A class that supports command clearQuota */
+  private static class ClearQuotaCommand extends DFSAdminCommand {
+    private static final String NAME = "clrQuota";
+    private static final String USAGE = "-"+NAME+" <dirname>...<dirname>";
+    private static final String DESCRIPTION = USAGE + ": " +
+    "\tClear the quota for each directory <dirName>.\n" +
+    "\t\tBest effort for the directory. with fault reported if\n" +
+    "\t\t1. the directory does not exist or is a file, or\n" +
+    "\t\t2. user is not an administrator.\n" +
+    "\t\tIt does not fault if the directory has no quota.";
+    
+    /** Constructor */
+    ClearQuotaCommand(String[] args, int pos, FileSystem fs) {
+      super(fs);
+      CommandFormat c = new CommandFormat(NAME, 1, Integer.MAX_VALUE);
+      List<String> parameters = c.parse(args, pos);
+      this.args = parameters.toArray(new String[parameters.size()]);
+    }
+    
+    /** Check if a command is the clrQuota command
+     * 
+     * @param cmd A string representation of a command starting with "-"
+     * @return true if this is a clrQuota command; false otherwise
+     */
+    public static boolean matches(String cmd) {
+      return ("-"+NAME).equals(cmd); 
+    }
+
+    @Override
+    public String getCommandName() {
+      return NAME;
+    }
+
+    @Override
+    public void run(Path path) throws IOException {
+      ((DistributedFileSystem)fs).clearQuota(path);
+    }
+  }
+  
+  /** A class that supports command setQuota */
+  private static class SetQuotaCommand extends DFSAdminCommand {
+    private static final String NAME = "setQuota";
+    private static final String USAGE =
+      "-"+NAME+" <quota> <dirname>...<dirname>";
+    private static final String DESCRIPTION = 
+      "-setQuota <quota> <dirname>...<dirname>: " +
+      "\tSet the quota <quota> for each directory <dirName>.\n" + 
+      "\t\tThe directory quota is a long integer that puts a hard limit " +
+      "on the number of names in the directory tree\n" +
+      "\t\tBest effort for the directory, with faults reported if\n" +
+      "\t\t1. N is not a positive integer, or\n" +
+      "\t\t2. user is not an administrator, or\n" +
+      "\t\t3. the directory does not exist or is a file, or\n" +
+      "\t\t4. the directory would immediately exceed the new quota.";
+    
+    private final long quota; // the quota to be set
+    
+    /** Constructor */
+    SetQuotaCommand(String[] args, int pos, FileSystem fs) {
+      super(fs);
+      CommandFormat c = new CommandFormat(NAME, 2, Integer.MAX_VALUE);
+      List<String> parameters = c.parse(args, pos);
+      this.quota = Long.parseLong(parameters.remove(0));
+      this.args = parameters.toArray(new String[parameters.size()]);
+    }
+    
+    /** Check if a command is the setQuota command
+     * 
+     * @param cmd A string representation of a command starting with "-"
+     * @return true if this is a count command; false otherwise
+     */
+    public static boolean matches(String cmd) {
+      return ("-"+NAME).equals(cmd); 
+    }
+
+    @Override
+    public String getCommandName() {
+      return NAME;
+    }
+
+    @Override
+    public void run(Path path) throws IOException {
+      ((DistributedFileSystem)fs).setQuota(path, quota);
+    }
+  }
+  
   /**
    * Construct a DFSAdmin object.
    */
@@ -172,7 +277,10 @@ public class DFSAdmin extends FsShell {
     String summary = "hadoop dfsadmin is the command to execute DFS administrative commands.\n" +
       "The full syntax is: \n\n" +
       "hadoop dfsadmin [-report] [-safemode <enter | leave | get | wait>]\n" +
-      "\t[-refreshNodes] [-help [cmd]]\n";
+      "\t[-refreshNodes]\n" +
+      "\t[" + SetQuotaCommand.USAGE + "]\n" +
+      "\t[" + ClearQuotaCommand.USAGE +"]\n" +
+      "\t[-help [cmd]]\n";
 
     String report ="-report: \tReports basic filesystem information and statistics.\n";
         
@@ -222,6 +330,10 @@ public class DFSAdmin extends FsShell {
       System.out.println(upgradeProgress);
     } else if ("metasave".equals(cmd)) {
       System.out.println(metaSave);
+    } else if (SetQuotaCommand.matches(cmd)) {
+      System.out.println(SetQuotaCommand.DESCRIPTION);
+    } else if (ClearQuotaCommand.matches(cmd)) {
+      System.out.println(ClearQuotaCommand.DESCRIPTION);
     } else if ("help".equals(cmd)) {
       System.out.println(help);
     } else {
@@ -232,6 +344,8 @@ public class DFSAdmin extends FsShell {
       System.out.println(finalizeUpgrade);
       System.out.println(upgradeProgress);
       System.out.println(metaSave);
+      System.out.println(SetQuotaCommand.DESCRIPTION);
+      System.out.println(ClearQuotaCommand.DESCRIPTION);
       System.out.println(help);
       System.out.println();
       ToolRunner.printGenericCommandUsage(System.out);
@@ -319,7 +433,7 @@ public class DFSAdmin extends FsShell {
    * Displays format of commands.
    * @param cmd The command that is being executed.
    */
-  public void printUsage(String cmd) {
+  private static void printUsage(String cmd) {
     if ("-report".equals(cmd)) {
       System.err.println("Usage: java DFSAdmin"
                          + " [-report]");
@@ -338,6 +452,12 @@ public class DFSAdmin extends FsShell {
     } else if ("-metasave".equals(cmd)) {
       System.err.println("Usage: java DFSAdmin"
                          + " [-metasave filename]");
+    } else if (SetQuotaCommand.matches(cmd)) {
+      System.err.println("Usage: java DFSAdmin"
+                         + " [" + SetQuotaCommand.USAGE+"]");
+    } else if (ClearQuotaCommand.matches(cmd)) {
+      System.err.println("Usage: java DFSAdmin"
+                         + " ["+ClearQuotaCommand.USAGE+"]");
     } else {
       System.err.println("Usage: java DFSAdmin");
       System.err.println("           [-report]");
@@ -346,6 +466,8 @@ public class DFSAdmin extends FsShell {
       System.err.println("           [-finalizeUpgrade]");
       System.err.println("           [-upgradeProgress status | details | force]");
       System.err.println("           [-metasave filename]");
+      System.err.println("           ["+SetQuotaCommand.USAGE+"]");
+      System.err.println("           ["+ClearQuotaCommand.USAGE+"]");
       System.err.println("           [-help [cmd]]");
       System.err.println();
       ToolRunner.printGenericCommandUsage(System.err);
@@ -403,8 +525,7 @@ public class DFSAdmin extends FsShell {
         return exitCode;
       }
     }
-
-
+    
     // initialize DFSAdmin
     try {
       init();
@@ -431,6 +552,10 @@ public class DFSAdmin extends FsShell {
         exitCode = upgradeProgress(argv, i);
       } else if ("-metasave".equals(cmd)) {
         exitCode = metaSave(argv, i);
+      } else if (ClearQuotaCommand.matches(cmd)) {
+        exitCode = new ClearQuotaCommand(argv, i, fs).runAll();
+      } else if (SetQuotaCommand.matches(cmd)) {
+        exitCode = new SetQuotaCommand(argv, i, fs).runAll();
       } else if ("-help".equals(cmd)) {
         if (i < argv.length) {
           printHelp(argv[i]);
@@ -442,6 +567,10 @@ public class DFSAdmin extends FsShell {
         System.err.println(cmd.substring(1) + ": Unknown command");
         printUsage("");
       }
+    } catch (IllegalArgumentException arge) {
+      exitCode = -1;
+      System.err.println(cmd.substring(1) + ": " + arge.getLocalizedMessage());
+      printUsage(cmd);
     } catch (RemoteException e) {
       //
       // This is a error returned by hadoop server. Print
@@ -456,16 +585,11 @@ public class DFSAdmin extends FsShell {
         System.err.println(cmd.substring(1) + ": "
                            + ex.getLocalizedMessage());
       }
-    } catch (IOException e) {
-      //
-      // IO exception encountered locally.
-      //
+    } catch (Exception e) {
       exitCode = -1;
       System.err.println(cmd.substring(1) + ": "
                          + e.getLocalizedMessage());
-    } finally {
-      fs.close();
-    }
+    } 
     return exitCode;
   }
 

+ 39 - 3
src/java/org/apache/hadoop/dfs/DFSClient.java

@@ -505,7 +505,8 @@ class DFSClient implements FSConstants {
     try {
       return namenode.rename(src, dst);
     } catch(RemoteException re) {
-      throw re.unwrapRemoteException(AccessControlException.class);
+      throw re.unwrapRemoteException(AccessControlException.class,
+                                     QuotaExceededException.class);
     }
   }
 
@@ -702,7 +703,8 @@ class DFSClient implements FSConstants {
     try {
       return namenode.mkdirs(src, masked);
     } catch(RemoteException re) {
-      throw re.unwrapRemoteException(AccessControlException.class);
+      throw re.unwrapRemoteException(AccessControlException.class,
+                                     QuotaExceededException.class);
     }
   }
 
@@ -715,6 +717,39 @@ class DFSClient implements FSConstants {
     }
   }
 
+  /**
+   * Remove the quota for a directory
+   * @param path The string representation of the path to the directory
+   * @throws FileNotFoundException if the path is not a directory
+   */
+  void clearQuota(String src) throws IOException {
+    try {
+      namenode.clearQuota(src);
+    } catch(RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class,
+                                     FileNotFoundException.class);
+    }
+  }
+  
+  /**
+   * Set the quota for a directory.
+   * @param path  The string representation of the path to the directory
+   * @param quota The limit of the number of names in the tree rooted 
+   *              at the directory
+   * @throws FileNotFoundException if the path is a file or 
+   *                               does not exist 
+   * @throws QuotaExceededException if the directory size 
+   *                                is greater than the given quota
+   */
+  void setQuota(String src, long quota) throws IOException {
+    try {
+      namenode.setQuota(src, quota);
+    } catch(RemoteException re) {
+      throw re.unwrapRemoteException(AccessControlException.class,
+                                     FileNotFoundException.class,
+                                     QuotaExceededException.class);
+    }
+  }
   /**
    * Pick the best node from which to stream the data.
    * Entries in <i>nodes</i> are already in the priority order
@@ -2256,7 +2291,8 @@ class DFSClient implements FSConstants {
         namenode.create(
             src, masked, clientName, overwrite, replication, blockSize);
       } catch(RemoteException re) {
-        throw re.unwrapRemoteException(AccessControlException.class);
+        throw re.unwrapRemoteException(AccessControlException.class,
+                                       QuotaExceededException.class);
       }
       streamer = new DataStreamer();
       streamer.setDaemon(true);

+ 18 - 0
src/java/org/apache/hadoop/dfs/DistributedFileSystem.java

@@ -182,6 +182,24 @@ public class DistributedFileSystem extends FileSystem {
     return dfs.getContentSummary(getPathName(f));
   }
 
+  /** Clear a directory's quota
+   * @see ClientProtocol#clearQuota(String)
+   */
+  void clearQuota(Path src) throws IOException {
+    dfs.clearQuota(getPathName(src));
+  }
+  
+  /** Set a directory's quota
+   * @see ClientProtocol#setQuota(String, long) 
+   */
+  void setQuota(Path src, long quota) throws IOException {
+    if (quota <= 0) {
+      throw new IllegalArgumentException("Quota should be a positive number: "
+          + quota);
+    }
+    dfs.setQuota(getPathName(src), quota);
+  }
+  
   public FileStatus[] listStatus(Path p) throws IOException {
     DFSFileInfo[] infos = dfs.listPaths(getPathName(p));
     if (infos == null) return null;

+ 2 - 2
src/java/org/apache/hadoop/dfs/FSConstants.java

@@ -189,7 +189,7 @@ public interface FSConstants {
   // Version is reflected in the data storage file.
   // Versions are negative.
   // Decrement LAYOUT_VERSION to define a new version.
-  public static final int LAYOUT_VERSION = -15;
+  public static final int LAYOUT_VERSION = -16;
   // Current version: 
-  // Store generation stamp with each Block
+  // Change edit log and fsimage to support quotas
 }

+ 369 - 89
src/java/org/apache/hadoop/dfs/FSDirectory.java

@@ -41,14 +41,12 @@ import org.apache.hadoop.dfs.BlocksMap.BlockInfo;
 class FSDirectory implements FSConstants {
 
   FSNamesystem namesystem = null;
-  final INodeDirectory rootDir;
+  final INodeDirectoryWithQuota rootDir;
   FSImage fsImage;  
   boolean ready = false;
   // Metrics record
   private MetricsRecord directoryMetrics = null;
 
-  volatile private long totalInodes = 1;   // number of inodes, for rootdir
-    
   /** Access an existing dfs name directory. */
   public FSDirectory(FSNamesystem ns, Configuration conf) throws IOException {
     this(new FSImage(), ns, conf);
@@ -56,8 +54,9 @@ class FSDirectory implements FSConstants {
   }
 
   public FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) throws IOException {
-    rootDir = new INodeDirectory(INodeDirectory.ROOT_NAME,
-        ns.createFsOwnerPermissions(new FsPermission((short)0755)));
+    rootDir = new INodeDirectoryWithQuota(INodeDirectory.ROOT_NAME,
+        ns.createFsOwnerPermissions(new FsPermission((short)0755)),
+        Integer.MAX_VALUE);
     this.fsImage = fsImage;
     namesystem = ns;
     initialize(conf);
@@ -149,14 +148,7 @@ class FSDirectory implements FSConstants {
                                  preferredBlockSize, modTime, clientName, 
                                  clientMachine, clientNode);
     synchronized (rootDir) {
-      try {
-        newNode = rootDir.addNode(path, newNode);
-      } catch (FileNotFoundException e) {
-        newNode = null;
-      }
-      if (newNode != null) {
-        totalInodes++;
-      }
+      newNode = addNode(path, newNode, false);
     }
     if (newNode == null) {
       NameNode.stateChangeLog.info("DIR* FSDirectory.addFile: "
@@ -188,7 +180,7 @@ class FSDirectory implements FSConstants {
                               modificationTime, preferredBlockSize);
     synchronized (rootDir) {
       try {
-        newNode = rootDir.addNode(path, newNode);
+        newNode = addNode(path, newNode, false);
         if(newNode != null && blocks != null) {
           int nrBlocks = blocks.length;
           // Add file->block mapping
@@ -197,12 +189,9 @@ class FSDirectory implements FSConstants {
             newF.setBlock(i, namesystem.blocksMap.addINode(blocks[i], newF));
           }
         }
-      } catch (FileNotFoundException e) {
+      } catch (IOException e) {
         return null;
       }
-      if (newNode != null) {
-        totalInodes++;
-      }
       return newNode;
     }
   }
@@ -213,12 +202,18 @@ class FSDirectory implements FSConstants {
                               Block[] blocks, 
                               short replication,
                               long modificationTime,
+                              long quota,
                               long preferredBlockSize) {
     // create new inode
     INode newNode;
-    if (blocks == null)
-      newNode = new INodeDirectory(permissions, modificationTime);
-    else 
+    if (blocks == null) {
+      if (quota >= 0) {
+        newNode = new INodeDirectoryWithQuota(
+            permissions, modificationTime, quota);
+      } else {
+        newNode = new INodeDirectory(permissions, modificationTime);
+      }
+    } else 
       newNode = new INodeFile(permissions, blocks.length, replication,
                               modificationTime, preferredBlockSize);
     // add new node to the parent
@@ -231,7 +226,6 @@ class FSDirectory implements FSConstants {
       }
       if(newParent == null)
         return null;
-      totalInodes++;
       if(blocks != null) {
         int nrBlocks = blocks.length;
         // Add file->block mapping
@@ -321,9 +315,10 @@ class FSDirectory implements FSConstants {
   }
 
   /**
-   * Change the filename
+   * @see #unprotectedRenameTo(String, String, long)
    */
-  public boolean renameTo(String src, String dst) {
+  public boolean renameTo(String src, String dst)
+  throws QuotaExceededException {
     if (NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: "
                                   +src+" to "+dst);
@@ -336,47 +331,90 @@ class FSDirectory implements FSConstants {
     return true;
   }
 
-  /**
+  /** Change a path name
+   * 
+   * @param src source path
+   * @param dst destination path
+   * @return true if rename succeeds; false otherwise
+   * @throws QuotaExceededException if the operation violates any quota limit
    */
-  boolean unprotectedRenameTo(String src, String dst, long timestamp) {
-    synchronized(rootDir) {
-      INode renamedNode = rootDir.getNode(src);
-      if (renamedNode == null) {
+  boolean unprotectedRenameTo(String src, String dst, long timestamp) 
+  throws QuotaExceededException {
+    byte[][] srcComponents = INode.getPathComponents(src);
+    INode[] srcInodes = new INode[srcComponents.length];
+    synchronized (rootDir) {
+      rootDir.getExistingPathINodes(srcComponents, srcInodes);
+
+      // check the validation of the source
+      if (srcInodes[srcInodes.length-1] == null) {
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
                                      +"failed to rename "+src+" to "+dst+ " because source does not exist");
         return false;
+      } else if (srcInodes.length == 1) {
+        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+            +"failed to rename "+src+" to "+dst+ " because source is the root");
+        return false;
       }
       if (isDir(dst)) {
         dst += Path.SEPARATOR + new Path(src).getName();
       }
-      if (rootDir.getNode(dst) != null) {
+      
+      byte[][] dstComponents = INode.getPathComponents(dst);
+      INode[] dstInodes = new INode[dstComponents.length];
+      rootDir.getExistingPathINodes(dstComponents, dstInodes);
+      
+      // check the existence of the destination
+      if (dstInodes[dstInodes.length-1] != null) {
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
                                      +"failed to rename "+src+" to "+dst+ " because destination exists");
         return false;
       }
-      INodeDirectory oldParent = renamedNode.getParent();
-      oldParent.removeChild(renamedNode);
-            
-      // the renamed node can be reused now
+      
+      // remove source
+      INode srcChild = null;
       try {
-        if (rootDir.addNode(dst, renamedNode) != null) {
-          NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: "
-                                        +src+" is renamed to "+dst);
-
-          // update modification time of old parent as well as new parent dir
-          oldParent.setModificationTime(timestamp);
-          renamedNode.getParent().setModificationTime(timestamp);
-          return true;
-        }
-      } catch (FileNotFoundException e) {
+        srcChild = removeChild(srcInodes, srcInodes.length-1);
+      } catch (IOException e) {
+        // srcChild == null; go to next if statement
+      }
+      if (srcChild == null) {
+        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
+            +"failed to rename "+src+" to "+dst+ " because the source can not be removed");
+        return false;
+      }
+
+      // add to the destination
+      INode dstChild = null;
+      QuotaExceededException failureByQuota = null;
+      try {
+        // set the destination's name
+        srcChild.setLocalName(dstComponents[dstInodes.length-1]);
+        // add it to the namespace
+        dstChild = addChild(dstInodes, dstInodes.length-1, srcChild, false);
+      } catch (QuotaExceededException qe) {
+        failureByQuota = qe;
+      }
+      if (dstChild != null) {
+        NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedRenameTo: "
+            +src+" is renamed to "+dst);
+
+        // update modification time of dst and the parent of src
+        srcInodes[srcInodes.length-2].setModificationTime(timestamp);
+        dstInodes[dstInodes.length-2].setModificationTime(timestamp);
+        return true;
+      } else {
         NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedRenameTo: "
-                                     +"failed to rename "+src+" to "+dst);
+            +"failed to rename "+src+" to "+dst);
         try {
-          rootDir.addNode(src, renamedNode); // put it back
-        }catch(FileNotFoundException e2) {                
+          // put it back
+          addChild(srcInodes, srcInodes.length-1, srcChild, false);
+        } catch (IOException ignored) {}
+        if (failureByQuota != null) {
+          throw failureByQuota;
+        } else {
+          return false;
         }
       }
-      return false;
     }
   }
 
@@ -496,7 +534,7 @@ class FSDirectory implements FSConstants {
                                   +src);
     waitForReady();
     long now = FSNamesystem.now();
-    INode deletedNode = unprotectedDelete(src, now, deletedBlocks); 
+    INode deletedNode = unprotectedDelete(src, now, deletedBlocks);
     if (deletedNode != null) {
       fsImage.getEditLog().logDelete(src, now);
     }
@@ -520,42 +558,58 @@ class FSDirectory implements FSConstants {
   }
   
   /**
-   */
+   * Delete a path from the name space
+   * Update the count at each ancestor directory with quota
+   * @param src a string representation of a path to an inode
+   * @param modificationTime the time the inode is removed
+   * @param deletedBlocks the place holder for the blocks to be removed
+   * @return if the deletion succeeds
+   */ 
   INode unprotectedDelete(String src, long modificationTime, 
                           Collection<Block> deletedBlocks) {
+    src = normalizePath(src);
+    String[] names = INode.getPathNames(src);
+    byte[][] components = INode.getPathComponents(names);
+    INode[] inodes = new INode[components.length];
+
     synchronized (rootDir) {
-      INode targetNode = rootDir.getNode(src);
-      if (targetNode == null) {
+      rootDir.getExistingPathINodes(components, inodes);
+      INode targetNode = inodes[inodes.length-1];
+
+      if (targetNode == null) { // non-existent src
         NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
-                                     +"failed to remove "+src+" because it does not exist");
+            +"failed to remove "+src+" because it does not exist");
+        return null;
+      } else if (inodes.length == 1) { // src is the root
+        NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: " +
+            "failed to remove " + src +
+            " because the root is not allowed to be deleted");
         return null;
       } else {
-        //
-        // Remove the node from the namespace and GC all
-        // the blocks underneath the node.
-        //
-        if (targetNode.getParent() == null) {
-          NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: "
-                                       +"failed to remove "+src+" because it does not have a parent");
-          return null;
-        } else {
-          targetNode.getParent().setModificationTime(modificationTime);
-          targetNode.removeNode();
-          NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
-                                        +src+" is removed");
+        try {
+          // Remove the node from the namespace
+          removeChild(inodes, inodes.length-1);
+          // set the parent's modification time
+          inodes[inodes.length-2].setModificationTime(modificationTime);
+          // GC all the blocks underneath the node.
           ArrayList<Block> v = new ArrayList<Block>();
           int filesRemoved = targetNode.collectSubtreeBlocksAndClear(v);
           incrDeletedFileCount(filesRemoved);
-          totalInodes -= filesRemoved;
           for (Block b : v) {
             namesystem.blocksMap.removeINode(b);
-            // If block is removed from blocksMap remove it from corruptReplicasMap
+            // remove the block from corruptReplicasMap
             namesystem.corruptReplicas.removeFromCorruptReplicasMap(b);
             if (deletedBlocks != null) {
               deletedBlocks.add(b);
             }
           }
+          NameNode.stateChangeLog.debug("DIR* FSDirectory.unprotectedDelete: "
+              +src+" is removed");
           return targetNode;
+        } catch (IOException e) {
+          NameNode.stateChangeLog.warn("DIR* FSDirectory.unprotectedDelete: " +
+              "failed to remove " + src + " because " + e.getMessage());
+          return null;
         }
       }
     }
@@ -683,20 +737,79 @@ class FSDirectory implements FSConstants {
     }
   }
 
+  /** update count of each inode with quota
+   * 
+   * @param inodes an array of inodes on a path
+   * @param numOfINodes the number of inodes to update starting from index 0
+   * @param deltaCount the delta change of the count
+   * @throws QuotaExceededException if the new count violates any quota limit
+   */
+  private static void updateCount(
+      INode[] inodes, int numOfINodes, long deltaCount )
+  throws QuotaExceededException {
+    if (numOfINodes>inodes.length) {
+      numOfINodes = inodes.length;
+    }
+    // check existing components in the path  
+    List<INodeDirectoryWithQuota> inodesWithQuota = 
+      new ArrayList<INodeDirectoryWithQuota>(numOfINodes);
+    int i=0;
+    try {
+      for(; i < numOfINodes; i++) {
+        if (inodes[i].getQuota() >= 0) { // a directory with quota
+          INodeDirectoryWithQuota quotaINode =(INodeDirectoryWithQuota)inodes[i]; 
+          quotaINode.updateNumItemsInTree(deltaCount);
+          inodesWithQuota.add(quotaINode);
+        }
+      }
+    } catch (QuotaExceededException e) {
+      for (INodeDirectoryWithQuota quotaINode:inodesWithQuota) {
+        try {
+          quotaINode.updateNumItemsInTree(-deltaCount);
+        } catch (IOException ingored) {
+        }
+      }
+      e.setPathName(getFullPathName(inodes, i));
+      throw e;
+    }
+  }
+  
+  /** Return the name of the path represented by inodes at [0, pos] */
+  private static String getFullPathName(INode[] inodes, int pos) {
+    StringBuilder fullPathName = new StringBuilder();
+    for (int i=1; i<=pos; i++) {
+      fullPathName.append(Path.SEPARATOR_CHAR).append(inodes[i].getLocalName());
+    }
+    return fullPathName.toString();
+  }
+  
   /**
-   * Create directory entries for every item
+   * Create a directory 
+   * If ancestor directories do not exist, automatically create them.
+
+   * @param src string representation of the path to the directory
+   * @param permissions the permission of the directory
+   * @param inheritPermission if the permission of the directory should inherit
+   *                          from its parent or not. The automatically created
+   *                          ones always inherit its permission from its parent
+   * @param now creation time
+   * @return true if the operation succeeds false otherwise
+   * @throws FileNotFoundException if an ancestor or itself is a file
+   * @throws QuotaExceededException if directory creation violates 
+   *                                any quota limit
    */
   boolean mkdirs(String src, PermissionStatus permissions,
-      boolean inheritPermission, long now) throws IOException {
+      boolean inheritPermission, long now)
+      throws FileNotFoundException, QuotaExceededException {
     src = normalizePath(src);
     String[] names = INode.getPathNames(src);
     byte[][] components = INode.getPathComponents(names);
+    INode[] inodes = new INode[components.length];
 
     synchronized(rootDir) {
-      INode[] inodes = new INode[components.length];
       rootDir.getExistingPathINodes(components, inodes);
 
-      // find the index of the first null in inodes[]  
+      // find the index of the first null in inodes[]
       StringBuilder pathbuilder = new StringBuilder();
       int i = 1;
       for(; i < inodes.length && inodes[i] != null; i++) {
@@ -711,17 +824,14 @@ class FSDirectory implements FSConstants {
       for(; i < inodes.length; i++) {
         pathbuilder.append(Path.SEPARATOR + names[i]);
         String cur = pathbuilder.toString();
-  
-        inodes[i] = new INodeDirectory(permissions, now);
-        inodes[i].name = components[i];
-        INode inserted = ((INodeDirectory)inodes[i-1]).addChild(
-            inodes[i], inheritPermission || i != inodes.length-1);
-
-        assert inserted == inodes[i];
-        totalInodes++;
+        unprotectedMkdir(inodes, i, components[i], permissions,
+            inheritPermission || i != components.length-1, now);
+        if (inodes[i] == null) {
+          return false;
+        }
+        fsImage.getEditLog().logMkDir(cur, inodes[i]);
         NameNode.stateChangeLog.debug(
             "DIR* FSDirectory.mkdirs: created directory " + cur);
-        fsImage.getEditLog().logMkDir(cur, inserted);
       }
     }
     return true;
@@ -730,17 +840,74 @@ class FSDirectory implements FSConstants {
   /**
    */
   INode unprotectedMkdir(String src, PermissionStatus permissions,
-                          long timestamp) throws FileNotFoundException {
+                          long timestamp) throws QuotaExceededException {
+    byte[][] components = INode.getPathComponents(src);
+    INode[] inodes = new INode[components.length];
     synchronized (rootDir) {
-      INode newNode = rootDir.addNode(src,
-                                new INodeDirectory(permissions, timestamp));
-      if (newNode != null) {
-        totalInodes++;
-      }
-      return newNode;
+      rootDir.getExistingPathINodes(components, inodes);
+      unprotectedMkdir(inodes, inodes.length-1, components[inodes.length-1],
+          permissions, false, timestamp);
+      return inodes[inodes.length-1];
     }
   }
 
+  /** create a directory at index pos.
+   * The parent path to the directory is at [0, pos-1].
+   * All ancestors exist. Newly created one stored at index pos.
+   */
+  private void unprotectedMkdir(INode[] inodes, int pos,
+      byte[] name, PermissionStatus permission, boolean inheritPermission,
+      long timestamp) throws QuotaExceededException {
+    inodes[pos] = addChild(inodes, pos, 
+        new INodeDirectory(name, permission, timestamp),
+        inheritPermission );
+  }
+  
+  /** Add a node child to the namespace. The full path name of the node is src. 
+   * QuotaExceededException is thrown if it violates quota limit */
+  private <T extends INode> T addNode(String src, T child, 
+      boolean inheritPermission) 
+  throws QuotaExceededException {
+    byte[][] components = INode.getPathComponents(src);
+    child.setLocalName(components[components.length-1]);
+    INode[] inodes = new INode[components.length];
+    synchronized (rootDir) {
+      rootDir.getExistingPathINodes(components, inodes);
+      return addChild(inodes, inodes.length-1, child, inheritPermission);
+    }
+  }
+  
+  /** Add a node child to the inodes at index pos. 
+   * Its ancestors are stored at [0, pos-1]. 
+   * QuotaExceededException is thrown if it violates quota limit */
+  private <T extends INode> T addChild(INode[] pathComponents, int pos, T child,
+      boolean inheritPermission) throws QuotaExceededException {
+    long childSize = child.numItemsInTree();
+    updateCount(pathComponents, pos, childSize);
+    T addedNode = ((INodeDirectory)pathComponents[pos-1]).addChild(
+        child, inheritPermission);
+    if (addedNode == null) {
+      updateCount(pathComponents, pos, -childSize);
+    }
+    return addedNode;
+  }
+  
+  /** Remove an inode at index pos from the namespace.
+   * Its ancestors are stored at [0, pos-1].
+   * Count of each ancestor with quota is also updated.
+   * Return the removed node; null if the removal fails.
+   */
+  private INode removeChild(INode[] pathComponents, int pos)
+  throws QuotaExceededException {
+    INode removedNode = 
+      ((INodeDirectory)pathComponents[pos-1]).removeChild(pathComponents[pos]);
+    if (removedNode != null) {
+      updateCount(pathComponents, pos, 
+          -removedNode.numItemsInTree());
+    }
+    return removedNode;
+  }
+  
   /**
    */
   String normalizePath(String src) {
@@ -763,9 +930,122 @@ class FSDirectory implements FSConstants {
     }
   }
 
+  /** Update the count of each directory with quota in the namespace
+   * A directory's count is defined as the total number inodes in the tree
+   * rooted at the directory.
+   * 
+   * @throws QuotaExceededException if the count update violates 
+   *                                any quota limitation
+   */
+  void updateCountForINodeWithQuota() throws QuotaExceededException {
+    updateCountForINodeWithQuota(rootDir);
+  }
+  
+  /** Update the count of the directory if it has a quota and return the count
+   * 
+   * @param node the root of the tree that represents the directory
+   * @return the size of the tree
+   * @throws QuotaExceededException if the count is greater than its quota
+   */
+  private static long updateCountForINodeWithQuota(INode node) throws QuotaExceededException {
+    long count = 1L;
+    if (node.isDirectory()) {
+      INodeDirectory dNode = (INodeDirectory)node;
+      for (INode child : dNode.getChildren()) {
+        count += updateCountForINodeWithQuota(child);
+      }
+      if (dNode.getQuota()>=0) {
+        ((INodeDirectoryWithQuota)dNode).setCount(count);
+      }
+    }
+    return count;
+  }
+  
+  /**
+   * Set the quota for a directory.
+   * @param path The string representation of the path to the directory
+   * @param quota The limit of the number of names in or below the directory
+   * @throws FileNotFoundException if the path does not exist or is a file
+   * @throws QuotaExceededException if the directory tree size is 
+   *                                greater than the given quota
+   */
+  void unprotectedSetQuota(String src, long quota)
+  throws FileNotFoundException, QuotaExceededException {
+    String srcs = normalizePath(src);
+    byte[][] components = INode.getPathComponents(src);
+    INode[] inodes = new INode[components.length==1?1:2];
+    synchronized (rootDir) {
+      rootDir.getExistingPathINodes(components, inodes);
+      INode targetNode = inodes[inodes.length-1];
+      if (targetNode == null || !targetNode.isDirectory()) {
+        throw new FileNotFoundException("Directory does not exist: " + srcs);
+      } else { // a directory inode
+        INodeDirectory dirNode = (INodeDirectory)targetNode;
+        if (dirNode instanceof INodeDirectoryWithQuota) { 
+          // a directory with quota; so set the quota to the new value
+          ((INodeDirectoryWithQuota)dirNode).setQuota(quota);
+        } else {
+          // a non-quota directory; so replace it with a directory with quota
+          INodeDirectoryWithQuota newNode = 
+            new INodeDirectoryWithQuota(quota, dirNode);
+          // non-root directory node; parent != null
+          assert inodes.length==2;
+          INodeDirectory parent = (INodeDirectory)inodes[0];
+          parent.replaceChild(newNode);
+        }
+      }
+    }
+  }
+  
+  /**
+   * @see #unprotectedSetQuota(String, long)
+   */
+  void setQuota(String src, long quota) 
+  throws FileNotFoundException, QuotaExceededException {
+    unprotectedSetQuota(src, quota);
+    fsImage.getEditLog().logSetQuota(src, quota);
+  }
+  
+  /**
+   * Remove the quota for a directory
+   * @param src The string representation of the path to the directory
+   * @throws FileNotFoundException if the path does not exist or it is a file
+   */
+  void unprotectedClearQuota(String src) throws IOException {
+    String srcs = normalizePath(src);
+    byte[][] components = INode.getPathComponents(src);
+    INode[] inodes = new INode[components.length==1?1:2];
+    synchronized (rootDir) {
+      rootDir.getExistingPathINodes(components, inodes);
+      INode targetNode = inodes[inodes.length-1];
+      if (targetNode == null || !targetNode.isDirectory()) {
+        throw new FileNotFoundException("Directory does not exist: " + srcs);
+      } else if (targetNode instanceof INodeDirectoryWithQuota) {
+        // a directory inode with quota
+        // replace the directory with quota with a non-quota one
+        INodeDirectoryWithQuota dirNode = (INodeDirectoryWithQuota)targetNode;
+        INodeDirectory newNode = new INodeDirectory(dirNode);
+        if (dirNode == rootDir) { // root
+          throw new IOException("Can't clear the root's quota");
+        } else { // non-root directory node; parent != null
+          INodeDirectory parent = (INodeDirectory)inodes[0];
+          parent.replaceChild(newNode);
+        }
+      }
+    }
+  }
+  
+  /**
+   * @see #unprotectedClearQuota(String)
+   */
+  void clearQuota(String src) throws IOException {
+    unprotectedClearQuota(src);
+    fsImage.getEditLog().logClearQuota(src);
+  }
+  
   long totalInodes() {
     synchronized (rootDir) {
-      return totalInodes;
+      return rootDir.numItemsInTree();
     }
   }
 }

+ 47 - 2
src/java/org/apache/hadoop/dfs/FSEditLog.java

@@ -28,8 +28,6 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.DataInput;
-import java.io.DataOutput;
 import java.util.ArrayList;
 import java.lang.Math;
 import java.nio.channels.FileChannel;
@@ -54,6 +52,8 @@ class FSEditLog {
   private static final byte OP_SET_OWNER = 8;
   private static final byte OP_CLOSE = 9;    // close after write
   private static final byte OP_SET_GENSTAMP = 10;    // store genstamp
+  private static final byte OP_SET_QUOTA = 11; // set a directory's quota
+  private static final byte OP_CLEAR_QUOTA = 12; // clear a directory's quota
   private static int sizeFlushBuffer = 512*1024;
 
   private ArrayList<EditLogOutputStream> editStreams = null;
@@ -632,6 +632,23 @@ class FSEditLog {
                 FSImage.readString(in), FSImage.readString(in));
             break;
           }
+          case OP_SET_QUOTA: {
+            if (logVersion > -16) {
+              throw new IOException("Unexpected opcode " + opcode
+                  + " for version " + logVersion);
+            }
+            fsDir.unprotectedSetQuota(FSImage.readString(in), 
+                readLongWritable(in) );
+            break;
+          }
+          case OP_CLEAR_QUOTA: {
+            if (logVersion > -16) {
+              throw new IOException("Unexpected opcode " + opcode
+                  + " for version " + logVersion);
+            }
+            fsDir.unprotectedClearQuota(FSImage.readString(in));
+            break;
+          }
           default: {
             throw new IOException("Never seen opcode " + opcode);
           }
@@ -660,6 +677,17 @@ class FSEditLog {
     return numEdits;
   }
 
+  // a place holder for reading a long
+  private static final LongWritable longWritable = new LongWritable();
+
+  /** Read an integer from an input stream */
+  private static long readLongWritable(DataInputStream in) throws IOException {
+    synchronized (longWritable) {
+      longWritable.readFields(in);
+      return longWritable.get();
+    }
+  }
+  
   static short adjustReplication(short replication) {
     FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
     short minReplication = fsNamesys.getMinReplication();
@@ -874,6 +902,23 @@ class FSEditLog {
             FSEditLog.toLogReplication(replication));
   }
   
+  /** Add set quota record to edit log
+   * 
+   * @param src the string representation of the path to a directory
+   * @param quota the directory size limit
+   */
+  void logSetQuota(String src, long quota) {
+    logEdit(OP_SET_QUOTA, new UTF8(src), new LongWritable(quota));
+  }
+
+  /** Add clear quota record to edit log
+   * 
+   * @param src the string representation of the path to a directory
+   */
+  void logClearQuota(String src) {
+    logEdit(OP_CLEAR_QUOTA, new UTF8(src));
+  }
+  
   /**  Add set permissions record to edit log */
   void logSetPermissions(String src, FsPermission permissions) {
     logEdit(OP_SET_PERMISSIONS, new UTF8(src), permissions);

+ 22 - 7
src/java/org/apache/hadoop/dfs/FSImage.java

@@ -709,8 +709,12 @@ class FSImage extends Storage {
       this.namespaceID = in.readInt();
 
       // read number of files
-      int numFiles = 0;
-      numFiles = in.readInt();
+      long numFiles;
+      if (imgVersion <= -16) {
+        numFiles = in.readLong();
+      } else {
+        numFiles = in.readInt();
+      }
 
       this.layoutVersion = imgVersion;
       // read in the last generation stamp.
@@ -729,7 +733,7 @@ class FSImage extends Storage {
       String path;
       String parentPath = "";
       INodeDirectory parentINode = fsDir.rootDir;
-      for (int i = 0; i < numFiles; i++) {
+      for (long i = 0; i < numFiles; i++) {
         long modificationTime = 0;
         long blockSize = 0;
         path = readString(in);
@@ -769,13 +773,20 @@ class FSImage extends Storage {
             blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
           }
         }
+        
+        // get quota only when the node is a directory
+        long quota = -1L;
+        if (imgVersion <= -16 && blocks == null) {
+          quota = in.readLong();
+        }
+        
         PermissionStatus permissions = fsNamesys.getUpgradePermission();
         if (imgVersion <= -11) {
           permissions = PermissionStatus.read(in);
         }
-        // check if this is a root node
-        if (path.length() == 0) {
+        if (path.length() == 0) { // it is the root
           // update the root's attributes
+          fsDir.rootDir.setQuota(quota);
           fsDir.rootDir.setModificationTime(modificationTime);
           fsDir.rootDir.setPermissionStatus(permissions);
           continue;
@@ -787,7 +798,7 @@ class FSImage extends Storage {
         }
         // add new inode
         parentINode = fsDir.addToParent(path, parentINode, permissions,
-            blocks, replication, modificationTime, blockSize);
+            blocks, replication, modificationTime, quota, blockSize);
       }
       
       // load datanode info
@@ -795,6 +806,9 @@ class FSImage extends Storage {
 
       // load Files Under Construction
       this.loadFilesUnderConstruction(imgVersion, in, fsNamesys);
+      
+      // update the count of each directory with quota
+      fsDir.updateCountForINodeWithQuota();
     } finally {
       in.close();
     }
@@ -847,7 +861,7 @@ class FSImage extends Storage {
     try {
       out.writeInt(FSConstants.LAYOUT_VERSION);
       out.writeInt(namespaceID);
-      out.writeInt(fsDir.rootDir.numItemsInTree());
+      out.writeLong(fsDir.rootDir.numItemsInTree());
       out.writeLong(fsNamesys.getGenerationStamp());
       byte[] byteStore = new byte[4*FSConstants.MAX_PATH_LENGTH];
       ByteBuffer strbuf = ByteBuffer.wrap(byteStore);
@@ -958,6 +972,7 @@ class FSImage extends Storage {
       out.writeLong(node.getModificationTime());
       out.writeLong(0);   // preferred block size
       out.writeInt(-1);    // # of blocks
+      out.writeLong(node.getQuota());
       FILE_PERM.fromShort(node.getFsPermissionShort());
       PermissionStatus.write(out, node.getUserName(),
                              node.getGroupName(),

+ 30 - 0
src/java/org/apache/hadoop/dfs/FSNamesystem.java

@@ -1590,6 +1590,36 @@ class FSNamesystem implements FSConstants, FSNamesystemMBean {
     return dir.getContentSummary(src);
   }
 
+  /**
+   * Set the quota for a directory.
+   * @param path The string representation of the path to the directory
+   * @param quota The limit of the number of names in or below the directory
+   * @throws IOException if the path is not a directory or the number of
+   * existing names in or below the directory is greater than the given quota
+   */
+  void setQuota(String path, long quota) throws IOException {
+    if (isPermissionEnabled) {
+      checkSuperuserPrivilege();
+    }
+    
+    dir.setQuota(path, quota);
+    getEditLog().logSync();
+  }
+  
+  /**
+   * Remove the quota for a directory
+   * @param path The string representation of the path to the directory
+   * @throws IOException if the path is not a directory
+   */
+  void clearQuota(String path) throws IOException {
+    if (isPermissionEnabled) {
+      checkSuperuserPrivilege();
+    }
+    
+    dir.clearQuota(path);
+    getEditLog().logSync();
+  }
+  
   /** Persist all metadata about this file.
    * @param src The string representation of the path
    * @param clientName The string representation of the client

+ 161 - 9
src/java/org/apache/hadoop/dfs/INode.java

@@ -85,6 +85,17 @@ abstract class INode implements Comparable<byte[]> {
     this(permissions, 0L);
     setLocalName(name);
   }
+  
+  /** copy constructor
+   * 
+   * @param other Other node to be copied
+   */
+  INode(INode other) {
+    setLocalName(other.getLocalName());
+    this.parent = other.getParent();
+    setPermissionStatus(other.getPermissionStatus());
+    setModificationTime(other.getModificationTime());
+  }
 
   /**
    * Check whether this is the root inode.
@@ -154,14 +165,31 @@ abstract class INode implements Comparable<byte[]> {
   /** Compute {@link ContentSummary}. */
   final ContentSummary computeContentSummary() {
     long[] a = computeContentSummary(new long[]{0,0,0});
-    return new ContentSummary(a[0], a[1], a[2]);
+    return new ContentSummary(a[0], a[1], a[2], getQuota());
   }
   /**
    * @return an array of three longs. 
    * 0: length, 1: file count, 2: directory count
    */
   abstract long[] computeContentSummary(long[] summary);
+  
+  /**
+   * Get the quota set for this inode
+   * @return the quota if it is set; -1 otherwise
+   */
+  long getQuota() {
+    return -1;
+  }
 
+  /**
+   * Get the total number of names in the tree
+   * rooted at this inode including the root
+   * @return The total number of names in this tree
+   */
+  long numItemsInTree() {
+    return 1;
+  }
+    
   /**
    * Get local file name
    * @return local file name
@@ -361,6 +389,21 @@ class INodeDirectory extends INode {
     this.children = null;
   }
 
+  /** constructor */
+  INodeDirectory(byte[] localName, PermissionStatus permissions, long mTime) {
+    this(permissions, mTime);
+    this.name = localName;
+  }
+  
+  /** copy constructor
+   * 
+   * @param other
+   */
+  INodeDirectory(INodeDirectory other) {
+    super(other);
+    this.children = other.getChildren();
+  }
+  
   /**
    * Check whether it's a directory
    */
@@ -368,14 +411,32 @@ class INodeDirectory extends INode {
     return true;
   }
 
-  void removeChild(INode node) {
+  INode removeChild(INode node) {
     assert children != null;
     int low = Collections.binarySearch(children, node.name);
     if (low >= 0) {
-      children.remove(low);
+      return children.remove(low);
+    } else {
+      return null;
     }
   }
 
+  /** Replace a child that has the same name as newChild by newChild.
+   * 
+   * @param newChild Child node to be added
+   */
+  void replaceChild(INode newChild) {
+    if ( children == null ) {
+      throw new IllegalArgumentException("The directory is empty");
+    }
+    int low = Collections.binarySearch(children, newChild.name);
+    if (low>=0) { // an old child exists so replace by the newChild
+      children.set(low, newChild);
+    } else {
+      throw new IllegalArgumentException("No child exists to be replaced");
+    }
+  }
+  
   INode getChild(String name) {
     return getChildINode(string2Bytes(name));
   }
@@ -584,16 +645,13 @@ class INodeDirectory extends INode {
 
   /**
    */
-  int numItemsInTree() {
-    int total = 1;
+  long numItemsInTree() {
+    long total = 1L;
     if (children == null) {
       return total;
     }
     for (INode child : children) {
-      if(!child.isDirectory())
-        total++;
-      else
-        total += ((INodeDirectory)child).numItemsInTree();
+      total += child.numItemsInTree();
     }
     return total;
   }
@@ -632,6 +690,100 @@ class INodeDirectory extends INode {
   }
 }
 
+/**
+ * Directory INode class that has a quota restriction
+ */
+class INodeDirectoryWithQuota extends INodeDirectory {
+  private long quota;
+  private long count;
+  
+  /** Convert an existing directory inode to one with the given quota
+   * 
+   * @param quota Quota to be assigned to this inode
+   * @param other The other inode from which all other properties are copied
+   */
+  INodeDirectoryWithQuota(long quota, INodeDirectory other)
+  throws QuotaExceededException {
+    super(other);
+    this.count = other.numItemsInTree();
+    setQuota(quota);
+  }
+  
+  /** constructor with no quota verification */
+  INodeDirectoryWithQuota(
+      PermissionStatus permissions, long modificationTime, long quota)
+  {
+    super(permissions, modificationTime);
+    this.quota = quota;
+  }
+  
+  /** constructor with no quota verification */
+  INodeDirectoryWithQuota(String name, PermissionStatus permissions, long quota)
+  {
+    super(name, permissions);
+    this.quota = quota;
+  }
+  
+  /** Get this directory's quota
+   * @return this directory's quota
+   */
+  long getQuota() {
+    return quota;
+  }
+  
+  /** Set this directory's quota
+   * 
+   * @param quota Quota to be set
+   * @throws QuotaExceededException if the given quota is less than 
+   *                                the size of the tree
+   */
+  void setQuota(long quota) throws QuotaExceededException {
+    verifyQuota(quota, this.count);
+    this.quota = quota;
+  }
+  
+  /** Get the number of names in the subtree rooted at this directory
+   * @return the size of the subtree rooted at this directory
+   */
+  long numItemsInTree() {
+    return count;
+  }
+  
+  /** Update the size of the tree
+   * 
+   * @param delta the change of the tree size
+   * @throws QuotaExceededException if the changed size is greater 
+   *                                than the quota
+   */
+  void updateNumItemsInTree(long delta) throws QuotaExceededException {
+    long newCount = this.count + delta;
+    if (delta>0) {
+      verifyQuota(this.quota, newCount);
+    }
+    this.count = newCount;
+  }
+  
+  /** Set the size of the tree rooted at this directory
+   * 
+   * @param count size of the directory to be set
+   * @throws QuotaExceededException if the given count is greater than quota
+   */
+  void setCount(long count) throws QuotaExceededException {
+    verifyQuota(this.quota, count);
+    this.count = count;
+  }
+  
+  /** Verify if the count satisfies the quota restriction 
+   * @throws QuotaExceededException if the given quota is less than the count
+   */
+  private static void verifyQuota(long quota, long count)
+  throws QuotaExceededException {
+    if (quota < count) {
+      throw new QuotaExceededException(quota, count);
+    }
+  }
+}
+
 class INodeFile extends INode {
   static final FsPermission UMASK = FsPermission.createImmutable((short)0111);
 

+ 10 - 0
src/java/org/apache/hadoop/dfs/NameNode.java

@@ -549,6 +549,16 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
     return namesystem.getContentSummary(path);
   }
 
+  /** {@inheritDoc} */
+  public void setQuota(String path, long quota) throws IOException {
+    namesystem.setQuota(path, quota);
+  }
+  
+  /** {@inheritDoc} */
+  public void clearQuota(String path) throws IOException {
+    namesystem.clearQuota(path);
+  }
+  
   /** {@inheritDoc} */
   public void fsync(String src, String clientName) throws IOException {
     namesystem.fsync(src, clientName);

+ 64 - 3
src/java/org/apache/hadoop/fs/ContentSummary.java

@@ -28,6 +28,7 @@ public class ContentSummary implements Writable{
   private long length;
   private long fileCount;
   private long directoryCount;
+  private long quota;
 
   /** Constructor */
   public ContentSummary() {}
@@ -37,6 +38,16 @@ public class ContentSummary implements Writable{
     this.length = length;
     this.fileCount = fileCount;
     this.directoryCount = directoryCount;
+    this.quota = -1L;
+  }
+
+  /** Constructor */
+  public ContentSummary(
+      long length, long fileCount, long directoryCount, long quota) {
+    this.length = length;
+    this.fileCount = fileCount;
+    this.directoryCount = directoryCount;
+    this.quota = quota;
   }
 
   /** @return the length */
@@ -48,11 +59,15 @@ public class ContentSummary implements Writable{
   /** @return the file count */
   public long getFileCount() {return fileCount;}
   
+  /** Return the directory quota */
+  public long getQuota() {return quota;}
+  
   /** {@inheritDoc} */
   public void write(DataOutput out) throws IOException {
     out.writeLong(length);
     out.writeLong(fileCount);
     out.writeLong(directoryCount);
+    out.writeLong(quota);
   }
 
   /** {@inheritDoc} */
@@ -60,6 +75,7 @@ public class ContentSummary implements Writable{
     this.length = in.readLong();
     this.fileCount = in.readLong();
     this.directoryCount = in.readLong();
+    this.quota = in.readLong();
   }
   
   /** 
@@ -68,13 +84,58 @@ public class ContentSummary implements Writable{
    *    DIR_COUNT   FILE_COUNT       CONTENT_SIZE FILE_NAME    
    */
   private static final String STRING_FORMAT = "%12d %12d %18d ";
+  /** 
+   * Output format:
+   * <----12----> <----15----> <----12----> <----12----> <-------18------->
+   *    QUOTA   REMAINING_QUATA  DIR_COUNT   FILE_COUNT        CONTENT_SIZE FILE_NAME    
+   */
+  private static final String QUOTA_STRING_FORMAT = "%12d %15d "+STRING_FORMAT;
+  private static final String NON_QUOTA_STRING_FORMAT =
+    "%12s %15s "+STRING_FORMAT;
 
   /** The header string */
-  static String HEADER = String.format(
+  private static final String HEADER = String.format(
       STRING_FORMAT.replace('d', 's'), "directories", "files", "bytes");
 
+  private static final String QUOTA_HEADER = String.format(
+      QUOTA_STRING_FORMAT.replace('d', 's'), 
+      "quota", "remaining quota", "directories", "files", "bytes");
+  
+  /** Return the header of the output.
+   * if qOption is false, output directory count, file count, and content size;
+   * if qOption is true, output quota and remaining quota as well.
+   * 
+   * @param qOption a flag indicating if quota needs to be printed or not
+   * @return the header of the output
+   */
+  public static String getHeader(boolean qOption) {
+    return qOption ? QUOTA_HEADER : HEADER;
+  }
+  
   /** {@inheritDoc} */
   public String toString() {
-    return String.format(STRING_FORMAT, directoryCount, fileCount, length);
+    return toString(true);
+  }
+
+  /** Return the string representation of the object in the output format.
+   * if qOption is false, output directory count, file count, and content size;
+   * if qOption is true, output quota and remaining quota as well.
+   * 
+   * @param qOption a flag indicating if quota needs to be printed or not
+   * @return the string representation of the object
+   */
+  public String toString(boolean qOption) {
+    if (qOption) {
+      if (quota>0) {
+        long remainingQuota = quota-(directoryCount+fileCount);
+        return String.format(QUOTA_STRING_FORMAT, quota, remainingQuota,
+            directoryCount, fileCount, length);
+      } else {
+        return String.format(NON_QUOTA_STRING_FORMAT, "none", "inf",
+            directoryCount, fileCount, length);
+      }
+    } else {
+      return String.format(STRING_FORMAT, directoryCount, fileCount, length);
+    }
   }
-}
+}

+ 26 - 62
src/java/org/apache/hadoop/fs/FsShell.java

@@ -30,6 +30,7 @@ import java.util.zip.GZIPInputStream;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.shell.CommandFormat;
 import org.apache.hadoop.fs.shell.Count;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -175,8 +176,8 @@ public class FsShell extends Configured implements Tool {
       System.err.println("Usage: java FsShell " + GET_SHORT_USAGE);
       throw iae;
     }
-    final boolean copyCrc = cf.options.get("crc");
-    final boolean verifyChecksum = !cf.options.get("ignoreCrc");
+    final boolean copyCrc = cf.getOpt("crc");
+    final boolean verifyChecksum = !cf.getOpt("ignoreCrc");
 
     if (dststr.equals("-")) {
       if (copyCrc) {
@@ -420,42 +421,6 @@ public class FsShell extends Configured implements Tool {
     }.globAndProcess(srcPattern, srcPattern.getFileSystem(getConf()));
   }
 
-  /**
-   * Parse the args of a command and check the format of args.
-   */
-  static class CommandFormat {
-    final String name;
-    final int minPar, maxPar;
-    final Map<String, Boolean> options = new HashMap<String, Boolean>();
-
-    CommandFormat(String n, int min, int max, String ... possibleOpt) {
-      name = n;
-      minPar = min;
-      maxPar = max;
-      for(String opt : possibleOpt)
-        options.put(opt, Boolean.FALSE);
-    }
-
-    List<String> parse(String[] args, int pos) {
-      List<String> parameters = new ArrayList<String>();
-      for(; pos < args.length; pos++) {
-        if (args[pos].charAt(0) == '-' && args[pos].length() > 1) {
-          String opt = args[pos].substring(1);
-          if (options.containsKey(opt))
-            options.put(opt, Boolean.TRUE);
-          else
-            throw new IllegalArgumentException("Illegal option " + args[pos]);
-        }
-        else
-          parameters.add(args[pos]);
-      }
-      int psize = parameters.size();
-      if (psize < minPar || psize > maxPar)
-        throw new IllegalArgumentException("Illegal number of arguments");
-      return parameters;
-    }
-  }
-
   /**
    * Parse the incoming command string
    * @param cmd
@@ -485,8 +450,8 @@ public class FsShell extends Configured implements Tool {
       throw new IllegalArgumentException("replication must be >= 1");
     }
 
-    List<Path> waitList = c.options.get("w")? new ArrayList<Path>(): null;
-    setReplication(rep, dst, c.options.get("R"), waitList);
+    List<Path> waitList = c.getOpt("w")? new ArrayList<Path>(): null;
+    setReplication(rep, dst, c.getOpt("R"), waitList);
 
     if (waitList != null) {
       waitForReplication(waitList, rep);
@@ -1117,7 +1082,7 @@ public class FsShell extends Configured implements Tool {
       System.err.println("Usage: java FsShell " + TAIL_USAGE);
       throw iae;
     }
-    boolean foption = c.options.get("f") ? true: false;
+    boolean foption = c.getOpt("f") ? true: false;
     path = new Path(src);
     FileSystem srcFs = path.getFileSystem(getConf());
     if (srcFs.isDirectory(path)) {
@@ -1493,7 +1458,7 @@ public class FsShell extends Configured implements Tool {
       System.out.println(chown);
     } else if ("chgrp".equals(cmd)) {
       System.out.println(chgrp);
-    } else if (Count.NAME.equals(cmd)) {
+    } else if (Count.matches(cmd)) {
       System.out.println(Count.DESCRIPTION);
     } else if ("help".equals(cmd)) {
       System.out.println(help);
@@ -1532,8 +1497,7 @@ public class FsShell extends Configured implements Tool {
    * Apply operation specified by 'cmd' on all parameters
    * starting from argv[startindex].
    */
-  private int doall(String cmd, String argv[], Configuration conf, 
-                    int startindex) {
+  private int doall(String cmd, String argv[], int startindex) {
     int exitCode = 0;
     int i = startindex;
     //
@@ -1557,7 +1521,7 @@ public class FsShell extends Configured implements Tool {
         } else if ("-dus".equals(cmd)) {
           dus(argv[i]);
         } else if (Count.matches(cmd)) {
-          Count.count(argv[i], getConf(), System.out);
+          new Count(argv, i, fs).runAll();
         } else if ("-ls".equals(cmd)) {
           ls(argv[i], false);
         } else if ("-lsr".equals(cmd)) {
@@ -1602,7 +1566,7 @@ public class FsShell extends Configured implements Tool {
    * Displays format of commands.
    * 
    */
-  void printUsage(String cmd) {
+  private static void printUsage(String cmd) {
     String prefix = "Usage: java " + FsShell.class.getSimpleName();
     if ("-fs".equals(cmd)) {
       System.err.println("Usage: java FsShell" + 
@@ -1761,9 +1725,9 @@ public class FsShell extends Configured implements Tool {
         else
           copyMergeToLocal(argv[i++], new Path(argv[i++]));
       } else if ("-cat".equals(cmd)) {
-        exitCode = doall(cmd, argv, getConf(), i);
+        exitCode = doall(cmd, argv, i);
       } else if ("-text".equals(cmd)) {
-        exitCode = doall(cmd, argv, getConf(), i);
+        exitCode = doall(cmd, argv, i);
       } else if ("-moveToLocal".equals(cmd)) {
         moveToLocal(argv[i++], new Path(argv[i++]));
       } else if ("-setrep".equals(cmd)) {
@@ -1774,13 +1738,13 @@ public class FsShell extends Configured implements Tool {
         FsShellPermissions.changePermissions(fs, cmd, argv, i, this);
       } else if ("-ls".equals(cmd)) {
         if (i < argv.length) {
-          exitCode = doall(cmd, argv, getConf(), i);
+          exitCode = doall(cmd, argv, i);
         } else {
           ls(Path.CUR_DIR, false);
         } 
       } else if ("-lsr".equals(cmd)) {
         if (i < argv.length) {
-          exitCode = doall(cmd, argv, getConf(), i);
+          exitCode = doall(cmd, argv, i);
         } else {
           ls(Path.CUR_DIR, true);
         } 
@@ -1789,33 +1753,29 @@ public class FsShell extends Configured implements Tool {
       } else if ("-cp".equals(cmd)) {
         exitCode = copy(argv, getConf());
       } else if ("-rm".equals(cmd)) {
-        exitCode = doall(cmd, argv, getConf(), i);
+        exitCode = doall(cmd, argv, i);
       } else if ("-rmr".equals(cmd)) {
-        exitCode = doall(cmd, argv, getConf(), i);
+        exitCode = doall(cmd, argv, i);
       } else if ("-expunge".equals(cmd)) {
         expunge();
       } else if ("-du".equals(cmd)) {
         if (i < argv.length) {
-          exitCode = doall(cmd, argv, getConf(), i);
+          exitCode = doall(cmd, argv, i);
         } else {
           du(".");
         }
       } else if ("-dus".equals(cmd)) {
         if (i < argv.length) {
-          exitCode = doall(cmd, argv, getConf(), i);
+          exitCode = doall(cmd, argv, i);
         } else {
           dus(".");
         }         
       } else if (Count.matches(cmd)) {
-        if (i < argv.length) {
-          exitCode = doall(cmd, argv, getConf(), i);
-        } else {
-          Count.count(".", getConf(), System.out);
-        }         
+        exitCode = new Count(argv, i, fs).runAll();
       } else if ("-mkdir".equals(cmd)) {
-        exitCode = doall(cmd, argv, getConf(), i);
+        exitCode = doall(cmd, argv, i);
       } else if ("-touchz".equals(cmd)) {
-        exitCode = doall(cmd, argv, getConf(), i);
+        exitCode = doall(cmd, argv, i);
       } else if ("-test".equals(cmd)) {
         exitCode = test(argv, i);
       } else if ("-stat".equals(cmd)) {
@@ -1837,6 +1797,10 @@ public class FsShell extends Configured implements Tool {
         System.err.println(cmd.substring(1) + ": Unknown command");
         printUsage("");
       }
+    } catch (IllegalArgumentException arge) {
+      exitCode = -1;
+      System.err.println(cmd.substring(1) + ": " + arge.getLocalizedMessage());
+      printUsage(cmd);
     } catch (RemoteException e) {
       //
       // This is a error returned by hadoop server. Print
@@ -1858,7 +1822,7 @@ public class FsShell extends Configured implements Tool {
       exitCode = -1;
       System.err.println(cmd.substring(1) + ": " + 
                          e.getLocalizedMessage());  
-    } catch (RuntimeException re) {
+    } catch (Exception re) {
       exitCode = -1;
       System.err.println(cmd.substring(1) + ": " + re.getLocalizedMessage());  
     } finally {

+ 84 - 0
src/java/org/apache/hadoop/fs/shell/Command.java

@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.shell;
+
+import java.io.*;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RemoteException;
+
+/**
+ * An abstract class for the execution of a file system command
+ */
+abstract public class Command {
+  final protected FileSystem fs;
+  protected String[] args;
+  
+  /** Constructor */
+  protected Command(FileSystem fs) {
+    this.fs = fs;
+  }
+  
+  /** Return the command's name excluding the leading character - */
+  abstract public String getCommandName();
+  
+  /** 
+   * Execute the command on the input path
+   * 
+   * @param path the input path
+   * @throws IOException if any error occurs
+   */
+  abstract protected void run(Path path) throws IOException;
+  
+  /** 
+   * For each source path, execute the command
+   * 
+   * @return 0 if it runs successfully; -1 if it fails
+   */
+  public int runAll() {
+    int exitCode = 0;
+    for (String src : args) {
+      try {
+        Path srcPath = new Path(src);
+        FileStatus[] statuses = fs.globStatus(srcPath);
+        if (statuses == null) {
+          System.err.println("Can not find listing for " + src);
+          exitCode = -1;
+        } else {
+          for(FileStatus s : statuses) {
+            run(s.getPath());
+          }
+        }
+      } catch (RemoteException re) {
+        exitCode = -1;
+        String content = re.getLocalizedMessage();
+        int eol = content.indexOf('\n');
+        if (eol>=0) {
+          content = content.substring(0, eol);
+        }
+        System.err.println(getCommandName() + ": " + content);
+      } catch (IOException e) {
+        exitCode = -1;
+        System.err.println(getCommandName() + ": " + e.getLocalizedMessage());
+      }
+    }
+    return exitCode;
+  }
+}

+ 75 - 0
src/java/org/apache/hadoop/fs/shell/CommandFormat.java

@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.shell;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Parse the args of a command and check the format of args.
+ */
+public class CommandFormat {
+  final String name;
+  final int minPar, maxPar;
+  final Map<String, Boolean> options = new HashMap<String, Boolean>();
+
+  /** constructor */
+  public CommandFormat(String n, int min, int max, String ... possibleOpt) {
+    name = n;
+    minPar = min;
+    maxPar = max;
+    for(String opt : possibleOpt)
+      options.put(opt, Boolean.FALSE);
+  }
+
+  /** Parse parameters starting from the given position
+   * 
+   * @param args an array of input arguments
+   * @param pos the position at which starts to parse
+   * @return a list of parameters
+   */
+  public List<String> parse(String[] args, int pos) {
+    List<String> parameters = new ArrayList<String>();
+    for(; pos < args.length; pos++) {
+      if (args[pos].charAt(0) == '-' && args[pos].length() > 1) {
+        String opt = args[pos].substring(1);
+        if (options.containsKey(opt))
+          options.put(opt, Boolean.TRUE);
+        else
+          throw new IllegalArgumentException("Illegal option " + args[pos]);
+      }
+      else
+        parameters.add(args[pos]);
+    }
+    int psize = parameters.size();
+    if (psize < minPar || psize > maxPar)
+      throw new IllegalArgumentException("Illegal number of arguments");
+    return parameters;
+  }
+  
+  /** Return if the option is set or not
+   * 
+   * @param option String representation of an option
+   * @return true is the option is set; false otherwise
+   */
+  public boolean getOpt(String option) {
+    return options.get(option);
+  }
+}

+ 39 - 21
src/java/org/apache/hadoop/fs/shell/Count.java

@@ -18,40 +18,58 @@
 package org.apache.hadoop.fs.shell;
 
 import java.io.*;
+import java.util.List;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 /**
- * Count the number of directories, files and bytes.
+ * Count the number of directories, files, bytes, quota, and remaining quota.
  */
-public class Count {
+public class Count extends Command {
   public static final String NAME = "count";
-  public static final String USAGE = "-" + NAME + " <path>";
+  public static final String USAGE = "-" + NAME + "[-q] <path>";
   public static final String DESCRIPTION = CommandUtils.formatDescription(USAGE, 
       "Count the number of directories, files and bytes under the paths",
       "that match the specified file pattern.  The output columns are:",
-      "DIR_COUNT FILE_COUNT CONTENT_SIZE FILE_NAME");
+      "DIR_COUNT FILE_COUNT CONTENT_SIZE FILE_NAME or",
+      "QUOTA REMAINING_QUATA DIR_COUNT FILE_COUNT CONTENT_SIZE FILE_NAME");
+  
+  private boolean qOption;
 
+  /** Constructor
+   * 
+   * @param cmd the count command
+   * @param pos the starting index of the arguments 
+   * @param fs the file system handler
+   */
+  public Count(String[] cmd, int pos, FileSystem fs) {
+    super(fs);
+    CommandFormat c = new CommandFormat(NAME, 1, Integer.MAX_VALUE, "q");
+    List<String> parameters = c.parse(cmd, pos);
+    this.args = parameters.toArray(new String[parameters.size()]);
+    if (this.args.length == 0) { // default path is the current working directory
+      this.args = new String[] {"."};
+    }
+    this.qOption = c.getOpt("q") ? true: false;
+  }
+  
+  /** Check if a command is the count command
+   * 
+   * @param cmd A string representation of a command starting with "-"
+   * @return true if this is a count command; false otherwise
+   */
   public static boolean matches(String cmd) {
     return ("-" + NAME).equals(cmd); 
   }
 
-  public static void count(String src, Configuration conf, PrintStream out
-      ) throws IOException {
-    Path srcPath = new Path(src);
-    FileSystem srcFs = srcPath.getFileSystem(conf);
-    FileStatus[] statuses = srcFs.globStatus(srcPath);
-    if (statuses == null || statuses.length == 0) {
-      throw new FileNotFoundException(src + " not found.");
-    }
-    for(FileStatus s : statuses) {
-      Path p = s.getPath();
-      String pathstr = p.toString();
-      out.println(srcFs.getContentSummary(p)
-          + ("".equals(pathstr)? ".": pathstr));
-    }
+  @Override
+  public String getCommandName() {
+    return NAME;
+  }
+
+  @Override
+  protected void run(Path path) throws IOException {
+    System.out.println(fs.getContentSummary(path).toString(qOption) + path);
   }
-}
+}

+ 6 - 1
src/java/org/apache/hadoop/ipc/RemoteException.java

@@ -82,7 +82,12 @@ public class RemoteException extends IOException {
       throws Exception {
     Constructor<? extends IOException> cn = cls.getConstructor(String.class);
     cn.setAccessible(true);
-    IOException ex = cn.newInstance(this.getMessage());
+    String firstLine = this.getMessage();
+    int eol = firstLine.indexOf('\n');
+    if (eol>=0) {
+      firstLine = firstLine.substring(0, eol);
+    }
+    IOException ex = cn.newInstance(firstLine);
     ex.initCause(this);
     return ex;
   }

+ 22 - 14
src/test/org/apache/hadoop/dfs/TestDFSShell.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.shell.*;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ToolRunner;
@@ -669,10 +670,10 @@ public class TestDFSShell extends TestCase {
       String root = createTree(dfs, "count");
 
       // Verify the counts
-      runCount(root, 2, 4, conf);
-      runCount(root + "2", 2, 1, conf);
-      runCount(root + "2/f1", 0, 1, conf);
-      runCount(root + "2/sub", 1, 0, conf);
+      runCount(root, 2, 4, dfs);
+      runCount(root + "2", 2, 1, dfs);
+      runCount(root + "2/f1", 0, 1, dfs);
+      runCount(root + "2/sub", 1, 0, dfs);
     } finally {
       try {
         dfs.close();
@@ -681,18 +682,25 @@ public class TestDFSShell extends TestCase {
       cluster.shutdown();
     }
   }
-  private void runCount(String path, long dirs, long files, Configuration conf
-      ) throws IOException {
+  private void runCount(String path, long dirs, long files, FileSystem fs
+    ) throws IOException {
     ByteArrayOutputStream bytes = new ByteArrayOutputStream(); 
     PrintStream out = new PrintStream(bytes);
-    Count.count(path, conf, out);
-    String results = bytes.toString();
-    System.out.println(results);
-    Scanner in = new Scanner(results);
-    assertEquals(dirs, in.nextLong());
-    assertEquals(files, in.nextLong());
-    in.close();
-    out.close();
+    PrintStream oldOut = System.out;
+    System.setOut(out);
+    Scanner in = null;
+    try {
+      new Count(new String[]{path}, 0, fs).runAll();
+      String results = bytes.toString();
+      System.out.println(results);
+      in = new Scanner(results);
+      assertEquals(dirs, in.nextLong());
+      assertEquals(files, in.nextLong());
+    } finally {
+      if (in!=null) in.close();
+      IOUtils.closeStream(out);
+      System.setOut(oldOut);
+    }
   }
 
   //throws IOException instead of Exception as shell.run() does.

+ 333 - 0
src/test/org/apache/hadoop/dfs/TestQuota.java

@@ -0,0 +1,333 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+
+import junit.framework.TestCase;
+
+/** A class for testing quota-related commands */
+public class TestQuota extends TestCase {
+  private void runCommand(DFSAdmin admin, String args[], boolean expectEror)
+  throws Exception {
+    int val = admin.run(args);
+    if (expectEror) {
+      assertEquals(val, -1);
+    } else {
+      assertTrue(val>=0);
+    }
+  }
+  
+  /** Test quota related commands: setQuota, clrQuota, and count */
+  public void testQuotaCommands() throws Exception {
+    final Configuration conf = new Configuration();
+    final MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+    final FileSystem fs = cluster.getFileSystem();
+    assertTrue("Not a HDFS: "+fs.getUri(),
+                fs instanceof DistributedFileSystem);
+    final DistributedFileSystem dfs = (DistributedFileSystem)fs;
+    DFSAdmin admin = new DFSAdmin(conf);
+    
+    try {
+      // 1: create a directory /test and set its quota to be 3
+      final Path parent = new Path("/test");
+      assertTrue(dfs.mkdirs(parent));
+      String[] args = new String[]{"-setQuota", "3", parent.toString()};
+      runCommand(admin, args, false);
+      
+      // 2: create directory /test/data0
+      final Path childDir0 = new Path(parent, "data0");
+      assertTrue(dfs.mkdirs(childDir0));
+
+      // 3: create a file /test/datafile0
+      final Path childFile0 = new Path(parent, "datafile0");
+      OutputStream fout = dfs.create(childFile0);
+      fout.close();
+      
+      // 4: count -q /test
+      ContentSummary c = dfs.getContentSummary(parent);
+      assertEquals(c.getFileCount()+c.getDirectoryCount(), 3);
+      assertEquals(c.getQuota(), 3);
+      
+      // 5: count -q /test/data0
+      c = dfs.getContentSummary(childDir0);
+      assertEquals(c.getFileCount()+c.getDirectoryCount(), 1);
+      assertEquals(c.getQuota(), -1);
+
+      // 6: create a directory /test/data1
+      final Path childDir1 = new Path(parent, "data1");
+      boolean hasException = false;
+      try {
+        assertFalse(dfs.mkdirs(childDir1));
+      } catch (QuotaExceededException e) {
+        hasException = true;
+      }
+      assertTrue(hasException);
+      
+      // 7: create a file /test/datafile1
+      final Path childFile1 = new Path(parent, "datafile1");
+      hasException = false;
+      try {
+        fout = dfs.create(childFile1);
+      } catch (QuotaExceededException e) {
+        hasException = true;
+      }
+      assertTrue(hasException);
+      
+      // 8: clear quota /test
+      runCommand(admin, new String[]{"-clrQuota", parent.toString()}, false);
+      c = dfs.getContentSummary(parent);
+      assertEquals(c.getQuota(), -1);
+      
+      // 9: clear quota /test/data0
+      runCommand(admin, new String[]{"-clrQuota", childDir0.toString()}, false);
+      c = dfs.getContentSummary(childDir0);
+      assertEquals(c.getQuota(), -1);
+      
+      // 10: create a file /test/datafile1
+      fout = dfs.create(childFile1);
+      fout.close();
+      
+      // 11: set the quota of /test to be 1
+      args = new String[]{"-setQuota", "1", parent.toString()};
+      runCommand(admin, args, true);
+      
+      // 12: set the quota of /test/data0 to be 1
+      args = new String[]{"-setQuota", "1", childDir0.toString()};
+      runCommand(admin, args, false);
+      
+      // 13: not able create a directory under data0
+      hasException = false;
+      try {
+        assertFalse(dfs.mkdirs(new Path(childDir0, "in")));
+      } catch (QuotaExceededException e) {
+        hasException = true;
+      }
+      assertTrue(hasException);
+      c = dfs.getContentSummary(childDir0);
+      assertEquals(c.getDirectoryCount()+c.getFileCount(), 1);
+      assertEquals(c.getQuota(), 1);
+      
+      // 14a: set quota on a non-existent directory
+      Path nonExistentPath = new Path("/test1");
+      assertFalse(dfs.exists(nonExistentPath));
+      args = new String[]{"-setQuota", "1", nonExistentPath.toString()};
+      runCommand(admin, args, true);
+      
+      // 14b: set quota on a file
+      assertTrue(dfs.isFile(childFile0));
+      args[1] = childFile0.toString();
+      runCommand(admin, args, true);
+      
+      // 15a: clear quota on a file
+      args[0] = "-clrQuota";
+      runCommand(admin, args, true);
+      
+      // 15b: clear quota on a non-existent directory
+      args[1] = nonExistentPath.toString();
+      runCommand(admin, args, true);
+
+      // 16a: set the quota of /test to be 0
+      args = new String[]{"-setQuota", "0", parent.toString()};
+      runCommand(admin, args, true);
+      
+      // 16b: set the quota of /test to be -1
+      args[1] = "-1";
+      runCommand(admin, args, true);
+      
+      // 16c: set the quota of /test to be Long.MAX_VALUE+1
+      args[1] = String.valueOf(Long.MAX_VALUE+1L);
+      runCommand(admin, args, true);
+      
+      // 16d: set the quota of /test to be a non integer
+      args[1] = "33aa1.5";
+      runCommand(admin, args, true);
+      
+      // 17:  setQuota by a non-administrator
+      UnixUserGroupInformation.saveToConf(conf, 
+          UnixUserGroupInformation.UGI_PROPERTY_NAME, 
+          new UnixUserGroupInformation(new String[]{"userxx\n", "groupyy\n"}));
+      DFSAdmin userAdmin = new DFSAdmin(conf);
+      args[1] = "100";
+      runCommand(userAdmin, args, true);
+      
+      // 18: clrQuota by a non-administrator
+      args = new String[] {"-clrQuota", parent.toString()};
+      runCommand(userAdmin, args, true);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+  
+  /** Test commands that change the size of the name space:
+   *  mkdirs, rename, and delete */
+  public void testNamespaceCommands() throws Exception {
+    final Configuration conf = new Configuration();
+    final MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+    final FileSystem fs = cluster.getFileSystem();
+    assertTrue("Not a HDFS: "+fs.getUri(),
+                fs instanceof DistributedFileSystem);
+    final DistributedFileSystem dfs = (DistributedFileSystem)fs;
+    
+    try {
+      // 1: create directory /nqdir0/qdir1/qdir20/nqdir30
+      assertTrue(dfs.mkdirs(new Path("/nqdir0/qdir1/qdir20/nqdir30")));
+
+      // 2: set the quota of /nqdir0/qdir1 to be 6
+      final Path quotaDir1 = new Path("/nqdir0/qdir1");
+      dfs.setQuota(quotaDir1, 6);
+      ContentSummary c = dfs.getContentSummary(quotaDir1);
+      assertEquals(c.getDirectoryCount(), 3);
+      assertEquals(c.getQuota(), 6);
+
+      // 3: set the quota of /nqdir0/qdir1/qdir20 to be 7
+      final Path quotaDir2 = new Path("/nqdir0/qdir1/qdir20");
+      dfs.setQuota(quotaDir2, 7);
+      c = dfs.getContentSummary(quotaDir2);
+      assertEquals(c.getDirectoryCount(), 2);
+      assertEquals(c.getQuota(), 7);
+
+      // 4: Create directory /nqdir0/qdir1/qdir21 and set its quota to 2
+      final Path quotaDir3 = new Path("/nqdir0/qdir1/qdir21");
+      assertTrue(dfs.mkdirs(quotaDir3));
+      dfs.setQuota(quotaDir3, 2);
+      c = dfs.getContentSummary(quotaDir3);
+      assertEquals(c.getDirectoryCount(), 1);
+      assertEquals(c.getQuota(), 2);
+
+      // 5: Create directory /nqdir0/qdir1/qdir21/nqdir32
+      Path tempPath = new Path(quotaDir3, "nqdir32");
+      assertTrue(dfs.mkdirs(tempPath));
+      c = dfs.getContentSummary(quotaDir3);
+      assertEquals(c.getDirectoryCount(), 2);
+      assertEquals(c.getQuota(), 2);
+
+      // 6: Create directory /nqdir0/qdir1/qdir21/nqdir33
+      tempPath = new Path(quotaDir3, "nqdir33");
+      boolean hasException = false;
+      try {
+        assertFalse(dfs.mkdirs(tempPath));
+      } catch (QuotaExceededException e) {
+        hasException = true;
+      }
+      assertTrue(hasException);
+      c = dfs.getContentSummary(quotaDir3);
+      assertEquals(c.getDirectoryCount(), 2);
+      assertEquals(c.getQuota(), 2);
+
+      // 7: Create directory /nqdir0/qdir1/qdir20/nqdir31
+      tempPath = new Path(quotaDir2, "nqdir31");
+      assertTrue(dfs.mkdirs(tempPath));
+      c = dfs.getContentSummary(quotaDir2);
+      assertEquals(c.getDirectoryCount(), 3);
+      assertEquals(c.getQuota(), 7);
+      c = dfs.getContentSummary(quotaDir1);
+      assertEquals(c.getDirectoryCount(), 6);
+      assertEquals(c.getQuota(), 6);
+
+      // 8: Create directory /nqdir0/qdir1/qdir20/nqdir33
+      tempPath = new Path(quotaDir2, "nqdir33");
+      hasException = false;
+      try {
+        assertFalse(dfs.mkdirs(tempPath));
+      } catch (QuotaExceededException e) {
+        hasException = true;
+      }
+      assertTrue(hasException);
+
+      // 9: Move /nqdir0/qdir1/qdir21/nqdir32 /nqdir0/qdir1/qdir20/nqdir30
+      tempPath = new Path(quotaDir2, "nqdir30");
+      dfs.rename(new Path(quotaDir3, "nqdir32"), tempPath);
+      c = dfs.getContentSummary(quotaDir2);
+      assertEquals(c.getDirectoryCount(), 4);
+      assertEquals(c.getQuota(), 7);
+      c = dfs.getContentSummary(quotaDir1);
+      assertEquals(c.getDirectoryCount(), 6);
+      assertEquals(c.getQuota(), 6);
+
+      // 10: Move /nqdir0/qdir1/qdir21/nqdir30 to /nqdir0/qdir1/qdir21
+      hasException = false;
+      try {
+        assertFalse(dfs.rename(tempPath, quotaDir3));
+      } catch (QuotaExceededException e) {
+        hasException = true;
+      }
+      assertTrue(hasException);
+
+      // 11: Move /nqdir0/qdir1/qdir20/nqdir30 to /nqdir0
+      assertTrue(dfs.rename(tempPath, new Path("/nqdir0")));
+      c = dfs.getContentSummary(quotaDir2);
+      assertEquals(c.getDirectoryCount(), 2);
+      assertEquals(c.getQuota(), 7);
+      c = dfs.getContentSummary(quotaDir1);
+      assertEquals(c.getDirectoryCount(), 4);
+      assertEquals(c.getQuota(), 6);
+
+      // 12: Create directory /nqdir0/nqdir30/nqdir33
+      assertTrue(dfs.mkdirs(new Path("/nqdir0/nqdir30/nqdir33")));
+
+      // 13: Move /nqdir0/nqdir30 /nqdir0/qdir1/qdir20/qdir30
+      hasException = false;
+      try {
+        assertFalse(dfs.rename(new Path("/nqdir0/nqdir30"), tempPath));
+      } catch (QuotaExceededException e) {
+        hasException = true;
+      }
+      assertTrue(hasException);
+
+      // 14: Move /nqdir0/qdir1/qdir21 /nqdir0/qdir1/qdir20
+      assertTrue(dfs.rename(quotaDir3, quotaDir2));
+      c = dfs.getContentSummary(quotaDir1);
+      assertEquals(c.getDirectoryCount(), 4);
+      assertEquals(c.getQuota(), 6);
+      c = dfs.getContentSummary(quotaDir2);
+      assertEquals(c.getDirectoryCount(), 3);
+      assertEquals(c.getQuota(), 7);
+      tempPath = new Path(quotaDir2, "qdir21");
+      c = dfs.getContentSummary(tempPath);
+      assertEquals(c.getDirectoryCount(), 1);
+      assertEquals(c.getQuota(), 2);
+
+      // 15: Delete /nqdir0/qdir1/qdir20/qdir21
+      dfs.delete(tempPath, true);
+      c = dfs.getContentSummary(quotaDir2);
+      assertEquals(c.getDirectoryCount(), 2);
+      assertEquals(c.getQuota(), 7);
+      c = dfs.getContentSummary(quotaDir1);
+      assertEquals(c.getDirectoryCount(), 3);
+      assertEquals(c.getQuota(), 6);
+
+      // 16: Move /nqdir0/qdir30 /nqdir0/qdir1/qdir20
+      assertTrue(dfs.rename(new Path("/nqdir0/nqdir30"), quotaDir2));
+      c = dfs.getContentSummary(quotaDir2);
+      assertEquals(c.getDirectoryCount(), 5);
+      assertEquals(c.getQuota(), 7);
+      c = dfs.getContentSummary(quotaDir1);
+      assertEquals(c.getDirectoryCount(), 6);
+      assertEquals(c.getQuota(), 6);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}

+ 1 - 1
src/test/org/apache/hadoop/fs/TestFileSystem.java

@@ -32,7 +32,7 @@ import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.dfs.MiniDFSCluster;
-import org.apache.hadoop.fs.FsShell.CommandFormat;
+import org.apache.hadoop.fs.shell.CommandFormat;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.UTF8;