瀏覽代碼

YARN-6678. Handle IllegalStateException in Async Scheduling mode of CapacityScheduler. Contributed by Tao Yang.

Sunil G 7 年之前
父節點
當前提交
f64cfeaf61

+ 13 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java

@@ -426,6 +426,19 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
           // accepted & confirmed, it will become RESERVED state
           if (schedulerContainer.getRmContainer().getState()
               == RMContainerState.RESERVED) {
+            // Check if node currently reserved by other application, there may
+            // be some outdated proposals in async-scheduling environment
+            if (schedulerContainer.getRmContainer() != schedulerContainer
+                .getSchedulerNode().getReservedContainer()) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Try to re-reserve a container, but node "
+                    + schedulerContainer.getSchedulerNode()
+                    + " is already reserved by another container"
+                    + schedulerContainer.getSchedulerNode()
+                    .getReservedContainer().getContainerId());
+              }
+              return false;
+            }
             // Set reReservation == true
             reReservation = true;
           } else {

+ 147 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java

@@ -20,7 +20,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -41,20 +44,26 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnSched
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 public class TestCapacitySchedulerAsyncScheduling {
   private final int GB = 1024;
@@ -257,6 +266,144 @@ public class TestCapacitySchedulerAsyncScheduling {
     rm.stop();
   }
 
+  // Testcase for YARN-6678
+  @Test(timeout = 30000)
+  public void testCommitOutdatedReservedProposal() throws Exception {
+    // disable async-scheduling for simulating complex since scene
+    Configuration disableAsyncConf = new Configuration(conf);
+    disableAsyncConf.setBoolean(
+        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false);
+
+    // init RM & NMs & Nodes
+    final MockRM rm = new MockRM(disableAsyncConf);
+    rm.start();
+    final MockNM nm1 = rm.registerNode("h1:1234", 9 * GB);
+    final MockNM nm2 = rm.registerNode("h2:2234", 9 * GB);
+
+    // init scheduler nodes
+    int waitTime = 1000;
+    while (waitTime > 0 &&
+        ((AbstractYarnScheduler) rm.getRMContext().getScheduler())
+            .getNodeTracker().nodeCount() < 2) {
+      waitTime -= 10;
+      Thread.sleep(10);
+    }
+    Assert.assertEquals(2,
+        ((AbstractYarnScheduler) rm.getRMContext().getScheduler())
+            .getNodeTracker().nodeCount());
+
+    YarnScheduler scheduler = rm.getRMContext().getScheduler();
+    final SchedulerNode sn1 =
+        ((CapacityScheduler) scheduler).getSchedulerNode(nm1.getNodeId());
+    final SchedulerNode sn2 =
+        ((CapacityScheduler) scheduler).getSchedulerNode(nm2.getNodeId());
+
+    // submit app1, am1 is running on nm1
+    RMApp app = rm.submitApp(200, "app", "user", null, "default");
+    final MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
+    // submit app2, am2 is running on nm1
+    RMApp app2 = rm.submitApp(200, "app", "user", null, "default");
+    final MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1);
+
+    // allocate and launch 2 containers for app1
+    allocateAndLaunchContainers(am, nm1, rm, 1,
+        Resources.createResource(5 * GB), 0, 2);
+    allocateAndLaunchContainers(am, nm2, rm, 1,
+        Resources.createResource(5 * GB), 0, 3);
+
+    // nm1 runs 3 containers(app1-container_01/AM, app1-container_02,
+    //                       app2-container_01/AM)
+    // nm2 runs 1 container(app1-container_03)
+    Assert.assertEquals(3, sn1.getNumContainers());
+    Assert.assertEquals(1, sn2.getNumContainers());
+
+    // reserve 1 container(app1-container_04) for app1 on nm1
+    ResourceRequest rr2 = ResourceRequest
+        .newInstance(Priority.newInstance(0), "*",
+            Resources.createResource(5 * GB), 1);
+    am.allocate(Arrays.asList(rr2), null);
+    nm1.nodeHeartbeat(true);
+    // wait app1-container_04 reserved on nm1
+    waitTime = 1000;
+    while (waitTime > 0 && sn1.getReservedContainer() == null) {
+      waitTime -= 10;
+      Thread.sleep(10);
+    }
+    Assert.assertNotNull(sn1.getReservedContainer());
+
+    final CapacityScheduler cs = (CapacityScheduler) scheduler;
+    final CapacityScheduler spyCs = Mockito.spy(cs);
+    final AtomicBoolean isFirstReserve = new AtomicBoolean(true);
+    final AtomicBoolean isChecked = new AtomicBoolean(false);
+    // handle CapacityScheduler#tryCommit,
+    // reproduce the process that can raise IllegalStateException before
+    Mockito.doAnswer(new Answer<Object>() {
+      public Object answer(InvocationOnMock invocation) throws Exception {
+        ResourceCommitRequest request =
+            (ResourceCommitRequest) invocation.getArguments()[1];
+        if (request.getContainersToReserve().size() > 0 && isFirstReserve
+            .compareAndSet(true, false)) {
+          // release app1-container_03 on nm2
+          RMContainer killableContainer =
+              sn2.getCopiedListOfRunningContainers().get(0);
+          cs.completedContainer(killableContainer, ContainerStatus
+                  .newInstance(killableContainer.getContainerId(),
+                      ContainerState.COMPLETE, "",
+                      ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
+              RMContainerEventType.KILL);
+          Assert.assertEquals(0, sn2.getCopiedListOfRunningContainers().size());
+          // unreserve app1-container_04 on nm1
+          // and allocate app1-container_05 on nm2
+          cs.handle(new NodeUpdateSchedulerEvent(sn2.getRMNode()));
+          int waitTime = 1000;
+          while (waitTime > 0
+              && sn2.getCopiedListOfRunningContainers().size() == 0) {
+            waitTime -= 10;
+            Thread.sleep(10);
+          }
+          Assert.assertEquals(1, sn2.getCopiedListOfRunningContainers().size());
+          Assert.assertNull(sn1.getReservedContainer());
+
+          // reserve app2-container_02 on nm1
+          ResourceRequest rr3 = ResourceRequest
+              .newInstance(Priority.newInstance(0), "*",
+                  Resources.createResource(5 * GB), 1);
+          am2.allocate(Arrays.asList(rr3), null);
+          cs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode()));
+          waitTime = 1000;
+          while (waitTime > 0 && sn1.getReservedContainer() == null) {
+            waitTime -= 10;
+            Thread.sleep(10);
+          }
+          Assert.assertNotNull(sn1.getReservedContainer());
+
+          // call real apply
+          try {
+            cs.tryCommit((Resource) invocation.getArguments()[0],
+                (ResourceCommitRequest) invocation.getArguments()[1]);
+          } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+          }
+          isChecked.set(true);
+        } else {
+          cs.tryCommit((Resource) invocation.getArguments()[0],
+              (ResourceCommitRequest) invocation.getArguments()[1]);
+        }
+        return null;
+      }
+    }).when(spyCs).tryCommit(Mockito.any(Resource.class),
+        Mockito.any(ResourceCommitRequest.class));
+
+    spyCs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode()));
+
+    waitTime = 1000;
+    while (waitTime > 0 && !isChecked.get()) {
+      waitTime -= 10;
+      Thread.sleep(10);
+    }
+    rm.stop();
+  }
 
   private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm,
       int nContainer, Resource resource, int priority, int startContainerId)