
HADOOP-15224. S3A: Add option to set checksum on S3 objects (#7396)

Add the property fs.s3a.create.checksum.algorithm that allows users to specify a checksum algorithm (CRC32, CRC32C, SHA1, or SHA256) to be used by the AWS SDK to generate checksums for object integrity checks.

Contributed by Raphael Azzolini
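
For illustration only (not part of the patch): a minimal sketch of enabling the option from client code. The bucket name and path are placeholders.

```java
import java.net.URI;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ChecksumUploadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Ask the AWS SDK to attach a CRC32C checksum to every object upload.
    conf.set("fs.s3a.create.checksum.algorithm", "CRC32C");
    try (FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
         FSDataOutputStream out = fs.create(new Path("/data/example.txt"))) {
      out.write("hello".getBytes(StandardCharsets.UTF_8));
    }
  }
}
```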
Raphael Azzolini, 1 month ago
parent commit f7a331d13f
21 changed files with 951 additions and 36 deletions
  1. +9 -0 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
  2. +4 -0 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java
  3. +2 -0 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
  4. +10 -9 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java
  5. +136 -0 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/UploadEtag.java
  6. +7 -2 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java
  7. +71 -0 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChecksumSupport.java
  8. +50 -0 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java
  9. +70 -7 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploader.java
  10. +9 -0 hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
  11. +22 -0 hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
  12. +120 -0 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AChecksum.java
  13. +16 -2 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java
  14. +5 -0 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java
  15. +4 -3 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java
  16. +169 -0 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/files/TestUploadEtag.java
  17. +1 -1 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java
  18. +4 -3 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java
  19. +60 -0 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestChecksumSupport.java
  20. +66 -2 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java
  21. +116 -7 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestS3AMultipartUploaderSupport.java

+ 9 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

@@ -1781,6 +1781,15 @@ public final class Constants {
    */
   public static final boolean CHECKSUM_VALIDATION_DEFAULT = false;
 
+  /**
+   * Indicates the algorithm used to create the checksum for the object
+   * to be uploaded to S3. Unset by default. It supports the following values:
+   * 'CRC32', 'CRC32C', 'SHA1', and 'SHA256'.
+   * Value: {@value}.
+   */
+  public static final String CHECKSUM_ALGORITHM =
+      "fs.s3a.create.checksum.algorithm";
+
   /**
    * Are extensions classes, such as {@code fs.s3a.aws.credentials.provider},
    * going to be loaded from the same classloader that loaded

+ 4 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java

@@ -1064,6 +1064,10 @@ class S3ABlockOutputStream extends OutputStream implements
               return CompletedPart.builder()
                   .eTag(response.eTag())
                   .partNumber(currentPartNumber)
+                  .checksumCRC32(response.checksumCRC32())
+                  .checksumCRC32C(response.checksumCRC32C())
+                  .checksumSHA1(response.checksumSHA1())
+                  .checksumSHA256(response.checksumSHA256())
                   .build();
             } catch (Exception e) {
               final IOException ex = e instanceof IOException

+ 2 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -117,6 +117,7 @@ import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperation;
 import org.apache.hadoop.fs.s3a.impl.BulkDeleteOperationCallbacksImpl;
 import org.apache.hadoop.fs.s3a.impl.CSES3AFileSystemOperations;
 import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
+import org.apache.hadoop.fs.s3a.impl.ChecksumSupport;
 import org.apache.hadoop.fs.s3a.impl.ClientManager;
 import org.apache.hadoop.fs.s3a.impl.ClientManagerImpl;
 import org.apache.hadoop.fs.s3a.impl.ConfigurationHelper;
@@ -1327,6 +1328,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         .withStorageClass(storageClass)
         .withMultipartUploadEnabled(isMultipartUploadEnabled)
         .withPartUploadTimeout(partUploadTimeout)
+        .withChecksumAlgorithm(ChecksumSupport.getChecksumAlgorithm(getConf()))
         .build();
   }
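
The factory built above carries the checksum algorithm into every request it creates. A standalone sketch of the equivalent construction, using the builder methods added in this patch (the bucket name is a placeholder):

```java
import org.apache.hadoop.fs.s3a.api.RequestFactory;
import org.apache.hadoop.fs.s3a.impl.RequestFactoryImpl;
import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;

final class RequestFactorySketch {
  static RequestFactory newChecksummingFactory() {
    // The factory stamps CRC32C on the PUT, copy and multipart part
    // requests it builds; a null algorithm leaves requests unchanged.
    return RequestFactoryImpl.builder()
        .withBucket("example-bucket")
        .withChecksumAlgorithm(ChecksumAlgorithm.CRC32_C)
        .build();
  }
}
```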
 

+ 10 - 9
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/SinglePendingCommit.java

@@ -71,7 +71,7 @@ import static org.apache.hadoop.util.StringUtils.join;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class SinglePendingCommit extends PersistentCommitData<SinglePendingCommit>
-    implements Iterable<String> {
+    implements Iterable<UploadEtag> {
 
   /**
    * Serialization ID: {@value}.
@@ -118,7 +118,7 @@ public class SinglePendingCommit extends PersistentCommitData<SinglePendingCommi
   private String text = "";
 
   /** Ordered list of etags. */
-  private List<String> etags;
+  private List<UploadEtag> etags;
 
   /**
    * Any custom extra data committer subclasses may choose to add.
@@ -222,7 +222,7 @@ public class SinglePendingCommit extends PersistentCommitData<SinglePendingCommi
     for (CompletedPart part : parts) {
       verify(part.partNumber() == counter,
           "Expected part number %s but got %s", counter, part.partNumber());
-      etags.add(part.eTag());
+      etags.add(UploadEtag.fromCompletedPart(part));
       counter++;
     }
   }
@@ -237,9 +237,10 @@ public class SinglePendingCommit extends PersistentCommitData<SinglePendingCommi
     verify(length >= 0, "Invalid length: " + length);
     destinationPath();
     verify(etags != null, "No etag list");
-    validateCollectionClass(etags, String.class);
-    for (String etag : etags) {
-      verify(StringUtils.isNotEmpty(etag), "Empty etag");
+    validateCollectionClass(etags, UploadEtag.class);
+    for (UploadEtag etag : etags) {
+      verify(etag != null && StringUtils.isNotEmpty(etag.getEtag()),
+          "Empty etag");
     }
     if (extraData != null) {
       validateCollectionClass(extraData.keySet(), String.class);
@@ -313,7 +314,7 @@ public class SinglePendingCommit extends PersistentCommitData<SinglePendingCommi
    * @return an iterator.
    */
   @Override
-  public Iterator<String> iterator() {
+  public Iterator<UploadEtag> iterator() {
     return etags.iterator();
   }
 
@@ -442,11 +443,11 @@ public class SinglePendingCommit extends PersistentCommitData<SinglePendingCommi
   }
 
   /** @return ordered list of etags. */
-  public List<String> getEtags() {
+  public List<UploadEtag> getEtags() {
     return etags;
   }
 
-  public void setEtags(List<String> etags) {
+  public void setEtags(List<UploadEtag> etags) {
     this.etags = etags;
   }
 

+ 136 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/files/UploadEtag.java

@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.files;
+
+import java.io.Serializable;
+import java.util.StringJoiner;
+
+import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
+
+/**
+ * Stores ETag and checksum values from {@link CompletedPart} responses from S3.
+ * These values need to be stored to be later passed to the
+ * {@link software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest
+ * CompleteMultipartUploadRequest}.
+ */
+public class UploadEtag implements Serializable {
+
+  /**
+   * Serialization ID: {@value}.
+   */
+  private static final long serialVersionUID = 1L;
+
+  private String etag;
+  private String checksumAlgorithm;
+  private String checksum;
+
+  public UploadEtag() {
+  }
+
+  public UploadEtag(String etag, String checksumAlgorithm, String checksum) {
+    this.etag = etag;
+    this.checksumAlgorithm = checksumAlgorithm;
+    this.checksum = checksum;
+  }
+
+  public String getEtag() {
+    return etag;
+  }
+
+  public void setEtag(String etag) {
+    this.etag = etag;
+  }
+
+  public String getChecksumAlgorithm() {
+    return checksumAlgorithm;
+  }
+
+  public void setChecksumAlgorithm(String checksumAlgorithm) {
+    this.checksumAlgorithm = checksumAlgorithm;
+  }
+
+  public String getChecksum() {
+    return checksum;
+  }
+
+  public void setChecksum(String checksum) {
+    this.checksum = checksum;
+  }
+
+  public static UploadEtag fromCompletedPart(CompletedPart completedPart) {
+    UploadEtag uploadEtag = new UploadEtag();
+    uploadEtag.setEtag(completedPart.eTag());
+    if (completedPart.checksumCRC32() != null) {
+      uploadEtag.setChecksumAlgorithm(ChecksumAlgorithm.CRC32.toString());
+      uploadEtag.setChecksum(completedPart.checksumCRC32());
+    }
+    if (completedPart.checksumCRC32C() != null) {
+      uploadEtag.setChecksumAlgorithm(ChecksumAlgorithm.CRC32_C.toString());
+      uploadEtag.setChecksum(completedPart.checksumCRC32C());
+    }
+    if (completedPart.checksumSHA1() != null) {
+      uploadEtag.setChecksumAlgorithm(ChecksumAlgorithm.SHA1.toString());
+      uploadEtag.setChecksum(completedPart.checksumSHA1());
+    }
+    if (completedPart.checksumSHA256() != null) {
+      uploadEtag.setChecksumAlgorithm(ChecksumAlgorithm.SHA256.toString());
+      uploadEtag.setChecksum(completedPart.checksumSHA256());
+    }
+    return uploadEtag;
+  }
+
+  public static CompletedPart toCompletedPart(UploadEtag uploadEtag, int partNumber) {
+    final CompletedPart.Builder builder = CompletedPart.builder()
+        .partNumber(partNumber)
+        .eTag(uploadEtag.etag);
+    if (uploadEtag.checksumAlgorithm == null) {
+      return builder.build();
+    }
+    final ChecksumAlgorithm checksumAlgorithm = ChecksumAlgorithm.fromValue(
+        uploadEtag.checksumAlgorithm);
+    switch (checksumAlgorithm) {
+    case CRC32:
+      builder.checksumCRC32(uploadEtag.checksum);
+      break;
+    case CRC32_C:
+      builder.checksumCRC32C(uploadEtag.checksum);
+      break;
+    case SHA1:
+      builder.checksumSHA1(uploadEtag.checksum);
+      break;
+    case SHA256:
+      builder.checksumSHA256(uploadEtag.checksum);
+      break;
+    default:
+      // do nothing
+    }
+    return builder.build();
+  }
+
+  @Override
+  public String toString() {
+    return new StringJoiner(", ", UploadEtag.class.getSimpleName() + "[", "]")
+        .add("serialVersionUID='" + serialVersionUID + "'")
+        .add("etag='" + etag + "'")
+        .add("checksumAlgorithm='" + checksumAlgorithm + "'")
+        .add("checksum='" + checksum + "'")
+        .toString();
+  }
+}
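
A brief sketch of the intended round trip, mirroring the unit tests added later in this patch (the etag and checksum strings are placeholder values):

```java
import org.apache.hadoop.fs.s3a.commit.files.UploadEtag;
import software.amazon.awssdk.services.s3.model.CompletedPart;

final class UploadEtagRoundTripSketch {
  static CompletedPart roundTrip() {
    // Capture the etag and checksum of an uploaded part...
    CompletedPart uploaded = CompletedPart.builder()
        .partNumber(1)
        .eTag("etag-1")
        .checksumCRC32("AAAAAA==")
        .build();
    // ...persist it inside the pending-commit data...
    UploadEtag persisted = UploadEtag.fromCompletedPart(uploaded);
    // ...then rebuild the part for the CompleteMultipartUploadRequest.
    return UploadEtag.toCompletedPart(persisted, 1);
  }
}
```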

+ 7 - 2
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/impl/CommitOperations.java

@@ -53,6 +53,7 @@ import org.apache.hadoop.fs.s3a.S3AUtils;
 import org.apache.hadoop.fs.s3a.WriteOperations;
 import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 import org.apache.hadoop.fs.s3a.commit.PathCommitException;
+import org.apache.hadoop.fs.s3a.commit.files.UploadEtag;
 import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
 import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.fs.s3a.commit.files.SuccessData;
@@ -165,9 +166,9 @@ public class CommitOperations extends AbstractStoreOperation
    * @param tagIds list of tags
    * @return same list, now in numbered tuples
    */
-  public static List<CompletedPart> toPartEtags(List<String> tagIds) {
+  public static List<CompletedPart> toPartEtags(List<UploadEtag> tagIds) {
     return IntStream.range(0, tagIds.size())
-        .mapToObj(i -> CompletedPart.builder().partNumber(i + 1).eTag(tagIds.get(i)).build())
+        .mapToObj(i -> UploadEtag.toCompletedPart(tagIds.get(i), i + 1))
         .collect(Collectors.toList());
   }
 
@@ -655,6 +656,10 @@ public class CommitOperations extends AbstractStoreOperation
       parts.add(CompletedPart.builder()
           .partNumber(partNumber)
           .eTag(response.eTag())
+          .checksumCRC32(response.checksumCRC32())
+          .checksumCRC32C(response.checksumCRC32C())
+          .checksumSHA1(response.checksumSHA1())
+          .checksumSHA256(response.checksumSHA256())
           .build());
     }
     return parts;

+ 71 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/ChecksumSupport.java

@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.util.Set;
+
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
+import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ConfigurationHelper;
+
+import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_ALGORITHM;
+
+/**
+ * Utility class to support operations on S3 object checksum.
+ */
+public final class ChecksumSupport {
+
+  private ChecksumSupport() {
+  }
+
+  /**
+   * Checksum algorithms that are supported by S3A.
+   */
+  private static final Set<ChecksumAlgorithm> SUPPORTED_CHECKSUM_ALGORITHMS = ImmutableSet.of(
+      ChecksumAlgorithm.CRC32,
+      ChecksumAlgorithm.CRC32_C,
+      ChecksumAlgorithm.SHA1,
+      ChecksumAlgorithm.SHA256);
+
+  /**
+   * Get the checksum algorithm to be used for data integrity check of the objects in S3.
+   * This operation includes validating if the provided value is a supported checksum algorithm.
+   * @param conf configuration to scan
+   * @return the checksum algorithm to be passed on S3 requests
+   * @throws IllegalArgumentException if the checksum algorithm is not known or not supported
+   */
+  public static ChecksumAlgorithm getChecksumAlgorithm(Configuration conf) {
+    return ConfigurationHelper.resolveEnum(conf,
+        CHECKSUM_ALGORITHM,
+        ChecksumAlgorithm.class,
+        configValue -> {
+          if (StringUtils.isBlank(configValue)) {
+            return null;
+          }
+          if (ChecksumAlgorithm.CRC32_C.toString().equalsIgnoreCase(configValue)) {
+            // In case the configuration value is CRC32C, without underscore.
+            return ChecksumAlgorithm.CRC32_C;
+          }
+          throw new IllegalArgumentException("Checksum algorithm is not supported: " + configValue);
+        });
+  }
+}
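
A minimal usage sketch, matching the behaviour exercised by TestChecksumSupport further down:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.s3a.impl.ChecksumSupport;
import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;

final class ChecksumResolutionSketch {
  static ChecksumAlgorithm resolve() {
    Configuration conf = new Configuration(false);
    // "CRC32C" (no underscore) is mapped to ChecksumAlgorithm.CRC32_C.
    conf.set("fs.s3a.create.checksum.algorithm", "CRC32C");
    // An unset key resolves to null; an unknown value such as "MD5"
    // raises IllegalArgumentException.
    return ChecksumSupport.getChecksumAlgorithm(conf);
  }
}
```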

+ 50 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java

@@ -27,6 +27,7 @@ import javax.annotation.Nullable;
 
 import software.amazon.awssdk.core.SdkRequest;
 import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
 import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
 import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload;
 import software.amazon.awssdk.services.s3.model.CompletedPart;
@@ -62,6 +63,7 @@ import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
 
 import static org.apache.commons.lang3.StringUtils.isNotEmpty;
 import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PART_UPLOAD_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.SSE_C;
 import static org.apache.hadoop.fs.s3a.S3AEncryptionMethods.UNKNOWN_ALGORITHM;
 import static org.apache.hadoop.fs.s3a.impl.AWSClientConfig.setRequestTimeout;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
@@ -138,6 +140,11 @@ public class RequestFactoryImpl implements RequestFactory {
    */
   private final Duration partUploadTimeout;
 
+  /**
+   * Indicates the algorithm used to create the checksum for the object to be uploaded to S3.
+   */
+  private final ChecksumAlgorithm checksumAlgorithm;
+
   /**
    * Constructor.
    * @param builder builder with all the configuration.
@@ -153,6 +160,7 @@ public class RequestFactoryImpl implements RequestFactory {
     this.storageClass = builder.storageClass;
     this.isMultipartUploadEnabled = builder.isMultipartUploadEnabled;
     this.partUploadTimeout = builder.partUploadTimeout;
+    this.checksumAlgorithm = builder.checksumAlgorithm;
   }
 
   /**
@@ -235,6 +243,10 @@ public class RequestFactoryImpl implements RequestFactory {
       copyObjectRequestBuilder.contentEncoding(contentEncoding);
     }
 
+    if (checksumAlgorithm != null) {
+      copyObjectRequestBuilder.checksumAlgorithm(checksumAlgorithm);
+    }
+
     return copyObjectRequestBuilder;
   }
 
@@ -377,6 +389,10 @@ public class RequestFactoryImpl implements RequestFactory {
       putObjectRequestBuilder.contentEncoding(contentEncoding);
     }
 
+    if (checksumAlgorithm != null) {
+      putObjectRequestBuilder.checksumAlgorithm(checksumAlgorithm);
+    }
+
     return putObjectRequestBuilder;
   }
 
@@ -526,6 +542,10 @@ public class RequestFactoryImpl implements RequestFactory {
       requestBuilder.storageClass(storageClass);
     }
 
+    if (checksumAlgorithm != null) {
+      requestBuilder.checksumAlgorithm(checksumAlgorithm);
+    }
+
     return prepareRequest(requestBuilder);
   }
 
@@ -539,6 +559,16 @@ public class RequestFactoryImpl implements RequestFactory {
     CompleteMultipartUploadRequest.Builder requestBuilder =
         CompleteMultipartUploadRequest.builder().bucket(bucket).key(destKey).uploadId(uploadId)
             .multipartUpload(CompletedMultipartUpload.builder().parts(partETags).build());
+    // Correct SSE-C request parameters are required for this request when
+    // specifying checksums for each part
+    if (checksumAlgorithm != null && getServerSideEncryptionAlgorithm() == SSE_C) {
+      EncryptionSecretOperations.getSSECustomerKey(encryptionSecrets)
+          .ifPresent(base64customerKey -> requestBuilder
+              .sseCustomerAlgorithm(ServerSideEncryption.AES256.name())
+              .sseCustomerKey(base64customerKey)
+              .sseCustomerKeyMD5(
+                  Md5Utils.md5AsBase64(Base64.getDecoder().decode(base64customerKey))));
+    }
 
     return prepareRequest(requestBuilder);
   }
@@ -618,6 +648,11 @@ public class RequestFactoryImpl implements RequestFactory {
 
     // Set the request timeout for the part upload
     setRequestTimeout(builder, partUploadTimeout);
+
+    if (checksumAlgorithm != null) {
+      builder.checksumAlgorithm(checksumAlgorithm);
+    }
+
     return prepareRequest(builder);
   }
 
@@ -732,6 +767,11 @@ public class RequestFactoryImpl implements RequestFactory {
      */
     private Duration partUploadTimeout = DEFAULT_PART_UPLOAD_TIMEOUT;
 
+    /**
+     * Indicates the algorithm used to create the checksum for the object to be uploaded to S3.
+     */
+    private ChecksumAlgorithm checksumAlgorithm;
+
     private RequestFactoryBuilder() {
     }
 
@@ -841,6 +881,16 @@ public class RequestFactoryImpl implements RequestFactory {
       partUploadTimeout = value;
       return this;
     }
+
+    /**
+     * Indicates the algorithm used to create the checksum for the object to be uploaded to S3.
+     * @param value new value
+     * @return the builder
+     */
+    public RequestFactoryBuilder withChecksumAlgorithm(final ChecksumAlgorithm value) {
+      checksumAlgorithm = value;
+      return this;
+    }
   }
 
   /**

+ 70 - 7
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AMultipartUploader.java

@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
+import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashSet;
@@ -54,6 +55,7 @@ import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.UploadHandle;
 import org.apache.hadoop.fs.impl.AbstractMultipartUploader;
 import org.apache.hadoop.fs.s3a.WriteOperations;
+import org.apache.hadoop.fs.s3a.commit.files.UploadEtag;
 import org.apache.hadoop.fs.s3a.statistics.S3AMultipartUploaderStatistics;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 import org.apache.hadoop.util.Preconditions;
@@ -160,6 +162,13 @@ class S3AMultipartUploader extends AbstractMultipartUploader {
           UploadPartResponse response = writeOperations.uploadPart(request, body, statistics);
           statistics.partPut(lengthInBytes);
           String eTag = response.eTag();
+          String checksumAlgorithm = null;
+          String checksum = null;
+          final Map.Entry<String, String> extractedChecksum = extractChecksum(response);
+          if (extractedChecksum != null) {
+            checksumAlgorithm = extractedChecksum.getKey();
+            checksum = extractedChecksum.getValue();
+          }
           return BBPartHandle.from(
               ByteBuffer.wrap(
                   buildPartHandlePayload(
@@ -167,7 +176,9 @@ class S3AMultipartUploader extends AbstractMultipartUploader {
                       uploadIdString,
                       partNumber,
                       eTag,
-                      lengthInBytes)));
+                      lengthInBytes,
+                      checksumAlgorithm,
+                      checksum)));
         });
   }
 
@@ -203,8 +214,9 @@ class S3AMultipartUploader extends AbstractMultipartUploader {
       payload.validate(uploadIdStr, filePath);
       ids.add(payload.getPartNumber());
       totalLength += payload.getLen();
-      eTags.add(
-          CompletedPart.builder().partNumber(handle.getKey()).eTag(payload.getEtag()).build());
+      final UploadEtag uploadEtag = new UploadEtag(payload.getEtag(),
+          payload.getChecksumAlgorithm(), payload.getChecksum());
+      eTags.add(UploadEtag.toCompletedPart(uploadEtag, handle.getKey()));
     }
     Preconditions.checkArgument(ids.size() == count,
         "Duplicate PartHandles");
@@ -270,6 +282,8 @@ class S3AMultipartUploader extends AbstractMultipartUploader {
    * @param partNumber part number from response
    * @param etag upload etag
    * @param len length
+   * @param checksumAlgorithm checksum algorithm
+   * @param checksum checksum content
    * @return a byte array to marshall.
    * @throws IOException error writing the payload
    */
@@ -279,10 +293,12 @@ class S3AMultipartUploader extends AbstractMultipartUploader {
       final String uploadId,
       final int partNumber,
       final String etag,
-      final long len)
+      final long len,
+      final String checksumAlgorithm,
+      final String checksum)
       throws IOException {
 
-    return new PartHandlePayload(path, uploadId, partNumber, len, etag)
+    return new PartHandlePayload(path, uploadId, partNumber, len, etag, checksumAlgorithm, checksum)
         .toBytes();
   }
 
@@ -308,11 +324,34 @@ class S3AMultipartUploader extends AbstractMultipartUploader {
       final int partNumber = input.readInt();
       final long len = input.readLong();
       final String etag = input.readUTF();
+      String checksumAlgorithm = null;
+      String checksum = null;
+      if (input.available() > 0) {
+        checksumAlgorithm = input.readUTF();
+        checksum = input.readUTF();
+      }
       if (len < 0) {
         throw new IOException("Negative length");
       }
-      return new PartHandlePayload(path, uploadId, partNumber, len, etag);
+      return new PartHandlePayload(path, uploadId, partNumber, len, etag, checksumAlgorithm,
+          checksum);
+    }
+  }
+
+  static Map.Entry<String, String> extractChecksum(final UploadPartResponse uploadPartResponse) {
+    if (uploadPartResponse.checksumCRC32() != null) {
+      return new AbstractMap.SimpleEntry<>("CRC32", uploadPartResponse.checksumCRC32());
+    }
+    if (uploadPartResponse.checksumCRC32C() != null) {
+      return new AbstractMap.SimpleEntry<>("CRC32C", uploadPartResponse.checksumCRC32C());
+    }
+    if (uploadPartResponse.checksumSHA1() != null) {
+      return new AbstractMap.SimpleEntry<>("SHA1", uploadPartResponse.checksumSHA1());
+    }
+    if (uploadPartResponse.checksumSHA256() != null) {
+      return new AbstractMap.SimpleEntry<>("SHA256", uploadPartResponse.checksumSHA256());
     }
+    return null;
   }
 
   /**
@@ -332,12 +371,18 @@ class S3AMultipartUploader extends AbstractMultipartUploader {
 
     private final String etag;
 
+    private final String checksumAlgorithm;
+
+    private final String checksum;
+
     private PartHandlePayload(
         final String path,
         final String uploadId,
         final int partNumber,
         final long len,
-        final String etag) {
+        final String etag,
+        final String checksumAlgorithm,
+        final String checksum) {
       Preconditions.checkArgument(StringUtils.isNotEmpty(etag),
           "Empty etag");
       Preconditions.checkArgument(StringUtils.isNotEmpty(path),
@@ -346,12 +391,18 @@ class S3AMultipartUploader extends AbstractMultipartUploader {
           "Empty uploadId");
       Preconditions.checkArgument(len >= 0,
           "Invalid length");
+      Preconditions.checkArgument((StringUtils.isNotEmpty(checksumAlgorithm) &&
+              StringUtils.isNotEmpty(checksum)) ||
+              (StringUtils.isEmpty(checksumAlgorithm) && StringUtils.isEmpty(checksum)),
+          "Checksum algorithm and checksum should be both provided or empty");
 
       this.path = path;
       this.uploadId = uploadId;
       this.partNumber = partNumber;
       this.len = len;
       this.etag = etag;
+      this.checksumAlgorithm = checksumAlgorithm;
+      this.checksum = checksum;
     }
 
     public String getPath() {
@@ -374,6 +425,14 @@ class S3AMultipartUploader extends AbstractMultipartUploader {
       return uploadId;
     }
 
+    public String getChecksumAlgorithm() {
+      return checksumAlgorithm;
+    }
+
+    public String getChecksum() {
+      return checksum;
+    }
+
     public byte[] toBytes()
         throws IOException {
       Preconditions.checkArgument(StringUtils.isNotEmpty(etag),
@@ -389,6 +448,10 @@ class S3AMultipartUploader extends AbstractMultipartUploader {
         output.writeInt(partNumber);
         output.writeLong(len);
         output.writeUTF(etag);
+        if (checksumAlgorithm != null && checksum != null) {
+          output.writeUTF(checksumAlgorithm);
+          output.writeUTF(checksum);
+        }
       }
       return bytes.toByteArray();
     }
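
The checksum fields extend the part-handle payload in a wire-compatible way: writers append the two UTF strings only when both are set, and the parser probes `available()` before reading them, so payloads from older clients still parse. A self-contained sketch of that framing (simplified, without the real header, path, and length fields):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class OptionalTrailingFieldsSketch {

  static byte[] write(String etag, String algorithm, String checksum)
      throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeUTF(etag);
      // Checksum fields are all-or-nothing trailing data.
      if (algorithm != null && checksum != null) {
        out.writeUTF(algorithm);
        out.writeUTF(checksum);
      }
    }
    return bytes.toByteArray();
  }

  static String[] read(byte[] data) throws IOException {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      String etag = in.readUTF();
      String algorithm = null;
      String checksum = null;
      // Older payloads end after the etag; available() detects the extras.
      if (in.available() > 0) {
        algorithm = in.readUTF();
        checksum = in.readUTF();
      }
      return new String[] {etag, algorithm, checksum};
    }
  }
}
```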

+ 9 - 0
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md

@@ -926,6 +926,15 @@ Here are some the S3A properties for use in production.
   </description>
 </property>
 
+<property>
+  <name>fs.s3a.create.checksum.algorithm</name>
+  <description>
+    Indicates the algorithm used to create the checksum for the object
+    to be uploaded to S3. Unset by default. It supports the following values:
+    'CRC32', 'CRC32C', 'SHA1', and 'SHA256'
+  </description>
+</property>
+
 <!--
 The switch to turn S3A auditing on or off.
 -->

+ 22 - 0
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md

@@ -387,6 +387,28 @@ Happens if a multipart upload is being completed, but one of the parts is missin
 * A magic committer job's list of in-progress uploads somehow got corrupted
 * Bug in the S3A codebase (rare, but not impossible...)
 
+### <a name="object_lock_parameters"></a> Status Code 400 "Content-MD5 OR x-amz-checksum- HTTP header is required for Put Object requests with Object Lock parameters"
+```
+software.amazon.awssdk.services.s3.model.S3Exception: Content-MD5 OR x-amz-checksum- HTTP header is required for Put Object requests with Object Lock parameters (Service: S3, Status Code: 400, Request ID: 1122334455, Extended Request ID: ...):
+InvalidRequest: Content-MD5 OR x-amz-checksum- HTTP header is required for Put Object requests with Object Lock parameters (Service: S3, Status Code: 400, Request ID: 1122334455, Extended Request ID: ...)
+```
+
+This error happens if the S3 bucket has [Object Lock](https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lock.html) enabled.
+
+The Content-MD5 or x-amz-sdk-checksum-algorithm header is required for any request to upload an object
+with a retention period configured using Object Lock.
+
+If Object Lock cannot be disabled on the S3 bucket, set a checksum algorithm to be used for
+uploads via the `fs.s3a.create.checksum.algorithm` property. Note that enabling checksums on
+uploads can affect performance.
+
+```xml
+<property>
+  <name>fs.s3a.create.checksum.algorithm</name>
+  <value>SHA256</value>
+</property>
+```
+
 ## <a name="access_denied"></a> Access Denied
 
 HTTP error codes 401 and 403 are mapped to `AccessDeniedException` in the S3A connector.

+ 120 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AChecksum.java

@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
+import software.amazon.awssdk.services.s3.model.ChecksumMode;
+import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
+import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.impl.ChecksumSupport;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.rm;
+import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_ALGORITHM;
+import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.REJECT_OUT_OF_SPAN_OPERATIONS;
+
+/**
+ * Tests S3 checksum algorithm.
+ * If the CHECKSUM_ALGORITHM config is not set in auth-keys.xml,
+ * the SHA256 algorithm is used.
+ */
+public class ITestS3AChecksum extends AbstractS3ATestBase {
+
+  private static final ChecksumAlgorithm DEFAULT_CHECKSUM_ALGORITHM = ChecksumAlgorithm.SHA256;
+
+  private ChecksumAlgorithm checksumAlgorithm;
+
+  private static final int[] SIZES = {
+      1, 2, 3, 4, 5, 254, 255, 256, 257, (1 << 12) - 1 // 4095; '2 ^ 12' would be XOR in Java
+  };
+
+  @Override
+  protected Configuration createConfiguration() {
+    final Configuration conf = super.createConfiguration();
+    S3ATestUtils.removeBaseAndBucketOverrides(conf,
+        CHECKSUM_ALGORITHM,
+        REJECT_OUT_OF_SPAN_OPERATIONS);
+    S3ATestUtils.disableFilesystemCaching(conf);
+    checksumAlgorithm = ChecksumSupport.getChecksumAlgorithm(conf);
+    if (checksumAlgorithm == null) {
+      checksumAlgorithm = DEFAULT_CHECKSUM_ALGORITHM;
+      LOG.info("No checksum algorithm found in configuration, will use default {}",
+          checksumAlgorithm);
+      conf.set(CHECKSUM_ALGORITHM, checksumAlgorithm.toString());
+    }
+    conf.setBoolean(REJECT_OUT_OF_SPAN_OPERATIONS, false);
+    return conf;
+  }
+
+  @Test
+  public void testChecksum() throws IOException {
+    for (int size : SIZES) {
+      validateChecksumForFilesize(size);
+    }
+  }
+
+  private void validateChecksumForFilesize(int len) throws IOException {
+    describe("Create a file of size " + len);
+    String src = String.format("%s-%04x", methodName.getMethodName(), len);
+    Path path = writeThenReadFile(src, len);
+    assertChecksum(path);
+    rm(getFileSystem(), path, false, false);
+  }
+
+  private void assertChecksum(Path path) throws IOException {
+    final String key = getFileSystem().pathToKey(path);
+    HeadObjectRequest.Builder requestBuilder = getFileSystem().getRequestFactory()
+        .newHeadObjectRequestBuilder(key)
+        .checksumMode(ChecksumMode.ENABLED);
+    HeadObjectResponse headObject = getFileSystem().getS3AInternals()
+        .getAmazonS3Client("Call head object with checksum enabled")
+        .headObject(requestBuilder.build());
+    switch (checksumAlgorithm) {
+    case CRC32:
+      Assertions.assertThat(headObject.checksumCRC32())
+          .describedAs("headObject.checksumCRC32()")
+          .isNotNull();
+      break;
+    case CRC32_C:
+      Assertions.assertThat(headObject.checksumCRC32C())
+          .describedAs("headObject.checksumCRC32C()")
+          .isNotNull();
+      break;
+    case SHA1:
+      Assertions.assertThat(headObject.checksumSHA1())
+          .describedAs("headObject.checksumSHA1()")
+          .isNotNull();
+      break;
+    case SHA256:
+      Assertions.assertThat(headObject.checksumSHA256())
+          .describedAs("headObject.checksumSHA256()")
+          .isNotNull();
+      break;
+    default:
+      fail("Checksum algorithm not supported: " + checksumAlgorithm);
+    }
+  }
+
+}

+ 16 - 2
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ADeleteOnExit.java

@@ -19,7 +19,7 @@
 package org.apache.hadoop.fs.s3a;
 
 import static org.apache.hadoop.fs.s3a.Constants.FS_S3A;
-import static org.junit.Assert.assertEquals;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.Mockito.when;
 
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.Date;
 
+import org.assertj.core.api.Assertions;
 import software.amazon.awssdk.services.s3.S3Client;
 import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
 import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
@@ -86,7 +87,20 @@ public class TestS3ADeleteOnExit extends AbstractS3AMockTest {
 
     testFs.deleteOnExit(path);
     testFs.close();
-    assertEquals(0, testFs.getDeleteOnDnExitCount());
+    Assertions.assertThat(testFs.getDeleteOnDnExitCount()).isEqualTo(0);
+  }
+
+  @Test
+  public void testCreateRequestFactoryWithInvalidChecksumAlgorithm() throws Exception {
+    Configuration conf = createConfiguration();
+    conf.set(Constants.CHECKSUM_ALGORITHM, "INVALID");
+    TestS3AFileSystem testFs  = new TestS3AFileSystem();
+    URI uri = URI.create(FS_S3A + "://" + BUCKET);
+    final IllegalArgumentException exception = intercept(IllegalArgumentException.class,
+        () -> testFs.initialize(uri, conf));
+    Assertions.assertThat(exception.getMessage())
+        .describedAs("Error message should say that INVALID is not supported")
+        .isEqualTo("Checksum algorithm is not supported: INVALID");
   }
 
   private ArgumentMatcher<HeadObjectRequest> correctGetMetadataRequest(

+ 5 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestCustomSigner.java

@@ -217,6 +217,11 @@ public class ITestCustomSigner extends AbstractS3ATestBase {
     conf.set(TEST_ID_KEY, identifier);
     conf.set(TEST_REGION_KEY, regionName);
 
+    // Having the checksum algorithm in this test causes
+    // x-amz-sdk-checksum-algorithm specified, but no corresponding
+    // x-amz-checksum-* or x-amz-trailer headers were found
+    conf.unset(Constants.CHECKSUM_ALGORITHM);
+
     // make absolutely sure there is no caching.
     disableFilesystemCaching(conf);
 

+ 4 - 3
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestCommitOperations.java

@@ -40,6 +40,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.auth.ProgressCounter;
+import org.apache.hadoop.fs.s3a.commit.files.UploadEtag;
 import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.fs.s3a.commit.impl.CommitContext;
 import org.apache.hadoop.fs.s3a.commit.impl.CommitOperations;
@@ -442,11 +443,11 @@ public class ITestCommitOperations extends AbstractCommitITest {
     Assertions.assertThat(persisted.getSaved())
         .describedAs("saved timestamp in %s", persisted)
         .isGreaterThan(0);
-    List<String> etags = persisted.getEtags();
-    Assertions.assertThat(etags)
+    List<UploadEtag> uploadEtags = persisted.getEtags();
+    Assertions.assertThat(uploadEtags)
         .describedAs("Etag list")
         .hasSize(1);
-    Assertions.assertThat(CommitOperations.toPartEtags(etags))
+    Assertions.assertThat(uploadEtags)
         .describedAs("Etags to parts")
         .hasSize(1);
     return pendingDataPath;

+ 169 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/files/TestUploadEtag.java

@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.commit.files;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.s3.model.CompletedPart;
+
+public class TestUploadEtag {
+
+  @Test
+  public void testFromCompletedPartCRC32() {
+    final CompletedPart completedPart = CompletedPart.builder()
+        .eTag("tag")
+        .checksumCRC32("checksum")
+        .build();
+    final UploadEtag uploadEtag = UploadEtag.fromCompletedPart(completedPart);
+    Assertions.assertThat(uploadEtag.getEtag())
+        .describedAs("Etag mismatch")
+        .isEqualTo("tag");
+    Assertions.assertThat(uploadEtag.getChecksumAlgorithm())
+        .describedAs("Checksum algorithm should be CRC32")
+        .isEqualTo("CRC32");
+    Assertions.assertThat(uploadEtag.getChecksum())
+        .describedAs("Checksum mismatch")
+        .isEqualTo("checksum");
+  }
+
+  @Test
+  public void testFromCompletedPartCRC32C() {
+    final CompletedPart completedPart = CompletedPart.builder()
+        .eTag("tag")
+        .checksumCRC32C("checksum")
+        .build();
+    final UploadEtag uploadEtag = UploadEtag.fromCompletedPart(completedPart);
+    Assertions.assertThat(uploadEtag.getEtag())
+        .describedAs("Etag mismatch")
+        .isEqualTo("tag");
+    Assertions.assertThat(uploadEtag.getChecksumAlgorithm())
+        .describedAs("Checksum algorithm should be CRC32C")
+        .isEqualTo("CRC32C");
+    Assertions.assertThat(uploadEtag.getChecksum())
+        .describedAs("Checksum mismatch")
+        .isEqualTo("checksum");
+  }
+
+  @Test
+  public void testFromCompletedPartSHA1() {
+    final CompletedPart completedPart = CompletedPart.builder()
+        .eTag("tag")
+        .checksumSHA1("checksum")
+        .build();
+    final UploadEtag uploadEtag = UploadEtag.fromCompletedPart(completedPart);
+    Assertions.assertThat(uploadEtag.getEtag())
+        .describedAs("Etag mismatch")
+        .isEqualTo("tag");
+    Assertions.assertThat(uploadEtag.getChecksumAlgorithm())
+        .describedAs("Checksum algorithm should be SHA1")
+        .isEqualTo("SHA1");
+    Assertions.assertThat(uploadEtag.getChecksum())
+        .describedAs("Checksum mismatch")
+        .isEqualTo("checksum");
+  }
+
+  @Test
+  public void testFromCompletedPartSHA256() {
+    final CompletedPart completedPart = CompletedPart.builder()
+        .eTag("tag")
+        .checksumSHA256("checksum")
+        .build();
+    final UploadEtag uploadEtag = UploadEtag.fromCompletedPart(completedPart);
+    Assertions.assertThat(uploadEtag.getEtag())
+        .describedAs("Etag mismatch")
+        .isEqualTo("tag");
+    Assertions.assertThat(uploadEtag.getChecksumAlgorithm())
+        .describedAs("Checksum algorithm should be SHA256")
+        .isEqualTo("SHA256");
+    Assertions.assertThat(uploadEtag.getChecksum())
+        .describedAs("Checksum mismatch")
+        .isEqualTo("checksum");
+  }
+
+  @Test
+  public void testFromCompletedPartNoChecksum() {
+    final CompletedPart completedPart = CompletedPart.builder()
+        .eTag("tag")
+        .build();
+    final UploadEtag uploadEtag = UploadEtag.fromCompletedPart(completedPart);
+    Assertions.assertThat(uploadEtag.getEtag())
+        .describedAs("Etag mismatch")
+        .isEqualTo("tag");
+    Assertions.assertThat(uploadEtag.getChecksumAlgorithm())
+        .describedAs("uploadEtag.getChecksumAlgorithm()")
+        .isNull();
+    Assertions.assertThat(uploadEtag.getChecksum())
+        .describedAs("uploadEtag.getChecksum()")
+        .isNull();
+  }
+
+  @Test
+  public void testToCompletedPartCRC32() {
+    final UploadEtag uploadEtag = new UploadEtag("tag", "CRC32", "checksum");
+    final CompletedPart completedPart = UploadEtag.toCompletedPart(uploadEtag, 1);
+    Assertions.assertThat(completedPart.checksumCRC32())
+        .describedAs("Checksum mismatch")
+        .isEqualTo("checksum");
+  }
+
+  @Test
+  public void testToCompletedPartCRC32C() {
+    final UploadEtag uploadEtag = new UploadEtag("tag", "CRC32C", "checksum");
+    final CompletedPart completedPart = UploadEtag.toCompletedPart(uploadEtag, 1);
+    Assertions.assertThat(completedPart.checksumCRC32C())
+        .describedAs("Checksum mismatch")
+        .isEqualTo("checksum");
+  }
+
+  @Test
+  public void testToCompletedPartSHA1() {
+    final UploadEtag uploadEtag = new UploadEtag("tag", "SHA1", "checksum");
+    final CompletedPart completedPart = UploadEtag.toCompletedPart(uploadEtag, 1);
+    Assertions.assertThat(completedPart.checksumSHA1())
+        .describedAs("Checksum mismatch")
+        .isEqualTo("checksum");
+  }
+
+  @Test
+  public void testToCompletedPartSHA256() {
+    final UploadEtag uploadEtag = new UploadEtag("tag", "SHA256", "checksum");
+    final CompletedPart completedPart = UploadEtag.toCompletedPart(uploadEtag, 1);
+    Assertions.assertThat(completedPart.checksumSHA256())
+        .describedAs("Checksum mismatch")
+        .isEqualTo("checksum");
+  }
+
+  @Test
+  public void testToCompletedPartNoChecksum() {
+    final UploadEtag uploadEtag = new UploadEtag("tag", null, null);
+    final CompletedPart completedPart = UploadEtag.toCompletedPart(uploadEtag, 1);
+    Assertions.assertThat(completedPart.checksumCRC32())
+        .describedAs("completedPart.checksumCRC32()")
+        .isNull();
+    Assertions.assertThat(completedPart.checksumCRC32C())
+        .describedAs("completedPart.checksumCRC32C()")
+        .isNull();
+    Assertions.assertThat(completedPart.checksumSHA1())
+        .describedAs("completedPart.checksumSHA1()")
+        .isNull();
+    Assertions.assertThat(completedPart.checksumSHA256())
+        .describedAs("completedPart.checksumSHA256()")
+        .isNull();
+  }
+}

+ 1 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java

@@ -780,7 +780,7 @@ public class TestStagingCommitter extends StagingTestBase.MiniDFSTest {
 
     for (int i = 0; i < tags.size(); i += 1) {
       assertEquals("Should commit the correct part tags",
-          tags.get(i), commit.getEtags().get(i));
+          tags.get(i), commit.getEtags().get(i).getEtag());
     }
   }
 

+ 4 - 3
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingPartitionedJobCommit.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.MockS3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.commit.PathCommitException;
+import org.apache.hadoop.fs.s3a.commit.files.UploadEtag;
 import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
 import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.fs.s3a.commit.impl.CommitContext;
@@ -100,9 +101,9 @@ public class TestStagingPartitionedJobCommit
           commit.setDestinationKey(key);
           commit.setUri("s3a://" + BUCKET + "/" + key);
           commit.setUploadId(uploadId);
-          ArrayList<String> etags = new ArrayList<>();
-          etags.add("tag1");
-          commit.setEtags(etags);
+          ArrayList<UploadEtag> uploadEtags = new ArrayList<>();
+          uploadEtags.add(new UploadEtag("tag1", null, null));
+          commit.setEtags(uploadEtags);
           pendingSet.add(commit);
           // register the upload so commit operations are not rejected
           getMockResults().addUpload(uploadId, key);

+ 60 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestChecksumSupport.java

@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
+
+import org.apache.hadoop.conf.Configuration;
+
+import static org.apache.hadoop.fs.s3a.Constants.CHECKSUM_ALGORITHM;
+
+public class TestChecksumSupport {
+
+  @ParameterizedTest
+  @EnumSource(value = ChecksumAlgorithm.class, names = {"CRC32", "CRC32_C", "SHA1", "SHA256"})
+  public void testGetSupportedChecksumAlgorithm(ChecksumAlgorithm checksumAlgorithm) {
+    final Configuration conf = new Configuration();
+    conf.set(CHECKSUM_ALGORITHM, checksumAlgorithm.toString());
+    Assertions.assertThat(ChecksumSupport.getChecksumAlgorithm(conf))
+        .describedAs("Checksum algorithm must match value set in the configuration")
+        .isEqualTo(checksumAlgorithm);
+  }
+
+  @Test
+  public void testGetChecksumAlgorithmWhenNull() {
+    final Configuration conf = new Configuration();
+    conf.unset(CHECKSUM_ALGORITHM);
+    Assertions.assertThat(ChecksumSupport.getChecksumAlgorithm(conf))
+        .describedAs("If configuration is not set, checksum algorithm must be null")
+        .isNull();
+  }
+
+  @Test
+  public void testGetNotSupportedChecksumAlgorithm() {
+    final Configuration conf = new Configuration();
+    conf.set(CHECKSUM_ALGORITHM, "INVALID");
+    Assertions.assertThatThrownBy(() -> ChecksumSupport.getChecksumAlgorithm(conf))
+        .describedAs("Invalid checksum algorithm should throw an exception")
+        .isInstanceOf(IllegalArgumentException.class);
+  }
+}

+ 66 - 2
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java

@@ -19,22 +19,31 @@
 package org.apache.hadoop.fs.s3a.impl;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Base64;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
 import software.amazon.awssdk.awscore.AwsRequest;
 import software.amazon.awssdk.awscore.AwsRequestOverrideConfiguration;
 import software.amazon.awssdk.core.SdkRequest;
+import software.amazon.awssdk.services.s3.model.ChecksumAlgorithm;
+import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest;
+import software.amazon.awssdk.services.s3.model.CopyObjectRequest;
+import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest;
 import software.amazon.awssdk.services.s3.model.HeadObjectResponse;
 import org.assertj.core.api.Assertions;
-import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import software.amazon.awssdk.services.s3.model.PutObjectRequest;
 import software.amazon.awssdk.services.s3.model.S3Request;
+import software.amazon.awssdk.services.s3.model.ServerSideEncryption;
 import software.amazon.awssdk.services.s3.model.UploadPartRequest;
-
+import software.amazon.awssdk.utils.Md5Utils;
 
 import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
@@ -273,4 +282,59 @@ public class TestRequestFactory extends AbstractHadoopTestBase {
     assertApiTimeouts(partDuration, upload);
 
   }
+
+  @ParameterizedTest
+  @EnumSource(value = ChecksumAlgorithm.class, names = {"CRC32", "CRC32_C", "SHA1", "SHA256"})
+  public void testRequestFactoryWithChecksumAlgorithm(ChecksumAlgorithm checksumAlgorithm)
+      throws IOException {
+    String path = "path";
+    String path2 = "path2";
+    HeadObjectResponse md = HeadObjectResponse.builder().contentLength(128L).build();
+
+    RequestFactory factory = RequestFactoryImpl.builder()
+        .withBucket("bucket")
+        .withChecksumAlgorithm(checksumAlgorithm)
+        .build();
+    createFactoryObjects(factory);
+
+    final CopyObjectRequest copyObjectRequest = factory.newCopyObjectRequestBuilder(path,
+            path2, md).build();
+    Assertions.assertThat(copyObjectRequest.checksumAlgorithm()).isEqualTo(checksumAlgorithm);
+
+    final PutObjectRequest putObjectRequest = factory.newPutObjectRequestBuilder(path,
+        PutObjectOptions.defaultOptions(), 1024, false).build();
+    Assertions.assertThat(putObjectRequest.checksumAlgorithm()).isEqualTo(checksumAlgorithm);
+
+    final CreateMultipartUploadRequest multipartUploadRequest =
+        factory.newMultipartUploadRequestBuilder(path, null).build();
+    Assertions.assertThat(multipartUploadRequest.checksumAlgorithm()).isEqualTo(checksumAlgorithm);
+
+    final UploadPartRequest uploadPartRequest = factory.newUploadPartRequestBuilder(path,
+        "id", 2, true, 128_000_000).build();
+    Assertions.assertThat(uploadPartRequest.checksumAlgorithm()).isEqualTo(checksumAlgorithm);
+  }
+
+  @Test
+  public void testCompleteMultipartUploadRequestWithChecksumAlgorithmAndSSEC() throws IOException {
+    final byte[] encryptionKey = "encryptionKey".getBytes(StandardCharsets.UTF_8);
+    final String encryptionKeyBase64 = Base64.getEncoder()
+        .encodeToString(encryptionKey);
+    final String encryptionKeyMd5 = Md5Utils.md5AsBase64(encryptionKey);
+    final EncryptionSecrets encryptionSecrets = new EncryptionSecrets(S3AEncryptionMethods.SSE_C,
+        encryptionKeyBase64, null);
+    RequestFactory factory = RequestFactoryImpl.builder()
+        .withBucket("bucket")
+        .withChecksumAlgorithm(ChecksumAlgorithm.CRC32_C)
+        .withEncryptionSecrets(encryptionSecrets)
+        .build();
+    createFactoryObjects(factory);
+
+    final CompleteMultipartUploadRequest request =
+        factory.newCompleteMultipartUploadRequestBuilder("path", "1", new ArrayList<>())
+            .build();
+    Assertions.assertThat(request.sseCustomerAlgorithm())
+        .isEqualTo(ServerSideEncryption.AES256.name());
+    Assertions.assertThat(request.sseCustomerKey()).isEqualTo(encryptionKeyBase64);
+    Assertions.assertThat(request.sseCustomerKeyMD5()).isEqualTo(encryptionKeyMd5);
+  }
 }

+ 116 - 7
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestS3AMultipartUploaderSupport.java

@@ -20,14 +20,18 @@ package org.apache.hadoop.fs.s3a.impl;
 
 import java.io.EOFException;
 import java.io.IOException;
+import java.util.Map;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
+import software.amazon.awssdk.services.s3.model.UploadPartResponse;
 
 import org.apache.hadoop.test.HadoopTestBase;
 
 import static org.apache.hadoop.fs.s3a.impl.S3AMultipartUploader.PartHandlePayload;
 import static org.apache.hadoop.fs.s3a.impl.S3AMultipartUploader.buildPartHandlePayload;
 import static org.apache.hadoop.fs.s3a.impl.S3AMultipartUploader.parsePartHandlePayload;
+import static org.apache.hadoop.fs.s3a.impl.S3AMultipartUploader.extractChecksum;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
 /**
@@ -41,35 +45,60 @@ public class TestS3AMultipartUploaderSupport extends HadoopTestBase {
 
   @Test
   public void testRoundTrip() throws Throwable {
-    PartHandlePayload result = roundTrip(999, "tag", 1);
+    PartHandlePayload result = roundTrip(999, "tag", 1, null, null);
     assertEquals(PATH, result.getPath());
     assertEquals(UPLOAD, result.getUploadId());
     assertEquals(999, result.getPartNumber());
     assertEquals("tag", result.getEtag());
     assertEquals(1, result.getLen());
+    Assertions.assertThat(result.getChecksumAlgorithm())
+        .describedAs("Checksum algorithm must not be present").isNull();
+    Assertions.assertThat(result.getChecksum())
+        .describedAs("Checksum must not be generated").isNull();
   }
 
   @Test
   public void testRoundTrip2() throws Throwable {
     long len = 1L + Integer.MAX_VALUE;
     PartHandlePayload result =
-        roundTrip(1, "11223344", len);
+        roundTrip(1, "11223344", len, null, null);
     assertEquals(1, result.getPartNumber());
     assertEquals("11223344", result.getEtag());
     assertEquals(len, result.getLen());
+    Assertions.assertThat(result.getChecksumAlgorithm())
+        .describedAs("Checksum algorithm must not be present").isNull();
+    Assertions.assertThat(result.getChecksum())
+        .describedAs("Checksum must not be generated").isNull();
+  }
+
+  @Test
+  public void testRoundTripWithChecksum() throws Throwable {
+    PartHandlePayload result = roundTrip(999, "tag", 1,
+        "SHA256", "checksum");
+    assertEquals(PATH, result.getPath());
+    assertEquals(UPLOAD, result.getUploadId());
+    assertEquals(999, result.getPartNumber());
+    assertEquals("tag", result.getEtag());
+    assertEquals(1, result.getLen());
+    Assertions.assertThat(result.getChecksumAlgorithm())
+        .describedAs("Expect the checksum algorithm to be SHA256")
+        .isEqualTo("SHA256");
+    Assertions.assertThat(result.getChecksum())
+        .describedAs("Checksum must be set")
+        .isEqualTo("checksum");
   }
 
   @Test
   public void testNoEtag() throws Throwable {
     intercept(IllegalArgumentException.class,
         () -> buildPartHandlePayload(PATH, UPLOAD,
-            0, "", 1));
+            0, "", 1, null, null));
   }
 
   @Test
   public void testNoLen() throws Throwable {
     intercept(IllegalArgumentException.class,
-        () -> buildPartHandlePayload(PATH, UPLOAD, 0, "tag", -1));
+        () -> buildPartHandlePayload(PATH, UPLOAD, 0, "tag", -1, null, null));
   }
 
   @Test
@@ -80,17 +109,97 @@ public class TestS3AMultipartUploaderSupport extends HadoopTestBase {
 
   @Test
   public void testBadHeader() throws Throwable {
-    byte[] bytes = buildPartHandlePayload(PATH, UPLOAD, 0, "tag", 1);
+    byte[] bytes = buildPartHandlePayload(PATH, UPLOAD, 0, "tag", 1, null, null);
     bytes[2] = 'f';
     intercept(IOException.class, "header",
         () -> parsePartHandlePayload(bytes));
   }
 
+  @Test
+  public void testNoChecksumAlgorithm() throws Exception {
+    intercept(IllegalArgumentException.class,
+        () -> buildPartHandlePayload(PATH, UPLOAD,
+            999, "tag", 1, "", "checksum"));
+  }
+
+  @Test
+  public void testNoChecksum() throws Exception {
+    intercept(IllegalArgumentException.class,
+        () -> buildPartHandlePayload(PATH, UPLOAD,
+            999, "tag", 1, "SHA256", ""));
+  }
+
+  @Test
+  public void testExtractChecksumCRC32() {
+    final UploadPartResponse uploadPartResponse = UploadPartResponse.builder()
+        .checksumCRC32("checksum")
+        .build();
+    final Map.Entry<String, String> checksum = extractChecksum(uploadPartResponse);
+    Assertions.assertThat(checksum.getKey())
+        .describedAs("Expect the checksum algorithm to be CRC32")
+        .isEqualTo("CRC32");
+    Assertions.assertThat(checksum.getValue())
+        .describedAs("Checksum must be set")
+        .isEqualTo("checksum");
+  }
+
+  @Test
+  public void testExtractChecksumCRC32C() {
+    final UploadPartResponse uploadPartResponse = UploadPartResponse.builder()
+        .checksumCRC32C("checksum")
+        .build();
+    final Map.Entry<String, String> checksum = extractChecksum(uploadPartResponse);
+    Assertions.assertThat(checksum.getKey())
+        .describedAs("Expect the checksum algorithm to be CRC32C")
+        .isEqualTo("CRC32C");
+    Assertions.assertThat(checksum.getValue())
+        .describedAs("Checksum must be set")
+        .isEqualTo("checksum");
+  }
+
+  @Test
+  public void testExtractChecksumSHA1() {
+    final UploadPartResponse uploadPartResponse = UploadPartResponse.builder()
+        .checksumSHA1("checksum")
+        .build();
+    final Map.Entry<String, String> checksum = extractChecksum(uploadPartResponse);
+    Assertions.assertThat(checksum.getKey())
+        .describedAs("Expect the checksum algorithm to be SHA1")
+        .isEqualTo("SHA1");
+    Assertions.assertThat(checksum.getValue())
+        .describedAs("Checksum must be set")
+        .isEqualTo("checksum");
+  }
+
+  @Test
+  public void testExtractChecksumSHA256() {
+    final UploadPartResponse uploadPartResponse = UploadPartResponse.builder()
+        .checksumSHA256("checksum")
+        .build();
+    final Map.Entry<String, String> checksum = extractChecksum(uploadPartResponse);
+    Assertions.assertThat(checksum.getKey())
+        .describedAs("Expect the checksum algorithm to be SHA256")
+        .isEqualTo("SHA256");
+    Assertions.assertThat(checksum.getValue())
+        .describedAs("Checksum must be set")
+        .isEqualTo("checksum");
+  }
+
+  @Test
+  public void testExtractChecksumEmpty() {
+    final UploadPartResponse uploadPartResponse = UploadPartResponse.builder().build();
+    final Map.Entry<String, String> checksum = extractChecksum(uploadPartResponse);
+    assertNull(checksum);
+  }
+
   private PartHandlePayload roundTrip(
       int partNumber,
       String tag,
-      long len) throws IOException {
-    byte[] bytes = buildPartHandlePayload(PATH, UPLOAD, partNumber, tag, len);
+      long len,
+      String checksumAlgorithm,
+      String checksum) throws IOException {
+    byte[] bytes = buildPartHandlePayload(PATH, UPLOAD, partNumber, tag, len,
+        checksumAlgorithm, checksum);
     return parsePartHandlePayload(bytes);
   }
 }