浏览代码

HADOOP-15671. AliyunOSS: Support Assume Roles in AliyunOSS. Contributed by Jinhu Wu.

(cherry picked from commit 2b635125fb059fc204ed35bc0e264c42dd3a9fe9)
(cherry picked from commit 5da3e8359757c0c1afaccc1d3a0f2bdc453e0311)
(cherry picked from commit 85e00477b8b3ee9c007aa111429588b6616128e2)
(cherry picked from commit c617dba49770d03f9a9a519f6353b2a2afc3a930)
Sammi Chen 6 年之前
父节点
当前提交
76f1834976

+ 3 - 2
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java

@@ -124,7 +124,8 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
         if (null == partETags) {
           throw new IOException("Failed to multipart upload to oss, abort it.");
         }
-        store.completeMultipartUpload(key, uploadId, partETags);
+        store.completeMultipartUpload(key, uploadId,
+            new ArrayList<>(partETags));
       }
     } finally {
       removePartFiles();
@@ -133,7 +134,7 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
   }
 
   @Override
-  public void write(int b) throws IOException {
+  public synchronized void write(int b) throws IOException {
     singleByte[0] = (byte)b;
     write(singleByte, 0, 1);
   }

+ 3 - 2
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java

@@ -149,7 +149,7 @@ public class AliyunOSSFileSystemStore {
           "null or empty. Please set proper endpoint with 'fs.oss.endpoint'.");
     }
     CredentialsProvider provider =
