Ver Fonte

Integration of TOS: TOS test cases.

1. Add test cases TestChainTOSInputStream, TestDelegationClientBuilder, TestTOSInputStream, TestTOSObjectStorage, TestTOSRetryPolicy.
2. Add common utils:  ThreadPools, Tasks.
lijinglun há 8 meses atrás
pai
commit
42c88327e1

+ 5 - 0
hadoop-cloud-storage-project/hadoop-tos/pom.xml

@@ -66,6 +66,11 @@
       <artifactId>junit</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-core</artifactId>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

+ 22 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/TosFileSystem.java

@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs;
+
+public class TosFileSystem {
+}

+ 110 - 3
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/Bytes.java

@@ -16,7 +16,10 @@
 
 package org.apache.hadoop.fs.tosfs.common;
 
-// TODO: Remove this class?
+import org.apache.hadoop.util.Preconditions;
+
+import java.nio.ByteBuffer;
+
 public class Bytes {
   private Bytes() {
   }
@@ -26,11 +29,11 @@ public class Bytes {
   // Encode basic Java types into big-endian binaries.
 
   public static byte[] toBytes(boolean b) {
-    return new byte[]{b ? (byte) -1 : (byte) 0};
+    return new byte[] { b ? (byte) -1 : (byte) 0 };
   }
 
   public static byte[] toBytes(byte b) {
-    return new byte[]{b};
+    return new byte[] { b };
   }
 
   public static byte[] toBytes(short val) {
@@ -59,4 +62,108 @@ public class Bytes {
     }
     return b;
   }
+
+  // Decode big-endian binaries into basic Java types.
+
+  public static boolean toBoolean(byte[] b) {
+    return toBoolean(b, 0, 1);
+  }
+
+  public static boolean toBoolean(byte[] b, int off) {
+    return toBoolean(b, off, 1);
+  }
+
+  public static boolean toBoolean(byte[] b, int off, int len) {
+    Preconditions.checkArgument(len == 1, "Invalid len: %s", len);
+    return b[off] != (byte) 0;
+  }
+
+  public static byte toByte(byte[] b) {
+    return b[0];
+  }
+
+  public static byte toByte(byte[] b, int off) {
+    return b[off];
+  }
+
+  public static short toShort(byte[] b) {
+    return toShort(b, 0, 2);
+  }
+
+  public static short toShort(byte[] b, int off) {
+    return toShort(b, off, 2);
+  }
+
+  public static short toShort(byte[] b, int off, int len) {
+    Preconditions.checkArgument(len == 2, "Invalid len: %s", len);
+    Preconditions.checkArgument(off >= 0 && off + len <= b.length,
+        "Invalid off: %s, len: %s, array size: %s", off, len, b.length);
+    short n = 0;
+    n = (short) ((n ^ b[off]) & 0xFF);
+    n = (short) (n << 8);
+    n ^= (short) (b[off + 1] & 0xFF);
+    return n;
+  }
+
+  public static int toInt(byte[] b) {
+    return toInt(b, 0, 4);
+  }
+
+  public static int toInt(byte[] b, int off) {
+    return toInt(b, off, 4);
+  }
+
+  public static int toInt(byte[] b, int off, int len) {
+    Preconditions.checkArgument(len == 4, "Invalid len: %s", len);
+    Preconditions.checkArgument(off >= 0 && off + len <= b.length,
+        "Invalid off: %s, len: %s, array size: %s", off, len, b.length);
+    int n = 0;
+    for (int i = off; i < (off + len); i++) {
+      n <<= 8;
+      n ^= b[i] & 0xFF;
+    }
+    return n;
+  }
+
+  public static int toInt(ByteBuffer b) {
+    Preconditions.checkArgument(4 <= b.remaining(),
+        "Invalid ByteBuffer which remaining must be >= 4, but is: %s", b.remaining());
+    int n = 0;
+    int off = b.position();
+    for (int i = off; i < (off + 4); i++) {
+      n <<= 8;
+      n ^= b.get(i) & 0xFF;
+    }
+    return n;
+  }
+
+  public static long toLong(byte[] b) {
+    return toLong(b, 0, 8);
+  }
+
+  public static long toLong(byte[] b, int off) {
+    return toLong(b, off, 8);
+  }
+
+  public static long toLong(byte[] b, int off, int len) {
+    Preconditions.checkArgument(len == 8, "Invalid len: %s", len);
+    Preconditions.checkArgument(off >= 0 && off + len <= b.length,
+        "Invalid off: %s, len: %s, array size: %s", off, len, b.length);
+    long l = 0;
+    for (int i = off; i < off + len; i++) {
+      l <<= 8;
+      l ^= b[i] & 0xFF;
+    }
+    return l;
+  }
+
+  public static byte[] toBytes(byte[] b, int off, int len) {
+    Preconditions.checkArgument(off >= 0, "off %s must be >=0", off);
+    Preconditions.checkArgument(len >= 0, "len %s must be >= 0", len);
+    Preconditions.checkArgument(off + len <= b.length, "off (%s) + len (%s) must be <= %s", off,
+        len, b.length);
+    byte[] data = new byte[len];
+    System.arraycopy(b, off, data, 0, len);
+    return data;
+  }
 }

+ 590 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/Tasks.java

@@ -0,0 +1,590 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.common;
+
+import org.apache.hadoop.util.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+/**
+ * Copied from Apache Iceberg, please see:
+ * <a href="https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/util/Tasks.java">
+ * Tasks</a>
+ */
+public class Tasks {
+
+  private static final Logger LOG = LoggerFactory.getLogger(Tasks.class);
+
+  private Tasks() {
+  }
+
+  public static class UnrecoverableException extends RuntimeException {
+    public UnrecoverableException(String message) {
+      super(message);
+    }
+
+    public UnrecoverableException(String message, Throwable cause) {
+      super(message, cause);
+    }
+
+    public UnrecoverableException(Throwable cause) {
+      super(cause);
+    }
+  }
+
+  public interface FailureTask<I, E extends Exception> {
+    void run(I item, Exception exception) throws E;
+  }
+
+  public interface Task<I, E extends Exception> {
+    void run(I item) throws E;
+  }
+
+  public static class Builder<I> {
+    private final Iterable<I> items;
+    private ExecutorService service = null;
+    private FailureTask<I, ?> onFailure = null;
+    private boolean stopOnFailure = false;
+    private boolean throwFailureWhenFinished = true;
+    private Task<I, ?> revertTask = null;
+    private boolean stopRevertsOnFailure = false;
+    private Task<I, ?> abortTask = null;
+    private boolean stopAbortsOnFailure = false;
+
+    // retry settings
+    private List<Class<? extends Exception>> stopRetryExceptions =
+        Lists.newArrayList(UnrecoverableException.class);
+    private List<Class<? extends Exception>> onlyRetryExceptions = null;
+    private Predicate<Exception> shouldRetryPredicate = null;
+    private int maxAttempts = 1; // not all operations can be retried
+    private long minSleepTimeMs = 1000; // 1 second
+    private long maxSleepTimeMs = 600000; // 10 minutes
+    private long maxDurationMs = 600000; // 10 minutes
+    private double scaleFactor = 2.0; // exponential
+
+    public Builder(Iterable<I> items) {
+      this.items = items;
+    }
+
+    public Builder<I> executeWith(ExecutorService svc) {
+      this.service = svc;
+      return this;
+    }
+
+    public Builder<I> onFailure(FailureTask<I, ?> task) {
+      this.onFailure = task;
+      return this;
+    }
+
+    public Builder<I> stopOnFailure() {
+      this.stopOnFailure = true;
+      return this;
+    }
+
+    public Builder<I> throwFailureWhenFinished() {
+      this.throwFailureWhenFinished = true;
+      return this;
+    }
+
+    public Builder<I> throwFailureWhenFinished(boolean throwWhenFinished) {
+      this.throwFailureWhenFinished = throwWhenFinished;
+      return this;
+    }
+
+    public Builder<I> suppressFailureWhenFinished() {
+      this.throwFailureWhenFinished = false;
+      return this;
+    }
+
+    public Builder<I> revertWith(Task<I, ?> task) {
+      this.revertTask = task;
+      return this;
+    }
+
+    public Builder<I> stopRevertsOnFailure() {
+      this.stopRevertsOnFailure = true;
+      return this;
+    }
+
+    public Builder<I> abortWith(Task<I, ?> task) {
+      this.abortTask = task;
+      return this;
+    }
+
+    public Builder<I> stopAbortsOnFailure() {
+      this.stopAbortsOnFailure = true;
+      return this;
+    }
+
+    @SafeVarargs public final Builder<I> stopRetryOn(Class<? extends Exception>... exceptions) {
+      stopRetryExceptions.addAll(Arrays.asList(exceptions));
+      return this;
+    }
+
+    public Builder<I> shouldRetryTest(Predicate<Exception> shouldRetry) {
+      this.shouldRetryPredicate = shouldRetry;
+      return this;
+    }
+
+    public Builder<I> noRetry() {
+      this.maxAttempts = 1;
+      return this;
+    }
+
+    public Builder<I> retry(int nTimes) {
+      this.maxAttempts = nTimes + 1;
+      return this;
+    }
+
+    public Builder<I> onlyRetryOn(Class<? extends Exception> exception) {
+      this.onlyRetryExceptions = Collections.singletonList(exception);
+      return this;
+    }
+
+    @SafeVarargs public final Builder<I> onlyRetryOn(Class<? extends Exception>... exceptions) {
+      this.onlyRetryExceptions = Lists.newArrayList(exceptions);
+      return this;
+    }
+
+    public Builder<I> exponentialBackoff(long backoffMinSleepTimeMs, long backoffMaxSleepTimeMs,
+        long backoffMaxRetryTimeMs, double backoffScaleFactor) {
+      this.minSleepTimeMs = backoffMinSleepTimeMs;
+      this.maxSleepTimeMs = backoffMaxSleepTimeMs;
+      this.maxDurationMs = backoffMaxRetryTimeMs;
+      this.scaleFactor = backoffScaleFactor;
+      return this;
+    }
+
+    public boolean run(Task<I, RuntimeException> task) {
+      return run(task, RuntimeException.class);
+    }
+
+    public <E extends Exception> boolean run(Task<I, E> task, Class<E> exceptionClass) throws E {
+      if (service != null) {
+        return runParallel(task, exceptionClass);
+      } else {
+        return runSingleThreaded(task, exceptionClass);
+      }
+    }
+
+    @SuppressWarnings("checkstyle:CyclomaticComplexity") private <E extends Exception> boolean runSingleThreaded(
+        Task<I, E> task, Class<E> exceptionClass) throws E {
+      List<I> succeeded = Lists.newArrayList();
+      List<Throwable> exceptions = Lists.newArrayList();
+
+      Iterator<I> iterator = items.iterator();
+      boolean threw = true;
+      try {
+        while (iterator.hasNext()) {
+          I item = iterator.next();
+          try {
+            runTaskWithRetry(task, item);
+            succeeded.add(item);
+          } catch (Exception e) {
+            exceptions.add(e);
+
+            if (onFailure != null) {
+              tryRunOnFailure(item, e);
+            }
+
+            if (stopOnFailure) {
+              break;
+            }
+          }
+        }
+
+        threw = false;
+
+      } finally {
+        // threw handles exceptions that were *not* caught by the catch block,
+        // and exceptions that were caught and possibly handled by onFailure
+        // are kept in exceptions.
+        if (threw || !exceptions.isEmpty()) {
+          if (revertTask != null) {
+            boolean failed = false;
+            for (I item : succeeded) {
+              try {
+                revertTask.run(item);
+              } catch (Exception e) {
+                failed = true;
+                LOG.error("Failed to revert task", e);
+                // keep going
+              }
+              if (stopRevertsOnFailure && failed) {
+                break;
+              }
+            }
+          }
+
+          if (abortTask != null) {
+            boolean failed = false;
+            while (iterator.hasNext()) {
+              try {
+                abortTask.run(iterator.next());
+              } catch (Exception e) {
+                failed = true;
+                LOG.error("Failed to abort task", e);
+                // keep going
+              }
+              if (stopAbortsOnFailure && failed) {
+                break;
+              }
+            }
+          }
+        }
+      }
+
+      if (throwFailureWhenFinished && !exceptions.isEmpty()) {
+        Tasks.throwOne(exceptions, exceptionClass);
+      } else if (throwFailureWhenFinished && threw) {
+        throw new RuntimeException("Task set failed with an uncaught throwable");
+      }
+
+      return !threw;
+    }
+
+    private void tryRunOnFailure(I item, Exception failure) {
+      try {
+        onFailure.run(item, failure);
+      } catch (Exception failException) {
+        failure.addSuppressed(failException);
+        LOG.error("Failed to clean up on failure", failException);
+        // keep going
+      }
+    }
+
+    @SuppressWarnings("checkstyle:CyclomaticComplexity") private <E extends Exception> boolean runParallel(
+        final Task<I, E> task, Class<E> exceptionClass) throws E {
+      final Queue<I> succeeded = new ConcurrentLinkedQueue<>();
+      final Queue<Throwable> exceptions = new ConcurrentLinkedQueue<>();
+      final AtomicBoolean taskFailed = new AtomicBoolean(false);
+      final AtomicBoolean abortFailed = new AtomicBoolean(false);
+      final AtomicBoolean revertFailed = new AtomicBoolean(false);
+
+      List<Future<?>> futures = Lists.newArrayList();
+
+      for (final I item : items) {
+        // submit a task for each item that will either run or abort the task
+        futures.add(service.submit(() -> {
+          if (!(stopOnFailure && taskFailed.get())) {
+            // run the task with retries
+            boolean threw = true;
+            try {
+              runTaskWithRetry(task, item);
+
+              succeeded.add(item);
+
+              threw = false;
+
+            } catch (Exception e) {
+              taskFailed.set(true);
+              exceptions.add(e);
+
+              if (onFailure != null) {
+                tryRunOnFailure(item, e);
+              }
+            } finally {
+              if (threw) {
+                taskFailed.set(true);
+              }
+            }
+
+          } else if (abortTask != null) {
+            // abort the task instead of running it
+            if (stopAbortsOnFailure && abortFailed.get()) {
+              return;
+            }
+
+            boolean failed = true;
+            try {
+              abortTask.run(item);
+              failed = false;
+            } catch (Exception e) {
+              LOG.error("Failed to abort task", e);
+              // swallow the exception
+            } finally {
+              if (failed) {
+                abortFailed.set(true);
+              }
+            }
+          }
+        }));
+      }
+
+      // let the above tasks complete (or abort)
+      exceptions.addAll(waitFor(futures));
+      futures.clear();
+
+      if (taskFailed.get() && revertTask != null) {
+        // at least one task failed, revert any that succeeded
+        for (final I item : succeeded) {
+          futures.add(service.submit(() -> {
+            if (stopRevertsOnFailure && revertFailed.get()) {
+              return;
+            }
+
+            boolean failed = true;
+            try {
+              revertTask.run(item);
+              failed = false;
+            } catch (Exception e) {
+              LOG.error("Failed to revert task", e);
+              // swallow the exception
+            } finally {
+              if (failed) {
+                revertFailed.set(true);
+              }
+            }
+          }));
+        }
+
+        // let the revert tasks complete
+        exceptions.addAll(waitFor(futures));
+      }
+
+      if (throwFailureWhenFinished && !exceptions.isEmpty()) {
+        Tasks.throwOne(exceptions, exceptionClass);
+      } else if (throwFailureWhenFinished && taskFailed.get()) {
+        throw new RuntimeException("Task set failed with an uncaught throwable");
+      }
+
+      return !taskFailed.get();
+    }
+
+    @SuppressWarnings("checkstyle:CyclomaticComplexity") private <E extends Exception> void runTaskWithRetry(
+        Task<I, E> task, I item) throws E {
+      long start = System.currentTimeMillis();
+      int attempt = 0;
+      while (true) {
+        attempt += 1;
+        try {
+          task.run(item);
+          break;
+
+        } catch (Exception e) {
+          long durationMs = System.currentTimeMillis() - start;
+          if (attempt >= maxAttempts || (durationMs > maxDurationMs && attempt > 1)) {
+            if (durationMs > maxDurationMs) {
+              LOG.info("Stopping retries after {} ms", durationMs);
+            }
+            throw e;
+          }
+
+          if (shouldRetryPredicate != null) {
+            if (!shouldRetryPredicate.test(e)) {
+              throw e;
+            }
+
+          } else if (onlyRetryExceptions != null) {
+            // if onlyRetryExceptions are present, then this retries if one is found
+            boolean matchedRetryException = false;
+            for (Class<? extends Exception> exClass : onlyRetryExceptions) {
+              if (exClass.isInstance(e)) {
+                matchedRetryException = true;
+                break;
+              }
+            }
+            if (!matchedRetryException) {
+              throw e;
+            }
+
+          } else {
+            // otherwise, always retry unless one of the stop exceptions is found
+            for (Class<? extends Exception> exClass : stopRetryExceptions) {
+              if (exClass.isInstance(e)) {
+                throw e;
+              }
+            }
+          }
+
+          int delayMs =
+              (int) Math.min(minSleepTimeMs * Math.pow(scaleFactor, attempt - 1), maxSleepTimeMs);
+          int jitter = ThreadLocalRandom.current().nextInt(Math.max(1, (int) (delayMs * 0.1)));
+
+          LOG.warn("Retrying task after failure: {}", e.getMessage(), e);
+
+          try {
+            TimeUnit.MILLISECONDS.sleep(delayMs + jitter);
+          } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(ie);
+          }
+        }
+      }
+    }
+  }
+
+  @SuppressWarnings("checkstyle:CyclomaticComplexity") private static Collection<Throwable> waitFor(
+      Collection<Future<?>> futures) {
+    while (true) {
+      int numFinished = 0;
+      for (Future<?> future : futures) {
+        if (future.isDone()) {
+          numFinished += 1;
+        }
+      }
+
+      if (numFinished == futures.size()) {
+        List<Throwable> uncaught = Lists.newArrayList();
+        // all of the futures are done, get any uncaught exceptions
+        for (Future<?> future : futures) {
+          try {
+            future.get();
+
+          } catch (InterruptedException e) {
+            LOG.warn("Interrupted while getting future results", e);
+            for (Throwable t : uncaught) {
+              e.addSuppressed(t);
+            }
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+
+          } catch (CancellationException e) {
+            // ignore cancellations
+
+          } catch (ExecutionException e) {
+            Throwable cause = e.getCause();
+            if (Error.class.isInstance(cause)) {
+              for (Throwable t : uncaught) {
+                cause.addSuppressed(t);
+              }
+              throw (Error) cause;
+            }
+
+            if (cause != null) {
+              uncaught.add(e);
+            }
+
+            LOG.warn("Task threw uncaught exception", cause);
+          }
+        }
+
+        return uncaught;
+
+      } else {
+        try {
+          Thread.sleep(10);
+        } catch (InterruptedException e) {
+          LOG.warn("Interrupted while waiting for tasks to finish", e);
+
+          for (Future<?> future : futures) {
+            future.cancel(true);
+          }
+          Thread.currentThread().interrupt();
+          throw new RuntimeException(e);
+        }
+      }
+    }
+  }
+
+  /**
+   * A range, [ 0, size )
+   */
+  private static class Range implements Iterable<Integer> {
+    private final int size;
+
+    Range(int size) {
+      this.size = size;
+    }
+
+    @Override
+    public Iterator<Integer> iterator() {
+      return new Iterator<Integer>() {
+        private int current = 0;
+
+        @Override
+        public boolean hasNext() {
+          return current < size;
+        }
+
+        @Override
+        public Integer next() {
+          int ret = current;
+          current += 1;
+          return ret;
+        }
+      };
+    }
+  }
+
+  public static Builder<Integer> range(int upTo) {
+    return new Builder<>(new Range(upTo));
+  }
+
+  public static <I> Builder<I> foreach(Iterator<I> items) {
+    return new Builder<>(() -> items);
+  }
+
+  public static <I> Builder<I> foreach(Iterable<I> items) {
+    return new Builder<>(items);
+  }
+
+  @SafeVarargs public static <I> Builder<I> foreach(I... items) {
+    return new Builder<>(Arrays.asList(items));
+  }
+
+  @SuppressWarnings("StreamToIterable") public static <I> Builder<I> foreach(Stream<I> items) {
+    return new Builder<>(items::iterator);
+  }
+
+  private static <E extends Exception> void throwOne(Collection<Throwable> exceptions,
+      Class<E> allowedException) throws E {
+    Iterator<Throwable> iter = exceptions.iterator();
+    Throwable exception = iter.next();
+    Class<? extends Throwable> exceptionClass = exception.getClass();
+
+    while (iter.hasNext()) {
+      Throwable other = iter.next();
+      if (!exceptionClass.isInstance(other)) {
+        exception.addSuppressed(other);
+      }
+    }
+
+    castAndThrow(exception, allowedException);
+  }
+
+  @SuppressWarnings("unchecked") public static <E extends Exception> void castAndThrow(
+      Throwable exception, Class<E> exceptionClass) throws E {
+    if (exception instanceof RuntimeException) {
+      throw (RuntimeException) exception;
+    } else if (exception instanceof Error) {
+      throw (Error) exception;
+    } else if (exceptionClass.isInstance(exception)) {
+      throw (E) exception;
+    }
+    throw new RuntimeException(exception);
+  }
+}

