ソースを参照

YARN-7790. Improve Capacity Scheduler Async Scheduling to better handle node failures. Contributed by Wangda Tan.

Sunil G 7 年 前
コミット
e9c72d04be

+ 28 - 23
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

@@ -980,11 +980,11 @@ public abstract class AbstractYarnScheduler
   /**
   /**
    * Get lists of new containers from NodeManager and process them.
    * Get lists of new containers from NodeManager and process them.
    * @param nm The RMNode corresponding to the NodeManager
    * @param nm The RMNode corresponding to the NodeManager
+   * @param schedulerNode schedulerNode
    * @return list of completed containers
    * @return list of completed containers
    */
    */
-  protected List<ContainerStatus> updateNewContainerInfo(RMNode nm) {
-    SchedulerNode node = getNode(nm.getNodeID());
-
+  private List<ContainerStatus> updateNewContainerInfo(RMNode nm,
+      SchedulerNode schedulerNode) {
     List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
     List<UpdatedContainerInfo> containerInfoList = nm.pullContainerUpdates();
     List<ContainerStatus> newlyLaunchedContainers =
     List<ContainerStatus> newlyLaunchedContainers =
         new ArrayList<>();
         new ArrayList<>();
@@ -999,14 +999,15 @@ public abstract class AbstractYarnScheduler
 
 
     // Processing the newly launched containers
     // Processing the newly launched containers
     for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
     for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
-      containerLaunchedOnNode(launchedContainer.getContainerId(), node);
+      containerLaunchedOnNode(launchedContainer.getContainerId(),
+          schedulerNode);
     }
     }
 
 
     // Processing the newly increased containers
     // Processing the newly increased containers
     List<Container> newlyIncreasedContainers =
     List<Container> newlyIncreasedContainers =
         nm.pullNewlyIncreasedContainers();
         nm.pullNewlyIncreasedContainers();
     for (Container container : newlyIncreasedContainers) {
     for (Container container : newlyIncreasedContainers) {
-      containerIncreasedOnNode(container.getId(), node, container);
+      containerIncreasedOnNode(container.getId(), schedulerNode, container);
     }
     }
 
 
     return completedContainers;
     return completedContainers;
@@ -1017,12 +1018,12 @@ public abstract class AbstractYarnScheduler
    * @param completedContainers Extracted list of completed containers
    * @param completedContainers Extracted list of completed containers
    * @param releasedResources Reference resource object for completed containers
    * @param releasedResources Reference resource object for completed containers
    * @param nodeId NodeId corresponding to the NodeManager
    * @param nodeId NodeId corresponding to the NodeManager
+   * @param schedulerNode schedulerNode
    * @return The total number of released containers
    * @return The total number of released containers
    */
    */
