Procházet zdrojové kódy

HDFS-9390. Block management for maintenance states.

Ming Ma před 8 roky
rodič
revize
b61fb267b9
23 změnil soubory, kde provedl 1240 přidání a 389 odebrání
  1. 5 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
  2. 31 22
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
  3. 4 7
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
  4. 173 76
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
  5. 2 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java
  6. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
  7. 21 14
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
  8. 33 14
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
  9. 100 42
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
  10. 10 6
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java
  11. 13 10
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
  12. 21 26
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java
  13. 28 2
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java
  14. 4 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java
  15. 5 4
      hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
  16. 7 0
      hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
  17. 14 6
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java
  18. 1 1
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java
  19. 692 83
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java
  20. 5 3
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
  21. 16 41
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java
  22. 53 25
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java
  23. 1 0
      hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/HostsFileWriter.java

+ 5 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -220,6 +220,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
       "dfs.namenode.reconstruction.pending.timeout-sec";
   public static final int     DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT = -1;
 
+  public static final String  DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY =
+      "dfs.namenode.maintenance.replication.min";
+  public static final int     DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_DEFAULT
+      = 1;
+
   public static final String  DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY =
       HdfsClientConfigKeys.DeprecatedKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY;
   public static final int     DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT = 2;

+ 31 - 22
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java

@@ -124,48 +124,57 @@ public class DFSUtil {
   }
 
   /**
-   * Compartor for sorting DataNodeInfo[] based on decommissioned states.
-   * Decommissioned nodes are moved to the end of the array on sorting with
-   * this compartor.
+   * Comparator for sorting DataNodeInfo[] based on
+   * decommissioned and entering_maintenance states.
    */
