ソースを参照

Integration of TOS: Auth test cases, PrefixStorage and ConfKeys udpate.

lijinglun 10 ヶ月 前
コミット
63f162a73a
15 ファイル変更1242 行追加24 行削除
  1. 32 0
      hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ArgumentKey.java
  2. 8 1
      hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java
  3. 1 0
      hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/FileStoreKeys.java
  4. 10 12
      hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/TosKeys.java
  5. 71 0
      hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/ObjectStorageFactory.java
  6. 266 0
      hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/PrefixStorage.java
  7. 1 1
      hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/auth/DefaultCredentialsProviderChain.java
  8. 5 10
      hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/auth/SimpleCredentialsProvider.java
  9. 181 0
      hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/ParseUtils.java
  10. 60 0
      hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/TOSClientContextUtils.java
  11. 60 0
      hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/auth/TestAbstractCredentialsProvider.java
  12. 213 0
      hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/auth/TestDefaultCredentialsProviderChain.java
  13. 64 0
      hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/auth/TestEnvironmentCredentialsProvider.java
  14. 81 0
      hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/auth/TestSimpleCredentialsProvider.java
  15. 189 0
      hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/util/TestUtility.java

+ 32 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ArgumentKey.java

@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.conf;
+
+public class ArgumentKey {
+
+  private final String template;
+
+  public ArgumentKey(String template) {
+    this.template = template;
+  }
+
+  public String key(Object... arguments) {
+    return String.format(template, arguments);
+  }
+}

+ 8 - 1
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java

@@ -20,6 +20,13 @@ package org.apache.hadoop.fs.tosfs.conf;
 
 
 public class ConfKeys {
 public class ConfKeys {
 
 
+  public static final ArgumentKey FS_TOS_ENDPOINT = new ArgumentKey("fs.%s.endpoint");
 
 
-
+  /**
+   * The object storage implementation for the defined scheme. For example, we can delegate the
+   * scheme 'abc' to TOS (or other object storage),and access the TOS object storage as
+   * 'abc://bucket/path/to/key'
+   */
+  public static final ArgumentKey FS_OBJECT_STORAGE_IMPL =
+      new ArgumentKey("fs.objectstorage.%s.impl");
 }
 }

