Pārlūkot izejas kodu

Revert "YARN-8233. NPE in CapacityScheduler#tryCommit when handling allocate/reserve proposal whose allocatedOrReservedContainer is null. Contributed by Tao Yang."

This reverts commit dd8479e80d3f0fe87a6edb099e7f617bff42106a.
Akira Ajisaka 6 gadi atpakaļ
vecāks
revīzija
52af95fdce

+ 28 - 58
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -2627,11 +2627,7 @@ public class CapacityScheduler extends
         .getContainersToKill().isEmpty()) {
       list = new ArrayList<>();
       for (RMContainer rmContainer : csAssignment.getContainersToKill()) {
-        SchedulerContainer schedulerContainer =
-            getSchedulerContainer(rmContainer, false);
-        if (schedulerContainer != null) {
-          list.add(schedulerContainer);
-        }
+        list.add(getSchedulerContainer(rmContainer, false));
       }
     }
 
@@ -2639,16 +2635,10 @@ public class CapacityScheduler extends
       if (null == list) {
         list = new ArrayList<>();
       }
-      SchedulerContainer schedulerContainer =
-          getSchedulerContainer(csAssignment.getExcessReservation(), false);
-      if (schedulerContainer != null) {
-        list.add(schedulerContainer);
-      }
+      list.add(
+          getSchedulerContainer(csAssignment.getExcessReservation(), false));
     }
 
-    if (list != null && list.isEmpty()) {
-      list = null;
-    }
     return list;
   }
 
@@ -2733,15 +2723,11 @@ public class CapacityScheduler extends
       ((RMContainerImpl)rmContainer).setAllocationTags(
           new HashSet<>(schedulingRequest.getAllocationTags()));
 
-      SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
-          schedulerContainer = getSchedulerContainer(rmContainer, true);
-      if (schedulerContainer == null) {
-        allocated = null;
-      } else {
-        allocated = new ContainerAllocationProposal<>(schedulerContainer,
-            null, null, NodeType.NODE_LOCAL, NodeType.NODE_LOCAL,
-            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, resource);
-      }
+      allocated = new ContainerAllocationProposal<>(
+          getSchedulerContainer(rmContainer, true),
+          null, null, NodeType.NODE_LOCAL, NodeType.NODE_LOCAL,
+          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
+          resource);
     }
 
     if (null != allocated) {
@@ -2771,27 +2757,16 @@ public class CapacityScheduler extends
           csAssignment.getAssignmentInformation().getAllocationDetails();
       if (!allocations.isEmpty()) {
         RMContainer rmContainer = allocations.get(0).rmContainer;
-        SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
-            schedulerContainer = getSchedulerContainer(rmContainer, true);
-        if (schedulerContainer == null) {
-          allocated = null;
-          // Decrease unconfirmed resource if app is alive
-          FiCaSchedulerApp app = getApplicationAttempt(
-              rmContainer.getApplicationAttemptId());
-          if (app != null) {
-            app.decUnconfirmedRes(rmContainer.getAllocatedResource());
-          }
-        } else {
-          allocated = new ContainerAllocationProposal<>(schedulerContainer,
-              getSchedulerContainersToRelease(csAssignment),
-              getSchedulerContainer(
-                  csAssignment.getFulfilledReservedContainer(), false),
-              csAssignment.getType(), csAssignment.getRequestLocalityType(),
-              csAssignment.getSchedulingMode() != null ?
-                  csAssignment.getSchedulingMode() :
-                  SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
-              csAssignment.getResource());
-        }
+        allocated = new ContainerAllocationProposal<>(
+            getSchedulerContainer(rmContainer, true),
+            getSchedulerContainersToRelease(csAssignment),
+            getSchedulerContainer(csAssignment.getFulfilledReservedContainer(),
+                false), csAssignment.getType(),
+            csAssignment.getRequestLocalityType(),
+            csAssignment.getSchedulingMode() != null ?
+                csAssignment.getSchedulingMode() :
+                SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
+            csAssignment.getResource());
       }
 
       // Reserved something
@@ -2799,21 +2774,16 @@ public class CapacityScheduler extends
           csAssignment.getAssignmentInformation().getReservationDetails();
       if (!reservation.isEmpty()) {
         RMContainer rmContainer = reservation.get(0).rmContainer;
-        SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
-            schedulerContainer = getSchedulerContainer(rmContainer, false);
-        if (schedulerContainer == null) {
-          reserved = null;
-        } else {
-          reserved = new ContainerAllocationProposal<>(schedulerContainer,
-              getSchedulerContainersToRelease(csAssignment),
-              getSchedulerContainer(
-                  csAssignment.getFulfilledReservedContainer(), false),
-              csAssignment.getType(), csAssignment.getRequestLocalityType(),
-              csAssignment.getSchedulingMode() != null ?
-                  csAssignment.getSchedulingMode() :
-                  SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
-              csAssignment.getResource());
-        }
+        reserved = new ContainerAllocationProposal<>(
+            getSchedulerContainer(rmContainer, false),
+            getSchedulerContainersToRelease(csAssignment),
+            getSchedulerContainer(csAssignment.getFulfilledReservedContainer(),
+                false), csAssignment.getType(),
+            csAssignment.getRequestLocalityType(),
+            csAssignment.getSchedulingMode() != null ?
+                csAssignment.getSchedulingMode() :
+                SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY,
+            csAssignment.getResource());
       }
     }
 

