瀏覽代碼

YARN-3798. ZKRMStateStore shouldn't create new session without occurrance of SESSIONEXPIED. (ozawa and Varun Saxena)

Tsuyoshi Ozawa 9 年之前
父節點
當前提交
b898f8014f

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -45,6 +45,9 @@ Release 2.6.2 - UNRELEASED
     YARN-3896. RMNode transitioned from RUNNING to REBOOTED because its response id 
     has not been reset synchronously. (Jun Gong via rohithsharmaks)
 
+    YARN-3798. ZKRMStateStore shouldn't create new session without occurrance of 
+    SESSIONEXPIED. (ozawa and Varun Saxena)
+
 Release 2.6.1 - 2015-09-23
 
   INCOMPATIBLE CHANGES

+ 76 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java

@@ -28,6 +28,8 @@ import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -63,6 +65,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.Ap
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
@@ -110,6 +113,20 @@ public class ZKRMStateStore extends RMStateStore {
   private List<ACL> zkAcl;
   private List<ZKUtil.ZKAuthInfo> zkAuths;
 
+  class ZKSyncOperationCallback implements AsyncCallback.VoidCallback {
+    public final CountDownLatch latch = new CountDownLatch(1);
+    @Override
+    public void processResult(int rc, String path, Object ctx){
+      if (rc == Code.OK.intValue()) {
+        LOG.info("ZooKeeper sync operation succeeded. path: " + path);
+        latch.countDown();
+      } else {
+        LOG.fatal("ZooKeeper sync operation failed. Waiting for session " +
+            "timeout. path: " + path);
+      }
+    }
+  }
+
   /**
    *
    * ROOT_DIR_PATH
@@ -295,6 +312,7 @@ public class ZKRMStateStore extends RMStateStore {
     createRootDir(delegationTokensRootPath);
     createRootDir(dtSequenceNumberPath);
     createRootDir(amrmTokenSecretManagerRoot);
+    syncInternal(zkRootNodePath);
   }
 
   private void createRootDir(final String rootPath) throws Exception {
@@ -915,6 +933,7 @@ public class ZKRMStateStore extends RMStateStore {
           // call listener to reconnect
           LOG.info("ZKRMStateStore Session expired");
           createConnection();
+          syncInternal(event.getPath());
           break;
         default:
           LOG.error("Unexpected Zookeeper" +
@@ -931,6 +950,27 @@ public class ZKRMStateStore extends RMStateStore {
     return (root + "/" + nodeName);
   }
 
+  /**
+   * Helper method to call ZK's sync() after calling createConnection().
+   * Note that sync path is meaningless for now:
+   * http://mail-archives.apache.org/mod_mbox/zookeeper-user/201102.mbox/browser
+   * @param path path to sync, nullable value. If the path is null,
+   *             zkRootNodePath is used to sync.
+   * @return true if ZK.sync() succeededs, false if ZK.sync() fails.
+   * @throws InterruptedException
+   */
+  private boolean syncInternal(String path) throws InterruptedException {
+    ZKSyncOperationCallback cb = new ZKSyncOperationCallback();
+    if (path != null) {
+      zkClient.sync(path, cb, null);
+    } else {
+      zkClient.sync(zkRootNodePath, cb, null);
+    }
+    boolean succeededToSync = cb.latch.await(
+        zkSessionTimeout, TimeUnit.MILLISECONDS);
+    return succeededToSync;
+  }
+
   /**
    * Helper method that creates fencing node, executes the passed operations,
    * and deletes the fencing node.
@@ -1091,6 +1131,18 @@ public class ZKRMStateStore extends RMStateStore {
       switch (code) {
         case CONNECTIONLOSS:
         case OPERATIONTIMEOUT:
+          return true;
+        default:
+          break;
+      }
+      return false;
+    }
+
+    private boolean shouldRetryWithNewConnection(Code code) {
+      // For fast recovery, we choose to close current connection after
+      // SESSIONMOVED occurs. Latest state of a zknode path is ensured by
+      // following zk.sync(path) operation.
+      switch (code) {
         case SESSIONEXPIRED:
         case SESSIONMOVED:
           return true;
@@ -1118,12 +1170,34 @@ public class ZKRMStateStore extends RMStateStore {
             return null;
           }
           LOG.info("Exception while executing a ZK operation.", ke);
-          if (shouldRetry(ke.code()) && ++retry < numRetries) {
+          retry++;
+          if (shouldRetry(ke.code()) && retry < numRetries) {
             LOG.info("Retrying operation on ZK. Retry no. " + retry);
             Thread.sleep(zkRetryInterval);
-            createConnection();
             continue;
           }
+          if (shouldRetryWithNewConnection(ke.code()) && retry < numRetries) {
+            LOG.info("Retrying operation on ZK with new Connection. " +
+                "Retry no. " + retry);
+            Thread.sleep(zkRetryInterval);
+            createConnection();
+            boolean succeededToSync = false;
+            try {
+              succeededToSync = syncInternal(ke.getPath());
+            } catch (InterruptedException ie) {
+              LOG.info("Interrupted sync operation. Giving up!");
+              Thread.currentThread().interrupt();
+              throw ke;
+            }
+            if (succeededToSync) {
+              // continue the operation.
+              continue;
+            } else {
+              // Giving up since new connection without sync can occur an
+              // unexpected view from the client like YARN-3798.
+              LOG.info("Failed to sync with ZK new connection.");
+            }
+          }
           LOG.info("Maxed out ZK retries. Giving up!");
           throw ke;
         }