Parcourir la source

merge of r1535792 through r1540238 from trunk.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1540239 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze il y a 11 ans
Parent
commit
fccbb5072c
26 fichiers modifiés avec 922 ajouts et 491 suppressions
  1. 2 2
      dev-support/test-patch.sh
  2. 39 1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
  3. 2 2
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/LossyRetryInvocationHandler.java
  4. 9 11
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
  5. 10 0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  6. 2 12
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java
  7. 0 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
  8. 359 115
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
  9. 38 56
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
  10. 1 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
  11. 88 149
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
  12. 8 10
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
  13. 22 15
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java
  14. 4 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileUnderConstructionWithSnapshot.java
  15. 4 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithSnapshot.java
  16. 0 13
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
  17. 7 54
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java
  18. 117 29
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
  19. 46 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java
  20. 45 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java
  21. 0 5
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java
  22. 3 0
      hadoop-yarn-project/CHANGES.txt
  23. 34 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
  24. 11 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
  25. 11 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
  26. 60 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java

+ 2 - 2
dev-support/test-patch.sh

@@ -425,9 +425,9 @@ checkJavadocWarnings () {
   echo ""
   echo "There appear to be $javadocWarnings javadoc warnings generated by the patched build."
 
-  #There are 11 warnings that are caused by things that are caused by using sun internal APIs.
+  #There are 12 warnings that are caused by things that are caused by using sun internal APIs.
   #There are 2 warnings that are caused by the Apache DS Dn class used in MiniKdc.
-  OK_JAVADOC_WARNINGS=13;
+  OK_JAVADOC_WARNINGS=14;
   ### if current warnings greater than OK_JAVADOC_WARNINGS
   if [[ $javadocWarnings -ne $OK_JAVADOC_WARNINGS ]] ; then
     JIRA_COMMENT="$JIRA_COMMENT

+ 39 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java

@@ -23,7 +23,9 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.RandomAccessFile;
+import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -34,10 +36,11 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.io.SecureIOUtils.AlreadyExistsException;
 import org.apache.hadoop.util.NativeCodeLoader;
 import org.apache.hadoop.util.Shell;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import sun.misc.Unsafe;
+
 import com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -271,6 +274,26 @@ public class NativeIO {
       }
       munlock_native(buffer, len);
     }
+    
+    /**
+     * Unmaps the block from memory. See munmap(2).
+     *
+     * There isn't any portable way to unmap a memory region in Java.
+     * So we use the sun.nio method here.
+     * Note that unmapping a memory region could cause crashes if code
+     * continues to reference the unmapped code.  However, if we don't
+     * manually unmap the memory, we are dependent on the finalizer to
+     * do it, and we have no idea when the finalizer will run.
+     *
+     * @param buffer    The buffer to unmap.
+     */
+    public static void munmap(MappedByteBuffer buffer) {
+      if (buffer instanceof sun.nio.ch.DirectBuffer) {
+        sun.misc.Cleaner cleaner =
+            ((sun.nio.ch.DirectBuffer)buffer).cleaner();
+        cleaner.clean();
+      }
+    }
 
     /** Linux only methods used for getOwner() implementation */
     private static native long getUIDforFDOwnerforOwner(FileDescriptor fd) throws IOException;
@@ -539,6 +562,21 @@ public class NativeIO {
 
   private static native long getMemlockLimit0();
   
+  /**
+   * @return the operating system's page size.
+   */
+  public static long getOperatingSystemPageSize() {
+    try {
+      Field f = Unsafe.class.getDeclaredField("theUnsafe");
+      f.setAccessible(true);
+      Unsafe unsafe = (Unsafe)f.get(null);
+      return unsafe.pageSize();
+    } catch (Throwable e) {
+      LOG.warn("Unable to get operating system page size.  Guessing 4096.", e);
+      return 4096;
+    }
+  }
+
   private static class CachedUid {
     final long timestamp;
     final String username;

+ 2 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/LossyRetryInvocationHandler.java

@@ -18,9 +18,9 @@
 package org.apache.hadoop.io.retry;
 
 import java.lang.reflect.Method;
-import java.net.UnknownHostException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.ipc.RetriableException;
 
 /**
  * A dummy invocation handler extending RetryInvocationHandler. It drops the
@@ -52,7 +52,7 @@ public class LossyRetryInvocationHandler<T> extends RetryInvocationHandler<T> {
     if (retryCount < this.numToDrop) {
       RetryCount.set(++retryCount);
       LOG.info("Drop the response. Current retryCount == " + retryCount);
-      throw new UnknownHostException("Fake Exception");
+      throw new RetriableException("Fake Exception");
     } else {
       LOG.info("retryCount == " + retryCount
           + ". It's time to normally process the response");

+ 9 - 11
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java

@@ -558,27 +558,25 @@ public class RetryPolicies {
           isWrappedStandbyException(e)) {
         return new RetryAction(RetryAction.RetryDecision.FAILOVER_AND_RETRY,
             getFailoverOrRetrySleepTime(failovers));
-      } else if (e instanceof SocketException ||
-                 (e instanceof IOException && !(e instanceof RemoteException))) {
+      } else if (e instanceof RetriableException
+          || getWrappedRetriableException(e) != null) {
+        // RetriableException or RetriableException wrapped 
+        return new RetryAction(RetryAction.RetryDecision.RETRY,
+              getFailoverOrRetrySleepTime(retries));
+      } else if (e instanceof SocketException
+          || (e instanceof IOException && !(e instanceof RemoteException))) {
         if (isIdempotentOrAtMostOnce) {
           return RetryAction.FAILOVER_AND_RETRY;
         } else {
           return new RetryAction(RetryAction.RetryDecision.FAIL, 0,
-              "the invoked method is not idempotent, and unable to determine " +
-              "whether it was invoked");
+              "the invoked method is not idempotent, and unable to determine "
+                  + "whether it was invoked");
         }
       } else {
-        RetriableException re = getWrappedRetriableException(e);
-        if (re != null) {
-          return new RetryAction(RetryAction.RetryDecision.RETRY,
-              getFailoverOrRetrySleepTime(retries));
-        } else {
           return fallbackPolicy.shouldRetry(e, retries, failovers,
               isIdempotentOrAtMostOnce);
-        }
       }
     }
-    
   }
 
   /**

+ 10 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -359,6 +359,8 @@ Trunk (Unreleased)
     HDFS-5468. CacheAdmin help command does not recognize commands  (Stephen
     Chu via Colin Patrick McCabe)
 
+    HDFS-5394. Fix race conditions in DN caching and uncaching (cmccabe)
+
 Release 2.3.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES
@@ -455,6 +457,9 @@ Release 2.3.0 - UNRELEASED
     HDFS-5436. Move HsFtpFileSystem and HFtpFileSystem into org.apache.hdfs.web
     (Haohui Mai via Arpit Agarwal)
 
+    HDFS-5371. Let client retry the same NN when 
+    "dfs.client.test.drop.namenode.response.number" is enabled. (jing9)
+
   OPTIMIZATIONS
 
     HDFS-5239.  Allow FSNamesystem lock fairness to be configurable (daryn)
@@ -506,6 +511,11 @@ Release 2.3.0 - UNRELEASED
     HDFS-5443. Delete 0-sized block when deleting an under-construction file that 
     is included in snapshot. (jing9)
 
+    HDFS-5476. Snapshot: clean the blocks/files/directories under a renamed 
+    file/directory while deletion. (jing9)
+
+    HDFS-5325. Remove WebHdfsFileSystem#ConnRunner. (Haohui Mai via jing9) 
+
 Release 2.2.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 2 - 12
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java

@@ -22,6 +22,7 @@ import java.io.FileInputStream;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.io.nativeio.NativeIO;
 
 import java.io.IOException;
 import java.lang.ref.WeakReference;
@@ -147,20 +148,9 @@ public class ClientMmap {
 
   /**
    * Unmap the memory region.
-   *
-   * There isn't any portable way to unmap a memory region in Java.
-   * So we use the sun.nio method here.
-   * Note that unmapping a memory region could cause crashes if code
-   * continues to reference the unmapped code.  However, if we don't
-   * manually unmap the memory, we are dependent on the finalizer to
-   * do it, and we have no idea when the finalizer will run.
    */
   void unmap() {
     assert(refCount.get() == 0);
-    if (map instanceof sun.nio.ch.DirectBuffer) {
-      final sun.misc.Cleaner cleaner =
-          ((sun.nio.ch.DirectBuffer) map).cleaner();
-      cleaner.clean();
-    }
+    NativeIO.POSIX.munmap(map);
   }
 }

+ 0 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -47,7 +47,6 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.*;
 import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
 import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.IntrusiveCollection;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
 

+ 359 - 115
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java

@@ -18,24 +18,35 @@
 
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.Map.Entry;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.io.nativeio.NativeIO;
 
 /**
  * Manages caching for an FsDatasetImpl by using the mmap(2) and mlock(2)
@@ -45,178 +56,411 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class FsDatasetCache {
+  /**
+   * Keys which identify MappableBlocks.
+   */
+  private static final class Key {
+    /**
+     * Block id.
+     */
+    final long id;
+
+    /**
+     * Block pool id.
+     */
+    final String bpid;
+
+    Key(long id, String bpid) {
+      this.id = id;
+      this.bpid = bpid;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o == null) {
+        return false;
+      }
+      if (!(o.getClass() == getClass())) {
+        return false;
+      }
+      Key other = (Key)o;
+      return ((other.id == this.id) && (other.bpid.equals(this.bpid)));
+    }
+
+    @Override
+    public int hashCode() {
+      return new HashCodeBuilder().append(id).append(bpid).hashCode();
+    }
+  };
+
+  /**
+   * MappableBlocks that we know about.
+   */
+  private static final class Value {
+    final State state;
+    final MappableBlock mappableBlock;
+
+    Value(MappableBlock mappableBlock, State state) {
+      this.mappableBlock = mappableBlock;
+      this.state = state;
+    }
+  }
+
+  private enum State {
+    /**
+     * The MappableBlock is in the process of being cached.
+     */
+    CACHING,
+
+    /**
+     * The MappableBlock was in the process of being cached, but it was
+     * cancelled.  Only the FsDatasetCache#WorkerTask can remove cancelled
+     * MappableBlock objects.
+     */
+    CACHING_CANCELLED,
+
+    /**
+     * The MappableBlock is in the cache.
+     */
+    CACHED,
+
+    /**
+     * The MappableBlock is in the process of uncaching.
+     */
+    UNCACHING;
+
+    /**
+     * Whether we should advertise this block as cached to the NameNode and
+     * clients.
+     */
+    public boolean shouldAdvertise() {
+      return (this == CACHED);
+    }
+  }
 
   private static final Log LOG = LogFactory.getLog(FsDatasetCache.class);
 
   /**
-   * Map of cached blocks
+   * Stores MappableBlock objects and the states they're in.
    */
