Explorar o código

HDFS-4503. Update computeContentSummary(..), spaceConsumedInTree(..) and diskspaceConsumed(..) in INode for snapshot.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1448373 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze %!s(int64=12) %!d(string=hai) anos
pai
achega
fac3883188
Modificáronse 21 ficheiros con 700 adicións e 335 borrados
  1. 3 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt
  2. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
  3. 170 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EnumCounters.java
  4. 47 148
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  5. 69 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
  6. 6 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  7. 84 28
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
  8. 17 28
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
  9. 43 21
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
  10. 117 50
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
  11. 17 9
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
  12. 58 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Quota.java
  13. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiff.java
  14. 4 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java
  15. 12 7
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshot.java
  16. 19 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
  17. 25 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java
  18. 0 13
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileUnderConstructionWithSnapshot.java
  19. 0 13
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithSnapshot.java
  20. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/diff/Diff.java
  21. 6 6
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestINodeFileUnderConstructionWithSnapshot.java

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-2802.txt

@@ -161,3 +161,6 @@ Branch-2802 Snapshot (Unreleased)
   HDFS-4487. Fix snapshot diff report for HDFS-4446.  (Jing Zhao via szetszwo)
 
   HDFS-4431. Support snapshot in OfflineImageViewer.  (Jing Zhao via szetszwo)
+
+  HDFS-4503. Update computeContentSummary(..), spaceConsumedInTree(..) and
+  diskspaceConsumed(..) in INode for snapshot.  (szetszwo)

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java

@@ -217,7 +217,7 @@ public class BackupImage extends FSImage {
       }
       lastAppliedTxId = logLoader.getLastAppliedTxId();
 
-      namesystem.dir.updateCountForINodeWithQuota(); // inefficient!
+      FSImage.updateCountForQuota(namesystem.dir.rootDir); // inefficient!
     } finally {
       backupInputStream.clear();
     }

+ 170 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EnumCounters.java

@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.util.HashMap;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Counters for an enum type.
+ * 
+ * For example, suppose there is an enum type
+ * <pre>
+ * enum Fruit { APPLE, ORANGE, GRAPE }
+ * </pre>
+ * An {@link EnumCounters} object can be created for counting the numbers of
+ * APPLE, ORANGLE and GRAPE.
+ *
+ * @param <E> the enum type
+ */
+public class EnumCounters<E extends Enum<E>> {
+  /** An array of enum constants. */
+  private final E[] enumConstants;
+  /** The counter array, counters[i] corresponds to the enumConstants[i]. */
+  private final long[] counters;
+
+  /**
+   * Construct counters for the given enum constants.
+   * @param enumConstants an array of enum constants such that, 
+   *                      for all i, enumConstants[i].ordinal() == i.
+   */
+  public EnumCounters(final E[] enumConstants) {
+    for(int i = 0; i < enumConstants.length; i++) {
+      Preconditions.checkArgument(enumConstants[i].ordinal() == i);
+    }
+    this.enumConstants = enumConstants;
+    this.counters = new long[enumConstants.length];
+  }
+  
+  /** @return the value of counter e. */
+  public final long get(final E e) {
+    return counters[e.ordinal()];
+  }
+
+  /** Negate all counters. */
+  public final void negation() {
+    for(int i = 0; i < counters.length; i++) {
+      counters[i] = -counters[i];
+    }
+  }
+  
+  /** Set counter e to the given value. */
+  public final void set(final E e, final long value) {
+    counters[e.ordinal()] = value;
+  }
+
+  /** Add the given value to counter e. */
+  public final void add(final E e, final long value) {
+    counters[e.ordinal()] += value;
+  }
+
+  /** Add that counters to this counters. */
+  public final void add(final EnumCounters<E> that) {
+    for(int i = 0; i < counters.length; i++) {
+      this.counters[i] += that.counters[i];
+    }
+  }
+
+  /** Subtract the given value from counter e. */
+  public final void subtract(final E e, final long value) {
+    counters[e.ordinal()] -= value;
+  }
+
+  /** Subtract that counters from this counters. */
+  public final void subtract(final EnumCounters<E> that) {
+    for(int i = 0; i < counters.length; i++) {
+      this.counters[i] -= that.counters[i];
+    }
+  }
+
+  @Override
+  public String toString() {
+    final StringBuilder b = new StringBuilder();
+    for(int i = 0; i < counters.length; i++) {
+      final String name = enumConstants[i].name();
+      b.append(name).append("=").append(counters[i]).append(", ");
+    }
+    return b.substring(0, b.length() - 2);
+  }
+
+  /**
+   * A factory for creating counters.
+   * 
+   * @param <E> the enum type
+   * @param <C> the counter type
+   */
+  public static interface Factory<E extends Enum<E>,
+                                  C extends EnumCounters<E>> {
+    /** Create a new counters instance. */
+    public C newInstance(); 
+  }
+
+  /**
+   * A key-value map which maps the keys to {@link EnumCounters}.
+   * Note that null key is supported.
+   *
+   * @param <K> the key type
+   * @param <E> the enum type
+   * @param <C> the counter type
+   */
+  public static class Map<K, E extends Enum<E>, C extends EnumCounters<E>> {
+    /** The factory for creating counters. */
+    private final Factory<E, C> factory;
+    /** Key-to-Counts map. */
+    private final java.util.Map<K, C> counts = new HashMap<K, C>();
+    
+    /** Construct a map. */
+    public Map(final Factory<E, C> factory) {
+      this.factory = factory;
+    }
+
+    /** @return the counters for the given key. */
+    public final C getCounts(final K key) {
+      C c = counts.get(key);
+      if (c == null) {
+        c = factory.newInstance();
+        counts.put(key, c); 
+      }
+      return c;
+    }
+    
+    /** @return the sum of the values of all the counters. */
+    public final C sum() {
+      final C sum = factory.newInstance();
+      for(C c : counts.values()) {
+        sum.add(c);
+      }
+      return sum;
+    }
+    
+    /** @return the sum of the values of all the counters for e. */
+    public final long sum(final E e) {
+      long sum = 0;
+      for(C c : counts.values()) {
+        sum += c.get(e);
+      }
+      return sum;
+    }
+
+    @Override
+    public String toString() {
+      return counts.toString();
+    }
+  }
+}