+ 142 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/ThreadPools.java

@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.common;
+
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Copied from Apache Iceberg, please see:
+ * <a href="https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/util/ThreadPools.java">
+ * ThreadPools</a>
+ */
+public class ThreadPools {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ThreadPools.class);
+
+  private ThreadPools() {
+  }
+
+  public static final String WORKER_THREAD_POOL_SIZE_PROP = "proton.worker.num-threads";
+
+  public static final int WORKER_THREAD_POOL_SIZE =
+      poolSize(Math.max(2, Runtime.getRuntime().availableProcessors()));
+
+  private static final ExecutorService WORKER_POOL = newWorkerPool("proton-default-worker-pool");
+
+  public static ExecutorService defaultWorkerPool() {
+    return WORKER_POOL;
+  }
+
+  public static ExecutorService newWorkerPool(String namePrefix) {
+    return newWorkerPool(namePrefix, WORKER_THREAD_POOL_SIZE);
+  }
+
+  public static ExecutorService newWorkerPool(String namePrefix, int poolSize) {
+    return Executors.newFixedThreadPool(poolSize, newDaemonThreadFactory(namePrefix));
+  }
+
+  public static ScheduledExecutorService newScheduleWorkerPool(String namePrefix, int poolSize) {
+    return Executors.newScheduledThreadPool(poolSize, newDaemonThreadFactory(namePrefix));
+  }
+
+  /**
+   * Helper routine to shutdown a {@link ExecutorService}. Will wait up to a
+   * certain timeout for the ExecutorService to gracefully shutdown. If the
+   * ExecutorService did not shutdown and there are still tasks unfinished after
+   * the timeout period, the ExecutorService will be notified to forcibly shut
+   * down. Another timeout period will be waited before giving up. So, at most,
+   * a shutdown will be allowed to wait up to twice the timeout value before
+   * giving up.
+   * <p>
+   * This method is copied from
+   * {@link org.apache.hadoop.util.concurrent.HadoopExecutors#shutdown(ExecutorService, Logger, long, TimeUnit)}.
+   *
+   * @param executorService ExecutorService to shutdown
+   * @param timeout         the maximum time to wait
+   * @param unit            the time unit of the timeout argument
+   */
+  public static void shutdown(ExecutorService executorService, long timeout, TimeUnit unit) {
+    if (executorService == null) {
+      return;
+    }
+
+    try {
+      executorService.shutdown();
+      LOG.debug("Gracefully shutting down executor service. Waiting max {} {}", timeout, unit);
+
+      if (!executorService.awaitTermination(timeout, unit)) {
+        LOG.debug(
+            "Executor service has not shutdown yet. Forcing. Will wait up to an additional {} {} for shutdown",
+            timeout, unit);
+        executorService.shutdownNow();
+      }
+
+      if (executorService.awaitTermination(timeout, unit)) {
+        LOG.debug("Succesfully shutdown executor service");
+      } else {
+        LOG.error("Unable to shutdown executor service after timeout {} {}", (2 * timeout), unit);
+      }
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted while attempting to shutdown", e);
+      executorService.shutdownNow();
+    } catch (Exception e) {
+      LOG.warn("Exception closing executor service {}", e.getMessage());
+      LOG.debug("Exception closing executor service", e);
+      throw e;
+    }
+  }
+
+  private static int poolSize(int defaultSize) {
+    String value = System.getProperty(WORKER_THREAD_POOL_SIZE_PROP);
+    if (value != null) {
+      try {
+        return Integer.parseUnsignedInt(value);
+      } catch (NumberFormatException e) {
+        // will return the default
+      }
+    }
+    return defaultSize;
+  }
+
+  public static ThreadFactory newDaemonThreadFactory(String namePrefix) {
+    return new ThreadFactoryBuilder().setDaemon(true).setNameFormat(namePrefix + "-%d")
+        .setUncaughtExceptionHandler(
+            (t, e) -> LOG.error("Thread {} encounter uncaught exception", t, e)).build();
+  }
+
+  public static Thread newDaemonThread(String name, Runnable runnable,
+      UncaughtExceptionHandler handler) {
+    Thread t = new Thread(runnable);
+    t.setName(name);
+    t.setDaemon(true);
+    if (handler != null) {
+      t.setUncaughtExceptionHandler(handler);
+    }
+    return t;
+  }
+}

