
Integration of TOS: Add ObjectStorage interface.

lijinglun, 9 months ago (parent commit ad155eecb4)

+ 91 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/BucketInfo.java

@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.oss;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.MoreObjects;
+
+import java.util.Objects;
+
+/**
+ * There are two kinds of buckets: general purpose buckets (common buckets) and directory buckets.
+ * Directory buckets organize data hierarchically into directories, as opposed to the flat storage
+ * structure of general purpose buckets.
+ * <p>
+ * Only a few object storages support directory buckets, e.g. S3, OBS, TOS. Generally, a directory
+ * bucket supports renaming or deleting a directory with constant time complexity O(1), but these
+ * object storages differ slightly in these APIs. E.g. S3 doesn't provide a rename API. When
+ * deleting an object in a directory bucket, S3 recursively deletes any empty parent directories
+ * in the object path, but TOS doesn't delete any empty parent directories.
+ * <p>
+ * There are also some general differences between general purpose buckets and directory buckets.
+ * <ul>
+ *   <li>A directory bucket treats an object whose key ends with '/' as a directory when creating
+ *   the object.</li>
+ *   <li>A TOS directory bucket automatically creates missing parent directories when creating an
+ *   object, whereas a general purpose bucket creates only the one object.</li>
+ *   <li>A directory bucket doesn't allow creating any object under a file, while a general
+ *   purpose bucket doesn't have this constraint.</li>
+ *   <li>If an object 'a/b/' exists in a directory bucket, both head('a/b') and head('a/b/') will
+ *   get the object meta, but only head('a/b/') can get the object meta from a general purpose
+ *   bucket.</li>
+ *   <li>A TOS directory bucket provides atomic rename/delete directory abilities.</li>
+ * </ul>
+ */
+public class BucketInfo {
+  private final String name;
+  private final boolean directory;
+
+  public BucketInfo(String name, boolean directory) {
+    this.name = name;
+    this.directory = directory;
+  }
+
+  public String name() {
+    return name;
+  }
+
+  public boolean isDirectory() {
+    return directory;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    } else if (!(o instanceof BucketInfo)) {
+      return false;
+    }
+
+    BucketInfo that = (BucketInfo) o;
+    return Objects.equals(name, that.name)
+        && directory == that.directory;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(name, directory);
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("name", name)
+        .add("directory", directory)
+        .toString();
+  }
+}
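
For context, a hedged usage sketch of branching on the bucket type; storage stands for a hypothetical ObjectStorage instance and the keys are illustrative:

    BucketInfo bucket = storage.bucket();
    if (bucket.isDirectory()) {
      // Directory buckets (e.g. on TOS) can rename a whole dir atomically in O(1).
      storage.rename("data/old/", "data/new/");
    } else {
      // General purpose buckets are flat: a "directory" is just a key ending with '/',
      // so a dir rename must be emulated by copying and deleting each child object.
    }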

+ 38 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/ChecksumInfo.java

@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.oss;
+
+public class ChecksumInfo {
+  private final String algorithm;
+  private final ChecksumType checksumType;
+
+  public ChecksumInfo(String algorithm, ChecksumType checksumType) {
+    this.algorithm = algorithm;
+    this.checksumType = checksumType;
+  }
+
+  public String algorithm() {
+    return algorithm;
+  }
+
+  public ChecksumType checksumType() {
+    return checksumType;
+  }
+}
+

+ 61 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/ChecksumType.java

@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.oss;
+
+public enum ChecksumType {
+  NULL((byte) 0, 0),
+  CRC32((byte) 1, 4),
+  CRC32C((byte) 2, 4),
+  CRC64ECMA((byte) 3, 8),
+  // An MD5 digest is 128 bits, i.e. 16 bytes.
+  MD5((byte) 4, 16);
+
+  private final byte value;
+  private final int bytes;
+
+  ChecksumType(byte value, int bytes) {
+    this.value = value;
+    this.bytes = bytes;
+  }
+
+  public byte value() {
+    return value;
+  }
+
+  public int bytes() {
+    return bytes;
+  }
+
+  public static ChecksumType valueOf(byte value) {
+    for (ChecksumType t : values()) {
+      if (t.value == value) {
+        return t;
+      }
+    }
+    throw new IllegalStateException("Unknown checksum type with value: " + value);
+  }
+
+  public static int maxBytes() {
+    int maxBytes = 0;
+    for (ChecksumType t : values()) {
+      maxBytes = Math.max(maxBytes, t.bytes());
+    }
+    return maxBytes;
+  }
+}
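
A small sketch of round-tripping the persisted type byte; the values come straight from the enum above:

    byte persisted = ChecksumType.CRC64ECMA.value();      // (byte) 3
    ChecksumType type = ChecksumType.valueOf(persisted);  // CRC64ECMA
    assert type.bytes() == 8;
    // maxBytes() can size a reusable buffer that fits any checksum type.
    byte[] buffer = new byte[ChecksumType.maxBytes()];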

+ 30 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/Constants.java

@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.oss;
+
+public class Constants {
+  private Constants() {
+  }
+
+  // The magic checksum means checksum is unsupported: if the file is a directory, or the
+  // filesystem/object storage doesn't implement the checksum algorithm, the magic checksum is
+  // used as the file checksum.
+  public static final byte[] MAGIC_CHECKSUM = new byte[] { 'M' };
+
+  public static final String SLASH = "/";
+}
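
The sentinel is meant to be compared by content, for example to decide whether an ObjectInfo carries a real digest; info below is a hypothetical ObjectInfo:

    // True only when the storage produced a real digest for this object.
    boolean hasRealChecksum =
        !java.util.Arrays.equals(info.checksum(), Constants.MAGIC_CHECKSUM);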

+ 35 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/InputStreamProvider.java

@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.oss;
+
+import java.io.InputStream;
+
+/**
+ * Provides the content stream of a request.
+ * <p>
+ * Each call to the {@link #newStream()} method must return a stream
+ * whose position is at the beginning of the content.
+ * Implementations may return a new stream or the same stream for each call.
+ * If returning a new stream, the implementation must {@code close()} the
+ * previous stream and free any resources it acquired.
+ */
+public interface InputStreamProvider {
+  InputStream newStream();
+}
+
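
A minimal conforming implementation, assuming the part data lives in a local file (the path is hypothetical). Each call opens a fresh stream positioned at byte 0, and the storage client closes whatever it consumes:

    java.nio.file.Path path = java.nio.file.Paths.get("/tmp/part-0001.bin");
    InputStreamProvider provider = () -> {
      try {
        // A fresh stream per call guarantees the position starts at the beginning.
        return java.nio.file.Files.newInputStream(path);
      } catch (java.io.IOException e) {
        // newStream() declares no checked exception, so wrap it.
        throw new java.io.UncheckedIOException(e);
      }
    };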

+ 103 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/MultipartUpload.java

@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.oss;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.MoreObjects;
+
+import java.util.Objects;
+
+public class MultipartUpload implements Comparable<MultipartUpload> {
+  private final String key;
+  private final String uploadId;
+  private final int minPartSize;
+  private final int maxPartCount;
+
+  public MultipartUpload(String key, String uploadId, int minPartSize, int maxPartCount) {
+    this.key = key;
+    this.uploadId = uploadId;
+    this.minPartSize = minPartSize;
+    this.maxPartCount = maxPartCount;
+  }
+
+  public String key() {
+    return key;
+  }
+
+  public String uploadId() {
+    return uploadId;
+  }
+
+  public int minPartSize() {
+    return minPartSize;
+  }
+
+  public int maxPartCount() {
+    return maxPartCount;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    } else if (!(o instanceof MultipartUpload)) {
+      return false;
+    }
+
+    MultipartUpload that = (MultipartUpload) o;
+    return Objects.equals(key, that.key)
+        && Objects.equals(uploadId, that.uploadId)
+        && minPartSize == that.minPartSize
+        && maxPartCount == that.maxPartCount;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(key, uploadId, minPartSize, maxPartCount);
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("key", key)
+        .add("uploadId", uploadId)
+        .add("minPartSize", minPartSize)
+        .add("maxPartCount", maxPartCount)
+        .toString();
+  }
+
+  @Override
+  public int compareTo(MultipartUpload o) {
+    if (this == o) {
+      return 0;
+    } else if (o == null) {
+      return 1;
+    }
+    int ret = key.compareTo(o.key);
+    return ret != 0 ? ret : uploadId.compareTo(o.uploadId);
+  }
+}
+

+ 53 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/ObjectContent.java

@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.oss;
+
+import org.apache.hadoop.fs.tosfs.common.ChecksumMismatchException;
+import org.apache.hadoop.fs.tosfs.common.CommonUtils;
+
+import java.io.InputStream;
+import java.util.Arrays;
+
+public class ObjectContent {
+  private final byte[] checksum;
+  private final InputStream stream;
+
+  public ObjectContent(byte[] checksum, InputStream stream) {
+    this.checksum = checksum;
+    this.stream = stream;
+  }
+
+  public InputStream stream() {
+    return stream;
+  }
+
+  public InputStream verifiedStream(byte[] expectedChecksum) throws ChecksumMismatchException {
+    if (!Arrays.equals(expectedChecksum, checksum)) {
+      CommonUtils.runQuietly(stream::close);
+      throw new ChecksumMismatchException(expectedChecksum, checksum);
+    }
+
+    return stream;
+  }
+
+  public byte[] checksum() {
+    return checksum;
+  }
+}
+
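
A sketch of the verify-then-read flow, assuming the key exists. The helper is hypothetical and declares exactly the exception that verifiedStream declares; on a mismatch, verifiedStream closes the underlying stream before throwing:

    static InputStream openVerified(ObjectStorage storage, String key)
        throws ChecksumMismatchException {
      // Compare the content checksum against the metadata checksum from head().
      byte[] expected = storage.head(key).checksum();
      return storage.get(key).verifiedStream(expected);
    }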

+ 118 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/ObjectInfo.java

@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.oss;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.MoreObjects;
+import org.apache.hadoop.util.StringUtils;
+
+import java.util.Arrays;
+import java.util.Date;
+import java.util.Objects;
+
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+
+public class ObjectInfo {
+  private final String key;
+  private final long size;
+  private final Date mtime;
+  private final boolean isDir;
+  private final byte[] checksum;
+
+  public ObjectInfo(String key, long size, Date mtime, byte[] checksum) {
+    this(key, size, mtime, checksum, ObjectInfo.isDir(key));
+  }
+
+  public ObjectInfo(String key, long size, Date mtime, byte[] checksum, boolean isDir) {
+    checkArgument(key != null, "Key is null");
+    checkArgument(size >= 0, "The size of key(%s) is negative", key);
+    checkArgument(mtime != null, "The modified time of key(%s) is null.", key);
+    this.key = key;
+    this.size = size;
+    this.mtime = mtime;
+    this.isDir = isDir;
+    // checksum can be null since some object storage might not support checksum.
+    this.checksum = checksum == null || isDir ? Constants.MAGIC_CHECKSUM : checksum;
+  }
+
+  public String key() {
+    return key;
+  }
+
+  /**
+   * The size of a directory object is 0.
+   *
+   * @return the size of the object.
+   */
+  public long size() {
+    return isDir ? 0 : size;
+  }
+
+  public Date mtime() {
+    return mtime;
+  }
+
+  /**
+   * @return {@link Constants#MAGIC_CHECKSUM} if the object is a dir or the object storage
+   * doesn't support the given checksum type.
+   */
+  public byte[] checksum() {
+    return checksum;
+  }
+
+  public boolean isDir() {
+    return isDir;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    } else if (!(o instanceof ObjectInfo)) {
+      return false;
+    }
+
+    ObjectInfo that = (ObjectInfo) o;
+    return Objects.equals(key, that.key)
+        && size == that.size
+        && Objects.equals(mtime, that.mtime)
+        && Arrays.equals(checksum, that.checksum)
+        && isDir == that.isDir;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(key, size, mtime, Arrays.hashCode(checksum), isDir);
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("key", key)
+        .add("size", size)
+        .add("mtime", mtime)
+        .add("checksum", StringUtils.byteToHexString(checksum))
+        .add("isDir", isDir)
+        .toString();
+  }
+
+  public static boolean isDir(String key) {
+    return key.endsWith(Constants.SLASH);
+  }
+}
+
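
A short sketch of the conventions encoded above: keys ending with '/' are directories, directories always report size 0, and a null checksum falls back to the magic sentinel:

    ObjectInfo dir = new ObjectInfo("warehouse/db1/", 0, new java.util.Date(), null);
    assert dir.isDir();       // the key ends with '/'
    assert dir.size() == 0;   // directory objects report size 0
    // A null checksum (or any directory) maps to Constants.MAGIC_CHECKSUM.
    assert java.util.Arrays.equals(dir.checksum(), Constants.MAGIC_CHECKSUM);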

+ 368 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/ObjectStorage.java

@@ -0,0 +1,368 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.oss;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.tosfs.common.InvalidObjectKeyException;
+import org.apache.hadoop.fs.tosfs.common.LazyReload;
+import org.apache.hadoop.fs.tosfs.common.NotAppendableException;
+import org.apache.hadoop.fs.tosfs.oss.request.ListObjectsRequest;
+import org.apache.hadoop.fs.tosfs.oss.response.ListObjectsResponse;
+
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public interface ObjectStorage extends Closeable {
+  String EMPTY_DELIMITER = "";
+
+  /**
+   * @return Scheme of the object storage.
+   */
+  String scheme();
+
+  /**
+   * @return the {@link BucketInfo}, or null if the bucket doesn't exist.
+   */
+  BucketInfo bucket();
+
+  /**
+   * Initialize the object storage according to the configuration.
+   *
+   * @param conf   to initialize the {@link ObjectStorage}
+   * @param bucket the corresponding bucket name; each object store has one bucket.
+   */
+  void initialize(Configuration conf, String bucket);
+
+  /**
+   * @return storage conf
+   */
+  Configuration conf();
+
+  default ObjectContent get(String key) {
+    return get(key, 0, -1);
+  }
+
+  /**
+   * Get the data of the object specified by key.
+   * Throws {@link RuntimeException} if the object key doesn't exist.
+   * Throws {@link RuntimeException} if the object key is null or empty.
+   *
+   * @param key    for the specified object.
+   * @param offset the position in the object to start reading from.
+   * @param limit  the maximum number of bytes to read; -1 means read to the end.
+   * @return {@link ObjectContent} wrapping the stream of the object content.
+   */
+  ObjectContent get(String key, long offset, long limit);
+
+  default byte[] put(String key, byte[] data) {
+    return put(key, data, 0, data.length);
+  }
+
+  default byte[] put(String key, byte[] data, int off, int len) {
+    return put(key, () -> new ByteArrayInputStream(data, off, len), len);
+  }
+
+  /**
+   * Put data read from a stream into an object specified by key. The implementation must close
+   * the stream created by the stream provider after finishing the stream operation.
+   * Throws {@link RuntimeException} if the object key is null or empty.
+   *
+   * @param key            for the object.
+   * @param streamProvider the binary input stream provider that creates input streams to write.
+   * @param contentLength  the content length. If the actual data is bigger than the content
+   *                       length, the object can be created but its data will be truncated to
+   *                       the given content length; if the actual data is smaller than the
+   *                       content length, object creation fails with an unexpected-end-of-stream
+   *                       IOException.
+   * @return the checksum of the uploaded object
+   */
+  byte[] put(String key, InputStreamProvider streamProvider, long contentLength);
+
+  default byte[] append(String key, byte[] data) {
+    return append(key, data, 0, data.length);
+  }
+
+  default byte[] append(String key, byte[] data, int off, int len) {
+    return append(key, () -> new ByteArrayInputStream(data, off, len), len);
+  }
+
+  /**
+   * Append data read from a stream to an object specified by key. If the object exists, the data
+   * will be appended to its tail; otherwise the object will be created and the data written to
+   * it. The content length may be zero if the object exists. If the object doesn't exist and the
+   * content length is zero, a {@link NotAppendableException} will be thrown.
+   * <p>
+   * The first one wins if there are concurrent appends.
+   * <p>
+   * The implementation must close the stream created by the stream provider after finishing the
+   * stream operation.
+   * Throws {@link RuntimeException} if the object key is null or empty.
+   *
+   * @param key            for the object.
+   * @param streamProvider the binary input stream provider that creates input streams to write.
+   * @param contentLength  the appended content length. If the actual appended data is bigger
+   *                       than the content length, the object can be appended but the appended
+   *                       data will be truncated to the given content length; if the actual data
+   *                       is smaller than the content length, the append fails with an
+   *                       unexpected-end-of-stream IOException.
+   * @return the checksum of the appended object.
+   * @throws NotAppendableException if the object already exists and is not appendable, or the
+   *                                object doesn't exist and the content length is zero.
+   */
+  byte[] append(String key, InputStreamProvider streamProvider, long contentLength);
+
+  /**
+   * Delete an object.
+   * No exception is thrown if the object key doesn't exist.
+   * Throws {@link RuntimeException} if the object key is null or empty.
+   *
+   * @param key the given object key to be deleted.
+   */
+  void delete(String key);
+
+  /**
+   * Delete multiple keys. If a key doesn't exist, it is treated as successfully deleted and
+   * won't be included in the returned list.
+   *
+   * @param keys the given object keys to be deleted
+   * @return the keys that failed to delete
+   */
+  List<String> batchDelete(List<String> keys);
+
+  /**
+   * Delete all objects with the given prefix (including the prefix itself if the corresponding
+   * object exists).
+   *
+   * @param prefix the prefix key.
+   */
+  void deleteAll(String prefix);
+
+  /**
+   * Head returns some information about the object, or null if not found.
+   * Throws {@link RuntimeException} if the object key is null or empty.
+   * There are some differences between directory buckets and general purpose buckets:
+   * <ul>
+   *   <li>Assuming a file object 'a/b' exists, only head("a/b") will get the meta of object
+   *   'a/b', for both general purpose buckets and directory buckets.</li>
+   *   <li>Assuming a dir object 'a/b/' exists, for a general purpose bucket only head("a/b/")
+   *   will get the meta of object 'a/b/', but for a directory bucket both head("a/b") and
+   *   head("a/b/") will get the meta of object 'a/b/'.</li>
+   * </ul>
+   *
+   * @param key for the specified object.
+   * @return {@link ObjectInfo}, null if the object does not exist.
+   * @throws InvalidObjectKeyException if the object is located under an existing file in a
+   *                                   directory bucket, which is not allowed.
+   */
+  ObjectInfo head(String key);
+
+  /**
+   * List objects according to the given {@link ListObjectsRequest}
+   *
+   * @param request {@link ListObjectsRequest}
+   * @return the iterable of {@link ListObjectsResponse} which contains objects and common prefixes
+   */
+  Iterable<ListObjectsResponse> list(ListObjectsRequest request);
+
+  /**
+   * List a limited number of objects in the given bucket.
+   *
+   * @param prefix     Limits the response to keys that begin with the specified prefix.
+   * @param startAfter The key after which the object storage starts listing; the response
+   *                   excludes this key, and it can be any key in the bucket.
+   * @param limit      The maximum number of response objects; -1 means no limit.
+   * @return the {@link ObjectInfo} list of objects matching the prefix key
+   */
+  default Iterable<ObjectInfo> list(String prefix, String startAfter, int limit) {
+    ListObjectsRequest request = ListObjectsRequest.builder()
+        .prefix(prefix)
+        .startAfter(startAfter)
+        .maxKeys(limit)
+        .delimiter(EMPTY_DELIMITER)
+        .build();
+
+    return new LazyReload<>(() -> {
+      Iterator<ListObjectsResponse> iterator = list(request).iterator();
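+      // Each pull below fills the buffer with one page of objects and returns
+      // true once the listing is exhausted.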
+      return buf -> {
+        if (!iterator.hasNext()) {
+          return true;
+        }
+        buf.addAll(iterator.next().objects());
+
+        return !iterator.hasNext();
+      };
+    });
+  }
+
+  /**
+   * List all objects in the given bucket.
+   *
+   * @param prefix     Limits the response to keys that begin with the specified prefix.
+   * @param startAfter The key after which the object storage starts listing; the response
+   *                   excludes this key, and it can be any key in the bucket.
+   * @return an {@link ObjectInfo} Iterable over the objects matching the prefix key
+   *                            and startAfter
+   */
+  default Iterable<ObjectInfo> listAll(String prefix, String startAfter) {
+    return list(prefix, startAfter, -1);
+  }
+
+  /**
+   * CreateMultipartUpload starts to upload a large object part by part.
+   *
+   * @param key for the specified object.
+   * @return {@link MultipartUpload}.
+   */
+  MultipartUpload createMultipartUpload(String key);
+
+  /**
+   * UploadPart uploads a part of an object. The implementation must close the stream created by
+   * the stream provider after finishing the stream operation.
+   *
+   * @param key            for the specified object.
+   * @param uploadId       for the multipart upload id.
+   * @param partNum        upload part number.
+   * @param streamProvider the stream provider to provide the part stream.
+   * @param contentLength  the content length. If the actual data is bigger than the content
+   *                       length, the part can be created but its data will be truncated to the
+   *                       given content length; if the actual data is smaller than the content
+   *                       length, the part creation fails with an unexpected-end-of-stream
+   *                       IOException.
+   * @return the uploaded {@link Part}.
+   */
+  Part uploadPart(String key, String uploadId, int partNum, InputStreamProvider streamProvider,
+      long contentLength);
+
+  /**
+   * Complete the multipart upload with the given object key and upload id.
+   *
+   * @param key         for the specified object.
+   * @param uploadId    id of the multipart upload.
+   * @param uploadParts parts to upload.
+   * @return the checksum of uploaded object
+   */
+  byte[] completeUpload(String key, String uploadId, List<Part> uploadParts);
+
+  /**
+   * Abort a multipart upload.
+   *
+   * @param key      object key.
+   * @param uploadId multipart upload Id.
+   */
+  void abortMultipartUpload(String key, String uploadId);
+
+  /**
+   * List multipart uploads under a path.
+   *
+   * @param prefix the prefix of the multipart uploads to list.
+   * @return Iterable to iterate over multipart uploads.
+   */
+  Iterable<MultipartUpload> listUploads(String prefix);
+
+  /**
+   * Upload a part by copying a range from a source object, under the given multipart upload id.
+   *
+   * @param srcKey               source object key
+   * @param dstKey               dest object key
+   * @param uploadId             id of the multipart upload copy
+   * @param partNum              part num of the multipart upload copy
+   * @param copySourceRangeStart copy source range start of source object
+   * @param copySourceRangeEnd   copy source range end of source object
+   * @return {@link Part}.
+   */
+  Part uploadPartCopy(
+      String srcKey, String dstKey, String uploadId, int partNum, long copySourceRangeStart,
+      long copySourceRangeEnd);
+
+  /**
+   * Copy binary content from one object to another object.
+   *
+   * @param srcKey source object key
+   * @param dstKey dest object key
+   */
+  void copy(String srcKey, String dstKey);
+
+  /**
+   * Atomically rename the source object to the dest object without any data copying.
+   * Overwrites the dest object if it already exists.
+   *
+   * @param srcKey source object key
+   * @param dstKey dest object key
+   * @throws RuntimeException if the rename fails, e.g. srcKey equals dstKey or the source object
+   *                          doesn't exist.
+   */
+  void rename(String srcKey, String dstKey);
+
+  /**
+   * Attach tags to the specified object. This method overwrites all existing tags with the new
+   * tags, and removes all existing tags if the new tags are empty. The maximum number of tags
+   * is 10.
+   *
+   * @param key     the key of the object.
+   * @param newTags the new tags to put.
+   * @throws RuntimeException if the key doesn't exist.
+   */
+  default void putTags(String key, Map<String, String> newTags) {
+    throw new UnsupportedOperationException(
+        this.getClass().getSimpleName() + " doesn't support putObjectTagging.");
+  }
+
+  /**
+   * Get all attached tags of the object.
+   *
+   * @param key the key of the object.
+   * @return map containing all tags.
+   * @throws RuntimeException if key doesn't exist.
+   */
+  default Map<String, String> getTags(String key) {
+    throw new UnsupportedOperationException(
+        this.getClass().getSimpleName() + " doesn't support getObjectTagging.");
+  }
+
+  /**
+   * Gets the object status for the given key.
+   * It differs from {@link ObjectStorage#head(String)}: it returns the object info if either the
+   * key exists or a prefix with the key's value exists.
+   * <p>
+   * There are three kinds of implementations:
+   * <ul>
+   *   <li>Use the headObject API if the object storage supports directory buckets and the
+   *   requested bucket is a directory bucket; the object storage returns the object directly if
+   *   the file or dir exists, otherwise returns null.</li>
+   *   <li>Use the getFileStatus API if the object storage supports it, e.g. TOS. The object
+   *   storage returns the object directly if the key or prefix exists, otherwise returns null.
+   *   </li>
+   *   <li>If the object storage supports neither of the above, first try headObject(key); if the
+   *   object doesn't exist and the key doesn't end with '/', then try headObject(key + "/"); if
+   *   that doesn't exist either, use the listObjects API to check whether the prefix/key exists
+   *   (a fallback along these lines is sketched after this interface).</li>
+   * </ul>
+   *
+   * @param key the object
+   * @return the object info if the key or prefix exists, otherwise null.
+   * @throws InvalidObjectKeyException if the object is located under an existing file in a
+   *                                   directory bucket, which is not allowed.
+   */
+  ObjectInfo objectStatus(String key);
+
+  /**
+   * Get the object storage checksum information, including checksum algorithm name,
+   * checksum type, etc.
+   *
+   * @return checksum information of this storage.
+   */
+  ChecksumInfo checksumInfo();
+}
+
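
For orientation, a hedged sketch of the multipart lifecycle these methods imply. storage, totalParts and openSlice are hypothetical, and a real caller should respect minPartSize() and maxPartCount() from the returned MultipartUpload:

    MultipartUpload upload = storage.createMultipartUpload("big/object.bin");
    List<Part> parts = new ArrayList<>();
    try {
      for (int partNum = 1; partNum <= totalParts; partNum++) {
        long offset = (long) (partNum - 1) * upload.minPartSize();
        // The provider may be invoked more than once (e.g. on retry), so openSlice
        // must return a fresh stream over the same byte range each call.
        parts.add(storage.uploadPart("big/object.bin", upload.uploadId(), partNum,
            () -> openSlice(offset, upload.minPartSize()), upload.minPartSize()));
      }
      byte[] checksum = storage.completeUpload("big/object.bin", upload.uploadId(), parts);
    } catch (RuntimeException e) {
      // Abort so the storage can release the staged parts.
      storage.abortMultipartUpload("big/object.bin", upload.uploadId());
      throw e;
    }

And a sketch of the third objectStatus fallback referenced in the Javadoc above, for storages with neither directory buckets nor a getFileStatus API; the helper and its synthetic dir entry are hypothetical, and it assumes a null startAfter is accepted:

    static ObjectInfo statusByFallback(ObjectStorage storage, String key) {
      ObjectInfo info = storage.head(key);          // 1. exact key
      if (info == null && !key.endsWith("/")) {
        info = storage.head(key + "/");             // 2. dir-style key
      }
      if (info == null) {
        String dirKey = key.endsWith("/") ? key : key + "/";
        // 3. Any child object means the prefix behaves as a directory.
        Iterator<ObjectInfo> it = storage.list(dirKey, null, 1).iterator();
        if (it.hasNext()) {
          // Synthetic entry: bare prefixes carry no real mtime, so this one is made up.
          info = new ObjectInfo(dirKey, 0, new Date(), null);
        }
      }
      return info;
    }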

+ 79 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/Part.java

@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.oss;
+
+import org.apache.hadoop.thirdparty.com.google.common.base.MoreObjects;
+
+import java.util.Objects;
+
+public class Part {
+  private int num;
+  private long size;
+  private String eTag;
+
+  // No-arg constructor for the JSON serializer; don't use it directly.
+  public Part() {
+  }
+
+  public Part(int num, long size, String eTag) {
+    this.num = num;
+    this.size = size;
+    this.eTag = eTag;
+  }
+
+  public int num() {
+    return num;
+  }
+
+  public long size() {
+    return size;
+  }
+
+  public String eTag() {
+    return eTag;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(num, size, eTag);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    } else if (!(o instanceof Part)) {
+      return false;
+    }
+    Part that = (Part) o;
+    return num == that.num
+        && size == that.size
+        && Objects.equals(eTag, that.eTag);
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("PartNum", num)
+        .add("PartSize", size)
+        .add("ETag", eTag)
+        .toString();
+  }
+}
+

+ 85 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/request/ListObjectsRequest.java

@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.oss.request;
+
+public class ListObjectsRequest {
+  private final String prefix;
+  private final String startAfter;
+  private final int maxKeys;
+  private final String delimiter;
+
+  private ListObjectsRequest(String prefix, String startAfter, int maxKeys, String delimiter) {
+    this.prefix = prefix;
+    this.startAfter = startAfter;
+    this.maxKeys = maxKeys;
+    this.delimiter = delimiter;
+  }
+
+  public String prefix() {
+    return prefix;
+  }
+
+  public String startAfter() {
+    return startAfter;
+  }
+
+  public int maxKeys() {
+    return maxKeys;
+  }
+
+  public String delimiter() {
+    return delimiter;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private String prefix;
+    private String startAfter;
+    // -1 means list all object keys
+    private int maxKeys = -1;
+    private String delimiter;
+
+    public Builder prefix(String prefix) {
+      this.prefix = prefix;
+      return this;
+    }
+
+    public Builder startAfter(String startAfter) {
+      this.startAfter = startAfter;
+      return this;
+    }
+
+    public Builder maxKeys(int maxKeys) {
+      this.maxKeys = maxKeys;
+      return this;
+    }
+
+    public Builder delimiter(String delimiter) {
+      this.delimiter = delimiter;
+      return this;
+    }
+
+    public ListObjectsRequest build() {
+      return new ListObjectsRequest(prefix, startAfter, maxKeys, delimiter);
+    }
+  }
+}
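
A small usage sketch; with a '/' delimiter the storage groups keys under common prefixes, mimicking a single-level directory listing (storage is a hypothetical ObjectStorage):

    ListObjectsRequest request = ListObjectsRequest.builder()
        .prefix("warehouse/db1/")
        .delimiter("/")
        .maxKeys(1000)
        .build();
    for (ListObjectsResponse page : storage.list(request)) {
      page.objects();         // direct child files under the prefix
      page.commonPrefixes();  // direct child "directories"
    }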

+ 43 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/oss/response/ListObjectsResponse.java

@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.oss.response;
+
+import org.apache.hadoop.fs.tosfs.oss.ObjectInfo;
+
+import java.util.List;
+
+public class ListObjectsResponse {
+  private final List<ObjectInfo> objects;
+  private final List<String> commonPrefixes;
+
+  public ListObjectsResponse(
+      List<ObjectInfo> objects,
+      List<String> commonPrefixes) {
+    this.objects = objects;
+    this.commonPrefixes = commonPrefixes;
+  }
+
+  public List<ObjectInfo> objects() {
+    return objects;
+  }
+
+  public List<String> commonPrefixes() {
+    return commonPrefixes;
+  }
+}