YARN-8153. Guaranteed containers always stay in SCHEDULED on NM after restart. Contributed by Yang Wang.

Weiwei Yang, 7 years ago
Commit 84531ad9b6
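
The fix: during NM recovery the ContainerScheduler has already registered each recovered container, so startContainer() must not add it to runningContainers or the utilization tracker a second time. Otherwise tracked utilization is doubled after a restart, which appears to be why newly scheduled guaranteed containers stayed stuck in SCHEDULED.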

+ 5 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java

@@ -501,8 +501,11 @@ public class ContainerScheduler extends AbstractService implements
 
   private void startContainer(Container container) {
    LOG.info("Starting container [" + container.getContainerId() + "]");
-    runningContainers.put(container.getContainerId(), container);
-    this.utilizationTracker.addContainerResources(container);
+    // Skip re-adding to runningContainers and the utilization tracker when recovering
+    if (!runningContainers.containsKey(container.getContainerId())) {
+      runningContainers.put(container.getContainerId(), container);
+      this.utilizationTracker.addContainerResources(container);
+    }
     if (container.getContainerTokenIdentifier().getExecutionType() ==
         ExecutionType.OPPORTUNISTIC) {
       this.metrics.startOpportunisticContainer(container.getResource());
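
To make the guard's intent concrete, here is a minimal, self-contained sketch of the same idempotent-start pattern; the class, method, and field names are hypothetical stand-ins, not the actual Hadoop classes:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the pattern used in the patch above;
// names deliberately do not match the real ContainerScheduler.
public class SchedulerSketch {
  private final Map<String, Integer> running = new ConcurrentHashMap<>();
  private int trackedMemMb = 0;

  // Called once per live container while replaying NM state-store entries.
  void recoverContainer(String id, int memMb) {
    running.put(id, memMb);
    trackedMemMb += memMb;
  }

  // Called on (re)start; must be safe to call after recoverContainer().
  void startContainer(String id, int memMb) {
    if (!running.containsKey(id)) { // the guard added by this commit
      running.put(id, memMb);
      trackedMemMb += memMb;
    }
    // launch logic would follow here
  }

  public static void main(String[] args) {
    SchedulerSketch s = new SchedulerSketch();
    s.recoverContainer("container_1", 1024); // NM restart: replay state
    s.startContainer("container_1", 1024);   // re-start must not double-count
    // Prints: running=1 trackedMemMb=1024
    System.out.println("running=" + s.running.size()
        + " trackedMemMb=" + s.trackedMemMb);
  }
}

Recovery pre-populates the running set, so the containsKey check keeps resources from being counted twice; the test below asserts exactly this by comparing the scheduler's container count and utilization before and after a restart.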

+ 51 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java

@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -91,6 +92,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ResourceMappings;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncher;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEvent;
@@ -439,6 +441,54 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
     assertNotNull(app);
     containerStatus = getContainerStatus(context, cm, cid);
     assertEquals(targetResource, containerStatus.getCapability());
+    cm.stop();
+  }
+
+  @Test
+  public void testContainerSchedulerRecovery() throws Exception {
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
+    NMStateStoreService stateStore = new NMMemoryStateStoreService();
+    stateStore.init(conf);
+    stateStore.start();
+    context = createContext(conf, stateStore);
+    ContainerManagerImpl cm = createContainerManager(context, delSrvc);
+    ((NMContext) context).setContainerManager(cm);
+    cm.init(conf);
+    cm.start();
+    // add an application by starting a container
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    ApplicationAttemptId attemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    ContainerId cid = ContainerId.newContainerId(attemptId, 1);
+
+    commonLaunchContainer(appId, cid, cm);
+
+    Application app = context.getApplications().get(appId);
+    assertNotNull(app);
+
+    ResourceUtilization utilization =
+        ResourceUtilization.newInstance(1024, 2048, 0.25F);
+    assertEquals(1, cm.getContainerScheduler().getNumRunningContainers());
+    assertEquals(utilization,
+        cm.getContainerScheduler().getCurrentUtilization());
+
+    // restart and verify container scheduler has recovered correctly
+    cm.stop();
+    context = createContext(conf, stateStore);
+    cm = createContainerManager(context, delSrvc);
+    ((NMContext) context).setContainerManager(cm);
+    cm.init(conf);
+    cm.start();
+    assertEquals(1, context.getApplications().size());
+    app = context.getApplications().get(appId);
+    assertNotNull(app);
+    waitForNMContainerState(cm, cid, ContainerState.RUNNING);
+
+    assertEquals(1, cm.getContainerScheduler().getNumRunningContainers());
+    assertEquals(utilization,
+        cm.getContainerScheduler().getCurrentUtilization());
+    cm.stop();
   }
 
   @Test
@@ -494,6 +544,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
         resourceMappings.getAssignedResources("numa").equals(numaResources));
     Assert.assertTrue(
         resourceMappings.getAssignedResources("fpga").equals(fpgaResources));
+    cm.stop();
   }
 
   @Test