+ 241 - 0
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestChainTOSInputStream.java

@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.tos;
+
+import com.volcengine.tos.model.object.GetObjectBasicOutput;
+import com.volcengine.tos.model.object.GetObjectV2Output;
+import org.apache.hadoop.fs.tosfs.common.Bytes;
+import org.apache.hadoop.fs.tosfs.object.Constants;
+import org.apache.hadoop.fs.tosfs.util.TestUtility;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+
+public class TestChainTOSInputStream {
+
+  private static final int DATA_SIZE = 1 << 20;
+  private static final byte[] DATA = TestUtility.rand(DATA_SIZE);
+
+  @Test
+  public void testRetryReadData() throws IOException {
+    int readLen = DATA_SIZE - 1;
+    int cutOff = readLen / 2;
+    try (ChainTOSInputStream stream = createTestChainTOSInputStream(DATA, 0, DATA_SIZE - 1, 1024,
+        cutOff)) {
+      // The read length is more than the cut-off position, and equal to data length,
+      // so the first stream will throw IOException, and fallback to the second stream.
+      byte[] data = new byte[readLen];
+      int n = stream.read(data);
+      Assert.assertEquals(readLen, n);
+      Assert.assertArrayEquals(Bytes.toBytes(DATA, 0, readLen), data);
+    }
+
+    try (ChainTOSInputStream stream = createTestChainTOSInputStream(DATA, 0, DATA_SIZE - 1, 1024,
+        cutOff)) {
+      // The read length is more than data length, so the first stream will throw IOException,
+      // and fallback to the second stream.
+      byte[] data = new byte[readLen + 2];
+      int n = stream.read(data);
+      Assert.assertEquals(readLen, n);
+      Assert.assertArrayEquals(Bytes.toBytes(DATA, 0, readLen), Bytes.toBytes(data, 0, n));
+    }
+
+    readLen = DATA_SIZE / 3;
+    cutOff = DATA_SIZE / 2;
+    try (ChainTOSInputStream stream = createTestChainTOSInputStream(DATA, 0, DATA_SIZE, 1024,
+        cutOff)) {
+      for (int i = 0; i <= 3; i++) {
+        // The cut-off position is between (readLen, 2 * readLen), so the data of first read come from the first stream,
+        // and then the second read will meet IOException, and fallback to the second stream.
+        byte[] data = new byte[readLen];
+        int n = stream.read(data);
+
+        int off = i * readLen;
+        int len = Math.min(readLen, DATA_SIZE - off);
+
+        Assert.assertEquals(len, n);
+        Assert.assertArrayEquals(Bytes.toBytes(DATA, off, len), Bytes.toBytes(data, 0, len));
+      }
+    }
+
+    int smallDataSize = 1 << 10;
+    cutOff = smallDataSize / 2;
+    byte[] smallData = TestUtility.rand(1 << 10);
+    try (ChainTOSInputStream stream = createTestChainTOSInputStream(smallData, 0, smallDataSize,
+        1024, cutOff)) {
+      for (int i = 0; i < smallDataSize; i++) {
+        // The cut-off position is 512, the 512th read operation will meet IOException,
+        // and then fallback to the second stream.
+        int read = stream.read();
+        Assert.assertEquals(smallData[i] & 0xFF, read);
+      }
+    }
+  }
+
+  @Test
+  public void testSkipAndRead() throws IOException {
+    int cutOff = (DATA_SIZE - 1) / 2;
+    try (ChainTOSInputStream stream = createTestChainTOSInputStream(DATA, 0, DATA_SIZE - 1, 1024,
+        cutOff)) {
+      // The skip pos is equal to cut-off pos, once skip finished, the first read operation will meet IOException,
+      // and the fallback to the second stream.
+      int readPos = (DATA_SIZE - 1) / 2;
+      stream.skip(readPos);
+
+      int readLen = 1024;
+      byte[] data = new byte[readLen];
+      int n = stream.read(data);
+      Assert.assertEquals(readLen, n);
+      Assert.assertArrayEquals(Bytes.toBytes(DATA, readPos, readLen), data);
+    }
+
+    try (ChainTOSInputStream stream = createTestChainTOSInputStream(DATA, 0, DATA_SIZE - 1, 1024,
+        cutOff)) {
+      // The skip pos is more than cut-off pos, the skip operation will throw IOException,
+      // and the fallback to the second stream and skip(readPos) again
+      int readPos = cutOff + 1024;
+      stream.skip(readPos);
+
+      int readLen = 1024;
+      byte[] data = new byte[readLen];
+      int n = stream.read(data);
+      Assert.assertEquals(readLen, n);
+      Assert.assertArrayEquals(Bytes.toBytes(DATA, readPos, readLen), data);
+    }
+
+    try (ChainTOSInputStream stream = createTestChainTOSInputStream(DATA, 0, DATA_SIZE - 1, 1024,
+        cutOff)) {
+      // The skip pos = cut-off pos - 1025, the skip operation will succeed on the first stream,
+      // the 1024 bytes read operation also succeed on the first stream,
+      // but the next 1024 bytes read operation will fail on the first stream, and fallback to the second stream
+      int readPos = cutOff - 1024 - 1;
+      stream.skip(readPos);
+
+      int readLen = 1024;
+      byte[] data = new byte[readLen];
+      int n = stream.read(data);
+      Assert.assertEquals(readLen, n);
+      Assert.assertArrayEquals(Bytes.toBytes(DATA, readPos, readLen), data);
+
+      n = stream.read(data);
+      Assert.assertEquals(readLen, n);
+      Assert.assertArrayEquals(Bytes.toBytes(DATA, readPos + 1024, readLen), data);
+    }
+
+    try (ChainTOSInputStream stream = createTestChainTOSInputStream(DATA, 0, DATA_SIZE - 1, 1024,
+        cutOff)) {
+      // 1. Skip 1024 bytes and then read 1024 bytes from the first stream.
+      // 2. And then skip cut-off - 512 bytes, the target off = 1024 + 1024 + cut-off - 512,
+      // which is bigger than cut-off pos, so the second skip operation will fail,
+      // and then fallback to the second stream.
+      // 3. Read 1024 bytes
+      int readPos = 1024;
+      stream.skip(readPos);
+
+      int readLen = 1024;
+      byte[] data = new byte[readLen];
+      int n = stream.read(data);
+      Assert.assertEquals(readLen, n);
+      Assert.assertArrayEquals(Bytes.toBytes(DATA, readPos, readLen), data);
+
+      int skipPos = cutOff - 512;
+      stream.skip(skipPos);
+
+      n = stream.read(data);
+      Assert.assertEquals(readLen, n);
+      int targetOff = readPos + 1024 + skipPos;
+      Assert.assertArrayEquals(Bytes.toBytes(DATA, targetOff, readLen), data);
+    }
+  }
+
+  /**
+   * The ChainTOSInputStream contains two stream created by TestObjectFactory.
+   * Once the read pos of first stream is more than cutPos, the stream will throw IOException with
+   * unexpect end of stream error msg, but the second stream will contain the remaining data.
+   */
+  private ChainTOSInputStream createTestChainTOSInputStream(byte[] data, long startOff, long endOff,
+      long maxDrainSize, long cutPos) {
+    String key = "dummy-key";
+    TOS.GetObjectFactory factory = new TestObjectFactory(data, Arrays.asList(cutPos, -1L));
+    return new ChainTOSInputStream(factory, key, startOff, endOff, maxDrainSize, 1);
+  }
+
+  private static class TestObjectFactory implements TOS.GetObjectFactory {
+    private final byte[] data;
+    private final List<Long> streamBreakPoses;
+    private int streamIndex = 0;
+
+    TestObjectFactory(byte[] data, List<Long> streamBreakPoses) {
+      this.data = data;
+      this.streamBreakPoses = streamBreakPoses;
+    }
+
+    @Override
+    public GetObjectOutput create(String key, long offset, long end) {
+      long len = Math.min(end, data.length) - offset;
+      ByteArrayInputStream data = new ByteArrayInputStream(this.data, (int) offset, (int) len);
+
+      if (streamIndex < streamBreakPoses.size()) {
+        return new GetObjectOutput(new GetObjectV2Output(new GetObjectBasicOutput(),
+            new UnExpectedEndOfStream(data, streamBreakPoses.get(streamIndex++))),
+            Constants.MAGIC_CHECKSUM);
+      } else {
+        throw new RuntimeException("No more output");
+      }
+    }
+  }
+
+  private static class UnExpectedEndOfStream extends InputStream {
+    private final ByteArrayInputStream delegate;
+    private final long breakPos;
+    private int readPos;
+
+    UnExpectedEndOfStream(ByteArrayInputStream stream, long breakPos) {
+      delegate = stream;
+      this.breakPos = breakPos;
+    }
+
+    @Override
+    public int read() throws IOException {
+      if (breakPos != -1 && readPos >= breakPos) {
+        throw new IOException("unexpected end of stream on dummy source.");
+      } else {
+        int n = delegate.read();
+        readPos += 1;
+        return n;
+      }
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      if (breakPos != -1 && readPos >= breakPos) {
+        throw new IOException("unexpected end of stream on dummy source.");
+      } else {
+        int n = delegate.read(b, off, len);
+        readPos += n;
+        return n;
+      }
+    }
+  }
+}

