
Integration of TOS: Add FsOps.

1. Add FsOps, DefaultFsOps, DirectoryFsOps, RenameOp.
2. Add RawFileStatus.
lijinglun 7 months ago
parent commit 5774f37b1f

+ 29 - 1
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/TosFileSystem.java → hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFileStatus.java

@@ -18,5 +18,33 @@
 
 package org.apache.hadoop.fs.tosfs;
 
-public class TosFileSystem {
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.tosfs.object.Constants;
+
+public class RawFileStatus extends FileStatus {
+  private final byte[] checksum;
+
+  /**
+   * File status of a directory.
+   *
+   * @param path  directory path
+   * @param owner directory owner
+   */
+  public RawFileStatus(Path path, String owner) {
+    this(0, true, 1, System.currentTimeMillis(), path, owner, Constants.MAGIC_CHECKSUM);
+  }
+
+  public RawFileStatus(
+      long length, boolean isdir, long blocksize,
+      long modification_time, Path path, String owner, byte[] checksum) {
+    super(length, isdir, 1, blocksize, modification_time, path);
+    setOwner(owner);
+    setGroup(owner);
+    this.checksum = checksum;
+  }
+
+  public byte[] checksum() {
+    return checksum;
+  }
 }

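A minimal usage sketch of the new status class (the wrapper class name, paths, owner, and checksum bytes below are illustrative assumptions, not part of the commit):

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.tosfs.RawFileStatus;

public class RawFileStatusExample {
  public static void main(String[] args) {
    // Directory status: zero length, mtime = now, MAGIC_CHECKSUM placeholder.
    RawFileStatus dir = new RawFileStatus(new Path("tos://bucket/warehouse"), "hadoop");

    // File status: length, blocksize, mtime and checksum normally come from object metadata.
    byte[] checksum = new byte[]{0x0a, 0x0b}; // illustrative value
    RawFileStatus file = new RawFileStatus(1024L, false, 128L << 20,
        System.currentTimeMillis(), new Path("tos://bucket/warehouse/part-0"), "hadoop", checksum);

    System.out.println(dir.isDirectory() + ", " + file.getLen());
  }
}
```
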
+ 766 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFileSystem.java

@@ -0,0 +1,766 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs;
+
+import io.proton.commit.MagicOutputStream;
+import io.proton.common.conf.Conf;
+import io.proton.common.conf.ConfKeys;
+import io.proton.common.object.DirectoryStorage;
+import io.proton.common.object.ObjectInfo;
+import io.proton.common.object.ObjectMultiRangeInputStream;
+import io.proton.common.object.ObjectOutputStream;
+import io.proton.common.object.ObjectRangeInputStream;
+import io.proton.common.object.ObjectStorage;
+import io.proton.common.object.ObjectStorageFactory;
+import io.proton.common.object.ObjectUtils;
+import io.proton.common.object.exceptions.InvalidObjectKeyException;
+import io.proton.common.util.Bytes;
+import io.proton.common.util.Constants;
+import io.proton.common.util.FSUtils;
+import io.proton.common.util.FuseUtils;
+import io.proton.common.util.HadoopUtil;
+import io.proton.common.util.Range;
+import io.proton.common.util.RemoteIterators;
+import io.proton.common.util.ThreadPools;
+import io.proton.fs.ops.DefaultFsOps;
+import io.proton.fs.ops.DirectoryFsOps;
+import io.proton.fs.ops.FsOps;
+import io.proton.shaded.com.google.common.annotations.VisibleForTesting;
+import io.proton.shaded.com.google.common.base.Preconditions;
+import io.proton.shaded.com.google.common.collect.Iterators;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.ParentNotDirectoryException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIOException;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.util.Collections;
+import java.util.Date;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.fs.XAttrSetFlag.CREATE;
+import static org.apache.hadoop.fs.XAttrSetFlag.REPLACE;
+
+public class RawFileSystem extends FileSystem {
+  private static final Logger LOG = LoggerFactory.getLogger(RawFileSystem.class);
+  private static final String MULTIPART_THREAD_POOL_PREFIX = "rawfs-multipart-thread-pool";
+  private static final String TASK_THREAD_POOL_PREFIX = "rawfs-task-thread-pool";
+  // This is the same as HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY, we do not
+  // use that directly because we don't want to introduce the hdfs client library.
+  private static final String DFS_BLOCK_SIZE_KEY = "dfs.blocksize";
+  private static final long DFS_BLOCK_SIZE_DEFAULT = 128 << 20;
+
+  private String scheme;
+  private Conf protonConf;
+  private String username;
+  private Path workingDir;
+  private URI uri;
+  private String bucket;
+  private ObjectStorage storage;
+  // Used for parallel task execution, such as copying multiple files concurrently.
+  private ExecutorService taskThreadPool;
+  // Used for file multipart uploads only.
+  private ExecutorService uploadThreadPool;
+  private FsOps fsOps;
+
+  @Override
+  public URI getUri() {
+    return uri;
+  }
+
+  @Override
+  public String getScheme() {
+    return scheme;
+  }
+
+  @VisibleForTesting
+  String bucket() {
+    return bucket;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    super.setConf(conf);
+  }
+
+  @Override
+  public FSDataInputStream open(Path path, int bufferSize) throws IOException {
+    LOG.debug("Opening '{}' for reading.", path);
+    FileStatus status = innerFileStatus(path);
+    if (status.isDirectory()) {
+      throw new FileNotFoundException(String.format("Can't open %s because it is a directory", path));
+    }
+
+    // Parse the range size from the hadoop conf.
+    long rangeSize = getConf().getLong(
+        ConfKeys.OBJECT_STREAM_RANGE_SIZE.key(),
+        ConfKeys.OBJECT_STREAM_RANGE_SIZE.defaultValue());
+    Preconditions.checkArgument(rangeSize > 0, "Object storage range size must be positive.");
+
+    FSInputStream fsIn = new ObjectMultiRangeInputStream(taskThreadPool, storage, path, status.getLen(), rangeSize);
+    return new FSDataInputStream(fsIn);
+  }
+
+  public FSDataInputStream open(Path path, String expectedChecksum, Range range) throws IOException {
+    LOG.debug("Opening '{}' for reading.", path);
+    RawFileStatus status = innerFileStatus(path);
+    if (status.isDirectory()) {
+      throw new FileNotFoundException(String.format("Can't open %s because it is a directory", path));
+    }
+
+    if (expectedChecksum != null
+        && !Objects.equals(Bytes.toString(status.checksum()), expectedChecksum)) {
+      throw new ChecksumMismatchException(String.format("The requested file is stale, " +
+          "the requested version's checksum is %s " +
+          "while the current version's checksum is %s",
+          expectedChecksum, Bytes.toString(status.checksum())));
+    }
+
+    return new FSDataInputStream(new ObjectRangeInputStream(storage, path, range));
+  }
+
+  @Override
+  public FSDataOutputStream create(
+      Path path, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize,
+      Progressable progress) throws IOException {
+    FileStatus fileStatus = getFileStatusOrNull(path);
+    if (fileStatus != null) {
+      if (fileStatus.isDirectory()) {
+        throw new FileAlreadyExistsException(path + " is a directory");
+      }
+
+      if (!overwrite) {
+        throw new FileAlreadyExistsException(path + " already exists");
+      }
+      LOG.debug("Overwriting file {}", path);
+    }
+
+    if (MagicOutputStream.isMagic(path)) {
+      return new FSDataOutputStream(
+          new MagicOutputStream(this, storage, uploadThreadPool, protonConf, makeQualified(path)), null);
+    } else {
+      ObjectOutputStream out =
+          new ObjectOutputStream(storage, uploadThreadPool, protonConf, makeQualified(path), true);
+
+      if (fileStatus == null && FuseUtils.fuseEnabled()) {
+        // Fuse requires the file to be visible via getFileStatus as soon as it has been created, so here we
+        // close and commit the file explicitly to make it visible for fuse, and then reopen the file output
+        // stream for writing further data bytes. For more details please see: https://code.byted.org/emr/proton/issues/825
+        out.close();
+        out = new ObjectOutputStream(storage, uploadThreadPool, protonConf, makeQualified(path), true);
+      }
+
+      return new FSDataOutputStream(out, null);
+    }
+  }
+
+  @Override
+  public RemoteIterator<LocatedFileStatus> listFiles(Path f, boolean recursive) throws IOException {
+    Path path = makeQualified(f);
+    LOG.debug("listFiles({}, {})", path, recursive);
+
+    // assume the path is a dir at first, and list sub files
+    RemoteIterator<LocatedFileStatus> subFiles = RemoteIterators.fromIterable(
+        fsOps.listDir(path, recursive, key -> !ObjectInfo.isDir(key)), this::toLocatedFileStatus);
+    if (!subFiles.hasNext()) {
+      final RawFileStatus fileStatus = innerFileStatus(path);
+      if (fileStatus.isFile()) {
+        return RemoteIterators.fromSingleton(toLocatedFileStatus(fileStatus));
+      }
+    }
+    return subFiles;
+  }
+
+  private RawLocatedFileStatus toLocatedFileStatus(RawFileStatus status) throws IOException {
+    return new RawLocatedFileStatus(status,
+        status.isFile() ? getFileBlockLocations(status, 0, status.getLen()) : null);
+  }
+
+  @Override
+  public FSDataOutputStream createNonRecursive(
+      Path path,
+      FsPermission permission,
+      EnumSet<CreateFlag> flag,
+      int bufferSize,
+      short replication,
+      long blockSize,
+      Progressable progress) throws IOException {
+    Path qualified = makeQualified(path);
+    return create(qualified, permission, flag.contains(CreateFlag.OVERWRITE),
+        bufferSize, replication, blockSize, progress);
+  }
+
+  @Override
+  public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
+    throw new IOException("Not supported");
+  }
+
+  /**
+   * Rename the src path to the dest path. If the dest path is an existing dir,
+   * the FS will rename the src path under the dst dir.
+   * E.g. for rename('/a/b', '/a/c') where dest 'c' is an existing dir,
+   * the source path '/a/b' will be renamed to the dest path '/a/c/b' internally.
+   *
+   * <ul>
+   *   <li>Return false if src doesn't exist</li>
+   *   <li>Return false if src is root</li>
+   *   <li>Return false if dst path is under src path, e.g. rename('/a/b', '/a/b/c')</li>
+   *   <li>Return false if dst path already exists</li>
+   *   <li>Return true if rename('/a/b', '/a/b') and 'b' is an existing file</li>
+   *   <li>Return true if rename('/a/b', '/a') and 'a' is an existing dir,
+   *   since fs will rename '/a/b' to '/a/b' internally</li>
+   *   <li>Return false if rename('/a/b', '/a/b') and 'b' is an existing dir,
+   *   because fs would try to rename '/a/b' to '/a/b/b', which is under '/a/b', and this behavior is forbidden.</li>
+   * </ul>
+   *
+   * @param src path to be renamed
+   * @param dst path after rename
+   * @return true if rename is successful
+   * @throws IOException on failure
+   */
+  @Override
+  public boolean rename(Path src, Path dst) throws IOException {
+    LOG.debug("Rename source path {} to dest path {}", src, dst);
+
+    // 1. Check source and destination path
+    Future<FileStatus> srcStatusFuture = taskThreadPool.submit(() -> checkAndGetSrcStatus(src));
+    Future<Path> destPathFuture = taskThreadPool.submit(() -> checkAndGetDstPath(src, dst));
+
+    FileStatus srcStatus;
+    Path dstPath;
+    try {
+      srcStatus = srcStatusFuture.get();
+      dstPath = destPathFuture.get();
+
+      if (src.equals(dstPath)) {
+        return true;
+      }
+    } catch (InterruptedException | ExecutionException e) {
+      LOG.error("Failed to rename path, src: {}, dst: {}", src, dst, e);
+      return false;
+    }
+
+    // 2. Start copy source to destination
+    if (srcStatus.isDirectory()) {
+      fsOps.renameDir(srcStatus.getPath(), dstPath);
+    } else {
+      fsOps.renameFile(srcStatus.getPath(), dstPath, srcStatus.getLen());
+    }
+
+    return true;
+  }
+
+  private Path checkAndGetDstPath(Path src, Path dest) throws IOException {
+    FileStatus destStatus = getFileStatusOrNull(dest);
+    // 1. Rebuilding the destination path
+    Path finalDstPath = dest;
+    if (destStatus != null && destStatus.isDirectory()) {
+      finalDstPath = new Path(dest, src.getName());
+    }
+
+    // 2. No further check needed when src equals dest, because renaming a path to itself is allowed.
+    if (src.equals(finalDstPath)) {
+      return finalDstPath;
+    }
+
+    // 3. Ensure the source path is not an ancestor of the destination path.
+    if (RawFSUtils.inSubtree(src, finalDstPath)) {
+      throw new IOException(String.format("Failed to rename since it is prohibited to " +
+          "rename dest path %s under src path %s", finalDstPath, src));
+    }
+
+    // 4. Ensure the destination path doesn't exist.
+    FileStatus finalDstStatus = destStatus;
+    if (destStatus != null && destStatus.isDirectory()) {
+      finalDstStatus = getFileStatusOrNull(finalDstPath);
+    }
+    if (finalDstStatus != null) {
+      throw new FileAlreadyExistsException(
+          String.format("Failed to rename since the dest path %s already exists.", finalDstPath));
+    } else {
+      return finalDstPath;
+    }
+  }
+
+  private FileStatus checkAndGetSrcStatus(Path src) throws IOException {
+    // throw FileNotFoundException if src not found.
+    FileStatus srcStatus = innerFileStatus(src);
+
+    if (src.isRoot()) {
+      throw new IOException(String.format("Cannot rename the root directory %s to another name", src));
+    }
+    return srcStatus;
+  }
+
+  @Override
+  public boolean delete(Path f, boolean recursive) throws IOException {
+    LOG.debug("Delete path {} - recursive {}", f, recursive);
+    try {
+      FileStatus fileStatus = getFileStatus(f);
+      Path path = fileStatus.getPath();
+
+      if (path.isRoot()) {
+        return deleteRoot(path, recursive);
+      } else {
+        if (fileStatus.isDirectory()) {
+          fsOps.deleteDir(path, recursive);
+        } else {
+          fsOps.deleteFile(path);
+        }
+        return true;
+      }
+    } catch (FileNotFoundException e) {
+      LOG.debug("Couldn't delete {} - does not exist", f);
+      return false;
+    }
+  }
+
+  /**
+   * Reject deleting the root directory and implement the specific logic to stay compatible with
+   * the AbstractContractRootDirectoryTest rm test cases.
+   *
+   * @param root      the root path.
+   * @param recursive indicates whether to delete the directory recursively
+   * @return true if the root directory is empty, false when trying to delete a non-empty dir recursively.
+   * @throws IOException when trying to delete the non-empty root dir non-recursively.
+   */
+  private boolean deleteRoot(Path root, boolean recursive) throws IOException {
+    LOG.info("Delete the {} root directory of {}", bucket, recursive);
+    boolean isEmptyDir = fsOps.isEmptyDirectory(root);
+    if (isEmptyDir) {
+      return true;
+    }
+    if (recursive) {
+      // AbstractContractRootDirectoryTest#testRmRootRecursive doesn't expect any exception when deleting a
+      // non-empty root directory recursively, so we have to return false here instead of throwing an IOException.
+      return false;
+    } else {
+      // AbstractContractRootDirectoryTest#testRmNonEmptyRootDirNonRecursive expects an exception when deleting a
+      // non-empty root directory non-recursively, so we have to throw an IOException instead of returning false.
+      throw new PathIOException(bucket, "Cannot delete root path");
+    }
+  }
+
+  @Override
+  public RawFileStatus[] listStatus(Path f) throws IOException {
+    LOG.debug("List status for path: {}", f);
+    return Iterators.toArray(listStatus(f, false), RawFileStatus.class);
+  }
+
+  public Iterator<RawFileStatus> listStatus(Path f, boolean recursive) throws IOException {
+    Path path = makeQualified(f);
+    // Assuming path is a dir at first.
+    Iterator<RawFileStatus> iterator = fsOps.listDir(path, recursive, key -> true).iterator();
+    if (iterator.hasNext()) {
+      return iterator;
+    } else {
+      RawFileStatus fileStatus = innerFileStatus(path);
+      if (fileStatus.isFile()) {
+        return Collections.singletonList(fileStatus).iterator();
+      } else {
+        // The path is an empty dir.
+        return Collections.emptyIterator();
+      }
+    }
+  }
+
+  @Override
+  public RemoteIterator<FileStatus> listStatusIterator(Path p) throws IOException {
+    // We expect to throw FileNotFoundException while creating the RemoteIterator if the path doesn't exist,
+    // instead of throwing it when the hasNext method is called.
+
+    // The following RemoteIterator is the same as {@link FileSystem#DirListingIterator} in hadoop 3.2.2 and above;
+    // below 3.2.2, the DirListingIterator fetches the directory entries when hasNext is called instead of
+    // when the DirListingIterator instance is created.
+    return new RemoteIterator<FileStatus>() {
+      private DirectoryEntries entries = listStatusBatch(p, null);
+      private int index = 0;
+
+      @Override
+      public boolean hasNext() {
+        return index < entries.getEntries().length || entries.hasMore();
+      }
+
+      private void fetchMore() throws IOException {
+        byte[] token = entries.getToken();
+        entries = listStatusBatch(p, token);
+        index = 0;
+      }
+
+      @Override
+      public FileStatus next() throws IOException {
+        if (!hasNext()) {
+          throw new NoSuchElementException("No more items in iterator");
+        } else {
+          if (index == entries.getEntries().length) {
+            fetchMore();
+            if (!hasNext()) {
+              throw new NoSuchElementException("No more items in iterator");
+            }
+          }
+
+          return entries.getEntries()[index++];
+        }
+      }
+    };
+  }
+
+  public static long dateToLong(final Date date) {
+    return date == null ? 0L : date.getTime();
+  }
+
+  @Override
+  public Path getWorkingDirectory() {
+    return workingDir;
+  }
+
+  @Override
+  public void setWorkingDirectory(Path new_dir) {
+    this.workingDir = new_dir;
+  }
+
+  @Override
+  public boolean mkdirs(Path path, FsPermission permission) throws IOException {
+    try {
+      FileStatus fileStatus = innerFileStatus(path);
+      if (fileStatus.isDirectory()) {
+        return true;
+      } else {
+        throw new FileAlreadyExistsException("Path is a file: " + path);
+      }
+    } catch (FileNotFoundException e) {
+      Path dir = makeQualified(path);
+      validatePath(dir);
+      fsOps.mkdirs(dir);
+    }
+    return true;
+  }
+
+  private void validatePath(Path path) throws IOException {
+    Path parent = path.getParent();
+    do {
+      try {
+        FileStatus fileStatus = innerFileStatus(parent);
+        if (fileStatus.isDirectory()) {
+          // If path exists and a directory, exit
+          break;
+        } else {
+          throw new FileAlreadyExistsException(String.format("Can't make directory for path '%s', it is a file.",
+              parent));
+        }
+      } catch (FileNotFoundException ignored) {
+      }
+      parent = parent.getParent();
+    } while (parent != null);
+  }
+
+  @Override
+  public FileStatus getFileStatus(Path path) throws IOException {
+    try {
+      return innerFileStatus(path);
+    } catch (ParentNotDirectoryException e) {
+      // Treat ParentNotDirectoryException as FileNotFoundException for callers that only check whether the path exists.
+      throw new FileNotFoundException(e.getMessage());
+    }
+  }
+
+  /**
+   * Get the file status of given path.
+   *
+   * @param f the path
+   * @return {@link RawFileStatus} describe file status info.
+   * @throws FileNotFoundException       if the path doesn't exist.
+   * @throws ParentNotDirectoryException if the path is located under an existing file, which is not allowed
+   *                                     in the directory bucket case.
+   */
+  RawFileStatus innerFileStatus(Path f) throws ParentNotDirectoryException, FileNotFoundException {
+    Path qualifiedPath = f.makeQualified(uri, workingDir);
+    RawFileStatus fileStatus = getFileStatusOrNull(qualifiedPath);
+    if (fileStatus == null) {
+      throw new FileNotFoundException(String.format("No such file or directory: %s", qualifiedPath));
+    }
+    return fileStatus;
+  }
+
+  /**
+   * The differences from {@link RawFileSystem#getFileStatus(Path)} are that this method:
+   * 1. throws {@link ParentNotDirectoryException} if the path is located under an existing file in the directory
+   * bucket case, whereas {@link RawFileSystem#getFileStatus(Path)} ignores the invalid path and
+   * throws {@link FileNotFoundException};
+   * 2. returns null if the path doesn't exist instead of throwing {@link FileNotFoundException}.
+   *
+   * @param path the object path.
+   * @return null if the path doesn't exist.
+   * @throws ParentNotDirectoryException if the path is located under an existing file, which is not allowed
+   *                                     in the directory bucket case.
+   */
+  public RawFileStatus getFileStatusOrNull(final Path path) throws ParentNotDirectoryException {
+    Path qualifiedPath = path.makeQualified(uri, workingDir);
+    String key = ObjectUtils.pathToKey(qualifiedPath);
+
+    // Root directory always exists
+    if (key.isEmpty()) {
+      return new RawFileStatus(0, true, 0, 0, qualifiedPath, username, Constants.EMPTY_CHECKSUM);
+    }
+
+    try {
+      ObjectInfo obj = storage.objectStatus(key);
+      if (obj == null) {
+        return null;
+      } else {
+        return objectToFileStatus(obj);
+      }
+    } catch (InvalidObjectKeyException e) {
+      String msg = String.format("The object key %s is an invalid key, detail: %s", key, e.getMessage());
+      throw new ParentNotDirectoryException(msg);
+    }
+  }
+
+  private RawFileStatus objectToFileStatus(ObjectInfo obj) {
+    Path keyPath = makeQualified(ObjectUtils.keyToPath(obj.key()));
+    long blockSize = obj.isDir() ? 0 : getDefaultBlockSize(keyPath);
+    long modificationTime = dateToLong(obj.mtime());
+    return new RawFileStatus(obj.size(), obj.isDir(), blockSize, modificationTime, keyPath, username, obj.checksum());
+  }
+
+  @Override
+  @Deprecated
+  public long getDefaultBlockSize() {
+    return getConf().getLongBytes(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT);
+  }
+
+  @Override
+  public FsServerDefaults getServerDefaults(Path p) throws IOException {
+    Configuration config = getConf();
+    // CRC32 is chosen as default as it is available in all
+    // releases that support checksum.
+    // The client trash configuration is ignored.
+    return new FsServerDefaults(getDefaultBlockSize(),
+        config.getInt("dfs.bytes-per-checksum", 512),
+        64 * 1024,
+        getDefaultReplication(),
+        config.getInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
+            CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT),
+        false,
+        CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT,
+        DataChecksum.Type.CRC32,
+        "");
+  }
+
+  private void stopAllServices() {
+    HadoopUtil.shutdownHadoopExecutors(uploadThreadPool, LOG, 30, TimeUnit.SECONDS);
+    HadoopUtil.shutdownHadoopExecutors(taskThreadPool, LOG, 30, TimeUnit.SECONDS);
+  }
+
+  @Override
+  public void initialize(URI uri, Configuration conf) throws IOException {
+    super.initialize(uri, conf);
+    setConf(conf);
+    this.scheme = FSUtils.scheme(conf, uri);
+
+    // Merge the deprecated configuration keys with the new ones and convert the hadoop conf to a proton conf.
+    FSUtils.withCompatibleKeys(conf, scheme);
+    this.protonConf = Conf.copyOf(conf);
+
+    // Username is the current user at the time the FS was instantiated.
+    this.username = UserGroupInformation.getCurrentUser().getShortUserName();
+    this.workingDir = new Path("/user", username).makeQualified(uri, null);
+    this.uri = URI.create(scheme + "://" + uri.getAuthority());
+    this.bucket = this.uri.getAuthority();
+    this.storage = ObjectStorageFactory.create(scheme, bucket, protonConf);
+    if (storage.bucket() == null) {
+      throw new FileNotFoundException(String.format("Bucket: %s not found.", uri.getAuthority()));
+    }
+
+    int taskThreadPoolSize = protonConf.get(ConfKeys.TASK_THREAD_POOL_SIZE.format(scheme));
+    this.taskThreadPool = ThreadPools.newWorkerPool(TASK_THREAD_POOL_PREFIX, taskThreadPoolSize);
+
+    int uploadThreadPoolSize = protonConf.get(ConfKeys.MULTIPART_THREAD_POOL_SIZE.format(scheme));
+    this.uploadThreadPool = ThreadPools.newWorkerPool(MULTIPART_THREAD_POOL_PREFIX, uploadThreadPoolSize);
+
+    if (storage.bucket().isDirectory()) {
+      fsOps = new DirectoryFsOps((DirectoryStorage) storage, this::objectToFileStatus);
+    } else {
+      fsOps = new DefaultFsOps(storage, protonConf, taskThreadPool, this::objectToFileStatus);
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    try {
+      super.close();
+      storage.close();
+    } finally {
+      stopAllServices();
+    }
+  }
+
+  public ObjectStorage storage() {
+    return storage;
+  }
+
+  public ExecutorService uploadThreadPool() {
+    return uploadThreadPool;
+  }
+
+  String username() {
+    return username;
+  }
+
+  /**
+   * @return null if checksum is not supported.
+   */
+  @Override
+  public FileChecksum getFileChecksum(Path f, long length) throws IOException {
+    Preconditions.checkArgument(length >= 0);
+    RawFileStatus fileStatus = innerFileStatus(f);
+    if (fileStatus.isDirectory()) {
+      // Compatible with HDFS
+      throw new FileNotFoundException(String.format("Path is not a file, %s", f));
+    }
+    if (!protonConf.get(ConfKeys.CHECKSUM_ENABLED.format(scheme))) {
+      return null;
+    }
+
+    return BaseChecksum.create(protonConf, fileStatus, length);
+  }
+
+  @Override
+  public String getCanonicalServiceName() {
+    return null;
+  }
+
+  @Override
+  public void setXAttr(Path path, String name, byte[] value, EnumSet<XAttrSetFlag> flag) throws IOException {
+    Preconditions.checkNotNull(name, "xAttr name must not be null.");
+    Preconditions.checkArgument(!name.isEmpty(), "xAttr name must not be empty.");
+    Preconditions.checkNotNull(value, "xAttr value must not be null.");
+
+    if (getFileStatus(path).isFile()) {
+      Path qualifiedPath = path.makeQualified(uri, workingDir);
+      String key = ObjectUtils.pathToKey(qualifiedPath);
+
+      Map<String, String> existedTags = storage.getTags(key);
+      validateXAttrFlag(name, existedTags.containsKey(name), flag);
+
+      String newValue = Bytes.toString(value);
+      String previousValue = existedTags.put(name, newValue);
+      if (!newValue.equals(previousValue)) {
+        storage.putTags(key, existedTags);
+      }
+    }
+  }
+
+  @Override
+  public Map<String, byte[]> getXAttrs(Path path) throws IOException {
+    if (getFileStatus(path).isDirectory()) {
+      return new HashMap<>();
+    } else {
+      Path qualifiedPath = path.makeQualified(uri, workingDir);
+      String key = ObjectUtils.pathToKey(qualifiedPath);
+
+      Map<String, String> tags = storage.getTags(key);
+      return tags.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, t -> Bytes.toBytes(t.getValue())));
+    }
+  }
+
+  @Override
+  public byte[] getXAttr(Path path, String name) throws IOException {
+    Map<String, byte[]> xAttrs = getXAttrs(path);
+    if (xAttrs.containsKey(name)) {
+      return xAttrs.get(name);
+    } else {
+      throw new IOException("Attribute with name " + name + " is not found.");
+    }
+  }
+
+  @Override
+  public Map<String, byte[]> getXAttrs(Path path, List<String> names) throws IOException {
+    Map<String, byte[]> xAttrs = getXAttrs(path);
+    xAttrs.keySet().retainAll(names);
+    if (xAttrs.size() == names.size()) {
+      return xAttrs;
+    } else {
+      List<String> badNames = names.stream().filter(n -> !xAttrs.containsKey(n)).collect(Collectors.toList());
+      throw new IOException("Attributes with name " + badNames + " are not found.");
+    }
+  }
+
+  @Override
+  public List<String> listXAttrs(Path path) throws IOException {
+    return getXAttrs(path).keySet().stream().collect(Collectors.toList());
+  }
+
+  @Override
+  public void removeXAttr(Path path, String name) throws IOException {
+    if (getFileStatus(path).isFile()) {
+      Path qualifiedPath = path.makeQualified(uri, workingDir);
+      String key = ObjectUtils.pathToKey(qualifiedPath);
+
+      Map<String, String> existedTags = storage.getTags(key);
+      if (existedTags.remove(name) != null) {
+        storage.putTags(key, existedTags);
+      }
+    }
+  }
+
+  private void validateXAttrFlag(String xAttrName, boolean xAttrExists, EnumSet<XAttrSetFlag> flag) throws IOException {
+    if (xAttrExists) {
+      if (!flag.contains(REPLACE)) {
+        throw new IOException("XAttr: " + xAttrName + " already exists. The REPLACE flag must be specified.");
+      }
+    } else {
+      if (!flag.contains(CREATE)) {
+        throw new IOException("XAttr: " + xAttrName + " does not exist. The CREATE flag must be specified.");
+      }
+    }
+  }
+}

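The rename contract documented on RawFileSystem#rename is the subtlest part of this file. A hedged walk-through of the documented outcomes (the `fs` instance and the paths are assumptions for illustration):

```java
import java.io.IOException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RenameContractSketch {
  // Mirrors the outcomes listed in the rename() Javadoc above.
  static void sketch(FileSystem fs) throws IOException {
    fs.rename(new Path("/a/b"), new Path("/a/c"));   // true if '/a/c' is an existing dir: '/a/b' -> '/a/c/b'
    fs.rename(new Path("/missing"), new Path("/x")); // false: src doesn't exist
    fs.rename(new Path("/"), new Path("/x"));        // false: src is root
    fs.rename(new Path("/a/b"), new Path("/a/b/c")); // false: dst is under src
    fs.rename(new Path("/a/b"), new Path("/a/b"));   // true if 'b' is an existing file
  }
}
```
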
+ 38 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java

@@ -46,4 +46,42 @@ public class ConfKeys {
   */
  public static final ArgumentKey FS_BATCH_DELETE_SIZE = new ArgumentKey("fs.%s.delete.batch-size");
  public static final int FS_BATCH_DELETE_SIZE_DEFAULT = 250;
+
+  /**
+   * The multipart upload part size of the given object storage, e.g. fs.tos.multipart.size.
+   */
+  public static final String MULTIPART_SIZE = "fs.tos.multipart.size";
+  public static final long MULTIPART_SIZE_DEFAULT = 8L << 20;
+
+  /**
+   * The threshold above which multipart upload is used when copying objects
+   * in the given object storage. If the copied data size is below the threshold, data is copied via
+   * copyObject instead of uploadPartCopy. E.g. fs.tos.multipart.copy-threshold
+   */
+  public static final String MULTIPART_COPY_THRESHOLD = "fs.tos.multipart.copy-threshold";
+  public static final long MULTIPART_COPY_THRESHOLD_DEFAULT = 5L << 20;
+
+  /**
+   * The batch size of deleting multiple objects per request for the given object storage.
+   * e.g. fs.tos.delete.batch-size
+   */
+  public static final String BATCH_DELETE_SIZE = "fs.tos.delete.batch-size";
+  public static final int BATCH_DELETE_SIZE_DEFAULT = 250;
+
+  /**
+   * True to create the missing parent dir asynchronously when deleting or renaming a file or dir.
+   */
+  public static final String ASYNC_CREATE_MISSED_PARENT = "fs.tos.missed.parent.dir.async-create";
+  public static final boolean ASYNC_CREATE_MISSED_PARENT_DEFAULT = true;
+
+  /**
+   * Whether to use the rename semantic of the object storage when renaming files; otherwise
+   * copy + delete is used.
+   * Before enabling it, please ensure that the object storage supports and has enabled the rename
+   * semantic, and that rename permission has been granted to the requester.
+   * If you are using TOS, you have to send a putBucketRename request before sending the rename request,
+   * otherwise a MethodNotAllowed exception will be thrown.
+   */
+  public static final String OBJECT_RENAME_ENABLED = "fs.tos.rename.enabled";
+  public static final boolean OBJECT_RENAME_ENABLED_DEFAULT = false;
 }

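A short sketch of how the new keys are consumed from a Hadoop Configuration (the values set here are illustrative; the defaults mirror the constants above):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.tosfs.conf.ConfKeys;

public class ConfKeysSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setLong(ConfKeys.MULTIPART_SIZE, 16L << 20);      // 16 MiB parts instead of the 8 MiB default
    conf.setBoolean(ConfKeys.OBJECT_RENAME_ENABLED, true); // only if the bucket has rename enabled

    long partSize = conf.getLong(ConfKeys.MULTIPART_SIZE, ConfKeys.MULTIPART_SIZE_DEFAULT);
    long threshold = conf.getLong(ConfKeys.MULTIPART_COPY_THRESHOLD, ConfKeys.MULTIPART_COPY_THRESHOLD_DEFAULT);
    System.out.println(partSize + ", " + threshold);
  }
}
```
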
+ 186 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/ops/DefaultFsOps.java

@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.ops;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
+import org.apache.hadoop.fs.tosfs.RawFileStatus;
+import org.apache.hadoop.fs.tosfs.conf.ConfKeys;
+import org.apache.hadoop.fs.tosfs.object.Constants;
+import org.apache.hadoop.fs.tosfs.object.ObjectInfo;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
+import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
+import org.apache.hadoop.fs.tosfs.object.request.ListObjectsRequest;
+import org.apache.hadoop.fs.tosfs.object.response.ListObjectsResponse;
+import org.apache.hadoop.fs.tosfs.util.CommonUtils;
+import org.apache.hadoop.thirdparty.com.google.common.base.Function;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.function.Predicate;
+
+import static org.apache.hadoop.fs.tosfs.object.ObjectUtils.SLASH;
+
+/**
+ * Provides rename, delete, and list capabilities for general purpose buckets.
+ */
+public class DefaultFsOps implements FsOps {
+  private final ObjectStorage storage;
+  private final ExecutorService taskThreadPool;
+  private final Function<ObjectInfo, RawFileStatus> objMapper;
+  private final RenameOp renameOp;
+  private final boolean asyncCreateParentDir;
+
+  public DefaultFsOps(
+      ObjectStorage storage,
+      Configuration conf,
+      ExecutorService taskThreadPool,
+      Function<ObjectInfo, RawFileStatus> objMapper) {
+    this.storage = storage;
+    this.taskThreadPool = taskThreadPool;
+    this.objMapper = objMapper;
+    this.renameOp = new RenameOp(conf, storage, taskThreadPool);
+    this.asyncCreateParentDir = conf.getBoolean(ConfKeys.ASYNC_CREATE_MISSED_PARENT,
+        ConfKeys.ASYNC_CREATE_MISSED_PARENT_DEFAULT);
+  }
+
+  @Override
+  public void renameFile(Path src, Path dst, long length) {
+    renameOp.renameFile(src, dst, length);
+    mkdirIfNecessary(src.getParent(), asyncCreateParentDir);
+  }
+
+  @Override
+  public void renameDir(Path src, Path dst) {
+    renameOp.renameDir(src, dst);
+    mkdirIfNecessary(src.getParent(), asyncCreateParentDir);
+  }
+
+  @Override
+  public void deleteFile(Path file) {
+    storage.delete(ObjectUtils.pathToKey(file));
+    mkdirIfNecessary(file.getParent(), asyncCreateParentDir);
+  }
+
+  @Override
+  public void deleteDir(Path dir, boolean recursive) throws IOException {
+    String dirKey = ObjectUtils.pathToKey(dir, true);
+    if (recursive) {
+      storage.deleteAll(dirKey);
+    } else {
+      if (isEmptyDirectory(dir)) {
+        storage.delete(dirKey);
+      } else {
+        throw new PathIsNotEmptyDirectoryException(dir.toString());
+      }
+    }
+  }
+
+  @Override
+  public Iterable<RawFileStatus> listDir(Path dir, boolean recursive, Predicate<String> postFilter) {
+    String key = ObjectUtils.pathToKey(dir, true);
+    String delimiter = recursive ? null : SLASH;
+
+    ListObjectsRequest req = ListObjectsRequest.builder()
+        .prefix(key)
+        .startAfter(key)
+        .delimiter(delimiter)
+        .build();
+    return Iterables.transform(asObjectInfo(storage.list(req), postFilter), objMapper);
+  }
+
+  @Override
+  public boolean isEmptyDirectory(Path dir) {
+    String key = ObjectUtils.pathToKey(dir, true);
+    ListObjectsRequest req = ListObjectsRequest.builder()
+        .prefix(key)
+        .startAfter(key)
+        .delimiter(SLASH)
+        .maxKeys(1)
+        .build();
+    return !asObjectInfo(storage.list(req), s -> true).iterator().hasNext();
+  }
+
+  @Override
+  public void mkdirs(Path dir) {
+    if (dir.isRoot()) {
+      return;
+    }
+    String key = ObjectUtils.pathToKey(dir, true);
+    storage.put(key, new byte[0]);
+
+    // Create parent dir if missed.
+    Path parentPath = dir.getParent();
+    String parentKey = ObjectUtils.pathToKey(parentPath, true);
+    while (!parentPath.isRoot() && storage.head(parentKey) == null) {
+      storage.put(parentKey, new byte[0]);
+      parentPath = parentPath.getParent();
+      parentKey = ObjectUtils.pathToKey(parentPath, true);
+    }
+  }
+
+  private void mkdirIfNecessary(Path path, boolean async) {
+    if (path != null) {
+      CommonUtils.runQuietly(() -> {
+        Future<?> future = taskThreadPool.submit(() -> {
+          String key = ObjectUtils.pathToKey(path, true);
+          if (!key.isEmpty() && storage.head(key) == null) {
+            mkdirs(ObjectUtils.keyToPath(key));
+          }
+        });
+
+        if (!async) {
+          future.get();
+        }
+      });
+    }
+  }
+
+  /**
+   * Convert a ListObjectsResponse iterable to an ObjectInfo iterable,
+   * using the given key filter to select the expected objects and common prefixes.
+   *
+   * @param listResponses the iterable of ListObjectsResponse
+   * @param filter        the object key filter
+   * @return the iterable of ObjectInfo
+   */
+  private Iterable<ObjectInfo> asObjectInfo(Iterable<ListObjectsResponse> listResponses, Predicate<String> filter) {
+    Iterable<List<ObjectInfo>> results = Iterables.transform(listResponses, listResp -> {
+      List<ObjectInfo> objs = Lists.newArrayList();
+
+      // Add object files.
+      objs.addAll(listResp.objects());
+
+      // Add object directories.
+      for (String prefix : listResp.commonPrefixes()) {
+        objs.add(new ObjectInfo(prefix, 0, new Date(), Constants.MAGIC_CHECKSUM, true));
+      }
+
+      return objs;
+    });
+
+    return Iterables.filter(Iterables.concat(results), o -> filter.test(o.key()));
+  }
+}

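DefaultFsOps emulates directories on a flat keyspace: a non-recursive listing sets '/' as the delimiter so deeper sub-trees fold into common prefixes, and startAfter skips the directory marker object itself. A sketch of the request built for listing a dir (the key "warehouse/" is illustrative):

```java
import org.apache.hadoop.fs.tosfs.object.request.ListObjectsRequest;

class ListDirSketch {
  // Mirrors the request construction in listDir/isEmptyDirectory above.
  static ListObjectsRequest nonRecursiveListing() {
    return ListObjectsRequest.builder()
        .prefix("warehouse/")      // only keys under the dir
        .startAfter("warehouse/")  // skip the dir marker object itself
        .delimiter("/")            // fold children sub-trees into common prefixes
        .build();
  }
}
```
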
+ 107 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/ops/DirectoryFsOps.java

@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.ops;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException;
+import org.apache.hadoop.fs.tosfs.RawFileStatus;
+import org.apache.hadoop.fs.tosfs.object.DirectoryStorage;
+import org.apache.hadoop.fs.tosfs.object.ObjectInfo;
+import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
+import org.apache.hadoop.thirdparty.com.google.common.base.Function;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables;
+
+import java.io.IOException;
+import java.util.function.Predicate;
+
+/**
+ * Provides rename, delete, and list capabilities for directory buckets.
+ */
+public class DirectoryFsOps implements FsOps {
+  private final DirectoryStorage storage;
+  private final Function<ObjectInfo, RawFileStatus> objMapper;
+
+  public DirectoryFsOps(DirectoryStorage storage, Function<ObjectInfo, RawFileStatus> objMapper) {
+    this.storage = storage;
+    this.objMapper = objMapper;
+  }
+
+  @Override
+  public void renameFile(Path src, Path dst, long length) {
+    innerRename(src, dst, false);
+  }
+
+  @Override
+  public void renameDir(Path src, Path dst) {
+    innerRename(src, dst, true);
+  }
+
+  private void innerRename(Path src, Path dst, boolean isDir) {
+    // Need to ensure the dest parent exists before renaming a file in a directory bucket.
+    String dstParentKey = ObjectUtils.pathToKey(dst.getParent(), true);
+    if (!dstParentKey.isEmpty() && storage.head(dstParentKey) == null) {
+      mkdirs(dst.getParent());
+    }
+
+    String srcKey = ObjectUtils.pathToKey(src, isDir);
+    String dstKey = ObjectUtils.pathToKey(dst, isDir);
+    storage.rename(srcKey, dstKey);
+  }
+
+  @Override
+  public void deleteFile(Path file) {
+    storage.delete(ObjectUtils.pathToKey(file));
+  }
+
+  @Override
+  public void deleteDir(Path dir, boolean recursive) throws IOException {
+    String dirKey = ObjectUtils.pathToKey(dir, true);
+    if (recursive) {
+      storage.deleteAll(dirKey);
+    } else {
+      if (isEmptyDirectory(dir)) {
+        storage.delete(dirKey);
+      } else {
+        throw new PathIsNotEmptyDirectoryException(dir.toString());
+      }
+    }
+  }
+
+  @Override
+  public Iterable<RawFileStatus> listDir(Path dir, boolean recursive, Predicate<String> postFilter) {
+    String key = ObjectUtils.pathToKey(dir, true);
+    Iterable<ObjectInfo> objs = Iterables.filter(storage.listDir(key, recursive), obj -> postFilter.test(obj.key()));
+    return Iterables.transform(objs, objMapper);
+  }
+
+  @Override
+  public boolean isEmptyDirectory(Path dir) {
+    return storage.isEmptyDir(ObjectUtils.pathToKey(dir, true));
+  }
+
+  @Override
+  public void mkdirs(Path dir) {
+    if (dir.isRoot()) {
+      return;
+    }
+    String key = ObjectUtils.pathToKey(dir, true);
+    // Directory bucket will create missing parent dirs automatically.
+    storage.put(key, new byte[0]);
+  }
+}

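In contrast to DefaultFsOps, a directory bucket renames a whole tree with a single storage call on the slash-terminated keys, with no per-object copy & delete loop. A sketch under that assumption (the paths and the storage instance are illustrative):

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.tosfs.object.DirectoryStorage;
import org.apache.hadoop.fs.tosfs.object.ObjectUtils;

class DirectoryRenameSketch {
  // One storage-level rename call moves the whole directory tree.
  static void renameDir(DirectoryStorage storage) {
    String srcKey = ObjectUtils.pathToKey(new Path("/a/b"), true); // slash-terminated dir key
    String dstKey = ObjectUtils.pathToKey(new Path("/a/c"), true);
    storage.rename(srcKey, dstKey);
  }
}
```
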
+ 88 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/ops/FsOps.java

@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.ops;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.tosfs.RawFileStatus;
+
+import java.io.IOException;
+import java.util.function.Predicate;
+
+public interface FsOps {
+
+  /**
+   * Rename file from source path to dest path.
+   *
+   * @param src    the source path.
+   * @param dst    the dest path.
+   * @param length the length of source file.
+   * @throws IOException if any IO error happens.
+   */
+  void renameFile(Path src, Path dst, long length) throws IOException;
+
+  /**
+   * Rename dir from source path to dest path.
+   *
+   * @param src the source path.
+   * @param dst the dest path.
+   * @throws IOException if any IO error happens.
+   */
+  void renameDir(Path src, Path dst) throws IOException;
+
+  /**
+   * Delete the given file.
+   *
+   * @param file the given file path.
+   * @throws IOException if any IO error happens.
+   */
+  void deleteFile(Path file) throws IOException;
+
+  /**
+   * Delete the given dir.
+   *
+   * @param dir       the given dir path.
+   * @param recursive indicates whether to delete the dir recursively.
+   * @throws IOException if any IO error happens.
+   */
+  void deleteDir(Path dir, boolean recursive) throws IOException;
+
+  /**
+   * List the sub dirs and files under the given dir.
+   * Returns an empty collection if the path doesn't exist, is a file, or is an empty dir.
+   *
+   * @param dir        the listed path.
+   * @param recursive  indicates whether to list all sub dirs/files recursively.
+   * @param postFilter filters the result after getting the listing response.
+   * @return the status of sub dirs and files.
+   */
+  Iterable<RawFileStatus> listDir(Path dir, boolean recursive, Predicate<String> postFilter);
+
+  /**
+   * @return true if the path doesn't have any children.
+   */
+  boolean isEmptyDirectory(Path dir);
+
+  /**
+   * Create the dir and its parent dirs if they don't exist.
+   *
+   * @param dir the dir to be created.
+   * @throws IOException if any IO error happens.
+   */
+  void mkdirs(Path dir) throws IOException;
+}

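A typical call site for the interface, mirroring RawFileSystem#listFiles: the post-filter drops directory keys so only file entries come back, regardless of which implementation was selected at initialize time:

```java
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.tosfs.RawFileStatus;
import org.apache.hadoop.fs.tosfs.object.ObjectInfo;
import org.apache.hadoop.fs.tosfs.ops.FsOps;

class FsOpsUsageSketch {
  // List only file entries (keys not ending in '/') under a path.
  static Iterable<RawFileStatus> listFilesOnly(FsOps fsOps, Path path) {
    return fsOps.listDir(path, /* recursive */ true, key -> !ObjectInfo.isDir(key));
  }
}
```
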
+ 221 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/ops/RenameOp.java

@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.ops;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.tosfs.common.Tasks;
+import org.apache.hadoop.fs.tosfs.conf.ConfKeys;
+import org.apache.hadoop.fs.tosfs.object.MultipartUpload;
+import org.apache.hadoop.fs.tosfs.object.ObjectInfo;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
+import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
+import org.apache.hadoop.fs.tosfs.object.Part;
+import org.apache.hadoop.fs.tosfs.util.CommonUtils;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+public class RenameOp {
+  private static final Logger LOG = LoggerFactory.getLogger(RenameOp.class);
+  private static final int RENAME_RETRY_TIMES = 3;
+
+  private final Configuration conf;
+  private final ObjectStorage storage;
+  private final ExecutorService renamePool;
+  // Whether the object storage's atomic rename capability is enabled.
+  private final boolean renameObjectEnabled;
+
+  public RenameOp(Configuration conf, ObjectStorage storage, ExecutorService taskThreadPool) {
+    this.conf = conf;
+    this.storage = storage;
+    this.renamePool = taskThreadPool;
+    this.renameObjectEnabled =
+        conf.getBoolean(ConfKeys.OBJECT_RENAME_ENABLED, ConfKeys.OBJECT_RENAME_ENABLED_DEFAULT);
+  }
+
+  public void renameDir(Path src, Path dst) {
+    String srcKey = ObjectUtils.pathToKey(src, true);
+    String dstKey = ObjectUtils.pathToKey(dst, true);
+    renameDir(srcKey, dstKey);
+  }
+
+  public void renameFile(Path src, Path dst, long length) {
+    String srcKey = ObjectUtils.pathToKey(src, false);
+    String dstKey = ObjectUtils.pathToKey(dst, false);
+    renameFile(srcKey, dstKey, length);
+  }
+
+  /**
+   * Lists all objects under the given src key and renames each of them via the rename semantic if the object
+   * storage supports atomic renaming, otherwise renames all objects via copy & delete.
+   *
+   * @param srcKey the source dir key, ending with slash.
+   * @param dstKey the destination dir key, ending with slash.
+   */
+  private void renameDir(String srcKey, String dstKey) {
+    Iterable<ObjectInfo> objs = storage.listAll(srcKey, "");
+    if (renameObjectEnabled) {
+      Tasks.foreach(objs)
+          .executeWith(renamePool)
+          .throwFailureWhenFinished()
+          .retry(RENAME_RETRY_TIMES)
+          .revertWith(sourceInfo -> {
+            String newDstKey = dstKey + sourceInfo.key().substring(srcKey.length());
+            String newSrcKey = sourceInfo.key();
+            LOG.debug("Try to rollback dest key {} to source key {}", newDstKey, newSrcKey);
+
+            storage.rename(newDstKey, newSrcKey);
+          })
+          .run(sourceInfo -> {
+            String newDstKey = dstKey + sourceInfo.key().substring(srcKey.length());
+            String newSrcKey = sourceInfo.key();
+            LOG.debug("Try to rename src key {} to dest key {}", newSrcKey, newDstKey);
+
+            storage.rename(newSrcKey, newDstKey);
+          });
+    } else {
+      Tasks.foreach(objs)
+          .executeWith(renamePool)
+          .throwFailureWhenFinished()
+          .retry(RENAME_RETRY_TIMES)
+          .revertWith(sourceInfo -> {
+            String newDstKey = dstKey + sourceInfo.key().substring(srcKey.length());
+            storage.delete(newDstKey);
+          })
+          .run(sourceInfo -> {
+            String newDstKey = dstKey + sourceInfo.key().substring(srcKey.length());
+            LOG.debug("Try to rename src key {} to dest key {}", sourceInfo.key(), newDstKey);
+
+            try {
+              if (ObjectInfo.isDir(newDstKey)) {
+                mkdir(newDstKey);
+              } else {
+                copyFile(sourceInfo.key(), newDstKey, sourceInfo.size());
+              }
+            } catch (IOException e) {
+              throw new UncheckedIOException(String.format("Failed to copy source file %s to dest file %s",
+                  sourceInfo.key(), newDstKey), e);
+            }
+          });
+
+      // Delete all the source keys, since we've already copied them into destination keys.
+      storage.deleteAll(srcKey);
+    }
+  }
+
+  private void renameFile(String srcKey, String dstKey, long fileSize) {
+    if (renameObjectEnabled) {
+      storage.rename(srcKey, dstKey);
+    } else {
+      Tasks.foreach(0)
+          .throwFailureWhenFinished()
+          .retry(RENAME_RETRY_TIMES)
+          .revertWith(obj -> storage.delete(dstKey))
+          .run(obj -> {
+            try {
+              copyFile(srcKey, dstKey, fileSize);
+            } catch (IOException e) {
+              throw new UncheckedIOException(String.format("Failed to copy source file %s to dest file %s",
+                  srcKey, dstKey), e);
+            }
+          });
+
+      Tasks.foreach(0)
+          .throwFailureWhenFinished()
+          .retry(RENAME_RETRY_TIMES)
+          .run(obj -> storage.delete(srcKey));
+    }
+  }
+
+  private void copyFile(String srcKey, String dstKey, long srcSize) throws IOException {
+    long byteSizePerPart = conf.getLong(ConfKeys.MULTIPART_SIZE, ConfKeys.MULTIPART_SIZE_DEFAULT);
+    long multiPartCopyThreshold =
+        conf.getLong(ConfKeys.MULTIPART_COPY_THRESHOLD, ConfKeys.MULTIPART_COPY_THRESHOLD_DEFAULT);
+    if (srcSize > multiPartCopyThreshold) {
+      uploadPartCopy(srcKey, srcSize, dstKey, byteSizePerPart);
+    } else {
+      storage.copy(srcKey, dstKey);
+    }
+  }
+
+  private void uploadPartCopy(String srcKey, long srcSize, String dstKey, long byteSizePerPart) {
+    final MultipartUpload multipartUpload = storage.createMultipartUpload(dstKey);
+    try {
+      Preconditions.checkState(byteSizePerPart >= multipartUpload.minPartSize(),
+          "Configured upload part size %s must be greater than or equal to the minimal part size %s,"
+              + " please check the configure key %s.",
+          byteSizePerPart, multipartUpload.minPartSize(), ConfKeys.MULTIPART_SIZE);
+
+      AtomicInteger partNumGetter = new AtomicInteger(0);
+      List<CompletableFuture<Part>> results = Lists.newArrayList();
+      for (long start = 0, end; start < srcSize; start += byteSizePerPart) {
+        end = Math.min(start + byteSizePerPart, srcSize) - 1;
+        Preconditions.checkArgument(end >= 0, "Invalid copy range start: %s, end: %s", start, end);
+        // Submit upload part copy task to the thread pool.
+        CompletableFuture<Part> result = asyncUploadPartCopy(srcKey, multipartUpload,
+            partNumGetter.incrementAndGet(), start, end);
+        results.add(result);
+      }
+
+      // Waiting for all the upload parts to be finished.
+      List<Part> parts = results.stream()
+          .map(CompletableFuture::join)
+          .sorted(Comparator.comparing(Part::num))
+          .collect(Collectors.toList());
+
+      finishUpload(multipartUpload.key(), multipartUpload.uploadId(), parts);
+    } catch (Exception e) {
+      LOG.error("Encountering error when upload part copy", e);
+      CommonUtils.runQuietly(() -> storage.abortMultipartUpload(multipartUpload.key(), multipartUpload.uploadId()));
+      throw e;
+    }
+  }
+
+  protected void finishUpload(String key, String uploadId, List<Part> uploadParts) {
+    storage.completeUpload(key, uploadId, uploadParts);
+  }
+
+  private CompletableFuture<Part> asyncUploadPartCopy(
+      String srcKey, MultipartUpload multipartUpload, int partNum,
+      long copyRangeStart, long copyRangeEnd) {
+    return CompletableFuture.supplyAsync(() -> storage.uploadPartCopy(srcKey, multipartUpload.key(),
+            multipartUpload.uploadId(), partNum, copyRangeStart, copyRangeEnd), renamePool)
+        .whenComplete((part, err) -> {
+          if (err != null) {
+            LOG.error("Failed to upload part copy, src key: {}, multipartUpload: {}, partNum: {}, copy range start: " +
+                "{}, copy range end: {}", srcKey, multipartUpload, partNum, copyRangeStart, copyRangeEnd, err);
+          }
+        });
+  }
+
+  private void mkdir(String key) {
+    storage.put(key, new byte[0]);
+  }
+}
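
The part-range arithmetic in uploadPartCopy can be sanity-checked in isolation; a runnable sketch with illustrative sizes (a 20 MiB source and 8 MiB parts yield the inclusive ranges [0, 8Mi-1], [8Mi, 16Mi-1], [16Mi, 20Mi-1]):

```java
public class PartRangeSketch {
  public static void main(String[] args) {
    long srcSize = 20L << 20;         // illustrative source size
    long byteSizePerPart = 8L << 20;  // illustrative part size
    for (long start = 0, end; start < srcSize; start += byteSizePerPart) {
      end = Math.min(start + byteSizePerPart, srcSize) - 1;
      System.out.printf("part range [%d, %d]%n", start, end);
    }
  }
}
```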