
Integration of TOS: Add Committer test.

lijinglun, 6 months ago
Parent
Current commit
c7cd0ba4d6
20 changed files with 1866 additions and 35 deletions
  1. +1 -1
      hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFileSystem.java
  2. +1 -1
      hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/CommitUtils.java
  3. +33 -0
      hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/CommitterFactory.java
  4. +2 -2
      hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/mapred/Committer.java
  5. +2 -2
      hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/ThreadPools.java
  6. +9 -10
      hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/TosKeys.java
  7. +2 -2
      hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TOS.java
  8. +1 -1
      hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/TestTosChecksum.java
  9. +227 -0
      hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/BaseJobSuite.java
  10. +422 -0
      hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/CommitterTestBase.java
  11. +219 -0
      hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/JobSuite.java
  12. +232 -0
      hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/MRJobTestBase.java
  13. +29 -0
      hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestCommitter.java
  14. +49 -0
      hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestMRJob.java
  15. +1 -1
      hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestMagicOutputStream.java
  16. +364 -0
      hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/mapred/CommitterTestBase.java
  17. +228 -0
      hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/mapred/JobSuite.java
  18. +29 -0
      hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/mapred/TestCommitter.java
  19. +3 -3
      hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/ObjectStorageTestBase.java
  20. +12 -12
      hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/TestObjectOutputStream.java

+ 1 - 1
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/RawFileSystem.java

@@ -178,7 +178,7 @@ public class RawFileSystem extends FileSystem {
       if (fileStatus == null && FuseUtils.fuseEnabled()) {
         // The fuse requires the file to be visible when accessing getFileStatus once we created the file, so here we
         // close and commit the file to be visible explicitly for fuse, and then reopen the file output stream for
-        // further data bytes writing. For more details please see: https://code.byted.org/emr/proton/issues/825
+        // further data bytes writing.
         out.close();
         out = new ObjectOutputStream(storage, uploadThreadPool, getConf(), makeQualified(path), true);
       }

+ 1 - 1
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/CommitUtils.java

@@ -329,7 +329,7 @@ public class CommitUtils {
     void visit(FileStatus f);
   }
 
-  public static boolean supportProtonCommit(Configuration conf, Path outputPath) {
+  public static boolean supportObjectStorageCommit(Configuration conf, Path outputPath) {
     return supportSchemes(conf).contains(outputPath.toUri().getScheme());
   }
 

+ 33 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/CommitterFactory.java

@@ -0,0 +1,33 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;
+
+import java.io.IOException;
+
+public class CommitterFactory extends PathOutputCommitterFactory {
+
+  @Override
+  public PathOutputCommitter createOutputCommitter(Path outputPath,
+      TaskAttemptContext context) throws IOException {
+    return new Committer(outputPath, context);
+  }
+}
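
This new factory only constructs the magic Committer; a job opts into it per output scheme. A minimal wiring sketch, mirroring the "mapreduce.outputcommitter.factory.scheme.tos" binding that MRJobTestBase sets later in this change (the tos:// bucket name below is a placeholder, not part of this patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class CommitterFactoryWiring {
  public static Job newJobWithTosCommitter(Configuration conf) throws Exception {
    // Resolve committers for tos:// output paths through the factory added above.
    conf.set("mapreduce.outputcommitter.factory.scheme.tos",
        "org.apache.hadoop.fs.tosfs.commit.CommitterFactory");
    Job job = Job.getInstance(conf, "tos-committer-example");
    job.setOutputFormatClass(TextOutputFormat.class);
    // "example-bucket" is a placeholder; any tos:// output path selects the factory.
    FileOutputFormat.setOutputPath(job, new Path("tos://example-bucket/output"));
    return job;
  }
}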

+ 2 - 2
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/mapred/Committer.java

@@ -47,7 +47,7 @@ public class Committer extends FileOutputCommitter {
 
   private org.apache.hadoop.mapreduce.OutputCommitter getWrapped(JobContext context) throws IOException {
     if(wrapped == null) {
-      wrapped = CommitUtils.supportProtonCommit(context.getConfiguration(), getOutputPath(context))
+      wrapped = CommitUtils.supportObjectStorageCommit(context.getConfiguration(), getOutputPath(context))
           ? new org.apache.hadoop.fs.tosfs.commit.Committer(getOutputPath(context), context)
           : new org.apache.hadoop.mapred.FileOutputCommitter();
       LOG.debug("Using OutputCommitter implementation {}", wrapped.getClass().getName());
@@ -64,7 +64,7 @@ public class Committer extends FileOutputCommitter {
 
   private org.apache.hadoop.mapreduce.OutputCommitter getWrapped(TaskAttemptContext context) throws IOException {
     if(wrapped == null) {
-      wrapped = CommitUtils.supportProtonCommit(context.getConfiguration(), getOutputPath(context))
+      wrapped = CommitUtils.supportObjectStorageCommit(context.getConfiguration(), getOutputPath(context))
           ? new org.apache.hadoop.fs.tosfs.commit.Committer(getOutputPath(context), context)
           : new org.apache.hadoop.mapred.FileOutputCommitter();
     }

+ 2 - 2
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/common/ThreadPools.java

@@ -41,12 +41,12 @@ public class ThreadPools {
   private ThreadPools() {
   }
 
-  public static final String WORKER_THREAD_POOL_SIZE_PROP = "proton.worker.num-threads";
+  public static final String WORKER_THREAD_POOL_SIZE_PROP = "tos.worker.num-threads";
 
   public static final int WORKER_THREAD_POOL_SIZE =
       poolSize(Math.max(2, Runtime.getRuntime().availableProcessors()));
 
-  private static final ExecutorService WORKER_POOL = newWorkerPool("proton-default-worker-pool");
+  private static final ExecutorService WORKER_POOL = newWorkerPool("tos-default-worker-pool");
 
   public static ExecutorService defaultWorkerPool() {
     return WORKER_POOL;
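
The pool behavior is unchanged by this rename; only the property name ("tos.worker.num-threads") and the thread-name prefix ("tos-default-worker-pool") move off the old proton naming. A small usage sketch of the shared pool, assuming only the API shown in this hunk:

import org.apache.hadoop.fs.tosfs.common.ThreadPools;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

public class WorkerPoolExample {
  public static void main(String[] args) throws Exception {
    // Threads created here now appear as tos-default-worker-pool-* in stack dumps.
    ExecutorService pool = ThreadPools.defaultWorkerPool();
    Future<Integer> answer = pool.submit(() -> 40 + 2);
    System.out.println("computed: " + answer.get());
  }
}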

+ 9 - 10
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/TosKeys.java

@@ -109,15 +109,15 @@ public class TosKeys {
   public static final int FS_TOS_HTTP_CONNECT_TIMEOUT_MILLS_DEFAULT = 10000;
 
   /**
-   * The reading timeout when reading data from tos. Note that it is configured for the tos client,
-   * not proton.
+   * The reading timeout when reading data from tos. Note that it is configured for the tos client
+   * sdk, not hadoop-tos.
    */
   public static final String FS_TOS_HTTP_READ_TIMEOUT_MILLS = "fs.tos.http.readTimeoutMills";
   public static final int FS_TOS_HTTP_READ_TIMEOUT_MILLS_DEFAULT = 30000;
 
   /**
-   * The writing timeout when uploading data to tos. Note that it is configured for the tos client,
-   * not proton.
+   * The writing timeout when uploading data to tos. Note that it is configured for the tos client
+   * sdk, not hadoop-tos.
    */
   public static final String FS_TOS_HTTP_WRITE_TIMEOUT_MILLS = "fs.tos.http.writeTimeoutMills";
   public static final int FS_TOS_HTTP_WRITE_TIMEOUT_MILLS_DEFAULT = 30000;
@@ -152,11 +152,10 @@ public class TosKeys {
 
   /**
    * The prefix will be used as the product name in TOS SDK. The final user agent pattern is
-   * '{prefix}/Proton/{proton version}'.
-   * TODO: review it.
+   * '{prefix}/TOS_FS/{hadoop tos version}'.
    */
   public static final String FS_TOS_USER_AGENT_PREFIX = "fs.tos.user.agent.prefix";
-  public static final String FS_TOS_USER_AGENT_PREFIX_DEFAULT = "EMR";
+  public static final String FS_TOS_USER_AGENT_PREFIX_DEFAULT = "HADOOP-TOS";
 
   // TOS common keys.
   /**
@@ -187,7 +186,7 @@ public class TosKeys {
   public static final int FS_TOS_BATCH_DELETE_MAX_RETRIES_DEFAULT = 20;
 
   /**
-   * The codes from TOS deleteMultiObjects response, Proton will resend the batch delete request to
+   * The codes from TOS deleteMultiObjects response, client will resend the batch delete request to
    * delete the failed keys again if the response only contains these codes, otherwise won't send
    * request anymore.
    */
@@ -213,7 +212,7 @@ public class TosKeys {
   public static final int FS_TOS_LIST_OBJECTS_COUNT_DEFAULT = 1000;
 
   /**
-   * The maximum retry times of sending request via TOS client, Proton will resend the request if
+   * The maximum retry times of sending request via TOS client, client will resend the request if
    * got retryable exceptions, e.g. SocketException, UnknownHostException, SSLException,
    * InterruptedException, SocketTimeoutException, or got TOO_MANY_REQUESTS, INTERNAL_SERVER_ERROR
    * http codes.
@@ -232,7 +231,7 @@ public class TosKeys {
       TOSErrorCodes.FAST_FAILURE_CONFLICT_ERROR_CODES;
 
   /**
-   * The maximum retry times of reading object content via TOS client, Proton will resend the
+   * The maximum retry times of reading object content via TOS client, client will resend the
    * request to create a new input stream if getting unexpected end of stream error during reading
    * the input stream.
    */
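
The reworded javadoc only clarifies that these keys tune the TOS client SDK bundled in hadoop-tos, not Hadoop itself. A hedged sketch of setting a few of them in a Hadoop Configuration, using the key strings and defaults from this hunk (the 60-second values are arbitrary illustrations):

import org.apache.hadoop.conf.Configuration;

public class TosClientTuning {
  public static Configuration withTunedClient() {
    Configuration conf = new Configuration();
    // Read/write timeouts of the TOS client SDK; both default to 30000 ms.
    conf.setInt("fs.tos.http.readTimeoutMills", 60000);
    conf.setInt("fs.tos.http.writeTimeoutMills", 60000);
    // User agent becomes '{prefix}/TOS_FS/{hadoop tos version}'; default prefix is HADOOP-TOS.
    conf.set("fs.tos.user.agent.prefix", "HADOOP-TOS");
    return conf;
  }
}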

+ 2 - 2
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/tos/TOS.java

@@ -154,10 +154,10 @@ public class TOS implements DirectoryStorage {
   private BucketInfo bucketInfo;
 
   static {
-    org.apache.log4j.Logger logger = LogManager.getLogger("io.proton.shaded.com.volcengine.tos");
+    org.apache.log4j.Logger logger = LogManager.getLogger("com.volcengine.tos");
     String logLevel = System.getProperty("tos.log.level", "WARN");
 
-    LOG.debug("Reset the log level of io.proton.shaded.com.volcengine.tos with {} ", logLevel);
+    LOG.debug("Reset the log level of com.volcengine.tos with {} ", logLevel);
     logger.setLevel(Level.toLevel(logLevel.toUpperCase(), Level.WARN));
   }
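
The static block above now resolves the SDK logger by its unshaded name and honors the "tos.log.level" system property once, at class-load time. A minimal sketch of raising the level for debugging, assuming the property is set before the TOS class is first touched (DEBUG is just an example value):

public class TosSdkLogLevelExample {
  public static void main(String[] args) {
    // Equivalent to passing -Dtos.log.level=DEBUG on the JVM command line; it must
    // happen before org.apache.hadoop.fs.tosfs.object.tos.TOS is loaded, because the
    // static block reads the property only once.
    System.setProperty("tos.log.level", "DEBUG");
    // ... create the FileSystem / ObjectStorage afterwards so the level applies.
  }
}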
 

+ 1 - 1
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/TestTosChecksum.java

@@ -46,7 +46,7 @@ import static org.junit.Assert.assertEquals;
 
 @RunWith(Parameterized.class)
 public class TestTosChecksum {
-  private static final String FILE_STORE_ROOT = TempFiles.newTempDir("TestProtonChecksum");
+  private static final String FILE_STORE_ROOT = TempFiles.newTempDir("TestTosChecksum");
   private static final String ALGORITHM_NAME = "mock-algorithm";
   private static final String PREFIX = UUIDUtils.random();
 

+ 227 - 0
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/BaseJobSuite.java

@@ -0,0 +1,227 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.tosfs.object.MultipartUpload;
+import org.apache.hadoop.fs.tosfs.object.ObjectInfo;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
+import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
+import org.apache.hadoop.fs.tosfs.util.ParseUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Iterables;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+public abstract class BaseJobSuite {
+  private static final Logger LOG = LoggerFactory.getLogger(BaseJobSuite.class);
+  public static final int DEFAULT_APP_ATTEMPT_ID = 1;
+  protected static final Text KEY_1 = new Text("key1");
+  protected static final Text KEY_2 = new Text("key2");
+  protected static final Text VAL_1 = new Text("val1");
+  protected static final Text VAL_2 = new Text("val2");
+
+  protected Job job;
+  protected String jobId;
+  protected FileSystem fs;
+  protected Path outputPath;
+  protected ObjectStorage storage;
+
+  private final boolean dumpObjectStorage = ParseUtils.envAsBoolean("DUMP_OBJECT_STORAGE", false);
+
+  protected abstract Path magicPartPath();
+
+  protected abstract Path magicPendingSetPath();
+
+  protected abstract void assertSuccessMarker() throws IOException;
+
+  protected abstract void assertSummaryReport(Path reportDir) throws IOException;
+
+  protected abstract void assertNoTaskAttemptPath() throws IOException;
+
+  protected void assertMagicPathExist(Path outputPath) throws IOException {
+    Path magicPath = CommitUtils.magicPath(outputPath);
+    Assert.assertTrue(String.format("Magic path: %s should exist", magicPath), fs.exists(magicPath));
+  }
+
+  protected void assertMagicPathNotExist(Path outputPath) throws IOException {
+    Path magicPath = CommitUtils.magicPath(outputPath);
+    Assert.assertFalse(String.format("Magic path: %s should not exist", magicPath), fs.exists(magicPath));
+  }
+
+  protected abstract boolean skipTests();
+
+  public Path magicPendingPath() {
+    Path magicPart = magicPartPath();
+    return new Path(magicPart.getParent(), magicPart.getName() + ".pending");
+  }
+
+  public Path magicJobPath() {
+    return CommitUtils.magicPath(outputPath);
+  }
+
+  public String magicPartKey() {
+    return ObjectUtils.pathToKey(magicPartPath());
+  }
+
+  public String destPartKey() {
+    return MagicOutputStream.toDestKey(magicPartPath());
+  }
+
+  public FileSystem fs() {
+    return fs;
+  }
+
+  public ObjectStorage storage() {
+    return storage;
+  }
+
+  public Job job() {
+    return job;
+  }
+
+  public void assertHasMagicKeys() {
+    Iterable<ObjectInfo> objects = storage.listAll(ObjectUtils.pathToKey(magicJobPath(), true), "");
+    Assert.assertTrue("Should have some __magic object keys", Iterables.any(objects, o -> o.key().contains(
+        CommitUtils.MAGIC) && o.key().contains(jobId)));
+  }
+
+  public void assertHasBaseKeys() {
+    Iterable<ObjectInfo> objects = storage.listAll(ObjectUtils.pathToKey(magicJobPath(), true), "");
+    Assert.assertTrue("Should have some __base object keys", Iterables.any(objects, o -> o.key().contains(
+        CommitUtils.BASE) && o.key().contains(jobId)));
+  }
+
+  public void assertNoMagicPendingFile() {
+    String magicPendingKey = String.format("%s.pending", magicPartKey());
+    Assert.assertNull("Magic pending key should not exist", storage.head(magicPendingKey));
+  }
+
+  public void assertHasMagicPendingFile() {
+    String magicPendingKey = String.format("%s.pending", magicPartKey());
+    Assert.assertNotNull("Magic pending key should exist", storage.head(magicPendingKey));
+  }
+
+  public void assertNoMagicMultipartUpload() {
+    Iterable<MultipartUpload> uploads = storage.listUploads(ObjectUtils.pathToKey(magicJobPath(), true));
+    boolean anyMagicUploads = Iterables.any(uploads, u -> u.key().contains(CommitUtils.MAGIC));
+    Assert.assertFalse("Should have no magic multipart uploads", anyMagicUploads);
+  }
+
+  public void assertNoMagicObjectKeys() {
+    Iterable<ObjectInfo> objects = storage.listAll(ObjectUtils.pathToKey(magicJobPath(), true), "");
+    boolean anyMagicUploads =
+        Iterables.any(objects, o -> o.key().contains(CommitUtils.MAGIC) && o.key().contains(jobId));
+    Assert.assertFalse("Should not have any magic keys", anyMagicUploads);
+  }
+
+  public void assertHasPendingSet() {
+    Iterable<ObjectInfo> objects = storage.listAll(ObjectUtils.pathToKey(magicJobPath(), true), "");
+    boolean anyPendingSet =
+        Iterables.any(objects, o -> o.key().contains(CommitUtils.PENDINGSET_SUFFIX) && o.key().contains(jobId));
+    Assert.assertTrue("Should have the expected .pendingset file", anyPendingSet);
+  }
+
+  public void assertPendingSetAtRightLocation() {
+    Iterable<ObjectInfo> objects = storage.listAll(ObjectUtils.pathToKey(magicJobPath(), true), "");
+    Path magicJobAttemptPath =
+        CommitUtils.magicJobAttemptPath(job().getJobID().toString(), DEFAULT_APP_ATTEMPT_ID, outputPath);
+    String inQualifiedPath = magicJobAttemptPath.toUri().getPath().substring(1);
+    Iterable<ObjectInfo> filtered =
+        Iterables.filter(objects, o -> o.key().contains(CommitUtils.PENDINGSET_SUFFIX) && o.key().contains(jobId));
+    boolean pendingSetAtRightLocation =
+        Iterables.any(filtered, o -> o.key().startsWith(inQualifiedPath) && o.key().contains(jobId));
+    Assert.assertTrue("The .pendingset file should locate at the job's magic output path.", pendingSetAtRightLocation);
+  }
+
+  public void assertMultipartUpload(int expectedUploads) {
+    // Note: be careful in the concurrent case: callers need to check the same output path.
+    Iterable<MultipartUpload> uploads = storage.listUploads(ObjectUtils.pathToKey(outputPath, true));
+    long actualUploads = StreamSupport.stream(uploads.spliterator(), false).count();
+    Assert.assertEquals(expectedUploads, actualUploads);
+  }
+
+  public void assertPartFiles(int num) throws IOException {
+    FileStatus[] files = fs.listStatus(outputPath,
+        f -> !MagicOutputStream.isMagic(new Path(f.toUri())) && f.toUri().toString().contains("part-"));
+    Assert.assertEquals(num, files.length);
+    Iterable<ObjectInfo> objects = storage.listAll(ObjectUtils.pathToKey(outputPath, true), "");
+    List<ObjectInfo> infos = Arrays.stream(Iterables.toArray(objects, ObjectInfo.class))
+        .filter(o -> o.key().contains("part-")).collect(Collectors.toList());
+    Assert.assertEquals(
+        String.format("Part files number should be %d, but got %d", num, infos.size()), num, infos.size());
+  }
+
+  public void assertNoPartFiles() throws IOException {
+    FileStatus[] files = fs.listStatus(outputPath,
+        f -> !MagicOutputStream.isMagic(new Path(f.toUri())) && f.toUri().toString().contains("part-"));
+    Assert.assertEquals(0, files.length);
+    Iterable<ObjectInfo> objects = storage.listAll(ObjectUtils.pathToKey(outputPath, true), "");
+    boolean anyPartFile = Iterables.any(objects, o -> o.key().contains("part-"));
+    Assert.assertFalse("Should have no part files", anyPartFile);
+  }
+
+  public void dumpObjectStorage() {
+    if (dumpObjectStorage) {
+      LOG.info("===> Dump object storage - Start <===");
+      dumpObjectKeys();
+      dumpMultipartUploads();
+      LOG.info("===> Dump object storage -  End  <===");
+    }
+  }
+
+  public void dumpObjectKeys() {
+    String prefix = ObjectUtils.pathToKey(magicJobPath());
+    LOG.info("Dump object keys with prefix {}", prefix);
+    storage.listAll("", "").forEach(o -> LOG.info("Dump object keys - {}", o));
+  }
+
+  public void dumpMultipartUploads() {
+    String prefix = ObjectUtils.pathToKey(magicJobPath());
+    LOG.info("Dump multi part uploads with prefix {}", prefix);
+    storage.listUploads("")
+        .forEach(u -> LOG.info("Dump multipart uploads - {}", u));
+  }
+
+  public void verifyPartContent() throws IOException {
+    String partKey = destPartKey();
+    LOG.info("Part key to verify is: {}", partKey);
+    try (InputStream in = storage.get(partKey).stream()) {
+      byte[] data = IOUtils.toByteArray(in);
+      String expected = String.format("%s\t%s\n%s\t%s\n", KEY_1, VAL_1, KEY_2, VAL_2);
+      Assert.assertEquals(expected, new String(data, StandardCharsets.UTF_8));
+    }
+  }
+
+  public void assertSuccessMarkerNotExist() throws IOException {
+    Path succPath = CommitUtils.successMarker(outputPath);
+    Assert.assertFalse(String.format("%s should not exist", succPath), fs.exists(succPath));
+  }
+}

+ 422 - 0
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/CommitterTestBase.java

@@ -0,0 +1,422 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
+import org.apache.hadoop.fs.tosfs.util.CommonUtils;
+import org.apache.hadoop.fs.tosfs.util.UUIDUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThrows;
+
+public abstract class CommitterTestBase {
+  private Configuration conf;
+  private FileSystem fs;
+  private Path outputPath;
+  private TaskAttemptID job1Task0Attempt0;
+  private TaskAttemptID job2Task1Attempt0;
+  private Path reportDir;
+
+  @Before
+  public void setup() throws IOException {
+    conf = newConf();
+    fs = FileSystem.get(conf);
+    String uuid = UUIDUtils.random();
+    outputPath = fs.makeQualified(new Path("/test/" + uuid));
+    job1Task0Attempt0 = JobSuite.createTaskAttemptId(randomTrimmedJobId(), 0, 0);
+    job2Task1Attempt0 = JobSuite.createTaskAttemptId(randomTrimmedJobId(), 1, 0);
+
+    reportDir = fs.makeQualified(new Path("/report/" + uuid));
+    fs.mkdirs(reportDir);
+    conf.set(Committer.COMMITTER_SUMMARY_REPORT_DIR, reportDir.toUri().toString());
+  }
+
+  protected abstract Configuration newConf();
+
+  @After
+  public void teardown() {
+    CommonUtils.runQuietly(() -> fs.delete(outputPath, true));
+    IOUtils.closeStream(fs);
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    List<String> committerThreads = Thread.getAllStackTraces().keySet()
+        .stream()
+        .map(Thread::getName)
+        .filter(n -> n.startsWith(Committer.THREADS_PREFIX))
+        .collect(Collectors.toList());
+    Assert.assertTrue("Outstanding committer threads", committerThreads.isEmpty());
+  }
+
+  private static String randomTrimmedJobId() {
+    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd");
+    return String.format("%s%04d_%04d", formatter.format(new Date()),
+        (long) (Math.random() * 1000),
+        (long) (Math.random() * 1000));
+  }
+
+  private static String randomFormedJobId() {
+    return String.format("job_%s", randomTrimmedJobId());
+  }
+
+  @Test
+  public void testSetupJob() throws IOException {
+    JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+    // Setup job.
+    suite.setupJob();
+    suite.dumpObjectStorage();
+    suite.assertHasMagicKeys();
+  }
+
+  @Test
+  public void testSetupJobWithOrphanPaths() throws IOException, InterruptedException {
+    JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+    // Orphan success marker.
+    Path successPath = CommitUtils.successMarker(outputPath);
+    CommitUtils.save(fs, successPath, new byte[]{});
+    Assert.assertTrue(fs.exists(successPath));
+
+    // Orphan job path.
+    Path jobPath = CommitUtils.magicJobPath(suite.committer().jobId(), outputPath);
+    fs.mkdirs(jobPath);
+    Assert.assertTrue("The job path should be existing", fs.exists(jobPath));
+    Path subPath = new Path(jobPath, "tmp.pending");
+    CommitUtils.save(fs, subPath, new byte[]{});
+    Assert.assertTrue("The sub path under job path should be existing.", fs.exists(subPath));
+    FileStatus jobPathStatus = fs.getFileStatus(jobPath);
+
+    Thread.sleep(1000L);
+    suite.setupJob();
+    suite.dumpObjectStorage();
+    suite.assertHasMagicKeys();
+
+    assertFalse("Should have deleted the success path", fs.exists(successPath));
+    Assert.assertTrue("Should have re-created the job path", fs.exists(jobPath));
+    assertFalse("Should have deleted the sub path under the job path", fs.exists(subPath));
+  }
+
+  @Test
+  public void testSetupTask() throws IOException {
+    JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+    // Remaining attempt task path.
+    Path taskAttemptBasePath = CommitUtils.magicTaskAttemptBasePath(suite.taskAttemptContext(), outputPath);
+    Path subTaskAttemptPath = new Path(taskAttemptBasePath, "tmp.pending");
+    CommitUtils.save(fs, subTaskAttemptPath, new byte[]{});
+    Assert.assertTrue(fs.exists(taskAttemptBasePath));
+    Assert.assertTrue(fs.exists(subTaskAttemptPath));
+
+    // Setup job.
+    suite.setupJob();
+    suite.assertHasMagicKeys();
+    // It will clear all the job path once we've set up the job.
+    assertFalse(fs.exists(taskAttemptBasePath));
+    assertFalse(fs.exists(subTaskAttemptPath));
+
+    // Left some the task paths.
+    CommitUtils.save(fs, subTaskAttemptPath, new byte[]{});
+    Assert.assertTrue(fs.exists(taskAttemptBasePath));
+    Assert.assertTrue(fs.exists(subTaskAttemptPath));
+
+    // Setup task.
+    suite.setupTask();
+    assertFalse(fs.exists(subTaskAttemptPath));
+  }
+
+  @Test
+  public void testCommitTask() throws Exception {
+    JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+
+    // Setup job
+    suite.setupJob();
+    suite.dumpObjectStorage();
+    suite.assertHasMagicKeys();
+
+    // Setup task
+    suite.setupTask();
+
+    // Write records.
+    suite.assertNoMagicPendingFile();
+    suite.assertMultipartUpload(0);
+    suite.writeOutput();
+    suite.dumpObjectStorage();
+    suite.assertHasMagicPendingFile();
+    suite.assertNoMagicMultipartUpload();
+    suite.assertMultipartUpload(1);
+    // Assert the pending file content.
+    Path pendingPath = suite.magicPendingPath();
+    byte[] pendingData = CommitUtils.load(suite.fs(), pendingPath);
+    Pending pending = Pending.deserialize(pendingData);
+    assertEquals(suite.destPartKey(), pending.destKey());
+    assertEquals(20, pending.length());
+    assertEquals(1, pending.parts().size());
+
+    // Commit the task.
+    suite.commitTask();
+
+    // Verify the pending set file.
+    suite.assertHasPendingSet();
+    // Assert the pending set file content.
+    Path pendingSetPath = suite.magicPendingSetPath();
+    byte[] pendingSetData = CommitUtils.load(suite.fs(), pendingSetPath);
+    PendingSet pendingSet = PendingSet.deserialize(pendingSetData);
+    assertEquals(suite.job().getJobID().toString(), pendingSet.jobId());
+    assertEquals(1, pendingSet.commits().size());
+    assertEquals(pending, pendingSet.commits().get(0));
+    assertEquals(pendingSet.extraData(),
+        ImmutableMap.of(CommitUtils.TASK_ATTEMPT_ID, suite.taskAttemptContext().getTaskAttemptID().toString()));
+
+    // Complete the multipart upload and verify the results.
+    ObjectStorage storage = suite.storage();
+    storage.completeUpload(pending.destKey(), pending.uploadId(), pending.parts());
+    suite.verifyPartContent();
+  }
+
+  @Test
+  public void testAbortTask() throws Exception {
+    JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+    suite.setupJob();
+    suite.setupTask();
+
+    // Pre-check before the output write.
+    suite.assertNoMagicPendingFile();
+    suite.assertMultipartUpload(0);
+
+    // Execute the output write.
+    suite.writeOutput();
+
+    // Post-check after the output write.
+    suite.assertHasMagicPendingFile();
+    suite.assertNoMagicMultipartUpload();
+    suite.assertMultipartUpload(1);
+    // Assert the pending file content.
+    Path pendingPath = suite.magicPendingPath();
+    byte[] pendingData = CommitUtils.load(suite.fs(), pendingPath);
+    Pending pending = Pending.deserialize(pendingData);
+    assertEquals(suite.destPartKey(), pending.destKey());
+    assertEquals(20, pending.length());
+    assertEquals(1, pending.parts().size());
+
+    // Abort the task.
+    suite.abortTask();
+
+    // Verify the state after aborting task.
+    suite.assertNoMagicPendingFile();
+    suite.assertNoMagicMultipartUpload();
+    suite.assertMultipartUpload(0);
+    suite.assertNoTaskAttemptPath();
+  }
+
+  @Test
+  public void testCommitJob() throws Exception {
+    JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+    suite.setupJob();
+    suite.setupTask();
+    suite.writeOutput();
+    suite.commitTask();
+
+    // Commit the job.
+    suite.assertNoPartFiles();
+    suite.commitJob();
+    // Verify the output.
+    suite.assertNoMagicMultipartUpload();
+    suite.assertNoMagicObjectKeys();
+    suite.assertSuccessMarker();
+    suite.assertSummaryReport(reportDir);
+    suite.verifyPartContent();
+  }
+
+
+  @Test
+  public void testCommitJobFailed() throws Exception {
+    JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+    suite.setupJob();
+    suite.setupTask();
+    suite.writeOutput();
+    suite.commitTask();
+
+    // Commit the job.
+    suite.assertNoPartFiles();
+    suite.commitJob();
+  }
+
+  @Test
+  public void testCommitJobSuccessMarkerFailed() throws Exception {
+    JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+    suite.setupJob();
+    suite.setupTask();
+    suite.writeOutput();
+    suite.commitTask();
+
+    CommitUtils.injectError("marker");
+    // Commit the job.
+    suite.assertNoPartFiles();
+    assertThrows("Expect commit job error.", IOException.class, suite::commitJob);
+    CommitUtils.removeError("marker");
+
+    // Verify the output.
+    suite.assertNoMagicMultipartUpload();
+    suite.assertNoMagicObjectKeys();
+    suite.assertSuccessMarkerNotExist();
+    assertEquals(0, suite.fs().listStatus(suite.outputPath).length);
+  }
+
+  @Test
+  public void testTaskCommitAfterJobCommit() throws Exception {
+    JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+
+    suite.setupJob();
+    suite.setupTask();
+    suite.writeOutput();
+    suite.commitTask();
+
+    // Commit the job
+    suite.assertNoPartFiles();
+    suite.commitJob();
+    // Verify the output.
+    suite.assertNoMagicMultipartUpload();
+    suite.assertNoMagicObjectKeys();
+    suite.assertSuccessMarker();
+    suite.verifyPartContent();
+
+    // Commit the task again.
+    assertThrows(FileNotFoundException.class, suite::commitTask);
+  }
+
+  @Test
+  public void testTaskCommitWithConsistentJobId() throws Exception {
+    Configuration conf = newConf();
+    String consistentJobId = randomFormedJobId();
+    conf.set(CommitUtils.SPARK_WRITE_UUID, consistentJobId);
+    JobSuite suite = JobSuite.create(conf, job1Task0Attempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+
+    // By now, we have two "jobId"s, one is spark uuid, and the other is the jobId in taskAttempt.
+    // The job committer will adopt the former.
+    suite.setupJob();
+
+    // Next, we clear spark uuid, and set the jobId of taskAttempt to another value. In this case,
+    // the committer will take the jobId of taskAttempt as the final jobId, which is not consistent
+    // with the one that committer holds.
+    conf.unset(CommitUtils.SPARK_WRITE_UUID);
+    String anotherJobId = randomTrimmedJobId();
+    TaskAttemptID taskAttemptId1 = JobSuite.createTaskAttemptId(anotherJobId, JobSuite.DEFAULT_APP_ATTEMPT_ID);
+    final TaskAttemptContext attemptContext1 =
+        JobSuite.createTaskAttemptContext(conf, taskAttemptId1, JobSuite.DEFAULT_APP_ATTEMPT_ID);
+
+    assertThrows("JobId set in the context", IllegalArgumentException.class,
+        () -> suite.setupTask(attemptContext1));
+
+    // Even though we use another taskAttempt, as long as we ensure the spark uuid is consistent,
+    // the jobId in committer is consistent.
+    conf.set(CommitUtils.SPARK_WRITE_UUID, consistentJobId);
+    conf.set(FileOutputFormat.OUTDIR, outputPath.toString());
+    anotherJobId = randomTrimmedJobId();
+    TaskAttemptID taskAttemptId2 = JobSuite.createTaskAttemptId(anotherJobId, JobSuite.DEFAULT_APP_ATTEMPT_ID);
+    TaskAttemptContext attemptContext2 =
+        JobSuite.createTaskAttemptContext(conf, taskAttemptId2, JobSuite.DEFAULT_APP_ATTEMPT_ID);
+
+    suite.setupTask(attemptContext2);
+    // Write output must use the same task context with setup task.
+    suite.writeOutput(attemptContext2);
+    // Commit task must use the same task context with setup task.
+    suite.commitTask(attemptContext2);
+    suite.assertPendingSetAtRightLocation();
+
+    // Commit the job
+    suite.assertNoPartFiles();
+    suite.commitJob();
+
+    // Verify the output.
+    suite.assertNoMagicMultipartUpload();
+    suite.assertNoMagicObjectKeys();
+    suite.assertSuccessMarker();
+    suite.verifyPartContent();
+  }
+
+  @Test
+  public void testConcurrentJobs() throws Exception {
+    JobSuite suite1 = JobSuite.create(conf, job1Task0Attempt0, outputPath);
+    JobSuite suite2 = JobSuite.create(conf, job2Task1Attempt0, outputPath);
+    Assume.assumeFalse(suite1.skipTests());
+    Assume.assumeFalse(suite2.skipTests());
+    suite1.setupJob();
+    suite2.setupJob();
+    suite1.setupTask();
+    suite2.setupTask();
+    suite1.writeOutput();
+    suite2.writeOutput();
+    suite1.commitTask();
+    suite2.commitTask();
+
+    // Job2 commit the job.
+    suite2.assertNoPartFiles();
+    suite2.commitJob();
+    suite2.assertPartFiles(1);
+
+    suite2.assertNoMagicMultipartUpload();
+    suite2.assertNoMagicObjectKeys();
+    suite2.assertSuccessMarker();
+    suite2.assertSummaryReport(reportDir);
+    suite2.verifyPartContent();
+    suite2.assertMagicPathExist(outputPath);
+
+    // Job1 commit the job.
+    suite1.commitJob();
+    suite2.assertPartFiles(2);
+
+    // Verify the output.
+    suite1.assertNoMagicMultipartUpload();
+    suite1.assertNoMagicObjectKeys();
+    suite1.assertSuccessMarker();
+    suite1.assertSummaryReport(reportDir);
+    suite1.verifyPartContent();
+    suite1.assertMagicPathNotExist(outputPath);
+  }
+}

+ 219 - 0
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/JobSuite.java

@@ -0,0 +1,219 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory;
+import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.PathOutputCommitterFactory;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
+import org.apache.hadoop.net.NetUtils;
+import org.junit.Assert;
+
+import java.io.IOException;
+
+public class JobSuite extends BaseJobSuite {
+  private static final CommitterFactory FACTORY = new CommitterFactory();
+  private final JobContext jobContext;
+  private final TaskAttemptContext taskAttemptContext;
+  private final Committer committer;
+
+  private JobSuite(FileSystem fs, Configuration conf, TaskAttemptID taskAttemptId, int appAttemptId, Path outputPath)
+      throws IOException {
+    this.fs = fs;
+    // Initialize the job instance.
+    this.job = Job.getInstance(conf);
+    job.setJobID(JobID.forName(CommitUtils.buildJobId(conf, taskAttemptId.getJobID())));
+    this.jobContext = createJobContext(job.getConfiguration(), taskAttemptId);
+    this.jobId = CommitUtils.buildJobId(jobContext);
+    this.taskAttemptContext = createTaskAttemptContext(job.getConfiguration(), taskAttemptId, appAttemptId);
+
+    // Set job output directory.
+    FileOutputFormat.setOutputPath(job, outputPath);
+    this.outputPath = outputPath;
+    this.storage = ObjectStorageFactory.create(outputPath.toUri().getScheme(), outputPath.toUri().getAuthority(), conf);
+
+    // Initialize committer.
+    this.committer = (Committer) FACTORY.createOutputCommitter(outputPath, taskAttemptContext);
+  }
+
+  public static JobSuite create(Configuration conf, TaskAttemptID taskAttemptId, Path outDir) throws IOException {
+    FileSystem fs = outDir.getFileSystem(conf);
+    return new JobSuite(fs, conf, taskAttemptId, DEFAULT_APP_ATTEMPT_ID, outDir);
+  }
+
+  public static TaskAttemptID createTaskAttemptId(String trimmedJobId, int attemptId) {
+    String attempt = String.format("attempt_%s_m_000000_%d", trimmedJobId, attemptId);
+    return TaskAttemptID.forName(attempt);
+  }
+
+  public static TaskAttemptID createTaskAttemptId(String trimmedJobId, int taskId, int attemptId) {
+    String[] parts = trimmedJobId.split("_");
+    return new TaskAttemptID(parts[0], Integer.parseInt(parts[1]), TaskType.MAP, taskId, attemptId);
+  }
+
+  public static JobContext createJobContext(Configuration jobConf, TaskAttemptID taskAttemptId) {
+    return new JobContextImpl(jobConf, taskAttemptId.getJobID());
+  }
+
+  public static TaskAttemptContext createTaskAttemptContext(
+      Configuration jobConf, TaskAttemptID taskAttemptId, int appAttemptId) throws IOException {
+    // Set the key values for job configuration.
+    jobConf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttemptId.toString());
+    jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, appAttemptId);
+    jobConf.set(PathOutputCommitterFactory.COMMITTER_FACTORY_CLASS, CommitterFactory.class.getName());
+    return new TaskAttemptContextImpl(jobConf, taskAttemptId);
+  }
+
+  public void setupJob() throws IOException {
+    committer.setupJob(jobContext);
+  }
+
+  public void setupTask() throws IOException {
+    committer.setupTask(taskAttemptContext);
+  }
+
+  // This method simulates the scenario that the job may set up task with a different
+  // taskAttemptContext, e.g., for a spark job.
+  public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
+    committer.setupTask(taskAttemptContext);
+  }
+
+  public void writeOutput() throws Exception {
+    writeOutput(taskAttemptContext);
+  }
+
+  // This method simulates the scenario that the job may set up task with a different
+  // taskAttemptContext, e.g., for a spark job.
+  public void writeOutput(TaskAttemptContext taskAttemptContext) throws Exception {
+    RecordWriter<Object, Object> writer = new TextOutputFormat<>().getRecordWriter(taskAttemptContext);
+    NullWritable nullKey = NullWritable.get();
+    NullWritable nullVal = NullWritable.get();
+    Object[] keys = new Object[]{KEY_1, nullKey, null, nullKey, null, KEY_2};
+    Object[] vals = new Object[]{VAL_1, nullVal, null, null, nullVal, VAL_2};
+    try {
+      Assert.assertEquals(keys.length, vals.length);
+      for (int i = 0; i < keys.length; i++) {
+        writer.write(keys[i], vals[i]);
+      }
+    } finally {
+      writer.close(taskAttemptContext);
+    }
+  }
+
+  public boolean needsTaskCommit() {
+    return committer.needsTaskCommit(taskAttemptContext);
+  }
+
+  public void commitTask() throws IOException {
+    committer.commitTask(taskAttemptContext);
+  }
+
+  // This method simulates the scenario that the job may set up task with a different
+  // taskAttemptContext, e.g., for a spark job.
+  public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
+    committer.commitTask(taskAttemptContext);
+  }
+
+  public void abortTask() throws IOException {
+    committer.abortTask(taskAttemptContext);
+  }
+
+  public void commitJob() throws IOException {
+    committer.commitJob(jobContext);
+  }
+
+  @Override
+  public Path magicPartPath() {
+    return new Path(committer.getWorkPath(), FileOutputFormat.getUniqueFile(taskAttemptContext, "part", ""));
+  }
+
+  @Override
+  public Path magicPendingSetPath() {
+    return CommitUtils.magicTaskPendingSetPath(taskAttemptContext, outputPath);
+  }
+
+  public TaskAttemptContext taskAttemptContext() {
+    return taskAttemptContext;
+  }
+
+  public Committer committer() {
+    return committer;
+  }
+
+  @Override
+  public void assertNoTaskAttemptPath() throws IOException {
+    Path path = CommitUtils.magicTaskAttemptBasePath(taskAttemptContext, outputPath);
+    Assert.assertFalse("Task attempt path should not exist", fs.exists(path));
+    String pathToKey = ObjectUtils.pathToKey(path);
+    Assert.assertNull("Should have no task attempt path key", storage.head(pathToKey));
+  }
+
+  @Override
+  protected boolean skipTests() {
+    return storage.bucket().isDirectory();
+  }
+
+  @Override
+  public void assertSuccessMarker() throws IOException {
+    Path succPath = CommitUtils.successMarker(outputPath);
+    Assert.assertTrue(String.format("%s should exist", succPath), fs.exists(succPath));
+    SuccessData successData = SuccessData.deserialize(CommitUtils.load(fs, succPath));
+    Assert.assertEquals(SuccessData.class.getName(), successData.name());
+    Assert.assertTrue(successData.success());
+    Assert.assertEquals(NetUtils.getHostname(), successData.hostname());
+    Assert.assertEquals(CommitUtils.COMMITTER_NAME, successData.committer());
+    Assert.assertEquals(
+        String.format("Task committer %s", taskAttemptContext.getTaskAttemptID()),
+        successData.description());
+    Assert.assertEquals(job.getJobID().toString(), successData.jobId());
+    Assert.assertEquals(1, successData.filenames().size());
+    Assert.assertEquals(destPartKey(), successData.filenames().get(0));
+  }
+
+  @Override
+  public void assertSummaryReport(Path reportDir) throws IOException {
+    Path reportPath = CommitUtils.summaryReport(reportDir, job().getJobID().toString());
+    Assert.assertTrue(String.format("%s should exist", reportPath), fs.exists(reportPath));
+    SuccessData reportData = SuccessData.deserialize(CommitUtils.load(fs, reportPath));
+    Assert.assertEquals(SuccessData.class.getName(), reportData.name());
+    Assert.assertTrue(reportData.success());
+    Assert.assertEquals(NetUtils.getHostname(), reportData.hostname());
+    Assert.assertEquals(CommitUtils.COMMITTER_NAME, reportData.committer());
+    Assert.assertEquals(
+        String.format("Task committer %s", taskAttemptContext.getTaskAttemptID()),
+        reportData.description());
+    Assert.assertEquals(job.getJobID().toString(), reportData.jobId());
+    Assert.assertEquals(1, reportData.filenames().size());
+    Assert.assertEquals(destPartKey(), reportData.filenames().get(0));
+    Assert.assertEquals("clean", reportData.diagnostics().get("stage"));
+  }
+}

+ 232 - 0
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/MRJobTestBase.java

@@ -0,0 +1,232 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.terasort.TeraGen;
+import org.apache.hadoop.examples.terasort.TeraSort;
+import org.apache.hadoop.examples.terasort.TeraSortConfigKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.tosfs.object.ObjectInfo;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory;
+import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
+import org.apache.hadoop.fs.tosfs.util.UUIDUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.WordCount;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public abstract class MRJobTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(MRJobTestBase.class);
+
+  private static Configuration conf = new Configuration();
+  private static MiniMRYarnCluster yarnCluster;
+
+  private static FileSystem fs;
+
+  private static Path testDataPath;
+
+  public static void setConf(Configuration newConf) {
+    conf = newConf;
+  }
+
+  @BeforeClass
+  public static void beforeClass() throws IOException {
+    conf.setBoolean(JHAdminConfig.MR_HISTORY_CLEANER_ENABLE, false);
+    conf.setBoolean(YarnConfiguration.NM_DISK_HEALTH_CHECK_ENABLE, false);
+    conf.setInt(YarnConfiguration.NM_MAX_PER_DISK_UTILIZATION_PERCENTAGE, 100);
+
+    conf.set("mapreduce.outputcommitter.factory.scheme.tos",
+        CommitterFactory.class.getName()); // 3x newApiCommitter=true.
+    conf.set("mapred.output.committer.class",
+        Committer.class.getName()); // 2x and 3x newApiCommitter=false.
+    conf.set("mapreduce.outputcommitter.class",
+        org.apache.hadoop.fs.tosfs.commit.Committer.class.getName()); // 2x newApiCommitter=true.
+
+    // Start the yarn cluster.
+    yarnCluster = new MiniMRYarnCluster("yarn-" + System.currentTimeMillis(), 2);
+    LOG.info("Default filesystem: {}", conf.get("fs.defaultFS"));
+    LOG.info("Default filesystem implementation: {}", conf.get("fs.AbstractFileSystem.tos.impl"));
+
+    yarnCluster.init(conf);
+    yarnCluster.start();
+
+    fs = FileSystem.get(conf);
+    testDataPath = new Path("/mr-test-" + UUIDUtils.random())
+        .makeQualified(fs.getUri(), fs.getWorkingDirectory());
+  }
+
+  @AfterClass
+  public static void afterClass() throws IOException {
+    fs.delete(testDataPath, true);
+    if (yarnCluster != null) {
+      yarnCluster.stop();
+    }
+  }
+
+  @Before
+  public void before() throws IOException {
+  }
+
+  @After
+  public void after() throws IOException {
+  }
+
+  @Test
+  public void testTeraGen() throws Exception {
+    Path teraGenPath = new Path(testDataPath, "teraGen").makeQualified(fs.getUri(), fs.getWorkingDirectory());
+    Path output = new Path(teraGenPath, "output");
+    JobConf jobConf = new JobConf(yarnCluster.getConfig());
+    jobConf.addResource(conf);
+    jobConf.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(), 1000);
+    jobConf.setInt(TeraSortConfigKeys.NUM_PARTITIONS.key(), 10);
+    jobConf.setBoolean(TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(), false);
+
+    String[] args = new String[]{Integer.toString(1000), output.toString()};
+    int result = ToolRunner.run(jobConf, new TeraGen(), args);
+    Assert.assertEquals(String.format("teragen %s", StringUtils.join(" ", args)), 0, result);
+
+    // Verify the success data.
+    ObjectStorage storage = ObjectStorageFactory.create(
+        output.toUri().getScheme(), output.toUri().getAuthority(), conf);
+    int byteSizes = 0;
+
+    Path success = new Path(output, CommitUtils._SUCCESS);
+    byte[] serializedData = CommitUtils.load(fs, success);
+    SuccessData successData = SuccessData.deserialize(serializedData);
+    Assert.assertTrue("Should execute successfully", successData.success());
+    // Assert the destination paths.
+    Assert.assertEquals(2, successData.filenames().size());
+    successData.filenames().sort(String::compareTo);
+    Assert.assertEquals(ObjectUtils.pathToKey(new Path(output, "part-m-00000")),
+        successData.filenames().get(0));
+    Assert.assertEquals(ObjectUtils.pathToKey(new Path(output, "part-m-00001")),
+        successData.filenames().get(1));
+
+    for (String partFileKey : successData.filenames()) {
+      ObjectInfo objectInfo = storage.head(partFileKey);
+      Assert.assertNotNull("Output file should be existing", objectInfo);
+      byteSizes += objectInfo.size();
+    }
+
+    Assert.assertEquals(byteSizes, 100 /* Each row 100 bytes */ * 1000 /* total 1000 rows */);
+  }
+
+  @Test
+  public void testTeraSort() throws Exception {
+    Path teraGenPath = new Path(testDataPath, "teraGen").makeQualified(fs.getUri(), fs.getWorkingDirectory());
+    Path inputPath = new Path(teraGenPath, "output");
+    Path outputPath = new Path(teraGenPath, "sortOutput");
+    JobConf jobConf = new JobConf(yarnCluster.getConfig());
+    jobConf.addResource(conf);
+    jobConf.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(), 1000);
+    jobConf.setInt(TeraSortConfigKeys.NUM_PARTITIONS.key(), 10);
+    jobConf.setBoolean(TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(), false);
+    String[] args = new String[]{inputPath.toString(), outputPath.toString()};
+    int result = ToolRunner.run(jobConf, new TeraSort(), args);
+    Assert.assertEquals(String.format("terasort %s", StringUtils.join(" ", args)), 0, result);
+
+    // Verify the success data.
+    ObjectStorage storage = ObjectStorageFactory
+        .create(outputPath.toUri().getScheme(), outputPath.toUri().getAuthority(), conf);
+    int byteSizes = 0;
+
+    Path success = new Path(outputPath, CommitUtils._SUCCESS);
+    byte[] serializedData = CommitUtils.load(fs, success);
+    SuccessData successData = SuccessData.deserialize(serializedData);
+    Assert.assertTrue("Should execute successfully", successData.success());
+    // Assert the destination paths.
+    Assert.assertEquals(1, successData.filenames().size());
+    successData.filenames().sort(String::compareTo);
+    Assert.assertEquals(ObjectUtils.pathToKey(new Path(outputPath, "part-r-00000")), successData.filenames().get(0));
+
+    for (String partFileKey : successData.filenames()) {
+      ObjectInfo objectInfo = storage.head(partFileKey);
+      Assert.assertNotNull("Output file should be existing", objectInfo);
+      byteSizes += objectInfo.size();
+    }
+
+    Assert.assertEquals(byteSizes, 100 /* Each row 100 bytes */ * 1000 /* total 1000 rows */);
+  }
+
+  @Test
+  public void testWordCount() throws Exception {
+    Path wordCountPath = new Path(testDataPath, "wc").makeQualified(fs.getUri(), fs.getWorkingDirectory());
+    Path output = new Path(wordCountPath, "output");
+    Path input = new Path(wordCountPath, "input");
+    JobConf jobConf = new JobConf(yarnCluster.getConfig());
+    jobConf.addResource(conf);
+    jobConf.setInt(TeraSortConfigKeys.SAMPLE_SIZE.key(), 1000);
+    jobConf.setInt(TeraSortConfigKeys.NUM_PARTITIONS.key(), 10);
+    jobConf.setBoolean(TeraSortConfigKeys.USE_SIMPLE_PARTITIONER.key(), false);
+
+    if (!fs.mkdirs(input)) {
+      throw new IOException("Mkdirs failed to create " + input.toString());
+    }
+
+    DataOutputStream file = fs.create(new Path(input, "part-0"));
+    file.writeBytes("a a b c");
+    file.close();
+
+    String[] args = new String[]{input.toString(), output.toString()};
+    int result = ToolRunner.run(jobConf, new WordCount(), args);
+    Assert.assertEquals(String.format("WordCount %s", StringUtils.join(" ", args)), 0, result);
+
+    // Verify the success path.
+    Assert.assertTrue(fs.exists(new Path(output, CommitUtils._SUCCESS)));
+    Assert.assertTrue(fs.exists(new Path(output, "part-00000")));
+
+    Path success = new Path(output, CommitUtils._SUCCESS);
+    Assert.assertTrue("Success file must not be empty", CommitUtils.load(fs, success).length != 0);
+
+    byte[] serializedData = CommitUtils.load(fs, new Path(output, "part-00000"));
+    String outputAsStr = new String(serializedData);
+    Map<String, Integer> resAsMap = getResultAsMap(outputAsStr);
+    Assert.assertEquals(2, (int) resAsMap.get("a"));
+    Assert.assertEquals(1, (int) resAsMap.get("b"));
+    Assert.assertEquals(1, (int) resAsMap.get("c"));
+  }
+
+  private Map<String, Integer> getResultAsMap(String outputAsStr) {
+    Map<String, Integer> result = new HashMap<>();
+    for (String line : outputAsStr.split("\n")) {
+      String[] tokens = line.split("\t");
+      Assert.assertTrue(String.format("Not enough tokens in string %s from output %s", line, outputAsStr),
+          tokens.length > 1);
+      result.put(tokens[0], Integer.parseInt(tokens[1]));
+    }
+    return result;
+  }
+}

+ 29 - 0
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestCommitter.java

@@ -0,0 +1,29 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.tosfs.util.ParseUtils;
+
+public class TestCommitter extends CommitterTestBase {
+  @Override
+  protected Configuration newConf() {
+    Configuration conf = new Configuration();
+    conf.set("fs.defaultFS", String.format("tos://%s", ParseUtils.envAsString("TOS_BUCKET", false)));
+    return conf;
+  }
+}

+ 49 - 0
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestMRJob.java

@@ -0,0 +1,49 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.tosfs.conf.ConfKeys;
+import org.apache.hadoop.fs.tosfs.conf.TosKeys;
+import org.apache.hadoop.fs.tosfs.object.tos.TOS;
+import org.apache.hadoop.fs.tosfs.util.ParseUtils;
+import org.apache.hadoop.fs.tosfs.util.TestUtility;
+import org.junit.BeforeClass;
+
+import java.io.IOException;
+
+public class TestMRJob extends MRJobTestBase {
+
+  @BeforeClass
+  public static void beforeClass() throws IOException {
+    // Create the new configuration and apply it to the IT case.
+    Configuration newConf = new Configuration();
+    newConf.set("fs.defaultFS", String.format("tos://%s", TestUtility.bucket()));
+    // Applications in the YARN cluster cannot read the environment variables from the user's
+    // shell, so we set them in the config manually.
+    newConf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key("tos"),
+        ParseUtils.envAsString(TOS.ENV_TOS_ENDPOINT, false));
+    newConf.set(TosKeys.FS_TOS_ACCESS_KEY_ID,
+        ParseUtils.envAsString(TOS.ENV_TOS_ACCESS_KEY_ID, false));
+    newConf.set(TosKeys.FS_TOS_SECRET_ACCESS_KEY,
+        ParseUtils.envAsString(TOS.ENV_TOS_SECRET_ACCESS_KEY, false));
+
+    MRJobTestBase.setConf(newConf);
+    // Continue preparing the IT case environment.
+    MRJobTestBase.beforeClass();
+  }
+}

+ 1 - 1
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestMagicOutputStream.java

@@ -186,7 +186,7 @@ public class TestMagicOutputStream extends ObjectStorageTestBase {
   private class TestingMagicOutputStream extends MagicOutputStream {
 
     TestingMagicOutputStream(Path magic) {
-      super(fs, storage, threadPool, protonConf, magic);
+      super(fs, storage, threadPool, tosConf, magic);
     }
 
     protected void persist(Path p, byte[] data) {

+ 364 - 0
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/mapred/CommitterTestBase.java

@@ -0,0 +1,364 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.tosfs.commit.CommitUtils;
+import org.apache.hadoop.fs.tosfs.commit.Pending;
+import org.apache.hadoop.fs.tosfs.commit.PendingSet;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
+import org.apache.hadoop.fs.tosfs.util.CommonUtils;
+import org.apache.hadoop.fs.tosfs.util.UUIDUtils;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public abstract class CommitterTestBase {
+  private Configuration conf;
+  private FileSystem fs;
+  private Path outputPath;
+  private TaskAttemptID taskAttempt0;
+  private Path reportDir;
+
+  @Before
+  public void setup() throws IOException {
+    conf = newConf();
+    fs = FileSystem.get(conf);
+    String uuid = UUIDUtils.random();
+    outputPath = fs.makeQualified(new Path("/test/" + uuid));
+    taskAttempt0 = JobSuite.createTaskAttemptId(randomTrimmedJobId(), 0);
+
+    reportDir = fs.makeQualified(new Path("/report/" + uuid));
+    fs.mkdirs(reportDir);
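+    // Point the committer summary report at a per-test report directory so that
+    // assertSummaryReport(reportDir) can verify it after the job commits.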
+    conf.set(org.apache.hadoop.fs.tosfs.commit.Committer.COMMITTER_SUMMARY_REPORT_DIR,
+        reportDir.toUri().toString());
+  }
+
+  protected abstract Configuration newConf();
+
+  @After
+  public void teardown() {
+    CommonUtils.runQuietly(() -> fs.delete(outputPath, true));
+    IOUtils.closeStream(fs);
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    List<String> committerThreads = Thread.getAllStackTraces().keySet()
+        .stream()
+        .map(Thread::getName)
+        .filter(n -> n.startsWith(org.apache.hadoop.fs.tosfs.commit.Committer.THREADS_PREFIX))
+        .collect(Collectors.toList());
+    Assert.assertTrue("Outstanding committer threads", committerThreads.isEmpty());
+  }
+
+  private static String randomTrimmedJobId() {
+    SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMdd");
+    return String.format("%s%04d_%04d", formatter.format(new Date()),
+        (long) (Math.random() * 1000),
+        (long) (Math.random() * 1000));
+  }
+
+  private static String randomFormedJobId() {
+    return String.format("job_%s", randomTrimmedJobId());
+  }
+
+  @Test
+  public void testSetupJob() throws IOException {
+    JobSuite suite = JobSuite.create(conf, taskAttempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+
+    // Setup job.
+    suite.setupJob();
+    suite.dumpObjectStorage();
+    suite.assertHasMagicKeys();
+  }
+
+  @Test
+  public void testSetupJobWithOrphanPaths() throws IOException, InterruptedException {
+    JobSuite suite = JobSuite.create(conf, taskAttempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+
+    // Orphan success marker.
+    Path successPath = CommitUtils.successMarker(outputPath);
+    CommitUtils.save(fs, successPath, new byte[]{});
+    Assert.assertTrue(fs.exists(successPath));
+
+    // Orphan job path.
+    Path jobPath = CommitUtils.magicJobPath(suite.committer().jobId(), outputPath);
+    fs.mkdirs(jobPath);
+    Assert.assertTrue("The job path should be existing", fs.exists(jobPath));
+    Path subPath = new Path(jobPath, "tmp.pending");
+    CommitUtils.save(fs, subPath, new byte[]{});
+    Assert.assertTrue("The sub path under job path should be existing.", fs.exists(subPath));
+    FileStatus jobPathStatus = fs.getFileStatus(jobPath);
+
+    Thread.sleep(1000L);
+    suite.setupJob();
+    suite.dumpObjectStorage();
+    suite.assertHasMagicKeys();
+
+    Assert.assertFalse("Should have deleted the success path", fs.exists(successPath));
+    Assert.assertTrue("Should have re-created the job path", fs.exists(jobPath));
+    Assert.assertFalse("Should have deleted the sub path under the job path", fs.exists(subPath));
+  }
+
+  @Test
+  public void testSetupTask() throws IOException {
+    JobSuite suite = JobSuite.create(conf, taskAttempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+
+    // Leftover task attempt path.
+    Path taskAttemptBasePath = CommitUtils.magicTaskAttemptBasePath(suite.taskAttemptContext(), outputPath);
+    Path subTaskAttemptPath = new Path(taskAttemptBasePath, "tmp.pending");
+    CommitUtils.save(fs, subTaskAttemptPath, new byte[]{});
+    Assert.assertTrue(fs.exists(taskAttemptBasePath));
+    Assert.assertTrue(fs.exists(subTaskAttemptPath));
+
+    // Setup job.
+    suite.setupJob();
+    suite.assertHasMagicKeys();
+    // Setting up the job clears everything under the job path.
+    Assert.assertFalse(fs.exists(taskAttemptBasePath));
+    Assert.assertFalse(fs.exists(subTaskAttemptPath));
+
+    // Leave some task paths behind again.
+    CommitUtils.save(fs, subTaskAttemptPath, new byte[]{});
+    Assert.assertTrue(fs.exists(taskAttemptBasePath));
+    Assert.assertTrue(fs.exists(subTaskAttemptPath));
+
+    // Setup task.
+    suite.setupTask();
+    Assert.assertFalse(fs.exists(subTaskAttemptPath));
+  }
+
+  @Test
+  public void testCommitTask() throws Exception {
+    JobSuite suite = JobSuite.create(conf, taskAttempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+    // Setup job
+    suite.setupJob();
+    suite.dumpObjectStorage();
+    suite.assertHasMagicKeys();
+
+    // Setup task
+    suite.setupTask();
+
+    // Write records.
+    suite.assertNoMagicPendingFile();
+    suite.assertMultipartUpload(0);
+    suite.writeOutput();
+    suite.dumpObjectStorage();
+    suite.assertHasMagicPendingFile();
+    suite.assertNoMagicMultipartUpload();
+    suite.assertMultipartUpload(1);
+    // Assert the pending file content.
+    Path pendingPath = suite.magicPendingPath();
+    byte[] pendingData = CommitUtils.load(suite.fs(), pendingPath);
+    Pending pending = Pending.deserialize(pendingData);
+    Assert.assertEquals(suite.destPartKey(), pending.destKey());
+    Assert.assertEquals(20, pending.length());
+    Assert.assertEquals(1, pending.parts().size());
+
+    // Commit the task.
+    suite.commitTask();
+
+    // Verify the pending set file.
+    suite.assertHasPendingSet();
+    // Assert the pending set file content.
+    Path pendingSetPath = suite.magicPendingSetPath();
+    byte[] pendingSetData = CommitUtils.load(suite.fs(), pendingSetPath);
+    PendingSet pendingSet = PendingSet.deserialize(pendingSetData);
+    Assert.assertEquals(suite.job().getJobID().toString(), pendingSet.jobId());
+    Assert.assertEquals(1, pendingSet.commits().size());
+    Assert.assertEquals(pending, pendingSet.commits().get(0));
+    Assert.assertEquals(pendingSet.extraData(),
+        ImmutableMap.of(CommitUtils.TASK_ATTEMPT_ID, suite.taskAttemptContext().getTaskAttemptID().toString()));
+
+    // Complete the multipart upload and verify the results.
+    ObjectStorage storage = suite.storage();
+    storage.completeUpload(pending.destKey(), pending.uploadId(), pending.parts());
+    suite.verifyPartContent();
+  }
+
+  @Test
+  public void testAbortTask() throws Exception {
+    JobSuite suite = JobSuite.create(conf, taskAttempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+    suite.setupJob();
+    suite.setupTask();
+
+    // Pre-check before the output write.
+    suite.assertNoMagicPendingFile();
+    suite.assertMultipartUpload(0);
+
+    // Execute the output write.
+    suite.writeOutput();
+
+    // Post-check after the output write.
+    suite.assertHasMagicPendingFile();
+    suite.assertNoMagicMultipartUpload();
+    suite.assertMultipartUpload(1);
+    // Assert the pending file content.
+    Path pendingPath = suite.magicPendingPath();
+    byte[] pendingData = CommitUtils.load(suite.fs(), pendingPath);
+    Pending pending = Pending.deserialize(pendingData);
+    Assert.assertEquals(suite.destPartKey(), pending.destKey());
+    Assert.assertEquals(20, pending.length());
+    Assert.assertEquals(1, pending.parts().size());
+
+    // Abort the task.
+    suite.abortTask();
+
+    // Verify the state after aborting task.
+    suite.assertNoMagicPendingFile();
+    suite.assertNoMagicMultipartUpload();
+    suite.assertMultipartUpload(0);
+    suite.assertNoTaskAttemptPath();
+  }
+
+  @Test
+  public void testCommitJob() throws Exception {
+    JobSuite suite = JobSuite.create(conf, taskAttempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+    suite.setupJob();
+    suite.setupTask();
+    suite.writeOutput();
+    suite.commitTask();
+
+    // Commit the job.
+    suite.assertNoPartFiles();
+    suite.commitJob();
+    // Verify the output.
+    suite.assertNoMagicMultipartUpload();
+    suite.assertNoMagicObjectKeys();
+    suite.assertSuccessMarker();
+    suite.assertSummaryReport(reportDir);
+    suite.verifyPartContent();
+  }
+
+  @Test
+  public void testCommitJobFailed() throws Exception {
+    JobSuite suite = JobSuite.create(conf, taskAttempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+    suite.setupJob();
+    suite.setupTask();
+    suite.writeOutput();
+    suite.commitTask();
+
+    // Commit the job.
+    suite.assertNoPartFiles();
+    suite.commitJob();
+  }
+
+  @Test
+  public void testTaskCommitAfterJobCommit() throws Exception {
+    JobSuite suite = JobSuite.create(conf, taskAttempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+    suite.setupJob();
+    suite.setupTask();
+    suite.writeOutput();
+    suite.commitTask();
+
+    // Commit the job
+    suite.assertNoPartFiles();
+    suite.commitJob();
+    // Verify the output.
+    suite.assertNoMagicMultipartUpload();
+    suite.assertNoMagicObjectKeys();
+    suite.assertSuccessMarker();
+    suite.verifyPartContent();
+
+    // Commit the task again.
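+    // The magic task data has already been cleaned up by the job commit, so this should fail.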
+    Assert.assertThrows(FileNotFoundException.class, suite::commitTask);
+  }
+
+  @Test
+  public void testTaskCommitWithConsistentJobId() throws Exception {
+    Configuration conf = newConf();
+    String consistentJobId = randomFormedJobId();
+    conf.set(CommitUtils.SPARK_WRITE_UUID, consistentJobId);
+    JobSuite suite = JobSuite.create(conf, taskAttempt0, outputPath);
+    Assume.assumeFalse(suite.skipTests());
+
+    // By now, we have two "jobId"s, one is spark uuid, and the other is the jobId in taskAttempt.
+    // The job committer will adopt the former.
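+    // (Illustrative assumption only, not necessarily the actual CommitUtils.buildJobId logic:
+    //  roughly, jobId = conf.get(SPARK_WRITE_UUID) != null
+    //      ? conf.get(SPARK_WRITE_UUID) : taskAttemptId.getJobID().toString().)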
+    suite.setupJob();
+
+    // Next, we clear the Spark write UUID and set the jobId of the taskAttempt to another value.
+    // In this case, the committer will take the jobId of the taskAttempt as the final jobId,
+    // which is inconsistent with the one the committer holds.
+    conf.unset(CommitUtils.SPARK_WRITE_UUID);
+    JobConf jobConf = new JobConf(conf);
+    String anotherJobId = randomTrimmedJobId();
+    TaskAttemptID taskAttemptId1 =
+        JobSuite.createTaskAttemptId(anotherJobId, JobSuite.DEFAULT_APP_ATTEMPT_ID);
+    final TaskAttemptContext attemptContext1 =
+        JobSuite.createTaskAttemptContext(jobConf, taskAttemptId1, JobSuite.DEFAULT_APP_ATTEMPT_ID);
+
+    Assert.assertThrows("JobId set in the context", IllegalArgumentException.class,
+        () -> suite.setupTask(attemptContext1));
+
+    // Even though we use another taskAttempt, as long as the Spark write UUID stays consistent,
+    // the jobId held by the committer remains consistent.
+    conf.set(CommitUtils.SPARK_WRITE_UUID, consistentJobId);
+    conf.set(FileOutputFormat.OUTDIR, outputPath.toString());
+    jobConf = new JobConf(conf);
+    anotherJobId = randomTrimmedJobId();
+    TaskAttemptID taskAttemptId2 =
+        JobSuite.createTaskAttemptId(anotherJobId, JobSuite.DEFAULT_APP_ATTEMPT_ID);
+    TaskAttemptContext attemptContext2 =
+        JobSuite.createTaskAttemptContext(jobConf, taskAttemptId2, JobSuite.DEFAULT_APP_ATTEMPT_ID);
+
+    suite.setupTask(attemptContext2);
+    // Write output must use the same task context with setup task.
+    suite.writeOutput(attemptContext2);
+    // Commit task must use the same task context with setup task.
+    suite.commitTask(attemptContext2);
+    suite.assertPendingSetAtRightLocation();
+
+    // Commit the job
+    suite.assertNoPartFiles();
+    suite.commitJob();
+
+    // Verify the output.
+    suite.assertNoMagicMultipartUpload();
+    suite.assertNoMagicObjectKeys();
+    suite.assertSuccessMarker();
+    suite.verifyPartContent();
+  }
+}

+ 228 - 0
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/mapred/JobSuite.java

@@ -0,0 +1,228 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit.mapred;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.tosfs.commit.BaseJobSuite;
+import org.apache.hadoop.fs.tosfs.commit.CommitUtils;
+import org.apache.hadoop.fs.tosfs.commit.SuccessData;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory;
+import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.JobContextImpl;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TextOutputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.net.NetUtils;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class JobSuite extends BaseJobSuite {
+  private static final Logger LOG = LoggerFactory.getLogger(JobSuite.class);
+  private final JobContext jobContext;
+  private final TaskAttemptContext taskAttemptContext;
+  private final Committer committer;
+
+  private JobSuite(FileSystem fs, JobConf conf,
+                   TaskAttemptID taskAttemptId, int appAttemptId, Path outputPath)
+      throws IOException {
+    this.fs = fs;
+    // Initialize the job instance.
+    this.job = Job.getInstance(conf);
+    job.setJobID(JobID.forName(CommitUtils.buildJobId(conf, taskAttemptId.getJobID())));
+    this.jobContext = createJobContext(conf, taskAttemptId);
+    this.taskAttemptContext = createTaskAttemptContext(conf, taskAttemptId, appAttemptId);
+    this.jobId = CommitUtils.buildJobId(jobContext);
+
+    // Set job output directory.
+    FileOutputFormat.setOutputPath(conf, outputPath);
+    this.outputPath = outputPath;
+    this.storage = ObjectStorageFactory.create(outputPath.toUri().getScheme(),
+        outputPath.toUri().getAuthority(), conf);
+
+    // Initialize committer.
+    this.committer = new Committer();
+    this.committer.setupTask(taskAttemptContext);
+  }
+
+  public static JobSuite create(Configuration conf, TaskAttemptID taskAttemptId, Path outDir)
+      throws IOException {
+    FileSystem fs = outDir.getFileSystem(conf);
+    return new JobSuite(fs, new JobConf(conf), taskAttemptId, DEFAULT_APP_ATTEMPT_ID, outDir);
+  }
+
+  public static TaskAttemptID createTaskAttemptId(String trimmedJobId, int attemptId) {
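+    // Follows the MapReduce attempt ID naming scheme: attempt_<trimmedJobId>_m_<taskId>_<attemptId>.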
+    String attempt = String.format("attempt_%s_m_000000_%d", trimmedJobId, attemptId);
+    return TaskAttemptID.forName(attempt);
+  }
+
+  public static JobContext createJobContext(JobConf jobConf, TaskAttemptID taskAttemptId) {
+    return new JobContextImpl(jobConf, taskAttemptId.getJobID());
+  }
+
+  public static TaskAttemptContext createTaskAttemptContext(
+      JobConf jobConf, TaskAttemptID taskAttemptId, int appAttemptId) throws IOException {
+    // Set the required key values in the job configuration.
+    jobConf.set(MRJobConfig.TASK_ATTEMPT_ID, taskAttemptId.toString());
+    jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, appAttemptId);
+    jobConf.set("mapred.output.committer.class",
+        Committer.class.getName()); // 2x and 3x newApiCommitter=false.
+    return new TaskAttemptContextImpl(jobConf, taskAttemptId);
+  }
+
+  public void setupJob() throws IOException {
+    committer.setupJob(jobContext);
+  }
+
+  public void setupTask() throws IOException {
+    committer.setupTask(taskAttemptContext);
+  }
+
+  // Simulates the scenario where the job sets up a task with a different taskAttemptContext,
+  // e.g., for a Spark job.
+  public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
+    committer.setupTask(taskAttemptContext);
+  }
+
+  public void writeOutput() throws Exception {
+    writeOutput(taskAttemptContext);
+  }
+
+  // Simulates the scenario where the job writes output with a different taskAttemptContext,
+  // e.g., for a Spark job.
+  public void writeOutput(TaskAttemptContext taskAttemptContext) throws Exception {
+    RecordWriter<Object, Object> writer = new TextOutputFormat<>().getRecordWriter(fs,
+        taskAttemptContext.getJobConf(),
+        CommitUtils.buildJobId(taskAttemptContext),
+        taskAttemptContext.getProgressible());
+    NullWritable nullKey = NullWritable.get();
+    NullWritable nullVal = NullWritable.get();
+    Object[] keys = new Object[]{KEY_1, nullKey, null, nullKey, null, KEY_2};
+    Object[] vals = new Object[]{VAL_1, nullVal, null, null, nullVal, VAL_2};
+    try {
+      Assert.assertEquals(keys.length, vals.length);
+      for (int i = 0; i < keys.length; i++) {
+        writer.write(keys[i], vals[i]);
+      }
+    } finally {
+      writer.close(Reporter.NULL);
+    }
+  }
+
+  public boolean needsTaskCommit() throws IOException {
+    return committer.needsTaskCommit(taskAttemptContext);
+  }
+
+  public void commitTask() throws IOException {
+    committer.commitTask(taskAttemptContext);
+  }
+
+  // Simulates the scenario where the job commits the task with a different taskAttemptContext,
+  // e.g., for a Spark job.
+  public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
+    committer.commitTask(taskAttemptContext);
+  }
+
+  public void abortTask() throws IOException {
+    committer.abortTask(taskAttemptContext);
+  }
+
+  public void commitJob() throws IOException {
+    committer.commitJob(jobContext);
+  }
+
+  @Override
+  public Path magicPartPath() {
+    return new Path(committer.getWorkPath(), committer.jobId());
+  }
+
+  @Override
+  public Path magicPendingSetPath() {
+    return CommitUtils.magicTaskPendingSetPath(taskAttemptContext, outputPath);
+  }
+
+  public TaskAttemptContext taskAttemptContext() {
+    return taskAttemptContext;
+  }
+
+  public Committer committer() {
+    return committer;
+  }
+
+  @Override
+  public void assertNoTaskAttemptPath() throws IOException {
+    Path path = CommitUtils.magicTaskAttemptBasePath(taskAttemptContext, outputPath);
+    Assert.assertFalse("Task attempt path should be not existing", fs.exists(path));
+    String pathToKey = ObjectUtils.pathToKey(path);
+    Assert.assertNull("Should have no task attempt path key", storage.head(pathToKey));
+  }
+
+  @Override
+  protected boolean skipTests() {
+    return storage.bucket().isDirectory();
+  }
+
+  @Override
+  public void assertSuccessMarker() throws IOException {
+    Path succPath = CommitUtils.successMarker(outputPath);
+    Assert.assertTrue(String.format("%s should be exists", succPath), fs.exists(succPath));
+    SuccessData successData = SuccessData.deserialize(CommitUtils.load(fs, succPath));
+    Assert.assertEquals(SuccessData.class.getName(), successData.name());
+    Assert.assertTrue(successData.success());
+    Assert.assertEquals(NetUtils.getHostname(), successData.hostname());
+    Assert.assertEquals(CommitUtils.COMMITTER_NAME, successData.committer());
+    Assert.assertEquals(
+        String.format("Task committer %s", taskAttemptContext.getTaskAttemptID()),
+        successData.description());
+    Assert.assertEquals(job.getJobID().toString(), successData.jobId());
+    Assert.assertEquals(1, successData.filenames().size());
+    Assert.assertEquals(destPartKey(), successData.filenames().get(0));
+  }
+
+  @Override
+  public void assertSummaryReport(Path reportDir) throws IOException {
+    Path reportPath = CommitUtils.summaryReport(reportDir, job().getJobID().toString());
+    Assert.assertTrue(String.format("%s should be exists", reportPath), fs.exists(reportPath));
+    SuccessData reportData = SuccessData.deserialize(CommitUtils.load(fs, reportPath));
+    Assert.assertEquals(SuccessData.class.getName(), reportData.name());
+    Assert.assertTrue(reportData.success());
+    Assert.assertEquals(NetUtils.getHostname(), reportData.hostname());
+    Assert.assertEquals(CommitUtils.COMMITTER_NAME, reportData.committer());
+    Assert.assertEquals(
+        String.format("Task committer %s", taskAttemptContext.getTaskAttemptID()),
+        reportData.description());
+    Assert.assertEquals(job.getJobID().toString(), reportData.jobId());
+    Assert.assertEquals(1, reportData.filenames().size());
+    Assert.assertEquals(destPartKey(), reportData.filenames().get(0));
+    Assert.assertEquals("clean", reportData.diagnostics().get("stage"));
+  }
+}

+ 29 - 0
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/mapred/TestCommitter.java

@@ -0,0 +1,29 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.tosfs.util.TestUtility;
+
+public class TestCommitter extends CommitterTestBase {
+  @Override
+  protected Configuration newConf() {
+    Configuration conf = new Configuration();
+    conf.set("fs.defaultFS", String.format("tos://%s", TestUtility.bucket()));
+    return conf;
+  }
+}

+ 3 - 3
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/ObjectStorageTestBase.java

@@ -38,7 +38,7 @@ import java.io.IOException;
 public class ObjectStorageTestBase {
   private static final Logger LOG = LoggerFactory.getLogger(ObjectStorageTestBase.class);
   protected Configuration conf;
-  protected Configuration protonConf;
+  protected Configuration tosConf;
   protected Path testDir;
   protected FileSystem fs;
   protected String scheme;
@@ -55,14 +55,14 @@ public class ObjectStorageTestBase {
     conf = new Configuration();
     conf.set(ConfKeys.FS_OBJECT_STORAGE_ENDPOINT.key("filestore"), tempDirPath);
     conf.set("fs.filestore.impl", LocalFileSystem.class.getName());
-    protonConf = new Configuration(conf);
+    tosConf = new Configuration(conf);
     // Set the environment variable for ObjectTestUtils#assertObject
     TestUtility.setSystemEnv(FileStore.ENV_FILE_STORAGE_ROOT, tempDirPath);
 
     testDir = new Path("filestore://" + FileStore.DEFAULT_BUCKET + "/", UUIDUtils.random());
     fs = testDir.getFileSystem(conf);
     scheme = testDir.toUri().getScheme();
-    storage = ObjectStorageFactory.create(scheme, testDir.toUri().getAuthority(), protonConf);
+    storage = ObjectStorageFactory.create(scheme, testDir.toUri().getAuthority(), tosConf);
   }
 
   @After

+ 12 - 12
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/TestObjectOutputStream.java

@@ -73,7 +73,7 @@ public class TestObjectOutputStream extends ObjectStorageTestBase {
       for (int i = 0; i < 3; i++) {
         tmpDirs.add(tmp.newDir());
       }
-      Configuration newConf = new Configuration(protonConf);
+      Configuration newConf = new Configuration(tosConf);
       newConf.set(ConfKeys.FS_MULTIPART_STAGING_DIR.key("filestore"), Joiner.on(",").join(tmpDirs));
 
       // Start multiple threads to open streams to create staging dir.
@@ -91,7 +91,7 @@ public class TestObjectOutputStream extends ObjectStorageTestBase {
   @Test
   public void testWriteZeroByte() throws IOException {
     Path zeroByteTxt = path("zero-byte.txt");
-    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, zeroByteTxt, true);
+    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, tosConf, zeroByteTxt, true);
     // write zero-byte and close.
     out.write(new byte[0], 0, 0);
     out.close();
@@ -104,7 +104,7 @@ public class TestObjectOutputStream extends ObjectStorageTestBase {
   @Test
   public void testWriteZeroByteWithoutAllowPut() throws IOException {
     Path zeroByteTxt = path("zero-byte-without-allow-put.txt");
-    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, zeroByteTxt, false);
+    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, tosConf, zeroByteTxt, false);
     // write zero-byte and close.
     out.close();
     assertStagingPart(0, out.stagingParts());
@@ -116,7 +116,7 @@ public class TestObjectOutputStream extends ObjectStorageTestBase {
   @Test
   public void testDeleteStagingFileWhenUploadPartsOK() throws IOException {
     Path path = path("data.txt");
-    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, path, true);
+    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, tosConf, path, true);
     byte[] data = TestUtility.rand((int) (ConfKeys.FS_MULTIPART_SIZE_DEFAULT * 2));
     out.write(data);
     out.waitForPartsUpload();
@@ -132,7 +132,7 @@ public class TestObjectOutputStream extends ObjectStorageTestBase {
   @Test
   public void testDeleteStagingFileWithClose() throws IOException {
     Path path = path("data.txt");
-    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, path, true);
+    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, tosConf, path, true);
     byte[] data = TestUtility.rand((int) (ConfKeys.FS_MULTIPART_SIZE_DEFAULT * 2));
     out.write(data);
     out.close();
@@ -144,7 +144,7 @@ public class TestObjectOutputStream extends ObjectStorageTestBase {
   @Test
   public void testDeleteSimplePutStagingFile() throws IOException {
     Path smallTxt = path("small.txt");
-    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, smallTxt, true);
+    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, tosConf, smallTxt, true);
     byte[] data = TestUtility.rand(4 << 20);
     out.write(data);
     for (StagingPart part : out.stagingParts()) {
@@ -159,7 +159,7 @@ public class TestObjectOutputStream extends ObjectStorageTestBase {
   @Test
   public void testSimplePut() throws IOException {
     Path smallTxt = path("small.txt");
-    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, smallTxt, true);
+    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, tosConf, smallTxt, true);
     byte[] data = TestUtility.rand(4 << 20);
     out.write(data);
     out.close();
@@ -171,7 +171,7 @@ public class TestObjectOutputStream extends ObjectStorageTestBase {
   }
 
   public void testWrite(int uploadPartSize, int len) throws IOException {
-    Configuration newConf = new Configuration(protonConf);
+    Configuration newConf = new Configuration(tosConf);
     newConf.setLong(ConfKeys.FS_MULTIPART_SIZE.key(FSUtils.scheme(conf, testDir.toUri())),
         uploadPartSize);
 
@@ -207,7 +207,7 @@ public class TestObjectOutputStream extends ObjectStorageTestBase {
 
   public void testParallelWriteOneOutPutStreamImpl(int partSize, int epochs, int batchSize)
       throws IOException, ExecutionException, InterruptedException {
-    Configuration newConf = new Configuration(protonConf);
+    Configuration newConf = new Configuration(tosConf);
     newConf.setLong(ConfKeys.FS_MULTIPART_SIZE.key(FSUtils.scheme(conf, testDir.toUri())),
         partSize);
 
@@ -283,7 +283,7 @@ public class TestObjectOutputStream extends ObjectStorageTestBase {
   }
 
   private void testMultipartThreshold(int partSize, int multipartThreshold, int dataSize) throws IOException {
-    Configuration newConf = new Configuration(protonConf);
+    Configuration newConf = new Configuration(tosConf);
     newConf.setLong(ConfKeys.FS_MULTIPART_SIZE.key(scheme), partSize);
     newConf.setLong(ConfKeys.FS_MULTIPART_THRESHOLD.key(scheme), multipartThreshold);
     Path outPath = path(String.format("threshold-%d-%d-%d.txt", partSize, multipartThreshold, dataSize));
@@ -370,7 +370,7 @@ public class TestObjectOutputStream extends ObjectStorageTestBase {
     int partNum = 1;
 
     byte[] data = TestUtility.rand(len);
-    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, outPath, true);
+    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, tosConf, outPath, true);
     try {
       out.write(data);
       out.close();
@@ -386,7 +386,7 @@ public class TestObjectOutputStream extends ObjectStorageTestBase {
   public void testWriteClosedStream() throws IOException {
     byte[] data = TestUtility.rand(10);
     Path outPath = path("testWriteClosedStream.txt");
-    try (ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, outPath, true)) {
+    try (ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, tosConf, outPath, true)) {
       out.close();
       out.write(data);
     } catch (IllegalStateException e) {