+ 450 - 0
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestDelegationClientBuilder.java

@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.tos;
+
+import com.volcengine.tos.TOSV2;
+import com.volcengine.tos.TOSV2ClientBuilder;
+import com.volcengine.tos.TosClientException;
+import com.volcengine.tos.TosException;
+import com.volcengine.tos.TosServerException;
+import com.volcengine.tos.auth.Credential;
+import com.volcengine.tos.auth.StaticCredentials;
+import com.volcengine.tos.comm.HttpStatus;
+import com.volcengine.tos.model.object.DeleteObjectInput;
+import com.volcengine.tos.model.object.HeadObjectV2Input;
+import com.volcengine.tos.model.object.HeadObjectV2Output;
+import com.volcengine.tos.model.object.ListObjectsV2Input;
+import com.volcengine.tos.model.object.ListObjectsV2Output;
+import com.volcengine.tos.model.object.PutObjectInput;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.tosfs.common.Tasks;
+import org.apache.hadoop.fs.tosfs.common.ThreadPools;
+import org.apache.hadoop.fs.tosfs.conf.ConfKeys;
+import org.apache.hadoop.fs.tosfs.conf.TosKeys;
+import org.apache.hadoop.fs.tosfs.object.tos.auth.EnvironmentCredentialsProvider;
+import org.apache.hadoop.fs.tosfs.object.tos.auth.SimpleCredentialsProvider;
+import org.apache.hadoop.fs.tosfs.util.ParseUtils;
+import org.apache.hadoop.fs.tosfs.util.TestUtility;
+import org.apache.hadoop.fs.tosfs.util.UUIDUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.SocketException;
+import java.net.SocketTimeoutException;
+import java.net.UnknownHostException;
+import java.util.Arrays;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.IntStream;
+import javax.net.ssl.SSLException;
+
+import static org.apache.hadoop.fs.tosfs.object.tos.DelegationClient.isRetryableException;
+import static org.apache.hadoop.fs.tosfs.object.tos.TOS.TOS_SCHEME;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestDelegationClientBuilder {
+
+  private static final String TEST_KEY = UUIDUtils.random();
+  private static final String TEST_DATA = "1234567890";
+  private static final String ENV_ACCESS_KEY =
+      ParseUtils.envAsString(TOS.ENV_TOS_ACCESS_KEY_ID, false);
+  private static final String ENV_SECRET_KEY =
+      ParseUtils.envAsString(TOS.ENV_TOS_SECRET_ACCESS_KEY, false);
+  private static final String ENV_ENDPOINT = ParseUtils.envAsString(TOS.ENV_TOS_ENDPOINT, false);
+
+  @Rule
+  public TestName name = new TestName();
+
+  // Maximum retry times of the tos http client.
+  public static final String MAX_RETRY_COUNT_KEY = "fs.tos.http.maxRetryCount";
+  public static final int MAX_RETRY_COUNT_DEFAULT = -1;
+
+  @Before
+  public void setUp() {
+    TOSV2 tosSdkClientV2 =
+        new TOSV2ClientBuilder().build(TestUtility.region(), TestUtility.endpoint(),
+            new StaticCredentials(ENV_ACCESS_KEY, ENV_SECRET_KEY));
+    try (ByteArrayInputStream stream = new ByteArrayInputStream(TEST_DATA.getBytes())) {
+      PutObjectInput putObjectInput =
+          new PutObjectInput().setBucket(TestUtility.bucket()).setKey(TEST_KEY).setContent(stream);
+      tosSdkClientV2.putObject(putObjectInput);
+    } catch (IOException e) {
+      Assert.fail(e.getMessage());
+    }
+  }
+
+  @Test
+  public void testHeadApiRetry() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com");
+    conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME);
+    conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, false);
+    conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key("test"), "ACCESS_KEY");
+    conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key("test"), "SECRET_KEY");
+
+    DelegationClient tosV2 = new DelegationClientBuilder().bucket("test").conf(conf).build();
+    TOSV2 mockClient = mock(TOSV2.class);
+    tosV2.setClient(mockClient);
+    tosV2.setMaxRetryTimes(5);
+
+    HeadObjectV2Input input = HeadObjectV2Input.builder().bucket("test").build();
+    when(tosV2.headObject(input)).thenThrow(
+            new TosServerException(HttpStatus.INTERNAL_SERVER_ERROR),
+            new TosServerException(HttpStatus.TOO_MANY_REQUESTS),
+            new TosClientException("fake toe", new IOException("fake ioe")),
+            new TosException(new SocketException("fake msg")),
+            new TosException(new UnknownHostException("fake msg")),
+            new TosException(new SSLException("fake msg")),
+            new TosException(new InterruptedException("fake msg")),
+            new TosException(new InterruptedException("fake msg")))
+        .thenReturn(new HeadObjectV2Output());
+
+    RuntimeException exception =
+        assertThrows(RuntimeException.class, () -> tosV2.headObject(input));
+    assertTrue(exception instanceof TosException);
+    assertTrue(exception.getCause() instanceof UnknownHostException);
+    verify(tosV2.client(), times(5)).headObject(input);
+
+    HeadObjectV2Input inputOneTime = HeadObjectV2Input.builder().bucket("inputOneTime").build();
+    HeadObjectV2Output output = new HeadObjectV2Output();
+    when(tosV2.headObject(inputOneTime)).thenReturn(output);
+    HeadObjectV2Output headObject = tosV2.headObject(inputOneTime);
+    Assert.assertEquals(headObject, output);
+    verify(tosV2.client(), times(1)).headObject(inputOneTime);
+    tosV2.close();
+
+    DelegationClient newClient = new DelegationClientBuilder().bucket("test").conf(conf).build();
+    mockClient = mock(TOSV2.class);
+    newClient.setClient(mockClient);
+    newClient.setMaxRetryTimes(5);
+    when(newClient.headObject(input)).thenThrow(
+        new TosClientException("fake toe", new EOFException("fake eof")),
+        new TosServerException(HttpStatus.INTERNAL_SERVER_ERROR),
+        new TosServerException(HttpStatus.TOO_MANY_REQUESTS)).thenReturn(new HeadObjectV2Output());
+
+    exception = assertThrows(RuntimeException.class, () -> newClient.headObject(input));
+    assertTrue(exception instanceof TosClientException);
+    assertTrue(exception.getCause() instanceof EOFException);
+    verify(newClient.client(), times(1)).headObject(input);
+    newClient.close();
+  }
+
+  @Test
+  public void testEnableCrcCheck() throws IOException {
+    String bucket = name.getMethodName();
+    Configuration conf = new Configuration();
+    conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com");
+    conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME);
+    conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, true);
+    conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key("test"), "ACCESS_KEY");
+    conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key("test"), "SECRET_KEY");
+
+    DelegationClient tosV2 = new DelegationClientBuilder().bucket(bucket).conf(conf).build();
+    Assert.assertTrue(tosV2.config().isEnableCrc());
+
+    conf.setBoolean(TosKeys.FS_TOS_CRC_CHECK_ENABLED, false);
+    tosV2 = new DelegationClientBuilder().bucket(bucket).conf(conf).build();
+    Assert.assertFalse(tosV2.config().isEnableCrc());
+
+    tosV2.close();
+  }
+
+  @Test
+  public void testClientCache() throws IOException {
+    String bucket = name.getMethodName();
+    Configuration conf = new Configuration();
+    conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com");
+    conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME);
+    conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, false);
+    conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key(bucket), "ACCESS_KEY");
+    conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key(bucket), "SECRET_KEY");
+
+    DelegationClient tosV2 = new DelegationClientBuilder().bucket(bucket).conf(conf).build();
+    DelegationClient tosV2Cached = new DelegationClientBuilder().bucket(bucket).conf(conf).build();
+    Assert.assertEquals("client must be load in cache", tosV2Cached, tosV2);
+    Assert.assertEquals("ACCESS_KEY_A", tosV2.usedCredential().getAccessKeyId());
+    tosV2Cached.close();
+
+    String newBucket = "new-test-bucket";
+    conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key(newBucket), "ACCESS_KEY_B");
+    conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key(newBucket), "SECRET_KEY_B");
+    DelegationClient changeBucketClient =
+        new DelegationClientBuilder().bucket(newBucket).conf(conf).build();
+    Assert.assertNotEquals("client should be created entirely new", changeBucketClient, tosV2);
+    Assert.assertEquals("ACCESS_KEY_B", changeBucketClient.usedCredential().getAccessKeyId());
+    changeBucketClient.close();
+
+    conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, true); // disable cache: true
+    DelegationClient tosV2NotCached =
+        new DelegationClientBuilder().bucket(bucket).conf(conf).build();
+    Assert.assertNotEquals("client should be created entirely new", tosV2NotCached, tosV2);
+    Assert.assertEquals("ACCESS_KEY_A", tosV2NotCached.usedCredential().getAccessKeyId());
+    tosV2NotCached.close();
+
+    tosV2.close();
+  }
+
+  @Test
+  public void testOverwriteHttpConfig() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com");
+    conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME);
+    conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key("test"), "ACCESS_KEY");
+    conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key("test"), "SECRET_KEY");
+    conf.setInt(TosKeys.FS_TOS_HTTP_MAX_CONNECTIONS, 24);
+    conf.setInt(MAX_RETRY_COUNT_KEY, 24);
+    conf.setInt(TosKeys.FS_TOS_REQUEST_MAX_RETRY_TIMES, 24);
+    conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, true);
+
+    DelegationClient tosV2 = new DelegationClientBuilder().bucket("test").conf(conf).build();
+    Assert.assertEquals("ACCESS_KEY", tosV2.usedCredential().getAccessKeyId());
+    Assert.assertEquals("http max connection overwrite to 24 from 1024, must be 24", 24,
+        tosV2.config().getTransportConfig().getMaxConnections());
+    Assert.assertEquals("tos maxRetryCount disabled, must be -1",
+        DelegationClientBuilder.DISABLE_TOS_RETRY_VALUE,
+        tosV2.config().getTransportConfig().getMaxRetryCount());
+    Assert.assertEquals("maxRetryTimes must be 24", 24, tosV2.maxRetryTimes());
+    Assert.assertEquals("endpoint must be equals to https://tos-cn-beijing.ivolces.com",
+        "https://tos-cn-beijing.ivolces.com", tosV2.config().getEndpoint());
+
+    tosV2.close();
+  }
+
+  @Test
+  public void testDynamicRefreshAkSk() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com");
+    conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME);
+    conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key(TestUtility.bucket()), ENV_ACCESS_KEY);
+    conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key(TestUtility.bucket()), ENV_SECRET_KEY);
+    conf.setInt(TosKeys.FS_TOS_HTTP_MAX_CONNECTIONS, 24);
+    conf.setInt(MAX_RETRY_COUNT_KEY, 24);
+
+    TOSV2 tosSdkClientV2 =
+        new TOSV2ClientBuilder().build(TestUtility.region(), TestUtility.endpoint(),
+            new StaticCredentials("a", "b"));
+    DelegationClient delegationClientV2 =
+        new DelegationClientBuilder().bucket(TestUtility.bucket()).conf(conf).build();
+
+    ListObjectsV2Input inputV2 =
+        ListObjectsV2Input.builder().bucket(TestUtility.bucket()).prefix(TEST_KEY).marker("")
+            .maxKeys(10).build();
+
+    Assert.assertThrows(TosServerException.class, () -> tosSdkClientV2.listObjects(inputV2));
+
+    tosSdkClientV2.changeCredentials(new StaticCredentials(ENV_ACCESS_KEY, ENV_SECRET_KEY));
+
+    ListObjectsV2Output tosSdkOutput = tosSdkClientV2.listObjects(inputV2);
+    ListObjectsV2Output delegateOutput = delegationClientV2.listObjects(inputV2);
+    int nativeContentSize =
+        tosSdkOutput.getContents() == null ? -1 : tosSdkOutput.getContents().size();
+    int delegateContentSize =
+        delegateOutput.getContents() == null ? -1 : delegateOutput.getContents().size();
+
+    Assert.assertEquals("delegation client must same as native client", nativeContentSize,
+        delegateContentSize);
+    Assert.assertEquals(ENV_ACCESS_KEY, delegationClientV2.usedCredential().getAccessKeyId());
+
+    delegationClientV2.close();
+  }
+
+  @Test
+  public void testCreateClientWithEnvironmentCredentials() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com");
+    conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, EnvironmentCredentialsProvider.NAME);
+
+    DelegationClient tosV2 =
+        new DelegationClientBuilder().bucket(TestUtility.bucket()).conf(conf).build();
+    Credential cred = tosV2.usedCredential();
+
+    String assertMsg =
+        String.format("expect %s, but got %s", ENV_ACCESS_KEY, cred.getAccessKeyId());
+    Assert.assertEquals(assertMsg, cred.getAccessKeyId(), ENV_ACCESS_KEY);
+    assertMsg = String.format("expect %s, but got %s", ENV_SECRET_KEY, cred.getAccessKeySecret());
+    Assert.assertEquals(assertMsg, cred.getAccessKeySecret(), ENV_SECRET_KEY);
+
+    tosV2.close();
+  }
+
+  @Test
+  public void testCreateClientWithSimpleCredentials() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), ENV_ENDPOINT);
+    conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME);
+    conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key(TestUtility.bucket()), ENV_ACCESS_KEY);
+    conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key(TestUtility.bucket()), ENV_SECRET_KEY);
+    conf.setInt(TosKeys.FS_TOS_HTTP_MAX_CONNECTIONS, 24);
+    conf.setInt(MAX_RETRY_COUNT_KEY, 24);
+
+    ListObjectsV2Input input =
+        ListObjectsV2Input.builder().bucket(TestUtility.bucket()).prefix(TEST_KEY).marker("")
+            .maxKeys(10).build();
+
+    TOSV2 v2 = new TOSV2ClientBuilder().build(TestUtility.region(), TestUtility.endpoint(),
+        new StaticCredentials(ENV_ACCESS_KEY, ENV_SECRET_KEY));
+    ListObjectsV2Output outputV2 = v2.listObjects(input);
+
+    DelegationClient tosV2 =
+        new DelegationClientBuilder().bucket(TestUtility.bucket()).conf(conf).build();
+
+    ListObjectsV2Output output = tosV2.listObjects(input);
+    Assert.assertEquals("delegation client must be same as native client",
+        outputV2.getContents().size(), output.getContents().size());
+
+    tosV2.close();
+  }
+
+  @Test
+  public void testCachedConcurrently() {
+    String bucketName = name.getMethodName();
+
+    Function<String, Configuration> commonConf = bucket -> {
+      Configuration conf = new Configuration();
+      conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), ENV_ENDPOINT);
+      conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME);
+      conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key(bucket), ENV_ACCESS_KEY);
+      conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key(bucket), ENV_SECRET_KEY);
+      return conf;
+    };
+
+    // enable cache
+    Function<String, Configuration> enableCachedConf = bucket -> {
+      Configuration conf = commonConf.apply(bucket);
+      conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, false);
+      return conf;
+    };
+
+    ExecutorService es = ThreadPools.newWorkerPool("testCachedConcurrently", 32);
+    int bucketCount = 5;
+    int taskCount = 10000;
+
+    AtomicInteger success = new AtomicInteger(0);
+    AtomicInteger failure = new AtomicInteger(0);
+    Tasks.foreach(IntStream.range(0, taskCount).boxed().map(i -> bucketName + (i % bucketCount)))
+        .executeWith(es).run(bucket -> {
+          try {
+            Configuration conf = enableCachedConf.apply(bucket);
+            DelegationClient client = new DelegationClientBuilder().bucket(bucket).conf(conf).build();
+            client.close();
+            success.incrementAndGet();
+          } catch (Exception e) {
+            failure.incrementAndGet();
+          }
+        });
+
+    Assert.assertEquals(bucketCount, DelegationClientBuilder.CACHE.size());
+    Assert.assertEquals(taskCount, success.get());
+    Assert.assertEquals(0, failure.get());
+
+    // clear cache
+    DelegationClientBuilder.CACHE.clear();
+
+    // disable cache
+    Function<String, Configuration> disableCachedConf = bucket -> {
+      Configuration conf = commonConf.apply(bucket);
+      conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, true);
+      return conf;
+    };
+
+    success.set(0);
+    failure.set(0);
+    Tasks.foreach(IntStream.range(0, taskCount).boxed().map(i -> bucketName + (i % bucketCount)))
+        .executeWith(es).run(bucket -> {
+          try {
+            Configuration conf = disableCachedConf.apply(bucket);
+            DelegationClient client = new DelegationClientBuilder().bucket(bucket).conf(conf).build();
+            client.close();
+            success.incrementAndGet();
+          } catch (Exception e) {
+            failure.incrementAndGet();
+          }
+        });
+
+    Assert.assertTrue(DelegationClientBuilder.CACHE.isEmpty());
+    Assert.assertEquals(taskCount, success.get());
+    Assert.assertEquals(0, failure.get());
+
+    es.shutdown();
+  }
+
+  @After
+  public void deleteAllTestData() throws IOException {
+    TOSV2 tosSdkClientV2 =
+        new TOSV2ClientBuilder().build(TestUtility.region(), TestUtility.endpoint(),
+            new StaticCredentials(ENV_ACCESS_KEY, ENV_SECRET_KEY));
+    tosSdkClientV2.deleteObject(
+        DeleteObjectInput.builder().bucket(TestUtility.bucket()).key(TEST_KEY).build());
+
+    tosSdkClientV2.close();
+    DelegationClientBuilder.CACHE.clear();
+  }
+
+  @Test
+  public void testRetryableException() {
+    assertTrue(retryableException(new TosServerException(500)));
+    assertTrue(retryableException(new TosServerException(501)));
+    assertTrue(retryableException(new TosServerException(429)));
+    assertFalse(retryableException(new TosServerException(404)));
+
+    assertTrue(retryableException(new TosException(new SocketException())));
+    assertTrue(retryableException(new TosException(new UnknownHostException())));
+    assertTrue(retryableException(new TosException(new SSLException("fake ssl"))));
+    assertTrue(retryableException(new TosException(new SocketTimeoutException())));
+    assertTrue(retryableException(new TosException(new InterruptedException())));
+
+    assertTrue(retryableException(new TosClientException("fake ioe", new IOException())));
+    assertFalse(retryableException(new TosClientException("fake eof", new EOFException())));
+
+    assertTrue(retryableException(new TosServerException(409)));
+    assertTrue(
+        retryableException(new TosServerException(409).setEc(TOSErrorCodes.PATH_LOCK_CONFLICT)));
+    assertFalse(
+        retryableException(new TosServerException(409).setEc(TOSErrorCodes.DELETE_NON_EMPTY_DIR)));
+    assertFalse(
+        retryableException(new TosServerException(409).setEc(TOSErrorCodes.LOCATED_UNDER_A_FILE)));
+    assertFalse(retryableException(
+        new TosServerException(409).setEc(TOSErrorCodes.COPY_BETWEEN_DIR_AND_FILE)));
+    assertFalse(retryableException(
+        new TosServerException(409).setEc(TOSErrorCodes.RENAME_TO_AN_EXISTED_DIR)));
+    assertFalse(
+        retryableException(new TosServerException(409).setEc(TOSErrorCodes.RENAME_TO_SUB_DIR)));
+    assertFalse(retryableException(
+        new TosServerException(409).setEc(TOSErrorCodes.RENAME_BETWEEN_DIR_AND_FILE)));
+  }
+
+  private boolean retryableException(TosException e) {
+    return isRetryableException(e,
+        Arrays.asList(TOSErrorCodes.FAST_FAILURE_CONFLICT_ERROR_CODES.split(",")));
+  }
+}

