Parcourir la source

Integration of TOS: Volcano Engine Auth.

lijinglun il y a 9 mois
Parent
commit
194af99994

+ 7 - 0
hadoop-cloud-storage-project/hadoop-tos/pom.xml

@@ -34,6 +34,7 @@
 
   <properties>
     <file.encoding>UTF-8</file.encoding>
+    <ve-tos-java-sdk.version>2.8.2</ve-tos-java-sdk.version>
   </properties>
 
   <dependencies>
@@ -49,6 +50,12 @@
       <type>test-jar</type>
     </dependency>
 
+    <dependency>
+      <groupId>com.volcengine</groupId>
+      <artifactId>ve-tos-java-sdk</artifactId>
+      <version>${ve-tos-java-sdk.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>org.assertj</groupId>
       <artifactId>assertj-core</artifactId>

+ 45 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/TosKeys.java

@@ -29,4 +29,49 @@ public class TosKeys {
    * The accessKey key to access the tos object storage.
    */
   public static final String FS_TOS_ACCESS_KEY_ID = "fs.tos.access-key-id";
+
+  /**
+   * The secret access key to access the object storage.
+   */
+  public static final String FS_TOS_SECRET_ACCESS_KEY = "fs.tos.secret-access-key";
+
+  /**
+   * The session token to access the object storage.
+   */
+  public static final String FS_TOS_SESSION_TOKEN = "fs.tos.session-token";
+
+  /**
+   * The access key to access the object storage for the configured bucket, where %s is the bucket
+   * name.
+   */
+  public static final String FS_TOS_BUCKET_ACCESS_KEY_ID_TEMPLATE = "fs.tos.bucket.%s.access-key-id";
+
+  /**
+   * The secret access key to access the object storage for the configured bucket, where %s is the
+   * bucket name.
+   */
+  public static final String FS_TOS_BUCKET_SECRET_ACCESS_KEY_TEMPLATE = "fs.tos.bucket.%s.secret-access-key";
+
+  /**
+   * The session token to access the object storage for the configured bucket, where %s is the
+   * bucket name.
+   */
+  public static final String FS_TOS_BUCKET_SESSION_TOKEN_TEMPLATE = "fs.tos.bucket.%s.session-token";
+
+  /**
+   * User customized credential provider classes, separate provider class name with comma if there
+   * are multiple providers.
+   */
+  public static final String FS_TOS_CUSTOM_CREDENTIAL_PROVIDER_CLASSES =
+      "fs.tos.credential.provider.custom.classes";
+
+  public static final String FS_TOS_CUSTOM_CREDENTIAL_PROVIDER_CLASSES_DEFAULT =
+      "io.proton.common.object.tos.auth.EnvironmentCredentialsProvider,io.proton.common.object.tos.auth.SimpleCredentialsProvider";
+
+  /**
+   * Construct key from template and corresponding arguments.
+   */
+  public static final String get(String template, Object... arguments) {
+    return String.format(template, arguments);
+  }
 }

+ 25 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TOS.java

@@ -0,0 +1,25 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.tos;
+
+public class TOS {
+
+  public static final String ENV_TOS_ACCESS_KEY_ID = "TOS_ACCESS_KEY_ID";
+  public static final String ENV_TOS_SECRET_ACCESS_KEY = "TOS_SECRET_ACCESS_KEY";
+  public static final String ENV_TOS_SESSION_TOKEN = "TOS_SESSION_TOKEN";
+  public static final String ENV_TOS_ENDPOINT = "TOS_ENDPOINT";
+}

+ 76 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/auth/AbstractCredentialsProvider.java

@@ -0,0 +1,76 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.tos.auth;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.fs.tosfs.conf.TosKeys.FS_TOS_ACCESS_KEY_ID;
+import static org.apache.hadoop.fs.tosfs.conf.TosKeys.FS_TOS_SECRET_ACCESS_KEY;
+
+public abstract class AbstractCredentialsProvider implements CredentialsProvider {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AbstractCredentialsProvider.class);
+
+  protected volatile ExpireableCredential credential;
+  private Configuration conf;
+  private String bucket;
+
+  @Override
+  public void initialize(Configuration conf, String bucket) {
+    this.conf = conf;
+    this.bucket = bucket;
+  }
+
+  /**
+   * throw exception if no valid credential found, the response credential is not null.
+   *
+   * @return credential
+   */
+  @Override
+  public ExpireableCredential credential() {
+    if (credential == null || credential.isExpired()) {
+      synchronized (this) {
+        if (credential == null || credential.isExpired()) {
+          LOG.debug("Credential expired, create a new credential");
+          ExpireableCredential cred = createCredential();
+          Preconditions.checkNotNull(cred.getAccessKeyId(), "%s cannot be null",
+              FS_TOS_ACCESS_KEY_ID);
+          Preconditions.checkNotNull(cred.getAccessKeySecret(), "%s cannot be null",
+              FS_TOS_SECRET_ACCESS_KEY);
+          credential = cred;
+        }
+      }
+    }
+    return credential;
+  }
+
+  public Configuration conf() {
+    return conf;
+  }
+
+  public String bucket() {
+    return bucket;
+  }
+
+  /**
+   * throw exception if not credential found.
+   */
+  protected abstract ExpireableCredential createCredential();
+}

