HDFS-5651. Remove dfs.namenode.caching.enabled and improve CRM locking. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1555002 13f79535-47bb-0310-9956-ffa450edef68
Andrew Wang, 11 years ago
commit d85c017d04

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -243,6 +243,9 @@ Trunk (Unreleased)
 
     HDFS-5636. Enforce a max TTL per cache pool. (awang via cmccabe)
 
+    HDFS-5651. Remove dfs.namenode.caching.enabled and improve CRM locking.
+    (cmccabe via wang)
+
   OPTIMIZATIONS
 
     HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)

+ 3 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -108,8 +108,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
  public static final long    DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT = 0;
  public static final String  DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY = "dfs.datanode.fsdatasetcache.max.threads.per.volume";
  public static final int     DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT = 4;
-  public static final String  DFS_NAMENODE_CACHING_ENABLED_KEY = "dfs.namenode.caching.enabled";
-  public static final boolean DFS_NAMENODE_CACHING_ENABLED_DEFAULT = false;
+  public static final String  DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT =
+    "dfs.namenode.path.based.cache.block.map.allocation.percent";
+  public static final float    DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT = 0.25f;
 
  public static final String  DFS_NAMENODE_HTTP_PORT_KEY = "dfs.http.port";
  public static final int     DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070;

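Note on migration: the boolean switch is removed outright, so deployments that had set dfs.namenode.caching.enabled need no replacement setting; caching is always on, and the only tunable introduced here is the block-map sizing key. A minimal sketch of overriding it programmatically (the 0.5f value is purely illustrative):

    // Hypothetical override of the new allocation percentage.
    Configuration conf = new HdfsConfiguration();
    conf.setFloat(
        DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT,
        0.5f);  // 0.5% of the heap instead of the 0.25% default
    // CacheManager clamps values below MIN_CACHED_BLOCKS_PERCENT (0.001f).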
+ 49 - 101
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java

@@ -87,17 +87,17 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
   * The CacheReplicationMonitor (CRM) lock. Used to synchronize starting and
   * waiting for rescan operations.
   */
-  private final ReentrantLock lock = new ReentrantLock();
+  private final ReentrantLock lock;
 
  /**
   * Notifies the scan thread that an immediate rescan is needed.
   */
-  private final Condition doRescan = lock.newCondition();
+  private final Condition doRescan;
 
  /**
   * Notifies waiting threads that a rescan has finished.
   */
-  private final Condition scanFinished = lock.newCondition();
+  private final Condition scanFinished;
 
  /**
   * Whether there are pending CacheManager operations that necessitate a
@@ -121,11 +121,6 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
   */
  private boolean shutdown = false;
 
-  /**
-   * The monotonic time at which the current scan started.
-   */
-  private long startTimeMs;
-
  /**
   * Mark status of the current scan.
   */
@@ -142,24 +137,27 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
  private long scannedBlocks;
 
  public CacheReplicationMonitor(FSNamesystem namesystem,
-      CacheManager cacheManager, long intervalMs) {
+      CacheManager cacheManager, long intervalMs, ReentrantLock lock) {
    this.namesystem = namesystem;
    this.blockManager = namesystem.getBlockManager();
    this.cacheManager = cacheManager;
    this.cachedBlocks = cacheManager.getCachedBlocks();
    this.intervalMs = intervalMs;
+    this.lock = lock;
+    this.doRescan = this.lock.newCondition();
+    this.scanFinished = this.lock.newCondition();
  }
 
  @Override
  public void run() {
-    startTimeMs = 0;
+    long startTimeMs = 0;
+    Thread.currentThread().setName("CacheReplicationMonitor(" +
+        System.identityHashCode(this) + ")");
    LOG.info("Starting CacheReplicationMonitor with interval " +
             intervalMs + " milliseconds");
    try {
      long curTimeMs = Time.monotonicNow();
      while (true) {
-        // Not all of the variables accessed here need the CRM lock, but take
-        // it anyway for simplicity
        lock.lock();
        try {
          while (true) {
@@ -180,12 +178,6 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
            doRescan.await(delta, TimeUnit.MILLISECONDS);
            curTimeMs = Time.monotonicNow();
          }
-        } finally {
-          lock.unlock();
-        }
-        // Mark scan as started, clear needsRescan
-        lock.lock();
-        try {
          isScanning = true;
          needsRescan = false;
        } finally {
@@ -195,7 +187,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
        mark = !mark;
        rescan();
        curTimeMs = Time.monotonicNow();
-        // Retake the CRM lock to update synchronization-related variables
+        // Update synchronization-related variables.
        lock.lock();
        try {
          isScanning = false;
@@ -208,32 +200,15 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
            scannedBlocks + " block(s) in " + (curTimeMs - startTimeMs) + " " +
            "millisecond(s).");
      }
+    } catch (InterruptedException e) {
+      LOG.info("Shutting down CacheReplicationMonitor.");
+      return;
    } catch (Throwable t) {
      LOG.fatal("Thread exiting", t);
      terminate(1, t);
    }
  }
 
-  /**
-   * Similar to {@link CacheReplicationMonitor#waitForRescan()}, except it only
-   * waits if there are pending operations that necessitate a rescan as
-   * indicated by {@link #setNeedsRescan()}.
-   * <p>
-   * Note that this call may release the FSN lock, so operations before and
-   * after are not necessarily atomic.
-   */
-  public void waitForRescanIfNeeded() {
-    lock.lock();
-    try {
-      if (!needsRescan) {
-        return;
-      }
-    } finally {
-      lock.unlock();
-    }
-    waitForRescan();
-  }
-
  /**
   * Waits for a rescan to complete. This doesn't guarantee consistency with
   * pending operations, only relative recency, since it will not force a new
@@ -242,49 +217,27 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
   * Note that this call will release the FSN lock, so operations before and
   * after are not atomic.
   */
-  public void waitForRescan() {
-    // Drop the FSN lock temporarily and retake it after we finish waiting
-    // Need to handle both the read lock and the write lock
-    boolean retakeWriteLock = false;
-    if (namesystem.hasWriteLock()) {
-      namesystem.writeUnlock();
-      retakeWriteLock = true;
-    } else if (namesystem.hasReadLock()) {
-      namesystem.readUnlock();
-    } else {
-      // Expected to have at least one of the locks
-      Preconditions.checkState(false,
-          "Need to be holding either the read or write lock");
+  public void waitForRescanIfNeeded() {
+    Preconditions.checkArgument(!namesystem.hasWriteLock(),
+        "Must not hold the FSN write lock when waiting for a rescan.");
+    Preconditions.checkArgument(lock.isHeldByCurrentThread(),
+        "Must hold the CRM lock when waiting for a rescan.");
+    if (!needsRescan) {
+      return;
    }
-    // try/finally for retaking FSN lock
-    try {
-      lock.lock();
-      // try/finally for releasing CRM lock
+    // If no scan is already ongoing, mark the CRM as dirty and kick
+    if (!isScanning) {
+      doRescan.signal();
+    }
+    // Wait until the scan finishes and the count advances
+    final long startCount = scanCount;
+    while ((!shutdown) && (startCount >= scanCount)) {
      try {
-        // If no scan is already ongoing, mark the CRM as dirty and kick
-        if (!isScanning) {
-          needsRescan = true;
-          doRescan.signal();
-        }
-        // Wait until the scan finishes and the count advances
-        final long startCount = scanCount;
-        while (startCount >= scanCount) {
-          try {
-            scanFinished.await();
-          } catch (InterruptedException e) {
-            LOG.warn("Interrupted while waiting for CacheReplicationMonitor"
-                + " rescan", e);
-            break;
-          }
-        }
-      } finally {
-        lock.unlock();
-      }
-    } finally {
-      if (retakeWriteLock) {
-        namesystem.writeLock();
-      } else {
-        namesystem.readLock();
+        scanFinished.await();
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted while waiting for CacheReplicationMonitor"
+            + " rescan", e);
+        break;
      }
    }
  }
@@ -294,42 +247,43 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
   * changes that require a rescan.
   */
  public void setNeedsRescan() {
-    lock.lock();
-    try {
-      this.needsRescan = true;
-    } finally {
-      lock.unlock();
-    }
+    Preconditions.checkArgument(lock.isHeldByCurrentThread(),
+        "Must hold the CRM lock when setting the needsRescan bit.");
+    this.needsRescan = true;
  }
 
  /**
-   * Shut down and join the monitor thread.
+   * Shut down the monitor thread.
   */
  @Override
  public void close() throws IOException {
+    Preconditions.checkArgument(namesystem.hasWriteLock());
    lock.lock();
    try {
      if (shutdown) return;
+      // Since we hold both the FSN write lock and the CRM lock here,
+      // we know that the CRM thread cannot be currently modifying
+      // the cache manager state while we're closing it.
+      // Since the CRM thread checks the value of 'shutdown' after waiting
+      // for a lock, we know that the thread will not modify the cache
+      // manager state after this point.
      shutdown = true;
      doRescan.signalAll();
      scanFinished.signalAll();
    } finally {
      lock.unlock();
    }
-    try {
-      if (this.isAlive()) {
-        this.join(60000);
-      }
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-    }
  }
 
-  private void rescan() {
+  private void rescan() throws InterruptedException {
    scannedDirectives = 0;
    scannedBlocks = 0;
    namesystem.writeLock();
    try {
+      if (shutdown) {
+        throw new InterruptedException("CacheReplicationMonitor was " +
+            "shut down.");
+      }
      resetStatistics();
      rescanCacheDirectives();
      rescanCachedBlockMap();
@@ -609,9 +563,6 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
  private void addNewPendingUncached(int neededUncached,
      CachedBlock cachedBlock, List<DatanodeDescriptor> cached,
      List<DatanodeDescriptor> pendingUncached) {
-    if (!cacheManager.isActive()) {
-      return;
-    }
    // Figure out which replicas can be uncached.
    LinkedList<DatanodeDescriptor> possibilities =
        new LinkedList<DatanodeDescriptor>();
@@ -647,9 +598,6 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
  private void addNewPendingCached(int neededCached,
      CachedBlock cachedBlock, List<DatanodeDescriptor> cached,
      List<DatanodeDescriptor> pendingCached) {
-    if (!cacheManager.isActive()) {
-      return;
-    }
    // To figure out which replicas can be cached, we consult the
    // blocksMap.  We don't want to try to cache a corrupt replica, though.
    BlockInfo blockInfo = blockManager.

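Taken together, the CRM now borrows its lock from the CacheManager instead of owning one, both waiting and flagging require the caller to already hold that lock, and shutdown is detected under the lock rather than via join(). A self-contained sketch of this pattern (illustrative names, not the actual HDFS classes):

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    // Sketch of a monitor thread sharing an externally owned lock.
    class RescanMonitor extends Thread {
      private final ReentrantLock lock;     // created and owned by the manager
      private final Condition doRescan;
      private final Condition scanFinished;
      private boolean needsRescan = false;
      private boolean shutdown = false;
      private long scanCount = 0;

      RescanMonitor(ReentrantLock lock) {
        this.lock = lock;
        this.doRescan = lock.newCondition();
        this.scanFinished = lock.newCondition();
      }

      @Override
      public void run() {
        while (true) {
          lock.lock();
          try {
            while (!shutdown && !needsRescan) {
              doRescan.awaitUninterruptibly();
            }
            if (shutdown) {
              return;                       // checked under the lock, as in close()
            }
            needsRescan = false;
          } finally {
            lock.unlock();
          }
          // ... perform the actual rescan without holding the lock ...
          lock.lock();
          try {
            scanCount++;
            scanFinished.signalAll();
          } finally {
            lock.unlock();
          }
        }
      }

      // Caller must already hold 'lock', mirroring the Preconditions checks above.
      void setNeedsRescan() {
        needsRescan = true;
      }

      // Caller must already hold 'lock'.
      void waitForRescanIfNeeded() throws InterruptedException {
        if (!needsRescan) {
          return;
        }
        doRescan.signal();
        long startCount = scanCount;
        while (!shutdown && startCount >= scanCount) {
          scanFinished.await();             // releases 'lock' while waiting
        }
      }
    }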
+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -1443,6 +1443,13 @@ public class DatanodeManager {
    return getClass().getSimpleName() + ": " + host2DatanodeMap;
  }
 
+  public void clearPendingCachingCommands() {
+    for (DatanodeDescriptor dn : datanodeMap.values()) {
+      dn.getPendingCached().clear();
+      dn.getPendingUncached().clear();
+    }
+  }
+
  public void setShouldSendCachingCommands(boolean shouldSendCachingCommands) {
    this.shouldSendCachingCommands = shouldSendCachingCommands;
  }

+ 84 - 99
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java

@@ -17,8 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES;
@@ -40,6 +40,7 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.SortedMap;
 import java.util.TreeMap;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
@@ -84,7 +85,7 @@ import com.google.common.annotations.VisibleForTesting;
 /**
  * The Cache Manager handles caching on DataNodes.
  *
- * This class is instantiated by the FSNamesystem when caching is enabled.
+ * This class is instantiated by the FSNamesystem.
  * It maintains the mapping of cached blocks to datanodes via processing
  * datanode cache reports. Based on these reports and addition and removal of
  * caching directives, we will schedule caching and uncaching work.
@@ -93,6 +94,8 @@ import com.google.common.annotations.VisibleForTesting;
 public final class CacheManager {
   public static final Log LOG = LogFactory.getLog(CacheManager.class);
 
+  private static final float MIN_CACHED_BLOCKS_PERCENT = 0.001f;
+
   // TODO: add pending / underCached / schedule cached blocks stats.
 
   /**
@@ -148,32 +151,14 @@ public final class CacheManager {
  private final long scanIntervalMs;
 
  /**
-   * Whether caching is enabled.
-   *
-   * If caching is disabled, we will not process cache reports or store
-   * information about what is cached where.  We also do not start the
-   * CacheReplicationMonitor thread.  This will save resources, but provide
-   * less functionality.
-   *
-   * Even when caching is disabled, we still store path-based cache
-   * information.  This information is stored in the edit log and fsimage.  We
-   * don't want to lose it just because a configuration setting was turned off.
-   * However, we will not act on this information if caching is disabled.
+   * All cached blocks.
   */
-  private final boolean enabled;
+  private final GSet<CachedBlock, CachedBlock> cachedBlocks;
 
  /**
-   * Whether the CacheManager is active.
-   *
-   * When the CacheManager is active, it tells the DataNodes what to cache
-   * and uncache.  The CacheManager cannot become active if enabled = false.
+   * Lock which protects the CacheReplicationMonitor.
   */
-  private boolean active = false;
-
-  /**
-   * All cached blocks.
-   */
-  private final GSet<CachedBlock, CachedBlock> cachedBlocks;
+  private final ReentrantLock crmLock = new ReentrantLock();
 
  /**
   * The CacheReplicationMonitor.
@@ -194,54 +179,51 @@ public final class CacheManager {
    scanIntervalMs = conf.getLong(
        DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS,
        DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT);
-    this.enabled = conf.getBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY,
-        DFS_NAMENODE_CACHING_ENABLED_DEFAULT);
-    this.cachedBlocks = !enabled ? null :
-        new LightWeightGSet<CachedBlock, CachedBlock>(
-            LightWeightGSet.computeCapacity(0.25, "cachedBlocks"));
+    float cachedBlocksPercent = conf.getFloat(
+          DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT,
+          DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT);
+    if (cachedBlocksPercent < MIN_CACHED_BLOCKS_PERCENT) {
+      LOG.info("Using minimum value " + MIN_CACHED_BLOCKS_PERCENT +
+        " for " + DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT);
+      cachedBlocksPercent = MIN_CACHED_BLOCKS_PERCENT;
+    }
+    this.cachedBlocks = new LightWeightGSet<CachedBlock, CachedBlock>(
+          LightWeightGSet.computeCapacity(cachedBlocksPercent,
+              "cachedBlocks"));
+
  }
 
-  /**
-   * Activate the cache manager.
-   *
-   * When the cache manager is active, tell the datanodes where to cache files.
-   */
-  public void activate() {
-    assert namesystem.hasWriteLock();
-    if (enabled && (!active)) {
-      LOG.info("Activating CacheManager.  " +
-          "Starting replication monitor thread...");
-      active = true;
-      monitor = new CacheReplicationMonitor(namesystem, this,
-         scanIntervalMs);
-      monitor.start();
+  public void startMonitorThread() {
+    crmLock.lock();
+    try {
+      if (this.monitor == null) {
+        this.monitor = new CacheReplicationMonitor(namesystem, this,
+            scanIntervalMs, crmLock);
+        this.monitor.start();
+      }
+    } finally {
+      crmLock.unlock();
    }
  }
 
-  /**
-   * Deactivate the cache manager.
-   *
-   * When the cache manager is inactive, it does not tell the datanodes where to
-   * cache files.
-   */
-  public void deactivate() {
-    assert namesystem.hasWriteLock();
-    if (active) {
-      LOG.info("Deactivating CacheManager.  " +
-          "stopping CacheReplicationMonitor thread...");
-      active = false;
-      IOUtils.closeQuietly(monitor);
-      monitor = null;
-      LOG.info("CacheReplicationMonitor thread stopped and deactivated.");
+  public void stopMonitorThread() {
+    crmLock.lock();
+    try {
+      if (this.monitor != null) {
+        CacheReplicationMonitor prevMonitor = this.monitor;
+        this.monitor = null;
+        IOUtils.closeQuietly(prevMonitor);
+      }
+    } finally {
+      crmLock.unlock();
    }
  }
 
-  /**
-   * Return true only if the cache manager is active.
-   * Must be called under the FSN read or write lock.
-   */
-  public boolean isActive() {
-    return active;
+  public void clearDirectiveStats() {
+    assert namesystem.hasWriteLock();
+    for (CacheDirective directive : directivesById.values()) {
+      directive.resetStatistics();
+    }
  }
 
  /**
@@ -480,9 +462,7 @@ public final class CacheManager {
    directive.addBytesNeeded(stats.getBytesNeeded());
    directive.addFilesNeeded(directive.getFilesNeeded());
 
-    if (monitor != null) {
-      monitor.setNeedsRescan();
-    }
+    setNeedsRescan();
  }
 
  /**
@@ -514,10 +494,6 @@ public final class CacheManager {
      long expiryTime = validateExpiryTime(info, pool.getMaxRelativeExpiryMs());
      // Do quota validation if required
      if (!flags.contains(CacheFlag.FORCE)) {
-        // Can't kick and wait if caching is disabled
-        if (monitor != null) {
-          monitor.waitForRescan();
-        }
        checkLimit(pool, path, replication);
      }
      // All validation passed
@@ -622,9 +598,7 @@ public final class CacheManager {
      validateExpiryTime(infoWithDefaults, destPool.getMaxRelativeExpiryMs());
 
      // Indicate changes to the CRM
-      if (monitor != null) {
-        monitor.setNeedsRescan();
-      }
+      setNeedsRescan();
 
      // Validation passed
      removeInternal(prevEntry);
@@ -659,9 +633,7 @@ public final class CacheManager {
    pool.getDirectiveList().remove(directive);
    assert directive.getPool() == null;
 
-    if (monitor != null) {
-      monitor.setNeedsRescan();
-    }
+    setNeedsRescan();
  }
 
  public void removeDirective(long id, FSPermissionChecker pc)
@@ -694,9 +666,6 @@ public final class CacheManager {
    if (filter.getReplication() != null) {
      throw new IOException("Filtering by replication is unsupported.");
    }
-    if (monitor != null) {
-      monitor.waitForRescanIfNeeded();
-    }
    ArrayList<CacheDirectiveEntry> replies =
        new ArrayList<CacheDirectiveEntry>(NUM_PRE_ALLOCATED_ENTRIES);
    int numReplies = 0;
@@ -805,9 +774,7 @@ public final class CacheManager {
        bld.append(prefix).append("set limit to " + info.getLimit());
        prefix = "; ";
        // New limit changes stats, need to set needs refresh
-        if (monitor != null) {
-          monitor.setNeedsRescan();
-        }
+        setNeedsRescan();
      }
      if (info.getMaxRelativeExpiryMs() != null) {
        final Long maxRelativeExpiry = info.getMaxRelativeExpiryMs();
@@ -853,9 +820,7 @@ public final class CacheManager {
        directivesById.remove(directive.getId());
        iter.remove();
      }
-      if (monitor != null) {
-        monitor.setNeedsRescan();
-      }
+      setNeedsRescan();
    } catch (IOException e) {
      LOG.info("removeCachePool of " + poolName + " failed: ", e);
      throw e;
@@ -866,9 +831,6 @@ public final class CacheManager {
  public BatchedListEntries<CachePoolEntry>
      listCachePools(FSPermissionChecker pc, String prevKey) {
    assert namesystem.hasReadLock();
-    if (monitor != null) {
-      monitor.waitForRescanIfNeeded();
-    }
    final int NUM_PRE_ALLOCATED_ENTRIES = 16;
    ArrayList<CachePoolEntry> results =
        new ArrayList<CachePoolEntry>(NUM_PRE_ALLOCATED_ENTRIES);
@@ -884,9 +846,6 @@ public final class CacheManager {
  }
 
  public void setCachedLocations(LocatedBlock block) {
-    if (!enabled) {
-      return;
-    }
    CachedBlock cachedBlock =
        new CachedBlock(block.getBlock().getBlockId(),
            (short)0, false);
@@ -902,12 +861,6 @@
 
  public final void processCacheReport(final DatanodeID datanodeID,
      final List<Long> blockIds) throws IOException {
-    if (!enabled) {
-      LOG.info("Ignoring cache report from " + datanodeID +
-          " because " + DFS_NAMENODE_CACHING_ENABLED_KEY + " = false. " +
-          "number of blocks: " + blockIds.size());
-      return;
-    }
    namesystem.writeLock();
    final long startTime = Time.monotonicNow();
    final long endTime;
@@ -1085,4 +1038,36 @@ public final class CacheManager {
    }
    prog.endStep(Phase.LOADING_FSIMAGE, step);
  }
+
+  public void waitForRescanIfNeeded() {
+    crmLock.lock();
+    try {
+      if (monitor != null) {
+        monitor.waitForRescanIfNeeded();
+      }
+    } finally {
+      crmLock.unlock();
+    }
+  }
+
+  private void setNeedsRescan() {
+    crmLock.lock();
+    try {
+      if (monitor != null) {
+        monitor.setNeedsRescan();
+      }
+    } finally {
+      crmLock.unlock();
+    }
+  }
+
+  @VisibleForTesting
+  public Thread getCacheReplicationMonitor() {
+    crmLock.lock();
+    try {
+      return monitor;
+    } finally {
+      crmLock.unlock();
+    }
+  }
 }

+ 12 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -929,7 +929,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     writeLock();
     try {
       if (blockManager != null) blockManager.close();
-      cacheManager.deactivate();
     } finally {
       writeUnlock();
     }
@@ -999,7 +998,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
           editLogRollerThreshold, editLogRollerInterval));
       nnEditLogRoller.start();
 
-      cacheManager.activate();
+      cacheManager.startMonitorThread();
       blockManager.getDatanodeManager().setShouldSendCachingCommands(true);
     } finally {
       writeUnlock();
@@ -1050,7 +1049,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
         // so that the tailer starts from the right spot.
         dir.fsImage.updateLastAppliedTxIdFromWritten();
       }
-      cacheManager.deactivate();
+      cacheManager.stopMonitorThread();
+      cacheManager.clearDirectiveStats();
+      blockManager.getDatanodeManager().clearPendingCachingCommands();
       blockManager.getDatanodeManager().setShouldSendCachingCommands(false);
     } finally {
       writeUnlock();
@@ -7064,6 +7065,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
       return (Long) cacheEntry.getPayload();
     }
     boolean success = false;
+    if (!flags.contains(CacheFlag.FORCE)) {
+      cacheManager.waitForRescanIfNeeded();
+    }
     writeLock();
     Long result = null;
     try {
@@ -7105,6 +7109,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     if (cacheEntry != null && cacheEntry.isSuccess()) {
       return;
     }
+    if (!flags.contains(CacheFlag.FORCE)) {
+      cacheManager.waitForRescanIfNeeded();
+    }
     writeLock();
     try {
       checkOperation(OperationCategory.WRITE);
@@ -7164,6 +7171,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     final FSPermissionChecker pc = isPermissionEnabled ?
         getPermissionChecker() : null;
     BatchedListEntries<CacheDirectiveEntry> results;
+    cacheManager.waitForRescanIfNeeded();
     readLock();
     boolean success = false;
     try {
@@ -7287,6 +7295,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
     BatchedListEntries<CachePoolEntry> results;
     checkOperation(OperationCategory.READ);
     boolean success = false;
+    cacheManager.waitForRescanIfNeeded();
     readLock();
     try {
       checkOperation(OperationCategory.READ);

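Note the ordering at each of these call sites: waitForRescanIfNeeded() now runs before writeLock()/readLock() is taken. This matters because the rescan itself acquires the FSN write lock, so waiting while holding it would deadlock; the new Preconditions check in the CRM turns that mistake into an immediate failure instead. The resulting shape of a mutating caller (illustrative; 'doCacheOperation' is a placeholder, not a real method):

    // Shape of the new call sites in FSNamesystem.
    if (!flags.contains(CacheFlag.FORCE)) {
      cacheManager.waitForRescanIfNeeded();  // may block; takes only the CRM lock
    }
    writeLock();
    try {
      doCacheOperation();                    // placeholder for the guarded work
    } finally {
      writeUnlock();
    }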
+ 6 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -1476,13 +1476,13 @@
 </property>
 
 <property>
-  <name>dfs.namenode.caching.enabled</name>
-  <value>false</value>
+  <name>dfs.namenode.path.based.cache.block.map.allocation.percent</name>
+  <value>0.25</value>
   <description>
-    Set to true to enable block caching.  This flag enables the NameNode to
-    maintain a mapping of cached blocks to DataNodes via processing DataNode
-    cache reports.  Based on these reports and addition and removal of caching
-    directives, the NameNode will schedule caching and uncaching work.
+    The percentage of the Java heap which we will allocate to the cached blocks
+    map.  The cached blocks map is a hash map which uses chained hashing.
+    Smaller maps may be accessed more slowly if the number of cached blocks is
+    large; larger maps will consume more memory.
   </description>
 </property>
 
 

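As a rough sizing example (assuming LightWeightGSet.computeCapacity keeps its usual behavior of taking the given percentage of the maximum heap, dividing by the reference size, and rounding down to a power of two):

    1 GiB heap * 0.25% = ~2.6 MiB reserved for the bucket array
    2.6 MiB / 8-byte references = ~335,000 slots -> rounded down to 2^18 = 262,144 buckets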
+ 7 - 6
hadoop-hdfs-project/hadoop-hdfs/src/site/apt/CentralizedCacheManagement.apt.vm

@@ -242,12 +242,6 @@ Centralized Cache Management in HDFS
 
   Be sure to configure the following:
 
-  * dfs.namenode.caching.enabled
-
-    This must be set to true to enable caching. If this is false, the NameNode
-    will ignore cache reports, and will not ask DataNodes to cache
-    blocks.
-
   * dfs.datanode.max.locked.memory
 
     The DataNode will treat this as the maximum amount of memory it can use for
@@ -281,6 +275,13 @@ Centralized Cache Management in HDFS
 
     By default, this parameter is set to 10000, which is 10 seconds.
 
+  * dfs.namenode.path.based.cache.block.map.allocation.percent
+
+    The percentage of the Java heap which we will allocate to the cached blocks
+    map.  The cached blocks map is a hash map which uses chained hashing.
+    Smaller maps may be accessed more slowly if the number of cached blocks is
+    large; larger maps will consume more memory.  The default is 0.25 percent.
+
 ** {OS Limits}
 
   If you get the error "Cannot start datanode because the configured max

+ 0 - 2
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java

@@ -109,14 +109,12 @@ public class TestFsDatasetCache {
   public void setUp() throws Exception {
     assumeTrue(!Path.WINDOWS);
     conf = new HdfsConfiguration();
-    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY, true);
     conf.setLong(DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS,
         500);
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
     conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
         CACHE_CAPACITY);
     conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
-    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY, true);
 
     prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
     NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());

+ 0 - 51
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java

@@ -21,7 +21,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
 import static org.apache.hadoop.hdfs.protocol.CachePoolInfo.RELATIVE_EXPIRY_NEVER;
 import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
@@ -118,7 +117,6 @@ public class TestCacheDirectives {
     conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
     conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, CACHE_CAPACITY);
     conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
-    conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, true);
     conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
     conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000);
     // set low limits here for testing purposes
@@ -867,55 +865,6 @@ public class TestCacheDirectives {
     }
   }
 
-  @Test(timeout=120000)
-  public void testAddingCacheDirectiveInfosWhenCachingIsDisabled()
-      throws Exception {
-    cluster.shutdown();
-    HdfsConfiguration conf = createCachingConf();
-    conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, false);
-    MiniDFSCluster cluster =
-      new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
-
-    try {
-      cluster.waitActive();
-      DistributedFileSystem dfs = cluster.getFileSystem();
-      NameNode namenode = cluster.getNameNode();
-      // Create the pool
-      String pool = "pool1";
-      namenode.getRpcServer().addCachePool(new CachePoolInfo(pool));
-      // Create some test files
-      final int numFiles = 2;
-      final int numBlocksPerFile = 2;
-      final List<String> paths = new ArrayList<String>(numFiles);
-      for (int i=0; i<numFiles; i++) {
-        Path p = new Path("/testCachePaths-" + i);
-        FileSystemTestHelper.createFile(dfs, p, numBlocksPerFile,
-            (int)BLOCK_SIZE);
-        paths.add(p.toUri().getPath());
-      }
-      // Check the initial statistics at the namenode
-      waitForCachedBlocks(namenode, 0, 0,
-          "testAddingCacheDirectiveInfosWhenCachingIsDisabled:0");
-      // Cache and check each path in sequence
-      int expected = 0;
-      for (int i=0; i<numFiles; i++) {
-        CacheDirectiveInfo directive =
-            new CacheDirectiveInfo.Builder().
-              setPath(new Path(paths.get(i))).
-              setPool(pool).
-              build();
-        dfs.addCacheDirective(directive);
-        waitForCachedBlocks(namenode, expected, 0,
-          "testAddingCacheDirectiveInfosWhenCachingIsDisabled:1");
-      }
-      Thread.sleep(20000);
-      waitForCachedBlocks(namenode, expected, 0,
-          "testAddingCacheDirectiveInfosWhenCachingIsDisabled:2");
-    } finally {
-      cluster.shutdown();
-    }
-  }
-
   @Test(timeout=120000)
   public void testWaitForCachedReplicasInDirectory() throws Exception {
     // Create the pool

+ 25 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestHAStateTransitions.java

@@ -27,6 +27,7 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.URI;
+import java.util.LinkedList;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.logging.Log;
@@ -59,6 +60,8 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import com.google.common.util.concurrent.Uninterruptibles;
+
 /**
  * Tests state transition from active->standby, and manual failover
  * and failback between two namenodes.
@@ -124,6 +127,17 @@ public class TestHAStateTransitions {
     }
   }
 
+  private void addCrmThreads(MiniDFSCluster cluster,
+      LinkedList<Thread> crmThreads) {
+    for (int nn = 0; nn <= 1; nn++) {
+      Thread thread = cluster.getNameNode(nn).getNamesystem().
+          getCacheManager().getCacheReplicationMonitor();
+      if (thread != null) {
+        crmThreads.add(thread);
+      }
+    }
+  }
+
   /**
    * Test that transitioning a service to the state that it is already
    * in is a nop, specifically, an exception is not thrown.
@@ -131,19 +145,30 @@ public class TestHAStateTransitions {
   @Test
   public void testTransitionToCurrentStateIsANop() throws Exception {
     Configuration conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1L);
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
       .nnTopology(MiniDFSNNTopology.simpleHATopology())
       .numDataNodes(1)
       .build();
+    LinkedList<Thread> crmThreads = new LinkedList<Thread>();
     try {
       cluster.waitActive();
+      addCrmThreads(cluster, crmThreads);
       cluster.transitionToActive(0);
+      addCrmThreads(cluster, crmThreads);
       cluster.transitionToActive(0);
+      addCrmThreads(cluster, crmThreads);
       cluster.transitionToStandby(0);
+      addCrmThreads(cluster, crmThreads);
       cluster.transitionToStandby(0);
+      addCrmThreads(cluster, crmThreads);
     } finally {
       cluster.shutdown();
     }
+    // Verify that all cacheReplicationMonitor threads shut down
+    for (Thread thread : crmThreads) {
+      Uninterruptibles.joinUninterruptibly(thread);
+    }
   }
 
   /**