+ 150 - 0
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestTOSInputStream.java

@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.tos;
+
+import com.volcengine.tos.internal.util.aborthook.AbortInputStreamHook;
+import com.volcengine.tos.model.object.GetObjectBasicOutput;
+import com.volcengine.tos.model.object.GetObjectV2Output;
+import org.apache.hadoop.fs.tosfs.object.Constants;
+import org.apache.hadoop.fs.tosfs.util.TestUtility;
+import org.apache.hadoop.thirdparty.com.google.common.io.ByteStreams;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+public class TestTOSInputStream {
+
+  private static final int DATA_SIZE = 1 << 20;
+  private static final byte[] DATA = TestUtility.rand(DATA_SIZE);
+
+  @Test
+  public void testForceClose() throws IOException {
+    TOSInputStream stream = createStream(DATA, 0, DATA_SIZE - 1, 1024);
+    stream.close();
+    Assert.assertTrue("Expected force close", cast(stream).isForceClose());
+
+    stream = createStream(DATA, 0, DATA_SIZE - 1, 1024);
+    ByteStreams.skipFully(stream, DATA_SIZE - 1024 - 1);
+    stream.close();
+    Assert.assertTrue("Expected force close", cast(stream).isForceClose());
+
+    stream = createStream(DATA, 0, -1, 1024);
+    stream.close();
+    Assert.assertTrue("Expected force close", cast(stream).isForceClose());
+
+    stream = createStream(DATA, 0, -1, 1024);
+    ByteStreams.skipFully(stream, DATA_SIZE - 1024 - 1);
+    stream.close();
+    Assert.assertTrue("Expected force close", cast(stream).isForceClose());
+
+    stream = createStream(DATA, 0, -1, 1024);
+    ByteStreams.skipFully(stream, DATA_SIZE - 1024);
+    stream.close();
+    Assert.assertTrue("Expected force close", cast(stream).isForceClose());
+  }
+
+  @Test
+  public void testClose() throws IOException {
+    TOSInputStream stream = createStream(DATA, 0, DATA_SIZE - 1, DATA_SIZE);
+    stream.close();
+    Assert.assertFalse("Expected close by skipping bytes", cast(stream).isForceClose());
+
+    stream = createStream(DATA, 0, DATA_SIZE - 1, 1024);
+    ByteStreams.skipFully(stream, DATA_SIZE - 1024);
+    stream.close();
+    Assert.assertFalse("Expected close by skipping bytes", cast(stream).isForceClose());
+
+    stream = createStream(DATA, 0, DATA_SIZE - 1, 1024);
+    ByteStreams.skipFully(stream, DATA_SIZE - 1023);
+    stream.close();
+    Assert.assertFalse("Expected close by skipping bytes", cast(stream).isForceClose());
+
+    stream = createStream(DATA, 0, -1, DATA_SIZE + 1);
+    stream.close();
+    Assert.assertFalse("Expected close by skipping bytes", cast(stream).isForceClose());
+
+    stream = createStream(DATA, 0, -1, 1024);
+    ByteStreams.skipFully(stream, DATA_SIZE - 1023);
+    stream.close();
+    Assert.assertFalse("Expected close by skipping bytes", cast(stream).isForceClose());
+
+    stream = createStream(DATA, 0, -1, 1024);
+    ByteStreams.skipFully(stream, DATA_SIZE);
+    stream.close();
+    Assert.assertFalse("Expected close by skipping bytes", cast(stream).isForceClose());
+  }
+
+  private TestInputStream cast(TOSInputStream stream) throws IOException {
+    InputStream content = stream.getObjectOutput().verifiedContent(Constants.MAGIC_CHECKSUM);
+    Assert.assertTrue("Not a TestInputStream", content instanceof TestInputStream);
+    return (TestInputStream) content;
+  }
+
+  private TOSInputStream createStream(byte[] data, long startOff, long endOff, long maxDrainSize)
+      throws IOException {
+    TestInputStream stream =
+        new TestInputStream(data, (int) startOff, (int) (data.length - startOff));
+    GetObjectV2Output output = new GetObjectV2Output(new GetObjectBasicOutput(), stream).setHook(
+        new ForceCloseHook(stream));
+
+    return new TOSInputStream(new GetObjectOutput(output, Constants.MAGIC_CHECKSUM), startOff,
+        endOff, maxDrainSize, Constants.MAGIC_CHECKSUM);
+  }
+
+  private static class TestInputStream extends ByteArrayInputStream {
+    // -1 means call close()
+    //  0 means neither call close() nor forceClose()
+    //  1 means call forceClose()
+    private int cloeState = 0;
+
+    private TestInputStream(byte[] buf, int off, int len) {
+      super(buf, off, len);
+    }
+
+    @Override public void close() {
+      cloeState = -1;
+    }
+
+    public void forceClose() {
+      cloeState = 1;
+    }
+
+    boolean isForceClose() {
+      Assert.assertTrue("Neither call close() nor forceClose()", cloeState == -1 || cloeState == 1);
+      return cloeState == 1;
+    }
+  }
+
+  private static class ForceCloseHook implements AbortInputStreamHook {
+    private final TestInputStream in;
+
+    private ForceCloseHook(TestInputStream in) {
+      this.in = in;
+    }
+
+    @Override public void abort() {
+      if (in != null) {
+        in.forceClose();
+      }
+    }
+  }
+}

