فهرست منبع

HADOOP-11151. Automatically refresh auth token and retry on auth failure. Contributed by Arun Suresh.

(cherry picked from commit 2d8e6e2c4a52a4ba815b23d6d1ac21be4df23d9e)
(cherry picked from commit 9ebff016c2de3410622e6da3e9963adbaa894c1a)
Andrew Wang 10 سال پیش
والد
کامیت
ac510c3610

+ 4 - 1
hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/client/AuthenticatedURL.java

@@ -250,7 +250,10 @@ public class AuthenticatedURL {
    * @throws AuthenticationException if an authentication exception occurred.
    */
   public static void extractToken(HttpURLConnection conn, Token token) throws IOException, AuthenticationException {
-    if (conn.getResponseCode() == HttpURLConnection.HTTP_OK) {
+    int respCode = conn.getResponseCode();
+    if (respCode == HttpURLConnection.HTTP_OK
+        || respCode == HttpURLConnection.HTTP_CREATED
+        || respCode == HttpURLConnection.HTTP_ACCEPTED) {
       Map<String, List<String>> headers = conn.getHeaderFields();
       List<String> cookies = headers.get("Set-Cookie");
       if (cookies != null) {

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -433,6 +433,9 @@ Release 2.6.0 - UNRELEASED
     HADOOP-11160. Fix typo in nfs3 server duplicate entry reporting.
     (Charles Lamb via wheat9)
 
+    HADOOP-11151. Automatically refresh auth token and retry on auth failure.
+    (Arun Suresh via wang)
+
     BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
   
       HADOOP-10734. Implement high-performance secure random number sources.

+ 42 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.ProviderUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
 import org.apache.hadoop.security.ssl.SSLFactory;
@@ -76,6 +77,8 @@ import com.google.common.base.Preconditions;
 public class KMSClientProvider extends KeyProvider implements CryptoExtension,
     KeyProviderDelegationTokenExtension.DelegationTokenExtension {
 
+  private static final String ANONYMOUS_REQUESTS_DISALLOWED = "Anonymous requests are disallowed";
+
   public static final String TOKEN_KIND = "kms-dt";
 
   public static final String SCHEME_NAME = "kms";
@@ -97,6 +100,13 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
   public static final String TIMEOUT_ATTR = CONFIG_PREFIX + "timeout";
   public static final int DEFAULT_TIMEOUT = 60;
 
+  /* Number of times to retry authentication in the event of auth failure
+   * (normally happens due to stale authToken) 
+   */
+  public static final String AUTH_RETRY = CONFIG_PREFIX
+      + "authentication.retry-count";
+  public static final int DEFAULT_AUTH_RETRY = 1;
+
   private final ValueQueue<EncryptedKeyVersion> encKeyVersionQueue;
 
   private class EncryptedQueueRefiller implements
@@ -238,6 +248,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
   private ConnectionConfigurator configurator;
   private DelegationTokenAuthenticatedURL.Token authToken;
   private UserGroupInformation loginUgi;
+  private final int authRetry;
 
   @Override
   public String toString() {
@@ -296,6 +307,7 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
       }
     }
     int timeout = conf.getInt(TIMEOUT_ATTR, DEFAULT_TIMEOUT);
+    authRetry = conf.getInt(AUTH_RETRY, DEFAULT_AUTH_RETRY);
     configurator = new TimeoutConnConfigurator(timeout, sslFactory);
     encKeyVersionQueue =
         new ValueQueue<KeyProviderCryptoExtension.EncryptedKeyVersion>(
@@ -416,7 +428,12 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
   }
 
   private <T> T call(HttpURLConnection conn, Map jsonOutput,
-      int expectedResponse, Class<T> klass)
+      int expectedResponse, Class<T> klass) throws IOException {
+    return call(conn, jsonOutput, expectedResponse, klass, authRetry);
+  }
+
+  private <T> T call(HttpURLConnection conn, Map jsonOutput,
+      int expectedResponse, Class<T> klass, int authRetryCount)
       throws IOException {
     T ret = null;
     try {
@@ -427,13 +444,36 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension,
       conn.getInputStream().close();
       throw ex;
     }
-    if (conn.getResponseCode() == HttpURLConnection.HTTP_FORBIDDEN) {
+    if ((conn.getResponseCode() == HttpURLConnection.HTTP_FORBIDDEN
+        && conn.getResponseMessage().equals(ANONYMOUS_REQUESTS_DISALLOWED))
+        || conn.getResponseCode() == HttpURLConnection.HTTP_UNAUTHORIZED) {
       // Ideally, this should happen only when there is an Authentication
       // failure. Unfortunately, the AuthenticationFilter returns 403 when it
       // cannot authenticate (Since a 401 requires Server to send
       // WWW-Authenticate header as well)..
       KMSClientProvider.this.authToken =
           new DelegationTokenAuthenticatedURL.Token();
+      KMSClientProvider.this.loginUgi =
+          UserGroupInformation.getCurrentUser();
+      if (authRetryCount > 0) {
+        String contentType = conn.getRequestProperty(CONTENT_TYPE);
+        String requestMethod = conn.getRequestMethod();
+        URL url = conn.getURL();
+        conn = createConnection(url, requestMethod);
+        conn.setRequestProperty(CONTENT_TYPE, contentType);
+        return call(conn, jsonOutput, expectedResponse, klass,
+            authRetryCount - 1);
+      }
+    }
+    try {
+      AuthenticatedURL.extractToken(conn, authToken);
+    } catch (AuthenticationException e) {
+      // Ignore the AuthExceptions.. since we are just using the method to
+      // extract and set the authToken.. (Workaround till we actually fix
+      // AuthenticatedURL properly to set authToken post initialization)
+    } finally {
+      KMSClientProvider.this.loginUgi =
+          UserGroupInformation.getCurrentUser();
     }
     HttpExceptionUtils.validateResponse(conn, expectedResponse);
     if (APPLICATION_JSON_MIME.equalsIgnoreCase(conn.getContentType())

+ 7 - 0
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -1599,6 +1599,13 @@ for ldap providers in the same way as above does.
 </property>
 
 <!--- KMSClientProvider configurations -->
+<property>
+  <name>hadoop.security.kms.client.authentication.retry-count</name>
+  <value>1</value>
+  <description>
+    Number of time to retry connecting to KMS on authentication failure
+  </description>
+</property>
 <property>
   <name>hadoop.security.kms.client.encrypted.key.cache.size</name>
   <value>500</value>

+ 117 - 10
hadoop-common-project/hadoop-kms/src/test/java/org/apache/hadoop/crypto/key/kms/server/TestKMS.java

@@ -32,8 +32,10 @@ import org.apache.hadoop.minikdc.MiniKdc;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -49,6 +51,8 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.Writer;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.net.ServerSocket;
@@ -791,12 +795,24 @@ public class TestKMS {
   }
 
   @Test
-  public void testKMSRestart() throws Exception {
+  public void testKMSRestartKerberosAuth() throws Exception {
+    doKMSRestart(true);
+  }
+
+  @Test
+  public void testKMSRestartSimpleAuth() throws Exception {
+    doKMSRestart(false);
+  }
+
+  public void doKMSRestart(boolean useKrb) throws Exception {
     Configuration conf = new Configuration();
     conf.set("hadoop.security.authentication", "kerberos");
     UserGroupInformation.setConfiguration(conf);
     final File testDir = getTestDir();
     conf = createBaseKMSConf(testDir);
+    if (useKrb) {
+      conf.set("hadoop.kms.authentication.type", "kerberos");
+    }
     conf.set("hadoop.kms.authentication.kerberos.keytab",
         keytab.getAbsolutePath());
     conf.set("hadoop.kms.authentication.kerberos.principal", "HTTP/localhost");
@@ -855,15 +871,6 @@ public class TestKMS {
                 new PrivilegedExceptionAction<Void>() {
                   @Override
                   public Void run() throws Exception {
-                    try {
-                      retKp.createKey("k2", new byte[16],
-                          new KeyProvider.Options(conf));
-                      Assert.fail("Should fail first time !!");
-                    } catch (IOException e) {
-                      String message = e.getMessage();
-                      Assert.assertTrue("Should be a 403 error : " + message,
-                          message.contains("403"));
-                    }
                     retKp.createKey("k2", new byte[16],
                         new KeyProvider.Options(conf));
                     retKp.createKey("k3", new byte[16],
@@ -876,6 +883,106 @@ public class TestKMS {
         });
   }
 
+  @Test
+  public void testKMSAuthFailureRetry() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set("hadoop.security.authentication", "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+    final File testDir = getTestDir();
+    conf = createBaseKMSConf(testDir);
+    conf.set("hadoop.kms.authentication.kerberos.keytab",
+        keytab.getAbsolutePath());
+    conf.set("hadoop.kms.authentication.kerberos.principal", "HTTP/localhost");
+    conf.set("hadoop.kms.authentication.kerberos.name.rules", "DEFAULT");
+
+    for (KMSACLs.Type type : KMSACLs.Type.values()) {
+      conf.set(type.getAclConfigKey(), type.toString());
+    }
+    conf.set(KMSACLs.Type.CREATE.getAclConfigKey(),
+        KMSACLs.Type.CREATE.toString() + ",SET_KEY_MATERIAL");
+
+    conf.set(KMSACLs.Type.ROLLOVER.getAclConfigKey(),
+        KMSACLs.Type.ROLLOVER.toString() + ",SET_KEY_MATERIAL");
+
+    conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k0.ALL", "*");
+    conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k1.ALL", "*");
+    conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k2.ALL", "*");
+    conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k3.ALL", "*");
+    conf.set(KeyAuthorizationKeyProvider.KEY_ACL + "k4.ALL", "*");
+
+    writeConf(testDir, conf);
+
+    runServer(null, null, testDir,
+        new KMSCallable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            final Configuration conf = new Configuration();
+            conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
+            final URI uri = createKMSUri(getKMSUrl());
+            doAs("SET_KEY_MATERIAL",
+                new PrivilegedExceptionAction<Void>() {
+                  @Override
+                  public Void run() throws Exception {
+                    KMSClientProvider kp = new KMSClientProvider(uri, conf);
+                    kp.createKey("k1", new byte[16],
+                        new KeyProvider.Options(conf));
+                    makeAuthTokenStale(kp);
+                    kp.createKey("k2", new byte[16],
+                        new KeyProvider.Options(conf));
+                    return null;
+                  }
+                });
+            return null;
+          }
+        });
+
+    // Test retry count
+    runServer(null, null, testDir,
+        new KMSCallable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            final Configuration conf = new Configuration();
+            conf.setInt(KeyProvider.DEFAULT_BITLENGTH_NAME, 128);
+            conf.setInt(KMSClientProvider.AUTH_RETRY, 0);
+            final URI uri = createKMSUri(getKMSUrl());
+            doAs("SET_KEY_MATERIAL",
+                new PrivilegedExceptionAction<Void>() {
+                  @Override
+                  public Void run() throws Exception {
+                    KMSClientProvider kp = new KMSClientProvider(uri, conf);
+                    kp.createKey("k3", new byte[16],
+                        new KeyProvider.Options(conf));
+                    makeAuthTokenStale(kp);
+                    try {
+                      kp.createKey("k4", new byte[16],
+                          new KeyProvider.Options(conf));
+                      Assert.fail("Shoud fail since retry count == 0");
+                    } catch (IOException e) {
+                      Assert.assertTrue(
+                          "HTTP exception must be a 403 : " + e.getMessage(), e
+                              .getMessage().contains("403"));
+                    }
+                    return null;
+                  }
+                });
+            return null;
+          }
+        });
+  }
+
+  private void makeAuthTokenStale(KMSClientProvider kp) throws Exception {
+    Field tokF = KMSClientProvider.class.getDeclaredField("authToken");
+    tokF.setAccessible(true);
+    DelegationTokenAuthenticatedURL.Token delToken =
+        (DelegationTokenAuthenticatedURL.Token) tokF.get(kp);
+    String oldTokStr = delToken.toString();
+    Method setM =
+        AuthenticatedURL.Token.class.getDeclaredMethod("set", String.class);
+    setM.setAccessible(true);
+    String newTokStr = oldTokStr.replaceAll("e=[^&]*", "e=1000");
+    setM.invoke(((AuthenticatedURL.Token)delToken), newTokStr);
+  }
+
   @Test
   public void testACLs() throws Exception {
     Configuration conf = new Configuration();