Bladeren bron

HDFS-10897. Ozone: SCM: Add NodeManager. Contributed by Anu Engineer.

Anu Engineer 8 jaren geleden
bovenliggende
commit
71c9b5be7b

+ 140 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneClientUtils.java

@@ -32,6 +32,12 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.hadoop.ozone.OzoneConfigKeys.*;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_DEADNODE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_DEADNODE_INTERVAL_MS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_STALENODE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_STALENODE_INTERVAL_MS;
 
 /**
  * Utility methods for Ozone and Container Clients.
@@ -238,4 +244,138 @@ public final class OzoneClientUtils {
     services.put(OZONE_SCM_SERVICE_ID, serviceInstances);
     return services;
   }
+
+  /**
+   * Checks that a given value is with a range.
+   *
+   * For example, sanitizeUserArgs(17, 3, 5, 10)
+   * ensures that 17 is greater/equal than 3 * 5 and less/equal to 3 * 10.
+   *
+   * @param valueTocheck  - value to check
+   * @param baseValue     - the base value that is being used.
+   * @param minFactor     - range min - a 2 here makes us ensure that value
+   *                        valueTocheck is at least twice the baseValue.
+   * @param maxFactor     - range max
+   * @return long
+   */
+  private static long sanitizeUserArgs(long valueTocheck, long baseValue,
+                                       long minFactor, long maxFactor)
+      throws IllegalArgumentException {
+    if ((valueTocheck >= (baseValue * minFactor)) &&
+        (valueTocheck <= (baseValue * maxFactor))) {
+      return valueTocheck;
+    }
+    String errMsg = String.format("%d is not within min = %d or max = " +
+        "%d", valueTocheck, baseValue * minFactor, baseValue * maxFactor);
+    throw new IllegalArgumentException(errMsg);
+  }
+
+
+  /**
+   * Returns the interval in which the heartbeat processor thread runs.
+   *
+   * @param conf - Configuration
+   * @return long in Milliseconds.
+   */
+  public static long getScmheartbeatCheckerInterval(Configuration conf) {
+    return conf.getLong(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS,
+        OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS_DEFAULT);
+  }
+
+
+  /**
+   * Heartbeat Interval - Defines the heartbeat frequency from a datanode to
+   * SCM.
+   *
+   * @param conf - Ozone Config
+   * @return - HB interval in seconds.
+   */
+  public static int getScmHeartbeatInterval(Configuration conf) {
+    return conf.getInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS,
+        OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT);
+  }
+
+
+  /**
+   * Get the Stale Node interval, which is used by SCM to flag a datanode as
+   * stale, if the heartbeat from that node has been missing for this duration.
+   *
+   * @param conf - Configuration.
+   * @return - Long, Milliseconds to wait before flagging a node as stale.
+   */
+  public static long getStaleNodeInterval(Configuration conf) {
+
+    long staleNodeIntevalMs = conf.getLong(OZONE_SCM_STALENODE_INTERVAL_MS,
+        OZONE_SCM_STALENODE_INTERVAL_DEFAULT);
+
+    long heartbeatThreadFrequencyMs = getScmheartbeatCheckerInterval(conf);
+
+    long heartbeatIntervalMs = getScmHeartbeatInterval(conf) * 1000;
+
+
+    // Make sure that StaleNodeInterval is configured way above the frequency
+    // at which we run the heartbeat thread.
+    //
+    // Here we check that staleNodeInterval is at least five times more than the
+    // frequency at which the accounting thread is going to run.
+    try {
+      sanitizeUserArgs(staleNodeIntevalMs, heartbeatThreadFrequencyMs, 5, 1000);
+    } catch (IllegalArgumentException ex) {
+      LOG.error("Stale Node Interval MS is cannot be honored due to " +
+              "mis-configured {}. ex:  {}",
+          OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, ex);
+      throw ex;
+    }
+
+    // Make sure that stale node value is greater than configured value that
+    // datanodes are going to send HBs.
+    try {
+      sanitizeUserArgs(staleNodeIntevalMs, heartbeatIntervalMs, 3, 1000);
+    } catch (IllegalArgumentException ex) {
+      LOG.error("Stale Node Interval MS is cannot be honored due to " +
+              "mis-configured {}. ex:  {}",
+          OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, ex);
+      throw ex;
+    }
+    return staleNodeIntevalMs;
+  }
+
+
+  /**
+   * Gets the interval for dead node flagging. This has to be a value that is
+   * greater than stale node value,  and by transitive relation we also know
+   * that this value is greater than heartbeat interval and heartbeatProcess
+   * Interval.
+   *
+   * @param conf
+   * @return
+   */
+  public static long getDeadNodeInterval(Configuration conf) {
+    long staleNodeIntervalMs = getStaleNodeInterval(conf);
+    long deadNodeIntervalMs = conf.getLong(
+        OZONE_SCM_DEADNODE_INTERVAL_MS, OZONE_SCM_DEADNODE_INTERVAL_DEFAULT);
+
+    try {
+      // Make sure that dead nodes Ms is at least twice the time for staleNodes
+      // with a max of 1000 times the staleNodes.
+      sanitizeUserArgs(deadNodeIntervalMs, staleNodeIntervalMs, 2, 1000);
+    } catch (IllegalArgumentException ex) {
+      LOG.error("Dead Node Interval MS is cannot be honored due to " +
+              "mis-configured {}. ex:  {}",
+          OZONE_SCM_STALENODE_INTERVAL_MS, ex);
+      throw ex;
+    }
+    return deadNodeIntervalMs;
+  }
+
+  /**
+   * Returns the maximum number of heartbeat to process per loop of the process
+   * thread.
+   * @param conf Configration
+   * @return - int -- Number of HBs to process
+   */
+  public static int getMaxHBToProcessPerLoop(Configuration conf){
+    return conf.getInt(OZONE_SCM_MAX_HB_COUNT_TO_PROCESS,
+        OZONE_SCM_MAX_HB_COUNT_TO_PROCESS_DEFAULT);
+  }
 }

+ 25 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java

