
HDFS-7411. Change decommission logic to throttle by blocks rather
than nodes in each interval. Contributed by Andrew Wang

Chris Douglas 10 years ago
parent
commit
3bc4f3502a
13 changed files with 996 additions and 374 deletions
  1. +3 -0
      hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  2. +4 -2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  3. +1 -1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java
  4. +20 -101
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
  5. +30 -79
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
  6. +568 -51
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
  7. +19 -4
      hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
  8. +307 -105
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
  9. +4 -4
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java
  10. +1 -1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java
  11. +37 -22
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
  12. +1 -1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
  13. +1 -3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java

+ 3 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -414,6 +414,9 @@ Release 2.7.0 - UNRELEASED
 
     HDFS-7855. Separate class Packet from DFSOutputStream. (Li Bo via jing9)
 
+    HDFS-7411. Change decommission logic to throttle by blocks rather than
+    nodes in each interval. (Andrew Wang via cdouglas)
+
   OPTIMIZATIONS
 
     HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

+ 4 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -455,8 +455,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long    DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS_DEFAULT = 30000L;
   public static final String  DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY = "dfs.namenode.decommission.interval";
   public static final int     DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT = 30;
-  public static final String  DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY = "dfs.namenode.decommission.nodes.per.interval";
-  public static final int     DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_DEFAULT = 5;
+  public static final String  DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY = "dfs.namenode.decommission.blocks.per.interval";
+  public static final int     DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_DEFAULT = 500000;
+  public static final String  DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES = "dfs.namenode.decommission.max.concurrent.tracked.nodes";
+  public static final int     DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES_DEFAULT = 100;
   public static final String  DFS_NAMENODE_HANDLER_COUNT_KEY = "dfs.namenode.handler.count";
   public static final int     DFS_NAMENODE_HANDLER_COUNT_DEFAULT = 10;
   public static final String  DFS_NAMENODE_SERVICE_HANDLER_COUNT_KEY = "dfs.namenode.service.handler.count";

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java

@@ -139,7 +139,7 @@ public class HdfsConfiguration extends Configuration {
       new DeprecationDelta("dfs.federation.nameservice.id",
         DFSConfigKeys.DFS_NAMESERVICE_ID),
       new DeprecationDelta("dfs.client.file-block-storage-locations.timeout",
-        DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS)
+        DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_MS),
     });
   }
 

+ 20 - 101
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -3191,28 +3191,6 @@ public class BlockManager {
     }
     return live;
   }
-
-  private void logBlockReplicationInfo(Block block, DatanodeDescriptor srcNode,
-      NumberReplicas num) {
-    int curReplicas = num.liveReplicas();
-    int curExpectedReplicas = getReplication(block);
-    BlockCollection bc = blocksMap.getBlockCollection(block);
-    StringBuilder nodeList = new StringBuilder();
-    for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
-      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
-      nodeList.append(node);
-      nodeList.append(" ");
-    }
-    LOG.info("Block: " + block + ", Expected Replicas: "
-        + curExpectedReplicas + ", live replicas: " + curReplicas
-        + ", corrupt replicas: " + num.corruptReplicas()
-        + ", decommissioned replicas: " + num.decommissionedReplicas()
-        + ", excess replicas: " + num.excessReplicas()
-        + ", Is Open File: " + bc.isUnderConstruction()
-        + ", Datanodes having this block: " + nodeList + ", Current Datanode: "
-        + srcNode + ", Is current datanode decommissioning: "
-        + srcNode.isDecommissionInProgress());
-  }
   
   /**
    * On stopping decommission, check if the node has excess replicas.
@@ -3243,89 +3221,30 @@ public class BlockManager {
   }
 
   /**
-   * Return true if there are any blocks on this node that have not
-   * yet reached their replication factor. Otherwise returns false.
+   * Returns whether a node can be safely decommissioned based on its 
+   * liveness. Dead nodes cannot always be safely decommissioned.
    */
-  boolean isReplicationInProgress(DatanodeDescriptor srcNode) {
-    boolean status = false;
-    boolean firstReplicationLog = true;
-    int underReplicatedBlocks = 0;
-    int decommissionOnlyReplicas = 0;
-    int underReplicatedInOpenFiles = 0;
-    final Iterator<? extends Block> it = srcNode.getBlockIterator();
-    while(it.hasNext()) {
-      final Block block = it.next();
-      BlockCollection bc = blocksMap.getBlockCollection(block);
-
-      if (bc != null) {
-        NumberReplicas num = countNodes(block);
-        int curReplicas = num.liveReplicas();
-        int curExpectedReplicas = getReplication(block);
-                
-        if (isNeededReplication(block, curExpectedReplicas, curReplicas)) {
-          if (curExpectedReplicas > curReplicas) {
-            if (bc.isUnderConstruction()) {
-              if (block.equals(bc.getLastBlock()) && curReplicas > minReplication) {
-                continue;
-              }
-              underReplicatedInOpenFiles++;
-            }
-            
-            // Log info about one block for this node which needs replication
-            if (!status) {
-              status = true;
-              if (firstReplicationLog) {
-                logBlockReplicationInfo(block, srcNode, num);
-              }
-              // Allowing decommission as long as default replication is met
-              if (curReplicas >= defaultReplication) {
-                status = false;
-                firstReplicationLog = false;
-              }
-            }
-            underReplicatedBlocks++;
-            if ((curReplicas == 0) && (num.decommissionedReplicas() > 0)) {
-              decommissionOnlyReplicas++;
-            }
-          }
-          if (!neededReplications.contains(block) &&
-            pendingReplications.getNumReplicas(block) == 0 &&
-            namesystem.isPopulatingReplQueues()) {
-            //
-            // These blocks have been reported from the datanode
-            // after the startDecommission method has been executed. These
-            // blocks were in flight when the decommissioning was started.
-            // Process these blocks only when active NN is out of safe mode.
-            //
-            neededReplications.add(block,
-                                   curReplicas,
-                                   num.decommissionedReplicas(),
-                                   curExpectedReplicas);
-          }
-        }
-      }
+  boolean isNodeHealthyForDecommission(DatanodeDescriptor node) {
+    if (node.isAlive) {
+      return true;
     }
 
-    if (!status && !srcNode.isAlive) {
-      updateState();
-      if (pendingReplicationBlocksCount == 0 &&
-          underReplicatedBlocksCount == 0) {
-        LOG.info("srcNode {} is dead and there are no under-replicated" +
-            " blocks or blocks pending replication. Marking as " +
-            "decommissioned.");
-      } else {
-        LOG.warn("srcNode " + srcNode + " is dead " +
-            "while decommission is in progress. Continuing to mark " +
-            "it as decommission in progress so when it rejoins the " +
-            "cluster it can continue the decommission process.");
-        status = true;
-      }
+    updateState();
+    if (pendingReplicationBlocksCount == 0 &&
+        underReplicatedBlocksCount == 0) {
+      LOG.info("Node {} is dead and there are no under-replicated" +
+          " blocks or blocks pending replication. Safe to decommission.", 
+          node);
+      return true;
     }
 
-    srcNode.decommissioningStatus.set(underReplicatedBlocks,
-        decommissionOnlyReplicas, 
-        underReplicatedInOpenFiles);
-    return status;
+    LOG.warn("Node {} is dead " +
+        "while decommission is in progress. Cannot be safely " +
+        "decommissioned since there is risk of reduced " +
+        "data durability or data loss. Either restart the failed node or" +
+        " force decommissioning by removing, calling refreshNodes, " +
+        "then re-adding to the excludes files.", node);
+    return false;
   }
 
   public int getActiveBlockCount() {
@@ -3496,7 +3415,7 @@ public class BlockManager {
    * A block needs replication if the number of replicas is less than expected
    * or if it does not have enough racks.
    */
-  private boolean isNeededReplication(Block b, int expected, int current) {
+  boolean isNeededReplication(Block b, int expected, int current) {
     return current < expected || !blockHasEnoughRacks(b);
   }
   

+ 30 - 79
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -42,7 +42,6 @@ import org.apache.hadoop.hdfs.util.CyclicIteration;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.*;
 import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
-import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
 
@@ -53,8 +52,6 @@ import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.util.*;
 
-import static org.apache.hadoop.util.Time.now;
-
 /**
  * Manage datanodes, include decommission and other activities.
  */
@@ -65,9 +62,9 @@ public class DatanodeManager {
 
   private final Namesystem namesystem;
   private final BlockManager blockManager;
+  private final DecommissionManager decomManager;
   private final HeartbeatManager heartbeatManager;
   private final FSClusterStats fsClusterStats;
-  private Daemon decommissionthread = null;
 
   /**
    * Stores the datanode -> block map.  
@@ -110,7 +107,7 @@ public class DatanodeManager {
   private final HostFileManager hostFileManager = new HostFileManager();
 
   /** The period to wait for datanode heartbeat.*/
-  private final long heartbeatExpireInterval;
+  private long heartbeatExpireInterval;
   /** Ask Datanode only up to this many blocks to delete. */
   final int blockInvalidateLimit;
 
@@ -182,6 +179,8 @@ public class DatanodeManager {
     this.blockManager = blockManager;
     
     this.heartbeatManager = new HeartbeatManager(namesystem, blockManager, conf);
+    this.decomManager = new DecommissionManager(namesystem, blockManager,
+        heartbeatManager);
     this.fsClusterStats = newFSClusterStats();
 
     networktopology = NetworkTopology.getInstance(conf);
@@ -307,25 +306,12 @@ public class DatanodeManager {
   }
   
   void activate(final Configuration conf) {
-    final DecommissionManager dm = new DecommissionManager(namesystem, blockManager);
-    this.decommissionthread = new Daemon(dm.new Monitor(
-        conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 
-                    DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT),
-        conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY, 
-                    DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_DEFAULT)));
-    decommissionthread.start();
-
+    decomManager.activate(conf);
     heartbeatManager.activate(conf);
   }
 
   void close() {
-    if (decommissionthread != null) {
-      decommissionthread.interrupt();
-      try {
-        decommissionthread.join(3000);
-      } catch (InterruptedException e) {
-      }
-    }
+    decomManager.close();
     heartbeatManager.close();
   }
 
@@ -339,6 +325,20 @@ public class DatanodeManager {
     return heartbeatManager;
   }
 
+  @VisibleForTesting
+  public DecommissionManager getDecomManager() {
+    return decomManager;
+  }
+
+  HostFileManager getHostFileManager() {
+    return hostFileManager;
+  }
+
+  @VisibleForTesting
+  public void setHeartbeatExpireInterval(long expiryMs) {
+    this.heartbeatExpireInterval = expiryMs;
+  }
+
   @VisibleForTesting
   public FSClusterStats getFSClusterStats() {
     return fsClusterStats;
@@ -825,63 +825,14 @@ public class DatanodeManager {
   }
 
   /**
-   * Decommission the node if it is in exclude list.
+   * Decommission the node if it is in the host exclude list.
+   *
+   * @param nodeReg datanode
    */
-  private void checkDecommissioning(DatanodeDescriptor nodeReg) { 
+  void startDecommissioningIfExcluded(DatanodeDescriptor nodeReg) {
     // If the registered node is in exclude list, then decommission it
-    if (hostFileManager.isExcluded(nodeReg)) {
-      startDecommission(nodeReg);
-    }
-  }
-
-  /**
-   * Change, if appropriate, the admin state of a datanode to 
-   * decommission completed. Return true if decommission is complete.
-   */
-  boolean checkDecommissionState(DatanodeDescriptor node) {
-    // Check to see if all blocks in this decommissioned
-    // node has reached their target replication factor.
-    if (node.isDecommissionInProgress() && node.checkBlockReportReceived()) {
-      if (!blockManager.isReplicationInProgress(node)) {
-        node.setDecommissioned();
-        LOG.info("Decommission complete for " + node);
-      }
-    }
-    return node.isDecommissioned();
-  }
-
-  /** Start decommissioning the specified datanode. */
-  @InterfaceAudience.Private
-  @VisibleForTesting
-  public void startDecommission(DatanodeDescriptor node) {
-    if (!node.isDecommissionInProgress()) {
-      if (!node.isAlive) {
-        LOG.info("Dead node " + node + " is decommissioned immediately.");
-        node.setDecommissioned();
-      } else if (!node.isDecommissioned()) {
-        for (DatanodeStorageInfo storage : node.getStorageInfos()) {
-          LOG.info("Start Decommissioning " + node + " " + storage
-              + " with " + storage.numBlocks() + " blocks");
-        }
-        heartbeatManager.startDecommission(node);
-        node.decommissioningStatus.setStartTime(now());
-
-        // all the blocks that reside on this node have to be replicated.
-        checkDecommissionState(node);
-      }
-    }
-  }
-
-  /** Stop decommissioning the specified datanodes. */
-  void stopDecommission(DatanodeDescriptor node) {
-    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
-      LOG.info("Stop Decommissioning " + node);
-      heartbeatManager.stopDecommission(node);
-      // Over-replicated blocks will be detected and processed when 
-      // the dead node comes back and send in its full block report.
-      if (node.isAlive) {
-        blockManager.processOverReplicatedBlocksOnReCommission(node);
-      }
+    if (getHostFileManager().isExcluded(nodeReg)) {
+      decomManager.startDecommission(nodeReg);
     }
   }
 
@@ -992,7 +943,7 @@ public class DatanodeManager {
           // also treat the registration message as a heartbeat
           heartbeatManager.register(nodeS);
           incrementVersionCount(nodeS.getSoftwareVersion());
-          checkDecommissioning(nodeS);
+          startDecommissioningIfExcluded(nodeS);
           success = true;
         } finally {
           if (!success) {
@@ -1028,7 +979,7 @@ public class DatanodeManager {
         // because its is done when the descriptor is created
         heartbeatManager.addDatanode(nodeDescr);
         incrementVersionCount(nodeReg.getSoftwareVersion());
-        checkDecommissioning(nodeDescr);
+        startDecommissioningIfExcluded(nodeDescr);
         success = true;
       } finally {
         if (!success) {
@@ -1091,9 +1042,9 @@ public class DatanodeManager {
         node.setDisallowed(true); // case 2.
       } else {
         if (hostFileManager.isExcluded(node)) {
-          startDecommission(node); // case 3.
+          decomManager.startDecommission(node); // case 3.
         } else {
-          stopDecommission(node); // case 4.
+          decomManager.stopDecommission(node); // case 4.
         }
       }
     }

+ 568 - 51
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java

@@ -17,88 +17,605 @@
  */
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
+import java.util.AbstractList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
+import java.util.Queue;
+import java.util.TreeMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.util.CyclicIteration;
+import org.apache.hadoop.util.ChunkedArrayList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.apache.hadoop.util.Time.now;
 
 /**
- * Manage node decommissioning.
+ * Manages datanode decommissioning. A background monitor thread 
+ * periodically checks the status of datanodes that are in-progress of 
+ * decommissioning.
+ * <p/>
+ * A datanode can be decommissioned in a few situations:
+ * <ul>
+ * <li>If a DN is dead, it is decommissioned immediately.</li>
+ * <li>If a DN is alive, it is decommissioned after all of its blocks 
+ * are sufficiently replicated. Merely under-replicated blocks do not 
+ * block decommissioning as long as they are above a replication 
+ * threshold.</li>
+ * </ul>
+ * In the second case, the datanode transitions to a 
+ * decommission-in-progress state and is tracked by the monitor thread. The 
+ * monitor periodically scans through the list of insufficiently replicated
+ * blocks on these datanodes to 
+ * determine if they can be decommissioned. The monitor also prunes this list 
+ * as blocks become replicated, so monitor scans will become more efficient 
+ * over time.
+ * <p/>
+ * Decommission-in-progress nodes that become dead do not progress to 
+ * decommissioned until they become live again. This prevents potential 
+ * durability loss for singly-replicated blocks (see HDFS-6791).
+ * <p/>
+ * This class depends on the FSNamesystem lock for synchronization.
  */
 @InterfaceAudience.Private
-@InterfaceStability.Evolving
-class DecommissionManager {
-  static final Log LOG = LogFactory.getLog(DecommissionManager.class);
+public class DecommissionManager {
+  private static final Logger LOG = LoggerFactory.getLogger(DecommissionManager
+      .class);
 
   private final Namesystem namesystem;
-  private final BlockManager blockmanager;
+  private final BlockManager blockManager;
+  private final HeartbeatManager hbManager;
+  private final ScheduledExecutorService executor;
+
+  /**
+   * Map containing the decommission-in-progress datanodes that are being
+   * tracked so they can be be marked as decommissioned.
+   * <p/>
+   * This holds a set of references to the under-replicated blocks on the DN at
+   * the time the DN is added to the map, i.e. the blocks that are preventing
+   * the node from being marked as decommissioned. During a monitor tick, this
+   * list is pruned as blocks become replicated.
+   * <p/>
+   * Note also that the reference to the list of under-replicated blocks 
+   * will be null on initial add
+   * <p/>
+   * However, this map can become out-of-date since it is not updated by block
+   * reports or other events. Before finally being marked as decommissioned,
+   * another check is done with the actual block map.
+   */
+  private final TreeMap<DatanodeDescriptor, AbstractList<BlockInfoContiguous>>
+      decomNodeBlocks;
+
+  /**
+   * Tracking a node in decomNodeBlocks consumes additional memory. To limit
+   * the impact on NN memory consumption, we limit the number of nodes in 
+   * decomNodeBlocks. Additional nodes wait in pendingNodes.
+   */
+  private final Queue<DatanodeDescriptor> pendingNodes;
+
+  private Monitor monitor = null;
 
   DecommissionManager(final Namesystem namesystem,
-      final BlockManager blockmanager) {
+      final BlockManager blockManager, final HeartbeatManager hbManager) {
     this.namesystem = namesystem;
-    this.blockmanager = blockmanager;
+    this.blockManager = blockManager;
+    this.hbManager = hbManager;
+
+    executor = Executors.newScheduledThreadPool(1,
+        new ThreadFactoryBuilder().setNameFormat("DecommissionMonitor-%d")
+            .setDaemon(true).build());
+    decomNodeBlocks = new TreeMap<>();
+    pendingNodes = new LinkedList<>();
+  }
+
+  /**
+   * Start the decommission monitor thread.
+   * @param conf
+   */
+  void activate(Configuration conf) {
+    final int intervalSecs =
+        conf.getInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
+            DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT);
+    checkArgument(intervalSecs >= 0, "Cannot set a negative " +
+        "value for " + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY);
+
+    // By default, the new configuration key overrides the deprecated one.
+    // No # node limit is set.
+    int blocksPerInterval = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY,
+        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_DEFAULT);
+    int nodesPerInterval = Integer.MAX_VALUE;
+
+    // If the expected key isn't present and the deprecated one is,
+    // use the deprecated value for the new key. This overrides the
+    // default.
+    //
+    // Also print a deprecation warning.
+    final String deprecatedKey =
+        "dfs.namenode.decommission.nodes.per.interval";
+    final String strNodes = conf.get(deprecatedKey);
+    if (strNodes != null) {
+      nodesPerInterval = Integer.parseInt(strNodes);
+      blocksPerInterval = Integer.MAX_VALUE;
+      LOG.warn("Using deprecated configuration key {} value of {}.",
+          deprecatedKey, nodesPerInterval); 
+      LOG.warn("Please update your configuration to use {} instead.", 
+          DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY);
+    }
+    checkArgument(blocksPerInterval > 0,
+        "Must set a positive value for "
+        + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY);
+
+    final int maxConcurrentTrackedNodes = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
+        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES_DEFAULT);
+    checkArgument(maxConcurrentTrackedNodes >= 0, "Cannot set a negative " +
+        "value for "
+        + DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES);
+
+    monitor = new Monitor(blocksPerInterval, 
+        nodesPerInterval, maxConcurrentTrackedNodes);
+    executor.scheduleAtFixedRate(monitor, intervalSecs, intervalSecs,
+        TimeUnit.SECONDS);
+
+    LOG.debug("Activating DecommissionManager with interval {} seconds, " +
+            "{} max blocks per interval, {} max nodes per interval, " +
+            "{} max concurrently tracked nodes.", intervalSecs,
+        blocksPerInterval, nodesPerInterval, maxConcurrentTrackedNodes);
+  }
+
+  /**
+   * Stop the decommission monitor thread, waiting briefly for it to terminate.
+   */
+  void close() {
+    executor.shutdownNow();
+    try {
+      executor.awaitTermination(3000, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {}
+  }
+
+  /**
+   * Start decommissioning the specified datanode. 
+   * @param node
+   */
+  @VisibleForTesting
+  public void startDecommission(DatanodeDescriptor node) {
+    if (!node.isDecommissionInProgress()) {
+      if (!node.isAlive) {
+        LOG.info("Dead node {} is decommissioned immediately.", node);
+        node.setDecommissioned();
+      } else if (!node.isDecommissioned()) {
+        for (DatanodeStorageInfo storage : node.getStorageInfos()) {
+          LOG.info("Starting decommission of {} {} with {} blocks", 
+              node, storage, storage.numBlocks());
+        }
+        // Update DN stats maintained by HeartbeatManager
+        hbManager.startDecommission(node);
+        node.decommissioningStatus.setStartTime(now());
+        pendingNodes.add(node);
+      }
+    } else {
+      LOG.trace("startDecommission: Node {} is already decommission in "
+              + "progress, nothing to do.", node);
+    }
+  }
+
+  /**
+   * Stop decommissioning the specified datanode. 
+   * @param node
+   */
+  void stopDecommission(DatanodeDescriptor node) {
+    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
+      LOG.info("Stopping decommissioning of node {}", node);
+      // Update DN stats maintained by HeartbeatManager
+      hbManager.stopDecommission(node);
+      // Over-replicated blocks will be detected and processed when 
+      // the dead node comes back and send in its full block report.
+      if (node.isAlive) {
+        blockManager.processOverReplicatedBlocksOnReCommission(node);
+      }
+      // Remove from tracking in DecommissionManager
+      pendingNodes.remove(node);
+      decomNodeBlocks.remove(node);
+    } else {
+      LOG.trace("stopDecommission: Node {} is not decommission in progress " +
+          "or decommissioned, nothing to do.", node);
+    }
+  }
+
+  private void setDecommissioned(DatanodeDescriptor dn) {
+    dn.setDecommissioned();
+    LOG.info("Decommissioning complete for node {}", dn);
   }
 
-  /** Periodically check decommission status. */
-  class Monitor implements Runnable {
-    /** recheckInterval is how often namenode checks
-     *  if a node has finished decommission
+  /**
+   * Checks whether a block is sufficiently replicated for decommissioning.
+   * Full-strength replication is not always necessary, hence "sufficient".
+   * @return true if sufficient, else false.
+   */
+  private boolean isSufficientlyReplicated(BlockInfoContiguous block, 
+      BlockCollection bc,
+      NumberReplicas numberReplicas) {
+    final int numExpected = bc.getBlockReplication();
+    final int numLive = numberReplicas.liveReplicas();
+    if (!blockManager.isNeededReplication(block, numExpected, numLive)) {
+      // Block doesn't need replication. Skip.
+      LOG.trace("Block {} does not need replication.", block);
+      return true;
+    }
+
+    // Block is under-replicated
+    LOG.trace("Block {} numExpected={}, numLive={}", block, numExpected, 
+        numLive);
+    if (numExpected > numLive) {
+      if (bc.isUnderConstruction() && block.equals(bc.getLastBlock())) {
+        // Can decom a UC block as long as there will still be minReplicas
+        if (numLive >= blockManager.minReplication) {
+          LOG.trace("UC block {} sufficiently-replicated since numLive ({}) "
+              + ">= minR ({})", block, numLive, blockManager.minReplication);
+          return true;
+        } else {
+          LOG.trace("UC block {} insufficiently-replicated since numLive "
+              + "({}) < minR ({})", block, numLive,
+              blockManager.minReplication);
+        }
+      } else {
+        // Can decom a non-UC as long as the default replication is met
+        if (numLive >= blockManager.defaultReplication) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  private static void logBlockReplicationInfo(Block block, BlockCollection bc,
+      DatanodeDescriptor srcNode, NumberReplicas num,
+      Iterable<DatanodeStorageInfo> storages) {
+    int curReplicas = num.liveReplicas();
+    int curExpectedReplicas = bc.getBlockReplication();
+    StringBuilder nodeList = new StringBuilder();
+    for (DatanodeStorageInfo storage : storages) {
+      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
+      nodeList.append(node);
+      nodeList.append(" ");
+    }
+    LOG.info("Block: " + block + ", Expected Replicas: "
+        + curExpectedReplicas + ", live replicas: " + curReplicas
+        + ", corrupt replicas: " + num.corruptReplicas()
+        + ", decommissioned replicas: " + num.decommissionedReplicas()
+        + ", excess replicas: " + num.excessReplicas()
+        + ", Is Open File: " + bc.isUnderConstruction()
+        + ", Datanodes having this block: " + nodeList + ", Current Datanode: "
+        + srcNode + ", Is current datanode decommissioning: "
+        + srcNode.isDecommissionInProgress());
+  }
+
+  @VisibleForTesting
+  public int getNumPendingNodes() {
+    return pendingNodes.size();
+  }
+
+  @VisibleForTesting
+  public int getNumTrackedNodes() {
+    return decomNodeBlocks.size();
+  }
+
+  @VisibleForTesting
+  public int getNumNodesChecked() {
+    return monitor.numNodesChecked;
+  }
+
+  /**
+   * Checks to see if DNs have finished decommissioning.
+   * <p/>
+   * Since this is done while holding the namesystem lock, 
+   * the amount of work per monitor tick is limited.
+   */
+  private class Monitor implements Runnable {
+    /**
+     * The maximum number of blocks to check per tick.
+     */
+    private final int numBlocksPerCheck;
+    /**
+     * The maximum number of nodes to check per tick.
      */
-    private final long recheckInterval;
-    /** The number of decommission nodes to check for each interval */
     private final int numNodesPerCheck;
-    /** firstkey can be initialized to anything. */
-    private String firstkey = "";
+    /**
+     * The maximum number of nodes to track in decomNodeBlocks. A value of 0
+     * means no limit.
+     */
+    private final int maxConcurrentTrackedNodes;
+    /**
+     * The number of blocks that have been checked on this tick.
+     */
+    private int numBlocksChecked = 0;
+    /**
+     * The number of nodes that have been checked on this tick. Used for 
+     * testing.
+     */
+    private int numNodesChecked = 0;
+    /**
+     * The last datanode in decomNodeBlocks that we've processed
+     */
+    private DatanodeDescriptor iterkey = new DatanodeDescriptor(new 
+        DatanodeID("", "", "", 0, 0, 0, 0));
 
-    Monitor(int recheckIntervalInSecond, int numNodesPerCheck) {
-      this.recheckInterval = recheckIntervalInSecond * 1000L;
+    Monitor(int numBlocksPerCheck, int numNodesPerCheck, int 
+        maxConcurrentTrackedNodes) {
+      this.numBlocksPerCheck = numBlocksPerCheck;
       this.numNodesPerCheck = numNodesPerCheck;
+      this.maxConcurrentTrackedNodes = maxConcurrentTrackedNodes;
+    }
+
+    private boolean exceededNumBlocksPerCheck() {
+      LOG.trace("Processed {} blocks so far this tick", numBlocksChecked);
+      return numBlocksChecked >= numBlocksPerCheck;
+    }
+
+    @Deprecated
+    private boolean exceededNumNodesPerCheck() {
+      LOG.trace("Processed {} nodes so far this tick", numNodesChecked);
+      return numNodesChecked >= numNodesPerCheck;
     }
 
-    /**
-     * Check decommission status of numNodesPerCheck nodes
-     * for every recheckInterval milliseconds.
-     */
     @Override
     public void run() {
-      for(; namesystem.isRunning(); ) {
-        namesystem.writeLock();
-        try {
-          check();
-        } finally {
-          namesystem.writeUnlock();
+      if (!namesystem.isRunning()) {
+        LOG.info("Namesystem is not running, skipping decommissioning checks"
+            + ".");
+        return;
+      }
+      // Reset the checked count at beginning of each iteration
+      numBlocksChecked = 0;
+      numNodesChecked = 0;
+      // Check decom progress
+      namesystem.writeLock();
+      try {
+        processPendingNodes();
+        check();
+      } finally {
+        namesystem.writeUnlock();
+      }
+      if (numBlocksChecked + numNodesChecked > 0) {
+        LOG.info("Checked {} blocks and {} nodes this tick", numBlocksChecked,
+            numNodesChecked);
+      }
+    }
+
+    /**
+     * Pop datanodes off the pending list and into decomNodeBlocks, 
+     * subject to the maxConcurrentTrackedNodes limit.
+     */
+    private void processPendingNodes() {
+      while (!pendingNodes.isEmpty() &&
+          (maxConcurrentTrackedNodes == 0 ||
+           decomNodeBlocks.size() < maxConcurrentTrackedNodes)) {
+        decomNodeBlocks.put(pendingNodes.poll(), null);
+      }
+    }
+
+    private void check() {
+      final Iterator<Map.Entry<DatanodeDescriptor, AbstractList<BlockInfoContiguous>>>
+          it = new CyclicIteration<>(decomNodeBlocks, iterkey).iterator();
+      final LinkedList<DatanodeDescriptor> toRemove = new LinkedList<>();
+
+      while (it.hasNext()
+          && !exceededNumBlocksPerCheck()
+          && !exceededNumNodesPerCheck()) {
+        numNodesChecked++;
+        final Map.Entry<DatanodeDescriptor, AbstractList<BlockInfoContiguous>>
+            entry = it.next();
+        final DatanodeDescriptor dn = entry.getKey();
+        AbstractList<BlockInfoContiguous> blocks = entry.getValue();
+        boolean fullScan = false;
+        if (blocks == null) {
+          // This is a newly added datanode, run through its list to schedule 
+          // under-replicated blocks for replication and collect the blocks 
+          // that are insufficiently replicated for further tracking
+          LOG.debug("Newly-added node {}, doing full scan to find " +
+              "insufficiently-replicated blocks.", dn);
+          blocks = handleInsufficientlyReplicated(dn);
+          decomNodeBlocks.put(dn, blocks);
+          fullScan = true;
+        } else {
+          // This is a known datanode, check if its # of insufficiently 
+          // replicated blocks has dropped to zero and if it can be decommed
+          LOG.debug("Processing decommission-in-progress node {}", dn);
+          pruneSufficientlyReplicated(dn, blocks);
         }
-  
-        try {
-          Thread.sleep(recheckInterval);
-        } catch (InterruptedException ie) {
-          LOG.warn(this.getClass().getSimpleName() + " interrupted: " + ie);
+        if (blocks.size() == 0) {
+          if (!fullScan) {
+            // If we didn't just do a full scan, need to re-check with the 
+            // full block map.
+            //
+            // We've replicated all the known insufficiently replicated 
+            // blocks. Re-check with the full block map before finally 
+            // marking the datanode as decommissioned 
+            LOG.debug("Node {} has finished replicating current set of "
+                + "blocks, checking with the full block map.", dn);
+            blocks = handleInsufficientlyReplicated(dn);
+            decomNodeBlocks.put(dn, blocks);
+          }
+          // If the full scan is clean AND the node liveness is okay, 
+          // we can finally mark as decommissioned.
+          final boolean isHealthy =
+              blockManager.isNodeHealthyForDecommission(dn);
+          if (blocks.size() == 0 && isHealthy) {
+            setDecommissioned(dn);
+            toRemove.add(dn);
+            LOG.debug("Node {} is sufficiently replicated and healthy, "
+                + "marked as decommissioned.", dn);
+          } else {
+            if (LOG.isDebugEnabled()) {
+              StringBuilder b = new StringBuilder("Node {} ");
+              if (isHealthy) {
+                b.append("is ");
+              } else {
+                b.append("isn't ");
+              }
+              b.append("healthy and still needs to replicate {} more blocks," +
+                  " decommissioning is still in progress.");
+              LOG.debug(b.toString(), dn, blocks.size());
+            }
+          }
+        } else {
+          LOG.debug("Node {} still has {} blocks to replicate "
+                  + "before it is a candidate to finish decommissioning.",
+              dn, blocks.size());
         }
+        iterkey = dn;
+      }
+      // Remove the datanodes that are decommissioned
+      for (DatanodeDescriptor dn : toRemove) {
+        Preconditions.checkState(dn.isDecommissioned(),
+            "Removing a node that is not yet decommissioned!");
+        decomNodeBlocks.remove(dn);
       }
     }
-    
-    private void check() {
-      final DatanodeManager dm = blockmanager.getDatanodeManager();
-      int count = 0;
-      for(Map.Entry<String, DatanodeDescriptor> entry
-          : dm.getDatanodeCyclicIteration(firstkey)) {
-        final DatanodeDescriptor d = entry.getValue();
-        firstkey = entry.getKey();
-
-        if (d.isDecommissionInProgress()) {
-          try {
-            dm.checkDecommissionState(d);
-          } catch(Exception e) {
-            LOG.warn("entry=" + entry, e);
+
+    /**
+     * Removes sufficiently replicated blocks from the block list of a 
+     * datanode.
+     */
+    private void pruneSufficientlyReplicated(final DatanodeDescriptor datanode,
+        AbstractList<BlockInfoContiguous> blocks) {
+      processBlocksForDecomInternal(datanode, blocks.iterator(), null, true);
+    }
+
+    /**
+     * Returns a list of blocks on a datanode that are insufficiently 
+     * replicated, i.e. are under-replicated enough to prevent decommission.
+     * <p/>
+     * As part of this, it also schedules replication work for 
+     * any under-replicated blocks.
+     *
+     * @param datanode
+     * @return List of insufficiently replicated blocks 
+     */
+    private AbstractList<BlockInfoContiguous> handleInsufficientlyReplicated(
+        final DatanodeDescriptor datanode) {
+      AbstractList<BlockInfoContiguous> insufficient = new ChunkedArrayList<>();
+      processBlocksForDecomInternal(datanode, datanode.getBlockIterator(),
+          insufficient, false);
+      return insufficient;
+    }
+
+    /**
+     * Used while checking if decommission-in-progress datanodes can be marked
+     * as decommissioned. Combines shared logic of 
+     * pruneSufficientlyReplicated and handleInsufficientlyReplicated.
+     *
+     * @param datanode                    Datanode
+     * @param it                          Iterator over the blocks on the
+     *                                    datanode
+     * @param insufficientlyReplicated    Return parameter. If it's not null,
+     *                                    will contain the insufficiently
+     *                                    replicated-blocks from the list.
+     * @param pruneSufficientlyReplicated whether to remove sufficiently
+     *                                    replicated blocks from the iterator
+     * @return true if there are under-replicated blocks in the provided block
+     * iterator, else false.
+     */
+    private void processBlocksForDecomInternal(
+        final DatanodeDescriptor datanode,
+        final Iterator<BlockInfoContiguous> it,
+        final List<BlockInfoContiguous> insufficientlyReplicated,
+        boolean pruneSufficientlyReplicated) {
+      boolean firstReplicationLog = true;
+      int underReplicatedBlocks = 0;
+      int decommissionOnlyReplicas = 0;
+      int underReplicatedInOpenFiles = 0;
+      while (it.hasNext()) {
+        numBlocksChecked++;
+        final BlockInfoContiguous block = it.next();
+        // Remove the block from the list if it's no longer in the block map,
+        // e.g. the containing file has been deleted
+        if (blockManager.blocksMap.getStoredBlock(block) == null) {
+          LOG.trace("Removing unknown block {}", block);
+          it.remove();
+          continue;
+        }
+        BlockCollection bc = blockManager.blocksMap.getBlockCollection(block);
+        if (bc == null) {
+          // Orphan block, will be invalidated eventually. Skip.
+          continue;
+        }
+
+        final NumberReplicas num = blockManager.countNodes(block);
+        final int liveReplicas = num.liveReplicas();
+        final int curReplicas = liveReplicas;
+
+        // Schedule under-replicated blocks for replication if not already
+        // pending
+        if (blockManager.isNeededReplication(block, bc.getBlockReplication(),
+            liveReplicas)) {
+          if (!blockManager.neededReplications.contains(block) &&
+              blockManager.pendingReplications.getNumReplicas(block) == 0 &&
+              namesystem.isPopulatingReplQueues()) {
+            // Process these blocks only when active NN is out of safe mode.
+            blockManager.neededReplications.add(block,
+                curReplicas,
+                num.decommissionedReplicas(),
+                bc.getBlockReplication());
           }
-          if (++count == numNodesPerCheck) {
-            return;
+        }
+
+        // Even if the block is under-replicated, 
+        // it doesn't block decommission if it's sufficiently replicated 
+        if (isSufficientlyReplicated(block, bc, num)) {
+          if (pruneSufficientlyReplicated) {
+            it.remove();
           }
+          continue;
+        }
+
+        // We've found an insufficiently replicated block.
+        if (insufficientlyReplicated != null) {
+          insufficientlyReplicated.add(block);
+        }
+        // Log if this is our first time through
+        if (firstReplicationLog) {
+          logBlockReplicationInfo(block, bc, datanode, num,
+              blockManager.blocksMap.getStorages(block));
+          firstReplicationLog = false;
+        }
+        // Update various counts
+        underReplicatedBlocks++;
+        if (bc.isUnderConstruction()) {
+          underReplicatedInOpenFiles++;
+        }
+        if ((curReplicas == 0) && (num.decommissionedReplicas() > 0)) {
+          decommissionOnlyReplicas++;
         }
       }
+
+      datanode.decommissioningStatus.set(underReplicatedBlocks,
+          decommissionOnlyReplicas,
+          underReplicatedInOpenFiles);
     }
   }
+
+  @VisibleForTesting
+  void runMonitor() throws ExecutionException, InterruptedException {
+    Future f = executor.submit(monitor);
+    f.get();
+  }
 }

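The Monitor above is the heart of the change: each tick processes at most a configured number of blocks (rather than a fixed number of nodes) and resumes from where the previous tick stopped, via CyclicIteration over decomNodeBlocks. The following stand-alone sketch models that scheduling pattern outside of HDFS; the class and variable names (ThrottledScanDemo, blocksPerTick) are illustrative assumptions, not part of this patch.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Minimal model of throttling a periodic scan by blocks per tick. */
public class ThrottledScanDemo {
  public static void main(String[] args) throws Exception {
    final int blocksPerTick = 5;  // stands in for dfs.namenode.decommission.blocks.per.interval
    final List<String> blocks = new ArrayList<>();
    for (int i = 0; i < 12; i++) {
      blocks.add("blk_" + i);
    }
    // The cursor survives across ticks, so each tick resumes where the last
    // one stopped, similar in spirit to iterkey/CyclicIteration above.
    final int[] cursor = {0};
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    executor.scheduleAtFixedRate(() -> {
      int checked = 0;
      while (checked < blocksPerTick) {
        System.out.println("checked " + blocks.get(cursor[0]));
        cursor[0] = (cursor[0] + 1) % blocks.size();
        checked++;
      }
      System.out.println("tick complete: " + checked + " blocks checked");
    }, 0, 1, TimeUnit.SECONDS);
    Thread.sleep(3500);
    executor.shutdownNow();
  }
}

The real Monitor additionally caps the number of tracked nodes (maxConcurrentTrackedNodes), queuing the rest in pendingNodes to bound NameNode memory use.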
+ 19 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -736,10 +736,25 @@
 </property>
 
 <property>
-  <name>dfs.namenode.decommission.nodes.per.interval</name>
-  <value>5</value>
-  <description>The number of nodes namenode checks if decommission is complete
-  in each dfs.namenode.decommission.interval.</description>
+  <name>dfs.namenode.decommission.blocks.per.interval</name>
+  <value>500000</value>
+  <description>The approximate number of blocks to process per 
+      decommission interval, as defined in dfs.namenode.decommission.interval.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.decommission.max.concurrent.tracked.nodes</name>
+  <value>100</value>
+  <description>
+    The maximum number of decommission-in-progress datanodes that will be
+    tracked at one time by the namenode. Tracking a decommission-in-progress
+    datanode consumes additional NN memory proportional to the number of blocks
+    on the datanode. Having a conservative limit reduces the potential impact
+    of decommissioning a large number of nodes at once.
+      
+    A value of 0 means no limit will be enforced.
+  </description>
 </property>
 
 <property>

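For reference, here is a hedged sketch of tuning the new throttling knobs programmatically; DecommissionTuningExample is a hypothetical helper, the same keys can equally be set in hdfs-site.xml, and the values below are arbitrary examples rather than recommendations.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class DecommissionTuningExample {
  public static Configuration tunedConf() {
    Configuration conf = new HdfsConfiguration();
    // Check at most 100,000 blocks per decommission interval (default: 500,000).
    conf.setInt(
        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY, 100000);
    // Track at most 25 decommission-in-progress datanodes at once (default: 100).
    conf.setInt(
        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES, 25);
    // Monitor tick period in seconds (pre-existing key, default: 30).
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 30);
    return conf;
  }
}

Note that if the deprecated dfs.namenode.decommission.nodes.per.interval is still set, DecommissionManager#activate honors it, logs a deprecation warning, and disables the block limit for that run.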
+ 307 - 105
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.hdfs;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -26,39 +27,56 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Random;
+import java.util.concurrent.ExecutionException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import com.google.common.base.Supplier;
+import com.google.common.collect.Lists;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DecommissionManager;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.PathUtils;
+import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class tests the decommissioning of nodes.
  */
 public class TestDecommission {
-  public static final Log LOG = LogFactory.getLog(TestDecommission.class);
+  public static final Logger LOG = LoggerFactory.getLogger(TestDecommission
+      .class);
   static final long seed = 0xDEADBEEFL;
   static final int blockSize = 8192;
   static final int fileSize = 16384;
@@ -89,6 +107,7 @@ public class TestDecommission {
     conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, HEARTBEAT_INTERVAL);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, BLOCKREPORT_INTERVAL_MSEC);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 4);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, NAMENODE_REPLICATION_INTERVAL);
@@ -105,7 +124,7 @@ public class TestDecommission {
     }
   }
   
-  private void writeConfigFile(Path name, ArrayList<String> nodes) 
+  private void writeConfigFile(Path name, List<String> nodes) 
     throws IOException {
     // delete if it already exists
     if (localFileSys.exists(name)) {
@@ -149,7 +168,7 @@ public class TestDecommission {
    * @param downnode - if null, there is no decommissioned node for this file.
    * @return - null if no failure found, else an error message string.
    */
-  private String checkFile(FileSystem fileSys, Path name, int repl,
+  private static String checkFile(FileSystem fileSys, Path name, int repl,
     String downnode, int numDatanodes) throws IOException {
     boolean isNodeDown = (downnode != null);
     // need a raw stream
@@ -261,7 +280,7 @@ public class TestDecommission {
   /* Ask a specific NN to stop decommission of the datanode and wait for each
    * to reach the NORMAL state.
    */
-  private void recomissionNode(int nnIndex, DatanodeInfo decommissionedNode) throws IOException {
+  private void recommissionNode(int nnIndex, DatanodeInfo decommissionedNode) throws IOException {
     LOG.info("Recommissioning node: " + decommissionedNode);
     writeConfigFile(excludeFile, null);
     refreshNodes(cluster.getNamesystem(nnIndex), conf);
@@ -279,7 +298,7 @@ public class TestDecommission {
       LOG.info("Waiting for node " + node + " to change state to "
           + state + " current state: " + node.getAdminState());
       try {
-        Thread.sleep(HEARTBEAT_INTERVAL * 1000);
+        Thread.sleep(HEARTBEAT_INTERVAL * 500);
       } catch (InterruptedException e) {
         // nothing
       }
@@ -321,28 +340,27 @@ public class TestDecommission {
   }
   
   private void verifyStats(NameNode namenode, FSNamesystem fsn,
-      DatanodeInfo node, boolean decommissioning)
+      DatanodeInfo info, DataNode node, boolean decommissioning)
       throws InterruptedException, IOException {
-    // Do the stats check over 10 iterations
+    // Do the stats check over 10 heartbeats
     for (int i = 0; i < 10; i++) {
       long[] newStats = namenode.getRpcServer().getStats();
 
       // For decommissioning nodes, ensure capacity of the DN is no longer
       // counted. Only used space of the DN is counted in cluster capacity
-      assertEquals(newStats[0], decommissioning ? node.getDfsUsed() : 
-        node.getCapacity());
+      assertEquals(newStats[0],
+          decommissioning ? info.getDfsUsed() : info.getCapacity());
 
       // Ensure cluster used capacity is counted for both normal and
       // decommissioning nodes
-      assertEquals(newStats[1], node.getDfsUsed());
+      assertEquals(newStats[1], info.getDfsUsed());
 
       // For decommissioning nodes, remaining space from the DN is not counted
-      assertEquals(newStats[2], decommissioning ? 0 : node.getRemaining());
+      assertEquals(newStats[2], decommissioning ? 0 : info.getRemaining());
 
       // Ensure transceiver count is same as that DN
-      assertEquals(fsn.getTotalLoad(), node.getXceiverCount());
-      
-      Thread.sleep(HEARTBEAT_INTERVAL * 1000); // Sleep heart beat interval
+      assertEquals(fsn.getTotalLoad(), info.getXceiverCount());
+      DataNodeTestUtils.triggerHeartbeat(node);
     }
   }
 
@@ -406,14 +424,6 @@ public class TestDecommission {
     cluster.shutdown();
   }
   
-  /**
-   * Tests recommission for non federated cluster
-   */
-  @Test(timeout=360000)
-  public void testRecommission() throws IOException {
-    testRecommission(1, 6);
-  }
-
   /**
    * Test decommission for federeated cluster
    */
@@ -500,12 +510,12 @@ public class TestDecommission {
     //    1. the last DN would have been chosen as excess replica, given its
     //    heartbeat is considered old.
     //    Please refer to BlockPlacementPolicyDefault#chooseReplicaToDelete
-    //    2. After recomissionNode finishes, SBN has 3 live replicas ( 0, 1, 2 )
+    //    2. After recommissionNode finishes, SBN has 3 live replicas ( 0, 1, 2 )
     //    and one excess replica ( 3 )
     // After the fix,
-    //    After recomissionNode finishes, SBN has 4 live replicas ( 0, 1, 2, 3 )
+    //    After recommissionNode finishes, SBN has 4 live replicas ( 0, 1, 2, 3 )
     Thread.sleep(slowHeartbeatDNwaitTime);
-    recomissionNode(1, decomNodeFromSBN);
+    recommissionNode(1, decomNodeFromSBN);
 
     // Step 3.b, ask ANN to recommission the first DN.
     // To verify the fix, the test makes sure the excess replica picked by ANN
@@ -524,7 +534,7 @@ public class TestDecommission {
     cluster.restartDataNode(nextToLastDNprop);
     cluster.waitActive();
     Thread.sleep(slowHeartbeatDNwaitTime);
-    recomissionNode(0, decommissionedNodeFromANN);
+    recommissionNode(0, decommissionedNodeFromANN);
 
     // Step 3.c, make sure the DN has deleted the block and report to NNs
     cluster.triggerHeartbeats();
@@ -606,69 +616,88 @@ public class TestDecommission {
     cluster.shutdown();
   }
 
+  /**
+   * Test that over-replicated blocks are deleted on recommission.
+   */
+  @Test(timeout=120000)
+  public void testRecommission() throws Exception {
+    final int numDatanodes = 6;
+    try {
+      LOG.info("Starting test testRecommission");
 
-  private void testRecommission(int numNamenodes, int numDatanodes) 
-    throws IOException {
-    LOG.info("Starting test testRecommission");
+      startCluster(1, numDatanodes, conf);
 
-    startCluster(numNamenodes, numDatanodes, conf);
-  
-    ArrayList<ArrayList<DatanodeInfo>> namenodeDecomList = 
-      new ArrayList<ArrayList<DatanodeInfo>>(numNamenodes);
-    for(int i = 0; i < numNamenodes; i++) {
-      namenodeDecomList.add(i, new ArrayList<DatanodeInfo>(numDatanodes));
-    }
-    Path file1 = new Path("testDecommission.dat");
-    int replicas = numDatanodes - 1;
-      
-    for (int i = 0; i < numNamenodes; i++) {
-      ArrayList<DatanodeInfo> decommissionedNodes = namenodeDecomList.get(i);
-      FileSystem fileSys = cluster.getFileSystem(i);
+      final Path file1 = new Path("testDecommission.dat");
+      final int replicas = numDatanodes - 1;
+
+      ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList();
+      final FileSystem fileSys = cluster.getFileSystem();
+
+      // Write a file to n-1 datanodes
       writeFile(fileSys, file1, replicas);
-        
-      // Decommission one node. Verify that node is decommissioned.
-      DatanodeInfo decomNode = decommissionNode(i, null, decommissionedNodes,
-          AdminStates.DECOMMISSIONED);
+
+      // Decommission one of the datanodes with a replica
+      BlockLocation loc = fileSys.getFileBlockLocations(file1, 0, 1)[0];
+      assertEquals("Unexpected number of replicas from getFileBlockLocations",
+          replicas, loc.getHosts().length);
+      final String toDecomHost = loc.getNames()[0];
+      String toDecomUuid = null;
+      for (DataNode d : cluster.getDataNodes()) {
+        if (d.getDatanodeId().getXferAddr().equals(toDecomHost)) {
+          toDecomUuid = d.getDatanodeId().getDatanodeUuid();
+          break;
+        }
+      }
+      assertNotNull("Could not find a dn with the block!", toDecomUuid);
+      final DatanodeInfo decomNode =
+          decommissionNode(0, toDecomUuid, decommissionedNodes,
+              AdminStates.DECOMMISSIONED);
       decommissionedNodes.add(decomNode);
-        
+      final BlockManager blockManager =
+          cluster.getNamesystem().getBlockManager();
+      final DatanodeManager datanodeManager =
+          blockManager.getDatanodeManager();
+      BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
+
       // Ensure decommissioned datanode is not automatically shutdown
-      DFSClient client = getDfsClient(cluster.getNameNode(i), conf);
-      assertEquals("All datanodes must be alive", numDatanodes, 
+      DFSClient client = getDfsClient(cluster.getNameNode(), conf);
+      assertEquals("All datanodes must be alive", numDatanodes,
           client.datanodeReport(DatanodeReportType.LIVE).length);
-      int tries =0;
+
       // wait for the block to be replicated
-      while (tries++ < 20) {
-        try {
-          Thread.sleep(1000);
-          if (checkFile(fileSys, file1, replicas, decomNode.getXferAddr(),
-              numDatanodes) == null) {
-            break;
-          }
-        } catch (InterruptedException ie) {
-        }
-      }
-      assertTrue("Checked if block was replicated after decommission, tried "
-          + tries + " times.", tries < 20);
-
-      // stop decommission and check if the new replicas are removed
-      recomissionNode(0, decomNode);
-      // wait for the block to be deleted
-      tries = 0;
-      while (tries++ < 20) {
-        try {
-          Thread.sleep(1000);
-          if (checkFile(fileSys, file1, replicas, null, numDatanodes) == null) {
-            break;
+      final ExtendedBlock b = DFSTestUtil.getFirstBlock(fileSys, file1);
+      final String uuid = toDecomUuid;
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          BlockInfoContiguous info =
+              blockManager.getStoredBlock(b.getLocalBlock());
+          int count = 0;
+          StringBuilder sb = new StringBuilder("Replica locations: ");
+          for (int i = 0; i < info.numNodes(); i++) {
+            DatanodeDescriptor dn = info.getDatanode(i);
+            sb.append(dn + ", ");
+            if (!dn.getDatanodeUuid().equals(uuid)) {
+              count++;
+            }
           }
-        } catch (InterruptedException ie) {
+          LOG.info(sb.toString());
+          LOG.info("Count: " + count);
+          return count == replicas;
         }
-      }
+      }, 500, 30000);
+
+      // Recommission and wait for over-replication to be fixed
+      recommissionNode(0, decomNode);
+      BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
+      DFSTestUtil.waitForReplication(cluster, b, 1, replicas, 0);
+
       cleanupFile(fileSys, file1);
-      assertTrue("Checked if node was recommissioned " + tries + " times.",
-         tries < 20);
-      LOG.info("tried: " + tries + " times before recommissioned");
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
     }
-    cluster.shutdown();
   }
   
   /**
@@ -702,20 +731,35 @@ public class TestDecommission {
       
       FSNamesystem fsn = cluster.getNamesystem(i);
       NameNode namenode = cluster.getNameNode(i);
-      DatanodeInfo downnode = decommissionNode(i, null, null,
+      
+      DatanodeInfo decomInfo = decommissionNode(i, null, null,
           AdminStates.DECOMMISSION_INPROGRESS);
+      DataNode decomNode = getDataNode(decomInfo);
       // Check namenode stats for multiple datanode heartbeats
-      verifyStats(namenode, fsn, downnode, true);
+      verifyStats(namenode, fsn, decomInfo, decomNode, true);
       
       // Stop decommissioning and verify stats
       writeConfigFile(excludeFile, null);
       refreshNodes(fsn, conf);
-      DatanodeInfo ret = NameNodeAdapter.getDatanode(fsn, downnode);
-      waitNodeState(ret, AdminStates.NORMAL);
-      verifyStats(namenode, fsn, ret, false);
+      DatanodeInfo retInfo = NameNodeAdapter.getDatanode(fsn, decomInfo);
+      DataNode retNode = getDataNode(decomInfo);
+      waitNodeState(retInfo, AdminStates.NORMAL);
+      verifyStats(namenode, fsn, retInfo, retNode, false);
     }
   }
-  
+
+  private DataNode getDataNode(DatanodeInfo decomInfo) {
+    DataNode decomNode = null;
+    for (DataNode dn: cluster.getDataNodes()) {
+      if (decomInfo.equals(dn.getDatanodeId())) {
+        decomNode = dn;
+        break;
+      }
+    }
+    assertNotNull("Could not find decomNode in cluster!", decomNode);
+    return decomNode;
+  }
+
   /**
    * Test host/include file functionality. Only datanodes
    * in the include file are allowed to connect to the namenode in a non
@@ -901,9 +945,9 @@ public class TestDecommission {
    * It is not recommended to use a registration name which is not also a
    * valid DNS hostname for the DataNode.  See HDFS-5237 for background.
    */
+  @Ignore
   @Test(timeout=360000)
-  public void testIncludeByRegistrationName() throws IOException,
-      InterruptedException {
+  public void testIncludeByRegistrationName() throws Exception {
     Configuration hdfsConf = new Configuration(conf);
     // Any IPv4 address starting with 127 functions as a "loopback" address
     // which is connected to the current host.  So by choosing 127.0.0.100
@@ -926,15 +970,22 @@ public class TestDecommission {
     refreshNodes(cluster.getNamesystem(0), hdfsConf);
 
     // Wait for the DN to be marked dead.
-    DFSClient client = getDfsClient(cluster.getNameNode(0), hdfsConf);
-    while (true) {
-      DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.DEAD);
-      if (info.length == 1) {
-        break;
+    LOG.info("Waiting for DN to be marked as dead.");
+    final DFSClient client = getDfsClient(cluster.getNameNode(0), hdfsConf);
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        BlockManagerTestUtil
+            .checkHeartbeat(cluster.getNamesystem().getBlockManager());
+        try {
+          DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.DEAD);
+          return info.length == 1;
+        } catch (IOException e) {
+          LOG.warn("Failed to check dead DNs", e);
+          return false;
+        }
       }
-      LOG.info("Waiting for datanode to be marked dead");
-      Thread.sleep(HEARTBEAT_INTERVAL * 1000);
-    }
+    }, 500, 5000);
 
     // Use a non-empty include file with our registration name.
     // It should work.
@@ -944,18 +995,169 @@ public class TestDecommission {
     writeConfigFile(hostsFile,  nodes);
     refreshNodes(cluster.getNamesystem(0), hdfsConf);
     cluster.restartDataNode(0);
+    cluster.triggerHeartbeats();
 
     // Wait for the DN to come back.
-    while (true) {
-      DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.LIVE);
-      if (info.length == 1) {
-        Assert.assertFalse(info[0].isDecommissioned());
-        Assert.assertFalse(info[0].isDecommissionInProgress());
-        assertEquals(registrationName, info[0].getHostName());
-        break;
+    LOG.info("Waiting for DN to come back.");
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        BlockManagerTestUtil
+            .checkHeartbeat(cluster.getNamesystem().getBlockManager());
+        try {
+          DatanodeInfo info[] = client.datanodeReport(DatanodeReportType.LIVE);
+          if (info.length == 1) {
+            Assert.assertFalse(info[0].isDecommissioned());
+            Assert.assertFalse(info[0].isDecommissionInProgress());
+            assertEquals(registrationName, info[0].getHostName());
+            return true;
+          }
+        } catch (IOException e) {
+          LOG.warn("Failed to check dead DNs", e);
+        }
+        return false;
       }
-      LOG.info("Waiting for datanode to come back");
-      Thread.sleep(HEARTBEAT_INTERVAL * 1000);
+    }, 500, 5000);
+  }
+  
+  @Test(timeout=120000)
+  public void testBlocksPerInterval() throws Exception {
+    Configuration newConf = new Configuration(conf);
+    org.apache.log4j.Logger.getLogger(DecommissionManager.class)
+        .setLevel(Level.TRACE);
+    // Turn the blocks per interval way down
+    newConf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_BLOCKS_PER_INTERVAL_KEY,
+        3);
+    // Disable the normal monitor runs
+    newConf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
+        Integer.MAX_VALUE);
+    startCluster(1, 3, newConf);
+    final FileSystem fs = cluster.getFileSystem();
+    final DatanodeManager datanodeManager =
+        cluster.getNamesystem().getBlockManager().getDatanodeManager();
+    final DecommissionManager decomManager = datanodeManager.getDecomManager();
+
+    // Write a 3 block file, so each node has one block. Should scan 3 nodes.
+    DFSTestUtil.createFile(fs, new Path("/file1"), 64, (short) 3, 0xBAD1DEA);
+    doDecomCheck(datanodeManager, decomManager, 3);
+    // Write another file, should only scan two
+    DFSTestUtil.createFile(fs, new Path("/file2"), 64, (short)3, 0xBAD1DEA);
+    doDecomCheck(datanodeManager, decomManager, 2);
+    // One more file, should only scan 1
+    DFSTestUtil.createFile(fs, new Path("/file3"), 64, (short)3, 0xBAD1DEA);
+    doDecomCheck(datanodeManager, decomManager, 1);
+    // block count on each DN now exceeds the limit; still scan at least one node
+    DFSTestUtil.createFile(fs, new Path("/file4"), 64, (short)3, 0xBAD1DEA);
+    doDecomCheck(datanodeManager, decomManager, 1);
+  }
+
+  @Deprecated
+  @Test(timeout=120000)
+  public void testNodesPerInterval() throws Exception {
+    Configuration newConf = new Configuration(conf);
+    org.apache.log4j.Logger.getLogger(DecommissionManager.class)
+        .setLevel(Level.TRACE);
+    // Set the deprecated configuration key which limits the # of nodes per 
+    // interval
+    newConf.setInt("dfs.namenode.decommission.nodes.per.interval", 1);
+    // Disable the normal monitor runs
+    newConf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY,
+        Integer.MAX_VALUE);
+    startCluster(1, 3, newConf);
+    final FileSystem fs = cluster.getFileSystem();
+    final DatanodeManager datanodeManager =
+        cluster.getNamesystem().getBlockManager().getDatanodeManager();
+    final DecommissionManager decomManager = datanodeManager.getDecomManager();
+
+    // Write a 3 block file, so each node has one block. Should scan 1 node 
+    // each time.
+    DFSTestUtil.createFile(fs, new Path("/file1"), 64, (short) 3, 0xBAD1DEA);
+    for (int i=0; i<3; i++) {
+      doDecomCheck(datanodeManager, decomManager, 1);
     }
   }
+
+  private void doDecomCheck(DatanodeManager datanodeManager,
+      DecommissionManager decomManager, int expectedNumCheckedNodes)
+      throws IOException, ExecutionException, InterruptedException {
+    // Decom all nodes
+    ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList();
+    for (DataNode d: cluster.getDataNodes()) {
+      DatanodeInfo dn = decommissionNode(0, d.getDatanodeUuid(),
+          decommissionedNodes,
+          AdminStates.DECOMMISSION_INPROGRESS);
+      decommissionedNodes.add(dn);
+    }
+    // Run decom scan and check
+    BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
+    assertEquals("Unexpected # of nodes checked", expectedNumCheckedNodes, 
+        decomManager.getNumNodesChecked());
+    // Recommission all nodes
+    for (DatanodeInfo dn : decommissionedNodes) {
+      recommissionNode(0, dn);
+    }
+  }
+
+  @Test(timeout=120000)
+  public void testPendingNodes() throws Exception {
+    Configuration newConf = new Configuration(conf);
+    org.apache.log4j.Logger.getLogger(DecommissionManager.class)
+        .setLevel(Level.TRACE);
+    // Only allow one node to be decom'd at a time
+    newConf.setInt(
+        DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_MAX_CONCURRENT_TRACKED_NODES,
+        1);
+    // Disable the normal monitor runs
+    newConf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 
+        Integer.MAX_VALUE);
+    startCluster(1, 3, newConf);
+    final FileSystem fs = cluster.getFileSystem();
+    final DatanodeManager datanodeManager =
+        cluster.getNamesystem().getBlockManager().getDatanodeManager();
+    final DecommissionManager decomManager = datanodeManager.getDecomManager();
+
+    // Keep a file open to prevent decom from progressing
+    HdfsDataOutputStream open1 =
+        (HdfsDataOutputStream) fs.create(new Path("/openFile1"), (short)3);
+    // Flush and trigger block reports so the block definitely shows up on NN
+    open1.write(123);
+    open1.hflush();
+    for (DataNode d: cluster.getDataNodes()) {
+      DataNodeTestUtils.triggerBlockReport(d);
+    }
+    // Decom two nodes, so one is still alive
+    ArrayList<DatanodeInfo> decommissionedNodes = Lists.newArrayList();
+    for (int i=0; i<2; i++) {
+      final DataNode d = cluster.getDataNodes().get(i);
+      DatanodeInfo dn = decommissionNode(0, d.getDatanodeUuid(), 
+          decommissionedNodes, 
+          AdminStates.DECOMMISSION_INPROGRESS);
+      decommissionedNodes.add(dn);
+    }
+
+    for (int i=2; i>=0; i--) {
+      assertTrackedAndPending(decomManager, 0, i);
+      BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
+    }
+
+    // Close file, try to decom the last node, should get stuck in tracked
+    open1.close();
+    final DataNode d = cluster.getDataNodes().get(2);
+    DatanodeInfo dn = decommissionNode(0, d.getDatanodeUuid(),
+        decommissionedNodes,
+        AdminStates.DECOMMISSION_INPROGRESS);
+    decommissionedNodes.add(dn);
+    BlockManagerTestUtil.recheckDecommissionState(datanodeManager);
+    
+    assertTrackedAndPending(decomManager, 1, 0);
+  }
+
+  private void assertTrackedAndPending(DecommissionManager decomManager,
+      int tracked, int pending) {
+    assertEquals("Unexpected number of tracked nodes", tracked,
+        decomManager.getNumTrackedNodes());
+    assertEquals("Unexpected number of pending nodes", pending,
+        decomManager.getNumPendingNodes());
+  }
 }

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerTestUtil.java

@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.ExecutionException;
 
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -300,9 +301,8 @@ public class BlockManagerTestUtil {
    * Have DatanodeManager check decommission state.
    * @param dm the DatanodeManager to manipulate
    */
-  public static void checkDecommissionState(DatanodeManager dm,
-      DatanodeDescriptor node) {
-    dm.checkDecommissionState(node);
+  public static void recheckDecommissionState(DatanodeManager dm)
+      throws ExecutionException, InterruptedException {
+    dm.getDecomManager().runMonitor();
   }
-
 }

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicyConsiderLoad.java

@@ -137,7 +137,7 @@ public class TestReplicationPolicyConsiderLoad {
       // returns false
       for (int i = 0; i < 3; i++) {
         DatanodeDescriptor d = dnManager.getDatanode(dnrList.get(i));
-        dnManager.startDecommission(d);
+        dnManager.getDecomManager().startDecommission(d);
         d.setDecommissioned();
       }
       assertEquals((double)load/3, dnManager.getFSClusterStats()

+ 37 - 22
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java

@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
@@ -29,7 +28,6 @@ import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Random;
-import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.hadoop.conf.Configuration;
@@ -53,7 +51,12 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DecommissionManager;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -90,7 +93,8 @@ public class TestDecommissioningStatus {
     conf.set(DFSConfigKeys.DFS_HOSTS_EXCLUDE, excludeFile.toUri().getPath());
     Path includeFile = new Path(dir, "include");
     conf.set(DFSConfigKeys.DFS_HOSTS, includeFile.toUri().getPath());
-    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 2000);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 
+        1000);
     conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
         4);
@@ -104,6 +108,9 @@ public class TestDecommissioningStatus {
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDatanodes).build();
     cluster.waitActive();
     fileSys = cluster.getFileSystem();
+    cluster.getNamesystem().getBlockManager().getDatanodeManager()
+        .setHeartbeatExpireInterval(3000);
+    Logger.getLogger(DecommissionManager.class).setLevel(Level.DEBUG);
   }
 
   @AfterClass
@@ -199,13 +206,16 @@ public class TestDecommissioningStatus {
   private void checkDecommissionStatus(DatanodeDescriptor decommNode,
       int expectedUnderRep, int expectedDecommissionOnly,
       int expectedUnderRepInOpenFiles) {
-    assertEquals(decommNode.decommissioningStatus.getUnderReplicatedBlocks(),
-        expectedUnderRep);
+    assertEquals("Unexpected num under-replicated blocks",
+        expectedUnderRep,
+        decommNode.decommissioningStatus.getUnderReplicatedBlocks());
+    assertEquals("Unexpected number of decom-only replicas",
+        expectedDecommissionOnly,
+        decommNode.decommissioningStatus.getDecommissionOnlyReplicas());
     assertEquals(
-        decommNode.decommissioningStatus.getDecommissionOnlyReplicas(),
-        expectedDecommissionOnly);
-    assertEquals(decommNode.decommissioningStatus
-        .getUnderReplicatedInOpenFiles(), expectedUnderRepInOpenFiles);
+        "Unexpected number of replicas in under-replicated open files",
+        expectedUnderRepInOpenFiles,
+        decommNode.decommissioningStatus.getUnderReplicatedInOpenFiles());
   }
 
   private void checkDFSAdminDecommissionStatus(
@@ -257,7 +267,7 @@ public class TestDecommissioningStatus {
    * Tests Decommissioning Status in DFS.
    */
   @Test
-  public void testDecommissionStatus() throws IOException, InterruptedException {
+  public void testDecommissionStatus() throws Exception {
     InetSocketAddress addr = new InetSocketAddress("localhost", cluster
         .getNameNodePort());
     DFSClient client = new DFSClient(addr, conf);
@@ -266,7 +276,7 @@ public class TestDecommissioningStatus {
     DistributedFileSystem fileSys = cluster.getFileSystem();
     DFSAdmin admin = new DFSAdmin(cluster.getConfiguration(0));
 
-    short replicas = 2;
+    short replicas = numDatanodes;
     //
     // Decommission one node. Verify the decommission status
     // 
@@ -275,7 +285,9 @@ public class TestDecommissioningStatus {
 
     Path file2 = new Path("decommission1.dat");
     FSDataOutputStream st1 = writeIncompleteFile(fileSys, file2, replicas);
-    Thread.sleep(5000);
+    for (DataNode d: cluster.getDataNodes()) {
+      DataNodeTestUtils.triggerBlockReport(d);
+    }
 
     FSNamesystem fsn = cluster.getNamesystem();
     final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
@@ -283,19 +295,22 @@ public class TestDecommissioningStatus {
       String downnode = decommissionNode(fsn, client, localFileSys, iteration);
       dm.refreshNodes(conf);
       decommissionedNodes.add(downnode);
-      Thread.sleep(5000);
+      BlockManagerTestUtil.recheckDecommissionState(dm);
       final List<DatanodeDescriptor> decommissioningNodes = dm.getDecommissioningNodes();
       if (iteration == 0) {
         assertEquals(decommissioningNodes.size(), 1);
         DatanodeDescriptor decommNode = decommissioningNodes.get(0);
-        checkDecommissionStatus(decommNode, 4, 0, 2);
+        checkDecommissionStatus(decommNode, 3, 0, 1);
         checkDFSAdminDecommissionStatus(decommissioningNodes.subList(0, 1),
             fileSys, admin);
       } else {
         assertEquals(decommissioningNodes.size(), 2);
         DatanodeDescriptor decommNode1 = decommissioningNodes.get(0);
         DatanodeDescriptor decommNode2 = decommissioningNodes.get(1);
-        checkDecommissionStatus(decommNode1, 4, 4, 2);
+        // This one is still 3,3,1 since it passed over the UC block 
+        // earlier, before node 2 was decommed
+        checkDecommissionStatus(decommNode1, 3, 3, 1);
+        // This one is 4,4,2 since it has the full state
         checkDecommissionStatus(decommNode2, 4, 4, 2);
         checkDFSAdminDecommissionStatus(decommissioningNodes.subList(0, 2),
             fileSys, admin);
@@ -317,8 +332,7 @@ public class TestDecommissioningStatus {
    * the replication process after it rejoins the cluster.
    */
   @Test(timeout=120000)
-  public void testDecommissionStatusAfterDNRestart()
-      throws IOException, InterruptedException {
+  public void testDecommissionStatusAfterDNRestart() throws Exception {
     DistributedFileSystem fileSys =
         (DistributedFileSystem)cluster.getFileSystem();
 
@@ -357,7 +371,7 @@ public class TestDecommissioningStatus {
     BlockManagerTestUtil.checkHeartbeat(fsn.getBlockManager());
 
     // Force DatanodeManager to check decommission state.
-    BlockManagerTestUtil.checkDecommissionState(dm, dead.get(0));
+    BlockManagerTestUtil.recheckDecommissionState(dm);
 
     // Verify that the DN remains in DECOMMISSION_INPROGRESS state.
     assertTrue("the node should be DECOMMISSION_IN_PROGRESSS",
@@ -371,7 +385,7 @@ public class TestDecommissioningStatus {
     // Delete the under-replicated file, which should let the 
     // DECOMMISSION_IN_PROGRESS node become DECOMMISSIONED
     cleanupFile(fileSys, f);
-    BlockManagerTestUtil.checkDecommissionState(dm, dead.get(0));
+    BlockManagerTestUtil.recheckDecommissionState(dm);
     assertTrue("the node should be decommissioned",
         dead.get(0).isDecommissioned());
 
@@ -392,8 +406,9 @@ public class TestDecommissioningStatus {
    * DECOMMISSIONED
    */
   @Test(timeout=120000)
-  public void testDecommissionDeadDN()
-      throws IOException, InterruptedException, TimeoutException {
+  public void testDecommissionDeadDN() throws Exception {
+    Logger log = Logger.getLogger(DecommissionManager.class);
+    log.setLevel(Level.DEBUG);
     DatanodeID dnID = cluster.getDataNodes().get(0).getDatanodeId();
     String dnName = dnID.getXferAddr();
     DataNodeProperties stoppedDN = cluster.stopDataNode(0);
@@ -404,7 +419,7 @@ public class TestDecommissioningStatus {
     DatanodeDescriptor dnDescriptor = dm.getDatanode(dnID);
     decommissionNode(fsn, localFileSys, dnName);
     dm.refreshNodes(conf);
-    BlockManagerTestUtil.checkDecommissionState(dm, dnDescriptor);
+    BlockManagerTestUtil.recheckDecommissionState(dm);
     assertTrue(dnDescriptor.isDecommissioned());
 
     // Add the node back

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsck.java

@@ -1305,7 +1305,7 @@ public class TestFsck {
           .getBlockManager().getBlockCollection(eb.getLocalBlock())
           .getBlocks()[0].getDatanode(0);
       cluster.getNameNode().getNamesystem().getBlockManager()
-          .getDatanodeManager().startDecommission(dn);
+          .getDatanodeManager().getDecomManager().startDecommission(dn);
       String dnName = dn.getXferAddr();
 
       //wait for decommission start

+ 1 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java

@@ -30,8 +30,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DF;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSOutputStream;
@@ -240,7 +238,7 @@ public class TestNamenodeCapacityReport {
         DatanodeDescriptor dnd =
             dnm.getDatanode(datanodes.get(i).getDatanodeId());
         expectedInServiceLoad -= dnd.getXceiverCount();
-        dnm.startDecommission(dnd);
+        dnm.getDecomManager().startDecommission(dnd);
         DataNodeTestUtils.triggerHeartbeat(datanodes.get(i));
         Thread.sleep(100);
         checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad);