Browse Source

YARN-1734. Fixed ResourceManager to update the configurations when it transits from standby to active mode so as to assimilate any changes that happened while it was in standby mode. Contributed by Xuan Gong.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1571539 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 11 years ago
parent
commit
ad70f26b1f

+ 4 - 0
hadoop-yarn-project/CHANGES.txt

@@ -348,6 +348,10 @@ Release 2.4.0 - UNRELEASED
     re-registration after a RESYNC and thus avoid hanging. (Rohith Sharma via
     re-registration after a RESYNC and thus avoid hanging. (Rohith Sharma via
     vinodkv)
     vinodkv)
 
 
+    YARN-1734. Fixed ResourceManager to update the configurations when it
+    transits from standby to active mode so as to assimilate any changes that
+    happened while it was in standby mode. (Xuan Gong via vinodkv)
+
 Release 2.3.1 - UNRELEASED
 Release 2.3.1 - UNRELEASED
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 42 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java

@@ -250,10 +250,20 @@ public class AdminService extends CompositeService implements
   @Override
   @Override
   public synchronized void transitionToActive(
   public synchronized void transitionToActive(
       HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
       HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
+    // call refreshAdminAcls before HA state transition
+    // for the case that adminAcls have been updated in previous active RM
+    try {
+      refreshAdminAcls(false);
+    } catch (YarnException ex) {
+      throw new ServiceFailedException("Can not execute refreshAdminAcls", ex);
+    }
+
     UserGroupInformation user = checkAccess("transitionToActive");
     UserGroupInformation user = checkAccess("transitionToActive");
     checkHaStateChange(reqInfo);
     checkHaStateChange(reqInfo);
     try {
     try {
       rm.transitionToActive();
       rm.transitionToActive();
+      // call all refresh*s for active RM to get the updated configurations.
+      refreshAll();
       RMAuditLogger.logSuccess(user.getShortUserName(),
       RMAuditLogger.logSuccess(user.getShortUserName(),
           "transitionToActive", "RMHAProtocolService");
           "transitionToActive", "RMHAProtocolService");
     } catch (Exception e) {
     } catch (Exception e) {
@@ -268,6 +278,13 @@ public class AdminService extends CompositeService implements
   @Override
   @Override
   public synchronized void transitionToStandby(
   public synchronized void transitionToStandby(
       HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
       HAServiceProtocol.StateChangeRequestInfo reqInfo) throws IOException {
+    // call refreshAdminAcls before HA state transition
+    // for the case that adminAcls have been updated in previous active RM
+    try {
+      refreshAdminAcls(false);
+    } catch (YarnException ex) {
+      throw new ServiceFailedException("Can not execute refreshAdminAcls", ex);
+    }
     UserGroupInformation user = checkAccess("transitionToStandby");
     UserGroupInformation user = checkAccess("transitionToStandby");
     checkHaStateChange(reqInfo);
     checkHaStateChange(reqInfo);
     try {
     try {
@@ -406,10 +423,15 @@ public class AdminService extends CompositeService implements
   @Override
   @Override
   public RefreshAdminAclsResponse refreshAdminAcls(
   public RefreshAdminAclsResponse refreshAdminAcls(
       RefreshAdminAclsRequest request) throws YarnException, IOException {
       RefreshAdminAclsRequest request) throws YarnException, IOException {
+    return refreshAdminAcls(true);
+  }
+
+  private RefreshAdminAclsResponse refreshAdminAcls(boolean checkRMHAState)
+      throws YarnException, IOException {
     String argName = "refreshAdminAcls";
     String argName = "refreshAdminAcls";
     UserGroupInformation user = checkAcls(argName);
     UserGroupInformation user = checkAcls(argName);
-    
-    if (!isRMActive()) {
+
+    if (checkRMHAState && !isRMActive()) {
       RMAuditLogger.logFailure(user.getShortUserName(), argName,
       RMAuditLogger.logFailure(user.getShortUserName(), argName,
           adminAcl.toString(), "AdminService",
           adminAcl.toString(), "AdminService",
           "ResourceManager is not active. Can not refresh user-groups.");
           "ResourceManager is not active. Can not refresh user-groups.");
@@ -521,6 +543,24 @@ public class AdminService extends CompositeService implements
     return conf;
     return conf;
   }
   }
 
 
+  private void refreshAll() throws ServiceFailedException {
+    try {
+      refreshQueues(RefreshQueuesRequest.newInstance());
+      refreshNodes(RefreshNodesRequest.newInstance());
+      refreshSuperUserGroupsConfiguration(
+          RefreshSuperUserGroupsConfigurationRequest.newInstance());
+      refreshUserToGroupsMappings(
+          RefreshUserToGroupsMappingsRequest.newInstance());
+      if (getConfig().getBoolean(
+          CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION,
+          false)) {
+        refreshServiceAcls(RefreshServiceAclsRequest.newInstance());
+      }
+    } catch (Exception ex) {
+      throw new ServiceFailedException(ex.getMessage());
+    }
+  }
+
   @VisibleForTesting
   @VisibleForTesting
   public AccessControlList getAccessControlList() {
   public AccessControlList getAccessControlList() {
     return this.adminAcl;
     return this.adminAcl;

+ 92 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMAdminService.java

@@ -34,12 +34,16 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
 import org.apache.hadoop.security.GroupMappingServiceProvider;
 import org.apache.hadoop.security.GroupMappingServiceProvider;
 import org.apache.hadoop.security.Groups;
 import org.apache.hadoop.security.Groups;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
+import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RefreshAdminAclsRequest;
@@ -518,6 +522,94 @@ public class TestRMAdminService {
     Assert.assertTrue(excludeHosts.contains("0.0.0.0:123"));
     Assert.assertTrue(excludeHosts.contains("0.0.0.0:123"));
   }
   }
 
 
+  @Test
+  public void testRMHAWithFileSystemBasedConfiguration() throws IOException,
+      YarnException {
+    StateChangeRequestInfo requestInfo = new StateChangeRequestInfo(
+        HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+    configuration.set(YarnConfiguration.RM_CONFIGURATION_PROVIDER_CLASS,
+        "org.apache.hadoop.yarn.FileSystemBasedConfigurationProvider");
+    configuration.setBoolean(YarnConfiguration.RM_HA_ENABLED, true);
+    configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+    configuration.set(YarnConfiguration.RM_HA_IDS, "rm1,rm2");
+    int base = 100;
+    for (String confKey : YarnConfiguration
+        .getServiceAddressConfKeys(configuration)) {
+      configuration.set(HAUtil.addSuffix(confKey, "rm1"), "0.0.0.0:"
+          + (base + 20));
+      configuration.set(HAUtil.addSuffix(confKey, "rm2"), "0.0.0.0:"
+          + (base + 40));
+      base = base * 2;
+    }
+    Configuration conf1 = new Configuration(configuration);
+    conf1.set(YarnConfiguration.RM_HA_ID, "rm1");
+    Configuration conf2 = new Configuration(configuration);
+    conf2.set(YarnConfiguration.RM_HA_ID, "rm2");
+
+    // upload default configurations
+    uploadDefaultConfiguration();
+
+    MockRM rm1 = null;
+    MockRM rm2 = null;
+    try {
+      rm1 = new MockRM(conf1);
+      rm1.init(conf1);
+      rm1.start();
+      Assert.assertTrue(rm1.getRMContext().getHAServiceState()
+          == HAServiceState.STANDBY);
+
+      rm2 = new MockRM(conf2);
+      rm2.init(conf1);
+      rm2.start();
+      Assert.assertTrue(rm2.getRMContext().getHAServiceState()
+          == HAServiceState.STANDBY);
+
+      rm1.adminService.transitionToActive(requestInfo);
+      Assert.assertTrue(rm1.getRMContext().getHAServiceState()
+          == HAServiceState.ACTIVE);
+
+      CapacitySchedulerConfiguration csConf =
+          new CapacitySchedulerConfiguration();
+      csConf.set("yarn.scheduler.capacity.maximum-applications", "5000");
+      uploadConfiguration(csConf, "capacity-scheduler.xml");
+
+      rm1.adminService.refreshQueues(RefreshQueuesRequest.newInstance());
+
+      int maxApps =
+          ((CapacityScheduler) rm1.getRMContext().getScheduler())
+              .getConfiguration().getMaximumSystemApplications();
+      Assert.assertEquals(maxApps, 5000);
+
+      // Before failover happens, the maxApps is
+      // still the default value on the standby rm : rm2
+      int maxAppsBeforeFailOver =
+          ((CapacityScheduler) rm2.getRMContext().getScheduler())
+              .getConfiguration().getMaximumSystemApplications();
+      Assert.assertEquals(maxAppsBeforeFailOver, 10000);
+
+      // Do the failover
+      rm1.adminService.transitionToStandby(requestInfo);
+      rm2.adminService.transitionToActive(requestInfo);
+      Assert.assertTrue(rm1.getRMContext().getHAServiceState()
+          == HAServiceState.STANDBY);
+      Assert.assertTrue(rm2.getRMContext().getHAServiceState()
+          == HAServiceState.ACTIVE);
+
+      int maxAppsAfter =
+          ((CapacityScheduler) rm2.getRMContext().getScheduler())
+              .getConfiguration().getMaximumSystemApplications();
+
+      Assert.assertEquals(maxAppsAfter, 5000);
+    } finally {
+      if (rm1 != null) {
+        rm1.stop();
+      }
+      if (rm2 != null) {
+        rm2.stop();
+      }
+    }
+  }
+
   private String writeConfigurationXML(Configuration conf, String confXMLName)
   private String writeConfigurationXML(Configuration conf, String confXMLName)
       throws IOException {
       throws IOException {
     DataOutputStream output = null;
     DataOutputStream output = null;