瀏覽代碼

YARN-8127. Resource leak when async scheduling is enabled. Contributed by Tao Yang.

(cherry picked from commit 7eb783e2634d8c11fb646f1f2fdf597336325312)
Weiwei Yang 7 年之前
父節點
當前提交
dc03afc7df

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java

@@ -344,6 +344,16 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
         return false;
         return false;
       }
       }
     }
     }
+    // If allocate from reserved container, make sure node is still reserved
+    if (allocation.getAllocateFromReservedContainer() != null
+        && reservedContainerOnNode == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Try to allocate from reserved container " + allocation
+            .getAllocateFromReservedContainer().getRmContainer()
+            .getContainerId() + ", but node is not reserved");
+      }
+      return false;
+    }
 
 
     // Do we have enough space on this node?
     // Do we have enough space on this node?
     Resource availableResource = Resources.clone(
     Resource availableResource = Resources.clone(

+ 90 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java

@@ -579,6 +579,96 @@ public class TestCapacitySchedulerAsyncScheduling {
     rm.stop();
     rm.stop();
   }
   }
 
 
+  // Testcase for YARN-8127
+  @Test (timeout = 30000)
+  public void testCommitDuplicatedAllocateFromReservedProposals()
+      throws Exception {
+    // disable async-scheduling for simulating complex scene
+    Configuration disableAsyncConf = new Configuration(conf);
+    disableAsyncConf.setBoolean(
+        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false);
+
+    // init RM & NMs
+    final MockRM rm = new MockRM(disableAsyncConf);
+    rm.start();
+    final MockNM nm1 = rm.registerNode("192.168.0.1:1234", 8 * GB);
+    rm.registerNode("192.168.0.2:2234", 8 * GB);
+
+    // init scheduler & nodes
+    while (
+        ((CapacityScheduler) rm.getRMContext().getScheduler()).getNodeTracker()
+            .nodeCount() < 2) {
+      Thread.sleep(10);
+    }
+    Assert.assertEquals(2,
+        ((AbstractYarnScheduler) rm.getRMContext().getScheduler())
+            .getNodeTracker().nodeCount());
+    final CapacityScheduler cs =
+        (CapacityScheduler) rm.getRMContext().getScheduler();
+    final SchedulerNode sn1 = cs.getSchedulerNode(nm1.getNodeId());
+
+    // launch app
+    RMApp app = rm.submitApp(1 * GB, "app", "user", null, false, "default",
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true);
+    MockAM am = MockRM.launchAndRegisterAM(app, rm, nm1);
+    FiCaSchedulerApp schedulerApp =
+        cs.getApplicationAttempt(am.getApplicationAttemptId());
+
+    // app asks 1 * 6G container
+    // nm1 runs 2 container(container_01/AM, container_02)
+    allocateAndLaunchContainers(am, nm1, rm, 1,
+        Resources.createResource(6 * GB), 0, 2);
+    Assert.assertEquals(2, sn1.getNumContainers());
+    Assert.assertEquals(1 * GB, sn1.getUnallocatedResource().getMemorySize());
+
+    // app asks 5 * 2G container
+    // nm1 reserves 1 * 2G containers
+    am.allocate(Arrays.asList(ResourceRequest
+        .newInstance(Priority.newInstance(0), "*",
+            Resources.createResource(2 * GB), 5)), null);
+    cs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode()));
+    Assert.assertEquals(1, schedulerApp.getReservedContainers().size());
+
+    // rm kills 1 * 6G container_02
+    for (RMContainer rmContainer : sn1.getCopiedListOfRunningContainers()) {
+      if (rmContainer.getContainerId().getContainerId() != 1) {
+        cs.completedContainer(rmContainer, ContainerStatus
+                .newInstance(rmContainer.getContainerId(),
+                    ContainerState.COMPLETE, "",
+                    ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
+            RMContainerEventType.KILL);
+      }
+    }
+    Assert.assertEquals(7 * GB, sn1.getUnallocatedResource().getMemorySize());
+
+    final CapacityScheduler spyCs = Mockito.spy(cs);
+    // handle CapacityScheduler#tryCommit, submit duplicated proposals
+    // that do allocation for reserved container for three times,
+    // to simulate that case in YARN-8127
+    Mockito.doAnswer(new Answer<Object>() {
+      public Boolean answer(InvocationOnMock invocation) throws Exception {
+        ResourceCommitRequest request =
+            (ResourceCommitRequest) invocation.getArguments()[1];
+        if (request.getFirstAllocatedOrReservedContainer()
+            .getAllocateFromReservedContainer() != null) {
+          for (int i=0; i<3; i++) {
+            cs.tryCommit((Resource) invocation.getArguments()[0],
+                (ResourceCommitRequest) invocation.getArguments()[1]);
+          }
+          Assert.assertEquals(2, sn1.getCopiedListOfRunningContainers().size());
+          Assert.assertEquals(5 * GB,
+              sn1.getUnallocatedResource().getMemorySize());
+        }
+        return true;
+      }
+    }).when(spyCs).tryCommit(Mockito.any(Resource.class),
+        Mockito.any(ResourceCommitRequest.class));
+
+    spyCs.handle(new NodeUpdateSchedulerEvent(sn1.getRMNode()));
+
+    rm.stop();
+  }
+
   private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm,
   private void allocateAndLaunchContainers(MockAM am, MockNM nm, MockRM rm,
       int nContainer, Resource resource, int priority, int startContainerId)
       int nContainer, Resource resource, int priority, int startContainerId)
       throws Exception {
       throws Exception {