浏览代码

YARN-2579. Fixed a deadlock issue when EmbeddedElectorService and FatalEventDispatcher try to transition RM to StandBy at the same time. Contributed by Rohith Sharmaks
(cherry picked from commit 395275af8622c780b9071c243422b0780e096202)

Jian He 10 年之前
父节点
当前提交
e29e864c51

+ 4 - 0
hadoop-yarn-project/CHANGES.txt

@@ -796,6 +796,10 @@ Release 2.6.0 - UNRELEASED
     YARN-2805. Fixed ResourceManager to load HA configs correctly before kerberos
     YARN-2805. Fixed ResourceManager to load HA configs correctly before kerberos
     login. (Wangda Tan via vinodkv)
     login. (Wangda Tan via vinodkv)
 
 
+    YARN-2579. Fixed a deadlock issue when EmbeddedElectorService and
+    FatalEventDispatcher try to transition RM to StandBy at the same time.
+    (Rohith Sharmaks via jianhe)
+
 Release 2.5.2 - UNRELEASED
 Release 2.5.2 - UNRELEASED
 
 
   INCOMPATIBLE CHANGES
   INCOMPATIBLE CHANGES

+ 1 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestRMFailover.java

@@ -43,8 +43,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
 import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
-import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer;
 import org.apache.hadoop.yarn.server.webproxy.WebAppProxyServer;
 import org.junit.After;
 import org.junit.After;
@@ -173,7 +171,6 @@ public class TestRMFailover extends ClientBaseWithFixes {
     verifyConnections();
     verifyConnections();
   }
   }
 
 