+ 1 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/FileStoreKeys.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.tosfs.conf;
 package org.apache.hadoop.fs.tosfs.conf;
 
 
 public class FileStoreKeys {
 public class FileStoreKeys {
+
   /**
   /**
    * File store object storage endpoint to connect to.
    * File store object storage endpoint to connect to.
    */
    */

+ 10 - 12
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/TosKeys.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.tosfs.conf;
 package org.apache.hadoop.fs.tosfs.conf;
 
 
 public class TosKeys {
 public class TosKeys {
+
   /**
   /**
    * Tos object storage endpoint to connect to, which should include both region and object domain
    * Tos object storage endpoint to connect to, which should include both region and object domain
    * name.
    * name.
@@ -44,19 +45,22 @@ public class TosKeys {
    * The access key to access the object storage for the configured bucket, where %s is the bucket
    * The access key to access the object storage for the configured bucket, where %s is the bucket
    * name.
    * name.
    */
    */
-  public static final String FS_TOS_BUCKET_ACCESS_KEY_ID_TEMPLATE = "fs.tos.bucket.%s.access-key-id";
+  public static final ArgumentKey FS_TOS_BUCKET_ACCESS_KEY_ID =
+      new ArgumentKey("fs.tos.bucket.%s.access-key-id");
 
 
   /**
   /**
    * The secret access key to access the object storage for the configured bucket, where %s is the
    * The secret access key to access the object storage for the configured bucket, where %s is the
    * bucket name.
    * bucket name.
    */
    */
-  public static final String FS_TOS_BUCKET_SECRET_ACCESS_KEY_TEMPLATE = "fs.tos.bucket.%s.secret-access-key";
+  public static final ArgumentKey FS_TOS_BUCKET_SECRET_ACCESS_KEY =
+      new ArgumentKey("fs.tos.bucket.%s.secret-access-key");
 
 
   /**
   /**
    * The session token to access the object storage for the configured bucket, where %s is the
    * The session token to access the object storage for the configured bucket, where %s is the
    * bucket name.
    * bucket name.
    */
    */
-  public static final String FS_TOS_BUCKET_SESSION_TOKEN_TEMPLATE = "fs.tos.bucket.%s.session-token";
+  public static final ArgumentKey FS_TOS_BUCKET_SESSION_TOKEN =
+      new ArgumentKey("fs.tos.bucket.%s.session-token");
 
 
   /**
   /**
    * User customized credential provider classes, separate provider class name with comma if there
    * User customized credential provider classes, separate provider class name with comma if there
@@ -65,13 +69,7 @@ public class TosKeys {
   public static final String FS_TOS_CUSTOM_CREDENTIAL_PROVIDER_CLASSES =
   public static final String FS_TOS_CUSTOM_CREDENTIAL_PROVIDER_CLASSES =
       "fs.tos.credential.provider.custom.classes";
       "fs.tos.credential.provider.custom.classes";
 
 
-  public static final String FS_TOS_CUSTOM_CREDENTIAL_PROVIDER_CLASSES_DEFAULT =
-      "io.proton.common.object.tos.auth.EnvironmentCredentialsProvider,io.proton.common.object.tos.auth.SimpleCredentialsProvider";
-
-  /**
-   * Construct key from template and corresponding arguments.
-   */
-  public static final String get(String template, Object... arguments) {
-    return String.format(template, arguments);
-  }
+  public static final String[] FS_TOS_CUSTOM_CREDENTIAL_PROVIDER_CLASSES_DEFAULT =
+      new String[] { "org.apache.hadoop.fs.tosfs.object.tos.auth.EnvironmentCredentialsProvider",
+          "org.apache.hadoop.fs.tosfs.object.tos.auth.SimpleCredentialsProvider" };
 }
 }

+ 71 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/ObjectStorageFactory.java

@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.tosfs.conf.ConfKeys;
+import org.apache.hadoop.fs.tosfs.object.tos.TOS;
+import org.apache.hadoop.util.Preconditions;
+
+import java.lang.reflect.InvocationTargetException;
+
+import static org.apache.hadoop.fs.tosfs.conf.ConfKeys.FS_OBJECT_STORAGE_IMPL;
+
+public class ObjectStorageFactory {
+
+  private static final Configuration DEFAULT_IMPLS = new Configuration();
+
+  static {
+    // Setup default object storage impl for scheme "tos" and "filestore".
+    DEFAULT_IMPLS.set(ConfKeys.FS_OBJECT_STORAGE_IMPL.key("tos"), TOS.class.getName());
+    DEFAULT_IMPLS.set(ConfKeys.FS_OBJECT_STORAGE_IMPL.key("filestore"), FileStore.class.getName());
+  }
+
+  private ObjectStorageFactory() {
+  }
+
+  public static ObjectStorage createWithPrefix(String prefix, String scheme, String bucket, Configuration conf) {
+    ObjectStorage storage = create(scheme, bucket, conf);
+    return new PrefixStorage(storage, prefix);
+  }
+
+  public static ObjectStorage create(String scheme, String bucket, Configuration conf) {
+    Preconditions.checkArgument(StringUtils.isNotEmpty(scheme), "Scheme is null or empty.");
+    Preconditions.checkArgument(StringUtils.isNotEmpty(bucket), "Bucket is null or empty.");
+    Preconditions.checkNotNull(conf, "Conf is null.");
+
+    try {
+      String confKey = FS_OBJECT_STORAGE_IMPL.key(scheme);
+      String impl = conf.get(confKey, DEFAULT_IMPLS.get(confKey));
+
+      Preconditions.checkArgument(StringUtils.isNotEmpty(impl),
+          "Cannot locate the ObjectStorage implementation for scheme '%s'", scheme);
+      ObjectStorage store = (ObjectStorage) Class.forName(impl).getDeclaredConstructor().newInstance();
+      store.initialize(conf, bucket);
+      return store;
+    } catch (ClassNotFoundException |
+             InvocationTargetException |
+             InstantiationException |
+             IllegalAccessException |
+             NoSuchMethodException e) {
+      throw new RuntimeException(e);
+    }
+  }
+}

+ 266 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/PrefixStorage.java

@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object;
+
+import com.google.common.collect.Iterables;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.tosfs.object.request.ListObjectsRequest;
+import org.apache.hadoop.fs.tosfs.object.response.ListObjectsResponse;
+import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
+import org.apache.hadoop.util.Preconditions;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class PrefixStorage implements DirectoryStorage {
+  private final ObjectStorage storage;
+  private final String prefix;
+
+  public PrefixStorage(ObjectStorage storage, String prefix) {
+    this.storage = storage;
+    this.prefix = prefix;
+  }
+
+  @VisibleForTesting
+  ObjectStorage storage() {
+    return storage;
+  }
+
+  @VisibleForTesting
+  String prefix() {
+    return prefix;
+  }
+
+  @Override
+  public String scheme() {
+    return storage.scheme();
+  }
+
+  @Override
+  public BucketInfo bucket() {
+    return storage.bucket();
+  }
+
+  @Override
+  public void initialize(Configuration conf, String bucket) {
+    storage.initialize(conf, bucket);
+  }
+
+  @Override
+  public Configuration conf() {
+    return storage.conf();
+  }
+
+  @Override
+  public ObjectContent get(String key, long offset, long limit) {
+    Preconditions.checkArgument(key != null && key.length() > 0,
+        "Object key cannot be null or empty.");
+    return storage.get(prefix + key, offset, limit);
+  }
+
+  @Override
+  public byte[] put(String key, InputStreamProvider streamProvider, long contentLength) {
+    Preconditions.checkArgument(key != null && key.length() > 0,
+        "Object key cannot be null or empty.");
+    return storage.put(prefix + key, streamProvider, contentLength);
+  }
+
+  @Override
+  public byte[] append(String key, InputStreamProvider streamProvider, long contentLength) {
+    Preconditions.checkArgument(key != null && key.length() > 0,
+        "Object key cannot be null or empty.");
+    return storage.append(prefix + key, streamProvider, contentLength);
+  }
+
+  @Override
+  public void delete(String key) {
+    Preconditions.checkArgument(key != null, "Object key cannot be null or empty.");
+    storage.delete(prefix + key);
+  }
+
+  @Override
+  public List<String> batchDelete(List<String> keys) {
+    return storage.batchDelete(keys.stream().map(key -> prefix + key).collect(Collectors.toList()));
+  }
+
+  @Override
+  public void deleteAll(String prefix) {
+    storage.deleteAll(this.prefix + prefix);
+  }
+
+  @Override
+  public ObjectInfo head(String key) {
+    Preconditions.checkArgument(key != null && key.length() > 0,
+        "Object key cannot be null or empty.");
+    return removePrefix(storage.head(prefix + key));
+  }
+
+  private ListObjectsResponse removePrefix(ListObjectsResponse response) {
+    List<ObjectInfo> objects = response.objects().stream()
+        .map(this::removePrefix)
+        .collect(Collectors.toList());
+    List<String> commonPrefixKeys = response.commonPrefixes().stream()
+        .map(this::removePrefix)
+        .collect(Collectors.toList());
+    return new ListObjectsResponse(objects, commonPrefixKeys);
+  }
+
+  @Override
+  public Iterable<ListObjectsResponse> list(ListObjectsRequest request) {
+    String startAfter = Strings.isNullOrEmpty(request.startAfter()) ?
+        request.startAfter() : prefix + request.startAfter();
+
+    ListObjectsRequest newReq = ListObjectsRequest.builder()
+        .prefix(prefix + request.prefix())
+        .startAfter(startAfter)
+        .maxKeys(request.maxKeys())
+        .delimiter(request.delimiter())
+        .build();
+
+    return Iterables.transform(storage.list(newReq), this::removePrefix);
+  }
+
+  @Override
+  public MultipartUpload createMultipartUpload(String key) {
+    Preconditions.checkArgument(key != null && key.length() > 0,
+        "Object key cannot be null or empty.");
+    return removePrefix(storage.createMultipartUpload(prefix + key));
+  }
+
+  @Override
+  public Part uploadPart(
+      String key, String uploadId, int partNum,
+      InputStreamProvider streamProvider, long contentLength) {
+    Preconditions.checkArgument(key != null && key.length() > 0,
+        "Object key cannot be null or empty.");
+    return storage.uploadPart(prefix + key, uploadId, partNum, streamProvider, contentLength);
+  }
+
+  @Override
+  public byte[] completeUpload(String key, String uploadId, List<Part> uploadParts) {
+    Preconditions.checkArgument(key != null && key.length() > 0,
+        "Object key cannot be null or empty.");
+    return storage.completeUpload(prefix + key, uploadId, uploadParts);
+  }
+
+  @Override
+  public void abortMultipartUpload(String key, String uploadId) {
+    Preconditions.checkArgument(key != null && key.length() > 0,
+        "Object key cannot be null or empty.");
+    storage.abortMultipartUpload(prefix + key, uploadId);
+  }
+
+  @Override
+  public Iterable<MultipartUpload> listUploads(String keyPrefix) {
+    return Iterables.transform(storage.listUploads(prefix + keyPrefix), this::removePrefix);
+  }
+
+  @Override
+  public Part uploadPartCopy(
+      String srcKey, String dstKey, String uploadId, int partNum, long copySourceRangeStart,
+      long copySourceRangeEnd) {
+    return storage.uploadPartCopy(prefix + srcKey, prefix + dstKey, uploadId, partNum,
+        copySourceRangeStart, copySourceRangeEnd);
+  }
+
+  @Override
+  public void copy(String srcKey, String dstKey) {
+    storage.copy(prefix + srcKey, prefix + dstKey);
+  }
+
+  @Override
+  public void rename(String srcKey, String dstKey) {
+    storage.rename(prefix + srcKey, prefix + dstKey);
+  }
+
+  private ObjectInfo removePrefix(ObjectInfo o) {
+    if (o == null) {
+      return null;
+    }
+    return new ObjectInfo(removePrefix(o.key()), o.size(), o.mtime(), o.checksum(), o.isDir());
+  }
+
+  private MultipartUpload removePrefix(MultipartUpload u) {
+    if (u == null) {
+      return null;
+    }
+    return new MultipartUpload(removePrefix(u.key()), u.uploadId(), u.minPartSize(),
+        u.maxPartCount());
+  }
+
+  private String removePrefix(String key) {
+    if (key == null) {
+      return null;
+    } else if (key.startsWith(prefix)) {
+      return key.substring(prefix.length());
+    } else {
+      return key;
+    }
+  }
+
+  @Override
+  public void putTags(String key, Map<String, String> newTags) {
+    storage.putTags(prefix + key, newTags);
+  }
+
+  @Override
+  public Map<String, String> getTags(String key) {
+    return storage.getTags(prefix + key);
+  }
+
+  @Override
+  public ObjectInfo objectStatus(String key) {
+    Preconditions.checkArgument(key != null && !key.isEmpty(),
+        "Object key cannot be null or empty.");
+    return removePrefix(storage.objectStatus(prefix + key));
+  }
+
+  @Override
+  public ChecksumInfo checksumInfo() {
+    return storage.checksumInfo();
+  }
+
+  @Override
+  public void close() throws IOException {
+    storage.close();
+  }
+
+  @Override
+  public Iterable<ObjectInfo> listDir(String key, boolean recursive) {
+    Preconditions.checkArgument(storage instanceof DirectoryStorage);
+    return Iterables.transform(((DirectoryStorage) storage).listDir(prefix + key, recursive),
+        this::removePrefix);
+  }
+
+  @Override
+  public void deleteDir(String key, boolean recursive) {
+    Preconditions.checkArgument(storage instanceof DirectoryStorage);
+    ((DirectoryStorage) storage).deleteDir(prefix + key, recursive);
+  }
+
+  @Override
+  public boolean isEmptyDir(String key) {
+    Preconditions.checkArgument(storage instanceof DirectoryStorage);
+    return ((DirectoryStorage) storage).isEmptyDir(prefix + key);
+  }
+}
+

+ 1 - 1
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/auth/DefaultCredentialsProviderChain.java

@@ -62,7 +62,7 @@ public class DefaultCredentialsProviderChain extends AbstractCredentialsProvider
     String[] classes = conf().getStringCollection(FS_TOS_CUSTOM_CREDENTIAL_PROVIDER_CLASSES)
     String[] classes = conf().getStringCollection(FS_TOS_CUSTOM_CREDENTIAL_PROVIDER_CLASSES)
         .toArray(new String[0]);
         .toArray(new String[0]);
     if (classes.length == 0) {
     if (classes.length == 0) {
-      classes = FS_TOS_CUSTOM_CREDENTIAL_PROVIDER_CLASSES_DEFAULT.split(",");
+      classes = FS_TOS_CUSTOM_CREDENTIAL_PROVIDER_CLASSES_DEFAULT;
     }
     }
     return classes;
     return classes;
   }
   }

