瀏覽代碼

YARN-4348. ZKRMStateStore.syncInternal shouldn't wait for sync completion for avoiding blocking ZK's event thread. (ozawa)

(cherry picked from commit 0460b8a8a3de232f236f49ef6769d38cda62cc28)
Tsuyoshi Ozawa 9 年之前
父節點
當前提交
921f56306b

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -51,6 +51,9 @@ Release 2.6.3 - UNRELEASED
     YARN-4365. FileSystemNodeLabelStore should check for root dir existence on
     startup (Kuhu Shukla via jlowe)
 
+    YARN-4348. ZKRMStateStore.syncInternal shouldn't wait for sync completion for
+    avoiding blocking ZK's event thread. (ozawa)
+
 Release 2.6.2 - 2015-10-28
 
   INCOMPATIBLE CHANGES

+ 15 - 27
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java

@@ -114,12 +114,10 @@ public class ZKRMStateStore extends RMStateStore {
   private List<ZKUtil.ZKAuthInfo> zkAuths;
 
   class ZKSyncOperationCallback implements AsyncCallback.VoidCallback {
-    public final CountDownLatch latch = new CountDownLatch(1);
     @Override
     public void processResult(int rc, String path, Object ctx){
       if (rc == Code.OK.intValue()) {
         LOG.info("ZooKeeper sync operation succeeded. path: " + path);
-        latch.countDown();
       } else {
         LOG.fatal("ZooKeeper sync operation failed. Waiting for session " +
             "timeout. path: " + path);
@@ -959,16 +957,20 @@ public class ZKRMStateStore extends RMStateStore {
    * @return true if ZK.sync() succeeds, false if ZK.sync() fails.
    * @throws InterruptedException
    */
-  private boolean syncInternal(String path) throws InterruptedException {
-    ZKSyncOperationCallback cb = new ZKSyncOperationCallback();
-    if (path != null) {
-      zkClient.sync(path, cb, null);
-    } else {
-      zkClient.sync(zkRootNodePath, cb, null);
+  private void syncInternal(final String path) throws InterruptedException {
+    final ZKSyncOperationCallback cb = new ZKSyncOperationCallback();
+    final String pathForSync = (path != null) ? path : zkRootNodePath;
+    try {
+      new ZKAction<Void>() {
+        @Override
+        Void run() throws KeeperException, InterruptedException {
+          zkClient.sync(pathForSync, cb, null);
+          return null;
+        }
+      }.runWithRetries();
+    } catch (Exception e) {
+      LOG.fatal("sync failed.");
     }
-    boolean succeededToSync = cb.latch.await(
-        zkSessionTimeout, TimeUnit.MILLISECONDS);
-    return succeededToSync;
   }
 
   /**
@@ -1181,22 +1183,8 @@ public class ZKRMStateStore extends RMStateStore {
                 "Retry no. " + retry);
             Thread.sleep(zkRetryInterval);
             createConnection();
-            boolean succeededToSync = false;
-            try {
-              succeededToSync = syncInternal(ke.getPath());
-            } catch (InterruptedException ie) {
-              LOG.info("Interrupted sync operation. Giving up!");
-              Thread.currentThread().interrupt();
-              throw ke;
-            }
-            if (succeededToSync) {
-              // continue the operation.
-              continue;
-            } else {
-              // Giving up since new connection without sync can occur an
-              // unexpected view from the client like YARN-3798.
-              LOG.info("Failed to sync with ZK new connection.");
-            }
+            syncInternal(ke.getPath());
+            continue;
           }
           LOG.info("Maxed out ZK retries. Giving up!");
           throw ke;