
HADOOP-18910: [ABFS] Adding Support for MD5 Hash based integrity verification of the request content during transport (#6069)

Contributed By: Anuj Modi
Anuj Modi authored 1 year ago · commit 99b9e7fb43
13 changed files with 558 additions and 12 deletions
  1. +13 -1    hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
  2. +1 -0     hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java
  3. +3 -0     hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
  4. +1 -0     hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java
  5. +1 -0     hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java
  6. +54 -0    hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsDriverException.java
  7. +56 -0    hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsInvalidChecksumException.java
  8. +2 -0     hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java
  9. +143 -4   hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
  10. +15 -6   hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
  11. +3 -0    hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
  12. +261 -0  hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemChecksum.java
  13. +5 -1    hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java
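The whole feature hangs off one new client-side switch, `fs.azure.enable.checksum.validation`, which defaults to false. A minimal sketch of enabling it for a job, assuming a hypothetical container URI and file path (not part of this commit):

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class AbfsChecksumQuickStart {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // New key introduced by this commit; default is false.
        conf.setBoolean("fs.azure.enable.checksum.validation", true);
        // Hypothetical container and path, for illustration only.
        URI uri = new URI("abfs://container@account.dfs.core.windows.net/");
        try (FileSystem fs = FileSystem.get(uri, conf)) {
          Path p = new Path("/tmp/checksum-demo.bin");
          try (FSDataOutputStream out = fs.create(p)) {
            out.write(new byte[]{1, 2, 3});   // append carries a Content-MD5 header
          }
          byte[] buf = new byte[3];
          try (FSDataInputStream in = fs.open(p)) {
            in.readFully(0, buf);             // ranged read verified against the server MD5
          }
        }
      }
    }

With the flag left at its default, none of the new header or verification logic is exercised, so existing workloads are unaffected.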

+ 13 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

@@ -359,8 +359,11 @@ public class AbfsConfiguration{
          FS_AZURE_ABFS_RENAME_RESILIENCE, DefaultValue = DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE)
   private boolean renameResilience;
 
-  private String clientProvidedEncryptionKey;
+  @BooleanConfigurationValidatorAnnotation(ConfigurationKey =
+      FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, DefaultValue = DEFAULT_ENABLE_ABFS_CHECKSUM_VALIDATION)
+  private boolean isChecksumValidationEnabled;
 
+  private String clientProvidedEncryptionKey;
   private String clientProvidedEncryptionKeySHA;
 
   public AbfsConfiguration(final Configuration rawConfig, String accountName)
@@ -1240,4 +1243,13 @@ public class AbfsConfiguration{
   void setRenameResilience(boolean actualResilience) {
     renameResilience = actualResilience;
   }
+
+  public boolean getIsChecksumValidationEnabled() {
+    return isChecksumValidationEnabled;
+  }
+
+  @VisibleForTesting
+  public void setIsChecksumValidationEnabled(boolean isChecksumValidationEnabled) {
+    this.isChecksumValidationEnabled = isChecksumValidationEnabled;
+  }
 }
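The new getter/setter pair is how the tests later in this commit toggle the feature on a live filesystem instance; a small sketch, assuming an existing `AzureBlobFileSystem` named `fs` as those tests use:

    // Sketch only: fs is an AzureBlobFileSystem, as in the integration tests below.
    AbfsConfiguration abfsConf = fs.getAbfsStore().getAbfsConfiguration();
    abfsConf.setIsChecksumValidationEnabled(true);                // @VisibleForTesting setter
    boolean enabled = abfsConf.getIsChecksumValidationEnabled();  // now true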

+ 1 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java

@@ -93,6 +93,7 @@ public final class AbfsHttpConstants {
   public static final String FORWARD_SLASH_ENCODE = "%2F";
   public static final String AZURE_DISTRIBUTED_FILE_SYSTEM_AUTHORITY_DELIMITER = "@";
   public static final String UTF_8 = "utf-8";
+  public static final String MD5 = "MD5";
   public static final String GMT_TIMEZONE = "GMT";
   public static final String APPLICATION_JSON = "application/json";
   public static final String APPLICATION_OCTET_STREAM = "application/octet-stream";

+ 3 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

@@ -275,6 +275,9 @@ public final class ConfigurationKeys {
   /** Add extra resilience to rename failures, at the expense of performance. */
   public static final String FS_AZURE_ABFS_RENAME_RESILIENCE = "fs.azure.enable.rename.resilience";
 
+  /** Add extra layer of verification of the integrity of the request content during transport: {@value}. */
+  public static final String FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION = "fs.azure.enable.checksum.validation";
+
   public static String accountProperty(String property, String account) {
     return property + "." + account;
   }
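Like other ABFS keys, the new setting can presumably also be scoped to a single storage account through the `accountProperty` helper shown above; a hedged sketch with a hypothetical account name:

    // Hypothetical account name; accountProperty(...) just appends "." + account.
    Configuration conf = new Configuration();
    String account = "myaccount.dfs.core.windows.net";
    String scopedKey = ConfigurationKeys.accountProperty(
        ConfigurationKeys.FS_AZURE_ABFS_ENABLE_CHECKSUM_VALIDATION, account);
    // scopedKey == "fs.azure.enable.checksum.validation.myaccount.dfs.core.windows.net"
    conf.setBoolean(scopedKey, true);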

+ 1 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

@@ -133,6 +133,7 @@ public final class FileSystemConfigurations {
   public static final int STREAM_ID_LEN = 12;
   public static final boolean DEFAULT_ENABLE_ABFS_LIST_ITERATOR = true;
   public static final boolean DEFAULT_ENABLE_ABFS_RENAME_RESILIENCE = true;
+  public static final boolean DEFAULT_ENABLE_ABFS_CHECKSUM_VALIDATION = false;
 
   /**
    * Limit of queued block upload operations before writes

+ 1 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java

@@ -72,6 +72,7 @@ public final class HttpHeaderConfigurations {
   public static final String X_MS_PROPOSED_LEASE_ID = "x-ms-proposed-lease-id";
   public static final String X_MS_LEASE_BREAK_PERIOD = "x-ms-lease-break-period";
   public static final String EXPECT = "Expect";
+  public static final String X_MS_RANGE_GET_CONTENT_MD5 = "x-ms-range-get-content-md5";
 
   private HttpHeaderConfigurations() {}
 }

+ 54 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsDriverException.java

@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.fs.azurebfs.contracts.exceptions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
+
+/**
+ * Exception to be thrown if any Runtime Exception occurs.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class AbfsDriverException extends AbfsRestOperationException {
+
+  private static final String ERROR_MESSAGE = "Runtime Exception Occurred In ABFS Driver";
+
+  public AbfsDriverException(final Exception innerException) {
+    super(
+        AzureServiceErrorCode.UNKNOWN.getStatusCode(),
+        AzureServiceErrorCode.UNKNOWN.getErrorCode(),
+        innerException != null
+            ? innerException.toString()
+            : ERROR_MESSAGE,
+        innerException);
+  }
+
+  public AbfsDriverException(final Exception innerException, final String activityId) {
+    super(
+        AzureServiceErrorCode.UNKNOWN.getStatusCode(),
+        AzureServiceErrorCode.UNKNOWN.getErrorCode(),
+        innerException != null
+            ? innerException.toString() + ", rId: " + activityId
+            : ERROR_MESSAGE + ", rId: " + activityId,
+        null);
+  }
+}

+ 56 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/exceptions/AbfsInvalidChecksumException.java

@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hadoop.fs.azurebfs.contracts.exceptions;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
+
+/**
+ * Exception to wrap invalid checksum verification on client side.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class AbfsInvalidChecksumException extends AbfsRestOperationException {
+
+  private static final String ERROR_MESSAGE = "Checksum Validation Failed, MD5 Mismatch Error";
+
+  public AbfsInvalidChecksumException(final AbfsRestOperationException abfsRestOperationException) {
+    super(
+        abfsRestOperationException != null
+            ? abfsRestOperationException.getStatusCode()
+            : AzureServiceErrorCode.UNKNOWN.getStatusCode(),
+        abfsRestOperationException != null
+            ? abfsRestOperationException.getErrorCode().getErrorCode()
+            : AzureServiceErrorCode.UNKNOWN.getErrorCode(),
+        abfsRestOperationException != null
+            ? abfsRestOperationException.toString()
+            : ERROR_MESSAGE,
+        abfsRestOperationException);
+  }
+
+  public AbfsInvalidChecksumException(final String activityId) {
+    super(
+        AzureServiceErrorCode.UNKNOWN.getStatusCode(),
+        AzureServiceErrorCode.UNKNOWN.getErrorCode(),
+        ERROR_MESSAGE + ", rId: " + activityId,
+        null);
+  }
+}
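Because `AbfsInvalidChecksumException` is a public `IOException` subtype (via `AbfsRestOperationException`), callers can react to a transport-integrity failure specifically. A rough sketch, assuming an open `FileSystem fs`, a `Path path`, and a destination `buffer`; the one-shot retry is illustrative only, not part of this change:

    try (FSDataInputStream in = fs.open(path)) {
      in.readFully(0, buffer);
    } catch (AbfsInvalidChecksumException e) {
      // The MD5 comparison failed, so the bytes cannot be trusted; one simple
      // reaction is to reopen the stream and retry the read once.
      try (FSDataInputStream in = fs.open(path)) {
        in.readFully(0, buffer);
      }
    }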

+ 2 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java

@@ -47,6 +47,8 @@ public enum AzureServiceErrorCode {
   INVALID_QUERY_PARAMETER_VALUE("InvalidQueryParameterValue", HttpURLConnection.HTTP_BAD_REQUEST, null),
   AUTHORIZATION_PERMISSION_MISS_MATCH("AuthorizationPermissionMismatch", HttpURLConnection.HTTP_FORBIDDEN, null),
   ACCOUNT_REQUIRES_HTTPS("AccountRequiresHttps", HttpURLConnection.HTTP_BAD_REQUEST, null),
+  MD5_MISMATCH("Md5Mismatch", HttpURLConnection.HTTP_BAD_REQUEST,
+          "The MD5 value specified in the request did not match with the MD5 value calculated by the server."),
   UNKNOWN(null, -1, null);
 
   private final String errorCode;

+ 143 - 4
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

@@ -25,7 +25,10 @@ import java.net.HttpURLConnection;
 import java.net.MalformedURLException;
 import java.net.URL;
 import java.net.URLEncoder;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
+import java.util.Base64;
 import java.util.List;
 import java.util.Locale;
 import java.util.UUID;
@@ -34,6 +37,9 @@ import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
 import org.apache.hadoop.fs.azurebfs.utils.NamespaceUtil;
 import org.apache.hadoop.fs.store.LogExactlyOnce;
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.Permissions;
@@ -76,6 +82,7 @@ import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.RENAME_PATH_ATTEMPTS;
 import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.SERVER_SIDE_ENCRYPTION_ALGORITHM;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
 import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*;
@@ -851,6 +858,11 @@ public class AbfsClient implements Closeable {
       requestHeaders.add(new AbfsHttpHeader(USER_AGENT, userAgentRetry));
     }
 
+    // Add MD5 Hash of request content as request header if feature is enabled
+    if (isChecksumValidationEnabled()) {
+      addCheckSumHeaderForWrite(requestHeaders, reqParams, buffer);
+    }
+
     // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
     String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION,
         abfsUriQueryBuilder, cachedSasToken);
@@ -867,7 +879,7 @@ public class AbfsClient implements Closeable {
             sasTokenForReuse);
     try {
       op.execute(tracingContext);
-    } catch (AzureBlobFileSystemException e) {
+    } catch (AbfsRestOperationException e) {
       /*
          If the http response code indicates a user error we retry
          the same append request with expect header being disabled.
@@ -877,7 +889,7 @@
          if someone has taken dependency on the exception message,
          which is created using the error string present in the response header.
       */
-      int responseStatusCode = ((AbfsRestOperationException) e).getStatusCode();
+      int responseStatusCode = e.getStatusCode();
       if (checkUserError(responseStatusCode) && reqParams.isExpectHeaderEnabled()) {
         LOG.debug("User error, retrying without 100 continue enabled for the given path {}", path);
         reqParams.setExpectHeaderEnabled(false);
@@ -889,6 +901,11 @@ public class AbfsClient implements Closeable {
       if (!op.hasResult()) {
         throw e;
       }
+
+      if (isMd5ChecksumError(e)) {
+        throw new AbfsInvalidChecksumException(e);
+      }
+
       if (reqParams.isAppendBlob()
           && appendSuccessCheckOp(op, path,
           (reqParams.getPosition() + reqParams.getLength()), tracingContext)) {
@@ -907,6 +924,13 @@
       throw e;
     }
 
+    catch (AzureBlobFileSystemException e) {
+      // Any server side issue will be returned as AbfsRestOperationException and will be handled above.
+      LOG.debug("Append request failed with non server issues for path: {}, offset: {}, position: {}",
+          path, reqParams.getoffset(), reqParams.getPosition());
+      throw e;
+    }
+
     return op;
   }
 
@@ -920,6 +944,16 @@ public class AbfsClient implements Closeable {
         && responseStatusCode < HttpURLConnection.HTTP_INTERNAL_ERROR);
   }
 
+  /**
+   * To check if the failure exception returned by server is due to MD5 Mismatch
+   * @param e Exception returned by AbfsRestOperation
+   * @return boolean whether exception is due to MD5Mismatch or not
+   */
+  private boolean isMd5ChecksumError(final AbfsRestOperationException e) {
+    AzureServiceErrorCode storageErrorCode = e.getErrorCode();
+    return storageErrorCode == AzureServiceErrorCode.MD5_MISMATCH;
+  }
+
   // For AppendBlob its possible that the append succeeded in the backend but the request failed.
   // However a retry would fail with an InvalidQueryParameterValue
   // (as the current offset would be unacceptable).
@@ -1049,10 +1083,16 @@ public class AbfsClient implements Closeable {
     final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
     addEncryptionKeyRequestHeaders(path, requestHeaders, false,
         contextEncryptionAdapter, tracingContext);
-    requestHeaders.add(new AbfsHttpHeader(RANGE,
-            String.format("bytes=%d-%d", position, position + bufferLength - 1)));
+    AbfsHttpHeader rangeHeader = new AbfsHttpHeader(RANGE,
+        String.format("bytes=%d-%d", position, position + bufferLength - 1));
+    requestHeaders.add(rangeHeader);
     requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
 
+    // Add request header to fetch MD5 Hash of data returned by server.
+    if (isChecksumValidationEnabled(requestHeaders, rangeHeader, bufferLength)) {
+      requestHeaders.add(new AbfsHttpHeader(X_MS_RANGE_GET_CONTENT_MD5, TRUE));
+    }
+
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
     String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.READ_OPERATION,
@@ -1069,6 +1109,11 @@ public class AbfsClient implements Closeable {
             bufferLength, sasTokenForReuse);
     op.execute(tracingContext);
 
+    // Verify the MD5 hash returned by server holds valid on the data received.
+    if (isChecksumValidationEnabled(requestHeaders, rangeHeader, bufferLength)) {
+      verifyCheckSumForRead(buffer, op.getResult(), bufferOffset);
+    }
+
     return op;
   }
 
@@ -1492,6 +1537,100 @@ public class AbfsClient implements Closeable {
     }
   }
 
+  /**
+   * Add MD5 hash as request header to the append request.
+   * @param requestHeaders to be updated with checksum header
+   * @param reqParams for getting offset and length
+   * @param buffer for getting input data for MD5 computation
+   * @throws AbfsRestOperationException if Md5 computation fails
+   */
+  private void addCheckSumHeaderForWrite(List<AbfsHttpHeader> requestHeaders,
+      final AppendRequestParameters reqParams, final byte[] buffer)
+      throws AbfsRestOperationException {
+    String md5Hash = computeMD5Hash(buffer, reqParams.getoffset(),
+        reqParams.getLength());
+    requestHeaders.add(new AbfsHttpHeader(CONTENT_MD5, md5Hash));
+  }
+
+  /**
+   * To verify the checksum information received from server for the data read.
+   * @param buffer stores the data received from server.
+   * @param result HTTP Operation Result.
+   * @param bufferOffset Position where data returned by server is saved in buffer.
+   * @throws AbfsRestOperationException if Md5Mismatch.
+   */
+  private void verifyCheckSumForRead(final byte[] buffer,
+      final AbfsHttpOperation result, final int bufferOffset)
+      throws AbfsRestOperationException {
+    // Number of bytes returned by server could be less than or equal to what
+    // caller requests. In case it is less, extra bytes will be initialized to 0
+    // Server returned MD5 Hash will be computed on what server returned.
+    // We need to get exact data that server returned and compute its md5 hash
+    // Computed hash should be equal to what server returned.
+    int numberOfBytesRead = (int) result.getBytesReceived();
+    if (numberOfBytesRead == 0) {
+      return;
+    }
+    String md5HashComputed = computeMD5Hash(buffer, bufferOffset,
+        numberOfBytesRead);
+    String md5HashActual = result.getResponseHeader(CONTENT_MD5);
+    if (!md5HashComputed.equals(md5HashActual)) {
+      LOG.debug("Md5 Mismatch Error in Read Operation. Server returned Md5: {}, Client computed Md5: {}", md5HashActual, md5HashComputed);
+      throw new AbfsInvalidChecksumException(result.getRequestId());
+    }
+  }
+
+  /**
+   * Conditions check for allowing checksum support for read operation.
+   * Sending MD5 Hash in request headers. For more details see
+   * @see <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/read">
+   *     Path - Read Azure Storage Rest API</a>.
+   * 1. Range header must be present as one of the request headers.
+   * 2. buffer length must be less than or equal to 4 MB.
+   * @param requestHeaders to be checked for range header.
+   * @param rangeHeader must be present.
+   * @param bufferLength must be less than or equal to 4 MB.
+   * @return true if all conditions are met.
+   */
+  private boolean isChecksumValidationEnabled(List<AbfsHttpHeader> requestHeaders,
+      final AbfsHttpHeader rangeHeader, final int bufferLength) {
+    return getAbfsConfiguration().getIsChecksumValidationEnabled()
+        && requestHeaders.contains(rangeHeader) && bufferLength <= 4 * ONE_MB;
+  }
+
+  /**
+   * Conditions check for allowing checksum support for write operation.
+   * Server will support this if client sends the MD5 Hash as a request header.
+   * For Azure storage service documentation see
+   * @see <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update">
+   *     Path - Update Azure Rest API</a>.
+   * @return true if checksum validation enabled.
+   */
+  private boolean isChecksumValidationEnabled() {
+    return getAbfsConfiguration().getIsChecksumValidationEnabled();
+  }
+
+  /**
+   * Compute MD5Hash of the given byte array starting from given offset up to given length.
+   * @param data byte array from which data is fetched to compute MD5 Hash.
+   * @param off offset in the array from where actual data starts.
+   * @param len length of the data to be used to compute MD5Hash.
+   * @return MD5 Hash of the data as String.
+   * @throws AbfsRestOperationException if computation fails.
+   */
+  @VisibleForTesting
+  public String computeMD5Hash(final byte[] data, final int off, final int len)
+      throws AbfsRestOperationException {
+    try {
+      MessageDigest md5Digest = MessageDigest.getInstance(MD5);
+      md5Digest.update(data, off, len);
+      byte[] md5Bytes = md5Digest.digest();
+      return Base64.getEncoder().encodeToString(md5Bytes);
+    } catch (NoSuchAlgorithmException ex) {
+      throw new AbfsDriverException(ex);
+    }
+  }
+
   @VisibleForTesting
   URL getBaseUrl() {
     return baseUrl;
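The `computeMD5Hash` helper added above is a JDK `MessageDigest` MD5 over the exact byte range, Base64-encoded, which is the representation exchanged in the `Content-MD5` header. A standalone sketch of the same computation, with illustrative sample input:

    import java.nio.charset.StandardCharsets;
    import java.security.MessageDigest;
    import java.util.Base64;

    public class Md5HeaderDemo {
      public static void main(String[] args) throws Exception {
        byte[] data = "hello abfs".getBytes(StandardCharsets.UTF_8);
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        md5.update(data, 0, data.length);
        String contentMd5 = Base64.getEncoder().encodeToString(md5.digest());
        // On append this value is sent as the Content-MD5 request header; on a
        // ranged read of at most 4 MB the server's Content-MD5 response header
        // is recomputed locally and compared the same way.
        System.out.println(contentMd5);
      }
    }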

+ 15 - 6
hadoop-tools/hadoop-azure/src/site/markdown/abfs.md

@@ -941,7 +941,7 @@ string retrieved from a GetFileStatus request to the server.
 implementing EncryptionContextProvider.
 
 ### <a name="serverconfigoptions"></a> Server Options
-When the config `fs.azure.io.read.tolerate.concurrent.append` is made true, the
+`fs.azure.io.read.tolerate.concurrent.append`: When the config is made true, the
 If-Match header sent to the server for read calls will be set as * otherwise the
 same will be set with ETag. This is basically a mechanism in place to handle the
 reads with optimistic concurrency.
@@ -949,14 +949,23 @@ Please refer the following links for further information.
 1. https://docs.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/read
 2. https://azure.microsoft.com/de-de/blog/managing-concurrency-in-microsoft-azure-storage-2/
 
-listStatus API fetches the FileStatus information from server in a page by page
-manner. The config `fs.azure.list.max.results` used to set the maxResults URI
- param which sets the pagesize(maximum results per call). The value should
- be >  0. By default this will be 5000. Server has a maximum value for this
- parameter as 5000. So even if the config is above 5000 the response will only
+`fs.azure.list.max.results`: listStatus API fetches the FileStatus information
+from server in a page by page manner. The config is used to set the maxResults URI
+param which sets the page size(maximum results per call). The value should
+be >  0. By default, this will be 5000. Server has a maximum value for this
+parameter as 5000. So even if the config is above 5000 the response will only
 contain 5000 entries. Please refer the following link for further information.
 https://docs.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/list
 
+`fs.azure.enable.checksum.validation`: When the config is set to true, Content-MD5
+headers are sent to the server for read and append calls. This provides a way
+to verify the integrity of data during transport. This will have performance
+impact due to MD5 Hash re-computation on Client and Server side. Please refer
+to the Azure documentation for
+[Read](https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/read)
+and [Append](https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/update)
+APIs for more details.
+
 ### <a name="throttlingconfigoptions"></a> Throttling Options
 ABFS driver has the capability to throttle read and write operations to achieve
 maximum throughput by minimizing errors. The errors occur when the account
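A usage note on the `fs.azure.enable.checksum.validation` documentation added above: the service only returns a `Content-MD5` for ranged reads of at most 4 MB, so keeping the read buffer at or below that size keeps the read path verified. A sketch, where the read-buffer key name is taken from the existing ABFS options rather than this change:

    Configuration conf = new Configuration();
    conf.setBoolean("fs.azure.enable.checksum.validation", true);
    // Keep ranged reads within the 4 MB limit for which the server returns
    // a Content-MD5 header; key name assumed from the existing ABFS config docs.
    conf.setInt("fs.azure.read.request.size", 4 * 1024 * 1024);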

+ 3 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java

@@ -145,6 +145,9 @@ public abstract class AbstractAbfsIntegrationTest extends
     } else {
       this.isIPAddress = false;
     }
+
+    // For tests, we want to enforce checksum validation so that any regressions can be caught.
+    abfsConfig.setIsChecksumValidationEnabled(true);
   }
 
   protected boolean getIsNamespaceEnabled(AzureBlobFileSystem fs)

+ 261 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemChecksum.java

@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs;
+
+import java.security.SecureRandom;
+import java.util.Arrays;
+import java.util.HashSet;
+
+import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
+import org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE;
+import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB;
+import static org.apache.hadoop.fs.azurebfs.contracts.services.AppendRequestParameters.Mode.APPEND_MODE;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+import static org.mockito.ArgumentMatchers.any;
+
+/**
+ * Test For Verifying Checksum Related Operations
+ */
+public class ITestAzureBlobFileSystemChecksum extends AbstractAbfsIntegrationTest {
+
+  private static final int MB_2 = 2 * ONE_MB;
+  private static final int MB_3 = 3 * ONE_MB;
+  private static final int MB_4 = 4 * ONE_MB;
+  private static final int MB_8 = 8 * ONE_MB;
+  private static final int MB_15 = 15 * ONE_MB;
+  private static final int MB_16 = 16 * ONE_MB;
+  private static final String INVALID_MD5_TEXT = "Text for Invalid MD5 Computation";
+
+  public ITestAzureBlobFileSystemChecksum() throws Exception {
+    super();
+  }
+
+  @Test
+  public void testWriteReadWithChecksum() throws Exception {
+    testWriteReadWithChecksumInternal(true);
+    testWriteReadWithChecksumInternal(false);
+  }
+
+  @Test
+  public void testAppendWithChecksumAtDifferentOffsets() throws Exception {
+    AzureBlobFileSystem fs = getConfiguredFileSystem(MB_4, MB_4, true);
+    AbfsClient client = fs.getAbfsStore().getClient();
+    Path path = path("testPath" + getMethodName());
+    try (FSDataOutputStream out = fs.create(path)) {
+      byte[] data = generateRandomBytes(MB_4);
+
+      appendWithOffsetHelper(client, path, data, fs, 0);
+      appendWithOffsetHelper(client, path, data, fs, ONE_MB);
+      appendWithOffsetHelper(client, path, data, fs, MB_2);
+      appendWithOffsetHelper(client, path, data, fs, MB_4 - 1);
+    }
+  }
+
+  @Test
+  public void testReadWithChecksumAtDifferentOffsets() throws Exception {
+    AzureBlobFileSystem fs = getConfiguredFileSystem(MB_4, MB_4, true);
+    AbfsClient client = fs.getAbfsStore().getClient();
+    Path path = path("testPath" + getMethodName());
+    byte[] data = generateRandomBytes(MB_16);
+
+    createFileWithData(path, data, fs);
+    readWithOffsetAndPositionHelper(client, path, data, fs, 0, 0);
+    readWithOffsetAndPositionHelper(client, path, data, fs, MB_4, 0);
+    readWithOffsetAndPositionHelper(client, path, data, fs, MB_4, ONE_MB);
+    readWithOffsetAndPositionHelper(client, path, data, fs, MB_8, MB_2);
+    readWithOffsetAndPositionHelper(client, path, data, fs, MB_15, MB_4 - 1);
+  }
+
+  @Test
+  public void testWriteReadWithChecksumAndOptions() throws Exception {
+    testWriteReadWithChecksumAndOptionsInternal(true);
+    testWriteReadWithChecksumAndOptionsInternal(false);
+  }
+
+  @Test
+  public void testAbfsInvalidChecksumExceptionInAppend() throws Exception {
+    AzureBlobFileSystem fs = getConfiguredFileSystem(MB_4, MB_4, true);
+    AbfsClient spiedClient = Mockito.spy(fs.getAbfsStore().getClient());
+    Path path = path("testPath" + getMethodName());
+    fs.create(path);
+    byte[] data = generateRandomBytes(MB_4);
+    String invalidMD5Hash = spiedClient.computeMD5Hash(
+            INVALID_MD5_TEXT.getBytes(), 0, INVALID_MD5_TEXT.length());
+    Mockito.doReturn(invalidMD5Hash).when(spiedClient).computeMD5Hash(any(),
+        any(Integer.class), any(Integer.class));
+    AbfsRestOperationException ex = intercept(AbfsInvalidChecksumException.class, () -> {
+      appendWithOffsetHelper(spiedClient, path, data, fs, 0);
+    });
+
+    Assertions.assertThat(ex.getErrorCode())
+        .describedAs("Exception Message should contain MD5Mismatch")
+        .isEqualTo(AzureServiceErrorCode.MD5_MISMATCH);
+  }
+
+  @Test
+  public void testAbfsInvalidChecksumExceptionInRead() throws Exception {
+    AzureBlobFileSystem fs = getConfiguredFileSystem(MB_4, MB_4, true);
+    AbfsClient spiedClient = Mockito.spy(fs.getAbfsStore().getClient());
+    Path path = path("testPath" + getMethodName());
+    byte[] data = generateRandomBytes(MB_3);
+    createFileWithData(path, data, fs);
+
+    String invalidMD5Hash = spiedClient.computeMD5Hash(
+            INVALID_MD5_TEXT.getBytes(), 0, INVALID_MD5_TEXT.length());
+    Mockito.doReturn(invalidMD5Hash).when(spiedClient).computeMD5Hash(any(),
+        any(Integer.class), any(Integer.class));
+
+    intercept(AbfsInvalidChecksumException.class, () -> {
+      readWithOffsetAndPositionHelper(spiedClient, path, data, fs, 0, 0);
+    });
+  }
+
+  private void testWriteReadWithChecksumInternal(final boolean readAheadEnabled)
+      throws Exception {
+    AzureBlobFileSystem fs = getConfiguredFileSystem(MB_4, MB_4, readAheadEnabled);
+    final int dataSize = MB_16 + 1000;
+    Path testPath = path("testPath" + getMethodName());
+    byte[] bytesUploaded = generateRandomBytes(dataSize);
+
+    createFileWithData(testPath, bytesUploaded, fs);
+
+    try (FSDataInputStream in = fs.open(testPath)) {
+      byte[] bytesRead = new byte[bytesUploaded.length];
+      in.read(bytesRead, 0, dataSize);
+
+      // Verify that the data read is same as data written
+      Assertions.assertThat(bytesRead)
+              .describedAs("Bytes read with checksum enabled are not as expected")
+              .containsExactly(bytesUploaded);
+    }
+  }
+
+  /**
+   * Verify that the checksum computed on client side matches with the one
+   * computed at server side. If not, request will fail with 400 Bad request.
+   * @param client
+   * @param path
+   * @param data
+   * @param fs
+   * @param offset
+   * @throws Exception
+   */
+  private void appendWithOffsetHelper(AbfsClient client, Path path,
+      byte[] data, AzureBlobFileSystem fs, final int offset) throws Exception {
+    AppendRequestParameters reqParams = new AppendRequestParameters(
+        0, offset, data.length - offset, APPEND_MODE, false, null, true);
+    client.append(path.toUri().getPath(), data, reqParams, null, null,
+        getTestTracingContext(fs, false));
+  }
+
+  /**
+   * Verify that the checksum returned by server is same as computed on client
+   * side even when read from different positions and stored at different offsets
+   * If not server request will pass but client.read() will fail with
+   * {@link org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumException}
+   * @param client
+   * @param path
+   * @param data
+   * @param fs
+   * @param position
+   * @param offset
+   * @throws Exception
+   */
+  private void readWithOffsetAndPositionHelper(AbfsClient client, Path path,
+      byte[] data, AzureBlobFileSystem fs, final int position,
+      final int offset) throws Exception {
+
+    int bufferLength = fs.getAbfsStore().getAbfsConfiguration().getReadBufferSize();
+    byte[] readBuffer = new byte[bufferLength];
+    final int readLength = bufferLength - offset;
+
+    client.read(path.toUri().getPath(), position, readBuffer, offset, readLength,
+        "*", null, null, getTestTracingContext(fs, false));
+
+    byte[] actual = Arrays.copyOfRange(readBuffer, offset, offset + readLength);
+    byte[] expected = Arrays.copyOfRange(data, position, readLength + position);
+    Assertions.assertThat(actual)
+        .describedAs("Data read should be same as Data Written")
+        .containsExactly(expected);
+  }
+
+  private void testWriteReadWithChecksumAndOptionsInternal(
+      final boolean readAheadEnabled) throws Exception {
+    AzureBlobFileSystem fs = getConfiguredFileSystem(MB_8, ONE_MB, readAheadEnabled);
+    final int dataSize = MB_16 + 1000;
+
+    Path testPath = path("testPath" + getMethodName());
+    byte[] bytesUploaded = generateRandomBytes(dataSize);
+    createFileWithData(testPath, bytesUploaded, fs);
+
+    Configuration cpm1 = new Configuration();
+    cpm1.setBoolean(FS_AZURE_BUFFERED_PREAD_DISABLE, true);
+    try (FSDataInputStream in = fs.openFileWithOptions(testPath,
+        new OpenFileParameters().withOptions(cpm1)
+            .withMandatoryKeys(new HashSet<>())).get()) {
+      byte[] bytesRead = new byte[dataSize];
+
+      in.read(1, bytesRead, 1, MB_4);
+
+      // Verify that the data read is same as data written
+      Assertions.assertThat(Arrays.copyOfRange(bytesRead, 1, MB_4))
+              .describedAs("Bytes read with checksum enabled are not as expected")
+              .containsExactly(Arrays.copyOfRange(bytesUploaded, 1, MB_4));
+    }
+  }
+
+  private void createFileWithData(Path path, byte[] data, AzureBlobFileSystem fs) throws Exception {
+    try (FSDataOutputStream out = fs.create(path)) {
+      out.write(data);
+      out.hflush();
+    }
+  }
+
+  private AzureBlobFileSystem getConfiguredFileSystem(final int writeBuffer,
+      final int readBuffer, final boolean readAheadEnabled) throws Exception {
+    AzureBlobFileSystem fs = createFileSystem();
+    AbfsConfiguration abfsConf = fs.getAbfsStore().getAbfsConfiguration();
+    abfsConf.setIsChecksumValidationEnabled(true);
+    abfsConf.setWriteBufferSize(writeBuffer);
+    abfsConf.setReadBufferSize(readBuffer);
+    abfsConf.setReadAheadEnabled(readAheadEnabled);
+    return fs;
+  }
+
+  public static byte[] generateRandomBytes(int numBytes) {
+    SecureRandom secureRandom = new SecureRandom();
+    byte[] randomBytes = new byte[numBytes];
+    secureRandom.nextBytes(randomBytes);
+    return randomBytes;
+  }
+}

+ 5 - 1
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsInputStream.java

@@ -170,7 +170,9 @@ public class ITestAbfsInputStream extends AbstractAbfsIntegrationTest {
     getAbfsStore(fs).getAbfsConfiguration()
         .setReadSmallFilesCompletely(readSmallFilesCompletely);
     getAbfsStore(fs).getAbfsConfiguration()
-            .setOptimizeFooterRead(false);
+        .setOptimizeFooterRead(false);
+    getAbfsStore(fs).getAbfsConfiguration()
+        .setIsChecksumValidationEnabled(true);
     return fs;
   }
 
@@ -179,6 +181,8 @@ public class ITestAbfsInputStream extends AbstractAbfsIntegrationTest {
     final AzureBlobFileSystem fs = getFileSystem();
     getAbfsStore(fs).getAbfsConfiguration()
         .setOptimizeFooterRead(optimizeFooterRead);
+    getAbfsStore(fs).getAbfsConfiguration()
+        .setIsChecksumValidationEnabled(true);
     if (fileSize <= getAbfsStore(fs).getAbfsConfiguration()
         .getReadBufferSize()) {
       getAbfsStore(fs).getAbfsConfiguration()