Browse Source

HADOOP-14445. Delegation tokens are not shared between KMS instances. Contributed by Xiao Chen and Rushabh S Shah.

Xiao Chen 7 years ago
parent
commit
714a079ffb
18 changed files with 1176 additions and 201 deletions
  1. 118 96
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
  2. 21 1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSDelegationToken.java
  3. 56 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSLegacyTokenRenewer.java
  4. 103 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSTokenRenewer.java
  5. 18 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/package-info.java
  6. 10 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
  7. 16 5
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticatedURL.java
  8. 5 3
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationHandler.java
  9. 1 1
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticator.java
  10. 43 2
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java
  11. 49 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtilFaultInjector.java
  12. 1 0
      hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
  13. 2 1
      hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
  14. 20 0
      hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
  15. 166 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestKMSClientProvider.java
  16. 51 16
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java
  17. 65 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestKMSUtil.java
  18. 431 76
      hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java

+ 118 - 96
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java

@@ -36,11 +36,11 @@ import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenRenewer;
+import org.apache.hadoop.security.token.TokenSelector;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
 import org.apache.hadoop.util.HttpExceptionUtils;
-import org.apache.hadoop.util.KMSUtil;
 import org.apache.http.client.utils.URIBuilder;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.slf4j.Logger;
@@ -82,6 +82,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.KMS_CLIENT_COPY_LEGACY_TOKEN_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.KMS_CLIENT_COPY_LEGACY_TOKEN_DEFAULT;
+
 /**
  * KMS client <code>KeyProvider</code> implementation.
  */
@@ -89,16 +92,13 @@ import com.google.common.base.Strings;
 public class KMSClientProvider extends KeyProvider implements CryptoExtension,
     KeyProviderDelegationTokenExtension.DelegationTokenExtension {
 
-  private static final Logger LOG =
+  public static final Logger LOG =
       LoggerFactory.getLogger(KMSClientProvider.class);
 
   private static final String INVALID_SIGNATURE = "Invalid signature";
 
   private static final String ANONYMOUS_REQUESTS_DISALLOWED = "Anonymous requests are disallowed";
 
-  public static final String TOKEN_KIND_STR = KMSDelegationToken.TOKEN_KIND_STR;
-  public static final Text TOKEN_KIND = KMSDelegationToken.TOKEN_KIND;
-
   public static final String SCHEME_NAME = "kms";
 
   private static final String UTF8 = "UTF-8";
@@ -123,12 +123,17 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
 
   private final ValueQueue<EncryptedKeyVersion> encKeyVersionQueue;
 
+  /* dtService defines the token service value for the kms token.
+   * The value can be legacy format which is ip:port format or it can be uri.
+   * If it's uri format, then the value is read from
+   * CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH at key
+   * provider creation time, and set to token's Service field.
+   * When a token is renewed / canceled, its Service field will be used to
+   * instantiate a KeyProvider, eliminating the need to read configs
+    * at that time.
+   */
   private final Text dtService;
-
-  // Allow fallback to default kms server port 9600 for certain tests that do
-  // not specify the port explicitly in the kms provider url.
-  @VisibleForTesting
-  public static volatile boolean fallbackDefaultPortForTesting = false;
+  private final boolean copyLegacyToken;
 
   private class EncryptedQueueRefiller implements
     ValueQueue.QueueRefiller<EncryptedKeyVersion> {
@@ -152,66 +157,6 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
     }
   }
 
-  /**
-   * The KMS implementation of {@link TokenRenewer}.
-   */
-  public static class KMSTokenRenewer extends TokenRenewer {
-    private static final Logger LOG =
-        LoggerFactory.getLogger(KMSTokenRenewer.class);
-
-    @Override
-    public boolean handleKind(Text kind) {
-      return kind.equals(TOKEN_KIND);
-    }
-
-    @Override
-    public boolean isManaged(Token<?> token) throws IOException {
-      return true;
-    }
-
-    @Override
-    public long renew(Token<?> token, Configuration conf) throws IOException {
-      LOG.debug("Renewing delegation token {}", token);
-      KeyProvider keyProvider = KMSUtil.createKeyProvider(conf,
-          KeyProviderFactory.KEY_PROVIDER_PATH);
-      try {
-        if (!(keyProvider instanceof
-            KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
-          LOG.warn("keyProvider {} cannot renew dt.", keyProvider == null ?
-              "null" : keyProvider.getClass());
-          return 0;
-        }
-        return ((KeyProviderDelegationTokenExtension.DelegationTokenExtension)
-            keyProvider).renewDelegationToken(token);
-      } finally {
-        if (keyProvider != null) {
-          keyProvider.close();
-        }
-      }
-    }
-
-    @Override
-    public void cancel(Token<?> token, Configuration conf) throws IOException {
-      LOG.debug("Canceling delegation token {}", token);
-      KeyProvider keyProvider = KMSUtil.createKeyProvider(conf,
-          KeyProviderFactory.KEY_PROVIDER_PATH);
-      try {
-        if (!(keyProvider instanceof
-            KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
-          LOG.warn("keyProvider {} cannot cancel dt.", keyProvider == null ?
-              "null" : keyProvider.getClass());
-          return;
-        }
-        ((KeyProviderDelegationTokenExtension.DelegationTokenExtension)
-            keyProvider).cancelDelegationToken(token);
-      } finally {
-        if (keyProvider != null) {
-          keyProvider.close();
-        }
-      }
-    }
-  }
-
   public static class KMSEncryptedKeyVersion extends EncryptedKeyVersion {
     public KMSEncryptedKeyVersion(String keyName, String keyVersionName,
         byte[] iv, String encryptedVersionName, byte[] keyMaterial) {
@@ -334,13 +279,13 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
           }
           hostsPart = t[0];
         }
-        return createProvider(conf, origUrl, port, hostsPart);
+        return createProvider(conf, origUrl, port, hostsPart, providerUri);
       }
       return null;
     }
 
-    private KeyProvider createProvider(Configuration conf,
-        URL origUrl, int port, String hostsPart) throws IOException {
+    private KeyProvider createProvider(Configuration conf, URL origUrl,
+        int port, String hostsPart, URI providerUri) throws IOException {
       String[] hosts = hostsPart.split(";");
       KMSClientProvider[] providers = new KMSClientProvider[hosts.length];
       for (int i = 0; i < hosts.length; i++) {
@@ -348,7 +293,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
           providers[i] =
               new KMSClientProvider(
                   new URI("kms", origUrl.getProtocol(), hosts[i], port,
-                      origUrl.getPath(), null, null), conf);
+                      origUrl.getPath(), null, null), conf, providerUri);
         } catch (URISyntaxException e) {
           throw new IOException("Could not instantiate KMSProvider.", e);
         }
@@ -425,17 +370,10 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
     }
   }
 
-  public KMSClientProvider(URI uri, Configuration conf) throws IOException {
+  public KMSClientProvider(URI uri, Configuration conf, URI providerUri) throws
+      IOException {
     super(conf);
     kmsUrl = createServiceURL(extractKMSPath(uri));
-    int kmsPort = kmsUrl.getPort();
-    if ((kmsPort == -1) && fallbackDefaultPortForTesting) {
-      kmsPort = 9600;
-    }
-
-    InetSocketAddress addr = new InetSocketAddress(kmsUrl.getHost(), kmsPort);
-    dtService = SecurityUtil.buildTokenService(addr);
-
     if ("https".equalsIgnoreCase(kmsUrl.getProtocol())) {
       sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
       try {
@@ -448,6 +386,9 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
             CommonConfigurationKeysPublic.KMS_CLIENT_TIMEOUT_SECONDS,
             CommonConfigurationKeysPublic.KMS_CLIENT_TIMEOUT_DEFAULT);
     authRetry = conf.getInt(AUTH_RETRY, DEFAULT_AUTH_RETRY);
+    copyLegacyToken = conf.getBoolean(KMS_CLIENT_COPY_LEGACY_TOKEN_KEY,
+        KMS_CLIENT_COPY_LEGACY_TOKEN_DEFAULT);
+
     configurator = new TimeoutConnConfigurator(timeout, sslFactory);
     encKeyVersionQueue =
         new ValueQueue<KeyProviderCryptoExtension.EncryptedKeyVersion>(
@@ -472,6 +413,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
                     KMS_CLIENT_ENC_KEY_CACHE_NUM_REFILL_THREADS_DEFAULT),
             new EncryptedQueueRefiller());
     authToken = new DelegationTokenAuthenticatedURL.Token();
+    dtService = new Text(providerUri.toString());
     LOG.info("KMSClientProvider for KMS url: {} delegation token service: {}" +
         " created.", kmsUrl, dtService);
   }
@@ -545,7 +487,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
         @Override
         public HttpURLConnection run() throws Exception {
           DelegationTokenAuthenticatedURL authUrl =
-              new DelegationTokenAuthenticatedURL(configurator);
+              createKMSAuthenticatedURL();
           return authUrl.openConnection(url, authToken, doAsUser);
         }
       });
