瀏覽代碼

YARN-3242. Asynchrony in ZK-close can lead to ZKRMStateStore watcher receiving events for old client. (Zhihai Xu via kasha)

(cherry picked from commit 8d88691d162f87f95c9ed7e0a569ef08e8385d4f)
(cherry picked from commit 0d62e948877e5d50f1b6fbe735a94ac6da5ff472)
(cherry picked from commit 4a5b0e708d42fbff571229a43d1762d1767e2db5)
Karthik Kambatla 10 年之前
父節點
當前提交
5a6755cc0f

+ 9 - 2
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java

@@ -90,6 +90,14 @@ public abstract class ClientBaseWithFixes extends ZKTestCase {
         // XXX this doesn't need to be volatile! (Should probably be final)
         volatile CountDownLatch clientConnected;
         volatile boolean connected;
+        protected ZooKeeper client;
+
+        public void initializeWatchedClient(ZooKeeper zk) {
+            if (client != null) {
+                throw new RuntimeException("Watched Client was already set");
+            }
+            client = zk;
+        }
 
         public CountdownWatcher() {
             reset();
@@ -191,8 +199,7 @@ public abstract class ClientBaseWithFixes extends ZKTestCase {
                 zk.close();
             }
         }
-
-
+        watcher.initializeWatchedClient(zk);
         return zk;
     }
 

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -94,6 +94,9 @@ Release 2.6.1 - UNRELEASED
     YARN-3231. FairScheduler: Changing queueMaxRunningApps interferes with pending 
     jobs. (Siqi Li via kasha)
 
+    YARN-3242. Asynchrony in ZK-close can lead to ZKRMStateStore watcher receiving 
+    events for old client. (Zhihai Xu via kasha)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

+ 32 - 21
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java

