فهرست منبع

HADOOP-11017. KMS delegation token secret manager should be able to use zookeeper as store. (asuresh via tucu)

Alejandro Abdelnur 10 سال پیش
والد
کامیت
db890eef32
12فایلهای تغییر یافته به همراه1013 افزوده شده و 75 حذف شده
  1. 2 0
      hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/ZKSignerSecretProvider.java
  2. 3 0
      hadoop-common-project/hadoop-common/CHANGES.txt
  3. 13 0
      hadoop-common-project/hadoop-common/pom.xml
  4. 111 21
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java
  5. 727 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java
  6. 10 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationFilter.java
  7. 1 25
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationHandler.java
  8. 57 19
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenManager.java
  9. 2 2
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestDelegationToken.java
  10. 68 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java
  11. 9 8
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/web/TestDelegationTokenManager.java
  12. 10 0
      hadoop-project/pom.xml

+ 2 - 0
hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/util/ZKSignerSecretProvider.java

@@ -197,6 +197,8 @@ public class ZKSignerSecretProvider extends RolloverSignerSecretProvider {
       client = (CuratorFramework) curatorClientObj;
     } else {
       client = createCuratorClient(config);
+      servletContext.setAttribute(
+          ZOOKEEPER_SIGNER_SECRET_PROVIDER_CURATOR_CLIENT_ATTRIBUTE, client);
     }
     this.tokenValidity = tokenValidity;
     shouldDisconnect = Boolean.parseBoolean(

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -537,6 +537,9 @@ Release 2.6.0 - UNRELEASED
 
     HADOOP-10970. Cleanup KMS configuration keys. (wang)
 
+    HADOOP-11017. KMS delegation token secret manager should be able to use 
+    zookeeper as store. (asuresh via tucu)
+
   OPTIMIZATIONS
 
     HADOOP-10838. Byte array native checksumming. (James Thomas via todd)

+ 13 - 0
hadoop-common-project/hadoop-common/pom.xml

@@ -218,6 +218,19 @@
       <groupId>com.jcraft</groupId>
       <artifactId>jsch</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-client</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-recipes</artifactId>
+    </dependency>
     <dependency>
       <groupId>com.google.code.findbugs</groupId>
       <artifactId>jsr305</artifactId>

+ 111 - 21
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java

@@ -127,7 +127,7 @@ extends AbstractDelegationTokenIdentifier>
   public synchronized void reset() {
     currentId = 0;
     allKeys.clear();
-    delegationTokenSequenceNumber = 0;
+    setDelegationTokenSeqNum(0);
     currentTokens.clear();
   }
   
@@ -141,7 +141,7 @@ extends AbstractDelegationTokenIdentifier>
     if (key.getKeyId() > currentId) {
       currentId = key.getKeyId();
     }
-    allKeys.put(key.getKeyId(), key);
+    storeDelegationKey(key);
   }
 
   public synchronized DelegationKey[] getAllKeys() {
@@ -163,24 +163,108 @@ extends AbstractDelegationTokenIdentifier>
     return;
   }
 
+  // for ZK based secretManager
+  protected void updateMasterKey(DelegationKey key) throws IOException{
+    return;
+  }
+
   // RM
   protected void removeStoredMasterKey(DelegationKey key) {
     return;
   }
 
   // RM
-  protected void storeNewToken(TokenIdent ident, long renewDate) {
+  protected void storeNewToken(TokenIdent ident, long renewDate) throws IOException{
     return;
   }
+
   // RM
   protected void removeStoredToken(TokenIdent ident) throws IOException {
 
   }
   // RM
-  protected void updateStoredToken(TokenIdent ident, long renewDate) {
+  protected void updateStoredToken(TokenIdent ident, long renewDate) throws IOException {
     return;
   }
 
+  /**
+   * For subclasses externalizing the storage, for example Zookeeper
+   * based implementations
+   */
+  protected int getDelegationTokenSeqNum() {
+    return delegationTokenSequenceNumber;
+  }
+
+  /**
+   * For subclasses externalizing the storage, for example Zookeeper
+   * based implementations
+   */
+  protected int incrementDelegationTokenSeqNum() {
+    return ++delegationTokenSequenceNumber;
+  }
+
+  /**
+   * For subclasses externalizing the storage, for example Zookeeper
+   * based implementations
+   */
+  protected void setDelegationTokenSeqNum(int seqNum) {
+    delegationTokenSequenceNumber = seqNum;
+  }
+
+  /**
+   * For subclasses externalizing the storage, for example Zookeeper
+   * based implementations
+   */
+  protected DelegationKey getDelegationKey(int keyId) {
+    return allKeys.get(keyId);
+  }
+
+  /**
+   * For subclasses externalizing the storage, for example Zookeeper
+   * based implementations
+   */
+  protected void storeDelegationKey(DelegationKey key) throws IOException {
+    allKeys.put(key.getKeyId(), key);
+    storeNewMasterKey(key);
+  }
+
+  /**
+   * For subclasses externalizing the storage, for example Zookeeper
+   * based implementations
+   */
+  protected void updateDelegationKey(DelegationKey key) throws IOException {
+    allKeys.put(key.getKeyId(), key);
+    updateMasterKey(key);
+  }
+
+  /**
+   * For subclasses externalizing the storage, for example Zookeeper
+   * based implementations
+   */
+  protected DelegationTokenInformation getTokenInfo(TokenIdent ident) {
+    return currentTokens.get(ident);
+  }
+
+  /**
+   * For subclasses externalizing the storage, for example Zookeeper
+   * based implementations
+   */
+  protected void storeToken(TokenIdent ident,
+      DelegationTokenInformation tokenInfo) throws IOException {
+    currentTokens.put(ident, tokenInfo);
+    storeNewToken(ident, tokenInfo.getRenewDate());
+  }
+
+  /**
+   * For subclasses externalizing the storage, for example Zookeeper
+   * based implementations
+   */
+  protected void updateToken(TokenIdent ident,
+      DelegationTokenInformation tokenInfo) throws IOException {
+    currentTokens.put(ident, tokenInfo);
+    updateStoredToken(ident, tokenInfo.getRenewDate());
+  }
+
   /**
    * This method is intended to be used for recovering persisted delegation
    * tokens
@@ -196,17 +280,18 @@ extends AbstractDelegationTokenIdentifier>
           "Can't add persisted delegation token to a running SecretManager.");
     }
     int keyId = identifier.getMasterKeyId();
-    DelegationKey dKey = allKeys.get(keyId);
+    DelegationKey dKey = getDelegationKey(keyId);
     if (dKey == null) {
       LOG.warn("No KEY found for persisted identifier " + identifier.toString());
       return;
     }
     byte[] password = createPassword(identifier.getBytes(), dKey.getKey());
-    if (identifier.getSequenceNumber() > this.delegationTokenSequenceNumber) {
-      this.delegationTokenSequenceNumber = identifier.getSequenceNumber();
+    int delegationTokenSeqNum = getDelegationTokenSeqNum();
+    if (identifier.getSequenceNumber() > delegationTokenSeqNum) {
+      setDelegationTokenSeqNum(identifier.getSequenceNumber());
     }
-    if (currentTokens.get(identifier) == null) {
-      currentTokens.put(identifier, new DelegationTokenInformation(renewDate,
+    if (getTokenInfo(identifier) == null) {
+      storeToken(identifier, new DelegationTokenInformation(renewDate,
           password, getTrackingIdIfEnabled(identifier)));
     } else {
       throw new IOException("Same delegation token being added twice.");
@@ -234,7 +319,7 @@ extends AbstractDelegationTokenIdentifier>
     synchronized (this) {
       currentId = newKey.getKeyId();
       currentKey = newKey;
-      allKeys.put(currentKey.getKeyId(), currentKey);
+      storeDelegationKey(currentKey);
     }
   }
   
@@ -252,7 +337,7 @@ extends AbstractDelegationTokenIdentifier>
        * updateMasterKey() isn't called at expected interval. Add it back to
        * allKeys just in case.
        */
-      allKeys.put(currentKey.getKeyId(), currentKey);
+      updateDelegationKey(currentKey);
     }
     updateCurrentKey();
   }