-  private final ConcurrentMap<Long, MappableBlock> cachedBlocks;
+  private final HashMap<Key, Value> mappableBlockMap = new HashMap<Key, Value>();
 
   private final FsDatasetImpl dataset;
+
+  private final ThreadPoolExecutor uncachingExecutor;
+
   /**
-   * Number of cached bytes
+   * The approximate amount of cache space in use.
+   *
+   * This number is an overestimate, counting bytes that will be used only
+   * if pending caching operations succeed.  It does not take into account
+   * pending uncaching operations.
+   *
+   * This overestimate is more useful to the NameNode than an underestimate,
+   * since we don't want the NameNode to assign us more replicas than
+   * we can cache, because of the current batch of operations.
    */
-  private AtomicLong usedBytes;
+  private final UsedBytesCount usedBytesCount;
+
+  public static class PageRounder {
+    private final long osPageSize = NativeIO.getOperatingSystemPageSize();
+
+    /**
+     * Round up a number to the operating system page size.
+     */
+    public long round(long count) {
+      long newCount = 
+          (count + (osPageSize - 1)) / osPageSize;
+      return newCount * osPageSize;
+    }
+  }
+
+  private class UsedBytesCount {
+    private final AtomicLong usedBytes = new AtomicLong(0);
+    
+    private PageRounder rounder = new PageRounder();
+
+    /**
+     * Try to reserve more bytes.
+     *
+     * @param count    The number of bytes to add.  We will round this
+     *                 up to the page size.
+     *
+     * @return         The new number of usedBytes if we succeeded;
+     *                 -1 if we failed.
+     */
+    long reserve(long count) {
+      count = rounder.round(count);
+      while (true) {
+        long cur = usedBytes.get();
+        long next = cur + count;
+        if (next > maxBytes) {
+          return -1;
+        }
+        if (usedBytes.compareAndSet(cur, next)) {
+          return next;
+        }
+      }
+    }
+    
+    /**
+     * Release some bytes that we're using.
+     *
+     * @param count    The number of bytes to release.  We will round this
+     *                 up to the page size.
+     *
+     * @return         The new number of usedBytes.
+     */
+    long release(long count) {
+      count = rounder.round(count);
+      return usedBytes.addAndGet(-count);
+    }
+    
+    long get() {
+      return usedBytes.get();
+    }
+  }
+
   /**
-   * Total cache capacity in bytes
+   * The total cache capacity in bytes.
    */
   private final long maxBytes;
 
   public FsDatasetCache(FsDatasetImpl dataset) {
     this.dataset = dataset;
-    this.cachedBlocks = new ConcurrentHashMap<Long, MappableBlock>();
-    this.usedBytes = new AtomicLong(0);
     this.maxBytes = dataset.datanode.getDnConf().getMaxLockedMemory();
-  }
-
-  /**
-   * @return if the block is cached
-   */
-  boolean isCached(String bpid, long blockId) {
-    MappableBlock mapBlock = cachedBlocks.get(blockId);
-    if (mapBlock != null) {
-      return mapBlock.getBlockPoolId().equals(bpid);
-    }
-    return false;
+    ThreadFactory workerFactory = new ThreadFactoryBuilder()
+        .setDaemon(true)
+        .setNameFormat("FsDatasetCache-%d-" + dataset.toString())
+        .build();
+    this.usedBytesCount = new UsedBytesCount();
+    this.uncachingExecutor = new ThreadPoolExecutor(
+            0, 1,
+            60, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<Runnable>(),
+            workerFactory);
+    this.uncachingExecutor.allowCoreThreadTimeOut(true);
   }
 
   /**
    * @return List of cached blocks suitable for translation into a
    * {@link BlockListAsLongs} for a cache report.
    */
-  List<Long> getCachedBlocks(String bpid) {
+  synchronized List<Long> getCachedBlocks(String bpid) {
     List<Long> blocks = new ArrayList<Long>();
-    // ConcurrentHashMap iteration doesn't see latest updates, which is okay
-    Iterator<MappableBlock> it = cachedBlocks.values().iterator();
-    while (it.hasNext()) {
-      MappableBlock mapBlock = it.next();
-      if (mapBlock.getBlockPoolId().equals(bpid)) {
-        blocks.add(mapBlock.getBlock().getBlockId());
+    for (Iterator<Entry<Key, Value>> iter =
+        mappableBlockMap.entrySet().iterator(); iter.hasNext(); ) {
+      Entry<Key, Value> entry = iter.next();
+      if (entry.getKey().bpid.equals(bpid)) {
+        if (entry.getValue().state.shouldAdvertise()) {
+          blocks.add(entry.getKey().id);
+        }
       }
     }
     return blocks;
   }
 
   /**
-   * Asynchronously attempts to cache a block. This is subject to the
-   * configured maximum locked memory limit.
-   * 
-   * @param block block to cache
-   * @param volume volume of the block
-   * @param blockIn stream of the block's data file
-   * @param metaIn stream of the block's meta file
+   * Attempt to begin caching a block.
    */
-  void cacheBlock(String bpid, Block block, FsVolumeImpl volume,
-      FileInputStream blockIn, FileInputStream metaIn) {
-    if (isCached(bpid, block.getBlockId())) {
-      return;
-    }
-    MappableBlock mapBlock = null;
-    try {
-      mapBlock = new MappableBlock(bpid, block, volume, blockIn, metaIn);
-    } catch (IOException e) {
-      LOG.warn("Failed to cache replica " + block + ": Could not instantiate"
-          + " MappableBlock", e);
-      IOUtils.closeQuietly(blockIn);
-      IOUtils.closeQuietly(metaIn);
-      return;
-    }
-    // Check if there's sufficient cache capacity
-    boolean success = false;
-    long bytes = mapBlock.getNumBytes();
-    long used = usedBytes.get();
-    while (used+bytes < maxBytes) {
-      if (usedBytes.compareAndSet(used, used+bytes)) {
-        success = true;
-        break;
+  synchronized void cacheBlock(long blockId, String bpid,
+      String blockFileName, long length, long genstamp,
+      Executor volumeExecutor) {
+    Key key = new Key(blockId, bpid);
+    Value prevValue = mappableBlockMap.get(key);
+    if (prevValue != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Block with id " + blockId + ", pool " + bpid +
+            " already exists in the FsDatasetCache with state " +
+            prevValue.state);
       }
-      used = usedBytes.get();
-    }
-    if (!success) {
-      LOG.warn(String.format(
-          "Failed to cache replica %s: %s exceeded (%d + %d > %d)",
-          mapBlock.getBlock().toString(),
-          DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
-          used, bytes, maxBytes));
-      mapBlock.close();
       return;
     }
-    // Submit it to the worker pool to be cached
-    volume.getExecutor().execute(new WorkerTask(mapBlock));
+    mappableBlockMap.put(key, new Value(null, State.CACHING));
+    volumeExecutor.execute(
+        new CachingTask(key, blockFileName, length, genstamp));
   }
 
-  /**
-   * Uncaches a block if it is cached.
-   * @param blockId id to uncache
-   */
-  void uncacheBlock(String bpid, long blockId) {
-    MappableBlock mapBlock = cachedBlocks.get(blockId);
-    if (mapBlock != null &&
-        mapBlock.getBlockPoolId().equals(bpid) &&
-        mapBlock.getBlock().getBlockId() == blockId) {
-      mapBlock.close();
-      cachedBlocks.remove(blockId);
-      long bytes = mapBlock.getNumBytes();
-      long used = usedBytes.get();
-      while (!usedBytes.compareAndSet(used, used - bytes)) {
-        used = usedBytes.get();
+  synchronized void uncacheBlock(String bpid, long blockId) {
+    Key key = new Key(blockId, bpid);
+    Value prevValue = mappableBlockMap.get(key);
+
+    if (prevValue == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Block with id " + blockId + ", pool " + bpid + " " +
+            "does not need to be uncached, because it is not currently " +
+            "in the mappableBlockMap.");
+      }
+      return;
+    }
+    switch (prevValue.state) {
+    case CACHING:
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Cancelling caching for block with id " + blockId +
+            ", pool " + bpid + ".");
+      }
+      mappableBlockMap.put(key,
+          new Value(prevValue.mappableBlock, State.CACHING_CANCELLED));
+      break;
+    case CACHED:
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Block with id " + blockId + ", pool " + bpid + " " +
+            "has been scheduled for uncaching.");
       }
-      LOG.info("Successfully uncached block " + blockId);
-    } else {
-      LOG.info("Could not uncache block " + blockId + ": unknown block.");
+      mappableBlockMap.put(key,
+          new Value(prevValue.mappableBlock, State.UNCACHING));
+      uncachingExecutor.execute(new UncachingTask(key));
+      break;
+    default:
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Block with id " + blockId + ", pool " + bpid + " " +
+            "does not need to be uncached, because it is " +
+            "in state " + prevValue.state + ".");
+      }
+      break;
     }
   }
 
   /**
    * Background worker that mmaps, mlocks, and checksums a block
    */
