Browse Source

HADOOP-3938. Disk space quotas for HDFS. This is similar to namespace
quotas in 0.18. (rangadi)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@697284 13f79535-47bb-0310-9956-ffa450edef68

Raghu Angadi 16 years ago
parent
commit
14ce6379e8

+ 3 - 0
CHANGES.txt

@@ -75,6 +75,9 @@ Trunk (unreleased changes)
     DFS Used%: DFS used space/Present Capacity
     (Suresh Srinivas via hairong)
 
+    HADOOP-3938. Disk space quotas for HDFS. This is similar to namespace
+    quotas in 0.18. (rangadi)
+
   NEW FEATURES
 
     HADOOP-3341. Allow streaming jobs to specify the field separator for map

+ 44 - 21
src/core/org/apache/hadoop/fs/ContentSummary.java

@@ -29,25 +29,28 @@ public class ContentSummary implements Writable{
   private long fileCount;
   private long directoryCount;
   private long quota;
+  private long spaceConsumed;
+  private long spaceQuota;
+  
 
   /** Constructor */
   public ContentSummary() {}
   
   /** Constructor */
   public ContentSummary(long length, long fileCount, long directoryCount) {
-    this.length = length;
-    this.fileCount = fileCount;
-    this.directoryCount = directoryCount;
-    this.quota = -1L;
+    this(length, fileCount, directoryCount, -1L, length, -1L);
   }
 
   /** Constructor */
   public ContentSummary(
-      long length, long fileCount, long directoryCount, long quota) {
+      long length, long fileCount, long directoryCount, long quota,
+      long spaceConsumed, long spaceQuota) {
     this.length = length;
     this.fileCount = fileCount;
     this.directoryCount = directoryCount;
     this.quota = quota;
+    this.spaceConsumed = spaceConsumed;
+    this.spaceQuota = spaceQuota;
   }
 
   /** @return the length */
@@ -62,12 +65,20 @@ public class ContentSummary implements Writable{
   /** Return the directory quota */
   public long getQuota() {return quota;}
   
+  /** Retuns (disk) space consumed */ 
+  public long getSpaceConsumed() {return spaceConsumed;}
+
+  /** Returns (disk) space quota */
+  public long getSpaceQuota() {return spaceQuota;}
+  
   /** {@inheritDoc} */
   public void write(DataOutput out) throws IOException {
     out.writeLong(length);
     out.writeLong(fileCount);
     out.writeLong(directoryCount);
     out.writeLong(quota);
+    out.writeLong(spaceConsumed);
+    out.writeLong(spaceQuota);
   }
 
   /** {@inheritDoc} */
@@ -76,6 +87,8 @@ public class ContentSummary implements Writable{
     this.fileCount = in.readLong();
     this.directoryCount = in.readLong();
     this.quota = in.readLong();
+    this.spaceConsumed = in.readLong();
+    this.spaceQuota = in.readLong();
   }
   
   /** 
@@ -86,20 +99,20 @@ public class ContentSummary implements Writable{
   private static final String STRING_FORMAT = "%12d %12d %18d ";
   /** 
    * Output format:
-   * <----12----> <----15----> <----12----> <----12----> <-------18------->
-   *    QUOTA   REMAINING_QUATA  DIR_COUNT   FILE_COUNT        CONTENT_SIZE FILE_NAME    
+   * <----12----> <----15----> <----15----> <----15----> <----12----> <----12----> <-------18------->
+   *    QUOTA   REMAINING_QUATA SPACE_QUOTA SPACE_QUOTA_REM DIR_COUNT   FILE_COUNT   CONTENT_SIZE     FILE_NAME    
    */
-  private static final String QUOTA_STRING_FORMAT = "%12d %15d "+STRING_FORMAT;
-  private static final String NON_QUOTA_STRING_FORMAT =
-    "%12s %15s "+STRING_FORMAT;
-
+  private static final String QUOTA_STRING_FORMAT = "%12s %15s ";
+  private static final String SPACE_QUOTA_STRING_FORMAT = "%15s %15s ";
+  
   /** The header string */
   private static final String HEADER = String.format(
       STRING_FORMAT.replace('d', 's'), "directories", "files", "bytes");
 
   private static final String QUOTA_HEADER = String.format(
-      QUOTA_STRING_FORMAT.replace('d', 's'), 
-      "quota", "remaining quota", "directories", "files", "bytes");
+      QUOTA_STRING_FORMAT + SPACE_QUOTA_STRING_FORMAT, 
+      "quota", "remaining quota", "space quota", "reamaining quota") +
+      HEADER;
   
   /** Return the header of the output.
    * if qOption is false, output directory count, file count, and content size;
@@ -125,17 +138,27 @@ public class ContentSummary implements Writable{
    * @return the string representation of the object
    */
   public String toString(boolean qOption) {
+    String prefix = "";
     if (qOption) {
+      String quotaStr = "none";
+      String quotaRem = "inf";
+      String spaceQuotaStr = "none";
+      String spaceQuotaRem = "inf";
+      
       if (quota>0) {
-        long remainingQuota = quota-(directoryCount+fileCount);
-        return String.format(QUOTA_STRING_FORMAT, quota, remainingQuota,
-            directoryCount, fileCount, length);
-      } else {
-        return String.format(NON_QUOTA_STRING_FORMAT, "none", "inf",
-            directoryCount, fileCount, length);
+        quotaStr = Long.toString(quota);
+        quotaRem = Long.toString(quota-(directoryCount+fileCount));
+      }
+      if (spaceQuota>0) {
+        spaceQuotaStr = Long.toString(spaceQuota);
+        spaceQuotaRem = Long.toString(spaceQuota - spaceConsumed);        
       }
-    } else {
-      return String.format(STRING_FORMAT, directoryCount, fileCount, length);
+      
+      prefix = String.format(QUOTA_STRING_FORMAT + SPACE_QUOTA_STRING_FORMAT, 
+                             quotaStr, quotaRem, spaceQuotaStr, spaceQuotaRem);
     }
+    
+    return prefix + String.format(STRING_FORMAT, directoryCount, 
+                                  fileCount, length);
   }
 }

+ 2 - 1
src/core/org/apache/hadoop/fs/shell/Count.java

@@ -34,7 +34,8 @@ public class Count extends Command {
       "Count the number of directories, files and bytes under the paths",
       "that match the specified file pattern.  The output columns are:",
       "DIR_COUNT FILE_COUNT CONTENT_SIZE FILE_NAME or",
-      "QUOTA REMAINING_QUATA DIR_COUNT FILE_COUNT CONTENT_SIZE FILE_NAME");
+      "QUOTA REMAINING_QUATA SPACE_QUOTA REMAINING_SPACE_QUOTA ",
+      "      DIR_COUNT FILE_COUNT CONTENT_SIZE FILE_NAME");
   
   private boolean qOption;
 

+ 96 - 75
src/docs/src/documentation/content/xdocs/hdfs_quota_admin_guide.xml

@@ -1,84 +1,105 @@
-<?xml version="1.0"?>
-<!--
-  Copyright 2002-2004 The Apache Software Foundation
+<?xml version="1.0"?> 
 
-  Licensed under the Apache License, Version 2.0 (the "License");
-  you may not use this file except in compliance with the License.
-  You may obtain a copy of the License at
+<!-- Copyright 2002-2004 The Apache Software Foundation
 
-      http://www.apache.org/licenses/LICENSE-2.0
+ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+License. You may obtain a copy of the License at
 
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
--->
+ http://www.apache.org/licenses/LICENSE-2.0
 
-<!DOCTYPE document PUBLIC "-//APACHE//DTD Documentation V2.0//EN"
-          "http://forrest.apache.org/dtd/document-v20.dtd">
+ Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
+governing permissions and limitations under the License. -->
 
+<!DOCTYPE document PUBLIC "-//APACHE//DTD Documentation V2.0//EN" "http://forrest.apache.org/dtd/document-v20.dtd">
 
 <document>
 
-  <header>
-    <title>
-      Name Space Quotas Administrator Guide
-    </title>
-  </header>
-
-  <body>
-      <p>
-      The Hadoop Distributed File System (HDFS) allows the administrator to set quotas on individual directories. 
-      Newly created directories have no associated quota. 
-      The largest quota is <code>Long.Max_Value</code>. A quota of one forces a directory 
-      to remain empty.
-      </p>
-
-      <p>
-      The directory quota is a hard limit on the number of names in the tree 
-      rooted at that directory. File and directory creations fault if the quota 
-      would be exceeded. Quotas stick to renamed directories; the rename 
-      operation faults if operation would result in a quota violation. 
-      The attempt to set a quota faults if the directory would be in violation 
-      of the new quota.
-      </p>
-
-      <p>
-      Quotas are persistent with the fsimage. When starting, if the fsimage 
-      is immediately in violation of a quota (perhaps the fsimage was 
-      surreptitiously modified), the startup operation fails with an error report. 
-      Setting or removing a quota creates a journal entry.
-      </p> 
-
-      <p>
-      The following new commands or new options are added to support quotas. 
-      The first two are administration commands.
-      </p>
-
-      <ul>
-      <li>
-      <code>dfsadmin -setquota &lt;N> &lt;directory>...&lt;directory></code> 
-      <br />
-      Set the quota to be <code>N</code> for each directory. Best effort for each directory,
-      with faults reported if <code>N</code> is not a positive long integer, 
-      the directory does not exist or it is a file, or the directory would 
-      immediately exceed the new quota.
-      </li>
-  
-      <li>
-      <code>dfsadmin -clrquota &lt;directory>...&lt;director></code><br /> 
-      Remove any quota for each directory. Best effort for each directory, 
-      with faults reported if the directory does not exist or it is a file. 
-      It is not a fault if the directory has no quota.
-      </li>
-  
-      <li>
-      <code>fs -count -q &lt;directory>...&lt;directory></code><br />
-      With the <code>-q</code> option, also report the quota value set for each 
-      directory, and the available quota remaining. If the directory does not have 
-      a quota set, the reported values are <code>none</code> and <code>inf</code>.
-      </li>
-      </ul>
-   </body>
+ <header> <title> Directory Quotas Administrator's Guide </title> </header>
+
+ <body>
+
+ <p> The Hadoop Distributed File System (HDFS) allows the administrator to set quotas for the number of names used and the
+amount of space used for individual directories. Name quotas and space quotas operate independently, but the administration and
+implementation of the two types of quotas are closely parallel. </p>
+
+<section> <title>Name Quotas</title>
+
+ <p> The name quota is a hard limit on the number of file and directory names in the tree rooted at that directory. File and
+directory creations fail if the quota would be exceeded. Quotas stick with renamed directories; the rename operation fails if
+operation would result in a quota violation. The attempt to set a quota fails if the directory would be in violation of the new
+quota. A newly created directory has no associated quota. The largest quota is <code>Long.Max_Value</code>. A quota of one
+forces a directory to remain empty. (Yes, a directory counts against its own quota!) </p>
+
+ <p> Quotas are persistent with the <code>fsimage</code>. When starting, if the <code>fsimage</code> is immediately in
+violation of a quota (perhaps the <code>fsimage</code> was surreptitiously modified), 
+a warning is printed for each of such violations. Setting or removing a quota creates a journal entry. </p> </section>
+
+<section> <title>Space Quotas</title>
+
+ <p> The space quota is a hard limit on the number of bytes used by files in the tree rooted at that directory. Block
+allocations fail if the quota would not allow a full block to be written. Each replica of a block counts against the quota. Quotas
+stick with renamed directories; the rename operation fails if the operation would result in a quota violation. The attempt to
+set a quota fails if the directory would be in violation of the new quota. A newly created directory has no associated quota.
+The largest quota is <code>Long.Max_Value</code>. A quota of zero still permits files to be created, but no blocks can be added to the files.
+Directories don't use host file system space and don't count against the space quota. The host file system space used to save
+the file meta data is not counted against the quota. Quotas are charged at the intended replication factor for the file;
+changing the replication factor for a file will credit or debit quotas. </p>
+
+ <p> Quotas are persistent with the <code>fsimage</code>. When starting, if the <code>fsimage</code> is immediately in
+violation of a quota (perhaps the <code>fsimage</code> was surreptitiously modified), a warning is printed for
+each of such violations. Setting or removing a quota creates a journal entry. </p>
+
+ </section>
+
+<section>
+
+ <title>Administrative Commands</title>
+
+ <p> Quotas are managed by a set of commands available only to the administrator. </p>
+
+<ul>
+
+ <li> <code>dfsadmin -setquota &lt;N> &lt;directory>...&lt;directory></code> <br /> Set the name quota to be <code>N</code> for
+each directory. Best effort for each directory, with faults reported if <code>N</code> is not a positive long integer, the
+directory does not exist or it is a file, or the directory would immediately exceed the new quota. </li>
+
+ <li> <code>dfsadmin -clrquota &lt;directory>...&lt;director></code><br /> Remove any name quota for each directory. Best
+effort for each directory, with faults reported if the directory does not exist or it is a file. It is not a fault if the
+directory has no quota. </li>
+
+ <li> <code>dfsadmin -setspacequota &lt;N> &lt;directory>...&lt;directory></code> <br /> Set the space quota to be
+N&times;2<sup>30</sup> bytes (GB) for each directory. Best effort for each directory, with faults reported if <code>N</code> is
+neither zero nor a positive integer, the directory does not exist or it is a file, or the directory would immediately exceed
+the new quota. </li>
+
+ <li> <code>dfsadmin -clrspacequota &lt;directory>...&lt;director></code><br /> Remove any space quota for each directory. Best
+effort for each directory, with faults reported if the directory does not exist or it is a file. It is not a fault if the
+directory has no quota. </li>
+
+ </ul>
+
+</section>
+
+<section>
+
+ <title>Reporting Command</title>
+
+ <p> An an extension to the <code>count</code> command of the HDFS shell reports quota values and the current count of names and bytes in use. </p> 
+
+<ul>
+	
+	<li>
+
+ <code>fs -count -q &lt;directory>...&lt;directory></code><br /> With the <code>-q</code> option, also report the name quota
+value set for each directory, the available name quota remaining, the space quota value set, and the available space quota
+remaining. If the directory does not have a quota set, the reported values are <code>none</code> and <code>inf</code>. Space
+values are rounded to multiples of 2<sup>30</sup> bytes (GB).
+
+ </li>
+
+ </ul> </section>
+
+</body>
+
 </document>

