
HADOOP-15576. S3A Multipart Uploader to work with S3Guard and encryption.
Originally contributed by Ewan Higgs with refinements by Steve Loughran.

Ewan Higgs committed 6 years ago
Commit 2ec97abb2e
18 files changed, with 787 additions and 357 deletions
  1. hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java (+48, -21)
  2. hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java (+18, -14)
  3. hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartHandle.java (+4, -4)
  4. hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathHandle.java (+7, -2)
  5. hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/AbstractSystemMultipartUploaderTest.java (+0, -143)
  6. hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java (+300, -0)
  7. hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java (+14, -36)
  8. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSMultipartUploader.java (+0, -76)
  9. hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractMultipartUploader.java (+58, -0)
  10. hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java (+119, -58)
  11. hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java (+4, -0)
  12. hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory (+0, -0)
  13. hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java (+116, -0)
  14. hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java (+5, -0)
  15. hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java (+84, -0)
  16. hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java (+3, -1)
  17. hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java (+2, -2)
  18. hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml (+5, -0)

+ 48 - 21
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystemMultipartUploader.java

@@ -16,12 +16,6 @@
  */
 package org.apache.hadoop.fs;
 
-import com.google.common.base.Charsets;
-import org.apache.commons.compress.utils.IOUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.permission.FsPermission;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
@@ -29,13 +23,26 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+
+import org.apache.commons.compress.utils.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import static org.apache.hadoop.fs.Path.mergePaths;
+
 /**
  * A MultipartUploader that uses the basic FileSystem commands.
  * This is done in three stages:
- * Init - create a temp _multipart directory.
- * PutPart - copying the individual parts of the file to the temp directory.
- * Complete - use {@link FileSystem#concat} to merge the files; and then delete
- * the temp directory.
+ * <ul>
+ *   <li>Init - create a temp {@code _multipart} directory.</li>
+ *   <li>PutPart - copying the individual parts of the file to the temp
+ *   directory.</li>
+ *   <li>Complete - use {@link FileSystem#concat} to merge the files;
+ *   and then delete the temp directory.</li>
+ * </ul>
  */
 public class FileSystemMultipartUploader extends MultipartUploader {
 
@@ -64,28 +71,44 @@ public class FileSystemMultipartUploader extends MultipartUploader {
     Path collectorPath = new Path(new String(uploadIdByteArray, 0,
         uploadIdByteArray.length, Charsets.UTF_8));
     Path partPath =
-        Path.mergePaths(collectorPath, Path.mergePaths(new Path(Path.SEPARATOR),
+        mergePaths(collectorPath, mergePaths(new Path(Path.SEPARATOR),
            new Path(Integer.toString(partNumber) + ".part")));
-    FSDataOutputStreamBuilder outputStream = fs.createFile(partPath);
-    FSDataOutputStream fsDataOutputStream = outputStream.build();
-    IOUtils.copy(inputStream, fsDataOutputStream, 4096);
-    fsDataOutputStream.close();
+    try(FSDataOutputStream fsDataOutputStream =
+            fs.createFile(partPath).build()) {
+      IOUtils.copy(inputStream, fsDataOutputStream, 4096);
+    } finally {
+      org.apache.hadoop.io.IOUtils.cleanupWithLogger(LOG, inputStream);
+    }
     return BBPartHandle.from(ByteBuffer.wrap(
         partPath.toString().getBytes(Charsets.UTF_8)));
   }
 
   private Path createCollectorPath(Path filePath) {
-    return Path.mergePaths(filePath.getParent(),
-        Path.mergePaths(new Path(filePath.getName().split("\\.")[0]),
-            Path.mergePaths(new Path("_multipart"),
+    return mergePaths(filePath.getParent(),
+        mergePaths(new Path(filePath.getName().split("\\.")[0]),
+            mergePaths(new Path("_multipart"),
                new Path(Path.SEPARATOR))));
   }
 
+  private PathHandle getPathHandle(Path filePath) throws IOException {
+    FileStatus status = fs.getFileStatus(filePath);
+    return fs.getPathHandle(status);
+  }
+
   @Override
   @SuppressWarnings("deprecation") // rename w/ OVERWRITE
   public PathHandle complete(Path filePath,
       List<Pair<Integer, PartHandle>> handles, UploadHandle multipartUploadId)
       throws IOException {
+
+    if (handles.isEmpty()) {
+      throw new IOException("Empty upload");
+    }
+    // If destination already exists, we believe we already completed it.
+    if (fs.exists(filePath)) {
+      return getPathHandle(filePath);
+    }
+
     handles.sort(Comparator.comparing(Pair::getKey));
     List<Path> partHandles = handles
         .stream()
@@ -97,22 +120,26 @@ public class FileSystemMultipartUploader extends MultipartUploader {
         .collect(Collectors.toList());
 
     Path collectorPath = createCollectorPath(filePath);
-    Path filePathInsideCollector = Path.mergePaths(collectorPath,
+    Path filePathInsideCollector = mergePaths(collectorPath,
         new Path(Path.SEPARATOR + filePath.getName()));
     fs.create(filePathInsideCollector).close();
     fs.concat(filePathInsideCollector,
         partHandles.toArray(new Path[handles.size()]));
     fs.rename(filePathInsideCollector, filePath, Options.Rename.OVERWRITE);
     fs.delete(collectorPath, true);
-    FileStatus status = fs.getFileStatus(filePath);
-    return fs.getPathHandle(status);
+    return getPathHandle(filePath);
   }
 
   @Override
   public void abort(Path filePath, UploadHandle uploadId) throws IOException {
     byte[] uploadIdByteArray = uploadId.toByteArray();
+    Preconditions.checkArgument(uploadIdByteArray.length != 0,
+        "UploadId is empty");
     Path collectorPath = new Path(new String(uploadIdByteArray, 0,
         uploadIdByteArray.length, Charsets.UTF_8));
+
+    // force a check for a file existing; raises FNFE if not found
+    fs.getFileStatus(collectorPath);
     fs.delete(collectorPath, true);
   }
 

+ 18 - 14
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/MultipartUploader.java

@@ -21,17 +21,20 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.List;
 
-import org.apache.commons.lang3.tuple.Pair;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang3.tuple.Pair;
+
 /**
  * MultipartUploader is an interface for copying files multipart and across
  * multiple nodes. Users should:
- * 1. Initialize an upload
- * 2. Upload parts in any order
- * 3. Complete the upload in order to have it materialize in the destination FS.
+ * <ol>
+ *   <li>Initialize an upload</li>
+ *   <li>Upload parts in any order</li>
+ *   <li>Complete the upload in order to have it materialize in the destination
+ *   FS</li>
+ * </ol>
  *
 * Implementers should make sure that the complete function should make sure
 * that 'complete' will reorder parts if the destination FS doesn't already
@@ -45,7 +48,7 @@ public abstract class MultipartUploader {
    * Initialize a multipart upload.
    * @param filePath Target path for upload.
    * @return unique identifier associating part uploads.
-   * @throws IOException
+   * @throws IOException IO failure
    */
   public abstract UploadHandle initialize(Path filePath) throws IOException;
 
@@ -53,12 +56,13 @@ public abstract class MultipartUploader {
    * Put part as part of a multipart upload. It should be possible to have
    * parts uploaded in any order (or in parallel).
    * @param filePath Target path for upload (same as {@link #initialize(Path)}).
-   * @param inputStream Data for this part.
+   * @param inputStream Data for this part. Implementations MUST close this
+   * stream after reading in the data.
    * @param partNumber Index of the part relative to others.
    * @param uploadId Identifier from {@link #initialize(Path)}.
    * @param lengthInBytes Target length to read from the stream.
    * @return unique PartHandle identifier for the uploaded part.
-   * @throws IOException
+   * @throws IOException IO failure
    */
   public abstract PartHandle putPart(Path filePath, InputStream inputStream,
       int partNumber, UploadHandle uploadId, long lengthInBytes)
@@ -67,12 +71,12 @@ public abstract class MultipartUploader {
   /**
    * Complete a multipart upload.
    * @param filePath Target path for upload (same as {@link #initialize(Path)}.
-   * @param handles Identifiers with associated part numbers from
-   *          {@link #putPart(Path, InputStream, int, UploadHandle, long)}.
+   * @param handles non-empty list of identifiers with associated part numbers
+   *          from {@link #putPart(Path, InputStream, int, UploadHandle, long)}.
    *          Depending on the backend, the list order may be significant.
    * @param multipartUploadId Identifier from {@link #initialize(Path)}.
   * @return unique PathHandle identifier for the uploaded file.
-   * @throws IOException
+   * @throws IOException IO failure or the handle list is empty.
   */
   public abstract PathHandle complete(Path filePath,
       List<Pair<Integer, PartHandle>> handles, UploadHandle multipartUploadId)
@@ -81,10 +85,10 @@ public abstract class MultipartUploader {
   /**
    * Aborts a multipart upload.
    * @param filePath Target path for upload (same as {@link #initialize(Path)}.
-   * @param multipartuploadId Identifier from {@link #initialize(Path)}.
-   * @throws IOException
+   * @param multipartUploadId Identifier from {@link #initialize(Path)}.
+   * @throws IOException IO failure
   */
-  public abstract void abort(Path filePath, UploadHandle multipartuploadId)
+  public abstract void abort(Path filePath, UploadHandle multipartUploadId)
       throws IOException;
 
 }
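
For context on how callers are expected to drive this API, here is a minimal, hedged sketch (not part of the patch): the class name, destination path, and part contents are invented, and error handling is elided. It follows the initialize / putPart / complete sequence described in the Javadoc above, using only methods visible in this change set.

import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.MultipartUploader;
import org.apache.hadoop.fs.MultipartUploaderFactory;
import org.apache.hadoop.fs.PartHandle;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathHandle;
import org.apache.hadoop.fs.UploadHandle;

public class MultipartUploadSketch {
  public static void main(String[] args) throws Exception {
    // hypothetical destination; any FileSystem with a registered uploader factory works
    Path dest = new Path("hdfs://namenode:8020/tmp/multipart-example");
    FileSystem fs = dest.getFileSystem(new Configuration());

    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
    UploadHandle upload = mpu.initialize(dest);

    List<Pair<Integer, PartHandle>> parts = new ArrayList<>();
    for (int partNumber = 1; partNumber <= 3; partNumber++) {
      byte[] data = ("part " + partNumber + "\n").getBytes("UTF-8");
      InputStream in = new ByteArrayInputStream(data);
      // parts may be uploaded in any order; the uploader closes the stream
      PartHandle part = mpu.putPart(dest, in, partNumber, upload, data.length);
      parts.add(Pair.of(partNumber, part));
    }

    // complete() materializes the file at dest and returns its PathHandle
    PathHandle handle = mpu.complete(dest, parts, upload);
  }
}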

+ 4 - 4
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PartHandle.java

@@ -16,14 +16,14 @@
  */
 package org.apache.hadoop.fs;
 
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
 /**
- * Opaque, serializable reference to an part id for multipart uploads.
+ * Opaque, serializable reference to a part id for multipart uploads.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving

+ 7 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/PathHandle.java

@@ -25,15 +25,16 @@ import org.apache.hadoop.classification.InterfaceStability;
 
 /**
  * Opaque, serializable reference to an entity in the FileSystem. May contain
- * metadata sufficient to resolve or verify subsequent accesses indepedent of
+ * metadata sufficient to resolve or verify subsequent accesses independent of
  * other modifications to the FileSystem.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
+@FunctionalInterface
 public interface PathHandle extends Serializable {
 
   /**
-   * @return Serialized from in bytes.
+   * @return Serialized form in bytes.
   */
   default byte[] toByteArray() {
     ByteBuffer bb = bytes();
@@ -42,6 +43,10 @@ public interface PathHandle extends Serializable {
     return ret;
   }
 
+  /**
+   * Get the bytes of this path handle.
+   * @return the bytes to get to the process completing the upload.
+   */
  ByteBuffer bytes();
 
  @Override

+ 0 - 143
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/AbstractSystemMultipartUploaderTest.java

@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.fs;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.tuple.Pair;
-
-import org.junit.Test;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-public abstract class AbstractSystemMultipartUploaderTest {
-
-  abstract FileSystem getFS() throws IOException;
-
-  abstract Path getBaseTestPath();
-
-  @Test
-  public void testMultipartUpload() throws Exception {
-    FileSystem fs = getFS();
-    Path file = new Path(getBaseTestPath(), "some-file");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
-    UploadHandle uploadHandle = mpu.initialize(file);
-    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
-    StringBuilder sb = new StringBuilder();
-    for (int i = 1; i <= 100; ++i) {
-      String contents = "ThisIsPart" + i + "\n";
-      sb.append(contents);
-      int len = contents.getBytes().length;
-      InputStream is = IOUtils.toInputStream(contents, "UTF-8");
-      PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len);
-      partHandles.add(Pair.of(i, partHandle));
-    }
-    PathHandle fd = mpu.complete(file, partHandles, uploadHandle);
-    byte[] fdData = IOUtils.toByteArray(fs.open(fd));
-    byte[] fileData = IOUtils.toByteArray(fs.open(file));
-    String readString = new String(fdData);
-    assertEquals(sb.toString(), readString);
-    assertArrayEquals(fdData, fileData);
-  }
-
-  @Test
-  public void testMultipartUploadReverseOrder() throws Exception {
-    FileSystem fs = getFS();
-    Path file = new Path(getBaseTestPath(), "some-file");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
-    UploadHandle uploadHandle = mpu.initialize(file);
-    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
-    StringBuilder sb = new StringBuilder();
-    for (int i = 1; i <= 100; ++i) {
-      String contents = "ThisIsPart" + i + "\n";
-      sb.append(contents);
-    }
-    for (int i = 100; i > 0; --i) {
-      String contents = "ThisIsPart" + i + "\n";
-      int len = contents.getBytes().length;
-      InputStream is = IOUtils.toInputStream(contents, "UTF-8");
-      PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len);
-      partHandles.add(Pair.of(i, partHandle));
-    }
-    PathHandle fd = mpu.complete(file, partHandles, uploadHandle);
-    byte[] fdData = IOUtils.toByteArray(fs.open(fd));
-    byte[] fileData = IOUtils.toByteArray(fs.open(file));
-    String readString = new String(fdData);
-    assertEquals(sb.toString(), readString);
-    assertArrayEquals(fdData, fileData);
-  }
-
-  @Test
-  public void testMultipartUploadReverseOrderNoNContiguousPartNumbers()
-      throws Exception {
-    FileSystem fs = getFS();
-    Path file = new Path(getBaseTestPath(), "some-file");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
-    UploadHandle uploadHandle = mpu.initialize(file);
-    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
-    StringBuilder sb = new StringBuilder();
-    for (int i = 2; i <= 200; i += 2) {
-      String contents = "ThisIsPart" + i + "\n";
-      sb.append(contents);
-    }
-    for (int i = 200; i > 0; i -= 2) {
-      String contents = "ThisIsPart" + i + "\n";
-      int len = contents.getBytes().length;
-      InputStream is = IOUtils.toInputStream(contents, "UTF-8");
-      PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len);
-      partHandles.add(Pair.of(i, partHandle));
-    }
-    PathHandle fd = mpu.complete(file, partHandles, uploadHandle);
-    byte[] fdData = IOUtils.toByteArray(fs.open(fd));
-    byte[] fileData = IOUtils.toByteArray(fs.open(file));
-    String readString = new String(fdData);
-    assertEquals(sb.toString(), readString);
-    assertArrayEquals(fdData, fileData);
-  }
-
-  @Test
-  public void testMultipartUploadAbort() throws Exception {
-    FileSystem fs = getFS();
-    Path file = new Path(getBaseTestPath(), "some-file");
-    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
-    UploadHandle uploadHandle = mpu.initialize(file);
-    for (int i = 100; i >= 50; --i) {
-      String contents = "ThisIsPart" + i + "\n";
-      int len = contents.getBytes().length;
-      InputStream is = IOUtils.toInputStream(contents, "UTF-8");
-      PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle, len);
-    }
-    mpu.abort(file, uploadHandle);
-
-    String contents = "ThisIsPart49\n";
-    int len = contents.getBytes().length;
-    InputStream is = IOUtils.toInputStream(contents, "UTF-8");
-
-    try {
-      mpu.putPart(file, is, 49, uploadHandle, len);
-      fail("putPart should have thrown an exception");
-    } catch (IOException ok) {
-      // ignore
-    }
-  }
-}

+ 300 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractMultipartUploaderTest.java

@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.fs.contract;
+
+import java.io.ByteArrayInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.security.MessageDigest;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.google.common.base.Charsets;
+import org.junit.Test;
+
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.fs.BBUploadHandle;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.MultipartUploader;
+import org.apache.hadoop.fs.MultipartUploaderFactory;
+import org.apache.hadoop.fs.PartHandle;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathHandle;
+import org.apache.hadoop.fs.UploadHandle;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.verifyPathExists;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+public abstract class AbstractContractMultipartUploaderTest extends
+    AbstractFSContractTestBase {
+
+  /**
+   * The payload is the part number repeated for the length of the part.
+   * This makes checking the correctness of the upload straightforward.
+   * @param partNumber part number
+   * @return the bytes to upload.
+   */
+  private byte[] generatePayload(int partNumber) {
+    int sizeInBytes = partSizeInBytes();
+    ByteBuffer buffer = ByteBuffer.allocate(sizeInBytes);
+    for (int i=0 ; i < sizeInBytes/(Integer.SIZE/Byte.SIZE); ++i) {
+      buffer.putInt(partNumber);
+    }
+    return buffer.array();
+  }
+
+  /**
+   * Load a path, make an MD5 digest.
+   * @param path path to load
+   * @return the digest array
+   * @throws IOException failure to read or digest the file.
+   */
+  protected byte[] digest(Path path) throws IOException {
+    FileSystem fs = getFileSystem();
+    try (InputStream in = fs.open(path)) {
+      byte[] fdData = IOUtils.toByteArray(in);
+      MessageDigest newDigest = DigestUtils.getMd5Digest();
+      return newDigest.digest(fdData);
+    }
+  }
+
+  /**
+   * Get the partition size in bytes to use for each upload.
+   * @return a number > 0
+   */
+  protected abstract int partSizeInBytes();
+
+  /**
+   * Get the number of test payloads to upload.
+   * @return a number > 1
+   */
+  protected int getTestPayloadCount() {
+    return 10;
+  }
+
+  /**
+   * Assert that a multipart upload is successful.
+   * @throws Exception failure
+   */
+  @Test
+  public void testSingleUpload() throws Exception {
+    FileSystem fs = getFileSystem();
+    Path file = path("testSingleUpload");
+    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
+    UploadHandle uploadHandle = mpu.initialize(file);
+    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
+    MessageDigest origDigest = DigestUtils.getMd5Digest();
+    byte[] payload = generatePayload(1);
+    origDigest.update(payload);
+    InputStream is = new ByteArrayInputStream(payload);
+    PartHandle partHandle = mpu.putPart(file, is, 1, uploadHandle,
+        payload.length);
+    partHandles.add(Pair.of(1, partHandle));
+    PathHandle fd = completeUpload(file, mpu, uploadHandle, partHandles,
+        origDigest,
+        payload.length);
+
+    // Complete is idempotent
+    PathHandle fd2 = mpu.complete(file, partHandles, uploadHandle);
+    assertArrayEquals("Path handles differ", fd.toByteArray(),
+        fd2.toByteArray());
+  }
+
+  private PathHandle completeUpload(final Path file,
+      final MultipartUploader mpu,
+      final UploadHandle uploadHandle,
+      final List<Pair<Integer, PartHandle>> partHandles,
+      final MessageDigest origDigest,
+      final int expectedLength) throws IOException {
+    PathHandle fd = mpu.complete(file, partHandles, uploadHandle);
+
+    FileStatus status = verifyPathExists(getFileSystem(),
+        "Completed file", file);
+    assertEquals("length of " + status,
+        expectedLength, status.getLen());
+
+    assertArrayEquals("digest of source and " + file
+            + " differ",
+        origDigest.digest(), digest(file));
+    return fd;
+  }
+
+  /**
+   * Assert that a multipart upload is successful.
+   * @throws Exception failure
+   */
+  @Test
+  public void testMultipartUpload() throws Exception {
+    FileSystem fs = getFileSystem();
+    Path file = path("testMultipartUpload");
+    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
+    UploadHandle uploadHandle = mpu.initialize(file);
+    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
+    MessageDigest origDigest = DigestUtils.getMd5Digest();
+    final int payloadCount = getTestPayloadCount();
+    for (int i = 1; i <= payloadCount; ++i) {
+      byte[] payload = generatePayload(i);
+      origDigest.update(payload);
+      InputStream is = new ByteArrayInputStream(payload);
+      PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle,
+          payload.length);
+      partHandles.add(Pair.of(i, partHandle));
+    }
+    completeUpload(file, mpu, uploadHandle, partHandles, origDigest,
+        payloadCount * partSizeInBytes());
+  }
+
+  /**
+   * Assert that a multipart upload is successful even when the parts are
+   * given in the reverse order.
+   */
+  @Test
+  public void testMultipartUploadReverseOrder() throws Exception {
+    FileSystem fs = getFileSystem();
+    Path file = path("testMultipartUploadReverseOrder");
+    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
+    UploadHandle uploadHandle = mpu.initialize(file);
+    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
+    MessageDigest origDigest = DigestUtils.getMd5Digest();
+    final int payloadCount = getTestPayloadCount();
+    for (int i = 1; i <= payloadCount; ++i) {
+      byte[] payload = generatePayload(i);
+      origDigest.update(payload);
+    }
+    for (int i = payloadCount; i > 0; --i) {
+      byte[] payload = generatePayload(i);
+      InputStream is = new ByteArrayInputStream(payload);
+      PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle,
+          payload.length);
+      partHandles.add(Pair.of(i, partHandle));
+    }
+    completeUpload(file, mpu, uploadHandle, partHandles, origDigest,
+        payloadCount * partSizeInBytes());
+  }
+
+  /**
+   * Assert that a multipart upload is successful even when the parts are
+   * given in reverse order and the part numbers are not contiguous.
+   */
+  @Test
+  public void testMultipartUploadReverseOrderNonContiguousPartNumbers()
+      throws Exception {
+    describe("Upload in reverse order and the part numbers are not contiguous");
+    FileSystem fs = getFileSystem();
+    Path file = path("testMultipartUploadReverseOrderNonContiguousPartNumbers");
+    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
+    UploadHandle uploadHandle = mpu.initialize(file);
+    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
+    MessageDigest origDigest = DigestUtils.getMd5Digest();
+    int payloadCount = 2 * getTestPayloadCount();
+    for (int i = 2; i <= payloadCount; i += 2) {
+      byte[] payload = generatePayload(i);
+      origDigest.update(payload);
+    }
+    for (int i = payloadCount; i > 0; i -= 2) {
+      byte[] payload = generatePayload(i);
+      InputStream is = new ByteArrayInputStream(payload);
+      PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle,
+          payload.length);
+      partHandles.add(Pair.of(i, partHandle));
+    }
+    completeUpload(file, mpu, uploadHandle, partHandles, origDigest,
+        getTestPayloadCount() * partSizeInBytes());
+  }
+
+  /**
+   * Assert that when we abort a multipart upload, the resulting file does
+   * not show up.
+   */
+  @Test
+  public void testMultipartUploadAbort() throws Exception {
+    describe("Upload and then abort it before completing");
+    FileSystem fs = getFileSystem();
+    Path file = path("testMultipartUploadAbort");
+    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
+    UploadHandle uploadHandle = mpu.initialize(file);
+    List<Pair<Integer, PartHandle>> partHandles = new ArrayList<>();
+    for (int i = 20; i >= 10; --i) {
+      byte[] payload = generatePayload(i);
+      InputStream is = new ByteArrayInputStream(payload);
+      PartHandle partHandle = mpu.putPart(file, is, i, uploadHandle,
+          payload.length);
+      partHandles.add(Pair.of(i, partHandle));
+    }
+    mpu.abort(file, uploadHandle);
+
+    String contents = "ThisIsPart49\n";
+    int len = contents.getBytes(Charsets.UTF_8).length;
+    InputStream is = IOUtils.toInputStream(contents, "UTF-8");
+
+    intercept(IOException.class,
+        () -> mpu.putPart(file, is, 49, uploadHandle, len));
+    intercept(IOException.class,
+        () -> mpu.complete(file, partHandles, uploadHandle));
+
+    assertPathDoesNotExist("Uploaded file should not exist", file);
+  }
+
+  /**
+   * Trying to abort from an invalid handle must fail.
+   */
+  @Test
+  public void testAbortUnknownUpload() throws Exception {
+    FileSystem fs = getFileSystem();
+    Path file = path("testAbortUnknownUpload");
+    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
+    ByteBuffer byteBuffer = ByteBuffer.wrap(
+        "invalid-handle".getBytes(Charsets.UTF_8));
+    UploadHandle uploadHandle = BBUploadHandle.from(byteBuffer);
+    intercept(FileNotFoundException.class, () -> mpu.abort(file, uploadHandle));
+  }
+
+  /**
+   * Trying to abort with a handle of size 0 must fail.
+   */
+  @Test
+  public void testAbortEmptyUploadHandle() throws Exception {
+    FileSystem fs = getFileSystem();
+    Path file = path("testAbortEmptyUpload");
+    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
+    ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[0]);
+    UploadHandle uploadHandle = BBUploadHandle.from(byteBuffer);
+    intercept(IllegalArgumentException.class,
+        () -> mpu.abort(file, uploadHandle));
+  }
+
+  /**
+   * When we complete with no parts provided, it must fail.
+   */
+  @Test
+  public void testCompleteEmptyUpload() throws Exception {
+    describe("Expect an empty MPU to fail, but still be abortable");
+    FileSystem fs = getFileSystem();
+    Path dest = path("testCompleteEmptyUpload");
+    MultipartUploader mpu = MultipartUploaderFactory.get(fs, null);
+    UploadHandle handle = mpu.initialize(dest);
+    intercept(IOException.class,
+        () -> mpu.complete(dest, new ArrayList<>(), handle));
+    mpu.abort(dest, handle);
+  }
+}

+ 14 - 36
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/TestLocalFileSystemMultipartUploader.java → hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/localfs/TestLocalFSContractMultipartUploader.java

@@ -15,51 +15,29 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.hadoop.fs;
+package org.apache.hadoop.fs.contract.localfs;
 
 import org.apache.hadoop.conf.Configuration;
-import static org.apache.hadoop.test.GenericTestUtils.getRandomizedTestDir;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-
-import java.io.File;
-import java.io.IOException;
+import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
 
 /**
  * Test the FileSystemMultipartUploader on local file system.
 */
-public class TestLocalFileSystemMultipartUploader
-    extends AbstractSystemMultipartUploaderTest {
-
-  private static FileSystem fs;
-  private File tmp;
-
-  @BeforeClass
-  public static void init() throws IOException {
-    fs = LocalFileSystem.getLocal(new Configuration());
-  }
-
-  @Before
-  public void setup() throws IOException {
-    tmp = getRandomizedTestDir();
-    tmp.mkdirs();
-  }
-
-  @After
-  public void tearDown() throws IOException {
-    tmp.delete();
-  }
+public class TestLocalFSContractMultipartUploader
+    extends AbstractContractMultipartUploaderTest {
 
   @Override
-  public FileSystem getFS() {
-    return fs;
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new LocalFSContract(conf);
   }
 
+  /**
+   * There is no real need to upload any particular size.
+   * @return 1 kilobyte
+   */
   @Override
-  public Path getBaseTestPath() {
-    return new Path(tmp.getAbsolutePath());
+  protected int partSizeInBytes() {
+    return 1024;
   }
-
-}
+}

+ 0 - 76
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestHDFSMultipartUploader.java

@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *  or more contributor license agreements.  See the NOTICE file
- *  distributed with this work for additional information
- *  regarding copyright ownership.  The ASF licenses this file
- *  to you under the Apache License, Version 2.0 (the
- *  "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *       http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.fs;
-
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.rules.TestName;
-
-import java.io.IOException;
-
-public class TestHDFSMultipartUploader
-    extends AbstractSystemMultipartUploaderTest {
-
-  private static MiniDFSCluster cluster;
-  private Path tmp;
-
-  @Rule
-  public TestName name = new TestName();
-
-  @BeforeClass
-  public static void init() throws IOException {
-    HdfsConfiguration conf = new HdfsConfiguration();
-    cluster = new MiniDFSCluster.Builder(conf,
-          GenericTestUtils.getRandomizedTestDir())
-        .numDataNodes(1)
-        .build();
-    cluster.waitClusterUp();
-  }
-
-  @AfterClass
-  public static void cleanup() throws IOException {
-    if (cluster != null) {
-      cluster.shutdown();
-      cluster = null;
-    }
-  }
-
-  @Before
-  public void setup() throws IOException {
-    tmp = new Path(cluster.getFileSystem().getWorkingDirectory(),
-        name.getMethodName());
-    cluster.getFileSystem().mkdirs(tmp);
-  }
-
-  @Override
-  public FileSystem getFS() throws IOException {
-    return cluster.getFileSystem();
-  }
-
-  @Override
-  public Path getBaseTestPath() {
-    return tmp;
-  }
-
-}

+ 58 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/contract/hdfs/TestHDFSContractMultipartUploader.java

@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.fs.contract.hdfs;
+
+import java.io.IOException;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+
+/**
+ * Test MultipartUploader tests on HDFS.
+ */
+public class TestHDFSContractMultipartUploader extends
+    AbstractContractMultipartUploaderTest {
+
+  @BeforeClass
+  public static void createCluster() throws IOException {
+    HDFSContract.createCluster();
+  }
+
+  @AfterClass
+  public static void teardownCluster() throws IOException {
+    HDFSContract.destroyCluster();
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new HDFSContract(conf);
+  }
+
+  /**
+   * HDFS doesn't have any restriction on the part size.
+   * @return 1KB
+   */
+  @Override
+  protected int partSizeInBytes() {
+    return 1024;
+  }
+}

+ 119 - 58
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AMultipartUploader.java

@@ -17,15 +17,26 @@
  */
 package org.apache.hadoop.fs.s3a;
 
-import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
-import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
-import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
-import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
 import com.amazonaws.services.s3.model.PartETag;
 import com.amazonaws.services.s3.model.UploadPartRequest;
 import com.amazonaws.services.s3.model.UploadPartResult;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
+import com.google.common.base.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BBPartHandle;
@@ -37,13 +48,8 @@ import org.apache.hadoop.fs.PartHandle;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathHandle;
 import org.apache.hadoop.fs.UploadHandle;
-import org.apache.hadoop.hdfs.DFSUtilClient;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.stream.Collectors;
+import static org.apache.hadoop.fs.s3a.Constants.FS_S3A;
 
 /**
  * MultipartUploader for S3AFileSystem. This uses the S3 multipart
@@ -53,6 +59,10 @@ public class S3AMultipartUploader extends MultipartUploader {
 
   private final S3AFileSystem s3a;
 
+  /** Header for Parts: {@value}. */
+
+  public static final String HEADER = "S3A-part01";
+
   public S3AMultipartUploader(FileSystem fs, Configuration conf) {
     if (!(fs instanceof S3AFileSystem)) {
       throw new IllegalArgumentException(
@@ -63,75 +73,72 @@ public class S3AMultipartUploader extends MultipartUploader {
 
   @Override
   public UploadHandle initialize(Path filePath) throws IOException {
+    final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper();
     String key = s3a.pathToKey(filePath);
-    InitiateMultipartUploadRequest request =
-        new InitiateMultipartUploadRequest(s3a.getBucket(), key);
-    LOG.debug("initialize request: {}", request);
-    InitiateMultipartUploadResult result = s3a.initiateMultipartUpload(request);
-    String uploadId = result.getUploadId();
+    String uploadId = writeHelper.initiateMultiPartUpload(key);
     return BBUploadHandle.from(ByteBuffer.wrap(
         uploadId.getBytes(Charsets.UTF_8)));
   }
 
   @Override
   public PartHandle putPart(Path filePath, InputStream inputStream,
-      int partNumber, UploadHandle uploadId, long lengthInBytes) {
+      int partNumber, UploadHandle uploadId, long lengthInBytes)
+      throws IOException {
+    final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper();
     String key = s3a.pathToKey(filePath);
-    UploadPartRequest request = new UploadPartRequest();
     byte[] uploadIdBytes = uploadId.toByteArray();
-    request.setUploadId(new String(uploadIdBytes, 0, uploadIdBytes.length,
-        Charsets.UTF_8));
-    request.setInputStream(inputStream);
-    request.setPartSize(lengthInBytes);
-    request.setPartNumber(partNumber);
-    request.setBucketName(s3a.getBucket());
-    request.setKey(key);
-    LOG.debug("putPart request: {}", request);
-    UploadPartResult result = s3a.uploadPart(request);
+    String uploadIdString = new String(uploadIdBytes, 0, uploadIdBytes.length,
+        Charsets.UTF_8);
+    UploadPartRequest request = writeHelper.newUploadPartRequest(key,
+        uploadIdString, partNumber, (int) lengthInBytes, inputStream, null, 0L);
+    UploadPartResult result = writeHelper.uploadPart(request);
     String eTag = result.getETag();
-    return BBPartHandle.from(ByteBuffer.wrap(eTag.getBytes(Charsets.UTF_8)));
+    return BBPartHandle.from(
+        ByteBuffer.wrap(
+            buildPartHandlePayload(eTag, lengthInBytes)));
   }
 
   @Override
   public PathHandle complete(Path filePath,
-      List<Pair<Integer, PartHandle>> handles, UploadHandle uploadId) {
-    String key = s3a.pathToKey(filePath);
-    CompleteMultipartUploadRequest request =
-        new CompleteMultipartUploadRequest();
-    request.setBucketName(s3a.getBucket());
-    request.setKey(key);
+      List<Pair<Integer, PartHandle>> handles, UploadHandle uploadId)
+      throws IOException {
     byte[] uploadIdBytes = uploadId.toByteArray();
-    request.setUploadId(new String(uploadIdBytes, 0, uploadIdBytes.length,
-        Charsets.UTF_8));
-    List<PartETag> eTags = handles
-        .stream()
-        .map(handle -> {
-          byte[] partEtagBytes = handle.getRight().toByteArray();
-          return new PartETag(handle.getLeft(),
-              new String(partEtagBytes, 0, partEtagBytes.length,
-                  Charsets.UTF_8));
-        })
-        .collect(Collectors.toList());
-    request.setPartETags(eTags);
-    LOG.debug("Complete request: {}", request);
-    CompleteMultipartUploadResult completeMultipartUploadResult =
-        s3a.getAmazonS3Client().completeMultipartUpload(request);
-
-    byte[] eTag = DFSUtilClient.string2Bytes(
-        completeMultipartUploadResult.getETag());
+    checkUploadId(uploadIdBytes);
+    if (handles.isEmpty()) {
+      throw new IOException("Empty upload");
+    }
+
+    final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper();
+    String key = s3a.pathToKey(filePath);
+
+    String uploadIdStr = new String(uploadIdBytes, 0, uploadIdBytes.length,
+        Charsets.UTF_8);
+    ArrayList<PartETag> eTags = new ArrayList<>();
+    eTags.ensureCapacity(handles.size());
+    long totalLength = 0;
+    for (Pair<Integer, PartHandle> handle : handles) {
+      byte[] payload = handle.getRight().toByteArray();
+      Pair<Long, String> result = parsePartHandlePayload(payload);
+      totalLength += result.getLeft();
+      eTags.add(new PartETag(handle.getLeft(), result.getRight()));
+    }
+    AtomicInteger errorCount = new AtomicInteger(0);
+    CompleteMultipartUploadResult result = writeHelper.completeMPUwithRetries(
+        key, uploadIdStr, eTags, totalLength, errorCount);
+
+    byte[] eTag = result.getETag().getBytes(Charsets.UTF_8);
     return (PathHandle) () -> ByteBuffer.wrap(eTag);
   }
 
   @Override
-  public void abort(Path filePath, UploadHandle uploadId) {
+  public void abort(Path filePath, UploadHandle uploadId) throws IOException {
+    final byte[] uploadIdBytes = uploadId.toByteArray();
+    checkUploadId(uploadIdBytes);
+    final WriteOperationHelper writeHelper = s3a.getWriteOperationHelper();
     String key = s3a.pathToKey(filePath);
-    byte[] uploadIdBytes = uploadId.toByteArray();
     String uploadIdString = new String(uploadIdBytes, 0, uploadIdBytes.length,
         Charsets.UTF_8);
-    AbortMultipartUploadRequest request = new AbortMultipartUploadRequest(s3a
-        .getBucket(), key, uploadIdString);
-    LOG.debug("Abort request: {}", request);
-    s3a.getAmazonS3Client().abortMultipartUpload(request);
+    writeHelper.abortMultipartCommit(key, uploadIdString);
   }
 
   /**
@@ -141,10 +148,64 @@ public class S3AMultipartUploader extends MultipartUploader {
     @Override
     protected MultipartUploader createMultipartUploader(FileSystem fs,
         Configuration conf) {
-      if (fs.getScheme().equals("s3a")) {
+      if (FS_S3A.equals(fs.getScheme())) {
        return new S3AMultipartUploader(fs, conf);
      }
      return null;
    }
   }
+
+  private void checkUploadId(byte[] uploadId) throws IllegalArgumentException {
+    Preconditions.checkArgument(uploadId.length > 0,
+        "Empty UploadId is not valid");
+  }
+
+  /**
+   * Build the payload for marshalling.
+   * @param eTag upload etag
+   * @param len length
+   * @return a byte array to marshall.
+   * @throws IOException error writing the payload
+   */
+  @VisibleForTesting
+  static byte[] buildPartHandlePayload(String eTag, long len)
+      throws IOException {
+    Preconditions.checkArgument(StringUtils.isNotEmpty(eTag),
+        "Empty etag");
+    Preconditions.checkArgument(len > 0,
+        "Invalid length");
+
+    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+    try(DataOutputStream output = new DataOutputStream(bytes)) {
+      output.writeUTF(HEADER);
+      output.writeLong(len);
+      output.writeUTF(eTag);
+    }
+    return bytes.toByteArray();
+  }
+
+  /**
+   * Parse the payload marshalled as a part handle.
+   * @param data handle data
+   * @return the length and etag
+   * @throws IOException error reading the payload
+   */
+  static Pair<Long, String> parsePartHandlePayload(byte[] data)
+      throws IOException {
+
+    try(DataInputStream input =
+            new DataInputStream(new ByteArrayInputStream(data))) {
+      final String header = input.readUTF();
+      if (!HEADER.equals(header)) {
+        throw new IOException("Wrong header string: \"" + header + "\"");
+      }
+      final long len = input.readLong();
+      final String etag = input.readUTF();
+      if (len <= 0) {
+        throw new IOException("Negative length");
+      }
+      return Pair.of(len, etag);
+    }
+  }
+
 }
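
A note on the part handle format introduced above: putPart now marshals the etag together with the part length, so that complete() can total the lengths it passes to completeMPUwithRetries. The following is a minimal, hedged sketch of that round trip, not part of the patch: the class name and the etag/length values are invented, and the sketch sits in the org.apache.hadoop.fs.s3a package because the helpers are package-private (mirroring TestS3AMultipartUploaderSupport below).

package org.apache.hadoop.fs.s3a;

import org.apache.commons.lang3.tuple.Pair;

public class PartHandlePayloadSketch {
  public static void main(String[] args) throws Exception {
    // layout written by buildPartHandlePayload:
    //   writeUTF(HEADER "S3A-part01"), writeLong(part length), writeUTF(etag)
    byte[] payload =
        S3AMultipartUploader.buildPartHandlePayload("00112233445566", 5242880L);
    Pair<Long, String> decoded =
        S3AMultipartUploader.parsePartHandlePayload(payload);
    System.out.println(decoded.getLeft());   // 5242880 (part length)
    System.out.println(decoded.getRight());  // 00112233445566 (etag)
  }
}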

+ 4 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java

@@ -219,6 +219,10 @@ public class WriteOperationHelper {
       List<PartETag> partETags,
       long length,
       Retried retrying) throws IOException {
+    if (partETags.isEmpty()) {
+      throw new IOException(
+          "No upload parts in multipart upload to " + destKey);
+    }
     return invoker.retry("Completing multipart commit", destKey,
         true,
         retrying,

+ 0 - 0
hadoop-tools/hadoop-aws/src/main/resources/META-INF/org.apache.hadoop.fs.MultipartUploaderFactory → hadoop-tools/hadoop-aws/src/main/resources/META-INF/services/org.apache.hadoop.fs.MultipartUploaderFactory


+ 116 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractMultipartUploader.java

@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.contract.s3a;
+
+import java.io.IOException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.AbstractContractMultipartUploaderTest;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.WriteOperationHelper;
+
+import static org.apache.hadoop.fs.s3a.S3ATestConstants.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.fs.s3a.scale.AbstractSTestS3AHugeFiles.DEFAULT_HUGE_PARTITION_SIZE;
+
+/**
+ * Test MultipartUploader with S3A.
+ */
+public class ITestS3AContractMultipartUploader extends
+    AbstractContractMultipartUploaderTest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestS3AContractMultipartUploader.class);
+
+  private int partitionSize;
+
+  /**
+   * S3 requires a minimum part size of 5MB (except the last part).
+   * @return 5MB
+   */
+  @Override
+  protected int partSizeInBytes() {
+    return partitionSize;
+  }
+
+  @Override
+  protected int getTestPayloadCount() {
+    return 3;
+  }
+
+  @Override
+  public S3AFileSystem getFileSystem() {
+    return (S3AFileSystem) super.getFileSystem();
+  }
+
+  /**
+   * Create a configuration, possibly patching in S3Guard options.
+   * @return a configuration
+   */
+  @Override
+  protected Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    maybeEnableS3Guard(conf);
+    return conf;
+  }
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    Configuration conf = getContract().getConf();
+    boolean enabled = getTestPropertyBool(
+        conf,
+        KEY_SCALE_TESTS_ENABLED,
+        DEFAULT_SCALE_TESTS_ENABLED);
+    assume("Scale test disabled: to enable set property " +
+            KEY_SCALE_TESTS_ENABLED,
+        enabled);
+    partitionSize = (int) getTestPropertyBytes(conf,
+        KEY_HUGE_PARTITION_SIZE,
+        DEFAULT_HUGE_PARTITION_SIZE);
+  }
+
+  /**
+   * Extend superclass teardown with actions to help clean up the S3 store,
+   * including aborting uploads under the test path.
+   */
+  @Override
+  public void teardown() throws Exception {
+    Path teardown = path("teardown").getParent();
+    S3AFileSystem fs = getFileSystem();
+    WriteOperationHelper helper = fs.getWriteOperationHelper();
+    try {
+      LOG.info("Teardown: aborting outstanding uploads under {}", teardown);
+      int count = helper.abortMultipartUploadsUnderPath(fs.pathToKey(teardown));
+      LOG.info("Found {} incomplete uploads", count);
+    } catch (IOException e) {
+      LOG.warn("IOE in teardown", e);
+    }
+    super.teardown();
+  }
+}
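The setup() above gates the suite on the scale-test flag and takes its part size from the huge-files partition size. A sketch of how a run might surface those settings in a Configuration; the literal property names below are assumptions, and the authoritative values are the KEY_SCALE_TESTS_ENABLED and KEY_HUGE_PARTITION_SIZE constants in S3ATestConstants.

import org.apache.hadoop.conf.Configuration;

public final class MultipartUploaderScaleTestConfig {

  private MultipartUploaderScaleTestConfig() {
  }

  /** Build a configuration that enables the scale-gated uploader test. */
  public static Configuration scaleTestConf() {
    Configuration conf = new Configuration();
    // Assumed property names; check S3ATestConstants for the real keys.
    conf.setBoolean("fs.s3a.scale.test.enabled", true);
    conf.set("fs.s3a.scale.test.huge.partitionsize", "16M");
    return conf;
  }
}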

+ 5 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestConstants.java

@@ -105,6 +105,11 @@ public interface S3ATestConstants {
    */
   String KEY_HUGE_PARTITION_SIZE = S3A_SCALE_TEST + "huge.partitionsize";
 
 
+  /**
+   * Size of partitions to upload: {@value}.
+   */
+  String DEFAULT_HUGE_PARTITION_SIZE = "8M";
+
   /**
    * The default huge size is small —full 5GB+ scale tests are something
    * to run in long test runs on EC2 VMs. {@value}.

+ 84 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AMultipartUploaderSupport.java

@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.EOFException;
+import java.io.IOException;
+
+import org.junit.Test;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static org.apache.hadoop.fs.s3a.S3AMultipartUploader.*;
+import static org.apache.hadoop.fs.s3a.S3AMultipartUploader.parsePartHandlePayload;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Test multipart upload support methods and classes.
+ */
+public class TestS3AMultipartUploaderSupport extends HadoopTestBase {
+
+  @Test
+  public void testRoundTrip() throws Throwable {
+    Pair<Long, String> result = roundTrip("tag", 1);
+    assertEquals("tag", result.getRight());
+    assertEquals(1, result.getLeft().longValue());
+  }
+
+  @Test
+  public void testRoundTrip2() throws Throwable {
+    long len = 1L + Integer.MAX_VALUE;
+    Pair<Long, String> result = roundTrip("11223344",
+        len);
+    assertEquals("11223344", result.getRight());
+    assertEquals(len, result.getLeft().longValue());
+  }
+
+  @Test
+  public void testNoEtag() throws Throwable {
+    intercept(IllegalArgumentException.class,
+        () -> buildPartHandlePayload("", 1));
+  }
+
+  @Test
+  public void testNoLen() throws Throwable {
+    intercept(IllegalArgumentException.class,
+        () -> buildPartHandlePayload("tag", 0));
+  }
+
+  @Test
+  public void testBadPayload() throws Throwable {
+    intercept(EOFException.class,
+        () -> parsePartHandlePayload(new byte[0]));
+  }
+
+  @Test
+  public void testBadHeader() throws Throwable {
+    byte[] bytes = buildPartHandlePayload("tag", 1);
+    bytes[2]='f';
+    intercept(IOException.class, "header",
+        () -> parsePartHandlePayload(bytes));
+  }
+
+  private Pair<Long, String> roundTrip(final String tag, final long len) throws IOException {
+    byte[] bytes = buildPartHandlePayload(tag, len);
+    return parsePartHandlePayload(bytes);
+  }
+}
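These tests pin down the behaviour of buildPartHandlePayload and parsePartHandlePayload: empty etags and zero lengths are rejected, an empty buffer raises EOFException, and a corrupted header fails with a message containing "header". A minimal sketch of a codec with that behaviour, assuming a DataOutputStream-based layout; the header string and field order are placeholders, not the actual constants in S3AMultipartUploader.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import com.google.common.base.Preconditions;
import org.apache.commons.lang3.tuple.Pair;

final class PartHandlePayloadSketch {

  /** Hypothetical marker; the real header lives in S3AMultipartUploader. */
  private static final String HEADER = "S3A-part-handle";

  private PartHandlePayloadSketch() {
  }

  static byte[] buildPartHandlePayload(String etag, long len)
      throws IOException {
    Preconditions.checkArgument(!etag.isEmpty(), "empty etag");
    Preconditions.checkArgument(len > 0, "invalid part length: %s", len);
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream output = new DataOutputStream(bytes)) {
      output.writeUTF(HEADER);   // checked on parse; corruption must fail
      output.writeLong(len);
      output.writeUTF(etag);
    }
    return bytes.toByteArray();
  }

  static Pair<Long, String> parsePartHandlePayload(byte[] data)
      throws IOException {
    try (DataInputStream input =
             new DataInputStream(new ByteArrayInputStream(data))) {
      // readUTF() raises EOFException on an empty or truncated buffer.
      if (!HEADER.equals(input.readUTF())) {
        throw new IOException("Wrong header on part handle payload");
      }
      long len = input.readLong();
      String etag = input.readUTF();
      return Pair.of(len, etag);
    }
  }
}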

+ 3 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java

@@ -83,7 +83,9 @@ public class TestStagingPartitionedJobCommit
           commit.setDestinationKey(key);
           commit.setUri("s3a://" + BUCKET + "/" + key);
           commit.setUploadId(UUID.randomUUID().toString());
-          commit.setEtags(new ArrayList<>());
+          ArrayList<String> etags = new ArrayList<>();
+          etags.add("tag1");
+          commit.setEtags(etags);
           pending.add(commit);
         }
       }

+ 2 - 2
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java

@@ -64,7 +64,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
   private static final Logger LOG = LoggerFactory.getLogger(
       AbstractSTestS3AHugeFiles.class);
   public static final int DEFAULT_UPLOAD_BLOCKSIZE = 64 * _1KB;
-  public static final String DEFAULT_PARTITION_SIZE = "8M";
+
   private Path scaleTestDir;
   private Path hugefile;
   private Path hugefileRenamed;
@@ -101,7 +101,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
     Configuration conf = super.createScaleConfiguration();
     partitionSize = (int) getTestPropertyBytes(conf,
         KEY_HUGE_PARTITION_SIZE,
-        DEFAULT_PARTITION_SIZE);
+        DEFAULT_HUGE_PARTITION_SIZE);
     assertTrue("Partition size too small: " + partitionSize,
         partitionSize > MULTIPART_MIN_SIZE);
     conf.setLong(SOCKET_SEND_BUFFER, _1MB);

+ 5 - 0
hadoop-tools/hadoop-aws/src/test/resources/contract/s3a.xml

@@ -107,6 +107,11 @@
     <value>true</value>
   </property>
 
 
+  <property>
+    <name>fs.contract.supports-multipartuploader</name>
+    <value>true</value>
+  </property>
+
   <property>
     <name>fs.contract.supports-unix-permissions</name>
     <value>false</value>
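The new fs.contract.supports-multipartuploader option lets the shared contract suite decide per filesystem whether to run at all. A sketch of how such a guard can be written with the contract API; the exact skip logic inside AbstractContractMultipartUploaderTest may differ.

import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.ContractTestUtils;

public final class MultipartUploaderContractGuard {

  /** Feature name as it appears in s3a.xml, minus the fs.contract. prefix. */
  public static final String SUPPORTS_MULTIPARTUPLOADER =
      "supports-multipartuploader";

  private MultipartUploaderContractGuard() {
  }

  /** Skip the calling test when the contract does not declare support. */
  public static void skipIfUnsupported(AbstractFSContract contract) {
    if (!contract.isSupported(SUPPORTS_MULTIPARTUPLOADER, false)) {
      ContractTestUtils.skip("Multipart uploader not supported by "
          + contract.getScheme());
    }
  }
}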