浏览代码

YARN-5623. Apply SLIDER-1166 to yarn-native-services branch. Contributed by Gour Saha

Jian He 8 年之前
父节点
当前提交
9dc46aa379

+ 6 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java

@@ -604,11 +604,14 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
       BlockingZKWatcher watcher = new BlockingZKWatcher();
       BlockingZKWatcher watcher = new BlockingZKWatcher();
       client = ZKIntegration.newInstance(registryQuorum, user, clusterName, true, false, watcher,
       client = ZKIntegration.newInstance(registryQuorum, user, clusterName, true, false, watcher,
           ZKIntegration.SESSION_TIMEOUT);
           ZKIntegration.SESSION_TIMEOUT);
-      client.init();
-      watcher.waitForZKConnection(2 * 1000);
+      boolean fromCache = client.init();
+      if (!fromCache) {
+        watcher.waitForZKConnection(2 * 1000);
+      }
     } catch (InterruptedException e) {
     } catch (InterruptedException e) {
       client = null;
       client = null;
-      log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
+      log.warn("Interrupted - unable to connect to zookeeper quorum {}",
+          registryQuorum, e);
     } catch (IOException e) {
     } catch (IOException e) {
       log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
       log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
     }
     }

+ 30 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java

@@ -33,6 +33,8 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 
 
 
@@ -65,6 +67,8 @@ public class ZKIntegration implements Watcher, Closeable {
   private final String clustername;
   private final String clustername;
   private final String userPath;
   private final String userPath;
   private int sessionTimeout = SESSION_TIMEOUT;
   private int sessionTimeout = SESSION_TIMEOUT;
+  private static final Map<String, ZooKeeper> ZK_SESSIONS = new HashMap<>();
+
 /**
 /**
  flag to set to indicate that the user path should be created if
  flag to set to indicate that the user path should be created if
  it is not already there
  it is not already there
@@ -93,10 +97,32 @@ public class ZKIntegration implements Watcher, Closeable {
     this.userPath = mkSliderUserPath(username);
     this.userPath = mkSliderUserPath(username);
   }
   }
 
 
-  public void init() throws IOException {
-    assert zookeeper == null;
-    log.debug("Binding ZK client to {}", zkConnection);
-    zookeeper = new ZooKeeper(zkConnection, sessionTimeout, this, canBeReadOnly);
+  /**
+   * Returns true only if an active ZK session is available and retrieved from
+   * cache, false when it has to create a new one.
+   *
+   * @return true if from cache, false when new session created
+   * @throws IOException
+   */
+  public synchronized boolean init() throws IOException {
+    if (zookeeper != null && getAlive()) {
+      return true;
+    }
+
+    synchronized (ZK_SESSIONS) {
+      if (ZK_SESSIONS.containsKey(zkConnection)) {
+        zookeeper = ZK_SESSIONS.get(zkConnection);
+      }
+      if (zookeeper == null || !getAlive()) {
+        log.info("Binding ZK client to {}", zkConnection);
+        zookeeper = new ZooKeeper(zkConnection, sessionTimeout, this,
+            canBeReadOnly);
+        ZK_SESSIONS.put(zkConnection, zookeeper);
+        return false;
+      } else {
+        return true;
+      }
+    }
   }
   }
 
 
   /**
   /**