+ 5 - 10
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/auth/SimpleCredentialsProvider.java

@@ -21,9 +21,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.tosfs.conf.TosKeys;
 import org.apache.hadoop.fs.tosfs.conf.TosKeys;
 
 
 import static org.apache.hadoop.fs.tosfs.conf.TosKeys.FS_TOS_ACCESS_KEY_ID;
 import static org.apache.hadoop.fs.tosfs.conf.TosKeys.FS_TOS_ACCESS_KEY_ID;
-import static org.apache.hadoop.fs.tosfs.conf.TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID_TEMPLATE;
-import static org.apache.hadoop.fs.tosfs.conf.TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY_TEMPLATE;
-import static org.apache.hadoop.fs.tosfs.conf.TosKeys.FS_TOS_BUCKET_SESSION_TOKEN_TEMPLATE;
 import static org.apache.hadoop.fs.tosfs.conf.TosKeys.FS_TOS_SECRET_ACCESS_KEY;
 import static org.apache.hadoop.fs.tosfs.conf.TosKeys.FS_TOS_SECRET_ACCESS_KEY;
 import static org.apache.hadoop.fs.tosfs.conf.TosKeys.FS_TOS_SESSION_TOKEN;
 import static org.apache.hadoop.fs.tosfs.conf.TosKeys.FS_TOS_SESSION_TOKEN;
 
 