@@ -276,19 +361,25 @@ extends AbstractDelegationTokenIdentifier>
   protected synchronized byte[] createPassword(TokenIdent identifier) {
     int sequenceNum;
     long now = Time.now();
-    sequenceNum = ++delegationTokenSequenceNumber;
+    sequenceNum = incrementDelegationTokenSeqNum();
     identifier.setIssueDate(now);
     identifier.setMaxDate(now + tokenMaxLifetime);
     identifier.setMasterKeyId(currentId);
     identifier.setSequenceNumber(sequenceNum);
     LOG.info("Creating password for identifier: " + identifier);
     byte[] password = createPassword(identifier.getBytes(), currentKey.getKey());
-    storeNewToken(identifier, now + tokenRenewInterval);
-    currentTokens.put(identifier, new DelegationTokenInformation(now
-        + tokenRenewInterval, password, getTrackingIdIfEnabled(identifier)));
+    DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now
+        + tokenRenewInterval, password, getTrackingIdIfEnabled(identifier));
+    try {
+      storeToken(identifier, tokenInfo);
+    } catch (IOException ioe) {
+      LOG.error("Could not store token !!", ioe);
+    }
     return password;
   }
   
+
+
   /**
    * Find the DelegationTokenInformation for the given token id, and verify that
    * if the token is expired. Note that this method should be called with 
@@ -297,7 +388,7 @@ extends AbstractDelegationTokenIdentifier>
   protected DelegationTokenInformation checkToken(TokenIdent identifier)
       throws InvalidToken {
     assert Thread.holdsLock(this);
-    DelegationTokenInformation info = currentTokens.get(identifier);
+    DelegationTokenInformation info = getTokenInfo(identifier);
     if (info == null) {
       throw new InvalidToken("token (" + identifier.toString()
           + ") can't be found in cache");
@@ -322,7 +413,7 @@ extends AbstractDelegationTokenIdentifier>
   }
 
   public synchronized String getTokenTrackingId(TokenIdent identifier) {
-    DelegationTokenInformation info = currentTokens.get(identifier);
+    DelegationTokenInformation info = getTokenInfo(identifier);
     if (info == null) {
       return null;
     }
@@ -373,7 +464,7 @@ extends AbstractDelegationTokenIdentifier>
       throw new AccessControlException(renewer +
           " tries to renew a token with renewer " + id.getRenewer());
     }
-    DelegationKey key = allKeys.get(id.getMasterKeyId());
+    DelegationKey key = getDelegationKey(id.getMasterKeyId());
     if (key == null) {
       throw new InvalidToken("Unable to find master key for keyId="
           + id.getMasterKeyId()
@@ -390,11 +481,10 @@ extends AbstractDelegationTokenIdentifier>
     DelegationTokenInformation info = new DelegationTokenInformation(renewTime,
         password, trackingId);
 
-    if (currentTokens.get(id) == null) {
+    if (getTokenInfo(id) == null) {
       throw new InvalidToken("Renewal request for unknown token");
     }
-    currentTokens.put(id, info);
-    updateStoredToken(id, renewTime);
+    updateToken(id, info);
     return renewTime;
   }
   

+ 727 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java

@@ -0,0 +1,727 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security.token.delegation;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import javax.security.auth.login.AppConfigurationEntry;
+
+import org.apache.curator.ensemble.fixed.FixedEnsembleProvider;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.CuratorFrameworkFactory.Builder;
+import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.framework.imps.DefaultACLProvider;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
+import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
+import org.apache.curator.framework.recipes.shared.SharedCount;
+import org.apache.curator.retry.RetryNTimes;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooDefs.Perms;
+import org.apache.zookeeper.client.ZooKeeperSaslClient;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+/**
+ * An implementation of {@link AbstractDelegationTokenSecretManager} that
+ * persists TokenIdentifiers and DelegationKeys in Zookeeper. This class can
+ * be used by HA (Highly available) services that consists of multiple nodes.
+ * This class ensures that Identifiers and Keys are replicated to all nodes of
+ * the service.
+ */
+@InterfaceAudience.Private
+public abstract class ZKDelegationTokenSecretManager<TokenIdent extends AbstractDelegationTokenIdentifier>
+    extends AbstractDelegationTokenSecretManager<TokenIdent> {
+
+  private static final String ZK_CONF_PREFIX = "zk-dt-secret-manager.";
+  public static final String ZK_DTSM_ZK_NUM_RETRIES = ZK_CONF_PREFIX
+      + "zkNumRetries";
+  public static final String ZK_DTSM_ZK_SESSION_TIMEOUT = ZK_CONF_PREFIX
+      + "zkSessionTimeout";
+  public static final String ZK_DTSM_ZK_CONNECTION_TIMEOUT = ZK_CONF_PREFIX
+      + "zkConnectionTimeout";
+  public static final String ZK_DTSM_ZNODE_WORKING_PATH = ZK_CONF_PREFIX
+      + "znodeWorkingPath";
+  public static final String ZK_DTSM_ZK_AUTH_TYPE = ZK_CONF_PREFIX
+      + "zkAuthType";
+  public static final String ZK_DTSM_ZK_CONNECTION_STRING = ZK_CONF_PREFIX
+      + "zkConnectionString";
+  public static final String ZK_DTSM_ZK_KERBEROS_KEYTAB = ZK_CONF_PREFIX
+      + "kerberos.keytab";
+  public static final String ZK_DTSM_ZK_KERBEROS_PRINCIPAL = ZK_CONF_PREFIX
+      + "kerberos.principal";
+
+  public static final int ZK_DTSM_ZK_NUM_RETRIES_DEFAULT = 3;
+  public static final int ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT = 10000;
+  public static final int ZK_DTSM_ZK_CONNECTION_TIMEOUT_DEFAULT = 10000;
+  public static final String ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT = "zkdtsm";
+
+  private static Logger LOG = LoggerFactory
+      .getLogger(ZKDelegationTokenSecretManager.class);
+
+  private static final String JAAS_LOGIN_ENTRY_NAME =
+      "ZKDelegationTokenSecretManagerClient";
+
+  private static final String ZK_DTSM_NAMESPACE = "ZKDTSMRoot";
+  private static final String ZK_DTSM_SEQNUM_ROOT = "ZKDTSMSeqNumRoot";
+  private static final String ZK_DTSM_TOKENS_ROOT = "ZKDTSMTokensRoot";
+  private static final String ZK_DTSM_MASTER_KEY_ROOT = "ZKDTSMMasterKeyRoot";
+
+  private static final String DELEGATION_KEY_PREFIX = "DK_";
+  private static final String DELEGATION_TOKEN_PREFIX = "DT_";
+
+  private static final ThreadLocal<CuratorFramework> CURATOR_TL =
+      new ThreadLocal<CuratorFramework>();
+
+  public static void setCurator(CuratorFramework curator) {
+    CURATOR_TL.set(curator);
+  }
+
+  private final boolean isExternalClient;
+  private final CuratorFramework zkClient;
+  private SharedCount seqCounter;
+  private PathChildrenCache keyCache;
+  private PathChildrenCache tokenCache;
+  private ExecutorService listenerThreadPool;
+
+  public ZKDelegationTokenSecretManager(Configuration conf) {
+    super(conf.getLong(DelegationTokenManager.UPDATE_INTERVAL,
+        DelegationTokenManager.UPDATE_INTERVAL_DEFAULT) * 1000,
+        conf.getLong(DelegationTokenManager.MAX_LIFETIME,
+            DelegationTokenManager.MAX_LIFETIME_DEFAULT) * 1000,
+        conf.getLong(DelegationTokenManager.RENEW_INTERVAL,
+            DelegationTokenManager.RENEW_INTERVAL_DEFAULT * 1000),
+        conf.getLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL,
+            DelegationTokenManager.REMOVAL_SCAN_INTERVAL_DEFAULT) * 1000);
+    if (CURATOR_TL.get() != null) {
+      zkClient = CURATOR_TL.get();
+      isExternalClient = true;
+    } else {
+      String connString = conf.get(ZK_DTSM_ZK_CONNECTION_STRING);
+      Preconditions.checkNotNull(connString,
+          "Zookeeper connection string cannot be null");
+      String authType = conf.get(ZK_DTSM_ZK_AUTH_TYPE);
+
+      // AuthType has to be explicitly set to 'none' or 'sasl'
+      Preconditions.checkNotNull(authType, "Zookeeper authType cannot be null !!");
+      Preconditions.checkArgument(
+          authType.equals("sasl") || authType.equals("none"),
+          "Zookeeper authType must be one of [none, sasl]");
+
+      Builder builder = null;
+      try {
+        ACLProvider aclProvider = null;
+        if (authType.equals("sasl")) {
+          LOG.info("Connecting to ZooKeeper with SASL/Kerberos"
+              + "and using 'sasl' ACLs");
+          String principal = setJaasConfiguration(conf);
+          System.setProperty(ZooKeeperSaslClient.LOGIN_CONTEXT_NAME_KEY,
+              JAAS_LOGIN_ENTRY_NAME);
+          System.setProperty("zookeeper.authProvider.1",
+              "org.apache.zookeeper.server.auth.SASLAuthenticationProvider");
+          aclProvider = new SASLOwnerACLProvider(principal);
+        } else { // "none"
+          LOG.info("Connecting to ZooKeeper without authentication");
+          aclProvider = new DefaultACLProvider(); // open to everyone
+        }
+        int sessionT =
+            conf.getInt(ZK_DTSM_ZK_SESSION_TIMEOUT,
+                ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT);
+        int numRetries =
+            conf.getInt(ZK_DTSM_ZK_NUM_RETRIES, ZK_DTSM_ZK_NUM_RETRIES_DEFAULT);
+        builder =
+            CuratorFrameworkFactory
+                .builder()
+                .aclProvider(aclProvider)
+                .namespace(
+                    conf.get(ZK_DTSM_ZNODE_WORKING_PATH,
+                        ZK_DTSM_ZNODE_WORKING_PATH_DEAFULT)
+                        + "/"
+                        + ZK_DTSM_NAMESPACE
+                )
+                .sessionTimeoutMs(sessionT)
+                .connectionTimeoutMs(
+                    conf.getInt(ZK_DTSM_ZK_CONNECTION_TIMEOUT,
+                        ZK_DTSM_ZK_CONNECTION_TIMEOUT_DEFAULT)
+                )
+                .retryPolicy(
+                    new RetryNTimes(numRetries, sessionT / numRetries));
+      } catch (Exception ex) {
+        throw new RuntimeException("Could not Load ZK acls or auth");
+      }
+      zkClient = builder.ensembleProvider(new FixedEnsembleProvider(connString))
+          .build();
+      isExternalClient = false;
+    }
+    listenerThreadPool = Executors.newFixedThreadPool(2);
+  }
+
+  private String setJaasConfiguration(Configuration config) throws Exception {
+    String keytabFile =
+        config.get(ZK_DTSM_ZK_KERBEROS_KEYTAB, "").trim();
+    if (keytabFile == null || keytabFile.length() == 0) {
+      throw new IllegalArgumentException(ZK_DTSM_ZK_KERBEROS_KEYTAB
+          + " must be specified");
+    }
+    String principal =
+        config.get(ZK_DTSM_ZK_KERBEROS_PRINCIPAL, "").trim();
+    if (principal == null || principal.length() == 0) {
+      throw new IllegalArgumentException(ZK_DTSM_ZK_KERBEROS_PRINCIPAL
+          + " must be specified");
+    }
+
+    JaasConfiguration jConf =
+        new JaasConfiguration(JAAS_LOGIN_ENTRY_NAME, principal, keytabFile);
+    javax.security.auth.login.Configuration.setConfiguration(jConf);
+    return principal.split("[/@]")[0];
+  }
+
+  /**
+   * Creates a programmatic version of a jaas.conf file. This can be used
+   * instead of writing a jaas.conf file and setting the system property,
+   * "java.security.auth.login.config", to point to that file. It is meant to be
+   * used for connecting to ZooKeeper.
+   */
+  @InterfaceAudience.Private
+  public static class JaasConfiguration extends
+      javax.security.auth.login.Configuration {
+
+    private static AppConfigurationEntry[] entry;
+    private String entryName;
+
+    /**
+     * Add an entry to the jaas configuration with the passed in name,
+     * principal, and keytab. The other necessary options will be set for you.
+     *
+     * @param entryName
+     *          The name of the entry (e.g. "Client")
+     * @param principal
+     *          The principal of the user
+     * @param keytab
+     *          The location of the keytab
+     */
+    public JaasConfiguration(String entryName, String principal, String keytab) {
+      this.entryName = entryName;
+      Map<String, String> options = new HashMap<String, String>();
+      options.put("keyTab", keytab);
+      options.put("principal", principal);
+      options.put("useKeyTab", "true");
+      options.put("storeKey", "true");
+      options.put("useTicketCache", "false");
+      options.put("refreshKrb5Config", "true");
+      String jaasEnvVar = System.getenv("HADOOP_JAAS_DEBUG");
+      if (jaasEnvVar != null && "true".equalsIgnoreCase(jaasEnvVar)) {
+        options.put("debug", "true");
+      }
+      entry = new AppConfigurationEntry[] {
+          new AppConfigurationEntry(getKrb5LoginModuleName(),
+              AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
+              options) };
+    }
+
+    @Override
+    public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
+      return (entryName.equals(name)) ? entry : null;
+    }
+
+    private String getKrb5LoginModuleName() {
+      String krb5LoginModuleName;
+      if (System.getProperty("java.vendor").contains("IBM")) {
+        krb5LoginModuleName = "com.ibm.security.auth.module.Krb5LoginModule";
+      } else {
+        krb5LoginModuleName = "com.sun.security.auth.module.Krb5LoginModule";
+      }
+      return krb5LoginModuleName;
+    }
+  }
+
+  @Override
+  public void startThreads() throws IOException {
+    if (!isExternalClient) {
+      try {
+        zkClient.start();
+      } catch (Exception e) {
+        throw new IOException("Could not start Curator Framework", e);
+      }
+    }
+    try {
+      seqCounter = new SharedCount(zkClient, ZK_DTSM_SEQNUM_ROOT, 0);
+      if (seqCounter != null) {
+        seqCounter.start();
+      }
+    } catch (Exception e) {
+      throw new IOException("Could not start Sequence Counter", e);
+    }
+    try {
+      createPersistentNode(ZK_DTSM_MASTER_KEY_ROOT);
+      createPersistentNode(ZK_DTSM_TOKENS_ROOT);
+    } catch (Exception e) {
+      throw new RuntimeException("Could not create ZK paths");
+    }
+    try {
+      keyCache = new PathChildrenCache(zkClient, ZK_DTSM_MASTER_KEY_ROOT, true);
+      if (keyCache != null) {
+        keyCache.start(StartMode.POST_INITIALIZED_EVENT);
+        keyCache.getListenable().addListener(new PathChildrenCacheListener() {
+          @Override
+          public void childEvent(CuratorFramework client,
+              PathChildrenCacheEvent event)
+              throws Exception {
+            switch (event.getType()) {
+            case CHILD_ADDED:
+              processKeyAddOrUpdate(event.getData().getData());
+              break;
+            case CHILD_UPDATED:
+              processKeyAddOrUpdate(event.getData().getData());
+              break;
+            case CHILD_REMOVED:
+              processKeyRemoved(event.getData().getPath());
+              break;
+            default:
+              break;
+            }
+          }
+        }, listenerThreadPool);
+      }
+    } catch (Exception e) {
+      throw new IOException("Could not start PathChildrenCache for keys", e);
+    }
+    try {
+      tokenCache = new PathChildrenCache(zkClient, ZK_DTSM_TOKENS_ROOT, true);
+      if (tokenCache != null) {
+        tokenCache.start(StartMode.POST_INITIALIZED_EVENT);
+        tokenCache.getListenable().addListener(new PathChildrenCacheListener() {
+
+          @Override
+          public void childEvent(CuratorFramework client,
+              PathChildrenCacheEvent event) throws Exception {
+            switch (event.getType()) {
+            case CHILD_ADDED:
+              processTokenAddOrUpdate(event.getData().getData());
+              break;
+            case CHILD_UPDATED:
+              processTokenAddOrUpdate(event.getData().getData());
+              break;
+            case CHILD_REMOVED:
+              processTokenRemoved(event.getData().getData());
+              break;
+            default:
+              break;
+            }
+          }
+        }, listenerThreadPool);
+      }
+    } catch (Exception e) {
+      throw new IOException("Could not start PathChildrenCache for tokens", e);
+    }
+    super.startThreads();
+  }
+
+  private void processKeyAddOrUpdate(byte[] data) throws IOException {
+    ByteArrayInputStream bin = new ByteArrayInputStream(data);
+    DataInputStream din = new DataInputStream(bin);
+    DelegationKey key = new DelegationKey();
+    key.readFields(din);
+    allKeys.put(key.getKeyId(), key);
+  }
+
+  private void processKeyRemoved(String path) {
+    int i = path.lastIndexOf('/');
+    if (i > 0) {
+      String tokSeg = path.substring(i + 1);
+      int j = tokSeg.indexOf('_');
+      if (j > 0) {
+        int keyId = Integer.parseInt(tokSeg.substring(j + 1));
+        allKeys.remove(keyId);
+      }
+    }
+  }
+
+  private void processTokenAddOrUpdate(byte[] data) throws IOException {
+    ByteArrayInputStream bin = new ByteArrayInputStream(data);
+    DataInputStream din = new DataInputStream(bin);
+    TokenIdent ident = createIdentifier();
+    ident.readFields(din);
+    long renewDate = din.readLong();
+    int pwdLen = din.readInt();
+    byte[] password = new byte[pwdLen];
+    int numRead = din.read(password, 0, pwdLen);
+    if (numRead > -1) {
+      DelegationTokenInformation tokenInfo =
+          new DelegationTokenInformation(renewDate, password);
+      currentTokens.put(ident, tokenInfo);
+    }
+  }
+
+  private void processTokenRemoved(byte[] data) throws IOException {
+    ByteArrayInputStream bin = new ByteArrayInputStream(data);
+    DataInputStream din = new DataInputStream(bin);
+    TokenIdent ident = createIdentifier();
+    ident.readFields(din);
+    currentTokens.remove(ident);
+  }
+
+  @Override
+  public void stopThreads() {
+    try {
+      if (!isExternalClient && (zkClient != null)) {
+        zkClient.close();
+      }
+      if (seqCounter != null) {
+        seqCounter.close();
+      }
+      if (keyCache != null) {
+        keyCache.close();
+      }
+      if (tokenCache != null) {
+        tokenCache.close();
+      }
+    } catch (Exception e) {
+      LOG.error("Could not stop Curator Framework", e);
+      // Ignore
+    }
+    super.stopThreads();
+  }
+
+  private void createPersistentNode(String nodePath) throws Exception {
+    try {
+      zkClient.create().withMode(CreateMode.PERSISTENT).forPath(nodePath);
+    } catch (KeeperException.NodeExistsException ne) {
+      LOG.debug(nodePath + " znode already exists !!");
+    } catch (Exception e) {
+      throw new IOException(nodePath + " znode could not be created !!", e);
+    }
+  }
+
+  @Override
+  protected int getDelegationTokenSeqNum() {
+    return seqCounter.getCount();
+  }
+
+  @Override
+  protected int incrementDelegationTokenSeqNum() {
+    try {
+      while (!seqCounter.trySetCount(seqCounter.getCount() + 1)) {
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Could not increment shared counter !!", e);
+    }
+    return seqCounter.getCount();
+  }
+
+  @Override
+  protected void setDelegationTokenSeqNum(int seqNum) {
+    delegationTokenSequenceNumber = seqNum;
+  }
+
+  @Override
+  protected DelegationKey getDelegationKey(int keyId) {
+    // First check if its I already have this key
+    DelegationKey key = allKeys.get(keyId);
+    // Then query ZK
+    if (key == null) {
+      try {
+        key = getKeyFromZK(keyId);
+        if (key != null) {
+          allKeys.put(keyId, key);
+        }
+      } catch (IOException e) {
+        LOG.error("Error retrieving key [" + keyId + "] from ZK", e);
+      }
+    }
+    return key;
+  }
+
+  private DelegationKey getKeyFromZK(int keyId) throws IOException {
+    String nodePath =
+        getNodePath(ZK_DTSM_MASTER_KEY_ROOT, DELEGATION_KEY_PREFIX + keyId);
+    try {
+      byte[] data = zkClient.getData().forPath(nodePath);
+      if ((data == null) || (data.length == 0)) {
+        return null;
+      }
+      ByteArrayInputStream bin = new ByteArrayInputStream(data);
+      DataInputStream din = new DataInputStream(bin);
+      DelegationKey key = new DelegationKey();
+      key.readFields(din);
+      return key;
+    } catch (KeeperException.NoNodeException e) {
+      LOG.error("No node in path [" + nodePath + "]");
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
+    return null;
+  }
+
+  @Override
+  protected DelegationTokenInformation getTokenInfo(TokenIdent ident) {
+    // First check if I have this..
+    DelegationTokenInformation tokenInfo = currentTokens.get(ident);
+    // Then query ZK
+    if (tokenInfo == null) {
+      try {
+        tokenInfo = getTokenInfoFromZK(ident);
+        if (tokenInfo != null) {
+          currentTokens.put(ident, tokenInfo);
+        }
+      } catch (IOException e) {
+        LOG.error("Error retrieving tokenInfo [" + ident.getSequenceNumber()
+            + "] from ZK", e);
+      }
+    }
+    return tokenInfo;
+  }
+
+  private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident)
+      throws IOException {
+    String nodePath =
+        getNodePath(ZK_DTSM_TOKENS_ROOT,
+            DELEGATION_TOKEN_PREFIX + ident.getSequenceNumber());
+    try {
+      byte[] data = zkClient.getData().forPath(nodePath);
+      if ((data == null) || (data.length == 0)) {
+        return null;
+      }
+      ByteArrayInputStream bin = new ByteArrayInputStream(data);
+      DataInputStream din = new DataInputStream(bin);
+      createIdentifier().readFields(din);
+      long renewDate = din.readLong();
+      int pwdLen = din.readInt();
+      byte[] password = new byte[pwdLen];
+      int numRead = din.read(password, 0, pwdLen);
+      if (numRead > -1) {
+        DelegationTokenInformation tokenInfo =
+            new DelegationTokenInformation(renewDate, password);
+        return tokenInfo;
+      }
+    } catch (KeeperException.NoNodeException e) {
+      LOG.error("No node in path [" + nodePath + "]");
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    }
+    return null;
+  }
+
+  @Override
+  protected void storeDelegationKey(DelegationKey key) throws IOException {
+    allKeys.put(key.getKeyId(), key);
+    addOrUpdateDelegationKey(key, false);
+  }
+
+  @Override
+  protected void updateDelegationKey(DelegationKey key) throws IOException {
+    allKeys.put(key.getKeyId(), key);
+    addOrUpdateDelegationKey(key, true);
+  }
+
+  private void addOrUpdateDelegationKey(DelegationKey key, boolean isUpdate)
+      throws IOException {
+    String nodeCreatePath =
+        getNodePath(ZK_DTSM_MASTER_KEY_ROOT,
+            DELEGATION_KEY_PREFIX + key.getKeyId());
+    ByteArrayOutputStream os = new ByteArrayOutputStream();
+    DataOutputStream fsOut = new DataOutputStream(os);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Storing ZKDTSMDelegationKey_" + key.getKeyId());
+    }
+    key.write(fsOut);
+    try {
+      if (zkClient.checkExists().forPath(nodeCreatePath) != null) {
+        zkClient.setData().forPath(nodeCreatePath, os.toByteArray())
+            .setVersion(-1);
+        if (!isUpdate) {
+          LOG.debug("Key with path [" + nodeCreatePath
+              + "] already exists.. Updating !!");
+        }
+      } else {
+        zkClient.create().withMode(CreateMode.PERSISTENT)
+            .forPath(nodeCreatePath, os.toByteArray());
+        if (isUpdate) {
+          LOG.debug("Updating non existent Key path [" + nodeCreatePath
+              + "].. Adding new !!");
+        }
+      }
+    } catch (KeeperException.NodeExistsException ne) {
+      LOG.debug(nodeCreatePath + " znode already exists !!");
+    } catch (Exception ex) {
+      throw new IOException(ex);
+    } finally {
+      os.close();
+    }
+  }
+
+  @Override
+  protected void removeStoredMasterKey(DelegationKey key) {
+    String nodeRemovePath =
+        getNodePath(ZK_DTSM_MASTER_KEY_ROOT,
+            DELEGATION_KEY_PREFIX + key.getKeyId());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removing ZKDTSMDelegationKey_" + key.getKeyId());
+    }
+    try {
+      if (zkClient.checkExists().forPath(nodeRemovePath) != null) {
+        zkClient.delete().forPath(nodeRemovePath);
+      } else {
+        LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
+      }
+    } catch (Exception e) {
+      LOG.debug(nodeRemovePath + " znode could not be removed!!");
+    }
+  }
+
+  @Override
+  protected void storeToken(TokenIdent ident,
+      DelegationTokenInformation tokenInfo) throws IOException {
+    currentTokens.put(ident, tokenInfo);
+    try {
+      addOrUpdateToken(ident, tokenInfo, false);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  protected void updateToken(TokenIdent ident,
+      DelegationTokenInformation tokenInfo) throws IOException {
+    currentTokens.put(ident, tokenInfo);
+    String nodeRemovePath =
+        getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX
+            + ident.getSequenceNumber());
+    try {
+      if (zkClient.checkExists().forPath(nodeRemovePath) != null) {
+        addOrUpdateToken(ident, tokenInfo, false);
+        LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
+      } else {
+        addOrUpdateToken(ident, tokenInfo, true);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Could not update Stored Token ZKDTSMDelegationToken_"
+          + ident.getSequenceNumber(), e);
+    }
+  }
+
+  @Override
+  protected void removeStoredToken(TokenIdent ident)
+      throws IOException {
+    String nodeRemovePath =
+        getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX
+            + ident.getSequenceNumber());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removing ZKDTSMDelegationToken_"
+          + ident.getSequenceNumber());
+    }
+    try {
+      if (zkClient.checkExists().forPath(nodeRemovePath) != null) {
+        LOG.debug("Attempted to remove a non-existing znode " + nodeRemovePath);
+      } else {
+        zkClient.delete().forPath(nodeRemovePath);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(
+          "Could not remove Stored Token ZKDTSMDelegationToken_"
+          + ident.getSequenceNumber(), e);
+    }
+  }
+
+  private void addOrUpdateToken(TokenIdent ident,
+      DelegationTokenInformation info, boolean isUpdate) throws Exception {
+    String nodeCreatePath =
+        getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX
+            + ident.getSequenceNumber());
+    ByteArrayOutputStream tokenOs = new ByteArrayOutputStream();
+    DataOutputStream tokenOut = new DataOutputStream(tokenOs);
+    ByteArrayOutputStream seqOs = new ByteArrayOutputStream();
+
+    try {
+      ident.write(tokenOut);
+      tokenOut.writeLong(info.getRenewDate());
+      tokenOut.writeInt(info.getPassword().length);
+      tokenOut.write(info.getPassword());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug((isUpdate ? "Storing " : "Updating ")
+            + "ZKDTSMDelegationToken_" +
+            ident.getSequenceNumber());
+      }
+      if (isUpdate) {
+        zkClient.setData().forPath(nodeCreatePath, tokenOs.toByteArray())
+            .setVersion(-1);
+      } else {
+        zkClient.create().withMode(CreateMode.PERSISTENT)
+            .forPath(nodeCreatePath, tokenOs.toByteArray());
+      }
+    } finally {
+      seqOs.close();
+    }
+  }
+
+  /**
+   * Simple implementation of an {@link ACLProvider} that simply returns an ACL
+   * that gives all permissions only to a single principal.
+   */
+  private static class SASLOwnerACLProvider implements ACLProvider {
+
+    private final List<ACL> saslACL;
+
+    private SASLOwnerACLProvider(String principal) {
+      this.saslACL = Collections.singletonList(
+          new ACL(Perms.ALL, new Id("sasl", principal)));
+    }
+
+    @Override
+    public List<ACL> getDefaultAcl() {
+      return saslACL;
+    }
+
+    @Override
+    public List<ACL> getAclForPath(String path) {
+      return saslACL;
+    }
+  }
+
+  @VisibleForTesting
+  @Private
+  @Unstable
+  static String getNodePath(String root, String nodeName) {
+    return (root + "/" + nodeName);
+  }
+}