@@ -153,7 +153,13 @@ public class ZKRMStateStore extends RMStateStore {
 
   @VisibleForTesting
   protected ZooKeeper zkClient;
-  private ZooKeeper oldZkClient;
+
+  /* activeZkClient is not used to do actual operations,
+   * it is only used to verify client session for watched events and
+   * it gets activated into zkClient on connection event.
+   */
+  @VisibleForTesting
+  ZooKeeper activeZkClient;
 
   /** Fencing related variables */
   private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
@@ -355,21 +361,14 @@ public class ZKRMStateStore extends RMStateStore {
   }
 
   private synchronized void closeZkClients() throws IOException {
-    if (zkClient != null) {
+    zkClient = null;
+    if (activeZkClient != null) {
       try {
-        zkClient.close();
+        activeZkClient.close();
       } catch (InterruptedException e) {
         throw new IOException("Interrupted while closing ZK", e);
       }
-      zkClient = null;
-    }
-    if (oldZkClient != null) {
-      try {
-        oldZkClient.close();
-      } catch (InterruptedException e) {
-        throw new IOException("Interrupted while closing old ZK", e);
-      }
-      oldZkClient = null;
+      activeZkClient = null;
     }
   }
 
@@ -860,11 +859,16 @@ public class ZKRMStateStore extends RMStateStore {
    * hides the ZK methods of the store from its public interface
    */
   private final class ForwardingWatcher implements Watcher {
+    private ZooKeeper watchedZkClient;
+
+    public ForwardingWatcher(ZooKeeper client) {
+      this.watchedZkClient = client;
+    }
 
     @Override
     public void process(WatchedEvent event) {
       try {
-        ZKRMStateStore.this.processWatchEvent(event);
+        ZKRMStateStore.this.processWatchEvent(watchedZkClient, event);
       } catch (Throwable t) {
         LOG.error("Failed to process watcher event " + event + ": "
             + StringUtils.stringifyException(t));
@@ -875,8 +879,16 @@ public class ZKRMStateStore extends RMStateStore {
   @VisibleForTesting
   @Private
   @Unstable
-  public synchronized void processWatchEvent(WatchedEvent event)
-      throws Exception {
+  public synchronized void processWatchEvent(ZooKeeper zk,
+      WatchedEvent event) throws Exception {
+    // only process watcher event from current ZooKeeper Client session.
+    if (zk != activeZkClient) {
+      LOG.info("Ignore watcher event type: " + event.getType() +
+          " with state:" + event.getState() + " for path:" +
+          event.getPath() + " from old session");
+      return;
+    }
+
     Event.EventType eventType = event.getType();
     LOG.info("Watcher event type: " + eventType + " with state:"
         + event.getState() + " for path:" + event.getPath() + " for " + this);
@@ -887,17 +899,15 @@ public class ZKRMStateStore extends RMStateStore {
       switch (event.getState()) {
         case SyncConnected:
           LOG.info("ZKRMStateStore Session connected");
-          if (oldZkClient != null) {
+          if (zkClient == null) {
             // the SyncConnected must be from the client that sent Disconnected
-            zkClient = oldZkClient;
-            oldZkClient = null;
+            zkClient = activeZkClient;
             ZKRMStateStore.this.notifyAll();
             LOG.info("ZKRMStateStore Session restored");
           }
           break;
         case Disconnected:
           LOG.info("ZKRMStateStore Session disconnected");
-          oldZkClient = zkClient;
           zkClient = null;
           break;
         case Expired:
@@ -1127,7 +1137,8 @@ public class ZKRMStateStore extends RMStateStore {
     for (int retries = 0; retries < numRetries && zkClient == null;
         retries++) {
       try {
-        zkClient = getNewZooKeeper();
+        activeZkClient = getNewZooKeeper();
+        zkClient = activeZkClient;
         for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
           zkClient.addAuthInfo(zkAuth.getScheme(), zkAuth.getAuth());
         }
@@ -1157,7 +1168,7 @@ public class ZKRMStateStore extends RMStateStore {
   protected synchronized ZooKeeper getNewZooKeeper()
       throws IOException, InterruptedException {
     ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null);
-    zk.register(new ForwardingWatcher());
+    zk.register(new ForwardingWatcher(zk));
     return zk;
   }
 

+ 26 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java

@@ -71,6 +71,7 @@ public class TestZKRMStateStoreZKClientConnections extends
 
     ZKRMStateStore store;
     boolean forExpire = false;
+    TestForwardingWatcher oldWatcher;
     TestForwardingWatcher watcher;
     CyclicBarrier syncBarrier = new CyclicBarrier(2);
 
@@ -86,35 +87,36 @@ public class TestZKRMStateStoreZKClientConnections extends
       @Override
       public ZooKeeper getNewZooKeeper()
           throws IOException, InterruptedException {
+        oldWatcher = watcher;
+        watcher = new TestForwardingWatcher();
         return createClient(watcher, hostPort, ZK_TIMEOUT_MS);
       }
 
       @Override
-      public synchronized void processWatchEvent(WatchedEvent event)
-          throws Exception {
+      public synchronized void processWatchEvent(ZooKeeper zk,
+          WatchedEvent event) throws Exception {
 
         if (forExpire) {
           // a hack... couldn't find a way to trigger expired event.
           WatchedEvent expriredEvent = new WatchedEvent(
               Watcher.Event.EventType.None,
               Watcher.Event.KeeperState.Expired, null);
-          super.processWatchEvent(expriredEvent);
+          super.processWatchEvent(zk, expriredEvent);
           forExpire = false;
           syncBarrier.await();
         } else {
-          super.processWatchEvent(event);
+          super.processWatchEvent(zk, event);
         }
       }
     }
 
     private class TestForwardingWatcher extends
         ClientBaseWithFixes.CountdownWatcher {
-
       public void process(WatchedEvent event) {
         super.process(event);
         try {
           if (store != null) {
-            store.processWatchEvent(event);
+            store.processWatchEvent(client, event);
           }
         } catch (Throwable t) {
           LOG.error("Failed to process watcher event " + event + ": "
@@ -127,7 +129,6 @@ public class TestZKRMStateStoreZKClientConnections extends
       String workingZnode = "/Test";
       conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
       conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
-      watcher = new TestForwardingWatcher();
       this.store = new TestZKRMStateStore(conf, workingZnode);
       return this.store;
     }
@@ -239,6 +240,24 @@ public class TestZKRMStateStoreZKClientConnections extends
       LOG.error(error, e);
       fail(error);
     }
+
+    // send Disconnected event from old client session to ZKRMStateStore
+    // check the current client session is not affected.
+    Assert.assertTrue(zkClientTester.oldWatcher != null);
+    WatchedEvent disconnectedEvent = new WatchedEvent(
+        Watcher.Event.EventType.None,
+        Watcher.Event.KeeperState.Disconnected, null);
+    zkClientTester.oldWatcher.process(disconnectedEvent);
+    Assert.assertTrue(store.zkClient != null);
+
+    zkClientTester.watcher.process(disconnectedEvent);
+    Assert.assertTrue(store.zkClient == null);
+    WatchedEvent connectedEvent = new WatchedEvent(
+        Watcher.Event.EventType.None,
+        Watcher.Event.KeeperState.SyncConnected, null);
+    zkClientTester.watcher.process(connectedEvent);
+    Assert.assertTrue(store.zkClient != null);
+    Assert.assertTrue(store.zkClient == store.activeZkClient);
   }
 
   @Test(timeout = 20000)