
Integration of TOS: Add TOS.

lijinglun, 10 months ago · parent commit 3a4966f525

+ 62 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/Bytes.java

@@ -0,0 +1,62 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.common;
+
+// TODO: Remove this class?
+public class Bytes {
+  private Bytes() {
+  }
+
+  public static final byte[] EMPTY_BYTES = new byte[0];
+
+  // Encode basic Java types into big-endian byte arrays.
+
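+  // A true value encodes as a single 0xFF (-1) byte and false as 0x00,
+  // matching the convention used by HBase's Bytes utility.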
+  public static byte[] toBytes(boolean b) {
+    return new byte[]{b ? (byte) -1 : (byte) 0};
+  }
+
+  public static byte[] toBytes(byte b) {
+    return new byte[]{b};
+  }
+
+  public static byte[] toBytes(short val) {
+    byte[] b = new byte[2];
+    for (int i = 1; i >= 0; i--) {
+      b[i] = (byte) val;
+      val >>>= 8;
+    }
+    return b;
+  }
+
+  public static byte[] toBytes(int val) {
+    byte[] b = new byte[4];
+    for (int i = 3; i >= 0; i--) {
+      b[i] = (byte) val;
+      val >>>= 8;
+    }
+    return b;
+  }
+
+  public static byte[] toBytes(long val) {
+    byte[] b = new byte[8];
+    for (int i = 7; i >= 0; i--) {
+      b[i] = (byte) val;
+      val >>>= 8;
+    }
+    return b;
+  }
+}
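
For a quick sanity check of the big-endian contract, a minimal sketch (not part of this commit; BytesDemo is a hypothetical name) can compare Bytes.toBytes(int) against java.nio.ByteBuffer, which also writes big-endian by default:

import java.nio.ByteBuffer;
import java.util.Arrays;
import org.apache.hadoop.fs.tosfs.common.Bytes;

public class BytesDemo {
  public static void main(String[] args) {
    int val = 0x12345678;
    byte[] viaBytes = Bytes.toBytes(val);                          // {0x12, 0x34, 0x56, 0x78}
    byte[] viaBuffer = ByteBuffer.allocate(4).putInt(val).array(); // ByteBuffer defaults to big-endian
    System.out.println(Arrays.equals(viaBytes, viaBuffer));        // true
  }
}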

+ 1 - 1
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/PrefixStorage.java

@@ -18,12 +18,12 @@
 
 package org.apache.hadoop.fs.tosfs.object;
 
-import com.google.common.collect.Iterables;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.tosfs.object.request.ListObjectsRequest;
 import org.apache.hadoop.fs.tosfs.object.response.ListObjectsResponse;
 import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables;
 import org.apache.hadoop.util.Preconditions;
 
 import java.io.IOException;
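
Note: this change replaces the unshaded Guava Iterables with Hadoop's relocated copy under org.apache.hadoop.thirdparty. Hadoop ships Guava shaded this way so that applications bundling a different Guava version do not conflict with the framework's copy on the classpath.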

+ 137 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/ChainTOSInputStream.java

@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.tos;
+
+import org.apache.hadoop.fs.tosfs.common.Chain;
+import org.apache.hadoop.util.Preconditions;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ChainTOSInputStream extends InputStream {
+  private final Chain<TOSInputStream> chain;
+  private final TOS.GetObjectFactory factory;
+  private final String key;
+  private long curOff;
+  private final long endOff; // range end offset (inclusive)
+  private final long maxDrainByteSize;
+  private final int maxInputStreamRetries;
+
+  private int readBytes;
+  private long skipped;
+  private byte[] objChecksum = null;
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  public ChainTOSInputStream(
+      TOS.GetObjectFactory factory,
+      String key,
+      long startOff,
+      long endOff,
+      long maxDrainByteSize,
+      int maxInputStreamRetries) {
+    this.factory = factory;
+    this.key = key;
+    this.curOff = startOff;
+    this.endOff = endOff;
+    this.maxDrainByteSize = maxDrainByteSize;
+    this.maxInputStreamRetries = maxInputStreamRetries;
+    this.chain = createChain();
+    Preconditions.checkNotNull(objChecksum, "Checksum should not be null.");
+  }
+
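+  // Building the chain also opens the first stream (hence objChecksum is
+  // expected to be non-null right after the constructor's createChain() call).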
+  private Chain<TOSInputStream> createChain() {
+    Chain.Builder<TOSInputStream> builder = Chain.<TOSInputStream>builder()
+        .shouldContinue(e -> !(e instanceof EOFException));
+
+    for (int i = 0; i <= maxInputStreamRetries; i++) {
+      builder.addLast(() -> {
+        GetObjectOutput output = factory.create(key, curOff, endOff);
+
+        // Note: if an IO error occurs, ChainTOSInputStream opens a new stream in the chain to
+        // continue reading the object data. The checksum is recorded when the object stream is
+        // first opened, and must stay the same every time the object is reopened within the
+        // lifecycle of the chained stream, so that changes to the underlying object are detected.
+        if (objChecksum == null) {
+          // Init the stream checksum.
+          objChecksum = output.checksum();
+        }
+        return new TOSInputStream(output, curOff, endOff, maxDrainByteSize, objChecksum);
+      });
+    }
+
+    try {
+      return builder.build();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @Override
+  public long skip(long n) throws IOException {
+    skipped = 0;
+    return chain.run(stream -> {
+      long skip = stream.skip(n - skipped);
+
+      curOff += skip;
+      skipped += skip;
+      return skipped;
+    });
+  }
+
+  @Override
+  public int read() throws IOException {
+    return chain.run(stream -> {
+      int ret = stream.read();
+      curOff++;
+      return ret;
+    });
+  }
+
+  @Override
+  public int available() throws IOException {
+    return chain.run(InputStream::available);
+  }
+
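+  // readBytes and curOff accumulate across failovers: if a stream in the chain
+  // fails mid-read, the replacement stream resumes at off + readBytes, so the
+  // caller sees neither duplicated nor skipped bytes.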
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    readBytes = 0;
+    return chain.run(in -> {
+      int read = in.read(b, off + readBytes, len - readBytes);
+
+      readBytes += read;
+      curOff += read;
+      return readBytes;
+    });
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closed.compareAndSet(false, true)) {
+      chain.close();
+    }
+  }
+
+  public byte[] checksum() {
+    return objChecksum;
+  }
+}
+
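
The Chain class itself is not part of this excerpt. As a rough mental model only (names and behavior below are assumptions for illustration, not the actual org.apache.hadoop.fs.tosfs.common.Chain), such a failover chain holds a queue of suppliers, runs a task against the current element, and on a retryable failure closes it and falls through to the next one:

import java.io.Closeable;
import java.io.IOException;
import java.util.Deque;
import java.util.function.Predicate;

// Hypothetical, simplified failover chain, for illustration only.
public class SimpleChain<T extends Closeable> implements Closeable {
  public interface ItemSupplier<T> { T create() throws IOException; }
  public interface Task<T, R> { R run(T item) throws IOException; }

  private final Deque<ItemSupplier<T>> suppliers;
  private final Predicate<IOException> shouldContinue;
  private T current;

  public SimpleChain(Deque<ItemSupplier<T>> suppliers, Predicate<IOException> shouldContinue)
      throws IOException {
    this.suppliers = suppliers;
    this.shouldContinue = shouldContinue;
    this.current = nextItem(); // eagerly open the first element
  }

  private T nextItem() throws IOException {
    if (suppliers.isEmpty()) {
      throw new IOException("Chain exhausted");
    }
    return suppliers.pollFirst().create();
  }

  // Run the task against the current element; on a retryable failure,
  // close it and fail over to the next supplier in the chain.
  public <R> R run(Task<T, R> task) throws IOException {
    while (true) {
      try {
        return task.run(current);
      } catch (IOException e) {
        if (!shouldContinue.test(e)) {
          throw e; // e.g. EOFException: propagate instead of failing over
        }
        current.close();
        current = nextItem();
      }
    }
  }

  @Override
  public void close() throws IOException {
    if (current != null) {
      current.close();
    }
  }
}

In ChainTOSInputStream above, each supplier opens the object at the current offset, so a failover transparently resumes the read where the previous stream failed.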

+ 1231 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/DelegationClient.java

@@ -0,0 +1,1231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.tos;
+
+import com.volcengine.tos.TOSClientConfiguration;
+import com.volcengine.tos.TOSV2ClientBuilder;
+import com.volcengine.tos.TosClientException;
+import com.volcengine.tos.TosException;
+import com.volcengine.tos.TosServerException;
+import com.volcengine.tos.auth.Credential;
+import com.volcengine.tos.auth.Credentials;
+import com.volcengine.tos.TOSV2;
+import com.volcengine.tos.comm.HttpStatus;
+import com.volcengine.tos.comm.common.ACLType;
+import com.volcengine.tos.internal.RequestOptionsBuilder;
+import com.volcengine.tos.model.acl.GetObjectAclOutput;
+import com.volcengine.tos.model.acl.PutObjectAclInput;
+import com.volcengine.tos.model.acl.PutObjectAclOutput;
+import com.volcengine.tos.model.bucket.CreateBucketInput;
+import com.volcengine.tos.model.bucket.CreateBucketOutput;
+import com.volcengine.tos.model.bucket.CreateBucketV2Input;
+import com.volcengine.tos.model.bucket.CreateBucketV2Output;
+import com.volcengine.tos.model.bucket.DeleteBucketCORSInput;
+import com.volcengine.tos.model.bucket.DeleteBucketCORSOutput;
+import com.volcengine.tos.model.bucket.DeleteBucketCustomDomainInput;
+import com.volcengine.tos.model.bucket.DeleteBucketCustomDomainOutput;
+import com.volcengine.tos.model.bucket.DeleteBucketEncryptionInput;
+import com.volcengine.tos.model.bucket.DeleteBucketEncryptionOutput;
+import com.volcengine.tos.model.bucket.DeleteBucketInput;
+import com.volcengine.tos.model.bucket.DeleteBucketInventoryInput;
+import com.volcengine.tos.model.bucket.DeleteBucketInventoryOutput;
+import com.volcengine.tos.model.bucket.DeleteBucketLifecycleInput;
+import com.volcengine.tos.model.bucket.DeleteBucketLifecycleOutput;
+import com.volcengine.tos.model.bucket.DeleteBucketMirrorBackInput;
+import com.volcengine.tos.model.bucket.DeleteBucketMirrorBackOutput;
+import com.volcengine.tos.model.bucket.DeleteBucketOutput;
+import com.volcengine.tos.model.bucket.DeleteBucketPolicyInput;
+import com.volcengine.tos.model.bucket.DeleteBucketPolicyOutput;
+import com.volcengine.tos.model.bucket.DeleteBucketRealTimeLogInput;
+import com.volcengine.tos.model.bucket.DeleteBucketRealTimeLogOutput;
+import com.volcengine.tos.model.bucket.DeleteBucketRenameInput;
+import com.volcengine.tos.model.bucket.DeleteBucketRenameOutput;
+import com.volcengine.tos.model.bucket.DeleteBucketReplicationInput;
+import com.volcengine.tos.model.bucket.DeleteBucketReplicationOutput;
+import com.volcengine.tos.model.bucket.DeleteBucketTaggingInput;
+import com.volcengine.tos.model.bucket.DeleteBucketTaggingOutput;
+import com.volcengine.tos.model.bucket.DeleteBucketWebsiteInput;
+import com.volcengine.tos.model.bucket.DeleteBucketWebsiteOutput;
+import com.volcengine.tos.model.bucket.GetBucketACLInput;
+import com.volcengine.tos.model.bucket.GetBucketACLOutput;
+import com.volcengine.tos.model.bucket.GetBucketCORSInput;
+import com.volcengine.tos.model.bucket.GetBucketCORSOutput;
+import com.volcengine.tos.model.bucket.GetBucketEncryptionInput;
+import com.volcengine.tos.model.bucket.GetBucketEncryptionOutput;
+import com.volcengine.tos.model.bucket.GetBucketInventoryInput;
+import com.volcengine.tos.model.bucket.GetBucketInventoryOutput;
+import com.volcengine.tos.model.bucket.GetBucketLifecycleInput;
+import com.volcengine.tos.model.bucket.GetBucketLifecycleOutput;
+import com.volcengine.tos.model.bucket.GetBucketLocationInput;
+import com.volcengine.tos.model.bucket.GetBucketLocationOutput;
+import com.volcengine.tos.model.bucket.GetBucketMirrorBackInput;
+import com.volcengine.tos.model.bucket.GetBucketMirrorBackOutput;
+import com.volcengine.tos.model.bucket.GetBucketNotificationInput;
+import com.volcengine.tos.model.bucket.GetBucketNotificationOutput;
+import com.volcengine.tos.model.bucket.GetBucketNotificationType2Input;
+import com.volcengine.tos.model.bucket.GetBucketNotificationType2Output;
+import com.volcengine.tos.model.bucket.GetBucketPolicyInput;
+import com.volcengine.tos.model.bucket.GetBucketPolicyOutput;
+import com.volcengine.tos.model.bucket.GetBucketRealTimeLogInput;
+import com.volcengine.tos.model.bucket.GetBucketRealTimeLogOutput;
+import com.volcengine.tos.model.bucket.GetBucketRenameInput;
+import com.volcengine.tos.model.bucket.GetBucketRenameOutput;
+import com.volcengine.tos.model.bucket.GetBucketReplicationInput;
+import com.volcengine.tos.model.bucket.GetBucketReplicationOutput;
+import com.volcengine.tos.model.bucket.GetBucketTaggingInput;
+import com.volcengine.tos.model.bucket.GetBucketTaggingOutput;
+import com.volcengine.tos.model.bucket.GetBucketVersioningInput;
+import com.volcengine.tos.model.bucket.GetBucketVersioningOutput;
+import com.volcengine.tos.model.bucket.GetBucketWebsiteInput;
+import com.volcengine.tos.model.bucket.GetBucketWebsiteOutput;
+import com.volcengine.tos.model.bucket.HeadBucketOutput;
+import com.volcengine.tos.model.bucket.HeadBucketV2Input;
+import com.volcengine.tos.model.bucket.HeadBucketV2Output;
+import com.volcengine.tos.model.bucket.ListBucketCustomDomainInput;
+import com.volcengine.tos.model.bucket.ListBucketCustomDomainOutput;
+import com.volcengine.tos.model.bucket.ListBucketInventoryInput;
+import com.volcengine.tos.model.bucket.ListBucketInventoryOutput;
+import com.volcengine.tos.model.bucket.ListBucketsInput;
+import com.volcengine.tos.model.bucket.ListBucketsOutput;
+import com.volcengine.tos.model.bucket.ListBucketsV2Input;
+import com.volcengine.tos.model.bucket.ListBucketsV2Output;
+import com.volcengine.tos.model.bucket.PutBucketACLInput;
+import com.volcengine.tos.model.bucket.PutBucketACLOutput;
+import com.volcengine.tos.model.bucket.PutBucketCORSInput;
+import com.volcengine.tos.model.bucket.PutBucketCORSOutput;
+import com.volcengine.tos.model.bucket.PutBucketCustomDomainInput;
+import com.volcengine.tos.model.bucket.PutBucketCustomDomainOutput;
+import com.volcengine.tos.model.bucket.PutBucketEncryptionInput;
+import com.volcengine.tos.model.bucket.PutBucketEncryptionOutput;
+import com.volcengine.tos.model.bucket.PutBucketInventoryInput;
+import com.volcengine.tos.model.bucket.PutBucketInventoryOutput;
+import com.volcengine.tos.model.bucket.PutBucketLifecycleInput;
+import com.volcengine.tos.model.bucket.PutBucketLifecycleOutput;
+import com.volcengine.tos.model.bucket.PutBucketMirrorBackInput;
+import com.volcengine.tos.model.bucket.PutBucketMirrorBackOutput;
+import com.volcengine.tos.model.bucket.PutBucketNotificationInput;
+import com.volcengine.tos.model.bucket.PutBucketNotificationOutput;
+import com.volcengine.tos.model.bucket.PutBucketNotificationType2Input;
+import com.volcengine.tos.model.bucket.PutBucketNotificationType2Output;
+import com.volcengine.tos.model.bucket.PutBucketPolicyInput;
+import com.volcengine.tos.model.bucket.PutBucketPolicyOutput;
+import com.volcengine.tos.model.bucket.PutBucketRealTimeLogInput;
+import com.volcengine.tos.model.bucket.PutBucketRealTimeLogOutput;
+import com.volcengine.tos.model.bucket.PutBucketRenameInput;
+import com.volcengine.tos.model.bucket.PutBucketRenameOutput;
+import com.volcengine.tos.model.bucket.PutBucketReplicationInput;
+import com.volcengine.tos.model.bucket.PutBucketReplicationOutput;
+import com.volcengine.tos.model.bucket.PutBucketStorageClassInput;
+import com.volcengine.tos.model.bucket.PutBucketStorageClassOutput;
+import com.volcengine.tos.model.bucket.PutBucketTaggingInput;
+import com.volcengine.tos.model.bucket.PutBucketTaggingOutput;
+import com.volcengine.tos.model.bucket.PutBucketVersioningInput;
+import com.volcengine.tos.model.bucket.PutBucketVersioningOutput;
+import com.volcengine.tos.model.bucket.PutBucketWebsiteInput;
+import com.volcengine.tos.model.bucket.PutBucketWebsiteOutput;
+import com.volcengine.tos.model.object.AbortMultipartUploadInput;
+import com.volcengine.tos.model.object.AbortMultipartUploadOutput;
+import com.volcengine.tos.model.object.AppendObjectInput;
+import com.volcengine.tos.model.object.AppendObjectOutput;
+import com.volcengine.tos.model.object.CompleteMultipartUploadInput;
+import com.volcengine.tos.model.object.CompleteMultipartUploadOutput;
+import com.volcengine.tos.model.object.CompleteMultipartUploadV2Input;
+import com.volcengine.tos.model.object.CompleteMultipartUploadV2Output;
+import com.volcengine.tos.model.object.CopyObjectOutput;
+import com.volcengine.tos.model.object.CopyObjectV2Input;
+import com.volcengine.tos.model.object.CopyObjectV2Output;
+import com.volcengine.tos.model.object.CreateMultipartUploadInput;
+import com.volcengine.tos.model.object.CreateMultipartUploadOutput;
+import com.volcengine.tos.model.object.DeleteMultiObjectsInput;
+import com.volcengine.tos.model.object.DeleteMultiObjectsOutput;
+import com.volcengine.tos.model.object.DeleteMultiObjectsV2Input;
+import com.volcengine.tos.model.object.DeleteMultiObjectsV2Output;
+import com.volcengine.tos.model.object.DeleteObjectInput;
+import com.volcengine.tos.model.object.DeleteObjectOutput;
+import com.volcengine.tos.model.object.DeleteObjectTaggingInput;
+import com.volcengine.tos.model.object.DeleteObjectTaggingOutput;
+import com.volcengine.tos.model.object.DownloadFileInput;
+import com.volcengine.tos.model.object.DownloadFileOutput;
+import com.volcengine.tos.model.object.FetchObjectInput;
+import com.volcengine.tos.model.object.FetchObjectOutput;
+import com.volcengine.tos.model.object.GetFetchTaskInput;
+import com.volcengine.tos.model.object.GetFetchTaskOutput;
+import com.volcengine.tos.model.object.GetFileStatusInput;
+import com.volcengine.tos.model.object.GetFileStatusOutput;
+import com.volcengine.tos.model.object.GetObjectACLV2Input;
+import com.volcengine.tos.model.object.GetObjectACLV2Output;
+import com.volcengine.tos.model.object.GetObjectOutput;
+import com.volcengine.tos.model.object.GetObjectTaggingInput;
+import com.volcengine.tos.model.object.GetObjectTaggingOutput;
+import com.volcengine.tos.model.object.GetObjectToFileInput;
+import com.volcengine.tos.model.object.GetObjectToFileOutput;
+import com.volcengine.tos.model.object.GetObjectV2Input;
+import com.volcengine.tos.model.object.GetObjectV2Output;
+import com.volcengine.tos.model.object.GetSymlinkInput;
+import com.volcengine.tos.model.object.GetSymlinkOutput;
+import com.volcengine.tos.model.object.HeadObjectOutput;
+import com.volcengine.tos.model.object.HeadObjectV2Input;
+import com.volcengine.tos.model.object.HeadObjectV2Output;
+import com.volcengine.tos.model.object.ListMultipartUploadsInput;
+import com.volcengine.tos.model.object.ListMultipartUploadsOutput;
+import com.volcengine.tos.model.object.ListMultipartUploadsV2Input;
+import com.volcengine.tos.model.object.ListMultipartUploadsV2Output;
+import com.volcengine.tos.model.object.ListObjectVersionsInput;
+import com.volcengine.tos.model.object.ListObjectVersionsOutput;
+import com.volcengine.tos.model.object.ListObjectVersionsV2Input;
+import com.volcengine.tos.model.object.ListObjectVersionsV2Output;
+import com.volcengine.tos.model.object.ListObjectsInput;
+import com.volcengine.tos.model.object.ListObjectsOutput;
+import com.volcengine.tos.model.object.ListObjectsType2Input;
+import com.volcengine.tos.model.object.ListObjectsType2Output;
+import com.volcengine.tos.model.object.ListObjectsV2Input;
+import com.volcengine.tos.model.object.ListObjectsV2Output;
+import com.volcengine.tos.model.object.ListPartsInput;
+import com.volcengine.tos.model.object.ListPartsOutput;
+import com.volcengine.tos.model.object.ListUploadedPartsInput;
+import com.volcengine.tos.model.object.ListUploadedPartsOutput;
+import com.volcengine.tos.model.object.ObjectMetaRequestOptions;
+import com.volcengine.tos.model.object.PreSignedPolicyURLInput;
+import com.volcengine.tos.model.object.PreSignedPolicyURLOutput;
+import com.volcengine.tos.model.object.PreSignedPostSignatureInput;
+import com.volcengine.tos.model.object.PreSignedPostSignatureOutput;
+import com.volcengine.tos.model.object.PreSignedURLInput;
+import com.volcengine.tos.model.object.PreSignedURLOutput;
+import com.volcengine.tos.model.object.PreSingedPolicyURLInput;
+import com.volcengine.tos.model.object.PreSingedPolicyURLOutput;
+import com.volcengine.tos.model.object.PutFetchTaskInput;
+import com.volcengine.tos.model.object.PutFetchTaskOutput;
+import com.volcengine.tos.model.object.PutObjectACLInput;
+import com.volcengine.tos.model.object.PutObjectACLOutput;
+import com.volcengine.tos.model.object.PutObjectFromFileInput;
+import com.volcengine.tos.model.object.PutObjectFromFileOutput;
+import com.volcengine.tos.model.object.PutObjectInput;
+import com.volcengine.tos.model.object.PutObjectOutput;
+import com.volcengine.tos.model.object.PutObjectTaggingInput;
+import com.volcengine.tos.model.object.PutObjectTaggingOutput;
+import com.volcengine.tos.model.object.PutSymlinkInput;
+import com.volcengine.tos.model.object.PutSymlinkOutput;
+import com.volcengine.tos.model.object.RenameObjectInput;
+import com.volcengine.tos.model.object.RenameObjectOutput;
+import com.volcengine.tos.model.object.RestoreObjectInput;
+import com.volcengine.tos.model.object.RestoreObjectOutput;
+import com.volcengine.tos.model.object.ResumableCopyObjectInput;
+import com.volcengine.tos.model.object.ResumableCopyObjectOutput;
+import com.volcengine.tos.model.object.SetObjectMetaInput;
+import com.volcengine.tos.model.object.SetObjectMetaOutput;
+import com.volcengine.tos.model.object.UploadFileInput;
+import com.volcengine.tos.model.object.UploadFileOutput;
+import com.volcengine.tos.model.object.UploadFileV2Input;
+import com.volcengine.tos.model.object.UploadFileV2Output;
+import com.volcengine.tos.model.object.UploadPartCopyInput;
+import com.volcengine.tos.model.object.UploadPartCopyOutput;
+import com.volcengine.tos.model.object.UploadPartCopyV2Input;
+import com.volcengine.tos.model.object.UploadPartCopyV2Output;
+import com.volcengine.tos.model.object.UploadPartFromFileInput;
+import com.volcengine.tos.model.object.UploadPartFromFileOutput;
+import com.volcengine.tos.model.object.UploadPartInput;
+import com.volcengine.tos.model.object.UploadPartOutput;
+import com.volcengine.tos.model.object.UploadPartV2Input;
+import com.volcengine.tos.model.object.UploadPartV2Output;
+import com.volcengine.tos.transport.TransportConfig;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.tosfs.object.InputStreamProvider;
+import org.apache.hadoop.fs.tosfs.object.Part;
+import org.apache.hadoop.fs.tosfs.util.RetryableUtils;
+import org.apache.hadoop.thirdparty.com.google.common.base.Throwables;
+import org.apache.hadoop.thirdparty.com.google.common.io.CountingInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.time.Duration;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.Callable;
+import javax.net.ssl.SSLException;
+
+public class DelegationClient implements TOSV2 {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DelegationClient.class);
+
+  private final Credentials provider;
+  private final TOSClientConfiguration config;
+  private int maxRetryTimes;
+  private TOSV2 client;
+  private volatile Credential usedCredential;
+  private final List<String> nonRetryable409ErrorCodes;
+
+  protected DelegationClient(
+      TOSClientConfiguration configuration, int maxRetryTimes, List<String> nonRetryable409ErrorCodes) {
+    this.config = configuration;
+    this.maxRetryTimes = maxRetryTimes;
+    this.provider = configuration.getCredentials();
+    this.usedCredential = provider.credential();
+    this.client = new TOSV2ClientBuilder().build(configuration);
+    this.nonRetryable409ErrorCodes = nonRetryable409ErrorCodes;
+  }
+
+  @VisibleForTesting
+  void setClient(TOSV2 client) {
+    this.client = client;
+  }
+
+  public TOSV2 client() {
+    return client;
+  }
+
+  @VisibleForTesting
+  void setMaxRetryTimes(int maxRetryTimes) {
+    this.maxRetryTimes = maxRetryTimes;
+  }
+
+  public int maxRetryTimes() {
+    return maxRetryTimes;
+  }
+
+  public TOSClientConfiguration config() {
+    return config;
+  }
+
+  public Credential usedCredential() {
+    return usedCredential;
+  }
+
+  @Override
+  public CreateBucketV2Output createBucket(String bucket) throws TosException {
+    return retry(() -> client.createBucket(bucket));
+  }
+
+  @Override
+  public CreateBucketV2Output createBucket(CreateBucketV2Input input) throws TosException {
+    return retry(() -> client.createBucket(input));
+  }
+
+  @Override
+  public HeadBucketV2Output headBucket(HeadBucketV2Input input) throws TosException {
+    return retry(() -> client.headBucket(input));
+  }
+
+  @Override
+  public DeleteBucketOutput deleteBucket(DeleteBucketInput input) throws TosException {
+    return retry(() -> client.deleteBucket(input));
+  }
+
+  @Override
+  public ListBucketsV2Output listBuckets(ListBucketsV2Input input) throws TosException {
+    return retry(() -> client.listBuckets(input));
+  }
+
+  @Override
+  public CreateBucketOutput createBucket(CreateBucketInput input) throws TosException {
+    return retry(() -> client.createBucket(input));
+  }
+
+  @Override
+  public HeadBucketOutput headBucket(String bucket) throws TosException {
+    return retry(() -> client.headBucket(bucket));
+  }
+
+  @Override
+  public DeleteBucketOutput deleteBucket(String bucket) throws TosException {
+    return retry(() -> client.deleteBucket(bucket));
+  }
+
+  @Override
+  public ListBucketsOutput listBuckets(ListBucketsInput input) throws TosException {
+    return retry(() -> client.listBuckets(input));
+  }
+
+  @Override
+  public PutBucketPolicyOutput putBucketPolicy(String bucket, String policy) throws TosException {
+    return retry(() -> client.putBucketPolicy(bucket, policy));
+  }
+
+  @Override
+  public PutBucketPolicyOutput putBucketPolicy(PutBucketPolicyInput input) throws TosException {
+    return retry(() -> client.putBucketPolicy(input));
+  }
+
+  @Override
+  public GetBucketPolicyOutput getBucketPolicy(String bucket) throws TosException {
+    return retry(() -> client.getBucketPolicy(bucket));
+  }
+
+  @Override
+  public GetBucketPolicyOutput getBucketPolicy(GetBucketPolicyInput input) throws TosException {
+    return retry(() -> client.getBucketPolicy(input));
+  }
+
+  @Override
+  public DeleteBucketPolicyOutput deleteBucketPolicy(String bucket) throws TosException {
+    return retry(() -> client.deleteBucketPolicy(bucket));
+  }
+
+  @Override
+  public GetObjectOutput getObject(String bucket, String objectKey,
+      RequestOptionsBuilder... builders) throws TosException {
+    return retry(() -> client.getObject(bucket, objectKey, builders));
+  }
+
+  @Override
+  public HeadObjectOutput headObject(String bucket, String objectKey,
+      RequestOptionsBuilder... builders) throws TosException {
+    return retry(() -> client.headObject(bucket, objectKey, builders));
+  }
+
+  @Override
+  public DeleteObjectOutput deleteObject(String bucket, String objectKey,
+      RequestOptionsBuilder... builders) throws TosException {
+    return retry(() -> client.deleteObject(bucket, objectKey, builders));
+  }
+
+  @Override
+  public DeleteMultiObjectsOutput deleteMultiObjects(
+      String bucket,
+      DeleteMultiObjectsInput input,
+      RequestOptionsBuilder... builders)
+      throws TosException {
+    return retry(() -> client.deleteMultiObjects(bucket, input, builders));
+  }
+
+  @Override
+  public PutObjectOutput putObject(
+      String bucket, String objectKey, InputStream inputStream,
+      RequestOptionsBuilder... builders)
+      throws TosException {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  @Override
+  public UploadFileOutput uploadFile(
+      String bucket, UploadFileInput input,
+      RequestOptionsBuilder... builders) throws TosException {
+    return retry(() -> client.uploadFile(bucket, input, builders));
+  }
+
+  @Override
+  public AppendObjectOutput appendObject(
+      String bucket, String objectKey, InputStream content, long offset,
+      RequestOptionsBuilder... builders)
+      throws TosException {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  @Override
+  public SetObjectMetaOutput setObjectMeta(String bucket, String objectKey, RequestOptionsBuilder... builders)
+      throws TosException {
+    return retry(() -> client.setObjectMeta(bucket, objectKey, builders));
+  }
+
+  @Override
+  public ListObjectsOutput listObjects(String bucket, ListObjectsInput input) throws TosException {
+    return retry(() -> client.listObjects(bucket, input));
+  }
+
+  @Override
+  public ListObjectVersionsOutput listObjectVersions(String bucket, ListObjectVersionsInput input) throws TosException {
+    return retry(() -> client.listObjectVersions(bucket, input));
+  }
+
+  @Override
+  public CopyObjectOutput copyObject(
+      String bucket, String srcObjectKey, String dstObjectKey,
+      RequestOptionsBuilder... builders)
+      throws TosException {
+    return retry(() -> client.copyObject(bucket, srcObjectKey, dstObjectKey, builders));
+  }
+
+  @Override
+  public CopyObjectOutput copyObjectTo(
+      String bucket, String dstBucket, String dstObjectKey,
+      String srcObjectKey,
+      RequestOptionsBuilder... builders)
+      throws TosException {
+    return retry(() -> client.copyObjectTo(bucket, dstBucket, dstObjectKey, srcObjectKey, builders));
+  }
+
+  @Override
+  public CopyObjectOutput copyObjectFrom(
+      String bucket, String srcBucket, String srcObjectKey, String dstObjectKey,
+      RequestOptionsBuilder... builders)
+      throws TosException {
+    return retry(() -> client.copyObjectFrom(bucket, srcBucket, srcObjectKey, dstObjectKey, builders));
+  }
+
+  @Override
+  public UploadPartCopyOutput uploadPartCopy(
+      String bucket, UploadPartCopyInput input,
+      RequestOptionsBuilder... builders) throws TosException {
+    return retry(() -> client.uploadPartCopy(bucket, input, builders));
+  }
+
+  @Override
+  public PutObjectAclOutput putObjectAcl(String bucket, PutObjectAclInput input) throws TosException {
+    return retry(() -> client.putObjectAcl(bucket, input));
+  }
+
+  @Override
+  public GetObjectAclOutput getObjectAcl(
+      String bucket, String objectKey,
+      RequestOptionsBuilder... builders)
+      throws TosException {
+    return retry(() -> client.getObjectAcl(bucket, objectKey, builders));
+  }
+
+  @Override
+  public CreateMultipartUploadOutput createMultipartUpload(
+      String bucket, String objectKey,
+      RequestOptionsBuilder... builders)
+      throws TosException {
+    return retry(() -> client.createMultipartUpload(bucket, objectKey, builders));
+  }
+
+  @Override
+  public UploadPartOutput uploadPart(
+      String bucket, UploadPartInput input,
+      RequestOptionsBuilder... builders)
+      throws TosException {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  @Override
+  public CompleteMultipartUploadOutput completeMultipartUpload(
+      String bucket,
+      CompleteMultipartUploadInput input)
+      throws TosException {
+    return retry(() -> client.completeMultipartUpload(bucket, input));
+  }
+
+  @Override
+  public AbortMultipartUploadOutput abortMultipartUpload(
+      String bucket,
+      AbortMultipartUploadInput input)
+      throws TosException {
+    return retry(() -> client.abortMultipartUpload(bucket, input));
+  }
+
+  @Override
+  public ListUploadedPartsOutput listUploadedParts(
+      String bucket,
+      ListUploadedPartsInput input,
+      RequestOptionsBuilder... builders)
+      throws TosException {
+    return retry(() -> client.listUploadedParts(bucket, input, builders));
+  }
+
+  @Override
+  public ListMultipartUploadsOutput listMultipartUploads(
+      String bucket,
+      ListMultipartUploadsInput input)
+      throws TosException {
+    return retry(() -> client.listMultipartUploads(bucket, input));
+  }
+
+  @Override
+  public String preSignedURL(
+      String httpMethod, String bucket, String objectKey, Duration ttl,
+      RequestOptionsBuilder... builders)
+      throws TosException {
+    return retry(() -> client.preSignedURL(httpMethod, bucket, objectKey, ttl, builders));
+  }
+
+  @Override
+  public DeleteBucketPolicyOutput deleteBucketPolicy(DeleteBucketPolicyInput input)
+      throws TosException {
+    return retry(() -> client.deleteBucketPolicy(input));
+  }
+
+  @Override
+  public PutBucketCORSOutput putBucketCORS(PutBucketCORSInput input)
+      throws TosException {
+    return retry(() -> client.putBucketCORS(input));
+  }
+
+  @Override
+  public GetBucketCORSOutput getBucketCORS(GetBucketCORSInput input)
+      throws TosException {
+    return retry(() -> client.getBucketCORS(input));
+  }
+
+  @Override
+  public DeleteBucketCORSOutput deleteBucketCORS(DeleteBucketCORSInput input)
+      throws TosException {
+    return retry(() -> client.deleteBucketCORS(input));
+  }
+
+  @Override
+  public PutBucketStorageClassOutput putBucketStorageClass(PutBucketStorageClassInput input)
+      throws TosException {
+    return retry(() -> client.putBucketStorageClass(input));
+  }
+
+  @Override
+  public GetBucketLocationOutput getBucketLocation(GetBucketLocationInput input)
+      throws TosException {
+    return retry(() -> client.getBucketLocation(input));
+  }
+
+  @Override
+  public PutBucketLifecycleOutput putBucketLifecycle(PutBucketLifecycleInput input)
+      throws TosException {
+    return retry(() -> client.putBucketLifecycle(input));
+  }
+
+  @Override
+  public GetBucketLifecycleOutput getBucketLifecycle(GetBucketLifecycleInput input)
+      throws TosException {
+    return retry(() -> client.getBucketLifecycle(input));
+  }
+
+  @Override
+  public DeleteBucketLifecycleOutput deleteBucketLifecycle(DeleteBucketLifecycleInput input)
+      throws TosException {
+    return retry(() -> client.deleteBucketLifecycle(input));
+  }
+
+  @Override
+  public PutBucketMirrorBackOutput putBucketMirrorBack(PutBucketMirrorBackInput input)
+      throws TosException {
+    return retry(() -> client.putBucketMirrorBack(input));
+  }
+
+  @Override
+  public GetBucketMirrorBackOutput getBucketMirrorBack(GetBucketMirrorBackInput input)
+      throws TosException {
+    return retry(() -> client.getBucketMirrorBack(input));
+  }
+
+  @Override
+  public DeleteBucketMirrorBackOutput deleteBucketMirrorBack(DeleteBucketMirrorBackInput input)
+      throws TosException {
+    return retry(() -> client.deleteBucketMirrorBack(input));
+  }
+
+  @Override
+  public PutBucketReplicationOutput putBucketReplication(PutBucketReplicationInput input)
+      throws TosException {
+    return retry(() -> client.putBucketReplication(input));
+  }
+
+  @Override
+  public GetBucketReplicationOutput getBucketReplication(GetBucketReplicationInput input)
+      throws TosException {
+    return retry(() -> client.getBucketReplication(input));
+  }
+
+  @Override
+  public DeleteBucketReplicationOutput deleteBucketReplication(DeleteBucketReplicationInput input)
+      throws TosException {
+    return retry(() -> client.deleteBucketReplication(input));
+  }
+
+  @Override
+  public PutBucketVersioningOutput putBucketVersioning(PutBucketVersioningInput input)
+      throws TosException {
+    return retry(() -> client.putBucketVersioning(input));
+  }
+
+  @Override
+  public GetBucketVersioningOutput getBucketVersioning(GetBucketVersioningInput input)
+      throws TosException {
+    return retry(() -> client.getBucketVersioning(input));
+  }
+
+  @Override
+  public PutBucketWebsiteOutput putBucketWebsite(PutBucketWebsiteInput input)
+      throws TosException {
+    return retry(() -> client.putBucketWebsite(input));
+  }
+
+  @Override
+  public GetBucketWebsiteOutput getBucketWebsite(GetBucketWebsiteInput input)
+      throws TosException {
+    return retry(() -> client.getBucketWebsite(input));
+  }
+
+  @Override
+  public DeleteBucketWebsiteOutput deleteBucketWebsite(DeleteBucketWebsiteInput input)
+      throws TosException {
+    return retry(() -> client.deleteBucketWebsite(input));
+  }
+
+  @Override
+  public PutBucketNotificationOutput putBucketNotification(PutBucketNotificationInput input)
+      throws TosException {
+    return retry(() -> client.putBucketNotification(input));
+  }
+
+  @Override
+  public GetBucketNotificationOutput getBucketNotification(GetBucketNotificationInput input)
+      throws TosException {
+    return retry(() -> client.getBucketNotification(input));
+  }
+
+  @Override
+  public PutBucketNotificationType2Output putBucketNotificationType2(PutBucketNotificationType2Input input)
+      throws TosException {
+    return retry(() -> client.putBucketNotificationType2(input));
+  }
+
+  @Override
+  public GetBucketNotificationType2Output getBucketNotificationType2(GetBucketNotificationType2Input input)
+      throws TosException {
+    return retry(() -> client.getBucketNotificationType2(input));
+  }
+
+  @Override
+  public PutBucketCustomDomainOutput putBucketCustomDomain(PutBucketCustomDomainInput input)
+      throws TosException {
+    return retry(() -> client.putBucketCustomDomain(input));
+  }
+
+  @Override
+  public ListBucketCustomDomainOutput listBucketCustomDomain(ListBucketCustomDomainInput input)
+      throws TosException {
+    return retry(() -> client.listBucketCustomDomain(input));
+  }
+
+  @Override
+  public DeleteBucketCustomDomainOutput deleteBucketCustomDomain(DeleteBucketCustomDomainInput input)
+      throws TosException {
+    return retry(() -> client.deleteBucketCustomDomain(input));
+  }
+
+  @Override
+  public PutBucketRealTimeLogOutput putBucketRealTimeLog(PutBucketRealTimeLogInput input)
+      throws TosException {
+    return retry(() -> client.putBucketRealTimeLog(input));
+  }
+
+  @Override
+  public GetBucketRealTimeLogOutput getBucketRealTimeLog(GetBucketRealTimeLogInput input)
+      throws TosException {
+    return retry(() -> client.getBucketRealTimeLog(input));
+  }
+
+  @Override
+  public DeleteBucketRealTimeLogOutput deleteBucketRealTimeLog(DeleteBucketRealTimeLogInput input)
+      throws TosException {
+    return retry(() -> client.deleteBucketRealTimeLog(input));
+  }
+
+  @Override
+  public PutBucketACLOutput putBucketACL(PutBucketACLInput input) throws TosException {
+    return retry(() -> client.putBucketACL(input));
+  }
+
+  @Override
+  public GetBucketACLOutput getBucketACL(GetBucketACLInput input) throws TosException {
+    return retry(() -> client.getBucketACL(input));
+  }
+
+  @Override
+  public PutBucketRenameOutput putBucketRename(PutBucketRenameInput input) throws TosException {
+    return retry(() -> client.putBucketRename(input));
+  }
+
+  @Override
+  public GetBucketRenameOutput getBucketRename(GetBucketRenameInput input) throws TosException {
+    return retry(() -> client.getBucketRename(input));
+  }
+
+  @Override
+  public DeleteBucketRenameOutput deleteBucketRename(DeleteBucketRenameInput input) throws TosException {
+    return retry(() -> client.deleteBucketRename(input));
+  }
+
+  @Override
+  public PutBucketEncryptionOutput putBucketEncryption(PutBucketEncryptionInput input) throws TosException {
+    return retry(() -> client.putBucketEncryption(input));
+  }
+
+  @Override
+  public GetBucketEncryptionOutput getBucketEncryption(GetBucketEncryptionInput input) throws TosException {
+    return retry(() -> client.getBucketEncryption(input));
+  }
+
+  @Override
+  public DeleteBucketEncryptionOutput deleteBucketEncryption(DeleteBucketEncryptionInput input) throws TosException {
+    return retry(() -> client.deleteBucketEncryption(input));
+  }
+
+  @Override
+  public PutBucketTaggingOutput putBucketTagging(PutBucketTaggingInput input) throws TosException {
+    return retry(() -> client.putBucketTagging(input));
+  }
+
+  @Override
+  public GetBucketTaggingOutput getBucketTagging(GetBucketTaggingInput input) throws TosException {
+    return retry(() -> client.getBucketTagging(input));
+  }
+
+  @Override
+  public DeleteBucketTaggingOutput deleteBucketTagging(DeleteBucketTaggingInput input) throws TosException {
+    return retry(() -> client.deleteBucketTagging(input));
+  }
+
+  @Override
+  public PutBucketInventoryOutput putBucketInventory(PutBucketInventoryInput input)
+      throws TosException {
+    return retry(() -> client.putBucketInventory(input));
+  }
+
+  @Override
+  public GetBucketInventoryOutput getBucketInventory(GetBucketInventoryInput input) throws TosException {
+    return retry(() -> client.getBucketInventory(input));
+  }
+
+  @Override
+  public ListBucketInventoryOutput listBucketInventory(ListBucketInventoryInput input) throws TosException {
+    return retry(() -> client.listBucketInventory(input));
+  }
+
+  @Override
+  public DeleteBucketInventoryOutput deleteBucketInventory(DeleteBucketInventoryInput input) throws TosException {
+    return retry(() -> client.deleteBucketInventory(input));
+  }
+
+  @Override
+  public GetObjectV2Output getObject(GetObjectV2Input input) throws TosException {
+    return retry(() -> client.getObject(input));
+  }
+
+  @Override
+  public GetObjectToFileOutput getObjectToFile(GetObjectToFileInput input) throws TosException {
+    return retry(() -> client.getObjectToFile(input));
+  }
+
+  @Override
+  public GetFileStatusOutput getFileStatus(GetFileStatusInput input) throws TosException {
+    return retry(() -> client.getFileStatus(input));
+  }
+
+  @Override
+  public UploadFileV2Output uploadFile(UploadFileV2Input input) throws TosException {
+    return retry(() -> client.uploadFile(input));
+  }
+
+  @Override
+  public DownloadFileOutput downloadFile(DownloadFileInput input) throws TosException {
+    return retry(() -> client.downloadFile(input));
+  }
+
+  @Override
+  public ResumableCopyObjectOutput resumableCopyObject(ResumableCopyObjectInput input)
+      throws TosException {
+    return retry(() -> client.resumableCopyObject(input));
+  }
+
+  @Override
+  public HeadObjectV2Output headObject(HeadObjectV2Input input) throws TosException {
+    return retry(() -> client.headObject(input));
+  }
+
+  @Override
+  public DeleteObjectOutput deleteObject(DeleteObjectInput input) throws TosException {
+    return retry(() -> client.deleteObject(input));
+  }
+
+  @Override
+  public DeleteMultiObjectsV2Output deleteMultiObjects(DeleteMultiObjectsV2Input input)
+      throws TosException {
+    return retry(() -> client.deleteMultiObjects(input));
+  }
+
+  public PutObjectOutput put(
+      String bucket, String key, InputStreamProvider streamProvider,
+      long contentLength, ACLType aclType) {
+    return retry(() -> client.putObject(newPutObjectRequest(bucket, key, streamProvider, contentLength, aclType)));
+  }
+
+  private PutObjectInput newPutObjectRequest(
+      String bucket,
+      String key,
+      InputStreamProvider streamProvider,
+      long contentLength,
+      ACLType aclType) {
+
+    return PutObjectInput.builder()
+        .bucket(bucket)
+        .key(key)
+        .content(streamProvider.newStream())
+        .contentLength(contentLength)
+        .options(new ObjectMetaRequestOptions()
+            .setAclType(aclType))
+        .build();
+  }
+
+  public AppendObjectOutput appendObject(String bucket, String key, InputStreamProvider streamProvider,
+      long offset, long contentLength, String originalCrc64, ACLType aclType) {
+    // originalCrc64 is required when appending data to an object: it must be the object's
+    // current crc64 checksum if the object exists, and null if the object does not exist yet.
+    return retry(() -> client.appendObject(
+        newAppendObjectRequest(bucket, key, streamProvider, offset, contentLength, originalCrc64, aclType)));
+  }
+
+  private AppendObjectInput newAppendObjectRequest(
+      String bucket,
+      String key,
+      InputStreamProvider streamProvider,
+      long offset,
+      long contentLength,
+      String preCrc64ecma,
+      ACLType aclType) {
+    return AppendObjectInput.builder()
+        .bucket(bucket)
+        .key(key)
+        .content(streamProvider.newStream())
+        .offset(offset)
+        .contentLength(contentLength)
+        .preHashCrc64ecma(preCrc64ecma)
+        .options(new ObjectMetaRequestOptions()
+            .setAclType(aclType))
+        .build();
+  }
+
+  @Override
+  public PutObjectOutput putObject(PutObjectInput input) throws TosException {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  @Override
+  public PutObjectFromFileOutput putObjectFromFile(PutObjectFromFileInput input)
+      throws TosException {
+    return retry(() -> client.putObjectFromFile(input));
+  }
+
+  @Override
+  public AppendObjectOutput appendObject(AppendObjectInput input)
+      throws TosException {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  @Override
+  public SetObjectMetaOutput setObjectMeta(SetObjectMetaInput input)
+      throws TosException {
+    return retry(() -> client.setObjectMeta(input));
+  }
+
+  @Override
+  public ListObjectsV2Output listObjects(ListObjectsV2Input input)
+      throws TosException {
+    return retry(() -> client.listObjects(input));
+  }
+
+  @Override
+  public ListObjectsType2Output listObjectsType2(ListObjectsType2Input input)
+      throws TosException {
+    return retry(() -> client.listObjectsType2(input));
+  }
+
+  @Override
+  public ListObjectVersionsV2Output listObjectVersions(ListObjectVersionsV2Input input)
+      throws TosException {
+    return retry(() -> client.listObjectVersions(input));
+  }
+
+  @Override
+  public CopyObjectV2Output copyObject(CopyObjectV2Input input)
+      throws TosException {
+    return retry(() -> client.copyObject(input));
+  }
+
+  @Override
+  public UploadPartCopyV2Output uploadPartCopy(UploadPartCopyV2Input input)
+      throws TosException {
+    return retry(() -> client.uploadPartCopy(input));
+  }
+
+  @Override
+  public PutObjectACLOutput putObjectAcl(PutObjectACLInput input)
+      throws TosException {
+    return retry(() -> client.putObjectAcl(input));
+  }
+
+  @Override
+  public GetObjectACLV2Output getObjectAcl(GetObjectACLV2Input input)
+      throws TosException {
+    return retry(() -> client.getObjectAcl(input));
+  }
+
+  @Override
+  public PutObjectTaggingOutput putObjectTagging(PutObjectTaggingInput input)
+      throws TosException {
+    return retry(() -> client.putObjectTagging(input));
+  }
+
+  @Override
+  public GetObjectTaggingOutput getObjectTagging(GetObjectTaggingInput input)
+      throws TosException {
+    return retry(() -> client.getObjectTagging(input));
+  }
+
+  @Override
+  public DeleteObjectTaggingOutput deleteObjectTagging(DeleteObjectTaggingInput input)
+      throws TosException {
+    return retry(() -> client.deleteObjectTagging(input));
+  }
+
+  @Override
+  public FetchObjectOutput fetchObject(FetchObjectInput input) throws TosException {
+    return retry(() -> client.fetchObject(input));
+  }
+
+  @Override
+  public PutFetchTaskOutput putFetchTask(PutFetchTaskInput input) throws TosException {
+    return retry(() -> client.putFetchTask(input));
+  }
+
+  @Override
+  public GetFetchTaskOutput getFetchTask(GetFetchTaskInput input) throws TosException {
+    return retry(() -> client.getFetchTask(input));
+  }
+
+  @Override
+  public CreateMultipartUploadOutput createMultipartUpload(CreateMultipartUploadInput input)
+      throws TosException {
+    return retry(() -> client.createMultipartUpload(input));
+  }
+
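+  // The part stream is wrapped in a CountingInputStream so that the returned
+  // Part records the number of bytes actually sent; each retry re-creates the
+  // stream from streamProvider instead of re-reading a consumed one.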
+  public Part uploadPart(
+      String bucket,
+      String key,
+      String uploadId,
+      int partNum,
+      InputStreamProvider streamProvider,
+      long contentLength,
+      ACLType aclType) {
+    return retry(() -> {
+      InputStream in = streamProvider.newStream();
+      CountingInputStream countedIn = new CountingInputStream(in);
+      UploadPartV2Input request = UploadPartV2Input.builder()
+          .bucket(bucket)
+          .key(key)
+          .partNumber(partNum)
+          .uploadID(uploadId)
+          .content(countedIn)
+          .contentLength(contentLength)
+          .options(new ObjectMetaRequestOptions()
+              .setAclType(aclType))
+          .build();
+      UploadPartV2Output output = client.uploadPart(request);
+      return new Part(output.getPartNumber(), countedIn.getCount(), output.getEtag());
+    });
+  }
+
+  @Override
+  public UploadPartV2Output uploadPart(UploadPartV2Input input) throws TosException {
+    throw new UnsupportedOperationException("Not supported");
+  }
+
+  @Override
+  public UploadPartFromFileOutput uploadPartFromFile(UploadPartFromFileInput input)
+      throws TosException {
+    return retry(() -> client.uploadPartFromFile(input));
+  }
+
+  @Override
+  public CompleteMultipartUploadV2Output completeMultipartUpload(CompleteMultipartUploadV2Input input)
+      throws TosException {
+    return retry(() -> client.completeMultipartUpload(input));
+  }
+
+  @Override
+  public AbortMultipartUploadOutput abortMultipartUpload(AbortMultipartUploadInput input)
+      throws TosException {
+    return retry(() -> client.abortMultipartUpload(input));
+  }
+
+  @Override
+  public ListPartsOutput listParts(ListPartsInput input) throws TosException {
+    return retry(() -> client.listParts(input));
+  }
+
+  @Override
+  public ListMultipartUploadsV2Output listMultipartUploads(ListMultipartUploadsV2Input input)
+      throws TosException {
+    return retry(() -> client.listMultipartUploads(input));
+  }
+
+  @Override
+  public RenameObjectOutput renameObject(RenameObjectInput input) throws TosException {
+    return retry(() -> client.renameObject(input));
+  }
+
+  @Override
+  public RestoreObjectOutput restoreObject(RestoreObjectInput input) throws TosException {
+    return retry(() -> client.restoreObject(input));
+  }
+
+  @Override
+  public PutSymlinkOutput putSymlink(PutSymlinkInput input) throws TosException {
+    return retry(() -> client.putSymlink(input));
+  }
+
+  @Override
+  public GetSymlinkOutput getSymlink(GetSymlinkInput input) throws TosException {
+    return retry(() -> client.getSymlink(input));
+  }
+
+  @Override
+  public PreSignedURLOutput preSignedURL(PreSignedURLInput input) throws TosException {
+    return retry(() -> client.preSignedURL(input));
+  }
+
+  @Override
+  public PreSignedPostSignatureOutput preSignedPostSignature(PreSignedPostSignatureInput input)
+      throws TosException {
+    return retry(() -> client.preSignedPostSignature(input));
+  }
+
+  @Override
+  public PreSingedPolicyURLOutput preSingedPolicyURL(PreSingedPolicyURLInput input)
+      throws TosException {
+    return retry(() -> client.preSingedPolicyURL(input));
+  }
+
+  @Override
+  public PreSignedPolicyURLOutput preSignedPolicyURL(PreSignedPolicyURLInput input) throws TosException {
+    return retry(() -> client.preSignedPolicyURL(input));
+  }
+
+  @Override
+  public void changeCredentials(Credentials credentials) {
+    retry(() -> {
+      client.changeCredentials(credentials);
+      return null;
+    });
+  }
+
+  @Override
+  public void changeRegionAndEndpoint(String region, String endpoint) {
+    retry(() -> {
+      client.changeRegionAndEndpoint(region, endpoint);
+      return null;
+    });
+  }
+
+  @Override
+  public void changeTransportConfig(TransportConfig config) {
+    retry(() -> {
+      client.changeTransportConfig(config);
+      return null;
+    });
+  }
+
+  @Override
+  public boolean refreshEndpointRegion(String s, String s1) {
+    return retry(() -> client.refreshEndpointRegion(s, s1));
+  }
+
+  @Override
+  public boolean refreshCredentials(String s, String s1, String s2) {
+    return retry(() -> client.refreshCredentials(s, s1, s2));
+  }
+
+  @Override
+  public void close() throws IOException {
+    client.close();
+  }
+
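+  // Double-checked locking: re-resolve the credential from the provider and
+  // only propagate it to the underlying client (under the lock) when it has
+  // actually changed, keeping unchanged credentials on the lock-free fast path.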
+  private void refresh() throws TosException {
+    Credential credential = provider.credential();
+    if (credentialIsChanged(credential)) {
+      synchronized (this) {
+        if (credentialIsChanged(credential)) {
+          client.changeCredentials(provider);
+          usedCredential = credential;
+        }
+      }
+    }
+  }
+
+  private boolean credentialIsChanged(Credential credential) {
+    return !Objects.equals(credential.getAccessKeyId(), usedCredential.getAccessKeyId())
+        || !Objects.equals(credential.getAccessKeySecret(), usedCredential.getAccessKeySecret())
+        || !Objects.equals(credential.getSecurityToken(), usedCredential.getSecurityToken());
+  }
+
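+  // Runs the callable up to maxRetryTimes attempts, refreshing credentials
+  // before each attempt and backing off between retryable failures.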
+  private <T> T retry(Callable<T> callable) {
+    int attempt = 0;
+    while (true) {
+      attempt++;
+      try {
+        refresh();
+        return callable.call();
+      } catch (TosException e) {
+        if (attempt >= maxRetryTimes) {
+          LOG.error("Retry exhausted after {} times.", maxRetryTimes);
+          throw e;
+        }
+        if (isRetryableException(e, nonRetryable409ErrorCodes)) {
+          LOG.warn("Retry TOS request in the {} times, error: {}", attempt,
+              Throwables.getRootCause(e).getMessage());
+          try {
+            // last time does not need to sleep
+            Thread.sleep(RetryableUtils.backoff(attempt));
+          } catch (InterruptedException ex) {
+            throw new TosClientException("tos: request interrupted.", ex);
+          }
+        } else {
+          throw e;
+        }
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
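+  // A request is retried on 5xx responses, throttling (429), transient
+  // network/SSL failures, retryable client-side IO errors, and 409 conflicts
+  // whose error code is not in the configured non-retryable list.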
+  @VisibleForTesting
+  static boolean isRetryableException(TosException e, List<String> nonRetryable409ErrorCodes) {
+    return e.getStatusCode() >= HttpStatus.INTERNAL_SERVER_ERROR
+        || e.getStatusCode() == HttpStatus.TOO_MANY_REQUESTS
+        || e.getCause() instanceof SocketException
+        || e.getCause() instanceof UnknownHostException
+        || e.getCause() instanceof SSLException
+        || e.getCause() instanceof SocketTimeoutException
+        || e.getCause() instanceof InterruptedException
+        || isRetryableTosClientException(e)
+        || isRetryableTosServerException(e, nonRetryable409ErrorCodes);
+  }
+
+  private static boolean isRetryableTosClientException(TosException e) {
+    return e instanceof TosClientException
+        && e.getCause() instanceof IOException
+        && !(e.getCause() instanceof EOFException);
+  }
+
+  private static boolean isRetryableTosServerException(TosException e, List<String> nonRetryable409ErrorCodes) {
+    return e instanceof TosServerException
+        && e.getStatusCode() == HttpStatus.CONFLICT
+        && isRetryableTosConflictException((TosServerException) e, nonRetryable409ErrorCodes);
+  }
+
+  private static boolean isRetryableTosConflictException(TosServerException e, List<String> nonRetryableCodes) {
+    String errorCode = e.getEc();
+    return StringUtils.isEmpty(errorCode) || !nonRetryableCodes.contains(errorCode);
+  }
+}
+
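
RetryableUtils.backoff is referenced above but not shown in this diff. A typical shape for such a helper is capped exponential backoff with jitter; the following is an assumption for illustration, not the actual RetryableUtils code:

import java.util.concurrent.ThreadLocalRandom;

// Hypothetical capped exponential backoff with jitter, for illustration only.
public final class BackoffSketch {
  private static final long BASE_DELAY_MS = 100;
  private static final long MAX_DELAY_MS = 10_000;

  private BackoffSketch() {
  }

  public static long backoff(int attempt) {
    // 200ms, 400ms, 800ms, ... capped at 10s, plus up to ~20% random jitter.
    long exp = BASE_DELAY_MS << Math.min(attempt, 20); // clamp the shift to avoid overflow
    long delay = Math.min(exp, MAX_DELAY_MS);
    long jitter = ThreadLocalRandom.current().nextLong(delay / 5 + 1);
    return delay + jitter;
  }
}

Since retry() only sleeps between attempts, the delays for attempts 1, 2, 3, ... grow geometrically until the cap.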

+ 184 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/DelegationClientBuilder.java

@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.tos;
+
+import com.volcengine.tos.TOSClientConfiguration;
+import com.volcengine.tos.TosException;
+import com.volcengine.tos.transport.TransportConfig;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.tosfs.conf.ConfKeys;
+import org.apache.hadoop.fs.tosfs.conf.TosKeys;
+import org.apache.hadoop.fs.tosfs.object.Constants;
+import org.apache.hadoop.fs.tosfs.object.tos.auth.CredentialsProvider;
+import org.apache.hadoop.fs.tosfs.util.ParseUtils;
+import org.apache.hadoop.fs.tosfs.util.TOSClientContextUtils;
+import org.apache.hadoop.util.Preconditions;
+import org.apache.hadoop.util.VersionInfo;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.hadoop.fs.tosfs.object.tos.TOS.TOS_SCHEME;
+
+public class DelegationClientBuilder {
+
+  public static final int DISABLE_TOS_RETRY_VALUE = -1;
+  private static final String TOS_ENDPOINT_KEY = ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME);
+  private static final String TOS_REGION_KEY = ConfKeys.FS_TOS_REGION.key(TOS_SCHEME);
+
+  @VisibleForTesting
+  static final Map<String, DelegationClient> CACHE = new ConcurrentHashMap<>();
+
+  private String bucket;
+  private Configuration conf;
+
+  public DelegationClientBuilder bucket(String bucket) {
+    this.bucket = bucket;
+    return this;
+  }
+
+  public DelegationClientBuilder conf(Configuration conf) {
+    this.conf = conf;
+    return this;
+  }
+
+  public DelegationClient build() throws TosException {
+    Preconditions.checkNotNull(bucket, "Bucket cannot be null");
+    Preconditions.checkNotNull(conf, "Conf cannot be null");
+    String endpoint = getAndCheckEndpoint(conf);
+    String region = getAndCheckRegion(conf, endpoint);
+
+    if (conf.getBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE,
+        TosKeys.FS_TOS_DISABLE_CLIENT_CACHE_DEFAULT)) {
+      return createNewClient(conf, endpoint, region, bucket, false);
+    }
+    return CACHE.computeIfAbsent(bucket,
+        key -> createNewClient(conf, endpoint, region, bucket, true));
+  }
+
+  private DelegationClient createNewClient(Configuration conf, String endpoint, String region,
+      String bucket, boolean cached) {
+    CredentialsProvider provider = createProvider(conf, bucket);
+    TOSClientConfiguration clientConfiguration = TOSClientConfiguration.builder()
+        .region(region)
+        .endpoint(endpoint)
+        .credentials(provider)
+        .enableCrc(conf.getBoolean(
+            TosKeys.FS_TOS_CRC_CHECK_ENABLED, TosKeys.FS_TOS_CRC_CHECK_ENABLED_DEFAULT))
+        .transportConfig(createTransportConfig(conf))
+        .userAgentProductName(conf.get(
+            TosKeys.FS_TOS_USER_AGENT_PREFIX, TosKeys.FS_TOS_USER_AGENT_PREFIX_DEFAULT))
+        .userAgentSoftName(Constants.PROTON)
+        .userAgentSoftVersion(VersionInfo.getVersion())
+        .build();
+
+    int maxRetryTimes = conf.getInt(TosKeys.FS_TOS_REQUEST_MAX_RETRY_TIMES,
+        TosKeys.FS_TOS_REQUEST_MAX_RETRY_TIMES_DEFAULT);
+    List<String> nonRetryable409ErrorCodes = Arrays.asList(
+        conf.getTrimmedStrings(TosKeys.FS_TOS_FAST_FAILURE_409_ERROR_CODES,
+            TosKeys.FS_TOS_FAST_FAILURE_409_ERROR_CODES_DEFAULT));
+
+    if (cached) {
+      return new CachedClient(clientConfiguration, maxRetryTimes, nonRetryable409ErrorCodes);
+    } else {
+      return new DelegationClient(clientConfiguration, maxRetryTimes, nonRetryable409ErrorCodes);
+    }
+  }
+
+  private CredentialsProvider createProvider(Configuration conf, String bucket) {
+    try {
+      CredentialsProvider provider = (CredentialsProvider) Class.forName(
+              conf.get(TosKeys.FS_TOS_CREDENTIALS_PROVIDER,
+                  TosKeys.FS_TOS_CREDENTIALS_PROVIDER_DEFAULT))
+              .getDeclaredConstructor()
+              .newInstance();
+      provider.initialize(conf, bucket);
+      return provider;
+    } catch (ClassNotFoundException |
+             InstantiationException |
+             IllegalAccessException |
+             InvocationTargetException |
+             NoSuchMethodException e) {
+      throw new TosException(e);
+    }
+  }
+
+  private String getAndCheckEndpoint(Configuration conf) {
+    String endpoint = conf.get(TOS_ENDPOINT_KEY);
+    if (StringUtils.isBlank(endpoint)) {
+      endpoint = ParseUtils.envAsString(TOS.ENV_TOS_ENDPOINT);
+    }
+    Preconditions.checkNotNull(endpoint, "%s cannot be null", TOS_ENDPOINT_KEY);
+    return endpoint.trim();
+  }
+
+  private String getAndCheckRegion(Configuration conf, String endpoint) {
+    String region = conf.get(TOS_REGION_KEY);
+    if (StringUtils.isNotBlank(region)) {
+      return region.trim();
+    }
+    region = TOSClientContextUtils.parseRegion(endpoint);
+    Preconditions.checkNotNull(region, "%s cannot be null", TOS_REGION_KEY);
+    return region.trim();
+  }
+
+  private TransportConfig createTransportConfig(Configuration conf) {
+    TransportConfig.TransportConfigBuilder builder = TransportConfig.builder();
+    // Disable the TOS SDK's own retry with a negative number, since we implement the retry
+    // strategy on top of the SDK: the SDK cannot retry all input streams via the mark & reset
+    // API, and those restrictions make it hard to use. Note that the TOS SDK resets the max
+    // retry count to 3 if the configured count equals 0, hence the negative sentinel.
+    builder.maxRetryCount(DISABLE_TOS_RETRY_VALUE);
+
+    builder.maxConnections(conf.getInt(TosKeys.FS_TOS_HTTP_MAX_CONNECTIONS,
+        TosKeys.FS_TOS_HTTP_MAX_CONNECTIONS_DEFAULT));
+    builder.idleConnectionTimeMills(conf.getInt(TosKeys.FS_TOS_HTTP_IDLE_CONNECTION_TIME_MILLS,
+        TosKeys.FS_TOS_HTTP_IDLE_CONNECTION_TIME_MILLS_DEFAULT));
+    builder.connectTimeoutMills(conf.getInt(TosKeys.FS_TOS_HTTP_CONNECT_TIMEOUT_MILLS,
+        TosKeys.FS_TOS_HTTP_CONNECT_TIMEOUT_MILLS_DEFAULT));
+    builder.readTimeoutMills(conf.getInt(TosKeys.FS_TOS_HTTP_READ_TIMEOUT_MILLS,
+        TosKeys.FS_TOS_HTTP_READ_TIMEOUT_MILLS_DEFAULT));
+    builder.writeTimeoutMills(conf.getInt(TosKeys.FS_TOS_HTTP_WRITE_TIMEOUT_MILLS,
+        TosKeys.FS_TOS_HTTP_WRITE_TIMEOUT_MILLS_DEFAULT));
+    builder.enableVerifySSL(conf.getBoolean(TosKeys.FS_TOS_HTTP_ENABLE_VERIFY_SSL,
+        TosKeys.FS_TOS_HTTP_ENABLE_VERIFY_SSL_DEFAULT));
+    builder.dnsCacheTimeMinutes(conf.getInt(TosKeys.FS_TOS_HTTP_DNS_CACHE_TIME_MINUTES,
+        TosKeys.FS_TOS_HTTP_DNS_CACHE_TIME_MINUTES_DEFAULT));
+
+    return builder.build();
+  }
+
+  static class CachedClient extends DelegationClient {
+
+    protected CachedClient(TOSClientConfiguration configuration, int maxRetryTimes,
+        List<String> nonRetryable409ErrorCodes) {
+      super(configuration, maxRetryTimes, nonRetryable409ErrorCodes);
+    }
+
+    @Override
+    public void close() {
+      // do nothing as this client may be shared by multiple upper-layer instances
+    }
+  }
+}
+
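Note: a rough usage sketch for the builder. The endpoint value below is made up, and the key is
spelled through ConfKeys rather than hard-coded; clients are cached per bucket unless
TosKeys.FS_TOS_DISABLE_CLIENT_CACHE is set:

    Configuration conf = new Configuration();
    conf.set(ConfKeys.FS_TOS_ENDPOINT.key("tos"), "tos-cn-beijing.volces.com"); // made-up value

    // Subsequent builds for "example-bucket" return the same cached client, whose close()
    // is a no-op (see CachedClient above).
    DelegationClient client = new DelegationClientBuilder()
        .bucket("example-bucket")
        .conf(conf)
        .build();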

+ 61 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/GetObjectOutput.java

@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.tos;
+
+import com.volcengine.tos.model.object.GetObjectV2Output;
+import org.apache.hadoop.fs.tosfs.object.exceptions.ChecksumMismatchException;
+import org.apache.hadoop.fs.tosfs.util.CommonUtils;
+import org.apache.hadoop.util.Preconditions;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+
+public class GetObjectOutput {
+  private final GetObjectV2Output output;
+  private final byte[] checksum;
+
+  public GetObjectOutput(GetObjectV2Output output, byte[] checksum) {
+    Preconditions.checkNotNull(checksum, "Checksum should not be null.");
+    this.output = output;
+    this.checksum = checksum;
+  }
+
+  public GetObjectV2Output output() {
+    return output;
+  }
+
+  public byte[] checksum() {
+    return checksum;
+  }
+
+  public InputStream verifiedContent(byte[] expectedChecksum) throws IOException {
+    if (!Arrays.equals(expectedChecksum, checksum)) {
+      CommonUtils.runQuietly(this::forceClose);
+      throw new ChecksumMismatchException(expectedChecksum, checksum);
+    }
+
+    return output.getContent();
+  }
+
+  public void forceClose() throws IOException {
+    output.forceClose();
+  }
+}
+
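Note: a small sketch of how this wrapper is meant to be consumed; the consumeVerified helper is
hypothetical. verifiedContent hands out the stream only when the checksums match, and
force-closes the underlying output otherwise:

    // Drain a verified object stream and count its bytes; verifiedContent throws
    // ChecksumMismatchException (after force-closing the output) on a checksum mismatch.
    static long consumeVerified(GetObjectOutput out, byte[] expectedChecksum) throws IOException {
      try (InputStream in = out.verifiedContent(expectedChecksum)) {
        long total = 0;
        byte[] buf = new byte[8192];
        int n;
        while ((n = in.read(buf)) != -1) {
          total += n;
        }
        return total;
      }
    }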

+ 1013 - 1
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TOS.java

@@ -16,10 +16,1022 @@
 
 package org.apache.hadoop.fs.tosfs.object.tos;
 
-public class TOS {
+import com.volcengine.tos.TOSV2;
+import com.volcengine.tos.TosException;
+import com.volcengine.tos.TosServerException;
+import com.volcengine.tos.comm.common.ACLType;
+import com.volcengine.tos.comm.common.BucketType;
+import com.volcengine.tos.internal.util.TypeConverter;
+import com.volcengine.tos.model.bucket.HeadBucketV2Input;
+import com.volcengine.tos.model.bucket.HeadBucketV2Output;
+import com.volcengine.tos.model.bucket.Tag;
+import com.volcengine.tos.model.object.AbortMultipartUploadInput;
+import com.volcengine.tos.model.object.AppendObjectOutput;
+import com.volcengine.tos.model.object.CompleteMultipartUploadV2Input;
+import com.volcengine.tos.model.object.CopyObjectV2Input;
+import com.volcengine.tos.model.object.CreateMultipartUploadInput;
+import com.volcengine.tos.model.object.CreateMultipartUploadOutput;
+import com.volcengine.tos.model.object.DeleteError;
+import com.volcengine.tos.model.object.DeleteMultiObjectsV2Input;
+import com.volcengine.tos.model.object.DeleteMultiObjectsV2Output;
+import com.volcengine.tos.model.object.DeleteObjectInput;
+import com.volcengine.tos.model.object.DeleteObjectTaggingInput;
+import com.volcengine.tos.model.object.GetFileStatusInput;
+import com.volcengine.tos.model.object.GetFileStatusOutput;
+import com.volcengine.tos.model.object.GetObjectBasicOutput;
+import com.volcengine.tos.model.object.GetObjectTaggingInput;
+import com.volcengine.tos.model.object.GetObjectTaggingOutput;
+import com.volcengine.tos.model.object.GetObjectV2Input;
+import com.volcengine.tos.model.object.GetObjectV2Output;
+import com.volcengine.tos.model.object.HeadObjectV2Input;
+import com.volcengine.tos.model.object.HeadObjectV2Output;
+import com.volcengine.tos.model.object.ListMultipartUploadsV2Input;
+import com.volcengine.tos.model.object.ListMultipartUploadsV2Output;
+import com.volcengine.tos.model.object.ListObjectsType2Input;
+import com.volcengine.tos.model.object.ListObjectsType2Output;
+import com.volcengine.tos.model.object.ListedCommonPrefix;
+import com.volcengine.tos.model.object.ListedObjectV2;
+import com.volcengine.tos.model.object.ListedUpload;
+import com.volcengine.tos.model.object.ObjectMetaRequestOptions;
+import com.volcengine.tos.model.object.ObjectTobeDeleted;
+import com.volcengine.tos.model.object.PutObjectOutput;
+import com.volcengine.tos.model.object.PutObjectTaggingInput;
+import com.volcengine.tos.model.object.RenameObjectInput;
+import com.volcengine.tos.model.object.TagSet;
+import com.volcengine.tos.model.object.UploadPartCopyV2Input;
+import com.volcengine.tos.model.object.UploadPartCopyV2Output;
+import com.volcengine.tos.model.object.UploadedPartV2;
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.tosfs.conf.ConfKeys;
+import org.apache.hadoop.fs.tosfs.conf.TosKeys;
+import org.apache.hadoop.fs.tosfs.object.BucketInfo;
+import org.apache.hadoop.fs.tosfs.object.ChecksumInfo;
+import org.apache.hadoop.fs.tosfs.object.ChecksumType;
+import org.apache.hadoop.fs.tosfs.object.Constants;
+import org.apache.hadoop.fs.tosfs.object.DirectoryStorage;
+import org.apache.hadoop.fs.tosfs.object.InputStreamProvider;
+import org.apache.hadoop.fs.tosfs.object.MultipartUpload;
+import org.apache.hadoop.fs.tosfs.object.ObjectConstants;
+import org.apache.hadoop.fs.tosfs.object.ObjectContent;
+import org.apache.hadoop.fs.tosfs.object.ObjectInfo;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
+import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
+import org.apache.hadoop.fs.tosfs.object.Part;
+import org.apache.hadoop.fs.tosfs.object.exceptions.InvalidObjectKeyException;
+import org.apache.hadoop.fs.tosfs.object.exceptions.NotAppendableException;
+import org.apache.hadoop.fs.tosfs.object.request.ListObjectsRequest;
+import org.apache.hadoop.fs.tosfs.object.response.ListObjectsResponse;
+import org.apache.hadoop.fs.tosfs.util.LazyReload;
+import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
+import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
+import org.apache.hadoop.util.Lists;
+import org.apache.hadoop.util.Preconditions;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.fs.tosfs.object.tos.TOSErrorCodes.APPEND_NOT_APPENDABLE;
+import static org.apache.hadoop.fs.tosfs.object.tos.TOSUtils.CHECKSUM_HEADER;
+import static org.apache.hadoop.fs.tosfs.object.tos.TOSUtils.appendable;
+import static org.apache.hadoop.fs.tosfs.object.tos.TOSUtils.crc64ecma;
+import static org.apache.hadoop.fs.tosfs.object.tos.TOSUtils.parseChecksum;
+
+/**
+ * {@link TOS} is initialized via {@link ObjectStorage#initialize(Configuration, String)}.
+ */
+public class TOS implements DirectoryStorage {
+
+  private static final Logger LOG = LoggerFactory.getLogger(TOS.class);
+  public static final String TOS_SCHEME = "tos";
 
   public static final String ENV_TOS_ACCESS_KEY_ID = "TOS_ACCESS_KEY_ID";
   public static final String ENV_TOS_SECRET_ACCESS_KEY = "TOS_SECRET_ACCESS_KEY";
   public static final String ENV_TOS_SESSION_TOKEN = "TOS_SESSION_TOKEN";
   public static final String ENV_TOS_ENDPOINT = "TOS_ENDPOINT";
+
+  private static final int NOT_FOUND_CODE = 404;
+  private static final int PATH_CONFLICT_CODE = 409;
+  private static final int INVALID_RANGE_CODE = 416;
+
+  private static final int MIN_PART_SIZE = 5 * 1024 * 1024;
+  private static final int MAX_PART_COUNT = 10000;
+
+  private static final InputStream EMPTY_STREAM = new ByteArrayInputStream(new byte[0]);
+
+  private Configuration conf;
+  private String bucket;
+  private DelegationClient client;
+  private long maxDrainBytes;
+  private int batchDeleteMaxRetries;
+  private List<String> batchDeleteRetryCodes;
+  private long batchDeleteRetryInterval;
+  private int maxDeleteObjectsCount;
+  private int listObjectsCount;
+  // The max number of retries while reading object content.
+  private int maxInputStreamRetries;
+  private ACLType defaultAcl;
+  private ChecksumInfo checksumInfo;
+  private BucketInfo bucketInfo;
+
+  static {
+    org.apache.log4j.Logger logger = LogManager.getLogger("io.proton.shaded.com.volcengine.tos");
+    String logLevel = System.getProperty("tos.log.level", "WARN");
+
+    LOG.debug("Reset the log level of io.proton.shaded.com.volcengine.tos with {} ", logLevel);
+    logger.setLevel(Level.toLevel(logLevel.toUpperCase(), Level.WARN));
+  }
+
+  @Override
+  public void initialize(Configuration conf, String bucket) {
+    this.conf = conf;
+    this.bucket = bucket;
+    client = new DelegationClientBuilder().conf(conf).bucket(bucket).build();
+    maxDrainBytes =
+        conf.getLong(TosKeys.FS_TOS_MAX_DRAIN_BYTES, TosKeys.FS_TOS_MAX_DRAIN_BYTES_DEFAULT);
+    batchDeleteMaxRetries = conf.getInt(TosKeys.FS_TOS_BATCH_DELETE_MAX_RETRIES,
+        TosKeys.FS_TOS_BATCH_DELETE_MAX_RETRIES_DEFAULT);
+    batchDeleteRetryCodes = Arrays.asList(
+        conf.getTrimmedStrings(TosKeys.FS_TOS_BATCH_DELETE_RETRY_CODES,
+            TosKeys.FS_TOS_BATCH_DELETE_RETRY_CODES_DEFAULT));
+    batchDeleteRetryInterval = conf.getLong(TosKeys.FS_TOS_BATCH_DELETE_RETRY_INTERVAL,
+        TosKeys.FS_TOS_BATCH_DELETE_RETRY_INTERVAL_DEFAULT);
+    maxDeleteObjectsCount = conf.getInt(TosKeys.FS_TOS_DELETE_OBJECTS_COUNT,
+        TosKeys.FS_TOS_DELETE_OBJECTS_COUNT_DEFAULT);
+    listObjectsCount =
+        conf.getInt(TosKeys.FS_TOS_LIST_OBJECTS_COUNT, TosKeys.FS_TOS_LIST_OBJECTS_COUNT_DEFAULT);
+    maxInputStreamRetries = conf.getInt(TosKeys.FS_TOS_MAX_READ_OBJECT_RETRIES,
+        TosKeys.FS_TOS_MAX_READ_OBJECT_RETRIES_DEFAULT);
+    defaultAcl = TypeConverter.convertACLType(TosKeys.FS_TOS_ACL_DEFAULT);
+
+    String algorithm = conf.get(TosKeys.FS_TOS_CHECKSUM_ALGORITHM);
+    ChecksumType checksumType = ChecksumType.valueOf(
+        conf.get(TosKeys.FS_TOS_CHECKSUM_TYPE, TosKeys.FS_TOS_CHECKSUM_TYPE_DEFAULT).toUpperCase());
+    Preconditions.checkArgument(CHECKSUM_HEADER.containsKey(checksumType),
+        "Checksum type %s is not supported by TOS.", checksumType.name());
+    checksumInfo = new ChecksumInfo(algorithm, checksumType);
+
+    bucketInfo = getBucketInfo(bucket);
+  }
+
+  @Override
+  public String scheme() {
+    return TOS_SCHEME;
+  }
+
+  @Override
+  public Configuration conf() {
+    return conf;
+  }
+
+  @Override
+  public BucketInfo bucket() {
+    return bucketInfo;
+  }
+
+  private BucketInfo getBucketInfo(String bucket) {
+    try {
+      HeadBucketV2Output res =
+          client.headBucket(HeadBucketV2Input.builder().bucket(bucket).build());
+
+      // BUCKET_TYPE_FNS is the general-purpose (flat namespace) bucket, while
+      // BUCKET_TYPE_HNS is the directory (hierarchical namespace) bucket.
+      boolean directoryBucket = BucketType.BUCKET_TYPE_HNS.equals(res.getBucketType());
+
+      return new BucketInfo(bucket, directoryBucket);
+    } catch (TosException e) {
+      if (e.getStatusCode() == NOT_FOUND_CODE) {
+        return null;
+      }
+      throw new RuntimeException(e);
+    }
+  }
+
+  @VisibleForTesting
+  void setClient(DelegationClient client) {
+    this.client = client;
+  }
+
+  private void checkAvailableClient() {
+    Preconditions.checkState(client != null,
+        "Encountered uninitialized ObjectStorage, call initialize(..) please.");
+  }
+
+  @Override
+  public ObjectContent get(String key, long offset, long limit) {
+    checkAvailableClient();
+    Preconditions.checkArgument(offset >= 0, "offset is a negative number: %s", offset);
+
+    if (limit == 0) {
+      // Cannot simply return an empty stream when limit = 0: the requested object might not exist.
+      if (head(key) != null) {
+        return new ObjectContent(Constants.MAGIC_CHECKSUM, EMPTY_STREAM);
+      } else {
+        throw new RuntimeException(String.format("Object %s doesn't exist", key));
+      }
+    }
+
+    long end = limit < 0 ? -1 : offset + limit - 1;
+    GetObjectFactory factory = (k, startOff, endOff) -> getObject(key, startOff, endOff);
+    ChainTOSInputStream chainStream =
+        new ChainTOSInputStream(factory, key, offset, end, maxDrainBytes, maxInputStreamRetries);
+    return new ObjectContent(chainStream.checksum(), chainStream);
+  }
+
+  @Override
+  public Iterable<ObjectInfo> listDir(String key, boolean recursive) {
+    if (recursive) {
+      if (bucket().isDirectory()) {
+        // Directory buckets only support listing objects with delimiter = '/', so to list a
+        // directory recursively we have to walk the directories level by level.
+        return bfsListDir(key);
+      } else {
+        return listAll(key, key);
+      }
+    } else {
+      return innerListDir(key, key, -1);
+    }
+  }
+
+  private Iterable<ObjectInfo> bfsListDir(String key) {
+    return new LazyReload<>(() -> {
+      final Deque<String> dirQueue = new LinkedList<>();
+      AtomicReference<String> continueToken = new AtomicReference<>("");
+      AtomicReference<String> curDir = new AtomicReference<>(key);
+
+      return buf -> {
+        // No more objects once there is no directory left to list.
+        if (curDir.get() == null) {
+          return true;
+        }
+
+        ListObjectsType2Input request =
+            createListObjectsType2Input(curDir.get(), curDir.get(), listObjectsCount, "/",
+                continueToken.get());
+        ListObjectsType2Output response = client.listObjectsType2(request);
+
+        if (response.getContents() != null) {
+          for (ListedObjectV2 obj : response.getContents()) {
+            buf.add(new ObjectInfo(obj.getKey(), obj.getSize(), obj.getLastModified(),
+                parseChecksum(obj, checksumInfo)));
+          }
+        }
+
+        if (response.getCommonPrefixes() != null) {
+          for (ListedCommonPrefix prefix : response.getCommonPrefixes()) {
+            buf.add(new ObjectInfo(prefix.getPrefix(), 0, new Date(), Constants.MAGIC_CHECKSUM));
+            dirQueue.add(prefix.getPrefix());
+          }
+        }
+
+        if (response.isTruncated()) {
+          continueToken.set(response.getNextContinuationToken());
+        } else {
+          curDir.set(dirQueue.poll());
+          continueToken.set("");
+        }
+
+        return curDir.get() == null;
+      };
+    });
+  }
+
+  private Iterable<ObjectInfo> innerListDir(String key, String startAfter, int limit) {
+    return new LazyReload<>(() -> {
+      AtomicReference<String> continueToken = new AtomicReference<>("");
+      AtomicBoolean isTruncated = new AtomicBoolean(true);
+      AtomicInteger remaining = new AtomicInteger(limit < 0 ? Integer.MAX_VALUE : limit);
+
+      return buf -> {
+        // No more objects when isTruncated is false.
+        if (!isTruncated.get()) {
+          return true;
+        }
+
+        int remainingKeys = remaining.get();
+        int maxKeys = Math.min(listObjectsCount, remainingKeys);
+        ListObjectsType2Input request =
+            createListObjectsType2Input(key, startAfter, maxKeys, "/", continueToken.get());
+        ListObjectsType2Output response = client.listObjectsType2(request);
+
+        if (response.getContents() != null) {
+          for (ListedObjectV2 obj : response.getContents()) {
+            buf.add(new ObjectInfo(obj.getKey(), obj.getSize(), obj.getLastModified(),
+                parseChecksum(obj, checksumInfo)));
+          }
+        }
+
+        if (response.getCommonPrefixes() != null) {
+          for (ListedCommonPrefix prefix : response.getCommonPrefixes()) {
+            buf.add(new ObjectInfo(prefix.getPrefix(), 0, new Date(), Constants.MAGIC_CHECKSUM));
+          }
+        }
+
+        isTruncated.set(response.isTruncated());
+        remaining.compareAndSet(remainingKeys, remainingKeys - response.getKeyCount());
+        continueToken.set(response.getNextContinuationToken());
+
+        return !isTruncated.get();
+      };
+    });
+  }
+
+  @Override
+  public void deleteDir(String key, boolean recursive) {
+    checkAvailableClient();
+    if (recursive) {
+      if (conf.getBoolean(TosKeys.FS_TOS_RMR_SERVER_ENABLED,
+          TosKeys.FS_FS_TOS_RMR_SERVER_ENABLED_DEFAULT)) {
+        DeleteObjectInput request =
+            DeleteObjectInput.builder().bucket(bucket).recursive(true).key(key).build();
+        try {
+          // This is an experimental feature: the TOS SDK doesn't expose an atomic
+          // delete-dir capability yet.
+          Field f = DeleteObjectInput.class.getDeclaredField("recursiveByServer");
+          f.setAccessible(true);
+          f.setBoolean(request, true);
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+        client.deleteObject(request);
+      } else {
+        if (conf.getBoolean(TosKeys.FS_TOS_RMR_CLIENT_ENABLE,
+            TosKeys.FS_TOS_RMR_CLIENT_ENABLE_DEFAULT)) {
+          client.deleteObject(
+              DeleteObjectInput.builder().bucket(bucket).recursive(true).key(key).build());
+        } else {
+          recursiveDeleteDir(key);
+        }
+      }
+    } else {
+      delete(key);
+    }
+  }
+
+  @Override
+  public boolean isEmptyDir(String key) {
+    checkAvailableClient();
+    return !innerListDir(key, key, 1).iterator().hasNext();
+  }
+
+  public void recursiveDeleteDir(String key) {
+    for (ObjectInfo obj : innerListDir(key, key, -1)) {
+      if (obj.isDir()) {
+        recursiveDeleteDir(obj.key());
+      } else {
+        delete(obj.key());
+      }
+    }
+    delete(key);
+  }
+
+  interface GetObjectFactory {
+    /**
+     * Get object content for the given object key and range.
+     *
+     * @param key    The object key
+     * @param offset The start offset of object content
+     * @param end    The end offset of object content
+     * @return {@link GetObjectOutput}
+     */
+    GetObjectOutput create(String key, long offset, long end);
+  }
+
+  public GetObjectOutput getObject(String key, long offset, long end) {
+    checkAvailableClient();
+    Preconditions.checkArgument(offset >= 0, "offset is a negative number: %s", offset);
+
+    try {
+      GetObjectV2Input request = GetObjectV2Input.builder().bucket(bucket).key(key)
+          .options(ObjectMetaRequestOptions.builder().range(offset, end).build()).build();
+      GetObjectV2Output output = client.getObject(request);
+
+      byte[] checksum = parseChecksum(output.getRequestInfo().getHeader(), checksumInfo);
+      return new GetObjectOutput(output, checksum);
+    } catch (TosException e) {
+      if (e instanceof TosServerException) {
+        TosServerException tosException = (TosServerException) e;
+        if (tosException.getStatusCode() == INVALID_RANGE_CODE) {
+          ObjectInfo info = head(key);
+          // if the object is empty or the requested offset is equal to object size,
+          // return empty stream directly, otherwise, throw exception.
+          if (info.size() == 0 || offset == info.size()) {
+            return new GetObjectOutput(
+                new GetObjectV2Output(new GetObjectBasicOutput(), EMPTY_STREAM), info.checksum());
+          } else {
+            throw new RuntimeException(e);
+          }
+        }
+      }
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public byte[] put(String key, InputStreamProvider streamProvider, long contentLength) {
+    checkAvailableClient();
+    PutObjectOutput res = client.put(bucket, key, streamProvider, contentLength, defaultAcl);
+    return ObjectInfo.isDir(key) ?
+        Constants.MAGIC_CHECKSUM :
+        parseChecksum(res.getRequestInfo().getHeader(), checksumInfo);
+  }
+
+  @Override
+  public byte[] append(String key, InputStreamProvider streamProvider, long contentLength) {
+    if (bucketInfo.isDirectory()) {
+      return hnsAppend(key, streamProvider, contentLength);
+    } else {
+      return fnsAppend(key, streamProvider, contentLength);
+    }
+  }
+
+  private byte[] hnsAppend(String key, InputStreamProvider streamProvider, long contentLength) {
+    checkAvailableClient();
+
+    long offset = 0;
+    String preCrc64;
+
+    TosObjectInfo obj = innerHead(key);
+    if (obj == null) {
+      if (contentLength == 0) {
+        throw new NotAppendableException(String.format(
+            "%s is not appendable because append non-existed object with "
+                + "zero byte is not supported.", key));
+      }
+
+      // In HNS, appending a non-existent object is not allowed, so pre-create an empty object
+      // before performing appendObject.
+      PutObjectOutput res = client.put(bucket, key, () -> EMPTY_STREAM, 0, defaultAcl);
+      preCrc64 = res.getHashCrc64ecma();
+    } else {
+      if (contentLength == 0) {
+        return obj.checksum();
+      }
+      offset = obj.size();
+      preCrc64 = obj.crc64ecma();
+    }
+
+    AppendObjectOutput res =
+        client.appendObject(bucket, key, streamProvider, offset, contentLength, preCrc64,
+            defaultAcl);
+    return ObjectInfo.isDir(key) ? Constants.MAGIC_CHECKSUM :
+        parseChecksum(res.getRequestInfo().getHeader(), checksumInfo);
+  }
+
+  private byte[] fnsAppend(String key, InputStreamProvider streamProvider, long contentLength) {
+    checkAvailableClient();
+
+    TosObjectInfo obj = innerHead(key);
+    if (obj != null) {
+      if (!obj.appendable()) {
+        throw new NotAppendableException(String.format("%s is not appendable.", key));
+      }
+      if (contentLength == 0) {
+        return obj.checksum();
+      }
+    } else if (contentLength == 0) {
+      throw new NotAppendableException(String.format("%s is not appendable because appending"
+          + " a non-existent object with zero bytes is not supported.", key));
+    }
+
+    long offset = obj == null ? 0 : obj.size();
+    String preCrc64 = obj == null ? null : obj.crc64ecma();
+    AppendObjectOutput res;
+    try {
+      res = client.appendObject(bucket, key, streamProvider, offset, contentLength, preCrc64,
+          defaultAcl);
+    } catch (TosServerException e) {
+      if (e.getStatusCode() == 409 && APPEND_NOT_APPENDABLE.equals(e.getEc())) {
+        throw new NotAppendableException(String.format("%s is not appendable.", key));
+      }
+      throw e;
+    }
+
+    return ObjectInfo.isDir(key) ?
+        Constants.MAGIC_CHECKSUM :
+        parseChecksum(res.getRequestInfo().getHeader(), checksumInfo);
+  }
+
+  @Override
+  public void delete(String key) {
+    checkAvailableClient();
+    client.deleteObject(DeleteObjectInput.builder().bucket(bucket).key(key).build());
+  }
+
+  @Override
+  public List<String> batchDelete(List<String> keys) {
+    checkAvailableClient();
+    int totalKeyCnt = keys.size();
+
+    Preconditions.checkArgument(totalKeyCnt <= maxDeleteObjectsCount,
+        "The batch delete object count should <= %s", maxDeleteObjectsCount);
+
+
+    List<DeleteError> failedKeys = innerBatchDelete(keys);
+    for (int retry = 1; retry < batchDeleteMaxRetries && !failedKeys.isEmpty(); retry++) {
+      if (isBatchDeleteRetryable(failedKeys)) {
+        try {
+          Thread.sleep(batchDeleteRetryInterval);
+        } catch (InterruptedException e) {
+          throw new RuntimeException(e);
+        }
+
+        failedKeys = innerBatchDelete(deleteErrorKeys(failedKeys));
+      } else {
+        LOG.warn("{} of {} objects deleted failed, and cannot be retried, detail: {}",
+            failedKeys.size(),
+            totalKeyCnt,
+            Joiner.on(",\n").join(failedKeys));
+        break;
+      }
+    }
+
+    if (!failedKeys.isEmpty()) {
+      LOG.warn("{} of {} objects deleted failed after retry {} times.",
+          failedKeys.size(), totalKeyCnt, batchDeleteMaxRetries);
+    }
+
+    return deleteErrorKeys(failedKeys);
+  }
+
+  @Override
+  public void deleteAll(String prefix) {
+    if (bucket().isDirectory()) {
+      deleteDir(prefix, true);
+    } else {
+      Iterable<ObjectInfo> objects = listAll(prefix, "");
+      ObjectUtils.deleteAllObjects(this, objects,
+          conf.getInt(ConfKeys.FS_BATCH_DELETE_SIZE.key(scheme()),
+              ConfKeys.FS_BATCH_DELETE_SIZE_DEFAULT));
+    }
+  }
+
+  private List<DeleteError> innerBatchDelete(List<String> keys) {
+    List<ObjectTobeDeleted> toBeDeleted = Lists.newArrayList();
+    for (String key : keys) {
+      toBeDeleted.add(ObjectTobeDeleted.builder().key(key).build());
+    }
+
+    DeleteMultiObjectsV2Output deletedRes = client.deleteMultiObjects(DeleteMultiObjectsV2Input
+        .builder()
+        .bucket(bucket)
+        .objects(toBeDeleted)
+        .build());
+
+    return deletedRes.getErrors() == null ? Lists.newArrayList() : deletedRes.getErrors();
+  }
+
+  private boolean isBatchDeleteRetryable(List<DeleteError> failedKeys) {
+    for (DeleteError errorKey : failedKeys) {
+      if (batchDeleteRetryCodes.contains(errorKey.getCode())) {
+        LOG.warn("Failed to delete object, which might be deleted succeed after retry, detail: {}",
+            errorKey);
+      } else {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private static List<String> deleteErrorKeys(List<DeleteError> errorKeys) {
+    List<String> keys = Lists.newArrayList();
+    for (DeleteError error : errorKeys) {
+      keys.add(error.getKey());
+    }
+    return keys;
+  }
+
+  @Override
+  public ObjectInfo head(String key) {
+    return innerHead(key);
+  }
+
+  private TosObjectInfo innerHead(String key) {
+    checkAvailableClient();
+    try {
+      HeadObjectV2Input request = HeadObjectV2Input.builder().bucket(bucket).key(key).build();
+      HeadObjectV2Output response = client.headObject(request);
+
+      // Use crc64ecma/crc32c as the checksum for comparing object contents; don't use the eTag,
+      // since PUT & MPU operations produce different eTags for the same content.
+      Map<String, String> headers = response.getRequestInfo().getHeader();
+      byte[] checksum = parseChecksum(headers, checksumInfo);
+      boolean isDir = bucket().isDirectory() ? response.isDirectory() : ObjectInfo.isDir(key);
+
+      return new TosObjectInfo(key, response.getContentLength(), response.getLastModifiedInDate(),
+          checksum, isDir,
+          appendable(headers), crc64ecma(headers));
+    } catch (TosException e) {
+      if (e.getStatusCode() == NOT_FOUND_CODE) {
+        return null;
+      }
+
+      if (e.getStatusCode() == PATH_CONFLICT_CODE) {
+        // In a directory bucket, if a directory 'a/b/' exists, both headObject('a/b') and
+        // headObject('a/b/') return the directory info, and the response key is 'a/b/'.
+        // But if a file 'a/b' exists, only headObject('a/b') returns the file info;
+        // headObject('a/b/') gets a 409 error.
+        throw new InvalidObjectKeyException(e);
+      }
+
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Iterable<ListObjectsResponse> list(ListObjectsRequest req) {
+    return new LazyReload<>(() ->
+    {
+      AtomicReference<String> continueToken = new AtomicReference<>("");
+      AtomicBoolean isTruncated = new AtomicBoolean(true);
+      AtomicInteger remaining =
+          new AtomicInteger(req.maxKeys() < 0 ? Integer.MAX_VALUE : req.maxKeys());
+
+      return buf -> {
+        // No more objects when isTruncated is false.
+        if (!isTruncated.get()) {
+          return true;
+        }
+
+        int remainingKeys = remaining.get();
+        int maxKeys = Math.min(listObjectsCount, remainingKeys);
+        ListObjectsType2Input request =
+            createListObjectsType2Input(req.prefix(), req.startAfter(), maxKeys, req.delimiter(),
+                continueToken.get());
+        ListObjectsType2Output response = client.listObjectsType2(request);
+        List<ObjectInfo> objects = listObjectsOutputToObjectInfos(response);
+        List<String> commonPrefixes = listObjectsOutputToCommonPrefixes(response);
+        buf.add(new ListObjectsResponse(objects, commonPrefixes));
+
+        if (maxKeys < listObjectsCount) {
+          isTruncated.set(false);
+        } else {
+          continueToken.set(response.getNextContinuationToken());
+          remaining.compareAndSet(remainingKeys, remainingKeys - response.getKeyCount());
+          if (remaining.get() == 0) {
+            isTruncated.set(false);
+          } else {
+            isTruncated.set(response.isTruncated());
+          }
+        }
+        return !isTruncated.get();
+      };
+    });
+  }
+
+  private List<String> listObjectsOutputToCommonPrefixes(ListObjectsType2Output listObjectsOutput) {
+    if (listObjectsOutput.getCommonPrefixes() == null) {
+      return Lists.newArrayList();
+    }
+
+    return listObjectsOutput.getCommonPrefixes()
+        .stream()
+        .map(ListedCommonPrefix::getPrefix)
+        .collect(Collectors.toList());
+  }
+
+  private List<ObjectInfo> listObjectsOutputToObjectInfos(
+      ListObjectsType2Output listObjectsOutput) {
+    if (listObjectsOutput.getContents() == null) {
+      return Lists.newArrayList();
+    }
+    return listObjectsOutput.getContents().stream()
+        .map(obj -> new ObjectInfo(
+            obj.getKey(),
+            obj.getSize(),
+            obj.getLastModified(),
+            parseChecksum(obj, checksumInfo)))
+        .collect(Collectors.toList());
+  }
+
+  private ListObjectsType2Input createListObjectsType2Input(
+      String prefix, String startAfter, int maxKeys, String delimiter, String continueToken) {
+    ListObjectsType2Input.ListObjectsType2InputBuilder builder = ListObjectsType2Input.builder()
+        .bucket(bucket)
+        .prefix(prefix)
+        .startAfter(startAfter)
+        .delimiter(delimiter)
+        .maxKeys(maxKeys);
+
+    if (!Strings.isNullOrEmpty(continueToken)) {
+      builder.continuationToken(continueToken);
+    }
+    return builder.build();
+  }
+
+  @Override
+  public MultipartUpload createMultipartUpload(String key) {
+    checkAvailableClient();
+    CreateMultipartUploadInput input = CreateMultipartUploadInput.builder()
+        .bucket(bucket)
+        .key(key)
+        .options(createMetaOptions())
+        .build();
+    CreateMultipartUploadOutput output = client.createMultipartUpload(input);
+    return new MultipartUpload(output.getKey(), output.getUploadID(), MIN_PART_SIZE,
+        MAX_PART_COUNT);
+  }
+
+  @Override
+  public Part uploadPart(
+      String key, String uploadId, int partNum,
+      InputStreamProvider streamProvider, long contentLength) {
+    checkAvailableClient();
+    return client.uploadPart(bucket, key, uploadId, partNum, streamProvider, contentLength,
+        defaultAcl);
+  }
+
+  @Override
+  public byte[] completeUpload(String key, String uploadId, List<Part> uploadParts) {
+    checkAvailableClient();
+    List<UploadedPartV2> uploadedPartsV2 = uploadParts.stream().map(
+        part -> UploadedPartV2.builder()
+            .etag(part.eTag())
+            .partNumber(part.num())
+            .size(part.size())
+            .build()
+    ).collect(Collectors.toList());
+    CompleteMultipartUploadV2Input input = CompleteMultipartUploadV2Input.builder()
+        .bucket(bucket)
+        .key(key)
+        .uploadID(uploadId)
+        .uploadedParts(uploadedPartsV2)
+        .build();
+    return parseChecksum(client.completeMultipartUpload(input).getRequestInfo().getHeader(),
+        checksumInfo);
+  }
+
+  @Override
+  public void abortMultipartUpload(String key, String uploadId) {
+    checkAvailableClient();
+    AbortMultipartUploadInput input = AbortMultipartUploadInput.builder()
+        .bucket(bucket)
+        .key(key)
+        .uploadID(uploadId)
+        .build();
+    client.abortMultipartUpload(input);
+  }
+
+  @Override
+  public Iterable<MultipartUpload> listUploads(String prefix) {
+    checkAvailableClient();
+    return new LazyReload<>(() -> {
+      AtomicReference<String> nextKeyMarker = new AtomicReference<>("");
+      AtomicReference<String> nextUploadIdMarker = new AtomicReference<>("");
+      AtomicBoolean isTruncated = new AtomicBoolean(true);
+      return buf -> {
+        // No more uploads when isTruncated is false.
+        if (!isTruncated.get()) {
+          return true;
+        }
+        ListMultipartUploadsV2Input input = ListMultipartUploadsV2Input.builder()
+            .bucket(bucket)
+            .prefix(prefix)
+            .keyMarker(nextKeyMarker.get())
+            .uploadIDMarker(nextUploadIdMarker.get())
+            .build();
+        ListMultipartUploadsV2Output output = client.listMultipartUploads(input);
+        isTruncated.set(output.isTruncated());
+        if (output.getUploads() != null) {
+          // Fill the reloaded uploads into buffer.
+          for (ListedUpload upload : output.getUploads()) {
+            buf.add(new MultipartUpload(upload.getKey(), upload.getUploadID(),
+                ObjectConstants.MIN_PART_SIZE, ObjectConstants.MAX_PART_COUNT));
+          }
+          LOG.info("Retrieve {} uploads with prefix: {}, marker: {}",
+              output.getUploads().size(), nextKeyMarker.get(), nextUploadIdMarker.get());
+        }
+        // Refresh the nextKeyMarker and nextUploadMarker for the next reload.
+        nextKeyMarker.set(output.getNextKeyMarker());
+        nextUploadIdMarker.set(output.getNextUploadIdMarker());
+
+        return !isTruncated.get();
+      };
+    });
+  }
+
+  @Override
+  public Part uploadPartCopy(
+      String srcKey, String dstKey, String uploadId, int partNum, long copySourceRangeStart,
+      long copySourceRangeEnd) {
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(srcKey), "Source key should not be empty.");
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(dstKey), "Dest key should not be empty.");
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(uploadId), "Upload ID should not be empty.");
+    Preconditions.checkArgument(copySourceRangeStart >= 0, "CopySourceRangeStart must be >= 0.");
+    Preconditions.checkArgument(copySourceRangeEnd >= 0, "CopySourceRangeEnd must be >= 0.");
+    Preconditions.checkArgument(copySourceRangeEnd >= copySourceRangeStart,
+        "CopySourceRangeEnd must be >= CopySourceRangeStart.");
+    checkAvailableClient();
+    UploadPartCopyV2Input input = UploadPartCopyV2Input.builder()
+        .bucket(bucket)
+        .key(dstKey)
+        .uploadID(uploadId)
+        .sourceBucket(bucket)
+        .sourceKey(srcKey)
+        .partNumber(partNum)
+        .copySourceRange(copySourceRangeStart, copySourceRangeEnd)
+        .options(createMetaOptions())
+        .build();
+    UploadPartCopyV2Output output = client.uploadPartCopy(input);
+    return new Part(output.getPartNumber(), copySourceRangeEnd - copySourceRangeStart + 1,
+        output.getEtag());
+  }
+
+  @Override
+  public void copy(String srcKey, String dstKey) {
+    checkAvailableClient();
+    CopyObjectV2Input input = CopyObjectV2Input.builder()
+        .bucket(bucket)
+        .key(dstKey)
+        .srcBucket(bucket)
+        .srcKey(srcKey)
+        .options(createMetaOptions())
+        .build();
+    client.copyObject(input);
+  }
+
+  private ObjectMetaRequestOptions createMetaOptions() {
+    return new ObjectMetaRequestOptions().setAclType(defaultAcl);
+  }
+
+  @Override
+  public void rename(String srcKey, String dstKey) {
+    checkAvailableClient();
+    Preconditions.checkArgument(!Objects.equals(srcKey, dstKey),
+        "Cannot rename to the same object");
+
+    RenameObjectInput request = RenameObjectInput.builder()
+        .bucket(bucket)
+        .key(srcKey)
+        .newKey(dstKey)
+        .build();
+    client.renameObject(request);
+  }
+
+  // TOS allows up to 10 tags per object, the same limit as AWS S3.
+  @Override
+  public void putTags(String key, Map<String, String> newTags) {
+    checkAvailableClient();
+    List<Tag> tags = newTags.entrySet().stream()
+        .map(e -> new Tag().setKey(e.getKey()).setValue(e.getValue()))
+        .collect(Collectors.toList());
+
+    if (!tags.isEmpty()) {
+      client.putObjectTagging(createPutTagInput(bucket, key, tags));
+    } else {
+      client.deleteObjectTagging(createDeleteTagInput(bucket, key));
+    }
+  }
+
+  @Override
+  public Map<String, String> getTags(String key) {
+    Map<String, String> result = new HashMap<>();
+    for (Tag tag : getObjectTaggingList(key)) {
+      result.put(tag.getKey(), tag.getValue());
+    }
+    return result;
+  }
+
+  private List<Tag> getObjectTaggingList(String key) {
+    checkAvailableClient();
+
+    GetObjectTaggingInput input = GetObjectTaggingInput.builder()
+        .bucket(bucket)
+        .key(key)
+        .build();
+    GetObjectTaggingOutput output = client.getObjectTagging(input);
+
+    TagSet tagSet = output.getTagSet();
+    if (tagSet == null || tagSet.getTags() == null) {
+      return new ArrayList<>();
+    }
+    return tagSet.getTags();
+  }
+
+  private static PutObjectTaggingInput createPutTagInput(String bucket, String key,
+      List<Tag> tags) {
+    return PutObjectTaggingInput.builder()
+        .bucket(bucket)
+        .key(key)
+        .tagSet(TagSet.builder().tags(tags).build())
+        .build();
+  }
+
+  private static DeleteObjectTaggingInput createDeleteTagInput(String bucket, String key) {
+    return DeleteObjectTaggingInput.builder()
+        .bucket(bucket)
+        .key(key)
+        .build();
+  }
+
+  /**
+   * Implement Hadoop FileSystem.getFileStatus semantics through
+   * {@link TOSV2#getFileStatus(GetFileStatusInput)}. <br>
+   *
+   * The detailed behaviors are as follows:
+   * <ul>
+   *   <li>Assume object 'a/b' exists in TOS: getFileStatus("a/b") succeeds and returns
+   *   object('a/b'), while getFileStatus("a/b/") gets 404.</li>
+   *   <li>Assume object 'a/b/' exists in TOS: both getFileStatus("a/b") and getFileStatus("a/b/")
+   *   succeed and return object('a/b/').</li>
+   *   <li>Assume object 'a/b/c' exists in TOS: both getFileStatus("a/b") and getFileStatus("a/b/")
+   *   succeed and return object('a/b/').</li>
+   * </ul>
+   * <p>
+   * And the following is the logic of {@link TOSV2#getFileStatus(GetFileStatusInput)}: <br>
+   * Step 1: Head the specified key, if the head operation is successful, the response is filled
+   * with the actual object. <br>
+   * Step 2: Append the suffix '/' to the key and perform a list operation. If the list is
+   * successful, the response is filled with the <strong>first object from the listing results
+   * </strong>; if there are no objects, return 404. <br>
+   *
+   * @param key the object key.
+   * @return the object info, or null if the object doesn't exist.
+   */
+  private ObjectInfo getFileStatus(String key) {
+    checkAvailableClient();
+    Preconditions.checkArgument(!Strings.isNullOrEmpty(key), "key should not be empty.");
+
+    GetFileStatusInput input = GetFileStatusInput.builder()
+        .bucket(bucket)
+        .key(key)
+        .build();
+    try {
+      GetFileStatusOutput output = client.getFileStatus(input);
+      if (key.equals(output.getKey()) && !ObjectInfo.isDir(output.getKey())) {
+        return new ObjectInfo(key, output.getSize(), output.getLastModifiedInDate(),
+            parseChecksum(output, checksumInfo));
+      } else {
+        String dirKey = ObjectInfo.isDir(key) ? key : key + '/';
+
+        // If only the prefix exists but the dir object key doesn't, use the current date as
+        // the modified date.
+        Date lastModifiedInDate =
+            dirKey.equals(output.getKey()) ? output.getLastModifiedInDate() : new Date();
+        return new ObjectInfo(dirKey, 0, lastModifiedInDate, Constants.MAGIC_CHECKSUM, true);
+      }
+    } catch (TosException e) {
+      // the specified object does not exist.
+      if (e.getStatusCode() == NOT_FOUND_CODE) {
+        return null;
+      }
+
+      if (e.getStatusCode() == PATH_CONFLICT_CODE) {
+        throw new InvalidObjectKeyException(e);
+      }
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public ObjectInfo objectStatus(String key) {
+    if (bucket().isDirectory()) {
+      return head(key);
+    } else if (conf.getBoolean(TosKeys.FS_TOS_GET_FILE_STATUS_ENABLED,
+        TosKeys.FS_TOS_GET_FILE_STATUS_ENABLED_DEFAULT)) {
+      return getFileStatus(key);
+    } else {
+      ObjectInfo obj = head(key);
+      if (obj == null && !ObjectInfo.isDir(key)) {
+        key = key + '/';
+        obj = head(key);
+      }
+
+      if (obj == null) {
+        Iterable<ObjectInfo> objs = list(key, null, 1);
+        if (objs.iterator().hasNext()) {
+          obj = new ObjectInfo(key, 0, new Date(0), Constants.MAGIC_CHECKSUM, true);
+        }
+      }
+
+      return obj;
+    }
+  }
+
+  @Override
+  public ChecksumInfo checksumInfo() {
+    return checksumInfo;
+  }
+
+  @Override
+  public void close() throws IOException {
+    client.close();
+  }
 }
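Note: to make the range arithmetic in get(key, offset, limit) concrete, a limit of N bytes at
offset O becomes the inclusive range [O, O + N - 1], and limit < 0 reads to the end of the
object. A hedged caller sketch; readRange is hypothetical and assumes ObjectContent exposes its
wrapped stream via a stream() accessor:

    // Read a byte range through the retrying ChainTOSInputStream.
    static byte[] readRange(ObjectStorage storage, String key, long offset, long limit)
        throws IOException {
      ObjectContent content = storage.get(key, offset, limit);
      try (InputStream in = content.stream(); // assumed accessor name
           ByteArrayOutputStream buf = new ByteArrayOutputStream()) {
        byte[] chunk = new byte[8192];
        int n;
        while ((n = in.read(chunk)) != -1) {
          buf.write(chunk, 0, n);
        }
        return buf.toByteArray();
      }
    }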

+ 57 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TOSErrorCodes.java

@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.tos;
+
+import org.apache.hadoop.util.Lists;
+
+import java.util.List;
+import java.util.StringJoiner;
+
+public class TOSErrorCodes {
+  private TOSErrorCodes() {
+  }
+
+  // The 409 error codes of HNS
+  public static final String DELETE_NON_EMPTY_DIR = "0026-00000013";
+  public static final String LOCATED_UNDER_A_FILE = "0026-00000020";
+  public static final String COPY_BETWEEN_DIR_AND_FILE = "0026-00000021";
+  public static final String PATH_LOCK_CONFLICT = "0026-00000022";
+  public static final String RENAME_TO_AN_EXISTED_DIR = "0026-00000025";
+  public static final String RENAME_TO_SUB_DIR = "0026-00000026";
+  public static final String RENAME_BETWEEN_DIR_AND_FILE = "0026-00000027";
+
+  // The 409 error codes shared by HNS and FNS.
+  public static final String APPEND_OFFSET_NOT_MATCHED = "0017-00000208";
+  public static final String APPEND_NOT_APPENDABLE = "0017-00000209";
+
+  // The errors below cannot be resolved by retrying the request (PATH_LOCK_CONFLICT is the
+  // only exception, which is why it is absent from this list), so they should fail fast.
+  public static final String FAST_FAILURE_CONFLICT_ERROR_CODES = new StringJoiner(",")
+      .add(DELETE_NON_EMPTY_DIR)
+      .add(LOCATED_UNDER_A_FILE)
+      .add(COPY_BETWEEN_DIR_AND_FILE)
+      .add(RENAME_TO_AN_EXISTED_DIR)
+      .add(RENAME_TO_SUB_DIR)
+      .add(RENAME_BETWEEN_DIR_AND_FILE)
+      .add(APPEND_OFFSET_NOT_MATCHED)
+      .add(APPEND_NOT_APPENDABLE)
+      .toString();
+}
+
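Note: FAST_FAILURE_CONFLICT_ERROR_CODES is kept as a single comma-joined string so that it can
act as a configuration default (presumably the value behind
TosKeys.FS_TOS_FAST_FAILURE_409_ERROR_CODES_DEFAULT); DelegationClientBuilder splits it back
into the list consulted by isRetryableTosConflictException. A quick illustration of the round
trip, assuming java.util.Arrays and java.util.List are imported:

    List<String> nonRetryable409 = Arrays.asList(
        TOSErrorCodes.FAST_FAILURE_CONFLICT_ERROR_CODES.split(","));

    // PATH_LOCK_CONFLICT is deliberately absent from the list, so a 409 carrying it stays
    // retryable, while e.g. DELETE_NON_EMPTY_DIR fails fast.
    boolean retryable = !nonRetryable409.contains(TOSErrorCodes.PATH_LOCK_CONFLICT); // true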

+ 119 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TOSInputStream.java

@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.tos;
+
+import org.apache.hadoop.fs.tosfs.util.CommonUtils;
+import org.apache.hadoop.thirdparty.com.google.common.io.ByteStreams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+class TOSInputStream extends InputStream {
+  private static final Logger LOG = LoggerFactory.getLogger(TOSInputStream.class);
+
+  private final GetObjectOutput output;
+  private final InputStream stream;
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  private long curOff;
+  private final long endOff; // range end offset (inclusive)
+  private final long maxDrainByteSize;
+
+  TOSInputStream(GetObjectOutput output, long startOff, long endOff, long maxDrainByteSize,
+      byte[] expectedChecksum) throws IOException {
+    this.output = output;
+    this.stream = output.verifiedContent(expectedChecksum);
+    this.curOff = startOff;
+    this.endOff = endOff;
+    this.maxDrainByteSize = maxDrainByteSize;
+  }
+
+  @Override
+  public int read() throws IOException {
+    int b = stream.read();
+    if (b >= 0) {
+      // Only advance the offset when a byte was actually read; -1 means EOF.
+      curOff += 1;
+    }
+    return b;
+  }
+
+  @Override
+  public int read(byte[] b, int off, int len) throws IOException {
+    int total = 0;
+    int n;
+    do {
+      n = stream.read(b, off + total, len - total);
+      if (n > 0) {
+        total += n;
+      }
+    } while (n > 0);
+
+    if (total == 0) {
+      return n;
+    } else {
+      curOff += total;
+      return total;
+    }
+  }
+
+  // Only visible for testing.
+  GetObjectOutput getObjectOutput() {
+    return output;
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closed.compareAndSet(false, true)) {
+      if (endOff >= 0) {
+        // The number of unread bytes is known: skip them if the gap <= the max drain size (so
+        // the socket connection can be reused); otherwise force-close the connection instead.
+        long gap = endOff - curOff + 1;
+        if (gap <= maxDrainByteSize) {
+          // The close will try to drain bytes internally.
+          stream.close();
+        } else {
+          CommonUtils.runQuietly(output::forceClose, false);
+        }
+      } else {
+        // The number of unread bytes is unknown: try to skip up to the max drain size to check
+        // whether the stream is already at EOF. If so, just close the stream so the socket
+        // connection can be reused; otherwise force-close the connection to save draining time.
+        try {
+          ByteStreams.skipFully(stream, maxDrainByteSize);
+        } catch (Exception e) {
+          if (e instanceof EOFException) {
+            LOG.debug("Stream is EOF now, just close the stream to reuse the socket connection.");
+            stream.close();
+          } else {
+            LOG.debug("Stream skipFully encountered exception, force close the socket connection.", e);
+            // Force close the socket connection.
+            CommonUtils.runQuietly(output::forceClose, false);
+          }
+          return;
+        }
+
+        // Force close the socket connection.
+        CommonUtils.runQuietly(output::forceClose, false);
+      }
+    }
+  }
+}
+

+ 117 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TOSUtils.java

@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.tos;
+
+import com.volcengine.tos.model.object.GetFileStatusOutput;
+import com.volcengine.tos.model.object.ListedObjectV2;
+import org.apache.hadoop.fs.tosfs.common.Bytes;
+import org.apache.hadoop.fs.tosfs.object.ChecksumInfo;
+import org.apache.hadoop.fs.tosfs.object.ChecksumType;
+import org.apache.hadoop.fs.tosfs.object.Constants;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
+
+import java.util.Map;
+
+public class TOSUtils {
+  private TOSUtils() {}
+
+  // Checksum header.
+  public static final Map<ChecksumType, String> CHECKSUM_HEADER = ImmutableMap.of(
+      ChecksumType.CRC32C, "x-tos-hash-crc32c",
+      ChecksumType.CRC64ECMA, "x-tos-hash-crc64ecma"
+  );
+
+  // Object type header. Object is either 'Appendable' or 'Normal'.
+  public static final String OBJECT_TYPE_KEY = "x-tos-object-type";
+  public static final String APPENDABLE_TYPE_VALUE = "Appendable";
+
+  // Checksum is magic checksum if the object doesn't support checksum type.
+  public static byte[] parseChecksum(Map<String, String> headers, ChecksumInfo checksumInfo) {
+    ChecksumType type = checksumInfo.checksumType();
+    String header = CHECKSUM_HEADER.get(type);
+    if (header == null) {
+      return Constants.MAGIC_CHECKSUM;
+    }
+
+    String checksumStr = headers.get(header);
+    if (checksumStr == null) {
+      return Constants.MAGIC_CHECKSUM;
+    }
+
+    return parseChecksumStringToBytes(checksumStr, type);
+  }
+
+  // Returns the magic checksum if the listed object doesn't carry the requested checksum type.
+  public static byte[] parseChecksum(ListedObjectV2 obj, ChecksumInfo checksumInfo) {
+    ChecksumType type = checksumInfo.checksumType();
+
+    String checksumStr;
+    if (type == ChecksumType.CRC32C) {
+      checksumStr = obj.getHashCrc32c();
+    } else if (type == ChecksumType.CRC64ECMA) {
+      checksumStr = obj.getHashCrc64ecma();
+    } else {
+      throw new IllegalArgumentException(String.format("Checksum type %s is not supported by TOS.", type.name()));
+    }
+
+    if (checksumStr == null) {
+      return Constants.MAGIC_CHECKSUM;
+    }
+
+    return parseChecksumStringToBytes(checksumStr, type);
+  }
+
+  // Returns the magic checksum if the file status doesn't carry the requested checksum type.
+  public static byte[] parseChecksum(GetFileStatusOutput obj, ChecksumInfo checksumInfo) {
+    ChecksumType type = checksumInfo.checksumType();
+
+    if (type == ChecksumType.CRC32C) {
+      return parseChecksumStringToBytes(obj.getCrc32(), type);
+    } else if (type == ChecksumType.CRC64ECMA) {
+      return parseChecksumStringToBytes(obj.getCrc64(), type);
+    } else {
+      throw new IllegalArgumentException(String.format("Checksum type %s is not supported by TOS.", type.name()));
+    }
+  }
+
+  public static byte[] parseChecksumStringToBytes(String checksum, ChecksumType type) {
+    if (checksum == null) {
+      return Constants.MAGIC_CHECKSUM;
+    }
+
+    switch (type) {
+    case CRC32C:
+    case CRC64ECMA:
+      return Bytes.toBytes(Long.parseUnsignedLong(checksum));
+    default:
+      throw new IllegalArgumentException(String.format("Checksum type %s is not supported by TOS.", type.name()));
+    }
+  }
+
+  public static String crc64ecma(Map<String, String> headers) {
+    String header = CHECKSUM_HEADER.get(ChecksumType.CRC64ECMA);
+    return headers.get(header);
+  }
+
+  public static boolean appendable(Map<String, String> headers) {
+    String value = headers.get(OBJECT_TYPE_KEY);
+    return APPENDABLE_TYPE_VALUE.equals(value);
+  }
+}
+
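For reference, a standalone sketch of the conversion parseChecksumStringToBytes performs: TOS reports CRC values as unsigned decimal strings, and the parsed long is encoded as 8 big-endian bytes (here via ByteBuffer, which is big-endian by default; the class name and header value are made up):

import java.nio.ByteBuffer;

// Standalone illustration (class name is invented) of what
// parseChecksumStringToBytes produces for CRC32C/CRC64ECMA values.
public class ChecksumParseDemo {
  public static void main(String[] args) {
    // Illustrative value of an "x-tos-hash-crc64ecma" response header: 2^64 - 1.
    String header = "18446744073709551615";
    long value = Long.parseUnsignedLong(header);                     // parses to -1L
    byte[] checksum = ByteBuffer.allocate(8).putLong(value).array(); // big-endian
    System.out.println(checksum.length);                             // prints 8
    System.out.printf("%02x%n", checksum[0]);                        // prints ff
  }
}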

+ 79 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TosObjectInfo.java

@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.tos;
+
+import org.apache.commons.codec.binary.Hex;
+import org.apache.hadoop.fs.tosfs.object.ObjectInfo;
+import org.apache.hadoop.thirdparty.com.google.common.base.MoreObjects;
+
+import java.util.Date;
+import java.util.Objects;
+
+public class TosObjectInfo extends ObjectInfo {
+  private final String crc64ecma;
+  private final boolean appendable;
+
+  public TosObjectInfo(String key, long size, Date mtime, byte[] checksum, boolean isDir,
+      boolean appendable, String crc64ecma) {
+    super(key, size, mtime, checksum, isDir);
+    this.crc64ecma = crc64ecma;
+    this.appendable = appendable;
+  }
+
+  public String crc64ecma() {
+    return crc64ecma;
+  }
+
+  public boolean appendable() {
+    return appendable;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (!super.equals(o)) {
+      return false;
+    }
+
+    if (!(o instanceof TosObjectInfo)) {
+      return false;
+    }
+
+    TosObjectInfo that = (TosObjectInfo) o;
+    return appendable == that.appendable && Objects.equals(crc64ecma, that.crc64ecma);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(super.hashCode(), appendable, crc64ecma);
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("key", key())
+        .add("size", size())
+        .add("mtime", mtime())
+        .add("checksum", Hex.encodeHexString(checksum()))
+        .add("isDir", isDir())
+        .add("appendable", appendable)
+        .add("crc64ecma", crc64ecma)
+        .toString();
+  }
+}
+
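A short, hypothetical usage sketch of the class above; every value is invented, and only the constructor signature and accessors come from the code:

import java.util.Date;
import org.apache.hadoop.fs.tosfs.object.tos.TosObjectInfo;

public class TosObjectInfoDemo {
  public static void main(String[] args) {
    // All argument values are made up for illustration.
    TosObjectInfo info = new TosObjectInfo(
        "user/data/part-00000",        // key
        4096L,                         // size in bytes
        new Date(),                    // mtime
        new byte[]{(byte) 0xAB, 0x01}, // checksum bytes
        false,                         // isDir
        true,                          // appendable: "x-tos-object-type" was "Appendable"
        "123456789");                  // raw crc64ecma header value, if present
    System.out.println(info.appendable()); // true
    System.out.println(info);              // toString() includes the TOS-specific fields
  }
}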