Browse Source

Integration of TOS: Add ObjectMultiRangeInputStream and ObjectRangeInputStream.

lijinglun 7 months ago
parent
commit
12ddf2b16a

+ 2 - 4
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java

@@ -20,21 +20,19 @@ package org.apache.hadoop.fs.tosfs.conf;
 
 import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
 
-import java.util.List;
-
 public class ConfKeys {
 
   /**
    * Object storage endpoint to connect to, which should include both region and object domain name.
    * e.g. 'fs.tos.endpoint'='tos-cn-beijing.volces.com'.
    */
-  public static final ArgumentKey FS_TOS_ENDPOINT = new ArgumentKey("fs.%s.endpoint");
+  public static final ArgumentKey FS_OBJECT_STORAGE_ENDPOINT = new ArgumentKey("fs.%s.endpoint");
 
   /**
    * The region of the object storage, e.g. fs.tos.region. Parsing template "fs.%s.endpoint" to
    * know the region.
    */
-  public static final ArgumentKey FS_TOS_REGION = new ArgumentKey("fs.%s.region");
+  public static final ArgumentKey FS_OBJECT_STORAGE_REGION = new ArgumentKey("fs.%s.region");
 
   /**
    * The object storage implementation for the defined scheme. For example, we can delegate the

+ 233 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/ObjectMultiRangeInputStream.java

@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.tosfs.util.CommonUtils;
+import org.apache.hadoop.fs.tosfs.util.FSUtils;
+import org.apache.hadoop.fs.tosfs.util.Range;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.thirdparty.com.google.common.primitives.Ints;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+public class ObjectMultiRangeInputStream extends FSInputStream {
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+  private final ExecutorService threadPool;
+  private final ObjectStorage storage;
+  private final String objectKey;
+  private final long contentLength;
+  private final long rangeSize;
+
+  private volatile ObjectRangeInputStream stream;
+  private volatile long nextPos = 0;
+  private volatile long currPos = 0;
+  // All range streams should have same checksum.
+  private final byte[] checksum;
+
+  public ObjectMultiRangeInputStream(
+      ExecutorService threadPool,
+      ObjectStorage storage,
+      Path path,
+      long contentLength,
+      long rangeSize,
+      byte[] checksum) {
+    this(threadPool, storage, ObjectUtils.pathToKey(path), contentLength, rangeSize, checksum);
+  }
+
+  public ObjectMultiRangeInputStream(
+      ExecutorService threadPool,
+      ObjectStorage storage,
+      String objectKey,
+      long contentLength,
+      long rangeSize,
+      byte[] checksum) {
+    this.threadPool = threadPool;
+    this.storage = storage;
+    this.objectKey = objectKey;
+    this.contentLength = contentLength;
+    this.rangeSize = rangeSize;
+    this.checksum = checksum;
+
+    Preconditions.checkNotNull(checksum, "Checksum should not be null.");
+  }
+
+  @Override
+  public synchronized void seek(long pos) throws IOException {
+    if (pos < 0) {
+      throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + " " + pos);
+    }
+
+    if (contentLength <= 0) {
+      return;
+    }
+
+    nextPos = pos;
+  }
+
+  @Override
+  public synchronized long getPos() {
+    return nextPos;
+  }
+
+  @Override
+  public synchronized boolean seekToNewSource(long targetPos) throws IOException {
+    checkNotClosed();
+    return false;
+  }
+
+  @Override
+  public synchronized int read() throws IOException {
+    byte[] buf = new byte[1];
+    int n = read(buf, 0, buf.length);
+    if (n < 0) {
+      return -1;
+    } else {
+      return buf[0] & 0xFF;
+    }
+  }
+
+  @Override
+  public synchronized int read(byte[] buffer, int offset, int length) throws IOException {
+    checkNotClosed();
+    FSUtils.checkReadParameters(buffer, offset, length);
+    if (length == 0) {
+      return 0;
+    }
+
+    int total = 0;
+    while (total < length) {
+      if (contentLength == 0 || nextPos >= contentLength) {
+        return total == 0 ? -1 : total;
+      }
+
+      seekStream();
+      int n = stream.read(buffer, offset, length - total);
+      if (n < 0) {
+        return total == 0 ? -1 : total;
+      }
+
+      total += n;
+      offset += n;
+      currPos += n;
+      nextPos += n;
+    }
+
+    return total;
+  }
+
+  @Override
+  public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+    checkNotClosed();
+    // Check the arguments, according to the HDFS contract.
+    if (position < 0) {
+      throw new EOFException("position is negative");
+    }
+    FSUtils.checkReadParameters(buffer, offset, length);
+    if (length == 0) {
+      return 0;
+    }
+
+    if (contentLength == 0 || position >= contentLength) {
+      return -1;
+    }
+
+    long remaining = contentLength - position;
+    int limit = (remaining >= length) ? length : (int) remaining;
+
+    try (InputStream in = storage.get(objectKey, position, limit).verifiedStream(checksum)) {
+      return in.read(buffer, offset, limit);
+    }
+  }
+
+  private void seekStream() throws IOException {
+    if (stream != null && stream.include(nextPos)) {
+      // Seek to a random position which is still located in the current range of stream.
+      if (nextPos != currPos) {
+        stream.seek(nextPos);
+        currPos = nextPos;
+      }
+      return;
+    }
+
+    // Seek to a position which is located in another range of new stream.
+    currPos = nextPos;
+    openStream();
+  }
+
+  private void openStream() throws IOException {
+    closeStream(true);
+
+    long off = (nextPos / rangeSize) * rangeSize;
+    Range range = Range.of(off, Math.min(contentLength - off, rangeSize));
+    if (nextPos < range.end()) {
+      stream = new ObjectRangeInputStream(storage, objectKey, range, checksum);
+      stream.seek(nextPos);
+    }
+  }
+
+  private void closeStream(boolean asyncClose) throws IOException {
+    if (stream != null) {
+      if (asyncClose) {
+        final ObjectRangeInputStream streamToClose = stream;
+        threadPool.submit(() -> CommonUtils.runQuietly(streamToClose::close));
+      } else {
+        stream.close();
+      }
+      stream = null;
+    }
+  }
+
+  private void checkNotClosed() throws IOException {
+    if (closed.get()) {
+      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    super.close();
+    if (closed.compareAndSet(false, true)) {
+      closeStream(false);
+    }
+  }
+
+  // for test
+  public long nextExpectPos() {
+    return currPos;
+  }
+
+  @Override
+  public synchronized int available() throws IOException {
+    checkNotClosed();
+    return Ints.saturatedCast(contentLength - nextPos);
+  }
+
+  @VisibleForTesting
+  ObjectRangeInputStream stream() {
+    return stream;
+  }
+}

+ 198 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/ObjectRangeInputStream.java

@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.tosfs.util.FSUtils;
+import org.apache.hadoop.fs.tosfs.util.Range;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.thirdparty.com.google.common.io.ByteStreams;
+import org.apache.hadoop.thirdparty.com.google.common.primitives.Ints;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public class ObjectRangeInputStream extends FSInputStream {
+  private static final Logger LOG = LoggerFactory.getLogger(ObjectRangeInputStream.class);
+  private static final int MAX_SKIP_SIZE = 1024 * 1024;
+
+  private final ObjectStorage storage;
+  private final String objectKey;
+  private final Range range;
+  private final byte[] checksum;
+
+  private InputStream stream;
+  private long nextPos;
+  private long currPos;
+  private boolean closed = false;
+
+  public ObjectRangeInputStream(ObjectStorage storage, Path path, Range range, byte[] checksum) {
+    this(storage, ObjectUtils.pathToKey(path), range, checksum);
+  }
+
+  public ObjectRangeInputStream(
+      ObjectStorage storage, String objectKey, Range range, byte[] checksum) {
+    this.storage = storage;
+    this.objectKey = objectKey;
+    this.range = range;
+    this.checksum = checksum;
+
+    this.stream = null;
+    this.nextPos = range.off();
+    this.currPos = nextPos;
+
+    Preconditions.checkNotNull(checksum, "Checksum should not be null.");
+  }
+
+  @Override
+  public int read() throws IOException {
+    byte[] buf = new byte[1];
+    int n = read(buf, 0, buf.length);
+    if (n < 0) {
+      return -1;
+    } else {
+      return buf[0] & 0xFF;
+    }
+  }
+
+  @Override
+  public int read(byte[] buffer, int offset, int length) throws IOException {
+    checkNotClosed();
+    FSUtils.checkReadParameters(buffer, offset, length);
+
+    if (length == 0) {
+      return 0;
+    }
+
+    if (!range.include(nextPos)) {
+      return -1;
+    }
+
+    seekStream();
+
+    int toRead = Math.min(length, Ints.saturatedCast(range.end() - nextPos));
+    int readLen = stream.read(buffer, offset, toRead);
+    if (readLen > 0) {
+      nextPos += readLen;
+      currPos += readLen;
+    }
+    return readLen;
+  }
+
+  @Override
+  public void close() throws IOException {
+    super.close();
+    closeStream();
+    closed = true;
+  }
+
+  @Override
+  public int read(long position, byte[] buffer, int offset, int length) throws IOException {
+    checkNotClosed();
+
+    FSUtils.checkReadParameters(buffer, offset, length);
+    if (!range.include(position)) {
+      return -1;
+    }
+
+    int toRead = Math.min(length, Ints.saturatedCast(range.end() - position));
+    if (toRead == 0) {
+      return 0;
+    }
+
+    try (InputStream in = openStream(position, toRead)) {
+      return in.read(buffer, offset, toRead);
+    }
+  }
+
+  @Override
+  public void seek(long pos) throws IOException {
+    checkNotClosed();
+    Preconditions.checkArgument(range.include(pos), "Position %s must be in range %s", pos, range);
+    this.nextPos = pos;
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    checkNotClosed();
+    return nextPos;
+  }
+
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    checkNotClosed();
+    return false;
+  }
+
+  private void seekStream() throws IOException {
+    // sequential read
+    if (stream != null && nextPos == currPos) {
+      return;
+    }
+
+    // random read
+    if (stream != null && nextPos > currPos) {
+      long skip = nextPos - currPos;
+      // It is not worth skipping because the skip size is too big, or it can't read any bytes after skip.
+      if (skip < MAX_SKIP_SIZE) {
+        try {
+          ByteStreams.skipFully(stream, skip);
+          currPos = nextPos;
+          return;
+        } catch (IOException ignored) {
+          LOG.warn("Failed to skip {} bytes in stream, will try to reopen the stream", skip);
+        }
+      }
+    }
+
+    currPos = nextPos;
+
+    closeStream();
+    stream = openStream(nextPos, range.end() - nextPos);
+  }
+
+  private InputStream openStream(long offset, long limit) throws IOException {
+    return storage.get(objectKey, offset, limit).verifiedStream(checksum);
+  }
+
+  private void closeStream() throws IOException {
+    if (stream != null) {
+      stream.close();
+    }
+    stream = null;
+  }
+
+  private void checkNotClosed() throws IOException {
+    if (closed) {
+      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+    }
+  }
+
+  public boolean include(long pos) {
+    return range.include(pos);
+  }
+
+  public Range range() {
+    return range;
+  }
+}

+ 2 - 2
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/DelegationClientBuilder.java

@@ -44,8 +44,8 @@ import static org.apache.hadoop.fs.tosfs.object.tos.TOS.TOS_SCHEME;
 public class DelegationClientBuilder {
 
   public static final int DISABLE_TOS_RETRY_VALUE = -1;
-  private static final String TOS_ENDPOINT_KEY = ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME);
-  private static final String TOS_REGION_KEY = ConfKeys.FS_TOS_REGION.key(TOS_SCHEME);
+  private static final String TOS_ENDPOINT_KEY = ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(TOS_SCHEME);
+  private static final String TOS_REGION_KEY = ConfKeys.FS_OBJECT_STORAGE_REGION.key(TOS_SCHEME);
 
   @VisibleForTesting
   static final Map<String, DelegationClient> CACHE = new ConcurrentHashMap<>();

+ 76 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/FSUtils.java

@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import java.io.IOException;
+import java.net.URI;
+
+public class FSUtils {
+  private static final String OVERFLOW_ERROR_HINT = FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER
+      + ": request length = %s, with offset = %s, buffer capacity = %s";
+
+  private FSUtils() {
+  }
+
+  public static void checkReadParameters(byte[] buffer, int offset, int length) {
+    Preconditions.checkArgument(buffer != null, "Null buffer");
+    Preconditions.checkArgument(offset >= 0 && offset <= buffer.length,
+        "offset: %s is out of range [%s, %s]", offset, 0, buffer.length);
+    Preconditions.checkArgument(length >= 0, "length: %s is negative", length);
+    Preconditions.checkArgument(buffer.length >= offset + length,
+        OVERFLOW_ERROR_HINT, length, offset, (buffer.length - offset));
+  }
+
+  public static URI normalizeURI(URI fsUri, Configuration hadoopConfig) {
+    final String scheme = fsUri.getScheme();
+    final String authority = fsUri.getAuthority();
+
+    if (scheme == null && authority == null) {
+      fsUri = FileSystem.getDefaultUri(hadoopConfig);
+    } else if (scheme != null && authority == null) {
+      URI defaultUri = FileSystem.getDefaultUri(hadoopConfig);
+      if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) {
+        fsUri = defaultUri;
+      }
+    }
+    return fsUri;
+  }
+
+  public static String scheme(Configuration conf, URI uri) {
+    if (uri.getScheme() == null || uri.getScheme().isEmpty()) {
+      return FileSystem.getDefaultUri(conf).getScheme();
+    } else {
+      return uri.getScheme();
+    }
+  }
+
+  /**
+   * The difference with {@link FileSystem#get(URI, Configuration)} is this approach won't cache the filesystem.
+   */
+  public static FileSystem createFileSystem(URI uri, Configuration conf) throws IOException {
+    Class<? extends FileSystem> clazz = FileSystem.getFileSystemClass(uri.getScheme(), conf);
+    return ReflectionUtils.newInstance(clazz, conf);
+  }
+}