@@ -33,14 +30,12 @@ public class SimpleCredentialsProvider extends AbstractCredentialsProvider {
 
 
   @Override
   @Override
   protected ExpireableCredential createCredential() {
   protected ExpireableCredential createCredential() {
-    String accessKey = lookup(conf(), TosKeys.get(FS_TOS_BUCKET_ACCESS_KEY_ID_TEMPLATE, bucket()),
-        FS_TOS_ACCESS_KEY_ID);
-    String secretKey =
-        lookup(conf(), TosKeys.get(FS_TOS_BUCKET_SECRET_ACCESS_KEY_TEMPLATE, bucket()),
-            FS_TOS_SECRET_ACCESS_KEY);
+    String accessKey =
+        lookup(conf(), TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key(bucket()), FS_TOS_ACCESS_KEY_ID);
+    String secretKey = lookup(conf(), TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key(bucket()),
+        FS_TOS_SECRET_ACCESS_KEY);
     String sessionToken =
     String sessionToken =
-        lookup(conf(), TosKeys.get(FS_TOS_BUCKET_SESSION_TOKEN_TEMPLATE, bucket()),
-            FS_TOS_SESSION_TOKEN);
+        lookup(conf(), TosKeys.FS_TOS_BUCKET_SESSION_TOKEN.key(bucket()), FS_TOS_SESSION_TOKEN);
     if (StringUtils.isEmpty(sessionToken)) {
     if (StringUtils.isEmpty(sessionToken)) {
       // This is a static ak sk configuration.
       // This is a static ak sk configuration.
       return new ExpireableCredential(accessKey, secretKey);
       return new ExpireableCredential(accessKey, secretKey);

+ 181 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/ParseUtils.java

@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.thirdparty.com.google.common.base.Splitter;
+import org.apache.hadoop.util.Preconditions;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class ParseUtils {
+  private static final String ERROR_MSG = "Failed to parse value %s as %s, property key %s";
+
+  private ParseUtils() {
+  }
+
+  public static String envAsString(String key) {
+    return envAsString(key, true);
+  }
+
+  public static String envAsString(String key, boolean allowNull) {
+    String value = System.getenv(key);
+    if (!allowNull) {
+      Preconditions.checkNotNull(value, "os env key: %s cannot be null", key);
+    }
+    return value;
+  }
+
+  public static String envAsString(String key, String defaultValue) {
+    String value = System.getenv(key);
+    return StringUtils.isEmpty(value) ? defaultValue : value;
+  }
+
+  public static boolean envAsBoolean(String key, boolean defaultValue) {
+    String value = System.getenv(key);
+    if (StringUtils.isEmpty(value)) {
+      return defaultValue;
+    }
+    checkBoolean(key, value);
+    return Boolean.parseBoolean(value);
+  }
+
+  public static int getInt(Map<String, String> props, String key) {
+    String value = props.get(key);
+    try {
+      return Integer.parseInt(value);
+    } catch (NumberFormatException e) {
+      throw new IllegalArgumentException(String.format(ERROR_MSG, value, "integer", key));
+    }
+  }
+
+  public static int getInt(Map<String, String> props, String key, int defaultValue) {
+    if (!props.containsKey(key)) {
+      return defaultValue;
+    } else {
+      return getInt(props, key);
+    }
+  }
+
+  public static long getLong(Map<String, String> props, String key) {
+    String value = props.get(key);
+    try {
+      return Long.parseLong(value);
+    } catch (NumberFormatException e) {
+      throw new IllegalArgumentException(String.format(ERROR_MSG, value, "long", key));
+    }
+  }
+
+  public static long getLong(Map<String, String> props, String key, long defaultValue) {
+    if (!props.containsKey(key)) {
+      return defaultValue;
+    } else {
+      return getLong(props, key);
+    }
+  }
+
+  public static double getDouble(Map<String, String> props, String key) {
+    String value = props.get(key);
+    try {
+      return Double.parseDouble(value);
+    } catch (NumberFormatException e) {
+      throw new IllegalArgumentException(String.format(ERROR_MSG, value, "double", key));
+    }
+  }
+
+  public static double getDouble(Map<String, String> props, String key, double defaultValue) {
+    if (!props.containsKey(key)) {
+      return defaultValue;
+    } else {
+      return getDouble(props, key);
+    }
+  }
+
+  public static String getString(Map<String, String> props, String key) {
+    String value = props.get(key);
+    Preconditions.checkNotNull(value, "The value of config key %s is null", key);
+    return value;
+  }
+
+  public static String getString(Map<String, String> props, String key, String defaultValue) {
+    if (!props.containsKey(key)) {
+      return defaultValue;
+    } else {
+      return getString(props, key);
+    }
+  }
+
+  public static List<String> getList(Map<String, String> props, String key) {
+    String value = props.get(key);
+    Preconditions.checkNotNull(value, "The value of config key %s is null", key);
+    return Splitter.on(',').splitToStream(value).map(String::trim).collect(Collectors.toList());
+  }
+
+  public static List<String> getList(Map<String, String> props, String key, List<String> defaultValue) {
+    if (!props.containsKey(key)) {
+      return defaultValue;
+    } else {
+      return getList(props, key);
+    }
+  }
+
+  public static boolean getBoolean(Map<String, String> props, String key) {
+    String value = props.get(key);
+    checkBoolean(key, value);
+    return Boolean.parseBoolean(value);
+  }
+
+  public static boolean getBoolean(Map<String, String> props, String key, boolean defaultValue) {
+    if (!props.containsKey(key)) {
+      return defaultValue;
+    } else {
+      return getBoolean(props, key);
+    }
+  }
+
+  public static boolean isBoolean(String value) {
+    return "true".equalsIgnoreCase(value) || "false".equalsIgnoreCase(value);
+  }
+
+  public static void checkBoolean(String key, String value) {
+    if (!isBoolean(value)) {
+      throw new IllegalArgumentException(String.format(ERROR_MSG, value, "boolean", key));
+    }
+  }
+
+  public static boolean isLong(String value) {
+    try {
+      Long.parseLong(value);
+      return true;
+    } catch (NumberFormatException e) {
+      return false;
+    }
+  }
+
+  public static boolean isDouble(String value) {
+    try {
+      Double.parseDouble(value);
+      return true;
+    } catch (NumberFormatException e) {
+      return false;
+    }
+  }
+}

+ 60 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/TOSClientContextUtils.java

@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TOSClientContextUtils {
+  public static final Logger LOG = LoggerFactory.getLogger(TOSClientContextUtils.class);
+
+  private TOSClientContextUtils() {
+  }
+
+  public static String normalizeEndpoint(String endpoint) {
+    for (String scheme : new String[]{"https://", "http://", "tos://"}) {
+      if (endpoint.startsWith(scheme)) {
+        return endpoint.substring(scheme.length());
+      }
+    }
+    return endpoint;
+  }
+
+  public static String parseRegion(String endpoint) {
+    String region = null;
+    String newEndpoint = normalizeEndpoint(endpoint);
+    String[] parts = newEndpoint.split("\\.");
+    if (parts.length == 3) {
+      // Endpoint  is formatted like 'tos-<region>.volces.com'
+      region = parts[0].replace("tos-", "");
+    } else if (parts.length == 4) {
+      // Endpoint is formatted like '<bucket>.tos-<region>.volces.com'
+      region = parts[1].replace("tos-", "");
+    } else if (parts.length == 6) {
+      // Endpoint is formatted like '<ep-id>.tos.<region>.privatelink.volces.com'
+      region = parts[2];
+    } else if (parts.length == 7) {
+      // Endpoint is formatted like '<bucket>.<ep-id>.tos.<region>.privatelink.volces.com'
+      region = parts[3];
+    }
+    LOG.debug("parse region [{}] from endpoint [{}]", region, endpoint);
+    return region;
+  }
+}
+

+ 60 - 0
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/auth/TestAbstractCredentialsProvider.java

@@ -0,0 +1,60 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.tos.auth;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.tosfs.object.tos.TOS;
+import org.apache.hadoop.fs.tosfs.util.TestUtility;
+
+public abstract class TestAbstractCredentialsProvider {
+  private String envAccessKeyId;
+  private String envSecretAccessKey;
+  private String envSessionToken;
+
+  protected Configuration getConf() {
+    return new Configuration();
+  }
+
+  protected void saveOsCredEnv() {
+    if (StringUtils.isNotEmpty(System.getenv(TOS.ENV_TOS_ACCESS_KEY_ID))) {
+      envAccessKeyId = System.getenv(TOS.ENV_TOS_ACCESS_KEY_ID);
+    }
+
+    if (StringUtils.isNotEmpty(System.getenv(TOS.ENV_TOS_SECRET_ACCESS_KEY))) {
+      envSecretAccessKey = System.getenv(TOS.ENV_TOS_SECRET_ACCESS_KEY);
+    }
+
+    if (StringUtils.isNotEmpty(System.getenv(TOS.ENV_TOS_SESSION_TOKEN))) {
+      envSessionToken = System.getenv(TOS.ENV_TOS_SESSION_TOKEN);
+    }
+  }
+
+  protected void resetOsCredEnv() {
+    resetOsCredEnv(TOS.ENV_TOS_ACCESS_KEY_ID, envAccessKeyId);
+    resetOsCredEnv(TOS.ENV_TOS_SECRET_ACCESS_KEY, envSecretAccessKey);
+    resetOsCredEnv(TOS.ENV_TOS_SESSION_TOKEN, envSessionToken);
+  }
+
+  private void resetOsCredEnv(String key, String value) {
+    if (StringUtils.isNotEmpty(value)) {
+      TestUtility.setSystemEnv(key, value);
+    } else {
+      TestUtility.removeSystemEnv(key);
+    }
+  }
+}

+ 213 - 0
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/auth/TestDefaultCredentialsProviderChain.java

@@ -0,0 +1,213 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.tos.auth;
+
+import com.volcengine.tos.TosException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.tosfs.conf.TosKeys;
+import org.apache.hadoop.fs.tosfs.object.tos.TOS;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.hadoop.fs.tosfs.util.TestUtility.removeSystemEnv;
+import static org.apache.hadoop.fs.tosfs.util.TestUtility.setSystemEnv;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+public class TestDefaultCredentialsProviderChain extends TestAbstractCredentialsProvider {
+
+  private static final String MOCK_TEST_AK = "AK";
+  private static final String MOCK_TEST_SK = "SK";
+  private static final String MOCK_TEST_TST_TOKEN = "STS_TOKEN";
+
+  private static final String MOCK_TEST_AK_WITH_BUCKET = "AK_WITH_BUCKET";
+  private static final String MOCK_TEST_SK_WITH_BUCKET = "SK_WITH_BUCKET";
+  private static final String MOCK_TEST_STS_TOKEN_WITH_BUCKET = "STS_TOKEN_WITH_BUCKET";
+
+  private static final String MOCK_TEST_ENV_AK = "ENV_AK";
+  private static final String MOCK_TEST_ENV_SK = "ENV_SK";
+  private static final String MOCK_TEST_ENV_STS_TOKEN = "ENV_STS_TOKEN";
+
+  private static final String MOCK_TEST_BUCKET = "test";
+  private static final String MOCK_TEST_ROLE_NAME = "roleName";
+  private static final String MOCK_PATH = "/volcstack/latest/iam/security_credentials/";
+  private static final String API_ENDPOINT = MOCK_PATH + MOCK_TEST_ROLE_NAME;
+  private static final String EXPIRED_TIME_PATTERN = "yyyy-MM-dd'T'HH:mm:ssXXX";
+
+  @Override
+  public Configuration getConf() {
+    Configuration conf = new Configuration();
+    conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key("test"), MOCK_TEST_AK_WITH_BUCKET);
+    conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key("test"), MOCK_TEST_SK_WITH_BUCKET);
+    conf.set(TosKeys.FS_TOS_BUCKET_SESSION_TOKEN.key("test"), MOCK_TEST_STS_TOKEN_WITH_BUCKET);
+    conf.set(TosKeys.FS_TOS_ACCESS_KEY_ID, MOCK_TEST_AK);
+    conf.set(TosKeys.FS_TOS_SECRET_ACCESS_KEY, MOCK_TEST_SK);
+    conf.set(TosKeys.FS_TOS_SESSION_TOKEN, MOCK_TEST_TST_TOKEN);
+    return conf;
+  }
+
+  @Before
+  public void setUp() {
+    saveOsCredEnv();
+  }
+
+  @Test
+  public void testLoadCredFromEnvProvider() {
+    Configuration conf = getConf();
+    setSystemEnv(TOS.ENV_TOS_ACCESS_KEY_ID, MOCK_TEST_ENV_AK);
+    setSystemEnv(TOS.ENV_TOS_SECRET_ACCESS_KEY, MOCK_TEST_ENV_SK);
+    setSystemEnv(TOS.ENV_TOS_SESSION_TOKEN, MOCK_TEST_ENV_STS_TOKEN);
+    DefaultCredentialsProviderChain chain = new DefaultCredentialsProviderChain();
+    chain.initialize(conf, null);
+
+    assertEquals(String.format("expect %s", MOCK_TEST_ENV_AK), chain.credential().getAccessKeyId(),
+        MOCK_TEST_ENV_AK);
+    assertEquals(String.format("expect %s", MOCK_TEST_ENV_SK),
+        chain.credential().getAccessKeySecret(), MOCK_TEST_ENV_SK);
+    assertEquals(String.format("expect %s", MOCK_TEST_ENV_STS_TOKEN),
+        chain.credential().getSecurityToken(), MOCK_TEST_ENV_STS_TOKEN);
+    Assert.assertTrue(chain.lastUsedProvider() instanceof EnvironmentCredentialsProvider);
+  }
+
+  @Test
+  public void testLoadCredFromSimpleProviderWithBucket() {
+    Configuration conf = getConf();
+    removeSystemEnv(TOS.ENV_TOS_ACCESS_KEY_ID);
+    removeSystemEnv(TOS.ENV_TOS_SECRET_ACCESS_KEY);
+    removeSystemEnv(TOS.ENV_TOS_SESSION_TOKEN);
+    DefaultCredentialsProviderChain chain = new DefaultCredentialsProviderChain();
+    chain.initialize(conf, MOCK_TEST_BUCKET);
+
+    assertEquals(
+        String.format("expect %s", MOCK_TEST_AK_WITH_BUCKET),
+        chain.credential().getAccessKeyId(), MOCK_TEST_AK_WITH_BUCKET);
+    assertEquals(
+        String.format("expect %s", MOCK_TEST_SK_WITH_BUCKET),
+        chain.credential().getAccessKeySecret(), MOCK_TEST_SK_WITH_BUCKET);
+    assertEquals(
+        String.format("expect %s", MOCK_TEST_STS_TOKEN_WITH_BUCKET),
+        chain.credential().getSecurityToken(), MOCK_TEST_STS_TOKEN_WITH_BUCKET);
+    Assert.assertTrue(chain.lastUsedProvider() instanceof SimpleCredentialsProvider);
+  }
+
+  @Test
+  public void testLoadCredFromSimpleProvider() {
+    Configuration conf = getConf();
+    removeSystemEnv(TOS.ENV_TOS_ACCESS_KEY_ID);
+    removeSystemEnv(TOS.ENV_TOS_SECRET_ACCESS_KEY);
+    DefaultCredentialsProviderChain chain = new DefaultCredentialsProviderChain();
+    chain.initialize(conf, "test-bucket");
+
+    assertEquals(String.format("expect %s", MOCK_TEST_AK), chain.credential().getAccessKeyId(),
+        MOCK_TEST_AK);
+    assertEquals(String.format("expect %s", MOCK_TEST_SK), chain.credential().getAccessKeySecret(),
+        MOCK_TEST_SK);
+    Assert.assertTrue(chain.lastUsedProvider() instanceof SimpleCredentialsProvider);
+  }
+
+  @Test
+  public void testNotFoundAnyProvider() {
+    removeSystemEnv(TOS.ENV_TOS_ACCESS_KEY_ID);
+    removeSystemEnv(TOS.ENV_TOS_SECRET_ACCESS_KEY);
+    DefaultCredentialsProviderChain chain = new DefaultCredentialsProviderChain();
+    chain.initialize(new Configuration(), MOCK_TEST_BUCKET);
+    Assert.assertThrows(RuntimeException.class, chain::credential);
+  }
+
+  @After
+  public void after() {
+    resetOsCredEnv();
+  }
+
+  @Test
+  public void testShouldReturnAKSKFollowByProviderSequence() {
+    setSystemEnv(TOS.ENV_TOS_ACCESS_KEY_ID, "ENV_ACCESS_KEY");
+    setSystemEnv(TOS.ENV_TOS_SECRET_ACCESS_KEY, "ENV_SECRET_KEY");
+
+    // use the simple credential provider at first.
+    String providerClassesStr = SimpleCredentialsProvider.class.getName() + ','
+        + EnvironmentCredentialsProvider.class.getName();
+
+    Configuration conf = new Configuration();
+    conf.set(TosKeys.FS_TOS_CUSTOM_CREDENTIAL_PROVIDER_CLASSES, providerClassesStr);
+    conf.set(TosKeys.FS_TOS_ACCESS_KEY_ID, MOCK_TEST_AK);
+    conf.set(TosKeys.FS_TOS_SECRET_ACCESS_KEY, MOCK_TEST_SK);
+    conf.set(TosKeys.FS_TOS_SESSION_TOKEN, MOCK_TEST_TST_TOKEN);
+
+    DefaultCredentialsProviderChain provider = new DefaultCredentialsProviderChain();
+    provider.initialize(conf, MOCK_TEST_BUCKET);
+
+    ExpireableCredential cred = provider.createCredential();
+    assertEquals(MOCK_TEST_AK, cred.getAccessKeyId());
+    assertEquals(MOCK_TEST_SK, cred.getAccessKeySecret());
+
+    assertFalse(cred.isExpired());
+
+    // use the env credential provider at first.
+    providerClassesStr = EnvironmentCredentialsProvider.class.getName() + ','
+        + SimpleCredentialsProvider.class.getName();
+    conf.set(TosKeys.FS_TOS_CUSTOM_CREDENTIAL_PROVIDER_CLASSES, providerClassesStr);
+
+    provider = new DefaultCredentialsProviderChain();
+    provider.initialize(conf, MOCK_TEST_BUCKET);
+    cred = provider.createCredential();
+    assertEquals("ENV_ACCESS_KEY", cred.getAccessKeyId());
+    assertEquals("ENV_SECRET_KEY", cred.getAccessKeySecret());
+    assertFalse(cred.isExpired());
+
+    removeSystemEnv(TOS.ENV_TOS_ACCESS_KEY_ID);
+    removeSystemEnv(TOS.ENV_TOS_SECRET_ACCESS_KEY);
+  }
+
+  @Test
+  public void testShouldThrowExceptionWhenCustomClassNotFound() {
+    Configuration conf = new Configuration();
+    conf.set(TosKeys.FS_TOS_CUSTOM_CREDENTIAL_PROVIDER_CLASSES,
+        SimpleCredentialsProvider.class.getName() + "NotExist");
+
+    DefaultCredentialsProviderChain provider = new DefaultCredentialsProviderChain();
+    TosException tosException =
+        assertThrows(TosException.class, () -> provider.initialize(conf, null));
+    assertTrue(tosException.getCause() instanceof ClassNotFoundException);
+  }
+
+  @Test
+  public void testShouldThrowExceptionIfNoDefaultConstructorFound() {
+    Configuration conf = new Configuration();
+    conf.set(TosKeys.FS_TOS_CUSTOM_CREDENTIAL_PROVIDER_CLASSES,
+        TestCredentialProviderNoDefaultConstructor.class.getName());
+    DefaultCredentialsProviderChain provider = new DefaultCredentialsProviderChain();
+    RuntimeException exception =
+        assertThrows(RuntimeException.class, () -> provider.initialize(conf, null));
+    Assert.assertTrue(exception.getMessage().contains("java.lang.NoSuchMethodException"));
+  }
+
+  static class TestCredentialProviderNoDefaultConstructor extends AbstractCredentialsProvider {
+
+    TestCredentialProviderNoDefaultConstructor(String fake) {
+    }
+
+    @Override
+    protected ExpireableCredential createCredential() {
+      return null;
+    }
+  }
+}

+ 64 - 0
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/auth/TestEnvironmentCredentialsProvider.java

@@ -0,0 +1,64 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.tos.auth;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.tosfs.object.tos.TOS;
+import org.apache.hadoop.fs.tosfs.util.TestUtility;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestEnvironmentCredentialsProvider extends TestAbstractCredentialsProvider {
+
+  @Before
+  public void setUp() {
+    saveOsCredEnv();
+  }
+
+  @Test
+  public void testLoadAkSkFromEnvProvider() {
+    TestUtility.setSystemEnv(TOS.ENV_TOS_ACCESS_KEY_ID, "AccessKeyId");
+    TestUtility.setSystemEnv(TOS.ENV_TOS_SECRET_ACCESS_KEY, "SecretAccessKey");
+
+    EnvironmentCredentialsProvider provider = new EnvironmentCredentialsProvider();
+    provider.initialize(new Configuration(), null);
+
+    ExpireableCredential oldCred = provider.credential();
+    Assert.assertEquals("provider ak must be equals to env ak", oldCred.getAccessKeyId(), "AccessKeyId");
+    Assert.assertEquals("provider sk must be equals to env sk", oldCred.getAccessKeySecret(), "SecretAccessKey");
+
+    TestUtility.setSystemEnv(TOS.ENV_TOS_ACCESS_KEY_ID, "newAccessKeyId");
+    TestUtility.setSystemEnv(TOS.ENV_TOS_SECRET_ACCESS_KEY, "newSecretAccessKey");
+    TestUtility.setSystemEnv(TOS.ENV_TOS_SESSION_TOKEN, "newSessionToken");
+
+    Assert.assertFalse(oldCred.isExpired());
+
+    ExpireableCredential newCred = provider.credential();
+    Assert.assertEquals("provider ak must be equals to env ak", newCred.getAccessKeyId(), "AccessKeyId");
+    Assert.assertEquals("provider sk must be equals to env sk", newCred.getAccessKeySecret(), "SecretAccessKey");
+
+    Assert.assertFalse(newCred.isExpired());
+  }
+
+  @After
+  public void resetEnv() {
+    resetOsCredEnv();
+  }
+}
+

+ 81 - 0
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/auth/TestSimpleCredentialsProvider.java

@@ -0,0 +1,81 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.tos.auth;
+
+import com.volcengine.tos.auth.Credential;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.tosfs.conf.TosKeys;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestSimpleCredentialsProvider extends TestAbstractCredentialsProvider {
+
+  @Test
+  public void testStaticCredentials() {
+    Configuration conf = new Configuration();
+    conf.set(TosKeys.FS_TOS_ACCESS_KEY_ID, "ACCESS_KEY");
+    conf.set(TosKeys.FS_TOS_SECRET_ACCESS_KEY, "SECRET_KEY");
+    conf.set(TosKeys.FS_TOS_SESSION_TOKEN, "STS_TOKEN");
+    SimpleCredentialsProvider provider = new SimpleCredentialsProvider();
+    provider.initialize(conf, "test");
+    Credential credentials = provider.credential();
+    Assert.assertEquals("access key must be ACCESS_KEY", "ACCESS_KEY",
+        credentials.getAccessKeyId());
+    Assert.assertEquals("secret key must be SECRET_KEY", "SECRET_KEY",
+        credentials.getAccessKeySecret());
+    Assert.assertEquals("sts token must be STS_TOKEN", "STS_TOKEN",
+        credentials.getSecurityToken());
+  }
+
+  @Test
+  public void testStaticCredentialsWithBucket() {
+    Configuration conf = new Configuration();
+    conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key("test"), "ACCESS_KEY");
+    conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key("test"), "SECRET_KEY");
+    conf.set(TosKeys.FS_TOS_BUCKET_SESSION_TOKEN.key("test"), "STS_TOKEN");
+    SimpleCredentialsProvider provider = new SimpleCredentialsProvider();
+    provider.initialize(conf, "test");
+    Credential credentials = provider.credential();
+    Assert.assertEquals("access key must be ACCESS_KEY", "ACCESS_KEY",
+        credentials.getAccessKeyId());
+    Assert.assertEquals("secret key must be SECRET_KEY", "SECRET_KEY",
+        credentials.getAccessKeySecret());
+    Assert.assertEquals("sts token must be STS_TOKEN", "STS_TOKEN",
+        credentials.getSecurityToken());
+  }
+
+  @Test
+  public void testStaticCredentialsWithPriority() {
+    Configuration conf = new Configuration();
+    conf.set(TosKeys.FS_TOS_ACCESS_KEY_ID, "ACCESS_KEY");
+    conf.set(TosKeys.FS_TOS_SECRET_ACCESS_KEY, "SECRET_KEY");
+    conf.set(TosKeys.FS_TOS_SESSION_TOKEN, "STS_TOKEN");
+    conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key("test"), "ACCESS_KEY_BUCKET");
+    conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key("test"), "SECRET_KEY_BUCKET");
+    conf.set(TosKeys.FS_TOS_BUCKET_SESSION_TOKEN.key("test"), "STS_TOKEN_BUCKET");
+
+    SimpleCredentialsProvider provider = new SimpleCredentialsProvider();
+    provider.initialize(conf, "test");
+    Credential credentials = provider.credential();
+    Assert.assertEquals("access key must be ACCESS_KEY_BUCKET", "ACCESS_KEY_BUCKET",
+        credentials.getAccessKeyId());
+    Assert.assertEquals("secret key must be SECRET_KEY_BUCKET", "SECRET_KEY_BUCKET",
+        credentials.getAccessKeySecret());
+    Assert.assertEquals("sts token must be STS_TOKEN_BUCKET", "STS_TOKEN_BUCKET",
+        credentials.getSecurityToken());
+  }
+}

