浏览代码

YARN-11513: Applications submitted to ambiguous queue fail during recovery if "Specified" Placement Rule is used (#5748)

susheel-gupta 1 年之前
父节点
当前提交
c82ea52e4c

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java

@@ -1056,7 +1056,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
       LOG.info("Placed application with ID " + context.getApplicationId() +
           " in queue: " + placementContext.getQueue() +
           ", original submission queue was: " + context.getQueue());
-      context.setQueue(placementContext.getQueue());
+      context.setQueue(placementContext.getFullQueuePath());
     }
   }
 

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java

@@ -283,7 +283,7 @@ public class TestAppManager extends AppManagerTestBase{
     setupDispatcher(rmContext, conf);
   }
 
-  private static PlacementManager createMockPlacementManager(
+  public static PlacementManager createMockPlacementManager(
       String userRegex, String placementQueue, String placementParentQueue
   ) throws YarnException {
     PlacementManager placementMgr = mock(PlacementManager.class);

+ 91 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java

@@ -1651,6 +1651,97 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
     assertUnmanagedAMQueueMetrics(qm2, 1, 0, 0, 1);
   }
 
+  //   Test behavior of an app if two same name leaf queue with different queuePath
+  //   during work preserving rm restart with %specified mapping Placement Rule.
+  //   Test case does following:
+  //1. Submit an apps to queue root.joe.test.
+  //2. While the applications is running, restart the rm and
+  //   check whether the app submitted to the queue it was submitted initially.
+  //3. Verify that application running successfully.
+  @Test(timeout = 60000)
+  public void testQueueRecoveryOnRMWorkPreservingRestart() throws Exception {
+    if (getSchedulerType() != SchedulerType.CAPACITY) {
+      return;
+    }
+    CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(conf);
+
+    csConf.setQueues(
+            CapacitySchedulerConfiguration.ROOT, new String[] {"default", "joe", "john"});
+    csConf.setCapacity(
+            CapacitySchedulerConfiguration.ROOT + "." + "joe", 25);
+    csConf.setCapacity(
+            CapacitySchedulerConfiguration.ROOT + "." + "john", 25);
+    csConf.setCapacity(
+            CapacitySchedulerConfiguration.ROOT + "." + "default", 50);
+
+    final String q1 = CapacitySchedulerConfiguration.ROOT + "." + "joe";
+    final String q2 = CapacitySchedulerConfiguration.ROOT + "." + "john";
+    csConf.setQueues(q1, new String[] {"test"});
+    csConf.setQueues(q2, new String[] {"test"});
+    csConf.setCapacity(
+            CapacitySchedulerConfiguration.ROOT + "." + "joe.test", 100);
+    csConf.setCapacity(
+            CapacitySchedulerConfiguration.ROOT + "." + "john.test", 100);
+
+    csConf.set(CapacitySchedulerConfiguration.MAPPING_RULE_JSON,
+        "{\"rules\" : [{\"type\": \"user\", \"policy\" : \"specified\", " +
+        "\"fallbackResult\" : \"skip\", \"matches\" : \"*\"}]}");
+
+    // start RM
+    rm1 = new MockRM(csConf);
+    rm1.start();
+    MockMemoryRMStateStore memStore =
+        (MockMemoryRMStateStore) rm1.getRMStateStore();
+    MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+    nm1.registerNode();
+
+    RMContext newMockRMContext = rm1.getRMContext();
+    newMockRMContext.setQueuePlacementManager(TestAppManager.createMockPlacementManager(
+        "user1|user2", "test", "root.joe"));
+
+    MockRMAppSubmissionData data =
+        MockRMAppSubmissionData.Builder.createWithMemory(1024, rm1)
+            .withAppName("app")
+            .withQueue("root.joe.test")
+            .withUser("user1")
+            .withAcls(null)
+            .build();
+
+    RMApp app = MockRMAppSubmitter.submit(rm1, data);
+    MockAM am = MockRM.launchAndRegisterAM(app, rm1, nm1);
+    rm1.waitForState(app.getApplicationId(), RMAppState.RUNNING);
+
+    MockRM rm2 = new MockRM(csConf, memStore) {
+      @Override
+      protected RMAppManager createRMAppManager() {
+        return new RMAppManager(this.rmContext, this.scheduler,
+            this.masterService, this.applicationACLsManager, conf) {
+          @Override
+          ApplicationPlacementContext placeApplication(
+              PlacementManager placementManager,
+              ApplicationSubmissionContext context, String user,
+              boolean isRecovery) throws YarnException {
+            return super.placeApplication(
+                    newMockRMContext.getQueuePlacementManager(), context, user, isRecovery);
+          }
+        };
+      }
+    };
+
+    nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+    rm2.start();
+    RMApp recoveredApp0 =
+        rm2.getRMContext().getRMApps().get(app.getApplicationId());
+
+    rm2.waitForState(recoveredApp0.getApplicationId(), RMAppState.ACCEPTED);
+    am.setAMRMProtocol(rm2.getApplicationMasterService(), rm2.getRMContext());
+    am.registerAppAttempt(true);
+    rm2.waitForState(recoveredApp0.getApplicationId(), RMAppState.RUNNING);
+
+    Assert.assertEquals("root.joe.test", recoveredApp0.getQueue());
+  }
+
   private void assertUnmanagedAMQueueMetrics(QueueMetrics qm, int appsSubmitted,
       int appsPending, int appsRunning, int appsCompleted) {
     Assert.assertEquals(appsSubmitted, qm.getUnmanagedAppsSubmitted());