Explorar o código

YARN-1861. Fixed a bug in RM to reset leader-election on fencing that was causing both RMs to be stuck in standby mode when automatic failover is enabled. Contributed by Karthik Kambatla and Xuan Gong.
svn merge --ignore-ancestry -c 1594356 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.4@1594359 13f79535-47bb-0310-9956-ffa450edef68

Vinod Kumar Vavilapalli %!s(int64=11) %!d(string=hai) anos
pai
achega
04dfc3ee4c

+ 4 - 0
hadoop-yarn-project/CHANGES.txt

@@ -101,6 +101,10 @@ Release 2.4.1 - UNRELEASED
     YARN-1201. TestAMAuthorization fails with local hostname cannot be resolved. 
     (Wangda Tan via junping_du)
 
+    YARN-1861. Fixed a bug in RM to reset leader-election on fencing that was
+    causing both RMs to be stuck in standby mode when automatic failover is
+    enabled. (Karthik Kambatla and Xuan Gong via vinodkv)
+
 Release 2.4.0 - 2014-04-07 
 
   INCOMPATIBLE CHANGES

+ 24 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java

@@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.ClientBaseWithFixes;
 import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.client.api.YarnClient;
@@ -42,6 +43,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
+import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer;
 import org.junit.After;
 import org.junit.Assert;
@@ -169,6 +173,7 @@ public class TestRMFailover extends ClientBaseWithFixes {
     verifyConnections();
   }
 
+  @SuppressWarnings("unchecked")
   @Test
   public void testAutomaticFailover()
       throws YarnException, InterruptedException, IOException {
@@ -186,6 +191,25 @@ public class TestRMFailover extends ClientBaseWithFixes {
 
     failover();
     verifyConnections();
+
+    // Make the current Active handle an RMFatalEvent,
+    // so it transitions to standby.
+    ResourceManager rm = cluster.getResourceManager(
+        cluster.getActiveRMIndex());
+    RMFatalEvent event =
+        new RMFatalEvent(RMFatalEventType.STATE_STORE_FENCED,
+            "Fake RMFatalEvent");
+    rm.getRMContext().getDispatcher().getEventHandler().handle(event);
+    int maxWaitingAttempts = 2000;
+    while (maxWaitingAttempts-- > 0 ) {
+      if (rm.getRMContext().getHAServiceState() == HAServiceState.STANDBY) {
+        break;
+      }
+      Thread.sleep(1);
+    }
+    Assert.assertFalse("RM didn't transition to Standby ",
+        maxWaitingAttempts == 0);
+    verifyConnections();
   }
 
   @Test

+ 11 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java

@@ -26,6 +26,7 @@ import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.ha.HAServiceProtocol;
@@ -86,6 +87,7 @@ public class AdminService extends CompositeService implements
   private String rmId;
 
   private boolean autoFailoverEnabled;
+  private EmbeddedElectorService embeddedElector;
 
   private Server server;
   private InetSocketAddress masterServiceAddress;
@@ -106,7 +108,8 @@ public class AdminService extends CompositeService implements
       autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf);
       if (autoFailoverEnabled) {
         if (HAUtil.isAutomaticFailoverEmbedded(conf)) {
-          addIfService(createEmbeddedElectorService());
+          embeddedElector = createEmbeddedElectorService();
+          addIfService(embeddedElector);
         }
       }
     }
@@ -181,6 +184,13 @@ public class AdminService extends CompositeService implements
     return new EmbeddedElectorService(rmContext);
   }
 
+  @InterfaceAudience.Private
+  void resetLeaderElection() {
+    if (embeddedElector != null) {
+      embeddedElector.resetLeaderElection();
+    }
+  }
+
   private UserGroupInformation checkAccess(String method) throws IOException {
     return RMServerUtils.verifyAccess(adminAcl, method, LOG);
   }

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java

@@ -190,4 +190,9 @@ public class EmbeddedElectorService extends AbstractService
     }
     return true;
   }
+
+  public void resetLeaderElection() {
+    elector.quitElection(false);
+    elector.joinElection(localActiveNodeInfo);
+  }
 }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -664,6 +664,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
             // Transition to standby and reinit active services
             LOG.info("Transitioning RM to Standby mode");
             rm.transitionToStandby(true);
+            rm.adminService.resetLeaderElection();
             return;
           } catch (Exception e) {
             LOG.fatal("Failed to transition RM to Standby mode.");

+ 6 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java

@@ -658,12 +658,14 @@ public class MiniYARNCluster extends CompositeService {
    */
   public boolean waitForNodeManagersToConnect(long timeout)
       throws YarnException, InterruptedException {
-    ResourceManager rm = getResourceManager();
     GetClusterMetricsRequest req = GetClusterMetricsRequest.newInstance();
-
     for (int i = 0; i < timeout / 100; i++) {
-      if (nodeManagers.length == rm.getClientRMService().getClusterMetrics(req)
-          .getClusterMetrics().getNumNodeManagers()) {
+      ResourceManager rm = getResourceManager();
+      if (rm == null) {
+        throw new YarnException("Can not find the active RM.");
+      }
+      else if (nodeManagers.length == rm.getClientRMService()
+            .getClusterMetrics(req).getClusterMetrics().getNumNodeManagers()) {
         return true;
       }
       Thread.sleep(100);