+ 47 - 148
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -334,8 +334,7 @@ public class FSDirectory implements Closeable {
           INodeFileUnderConstruction.valueOf(inodesInPath.getLastINode(), path);
 
       // check quota limits and updated space consumed
-      updateCount(inodesInPath, 0,
-          fileINode.getPreferredBlockSize()*fileINode.getFileReplication(), true);
+      updateCount(inodesInPath, 0, 0, fileINode.getBlockDiskspace(), true);
 
       // associate new last block for the file
       BlockInfoUnderConstruction blockInfo =
@@ -426,8 +425,7 @@ public class FSDirectory implements Closeable {
 
     // update space consumed
     final INodesInPath iip = rootDir.getINodesInPath4Write(path, true);
-    updateCount(iip, 0,
-        -fileNode.getPreferredBlockSize()*fileNode.getFileReplication(), true);
+    updateCount(iip, 0, 0, -fileNode.getBlockDiskspace(), true);
   }
 
   /**
@@ -1457,21 +1455,16 @@ public class FSDirectory implements Closeable {
     try {
       updateCount(inodesInPath, numOfINodes, nsDelta, dsDelta, false);
     } catch (QuotaExceededException e) {
-      NameNode.LOG.warn("FSDirectory.updateCountNoQuotaCheck - unexpected ", e);
+      NameNode.LOG.error("BUG: unexpected exception ", e);
     }
   }
   
   /**
    * updates quota without verification
    * callers responsibility is to make sure quota is not exceeded
-   * @param inodes
-   * @param numOfINodes
-   * @param nsDelta
-   * @param dsDelta
    */
-  private void unprotectedUpdateCount(INodesInPath inodesInPath,
+  private static void unprotectedUpdateCount(INodesInPath inodesInPath,
       int numOfINodes, long nsDelta, long dsDelta) {
-    assert hasWriteLock();
     final INode[] inodes = inodesInPath.getINodes();
     for(int i=0; i < numOfINodes; i++) {
       if (inodes[i].isQuotaSet()) { // a directory with quota
@@ -1482,7 +1475,7 @@ public class FSDirectory implements Closeable {
   }
   
   /** Return the name of the path represented by inodes at [0, pos] */
-  private static String getFullPathName(INode[] inodes, int pos) {
+  static String getFullPathName(INode[] inodes, int pos) {
     StringBuilder fullPathName = new StringBuilder();
     if (inodes[0].isRoot()) {
       if (pos == 0) return Path.SEPARATOR;
@@ -1710,36 +1703,27 @@ public class FSDirectory implements Closeable {
    *          Pass null if a node is not being moved.
    * @throws QuotaExceededException if quota limit is exceeded.
    */
-  private void verifyQuota(INode[] inodes, int pos, long nsDelta, long dsDelta,
-      INode commonAncestor) throws QuotaExceededException {
-    if (!ready) {
-      // Do not check quota if edits log is still being processed
-      return;
-    }
+  private static void verifyQuota(INode[] inodes, int pos, long nsDelta,
+      long dsDelta, INode commonAncestor) throws QuotaExceededException {
     if (nsDelta <= 0 && dsDelta <= 0) {
       // if quota is being freed or not being consumed
       return;
     }
-    if (pos>inodes.length) {
-      pos = inodes.length;
-    }
-    int i = pos - 1;
-    try {
-      // check existing components in the path  
-      for(; i >= 0; i--) {
-        if (commonAncestor == inodes[i]) {
-          // Moving an existing node. Stop checking for quota when common
-          // ancestor is reached
-          return;
-        }
-        if (inodes[i].isQuotaSet()) { // a directory with quota
-          INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i]; 
-          node.verifyQuota(nsDelta, dsDelta);
+
+    // check existing components in the path
+    for(int i = (pos > inodes.length? inodes.length: pos) - 1; i >= 0; i--) {
+      if (commonAncestor == inodes[i]) {
+        // Stop checking for quota when common ancestor is reached
+        return;
+      }
+      if (inodes[i].isQuotaSet()) { // a directory with quota
+        try {
+          ((INodeDirectoryWithQuota)inodes[i]).verifyQuota(nsDelta, dsDelta);
+        } catch (QuotaExceededException e) {
+          e.setPathName(getFullPathName(inodes, i));
+          throw e;
         }
       }
-    } catch (QuotaExceededException e) {
-      e.setPathName(getFullPathName(inodes, i));
-      throw e;
     }
   }
   
@@ -1747,36 +1731,29 @@ public class FSDirectory implements Closeable {
    * Verify quota for rename operation where srcInodes[srcInodes.length-1] moves
    * dstInodes[dstInodes.length-1]
    * 
-   * @param srcInodes directory from where node is being moved.
-   * @param dstInodes directory to where node is moved to.
+   * @param src directory from where node is being moved.
+   * @param dst directory to where node is moved to.
    * @throws QuotaExceededException if quota limit is exceeded.
    */
-  private void verifyQuotaForRename(INode[] srcInodes, INode[]dstInodes)
+  private void verifyQuotaForRename(INode[] src, INode[] dst)
       throws QuotaExceededException {
     if (!ready) {
       // Do not check quota if edits log is still being processed
       return;
     }
-    INode srcInode = srcInodes[srcInodes.length - 1];
-    INode commonAncestor = null;
-    for(int i =0;srcInodes[i] == dstInodes[i]; i++) {
-      commonAncestor = srcInodes[i];
-    }
-    INode.DirCounts srcCounts = new INode.DirCounts();
-    srcInode.spaceConsumedInTree(srcCounts);
-    long nsDelta = srcCounts.getNsCount();
-    long dsDelta = srcCounts.getDsCount();
+    int i = 0;
+    for(; src[i] == dst[i]; i++);
+    // src[i - 1] is the last common ancestor.
+
+    final Quota.Counts delta = src[src.length - 1].computeQuotaUsage();
     
     // Reduce the required quota by dst that is being removed
-    INode dstInode = dstInodes[dstInodes.length - 1];
-    if (dstInode != null) {
-      INode.DirCounts dstCounts = new INode.DirCounts();
-      dstInode.spaceConsumedInTree(dstCounts);
-      nsDelta -= dstCounts.getNsCount();
-      dsDelta -= dstCounts.getDsCount();
+    final int dstIndex = dst.length - 1;
+    if (dst[dstIndex] != null) {
+      delta.subtract(dst[dstIndex].computeQuotaUsage());
     }
-    verifyQuota(dstInodes, dstInodes.length - 1, nsDelta, dsDelta,
-        commonAncestor);
+    verifyQuota(dst, dstIndex, delta.get(Quota.NAMESPACE),
+        delta.get(Quota.DISKSPACE), src[i - 1]);
   }
   
   /**
@@ -1844,16 +1821,14 @@ public class FSDirectory implements Closeable {
       verifyFsLimits(inodes, pos, child);
     }
     
-    INode.DirCounts counts = new INode.DirCounts();
-    child.spaceConsumedInTree(counts);
-    updateCount(inodesInPath, pos, counts.getNsCount(), counts.getDsCount(), checkQuota);
-    if (inodes[pos-1] == null) {
-      throw new NullPointerException("Panic: parent does not exist");
-    }
+    final Quota.Counts counts = child.computeQuotaUsage();
+    updateCount(inodesInPath, pos,
+        counts.get(Quota.NAMESPACE), counts.get(Quota.DISKSPACE), checkQuota);
     final boolean added = ((INodeDirectory)inodes[pos-1]).addChild(child, true,
         inodesInPath.getLatestSnapshot());
     if (!added) {
-      updateCount(inodesInPath, pos, -counts.getNsCount(), -counts.getDsCount(), true);
+      updateCountNoQuotaCheck(inodesInPath, pos,
+          -counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
     }
     return added;
   }
@@ -1881,10 +1856,9 @@ public class FSDirectory implements Closeable {
       Preconditions.checkState(removedNode == inodes[pos]);
 
       inodesInPath.setINode(pos - 1, removedNode.getParent());
-      INode.DirCounts counts = new INode.DirCounts();
-      removedNode.spaceConsumedInTree(counts);
+      final Quota.Counts counts = removedNode.computeQuotaUsage();
       updateCountNoQuotaCheck(inodesInPath, pos,
-                  -counts.getNsCount(), -counts.getDsCount());
+          -counts.get(Quota.NAMESPACE), -counts.get(Quota.DISKSPACE));
     }
     return removedNode;
   }
@@ -1914,84 +1888,6 @@ public class FSDirectory implements Closeable {
       readUnlock();
     }
   }
-
-  /** Update the count of each directory with quota in the namespace
-   * A directory's count is defined as the total number inodes in the tree
-   * rooted at the directory.
-   * 
-   * This is an update of existing state of the filesystem and does not
-   * throw QuotaExceededException.
-   */
-  void updateCountForINodeWithQuota() {
-    updateCountForINodeWithQuota(rootDir, new INode.DirCounts(), 
-                                 new ArrayList<INode>(50));
-  }
-  
-  /** 
-   * Update the count of the directory if it has a quota and return the count
-   * 
-   * This does not throw a QuotaExceededException. This is just an update
-   * of of existing state and throwing QuotaExceededException does not help
-   * with fixing the state, if there is a problem.
-   * 
-   * @param dir the root of the tree that represents the directory
-   * @param counters counters for name space and disk space
-   * @param nodesInPath INodes for the each of components in the path.
-   */
-  private static void updateCountForINodeWithQuota(INodeDirectory dir, 
-                                               INode.DirCounts counts,
-                                               ArrayList<INode> nodesInPath) {
-    long parentNamespace = counts.nsCount;
-    long parentDiskspace = counts.dsCount;
-    
-    counts.nsCount = 1L;//for self. should not call node.spaceConsumedInTree()
-    counts.dsCount = 0L;
-    
-    /* We don't need nodesInPath if we could use 'parent' field in 
-     * INode. using 'parent' is not currently recommended. */
-    nodesInPath.add(dir);
-
-    for (INode child : dir.getChildrenList(null)) {
-      if (child.isDirectory()) {
-        updateCountForINodeWithQuota((INodeDirectory)child, 
-                                     counts, nodesInPath);
-      } else if (child.isSymlink()) {
-        counts.nsCount += 1;
-      } else { // reduce recursive calls
-        counts.nsCount += 1;
-        counts.dsCount += ((INodeFile)child).diskspaceConsumed();
-      }
-    }
-      
-    if (dir.isQuotaSet()) {
-      ((INodeDirectoryWithQuota)dir).setSpaceConsumed(counts.nsCount,
-                                                      counts.dsCount);
-
-      // check if quota is violated for some reason.
-      if ((dir.getNsQuota() >= 0 && counts.nsCount > dir.getNsQuota()) ||
-          (dir.getDsQuota() >= 0 && counts.dsCount > dir.getDsQuota())) {
-
-        // can only happen because of a software bug. the bug should be fixed.
-        StringBuilder path = new StringBuilder(512);
-        for (INode n : nodesInPath) {
-          path.append('/');
-          path.append(n.getLocalName());
-        }
-        
-        NameNode.LOG.warn("Quota violation in image for " + path + 
-                          " (Namespace quota : " + dir.getNsQuota() +
-                          " consumed : " + counts.nsCount + ")" +
-                          " (Diskspace quota : " + dir.getDsQuota() +
-                          " consumed : " + counts.dsCount + ").");
-      }            
-    }
-      
-    // pop 
-    nodesInPath.remove(nodesInPath.size()-1);
-    
-    counts.nsCount += parentNamespace;
-    counts.dsCount += parentDiskspace;
-  }
   
   /**
    * See {@link ClientProtocol#setQuota(String, long, long)} for the contract.
@@ -2169,7 +2065,7 @@ public class FSDirectory implements Closeable {
      long blocksize = 0;
      if (node instanceof INodeFile) {
        INodeFile fileNode = (INodeFile)node;
-       size = fileNode.computeFileSize(true, snapshot);
+       size = fileNode.computeFileSize(snapshot);
        replication = fileNode.getFileReplication(snapshot);
        blocksize = fileNode.getPreferredBlockSize();
      }
@@ -2200,12 +2096,15 @@ public class FSDirectory implements Closeable {
       LocatedBlocks loc = null;
       if (node instanceof INodeFile) {
         INodeFile fileNode = (INodeFile)node;
-        size = fileNode.computeFileSize(true, snapshot);
+        size = fileNode.computeFileSize(snapshot);
         replication = fileNode.getFileReplication(snapshot);
         blocksize = fileNode.getPreferredBlockSize();
+
+        final boolean isUc = fileNode.isUnderConstruction();
+        final long fileSize = snapshot == null && isUc?
+            fileNode.computeFileSizeNotIncludingLastUcBlock(): size;
         loc = getFSNamesystem().getBlockManager().createLocatedBlocks(
-            fileNode.getBlocks(), fileNode.computeFileSize(false, snapshot),
-            fileNode.isUnderConstruction(), 0L, size, false);
+            fileNode.getBlocks(), fileSize, isUc, 0L, size, false);
         if (loc==null) {
           loc = new LocatedBlocks();
         }

+ 69 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java

@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Stack;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -714,11 +715,78 @@ public class FSImage implements Closeable {
     } finally {
       FSEditLog.closeAllStreams(editStreams);
       // update the counts
-      target.dir.updateCountForINodeWithQuota();   
+      updateCountForQuota(target.dir.rootDir);   
     }
     return lastAppliedTxId - prevLastAppliedTxId;
   }
 
+  /** Update the count of each directory with quota in the namespace
+   * A directory's count is defined as the total number inodes in the tree
+   * rooted at the directory.
+   * 
+   * This is an update of existing state of the filesystem and does not
+   * throw QuotaExceededException.
+   */
+  static void updateCountForQuota(INodeDirectoryWithQuota root) {
+    updateCountForINodeWithQuota(root, new Quota.Counts(), new Stack<INode>());
+  }
+  
+  /** 
+   * Update the count of the directory if it has a quota and return the count
+   * 
+   * This does not throw a QuotaExceededException. This is just an update
+   * of of existing state and throwing QuotaExceededException does not help
+   * with fixing the state, if there is a problem.
+   * 
+   * @param dir the root of the tree that represents the directory
+   * @param counters counters for name space and disk space
+   * @param stack INodes for the each of components in the path.
+   */
+  private static void updateCountForINodeWithQuota(INodeDirectory dir,
+      Quota.Counts counts, Stack<INode> stack) {
+    // The stack is not needed since we could use the 'parent' field in INode.
+    // However, using 'parent' is not recommended.
+    stack.push(dir);
+
+    final long parentNamespace = counts.get(Quota.NAMESPACE);
+    final long parentDiskspace = counts.get(Quota.DISKSPACE);
+    
+    counts.add(Quota.NAMESPACE, 1);
+    for (INode child : dir.getChildrenList(null)) {
+      if (child.isDirectory()) {
+        updateCountForINodeWithQuota((INodeDirectory)child, counts, stack);
+      } else {
+        // file or symlink: count here to reduce recursive calls.
+        counts.add(Quota.NAMESPACE, 1);
+        if (child.isFile()) {
+          counts.add(Quota.DISKSPACE, ((INodeFile)child).diskspaceConsumed());
+        }
+      }
+    }
+      
+    if (dir.isQuotaSet()) {
+      // check if quota is violated. It indicates a software bug.
+      final long namespace = counts.get(Quota.NAMESPACE) - parentNamespace;
+      if (Quota.isViolated(dir.getNsQuota(), namespace)) {
+        final INode[] inodes = stack.toArray(new INode[stack.size()]);
+        LOG.error("BUG: Namespace quota violation in image for "
+            + FSDirectory.getFullPathName(inodes, inodes.length)
+            + " quota = " + dir.getNsQuota() + " < consumed = " + namespace);
+      }
+
+      final long diskspace = counts.get(Quota.DISKSPACE) - parentDiskspace;
+      if (Quota.isViolated(dir.getDsQuota(), diskspace)) {
+        final INode[] inodes = stack.toArray(new INode[stack.size()]);
+        LOG.error("BUG: Diskspace quota violation in image for "
+            + FSDirectory.getFullPathName(inodes, inodes.length)
+            + " quota = " + dir.getDsQuota() + " < consumed = " + diskspace);
+      }
+
+      ((INodeDirectoryWithQuota)dir).setSpaceConsumed(namespace, diskspace);
+    }
+      
+    stack.pop();
+  }
 
   /**
    * Load the image namespace from the given image file, verifying

+ 6 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -1396,8 +1396,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
           }
           dir.setTimes(src, inode, -1, now, false, iip.getLatestSnapshot());
         }
-        return blockManager.createLocatedBlocks(inode.getBlocks(),
-            inode.computeFileSize(false, iip.getPathSnapshot()),
+        final long fileSize = iip.getPathSnapshot() != null?
+            inode.computeFileSize(iip.getPathSnapshot())
+            : inode.computeFileSizeNotIncludingLastUcBlock();
+        return blockManager.createLocatedBlocks(inode.getBlocks(), fileSize,
             inode.isUnderConstruction(), offset, length, needBlockToken);
       } finally {
         if (attempt == 0) {
@@ -2302,7 +2304,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       saveAllocatedBlock(src, inodesInPath, newBlock, targets);
 
       dir.persistBlocks(src, pendingFile);
-      offset = pendingFile.computeFileSize(true);
+      offset = pendingFile.computeFileSize();
     } finally {
       writeUnlock();
     }
@@ -2390,7 +2392,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         NameNode.stateChangeLog.info("BLOCK* allocateBlock: " +
             "caught retry for allocation of a new block in " +
             src + ". Returning previously allocated block " + lastBlockInFile);
-        long offset = pendingFile.computeFileSize(true);
+        long offset = pendingFile.computeFileSize();
         onRetryBlock[0] = makeLocatedBlock(lastBlockInFile,
             ((BlockInfoUnderConstruction)lastBlockInFile).getExpectedLocations(),
             offset);

+ 84 - 28
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.namenode.INode.Content.CountsMap.Key;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.diff.Diff;
 import org.apache.hadoop.util.StringUtils;
@@ -48,23 +49,6 @@ import com.google.common.primitives.SignedBytes;
 public abstract class INode implements Diff.Element<byte[]> {
   public static final Log LOG = LogFactory.getLog(INode.class);
 
-  /** Wrapper of two counters for namespace consumed and diskspace consumed. */
-  static class DirCounts {
-    /** namespace count */
-    long nsCount = 0;
-    /** diskspace count */
-    long dsCount = 0;
-    
-    /** returns namespace count */
-    long getNsCount() {
-      return nsCount;
-    }
-    /** returns diskspace count */
-    long getDsCount() {
-      return dsCount;
-    }
-  }
-  
   private static enum PermissionStatusFormat {
     MODE(0, 16),
     GROUP(MODE.OFFSET + MODE.LENGTH, 25),
@@ -310,17 +294,81 @@ public abstract class INode implements Diff.Element<byte[]> {
   public abstract int destroySubtreeAndCollectBlocks(Snapshot snapshot,
       BlocksMapUpdateInfo collectedBlocks);
 
+  /**
+   * The content types such as file, directory and symlink to be computed
+   * in {@link INode#computeContentSummary(CountsMap)}.
+   */
+  public enum Content {
+    /** The number of files. */
+    FILE,
+    /** The number of directories. */
+    DIRECTORY,
+    /** The number of symlinks. */
+    SYMLINK,
+
+    /** The total of file length in bytes. */
+    LENGTH,
+    /** The total of disk space usage in bytes including replication. */
+    DISKSPACE,
+
+    /** The number of snapshots. */
+    SNAPSHOT,
+    /** The number of snapshottable directories. */
+    SNAPSHOTTABLE_DIRECTORY;
+
+    /** Content counts. */
+    public static class Counts extends EnumCounters<Content> {
+      private Counts() {
+        super(Content.values());
+      }
+    }
+
+    private static final EnumCounters.Factory<Content, Counts> FACTORY
+        = new EnumCounters.Factory<Content, Counts>() {
+      @Override
+      public Counts newInstance() {
+        return new Counts();
+      }
+    };
+
+    /** A map of counters for the current state and the snapshots. */
+    public static class CountsMap
+        extends EnumCounters.Map<CountsMap.Key, Content, Counts> {
+      /** The key type of the map. */
+      public static enum Key { CURRENT, SNAPSHOT }
+
+      private CountsMap() {
+        super(FACTORY);
+      }
+    }
+  }
+
   /** Compute {@link ContentSummary}. */
   public final ContentSummary computeContentSummary() {
-    long[] a = computeContentSummary(new long[]{0,0,0,0});
-    return new ContentSummary(a[0], a[1], a[2], getNsQuota(), 
-                              a[3], getDsQuota());
+    final Content.Counts current = computeContentSummary(
+        new Content.CountsMap()).getCounts(Key.CURRENT);
+    return new ContentSummary(current.get(Content.LENGTH),
+        current.get(Content.FILE) + current.get(Content.SYMLINK),
+        current.get(Content.DIRECTORY), getNsQuota(),
+        current.get(Content.DISKSPACE), getDsQuota());
   }
+
   /**
-   * @return an array of three longs. 
-   * 0: length, 1: file count, 2: directory count 3: disk space
+   * Count subtree content summary with a {@link Content.CountsMap}.
+   *
+   * @param countsMap The subtree counts for returning.
+   * @return The same objects as the counts parameter.
    */
-  abstract long[] computeContentSummary(long[] summary);
+  public abstract Content.CountsMap computeContentSummary(
+      Content.CountsMap countsMap);
+
+  /**
+   * Count subtree content summary with a {@link Content.Counts}.
+   *
+   * @param counts The subtree counts for returning.
+   * @return The same objects as the counts parameter.
+   */
+  public abstract Content.Counts computeContentSummary(Content.Counts counts);
   
   /**
    * Get the quota set for this inode
@@ -334,16 +382,24 @@ public abstract class INode implements Diff.Element<byte[]> {
     return -1;
   }
   
-  boolean isQuotaSet() {
+  final boolean isQuotaSet() {
     return getNsQuota() >= 0 || getDsQuota() >= 0;
   }
   
   /**
-   * Adds total number of names and total disk space taken under 
-   * this tree to counts.
-   * Returns updated counts object.
+   * Count subtree {@link Quota#NAMESPACE} and {@link Quota#DISKSPACE} usages.
+   */
+  final Quota.Counts computeQuotaUsage() {
+    return computeQuotaUsage(new Quota.Counts());
+  }
+
+  /**
+   * Count subtree {@link Quota#NAMESPACE} and {@link Quota#DISKSPACE} usages.
+   * 
+   * @param counts The subtree counts for returning.
+   * @return The same objects as the counts parameter.
    */
-  abstract DirCounts spaceConsumedInTree(DirCounts counts);
+  abstract Quota.Counts computeQuotaUsage(Quota.Counts counts);
   
   /**
    * @return null if the local name is null; otherwise, return the local name.

+ 17 - 28
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
+import org.apache.hadoop.hdfs.server.namenode.INode.Content.CountsMap.Key;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectoryWithSnapshot;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeFileUnderConstructionWithSnapshot;
@@ -511,45 +512,33 @@ public class INodeDirectory extends INode {
   }
 
   @Override
-  DirCounts spaceConsumedInTree(DirCounts counts) {
-    counts.nsCount += 1;
+  Quota.Counts computeQuotaUsage(Quota.Counts counts) {
     if (children != null) {
       for (INode child : children) {
-        child.spaceConsumedInTree(counts);
+        child.computeQuotaUsage(counts);
       }
     }
+    counts.add(Quota.NAMESPACE, 1);
     return counts;    
   }
 
   @Override
-  long[] computeContentSummary(long[] summary) {
-    // Walk through the children of this node, using a new summary array
-    // for the (sub)tree rooted at this node
-    assert 4 == summary.length;
-    long[] subtreeSummary = new long[]{0,0,0,0};
-    if (children != null) {
-      for (INode child : children) {
-        child.computeContentSummary(subtreeSummary);
-      }
-    }
-    if (this instanceof INodeDirectoryWithQuota) {
-      // Warn if the cached and computed diskspace values differ
-      INodeDirectoryWithQuota node = (INodeDirectoryWithQuota)this;
-      long space = node.diskspaceConsumed();
-      assert -1 == node.getDsQuota() || space == subtreeSummary[3];
-      if (-1 != node.getDsQuota() && space != subtreeSummary[3]) {
-        NameNode.LOG.warn("Inconsistent diskspace for directory "
-            +getLocalName()+". Cached: "+space+" Computed: "+subtreeSummary[3]);
-      }
+  public Content.Counts computeContentSummary(final Content.Counts counts) {
+    for (INode child : getChildrenList(null)) {
+      child.computeContentSummary(counts);
     }
+    counts.add(Content.DIRECTORY, 1);
+    return counts;
+  }
 
-    // update the passed summary array with the values for this node's subtree
-    for (int i = 0; i < summary.length; i++) {
-      summary[i] += subtreeSummary[i];
+  @Override
+  public Content.CountsMap computeContentSummary(
+      final Content.CountsMap countsMap) {
+    for (INode child : getChildrenList(null)) {
+      child.computeContentSummary(countsMap);
     }
-
-    summary[2]++;
-    return summary;
+    countsMap.getCounts(Key.CURRENT).add(Content.DIRECTORY, 1);
+    return countsMap;
   }
 
   /**

+ 43 - 21
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java

@@ -31,7 +31,7 @@ public class INodeDirectoryWithQuota extends INodeDirectory {
   /** Name space quota */
   private long nsQuota = Long.MAX_VALUE;
   /** Name space count */
-  private long nsCount = 1L;
+  private long namespace = 1L;
   /** Disk space quota */
   private long dsQuota = HdfsConstants.QUOTA_RESET;
   /** Disk space count */
@@ -46,10 +46,9 @@ public class INodeDirectoryWithQuota extends INodeDirectory {
   public INodeDirectoryWithQuota(INodeDirectory other, boolean adopt,
       long nsQuota, long dsQuota) {
     super(other, adopt);
-    INode.DirCounts counts = new INode.DirCounts();
-    other.spaceConsumedInTree(counts);
-    this.nsCount = counts.getNsCount();
-    this.diskspace = counts.getDsCount();
+    final Quota.Counts counts = other.computeQuotaUsage();
+    this.namespace = counts.get(Quota.NAMESPACE);
+    this.diskspace = counts.get(Quota.DISKSPACE);
     this.nsQuota = nsQuota;
     this.dsQuota = dsQuota;
   }
@@ -95,19 +94,45 @@ public class INodeDirectoryWithQuota extends INodeDirectory {
     nodeToUpdate.dsQuota = dsQuota;
   }
   
-  
   @Override
-  DirCounts spaceConsumedInTree(DirCounts counts) {
-    counts.nsCount += nsCount;
-    counts.dsCount += diskspace;
+  public final Quota.Counts computeQuotaUsage(Quota.Counts counts) {
+    // use cache value
+    counts.add(Quota.NAMESPACE, namespace);
+    counts.add(Quota.DISKSPACE, diskspace);
+    return counts;
+  }
+
+  @Override
+  public Content.CountsMap computeContentSummary(
+      final Content.CountsMap countsMap) {
+    final long original = countsMap.sum(Content.DISKSPACE);
+    super.computeContentSummary(countsMap);
+    checkDiskspace(countsMap.sum(Content.DISKSPACE) - original);
+    return countsMap;
+  }
+
+  @Override
+  public Content.Counts computeContentSummary(
+      final Content.Counts counts) {
+    final long original = counts.get(Content.DISKSPACE);
+    super.computeContentSummary(counts);
+    checkDiskspace(counts.get(Content.DISKSPACE) - original);
     return counts;
   }
+  
+  private void checkDiskspace(final long computed) {
+    if (-1 != getDsQuota() && diskspaceConsumed() != computed) {
+      NameNode.LOG.error("BUG: Inconsistent diskspace for directory "
+          + getFullPathName() + ". Cached = " + diskspaceConsumed()
+          + " != Computed = " + computed);
+    }
+  }
 
   /** Get the number of names in the subtree rooted at this directory
    * @return the size of the subtree rooted at this directory
    */
   long numItemsInTree() {
-    return nsCount;
+    return namespace;
   }
   
   long diskspaceConsumed() {
@@ -120,7 +145,8 @@ public class INodeDirectoryWithQuota extends INodeDirectory {
    * @param dsDelta change to disk space occupied
    */
   void addSpaceConsumed(long nsDelta, long dsDelta) {
-    setSpaceConsumed(nsCount + nsDelta, diskspace + dsDelta);
+    namespace += nsDelta;
+    diskspace += dsDelta;
   }
   
   /** 
@@ -132,7 +158,7 @@ public class INodeDirectoryWithQuota extends INodeDirectory {
    * @param diskspace disk space take by all the nodes under this directory
    */
   void setSpaceConsumed(long namespace, long diskspace) {
-    this.nsCount = namespace;
+    this.namespace = namespace;
     this.diskspace = diskspace;
   }
   
@@ -140,15 +166,11 @@ public class INodeDirectoryWithQuota extends INodeDirectory {
    * @throws QuotaExceededException if the given quota is less than the count
    */
   void verifyQuota(long nsDelta, long dsDelta) throws QuotaExceededException {
-    long newCount = nsCount + nsDelta;
-    long newDiskspace = diskspace + dsDelta;
-    if (nsDelta>0 || dsDelta>0) {
-      if (nsQuota >= 0 && nsQuota < newCount) {
-        throw new NSQuotaExceededException(nsQuota, newCount);
-      }
-      if (dsQuota >= 0 && dsQuota < newDiskspace) {
-        throw new DSQuotaExceededException(dsQuota, newDiskspace);
-      }
+    if (Quota.isViolated(nsQuota, namespace, nsDelta)) {
+      throw new NSQuotaExceededException(nsQuota, namespace + nsDelta);
+    }
+    if (Quota.isViolated(dsQuota, diskspace, dsDelta)) {
+      throw new DSQuotaExceededException(dsQuota, diskspace + dsDelta);
     }
   }
 }

+ 117 - 50
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java

@@ -25,11 +25,15 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.namenode.INode.Content.CountsMap.Key;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.FileDiff;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.FileDiffList;
+import org.apache.hadoop.hdfs.server.namenode.snapshot.FileWithSnapshot.Util;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -160,8 +164,10 @@ public class INodeFile extends INode implements BlockCollection {
   }
 
   @Override
-  public short getBlockReplication() {
-    return getFileReplication(null);
+  public final short getBlockReplication() {
+    return this instanceof FileWithSnapshot?
+        Util.getBlockReplication((FileWithSnapshot)this)
+        : getFileReplication(null);
   }
 
   public void setFileReplication(short replication, Snapshot latest) {
@@ -179,6 +185,11 @@ public class INodeFile extends INode implements BlockCollection {
     return HeaderFormat.getPreferredBlockSize(header);
   }
 
+  /** @return the diskspace required for a full block. */
+  final long getBlockDiskspace() {
+    return getPreferredBlockSize() * getBlockReplication();
+  }
+
   /** @return the blocks of the file. */
   @Override
   public BlockInfo[] getBlocks() {
@@ -259,69 +270,125 @@ public class INodeFile extends INode implements BlockCollection {
     return getFullPathName();
   }
 
+  @Override
+  Quota.Counts computeQuotaUsage(Quota.Counts counts) {
+    counts.add(Quota.NAMESPACE, this instanceof FileWithSnapshot?
+        ((FileWithSnapshot)this).getDiffs().asList().size() + 1: 1);
+    counts.add(Quota.DISKSPACE, diskspaceConsumed());
+    return counts;
+  }
+
+  @Override
+  public final Content.CountsMap computeContentSummary(
+      final Content.CountsMap countsMap) {
+    computeContentSummary4Snapshot(countsMap.getCounts(Key.SNAPSHOT));
+    computeContentSummary4Current(countsMap.getCounts(Key.CURRENT));
+    return countsMap;
+  }
 
   @Override
-  long[] computeContentSummary(long[] summary) {
-    summary[0] += computeFileSize(true, null);
-    summary[1]++;
-    summary[3] += diskspaceConsumed();
-    return summary;
+  public final Content.Counts computeContentSummary(
+      final Content.Counts counts) {
+    computeContentSummary4Snapshot(counts);
+    computeContentSummary4Current(counts);
+    return counts;
+  }
+
+  private void computeContentSummary4Snapshot(final Content.Counts counts) {
+    // file length and diskspace only counted for the latest state of the file
+    // i.e. either the current state or the last snapshot
+    if (this instanceof FileWithSnapshot) {
+      final FileWithSnapshot withSnapshot = (FileWithSnapshot)this;
+      final FileDiffList diffs = withSnapshot.getDiffs();
+      final int n = diffs.asList().size();
+      counts.add(Content.FILE, n);
+      if (n > 0 && withSnapshot.isCurrentFileDeleted()) {
+        counts.add(Content.LENGTH, diffs.getLast().getFileSize());
+      }
+
+      if (withSnapshot.isCurrentFileDeleted()) {
+        final long lastFileSize = diffs.getLast().getFileSize();
+        counts.add(Content.DISKSPACE, lastFileSize * getBlockReplication());
+      }
+    }
+  }
+
+  private void computeContentSummary4Current(final Content.Counts counts) {
+    if (this instanceof FileWithSnapshot
+        && ((FileWithSnapshot)this).isCurrentFileDeleted()) {
+      return;
+    }
+
+    counts.add(Content.LENGTH, computeFileSize());
+    counts.add(Content.FILE, 1);
+    counts.add(Content.DISKSPACE, diskspaceConsumed());
+  }
+
+  /** The same as computeFileSize(null). */
+  public final long computeFileSize() {
+    return computeFileSize(null);
   }
 
-  /** The same as computeFileSize(includesBlockInfoUnderConstruction, null). */
-  public long computeFileSize(boolean includesBlockInfoUnderConstruction) {
-    return computeFileSize(includesBlockInfoUnderConstruction, null);
+  /**
+   * Compute file size of the current file if the given snapshot is null;
+   * otherwise, get the file size from the given snapshot.
+   */
+  public final long computeFileSize(Snapshot snapshot) {
+    if (snapshot != null && this instanceof FileWithSnapshot) {
+      final FileDiff d = ((FileWithSnapshot)this).getDiffs().getDiff(snapshot);
+      if (d != null) {
+        return d.getFileSize();
+      }
+    }
+
+    return computeFileSize(true, false);
+  }
+
+  /**
+   * Compute file size of the current file size
+   * but not including the last block if it is under construction.
+   */
+  public final long computeFileSizeNotIncludingLastUcBlock() {
+    return computeFileSize(false, false);
   }
 
-  /** Compute file size.
-   * May or may not include BlockInfoUnderConstruction.
+  /**
+   * Compute file size of the current file.
+   * 
+   * @param includesLastUcBlock
+   *          If the last block is under construction, should it be included?
+   * @param usePreferredBlockSize4LastUcBlock
+   *          If the last block is under construction, should we use actual
+   *          block size or preferred block size?
+   *          Note that usePreferredBlockSize4LastUcBlock is ignored
+   *          if includesLastUcBlock == false.
+   * @return file size
    */
-  public long computeFileSize(boolean includesBlockInfoUnderConstruction,
-      Snapshot snapshot) {
+  private final long computeFileSize(boolean includesLastUcBlock,
+      boolean usePreferredBlockSize4LastUcBlock) {
     if (blocks == null || blocks.length == 0) {
       return 0;
     }
     final int last = blocks.length - 1;
     //check if the last block is BlockInfoUnderConstruction
-    long bytes = blocks[last] instanceof BlockInfoUnderConstruction
-                 && !includesBlockInfoUnderConstruction?
-                     0: blocks[last].getNumBytes();
+    long size = blocks[last].getNumBytes();
+    if (blocks[last] instanceof BlockInfoUnderConstruction) {
+       if (!includesLastUcBlock) {
+         size = 0;
+       } else if (usePreferredBlockSize4LastUcBlock) {
+         size = getPreferredBlockSize();
+       }
+    }
+    //sum other blocks
     for(int i = 0; i < last; i++) {
-      bytes += blocks[i].getNumBytes();
+      size += blocks[i].getNumBytes();
     }
-    return bytes;
-  }
-  
-
-  @Override
-  DirCounts spaceConsumedInTree(DirCounts counts) {
-    counts.nsCount += 1;
-    counts.dsCount += diskspaceConsumed();
-    return counts;
+    return size;
   }
 
   long diskspaceConsumed() {
-    return diskspaceConsumed(blocks);
-  }
-  
-  private long diskspaceConsumed(Block[] blkArr) {
-    long size = 0;
-    if(blkArr == null) 
-      return 0;
-    
-    for (Block blk : blkArr) {
-      if (blk != null) {
-        size += blk.getNumBytes();
-      }
-    }
-    /* If the last block is being written to, use prefferedBlockSize
-     * rather than the actual block size.
-     */
-    if (blkArr.length > 0 && blkArr[blkArr.length-1] != null && 
-        isUnderConstruction()) {
-      size += getPreferredBlockSize() - blkArr[blkArr.length-1].getNumBytes();
-    }
-    return size * getFileReplication();
+    // use preferred block size for the last block if it is under construction
+    return computeFileSize(true, true) * getBlockReplication();
   }
   
   /**
@@ -349,7 +416,7 @@ public class INodeFile extends INode implements BlockCollection {
   public void dumpTreeRecursively(PrintWriter out, StringBuilder prefix,
       final Snapshot snapshot) {
     super.dumpTreeRecursively(out, prefix, snapshot);
-    out.print(", fileSize=" + computeFileSize(true, snapshot));
+    out.print(", fileSize=" + computeFileSize(snapshot));
     // only compare the first block
     out.print(", blocks=");
     out.print(blocks == null || blocks.length == 0? null: blocks[0]);

+ 17 - 9
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java

@@ -22,6 +22,7 @@ import java.io.PrintWriter;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.server.namenode.INode.Content.CountsMap.Key;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
 
 /**
@@ -62,12 +63,6 @@ public class INodeSymlink extends INode {
   public byte[] getSymlink() {
     return symlink;
   }
-
-  @Override
-  DirCounts spaceConsumedInTree(DirCounts counts) {
-    counts.nsCount += 1;
-    return counts;
-  }
   
   @Override
   public int destroySubtreeAndCollectBlocks(final Snapshot snapshot,
@@ -76,9 +71,22 @@ public class INodeSymlink extends INode {
   }
 
   @Override
-  long[] computeContentSummary(long[] summary) {
-    summary[1]++; // Increment the file count
-    return summary;
+  Quota.Counts computeQuotaUsage(final Quota.Counts counts) {
+    counts.add(Quota.NAMESPACE, 1);
+    return counts;
+  }
+
+  @Override
+  public Content.CountsMap computeContentSummary(
+      final Content.CountsMap countsMap) {
+    computeContentSummary(countsMap.getCounts(Key.CURRENT));
+    return countsMap;
+  }
+
+  @Override
+  public Content.Counts computeContentSummary(final Content.Counts counts) {
+    counts.add(Content.SYMLINK, 1);
+    return counts;
   }
 
   @Override

+ 58 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Quota.java

@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+/** Quota types. */
+public enum Quota {
+  /** The namespace usage, i.e. the number of name objects. */
+  NAMESPACE,
+  /** The diskspace usage in bytes including replication. */
+  DISKSPACE;
+
+  /** Counters for quota counts. */
+  public static class Counts extends EnumCounters<Quota> {
+    /** @return a new counter with the given namespace and diskspace usages. */
+    static Counts newInstance(long namespace, long diskspace) {
+      final Counts c = new Counts();
+      c.set(NAMESPACE, namespace);
+      c.set(DISKSPACE, diskspace);
+      return c;
+    }
+
+    Counts() {
+      super(Quota.values());
+    }
+  }
+
+  /**
+   * Is quota violated?
+   * The quota is violated if quota is set and usage > quota. 
+   */
+  static boolean isViolated(final long quota, final long usage) {
+    return quota >= 0 && usage > quota;
+  }
+
+  /**
+   * Is quota violated?
+   * The quota is violated if quota is set, delta > 0 and usage + delta > quota.
+   */
+  static boolean isViolated(final long quota, final long usage,
+      final long delta) {
+    return quota >= 0 && delta > 0 && usage > quota - delta;
+  }
+}

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiff.java

@@ -84,7 +84,7 @@ abstract class AbstractINodeDiff<N extends INode,
   }
 
   /** @return the snapshot object of this diff. */
-  final Snapshot getSnapshot() {
+  public final Snapshot getSnapshot() {
     return snapshot;
   }
 

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/AbstractINodeDiffList.java

@@ -44,7 +44,7 @@ abstract class AbstractINodeDiffList<N extends INode,
   }
 
   /** @return this list as a unmodifiable {@link List}. */
-  final List<D> asList() {
+  public final List<D> asList() {
     return Collections.unmodifiableList(diffs);
   }
 
@@ -111,7 +111,7 @@ abstract class AbstractINodeDiffList<N extends INode,
   }
 
   /** @return the last diff. */
-  final D getLast() {
+  public final D getLast() {
     final int n = diffs.size();
     return n == 0? null: diffs.get(n - 1);
   }
@@ -127,7 +127,7 @@ abstract class AbstractINodeDiffList<N extends INode,
    *         When the diff is null, it means that the current state and
    *         the corresponding snapshot state are the same. 
    */
-  final D getDiff(Snapshot snapshot) {
+  public final D getDiff(Snapshot snapshot) {
     if (snapshot == null) {
       // snapshot == null means the current state, therefore, return null.
       return null;
@@ -195,7 +195,7 @@ abstract class AbstractINodeDiffList<N extends INode,
   }
 
   /** Save the snapshot copy to the latest snapshot. */
-  void saveSelf2Snapshot(Snapshot latest, N currentINode, N snapshotCopy) {
+  public void saveSelf2Snapshot(Snapshot latest, N currentINode, N snapshotCopy) {
     if (latest != null) {
       checkAndAddLatestSnapshotDiff(latest, currentINode).saveSnapshotCopy(
           snapshotCopy, factory, currentINode);

+ 12 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshot.java

@@ -36,13 +36,13 @@ public interface FileWithSnapshot {
   /**
    * The difference of an {@link INodeFile} between two snapshots.
    */
-  static class FileDiff extends AbstractINodeDiff<INodeFile, FileDiff> {
+  public static class FileDiff extends AbstractINodeDiff<INodeFile, FileDiff> {
     /** The file size at snapshot creation time. */
-    final long fileSize;
+    private final long fileSize;
 
     FileDiff(Snapshot snapshot, INodeFile file) {
       super(snapshot, null, null);
-      fileSize = file.computeFileSize(true, null);
+      fileSize = file.computeFileSize();
     }
 
     /** Constructor used by FSImage loading */
@@ -52,6 +52,11 @@ public interface FileWithSnapshot {
       this.fileSize = fileSize;
     }
 
+    /** @return the file size in the snapshot. */
+    public long getFileSize() {
+      return fileSize;
+    }
+
     @Override
     void combinePosteriorAndCollectBlocks(INodeFile currentINode,
         FileDiff posterior, BlocksMapUpdateInfo collectedBlocks) {
@@ -113,15 +118,15 @@ public interface FileWithSnapshot {
   public boolean isCurrentFileDeleted();
 
   /** Utility methods for the classes which implement the interface. */
-  static class Util {
+  public static class Util {
     /** 
      * @return block replication, which is the max file replication among
      *         the file and the diff list.
      */
-    static short getBlockReplication(final FileWithSnapshot file) {
+    public static short getBlockReplication(final FileWithSnapshot file) {
       short max = file.isCurrentFileDeleted()? 0
           : file.asINodeFile().getFileReplication();
-      for(FileDiff d : file.getDiffs().asList()) {
+      for(FileDiff d : file.getDiffs()) {
         if (d.snapshotINode != null) {
           final short replication = d.snapshotINode.getFileReplication();
           if (replication > max) {
@@ -151,7 +156,7 @@ public interface FileWithSnapshot {
         final FileDiff last = file.getDiffs().getLast();
         max = last == null? 0: last.fileSize;
       } else { 
-        max = file.asINodeFile().computeFileSize(true, null);
+        max = file.asINodeFile().computeFileSize();
       }
 
       collectBlocksBeyondMax(file, max, info);

+ 19 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
 import org.apache.hadoop.hdfs.server.namenode.INode;
+import org.apache.hadoop.hdfs.server.namenode.INode.Content.CountsMap.Key;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.util.ReadOnlyList;
@@ -313,6 +314,24 @@ public class INodeDirectorySnapshottable extends INodeDirectoryWithSnapshot {
     }
   }
   
+  @Override
+  public Content.Counts computeContentSummary(final Content.Counts counts) {
+    super.computeContentSummary(counts);
+    counts.add(Content.SNAPSHOT, snapshotsByNames.size());
+    counts.add(Content.SNAPSHOTTABLE_DIRECTORY, 1);
+    return counts;
+  }
+
+  @Override
+  public Content.CountsMap computeContentSummary(
+      final Content.CountsMap countsMap) {
+    super.computeContentSummary(countsMap);
+    countsMap.getCounts(Key.SNAPSHOT).add(Content.SNAPSHOT,
+        snapshotsByNames.size());
+    countsMap.getCounts(Key.CURRENT).add(Content.SNAPSHOTTABLE_DIRECTORY, 1);
+    return countsMap;
+  }
+
   /**
    * Compute the difference between two snapshots (or a snapshot and the current
    * directory) of the directory.

+ 25 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
 import org.apache.hadoop.hdfs.server.namenode.INode;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
 import org.apache.hadoop.hdfs.server.namenode.INodeDirectoryWithQuota;
+import org.apache.hadoop.hdfs.server.namenode.INode.Content.CountsMap.Key;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.diff.Diff;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.diff.Diff.Container;
 import org.apache.hadoop.hdfs.server.namenode.snapshot.diff.Diff.UndoInfo;
@@ -551,4 +552,28 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
     }
     return n;
   }
+
+  @Override
+  public Content.CountsMap computeContentSummary(
+      final Content.CountsMap countsMap) {
+    super.computeContentSummary(countsMap);
+    computeContentSummary4Snapshot(countsMap.getCounts(Key.SNAPSHOT));
+    return countsMap;
+  }
+
+  @Override
+  public Content.Counts computeContentSummary(final Content.Counts counts) {
+    super.computeContentSummary(counts);
+    computeContentSummary4Snapshot(counts);
+    return counts;
+  }
+
+  private void computeContentSummary4Snapshot(final Content.Counts counts) {
+    for(DirectoryDiff d : diffs) {
+      for(INode deleted : d.getChildrenDiff().getDeletedList()) {
+        deleted.computeContentSummary(counts);
+      }
+    }
+    counts.add(Content.DIRECTORY, diffs.asList().size());
+  }
 }

+ 0 - 13
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileUnderConstructionWithSnapshot.java

@@ -109,19 +109,6 @@ public class INodeFileUnderConstructionWithSnapshot
     return diffs;
   }
 
-  @Override
-  public short getBlockReplication() {
-    return Util.getBlockReplication(this);
-  }
-
-  @Override
-  public long computeFileSize(boolean includesBlockInfoUnderConstruction,
-      Snapshot snapshot) {
-    final FileDiff diff = diffs.getDiff(snapshot);
-    return diff != null? diff.fileSize
-        : super.computeFileSize(includesBlockInfoUnderConstruction, null);
-  }
-
   @Override
   public int destroySubtreeAndCollectBlocks(final Snapshot snapshot,
       final BlocksMapUpdateInfo collectedBlocks) {

+ 0 - 13
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithSnapshot.java

@@ -79,19 +79,6 @@ public class INodeFileWithSnapshot extends INodeFile
     return diffs;
   }
 
-  @Override
-  public short getBlockReplication() {
-    return Util.getBlockReplication(this);
-  }
-
-  @Override
-  public long computeFileSize(boolean includesBlockInfoUnderConstruction,
-      Snapshot snapshot) {
-    final FileDiff diff = diffs.getDiff(snapshot);
-    return diff != null? diff.fileSize
-        : super.computeFileSize(includesBlockInfoUnderConstruction, null);
-  }
-
   @Override
   public int destroySubtreeAndCollectBlocks(final Snapshot snapshot,
       final BlocksMapUpdateInfo collectedBlocks) {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/diff/Diff.java

@@ -158,7 +158,7 @@ public class Diff<K, E extends Diff.Element<K>> {
   }
 
   /** @return the deleted list, which is never null. */
-  protected List<E> getDeletedList() {
+  public List<E> getDeletedList() {
     return deleted == null? Collections.<E>emptyList(): deleted;
   }
 

+ 6 - 6
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestINodeFileUnderConstructionWithSnapshot.java

@@ -106,7 +106,7 @@ public class TestINodeFileUnderConstructionWithSnapshot {
     // check corresponding inodes
     fileNode = (INodeFile) fsdir.getINode(file.toString());
     assertEquals(REPLICATION - 1, fileNode.getFileReplication());
-    assertEquals(BLOCKSIZE * 3, fileNode.computeFileSize(true));
+    assertEquals(BLOCKSIZE * 3, fileNode.computeFileSize());
 
     // 3. create snapshot --> append
     hdfs.createSnapshot(dir, "s2");
@@ -115,7 +115,7 @@ public class TestINodeFileUnderConstructionWithSnapshot {
     // check corresponding inodes
     fileNode = (INodeFile) fsdir.getINode(file.toString());
     assertEquals(REPLICATION - 1,  fileNode.getFileReplication());
-    assertEquals(BLOCKSIZE * 4, fileNode.computeFileSize(true));
+    assertEquals(BLOCKSIZE * 4, fileNode.computeFileSize());
   }
   
   private HdfsDataOutputStream appendFileWithoutClosing(Path file, int length)
@@ -146,7 +146,7 @@ public class TestINodeFileUnderConstructionWithSnapshot {
     // check: an INodeFileUnderConstructionWithSnapshot should be stored into s0's
     // deleted list, with size BLOCKSIZE*2
     INodeFile fileNode = (INodeFile) fsdir.getINode(file.toString());
-    assertEquals(BLOCKSIZE * 2, fileNode.computeFileSize(true));
+    assertEquals(BLOCKSIZE * 2, fileNode.computeFileSize());
     INodeDirectorySnapshottable dirNode = (INodeDirectorySnapshottable) fsdir
         .getINode(dir.toString());
     DirectoryDiff last = dirNode.getDiffs().getLast();
@@ -158,7 +158,7 @@ public class TestINodeFileUnderConstructionWithSnapshot {
     
     // re-check nodeInDeleted_S0
     dirNode = (INodeDirectorySnapshottable) fsdir.getINode(dir.toString());
-    assertEquals(BLOCKSIZE * 2, fileNode.computeFileSize(true, s0));
+    assertEquals(BLOCKSIZE * 2, fileNode.computeFileSize(s0));
     
     // 3. take snapshot --> close stream
     hdfs.createSnapshot(dir, "s1");
@@ -171,7 +171,7 @@ public class TestINodeFileUnderConstructionWithSnapshot {
     last = dirNode.getDiffs().getLast();
     Snapshot s1 = last.snapshot;
     assertTrue(fileNode instanceof INodeFileWithSnapshot);
-    assertEquals(BLOCKSIZE * 3, fileNode.computeFileSize(true, s1));
+    assertEquals(BLOCKSIZE * 3, fileNode.computeFileSize(s1));
     
     // 4. modify file --> append without closing stream --> take snapshot -->
     // close stream
@@ -181,6 +181,6 @@ public class TestINodeFileUnderConstructionWithSnapshot {
     out.close();
     
     // re-check the size of nodeInDeleted_S1
-    assertEquals(BLOCKSIZE * 3, fileNode.computeFileSize(true, s1));
+    assertEquals(BLOCKSIZE * 3, fileNode.computeFileSize(s1));
   }  
 }