+ 10 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationFilter.java

@@ -18,6 +18,7 @@
 package org.apache.hadoop.security.token.delegation.web;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -28,9 +29,11 @@ import org.apache.hadoop.security.authentication.server.AuthenticationHandler;
 import org.apache.hadoop.security.authentication.server.AuthenticationToken;
 import org.apache.hadoop.security.authentication.server.KerberosAuthenticationHandler;
 import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
+import org.apache.hadoop.security.authentication.util.ZKSignerSecretProvider;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
 import org.apache.hadoop.util.HttpExceptionUtils;
 import org.apache.http.NameValuePair;
 import org.apache.http.client.utils.URLEncodedUtils;
@@ -153,7 +156,14 @@ public class DelegationTokenAuthenticationFilter
 
   @Override
   public void init(FilterConfig filterConfig) throws ServletException {
+    // A single CuratorFramework should be used for a ZK cluster.
+    // If the ZKSignerSecretProvider has already created it, it has to
+    // be set here... to be used by the ZKDelegationTokenSecretManager
+    ZKDelegationTokenSecretManager.setCurator((CuratorFramework)
+        filterConfig.getServletContext().getAttribute(ZKSignerSecretProvider.
+            ZOOKEEPER_SIGNER_SECRET_PROVIDER_CURATOR_CLIENT_ATTRIBUTE));
     super.init(filterConfig);
+    ZKDelegationTokenSecretManager.setCurator(null);
     AuthenticationHandler handler = getAuthenticationHandler();
     AbstractDelegationTokenSecretManager dtSecretManager =
         (AbstractDelegationTokenSecretManager) filterConfig.getServletContext().

+ 1 - 25
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationHandler.java

@@ -78,19 +78,6 @@ public abstract class DelegationTokenAuthenticationHandler
 
   public static final String TOKEN_KIND = PREFIX + "token-kind";
 
-  public static final String UPDATE_INTERVAL = PREFIX + "update-interval.sec";
-  public static final long UPDATE_INTERVAL_DEFAULT = 24 * 60 * 60;
-
-  public static final String MAX_LIFETIME = PREFIX + "max-lifetime.sec";
-  public static final long MAX_LIFETIME_DEFAULT = 7 * 24 * 60 * 60;
-
-  public static final String RENEW_INTERVAL = PREFIX + "renew-interval.sec";
-  public static final long RENEW_INTERVAL_DEFAULT = 24 * 60 * 60;
-
-  public static final String REMOVAL_SCAN_INTERVAL = PREFIX +
-      "removal-scan-interval.sec";
-  public static final long REMOVAL_SCAN_INTERVAL_DEFAULT = 60 * 60;
-
   private static final Set<String> DELEGATION_TOKEN_OPS = new HashSet<String>();
 
   static final String DELEGATION_TOKEN_UGI_ATTRIBUTE =
@@ -142,7 +129,6 @@ public abstract class DelegationTokenAuthenticationHandler
   @VisibleForTesting
   @SuppressWarnings("unchecked")
   public void initTokenManager(Properties config) {
-    String configPrefix = authHandler.getType() + ".";
     Configuration conf = new Configuration(false);
     for (Map.Entry entry : config.entrySet()) {
       conf.set((String) entry.getKey(), (String) entry.getValue());
@@ -153,17 +139,7 @@ public abstract class DelegationTokenAuthenticationHandler
           "The configuration does not define the token kind");
     }
     tokenKind = tokenKind.trim();
-    long updateInterval = conf.getLong(configPrefix + UPDATE_INTERVAL,
-        UPDATE_INTERVAL_DEFAULT);
-    long maxLifeTime = conf.getLong(configPrefix + MAX_LIFETIME,
-        MAX_LIFETIME_DEFAULT);
-    long renewInterval = conf.getLong(configPrefix + RENEW_INTERVAL,
-        RENEW_INTERVAL_DEFAULT);
-    long removalScanInterval = conf.getLong(
-        configPrefix + REMOVAL_SCAN_INTERVAL, REMOVAL_SCAN_INTERVAL_DEFAULT);
-    tokenManager = new DelegationTokenManager(new Text(tokenKind),
-        updateInterval * 1000, maxLifeTime * 1000, renewInterval * 1000,
-        removalScanInterval * 1000);
+    tokenManager = new DelegationTokenManager(conf, new Text(tokenKind));
     tokenManager.init();
   }
 

