瀏覽代碼

Merge 1524829 from trunk to branch-2 for YARN-353. Add Zookeeper-based store implementation for RMStateStore. Contributed by Bikas Saha, Jian He and Karthik Kambatla.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1524831 13f79535-47bb-0310-9956-ffa450edef68
Hitesh Shah 11 年之前
父節點
當前提交
2f97dced61
共有 10 個文件被更改,包括 1016 次插入27 次删除
  1. 8 2
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java
  2. 5 0
      hadoop-yarn-project/CHANGES.txt
  3. 28 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  4. 45 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  5. 10 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
  6. 0 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java
  7. 11 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java
  8. 621 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java
  9. 70 17
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java
  10. 218 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java

+ 8 - 2
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ClientBaseWithFixes.java

@@ -53,6 +53,8 @@ import org.junit.Before;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Copy-paste of ClientBase from ZooKeeper, but without any of the
  * JMXEnv verification. There seems to be a bug ZOOKEEPER-1438
@@ -111,7 +113,9 @@ public abstract class ClientBaseWithFixes extends ZKTestCase {
         synchronized boolean isConnected() {
             return connected;
         }
-        synchronized void waitForConnected(long timeout) throws InterruptedException, TimeoutException {
+        @VisibleForTesting
+        public synchronized void waitForConnected(long timeout)
+            throws InterruptedException, TimeoutException {
             long expire = Time.now() + timeout;
             long left = timeout;
             while(!connected && left > 0) {
@@ -123,7 +127,9 @@ public abstract class ClientBaseWithFixes extends ZKTestCase {
 
             }
         }
-        synchronized void waitForDisconnected(long timeout) throws InterruptedException, TimeoutException {
+        @VisibleForTesting
+        public synchronized void waitForDisconnected(long timeout)
+            throws InterruptedException, TimeoutException {
             long expire = Time.now() + timeout;
             long left = timeout;
             while(connected && left > 0) {

+ 5 - 0
hadoop-yarn-project/CHANGES.txt

@@ -12,10 +12,15 @@ Release 2.3.0 - UNRELEASED
   IMPROVEMENTS
 
     YARN-905. Add state filters to nodes CLI (Wei Yan via Sandy Ryza)
+
     YARN-1098. Separate out RM services into Always On and Active (Karthik
     Kambatla via bikas)
+
     YARN-1027. Implement RMHAProtocolService (Karthik Kambatla via bikas)
 
+    YARN-353. Add Zookeeper-based store implementation for RMStateStore.
+    (Bikas Saha, Jian He and Karthik Kambatla via hitesh)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 28 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -276,12 +276,40 @@ public class YarnConfiguration extends Configuration {
   public static final String RM_HA_ENABLED = RM_HA_PREFIX + "enabled";
   public static final boolean DEFAULT_RM_HA_ENABLED = false;
   
+
+  ////////////////////////////////
+  // RM state store configs
+  ////////////////////////////////
   /** The class to use as the persistent store.*/
   public static final String RM_STORE = RM_PREFIX + "store.class";
   
   /** URI for FileSystemRMStateStore */
   public static final String FS_RM_STATE_STORE_URI =
                                            RM_PREFIX + "fs.state-store.uri";
+  /**
+   * Comma separated host:port pairs, each corresponding to a ZK server for
+   * ZKRMStateStore
+   */
+  public static final String ZK_STATE_STORE_PREFIX =
+      RM_PREFIX + "zk.state-store.";
+  public static final String ZK_RM_STATE_STORE_NUM_RETRIES =
+      ZK_STATE_STORE_PREFIX + "num-retries";
+  public static final int DEFAULT_ZK_RM_STATE_STORE_NUM_RETRIES = 3;
+  public static final String ZK_RM_STATE_STORE_ADDRESS =
+      ZK_STATE_STORE_PREFIX + "address";
+  /** Timeout in millisec for ZK server connection for ZKRMStateStore */
+  public static final String ZK_RM_STATE_STORE_TIMEOUT_MS =
+      ZK_STATE_STORE_PREFIX + "timeout.ms";
+  public static final int DEFAULT_ZK_RM_STATE_STORE_TIMEOUT_MS = 60000;
+  /** Parent znode path under which ZKRMStateStore will create znodes */
+  public static final String ZK_RM_STATE_STORE_PARENT_PATH =
+      ZK_STATE_STORE_PREFIX + "parent-path";
+  public static final String DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH = "/rmstore";
+  /** ACL for znodes in ZKRMStateStore */
+  public static final String ZK_RM_STATE_STORE_ACL =
+      ZK_STATE_STORE_PREFIX + "acl";
+  public static final String DEFAULT_ZK_RM_STATE_STORE_ACL =
+      "world:anyone:rwcda";
 
   /** The maximum number of completed applications RM keeps. */ 
   public static final String RM_MAX_COMPLETED_APPLICATIONS =

+ 45 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -258,6 +258,51 @@
     <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.FileSystemRMStateStore</value>
   </property>
 
+  <property>
+    <description>Host:Port of the ZooKeeper server where RM state will 
+    be stored. This must be supplied when using
+    org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
+    as the value for yarn.resourcemanager.store.class</description>
+    <name>yarn.resourcemanager.zk.state-store.address</name>
+    <!--value>127.0.0.1:2181</value-->
+  </property>
+
+  <property>
+    <description>Number of times ZKRMStateStore tries to connect to
+    ZooKeeper. This may be supplied when using
+    org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
+    as the value for yarn.resourcemanager.store.class</description>
+    <name>yarn.resourcemanager.zk.state-store.num-retries</name>
+    <value>3</value>
+  </property>
+
+  <property>
+    <description>Full path of the ZooKeeper znode where RM state will be
+    stored. This must be supplied when using
+    org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
+    as the value for yarn.resourcemanager.store.class</description>
+    <name>yarn.resourcemanager.zk.state-store.parent-path</name>
+    <value>/rmstore</value>
+  </property>
+
+  <property>
+    <description>Timeout when connecting to ZooKeeper.
+    This may be supplied when using
+    org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
+    as the value for yarn.resourcemanager.store.class</description>
+    <name>yarn.resourcemanager.zk.state-store.timeout.ms</name>
+    <value>60000</value>
+  </property>
+
+  <property>
+    <description>ACL's to be used for ZooKeeper znodes.
+    This may be supplied when using
+    org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore
+    as the value for yarn.resourcemanager.store.class</description>
+    <name>yarn.resourcemanager.zk.state-store.acl</name>
+    <value>world:anyone:rwcda</value>
+  </property>
+
   <property>
     <description>URI pointing to the location of the FileSystem path where
     RM state will be stored. This must be supplied when using

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml

@@ -41,6 +41,16 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-server-web-proxy</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-hdfs</artifactId>

+ 0 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/FileSystemRMStateStore.java

@@ -63,12 +63,6 @@ public class FileSystemRMStateStore extends RMStateStore {
   public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class);
 
   private static final String ROOT_DIR_NAME = "FSRMStateRoot";
-  private static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot";
-  private static final String RM_APP_ROOT = "RMAppRoot";
-  private static final String DELEGATION_KEY_PREFIX = "DelegationKey_";
-  private static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
-  private static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
-      "RMDTSequenceNumber_";
 
   protected FileSystem fs;
 

+ 11 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStore.java

@@ -65,6 +65,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
  */
 public abstract class RMStateStore extends AbstractService {
 
+  // constants for RM App state and RMDTSecretManagerState.
+  protected static final String RM_APP_ROOT = "RMAppRoot";
+  protected static final String RM_DT_SECRET_MANAGER_ROOT = "RMDTSecretManagerRoot";
+  protected static final String DELEGATION_KEY_PREFIX = "DelegationKey_";
+  protected static final String DELEGATION_TOKEN_PREFIX = "RMDelegationToken_";
+  protected static final String DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX =
+      "RMDTSequenceNumber_";
+
   public static final Log LOG = LogFactory.getLog(RMStateStore.class);
 
   public RMStateStore() {
@@ -464,8 +472,9 @@ public abstract class RMStateStore extends AbstractService {
               (ApplicationAttemptStateDataPBImpl) ApplicationAttemptStateDataPBImpl
                   .newApplicationAttemptStateData(attemptState.getAttemptId(),
                     attemptState.getMasterContainer(), appAttemptTokens);
-
-            LOG.info("Storing info for attempt: " + attemptState.getAttemptId());
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Storing info for attempt: " + attemptState.getAttemptId());
+            }
             storeApplicationAttemptState(attemptState.getAttemptId().toString(), 
                                          attemptStateData);
           } catch (Exception e) {

+ 621 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/ZKRMStateStore.java

@@ -0,0 +1,621 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
+import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.ZKUtil;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.Code;
+import org.apache.zookeeper.Op;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.Watcher.Event;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Stat;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+@Private
+@Unstable
+public class ZKRMStateStore extends RMStateStore {
+
+  public static final Log LOG = LogFactory.getLog(ZKRMStateStore.class);
+
+  private static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
+  private int numRetries;
+
+  private String zkHostPort = null;
+  private int zkSessionTimeout;
+  private List<ACL> zkAcl;
+  private String zkRootNodePath;
+  private String rmDTSecretManagerRoot;
+  private String rmAppRoot;
+  private String dtSequenceNumberPath = null;
+
+  @VisibleForTesting
+  protected String znodeWorkingPath;
+
+  @VisibleForTesting
+  protected ZooKeeper zkClient;
+  private ZooKeeper oldZkClient;
+
+  @Override
+  public synchronized void initInternal(Configuration conf) throws Exception {
+    zkHostPort = conf.get(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS);
+    if (zkHostPort == null) {
+      throw new YarnRuntimeException("No server address specified for " +
+          "zookeeper state store for Resource Manager recovery. " +
+          YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS + " is not configured.");
+    }
+    numRetries =
+        conf.getInt(YarnConfiguration.ZK_RM_STATE_STORE_NUM_RETRIES,
+            YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_NUM_RETRIES);
+    znodeWorkingPath =
+        conf.get(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH,
+            YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH);
+    zkSessionTimeout =
+        conf.getInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS,
+            YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_TIMEOUT_MS);
+    // Parse authentication from configuration.
+    String zkAclConf =
+        conf.get(YarnConfiguration.ZK_RM_STATE_STORE_ACL,
+            YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_ACL);
+    zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
+
+    try {
+      zkAcl = ZKUtil.parseACLs(zkAclConf);
+    } catch (ZKUtil.BadAclFormatException bafe) {
+      LOG.error("Invalid format for " + YarnConfiguration.ZK_RM_STATE_STORE_ACL);
+      throw bafe;
+    }
+
+    zkRootNodePath = znodeWorkingPath + "/" + ROOT_ZNODE_NAME;
+    rmDTSecretManagerRoot = zkRootNodePath + "/" + RM_DT_SECRET_MANAGER_ROOT;
+    rmAppRoot = zkRootNodePath + "/" + RM_APP_ROOT;
+  }
+
+  @Override
+  public synchronized void startInternal() throws Exception {
+    // createConnection for future API calls
+    createConnection();
+
+    // ensure root dirs exist
+    createRootDir(znodeWorkingPath);
+    createRootDir(zkRootNodePath);
+    createRootDir(rmDTSecretManagerRoot);
+    createRootDir(rmAppRoot);
+  }
+
+  private void createRootDir(String rootPath) throws Exception {
+    try {
+      createWithRetries(rootPath, null, zkAcl, CreateMode.PERSISTENT);
+    } catch (KeeperException ke) {
+      if (ke.code() != Code.NODEEXISTS) {
+        throw ke;
+      }
+    }
+  }
+
+  private synchronized void closeZkClients() throws IOException {
+    if (zkClient != null) {
+      try {
+        zkClient.close();
+      } catch (InterruptedException e) {
+        throw new IOException("Interrupted while closing ZK", e);
+      }
+      zkClient = null;
+    }
+    if (oldZkClient != null) {
+      try {
+        oldZkClient.close();
+      } catch (InterruptedException e) {
+        throw new IOException("Interrupted while closing old ZK", e);
+      }
+      oldZkClient = null;
+    }
+  }
+
+  @Override
+  protected synchronized void closeInternal() throws Exception {
+    closeZkClients();
+  }
+
+  @Override
+  public synchronized RMState loadState() throws Exception {
+    RMState rmState = new RMState();
+    // recover DelegationTokenSecretManager
+    loadRMDTSecretManagerState(rmState);
+    // recover RM applications
+    loadRMAppState(rmState);
+    return rmState;
+  }
+
+  private synchronized void loadRMDTSecretManagerState(RMState rmState)
+      throws Exception {
+    List<String> childNodes = zkClient.getChildren(rmDTSecretManagerRoot, true);
+
+    for (String childNodeName : childNodes) {
+      if (childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
+        rmState.rmSecretManagerState.dtSequenceNumber =
+            Integer.parseInt(childNodeName.split("_")[1]);
+        continue;
+      }
+      String childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName);
+      byte[] childData = getDataWithRetries(childNodePath, true);
+
+      ByteArrayInputStream is = new ByteArrayInputStream(childData);
+      DataInputStream fsIn = new DataInputStream(is);
+      try {
+        if (childNodeName.startsWith(DELEGATION_KEY_PREFIX)) {
+          DelegationKey key = new DelegationKey();
+          key.readFields(fsIn);
+          rmState.rmSecretManagerState.masterKeyState.add(key);
+        } else if (childNodeName.startsWith(DELEGATION_TOKEN_PREFIX)) {
+          RMDelegationTokenIdentifier identifier =
+              new RMDelegationTokenIdentifier();
+          identifier.readFields(fsIn);
+          long renewDate = fsIn.readLong();
+          rmState.rmSecretManagerState.delegationTokenState.put(identifier,
+              renewDate);
+        }
+      } finally {
+        is.close();
+      }
+    }
+  }
+
+  private synchronized void loadRMAppState(RMState rmState) throws Exception {
+    List<String> childNodes = zkClient.getChildren(rmAppRoot, true);
+    List<ApplicationAttemptState> attempts =
+        new ArrayList<ApplicationAttemptState>();
+    for (String childNodeName : childNodes) {
+      String childNodePath = getNodePath(rmAppRoot, childNodeName);
+      byte[] childData = getDataWithRetries(childNodePath, true);
+      if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
+        // application
+        LOG.info("Loading application from znode: " + childNodeName);
+        ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
+        ApplicationStateDataPBImpl appStateData =
+            new ApplicationStateDataPBImpl(
+                ApplicationStateDataProto.parseFrom(childData));
+        ApplicationState appState =
+            new ApplicationState(appStateData.getSubmitTime(),
+                appStateData.getApplicationSubmissionContext(),
+                appStateData.getUser());
+        if (!appId.equals(appState.context.getApplicationId())) {
+          throw new YarnRuntimeException("The child node name is different " +
+              "from the application id");
+        }
+        rmState.appState.put(appId, appState);
+      } else if (childNodeName
+          .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
+        // attempt
+        LOG.info("Loading application attempt from znode: " + childNodeName);
+        ApplicationAttemptId attemptId =
+            ConverterUtils.toApplicationAttemptId(childNodeName);
+        ApplicationAttemptStateDataPBImpl attemptStateData =
+            new ApplicationAttemptStateDataPBImpl(
+                ApplicationAttemptStateDataProto.parseFrom(childData));
+        Credentials credentials = null;
+        if (attemptStateData.getAppAttemptTokens() != null) {
+          credentials = new Credentials();
+          DataInputByteBuffer dibb = new DataInputByteBuffer();
+          dibb.reset(attemptStateData.getAppAttemptTokens());
+          credentials.readTokenStorageStream(dibb);
+        }
+        ApplicationAttemptState attemptState =
+            new ApplicationAttemptState(attemptId,
+                attemptStateData.getMasterContainer(), credentials);
+        if (!attemptId.equals(attemptState.getAttemptId())) {
+          throw new YarnRuntimeException("The child node name is different " +
+              "from the application attempt id");
+        }
+        attempts.add(attemptState);
+      } else {
+        LOG.info("Unknown child node with name: " + childNodeName);
+      }
+    }
+
+    // go through all attempts and add them to their apps
+    for (ApplicationAttemptState attemptState : attempts) {
+      ApplicationId appId = attemptState.getAttemptId().getApplicationId();
+      ApplicationState appState = rmState.appState.get(appId);
+      if (appState != null) {
+        appState.attempts.put(attemptState.getAttemptId(), attemptState);
+      } else {
+        // the application znode may have been removed when the application
+        // completed but the RM might have stopped before it could remove the
+        // application attempt znodes
+        LOG.info("Application node not found for attempt: "
+            + attemptState.getAttemptId());
+        deleteWithRetries(
+            getNodePath(rmAppRoot, attemptState.getAttemptId().toString()),
+            0);
+      }
+    }
+  }
+
+  @Override
+  public synchronized void storeApplicationState(
+      String appId, ApplicationStateDataPBImpl appStateDataPB) throws
+      Exception {
+    String nodeCreatePath = getNodePath(rmAppRoot, appId);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Storing info for app: " + appId + " at: " + nodeCreatePath);
+    }
+    byte[] appStateData = appStateDataPB.getProto().toByteArray();
+    createWithRetries(
+        nodeCreatePath, appStateData, zkAcl, CreateMode.PERSISTENT);
+  }
+
+  @Override
+  public synchronized void storeApplicationAttemptState(
+      String attemptId, ApplicationAttemptStateDataPBImpl attemptStateDataPB)
+      throws Exception {
+    String nodeCreatePath = getNodePath(rmAppRoot, attemptId);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Storing info for attempt: " + attemptId + " at: "
+          + nodeCreatePath);
+    }
+    byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
+    createWithRetries(nodeCreatePath, attemptStateData, zkAcl,
+        CreateMode.PERSISTENT);
+  }
+
+  @Override
+  public synchronized void removeApplicationState(ApplicationState appState)
+      throws Exception {
+    String appId = appState.getAppId().toString();
+    String nodeRemovePath = getNodePath(rmAppRoot, appId);
+    ArrayList<Op> opList = new ArrayList<Op>();
+    opList.add(Op.delete(nodeRemovePath, 0));
+
+    for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
+      String attemptRemovePath = getNodePath(rmAppRoot, attemptId.toString());
+      opList.add(Op.delete(attemptRemovePath, 0));
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removing info for app: " + appId + " at: " + nodeRemovePath
+          + " and its attempts.");
+    }
+    doMultiWithRetries(opList);
+  }
+
+  @Override
+  protected synchronized void storeRMDelegationTokenAndSequenceNumberState(
+      RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate,
+      int latestSequenceNumber) throws Exception {
+    ArrayList<Op> opList = new ArrayList<Op>();
+    // store RM delegation token
+    String nodeCreatePath =
+        getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX
+            + rmDTIdentifier.getSequenceNumber());
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    DataOutputStream fsOut = new DataOutputStream(os);
+    try {
+      rmDTIdentifier.write(fsOut);
+      fsOut.writeLong(renewDate);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Storing RMDelegationToken_" +
+            rmDTIdentifier.getSequenceNumber());
+      }
+      opList.add(Op.create(nodeCreatePath, os.toByteArray(), zkAcl,
+          CreateMode.PERSISTENT));
+    } finally {
+      os.close();
+    }
+
+    // store sequence number
+    String latestSequenceNumberPath =
+        getNodePath(rmDTSecretManagerRoot,
+            DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX + latestSequenceNumber);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Storing " + DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX +
+          latestSequenceNumber);
+    }
+
+    if (dtSequenceNumberPath != null) {
+      opList.add(Op.delete(dtSequenceNumberPath, 0));
+    }
+    opList.add(Op.create(latestSequenceNumberPath, null, zkAcl,
+        CreateMode.PERSISTENT));
+    dtSequenceNumberPath = latestSequenceNumberPath;
+    doMultiWithRetries(opList);
+  }
+
+  @Override
+  protected synchronized void removeRMDelegationTokenState(
+      RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
+    String nodeRemovePath =
+        getNodePath(rmDTSecretManagerRoot, DELEGATION_TOKEN_PREFIX
+            + rmDTIdentifier.getSequenceNumber());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removing RMDelegationToken_"
+          + rmDTIdentifier.getSequenceNumber());
+    }
+    deleteWithRetries(nodeRemovePath, 0);
+  }
+
+  @Override
+  protected synchronized void storeRMDTMasterKeyState(
+      DelegationKey delegationKey) throws Exception {
+    String nodeCreatePath =
+        getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX
+            + delegationKey.getKeyId());
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    DataOutputStream fsOut = new DataOutputStream(os);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Storing RMDelegationKey_" + delegationKey.getKeyId());
+    }
+    delegationKey.write(fsOut);
+    try {
+      createWithRetries(nodeCreatePath, os.toByteArray(), zkAcl,
+          CreateMode.PERSISTENT);
+    } finally {
+      os.close();
+    }
+  }
+
+  @Override
+  protected synchronized void removeRMDTMasterKeyState(
+      DelegationKey delegationKey) throws Exception {
+    String nodeRemovePath =
+        getNodePath(rmDTSecretManagerRoot, DELEGATION_KEY_PREFIX
+            + delegationKey.getKeyId());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
+    }
+    deleteWithRetries(nodeRemovePath, 0);
+  }
+
+  // ZK related code
+  /**
+   * Watcher implementation which forward events to the ZKRMStateStore This
+   * hides the ZK methods of the store from its public interface
+   */
+  private final class ForwardingWatcher implements Watcher {
+
+    @Override
+    public void process(WatchedEvent event) {
+      try {
+        ZKRMStateStore.this.processWatchEvent(event);
+      } catch (Throwable t) {
+        LOG.error("Failed to process watcher event " + event + ": "
+            + StringUtils.stringifyException(t));
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public synchronized void processWatchEvent(WatchedEvent event)
+      throws Exception {
+    Event.EventType eventType = event.getType();
+    LOG.info("Watcher event type: " + eventType + " with state:"
+        + event.getState() + " for path:" + event.getPath() + " for " + this);
+
+    if (eventType == Event.EventType.None) {
+
+      // the connection state has changed
+      switch (event.getState()) {
+        case SyncConnected:
+          LOG.info("ZKRMStateStore Session connected");
+          if (oldZkClient != null) {
+            // the SyncConnected must be from the client that sent Disconnected
+            zkClient = oldZkClient;
+            oldZkClient = null;
+            ZKRMStateStore.this.notifyAll();
+            LOG.info("ZKRMStateStore Session restored");
+          }
+          break;
+        case Disconnected:
+          LOG.info("ZKRMStateStore Session disconnected");
+          oldZkClient = zkClient;
+          zkClient = null;
+          break;
+        case Expired:
+          // the connection got terminated because of session timeout
+          // call listener to reconnect
+          LOG.info("Session expired");
+          createConnection();
+          break;
+        default:
+          LOG.error("Unexpected Zookeeper" +
+              " watch event state: " + event.getState());
+          break;
+      }
+    }
+  }
+
+  @VisibleForTesting
+  String getNodePath(String root, String nodeName) {
+    return (root + "/" + nodeName);
+  }
+
+  @VisibleForTesting
+  public String createWithRetries(
+      final String path, final byte[] data, final List<ACL> acl,
+      final CreateMode mode) throws Exception {
+    return new ZKAction<String>() {
+      @Override
+      public String run() throws KeeperException, InterruptedException {
+        return zkClient.create(path, data, acl, mode);
+      }
+    }.runWithRetries();
+  }
+
+  private void deleteWithRetries(final String path, final int version)
+      throws Exception {
+    new ZKAction<Void>() {
+      @Override
+      public Void run() throws KeeperException, InterruptedException {
+        /**
+         * Call exists() to leave a watch on the node denoted by path.
+         * Delete node if exists. To pass the existence information to the
+         * caller, call delete irrespective of whether node exists or not.
+         */
+        if (zkClient.exists(path, true) == null) {
+          LOG.error("Trying to delete a path (" + path
+              + ") that doesn't exist.");
+        }
+        zkClient.delete(path, version);
+        return null;
+      }
+    }.runWithRetries();
+  }
+
+  private void doMultiWithRetries(final ArrayList<Op> opList) throws Exception {
+    new ZKAction<Void>() {
+      @Override
+      public Void run() throws KeeperException, InterruptedException {
+        zkClient.multi(opList);
+        return null;
+      }
+    }.runWithRetries();
+  }
+
+  @VisibleForTesting
+  public void setDataWithRetries(final String path, final byte[] data,
+                                 final int version) throws Exception {
+    new ZKAction<Void>() {
+      @Override
+      public Void run() throws KeeperException, InterruptedException {
+        zkClient.setData(path, data, version);
+        return null;
+      }
+    }.runWithRetries();
+  }
+
+  @VisibleForTesting
+  public byte[] getDataWithRetries(final String path, final boolean watch)
+      throws Exception {
+    return new ZKAction<byte[]>() {
+      @Override
+      public byte[] run() throws KeeperException, InterruptedException {
+        Stat stat = new Stat();
+        return zkClient.getData(path, watch, stat);
+      }
+    }.runWithRetries();
+  }
+
+  private abstract class ZKAction<T> {
+    // run() expects synchronization on ZKRMStateStore.this
+    abstract T run() throws KeeperException, InterruptedException;
+
+    T runWithCheck() throws Exception {
+      long startTime = System.currentTimeMillis();
+      synchronized (ZKRMStateStore.this) {
+        while (zkClient == null) {
+          ZKRMStateStore.this.wait(zkSessionTimeout);
+          if (zkClient != null) {
+            break;
+          }
+          if (System.currentTimeMillis() - startTime > zkSessionTimeout) {
+            throw new IOException("Wait for ZKClient creation timed out");
+          }
+        }
+        return run();
+      }
+    }
+
+    T runWithRetries() throws Exception {
+      int retry = 0;
+      while (true) {
+        try {
+          return runWithCheck();
+        } catch (KeeperException ke) {
+          if (shouldRetry(ke.code()) && ++retry < numRetries) {
+            continue;
+          }
+          throw ke;
+        }
+      }
+    }
+  }
+
+  private static boolean shouldRetry(Code code) {
+    switch (code) {
+      case CONNECTIONLOSS:
+      case OPERATIONTIMEOUT:
+        return true;
+      default:
+        break;
+    }
+    return false;
+  }
+
+  private synchronized void createConnection()
+      throws IOException, InterruptedException {
+    closeZkClients();
+    for (int retries = 0; retries < numRetries && zkClient == null;
+        retries++) {
+      try {
+        zkClient = getNewZooKeeper();
+      } catch (IOException ioe) {
+        // Retry in case of network failures
+        LOG.info("Failed to connect to the ZooKeeper on attempt - " +
+            (retries + 1));
+        ioe.printStackTrace();
+      }
+    }
+    if (zkClient == null) {
+      LOG.error("Unable to connect to Zookeeper");
+      throw new YarnRuntimeException("Unable to connect to Zookeeper");
+    }
+    ZKRMStateStore.this.notifyAll();
+    LOG.info("Created new ZK connection");
+  }
+
+  // protected to mock for testing
+  @VisibleForTesting
+  protected synchronized ZooKeeper getNewZooKeeper()
+      throws IOException, InterruptedException {
+    ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null);
+    zk.register(new ForwardingWatcher());
+    return zk;
+  }
+}

+ 70 - 17
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestRMStateStore.java

@@ -26,8 +26,10 @@ import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.IOException;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 
 import javax.crypto.SecretKey;
@@ -40,6 +42,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ha.ClientBaseWithFixes;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.Text;
@@ -67,13 +70,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAt
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import org.apache.zookeeper.ZooKeeper;
+
 import org.junit.Test;
 
-public class TestRMStateStore {
+public class TestRMStateStore extends ClientBaseWithFixes{
 
   public static final Log LOG = LogFactory.getLog(TestRMStateStore.class);
 
-  class TestDispatcher implements Dispatcher, EventHandler<RMAppAttemptStoredEvent> {
+  static class TestDispatcher implements
+      Dispatcher, EventHandler<RMAppAttemptStoredEvent> {
 
     ApplicationAttemptId attemptId;
     Exception storedException;
@@ -82,7 +89,8 @@ public class TestRMStateStore {
 
     @SuppressWarnings("rawtypes")
     @Override
-    public void register(Class<? extends Enum> eventType, EventHandler handler) {
+    public void register(Class<? extends Enum> eventType,
+                         EventHandler handler) {
     }
 
     @Override
@@ -108,10 +116,18 @@ public class TestRMStateStore {
     boolean isFinalStateValid() throws Exception;
   }
 
+  @Test
+  public void testZKRMStateStoreRealZK() throws Exception {
+    TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
+    testRMAppStateStore(zkTester);
+    testRMDTSecretManagerStateStore(zkTester);
+  }
+
   @Test
   public void testFSRMStateStore() throws Exception {
     HdfsConfiguration conf = new HdfsConfiguration();
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
     try {
       TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster);
       testRMAppStateStore(fsTester);
@@ -121,6 +137,41 @@ public class TestRMStateStore {
     }
   }
 
+  class TestZKRMStateStoreTester implements RMStateStoreHelper {
+    ZooKeeper client;
+    ZKRMStateStore store;
+
+    class TestZKRMStateStore extends ZKRMStateStore {
+      public TestZKRMStateStore(Configuration conf, String workingZnode)
+          throws Exception {
+        init(conf);
+        start();
+        assertTrue(znodeWorkingPath.equals(workingZnode));
+      }
+
+      @Override
+      public ZooKeeper getNewZooKeeper() throws IOException {
+        return client;
+      }
+    }
+
+    public RMStateStore getRMStateStore() throws Exception {
+      String workingZnode = "/Test";
+      YarnConfiguration conf = new YarnConfiguration();
+      conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
+      conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
+      this.client = createClient();
+      this.store = new TestZKRMStateStore(conf, workingZnode);
+      return this.store;
+    }
+
+    @Override
+    public boolean isFinalStateValid() throws Exception {
+      List<String> nodes = client.getChildren(store.znodeWorkingPath, false);
+      return nodes.size() == 1;
+    }
+  }
+
   class TestFSRMStateStoreTester implements RMStateStoreHelper {
     Path workingDirPathURI;
     FileSystemRMStateStore store;
@@ -149,7 +200,8 @@ public class TestRMStateStore {
     @Override
     public RMStateStore getRMStateStore() throws Exception {
       YarnConfiguration conf = new YarnConfiguration();
-      conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI, workingDirPathURI.toString());
+      conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI,
+          workingDirPathURI.toString());
       this.store = new TestFileSystemRMStore(conf);
       return store;
     }
@@ -158,11 +210,7 @@ public class TestRMStateStore {
     public boolean isFinalStateValid() throws Exception {
       FileSystem fs = cluster.getFileSystem();
       FileStatus[] files = fs.listStatus(workingDirPathURI);
-      if(files.length == 1) {
-        // only store root directory should exist
-        return true;
-      }
-      return false;
+      return files.length == 1;
     }
   }
 
@@ -183,9 +231,10 @@ public class TestRMStateStore {
     dispatcher.notified = false;
   }
 
-  void storeApp(RMStateStore store, ApplicationId appId, long time)
-                                                              throws Exception {
-    ApplicationSubmissionContext context = new ApplicationSubmissionContextPBImpl();
+  void storeApp(
+      RMStateStore store, ApplicationId appId, long time) throws Exception {
+    ApplicationSubmissionContext context =
+        new ApplicationSubmissionContextPBImpl();
     context.setApplicationId(appId);
 
     RMApp mockApp = mock(RMApp.class);
@@ -216,7 +265,8 @@ public class TestRMStateStore {
     return container.getId();
   }
 
-  void testRMAppStateStore(RMStateStoreHelper stateStoreHelper) throws Exception {
+  void testRMAppStateStore(RMStateStoreHelper stateStoreHelper)
+      throws Exception {
     long submitTime = System.currentTimeMillis();
     Configuration conf = new YarnConfiguration();
     RMStateStore store = stateStoreHelper.getRMStateStore();
@@ -271,7 +321,8 @@ public class TestRMStateStore {
     RMApp mockRemovedApp = mock(RMApp.class);
     HashMap<ApplicationAttemptId, RMAppAttempt> attempts =
                               new HashMap<ApplicationAttemptId, RMAppAttempt>();
-    ApplicationSubmissionContext context = new ApplicationSubmissionContextPBImpl();
+    ApplicationSubmissionContext context =
+        new ApplicationSubmissionContextPBImpl();
     context.setApplicationId(appIdRemoved);
     when(mockRemovedApp.getSubmitTime()).thenReturn(submitTime);
     when(mockRemovedApp.getApplicationSubmissionContext()).thenReturn(context);
@@ -288,7 +339,8 @@ public class TestRMStateStore {
     // load state
     store = stateStoreHelper.getRMStateStore();
     RMState state = store.loadState();
-    Map<ApplicationId, ApplicationState> rmAppState = state.getApplicationState();
+    Map<ApplicationId, ApplicationState> rmAppState =
+        state.getApplicationState();
 
     ApplicationState appState = rmAppState.get(appId1);
     // app is loaded
@@ -362,7 +414,8 @@ public class TestRMStateStore {
         store.loadState().getRMDTSecretManagerState();
     Assert.assertEquals(token1, secretManagerState.getTokenState());
     Assert.assertEquals(keySet, secretManagerState.getMasterKeyState());
-    Assert.assertEquals(sequenceNumber, secretManagerState.getDTSequenceNumber());
+    Assert.assertEquals(sequenceNumber,
+        secretManagerState.getDTSequenceNumber());
   }
 
   private Token<AMRMTokenIdentifier> generateAMRMToken(

+ 218 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStoreZKClientConnections.java

@@ -0,0 +1,218 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.recovery;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.ClientBaseWithFixes;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.TestRMStateStore.TestDispatcher;
+import org.apache.hadoop.util.ZKUtil;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.CyclicBarrier;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestZKRMStateStoreZKClientConnections extends
+    ClientBaseWithFixes {
+  private static final int ZK_OP_WAIT_TIME = 3000;
+  private Log LOG =
+      LogFactory.getLog(TestZKRMStateStoreZKClientConnections.class);
+
+  class TestZKClient {
+    ZKRMStateStore store;
+    boolean forExpire = false;
+    TestForwardingWatcher watcher;
+    CyclicBarrier syncBarrier = new CyclicBarrier(2);
+
+    protected class TestZKRMStateStore extends ZKRMStateStore {
+      public TestZKRMStateStore(Configuration conf, String workingZnode)
+          throws Exception {
+        init(conf);
+        start();
+        assertTrue(znodeWorkingPath.equals(workingZnode));
+      }
+
+      @Override
+      public ZooKeeper getNewZooKeeper()
+          throws IOException, InterruptedException {
+        return createClient(watcher, hostPort, 100);
+      }
+
+      @Override
+      public synchronized void processWatchEvent(WatchedEvent event)
+          throws Exception {
+
+        if (forExpire) {
+          // a hack... couldn't find a way to trigger expired event.
+          WatchedEvent expriredEvent = new WatchedEvent(
+              Watcher.Event.EventType.None,
+              Watcher.Event.KeeperState.Expired, null);
+          super.processWatchEvent(expriredEvent);
+          forExpire = false;
+          syncBarrier.await();
+        } else {
+          super.processWatchEvent(event);
+        }
+      }
+    }
+
+    private class TestForwardingWatcher extends
+        ClientBaseWithFixes.CountdownWatcher {
+      public void process(WatchedEvent event) {
+        super.process(event);
+        try {
+          if (store != null) {
+            store.processWatchEvent(event);
+          }
+        } catch (Throwable t) {
+          LOG.error("Failed to process watcher event " + event + ": "
+              + StringUtils.stringifyException(t));
+        }
+      }
+    }
+
+    public RMStateStore getRMStateStore(Configuration conf) throws Exception {
+      String workingZnode = "/Test";
+      conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
+      conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
+      watcher = new TestForwardingWatcher();
+      this.store = new TestZKRMStateStore(conf, workingZnode);
+      return this.store;
+    }
+  }
+
+  @Test(timeout = 20000)
+  public void testZKClientDisconnectAndReconnect()
+      throws Exception {
+
+    TestZKClient zkClientTester = new TestZKClient();
+    String path = "/test";
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 100);
+    ZKRMStateStore store =
+        (ZKRMStateStore) zkClientTester.getRMStateStore(conf);
+    TestDispatcher dispatcher = new TestDispatcher();
+    store.setRMDispatcher(dispatcher);
+
+    // trigger watch
+    store.createWithRetries(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+    store.getDataWithRetries(path, true);
+    store.setDataWithRetries(path, "newBytes".getBytes(), 0);
+
+    stopServer();
+    zkClientTester.watcher.waitForDisconnected(ZK_OP_WAIT_TIME);
+    try {
+      store.getDataWithRetries(path, true);
+      fail("Expected ZKClient time out exception");
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains(
+          "Wait for ZKClient creation timed out"));
+    }
+
+    // ZKRMStateStore Session restored
+    startServer();
+    zkClientTester.watcher.waitForConnected(ZK_OP_WAIT_TIME);
+    byte[] ret = null;
+    try {
+      ret = store.getDataWithRetries(path, true);
+    } catch (Exception e) {
+      String error = "ZKRMStateStore Session restore failed";
+      LOG.error(error, e);
+      fail(error);
+    }
+    Assert.assertEquals("newBytes", new String(ret));
+  }
+
+  @Test(timeout = 20000)
+  public void testZKSessionTimeout() throws Exception {
+
+    TestZKClient zkClientTester = new TestZKClient();
+    String path = "/test";
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setInt(YarnConfiguration.ZK_RM_STATE_STORE_TIMEOUT_MS, 100);
+    ZKRMStateStore store =
+        (ZKRMStateStore) zkClientTester.getRMStateStore(conf);
+    TestDispatcher dispatcher = new TestDispatcher();
+    store.setRMDispatcher(dispatcher);
+
+    // a hack to trigger expired event
+    zkClientTester.forExpire = true;
+
+    // trigger watch
+    store.createWithRetries(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+    store.getDataWithRetries(path, true);
+    store.setDataWithRetries(path, "bytes".getBytes(), 0);
+
+    zkClientTester.syncBarrier.await();
+    // after this point, expired event has already been processed.
+
+    try {
+      byte[] ret = store.getDataWithRetries(path, false);
+      Assert.assertEquals("bytes", new String(ret));
+    } catch (Exception e) {
+      String error = "New session creation failed";
+      LOG.error(error, e);
+      fail(error);
+    }
+  }
+
+  @Test (timeout = 20000)
+  public void testSetZKAcl() {
+    TestZKClient zkClientTester = new TestZKClient();
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ACL, "world:anyone:rwca");
+    try {
+      zkClientTester.store.zkClient.delete(zkClientTester.store
+          .znodeWorkingPath, -1);
+      fail("Shouldn't be able to delete path");
+    } catch (Exception e) {/* expected behavior */}
+  }
+
+  @Test (timeout = 20000)
+  public void testInvalidZKAclConfiguration() {
+    TestZKClient zkClientTester = new TestZKClient();
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ACL, "randomstring&*");
+    try {
+      zkClientTester.getRMStateStore(conf);
+      fail("ZKRMStateStore created with bad ACL");
+    } catch (ZKUtil.BadAclFormatException bafe) {
+      // expected behavior
+    } catch (Exception e) {
+      String error = "Incorrect exception on BadAclFormat";
+      LOG.error(error, e);
+      fail(error);
+    }
+  }
+}