-  protected int updateCompletedContainers(List<ContainerStatus>
-      completedContainers, Resource releasedResources, NodeId nodeId) {
+  private int updateCompletedContainers(List<ContainerStatus> completedContainers,
+      Resource releasedResources, NodeId nodeId, SchedulerNode schedulerNode) {
     int releasedContainers = 0;
     int releasedContainers = 0;
-    SchedulerNode node = getNode(nodeId);
     List<ContainerId> untrackedContainerIdList = new ArrayList<ContainerId>();
     List<ContainerId> untrackedContainerIdList = new ArrayList<ContainerId>();
     for (ContainerStatus completedContainer : completedContainers) {
     for (ContainerStatus completedContainer : completedContainers) {
       ContainerId containerId = completedContainer.getContainerId();
       ContainerId containerId = completedContainer.getContainerId();
@@ -1030,8 +1031,8 @@ public abstract class AbstractYarnScheduler
       RMContainer container = getRMContainer(containerId);
       RMContainer container = getRMContainer(containerId);
       completedContainer(container,
       completedContainer(container,
           completedContainer, RMContainerEventType.FINISHED);
           completedContainer, RMContainerEventType.FINISHED);
-      if (node != null) {
-        node.releaseContainer(containerId, true);
+      if (schedulerNode != null) {
+        schedulerNode.releaseContainer(containerId, true);
       }
       }
 
 
       if (container != null) {
       if (container != null) {
@@ -1076,14 +1077,14 @@ public abstract class AbstractYarnScheduler
   /**
   /**
    * Update container and utilization information on the NodeManager.
    * Update container and utilization information on the NodeManager.
    * @param nm The NodeManager to update
    * @param nm The NodeManager to update
+   * @param schedulerNode schedulerNode
    */
    */
-  protected void updateNodeResourceUtilization(RMNode nm) {
-    SchedulerNode node = getNode(nm.getNodeID());
+  protected void updateNodeResourceUtilization(RMNode nm,
+      SchedulerNode schedulerNode) {
     // Updating node resource utilization
     // Updating node resource utilization
-    node.setAggregatedContainersUtilization(
+    schedulerNode.setAggregatedContainersUtilization(
         nm.getAggregatedContainersUtilization());
         nm.getAggregatedContainersUtilization());
-    node.setNodeUtilization(nm.getNodeUtilization());
-
+    schedulerNode.setNodeUtilization(nm.getNodeUtilization());
   }
   }
 
 
   /**
   /**
@@ -1097,12 +1098,17 @@ public abstract class AbstractYarnScheduler
     }
     }
 
 
     // Process new container information
     // Process new container information
-    List<ContainerStatus> completedContainers = updateNewContainerInfo(nm);
+    SchedulerNode schedulerNode = getNode(nm.getNodeID());
+    List<ContainerStatus> completedContainers = updateNewContainerInfo(nm,
+        schedulerNode);
+
+    // Notify Scheduler Node updated.
+    schedulerNode.notifyNodeUpdate();
 
 
     // Process completed containers
     // Process completed containers
     Resource releasedResources = Resource.newInstance(0, 0);
     Resource releasedResources = Resource.newInstance(0, 0);
     int releasedContainers = updateCompletedContainers(completedContainers,
     int releasedContainers = updateCompletedContainers(completedContainers,
-        releasedResources, nm.getNodeID());
+        releasedResources, nm.getNodeID(), schedulerNode);
 
 
     // If the node is decommissioning, send an update to have the total
     // If the node is decommissioning, send an update to have the total
     // resource equal to the used resource, so no available resource to
     // resource equal to the used resource, so no available resource to
@@ -1115,18 +1121,17 @@ public abstract class AbstractYarnScheduler
           .getEventHandler()
           .getEventHandler()
           .handle(
           .handle(
               new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
               new RMNodeResourceUpdateEvent(nm.getNodeID(), ResourceOption
-                  .newInstance(getSchedulerNode(nm.getNodeID())
-                      .getAllocatedResource(), 0)));
+                  .newInstance(schedulerNode.getAllocatedResource(), 0)));
     }
     }
 
 
     updateSchedulerHealthInformation(releasedResources, releasedContainers);
     updateSchedulerHealthInformation(releasedResources, releasedContainers);
-    updateNodeResourceUtilization(nm);
+    updateNodeResourceUtilization(nm, schedulerNode);
 
 
     // Now node data structures are up-to-date and ready for scheduling.
     // Now node data structures are up-to-date and ready for scheduling.
     if(LOG.isDebugEnabled()) {
     if(LOG.isDebugEnabled()) {
-      SchedulerNode node = getNode(nm.getNodeID());
-      LOG.debug("Node being looked for scheduling " + nm +
-          " availableResource: " + node.getUnallocatedResource());
+      LOG.debug(
+          "Node being looked for scheduling " + nm + " availableResource: "
+              + schedulerNode.getUnallocatedResource());
     }
     }
   }
   }
 
 

+ 16 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java

@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
@@ -76,6 +77,9 @@ public abstract class SchedulerNode {
 
 
   private volatile Set<String> labels = null;
   private volatile Set<String> labels = null;
 
 
+  // Last updated time
+  private volatile long lastHeartbeatMonotonicTime;
+
   public SchedulerNode(RMNode node, boolean usePortForNodeName,
   public SchedulerNode(RMNode node, boolean usePortForNodeName,
       Set<String> labels) {
       Set<String> labels) {
     this.rmNode = node;
     this.rmNode = node;
@@ -87,6 +91,7 @@ public abstract class SchedulerNode {
       nodeName = rmNode.getHostName();
       nodeName = rmNode.getHostName();
     }
     }
     this.labels = ImmutableSet.copyOf(labels);
     this.labels = ImmutableSet.copyOf(labels);
+    this.lastHeartbeatMonotonicTime = Time.monotonicNow();
   }
   }
 
 
   public SchedulerNode(RMNode node, boolean usePortForNodeName) {
   public SchedulerNode(RMNode node, boolean usePortForNodeName) {
@@ -453,6 +458,17 @@ public abstract class SchedulerNode {
     return this.nodeUtilization;
     return this.nodeUtilization;
   }
   }
 
 
+  public long getLastHeartbeatMonotonicTime() {
+    return lastHeartbeatMonotonicTime;
+  }
+
+  /**
+   * This will be called for each node heartbeat.
+   */
+  public void notifyNodeUpdate() {
+    this.lastHeartbeatMonotonicTime = Time.monotonicNow();
+  }
+
 
 
   private static class ContainerInfo {
   private static class ContainerInfo {
     private final RMContainer container;
     private final RMContainer container;

+ 45 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -142,7 +142,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleC
 import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.server.utils.Lock;
-import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@@ -181,8 +180,6 @@ public class CapacityScheduler extends
 
 
   private CSConfigurationProvider csConfProvider;
   private CSConfigurationProvider csConfProvider;
 
 
-  protected Clock monotonicClock;
-
   @Override
   @Override
   public void setConf(Configuration conf) {
   public void setConf(Configuration conf) {
       yarnConf = conf;
       yarnConf = conf;
@@ -243,6 +240,8 @@ public class CapacityScheduler extends
   private RMNodeLabelsManager labelManager;
   private RMNodeLabelsManager labelManager;
   private AppPriorityACLsManager appPriorityACLManager;
   private AppPriorityACLsManager appPriorityACLManager;
 
 
+  private static boolean printedVerboseLoggingForAsyncScheduling = false;
+
   /**
   /**
    * EXPERT
    * EXPERT
    */
    */
@@ -471,6 +470,22 @@ public class CapacityScheduler extends
 
 
   private final static Random random = new Random(System.currentTimeMillis());
   private final static Random random = new Random(System.currentTimeMillis());
 
 
+  private static boolean shouldSkipNodeSchedule(FiCaSchedulerNode node,
+      CapacityScheduler cs, boolean printVerboseLog) {
+    // Skip node which missed 2 heartbeats since the node might be dead and
+    // we should not continue allocate containers on that.
+    long timeElapsedFromLastHeartbeat =
+        Time.monotonicNow() - node.getLastHeartbeatMonotonicTime();
+    if (timeElapsedFromLastHeartbeat > cs.nmHeartbeatInterval * 2) {
+      if (printVerboseLog && LOG.isDebugEnabled()) {
+        LOG.debug("Skip scheduling on node because it haven't heartbeated for "
+            + timeElapsedFromLastHeartbeat / 1000.0f + " secs");
+      }
+      return true;
+    }
+    return false;
+  }
+
   /**
   /**
    * Schedule on all nodes by starting at a random point.
    * Schedule on all nodes by starting at a random point.
    * @param cs
    * @param cs
@@ -481,16 +496,42 @@ public class CapacityScheduler extends
     Collection<FiCaSchedulerNode> nodes = cs.nodeTracker.getAllNodes();
     Collection<FiCaSchedulerNode> nodes = cs.nodeTracker.getAllNodes();
     int start = random.nextInt(nodes.size());
     int start = random.nextInt(nodes.size());
 
 
+    // To avoid too verbose DEBUG logging, only print debug log once for
+    // every 10 secs.
+    boolean printSkipedNodeLogging = false;
+    if (Time.monotonicNow() / 1000 % 10 == 0) {
+      printSkipedNodeLogging = (!printedVerboseLoggingForAsyncScheduling);
+    } else {
+      printedVerboseLoggingForAsyncScheduling = false;
+    }
+
+    // Allocate containers of node [start, end)
     for (FiCaSchedulerNode node : nodes) {
     for (FiCaSchedulerNode node : nodes) {
       if (current++ >= start) {
       if (current++ >= start) {
+        if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
+          continue;
+        }
         cs.allocateContainersToNode(node.getNodeID(), false);
         cs.allocateContainersToNode(node.getNodeID(), false);
       }
       }
     }
     }
-    // Now, just get everyone to be safe
+
+    current = 0;
+
+    // Allocate containers of node [0, start)
     for (FiCaSchedulerNode node : nodes) {
     for (FiCaSchedulerNode node : nodes) {
+      if (current++ > start) {
+        break;
+      }
+      if (shouldSkipNodeSchedule(node, cs, printSkipedNodeLogging)) {
+        continue;
+      }
       cs.allocateContainersToNode(node.getNodeID(), false);
       cs.allocateContainersToNode(node.getNodeID(), false);
     }
     }
 
 
+    if (printSkipedNodeLogging) {
+      printedVerboseLoggingForAsyncScheduling = true;
+    }
+
     Thread.sleep(cs.getAsyncScheduleInterval());
     Thread.sleep(cs.getAsyncScheduleInterval());
   }
   }
 
 

+ 35 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHAForAsyncScheduler.java

@@ -28,13 +28,19 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAsyncScheduling;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.junit.Assert;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.Test;
 
 
+import java.util.Arrays;
+import java.util.List;
+
 public class TestRMHAForAsyncScheduler extends RMHATestBase {
 public class TestRMHAForAsyncScheduler extends RMHATestBase {
+  private TestCapacitySchedulerAsyncScheduling.NMHeartbeatThread
+      nmHeartbeatThread = null;
 
 
   @Before
   @Before
   @Override
   @Override
@@ -57,26 +63,49 @@ public class TestRMHAForAsyncScheduler extends RMHATestBase {
         CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true);
         CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true);
   }
   }
 
 
+  private void keepNMHeartbeat(List<MockNM> mockNMs, int interval) {
+    if (nmHeartbeatThread != null) {
+      nmHeartbeatThread.setShouldStop();
+      nmHeartbeatThread = null;
+    }
+    nmHeartbeatThread =
+        new TestCapacitySchedulerAsyncScheduling.NMHeartbeatThread(mockNMs,
+            interval);
+    nmHeartbeatThread.start();
+  }
+
+  private void pauseNMHeartbeat() {
+    if (nmHeartbeatThread != null) {
+      nmHeartbeatThread.setShouldStop();
+      nmHeartbeatThread = null;
+    }
+  }
+
   @Test(timeout = 60000)
   @Test(timeout = 60000)
   public void testAsyncScheduleThreadStateAfterRMHATransit() throws Exception {
   public void testAsyncScheduleThreadStateAfterRMHATransit() throws Exception {
     // start two RMs, and transit rm1 to active, rm2 to standby
     // start two RMs, and transit rm1 to active, rm2 to standby
     startRMs();
     startRMs();
     // register NM
     // register NM
-    rm1.registerNode("h1:1234", 8192, 8);
+    MockNM nm = rm1.registerNode("192.1.1.1:1234", 8192, 8);
     // submit app1 and check
     // submit app1 and check
     RMApp app1 = submitAppAndCheckLaunched(rm1);
     RMApp app1 = submitAppAndCheckLaunched(rm1);
+    keepNMHeartbeat(Arrays.asList(nm), 1000);
 
 
     // failover RM1 to RM2
     // failover RM1 to RM2
     explicitFailover();
     explicitFailover();
     checkAsyncSchedulerThreads(Thread.currentThread());
     checkAsyncSchedulerThreads(Thread.currentThread());
+    pauseNMHeartbeat();
 
 
     // register NM, kill app1
     // register NM, kill app1
-    rm2.registerNode("h1:1234", 8192, 8);
+    nm = rm2.registerNode("192.1.1.1:1234", 8192, 8);
+    keepNMHeartbeat(Arrays.asList(nm), 1000);
+
     rm2.waitForState(app1.getCurrentAppAttempt().getAppAttemptId(),
     rm2.waitForState(app1.getCurrentAppAttempt().getAppAttemptId(),
         RMAppAttemptState.LAUNCHED);
         RMAppAttemptState.LAUNCHED);
     rm2.killApp(app1.getApplicationId());
     rm2.killApp(app1.getApplicationId());
     // submit app3 and check
     // submit app3 and check
     RMApp app2 = submitAppAndCheckLaunched(rm2);
     RMApp app2 = submitAppAndCheckLaunched(rm2);
+    pauseNMHeartbeat();
 
 
     // failover RM2 to RM1
     // failover RM2 to RM1
     HAServiceProtocol.StateChangeRequestInfo requestInfo =
     HAServiceProtocol.StateChangeRequestInfo requestInfo =
@@ -92,12 +121,15 @@ public class TestRMHAForAsyncScheduler extends RMHATestBase {
     checkAsyncSchedulerThreads(Thread.currentThread());
     checkAsyncSchedulerThreads(Thread.currentThread());
 
 
     // register NM, kill app2
     // register NM, kill app2
-    rm1.registerNode("h1:1234", 8192, 8);
+    nm = rm1.registerNode("192.1.1.1:1234", 8192, 8);
+    keepNMHeartbeat(Arrays.asList(nm), 1000);
+
     rm1.waitForState(app2.getCurrentAppAttempt().getAppAttemptId(),
     rm1.waitForState(app2.getCurrentAppAttempt().getAppAttemptId(),
         RMAppAttemptState.LAUNCHED);
         RMAppAttemptState.LAUNCHED);
     rm1.killApp(app2.getApplicationId());
     rm1.killApp(app2.getApplicationId());
     // submit app3 and check
     // submit app3 and check
     submitAppAndCheckLaunched(rm1);
     submitAppAndCheckLaunched(rm1);
+    pauseNMHeartbeat();
 
 
     rm1.stop();
     rm1.stop();
     rm2.stop();
     rm2.stop();

+ 152 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
@@ -72,6 +73,8 @@ public class TestCapacitySchedulerAsyncScheduling {
 
 
   RMNodeLabelsManager mgr;
   RMNodeLabelsManager mgr;
 
 
+  private NMHeartbeatThread nmHeartbeatThread = null;
+
   @Before
   @Before
   public void setUp() throws Exception {
   public void setUp() throws Exception {
     conf = new YarnConfiguration();
     conf = new YarnConfiguration();
@@ -122,9 +125,11 @@ public class TestCapacitySchedulerAsyncScheduling {
     List<MockNM> nms = new ArrayList<>();
     List<MockNM> nms = new ArrayList<>();
     // Add 10 nodes to the cluster, in the cluster we have 200 GB resource
     // Add 10 nodes to the cluster, in the cluster we have 200 GB resource
     for (int i = 0; i < 10; i++) {
     for (int i = 0; i < 10; i++) {
-      nms.add(rm.registerNode("h-" + i + ":1234", 20 * GB));
+      nms.add(rm.registerNode("127.0.0." + i + ":1234", 20 * GB));
     }
     }
 
 
+    keepNMHeartbeat(nms, 1000);
+
     List<MockAM> ams = new ArrayList<>();
     List<MockAM> ams = new ArrayList<>();
     // Add 3 applications to the cluster, one app in one queue
     // Add 3 applications to the cluster, one app in one queue
     // the i-th app ask (20 * i) containers. So in total we will have
     // the i-th app ask (20 * i) containers. So in total we will have
@@ -185,8 +190,8 @@ public class TestCapacitySchedulerAsyncScheduling {
     // init RM & NMs & Nodes
     // init RM & NMs & Nodes
     final MockRM rm = new MockRM(disableAsyncConf);
     final MockRM rm = new MockRM(disableAsyncConf);
     rm.start();
     rm.start();
-    final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB);
-    final MockNM nm2 = rm.registerNode("h2:2234", 9 * GB);
+    final MockNM nm1 = rm.registerNode("192.168.0.1:1234", 9 * GB);
+    final MockNM nm2 = rm.registerNode("192.168.0.2:2234", 9 * GB);
     List<MockNM> nmLst = new ArrayList<>();
     List<MockNM> nmLst = new ArrayList<>();
     nmLst.add(nm1);
     nmLst.add(nm1);
     nmLst.add(nm2);
     nmLst.add(nm2);
@@ -277,8 +282,8 @@ public class TestCapacitySchedulerAsyncScheduling {
     // init RM & NMs & Nodes
     // init RM & NMs & Nodes
     final MockRM rm = new MockRM(disableAsyncConf);
     final MockRM rm = new MockRM(disableAsyncConf);
     rm.start();
     rm.start();
-    final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB);
-    final MockNM nm2 = rm.registerNode("h2:2234", 9 * GB);
+    final MockNM nm1 = rm.registerNode("127.0.0.1:1234", 9 * GB);
+    final MockNM nm2 = rm.registerNode("127.0.0.2:2234", 9 * GB);
 
 
     // init scheduler nodes
     // init scheduler nodes
     int waitTime = 1000;
     int waitTime = 1000;
@@ -416,8 +421,8 @@ public class TestCapacitySchedulerAsyncScheduling {
     // init RM & NMs & Nodes
     // init RM & NMs & Nodes
     final MockRM rm = new MockRM(disableAsyncConf);
     final MockRM rm = new MockRM(disableAsyncConf);
     rm.start();
     rm.start();
-    final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB);
-    final MockNM nm2 = rm.registerNode("h2:1234", 9 * GB);
+    final MockNM nm1 = rm.registerNode("127.0.0.1:1234", 9 * GB);
+    final MockNM nm2 = rm.registerNode("127.0.0.2:1234", 9 * GB);
     List<MockNM> nmLst = new ArrayList<>();
     List<MockNM> nmLst = new ArrayList<>();
     nmLst.add(nm1);
     nmLst.add(nm1);
     nmLst.add(nm2);
     nmLst.add(nm2);
@@ -476,6 +481,146 @@ public class TestCapacitySchedulerAsyncScheduling {
     rm.stop();
     rm.stop();
   }
   }
 
 
+  /**
+   * Make sure scheduler skips NMs which haven't heartbeat for a while.
+   * @throws Exception
+   */
+  @Test
+  public void testAsyncSchedulerSkipNoHeartbeatNMs() throws Exception {
+    int heartbeatInterval = 100;
+    conf.setInt(
+        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
+        1);
+    conf.setInt(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+        + ".scheduling-interval-ms", 100);
+    // Heartbeat interval is 100 ms.
+    conf.setInt(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, heartbeatInterval);
+
+    final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+
+    // inject node label manager
+    MockRM rm = new MockRM(TestUtils.getConfigurationWithMultipleQueues(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    rm.getRMContext().setNodeLabelManager(mgr);
+    rm.start();
+
+    List<MockNM> nms = new ArrayList<>();
+    // Add 10 nodes to the cluster, in the cluster we have 200 GB resource
+    for (int i = 0; i < 10; i++) {
+      nms.add(rm.registerNode("127.0.0." + i + ":1234", 20 * GB));
+    }
+
+    List<MockAM> ams = new ArrayList<>();
+
+    keepNMHeartbeat(nms, heartbeatInterval);
+
+    for (int i = 0; i < 3; i++) {
+      RMApp rmApp = rm.submitApp(1024, "app", "user", null, false,
+          Character.toString((char) (i % 34 + 97)), 1, null, null, false);
+      MockAM am = MockRM.launchAMWhenAsyncSchedulingEnabled(rmApp, rm);
+      am.registerAppAttempt();
+      ams.add(am);
+    }
+
+    pauseNMHeartbeat();
+
+    Thread.sleep(heartbeatInterval * 3);
+
+    // Applications request containers.
+    for (int i = 0; i < 3; i++) {
+      ams.get(i).allocate("*", 1024, 20 * (i + 1), new ArrayList<>());
+    }
+
+    for (int i = 0; i < 5; i++) {
+      // Do heartbeat for NM 0-4
+      nms.get(i).nodeHeartbeat(true);
+    }
+
+    // Wait for 2000 ms.
+    Thread.sleep(2000);
+
+    // Make sure that NM5-9 don't have non-AM containers.
+    for (int i = 0; i < 9; i++) {
+      if (i < 5) {
+        Assert.assertTrue(checkNumNonAMContainersOnNode(cs, nms.get(i)) > 0);
+      } else {
+        Assert.assertTrue(checkNumNonAMContainersOnNode(cs, nms.get(i)) == 0);
+      }
+    }
+
+    rm.close();
+  }
+
+  public static class NMHeartbeatThread extends Thread {
+    private List<MockNM> mockNMS;
+    private int interval;
+    private volatile boolean shouldStop = false;
+
+    public NMHeartbeatThread(List<MockNM> mockNMs, int interval) {
+      this.mockNMS = mockNMs;
+      this.interval = interval;
+    }
+
+    public void run() {
+      while (true) {
+        if (shouldStop) {
+          break;
+        }
+        for (MockNM nm : mockNMS) {
+          try {
+            nm.nodeHeartbeat(true);
+          } catch (Exception e) {
+            e.printStackTrace();
+          }
+        }
+        try {
+          Thread.sleep(interval);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+
+    public void setShouldStop() {
+      shouldStop = true;
+    }
+  }
+
+  private void keepNMHeartbeat(List<MockNM> mockNMs, int interval) {
+    if (nmHeartbeatThread != null) {
+      nmHeartbeatThread.setShouldStop();
+      nmHeartbeatThread = null;
+    }
+    nmHeartbeatThread = new NMHeartbeatThread(mockNMs, interval);
+    nmHeartbeatThread.start();
+  }
+
+  private void pauseNMHeartbeat() {
+    if (nmHeartbeatThread != null) {
+      nmHeartbeatThread.setShouldStop();
+      nmHeartbeatThread = null;
+    }
+  }
+
+  private int checkNumNonAMContainersOnNode(CapacityScheduler cs, MockNM nm) {
+    SchedulerNode node = cs.getNode(nm.getNodeId());
+    int nonAMContainer = 0;
+    for (RMContainer c : node.getCopiedListOfRunningContainers()) {
+      if (!c.isAMContainer()) {
+         nonAMContainer++;
+      }
+    }
+    return nonAMContainer;
+  }
+
   private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm,
   private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm,
       int nContainer, Resource resource, int priority, int startContainerId)
       int nContainer, Resource resource, int priority, int startContainerId)
       throws Exception {
       throws Exception {