
HDFS-9223. Code cleanup for DatanodeDescriptor and HeartbeatManager. Contributed by Jing Zhao.

(cherry picked from commit be7a0add8b6561d3c566237cc0370b06e7f32bb4)
Jing Zhao 10 years ago
commit bd3364e078
14 changed files with 350 additions and 321 deletions
  1. +2 -0    hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
  2. +5 -5    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
  3. +14 -2   hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java
  4. +71 -95  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
  5. +3 -3    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
  6. +193 -0  hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStats.java
  7. +1 -1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java
  8. +49 -205 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
  9. +1 -1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicaUnderConstruction.java
 10. +1 -1    hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
 11. +1 -1    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
 12. +3 -3    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
 13. +3 -1    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockUnderConstructionFeature.java
 14. +3 -3    hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java

+ 2 - 0
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -697,6 +697,8 @@ Release 2.8.0 - UNRELEASED
     HDFS-9238. Update TestFileCreation.testLeaseExpireHardLimit() to avoid using
     DataNodeTestUtils.getFile(). (Tony Wu via lei)
 
+    HDFS-9223. Code cleanup for DatanodeDescriptor and HeartbeatManager. (jing9)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

+ 5 - 5
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -948,9 +948,9 @@ public class BlockManager implements BlockStatsMXBean {
   void addKeyUpdateCommand(final List<DatanodeCommand> cmds,
       final DatanodeDescriptor nodeinfo) {
     // check access key update
-    if (isBlockTokenEnabled() && nodeinfo.needKeyUpdate) {
+    if (isBlockTokenEnabled() && nodeinfo.needKeyUpdate()) {
       cmds.add(new KeyUpdateCommand(blockTokenSecretManager.exportKeys()));
-      nodeinfo.needKeyUpdate = false;
+      nodeinfo.setNeedKeyUpdate(false);
     }
   }
   
@@ -1805,7 +1805,7 @@ public class BlockManager implements BlockStatsMXBean {
 
     try {
       node = datanodeManager.getDatanode(nodeID);
-      if (node == null || !node.isAlive) {
+      if (node == null || !node.isAlive()) {
         throw new IOException(
             "ProcessReport from dead or unregistered node: " + nodeID);
       }
@@ -3227,7 +3227,7 @@ public class BlockManager implements BlockStatsMXBean {
     int deleted = 0;
     int receiving = 0;
     final DatanodeDescriptor node = datanodeManager.getDatanode(nodeID);
-    if (node == null || !node.isAlive) {
+    if (node == null || !node.isAlive()) {
       blockLog.warn("BLOCK* processIncrementalBlockReport"
               + " is received from dead or unregistered node {}", nodeID);
       throw new IOException(
@@ -3374,7 +3374,7 @@ public class BlockManager implements BlockStatsMXBean {
       return false;
     }
 
-    if (node.isAlive) {
+    if (node.isAlive()) {
       return true;
     }
 

+ 14 - 2
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlocksMap.java

@@ -121,7 +121,9 @@ class BlocksMap {
     blockInfo.setBlockCollectionId(INodeId.INVALID_INODE_ID);
     for(int idx = blockInfo.numNodes()-1; idx >= 0; idx--) {
       DatanodeDescriptor dn = blockInfo.getDatanode(idx);
-      dn.removeBlock(blockInfo); // remove from the list and wipe the location
+      if (dn != null) {
+        removeBlock(dn, blockInfo); // remove from the list and wipe the location
+      }
     }
   }
   
@@ -184,7 +186,7 @@ class BlocksMap {
       return false;
 
     // remove block from the data-node list and the node from the block info
-    boolean removed = node.removeBlock(info);
+    boolean removed = removeBlock(node, info);
 
     if (info.getDatanode(0) == null     // no datanodes left
               && info.isDeleted()) {  // does not belong to a file
@@ -193,6 +195,16 @@ class BlocksMap {
     return removed;
   }
 
+  /**
+   * Remove block from the list of blocks belonging to the data-node. Remove
+   * data-node from the block.
+   */
+  static boolean removeBlock(DatanodeDescriptor dn, BlockInfo b) {
+    final DatanodeStorageInfo s = b.findStorageInfo(dn);
+    // if block exists on this datanode
+    return s != null && s.removeBlock(b);
+  }
+
   int size() {
     if (blocks != null) {
       return blocks.size();
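
The new static helper consolidates the two removeBlock overloads deleted from DatanodeDescriptor further down, and the caller above gains a null guard, since a replica slot whose storage has disappeared can yield a null datanode that the old unguarded dn.removeBlock(blockInfo) call would have dereferenced. A self-contained model of the guarded reverse iteration (names are illustrative, not from the patch):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Guarded removal over replica slots, iterating from the end so a
// removal never disturbs an index still to be visited; the null check
// mirrors the one added in removeBlock() above.
class GuardedRemovalSketch {
  static int removeAll(List<String> replicaSlots) {
    int removed = 0;
    for (int idx = replicaSlots.size() - 1; idx >= 0; idx--) {
      String dn = replicaSlots.get(idx); // may be null for a vanished storage
      if (dn != null) {
        replicaSlots.remove(idx);
        removed++;
      }
    }
    return removed;
  }

  public static void main(String[] args) {
    List<String> slots = new ArrayList<>(Arrays.asList("dn1", null, "dn2"));
    System.out.println(removeAll(slots)); // prints 2; only the null slot stays
  }
}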

+ 71 - 95
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java

@@ -66,29 +66,9 @@ public class DatanodeDescriptor extends DatanodeInfo {
   public static final Logger LOG =
       LoggerFactory.getLogger(DatanodeDescriptor.class);
   public static final DatanodeDescriptor[] EMPTY_ARRAY = {};
-
-  // Stores status of decommissioning.
-  // If node is not decommissioning, do not use this object for anything.
-  public final DecommissioningStatus decommissioningStatus =
-      new DecommissioningStatus();
-
-  private long curBlockReportId = 0;
-
-  private BitSet curBlockReportRpcsSeen = null;
-
-  public int updateBlockReportContext(BlockReportContext context) {
-    if (curBlockReportId != context.getReportId()) {
-      curBlockReportId = context.getReportId();
-      curBlockReportRpcsSeen = new BitSet(context.getTotalRpcs());
-    }
-    curBlockReportRpcsSeen.set(context.getCurRpc());
-    return curBlockReportRpcsSeen.cardinality();
-  }
-
-  public void clearBlockReportContext() {
-    curBlockReportId = 0;
-    curBlockReportRpcsSeen = null;
-  }
+  private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
+  private static final List<DatanodeStorageInfo> EMPTY_STORAGE_INFO_LIST =
+      ImmutableList.of();
 
   /** Block and targets pair */
   @InterfaceAudience.Private
@@ -105,7 +85,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
 
   /** A BlockTargetPair queue. */
   private static class BlockQueue<E> {
-    private final Queue<E> blockq = new LinkedList<E>();
+    private final Queue<E> blockq = new LinkedList<>();
 
     /** Size of the queue */
     synchronized int size() {return blockq.size();}
@@ -131,7 +111,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
     /**
      * Returns <tt>true</tt> if the queue contains the specified element.
      */
-    boolean contains(E e) {
+    synchronized boolean contains(E e) {
       return blockq.contains(e);
     }
 
@@ -140,9 +120,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
   }
 
-  private final Map<String, DatanodeStorageInfo> storageMap = 
-      new HashMap<>();
-
   /**
    * A list of CachedBlock objects on this datanode.
    */
@@ -171,6 +148,18 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
   }
 
+  // Stores status of decommissioning.
+  // If node is not decommissioning, do not use this object for anything.
+  public final DecommissioningStatus decommissioningStatus =
+      new DecommissioningStatus();
+
+  private long curBlockReportId = 0;
+
+  private BitSet curBlockReportRpcsSeen = null;
+
+  private final Map<String, DatanodeStorageInfo> storageMap =
+      new HashMap<>();
+
   /**
    * The blocks which we want to cache on this DataNode.
    */
@@ -190,18 +179,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
   private final CachedBlocksList pendingUncached = 
       new CachedBlocksList(this, CachedBlocksList.Type.PENDING_UNCACHED);
 
-  public CachedBlocksList getPendingCached() {
-    return pendingCached;
-  }
-
-  public CachedBlocksList getCached() {
-    return cached;
-  }
-
-  public CachedBlocksList getPendingUncached() {
-    return pendingUncached;
-  }
-
   /**
    * The time when the last batch of caching directives was sent, in
    * monotonic milliseconds.
@@ -210,9 +187,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
 
   // isAlive == heartbeats.contains(this)
   // This is an optimization, because contains takes O(n) time on Arraylist
-  public boolean isAlive = false;
-  public boolean needKeyUpdate = false;
-
+  private boolean isAlive = false;
+  private boolean needKeyUpdate = false;
   
   // A system administrator can tune the balancer bandwidth parameter
   // (dfs.balance.bandwidthPerSec) dynamically by calling
@@ -241,7 +217,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
   private EnumCounters<StorageType> prevApproxBlocksScheduled
       = new EnumCounters<>(StorageType.class);
   private long lastBlocksScheduledRollTime = 0;
-  private static final int BLOCKS_SCHEDULED_ROLL_INTERVAL = 600*1000; //10min
   private int volumeFailures = 0;
   private VolumeFailureSummary volumeFailureSummary = null;
   
@@ -277,6 +252,48 @@ public class DatanodeDescriptor extends DatanodeInfo {
     updateHeartbeatState(StorageReport.EMPTY_ARRAY, 0L, 0L, 0, 0, null);
   }
 
+  public int updateBlockReportContext(BlockReportContext context) {
+    if (curBlockReportId != context.getReportId()) {
+      curBlockReportId = context.getReportId();
+      curBlockReportRpcsSeen = new BitSet(context.getTotalRpcs());
+    }
+    curBlockReportRpcsSeen.set(context.getCurRpc());
+    return curBlockReportRpcsSeen.cardinality();
+  }
+
+  public void clearBlockReportContext() {
+    curBlockReportId = 0;
+    curBlockReportRpcsSeen = null;
+  }
+
+  public CachedBlocksList getPendingCached() {
+    return pendingCached;
+  }
+
+  public CachedBlocksList getCached() {
+    return cached;
+  }
+
+  public CachedBlocksList getPendingUncached() {
+    return pendingUncached;
+  }
+
+  public boolean isAlive() {
+    return isAlive;
+  }
+
+  public void setAlive(boolean isAlive) {
+    this.isAlive = isAlive;
+  }
+
+  public boolean needKeyUpdate() {
+    return needKeyUpdate;
+  }
+
+  public void setNeedKeyUpdate(boolean needKeyUpdate) {
+    this.needKeyUpdate = needKeyUpdate;
+  }
+
   @VisibleForTesting
   public DatanodeStorageInfo getStorageInfo(String storageID) {
     synchronized (storageMap) {
@@ -311,9 +328,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
   }
 
-  static final private List<DatanodeStorageInfo> EMPTY_STORAGE_INFO_LIST =
-      ImmutableList.of();
-
   List<DatanodeStorageInfo> removeZombieStorages() {
     List<DatanodeStorageInfo> zombies = null;
     synchronized (storageMap) {
@@ -339,28 +353,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
     return zombies == null ? EMPTY_STORAGE_INFO_LIST : zombies;
   }
 
-  /**
-   * Remove block from the list of blocks belonging to the data-node. Remove
-   * data-node from the block.
-   */
-  boolean removeBlock(BlockInfo b) {
-    final DatanodeStorageInfo s = b.findStorageInfo(this);
-    // if block exists on this datanode
-    if (s != null) {
-      return s.removeBlock(b);
-    }
-    return false;
-  }
-  
-  /**
-   * Remove block from the list of blocks belonging to the data-node. Remove
-   * data-node from the block.
-   */
-  boolean removeBlock(String storageID, BlockInfo b) {
-    DatanodeStorageInfo s = getStorageInfo(storageID);
-    return s != null && s.removeBlock(b);
-  }
-
   public void resetBlocks() {
     setCapacity(0);
     setRemaining(0);
@@ -379,9 +371,9 @@ public class DatanodeDescriptor extends DatanodeInfo {
   public void clearBlockQueues() {
     synchronized (invalidateBlocks) {
       this.invalidateBlocks.clear();
-      this.recoverBlocks.clear();
-      this.replicateBlocks.clear();
     }
+    this.recoverBlocks.clear();
+    this.replicateBlocks.clear();
     // pendingCached, cached, and pendingUncached are protected by the
     // FSN lock.
     this.pendingCached.clear();
@@ -582,9 +574,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
   Iterator<BlockInfo> getBlockIterator() {
     return new BlockIterator(getStorageInfos());
   }
-  Iterator<BlockInfo> getBlockIterator(final String storageID) {
-    return new BlockIterator(getStorageInfo(storageID));
-  }
 
   void incrementPendingReplicationWithoutTargets() {
     PendingReplicationWithoutTargets++;
@@ -633,16 +622,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
     return PendingReplicationWithoutTargets + replicateBlocks.size();
   }
 
-  /**
-   * The number of block invalidation items that are pending to 
-   * be sent to the datanode
-   */
-  int getNumberOfBlocksToBeInvalidated() {
-    synchronized (invalidateBlocks) {
-      return invalidateBlocks.size();
-    }
-  }
-
   public List<BlockTargetPair> getReplicationCommand(int maxTransfers) {
     return replicateBlocks.poll(maxTransfers);
   }
@@ -674,7 +653,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
    *
    * @param t requested storage type
    * @param blockSize requested block size
-   * @return
    */
   public DatanodeStorageInfo chooseStorage4Block(StorageType t,
       long blockSize) {
@@ -684,8 +662,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
     long remaining = 0;
     DatanodeStorageInfo storage = null;
     for (DatanodeStorageInfo s : getStorageInfos()) {
-      if (s.getState() == State.NORMAL &&
-          s.getStorageType() == t) {
+      if (s.getState() == State.NORMAL && s.getStorageType() == t) {
         if (storage == null) {
           storage = s;
         }
@@ -721,7 +698,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
 
   /** Increment the number of blocks scheduled. */
   void incrementBlocksScheduled(StorageType t) {
-    currApproxBlocksScheduled.add(t, 1);;
+    currApproxBlocksScheduled.add(t, 1);
   }
   
   /** Decrement the number of blocks scheduled. */
@@ -765,7 +742,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
     
     synchronized void set(int underRep,
         int onlyRep, int underConstruction) {
-      if (isDecommissionInProgress() == false) {
+      if (!isDecommissionInProgress()) {
         return;
       }
       underReplicatedBlocks = underRep;
@@ -775,21 +752,21 @@ public class DatanodeDescriptor extends DatanodeInfo {
 
     /** @return the number of under-replicated blocks */
     public synchronized int getUnderReplicatedBlocks() {
-      if (isDecommissionInProgress() == false) {
+      if (!isDecommissionInProgress()) {
         return 0;
       }
       return underReplicatedBlocks;
     }
     /** @return the number of decommission-only replicas */
     public synchronized int getDecommissionOnlyReplicas() {
-      if (isDecommissionInProgress() == false) {
+      if (!isDecommissionInProgress()) {
         return 0;
       }
       return decommissionOnlyReplicas;
     }
     /** @return the number of under-replicated blocks in open files */
     public synchronized int getUnderReplicatedInOpenFiles() {
-      if (isDecommissionInProgress() == false) {
+      if (!isDecommissionInProgress()) {
         return 0;
       }
       return underReplicatedInOpenFiles;
@@ -800,7 +777,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
     }
     /** @return start time */
     public synchronized long getStartTime() {
-      if (isDecommissionInProgress() == false) {
+      if (!isDecommissionInProgress()) {
         return 0;
       }
       return startTime;
@@ -918,8 +895,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
   }
   
   /**
-   * checks whether atleast first block report has been received
-   * @return
+   * @return whether at least first block report has been received
    */
   public boolean checkBlockReportReceived() {
     if(this.getStorageInfos().length == 0) {
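
Beyond reordering fields and adding accessors, this diff closes a small thread-safety gap: BlockQueue.contains(E) now takes the queue's monitor like every other method of the class. Below is a minimal, self-contained model of that locking rule (a sketch, not the HDFS class itself): LinkedList is not thread-safe, so even a read-only scan must hold the same lock as the writers or it can observe a half-updated list.

import java.util.LinkedList;
import java.util.Queue;

// Model of DatanodeDescriptor.BlockQueue's locking invariant: every
// method that touches blockq holds the instance monitor. Before this
// patch, contains() read the LinkedList without the lock and could
// race with a concurrent offer()/poll().
class BlockQueueSketch<E> {
  private final Queue<E> blockq = new LinkedList<>();

  synchronized boolean offer(E e) { return blockq.offer(e); }
  synchronized E poll() { return blockq.poll(); }
  synchronized int size() { return blockq.size(); }
  synchronized boolean contains(E e) { return blockq.contains(e); } // the fix
}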

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java

@@ -308,7 +308,7 @@ public class DatanodeManager {
   
   void activate(final Configuration conf) {
     decomManager.activate(conf);
-    heartbeatManager.activate(conf);
+    heartbeatManager.activate();
   }
 
   void close() {
@@ -654,7 +654,7 @@ public class DatanodeManager {
   }
 
   private boolean shouldCountVersion(DatanodeDescriptor node) {
-    return node.getSoftwareVersion() != null && node.isAlive &&
+    return node.getSoftwareVersion() != null && node.isAlive() &&
       !isDatanodeDead(node);
   }
 
@@ -1338,7 +1338,7 @@ public class DatanodeManager {
           throw new DisallowedDatanodeException(nodeinfo);
         }
 
-        if (nodeinfo == null || !nodeinfo.isAlive) {
+        if (nodeinfo == null || !nodeinfo.isAlive()) {
           return new DatanodeCommand[]{RegisterCommand.REGISTER};
         }
 

+ 193 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStats.java

@@ -0,0 +1,193 @@
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+
+import java.util.EnumMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Datanode statistics.
+ * For decommissioning/decommissioned nodes, only used capacity is counted.
+ */
+class DatanodeStats {
+
+  private final StorageTypeStatsMap statsMap = new StorageTypeStatsMap();
+  private long capacityTotal = 0L;
+  private long capacityUsed = 0L;
+  private long capacityRemaining = 0L;
+  private long blockPoolUsed = 0L;
+  private int xceiverCount = 0;
+  private long cacheCapacity = 0L;
+  private long cacheUsed = 0L;
+
+  private int nodesInService = 0;
+  private int nodesInServiceXceiverCount = 0;
+  private int expiredHeartbeats = 0;
+
+  synchronized void add(final DatanodeDescriptor node) {
+    capacityUsed += node.getDfsUsed();
+    blockPoolUsed += node.getBlockPoolUsed();
+    xceiverCount += node.getXceiverCount();
+    if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+      nodesInService++;
+      nodesInServiceXceiverCount += node.getXceiverCount();
+      capacityTotal += node.getCapacity();
+      capacityRemaining += node.getRemaining();
+    } else {
+      capacityTotal += node.getDfsUsed();
+    }
+    cacheCapacity += node.getCacheCapacity();
+    cacheUsed += node.getCacheUsed();
+    Set<StorageType> storageTypes = new HashSet<>();
+    for (DatanodeStorageInfo storageInfo : node.getStorageInfos()) {
+      statsMap.addStorage(storageInfo, node);
+      storageTypes.add(storageInfo.getStorageType());
+    }
+    for (StorageType storageType : storageTypes) {
+      statsMap.addNode(storageType, node);
+    }
+  }
+
+  synchronized void subtract(final DatanodeDescriptor node) {
+    capacityUsed -= node.getDfsUsed();
+    blockPoolUsed -= node.getBlockPoolUsed();
+    xceiverCount -= node.getXceiverCount();
+    if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
+      nodesInService--;
+      nodesInServiceXceiverCount -= node.getXceiverCount();
+      capacityTotal -= node.getCapacity();
+      capacityRemaining -= node.getRemaining();
+    } else {
+      capacityTotal -= node.getDfsUsed();
+    }
+    cacheCapacity -= node.getCacheCapacity();
+    cacheUsed -= node.getCacheUsed();
+    Set<StorageType> storageTypes = new HashSet<>();
+    for (DatanodeStorageInfo storageInfo : node.getStorageInfos()) {
+      statsMap.subtractStorage(storageInfo, node);
+      storageTypes.add(storageInfo.getStorageType());
+    }
+    for (StorageType storageType : storageTypes) {
+      statsMap.subtractNode(storageType, node);
+    }
+  }
+
+  /** Increment expired heartbeat counter. */
+  void incrExpiredHeartbeats() {
+    expiredHeartbeats++;
+  }
+
+  synchronized Map<StorageType, StorageTypeStats> getStatsMap() {
+    return statsMap.get();
+  }
+
+  synchronized long getCapacityTotal() {
+    return capacityTotal;
+  }
+
+  synchronized long getCapacityUsed() {
+    return capacityUsed;
+  }
+
+  synchronized long getCapacityRemaining() {
+    return capacityRemaining;
+  }
+
+  synchronized long getBlockPoolUsed() {
+    return blockPoolUsed;
+  }
+
+  synchronized int getXceiverCount() {
+    return xceiverCount;
+  }
+
+  synchronized long getCacheCapacity() {
+    return cacheCapacity;
+  }
+
+  synchronized long getCacheUsed() {
+    return cacheUsed;
+  }
+
+  synchronized int getNodesInService() {
+    return nodesInService;
+  }
+
+  synchronized int getNodesInServiceXceiverCount() {
+    return nodesInServiceXceiverCount;
+  }
+
+  synchronized int getExpiredHeartbeats() {
+    return expiredHeartbeats;
+  }
+
+  synchronized float getCapacityRemainingPercent() {
+    return DFSUtilClient.getPercentRemaining(capacityRemaining, capacityTotal);
+  }
+
+  synchronized float getPercentBlockPoolUsed() {
+    return DFSUtilClient.getPercentUsed(blockPoolUsed, capacityTotal);
+  }
+
+  synchronized long getCapacityUsedNonDFS() {
+    final long nonDFSUsed = capacityTotal - capacityRemaining - capacityUsed;
+    return nonDFSUsed < 0L? 0L : nonDFSUsed;
+  }
+
+  synchronized float getCapacityUsedPercent() {
+    return DFSUtilClient.getPercentUsed(capacityUsed, capacityTotal);
+  }
+
+  static final class StorageTypeStatsMap {
+
+    private Map<StorageType, StorageTypeStats> storageTypeStatsMap =
+        new EnumMap<>(StorageType.class);
+
+    private Map<StorageType, StorageTypeStats> get() {
+      return new EnumMap<>(storageTypeStatsMap);
+    }
+
+    private void addNode(StorageType storageType,
+        final DatanodeDescriptor node) {
+      StorageTypeStats storageTypeStats =
+          storageTypeStatsMap.get(storageType);
+      if (storageTypeStats == null) {
+        storageTypeStats = new StorageTypeStats();
+        storageTypeStatsMap.put(storageType, storageTypeStats);
+      }
+      storageTypeStats.addNode(node);
+    }
+
+    private void addStorage(final DatanodeStorageInfo info,
+        final DatanodeDescriptor node) {
+      StorageTypeStats storageTypeStats =
+          storageTypeStatsMap.get(info.getStorageType());
+      if (storageTypeStats == null) {
+        storageTypeStats = new StorageTypeStats();
+        storageTypeStatsMap.put(info.getStorageType(), storageTypeStats);
+      }
+      storageTypeStats.addStorage(info, node);
+    }
+
+    private void subtractStorage(final DatanodeStorageInfo info,
+        final DatanodeDescriptor node) {
+      StorageTypeStats storageTypeStats =
+          storageTypeStatsMap.get(info.getStorageType());
+      if (storageTypeStats != null) {
+        storageTypeStats.subtractStorage(info, node);
+      }
+    }
+
+    private void subtractNode(StorageType storageType,
+        final DatanodeDescriptor node) {
+      StorageTypeStats storageTypeStats =
+          storageTypeStatsMap.get(storageType);
+      if (storageTypeStats != null) {
+        storageTypeStats.subtractNode(node);
+      }
+    }
+  }
+}
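
One quiet change rides along with the extraction: StorageTypeStatsMap now keys its map with an EnumMap instead of the IdentityHashMap used by the old inner class in HeartbeatManager (removed below), and get() returns a fresh copy rather than an unmodifiable view of the live map. A minimal sketch of that choice, using a hypothetical StorageType enum and plain Long values in place of StorageTypeStats:

import java.util.EnumMap;
import java.util.Map;

// EnumMap is the idiomatic map for enum keys: array-backed, compact,
// and iterated in declaration order. Copying it in get() hands the
// caller a snapshot whose structure cannot mutate the live map.
class StorageTypeStatsSketch {
  enum StorageType { DISK, SSD, ARCHIVE, RAM_DISK } // illustrative stand-in

  private final Map<StorageType, Long> statsMap =
      new EnumMap<>(StorageType.class);

  void add(StorageType t, long delta) {
    statsMap.merge(t, delta, Long::sum);
  }

  Map<StorageType, Long> get() {
    return new EnumMap<>(statsMap); // copy, as in StorageTypeStatsMap.get()
  }
}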

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DecommissionManager.java

@@ -226,7 +226,7 @@ public class DecommissionManager {
       hbManager.stopDecommission(node);
       // Over-replicated blocks will be detected and processed when
       // the dead node comes back and send in its full block report.
-      if (node.isAlive) {
+      if (node.isAlive()) {
         blockManager.processOverReplicatedBlocksOnReCommission(node);
       }
       // Remove from tracking in DecommissionManager

+ 49 - 205
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java

@@ -18,18 +18,13 @@
 package org.apache.hadoop.hdfs.server.blockmanagement;
 
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -57,10 +52,10 @@ class HeartbeatManager implements DatanodeStatistics {
    * and removes them from the list.
    * It is synchronized by the heartbeat manager lock.
    */
-  private final List<DatanodeDescriptor> datanodes = new ArrayList<DatanodeDescriptor>();
+  private final List<DatanodeDescriptor> datanodes = new ArrayList<>();
 
   /** Statistics, which are synchronized by the heartbeat manager lock. */
-  private final Stats stats = new Stats();
+  private final DatanodeStats stats = new DatanodeStats();
 
   /** The time period to check for expired datanodes */
   private final long heartbeatRecheckInterval;
@@ -96,7 +91,7 @@ class HeartbeatManager implements DatanodeStatistics {
     }
   }
 
-  void activate(Configuration conf) {
+  void activate() {
     heartbeatThread.start();
   }
 
@@ -105,7 +100,7 @@ class HeartbeatManager implements DatanodeStatistics {
     try {
       // This will no effect if the thread hasn't yet been started.
       heartbeatThread.join(3000);
-    } catch (InterruptedException e) {
+    } catch (InterruptedException ignored) {
     }
   }
   
@@ -114,74 +109,69 @@ class HeartbeatManager implements DatanodeStatistics {
   }
 
   @Override
-  public synchronized long getCapacityTotal() {
-    return stats.capacityTotal;
+  public long getCapacityTotal() {
+    return stats.getCapacityTotal();
   }
 
   @Override
-  public synchronized long getCapacityUsed() {
-    return stats.capacityUsed;
+  public long getCapacityUsed() {
+    return stats.getCapacityUsed();
   }
 
   @Override
-  public synchronized float getCapacityUsedPercent() {
-    return DFSUtilClient.getPercentUsed(stats.capacityUsed, stats.capacityTotal);
+  public float getCapacityUsedPercent() {
+    return stats.getCapacityUsedPercent();
   }
 
   @Override
-  public synchronized long getCapacityRemaining() {
-    return stats.capacityRemaining;
+  public long getCapacityRemaining() {
+    return stats.getCapacityRemaining();
   }
 
   @Override
-  public synchronized float getCapacityRemainingPercent() {
-    return DFSUtilClient.getPercentRemaining(stats.capacityRemaining,
-                                             stats.capacityTotal);
+  public float getCapacityRemainingPercent() {
+    return stats.getCapacityRemainingPercent();
   }
 
   @Override
-  public synchronized long getBlockPoolUsed() {
-    return stats.blockPoolUsed;
+  public long getBlockPoolUsed() {
+    return stats.getBlockPoolUsed();
   }
 
   @Override
-  public synchronized float getPercentBlockPoolUsed() {
-    return DFSUtilClient.getPercentUsed(stats.blockPoolUsed,
-                                        stats.capacityTotal);
+  public float getPercentBlockPoolUsed() {
+    return stats.getPercentBlockPoolUsed();
   }
 
   @Override
-  public synchronized long getCapacityUsedNonDFS() {
-    final long nonDFSUsed = stats.capacityTotal
-        - stats.capacityRemaining - stats.capacityUsed;
-    return nonDFSUsed < 0L? 0L : nonDFSUsed;
+  public long getCapacityUsedNonDFS() {
+    return stats.getCapacityUsedNonDFS();
   }
 
   @Override
-  public synchronized int getXceiverCount() {
-    return stats.xceiverCount;
+  public int getXceiverCount() {
+    return stats.getXceiverCount();
   }
   
   @Override
-  public synchronized int getInServiceXceiverCount() {
-    return stats.nodesInServiceXceiverCount;
+  public int getInServiceXceiverCount() {
+    return stats.getNodesInServiceXceiverCount();
   }
   
   @Override
-  public synchronized int getNumDatanodesInService() {
-    return stats.nodesInService;
+  public int getNumDatanodesInService() {
+    return stats.getNodesInService();
   }
   
   @Override
-  public synchronized long getCacheCapacity() {
-    return stats.cacheCapacity;
+  public long getCacheCapacity() {
+    return stats.getCacheCapacity();
   }
 
   @Override
-  public synchronized long getCacheUsed() {
-    return stats.cacheUsed;
+  public long getCacheUsed() {
+    return stats.getCacheUsed();
   }
-  
 
   @Override
   public synchronized long[] getStats() {
@@ -195,17 +185,17 @@ class HeartbeatManager implements DatanodeStatistics {
   }
 
   @Override
-  public synchronized int getExpiredHeartbeats() {
-    return stats.expiredHeartbeats;
+  public int getExpiredHeartbeats() {
+    return stats.getExpiredHeartbeats();
   }
 
   @Override
-  public  Map<StorageType, StorageTypeStats> getStorageTypeStats() {
-    return stats.statsMap.get();
+  public Map<StorageType, StorageTypeStats> getStorageTypeStats() {
+    return stats.getStatsMap();
   }
 
   synchronized void register(final DatanodeDescriptor d) {
-    if (!d.isAlive) {
+    if (!d.isAlive()) {
       addDatanode(d);
 
       //update its timestamp
@@ -221,14 +211,14 @@ class HeartbeatManager implements DatanodeStatistics {
     // update in-service node count
     stats.add(d);
     datanodes.add(d);
-    d.isAlive = true;
+    d.setAlive(true);
   }
 
   synchronized void removeDatanode(DatanodeDescriptor node) {
-    if (node.isAlive) {
+    if (node.isAlive()) {
       stats.subtract(node);
       datanodes.remove(node);
-      node.isAlive = false;
+      node.setAlive(false);
     }
   }
 
@@ -243,7 +233,7 @@ class HeartbeatManager implements DatanodeStatistics {
   }
 
   synchronized void startDecommission(final DatanodeDescriptor node) {
-    if (!node.isAlive) {
+    if (!node.isAlive()) {
       LOG.info("Dead node {} is decommissioned immediately.", node);
       node.setDecommissioned();
     } else {
@@ -255,8 +245,8 @@ class HeartbeatManager implements DatanodeStatistics {
 
   synchronized void stopDecommission(final DatanodeDescriptor node) {
     LOG.info("Stopping decommissioning of {} node {}",
-        node.isAlive ? "live" : "dead", node);
-    if (!node.isAlive) {
+        node.isAlive() ? "live" : "dead", node);
+    if (!node.isAlive()) {
       node.stopDecommission();
     } else {
       stats.subtract(node);
@@ -302,6 +292,7 @@ class HeartbeatManager implements DatanodeStatistics {
    * B. Remove all blocks in PendingDataNodeMessages for the failed storage
    *    when we remove all blocks from BlocksMap for that storage.
    */
+  @VisibleForTesting
   void heartbeatCheck() {
     final DatanodeManager dm = blockManager.getDatanodeManager();
     // It's OK to check safe mode w/o taking the lock here, we re-check
@@ -354,16 +345,14 @@ class HeartbeatManager implements DatanodeStatistics {
       }
 
       allAlive = dead == null && failedStorage == null;
+      if (!allAlive && namesystem.isInStartupSafeMode()) {
+        return;
+      }
       if (dead != null) {
         // acquire the fsnamesystem lock, and then remove the dead node.
         namesystem.writeLock();
         try {
-          if (namesystem.isInStartupSafeMode()) {
-            return;
-          }
-          synchronized(this) {
-            dm.removeDeadDatanode(dead);
-          }
+          dm.removeDeadDatanode(dead);
         } finally {
           namesystem.writeUnlock();
         }
@@ -372,12 +361,7 @@ class HeartbeatManager implements DatanodeStatistics {
         // acquire the fsnamesystem lock, and remove blocks on the storage.
         namesystem.writeLock();
         try {
-          if (namesystem.isInStartupSafeMode()) {
-            return;
-          }
-          synchronized(this) {
-            blockManager.removeBlocksAssociatedTo(failedStorage);
-          }
+          blockManager.removeBlocksAssociatedTo(failedStorage);
         } finally {
           namesystem.writeUnlock();
         }
@@ -385,7 +369,6 @@ class HeartbeatManager implements DatanodeStatistics {
     }
   }
 
-
   /** Periodically check heartbeat and update block key */
   private class Monitor implements Runnable {
     private long lastHeartbeatCheck;
@@ -404,7 +387,7 @@ class HeartbeatManager implements DatanodeStatistics {
           if (blockManager.shouldUpdateBlockKey(now - lastBlockKeyUpdate)) {
             synchronized(HeartbeatManager.this) {
               for(DatanodeDescriptor d : datanodes) {
-                d.needKeyUpdate = true;
+                d.setNeedKeyUpdate(true);
               }
             }
             lastBlockKeyUpdate = now;
@@ -414,7 +397,7 @@ class HeartbeatManager implements DatanodeStatistics {
         }
         try {
           Thread.sleep(5000);  // 5 seconds
-        } catch (InterruptedException ie) {
+        } catch (InterruptedException ignored) {
         }
         // avoid declaring nodes dead for another cycle if a GC pause lasts
         // longer than the node recheck interval
@@ -425,143 +408,4 @@ class HeartbeatManager implements DatanodeStatistics {
       }
     }
   }
-
-  /** Datanode statistics.
-   * For decommissioning/decommissioned nodes, only used capacity is counted.
-   */
-  private static class Stats {
-
-    private final StorageTypeStatsMap statsMap = new StorageTypeStatsMap();
-
-    private long capacityTotal = 0L;
-    private long capacityUsed = 0L;
-    private long capacityRemaining = 0L;
-    private long blockPoolUsed = 0L;
-    private int xceiverCount = 0;
-    private long cacheCapacity = 0L;
-    private long cacheUsed = 0L;
-
-    private int nodesInService = 0;
-    private int nodesInServiceXceiverCount = 0;
-
-    private int expiredHeartbeats = 0;
-
-    private void add(final DatanodeDescriptor node) {
-      capacityUsed += node.getDfsUsed();
-      blockPoolUsed += node.getBlockPoolUsed();
-      xceiverCount += node.getXceiverCount();
-      if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
-        nodesInService++;
-        nodesInServiceXceiverCount += node.getXceiverCount();
-        capacityTotal += node.getCapacity();
-        capacityRemaining += node.getRemaining();
-      } else {
-        capacityTotal += node.getDfsUsed();
-      }
-      cacheCapacity += node.getCacheCapacity();
-      cacheUsed += node.getCacheUsed();
-      Set<StorageType> storageTypes = new HashSet<>();
-      for (DatanodeStorageInfo storageInfo : node.getStorageInfos()) {
-        statsMap.addStorage(storageInfo, node);
-        storageTypes.add(storageInfo.getStorageType());
-      }
-      for (StorageType storageType : storageTypes) {
-        statsMap.addNode(storageType, node);
-      }
-    }
-
-    private void subtract(final DatanodeDescriptor node) {
-      capacityUsed -= node.getDfsUsed();
-      blockPoolUsed -= node.getBlockPoolUsed();
-      xceiverCount -= node.getXceiverCount();
-      if (!(node.isDecommissionInProgress() || node.isDecommissioned())) {
-        nodesInService--;
-        nodesInServiceXceiverCount -= node.getXceiverCount();
-        capacityTotal -= node.getCapacity();
-        capacityRemaining -= node.getRemaining();
-      } else {
-        capacityTotal -= node.getDfsUsed();
-      }
-      cacheCapacity -= node.getCacheCapacity();
-      cacheUsed -= node.getCacheUsed();
-      Set<StorageType> storageTypes = new HashSet<>();
-      for (DatanodeStorageInfo storageInfo : node.getStorageInfos()) {
-        statsMap.subtractStorage(storageInfo, node);
-        storageTypes.add(storageInfo.getStorageType());
-      }
-      for (StorageType storageType : storageTypes) {
-        statsMap.subtractNode(storageType, node);
-      }
-    }
-    
-    /** Increment expired heartbeat counter. */
-    private void incrExpiredHeartbeats() {
-      expiredHeartbeats++;
-    }
-  }
-
-  /** StorageType specific statistics.
-   * For decommissioning/decommissioned nodes, only used capacity is counted.
-   */
-
-  static final class StorageTypeStatsMap {
-
-    private Map<StorageType, StorageTypeStats> storageTypeStatsMap =
-          new IdentityHashMap<>();
-
-    private StorageTypeStatsMap() {}
-
-    private StorageTypeStatsMap(StorageTypeStatsMap other) {
-      storageTypeStatsMap =
-          new IdentityHashMap<>(other.storageTypeStatsMap);
-      for (Map.Entry<StorageType, StorageTypeStats> entry :
-          storageTypeStatsMap.entrySet()) {
-        entry.setValue(new StorageTypeStats(entry.getValue()));
-      }
-    }
-
-    private Map<StorageType, StorageTypeStats> get() {
-      return Collections.unmodifiableMap(storageTypeStatsMap);
-    }
-
-    private void addNode(StorageType storageType,
-        final DatanodeDescriptor node) {
-      StorageTypeStats storageTypeStats =
-          storageTypeStatsMap.get(storageType);
-      if (storageTypeStats == null) {
-        storageTypeStats = new StorageTypeStats();
-        storageTypeStatsMap.put(storageType, storageTypeStats);
-      }
-      storageTypeStats.addNode(node);
-    }
-
-    private void addStorage(final DatanodeStorageInfo info,
-        final DatanodeDescriptor node) {
-      StorageTypeStats storageTypeStats =
-          storageTypeStatsMap.get(info.getStorageType());
-      if (storageTypeStats == null) {
-        storageTypeStats = new StorageTypeStats();
-        storageTypeStatsMap.put(info.getStorageType(), storageTypeStats);
-      }
-      storageTypeStats.addStorage(info, node);
-    }
-
-    private void subtractStorage(final DatanodeStorageInfo info,
-        final DatanodeDescriptor node) {
-      StorageTypeStats storageTypeStats =
-          storageTypeStatsMap.get(info.getStorageType());
-      if (storageTypeStats != null) {
-        storageTypeStats.subtractStorage(info, node);
-      }
-    }
-
-    private void subtractNode(StorageType storageType,
-        final DatanodeDescriptor node) {
-      StorageTypeStats storageTypeStats =
-          storageTypeStatsMap.get(storageType);
-      if (storageTypeStats != null) {
-        storageTypeStats.subtractNode(node);
-      }
-    }
-  }
 }
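
The net effect of this file's diff is a change in lock granularity: the per-metric getters drop their synchronized keyword and delegate to DatanodeStats, whose methods synchronize on the stats object itself, so statistics reads no longer contend on the HeartbeatManager monitor held by the heartbeat and decommission paths; the same diff also hoists the startup-safe-mode check ahead of the two write-locked blocks in heartbeatCheck() and drops the now-redundant synchronized(this) wrappers. A minimal sketch of the delegation, with hypothetical class names:

// Before: getters synchronized on the manager and read inline fields.
// After (modeled here): the manager's getters are lock-free and the
// stats object guards its own fields, shrinking the critical sections
// held while heartbeats are processed.
class DatanodeStatsSketch {
  private long capacityTotal;
  synchronized void add(long capacity)      { capacityTotal += capacity; }
  synchronized void subtract(long capacity) { capacityTotal -= capacity; }
  synchronized long getCapacityTotal()      { return capacityTotal; }
}

class HeartbeatManagerSketch {
  private final DatanodeStatsSketch stats = new DatanodeStatsSketch();

  public long getCapacityTotal() {   // no `synchronized` here any more
    return stats.getCapacityTotal(); // the lock lives inside the stats object
  }
}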

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReplicaUnderConstruction.java

@@ -86,7 +86,7 @@ class ReplicaUnderConstruction extends Block {
    * Is data-node the replica belongs to alive.
    */
   boolean isAlive() {
-    return expectedLocation.getDatanodeDescriptor().isAlive;
+    return expectedLocation.getDatanodeDescriptor().isAlive();
   }
 
   @Override // Block

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java

@@ -934,7 +934,7 @@ public final class CacheManager {
     try {
       final DatanodeDescriptor datanode = 
           blockManager.getDatanodeManager().getDatanode(datanodeID);
-      if (datanode == null || !datanode.isAlive) {
+      if (datanode == null || !datanode.isAlive()) {
         throw new IOException(
             "processCacheReport from dead or unregistered datanode: " +
             datanode);

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java

@@ -1762,7 +1762,7 @@ public class DFSTestUtil {
         FSNamesystem namesystem = cluster.getNamesystem();
         final DatanodeDescriptor dd = BlockManagerTestUtil.getDatanode(
             namesystem, nodeID);
-        return (dd.isAlive == alive);
+        return (dd.isAlive() == alive);
       }
     }, 100, waitTime);
   }

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java

@@ -594,7 +594,7 @@ public class TestBlockManager {
   public void testSafeModeIBR() throws Exception {
     DatanodeDescriptor node = spy(nodes.get(0));
     DatanodeStorageInfo ds = node.getStorageInfos()[0];
-    node.isAlive = true;
+    node.setAlive(true);
 
     DatanodeRegistration nodeReg =
         new DatanodeRegistration(node, null, null, "");
@@ -639,7 +639,7 @@ public class TestBlockManager {
     DatanodeDescriptor node = spy(nodes.get(0));
     DatanodeStorageInfo ds = node.getStorageInfos()[0];
 
-    node.isAlive = true;
+    node.setAlive(true);
 
     DatanodeRegistration nodeReg =
         new DatanodeRegistration(node, null, null, "");
@@ -671,7 +671,7 @@ public class TestBlockManager {
 
     DatanodeDescriptor node = nodes.get(0);
     DatanodeStorageInfo ds = node.getStorageInfos()[0];
-    node.isAlive = true;
+    node.setAlive(true);
     DatanodeRegistration nodeReg =  new DatanodeRegistration(node, null, null, "");
 
     // register new node

+ 3 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockUnderConstructionFeature.java

@@ -38,7 +38,9 @@ public class TestBlockUnderConstructionFeature {
     DatanodeStorageInfo s3 = DFSTestUtil.createDatanodeStorageInfo("10.10.1.3", "s3");
     DatanodeDescriptor dd3 = s3.getDatanodeDescriptor();
 
-    dd1.isAlive = dd2.isAlive = dd3.isAlive = true;
+    dd1.setAlive(true);
+    dd2.setAlive(true);
+    dd3.setAlive(true);
     BlockInfoContiguous blockInfo = new BlockInfoContiguous(
         new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), (short) 3);
     blockInfo.convertToBlockUnderConstruction(BlockUCState.UNDER_CONSTRUCTION,

+ 3 - 3
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeDescriptor.java

@@ -66,7 +66,7 @@ public class TestDatanodeDescriptor {
     assertTrue(storages[0].addBlock(blk) == AddBlockResult.ADDED);
     assertEquals(1, dd.numBlocks());
     // remove a non-existent block
-    assertFalse(dd.removeBlock(blk1));
+    assertFalse(BlocksMap.removeBlock(dd, blk1));
     assertEquals(1, dd.numBlocks());
     // add an existent block
     assertFalse(storages[0].addBlock(blk) == AddBlockResult.ADDED);
@@ -75,10 +75,10 @@ public class TestDatanodeDescriptor {
     assertTrue(storages[0].addBlock(blk1) == AddBlockResult.ADDED);
     assertEquals(2, dd.numBlocks());
     // remove first block
-    assertTrue(dd.removeBlock(blk));
+    assertTrue(BlocksMap.removeBlock(dd, blk));
     assertEquals(1, dd.numBlocks());
     // remove second block
-    assertTrue(dd.removeBlock(blk1));
+    assertTrue(BlocksMap.removeBlock(dd, blk1));
     assertEquals(0, dd.numBlocks());    
   }
 }