Browse Source

YARN-9164. Shutdown NM may cause NPE when opportunistic container scheduling is enabled. Contributed by lujie.

Weiwei Yang 6 năm trước cách đây
mục cha
commit
cfe89e6f96

+ 5 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.java

@@ -348,9 +348,11 @@ public class OpportunisticContainerAllocatorAMService
       RMContainer rmContainer =
           SchedulerUtils.createOpportunisticRmContainer(
               rmContext, container, isRemotelyAllocated);
-      rmContainer.handle(
-          new RMContainerEvent(container.getId(),
-              RMContainerEventType.ACQUIRED));
+      if (rmContainer!=null) {
+        rmContainer.handle(
+            new RMContainerEvent(container.getId(),
+                RMContainerEventType.ACQUIRED));
+      }
     }
   }
 

+ 8 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

@@ -693,8 +693,10 @@ public abstract class AbstractYarnScheduler
         LOG.debug("Completed container: " + rmContainer.getContainerId() +
             " in state: " + rmContainer.getState() + " event:" + event);
       }
-      getSchedulerNode(rmContainer.getNodeId()).releaseContainer(
-          rmContainer.getContainerId(), false);
+      SchedulerNode node = getSchedulerNode(rmContainer.getNodeId());
+      if (node != null) {
+        node.releaseContainer(rmContainer.getContainerId(), false);
+      }
     }
 
     // If the container is getting killed in ACQUIRED state, the requester (AM
@@ -1300,8 +1302,10 @@ public abstract class AbstractYarnScheduler
               uReq.getContainerUpdateType()) {
             RMContainer demotedRMContainer =
                 createDemotedRMContainer(appAttempt, oppCntxt, rmContainer);
-            appAttempt.addToNewlyDemotedContainers(
-                uReq.getContainerId(), demotedRMContainer);
+            if (demotedRMContainer != null) {
+              appAttempt.addToNewlyDemotedContainers(
+                      uReq.getContainerId(), demotedRMContainer);
+            }
           } else {
             RMContainer demotedRMContainer = createDecreasedRMContainer(
                 appAttempt, uReq, rmContainer);

+ 6 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java

@@ -564,6 +564,11 @@ public class SchedulerUtils {
 
   public static RMContainer createOpportunisticRmContainer(RMContext rmContext,
       Container container, boolean isRemotelyAllocated) {
+    SchedulerNode node = ((AbstractYarnScheduler) rmContext.getScheduler())
+        .getNode(container.getNodeId());
+    if (node == null) {
+      return null;
+    }
     SchedulerApplicationAttempt appAttempt =
         ((AbstractYarnScheduler) rmContext.getScheduler())
             .getCurrentAttemptForContainer(container.getId());
@@ -572,8 +577,7 @@ public class SchedulerUtils {
         appAttempt.getApplicationAttemptId(), container.getNodeId(),
         appAttempt.getUser(), rmContext, isRemotelyAllocated);
     appAttempt.addRMContainer(container.getId(), rmContainer);
-    ((AbstractYarnScheduler) rmContext.getScheduler()).getNode(
-        container.getNodeId()).allocateContainer(rmContainer);
+    node.allocateContainer(rmContainer);
     return rmContainer;
   }
 }

+ 78 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.java

@@ -23,6 +23,7 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
 import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
@@ -72,14 +73,19 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistrib
 import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -88,6 +94,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo
     .FifoScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
 import org.junit.Assert;
@@ -95,12 +102,17 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import com.google.common.base.Supplier;
+
+import static org.junit.Assert.fail;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.TimeoutException;
 
 /**
  * Test cases for {@link OpportunisticContainerAllocatorAMService}.
@@ -798,6 +810,72 @@ public class TestOpportunisticContainerAllocatorAMService {
     Assert.assertEquals(1, ctxt.getNodeMap().size());
   }
 
+  @Test(timeout = 60000)
+  public void testAppAttemptRemovalAfterNodeRemoval() throws Exception {
+    MockNM nm = new MockNM("h:1234", 4096, rm.getResourceTrackerService());
+    nm.registerNode();
+    OpportunisticContainerAllocatorAMService amservice =
+        (OpportunisticContainerAllocatorAMService) rm
+            .getApplicationMasterService();
+    RMApp app = rm.submitApp(1 * GB, "app", "user", null, "default");
+    ApplicationAttemptId attemptId =
+        app.getCurrentAppAttempt().getAppAttemptId();
+    MockAM am = MockRM.launchAndRegisterAM(app, rm, nm);
+    ResourceScheduler scheduler = rm.getResourceScheduler();
+    SchedulerApplicationAttempt schedulerAttempt =
+        ((CapacityScheduler)scheduler).getApplicationAttempt(attemptId);
+    RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm.getNodeId());
+    nm.nodeHeartbeat(true);
+    ((RMNodeImpl) rmNode1)
+        .setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
+    // Send add and update node events to AM Service.
+    amservice.handle(new NodeAddedSchedulerEvent(rmNode1));
+    amservice.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    try {
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override public Boolean get() {
+          return scheduler.getNumClusterNodes() == 1;
+        }
+      }, 10, 200 * 100);
+    }catch (TimeoutException e) {
+      fail("timed out while waiting for NM to add.");
+    }
+    AllocateResponse allocateResponse = am.allocate(
+            Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1),
+                "*", Resources.createResource(1 * GB), 2, true, null,
+                ExecutionTypeRequest.newInstance(
+                    ExecutionType.OPPORTUNISTIC, true))),
+                null);
+    List<Container> allocatedContainers = allocateResponse
+        .getAllocatedContainers();
+    Container container = allocatedContainers.get(0);
+    scheduler.handle(new NodeRemovedSchedulerEvent(rmNode1));
+    try {
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override public Boolean get() {
+          return scheduler.getNumClusterNodes() == 0;
+        }
+      }, 10, 200 * 100);
+    }catch (TimeoutException e) {
+      fail("timed out while waiting for NM to remove.");
+    }
+    //test YARN-9165
+    RMContainer rmContainer = null;
+    rmContainer = SchedulerUtils.createOpportunisticRmContainer(
+                    rm.getRMContext(), container, true);
+    if (rmContainer == null) {
+      rmContainer = new RMContainerImpl(container,
+        SchedulerRequestKey.extractFrom(container),
+        schedulerAttempt.getApplicationAttemptId(), container.getNodeId(),
+        schedulerAttempt.getUser(), rm.getRMContext(), true);
+    }
+    assert(rmContainer!=null);
+    //test YARN-9164
+    schedulerAttempt.addRMContainer(container.getId(), rmContainer);
+    scheduler.handle(new AppAttemptRemovedSchedulerEvent(attemptId,
+        RMAppAttemptState.FAILED, false));
+  }
+
   private OpportunisticContainersStatus getOppurtunisticStatus(int waitTime,
       int queueLength) {
     OpportunisticContainersStatus status1 =