+ 57 - 19
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenManager.java

@@ -17,16 +17,20 @@
  */
 package org.apache.hadoop.security.token.delegation.web;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
+import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * Delegation Token Manager used by the
@@ -35,20 +39,36 @@ import java.io.IOException;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-class DelegationTokenManager {
+public class DelegationTokenManager {
+
+  public static final String ENABLE_ZK_KEY = "zk-dt-secret-manager.enable";
+
+  public static final String PREFIX = "delegation-token.";
+
+  public static final String UPDATE_INTERVAL = PREFIX + "update-interval.sec";
+  public static final long UPDATE_INTERVAL_DEFAULT = 24 * 60 * 60;
+
+  public static final String MAX_LIFETIME = PREFIX + "max-lifetime.sec";
+  public static final long MAX_LIFETIME_DEFAULT = 7 * 24 * 60 * 60;
+
+  public static final String RENEW_INTERVAL = PREFIX + "renew-interval.sec";
+  public static final long RENEW_INTERVAL_DEFAULT = 24 * 60 * 60;
+
+  public static final String REMOVAL_SCAN_INTERVAL = PREFIX +
+      "removal-scan-interval.sec";
+  public static final long REMOVAL_SCAN_INTERVAL_DEFAULT = 60 * 60;
 
   private static class DelegationTokenSecretManager
       extends AbstractDelegationTokenSecretManager<DelegationTokenIdentifier> {
 
     private Text tokenKind;
 
-    public DelegationTokenSecretManager(Text tokenKind,
-        long delegationKeyUpdateInterval,
-        long delegationTokenMaxLifetime,
-        long delegationTokenRenewInterval,
-        long delegationTokenRemoverScanInterval) {
-      super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
-          delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
+    public DelegationTokenSecretManager(Configuration conf, Text tokenKind) {
+      super(conf.getLong(UPDATE_INTERVAL, UPDATE_INTERVAL_DEFAULT) * 1000,
+          conf.getLong(MAX_LIFETIME, MAX_LIFETIME_DEFAULT) * 1000,
+          conf.getLong(RENEW_INTERVAL, RENEW_INTERVAL_DEFAULT) * 1000,
+          conf.getLong(REMOVAL_SCAN_INTERVAL,
+              REMOVAL_SCAN_INTERVAL_DEFAULT * 1000));
       this.tokenKind = tokenKind;
     }
 
@@ -56,21 +76,34 @@ class DelegationTokenManager {
     public DelegationTokenIdentifier createIdentifier() {
       return new DelegationTokenIdentifier(tokenKind);
     }
+  }
+
+  private static class ZKSecretManager
+      extends ZKDelegationTokenSecretManager<DelegationTokenIdentifier> {
+
+    private Text tokenKind;
+
+    public ZKSecretManager(Configuration conf, Text tokenKind) {
+      super(conf);
+      this.tokenKind = tokenKind;
+    }
 
+    @Override
+    public DelegationTokenIdentifier createIdentifier() {
+      return new DelegationTokenIdentifier(tokenKind);
+    }
   }
 
   private AbstractDelegationTokenSecretManager secretManager = null;
   private boolean managedSecretManager;
   private Text tokenKind;
 
-  public DelegationTokenManager(Text tokenKind,
-      long delegationKeyUpdateInterval,
-      long delegationTokenMaxLifetime,
-      long delegationTokenRenewInterval,
-      long delegationTokenRemoverScanInterval) {
-    this.secretManager = new DelegationTokenSecretManager(tokenKind,
-        delegationKeyUpdateInterval, delegationTokenMaxLifetime,
-        delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
+  public DelegationTokenManager(Configuration conf, Text tokenKind) {
+    if (conf.getBoolean(ENABLE_ZK_KEY, false)) {
+      this.secretManager = new ZKSecretManager(conf, tokenKind);
+    } else {
+      this.secretManager = new DelegationTokenSecretManager(conf, tokenKind);
+    }
     this.tokenKind = tokenKind;
     managedSecretManager = true;
   }
@@ -150,4 +183,9 @@ class DelegationTokenManager {
     return id.getUser();
   }
 
+  @VisibleForTesting
+  @SuppressWarnings("rawtypes")
+  public AbstractDelegationTokenSecretManager getDelegationTokenSecretManager() {
+    return secretManager;
+  }
 }

+ 2 - 2
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestDelegationToken.java

@@ -121,7 +121,7 @@ public class TestDelegationToken {
 
     @Override
     protected void storeNewToken(TestDelegationTokenIdentifier ident,
-        long renewDate) {
+        long renewDate) throws IOException {
       super.storeNewToken(ident, renewDate);
       isStoreNewTokenCalled = true;
     }
@@ -135,7 +135,7 @@ public class TestDelegationToken {
 
     @Override
     protected void updateStoredToken(TestDelegationTokenIdentifier ident,
-        long renewDate) {
+        long renewDate) throws IOException {
       super.updateStoredToken(ident, renewDate);
       isUpdateStoredTokenCalled = true;
     }

+ 68 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java

@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security.token.delegation;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestZKDelegationTokenSecretManager {
+
+  private static final long DAY_IN_SECS = 86400;
+
+  @Test
+  public void testZKDelTokSecretManager() throws Exception {
+    TestingServer zkServer = new TestingServer();
+    DelegationTokenManager tm1, tm2 = null;
+    zkServer.start();
+    try {
+      String connectString = zkServer.getConnectString();
+      Configuration conf = new Configuration();
+      conf.setBoolean(DelegationTokenManager.ENABLE_ZK_KEY, true);
+      conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_CONNECTION_STRING, connectString);
+      conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "testPath");
+      conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_AUTH_TYPE, "none");
+      conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, DAY_IN_SECS);
+      conf.setLong(DelegationTokenManager.MAX_LIFETIME, DAY_IN_SECS);
+      conf.setLong(DelegationTokenManager.RENEW_INTERVAL, DAY_IN_SECS);
+      conf.setLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, DAY_IN_SECS);
+      tm1 = new DelegationTokenManager(conf, new Text("foo"));
+      tm1.init();
+      tm2 = new DelegationTokenManager(conf, new Text("foo"));
+      tm2.init();
+
+      Token<DelegationTokenIdentifier> token =
+          tm1.createToken(UserGroupInformation.getCurrentUser(), "foo");
+      Assert.assertNotNull(token);
+      tm2.verifyToken(token);
+
+      token = tm2.createToken(UserGroupInformation.getCurrentUser(), "bar");
+      Assert.assertNotNull(token);
+      tm1.verifyToken(token);
+    } finally {
+      zkServer.close();
+    }
+  }
+}

+ 9 - 8
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/web/TestDelegationTokenManager.java

@@ -17,27 +17,28 @@
  */
 package org.apache.hadoop.security.token.delegation.web;
 
+import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.StringUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.Arrays;
-
 public class TestDelegationTokenManager {
 
   private static final long DAY_IN_SECS = 86400;
 
   @Test
   public void testDTManager() throws Exception {
-    DelegationTokenManager tm = new DelegationTokenManager(new Text("foo"),
-        DAY_IN_SECS, DAY_IN_SECS, DAY_IN_SECS, DAY_IN_SECS);
+    Configuration conf = new Configuration(false);
+    conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, DAY_IN_SECS);
+    conf.setLong(DelegationTokenManager.MAX_LIFETIME, DAY_IN_SECS);
+    conf.setLong(DelegationTokenManager.RENEW_INTERVAL, DAY_IN_SECS);
+    conf.setLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, DAY_IN_SECS);
+    DelegationTokenManager tm =
+        new DelegationTokenManager(conf, new Text("foo"));
     tm.init();
     Token<DelegationTokenIdentifier> token =
         tm.createToken(UserGroupInformation.getCurrentUser(), "foo");

+ 10 - 0
hadoop-project/pom.xml

@@ -864,6 +864,16 @@
        <version>2.9.1</version>
      </dependency>
 
+     <dependency>
+       <groupId>org.apache.curator</groupId>
+       <artifactId>curator-recipes</artifactId>
+       <version>2.6.0</version>
+     </dependency>
+     <dependency>
+       <groupId>org.apache.curator</groupId>
+       <artifactId>curator-client</artifactId>
+       <version>2.6.0</version>
+     </dependency>
      <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-framework</artifactId>