Ver Fonte

YARN-5918. Handle Opportunistic scheduling allocate request failure when NM is lost. (Bibin A Chundatt via asuresh)

(cherry picked from commit 005850b28feb2f7bb8c2844d11e3f9d21b45d754)
(cherry picked from commit cbff10b4147f98a89b393519b17e16385294af07)
Arun Suresh há 8 anos atrás
pai
commit
44774eb21c

+ 9 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java

@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor;
 
@@ -409,15 +410,19 @@ public class OpportunisticContainerAllocatorAMService
   private List<RemoteNode> convertToRemoteNodes(List<NodeId> nodeIds) {
     ArrayList<RemoteNode> retNodes = new ArrayList<>();
     for (NodeId nId : nodeIds) {
-      retNodes.add(convertToRemoteNode(nId));
+      RemoteNode remoteNode = convertToRemoteNode(nId);
+      if (null != remoteNode) {
+        retNodes.add(remoteNode);
+      }
     }
     return retNodes;
   }
 
   private RemoteNode convertToRemoteNode(NodeId nodeId) {
-    return RemoteNode.newInstance(nodeId,
-        ((AbstractYarnScheduler)rmContext.getScheduler()).getNode(nodeId)
-            .getHttpAddress());
+    SchedulerNode node =
+        ((AbstractYarnScheduler) rmContext.getScheduler()).getNode(nodeId);
+    return node != null ? RemoteNode.newInstance(nodeId, node.getHttpAddress())
+        : null;
   }
 
   private Resource createMaxContainerResource() {

+ 5 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java

@@ -165,9 +165,11 @@ public class NodeQueueLoadMonitor implements ClusterMonitor {
   }
 
   @Override
-  public void addNode(List<NMContainerStatus> containerStatuses, RMNode
-      rmNode) {
-    LOG.debug("Node added event from: " + rmNode.getNode().getName());
+  public void addNode(List<NMContainerStatus> containerStatuses,
+      RMNode rmNode) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Node added event from: " + rmNode.getNode().getName());
+    }
     // Ignoring this currently : at least one NODE_UPDATE heartbeat is
     // required to ensure node eligibility.
   }

+ 96 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java

@@ -61,11 +61,23 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateRequestPBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
+import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
-
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -77,6 +89,89 @@ import java.util.List;
  */
 public class TestOpportunisticContainerAllocatorAMService {
 
+  private static final int GB = 1024;
+
+  @Test(timeout = 60000)
+  public void testNodeRemovalDuringAllocate() throws Exception {
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration();
+    YarnConfiguration conf = new YarnConfiguration(csConf);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(
+        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
+    conf.setInt(
+        YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS, 100);
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nm1.registerNode();
+    nm2.registerNode();
+    OpportunisticContainerAllocatorAMService amservice =
+        (OpportunisticContainerAllocatorAMService) rm
+            .getApplicationMasterService();
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    ApplicationAttemptId attemptId =
+        app1.getCurrentAppAttempt().getAppAttemptId();
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+    ResourceScheduler scheduler = rm.getResourceScheduler();
+    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm.getRMContext().getRMNodes().get(nm2.getNodeId());
+    nm1.nodeHeartbeat(true);
+    nm2.nodeHeartbeat(true);
+    ((RMNodeImpl) rmNode1)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+    ((RMNodeImpl) rmNode2)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+    OpportunisticContainerContext ctxt = ((CapacityScheduler) scheduler)
+        .getApplicationAttempt(attemptId).getOpportunisticContainerContext();
+    // Send add and update node events to AM Service.
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode2));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    // Both node 1 and node 2 will be applicable for scheduling.
+    for (int i = 0; i < 10; i++) {
+      am1.allocate(
+          Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+              "*", Resources.createResource(1 * GB), 2)),
+          null);
+      if (ctxt.getNodeMap().size() == 2) {
+        break;
+      }
+      Thread.sleep(50);
+    }
+    Assert.assertEquals(2, ctxt.getNodeMap().size());
+    // Remove node from scheduler but not from AM Service.
+    scheduler.handle(new NodeRemovedSchedulerEvent(rmNode1));
+    // After removal of node 1, only 1 node will be applicable for scheduling.
+    for (int i = 0; i < 10; i++) {
+      try {
+        am1.allocate(
+            Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+                "*", Resources.createResource(1 * GB), 2)),
+            null);
+      } catch (Exception e) {
+        Assert.fail("Allocate request should be handled on node removal");
+      }
+      if (ctxt.getNodeMap().size() == 1) {
+        break;
+      }
+      Thread.sleep(50);
+    }
+    Assert.assertEquals(1, ctxt.getNodeMap().size());
+  }
+
+  private OpportunisticContainersStatus getOppurtunisticStatus(int waitTime,
+      int queueLength) {
+    OpportunisticContainersStatus status1 =
+        Mockito.mock(OpportunisticContainersStatus.class);
+    Mockito.when(status1.getEstimatedQueueWaitTime()).thenReturn(waitTime);
+    Mockito.when(status1.getWaitQueueLength()).thenReturn(queueLength);
+    return status1;
+  }
+
   // Test if the OpportunisticContainerAllocatorAMService can handle both
   // DSProtocol as well as AMProtocol clients
   @Test