+ 85 - 0
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/ObjectStorageTestBase.java

@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.tosfs.conf.ConfKeys;
+import org.apache.hadoop.fs.tosfs.util.CommonUtils;
+import org.apache.hadoop.fs.tosfs.util.TestUtility;
+import org.apache.hadoop.fs.tosfs.util.UUIDUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class ObjectStorageTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(ObjectStorageTestBase.class);
+  protected Configuration conf;
+  protected Configuration protonConf;
+  protected Path testDir;
+  protected FileSystem fs;
+  protected String scheme;
+  protected ObjectStorage storage;
+
+  @Rule
+  public TemporaryFolder tempDir = new TemporaryFolder();
+
+  @Before
+  public void setUp() throws IOException {
+    LOG.info("The test temporary folder is {}", tempDir.getRoot().getAbsolutePath());
+
+    String tempDirPath = tempDir.getRoot().getAbsolutePath();
+    conf = new Configuration();
+    conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key("filestore"), tempDirPath);
+    conf.set("fs.filestore.impl", LocalFileSystem.class.getName());
+    protonConf = new Configuration(conf);
+    // Set the environment variable for ObjectTestUtils#assertObject
+    TestUtility.setSystemEnv(FileStore.ENV_FILE_STORAGE_ROOT, tempDirPath);
+
+    testDir = new Path("filestore://" + FileStore.DEFAULT_BUCKET + "/", UUIDUtils.random());
+    fs = testDir.getFileSystem(conf);
+    scheme = testDir.toUri().getScheme();
+    storage = ObjectStorageFactory.create(scheme, testDir.toUri().getAuthority(), protonConf);
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (storage != null) {
+      // List all keys with test dir prefix and delete them.
+      String prefix = ObjectUtils.pathToKey(testDir);
+      CommonUtils.runQuietly(() -> storage.deleteAll(prefix));
+      // List all multipart uploads and abort them.
+      CommonUtils.runQuietly(() -> {
+        for (MultipartUpload upload : storage.listUploads(prefix)) {
+          LOG.info("Abort the multipart upload {}", upload);
+          storage.abortMultipartUpload(upload.key(), upload.uploadId());
+        }
+      });
+
+      storage.close();
+    }
+  }
+}