-        AliyunOSSUtils.getCredentialsProvider(conf);
+        AliyunOSSUtils.getCredentialsProvider(uri, conf);
     ossClient = new OSSClient(endPoint, provider, clientConf);
     uploadPartSize = AliyunOSSUtils.getMultipartSizeProperty(conf,
         MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
@@ -168,6 +168,8 @@ public class AliyunOSSFileSystemStore {
       multipartThreshold = 1024 * 1024 * 1024;
     }
 
+    bucketName = uri.getHost();
+
     String cannedACLName = conf.get(CANNED_ACL_KEY, CANNED_ACL_DEFAULT);
     if (StringUtils.isNotEmpty(cannedACLName)) {
       CannedAccessControlList cannedACL =
@@ -176,7 +178,6 @@ public class AliyunOSSFileSystemStore {
     }
 
     maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
-    bucketName = uri.getHost();
   }
 
   /**

+ 5 - 3
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSUtils.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.aliyun.oss;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.URI;
 
 import com.aliyun.oss.common.auth.CredentialsProvider;
 import com.google.common.base.Preconditions;
@@ -95,13 +96,14 @@ final public class AliyunOSSUtils {
    * Create credential provider specified by configuration, or create default
    * credential provider if not specified.
    *
+   * @param uri uri passed by caller
    * @param conf configuration
    * @return a credential provider
    * @throws IOException on any problem. Class construction issues may be
    * nested inside the IOE.
    */
-  public static CredentialsProvider getCredentialsProvider(Configuration conf)
-      throws IOException {
+  public static CredentialsProvider getCredentialsProvider(
+      URI uri, Configuration conf) throws IOException {
     CredentialsProvider credentials;
 
     String className = conf.getTrimmed(CREDENTIALS_PROVIDER_KEY);
@@ -117,7 +119,7 @@ final public class AliyunOSSUtils {
         try {
           credentials =
               (CredentialsProvider)credClass.getDeclaredConstructor(
-                  Configuration.class).newInstance(conf);
+                  URI.class, Configuration.class).newInstance(uri, conf);
         } catch (NoSuchMethodException | SecurityException e) {
           credentials =
               (CredentialsProvider)credClass.getDeclaredConstructor()

+ 115 - 0
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AssumedRoleCredentialProvider.java

@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import com.aliyun.oss.common.auth.Credentials;
+import com.aliyun.oss.common.auth.CredentialsProvider;
+import com.aliyun.oss.common.auth.InvalidCredentialsException;
+import com.aliyun.oss.common.auth.STSAssumeRoleSessionCredentialsProvider;
+import com.aliyuncs.exceptions.ClientException;
+import com.aliyuncs.profile.DefaultProfile;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URI;
+
+import static org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_ID;
+import static org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_SECRET;
+
+/**
+ * Support assumed role credentials for authenticating with Aliyun.
+ * roleArn is configured in core-site.xml
+ */
+public class AssumedRoleCredentialProvider implements CredentialsProvider {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(AssumedRoleCredentialProvider.class);
+  public static final String NAME
+      = "org.apache.hadoop.fs.aliyun.oss.AssumedRoleCredentialProvider";
+  private Credentials credentials;
+  private String roleArn;
+  private long duration;
+  private String stsEndpoint;
+  private String sessionName;
+  private double expiredFactor;
+  private STSAssumeRoleSessionCredentialsProvider stsCredentialsProvider;
+
+  public AssumedRoleCredentialProvider(URI uri, Configuration conf) {
+    roleArn = conf.getTrimmed(Constants.ROLE_ARN, "");
+    if (StringUtils.isEmpty(roleArn)) {
+      throw new InvalidCredentialsException(
+          "fs.oss.assumed.role.arn is empty");
+    }
+
+    duration = conf.getLong(Constants.ASSUMED_ROLE_DURATION,
+        Constants.ASSUMED_ROLE_DURATION_DEFAULT);
+
+    expiredFactor = conf.getDouble(Constants.ASSUMED_ROLE_STS_EXPIRED_FACTOR,
+        Constants.ASSUMED_ROLE_STS_EXPIRED_FACTOR_DEFAULT);
+
+    stsEndpoint = conf.getTrimmed(Constants.ASSUMED_ROLE_STS_ENDPOINT, "");
+    if (StringUtils.isEmpty(stsEndpoint)) {
+      throw new InvalidCredentialsException(
+          "fs.oss.assumed.role.sts.endpoint is empty");
+    }
+
+    sessionName = conf.getTrimmed(Constants.ASSUMED_ROLE_SESSION_NAME, "");
+
+    String accessKeyId;
+    String accessKeySecret;
+    try {
+      accessKeyId = AliyunOSSUtils.getValueWithKey(conf, ACCESS_KEY_ID);
+      accessKeySecret = AliyunOSSUtils.getValueWithKey(conf, ACCESS_KEY_SECRET);
+    } catch (IOException e) {
+      throw new InvalidCredentialsException(e);
+    }
+
+    try {
+      DefaultProfile.addEndpoint("", "", "Sts", stsEndpoint);
+    } catch (ClientException e) {
+      throw new InvalidCredentialsException(e);
+    }
+
+    stsCredentialsProvider = new STSAssumeRoleSessionCredentialsProvider(
+        new com.aliyuncs.auth.BasicCredentials(accessKeyId, accessKeySecret),
+        roleArn, DefaultProfile.getProfile("", accessKeyId, accessKeySecret))
+            .withExpiredDuration(duration).withExpiredFactor(expiredFactor);
+
+    if (!StringUtils.isEmpty(sessionName)) {
+      stsCredentialsProvider.withRoleSessionName(sessionName);
+    }
+  }
+
+  @Override
+  public void setCredentials(Credentials creds) {
+    throw new InvalidCredentialsException(
+        "Should not set credentials from external call");
+  }
+
+  @Override
+  public Credentials getCredentials() {
+    credentials = stsCredentialsProvider.getCredentials();
+    if (credentials == null) {
+      throw new InvalidCredentialsException("Invalid credentials");
+    }
+    return credentials;
+  }
+}

+ 22 - 0
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.aliyun.oss;
 
+import com.aliyun.oss.common.utils.AuthUtils;
 import com.aliyun.oss.common.utils.VersionInfoUtils;
 
 /**
@@ -42,6 +43,27 @@ public final class Constants {
   public static final String ACCESS_KEY_SECRET = "fs.oss.accessKeySecret";
   public static final String SECURITY_TOKEN = "fs.oss.securityToken";
 
+  // Assume role configurations
+  public static final String ROLE_ARN = "fs.oss.assumed.role.arn";
+  public static final String ASSUMED_ROLE_DURATION =
+      "fs.oss.assumed.role.session.duration";
+  // Default session duration(in seconds)
+  public static final long ASSUMED_ROLE_DURATION_DEFAULT = 30 * 60;
+
+  // Expired factor of sts token
+  // For example, if session duration is 900s and expiredFactor is 0.8
+  // sts token will be refreshed after 900 * 0.8s
+  public static final String ASSUMED_ROLE_STS_EXPIRED_FACTOR =
+      "fs.oss.assumed.role.sts.expiredFactor";
+
+  public static final double ASSUMED_ROLE_STS_EXPIRED_FACTOR_DEFAULT =
+      AuthUtils.DEFAULT_EXPIRED_FACTOR;
+
+  public static final String ASSUMED_ROLE_STS_ENDPOINT =
+      "fs.oss.assumed.role.sts.endpoint";
+  public static final String ASSUMED_ROLE_SESSION_NAME =
+      "fs.oss.assumed.role.session.name";
+
   // Number of simultaneous connections to oss
   public static final String MAXIMUM_CONNECTIONS_KEY =
       "fs.oss.connection.maximum";

+ 50 - 0
hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md

@@ -117,6 +117,56 @@ please raise your issues with them.
        </description>
     </property>
 
+    <property>
+      <name>fs.oss.assumed.role.arn</name>
+      <description>
+        Role ARN for the role to be assumed.
+        Required if the fs.oss.credentials.provider is
+        org.apache.hadoop.fs.aliyun.oss.AssumedRoleCredentialProvider.
+      </description>
+    </property>
+
+    <property>
+      <name>fs.oss.assumed.role.sts.endpoint</name>
+      <description>
+        STS Token Service endpoint.
+        Required if the fs.oss.credentials.provider is
+        org.apache.hadoop.fs.aliyun.oss.AssumedRoleCredentialProvider.
+      </description>
+    </property>
+
+    <property>
+      <name>fs.oss.assumed.role.session.name</name>
+      <value />
+      <description>
+        Session name for the assumed role, must be valid characters
+        according to Aliyun API. It is optional, will be generated by
+        oss java sdk if it is empty.
+        Only used if the fs.oss.credentials.provider is
+        org.apache.hadoop.fs.aliyun.oss.AssumedRoleCredentialProvider.
+      </description>
+    </property>
+
+    <property>
+      <name>fs.oss.assumed.role.session.duration</name>
+      <value />
+      <description>
+        Duration of assumed roles before it is expired. Default is 30 minutes.
+        Only used if the fs.oss.credentials.provider is
+        org.apache.hadoop.fs.aliyun.oss.AssumedRoleCredentialProvider.
+      </description>
+    </property>
+
+    <property>
+      <name>fs.oss.assumed.role.sts.expiredFactor</name>
+      <value />
+      <description>
+        Sts token will be refreshed after (expiredFactor * duration) seconds.
+        Only used if the fs.oss.credentials.provider is
+        org.apache.hadoop.fs.aliyun.oss.AssumedRoleCredentialProvider.
+      </description>
+    </property>
+
     <property>
       <name>fs.oss.proxy.host</name>
       <description>Hostname of the (optinal) proxy server for Aliyun OSS connection</description>

+ 50 - 5
hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunCredentials.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.aliyun.oss;
 
 import com.aliyun.oss.common.auth.Credentials;
+import com.aliyun.oss.common.auth.CredentialsProvider;
 import com.aliyun.oss.common.auth.InvalidCredentialsException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.aliyun.oss.contract.AliyunOSSContract;
@@ -27,9 +28,15 @@ import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URI;
 
 import static org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_ID;
 import static org.apache.hadoop.fs.aliyun.oss.Constants.ACCESS_KEY_SECRET;
+import static org.apache.hadoop.fs.aliyun.oss.Constants.ASSUMED_ROLE_SESSION_NAME;
+import static org.apache.hadoop.fs.aliyun.oss.Constants.ASSUMED_ROLE_STS_ENDPOINT;
+import static org.apache.hadoop.fs.aliyun.oss.Constants.CREDENTIALS_PROVIDER_KEY;
+import static org.apache.hadoop.fs.aliyun.oss.Constants.ROLE_ARN;
 import static org.apache.hadoop.fs.aliyun.oss.Constants.SECURITY_TOKEN;
 
 /**
@@ -63,16 +70,54 @@ public class TestAliyunCredentials extends AbstractFSContractTestBase {
     validateCredential(conf);
   }
 
-  private void validateCredential(Configuration conf) {
+  @Test
+  public void testCredentialMissingRoleArn() throws Throwable {
+    Configuration conf = new Configuration();
+    conf.set(CREDENTIALS_PROVIDER_KEY, AssumedRoleCredentialProvider.NAME);
+    conf.set(ROLE_ARN, "");
+    validateCredential(conf);
+  }
+
+  @Test
+  public void testCredentialMissingStsEndpoint() throws Throwable {
+    Configuration conf = new Configuration();
+    conf.set(CREDENTIALS_PROVIDER_KEY, AssumedRoleCredentialProvider.NAME);
+    conf.set(ASSUMED_ROLE_STS_ENDPOINT, "");
+    validateCredential(conf);
+  }
+
+  @Test
+  public void testCredentialInvalidSessionName() throws Throwable {
+    Configuration conf = new Configuration();
+    conf.set(CREDENTIALS_PROVIDER_KEY, AssumedRoleCredentialProvider.NAME);
+    conf.set(ASSUMED_ROLE_SESSION_NAME, "hadoop oss");
+    validateCredential(conf);
+  }
+
+  private void validateCredential(URI uri, Configuration conf) {
     try {
-      AliyunCredentialsProvider provider
-          = new AliyunCredentialsProvider(conf);
+      CredentialsProvider provider =
+          AliyunOSSUtils.getCredentialsProvider(uri, conf);
       Credentials credentials = provider.getCredentials();
       fail("Expected a CredentialInitializationException, got " + credentials);
     } catch (InvalidCredentialsException expected) {
       // expected
     } catch (IOException e) {
-      fail("Unexpected exception.");
+      Throwable cause = e.getCause();
+      if (cause instanceof InvocationTargetException) {
+        boolean isInstance =
+            ((InvocationTargetException)cause).getTargetException()
+                instanceof InvalidCredentialsException;
+        if (!isInstance) {
+          fail("Unexpected exception.");
+        }
+      } else {
+        fail("Unexpected exception.");
+      }
     }
   }
-}
+
+  private void validateCredential(Configuration conf) {
+    validateCredential(null, conf);
+  }
+}