Explorar o código

YARN-5694. ZKRMStateStore can prevent the transition to standby if the ZK node is unreachable. Contributed by Daniel Templeton

Jian He %!s(int64=8) %!d(string=hai) anos
pai
achega
db716c27ba

+ 6 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java

@@ -389,7 +389,7 @@ public class ZKRMStateStore extends RMStateStore {
   }
 
   @Override
-  protected synchronized void closeInternal() throws Exception {
+  protected void closeInternal() throws Exception {
     if (verifyActiveStatusThread != null) {
       verifyActiveStatusThread.interrupt();
       verifyActiveStatusThread.join(1000);
@@ -976,9 +976,12 @@ public class ZKRMStateStore extends RMStateStore {
   /**
    * Helper method that creates fencing node, executes the passed operations,
    * and deletes the fencing node.
+   *
+   * @param opList the list of ZK operations to perform
+   * @throws Exception if any of the ZK operations fail
    */
-  private synchronized void doMultiWithRetries(
-      final List<Op> opList) throws Exception {
+  @VisibleForTesting
+  synchronized void doMultiWithRetries(final List<Op> opList) throws Exception {
     final List<Op> execOpList = new ArrayList<Op>(opList.size() + 2);
     execOpList.add(createFencingNodePathOp);
     execOpList.addAll(opList);

+ 92 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStore.java

@@ -23,6 +23,9 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,12 +37,15 @@ import org.apache.hadoop.yarn.conf.HAUtil;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.Op;
 import org.apache.zookeeper.data.Stat;
 import org.junit.Assert;
 import org.junit.Test;
+import static org.junit.Assert.assertFalse;
 
 public class TestZKRMStateStore extends RMStateStoreTestBase {
 
@@ -47,7 +53,6 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
   private static final int ZK_TIMEOUT_MS = 1000;
 
   class TestZKRMStateStoreTester implements RMStateStoreHelper {
-
     ZooKeeper client;
     TestZKRMStateStoreInternal store;
     String workingZnode;
@@ -58,7 +63,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
           throws Exception {
         init(conf);
         start();
-        assertTrue(znodeWorkingPath.equals(workingZnode));
+        assertEquals(workingZnode, znodeWorkingPath);
       }
 
       @Override
@@ -238,4 +243,89 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
         HAServiceProtocol.HAServiceState.ACTIVE,
         rm2.getRMContext().getRMAdminService().getServiceStatus().getState());
   }
+
+  @Test
+  public void testTransitionWithUnreachableZK() throws Exception {
+    final AtomicBoolean zkUnreachable = new AtomicBoolean(false);
+    final CountDownLatch threadHung = new CountDownLatch(1);
+    final Configuration conf = createHARMConf("rm1,rm2", "rm1", 1234);
+    conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, false);
+
+    // Create a state store that can simulate losing contact with the ZK node
+    TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester() {
+      @Override
+      public RMStateStore getRMStateStore() throws Exception {
+        YarnConfiguration storeConf = new YarnConfiguration(conf);
+        workingZnode = "/Test";
+        storeConf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
+        storeConf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH,
+            workingZnode);
+        storeConf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 500);
+        this.client = createClient();
+        this.store = new TestZKRMStateStoreInternal(storeConf, workingZnode) {
+          @Override
+          synchronized void doMultiWithRetries(final List<Op> opList)
+              throws Exception {
+            if (zkUnreachable.get()) {
+              // Let the test know that it can now proceed
+              threadHung.countDown();
+
+              // Take a long nap while holding the lock to simulate the ZK node
+              // being unreachable. This behavior models what happens in
+              // super.doStoreMultiWithRetries() when the ZK node it unreachble.
+              // If that behavior changes, then this test should also change or
+              // be phased out.
+              Thread.sleep(60000);
+            } else {
+              // Business as usual
+              super.doMultiWithRetries(opList);
+            }
+          }
+        };
+        return this.store;
+      }
+    };
+
+    // Start with a single RM in HA mode
+    final RMStateStore store = zkTester.getRMStateStore();
+    final MockRM rm = new MockRM(conf, store);
+    rm.start();
+
+    // Make the RM active
+    final StateChangeRequestInfo req = new StateChangeRequestInfo(
+        HAServiceProtocol.RequestSource.REQUEST_BY_USER);
+
+    rm.getRMContext().getRMAdminService().transitionToActive(req);
+    assertEquals("RM with ZKStore didn't start",
+        Service.STATE.STARTED, rm.getServiceState());
+    assertEquals("RM should be Active",
+        HAServiceProtocol.HAServiceState.ACTIVE,
+        rm.getRMContext().getRMAdminService().getServiceStatus().getState());
+
+    // Simulate the ZK node going dark and wait for the
+    // VerifyActiveStatusThread to hang
+    zkUnreachable.set(true);
+
+    assertTrue("Unable to perform test because Verify Active Status Thread "
+        + "did not run", threadHung.await(2, TimeUnit.SECONDS));
+
+    // Try to transition the RM to standby.  Give up after 2000ms.
+    Thread standby = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          rm.getRMContext().getRMAdminService().transitionToStandby(req);
+        } catch (IOException ex) {
+          // OK to exit
+        }
+      }
+    }, "Test Unreachable ZK Thread");
+
+    standby.start();
+    standby.join(2000);
+
+    assertFalse("The thread initiating the transition to standby is hung",
+        standby.isAlive());
+    zkUnreachable.set(false);
+  }
 }