Sfoglia il codice sorgente

HADOOP-12548. Read s3a creds from a Credential Provider. Contributed by Larry McCay.

cnauroth 9 anni fa
parent
commit
76fab26c5c

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -766,6 +766,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-12426. Add Entry point for Kerberos health check
     (Steve Loughran via cnauroth)
 
+    HADOOP-12548. Read s3a creds from a Credential Provider.
+    (Larry McCay via cnauroth)
+
   IMPROVEMENTS
 
     HADOOP-12458. Retries is typoed to spell Retires in parts of

+ 152 - 60
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -106,23 +106,11 @@ public class S3AFileSystem extends FileSystem {
     workingDir = new Path("/user", System.getProperty("user.name")).makeQualified(this.uri,
         this.getWorkingDirectory());
 
-    // Try to get our credentials or just connect anonymously
-    String accessKey = conf.get(ACCESS_KEY, null);
-    String secretKey = conf.get(SECRET_KEY, null);
-
-    String userInfo = name.getUserInfo();
-    if (userInfo != null) {
-      int index = userInfo.indexOf(':');
-      if (index != -1) {
-        accessKey = userInfo.substring(0, index);
-        secretKey = userInfo.substring(index + 1);
-      } else {
-        accessKey = userInfo;
-      }
-    }
+    AWSAccessKeys creds = getAWSAccessKeys(name, conf);
 
     AWSCredentialsProviderChain credentials = new AWSCredentialsProviderChain(
-        new BasicAWSCredentialsProvider(accessKey, secretKey),
+        new BasicAWSCredentialsProvider(
+            creds.getAccessKey(), creds.getAccessSecret()),
         new InstanceProfileCredentialsProvider(),
         new AnonymousAWSCredentialsProvider()
     );
@@ -146,6 +134,59 @@ public class S3AFileSystem extends FileSystem {
       awsConf.setSignerOverride(signerOverride);
     }
 
+    initProxySupport(conf, awsConf, secureConnections);
+
+    initAmazonS3Client(conf, credentials, awsConf);
+
+    maxKeys = conf.getInt(MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS);
+    partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
+    multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD,
+      DEFAULT_MIN_MULTIPART_THRESHOLD);
+    enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
+
+    if (partSize < 5 * 1024 * 1024) {
+      LOG.error(MULTIPART_SIZE + " must be at least 5 MB");
+      partSize = 5 * 1024 * 1024;
+    }
+
+    if (multiPartThreshold < 5 * 1024 * 1024) {
+      LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB");
+      multiPartThreshold = 5 * 1024 * 1024;
+    }
+
+    int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
+    if (maxThreads < 2) {
+      LOG.warn(MAX_THREADS + " must be at least 2: forcing to 2.");
+      maxThreads = 2;
+    }
+    int totalTasks = conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS);
+    if (totalTasks < 1) {
+      LOG.warn(MAX_TOTAL_TASKS + "must be at least 1: forcing to 1.");
+      totalTasks = 1;
+    }
+    long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
+    threadPoolExecutor = new BlockingThreadPoolExecutorService(maxThreads,
+        maxThreads + totalTasks, keepAliveTime, TimeUnit.SECONDS,
+        "s3a-transfer-shared");
+
+    initTransferManager();
+
+    initCannedAcls(conf);
+
+    if (!s3.doesBucketExist(bucket)) {
+      throw new IOException("Bucket " + bucket + " does not exist");
+    }
+
+    initMultipartUploads(conf);
+
+    serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM);
+
+    setConf(conf);
+  }
+
+  void initProxySupport(Configuration conf, ClientConfiguration awsConf,
+      boolean secureConnections) throws IllegalArgumentException,
+      IllegalArgumentException {
     String proxyHost = conf.getTrimmed(PROXY_HOST, "");
     int proxyPort = conf.getInt(PROXY_PORT, -1);
     if (!proxyHost.isEmpty()) {
@@ -185,7 +226,11 @@ public class S3AFileSystem extends FileSystem {
       LOG.error(msg);
       throw new IllegalArgumentException(msg);
     }
+  }
 
+  private void initAmazonS3Client(Configuration conf,
+      AWSCredentialsProviderChain credentials, ClientConfiguration awsConf)
+      throws IllegalArgumentException {
     s3 = new AmazonS3Client(credentials, awsConf);
     String endPoint = conf.getTrimmed(ENDPOINT,"");
     if (!endPoint.isEmpty()) {
@@ -197,56 +242,27 @@ public class S3AFileSystem extends FileSystem {
         throw new IllegalArgumentException(msg, e);
       }
     }
+  }
 
-    maxKeys = conf.getInt(MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS);
-    partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
-    multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD,
-      DEFAULT_MIN_MULTIPART_THRESHOLD);
-    enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
-
-    if (partSize < 5 * 1024 * 1024) {
-      LOG.error(MULTIPART_SIZE + " must be at least 5 MB");
-      partSize = 5 * 1024 * 1024;
-    }
-
-    if (multiPartThreshold < 5 * 1024 * 1024) {
-      LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB");
-      multiPartThreshold = 5 * 1024 * 1024;
-    }
-
-    int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
-    if (maxThreads < 2) {
-      LOG.warn(MAX_THREADS + " must be at least 2: forcing to 2.");
-      maxThreads = 2;
-    }
-    int totalTasks = conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS);
-    if (totalTasks < 1) {
-      LOG.warn(MAX_TOTAL_TASKS + "must be at least 1: forcing to 1.");
-      totalTasks = 1;
-    }
-    long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
-    threadPoolExecutor = new BlockingThreadPoolExecutorService(maxThreads,
-        maxThreads + totalTasks, keepAliveTime, TimeUnit.SECONDS,
-        "s3a-transfer-shared");
-
+  private void initTransferManager() {
     TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
     transferConfiguration.setMinimumUploadPartSize(partSize);
     transferConfiguration.setMultipartUploadThreshold(multiPartThreshold);
 
     transfers = new TransferManager(s3, threadPoolExecutor);
     transfers.setConfiguration(transferConfiguration);
+  }
 
