HADOOP-18458: AliyunOSSBlockOutputStream to support heap/off-heap buffer before uploading data to OSS (#4912)

Jinhu Wu 2 years ago
parent
commit
b5e8269d9b

+ 131 - 82
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.aliyun.oss;
 
 import com.aliyun.oss.model.PartETag;
+import org.apache.hadoop.fs.aliyun.oss.statistics.BlockOutputStreamStatistics;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService;
@@ -27,17 +28,15 @@ import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedOutputStream;
-import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
 
 /**
  * Asynchronous multi-part based uploading mechanism to support huge file
@@ -49,71 +48,103 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
       LoggerFactory.getLogger(AliyunOSSBlockOutputStream.class);
   private AliyunOSSFileSystemStore store;
   private Configuration conf;
-  private boolean closed;
+  private final AtomicBoolean closed = new AtomicBoolean(false);
   private String key;
-  private File blockFile;
-  private Map<Integer, File> blockFiles = new HashMap<>();
-  private long blockSize;
+  private int blockSize;
   private int blockId = 0;
   private long blockWritten = 0L;
   private String uploadId = null;
   private final List<ListenableFuture<PartETag>> partETagsFutures;
+  private final OSSDataBlocks.BlockFactory blockFactory;
+  private final BlockOutputStreamStatistics statistics;
+  private OSSDataBlocks.DataBlock activeBlock;
   private final ListeningExecutorService executorService;
-  private OutputStream blockStream;
   private final byte[] singleByte = new byte[1];
 
   public AliyunOSSBlockOutputStream(Configuration conf,
       AliyunOSSFileSystemStore store,
       String key,
-      Long blockSize,
+      int blockSize,
+      OSSDataBlocks.BlockFactory blockFactory,
+      BlockOutputStreamStatistics statistics,
       ExecutorService executorService) throws IOException {
     this.store = store;
     this.conf = conf;
     this.key = key;
     this.blockSize = blockSize;
-    this.blockFile = newBlockFile();
-    this.blockStream =
-        new BufferedOutputStream(new FileOutputStream(blockFile));
+    this.blockFactory = blockFactory;
+    this.statistics = statistics;
     this.partETagsFutures = new ArrayList<>(2);
     this.executorService = MoreExecutors.listeningDecorator(executorService);
   }
 
-  private File newBlockFile() throws IOException {
-    return AliyunOSSUtils.createTmpFileForWrite(
-        String.format("oss-block-%04d-", blockId), blockSize, conf);
+  /**
+   * Demand create a destination block.
+   * @return the active block; null if there isn't one.
+   * @throws IOException on any failure to create
+   */
+  private synchronized OSSDataBlocks.DataBlock createBlockIfNeeded()
+      throws IOException {
+    if (activeBlock == null) {
+      blockId++;
+      activeBlock = blockFactory.create(blockId, blockSize, statistics);
+    }
+    return activeBlock;
+  }
+
+  /**
+   * Check for the stream being open.
+   * @throws IOException if the stream is closed.
+   */
+  void checkOpen() throws IOException {
+    if (closed.get()) {
+      throw new IOException("Stream closed.");
+    }
   }
 
+  /**
+   * The flush operation does not trigger an upload; that awaits
+   * the next block being full. What it does do is call {@code flush()}
+   * on the current block, leaving it to choose how to react.
+   * @throws IOException Any IO problem.
+   */
   @Override
   public synchronized void flush() throws IOException {
-    blockStream.flush();
+    checkOpen();
+
+    OSSDataBlocks.DataBlock dataBlock = getActiveBlock();
+    if (dataBlock != null) {
+      dataBlock.flush();
+    }
   }
 
   @Override
   public synchronized void close() throws IOException {
-    if (closed) {
+    if (closed.get()) {
+      // already closed
+      LOG.debug("Ignoring close() as stream is already closed");
       return;
     }
 
-    blockStream.flush();
-    blockStream.close();
-    if (!blockFiles.values().contains(blockFile)) {
-      blockId++;
-      blockFiles.put(blockId, blockFile);
-    }
-
     try {
-      if (blockFiles.size() == 1) {
+      if (uploadId == null) {
         // just upload it directly
-        store.uploadObject(key, blockFile);
+        OSSDataBlocks.DataBlock dataBlock = getActiveBlock();
+        if (dataBlock == null) {
+          // zero size file
+          store.storeEmptyFile(key);
+        } else {
+          OSSDataBlocks.BlockUploadData uploadData = dataBlock.startUpload();
+          if (uploadData.hasFile()) {
+            store.uploadObject(key, uploadData.getFile());
+          } else {
+            store.uploadObject(key,
+                uploadData.getUploadStream(), dataBlock.dataSize());
+          }
+        }
       } else {
         if (blockWritten > 0) {
-          ListenableFuture<PartETag> partETagFuture =
-              executorService.submit(() -> {
-                PartETag partETag = store.uploadPart(blockFile, key, uploadId,
-                    blockId);
-                return partETag;
-              });
-          partETagsFutures.add(partETagFuture);
+          uploadCurrentBlock();
         }
         // wait for the partial uploads to finish
         final List<PartETag> partETags = waitForAllPartUploads();
@@ -124,8 +155,8 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
             new ArrayList<>(partETags));
       }
     } finally {
-      removeTemporaryFiles();
-      closed = true;
+      cleanupWithLogger(LOG, getActiveBlock(), blockFactory);
+      closed.set(true);
     }
   }
 
@@ -138,64 +169,82 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
   @Override
   public synchronized void write(byte[] b, int off, int len)
       throws IOException {
-    if (closed) {
-      throw new IOException("Stream closed.");
-    }
-    blockStream.write(b, off, len);
-    blockWritten += len;
-    if (blockWritten >= blockSize) {
-      uploadCurrentPart();
-      blockWritten = 0L;
+    int totalWritten = 0;
+    while (totalWritten < len) {
+      int written = internalWrite(b, off + totalWritten, len - totalWritten);
+      totalWritten += written;
+      LOG.debug("Buffer len {}, written {},  total written {}",
+          len, written, totalWritten);
     }
   }
-
-  private void removeTemporaryFiles() {
-    for (File file : blockFiles.values()) {
-      if (file != null && file.exists() && !file.delete()) {
-        LOG.warn("Failed to delete temporary file {}", file);
+  private synchronized int internalWrite(byte[] b, int off, int len)
+      throws IOException {
+    OSSDataBlocks.validateWriteArgs(b, off, len);
+    checkOpen();
+    if (len == 0) {
+      return 0;
+    }
+    OSSDataBlocks.DataBlock block = createBlockIfNeeded();
+    int written = block.write(b, off, len);
+    blockWritten += written;
+    int remainingCapacity = block.remainingCapacity();
+    if (written < len) {
+      // not everything was written — the block has run out
+      // of capacity
+      // Trigger an upload then process the remainder.
+      LOG.debug("writing more data than block has capacity -triggering upload");
+      uploadCurrentBlock();
+    } else {
+      if (remainingCapacity == 0) {
+        // the whole buffer is done, trigger an upload
+        uploadCurrentBlock();
       }
     }
+    return written;
   }
 
-  private void removePartFiles() throws IOException {
-    for (ListenableFuture<PartETag> partETagFuture : partETagsFutures) {
-      if (!partETagFuture.isDone()) {
-        continue;
-      }
-
-      try {
-        File blockFile = blockFiles.get(partETagFuture.get().getPartNumber());
-        if (blockFile != null && blockFile.exists() && !blockFile.delete()) {
-          LOG.warn("Failed to delete temporary file {}", blockFile);
-        }
-      } catch (InterruptedException | ExecutionException e) {
-        throw new IOException(e);
-      }
+  /**
+   * Clear the active block.
+   */
+  private void clearActiveBlock() {
+    if (activeBlock != null) {
+      LOG.debug("Clearing active block");
+    }
+    synchronized (this) {
+      activeBlock = null;
     }
   }
 
-  private void uploadCurrentPart() throws IOException {
-    blockStream.flush();
-    blockStream.close();
-    if (blockId == 0) {
+  private synchronized OSSDataBlocks.DataBlock getActiveBlock() {
+    return activeBlock;
+  }
+
+  private void uploadCurrentBlock()
+      throws IOException {
+    if (uploadId == null) {
       uploadId = store.getUploadId(key);
     }
 
-    blockId++;
-    blockFiles.put(blockId, blockFile);
-
-    File currentFile = blockFile;
     int currentBlockId = blockId;
-    ListenableFuture<PartETag> partETagFuture =
-        executorService.submit(() -> {
-          PartETag partETag = store.uploadPart(currentFile, key, uploadId,
-              currentBlockId);
-          return partETag;
-        });
-    partETagsFutures.add(partETagFuture);
-    removePartFiles();
-    blockFile = newBlockFile();
-    blockStream = new BufferedOutputStream(new FileOutputStream(blockFile));
+    OSSDataBlocks.DataBlock dataBlock = getActiveBlock();
+    long size = dataBlock.dataSize();
+    OSSDataBlocks.BlockUploadData uploadData = dataBlock.startUpload();
+    try {
+      ListenableFuture<PartETag> partETagFuture =
+          executorService.submit(() -> {
+            try {
+              PartETag partETag = store.uploadPart(uploadData, size, key,
+                  uploadId, currentBlockId);
+              return partETag;
+            } finally {
+              cleanupWithLogger(LOG, uploadData, dataBlock);
+            }
+          });
+      partETagsFutures.add(partETagFuture);
+    } finally {
+      blockWritten = 0;
+      clearActiveBlock();
+    }
   }
 
   /**

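The core change above is the write path: write() now loops, handing the remainder to internalWrite() until every byte is accepted, and internalWrite() uploads the active block as soon as it fills or a write comes up short. A minimal, self-contained sketch of that pattern (illustrative names only, not the Hadoop classes):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a block-buffered stream that "uploads" full blocks.
class BlockBufferedStream extends OutputStream {
  private final int blockSize;
  private final List<byte[]> uploadedParts = new ArrayList<>();
  private ByteArrayOutputStream block;

  BlockBufferedStream(int blockSize) { this.blockSize = blockSize; }

  @Override
  public void write(int b) throws IOException {
    write(new byte[] {(byte) b}, 0, 1);
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    int total = 0;
    while (total < len) {                      // same loop shape as write()
      total += internalWrite(b, off + total, len - total);
    }
  }

  private int internalWrite(byte[] b, int off, int len) {
    if (block == null) {                       // createBlockIfNeeded()
      block = new ByteArrayOutputStream(blockSize);
    }
    int written = Math.min(blockSize - block.size(), len);
    block.write(b, off, written);
    if (written < len || block.size() == blockSize) {
      uploadCurrentBlock();                    // block full: ship it
    }
    return written;
  }

  private void uploadCurrentBlock() {
    uploadedParts.add(block.toByteArray());    // stands in for the async part upload
    block = null;                              // clearActiveBlock()
  }
}
```

The partial-write loop is what lets fixed-capacity blocks replace the old grow-on-disk temp file: a block never accepts more than remainingCapacity() bytes, so block boundaries fall exactly on multipart part boundaries.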
+ 29 - 4
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystem.java

@@ -27,6 +27,9 @@ import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.aliyun.oss.statistics.BlockOutputStreamStatistics;
+import org.apache.hadoop.fs.aliyun.oss.statistics.impl.OutputStreamStatistics;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
@@ -71,6 +74,9 @@ public class AliyunOSSFileSystem extends FileSystem {
   private String bucket;
   private String username;
   private Path workingDir;
+  private OSSDataBlocks.BlockFactory blockFactory;
+  private BlockOutputStreamStatistics blockOutputStreamStatistics;
+  private int uploadPartSize;
   private int blockOutputActiveBlocks;
   private AliyunOSSFileSystemStore store;
   private int maxKeys;
@@ -128,13 +134,13 @@ public class AliyunOSSFileSystem extends FileSystem {
       // this means the file is not found
     }
 
-    long uploadPartSize = AliyunOSSUtils.getMultipartSizeProperty(getConf(),
-        MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
     return new FSDataOutputStream(
         new AliyunOSSBlockOutputStream(getConf(),
             store,
             key,
             uploadPartSize,
+            blockFactory,
+            blockOutputStreamStatistics,
             new SemaphoredDelegatingExecutor(boundedThreadPool,
                 blockOutputActiveBlocks, true)), statistics);
   }
@@ -334,6 +340,7 @@ public class AliyunOSSFileSystem extends FileSystem {
    */
   public void initialize(URI name, Configuration conf) throws IOException {
     super.initialize(name, conf);
+    setConf(conf);
 
     bucket = name.getHost();
     uri = java.net.URI.create(name.getScheme() + "://" + name.getAuthority());
@@ -345,6 +352,16 @@ public class AliyunOSSFileSystem extends FileSystem {
     blockOutputActiveBlocks = intOption(conf,
         UPLOAD_ACTIVE_BLOCKS_KEY, UPLOAD_ACTIVE_BLOCKS_DEFAULT, 1);
 
+    uploadPartSize = (int)AliyunOSSUtils.getMultipartSizeProperty(conf,
+        MULTIPART_UPLOAD_PART_SIZE_KEY, MULTIPART_UPLOAD_PART_SIZE_DEFAULT);
+    String uploadBuffer = conf.getTrimmed(FAST_UPLOAD_BUFFER,
+        DEFAULT_FAST_UPLOAD_BUFFER);
+
+    blockOutputStreamStatistics = new OutputStreamStatistics();
+    blockFactory = OSSDataBlocks.createFactory(this, uploadBuffer);
+    LOG.debug("Using OSSBlockOutputStream with buffer = {}; block={};" +
+            " queue limit={}",
+        uploadBuffer, uploadPartSize, blockOutputActiveBlocks);
     store = new AliyunOSSFileSystemStore();
     store.initialize(name, conf, username, statistics);
     maxKeys = conf.getInt(MAX_PAGING_KEYS_KEY, MAX_PAGING_KEYS_DEFAULT);
@@ -379,8 +396,6 @@ public class AliyunOSSFileSystem extends FileSystem {
     this.boundedCopyThreadPool = BlockingThreadPoolExecutorService.newInstance(
         maxCopyThreads, maxCopyTasks, 60L,
         TimeUnit.SECONDS, "oss-copy-unbounded");
-
-    setConf(conf);
   }
 
 /**
@@ -757,4 +772,14 @@ public class AliyunOSSFileSystem extends FileSystem {
   public AliyunOSSFileSystemStore getStore() {
     return store;
   }
+
+  @VisibleForTesting
+  OSSDataBlocks.BlockFactory getBlockFactory() {
+    return blockFactory;
+  }
+
+  @VisibleForTesting
+  BlockOutputStreamStatistics getBlockOutputStreamStatistics() {
+    return blockOutputStreamStatistics;
+  }
 }

+ 52 - 11
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java

@@ -422,6 +422,27 @@ public class AliyunOSSFileSystemStore {
     }
   }
 
+  /**
+   * Upload an input stream as an OSS object, using single upload.
+   * @param key object key.
+   * @param in input stream to upload.
+   * @param size size of the input stream.
+   * @throws IOException if failed to upload object.
+   */
+  public void uploadObject(String key, InputStream in, long size)
+      throws IOException {
+    ObjectMetadata meta = new ObjectMetadata();
+    meta.setContentLength(size);
+
+    if (StringUtils.isNotEmpty(serverSideEncryptionAlgorithm)) {
+      meta.setServerSideEncryption(serverSideEncryptionAlgorithm);
+    }
+
+    PutObjectResult result = ossClient.putObject(bucketName, key, in, meta);
+    LOG.debug(result.getETag());
+    statistics.incrementWriteOps(1);
+  }
+
   /**
    * list objects.
    *
@@ -652,44 +673,58 @@ public class AliyunOSSFileSystemStore {
     };
   }
 
+  public PartETag uploadPart(OSSDataBlocks.BlockUploadData partData,
+      long size, String key, String uploadId, int idx) throws IOException {
+    if (partData.hasFile()) {
+      return uploadPart(partData.getFile(), key, uploadId, idx);
+    } else {
+      return uploadPart(partData.getUploadStream(), size, key, uploadId, idx);
+    }
+  }
+
   public PartETag uploadPart(File file, String key, String uploadId, int idx)
       throws IOException {
-    InputStream instream = null;
+    InputStream in = new FileInputStream(file);
+    try {
+      return uploadPart(in, file.length(), key, uploadId, idx);
+    } finally {
+      in.close();
+    }
+  }
+
+  public PartETag uploadPart(InputStream in, long size, String key,
+      String uploadId, int idx) throws IOException {
     Exception caught = null;
     int tries = 3;
     while (tries > 0) {
       try {
-        instream = new FileInputStream(file);
         UploadPartRequest uploadRequest = new UploadPartRequest();
         uploadRequest.setBucketName(bucketName);
         uploadRequest.setKey(key);
         uploadRequest.setUploadId(uploadId);
-        uploadRequest.setInputStream(instream);
-        uploadRequest.setPartSize(file.length());
+        uploadRequest.setInputStream(in);
+        uploadRequest.setPartSize(size);
         uploadRequest.setPartNumber(idx);
         UploadPartResult uploadResult = ossClient.uploadPart(uploadRequest);
         statistics.incrementWriteOps(1);
         return uploadResult.getPartETag();
       } catch (Exception e) {
-        LOG.debug("Failed to upload "+ file.getPath() +", " +
+        LOG.debug("Failed to upload " + key + ", part " + idx +
             "try again.", e);
         caught = e;
-      } finally {
-        if (instream != null) {
-          instream.close();
-          instream = null;
-        }
       }
       tries--;
     }
 
     assert (caught != null);
-    throw new IOException("Failed to upload " + file.getPath() +
+    throw new IOException("Failed to upload " + key + ", part " + idx +
         " for 3 times.", caught);
   }
 
   /**
    * Initiate multipart upload.
+   * @param key object key.
+   * @return upload id.
    */
   public String getUploadId(String key) {
     InitiateMultipartUploadRequest initiateMultipartUploadRequest =
@@ -701,6 +736,10 @@ public class AliyunOSSFileSystemStore {
 
   /**
    * Complete the specific multipart upload.
+   * @param key object key.
+   * @param uploadId upload id of this multipart upload.
+   * @param partETags part etags need to be completed.
+   * @return CompleteMultipartUploadResult.
    */
   public CompleteMultipartUploadResult completeMultipartUpload(String key,
       String uploadId, List<PartETag> partETags) {
@@ -713,6 +752,8 @@ public class AliyunOSSFileSystemStore {
 
   /**
    * Abort the specific multipart upload.
+   * @param key object key.
+   * @param uploadId upload id of this multipart upload.
    */
   public void abortMultipartUpload(String key, String uploadId) {
     AbortMultipartUploadRequest request = new AbortMultipartUploadRequest(
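
A subtlety in the stream-based uploadPart() above: it retries up to three times against the same InputStream, which only works if the stream can be re-read. That is why the ByteBufferInputStream introduced in OSSDataBlocks below advertises mark()/reset() support, which its javadoc notes is required to replay failed PUT attempts. A hedged sketch of the replay idea (simplified, not the SDK's actual retry path):

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

final class ReplayableUpload {
  /** Attempt an upload up to 3 times, rewinding the stream between tries. */
  static void uploadWithRetry(InputStream in, long size) throws IOException {
    if (!in.markSupported()) {
      throw new IOException("stream cannot be replayed on retry");
    }
    in.mark((int) size);          // remember the start of the part
    IOException caught = null;
    for (int tries = 3; tries > 0; tries--) {
      try {
        doPut(in, size);          // stand-in for ossClient.uploadPart(...)
        return;
      } catch (IOException e) {
        caught = e;
        in.reset();               // rewind so the next attempt re-reads the part
      }
    }
    throw new IOException("Failed to upload part after 3 attempts", caught);
  }

  private static void doPut(InputStream in, long size) throws IOException {
    // placeholder: a real implementation would stream `size` bytes to OSS
    long skipped = in.skip(size);
    if (skipped != size) {
      throw new IOException("short read");
    }
  }

  public static void main(String[] args) throws IOException {
    byte[] part = new byte[16];
    uploadWithRetry(new ByteArrayInputStream(part), part.length);
  }
}
```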

+ 53 - 0
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/Constants.java

@@ -134,6 +134,59 @@ public final class Constants {
   // Comma separated list of directories
   public static final String BUFFER_DIR_KEY = "fs.oss.buffer.dir";
 
+  /**
+   * What buffer to use.
+   * Default is {@link #FAST_UPLOAD_BUFFER_DISK}
+   * Value: {@value}
+   */
+  public static final String FAST_UPLOAD_BUFFER =
+      "fs.oss.fast.upload.buffer";
+
+  /**
+   * Buffer blocks to disk: {@value}.
+   * Capacity is limited to available disk space.
+   */
+  public static final String FAST_UPLOAD_BUFFER_DISK = "disk";
+
+  /**
+   * Use an in-memory array. Fast but will run out of heap rapidly: {@value}.
+   */
+  public static final String FAST_UPLOAD_BUFFER_ARRAY = "array";
+
+  /**
+   * Use a byte buffer. May be more memory efficient than the
+   * {@link #FAST_UPLOAD_BUFFER_ARRAY}: {@value}.
+   */
+  public static final String FAST_UPLOAD_BYTEBUFFER = "bytebuffer";
+
+  /**
+   * Use an in-memory array and fallback to disk if
+   * used memory exceed the quota.
+   */
+  public static final String FAST_UPLOAD_BUFFER_ARRAY_DISK = "array_disk";
+
+  /**
+   * Use a byte buffer and fall back to disk if
+   * used memory exceeds the quota.
+   */
+  public static final String FAST_UPLOAD_BYTEBUFFER_DISK = "bytebuffer_disk";
+
+  /**
+   * Memory limit of {@link #FAST_UPLOAD_BUFFER_ARRAY_DISK} or
+   * {@link #FAST_UPLOAD_BYTEBUFFER_DISK}.
+   */
+  public static final String FAST_UPLOAD_BUFFER_MEMORY_LIMIT =
+      "fs.oss.fast.upload.memory.limit";
+
+  public static final long FAST_UPLOAD_BUFFER_MEMORY_LIMIT_DEFAULT =
+      1024 * 1024 * 1024; // 1GB
+
+  /**
+   * Default buffer option: {@value}.
+   */
+  public static final String DEFAULT_FAST_UPLOAD_BUFFER =
+      FAST_UPLOAD_BUFFER_DISK;
+
   // private | public-read | public-read-write
   public static final String CANNED_ACL_KEY = "fs.oss.acl.default";
   public static final String CANNED_ACL_DEFAULT = "";
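
Taken together, these constants define the user-facing knobs. A sketch of how a client might opt into the new memory-plus-disk buffering (the property names and defaults come from the constants above; the bucket URI, path, and sizes are placeholders):

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class OssFastUploadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Buffer upload blocks in heap byte arrays, spilling to disk
    // once the memory quota is exhausted.
    conf.set("fs.oss.fast.upload.buffer", "array_disk");
    // Cap the memory used by buffered blocks at 256 MB (default is 1 GB).
    conf.setLong("fs.oss.fast.upload.memory.limit", 256L * 1024 * 1024);

    // "oss://example-bucket/" is a placeholder URI.
    try (FileSystem fs =
             FileSystem.get(URI.create("oss://example-bucket/"), conf);
         FSDataOutputStream out = fs.create(new Path("/demo/data.bin"))) {
      out.write(new byte[4 * 1024 * 1024]);  // buffered per the settings above
    }
  }
}
```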

+ 1109 - 0
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/OSSDataBlocks.java

@@ -0,0 +1,1109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss;
+
+import java.io.BufferedOutputStream;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.aliyun.oss.statistics.BlockOutputStreamStatistics;
+import org.apache.hadoop.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.FSExceptionMessages;
+import org.apache.hadoop.util.DirectBufferPool;
+
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+
+/**
+ * Set of classes to support output streaming into blocks which are then
+ * uploaded to OSS as a single PUT, or as part of a multipart request.
+ */
+final class OSSDataBlocks {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(OSSDataBlocks.class);
+
+  private OSSDataBlocks() {
+  }
+
+  /**
+   * Validate args to a write command. These are the same validation checks
+   * expected for any implementation of {@code OutputStream.write()}.
+   * @param b byte array containing data
+   * @param off offset in array where to start
+   * @param len number of bytes to be written
+   * @throws NullPointerException for a null buffer
+   * @throws IndexOutOfBoundsException if indices are out of range
+   */
+  static void validateWriteArgs(byte[] b, int off, int len)
+      throws IOException {
+    Preconditions.checkNotNull(b);
+    if ((off < 0) || (off > b.length) || (len < 0) ||
+        ((off + len) > b.length) || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException(
+          "write (b[" + b.length + "], " + off + ", " + len + ')');
+    }
+  }
+
+  /**
+   * Create a factory.
+   * @param owner factory owner
+   * @param name factory name - the option from {@link Constants}.
+   * @return the factory, ready to be initialized.
+   * @throws IllegalArgumentException if the name is unknown.
+   */
+  static BlockFactory createFactory(AliyunOSSFileSystem owner,
+      String name) {
+    switch (name) {
+    case Constants.FAST_UPLOAD_BUFFER_ARRAY:
+      return new ArrayBlockFactory(owner);
+    case Constants.FAST_UPLOAD_BUFFER_DISK:
+      return new DiskBlockFactory(owner);
+    case Constants.FAST_UPLOAD_BYTEBUFFER:
+      return new ByteBufferBlockFactory(owner);
+    case Constants.FAST_UPLOAD_BUFFER_ARRAY_DISK:
+      return new MemoryAndDiskBlockFactory(
+          owner, new ArrayBlockFactory(owner));
+    case Constants.FAST_UPLOAD_BYTEBUFFER_DISK:
+      return new MemoryAndDiskBlockFactory(
+          owner, new ByteBufferBlockFactory(owner));
+    default:
+      throw new IllegalArgumentException("Unsupported block buffer" +
+          " \"" + name + '"');
+    }
+  }
+
+  /**
+   * The output information for an upload.
+   * It can be one of a file or an input stream.
+   * When closed, any stream is closed. Any source file is untouched.
+   */
+  static final class BlockUploadData implements Closeable {
+    private final File file;
+    private final InputStream uploadStream;
+
+    /**
+     * File constructor; input stream will be null.
+     * @param file file to upload
+     */
+    BlockUploadData(File file) {
+      Preconditions.checkArgument(file.exists(), "No file: " + file);
+      this.file = file;
+      this.uploadStream = null;
+    }
+
+    /**
+     * Stream constructor, file field will be null.
+     * @param uploadStream stream to upload
+     */
+    BlockUploadData(InputStream uploadStream) {
+      Preconditions.checkNotNull(uploadStream, "rawUploadStream");
+      this.uploadStream = uploadStream;
+      this.file = null;
+    }
+
+    /**
+     * Predicate: does this instance contain a file reference.
+     * @return true if there is a file.
+     */
+    boolean hasFile() {
+      return file != null;
+    }
+
+    /**
+     * Get the file, if there is one.
+     * @return the file for uploading, or null.
+     */
+    File getFile() {
+      return file;
+    }
+
+    /**
+     * Get the raw upload stream, if the object was
+     * created with one.
+     * @return the upload stream or null.
+     */
+    InputStream getUploadStream() {
+      return uploadStream;
+    }
+
+    /**
+     * Close: closes any upload stream provided in the constructor.
+     * @throws IOException inherited exception
+     */
+    @Override
+    public void close() throws IOException {
+      cleanupWithLogger(LOG, uploadStream);
+    }
+  }
+
+  /**
+   * Base class for block factories.
+   */
+  static abstract class BlockFactory implements Closeable {
+    private final AliyunOSSFileSystem owner;
+
+    protected BlockFactory(AliyunOSSFileSystem owner) {
+      this.owner = owner;
+    }
+
+    /**
+     * Create a block.
+     *
+     * @param index index of block
+     * @param limit limit of the block
+     * @param statistics stats to work with
+     * @return a new block.
+     */
+    abstract DataBlock create(long index, int limit,
+        BlockOutputStreamStatistics statistics) throws IOException;
+
+    /**
+     * Implement any close/cleanup operation.
+     * Base class is a no-op
+     * @throws IOException Inherited exception; implementations should
+     * avoid raising it.
+     */
+    @Override
+    public void close() throws IOException {
+    }
+
+    /**
+     * Owner.
+     */
+    protected AliyunOSSFileSystem getOwner() {
+      return owner;
+    }
+  }
+
+  /**
+   * This represents a block being uploaded.
+   */
+  static abstract class DataBlock implements Closeable {
+
+    enum DestState {Writing, Upload, Closed}
+
+    private volatile DestState state = DestState.Writing;
+    private final long index;
+    private final BlockOutputStreamStatistics statistics;
+
+    protected DataBlock(long index,
+        BlockOutputStreamStatistics statistics) {
+      this.index = index;
+      this.statistics = statistics;
+    }
+
+    /**
+     * Atomically enter a state, verifying current state.
+     * @param current current state. null means "no check"
+     * @param next next state
+     * @throws IllegalStateException if the current state is not as expected
+     */
+    protected synchronized final void enterState(DestState current,
+        DestState next)
+        throws IllegalStateException {
+      verifyState(current);
+      LOG.debug("{}: entering state {}", this, next);
+      state = next;
+    }
+
+    /**
+     * Verify that the block is in the declared state.
+     * @param expected expected state.
+     * @throws IllegalStateException if the DataBlock is in the wrong state
+     */
+    protected final void verifyState(DestState expected)
+        throws IllegalStateException {
+      if (expected != null && state != expected) {
+        throw new IllegalStateException("Expected stream state " + expected
+            + " -but actual state is " + state + " in " + this);
+      }
+    }
+
+    /**
+     * Current state.
+     * @return the current state.
+     */
+    final DestState getState() {
+      return state;
+    }
+
+    /**
+     * Get the index, used by subclasses.
+     * @return the block index.
+     */
+    final long getIndex() {
+      return index;
+    }
+
+    /**
+     * Return the current data size.
+     * @return the size of the data
+     */
+    abstract int dataSize();
+
+    /**
+     * Predicate to verify that the block has the capacity to write
+     * the given set of bytes.
+     * @param bytes number of bytes desired to be written.
+     * @return true if there is enough space.
+     */
+    abstract boolean hasCapacity(long bytes);
+
+    /**
+     * Predicate to check if there is data in the block.
+     * @return true if there is
+     */
+    boolean hasData() {
+      return dataSize() > 0;
+    }
+
+    /**
+     * The remaining capacity in the block before it is full.
+     * @return the number of bytes remaining.
+     */
+    abstract int remainingCapacity();
+
+    /**
+     * Write a series of bytes from the buffer, from the offset.
+     * Returns the number of bytes written.
+     * Only valid in the state {@code Writing}.
+     * Base class verifies the state but does no writing.
+     * @param buffer buffer
+     * @param offset offset
+     * @param length length of write
+     * @return number of bytes written
+     * @throws IOException trouble
+     */
+    int write(byte[] buffer, int offset, int length) throws IOException {
+      verifyState(DestState.Writing);
+      Preconditions.checkArgument(buffer != null, "Null buffer");
+      Preconditions.checkArgument(length >= 0, "length is negative");
+      Preconditions.checkArgument(offset >= 0, "offset is negative");
+      Preconditions.checkArgument(
+          !(buffer.length - offset < length),
+          "buffer shorter than amount of data to write");
+      return 0;
+    }
+
+    /**
+     * Flush the output.
+     * Only valid in the state {@code Writing}.
+     * In the base class, this is a no-op
+     * @throws IOException any IO problem.
+     */
+    void flush() throws IOException {
+      verifyState(DestState.Writing);
+    }
+
+    /**
+     * Switch to the upload state and return a stream for uploading.
+     * Base class calls {@link #enterState(DestState, DestState)} to
+     * manage the state machine.
+     * @return the stream
+     * @throws IOException trouble
+     */
+    BlockUploadData startUpload() throws IOException {
+      LOG.debug("Start datablock[{}] upload", index);
+      enterState(DestState.Writing, DestState.Upload);
+      return null;
+    }
+
+    /**
+     * Enter the closed state.
+     * @return true if the class was in any other state, implying that
+     * the subclass should do its close operations
+     */
+    protected synchronized boolean enterClosedState() {
+      if (!state.equals(DestState.Closed)) {
+        enterState(null, DestState.Closed);
+        return true;
+      } else {
+        return false;
+      }
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (enterClosedState()) {
+        LOG.debug("Closed {}", this);
+        innerClose();
+      }
+    }
+
+    /**
+     * Inner close logic for subclasses to implement.
+     */
+    protected void innerClose() throws IOException {
+    }
+
+    /**
+     * A block has been allocated.
+     */
+    protected void blockAllocated() {
+      if (statistics != null) {
+        statistics.blockAllocated();
+      }
+    }
+
+    /**
+     * A block has been released.
+     */
+    protected void blockReleased() {
+      if (statistics != null) {
+        statistics.blockReleased();
+      }
+    }
+
+    /**
+     * A disk block has been allocated.
+     */
+    protected void diskBlockAllocated() {
+      if (statistics != null) {
+        statistics.diskBlockAllocated();
+      }
+    }
+
+    /**
+     * A disk block has been released.
+     */
+    protected void diskBlockReleased() {
+      if (statistics != null) {
+        statistics.diskBlockReleased();
+      }
+    }
+
+    /**
+     * Memory bytes have been allocated.
+     * @param size number of bytes allocated.
+     */
+    protected void bytesAllocated(long size) {
+      if (statistics != null) {
+        statistics.bytesAllocated(size);
+      }
+    }
+
+    /**
+     * Memory bytes have been released.
+     * @param size number of bytes released.
+     */
+    protected void bytesReleased(long size) {
+      if (statistics != null) {
+        statistics.bytesReleased(size);
+      }
+    }
+
+    protected BlockOutputStreamStatistics getStatistics() {
+      return statistics;
+    }
+  }
+
+  // ====================================================================
+
+  static class MemoryLimitException extends IOException {
+    MemoryLimitException(String msg) {
+      super(msg);
+    }
+  }
+
+  static abstract class MemoryBlockFactory extends BlockFactory {
+    private final AtomicLong memoryUsed = new AtomicLong(0);
+    private long memoryLimit = 0;
+    private boolean checkMemory = false;
+
+    MemoryBlockFactory(AliyunOSSFileSystem owner) {
+      super(owner);
+    }
+
+    void setMemoryLimit(long memoryLimit) {
+      this.memoryLimit = memoryLimit;
+      if (memoryLimit > 0) {
+        checkMemory = true;
+      }
+    }
+
+    void allocateMemory(long size) throws MemoryLimitException {
+      if (!checkMemory) {
+        return;
+      }
+      long next = memoryUsed.addAndGet(size);
+      if (next > memoryLimit) {
+        memoryUsed.getAndAdd(-size);
+        String msg = "Can not allocate memory"
+                + ", memory used " + memoryUsed
+                + ", allocate size " + size
+                + ", memory limit " + memoryLimit;
+        throw new MemoryLimitException(msg);
+      }
+    }
+
+    void releaseMemory(long size) {
+      if (!checkMemory) {
+        return;
+      }
+      memoryUsed.getAndAdd(-size);
+    }
+
+    long getMemoryUsed() {
+      return memoryUsed.get();
+    }
+  }
+
+  /**
+   * Use byte arrays on the heap for storage.
+   */
+  static class ArrayBlockFactory extends MemoryBlockFactory {
+
+    ArrayBlockFactory(AliyunOSSFileSystem owner) {
+      super(owner);
+    }
+
+    @Override
+    DataBlock create(long index, int limit,
+        BlockOutputStreamStatistics statistics)
+        throws IOException {
+      try {
+        return new ByteArrayBlock(index, limit, statistics);
+      } catch (MemoryLimitException e) {
+        LOG.debug(e.getMessage() + ", index " + index);
+        return null;
+      }
+    }
+
+    static class OSSByteArrayOutputStream extends ByteArrayOutputStream {
+
+      OSSByteArrayOutputStream(int size) {
+        super(size);
+      }
+
+      /**
+       * InputStream backed by the internal byte array.
+       *
+       * @return an input stream over the buffered data.
+       */
+      ByteArrayInputStream getInputStream() {
+        ByteArrayInputStream bin = new ByteArrayInputStream(this.buf, 0, count);
+        this.reset();
+        this.buf = null;
+        return bin;
+      }
+    }
+
+    /**
+     * Stream to memory via a {@code ByteArrayOutputStream}.
+     * <p>
+     * This has one problem: it can consume a lot of heap space
+     * proportional to the mismatch between writes to the stream and
+     * the JVM-wide upload bandwidth to the OSS endpoint.
+     * The memory consumption can be limited by tuning the filesystem settings
+     * to restrict the number of queued/active uploads.
+     */
+
+    class ByteArrayBlock extends DataBlock {
+      private OSSByteArrayOutputStream buffer;
+      private final int limit;
+      // cache data size so that it is consistent after the buffer is reset.
+      private Integer dataSize;
+
+      ByteArrayBlock(long index,
+          int limit,
+          BlockOutputStreamStatistics statistics) throws MemoryLimitException {
+        super(index, statistics);
+        this.limit = limit;
+        allocateMemory(limit);
+        buffer = new OSSByteArrayOutputStream(limit);
+        blockAllocated();
+        bytesAllocated(limit);
+      }
+
+      /**
+       * Get the amount of data; if there is no buffer then the size is 0.
+       *
+       * @return the amount of data available to upload.
+       */
+      @Override
+      int dataSize() {
+        return dataSize != null ? dataSize : buffer.size();
+      }
+
+      @Override
+      BlockUploadData startUpload() throws IOException {
+        super.startUpload();
+        dataSize = buffer.size();
+        ByteArrayInputStream bufferData = buffer.getInputStream();
+        buffer = null;
+        return new BlockUploadData(bufferData);
+      }
+
+      @Override
+      boolean hasCapacity(long bytes) {
+        return dataSize() + bytes <= limit;
+      }
+
+      @Override
+      int remainingCapacity() {
+        return limit - dataSize();
+      }
+
+      @Override
+      int write(byte[] b, int offset, int len) throws IOException {
+        super.write(b, offset, len);
+        int written = Math.min(remainingCapacity(), len);
+        buffer.write(b, offset, written);
+        return written;
+      }
+
+      @Override
+      protected void innerClose() {
+        buffer = null;
+        releaseMemory(limit);
+        blockReleased();
+        bytesReleased(limit);
+      }
+
+      @Override
+      public String toString() {
+        return "ByteArrayBlock{"
+            + "index=" + getIndex() +
+            ", state=" + getState() +
+            ", limit=" + limit +
+            ", dataSize=" + dataSize +
+            '}';
+      }
+    }
+  }
+
+  // ====================================================================
+
+  /**
+   * Stream via Direct ByteBuffers; these are allocated off heap
+   * via {@link DirectBufferPool}.
+   */
+  static class ByteBufferBlockFactory extends MemoryBlockFactory {
+    private final DirectBufferPool bufferPool = new DirectBufferPool();
+    private final AtomicInteger buffersOutstanding = new AtomicInteger(0);
+
+    ByteBufferBlockFactory(AliyunOSSFileSystem owner) {
+      super(owner);
+    }
+
+    @Override
+    ByteBufferBlock create(long index, int limit,
+        BlockOutputStreamStatistics statistics)
+        throws IOException {
+      try {
+        return new ByteBufferBlock(index, limit, statistics);
+      } catch (MemoryLimitException e) {
+        LOG.debug(e.getMessage() + ", index " + index);
+        return null;
+      }
+    }
+
+    private ByteBuffer requestBuffer(int limit) {
+      LOG.debug("Requesting buffer of size {}", limit);
+      buffersOutstanding.incrementAndGet();
+      return bufferPool.getBuffer(limit);
+    }
+
+    private void releaseBuffer(ByteBuffer buffer) {
+      LOG.debug("Releasing buffer");
+      bufferPool.returnBuffer(buffer);
+      buffersOutstanding.decrementAndGet();
+    }
+
+    /**
+     * Get count of outstanding buffers.
+     * @return the current buffer count
+     */
+    public int getOutstandingBufferCount() {
+      return buffersOutstanding.get();
+    }
+
+    @Override
+    public String toString() {
+      return "ByteBufferBlockFactory{"
+          + "buffersOutstanding=" + buffersOutstanding +
+          '}';
+    }
+
+    /**
+     * A DataBlock which requests a buffer from pool on creation; returns
+     * it when it is closed.
+     */
+    class ByteBufferBlock extends DataBlock {
+      private ByteBuffer blockBuffer;
+      private final int bufferSize;
+      // cache data size so that it is consistent after the buffer is reset.
+      private Integer dataSize;
+
+      /**
+       * Instantiate. This will request a ByteBuffer of the desired size.
+       * @param index block index
+       * @param bufferSize buffer size
+       */
+      ByteBufferBlock(long index, int bufferSize,
+          BlockOutputStreamStatistics statistics) throws MemoryLimitException {
+        super(index, statistics);
+        this.bufferSize = bufferSize;
+        allocateMemory(bufferSize);
+        blockBuffer = requestBuffer(bufferSize);
+        blockAllocated();
+        bytesAllocated(bufferSize);
+      }
+
+      /**
+       * Get the amount of data; if there is no buffer then the size is 0.
+       * @return the amount of data available to upload.
+       */
+      @Override
+      int dataSize() {
+        return dataSize != null ? dataSize : bufferCapacityUsed();
+      }
+
+      @Override
+      BlockUploadData startUpload() throws IOException {
+        super.startUpload();
+        dataSize = bufferCapacityUsed();
+        // set the buffer up for reading from the beginning
+        blockBuffer.limit(blockBuffer.position());
+        blockBuffer.position(0);
+        return new BlockUploadData(
+            new ByteBufferInputStream(dataSize, blockBuffer));
+      }
+
+      @Override
+      public boolean hasCapacity(long bytes) {
+        return bytes <= remainingCapacity();
+      }
+
+      @Override
+      public int remainingCapacity() {
+        return blockBuffer != null ? blockBuffer.remaining() : 0;
+      }
+
+      private int bufferCapacityUsed() {
+        return blockBuffer.capacity() - blockBuffer.remaining();
+      }
+
+      @Override
+      int write(byte[] b, int offset, int len) throws IOException {
+        super.write(b, offset, len);
+        int written = Math.min(remainingCapacity(), len);
+        blockBuffer.put(b, offset, written);
+        return written;
+      }
+
+      /**
+       * Closing the block will release the buffer.
+       */
+      @Override
+      protected void innerClose() {
+        if (blockBuffer != null) {
+          releaseMemory(bufferSize);
+          blockReleased();
+          bytesReleased(bufferSize);
+          releaseBuffer(blockBuffer);
+          blockBuffer = null;
+        }
+      }
+
+      @Override
+      public String toString() {
+        return "ByteBufferBlock{"
+            + "index=" + getIndex() +
+            ", state=" + getState() +
+            ", dataSize=" + dataSize() +
+            ", limit=" + bufferSize +
+            ", remainingCapacity=" + remainingCapacity() +
+            '}';
+      }
+
+      /**
+       * Provide an input stream from a byte buffer; supporting
+       * {@link #mark(int)}, which is required to enable replay of failed
+       * PUT attempts.
+       */
+      class ByteBufferInputStream extends InputStream {
+
+        private final int size;
+        private ByteBuffer byteBuffer;
+
+        ByteBufferInputStream(int size,
+            ByteBuffer byteBuffer) {
+          LOG.debug("Creating ByteBufferInputStream of size {}", size);
+          this.size = size;
+          this.byteBuffer = byteBuffer;
+        }
+
+        /**
+         * After the stream is closed, set the local reference to the byte
+         * buffer to null; this guarantees that future attempts to use
+         * stream methods will fail.
+         */
+        @Override
+        public synchronized void close() {
+          LOG.debug("ByteBufferInputStream.close() for {}",
+              ByteBufferBlock.super.toString());
+          byteBuffer = null;
+        }
+
+        /**
+         * Verify that the stream is open.
+         * @throws IOException if the stream is closed
+         */
+        private void verifyOpen() throws IOException {
+          if (byteBuffer == null) {
+            throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+          }
+        }
+
+        public synchronized int read() throws IOException {
+          if (available() > 0) {
+            return byteBuffer.get() & 0xFF;
+          } else {
+            return -1;
+          }
+        }
+
+        @Override
+        public synchronized long skip(long offset) throws IOException {
+          verifyOpen();
+          long newPos = position() + offset;
+          if (newPos < 0) {
+            throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK);
+          }
+          if (newPos > size) {
+            throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
+          }
+          byteBuffer.position((int) newPos);
+          return newPos;
+        }
+
+        @Override
+        public synchronized int available() {
+          Preconditions.checkState(byteBuffer != null,
+              FSExceptionMessages.STREAM_IS_CLOSED);
+          return byteBuffer.remaining();
+        }
+
+        /**
+         * Get the current buffer position.
+         * @return the buffer position
+         */
+        public synchronized int position() {
+          return byteBuffer.position();
+        }
+
+        /**
+         * Check if there is data left.
+         * @return true if there is data remaining in the buffer.
+         */
+        public synchronized boolean hasRemaining() {
+          return byteBuffer.hasRemaining();
+        }
+
+        @Override
+        public synchronized void mark(int readlimit) {
+          LOG.debug("mark at {}", position());
+          byteBuffer.mark();
+        }
+
+        @Override
+        public synchronized void reset() throws IOException {
+          LOG.debug("reset");
+          byteBuffer.reset();
+        }
+
+        @Override
+        public boolean markSupported() {
+          return true;
+        }
+
+        /**
+         * Read in data.
+         * @param b destination buffer
+         * @param offset offset within the buffer
+         * @param length length of bytes to read
+         * @throws EOFException if the position is negative
+         * @throws IndexOutOfBoundsException if there isn't space for the
+         * amount of data requested.
+         * @throws IllegalArgumentException other arguments are invalid.
+         */
+        @SuppressWarnings("NullableProblems")
+        public synchronized int read(byte[] b, int offset, int length)
+            throws IOException {
+          Preconditions.checkArgument(length >= 0, "length is negative");
+          Preconditions.checkArgument(b != null, "Null buffer");
+          if (b.length - offset < length) {
+            throw new IndexOutOfBoundsException(
+                FSExceptionMessages.TOO_MANY_BYTES_FOR_DEST_BUFFER
+                    + ": request length =" + length
+                    + ", with offset =" + offset
+                    + "; buffer capacity =" + (b.length - offset));
+          }
+          verifyOpen();
+          if (!hasRemaining()) {
+            return -1;
+          }
+
+          int toRead = Math.min(length, available());
+          byteBuffer.get(b, offset, toRead);
+          return toRead;
+        }
+
+        @Override
+        public String toString() {
+          final StringBuilder sb = new StringBuilder(
+              "ByteBufferInputStream{");
+          sb.append("size=").append(size);
+          ByteBuffer buf = this.byteBuffer;
+          if (buf != null) {
+            sb.append(", available=").append(buf.remaining());
+          }
+          sb.append(", ").append(ByteBufferBlock.super.toString());
+          sb.append('}');
+          return sb.toString();
+        }
+      }
+    }
+  }
+
+  // ====================================================================
+
+  /**
+   * Buffer blocks to disk.
+   */
+  static class DiskBlockFactory extends BlockFactory {
+
+    DiskBlockFactory(AliyunOSSFileSystem owner) {
+      super(owner);
+    }
+
+    /**
+     * Create a temp file and a {@link DiskBlock} instance to manage it.
+     *
+     * @param index block index
+     * @param limit limit of the block.
+     * @return the new block
+     * @throws IOException IO problems
+     */
+    @Override
+    DataBlock create(long index, int limit,
+        BlockOutputStreamStatistics statistics)
+        throws IOException {
+      File destFile = AliyunOSSUtils.createTmpFileForWrite(
+          String.format("oss-block-%04d-", index), limit, getOwner().getConf());
+      return new DiskBlock(destFile, limit, index, statistics);
+    }
+  }
+
+  /**
+   * Stream to a file.
+   * This will stop at the limit; the caller is expected to create a new block.
+   */
+  static class DiskBlock extends DataBlock {
+
+    private int bytesWritten = 0;
+    private final File bufferFile;
+    private final int limit;
+    private BufferedOutputStream out;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    DiskBlock(File bufferFile,
+        int limit,
+        long index,
+        BlockOutputStreamStatistics statistics)
+        throws FileNotFoundException {
+      super(index, statistics);
+      this.limit = limit;
+      this.bufferFile = bufferFile;
+      blockAllocated();
+      diskBlockAllocated();
+      out = new BufferedOutputStream(new FileOutputStream(bufferFile));
+    }
+
+    @Override
+    int dataSize() {
+      return bytesWritten;
+    }
+
+    @Override
+    boolean hasCapacity(long bytes) {
+      return dataSize() + bytes <= limit;
+    }
+
+    @Override
+    int remainingCapacity() {
+      return limit - bytesWritten;
+    }
+
+    @Override
+    int write(byte[] b, int offset, int len) throws IOException {
+      super.write(b, offset, len);
+      int written = Math.min(remainingCapacity(), len);
+      out.write(b, offset, written);
+      bytesWritten += written;
+      return written;
+    }
+
+    @Override
+    BlockUploadData startUpload() throws IOException {
+      super.startUpload();
+      try {
+        out.flush();
+      } finally {
+        out.close();
+        out = null;
+      }
+      return new BlockUploadData(bufferFile);
+    }
+
+    /**
+     * The close operation will delete the destination file if it still
+     * exists.
+     * @throws IOException IO problems
+     */
+    @SuppressWarnings("UnnecessaryDefault")
+    @Override
+    protected void innerClose() throws IOException {
+      final DestState state = getState();
+      LOG.debug("Closing {}", this);
+      switch (state) {
+      case Writing:
+        if (bufferFile.exists()) {
+          // file was not uploaded
+          LOG.debug("Block[{}]: Deleting buffer file as upload did not start",
+              getIndex());
+          closeBlock();
+        }
+        break;
+
+      case Upload:
+        LOG.debug("Block[{}]: Buffer file {} exists —close upload stream",
+            getIndex(), bufferFile);
+        break;
+
+      case Closed:
+        closeBlock();
+        break;
+
+      default:
+        // this state can never be reached, but checkstyle complains, so
+        // it is here.
+      }
+    }
+
+    /**
+     * Flush operation will flush to disk.
+     * @throws IOException IOE raised on FileOutputStream
+     */
+    @Override
+    void flush() throws IOException {
+      super.flush();
+      out.flush();
+    }
+
+    @Override
+    public String toString() {
+      String sb = "FileBlock{"
+          + "index=" + getIndex()
+          + ", destFile=" + bufferFile +
+          ", state=" + getState() +
+          ", dataSize=" + dataSize() +
+          ", limit=" + limit +
+          '}';
+      return sb;
+    }
+
+    /**
+     * Close the block.
+     * This will delete the block's buffer file if the block has
+     * not previously been closed.
+     */
+    void closeBlock() {
+      LOG.debug("block[{}]: closeBlock()", getIndex());
+      if (!closed.getAndSet(true)) {
+        blockReleased();
+        diskBlockReleased();
+        if (!bufferFile.delete() && bufferFile.exists()) {
+          LOG.warn("delete({}) returned false",
+              bufferFile.getAbsoluteFile());
+        }
+      } else {
+        LOG.debug("block[{}]: skipping re-entrant closeBlock()", getIndex());
+      }
+    }
+  }
+
+  /**
+   * Buffer blocks to memory and fall back to disk if
+   * used memory exceeds the quota.
+   */
+  static class MemoryAndDiskBlockFactory extends BlockFactory {
+    private BlockFactory memoryFactory;
+    private BlockFactory diskFactory;
+
+    MemoryAndDiskBlockFactory(AliyunOSSFileSystem owner,
+        BlockFactory memoryFactory) {
+      super(owner);
+      this.memoryFactory = memoryFactory;
+      diskFactory = new DiskBlockFactory(owner);
+
+      long memoryLimit = owner.getConf().getLong(
+          Constants.FAST_UPLOAD_BUFFER_MEMORY_LIMIT,
+          Constants.FAST_UPLOAD_BUFFER_MEMORY_LIMIT_DEFAULT);
+      ((MemoryBlockFactory)this.memoryFactory).setMemoryLimit(memoryLimit);
+    }
+
+    /**
+     * Create a temp file and a {@link DataBlock} instance to manage it.
+     *
+     * @param index block index
+     * @param limit limit of the block.
+     * @return the new block
+     * @throws IOException IO problems
+     */
+    @Override
+    DataBlock create(long index, int limit,
+        BlockOutputStreamStatistics statistics)
+        throws IOException {
+      DataBlock block = memoryFactory.create(index, limit, statistics);
+      if (block != null) {
+        return block;
+      } else {
+        return diskFactory.create(index, limit, statistics);
+      }
+    }
+
+    @VisibleForTesting
+    MemoryBlockFactory getMemoryFactory() {
+      return (MemoryBlockFactory)memoryFactory;
+    }
+  }
+}
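
Every DataBlock thus walks a three-state machine: Writing, then Upload after startUpload(), then Closed, with close() deciding between releasing memory and deleting a temp file depending on where the block is in that cycle. An illustrative walk through the factory/block API as defined above (a sketch only; it would have to live in org.apache.hadoop.fs.aliyun.oss since these classes are package-private, and `fs`/`stats` are assumed to be initialized):

```java
// Hedged sketch: `fs` is an initialized AliyunOSSFileSystem and
// `stats` a BlockOutputStreamStatistics instance.
void demonstrateBlockLifecycle(AliyunOSSFileSystem fs,
    BlockOutputStreamStatistics stats) throws IOException {
  OSSDataBlocks.BlockFactory factory =
      OSSDataBlocks.createFactory(fs, Constants.FAST_UPLOAD_BUFFER_ARRAY);
  OSSDataBlocks.DataBlock block = factory.create(1, 8 * 1024 * 1024, stats);

  byte[] data = new byte[1024];
  int written = block.write(data, 0, data.length);      // state: Writing

  OSSDataBlocks.BlockUploadData upload = block.startUpload(); // -> Upload
  if (upload.hasFile()) {
    // disk-backed block: pass upload.getFile() to store.uploadPart()
  } else {
    // memory-backed block: pass upload.getUploadStream() + block.dataSize()
  }
  upload.close();    // closes any upload stream; leaves a source file alone
  block.close();     // -> Closed: releases memory or deletes the temp file
  factory.close();
}
```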

+ 72 - 0
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/statistics/BlockOutputStreamStatistics.java

@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss.statistics;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Block output stream statistics.
+ */
+@InterfaceStability.Unstable
+public interface BlockOutputStreamStatistics {
+
+  /**
+   * A block has been allocated.
+   */
+  void blockAllocated();
+
+  /**
+   * A block has been released.
+   */
+  void blockReleased();
+
+  /**
+   * A disk block has been allocated.
+   */
+  void diskBlockAllocated();
+
+  /**
+   * A disk block has been released.
+   */
+  void diskBlockReleased();
+
+  /**
+   * Memory bytes have been allocated.
+   * @param size allocated size.
+   */
+  void bytesAllocated(long size);
+
+  /**
+   * Memory bytes have been released.
+   * @param size released size.
+   */
+  void bytesReleased(long size);
+
+  int getBlocksAllocated();
+
+  int getBlocksReleased();
+
+  int getDiskBlocksAllocated();
+
+  int getDiskBlocksReleased();
+
+  long getBytesAllocated();
+
+  long getBytesReleased();
+}
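
Because every allocate call has a matching release call, the getters pair off naturally in tests. A hypothetical JUnit assertion using the @VisibleForTesting accessor added to AliyunOSSFileSystem (the accessor is package-private, so the test would need to sit in org.apache.hadoop.fs.aliyun.oss):

```java
// After writing through `fs` and closing all output streams:
BlockOutputStreamStatistics stats = fs.getBlockOutputStreamStatistics();
assertEquals("blocks leaked",
    stats.getBlocksAllocated(), stats.getBlocksReleased());
assertEquals("buffer bytes leaked",
    stats.getBytesAllocated(), stats.getBytesReleased());
```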

+ 98 - 0
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/statistics/impl/OutputStreamStatistics.java

@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.aliyun.oss.statistics.impl;
+
+import org.apache.hadoop.fs.aliyun.oss.statistics.BlockOutputStreamStatistics;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Implementation of {@link BlockOutputStreamStatistics}.
+ */
+public class OutputStreamStatistics implements BlockOutputStreamStatistics {
+  private final AtomicInteger blocksAllocated = new AtomicInteger(0);
+  private final AtomicInteger blocksReleased = new AtomicInteger(0);
+
+  private final AtomicInteger diskBlocksAllocated = new AtomicInteger(0);
+  private final AtomicInteger diskBlocksReleased = new AtomicInteger(0);
+
+  private final AtomicLong bytesAllocated = new AtomicLong(0);
+  private final AtomicLong bytesReleased = new AtomicLong(0);
+
+  @Override
+  public void blockAllocated() {
+    blocksAllocated.incrementAndGet();
+  }
+
+  @Override
+  public void blockReleased() {
+    blocksReleased.incrementAndGet();
+  }
+
+  @Override
+  public void diskBlockAllocated() {
+    diskBlocksAllocated.incrementAndGet();
+  }
+
+  @Override
+  public void diskBlockReleased() {
+    diskBlocksReleased.incrementAndGet();
+  }
+
+  @Override
+  public int getBlocksAllocated() {
+    return blocksAllocated.get();
+  }
+
+  @Override
+  public int getBlocksReleased() {
+    return blocksReleased.get();
+  }
+
+  @Override
+  public int getDiskBlocksAllocated() {
+    return diskBlocksAllocated.get();
+  }
+
+  @Override
+  public int getDiskBlocksReleased() {
+    return diskBlocksReleased.get();
+  }
+
+  @Override
+  public void bytesAllocated(long size) {
+    bytesAllocated.getAndAdd(size);
+  }
+
+  @Override
+  public void bytesReleased(long size) {
+    bytesReleased.getAndAdd(size);
+  }
+
+  @Override
+  public long getBytesAllocated() {
+    return bytesAllocated.get();
+  }
+
+  @Override
+  public long getBytesReleased() {
+    return bytesReleased.get();
+  }
+}

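A quick smoke test of the implementation above, assuming both classes are on the classpath (run with -ea to enable assertions). The atomic counters are what make these callbacks safe to invoke from upload worker threads as well as the stream's own thread.

    import org.apache.hadoop.fs.aliyun.oss.statistics.BlockOutputStreamStatistics;
    import org.apache.hadoop.fs.aliyun.oss.statistics.impl.OutputStreamStatistics;

    public class StatisticsSmokeTest {
      public static void main(String[] args) {
        BlockOutputStreamStatistics stats = new OutputStreamStatistics();
        stats.blockAllocated();
        stats.bytesAllocated(4096);
        stats.blockReleased();
        stats.bytesReleased(4096);
        // After a balanced lifecycle the gauges must agree.
        assert stats.getBlocksAllocated() == stats.getBlocksReleased();
        assert stats.getBytesAllocated() == stats.getBytesReleased();
      }
    }
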
+ 29 - 0
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/statistics/impl/package-info.java

@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Statistics collection for the OSS connector: implementation.
+ * Not for use by anything outside the hadoop-aliyun source tree.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.fs.aliyun.oss.statistics.impl;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

+ 27 - 0
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/statistics/package-info.java

@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Statistics collection for the OSS connector: interfaces.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.fs.aliyun.oss.statistics;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;

+ 47 - 3
hadoop-tools/hadoop-aliyun/src/site/markdown/tools/hadoop-aliyun/index.md

@@ -164,7 +164,7 @@ please raise your issues with them.
 
     <property>
       <name>fs.oss.attempts.maximum</name>
-      <value>20</value>
+      <value>10</value>
       <description>How many times we should retry commands on transient errors.</description>
     </property>
 
@@ -239,7 +239,7 @@ please raise your issues with them.
 
     <property>
       <name>fs.oss.multipart.download.size</name>
-      <value>102400/value>
+      <value>524288</value>
       <description>Size in bytes in each request from Aliyun OSS.</description>
     </property>
 
@@ -251,9 +251,53 @@ please raise your issues with them.
       </description>
     </property>
 
+    <property>
+      <name>fs.oss.fast.upload.buffer</name>
+      <value>disk</value>
+      <description>
+        The buffering mechanism to use.
+        Values: disk, array, bytebuffer, array_disk, bytebuffer_disk.
+
+        "disk" will use the directories listed in fs.oss.buffer.dir as
+        the location(s) to save data prior to being uploaded.
+
+        "array" uses arrays in the JVM heap
+
+        "bytebuffer" uses off-heap memory within the JVM.
+
+        Both "array" and "bytebuffer" will consume memory in a single stream up to the number
+        of blocks set by:
+
+            fs.oss.multipart.upload.size * fs.oss.upload.active.blocks.
+
+        If using either of these mechanisms, keep this value low
+
+        The total number of threads performing work across all threads is set by
+        fs.oss.multipart.download.threads(Currently fast upload shares the same thread tool with download.
+        The thread pool size is specified in "fs.oss.multipart.download.threads"),
+        with fs.oss.max.total.tasks values setting the number of queued work items.
+
+        "array_disk" and "bytebuffer_disk" support fallback to disk.
+      </description>
+    </property>
+
+    <property>
+      <name>fs.oss.fast.upload.memory.limit</name>
+      <value>1073741824</value>
+      <description>
+        Memory limit of "array_disk" and "bytebuffer_disk" upload buffers.
+        Will fallback to disk buffers if used memory reaches the limit.
+      </description>
+    </property>
+
     <property>
       <name>fs.oss.buffer.dir</name>
-      <description>Comma separated list of directories to buffer OSS data before uploading to Aliyun OSS</description>
+      <value>${env.LOCAL_DIRS:-${hadoop.tmp.dir}}/oss</value>
+      <description>Comma separated list of directories to buffer
+        OSS data before uploading to Aliyun OSS.
+        On YARN applications the container's local directories are used
+        as the default; otherwise the value falls back to hadoop.tmp.dir.
+      </description>
     </property>
 
     <property>

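For readers wiring this up programmatically, a small sketch (not part of the patch) of setting the new options through a Hadoop Configuration; the property names come from the documentation above, and the chosen values are illustrative only.

    import org.apache.hadoop.conf.Configuration;

    public class OssBufferConfigExample {
      public static Configuration configure() {
        Configuration conf = new Configuration();
        // Buffer in heap byte arrays first, spilling to disk past the limit.
        conf.set("fs.oss.fast.upload.buffer", "array_disk");
        // Allow at most 512 MB of buffered upload data in memory.
        conf.setLong("fs.oss.fast.upload.memory.limit", 512L * 1024 * 1024);
        // Directories used for spilled blocks before upload.
        conf.set("fs.oss.buffer.dir", "/tmp/oss");
        return conf;
      }
    }
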
+ 180 - 13
hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java

@@ -22,6 +22,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.aliyun.oss.OSSDataBlocks.ByteBufferBlockFactory;
+import org.apache.hadoop.fs.aliyun.oss.statistics.BlockOutputStreamStatistics;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
@@ -37,12 +39,19 @@ import java.util.ArrayList;
 import java.util.LinkedHashSet;
 
 import static org.apache.hadoop.fs.aliyun.oss.Constants.BUFFER_DIR_KEY;
+import static org.apache.hadoop.fs.aliyun.oss.Constants.FAST_UPLOAD_BUFFER;
+import static org.apache.hadoop.fs.aliyun.oss.Constants.FAST_UPLOAD_BUFFER_ARRAY_DISK;
+import static org.apache.hadoop.fs.aliyun.oss.Constants.FAST_UPLOAD_BUFFER_DISK;
+import static org.apache.hadoop.fs.aliyun.oss.Constants.FAST_UPLOAD_BUFFER_MEMORY_LIMIT;
+import static org.apache.hadoop.fs.aliyun.oss.Constants.FAST_UPLOAD_BYTEBUFFER;
+import static org.apache.hadoop.fs.aliyun.oss.Constants.FAST_UPLOAD_BYTEBUFFER_DISK;
 import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT;
 import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_KEY;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -54,6 +63,7 @@ public class TestAliyunOSSBlockOutputStream {
   private static final int PART_SIZE = 1024 * 1024;
   private static String testRootPath =
       AliyunOSSTestUtils.generateUniqueTestPath();
+  private static final long MEMORY_LIMIT = 10 * 1024 * 1024;
 
   @Rule
   public Timeout testTimeout = new Timeout(30 * 60 * 1000);
@@ -65,6 +75,7 @@ public class TestAliyunOSSBlockOutputStream {
     conf.setInt(IO_CHUNK_BUFFER_SIZE,
         conf.getInt(MULTIPART_UPLOAD_PART_SIZE_KEY, 0));
     conf.setInt(Constants.UPLOAD_ACTIVE_BLOCKS_KEY, 20);
+    conf.setLong(FAST_UPLOAD_BUFFER_MEMORY_LIMIT, MEMORY_LIMIT);
     fs = AliyunOSSTestUtils.createTestFileSystem(conf);
   }
 
@@ -82,7 +93,7 @@ public class TestAliyunOSSBlockOutputStream {
   @Test
   public void testZeroByteUpload() throws IOException {
     ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 0);
-    bufferDirShouldEmpty();
+    bufferShouldReleased(true);
   }
 
   @Test
@@ -106,20 +117,21 @@ public class TestAliyunOSSBlockOutputStream {
     assertEquals(size - 1, statistics.getBytesRead());
     assertEquals(3, statistics.getWriteOps());
     assertEquals(size - 1, statistics.getBytesWritten());
+    bufferShouldReleased();
 
     ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size);
     assertEquals(14, statistics.getReadOps());
     assertEquals(2 * size - 1, statistics.getBytesRead());
     assertEquals(6, statistics.getWriteOps());
     assertEquals(2 * size - 1, statistics.getBytesWritten());
+    bufferShouldReleased();
 
     ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size + 1);
-
     assertEquals(22, statistics.getReadOps());
     assertEquals(3 * size, statistics.getBytesRead());
     assertEquals(10, statistics.getWriteOps());
     assertEquals(3 * size, statistics.getBytesWritten());
-    bufferDirShouldEmpty();
+    bufferShouldReleased();
   }
 
   @Test
@@ -133,19 +145,21 @@ public class TestAliyunOSSBlockOutputStream {
     assertEquals(size - 1, statistics.getBytesRead());
     assertEquals(8, statistics.getWriteOps());
     assertEquals(size - 1, statistics.getBytesWritten());
+    bufferShouldReleased();
 
     ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size);
     assertEquals(34, statistics.getReadOps());
     assertEquals(2 * size - 1, statistics.getBytesRead());
     assertEquals(16, statistics.getWriteOps());
     assertEquals(2 * size - 1, statistics.getBytesWritten());
+    bufferShouldReleased();
 
     ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size + 1);
     assertEquals(52, statistics.getReadOps());
     assertEquals(3 * size, statistics.getBytesRead());
     assertEquals(25, statistics.getWriteOps());
     assertEquals(3 * size, statistics.getBytesWritten());
-    bufferDirShouldEmpty();
+    bufferShouldReleased();
   }
 
   @Test
@@ -159,16 +173,18 @@ public class TestAliyunOSSBlockOutputStream {
     assertEquals(size, statistics.getBytesRead());
     assertEquals(52, statistics.getWriteOps());
     assertEquals(size, statistics.getBytesWritten());
-    bufferDirShouldEmpty();
+    bufferShouldReleased();
   }
 
   @Test
   public void testHugeUpload() throws IOException {
     ContractTestUtils.createAndVerifyFile(fs, getTestPath(), PART_SIZE - 1);
+    bufferShouldReleased();
     ContractTestUtils.createAndVerifyFile(fs, getTestPath(), PART_SIZE);
+    bufferShouldReleased();
     ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
         MULTIPART_UPLOAD_PART_SIZE_DEFAULT + 1);
-    bufferDirShouldEmpty();
+    bufferShouldReleased();
   }
 
   @Test
@@ -199,15 +215,43 @@ public class TestAliyunOSSBlockOutputStream {
   public void testSmallUpload() throws IOException {
     long size = fs.getConf().getInt(MULTIPART_UPLOAD_PART_SIZE_KEY, 1024);
     ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size - 1);
-    bufferDirShouldEmpty();
+    bufferShouldReleased();
   }
 
-  private void bufferDirShouldEmpty() throws IOException {
-    Path bufferPath = new Path(fs.getConf().get(BUFFER_DIR_KEY));
-    FileStatus[] files = bufferPath.getFileSystem(
-        fs.getConf()).listStatus(bufferPath);
-    // Temporary file should be deleted
-    assertEquals(0, files.length);
+  private void bufferShouldReleased() throws IOException {
+    bufferShouldReleased(false);
+  }
+
+  private void bufferShouldReleased(boolean zeroSizeFile) throws IOException {
+    String bufferDir = fs.getConf().get(BUFFER_DIR_KEY);
+    String bufferType = fs.getConf().get(FAST_UPLOAD_BUFFER);
+    if (bufferType.equals(FAST_UPLOAD_BUFFER_DISK)) {
+      assertNotNull(bufferDir);
+      Path bufferPath = new Path(bufferDir);
+      FileStatus[] files = bufferPath.getFileSystem(
+          fs.getConf()).listStatus(bufferPath);
+      // Temporary file should be deleted
+      assertEquals(0, files.length);
+    } else if (bufferType.equals(FAST_UPLOAD_BYTEBUFFER)) {
+      OSSDataBlocks.ByteBufferBlockFactory
+          blockFactory = (OSSDataBlocks.ByteBufferBlockFactory)
+              ((AliyunOSSFileSystem)fs).getBlockFactory();
+      assertEquals("outstanding buffers in " + blockFactory,
+          0, blockFactory.getOutstandingBufferCount());
+    }
+    BlockOutputStreamStatistics statistics =
+        ((AliyunOSSFileSystem)fs).getBlockOutputStreamStatistics();
+    assertEquals(statistics.getBlocksAllocated(),
+        statistics.getBlocksReleased());
+    if (zeroSizeFile) {
+      assertEquals(0, statistics.getBlocksAllocated());
+    } else {
+      assertTrue(statistics.getBlocksAllocated() >= 1);
+    }
+    assertEquals(statistics.getBytesReleased(),
+        statistics.getBytesAllocated());
   }
 
   @Test
@@ -249,4 +293,127 @@ public class TestAliyunOSSBlockOutputStream {
     assertNotEquals("round robin not working",
         tmp1.getParent(), tmp2.getParent());
   }
+
+  @Test
+  public void testByteBufferIO() throws IOException {
+    try (OSSDataBlocks.ByteBufferBlockFactory factory =
+         new OSSDataBlocks.ByteBufferBlockFactory((AliyunOSSFileSystem)fs)) {
+      int limit = 128;
+      OSSDataBlocks.ByteBufferBlockFactory.ByteBufferBlock block
+          = factory.create(1, limit, null);
+      assertEquals("outstanding buffers in " + factory,
+          1, factory.getOutstandingBufferCount());
+
+      byte[] buffer = ContractTestUtils.toAsciiByteArray("test data");
+      int bufferLen = buffer.length;
+      block.write(buffer, 0, bufferLen);
+      assertEquals(bufferLen, block.dataSize());
+      assertEquals("capacity in " + block,
+          limit - bufferLen, block.remainingCapacity());
+      assertTrue("hasCapacity(64) in " + block, block.hasCapacity(64));
+      assertTrue("No capacity in " + block,
+          block.hasCapacity(limit - bufferLen));
+
+      // now start the write
+      OSSDataBlocks.BlockUploadData blockUploadData = block.startUpload();
+      ByteBufferBlockFactory.ByteBufferBlock.ByteBufferInputStream
+          stream =
+          (ByteBufferBlockFactory.ByteBufferBlock.ByteBufferInputStream)
+              blockUploadData.getUploadStream();
+      assertTrue("Mark not supported in " + stream, stream.markSupported());
+      assertTrue("!hasRemaining() in " + stream, stream.hasRemaining());
+
+      int expected = bufferLen;
+      assertEquals("wrong available() in " + stream,
+          expected, stream.available());
+
+      assertEquals('t', stream.read());
+      stream.mark(limit);
+      expected--;
+      assertEquals("wrong available() in " + stream,
+          expected, stream.available());
+
+      // read into a byte array with an offset
+      int offset = 5;
+      byte[] in = new byte[limit];
+      assertEquals(2, stream.read(in, offset, 2));
+      assertEquals('e', in[offset]);
+      assertEquals('s', in[offset + 1]);
+      expected -= 2;
+      assertEquals("wrong available() in " + stream,
+          expected, stream.available());
+
+      // read to end
+      byte[] remainder = new byte[limit];
+      int c;
+      int index = 0;
+      while ((c = stream.read()) >= 0) {
+        remainder[index++] = (byte) c;
+      }
+      assertEquals(expected, index);
+      assertEquals('a', remainder[--index]);
+
+      assertEquals("wrong available() in " + stream,
+          0, stream.available());
+      assertTrue("hasRemaining() in " + stream, !stream.hasRemaining());
+
+      // go the mark point
+      stream.reset();
+      assertEquals('e', stream.read());
+
+      // when the stream is closed, the data should be returned
+      stream.close();
+      assertEquals("outstanding buffers in " + factory,
+          1, factory.getOutstandingBufferCount());
+      block.close();
+      assertEquals("outstanding buffers in " + factory,
+          0, factory.getOutstandingBufferCount());
+      stream.close();
+      assertEquals("outstanding buffers in " + factory,
+          0, factory.getOutstandingBufferCount());
+    }
+  }
+
+  @Test
+  public void testFastUploadArrayDisk() throws IOException {
+    testFastUploadFallback(FAST_UPLOAD_BUFFER_ARRAY_DISK);
+  }
+
+  @Test
+  public void testFastUploadByteBufferDisk() throws IOException {
+    testFastUploadFallback(FAST_UPLOAD_BYTEBUFFER_DISK);
+  }
+
+  private void testFastUploadFallback(String name) throws IOException {
+    Configuration conf = fs.getConf();
+    fs.close();
+
+    conf.set(FAST_UPLOAD_BUFFER, name);
+
+    fs = AliyunOSSTestUtils.createTestFileSystem(conf);
+    long size = 5 * MEMORY_LIMIT;
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), size);
+    OSSDataBlocks.MemoryBlockFactory
+        blockFactory = ((OSSDataBlocks.MemoryAndDiskBlockFactory)
+        ((AliyunOSSFileSystem)fs).getBlockFactory()).getMemoryFactory();
+    assertEquals(0, blockFactory.getMemoryUsed());
+
+    Path bufferPath = new Path(fs.getConf().get(BUFFER_DIR_KEY));
+    FileStatus[] files = bufferPath.getFileSystem(
+        fs.getConf()).listStatus(bufferPath);
+    // Temporary file should be deleted
+    assertEquals(0, files.length);
+
+    BlockOutputStreamStatistics statistics =
+        ((AliyunOSSFileSystem)fs).getBlockOutputStreamStatistics();
+    assertEquals(statistics.getBlocksAllocated(),
+        statistics.getBlocksReleased());
+    assertTrue(statistics.getBlocksAllocated() > 1);
+    assertEquals(statistics.getBytesReleased(),
+        statistics.getBytesAllocated());
+    assertTrue(statistics.getBytesAllocated() >= MEMORY_LIMIT);
+    assertTrue(statistics.getDiskBlocksAllocated() > 0);
+    assertEquals(statistics.getDiskBlocksAllocated(),
+        statistics.getDiskBlocksReleased());
+  }
 }