@@ -70,6 +70,31 @@ public final class OzoneConfigKeys {
       "ozone.scm.handler.count.key";
   public static final int OZONE_SCM_HANDLER_COUNT_DEFAULT = 10;
 
+  public static final String OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS =
+      "ozone.scm.heartbeat.interval.seconds";
+  public static final int OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT =
+      30;
+
+  public static final String OZONE_SCM_DEADNODE_INTERVAL_MS =
+      "ozone.scm.dead.node.interval.ms";
+  public static final long OZONE_SCM_DEADNODE_INTERVAL_DEFAULT =
+      OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT * 1000L * 20L;
+
+  public static final String OZONE_SCM_MAX_HB_COUNT_TO_PROCESS =
+      "ozone.scm.max.hb.count.to.process";
+  public static final int OZONE_SCM_MAX_HB_COUNT_TO_PROCESS_DEFAULT = 5000;
+
+  public static final String OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS =
+      "ozone.scm.heartbeat.thread.interval.ms";
+  public static final long OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS_DEFAULT =
+      3000;
+
+  public static final String OZONE_SCM_STALENODE_INTERVAL_MS =
+      "ozone.scm.stale.node.interval.ms";
+  public static final long OZONE_SCM_STALENODE_INTERVAL_DEFAULT =
+      OZONE_SCM_HEARBEAT_INTERVAL_SECONDS_DEFAULT * 1000L * 3L;
+
+
   /**
    * There is no need to instantiate this class.
    */

+ 120 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java

@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.scm.node;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.hdfs.server.blockmanagement.UnresolvedTopologyException;
+
+import java.io.Closeable;
+import java.util.List;
+
+/**
+ * A node manager supports a simple interface for managing a datanode.
+ * <p/>
+ * 1. A datanode registers with the NodeManager.
+ * <p/>
+ * 2. If the node is allowed to register, we add that to the nodes that we need
+ * to keep track of.
+ * <p/>
+ * 3. A heartbeat is made by the node at a fixed frequency.
+ * <p/>
+ * 4. A node can be in any of these 4 states: {HEALTHY, STALE, DEAD,
+ * DECOMMISSIONED}
+ * <p/>
+ * HEALTHY - It is a datanode that is regularly heartbeating us.
+ *
+ * STALE - A datanode for which we have missed few heart beats.
+ *
+ * DEAD - A datanode that we have not heard from for a while.
+ *
+ * DECOMMISSIONED - Someone told us to remove this node from the tracking
+ * list, by calling removeNode. We will throw away this nodes info soon.
+ */
+public interface NodeManager extends Closeable, Runnable {
+
+  /**
+   * Update the heartbeat timestamp.
+   *
+   * @param datanodeID - Name of the datanode that send us heatbeat.
+   */
+  void updateHeartbeat(DatanodeID datanodeID);
+
+  /**
+   * Add a New Datanode to the NodeManager.
+   *
+   * @param nodeReg - Datanode ID.
+   * @throws UnresolvedTopologyException
+   */
+  void registerNode(DatanodeID nodeReg)
+      throws UnresolvedTopologyException;
+
+  /**
+   * Removes a data node from the management of this Node Manager.
+   *
+   * @param node - DataNode.
+   * @throws UnregisteredNodeException
+   */
+  void removeNode(DatanodeID node) throws UnregisteredNodeException;
+
+  /**
+   * Gets all Live Datanodes that is currently communicating with SCM.
+   *
+   * @return List of Datanodes that are Heartbeating SCM.
+   */
+
+  List<DatanodeID> getNodes(NODESTATE nodestate);
+
+  /**
+   * Returns the Number of Datanodes that are communicating with SCM.
+   *
+   * @return int -- count
+   */
+  int getNodeCount(NODESTATE nodestate);
+
+  /**
+   * Get all datanodes known to SCM.
+   *
+   * @return List of DatanodeIDs known to SCM.
+   */
+  List<DatanodeID> getAllNodes();
+
+  /**
+   * Get the minimum number of nodes to get out of safe mode.
+   *
+   * @return int
+   */
+  int getMinimumSafeModeNodes();
+
+  /**
+   * Reports if we have exited out of safe mode by discovering enough nodes.
+   *
+   * @return True if we are out of Node layer safe mode, false otherwise.
+   */
+  boolean isOutOfNodeSafeMode();
+
+  /**
+   * Enum that represents the Node State. This is used in calls to getNodeList
+   * and getNodeCount. TODO: Add decommission when we support it.
+   */
+  enum NODESTATE {
+    HEALTHY,
+    STALE,
+    DEAD
+  }
+
+}

+ 513 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java

@@ -0,0 +1,513 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.scm.node;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
+import org.apache.hadoop.ozone.OzoneClientUtils;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.util.Time.monotonicNow;
+
+/**
+ * Maintains information about the Datanodes on SCM side.
+ * <p/>
+ * Heartbeats under SCM is very simple compared to HDFS heartbeatManager.
+ * <p/>
+ * Here we maintain 3 maps, and we propagate a node from healthyNodesMap to
+ * staleNodesMap to deadNodesMap. This moving of a node from one map to
+ * another is controlled by 4 configuration variables. These variables define
+ * how many heartbeats must go missing for the node to move from one map to
+ * another.
+ * <p/>
+ * Each heartbeat that SCMNodeManager receives is  put into heartbeatQueue. The
+ * worker thread wakes up and grabs that heartbeat from the queue. The worker
+ * thread will lookup the healthynodes map and update the timestamp if the entry
+ * is there. if not it will look up stale and deadnodes map.
+ * <p/>
+ *
+ * TODO: Replace with Node Registration code.
+ * if the node is not found in any of these tables it is treated as new node for
+ * time being and added to the healthy nodes list.
+ *
+ * <p/>
+ *
+ * The getNode(byState) functions make copy of node maps and then creates a
+ * list based on that. It should be assumed that these get functions always
+ * report *stale* information. For example, getting the deadNodeCount
+ * followed by
+ * getNodes(DEAD) could very well produce totally different count. Also
+ * getNodeCount(HEALTHY) + getNodeCount(DEAD) + getNodeCode(STALE), is not
+ * guaranteed to add up to the total nodes that we know off. Please treat all
+ * get functions in this file as a snap-shot of information that is
+ * inconsistent as soon as you read it.
+ */
+public class SCMNodeManager implements NodeManager {
+
+  @VisibleForTesting
+  static final Logger LOG =
+      LoggerFactory.getLogger(SCMNodeManager.class);
+  /**
+   * Key = NodeID, value = timestamp.
+   */
+  private final Map<String, Long> healthyNodes;
+  private final Map<String, Long> staleNodes;
+  private final Map<String, Long> deadNodes;
+  private final Queue<DatanodeID> heartbeatQueue;
+  private final Map<String, DatanodeID> nodes;
+  private final AtomicInteger healthyNodeCount;
+  private final AtomicInteger staleNodeCount;
+  private final AtomicInteger deadNodeCount;
+  private final AtomicInteger totalNodes;
+  private final long staleNodeIntervalMs;
+  private final long deadNodeIntervalMs;
+  private final long heartbeatCheckerIntervalMs;
+  private final long datanodeHBIntervalSeconds;
+  private final ScheduledExecutorService executorService;
+  private long lastHBcheckStart;
+  private long lastHBcheckFinished = 0;
+  private long lastHBProcessedCount;
+  private int safeModeNodeCount;
+  private final int maxHBToProcessPerLoop;
+
+  /**
+   * Constructs SCM machine Manager.
+   */
+  public SCMNodeManager(Configuration conf) {
+    heartbeatQueue = new ConcurrentLinkedQueue<>();
+    healthyNodes = new ConcurrentHashMap<>();
+    deadNodes = new ConcurrentHashMap<>();
+    staleNodes = new ConcurrentHashMap<>();
+    nodes = new HashMap<>();
+
+    healthyNodeCount = new AtomicInteger(0);
+    staleNodeCount = new AtomicInteger(0);
+    deadNodeCount = new AtomicInteger(0);
+    totalNodes = new AtomicInteger(0);
+
+    // TODO: Support this value as a Percentage of known machines.
+    safeModeNodeCount = 1;
+
+    staleNodeIntervalMs = OzoneClientUtils.getStaleNodeInterval(conf);
+    deadNodeIntervalMs = OzoneClientUtils.getDeadNodeInterval(conf);
+    heartbeatCheckerIntervalMs =
+        OzoneClientUtils.getScmheartbeatCheckerInterval(conf);
+    datanodeHBIntervalSeconds = OzoneClientUtils.getScmHeartbeatInterval(conf);
+    maxHBToProcessPerLoop = OzoneClientUtils.getMaxHBToProcessPerLoop(conf);
+
+    executorService = HadoopExecutors.newScheduledThreadPool(1,
+        new ThreadFactoryBuilder().setDaemon(true)
+            .setNameFormat("SCM Heartbeat Processing Thread - %d").build());
+
+    Preconditions.checkState(heartbeatCheckerIntervalMs > 0);
+    executorService.schedule(this, heartbeatCheckerIntervalMs,
+        TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Add a New Datanode to the NodeManager. This function is invoked with
+   * synchronised(this) being held.
+   *
+   * @param nodeReg - node to register
+   */
+  @Override
+  public void registerNode(DatanodeID nodeReg) {
+    if (nodes.containsKey(nodeReg.getDatanodeUuid())) {
+      LOG.error("Datanode is already registered. Datanode: {}",
+          nodeReg.toString());
+      return;
+    }
+    nodes.put(nodeReg.getDatanodeUuid(), nodeReg);
+    totalNodes.incrementAndGet();
+    healthyNodes.put(nodeReg.getDatanodeUuid(), monotonicNow());
+    healthyNodeCount.incrementAndGet();
+    LOG.info("Data node with ID: {} Registered.", nodeReg.getDatanodeUuid());
+  }
+
+  /**
+   * Register the heartbeat with Machine Manager.
+   *
+   * This requires no synchronization since the heartbeat queue is
+   * ConcurrentLinkedQueue. Hence we don't protect it specifically via a lock.
+   *
+   * @param datanodeID - Name of the datanode that send us heartbeat.
+   */
+  @Override
+  public void updateHeartbeat(DatanodeID datanodeID) {
+    // Checking for NULL to make sure that we don't get
+    // an exception from ConcurrentList.
+    // This could be a problem in tests, if this function is invoked via
+    // protobuf, transport layer will guarantee that this is not null.
+    if (datanodeID != null) {
+      heartbeatQueue.add(datanodeID);
+      return;
+    }
+    LOG.error("Datanode ID in heartbeat is null");
+  }
+
+  /**
+   * Removes a data node from the management of this Node Manager.
+   *
+   * @param node - DataNode.
+   * @throws UnregisteredNodeException
+   */
+  @Override
+  public void removeNode(DatanodeID node) throws UnregisteredNodeException {
+    // TODO : Fix me.
+
+  }
+
+  /**
+   * Gets all datanodes that are in a certain state. This function works by
+   * taking a snapshot of the current collection and then returning the list
+   * from that collection. This means that real map might have changed by the
+   * time we return this list.
+   *
+   * @return List of Datanodes that are known to SCM in the requested state.
+   */
+  @Override
+  public List<DatanodeID> getNodes(NODESTATE nodestate)
+      throws IllegalArgumentException{
+    Map<String, Long> set;
+    switch (nodestate) {
+    case HEALTHY:
+      synchronized (this) {
+        set = Collections.unmodifiableMap(new HashMap<>(healthyNodes));
+      }
+      break;
+    case STALE:
+      synchronized (this) {
+        set = Collections.unmodifiableMap(new HashMap<>(staleNodes));
+      }
+      break;
+    case DEAD:
+      synchronized (this) {
+        set = Collections.unmodifiableMap(new HashMap<>(deadNodes));
+      }
+      break;
+    default:
+      throw new IllegalArgumentException("Unknown node state requested.");
+    }
+
+    return set.entrySet().stream().map(entry -> nodes.get(entry.getKey()))
+            .collect(Collectors.toList());
+  }
+
+  /**
+   * Returns all datanodes that are known to SCM.
+   *
+   * @return List of DatanodeIDs
+   */
+  @Override
+  public List<DatanodeID> getAllNodes() {
+    Map<String, DatanodeID> set;
+    synchronized (this) {
+      set = Collections.unmodifiableMap(new HashMap<>(nodes));
+    }
+    return set.entrySet().stream().map(entry -> nodes.get(entry.getKey()))
+            .collect(Collectors.toList());
+  }
+
+  /**
+   * Get the minimum number of nodes to get out of safe mode.
+   *
+   * @return int
+   */
+  @Override
+  public int getMinimumSafeModeNodes() {
+    return safeModeNodeCount;
+  }
+
+  /**
+   * Sets the Minimum SafeModeNode count, used only in testing.
+   *
+   * @param count  - Number of nodes.
+   */
+  @VisibleForTesting
+  public void setMinimumSafeModeNodes(int count) {
+    safeModeNodeCount = count;
+  }
+
+  /**
+   * Reports if we have exited out of safe mode.
+   *
+   * @return true if we are out of safe mode.
+   */
+  @Override
+  public boolean isOutOfNodeSafeMode() {
+    LOG.trace("Node count : {}", totalNodes.get());
+
+    //TODO : Support a boolean to force getting out of Safe mode.
+    return (totalNodes.get() >= getMinimumSafeModeNodes());
+  }
+
+  /**
+   * Returns the Number of Datanodes by State they are in.
+   *
+   * @return int -- count
+   */
+  @Override
+  public int getNodeCount(NODESTATE nodestate) {
+    switch (nodestate) {
+    case HEALTHY:
+      return healthyNodeCount.get();
+    case STALE:
+      return staleNodeCount.get();
+    case DEAD:
+      return deadNodeCount.get();
+    default:
+      throw new IllegalArgumentException("Unknown node state requested.");
+    }
+  }
+
+  /**
+   * Used for testing.
+   * @return true if the HB check is done.
+   */
+  @VisibleForTesting
+  public boolean waitForHeartbeatThead() {
+    return lastHBcheckFinished != 0;
+  }
+
+  /**
+   * This is the real worker thread that processes the HB queue. We do the
+   * following things in this thread.
+   *
+   * Process the Heartbeats that are in the HB Queue.
+   * Move Stale or Dead node to healthy if we got a heartbeat from them.
+   * Move Stales Node to dead node table if it is needed.
+   * Move healthy nodes to stale nodes if it is needed.
+   *
+   * if it is a new node, we call register node and add it to the list of nodes.
+   * This will be replaced when we support registration of a node in SCM.
+   * @see Thread#run()
+   */
+  @Override
+  public void run() {
+    lastHBcheckStart = monotonicNow();
+    lastHBProcessedCount = 0;
+
+    // Process the whole queue.
+    while (!heartbeatQueue.isEmpty() &&
+        (lastHBProcessedCount < maxHBToProcessPerLoop)) {
+      DatanodeID datanodeID = heartbeatQueue.poll();
+      synchronized (this) {
+        handleHeartbeat(datanodeID);
+      }
+      // we are shutting down or something give up processing the rest of
+      // HBs. This will terminate the HB processing thread.
+      if (Thread.currentThread().isInterrupted()) {
+        LOG.info("Current Thread is isInterrupted, shutting down HB " +
+            "processing thread for Node Manager.");
+        return;
+      }
+    }
+
+    if (lastHBProcessedCount >= maxHBToProcessPerLoop) {
+      LOG.error("SCM is being flooded by heartbeats. Not able to keep up with" +
+          " the heartbeat counts. Processed {} heartbeats. Breaking out of" +
+          " loop. Leaving rest to be processed later. ", lastHBProcessedCount);
+    }
+
+    // Iterate over the Stale nodes and decide if we need to move any node to
+    // dead State.
+    long currentTime = monotonicNow();
+    for (Map.Entry<String, Long> entry : staleNodes.entrySet()) {
+      if (currentTime - entry.getValue() > deadNodeIntervalMs) {
+        synchronized (this) {
+          moveStaleNodeToDead(entry);
+        }
+      }
+    }
+
+    // Iterate over the healthy nodes and decide if we need to move any node to
+    // Stale State.
+    currentTime = monotonicNow();
+    for (Map.Entry<String, Long> entry : healthyNodes.entrySet()) {
+      if (currentTime - entry.getValue() > staleNodeIntervalMs) {
+        synchronized (this) {
+          moveHealthyNodeToStale(entry);
+        }
+      }
+    }
+    lastHBcheckFinished = monotonicNow();
+
+    monitorHBProcessingTime();
+
+    // we purposefully make this non-deterministic. Instead of using a
+    // scheduleAtFixedFrequency  we will just go to sleep
+    // and wake up at the next rendezvous point, which is currentTime +
+    // heartbeatCheckerIntervalMs. This leads to the issue that we are now
+    // heart beating not at a fixed cadence, but clock tick + time taken to
+    // work.
+    //
+    // This time taken to work can skew the heartbeat processor thread.
+    // The reason why we don't care is because of the following reasons.
+    //
+    // 1. checkerInterval is general many magnitudes faster than datanode HB
+    // frequency.
+    //
+    // 2. if we have too much nodes, the SCM would be doing only HB
+    // processing, this could lead to SCM's CPU starvation. With this
+    // approach we always guarantee that  HB thread sleeps for a little while.
+    //
+    // 3. It is possible that we will never finish processing the HB's in the
+    // thread. But that means we have a mis-configured system. We will warn
+    // the users by logging that information.
+    //
+    // 4. And the most important reason, heartbeats are not blocked even if
+    // this thread does not run, they will go into the processing queue.
+
+    if (!Thread.currentThread().isInterrupted()) {
+      executorService.schedule(this, heartbeatCheckerIntervalMs, TimeUnit
+          .MILLISECONDS);
+    } else {
+      LOG.info("Current Thread is interrupted, shutting down HB processing " +
+          "thread for Node Manager.");
+    }
+  }
+
+  /**
+   * If we have taken too much time for HB processing, log that information.
+   */
+  private void monitorHBProcessingTime() {
+    if (TimeUnit.MILLISECONDS.toSeconds(lastHBcheckFinished -
+        lastHBcheckStart) > datanodeHBIntervalSeconds) {
+      LOG.error("Total time spend processing datanode HB's is greater than " +
+              "configured values for datanode heartbeats. Please adjust the" +
+              " heartbeat configs. Time Spend on HB processing: {} seconds " +
+              "Datanode heartbeat Interval: {} seconds , heartbeats " +
+              "processed: {}",
+          TimeUnit.MILLISECONDS
+              .toSeconds(lastHBcheckFinished - lastHBcheckStart),
+          datanodeHBIntervalSeconds, lastHBProcessedCount);
+    }
+  }
+
+  /**
+   * Moves a Healthy node to a Stale node state.
+   *
+   * @param entry - Map Entry
+   */
+  private void moveHealthyNodeToStale(Map.Entry<String, Long> entry) {
+    LOG.trace("Moving healthy node to stale: {}", entry.getKey());
+    healthyNodes.remove(entry.getKey());
+    healthyNodeCount.decrementAndGet();
+    staleNodes.put(entry.getKey(), entry.getValue());
+    staleNodeCount.incrementAndGet();
+  }
+
+  /**
+   * Moves a Stale node to a dead node state.
+   *
+   * @param entry - Map Entry
+   */
+  private void moveStaleNodeToDead(Map.Entry<String, Long> entry) {
+    LOG.trace("Moving stale node to dead: {}", entry.getKey());
+    staleNodes.remove(entry.getKey());
+    staleNodeCount.decrementAndGet();
+    deadNodes.put(entry.getKey(), entry.getValue());
+    deadNodeCount.incrementAndGet();
+  }
+
+  /**
+   * Handles a single heartbeat from a datanode.
+   *
+   * @param datanodeID - datanode ID.
+   */
+  private void handleHeartbeat(DatanodeID datanodeID) {
+    lastHBProcessedCount++;
+
+    // If this node is already in the list of known and healthy nodes
+    // just update the last timestamp and return.
+    if (healthyNodes.containsKey(datanodeID.getDatanodeUuid())) {
+      healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow());
+      return;
+    }
+
+    // A stale node has heartbeat us we need to remove the node from stale
+    // list and move to healthy list.
+    if (staleNodes.containsKey(datanodeID.getDatanodeUuid())) {
+      staleNodes.remove(datanodeID.getDatanodeUuid());
+      healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow());
+      healthyNodeCount.incrementAndGet();
+      staleNodeCount.decrementAndGet();
+      return;
+    }
+
+    // A dead node has heartbeat us, we need to remove that node from dead
+    // node list and move it to the healthy list.
+    if (deadNodes.containsKey(datanodeID.getDatanodeUuid())) {
+      deadNodes.remove(datanodeID.getDatanodeUuid());
+      healthyNodes.put(datanodeID.getDatanodeUuid(), monotonicNow());
+      deadNodeCount.decrementAndGet();
+      healthyNodeCount.incrementAndGet();
+      return;
+    }
+
+    registerNode(datanodeID);
+  }
+
+  /**
+   * Closes this stream and releases any system resources associated with it. If
+   * the stream is already closed then invoking this method has no effect.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  @Override
+  public void close() throws IOException {
+    executorService.shutdown();
+    try {
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        executorService.shutdownNow();
+      }
+
+      if (!executorService.awaitTermination(5, TimeUnit.SECONDS)) {
+        LOG.error("Unable to shutdown NodeManager properly.");
+      }
+    } catch (InterruptedException e) {
+      executorService.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @VisibleForTesting
+  long getLastHBProcessedCount() {
+    return lastHBProcessedCount;
+  }
+
+}

+ 31 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/package-info.java

@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.scm.node;
+
+/**
+ * The node package deals with node management.
+ * <p/>
+ * The node manager takes care of node registrations, removal of node and
+ * handling of heartbeats.
+ * <p/>
+ * The node manager maintains statistics that gets send as part of
+ * heartbeats.
+ * <p/>
+ * The container manager polls the node manager to learn the state of
+ * datanodes  that it is interested in.
+ * <p/>
+ */

+ 22 - 0
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/package-info.java

@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.scm;
+
+/**
+ * This package contains Storage Container Manager classes.
+ */

+ 864 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/node/TestNodeManager.java

@@ -0,0 +1,864 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.scm.node;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.hamcrest.CoreMatchers;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_DEADNODE_INTERVAL_MS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_MAX_HB_COUNT_TO_PROCESS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_STALENODE_INTERVAL_MS;
+import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.HEALTHY;
+import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.STALE;
+import static org.apache.hadoop.ozone.scm.node.NodeManager.NODESTATE.DEAD;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.StringStartsWith.startsWith;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test the Node Manager class.
+ */
+public class TestNodeManager {
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  @BeforeClass
+  public static void init() throws IOException {
+  }
+
+  /**
+   * Returns a new copy of Configuration.
+   *
+   * @return Config
+   */
+  Configuration getConf() {
+    return new OzoneConfiguration();
+  }
+
+  /**
+   * Create a new datanode ID.
+   *
+   * @return DatanodeID
+   */
+  DatanodeID getDatanodeID() {
+    return getDatanodeID(UUID.randomUUID().toString());
+  }
+
+  /**
+   * Create a new DatanodeID with NodeID set to the string.
+   *
+   * @param uuid - node ID, it is generally UUID.
+   * @return DatanodeID.
+   */
+  DatanodeID getDatanodeID(String uuid) {
+    Random random = new Random();
+    String ipAddress = random.nextInt(256) + "."
+        + random.nextInt(256) + "."
+        + random.nextInt(256) + "."
+        + random.nextInt(256);
+
+    String hostName = RandomStringUtils.randomAscii(8);
+    return new DatanodeID(ipAddress, hostName, uuid,
+        0, 0, 0, 0);
+  }
+
+  /**
+   * Creates a NodeManager.
+   *
+   * @param config - Config for the node manager.
+   * @return SCNNodeManager
+   * @throws IOException
+   */
+
+  SCMNodeManager createNodeManager(Configuration config) throws IOException {
+    SCMNodeManager nodeManager = new SCMNodeManager(config);
+    assertFalse("Node manager should be in safe mode",
+        nodeManager.isOutOfNodeSafeMode());
+    return nodeManager;
+  }
+
+  /**
+   * Tests that Node manager handles heartbeats correctly, and comes out of Safe
+   * Mode.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testScmHeartbeat() throws IOException,
+      InterruptedException, TimeoutException {
+
+    try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
+
+      // Send some heartbeats from different nodes.
+      for (int x = 0; x < nodeManager.getMinimumSafeModeNodes(); x++) {
+        nodeManager.updateHeartbeat(getDatanodeID());
+      }
+
+      // Wait for 4 seconds max.
+      GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
+          4 * 1000);
+
+      assertTrue("Heartbeat thread should have picked up the scheduled " +
+              "heartbeats and transitioned out of safe mode.",
+          nodeManager.isOutOfNodeSafeMode());
+    }
+  }
+
+  /**
+   * asserts that if we send no heartbeats node manager stays in safemode.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testScmNoHeartbeats() throws IOException,
+      InterruptedException, TimeoutException {
+
+    try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
+      GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
+          4 * 1000);
+      assertFalse("No heartbeats, Node manager should have been in safe mode.",
+          nodeManager.isOutOfNodeSafeMode());
+    }
+  }
+
+  /**
+   * Asserts that if we don't get enough unique nodes we stay in safemode.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testScmNotEnoughHeartbeats() throws IOException,
+      InterruptedException, TimeoutException {
+    try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
+
+      // Need 100 nodes to come out of safe mode, only one node is sending HB.
+      nodeManager.setMinimumSafeModeNodes(100);
+      nodeManager.updateHeartbeat(getDatanodeID());
+      GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
+          4 * 1000);
+      assertFalse("Not enough heartbeat, Node manager should have been in " +
+          "safemode.", nodeManager.isOutOfNodeSafeMode());
+    }
+  }
+
+  /**
+   * Asserts that many heartbeat from the same node is counted as a single
+   * node.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testScmSameNodeHeartbeats() throws IOException,
+      InterruptedException, TimeoutException {
+
+    try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
+      nodeManager.setMinimumSafeModeNodes(3);
+      DatanodeID datanodeID = getDatanodeID();
+
+      // Send 10 heartbeat from same node, and assert we never leave safe mode.
+      for (int x = 0; x < 10; x++) {
+        nodeManager.updateHeartbeat(datanodeID);
+      }
+
+      GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
+          4 * 1000);
+      assertFalse("Not enough nodes have send heartbeat to node manager.",
+          nodeManager.isOutOfNodeSafeMode());
+    }
+  }
+
+  /**
+   * Asserts that adding heartbeats after shutdown does not work. This implies
+   * that heartbeat thread has been shutdown safely by closing the node
+   * manager.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testScmShutdown() throws IOException, InterruptedException,
+      TimeoutException {
+    Configuration conf = getConf();
+    conf.setInt(OzoneConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100);
+    SCMNodeManager nodeManager = createNodeManager(conf);
+    nodeManager.close();
+
+    // These should never be processed.
+    nodeManager.updateHeartbeat(getDatanodeID());
+
+    // Let us just wait for 2 seconds to prove that HBs are not processed.
+    Thread.sleep(2 * 1000);
+
+    assertFalse("Node manager executor service is shutdown, should never exit" +
+        " safe mode", nodeManager.isOutOfNodeSafeMode());
+
+    assertEquals("Assert new HBs were never processed", 0,
+        nodeManager.getLastHBProcessedCount());
+  }
+
+  /**
+   * Asserts that we detect as many healthy nodes as we have generated heartbeat
+   * for.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testScmHealthyNodeCount() throws IOException,
+      InterruptedException, TimeoutException {
+    Configuration conf = getConf();
+    final int count = 10;
+
+    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      for (int x = 0; x < count; x++) {
+        nodeManager.updateHeartbeat(getDatanodeID());
+      }
+      GenericTestUtils.waitFor(() -> nodeManager.waitForHeartbeatThead(), 100,
+          4 * 1000);
+      assertEquals(count, nodeManager.getNodeCount(HEALTHY));
+    }
+  }
+
+  /**
+   * Asserts that if user provides a value less than 5 times the heartbeat
+   * interval as the StaleNode Value, we throw since that is a QoS that we
+   * cannot maintain.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+
+  @Test
+  public void testScmSanityOfUserConfig1() throws IOException,
+      InterruptedException, TimeoutException {
+    Configuration conf = getConf();
+    final int interval = 100;
+    conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, interval);
+    conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1);
+
+    // This should be 5 times more than  OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS
+    // and 3 times more than OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS
+    conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, interval);
+
+    thrown.expect(IllegalArgumentException.class);
+
+    // This string is a multiple of the interval value
+    thrown.expectMessage(
+        startsWith("100 is not within min = 500 or max = 100000"));
+    createNodeManager(conf);
+  }
+
+  /**
+   * Asserts that if Stale Interval value is more than 5 times the value of HB
+   * processing thread it is a sane value.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testScmSanityOfUserConfig2() throws IOException,
+      InterruptedException, TimeoutException {
+    Configuration conf = getConf();
+    final int interval = 100;
+    conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, interval);
+    conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1);
+
+    // This should be 5 times more than  OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS
+    // and 3 times more than OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS
+    conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000);
+    createNodeManager(conf).close();
+  }
+
+  /**
+   * Asserts that a single node moves from Healthy to stale node if it misses
+   * the heartbeat.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testScmDetectStaleNode() throws IOException,
+      InterruptedException, TimeoutException {
+    Configuration conf = getConf();
+    final int interval = 100;
+    final int nodeCount = 10;
+    conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, interval);
+    conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1);
+    // This should be 5 times more than  OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS
+    // and 3 times more than OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS
+    conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000);
+
+    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      List<DatanodeID> nodeList = new LinkedList<>();
+      DatanodeID staleNode = getDatanodeID();
+      for (int x = 0; x < nodeCount; x++) {
+        nodeList.add(getDatanodeID());
+      }
+      // Heartbeat once
+      nodeManager.updateHeartbeat(staleNode);
+
+      // Heartbeat all other nodes.
+      nodeList.forEach(nodeManager::updateHeartbeat);
+
+      // Wait for 2 seconds .. and heartbeat good nodes again.
+      Thread.sleep(2 * 1000);
+      nodeList.forEach(nodeManager::updateHeartbeat);
+
+      // Wait for 2 more seconds, 3 seconds is the stale window for this test
+      Thread.sleep(2 * 1000);
+
+      List<DatanodeID> staleNodeList = nodeManager.getNodes(NodeManager
+          .NODESTATE.STALE);
+      assertEquals("Expected to find 1 stale node", 1, nodeManager
+          .getNodeCount(STALE));
+      assertEquals("Expected to find 1 stale node", 1, staleNodeList.size());
+      assertEquals("Stale node is not the expected ID", staleNode
+          .getDatanodeUuid(), staleNodeList.get(0).getDatanodeUuid());
+    }
+  }
+
+  /**
+   * Asserts that a single node moves from Healthy to dead node if it misses
+   * enough heartbeats.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testScmDetectDeadNode() throws IOException,
+      InterruptedException, TimeoutException {
+    final int interval = 100;
+    final int nodeCount = 10;
+
+    Configuration conf = getConf();
+    conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, interval);
+    conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1);
+    conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000);
+    conf.setInt(OZONE_SCM_DEADNODE_INTERVAL_MS, 6 * 1000);
+
+
+    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      List<DatanodeID> nodeList = new LinkedList<>();
+
+      DatanodeID deadNode = getDatanodeID();
+      for (int x = 0; x < nodeCount; x++) {
+        nodeList.add(getDatanodeID());
+      }
+      // Heartbeat once
+      nodeManager.updateHeartbeat(deadNode);
+
+      // Heartbeat all other nodes.
+      nodeList.forEach(nodeManager::updateHeartbeat);
+
+      // Wait for 2 seconds .. and heartbeat good nodes again.
+      Thread.sleep(2 * 1000);
+
+      nodeList.forEach(nodeManager::updateHeartbeat);
+      Thread.sleep(3 * 1000);
+
+      // heartbeat good nodes again.
+      nodeList.forEach(nodeManager::updateHeartbeat);
+
+      //  6 seconds is the dead window for this test , so we wait a total of
+      // 7 seconds to make sure that the node moves into dead state.
+      Thread.sleep(2 * 1000);
+
+      // Check for the dead node now.
+      List<DatanodeID> deadNodeList = nodeManager
+          .getNodes(DEAD);
+      assertEquals("Expected to find 1 dead node", 1, nodeManager
+          .getNodeCount(DEAD));
+      assertEquals("Expected to find 1 dead node", 1, deadNodeList.size());
+      assertEquals("Dead node is not the expected ID", deadNode
+          .getDatanodeUuid(), deadNodeList.get(0).getDatanodeUuid());
+    }
+  }
+
+  /**
+   * Asserts that if we get duplicate registration calls for a datanode, we will
+   * ignore it and LOG the error.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testScmDuplicateRegistrationLogsError() throws IOException,
+      InterruptedException, TimeoutException {
+    try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
+      GenericTestUtils.LogCapturer logCapturer =
+          GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG);
+      DatanodeID duplicateNodeID = getDatanodeID();
+      nodeManager.registerNode(duplicateNodeID);
+      nodeManager.registerNode(duplicateNodeID);
+      logCapturer.stopCapturing();
+      assertThat(logCapturer.getOutput(), containsString("Datanode is already" +
+          " registered."));
+    }
+  }
+
+  /**
+   * Asserts that we log an error for null in datanode ID.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testScmLogErrorOnNullDatanode() throws IOException,
+      InterruptedException, TimeoutException {
+    try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
+      GenericTestUtils.LogCapturer logCapturer =
+          GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG);
+      nodeManager.updateHeartbeat(null);
+      logCapturer.stopCapturing();
+      assertThat(logCapturer.getOutput(), containsString("Datanode ID in " +
+          "heartbeat is null"));
+    }
+  }
+
+  /**
+   * Asserts that a dead node, stale node and healthy nodes co-exist. The counts
+   * , lists and node ID match the expected node state.
+   * <p/>
+   * This test is pretty complicated because it explores all states of Node
+   * manager in a single test. Please read thru the comments to get an idea of
+   * the current state of the node Manager.
+   * <p/>
+   * This test is written like a state machine to avoid threads and concurrency
+   * issues. This test is replicated below with the use of threads. Avoiding
+   * threads make it easy to debug the state machine.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testScmClusterIsInExpectedState1() throws IOException,
+      InterruptedException, TimeoutException {
+
+    DatanodeID healthyNode = getDatanodeID("HealthyNode");
+    DatanodeID staleNode = getDatanodeID("StaleNode");
+    DatanodeID deadNode = getDatanodeID("DeadNode");
+
+    /**
+     * These values are very important. Here is what it means so you don't
+     * have to look it up while reading this code.
+     *
+     *  OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS - This the frequency of the
+     *  HB processing thread that is running in the SCM. This thread must run
+     *  for the SCM  to process the Heartbeats.
+     *
+     *  OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS - This is the frequency at which
+     *  datanodes will send heartbeats to SCM. Please note: This is the only
+     *  config value for node manager that is specified in seconds. We don't
+     *  want SCM heartbeat resolution to be more than in seconds.
+     *  In this test it is not used, but we are forced to set it because we
+     *  have validation code that checks Stale Node interval and Dead Node
+     *  interval is larger than the value of
+     *  OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS.
+     *
+     *  OZONE_SCM_STALENODE_INTERVAL_MS - This is the time that must elapse
+     *  from the last heartbeat for us to mark a node as stale. In this test
+     *  we set that to 3. That is if a node has not heartbeat SCM for last 3
+     *  seconds we will mark it as stale.
+     *
+     *  OZONE_SCM_DEADNODE_INTERVAL_MS - This is the time that must elapse
+     *  from the last heartbeat for a node to be marked dead. We have an
+     *  additional constraint that this must be at least 2 times bigger than
+     *  Stale node Interval.
+     *
+     *  With these we are trying to explore the state of this cluster with
+     *  various timeouts. Each section is commented so that you can keep
+     *  track of the state of the cluster nodes.
+     *
+     */
+
+    Configuration conf = getConf();
+    conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100);
+    conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1);
+    conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000);
+    conf.setInt(OZONE_SCM_DEADNODE_INTERVAL_MS, 6 * 1000);
+
+
+    /**
+     * Cluster state: Healthy: All nodes are heartbeat-ing like normal.
+     */
+    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      nodeManager.updateHeartbeat(healthyNode);
+      nodeManager.updateHeartbeat(staleNode);
+      nodeManager.updateHeartbeat(deadNode);
+
+      // Sleep so that heartbeat processing thread gets to run.
+      Thread.sleep(500);
+
+      //Assert all nodes are healthy.
+      assertEquals(3, nodeManager.getAllNodes().size());
+      assertEquals(3, nodeManager.getNodeCount(HEALTHY));
+
+      /**
+       * Cluster state: Quiesced: We are going to sleep for 3 seconds. Which
+       * means that no node is heartbeating. All nodes should move to Stale.
+       */
+      Thread.sleep(3 * 1000);
+      assertEquals(3, nodeManager.getAllNodes().size());
+      assertEquals(3, nodeManager.getNodeCount(STALE));
+
+
+      /**
+       * Cluster State : Move healthy node back to healthy state, move other 2
+       * nodes to Stale State.
+       *
+       * We heartbeat healthy node after 1 second and let other 2 nodes elapse
+       * the 3 second windows.
+       */
+
+      nodeManager.updateHeartbeat(healthyNode);
+      nodeManager.updateHeartbeat(staleNode);
+      nodeManager.updateHeartbeat(deadNode);
+
+      Thread.sleep(1500);
+      nodeManager.updateHeartbeat(healthyNode);
+      Thread.sleep(2 * 1000);
+      assertEquals(1, nodeManager.getNodeCount(HEALTHY));
+
+
+      // 3.5 seconds from last heartbeat for the stale and deadNode. So those
+      //  2 nodes must move to Stale state and the healthy node must
+      // remain in the healthy State.
+      List<DatanodeID> healthyList = nodeManager.getNodes(HEALTHY);
+      assertEquals("Expected one healthy node", 1, healthyList.size());
+      assertEquals("Healthy node is not the expected ID", healthyNode
+          .getDatanodeUuid(), healthyList.get(0).getDatanodeUuid());
+
+      assertEquals(2, nodeManager.getNodeCount(STALE));
+
+      /**
+       * Cluster State: Allow healthyNode to remain in healthy state and
+       * staleNode to move to stale state and deadNode to move to dead state.
+       */
+
+      nodeManager.updateHeartbeat(healthyNode);
+      nodeManager.updateHeartbeat(staleNode);
+      Thread.sleep(1500);
+      nodeManager.updateHeartbeat(healthyNode);
+      Thread.sleep(2 * 1000);
+
+      // 3.5 seconds have elapsed for stale node, so it moves into Stale.
+      // 7 seconds have elapsed for dead node, so it moves into dead.
+      // 2 Seconds have elapsed for healthy node, so it stays in healhty state.
+      healthyList = nodeManager.getNodes(HEALTHY);
+      List<DatanodeID> staleList = nodeManager.getNodes(STALE);
+      List<DatanodeID> deadList = nodeManager.getNodes(DEAD);
+
+      assertEquals(3, nodeManager.getAllNodes().size());
+      assertEquals(1, nodeManager.getNodeCount(HEALTHY));
+      assertEquals(1, nodeManager.getNodeCount(STALE));
+      assertEquals(1, nodeManager.getNodeCount(DEAD));
+
+      assertEquals("Expected one healthy node", 1, healthyList.size());
+      assertEquals("Healthy node is not the expected ID", healthyNode
+          .getDatanodeUuid(), healthyList.get(0).getDatanodeUuid());
+
+      assertEquals("Expected one stale node", 1, staleList.size());
+      assertEquals("Stale node is not the expected ID", staleNode
+          .getDatanodeUuid(), staleList.get(0).getDatanodeUuid());
+
+      assertEquals("Expected one dead node", 1, deadList.size());
+      assertEquals("Dead node is not the expected ID", deadNode
+          .getDatanodeUuid(), deadList.get(0).getDatanodeUuid());
+
+      /**
+       * Cluster State : let us heartbeat all the nodes and verify that we get
+       * back all the nodes in healthy state.
+       */
+      nodeManager.updateHeartbeat(healthyNode);
+      nodeManager.updateHeartbeat(staleNode);
+      nodeManager.updateHeartbeat(deadNode);
+      Thread.sleep(500);
+      //Assert all nodes are healthy.
+      assertEquals(3, nodeManager.getAllNodes().size());
+      assertEquals(3, nodeManager.getNodeCount(HEALTHY));
+    }
+  }
+
+  /**
+   * Heartbeat a given set of nodes at a specified frequency.
+   *
+   * @param manager       - Node Manager
+   * @param list          - List of datanodeIDs
+   * @param sleepDuration - Duration to sleep between heartbeats.
+   * @throws InterruptedException
+   */
+  private void heartbeatNodeSet(NodeManager manager, List<DatanodeID> list,
+                                int sleepDuration) throws InterruptedException {
+    while (!Thread.currentThread().isInterrupted()) {
+      list.forEach(manager::updateHeartbeat);
+      Thread.sleep(sleepDuration);
+    }
+  }
+
+  /**
+   * Create a set of Nodes with a given prefix.
+   *
+   * @param count  - number of nodes.
+   * @param prefix - A prefix string that can be used in verification.
+   * @return List of Nodes.
+   */
+  private List<DatanodeID> createNodeSet(int count, String prefix) {
+    List<DatanodeID> list = new LinkedList<>();
+    for (int x = 0; x < count; x++) {
+      list.add(getDatanodeID(prefix + x));
+    }
+    return list;
+  }
+
+  /**
+   * Function that tells us if we found the right number of stale nodes.
+   *
+   * @param nodeManager - node manager
+   * @param count       - number of stale nodes to look for.
+   * @return true if we found the expected number.
+   */
+  private boolean findNodes(NodeManager nodeManager, int count,
+                            NodeManager.NODESTATE state) {
+    return count == nodeManager.getNodeCount(state);
+  }
+
+  /**
+   * Asserts that we can create a set of nodes that send its heartbeats from
+   * different threads and NodeManager behaves as expected.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testScmClusterIsInExpectedState2() throws IOException,
+      InterruptedException, TimeoutException {
+    final int healthyCount = 5000;
+    final int staleCount = 100;
+    final int deadCount = 10;
+
+    Configuration conf = getConf();
+    conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100);
+    conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1);
+    conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000);
+    conf.setInt(OZONE_SCM_DEADNODE_INTERVAL_MS, 6 * 1000);
+    conf.setInt(OZONE_SCM_MAX_HB_COUNT_TO_PROCESS, 7000);
+
+    List<DatanodeID> healthyNodeList = createNodeSet(healthyCount, "Healthy");
+    List<DatanodeID> staleNodeList = createNodeSet(staleCount, "Stale");
+    List<DatanodeID> deadNodeList = createNodeSet(deadCount, "Dead");
+
+
+    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      Runnable healthyNodeTask = () -> {
+        try {
+          // 2 second heartbeat makes these nodes stay healthy.
+          heartbeatNodeSet(nodeManager, healthyNodeList, 2 * 1000);
+        } catch (InterruptedException ignored) {
+        }
+      };
+
+      Runnable staleNodeTask = () -> {
+        try {
+          // 4 second heartbeat makes these nodes go to stale and back to
+          // healthy again.
+          heartbeatNodeSet(nodeManager, staleNodeList, 4 * 1000);
+        } catch (InterruptedException ignored) {
+        }
+      };
+
+
+      // No Thread just one time HBs the node manager, so that these will be
+      // marked as dead nodes eventually.
+      deadNodeList.forEach(nodeManager::updateHeartbeat);
+
+      Thread thread1 = new Thread(healthyNodeTask);
+      thread1.setDaemon(true);
+      thread1.start();
+
+
+      Thread thread2 = new Thread(staleNodeTask);
+      thread2.setDaemon(true);
+      thread2.start();
+
+      Thread.sleep(10 * 1000);
+
+      // Assert all healthy nodes are healthy now, this has to be a greater
+      // than check since Stale nodes can be healthy when we check the state.
+
+      assertTrue(nodeManager.getNodeCount(HEALTHY) >= healthyCount);
+
+      assertEquals(deadCount, nodeManager.getNodeCount(DEAD));
+
+      List<DatanodeID> deadList = nodeManager.getNodes(DEAD);
+
+      for (DatanodeID node : deadList) {
+        assertThat(node.getDatanodeUuid(), CoreMatchers.startsWith("Dead"));
+      }
+
+      // Checking stale nodes is tricky since they have to move between
+      // healthy and stale to avoid becoming dead nodes. So we search for
+      // that state for a while, if we don't find that state waitfor will
+      // throw.
+      GenericTestUtils.waitFor(() -> findNodes(nodeManager, staleCount, STALE),
+          500, 4 * 1000);
+
+      thread1.interrupt();
+      thread2.interrupt();
+    }
+  }
+
+  /**
+   * Asserts that we can handle 6000+ nodes heartbeating SCM.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws TimeoutException
+   */
+  @Test
+  public void testScmCanHandleScale() throws IOException,
+      InterruptedException, TimeoutException {
+    final int healthyCount = 3000;
+    final int staleCount = 3000;
+    List<DatanodeID> healthyList = createNodeSet(healthyCount, "h");
+    List<DatanodeID> staleList = createNodeSet(staleCount, "s");
+    Configuration conf = getConf();
+    conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 100);
+    conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1);
+    conf.setInt(OZONE_SCM_STALENODE_INTERVAL_MS, 3 * 1000);
+    conf.setInt(OZONE_SCM_DEADNODE_INTERVAL_MS, 6 * 1000);
+
+    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      Runnable healthyNodeTask = () -> {
+        try {
+          heartbeatNodeSet(nodeManager, healthyList, 2 * 1000);
+        } catch (InterruptedException ignored) {
+
+        }
+      };
+
+      Runnable staleNodeTask = () -> {
+        try {
+          heartbeatNodeSet(nodeManager, staleList, 4 * 1000);
+        } catch (InterruptedException ignored) {
+        }
+      };
+
+      Thread thread1 = new Thread(healthyNodeTask);
+      thread1.setDaemon(true);
+      thread1.start();
+
+
+      Thread thread2 = new Thread(staleNodeTask);
+      thread2.setDaemon(true);
+      thread2.start();
+      Thread.sleep(3 * 1000);
+
+      GenericTestUtils.waitFor(() -> findNodes(nodeManager, staleCount, STALE),
+          500, 20 * 1000);
+      assertEquals("Node count mismatch", healthyCount + staleCount, nodeManager
+          .getAllNodes().size());
+
+      thread1.interrupt();
+      thread2.interrupt();
+    }
+  }
+
+  /**
+   * Asserts that SCM backs off from HB processing instead of going into an
+   * infinite loop if SCM is flooded with too many heartbeats. This many not be
+   * the best thing to do, but SCM tries to protect itself and logs an error
+   * saying that it is getting flooded with heartbeats. In real world this can
+   * lead to many nodes becoming stale or dead due to the fact that SCM is not
+   * able to keep up with heartbeat processing. This test just verifies that SCM
+   * will log that information.
+   */
+  @Test
+  public void testScmLogsHeartbeatFlooding() throws IOException,
+      InterruptedException {
+    final int healthyCount = 3000;
+    List<DatanodeID> healthyList = createNodeSet(healthyCount, "h");
+
+    // Make the HB process thread run slower.
+    Configuration conf = getConf();
+    conf.setInt(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 500);
+    conf.setInt(OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 1);
+    conf.setInt(OZONE_SCM_MAX_HB_COUNT_TO_PROCESS, 500);
+
+    try (SCMNodeManager nodeManager = createNodeManager(conf)) {
+      GenericTestUtils.LogCapturer logCapturer =
+          GenericTestUtils.LogCapturer.captureLogs(SCMNodeManager.LOG);
+      Runnable healthyNodeTask = () -> {
+        try {
+          // No wait in the HB sending loop.
+          heartbeatNodeSet(nodeManager, healthyList, 0);
+        } catch (InterruptedException ignored) {
+        }
+      };
+      Thread thread1 = new Thread(healthyNodeTask);
+      thread1.setDaemon(true);
+      thread1.start();
+
+      Thread.sleep(6 * 1000);
+
+
+      thread1.interrupt();
+      logCapturer.stopCapturing();
+
+      assertThat(logCapturer.getOutput(), containsString("SCM is being " +
+          "flooded by heartbeats. Not able to keep up with the heartbeat " +
+          "counts."));
+    }
+  }
+}