+  private void initCannedAcls(Configuration conf) {
     String cannedACLName = conf.get(CANNED_ACL, DEFAULT_CANNED_ACL);
     if (!cannedACLName.isEmpty()) {
       cannedACL = CannedAccessControlList.valueOf(cannedACLName);
     } else {
       cannedACL = null;
     }
+  }
 
-    if (!s3.doesBucketExist(bucket)) {
-      throw new IOException("Bucket " + bucket + " does not exist");
-    }
-
+  private void initMultipartUploads(Configuration conf) {
     boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART, 
       DEFAULT_PURGE_EXISTING_MULTIPART);
     long purgeExistingMultipartAge = conf.getLong(PURGE_EXISTING_MULTIPART_AGE, 
@@ -257,10 +273,51 @@ public class S3AFileSystem extends FileSystem {
 
       transfers.abortMultipartUploads(bucket, purgeBefore);
     }
+  }
 
-    serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM);
-
-    setConf(conf);
+  /**
+   * Return the access key and secret for S3 API use.
+   * Credentials may exist in configuration, within credential providers
+   * or indicated in the UserInfo of the name URI param.
+   * @param name the URI for which we need the access keys.
+   * @param conf the Configuration object to interogate for keys.
+   * @return AWSAccessKeys
+   */
+  AWSAccessKeys getAWSAccessKeys(URI name, Configuration conf)
+      throws IOException {
+    String accessKey = null;
+    String secretKey = null;
+    String userInfo = name.getUserInfo();
+    if (userInfo != null) {
+      int index = userInfo.indexOf(':');
+      if (index != -1) {
+        accessKey = userInfo.substring(0, index);
+        secretKey = userInfo.substring(index + 1);
+      } else {
+        accessKey = userInfo;
+      }
+    }
+    if (accessKey == null) {
+      try {
+        final char[] key = conf.getPassword(ACCESS_KEY);
+        if (key != null) {
+          accessKey = (new String(key)).trim();
+        }
+      } catch(IOException ioe) {
+        throw new IOException("Cannot find AWS access key.", ioe);
+      }
+    }
+    if (secretKey == null) {
+      try {
+        final char[] pass = conf.getPassword(SECRET_KEY);
+        if (pass != null) {
+          secretKey = (new String(pass)).trim();
+        }
+      } catch(IOException ioe) {
+        throw new IOException("Cannot find AWS secret key.", ioe);
+      }
+    }
+    return new AWSAccessKeys(accessKey, secretKey);
   }
 
   /**
@@ -341,14 +398,14 @@ public class S3AFileSystem extends FileSystem {
    * Create an FSDataOutputStream at the indicated Path with write-progress
    * reporting.
    * @param f the file name to open
-   * @param permission
+   * @param permission the permission to set.
    * @param overwrite if a file with this name already exists, then if true,
    *   the file will be overwritten, and if false an error will be thrown.
    * @param bufferSize the size of the buffer to be used.
    * @param replication required block replication for the file.
-   * @param blockSize
-   * @param progress
-   * @throws IOException
+   * @param blockSize the requested block size.
+   * @param progress the progress reporter.
+   * @throws IOException in the event of IO related errors.
    * @see #setPermission(Path, FsPermission)
    */
   @Override
