
Integration of TOS: Add ObjectOutputStream, MagicOutputStream.

lijinglun 7 months ago
parent commit
30ba5c87e8

+ 146 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/commit/MagicOutputStream.java

@@ -0,0 +1,146 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.commit;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.tosfs.object.ObjectOutputStream;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorage;
+import org.apache.hadoop.fs.tosfs.object.ObjectStorageFactory;
+import org.apache.hadoop.fs.tosfs.object.ObjectUtils;
+import org.apache.hadoop.fs.tosfs.object.Part;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+public class MagicOutputStream extends ObjectOutputStream {
+
+  private final FileSystem fs;
+  private final Path pendingPath;
+  private boolean closeStorage = false;
+
+  public MagicOutputStream(FileSystem fs, ExecutorService threadPool, Configuration conf,
+      Path magic) {
+    this(fs,
+        ObjectStorageFactory.create(magic.toUri().getScheme(), magic.toUri().getHost(), conf),
+        threadPool,
+        conf,
+        magic);
+    closeStorage = true;
+  }
+
+  public MagicOutputStream(FileSystem fs, ObjectStorage storage, ExecutorService threadPool,
+      Configuration conf, Path magic) {
+    super(storage, threadPool, conf, magic, false);
+    this.fs = fs;
+    this.pendingPath = createPendingPath(magic);
+  }
+
+  static String toDestKey(Path magicPath) {
+    Preconditions.checkArgument(isMagic(magicPath), "Destination path is not magic %s", magicPath);
+    String magicKey = ObjectUtils.pathToKey(magicPath);
+    List<String> splits = Lists.newArrayList(magicKey.split("/"));
+
+    // Break the full splits list into three collections: <parentSplits>, __magic, <childrenSplits>
+    int magicIndex = splits.indexOf(CommitUtils.MAGIC);
+    Preconditions.checkArgument(magicIndex >= 0, "Cannot locate %s in path %s", CommitUtils.MAGIC, magicPath);
+    List<String> parentSplits = splits.subList(0, magicIndex);
+    List<String> childrenSplits = splits.subList(magicIndex + 1, splits.size());
+    Preconditions.checkArgument(!childrenSplits.isEmpty(),
+        "No path found under %s for path %s", CommitUtils.MAGIC, magicPath);
+
+    // Generate the destination splits which will be joined into the destination object key.
+    List<String> destSplits = Lists.newArrayList(parentSplits);
+    if (childrenSplits.contains(CommitUtils.BASE)) {
+      // Break the <childrenSplits> into three collections: <baseParentSplits>, __base, <baseChildrenSplits>,
+      // and add all <baseChildrenSplits> into the destination splits.
+      int baseIndex = childrenSplits.indexOf(CommitUtils.BASE);
+      Preconditions.checkArgument(baseIndex >= 0, "Cannot locate %s in path %s", CommitUtils.BASE, magicPath);
+      List<String> baseChildrenSplits = childrenSplits.subList(baseIndex + 1, childrenSplits.size());
+      Preconditions.checkArgument(!baseChildrenSplits.isEmpty(),
+          "No path found under %s for magic path %s", CommitUtils.BASE, magicPath);
+      destSplits.addAll(baseChildrenSplits);
+    } else {
+      // Just add the last element (the file name) of the <childrenSplits> into the destination splits.
+      String filename = childrenSplits.get(childrenSplits.size() - 1);
+      destSplits.add(filename);
+    }
+
+    return StringUtils.join(destSplits, "/");
+  }
+
+  @Override
+  protected String createDestKey(Path magicPath) {
+    return toDestKey(magicPath);
+  }
+
+  @Override
+  protected void finishUpload(String destKey, String uploadId, List<Part> parts)
+      throws IOException {
+    Pending pending = Pending.builder()
+        .setBucket(storage().bucket().name())
+        .setUploadId(uploadId)
+        .setLength(parts.stream().mapToLong(Part::size).sum())
+        .setDestKey(destKey)
+        .setCreatedTimestamp(System.currentTimeMillis())
+        .addParts(parts)
+        .build();
+
+    persist(pendingPath, pending.serialize());
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    super.close();
+    if (closeStorage) {
+      storage().close();
+    }
+  }
+
+  protected void persist(Path p, byte[] data) throws IOException {
+    CommitUtils.save(fs, p, data);
+  }
+
+  public String pendingKey() {
+    return ObjectUtils.pathToKey(pendingPath);
+  }
+
+  private static Path createPendingPath(Path magic) {
+    return new Path(magic.getParent(), String.format("%s%s", magic.getName(), CommitUtils.PENDING_SUFFIX));
+  }
+
+  // .pending and .pendingset files are internal commit files rather than regular magic files.
+  private static boolean isInternalFile(Path p) {
+    return p.toString().endsWith(CommitUtils.PENDINGSET_SUFFIX) || p.toString().endsWith(CommitUtils.PENDING_SUFFIX);
+  }
+
+  public static boolean isMagic(Path p) {
+    Preconditions.checkNotNull(p, "path cannot be null.");
+    String path = p.toUri().getPath();
+    List<String> splits = Arrays.stream(path.split("/"))
+        .filter(StringUtils::isNoneEmpty)
+        .collect(Collectors.toList());
+    return splits.contains(CommitUtils.MAGIC) && !isInternalFile(p);
+  }
+}
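
For reference, a quick sketch (not part of this commit) of how toDestKey is expected to rewrite magic paths, assuming CommitUtils.MAGIC is "__magic", CommitUtils.BASE is "__base", and ObjectUtils.pathToKey strips the scheme and bucket from the path:

    // Hypothetical inputs: the __magic segment and the task-attempt directories are dropped.
    Path withBase = new Path("tos://bucket/warehouse/table/__magic/job-1/task-0/__base/dt=2022/part-00000");
    MagicOutputStream.toDestKey(withBase);    // -> "warehouse/table/dt=2022/part-00000"

    // Without a __base segment, only the final file name is kept under the parent of __magic.
    Path withoutBase = new Path("tos://bucket/warehouse/table/__magic/job-1/task-0/part-00000");
    MagicOutputStream.toDestKey(withoutBase); // -> "warehouse/table/part-00000"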

+ 39 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/conf/ConfKeys.java

@@ -18,6 +18,10 @@
 
 package org.apache.hadoop.fs.tosfs.conf;
 
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+
+import java.util.List;
+
 public class ConfKeys {
 
   /**
@@ -61,6 +65,30 @@ public class ConfKeys {
   public static final String MULTIPART_COPY_THRESHOLD = "fs.tos.multipart.copy-threshold";
   public static final long MULTIPART_COPY_THRESHOLD_DEFAULT = 5L << 20;
 
+  /**
+   * The threshold that controls whether to enable multipart upload when writing data to the given
+   * object storage. If the total written data size is less than the threshold, the data is written
+   * via a simple PUT instead of a multipart upload. E.g. fs.tos.multipart.threshold.
+   */
+  public static final String MULTIPART_THRESHOLD = "fs.tos.multipart.threshold";
+  public static final long MULTIPART_THRESHOLD_DEFAULT = 10 << 20;
+
+  /**
+   * The maximum number of bytes used to buffer the staging data in memory before flushing it to the
+   * staging file. It dramatically reduces random writes on the local staging disk when writing plenty
+   * of small files.
+   */
+  public static final String MULTIPART_STAGING_BUFFER_SIZE = "fs.tos.multipart.staging-buffer-size";
+  public static final int MULTIPART_STAGING_BUFFER_SIZE_DEFAULT = 4 << 10;
+
+  /**
+   * The staging dir(s) for multipart upload parts of the given object storage.
+   * e.g. fs.tos.multipart.staging-dir.
+   * Separate multiple staging dirs with commas.
+   */
+  public static final String MULTIPART_STAGING_DIR = "fs.tos.multipart.staging-dir";
+  public static final String MULTIPART_STAGING_DIR_DEFAULT = defaultDir("multipart-staging-dir");
+
   /**
    * The batch size of deleting multiple objects per request for the given object storage.
    * e.g. fs.tos.delete.batch-size
@@ -90,4 +118,15 @@ public class ConfKeys {
    */
   public static final String OBJECT_STREAM_RANGE_SIZE = "proton.objectstorage.stream.range-size";
   public static final long OBJECT_STREAM_RANGE_SIZE_DEFAULT = Long.MAX_VALUE;
+
+  public static String defaultDir(String basename) {
+    String tmpdir = System.getProperty("java.io.tmpdir");
+    Preconditions.checkNotNull(tmpdir, "System property 'java.io.tmpdir' cannot be null");
+
+    if (tmpdir.endsWith("/")) {
+      return String.format("%s%s", tmpdir, basename);
+    } else {
+      return String.format("%s/%s", tmpdir, basename);
+    }
+  }
 }
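
As a rough illustration (not part of the patch), the new keys are meant to be tuned together in the job or cluster configuration; the values below are made-up examples rather than recommended settings:

    Configuration conf = new Configuration();
    // Switch from a simple PUT to a multipart upload once 16 MB have been written (default: 10 MB).
    conf.setLong("fs.tos.multipart.threshold", 16L << 20);
    // Buffer up to 64 KB of staging data in memory before spilling to a staging file (default: 4 KB).
    conf.setInt("fs.tos.multipart.staging-buffer-size", 64 << 10);
    // Spread staging files over two local disks; multiple dirs are comma-separated.
    conf.set("fs.tos.multipart.staging-dir", "/mnt/disk1/tos-staging,/mnt/disk2/tos-staging");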

+ 331 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/ObjectOutputStream.java

@@ -0,0 +1,331 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.tosfs.conf.ConfKeys;
+import org.apache.hadoop.fs.tosfs.object.staging.FileStagingPart;
+import org.apache.hadoop.fs.tosfs.object.staging.StagingPart;
+import org.apache.hadoop.fs.tosfs.util.CommonUtils;
+import org.apache.hadoop.fs.tosfs.util.UUIDUtils;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.SequenceInputStream;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+public class ObjectOutputStream extends OutputStream {
+  private static final Logger LOG = LoggerFactory.getLogger(ObjectOutputStream.class);
+
+  private final ObjectStorage storage;
+  private final ExecutorService uploadPool;
+  private long totalWroteSize;
+  private final String destKey;
+  private final String destScheme;
+  private final long multiUploadThreshold;
+  private final long byteSizePerPart;
+  private final int stagingBufferSize;
+  private final boolean allowPut;
+  private final List<File> stagingDirs;
+  private final List<StagingPart> stagingParts = Lists.newArrayList();
+
+  // For multipart uploads.
+  private final AtomicInteger partNumGetter = new AtomicInteger(0);
+  private MultipartUpload multipartUpload = null;
+  private final List<CompletableFuture<Part>> results = Lists.newArrayList();
+
+  private StagingPart curPart;
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  public ObjectOutputStream(ObjectStorage storage, ExecutorService threadPool, Configuration conf,
+      Path dest, boolean allowPut) {
+    this.storage = storage;
+    this.uploadPool = threadPool;
+    this.destScheme = dest.toUri().getScheme();
+    this.totalWroteSize = 0;
+    this.destKey = createDestKey(dest);
+    this.multiUploadThreshold =
+        conf.getLong(ConfKeys.MULTIPART_THRESHOLD, ConfKeys.MULTIPART_THRESHOLD_DEFAULT);
+    this.byteSizePerPart = conf.getLong(ConfKeys.MULTIPART_SIZE, ConfKeys.MULTIPART_SIZE_DEFAULT);
+    this.stagingBufferSize = conf.getInt(ConfKeys.MULTIPART_STAGING_BUFFER_SIZE,
+        ConfKeys.MULTIPART_STAGING_BUFFER_SIZE_DEFAULT);
+    this.allowPut = allowPut;
+    this.stagingDirs = createStagingDirs(conf, destScheme);
+
+    if (!allowPut) {
+      this.multipartUpload = storage.createMultipartUpload(destKey);
+    }
+  }
+
+  private static List<File> createStagingDirs(Configuration conf, String scheme) {
+    String[] dirs =
+        conf.getStrings(ConfKeys.MULTIPART_STAGING_DIR, ConfKeys.MULTIPART_STAGING_DIR_DEFAULT);
+    Preconditions.checkArgument(dirs != null && dirs.length > 0, "'%s' cannot be an empty list",
+        ConfKeys.MULTIPART_STAGING_DIR);
+
+    List<File> stagingDirs = new ArrayList<>();
+    for (String dir : dirs) {
+      // Create the directory if it does not exist.
+      File stagingDir = new File(dir);
+      if (!stagingDir.exists() && stagingDir.mkdirs()) {
+        Preconditions.checkArgument(stagingDir.setWritable(true, false),
+            "Failed to change staging dir permission to writable, please check %s with value %s", ConfKeys.MULTIPART_STAGING_DIR, dir);
+        Preconditions.checkArgument(stagingDir.setReadable(true, false),
+            "Failed to change staging dir permission to readable, please check %s with value %s", ConfKeys.MULTIPART_STAGING_DIR, dir);
+      } else {
+        Preconditions.checkArgument(stagingDir.exists(),
+            "Failed to create staging dir, please check %s with value %s", ConfKeys.MULTIPART_STAGING_DIR, dir);
+        Preconditions.checkArgument(stagingDir.isDirectory(),
+            "Staging dir should be a directory, please check %s with value %s", ConfKeys.MULTIPART_STAGING_DIR, dir);
+      }
+      stagingDirs.add(stagingDir);
+    }
+    return stagingDirs;
+  }
+
+  private File chooseStagingDir() {
+    // Choose a random directory from the staging dirs as the candidate staging dir.
+    return stagingDirs.get(ThreadLocalRandom.current().nextInt(stagingDirs.size()));
+  }
+
+  @Override
+  public void write(int b) throws IOException {
+    write(new byte[]{(byte) b}, 0, 1);
+  }
+
+  protected String createDestKey(Path dest) {
+    return ObjectUtils.pathToKey(dest);
+  }
+
+  @Override
+  public synchronized void write(byte[] buf, int off, int len) throws IOException {
+    if (len == 0) {
+      return;
+    }
+    Preconditions.checkArgument(off >= 0 && off < buf.length,
+        "Invalid offset - off: %s, len: %s, bufferSize: %s", off, len, buf.length);
+    Preconditions.checkArgument(len >= 0 && off + len <= buf.length,
+        "Invalid length - off: %s, len: %s, bufferSize: %s", off, len, buf.length);
+    Preconditions.checkState(!closed.get(), "OutputStream is closed.");
+
+    while (len > 0) {
+      if (curPart == null) {
+        curPart = newStagingPart();
+      }
+
+      Preconditions.checkArgument(curPart.size() <= byteSizePerPart,
+          "Invalid staging size (%s) which is greater than part size (%s)", curPart.size(), byteSizePerPart);
+
+      // size is the remaining length to fill a complete upload part.
+      int size = (int) Math.min(byteSizePerPart - curPart.size(), len);
+      curPart.write(buf, off, size);
+
+      off += size;
+      len -= size;
+      totalWroteSize += size;
+
+      // Switch to the next staging part if current staging part is full.
+      if (curPart.size() >= byteSizePerPart) {
+        curPart.complete();
+
+        // Upload this part if multipart upload was triggered.
+        if (multipartUpload != null) {
+          CompletableFuture<Part> result = asyncUploadPart(curPart, partNumGetter.incrementAndGet());
+          results.add(result);
+        }
+
+      // Reset the current staging part.
+        curPart = null;
+      }
+
+      // Trigger the multipart upload once the written bytes reach the configured threshold.
+      if (multipartUpload == null && totalWroteSize >= multiUploadThreshold) {
+        multipartUpload = storage.createMultipartUpload(destKey);
+        Preconditions.checkState(byteSizePerPart >= multipartUpload.minPartSize(),
+            "Configured upload part size %s must be greater than or equal to the minimal part size %s,"
+                + " please check the configured key %s.",
+            byteSizePerPart, multipartUpload.minPartSize(), ConfKeys.MULTIPART_SIZE);
+
+        // Upload the accumulated staging files whose length >= byteSizePerPart.
+        for (StagingPart stagingPart : stagingParts) {
+          if (stagingPart.size() >= byteSizePerPart) {
+            CompletableFuture<Part> result = asyncUploadPart(stagingPart, partNumGetter.incrementAndGet());
+            results.add(result);
+          }
+        }
+      }
+    }
+  }
+
+  private CompletableFuture<Part> asyncUploadPart(final StagingPart stagingPart, final int partNum) {
+    final MultipartUpload immutableUpload = multipartUpload;
+    return CompletableFuture.supplyAsync(() -> uploadPart(stagingPart, partNum), uploadPool)
+        .whenComplete((part, err) -> {
+          stagingPart.cleanup();
+          if (err != null) {
+            LOG.error("Failed to upload part, multipartUpload: {}, partNum: {}, stagingPart: {}",
+                immutableUpload, partNum, stagingPart, err);
+          }
+        });
+  }
+
+  private CompletableFuture<Part> asyncUploadEmptyPart(final int partNum) {
+    final MultipartUpload immutableUpload = multipartUpload;
+    return CompletableFuture.supplyAsync(
+            () -> storage.uploadPart(
+                immutableUpload.key(),
+                immutableUpload.uploadId(),
+                partNum,
+                () -> new ByteArrayInputStream(new byte[0]),
+                0),
+            uploadPool)
+        .whenComplete((part, err) -> {
+          if (err != null) {
+            LOG.error("Failed to upload empty part, multipartUpload: {}, partNum: {}",
+                immutableUpload, partNum, err);
+          }
+        });
+  }
+
+  private Part uploadPart(StagingPart stagingPart, int partNum) {
+    Preconditions.checkNotNull(storage, "Object storage cannot be null.");
+    Preconditions.checkNotNull(multipartUpload, "Multipart upload is not initialized.");
+    return storage.uploadPart(multipartUpload.key(), multipartUpload.uploadId(),
+        partNum, stagingPart::newIn, stagingPart.size());
+  }
+
+  protected void finishUpload(String key, String uploadId, List<Part> parts) throws IOException {
+    storage.completeUpload(key, uploadId, parts);
+  }
+
+  private void simplePut() throws IOException {
+    if (curPart != null) {
+      curPart.complete();
+    }
+    storage.put(
+        destKey,
+        () -> stagingParts()
+            .stream()
+            .map(StagingPart::newIn)
+            .reduce(SequenceInputStream::new)
+            .orElseGet(() -> new ByteArrayInputStream(new byte[0])),
+        stagingParts().stream().mapToLong(StagingPart::size).sum());
+    // Reset the staging output stream.
+    curPart = null;
+  }
+
+  synchronized List<Part> waitForPartsUpload() {
+    Preconditions.checkArgument(multipartUpload != null, "Multipart upload cannot be null");
+    Preconditions.checkArgument(!results.isEmpty(), "Upload parts cannot be empty");
+    // Wait for all the upload parts to finish.
+    return results.stream()
+        .map(CompletableFuture::join)
+        .sorted(Comparator.comparing(Part::num))
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (!closed.compareAndSet(false, true)) {
+      return;
+    }
+
+    try {
+      // Use the simple PUT API if the written bytes have not reached the multipart threshold.
+      if (multipartUpload == null && allowPut) {
+        simplePut();
+        return;
+      }
+      Preconditions.checkNotNull(multipartUpload, "MultipartUpload cannot be null since allowPut was disabled.");
+
+      // Use multipart upload API to upload those parts.
+      if (totalWroteSize <= 0) {
+        // Write an empty part for this zero-byte file.
+        CompletableFuture<Part> result = asyncUploadEmptyPart(partNumGetter.incrementAndGet());
+        results.add(result);
+      } else if (curPart != null) {
+        curPart.complete();
+        // Submit the last part to upload thread pool.
+        CompletableFuture<Part> result = asyncUploadPart(curPart, partNumGetter.incrementAndGet());
+        results.add(result);
+        // Reset the staging output stream.
+        curPart = null;
+      }
+
+      // Finish the multipart uploads.
+      finishUpload(multipartUpload.key(), multipartUpload.uploadId(), waitForPartsUpload());
+
+    } catch (Exception e) {
+      LOG.error("Encountering error when closing output stream", e);
+      if (multipartUpload != null) {
+        CommonUtils.runQuietly(() -> storage.abortMultipartUpload(multipartUpload.key(), multipartUpload.uploadId()));
+      }
+      throw e;
+    } finally {
+      // Clean up all the staging parts.
+      deleteStagingPart(stagingParts);
+    }
+  }
+
+  public long totalWroteSize() {
+    return totalWroteSize;
+  }
+
+  public ObjectStorage storage() {
+    return storage;
+  }
+
+  public List<StagingPart> stagingParts() {
+    return stagingParts;
+  }
+
+  public String destKey() {
+    return destKey;
+  }
+
+  public MultipartUpload upload() {
+    return multipartUpload;
+  }
+
+  private void deleteStagingPart(List<StagingPart> parts) {
+    for (StagingPart part : parts) {
+      part.cleanup();
+    }
+  }
+
+  private StagingPart newStagingPart() {
+    String stagingPath = String.format("%s/staging-%s.tmp", chooseStagingDir(), UUIDUtils.random());
+    StagingPart part = new FileStagingPart(stagingPath, stagingBufferSize);
+    stagingParts.add(part);
+    return part;
+  }
+}
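
A minimal usage sketch of the new stream (illustrative only; "my-bucket", the thread pool size and the data are made up): written bytes are staged locally, the stream switches from a simple PUT to a multipart upload once fs.tos.multipart.threshold is crossed, and close() completes the upload or aborts it on failure:

    Configuration conf = new Configuration();
    ObjectStorage storage = ObjectStorageFactory.create("tos", "my-bucket", conf);
    ExecutorService uploadPool = Executors.newFixedThreadPool(4);
    Path dest = new Path("tos://my-bucket/warehouse/table/part-00000");
    byte[] data = "hello tos".getBytes(StandardCharsets.UTF_8);

    try (ObjectOutputStream out = new ObjectOutputStream(storage, uploadPool, conf, dest, true)) {
      // Small files stay below the threshold and are written with a single PUT on close();
      // larger files are uploaded as parts asynchronously on uploadPool while writing continues.
      out.write(data, 0, data.length);
    }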

+ 176 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/staging/FileStagingPart.java

@@ -0,0 +1,176 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.staging;
+
+import org.apache.hadoop.fs.tosfs.util.CommonUtils;
+import org.apache.hadoop.thirdparty.com.google.common.base.MoreObjects;
+import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+
+public class FileStagingPart implements StagingPart {
+  private static final Logger LOG = LoggerFactory.getLogger(FileStagingPart.class);
+
+  private final Path path;
+  private final int stagingBufferSize;
+  private final StagingFileOutputStream out;
+  private State state = State.WRITABLE;
+
+  public FileStagingPart(String filePath, int stagingBufferSize) {
+    this.path = Paths.get(filePath);
+    this.stagingBufferSize = stagingBufferSize;
+    this.out = new StagingFileOutputStream(path, stagingBufferSize);
+  }
+
+  @Override
+  public synchronized void write(byte[] b, int off, int len) throws IOException {
+    Preconditions.checkState(state == State.WRITABLE,
+        "Cannot write the part since it's not writable now, state: %s", state);
+    out.write(b, off, len);
+  }
+
+  @Override
+  public synchronized void complete() throws IOException {
+    Preconditions.checkState(state == State.WRITABLE,
+        "Cannot complete the part since it's not writable now, state: %s", state);
+    out.close();
+    state = State.READABLE;
+  }
+
+  @Override
+  public synchronized InputStream newIn() {
+    Preconditions.checkState(state == State.READABLE,
+        "Cannot read the part since it's not readable now, state: %s.", state);
+    return out.newIn();
+  }
+
+  @Override
+  public synchronized long size() {
+    return out.size();
+  }
+
+  @Override
+  public synchronized State state() {
+    return state;
+  }
+
+  @Override
+  public synchronized void cleanup() {
+    if (state != State.CLEANED) {
+      try {
+        // Close the stream quietly.
+        CommonUtils.runQuietly(out::close, false);
+
+        // Delete the staging file if it exists.
+        Files.deleteIfExists(path);
+      } catch (Exception e) {
+        LOG.error("Failed to delete staging file, stagingFile: {}", path, e);
+      } finally {
+        state = State.CLEANED;
+      }
+    }
+  }
+
+  @Override
+  public String toString() {
+    return MoreObjects.toStringHelper(this)
+        .add("path", path)
+        .add("stagingBufferSize", stagingBufferSize)
+        .add("wroteByteSize", size())
+        .toString();
+  }
+
+  private static class StagingFileOutputStream extends OutputStream {
+    private final Path path;
+    private byte[] buffer;
+    private boolean memBuffered;
+    private int writePos;
+    private OutputStream out;
+
+    private StagingFileOutputStream(Path path, int stagingBufferSize) {
+      this.path = path;
+      this.buffer = new byte[stagingBufferSize];
+      this.memBuffered = true;
+      this.writePos = 0;
+    }
+
+    private int size() {
+      return writePos;
+    }
+
+    public InputStream newIn() {
+      // Just wrap it as a byte array input stream if the staging bytes are still in the in-memory buffer.
+      if (memBuffered) {
+        return new ByteArrayInputStream(buffer, 0, writePos);
+      }
+
+      // Create a buffered file input stream.
+      try {
+        return new BufferedInputStream(Files.newInputStream(path));
+      } catch (IOException e) {
+        throw new UncheckedIOException(e);
+      }
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+      write(new byte[]{(byte) b}, 0, 1);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+      if (memBuffered && writePos + len > buffer.length) {
+        flushMemToFile();
+      }
+
+      if (memBuffered) {
+        System.arraycopy(b, off, buffer, writePos, len);
+      } else {
+        out.write(b, off, len);
+      }
+
+      writePos += len;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (out != null) {
+        out.close();
+        out = null;
+      }
+    }
+
+    private void flushMemToFile() throws IOException {
+      // Flush the buffered data to the new file OutputStream.
+      out = new BufferedOutputStream(Files.newOutputStream(path));
+      out.write(buffer, 0, writePos);
+      memBuffered = false;
+      buffer = null;
+    }
+  }
+}
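
A short sketch of the buffering behaviour (illustrative only; the staging path and byte arrays are made up): writes stay in the in-memory buffer until a write would overflow stagingBufferSize, at which point StagingFileOutputStream spills everything to the backing file and keeps writing there:

    byte[] small = new byte[1 << 10], large = new byte[8 << 10];
    FileStagingPart part = new FileStagingPart("/tmp/staging-demo.tmp", 4 << 10);
    part.write(small, 0, small.length);   // fits in the 4 KB buffer, nothing is written to disk yet
    part.write(large, 0, large.length);   // would overflow the buffer, so everything is flushed to the file
    part.complete();                      // seal the part before reading it back
    try (InputStream in = part.newIn()) { // streams from memory or from the file, wherever the bytes ended up
      // e.g. hand the stream to ObjectStorage.uploadPart(...)
    }
    part.cleanup();                       // idempotent: deletes the staging file and drops the buffer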

+ 79 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/staging/StagingPart.java

@@ -0,0 +1,79 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.staging;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+public interface StagingPart {
+
+  /**
+   * Write bytes into the staging part.
+   *
+   * @param b the buffer to write.
+   * @throws IOException if any IO error.
+   */
+  default void write(byte[] b) throws IOException {
+    write(b, 0, b.length);
+  }
+
+  /**
+   * Write the bytes into the staging part.
+   *
+   * @param b   the buffer to write.
+   * @param off the start offset in buffer.
+   * @param len the length.
+   * @throws IOException if any IO error.
+   */
+  void write(byte[] b, int off, int len) throws IOException;
+
+  /**
+   * Complete the writing process. No more bytes can be written once the part has been completed.
+   *
+   * @throws IOException if any IO error.
+   */
+  void complete() throws IOException;
+
+  /**
+   * The number of bytes written to this staging part.
+   *
+   * @return the staging part size.
+   */
+  long size();
+
+  /**
+   * Access the {@link State} of this part.
+   *
+   * @return the {@link State}.
+   */
+  State state();
+
+  /**
+   * Create a new {@link InputStream} to read the staging part data once writing has been completed
+   * by calling {@link StagingPart#complete()}. Calling this method several times returns separate
+   * {@link InputStream}s; remember to close each newly created stream.
+   *
+   * @return a totally new {@link InputStream}.
+   */
+  InputStream newIn();
+
+  /**
+   * Clean up all the {@link StagingPart}'s resources, such as removing the temporary file and freeing
+   * the buffered data. This method should be idempotent and quiet (without throwing IO errors).
+   */
+  void cleanup();
+}
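
Loosely, an implementation is expected to move through the State values in one direction only; a sketch of that contract (not a test from this patch, and the staging path is made up):

    StagingPart part = new FileStagingPart("/tmp/contract-demo.tmp", 4 << 10);
    assert part.state() == State.WRITABLE;  // a fresh part accepts write(...)
    part.write(new byte[]{1, 2, 3});
    part.complete();                        // after this, no more writes are allowed
    assert part.state() == State.READABLE;  // newIn() may now be called, repeatedly if needed
    part.cleanup();
    part.cleanup();                         // cleanup() is idempotent and must not throw
    assert part.state() == State.CLEANED;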

+ 23 - 0
hadoop-cloud-storage-project/hadoop-tos/src/main/java/org/apache/hadoop/fs/tosfs/object/staging/State.java

@@ -0,0 +1,23 @@
+/*
+ * ByteDance Volcengine EMR, Copyright 2022.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.tosfs.object.staging;
+
+public enum State {
+  WRITABLE,
+  READABLE,
+  CLEANED
+}