+ 447 - 0
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/TestObjectMultiRangeInputStream.java

@@ -0,0 +1,447 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.tosfs.common.Bytes;
+import org.apache.hadoop.fs.tosfs.common.ThreadPools;
+import org.apache.hadoop.fs.tosfs.object.exceptions.ChecksumMismatchException;
+import org.apache.hadoop.fs.tosfs.util.Range;
+import org.apache.hadoop.fs.tosfs.util.TestUtility;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+public class TestObjectMultiRangeInputStream extends ObjectStorageTestBase {
+  private static ExecutorService threadPool;
+
+  @BeforeClass
+  public static void beforeClass() {
+    threadPool = ThreadPools.newWorkerPool("TestObjectInputStream-pool");
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    if (!threadPool.isShutdown()) {
+      threadPool.shutdown();
+    }
+  }
+
+  @Test
+  public void testSequentialAndRandomRead() throws IOException {
+    Path outPath = new Path(testDir, "testSequentialAndRandomRead.txt");
+    String key = ObjectUtils.pathToKey(outPath);
+    byte[] rawData = TestUtility.rand(5 << 20);
+    storage.put(key, rawData);
+
+    ObjectContent content = storage.get(key);
+    assertArrayEquals(rawData, IOUtils.toByteArray(content.stream()));
+
+    try (ObjectMultiRangeInputStream in = new ObjectMultiRangeInputStream(threadPool, storage,
+        ObjectUtils.pathToKey(outPath), rawData.length, Long.MAX_VALUE, content.checksum())) {
+      // sequential read
+      assertEquals(0, in.getPos());
+      assertEquals(0, in.nextExpectPos());
+
+      byte[] b = new byte[1024];
+      int readCnt = in.read(b);
+      assertEquals(readCnt, b.length);
+      assertArrayEquals(Arrays.copyOfRange(rawData, 0, 1024), b);
+      assertEquals(1024, in.getPos());
+      assertEquals(1024, in.nextExpectPos());
+
+      readCnt = in.read(b);
+      assertEquals(readCnt, b.length);
+      assertArrayEquals(Arrays.copyOfRange(rawData, 1024, 2048), b);
+      assertEquals(2048, in.getPos());
+      assertEquals(2048, in.nextExpectPos());
+
+      // random read forward
+      in.seek(4 << 20);
+      assertEquals(4 << 20, in.getPos());
+      assertEquals(2048, in.nextExpectPos());
+
+      readCnt = in.read(b);
+      assertEquals(readCnt, b.length);
+      assertArrayEquals(Arrays.copyOfRange(rawData, 4 << 20, 1024 + (4 << 20)), b);
+      assertEquals((4 << 20) + 1024, in.getPos());
+      assertEquals((4 << 20) + 1024, in.nextExpectPos());
+
+      // random read back
+      in.seek(2 << 20);
+      assertEquals(2 << 20, in.getPos());
+      assertEquals((4 << 20) + 1024, in.nextExpectPos());
+
+      readCnt = in.read(b);
+      assertEquals(readCnt, b.length);
+      assertArrayEquals(Arrays.copyOfRange(rawData, 2 << 20, 1024 + (2 << 20)), b);
+      assertEquals((2 << 20) + 1024, in.getPos());
+      assertEquals((2 << 20) + 1024, in.nextExpectPos());
+    }
+  }
+
+  private InputStream getStream(String key) {
+    return storage.get(key).stream();
+  }
+
+  @Test
+  public void testReadSingleByte() throws IOException {
+    int len = 10;
+    Path outPath = new Path(testDir, "testReadSingleByte.txt");
+    byte[] data = TestUtility.rand(len);
+    String key = ObjectUtils.pathToKey(outPath);
+    byte[] checksum = storage.put(key, data);
+
+    try (InputStream in = new ObjectMultiRangeInputStream(threadPool, storage, key,
+        data.length, Long.MAX_VALUE, checksum)) {
+      for (int i = 0; i < data.length; i++) {
+        assertTrue(in.read() >= 0);
+      }
+      assertEquals(-1, in.read());
+    }
+  }
+
+  @Test
+  public void testReadStreamButTheFileChangedDuringReading() throws IOException {
+    int len = 2048;
+    Path outPath = new Path(testDir, "testReadStreamButTheFileChangedDuringReading.txt");
+    byte[] data = TestUtility.rand(len);
+    String key = ObjectUtils.pathToKey(outPath);
+    byte[] checksum = storage.put(key, data);
+
+    try (InputStream in = new ObjectMultiRangeInputStream(threadPool, storage, key, data.length, 1024, checksum)) {
+      byte[] read = new byte[1024];
+      int n = in.read(read);
+      Assert.assertEquals(1024, n);
+
+      storage.put(key, TestUtility.rand(1024));
+      assertThrows("The file is staled", ChecksumMismatchException.class, () -> in.read(read));
+    }
+  }
+
+  @Test
+  public void testRead100M() throws IOException {
+    testSequentialReadData(100 << 20, 6 << 20);
+    testSequentialReadData(100 << 20, 5 << 20);
+  }
+
+  @Test
+  public void testRead10M() throws IOException {
+    testSequentialReadData(10 << 20, 4 << 20);
+    testSequentialReadData(10 << 20, 5 << 20);
+  }
+
+  @Test
+  public void testParallelRead10M() throws IOException, ExecutionException, InterruptedException {
+    testParallelRandomRead(10 << 20, 4 << 20);
+    testParallelRandomRead(10 << 20, 5 << 20);
+  }
+
+  @Test
+  public void testRead100b() throws IOException {
+    testSequentialReadData(100, 40);
+    testSequentialReadData(100, 50);
+    testSequentialReadData(100, 100);
+    testSequentialReadData(100, 101);
+  }
+
+  private void testSequentialReadData(int dataSize, int partSize) throws IOException {
+    Path outPath = new Path(testDir, String.format("%d-%d.txt", dataSize, partSize));
+    String key = ObjectUtils.pathToKey(outPath);
+    byte[] rawData = TestUtility.rand(dataSize);
+    storage.put(key, rawData);
+
+    ObjectContent content = storage.get(key);
+    assertArrayEquals(rawData, IOUtils.toByteArray(content.stream()));
+
+    int batchSize = (dataSize - 1) / partSize + 1;
+    try (InputStream in = new ObjectMultiRangeInputStream(threadPool, storage, ObjectUtils.pathToKey(outPath),
+        rawData.length, Long.MAX_VALUE, content.checksum())) {
+      for (int i = 0; i < batchSize; i++) {
+        int start = i * partSize;
+        int end = Math.min(dataSize, start + partSize);
+        byte[] expectArr = Arrays.copyOfRange(rawData, start, end);
+
+        byte[] b = new byte[end - start];
+        int ret = in.read(b, 0, b.length);
+
+        assertEquals(b.length, ret);
+        assertArrayEquals(String.format("the read bytes mismatched at batch: %d", i), expectArr, b);
+      }
+      assertEquals(-1, in.read());
+    }
+  }
+
+  private void testParallelRandomRead(int dataSize, int partSize)
+      throws IOException, ExecutionException, InterruptedException {
+
+    Path outPath = new Path(testDir, String.format("%d-%d.txt", dataSize, partSize));
+    String key = ObjectUtils.pathToKey(outPath);
+    byte[] rawData = TestUtility.rand(dataSize);
+    storage.put(key, rawData);
+
+    ObjectContent content = storage.get(key);
+    assertArrayEquals(rawData, IOUtils.toByteArray(content.stream()));
+
+    Random random = new Random();
+    List<Future<Boolean>> tasks = new ArrayList<>();
+    for (int i = 0; i < 10; i++) {
+      int position = random.nextInt(rawData.length);
+      tasks.add(threadPool.submit(() ->
+          testReadDataFromSpecificPosition(rawData, outPath, position, partSize, content.checksum())));
+    }
+
+    for (Future<Boolean> task : tasks) {
+      assertTrue(task.get());
+    }
+  }
+
+  private boolean testReadDataFromSpecificPosition(
+      final byte[] rawData,
+      final Path objPath,
+      final int startPosition,
+      final int partSize,
+      byte[] checksum) {
+    int rawDataSize = rawData.length;
+    int batchSize = (rawDataSize - startPosition - 1) / partSize + 1;
+    try (ObjectMultiRangeInputStream in = new ObjectMultiRangeInputStream(threadPool, storage,
+        ObjectUtils.pathToKey(objPath), rawDataSize, Long.MAX_VALUE, checksum)) {
+      in.seek(startPosition);
+
+      for (int i = 0; i < batchSize; i++) {
+        int start = startPosition + i * partSize;
+        int end = Math.min(rawDataSize, start + partSize);
+        byte[] expectArr = Arrays.copyOfRange(rawData, start, end);
+
+        byte[] b = new byte[end - start];
+        int ret = in.read(b, 0, b.length);
+
+        assertEquals(b.length, ret);
+        assertArrayEquals(String.format("the read bytes mismatched at batch: %d", i), expectArr, b);
+      }
+      assertEquals(-1, in.read());
+      return true;
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
+  @Test
+  public void testParallelReadFromOneInputStream() throws IOException, ExecutionException, InterruptedException {
+    testParallelReadFromOneInputStreamImpl(10 << 20, 512, 10);
+    testParallelReadFromOneInputStreamImpl(10 << 20, 64, 100);
+    testParallelReadFromOneInputStreamImpl(1 << 20, 2 << 20, 5);
+  }
+
+  public void testParallelReadFromOneInputStreamImpl(int dataSize, int batchSize, int parallel)
+      throws IOException, ExecutionException, InterruptedException {
+
+    Path outPath = new Path(testDir,
+        String.format("%d-%d-testParallelReadFromOneInputStreamImpl.txt", dataSize, batchSize));
+    String key = ObjectUtils.pathToKey(outPath);
+    byte[] rawData = TestUtility.rand(dataSize);
+    storage.put(key, rawData);
+    ObjectContent content = storage.get(key);
+    assertArrayEquals(rawData, IOUtils.toByteArray(content.stream()));
+
+    AtomicInteger sum = new AtomicInteger(0);
+    CopyOnWriteArrayList<byte[]> readBytes = new CopyOnWriteArrayList();
+    List<Future<?>> futures = new ArrayList<>();
+    try (ObjectMultiRangeInputStream inputStream = new ObjectMultiRangeInputStream(threadPool,
+        storage, ObjectUtils.pathToKey(outPath), rawData.length, Long.MAX_VALUE, content.checksum())) {
+      for (int i = 0; i < parallel; i++) {
+        futures.add(threadPool.submit(() -> {
+          byte[] data = new byte[batchSize];
+          try {
+            int count;
+            while ((count = inputStream.read(data)) != -1) {
+              sum.getAndAdd(count);
+              readBytes.add(Arrays.copyOfRange(data, 0, count));
+              data = new byte[batchSize];
+            }
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }));
+      }
+
+      for (Future<?> future : futures) {
+        future.get();
+
+      }
+      assertEquals(rawData.length, sum.get());
+    }
+
+    byte[] actualBytes = new byte[rawData.length];
+    int offset = 0;
+    for (byte[] bytes : readBytes) {
+      System.arraycopy(bytes, 0, actualBytes, offset, bytes.length);
+      offset += bytes.length;
+    }
+
+    Arrays.sort(actualBytes);
+    Arrays.sort(rawData);
+    assertArrayEquals(rawData, actualBytes);
+  }
+
+  @Test
+  public void testPositionalRead() throws IOException {
+    Path outPath = new Path(testDir, "testPositionalRead.txt");
+    String key = ObjectUtils.pathToKey(outPath);
+    int fileSize = 5 << 20;
+    byte[] rawData = TestUtility.rand(fileSize);
+    storage.put(key, rawData);
+    ObjectContent content = storage.get(key);
+    assertArrayEquals(rawData, IOUtils.toByteArray(content.stream()));
+
+    Random rand = ThreadLocalRandom.current();
+
+    try (ObjectMultiRangeInputStream in = new ObjectMultiRangeInputStream(threadPool, storage,
+        ObjectUtils.pathToKey(outPath), fileSize, Long.MAX_VALUE, content.checksum())) {
+      for (int i = 0; i < 100; i++) {
+        int pos = rand.nextInt(fileSize);
+        int len = rand.nextInt(fileSize);
+
+        int expectSize = Math.min(fileSize - pos, len);
+        byte[] actual = new byte[expectSize];
+        int actualLen = in.read(pos, actual, 0, expectSize);
+
+        assertEquals(expectSize, actualLen);
+        assertArrayEquals(Bytes.toBytes(rawData, pos, expectSize), actual);
+      }
+    }
+  }
+
+  @Test
+  public void testReadAcrossRange() throws IOException {
+    Path outPath = new Path(testDir, "testReadAcrossRange.txt");
+    String key = ObjectUtils.pathToKey(outPath);
+    int fileSize = 1 << 10;
+    byte[] rawData = TestUtility.rand(fileSize);
+    storage.put(key, rawData);
+    ObjectContent content = storage.get(key);
+    assertArrayEquals(rawData, IOUtils.toByteArray(content.stream()));
+
+    try (ObjectMultiRangeInputStream in = new ObjectMultiRangeInputStream(ThreadPools.defaultWorkerPool(),
+        storage, key, fileSize, 10, content.checksum())) {
+      byte[] data = new byte[fileSize / 2];
+      for (int i = 0; i < 2; i++) {
+        assertEquals(data.length, in.read(data));
+        assertEquals((i + 1) * data.length, in.getPos());
+        assertArrayEquals(Bytes.toBytes(rawData, i * data.length, data.length), data);
+      }
+    }
+  }
+
+  @Test
+  public void testStorageRange() throws IOException {
+    Path outPath = new Path(testDir, "testStorageRange.txt");
+    String key = ObjectUtils.pathToKey(outPath);
+    int fileSize = 5 << 20;
+    byte[] rawData = TestUtility.rand(fileSize);
+    storage.put(key, rawData);
+    ObjectContent content = storage.get(key);
+    assertArrayEquals(rawData, IOUtils.toByteArray(content.stream()));
+
+    int oneMB = 1 << 20;
+    long rangeOpenLen = oneMB;
+    try (ObjectMultiRangeInputStream in = new ObjectMultiRangeInputStream(ThreadPools.defaultWorkerPool(),
+        storage, key, fileSize, rangeOpenLen, content.checksum())) {
+      assertNull(in.stream());
+
+      // Init range.
+      in.read();
+      assertEquals(Range.of(0, rangeOpenLen), in.stream().range());
+      // Range doesn't change.
+      in.read(new byte[(int) (rangeOpenLen - 1)], 0, (int) (rangeOpenLen - 1));
+      assertEquals(Range.of(0, rangeOpenLen), in.stream().range());
+
+      // Move to next range.
+      in.read();
+      assertEquals(Range.of(rangeOpenLen, rangeOpenLen), in.stream().range());
+
+      // Seek and move.
+      in.seek(rangeOpenLen * 3 + 10);
+      in.read();
+      assertEquals(Range.of(rangeOpenLen * 3, rangeOpenLen), in.stream().range());
+
+      // Seek small and range doesn't change.
+      in.seek(in.getPos() + 1);
+      in.read();
+      assertEquals(Range.of(rangeOpenLen * 3, rangeOpenLen), in.stream().range());
+
+      // Seek big and range changes.
+      in.seek(rangeOpenLen * 2);
+      in.read(new byte[(int) (rangeOpenLen - 10)], 0, (int) (rangeOpenLen - 10));
+      assertEquals(Range.of(rangeOpenLen * 2, rangeOpenLen), in.stream().range());
+      // Old range has 10 bytes left. Seek 10 bytes then read 10 bytes. Old range can't read any bytes, so
+      // range changes.
+      assertEquals(rangeOpenLen * 3 - 10, in.getPos());
+      in.seek(rangeOpenLen * 3);
+      in.read(new byte[10], 0, 10);
+      assertEquals(Range.of(rangeOpenLen * 3, rangeOpenLen), in.stream().range());
+
+      // Read big buffer.
+      in.seek(10);
+      in.read(new byte[oneMB * 3], 0, oneMB * 3);
+      assertEquals(oneMB * 3 + 10, in.getPos());
+      assertEquals(Range.of(3 * rangeOpenLen, rangeOpenLen), in.stream().range());
+    }
+
+    try (ObjectMultiRangeInputStream in = new ObjectMultiRangeInputStream(threadPool, storage,
+        ObjectUtils.pathToKey(outPath), fileSize, Long.MAX_VALUE, content.checksum())) {
+      assertNull(in.stream());
+
+      // Init range.
+      in.read();
+      assertEquals(Range.of(0, fileSize), in.stream().range());
+
+      // Range doesn't change.
+      in.read(new byte[oneMB], 0, oneMB);
+      assertEquals(Range.of(0, fileSize), in.stream().range());
+
+      // Seek and move.
+      long pos = oneMB * 3 + 10;
+      in.seek(pos);
+      in.read();
+      assertEquals(Range.of(0, fileSize), in.stream().range());
+    }
+  }
+}

+ 142 - 0
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/TestObjectRangeInputStream.java

@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.tosfs.util.Range;
+import org.apache.hadoop.fs.tosfs.util.TestUtility;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+public class TestObjectRangeInputStream extends ObjectStorageTestBase {
+
+  @Test
+  public void testRead() throws IOException {
+    Path outPath = new Path(testDir, "testRead.txt");
+    String key = ObjectUtils.pathToKey(outPath);
+    byte[] rawData = TestUtility.rand(1 << 10);
+    storage.put(key, rawData);
+    ObjectContent content = storage.get(key);
+    assertArrayEquals(rawData, IOUtils.toByteArray(content.stream()));
+
+    int position = 100;
+    int len = 200;
+    try (ObjectRangeInputStream ri =
+             new ObjectRangeInputStream(storage, key, Range.of(position, len), content.checksum())) {
+      // Test read byte.
+      assertEquals(rawData[position] & 0xff, ri.read());
+
+      // Test read buffer.
+      byte[] buffer = new byte[len];
+      assertEquals(buffer.length - 1, ri.read(buffer, 0, buffer.length));
+      assertArrayEquals(
+          Arrays.copyOfRange(rawData, position + 1, position + len),
+          Arrays.copyOfRange(buffer, 0, buffer.length - 1));
+      assertEquals(0, ri.available());
+
+      assertEquals(-1, ri.read());
+      assertEquals(-1, ri.read(buffer, 0, buffer.length));
+    }
+  }
+
+  @Test
+  public void testRangeExceedInnerStream() throws IOException {
+    Path outPath = new Path(testDir, "testRangeExceedInnerStream.txt");
+    String key = ObjectUtils.pathToKey(outPath);
+    byte[] rawData = TestUtility.rand(10);
+    storage.put(key, rawData);
+    ObjectContent content = storage.get(key);
+    assertArrayEquals(rawData, IOUtils.toByteArray(content.stream()));
+
+    int position = 10;
+    int badLen = 10;
+    try (ObjectRangeInputStream ri =
+             new ObjectRangeInputStream(storage, key, Range.of(position, badLen), content.checksum())) {
+      byte[] buffer = new byte[1];
+      assertEquals(-1, ri.read());
+      assertEquals(-1, ri.read(buffer, 0, buffer.length));
+    }
+  }
+
+  @Test
+  public void testRangeInclude() throws IOException {
+    Path outPath = new Path(testDir, "testRangeInclude.txt");
+    String key = ObjectUtils.pathToKey(outPath);
+    byte[] rawData = TestUtility.rand(10);
+    storage.put(key, rawData);
+    ObjectContent content = storage.get(key);
+    assertArrayEquals(rawData, IOUtils.toByteArray(content.stream()));
+
+    long pos = 100;
+    long len = 300;
+
+    try (ObjectRangeInputStream in = new ObjectRangeInputStream(storage, key, Range.of(pos, len), content.checksum())) {
+      assertEquals(Range.of(pos, len), in.range());
+
+      assertTrue(in.include(pos));
+      assertTrue(in.include((pos + len) / 2));
+      assertTrue(in.include(pos + len - 1));
+
+      assertFalse(in.include(pos - 1));
+      assertFalse(in.include(pos + len));
+    }
+  }
+
+  @Test
+  public void testSeek() throws IOException {
+    Path outPath = new Path(testDir, "testSeek.txt");
+    String key = ObjectUtils.pathToKey(outPath);
+    byte[] rawData = TestUtility.rand(1 << 10);
+    storage.put(key, rawData);
+    ObjectContent content = storage.get(key);
+    assertArrayEquals(rawData, IOUtils.toByteArray(content.stream()));
+
+    long pos = 100;
+    long len = 300;
+
+    try (ObjectRangeInputStream in = new ObjectRangeInputStream(storage, key, Range.of(pos, len), content.checksum())) {
+      assertEquals(pos, in.getPos());
+
+      Exception error = assertThrows("Overflow", IllegalArgumentException.class, () -> in.seek(-1));
+      assertTrue(error.getMessage().contains("must be in range Range{offset=100, length=300}"));
+      error = assertThrows("Overflow", IllegalArgumentException.class, () -> in.seek(99));
+      assertTrue(error.getMessage().contains("must be in range Range{offset=100, length=300}"));
+      error = assertThrows("Overflow", IllegalArgumentException.class, () -> in.seek(401));
+      assertTrue(error.getMessage().contains("must be in range Range{offset=100, length=300}"));
+      error = assertThrows("Overflow", IllegalArgumentException.class, () -> in.seek(1 << 20));
+      assertTrue(error.getMessage().contains("must be in range Range{offset=100, length=300}"));
+
+      in.seek(399);
+      assertTrue(0 <= in.read());
+      assertEquals(-1, in.read());
+
+      in.seek(100);
+      assertTrue(in.read() >= 0);
+    }
+  }
+}

+ 8 - 8
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestDelegationClientBuilder.java

@@ -106,7 +106,7 @@ public class TestDelegationClientBuilder {
   @Test
   public void testHeadApiRetry() throws IOException {
     Configuration conf = new Configuration();
-    conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com");
+    conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com");
     conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME);
     conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, false);
     conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key("test"), "ACCESS_KEY");
@@ -163,7 +163,7 @@ public class TestDelegationClientBuilder {
   public void testEnableCrcCheck() throws IOException {
     String bucket = name.getMethodName();
     Configuration conf = new Configuration();
-    conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com");
+    conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com");
     conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME);
     conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, true);
     conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key("test"), "ACCESS_KEY");
@@ -183,7 +183,7 @@ public class TestDelegationClientBuilder {
   public void testClientCache() throws IOException {
     String bucket = name.getMethodName();
     Configuration conf = new Configuration();
-    conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com");
+    conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com");
     conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME);
     conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, false);
     conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key(bucket), "ACCESS_KEY");