-  private class WorkerTask implements Runnable {
+  private class CachingTask implements Runnable {
+    private final Key key; 
+    private final String blockFileName;
+    private final long length;
+    private final long genstamp;
 
-    private MappableBlock block;
-    WorkerTask(MappableBlock block) {
-      this.block = block;
+    CachingTask(Key key, String blockFileName, long length, long genstamp) {
+      this.key = key;
+      this.blockFileName = blockFileName;
+      this.length = length;
+      this.genstamp = genstamp;
     }
 
     @Override
     public void run() {
       boolean success = false;
+      FileInputStream blockIn = null, metaIn = null;
+      MappableBlock mappableBlock = null;
+      ExtendedBlock extBlk =
+          new ExtendedBlock(key.bpid, key.id, length, genstamp);
+      long newUsedBytes = usedBytesCount.reserve(length);
+      if (newUsedBytes < 0) {
+        LOG.warn("Failed to cache block id " + key.id + ", pool " + key.bpid +
+            ": could not reserve " + length + " more bytes in the " +
+            "cache: " + DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY +
+            " of " + maxBytes + " exceeded."); 
+        return;
+      }
       try {
-        block.map();
-        block.lock();
-        block.verifyChecksum();
+        try {
+          blockIn = (FileInputStream)dataset.getBlockInputStream(extBlk, 0);
+          metaIn = (FileInputStream)dataset.getMetaDataInputStream(extBlk)
+              .getWrappedStream();
+        } catch (ClassCastException e) {
+          LOG.warn("Failed to cache block with id " + key.id + ", pool " +
+                key.bpid + ": Underlying blocks are not backed by files.", e);
+          return;
+        } catch (FileNotFoundException e) {
+          LOG.info("Failed to cache block with id " + key.id + ", pool " +
+                key.bpid + ": failed to find backing files.");
+          return;
+        } catch (IOException e) {
+          LOG.warn("Failed to cache block with id " + key.id + ", pool " +
+                key.bpid + ": failed to open file", e);
+          return;
+        }
+        try {
+          mappableBlock = MappableBlock.
+              load(length, blockIn, metaIn, blockFileName);
+        } catch (ChecksumException e) {
+          // Exception message is bogus since this wasn't caused by a file read
+          LOG.warn("Failed to cache block " + key.id + " in " + key.bpid + ": " +
+                   "checksum verification failed.");
+          return;
+        } catch (IOException e) {
+          LOG.warn("Failed to cache block " + key.id + " in " + key.bpid, e);
+          return;
+        }
+        synchronized (FsDatasetCache.this) {
+          Value value = mappableBlockMap.get(key);
+          Preconditions.checkNotNull(value);
+          Preconditions.checkState(value.state == State.CACHING ||
+                                   value.state == State.CACHING_CANCELLED);
+          if (value.state == State.CACHING_CANCELLED) {
+            mappableBlockMap.remove(key);
+            LOG.warn("Caching of block " + key.id + " in " + key.bpid +
+                " was cancelled.");
+            return;
+          }
+          mappableBlockMap.put(key, new Value(mappableBlock, State.CACHED));
+        }
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Successfully cached block " + key.id + " in " + key.bpid +
+              ".  We are now caching " + newUsedBytes + " bytes in total.");
+        }
         success = true;
-      } catch (ChecksumException e) {
-        // Exception message is bogus since this wasn't caused by a file read
-        LOG.warn("Failed to cache block " + block.getBlock() + ": Checksum "
-            + "verification failed.");
-      } catch (IOException e) {
-        LOG.warn("Failed to cache block " + block.getBlock() + ": IOException",
-            e);
-      }
-      // If we failed or the block became uncacheable in the meantime,
-      // clean up and return the reserved cache allocation 
-      if (!success || 
-          !dataset.validToCache(block.getBlockPoolId(),
-              block.getBlock().getBlockId())) {
-        block.close();
-        long used = usedBytes.get();
-        while (!usedBytes.compareAndSet(used, used-block.getNumBytes())) {
-          used = usedBytes.get();
+      } finally {
+        if (!success) {
+          newUsedBytes = usedBytesCount.release(length);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Caching of block " + key.id + " in " +
+              key.bpid + " was aborted.  We are now caching only " +
+              newUsedBytes + " + bytes in total.");
+          }
+          IOUtils.closeQuietly(blockIn);
+          IOUtils.closeQuietly(metaIn);
+          if (mappableBlock != null) {
+            mappableBlock.close();
+          }
         }
-      } else {
-        LOG.info("Successfully cached block " + block.getBlock());
-        cachedBlocks.put(block.getBlock().getBlockId(), block);
+      }
+    }
+  }
+
+  private class UncachingTask implements Runnable {
+    private final Key key; 
+
+    UncachingTask(Key key) {
+      this.key = key;
+    }
+
+    @Override
+    public void run() {
+      Value value;
+      
+      synchronized (FsDatasetCache.this) {
+        value = mappableBlockMap.get(key);
+      }
+      Preconditions.checkNotNull(value);
+      Preconditions.checkArgument(value.state == State.UNCACHING);
+      // TODO: we will eventually need to do revocation here if any clients
+      // are reading via mmap with checksums enabled.  See HDFS-5182.
+      IOUtils.closeQuietly(value.mappableBlock);
+      synchronized (FsDatasetCache.this) {
+        mappableBlockMap.remove(key);
+      }
+      long newUsedBytes =
+          usedBytesCount.release(value.mappableBlock.getLength());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Uncaching of block " + key.id + " in " + key.bpid +
+            " completed.  usedBytes = " + newUsedBytes);
       }
     }
   }
 
   // Stats related methods for FsDatasetMBean
 
+  /**
+   * Get the approximate amount of cache space used.
+   */
   public long getDnCacheUsed() {
-    return usedBytes.get();
+    return usedBytesCount.get();
   }
 
+  /**
+   * Get the maximum amount of bytes we can cache.  This is a constant.
+   */
   public long getDnCacheCapacity() {
     return maxBytes;
   }

+ 38 - 56
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java

@@ -32,12 +32,12 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executor;
 
 import javax.management.NotCompliantMBeanException;
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -599,7 +599,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
   private synchronized ReplicaBeingWritten append(String bpid,
       FinalizedReplica replicaInfo, long newGS, long estimateBlockLen)
       throws IOException {
-    // uncache the block
+    // If the block is cached, start uncaching it.
     cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
     // unlink the finalized replica
     replicaInfo.unlinkBlock(1);
@@ -1244,10 +1244,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
         volumeMap.remove(bpid, invalidBlks[i]);
         perVolumeReplicaMap.get(v.getStorageID()).remove(bpid, invalidBlks[i]);
       }
-
-      // Uncache the block synchronously
+      // If the block is cached, start uncaching it.
       cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId());
-      // Delete the block asynchronously to make sure we can do it fast enough
+      // Delete the block asynchronously to make sure we can do it fast enough.
+      // It's ok to unlink the block file before the uncache operation
+      // finishes.
       asyncDiskService.deleteAsync(v, f,
           FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
           new ExtendedBlock(bpid, invalidBlks[i]));
@@ -1257,66 +1258,47 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
     }
   }
 
