Bläddra i källkod

YARN-3933. FairScheduler: Multiple calls to completedContainer are not safe. (Shiwei Guo and Miklos Szegedi via kasha)

(cherry picked from commit 646c6d6509f515b1373288869fb92807fa2ddc9b)
(cherry picked from commit 30ff5bff1aa88545ab5bf56958951ba9525fd8ab)
Karthik Kambatla 8 år sedan
förälder
incheckning
233252aa9a

+ 7 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java

@@ -124,7 +124,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
   synchronized public void containerCompleted(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event) {
-    
+
     Container container = rmContainer.getContainer();
     ContainerId containerId = container.getId();
     
@@ -140,9 +140,13 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
         );
     LOG.info("Completed container: " + rmContainer.getContainerId() + 
         " in state: " + rmContainer.getState() + " event:" + event);
-    
+
     // Remove from the list of containers
-    liveContainers.remove(rmContainer.getContainerId());
+    if (liveContainers.remove(containerId) == null) {
+      LOG.info("Additional complete request on completed container " +
+          rmContainer.getContainerId());
+      return;
+    }
 
     RMAuditLogger.logSuccess(getUser(), 
         AuditConstants.RELEASE_CONTAINER, "SchedulerApp", 

+ 54 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

@@ -92,12 +92,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
@@ -3915,7 +3917,58 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     verifyAppRunnable(attId5, false);
     verifyQueueNumRunnable("queue1", 2, 1);
   }
-  
+
+  @Test
+  public void testMultipleCompletedEvent() throws Exception {
+    // Set up a fair scheduler
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"queue1\">");
+    out.println("<maxAMShare>0.2</maxAMShare>");
+    out.println("</queue>");
+    out.println("</allocations>");
+    out.close();
+
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Create a node
+    RMNode node =
+        MockNodes.newNodeInfo(1, Resources.createResource(20480, 20),
+            0, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent = new NodeAddedSchedulerEvent(node);
+    NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node);
+    scheduler.handle(nodeEvent);
+    scheduler.update();
+
+    // Launch an app
+    ApplicationAttemptId attId1 = createAppAttemptId(1, 1);
+    createApplicationWithAMResource(
+        attId1, "queue1", "user1",
+        Resource.newInstance(1024, 1));
+    createSchedulingRequestExistingApplication(
+        1024, 1,
+        RMAppAttemptImpl.AM_CONTAINER_PRIORITY.getPriority(), attId1);
+    FSAppAttempt app1 = scheduler.getSchedulerApp(attId1);
+    scheduler.update();
+    scheduler.handle(updateEvent);
+
+    RMContainer container = app1.getLiveContainersMap().
+        values().iterator().next();
+    scheduler.completedContainer(container, SchedulerUtils
+        .createAbnormalContainerStatus(container.getContainerId(),
+            SchedulerUtils.LOST_CONTAINER), RMContainerEventType.KILL);
+    scheduler.completedContainer(container, SchedulerUtils
+        .createAbnormalContainerStatus(container.getContainerId(),
+            SchedulerUtils.COMPLETED_APPLICATION),
+        RMContainerEventType.FINISHED);
+    assertEquals(Resources.none(), app1.getResourceUsage());
+  }
+
   @Test
   public void testQueueMaxAMShare() throws Exception {
     conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);