+ 0 - 83
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAsyncScheduling.java

@@ -56,11 +56,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerA
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
@@ -846,86 +843,6 @@ public class TestCapacitySchedulerAsyncScheduling {
     return new ResourceCommitRequest(allocateProposals, null, null);
   }
 
-  @Test(timeout = 30000)
-  public void testReturnNullWhenGetSchedulerContainer() throws Exception {
-    // disable async-scheduling for simulating complex scenario
-    Configuration disableAsyncConf = new Configuration(conf);
-    disableAsyncConf.setBoolean(
-        CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false);
-
-    // init RM & NMs
-    final MockRM rm = new MockRM(disableAsyncConf);
-    rm.start();
-    final MockNM nm1 = rm.registerNode("192.168.0.1:1234", 8 * GB);
-    final MockNM nm2 = rm.registerNode("192.168.0.2:2234", 8 * GB);
-    rm.drainEvents();
-    CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler();
-    SchedulerNode sn1 = cs.getSchedulerNode(nm1.getNodeId());
-    RMNode rmNode1 = cs.getNode(nm1.getNodeId()).getRMNode();
-    SchedulerNode sn2 = cs.getSchedulerNode(nm2.getNodeId());
-
-    // launch app1-am on nm1
-    RMApp app1 = rm.submitApp(1 * GB, "app1", "user", null, false, "default",
-        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS, null, null, true, true);
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
-
-    // app2 asks 1 * 1G container
-    am1.allocate(ImmutableList.of(ResourceRequest
-        .newInstance(Priority.newInstance(0), "*",
-            Resources.createResource(1 * GB), 1)), null);
-    RMContainer amContainer = cs.getRMContainer(
-        ContainerId.newContainerId(am1.getApplicationAttemptId(), 1));
-
-    // spy CapacityScheduler
-    final CapacityScheduler spyCs = Mockito.spy(cs);
-    // hook CapacityScheduler#submitResourceCommitRequest
-    List<CSAssignment> assignmentSnapshots = new ArrayList<>();
-    Mockito.doAnswer(new Answer<Object>() {
-      public Boolean answer(InvocationOnMock invocation) throws Exception {
-        CSAssignment assignment = (CSAssignment) invocation.getArguments()[1];
-        if (cs.getNode(nm1.getNodeId()) != null) {
-          // decommission nm1 for first allocation on nm1
-          cs.getRMContext().getDispatcher().getEventHandler().handle(
-              new RMNodeEvent(nm1.getNodeId(), RMNodeEventType.DECOMMISSION));
-          rm.drainEvents();
-          Assert.assertEquals(NodeState.DECOMMISSIONED, rmNode1.getState());
-          Assert.assertNull(cs.getNode(nm1.getNodeId()));
-          assignmentSnapshots.add(assignment);
-        } else {
-          // add am container on nm1 to containersToKill
-          // for second allocation on nm2
-          assignment.setContainersToKill(ImmutableList.of(amContainer));
-        }
-        // check no NPE in actual submit, before YARN-8233 will throw NPE
-        cs.submitResourceCommitRequest((Resource) invocation.getArguments()[0],
-            assignment);
-        return false;
-      }
-    }).when(spyCs).submitResourceCommitRequest(Mockito.any(Resource.class),
-        Mockito.any(CSAssignment.class));
-
-    // allocation on nm1, test return null when get scheduler container
-    CandidateNodeSet<FiCaSchedulerNode> candidateNodeSet =
-        new SimpleCandidateNodeSet(sn1);
-    spyCs.allocateContainersToNode(candidateNodeSet, false);
-    // make sure unconfirmed resource is decreased correctly
-    Assert.assertTrue(spyCs.getApplicationAttempt(am1.getApplicationAttemptId())
-        .hasPendingResourceRequest(RMNodeLabelsManager.NO_LABEL,
-            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
-
-    // allocation on nm2,
-    // test return null when get scheduler container to release
-    candidateNodeSet =
-        new SimpleCandidateNodeSet(sn2);
-    spyCs.allocateContainersToNode(candidateNodeSet, false);
-    // make sure unconfirmed resource is decreased correctly
-    Assert.assertTrue(spyCs.getApplicationAttempt(am1.getApplicationAttemptId())
-        .hasPendingResourceRequest(RMNodeLabelsManager.NO_LABEL,
-            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
-
-    rm.stop();
-  }
-
   private void keepNMHeartbeat(List<MockNM> mockNMs, int interval) {
     if (nmHeartbeatThread != null) {
       nmHeartbeatThread.setShouldStop();