-  synchronized boolean validToCache(String bpid, long blockId) {
-    ReplicaInfo info = volumeMap.get(bpid, blockId);
-    if (info == null) {
-      LOG.warn("Failed to cache replica in block pool " + bpid +
-          " with block id " + blockId + ": ReplicaInfo not found.");
-      return false;
-    }
-    FsVolumeImpl volume = (FsVolumeImpl)info.getVolume();
-    if (volume == null) {
-      LOG.warn("Failed to cache block with id " + blockId +
-          ": Volume not found.");
-      return false;
-    }
-    if (info.getState() != ReplicaState.FINALIZED) {
-      LOG.warn("Failed to block with id " + blockId + 
-          ": Replica is not finalized.");
-      return false;
-    }
-    return true;
-  }
-
   /**
    * Asynchronously attempts to cache a single block via {@link FsDatasetCache}.
    */
   private void cacheBlock(String bpid, long blockId) {
-    ReplicaInfo info;
     FsVolumeImpl volume;
+    String blockFileName;
+    long length, genstamp;
+    Executor volumeExecutor;
+
     synchronized (this) {
-      if (!validToCache(bpid, blockId)) {
+      ReplicaInfo info = volumeMap.get(bpid, blockId);
+      if (info == null) {
+        LOG.warn("Failed to cache block with id " + blockId + ", pool " +
+            bpid + ": ReplicaInfo not found.");
         return;
       }
-      info = volumeMap.get(bpid, blockId);
-      volume = (FsVolumeImpl)info.getVolume();
-    }
-    // Try to open block and meta streams
-    FileInputStream blockIn = null;
-    FileInputStream metaIn = null;
-    boolean success = false;
-    ExtendedBlock extBlk =
-        new ExtendedBlock(bpid, blockId,
-            info.getBytesOnDisk(), info.getGenerationStamp());
-    try {
-      blockIn = (FileInputStream)getBlockInputStream(extBlk, 0);
-      metaIn = (FileInputStream)getMetaDataInputStream(extBlk)
-          .getWrappedStream();
-      success = true;
-    } catch (ClassCastException e) {
-      LOG.warn("Failed to cache replica " + extBlk + ": Underlying blocks"
-          + " are not backed by files.", e);
-    } catch (IOException e) {
-      LOG.warn("Failed to cache replica " + extBlk + ": IOException while"
-          + " trying to open block or meta files.", e);
-    }
-    if (!success) {
-      IOUtils.closeQuietly(blockIn);
-      IOUtils.closeQuietly(metaIn);
-      return;
+      if (info.getState() != ReplicaState.FINALIZED) {
+        LOG.warn("Failed to cache block with id " + blockId + ", pool " +
+            bpid + ": replica is not finalized; it is in state " +
+            info.getState());
+        return;
+      }
+      try {
+        volume = (FsVolumeImpl)info.getVolume();
+        if (volume == null) {
+          LOG.warn("Failed to cache block with id " + blockId + ", pool " +
+              bpid + ": volume not found.");
+          return;
+        }
+      } catch (ClassCastException e) {
+        LOG.warn("Failed to cache block with id " + blockId +
+            ": volume was not an instance of FsVolumeImpl.");
+        return;
+      }
+      blockFileName = info.getBlockFile().getAbsolutePath();
+      length = info.getVisibleLength();
+      genstamp = info.getGenerationStamp();
+      volumeExecutor = volume.getCacheExecutor();
     }
-    cacheManager.cacheBlock(bpid, extBlk.getLocalBlock(),
-        volume, blockIn, metaIn);
+    cacheManager.cacheBlock(blockId, bpid, 
+        blockFileName, length, genstamp, volumeExecutor);
   }
 
   @Override // FsDatasetSpi

+ 1 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java

@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
@@ -199,7 +198,7 @@ class FsVolumeImpl implements FsVolumeSpi {
     return getBlockPoolSlice(bpid).addBlock(b, f);
   }
 
-  Executor getExecutor() {
+  Executor getCacheExecutor() {
     return cacheExecutor;
   }
 

+ 88 - 149
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java

@@ -28,184 +28,104 @@ import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileChannel.MapMode;
 
-import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.ChecksumException;
-import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.DataChecksum;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
- * Low-level wrapper for a Block and its backing files that provides mmap,
- * mlock, and checksum verification operations.
- * 
- * This could be a private class of FsDatasetCache, not meant for other users.
+ * Represents an HDFS block that is mmapped by the DataNode.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-class MappableBlock implements Closeable {
-
-  private final String bpid;
-  private final Block block;
-  private final FsVolumeImpl volume;
-
-  private final FileInputStream blockIn;
-  private final FileInputStream metaIn;
-  private final FileChannel blockChannel;
-  private final FileChannel metaChannel;
-  private final long blockSize;
-
-  private boolean isMapped;
-  private boolean isLocked;
-  private boolean isChecksummed;
-
-  private MappedByteBuffer blockMapped = null;
-
-  public MappableBlock(String bpid, Block blk, FsVolumeImpl volume,
-      FileInputStream blockIn, FileInputStream metaIn) throws IOException {
-    this.bpid = bpid;
-    this.block = blk;
-    this.volume = volume;
-
-    this.blockIn = blockIn;
-    this.metaIn = metaIn;
-    this.blockChannel = blockIn.getChannel();
-    this.metaChannel = metaIn.getChannel();
-    this.blockSize = blockChannel.size();
-
-    this.isMapped = false;
-    this.isLocked = false;
-    this.isChecksummed = false;
-  }
-
-  public String getBlockPoolId() {
-    return bpid;
-  }
-
-  public Block getBlock() {
-    return block;
-  }
-
-  public FsVolumeImpl getVolume() {
-    return volume;
-  }
-
-  public boolean isMapped() {
-    return isMapped;
-  }
-
-  public boolean isLocked() {
-    return isLocked;
-  }
-
-  public boolean isChecksummed() {
-    return isChecksummed;
+public class MappableBlock implements Closeable {
+  public static interface Mlocker {
+    void mlock(MappedByteBuffer mmap, long length) throws IOException;
   }
-
-  /**
-   * Returns the number of bytes on disk for the block file
-   */
-  public long getNumBytes() {
-    return blockSize;
-  }
-
-  /**
-   * Maps the block into memory. See mmap(2).
-   */
-  public void map() throws IOException {
-    if (isMapped) {
-      return;
+  
+  private static class PosixMlocker implements Mlocker {
+    public void mlock(MappedByteBuffer mmap, long length)
+        throws IOException {
+      NativeIO.POSIX.mlock(mmap, length);
     }
-    blockMapped = blockChannel.map(MapMode.READ_ONLY, 0, blockSize);
-    isMapped = true;
   }
 
-  /**
-   * Unmaps the block from memory. See munmap(2).
-   */
-  public void unmap() {
-    if (!isMapped) {
-      return;
-    }
-    if (blockMapped instanceof sun.nio.ch.DirectBuffer) {
-      sun.misc.Cleaner cleaner =
-          ((sun.nio.ch.DirectBuffer)blockMapped).cleaner();
-      cleaner.clean();
-    }
-    isMapped = false;
-    isLocked = false;
-    isChecksummed = false;
-  }
+  @VisibleForTesting
+  public static Mlocker mlocker = new PosixMlocker();
 
-  /**
-   * Locks the block into memory. This prevents the block from being paged out.
-   * See mlock(2).
-   */
-  public void lock() throws IOException {
-    Preconditions.checkArgument(isMapped,
-        "Block must be mapped before it can be locked!");
-    if (isLocked) {
-      return;
-    }
-    NativeIO.POSIX.mlock(blockMapped, blockSize);
-    isLocked = true;
+  private MappedByteBuffer mmap;
+  private final long length;
+
+  MappableBlock(MappedByteBuffer mmap, long length) {
+    this.mmap = mmap;
+    this.length = length;
+    assert length > 0;
   }
 
-  /**
-   * Unlocks the block from memory, allowing it to be paged out. See munlock(2).
-   */
-  public void unlock() throws IOException {
-    if (!isLocked || !isMapped) {
-      return;
-    }
-    NativeIO.POSIX.munlock(blockMapped, blockSize);
-    isLocked = false;
-    isChecksummed = false;
+  public long getLength() {
+    return length;
   }
 
   /**
-   * Reads bytes into a buffer until EOF or the buffer's limit is reached
+   * Load the block.
+   *
+   * mmap and mlock the block, and then verify its checksum.
+   *
+   * @param length         The current length of the block.
+   * @param blockIn        The block input stream.  Should be positioned at the
+   *                       start.  The caller must close this.
+   * @param metaIn         The meta file input stream.  Should be positioned at
+   *                       the start.  The caller must close this.
+   * @param blockFileName  The block file name, for logging purposes.
+   *
+   * @return               The Mappable block.
    */
-  private int fillBuffer(FileChannel channel, ByteBuffer buf)
-      throws IOException {
-    int bytesRead = channel.read(buf);
-    if (bytesRead < 0) {
-      //EOF
-      return bytesRead;
-    }
-    while (buf.remaining() > 0) {
-      int n = channel.read(buf);
-      if (n < 0) {
-        //EOF
-        return bytesRead;
+  public static MappableBlock load(long length,
+      FileInputStream blockIn, FileInputStream metaIn,
+      String blockFileName) throws IOException {
+    MappableBlock mappableBlock = null;
+    MappedByteBuffer mmap = null;
+    try {
+      FileChannel blockChannel = blockIn.getChannel();
+      if (blockChannel == null) {
+        throw new IOException("Block InputStream has no FileChannel.");
+      }
+      mmap = blockChannel.map(MapMode.READ_ONLY, 0, length);
+      mlocker.mlock(mmap, length);
+      verifyChecksum(length, metaIn, blockChannel, blockFileName);
+      mappableBlock = new MappableBlock(mmap, length);
+    } finally {
+      if (mappableBlock == null) {
+        if (mmap != null) {
+          NativeIO.POSIX.munmap(mmap); // unmapping also unlocks
+        }
       }
-      bytesRead += n;
     }
-    return bytesRead;
+    return mappableBlock;
   }
 
   /**
    * Verifies the block's checksum. This is an I/O intensive operation.
    * @return if the block was successfully checksummed.
    */
-  public void verifyChecksum() throws IOException, ChecksumException {
-    Preconditions.checkArgument(isLocked && isMapped,
-        "Block must be mapped and locked before checksum verification!");
-    // skip if checksum has already been successfully verified
-    if (isChecksummed) {
-      return;
-    }
+  private static void verifyChecksum(long length,
+      FileInputStream metaIn, FileChannel blockChannel, String blockFileName)
+          throws IOException, ChecksumException {
     // Verify the checksum from the block's meta file
     // Get the DataChecksum from the meta file header
-    metaChannel.position(0);
     BlockMetadataHeader header =
         BlockMetadataHeader.readHeader(new DataInputStream(
             new BufferedInputStream(metaIn, BlockMetadataHeader
                 .getHeaderSize())));
+    FileChannel metaChannel = metaIn.getChannel();
+    if (metaChannel == null) {
+      throw new IOException("Block InputStream meta file has no FileChannel.");
+    }
     DataChecksum checksum = header.getChecksum();
     final int bytesPerChecksum = checksum.getBytesPerChecksum();
     final int checksumSize = checksum.getChecksumSize();
@@ -214,13 +134,13 @@ class MappableBlock implements Closeable {
     ByteBuffer checksumBuf = ByteBuffer.allocate(numChunks*checksumSize);
     // Verify the checksum
     int bytesVerified = 0;
-    while (bytesVerified < blockChannel.size()) {
+    while (bytesVerified < length) {
       Preconditions.checkState(bytesVerified % bytesPerChecksum == 0,
           "Unexpected partial chunk before EOF");
       assert bytesVerified % bytesPerChecksum == 0;
       int bytesRead = fillBuffer(blockChannel, blockBuf);
       if (bytesRead == -1) {
-        throw new IOException("Premature EOF");
+        throw new IOException("checksum verification failed: premature EOF");
       }
       blockBuf.flip();
       // Number of read chunks, including partial chunk at end
@@ -228,22 +148,41 @@ class MappableBlock implements Closeable {
       checksumBuf.limit(chunks*checksumSize);
       fillBuffer(metaChannel, checksumBuf);
       checksumBuf.flip();
-      checksum.verifyChunkedSums(blockBuf, checksumBuf, block.getBlockName(),
+      checksum.verifyChunkedSums(blockBuf, checksumBuf, blockFileName,
           bytesVerified);
       // Success
       bytesVerified += bytesRead;
       blockBuf.clear();
       checksumBuf.clear();
     }
-    isChecksummed = true;
-    // Can close the backing file since everything is safely in memory
-    blockChannel.close();
+  }
+
+  /**
+   * Reads bytes into a buffer until EOF or the buffer's limit is reached
+   */
+  private static int fillBuffer(FileChannel channel, ByteBuffer buf)
+      throws IOException {
+    int bytesRead = channel.read(buf);
+    if (bytesRead < 0) {
+      //EOF
+      return bytesRead;
+    }
+    while (buf.remaining() > 0) {
+      int n = channel.read(buf);
+      if (n < 0) {
+        //EOF
+        return bytesRead;
+      }
+      bytesRead += n;
+    }
+    return bytesRead;
   }
 
   @Override
   public void close() {
-    unmap();
-    IOUtils.closeQuietly(blockIn);
-    IOUtils.closeQuietly(metaIn);
+    if (mmap != null) {
+      NativeIO.POSIX.munmap(mmap);
+      mmap = null;
+    }
   }
 }

+ 8 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java

@@ -646,16 +646,14 @@ public abstract class INodeReference extends INode {
           FileWithSnapshot sfile = (FileWithSnapshot) referred;
           // make sure we mark the file as deleted
           sfile.deleteCurrentFile();
-          if (snapshot != null) {
-            try {
-              // when calling cleanSubtree of the referred node, since we 
-              // compute quota usage updates before calling this destroy 
-              // function, we use true for countDiffChange
-              referred.cleanSubtree(snapshot, prior, collectedBlocks,
-                  removedINodes, true);
-            } catch (QuotaExceededException e) {
-              LOG.error("should not exceed quota while snapshot deletion", e);
-            }
+          try {
+            // when calling cleanSubtree of the referred node, since we 
+            // compute quota usage updates before calling this destroy 
+            // function, we use true for countDiffChange
+            referred.cleanSubtree(snapshot, prior, collectedBlocks,
+                removedINodes, true);
+          } catch (QuotaExceededException e) {
+            LOG.error("should not exceed quota while snapshot deletion", e);
           }
         } else if (referred instanceof INodeDirectoryWithSnapshot) {
           // similarly, if referred is a directory, it must be an

+ 22 - 15
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java

@@ -716,14 +716,8 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
         if (priorDiff != null && priorDiff.getSnapshot().equals(prior)) {
           List<INode> cList = priorDiff.diff.getList(ListType.CREATED);
           List<INode> dList = priorDiff.diff.getList(ListType.DELETED);
-          priorCreated = new HashMap<INode, INode>(cList.size());
-          for (INode cNode : cList) {
-            priorCreated.put(cNode, cNode);
-          }
-          priorDeleted = new HashMap<INode, INode>(dList.size());
-          for (INode dNode : dList) {
-            priorDeleted.put(dNode, dNode);
-          }
+          priorCreated = cloneDiffList(cList);
+          priorDeleted = cloneDiffList(dList);
         }
       }
       
@@ -896,6 +890,17 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
     counts.add(Content.DIRECTORY, diffs.asList().size());
   }
   
+  private static Map<INode, INode> cloneDiffList(List<INode> diffList) {
+    if (diffList == null || diffList.size() == 0) {
+      return null;
+    }
+    Map<INode, INode> map = new HashMap<INode, INode>(diffList.size());
+    for (INode node : diffList) {
+      map.put(node, node);
+    }
+    return map;
+  }
+  
   /**
    * Destroy a subtree under a DstReference node.
    */
@@ -914,26 +919,28 @@ public class INodeDirectoryWithSnapshot extends INodeDirectoryWithQuota {
         destroyDstSubtree(inode.asReference().getReferredINode(), snapshot,
             prior, collectedBlocks, removedINodes);
       }
-    } else if (inode.isFile() && snapshot != null) {
+    } else if (inode.isFile()) {
       inode.cleanSubtree(snapshot, prior, collectedBlocks, removedINodes, true);
     } else if (inode.isDirectory()) {
       Map<INode, INode> excludedNodes = null;
       if (inode instanceof INodeDirectoryWithSnapshot) {
         INodeDirectoryWithSnapshot sdir = (INodeDirectoryWithSnapshot) inode;
+        
         DirectoryDiffList diffList = sdir.getDiffs();
+        DirectoryDiff priorDiff = diffList.getDiff(prior);
+        if (priorDiff != null && priorDiff.getSnapshot().equals(prior)) {
+          List<INode> dList = priorDiff.diff.getList(ListType.DELETED);
+          excludedNodes = cloneDiffList(dList);
+        }
+        
         if (snapshot != null) {
           diffList.deleteSnapshotDiff(snapshot, prior, sdir, collectedBlocks,
               removedINodes, true);
         }
-        DirectoryDiff priorDiff = diffList.getDiff(prior);
+        priorDiff = diffList.getDiff(prior);
         if (priorDiff != null && priorDiff.getSnapshot().equals(prior)) {
           priorDiff.diff.destroyCreatedList(sdir, collectedBlocks,
               removedINodes);
-          List<INode> dList = priorDiff.diff.getList(ListType.DELETED);
-          excludedNodes = new HashMap<INode, INode>(dList.size());
-          for (INode dNode : dList) {
-            excludedNodes.put(dNode, dNode);
-          }
         }
       }
       for (INode child : inode.asDirectory().getChildrenList(prior)) {

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileUnderConstructionWithSnapshot.java

@@ -109,8 +109,10 @@ public class INodeFileUnderConstructionWithSnapshot
       final List<INode> removedINodes, final boolean countDiffChange) 
       throws QuotaExceededException {
     if (snapshot == null) { // delete the current file
-      recordModification(prior, null);
-      isCurrentFileDeleted = true;
+      if (!isCurrentFileDeleted()) {
+        recordModification(prior, null);
+        deleteCurrentFile();
+      }
       Util.collectBlocksAndClear(this, collectedBlocks, removedINodes);
       return Quota.Counts.newInstance();
     } else { // delete a snapshot

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeFileWithSnapshot.java

@@ -96,8 +96,10 @@ public class INodeFileWithSnapshot extends INodeFile
       final List<INode> removedINodes, final boolean countDiffChange) 
       throws QuotaExceededException {
     if (snapshot == null) { // delete the current file
-      recordModification(prior, null);
-      isCurrentFileDeleted = true;
+      if (!isCurrentFileDeleted()) {
+        recordModification(prior, null);
+        deleteCurrentFile();
+      }
       Util.collectBlocksAndClear(this, collectedBlocks, removedINodes);
       return Quota.Counts.newInstance();
     } else { // delete a snapshot

+ 0 - 13
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java

@@ -683,19 +683,6 @@ public class WebHdfsFileSystem extends FileSystem
     }
   }
 
-  @VisibleForTesting
-  final class ConnRunner extends AbstractRunner {
-    protected ConnRunner(final HttpOpParam.Op op, HttpURLConnection conn) {
-      super(op, false);
-      this.conn = conn;
-    }
-
-    @Override
-    protected URL getUrl() {
-      return null;
-    }
-  }
-
   private FsPermission applyUMask(FsPermission permission) {
     if (permission == null) {
       permission = FsPermission.getDefault();

+ 7 - 54
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java

@@ -23,20 +23,14 @@ package org.apache.hadoop.hdfs.security;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
-import java.net.HttpURLConnection;
 import java.net.InetAddress;
 import java.net.NetworkInterface;
-import java.net.URL;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Enumeration;
-import java.util.Map;
-
-import javax.servlet.http.HttpServletResponse;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -47,23 +41,17 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
-import org.apache.hadoop.hdfs.web.resources.DoAsParam;
-import org.apache.hadoop.hdfs.web.resources.ExceptionHandler;
-import org.apache.hadoop.hdfs.web.resources.GetOpParam;
-import org.apache.hadoop.hdfs.web.resources.PostOpParam;
-import org.apache.hadoop.hdfs.web.resources.PutOpParam;
 import org.apache.hadoop.security.TestDoAsEffectiveUser;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.token.Token;
-import org.apache.log4j.Level;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.internal.util.reflection.Whitebox;
 
 public class TestDelegationTokenForProxyUser {
   private static MiniDFSCluster cluster;
@@ -155,56 +143,26 @@ public class TestDelegationTokenForProxyUser {
     }
   }
   
-  @Test(timeout=20000)
+  @Test(timeout=5000)
   public void testWebHdfsDoAs() throws Exception {
     WebHdfsTestUtil.LOG.info("START: testWebHdfsDoAs()");
-    ((Log4JLogger)NamenodeWebHdfsMethods.LOG).getLogger().setLevel(Level.ALL);
-    ((Log4JLogger)ExceptionHandler.LOG).getLogger().setLevel(Level.ALL);
     WebHdfsTestUtil.LOG.info("ugi.getShortUserName()=" + ugi.getShortUserName());
     final WebHdfsFileSystem webhdfs = WebHdfsTestUtil.getWebHdfsFileSystemAs(ugi, config);
     
     final Path root = new Path("/");
     cluster.getFileSystem().setPermission(root, new FsPermission((short)0777));
 
-    {
-      //test GETHOMEDIRECTORY with doAs
-      final URL url = WebHdfsTestUtil.toUrl(webhdfs,
-          GetOpParam.Op.GETHOMEDIRECTORY,  root, new DoAsParam(PROXY_USER));
-      final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
-      final Map<?, ?> m = WebHdfsTestUtil.connectAndGetJson(conn, HttpServletResponse.SC_OK);
-      conn.disconnect();
-  
-      final Object responsePath = m.get(Path.class.getSimpleName());
-      WebHdfsTestUtil.LOG.info("responsePath=" + responsePath);
-      Assert.assertEquals("/user/" + PROXY_USER, responsePath);
-    }
+    Whitebox.setInternalState(webhdfs, "ugi", proxyUgi);
 
     {
-      //test GETHOMEDIRECTORY with DOas
-      final URL url = WebHdfsTestUtil.toUrl(webhdfs,
-          GetOpParam.Op.GETHOMEDIRECTORY,  root, new DoAsParam(PROXY_USER) {
-            @Override
-            public String getName() {
-              return "DOas";
-            }
-      });
-      final HttpURLConnection conn = (HttpURLConnection) url.openConnection();
-      final Map<?, ?> m = WebHdfsTestUtil.connectAndGetJson(conn, HttpServletResponse.SC_OK);
-      conn.disconnect();
-  
-      final Object responsePath = m.get(Path.class.getSimpleName());
+      Path responsePath = webhdfs.getHomeDirectory();
       WebHdfsTestUtil.LOG.info("responsePath=" + responsePath);
-      Assert.assertEquals("/user/" + PROXY_USER, responsePath);
+      Assert.assertEquals(webhdfs.getUri() + "/user/" + PROXY_USER, responsePath.toString());
     }
 
     final Path f = new Path("/testWebHdfsDoAs/a.txt");
     {
-      //test create file with doAs
-      final PutOpParam.Op op = PutOpParam.Op.CREATE;
-      final URL url = WebHdfsTestUtil.toUrl(webhdfs, op,  f, new DoAsParam(PROXY_USER));
-      HttpURLConnection conn = (HttpURLConnection) url.openConnection();
-      conn = WebHdfsTestUtil.twoStepWrite(webhdfs, op, conn);
-      final FSDataOutputStream out = WebHdfsTestUtil.write(webhdfs, op, conn, 4096);
+      FSDataOutputStream out = webhdfs.create(f);
       out.write("Hello, webhdfs user!".getBytes());
       out.close();
   
@@ -214,12 +172,7 @@ public class TestDelegationTokenForProxyUser {
     }
 
     {
-      //test append file with doAs
-      final PostOpParam.Op op = PostOpParam.Op.APPEND;
-      final URL url = WebHdfsTestUtil.toUrl(webhdfs, op,  f, new DoAsParam(PROXY_USER));
-      HttpURLConnection conn = (HttpURLConnection) url.openConnection();
-      conn = WebHdfsTestUtil.twoStepWrite(webhdfs, op, conn);
-      final FSDataOutputStream out = WebHdfsTestUtil.write(webhdfs, op, conn, 4096);
+      final FSDataOutputStream out = webhdfs.append(f);
       out.write("\nHello again!".getBytes());
       out.close();
   

+ 117 - 29
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java

@@ -26,8 +26,11 @@ import static org.mockito.Mockito.doReturn;
 
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.HdfsBlockLocation;
@@ -42,6 +45,8 @@ import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.PageRounder;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock;
 import org.apache.hadoop.hdfs.server.namenode.FSImage;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
@@ -52,12 +57,18 @@ import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
 import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.log4j.Logger;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.base.Preconditions;
+import com.google.common.base.Supplier;
+
 public class TestFsDatasetCache {
+  private static final Log LOG = LogFactory.getLog(TestFsDatasetCache.class);
 
   // Most Linux installs allow a default of 64KB locked memory
   private static final long CACHE_CAPACITY = 64 * 1024;
@@ -71,12 +82,14 @@ public class TestFsDatasetCache {
   private static DataNode dn;
   private static FsDatasetSpi<?> fsd;
   private static DatanodeProtocolClientSideTranslatorPB spyNN;
+  private static PageRounder rounder = new PageRounder();
 
   @Before
   public void setUp() throws Exception {
     assumeTrue(!Path.WINDOWS);
-    assumeTrue(NativeIO.isAvailable());
+    assumeTrue(NativeIO.getMemlockLimit() >= CACHE_CAPACITY);
     conf = new HdfsConfiguration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY, true);
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
     conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
         CACHE_CAPACITY);
@@ -169,19 +182,34 @@ public class TestFsDatasetCache {
    * Blocks until cache usage hits the expected new value.
    */
   private long verifyExpectedCacheUsage(final long expected) throws Exception {
-    long cacheUsed = fsd.getDnCacheUsed();
-    while (cacheUsed != expected) {
-      cacheUsed = fsd.getDnCacheUsed();
-      Thread.sleep(100);
-    }
-    assertEquals("Unexpected amount of cache used", expected, cacheUsed);
-    return cacheUsed;
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      private int tries = 0;
+      
+      @Override
+      public Boolean get() {
+        long curDnCacheUsed = fsd.getDnCacheUsed();
+        if (curDnCacheUsed != expected) {
+          if (tries++ > 10) {
+            LOG.info("verifyExpectedCacheUsage: expected " +
+                expected + ", got " + curDnCacheUsed + "; " +
+                "memlock limit = " + NativeIO.getMemlockLimit() +
+                ".  Waiting...");
+          }
+          return false;
+        }
+        return true;
+      }
+    }, 100, 60000);
+    return expected;
   }
 
-  @Test(timeout=60000)
+  @Test(timeout=600000)
   public void testCacheAndUncacheBlock() throws Exception {
+    LOG.info("beginning testCacheAndUncacheBlock");
     final int NUM_BLOCKS = 5;
 
+    verifyExpectedCacheUsage(0);
+
     // Write a test file
     final Path testFile = new Path("/testCacheBlock");
     final long testFileLen = BLOCK_SIZE*NUM_BLOCKS;
@@ -211,15 +239,23 @@ public class TestFsDatasetCache {
       setHeartbeatResponse(uncacheBlock(locs[i]));
       current = verifyExpectedCacheUsage(current - blockSizes[i]);
     }
+    LOG.info("finishing testCacheAndUncacheBlock");
   }
 
-  @Test(timeout=60000)
+  @Test(timeout=600000)
   public void testFilesExceedMaxLockedMemory() throws Exception {
+    LOG.info("beginning testFilesExceedMaxLockedMemory");
+
+    // We don't want to deal with page rounding issues, so skip this
+    // test if page size is weird
+    long osPageSize = NativeIO.getOperatingSystemPageSize();
+    assumeTrue(osPageSize == 4096); 
+
     // Create some test files that will exceed total cache capacity
-    // Don't forget that meta files take up space too!
-    final int numFiles = 4;
-    final long fileSize = CACHE_CAPACITY / numFiles;
-    final Path[] testFiles = new Path[4];
+    final int numFiles = 5;
+    final long fileSize = 15000;
+
+    final Path[] testFiles = new Path[numFiles];
     final HdfsBlockLocation[][] fileLocs = new HdfsBlockLocation[numFiles][];
     final long[] fileSizes = new long[numFiles];
     for (int i=0; i<numFiles; i++) {
@@ -235,35 +271,87 @@ public class TestFsDatasetCache {
     }
 
     // Cache the first n-1 files
-    long current = 0;
+    long total = 0;
+    verifyExpectedCacheUsage(0);
     for (int i=0; i<numFiles-1; i++) {
       setHeartbeatResponse(cacheBlocks(fileLocs[i]));
-      current = verifyExpectedCacheUsage(current + fileSizes[i]);
+      total = verifyExpectedCacheUsage(rounder.round(total + fileSizes[i]));
     }
-    final long oldCurrent = current;
 
     // nth file should hit a capacity exception
     final LogVerificationAppender appender = new LogVerificationAppender();
     final Logger logger = Logger.getRootLogger();
     logger.addAppender(appender);
     setHeartbeatResponse(cacheBlocks(fileLocs[numFiles-1]));
-    int lines = 0;
-    while (lines == 0) {
-      Thread.sleep(100);
-      lines = appender.countLinesWithMessage(
-          DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY + " exceeded");
-    }
 
-    // Uncache the cached part of the nth file
-    setHeartbeatResponse(uncacheBlocks(fileLocs[numFiles-1]));
-    while (fsd.getDnCacheUsed() != oldCurrent) {
-      Thread.sleep(100);
-    }
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        int lines = appender.countLinesWithMessage(
+            "more bytes in the cache: " +
+            DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY);
+        return lines > 0;
+      }
+    }, 500, 30000);
 
     // Uncache the n-1 files
     for (int i=0; i<numFiles-1; i++) {
       setHeartbeatResponse(uncacheBlocks(fileLocs[i]));
-      current = verifyExpectedCacheUsage(current - fileSizes[i]);
+      total -= rounder.round(fileSizes[i]);
+      verifyExpectedCacheUsage(total);
+    }
+    LOG.info("finishing testFilesExceedMaxLockedMemory");
+  }
+
+  @Test(timeout=600000)
+  public void testUncachingBlocksBeforeCachingFinishes() throws Exception {
+    LOG.info("beginning testUncachingBlocksBeforeCachingFinishes");
+    final int NUM_BLOCKS = 5;
+
+    verifyExpectedCacheUsage(0);
+
+    // Write a test file
+    final Path testFile = new Path("/testCacheBlock");
+    final long testFileLen = BLOCK_SIZE*NUM_BLOCKS;
+    DFSTestUtil.createFile(fs, testFile, testFileLen, (short)1, 0xABBAl);
+
+    // Get the details of the written file
+    HdfsBlockLocation[] locs =
+        (HdfsBlockLocation[])fs.getFileBlockLocations(testFile, 0, testFileLen);
+    assertEquals("Unexpected number of blocks", NUM_BLOCKS, locs.length);
+    final long[] blockSizes = getBlockSizes(locs);
+
+    // Check initial state
+    final long cacheCapacity = fsd.getDnCacheCapacity();
+    long cacheUsed = fsd.getDnCacheUsed();
+    long current = 0;
+    assertEquals("Unexpected cache capacity", CACHE_CAPACITY, cacheCapacity);
+    assertEquals("Unexpected amount of cache used", current, cacheUsed);
+
+    MappableBlock.mlocker = new MappableBlock.Mlocker() {
+      @Override
+      public void mlock(MappedByteBuffer mmap, long length) throws IOException {
+        LOG.info("An mlock operation is starting.");
+        try {
+          Thread.sleep(3000);
+        } catch (InterruptedException e) {
+          Assert.fail();
+        }
+      }
+    };
+    // Starting caching each block in succession.  The usedBytes amount
+    // should increase, even though caching doesn't complete on any of them.
+    for (int i=0; i<NUM_BLOCKS; i++) {
+      setHeartbeatResponse(cacheBlock(locs[i]));
+      current = verifyExpectedCacheUsage(current + blockSizes[i]);
     }
+    
+    setHeartbeatResponse(new DatanodeCommand[] {
+      getResponse(locs, DatanodeProtocol.DNA_UNCACHE)
+    });
+
+    // wait until all caching jobs are finished cancelling.
+    current = verifyExpectedCacheUsage(0);
+    LOG.info("finishing testUncachingBlocksBeforeCachingFinishes");
   }
 }

+ 46 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestRenameWithSnapshots.java

@@ -2243,4 +2243,50 @@ public class TestRenameWithSnapshots {
     
     restartClusterAndCheckImage(true);
   }
+  
+  /**
+   * Make sure we clean the whole subtree under a DstReference node after 
+   * deleting a snapshot.
+   * see HDFS-5476.
+   */
+  @Test
+  public void testCleanDstReference() throws Exception {
+    final Path test = new Path("/test");
+    final Path foo = new Path(test, "foo");
+    final Path bar = new Path(foo, "bar");
+    hdfs.mkdirs(bar);
+    SnapshotTestHelper.createSnapshot(hdfs, test, "s0");
+    
+    // create file after s0 so that the file should not be included in s0
+    final Path fileInBar = new Path(bar, "file");
+    DFSTestUtil.createFile(hdfs, fileInBar, BLOCKSIZE, REPL, SEED);
+    // rename foo --> foo2
+    final Path foo2 = new Path(test, "foo2");
+    hdfs.rename(foo, foo2);
+    // create snapshot s1, note the file is included in s1
+    hdfs.createSnapshot(test, "s1");
+    // delete bar and foo2
+    hdfs.delete(new Path(foo2, "bar"), true);
+    hdfs.delete(foo2, true);
+    
+    final Path sfileInBar = SnapshotTestHelper.getSnapshotPath(test, "s1",
+        "foo2/bar/file");
+    assertTrue(hdfs.exists(sfileInBar));
+    
+    hdfs.deleteSnapshot(test, "s1");
+    assertFalse(hdfs.exists(sfileInBar));
+    
+    restartClusterAndCheckImage(true);
+    // make sure the file under bar is deleted 
+    final Path barInS0 = SnapshotTestHelper.getSnapshotPath(test, "s0",
+        "foo/bar");
+    INodeDirectoryWithSnapshot barNode = (INodeDirectoryWithSnapshot) fsdir
+        .getINode(barInS0.toString());
+    assertEquals(0, barNode.getChildrenList(null).size());
+    List<DirectoryDiff> diffList = barNode.getDiffs().asList();
+    assertEquals(1, diffList.size());
+    DirectoryDiff diff = diffList.get(0);
+    assertEquals(0, diff.getChildrenDiff().getList(ListType.DELETED).size());
+    assertEquals(0, diff.getChildrenDiff().getList(ListType.CREATED).size());
+  }
 }

+ 45 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java

@@ -347,4 +347,49 @@ public class TestSnapshotBlocksMap {
     assertEquals(1, blks.length);
     assertEquals(BLOCKSIZE, blks[0].getNumBytes());
   }
+  
+  /**
+   * 1. rename under-construction file with 0-sized blocks after snapshot.
+   * 2. delete the renamed directory.
+   * make sure we delete the 0-sized block.
+   * see HDFS-5476.
+   */
+  @Test
+  public void testDeletionWithZeroSizeBlock3() throws Exception {
+    final Path foo = new Path("/foo");
+    final Path subDir = new Path(foo, "sub");
+    final Path bar = new Path(subDir, "bar");
+    DFSTestUtil.createFile(hdfs, bar, BLOCKSIZE, REPLICATION, 0L);
+
+    hdfs.append(bar);
+
+    INodeFile barNode = fsdir.getINode4Write(bar.toString()).asFile();
+    BlockInfo[] blks = barNode.getBlocks();
+    assertEquals(1, blks.length);
+    ExtendedBlock previous = new ExtendedBlock(fsn.getBlockPoolId(), blks[0]);
+    cluster.getNameNodeRpc()
+        .addBlock(bar.toString(), hdfs.getClient().getClientName(), previous,
+            null, barNode.getId(), null);
+
+    SnapshotTestHelper.createSnapshot(hdfs, foo, "s1");
+
+    // rename bar
+    final Path bar2 = new Path(subDir, "bar2");
+    hdfs.rename(bar, bar2);
+    
+    INodeFile bar2Node = fsdir.getINode4Write(bar2.toString()).asFile();
+    blks = bar2Node.getBlocks();
+    assertEquals(2, blks.length);
+    assertEquals(BLOCKSIZE, blks[0].getNumBytes());
+    assertEquals(0, blks[1].getNumBytes());
+
+    // delete subDir
+    hdfs.delete(subDir, true);
+    
+    final Path sbar = SnapshotTestHelper.getSnapshotPath(foo, "s1", "sub/bar");
+    barNode = fsdir.getINode(sbar.toString()).asFile();
+    blks = barNode.getBlocks();
+    assertEquals(1, blks.length);
+    assertEquals(BLOCKSIZE, blks[0].getNumBytes());
+  }
 }

+ 0 - 5
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java

@@ -78,11 +78,6 @@ public class WebHdfsTestUtil {
     Assert.assertEquals(expectedResponseCode, conn.getResponseCode());
     return WebHdfsFileSystem.jsonParse(conn, false);
   }
-  
-  public static HttpURLConnection twoStepWrite(final WebHdfsFileSystem webhdfs,
-      final HttpOpParam.Op op, HttpURLConnection conn) throws IOException {
-    return webhdfs.new ConnRunner(op, conn).twoStepWrite();
-  }
 
   public static FSDataOutputStream write(final WebHdfsFileSystem webhdfs,
       final HttpOpParam.Op op, final HttpURLConnection conn,

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -88,6 +88,9 @@ Release 2.3.0 - UNRELEASED
     YARN-1323. Set HTTPS webapp address along with other RPC addresses in HAUtil
     (Karthik Kambatla via Sandy Ryza)
 
+    YARN-1121. Changed ResourceManager's state-store to drain all events on
+    shut-down. (Jian He via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 34 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java

@@ -49,6 +49,19 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
   private final BlockingQueue<Event> eventQueue;
   private volatile boolean stopped = false;
 
+  // Configuration flag for enabling/disabling draining dispatcher's events on
+  // stop functionality.
+  private volatile boolean drainEventsOnStop = false;
+
+  // Indicates all the remaining dispatcher's events on stop have been drained
+  // and processed.
+  private volatile boolean drained = true;
+
+  // For drainEventsOnStop enabled only, block newly coming events into the
+  // queue while stopping.
+  private volatile boolean blockNewEvents = false;
+  private EventHandler handlerInstance = null;
+
   private Thread eventHandlingThread;
   protected final Map<Class<? extends Enum>, EventHandler> eventDispatchers;
   private boolean exitOnDispatchException;
@@ -68,6 +81,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
       @Override
       public void run() {
         while (!stopped && !Thread.currentThread().isInterrupted()) {
+          drained = eventQueue.isEmpty();
           Event event;
           try {
             event = eventQueue.take();
@@ -102,8 +116,19 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
     eventHandlingThread.start();
   }
 
+  public void setDrainEventsOnStop() {
+    drainEventsOnStop = true;
+  }
+
   @Override
   protected void serviceStop() throws Exception {
+    if (drainEventsOnStop) {
+      blockNewEvents = true;
+      LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
+      while(!drained) {
+        Thread.yield();
+      }
+    }
     stopped = true;
     if (eventHandlingThread != null) {
       eventHandlingThread.interrupt();
@@ -173,11 +198,19 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
 
   @Override
   public EventHandler getEventHandler() {
-    return new GenericEventHandler();
+    if (handlerInstance == null) {
+      handlerInstance = new GenericEventHandler();
+    }
+    return handlerInstance;
   }
 
   class GenericEventHandler implements EventHandler<Event> {
     public void handle(Event event) {
+      if (blockNewEvents) {
+        return;
+      }
+      drained = false;
+
       /* all this method does is enqueue all the events onto the queue */
       int qSize = eventQueue.size();
       if (qSize !=0 && qSize %1000 == 0) {

+ 11 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java

@@ -261,17 +261,20 @@ public abstract class RMStateStore extends AbstractService {
   }
   
   AsyncDispatcher dispatcher;
-  
-  public synchronized void serviceInit(Configuration conf) throws Exception{    
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception{
     // create async handler
     dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.register(RMStateStoreEventType.class, 
                         new ForwardingEventHandler());
+    dispatcher.setDrainEventsOnStop();
     initInternal(conf);
   }
-  
-  protected synchronized void serviceStart() throws Exception {
+
+  @Override
+  protected void serviceStart() throws Exception {
     dispatcher.start();
     startInternal();
   }
@@ -288,11 +291,12 @@ public abstract class RMStateStore extends AbstractService {
    */
   protected abstract void startInternal() throws Exception;
 
-  public synchronized void serviceStop() throws Exception {
+  @Override
+  protected void serviceStop() throws Exception {
     closeInternal();
     dispatcher.stop();
   }
-  
+
   /**
    * Derived classes close themselves using this method.
    * The base class will be closed and the event dispatcher will be shutdown 
@@ -509,8 +513,7 @@ public abstract class RMStateStore extends AbstractService {
   }
 
   // Dispatcher related code
-  
-  private synchronized void handleStoreEvent(RMStateStoreEvent event) {
+  protected void handleStoreEvent(RMStateStoreEvent event) {
     if (event.getType().equals(RMStateStoreEventType.STORE_APP)
         || event.getType().equals(RMStateStoreEventType.UPDATE_APP)) {
       ApplicationState appState = null;

+ 11 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java

@@ -163,6 +163,14 @@ public class MockRM extends ResourceManager {
   public RMApp submitApp(int masterMemory, String name, String user,
       Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
       int maxAppAttempts, Credentials ts, String appType) throws Exception {
+    return submitApp(masterMemory, name, user, acls, unmanaged, queue,
+      maxAppAttempts, ts, appType, true);
+  }
+
+  public RMApp submitApp(int masterMemory, String name, String user,
+      Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
+      int maxAppAttempts, Credentials ts, String appType,
+      boolean waitForAccepted) throws Exception {
     ApplicationClientProtocol client = getClientRMService();
     GetNewApplicationResponse resp = client.getNewApplication(Records
         .newRecord(GetNewApplicationRequest.class));
@@ -222,7 +230,9 @@ public class MockRM extends ResourceManager {
     }.setClientReq(client, req);
     fakeUser.doAs(action);
     // make sure app is immediately available after submit
-    waitForState(appId, RMAppState.ACCEPTED);
+    if (waitForAccepted) {
+      waitForState(appId, RMAppState.ACCEPTED);
+    }
     return getRMContext().getRMApps().get(appId);
   }
 

+ 60 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java

@@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -1062,6 +1063,65 @@ public class TestRMRestart {
     rm2.stop();
   }
 
+  @Test
+  public void testRMStateStoreDispatcherDrainedOnRMStop() throws Exception {
+    MemoryRMStateStore memStore = new MemoryRMStateStore() {
+      volatile boolean wait = true;
+      @Override
+      public void serviceStop() throws Exception {
+        // Unblock app saving request.
+        wait = false;
+        super.serviceStop();
+      }
+
+      @Override
+      protected void handleStoreEvent(RMStateStoreEvent event) {
+        // Block app saving request.
+        while (wait);
+        super.handleStoreEvent(event);
+      }
+    };
+    memStore.init(conf);
+
+    // start RM
+    final MockRM rm1 = new MockRM(conf, memStore);
+    rm1.start();
+
+    // create apps.
+    final ArrayList<RMApp> appList = new ArrayList<RMApp>();
+    final int NUM_APPS = 5;
+
+    for (int i = 0; i < NUM_APPS; i++) {
+      RMApp app = rm1.submitApp(200, "name", "user",
+            new HashMap<ApplicationAccessType, String>(), false,
+            "default", -1, null, "MAPREDUCE", false);
+      appList.add(app);
+      rm1.waitForState(app.getApplicationId(), RMAppState.NEW_SAVING);
+    }
+    // all apps's saving request are now enqueued to RMStateStore's dispatcher
+    // queue, and will be processed once rm.stop() is called.
+
+    // Nothing exist in state store before stop is called.
+    Map<ApplicationId, ApplicationState> rmAppState =
+        memStore.getState().getApplicationState();
+    Assert.assertTrue(rmAppState.size() == 0);
+
+    // stop rm
+    rm1.stop();
+
+    // Assert app info is still saved even if stop is called with pending saving
+    // request on dispatcher.
+    for (RMApp app : appList) {
+      ApplicationState appState = rmAppState.get(app.getApplicationId());
+      Assert.assertNotNull(appState);
+      Assert.assertEquals(0, appState.getAttemptCount());
+      Assert.assertEquals(appState.getApplicationSubmissionContext()
+        .getApplicationId(), app.getApplicationSubmissionContext()
+        .getApplicationId());
+    }
+    Assert.assertTrue(rmAppState.size() == NUM_APPS);
+  }
+
   public static class TestSecurityMockRM extends MockRM {
 
     public TestSecurityMockRM(Configuration conf, RMStateStore store) {