Integration of TOS: Add test cases for ObjectOutputStream, MagicOutputStream, and FSUtils.

lijinglun 7 months ago
parent commit 43ed890e45

+ 196 - 0
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/commit/TestMagicOutputStream.java

@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.tosfs.common.ThreadPools;
+import org.apache.hadoop.fs.tosfs.object.MultipartUpload;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorageTestBase;
+import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
+import org.apache.hadoop.fs.tosfs.object.Part;
+import org.apache.hadoop.fs.tosfs.object.staging.StagingPart;
+import org.apache.hadoop.fs.tosfs.object.staging.State;
+import org.apache.hadoop.fs.tosfs.util.TestUtility;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+public class TestMagicOutputStream extends ObjectStorageTestBase {
+
+  private static ExecutorService threadPool;
+
+  @BeforeClass
+  public static void beforeClass() {
+    threadPool = ThreadPools.newWorkerPool("TestMagicOutputStream-pool");
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    if (!threadPool.isShutdown()) {
+      threadPool.shutdown();
+    }
+  }
+
+  private static Path path(String p) {
+    return new Path(p);
+  }
+
+  private static Path path(Path parent, String child) {
+    return new Path(parent, child);
+  }
+
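+  // MagicOutputStream.toDestKey maps a magic path back to its destination key: the
+  // __magic directory and everything below it are dropped except the file name,
+  // unless a __base directory is present, in which case the whole relative subtree
+  // under __base is preserved (see the cases below).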
+  @Test
+  public void testCreateDestKey() {
+    Object[][] testCases = new Object[][]{
+        new Object[]{path("tos://bucket/__magic/a.txt"), "a.txt"},
+        new Object[]{path("tos://bucket/output/__magic/job-1/tasks/tasks-attempt-0/a.txt"), "output/a.txt"},
+        new Object[]{path("tos://bucket/__magic/job0/task0/__base/a.txt"), "a.txt"},
+        new Object[]{path("tos://bucket/output/__magic/job0/task0/__base/part/part-m-1000"), "output/part/part-m-1000"},
+        new Object[]{path("tos://bucket/a/b/c/__magic/__base/d/e/f"), "a/b/c/d/e/f"},
+        new Object[]{path("tos://bucket/a/b/c/__magic/d/e/f"), "a/b/c/f"},
+    };
+
+    for (Object[] input : testCases) {
+      String actualDestKey = MagicOutputStream.toDestKey((Path) input[0]);
+      Assert.assertEquals("Unexpected destination key.", actualDestKey, input[1]);
+    }
+  }
+
+  @Test
+  public void testNonMagicPath() {
+    try (MagicOutputStream ignored = new TestingMagicOutputStream(path(testDir, "non-magic"))) {
+      Assert.fail("Cannot create magic output stream for non-magic path");
+    } catch (Exception e) {
+      // Expected: the constructor rejects paths without a magic directory.
+    }
+  }
+
+  @Test
+  public void testWriteZeroByte() throws IOException {
+    Path magic = path(path(testDir, CommitUtils.MAGIC), "zero-byte.txt");
+    MagicOutputStream out = new TestingMagicOutputStream(magic);
+    // write zero-byte and close.
+    out.close();
+    assertStagingFiles(0, out.stagingParts());
+
+    // Read and validate the .pending contents
+    try (InputStream in = storage.get(out.pendingKey()).stream()) {
+      byte[] data = IOUtils.toByteArray(in);
+      Pending commit = Pending.deserialize(data);
+      Assert.assertEquals(storage.bucket().name(), commit.bucket());
+      Assert.assertEquals(out.destKey(), commit.destKey());
+      Assert.assertTrue(StringUtils.isNotEmpty(commit.uploadId()));
+      Assert.assertTrue(commit.createdTimestamp() > 0);
+      Assert.assertEquals(1, commit.parts().size());
+      Assert.assertEquals(0, commit.length());
+      Assert.assertEquals(out.upload().uploadId(), commit.uploadId());
+    }
+  }
+
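+  // Writes `len` random bytes through a magic stream, then verifies the serialized
+  // .pending commit, the in-flight multipart upload, and the completed object content.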
+  public void testWrite(int len) throws IOException {
+    Path magic = path(path(testDir, CommitUtils.MAGIC), len + ".txt");
+    int uploadPartSize = 8 << 20;
+    // Expected part count: ceil(len / uploadPartSize).
+    int partNum = (len - 1) / uploadPartSize + 1;
+
+    MagicOutputStream out = new TestingMagicOutputStream(magic);
+    byte[] data = TestUtility.rand(len);
+    out.write(data);
+    out.close();
+
+    assertStagingFiles(partNum, out.stagingParts());
+    Assert.assertEquals(ObjectUtils.pathToKey(magic) + CommitUtils.PENDING_SUFFIX, out.pendingKey());
+
+    Pending commit;
+    try (InputStream in = storage.get(out.pendingKey()).stream()) {
+      byte[] serializedData = IOUtils.toByteArray(in);
+      commit = Pending.deserialize(serializedData);
+      Assert.assertEquals(storage.bucket().name(), commit.bucket());
+      Assert.assertEquals(out.destKey(), commit.destKey());
+      Assert.assertTrue(commit.createdTimestamp() > 0);
+      Assert.assertEquals(len, commit.length());
+      Assert.assertEquals(out.upload().uploadId(), commit.uploadId());
+      // Verify the upload part list.
+      Assert.assertEquals(partNum, commit.parts().size());
+      if (!commit.parts().isEmpty()) {
+        for (int i = 0; i < partNum - 1; i += 1) {
+          Assert.assertEquals(uploadPartSize, commit.parts().get(i).size());
+        }
+        Part lastPart = commit.parts().get(partNum - 1);
+        Assert.assertTrue(lastPart.size() > 0 && lastPart.size() <= uploadPartSize);
+      }
+    }
+
+    // List multipart uploads
+    int uploadsNum = 0;
+    for (MultipartUpload upload : storage.listUploads(out.destKey())) {
+      uploadsNum += 1;
+      Assert.assertEquals(out.upload(), upload);
+    }
+    Assert.assertEquals(1L, uploadsNum);
+
+    // The target object is not visible in the object storage until the upload completes.
+    Assert.assertNull(storage.head(out.destKey()));
+
+    // Complete the upload and validate the content.
+    storage.completeUpload(out.destKey(), out.upload().uploadId(), commit.parts());
+    try (InputStream in = storage.get(out.destKey()).stream()) {
+      Assert.assertArrayEquals(data, IOUtils.toByteArray(in));
+    }
+  }
+
+  @Test
+  public void testWrite1MB() throws IOException {
+    testWrite(1 << 20);
+  }
+
+  @Test
+  public void testWrite24MB() throws IOException {
+    testWrite(24 << 20);
+  }
+
+  @Test
+  public void testWrite100MB() throws IOException {
+    testWrite(100 << 20);
+  }
+
+  private static void assertStagingFiles(int expectedNum, List<StagingPart> stagings) {
+    Assert.assertEquals(expectedNum, stagings.size());
+    for (StagingPart staging : stagings) {
+      Assert.assertEquals(State.CLEANED, staging.state());
+    }
+  }
+
+  private class TestingMagicOutputStream extends MagicOutputStream {
+
+    TestingMagicOutputStream(Path magic) {
+      super(fs, storage, threadPool, protonConf, magic);
+    }
+
+    @Override
+    protected void persist(Path p, byte[] data) {
+      storage().put(ObjectUtils.pathToKey(p), data);
+    }
+  }
+}

+ 118 - 0
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/ObjectTestUtils.java

@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Assert;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+import java.util.Objects;
+
+public class ObjectTestUtils {
+
+  public static final byte[] EMPTY_BYTES = new byte[]{};
+
+  private ObjectTestUtils() {
+  }
+
+  /**
+   * Assert that all parent directories of the given path exist.
+   *
+   * @param path the path to validate; may be a directory or a file.
+   */
+  public static void assertParentDirExist(Path path) throws IOException {
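+    // Walk up to (but excluding) the root: each ancestor must exist as an empty object.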
+    for (Path p = path.getParent(); p != null && p.getParent() != null; p = p.getParent()) {
+      assertObject(p, EMPTY_BYTES, true);
+    }
+  }
+
+  /**
+   * Assert that the given directory and all of its parent directories exist.
+   *
+   * @param path the path to validate; must be a directory.
+   */
+  public static void assertDirExist(Path path) throws IOException {
+    // All parent directories exist.
+    assertParentDirExist(path);
+    // The current directory exists.
+    assertObject(path, EMPTY_BYTES, true);
+  }
+
+  public static void assertObjectNotExist(Path path) throws IOException {
+    assertObjectNotExist(path, false);
+  }
+
+  public static void assertObjectNotExist(Path path, boolean isDir) throws IOException {
+    ObjectStorage store = ObjectStorageFactory.create(
+        path.toUri().getScheme(), path.toUri().getHost(), new Configuration());
+    String objectKey = ObjectUtils.pathToKey(path, isDir);
+    ObjectInfo info = store.head(objectKey);
+    Assert.assertNull(String.format("Object key %s shouldn't exist in backend storage.", objectKey), info);
+
+    store.close();
+  }
+
+  public static void assertObject(Path path, byte[] data) throws IOException {
+    assertObject(path, data, false);
+  }
+
+  public static void assertObject(Path path, byte[] data, boolean isDir) throws IOException {
+    ObjectStorage store = ObjectStorageFactory.create(
+        path.toUri().getScheme(), path.toUri().getHost(), new Configuration());
+    String objectKey = ObjectUtils.pathToKey(path, isDir);
+    // Verify the existence of object.
+    ObjectInfo info = store.head(objectKey);
+    Assert.assertNotNull(String.format("there should be an key %s in object storage", objectKey), info);
+    Assert.assertEquals(info.key(), objectKey);
+    Assert.assertEquals(data.length, info.size());
+    // Verify the data content.
+    try (InputStream in = store.get(objectKey, 0, -1).stream()) {
+      byte[] actual = IOUtils.toByteArray(in);
+      Assert.assertArrayEquals("Unexpected binary", data, actual);
+    }
+
+    store.close();
+  }
+
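+  // Asserts that the path has at least one in-flight multipart upload and that one of
+  // them carries the expected upload id.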
+  public static void assertMultipartUploadExist(Path path, String uploadId) throws IOException {
+    ObjectStorage store = ObjectStorageFactory.create(
+        path.toUri().getScheme(), path.toUri().getHost(), new Configuration());
+    String objectKey = ObjectUtils.pathToKey(path, false);
+
+    Iterator<MultipartUpload> uploadIterator = store.listUploads(objectKey).iterator();
+    Assert.assertTrue(uploadIterator.hasNext());
+    assertMultipartUploadIdExist(uploadIterator, uploadId);
+
+    store.close();
+  }
+
+  private static void assertMultipartUploadIdExist(Iterator<MultipartUpload> uploadIterator, String uploadId) {
+    boolean exist = false;
+    while (uploadIterator.hasNext()) {
+      if (Objects.equals(uploadIterator.next().uploadId(), uploadId)) {
+        exist = true;
+        break;
+      }
+    }
+    Assert.assertTrue(exist);
+  }
+}

+ 407 - 0
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/object/TestObjectOutputStream.java

@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.tosfs.common.ThreadPools;
+import org.apache.hadoop.fs.tosfs.conf.ConfKeys;
+import org.apache.hadoop.fs.tosfs.object.staging.StagingPart;
+import org.apache.hadoop.fs.tosfs.object.staging.State;
+import org.apache.hadoop.fs.tosfs.util.FSUtils;
+import org.apache.hadoop.fs.tosfs.util.TempFiles;
+import org.apache.hadoop.fs.tosfs.util.TestUtility;
+import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class TestObjectOutputStream extends ObjectStorageTestBase {
+
+  private static ExecutorService threadPool;
+
+  @BeforeClass
+  public static void beforeClass() {
+    threadPool = ThreadPools.newWorkerPool("TestObjectOutputStream-pool");
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    if (!threadPool.isShutdown()) {
+      threadPool.shutdown();
+    }
+  }
+
+  @Test
+  public void testMkStagingDir() throws ExecutionException, InterruptedException, IOException {
+    try (TempFiles tmp = TempFiles.of()) {
+      List<String> tmpDirs = Lists.newArrayList();
+      for (int i = 0; i < 3; i++) {
+        tmpDirs.add(tmp.newDir());
+      }
+      Configuration newConf = new Configuration(protonConf);
+      newConf.set(ConfKeys.MULTIPART_STAGING_DIR.format("filestore"), Joiner.on(",").join(tmpDirs));
+
+      // Open streams from multiple threads concurrently; each stream creates the staging dirs.
+      List<Future<ObjectOutputStream>> futures = Collections.synchronizedList(new ArrayList<>());
+      for (int i = 0; i < 10; i++) {
+        futures.add(threadPool.submit(() ->
+            new ObjectOutputStream(storage, threadPool, newConf, path("none.txt"), true)));
+      }
+      for (Future<ObjectOutputStream> f : futures) {
+        f.get().close();
+      }
+    }
+  }
+
+  @Test
+  public void testWriteZeroByte() throws IOException {
+    Path zeroByteTxt = path("zero-byte.txt");
+    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, zeroByteTxt, true);
+    // write zero-byte and close.
+    out.write(new byte[0], 0, 0);
+    out.close();
+    assertStagingPart(0, out.stagingParts());
+
+    // Read and validate the dest object contents
+    ObjectTestUtils.assertObject(zeroByteTxt, ObjectTestUtils.EMPTY_BYTES);
+  }
+
+  @Test
+  public void testWriteZeroByteWithoutAllowPut() throws IOException {
+    Path zeroByteTxt = path("zero-byte-without-allow-put.txt");
+    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, zeroByteTxt, false);
+    // write zero-byte and close.
+    out.close();
+    assertStagingPart(0, out.stagingParts());
+
+    // Read and validate the dest object content.
+    ObjectTestUtils.assertObject(zeroByteTxt, ObjectTestUtils.EMPTY_BYTES);
+  }
+
+  @Test
+  public void testDeleteStagingFileWhenUploadPartsOK() throws IOException {
+    Path path = path("data.txt");
+    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, path, true);
+    byte[] data = TestUtility.rand((int) (ConfKeys.MULTIPART_SIZE_DEFAULT * 2));
+    out.write(data);
+    out.waitForPartsUpload();
+    for (StagingPart part : out.stagingParts()) {
+      Assert.assertEquals(State.CLEANED, part.state());
+    }
+    out.close();
+    for (StagingPart part : out.stagingParts()) {
+      Assert.assertEquals(State.CLEANED, part.state());
+    }
+  }
+
+  @Test
+  public void testDeleteStagingFileWithClose() throws IOException {
+    Path path = path("data.txt");
+    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, path, true);
+    byte[] data = TestUtility.rand((int) (ConfKeys.MULTIPART_SIZE_DEFAULT * 2));
+    out.write(data);
+    out.close();
+    for (StagingPart part : out.stagingParts()) {
+      Assert.assertEquals(State.CLEANED, part.state());
+    }
+  }
+
+  @Test
+  public void testDeleteSimplePutStagingFile() throws IOException {
+    Path smallTxt = path("small.txt");
+    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, smallTxt, true);
+    byte[] data = TestUtility.rand(4 << 20);
+    out.write(data);
+    for (StagingPart part : out.stagingParts()) {
+      Assert.assertTrue(part.size() > 0);
+    }
+    out.close();
+    for (StagingPart part : out.stagingParts()) {
+      Assert.assertEquals(State.CLEANED, part.state());
+    }
+  }
+
+  @Test
+  public void testSimplePut() throws IOException {
+    Path smallTxt = path("small.txt");
+    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, smallTxt, true);
+    byte[] data = TestUtility.rand(4 << 20);
+    out.write(data);
+    out.close();
+    assertStagingPart(1, out.stagingParts());
+    assertNull("Should use the simple PUT to upload object for small file.", out.upload());
+
+    // Read and validate the dest object content.
+    ObjectTestUtils.assertObject(smallTxt, data);
+  }
+
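+  // Writes `len` random bytes with the given multipart part size, then verifies the
+  // staged part count, the final object content, and that no upload is left in flight.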
+  public void testWrite(int uploadPartSize, int len) throws IOException {
+    Configuration newConf = new Configuration(protonConf);
+    newConf.setLong(ConfKeys.MULTIPART_SIZE.format(FSUtils.scheme(conf, testDir.toUri())),
+        uploadPartSize);
+
+    Path outPath = path(len + ".txt");
+    int partNum = (len - 1) / uploadPartSize + 1;
+
+    byte[] data = TestUtility.rand(len);
+    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, newConf, outPath, true);
+    try {
+      out.write(data);
+    } finally {
+      out.close();
+    }
+
+    assertStagingPart(partNum, out.stagingParts());
+    ObjectTestUtils.assertObject(outPath, data);
+
+    // List multipart uploads
+    int uploadsNum = 0;
+    for (MultipartUpload ignored : storage.listUploads(out.destKey())) {
+      uploadsNum += 1;
+    }
+    Assert.assertEquals(0L, uploadsNum);
+  }
+
+  @Test
+  public void testParallelWriteOneOutputStream() throws IOException, ExecutionException, InterruptedException {
+    testParallelWriteOneOutputStreamImpl(5 << 20, 10, 128);
+    testParallelWriteOneOutputStreamImpl(5 << 20, 10, 1 << 20);
+    testParallelWriteOneOutputStreamImpl(5 << 20, 10, 2 << 20);
+    testParallelWriteOneOutputStreamImpl(5 << 20, 10, 6 << 20);
+  }
+
+  public void testParallelWriteOneOutputStreamImpl(int partSize, int epochs, int batchSize)
+      throws IOException, ExecutionException, InterruptedException {
+    Configuration newConf = new Configuration(protonConf);
+    newConf.setLong(ConfKeys.MULTIPART_SIZE.format(FSUtils.scheme(conf, testDir.toUri())),
+        partSize);
+
+    String file = String.format("%d-%d-%d-testParallelWriteOneOutputStream.txt", partSize, epochs, batchSize);
+    Path outPath = path(file);
+    try (ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, newConf, outPath, true)) {
+      List<Future<?>> futures = new ArrayList<>();
+      for (int i = 0; i < epochs; i++) {
+        final int index = i;
+        futures.add(threadPool.submit(() -> {
+          try {
+            out.write(dataset(batchSize, index));
+          } catch (IOException e) {
+            throw new RuntimeException(e);
+          }
+        }));
+      }
+
+      // wait for all tasks finished
+      for (Future<?> future : futures) {
+        future.get();
+      }
+    }
+
+    try (InputStream inputStream = storage.get(ObjectUtils.pathToKey(outPath)).stream()) {
+      List<byte[]> ret = new ArrayList<>();
+      byte[] data = new byte[batchSize];
+      while (inputStream.read(data) != -1) {
+        ret.add(data);
+        data = new byte[batchSize];
+      }
+
+      assertEquals(epochs, ret.size());
+      List<byte[]> sortedRet = ret.stream()
+          .sorted(Comparator.comparingInt(o -> o[0]))
+          .collect(Collectors.toList());
+
+      int j = 0;
+      for (byte[] e : sortedRet) {
+        assertArrayEquals(dataset(batchSize, j), e);
+        j++;
+      }
+    }
+  }
+
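+  // Fills a `len`-byte array with the value `base` so that each concurrent writer
+  // produces a block that can be identified regardless of arrival order.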
+  public static byte[] dataset(int len, int base) {
+    byte[] dataset = new byte[len];
+    for (int i = 0; i < len; i++) {
+      dataset[i] = (byte) base;
+    }
+    return dataset;
+  }
+
+  @Test
+  public void testWrite1MB() throws IOException {
+    testWrite(5 << 20, 1 << 20);
+    testWrite(8 << 20, 1 << 20);
+    testWrite(16 << 20, 1 << 20);
+  }
+
+  @Test
+  public void testWrite24MB() throws IOException {
+    testWrite(5 << 20, 24 << 20);
+    testWrite(8 << 20, 24 << 20);
+    testWrite(16 << 20, 24 << 20);
+  }
+
+  @Test
+  public void testWrite100MB() throws IOException {
+    testWrite(5 << 20, 100 << 20);
+    testWrite(8 << 20, 100 << 20);
+    testWrite(16 << 20, 100 << 20);
+  }
+
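+  // Writes dataSize bytes in 1MB steps and checks that the stream stays on the simple
+  // PUT path until multipartThreshold bytes are written, then switches to an MPU.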
+  private void testMultipartThreshold(int partSize, int multipartThreshold, int dataSize) throws IOException {
+    Configuration newConf = new Configuration(protonConf);
+    newConf.setLong(ConfKeys.MULTIPART_SIZE.format(scheme), partSize);
+    newConf.setLong(ConfKeys.MULTIPART_THRESHOLD.format(scheme), multipartThreshold);
+    Path outPath = path(String.format("threshold-%d-%d-%d.txt", partSize, multipartThreshold, dataSize));
+
+    byte[] data = TestUtility.rand(dataSize);
+    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, newConf, outPath, true);
+    try {
+      // Verify after every 1MB write, until reaching the multipart threshold.
+      int upperLimit = Math.min(multipartThreshold, dataSize);
+      int curOff = 0;
+      for (; curOff < upperLimit; curOff += (1 << 20)) {
+        int end = Math.min(curOff + (1 << 20), upperLimit);
+        out.write(Arrays.copyOfRange(data, curOff, end));
+
+        List<MultipartUpload> uploads = Lists.newArrayList(storage.listUploads(out.destKey()));
+        if (end < multipartThreshold) {
+          Assert.assertEquals("Shouldn't has any uploads because it just use simple PUT", 0, uploads.size());
+        } else {
+          Assert.assertEquals("Switch to use MPU.", 1, uploads.size());
+        }
+        assertEquals((end - 1) / partSize + 1, out.stagingParts().size());
+      }
+
+      // Verify after every 1MB write, until reaching the data size.
+      for (; curOff < dataSize; curOff += (1 << 20)) {
+        int end = Math.min(curOff + (1 << 20), dataSize);
+        out.write(Arrays.copyOfRange(data, curOff, end));
+
+        List<MultipartUpload> uploads = Lists.newArrayList(storage.listUploads(out.destKey()));
+        Assert.assertEquals(1, uploads.size());
+        assertEquals(out.destKey(), uploads.get(0).key());
+        assertEquals((end - 1) / partSize + 1, out.stagingParts().size());
+      }
+    } finally {
+      out.close();
+    }
+
+    assertStagingPart((dataSize - 1) / partSize + 1, out.stagingParts());
+    ObjectTestUtils.assertObject(outPath, data);
+
+    List<MultipartUpload> uploads = Lists.newArrayList(storage.listUploads(out.destKey()));
+    Assert.assertEquals(0, uploads.size());
+  }
+
+  @Test
+  public void testMultipartThreshold2MB() throws IOException {
+    testMultipartThreshold(5 << 20, 2 << 20, 1 << 20);
+    testMultipartThreshold(5 << 20, 2 << 20, (2 << 20) - 1);
+    testMultipartThreshold(5 << 20, 2 << 20, 2 << 20);
+    testMultipartThreshold(5 << 20, 2 << 20, 4 << 20);
+    testMultipartThreshold(5 << 20, 2 << 20, 5 << 20);
+    testMultipartThreshold(5 << 20, 2 << 20, (5 << 20) + 1);
+    testMultipartThreshold(5 << 20, 2 << 20, 6 << 20);
+    testMultipartThreshold(5 << 20, 2 << 20, 10 << 20);
+    testMultipartThreshold(5 << 20, 2 << 20, 20 << 20);
+  }
+
+  @Test
+  public void testMultipartThreshold5MB() throws IOException {
+    testMultipartThreshold(5 << 20, 5 << 20, 1 << 20);
+    testMultipartThreshold(5 << 20, 5 << 20, 4 << 20);
+    testMultipartThreshold(5 << 20, 5 << 20, 5 << 20);
+    testMultipartThreshold(5 << 20, 5 << 20, 5 << 20);
+    testMultipartThreshold(5 << 20, 5 << 20, 6 << 20);
+    testMultipartThreshold(5 << 20, 5 << 20, 10 << 20);
+    testMultipartThreshold(5 << 20, 5 << 20, 20 << 20);
+  }
+
+  @Test
+  public void testMultipartThreshold10MB() throws IOException {
+    testMultipartThreshold(5 << 20, 10 << 20, 1 << 20);
+    testMultipartThreshold(5 << 20, 10 << 20, 10 << 20);
+    testMultipartThreshold(5 << 20, 10 << 20, 11 << 20);
+    testMultipartThreshold(5 << 20, 10 << 20, 15 << 20);
+    testMultipartThreshold(5 << 20, 10 << 20, 20 << 20);
+    testMultipartThreshold(5 << 20, 10 << 20, 40 << 20);
+    testMultipartThreshold(5 << 20, 10 << 20, 30 << 20);
+  }
+
+  @Test
+  public void testCloseStreamTwice() throws IOException {
+    int len = 100;
+    Path outPath = path(len + ".txt");
+    int partNum = 1;
+
+    byte[] data = TestUtility.rand(len);
+    ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, outPath, true);
+    try {
+      out.write(data);
+      out.close();
+    } finally {
+      out.close();
+    }
+
+    assertStagingPart(partNum, out.stagingParts());
+    ObjectTestUtils.assertObject(outPath, data);
+  }
+
+  @Test
+  public void testWriteClosedStream() throws IOException {
+    byte[] data = TestUtility.rand(10);
+    Path outPath = path("testWriteClosedStream.txt");
+    try (ObjectOutputStream out = new ObjectOutputStream(storage, threadPool, protonConf, outPath, true)) {
+      out.close();
+      out.write(data);
+      Assert.fail("Writing to a closed stream should throw IllegalStateException.");
+    } catch (IllegalStateException e) {
+      assertEquals("OutputStream is closed.", e.getMessage());
+    }
+  }
+
+  private static void assertStagingPart(int expectedNum, List<StagingPart> parts) {
+    Assert.assertEquals(expectedNum, parts.size());
+    for (StagingPart part : parts) {
+      Assert.assertTrue(part.size() > 0);
+    }
+  }
+
+  private Path path(String name) {
+    return new Path(testDir, name);
+  }
+}

+ 98 - 0
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/util/TempFiles.java

@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.util;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.List;
+
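+/**
+ * Tracks temporary files and directories created by a test and deletes them
+ * (best effort) when closed.
+ */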
+public class TempFiles implements Closeable {
+  private final List<String> files = Lists.newArrayList();
+  private final List<String> dirs = Lists.newArrayList();
+
+  private TempFiles() {
+  }
+
+  public static TempFiles of() {
+    return new TempFiles();
+  }
+
+  public String newFile() {
+    String p = newTempFile();
+    files.add(p);
+    return p;
+  }
+
+  public String newDir() {
+    return newDir(null);
+  }
+
+  public String newDir(String prefix) {
+    String p = newTempDir(prefix);
+    dirs.add(p);
+    return p;
+  }
+
+  @Override
+  public void close() {
+    files.forEach(file -> CommonUtils.runQuietly(() -> TempFiles.deleteFile(file)));
+    files.clear();
+    dirs.forEach(dir -> CommonUtils.runQuietly(() -> TempFiles.deleteDir(dir)));
+    dirs.clear();
+  }
+
+  public static String newTempFile() {
+    // Use File.separator (not File.pathSeparator) to build a child path under the dir.
+    return String.join(File.separator, newTempDir(), UUIDUtils.random());
+  }
+
+  public static String newTempDir() {
+    return newTempDir(null);
+  }
+
+  public static String newTempDir(String prefix) {
+    try {
+      return Files.createTempDirectory(prefix).toFile().getAbsolutePath();
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  public static void deleteFile(String path) {
+    try {
+      Files.deleteIfExists(Paths.get(path));
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  public static void deleteDir(String path) {
+    try {
+      FileUtils.deleteDirectory(new File(path));
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+}

+ 66 - 0
hadoop-cloud-storage-project/hadoop-tos/src/test/java/org/apache/hadoop/fs/tosfs/util/TestFSUtils.java

@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import static org.apache.hadoop.fs.FileSystem.FS_DEFAULT_NAME_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class TestFSUtils {
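+  // Expected behavior (per the assertions below): normalizeURI completes a URI from
+  // fs.defaultFS (default file:///) only when the URI's scheme is missing or matches
+  // the default filesystem's scheme; a fully qualified URI is returned unchanged.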
+  @Test
+  public void testNormalizeURI() throws URISyntaxException {
+    URI uri = new URI("tos://abc/dir/key");
+    URI normalizeURI = FSUtils.normalizeURI(uri, new Configuration());
+    assertEquals("tos", normalizeURI.getScheme());
+    assertEquals("abc", normalizeURI.getAuthority());
+    assertEquals("abc", normalizeURI.getHost());
+    assertEquals("/dir/key", normalizeURI.getPath());
+
+    uri = new URI("/abc/dir/key");
+    normalizeURI = FSUtils.normalizeURI(uri, new Configuration());
+    assertNull(uri.getScheme());
+    assertEquals("file", normalizeURI.getScheme());
+    assertNull(uri.getAuthority());
+    assertNull(normalizeURI.getAuthority());
+    assertEquals("/abc/dir/key", uri.getPath());
+    assertEquals("/", normalizeURI.getPath());
+
+    uri = new URI("tos:///abc/dir/key");
+    normalizeURI = FSUtils.normalizeURI(uri, new Configuration());
+    assertEquals("tos", uri.getScheme());
+    assertNull(uri.getAuthority());
+    assertEquals("/abc/dir/key", uri.getPath());
+    assertEquals("tos", normalizeURI.getScheme());
+    assertNull(normalizeURI.getAuthority());
+    assertEquals("/abc/dir/key", normalizeURI.getPath());
+
+    Configuration conf = new Configuration();
+    conf.set(FS_DEFAULT_NAME_KEY, "tos://bucket/");
+    normalizeURI = FSUtils.normalizeURI(uri, conf);
+    assertEquals("tos", normalizeURI.getScheme());
+    assertEquals("bucket", normalizeURI.getAuthority());
+    assertEquals("/", normalizeURI.getPath());
+  }
+}