+ 189 - 0
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/util/TestUtility.java

@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.util;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.tosfs.conf.ConfKeys;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory;
+import org.apache.hadoop.fs.tosfs.object.tos.TOS;
+import org.apache.hadoop.util.Lists;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+public class TestUtility {
+  private static final String ENV_TOS_BUCKET = "TOS_BUCKET";
+  private static final String ENV_TEST_SCHEME = "TEST_SCHEME";
+  private static final String ENV_DIRECTORY_BUCKET = "DIRECTORY_BUCKET";
+  private static final String ENV_DIRECTORY_BUCKET_ENDPOINT = "DIRECTORY_BUCKET_ENDPOINT";
+  private static final Random RND = new Random(System.currentTimeMillis());
+
+  private TestUtility() {
+  }
+
+  public static byte[] rand(int size) {
+    byte[] buffer = new byte[size];
+    RND.nextBytes(buffer);
+    return buffer;
+  }
+
+  public static int randInt(int bound) {
+    return RND.nextInt(bound);
+  }
+
+  public static String randomWithChinese() {
+    return RandomStringUtils.random(10, 0x4e00, 0x9fa5, false, false);
+  }
+
+  public static String createUniquePath(String scheme) {
+    String bucket = bucket();
+    if (bucket != null) {
+      return String.format("%s://%s/%s-%s/", scheme, bucket, scheme, UUIDUtils.random());
+    } else {
+      throw new IllegalStateException("OS test bucket is not available");
+    }
+  }
+
+  public static String defaultFs() {
+    return String.format("%s://%s/", scheme(), bucket());
+  }
+
+  public static String scheme() {
+    return ParseUtils.envAsString(ENV_TEST_SCHEME, "tos");
+  }
+
+  public static String bucket() {
+    String bucket = ParseUtils.envAsString(ENV_TOS_BUCKET);
+    if (bucket != null) {
+      return bucket;
+    }
+
+    // Parse from endpoint if it is formatted like http[s]://<bucket>.<region>.xxx.com
+    String endpoint = ParseUtils.envAsString(TOS.ENV_TOS_ENDPOINT);
+    if (endpoint != null) {
+      for (String scheme : Lists.newArrayList("http://", "https://")) {
+        if (endpoint.startsWith(scheme)) {
+          endpoint = endpoint.substring(scheme.length());
+        }
+      }
+
+      String[] elements = endpoint.split("\\.");
+      if (elements.length == 4) {
+        return elements[0];
+      }
+    }
+    throw new RuntimeException("Cannot decide the bucket name for object storage with scheme 'tos'");
+  }
+
+  public static String region() {
+    return TOSClientContextUtils.parseRegion(ParseUtils.envAsString(TOS.ENV_TOS_ENDPOINT));
+  }
+
+  public static String endpoint() {
+    return ParseUtils.envAsString(TOS.ENV_TOS_ENDPOINT);
+  }
+
+  @SuppressWarnings("unchecked")
+  public static void setSystemEnv(String key, String value) {
+    try {
+      Map<String, String> env = System.getenv();
+      Class<?> cl = env.getClass();
+      Field field = cl.getDeclaredField("m");
+      field.setAccessible(true);
+      Map<String, String> writableEnv = (Map<String, String>) field.get(env);
+      writableEnv.put(key, value);
+    } catch (Exception e) {
+      throw new IllegalStateException("Failed to set environment variable", e);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public static void removeSystemEnv(String key) {
+    try {
+      Map<String, String> env = System.getenv();
+      Class<?> cl = env.getClass();
+      Field field = cl.getDeclaredField("m");
+      field.setAccessible(true);
+      Map<String, String> writableEnv = (Map<String, String>) field.get(env);
+      writableEnv.remove(key);
+    } catch (Exception e) {
+      throw new IllegalStateException(String.format("Failed to remove environment variable: %s", key), e);
+    }
+  }
+
+  public static FileContext createTestFileContext(Configuration conf) throws IOException {
+    URI testURI = URI.create(defaultFs());
+    return FileContext.getFileContext(testURI, conf);
+  }
+
+  private static ObjectStorage generalBucketObjectStorage() {
+    Configuration conf = new Configuration();
+    String endpoint = ParseUtils.envAsString(TOS.ENV_TOS_ENDPOINT, "");
+    if (!StringUtils.isEmpty(endpoint)) {
+      conf.set(ConfKeys.FS_TOS_ENDPOINT.key(scheme()), endpoint);
+    }
+
+    return ObjectStorageFactory.createWithPrefix(
+        String.format("%s-%s/", scheme(), UUIDUtils.random()), scheme(), bucket(), conf);
+  }
+
+  public static ObjectStorage directoryBucketObjectStorage(Configuration conf) {
+    String bucket = ParseUtils.envAsString(ENV_DIRECTORY_BUCKET);
+    if (StringUtils.isEmpty(bucket)) {
+      return null;
+    } else {
+      String endpoint = ParseUtils.envAsString(ENV_DIRECTORY_BUCKET_ENDPOINT, "");
+      if (!StringUtils.isEmpty(endpoint)) {
+        conf.set(ConfKeys.FS_TOS_ENDPOINT.key(scheme()), endpoint);
+      }
+      return ObjectStorageFactory.createWithPrefix(
+          String.format("%s-%s/", scheme(), UUIDUtils.random()), scheme(), bucket, conf);
+    }
+  }
+
+  public static List<ObjectStorage> createTestObjectStorage(String fileStoreRoot) {
+    List<ObjectStorage> storages = new ArrayList<>();
+
+    // 1. FileStore
+    Configuration fileStoreConf = new Configuration();
+    fileStoreConf.set(ConfKeys.FS_TOS_ENDPOINT.key("filestore"), fileStoreRoot);
+    storages.add(ObjectStorageFactory.create("filestore", TestUtility.bucket(), fileStoreConf));
+
+    // 2. General Bucket
+    storages.add(generalBucketObjectStorage());
+
+    // 3. Directory Bucket is optional
+    ObjectStorage directoryObjectStorage = directoryBucketObjectStorage(new Configuration());
+    if (directoryObjectStorage != null) {
+      storages.add(directoryObjectStorage);
+    }
+
+    return storages;
+  }
+}
+