+ 35 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/auth/CredentialsProvider.java

@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.tos.auth;
+
+import com.volcengine.tos.auth.Credentials;
+import org.apache.hadoop.conf.Configuration;
+
+import javax.annotation.Nullable;
+
+public interface CredentialsProvider extends Credentials {
+
+  /**
+   * Initialize the credential provider.
+   *
+   * @param conf   the {@link Configuration} used for building credential provider
+   * @param bucket the binding bucket, it can be null.
+   */
+  void initialize(Configuration conf, @Nullable String bucket);
+}

+ 100 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/auth/DefaultCredentialsProviderChain.java

@@ -0,0 +1,100 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.tos.auth;
+
+import com.volcengine.tos.TosException;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.apache.hadoop.fs.tosfs.conf.TosKeys.FS_TOS_CUSTOM_CREDENTIAL_PROVIDER_CLASSES;
+import static org.apache.hadoop.fs.tosfs.conf.TosKeys.FS_TOS_CUSTOM_CREDENTIAL_PROVIDER_CLASSES_DEFAULT;
+
+public class DefaultCredentialsProviderChain extends AbstractCredentialsProvider {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DefaultCredentialsProviderChain.class);
+
+  private final List<AbstractCredentialsProvider> providers = new LinkedList<>();
+  private volatile AbstractCredentialsProvider lastUsedProvider;
+
+  @Override
+  public void initialize(Configuration conf, String bucket) {
+    super.initialize(conf, bucket);
+    loadAllCredentialProviders();
+  }
+
+  private void loadAllCredentialProviders() {
+    for (String providerClazz : getCustomProviderClasses()) {
+      try {
+        Class<?> clazz = Class.forName(providerClazz);
+        AbstractCredentialsProvider credentialsProvider =
+            (AbstractCredentialsProvider) clazz.getDeclaredConstructor().newInstance();
+        credentialsProvider.initialize(conf(), bucket());
+        providers.add(credentialsProvider);
+      } catch (Exception e) {
+        LOG.error("Failed to initialize credential provider for {}", providerClazz, e);
+        // throw exception directly since the configurations are invalid.
+        throw new TosException(e);
+      }
+    }
+  }
+
+  private String[] getCustomProviderClasses() {
+    String[] classes = conf().getStringCollection(FS_TOS_CUSTOM_CREDENTIAL_PROVIDER_CLASSES)
+        .toArray(new String[0]);
+    if (classes.length == 0) {
+      classes = FS_TOS_CUSTOM_CREDENTIAL_PROVIDER_CLASSES_DEFAULT.split(",");
+    }
+    return classes;
+  }
+
+  @Override
+  protected ExpireableCredential createCredential() {
+    if (lastUsedProvider != null) {
+      return lastUsedProvider.credential();
+    } else {
+      List<Exception> exceptions = new LinkedList<>();
+      for (AbstractCredentialsProvider provider : providers) {
+        try {
+          ExpireableCredential credential = provider.credential();
+          LOG.debug("Access credential from {} successfully, choose it as the candidate provider",
+              provider.getClass().getName());
+          lastUsedProvider = provider;
+          return credential;
+        } catch (Exception e) {
+          LOG.debug("Failed to access credential from provider {}", provider.getClass().getName(), e);
+          exceptions.add(e);
+        }
+      }
+      String errorMsg = "Unable to load TOS credentials from any provider in the chain.";
+      RuntimeException runtimeException = new RuntimeException(errorMsg);
+      exceptions.forEach(runtimeException::addSuppressed);
+      throw runtimeException;
+    }
+  }
+
+  @VisibleForTesting
+  AbstractCredentialsProvider lastUsedProvider() {
+    Preconditions.checkNotNull(lastUsedProvider, "provider cannot be null");
+    return lastUsedProvider;
+  }
+}

+ 34 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/auth/EnvironmentCredentialsProvider.java

@@ -0,0 +1,34 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.tos.auth;
+
+import org.apache.hadoop.fs.tosfs.object.tos.TOS;
+
+public class EnvironmentCredentialsProvider extends AbstractCredentialsProvider {
+
+  public static final String NAME = EnvironmentCredentialsProvider.class.getName();
+
+  @Override
+  protected ExpireableCredential createCredential() {
+    return new ExpireableCredential(
+        System.getenv(TOS.ENV_TOS_ACCESS_KEY_ID),
+        System.getenv(TOS.ENV_TOS_SECRET_ACCESS_KEY),
+        System.getenv(TOS.ENV_TOS_SESSION_TOKEN),
+        Long.MAX_VALUE
+    );
+  }
+}