@@ -914,7 +856,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
       LOG.debug("Renewing delegation token {} with url:{}, as:{}",
           token, url, doAsUser);
       final DelegationTokenAuthenticatedURL authUrl =
-          new DelegationTokenAuthenticatedURL(configurator);
+          createKMSAuthenticatedURL();
       return getActualUgi().doAs(
           new PrivilegedExceptionAction<Long>() {
             @Override
@@ -946,7 +888,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
               LOG.debug("Cancelling delegation token {} with url:{}, as:{}",
                   dToken, url, doAsUser);
               final DelegationTokenAuthenticatedURL authUrl =
-                  new DelegationTokenAuthenticatedURL(configurator);
+                  createKMSAuthenticatedURL();
               authUrl.cancelDelegationToken(url, token, doAsUser);
               return null;
             }
@@ -998,6 +940,17 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
     return token;
   }
 
+  @VisibleForTesting
+  DelegationTokenAuthenticatedURL createKMSAuthenticatedURL() {
+    return new DelegationTokenAuthenticatedURL(configurator) {
+      @Override
+      public org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
+          getDelegationToken(URL url, Credentials creds) {
+        return selectKMSDelegationToken(creds);
+      }
+    };
+  }
+
   @Override
   public Token<?>[] addDelegationTokens(final String renewer,
       Credentials credentials) throws IOException {
@@ -1006,7 +959,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
     if (token == null) {
       final URL url = createURL(null, null, null, null);
       final DelegationTokenAuthenticatedURL authUrl =
-          new DelegationTokenAuthenticatedURL(configurator);
+          createKMSAuthenticatedURL();
       try {
         final String doAsUser = getDoAsUser();
         token = getActualUgi().doAs(new PrivilegedExceptionAction<Token<?>>() {
@@ -1020,9 +973,16 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
           }
         });
         if (token != null) {
-          LOG.debug("New token received: ({})", token);
+          if (KMSDelegationToken.TOKEN_KIND.equals(token.getKind())) {
+            // do not set service for legacy kind, for compatibility.
+            token.setService(dtService);
+          }
+          LOG.info("New token created: ({})", token);
           credentials.addToken(token.getService(), token);
-          tokens = new Token<?>[] { token };
+          Token<?> legacyToken = createAndAddLegacyToken(credentials, token);
+          tokens = legacyToken == null ?
+              new Token<?>[] {token} :
+              new Token<?>[] {token, legacyToken};
         } else {
           throw new IOException("Got NULL as delegation token");
         }
@@ -1039,13 +999,75 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
     return tokens;
   }
 
-  private boolean containsKmsDt(UserGroupInformation ugi) throws IOException {
-    // Add existing credentials from the UGI, since provider is cached.
-    Credentials creds = ugi.getCredentials();
+  /**
+   * If {@link CommonConfigurationKeysPublic#KMS_CLIENT_COPY_LEGACY_TOKEN_KEY}
+   * is true when creating the provider, then copy the passed-in token of
+   * {@link KMSDelegationToken#TOKEN_KIND} and create a new token of
+   * {@link KMSDelegationToken#TOKEN_LEGACY_KIND}, and add it to credentials.
+   *
+   * @return The legacy token, or null if one should not be created.
+   */
+  private Token<?> createAndAddLegacyToken(Credentials credentials,
+      Token<?> token) {
+    if (!copyLegacyToken || !KMSDelegationToken.TOKEN_KIND
+        .equals(token.getKind())) {
+      LOG.debug("Not creating legacy token because copyLegacyToken={}, "
+          + "token={}", copyLegacyToken, token);
+      return null;
+    }
+    // copy a KMS_DELEGATION_TOKEN and create a new kms-dt with the same
+    // underlying token for backwards-compatibility. Old clients/renewers
+    // does not parse the new token and can only work with kms-dt.
+    final Token<?> legacyToken = new Token(token);
+    legacyToken.setKind(KMSDelegationToken.TOKEN_LEGACY_KIND);
+    final InetSocketAddress addr =
+        new InetSocketAddress(kmsUrl.getHost(), kmsUrl.getPort());
+    final Text fallBackServiceText = SecurityUtil.buildTokenService(addr);
+    legacyToken.setService(fallBackServiceText);
+    LOG.info("Copied token to legacy kind: {}", legacyToken);
+    credentials.addToken(legacyToken.getService(), legacyToken);
+    return legacyToken;
+  }
+
+  @VisibleForTesting
+  public Text getDelegationTokenService() {
+    return dtService;
+  }
+
+  /**
+   * Given a list of tokens, return the token that should be used for KMS
+   * authentication.
+   */
+  @VisibleForTesting
+  Token selectKMSDelegationToken(Credentials creds) {
+    // always look for TOKEN_KIND first
+    final TokenSelector<AbstractDelegationTokenIdentifier> tokenSelector =
+        new AbstractDelegationTokenSelector<AbstractDelegationTokenIdentifier>(
+            KMSDelegationToken.TOKEN_KIND) {
+        };
+    Token token = tokenSelector.selectToken(dtService, creds.getAllTokens());
+    LOG.debug("Searching service {} found token {}", dtService, token);
+    if (token != null) {
+      return token;
+    }
+
+    // fall back to look for token by service, regardless of kind.
+    // this is old behavior, keeping for compatibility reasons (for example,
+    // even if KMS server is new, if the job is submitted with an old kms
+    // client, job runners on new version should be able to find the token).
+    final InetSocketAddress addr =
+        new InetSocketAddress(kmsUrl.getHost(), kmsUrl.getPort());
+    final Text fallBackServiceText = SecurityUtil.buildTokenService(addr);
+    token = creds.getToken(fallBackServiceText);
+    LOG.debug("Selected delegation token {} using service:{}", token,
+        fallBackServiceText);
+    return token;
+  }
+
+  private boolean containsKmsDt(UserGroupInformation ugi) {
+    final Credentials creds = ugi.getCredentials();
     if (!creds.getAllTokens().isEmpty()) {
-      LOG.debug("Searching for token that matches service: {}", dtService);
-      org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
-          dToken = creds.getToken(dtService);
+      final Token dToken = selectKMSDelegationToken(creds);
       if (dToken != null) {
         return true;
       }

+ 21 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSDelegationToken.java

@@ -27,7 +27,10 @@ import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier
 @InterfaceAudience.Private
 public final class KMSDelegationToken {
 
-  public static final String TOKEN_KIND_STR = "kms-dt";
+  public static final String TOKEN_LEGACY_KIND_STR = "kms-dt";
+  public static final Text TOKEN_LEGACY_KIND = new Text(TOKEN_LEGACY_KIND_STR);
+
+  public static final String TOKEN_KIND_STR = "KMS_DELEGATION_TOKEN";
   public static final Text TOKEN_KIND = new Text(TOKEN_KIND_STR);
 
   // Utility class is not supposed to be instantiated.
@@ -49,4 +52,21 @@ public final class KMSDelegationToken {
       return TOKEN_KIND;
     }
   }
+
+  /**
+   * DelegationTokenIdentifier used for the KMS for legacy tokens.
+   */
+  @Deprecated
+  public static class KMSLegacyDelegationTokenIdentifier
+      extends DelegationTokenIdentifier {
+
+    public KMSLegacyDelegationTokenIdentifier() {
+      super(TOKEN_LEGACY_KIND);
+    }
+
+    @Override
+    public Text getKind() {
+      return TOKEN_LEGACY_KIND;
+    }
+  }
 }

+ 56 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSLegacyTokenRenewer.java

@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto.key.kms;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.KMSUtil;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_LEGACY_KIND;
+
+/**
+ * The {@link KMSTokenRenewer} that supports legacy tokens.
+ */
+@InterfaceAudience.Private
+@Deprecated
+public class KMSLegacyTokenRenewer extends KMSTokenRenewer {
+
+  @Override
+  public boolean handleKind(Text kind) {
+    return kind.equals(TOKEN_LEGACY_KIND);
+  }
+
+  /**
+   * Create a key provider for token renewal / cancellation.
+   * Caller is responsible for closing the key provider.
+   */
+  @Override
+  protected KeyProvider createKeyProvider(Token<?> token,
+      Configuration conf) throws IOException {
+    assert token.getKind().equals(TOKEN_LEGACY_KIND);
+    // Legacy tokens get service from configuration.
+    return KMSUtil.createKeyProvider(conf,
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
+  }
+}

+ 103 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSTokenRenewer.java

@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto.key.kms;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenRenewer;
+import org.apache.hadoop.util.KMSUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_KIND;
+
+/**
+ * The KMS implementation of {@link TokenRenewer}.
+ */
+@InterfaceAudience.Private
+public class KMSTokenRenewer extends TokenRenewer {
+
+  public static final Logger LOG = LoggerFactory
+      .getLogger(org.apache.hadoop.crypto.key.kms.KMSTokenRenewer.class);
+
+  @Override
+  public boolean handleKind(Text kind) {
+    return kind.equals(TOKEN_KIND);
+  }
+
+  @Override
+  public boolean isManaged(Token<?> token) throws IOException {
+    return true;
+  }
+
+  @Override
+  public long renew(Token<?> token, Configuration conf) throws IOException {
+    LOG.debug("Renewing delegation token {}", token);
+    final KeyProvider keyProvider = createKeyProvider(token, conf);
+    try {
+      if (!(keyProvider instanceof
+          KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
+        LOG.warn("keyProvider {} cannot renew token {}.",
+            keyProvider == null ? "null" : keyProvider.getClass(), token);
+        return 0;
+      }
+      return ((KeyProviderDelegationTokenExtension.DelegationTokenExtension)
+          keyProvider).renewDelegationToken(token);
+    } finally {
+      if (keyProvider != null) {
+        keyProvider.close();
+      }
+    }
+  }
+
+  @Override
+  public void cancel(Token<?> token, Configuration conf) throws IOException {
+    LOG.debug("Canceling delegation token {}", token);
+    final KeyProvider keyProvider = createKeyProvider(token, conf);
+    try {
+      if (!(keyProvider instanceof
+          KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
+        LOG.warn("keyProvider {} cannot cancel token {}.",
+            keyProvider == null ? "null" : keyProvider.getClass(), token);
+        return;
+      }
+      ((KeyProviderDelegationTokenExtension.DelegationTokenExtension)
+          keyProvider).cancelDelegationToken(token);
+    } finally {
+      if (keyProvider != null) {
+        keyProvider.close();
+      }
+    }
+  }
+
+  /**
+   * Create a key provider for token renewal / cancellation.
+   * Caller is responsible for closing the key provider.
+   */
+  protected KeyProvider createKeyProvider(Token<?> token,
+      Configuration conf) throws IOException {
+    return KMSUtil
+        .createKeyProviderFromTokenService(conf, token.getService().toString());
+  }
+}

+ 18 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/package-info.java

@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto.key.kms;

+ 10 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java

@@ -716,6 +716,16 @@ public class CommonConfigurationKeysPublic {
   /**  Default value is 100 ms. */
   public static final int KMS_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT  = 100;
 
+  /**
+   * @see
+   * <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
+   * core-default.xml</a>
+   */
+  public static final String KMS_CLIENT_COPY_LEGACY_TOKEN_KEY =
+      "hadoop.security.kms.client.copy.legacy.token";
+  /**  Default value is true. */
+  public static final boolean KMS_CLIENT_COPY_LEGACY_TOKEN_DEFAULT = true;
+
   /**
    * @see
    * <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">

+ 16 - 5
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticatedURL.java

@@ -300,11 +300,7 @@ public class DelegationTokenAuthenticatedURL extends AuthenticatedURL {
             creds.getAllTokens());
       }
       if (!creds.getAllTokens().isEmpty()) {
-        InetSocketAddress serviceAddr = new InetSocketAddress(url.getHost(),
-            url.getPort());
-        Text service = SecurityUtil.buildTokenService(serviceAddr);
-        dToken = creds.getToken(service);
-        LOG.debug("Using delegation token {} from service:{}", dToken, service);
+        dToken = getDelegationToken(url, creds);
         if (dToken != null) {
           if (useQueryStringForDelegationToken()) {
             // delegation token will go in the query string, injecting it
@@ -340,6 +336,21 @@ public class DelegationTokenAuthenticatedURL extends AuthenticatedURL {
     return conn;
   }
 
+  /**
+   * Select a delegation token from all tokens in credentials, based on url.
+   */
+  @InterfaceAudience.Private
+  public org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
+      getDelegationToken(URL url, Credentials creds) {
+    final InetSocketAddress serviceAddr =
+        new InetSocketAddress(url.getHost(), url.getPort());
+    final Text service = SecurityUtil.buildTokenService(serviceAddr);
+    org.apache.hadoop.security.token.Token<? extends TokenIdentifier> dToken =
+        creds.getToken(service);
+    LOG.debug("Selected delegation token {} using service:{}", dToken, service);
+    return dToken;
+  }
+
   /**
    * Requests a delegation token using the configured <code>Authenticator</code>
    * for authentication.

+ 5 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationHandler.java

@@ -79,7 +79,7 @@ import com.google.common.annotations.VisibleForTesting;
 @InterfaceStability.Evolving
 public abstract class DelegationTokenAuthenticationHandler
     implements AuthenticationHandler {
-  private static final Logger LOG =
+  public static final Logger LOG =
       LoggerFactory.getLogger(DelegationTokenAuthenticationHandler.class);
 
   protected static final String TYPE_POSTFIX = "-dt";
@@ -194,7 +194,8 @@ public abstract class DelegationTokenAuthenticationHandler
       HttpServletRequest request, HttpServletResponse response)
       throws IOException, AuthenticationException {
     boolean requestContinues = true;
-    LOG.trace("Processing operation for req=({}), token: {}", request, token);
+    LOG.trace("Processing operation for req=({}), token: {}",
+        request.getRequestURL(), token);
     String op = ServletUtils.getParameter(request,
         KerberosDelegationTokenAuthenticator.OP_PARAM);
     op = (op != null) ? StringUtils.toUpperCase(op) : null;
@@ -377,7 +378,8 @@ public abstract class DelegationTokenAuthenticationHandler
             HttpServletResponse.SC_FORBIDDEN, new AuthenticationException(ex));
       }
     } else {
-      LOG.debug("Falling back to {} (req={})", authHandler.getClass(), request);
+      LOG.debug("Falling back to {} (req={})", authHandler.getClass(),
+          request.getRequestURL());
       token = authHandler.authenticate(request, response);
     }
     return token;

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticator.java

@@ -48,7 +48,7 @@ import java.util.Map;
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public abstract class DelegationTokenAuthenticator implements Authenticator {
-  private static Logger LOG = 
+  public static final Logger LOG =
       LoggerFactory.getLogger(DelegationTokenAuthenticator.class);
   
   private static final String CONTENT_TYPE = "Content-Type";

+ 43 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java

@@ -26,14 +26,14 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 
 /**
  * Utils for KMS.
  */
 @InterfaceAudience.Private
 public final class KMSUtil {
-  public static final Logger LOG =
-      LoggerFactory.getLogger(KMSUtil.class);
+  public static final Logger LOG = LoggerFactory.getLogger(KMSUtil.class);
 
   private KMSUtil() { /* Hidden constructor */ }
 
@@ -55,6 +55,13 @@ public final class KMSUtil {
     if (providerUriStr == null || providerUriStr.isEmpty()) {
       return null;
     }
+    KeyProvider kp = KMSUtilFaultInjector.get().createKeyProviderForTests(
+        providerUriStr, conf);
+    if (kp != null) {
+      LOG.info("KeyProvider is created with uri: {}. This should happen only " +
+              "in tests.", providerUriStr);
+      return kp;
+    }
     return createKeyProviderFromUri(conf, URI.create(providerUriStr));
   }
 
@@ -71,4 +78,38 @@ public final class KMSUtil {
     }
     return keyProvider;
   }
+
+  /**
+   * Creates a key provider from token service field, which must be URI format.
+   *
+   * @param conf
+   * @param tokenServiceValue
+   * @return new KeyProvider or null
+   * @throws IOException
+   */
+  public static KeyProvider createKeyProviderFromTokenService(
+      final Configuration conf, final String tokenServiceValue)
+      throws IOException {
+    LOG.debug("Creating key provider from token service value {}. ",
+        tokenServiceValue);
+    final KeyProvider kp = KMSUtilFaultInjector.get()
+        .createKeyProviderForTests(tokenServiceValue, conf);
+    if (kp != null) {
+      LOG.info("KeyProvider is created with uri: {}. This should happen only "
+          + "in tests.", tokenServiceValue);
+      return kp;
+    }
+    if (!tokenServiceValue.contains("://")) {
+      throw new IllegalArgumentException(
+          "Invalid token service " + tokenServiceValue);
+    }
+    final URI tokenServiceUri;
+    try {
+      tokenServiceUri = new URI(tokenServiceValue);
+    } catch (URISyntaxException e) {
+      throw new IllegalArgumentException(
+          "Invalid token service " + tokenServiceValue, e);
+    }
+    return createKeyProviderFromUri(conf, tokenServiceUri);
+  }
 }

+ 49 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtilFaultInjector.java

@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProvider;
+
+import java.io.IOException;
+
+/**
+ * Used for returning custom KeyProvider from test methods.
+ */
+@VisibleForTesting
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class KMSUtilFaultInjector {
+  private static KMSUtilFaultInjector instance = new KMSUtilFaultInjector();
+
+  public static KMSUtilFaultInjector get() {
+    return instance;
+  }
+
+  public static void set(KMSUtilFaultInjector injector) {
+    instance = injector;
+  }
+
+  public KeyProvider createKeyProviderForTests(String value, Configuration conf)
+      throws IOException {
+    return null;
+  }
+}

+ 1 - 0
hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier

@@ -12,3 +12,4 @@
 #   limitations under the License.
 #
 org.apache.hadoop.crypto.key.kms.KMSDelegationToken$KMSDelegationTokenIdentifier
+org.apache.hadoop.crypto.key.kms.KMSDelegationToken$KMSLegacyDelegationTokenIdentifier

+ 2 - 1
hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer

@@ -11,4 +11,5 @@
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 #
-org.apache.hadoop.crypto.key.kms.KMSClientProvider$KMSTokenRenewer
+org.apache.hadoop.crypto.key.kms.KMSTokenRenewer
+org.apache.hadoop.crypto.key.kms.KMSLegacyTokenRenewer

+ 20 - 0
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -2182,6 +2182,26 @@
   </description>
 </property>
 
+<property>
+  <name>hadoop.security.kms.client.copy.legacy.token</name>
+  <value>true</value>
+  <description>
+    Expert only. Whether the KMS client provider should copy a token to legacy
+    kind. This is for KMS_DELEGATION_TOKEN to be backwards compatible. With the
+    default value set to true, the client will locally duplicate the
+    KMS_DELEGATION_TOKEN token and create a kms-dt token, with the service field
+    conforming to kms-dt. All other parts of the token remain the same.
+    Then the new clients will use KMS_DELEGATION_TOKEN and old clients will
+    use kms-dt to authenticate. Default value is true.
+    You should only change this to false if you know all the KMS servers
+    , clients (including both job submitters and job runners) and the
+    token renewers (usually Yarn RM) are on a version that supports
+    KMS_DELEGATION_TOKEN.
+    Turning this off prematurely may result in old clients failing to
+    authenticate with new servers.
+  </description>
+</property>
+
 <property>
   <name>hadoop.security.kms.client.failover.sleep.max.millis</name>
   <value>2000</value>

+ 166 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestKMSClientProvider.java

@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto.key.kms;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.net.URL;
+
+import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_KIND;
+import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_LEGACY_KIND;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test for {@link KMSClientProvider} class.
+ */
+public class TestKMSClientProvider {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestKMSClientProvider.class);
+
+  private final Token token = new Token();
+  private final Token legacyToken = new Token();
+  private final String uriString = "kms://https@host:16000/kms";
+  private final String legacyTokenService = "host:16000";
+
+  @Rule
+  public Timeout globalTimeout = new Timeout(30000);
+
+  {
+    GenericTestUtils.setLogLevel(KMSClientProvider.LOG, Level.TRACE);
+  }
+
+  @Before
+  public void setup() {
+    SecurityUtil.setTokenServiceUseIp(false);
+    token.setKind(TOKEN_KIND);
+    token.setService(new Text(uriString));
+    legacyToken.setKind(TOKEN_LEGACY_KIND);
+    legacyToken.setService(new Text(legacyTokenService));
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testNotCopyFromLegacyToken() throws Exception {
+    final DelegationTokenAuthenticatedURL url =
+        mock(DelegationTokenAuthenticatedURL.class);
+    final Configuration conf = new Configuration();
+    final URI uri = new URI(uriString);
+    final KMSClientProvider kp = new KMSClientProvider(uri, conf, uri);
+    try {
+      final KMSClientProvider spyKp = spy(kp);
+      when(spyKp.createKMSAuthenticatedURL()).thenReturn(url);
+      when(url.getDelegationToken(any(URL.class),
+          any(DelegationTokenAuthenticatedURL.Token.class), any(String.class),
+          any(String.class))).thenReturn(legacyToken);
+
+      final Credentials creds = new Credentials();
+      final Token<?>[] tokens = spyKp.addDelegationTokens("yarn", creds);
+      LOG.info("Got tokens: {}", tokens);
+      assertEquals(1, tokens.length);
+      LOG.info("uri:" + uriString);
+      // if KMS server returned a legacy token, new client should leave the
+      // service being legacy and not set uri string
+      assertEquals(legacyTokenService, tokens[0].getService().toString());
+    } finally {
+      kp.close();
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testCopyFromToken() throws Exception {
+    final DelegationTokenAuthenticatedURL url =
+        mock(DelegationTokenAuthenticatedURL.class);
+    final Configuration conf = new Configuration();
+    final URI uri = new URI(uriString);
+    final KMSClientProvider kp = new KMSClientProvider(uri, conf, uri);
+    try {
+      final KMSClientProvider spyKp = spy(kp);
+      when(spyKp.createKMSAuthenticatedURL()).thenReturn(url);
+      when(url.getDelegationToken(any(URL.class),
+          any(DelegationTokenAuthenticatedURL.Token.class), any(String.class),
+          any(String.class))).thenReturn(token);
+
+      final Credentials creds = new Credentials();
+      final Token<?>[] tokens = spyKp.addDelegationTokens("yarn", creds);
+      LOG.info("Got tokens: {}", tokens);
+      assertEquals(2, tokens.length);
+      assertTrue(creds.getAllTokens().contains(token));
+      assertNotNull(creds.getToken(legacyToken.getService()));
+    } finally {
+      kp.close();
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testSelectTokenWhenBothExist() throws Exception {
+    final Credentials creds = new Credentials();
+    final Configuration conf = new Configuration();
+    final URI uri = new URI(uriString);
+    final KMSClientProvider kp = new KMSClientProvider(uri, conf, uri);
+    try {
+      creds.addToken(token.getService(), token);
+      creds.addToken(legacyToken.getService(), legacyToken);
+      Token t = kp.selectKMSDelegationToken(creds);
+      assertEquals(token, t);
+    } finally {
+      kp.close();
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  @Test
+  public void testSelectTokenLegacyService() throws Exception {
+    final Configuration conf = new Configuration();
+    final URI uri = new URI(uriString);
+    final KMSClientProvider kp = new KMSClientProvider(uri, conf, uri);
+    try {
+      Text legacyService = new Text(legacyTokenService);
+      token.setService(legacyService);
+      final Credentials creds = new Credentials();
+      creds.addToken(legacyService, token);
+      Token t = kp.selectKMSDelegationToken(creds);
+      assertEquals(token, t);
+    } finally {
+      kp.close();
+    }
+  }
+}

+ 51 - 16
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java

@@ -42,7 +42,8 @@ import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.security.authorize.AuthorizationException;
-import org.junit.After;
+import org.apache.hadoop.util.KMSUtil;
+import org.apache.hadoop.util.KMSUtilFaultInjector;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -56,33 +57,68 @@ public class TestLoadBalancingKMSClientProvider {
     SecurityUtil.setTokenServiceUseIp(false);
   }
 
-  @After
-  public void teardown() throws IOException {
-    KMSClientProvider.fallbackDefaultPortForTesting = false;
+  private void setKMSUtilFaultInjector() {
+    KMSUtilFaultInjector injector = new KMSUtilFaultInjector() {
+      @Override
+      public KeyProvider createKeyProviderForTests(
+          String value, Configuration conf) throws IOException {
+        return TestLoadBalancingKMSClientProvider
+            .createKeyProviderForTests(value, conf);
+      }
+    };
+    KMSUtilFaultInjector.set(injector);
+  }
+
+  public static KeyProvider createKeyProviderForTests(
+      String value, Configuration conf) throws IOException {
+    // The syntax for kms servers will be
+    // kms://http@localhost:port1/kms,kms://http@localhost:port2/kms
+    if (!value.contains(",")) {
+      return null;
+    }
+    String[] keyProviderUrisStr = value.split(",");
+    KMSClientProvider[] keyProviderArr =
+        new KMSClientProvider[keyProviderUrisStr.length];
+
+    int i = 0;
+    for (String keyProviderUri: keyProviderUrisStr) {
+      KMSClientProvider kmcp =
+          new KMSClientProvider(URI.create(keyProviderUri), conf, URI
+              .create(value));
+      keyProviderArr[i] = kmcp;
+      i++;
+    }
+    LoadBalancingKMSClientProvider lbkcp =
+        new LoadBalancingKMSClientProvider(keyProviderArr, conf);
+    return lbkcp;
   }
 
   @Test
   public void testCreation() throws Exception {
     Configuration conf = new Configuration();
-    KMSClientProvider.fallbackDefaultPortForTesting = true;
     KeyProvider kp = new KMSClientProvider.Factory().createProvider(new URI(
-        "kms://http@host1/kms/foo"), conf);
+        "kms://http@host1:9600/kms/foo"), conf);
     assertTrue(kp instanceof LoadBalancingKMSClientProvider);
     KMSClientProvider[] providers =
         ((LoadBalancingKMSClientProvider) kp).getProviders();
     assertEquals(1, providers.length);
-    assertEquals(Sets.newHashSet("http://host1/kms/foo/v1/"),
+    assertEquals(Sets.newHashSet("http://host1:9600/kms/foo/v1/"),
         Sets.newHashSet(providers[0].getKMSUrl()));
-
-    kp = new KMSClientProvider.Factory().createProvider(new URI(
-        "kms://http@host1;host2;host3/kms/foo"), conf);
+    setKMSUtilFaultInjector();
+    String uriStr = "kms://http@host1:9600/kms/foo," +
+        "kms://http@host2:9600/kms/foo," +
+        "kms://http@host3:9600/kms/foo";
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
+        uriStr);
+    kp = KMSUtil.createKeyProvider(conf, CommonConfigurationKeysPublic
+        .HADOOP_SECURITY_KEY_PROVIDER_PATH);
     assertTrue(kp instanceof LoadBalancingKMSClientProvider);
     providers =
         ((LoadBalancingKMSClientProvider) kp).getProviders();
     assertEquals(3, providers.length);
-    assertEquals(Sets.newHashSet("http://host1/kms/foo/v1/",
-        "http://host2/kms/foo/v1/",
-        "http://host3/kms/foo/v1/"),
+    assertEquals(Sets.newHashSet("http://host1:9600/kms/foo/v1/",
+        "http://host2:9600/kms/foo/v1/",
+        "http://host3:9600/kms/foo/v1/"),
         Sets.newHashSet(providers[0].getKMSUrl(),
             providers[1].getKMSUrl(),
             providers[2].getKMSUrl()));
@@ -208,7 +244,7 @@ public class TestLoadBalancingKMSClientProvider {
 
   private class MyKMSClientProvider extends KMSClientProvider {
     public MyKMSClientProvider(URI uri, Configuration conf) throws IOException {
-      super(uri, conf);
+      super(uri, conf, uri);
     }
 
     @Override
@@ -245,9 +281,8 @@ public class TestLoadBalancingKMSClientProvider {
   @Test
   public void testClassCastException() throws Exception {
     Configuration conf = new Configuration();
-    KMSClientProvider.fallbackDefaultPortForTesting = true;
     KMSClientProvider p1 = new MyKMSClientProvider(
-        new URI("kms://http@host1/kms/foo"), conf);
+        new URI("kms://http@host1:9600/kms/foo"), conf);
     LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider(
         new KMSClientProvider[] {p1}, 0, conf);
     try {

+ 65 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestKMSUtil.java

@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test {@link KMSUtil}.
+ */
+public class TestKMSUtil {
+
+  public static final Logger LOG = LoggerFactory.getLogger(TestKMSUtil.class);
+
+  @Rule
+  public Timeout globalTimeout = new Timeout(90000);
+
+  @Test
+  public void testCreateKeyProviderFromTokenService() throws Exception {
+    final Configuration conf = new Configuration();
+    KeyProvider kp = KMSUtil.createKeyProviderFromTokenService(conf,
+        "kms://https@localhost:9600/kms");
+    assertNotNull(kp);
+    kp.close();
+
+    kp = KMSUtil.createKeyProviderFromTokenService(conf,
+        "kms://https@localhost:9600/kms,kms://localhost1:9600/kms");
+    assertNotNull(kp);
+    kp.close();
+
+    String invalidService = "whatever:9600";
+    try {
+      KMSUtil.createKeyProviderFromTokenService(conf, invalidService);
+    } catch (Exception ex) {
+      LOG.info("Expected exception:", ex);
+      assertTrue(ex instanceof IllegalArgumentException);
+      GenericTestUtils.assertExceptionContains(
+          "Invalid token service " + invalidService, ex);
+    }
+  }
+}

+ 431 - 76
hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java

@@ -1,3 +1,4 @@
+
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -30,20 +31,28 @@ import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension.EncryptedKeyVersi
 import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
 import org.apache.hadoop.crypto.key.kms.KMSClientProvider;
 import org.apache.hadoop.crypto.key.kms.KMSDelegationToken;
+import org.apache.hadoop.crypto.key.kms.KMSTokenRenewer;
 import org.apache.hadoop.crypto.key.kms.LoadBalancingKMSClientProvider;
+import org.apache.hadoop.crypto.key.kms.TestLoadBalancingKMSClientProvider;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.minikdc.MiniKdc;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticationHandler;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.KMSUtil;
+import org.apache.hadoop.util.KMSUtilFaultInjector;
 import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -63,7 +72,6 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.Writer;
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.SocketTimeoutException;
 import java.net.URI;
@@ -81,17 +89,46 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.KMS_CLIENT_COPY_LEGACY_TOKEN_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH;
+import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_KIND;
+import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_LEGACY_KIND;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 public class TestKMS {
   private static final Logger LOG = LoggerFactory.getLogger(TestKMS.class);
 
   private static final String SSL_RELOADER_THREAD_NAME =
       "Truststore reloader thread";
 
+  private final KMSUtilFaultInjector oldInjector =
+      KMSUtilFaultInjector.get();
+
+  // Injector to create providers with different ports. Can only happen in tests
+  private final KMSUtilFaultInjector testInjector =
+      new KMSUtilFaultInjector() {
+        @Override
+        public KeyProvider createKeyProviderForTests(String value,
+            Configuration conf) throws IOException {
+          return TestLoadBalancingKMSClientProvider
+              .createKeyProviderForTests(value, conf);
+        }
+      };
+
   @Rule
   public final Timeout testTimeout = new Timeout(180000);
 
   @Before
-  public void cleanUp() {
+  public void setUp() throws Exception {
+    GenericTestUtils.setLogLevel(KMSClientProvider.LOG, Level.TRACE);
+    GenericTestUtils
+        .setLogLevel(DelegationTokenAuthenticationHandler.LOG, Level.TRACE);
+    GenericTestUtils
+        .setLogLevel(DelegationTokenAuthenticator.LOG, Level.TRACE);
+    GenericTestUtils.setLogLevel(KMSUtil.LOG, Level.TRACE);
     // resetting kerberos security
     Configuration conf = new Configuration();
     UserGroupInformation.setConfiguration(conf);
@@ -111,17 +148,71 @@ public class TestKMS {
   }
 
   public static abstract class KMSCallable<T> implements Callable<T> {
-    private URL kmsUrl;
+    private List<URL> kmsUrl;
 
     protected URL getKMSUrl() {
-      return kmsUrl;
+      return kmsUrl.get(0);
+    }
+
+    protected URL[] getKMSHAUrl() {
+      URL[] urls = new URL[kmsUrl.size()];
+      return kmsUrl.toArray(urls);
+    }
+
+    protected void addKMSUrl(URL url) {
+      if (kmsUrl == null) {
+        kmsUrl = new ArrayList<URL>();
+      }
+      kmsUrl.add(url);
+    }
+
+    /*
+     * The format of the returned value will be
+     * kms://http:kms1.example1.com:port1,kms://http:kms2.example2.com:port2
+     */
+    protected String generateLoadBalancingKeyProviderUriString() {
+      if (kmsUrl == null || kmsUrl.size() == 0) {
+        return null;
+      }
+      StringBuffer sb = new StringBuffer();
+
+      for (int i = 0; i < kmsUrl.size(); i++) {
+        sb.append(KMSClientProvider.SCHEME_NAME + "://" +
+            kmsUrl.get(0).getProtocol() + "@");
+        URL url = kmsUrl.get(i);
+        sb.append(url.getAuthority());
+        if (url.getPath() != null) {
+          sb.append(url.getPath());
+        }
+        if (i < kmsUrl.size() - 1) {
+          sb.append(",");
+        }
+      }
+      return sb.toString();
     }
   }
 
   protected KeyProvider createProvider(URI uri, Configuration conf)
       throws IOException {
     return new LoadBalancingKMSClientProvider(
-        new KMSClientProvider[] { new KMSClientProvider(uri, conf) }, conf);
+        new KMSClientProvider[] {new KMSClientProvider(uri, conf, uri)}, conf);
+  }
+
+  /**
+   * create a LoadBalancingKMSClientProvider from an array of URIs.
+   * @param uris an array of KMS URIs
+   * @param conf configuration object
+   * @return a LoadBalancingKMSClientProvider object
+   * @throws IOException
+   */
+  protected LoadBalancingKMSClientProvider createHAProvider(URI[] uris,
+      Configuration conf, String originalUri) throws IOException {
+    KMSClientProvider[] providers = new KMSClientProvider[uris.length];
+    for (int i = 0; i < providers.length; i++) {
+      providers[i] =
+          new KMSClientProvider(uris[i], conf, URI.create(originalUri));
+    }
+    return new LoadBalancingKMSClientProvider(providers, conf);
   }
 
   protected <T> T runServer(String keystore, String password, File confDir,
@@ -131,22 +222,33 @@ public class TestKMS {
 
   protected <T> T runServer(int port, String keystore, String password, File confDir,
       KMSCallable<T> callable) throws Exception {
+    return runServer(new int[] {port}, keystore, password, confDir, callable);
+  }
+
+  protected <T> T runServer(int[] ports, String keystore, String password,
+      File confDir, KMSCallable<T> callable) throws Exception {
     MiniKMS.Builder miniKMSBuilder = new MiniKMS.Builder().setKmsConfDir(confDir)
         .setLog4jConfFile("log4j.properties");
     if (keystore != null) {
       miniKMSBuilder.setSslConf(new File(keystore), password);
     }
-    if (port > 0) {
-      miniKMSBuilder.setPort(port);
+    final List<MiniKMS> kmsList = new ArrayList<>();
+    for (int i=0; i< ports.length; i++) {
+      if (ports[i] > 0) {
+        miniKMSBuilder.setPort(ports[i]);
+      }
+      MiniKMS miniKMS = miniKMSBuilder.build();
+      kmsList.add(miniKMS);
+      miniKMS.start();
+      LOG.info("Test KMS running at: " + miniKMS.getKMSUrl());
+      callable.addKMSUrl(miniKMS.getKMSUrl());
     }
-    MiniKMS miniKMS = miniKMSBuilder.build();
-    miniKMS.start();
     try {
-      System.out.println("Test KMS running at: " + miniKMS.getKMSUrl());
-      callable.kmsUrl = miniKMS.getKMSUrl();
       return callable.call();
     } finally {
-      miniKMS.stop();
+      for (MiniKMS miniKMS: kmsList) {
+        miniKMS.stop();
+      }
     }
   }
 
@@ -201,6 +303,13 @@ public class TestKMS {
     return new URI("kms://" + str);
   }
 
+  public static URI[] createKMSHAUri(URL[] kmsUrls) throws Exception {
+    URI[] uris = new URI[kmsUrls.length];
+    for (int i = 0; i < kmsUrls.length; i++) {
+      uris[i] = createKMSUri(kmsUrls[i]);
+    }
+    return uris;
+  }
 
   private static class KerberosConfiguration
       extends javax.security.auth.login.Configuration {
@@ -278,13 +387,19 @@ public class TestKMS {
         principals.toArray(new String[principals.size()]));
   }
 
+  @After
+  public void tearDown() throws Exception {
+    UserGroupInformation.setShouldRenewImmediatelyForTests(false);
+    UserGroupInformation.reset();
+    KMSUtilFaultInjector.set(oldInjector);
+  }
+
   @AfterClass
-  public static void tearDownMiniKdc() throws Exception {
+  public static void shutdownMiniKdc() {
     if (kdc != null) {
       kdc.stop();
+      kdc = null;
     }
-    UserGroupInformation.setShouldRenewImmediatelyForTests(false);
-    UserGroupInformation.reset();
   }
 
   private <T> T doAs(String user, final PrivilegedExceptionAction<T> action)
@@ -379,8 +494,9 @@ public class TestKMS {
                 Token<?>[] tokens =
                     ((KeyProviderDelegationTokenExtension.DelegationTokenExtension)kp)
                     .addDelegationTokens("myuser", new Credentials());
-                Assert.assertEquals(1, tokens.length);
-                Assert.assertEquals("kms-dt", tokens[0].getKind().toString());
+                assertEquals(2, tokens.length);
+                assertEquals(KMSDelegationToken.TOKEN_KIND,
+                    tokens[0].getKind());
                 kp.close();
                 return null;
               }
@@ -395,8 +511,8 @@ public class TestKMS {
           Token<?>[] tokens =
               ((KeyProviderDelegationTokenExtension.DelegationTokenExtension)kp)
               .addDelegationTokens("myuser", new Credentials());
-          Assert.assertEquals(1, tokens.length);
-          Assert.assertEquals("kms-dt", tokens[0].getKind().toString());
+          assertEquals(2, tokens.length);
+          assertEquals(KMSDelegationToken.TOKEN_KIND, tokens[0].getKind());
           kp.close();
         }
         return null;
@@ -1737,7 +1853,6 @@ public class TestKMS {
             return null;
           }
         });
-
         nonKerberosUgi.addCredentials(credentials);
 
         try {
@@ -1793,6 +1908,17 @@ public class TestKMS {
     testDelegationTokensOps(true, true);
   }
 
+  private Text getTokenService(KeyProvider provider) throws IOException {
+    assertTrue("KeyProvider should be an instance of KMSClientProvider",
+        (provider instanceof LoadBalancingKMSClientProvider));
+    assertEquals("Num client providers should be 1", 1,
+        ((LoadBalancingKMSClientProvider)provider).getProviders().length);
+    Text tokenService =
+        (((LoadBalancingKMSClientProvider)provider).getProviders()[0])
+        .getDelegationTokenService();
+    return tokenService;
+  }
+
   private void testDelegationTokensOps(final boolean ssl, final boolean kerb)
       throws Exception {
     final File confDir = getTestDir();
@@ -1824,11 +1950,16 @@ public class TestKMS {
         final URI uri = createKMSUri(getKMSUrl());
         clientConf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
             createKMSUri(getKMSUrl()).toString());
+        clientConf.setBoolean(KMS_CLIENT_COPY_LEGACY_TOKEN_KEY, false);
 
         doAs("client", new PrivilegedExceptionAction<Void>() {
           @Override
           public Void run() throws Exception {
             KeyProvider kp = createProvider(uri, clientConf);
+            // Unset the conf value for key provider path just to be sure that
+            // the key provider created for renew and cancel token is from
+            // token service field.
+            clientConf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
             // test delegation token retrieval
             KeyProviderDelegationTokenExtension kpdte =
                 KeyProviderDelegationTokenExtension.
@@ -1836,13 +1967,10 @@ public class TestKMS {
             final Credentials credentials = new Credentials();
             final Token<?>[] tokens =
                 kpdte.addDelegationTokens("client1", credentials);
-            Assert.assertEquals(1, credentials.getAllTokens().size());
-            InetSocketAddress kmsAddr =
-                new InetSocketAddress(getKMSUrl().getHost(),
-                    getKMSUrl().getPort());
-            Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
-                credentials.getToken(SecurityUtil.buildTokenService(kmsAddr)).
-                    getKind());
+            Text tokenService = getTokenService(kp);
+            assertEquals(1, credentials.getAllTokens().size());
+            assertEquals(TOKEN_KIND,
+                credentials.getToken(tokenService).getKind());
 
             // Test non-renewer user cannot renew.
             for (Token<?> token : tokens) {
@@ -1970,12 +2098,11 @@ public class TestKMS {
         final URI uri = createKMSUri(getKMSUrl());
         clientConf.set(KeyProviderFactory.KEY_PROVIDER_PATH,
             createKMSUri(getKMSUrl()).toString());
+        clientConf.setBoolean(KMS_CLIENT_COPY_LEGACY_TOKEN_KEY, false);
         final KeyProvider kp = createProvider(uri, clientConf);
         final KeyProviderDelegationTokenExtension kpdte =
             KeyProviderDelegationTokenExtension.
                 createKeyProviderDelegationTokenExtension(kp);
-        final InetSocketAddress kmsAddr =
-            new InetSocketAddress(getKMSUrl().getHost(), getKMSUrl().getPort());
 
         // Job 1 (e.g. Yarn log aggregation job), with user DT.
         final Collection<Token<?>> job1Token = new HashSet<>();
@@ -1985,16 +2112,17 @@ public class TestKMS {
             // Get a DT and use it.
             final Credentials credentials = new Credentials();
             kpdte.addDelegationTokens("client", credentials);
+            Text tokenService = getTokenService(kp);
             Assert.assertEquals(1, credentials.getAllTokens().size());
-            Assert.assertEquals(KMSDelegationToken.TOKEN_KIND, credentials.
-                getToken(SecurityUtil.buildTokenService(kmsAddr)).getKind());
+
             UserGroupInformation.getCurrentUser().addCredentials(credentials);
             LOG.info("Added kms dt to credentials: {}", UserGroupInformation.
                 getCurrentUser().getCredentials().getAllTokens());
-            Token<?> token =
+            final Token<?> token =
                 UserGroupInformation.getCurrentUser().getCredentials()
-                    .getToken(SecurityUtil.buildTokenService(kmsAddr));
-            Assert.assertNotNull(token);
+                    .getToken(tokenService);
+            assertNotNull(token);
+            assertEquals(TOKEN_KIND, token.getKind());
             job1Token.add(token);
 
             // Decode the token to get max time.
@@ -2029,17 +2157,16 @@ public class TestKMS {
             // Get a new DT, but don't use it yet.
             final Credentials newCreds = new Credentials();
             kpdte.addDelegationTokens("client", newCreds);
-            Assert.assertEquals(1, newCreds.getAllTokens().size());
-            Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
-                newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)).
-                    getKind());
+            assertEquals(1, newCreds.getAllTokens().size());
+            final Text tokenService = getTokenService(kp);
+            assertEquals(TOKEN_KIND,
+                newCreds.getToken(tokenService).getKind());
 
             // Using job 1's DT should fail.
             final Credentials oldCreds = new Credentials();
             for (Token<?> token : job1Token) {
-              if (token.getKind().equals(KMSDelegationToken.TOKEN_KIND)) {
-                oldCreds
-                    .addToken(SecurityUtil.buildTokenService(kmsAddr), token);
+              if (token.getKind().equals(TOKEN_KIND)) {
+                oldCreds.addToken(tokenService, token);
               }
             }
             UserGroupInformation.getCurrentUser().addCredentials(oldCreds);
@@ -2053,12 +2180,11 @@ public class TestKMS {
             }
 
             // Using the new DT should succeed.
-            Assert.assertEquals(1, newCreds.getAllTokens().size());
-            Assert.assertEquals(KMSDelegationToken.TOKEN_KIND,
-                newCreds.getToken(SecurityUtil.buildTokenService(kmsAddr)).
-                    getKind());
+            assertEquals(1, newCreds.getAllTokens().size());
+            assertEquals(TOKEN_KIND,
+                newCreds.getToken(tokenService).getKind());
             UserGroupInformation.getCurrentUser().addCredentials(newCreds);
-            LOG.info("Credetials now are: {}", UserGroupInformation
+            LOG.info("Credentials now are: {}", UserGroupInformation
                 .getCurrentUser().getCredentials().getAllTokens());
             kp.getKeys();
             return null;
@@ -2084,7 +2210,13 @@ public class TestKMS {
     doKMSWithZK(true, true);
   }
 
-  public void doKMSWithZK(boolean zkDTSM, boolean zkSigner) throws Exception {
+  private <T> T runServerWithZooKeeper(boolean zkDTSM, boolean zkSigner,
+      KMSCallable<T> callable) throws Exception {
+    return runServerWithZooKeeper(zkDTSM, zkSigner, callable, 1);
+  }
+
+  private <T> T runServerWithZooKeeper(boolean zkDTSM, boolean zkSigner,
+      KMSCallable<T> callable, int kmsSize) throws Exception {
     TestingServer zkServer = null;
     try {
       zkServer = new TestingServer();
@@ -2130,43 +2262,266 @@ public class TestKMS {
 
       writeConf(testDir, conf);
 
-      KMSCallable<KeyProvider> c =
-          new KMSCallable<KeyProvider>() {
-        @Override
-        public KeyProvider call() throws Exception {
-          final Configuration conf = new Configuration();
-          conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
-          final URI uri = createKMSUri(getKMSUrl());
-
-          final KeyProvider kp =
-              doAs("SET_KEY_MATERIAL",
-                  new PrivilegedExceptionAction<KeyProvider>() {
-                    @Override
-                    public KeyProvider run() throws Exception {
-                      KeyProvider kp = createProvider(uri, conf);
-                          kp.createKey("k1", new byte[16],
-                              new KeyProvider.Options(conf));
-                          kp.createKey("k2", new byte[16],
-                              new KeyProvider.Options(conf));
-                          kp.createKey("k3", new byte[16],
-                              new KeyProvider.Options(conf));
-                      return kp;
-                    }
-                  });
-          return kp;
-        }
-      };
-
-      runServer(null, null, testDir, c);
+      int[] ports = new int[kmsSize];
+      for (int i = 0; i < ports.length; i++) {
+        ports[i] = -1;
+      }
+      return runServer(ports, null, null, testDir, callable);
     } finally {
       if (zkServer != null) {
         zkServer.stop();
         zkServer.close();
       }
     }
+  }
+
+  public void doKMSWithZK(boolean zkDTSM, boolean zkSigner) throws Exception {
+    KMSCallable<KeyProvider> c =
+        new KMSCallable<KeyProvider>() {
+          @Override
+          public KeyProvider call() throws Exception {
+            final Configuration conf = new Configuration();
+            conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
+            final URI uri = createKMSUri(getKMSUrl());
+
+            final KeyProvider kp =
+                doAs("SET_KEY_MATERIAL",
+                    new PrivilegedExceptionAction<KeyProvider>() {
+                      @Override
+                      public KeyProvider run() throws Exception {
+                        KeyProvider kp = createProvider(uri, conf);
+                        kp.createKey("k1", new byte[16],
+                            new KeyProvider.Options(conf));
+                        kp.createKey("k2", new byte[16],
+                            new KeyProvider.Options(conf));
+                        kp.createKey("k3", new byte[16],
+                            new KeyProvider.Options(conf));
+                        return kp;
+                      }
+                    });
+            return kp;
+          }
+        };
+
+    runServerWithZooKeeper(zkDTSM, zkSigner, c);
+  }
+
+  @Test
+  public void doKMSHAZKWithDelegationTokenAccess() throws Exception {
+    KMSCallable<Void> c = new KMSCallable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        final Configuration conf = new Configuration();
+        conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
+        final URI[] uris = createKMSHAUri(getKMSHAUrl());
+        final Credentials credentials = new Credentials();
+        final String lbUri = generateLoadBalancingKeyProviderUriString();
+        final LoadBalancingKMSClientProvider lbkp =
+            createHAProvider(uris, conf, lbUri);
+        conf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
+        // Login as a Kerberos user principal using keytab.
+        // Connect to KMS to create a delegation token and add it to credentials
+        final String keyName = "k0";
+        doAs("SET_KEY_MATERIAL",
+            new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws Exception {
+                KeyProviderDelegationTokenExtension kpdte =
+                    KeyProviderDelegationTokenExtension.
+                        createKeyProviderDelegationTokenExtension(lbkp);
+                kpdte.addDelegationTokens("SET_KEY_MATERIAL", credentials);
+                kpdte.createKey(keyName, new KeyProvider.Options(conf));
+                return null;
+              }
+            });
+
+        assertTokenIdentifierEquals(credentials);
+
+        final LoadBalancingKMSClientProvider lbkp1 =
+            createHAProvider(uris, conf, lbUri);
+        // verify both tokens can be used to authenticate
+        for (Token t : credentials.getAllTokens()) {
+          assertTokenAccess(lbkp1, keyName, t);
+        }
+        return null;
+      }
+    };
+    runServerWithZooKeeper(true, true, c, 2);
+  }
+
+  /**
+   * Assert that the passed in credentials have 2 tokens, of kind
+   * {@link KMSDelegationToken#TOKEN_KIND} and
+   * {@link KMSDelegationToken#TOKEN_LEGACY_KIND}. Assert that the 2 tokens have
+   * the same identifier.
+   */
+  private void assertTokenIdentifierEquals(Credentials credentials)
+      throws IOException {
+    // verify the 2 tokens have the same identifier
+    assertEquals(2, credentials.getAllTokens().size());
+    Token token = null;
+    Token legacyToken = null;
+    for (Token t : credentials.getAllTokens()) {
+      if (KMSDelegationToken.TOKEN_KIND.equals(t.getKind())) {
+        token = t;
+      } else if (KMSDelegationToken.TOKEN_LEGACY_KIND.equals(t.getKind())) {
+        legacyToken = t;
+      }
+    }
+    assertNotNull(token);
+    assertNotNull(legacyToken);
+    final DelegationTokenIdentifier tokenId =
+        (DelegationTokenIdentifier) token.decodeIdentifier();
+    final DelegationTokenIdentifier legacyTokenId =
+        (DelegationTokenIdentifier) legacyToken.decodeIdentifier();
+    assertEquals("KMS DT and legacy dt should have identical identifier",
+        tokenId, legacyTokenId);
+  }
+
+  /**
+   * Tests token access with each providers in the
+   * {@link LoadBalancingKMSClientProvider}. This is to make sure the 2 token
+   * kinds are compatible and can both be used to authenticate.
+   */
+  @SuppressWarnings("unchecked")
+  private void assertTokenAccess(final LoadBalancingKMSClientProvider lbkp,
+      final String keyName, final Token token) throws Exception {
+    UserGroupInformation tokenUgi =
+        UserGroupInformation.createUserForTesting("test", new String[] {});
+    // Verify the tokens can authenticate to any KMS
+    tokenUgi.addToken(token);
+    tokenUgi.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        // Create a kms client with one provider at a time. Must use one
+        // provider so that if it fails to authenticate, it does not fall
+        // back to the next KMS instance.
+        // It should succeed because its delegation token can access any
+        // KMS instances.
+        for (KMSClientProvider provider : lbkp.getProviders()) {
+          if (token.getKind().equals(TOKEN_LEGACY_KIND) && !token.getService()
+              .equals(provider.getDelegationTokenService())) {
+            // Historically known issue: Legacy token can only work with the
+            // key provider specified in the token's Service
+            continue;
+          }
+          LOG.info("Rolling key {} via provider {} with token {}.", keyName,
+              provider, token);
+          provider.rollNewVersion(keyName);
+        }
+        return null;
+      }
+    });
+  }
+
+  @Test
+  public void testKMSHAZKDelegationTokenRenewCancel() throws Exception {
+    testKMSHAZKDelegationTokenRenewCancel(TOKEN_KIND);
+  }
+
+  @Test
+  public void testKMSHAZKDelegationTokenRenewCancelLegacy() throws Exception {
+    testKMSHAZKDelegationTokenRenewCancel(TOKEN_LEGACY_KIND);
+  }
+
+  private void testKMSHAZKDelegationTokenRenewCancel(final Text tokenKind)
+      throws Exception {
+    GenericTestUtils.setLogLevel(KMSTokenRenewer.LOG, Level.TRACE);
+    assertTrue(tokenKind == TOKEN_KIND || tokenKind == TOKEN_LEGACY_KIND);
+    KMSCallable<Void> c = new KMSCallable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        final Configuration conf = new Configuration();
+        final URI[] uris = createKMSHAUri(getKMSHAUrl());
+        final Credentials credentials = new Credentials();
+        // Create a UGI without Kerberos auth. It will be authenticated with
+        // delegation token.
+        final UserGroupInformation nonKerberosUgi =
+            UserGroupInformation.getCurrentUser();
+        final String lbUri = generateLoadBalancingKeyProviderUriString();
+        final LoadBalancingKMSClientProvider lbkp =
+            createHAProvider(uris, conf, lbUri);
+        conf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
+        // Login as a Kerberos user principal using keytab.
+        // Connect to KMS to create a delegation token and add it to credentials
+        doAs("SET_KEY_MATERIAL",
+            new PrivilegedExceptionAction<Void>() {
+              @Override
+              public Void run() throws Exception {
+                KeyProviderDelegationTokenExtension kpdte =
+                    KeyProviderDelegationTokenExtension.
+                        createKeyProviderDelegationTokenExtension(lbkp);
+                kpdte.addDelegationTokens("SET_KEY_MATERIAL", credentials);
+                return null;
+              }
+            });
 
+        // Test token renewal and cancellation
+        final Collection<Token<? extends TokenIdentifier>> tokens =
+            credentials.getAllTokens();
+        doAs("SET_KEY_MATERIAL", new PrivilegedExceptionAction<Void>() {
+          @Override
+          public Void run() throws Exception {
+            Assert.assertEquals(2, tokens.size());
+            boolean tokenFound = false;
+            for (Token token : tokens) {
+              if (!tokenKind.equals(token.getKind())) {
+                continue;
+              } else {
+                tokenFound = true;
+              }
+              KMSUtilFaultInjector.set(testInjector);
+              setupConfForToken(token.getKind(), conf, lbUri);
+
+              LOG.info("Testing token: {}", token);
+              long tokenLife = token.renew(conf);
+              LOG.info("Renewed token {}, new lifetime:{}", token, tokenLife);
+              Thread.sleep(10);
+              long newTokenLife = token.renew(conf);
+              LOG.info("Renewed token {}, new lifetime:{}", token,
+                  newTokenLife);
+              assertTrue(newTokenLife > tokenLife);
+
+              boolean canceled = false;
+              // test delegation token cancellation
+              if (!canceled) {
+                token.cancel(conf);
+                LOG.info("Cancelled token {}", token);
+                canceled = true;
+              }
+              assertTrue("token should have been canceled", canceled);
+              try {
+                token.renew(conf);
+                fail("should not be able to renew a canceled token " + token);
+              } catch (Exception e) {
+                LOG.info("Expected exception when renewing token", e);
+              }
+            }
+            assertTrue("Should have found token kind " + tokenKind + " from "
+                + tokens, tokenFound);
+            return null;
+          }
+        });
+        return null;
+      }
+    };
+    runServerWithZooKeeper(true, true, c, 2);
   }
 
+  /**
+   * Set or unset the key provider configuration based on token kind.
+   */
+  private void setupConfForToken(Text tokenKind, Configuration conf,
+      String lbUri) {
+    if (tokenKind.equals(TOKEN_KIND)) {
+      conf.unset(HADOOP_SECURITY_KEY_PROVIDER_PATH);
+    } else {
+      // conf is only required for legacy tokens to create provider,
+      // new tokens create provider by parsing its own Service field
+      assertEquals(TOKEN_LEGACY_KIND, tokenKind);
+      conf.set(HADOOP_SECURITY_KEY_PROVIDER_PATH, lbUri);
+    }
+  }
 
   @Test
   public void testProxyUserKerb() throws Exception {