+ 290 - 0
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestTOSObjectStorage.java

@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.tos;
+
+import com.volcengine.tos.internal.model.CRC64Checksum;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.tosfs.common.Bytes;
+import org.apache.hadoop.fs.tosfs.conf.TosKeys;
+import org.apache.hadoop.fs.tosfs.object.ChecksumType;
+import org.apache.hadoop.fs.tosfs.object.Constants;
+import org.apache.hadoop.fs.tosfs.object.MultipartUpload;
+import org.apache.hadoop.fs.tosfs.object.ObjectInfo;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory;
+import org.apache.hadoop.fs.tosfs.object.Part;
+import org.apache.hadoop.fs.tosfs.object.exceptions.NotAppendableException;
+import org.apache.hadoop.fs.tosfs.object.request.ListObjectsRequest;
+import org.apache.hadoop.fs.tosfs.object.response.ListObjectsResponse;
+import org.apache.hadoop.fs.tosfs.util.CommonUtils;
+import org.apache.hadoop.fs.tosfs.util.TestUtility;
+import org.apache.hadoop.fs.tosfs.util.UUIDUtils;
+import org.apache.hadoop.util.PureJavaCrc32C;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.zip.Checksum;
+
+import static org.apache.hadoop.fs.tosfs.object.tos.TOS.TOS_SCHEME;
+import static org.apache.hadoop.fs.tosfs.util.TestUtility.directoryBucketObjectStorage;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+@RunWith(Parameterized.class)
+public class TestTOSObjectStorage {
+
+  private final ObjectStorage tos;
+  private final Checksum checksum;
+  private final ChecksumType type;
+
+  public TestTOSObjectStorage(ObjectStorage tos, Checksum checksum, ChecksumType checksumType) {
+    this.tos = tos;
+    this.checksum = checksum;
+    this.type = checksumType;
+  }
+
+  @Parameterized.Parameters(name = "ObjectStorage = {0}, Checksum = {1}, ChecksumType = {2}")
+  public static Iterable<Object[]> collections() {
+    List<Object[]> values = new ArrayList<>();
+
+    Configuration conf = new Configuration();
+    conf.set(TosKeys.FS_TOS_CHECKSUM_TYPE, ChecksumType.CRC64ECMA.name());
+    values.add(new Object[] {
+        ObjectStorageFactory.createWithPrefix(String.format("tos-%s/", UUIDUtils.random()),
+            TOS_SCHEME, TestUtility.bucket(), conf), new CRC64Checksum(), ChecksumType.CRC64ECMA });
+
+    conf = new Configuration();
+    conf.set(TosKeys.FS_TOS_CHECKSUM_TYPE, ChecksumType.CRC32C.name());
+    values.add(new Object[] {
+        ObjectStorageFactory.createWithPrefix(String.format("tos-%s/", UUIDUtils.random()),
+            TOS_SCHEME, TestUtility.bucket(), conf), new PureJavaCrc32C(), ChecksumType.CRC32C });
+
+    conf = new Configuration();
+    conf.set(TosKeys.FS_TOS_CHECKSUM_TYPE, ChecksumType.CRC64ECMA.name());
+    ObjectStorage directoryObjectStorage = directoryBucketObjectStorage(conf);
+    values.add(
+        new Object[] { directoryObjectStorage, new CRC64Checksum(), ChecksumType.CRC64ECMA });
+
+    conf = new Configuration();
+    conf.set(TosKeys.FS_TOS_CHECKSUM_TYPE, ChecksumType.CRC32C.name());
+    directoryObjectStorage = directoryBucketObjectStorage(conf);
+    values.add(new Object[] { directoryObjectStorage, new PureJavaCrc32C(), ChecksumType.CRC32C });
+
+    return values;
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    checksum.reset();
+
+    CommonUtils.runQuietly(() -> tos.deleteAll(""));
+    for (MultipartUpload upload : tos.listUploads("")) {
+      tos.abortMultipartUpload(upload.key(), upload.uploadId());
+    }
+    tos.close();
+  }
+
+  @Test
+  public void testHeadObj() {
+    String key = "testPutChecksum";
+    byte[] data = TestUtility.rand(1024);
+    checksum.update(data, 0, data.length);
+    assertEquals(checksum.getValue(), parseChecksum(tos.put(key, data)));
+
+    ObjectInfo objInfo = tos.head(key);
+    assertEquals(checksum.getValue(), parseChecksum(objInfo.checksum()));
+  }
+
+  @Test
+  public void testGetFileStatus() {
+    Assume.assumeFalse(tos.bucket().isDirectory());
+
+    Configuration conf = new Configuration(tos.conf());
+    conf.setBoolean(TosKeys.FS_TOS_GET_FILE_STATUS_ENABLED, true);
+    tos.initialize(conf, tos.bucket().name());
+
+    String key = "testFileStatus";
+    byte[] data = TestUtility.rand(256);
+    byte[] checksum = tos.put(key, data);
+
+    ObjectInfo obj1 = tos.objectStatus(key);
+    Assert.assertArrayEquals(checksum, obj1.checksum());
+    Assert.assertEquals(key, obj1.key());
+    Assert.assertEquals(obj1, tos.head(key));
+
+    ObjectInfo obj2 = tos.objectStatus(key + "/");
+    Assert.assertNull(obj2);
+
+    String dirKey = "testDirStatus/";
+    checksum = tos.put(dirKey, new byte[0]);
+
+    ObjectInfo obj3 = tos.objectStatus("testDirStatus");
+    Assert.assertArrayEquals(checksum, obj3.checksum());
+    Assert.assertEquals(dirKey, obj3.key());
+    Assert.assertEquals(obj3, tos.head(dirKey));
+    Assert.assertNull(tos.head("testDirStatus"));
+    ObjectInfo obj4 = tos.objectStatus(dirKey);
+    Assert.assertArrayEquals(checksum, obj4.checksum());
+    Assert.assertEquals(dirKey, obj4.key());
+    Assert.assertEquals(obj4, tos.head(dirKey));
+
+    String prefix = "testPrefix/";
+    tos.put(prefix + "subfile", data);
+    ObjectInfo obj5 = tos.objectStatus(prefix);
+    Assert.assertEquals(prefix, obj5.key());
+    Assert.assertArrayEquals(Constants.MAGIC_CHECKSUM, obj5.checksum());
+    Assert.assertNull(tos.head(prefix));
+    ObjectInfo obj6 = tos.objectStatus("testPrefix");
+    Assert.assertEquals(prefix, obj6.key());
+    Assert.assertArrayEquals(Constants.MAGIC_CHECKSUM, obj6.checksum());
+    Assert.assertNull(tos.head("testPrefix"));
+  }
+
+  @Test
+  public void testObjectStatus() {
+    Assume.assumeFalse(tos.bucket().isDirectory());
+
+    String key = "testObjectStatus";
+    byte[] data = TestUtility.rand(1024);
+    checksum.update(data, 0, data.length);
+    assertEquals(checksum.getValue(), parseChecksum(tos.put(key, data)));
+
+    ObjectInfo objInfo = tos.objectStatus(key);
+    assertEquals(checksum.getValue(), parseChecksum(objInfo.checksum()));
+
+    objInfo = tos.head(key);
+    assertEquals(checksum.getValue(), parseChecksum(objInfo.checksum()));
+
+    String dir = key + "/";
+    tos.put(dir, new byte[0]);
+    objInfo = tos.objectStatus(dir);
+    assertEquals(Constants.MAGIC_CHECKSUM, objInfo.checksum());
+
+    objInfo = tos.head(dir);
+    assertEquals(Constants.MAGIC_CHECKSUM, objInfo.checksum());
+  }
+
+  @Test
+  public void testListObjs() {
+    String key = "testListObjs";
+    byte[] data = TestUtility.rand(1024);
+    checksum.update(data, 0, data.length);
+    for (int i = 0; i < 5; i++) {
+      assertEquals(checksum.getValue(), parseChecksum(tos.put(key, data)));
+    }
+
+    ListObjectsRequest request =
+        ListObjectsRequest.builder().prefix(key).startAfter(null).maxKeys(-1).delimiter("/")
+            .build();
+    Iterator<ListObjectsResponse> iter = tos.list(request).iterator();
+    while (iter.hasNext()) {
+      List<ObjectInfo> objs = iter.next().objects();
+      for (ObjectInfo obj : objs) {
+        assertEquals(checksum.getValue(), parseChecksum(obj.checksum()));
+      }
+    }
+  }
+
+  @Test
+  public void testPutChecksum() {
+    String key = "testPutChecksum";
+    byte[] data = TestUtility.rand(1024);
+    checksum.update(data, 0, data.length);
+
+    byte[] checksumStr = tos.put(key, data);
+
+    assertEquals(checksum.getValue(), parseChecksum(checksumStr));
+  }
+
+  @Test
+  public void testMPUChecksum() {
+    int partNumber = 2;
+    String key = "testMPUChecksum";
+    MultipartUpload mpu = tos.createMultipartUpload(key);
+    byte[] data = TestUtility.rand(mpu.minPartSize() * partNumber);
+    checksum.update(data, 0, data.length);
+
+    List<Part> parts = new ArrayList<>();
+    for (int i = 0; i < partNumber; i++) {
+      final int index = i;
+      Part part = tos.uploadPart(key, mpu.uploadId(), index + 1,
+          () -> new ByteArrayInputStream(data, index * mpu.minPartSize(), mpu.minPartSize()),
+          mpu.minPartSize());
+      parts.add(part);
+    }
+
+    byte[] checksumStr = tos.completeUpload(key, mpu.uploadId(), parts);
+    assertEquals(checksum.getValue(), parseChecksum(checksumStr));
+  }
+
+  @Test
+  public void testAppendable() {
+    Assume.assumeFalse(tos.bucket().isDirectory());
+
+    // Test create object with append then append.
+    byte[] data = TestUtility.rand(256);
+    String prefix = "a/testAppendable/";
+    String key = prefix + "object.txt";
+    tos.append(key, data);
+
+    tos.append(key, new byte[0]);
+
+    // Test create object with put then append.
+    data = TestUtility.rand(256);
+    tos.put(key, data);
+
+    assertThrows("Expect not appendable.", NotAppendableException.class,
+        () -> tos.append(key, new byte[0]));
+
+    tos.delete(key);
+  }
+
+  @Test
+  public void testDirectoryBucketAppendable() {
+    Assume.assumeTrue(tos.bucket().isDirectory());
+
+    byte[] data = TestUtility.rand(256);
+    String prefix = "a/testAppendable/";
+    String key = prefix + "object.txt";
+    tos.put(key, data);
+
+    tos.append(key, new byte[1024]);
+
+    tos.delete(key);
+  }
+
+  private long parseChecksum(byte[] checksum) {
+    switch (type) {
+    case CRC32C:
+    case CRC64ECMA:
+      return Bytes.toLong(checksum);
+    default:
+      throw new IllegalArgumentException(
+          String.format("Checksum type %s is not supported by TOS.", type.name()));
+    }
+  }
+}

