
Integration of TOS: Add tos magic Committer.

lijinglun authored 7 months ago · commit 3f6662dd06
14 changed files with 1,860 additions and 0 deletions
  1. hadoop-cloud-storage-project/hadoop-tos/pom.xml (+17, -0)
  2. hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/CommitContext.java (+45, -0)
  3. hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/CommitUtils.java (+359, -0)
  4. hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/Committer.java (+499, -0)
  5. hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/Pending.java (+181, -0)
  6. hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/PendingSet.java (+123, -0)
  7. hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/SuccessData.java (+238, -0)
  8. hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/mapred/Committer.java (+184, -0)
  9. hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/ops/PendingOps.java (+43, -0)
  10. hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/ops/PendingOpsFactory.java (+40, -0)
  11. hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/ops/RawPendingOps.java (+54, -0)
  12. hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java (+6, -0)
  13. hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/JsonCodec.java (+46, -0)
  14. hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/Serializer.java (+25, -0)

+ 17 - 0
hadoop-cloud-storage-project/hadoop-tos/pom.xml

@@ -43,12 +43,29 @@
       <artifactId>hadoop-common</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
       <scope>test</scope>
       <type>test-jar</type>
     </dependency>
+    <!-- Artifacts needed to bring up a Mini MR Yarn cluster-->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-examples</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-minicluster</artifactId>
+      <scope>test</scope>
+    </dependency>
 
     <dependency>
       <groupId>com.volcengine</groupId>

+ 45 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/CommitContext.java

@@ -0,0 +1,45 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
+
+import java.util.List;
+
+public class CommitContext {
+  private final List<FileStatus> pendingSets;
+  // It may be accessed from multiple threads; always access it in a thread-safe manner.
+  private final List<String> destKeys;
+
+  public CommitContext(List<FileStatus> pendingSets) {
+    this.pendingSets = pendingSets;
+    this.destKeys = Lists.newArrayList();
+  }
+
+  public List<FileStatus> pendingSets() {
+    return pendingSets;
+  }
+
+  public synchronized void addDestKey(String destKey) {
+    destKeys.add(destKey);
+  }
+
+  public synchronized List<String> destKeys() {
+    return destKeys;
+  }
+}
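
Not part of this commit: a minimal sketch of how a commit phase might collect destination keys through CommitContext from several worker threads. The executor size and key names below are illustrative assumptions.

    import org.apache.hadoop.fs.tosfs.commit.CommitContext;

    import java.util.Collections;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class CommitContextSketch {
      public static void main(String[] args) throws InterruptedException {
        // No pending set files here; we only demonstrate the thread-safe key collection.
        CommitContext ctxt = new CommitContext(Collections.emptyList());

        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 8; i++) {
          final int task = i;
          // addDestKey is synchronized, so concurrent commit threads can record keys safely.
          pool.submit(() -> ctxt.addDestKey("path/to/table/part-" + task));
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);

        // destKeys() is also synchronized; it returns everything recorded above.
        System.out.println(ctxt.destKeys().size() + " keys committed");
      }
    }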

+ 359 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/CommitUtils.java

@@ -0,0 +1,359 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.tosfs.commit.mapred.Committer;
+import org.apache.hadoop.fs.tosfs.util.Serializer;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Supplier;
+
+public class CommitUtils {
+  private CommitUtils() {
+  }
+
+  public static final String COMMITTER_NAME = Committer.class.getName();
+
+  /**
+   * Supported schemes for the tos committer.
+   */
+  public static final String FS_STORAGE_OBJECT_SCHEME = "fs.object-storage.scheme";
+  public static final String DEFAULT_FS_STORAGE_OBJECT_SCHEME = "tos,oss,s3,s3a,s3n,obs,filestore";
+
+  /**
+   * Path for "magic" writes: path and {@link #PENDING_SUFFIX} files: {@value}.
+   */
+  public static final String MAGIC = "__magic";
+
+  /**
+   * Marker of the start of a directory tree for calculating the final path names: {@value}.
+   */
+  public static final String BASE = "__base";
+
+  /**
+   * Suffix applied to pending commit metadata: {@value}.
+   */
+  public static final String PENDING_SUFFIX = ".pending";
+
+  /**
+   * Suffix applied to multiple pending commit metadata: {@value}.
+   */
+  public static final String PENDINGSET_SUFFIX = ".pendingset";
+
+  /**
+   * Marker file to create on success: {@value}.
+   */
+  public static final String _SUCCESS = "_SUCCESS";
+
+  /**
+   * Format string used to build a summary file from a Job ID.
+   */
+  public static final String SUMMARY_FILENAME_FORMAT = "summary-%s.json";
+
+  /**
+   * Extra Data key for task attempt in pendingset files.
+   */
+  public static final String TASK_ATTEMPT_ID = "task.attempt.id";
+
+  /**
+   * The UUID for jobs: {@value}.
+   * This was historically created in Spark 1.x's SQL queries, see SPARK-33230.
+   */
+  public static final String SPARK_WRITE_UUID = "spark.sql.sources.writeJobUUID";
+
+  /**
+   * Get the magic location for the output path.
+   * Format: ${out}/__magic
+   *
+   * @param out the base output directory.
+   * @return the location of magic job attempts.
+   */
+  public static Path magicPath(Path out) {
+    return new Path(out, MAGIC);
+  }
+
+  /**
+   * Compute the "magic" path for a job. <br>
+   * Format: ${jobOutput}/__magic/${jobId}
+   *
+   * @param jobId     unique Job ID.
+   * @param jobOutput the final output directory.
+   * @return the path to store job attempt data.
+   */
+  public static Path magicJobPath(String jobId, Path jobOutput) {
+    return new Path(magicPath(jobOutput), jobId);
+  }
+
+  /**
+   * Get the Application Attempt ID for this job.
+   *
+   * @param context the context to look in
+   * @return the Application Attempt ID for a given job, or 0
+   */
+  public static int appAttemptId(JobContext context) {
+    return context.getConfiguration().getInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 0);
+  }
+
+  /**
+   * Compute the "magic" path for a job attempt. <br>
+   * Format: ${jobOutput}/__magic/${jobId}/${appAttemptId}
+   *
+   * @param jobId        unique Job ID.
+   * @param appAttemptId the ID of the application attempt for this job.
+   * @param jobOutput    the final output directory.
+   * @return the path to store job attempt data.
+   */
+  public static Path magicJobAttemptPath(String jobId, int appAttemptId, Path jobOutput) {
+    return new Path(magicPath(jobOutput), formatAppAttemptDir(jobId, appAttemptId));
+  }
+
+  /**
+   * Compute the "magic" path for a job attempt. <br>
+   * Format: ${jobOutput}/__magic/${jobId}/${appAttemptId}
+   *
+   * @param context      the context of the job.
+   * @param jobOutput    the final output directory.
+   * @return the path to store job attempt data.
+   */
+  public static Path magicJobAttemptPath(JobContext context, Path jobOutput) {
+    String jobId = buildJobId(context);
+    return magicJobAttemptPath(jobId, appAttemptId(context), jobOutput);
+  }
+
+  private static String formatAppAttemptDir(String jobId, int appAttemptId) {
+    return String.format("%s/%02d", jobId, appAttemptId);
+  }
+
+  /**
+   * Compute the path where the output of magic task attempts are stored. <br>
+   * Format: ${jobOutput}/__magic/${jobId}/${appAttemptId}/tasks
+   *
+   * @param jobId        unique Job ID.
+   * @param jobOutput    The output path to commit work into.
+   * @param appAttemptId the ID of the application attempt for this job.
+   * @return the path where the output of magic task attempts are stored.
+   */
+  public static Path magicTaskAttemptsPath(String jobId, Path jobOutput, int appAttemptId) {
+    return new Path(magicJobAttemptPath(jobId, appAttemptId, jobOutput), "tasks");
+  }
+
+  /**
+   * Compute the path where the output of a task attempt is stored until that task is committed.
+   * This path is marked as a base path for relocations, so subdirectory information is preserved.
+   * Format: ${jobOutput}/__magic/${jobId}/${appAttemptId}/tasks/${taskAttemptId}/__base
+   *
+   * @param context   the context of the task attempt.
+   * @param jobId     unique Job ID.
+   * @param jobOutput The output path to commit work into.
+   * @return the path where a task attempt should be stored.
+   */
+  public static Path magicTaskAttemptBasePath(TaskAttemptContext context, String jobId, Path jobOutput) {
+    return new Path(magicTaskAttemptPath(context, jobId, jobOutput), BASE);
+  }
+
+  /**
+   * Compute the path where the output of a task attempt is stored until that task is committed.
+   * This path is marked as a base path for relocations, so subdirectory information is preserved.
+   * Format: ${jobOutput}/__magic/${jobId}/${appAttemptId}/tasks/${taskAttemptId}/__base
+   *
+   * @param context   the context of the task attempt.
+   * @param jobOutput The output path to commit work into.
+   * @return the path where a task attempt should be stored.
+   */
+  public static Path magicTaskAttemptBasePath(TaskAttemptContext context, Path jobOutput) {
+    String jobId = buildJobId(context);
+    return magicTaskAttemptBasePath(context, jobId, jobOutput);
+  }
+
+  /**
+   * Get the magic task attempt path, without any annotations to mark relative references.
+   * If there is an app attempt property in the context configuration, that is included.
+   * Format: ${jobOutput}/__magic/${jobId}/${appAttemptId}/tasks/${taskAttemptId}
+   *
+   * @param context   the context of the task attempt.
+   * @param jobId     unique Job ID.
+   * @param jobOutput The output path to commit work into.
+   * @return the path under which all attempts go.
+   */
+  public static Path magicTaskAttemptPath(TaskAttemptContext context, String jobId, Path jobOutput) {
+    return new Path(
+        magicTaskAttemptsPath(jobId, jobOutput, appAttemptId(context)),
+        String.valueOf(context.getTaskAttemptID()));
+  }
+
+  /**
+   * Get the magic task attempt path, without any annotations to mark relative references.
+   * If there is an app attempt property in the context configuration, that is included.
+   * Format: ${jobOutput}/__magic/${jobId}/${appAttemptId}/tasks/${taskAttemptId}
+   *
+   * @param context   the context of the task attempt.
+   * @param jobOutput The output path to commit work into.
+   * @return the path under which all attempts go.
+   */
+  public static Path magicTaskAttemptPath(TaskAttemptContext context, Path jobOutput) {
+    String jobId = buildJobId(context);
+    return magicTaskAttemptPath(context, jobId, jobOutput);
+  }
+
+  /**
+   * Get the magic task pendingset path.
+   * Format: ${jobOutput}/__magic/${jobId}/${appAttemptId}/${taskId}.pendingset
+   *
+   * @param context   the context of the task attempt.
+   * @param jobOutput The output path to commit work into.
+   */
+  public static Path magicTaskPendingSetPath(TaskAttemptContext context, Path jobOutput) {
+    String taskId = String.valueOf(context.getTaskAttemptID().getTaskID());
+    return new Path(magicJobAttemptPath(context, jobOutput), String.format("%s%s", taskId, PENDINGSET_SUFFIX));
+  }
+
+  public static String buildJobId(Configuration conf, JobID jobId) {
+    String jobUUID = conf.getTrimmed(SPARK_WRITE_UUID, "");
+    if (!jobUUID.isEmpty()) {
+      if (jobUUID.startsWith(JobID.JOB)) {
+        return jobUUID;
+      } else {
+        return String.format("%s_%s", JobID.JOB, jobUUID);
+      }
+    }
+
+    // If no other option was supplied, return the job ID.
+    // This is exactly what MR jobs expect, but it is not safe for
+    // Spark jobs, where there is a risk of job ID collision.
+    return jobId != null ? jobId.toString() : "NULL_JOB_ID";
+  }
+
+  public static String buildJobId(JobContext context) {
+    return buildJobId(context.getConfiguration(), context.getJobID());
+  }
+
+  /**
+   * Get a job name; returns meaningful text if there is no name.
+   *
+   * @param context job context
+   * @return a string for logs
+   */
+  public static String jobName(JobContext context) {
+    String name = context.getJobName();
+    return (name != null && !name.isEmpty()) ? name : "(anonymous)";
+  }
+
+  /**
+   * Format: ${output}/_SUCCESS
+   */
+  public static Path successMarker(Path output) {
+    return new Path(output, _SUCCESS);
+  }
+
+  /**
+   * Format: ${reportDir}/summary-xxxxx.json
+   */
+  public static Path summaryReport(Path reportDir, String jobId) {
+    return new Path(reportDir, String.format(SUMMARY_FILENAME_FORMAT, jobId));
+  }
+
+  public static void save(FileSystem fs, Path path, byte[] data) throws IOException {
+    // By default, fs.create(path) creates the parent folders recursively and overwrites
+    // the file if it already exists.
+    try (FSDataOutputStream out = fs.create(path)) {
+      IOUtils.copy(new ByteArrayInputStream(data), out);
+    }
+  }
+
+  public static void save(FileSystem fs, Path path, Serializer instance) throws IOException {
+    save(fs, path, instance.serialize());
+  }
+
+  public static byte[] load(FileSystem fs, Path path) throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream();
+    try (FSDataInputStream in = fs.open(path)) {
+      IOUtils.copy(in, out);
+    }
+    return out.toByteArray();
+  }
+
+  public static List<FileStatus> listPendingFiles(FileSystem fs, Path dir) throws IOException {
+    List<FileStatus> pendingFiles = Lists.newArrayList();
+    CommitUtils.listFiles(fs, dir, true, f -> {
+      if (f.getPath().toString().endsWith(CommitUtils.PENDING_SUFFIX)) {
+        pendingFiles.add(f);
+      }
+    });
+    return pendingFiles;
+  }
+
+  public static void listFiles(FileSystem fs, Path dir, boolean recursive, FileVisitor visitor) throws IOException {
+    RemoteIterator<LocatedFileStatus> iter = fs.listFiles(dir, recursive);
+    while (iter.hasNext()) {
+      FileStatus f = iter.next();
+      visitor.visit(f);
+    }
+  }
+
+  public interface FileVisitor {
+    void visit(FileStatus f);
+  }
+
+  public static boolean supportProtonCommit(Configuration conf, Path outputPath) {
+    return supportSchemes(conf).contains(outputPath.toUri().getScheme());
+  }
+
+  private static List<String> supportSchemes(Configuration conf) {
+    String schemes = conf.get(FS_STORAGE_OBJECT_SCHEME, DEFAULT_FS_STORAGE_OBJECT_SCHEME);
+    Preconditions.checkNotNull(schemes, "%s cannot be null", FS_STORAGE_OBJECT_SCHEME);
+    return Arrays.asList(schemes.split(","));
+  }
+
+  private static Set<String> errorStage = new HashSet<>();
+  private static boolean testMode = false;
+
+  public static void injectError(String stage) {
+    errorStage.add(stage);
+    testMode = true;
+  }
+
+  public static void removeError(String stage) {
+    errorStage.remove(stage);
+  }
+
+  public static <T extends Exception> void triggerError(Supplier<T> error, String stage) throws T {
+    if (testMode && errorStage.contains(stage)) {
+      throw error.get();
+    }
+  }
+}
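
Not part of this commit: a small sketch that composes the magic directory layout with the static helpers above. The bucket, job ID, attempt number, and report directory are made-up values.

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.tosfs.commit.CommitUtils;

    public class MagicLayoutSketch {
      public static void main(String[] args) {
        Path out = new Path("tos://bucket/warehouse/table");
        String jobId = "job_1700000000000_0001";   // illustrative job ID
        int appAttemptId = 0;

        // ${out}/__magic
        System.out.println(CommitUtils.magicPath(out));
        // ${out}/__magic/${jobId}
        System.out.println(CommitUtils.magicJobPath(jobId, out));
        // ${out}/__magic/${jobId}/00
        System.out.println(CommitUtils.magicJobAttemptPath(jobId, appAttemptId, out));
        // ${out}/__magic/${jobId}/00/tasks
        System.out.println(CommitUtils.magicTaskAttemptsPath(jobId, out, appAttemptId));
        // _SUCCESS marker and summary report locations
        System.out.println(CommitUtils.successMarker(out));
        System.out.println(CommitUtils.summaryReport(new Path("/tmp/reports"), jobId));
      }
    }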

+ 499 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/Committer.java

@@ -0,0 +1,499 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.tosfs.commit.ops.PendingOps;
+import org.apache.hadoop.fs.tosfs.commit.ops.PendingOpsFactory;
+import org.apache.hadoop.fs.tosfs.common.Tasks;
+import org.apache.hadoop.fs.tosfs.common.ThreadPools;
+import org.apache.hadoop.fs.tosfs.object.MultipartUpload;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory;
+import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
+import org.apache.hadoop.fs.tosfs.util.CommonUtils;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.thirdparty.com.google.common.base.MoreObjects;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+
+public class Committer extends PathOutputCommitter {
+  private static final Logger LOG = LoggerFactory.getLogger(Committer.class);
+
+  public static final String COMMITTER_THREADS = "fs.job.committer.threads";
+  public static final String COMMITTER_SUMMARY_REPORT_DIR = "fs.job.committer.summary.report.directory";
+  public static final int DEFAULT_COMMITTER_THREADS = Runtime.getRuntime().availableProcessors();
+  public static final String THREADS_PREFIX = "job-committer-thread-pool";
+
+  private final String jobId;
+  private final Path outputPath;
+  // This is the directory for all intermediate work, where the output format will write data.
+  // This may not be on the final file system
+  private Path workPath;
+  private final String role;
+  private final Configuration conf;
+  private final FileSystem destFs;
+  private final ObjectStorage storage;
+  private final PendingOps ops;
+
+  public Committer(Path outputPath, TaskAttemptContext context) throws IOException {
+    this(outputPath, context, String.format("Task committer %s", context.getTaskAttemptID()));
+    this.workPath = CommitUtils.magicTaskAttemptBasePath(context, outputPath);
+    LOG.info("Task attempt {} has work path {}", context.getTaskAttemptID(), getWorkPath());
+  }
+
+  public Committer(Path outputPath, JobContext context) throws IOException {
+    this(outputPath, context, String.format("Job committer %s", context.getJobID()));
+  }
+
+  private Committer(Path outputPath, JobContext context, String role) throws IOException {
+    super(outputPath, context);
+    this.jobId = CommitUtils.buildJobId(context);
+    this.outputPath = outputPath;
+    this.role = role;
+    this.conf = context.getConfiguration();
+    this.destFs = outputPath.getFileSystem(conf);
+    LOG.info("{} instantiated for job '{}' ID {} with destination {}",
+        role,
+        CommitUtils.jobName(context),
+        jobId, outputPath);
+    // Initialize the object storage.
+    this.storage = ObjectStorageFactory.create(outputPath.toUri().getScheme(), outputPath.toUri().getAuthority(), conf);
+    this.ops = PendingOpsFactory.create(destFs, storage);
+  }
+
+  @Override
+  public Path getOutputPath() {
+    return outputPath;
+  }
+
+  @Override
+  public Path getWorkPath() {
+    return workPath;
+  }
+
+  @Override
+  public void setupJob(JobContext context) throws IOException {
+    checkJobId(context);
+    LOG.info("Setup Job {}", jobId);
+    Path jobOutput = getOutputPath();
+
+    // delete the success marker if it exists.
+    destFs.delete(CommitUtils.successMarker(jobOutput), false);
+
+    // create the destination directory.
+    destFs.mkdirs(jobOutput);
+
+    logUncompletedMPUIfPresent(jobOutput);
+
+    // Reset the job path, and create the job path with job attempt sub path.
+    Path jobPath = CommitUtils.magicJobPath(jobId, outputPath);
+    Path jobAttemptPath = CommitUtils.magicJobAttemptPath(context, outputPath);
+    destFs.delete(jobPath, true);
+    destFs.mkdirs(jobAttemptPath);
+  }
+
+  private void logUncompletedMPUIfPresent(Path jobOutput) {
+    // do a scan and log a warning for each active (uncompleted) upload.
+    int nums = 0;
+    for (MultipartUpload upload : storage.listUploads(ObjectUtils.pathToKey(jobOutput, true))) {
+      if (nums++ > 10) {
+        LOG.warn("There are more than 10 uncompleted multipart uploads under path {}.", jobOutput);
+        break;
+      }
+      LOG.warn("Uncompleted multipart upload {} is under path {}, either jobs are running concurrently "
+          + "or failed jobs are not being cleaned up.", upload, jobOutput);
+    }
+  }
+
+  @Override
+  public void commitJob(JobContext context) throws IOException {
+    checkJobId(context);
+    LOG.info("{}: committing job {}", role, jobId);
+    String stage = null;
+    Exception failure = null;
+    SuccessData successData = null;
+
+    ExecutorService threadPool = ThreadPools.newWorkerPool(THREADS_PREFIX, commitThreads());
+    List<FileStatus> pendingSets = Lists.newArrayList();
+    try {
+      // Step.1 List active pending commits.
+      stage = "preparing";
+      CommitUtils.listFiles(destFs, CommitUtils.magicJobAttemptPath(context, outputPath), true, f -> {
+        if (f.getPath().toString().endsWith(CommitUtils.PENDINGSET_SUFFIX)) {
+          pendingSets.add(f);
+        }
+      });
+
+      // Step.2 Load and commit those active pending commits.
+      stage = "commit";
+      CommitContext commitCtxt = new CommitContext(pendingSets);
+      loadAndCommitPendingSets(threadPool, commitCtxt);
+
+      // Step.3 Save the success marker.
+      stage = "marker";
+      successData = createSuccessData(commitCtxt.destKeys());
+      CommitUtils.triggerError(() -> new IOException("Mock error of success marker."), stage);
+      CommitUtils.save(destFs, CommitUtils.successMarker(outputPath), successData);
+
+      // Step.4 Abort those orphan multipart uploads and cleanup the staging dir.
+      stage = "clean";
+      cleanup(threadPool, true);
+    } catch (Exception e) {
+      failure = e;
+      LOG.warn("Commit failure for job {} stage {}", CommitUtils.buildJobId(context), stage, e);
+
+      // Revert all pending sets when marker step fails.
+      if (stage.equals("marker")) {
+        CommonUtils.runQuietly(
+            () -> loadAndRevertPendingSets(threadPool, new CommitContext(pendingSets)));
+      }
+      CommonUtils.runQuietly(() -> cleanup(threadPool, true));
+      throw e;
+    } finally {
+      saveSummaryReportQuietly(stage, context, successData, failure);
+      CommonUtils.runQuietly(threadPool::shutdown);
+
+      cleanupResources();
+    }
+  }
+
+  private SuccessData createSuccessData(Iterable<String> filenames) {
+    SuccessData data = SuccessData.builder()
+        .setName(SuccessData.class.getName())
+        .setCommitter(CommitUtils.COMMITTER_NAME)
+        .setTimestamp(System.currentTimeMillis())
+        .setHostname(NetUtils.getHostname())
+        .setDescription(role)
+        .setJobId(jobId)
+        .addFileNames(filenames)
+        .build();
+
+    data.addDiagnosticInfo(COMMITTER_THREADS, Integer.toString(commitThreads()));
+    return data;
+  }
+
+  private void saveSummaryReportQuietly(String activeStage, JobContext context, SuccessData report, Throwable thrown) {
+    Configuration jobConf = context.getConfiguration();
+    String reportDir = jobConf.get(COMMITTER_SUMMARY_REPORT_DIR, "");
+    if (reportDir.isEmpty()) {
+      LOG.debug("Summary directory conf: {} is not set", COMMITTER_SUMMARY_REPORT_DIR);
+      return;
+    }
+
+    Path path = CommitUtils.summaryReport(new Path(reportDir), jobId);
+    LOG.debug("Summary report path is {}", path);
+
+    try {
+      if (report == null) {
+        report = createSuccessData(null);
+      }
+      if (thrown != null) {
+        report.recordJobFailure(thrown);
+      }
+      report.addDiagnosticInfo("stage", activeStage);
+
+      CommitUtils.save(path.getFileSystem(jobConf), path, report);
+      LOG.info("Job summary saved to {}", path);
+    } catch (Exception e) {
+      LOG.warn("Failed to save summary to {}", path, e);
+    }
+  }
+
+  private void loadAndCommitPendingSets(ExecutorService outerPool, CommitContext commitContext) {
+    ExecutorService innerPool = ThreadPools.newWorkerPool("commit-pending-files-pool", commitThreads());
+    try {
+      Tasks.foreach(commitContext.pendingSets())
+          .stopOnFailure()
+          .throwFailureWhenFinished()
+          .executeWith(outerPool)
+          .abortWith(pendingSet -> loadAndAbort(innerPool, pendingSet))
+          .revertWith(pendingSet -> loadAndRevert(innerPool, pendingSet))
+          .run(pendingSet -> loadAndCommit(commitContext, innerPool, pendingSet));
+    } finally {
+      CommonUtils.runQuietly(innerPool::shutdown);
+    }
+  }
+
+  private void loadAndRevertPendingSets(ExecutorService outerPool, CommitContext commitContext) {
+    Tasks.foreach(commitContext.pendingSets())
+        .throwFailureWhenFinished()
+        .executeWith(outerPool)
+        .run(pendingSet -> loadAndRevert(outerPool, pendingSet));
+  }
+
+  /**
+   * Load {@link PendingSet} from file and abort those {@link Pending} commits.
+   */
+  private void loadAndAbort(ExecutorService pool, FileStatus pendingSetFile) {
+    PendingSet pendingSet = PendingSet.deserialize(destFs, pendingSetFile);
+    Tasks.foreach(pendingSet.commits())
+        .suppressFailureWhenFinished()
+        .executeWith(pool)
+        .run(ops::abort);
+  }
+
+  /**
+   * Load {@link PendingSet} from file and revert those {@link Pending} commits.
+   */
+  private void loadAndRevert(ExecutorService pool, FileStatus pendingSetFile) {
+    PendingSet pendingSet = PendingSet.deserialize(destFs, pendingSetFile);
+    Tasks.foreach(pendingSet.commits())
+        .suppressFailureWhenFinished()
+        .executeWith(pool)
+        .run(ops::revert);
+  }
+
+  /**
+   * Load {@link PendingSet} from file and commit those {@link Pending} commits.
+   */
+  private void loadAndCommit(CommitContext commitCtxt, ExecutorService pool, FileStatus pendingSetFile) {
+    PendingSet pendingSet = PendingSet.deserialize(destFs, pendingSetFile);
+    // Verify whether the job ID in the pending set matches this committer's job ID.
+    String jobId = pendingSet.jobId();
+    if (StringUtils.isNoneEmpty(jobId) && !Objects.equals(jobId, jobId())) {
+      throw new IllegalStateException(String.format("Mismatch in Job ID (%s) and commit job ID (%s)", jobId(), jobId));
+    }
+
+    Tasks.foreach(pendingSet.commits())
+        .stopOnFailure()
+        .throwFailureWhenFinished()
+        .executeWith(pool)
+        .onFailure((pending, exception) -> ops.abort(pending))
+        .abortWith(ops::abort)
+        .revertWith(ops::revert)
+        .run(pending -> {
+          ops.commit(pending);
+          commitCtxt.addDestKey(pending.destKey());
+        });
+  }
+
+  @Override
+  public void abortJob(JobContext context, JobStatus.State state) {
+    checkJobId(context);
+    LOG.info("{}: aborting job {} in state {}", role, jobId, state);
+    ExecutorService service = ThreadPools.newWorkerPool(THREADS_PREFIX, commitThreads());
+    try {
+      cleanup(service, false);
+    } finally {
+      service.shutdown();
+
+      cleanupResources();
+    }
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext context) throws IOException {
+    checkJobId(context);
+    LOG.info("Setup Task {}", context.getTaskAttemptID());
+    Path taskAttemptBasePath = CommitUtils.magicTaskAttemptBasePath(context, outputPath);
+    // Delete the task attempt path if somehow it was there.
+    destFs.delete(taskAttemptBasePath, true);
+    // Make an empty directory.
+    destFs.mkdirs(taskAttemptBasePath);
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext taskContext) {
+    return true;
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext context) throws IOException {
+    checkJobId(context);
+    LOG.info("Commit task {}", context);
+    ExecutorService pool = ThreadPools.newWorkerPool(THREADS_PREFIX, commitThreads());
+    try {
+      PendingSet commits = innerCommitTask(pool, context);
+      LOG.info("Task {} committed {} files", context.getTaskAttemptID(), commits.size());
+    } catch (IOException e) {
+      LOG.error("Failed to commit task {}", context.getTaskAttemptID(), e);
+      throw e;
+    } finally {
+      // Shutdown the thread pool quietly.
+      CommonUtils.runQuietly(pool::shutdown);
+
+      // Delete the task attempt path quietly.
+      Path taskAttemptPath = CommitUtils.magicTaskAttemptPath(context, outputPath);
+      LOG.info("Delete task attempt path {}", taskAttemptPath);
+      CommonUtils.runQuietly(() -> destFs.delete(taskAttemptPath, true));
+    }
+  }
+
+  private PendingSet innerCommitTask(ExecutorService pool, TaskAttemptContext context) throws IOException {
+    Path taskAttemptBasePath = CommitUtils.magicTaskAttemptBasePath(context, outputPath);
+    PendingSet pendingSet = new PendingSet(jobId);
+    try {
+      // Load the pending files and fill them into the pending set.
+      List<FileStatus> pendingFiles = CommitUtils.listPendingFiles(destFs, taskAttemptBasePath);
+      // Use the thread-safe collection to collect the pending list.
+      List<Pending> pendings = Collections.synchronizedList(Lists.newArrayList());
+      Tasks.foreach(pendingFiles)
+          .throwFailureWhenFinished()
+          .executeWith(pool)
+          .run(f -> {
+            try {
+              byte[] data = CommitUtils.load(destFs, f.getPath());
+              pendings.add(Pending.deserialize(data));
+            } catch (IOException e) {
+              LOG.warn("Failed to load .pending file {}", f.getPath(), e);
+              throw new UncheckedIOException(e);
+            }
+          });
+      pendingSet.addAll(pendings);
+
+      // Add the extra task attempt id property.
+      String taskId = String.valueOf(context.getTaskAttemptID());
+      pendingSet.addExtraData(CommitUtils.TASK_ATTEMPT_ID, taskId);
+
+      // Save the pending set to file system.
+      Path taskOutput = CommitUtils.magicTaskPendingSetPath(context, outputPath);
+      LOG.info("Saving work of {} to {}", taskId, taskOutput);
+      CommitUtils.save(destFs, taskOutput, pendingSet.serialize());
+
+    } catch (Exception e) {
+      LOG.error("Encounter error when loading pending set from {}", taskAttemptBasePath, e);
+      if (!pendingSet.commits().isEmpty()) {
+        Tasks.foreach(pendingSet.commits())
+            .executeWith(pool)
+            .suppressFailureWhenFinished()
+            .run(ops::abort);
+      }
+      throw e;
+    }
+
+    return pendingSet;
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext context) throws IOException {
+    checkJobId(context);
+    Path taskAttemptBasePath = CommitUtils.magicTaskAttemptBasePath(context, outputPath);
+    try {
+      // Load the pending files from the underlying filesystem.
+      List<FileStatus> pendingFiles = CommitUtils.listPendingFiles(destFs, taskAttemptBasePath);
+      Tasks.foreach(pendingFiles)
+          .throwFailureWhenFinished()
+          .run(f -> {
+            try {
+              byte[] serializedData = CommitUtils.load(destFs, f.getPath());
+              ops.abort(Pending.deserialize(serializedData));
+            } catch (FileNotFoundException e) {
+              LOG.debug("Listed file already deleted: {}", f);
+            } catch (IOException e) {
+              throw new UncheckedIOException(e);
+            } finally {
+              final FileStatus pendingFile = f;
+              CommonUtils.runQuietly(() -> destFs.delete(pendingFile.getPath(), false));
+            }
+          });
+    } finally {
+      CommonUtils.runQuietly(() -> destFs.delete(taskAttemptBasePath, true));
+    }
+  }
+
+  @Override
+  public void recoverTask(TaskAttemptContext context) {
+    checkJobId(context);
+    String taskId = context.getTaskAttemptID().toString();
+    throw new UnsupportedOperationException(String.format("Unable to recover task %s, output: %s", taskId, outputPath));
+  }
+
+  private int commitThreads() {
+    return conf.getInt(COMMITTER_THREADS, DEFAULT_COMMITTER_THREADS);
+  }
+
+  private void cleanup(ExecutorService pool, boolean suppress) {
+    LOG.info("Cleanup the job by abort the multipart uploads and clean staging dir, suppress {}", suppress);
+    try {
+      Path jobOutput = getOutputPath();
+      Iterable<MultipartUpload> pending =
+          storage.listUploads(ObjectUtils.pathToKey(CommitUtils.magicJobPath(jobId, jobOutput), true));
+      Tasks.foreach(pending)
+          .executeWith(pool)
+          .suppressFailureWhenFinished()
+          .run(u -> storage.abortMultipartUpload(u.key(), u.uploadId()));
+    } catch (Exception e) {
+      if (suppress) {
+        LOG.error("The following exception has been suppressed when cleanup job", e);
+      } else {
+        throw e;
+      }
+    } finally {
+      CommonUtils.runQuietly(this::cleanupStagingDir);
+    }
+  }
+
+  private void cleanupStagingDir() throws IOException {
+    // Note: different jobs share the same __magic folder, e.g.
+    // tos://bucket/path/to/table/__magic/job-A/..., and
+    // tos://bucket/path/to/table/__magic/job-B/...
+    // A job should only delete its own job folder to avoid breaking other jobs,
+    // and the __magic folder itself should be deleted by the last job.
+    // This design is not strictly safe against races: one job may find that no other
+    // job is running and start deleting __magic while another job visits it at the
+    // same time. We consider the probability low and do not handle that case.
+    destFs.delete(CommitUtils.magicJobPath(jobId, outputPath), true);
+    Path magicPath = CommitUtils.magicPath(outputPath);
+    if (destFs.listStatus(magicPath).length == 0) {
+      destFs.delete(magicPath, true);
+    }
+  }
+
+  public String jobId() {
+    return jobId;
+  }
+
+  private void checkJobId(JobContext context) {
+    String jobIdInContext = CommitUtils.buildJobId(context);
+    Preconditions.checkArgument(Objects.equals(jobId, jobIdInContext),
+        String.format("JobId set in the context: %s is not consistent with the initial jobId of the committer: %s, "
+            + "please check you settings in your taskAttemptContext.", jobIdInContext, jobId));
+  }
+
+  private void cleanupResources() {
+    CommonUtils.runQuietly(storage::close);
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("role", role)
+        .add("jobId", jobId)
+        .add("outputPath", outputPath)
+        .toString();
+  }
+}
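
Not part of this commit: a hedged sketch of the job-side configuration keys introduced above. The key names and default scheme list are taken from this diff; the values (thread count, report directory) are illustrative assumptions.

    import org.apache.hadoop.conf.Configuration;

    public class CommitterConfSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Number of threads used by the job/task commit phases
        // (defaults to the number of available processors).
        conf.setInt("fs.job.committer.threads", 8);

        // Optional directory for the summary-<jobId>.json report written in commitJob().
        conf.set("fs.job.committer.summary.report.directory", "tos://bucket/tmp/commit-reports");

        // Schemes for which the magic committer is allowed; "tos" is already in the default list.
        conf.set("fs.object-storage.scheme", "tos,oss,s3,s3a,s3n,obs,filestore");
      }
    }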

+ 181 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/Pending.java

@@ -0,0 +1,181 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.tosfs.object.Part;
+import org.apache.hadoop.fs.tosfs.util.JsonCodec;
+import org.apache.hadoop.fs.tosfs.util.Serializer;
+import org.apache.hadoop.thirdparty.com.google.common.base.MoreObjects;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Metadata that is serialized as JSON and saved in the .pending files.
+ */
+public class Pending implements Serializer {
+  private static final JsonCodec<Pending> CODEC = new JsonCodec<>(Pending.class);
+
+  private String bucket;
+  private String destKey;
+  private String uploadId;
+  private long length;
+  private long createdTimestamp;
+  private List<Part> parts;
+
+  // No-arg constructor for json serializer, don't use.
+  public Pending() {
+  }
+
+  public Pending(
+      String bucket, String destKey,
+      String uploadId, long length,
+      long createdTimestamp, List<Part> parts) {
+    this.bucket = bucket;
+    this.destKey = destKey;
+    this.uploadId = uploadId;
+    this.length = length;
+    this.createdTimestamp = createdTimestamp;
+    this.parts = parts;
+  }
+
+  public String bucket() {
+    return bucket;
+  }
+
+  public String destKey() {
+    return destKey;
+  }
+
+  public String uploadId() {
+    return uploadId;
+  }
+
+  public long length() {
+    return length;
+  }
+
+  public long createdTimestamp() {
+    return createdTimestamp;
+  }
+
+  public List<Part> parts() {
+    return parts;
+  }
+
+  @Override
+  public byte[] serialize() throws IOException {
+    return CODEC.toBytes(this);
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("bucket", bucket)
+        .add("destKey", destKey)
+        .add("uploadId", uploadId)
+        .add("length", length)
+        .add("createdTimestamp", createdTimestamp)
+        .add("uploadParts", StringUtils.join(parts, ","))
+        .toString();
+  }
+
+  public static Pending deserialize(byte[] data) throws IOException {
+    return CODEC.fromBytes(data);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(bucket, destKey, uploadId, length, createdTimestamp, parts);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    } else if (!(o instanceof Pending)) {
+      return false;
+    }
+    Pending that = (Pending) o;
+    return Objects.equals(bucket, that.bucket)
+        && Objects.equals(destKey, that.destKey)
+        && Objects.equals(uploadId, that.uploadId)
+        && Objects.equals(length, that.length)
+        && Objects.equals(createdTimestamp, that.createdTimestamp)
+        && Objects.equals(parts, that.parts);
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+    private String bucket;
+    private String destKey;
+    private String uploadId;
+    private long length;
+    private long createdTimestamp;
+    private final List<Part> parts = Lists.newArrayList();
+
+    public Builder setBucket(String bucket) {
+      this.bucket = bucket;
+      return this;
+    }
+
+    public Builder setDestKey(String destKey) {
+      this.destKey = destKey;
+      return this;
+    }
+
+    public Builder setUploadId(String uploadId) {
+      this.uploadId = uploadId;
+      return this;
+    }
+
+    public Builder setLength(long length) {
+      this.length = length;
+      return this;
+    }
+
+    public Builder setCreatedTimestamp(long createdTimestamp) {
+      this.createdTimestamp = createdTimestamp;
+      return this;
+    }
+
+    public Builder addParts(List<Part> parts) {
+      this.parts.addAll(parts);
+      return this;
+    }
+
+    public Pending build() {
+      Preconditions.checkArgument(StringUtils.isNoneEmpty(bucket), "Empty bucket");
+      Preconditions.checkArgument(StringUtils.isNoneEmpty(destKey), "Empty object destination key");
+      Preconditions.checkArgument(StringUtils.isNoneEmpty(uploadId), "Empty uploadId");
+      Preconditions.checkArgument(length >= 0, "Invalid length: %s", length);
+      parts.forEach(part -> Preconditions.checkArgument(StringUtils.isNoneEmpty(part.eTag()), "Empty etag"));
+
+      return new Pending(
+          bucket, destKey,
+          uploadId, length,
+          createdTimestamp, parts);
+    }
+  }
+}
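
Not part of this commit: a minimal round-trip sketch of the Pending builder and its JSON serialization. The bucket, key, upload ID, and length are illustrative; in practice they come from the in-flight multipart upload.

    import org.apache.hadoop.fs.tosfs.commit.Pending;

    import java.io.IOException;
    import java.util.Collections;

    public class PendingSketch {
      public static void main(String[] args) throws IOException {
        Pending pending = Pending.builder()
            .setBucket("bucket")                              // illustrative values
            .setDestKey("warehouse/table/part-00000")
            .setUploadId("example-upload-id")
            .setLength(1024L)
            .setCreatedTimestamp(System.currentTimeMillis())
            .addParts(Collections.emptyList())
            .build();

        // The committer stores this JSON in a .pending file and reads it back later.
        byte[] json = pending.serialize();
        Pending copy = Pending.deserialize(json);
        System.out.println(copy.equals(pending));
      }
    }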

+ 123 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/PendingSet.java

@@ -0,0 +1,123 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.tosfs.util.JsonCodec;
+import org.apache.hadoop.fs.tosfs.util.Serializer;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class PendingSet implements Serializer {
+  private static final JsonCodec<PendingSet> CODEC = new JsonCodec<>(PendingSet.class);
+
+  private String jobId;
+  private List<Pending> pendings;
+  private Map<String, String> extraData;
+
+  // No-arg constructor for json serializer, don't use.
+  public PendingSet() {
+  }
+
+  public PendingSet(String jobId) {
+    this(jobId, Lists.newArrayList());
+  }
+
+  public PendingSet(String jobId, List<Pending> pendings) {
+    this.jobId = jobId;
+    this.pendings = Lists.newArrayList(pendings);
+    this.extraData = Maps.newHashMap();
+  }
+
+  public PendingSet addAll(Iterable<Pending> items) {
+    Iterables.addAll(pendings, items);
+    return this;
+  }
+
+  public PendingSet add(Pending pending) {
+    pendings.add(pending);
+    return this;
+  }
+
+  public PendingSet addExtraData(String key, String val) {
+    extraData.put(key, val);
+    return this;
+  }
+
+  public String jobId() {
+    return jobId;
+  }
+
+  public List<Pending> commits() {
+    return pendings;
+  }
+
+  public Map<String, String> extraData() {
+    return extraData;
+  }
+
+  public int size() {
+    return pendings.size();
+  }
+
+  @Override
+  public byte[] serialize() throws IOException {
+    return CODEC.toBytes(this);
+  }
+
+  public static PendingSet deserialize(byte[] data) {
+    try {
+      return CODEC.fromBytes(data);
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  public static PendingSet deserialize(FileSystem fs, FileStatus f) {
+    try {
+      return deserialize(CommitUtils.load(fs, f.getPath()));
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(jobId, pendings, extraData);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    } else if (!(o instanceof PendingSet)) {
+      return false;
+    }
+    PendingSet that = (PendingSet) o;
+    return Objects.equals(jobId, that.jobId)
+        && Objects.equals(pendings, that.pendings)
+        && Objects.equals(extraData, that.extraData);
+  }
+}
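
Not part of this commit: a short sketch of how a task-side PendingSet is assembled, tagged with the task attempt ID, and round-tripped; the job and attempt IDs are illustrative.

    import org.apache.hadoop.fs.tosfs.commit.CommitUtils;
    import org.apache.hadoop.fs.tosfs.commit.Pending;
    import org.apache.hadoop.fs.tosfs.commit.PendingSet;

    import java.io.IOException;
    import java.util.Collections;

    public class PendingSetSketch {
      public static void main(String[] args) throws IOException {
        String jobId = "job_1700000000000_0001";              // illustrative
        PendingSet pendingSet = new PendingSet(jobId);

        // A task commit collects its .pending entries and tags the set with its attempt ID.
        pendingSet.addAll(Collections.<Pending>emptyList());
        pendingSet.addExtraData(CommitUtils.TASK_ATTEMPT_ID,
            "attempt_1700000000000_0001_m_000000_0");

        // This is the payload written to <taskId>.pendingset and read back in commitJob().
        byte[] json = pendingSet.serialize();
        PendingSet copy = PendingSet.deserialize(json);
        System.out.println(copy.jobId() + " -> " + copy.size() + " commits");
      }
    }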

+ 238 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/SuccessData.java

@@ -0,0 +1,238 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit;
+
+import org.apache.hadoop.fs.tosfs.util.JsonCodec;
+import org.apache.hadoop.fs.tosfs.util.Serializer;
+import org.apache.hadoop.thirdparty.com.google.common.base.MoreObjects;
+import org.apache.hadoop.thirdparty.com.google.common.base.Throwables;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+public class SuccessData implements Serializer {
+  private static final JsonCodec<SuccessData> CODEC = new JsonCodec<>(SuccessData.class);
+
+  private String name;
+  private boolean success = true;
+  private long timestamp;
+  private String date;
+  private String hostname;
+  private String committer;
+  private String description;
+  private String jobId;
+  // Filenames in the commit.
+  private final List<String> filenames = new ArrayList<>();
+
+  // Diagnostics information.
+  private final Map<String, String> diagnostics = new HashMap<>();
+
+  // No-arg constructor for json serializer, don't use.
+  public SuccessData() {
+  }
+
+  public SuccessData(
+      String name, boolean success, long timestamp,
+      String date, String hostname, String committer,
+      String description, String jobId, List<String> filenames) {
+    this.name = name;
+    this.success = success;
+    this.timestamp = timestamp;
+    this.date = date;
+    this.hostname = hostname;
+    this.committer = committer;
+    this.description = description;
+    this.jobId = jobId;
+    this.filenames.addAll(filenames);
+  }
+
+  public String name() {
+    return name;
+  }
+
+  public boolean success() {
+    return success;
+  }
+
+  public long timestamp() {
+    return timestamp;
+  }
+
+  public String date() {
+    return date;
+  }
+
+  public String hostname() {
+    return hostname;
+  }
+
+  public String committer() {
+    return committer;
+  }
+
+  public String description() {
+    return description;
+  }
+
+  public String jobId() {
+    return jobId;
+  }
+
+  public Map<String, String> diagnostics() {
+    return diagnostics;
+  }
+
+  public List<String> filenames() {
+    return filenames;
+  }
+
+  public void recordJobFailure(Throwable thrown) {
+    this.success = false;
+    String stacktrace = Throwables.getStackTraceAsString(thrown);
+    addDiagnosticInfo("exception", thrown.toString());
+    addDiagnosticInfo("stacktrace", stacktrace);
+  }
+
+  public void addDiagnosticInfo(String key, String value) {
+    diagnostics.put(key, value);
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("name", name)
+        .add("success", success)
+        .add("timestamp", timestamp)
+        .add("date", date)
+        .add("hostname", hostname)
+        .add("committer", committer)
+        .add("description", description)
+        .add("jobId", jobId)
+        .add("filenames", StringUtils.join(",", filenames))
+        .toString();
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  @Override
+  public byte[] serialize() throws IOException {
+    return CODEC.toBytes(this);
+  }
+
+  public static SuccessData deserialize(byte[] data) throws IOException {
+    return CODEC.fromBytes(data);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(name, success, timestamp, date, hostname, committer, description, jobId, filenames);
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    } else if (!(o instanceof SuccessData)) {
+      return false;
+    }
+    SuccessData that = (SuccessData) o;
+    return Objects.equals(name, that.name)
+        && Objects.equals(success, that.success)
+        && Objects.equals(timestamp, that.timestamp)
+        && Objects.equals(date, that.date)
+        && Objects.equals(hostname, that.hostname)
+        && Objects.equals(committer, that.committer)
+        && Objects.equals(description, that.description)
+        && Objects.equals(jobId, that.jobId)
+        && Objects.equals(filenames, that.filenames);
+  }
+
+  public static class Builder {
+    private String name = SuccessData.class.getName();
+    private boolean success = true;
+    private long timestamp;
+    private String date;
+    private String hostname;
+    private String committer;
+    private String description;
+    private String jobId;
+    private final List<String> filenames = Lists.newArrayList();
+
+    public Builder setName(String name) {
+      this.name = name;
+      return this;
+    }
+
+    public Builder setSuccess(boolean success) {
+      this.success = success;
+      return this;
+    }
+
+    public Builder setTimestamp(long timestamp) {
+      this.timestamp = timestamp;
+      return this;
+    }
+
+    public Builder setDate(String date) {
+      this.date = date;
+      return this;
+    }
+
+    public Builder setHostname(String hostname) {
+      this.hostname = hostname;
+      return this;
+    }
+
+    public Builder setCommitter(String committer) {
+      this.committer = committer;
+      return this;
+    }
+
+    public Builder setDescription(String description) {
+      this.description = description;
+      return this;
+    }
+
+    public Builder setJobId(String jobId) {
+      this.jobId = jobId;
+      return this;
+    }
+
+    public Builder addFileNames(Iterable<String> newFileNames) {
+      if (newFileNames != null) {
+        Iterables.addAll(this.filenames, newFileNames);
+      }
+      return this;
+    }
+
+    public SuccessData build() {
+      return new SuccessData(
+          name, success, timestamp,
+          date, hostname, committer,
+          description, jobId, filenames);
+    }
+  }
+}
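
Not part of this commit: a sketch of reading back the _SUCCESS marker that commitJob() writes. The output path is an illustrative assumption; any filesystem reachable from the Configuration works.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.tosfs.commit.CommitUtils;
    import org.apache.hadoop.fs.tosfs.commit.SuccessData;

    import java.io.IOException;

    public class SuccessMarkerSketch {
      public static void main(String[] args) throws IOException {
        Path output = new Path("tos://bucket/warehouse/table");   // illustrative
        Configuration conf = new Configuration();
        FileSystem fs = output.getFileSystem(conf);

        // commitJob() writes ${output}/_SUCCESS with the committed file names and diagnostics.
        Path marker = CommitUtils.successMarker(output);
        SuccessData data = SuccessData.deserialize(CommitUtils.load(fs, marker));

        System.out.println("committer: " + data.committer());
        System.out.println("success:   " + data.success());
        System.out.println("files:     " + data.filenames());
      }
    }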

+ 184 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/mapred/Committer.java

@@ -0,0 +1,184 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit.mapred;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.tosfs.commit.CommitUtils;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class Committer extends FileOutputCommitter {
+  private static final Logger LOG = LoggerFactory.getLogger(Committer.class);
+  private org.apache.hadoop.mapreduce.OutputCommitter wrapped = null;
+
+  private static Path getOutputPath(JobContext context) {
+    JobConf conf = context.getJobConf();
+    return FileOutputFormat.getOutputPath(conf);
+  }
+
+  private static Path getOutputPath(TaskAttemptContext context) {
+    JobConf conf = context.getJobConf();
+    return FileOutputFormat.getOutputPath(conf);
+  }
+
+  private org.apache.hadoop.mapreduce.OutputCommitter getWrapped(JobContext context) throws IOException {
+    if(wrapped == null) {
+      wrapped = CommitUtils.supportProtonCommit(context.getConfiguration(), getOutputPath(context))
+          ? new org.apache.hadoop.fs.tosfs.commit.Committer(getOutputPath(context), context)
+          : new org.apache.hadoop.mapred.FileOutputCommitter();
+      LOG.debug("Using OutputCommitter implementation {}", wrapped.getClass().getName());
+    }
+    return wrapped;
+  }
+
+  @InterfaceAudience.Private
+  @Override
+  public Path getTaskAttemptPath(TaskAttemptContext context) throws IOException {
+    Path out = getOutputPath(context);
+    return out == null ? null : getTaskAttemptPath(context, out);
+  }
+
+  private org.apache.hadoop.mapreduce.OutputCommitter getWrapped(TaskAttemptContext context) throws IOException {
+    if(wrapped == null) {
+      wrapped = CommitUtils.supportProtonCommit(context.getConfiguration(), getOutputPath(context))
+          ? new org.apache.hadoop.fs.tosfs.commit.Committer(getOutputPath(context), context)
+          : new org.apache.hadoop.mapred.FileOutputCommitter();
+    }
+    return wrapped;
+  }
+
+  @Override
+  public Path getWorkPath(TaskAttemptContext context, Path outputPath)
+      throws IOException {
+    if (getWrapped(context) instanceof org.apache.hadoop.fs.tosfs.commit.Committer) {
+      return ((org.apache.hadoop.fs.tosfs.commit.Committer) getWrapped(context)).getWorkPath();
+    }
+    return super.getWorkPath(context, outputPath);
+  }
+
+  private Path getTaskAttemptPath(TaskAttemptContext context, Path out) throws IOException {
+    Path workPath = FileOutputFormat.getWorkOutputPath(context.getJobConf());
+    if (workPath == null && out != null) {
+      if (getWrapped(context) instanceof org.apache.hadoop.fs.tosfs.commit.Committer) {
+        return CommitUtils.magicTaskAttemptPath(context, getOutputPath(context));
+      } else {
+        return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
+            .getTaskAttemptPath(context, out);
+      }
+    }
+    return workPath;
+  }
+
+  @Override
+  public void setupJob(JobContext context) throws IOException {
+    getWrapped(context).setupJob(context);
+  }
+
+  @Override
+  public void commitJob(JobContext context) throws IOException {
+    getWrapped(context).commitJob(context);
+  }
+
+  @Override
+  @Deprecated
+  public void cleanupJob(JobContext context) throws IOException {
+    getWrapped(context).cleanupJob(context);
+  }
+
+  @Override
+  public void abortJob(JobContext context, int runState)
+      throws IOException {
+    JobStatus.State state;
+    if (runState == JobStatus.State.RUNNING.getValue()) {
+      state = JobStatus.State.RUNNING;
+    } else if (runState == JobStatus.State.SUCCEEDED.getValue()) {
+      state = JobStatus.State.SUCCEEDED;
+    } else if (runState == JobStatus.State.FAILED.getValue()) {
+      state = JobStatus.State.FAILED;
+    } else if (runState == JobStatus.State.PREP.getValue()) {
+      state = JobStatus.State.PREP;
+    } else if (runState == JobStatus.State.KILLED.getValue()) {
+      state = JobStatus.State.KILLED;
+    } else {
+      throw new IllegalArgumentException(runState + " is not a valid runState.");
+    }
+    getWrapped(context).abortJob(context, state);
+  }
+
+  @Override
+  public void setupTask(TaskAttemptContext context) throws IOException {
+    getWrapped(context).setupTask(context);
+  }
+
+  @Override
+  public void commitTask(TaskAttemptContext context) throws IOException {
+    getWrapped(context).commitTask(context);
+  }
+
+  @Override
+  public void abortTask(TaskAttemptContext context) throws IOException {
+    getWrapped(context).abortTask(context);
+  }
+
+  @Override
+  public boolean needsTaskCommit(TaskAttemptContext context)
+      throws IOException {
+    return getWrapped(context).needsTaskCommit(context);
+  }
+
+  @Override
+  @Deprecated
+  public boolean isRecoverySupported() {
+    return false;
+  }
+
+  @Override
+  public boolean isCommitJobRepeatable(JobContext context) throws IOException {
+    return getWrapped(context).isCommitJobRepeatable(context);
+  }
+
+  @Override
+  public boolean isRecoverySupported(JobContext context) throws IOException {
+    return getWrapped(context).isRecoverySupported(context);
+  }
+
+  @Override
+  public void recoverTask(TaskAttemptContext context)
+      throws IOException {
+    getWrapped(context).recoverTask(context);
+  }
+
+  public String jobId() {
+    Preconditions.checkNotNull(wrapped, "Encountered uninitialized job committer.");
+    return wrapped instanceof org.apache.hadoop.fs.tosfs.commit.Committer
+        ? ((org.apache.hadoop.fs.tosfs.commit.Committer) wrapped).jobId()
+        : null;
+  }
+
+  public Path getWorkPath() {
+    Preconditions.checkNotNull(wrapped, "Encountered uninitialized job committer.");
+    return wrapped instanceof org.apache.hadoop.fs.tosfs.commit.Committer
+        ? ((org.apache.hadoop.fs.tosfs.commit.Committer) wrapped).getWorkPath()
+        : null;
+  }
+}
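A minimal sketch of wiring this mapred wrapper into an old-API job. The class and bucket names below are illustrative assumptions; the wrapper only switches to the TOS magic committer when CommitUtils.supportProtonCommit accepts the configured output path, otherwise it falls back to the classic FileOutputCommitter.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.tosfs.commit.mapred.Committer;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;

public class MapredCommitterWiring {
  public static JobConf configure(JobConf conf) {
    // Route job/task commit through the wrapper; it decides at run time whether
    // the TOS magic committer or the classic FileOutputCommitter is used.
    conf.setOutputCommitter(Committer.class);
    // Hypothetical output location; only destinations that pass
    // CommitUtils.supportProtonCommit() get the magic-commit treatment.
    FileOutputFormat.setOutputPath(conf, new Path("tos://example-bucket/warehouse/output"));
    return conf;
  }
}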

+ 43 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/ops/PendingOps.java

@@ -0,0 +1,43 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit.ops;
+
+import org.apache.hadoop.fs.tosfs.commit.Pending;
+
+public interface PendingOps {
+  /**
+   * Revert a committed {@link Pending}, typically by deleting the object it made visible.
+   *
+   * @param commit the commit to revert.
+   */
+  void revert(Pending commit);
+
+  /**
+   * Abort an uncommitted {@link Pending} so that it can no longer be committed.
+   *
+   * @param commit the commit to abort.
+   */
+  void abort(Pending commit);
+
+  /**
+   * Commit the {@link Pending} so that its files become visible. To undo a completed commit,
+   * use {@link PendingOps#revert(Pending)}.
+   *
+   * @param commit the commit to make visible.
+   */
+  void commit(Pending commit);
+}
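Because PendingOpsFactory (shown below) instantiates implementations reflectively through a (FileSystem, ObjectStorage) constructor, a custom implementation of this interface must expose that constructor. The following dry-run implementation is a hedged sketch, not part of the patch; it only logs each operation instead of touching object storage.

package org.apache.hadoop.fs.tosfs.commit.ops;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.tosfs.commit.Pending;
import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Illustrative dry-run PendingOps: logs operations instead of executing them. */
public class LoggingPendingOps implements PendingOps {
  private static final Logger LOG = LoggerFactory.getLogger(LoggingPendingOps.class);

  // The (FileSystem, ObjectStorage) constructor is mandatory because
  // PendingOpsFactory creates implementations via reflection.
  public LoggingPendingOps(FileSystem fs, ObjectStorage storage) {
  }

  @Override
  public void revert(Pending commit) {
    LOG.info("Would revert (delete) {}", commit);
  }

  @Override
  public void abort(Pending commit) {
    LOG.info("Would abort multipart upload for {}", commit);
  }

  @Override
  public void commit(Pending commit) {
    LOG.info("Would complete multipart upload for {}", commit);
  }
}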

+ 40 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/ops/PendingOpsFactory.java

@@ -0,0 +1,40 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit.ops;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
+
+public class PendingOpsFactory {
+  public static final String PENDING_OPS_IMPL = "pending.ops.impl";
+  public static final String DEFAULT_PENDING_OPS_IMPL = RawPendingOps.class.getName();
+
+  private PendingOpsFactory() {
+  }
+
+  public static PendingOps create(FileSystem fs, ObjectStorage storage) {
+    try {
+      String opsImpl = fs.getConf().get(PENDING_OPS_IMPL, DEFAULT_PENDING_OPS_IMPL);
+      Class<?> clazz = Class.forName(opsImpl);
+      return (PendingOps) clazz
+          .getDeclaredConstructor(FileSystem.class, ObjectStorage.class)
+          .newInstance(fs, storage);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+}
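A small sketch of overriding the factory's default through the "pending.ops.impl" key. LoggingPendingOps refers to the hypothetical class sketched earlier; the FileSystem and ObjectStorage handles are assumed to be initialised by the caller.

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.tosfs.commit.ops.PendingOps;
import org.apache.hadoop.fs.tosfs.commit.ops.PendingOpsFactory;
import org.apache.hadoop.fs.tosfs.object.ObjectStorage;

public class PendingOpsSelection {
  static PendingOps select(FileSystem fs, ObjectStorage storage) {
    // Without this key the factory falls back to RawPendingOps.
    fs.getConf().set(PendingOpsFactory.PENDING_OPS_IMPL,
        "org.apache.hadoop.fs.tosfs.commit.ops.LoggingPendingOps");
    return PendingOpsFactory.create(fs, storage);
  }
}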

+ 54 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/ops/RawPendingOps.java

@@ -0,0 +1,54 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit.ops;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.tosfs.commit.Pending;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PendingOps} implementation that reverts, aborts or commits the given {@link Pending}
+ * directly against the underlying object storage.
+ */
+public class RawPendingOps implements PendingOps {
+  private static final Logger LOG = LoggerFactory.getLogger(RawPendingOps.class);
+
+  private final ObjectStorage storage;
+
+  /**
+   * Constructor invoked reflectively by {@link PendingOpsFactory} to create a new instance.
+   */
+  public RawPendingOps(FileSystem fs, ObjectStorage storage) {
+    this.storage = storage;
+  }
+
+  @Override
+  public void revert(Pending commit) {
+    LOG.info("Revert the commit by deleting the object key - {}", commit);
+    storage.delete(commit.destKey());
+  }
+
+  @Override
+  public void abort(Pending commit) {
+    LOG.info("Abort the commit by aborting multipart upload - {}", commit);
+    storage.abortMultipartUpload(commit.destKey(), commit.uploadId());
+  }
+
+  @Override
+  public void commit(Pending commit) {
+    LOG.info("Commit by completing the multipart uploads - {}", commit);
+    storage.completeUpload(commit.destKey(), commit.uploadId(), commit.parts());
+  }
+}
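A hedged sketch of the commit/abort decision a caller might drive through these operations; the Pending instance and the fs/storage handles are assumed to be supplied by the committer.

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.tosfs.commit.Pending;
import org.apache.hadoop.fs.tosfs.commit.ops.PendingOps;
import org.apache.hadoop.fs.tosfs.commit.ops.PendingOpsFactory;
import org.apache.hadoop.fs.tosfs.object.ObjectStorage;

public class PendingLifecycleSketch {
  static void finish(FileSystem fs, ObjectStorage storage, Pending pending, boolean jobSucceeded) {
    PendingOps ops = PendingOpsFactory.create(fs, storage); // RawPendingOps by default
    if (jobSucceeded) {
      // Complete the multipart upload so the destination object becomes visible.
      ops.commit(pending);
    } else {
      // Abandon the multipart upload; nothing becomes visible at the destination.
      ops.abort(pending);
    }
  }
}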

+ 6 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java

@@ -84,4 +84,10 @@ public class ConfKeys {
    */
   public static final String OBJECT_RENAME_ENABLED = "fs.tos.rename.enabled";
   public static final boolean OBJECT_RENAME_ENABLED_DEFAULT = false;
+
+  /**
+   * The range size used when opening an object storage input stream. The value must be positive.
+   */
+  public static final String OBJECT_STREAM_RANGE_SIZE = "proton.objectstorage.stream.range-size";
+  public static final long OBJECT_STREAM_RANGE_SIZE_DEFAULT = Long.MAX_VALUE;
 }
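A minimal sketch of tuning the new key; the 8 MiB value is an arbitrary illustration rather than a recommended setting.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.tosfs.conf.ConfKeys;

public class StreamRangeSizeExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Must be positive; the default of Long.MAX_VALUE effectively reads the whole object as one range.
    conf.setLong(ConfKeys.OBJECT_STREAM_RANGE_SIZE, 8L * 1024 * 1024);
  }
}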

+ 46 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/JsonCodec.java

@@ -0,0 +1,46 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.util;
+
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+public class JsonCodec<T> {
+  private static final ObjectMapper MAPPER = new ObjectMapper()
+      .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, true)
+      .configure(SerializationFeature.INDENT_OUTPUT, true)
+      .setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+  private final Class<T> clazz;
+
+  public JsonCodec(Class<T> clazz) {
+    this.clazz = clazz;
+  }
+
+  public byte[] toBytes(T instance) throws IOException {
+    return MAPPER.writeValueAsBytes(instance);
+  }
+
+  public T fromBytes(byte[] data) throws IOException {
+    return MAPPER.readValue(new String(data, StandardCharsets.UTF_8), clazz);
+  }
+}
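A minimal round-trip sketch for JsonCodec; the Example POJO is illustrative only. Private fields are (de)serialised directly because the codec enables field visibility on its ObjectMapper.

import java.io.IOException;
import org.apache.hadoop.fs.tosfs.util.JsonCodec;

public class JsonCodecRoundTrip {
  static class Example {
    private String destKey = "warehouse/part-00000";
    private int parts = 3;
  }

  public static void main(String[] args) throws IOException {
    JsonCodec<Example> codec = new JsonCodec<>(Example.class);
    byte[] json = codec.toBytes(new Example());  // pretty-printed UTF-8 JSON
    Example restored = codec.fromBytes(json);    // fails on unknown properties
    System.out.println(restored.destKey + " with " + restored.parts + " parts");
  }
}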

+ 25 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/util/Serializer.java

@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.util;
+
+import java.io.IOException;
+
+public interface Serializer {
+  byte[] serialize() throws IOException;
+}
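A hedged sketch of how a value class might satisfy Serializer by delegating to JsonCodec; ExampleRecord and its fields are illustrative, not types from this patch.

import java.io.IOException;
import org.apache.hadoop.fs.tosfs.util.JsonCodec;
import org.apache.hadoop.fs.tosfs.util.Serializer;

public class ExampleRecord implements Serializer {
  private static final JsonCodec<ExampleRecord> CODEC = new JsonCodec<>(ExampleRecord.class);

  private String destKey;
  private String uploadId;

  public ExampleRecord(String destKey, String uploadId) {
    this.destKey = destKey;
    this.uploadId = uploadId;
  }

  @Override
  public byte[] serialize() throws IOException {
    // Delegate the byte representation to the JSON codec.
    return CODEC.toBytes(this);
  }
}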