+ 44 - 27
src/hdfs/org/apache/hadoop/hdfs/DFSClient.java

@@ -500,7 +500,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     try {
       return namenode.setReplication(src, replication);
     } catch(RemoteException re) {
-      throw re.unwrapRemoteException(AccessControlException.class);
+      throw re.unwrapRemoteException(AccessControlException.class,
+                                     QuotaExceededException.class);
     }
   }
 
@@ -840,32 +841,24 @@ public class DFSClient implements FSConstants, java.io.Closeable {
   }
 
   /**
-   * Remove the quota for a directory
-   * @param path The string representation of the path to the directory
-   * @throws FileNotFoundException if the path is not a directory
+   * Sets or resets quotas for a directory.
+   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setQuota(String, long, long)
    */
-  void clearQuota(String src) throws IOException {
-    try {
-      namenode.clearQuota(src);
-    } catch(RemoteException re) {
-      throw re.unwrapRemoteException(AccessControlException.class,
-                                     FileNotFoundException.class);
+  void setQuota(String src, long namespaceQuota, long diskspaceQuota) 
+                                                 throws IOException {
+    // sanity check
+    if ((namespaceQuota <= 0 && namespaceQuota != FSConstants.QUOTA_DONT_SET &&
+         namespaceQuota != FSConstants.QUOTA_RESET) ||
+        (diskspaceQuota <= 0 && diskspaceQuota != FSConstants.QUOTA_DONT_SET &&
+         diskspaceQuota != FSConstants.QUOTA_RESET)) {
+      throw new IllegalArgumentException("Invalid values for quota : " +
+                                         namespaceQuota + " and " + 
+                                         diskspaceQuota);
+                                         
     }
-  }
-  
-  /**
-   * Set the quota for a directory.
-   * @param path  The string representation of the path to the directory
-   * @param quota The limit of the number of names in the tree rooted 
-   *              at the directory
-   * @throws FileNotFoundException if the path is a file or 
-   *                               does not exist 
-   * @throws QuotaExceededException if the directory size 
-   *                                is greater than the given quota
-   */
-  void setQuota(String src, long quota) throws IOException {
+    
     try {
-      namenode.setQuota(src, quota);
+      namenode.setQuota(src, namespaceQuota, diskspaceQuota);
     } catch(RemoteException re) {
       throw re.unwrapRemoteException(AccessControlException.class,
                                      FileNotFoundException.class,
@@ -2008,7 +2001,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     private DatanodeInfo[] nodes = null; // list of targets for current block
     private volatile boolean hasError = false;
     private volatile int errorIndex = 0;
-    private IOException lastException = null;
+    private volatile IOException lastException = null;
     private long artificialSlowdown = 0;
     private long lastFlushOffset = -1; // offset when flush was invoked
     private boolean persistBlocks = false; // persist blocks on namenode
@@ -2016,6 +2009,12 @@ public class DFSClient implements FSConstants, java.io.Closeable {
     private int maxRecoveryErrorCount = 5; // try block recovery 5 times
     private volatile boolean appendChunk = false;   // appending to existing partial block
 
+    private void setLastException(IOException e) {
+      if (lastException == null) {
+        lastException = e;
+      }
+    }
+    
     private class Packet {
       ByteBuffer buffer;           // only one of buf and buffer is non-null
       byte[]  buf;
@@ -2206,6 +2205,9 @@ public class DFSClient implements FSConstants, java.io.Closeable {
             } catch (Throwable e) {
               LOG.warn("DataStreamer Exception: " + 
                        StringUtils.stringifyException(e));
+              if (e instanceof IOException) {
+                setLastException((IOException)e);
+              }
               hasError = true;
             }
           }
@@ -2334,6 +2336,9 @@ public class DFSClient implements FSConstants, java.io.Closeable {
           } catch (Exception e) {
             if (!closed) {
               hasError = true;
+              if (e instanceof IOException) {
+                setLastException((IOException)e);
+              }
               LOG.warn("DFSOutputStream ResponseProcessor exception " + 
                        " for block " + block +
                         StringUtils.stringifyException(e));
@@ -2395,8 +2400,9 @@ public class DFSClient implements FSConstants, java.io.Closeable {
       while (!success && clientRunning) {
         DatanodeInfo[] newnodes = null;
         if (nodes == null) {
-          lastException = new IOException("Could not get block locations. " +
-                                          "Aborting...");
+          String msg = "Could not get block locations. Aborting...";
+          LOG.warn(msg);
+          setLastException(new IOException(msg));
           closed = true;
           if (streamer != null) streamer.close();
           return false;
@@ -2473,6 +2479,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
         }
 
         this.hasError = false;
+        lastException = null;
         errorIndex = 0;
         success = createBlockOutputStream(nodes, clientName, true);
       }
@@ -2646,6 +2653,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
       boolean success;
       do {
         hasError = false;
+        lastException = null;
         errorIndex = 0;
         retry = false;
         nodes = null;
@@ -2756,6 +2764,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
           }
         }
         hasError = true;
+        setLastException(ie);
         blockReplyStream = null;
         return false;  // error
       }
@@ -2771,6 +2780,14 @@ public class DFSClient implements FSConstants, java.io.Closeable {
           try {
             return namenode.addBlock(src, clientName);
           } catch (RemoteException e) {
+            IOException ue = 
+              e.unwrapRemoteException(FileNotFoundException.class,
+                                      AccessControlException.class,
+                                      QuotaExceededException.class);
+            if (ue != e) { 
+              throw ue; // no need to retry these exceptions
+            }
+            
             if (--retries == 0 && 
                 !NotReplicatedYetException.class.getName().
                 equals(e.getClassName())) {

+ 5 - 15
src/hdfs/org/apache/hadoop/hdfs/DistributedFileSystem.java

@@ -206,22 +206,12 @@ public class DistributedFileSystem extends FileSystem {
     return dfs.getContentSummary(getPathName(f));
   }
 
-  /** Clear a directory's quota
-   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#clearQuota(String)
+  /** Set a directory's quotas
+   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setQuota(String, long, long) 
    */
-  public void clearQuota(Path src) throws IOException {
-    dfs.clearQuota(getPathName(src));
-  }
-  
-  /** Set a directory's quota
-   * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setQuota(String, long) 
-   */
-  public void setQuota(Path src, long quota) throws IOException {
-    if (quota <= 0) {
-      throw new IllegalArgumentException("Quota should be a positive number: "
-          + quota);
-    }
-    dfs.setQuota(getPathName(src), quota);
+  public void setQuota(Path src, long namespaceQuota, long diskspaceQuota) 
+                       throws IOException {
+    dfs.setQuota(getPathName(src), namespaceQuota, diskspaceQuota);
   }
   
   private FileStatus makeQualified(FileStatus f) {

+ 15 - 12
src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java

@@ -39,9 +39,9 @@ public interface ClientProtocol extends VersionedProtocol {
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 39: removed DFSFileInfo (Use FileStatus instead)
+   * 40: added disk space quotas.
    */
-  public static final long versionID = 39L;
+  public static final long versionID = 40L;
   
   ///////////////////////////////////////
   // File contents
@@ -409,21 +409,24 @@ public interface ClientProtocol extends VersionedProtocol {
   /**
    * Set the quota for a directory.
    * @param path  The string representation of the path to the directory
-   * @param quota The limit of the number of names in the tree rooted 
-   *              at the directory
+   * @param namespaceQuota Limit on the number of names in the tree rooted 
+   *                       at the directory
+   * @param diskspaceQuota Limit on disk space occupied all the files under
+   *                       this directory. 
+   * <br><br>
+   *                       
+   * The quota can have three types of values : (1) 0 or more will set 
+   * the quota to that value, (2) {@link FSConstants#QUOTA_DONT_SET}  implies 
+   * the quota will not be changed, and (3) {@link FSConstants#QUOTA_RESET} 
+   * implies the quota will be reset. Any other value is a runtime error.
+   *                        
    * @throws FileNotFoundException if the path is a file or 
    *                               does not exist 
    * @throws QuotaExceededException if the directory size 
    *                                is greater than the given quota
    */
-  public void setQuota(String path, long quota) throws IOException;
-  
-  /**
-   * Remove the quota for a directory
-   * @param path The string representation of the path to the directory
-   * @throws FileNotFoundException if the path is not a directory
-   */
-  public void clearQuota(String path) throws IOException;
+  public void setQuota(String path, long namespaceQuota, long diskspaceQuota)
+                      throws IOException;
   
   /**
    * Write all metadata for this file into persistent storage.

+ 6 - 2
src/hdfs/org/apache/hadoop/hdfs/protocol/FSConstants.java

@@ -116,6 +116,10 @@ public interface FSConstants {
   // Chunk the block Invalidate message
   public static final int BLOCK_INVALIDATE_CHUNK = 100;
 
+  // Long that indicates "leave current quota unchanged"
+  public static final long QUOTA_DONT_SET = Long.MAX_VALUE;
+  public static final long QUOTA_RESET = -1L;
+  
   //
   // Timeouts, constants
   //
@@ -190,7 +194,7 @@ public interface FSConstants {
   // Version is reflected in the data storage file.
   // Versions are negative.
   // Decrement LAYOUT_VERSION to define a new version.
-  public static final int LAYOUT_VERSION = -17;
+  public static final int LAYOUT_VERSION = -18;
   // Current version: 
-  // Support Access time on files
+  // Support disk space quotas
 }

+ 21 - 9
src/hdfs/org/apache/hadoop/hdfs/protocol/QuotaExceededException.java

@@ -20,23 +20,33 @@ package org.apache.hadoop.hdfs.protocol;
 
 import java.io.IOException;
 
-/** This class is for the error when an attempt to add an inode to namespace 
- * violates the quota restriction of any inode on the path to the newly added
- * inode.
+/** 
+ * This exception is thrown when modification to HDFS results in violation
+ * of a directory quota. A directory quota might be namespace quota (limit 
+ * on number of files and directories) or a diskspace quota (limit on space 
+ * taken by all the file under the directory tree). <br> <br>
+ * 
+ * The message for the exception specifies the the directory where the quota
+ * was violated and actual quotas.
  */
 public final class QuotaExceededException extends IOException {
   private static final long serialVersionUID = 1L;
   private String pathName;
-  private long quota;
-  private long count;
+  private long nsQuota;
+  private long nsCount;
+  private long dsQuota;
+  private long diskspace;
   
   public QuotaExceededException(String msg) {
     super(msg);
   }
   
-  public QuotaExceededException(long quota, long count) {
-    this.quota = quota;
-    this.count = count;
+  public QuotaExceededException(long nsQuota, long nsCount,
+                                long dsQuota, long diskspace) {
+    this.nsQuota = nsQuota;
+    this.nsCount = nsCount;
+    this.dsQuota = dsQuota;
+    this.diskspace = diskspace;
   }
   
   public void setPathName(String path) {
@@ -47,7 +57,9 @@ public final class QuotaExceededException extends IOException {
     String msg = super.getMessage();
     if (msg == null) {
       return "The quota" + (pathName==null?"":(" of " + pathName)) + 
-          " is exceeded: quota=" + quota + " count=" + count;
+          " is exceeded: namespace quota=" + nsQuota + " file count=" + 
+          nsCount + ", diskspace quota=" + dsQuota + 
+          " diskspace=" + diskspace; 
     } else {
       return msg;
     }

+ 259 - 124
src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -61,7 +61,7 @@ class FSDirectory implements FSConstants, Closeable {
   public FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) throws IOException {
     rootDir = new INodeDirectoryWithQuota(INodeDirectory.ROOT_NAME,
         ns.createFsOwnerPermissions(new FsPermission((short)0755)),
-        Integer.MAX_VALUE);
+        Integer.MAX_VALUE, -1);
     this.fsImage = fsImage;
     namesystem = ns;
     initialize(conf);
@@ -154,7 +154,7 @@ class FSDirectory implements FSConstants, Closeable {
                                  preferredBlockSize, modTime, clientName, 
                                  clientMachine, clientNode);
     synchronized (rootDir) {
-      newNode = addNode(path, newNode, false);
+      newNode = addNode(path, newNode, -1, false);
     }
     if (newNode == null) {
       NameNode.stateChangeLog.info("DIR* FSDirectory.addFile: "
@@ -180,14 +180,17 @@ class FSDirectory implements FSConstants, Closeable {
                             long atime,
                             long preferredBlockSize) {
     INode newNode;
+    long diskspace = -1; // unknown
     if (blocks == null)
       newNode = new INodeDirectory(permissions, modificationTime);
-    else 
+    else {
       newNode = new INodeFile(permissions, blocks.length, replication,
                               modificationTime, atime, preferredBlockSize);
+      diskspace = ((INodeFile)newNode).diskspaceConsumed(blocks);
+    }
     synchronized (rootDir) {
       try {
-        newNode = addNode(path, newNode, false);
+        newNode = addNode(path, newNode, diskspace, false);
         if(newNode != null && blocks != null) {
           int nrBlocks = blocks.length;
           // Add file->block mapping
@@ -210,14 +213,16 @@ class FSDirectory implements FSConstants, Closeable {
                               short replication,
                               long modificationTime,
                               long atime,
-                              long quota,
+                              long nsQuota,
+                              long dsQuota,
                               long preferredBlockSize) {
+    // NOTE: This does not update space counts for parents
     // create new inode
     INode newNode;
     if (blocks == null) {
-      if (quota >= 0) {
+      if (nsQuota >= 0 || dsQuota >= 0) {
         newNode = new INodeDirectoryWithQuota(
-            permissions, modificationTime, quota);
+            permissions, modificationTime, nsQuota, dsQuota);
       } else {
         newNode = new INodeDirectory(permissions, modificationTime);
       }
@@ -249,12 +254,16 @@ class FSDirectory implements FSConstants, Closeable {
   /**
    * Add a block to the file. Returns a reference to the added block.
    */
-  Block addBlock(String path, INode file, Block block) throws IOException {
+  Block addBlock(String path, INode[] inodes, Block block) throws IOException {
     waitForReady();
 
     synchronized (rootDir) {
-      INodeFile fileNode = (INodeFile) file;
+      INodeFile fileNode = (INodeFile) inodes[inodes.length-1];
 
+      // check quota limits and updated space consumed
+      updateCount(inodes, inodes.length-1, 0, 
+                  fileNode.getPreferredBlockSize()*fileNode.getReplication());
+      
       // associate the new list of blocks with this file
       namesystem.blocksMap.addINode(block, fileNode);
       BlockInfo blockInfo = namesystem.blocksMap.getStoredBlock(block);
@@ -347,10 +356,8 @@ class FSDirectory implements FSConstants, Closeable {
    */
   boolean unprotectedRenameTo(String src, String dst, long timestamp) 
   throws QuotaExceededException {
-    byte[][] srcComponents = INode.getPathComponents(src);
-    INode[] srcInodes = new INode[srcComponents.length];
     synchronized (rootDir) {
-      rootDir.getExistingPathINodes(srcComponents, srcInodes);
+      INode[] srcInodes = rootDir.getExistingPathINodes(src);
 
       // check the validation of the source
       if (srcInodes[srcInodes.length-1] == null) {
@@ -463,13 +470,20 @@ class FSDirectory implements FSConstants, Closeable {
     oldReplication[0] = -1;
     Block[] fileBlocks = null;
     synchronized(rootDir) {
-      INode inode = rootDir.getNode(src);
+      INode[] inodes = rootDir.getExistingPathINodes(src);
+      INode inode = inodes[inodes.length - 1];
       if (inode == null)
         return null;
       if (inode.isDirectory())
         return null;
       INodeFile fileNode = (INodeFile)inode;
       oldReplication[0] = fileNode.getReplication();
+
+      // check disk quota
+      long dsDelta = (replication - oldReplication[0]) *
+           (fileNode.diskspaceConsumed()/oldReplication[0]);
+      updateCount(inodes, inodes.length-1, 0, dsDelta);
+
       fileNode.setReplication(replication);
       fileBlocks = fileNode.getBlocks();
     }
@@ -584,12 +598,9 @@ class FSDirectory implements FSConstants, Closeable {
    */ 
   INode unprotectedDelete(String src, long modificationTime) {
     src = normalizePath(src);
-    String[] names = INode.getPathNames(src);
-    byte[][] components = INode.getPathComponents(names);
-    INode[] inodes = new INode[components.length];
 
     synchronized (rootDir) {
-      rootDir.getExistingPathINodes(components, inodes);
+      INode[] inodes =  rootDir.getExistingPathINodes(src);
       INode targetNode = inodes[inodes.length-1];
 
       if (targetNode == null) { // non-existent src
@@ -630,8 +641,18 @@ class FSDirectory implements FSConstants, Closeable {
    * Replaces the specified inode with the specified one.
    */
   void replaceNode(String path, INodeFile oldnode, INodeFile newnode) 
-                      throws IOException {
+                                                   throws IOException {
+    replaceNode(path, oldnode, newnode, true);
+  }
+  
+  /**
+   * @see #replaceNode(String, INodeFile, INodeFile)
+   */
+  private void replaceNode(String path, INodeFile oldnode, INodeFile newnode,
+                           boolean updateDiskspace) throws IOException {    
     synchronized (rootDir) {
+      long dsOld = oldnode.diskspaceConsumed();
+      
       //
       // Remove the node from the namespace 
       //
@@ -641,7 +662,25 @@ class FSDirectory implements FSConstants, Closeable {
         throw new IOException("FSDirectory.replaceNode: " +
                               "failed to remove " + path);
       } 
+      
+      /* Currently oldnode and newnode are assumed to contain the same
+       * blocks. Otherwise, blocks need to be removed from the blocksMap.
+       */
+      
       rootDir.addNode(path, newnode); 
+
+      //check if disk space needs to be updated.
+      long dsNew = 0;
+      if (updateDiskspace && (dsNew = newnode.diskspaceConsumed()) != dsOld) {
+        try {
+          updateSpaceConsumed(path, 0, dsNew-dsOld);
+        } catch (QuotaExceededException e) {
+          // undo
+          replaceNode(path, newnode, oldnode, false);
+          throw e;
+        }
+      }
+      
       int index = 0;
       for (Block b : newnode.getBlocks()) {
         BlockInfo info = namesystem.blocksMap.addINode(b, newnode);
@@ -725,6 +764,24 @@ class FSDirectory implements FSConstants, Closeable {
     }
   }
 
+  /**
+   * Retrieve the existing INodes along the given path.
+   * 
+   * @param path the path to explore
+   * @return INodes array containing the existing INodes in the order they
+   *         appear when following the path from the root INode to the
+   *         deepest INodes. The array size will be the number of expected
+   *         components in the path, and non existing components will be
+   *         filled with null
+   *         
+   * @see INodeDirectory#getExistingPathINodes(byte[][], INode[])
+   */
+  INode[] getExistingPathINodes(String path) {
+    synchronized (rootDir){
+      return rootDir.getExistingPathINodes(path);
+    }
+  }
+  
   /** 
    * Check whether the filepath could be created
    */
@@ -751,39 +808,68 @@ class FSDirectory implements FSConstants, Closeable {
     }
   }
 
+  /** Updates namespace and diskspace consumed for all
+   * directories until the parent directory of file represented by path.
+   * 
+   * @param path path for the file.
+   * @param nsDelta the delta change of namespace
+   * @param dsDelta the delta change of diskspace
+   * @throws QuotaExceededException if the new count violates any quota limit
+   * @throws FileNotFound if path does not exist.
+   */
+  void updateSpaceConsumed(String path, long nsDelta, long dsDelta)
+                                         throws QuotaExceededException,
+                                                FileNotFoundException {
+    synchronized (rootDir) {
+      INode[] inodes = rootDir.getExistingPathINodes(path);
+      int len = inodes.length;
+      if (inodes[len - 1] == null) {
+        throw new FileNotFoundException(path + 
+                                        " does not exist under rootDir.");
+      }
+      updateCount(inodes, len-1, nsDelta, dsDelta);
+    }
+  }
+  
   /** update count of each inode with quota
    * 
    * @param inodes an array of inodes on a path
    * @param numOfINodes the number of inodes to update starting from index 0
-   * @param deltaCount the delta change of the count
+   * @param nsDelta the delta change of namespace
+   * @param dsDelta the delta change of diskspace
    * @throws QuotaExceededException if the new count violates any quota limit
    */
-  private static void updateCount(
-      INode[] inodes, int numOfINodes, long deltaCount )
-  throws QuotaExceededException {
+  private void updateCount(INode[] inodes, int numOfINodes, 
+                           long nsDelta, long dsDelta)
+                           throws QuotaExceededException {
+    if (!ready) {
+      //still intializing. do not check or update quotas.
+      return;
+    }
     if (numOfINodes>inodes.length) {
       numOfINodes = inodes.length;
     }
     // check existing components in the path  
-    List<INodeDirectoryWithQuota> inodesWithQuota = 
-      new ArrayList<INodeDirectoryWithQuota>(numOfINodes);
     int i=0;
     try {
       for(; i < numOfINodes; i++) {
-        if (inodes[i].getQuota() >= 0) { // a directory with quota
-          INodeDirectoryWithQuota quotaINode =(INodeDirectoryWithQuota)inodes[i]; 
-          quotaINode.updateNumItemsInTree(deltaCount);
-          inodesWithQuota.add(quotaINode);
+        if (inodes[i].isQuotaSet()) { // a directory with quota
+          INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
+          node.updateNumItemsInTree(nsDelta, dsDelta);
         }
       }
     } catch (QuotaExceededException e) {
-      for (INodeDirectoryWithQuota quotaINode:inodesWithQuota) {
+      e.setPathName(getFullPathName(inodes, i));
+      // undo updates
+      for( ; i-- > 0; ) {
         try {
-          quotaINode.updateNumItemsInTree(-deltaCount);
+          if (inodes[i].isQuotaSet()) { // a directory with quota
+            INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
+            node.updateNumItemsInTree(-nsDelta, -dsDelta);
+          }
         } catch (IOException ingored) {
         }
       }
-      e.setPathName(getFullPathName(inodes, i));
       throw e;
     }
   }
@@ -881,17 +967,19 @@ class FSDirectory implements FSConstants, Closeable {
         inheritPermission );
   }
   
-  /** Add a node child to the namespace. The full path name of the node is src. 
+  /** Add a node child to the namespace. The full path name of the node is src.
+   * childDiskspace should be -1, if unknown. 
    * QuotaExceededException is thrown if it violates quota limit */
   private <T extends INode> T addNode(String src, T child, 
-      boolean inheritPermission) 
+        long childDiskspace, boolean inheritPermission) 
   throws QuotaExceededException {
     byte[][] components = INode.getPathComponents(src);
     child.setLocalName(components[components.length-1]);
     INode[] inodes = new INode[components.length];
     synchronized (rootDir) {
       rootDir.getExistingPathINodes(components, inodes);
-      return addChild(inodes, inodes.length-1, child, inheritPermission);
+      return addChild(inodes, inodes.length-1, child, childDiskspace,
+                      inheritPermission);
     }
   }
   
@@ -900,12 +988,25 @@ class FSDirectory implements FSConstants, Closeable {
    * QuotaExceededException is thrown if it violates quota limit */
   private <T extends INode> T addChild(INode[] pathComponents, int pos, T child,
       boolean inheritPermission) throws QuotaExceededException {
-    long childSize = child.numItemsInTree();
-    updateCount(pathComponents, pos, childSize);
+    return addChild(pathComponents, pos, child, -1, inheritPermission);
+  }
+  
+  /** Add a node child to the inodes at index pos. 
+   * Its ancestors are stored at [0, pos-1]. 
+   * QuotaExceededException is thrown if it violates quota limit */
+  private <T extends INode> T addChild(INode[] pathComponents, int pos, T child,
+       long childDiskspace, boolean inheritPermission) throws QuotaExceededException {
+    INode.DirCounts counts = new INode.DirCounts();
+    child.spaceConsumedInTree(counts);
+    if (childDiskspace < 0) {
+      childDiskspace = counts.getDsCount();
+    }
+    updateCount(pathComponents, pos, counts.getNsCount(), childDiskspace);
     T addedNode = ((INodeDirectory)pathComponents[pos-1]).addChild(
         child, inheritPermission);
     if (addedNode == null) {
-      updateCount(pathComponents, pos, -childSize);
+      updateCount(pathComponents, pos, 
+                  -counts.getNsCount(), -childDiskspace);
     }
     return addedNode;
   }
@@ -920,8 +1021,10 @@ class FSDirectory implements FSConstants, Closeable {
     INode removedNode = 
       ((INodeDirectory)pathComponents[pos-1]).removeChild(pathComponents[pos]);
     if (removedNode != null) {
-      updateCount(pathComponents, pos, 
-          -removedNode.numItemsInTree());
+      INode.DirCounts counts = new INode.DirCounts();
+      removedNode.spaceConsumedInTree(counts);
+      updateCount(pathComponents, pos,
+                  -counts.getNsCount(), -counts.getDsCount());
     }
     return removedNode;
   }
@@ -952,117 +1055,149 @@ class FSDirectory implements FSConstants, Closeable {
    * A directory's count is defined as the total number inodes in the tree
    * rooted at the directory.
    * 
-   * @throws QuotaExceededException if the count update violates 
-   *                                any quota limitation
+   * This is an update of existing state of the filesystem and does not
+   * throw QuotaExceededException.
    */
-  void updateCountForINodeWithQuota() throws QuotaExceededException {
-    updateCountForINodeWithQuota(rootDir);
+  void updateCountForINodeWithQuota() {
+    updateCountForINodeWithQuota(rootDir, new INode.DirCounts(), 
+                                 new ArrayList<INode>(50));
   }
   
-  /** Update the count of the directory if it has a quota and return the count
+  /** 
+   * Update the count of the directory if it has a quota and return the count
    * 
-   * @param node the root of the tree that represents the directory
+   * This does not throw a QuotaExceededException. This is just an update
+   * of of existing state and throwing QuotaExceededException does not help
+   * with fixing the state, if there is a problem.
+   * 
+   * @param dir the root of the tree that represents the directory
+   * @param counters counters for name space and disk space
+   * @param nodesInPath INodes for the each of components in the path.
    * @return the size of the tree
-   * @throws QuotaExceededException if the count is greater than its quota
    */
-  private static long updateCountForINodeWithQuota(INode node) throws QuotaExceededException {
-    long count = 1L;
-    if (node.isDirectory()) {
-      INodeDirectory dNode = (INodeDirectory)node;
-      for (INode child : dNode.getChildren()) {
-        count += updateCountForINodeWithQuota(child);
-      }
-      if (dNode.getQuota()>=0) {
-        ((INodeDirectoryWithQuota)dNode).setCount(count);
+  private static void updateCountForINodeWithQuota(INodeDirectory dir, 
+                                               INode.DirCounts counts,
+                                               ArrayList<INode> nodesInPath) {
+    long parentNamespace = counts.nsCount;
+    long parentDiskspace = counts.dsCount;
+    
+    counts.nsCount = 1L;//for self. should not call node.spaceConsumedInTree()
+    counts.dsCount = 0L;
+    
+    /* We don't need nodesInPath if we could use 'parent' field in 
+     * INode. using 'parent' is not currently recommended. */
+    nodesInPath.add(dir);
+
+    for (INode child : dir.getChildren()) {
+      if (child.isDirectory()) {
+        updateCountForINodeWithQuota((INodeDirectory)child, 
+                                     counts, nodesInPath);
+      } else { // reduce recursive calls
+        counts.nsCount += 1;
+        counts.dsCount += ((INodeFile)child).diskspaceConsumed();
       }
     }
-    return count;
+      
+    if (dir.isQuotaSet()) {
+      ((INodeDirectoryWithQuota)dir).setSpaceConsumed(counts.nsCount,
+                                                      counts.dsCount);
+
+      // check if quota is violated for some reason.
+      if ((dir.getNsQuota() >= 0 && counts.nsCount > dir.getNsQuota()) ||
+          (dir.getDsQuota() >= 0 && counts.dsCount > dir.getDsQuota())) {
+
+        // can only happen because of a software bug. the bug should be fixed.
+        StringBuilder path = new StringBuilder(512);
+        for (INode n : nodesInPath) {
+          path.append('/');
+          path.append(n.getLocalName());
+        }
+        
+        NameNode.LOG.warn("Unexpected quota violation in image for " + path + 
+                          " (Namespace quota : " + dir.getNsQuota() +
+                          " consumed : " + counts.nsCount + ")" +
+                          " (Diskspace quota : " + dir.getDsQuota() +
+                          " consumed : " + counts.dsCount + ").");
+      }            
+    }
+      
+    // pop 
+    nodesInPath.remove(nodesInPath.size()-1);
+    
+    counts.nsCount += parentNamespace;
+    counts.dsCount += parentDiskspace;
   }
   
   /**
-   * Set the quota for a directory.
-   * @param path The string representation of the path to the directory
-   * @param quota The limit of the number of names in or below the directory
+   * See {@link ClientProtocol#setQuota(String, long, long)} for the contract.
+   * Sets quota for for a directory.
+   * @returns INodeDirectory if any of the quotas have changed. null other wise.
    * @throws FileNotFoundException if the path does not exist or is a file
    * @throws QuotaExceededException if the directory tree size is 
    *                                greater than the given quota
    */
-  void unprotectedSetQuota(String src, long quota)
-  throws FileNotFoundException, QuotaExceededException {
+  INodeDirectory unprotectedSetQuota(String src, long nsQuota, long dsQuota) 
+                       throws FileNotFoundException, QuotaExceededException {
+    // sanity check
+    if ((nsQuota < 0 && nsQuota != FSConstants.QUOTA_DONT_SET && 
+         nsQuota < FSConstants.QUOTA_RESET) || 
+        (dsQuota < 0 && dsQuota != FSConstants.QUOTA_DONT_SET && 
+          dsQuota < FSConstants.QUOTA_RESET)) {
+      throw new IllegalArgumentException("Illegal value for nsQuota or " +
+                                         "dsQuota : " + nsQuota + " and " +
+                                         dsQuota);
+    }
+    
     String srcs = normalizePath(src);
-    byte[][] components = INode.getPathComponents(src);
-    INode[] inodes = new INode[components.length==1?1:2];
-    synchronized (rootDir) {
-      rootDir.getExistingPathINodes(components, inodes);
-      INode targetNode = inodes[inodes.length-1];
-      if (targetNode == null) {
-        throw new FileNotFoundException("Directory does not exist: " + srcs);
-      } else if (!targetNode.isDirectory()) {
-        throw new FileNotFoundException("Cannot set quota on a file: " + srcs);  
-      } else { // a directory inode
-        INodeDirectory dirNode = (INodeDirectory)targetNode;
-        if (dirNode instanceof INodeDirectoryWithQuota) { 
-          // a directory with quota; so set the quota to the new value
-          ((INodeDirectoryWithQuota)dirNode).setQuota(quota);
-        } else {
-          // a non-quota directory; so replace it with a directory with quota
-          INodeDirectoryWithQuota newNode = 
-            new INodeDirectoryWithQuota(quota, dirNode);
-          // non-root directory node; parent != null
-          assert inodes.length==2;
-          INodeDirectory parent = (INodeDirectory)inodes[0];
-          parent.replaceChild(newNode);
-        }
+    INode[] inodes = rootDir.getExistingPathINodes(src);
+    INode targetNode = inodes[inodes.length-1];
+    if (targetNode == null) {
+      throw new FileNotFoundException("Directory does not exist: " + srcs);
+    } else if (!targetNode.isDirectory()) {
+      throw new FileNotFoundException("Cannot set quota on a file: " + srcs);  
+    } else { // a directory inode
+      INodeDirectory dirNode = (INodeDirectory)targetNode;
+      long oldNsQuota = dirNode.getNsQuota();
+      long oldDsQuota = dirNode.getDsQuota();
+      if (nsQuota == FSConstants.QUOTA_DONT_SET) {
+        nsQuota = oldNsQuota;
       }
+      if (dsQuota == FSConstants.QUOTA_DONT_SET) {
+        dsQuota = oldDsQuota;
+      }        
+
+      if (dirNode instanceof INodeDirectoryWithQuota) { 
+        // a directory with quota; so set the quota to the new value
+        ((INodeDirectoryWithQuota)dirNode).setQuota(nsQuota, dsQuota);
+      } else {
+        // a non-quota directory; so replace it with a directory with quota
+        INodeDirectoryWithQuota newNode = 
+          new INodeDirectoryWithQuota(nsQuota, dsQuota, dirNode);
+        // non-root directory node; parent != null
+        INodeDirectory parent = (INodeDirectory)inodes[inodes.length-2];
+        dirNode = newNode;
+        parent.replaceChild(newNode);
+      }
+      return (oldNsQuota != nsQuota || oldDsQuota != dsQuota) ? dirNode : null;
     }
   }
   
   /**
-   * @see #unprotectedSetQuota(String, long)
-   */
-  void setQuota(String src, long quota) 
-  throws FileNotFoundException, QuotaExceededException {
-    unprotectedSetQuota(src, quota);
-    fsImage.getEditLog().logSetQuota(src, quota);
-  }
-  
-  /**
-   * Remove the quota for a directory
-   * @param src The string representation of the path to the directory
-   * @throws FileNotFoundException if the path does not exist or it is a file
+   * See {@link ClientProtocol#setQuota(String, long, long)} for the 
+   * contract.
+   * @see #unprotectedSetQuota(String, long, long)
    */
-  void unprotectedClearQuota(String src) throws IOException {
-    String srcs = normalizePath(src);
-    byte[][] components = INode.getPathComponents(src);
-    INode[] inodes = new INode[components.length==1?1:2];
-    synchronized (rootDir) {
-      rootDir.getExistingPathINodes(components, inodes);
-      INode targetNode = inodes[inodes.length-1];
-      if (targetNode == null || !targetNode.isDirectory()) {
-        throw new FileNotFoundException("Directory does not exist: " + srcs);
-      } else if (targetNode instanceof INodeDirectoryWithQuota) {
-        // a directory inode with quota
-        // replace the directory with quota with a non-quota one
-        INodeDirectoryWithQuota dirNode = (INodeDirectoryWithQuota)targetNode;
-        INodeDirectory newNode = new INodeDirectory(dirNode);
-        if (dirNode == rootDir) { // root
-          throw new IOException("Can't clear the root's quota");
-        } else { // non-root directory node; parent != null
-          INodeDirectory parent = (INodeDirectory)inodes[0];
-          parent.replaceChild(newNode);
-        }
+  void setQuota(String src, long nsQuota, long dsQuota) 
+                throws FileNotFoundException, QuotaExceededException {
+    synchronized (rootDir) {    
+      INodeDirectory dir = unprotectedSetQuota(src, nsQuota, dsQuota);
+      if (dir != null) {
+        fsImage.getEditLog().logSetQuota(src, dir.getNsQuota(), 
+                                         dir.getDsQuota());
       }
     }
   }
   
-  /**
-   * @see #unprotectedClearQuota(String)
-   */
-  void clearQuota(String src) throws IOException {
-    unprotectedClearQuota(src);
-    fsImage.getEditLog().logClearQuota(src);
-  }
-  
   long totalInodes() {
     synchronized (rootDir) {
       return rootDir.numItemsInTree();

+ 24 - 17
src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java

@@ -62,9 +62,12 @@ public class FSEditLog {
   private static final byte OP_SET_OWNER = 8;
   private static final byte OP_CLOSE = 9;    // close after write
   private static final byte OP_SET_GENSTAMP = 10;    // store genstamp
-  private static final byte OP_SET_QUOTA = 11; // set a directory's quota
-  private static final byte OP_CLEAR_QUOTA = 12; // clear a directory's quota
+  /* The following two are not used any more. Should be removed once
+   * LAST_UPGRADABLE_LAYOUT_VERSION is -17 or newer. */
+  private static final byte OP_SET_NS_QUOTA = 11; // set namespace quota
+  private static final byte OP_CLEAR_NS_QUOTA = 12; // clear namespace quota
   private static final byte OP_TIMES = 13; // sets mod & access time on a file
+  private static final byte OP_SET_QUOTA = 14; // sets name and disk quotas.
   private static int sizeFlushBuffer = 512*1024;
 
   private ArrayList<EditLogOutputStream> editStreams = null;
@@ -732,23 +735,34 @@ public class FSEditLog {
               FSImage.readString(in), FSImage.readString(in));
           break;
         }
-        case OP_SET_QUOTA: {
+        case OP_SET_NS_QUOTA: {
           if (logVersion > -16) {
             throw new IOException("Unexpected opcode " + opcode
                 + " for version " + logVersion);
           }
           fsDir.unprotectedSetQuota(FSImage.readString(in), 
-              readLongWritable(in) );
+                                    readLongWritable(in), 
+                                    FSConstants.QUOTA_DONT_SET);
           break;
         }
-        case OP_CLEAR_QUOTA: {
+        case OP_CLEAR_NS_QUOTA: {
           if (logVersion > -16) {
             throw new IOException("Unexpected opcode " + opcode
                 + " for version " + logVersion);
           }
-          fsDir.unprotectedClearQuota(FSImage.readString(in));
+          fsDir.unprotectedSetQuota(FSImage.readString(in),
+                                    FSConstants.QUOTA_RESET,
+                                    FSConstants.QUOTA_DONT_SET);
           break;
         }
+
+        case OP_SET_QUOTA:
+          fsDir.unprotectedSetQuota(FSImage.readString(in),
+                                    readLongWritable(in),
+                                    readLongWritable(in));
+                                      
+          break;
+
         case OP_TIMES: {
           numOpTimes++;
           int length = in.readInt();
@@ -1016,23 +1030,16 @@ public class FSEditLog {
             FSEditLog.toLogReplication(replication));
   }
   
-  /** Add set quota record to edit log
+  /** Add set namespace quota record to edit log
    * 
    * @param src the string representation of the path to a directory
    * @param quota the directory size limit
    */
-  void logSetQuota(String src, long quota) {
-    logEdit(OP_SET_QUOTA, new UTF8(src), new LongWritable(quota));
+  void logSetQuota(String src, long nsQuota, long dsQuota) {
+    logEdit(OP_SET_QUOTA, new UTF8(src), 
+            new LongWritable(nsQuota), new LongWritable(dsQuota));
   }
 
-  /** Add clear quota record to edit log
-   * 
-   * @param src the string representation of the path to a directory
-   */
-  void logClearQuota(String src) {
-    logEdit(OP_CLEAR_QUOTA, new UTF8(src));
-  }
-  
   /**  Add set permissions record to edit log */
   void logSetPermissions(String src, FsPermission permissions) {
     logEdit(OP_SET_PERMISSIONS, new UTF8(src), permissions);

+ 14 - 9
src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSImage.java

@@ -59,7 +59,6 @@ import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.common.UpgradeManager;
-import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 
 /**
  * FSImage handles checkpointing and logging of the namespace edits.
@@ -893,9 +892,13 @@ public class FSImage extends Storage {
         }
         
         // get quota only when the node is a directory
-        long quota = -1L;
+        long nsQuota = -1L;
         if (imgVersion <= -16 && blocks == null) {
-          quota = in.readLong();
+          nsQuota = in.readLong();
+        }
+        long dsQuota = -1L;
+        if (imgVersion <= -18 && blocks == null) {
+          dsQuota = in.readLong();
         }
         
         PermissionStatus permissions = fsNamesys.getUpgradePermission();
@@ -904,8 +907,8 @@ public class FSImage extends Storage {
         }
         if (path.length() == 0) { // it is the root
           // update the root's attributes
-          if (quota != -1) {
-            fsDir.rootDir.setQuota(quota);
+          if (nsQuota != -1 || dsQuota != -1) {
+            fsDir.rootDir.setQuota(nsQuota, dsQuota);
           }
           fsDir.rootDir.setModificationTime(modificationTime);
           fsDir.rootDir.setPermissionStatus(permissions);
@@ -918,7 +921,8 @@ public class FSImage extends Storage {
         }
         // add new inode
         parentINode = fsDir.addToParent(path, parentINode, permissions,
-            blocks, replication, modificationTime, atime, quota, blockSize);
+                                        blocks, replication, modificationTime, 
+                                        atime, nsQuota, dsQuota, blockSize);
       }
       
       // load datanode info
@@ -927,8 +931,6 @@ public class FSImage extends Storage {
       // load Files Under Construction
       this.loadFilesUnderConstruction(imgVersion, in, fsNamesys);
       
-      // update the count of each directory with quota
-      fsDir.updateCountForINodeWithQuota();
     } finally {
       in.close();
     }
@@ -968,6 +970,8 @@ public class FSImage extends Storage {
       numEdits += FSEditLog.loadFSEdits(edits);
       edits.close();
     }
+    // update the counts.
+    FSNamesystem.getFSNamesystem().dir.updateCountForINodeWithQuota();    
     return numEdits;
   }
 
@@ -1109,7 +1113,8 @@ public class FSImage extends Storage {
       out.writeLong(0);   // access time
       out.writeLong(0);   // preferred block size
       out.writeInt(-1);    // # of blocks
-      out.writeLong(node.getQuota());
+      out.writeLong(node.getNsQuota());
+      out.writeLong(node.getDsQuota());
       FILE_PERM.fromShort(node.getFsPermissionShort());
       PermissionStatus.write(out, node.getUserName(),
                              node.getGroupName(),

+ 49 - 33
src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -296,7 +296,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
   /**
    * Initialize FSNamesystem.
    */
-  private synchronized void initialize(NameNode nn, Configuration conf) throws IOException {
+  private void initialize(NameNode nn, Configuration conf) throws IOException {
     this.systemStart = now();
     this.startTime = new Date(systemStart); 
     setConfigurationParameters(conf);
@@ -417,7 +417,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
   /**
    * Initializes some of the members from configuration
    */
-  private synchronized void setConfigurationParameters(Configuration conf) 
+  private void setConfigurationParameters(Configuration conf) 
                                           throws IOException {
     fsNamesystemObject = this;
     try {
@@ -1269,13 +1269,18 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
 
     // Allocate a new block and record it in the INode. 
     synchronized (this) {
-      INodeFileUnderConstruction pendingFile  = checkLease(src, clientName);
+      INode[] pathINodes = dir.getExistingPathINodes(src);
+      int inodesLen = pathINodes.length;
+      checkLease(src, clientName, pathINodes[inodesLen-1]);
+      INodeFileUnderConstruction pendingFile  = (INodeFileUnderConstruction) 
+                                                pathINodes[inodesLen - 1];
+                                                           
       if (!checkFileProgress(pendingFile, false)) {
         throw new NotReplicatedYetException("Not replicated yet:" + src);
       }
 
       // allocate new block record block locations in INode.
-      newBlock = allocateBlock(src, pendingFile);
+      newBlock = allocateBlock(src, pathINodes);
       pendingFile.setTargets(targets);
       
       for (DatanodeDescriptor dn : targets) {
@@ -1305,11 +1310,18 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
     return true;
   }
   
-  // make sure that we still have the lease on this file
-  private INodeFileUnderConstruction checkLease(String src, String holder
-      ) throws IOException {
-    INode file = dir.getFileINode(src);
-    if (file == null) {
+  // make sure that we still have the lease on this file.
+  private INodeFileUnderConstruction checkLease(String src, String holder) 
+                                                      throws IOException {
+    INodeFile file = dir.getFileINode(src);
+    checkLease(src, holder, file);
+    return (INodeFileUnderConstruction)file;
+  }
+
+  private void checkLease(String src, String holder, INode file) 
+                                                     throws IOException {
+
+    if (file == null || file.isDirectory()) {
       Lease lease = leaseManager.getLease(new StringBytesWritable(holder));
       throw new LeaseExpiredException("No lease on " + src +
                                       " File does not exist. " +
@@ -1330,7 +1342,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
       throw new LeaseExpiredException("Lease mismatch on " + src + " owned by "
           + pendingFile.getClientName() + " but is accessed by " + holder);
     }
-    return pendingFile;    
   }
 
   /**
@@ -1403,14 +1414,18 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
     
   /**
    * Allocate a block at the given pending filename
+   * 
+   * @param src path to the file
+   * @param indoes INode representing each of the components of src. 
+   *        <code>inodes[inodes.length-1]</code> is the INode for the file.
    */
-  private Block allocateBlock(String src, INode file) throws IOException {
+  private Block allocateBlock(String src, INode[] inodes) throws IOException {
     Block b = null;
     do {
       b = new Block(FSNamesystem.randBlockId.nextLong(), 0, 
                     getGenerationStamp());
     } while (isValidBlock(b));
-    b = dir.addBlock(src, file, b);
+    b = dir.addBlock(src, inodes, b);
     NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: "
                                  +src+ ". "+b);
     return b;
@@ -1727,32 +1742,16 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
   }
 
   /**
-   * Set the quota for a directory.
-   * @param path The string representation of the path to the directory
-   * @param quota The limit of the number of names in or below the directory
-   * @throws IOException if the path is not a directory or the number of
-   * existing names in or below the directory is greater than the given quota
+   * Set the namespace quota and diskspace quota for a directory.
+   * See {@link ClientProtocol#setQuota(String, long, long)} for the 
+   * contract.
    */
-  void setQuota(String path, long quota) throws IOException {
+  void setQuota(String path, long nsQuota, long dsQuota) throws IOException {
     if (isPermissionEnabled) {
       checkSuperuserPrivilege();
     }
     
-    dir.setQuota(path, quota);
-    getEditLog().logSync();
-  }
-  
-  /**
-   * Remove the quota for a directory
-   * @param path The string representation of the path to the directory
-   * @throws IOException if the path is not a directory
-   */
-  void clearQuota(String path) throws IOException {
-    if (isPermissionEnabled) {
-      checkSuperuserPrivilege();
-    }
-    
-    dir.clearQuota(path);
+    dir.setQuota(path, nsQuota, dsQuota);
     getEditLog().logSync();
   }
   
@@ -2838,6 +2837,23 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean {
             LOG.warn("Error in deleting bad block " + block + e);
           }
         }
+        
+        //Updated space consumed if required.
+        INodeFile file = (storedBlock != null) ? storedBlock.getINode() : null;
+        long diff = (file == null) ? 0 :
+                    (file.getPreferredBlockSize() - storedBlock.getNumBytes());
+        
+        if (diff > 0 && file.isUnderConstruction() &&
+            cursize < storedBlock.getNumBytes()) {
+          try {
+            String path = /* For finding parents */ 
+              leaseManager.findPath((INodeFileUnderConstruction)file);
+            dir.updateSpaceConsumed(path, 0, -diff*file.getReplication());
+          } catch (IOException e) {
+            LOG.warn("Unexpected exception while updating disk space : " +
+                     e.getMessage());
+          }
+        }
       }
       block = storedBlock;
     }

+ 35 - 17
src/hdfs/org/apache/hadoop/hdfs/server/namenode/INode.java

@@ -17,13 +17,9 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.io.FileNotFoundException;
 import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Arrays;
 import java.util.List;
-import java.io.IOException;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.ContentSummary;
@@ -31,8 +27,6 @@ import org.apache.hadoop.fs.permission.*;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
-import org.apache.hadoop.hdfs.server.namenode.BlocksMap.BlockInfo;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 
 /**
@@ -46,6 +40,23 @@ public abstract class INode implements Comparable<byte[]> {
   protected long modificationTime;
   protected int accessTime; // precise to the last hour
 
+  /** Simple wrapper for two counters : 
+   *  nsCount (namespace consumed) and dsCount (diskspace consumed).
+   */
+  static class DirCounts {
+    long nsCount = 0;
+    long dsCount = 0;
+    
+    /** returns namespace count */
+    long getNsCount() {
+      return nsCount;
+    }
+    /** returns diskspace count */
+    long getDsCount() {
+      return dsCount;
+    }
+  }
+  
   //Only updated by updatePermissionStatus(...).
   //Other codes should not modify it.
   private long permission;
@@ -173,12 +184,13 @@ public abstract class INode implements Comparable<byte[]> {
 
   /** Compute {@link ContentSummary}. */
   public final ContentSummary computeContentSummary() {
-    long[] a = computeContentSummary(new long[]{0,0,0});
-    return new ContentSummary(a[0], a[1], a[2], getQuota());
+    long[] a = computeContentSummary(new long[]{0,0,0,0});
+    return new ContentSummary(a[0], a[1], a[2], getNsQuota(), 
+                              a[3], getDsQuota());
   }
   /**
    * @return an array of three longs. 
-   * 0: length, 1: file count, 2: directory count
+   * 0: length, 1: file count, 2: directory count 3: disk space
    */
   abstract long[] computeContentSummary(long[] summary);
   
@@ -186,19 +198,25 @@ public abstract class INode implements Comparable<byte[]> {
    * Get the quota set for this inode
    * @return the quota if it is set; -1 otherwise
    */
-  long getQuota() {
+  long getNsQuota() {
     return -1;
   }
 
+  long getDsQuota() {
+    return -1;
+  }
+  
+  boolean isQuotaSet() {
+    return getNsQuota() >= 0 || getDsQuota() >= 0;
+  }
+  
   /**
-   * Get the total number of names in the tree
-   * rooted at this inode including the root
-   * @return The total number of names in this tree
+   * Adds total nubmer of names and total disk space taken under 
+   * this tree to counts.
+   * Returns updated counts object.
    */
-  long numItemsInTree() {
-    return 1;
-  }
-    
+  abstract DirCounts spaceConsumedInTree(DirCounts counts);
+  
   /**
    * Get local file name
    * @return local file name

+ 11 - 11
src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java

@@ -190,6 +190,8 @@ public class INodeDirectory extends INode {
    *         deepest INodes. The array size will be the number of expected
    *         components in the path, and non existing components will be
    *         filled with null
+   *         
+   * @see #getExistingPathINodes(byte[][], INode[])
    */
   INode[] getExistingPathINodes(String path) {
     byte[][] components = getPathComponents(path);
@@ -206,7 +208,7 @@ public class INodeDirectory extends INode {
    * @param node INode to insert
    * @param inheritPermission inherit permission from parent?
    * @return  null if the child with this name already exists; 
-   *          inserted INode, otherwise
+   *          node, otherwise
    */
   <T extends INode> T addChild(final T node, boolean inheritPermission) {
     if (inheritPermission) {
@@ -300,17 +302,15 @@ public class INodeDirectory extends INode {
     return parent;
   }
 
-  /**
-   */
-  long numItemsInTree() {
-    long total = 1L;
-    if (children == null) {
-      return total;
-    }
-    for (INode child : children) {
-      total += child.numItemsInTree();
+  /** {@inheritDoc} */
+  DirCounts spaceConsumedInTree(DirCounts counts) {
+    counts.nsCount += 1;
+    if (children != null) {
+      for (INode child : children) {
+        child.spaceConsumedInTree(counts);
+      }
     }
-    return total;
+    return counts;    
   }
 
   /** {@inheritDoc} */

+ 85 - 38
src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java

@@ -24,92 +24,139 @@ import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
  * Directory INode class that has a quota restriction
  */
 class INodeDirectoryWithQuota extends INodeDirectory {
-  private long quota;
-  private long count;
+  private long nsQuota; /// NameSpace quota
+  private long nsCount;
+  private long dsQuota; /// disk space quota
+  private long diskspace;
   
   /** Convert an existing directory inode to one with the given quota
    * 
-   * @param quota Quota to be assigned to this inode
+   * @param nsQuota Namespace quota to be assigned to this inode
+   * @param dsQuota Diskspace quota to be assigned to this indoe
    * @param other The other inode from which all other properties are copied
    */
-  INodeDirectoryWithQuota(long quota, INodeDirectory other)
+  INodeDirectoryWithQuota(long nsQuota, long dsQuota, INodeDirectory other)
   throws QuotaExceededException {
     super(other);
-    this.count = other.numItemsInTree();
-    setQuota(quota);
+    INode.DirCounts counts = new INode.DirCounts();
+    other.spaceConsumedInTree(counts);
+    this.nsCount= counts.getNsCount();
+    this.diskspace = counts.getDsCount();
+    setQuota(nsQuota, dsQuota);
   }
   
   /** constructor with no quota verification */
   INodeDirectoryWithQuota(
-      PermissionStatus permissions, long modificationTime, long quota)
+      PermissionStatus permissions, long modificationTime, 
+      long nsQuota, long dsQuota)
   {
     super(permissions, modificationTime);
-    this.quota = quota;
+    this.nsQuota = nsQuota;
+    this.dsQuota = dsQuota;
   }
   
   /** constructor with no quota verification */
-  INodeDirectoryWithQuota(String name, PermissionStatus permissions, long quota)
+  INodeDirectoryWithQuota(String name, PermissionStatus permissions, 
+                          long nsQuota, long dsQuota)
   {
     super(name, permissions);
-    this.quota = quota;
+    this.nsQuota = nsQuota;
+    this.dsQuota = dsQuota;
   }
   
-  /** Get this directory's quota
-   * @return this directory's quota
+  /** Get this directory's namespace quota
+   * @return this directory's namespace quota
    */
-  long getQuota() {
-    return quota;
+  long getNsQuota() {
+    return nsQuota;
+  }
+  
+  /** Get this directory's diskspace quota
+   * @return this directory's diskspace quota
+   */
+  long getDsQuota() {
+    return dsQuota;
   }
   
   /** Set this directory's quota
    * 
-   * @param quota Quota to be set
-   * @throws QuotaExceededException if the given quota is less than 
-   *                                the size of the tree
+   * @param nsQuota Namespace quota to be set
+   * @param dsQuota diskspace quota to be set
+   * @throws QuotaExceededException if quota is modified and the modified quota
+   *         is too low.
+   *                                
    */
-  void setQuota(long quota) throws QuotaExceededException {
-    verifyQuota(quota, this.count);
-    this.quota = quota;
+  void setQuota(long newNsQuota, long newDsQuota) throws QuotaExceededException {
+    // if a quota is not chaged, ignore that in verification.
+    if ((newNsQuota >=0 && newNsQuota != nsQuota && newNsQuota < nsCount)  ||
+        (newDsQuota >=0 && newDsQuota != dsQuota && newDsQuota < diskspace)) {
+      throw new QuotaExceededException(newNsQuota, nsCount, 
+                                       newDsQuota, diskspace);
+    }
+
+    nsQuota = newNsQuota;
+    dsQuota = newDsQuota;
   }
   
+  
+  @Override
+  DirCounts spaceConsumedInTree(DirCounts counts) {
+    counts.nsCount += nsCount;
+    counts.dsCount += diskspace;
+    return counts;
+  }
+
   /** Get the number of names in the subtree rooted at this directory
    * @return the size of the subtree rooted at this directory
    */
   long numItemsInTree() {
-    return count;
+    return nsCount;
+  }
+  
+  long diskspaceConsumed() {
+    return diskspace;
   }
   
   /** Update the size of the tree
    * 
-   * @param delta the change of the tree size
+   * @param nsDelta the change of the tree size
+   * @param dsDelta change to disk space occupied
    * @throws QuotaExceededException if the changed size is greater 
    *                                than the quota
    */
-  void updateNumItemsInTree(long delta) throws QuotaExceededException {
-    long newCount = this.count + delta;
-    if (delta>0) {
-      verifyQuota(this.quota, newCount);
+  void updateNumItemsInTree(long nsDelta, long dsDelta) throws 
+                            QuotaExceededException {
+    long newCount = nsCount + nsDelta;
+    long newDiskspace = diskspace + dsDelta;
+    if (nsDelta>0 || dsDelta>0) {
+      verifyQuota(nsQuota, newCount, dsQuota, newDiskspace);
     }
-    this.count = newCount;
+    nsCount = newCount;
+    diskspace = newDiskspace;
   }
   
-  /** Set the size of the tree rooted at this directory
+  /** 
+   * Sets namespace and diskspace take by the directory rooted 
+   * at this INode. This should be used carefully. It does not check 
+   * for quota violations.
    * 
-   * @param count size of the directory to be set
-   * @throws QuotaExceededException if the given count is greater than quota
+   * @param namespace size of the directory to be set
+   * @param diskspace disk space take by all the nodes under this directory
    */
-  void setCount(long count) throws QuotaExceededException {
-    verifyQuota(this.quota, count);
-    this.count = count;
+  void setSpaceConsumed(long namespace, long diskspace) {
+    this.nsCount = namespace;
+    this.diskspace = diskspace;
   }
   
-  /** Verify if the count satisfies the quota restriction 
+  /** Verify if the namespace count disk space satisfies the quota restriction 
    * @throws QuotaExceededException if the given quota is less than the count
    */
-  private static void verifyQuota(long quota, long count)
-  throws QuotaExceededException {
-    if (quota < count) {
-      throw new QuotaExceededException(quota, count);
+  private static void verifyQuota(long nsQuota, long nsCount, 
+                                  long dsQuota, long diskspace)
+                                  throws QuotaExceededException {
+    if ((nsQuota >= 0 && nsQuota < nsCount) || 
+        (dsQuota >= 0 && dsQuota < diskspace)) {
+      throw new QuotaExceededException(nsQuota, nsCount, dsQuota, diskspace);
     }
   }
 }

+ 31 - 0
src/hdfs/org/apache/hadoop/hdfs/server/namenode/INodeFile.java

@@ -130,9 +130,40 @@ public class INodeFile extends INode {
     }
     summary[0] += bytes;
     summary[1]++;
+    summary[3] += diskspaceConsumed();
     return summary;
   }
 
+  
+
+  @Override
+  DirCounts spaceConsumedInTree(DirCounts counts) {
+    counts.nsCount += 1;
+    counts.dsCount += diskspaceConsumed();
+    return counts;
+  }
+
+  long diskspaceConsumed() {
+    return diskspaceConsumed(blocks);
+  }
+  
+  long diskspaceConsumed(Block[] blkArr) {
+    long size = 0;
+    for (Block blk : blkArr) {
+      if (blk != null) {
+        size += blk.getNumBytes();
+      }
+    }
+    /* If the last block is being written to, use prefferedBlockSize
+     * rather than the actual block size.
+     */
+    if (blkArr.length > 0 && blkArr[blkArr.length-1] != null && 
+        isUnderConstruction()) {
+      size += preferredBlockSize - blocks[blocks.length-1].getNumBytes();
+    }
+    return size * blockReplication;
+  }
+  
   /**
    * Get the preferred block size of the file.
    * @return the number of bytes

+ 3 - 7
src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -580,13 +580,9 @@ public class NameNode implements ClientProtocol, DatanodeProtocol,
   }
 
   /** {@inheritDoc} */
-  public void setQuota(String path, long quota) throws IOException {
-    namesystem.setQuota(path, quota);
-  }
-  
-  /** {@inheritDoc} */
-  public void clearQuota(String path) throws IOException {
-    namesystem.clearQuota(path);
+  public void setQuota(String path, long namespaceQuota, long diskspaceQuota) 
+                       throws IOException {
+    namesystem.setQuota(path, namespaceQuota, diskspaceQuota);
   }
   
   /** {@inheritDoc} */

+ 126 - 2
src/hdfs/org/apache/hadoop/hdfs/tools/DFSAdmin.java

@@ -94,7 +94,7 @@ public class DFSAdmin extends FsShell {
 
     @Override
     public void run(Path path) throws IOException {
-      dfs.clearQuota(path);
+      dfs.setQuota(path, FSConstants.QUOTA_RESET, FSConstants.QUOTA_DONT_SET);
     }
   }
   
@@ -141,7 +141,111 @@ public class DFSAdmin extends FsShell {
 
     @Override
     public void run(Path path) throws IOException {
-      dfs.setQuota(path, quota);
+      dfs.setQuota(path, quota, FSConstants.QUOTA_DONT_SET);
+    }
+  }
+  
+  /** A class that supports command clearSpaceQuota */
+  private static class ClearSpaceQuotaCommand extends DFSAdminCommand {
+    private static final String NAME = "clrSpaceQuota";
+    private static final String USAGE = "-"+NAME+" <dirname>...<dirname>";
+    private static final String DESCRIPTION = USAGE + ": " +
+    "\tClear the disk space quota for each directory <dirName>.\n" +
+    "\t\tBest effort for the directory. with fault reported if\n" +
+    "\t\t1. the directory does not exist or is a file, or\n" +
+    "\t\t2. user is not an administrator.\n" +
+    "\t\tIt does not fault if the directory has no quota.";
+    
+    /** Constructor */
+    ClearSpaceQuotaCommand(String[] args, int pos, FileSystem fs) {
+      super(fs);
+      CommandFormat c = new CommandFormat(NAME, 1, Integer.MAX_VALUE);
+      List<String> parameters = c.parse(args, pos);
+      this.args = parameters.toArray(new String[parameters.size()]);
+    }
+    
+    /** Check if a command is the clrQuota command
+     * 
+     * @param cmd A string representation of a command starting with "-"
+     * @return true if this is a clrQuota command; false otherwise
+     */
+    public static boolean matches(String cmd) {
+      return ("-"+NAME).equals(cmd); 
+    }
+
+    @Override
+    public String getCommandName() {
+      return NAME;
+    }
+
+    @Override
+    public void run(Path path) throws IOException {
+      dfs.setQuota(path, FSConstants.QUOTA_DONT_SET, FSConstants.QUOTA_RESET);
+    }
+  }
+  
+  /** A class that supports command setQuota */
+  private static class SetSpaceQuotaCommand extends DFSAdminCommand {
+    private static final String NAME = "setSpaceQuota";
+    private static final String USAGE =
+      "-"+NAME+" <quota> <dirname>...<dirname>";
+    private static final String DESCRIPTION = USAGE +
+      "\tSet the dik space quota <quota> for each directory <dirName>.\n" + 
+      "\t\tThe directory quota is a long integer that puts a hard limit " +
+      "on the number of names in the directory tree.\n" +
+      "\t\tQuota can also be speciefied with MB, GB, or TB suffix" +
+      " (e.g. 100GB, 20TB).\n" + 
+      "\t\tBest effort for the directory, with faults reported if\n" +
+      "\t\t1. N is not a positive integer, or\n" +
+      "\t\t2. user is not an administrator, or\n" +
+      "\t\t3. the directory does not exist or is a file, or\n" +
+      "\t\t4. the directory would immediately exceed the new space quota.";
+    
+    private long quota; // the quota to be set
+    
+    /** Constructor */
+    SetSpaceQuotaCommand(String[] args, int pos, FileSystem fs) {
+      super(fs);
+      CommandFormat c = new CommandFormat(NAME, 2, Integer.MAX_VALUE);
+      List<String> parameters = c.parse(args, pos);
+      long multiplier = 1;
+      String str = parameters.remove(0).trim();
+      if (str.endsWith("TB")) {
+        multiplier = 1024 * 1024 * 1024 * 1024;
+      } else if (str.endsWith("GB")) {
+        multiplier = 1024 * 1024 * 1024;
+      } else if (str.endsWith("MB")) {
+        multiplier = 1024 * 1024;
+      }
+      if (multiplier != 1) {
+        str = str.substring(0, str.length()-2);
+      }
+      
+      quota = Long.parseLong(str);
+      if (quota > Long.MAX_VALUE/multiplier) {
+        throw new IllegalArgumentException("quota exceeds Long.MAX_VALUE!");
+      }
+      quota *= multiplier;
+      this.args = parameters.toArray(new String[parameters.size()]);
+    }
+    
+    /** Check if a command is the setQuota command
+     * 
+     * @param cmd A string representation of a command starting with "-"
+     * @return true if this is a count command; false otherwise
+     */
+    public static boolean matches(String cmd) {
+      return ("-"+NAME).equals(cmd); 
+    }
+
+    @Override
+    public String getCommandName() {
+      return NAME;
+    }
+
+    @Override
+    public void run(Path path) throws IOException {
+      dfs.setQuota(path, FSConstants.QUOTA_DONT_SET, quota);
     }
   }
   
@@ -293,6 +397,8 @@ public class DFSAdmin extends FsShell {
       "\t[-refreshNodes]\n" +
       "\t[" + SetQuotaCommand.USAGE + "]\n" +
       "\t[" + ClearQuotaCommand.USAGE +"]\n" +
+      "\t[" + SetSpaceQuotaCommand.USAGE + "]\n" +
+      "\t[" + ClearSpaceQuotaCommand.USAGE +"]\n" +
       "\t[-help [cmd]]\n";
 
     String report ="-report: \tReports basic filesystem information and statistics.\n";
@@ -354,6 +460,10 @@ public class DFSAdmin extends FsShell {
       System.out.println(SetQuotaCommand.DESCRIPTION);
     } else if (ClearQuotaCommand.matches(cmd)) {
       System.out.println(ClearQuotaCommand.DESCRIPTION);
+    } else if (SetSpaceQuotaCommand.matches(cmd)) {
+      System.out.println(SetSpaceQuotaCommand.DESCRIPTION);
+    } else if (ClearSpaceQuotaCommand.matches(cmd)) {
+      System.out.println(ClearSpaceQuotaCommand.DESCRIPTION);
     } else if ("help".equals(cmd)) {
       System.out.println(help);
     } else {
@@ -366,6 +476,8 @@ public class DFSAdmin extends FsShell {
       System.out.println(metaSave);
       System.out.println(SetQuotaCommand.DESCRIPTION);
       System.out.println(ClearQuotaCommand.DESCRIPTION);
+      System.out.println(SetSpaceQuotaCommand.DESCRIPTION);
+      System.out.println(ClearSpaceQuotaCommand.DESCRIPTION);
       System.out.println(help);
       System.out.println();
       ToolRunner.printGenericCommandUsage(System.out);
@@ -478,6 +590,12 @@ public class DFSAdmin extends FsShell {
     } else if (ClearQuotaCommand.matches(cmd)) {
       System.err.println("Usage: java DFSAdmin"
                          + " ["+ClearQuotaCommand.USAGE+"]");
+    } else if (SetSpaceQuotaCommand.matches(cmd)) {
+      System.err.println("Usage: java DFSAdmin"
+                         + " [" + SetSpaceQuotaCommand.USAGE+"]");
+    } else if (ClearSpaceQuotaCommand.matches(cmd)) {
+      System.err.println("Usage: java DFSAdmin"
+                         + " ["+ClearSpaceQuotaCommand.USAGE+"]");
     } else {
       System.err.println("Usage: java DFSAdmin");
       System.err.println("           [-report]");
@@ -488,6 +606,8 @@ public class DFSAdmin extends FsShell {
       System.err.println("           [-metasave filename]");
       System.err.println("           ["+SetQuotaCommand.USAGE+"]");
       System.err.println("           ["+ClearQuotaCommand.USAGE+"]");
+      System.err.println("           ["+SetSpaceQuotaCommand.USAGE+"]");
+      System.err.println("           ["+ClearSpaceQuotaCommand.USAGE+"]");      
       System.err.println("           [-help [cmd]]");
       System.err.println();
       ToolRunner.printGenericCommandUsage(System.err);
@@ -576,6 +696,10 @@ public class DFSAdmin extends FsShell {
         exitCode = new ClearQuotaCommand(argv, i, fs).runAll();
       } else if (SetQuotaCommand.matches(cmd)) {
         exitCode = new SetQuotaCommand(argv, i, fs).runAll();
+      } else if (ClearSpaceQuotaCommand.matches(cmd)) {
+        exitCode = new ClearSpaceQuotaCommand(argv, i, fs).runAll();
+      } else if (SetSpaceQuotaCommand.matches(cmd)) {
+        exitCode = new SetSpaceQuotaCommand(argv, i, fs).runAll();
       } else if ("-help".equals(cmd)) {
         if (i < argv.length) {
           printHelp(argv[i]);

+ 17 - 11
src/test/org/apache/hadoop/hdfs/DFSTestUtil.java

@@ -24,9 +24,9 @@ import java.io.FileReader;
 import java.io.IOException;
 import java.util.Random;
 import junit.framework.TestCase;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -124,18 +124,24 @@ public class DFSTestUtil extends TestCase {
       throw new IOException("Mkdirs failed to create " + 
                             fileName.getParent().toString());
     }
-    FSDataOutputStream out = fs.create(fileName, replFactor);
-    byte[] toWrite = new byte[1024];
-    Random rb = new Random(seed);
-    long bytesToWrite = fileLen;
-    while (bytesToWrite>0) {
-     rb.nextBytes(toWrite);
-     int bytesToWriteNext = (1024<bytesToWrite)?1024:(int)bytesToWrite;
+    FSDataOutputStream out = null;
+    try {
+      out = fs.create(fileName, replFactor);
+      byte[] toWrite = new byte[1024];
+      Random rb = new Random(seed);
+      long bytesToWrite = fileLen;
+      while (bytesToWrite>0) {
+        rb.nextBytes(toWrite);
+        int bytesToWriteNext = (1024<bytesToWrite)?1024:(int)bytesToWrite;
 
-     out.write(toWrite, 0, bytesToWriteNext);
-     bytesToWrite -= bytesToWriteNext;
+        out.write(toWrite, 0, bytesToWriteNext);
+        bytesToWrite -= bytesToWriteNext;
+      }
+      out.close();
+      out = null;
+    } finally {
+      IOUtils.closeStream(out);
     }
-    out.close();
   }
   
   /** check if the files have been copied correctly. */

+ 275 - 9
src/test/org/apache/hadoop/hdfs/TestQuota.java

@@ -23,14 +23,22 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 
 import junit.framework.TestCase;
 
 /** A class for testing quota-related commands */
 public class TestQuota extends TestCase {
+  
+  private void runCommand(DFSAdmin admin, boolean expectError, String... args) 
+                         throws Exception {
+    runCommand(admin, args, expectError);
+  }
+  
   private void runCommand(DFSAdmin admin, String args[], boolean expectEror)
   throws Exception {
     int val = admin.run(args);
@@ -41,9 +49,14 @@ public class TestQuota extends TestCase {
     }
   }
   
-  /** Test quota related commands: setQuota, clrQuota, and count */
+  /** Test quota related commands: 
+   *    setQuota, clrQuota, setSpaceQuota, clrSpaceQuota, and count 
+   */
   public void testQuotaCommands() throws Exception {
     final Configuration conf = new Configuration();
+    // set a smaller block size so that we can test with smaller 
+    // Space quotas
+    conf.set("dfs.block.size", "512");
     final MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
     final FileSystem fs = cluster.getFileSystem();
     assertTrue("Not a HDFS: "+fs.getUri(),
@@ -52,11 +65,19 @@ public class TestQuota extends TestCase {
     DFSAdmin admin = new DFSAdmin(conf);
     
     try {
+      final int fileLen = 1024;
+      final short replication = 5;
+      final long spaceQuota = fileLen * replication * 15 / 8;
+
       // 1: create a directory /test and set its quota to be 3
       final Path parent = new Path("/test");
       assertTrue(dfs.mkdirs(parent));
       String[] args = new String[]{"-setQuota", "3", parent.toString()};
       runCommand(admin, args, false);
+
+      // set diskspace quota to 10000 
+      runCommand(admin, false, "-setSpaceQuota", 
+                 Long.toString(spaceQuota), parent.toString());
       
       // 2: create directory /test/data0
       final Path childDir0 = new Path(parent, "data0");
@@ -64,18 +85,22 @@ public class TestQuota extends TestCase {
 
       // 3: create a file /test/datafile0
       final Path childFile0 = new Path(parent, "datafile0");
-      OutputStream fout = dfs.create(childFile0);
-      fout.close();
+      DFSTestUtil.createFile(fs, childFile0, fileLen, replication, 0);
       
       // 4: count -q /test
       ContentSummary c = dfs.getContentSummary(parent);
       assertEquals(c.getFileCount()+c.getDirectoryCount(), 3);
       assertEquals(c.getQuota(), 3);
+      assertEquals(c.getSpaceConsumed(), fileLen*replication);
+      assertEquals(c.getSpaceQuota(), spaceQuota);
       
       // 5: count -q /test/data0
       c = dfs.getContentSummary(childDir0);
       assertEquals(c.getFileCount()+c.getDirectoryCount(), 1);
       assertEquals(c.getQuota(), -1);
+      // check disk space consumed
+      c = dfs.getContentSummary(parent);
+      assertEquals(c.getSpaceConsumed(), fileLen*replication);
 
       // 6: create a directory /test/data1
       final Path childDir1 = new Path(parent, "data1");
@@ -87,6 +112,8 @@ public class TestQuota extends TestCase {
       }
       assertTrue(hasException);
       
+      OutputStream fout;
+      
       // 7: create a file /test/datafile1
       final Path childFile1 = new Path(parent, "datafile1");
       hasException = false;
@@ -101,6 +128,7 @@ public class TestQuota extends TestCase {
       runCommand(admin, new String[]{"-clrQuota", parent.toString()}, false);
       c = dfs.getContentSummary(parent);
       assertEquals(c.getQuota(), -1);
+      assertEquals(c.getSpaceQuota(), spaceQuota);
       
       // 9: clear quota /test/data0
       runCommand(admin, new String[]{"-clrQuota", childDir0.toString()}, false);
@@ -108,12 +136,36 @@ public class TestQuota extends TestCase {
       assertEquals(c.getQuota(), -1);
       
       // 10: create a file /test/datafile1
-      fout = dfs.create(childFile1);
-      fout.close();
+      fout = dfs.create(childFile1, replication);
+      
+      // 10.s: but writing fileLen bytes should result in an quota exception
+      hasException = false;
+      try {
+        fout.write(new byte[fileLen]);
+        fout.close();
+      } catch (QuotaExceededException e) {
+        hasException = true;
+        IOUtils.closeStream(fout);
+      }
+      assertTrue(hasException);
+      
+      //delete the file
+      dfs.delete(childFile1, false);
+      
+      // 9.s: clear diskspace quota
+      runCommand(admin, false, "-clrSpaceQuota", parent.toString());
+      c = dfs.getContentSummary(parent);
+      assertEquals(c.getQuota(), -1);
+      assertEquals(c.getSpaceQuota(), -1);       
+      
+      // now creating childFile1 should succeed
+      DFSTestUtil.createFile(dfs, childFile1, fileLen, replication, 0);
       
       // 11: set the quota of /test to be 1
       args = new String[]{"-setQuota", "1", parent.toString()};
       runCommand(admin, args, true);
+      runCommand(admin, true, "-setSpaceQuota",  // for space quota
+                 Integer.toString(fileLen), args[2]);
       
       // 12: set the quota of /test/data0 to be 1
       args = new String[]{"-setQuota", "1", childDir0.toString()};
@@ -136,35 +188,49 @@ public class TestQuota extends TestCase {
       assertFalse(dfs.exists(nonExistentPath));
       args = new String[]{"-setQuota", "1", nonExistentPath.toString()};
       runCommand(admin, args, true);
+      runCommand(admin, true, "-setSpaceQuota", "1GB", // for space quota
+                 nonExistentPath.toString());
       
       // 14b: set quota on a file
       assertTrue(dfs.isFile(childFile0));
       args[1] = childFile0.toString();
       runCommand(admin, args, true);
+      // same for space quota
+      runCommand(admin, true, "-setSpaceQuota", "1GB", args[1]);
       
       // 15a: clear quota on a file
       args[0] = "-clrQuota";
       runCommand(admin, args, true);
+      runCommand(admin, true, "-clrSpaceQuota", args[1]);
       
       // 15b: clear quota on a non-existent directory
       args[1] = nonExistentPath.toString();
       runCommand(admin, args, true);
-
+      runCommand(admin, true, "-clrSpaceQuota", args[1]);
+      
       // 16a: set the quota of /test to be 0
       args = new String[]{"-setQuota", "0", parent.toString()};
       runCommand(admin, args, true);
+      runCommand(admin, true, "-setSpaceQuota", "0", args[2]);
       
       // 16b: set the quota of /test to be -1
       args[1] = "-1";
       runCommand(admin, args, true);
+      runCommand(admin, true, "-setSpaceQuota", args[1], args[2]);
       
       // 16c: set the quota of /test to be Long.MAX_VALUE+1
       args[1] = String.valueOf(Long.MAX_VALUE+1L);
       runCommand(admin, args, true);
+      runCommand(admin, true, "-setSpaceQuota", args[1], args[2]);
       
       // 16d: set the quota of /test to be a non integer
       args[1] = "33aa1.5";
       runCommand(admin, args, true);
+      runCommand(admin, true, "-setSpaceQuota", args[1], args[2]);
+      
+      // 16e: set space quota with a value larger than Long.MAX_VALUE
+      runCommand(admin, true, "-setSpaceQuota", 
+                 (Long.MAX_VALUE/1024/1024 + 1024) + "TB", args[2]);
       
       // 17:  setQuota by a non-administrator
       UnixUserGroupInformation.saveToConf(conf, 
@@ -173,10 +239,12 @@ public class TestQuota extends TestCase {
       DFSAdmin userAdmin = new DFSAdmin(conf);
       args[1] = "100";
       runCommand(userAdmin, args, true);
+      runCommand(userAdmin, true, "-setSpaceQuota", "1GB", args[2]);
       
       // 18: clrQuota by a non-administrator
       args = new String[] {"-clrQuota", parent.toString()};
       runCommand(userAdmin, args, true);
+      runCommand(userAdmin, true, "-clrSpaceQuota",  args[1]);      
     } finally {
       cluster.shutdown();
     }
@@ -198,14 +266,14 @@ public class TestQuota extends TestCase {
 
       // 2: set the quota of /nqdir0/qdir1 to be 6
       final Path quotaDir1 = new Path("/nqdir0/qdir1");
-      dfs.setQuota(quotaDir1, 6);
+      dfs.setQuota(quotaDir1, 6, FSConstants.QUOTA_DONT_SET);
       ContentSummary c = dfs.getContentSummary(quotaDir1);
       assertEquals(c.getDirectoryCount(), 3);
       assertEquals(c.getQuota(), 6);
 
       // 3: set the quota of /nqdir0/qdir1/qdir20 to be 7
       final Path quotaDir2 = new Path("/nqdir0/qdir1/qdir20");
-      dfs.setQuota(quotaDir2, 7);
+      dfs.setQuota(quotaDir2, 7, FSConstants.QUOTA_DONT_SET);
       c = dfs.getContentSummary(quotaDir2);
       assertEquals(c.getDirectoryCount(), 2);
       assertEquals(c.getQuota(), 7);
@@ -213,7 +281,7 @@ public class TestQuota extends TestCase {
       // 4: Create directory /nqdir0/qdir1/qdir21 and set its quota to 2
       final Path quotaDir3 = new Path("/nqdir0/qdir1/qdir21");
       assertTrue(dfs.mkdirs(quotaDir3));
-      dfs.setQuota(quotaDir3, 2);
+      dfs.setQuota(quotaDir3, 2, FSConstants.QUOTA_DONT_SET);
       c = dfs.getContentSummary(quotaDir3);
       assertEquals(c.getDirectoryCount(), 1);
       assertEquals(c.getQuota(), 2);
@@ -345,4 +413,202 @@ public class TestQuota extends TestCase {
       cluster.shutdown();
     }
   }
+  
+  /**
+   * Test HDFS operations that change disk space consumed by a directory tree.
+   * namely create, rename, delete, append, and setReplication.
+   * 
+   * This is based on testNamespaceCommands() above.
+   */
+  public void testSpaceCommands() throws Exception {
+    final Configuration conf = new Configuration();
+    // set a smaller block size so that we can test with smaller 
+    // diskspace quotas
+    conf.set("dfs.block.size", "512");
+    final MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null);
+    final FileSystem fs = cluster.getFileSystem();
+    assertTrue("Not a HDFS: "+fs.getUri(),
+                fs instanceof DistributedFileSystem);
+    final DistributedFileSystem dfs = (DistributedFileSystem)fs;
+
+    try {
+      int fileLen = 1024;
+      short replication = 3;
+      int fileSpace = fileLen * replication;
+      
+      // create directory /nqdir0/qdir1/qdir20/nqdir30
+      assertTrue(dfs.mkdirs(new Path("/nqdir0/qdir1/qdir20/nqdir30")));
+
+      // set the quota of /nqdir0/qdir1 to 4 * fileSpace 
+      final Path quotaDir1 = new Path("/nqdir0/qdir1");
+      dfs.setQuota(quotaDir1, FSConstants.QUOTA_DONT_SET, 4 * fileSpace);
+      ContentSummary c = dfs.getContentSummary(quotaDir1);
+      assertEquals(c.getSpaceQuota(), 4 * fileSpace);
+      
+      // set the quota of /nqdir0/qdir1/qdir20 to 6 * fileSpace 
+      final Path quotaDir20 = new Path("/nqdir0/qdir1/qdir20");
+      dfs.setQuota(quotaDir20, FSConstants.QUOTA_DONT_SET, 6 * fileSpace);
+      c = dfs.getContentSummary(quotaDir20);
+      assertEquals(c.getSpaceQuota(), 6 * fileSpace);
+
+
+      // Create /nqdir0/qdir1/qdir21 and set its space quota to 2 * fileSpace
+      final Path quotaDir21 = new Path("/nqdir0/qdir1/qdir21");
+      assertTrue(dfs.mkdirs(quotaDir21));
+      dfs.setQuota(quotaDir21, FSConstants.QUOTA_DONT_SET, 2 * fileSpace);
+      c = dfs.getContentSummary(quotaDir21);
+      assertEquals(c.getSpaceQuota(), 2 * fileSpace);
+
+      // 5: Create directory /nqdir0/qdir1/qdir21/nqdir32
+      Path tempPath = new Path(quotaDir21, "nqdir32");
+      assertTrue(dfs.mkdirs(tempPath));
+      
+      // create a file under nqdir32/fileDir
+      DFSTestUtil.createFile(dfs, new Path(tempPath, "fileDir/file1"), fileLen, 
+                             replication, 0);
+      c = dfs.getContentSummary(quotaDir21);
+      assertEquals(c.getSpaceConsumed(), fileSpace);
+      
+      // Create a larger file /nqdir0/qdir1/qdir21/nqdir33/
+      boolean hasException = false;
+      try {
+        DFSTestUtil.createFile(dfs, new Path(quotaDir21, "nqdir33/file2"), 
+                               2*fileLen, replication, 0);
+      } catch (QuotaExceededException e) {
+        hasException = true;
+      }
+      assertTrue(hasException);
+      // delete nqdir33
+      assertTrue(dfs.delete(new Path(quotaDir21, "nqdir33"), true));
+      c = dfs.getContentSummary(quotaDir21);
+      assertEquals(c.getSpaceConsumed(), fileSpace);
+      assertEquals(c.getSpaceQuota(), 2*fileSpace);
+
+      // Verify space before the move:
+      c = dfs.getContentSummary(quotaDir20);
+      assertEquals(c.getSpaceConsumed(), 0);
+      
+      // Move /nqdir0/qdir1/qdir21/nqdir32 /nqdir0/qdir1/qdir20/nqdir30
+      Path dstPath = new Path(quotaDir20, "nqdir30");
+      Path srcPath = new Path(quotaDir21, "nqdir32");
+      assertTrue(dfs.rename(srcPath, dstPath));
+      
+      // verify space after the move
+      c = dfs.getContentSummary(quotaDir20);
+      assertEquals(c.getSpaceConsumed(), fileSpace);
+      // verify space for its parent
+      c = dfs.getContentSummary(quotaDir1);
+      assertEquals(c.getSpaceConsumed(), fileSpace);
+      // verify space for source for the move
+      c = dfs.getContentSummary(quotaDir21);
+      assertEquals(c.getSpaceConsumed(), 0);
+      
+      final Path file2 = new Path(dstPath, "fileDir/file2");
+      int file2Len = 2 * fileLen;
+      // create a larger file under /nqdir0/qdir1/qdir20/nqdir30
+      DFSTestUtil.createFile(dfs, file2, file2Len, replication, 0);
+      
+      c = dfs.getContentSummary(quotaDir20);
+      assertEquals(c.getSpaceConsumed(), 3 * fileSpace);
+      c = dfs.getContentSummary(quotaDir21);
+      assertEquals(c.getSpaceConsumed(), 0);
+      
+      // Reverse: Move /nqdir0/qdir1/qdir20/nqdir30 to /nqdir0/qdir1/qdir21/
+      hasException = false;
+      try {
+        assertFalse(dfs.rename(dstPath, srcPath));
+      } catch (QuotaExceededException e) {
+        hasException = true;
+      }
+      assertTrue(hasException);
+      
+      // make sure no intermediate directories left by failed rename
+      assertFalse(dfs.exists(srcPath));
+      // directory should exist
+      assertTrue(dfs.exists(dstPath));
+            
+      // verify space after the failed move
+      c = dfs.getContentSummary(quotaDir20);
+      assertEquals(c.getSpaceConsumed(), 3 * fileSpace);
+      c = dfs.getContentSummary(quotaDir21);
+      assertEquals(c.getSpaceConsumed(), 0);
+      
+      // Test Append :
+      
+      // verify space quota
+      c = dfs.getContentSummary(quotaDir1);
+      assertEquals(c.getSpaceQuota(), 4 * fileSpace);
+      
+      // verify space before append;
+      c = dfs.getContentSummary(dstPath);
+      assertEquals(c.getSpaceConsumed(), 3 * fileSpace);
+      
+      OutputStream out = dfs.append(file2);
+      // appending 1 fileLen should succeed
+      out.write(new byte[fileLen]);
+      out.close();
+      
+      file2Len += fileLen; // after append
+      
+      // verify space after append;
+      c = dfs.getContentSummary(dstPath);
+      assertEquals(c.getSpaceConsumed(), 4 * fileSpace);
+      
+      // now increase the quota for quotaDir1
+      dfs.setQuota(quotaDir1, FSConstants.QUOTA_DONT_SET, 5 * fileSpace);
+      // Now, appending more than 1 fileLen should result in an error
+      out = dfs.append(file2);
+      hasException = false;
+      try {
+        out.write(new byte[fileLen + 1024]);
+        out.flush();
+        out.close();
+      } catch (QuotaExceededException e) {
+        hasException = true;
+        IOUtils.closeStream(out);
+      }
+      assertTrue(hasException);
+      
+      file2Len += fileLen; // after partial append
+      
+      // verify space after partial append
+      c = dfs.getContentSummary(dstPath);
+      assertEquals(c.getSpaceConsumed(), 5 * fileSpace);
+      
+      // Test set replication :
+      
+      // first reduce the replication
+      dfs.setReplication(file2, (short)(replication-1));
+      
+      // verify that space is reduced by file2Len
+      c = dfs.getContentSummary(dstPath);
+      assertEquals(c.getSpaceConsumed(), 5 * fileSpace - file2Len);
+      
+      // now try to increase the replication and and expect an error.
+      hasException = false;
+      try {
+        dfs.setReplication(file2, (short)(replication+1));
+      } catch (QuotaExceededException e) {
+        hasException = true;
+      }
+      assertTrue(hasException);
+
+      // verify space consumed remains unchanged.
+      c = dfs.getContentSummary(dstPath);
+      assertEquals(c.getSpaceConsumed(), 5 * fileSpace - file2Len);
+      
+      // now increase the quota for quotaDir1 and quotaDir20
+      dfs.setQuota(quotaDir1, FSConstants.QUOTA_DONT_SET, 10 * fileSpace);
+      dfs.setQuota(quotaDir20, FSConstants.QUOTA_DONT_SET, 10 * fileSpace);
+      
+      // then increasing replication should be ok.
+      dfs.setReplication(file2, (short)(replication+1));
+      // verify increase in space
+      c = dfs.getContentSummary(dstPath);
+      assertEquals(c.getSpaceConsumed(), 5 * fileSpace + file2Len);
+      
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }