Browse Source

YARN-8232. RMContainer lost queue name when RM HA happens. (Hu Ziqian via wangda)

Change-Id: Ia21e1da6871570c993bbedde76ce32929e95970f
(cherry picked from commit 6b96a73bb0f0ad1c877a062b19091e3e15a33ec4)
Wangda Tan 7 years ago
parent
commit
a5a9c8cf0f

+ 4 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

@@ -515,7 +515,8 @@ public abstract class AbstractYarnScheduler
         }
 
         // create container
-        RMContainer rmContainer = recoverAndCreateContainer(container, nm);
+        RMContainer rmContainer = recoverAndCreateContainer(container, nm,
+            schedulerApp.getQueue().getQueueName());
 
         // recover RMContainer
         rmContainer.handle(
@@ -565,7 +566,7 @@ public abstract class AbstractYarnScheduler
   }
 
   private RMContainer recoverAndCreateContainer(NMContainerStatus status,
-      RMNode node) {
+      RMNode node, String queueName) {
     Container container =
         Container.newInstance(status.getContainerId(), node.getNodeID(),
           node.getHttpAddress(), status.getAllocatedResource(),
@@ -578,6 +579,7 @@ public abstract class AbstractYarnScheduler
         SchedulerRequestKey.extractFrom(container), attemptId, node.getNodeID(),
         applications.get(attemptId.getApplicationId()).getUser(), rmContext,
         status.getCreationTime(), status.getNodeLabelExpression());
+    ((RMContainerImpl) rmContainer).setQueueName(queueName);
     return rmContainer;
   }
 

+ 50 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java

@@ -30,22 +30,14 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.ContainerState;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceOption;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.*;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -59,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
@@ -874,4 +867,51 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
       rm.stop();
     }
   }
+
+  @Test(timeout=60000)
+  public void testContainerRecoveredByNode() throws Exception {
+    System.out.println("Starting testContainerRecoveredByNode");
+    final int maxMemory = 10 * 1024;
+    YarnConfiguration conf = getConf();
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.setBoolean(
+        YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
+    conf.set(
+        YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    MockRM rm1 = new MockRM(conf);
+    try {
+      rm1.start();
+      RMApp app1 =
+          rm1.submitApp(200, "name", "user",
+              new HashMap<ApplicationAccessType, String>(), false, "default",
+              -1, null, "Test", false, true);
+      MockNM nm1 =
+          new MockNM("127.0.0.1:1234", 10240, rm1.getResourceTrackerService());
+      nm1.registerNode();
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+      am1.allocate("127.0.0.1", 8192, 1, new ArrayList<ContainerId>());
+
+      YarnScheduler scheduler = rm1.getResourceScheduler();
+
+      RMNode node1 = MockNodes.newNodeInfo(
+          0, Resources.createResource(maxMemory), 1, "127.0.0.2");
+      ContainerId containerId = ContainerId.newContainerId(
+          app1.getCurrentAppAttempt().getAppAttemptId(), 2);
+      NMContainerStatus containerReport =
+          NMContainerStatus.newInstance(containerId, 0, ContainerState.RUNNING,
+              Resource.newInstance(1024, 1), "recover container", 0,
+              Priority.newInstance(0), 0);
+      List<NMContainerStatus> containerReports = new ArrayList<>();
+      containerReports.add(containerReport);
+      scheduler.handle(new NodeAddedSchedulerEvent(node1, containerReports));
+      RMContainer rmContainer = scheduler.getRMContainer(containerId);
+
+      //verify queue name when rmContainer is recovered
+      Assert.assertEquals(app1.getQueue(), rmContainer.getQueueName());
+
+    } finally {
+      rm1.stop();
+      System.out.println("Stopping testContainerRecoveredByNode");
+    }
+  }
 }