HADOOP-11183. Memory-based S3AOutputstream. (Thomas Demoor via stevel)

Steve Loughran 10 years ago
parent
commit 15b7076ad5

+ 2 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -667,6 +667,8 @@ Release 2.7.0 - UNRELEASED
     HADOOP-11620. Add support for load balancing across a group of KMS for HA.
     (Arun Suresh via wang)
 
+    HADOOP-11183. Memory-based S3AOutputstream. (Thomas Demoor via stevel)
+
   BUG FIXES
 
     HADOOP-11512. Use getTrimmedStrings when reading serialization keys

+ 18 - 2
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -763,13 +763,13 @@ for ldap providers in the same way as above does.
 <property>
   <name>fs.s3a.connection.establish.timeout</name>
   <value>5000</value>
-  <description>Socket connection setup timeout in seconds.</description>
+  <description>Socket connection setup timeout in milliseconds.</description>
 </property>
 
 <property>
   <name>fs.s3a.connection.timeout</name>
   <value>50000</value>
-  <description>Socket connection timeout in seconds.</description>
+  <description>Socket connection timeout in milliseconds.</description>
 </property>
 
 <property>
@@ -845,6 +845,22 @@ for ldap providers in the same way as above does.
     uploads to.</description>
 </property>
 
+<property>
+  <name>fs.s3a.fast.upload</name>
+  <value>false</value>
+  <description>Upload directly from memory instead of buffering to
+    disk first. Memory usage and parallelism can be controlled: up to
+    fs.s3a.multipart.size memory is consumed for each part upload that is
+    actively uploading (fs.s3a.threads.max) or queued (fs.s3a.max.total.tasks).</description>
+</property>
+
+<property>
+  <name>fs.s3a.fast.buffer.size</name>
+  <value>1048576</value>
+  <description>Size of initial memory buffer in bytes allocated for an
+    upload. No effect if fs.s3a.fast.upload is false.</description>
+</property>
+
 <property>
   <name>fs.s3a.impl</name>
   <value>org.apache.hadoop.fs.s3a.S3AFileSystem</value>

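The unit fix above matters because s3a hands these values to the AWS SDK's client configuration, whose timeouts are expressed in milliseconds. A minimal sketch of that wiring, assuming the SDK's `ClientConfiguration` API (illustrative, not the exact S3AFileSystem code; the class and method names around it are placeholders):

    import com.amazonaws.ClientConfiguration;
    import org.apache.hadoop.conf.Configuration;

    public class TimeoutWiringSketch {
      static ClientConfiguration awsClientConfig(Configuration conf) {
        ClientConfiguration awsConf = new ClientConfiguration();
        // fs.s3a.connection.establish.timeout -> TCP connect timeout, in ms
        awsConf.setConnectionTimeout(
            conf.getInt("fs.s3a.connection.establish.timeout", 5000));
        // fs.s3a.connection.timeout -> socket read timeout, in ms
        awsConf.setSocketTimeout(
            conf.getInt("fs.s3a.connection.timeout", 50000));
        return awsConf;
      }
    }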
+ 8 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

@@ -83,6 +83,14 @@ public class Constants {
   // comma separated list of directories
   public static final String BUFFER_DIR = "fs.s3a.buffer.dir";
 
+  // should we upload directly from memory rather than using a file buffer
+  public static final String FAST_UPLOAD = "fs.s3a.fast.upload";
+  public static final boolean DEFAULT_FAST_UPLOAD = false;
+
+  //initial size of memory buffer for a fast upload
+  public static final String FAST_BUFFER_SIZE = "fs.s3a.fast.buffer.size";
+  public static final int DEFAULT_FAST_BUFFER_SIZE = 1048576; //1MB
+
   // private | public-read | public-read-write | authenticated-read |
   // log-delivery-write | bucket-owner-read | bucket-owner-full-control
   public static final String CANNED_ACL = "fs.s3a.acl.default";

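These keys are read through the standard Hadoop Configuration API; the new test at the bottom of this commit enables them the same way. A small sketch of turning the feature on programmatically, equivalent to setting the keys in core-site.xml (the 5 MB buffer size is an arbitrary example value):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.s3a.Constants;

    public class FastUploadConfigSketch {
      public static Configuration enableFastUpload() {
        Configuration conf = new Configuration();
        conf.setBoolean(Constants.FAST_UPLOAD, true);             // fs.s3a.fast.upload
        conf.setInt(Constants.FAST_BUFFER_SIZE, 5 * 1024 * 1024); // fs.s3a.fast.buffer.size
        return conf;
      }
    }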
+ 413 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java

@@ -0,0 +1,413 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.event.ProgressEvent;
+import com.amazonaws.event.ProgressListener;
+import com.amazonaws.services.s3.AmazonS3Client;
+import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
+import com.amazonaws.services.s3.model.CannedAccessControlList;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+
+
+/**
+ * Upload files/parts asap directly from a memory buffer (instead of buffering
+ * to a file).
+ * <p/>
+ * Uploads are managed low-level rather than through the AWS TransferManager.
+ * This allows for uploading each part of a multi-part upload as soon as
+ * the bytes are in memory, rather than waiting until the file is closed.
+ * <p/>
+ * Unstable: statistics and error handling might evolve
+ */
+@InterfaceStability.Unstable
+public class S3AFastOutputStream extends OutputStream {
+
+  private static final Logger LOG = S3AFileSystem.LOG;
+  private final String key;
+  private final String bucket;
+  private final AmazonS3Client client;
+  private final int partSize;
+  private final int multiPartThreshold;
+  private final S3AFileSystem fs;
+  private final CannedAccessControlList cannedACL;
+  private final FileSystem.Statistics statistics;
+  private final String serverSideEncryptionAlgorithm;
+  private final ProgressListener progressListener;
+  private final ListeningExecutorService executorService;
+  private MultiPartUpload multiPartUpload;
+  private boolean closed;
+  private ByteArrayOutputStream buffer;
+  private int bufferLimit;
+
+
+  /**
+   * Creates a fast OutputStream that uploads to S3 from memory.
+   * For MultiPartUploads, as soon as sufficient bytes have been written to
+   * the stream a part is uploaded immediately (by using the low-level
+   * multi-part upload API on the AmazonS3Client).
+   *
+   * @param client AmazonS3Client used for S3 calls
+   * @param fs S3AFilesystem
+   * @param bucket S3 bucket name
+   * @param key S3 key name
+   * @param progress report progress in order to prevent timeouts
+   * @param statistics track FileSystem.Statistics on the performed operations
+   * @param cannedACL used CannedAccessControlList
+   * @param serverSideEncryptionAlgorithm algorithm for server side encryption
+   * @param partSize size of a single part in a multi-part upload (except
+   * last part)
+   * @param multiPartThreshold files at least this size use multi-part upload
+   * @throws IOException
+   */
+  public S3AFastOutputStream(AmazonS3Client client, S3AFileSystem fs,
+      String bucket, String key, Progressable progress,
+      FileSystem.Statistics statistics, CannedAccessControlList cannedACL,
+      String serverSideEncryptionAlgorithm, long partSize,
+      long multiPartThreshold, ThreadPoolExecutor threadPoolExecutor)
+      throws IOException {
+    this.bucket = bucket;
+    this.key = key;
+    this.client = client;
+    this.fs = fs;
+    this.cannedACL = cannedACL;
+    this.statistics = statistics;
+    this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
+    //Ensure limit as ByteArrayOutputStream size cannot exceed Integer.MAX_VALUE
+    if (partSize > Integer.MAX_VALUE) {
+      this.partSize = Integer.MAX_VALUE;
+      LOG.warn("s3a: MULTIPART_SIZE capped to ~2.14GB (maximum allowed size " +
+          "when using 'FAST_UPLOAD = true')");
+    } else {
+      this.partSize = (int) partSize;
+    }
+    if (multiPartThreshold > Integer.MAX_VALUE) {
+      this.multiPartThreshold = Integer.MAX_VALUE;
+      LOG.warn("s3a: MIN_MULTIPART_THRESHOLD capped to ~2.14GB (maximum " +
+          "allowed size when using 'FAST_UPLOAD = true')");
+    } else {
+      this.multiPartThreshold = (int) multiPartThreshold;
+    }
+    this.bufferLimit = this.multiPartThreshold;
+    this.closed = false;
+    int initialBufferSize = this.fs.getConf()
+        .getInt(Constants.FAST_BUFFER_SIZE, Constants.DEFAULT_FAST_BUFFER_SIZE);
+    if (initialBufferSize < 0) {
+      LOG.warn("s3a: FAST_BUFFER_SIZE should be a positive number. Using " +
+          "default value");
+      initialBufferSize = Constants.DEFAULT_FAST_BUFFER_SIZE;
+    } else if (initialBufferSize > this.bufferLimit) {
+      LOG.warn("s3a: automatically adjusting FAST_BUFFER_SIZE to not " +
+          "exceed MIN_MULTIPART_THRESHOLD");
+      initialBufferSize = this.bufferLimit;
+    }
+    this.buffer = new ByteArrayOutputStream(initialBufferSize);
+    this.executorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
+    this.multiPartUpload = null;
+    this.progressListener = new ProgressableListener(progress);
+    if (LOG.isDebugEnabled()){
+      LOG.debug("Initialized S3AFastOutputStream for bucket '{}' key '{}'",
+          bucket, key);
+    }
+  }
+
+  /**
+   * Writes a byte to the memory buffer. If this causes the buffer to reach
+   * its limit, the actual upload is submitted to the threadpool.
+   * @param b the int whose lowest byte is written
+   * @throws IOException
+   */
+  @Override
+  public synchronized void write(int b) throws IOException {
+    buffer.write(b);
+    if (buffer.size() == bufferLimit) {
+      uploadBuffer();
+    }
+  }
+
+  /**
+   * Writes a range of bytes to the memory buffer. If this causes the
+   * buffer to reach its limit, the actual upload is submitted to the
+   * threadpool and the remainder of the array is written to memory
+   * (recursively).
+   * @param b byte array containing the bytes to be written
+   * @param off offset in array where to start
+   * @param len number of bytes to be written
+   * @throws IOException
+   */
+  @Override
+  public synchronized void write(byte b[], int off, int len)
+      throws IOException {
+    if (b == null) {
+      throw new NullPointerException();
+    } else if ((off < 0) || (off > b.length) || (len < 0) ||
+        ((off + len) > b.length) || ((off + len) < 0)) {
+      throw new IndexOutOfBoundsException();
+    } else if (len == 0) {
+      return;
+    }
+    if (buffer.size() + len < bufferLimit) {
+      buffer.write(b, off, len);
+    } else {
+      int firstPart = bufferLimit - buffer.size();
+      buffer.write(b, off, firstPart);
+      uploadBuffer();
+      this.write(b, off + firstPart, len - firstPart);
+    }
+  }
+
+  private synchronized void uploadBuffer() throws IOException {
+    if (multiPartUpload == null) {
+      multiPartUpload = initiateMultiPartUpload();
+       /* Upload the existing buffer if it exceeds partSize. This possibly
+       requires multiple parts! */
+      final byte[] allBytes = buffer.toByteArray();
+      buffer = null; //earlier gc?
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Total length of initial buffer: {}", allBytes.length);
+      }
+      int processedPos = 0;
+      while ((multiPartThreshold - processedPos) >= partSize) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Initial buffer: processing from byte {} to byte {}",
+              processedPos, (processedPos + partSize - 1));
+        }
+        multiPartUpload.uploadPartAsync(new ByteArrayInputStream(allBytes,
+            processedPos, partSize), partSize);
+        processedPos += partSize;
+      }
+      //resize and reset stream
+      bufferLimit = partSize;
+      buffer = new ByteArrayOutputStream(bufferLimit);
+      buffer.write(allBytes, processedPos, multiPartThreshold - processedPos);
+    } else {
+      //upload next part
+      multiPartUpload.uploadPartAsync(new ByteArrayInputStream(buffer
+          .toByteArray()), partSize);
+      buffer.reset();
+    }
+  }
+
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (closed) {
+      return;
+    }
+    closed = true;
+    try {
+      if (multiPartUpload == null) {
+        putObject();
+      } else {
+        if (buffer.size() > 0) {
+          //send last part
+          multiPartUpload.uploadPartAsync(new ByteArrayInputStream(buffer
+              .toByteArray()), buffer.size());
+        }
+        final List<PartETag> partETags = multiPartUpload
+            .waitForAllPartUploads();
+        multiPartUpload.complete(partETags);
+      }
+      statistics.incrementWriteOps(1);
+      // This will delete unnecessary fake parent directories
+      fs.finishedWrite(key);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Upload complete for bucket '{}' key '{}'", bucket, key);
+      }
+    } finally {
+      buffer = null;
+      super.close();
+    }
+  }
+
+  private ObjectMetadata createDefaultMetadata() {
+    ObjectMetadata om = new ObjectMetadata();
+    if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
+      om.setServerSideEncryption(serverSideEncryptionAlgorithm);
+    }
+    return om;
+  }
+
+  private MultiPartUpload initiateMultiPartUpload() throws IOException {
+    final ObjectMetadata om = createDefaultMetadata();
+    final InitiateMultipartUploadRequest initiateMPURequest =
+        new InitiateMultipartUploadRequest(bucket, key, om);
+    initiateMPURequest.setCannedACL(cannedACL);
+    try {
+      return new MultiPartUpload(
+          client.initiateMultipartUpload(initiateMPURequest).getUploadId());
+    } catch (AmazonServiceException ase) {
+      throw new IOException("Unable to initiate MultiPartUpload (server side)" +
+          ": " + ase, ase);
+    } catch (AmazonClientException ace) {
+      throw new IOException("Unable to initiate MultiPartUpload (client side)" +
+          ": " + ace, ace);
+    }
+  }
+
+  private void putObject() throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Executing regular upload for bucket '{}' key '{}'", bucket,
+          key);
+    }
+    final ObjectMetadata om = createDefaultMetadata();
+    om.setContentLength(buffer.size());
+    final PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
+        new ByteArrayInputStream(buffer.toByteArray()), om);
+    putObjectRequest.setCannedAcl(cannedACL);
+    putObjectRequest.setGeneralProgressListener(progressListener);
+    ListenableFuture<PutObjectResult> putObjectResult =
+        executorService.submit(new Callable<PutObjectResult>() {
+          @Override
+          public PutObjectResult call() throws Exception {
+            return client.putObject(putObjectRequest);
+          }
+        });
+    //wait for completion
+    try {
+      putObjectResult.get();
+    } catch (InterruptedException ie) {
+      LOG.warn("Interrupted object upload:" + ie, ie);
+      Thread.currentThread().interrupt();
+    } catch (ExecutionException ee) {
+      throw new IOException("Regular upload failed", ee.getCause());
+    }
+  }
+
+  private class MultiPartUpload {
+    private final String uploadId;
+    private final List<ListenableFuture<PartETag>> partETagsFutures;
+
+    public MultiPartUpload(String uploadId) {
+      this.uploadId = uploadId;
+      this.partETagsFutures = new ArrayList<ListenableFuture<PartETag>>();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Initiated multi-part upload for bucket '{}' key '{}' with " +
+            "id '{}'", bucket, key, uploadId);
+      }
+    }
+
+    public void uploadPartAsync(ByteArrayInputStream inputStream,
+        int partSize) {
+      final int currentPartNumber = partETagsFutures.size() + 1;
+      final UploadPartRequest request =
+          new UploadPartRequest().withBucketName(bucket).withKey(key)
+              .withUploadId(uploadId).withInputStream(inputStream)
+              .withPartNumber(currentPartNumber).withPartSize(partSize);
+      request.setGeneralProgressListener(progressListener);
+      ListenableFuture<PartETag> partETagFuture =
+          executorService.submit(new Callable<PartETag>() {
+            @Override
+            public PartETag call() throws Exception {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Uploading part {} for id '{}'", currentPartNumber,
+                    uploadId);
+              }
+              return client.uploadPart(request).getPartETag();
+            }
+          });
+      partETagsFutures.add(partETagFuture);
+    }
+
+    public List<PartETag> waitForAllPartUploads() throws IOException {
+      try {
+        return Futures.allAsList(partETagsFutures).get();
+      } catch (InterruptedException ie) {
+        LOG.warn("Interrupted partUpload:" + ie, ie);
+        Thread.currentThread().interrupt();
+      } catch (ExecutionException ee) {
+        //there is no way of recovering so abort
+        //cancel all partUploads
+        for (ListenableFuture<PartETag> future : partETagsFutures) {
+          future.cancel(true);
+        }
+        //abort multipartupload
+        this.abort();
+        throw new IOException("Part upload failed in multi-part upload with " +
+            "id '" +uploadId + "':" + ee, ee);
+      }
+      //should not happen?
+      return null;
+    }
+
+    public void complete(List<PartETag> partETags) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Completing multi-part upload for key '{}', id '{}'", key,
+            uploadId);
+      }
+      final CompleteMultipartUploadRequest completeRequest =
+          new CompleteMultipartUploadRequest(bucket, key, uploadId, partETags);
+      client.completeMultipartUpload(completeRequest);
+
+    }
+
+    public void abort() {
+      LOG.warn("Aborting multi-part upload with id '{}'", uploadId);
+      try {
+        client.abortMultipartUpload(new AbortMultipartUploadRequest(bucket,
+            key, uploadId));
+      } catch (Exception e2) {
+        LOG.warn("Unable to abort multipart upload, you may need to purge  " +
+            "uploaded parts: " + e2, e2);
+      }
+    }
+  }
+
+  private static class ProgressableListener implements ProgressListener {
+    private final Progressable progress;
+
+    public ProgressableListener(Progressable progress) {
+      this.progress = progress;
+    }
+
+    public void progressChanged(ProgressEvent progressEvent) {
+      if (progress != null) {
+        progress.progress();
+      }
+    }
+  }
+}
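To make the initial-buffer slicing in `uploadBuffer()` concrete, here is a standalone walk-through of just the index arithmetic, with assumed sizes of a 16 MB multipart threshold and 5 MB parts (example values, not the defaults):

    public class UploadBufferSliceSketch {
      public static void main(String[] args) {
        final int mb = 1024 * 1024;
        final int partSize = 5 * mb;
        final int multiPartThreshold = 16 * mb; // buffer size when the first upload triggers
        int processedPos = 0;
        while ((multiPartThreshold - processedPos) >= partSize) {
          // each full 5 MB chunk becomes its own asynchronous part upload
          System.out.printf("part %d: bytes [%d, %d)%n",
              processedPos / partSize + 1, processedPos, processedPos + partSize);
          processedPos += partSize;
        }
        // the remainder is copied into a fresh buffer sized at partSize
        // and becomes the start of the next part
        System.out.printf("tail kept in buffer: %d bytes%n",
            multiPartThreshold - processedPos);
      }
    }

With these numbers the loop submits parts 1-3 covering bytes [0, 15 MB) and carries a 1 MB tail forward; every subsequent part is then exactly `partSize` bytes.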

+ 15 - 9
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -88,7 +88,8 @@ public class S3AFileSystem extends FileSystem {
   private int maxKeys;
   private long partSize;
   private TransferManager transfers;
-  private int partSizeThreshold;
+  private ThreadPoolExecutor threadPoolExecutor;
+  private int multiPartThreshold;
   public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
   private CannedAccessControlList cannedACL;
   private String serverSideEncryptionAlgorithm;
@@ -237,7 +238,7 @@ public class S3AFileSystem extends FileSystem {
 
     maxKeys = conf.getInt(MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS);
     partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
-    partSizeThreshold = conf.getInt(MIN_MULTIPART_THRESHOLD, 
+    multiPartThreshold = conf.getInt(MIN_MULTIPART_THRESHOLD,
       DEFAULT_MIN_MULTIPART_THRESHOLD);
 
     if (partSize < 5 * 1024 * 1024) {
@@ -245,9 +246,9 @@ public class S3AFileSystem extends FileSystem {
       partSize = 5 * 1024 * 1024;
     }
 
-    if (partSizeThreshold < 5 * 1024 * 1024) {
+    if (multiPartThreshold < 5 * 1024 * 1024) {
       LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB");
-      partSizeThreshold = 5 * 1024 * 1024;
+      multiPartThreshold = 5 * 1024 * 1024;
     }
 
     int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
@@ -262,20 +263,20 @@ public class S3AFileSystem extends FileSystem {
     LinkedBlockingQueue<Runnable> workQueue =
       new LinkedBlockingQueue<>(maxThreads *
         conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS));
-    ThreadPoolExecutor tpe = new ThreadPoolExecutor(
+    threadPoolExecutor = new ThreadPoolExecutor(
         coreThreads,
         maxThreads,
         keepAliveTime,
         TimeUnit.SECONDS,
         workQueue,
         newDaemonThreadFactory("s3a-transfer-shared-"));
-    tpe.allowCoreThreadTimeOut(true);
+    threadPoolExecutor.allowCoreThreadTimeOut(true);
 
     TransferManagerConfiguration transferConfiguration = new TransferManagerConfiguration();
     transferConfiguration.setMinimumUploadPartSize(partSize);
-    transferConfiguration.setMultipartUploadThreshold(partSizeThreshold);
+    transferConfiguration.setMultipartUploadThreshold(multiPartThreshold);
 
-    transfers = new TransferManager(s3, tpe);
+    transfers = new TransferManager(s3, threadPoolExecutor);
     transfers.setConfiguration(transferConfiguration);
 
     String cannedACLName = conf.get(CANNED_ACL, DEFAULT_CANNED_ACL);
@@ -391,7 +392,12 @@ public class S3AFileSystem extends FileSystem {
     if (!overwrite && exists(f)) {
       throw new FileAlreadyExistsException(f + " already exists");
     }
-
+    if (getConf().getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD)) {
+      return new FSDataOutputStream(new S3AFastOutputStream(s3, this, bucket,
+          key, progress, statistics, cannedACL,
+          serverSideEncryptionAlgorithm, partSize, (long)multiPartThreshold,
+          threadPoolExecutor), statistics);
+    }
     // We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file
     return new FSDataOutputStream(new S3AOutputStream(getConf(), transfers, this,
       bucket, key, progress, cannedACL, statistics, 

+ 40 - 6
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md

@@ -213,13 +213,13 @@ If you do any of these: change your credentials immediately!
     <property>
       <name>fs.s3a.connection.establish.timeout</name>
       <value>5000</value>
-      <description>Socket connection setup timeout in seconds.</description>
+      <description>Socket connection setup timeout in milliseconds.</description>
     </property>
 
     <property>
       <name>fs.s3a.connection.timeout</name>
       <value>50000</value>
-      <description>Socket connection timeout in seconds.</description>
+      <description>Socket connection timeout in milliseconds.</description>
     </property>
 
     <property>
@@ -292,7 +292,7 @@ If you do any of these: change your credentials immediately!
       <name>fs.s3a.buffer.dir</name>
       <value>${hadoop.tmp.dir}/s3a</value>
       <description>Comma separated list of directories that will be used to buffer file
-        uploads to.</description>
+        uploads to. No effect if fs.s3a.fast.upload is true.</description>
     </property>
 
     <property>
@@ -301,6 +301,40 @@ If you do any of these: change your credentials immediately!
       <description>The implementation class of the S3A Filesystem</description>
     </property>
 
+### S3AFastOutputStream
+ **Warning: NEW in hadoop 2.7. UNSTABLE, EXPERIMENTAL: use at own risk**
+
+    <property>
+      <name>fs.s3a.fast.upload</name>
+      <value>false</value>
+      <description>Upload directly from memory instead of buffering to
+      disk first. Memory usage and parallelism can be controlled: up to
+      fs.s3a.multipart.size memory is consumed for each part upload that is
+      actively uploading (fs.s3a.threads.max) or queued (fs.s3a.max.total.tasks).</description>
+    </property>
+
+    <property>
+      <name>fs.s3a.fast.buffer.size</name>
+      <value>1048576</value>
+      <description>Size (in bytes) of initial memory buffer allocated for an
+      upload. No effect if fs.s3a.fast.upload is false.</description>
+    </property>
+
+Writes are buffered in memory instead of to a file on local disk. This
+removes the throughput bottleneck of the local disk write and read cycle
+before starting the actual upload. Furthermore, it allows handling files that
+are larger than the remaining local disk space.
+
+However, non-trivial memory tuning is needed for optimal results and careless
+settings could cause memory overflow. Up to `fs.s3a.threads.max` parallel
+(part)uploads are active. Furthermore, up to `fs.s3a.max.total.tasks`
+additional (part)uploads can be waiting (and each holds a memory buffer).
+The memory buffer is uploaded as a single upload if it is not larger than
+`fs.s3a.multipart.threshold`. Otherwise, a multi-part upload is initiated
+and parts of size `fs.s3a.multipart.size` are used to protect against
+overflowing the available memory. These settings should be tuned to the
+envisioned workflow (some large files, many small ones, ...) and the
+physical limitations of the machine and cluster (memory, network bandwidth).
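As a worked example of the worst case described above (all numbers are assumptions chosen for illustration, not the shipped defaults):

    public class FastUploadMemorySketch {
      public static void main(String[] args) {
        long multipartSize = 100L * 1024 * 1024; // fs.s3a.multipart.size: 100 MB
        int maxThreads = 10;                     // fs.s3a.threads.max
        int maxTotalTasks = 5;                   // fs.s3a.max.total.tasks
        // per the tuning notes above, each actively uploading or queued
        // (part)upload can hold a buffer of up to multipartSize bytes
        long worstCaseBytes = (maxThreads + maxTotalTasks) * multipartSize;
        System.out.printf("worst-case buffered memory: %d MB%n",
            worstCaseBytes / (1024 * 1024)); // prints 1500 MB
      }
    }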
 
 ## Testing the S3 filesystem clients
 
@@ -334,7 +368,7 @@ each filesystem for its testing.
 The contents of each bucket will be destroyed during the test process:
 do not use the bucket for any purpose other than testing. Furthermore, for
 s3a, all in-progress multi-part uploads to the bucket will be aborted at the
-start of a test (by forcing fs.s3a.multipart.purge=true) to clean up the
+start of a test (by forcing `fs.s3a.multipart.purge=true`) to clean up the
 temporary state of previously failed tests.
 
 Example:
@@ -392,14 +426,14 @@ Example:
 ## File `contract-test-options.xml`
 
 The file `hadoop-tools/hadoop-aws/src/test/resources/contract-test-options.xml`
-must be created and configured for the test fileystems.
+must be created and configured for the test filesystems.
 
 If a specific file `fs.contract.test.fs.*` test path is not defined for
 any of the filesystems, those tests will be skipped.
 
 The standard S3 authentication details must also be provided. This can be
 through copy-and-paste of the `auth-keys.xml` credentials, or it can be
-through direct XInclude inclustion.
+through direct XInclude inclusion.
 
 #### s3://
 

+ 74 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AFastOutputStream.java

@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import java.io.IOException;
+
+/**
+ * Tests regular and multi-part upload functionality for S3AFastOutputStream.
+ * File sizes are kept small to reduce test duration on slow connections.
+ */
+public class TestS3AFastOutputStream {
+  private FileSystem fs;
+
+
+  @Rule
+  public Timeout testTimeout = new Timeout(30 * 60 * 1000);
+
+  @Before
+  public void setUp() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setLong(Constants.MIN_MULTIPART_THRESHOLD, 5 * 1024 * 1024);
+    conf.setInt(Constants.MULTIPART_SIZE, 5 * 1024 * 1024);
+    conf.setBoolean(Constants.FAST_UPLOAD, true);
+    fs = S3ATestUtils.createTestFileSystem(conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (fs != null) {
+      fs.delete(getTestPath(), true);
+    }
+  }
+
+  protected Path getTestPath() {
+    return new Path("/tests3a");
+  }
+
+  @Test
+  public void testRegularUpload() throws IOException {
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 1024 * 1024);
+  }
+
+  @Test
+  public void testMultiPartUpload() throws IOException {
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 6 * 1024 *
+        1024);
+  }
+}
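Given the 5 MB threshold and part size configured in setUp(), the two tests exercise both code paths in S3AFastOutputStream. A quick sanity check of the expected upload shapes (assumed arithmetic, mirroring the stream's slicing logic above):

    public class TestSizeSketch {
      public static void main(String[] args) {
        int mb = 1024 * 1024;
        int threshold = 5 * mb, partSize = 5 * mb;
        // testRegularUpload: 1 MB file never fills the buffer,
        // so close() issues a single PUT
        System.out.println(1 * mb < threshold);          // true
        // testMultiPartUpload: 6 MB reaches the threshold mid-write,
        // so a multi-part upload is initiated
        int parts = (6 * mb + partSize - 1) / partSize;  // ceil(6/5)
        System.out.println("parts: " + parts + " (5 MB + 1 MB)");
      }
    }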