-  @SuppressWarnings("unchecked")
   @Test
   @Test
   public void testAutomaticFailover()
   public void testAutomaticFailover()
       throws YarnException, InterruptedException, IOException {
       throws YarnException, InterruptedException, IOException {
@@ -196,10 +193,7 @@ public class TestRMFailover extends ClientBaseWithFixes {
     // so it transitions to standby.
     // so it transitions to standby.
     ResourceManager rm = cluster.getResourceManager(
     ResourceManager rm = cluster.getResourceManager(
         cluster.getActiveRMIndex());
         cluster.getActiveRMIndex());
-    RMFatalEvent event =
-        new RMFatalEvent(RMFatalEventType.STATE_STORE_FENCED,
-            "Fake RMFatalEvent");
-    rm.getRMContext().getDispatcher().getEventHandler().handle(event);
+    rm.handleTransitionToStandBy();
     int maxWaitingAttempts = 2000;
     int maxWaitingAttempts = 2000;
     while (maxWaitingAttempts-- > 0 ) {
     while (maxWaitingAttempts-- > 0 ) {
       if (rm.getRMContext().getHAServiceState() == HAServiceState.STANDBY) {
       if (rm.getRMContext().getHAServiceState() == HAServiceState.STANDBY) {

+ 0 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMFatalEventType.java

@@ -23,7 +23,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
 @InterfaceAudience.Private
 @InterfaceAudience.Private
 public enum RMFatalEventType {
 public enum RMFatalEventType {
   // Source <- Store
   // Source <- Store
-  STATE_STORE_FENCED,
   STATE_STORE_OP_FAILED,
   STATE_STORE_OP_FAILED,
 
 
   // Source <- Embedded Elector
   // Source <- Embedded Elector

+ 22 - 27
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -269,6 +269,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
   @VisibleForTesting
   @VisibleForTesting
   protected void setRMStateStore(RMStateStore rmStore) {
   protected void setRMStateStore(RMStateStore rmStore) {
     rmStore.setRMDispatcher(rmDispatcher);
     rmStore.setRMDispatcher(rmDispatcher);
+    rmStore.setResourceManager(this);
     rmContext.setStateStore(rmStore);
     rmContext.setStateStore(rmStore);
   }
   }
 
 
@@ -397,11 +398,12 @@ public class ResourceManager extends CompositeService implements Recoverable {
     private EventHandler<SchedulerEvent> schedulerDispatcher;
     private EventHandler<SchedulerEvent> schedulerDispatcher;
     private ApplicationMasterLauncher applicationMasterLauncher;
     private ApplicationMasterLauncher applicationMasterLauncher;
     private ContainerAllocationExpirer containerAllocationExpirer;
     private ContainerAllocationExpirer containerAllocationExpirer;
-
+    private ResourceManager rm;
     private boolean recoveryEnabled;
     private boolean recoveryEnabled;
 
 
-    RMActiveServices() {
+    RMActiveServices(ResourceManager rm) {
       super("RMActiveServices");
       super("RMActiveServices");
+      this.rm = rm;
     }
     }
 
 
     @Override
     @Override
@@ -449,6 +451,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
       try {
       try {
         rmStore.init(conf);
         rmStore.init(conf);
         rmStore.setRMDispatcher(rmDispatcher);
         rmStore.setRMDispatcher(rmDispatcher);
+        rmStore.setResourceManager(rm);
       } catch (Exception e) {
       } catch (Exception e) {
         // the Exception from stateStore.init() needs to be handled for
         // the Exception from stateStore.init() needs to be handled for
         // HA and we need to give up master status if we got fenced
         // HA and we need to give up master status if we got fenced
@@ -729,39 +732,31 @@ public class ResourceManager extends CompositeService implements Recoverable {
   @Private
   @Private
   public static class RMFatalEventDispatcher
   public static class RMFatalEventDispatcher
       implements EventHandler<RMFatalEvent> {
       implements EventHandler<RMFatalEvent> {
-    private final RMContext rmContext;
-    private final ResourceManager rm;
-
-    public RMFatalEventDispatcher(
-        RMContext rmContext, ResourceManager resourceManager) {
-      this.rmContext = rmContext;
-      this.rm = resourceManager;
-    }
 
 
     @Override
     @Override
     public void handle(RMFatalEvent event) {
     public void handle(RMFatalEvent event) {
       LOG.fatal("Received a " + RMFatalEvent.class.getName() + " of type " +
       LOG.fatal("Received a " + RMFatalEvent.class.getName() + " of type " +
           event.getType().name() + ". Cause:\n" + event.getCause());
           event.getType().name() + ". Cause:\n" + event.getCause());
 
 
-      if (event.getType() == RMFatalEventType.STATE_STORE_FENCED) {
-        LOG.info("RMStateStore has been fenced");
-        if (rmContext.isHAEnabled()) {
-          try {
-            // Transition to standby and reinit active services
-            LOG.info("Transitioning RM to Standby mode");
-            rm.transitionToStandby(true);
-            rm.adminService.resetLeaderElection();
-            return;
-          } catch (Exception e) {
-            LOG.fatal("Failed to transition RM to Standby mode.");
-          }
-        }
-      }
-
       ExitUtil.terminate(1, event.getCause());
       ExitUtil.terminate(1, event.getCause());
     }
     }
   }
   }
 
 
+  public void handleTransitionToStandBy() {
+    if (rmContext.isHAEnabled()) {
+      try {
+        // Transition to standby and reinit active services
+        LOG.info("Transitioning RM to Standby mode");
+        transitionToStandby(true);
+        adminService.resetLeaderElection();
+        return;
+      } catch (Exception e) {
+        LOG.fatal("Failed to transition RM to Standby mode.");
+        ExitUtil.terminate(1, e);
+      }
+    }
+  }
+
   @Private
   @Private
   public static final class ApplicationEventDispatcher implements
   public static final class ApplicationEventDispatcher implements
       EventHandler<RMAppEvent> {
       EventHandler<RMAppEvent> {
@@ -990,7 +985,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
    * @throws Exception
    * @throws Exception
    */
    */
   protected void createAndInitActiveServices() throws Exception {
   protected void createAndInitActiveServices() throws Exception {
-    activeServices = new RMActiveServices();
+    activeServices = new RMActiveServices(this);
     activeServices.init(conf);
     activeServices.init(conf);
   }
   }
 
 
@@ -1227,7 +1222,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
   private Dispatcher setupDispatcher() {
   private Dispatcher setupDispatcher() {
     Dispatcher dispatcher = createDispatcher();
     Dispatcher dispatcher = createDispatcher();
     dispatcher.register(RMFatalEventType.class,
     dispatcher.register(RMFatalEventType.class,
-        new ResourceManager.RMFatalEventDispatcher(this.rmContext, this));
+        new ResourceManager.RMFatalEventDispatcher());
     return dispatcher;
     return dispatcher;
   }
   }
 
 

+ 20 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java

@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
@@ -87,6 +88,7 @@ public abstract class RMStateStore extends AbstractService {
       "AMRMTokenSecretManagerRoot";
       "AMRMTokenSecretManagerRoot";
   protected static final String VERSION_NODE = "RMVersionNode";
   protected static final String VERSION_NODE = "RMVersionNode";
   protected static final String EPOCH_NODE = "EpochNode";
   protected static final String EPOCH_NODE = "EpochNode";
+  private ResourceManager resourceManager;
 
 
   public static final Log LOG = LogFactory.getLog(RMStateStore.class);
   public static final Log LOG = LogFactory.getLog(RMStateStore.class);
 
 
@@ -818,13 +820,15 @@ public abstract class RMStateStore extends AbstractService {
    * @param failureCause the exception due to which the operation failed
    * @param failureCause the exception due to which the operation failed
    */
    */
   protected void notifyStoreOperationFailed(Exception failureCause) {
   protected void notifyStoreOperationFailed(Exception failureCause) {
-    RMFatalEventType type;
     if (failureCause instanceof StoreFencedException) {
     if (failureCause instanceof StoreFencedException) {
-      type = RMFatalEventType.STATE_STORE_FENCED;
+      Thread standByTransitionThread =
+          new Thread(new StandByTransitionThread());
+      standByTransitionThread.setName("StandByTransitionThread Handler");
+      standByTransitionThread.start();
     } else {
     } else {
-      type = RMFatalEventType.STATE_STORE_OP_FAILED;
+      rmDispatcher.getEventHandler().handle(
+        new RMFatalEvent(RMFatalEventType.STATE_STORE_OP_FAILED, failureCause));
     }
     }
-    rmDispatcher.getEventHandler().handle(new RMFatalEvent(type, failureCause));
   }
   }
  
  
   @SuppressWarnings("unchecked")
   @SuppressWarnings("unchecked")
@@ -866,4 +870,16 @@ public abstract class RMStateStore extends AbstractService {
    * @throws Exception
    * @throws Exception
    */
    */
   public abstract void deleteStore() throws Exception;
   public abstract void deleteStore() throws Exception;
+
+  public void setResourceManager(ResourceManager rm) {
+    this.resourceManager = rm;
+  }
+
+  private class StandByTransitionThread implements Runnable {
+    @Override
+    public void run() {
+      LOG.info("RMStateStore has been fenced");
+      resourceManager.handleTransitionToStandBy();
+    }
+  }
 }
 }

+ 62 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMHA.java

@@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFencedException;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -451,6 +452,67 @@ public class TestRMHA {
     checkActiveRMFunctionality();
     checkActiveRMFunctionality();
   }
   }
 
 
+  @Test(timeout = 90000)
+  public void testTransitionedToStandbyShouldNotHang() throws Exception {
+    configuration.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+    Configuration conf = new YarnConfiguration(configuration);
+
+    MemoryRMStateStore memStore = new MemoryRMStateStore() {
+      @Override
+      public synchronized void updateApplicationState(ApplicationState appState) {
+        notifyStoreOperationFailed(new StoreFencedException());
+      }
+    };
+    memStore.init(conf);
+    rm = new MockRM(conf, memStore) {
+      @Override
+      void stopActiveServices() throws Exception {
+        Thread.sleep(10000);
+        super.stopActiveServices();
+      }
+    };
+    rm.init(conf);
+    final StateChangeRequestInfo requestInfo =
+        new StateChangeRequestInfo(
+            HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+
+    assertEquals(STATE_ERR, HAServiceState.INITIALIZING, rm.adminService
+        .getServiceStatus().getState());
+    assertFalse("RM is ready to become active before being started",
+        rm.adminService.getServiceStatus().isReadyToBecomeActive());
+    checkMonitorHealth();
+
+    rm.start();
+    checkMonitorHealth();
+    checkStandbyRMFunctionality();
+
+    // 2. Transition to Active.
+    rm.adminService.transitionToActive(requestInfo);
+
+    // 3. Try Transition to standby
+    Thread t = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          rm.transitionToStandby(true);
+        } catch (IOException e) {
+          e.printStackTrace();
+        } catch (Exception e) {
+          // TODO Auto-generated catch block
+          e.printStackTrace();
+        }
+      }
+    });
+    t.start();
+
+    rm.getRMContext().getStateStore().updateApplicationState(null);
+    t.join(); // wait for thread to finish
+
+    rm.adminService.transitionToStandby(requestInfo);
+    checkStandbyRMFunctionality();
+    rm.stop();
+  }
+
   public void innerTestHAWithRMHostName(boolean includeBindHost) {
   public void innerTestHAWithRMHostName(boolean includeBindHost) {
     //this is run two times, with and without a bind host configured
     //this is run two times, with and without a bind host configured
     if (includeBindHost) {
     if (includeBindHost) {