@@ -377,7 +434,7 @@ public class S3AFileSystem extends FileSystem {
    * @param f the existing file to be appended.
    * @param bufferSize the size of the buffer to be used.
    * @param progress for reporting progress if it is not null.
-   * @throws IOException
+   * @throws IOException indicating that append is not supported.
    */
   public FSDataOutputStream append(Path f, int bufferSize, 
     Progressable progress) throws IOException {
@@ -585,7 +642,7 @@ public class S3AFileSystem extends FileSystem {
    * true, the directory is deleted else throws an exception. In
    * case of a file the recursive can be set to either true or false.
    * @return  true if delete is successful else false.
-   * @throws IOException
+   * @throws IOException due to inability to delete a directory or file.
    */
   public boolean delete(Path f, boolean recursive) throws IOException {
     if (LOG.isDebugEnabled()) {
@@ -791,7 +848,7 @@ public class S3AFileSystem extends FileSystem {
    * Set the current working directory for the given file system. All relative
    * paths will be resolved relative to it.
    *
-   * @param new_dir
+   * @param new_dir the current working directory.
    */
   public void setWorkingDirectory(Path new_dir) {
     workingDir = new_dir;
@@ -1202,4 +1259,39 @@ public class S3AFileSystem extends FileSystem {
         "such as not being able to access the network.");
     LOG.info("Error Message: {}" + ace, ace);
   }
+
+  /**
+   * This is a simple encapsulation of the
+   * S3 access key and secret.
+   */
+  static class AWSAccessKeys {
+    private String accessKey = null;
+    private String accessSecret = null;
+
+    /**
+     * Constructor.
+     * @param key - AWS access key
+     * @param secret - AWS secret key
+     */
+    public AWSAccessKeys(String key, String secret) {
+      accessKey = key;
+      accessSecret = secret;
+    }
+
+    /**
+     * Return the AWS access key.
+     * @return key
+     */
+    public String getAccessKey() {
+      return accessKey;
+    }
+
+    /**
+     * Return the AWS secret key.
+     * @return secret
+     */
+    public String getAccessSecret() {
+      return accessSecret;
+    }
+  }
 }

+ 146 - 5
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AConfiguration.java

@@ -32,7 +32,21 @@ import org.slf4j.LoggerFactory;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+import java.io.File;
+import java.net.URI;
+import java.io.IOException;
+
+import org.apache.hadoop.security.ProviderUtils;
+import org.apache.hadoop.security.alias.CredentialProvider;
+import org.apache.hadoop.security.alias.CredentialProviderFactory;
+
+import org.junit.rules.TemporaryFolder;
+
 public class TestS3AConfiguration {
+  private static final String EXAMPLE_ID = "AKASOMEACCESSKEY";
+  private static final String EXAMPLE_KEY =
+      "RGV0cm9pdCBSZ/WQgY2xl/YW5lZCB1cAEXAMPLE";
+
   private Configuration conf;
   private S3AFileSystem fs;
 
@@ -44,6 +58,9 @@ public class TestS3AConfiguration {
   @Rule
   public Timeout testTimeout = new Timeout(30 * 60 * 1000);
 
+  @Rule
+  public final TemporaryFolder tempDir = new TemporaryFolder();
+
   /**
    * Test if custom endpoint is picked up.
    * <p/>
@@ -59,7 +76,7 @@ public class TestS3AConfiguration {
    * @throws Exception
    */
   @Test
-  public void TestEndpoint() throws Exception {
+  public void testEndpoint() throws Exception {
     conf = new Configuration();
     String endpoint = conf.getTrimmed(TEST_ENDPOINT, "");
     if (endpoint.isEmpty()) {
@@ -85,7 +102,7 @@ public class TestS3AConfiguration {
   }
 
   @Test
-  public void TestProxyConnection() throws Exception {
+  public void testProxyConnection() throws Exception {
     conf = new Configuration();
     conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
     conf.set(Constants.PROXY_HOST, "127.0.0.1");
@@ -103,7 +120,7 @@ public class TestS3AConfiguration {
   }
 
   @Test
-  public void TestProxyPortWithoutHost() throws Exception {
+  public void testProxyPortWithoutHost() throws Exception {
     conf = new Configuration();
     conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
     conf.setInt(Constants.PROXY_PORT, 1);
@@ -120,7 +137,7 @@ public class TestS3AConfiguration {
   }
 
   @Test
-  public void TestAutomaticProxyPortSelection() throws Exception {
+  public void testAutomaticProxyPortSelection() throws Exception {
     conf = new Configuration();
     conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
     conf.set(Constants.PROXY_HOST, "127.0.0.1");
@@ -145,7 +162,7 @@ public class TestS3AConfiguration {
   }
 
   @Test
-  public void TestUsernameInconsistentWithPassword() throws Exception {
+  public void testUsernameInconsistentWithPassword() throws Exception {
     conf = new Configuration();
     conf.setInt(Constants.MAX_ERROR_RETRIES, 2);
     conf.set(Constants.PROXY_HOST, "127.0.0.1");
@@ -177,4 +194,128 @@ public class TestS3AConfiguration {
       }
     }
   }
+
+  @Test
+  public void testCredsFromCredentialProvider() throws Exception {
+    // set up conf to have a cred provider
+    final Configuration conf = new Configuration();
+    final File file = tempDir.newFile("test.jks");
+    final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider(
+        file.toURI());
+    conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
+        jks.toString());
+
+    provisionAccessKeys(conf);
+
+    S3AFileSystem s3afs = new S3AFileSystem();
+    conf.set(Constants.ACCESS_KEY, EXAMPLE_ID + "LJM");
+    S3AFileSystem.AWSAccessKeys creds =
+        s3afs.getAWSAccessKeys(new URI("s3a://foobar"), conf);
+    assertEquals("AccessKey incorrect.", EXAMPLE_ID, creds.getAccessKey());
+    assertEquals("SecretKey incorrect.", EXAMPLE_KEY, creds.getAccessSecret());
+  }
+
+  void provisionAccessKeys(final Configuration conf) throws Exception {
+    // add our creds to the provider
+    final CredentialProvider provider =
+        CredentialProviderFactory.getProviders(conf).get(0);
+    provider.createCredentialEntry(Constants.ACCESS_KEY,
+        EXAMPLE_ID.toCharArray());
+    provider.createCredentialEntry(Constants.SECRET_KEY,
+        EXAMPLE_KEY.toCharArray());
+    provider.flush();
+  }
+
+  @Test
+  public void testCredsFromUserInfo() throws Exception {
+    // set up conf to have a cred provider
+    final Configuration conf = new Configuration();
+    final File file = tempDir.newFile("test.jks");
+    final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider(
+        file.toURI());
+    conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
+        jks.toString());
+
+    provisionAccessKeys(conf);
+
+    S3AFileSystem s3afs = new S3AFileSystem();
+    conf.set(Constants.ACCESS_KEY, EXAMPLE_ID + "LJM");
+    URI uriWithUserInfo = new URI("s3a://123:456@foobar");
+    S3AFileSystem.AWSAccessKeys creds =
+        s3afs.getAWSAccessKeys(uriWithUserInfo, conf);
+    assertEquals("AccessKey incorrect.", "123", creds.getAccessKey());
+    assertEquals("SecretKey incorrect.", "456", creds.getAccessSecret());
+  }
+
+  @Test
+  public void testIDFromUserInfoSecretFromCredentialProvider()
+      throws Exception {
+    // set up conf to have a cred provider
+    final Configuration conf = new Configuration();
+    final File file = tempDir.newFile("test.jks");
+    final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider(
+        file.toURI());
+    conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
+        jks.toString());
+
+    provisionAccessKeys(conf);
+
+    S3AFileSystem s3afs = new S3AFileSystem();
+    conf.set(Constants.ACCESS_KEY, EXAMPLE_ID + "LJM");
+    URI uriWithUserInfo = new URI("s3a://123@foobar");
+    S3AFileSystem.AWSAccessKeys creds =
+        s3afs.getAWSAccessKeys(uriWithUserInfo, conf);
+    assertEquals("AccessKey incorrect.", "123", creds.getAccessKey());
+    assertEquals("SecretKey incorrect.", EXAMPLE_KEY, creds.getAccessSecret());
+  }
+
+  @Test
+  public void testSecretFromCredentialProviderIDFromConfig() throws Exception {
+    // set up conf to have a cred provider
+    final Configuration conf = new Configuration();
+    final File file = tempDir.newFile("test.jks");
+    final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider(
+        file.toURI());
+    conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
+        jks.toString());
+
+    // add our creds to the provider
+    final CredentialProvider provider =
+        CredentialProviderFactory.getProviders(conf).get(0);
+    provider.createCredentialEntry(Constants.SECRET_KEY,
+        EXAMPLE_KEY.toCharArray());
+    provider.flush();
+
+    S3AFileSystem s3afs = new S3AFileSystem();
+    conf.set(Constants.ACCESS_KEY, EXAMPLE_ID);
+    S3AFileSystem.AWSAccessKeys creds =
+        s3afs.getAWSAccessKeys(new URI("s3a://foobar"), conf);
+    assertEquals("AccessKey incorrect.", EXAMPLE_ID, creds.getAccessKey());
+    assertEquals("SecretKey incorrect.", EXAMPLE_KEY, creds.getAccessSecret());
+  }
+
+  @Test
+  public void testIDFromCredentialProviderSecretFromConfig() throws Exception {
+    // set up conf to have a cred provider
+    final Configuration conf = new Configuration();
+    final File file = tempDir.newFile("test.jks");
+    final URI jks = ProviderUtils.nestURIForLocalJavaKeyStoreProvider(
+        file.toURI());
+    conf.set(CredentialProviderFactory.CREDENTIAL_PROVIDER_PATH,
+        jks.toString());
+
+    // add our creds to the provider
+    final CredentialProvider provider =
+        CredentialProviderFactory.getProviders(conf).get(0);
+    provider.createCredentialEntry(Constants.ACCESS_KEY,
+        EXAMPLE_ID.toCharArray());
+    provider.flush();
+
+    S3AFileSystem s3afs = new S3AFileSystem();
+    conf.set(Constants.SECRET_KEY, EXAMPLE_KEY);
+    S3AFileSystem.AWSAccessKeys creds =
+        s3afs.getAWSAccessKeys(new URI("s3a://foobar"), conf);
+    assertEquals("AccessKey incorrect.", EXAMPLE_ID, creds.getAccessKey());
+    assertEquals("SecretKey incorrect.", EXAMPLE_KEY, creds.getAccessSecret());
+  }
 }