@@ -217,7 +217,7 @@ public class TestDelegationClientBuilder {
   @Test
   public void testOverwriteHttpConfig() throws IOException {
     Configuration conf = new Configuration();
-    conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com");
+    conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com");
     conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME);
     conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key("test"), "ACCESS_KEY");
     conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key("test"), "SECRET_KEY");
@@ -243,7 +243,7 @@ public class TestDelegationClientBuilder {
   @Test
   public void testDynamicRefreshAkSk() throws IOException {
     Configuration conf = new Configuration();
-    conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com");
+    conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com");
     conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME);
     conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key(TestUtility.bucket()), ENV_ACCESS_KEY);
     conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key(TestUtility.bucket()), ENV_SECRET_KEY);
@@ -281,7 +281,7 @@ public class TestDelegationClientBuilder {
   @Test
   public void testCreateClientWithEnvironmentCredentials() throws IOException {
     Configuration conf = new Configuration();
-    conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com");
+    conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(TOS_SCHEME), "https://test.tos-cn-beijing.ivolces.com");
     conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, EnvironmentCredentialsProvider.NAME);
 
     DelegationClient tosV2 =
@@ -300,7 +300,7 @@ public class TestDelegationClientBuilder {
   @Test
   public void testCreateClientWithSimpleCredentials() throws IOException {
     Configuration conf = new Configuration();
-    conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), ENV_ENDPOINT);
+    conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(TOS_SCHEME), ENV_ENDPOINT);
     conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME);
     conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key(TestUtility.bucket()), ENV_ACCESS_KEY);
     conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key(TestUtility.bucket()), ENV_SECRET_KEY);