+ 67 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/auth/ExpireableCredential.java

@@ -0,0 +1,67 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.tos.auth;
+
+import com.volcengine.tos.auth.Credential;
+import org.apache.hadoop.fs.tosfs.conf.TosKeys;
+import org.apache.hadoop.util.Preconditions;
+
+public class ExpireableCredential extends Credential {
+  public static final String EXPIRED_TIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ssXXX";
+  public static final int EXPIRED_INTERVAL_MILLIS = 1000 * 60; // 1 minute
+
+  protected final long expireTimeMills;
+
+  /**
+   * The credential is never expired, default sts value is null and expired is Long.MAX_VALUE.
+   *
+   * @param accessKeyId     IAM AK.
+   * @param accessKeySecret IAM SK.
+   */
+  public ExpireableCredential(
+      String accessKeyId,
+      String accessKeySecret) {
+    this(accessKeyId, accessKeySecret, "", Long.MAX_VALUE);
+  }
+
+  /**
+   * Credential that can expire.
+   *
+   * @param accessKeyId     IAM AK.
+   * @param accessKeySecret IAM SK.
+   * @param sessionToken   Session token.
+   * @param expireTimeMills Session token expire time,
+   *                        the default value is the request time +6H if get it from the meta service.
+   */
+  public ExpireableCredential(
+      String accessKeyId,
+      String accessKeySecret,
+      String sessionToken,
+      long expireTimeMills) {
+    super(accessKeyId, accessKeySecret, sessionToken);
+    Preconditions.checkNotNull(accessKeyId,
+        "%s cannot be null", TosKeys.FS_TOS_ACCESS_KEY_ID);
+    Preconditions.checkNotNull(accessKeySecret,
+        "%s cannot be null", TosKeys.FS_TOS_SECRET_ACCESS_KEY);
+    Preconditions.checkArgument(expireTimeMills > 0, "expiredTime must be > 0");
+    this.expireTimeMills = expireTimeMills;
+  }
+
+  public boolean isExpired() {
+    return expireTimeMills - System.currentTimeMillis() <= EXPIRED_INTERVAL_MILLIS;
+  }
+}

+ 63 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/auth/SimpleCredentialsProvider.java

@@ -0,0 +1,63 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.tos.auth;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.tosfs.conf.TosKeys;
+
+import static org.apache.hadoop.fs.tosfs.conf.TosKeys.FS_TOS_ACCESS_KEY_ID;
+import static org.apache.hadoop.fs.tosfs.conf.TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID_TEMPLATE;
+import static org.apache.hadoop.fs.tosfs.conf.TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY_TEMPLATE;
+import static org.apache.hadoop.fs.tosfs.conf.TosKeys.FS_TOS_BUCKET_SESSION_TOKEN_TEMPLATE;
+import static org.apache.hadoop.fs.tosfs.conf.TosKeys.FS_TOS_SECRET_ACCESS_KEY;
+import static org.apache.hadoop.fs.tosfs.conf.TosKeys.FS_TOS_SESSION_TOKEN;
+
+public class SimpleCredentialsProvider extends AbstractCredentialsProvider {
+
+  public static final String NAME = SimpleCredentialsProvider.class.getName();
+
+  @Override
+  protected ExpireableCredential createCredential() {
+    String accessKey = lookup(conf(), TosKeys.get(FS_TOS_BUCKET_ACCESS_KEY_ID_TEMPLATE, bucket()),
+        FS_TOS_ACCESS_KEY_ID);
+    String secretKey =
+        lookup(conf(), TosKeys.get(FS_TOS_BUCKET_SECRET_ACCESS_KEY_TEMPLATE, bucket()),
+            FS_TOS_SECRET_ACCESS_KEY);
+    String sessionToken =
+        lookup(conf(), TosKeys.get(FS_TOS_BUCKET_SESSION_TOKEN_TEMPLATE, bucket()),
+            FS_TOS_SESSION_TOKEN);
+    if (StringUtils.isEmpty(sessionToken)) {
+      // This is a static ak sk configuration.
+      return new ExpireableCredential(accessKey, secretKey);
+    } else {
+      // This is an assume role configuration. Due to the ak, sk and token won't be refreshed in conf, set the
+      // expireTime to Long.MAX_VALUE.
+      return new ExpireableCredential(accessKey, secretKey, sessionToken, Long.MAX_VALUE);
+    }
+  }
+
+  static String lookup(Configuration conf, String key, String fallbackKey) {
+    if (StringUtils.isNotEmpty(key)) {
+      String dynValue = conf.get(key);
+      if (StringUtils.isNotEmpty(dynValue)) {
+        return dynValue;
+      }
+    }
+    return conf.get(fallbackKey);
+  }
+}