Browse Source

YARN-5333. Some recovered apps are put into default queue when RM HA. Contributed by Jun Gong.

Sunil G 8 years ago
parent
commit
fd112535c8

+ 65 - 38
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java

@@ -302,15 +302,7 @@ public class AdminService extends CompositeService implements
 
     UserGroupInformation user = checkAccess("transitionToActive");
     checkHaStateChange(reqInfo);
-    try {
-      rm.transitionToActive();
-    } catch (Exception e) {
-      RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
-          "", "RMHAProtocolService",
-          "Exception transitioning to active");
-      throw new ServiceFailedException(
-          "Error when transitioning to Active mode", e);
-    }
+
     try {
       // call all refresh*s for active RM to get the updated configurations.
       refreshAll();
@@ -320,10 +312,22 @@ public class AdminService extends CompositeService implements
           .getDispatcher()
           .getEventHandler()
           .handle(
-          new RMFatalEvent(RMFatalEventType.TRANSITION_TO_ACTIVE_FAILED, e));
+              new RMFatalEvent(RMFatalEventType.TRANSITION_TO_ACTIVE_FAILED,
+                  e));
+      throw new ServiceFailedException(
+          "Error on refreshAll during transition to Active", e);
+    }
+
+    try {
+      rm.transitionToActive();
+    } catch (Exception e) {
+      RMAuditLogger.logFailure(user.getShortUserName(), "transitionToActive",
+          "", "RM",
+          "Exception transitioning to active");
       throw new ServiceFailedException(
-          "Error on refreshAll during transistion to Active", e);
+          "Error when transitioning to Active mode", e);
     }
+
     RMAuditLogger.logSuccess(user.getShortUserName(), "transitionToActive",
         "RMHAProtocolService");
   }
@@ -378,12 +382,7 @@ public class AdminService extends CompositeService implements
     RefreshQueuesResponse response =
         recordFactory.newRecordInstance(RefreshQueuesResponse.class);
     try {
-      rmContext.getScheduler().reinitialize(getConfig(), this.rmContext);
-      // refresh the reservation system
-      ReservationSystem rSystem = rmContext.getReservationSystem();
-      if (rSystem != null) {
-        rSystem.reinitialize(getConfig(), rmContext);
-      }
+      refreshQueues();
       RMAuditLogger.logSuccess(user.getShortUserName(), argName,
           "AdminService");
       return response;
@@ -392,6 +391,15 @@ public class AdminService extends CompositeService implements
     }
   }
 
+  private void refreshQueues() throws IOException, YarnException {
+    rmContext.getScheduler().reinitialize(getConfig(), this.rmContext);
+    // refresh the reservation system
+    ReservationSystem rSystem = rmContext.getReservationSystem();
+    if (rSystem != null) {
+      rSystem.reinitialize(getConfig(), rmContext);
+    }
+  }
+
   @Override
   public RefreshNodesResponse refreshNodes(RefreshNodesRequest request)
       throws YarnException, StandbyException {
@@ -414,6 +422,13 @@ public class AdminService extends CompositeService implements
     }
   }
 
+  private void refreshNodes() throws IOException, YarnException {
+    Configuration conf =
+        getConfiguration(new Configuration(false),
+            YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
+    rmContext.getNodesListManager().refreshNodes(conf);
+  }
+
   @Override
   public RefreshSuperUserGroupsConfigurationResponse refreshSuperUserGroupsConfiguration(
       RefreshSuperUserGroupsConfigurationRequest request)
@@ -423,6 +438,16 @@ public class AdminService extends CompositeService implements
 
     checkRMStatus(user.getShortUserName(), argName, "refresh super-user-groups.");
 
+    refreshSuperUserGroupsConfiguration();
+    RMAuditLogger.logSuccess(user.getShortUserName(),
+        argName, "AdminService");
+
+    return recordFactory.newRecordInstance(
+        RefreshSuperUserGroupsConfigurationResponse.class);
+  }
+
+  private void refreshSuperUserGroupsConfiguration()
+      throws IOException, YarnException {
     // Accept hadoop common configs in core-site.xml as well as RM specific
     // configurations in yarn-site.xml
     Configuration conf =
@@ -431,11 +456,6 @@ public class AdminService extends CompositeService implements
             YarnConfiguration.YARN_SITE_CONFIGURATION_FILE);
     RMServerUtils.processRMProxyUsersConf(conf);
     ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
-    RMAuditLogger.logSuccess(user.getShortUserName(),
-        argName, "AdminService");
-    
-    return recordFactory.newRecordInstance(
-        RefreshSuperUserGroupsConfigurationResponse.class);
   }
 
   @Override
@@ -447,16 +467,20 @@ public class AdminService extends CompositeService implements
 
     checkRMStatus(user.getShortUserName(), argName, "refresh user-groups.");
 
-    Groups.getUserToGroupsMappingService(
-        getConfiguration(new Configuration(false),
-            YarnConfiguration.CORE_SITE_CONFIGURATION_FILE)).refresh();
-
-    RMAuditLogger.logSuccess(user.getShortUserName(), argName, "AdminService");
+    refreshUserToGroupsMappings();
+    RMAuditLogger.logSuccess(user.getShortUserName(), argName,
+            "AdminService");
 
     return recordFactory.newRecordInstance(
         RefreshUserToGroupsMappingsResponse.class);
   }
 
