
HDFS-5053. NameNode should invoke DataNode APIs to coordinate caching. (Andrew Wang)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4949@1523145 13f79535-47bb-0310-9956-ffa450edef68
Andrew Wang committed 40eb94ade3 (12 years ago)
27 changed files with 2079 additions and 244 deletions
  1. +2 -0     hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
  2. +2 -0     hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  3. +16 -0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
  4. +11 -0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java
  5. +36 -172  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
  6. +595 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationManager.java
  7. +302 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
  8. +125 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationPolicy.java
  9. +136 -2   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
  10. +13 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
  11. +34 -41  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
  12. +67 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateStoredBlocks.java
  13. +18 -11  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java
  14. +271 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReportProcessor.java
  15. +44 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UncacheBlocks.java
  16. +2 -0    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
  17. +14 -1   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
  18. +4 -2    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
  19. +8 -4    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
  20. +16 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
  21. +67 -4   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
  22. +46 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
  23. +53 -6   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  24. +14 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
  25. +8 -1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
  26. +13 -0   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
  27. +162 -0  hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt

@@ -33,6 +33,8 @@ HDFS-4949 (Unreleased)
     HDFS-5158. Add command-line support for manipulating cache directives.
     (Contributed by Colin Patrick McCabe)
 
+    HDFS-5053. NameNode should invoke DataNode APIs to coordinate caching.
+    (Andrew Wang)
 
   OPTIMIZATIONS
 

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -102,6 +102,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long    DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT = 0;
   public static final String  DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY = "dfs.datanode.fsdatasetcache.max.threads.per.volume";
   public static final int     DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT = 4;
+  public static final String  DFS_NAMENODE_CACHING_ENABLED_KEY = "dfs.namenode.caching.enabled";
+  public static final boolean DFS_NAMENODE_CACHING_ENABLED_DEFAULT = false;
 
   public static final String  DFS_NAMENODE_HTTP_PORT_KEY = "dfs.http.port";
   public static final int     DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070;
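
The new dfs.namenode.caching.enabled key defaults to false, so the caching machinery in this patch stays dormant unless explicitly enabled. A minimal sketch of flipping it programmatically, for example when building a test Configuration; only the key and its default come from this patch, the surrounding setup is assumed:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

// Sketch only: enables the (off by default) NameNode caching flag added above.
public class EnableNameNodeCaching {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY, true);
    System.out.println(DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY + " = "
        + conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY,
            DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_DEFAULT));
  }
}

The same property can be set in hdfs-site.xml; leaving it unset (or false) keeps the cache-report handling and the CacheReplicationMonitor introduced below disabled.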

+ 16 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java

@@ -713,6 +713,12 @@ public class PBHelper {
     case DatanodeProtocol.DNA_SHUTDOWN:
       builder.setAction(BlockCommandProto.Action.SHUTDOWN);
       break;
+    case DatanodeProtocol.DNA_CACHE:
+      builder.setAction(BlockCommandProto.Action.CACHE);
+      break;
+    case DatanodeProtocol.DNA_UNCACHE:
+      builder.setAction(BlockCommandProto.Action.UNCACHE);
+      break;
     default:
       throw new AssertionError("Invalid action");
     }
@@ -765,6 +771,8 @@ public class PBHelper {
       break;
     case DatanodeProtocol.DNA_TRANSFER:
     case DatanodeProtocol.DNA_INVALIDATE:
+    case DatanodeProtocol.DNA_CACHE:
+    case DatanodeProtocol.DNA_UNCACHE:
     case DatanodeProtocol.DNA_SHUTDOWN:
       builder.setCmdType(DatanodeCommandProto.Type.BlockCommand).setBlkCmd(
           PBHelper.convert((BlockCommand) datanodeCommand));
@@ -818,6 +826,14 @@ public class PBHelper {
     case SHUTDOWN:
       action = DatanodeProtocol.DNA_SHUTDOWN;
       break;
+    case CACHE:
+      action = DatanodeProtocol.DNA_CACHE;
+      break;
+    case UNCACHE:
+      action = DatanodeProtocol.DNA_UNCACHE;
+      break;
+    default:
+      throw new AssertionError("Unknown action type: " + blkCmd.getAction());
     }
     return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets);
   }
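
These hunks extend both directions of the BlockCommand conversion, so the new cache actions survive the RPC round trip. A hedged sketch of that round trip, assuming the PBHelper.convert overloads edited above are callable as shown and using the BlockCommand constructor visible in the hunk; block, pool name, and targets are placeholders:

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;

// Hedged sketch: a DNA_CACHE command is converted to protobuf and back.
public class CacheCommandRoundTrip {
  public static void main(String[] args) {
    Block[] blocks = { new Block(1L, 1024L, 1000L) };
    DatanodeInfo[][] targets = { {} };
    BlockCommand cmd = new BlockCommand(
        DatanodeProtocol.DNA_CACHE, "examplePool", blocks, targets);

    BlockCommandProto proto = PBHelper.convert(cmd);  // maps DNA_CACHE -> Action.CACHE
    BlockCommand decoded = PBHelper.convert(proto);   // maps Action.CACHE -> DNA_CACHE
    System.out.println(decoded.getAction() == DatanodeProtocol.DNA_CACHE);  // true
  }
}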

+ 11 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockCollection.java

@@ -60,6 +60,17 @@ public interface BlockCollection {
    */
   public short getBlockReplication();
 
+  /**
+   * Set cache replication factor for the collection
+   */
+  public void setCacheReplication(short cacheReplication);
+
+  /**
+   * Get cache replication factor for the collection
+   * @return cache replication value
+   */
+  public short getCacheReplication();
+
   /**
    * Get the name of the collection.
    */

+ 36 - 172
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -77,14 +77,13 @@ import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.Sets;
 
 /**
  * Keeps information related to the blocks stored in the Hadoop cluster.
  */
 @InterfaceAudience.Private
-public class BlockManager {
+public class BlockManager extends ReportProcessor {
 
   static final Log LOG = LogFactory.getLog(BlockManager.class);
   public static final Log blockLog = NameNode.blockStateChangeLog;
@@ -163,7 +162,7 @@ public class BlockManager {
   final CorruptReplicasMap corruptReplicas = new CorruptReplicasMap();
 
   /** Blocks to be invalidated. */
-  private final InvalidateBlocks invalidateBlocks;
+  private final InvalidateStoredBlocks invalidateBlocks;
   
   /**
    * After a failover, over-replicated blocks may not be handled
@@ -219,7 +218,6 @@ public class BlockManager {
   final boolean encryptDataTransfer;
   
   // Max number of blocks to log info about during a block report.
-  private final long maxNumBlocksToLog;
 
   /**
    * When running inside a Standby node, the node may receive block reports
@@ -237,10 +235,11 @@ public class BlockManager {
   
   public BlockManager(final Namesystem namesystem, final FSClusterStats stats,
       final Configuration conf) throws IOException {
+    super(conf);
     this.namesystem = namesystem;
     datanodeManager = new DatanodeManager(this, namesystem, conf);
     heartbeatManager = datanodeManager.getHeartbeatManager();
-    invalidateBlocks = new InvalidateBlocks(datanodeManager);
+    invalidateBlocks = new InvalidateStoredBlocks(datanodeManager);
 
     // Compute the map capacity by allocating 2% of total memory
     blocksMap = new BlocksMap(DEFAULT_MAP_LOAD_FACTOR);
@@ -300,11 +299,7 @@ public class BlockManager {
     this.encryptDataTransfer =
         conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY,
             DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
-    
-    this.maxNumBlocksToLog =
-        conf.getLong(DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
-            DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
-    
+
     LOG.info("defaultReplication         = " + defaultReplication);
     LOG.info("maxReplication             = " + maxReplication);
     LOG.info("minReplication             = " + minReplication);
@@ -1004,6 +999,7 @@ public class BlockManager {
    * Adds block to list of blocks which will be invalidated on specified
    * datanode and log the operation
    */
+  @Override  // ReportProcessor
   void addToInvalidates(final Block block, final DatanodeInfo datanode) {
     invalidateBlocks.add(block, datanode, true);
   }
@@ -1049,7 +1045,8 @@ public class BlockManager {
     markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason), dn);
   }
 
-  private void markBlockAsCorrupt(BlockToMarkCorrupt b,
+  @Override // ReportProcessor
+  void markBlockAsCorrupt(BlockToMarkCorrupt b,
                                   DatanodeInfo dn) throws IOException {
     DatanodeDescriptor node = getDatanodeManager().getDatanode(dn);
     if (node == null) {
@@ -1059,7 +1056,7 @@ public class BlockManager {
 
     BlockCollection bc = b.corrupted.getBlockCollection();
     if (bc == null) {
-      blockLog.info("BLOCK markBlockAsCorrupt: " + b
+      blockLogInfo("#markBlockAsCorrupt: " + b
           + " cannot be marked as corrupt as it does not belong to any file");
       addToInvalidates(b.corrupted, node);
       return;
@@ -1123,6 +1120,9 @@ public class BlockManager {
     this.shouldPostponeBlocksFromFuture  = postpone;
   }
 
+  public boolean shouldPostponeBlocksFromFuture() {
+    return this.shouldPostponeBlocksFromFuture;
+  }
 
   private void postponeBlock(Block blk) {
     if (postponedMisreplicatedBlocks.add(blk)) {
@@ -1544,61 +1544,6 @@ public class BlockManager {
        */
     }
   }
-  
-  /**
-   * StatefulBlockInfo is used to build the "toUC" list, which is a list of
-   * updates to the information about under-construction blocks.
-   * Besides the block in question, it provides the ReplicaState
-   * reported by the datanode in the block report. 
-   */
-  private static class StatefulBlockInfo {
-    final BlockInfoUnderConstruction storedBlock;
-    final ReplicaState reportedState;
-    
-    StatefulBlockInfo(BlockInfoUnderConstruction storedBlock, 
-        ReplicaState reportedState) {
-      this.storedBlock = storedBlock;
-      this.reportedState = reportedState;
-    }
-  }
-  
-  /**
-   * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
-   * list of blocks that should be considered corrupt due to a block report.
-   */
-  private static class BlockToMarkCorrupt {
-    /** The corrupted block in a datanode. */
-    final BlockInfo corrupted;
-    /** The corresponding block stored in the BlockManager. */
-    final BlockInfo stored;
-    /** The reason to mark corrupt. */
-    final String reason;
-    
-    BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason) {
-      Preconditions.checkNotNull(corrupted, "corrupted is null");
-      Preconditions.checkNotNull(stored, "stored is null");
-
-      this.corrupted = corrupted;
-      this.stored = stored;
-      this.reason = reason;
-    }
-
-    BlockToMarkCorrupt(BlockInfo stored, String reason) {
-      this(stored, stored, reason);
-    }
-
-    BlockToMarkCorrupt(BlockInfo stored, long gs, String reason) {
-      this(new BlockInfo(stored), stored, reason);
-      //the corrupted block in datanode has a different generation stamp
-      corrupted.setGenerationStamp(gs);
-    }
-
-    @Override
-    public String toString() {
-      return corrupted + "("
-          + (corrupted == stored? "same as stored": "stored=" + stored) + ")";
-    }
-  }
 
   /**
    * The given datanode is reporting all its blocks.
@@ -1659,15 +1604,6 @@ public class BlockManager {
         + ", processing time: " + (endTime - startTime) + " msecs");
   }
 
-  /**
-   * The given datanode is reporting all of its cached blocks.
-   * Update the cache state of blocks in the block map.
-   */
-  public void processCacheReport(final DatanodeID nodeID, final String poolId,
-      final BlockListAsLongs newReport) throws IOException {
-    // TODO: Implement me!
-  }
-
   /**
    * Rescan the list of blocks which were previously postponed.
    */
@@ -1699,46 +1635,6 @@ public class BlockManager {
     }
   }
   
-  private void processReport(final DatanodeDescriptor node,
-      final BlockListAsLongs report) throws IOException {
-    // Normal case:
-    // Modify the (block-->datanode) map, according to the difference
-    // between the old and new block report.
-    //
-    Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
-    Collection<Block> toRemove = new LinkedList<Block>();
-    Collection<Block> toInvalidate = new LinkedList<Block>();
-    Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
-    Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
-    reportDiff(node, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC);
-
-    // Process the blocks on each queue
-    for (StatefulBlockInfo b : toUC) { 
-      addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState);
-    }
-    for (Block b : toRemove) {
-      removeStoredBlock(b, node);
-    }
-    int numBlocksLogged = 0;
-    for (BlockInfo b : toAdd) {
-      addStoredBlock(b, node, null, numBlocksLogged < maxNumBlocksToLog);
-      numBlocksLogged++;
-    }
-    if (numBlocksLogged > maxNumBlocksToLog) {
-      blockLog.info("BLOCK* processReport: logged info for " + maxNumBlocksToLog
-          + " of " + numBlocksLogged + " reported.");
-    }
-    for (Block b : toInvalidate) {
-      blockLog.info("BLOCK* processReport: "
-          + b + " on " + node + " size " + b.getNumBytes()
-          + " does not belong to any file");
-      addToInvalidates(b, node);
-    }
-    for (BlockToMarkCorrupt b : toCorrupt) {
-      markBlockAsCorrupt(b, node);
-    }
-  }
-
   /**
    * processFirstBlockReport is intended only for processing "initial" block
    * reports, the first block report received from a DN after it registers.
@@ -1801,44 +1697,6 @@ public class BlockManager {
     }
   }
 
-  private void reportDiff(DatanodeDescriptor dn, 
-      BlockListAsLongs newReport, 
-      Collection<BlockInfo> toAdd,              // add to DatanodeDescriptor
-      Collection<Block> toRemove,           // remove from DatanodeDescriptor
-      Collection<Block> toInvalidate,       // should be removed from DN
-      Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
-      Collection<StatefulBlockInfo> toUC) { // add to under-construction list
-    // place a delimiter in the list which separates blocks 
-    // that have been reported from those that have not
-    BlockInfo delimiter = new BlockInfo(new Block(), 1);
-    boolean added = dn.addBlock(delimiter);
-    assert added : "Delimiting block cannot be present in the node";
-    int headIndex = 0; //currently the delimiter is in the head of the list
-    int curIndex;
-
-    if (newReport == null)
-      newReport = new BlockListAsLongs();
-    // scan the report and process newly reported blocks
-    BlockReportIterator itBR = newReport.getBlockReportIterator();
-    while(itBR.hasNext()) {
-      Block iblk = itBR.next();
-      ReplicaState iState = itBR.getCurrentReplicaState();
-      BlockInfo storedBlock = processReportedBlock(dn, iblk, iState,
-                                  toAdd, toInvalidate, toCorrupt, toUC);
-      // move block to the head of the list
-      if (storedBlock != null && (curIndex = storedBlock.findDatanode(dn)) >= 0) {
-        headIndex = dn.moveBlockToHead(storedBlock, curIndex, headIndex);
-      }
-    }
-    // collect blocks that have not been reported
-    // all of them are next to the delimiter
-    Iterator<? extends Block> it = new DatanodeDescriptor.BlockIterator(
-        delimiter.getNext(0), dn);
-    while(it.hasNext())
-      toRemove.add(it.next());
-    dn.removeBlock(delimiter);
-  }
-
   /**
    * Process a block replica reported by the data-node.
    * No side effects except adding to the passed-in Collections.
@@ -1870,7 +1728,8 @@ public class BlockManager {
    * @return the up-to-date stored block, if it should be kept.
    *         Otherwise, null.
    */
-  private BlockInfo processReportedBlock(final DatanodeDescriptor dn, 
+  @Override // ReportProcessor
+  BlockInfo processReportedBlock(final DatanodeDescriptor dn, 
       final Block block, final ReplicaState reportedState, 
       final Collection<BlockInfo> toAdd, 
       final Collection<Block> toInvalidate, 
@@ -2097,6 +1956,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
     }
   }
   
+  @Override // ReportProcessor
   void addStoredBlockUnderConstruction(
       BlockInfoUnderConstruction block, 
       DatanodeDescriptor node, 
@@ -2152,7 +2012,8 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
    * needed replications if this takes care of the problem.
    * @return the block that is stored in blockMap.
    */
-  private Block addStoredBlock(final BlockInfo block,
+  @Override // ReportProcessor
+  Block addStoredBlock(final BlockInfo block,
                                DatanodeDescriptor node,
                                DatanodeDescriptor delNodeHint,
                                boolean logEveryBlock)
@@ -2167,7 +2028,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
     }
     if (storedBlock == null || storedBlock.getBlockCollection() == null) {
       // If this block does not belong to anyfile, then we are done.
-      blockLog.info("BLOCK* addStoredBlock: " + block + " on "
+      blockLogInfo("#addStoredBlock: " + block + " on "
           + node + " size " + block.getNumBytes()
           + " but it does not belong to any file");
       // we could add this block to invalidate set of this datanode.
@@ -2189,7 +2050,7 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
       }
     } else {
       curReplicaDelta = 0;
-      blockLog.warn("BLOCK* addStoredBlock: "
+      blockLogWarn("#addStoredBlock: "
           + "Redundant addStoredBlock request received for " + storedBlock
           + " on " + node + " size " + storedBlock.getNumBytes());
     }
@@ -2247,20 +2108,6 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
     return storedBlock;
   }
 
-  private void logAddStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) {
-    if (!blockLog.isInfoEnabled()) {
-      return;
-    }
-    
-    StringBuilder sb = new StringBuilder(500);
-    sb.append("BLOCK* addStoredBlock: blockMap updated: ")
-      .append(node)
-      .append(" is added to ");
-    storedBlock.appendStringTo(sb);
-    sb.append(" size " )
-      .append(storedBlock.getNumBytes());
-    blockLog.info(sb);
-  }
   /**
    * Invalidate corrupt replicas.
    * <p>
@@ -3282,4 +3129,21 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
   public void shutdown() {
     blocksMap.close();
   }
+
+  @Override // ReportProcessor
+  int moveBlockToHead(DatanodeDescriptor dn, BlockInfo storedBlock,
+      int curIndex, int headIndex) {
+    return dn.moveBlockToHead(storedBlock, curIndex, headIndex);
+  }
+
+  @Override // ReportProcessor
+  boolean addBlock(DatanodeDescriptor dn, BlockInfo block) {
+    return dn.addBlock(block);
+  }
+
+  @Override // ReportProcessor
+  boolean removeBlock(DatanodeDescriptor dn, BlockInfo block) {
+    return dn.removeBlock(block);
+  }
+
 }
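
Taken together, the @Override annotations above show the shape of the refactor: the report-diff loop formerly private to BlockManager now lives in the new ReportProcessor base class (added by this commit but not reproduced in this excerpt), and BlockManager binds the hooks to on-disk replica state while CacheReplicationManager, below, binds them to cached-replica state. A minimal sketch of that template, assuming only what the overrides in the two subclasses show; it is not the real ReportProcessor source:

package org.apache.hadoop.hdfs.server.blockmanagement;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

// Sketch of the template-method split; NOT the real ReportProcessor. The hook
// comments record how each subclass binds them in this diff.
abstract class ReportProcessorSketch {
  /** BlockManager: dn.addBlock(b); CacheReplicationManager: dn.addCachedBlock(b). */
  abstract boolean addBlock(DatanodeDescriptor dn, BlockInfo block);

  /** BlockManager: dn.removeBlock(b); CacheReplicationManager: dn.removeCachedBlock(b). */
  abstract boolean removeBlock(DatanodeDescriptor dn, BlockInfo block);

  /** BlockManager: dn.moveBlockToHead(...); CacheReplicationManager: dn.moveCachedBlockToHead(...). */
  abstract int moveBlockToHead(DatanodeDescriptor dn, BlockInfo storedBlock,
      int curIndex, int headIndex);

  /** BlockManager: invalidateBlocks.add(...); CacheReplicationManager: blocksToUncache.add(...). */
  abstract void addToInvalidates(Block block, DatanodeInfo datanode);

  // processReportedBlock, addStoredBlock, addStoredBlockUnderConstruction,
  // removeStoredBlock and markBlockAsCorrupt are overridden the same way; the
  // shared reportDiff()/processReport() loop removed from BlockManager above
  // calls only these hooks.
}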

+ 595 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationManager.java

@@ -0,0 +1,595 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.namenode.FSClusterStats;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.util.LightWeightHashSet;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Analogue of the BlockManager class for cached replicas. Maintains the mapping
+ * of cached blocks to datanodes via processing datanode cache reports. Based on
+ * these reports and addition and removal of caching directives in the
+ * CacheManager, the CacheReplicationManager will schedule caching and uncaching
+ * work.
+ * 
+ * The CacheReplicationManager does not have a separate lock, so depends on
+ * taking the namesystem lock as appropriate.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS"})
+public class CacheReplicationManager extends ReportProcessor {
+
+  private static final Log LOG =
+      LogFactory.getLog(CacheReplicationManager.class);
+
+  // Statistics
+  private volatile long pendingCacheBlocksCount = 0L;
+  private volatile long underCachedBlocksCount = 0L;
+  private volatile long scheduledCacheBlocksCount = 0L;
+
+  /** Used by metrics */
+  public long getPendingCacheBlocksCount() {
+    return pendingCacheBlocksCount;
+  }
+  /** Used by metrics */
+  public long getUnderCachedBlocksCount() {
+    return underCachedBlocksCount;
+  }
+  /** Used by metrics */
+  public long getScheduledCacheBlocksCount() {
+    return scheduledCacheBlocksCount;
+  }
+  /** Used by metrics */
+  public long getPendingBlocksToUncacheCount() {
+    return blocksToUncache.numBlocks();
+  }
+
+  private final Namesystem namesystem;
+  private final BlockManager blockManager;
+  private final DatanodeManager datanodeManager;
+  private final boolean isCachingEnabled;
+
+  /**
+   * Mapping of blocks to datanodes where the block is cached
+   */
+  final BlocksMap cachedBlocksMap;
+  /**
+   * Blocks to be uncached
+   */
+  private final UncacheBlocks blocksToUncache;
+  /**
+   * Blocks that need to be cached
+   */
+  private final LightWeightHashSet<Block> neededCacheBlocks;
+  /**
+   * Blocks that are being cached
+   */
+  private final PendingReplicationBlocks pendingCacheBlocks;
+
+  /**
+   * Executor for the CacheReplicationMonitor thread
+   */
+  private ExecutorService monitor = null;
+
+  private final Configuration conf;
+
+  public CacheReplicationManager(final Namesystem namesystem,
+      final BlockManager blockManager, final DatanodeManager datanodeManager,
+      final FSClusterStats stats, final Configuration conf) throws IOException {
+    super(conf);
+    this.namesystem = namesystem;
+    this.blockManager = blockManager;
+    this.datanodeManager = datanodeManager;
+    this.conf = conf;
+    isCachingEnabled = conf.getBoolean(
+        DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY,
+        DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_DEFAULT);
+    if (isCachingEnabled) {
+      cachedBlocksMap = new BlocksMap(BlockManager.DEFAULT_MAP_LOAD_FACTOR);
+      blocksToUncache = new UncacheBlocks();
+      pendingCacheBlocks = new PendingReplicationBlocks(1000 * conf.getInt(
+          DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
+          DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT));
+      neededCacheBlocks = new LightWeightHashSet<Block>();
+    } else {
+      cachedBlocksMap = null;
+      blocksToUncache = null;
+      pendingCacheBlocks = null;
+      neededCacheBlocks = null;
+    }
+  }
+
+  public void activate() {
+    if (isCachingEnabled) {
+      pendingCacheBlocks.start();
+      this.monitor = Executors.newSingleThreadExecutor(
+          new ThreadFactoryBuilder()
+          .setDaemon(true)
+          .setNameFormat(CacheReplicationMonitor.class.toString())
+          .build());
+      monitor.submit(new CacheReplicationMonitor(namesystem, blockManager,
+          datanodeManager, this, blocksToUncache, neededCacheBlocks,
+          pendingCacheBlocks, conf));
+      monitor.shutdown();
+    }
+  }
+
+  public void close() {
+    if (isCachingEnabled) {
+      monitor.shutdownNow();
+      try {
+        monitor.awaitTermination(3000, TimeUnit.MILLISECONDS);
+      } catch (InterruptedException e) {
+      }
+      pendingCacheBlocks.stop();
+      cachedBlocksMap.close();
+    }
+  }
+
+  public void clearQueues() {
+    blocksToUncache.clear();
+    synchronized (neededCacheBlocks) {
+      neededCacheBlocks.clear();
+    }
+    pendingCacheBlocks.clear();
+  }
+
+  public boolean isCachingEnabled() {
+    return isCachingEnabled;
+  }
+
+  /**
+   * @return desired cache replication factor of the block
+   */
+  short getCacheReplication(Block block) {
+    final BlockCollection bc = blockManager.blocksMap.getBlockCollection(block);
+    return bc == null ? 0 : bc.getCacheReplication();
+  }
+
+  /**
+   * Returns the number of cached replicas of a block
+   */
+  short getNumCached(Block block) {
+    Iterator<DatanodeDescriptor> it = cachedBlocksMap.nodeIterator(block);
+    short numCached = 0;
+    while (it.hasNext()) {
+      it.next();
+      numCached++;
+    }
+    return numCached;
+  }
+
+  /**
+   * The given datanode is reporting all of its cached blocks.
+   * Update the cache state of blocks in the block map.
+   */
+  public void processCacheReport(final DatanodeID nodeID, final String poolId,
+      final BlockListAsLongs newReport) throws IOException {
+    if (!isCachingEnabled) {
+      String error = "cacheReport received from datanode " + nodeID
+          + " but caching is disabled on the namenode ("
+          + DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY + ")";
+      LOG.warn(error + ", ignoring");
+      throw new IOException(error);
+    }
+    namesystem.writeLock();
+    final long startTime = Time.now(); //after acquiring write lock
+    final long endTime;
+    try {
+      final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
+      if (node == null || !node.isAlive) {
+        throw new IOException(
+            "processCacheReport from dead or unregistered node: " + nodeID);
+      }
+
+      // TODO: do an optimized initial cache report while in startup safemode
+      if (namesystem.isInStartupSafeMode()) {
+        blockLogInfo("#processCacheReport: "
+            + "discarded cache report from " + nodeID
+            + " because namenode still in startup phase");
+        return;
+      }
+
+      processReport(node, newReport);
+
+      // TODO: process postponed blocks reported while a standby
+      //rescanPostponedMisreplicatedBlocks();
+    } finally {
+      endTime = Time.now();
+      namesystem.writeUnlock();
+    }
+
+    // Log the block report processing stats from Namenode perspective
+    final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
+    if (metrics != null) {
+      metrics.addCacheBlockReport((int) (endTime - startTime));
+    }
+    blockLogInfo("#processCacheReport: from "
+        + nodeID + ", blocks: " + newReport.getNumberOfBlocks()
+        + ", processing time: " + (endTime - startTime) + " msecs");
+  }
+
+  @Override // ReportProcessor
+  void markBlockAsCorrupt(BlockToMarkCorrupt b, DatanodeInfo dn)
+      throws IOException {
+    throw new UnsupportedOperationException("Corrupt blocks should not be in"
+        + " the cache report");
+  }
+
+  @Override // ReportProcessor
+  void addToInvalidates(final Block b, final DatanodeInfo node) {
+    blocksToUncache.add(b, node, true);
+  }
+
+  @Override // ReportProcessor
+  void addStoredBlockUnderConstruction(
+      BlockInfoUnderConstruction storedBlock, DatanodeDescriptor node,
+      ReplicaState reportedState) {
+    throw new UnsupportedOperationException("Under-construction blocks"
+        + " should not be in the cache report");
+  }
+
+  @Override // ReportProcessor
+  int moveBlockToHead(DatanodeDescriptor dn, BlockInfo storedBlock,
+      int curIndex, int headIndex) {
+    return dn.moveCachedBlockToHead(storedBlock, curIndex, headIndex);
+  }
+
+  @Override // ReportProcessor
+  boolean addBlock(DatanodeDescriptor dn, BlockInfo block) {
+    return dn.addCachedBlock(block);
+  }
+
+  @Override // ReportProcessor
+  boolean removeBlock(DatanodeDescriptor dn, BlockInfo block) {
+    return dn.removeCachedBlock(block);
+  }
+
+  /**
+   * Similar to processReportedBlock. Simpler since it doesn't need to worry
+   * about under construction and corrupt replicas.
+   * 
+   * @return Updated BlockInfo for the block if it should be kept, null if
+   * it is to be invalidated.
+   */
+  @Override // ReportProcessor
+  BlockInfo processReportedBlock(final DatanodeDescriptor dn,
+      final Block block, final ReplicaState reportedState,
+      final Collection<BlockInfo> toAdd,
+      final Collection<Block> toInvalidate,
+      Collection<BlockToMarkCorrupt> toCorrupt,
+      Collection<StatefulBlockInfo> toUC) {
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Reported cached block " + block
+          + " on " + dn + " size " + block.getNumBytes()
+          + " replicaState = " + reportedState);
+    }
+
+    final boolean shouldPostponeBlocksFromFuture =
+        blockManager.shouldPostponeBlocksFromFuture();
+    if (shouldPostponeBlocksFromFuture &&
+        namesystem.isGenStampInFuture(block)) {
+      // TODO: queuing cache operations on the standby
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("processReportedBlock: block " + block + " has a "
+            + "genstamp from the future and namenode is in standby mode,"
+            + " ignoring");
+      }
+      return null;
+    }
+
+    BlockInfo storedBlock = blockManager.blocksMap.getStoredBlock(block);
+    if (storedBlock == null) {
+      // If blocksMap does not contain reported block id,
+      // the BlockManager will take care of invalidating it, and the datanode
+      // will automatically uncache at that point.
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("processReportedBlock: block " + block + " not found "
+            + "in blocksMap, ignoring");
+      }
+      return null;
+    }
+
+    BlockUCState ucState = storedBlock.getBlockUCState();
+
+    // Datanodes currently only will cache completed replicas.
+    // Let's just invalidate anything that's not completed and the right
+    // genstamp and number of bytes.
+    if (!ucState.equals(BlockUCState.COMPLETE) ||
+        block.getGenerationStamp() != storedBlock.getGenerationStamp() ||
+        block.getNumBytes() != storedBlock.getNumBytes()) {
+      if (shouldPostponeBlocksFromFuture) {
+        // TODO: queuing cache operations on the standby
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("processReportedBlock: block " + block + " has a "
+              + "mismatching genstamp or length and namenode is in standby"
+              + " mode, ignoring");
+        }
+        return null;
+      } else {
+        toInvalidate.add(block);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("processReportedBlock: block " + block + " scheduled"
+              + " for uncaching because it is misreplicated"
+              + " or under construction.");
+        }
+        return null;
+      }
+    }
+
+    // It's a keeper
+
+    // Could be present in blocksMap and not in cachedBlocksMap, add it
+    BlockInfo cachedBlock = cachedBlocksMap.getStoredBlock(block);
+    if (cachedBlock == null) {
+      cachedBlock = new BlockInfo(block, 0);
+      cachedBlocksMap.addBlockCollection(cachedBlock,
+          storedBlock.getBlockCollection());
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("In memory blockUCState = " + ucState);
+    }
+
+    // Ignore replicas that are already scheduled for removal
+    if (blocksToUncache.contains(dn.getStorageID(), block)) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("processReportedBlock: block " + block + " is already"
+            + " scheduled to be uncached, not adding it to the cachedBlocksMap");
+      }
+      return cachedBlock;
+    }
+
+    // add replica if not already present in the cached block map
+    if (reportedState == ReplicaState.FINALIZED
+        && cachedBlock.findDatanode(dn) < 0) {
+      toAdd.add(cachedBlock);
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("processReportedBlock: block " + block + " scheduled"
+          + " to be added to cachedBlocksMap");
+    }
+    return cachedBlock;
+  }
+
+  /**
+   * Modify (cached block-->datanode) map with a newly cached block. Remove
+   * block from set of needed cache replications if this takes care of the
+   * problem.
+   * 
+   * @return the block that is stored in cachedBlockMap.
+   */
+  @Override // ReportProcessor
+  Block addStoredBlock(final BlockInfo block, DatanodeDescriptor node,
+      DatanodeDescriptor delNodeHint, boolean logEveryBlock) throws IOException {
+    assert block != null && namesystem.hasWriteLock();
+    BlockInfo cachedBlock = block;
+    if (cachedBlock == null || cachedBlock.getBlockCollection() == null) {
+      // If this block does not belong to anyfile, then we are done.
+      blockLogInfo("#addStoredBlock: " + block + " on "
+          + node + " size " + block.getNumBytes()
+          + " but it does not belong to any file");
+      // we could add this block to invalidate set of this datanode.
+      // it will happen in next block report otherwise.
+      return block;
+    }
+
+    BlockCollection bc = cachedBlock.getBlockCollection();
+
+    // add block to the datanode
+    boolean added = node.addCachedBlock(cachedBlock);
+
+    int curReplicaDelta;
+    if (added) {
+      curReplicaDelta = 1;
+      if (logEveryBlock) {
+        logAddStoredBlock(cachedBlock, node);
+      }
+    } else {
+      curReplicaDelta = 0;
+      blockLogWarn("#addStoredBlock: "
+          + "Redundant addCachedBlock request received for " + cachedBlock
+          + " on " + node + " size " + cachedBlock.getNumBytes());
+    }
+
+    // Remove it from pending list if present
+    pendingCacheBlocks.decrement(block, node);
+
+    // Now check for completion of blocks and safe block count
+    int numCachedReplicas = getNumCached(cachedBlock);
+    int numEffectiveCachedReplica = numCachedReplicas
+      + pendingCacheBlocks.getNumReplicas(cachedBlock);
+
+    // if file is under construction, then done for now
+    if (bc instanceof MutableBlockCollection) {
+      return cachedBlock;
+    }
+
+    // do not try to handle over/under-replicated blocks during first safe mode
+    if (!namesystem.isPopulatingReplQueues()) {
+      return cachedBlock;
+    }
+
+    // Under-replicated
+    short cacheReplication = bc.getCacheReplication();
+    if (numEffectiveCachedReplica >= cacheReplication) {
+      synchronized (neededCacheBlocks) {
+        neededCacheBlocks.remove(cachedBlock);
+      }
+    } else {
+      updateNeededCaching(cachedBlock, curReplicaDelta, 0);
+    }
+
+    // Over-replicated, we don't need this new replica
+    if (numEffectiveCachedReplica > cacheReplication) {
+      blocksToUncache.add(cachedBlock, node, true);
+    }
+
+    return cachedBlock;
+  }
+
+  /**
+   * Modify (cached block-->datanode) map. Possibly generate replication tasks,
+   * if the removed block is still valid.
+   */
+  @Override // ReportProcessor
+  void removeStoredBlock(Block block, DatanodeDescriptor node) {
+    blockLogDebug("#removeStoredBlock: " + block + " from " + node);
+    assert (namesystem.hasWriteLock());
+    {
+      if (!cachedBlocksMap.removeNode(block, node)) {
+        blockLogDebug("#removeStoredBlock: "
+            + block + " has already been removed from node " + node);
+        return;
+      }
+
+      // Prune the block from the map if it's the last cache replica
+      if (cachedBlocksMap.getStoredBlock(block).numNodes() == 0) {
+        cachedBlocksMap.removeBlock(block);
+      }
+
+      //
+      // It's possible that the block was removed because of a datanode
+      // failure. If the block is still valid, check if replication is
+      // necessary. In that case, put block on a possibly-will-
+      // be-replicated list.
+      //
+      BlockCollection bc = blockManager.blocksMap.getBlockCollection(block);
+      if (bc != null) {
+        updateNeededCaching(block, -1, 0);
+      }
+    }
+  }
+
+  /**
+   * Reduce cache replication factor to the new replication by randomly
+   * choosing replicas to invalidate.
+   */
+  private void processOverCachedBlock(final Block block,
+      final short replication) {
+    assert namesystem.hasWriteLock();
+    List<DatanodeDescriptor> nodes = getSafeReplicas(cachedBlocksMap, block);
+    List<DatanodeDescriptor> targets =
+        CacheReplicationPolicy.chooseTargetsToUncache(nodes, replication);
+    for (DatanodeDescriptor dn: targets) {
+      blocksToUncache.add(block, dn, true);
+    }
+  }
+
+  /** Set replication for the blocks. */
+  public void setCacheReplication(final short oldRepl, final short newRepl,
+      final String src, final Block... blocks) {
+    if (!isCachingEnabled) {
+      LOG.warn("Attempted to set cache replication for " + src + " but caching"
+          + " is disabled (" + DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY
+          + "), ignoring");
+      return;
+    }
+    if (newRepl == oldRepl) {
+      return;
+    }
+
+    // update needReplication priority queues
+    for (Block b : blocks) {
+      updateNeededCaching(b, 0, newRepl-oldRepl);
+    }
+
+    if (oldRepl > newRepl) {
+      // old replication > the new one; need to remove copies
+      LOG.info("Decreasing cache replication from " + oldRepl + " to " + newRepl
+          + " for " + src);
+      for (Block b : blocks) {
+        processOverCachedBlock(b, newRepl);
+      }
+    } else { // replication factor is increased
+      LOG.info("Increasing cache replication from " + oldRepl + " to " + newRepl
+          + " for " + src);
+    }
+  }
+
+  /** updates a block in under replicated queue */
+  private void updateNeededCaching(final Block block,
+      final int curReplicasDelta, int expectedReplicasDelta) {
+    namesystem.writeLock();
+    try {
+      if (!namesystem.isPopulatingReplQueues()) {
+        return;
+      }
+      final int numCached = getNumCached(block);
+      final int curExpectedReplicas = getCacheReplication(block);
+      if (numCached < curExpectedReplicas) {
+        neededCacheBlocks.add(block);
+      } else {
+        synchronized (neededCacheBlocks) {
+          neededCacheBlocks.remove(block);
+        }
+      }
+    } finally {
+      namesystem.writeUnlock();
+    }
+  }
+
+  /**
+   * Return the safely cached replicas of a block in a BlocksMap
+   */
+  List<DatanodeDescriptor> getSafeReplicas(BlocksMap map, Block block) {
+    List<DatanodeDescriptor> nodes = new ArrayList<DatanodeDescriptor>(3);
+    Collection<DatanodeDescriptor> corrupted =
+        blockManager.corruptReplicas.getNodes(block);
+    Iterator<DatanodeDescriptor> it = map.nodeIterator(block);
+    while (it.hasNext()) {
+      DatanodeDescriptor dn = it.next();
+      // Don't count a decommissioned or decommissioning nodes
+      if (dn.isDecommissioned() || dn.isDecommissionInProgress()) {
+        continue;
+      }
+      // Don't count a corrupted node
+      if (corrupted != null && corrupted.contains(dn)) {
+        continue;
+      }
+      nodes.add(dn);
+    }
+    return nodes;
+  }
+}
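
One subtle point in activate() above: shutdown() is called immediately after submit(), which only stops further submissions; the single monitor task keeps running until close() interrupts it with shutdownNow(). A self-contained sketch of that lifecycle, with a stand-in Runnable in place of CacheReplicationMonitor:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

// Stand-in Runnable instead of CacheReplicationMonitor; the executor calls
// mirror activate()/close() above.
public class MonitorLifecycleSketch {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService monitor = Executors.newSingleThreadExecutor(
        new ThreadFactoryBuilder().setDaemon(true)
            .setNameFormat("MonitorLifecycleSketch").build());
    monitor.submit(new Runnable() {
      @Override
      public void run() {
        while (!Thread.currentThread().isInterrupted()) {
          // computeCachingWork(); processPendingCachingWork();
          try {
            Thread.sleep(100);
          } catch (InterruptedException e) {
            return;  // shutdownNow() interrupts us here
          }
        }
      }
    });
    monitor.shutdown();     // activate(): no new tasks, monitor keeps looping
    Thread.sleep(500);      // ... NameNode runs ...
    monitor.shutdownNow();  // close(): interrupt the monitor thread
    monitor.awaitTermination(3000, TimeUnit.MILLISECONDS);
  }
}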

+ 302 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java

@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static org.apache.hadoop.util.ExitUtil.terminate;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.util.LightWeightHashSet;
+
+/**
+ * Periodically computes new replication work. This consists of two tasks:
+ * 
+ * 1) Assigning blocks in the neededCacheBlocks to datanodes where they will be
+ * cached. This moves them to the pendingCacheBlocks list.
+ * 
+ * 2) Placing caching tasks in pendingCacheBlocks that have timed out
+ * back into neededCacheBlocks for reassignment.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS"})
+class CacheReplicationMonitor implements Runnable {
+
+  private static final Log LOG =
+      LogFactory.getLog(CacheReplicationMonitor.class);
+
+  private static final Log blockLog = NameNode.blockStateChangeLog;
+
+  private final Namesystem namesystem;
+  private final BlockManager blockManager;
+  private final DatanodeManager datanodeManager;
+  private final CacheReplicationManager cacheReplManager;
+
+  private final UncacheBlocks blocksToUncache;
+  private final LightWeightHashSet<Block> neededCacheBlocks;
+  private final PendingReplicationBlocks pendingCacheBlocks;
+
+  /**
+   * Re-check period for computing cache replication work
+   */
+  private final long cacheReplicationRecheckInterval;
+
+  public CacheReplicationMonitor(Namesystem namesystem,
+      BlockManager blockManager, DatanodeManager datanodeManager,
+      CacheReplicationManager cacheReplManager,
+      UncacheBlocks blocksToUncache,
+      LightWeightHashSet<Block> neededCacheBlocks,
+      PendingReplicationBlocks pendingCacheBlocks,
+      Configuration conf) {
+    this.namesystem = namesystem;
+    this.blockManager = blockManager;
+    this.datanodeManager = datanodeManager;
+    this.cacheReplManager = cacheReplManager;
+
+    this.blocksToUncache = blocksToUncache;
+    this.neededCacheBlocks = neededCacheBlocks;
+    this.pendingCacheBlocks = pendingCacheBlocks;
+
+    this.cacheReplicationRecheckInterval = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000L;
+  }
+
+  @Override
+  public void run() {
+    LOG.info("CacheReplicationMonitor is starting");
+    while (namesystem.isRunning()) {
+      try {
+        computeCachingWork();
+        processPendingCachingWork();
+        Thread.sleep(cacheReplicationRecheckInterval);
+      } catch (Throwable t) {
+        if (!namesystem.isRunning()) {
+          LOG.info("Stopping CacheReplicationMonitor.");
+          if (!(t instanceof InterruptedException)) {
+            LOG.info("CacheReplicationMonitor received an exception"
+                + " while shutting down.", t);
+          }
+          break;
+        }
+        LOG.fatal("ReplicationMonitor thread received Runtime exception. ", t);
+        terminate(1, t);
+      }
+    }
+  }
+
+  /**
+   * Assigns under-cached blocks to new datanodes.
+   */
+  private void computeCachingWork() {
+    List<Block> blocksToCache = null;
+    namesystem.writeLock();
+    try {
+      synchronized (neededCacheBlocks) {
+        blocksToCache = neededCacheBlocks.pollAll();
+      }
+    } finally {
+      namesystem.writeUnlock();
+    }
+    computeCachingWorkForBlocks(blocksToCache);
+    computeUncacheWork();
+  }
+
+  private void computeCachingWorkForBlocks(List<Block> blocksToCache) {
+    int requiredRepl, effectiveRepl, additionalRepl;
+    List<DatanodeDescriptor> cachedNodes, storedNodes, targets;
+
+    final HashMap<Block, List<DatanodeDescriptor>> work =
+        new HashMap<Block, List<DatanodeDescriptor>>();
+    namesystem.writeLock();
+    try {
+      synchronized (neededCacheBlocks) {
+        for (Block block: blocksToCache) {
+          // Required number of cached replicas
+          requiredRepl = cacheReplManager.getCacheReplication(block);
+          // Replicas that are safely cached
+          cachedNodes = cacheReplManager.getSafeReplicas(
+              cacheReplManager.cachedBlocksMap, block);
+          // Replicas that are safely stored on disk
+          storedNodes = cacheReplManager.getSafeReplicas(
+              blockManager.blocksMap, block);
+          // "effective" replication factor which includes pending
+          // replication work
+          effectiveRepl = cachedNodes.size()
+              + pendingCacheBlocks.getNumReplicas(block);
+          if (effectiveRepl >= requiredRepl) {
+            neededCacheBlocks.remove(block);
+            blockLog.info("BLOCK* Removing " + block
+                + " from neededCacheBlocks as it has enough cached replicas");
+              continue;
+          }
+          // Choose some replicas to cache if needed
+          additionalRepl = requiredRepl - effectiveRepl;
+          targets = new ArrayList<DatanodeDescriptor>(storedNodes);
+          // Only target replicas that aren't already cached.
+          for (DatanodeDescriptor dn: storedNodes) {
+            if (!cachedNodes.contains(dn)) {
+              targets.add(dn);
+            }
+          }
+          if (targets.size() < additionalRepl) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Block " + block + " cannot be cached on additional"
+                  + " nodes because there are no more available datanodes"
+                  + " with the block on disk.");
+            }
+          }
+          targets = CacheReplicationPolicy.chooseTargetsToCache(block, targets,
+              additionalRepl);
+          if (targets.size() < additionalRepl) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Block " + block + " cannot be cached on additional"
+                  + " nodes because there is not sufficient cache space on"
+                  + " available target datanodes.");
+            }
+          }
+          // Continue if we couldn't get more cache targets
+          if (targets.size() == 0) {
+            continue;
+          }
+
+          // Update datanodes and blocks that were scheduled for caching
+          work.put(block, targets);
+          // Schedule caching on the targets
+          for (DatanodeDescriptor target: targets) {
+            target.addBlockToBeCached(block);
+          }
+          // Add block to the pending queue
+          pendingCacheBlocks.increment(block,
+              targets.toArray(new DatanodeDescriptor[] {}));
+          if (blockLog.isDebugEnabled()) {
+            blockLog.debug("BLOCK* block " + block
+                + " is moved from neededCacheBlocks to pendingCacheBlocks");
+          }
+          // Remove from needed queue if it will be fully replicated
+          if (effectiveRepl + targets.size() >= requiredRepl) {
+            neededCacheBlocks.remove(block);
+          }
+        }
+      }
+    } finally {
+      namesystem.writeUnlock();
+    }
+
+    if (blockLog.isInfoEnabled()) {
+      // log which blocks have been scheduled for replication
+      for (Entry<Block, List<DatanodeDescriptor>> item : work.entrySet()) {
+        Block block = item.getKey();
+        List<DatanodeDescriptor> nodes = item.getValue();
+        StringBuilder targetList = new StringBuilder("datanode(s)");
+        for (DatanodeDescriptor node: nodes) {
+          targetList.append(' ');
+          targetList.append(node);
+        }
+        blockLog.info("BLOCK* ask " + targetList + " to cache " + block);
+      }
+    }
+
+    if (blockLog.isDebugEnabled()) {
+        blockLog.debug(
+          "BLOCK* neededCacheBlocks = " + neededCacheBlocks.size()
+          + " pendingCacheBlocks = " + pendingCacheBlocks.size());
+    }
+  }
+
+  /**
+   * Reassign pending caching work that has timed out
+   */
+  private void processPendingCachingWork() {
+    Block[] timedOutItems = pendingCacheBlocks.getTimedOutBlocks();
+    if (timedOutItems != null) {
+      namesystem.writeLock();
+      try {
+        for (int i = 0; i < timedOutItems.length; i++) {
+          Block block = timedOutItems[i];
+          final short numCached = cacheReplManager.getNumCached(block);
+          final short cacheReplication =
+              cacheReplManager.getCacheReplication(block);
+          // Needs to be cached if under-replicated
+          if (numCached < cacheReplication) {
+            synchronized (neededCacheBlocks) {
+              neededCacheBlocks.add(block);
+            }
+          }
+        }
+      } finally {
+        namesystem.writeUnlock();
+      }
+    }
+  }
+
+  /**
+   * Schedule blocks for uncaching at datanodes
+   * @return total number of block for deletion
+   */
+  int computeUncacheWork() {
+    final List<String> nodes = blocksToUncache.getStorageIDs();
+    int blockCnt = 0;
+    for (String node: nodes) {
+      blockCnt += uncachingWorkForOneNode(node);
+    }
+    return blockCnt;
+  }
+
+  /**
+   * Gets the list of blocks scheduled for uncaching at a datanode and
+   * schedules them for uncaching.
+   * 
+   * @return number of blocks scheduled for removal
+   */
+  private int uncachingWorkForOneNode(String nodeId) {
+    final List<Block> toInvalidate;
+    final DatanodeDescriptor dn;
+
+    namesystem.writeLock();
+    try {
+      // get blocks to invalidate for the nodeId
+      assert nodeId != null;
+      dn = datanodeManager.getDatanode(nodeId);
+      if (dn == null) {
+        blocksToUncache.remove(nodeId);
+        return 0;
+      }
+      toInvalidate = blocksToUncache.invalidateWork(nodeId, dn);
+      if (toInvalidate == null) {
+        return 0;
+      }
+    } finally {
+      namesystem.writeUnlock();
+    }
+    if (blockLog.isInfoEnabled()) {
+      blockLog.info("BLOCK* " + getClass().getSimpleName()
+          + ": ask " + dn + " to uncache " + toInvalidate);
+    }
+    return toInvalidate.size();
+  }
+}
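
For readers following computeCachingWorkForBlocks() above, the bookkeeping reduces to a small calculation: the "effective" replication counts both safely cached replicas and caching work already pending, and only the shortfall is handed to CacheReplicationPolicy.chooseTargetsToCache(). A standalone sketch (not code from the patch) with made-up counts:

// Standalone sketch of the counts in computeCachingWorkForBlocks(); the
// comments name the fields and calls each value stands in for.
public class CachingWorkArithmetic {
  public static void main(String[] args) {
    short requiredRepl = 3;  // cacheReplManager.getCacheReplication(block)
    int cachedNodes = 1;     // safe replicas already in cachedBlocksMap
    int pending = 1;         // pendingCacheBlocks.getNumReplicas(block)

    int effectiveRepl = cachedNodes + pending;
    if (effectiveRepl >= requiredRepl) {
      System.out.println("enough cached replicas; remove from neededCacheBlocks");
      return;
    }
    int additionalRepl = requiredRepl - effectiveRepl;  // 3 - 2 = 1
    System.out.println("ask chooseTargetsToCache() for " + additionalRepl
        + " datanode(s) that hold the block on disk but not yet in cache");
  }
}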

+ 125 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationPolicy.java

@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+
+import org.apache.commons.math.random.RandomData;
+import org.apache.commons.math.random.RandomDataImpl;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.Block;
+
+/**
+ * Helper class used by the CacheReplicationManager and CacheReplicationMonitor
+ * to select datanodes where blocks should be cached or uncached.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS"})
+public class CacheReplicationPolicy {
+
+  /**
+   * @return List of datanodes with sufficient capacity to cache the block
+   */
+  private static List<DatanodeDescriptor> selectSufficientCapacity(Block block,
+      List<DatanodeDescriptor> targets) {
+    List<DatanodeDescriptor> sufficient =
+        new ArrayList<DatanodeDescriptor>(targets.size());
+    for (DatanodeDescriptor dn: targets) {
+      long remaining = dn.getCacheRemaining();
+      if (remaining >= block.getNumBytes()) {
+        sufficient.add(dn);
+      }
+    }
+    return sufficient;
+  }
+
+  /**
+   * Returns a random datanode from targets, weighted by the amount of free
+   * cache capacity on the datanode. Prunes unsuitable datanodes from the
+   * targets list.
+   * 
+   * @param block Block to be cached
+   * @param targets List of potential cache targets
+   * @return a random DN, or null if no datanodes are available or have enough
+   *         cache capacity.
+   */
+  private static DatanodeDescriptor randomDatanodeByRemainingCache(Block block,
+      List<DatanodeDescriptor> targets) {
+    // Hold a lottery biased by the amount of free space to decide
+    // who gets the block
+    Collections.shuffle(targets);
+    TreeMap<Long, DatanodeDescriptor> lottery =
+        new TreeMap<Long, DatanodeDescriptor>();
+    long totalCacheAvailable = 0;
+    for (DatanodeDescriptor dn: targets) {
+      long remaining = dn.getCacheRemaining();
+      totalCacheAvailable += remaining;
+      lottery.put(totalCacheAvailable, dn);
+    }
+    // Pick our lottery winner
+    RandomData r = new RandomDataImpl();
+    long winningTicket = r.nextLong(0, totalCacheAvailable - 1);
+    Entry<Long, DatanodeDescriptor> winner = lottery.higherEntry(winningTicket);
+    return winner.getValue();
+  }
+
+  /**
+   * Chooses numTargets new cache replicas for a block from a list of targets.
+   * Will return fewer targets than requested if not enough nodes are available.
+   * 
+   * @return List of target datanodes
+   */
+  static List<DatanodeDescriptor> chooseTargetsToCache(Block block,
+      List<DatanodeDescriptor> targets, int numTargets) {
+    List<DatanodeDescriptor> sufficient =
+        selectSufficientCapacity(block, targets);
+    List<DatanodeDescriptor> chosen =
+        new ArrayList<DatanodeDescriptor>(numTargets);
+    for (int i = 0; i < numTargets && !sufficient.isEmpty(); i++) {
+      chosen.add(randomDatanodeByRemainingCache(block, sufficient));
+    }
+    return chosen;
+  }
+
+  /**
+   * Given a list of datanodes where a block is cached, chooses cache replicas
+   * to uncache until the cache replication factor drops to replication.
+   * 
+   * @param nodes list of datanodes where the block is currently cached
+   * @param replication desired replication factor
+   * @return List of datanodes to uncache
+   */
+  public static List<DatanodeDescriptor> chooseTargetsToUncache(
+      List<DatanodeDescriptor> nodes, short replication) {
+    final int effectiveReplication = nodes.size();
+    List<DatanodeDescriptor> targets =
+        new ArrayList<DatanodeDescriptor>(effectiveReplication);
+    Collections.shuffle(nodes);
+    final int additionalTargetsNeeded = effectiveReplication - replication;
+    int chosen = 0;
+    while (chosen < additionalTargetsNeeded && !nodes.isEmpty()) {
+      targets.add(nodes.get(chosen));
+      chosen++;
+    }
+    return targets;
+  }
+
+}
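
The weighted lottery above is the heart of the cache placement policy: a candidate's chance of winning is proportional to its remaining cache capacity, implemented as a cumulative-sum TreeMap and a higherEntry lookup. A minimal standalone sketch of the same idea, using a hypothetical (name, cacheRemaining) pair in place of DatanodeDescriptor and java.util.Random in place of the commons-math RandomDataImpl:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;
    import java.util.TreeMap;

    public class WeightedLotteryDemo {
      /** Hypothetical stand-in for DatanodeDescriptor: a name and free cache bytes. */
      static class Node {
        final String name;
        final long cacheRemaining;
        Node(String name, long cacheRemaining) {
          this.name = name;
          this.cacheRemaining = cacheRemaining;
        }
      }

      /** Pick one node at random, weighted by its remaining cache capacity. */
      static Node pickWeighted(List<Node> nodes, Random rng) {
        TreeMap<Long, Node> lottery = new TreeMap<Long, Node>();
        long total = 0;
        for (Node n : nodes) {
          total += n.cacheRemaining;            // cumulative sums key the lottery
          lottery.put(total, n);
        }
        if (total <= 0) {
          return null;                          // nobody has free cache
        }
        long ticket = (long) (rng.nextDouble() * total);   // uniform in [0, total)
        // The first cumulative sum strictly greater than the ticket wins.
        Map.Entry<Long, Node> winner = lottery.higherEntry(ticket);
        return winner.getValue();
      }

      public static void main(String[] args) {
        List<Node> nodes = new ArrayList<Node>();
        nodes.add(new Node("dn1", 1024));
        nodes.add(new Node("dn2", 4096));       // roughly 4x more likely than dn1
        nodes.add(new Node("dn3", 512));
        System.out.println(pickWeighted(nodes, new Random()).name);
      }
    }

One observation: as written above, chooseTargetsToCache does not remove a winner from the sufficient list between picks, so the same datanode can be selected for more than one cache replica of a block; a caller or a later revision would likely want to prune each winner before the next draw.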

+ 136 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java

@@ -22,6 +22,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -30,6 +31,9 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.util.Time;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
 /**
  * This class extends the DatanodeInfo class with ephemeral information (eg
  * health, capacity, what blocks are associated with the Datanode) that is
@@ -93,8 +97,24 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
   }
 
+  /**
+   * Head of the list of blocks on the datanode
+   */
   private volatile BlockInfo blockList = null;
+  /**
+   * Number of blocks on the datanode
+   */
   private int numBlocks = 0;
+
+  /**
+   * Head of the list of cached blocks on the datanode
+   */
+  private volatile BlockInfo cachedBlockList = null;
+  /**
+   * Number of cached blocks on the datanode
+   */
+  private int numCachedBlocks = 0;
+
   // isAlive == heartbeats.contains(this)
   // This is an optimization, because contains takes O(n) time on Arraylist
   public boolean isAlive = false;
@@ -134,6 +154,12 @@ public class DatanodeDescriptor extends DatanodeInfo {
   /** A set of blocks to be invalidated by this datanode */
   private LightWeightHashSet<Block> invalidateBlocks = new LightWeightHashSet<Block>();
 
+  /** A queue of blocks to be cached by this datanode */
+  private BlockQueue<Block> cacheBlocks = new BlockQueue<Block>();
+  /** A set of blocks to be uncached by this datanode */
+  private LightWeightHashSet<Block> blocksToUncache =
+      new LightWeightHashSet<Block>();
+
   /* Variables for maintaining number of blocks scheduled to be written to
    * this datanode. This count is approximate and might be slightly bigger
    * in case of errors (e.g. datanode does not report if an error occurs
@@ -260,14 +286,57 @@ public class DatanodeDescriptor extends DatanodeInfo {
     return curIndex;
   }
 
+  /**
+   * Add block to the list of cached blocks on the data-node.
+   * @return true if block was successfully added, false if already present
+   */
+  public boolean addCachedBlock(BlockInfo b) {
+    if (!b.addNode(this))
+      return false;
+    // add to the head of the data-node list
+    cachedBlockList = b.listInsert(cachedBlockList, this);
+    numCachedBlocks++;
+    return true;
+  }
+
+  /**
+   * Remove block from the list of cached blocks on the data-node.
+   * @return true if block was successfully removed, false if not present
+   */
+  public boolean removeCachedBlock(BlockInfo b) {
+    cachedBlockList = b.listRemove(cachedBlockList, this);
+    if (b.removeNode(this)) {
+      numCachedBlocks--;
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Move block to the head of the list of cached blocks on the data-node.
+   * @return the index of the head of the blockList
+   */
+  int moveCachedBlockToHead(BlockInfo b, int curIndex, int headIndex) {
+    cachedBlockList = b.moveBlockToHead(cachedBlockList, this, curIndex,
+        headIndex);
+    return curIndex;
+  }
+
   /**
    * Used for testing only
    * @return the head of the blockList
    */
+  @VisibleForTesting
   protected BlockInfo getHead(){
     return blockList;
   }
 
+  @VisibleForTesting
+  protected BlockInfo getCachedHead() {
+    return cachedBlockList;
+  }
+
   /**
    * Replace specified old block with a new one in the DataNodeDescriptor.
    *
@@ -290,7 +359,9 @@ public class DatanodeDescriptor extends DatanodeInfo {
     setDfsUsed(0);
     setXceiverCount(0);
     this.blockList = null;
+    this.cachedBlockList = null;
     this.invalidateBlocks.clear();
+    this.blocksToUncache.clear();
     this.volumeFailures = 0;
   }
   
@@ -300,12 +371,20 @@ public class DatanodeDescriptor extends DatanodeInfo {
       this.recoverBlocks.clear();
       this.replicateBlocks.clear();
     }
+    synchronized(blocksToUncache) {
+      this.blocksToUncache.clear();
+      this.cacheBlocks.clear();
+    }
   }
 
   public int numBlocks() {
     return numBlocks;
   }
 
+  public int numCachedBlocks() {
+    return numCachedBlocks;
+  }
+
   /**
    * Updates stats from datanode heartbeat.
    */
@@ -358,7 +437,11 @@ public class DatanodeDescriptor extends DatanodeInfo {
   public Iterator<BlockInfo> getBlockIterator() {
     return new BlockIterator(this.blockList, this);
   }
-  
+
+  public Iterator<BlockInfo> getCachedBlockIterator() {
+    return new BlockIterator(this.cachedBlockList, this);
+  }
+
   /**
    * Store block replication work.
    */
@@ -367,6 +450,14 @@ public class DatanodeDescriptor extends DatanodeInfo {
     replicateBlocks.offer(new BlockTargetPair(block, targets));
   }
 
+  /**
+   * Store block caching work.
+   */
+  void addBlockToBeCached(Block block) {
+    assert(block != null);
+    cacheBlocks.offer(block);
+  }
+
   /**
    * Store block recovery work.
    */
@@ -390,6 +481,18 @@ public class DatanodeDescriptor extends DatanodeInfo {
       }
     }
   }
+  
+  /**
+   * Store block uncaching work.
+   */
+  void addBlocksToBeUncached(List<Block> blocklist) {
+    assert(blocklist != null && blocklist.size() > 0);
+    synchronized (blocksToUncache) {
+      for (Block blk : blocklist) {
+        blocksToUncache.add(blk);
+      }
+    }
+  }
 
   /**
    * The number of work items that are pending to be replicated
@@ -398,6 +501,13 @@ public class DatanodeDescriptor extends DatanodeInfo {
     return replicateBlocks.size();
   }
 
+  /**
+   * The number of pending cache work items
+   */
+  int getNumberOfBlocksToBeCached() {
+    return cacheBlocks.size();
+  }
+
   /**
    * The number of block invalidation items that are pending to 
    * be sent to the datanode
@@ -407,11 +517,24 @@ public class DatanodeDescriptor extends DatanodeInfo {
       return invalidateBlocks.size();
     }
   }
-  
+
+  /**
+   * The number of pending uncache work items
+   */
+  int getNumberOfBlocksToBeUncached() {
+    synchronized (blocksToUncache) {
+      return blocksToUncache.size();
+    }
+  }
+
   public List<BlockTargetPair> getReplicationCommand(int maxTransfers) {
     return replicateBlocks.poll(maxTransfers);
   }
 
+  public List<Block> getCacheBlocks() {
+    return cacheBlocks.poll(cacheBlocks.size());
+  }
+
   public BlockInfoUnderConstruction[] getLeaseRecoveryCommand(int maxTransfers) {
     List<BlockInfoUnderConstruction> blocks = recoverBlocks.poll(maxTransfers);
     if(blocks == null)
@@ -430,6 +553,17 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
   }
 
+  /**
+   * Drain and return all blocks queued to be uncached, or null if none.
+   */
+  public Block[] getInvalidateCacheBlocks() {
+    synchronized (blocksToUncache) {
+      Block[] deleteList = blocksToUncache.pollToArray(
+          new Block[blocksToUncache.size()]);
+      return deleteList.length == 0 ? null : deleteList;
+    }
+  }
+
   /**
    * @return Approximate number of blocks currently scheduled to be written 
    * to this datanode.
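
The cached-block bookkeeping added above mirrors the existing on-disk block bookkeeping: a head pointer plus counter for the cached-block list, and per-datanode work queues (cacheBlocks, blocksToUncache) that the NameNode fills and then drains when it answers the datanode's next heartbeat. A simplified standalone sketch of that queue-and-drain pattern, using plain long block IDs and hypothetical class names:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Queue;
    import java.util.Set;

    public class PendingCacheWorkDemo {
      /** Hypothetical per-datanode pending work, drained at heartbeat time. */
      static class DatanodeWork {
        private final Queue<Long> toCache = new LinkedList<Long>();
        private final Set<Long> toUncache = new HashSet<Long>();

        synchronized void addBlockToBeCached(long blockId) {
          toCache.offer(blockId);
        }

        synchronized void addBlocksToBeUncached(List<Long> blockIds) {
          toUncache.addAll(blockIds);
        }

        /** Drain all queued cache work, as getCacheBlocks() does above. */
        synchronized List<Long> pollCacheWork() {
          List<Long> out = new ArrayList<Long>(toCache);
          toCache.clear();
          return out;
        }

        /** Drain all queued uncache work, as getInvalidateCacheBlocks() does above. */
        synchronized List<Long> pollUncacheWork() {
          List<Long> out = new ArrayList<Long>(toUncache);
          toUncache.clear();
          return out;
        }
      }

      public static void main(String[] args) {
        DatanodeWork dn = new DatanodeWork();
        dn.addBlockToBeCached(1001L);
        dn.addBlocksToBeUncached(Arrays.asList(2001L, 2002L));
        // At heartbeat time the NameNode turns this pending work into
        // DNA_CACHE / DNA_UNCACHE commands for the datanode.
        System.out.println("cache: " + dn.pollCacheWork());
        System.out.println("uncache: " + dn.pollUncacheWork());
      }
    }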

+ 13 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -1230,6 +1230,19 @@ public class DatanodeManager {
               blockPoolId, blks));
         }
         
+        // Check pending caching
+        List<Block> pendingCacheList = nodeinfo.getCacheBlocks();
+        if (pendingCacheList != null) {
+          cmds.add(new BlockCommand(DatanodeProtocol.DNA_CACHE, blockPoolId,
+              pendingCacheList.toArray(new Block[] {})));
+        }
+        // Check cached block invalidation
+        blks = nodeinfo.getInvalidateCacheBlocks();
+        if (blks != null) {
+          cmds.add(new BlockCommand(DatanodeProtocol.DNA_UNCACHE,
+              blockPoolId, blks));
+        }
+
         blockManager.addKeyUpdateCommand(cmds, nodeinfo);
 
         // check for balancer bandwidth update

+ 34 - 41
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java

@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
-import java.io.PrintWriter;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -35,24 +34,22 @@ import org.apache.hadoop.hdfs.util.LightWeightHashSet;
  * on the machine in question.
  */
 @InterfaceAudience.Private
-class InvalidateBlocks {
+abstract class InvalidateBlocks {
   /** Mapping: StorageID -> Collection of Blocks */
   private final Map<String, LightWeightHashSet<Block>> node2blocks =
       new TreeMap<String, LightWeightHashSet<Block>>();
   /** The total number of blocks in the map. */
   private long numBlocks = 0L;
 
-  private final DatanodeManager datanodeManager;
-
-  InvalidateBlocks(final DatanodeManager datanodeManager) {
-    this.datanodeManager = datanodeManager;
-  }
-
   /** @return the number of blocks to be invalidated . */
   synchronized long numBlocks() {
     return numBlocks;
   }
 
+  synchronized int numStorages() {
+    return node2blocks.size();
+  }
+
   /**
    * @return true if the given storage has the given block listed for
    * invalidation. Blocks are compared including their generation stamps:
@@ -111,22 +108,22 @@ class InvalidateBlocks {
     }
   }
 
-  /** Print the contents to out. */
-  synchronized void dump(final PrintWriter out) {
-    final int size = node2blocks.values().size();
-    out.println("Metasave: Blocks " + numBlocks 
-        + " waiting deletion from " + size + " datanodes.");
-    if (size == 0) {
-      return;
+  /**
+   * Polls up to <i>limit</i> blocks from the list of to-be-invalidated Blocks
+   * for a storage.
+   */
+  synchronized List<Block> pollNumBlocks(final String storageId, final int limit) {
+    final LightWeightHashSet<Block> set = node2blocks.get(storageId);
+    if (set == null) {
+      return null;
     }
-
-    for(Map.Entry<String,LightWeightHashSet<Block>> entry : node2blocks.entrySet()) {
-      final LightWeightHashSet<Block> blocks = entry.getValue();
-      if (blocks.size() > 0) {
-        out.println(datanodeManager.getDatanode(entry.getKey()));
-        out.println(blocks);
-      }
+    List<Block> polledBlocks = set.pollN(limit);
+    // Remove the storage if the set is now empty
+    if (set.isEmpty()) {
+      remove(storageId);
     }
+    numBlocks -= polledBlocks.size();
+    return polledBlocks;
   }
 
   /** @return a list of the storage IDs. */
@@ -134,26 +131,22 @@ class InvalidateBlocks {
     return new ArrayList<String>(node2blocks.keySet());
   }
 
-  synchronized List<Block> invalidateWork(
-      final String storageId, final DatanodeDescriptor dn) {
-    final LightWeightHashSet<Block> set = node2blocks.get(storageId);
-    if (set == null) {
-      return null;
-    }
-
-    // # blocks that can be sent in one message is limited
-    final int limit = datanodeManager.blockInvalidateLimit;
-    final List<Block> toInvalidate = set.pollN(limit);
-
-    // If we send everything in this message, remove this node entry
-    if (set.isEmpty()) {
-      remove(storageId);
-    }
-
-    dn.addBlocksToBeInvalidated(toInvalidate);
-    numBlocks -= toInvalidate.size();
-    return toInvalidate;
+  /**
+   * Return the set of to-be-invalidated blocks for a storage.
+   */
+  synchronized LightWeightHashSet<Block> getBlocks(String storageId) {
+    return node2blocks.get(storageId);
   }
+
+  /**
+   * Schedules invalidation work associated with a storage at the corresponding
+   * datanode.
+   * @param storageId Storage of blocks to be invalidated
+   * @param dn Datanode where invalidation work will be scheduled
+   * @return List of blocks scheduled for invalidation at the datanode
+   */
+  abstract List<Block> invalidateWork(final String storageId,
+      final DatanodeDescriptor dn);
   
   synchronized void clear() {
     node2blocks.clear();
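
Making InvalidateBlocks abstract lets on-disk deletion and uncaching share the storageId-to-blocks map and the pollNumBlocks() helper while each subclass decides how much work to hand to a datanode per heartbeat: InvalidateStoredBlocks below stays bounded by blockInvalidateLimit, and UncacheBlocks drains everything. A minimal sketch of that template-method split, with hypothetical names and plain long block IDs:

    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.TreeMap;

    public class InvalidateWorkDemo {
      /** Shared map of storageId to blocks pending some form of removal. */
      abstract static class PendingRemoval {
        private final Map<String, Set<Long>> node2blocks =
            new TreeMap<String, Set<Long>>();

        synchronized void add(String storageId, long blockId) {
          Set<Long> set = node2blocks.get(storageId);
          if (set == null) {
            set = new HashSet<Long>();
            node2blocks.put(storageId, set);
          }
          set.add(blockId);
        }

        /** Poll up to limit blocks for a storage, dropping the storage once empty. */
        synchronized List<Long> pollNumBlocks(String storageId, int limit) {
          Set<Long> set = node2blocks.get(storageId);
          if (set == null) {
            return null;
          }
          List<Long> polled = new ArrayList<Long>();
          for (Long b : new ArrayList<Long>(set)) {
            if (polled.size() >= limit) {
              break;
            }
            polled.add(b);
            set.remove(b);
          }
          if (set.isEmpty()) {
            node2blocks.remove(storageId);
          }
          return polled;
        }

        /** Subclasses decide how much work to poll per heartbeat. */
        abstract List<Long> invalidateWork(String storageId);
      }

      /** Disk deletion: bounded per heartbeat, like InvalidateStoredBlocks. */
      static class DeleteWork extends PendingRemoval {
        private static final int LIMIT = 1000;   // hypothetical per-heartbeat bound
        @Override
        List<Long> invalidateWork(String storageId) {
          return pollNumBlocks(storageId, LIMIT);
        }
      }

      /** Uncaching: cheap, so drain everything, like UncacheBlocks. */
      static class UncacheWork extends PendingRemoval {
        @Override
        List<Long> invalidateWork(String storageId) {
          return pollNumBlocks(storageId, Integer.MAX_VALUE);
        }
      }

      public static void main(String[] args) {
        DeleteWork del = new DeleteWork();
        del.add("storage-1", 42L);
        System.out.println(del.invalidateWork("storage-1"));   // [42]
      }
    }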

+ 67 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateStoredBlocks.java

@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.io.PrintWriter;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.util.LightWeightHashSet;
+
+/**
+ * Subclass of InvalidateBlocks used by the BlockManager to
+ * track blocks on each storage that are scheduled to be invalidated.
+ */
+public class InvalidateStoredBlocks extends InvalidateBlocks {
+
+  private final DatanodeManager datanodeManager;
+
+  InvalidateStoredBlocks(DatanodeManager datanodeManager) {
+    this.datanodeManager = datanodeManager;
+  }
+
+  /** Print the contents to out. */
+  synchronized void dump(final PrintWriter out) {
+    final int size = numStorages();
+    out.println("Metasave: Blocks " + numBlocks() 
+        + " waiting deletion from " + size + " datanodes.");
+    if (size == 0) {
+      return;
+    }
+
+    List<String> storageIds = getStorageIDs();
+    for (String storageId: storageIds) {
+      LightWeightHashSet<Block> blocks = getBlocks(storageId);
+      if (blocks != null && !blocks.isEmpty()) {
+        out.println(datanodeManager.getDatanode(storageId));
+        out.println(blocks);
+      }
+    }
+  }
+
+  @Override
+  synchronized List<Block> invalidateWork(
+      final String storageId, final DatanodeDescriptor dn) {
+    final List<Block> toInvalidate = pollNumBlocks(storageId,
+        datanodeManager.blockInvalidateLimit);
+    if (toInvalidate != null) {
+      dn.addBlocksToBeInvalidated(toInvalidate);
+    }
+    return toInvalidate;
+  }
+}

+ 18 - 11
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/PendingReplicationBlocks.java

@@ -29,20 +29,27 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.util.Daemon;
 
-/***************************************************
- * PendingReplicationBlocks does the bookkeeping of all
- * blocks that are getting replicated.
- *
- * It does the following:
- * 1)  record blocks that are getting replicated at this instant.
- * 2)  a coarse grain timer to track age of replication request
- * 3)  a thread that periodically identifies replication-requests
- *     that never made it.
- *
- ***************************************************/
+/**
+ * PendingReplicationBlocks is used in the BlockManager to track blocks that are
+ * currently being replicated on disk and in the CacheReplicationManager to
+ * track blocks that are currently being cached.
+ * 
+ * <p>
+ * PendingReplicationBlocks performs the following tasks:
+ * </p>
+ * 
+ * <ol>
+ * <li>tracks in-flight replication or caching requests for a block at target
+ * datanodes.</li>
+ * <li>identifies requests that have timed out and need to be rescheduled at a
+ * different datanode.</li>
+ * </ol>
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS"})
 class PendingReplicationBlocks {
   private static final Log LOG = BlockManager.LOG;
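
The rewritten javadoc above describes the pattern both managers rely on: record a timestamp for each in-flight replication or caching request and periodically sweep for requests that exceeded a timeout so they can be rescheduled elsewhere. A simplified standalone sketch of that timeout bookkeeping (the real class also tracks how many replicas are in flight per block):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;

    public class PendingRequestTrackerDemo {
      // blockId -> time the replication or caching request was sent
      private final Map<Long, Long> pending = new HashMap<Long, Long>();
      private final long timeoutMs;

      PendingRequestTrackerDemo(long timeoutMs) {
        this.timeoutMs = timeoutMs;
      }

      /** Record that work for a block was handed to some datanode. */
      synchronized void increment(long blockId) {
        pending.put(blockId, System.currentTimeMillis());
      }

      /** Called when a block or cache report confirms the work finished. */
      synchronized void decrement(long blockId) {
        pending.remove(blockId);
      }

      /** Remove and return requests that never completed within the timeout. */
      synchronized List<Long> getTimedOutBlocks() {
        List<Long> timedOut = new ArrayList<Long>();
        long now = System.currentTimeMillis();
        Iterator<Map.Entry<Long, Long>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
          Map.Entry<Long, Long> e = it.next();
          if (now - e.getValue() > timeoutMs) {
            timedOut.add(e.getKey());
            it.remove();
          }
        }
        return timedOut;   // caller reschedules these on different datanodes
      }

      public static void main(String[] args) throws InterruptedException {
        PendingRequestTrackerDemo tracker = new PendingRequestTrackerDemo(10);
        tracker.increment(7L);
        Thread.sleep(50);
        System.out.println("timed out: " + tracker.getTimedOutBlocks());   // [7]
      }
    }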
 

+ 271 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReportProcessor.java

@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Handles common operations of processing a block report from a datanode,
+ * generating a diff of updates to the BlocksMap, and then feeding the diff
+ * to the subclass-implemented hooks.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS"})
+public abstract class ReportProcessor {
+
+  static final Log blockLog = NameNode.blockStateChangeLog;
+  private final String className = getClass().getSimpleName();
+  // Max number of blocks to log info about during a block report.
+  final long maxNumBlocksToLog;
+
+  void blockLogDebug(String message) {
+    if (blockLog.isDebugEnabled()) {
+      blockLog.debug("BLOCK* " + className + message);
+    }
+  }
+
+  void blockLogInfo(String message) {
+    if (blockLog.isInfoEnabled()) {
+      blockLog.info("BLOCK* " + className + message);
+    }
+  }
+
+  void blockLogWarn(String message) {
+    blockLog.warn("BLOCK* " + className + message);
+  }
+
+  void logAddStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) {
+    if (!blockLog.isInfoEnabled()) {
+      return;
+    }
+    StringBuilder sb = new StringBuilder(500);
+    sb.append("BLOCK* " + className + "#addStoredBlock: blockMap updated: ")
+      .append(node)
+      .append(" is added to ");
+    storedBlock.appendStringTo(sb);
+    sb.append(" size " )
+      .append(storedBlock.getNumBytes());
+    blockLog.info(sb);
+  }
+
+  public ReportProcessor(Configuration conf) {
+    this.maxNumBlocksToLog = conf.getLong(
+        DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
+        DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
+  }
+
+  /**
+   * Processes a block report from a datanode, updating the block to
+   * datanode mapping, adding new blocks and removing invalid ones.
+   * Also computes and queues new replication and invalidation work.
+   * @param node Datanode sending the block report
+   * @param report the block report, encoded as a list of longs
+   * @throws IOException
+   */
+  final void processReport(final DatanodeDescriptor node,
+      final BlockListAsLongs report) throws IOException {
+    // Normal case:
+    // Modify the (block-->datanode) map, according to the difference
+    // between the old and new block report.
+    //
+    Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
+    Collection<Block> toRemove = new LinkedList<Block>();
+    Collection<Block> toInvalidate = new LinkedList<Block>();
+    Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
+    Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
+    reportDiff(node, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC);
+
+    // Process the blocks on each queue
+    for (StatefulBlockInfo b : toUC) {
+      addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState);
+    }
+    for (Block b : toRemove) {
+      removeStoredBlock(b, node);
+    }
+    int numBlocksLogged = 0;
+    for (BlockInfo b : toAdd) {
+      addStoredBlock(b, node, null, numBlocksLogged < maxNumBlocksToLog);
+      numBlocksLogged++;
+    }
+
+    if (numBlocksLogged > maxNumBlocksToLog) {
+      blockLogInfo("#processReport: logged"
+          + " info for " + maxNumBlocksToLog
+          + " of " + numBlocksLogged + " reported.");
+    }
+    for (Block b : toInvalidate) {
+      blockLogInfo("#processReport: "
+          + b + " on " + node + " size " + b.getNumBytes()
+          + " does not belong to any file");
+      addToInvalidates(b, node);
+    }
+    for (BlockToMarkCorrupt b : toCorrupt) {
+      markBlockAsCorrupt(b, node);
+    }
+  }
+
+  /**
+   * Compute the difference between the current state of the datanode in the
+   * BlocksMap and the new reported state, categorizing changes into
+   * different groups (e.g. new blocks to be added, blocks that were removed,
+   * blocks that should be invalidated, etc.).
+   */
+  private void reportDiff(DatanodeDescriptor dn,
+      BlockListAsLongs newReport,
+      Collection<BlockInfo> toAdd,              // add to DatanodeDescriptor
+      Collection<Block> toRemove,           // remove from DatanodeDescriptor
+      Collection<Block> toInvalidate,       // should be removed from DN
+      Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
+      Collection<StatefulBlockInfo> toUC) { // add to under-construction list
+    // place a delimiter in the list which separates blocks
+    // that have been reported from those that have not
+    BlockInfo delimiter = new BlockInfo(new Block(), 1);
+    boolean added = addBlock(dn, delimiter);
+    assert added : "Delimiting block cannot be present in the node";
+    int headIndex = 0; //currently the delimiter is in the head of the list
+    int curIndex;
+
+    if (newReport == null) {
+      newReport = new BlockListAsLongs();
+    }
+    // scan the report and process newly reported blocks
+    BlockReportIterator itBR = newReport.getBlockReportIterator();
+    while (itBR.hasNext()) {
+      Block iblk = itBR.next();
+      ReplicaState iState = itBR.getCurrentReplicaState();
+      BlockInfo storedBlock = processReportedBlock(dn, iblk, iState,
+                                  toAdd, toInvalidate, toCorrupt, toUC);
+      // move block to the head of the list
+      if (storedBlock != null && (curIndex = storedBlock.findDatanode(dn)) >= 0) {
+        headIndex = moveBlockToHead(dn, storedBlock, curIndex, headIndex);
+      }
+    }
+    // collect blocks that have not been reported
+    // all of them are next to the delimiter
+    Iterator<? extends Block> it = new DatanodeDescriptor.BlockIterator(
+        delimiter.getNext(0), dn);
+    while (it.hasNext()) {
+      toRemove.add(it.next());
+    }
+    removeBlock(dn, delimiter);
+  }
+
+  // Operations on the blocks on a datanode
+
+  abstract int moveBlockToHead(DatanodeDescriptor dn, BlockInfo storedBlock,
+      int curIndex, int headIndex);
+
+  abstract boolean addBlock(DatanodeDescriptor dn, BlockInfo block);
+
+  abstract boolean removeBlock(DatanodeDescriptor dn, BlockInfo block);
+
+  // Cache report processing
+
+  abstract BlockInfo processReportedBlock(DatanodeDescriptor dn, Block iblk,
+      ReplicaState iState, Collection<BlockInfo> toAdd,
+      Collection<Block> toInvalidate, Collection<BlockToMarkCorrupt> toCorrupt,
+      Collection<StatefulBlockInfo> toUC);
+
+  // Hooks for processing the cache report diff
+
+  abstract Block addStoredBlock(final BlockInfo block,
+      DatanodeDescriptor node, DatanodeDescriptor delNodeHint,
+      boolean logEveryBlock) throws IOException;
+
+  abstract void removeStoredBlock(Block block, DatanodeDescriptor node);
+
+  abstract void markBlockAsCorrupt(BlockToMarkCorrupt b, DatanodeInfo dn)
+      throws IOException;
+
+  abstract void addToInvalidates(final Block b, final DatanodeInfo node);
+
+  abstract void addStoredBlockUnderConstruction(
+      BlockInfoUnderConstruction storedBlock, DatanodeDescriptor node,
+      ReplicaState reportedState) throws IOException;
+
+  /**
+   * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
+   * list of blocks that should be considered corrupt due to a block report.
+   */
+  static class BlockToMarkCorrupt {
+    /** The corrupted block in a datanode. */
+    final BlockInfo corrupted;
+    /** The corresponding block stored in the BlockManager. */
+    final BlockInfo stored;
+    /** The reason to mark corrupt. */
+    final String reason;
+
+    BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason) {
+      Preconditions.checkNotNull(corrupted, "corrupted is null");
+      Preconditions.checkNotNull(stored, "stored is null");
+
+      this.corrupted = corrupted;
+      this.stored = stored;
+      this.reason = reason;
+    }
+
+    BlockToMarkCorrupt(BlockInfo stored, String reason) {
+      this(stored, stored, reason);
+    }
+
+    BlockToMarkCorrupt(BlockInfo stored, long gs, String reason) {
+      this(new BlockInfo(stored), stored, reason);
+      //the corrupted block in datanode has a different generation stamp
+      corrupted.setGenerationStamp(gs);
+    }
+
+    @Override
+    public String toString() {
+      return corrupted + "("
+          + (corrupted == stored? "same as stored": "stored=" + stored) + ")";
+    }
+  }
+
+  /**
+   * StatefulBlockInfo is used to build the "toUC" list, which is a list of
+   * updates to the information about under-construction blocks.
+   * Besides the block in question, it provides the ReplicaState
+   * reported by the datanode in the block report.
+   */
+  static class StatefulBlockInfo {
+    final BlockInfoUnderConstruction storedBlock;
+    final ReplicaState reportedState;
+
+    StatefulBlockInfo(BlockInfoUnderConstruction storedBlock,
+        ReplicaState reportedState) {
+      this.storedBlock = storedBlock;
+      this.reportedState = reportedState;
+    }
+  }
+
+}
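
reportDiff() above relies on a delimiter block and head-insertion into the per-datanode block list: every reported block that is found gets moved ahead of the delimiter, so after one pass everything still behind the delimiter is known to be unreported and goes on the toRemove list. A simplified set-based sketch of the same bucketing, ignoring replica state, corruption, and under-construction handling:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class ReportDiffDemo {
      /** Bucket a full report against the currently recorded view of a datanode. */
      static void reportDiff(Set<Long> storedOnNode, Set<Long> knownToNameNode,
          List<Long> reported, List<Long> toAdd, List<Long> toRemove,
          List<Long> toInvalidate) {
        Set<Long> seen = new HashSet<Long>();
        for (Long blockId : reported) {
          seen.add(blockId);
          if (!knownToNameNode.contains(blockId)) {
            toInvalidate.add(blockId);     // belongs to no file: delete on the DN
          } else if (!storedOnNode.contains(blockId)) {
            toAdd.add(blockId);            // new replica to record for this DN
          }
        }
        for (Long blockId : storedOnNode) {
          if (!seen.contains(blockId)) {
            toRemove.add(blockId);         // recorded before, no longer reported
          }
        }
      }

      public static void main(String[] args) {
        Set<Long> stored = new HashSet<Long>(Arrays.asList(1L, 2L));
        Set<Long> known = new HashSet<Long>(Arrays.asList(1L, 2L, 3L));
        List<Long> report = Arrays.asList(1L, 3L, 9L);
        List<Long> toAdd = new ArrayList<Long>();
        List<Long> toRemove = new ArrayList<Long>();
        List<Long> toInvalidate = new ArrayList<Long>();
        reportDiff(stored, known, report, toAdd, toRemove, toInvalidate);
        System.out.println("add=" + toAdd + " remove=" + toRemove
            + " invalidate=" + toInvalidate);
      }
    }

The delimiter variant in the patch reaches the same result without building a separate "seen" set, at the cost of temporarily linking a dummy block into the datanode's list.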

+ 44 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UncacheBlocks.java

@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.Block;
+
+/**
+ * Subclass of InvalidateBlocks used by the CacheReplicationManager to
+ * track blocks on each storage that are scheduled to be uncached.
+ */
+@InterfaceAudience.Private
+public class UncacheBlocks extends InvalidateBlocks {
+
+  UncacheBlocks() {
+  }
+
+  @Override
+  synchronized List<Block> invalidateWork(
+      final String storageId, final DatanodeDescriptor dn) {
+    final List<Block> toInvalidate = pollNumBlocks(storageId, Integer.MAX_VALUE);
+    if (toInvalidate != null) {
+      dn.addBlocksToBeUncached(toInvalidate);
+    }
+    return toInvalidate;
+  }
+}

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java

@@ -552,10 +552,12 @@ class BPOfferService {
     case DatanodeProtocol.DNA_CACHE:
       LOG.info("DatanodeCommand action: DNA_CACHE");
       dn.getFSDataset().cache(bcmd.getBlockPoolId(), bcmd.getBlocks());
+      dn.metrics.incrBlocksCached(bcmd.getBlocks().length);
       break;
     case DatanodeProtocol.DNA_UNCACHE:
       LOG.info("DatanodeCommand action: DNA_UNCACHE");
       dn.getFSDataset().uncache(bcmd.getBlockPoolId(), bcmd.getBlocks());
+      dn.metrics.incrBlocksUncached(bcmd.getBlocks().length);
       break;
     case DatanodeProtocol.DNA_SHUTDOWN:
       // TODO: DNA_SHUTDOWN appears to be unused - the NN never sends this command

+ 14 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java

@@ -449,11 +449,24 @@ class BPServiceActor implements Runnable {
     DatanodeCommand cmd = null;
     long startTime = Time.monotonicNow();
     if (startTime - lastCacheReport > dnConf.cacheReportInterval) {
-      // TODO: Implement me!
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Sending cacheReport from service actor: " + this);
+      }
+      lastCacheReport = startTime;
+
       String bpid = bpos.getBlockPoolId();
       BlockListAsLongs blocks = dn.getFSDataset().getCacheReport(bpid);
+      long createTime = Time.monotonicNow();
+
       cmd = bpNamenode.cacheReport(bpRegistration, bpid,
           blocks.getBlockListAsLongs());
+      long sendTime = Time.monotonicNow();
+      long createCost = createTime - startTime;
+      long sendCost = sendTime - createTime;
+      dn.getMetrics().addCacheReport(sendCost);
+      LOG.info("CacheReport of " + blocks.getNumberOfBlocks()
+          + " blocks took " + createCost + " msec to generate and "
+          + sendCost + " msecs for RPC and NN processing");
     }
     return cmd;
   }
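
The cache report path above times the two halves of the operation separately, the local scan that builds the report and the RPC plus NameNode processing, and only the send cost feeds the new cacheReports metric. A small sketch of that timing split, with hypothetical buildReport/sendReport stand-ins:

    public class ReportTimingDemo {
      static long monotonicNowMs() {
        return System.nanoTime() / 1000000L;    // stand-in for Time.monotonicNow()
      }

      static String buildReport() {             // stand-in for getCacheReport(bpid)
        return "cache report";
      }

      static void sendReport(String report) {   // stand-in for the cacheReport RPC
      }

      public static void main(String[] args) {
        long start = monotonicNowMs();
        String report = buildReport();
        long created = monotonicNowMs();
        sendReport(report);
        long sent = monotonicNowMs();
        long createCost = created - start;      // time spent scanning the local cache
        long sendCost = sent - created;         // RPC plus NameNode processing time
        System.out.println("created in " + createCost + " msec, sent in "
            + sendCost + " msec");
      }
    }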

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java

@@ -23,6 +23,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
@@ -114,9 +116,9 @@ public class DNConf {
         DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
         DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
     this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
-    DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
-    this.cacheReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
         DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
+    this.cacheReportInterval = conf.getLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY,
+        DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT);
     
     long initBRDelay = conf.getLong(
         DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
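
Besides introducing the new keys, this hunk fixes a copy-and-paste bug: cacheReportInterval was previously read from the block report key. Tests or clients that want frequent cache reports can now set the dedicated key, as the new test at the end of this change does; a short sketch, assuming the Hadoop Configuration API and the DFSConfigKeys constants from this branch are on the classpath:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class CacheReportConfDemo {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // Send cache reports every second instead of at the block report interval.
        conf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000L);
        // Caching also has to be enabled on the NameNode side.
        conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY, true);
        System.out.println(conf.getLong(
            DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, -1));
      }
    }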

+ 8 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java

@@ -105,10 +105,10 @@ public class FsDatasetCache {
    */
   List<Block> getCachedBlocks(String bpid) {
     List<Block> blocks = new ArrayList<Block>();
-    MappableBlock mapBlock = null;
     // ConcurrentHashMap iteration doesn't see latest updates, which is okay
-    for (Iterator<MappableBlock> it = cachedBlocks.values().iterator();
-        it.hasNext(); mapBlock = it.next()) {
+    Iterator<MappableBlock> it = cachedBlocks.values().iterator();
+    while (it.hasNext()) {
+      MappableBlock mapBlock = it.next();
       if (mapBlock.getBlockPoolId().equals(bpid)) {
         blocks.add(mapBlock.getBlock());
       }
@@ -174,12 +174,15 @@ public class FsDatasetCache {
         mapBlock.getBlockPoolId().equals(bpid) &&
         mapBlock.getBlock().equals(block)) {
       mapBlock.close();
-      cachedBlocks.remove(mapBlock);
+      cachedBlocks.remove(block.getBlockId());
       long bytes = mapBlock.getNumBytes();
       long used = usedBytes.get();
       while (!usedBytes.compareAndSet(used, used - bytes)) {
         used = usedBytes.get();
       }
+      LOG.info("Successfully uncached block " + block);
+    } else {
+      LOG.info("Could not uncache block " + block + ": unknown block.");
     }
   }
 
@@ -219,6 +222,7 @@ public class FsDatasetCache {
           used = usedBytes.get();
         }
       } else {
+        LOG.info("Successfully cached block " + block.getBlock());
         cachedBlocks.put(block.getBlock().getBlockId(), block);
       }
     }
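
This hunk also repairs getCachedBlocks(), whose old for-loop only advanced the iterator in its update clause and so ran the body with a stale (initially null) element, and makes uncaching remove the entry by block id rather than by value. The byte accounting around it uses a compare-and-set retry loop on an AtomicLong; a minimal sketch of a reserve/release pattern built on that primitive (the capacity check here is illustrative, the real class enforces the locked-memory limit elsewhere):

    import java.util.concurrent.atomic.AtomicLong;

    public class CacheAccountingDemo {
      private final AtomicLong usedBytes = new AtomicLong(0);
      private final long maxBytes;

      CacheAccountingDemo(long maxBytes) {
        this.maxBytes = maxBytes;
      }

      /** Try to reserve space for a block; false if the cache is full. */
      boolean reserve(long bytes) {
        while (true) {
          long used = usedBytes.get();
          if (used + bytes > maxBytes) {
            return false;                       // not enough locked memory left
          }
          if (usedBytes.compareAndSet(used, used + bytes)) {
            return true;                        // won the race
          }
          // Another thread updated usedBytes first; re-read and retry.
        }
      }

      /** Release space after a block is uncached or fails to cache. */
      void release(long bytes) {
        long used = usedBytes.get();
        while (!usedBytes.compareAndSet(used, used - bytes)) {
          used = usedBytes.get();
        }
      }

      public static void main(String[] args) {
        CacheAccountingDemo cache = new CacheAccountingDemo(64 * 1024);
        System.out.println(cache.reserve(4096));         // true
        cache.release(4096);
        System.out.println(cache.reserve(128 * 1024));   // false, over the limit
      }
    }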

+ 16 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java

@@ -57,6 +57,8 @@ public class DataNodeMetrics {
   @Metric MutableCounterLong blocksRemoved;
   @Metric MutableCounterLong blocksVerified;
   @Metric MutableCounterLong blockVerificationFailures;
+  @Metric MutableCounterLong blocksCached;
+  @Metric MutableCounterLong blocksUncached;
   @Metric MutableCounterLong readsFromLocalClient;
   @Metric MutableCounterLong readsFromRemoteClient;
   @Metric MutableCounterLong writesFromLocalClient;
@@ -74,6 +76,7 @@ public class DataNodeMetrics {
   @Metric MutableRate replaceBlockOp;
   @Metric MutableRate heartbeats;
   @Metric MutableRate blockReports;
+  @Metric MutableRate cacheReports;
   @Metric MutableRate packetAckRoundTripTimeNanos;
   MutableQuantiles[] packetAckRoundTripTimeNanosQuantiles;
   
@@ -151,6 +154,10 @@ public class DataNodeMetrics {
     blockReports.add(latency);
   }
 
+  public void addCacheReport(long latency) {
+    cacheReports.add(latency);
+  }
+
   public void incrBlocksReplicated(int delta) {
     blocksReplicated.incr(delta);
   }
@@ -175,6 +182,15 @@ public class DataNodeMetrics {
     blocksVerified.incr();
   }
 
+
+  public void incrBlocksCached(int delta) {
+    blocksCached.incr(delta);
+  }
+
+  public void incrBlocksUncached(int delta) {
+    blocksUncached.incr(delta);
+  }
+
   public void addReadBlockOp(long latency) {
     readBlockOp.add(latency);
   }

+ 67 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java

@@ -26,9 +26,9 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -51,7 +51,7 @@ import org.apache.hadoop.util.Fallible;
 /**
  * The Cache Manager handles caching on DataNodes.
  */
-final class CacheManager {
+public final class CacheManager {
   public static final Log LOG = LogFactory.getLog(CacheManager.class);
 
   /**
@@ -69,6 +69,12 @@ final class CacheManager {
   private final TreeMap<PathBasedCacheDirective, PathBasedCacheEntry> entriesByDirective =
       new TreeMap<PathBasedCacheDirective, PathBasedCacheEntry>();
 
+  /**
+   * Cache entries, sorted by path
+   */
+  private final TreeMap<String, List<PathBasedCacheEntry>> entriesByPath =
+      new TreeMap<String, List<PathBasedCacheEntry>>();
+
   /**
    * Cache pools, sorted by name.
    */
@@ -90,9 +96,14 @@ final class CacheManager {
    */
   private final int maxListCacheDirectivesResponses;
 
-  CacheManager(FSDirectory dir, Configuration conf) {
+  final private FSNamesystem namesystem;
+  final private FSDirectory dir;
+
+  CacheManager(FSNamesystem namesystem, FSDirectory dir, Configuration conf) {
     // TODO: support loading and storing of the CacheManager state
     clear();
+    this.namesystem = namesystem;
+    this.dir = dir;
     maxListCachePoolsResponses = conf.getInt(
         DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES,
         DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT);
@@ -104,6 +115,7 @@ final class CacheManager {
   synchronized void clear() {
     entriesById.clear();
     entriesByDirective.clear();
+    entriesByPath.clear();
     cachePools.clear();
     nextEntryId = 1;
   }
@@ -131,7 +143,8 @@ final class CacheManager {
     try {
       directive.validate();
     } catch (IOException ioe) {
-      LOG.info("addDirective " + directive + ": validation failed.");
+      LOG.info("addDirective " + directive + ": validation failed: "
+          + ioe.getClass().getName() + ": " + ioe.getMessage());
       return new Fallible<PathBasedCacheEntry>(ioe);
     }
     // Check if we already have this entry.
@@ -152,8 +165,34 @@ final class CacheManager {
     }
     LOG.info("addDirective " + directive + ": added cache directive "
         + directive);
+
+    // Success!
+    // First, add it to the various maps
     entriesByDirective.put(directive, entry);
     entriesById.put(entry.getEntryId(), entry);
+    String path = directive.getPath();
+    List<PathBasedCacheEntry> entryList = entriesByPath.get(path);
+    if (entryList == null) {
+      entryList = new ArrayList<PathBasedCacheEntry>(1);
+      entriesByPath.put(path, entryList);
+    }
+    entryList.add(entry);
+
+    // Next, set the path as cached in the namesystem
+    try {
+      INode node = dir.getINode(directive.getPath());
+      if (node.isFile()) {
+        INodeFile file = node.asFile();
+        // TODO: adjustable cache replication factor
+        namesystem.setCacheReplicationInt(directive.getPath(),
+            file.getBlockReplication());
+      }
+    } catch (IOException ioe) {
+      LOG.info("addDirective " + directive +": failed to cache file: " +
+          ioe.getClass().getName() +": " + ioe.getMessage());
+      return new Fallible<PathBasedCacheEntry>(ioe);
+    }
+
     return new Fallible<PathBasedCacheEntry>(entry);
   }
 
@@ -201,7 +240,31 @@ final class CacheManager {
       return new Fallible<Long>(
           new UnexpectedRemovePathBasedCacheEntryException(entryId));
     }
+    // Remove the corresponding entry in entriesByPath.
+    String path = existing.getDirective().getPath();
+    List<PathBasedCacheEntry> entries = entriesByPath.get(path);
+    if (entries == null || !entries.remove(existing)) {
+      return new Fallible<Long>(
+          new UnexpectedRemovePathBasedCacheEntryException(entryId));
+    }
+    if (entries.size() == 0) {
+      entriesByPath.remove(path);
+    }
     entriesById.remove(entryId);
+
+    // Set the path as uncached in the namesystem
+    try {
+      INode node = dir.getINode(existing.getDirective().getPath());
+      if (node.isFile()) {
+        namesystem.setCacheReplicationInt(existing.getDirective().getPath(),
+            (short) 0);
+      }
+    } catch (IOException e) {
+      LOG.warn("removeEntry " + entryId + ": failure while setting cache"
+          + " replication factor", e);
+      return new Fallible<Long>(e);
+    }
+    LOG.info("removeEntry successful for PathCacheEntry id " + entryId);
     return new Fallible<Long>(entryId);
   }
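
addDirective() and removeEntry() above now keep three indexes consistent, by entry id, by directive, and by path, so that every directive covering a path can be found when the path is cached or uncached, with empty per-path lists pruned on removal. A minimal sketch of the path multimap maintenance, using a hypothetical Entry stand-in for PathBasedCacheEntry:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.TreeMap;

    public class PathIndexDemo {
      /** Hypothetical stand-in for PathBasedCacheEntry: an id and a path. */
      static class Entry {
        final long id;
        final String path;
        Entry(long id, String path) {
          this.id = id;
          this.path = path;
        }
      }

      private final TreeMap<String, List<Entry>> entriesByPath =
          new TreeMap<String, List<Entry>>();

      void add(Entry entry) {
        List<Entry> list = entriesByPath.get(entry.path);
        if (list == null) {
          list = new ArrayList<Entry>(1);
          entriesByPath.put(entry.path, list);
        }
        list.add(entry);
      }

      boolean remove(Entry entry) {
        List<Entry> list = entriesByPath.get(entry.path);
        if (list == null || !list.remove(entry)) {
          return false;                         // unexpected: the indexes fell out of sync
        }
        if (list.isEmpty()) {
          entriesByPath.remove(entry.path);     // prune empty buckets, as removeEntry does
        }
        return true;
      }

      public static void main(String[] args) {
        PathIndexDemo index = new PathIndexDemo();
        Entry e = new Entry(1L, "/warm/table1");
        index.add(e);
        System.out.println(index.remove(e));    // true
      }
    }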
 

+ 46 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java

@@ -1091,6 +1091,52 @@ public class FSDirectory implements Closeable {
     return file.getBlocks();
   }
 
+  /**
+   * Set cache replication for a file
+   * 
+   * @param src file name
+   * @param replication new replication
+   * @param blockRepls block replications - output parameter
+   * @return array of file blocks
+   * @throws QuotaExceededException
+   * @throws SnapshotAccessControlException
+   */
+  Block[] setCacheReplication(String src, short replication, short[] blockRepls)
+      throws QuotaExceededException, UnresolvedLinkException,
+      SnapshotAccessControlException {
+    waitForReady();
+    writeLock();
+    try {
+      return unprotectedSetCacheReplication(src, replication, blockRepls);
+    } finally {
+      writeUnlock();
+    }
+  }
+
+  Block[] unprotectedSetCacheReplication(String src, short replication,
+      short[] blockRepls) throws QuotaExceededException,
+      UnresolvedLinkException, SnapshotAccessControlException {
+    assert hasWriteLock();
+
+    final INodesInPath iip = rootDir.getINodesInPath4Write(src, true);
+    final INode inode = iip.getLastINode();
+    if (inode == null || !inode.isFile()) {
+      return null;
+    }
+    INodeFile file = inode.asFile();
+    final short oldBR = file.getCacheReplication();
+
+    // TODO: Update quotas here as repl goes up or down
+    file.setCacheReplication(replication);
+    final short newBR = file.getCacheReplication();
+
+    if (blockRepls != null) {
+      blockRepls[0] = oldBR;
+      blockRepls[1] = newBR;
+    }
+    return file.getBlocks();
+  }
+
   /**
    * @param path the file path
    * @return the block size of the file. 

+ 53 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -367,6 +367,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   private final BlockManager blockManager;
   private final SnapshotManager snapshotManager;
   private final CacheManager cacheManager;
+  private final CacheReplicationManager cacheReplicationManager;
   private final DatanodeStatistics datanodeStatistics;
 
   // Block pool ID used by this namenode
@@ -694,7 +695,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       this.dtSecretManager = createDelegationTokenSecretManager(conf);
       this.dir = new FSDirectory(fsImage, this, conf);
       this.snapshotManager = new SnapshotManager(dir);
-      this.cacheManager= new CacheManager(dir, conf);
+      this.cacheManager = new CacheManager(this, dir, conf);
+      this.cacheReplicationManager = new CacheReplicationManager(this,
+          blockManager, blockManager.getDatanodeManager(), this, conf);
       this.safeMode = new SafeModeInfo(conf);
       this.auditLoggers = initAuditLoggers(conf);
       this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
@@ -871,6 +874,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         getCompleteBlocksTotal());
       setBlockTotal();
       blockManager.activate(conf);
+      cacheReplicationManager.activate();
     } finally {
       writeUnlock();
     }
@@ -887,6 +891,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     writeLock();
     try {
       if (blockManager != null) blockManager.close();
+      if (cacheReplicationManager != null) cacheReplicationManager.close();
     } finally {
       writeUnlock();
     }
@@ -917,7 +922,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         blockManager.getDatanodeManager().markAllDatanodesStale();
         blockManager.clearQueues();
         blockManager.processAllPendingDNMessages();
-        
+
+        cacheReplicationManager.clearQueues();
+
         if (!isInSafeMode() ||
             (isInSafeMode() && safeMode.isPopulatingReplQueues())) {
           LOG.info("Reprocessing replication and invalidation queues");
@@ -1910,6 +1917,42 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     return isFile;
   }
 
+  boolean setCacheReplicationInt(String src, final short replication)
+      throws IOException {
+    final boolean isFile;
+    FSPermissionChecker pc = getPermissionChecker();
+    checkOperation(OperationCategory.WRITE);
+    byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot set cache replication for " + src, safeMode);
+      }
+      src = FSDirectory.resolvePath(src, pathComponents, dir);
+      if (isPermissionEnabled) {
+        checkPathAccess(pc, src, FsAction.WRITE);
+      }
+
+      final short[] blockRepls = new short[2]; // 0: old, 1: new
+      final Block[] blocks = dir.setCacheReplication(src, replication,
+          blockRepls);
+      isFile = (blocks != null);
+      if (isFile) {
+        cacheReplicationManager.setCacheReplication(blockRepls[0],
+            blockRepls[1], src, blocks);
+      }
+    } finally {
+      writeUnlock();
+    }
+
+    getEditLog().logSync();
+    if (isFile) {
+      logAuditEvent(true, "setCacheReplication", src);
+    }
+    return isFile;
+  }
+
   long getPreferredBlockSize(String filename) 
       throws IOException, UnresolvedLinkException {
     FSPermissionChecker pc = getPermissionChecker();
@@ -6391,6 +6434,14 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
   public FSDirectory getFSDirectory() {
     return dir;
   }
+  /** @return the cache manager. */
+  public CacheManager getCacheManager() {
+    return cacheManager;
+  }
+  /** @return the cache replication manager. */
+  public CacheReplicationManager getCacheReplicationManager() {
+    return cacheReplicationManager;
+  }
 
   @Override  // NameNodeMXBean
   public String getCorruptFiles() {
@@ -6959,10 +7010,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     return results;
   }
 
-  public CacheManager getCacheManager() {
-    return cacheManager;
-  }
-
   /**
    * Default AuditLogger implementation; used when no access logger is
    * defined in the config file. It can also be explicitly listed in the

+ 14 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java

@@ -104,6 +104,8 @@ public class INodeFile extends INodeWithAdditionalFields
 
   private BlockInfo[] blocks;
 
+  private short cacheReplication = 0;
+
   INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime, long atime,
       BlockInfo[] blklist, short replication, long preferredBlockSize) {
     super(id, name, permissions, mtime, atime);
@@ -199,6 +201,18 @@ public class INodeFile extends INodeWithAdditionalFields
     return nodeToUpdate;
   }
 
+  @Override
+  public void setCacheReplication(short cacheReplication) {
+    Preconditions.checkArgument(cacheReplication <= getBlockReplication(),
+        "Cannot set cache replication higher than block replication factor");
+    this.cacheReplication = cacheReplication;
+  }
+
+  @Override
+  public short getCacheReplication() {
+    return cacheReplication;
+  }
+
   /** @return preferred block size (in bytes) of the file. */
   @Override
   public long getPreferredBlockSize() {

+ 8 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java

@@ -968,7 +968,14 @@ class NameNodeRpcServer implements NamenodeProtocols {
       String poolId, long[] blocks) throws IOException {
     verifyRequest(nodeReg);
     BlockListAsLongs blist = new BlockListAsLongs(blocks);
-    namesystem.getBlockManager().processCacheReport(nodeReg, poolId, blist);
+    if (blockStateChangeLog.isDebugEnabled()) {
+      blockStateChangeLog.debug("*BLOCK* NameNode.cacheReport: "
+           + "from " + nodeReg + " " + blist.getNumberOfBlocks()
+           + " blocks");
+    }
+
+    namesystem.getCacheReplicationManager()
+        .processCacheReport(nodeReg, poolId, blist);
     return null;
   }
 

+ 13 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java

@@ -79,6 +79,8 @@ public class NameNodeMetrics {
   MutableCounterLong transactionsBatchedInSync;
   @Metric("Block report") MutableRate blockReport;
   MutableQuantiles[] blockReportQuantiles;
+  @Metric("Cache report") MutableRate cacheReport;
+  MutableQuantiles[] cacheReportQuantiles;
 
   @Metric("Duration in SafeMode at startup") MutableGaugeInt safeModeTime;
   @Metric("Time loading FS Image at startup") MutableGaugeInt fsImageLoadTime;
@@ -89,6 +91,7 @@ public class NameNodeMetrics {
     final int len = intervals.length;
     syncsQuantiles = new MutableQuantiles[len];
     blockReportQuantiles = new MutableQuantiles[len];
+    cacheReportQuantiles = new MutableQuantiles[len];
     
     for (int i = 0; i < len; i++) {
       int interval = intervals[i];
@@ -98,6 +101,9 @@ public class NameNodeMetrics {
       blockReportQuantiles[i] = registry.newQuantiles(
           "blockReport" + interval + "s", 
           "Block report", "ops", "latency", interval);
+      cacheReportQuantiles[i] = registry.newQuantiles(
+          "cacheReport" + interval + "s",
+          "Cache report", "ops", "latency", interval);
     }
   }
 
@@ -227,6 +233,13 @@ public class NameNodeMetrics {
     }
   }
 
+  public void addCacheBlockReport(long latency) {
+    cacheReport.add(latency);
+    for (MutableQuantiles q : cacheReportQuantiles) {
+      q.add(latency);
+    }
+  }
+
   public void setSafeModeTime(long elapsed) {
     safeModeTime.set((int) elapsed);
   }

+ 162 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java

@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheEntry;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.util.Fallible;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCacheReplicationManager {
+
+  // Most Linux installs allow a default of 64KB locked memory
+  private static final long CACHE_CAPACITY = 64 * 1024;
+  private static final long BLOCK_SIZE = 4096;
+
+  private static Configuration conf;
+  private static MiniDFSCluster cluster = null;
+  private static FileSystem fs;
+  private static NameNode nn;
+  private static NamenodeProtocols nnRpc;
+  private static CacheReplicationManager cacheReplManager;
+  final private static FileSystemTestHelper helper = new FileSystemTestHelper();
+  private static Path rootDir;
+
+  @Before
+  public void setUp() throws Exception {
+
+    assumeTrue(NativeIO.isAvailable());
+
+    conf = new HdfsConfiguration();
+    conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+        CACHE_CAPACITY);
+    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, true);
+    conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
+
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1).build();
+    cluster.waitActive();
+
+    fs = cluster.getFileSystem();
+    nn = cluster.getNameNode();
+    nnRpc = nn.getRpcServer();
+    cacheReplManager = nn.getNamesystem().getCacheReplicationManager();
+    rootDir = helper.getDefaultWorkingDirectory(fs);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (fs != null) {
+      fs.close();
+    }
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  private int countNumCachedBlocks() {
+    return cacheReplManager.cachedBlocksMap.size();
+  }
+
+  private void waitForExpectedNumCachedBlocks(final int expected)
+      throws Exception {
+    int actual = countNumCachedBlocks();
+    while (expected != actual)  {
+      Thread.sleep(500);
+      actual = countNumCachedBlocks();
+    }
+  }
+
+  @Test(timeout=60000)
+  public void testCachePaths() throws Exception {
+    // Create the pool
+    final String pool = "friendlyPool";
+    nnRpc.addCachePool(new CachePoolInfo("friendlyPool"));
+    // Create some test files
+    final int numFiles = 3;
+    final int numBlocksPerFile = 2;
+    final List<String> paths = new ArrayList<String>(numFiles);
+    for (int i=0; i<numFiles; i++) {
+      Path p = new Path(rootDir, "testCachePaths-" + i);
+      FileSystemTestHelper.createFile(fs, p, numBlocksPerFile, (int)BLOCK_SIZE);
+      paths.add(p.toUri().getPath());
+    }
+    // Check the initial statistics at the namenode
+    int expected = 0;
+    waitForExpectedNumCachedBlocks(expected);
+    // Cache and check each path in sequence
+    for (int i=0; i<numFiles; i++) {
+      List<PathBasedCacheDirective> toAdd =
+          new ArrayList<PathBasedCacheDirective>();
+      toAdd.add(new PathBasedCacheDirective(paths.get(i), pool));
+      List<Fallible<PathBasedCacheEntry>> fallibles =
+          nnRpc.addPathBasedCacheDirectives(toAdd);
+      assertEquals("Unexpected number of fallibles",
+          1, fallibles.size());
+      PathBasedCacheEntry entry = fallibles.get(0).get();
+      PathBasedCacheDirective directive = entry.getDirective();
+      assertEquals("Directive does not match requested path", paths.get(i),
+          directive.getPath());
+      assertEquals("Directive does not match requested pool", pool,
+          directive.getPool());
+      expected += numBlocksPerFile;
+      waitForExpectedNumCachedBlocks(expected);
+    }
+    // Uncache and check each path in sequence
+    RemoteIterator<PathBasedCacheEntry> entries =
+        nnRpc.listPathBasedCacheEntries(0, null, null);
+    for (int i=0; i<numFiles; i++) {
+      PathBasedCacheEntry entry = entries.next();
+      List<Long> toRemove = new ArrayList<Long>();
+      toRemove.add(entry.getEntryId());
+      List<Fallible<Long>> fallibles = nnRpc.removePathBasedCacheEntries(toRemove);
+      assertEquals("Unexpected number of fallibles", 1, fallibles.size());
+      Long l = fallibles.get(0).get();
+      assertEquals("Removed entryId does not match requested",
+          entry.getEntryId(), l.longValue());
+      expected -= numBlocksPerFile;
+      waitForExpectedNumCachedBlocks(expected);
+    }
+  }
+}
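
waitForExpectedNumCachedBlocks() above spins without a bound of its own and leans on the JUnit timeout to fail the test. An alternative sketch that bounds the wait itself, assuming Hadoop's GenericTestUtils.waitFor and the Guava Supplier it takes are on the test classpath (the counter interface here is a hypothetical narrow view of the manager):

    import com.google.common.base.Supplier;
    import org.apache.hadoop.test.GenericTestUtils;

    public class WaitForCachedBlocksExample {
      /** Hypothetical narrow view of the manager used by the waiter. */
      interface CachedBlockCounter {
        int numCachedBlocks();
      }

      /** Wait up to 30 seconds, polling every 500 ms, for the expected count. */
      static void waitForCachedBlocks(final CachedBlockCounter counter,
          final int expected) throws Exception {
        GenericTestUtils.waitFor(new Supplier<Boolean>() {
          @Override
          public Boolean get() {
            return counter.numCachedBlocks() == expected;
          }
        }, 500, 30000);
      }
    }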