-  public static final Comparator<DatanodeInfo> DECOM_COMPARATOR = 
-    new Comparator<DatanodeInfo>() {
-      @Override
-      public int compare(DatanodeInfo a, DatanodeInfo b) {
-        return a.isDecommissioned() == b.isDecommissioned() ? 0 : 
-          a.isDecommissioned() ? 1 : -1;
+  public static class ServiceComparator implements Comparator<DatanodeInfo> {
+    @Override
+    public int compare(DatanodeInfo a, DatanodeInfo b) {
+      // Decommissioned nodes will still be moved to the end of the list
+      if (a.isDecommissioned()) {
+        return b.isDecommissioned() ? 0 : 1;
+      } else if (b.isDecommissioned()) {
+        return -1;
       }
-    };
 
+      // ENTERING_MAINTENANCE nodes should be after live nodes.
+      if (a.isEnteringMaintenance()) {
+        return b.isEnteringMaintenance() ? 0 : 1;
+      } else if (b.isEnteringMaintenance()) {
+        return -1;
+      } else {
+        return 0;
+      }
+    }
+  }
 
   /**
-   * Comparator for sorting DataNodeInfo[] based on decommissioned/stale states.
-   * Decommissioned/stale nodes are moved to the end of the array on sorting
-   * with this comparator.
-   */ 
+   * Comparator for sorting DataNodeInfo[] based on
+   * stale, decommissioned and entering_maintenance states.
+   * Order: live -> stale -> entering_maintenance -> decommissioned
+   */
   @InterfaceAudience.Private 
-  public static class DecomStaleComparator implements Comparator<DatanodeInfo> {
+  public static class ServiceAndStaleComparator extends ServiceComparator {
     private final long staleInterval;
 
     /**
-     * Constructor of DecomStaleComparator
+     * Constructor of ServiceAndStaleComparator
      * 
      * @param interval
      *          The time interval for marking datanodes as stale is passed from
      *          outside, since the interval may be changed dynamically
      */
-    public DecomStaleComparator(long interval) {
+    public ServiceAndStaleComparator(long interval) {
       this.staleInterval = interval;
     }
 
     @Override
     public int compare(DatanodeInfo a, DatanodeInfo b) {
-      // Decommissioned nodes will still be moved to the end of the list
-      if (a.isDecommissioned()) {
-        return b.isDecommissioned() ? 0 : 1;
-      } else if (b.isDecommissioned()) {
-        return -1;
+      int ret = super.compare(a, b);
+      if (ret != 0) {
+        return ret;
       }
+
       // Stale nodes will be moved behind the normal nodes
       boolean aStale = a.isStale(staleInterval);
       boolean bStale = b.isStale(staleInterval);

+ 4 - 7
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java

@@ -989,20 +989,17 @@ public class Dispatcher {
   }
 
   private boolean shouldIgnore(DatanodeInfo dn) {
-    // ignore decommissioned nodes
-    final boolean decommissioned = dn.isDecommissioned();
-    // ignore decommissioning nodes
-    final boolean decommissioning = dn.isDecommissionInProgress();
+    // ignore out-of-service nodes
+    final boolean outOfService = !dn.isInService();
     // ignore nodes in exclude list
     final boolean excluded = Util.isExcluded(excludedNodes, dn);
     // ignore nodes not in the include list (if include list is not empty)
     final boolean notIncluded = !Util.isIncluded(includedNodes, dn);
 
-    if (decommissioned || decommissioning || excluded || notIncluded) {
+    if (outOfService || excluded || notIncluded) {
       if (LOG.isTraceEnabled()) {
         LOG.trace("Excluding datanode " + dn
-            + ": decommissioned=" + decommissioned
-            + ", decommissioning=" + decommissioning
+            + ": outOfService=" + outOfService
             + ", excluded=" + excluded
             + ", notIncluded=" + notIncluded);
       }

+ 173 - 76
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -126,6 +126,29 @@ import org.slf4j.LoggerFactory;
 
 /**
  * Keeps information related to the blocks stored in the Hadoop cluster.
+ * For block state management, it tries to maintain the  safety
+ * property of "# of live replicas == # of expected redundancy" under
+ * any events such as decommission, namenode failover, datanode failure.
+ *
+ * The motivation of maintenance mode is to allow admins quickly repair nodes
+ * without paying the cost of decommission. Thus with maintenance mode,
+ * # of live replicas doesn't have to be equal to # of expected redundancy.
+ * If any of the replica is in maintenance mode, the safety property
+ * is extended as follows. These property still apply for the case of zero
+ * maintenance replicas, thus we can use these safe property for all scenarios.
+ * a. # of live replicas >= # of min replication for maintenance.
+ * b. # of live replicas <= # of expected redundancy.
+ * c. # of live replicas and maintenance replicas >= # of expected redundancy.
+ *
+ * For regular replication, # of min live replicas for maintenance is determined
+ * by DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY. This number has to <=
+ * DFS_NAMENODE_REPLICATION_MIN_KEY.
+ * For erasure encoding, # of min live replicas for maintenance is
+ * BlockInfoStriped#getRealDataBlockNum.
+ *
+ * Another safety property is to satisfy the block placement policy. While the
+ * policy is configurable, the replicas the policy is applied to are the live
+ * replicas + maintenance replicas.
  */
 @InterfaceAudience.Private
 public class BlockManager implements BlockStatsMXBean {
@@ -341,6 +364,11 @@ public class BlockManager implements BlockStatsMXBean {
 
   private final BlockIdManager blockIdManager;
 
+  /** Minimum live replicas needed for the datanode to be transitioned
+   * from ENTERING_MAINTENANCE to IN_MAINTENANCE.
+   */
+  private final short minReplicationToBeInMaintenance;
+
   public BlockManager(final Namesystem namesystem, boolean haEnabled,
       final Configuration conf) throws IOException {
     this.namesystem = namesystem;
@@ -373,13 +401,13 @@ public class BlockManager implements BlockStatsMXBean {
     this.maxCorruptFilesReturned = conf.getInt(
       DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED_KEY,
       DFSConfigKeys.DFS_DEFAULT_MAX_CORRUPT_FILES_RETURNED);
-    this.defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY, 
-                                          DFSConfigKeys.DFS_REPLICATION_DEFAULT);
+    this.defaultReplication = conf.getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
+        DFSConfigKeys.DFS_REPLICATION_DEFAULT);
 
-    final int maxR = conf.getInt(DFSConfigKeys.DFS_REPLICATION_MAX_KEY, 
-                                 DFSConfigKeys.DFS_REPLICATION_MAX_DEFAULT);
+    final int maxR = conf.getInt(DFSConfigKeys.DFS_REPLICATION_MAX_KEY,
+        DFSConfigKeys.DFS_REPLICATION_MAX_DEFAULT);
     final int minR = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY,
-                                 DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
     if (minR <= 0)
       throw new IOException("Unexpected configuration parameters: "
           + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
@@ -407,7 +435,7 @@ public class BlockManager implements BlockStatsMXBean {
     this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
     this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
 
-    this.replicationRecheckInterval = 
+    this.replicationRecheckInterval =
       conf.getTimeDuration(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
           DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT,
           TimeUnit.SECONDS) * 1000L;
@@ -428,7 +456,7 @@ public class BlockManager implements BlockStatsMXBean {
     this.encryptDataTransfer =
         conf.getBoolean(DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY,
             DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT);
-    
+
     this.maxNumBlocksToLog =
         conf.getLong(DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
             DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
@@ -438,6 +466,25 @@ public class BlockManager implements BlockStatsMXBean {
     this.getBlocksMinBlockSize = conf.getLongBytes(
         DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY,
         DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT);
+
+    final int minMaintenanceR = conf.getInt(
+        DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY,
+        DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_DEFAULT);
+
+    if (minMaintenanceR < 0) {
+      throw new IOException("Unexpected configuration parameters: "
+          + DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY
+          + " = " + minMaintenanceR + " < 0");
+    }
+    if (minMaintenanceR > minR) {
+      throw new IOException("Unexpected configuration parameters: "
+          + DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY
+          + " = " + minMaintenanceR + " > "
+          + DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY
+          + " = " + minR);
+    }
+    this.minReplicationToBeInMaintenance = (short)minMaintenanceR;
+
     this.blockReportLeaseManager = new BlockReportLeaseManager(conf);
 
     bmSafeMode = new BlockManagerSafeMode(this, namesystem, haEnabled, conf);
@@ -668,7 +715,7 @@ public class BlockManager implements BlockStatsMXBean {
     // Dump all datanodes
     getDatanodeManager().datanodeDump(out);
   }
-  
+
   /**
    * Dump the metadata for the given block in a human-readable
    * form.
@@ -697,12 +744,12 @@ public class BlockManager implements BlockStatsMXBean {
       out.print(fileName + ": ");
     }
     // l: == live:, d: == decommissioned c: == corrupt e: == excess
-    out.print(block + ((usableReplicas > 0)? "" : " MISSING") + 
+    out.print(block + ((usableReplicas > 0)? "" : " MISSING") +
               " (replicas:" +
               " l: " + numReplicas.liveReplicas() +
               " d: " + numReplicas.decommissionedAndDecommissioning() +
               " c: " + numReplicas.corruptReplicas() +
-              " e: " + numReplicas.excessReplicas() + ") "); 
+              " e: " + numReplicas.excessReplicas() + ") ");
 
     Collection<DatanodeDescriptor> corruptNodes = 
                                   corruptReplicas.getNodes(block);
@@ -750,6 +797,18 @@ public class BlockManager implements BlockStatsMXBean {
     }
   }
 
+  public short getMinReplicationToBeInMaintenance() {
+    return minReplicationToBeInMaintenance;
+  }
+
+  private short getMinMaintenanceStorageNum(BlockInfo block) {
+    if (block.isStriped()) {
+      return ((BlockInfoStriped) block).getRealDataBlockNum();
+    } else {
+      return minReplicationToBeInMaintenance;
+    }
+  }
+
   public boolean hasMinStorage(BlockInfo block) {
     return countNodes(block).liveReplicas() >= getMinStorageNum(block);
   }
@@ -942,7 +1001,7 @@ public class BlockManager implements BlockStatsMXBean {
     NumberReplicas replicas = countNodes(lastBlock);
     neededReconstruction.remove(lastBlock, replicas.liveReplicas(),
         replicas.readOnlyReplicas(),
-        replicas.decommissionedAndDecommissioning(), getRedundancy(lastBlock));
+        replicas.outOfServiceReplicas(), getExpectedRedundancyNum(lastBlock));
     pendingReconstruction.remove(lastBlock);
 
     // remove this block from the list of pending blocks to be deleted. 
@@ -1078,7 +1137,8 @@ public class BlockManager implements BlockStatsMXBean {
     } else {
       isCorrupt = numCorruptReplicas != 0 && numCorruptReplicas == numNodes;
     }
-    final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptReplicas;
+    int numMachines = isCorrupt ? numNodes: numNodes - numCorruptReplicas;
+    numMachines -= numReplicas.maintenanceNotForReadReplicas();
     DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
     final byte[] blockIndices = blk.isStriped() ? new byte[numMachines] : null;
     int j = 0, i = 0;
@@ -1086,11 +1146,17 @@ public class BlockManager implements BlockStatsMXBean {
       final boolean noCorrupt = (numCorruptReplicas == 0);
       for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
         if (storage.getState() != State.FAILED) {
+          final DatanodeDescriptor d = storage.getDatanodeDescriptor();
+          // Don't pick IN_MAINTENANCE or dead ENTERING_MAINTENANCE states.
+          if (d.isInMaintenance()
+              || (d.isEnteringMaintenance() && !d.isAlive())) {
+            continue;
+          }
+
           if (noCorrupt) {
             machines[j++] = storage;
             i = setBlockIndices(blk, blockIndices, i, storage);
           } else {
-            final DatanodeDescriptor d = storage.getDatanodeDescriptor();
             final boolean replicaCorrupt = isReplicaCorrupt(blk, d);
             if (isCorrupt || !replicaCorrupt) {
               machines[j++] = storage;
@@ -1106,7 +1172,7 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     assert j == machines.length :
-      "isCorrupt: " + isCorrupt + 
+      "isCorrupt: " + isCorrupt +
       " numMachines: " + numMachines +
       " numNodes: " + numNodes +
       " numCorrupt: " + numCorruptNodes +
@@ -1700,8 +1766,11 @@ public class BlockManager implements BlockStatsMXBean {
     return scheduledWork;
   }
 
+  // Check if the number of live + pending replicas satisfies
+  // the expected redundancy.
   boolean hasEnoughEffectiveReplicas(BlockInfo block,
-      NumberReplicas numReplicas, int pendingReplicaNum, int required) {
+      NumberReplicas numReplicas, int pendingReplicaNum) {
+    int required = getExpectedLiveRedundancyNum(block, numReplicas);
     int numEffectiveReplicas = numReplicas.liveReplicas() + pendingReplicaNum;
     return (numEffectiveReplicas >= required) &&
         (pendingReplicaNum > 0 || isPlacementPolicySatisfied(block));
@@ -1716,8 +1785,6 @@ public class BlockManager implements BlockStatsMXBean {
       return null;
     }
 
-    short requiredRedundancy = getExpectedRedundancyNum(block);
-
     // get a source data-node
     List<DatanodeDescriptor> containingNodes = new ArrayList<>();
     List<DatanodeStorageInfo> liveReplicaNodes = new ArrayList<>();
@@ -1726,6 +1793,8 @@ public class BlockManager implements BlockStatsMXBean {
     final DatanodeDescriptor[] srcNodes = chooseSourceDatanodes(block,
         containingNodes, liveReplicaNodes, numReplicas,
         liveBlockIndices, priority);
+    short requiredRedundancy = getExpectedLiveRedundancyNum(block,
+        numReplicas);
     if(srcNodes == null || srcNodes.length == 0) {
       // block can not be reconstructed from any node
       LOG.debug("Block " + block + " cannot be reconstructed " +
@@ -1738,8 +1807,7 @@ public class BlockManager implements BlockStatsMXBean {
     assert liveReplicaNodes.size() >= numReplicas.liveReplicas();
 
     int pendingNum = pendingReconstruction.getNumReplicas(block);
-    if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
-        requiredRedundancy)) {
+    if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum)) {
       neededReconstruction.remove(block, priority);
       blockLog.debug("BLOCK* Removing {} from neededReconstruction as" +
           " it has enough replicas", block);
@@ -1763,9 +1831,11 @@ public class BlockManager implements BlockStatsMXBean {
 
       // should reconstruct all the internal blocks before scheduling
       // replication task for decommissioning node(s).
-      if (additionalReplRequired - numReplicas.decommissioning() > 0) {
-        additionalReplRequired = additionalReplRequired
-            - numReplicas.decommissioning();
+      if (additionalReplRequired - numReplicas.decommissioning() -
+          numReplicas.liveEnteringMaintenanceReplicas() > 0) {
+        additionalReplRequired = additionalReplRequired -
+            numReplicas.decommissioning() -
+            numReplicas.liveEnteringMaintenanceReplicas();
       }
       byte[] indices = new byte[liveBlockIndices.size()];
       for (int i = 0 ; i < liveBlockIndices.size(); i++) {
@@ -1807,11 +1877,11 @@ public class BlockManager implements BlockStatsMXBean {
     }
 
     // do not schedule more if enough replicas is already pending
-    final short requiredRedundancy = getExpectedRedundancyNum(block);
     NumberReplicas numReplicas = countNodes(block);
+    final short requiredRedundancy =
+        getExpectedLiveRedundancyNum(block, numReplicas);
     final int pendingNum = pendingReconstruction.getNumReplicas(block);
-    if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum,
-        requiredRedundancy)) {
+    if (hasEnoughEffectiveReplicas(block, numReplicas, pendingNum)) {
       neededReconstruction.remove(block, priority);
       rw.resetTargets();
       blockLog.debug("BLOCK* Removing {} from neededReplications as" +
@@ -1880,7 +1950,7 @@ public class BlockManager implements BlockStatsMXBean {
    * @throws IOException
    *           if the number of targets < minimum replication.
    * @see BlockPlacementPolicy#chooseTarget(String, int, Node,
-   *      Set, long, List, BlockStoragePolicy)
+   *      Set, long, List, BlockStoragePolicy, EnumSet)
    */
   public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src,
       final int numOfReplicas, final Node client,
@@ -1987,13 +2057,15 @@ public class BlockManager implements BlockStatsMXBean {
         continue;
       }
 
-      // never use already decommissioned nodes or unknown state replicas
-      if (state == null || state == StoredReplicaState.DECOMMISSIONED) {
+      // never use already decommissioned nodes, maintenance node not
+      // suitable for read or unknown state replicas.
+      if (state == null || state == StoredReplicaState.DECOMMISSIONED
+          || state == StoredReplicaState.MAINTENANCE_NOT_FOR_READ) {
         continue;
       }
 
       if (priority != LowRedundancyBlocks.QUEUE_HIGHEST_PRIORITY
-          && !node.isDecommissionInProgress() 
+          && (!node.isDecommissionInProgress() && !node.isEnteringMaintenance())
           && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams) {
         continue; // already reached replication limit
       }
@@ -2045,10 +2117,10 @@ public class BlockManager implements BlockStatsMXBean {
             continue;
           }
           NumberReplicas num = countNodes(timedOutItems[i]);
-          if (isNeededReconstruction(bi, num.liveReplicas())) {
+          if (isNeededReconstruction(bi, num)) {
             neededReconstruction.add(bi, num.liveReplicas(),
-                num.readOnlyReplicas(), num.decommissionedAndDecommissioning(),
-                getRedundancy(bi));
+                num.readOnlyReplicas(), num.outOfServiceReplicas(),
+                getExpectedRedundancyNum(bi));
           }
         }
       } finally {
@@ -3014,10 +3086,9 @@ public class BlockManager implements BlockStatsMXBean {
 
     // handle low redundancy/extra redundancy
     short fileRedundancy = getExpectedRedundancyNum(storedBlock);
-    if (!isNeededReconstruction(storedBlock, numCurrentReplica)) {
+    if (!isNeededReconstruction(storedBlock, num, pendingNum)) {
       neededReconstruction.remove(storedBlock, numCurrentReplica,
-          num.readOnlyReplicas(),
-          num.decommissionedAndDecommissioning(), fileRedundancy);
+          num.readOnlyReplicas(), num.outOfServiceReplicas(), fileRedundancy);
     } else {
       updateNeededReconstructions(storedBlock, curReplicaDelta, 0);
     }
@@ -3040,6 +3111,10 @@ public class BlockManager implements BlockStatsMXBean {
     return storedBlock;
   }
 
+  // If there is any maintenance replica, we don't have to restore
+  // the condition of live + maintenance == expected. We allow
+  // live + maintenance >= expected. The extra redundancy will be removed
+  // when the maintenance node changes to live.
   private boolean shouldProcessExtraRedundancy(NumberReplicas num,
       int expectedNum) {
     final int numCurrent = num.liveReplicas();
@@ -3255,9 +3330,9 @@ public class BlockManager implements BlockStatsMXBean {
     NumberReplicas num = countNodes(block);
     final int numCurrentReplica = num.liveReplicas();
     // add to low redundancy queue if need to be
-    if (isNeededReconstruction(block, numCurrentReplica)) {
+    if (isNeededReconstruction(block, num)) {
       if (neededReconstruction.add(block, numCurrentReplica,
-          num.readOnlyReplicas(), num.decommissionedAndDecommissioning(),
+          num.readOnlyReplicas(), num.outOfServiceReplicas(),
           expectedRedundancy)) {
         return MisReplicationResult.UNDER_REPLICATED;
       }
@@ -3290,9 +3365,9 @@ public class BlockManager implements BlockStatsMXBean {
 
     // update neededReconstruction priority queues
     b.setReplication(newRepl);
+    NumberReplicas num = countNodes(b);
     updateNeededReconstructions(b, 0, newRepl - oldRepl);
-
-    if (oldRepl > newRepl) {
+    if (shouldProcessExtraRedundancy(num, newRepl)) {
       processExtraRedundancyBlock(b, newRepl, null, null);
     }
   }
@@ -3318,14 +3393,14 @@ public class BlockManager implements BlockStatsMXBean {
       }
       final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
       if (storage.areBlockContentsStale()) {
-        LOG.trace("BLOCK* processOverReplicatedBlock: Postponing {}"
+        LOG.trace("BLOCK* processExtraRedundancyBlock: Postponing {}"
             + " since storage {} does not yet have up-to-date information.",
             block, storage);
         postponeBlock(block);
         return;
       }
       if (!isExcess(cur, block)) {
-        if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
+        if (cur.isInService()) {
           // exclude corrupt replicas
           if (corruptNodes == null || !corruptNodes.contains(cur)) {
             nonExcess.add(storage);
@@ -3766,7 +3841,7 @@ public class BlockManager implements BlockStatsMXBean {
     return countNodes(b, false);
   }
 
-  private NumberReplicas countNodes(BlockInfo b, boolean inStartupSafeMode) {
+  NumberReplicas countNodes(BlockInfo b, boolean inStartupSafeMode) {
     NumberReplicas numberReplicas = new NumberReplicas();
     Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(b);
     if (b.isStriped()) {
@@ -3797,6 +3872,12 @@ public class BlockManager implements BlockStatsMXBean {
         s = StoredReplicaState.DECOMMISSIONING;
       } else if (node.isDecommissioned()) {
         s = StoredReplicaState.DECOMMISSIONED;
+      } else if (node.isMaintenance()) {
+        if (node.isInMaintenance() || !node.isAlive()) {
+          s = StoredReplicaState.MAINTENANCE_NOT_FOR_READ;
+        } else {
+          s = StoredReplicaState.MAINTENANCE_FOR_READ;
+        }
       } else if (isExcess(node, b)) {
         s = StoredReplicaState.EXCESS;
       } else {
@@ -3868,11 +3949,11 @@ public class BlockManager implements BlockStatsMXBean {
   }
   
   /**
-   * On stopping decommission, check if the node has excess replicas.
+   * On putting the node in service, check if the node has excess replicas.
    * If there are any excess replicas, call processExtraRedundancyBlock().
    * Process extra redundancy blocks only when active NN is out of safe mode.
    */
-  void processExtraRedundancyBlocksOnReCommission(
+  void processExtraRedundancyBlocksOnInService(
       final DatanodeDescriptor srcNode) {
     if (!isPopulatingReplQueues()) {
       return;
@@ -3881,7 +3962,7 @@ public class BlockManager implements BlockStatsMXBean {
     int numExtraRedundancy = 0;
     while(it.hasNext()) {
       final BlockInfo block = it.next();
-      int expectedReplication = this.getRedundancy(block);
+      int expectedReplication = this.getExpectedRedundancyNum(block);
       NumberReplicas num = countNodes(block);
       if (shouldProcessExtraRedundancy(num, expectedReplication)) {
         // extra redundancy block
@@ -3891,14 +3972,15 @@ public class BlockManager implements BlockStatsMXBean {
       }
     }
     LOG.info("Invalidated " + numExtraRedundancy
-        + " extra redundancy blocks on " + srcNode + " during recommissioning");
+        + " extra redundancy blocks on " + srcNode + " after it is in service");
   }
 
   /**
-   * Returns whether a node can be safely decommissioned based on its 
-   * liveness. Dead nodes cannot always be safely decommissioned.
+   * Returns whether a node can be safely decommissioned or in maintenance
+   * based on its liveness. Dead nodes cannot always be safely decommissioned
+   * or in maintenance.
    */
-  boolean isNodeHealthyForDecommission(DatanodeDescriptor node) {
+  boolean isNodeHealthyForDecommissionOrMaintenance(DatanodeDescriptor node) {
     if (!node.checkBlockReportReceived()) {
       LOG.info("Node {} hasn't sent its first block report.", node);
       return false;
@@ -3912,17 +3994,18 @@ public class BlockManager implements BlockStatsMXBean {
     if (pendingReconstructionBlocksCount == 0 &&
         lowRedundancyBlocksCount == 0) {
       LOG.info("Node {} is dead and there are no low redundancy" +
-          " blocks or blocks pending reconstruction. Safe to decommission.",
-          node);
+          " blocks or blocks pending reconstruction. Safe to decommission or",
+          " put in maintenance.", node);
       return true;
     }
 
     LOG.warn("Node {} is dead " +
-        "while decommission is in progress. Cannot be safely " +
-        "decommissioned since there is risk of reduced " +
-        "data durability or data loss. Either restart the failed node or" +
-        " force decommissioning by removing, calling refreshNodes, " +
-        "then re-adding to the excludes files.", node);
+        "while in {}. Cannot be safely " +
+        "decommissioned or be in maintenance since there is risk of reduced " +
+        "data durability or data loss. Either restart the failed node or " +
+        "force decommissioning or maintenance by removing, calling " +
+        "refreshNodes, then re-adding to the excludes or host config files.",
+        node, node.getAdminState());
     return false;
   }
 
@@ -3990,17 +4073,16 @@ public class BlockManager implements BlockStatsMXBean {
       }
       NumberReplicas repl = countNodes(block);
       int pendingNum = pendingReconstruction.getNumReplicas(block);
-      int curExpectedReplicas = getRedundancy(block);
-      if (!hasEnoughEffectiveReplicas(block, repl, pendingNum,
-          curExpectedReplicas)) {
+      int curExpectedReplicas = getExpectedRedundancyNum(block);
+      if (!hasEnoughEffectiveReplicas(block, repl, pendingNum)) {
         neededReconstruction.update(block, repl.liveReplicas() + pendingNum,
-            repl.readOnlyReplicas(), repl.decommissionedAndDecommissioning(),
+            repl.readOnlyReplicas(), repl.outOfServiceReplicas(),
             curExpectedReplicas, curReplicasDelta, expectedReplicasDelta);
       } else {
         int oldReplicas = repl.liveReplicas() + pendingNum - curReplicasDelta;
         int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
         neededReconstruction.remove(block, oldReplicas, repl.readOnlyReplicas(),
-            repl.decommissionedAndDecommissioning(), oldExpectedReplicas);
+            repl.outOfServiceReplicas(), oldExpectedReplicas);
       }
     } finally {
       namesystem.writeUnlock();
@@ -4018,24 +4100,15 @@ public class BlockManager implements BlockStatsMXBean {
       short expected = getExpectedRedundancyNum(block);
       final NumberReplicas n = countNodes(block);
       final int pending = pendingReconstruction.getNumReplicas(block);
-      if (!hasEnoughEffectiveReplicas(block, n, pending, expected)) {
+      if (!hasEnoughEffectiveReplicas(block, n, pending)) {
         neededReconstruction.add(block, n.liveReplicas() + pending,
-            n.readOnlyReplicas(),
-            n.decommissionedAndDecommissioning(), expected);
+            n.readOnlyReplicas(), n.outOfServiceReplicas(), expected);
       } else if (shouldProcessExtraRedundancy(n, expected)) {
         processExtraRedundancyBlock(block, expected, null, null);
       }
     }
   }
 
-  /** 
-   * @return 0 if the block is not found;
-   *         otherwise, return the replication factor of the block.
-   */
-  private int getRedundancy(BlockInfo block) {
-    return getExpectedRedundancyNum(block);
-  }
-
   /**
    * Get blocks to invalidate for <i>nodeId</i>
    * in {@link #invalidateBlocks}.
@@ -4088,6 +4161,8 @@ public class BlockManager implements BlockStatsMXBean {
         .getNodes(storedBlock);
     for (DatanodeStorageInfo storage : blocksMap.getStorages(storedBlock)) {
       final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
+      // Nodes under maintenance should be counted as valid replicas from
+      // rack policy point of view.
       if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()
           && ((corruptNodes == null) || !corruptNodes.contains(cur))) {
         liveNodes.add(cur);
@@ -4102,14 +4177,36 @@ public class BlockManager implements BlockStatsMXBean {
         .isPlacementPolicySatisfied();
   }
 
+  boolean isNeededReconstructionForMaintenance(BlockInfo storedBlock,
+      NumberReplicas numberReplicas) {
+    return storedBlock.isComplete() && (numberReplicas.liveReplicas() <
+        getMinMaintenanceStorageNum(storedBlock) ||
+        !isPlacementPolicySatisfied(storedBlock));
+  }
+
+  boolean isNeededReconstruction(BlockInfo storedBlock,
+      NumberReplicas numberReplicas) {
+    return isNeededReconstruction(storedBlock, numberReplicas, 0);
+  }
+
   /**
    * A block needs reconstruction if the number of redundancies is less than
    * expected or if it does not have enough racks.
    */
-  boolean isNeededReconstruction(BlockInfo storedBlock, int current) {
-    int expected = getExpectedRedundancyNum(storedBlock);
-    return storedBlock.isComplete()
-        && (current < expected || !isPlacementPolicySatisfied(storedBlock));
+  boolean isNeededReconstruction(BlockInfo storedBlock,
+      NumberReplicas numberReplicas, int pending) {
+    return storedBlock.isComplete() &&
+        !hasEnoughEffectiveReplicas(storedBlock, numberReplicas, pending);
+  }
+
+  // Exclude maintenance, but make sure it has minimal live replicas
+  // to satisfy the maintenance requirement.
+  public short getExpectedLiveRedundancyNum(BlockInfo block,
+      NumberReplicas numberReplicas) {
+    final short expectedRedundancy = getExpectedRedundancyNum(block);
+    return (short)Math.max(expectedRedundancy -
+        numberReplicas.maintenanceReplicas(),
+        getMinMaintenanceStorageNum(block));
   }
 
   public short getExpectedRedundancyNum(BlockInfo block) {

+ 2 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java

@@ -833,8 +833,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
                          List<DatanodeStorageInfo> results,
                          boolean avoidStaleNodes) {
     // check if the node is (being) decommissioned
-    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
-      logNodeIsNotChosen(node, "the node is (being) decommissioned ");
+    if (!node.isInService()) {
+      logNodeIsNotChosen(node, "the node isn't in service.");
       return false;
     }
 

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java

@@ -682,7 +682,7 @@ public class CacheReplicationMonitor extends Thread implements Closeable {
       if (datanode == null) {
         continue;
       }
-      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
+      if (!datanode.isInService()) {
         continue;
       }
       if (corrupt != null && corrupt.contains(datanode)) {

+ 21 - 14
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java

@@ -146,8 +146,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
 
   // Stores status of decommissioning.
   // If node is not decommissioning, do not use this object for anything.
-  public final DecommissioningStatus decommissioningStatus =
-      new DecommissioningStatus();
+  private final LeavingServiceStatus leavingServiceStatus =
+      new LeavingServiceStatus();
 
   private final Map<String, DatanodeStorageInfo> storageMap =
       new HashMap<>();
@@ -276,6 +276,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
     this.needKeyUpdate = needKeyUpdate;
   }
 
+  public LeavingServiceStatus getLeavingServiceStatus() {
+    return leavingServiceStatus;
+  }
+
   @VisibleForTesting
   public DatanodeStorageInfo getStorageInfo(String storageID) {
     synchronized (storageMap) {
@@ -729,51 +733,54 @@ public class DatanodeDescriptor extends DatanodeInfo {
     return (this == obj) || super.equals(obj);
   }
 
-  /** Decommissioning status */
-  public class DecommissioningStatus {
+  /** Leaving service status. */
+  public class LeavingServiceStatus {
     private int underReplicatedBlocks;
-    private int decommissionOnlyReplicas;
+    private int outOfServiceOnlyReplicas;
     private int underReplicatedInOpenFiles;
     private long startTime;
     
     synchronized void set(int underRep,
         int onlyRep, int underConstruction) {
-      if (!isDecommissionInProgress()) {
+      if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
         return;
       }
       underReplicatedBlocks = underRep;
-      decommissionOnlyReplicas = onlyRep;
+      outOfServiceOnlyReplicas = onlyRep;
       underReplicatedInOpenFiles = underConstruction;
     }
 
     /** @return the number of under-replicated blocks */
     public synchronized int getUnderReplicatedBlocks() {
-      if (!isDecommissionInProgress()) {
+      if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
         return 0;
       }
       return underReplicatedBlocks;
     }
-    /** @return the number of decommission-only replicas */
-    public synchronized int getDecommissionOnlyReplicas() {
-      if (!isDecommissionInProgress()) {
+    /** @return the number of blocks with out-of-service-only replicas */
+    public synchronized int getOutOfServiceOnlyReplicas() {
+      if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
         return 0;
       }
-      return decommissionOnlyReplicas;
+      return outOfServiceOnlyReplicas;
     }
     /** @return the number of under-replicated blocks in open files */
     public synchronized int getUnderReplicatedInOpenFiles() {
-      if (!isDecommissionInProgress()) {
+      if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
         return 0;
       }
       return underReplicatedInOpenFiles;
     }
     /** Set start time */
     public synchronized void setStartTime(long time) {
+      if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
+        return;
+      }
       startTime = time;
     }
     /** @return start time */
     public synchronized long getStartTime() {
-      if (!isDecommissionInProgress()) {
+      if (!isDecommissionInProgress() && !isEnteringMaintenance()) {
         return 0;
       }
       return startTime;

+ 33 - 14
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -388,8 +388,8 @@ public class DatanodeManager {
   public void sortLocatedBlocks(final String targetHost,
       final List<LocatedBlock> locatedBlocks) {
     Comparator<DatanodeInfo> comparator = avoidStaleDataNodesForRead ?
-        new DFSUtil.DecomStaleComparator(staleInterval) :
-        DFSUtil.DECOM_COMPARATOR;
+        new DFSUtil.ServiceAndStaleComparator(staleInterval) :
+        new DFSUtil.ServiceComparator();
     // sort located block
     for (LocatedBlock lb : locatedBlocks) {
       if (lb.isStriped()) {
@@ -632,9 +632,20 @@ public class DatanodeManager {
    * @param nodeInfo datanode descriptor.
    */
   private void removeDatanode(DatanodeDescriptor nodeInfo) {
+    removeDatanode(nodeInfo, true);
+  }
+
+  /**
+   * Remove a datanode descriptor.
+   * @param nodeInfo datanode descriptor.
+   */
+  private void removeDatanode(DatanodeDescriptor nodeInfo,
+      boolean removeBlocksFromBlocksMap) {
     assert namesystem.hasWriteLock();
     heartbeatManager.removeDatanode(nodeInfo);
-    blockManager.removeBlocksAssociatedTo(nodeInfo);
+    if (removeBlocksFromBlocksMap) {
+      blockManager.removeBlocksAssociatedTo(nodeInfo);
+    }
     networktopology.remove(nodeInfo);
     decrementVersionCount(nodeInfo.getSoftwareVersion());
     blockManager.getBlockReportLeaseManager().unregister(nodeInfo);
@@ -655,7 +666,7 @@ public class DatanodeManager {
     try {
       final DatanodeDescriptor descriptor = getDatanode(node);
       if (descriptor != null) {
-        removeDatanode(descriptor);
+        removeDatanode(descriptor, true);
       } else {
         NameNode.stateChangeLog.warn("BLOCK* removeDatanode: "
                                      + node + " does not exist");
@@ -666,7 +677,8 @@ public class DatanodeManager {
   }
 
   /** Remove a dead datanode. */
-  void removeDeadDatanode(final DatanodeID nodeID) {
+  void removeDeadDatanode(final DatanodeID nodeID,
+      boolean removeBlocksFromBlockMap) {
     DatanodeDescriptor d;
     try {
       d = getDatanode(nodeID);
@@ -675,8 +687,9 @@ public class DatanodeManager {
     }
     if (d != null && isDatanodeDead(d)) {
       NameNode.stateChangeLog.info(
-          "BLOCK* removeDeadDatanode: lost heartbeat from " + d);
-      removeDatanode(d);
+          "BLOCK* removeDeadDatanode: lost heartbeat from " + d
+              + ", removeBlocksFromBlockMap " + removeBlocksFromBlockMap);
+      removeDatanode(d, removeBlocksFromBlockMap);
     }
   }
 
@@ -1112,10 +1125,16 @@ public class DatanodeManager {
   }
   
   /**
-   * 1. Added to hosts  --> no further work needed here.
-   * 2. Removed from hosts --> mark AdminState as decommissioned. 
-   * 3. Added to exclude --> start decommission.
-   * 4. Removed from exclude --> stop decommission.
+   * Reload datanode membership and the desired admin operations from
+   * host files. If a node isn't allowed, hostConfigManager.isIncluded returns
+   * false and the node can't be used.
+   * If a node is allowed and the desired admin operation is defined,
+   * it will transition to the desired admin state.
+   * If a node is allowed and upgrade domain is defined,
+   * the upgrade domain will be set on the node.
+   * To use maintenance mode or upgrade domain, set
+   * DFS_NAMENODE_HOSTS_PROVIDER_CLASSNAME_KEY to
+   * CombinedHostFileManager.class.
    */
   private void refreshDatanodes() {
     final Map<String, DatanodeDescriptor> copy;
@@ -1125,17 +1144,17 @@ public class DatanodeManager {
     for (DatanodeDescriptor node : copy.values()) {
       // Check if not include.
       if (!hostConfigManager.isIncluded(node)) {
-        node.setDisallowed(true); // case 2.
+        node.setDisallowed(true);
       } else {
         long maintenanceExpireTimeInMS =
             hostConfigManager.getMaintenanceExpirationTimeInMS(node);
         if (node.maintenanceNotExpired(maintenanceExpireTimeInMS)) {
           decomManager.startMaintenance(node, maintenanceExpireTimeInMS);
         } else if (hostConfigManager.isExcluded(node)) {
-          decomManager.startDecommission(node); // case 3.
+          decomManager.startDecommission(node);
         } else {
           decomManager.stopMaintenance(node);
-          decomManager.stopDecommission(node); // case 4.
+          decomManager.stopDecommission(node);
         }
       }
       node.setUpgradeDomain(hostConfigManager.getUpgradeDomain(node));

+ 100 - 42
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java

@@ -201,7 +201,7 @@ public class DecommissionManager {
           LOG.info("Starting decommission of {} {} with {} blocks",
               node, storage, storage.numBlocks());
         }
-        node.decommissioningStatus.setStartTime(monotonicNow());
+        node.getLeavingServiceStatus().setStartTime(monotonicNow());
         pendingNodes.add(node);
       }
     } else {
@@ -222,7 +222,7 @@ public class DecommissionManager {
       // extra redundancy blocks will be detected and processed when
       // the dead node comes back and send in its full block report.
       if (node.isAlive()) {
-        blockManager.processExtraRedundancyBlocksOnReCommission(node);
+        blockManager.processExtraRedundancyBlocksOnInService(node);
       }
       // Remove from tracking in DecommissionManager
       pendingNodes.remove(node);
@@ -246,6 +246,16 @@ public class DecommissionManager {
     if (!node.isMaintenance()) {
       // Update DN stats maintained by HeartbeatManager
       hbManager.startMaintenance(node);
+      // hbManager.startMaintenance will set dead node to IN_MAINTENANCE.
+      if (node.isEnteringMaintenance()) {
+        for (DatanodeStorageInfo storage : node.getStorageInfos()) {
+          LOG.info("Starting maintenance of {} {} with {} blocks",
+              node, storage, storage.numBlocks());
+        }
+        node.getLeavingServiceStatus().setStartTime(monotonicNow());
+      }
+      // Track the node regardless whether it is ENTERING_MAINTENANCE or
+      // IN_MAINTENANCE to support maintenance expiration.
       pendingNodes.add(node);
     } else {
       LOG.trace("startMaintenance: Node {} in {}, nothing to do." +
@@ -264,8 +274,34 @@ public class DecommissionManager {
       // Update DN stats maintained by HeartbeatManager
       hbManager.stopMaintenance(node);
 
-      // TODO HDFS-9390 remove replicas from block maps
-      // or handle over replicated blocks.
+      // extra redundancy blocks will be detected and processed when
+      // the dead node comes back and send in its full block report.
+      if (!node.isAlive()) {
+        // The node became dead when it was in maintenance, at which point
+        // the replicas weren't removed from block maps.
+        // When the node leaves maintenance, the replicas should be removed
+        // from the block maps to trigger the necessary replication to
+        // maintain the safety property of "# of live replicas + maintenance
+        // replicas" >= the expected redundancy.
+        blockManager.removeBlocksAssociatedTo(node);
+      } else {
+        // Even though putting nodes in maintenance node doesn't cause live
+        // replicas to match expected replication factor, it is still possible
+        // to have over replicated when the node leaves maintenance node.
+        // First scenario:
+        // a. Node became dead when it is at AdminStates.NORMAL, thus
+        //    block is replicated so that 3 replicas exist on other nodes.
+        // b. Admins put the dead node into maintenance mode and then
+        //    have the node rejoin the cluster.
+        // c. Take the node out of maintenance mode.
+        // Second scenario:
+        // a. With replication factor 3, set one replica to maintenance node,
+        //    thus block has 1 maintenance replica and 2 live replicas.
+        // b. Change the replication factor to 2. The block will still have
+        //    1 maintenance replica and 2 live replicas.
+        // c. Take the node out of maintenance mode.
+        blockManager.processExtraRedundancyBlocksOnInService(node);
+      }
 
       // Remove from tracking in DecommissionManager
       pendingNodes.remove(node);
@@ -281,6 +317,11 @@ public class DecommissionManager {
     LOG.info("Decommissioning complete for node {}", dn);
   }
 
+  private void setInMaintenance(DatanodeDescriptor dn) {
+    dn.setInMaintenance();
+    LOG.info("Node {} has entered maintenance mode.", dn);
+  }
+
   /**
    * Checks whether a block is sufficiently replicated/stored for
    * decommissioning. For replicated blocks or striped blocks, full-strength
@@ -288,20 +329,21 @@ public class DecommissionManager {
    * @return true if sufficient, else false.
    */
   private boolean isSufficient(BlockInfo block, BlockCollection bc,
-      NumberReplicas numberReplicas) {
-    final int numExpected = blockManager.getExpectedRedundancyNum(block);
-    final int numLive = numberReplicas.liveReplicas();
-    if (numLive >= numExpected
-        && blockManager.isPlacementPolicySatisfied(block)) {
+      NumberReplicas numberReplicas, boolean isDecommission) {
+    if (blockManager.hasEnoughEffectiveReplicas(block, numberReplicas, 0)) {
       // Block has enough replica, skip
       LOG.trace("Block {} does not need replication.", block);
       return true;
     }
 
+    final int numExpected = blockManager.getExpectedLiveRedundancyNum(block,
+        numberReplicas);
+    final int numLive = numberReplicas.liveReplicas();
+
     // Block is under-replicated
-    LOG.trace("Block {} numExpected={}, numLive={}", block, numExpected, 
+    LOG.trace("Block {} numExpected={}, numLive={}", block, numExpected,
         numLive);
-    if (numExpected > numLive) {
+    if (isDecommission && numExpected > numLive) {
       if (bc.isUnderConstruction() && block.equals(bc.getLastBlock())) {
         // Can decom a UC block as long as there will still be minReplicas
         if (blockManager.hasMinStorage(block, numLive)) {
@@ -346,11 +388,16 @@ public class DecommissionManager {
         + ", corrupt replicas: " + num.corruptReplicas()
         + ", decommissioned replicas: " + num.decommissioned()
         + ", decommissioning replicas: " + num.decommissioning()
+        + ", maintenance replicas: " + num.maintenanceReplicas()
+        + ", live entering maintenance replicas: "
+        + num.liveEnteringMaintenanceReplicas()
         + ", excess replicas: " + num.excessReplicas()
         + ", Is Open File: " + bc.isUnderConstruction()
         + ", Datanodes having this block: " + nodeList + ", Current Datanode: "
         + srcNode + ", Is current datanode decommissioning: "
-        + srcNode.isDecommissionInProgress());
+        + srcNode.isDecommissionInProgress() +
+        ", Is current datanode entering maintenance: "
+        + srcNode.isEnteringMaintenance());
   }
 
   @VisibleForTesting
@@ -424,7 +471,7 @@ public class DecommissionManager {
       numBlocksChecked = 0;
       numBlocksCheckedPerLock = 0;
       numNodesChecked = 0;
-      // Check decom progress
+      // Check decommission or maintenance progress.
       namesystem.writeLock();
       try {
         processPendingNodes();
@@ -464,15 +511,14 @@ public class DecommissionManager {
         final DatanodeDescriptor dn = entry.getKey();
         AbstractList<BlockInfo> blocks = entry.getValue();
         boolean fullScan = false;
-        if (dn.isMaintenance()) {
-          // TODO HDFS-9390 make sure blocks are minimally replicated
-          // before transitioning the node to IN_MAINTENANCE state.
-
+        if (dn.isMaintenance() && dn.maintenanceExpired()) {
           // If maintenance expires, stop tracking it.
-          if (dn.maintenanceExpired()) {
-            stopMaintenance(dn);
-            toRemove.add(dn);
-          }
+          stopMaintenance(dn);
+          toRemove.add(dn);
+          continue;
+        }
+        if (dn.isInMaintenance()) {
+          // The dn is IN_MAINTENANCE and the maintenance hasn't expired yet.
           continue;
         }
         if (blocks == null) {
@@ -487,7 +533,7 @@ public class DecommissionManager {
         } else {
           // This is a known datanode, check if its # of insufficiently 
           // replicated blocks has dropped to zero and if it can be decommed
-          LOG.debug("Processing decommission-in-progress node {}", dn);
+          LOG.debug("Processing {} node {}", dn.getAdminState(), dn);
           pruneReliableBlocks(dn, blocks);
         }
         if (blocks.size() == 0) {
@@ -506,22 +552,31 @@ public class DecommissionManager {
           // If the full scan is clean AND the node liveness is okay, 
           // we can finally mark as decommissioned.
           final boolean isHealthy =
-              blockManager.isNodeHealthyForDecommission(dn);
+              blockManager.isNodeHealthyForDecommissionOrMaintenance(dn);
           if (blocks.size() == 0 && isHealthy) {
-            setDecommissioned(dn);
-            toRemove.add(dn);
+            if (dn.isDecommissionInProgress()) {
+              setDecommissioned(dn);
+              toRemove.add(dn);
+            } else if (dn.isEnteringMaintenance()) {
+              // IN_MAINTENANCE node remains in the outOfServiceNodeBlocks to
+              // to track maintenance expiration.
+              setInMaintenance(dn);
+            } else {
+              Preconditions.checkState(false,
+                  "A node is in an invalid state!");
+            }
             LOG.debug("Node {} is sufficiently replicated and healthy, "
-                + "marked as decommissioned.", dn);
+                + "marked as {}.", dn.getAdminState());
           } else {
             LOG.debug("Node {} {} healthy."
                 + " It needs to replicate {} more blocks."
-                + " Decommissioning is still in progress.",
-                dn, isHealthy? "is": "isn't", blocks.size());
+                + " {} is still in progress.", dn,
+                isHealthy? "is": "isn't", blocks.size(), dn.getAdminState());
           }
         } else {
           LOG.debug("Node {} still has {} blocks to replicate "
-                  + "before it is a candidate to finish decommissioning.",
-              dn, blocks.size());
+              + "before it is a candidate to finish {}.",
+              dn, blocks.size(), dn.getAdminState());
         }
         iterkey = dn;
       }
@@ -539,7 +594,7 @@ public class DecommissionManager {
      */
     private void pruneReliableBlocks(final DatanodeDescriptor datanode,
         AbstractList<BlockInfo> blocks) {
-      processBlocksForDecomInternal(datanode, blocks.iterator(), null, true);
+      processBlocksInternal(datanode, blocks.iterator(), null, true);
     }
 
     /**
@@ -554,7 +609,7 @@ public class DecommissionManager {
     private AbstractList<BlockInfo> handleInsufficientlyStored(
         final DatanodeDescriptor datanode) {
       AbstractList<BlockInfo> insufficient = new ChunkedArrayList<>();
-      processBlocksForDecomInternal(datanode, datanode.getBlockIterator(),
+      processBlocksInternal(datanode, datanode.getBlockIterator(),
           insufficient, false);
       return insufficient;
     }
@@ -573,14 +628,14 @@ public class DecommissionManager {
      * @param pruneReliableBlocks         whether to remove blocks reliable
      *                                    enough from the iterator
      */
-    private void processBlocksForDecomInternal(
+    private void processBlocksInternal(
         final DatanodeDescriptor datanode,
         final Iterator<BlockInfo> it,
         final List<BlockInfo> insufficientList,
         boolean pruneReliableBlocks) {
       boolean firstReplicationLog = true;
       int lowRedundancyBlocks = 0;
-      int decommissionOnlyReplicas = 0;
+      int outOfServiceOnlyReplicas = 0;
       int lowRedundancyInOpenFiles = 0;
       while (it.hasNext()) {
         if (insufficientList == null
@@ -626,21 +681,25 @@ public class DecommissionManager {
 
         // Schedule low redundancy blocks for reconstruction if not already
         // pending
-        if (blockManager.isNeededReconstruction(block, liveReplicas)) {
+        boolean isDecommission = datanode.isDecommissionInProgress();
+        boolean neededReconstruction = isDecommission ?
+            blockManager.isNeededReconstruction(block, num) :
+            blockManager.isNeededReconstructionForMaintenance(block, num);
+        if (neededReconstruction) {
           if (!blockManager.neededReconstruction.contains(block) &&
               blockManager.pendingReconstruction.getNumReplicas(block) == 0 &&
               blockManager.isPopulatingReplQueues()) {
             // Process these blocks only when active NN is out of safe mode.
             blockManager.neededReconstruction.add(block,
                 liveReplicas, num.readOnlyReplicas(),
-                num.decommissionedAndDecommissioning(),
+                num.outOfServiceReplicas(),
                 blockManager.getExpectedRedundancyNum(block));
           }
         }
 
         // Even if the block is without sufficient redundancy,
         // it doesn't block decommission if has sufficient redundancy
-        if (isSufficient(block, bc, num)) {
+        if (isSufficient(block, bc, num, isDecommission)) {
           if (pruneReliableBlocks) {
             it.remove();
           }
@@ -662,14 +721,13 @@ public class DecommissionManager {
         if (bc.isUnderConstruction()) {
           lowRedundancyInOpenFiles++;
         }
-        if ((liveReplicas == 0) && (num.decommissionedAndDecommissioning() > 0)) {
-          decommissionOnlyReplicas++;
+        if ((liveReplicas == 0) && (num.outOfServiceReplicas() > 0)) {
+          outOfServiceOnlyReplicas++;
         }
       }
 
-      datanode.decommissioningStatus.set(lowRedundancyBlocks,
-          decommissionOnlyReplicas,
-          lowRedundancyInOpenFiles);
+      datanode.getLeavingServiceStatus().set(lowRedundancyBlocks,
+          outOfServiceOnlyReplicas, lowRedundancyInOpenFiles);
     }
   }
 

+ 10 - 6
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ErasureCodingWork.java

@@ -130,12 +130,14 @@ class ErasureCodingWork extends BlockReconstructionWork {
       // we only need to replicate one internal block to a new rack
       int sourceIndex = chooseSource4SimpleReplication();
       createReplicationWork(sourceIndex, targets[0]);
-    } else if (numberReplicas.decommissioning() > 0 && hasAllInternalBlocks()) {
-      List<Integer> decommissioningSources = findDecommissioningSources();
+    } else if ((numberReplicas.decommissioning() > 0 ||
+        numberReplicas.liveEnteringMaintenanceReplicas() > 0) &&
+        hasAllInternalBlocks()) {
+      List<Integer> leavingServiceSources = findLeavingServiceSources();
       // decommissioningSources.size() should be >= targets.length
-      final int num = Math.min(decommissioningSources.size(), targets.length);
+      final int num = Math.min(leavingServiceSources.size(), targets.length);
       for (int i = 0; i < num; i++) {
-        createReplicationWork(decommissioningSources.get(i), targets[i]);
+        createReplicationWork(leavingServiceSources.get(i), targets[i]);
       }
     } else {
       targets[0].getDatanodeDescriptor().addBlockToBeErasureCoded(
@@ -162,10 +164,12 @@ class ErasureCodingWork extends BlockReconstructionWork {
     }
   }
 
-  private List<Integer> findDecommissioningSources() {
+  private List<Integer> findLeavingServiceSources() {
     List<Integer> srcIndices = new ArrayList<>();
     for (int i = 0; i < getSrcNodes().length; i++) {
-      if (getSrcNodes()[i].isDecommissionInProgress()) {
+      if (getSrcNodes()[i].isDecommissionInProgress() ||
+          (getSrcNodes()[i].isEnteringMaintenance() &&
+          getSrcNodes()[i].isAlive())) {
         srcIndices.add(i);
       }
     }

+ 13 - 10
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java

@@ -25,10 +25,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
-import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
 import org.apache.hadoop.util.Daemon;
@@ -269,13 +266,19 @@ class HeartbeatManager implements DatanodeStatistics {
     if (!node.isAlive()) {
       LOG.info("Dead node {} is put in maintenance state immediately.", node);
       node.setInMaintenance();
-    } else if (node.isDecommissioned()) {
-      LOG.info("Decommissioned node " + node + " is put in maintenance state"
-          + " immediately.");
-      node.setInMaintenance();
     } else {
       stats.subtract(node);
-      node.startMaintenance();
+      if (node.isDecommissioned()) {
+        LOG.info("Decommissioned node " + node + " is put in maintenance state"
+            + " immediately.");
+        node.setInMaintenance();
+      } else if (blockManager.getMinReplicationToBeInMaintenance() == 0) {
+        LOG.info("MinReplicationToBeInMaintenance is set to zero. " + node +
+            " is put in maintenance state" + " immediately.");
+        node.setInMaintenance();
+      } else {
+        node.startMaintenance();
+      }
       stats.add(node);
     }
   }
@@ -352,7 +355,7 @@ class HeartbeatManager implements DatanodeStatistics {
     boolean allAlive = false;
     while (!allAlive) {
       // locate the first dead node.
-      DatanodeID dead = null;
+      DatanodeDescriptor dead = null;
 
       // locate the first failed storage that isn't on a dead node.
       DatanodeStorageInfo failedStorage = null;
@@ -401,7 +404,7 @@ class HeartbeatManager implements DatanodeStatistics {
         // acquire the fsnamesystem lock, and then remove the dead node.
         namesystem.writeLock();
         try {
-          dm.removeDeadDatanode(dead);
+          dm.removeDeadDatanode(dead, !dead.isMaintenance());
         } finally {
           namesystem.writeUnlock();
         }

+ 21 - 26
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/LowRedundancyBlocks.java

@@ -155,7 +155,7 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
   private int getPriority(BlockInfo block,
                           int curReplicas,
                           int readOnlyReplicas,
-                          int decommissionedReplicas,
+                          int outOfServiceReplicas,
                           int expectedReplicas) {
     assert curReplicas >= 0 : "Negative replicas!";
     if (curReplicas >= expectedReplicas) {
@@ -164,20 +164,20 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
     }
     if (block.isStriped()) {
       BlockInfoStriped sblk = (BlockInfoStriped) block;
-      return getPriorityStriped(curReplicas, decommissionedReplicas,
+      return getPriorityStriped(curReplicas, outOfServiceReplicas,
           sblk.getRealDataBlockNum(), sblk.getParityBlockNum());
     } else {
       return getPriorityContiguous(curReplicas, readOnlyReplicas,
-          decommissionedReplicas, expectedReplicas);
+          outOfServiceReplicas, expectedReplicas);
     }
   }
 
   private int getPriorityContiguous(int curReplicas, int readOnlyReplicas,
-      int decommissionedReplicas, int expectedReplicas) {
+      int outOfServiceReplicas, int expectedReplicas) {
     if (curReplicas == 0) {
       // If there are zero non-decommissioned replicas but there are
-      // some decommissioned replicas, then assign them highest priority
-      if (decommissionedReplicas > 0) {
+      // some out of service replicas, then assign them highest priority
+      if (outOfServiceReplicas > 0) {
         return QUEUE_HIGHEST_PRIORITY;
       }
       if (readOnlyReplicas > 0) {
@@ -201,11 +201,11 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
     }
   }
 
-  private int getPriorityStriped(int curReplicas, int decommissionedReplicas,
+  private int getPriorityStriped(int curReplicas, int outOfServiceReplicas,
       short dataBlkNum, short parityBlkNum) {
     if (curReplicas < dataBlkNum) {
       // There are some replicas on decommissioned nodes so it's not corrupted
-      if (curReplicas + decommissionedReplicas >= dataBlkNum) {
+      if (curReplicas + outOfServiceReplicas >= dataBlkNum) {
         return QUEUE_HIGHEST_PRIORITY;
       }
       return QUEUE_WITH_CORRUPT_BLOCKS;
@@ -227,18 +227,15 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
    *
    * @param block a low redundancy block
    * @param curReplicas current number of replicas of the block
-   * @param decomissionedReplicas the number of decommissioned replicas
+   * @param outOfServiceReplicas the number of out-of-service replicas
    * @param expectedReplicas expected number of replicas of the block
    * @return true if the block was added to a queue.
    */
   synchronized boolean add(BlockInfo block,
-                           int curReplicas,
-                           int readOnlyReplicas,
-                           int decomissionedReplicas,
-                           int expectedReplicas) {
-    assert curReplicas >= 0 : "Negative replicas!";
+      int curReplicas, int readOnlyReplicas,
+      int outOfServiceReplicas, int expectedReplicas) {
     final int priLevel = getPriority(block, curReplicas, readOnlyReplicas,
-        decomissionedReplicas, expectedReplicas);
+        outOfServiceReplicas, expectedReplicas);
     if(priorityQueues.get(priLevel).add(block)) {
       if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
           expectedReplicas == 1) {
@@ -257,12 +254,10 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
 
   /** Remove a block from a low redundancy queue. */
   synchronized boolean remove(BlockInfo block,
-                              int oldReplicas,
-                              int oldReadOnlyReplicas,
-                              int decommissionedReplicas,
-                              int oldExpectedReplicas) {
+      int oldReplicas, int oldReadOnlyReplicas,
+      int outOfServiceReplicas, int oldExpectedReplicas) {
     final int priLevel = getPriority(block, oldReplicas, oldReadOnlyReplicas,
-        decommissionedReplicas, oldExpectedReplicas);
+        outOfServiceReplicas, oldExpectedReplicas);
     boolean removedBlock = remove(block, priLevel);
     if (priLevel == QUEUE_WITH_CORRUPT_BLOCKS &&
         oldExpectedReplicas == 1 &&
@@ -325,22 +320,22 @@ class LowRedundancyBlocks implements Iterable<BlockInfo> {
    * method call.
    * @param block a low redundancy block
    * @param curReplicas current number of replicas of the block
-   * @param decommissionedReplicas  the number of decommissioned replicas
+   * @param outOfServiceReplicas  the number of out-of-service replicas
    * @param curExpectedReplicas expected number of replicas of the block
    * @param curReplicasDelta the change in the replicate count from before
    * @param expectedReplicasDelta the change in the expected replica count
    *        from before
    */
   synchronized void update(BlockInfo block, int curReplicas,
-                           int readOnlyReplicas, int decommissionedReplicas,
-                           int curExpectedReplicas,
-                           int curReplicasDelta, int expectedReplicasDelta) {
+      int readOnlyReplicas, int outOfServiceReplicas,
+      int curExpectedReplicas,
+      int curReplicasDelta, int expectedReplicasDelta) {
     int oldReplicas = curReplicas-curReplicasDelta;
     int oldExpectedReplicas = curExpectedReplicas-expectedReplicasDelta;
     int curPri = getPriority(block, curReplicas, readOnlyReplicas,
-        decommissionedReplicas, curExpectedReplicas);
+        outOfServiceReplicas, curExpectedReplicas);
     int oldPri = getPriority(block, oldReplicas, readOnlyReplicas,
-        decommissionedReplicas, oldExpectedReplicas);
+        outOfServiceReplicas, oldExpectedReplicas);
     if(NameNode.stateChangeLog.isDebugEnabled()) {
       NameNode.stateChangeLog.debug("LowRedundancyBlocks.update " +
         block +

+ 28 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/NumberReplicas.java

@@ -24,9 +24,11 @@ import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.Store
 import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.DECOMMISSIONING;
 import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.EXCESS;
 import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.LIVE;
+import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.MAINTENANCE_FOR_READ;
+import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.MAINTENANCE_NOT_FOR_READ;
+import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.READONLY;
 import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.REDUNDANT;
 import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.STALESTORAGE;
-import static org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas.StoredReplicaState.READONLY;
 
 /**
  * A immutable object that stores the number of live replicas and
@@ -41,6 +43,14 @@ public class NumberReplicas extends EnumCounters<NumberReplicas.StoredReplicaSta
     READONLY,
     DECOMMISSIONING,
     DECOMMISSIONED,
+    // We need live ENTERING_MAINTENANCE nodes to continue
+    // to serve read request while it is being transitioned to live
+    // IN_MAINTENANCE if these are the only replicas left.
+    // MAINTENANCE_NOT_FOR_READ == maintenanceReplicas -
+    // Live ENTERING_MAINTENANCE.
+    MAINTENANCE_NOT_FOR_READ,
+    // Live ENTERING_MAINTENANCE nodes to serve read requests.
+    MAINTENANCE_FOR_READ,
     CORRUPT,
     // excess replicas already tracked by blockmanager's excess map
     EXCESS,
@@ -106,4 +116,20 @@ public class NumberReplicas extends EnumCounters<NumberReplicas.StoredReplicaSta
   public int redundantInternalBlocks() {
     return (int) get(REDUNDANT);
   }
-} 
+
+  public int maintenanceNotForReadReplicas() {
+    return (int) get(MAINTENANCE_NOT_FOR_READ);
+  }
+
+  public int maintenanceReplicas() {
+    return (int) (get(MAINTENANCE_NOT_FOR_READ) + get(MAINTENANCE_FOR_READ));
+  }
+
+  public int outOfServiceReplicas() {
+    return maintenanceReplicas() + decommissionedAndDecommissioning();
+  }
+
+  public int liveEnteringMaintenanceReplicas() {
+    return (int)get(MAINTENANCE_FOR_READ);
+  }
+}

+ 4 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/StorageTypeStats.java

@@ -81,7 +81,7 @@ public class StorageTypeStats {
       final DatanodeDescriptor node) {
     capacityUsed += info.getDfsUsed();
     blockPoolUsed += info.getBlockPoolUsed();
-    if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+    if (node.isInService()) {
       capacityTotal += info.getCapacity();
       capacityRemaining += info.getRemaining();
     } else {
@@ -90,7 +90,7 @@ public class StorageTypeStats {
   }
 
   void addNode(final DatanodeDescriptor node) {
-    if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+    if (node.isInService()) {
       nodesInService++;
     }
   }
@@ -99,7 +99,7 @@ public class StorageTypeStats {
       final DatanodeDescriptor node) {
     capacityUsed -= info.getDfsUsed();
     blockPoolUsed -= info.getBlockPoolUsed();
-    if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+    if (node.isInService()) {
       capacityTotal -= info.getCapacity();
       capacityRemaining -= info.getRemaining();
     } else {
@@ -108,7 +108,7 @@ public class StorageTypeStats {
   }
 
   void subtractNode(final DatanodeDescriptor node) {
-    if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+    if (node.isInService()) {
       nodesInService--;
     }
   }

+ 5 - 4
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java

@@ -5462,11 +5462,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           .<String, Object> builder()
           .put("xferaddr", node.getXferAddr())
           .put("underReplicatedBlocks",
-              node.decommissioningStatus.getUnderReplicatedBlocks())
+          node.getLeavingServiceStatus().getUnderReplicatedBlocks())
+           // TODO use another property name for outOfServiceOnlyReplicas.
           .put("decommissionOnlyReplicas",
-              node.decommissioningStatus.getDecommissionOnlyReplicas())
+          node.getLeavingServiceStatus().getOutOfServiceOnlyReplicas())
           .put("underReplicateInOpenFiles",
-              node.decommissioningStatus.getUnderReplicatedInOpenFiles())
+          node.getLeavingServiceStatus().getUnderReplicatedInOpenFiles())
           .build();
       info.put(node.getHostName() + ":" + node.getXferPort(), innerinfo);
     }
@@ -5528,7 +5529,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     blockManager.getDatanodeManager().fetchDatanodes(live, null, true);
     for (Iterator<DatanodeDescriptor> it = live.iterator(); it.hasNext();) {
       DatanodeDescriptor node = it.next();
-      if (node.isDecommissionInProgress() || node.isDecommissioned()) {
+      if (!node.isInService()) {
         it.remove();
       }
     }

+ 7 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

@@ -547,6 +547,13 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.namenode.maintenance.replication.min</name>
+  <value>1</value>
+  <description>Minimal live block replication in existence of maintenance mode.
+  </description>
+</property>
+
 <property>
   <name>dfs.namenode.safemode.replication.min</name>
   <value></value>

+ 14 - 6
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/AdminStatesBaseTest.java

@@ -102,6 +102,7 @@ public class AdminStatesBaseTest {
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 1);
 
     hostsFileWriter.initialize(conf, "temp/admin");
+
   }
 
   @After
@@ -110,17 +111,22 @@ public class AdminStatesBaseTest {
     shutdownCluster();
   }
 
-  protected void writeFile(FileSystem fileSys, Path name, int repl)
+  static public FSDataOutputStream writeIncompleteFile(FileSystem fileSys,
+      Path name, short repl, short numOfBlocks) throws IOException {
+    return writeFile(fileSys, name, repl, numOfBlocks, false);
+  }
+
+  static protected void writeFile(FileSystem fileSys, Path name, int repl)
       throws IOException {
     writeFile(fileSys, name, repl, 2);
   }
 
-  protected void writeFile(FileSystem fileSys, Path name, int repl,
+  static protected void writeFile(FileSystem fileSys, Path name, int repl,
       int numOfBlocks) throws IOException {
     writeFile(fileSys, name, repl, numOfBlocks, true);
   }
 
-  protected FSDataOutputStream writeFile(FileSystem fileSys, Path name,
+  static protected FSDataOutputStream writeFile(FileSystem fileSys, Path name,
       int repl, int numOfBlocks, boolean completeFile)
     throws IOException {
     // create and write a file that contains two blocks of data
@@ -136,6 +142,7 @@ public class AdminStatesBaseTest {
       stm.close();
       return null;
     } else {
+      stm.flush();
       // Do not close stream, return it
       // so that it is not garbage collected
       return stm;
@@ -353,7 +360,7 @@ public class AdminStatesBaseTest {
 
   protected void shutdownCluster() {
     if (cluster != null) {
-      cluster.shutdown();
+      cluster.shutdown(true);
     }
   }
 
@@ -362,12 +369,13 @@ public class AdminStatesBaseTest {
         refreshNodes(conf);
   }
 
-  protected DatanodeDescriptor getDatanodeDesriptor(
+  static private DatanodeDescriptor getDatanodeDesriptor(
       final FSNamesystem ns, final String datanodeUuid) {
     return ns.getBlockManager().getDatanodeManager().getDatanode(datanodeUuid);
   }
 
-  protected void cleanupFile(FileSystem fileSys, Path name) throws IOException {
+  static public void cleanupFile(FileSystem fileSys, Path name)
+      throws IOException {
     assertTrue(fileSys.exists(name));
     fileSys.delete(name, true);
     assertTrue(!fileSys.exists(name));

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDecommission.java

@@ -484,7 +484,7 @@ public class TestDecommission extends AdminStatesBaseTest {
       shutdownCluster();
     }
   }
-  
+
   /**
    * Tests cluster storage statistics during decommissioning for non
    * federated cluster

+ 692 - 83
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestMaintenanceState.java

@@ -18,13 +18,19 @@
 package org.apache.hadoop.hdfs;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
@@ -32,6 +38,8 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.util.Time;
 import org.junit.Test;
@@ -40,13 +48,23 @@ import org.junit.Test;
  * This class tests node maintenance.
  */
 public class TestMaintenanceState extends AdminStatesBaseTest {
-  public static final Log LOG = LogFactory.getLog(TestMaintenanceState.class);
-  static private final long EXPIRATION_IN_MS = 500;
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestMaintenanceState.class);
+  static private final long EXPIRATION_IN_MS = 50;
+  private int minMaintenanceR =
+      DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_DEFAULT;
 
   public TestMaintenanceState() {
     setUseCombinedHostFileManager();
   }
 
+  void setMinMaintenanceR(int minMaintenanceR) {
+    this.minMaintenanceR = minMaintenanceR;
+    getConf().setInt(
+        DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY,
+        minMaintenanceR);
+  }
+
   /**
    * Verify a node can transition from AdminStates.ENTERING_MAINTENANCE to
    * AdminStates.NORMAL.
@@ -55,21 +73,25 @@ public class TestMaintenanceState extends AdminStatesBaseTest {
   public void testTakeNodeOutOfEnteringMaintenance() throws Exception {
     LOG.info("Starting testTakeNodeOutOfEnteringMaintenance");
     final int replicas = 1;
-    final int numNamenodes = 1;
-    final int numDatanodes = 1;
-    final Path file1 = new Path("/testTakeNodeOutOfEnteringMaintenance.dat");
+    final Path file = new Path("/testTakeNodeOutOfEnteringMaintenance.dat");
 
-    startCluster(numNamenodes, numDatanodes);
+    startCluster(1, 1);
 
-    FileSystem fileSys = getCluster().getFileSystem(0);
-    writeFile(fileSys, file1, replicas, 1);
+    final FileSystem fileSys = getCluster().getFileSystem(0);
+    final FSNamesystem ns = getCluster().getNamesystem(0);
+    writeFile(fileSys, file, replicas, 1);
 
-    DatanodeInfo nodeOutofService = takeNodeOutofService(0,
+    final DatanodeInfo nodeOutofService = takeNodeOutofService(0,
         null, Long.MAX_VALUE, null, AdminStates.ENTERING_MAINTENANCE);
 
+    // When node is in ENTERING_MAINTENANCE state, it can still serve read
+    // requests
+    assertNull(checkWithRetry(ns, fileSys, file, replicas, null,
+        nodeOutofService));
+
     putNodeInService(0, nodeOutofService.getDatanodeUuid());
 
-    cleanupFile(fileSys, file1);
+    cleanupFile(fileSys, file);
   }
 
   /**
@@ -80,23 +102,21 @@ public class TestMaintenanceState extends AdminStatesBaseTest {
   public void testEnteringMaintenanceExpiration() throws Exception {
     LOG.info("Starting testEnteringMaintenanceExpiration");
     final int replicas = 1;
-    final int numNamenodes = 1;
-    final int numDatanodes = 1;
-    final Path file1 = new Path("/testTakeNodeOutOfEnteringMaintenance.dat");
+    final Path file = new Path("/testEnteringMaintenanceExpiration.dat");
 
-    startCluster(numNamenodes, numDatanodes);
+    startCluster(1, 1);
 
-    FileSystem fileSys = getCluster().getFileSystem(0);
-    writeFile(fileSys, file1, replicas, 1);
+    final FileSystem fileSys = getCluster().getFileSystem(0);
+    writeFile(fileSys, file, replicas, 1);
 
-    // expires in 500 milliseconds
-    DatanodeInfo nodeOutofService = takeNodeOutofService(0, null,
-        Time.monotonicNow() + EXPIRATION_IN_MS, null,
-        AdminStates.ENTERING_MAINTENANCE);
+    final DatanodeInfo nodeOutofService = takeNodeOutofService(0, null,
+        Long.MAX_VALUE, null, AdminStates.ENTERING_MAINTENANCE);
 
-    waitNodeState(nodeOutofService, AdminStates.NORMAL);
+    // Adjust the expiration.
+    takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(),
+        Time.monotonicNow() + EXPIRATION_IN_MS, null, AdminStates.NORMAL);
 
-    cleanupFile(fileSys, file1);
+    cleanupFile(fileSys, file);
   }
 
   /**
@@ -106,20 +126,18 @@ public class TestMaintenanceState extends AdminStatesBaseTest {
   public void testInvalidExpiration() throws Exception {
     LOG.info("Starting testInvalidExpiration");
     final int replicas = 1;
-    final int numNamenodes = 1;
-    final int numDatanodes = 1;
-    final Path file1 = new Path("/testTakeNodeOutOfEnteringMaintenance.dat");
+    final Path file = new Path("/testInvalidExpiration.dat");
 
-    startCluster(numNamenodes, numDatanodes);
+    startCluster(1, 1);
 
-    FileSystem fileSys = getCluster().getFileSystem(0);
-    writeFile(fileSys, file1, replicas, 1);
+    final FileSystem fileSys = getCluster().getFileSystem(0);
+    writeFile(fileSys, file, replicas, 1);
 
     // expiration has to be greater than Time.monotonicNow().
     takeNodeOutofService(0, null, Time.monotonicNow(), null,
         AdminStates.NORMAL);
 
-    cleanupFile(fileSys, file1);
+    cleanupFile(fileSys, file);
   }
 
   /**
@@ -129,18 +147,17 @@ public class TestMaintenanceState extends AdminStatesBaseTest {
   @Test(timeout = 360000)
   public void testPutDeadNodeToMaintenance() throws Exception {
     LOG.info("Starting testPutDeadNodeToMaintenance");
-    final int numNamenodes = 1;
-    final int numDatanodes = 1;
     final int replicas = 1;
-    final Path file1 = new Path("/testPutDeadNodeToMaintenance.dat");
+    final Path file = new Path("/testPutDeadNodeToMaintenance.dat");
 
-    startCluster(numNamenodes, numDatanodes);
+    startCluster(1, 1);
 
-    FileSystem fileSys = getCluster().getFileSystem(0);
-    FSNamesystem ns = getCluster().getNamesystem(0);
-    writeFile(fileSys, file1, replicas, 1);
+    final FileSystem fileSys = getCluster().getFileSystem(0);
+    final FSNamesystem ns = getCluster().getNamesystem(0);
+    writeFile(fileSys, file, replicas, 1);
 
-    MiniDFSCluster.DataNodeProperties dnProp = getCluster().stopDataNode(0);
+    final MiniDFSCluster.DataNodeProperties dnProp =
+        getCluster().stopDataNode(0);
     DFSTestUtil.waitForDatanodeState(
         getCluster(), dnProp.datanode.getDatanodeUuid(), false, 20000);
 
@@ -153,7 +170,7 @@ public class TestMaintenanceState extends AdminStatesBaseTest {
     assertEquals(deadInMaintenance + 1, ns.getNumInMaintenanceDeadDataNodes());
     assertEquals(liveInMaintenance, ns.getNumInMaintenanceLiveDataNodes());
 
-    cleanupFile(fileSys, file1);
+    cleanupFile(fileSys, file);
   }
 
   /**
@@ -164,16 +181,14 @@ public class TestMaintenanceState extends AdminStatesBaseTest {
   @Test(timeout = 360000)
   public void testPutDeadNodeToMaintenanceWithExpiration() throws Exception {
     LOG.info("Starting testPutDeadNodeToMaintenanceWithExpiration");
-    final int numNamenodes = 1;
-    final int numDatanodes = 1;
-    final int replicas = 1;
-    final Path file1 = new Path("/testPutDeadNodeToMaintenance.dat");
+    final Path file =
+        new Path("/testPutDeadNodeToMaintenanceWithExpiration.dat");
 
-    startCluster(numNamenodes, numDatanodes);
+    startCluster(1, 1);
 
     FileSystem fileSys = getCluster().getFileSystem(0);
     FSNamesystem ns = getCluster().getNamesystem(0);
-    writeFile(fileSys, file1, replicas, 1);
+    writeFile(fileSys, file, 1, 1);
 
     MiniDFSCluster.DataNodeProperties dnProp = getCluster().stopDataNode(0);
     DFSTestUtil.waitForDatanodeState(
@@ -184,16 +199,17 @@ public class TestMaintenanceState extends AdminStatesBaseTest {
 
     DatanodeInfo nodeOutofService = takeNodeOutofService(0,
         dnProp.datanode.getDatanodeUuid(),
-        Time.monotonicNow() + EXPIRATION_IN_MS, null,
-        AdminStates.IN_MAINTENANCE);
+        Long.MAX_VALUE, null, AdminStates.IN_MAINTENANCE);
 
-    waitNodeState(nodeOutofService, AdminStates.NORMAL);
+    // Adjust the expiration.
+    takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(),
+        Time.monotonicNow() + EXPIRATION_IN_MS, null, AdminStates.NORMAL);
 
     // no change
     assertEquals(deadInMaintenance, ns.getNumInMaintenanceDeadDataNodes());
     assertEquals(liveInMaintenance, ns.getNumInMaintenanceLiveDataNodes());
 
-    cleanupFile(fileSys, file1);
+    cleanupFile(fileSys, file);
   }
 
   /**
@@ -202,15 +218,12 @@ public class TestMaintenanceState extends AdminStatesBaseTest {
   @Test(timeout = 360000)
   public void testTransitionFromDecommissioned() throws IOException {
     LOG.info("Starting testTransitionFromDecommissioned");
-    final int numNamenodes = 1;
-    final int numDatanodes = 4;
-    final int replicas = 3;
-    final Path file1 = new Path("/testTransitionFromDecommissioned.dat");
+    final Path file = new Path("/testTransitionFromDecommissioned.dat");
 
-    startCluster(numNamenodes, numDatanodes);
+    startCluster(1, 4);
 
-    FileSystem fileSys = getCluster().getFileSystem(0);
-    writeFile(fileSys, file1, replicas, 1);
+    final FileSystem fileSys = getCluster().getFileSystem(0);
+    writeFile(fileSys, file, 3, 1);
 
     DatanodeInfo nodeOutofService = takeNodeOutofService(0, null, 0, null,
         AdminStates.DECOMMISSIONED);
@@ -218,7 +231,7 @@ public class TestMaintenanceState extends AdminStatesBaseTest {
     takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(), Long.MAX_VALUE,
         null, AdminStates.IN_MAINTENANCE);
 
-    cleanupFile(fileSys, file1);
+    cleanupFile(fileSys, file);
   }
 
   /**
@@ -228,34 +241,33 @@ public class TestMaintenanceState extends AdminStatesBaseTest {
   @Test(timeout = 360000)
   public void testTransitionFromDecommissionedAndExpired() throws IOException {
     LOG.info("Starting testTransitionFromDecommissionedAndExpired");
-    final int numNamenodes = 1;
-    final int numDatanodes = 4;
-    final int replicas = 3;
-    final Path file1 = new Path("/testTransitionFromDecommissioned.dat");
+    final Path file =
+        new Path("/testTransitionFromDecommissionedAndExpired.dat");
 
-    startCluster(numNamenodes, numDatanodes);
+    startCluster(1, 4);
 
-    FileSystem fileSys = getCluster().getFileSystem(0);
-    writeFile(fileSys, file1, replicas, 1);
+    final FileSystem fileSys = getCluster().getFileSystem(0);
+    writeFile(fileSys, file, 3, 1);
 
-    DatanodeInfo nodeOutofService = takeNodeOutofService(0, null, 0, null,
-        AdminStates.DECOMMISSIONED);
+    final DatanodeInfo nodeOutofService = takeNodeOutofService(0, null, 0,
+        null, AdminStates.DECOMMISSIONED);
 
     takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(),
-        Time.monotonicNow() + EXPIRATION_IN_MS, null,
-        AdminStates.IN_MAINTENANCE);
+        Long.MAX_VALUE, null, AdminStates.IN_MAINTENANCE);
 
-    waitNodeState(nodeOutofService, AdminStates.NORMAL);
+    // Adjust the expiration.
+    takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(),
+        Time.monotonicNow() + EXPIRATION_IN_MS, null, AdminStates.NORMAL);
 
-    cleanupFile(fileSys, file1);
+    cleanupFile(fileSys, file);
   }
 
   /**
    * When a node is put to maintenance, it first transitions to
    * AdminStates.ENTERING_MAINTENANCE. It makes sure all blocks have minimal
    * replication before it can be transitioned to AdminStates.IN_MAINTENANCE.
-   * If node becomes dead when it is in AdminStates.ENTERING_MAINTENANCE, admin
-   * state should stay in AdminStates.ENTERING_MAINTENANCE state.
+   * If node becomes dead when it is in AdminStates.ENTERING_MAINTENANCE, it
+   * should stay in AdminStates.ENTERING_MAINTENANCE state.
    */
   @Test(timeout = 360000)
   public void testNodeDeadWhenInEnteringMaintenance() throws Exception {
@@ -263,16 +275,16 @@ public class TestMaintenanceState extends AdminStatesBaseTest {
     final int numNamenodes = 1;
     final int numDatanodes = 1;
     final int replicas = 1;
-    final Path file1 = new Path("/testNodeDeadWhenInEnteringMaintenance.dat");
+    final Path file = new Path("/testNodeDeadWhenInEnteringMaintenance.dat");
 
     startCluster(numNamenodes, numDatanodes);
 
-    FileSystem fileSys = getCluster().getFileSystem(0);
-    FSNamesystem ns = getCluster().getNamesystem(0);
-    writeFile(fileSys, file1, replicas, 1);
+    final FileSystem fileSys = getCluster().getFileSystem(0);
+    final FSNamesystem ns = getCluster().getNamesystem(0);
+    writeFile(fileSys, file, replicas, 1);
 
     DatanodeInfo nodeOutofService = takeNodeOutofService(0,
-        getFirstBlockFirstReplicaUuid(fileSys, file1), Long.MAX_VALUE, null,
+        getFirstBlockFirstReplicaUuid(fileSys, file), Long.MAX_VALUE, null,
         AdminStates.ENTERING_MAINTENANCE);
     assertEquals(1, ns.getNumEnteringMaintenanceDataNodes());
 
@@ -281,30 +293,627 @@ public class TestMaintenanceState extends AdminStatesBaseTest {
     DFSTestUtil.waitForDatanodeState(
         getCluster(), nodeOutofService.getDatanodeUuid(), false, 20000);
     DFSClient client = getDfsClient(0);
-    assertEquals("maintenance node shouldn't be alive", numDatanodes - 1,
+    assertEquals("maintenance node shouldn't be live", numDatanodes - 1,
         client.datanodeReport(DatanodeReportType.LIVE).length);
+    assertEquals(1, ns.getNumEnteringMaintenanceDataNodes());
 
     getCluster().restartDataNode(dnProp, true);
     getCluster().waitActive();
     waitNodeState(nodeOutofService, AdminStates.ENTERING_MAINTENANCE);
     assertEquals(1, ns.getNumEnteringMaintenanceDataNodes());
+    assertEquals("maintenance node should be live", numDatanodes,
+        client.datanodeReport(DatanodeReportType.LIVE).length);
+
+    cleanupFile(fileSys, file);
+  }
+
+  /**
+   * When a node is put to maintenance, it first transitions to
+   * AdminStates.ENTERING_MAINTENANCE. It makes sure all blocks have
+   * been properly replicated before it can be transitioned to
+   * AdminStates.IN_MAINTENANCE. The expected replication count takes
+   * DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY and
+   * its file's replication factor into account.
+   */
+  @Test(timeout = 360000)
+  public void testExpectedReplications() throws IOException {
+    LOG.info("Starting testExpectedReplications");
+    testExpectedReplication(1);
+    testExpectedReplication(2);
+    testExpectedReplication(3);
+    testExpectedReplication(4);
+  }
+
+  private void testExpectedReplication(int replicationFactor)
+      throws IOException {
+    testExpectedReplication(replicationFactor,
+        Math.max(replicationFactor - 1, this.minMaintenanceR));
+  }
+
+  private void testExpectedReplication(int replicationFactor,
+      int expectedReplicasInRead) throws IOException {
+    startCluster(1, 5);
+
+    final Path file = new Path("/testExpectedReplication.dat");
+
+    final FileSystem fileSys = getCluster().getFileSystem(0);
+    final FSNamesystem ns = getCluster().getNamesystem(0);
+
+    writeFile(fileSys, file, replicationFactor, 1);
+
+    DatanodeInfo nodeOutofService = takeNodeOutofService(0,
+        getFirstBlockFirstReplicaUuid(fileSys, file), Long.MAX_VALUE,
+        null, AdminStates.IN_MAINTENANCE);
+
+    // The block should be replicated to another datanode to meet
+    // expected replication count.
+    assertNull(checkWithRetry(ns, fileSys, file, expectedReplicasInRead,
+        nodeOutofService));
+
+    cleanupFile(fileSys, file);
+  }
+
+  /**
+   * Verify a node can transition directly to AdminStates.IN_MAINTENANCE when
+   * DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY is set to zero.
+   */
+  @Test(timeout = 360000)
+  public void testZeroMinMaintenanceReplication() throws Exception {
+    LOG.info("Starting testZeroMinMaintenanceReplication");
+    setMinMaintenanceR(0);
+    startCluster(1, 1);
+
+    final Path file = new Path("/testZeroMinMaintenanceReplication.dat");
+    final int replicas = 1;
+
+    FileSystem fileSys = getCluster().getFileSystem(0);
+    writeFile(fileSys, file, replicas, 1);
+
+    takeNodeOutofService(0, null, Long.MAX_VALUE, null,
+        AdminStates.IN_MAINTENANCE);
+
+    cleanupFile(fileSys, file);
+  }
+
+  /**
+   * Verify a node can transition directly to AdminStates.IN_MAINTENANCE when
+   * DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY is set to zero. Then later
+   * transition to NORMAL after maintenance expiration.
+   */
+  @Test(timeout = 360000)
+  public void testZeroMinMaintenanceReplicationWithExpiration()
+      throws Exception {
+    LOG.info("Starting testZeroMinMaintenanceReplicationWithExpiration");
+    setMinMaintenanceR(0);
+    startCluster(1, 1);
+
+    final Path file =
+        new Path("/testZeroMinMaintenanceReplicationWithExpiration.dat");
+
+    FileSystem fileSys = getCluster().getFileSystem(0);
+    writeFile(fileSys, file, 1, 1);
+
+    DatanodeInfo nodeOutofService = takeNodeOutofService(0, null,
+        Long.MAX_VALUE, null, AdminStates.IN_MAINTENANCE);
+
+    // Adjust the expiration.
+    takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(),
+        Time.monotonicNow() + EXPIRATION_IN_MS, null, AdminStates.NORMAL);
+
+    cleanupFile(fileSys, file);
+  }
+
+  /**
+   * Transition from IN_MAINTENANCE to DECOMMISSIONED.
+   */
+  @Test(timeout = 360000)
+  public void testTransitionToDecommission() throws IOException {
+    LOG.info("Starting testTransitionToDecommission");
+    final int numNamenodes = 1;
+    final int numDatanodes = 4;
+    startCluster(numNamenodes, numDatanodes);
+
+    final Path file = new Path("testTransitionToDecommission.dat");
+    final int replicas = 3;
+
+    FileSystem fileSys = getCluster().getFileSystem(0);
+    FSNamesystem ns = getCluster().getNamesystem(0);
+
+    writeFile(fileSys, file, replicas, 1);
+
+    DatanodeInfo nodeOutofService = takeNodeOutofService(0,
+        getFirstBlockFirstReplicaUuid(fileSys, file), Long.MAX_VALUE, null,
+        AdminStates.IN_MAINTENANCE);
+
+    DFSClient client = getDfsClient(0);
+    assertEquals("All datanodes must be alive", numDatanodes,
+        client.datanodeReport(DatanodeReportType.LIVE).length);
+
+    // test 1, verify the replica in IN_MAINTENANCE state isn't in LocatedBlock
+    assertNull(checkWithRetry(ns, fileSys, file, replicas - 1,
+        nodeOutofService));
+
+    takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(), 0, null,
+        AdminStates.DECOMMISSIONED);
+
+    // test 2 after decommission has completed, the replication count is
+    // replicas + 1 which includes the decommissioned node.
+    assertNull(checkWithRetry(ns, fileSys, file, replicas + 1, null));
+
+    // test 3, put the node in service, replication count should restore.
+    putNodeInService(0, nodeOutofService.getDatanodeUuid());
+    assertNull(checkWithRetry(ns, fileSys, file, replicas, null));
+
+    cleanupFile(fileSys, file);
+  }
+
+  /**
+   * Transition from decommissioning state to maintenance state.
+   */
+  @Test(timeout = 360000)
+  public void testTransitionFromDecommissioning() throws IOException {
+    LOG.info("Starting testTransitionFromDecommissioning");
+    startCluster(1, 3);
+
+    final Path file = new Path("/testTransitionFromDecommissioning.dat");
+    final int replicas = 3;
+
+    final FileSystem fileSys = getCluster().getFileSystem(0);
+    final FSNamesystem ns = getCluster().getNamesystem(0);
+
+    writeFile(fileSys, file, replicas);
+
+    final DatanodeInfo nodeOutofService = takeNodeOutofService(0, null, 0,
+        null, AdminStates.DECOMMISSION_INPROGRESS);
+
+    takeNodeOutofService(0, nodeOutofService.getDatanodeUuid(), Long.MAX_VALUE,
+        null, AdminStates.IN_MAINTENANCE);
+
+    assertNull(checkWithRetry(ns, fileSys, file, replicas - 1,
+        nodeOutofService));
+
+    cleanupFile(fileSys, file);
+  }
+
+
+  /**
+   * First put a node in maintenance, then put a different node
+   * in decommission. Make sure decommission process take
+   * maintenance replica into account.
+   */
+  @Test(timeout = 360000)
+  public void testDecommissionDifferentNodeAfterMaintenances()
+      throws Exception {
+    testDecommissionDifferentNodeAfterMaintenance(2);
+    testDecommissionDifferentNodeAfterMaintenance(3);
+    testDecommissionDifferentNodeAfterMaintenance(4);
+  }
+
+  private void testDecommissionDifferentNodeAfterMaintenance(int repl)
+      throws Exception {
+    startCluster(1, 5);
+
+    final Path file =
+        new Path("/testDecommissionDifferentNodeAfterMaintenance.dat");
+
+    final FileSystem fileSys = getCluster().getFileSystem(0);
+    final FSNamesystem ns = getCluster().getNamesystem(0);
+
+    writeFile(fileSys, file, repl, 1);
+    final DatanodeInfo[] nodes = getFirstBlockReplicasDatanodeInfos(fileSys,
+        file);
+    String maintenanceDNUuid = nodes[0].getDatanodeUuid();
+    String decommissionDNUuid = nodes[1].getDatanodeUuid();
+    DatanodeInfo maintenanceDN = takeNodeOutofService(0, maintenanceDNUuid,
+        Long.MAX_VALUE, null, null, AdminStates.IN_MAINTENANCE);
 
-    cleanupFile(fileSys, file1);
+    Map<DatanodeInfo, Long> maintenanceNodes = new HashMap<>();
+    maintenanceNodes.put(nodes[0], Long.MAX_VALUE);
+    takeNodeOutofService(0, decommissionDNUuid, 0, null, maintenanceNodes,
+        AdminStates.DECOMMISSIONED);
+    // Out of the replicas returned, one is the decommissioned node.
+    assertNull(checkWithRetry(ns, fileSys, file, repl, maintenanceDN));
+
+    putNodeInService(0, maintenanceDN);
+    assertNull(checkWithRetry(ns, fileSys, file, repl + 1, null));
+
+    cleanupFile(fileSys, file);
+  }
+
+
+  @Test(timeout = 360000)
+  public void testChangeReplicationFactors() throws IOException {
+    // Prior to any change, there is 1 maintenance node and 2 live nodes.
+
+    // Replication factor is adjusted from 3 to 4.
+    // After the change, given 1 maintenance + 2 live is less than the
+    // newFactor, one live nodes will be added.
+    testChangeReplicationFactor(3, 4, 3);
+
+    // Replication factor is adjusted from 3 to 2.
+    // After the change, given 2 live nodes is the same as the newFactor,
+    // no live nodes will be invalidated.
+    testChangeReplicationFactor(3, 2, 2);
+
+    // Replication factor is adjusted from 3 to 1.
+    // After the change, given 2 live nodes is greater than the newFactor,
+    // one live nodes will be invalidated.
+    testChangeReplicationFactor(3, 1, 1);
+  }
+
+  /**
+   * After the change of replication factor, # of live replicas <=
+   * the new replication factor.
+   */
+  private void testChangeReplicationFactor(int oldFactor, int newFactor,
+      int expectedLiveReplicas) throws IOException {
+    LOG.info("Starting testChangeReplicationFactor {} {} {}",
+        oldFactor, newFactor, expectedLiveReplicas);
+    startCluster(1, 5);
+
+    final Path file = new Path("/testChangeReplicationFactor.dat");
+
+    final FileSystem fileSys = getCluster().getFileSystem(0);
+    final FSNamesystem ns = getCluster().getNamesystem(0);
+
+    writeFile(fileSys, file, oldFactor, 1);
+
+    final DatanodeInfo nodeOutofService = takeNodeOutofService(0,
+        getFirstBlockFirstReplicaUuid(fileSys, file), Long.MAX_VALUE, null,
+        AdminStates.IN_MAINTENANCE);
+
+    // Verify that the nodeOutofService remains in blocksMap and
+    // # of live replicas For read operation is expected.
+    assertNull(checkWithRetry(ns, fileSys, file, oldFactor - 1,
+        nodeOutofService));
+
+    final DFSClient client = getDfsClient(0);
+    client.setReplication(file.toString(), (short)newFactor);
+
+    // Verify that the nodeOutofService remains in blocksMap and
+    // # of live replicas for read operation.
+    assertNull(checkWithRetry(ns, fileSys, file, expectedLiveReplicas,
+        nodeOutofService));
+
+    putNodeInService(0, nodeOutofService.getDatanodeUuid());
+    assertNull(checkWithRetry(ns, fileSys, file, newFactor, null));
+
+    cleanupFile(fileSys, file);
+  }
+
+
+  /**
+   * Verify the following scenario.
+   * a. Put a live node to maintenance => 1 maintenance, 2 live.
+   * b. The maintenance node becomes dead => block map still has 1 maintenance,
+   *    2 live.
+   * c. Take the node out of maintenance => NN should schedule the replication
+   *    and end up with 3 live.
+   */
+  @Test(timeout = 360000)
+  public void testTakeDeadNodeOutOfMaintenance() throws Exception {
+    LOG.info("Starting testTakeDeadNodeOutOfMaintenance");
+    final int numNamenodes = 1;
+    final int numDatanodes = 4;
+    startCluster(numNamenodes, numDatanodes);
+
+    final Path file = new Path("/testTakeDeadNodeOutOfMaintenance.dat");
+    final int replicas = 3;
+
+    final FileSystem fileSys = getCluster().getFileSystem(0);
+    final FSNamesystem ns = getCluster().getNamesystem(0);
+    writeFile(fileSys, file, replicas, 1);
+
+    final DatanodeInfo nodeOutofService = takeNodeOutofService(0,
+        getFirstBlockFirstReplicaUuid(fileSys, file), Long.MAX_VALUE, null,
+        AdminStates.IN_MAINTENANCE);
+
+    assertNull(checkWithRetry(ns, fileSys, file, replicas - 1,
+        nodeOutofService));
+
+    final DFSClient client = getDfsClient(0);
+    assertEquals("All datanodes must be alive", numDatanodes,
+        client.datanodeReport(DatanodeReportType.LIVE).length);
+
+    getCluster().stopDataNode(nodeOutofService.getXferAddr());
+    DFSTestUtil.waitForDatanodeState(
+        getCluster(), nodeOutofService.getDatanodeUuid(), false, 20000);
+    assertEquals("maintenance node shouldn't be alive", numDatanodes - 1,
+        client.datanodeReport(DatanodeReportType.LIVE).length);
+
+    // Dead maintenance node's blocks should remain in block map.
+    assertNull(checkWithRetry(ns, fileSys, file, replicas - 1,
+        nodeOutofService));
+
+    // When dead maintenance mode is transitioned to out of maintenance mode,
+    // its blocks should be removed from block map.
+    // This will then trigger replication to restore the live replicas back
+    // to replication factor.
+    putNodeInService(0, nodeOutofService.getDatanodeUuid());
+    assertNull(checkWithRetry(ns, fileSys, file, replicas, nodeOutofService,
+        null));
+
+    cleanupFile(fileSys, file);
   }
 
-  static protected String getFirstBlockFirstReplicaUuid(FileSystem fileSys,
+
+  /**
+   * Verify the following scenario.
+   * a. Put a live node to maintenance => 1 maintenance, 2 live.
+   * b. The maintenance node becomes dead => block map still has 1 maintenance,
+   *    2 live.
+   * c. Restart nn => block map only has 2 live => restore the 3 live.
+   * d. Restart the maintenance dn => 1 maintenance, 3 live.
+   * e. Take the node out of maintenance => over replication => 3 live.
+   */
+  @Test(timeout = 360000)
+  public void testWithNNAndDNRestart() throws Exception {
+    LOG.info("Starting testWithNNAndDNRestart");
+    final int numNamenodes = 1;
+    final int numDatanodes = 4;
+    startCluster(numNamenodes, numDatanodes);
+
+    final Path file = new Path("/testWithNNAndDNRestart.dat");
+    final int replicas = 3;
+
+    final FileSystem fileSys = getCluster().getFileSystem(0);
+    FSNamesystem ns = getCluster().getNamesystem(0);
+    writeFile(fileSys, file, replicas, 1);
+
+    DatanodeInfo nodeOutofService = takeNodeOutofService(0,
+        getFirstBlockFirstReplicaUuid(fileSys, file), Long.MAX_VALUE, null,
+        AdminStates.IN_MAINTENANCE);
+
+    assertNull(checkWithRetry(ns, fileSys, file, replicas - 1,
+        nodeOutofService));
+
+    DFSClient client = getDfsClient(0);
+    assertEquals("All datanodes must be alive", numDatanodes,
+        client.datanodeReport(DatanodeReportType.LIVE).length);
+
+    MiniDFSCluster.DataNodeProperties dnProp =
+        getCluster().stopDataNode(nodeOutofService.getXferAddr());
+    DFSTestUtil.waitForDatanodeState(
+        getCluster(), nodeOutofService.getDatanodeUuid(), false, 20000);
+    assertEquals("maintenance node shouldn't be alive", numDatanodes - 1,
+        client.datanodeReport(DatanodeReportType.LIVE).length);
+
+    // Dead maintenance node's blocks should remain in block map.
+    assertNull(checkWithRetry(ns, fileSys, file, replicas - 1,
+        nodeOutofService));
+
+    // restart nn, nn will restore 3 live replicas given it doesn't
+    // know the maintenance node has the replica.
+    getCluster().restartNameNode(0);
+    ns = getCluster().getNamesystem(0);
+    assertNull(checkWithRetry(ns, fileSys, file, replicas, null));
+
+    // restart dn, nn has 1 maintenance replica and 3 live replicas.
+    getCluster().restartDataNode(dnProp, true);
+    getCluster().waitActive();
+    assertNull(checkWithRetry(ns, fileSys, file, replicas, nodeOutofService));
+
+    // Put the node in service, a redundant replica should be removed.
+    putNodeInService(0, nodeOutofService.getDatanodeUuid());
+    assertNull(checkWithRetry(ns, fileSys, file, replicas, null));
+
+    cleanupFile(fileSys, file);
+  }
+
+
+  /**
+   * Machine under maintenance state won't be chosen for new block allocation.
+   */
+  @Test(timeout = 3600000)
+  public void testWriteAfterMaintenance() throws IOException {
+    LOG.info("Starting testWriteAfterMaintenance");
+    startCluster(1, 3);
+
+    final Path file = new Path("/testWriteAfterMaintenance.dat");
+    int replicas = 3;
+
+    final FileSystem fileSys = getCluster().getFileSystem(0);
+    FSNamesystem ns = getCluster().getNamesystem(0);
+
+    final DatanodeInfo nodeOutofService = takeNodeOutofService(0, null,
+        Long.MAX_VALUE, null, AdminStates.IN_MAINTENANCE);
+
+    writeFile(fileSys, file, replicas, 2);
+
+    // Verify nodeOutofService wasn't chosen for write operation.
+    assertNull(checkWithRetry(ns, fileSys, file, replicas - 1,
+        nodeOutofService, null));
+
+    // Put the node back to service, live replicas should be restored.
+    putNodeInService(0, nodeOutofService.getDatanodeUuid());
+    assertNull(checkWithRetry(ns, fileSys, file, replicas, null));
+
+    cleanupFile(fileSys, file);
+  }
+
+  /**
+   * A node has blocks under construction when it is put to maintenance.
+   * Given there are minReplication replicas somewhere else,
+   * it can be transitioned to AdminStates.IN_MAINTENANCE.
+   */
+  @Test(timeout = 360000)
+  public void testEnterMaintenanceWhenFileOpen() throws Exception {
+    LOG.info("Starting testEnterMaintenanceWhenFileOpen");
+    startCluster(1, 3);
+
+    final Path file = new Path("/testEnterMaintenanceWhenFileOpen.dat");
+
+    final FileSystem fileSys = getCluster().getFileSystem(0);
+    writeIncompleteFile(fileSys, file, (short)3, (short)2);
+
+    takeNodeOutofService(0, null, Long.MAX_VALUE, null,
+        AdminStates.IN_MAINTENANCE);
+
+    cleanupFile(fileSys, file);
+  }
+
+  /**
+   * Machine under maintenance state won't be chosen for invalidation.
+   */
+  @Test(timeout = 360000)
+  public void testInvalidation() throws IOException {
+    LOG.info("Starting testInvalidation");
+    int numNamenodes = 1;
+    int numDatanodes = 3;
+    startCluster(numNamenodes, numDatanodes);
+
+    Path file = new Path("/testInvalidation.dat");
+    int replicas = 3;
+
+    FileSystem fileSys = getCluster().getFileSystem(0);
+    FSNamesystem ns = getCluster().getNamesystem(0);
+
+    writeFile(fileSys, file, replicas);
+
+    DatanodeInfo nodeOutofService = takeNodeOutofService(0, null,
+        Long.MAX_VALUE, null, AdminStates.IN_MAINTENANCE);
+
+    DFSClient client = getDfsClient(0);
+    client.setReplication(file.toString(), (short) 1);
+
+    // Verify the nodeOutofService remains in blocksMap.
+    assertNull(checkWithRetry(ns, fileSys, file, 1, nodeOutofService));
+
+    // Restart NN and verify the nodeOutofService remains in blocksMap.
+    getCluster().restartNameNode(0);
+    ns = getCluster().getNamesystem(0);
+    assertNull(checkWithRetry(ns, fileSys, file, 1, nodeOutofService));
+
+    cleanupFile(fileSys, file);
+  }
+
+  static String getFirstBlockFirstReplicaUuid(FileSystem fileSys,
       Path name) throws IOException {
+    DatanodeInfo[] nodes = getFirstBlockReplicasDatanodeInfos(fileSys, name);
+    if (nodes != null && nodes.length != 0) {
+      return nodes[0].getDatanodeUuid();
+    } else {
+      return null;
+    }
+  }
+
+  /*
+  * Verify that the number of replicas are as expected for each block in
+  * the given file.
+  *
+  * @return - null if no failure found, else an error message string.
+  */
+  static String checkFile(FSNamesystem ns, FileSystem fileSys,
+      Path name, int repl, DatanodeInfo expectedExcludedNode,
+      DatanodeInfo expectedMaintenanceNode) throws IOException {
     // need a raw stream
     assertTrue("Not HDFS:"+fileSys.getUri(),
         fileSys instanceof DistributedFileSystem);
     HdfsDataInputStream dis = (HdfsDataInputStream)fileSys.open(name);
+    BlockManager bm = ns.getBlockManager();
     Collection<LocatedBlock> dinfo = dis.getAllBlocks();
+    String output;
     for (LocatedBlock blk : dinfo) { // for each block
       DatanodeInfo[] nodes = blk.getLocations();
-      if (nodes.length > 0) {
-        return nodes[0].getDatanodeUuid();
+      for (int j = 0; j < nodes.length; j++) { // for each replica
+        if (expectedExcludedNode != null &&
+            nodes[j].equals(expectedExcludedNode)) {
+          //excluded node must not be in LocatedBlock.
+          output = "For block " + blk.getBlock() + " replica on " +
+              nodes[j] + " found in LocatedBlock.";
+          LOG.info(output);
+          return output;
+        } else {
+          if (nodes[j].isInMaintenance()) {
+            //IN_MAINTENANCE node must not be in LocatedBlock.
+            output = "For block " + blk.getBlock() + " replica on " +
+                nodes[j] + " which is in maintenance state.";
+            LOG.info(output);
+            return output;
+          }
+        }
+      }
+      if (repl != nodes.length) {
+        output = "Wrong number of replicas for block " + blk.getBlock() +
+            ": expected " + repl + ", got " + nodes.length + " ,";
+        for (int j = 0; j < nodes.length; j++) { // for each replica
+          output += nodes[j] + ",";
+        }
+        output += "pending block # " + ns.getPendingReplicationBlocks() + " ,";
+        output += "under replicated # " + ns.getUnderReplicatedBlocks() + " ,";
+        if (expectedExcludedNode != null) {
+          output += "excluded node " + expectedExcludedNode;
+        }
+
+        LOG.info(output);
+        return output;
+      }
+
+      // Verify it has the expected maintenance node
+      Iterator<DatanodeStorageInfo> storageInfoIter =
+          bm.getStorages(blk.getBlock().getLocalBlock()).iterator();
+      List<DatanodeInfo> maintenanceNodes = new ArrayList<>();
+      while (storageInfoIter.hasNext()) {
+        DatanodeInfo node = storageInfoIter.next().getDatanodeDescriptor();
+        if (node.isMaintenance()) {
+          maintenanceNodes.add(node);
+        }
+      }
+
+      if (expectedMaintenanceNode != null) {
+        if (!maintenanceNodes.contains(expectedMaintenanceNode)) {
+          output = "No maintenance replica on " + expectedMaintenanceNode;
+          LOG.info(output);
+          return output;
+        }
+      } else {
+        if (maintenanceNodes.size() != 0) {
+          output = "Has maintenance replica(s)";
+          LOG.info(output);
+          return output;
+        }
       }
     }
     return null;
   }
+
+  static String checkWithRetry(FSNamesystem ns, FileSystem fileSys,
+      Path name, int repl, DatanodeInfo inMaintenanceNode)
+          throws IOException {
+    return checkWithRetry(ns, fileSys, name, repl, inMaintenanceNode,
+        inMaintenanceNode);
+  }
+
+  static String checkWithRetry(FSNamesystem ns, FileSystem fileSys,
+      Path name, int repl, DatanodeInfo excludedNode,
+      DatanodeInfo underMaintenanceNode) throws IOException {
+    int tries = 0;
+    String output = null;
+    while (tries++ < 200) {
+      try {
+        Thread.sleep(100);
+        output = checkFile(ns, fileSys, name, repl, excludedNode,
+            underMaintenanceNode);
+        if (output == null) {
+          break;
+        }
+      } catch (InterruptedException ie) {
+      }
+    }
+    return output;
+  }
+
+  static private DatanodeInfo[] getFirstBlockReplicasDatanodeInfos(
+      FileSystem fileSys, Path name) throws IOException {
+    // need a raw stream
+    assertTrue("Not HDFS:"+fileSys.getUri(),
+        fileSys instanceof DistributedFileSystem);
+    HdfsDataInputStream dis = (HdfsDataInputStream)fileSys.open(name);
+    Collection<LocatedBlock> dinfo = dis.getAllBlocks();
+    if (dinfo.iterator().hasNext()) { // for the first block
+      return dinfo.iterator().next().getLocations();
+    } else {
+      return null;
+    }
+  }
 }

+ 5 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java

@@ -415,9 +415,10 @@ public class TestBlockManager {
       throws Exception {
     assertEquals(0, bm.numOfUnderReplicatedBlocks());
     BlockInfo block = addBlockOnNodes(testIndex, origNodes);
-    assertFalse(bm.isNeededReconstruction(block, bm.countLiveNodes(block)));
+    assertFalse(bm.isNeededReconstruction(block,
+        bm.countNodes(block, fsn.isInStartupSafeMode())));
   }
-  
+
   @Test(timeout = 60000)
   public void testNeededReconstructionWhileAppending() throws IOException {
     Configuration conf = new HdfsConfiguration();
@@ -458,7 +459,8 @@ public class TestBlockManager {
         namenode.updatePipeline(clientName, oldBlock, newBlock,
             newLocatedBlock.getLocations(), newLocatedBlock.getStorageIDs());
         BlockInfo bi = bm.getStoredBlock(newBlock.getLocalBlock());
-        assertFalse(bm.isNeededReconstruction(bi, bm.countLiveNodes(bi)));
+        assertFalse(bm.isNeededReconstruction(bi, bm.countNodes(bi,
+            cluster.getNamesystem().isInStartupSafeMode())));
       } finally {
         IOUtils.closeStream(out);
       }

+ 16 - 41
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDecommissioningStatus.java

@@ -26,17 +26,16 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Random;
 
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.AdminStatesBaseTest;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -111,47 +110,22 @@ public class TestDecommissioningStatus {
     if(cluster != null) cluster.shutdown();
   }
 
-  private FSDataOutputStream writeIncompleteFile(FileSystem fileSys, Path name,
-      short repl) throws IOException {
-    // create and write a file that contains three blocks of data
-    FSDataOutputStream stm = fileSys.create(name, true, fileSys.getConf()
-        .getInt(CommonConfigurationKeys.IO_FILE_BUFFER_SIZE_KEY, 4096), repl,
-        blockSize);
-    byte[] buffer = new byte[fileSize];
-    Random rand = new Random(seed);
-    rand.nextBytes(buffer);
-    stm.write(buffer);
-    // need to make sure that we actually write out both file blocks
-    // (see FSOutputSummer#flush)
-    stm.flush();
-    // Do not close stream, return it
-    // so that it is not garbage collected
-    return stm;
-  }
-  
-  static private void cleanupFile(FileSystem fileSys, Path name)
-      throws IOException {
-    assertTrue(fileSys.exists(name));
-    fileSys.delete(name, true);
-    assertTrue(!fileSys.exists(name));
-  }
-
   /*
    * Decommissions the node at the given index
    */
-  private String decommissionNode(FSNamesystem namesystem, DFSClient client,
+  private String decommissionNode(DFSClient client,
       int nodeIndex) throws IOException {
     DatanodeInfo[] info = client.datanodeReport(DatanodeReportType.LIVE);
 
     String nodename = info[nodeIndex].getXferAddr();
-    decommissionNode(namesystem, nodename);
+    decommissionNode(nodename);
     return nodename;
   }
 
   /*
    * Decommissions the node by name
    */
-  private void decommissionNode(FSNamesystem namesystem, String dnName)
+  private void decommissionNode(String dnName)
       throws IOException {
     System.out.println("Decommissioning node: " + dnName);
 
@@ -166,14 +140,14 @@ public class TestDecommissioningStatus {
       int expectedUnderRepInOpenFiles) {
     assertEquals("Unexpected num under-replicated blocks",
         expectedUnderRep,
-        decommNode.decommissioningStatus.getUnderReplicatedBlocks());
+        decommNode.getLeavingServiceStatus().getUnderReplicatedBlocks());
     assertEquals("Unexpected number of decom-only replicas",
         expectedDecommissionOnly,
-        decommNode.decommissioningStatus.getDecommissionOnlyReplicas());
+        decommNode.getLeavingServiceStatus().getOutOfServiceOnlyReplicas());
     assertEquals(
         "Unexpected number of replicas in under-replicated open files",
         expectedUnderRepInOpenFiles,
-        decommNode.decommissioningStatus.getUnderReplicatedInOpenFiles());
+        decommNode.getLeavingServiceStatus().getUnderReplicatedInOpenFiles());
   }
 
   private void checkDFSAdminDecommissionStatus(
@@ -237,13 +211,14 @@ public class TestDecommissioningStatus {
     short replicas = numDatanodes;
     //
     // Decommission one node. Verify the decommission status
-    // 
+    //
     Path file1 = new Path("decommission.dat");
     DFSTestUtil.createFile(fileSys, file1, fileSize, fileSize, blockSize,
         replicas, seed);
 
     Path file2 = new Path("decommission1.dat");
-    FSDataOutputStream st1 = writeIncompleteFile(fileSys, file2, replicas);
+    FSDataOutputStream st1 = AdminStatesBaseTest.writeIncompleteFile(fileSys,
+        file2, replicas, (short)(fileSize / blockSize));
     for (DataNode d: cluster.getDataNodes()) {
       DataNodeTestUtils.triggerBlockReport(d);
     }
@@ -251,7 +226,7 @@ public class TestDecommissioningStatus {
     FSNamesystem fsn = cluster.getNamesystem();
     final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
     for (int iteration = 0; iteration < numDatanodes; iteration++) {
-      String downnode = decommissionNode(fsn, client, iteration);
+      String downnode = decommissionNode(client, iteration);
       dm.refreshNodes(conf);
       decommissionedNodes.add(downnode);
       BlockManagerTestUtil.recheckDecommissionState(dm);
@@ -281,8 +256,8 @@ public class TestDecommissioningStatus {
     hostsFileWriter.initExcludeHost("");
     dm.refreshNodes(conf);
     st1.close();
-    cleanupFile(fileSys, file1);
-    cleanupFile(fileSys, file2);
+    AdminStatesBaseTest.cleanupFile(fileSys, file1);
+    AdminStatesBaseTest.cleanupFile(fileSys, file2);
   }
 
   /**
@@ -308,7 +283,7 @@ public class TestDecommissioningStatus {
     // Decommission the DN.
     FSNamesystem fsn = cluster.getNamesystem();
     final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
-    decommissionNode(fsn, dnName);
+    decommissionNode(dnName);
     dm.refreshNodes(conf);
 
     // Stop the DN when decommission is in progress.
@@ -343,7 +318,7 @@ public class TestDecommissioningStatus {
     
     // Delete the under-replicated file, which should let the 
     // DECOMMISSION_IN_PROGRESS node become DECOMMISSIONED
-    cleanupFile(fileSys, f);
+    AdminStatesBaseTest.cleanupFile(fileSys, f);
     BlockManagerTestUtil.recheckDecommissionState(dm);
     assertTrue("the node should be decommissioned",
         dead.get(0).isDecommissioned());
@@ -376,7 +351,7 @@ public class TestDecommissioningStatus {
     FSNamesystem fsn = cluster.getNamesystem();
     final DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
     DatanodeDescriptor dnDescriptor = dm.getDatanode(dnID);
-    decommissionNode(fsn, dnName);
+    decommissionNode(dnName);
     dm.refreshNodes(conf);
     BlockManagerTestUtil.recheckDecommissionState(dm);
     assertTrue(dnDescriptor.isDecommissioned());

+ 53 - 25
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeCapacityReport.java

@@ -195,9 +195,17 @@ public class TestNamenodeCapacityReport {
   private static final float EPSILON = 0.0001f;
   @Test
   public void testXceiverCount() throws Exception {
+    testXceiverCountInternal(0);
+    testXceiverCountInternal(1);
+  }
+
+  public void testXceiverCountInternal(int minMaintenanceR) throws Exception {
     Configuration conf = new HdfsConfiguration();
     // retry one time, if close fails
-    conf.setInt(HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 1);
+    conf.setInt(
+        HdfsClientConfigKeys.BlockWrite.LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 1);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAINTENANCE_REPLICATION_MIN_KEY,
+        minMaintenanceR);
     MiniDFSCluster cluster = null;
 
     final int nodes = 8;
@@ -220,23 +228,23 @@ public class TestNamenodeCapacityReport {
       int expectedTotalLoad = nodes;  // xceiver server adds 1 to load
       int expectedInServiceNodes = nodes;
       int expectedInServiceLoad = nodes;
-      checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad);
-      
-      // shutdown half the nodes and force a heartbeat check to ensure
-      // counts are accurate
+      checkClusterHealth(nodes, namesystem, expectedTotalLoad,
+          expectedInServiceNodes, expectedInServiceLoad);
+
+      // Shutdown half the nodes followed by admin operations on those nodes.
+      // Ensure counts are accurate.
       for (int i=0; i < nodes/2; i++) {
         DataNode dn = datanodes.get(i);
         DatanodeDescriptor dnd = dnm.getDatanode(dn.getDatanodeId());
         dn.shutdown();
         DFSTestUtil.setDatanodeDead(dnd);
         BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager());
-        //Verify decommission of dead node won't impact nodesInService metrics.
-        dnm.getDecomManager().startDecommission(dnd);
+        //Admin operations on dead nodes won't impact nodesInService metrics.
+        startDecommissionOrMaintenance(dnm, dnd, (i % 2 == 0));
         expectedInServiceNodes--;
         assertEquals(expectedInServiceNodes, namesystem.getNumLiveDataNodes());
         assertEquals(expectedInServiceNodes, getNumDNInService(namesystem));
-        //Verify recommission of dead node won't impact nodesInService metrics.
-        dnm.getDecomManager().stopDecommission(dnd);
+        stopDecommissionOrMaintenance(dnm, dnd, (i % 2 == 0));
         assertEquals(expectedInServiceNodes, getNumDNInService(namesystem));
       }
 
@@ -247,8 +255,9 @@ public class TestNamenodeCapacityReport {
       datanodes = cluster.getDataNodes();
       expectedInServiceNodes = nodes;
       assertEquals(nodes, datanodes.size());
-      checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad);
-      
+      checkClusterHealth(nodes, namesystem, expectedTotalLoad,
+          expectedInServiceNodes, expectedInServiceLoad);
+
       // create streams and hsync to force datastreamers to start
       DFSOutputStream[] streams = new DFSOutputStream[fileCount];
       for (int i=0; i < fileCount; i++) {
@@ -263,30 +272,32 @@ public class TestNamenodeCapacityReport {
       }
       // force nodes to send load update
       triggerHeartbeats(datanodes);
-      checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad);
+      checkClusterHealth(nodes, namesystem, expectedTotalLoad,
+          expectedInServiceNodes, expectedInServiceLoad);
 
-      // decomm a few nodes, substract their load from the expected load,
-      // trigger heartbeat to force load update
+      // admin operations on a few nodes, substract their load from the
+      // expected load, trigger heartbeat to force load update.
       for (int i=0; i < fileRepl; i++) {
         expectedInServiceNodes--;
         DatanodeDescriptor dnd =
             dnm.getDatanode(datanodes.get(i).getDatanodeId());
         expectedInServiceLoad -= dnd.getXceiverCount();
-        dnm.getDecomManager().startDecommission(dnd);
+        startDecommissionOrMaintenance(dnm, dnd, (i % 2 == 0));
         DataNodeTestUtils.triggerHeartbeat(datanodes.get(i));
         Thread.sleep(100);
-        checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad);
+        checkClusterHealth(nodes, namesystem, expectedTotalLoad,
+            expectedInServiceNodes, expectedInServiceLoad);
       }
-      
+
       // check expected load while closing each stream.  recalc expected
       // load based on whether the nodes in the pipeline are decomm
       for (int i=0; i < fileCount; i++) {
-        int decomm = 0;
+        int adminOps = 0;
         for (DatanodeInfo dni : streams[i].getPipeline()) {
           DatanodeDescriptor dnd = dnm.getDatanode(dni);
           expectedTotalLoad -= 2;
-          if (dnd.isDecommissionInProgress() || dnd.isDecommissioned()) {
-            decomm++;
+          if (!dnd.isInService()) {
+            adminOps++;
           } else {
             expectedInServiceLoad -= 2;
           }
@@ -297,16 +308,17 @@ public class TestNamenodeCapacityReport {
           // nodes will go decommissioned even if there's a UC block whose
           // other locations are decommissioned too.  we'll ignore that
           // bug for now
-          if (decomm < fileRepl) {
+          if (adminOps < fileRepl) {
             throw ioe;
           }
         }
         triggerHeartbeats(datanodes);
         // verify node count and loads 
-        checkClusterHealth(nodes, namesystem, expectedTotalLoad, expectedInServiceNodes, expectedInServiceLoad);
+        checkClusterHealth(nodes, namesystem, expectedTotalLoad,
+            expectedInServiceNodes, expectedInServiceLoad);
       }
 
-      // shutdown each node, verify node counts based on decomm state
+      // shutdown each node, verify node counts based on admin state
       for (int i=0; i < nodes; i++) {
         DataNode dn = datanodes.get(i);
         dn.shutdown();
@@ -320,13 +332,11 @@ public class TestNamenodeCapacityReport {
           expectedInServiceNodes--;
         }
         assertEquals(expectedInServiceNodes, getNumDNInService(namesystem));
-        
         // live nodes always report load of 1.  no nodes is load 0
         double expectedXceiverAvg = (i == nodes-1) ? 0.0 : 1.0;
         assertEquals((double)expectedXceiverAvg,
             getInServiceXceiverAverage(namesystem), EPSILON);
       }
-      
       // final sanity check
       checkClusterHealth(0, namesystem, 0.0, 0, 0.0);
     } finally {
@@ -336,6 +346,24 @@ public class TestNamenodeCapacityReport {
     }
   }
 
+  private void startDecommissionOrMaintenance(DatanodeManager dnm,
+      DatanodeDescriptor dnd, boolean decomm) {
+    if (decomm) {
+      dnm.getDecomManager().startDecommission(dnd);
+    } else {
+      dnm.getDecomManager().startMaintenance(dnd, Long.MAX_VALUE);
+    }
+  }
+
+  private void stopDecommissionOrMaintenance(DatanodeManager dnm,
+      DatanodeDescriptor dnd, boolean decomm) {
+    if (decomm) {
+      dnm.getDecomManager().stopDecommission(dnd);
+    } else {
+      dnm.getDecomManager().stopMaintenance(dnd);
+    }
+  }
+
   private static void checkClusterHealth(
     int numOfLiveNodes,
     FSNamesystem namesystem, double expectedTotalLoad,

+ 1 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/HostsFileWriter.java

@@ -54,6 +54,7 @@ public class HostsFileWriter {
     localFileSys = FileSystem.getLocal(conf);
     Path workingDir = new Path(MiniDFSCluster.getBaseDirectory());
     this.fullDir = new Path(workingDir, dir);
+    cleanup(); // In case there is some left over from previous run.
     assertTrue(localFileSys.mkdirs(this.fullDir));
 
     if (conf.getClass(