+  private void refreshUserToGroupsMappings() throws IOException, YarnException {
+    Groups.getUserToGroupsMappingService(
+        getConfiguration(new Configuration(false),
+            YarnConfiguration.CORE_SITE_CONFIGURATION_FILE)).refresh();
+  }
+
   @Override
   public RefreshAdminAclsResponse refreshAdminAcls(
       RefreshAdminAclsRequest request) throws YarnException, IOException {
@@ -499,6 +523,14 @@ public class AdminService extends CompositeService implements
 
     checkRMStatus(user.getShortUserName(), argName, "refresh Service ACLs.");
 
+    refreshServiceAcls();
+    RMAuditLogger.logSuccess(user.getShortUserName(), argName,
+            "AdminService");
+
+    return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class);
+  }
+
+  private void refreshServiceAcls() throws IOException, YarnException {
     PolicyProvider policyProvider = RMPolicyProvider.getInstance();
     Configuration conf =
         getConfiguration(new Configuration(false),
@@ -510,10 +542,6 @@ public class AdminService extends CompositeService implements
         conf, policyProvider);
     rmContext.getResourceTrackerService().refreshServiceAcls(
         conf, policyProvider);
-
-    RMAuditLogger.logSuccess(user.getShortUserName(), argName, "AdminService");
-
-    return recordFactory.newRecordInstance(RefreshServiceAclsResponse.class);
   }
 
   private synchronized void refreshServiceAcls(Configuration configuration,
@@ -597,16 +625,15 @@ public class AdminService extends CompositeService implements
 
   private void refreshAll() throws ServiceFailedException {
     try {
-      refreshQueues(RefreshQueuesRequest.newInstance());
-      refreshNodes(RefreshNodesRequest.newInstance());
-      refreshSuperUserGroupsConfiguration(
-          RefreshSuperUserGroupsConfigurationRequest.newInstance());
-      refreshUserToGroupsMappings(
-          RefreshUserToGroupsMappingsRequest.newInstance());
+      checkAcls("refreshAll");
+      refreshQueues();
+      refreshNodes();
+      refreshSuperUserGroupsConfiguration();
+      refreshUserToGroupsMappings();
       if (getConfig().getBoolean(
           CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
           false)) {
-        refreshServiceAcls(RefreshServiceAclsRequest.newInstance());
+        refreshServiceAcls();
       }
     } catch (Exception ex) {
       throw new ServiceFailedException(ex.getMessage());

+ 69 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

@@ -52,6 +52,7 @@ import javax.xml.parsers.ParserConfigurationException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.GroupMappingServiceProvider;
 import org.apache.hadoop.yarn.MockApps;
@@ -73,9 +74,13 @@ import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -83,6 +88,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -95,7 +101,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtil
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
@@ -4524,4 +4529,67 @@ public class TestFairScheduler extends FairSchedulerTestBase {
     assertEquals("Incorrect number of perf metrics", 1,
         collector.getRecords().size());
   }
+
+  @Test(timeout = 120000)
+  public void testRefreshQueuesWhenRMHA() throws Exception {
+    conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false);
+    conf.setBoolean(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, false);
+    conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+    HAServiceProtocol.StateChangeRequestInfo requestInfo =
+        new HAServiceProtocol.StateChangeRequestInfo(
+            HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+
+    // 1. start a standby RM, file 'ALLOC_FILE' is empty, so there is no queues
+    MockRM rm1 = new MockRM(conf, null);
+    rm1.init(conf);
+    rm1.start();
+    rm1.getAdminService().transitionToStandby(requestInfo);
+
+    // 2. add a new queue "test_queue"
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<queue name=\"test_queue\">");
+    out.println("  <maxRunningApps>3</maxRunningApps>");
+    out.println("</queue>");
+    out.println("</allocations>");
+    out.close();
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // 3. start a active RM
+    MockRM rm2 = new MockRM(conf, memStore);
+    rm2.init(conf);
+    rm2.start();
+
+    MockNM nm =
+        new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
+    nm.registerNode();
+
+    rm2.getAdminService().transitionToActive(requestInfo);
+
+    // 4. submit a app to the new added queue "test_queue"
+    RMApp app = rm2.submitApp(200, "test_app", "user", null, "test_queue");
+    RMAppAttempt attempt0 = app.getCurrentAppAttempt();
+    nm.nodeHeartbeat(true);
+    MockAM am0 = rm2.sendAMLaunched(attempt0.getAppAttemptId());
+    am0.registerAppAttempt();
+    assertEquals("root.test_queue", app.getQueue());
+
+    // 5. Transit rm1 to active, recover app
+    ((RMContextImpl) rm1.getRMContext()).setStateStore(memStore);
+    rm1.getAdminService().transitionToActive(requestInfo);
+    rm1.drainEvents();
+    assertEquals(1, rm1.getRMContext().getRMApps().size());
+    RMApp recoveredApp =
+        rm1.getRMContext().getRMApps().values().iterator().next();
+    assertEquals("root.test_queue", recoveredApp.getQueue());
+
+    rm1.stop();
+    rm2.stop();
+  }
 }