@@ -331,7 +331,7 @@ public class TestDelegationClientBuilder {
 
     Function<String, Configuration> commonConf = bucket -> {
       Configuration conf = new Configuration();
-      conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), ENV_ENDPOINT);
+      conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(TOS_SCHEME), ENV_ENDPOINT);
       conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME);
       conf.set(TosKeys.FS_TOS_BUCKET_ACCESS_KEY_ID.key(bucket), ENV_ACCESS_KEY);
       conf.set(TosKeys.FS_TOS_BUCKET_SECRET_ACCESS_KEY.key(bucket), ENV_SECRET_KEY);

+ 1 - 1
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/tos/TestTOSRetryPolicy.java

@@ -76,7 +76,7 @@ public class TestTOSRetryPolicy {
 
   private DelegationClient createRetryableDelegationClient() {
     Configuration conf = new Configuration();
-    conf.set(ConfKeys.FS_TOS_ENDPOINT.key(TOS_SCHEME), "https://tos-cn-beijing.ivolces.com");
+    conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(TOS_SCHEME), "https://tos-cn-beijing.ivolces.com");
     conf.set(TosKeys.FS_TOS_CREDENTIALS_PROVIDER, SimpleCredentialsProvider.NAME);
     conf.setBoolean(TosKeys.FS_TOS_DISABLE_CLIENT_CACHE, true);
     conf.set(TosKeys.FS_TOS_ACCESS_KEY_ID, "ACCESS_KEY");

+ 3 - 3
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/util/TestUtility.java

@@ -145,7 +145,7 @@ public class TestUtility {
     Configuration conf = new Configuration();
     String endpoint = ParseUtils.envAsString(TOS.ENV_TOS_ENDPOINT, "");
     if (!StringUtils.isEmpty(endpoint)) {
-      conf.set(ConfKeys.FS_TOS_ENDPOINT.key(scheme()), endpoint);
+      conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(scheme()), endpoint);
     }
 
     return ObjectStorageFactory.createWithPrefix(
@@ -159,7 +159,7 @@ public class TestUtility {
     } else {
       String endpoint = ParseUtils.envAsString(ENV_DIRECTORY_BUCKET_ENDPOINT, "");
       if (!StringUtils.isEmpty(endpoint)) {
-        conf.set(ConfKeys.FS_TOS_ENDPOINT.key(scheme()), endpoint);
+        conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key(scheme()), endpoint);
       }
       return ObjectStorageFactory.createWithPrefix(
           String.format("%s-%s/", scheme(), UUIDUtils.random()), scheme(), bucket, conf);
@@ -171,7 +171,7 @@ public class TestUtility {
 
     // 1. FileStore
     Configuration fileStoreConf = new Configuration();
-    fileStoreConf.set(ConfKeys.FS_TOS_ENDPOINT.key("filestore"), fileStoreRoot);
+    fileStoreConf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key("filestore"), fileStoreRoot);
     storages.add(ObjectStorageFactory.create("filestore", TestUtility.bucket(), fileStoreConf));
 
     // 2. General Bucket