+ 201 - 0
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestTOSRetryPolicy.java

@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.tos;
+
+import com.volcengine.tos.TOSV2;
+import com.volcengine.tos.TosException;
+import com.volcengine.tos.TosServerException;
+import com.volcengine.tos.comm.HttpStatus;
+import com.volcengine.tos.model.RequestInfo;
+import com.volcengine.tos.model.object.PutObjectOutput;
+import com.volcengine.tos.model.object.UploadPartV2Output;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.tosfs.conf.ConfKeys;
+import org.apache.hadoop.fs.tosfs.conf.TosKeys;
+import org.apache.hadoop.fs.tosfs.object.InputStreamProvider;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory;
+import org.apache.hadoop.fs.tosfs.object.Part;
+import org.apache.hadoop.fs.tosfs.object.tos.auth.SimpleCredentialsProvider;
+import org.apache.hadoop.fs.tosfs.util.TestUtility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.HashMap;
+import java.util.Map;
+import javax.net.ssl.SSLException;
+
+import static org.apache.hadoop.fs.tosfs.object.tos.TOS.TOS_SCHEME;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class TestTOSRetryPolicy {
+
+  private final String retryKey = "retryKey.txt";
+  private TOSV2 tosClient;
+  private DelegationClient client;
+
+  @Before
+  public void setUp() {
+    client = createRetryableDelegationClient();
+    tosClient = mock(TOSV2.class);
+    client.setClient(tosClient);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    tosClient.close();
+    client.close();
+  }
+
+  private DelegationClient createRetryableDelegationClient() {
+    Configuration conf = new Configuration();
+    conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), "https://tos-cn-beijing.ivolces.com");
+    conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME);
+    conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, true);
+    conf.set(TosKeys.FS_TOS_ACCESS_KEY_ID, "ACCESS_KEY");
+    conf.set(TosKeys.FS_TOS_SECRET_ACCESS_KEY, "SECRET_KEY");
+    return new DelegationClientBuilder().bucket("test").conf(conf).build();
+  }
+
+  @Test
+  public void testShouldThrowExceptionAfterRunOut5RetryTimesIfNoRetryConfigSet()
+      throws IOException {
+    TOS storage =
+        (TOS) ObjectStorageFactory.create(TOS_SCHEME, TestUtility.bucket(), new Configuration());
+    storage.setClient(client);
+    client.setMaxRetryTimes(5);
+
+    PutObjectOutput response = mock(PutObjectOutput.class);
+    InputStreamProvider streamProvider = mock(InputStreamProvider.class);
+
+    when(tosClient.putObject(any())).thenThrow(
+        new TosServerException(HttpStatus.INTERNAL_SERVER_ERROR),
+        new TosServerException(HttpStatus.TOO_MANY_REQUESTS),
+        new TosException(new SocketException("fake msg")),
+        new TosException(new UnknownHostException("fake msg")),
+        new TosException(new SSLException("fake msg")),
+        new TosException(new InterruptedException("fake msg")),
+        new TosException(new InterruptedException("fake msg"))).thenReturn(response);
+
+    // after run out retry times, should throw exception
+    RuntimeException exception =
+        assertThrows(RuntimeException.class, () -> storage.put(retryKey, streamProvider, 0));
+    assertTrue(exception instanceof TosException);
+    assertTrue(exception.getCause() instanceof SSLException);
+
+    // the newStream method of stream provider should be called 5 times
+    verify(streamProvider, times(5)).newStream();
+
+    storage.close();
+  }
+
+  @Test
+  public void testShouldReturnResultAfterRetry8TimesIfConfigured10TimesRetry()
+      throws IOException {
+    TOS storage =
+        (TOS) ObjectStorageFactory.create(TOS_SCHEME, TestUtility.bucket(), new Configuration());
+    DelegationClient delegationClient = createRetryableDelegationClient();
+    delegationClient.setClient(tosClient);
+    delegationClient.setMaxRetryTimes(10);
+    storage.setClient(delegationClient);
+
+    UploadPartV2Output response = new UploadPartV2Output().setPartNumber(1).setEtag("etag");
+
+    InputStream in = mock(InputStream.class);
+    InputStreamProvider streamProvider = mock(InputStreamProvider.class);
+    when(streamProvider.newStream()).thenReturn(in);
+
+    when(tosClient.uploadPart(any())).thenThrow(
+        new TosServerException(HttpStatus.INTERNAL_SERVER_ERROR),
+        new TosServerException(HttpStatus.TOO_MANY_REQUESTS),
+        new TosException(new SocketException("fake msg")),
+        new TosException(new UnknownHostException("fake msg")),
+        new TosException(new SSLException("fake msg")),
+        new TosException(new InterruptedException("fake msg")),
+        new TosException(new InterruptedException("fake msg"))).thenReturn(response);
+
+    // after run out retry times, should throw exception
+    Part part = storage.uploadPart(retryKey, "uploadId", 1, streamProvider, 0);
+    assertEquals(1, part.num());
+    assertEquals("etag", part.eTag());
+
+    // the newStream method of stream provider should be called 8 times
+    verify(streamProvider, times(8)).newStream();
+
+    storage.close();
+  }
+
+  @Test
+  public void testShouldReturnResultIfRetry3TimesSucceed() throws IOException {
+    TOS storage =
+        (TOS) ObjectStorageFactory.create(TOS_SCHEME, TestUtility.bucket(), new Configuration());
+    storage.setClient(client);
+
+    PutObjectOutput response = mock(PutObjectOutput.class);
+    InputStreamProvider streamProvider = mock(InputStreamProvider.class);
+
+    RequestInfo requestInfo = mock(RequestInfo.class);
+    Map<String, String> header = new HashMap<>();
+    when(response.getRequestInfo()).thenReturn(requestInfo);
+    when(requestInfo.getHeader()).thenReturn(header);
+
+    when(tosClient.putObject(any())).thenThrow(
+        new TosServerException(HttpStatus.INTERNAL_SERVER_ERROR),
+        new TosServerException(HttpStatus.TOO_MANY_REQUESTS)).thenReturn(response);
+
+    storage.put(retryKey, streamProvider, 0);
+    // the newStream method of stream provider should be called 3 times
+    verify(streamProvider, times(3)).newStream();
+
+    storage.close();
+  }
+
+  @Test
+  public void testShouldNotRetryIfThrowUnRetryException() throws IOException {
+    TOS storage =
+        (TOS) ObjectStorageFactory.create(TOS_SCHEME, TestUtility.bucket(), new Configuration());
+    storage.setClient(client);
+
+    InputStreamProvider streamProvider = mock(InputStreamProvider.class);
+
+    when(tosClient.putObject(any())).thenThrow(
+        new TosException(new NullPointerException("fake msg.")));
+
+    RuntimeException exception =
+        assertThrows(RuntimeException.class, () -> storage.put(retryKey, streamProvider, 0));
+    assertTrue(exception instanceof TosException);
+    assertTrue(exception.getCause() instanceof NullPointerException);
+
+    // the newStream method of stream provider should be only called once.
+    verify(streamProvider, times(1)).newStream();
+
+    storage.close();
+  }
+}