Explorar o código

HADOOP-19137. [ABFS] Prevent ABFS initialization for non-hierarchal-namespace account if Customer-provided-key configs given. (#6752)

Customer-provided-keys (CPK) configs are not allowed with non-hierarchal-namespace (non-HNS) accounts for ABFS. This patch aims to prevent ABFS initialization for non-HNS accounts if CPK configs are provided.

Contributed by: Pranav Saxena
Pranav Saxena hai 11 meses
pai
achega
2e1deee87a

+ 31 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

@@ -43,6 +43,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.Future;
 import javax.annotation.Nullable;
 import javax.annotation.Nullable;
 
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.impl.BackReference;
 import org.apache.hadoop.fs.impl.BackReference;
 import org.apache.hadoop.security.ProviderUtils;
 import org.apache.hadoop.security.ProviderUtils;
@@ -113,6 +114,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
 import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
 import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CPK_IN_NON_HNS_ACCOUNT_ERROR_MESSAGE;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_BUFFER_DIR;
@@ -221,6 +223,26 @@ public class AzureBlobFileSystem extends FileSystem
       }
       }
     }
     }
 
 
+    /*
+     * Non-hierarchical-namespace account can not have a customer-provided-key(CPK).
+     * Fail initialization of filesystem if the configs are provided. CPK is of
+     * two types: GLOBAL_KEY, and ENCRYPTION_CONTEXT.
+     */
+    if ((isEncryptionContextCPK(abfsConfiguration) || isGlobalKeyCPK(
+        abfsConfiguration))
+        && !getIsNamespaceEnabled(
+        new TracingContext(clientCorrelationId, fileSystemId,
+            FSOperationType.CREATE_FILESYSTEM, tracingHeaderFormat,
+            listener))) {
+      /*
+       * Close the filesystem gracefully before throwing exception. Graceful close
+       * will ensure that all resources are released properly.
+       */
+      close();
+      throw new PathIOException(uri.getPath(),
+          CPK_IN_NON_HNS_ACCOUNT_ERROR_MESSAGE);
+    }
+
     LOG.trace("Initiate check for delegation token manager");
     LOG.trace("Initiate check for delegation token manager");
     if (UserGroupInformation.isSecurityEnabled()) {
     if (UserGroupInformation.isSecurityEnabled()) {
       this.delegationTokenEnabled = abfsConfiguration.isDelegationTokenManagerEnabled();
       this.delegationTokenEnabled = abfsConfiguration.isDelegationTokenManagerEnabled();
@@ -237,6 +259,15 @@ public class AzureBlobFileSystem extends FileSystem
     LOG.debug("Initializing AzureBlobFileSystem for {} complete", uri);
     LOG.debug("Initializing AzureBlobFileSystem for {} complete", uri);
   }
   }
 
 
+  private boolean isGlobalKeyCPK(final AbfsConfiguration abfsConfiguration) {
+    return StringUtils.isNotEmpty(
+        abfsConfiguration.getEncodedClientProvidedEncryptionKey());
+  }
+
+  private boolean isEncryptionContextCPK(final AbfsConfiguration abfsConfiguration) {
+    return abfsConfiguration.createEncryptionContextProvider() != null;
+  }
+
   @Override
   @Override
   public String toString() {
   public String toString() {
     final StringBuilder sb = new StringBuilder(
     final StringBuilder sb = new StringBuilder(

+ 46 - 4
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

@@ -60,7 +60,6 @@ import org.apache.hadoop.fs.azurebfs.security.ContextProviderEncryptionAdapter;
 import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
 import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
 import org.apache.hadoop.fs.azurebfs.security.NoContextEncryptionAdapter;
 import org.apache.hadoop.fs.azurebfs.security.NoContextEncryptionAdapter;
 import org.apache.hadoop.fs.azurebfs.utils.EncryptionType;
 import org.apache.hadoop.fs.azurebfs.utils.EncryptionType;
-import org.apache.hadoop.fs.azurebfs.utils.NamespaceUtil;
 import org.apache.hadoop.fs.impl.BackReference;
 import org.apache.hadoop.fs.impl.BackReference;
 import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.PathIOException;
 
 
@@ -182,7 +181,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
   private final AbfsConfiguration abfsConfiguration;
   private final AbfsConfiguration abfsConfiguration;
   private final Set<String> azureAtomicRenameDirSet;
   private final Set<String> azureAtomicRenameDirSet;
   private Set<String> azureInfiniteLeaseDirSet;
   private Set<String> azureInfiniteLeaseDirSet;
-  private Trilean isNamespaceEnabled;
+  private volatile Trilean isNamespaceEnabled;
   private final AuthType authType;
   private final AuthType authType;
   private final UserGroupInformation userGroupInformation;
   private final UserGroupInformation userGroupInformation;
   private final IdentityTransformerInterface identityTransformer;
   private final IdentityTransformerInterface identityTransformer;
@@ -364,19 +363,62 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
     return authorityParts;
     return authorityParts;
   }
   }
 
 
+  /**
+   * Resolves namespace information of the filesystem from the state of {@link #isNamespaceEnabled}.
+   * if the state is UNKNOWN, it will be determined by making a GET_ACL request
+   * to the root of the filesystem. GET_ACL call is synchronized to ensure a single
+   * call is made to determine the namespace information in case multiple threads are
+   * calling this method at the same time. The resolution of namespace information
+   * would be stored back as state of {@link #isNamespaceEnabled}.
+   *
+   * @param tracingContext tracing context
+   * @return true if namespace is enabled, false otherwise.
+   * @throws AzureBlobFileSystemException server errors.
+   */
   public boolean getIsNamespaceEnabled(TracingContext tracingContext)
   public boolean getIsNamespaceEnabled(TracingContext tracingContext)
       throws AzureBlobFileSystemException {
       throws AzureBlobFileSystemException {
     try {
     try {
-      return this.isNamespaceEnabled.toBoolean();
+      return isNamespaceEnabled();
     } catch (TrileanConversionException e) {
     } catch (TrileanConversionException e) {
       LOG.debug("isNamespaceEnabled is UNKNOWN; fall back and determine through"
       LOG.debug("isNamespaceEnabled is UNKNOWN; fall back and determine through"
           + " getAcl server call", e);
           + " getAcl server call", e);
     }
     }
 
 
-    isNamespaceEnabled = Trilean.getTrilean(NamespaceUtil.isNamespaceEnabled(client, tracingContext));
+    return getNamespaceEnabledInformationFromServer(tracingContext);
+  }
+
+  private synchronized boolean getNamespaceEnabledInformationFromServer(
+      final TracingContext tracingContext) throws AzureBlobFileSystemException {
+    if (isNamespaceEnabled != Trilean.UNKNOWN) {
+      return isNamespaceEnabled.toBoolean();
+    }
+    try {
+      LOG.debug("Get root ACL status");
+      getClient().getAclStatus(AbfsHttpConstants.ROOT_PATH, tracingContext);
+      isNamespaceEnabled = Trilean.getTrilean(true);
+    } catch (AbfsRestOperationException ex) {
+      // Get ACL status is a HEAD request, its response doesn't contain
+      // errorCode
+      // So can only rely on its status code to determine its account type.
+      if (HttpURLConnection.HTTP_BAD_REQUEST != ex.getStatusCode()) {
+        throw ex;
+      }
+      isNamespaceEnabled = Trilean.getTrilean(false);
+    } catch (AzureBlobFileSystemException ex) {
+      throw ex;
+    }
     return isNamespaceEnabled.toBoolean();
     return isNamespaceEnabled.toBoolean();
   }
   }
 
 
+  /**
+   * @return true if namespace is enabled, false otherwise.
+   * @throws TrileanConversionException if namespaceEnabled information is UNKNOWN
+   */
+  @VisibleForTesting
+  boolean isNamespaceEnabled() throws TrileanConversionException {
+    return this.isNamespaceEnabled.toBoolean();
+  }
+
   @VisibleForTesting
   @VisibleForTesting
   URIBuilder getURIBuilder(final String hostName, boolean isSecure) {
   URIBuilder getURIBuilder(final String hostName, boolean isSecure) {
     String scheme = isSecure ? FileSystemUriSchemes.HTTPS_SCHEME : FileSystemUriSchemes.HTTP_SCHEME;
     String scheme = isSecure ? FileSystemUriSchemes.HTTPS_SCHEME : FileSystemUriSchemes.HTTP_SCHEME;

+ 34 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java

@@ -22,6 +22,10 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.util.VersionInfo;
 import org.apache.hadoop.util.VersionInfo;
 
 
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY_SHA;
+
 /**
 /**
  * Responsible to keep all constant keys used in abfs rest client here.
  * Responsible to keep all constant keys used in abfs rest client here.
  */
  */
@@ -165,5 +169,35 @@ public final class AbfsHttpConstants {
    */
    */
   public static final Integer HTTP_STATUS_CATEGORY_QUOTIENT = 100;
   public static final Integer HTTP_STATUS_CATEGORY_QUOTIENT = 100;
 
 
+  /**
+   * List of configurations that are related to Customer-Provided-Keys.
+   * <ol>
+   *   <li>
+   *     {@value ConfigurationKeys#FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE}
+   *     for ENCRYPTION_CONTEXT cpk-type.
+   *   </li>
+   *   <li>
+   *     {@value ConfigurationKeys#FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY} and
+   *     {@value ConfigurationKeys#FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY_SHA}
+   *     for GLOBAL_KEY cpk-type.
+   *   </li>
+   * </ol>
+   * List: {@value}
+   */
+  private static final String CPK_CONFIG_LIST =
+      FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE + ", "
+          + FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY + ", "
+          + FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY_SHA;
+
+  /**
+   * Exception message on filesystem init if customer-provided-keys configs are provided
+   * for a non-hierarchical-namespace account: {@value}
+   */
+  public static final String CPK_IN_NON_HNS_ACCOUNT_ERROR_MESSAGE =
+      "Non hierarchical-namespace account can not have configs enabled for "
+          + "Customer Provided Keys. Following configs can not be given with "
+          + "non-hierarchical-namespace account:"
+          + CPK_CONFIG_LIST;
+
   private AbfsHttpConstants() {}
   private AbfsHttpConstants() {}
 }
 }

+ 0 - 19
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

@@ -47,7 +47,6 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsInvalidChecksumExc
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsDriverException;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
 import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
 import org.apache.hadoop.fs.azurebfs.utils.MetricFormat;
-import org.apache.hadoop.fs.azurebfs.utils.NamespaceUtil;
 import org.apache.hadoop.fs.store.LogExactlyOnce;
 import org.apache.hadoop.fs.store.LogExactlyOnce;
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.Permissions;
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.Permissions;
 import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
 import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
@@ -134,7 +133,6 @@ public class AbfsClient implements Closeable {
   private final AbfsThrottlingIntercept intercept;
   private final AbfsThrottlingIntercept intercept;
 
 
   private final ListeningScheduledExecutorService executorService;
   private final ListeningScheduledExecutorService executorService;
-  private Boolean isNamespaceEnabled;
 
 
   private boolean renameResilience;
   private boolean renameResilience;
   private TimerTask runningTimerTask;
   private TimerTask runningTimerTask;
@@ -359,9 +357,6 @@ public class AbfsClient implements Closeable {
       List<AbfsHttpHeader> requestHeaders, boolean isCreateFileRequest,
       List<AbfsHttpHeader> requestHeaders, boolean isCreateFileRequest,
       ContextEncryptionAdapter contextEncryptionAdapter, TracingContext tracingContext)
       ContextEncryptionAdapter contextEncryptionAdapter, TracingContext tracingContext)
       throws AzureBlobFileSystemException {
       throws AzureBlobFileSystemException {
-    if (!getIsNamespaceEnabled(tracingContext)) {
-      return;
-    }
     String encodedKey, encodedKeySHA256;
     String encodedKey, encodedKeySHA256;
     switch (encryptionType) {
     switch (encryptionType) {
     case GLOBAL_KEY:
     case GLOBAL_KEY:
@@ -1550,15 +1545,6 @@ public class AbfsClient implements Closeable {
     }
     }
   }
   }
 
 
-  private synchronized Boolean getIsNamespaceEnabled(TracingContext tracingContext)
-      throws AzureBlobFileSystemException {
-    if (isNamespaceEnabled == null) {
-      setIsNamespaceEnabled(NamespaceUtil.isNamespaceEnabled(this,
-          tracingContext));
-    }
-    return isNamespaceEnabled;
-  }
-
   protected Boolean getIsPaginatedDeleteEnabled() {
   protected Boolean getIsPaginatedDeleteEnabled() {
     return abfsConfiguration.isPaginatedDeleteEnabled();
     return abfsConfiguration.isPaginatedDeleteEnabled();
   }
   }
@@ -1748,11 +1734,6 @@ public class AbfsClient implements Closeable {
     encryptionContextProvider = provider;
     encryptionContextProvider = provider;
   }
   }
 
 
-  @VisibleForTesting
-  void setIsNamespaceEnabled(final Boolean isNamespaceEnabled) {
-    this.isNamespaceEnabled = isNamespaceEnabled;
-  }
-
   /**
   /**
    * Getter for abfsCounters from AbfsClient.
    * Getter for abfsCounters from AbfsClient.
    * @return AbfsCounters instance.
    * @return AbfsCounters instance.

+ 0 - 88
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/NamespaceUtil.java

@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.fs.azurebfs.utils;
-
-import java.net.HttpURLConnection;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
-import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
-import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
-
-/**
- * Utility class to provide method which can return if the account is namespace
- * enabled or not.
- */
-public final class NamespaceUtil {
-
-  public static final Logger LOG = LoggerFactory.getLogger(NamespaceUtil.class);
-
-  private NamespaceUtil() {
-
-  }
-
-  /**
-   * Return if the account used in the provided abfsClient object namespace enabled
-   * or not.
-   * It would call {@link org.apache.hadoop.fs.azurebfs.services.AbfsClient#getAclStatus(String, TracingContext)}.
-   * <ol>
-   *   <li>
-   *     If the API call is successful, then the account is namespace enabled.
-   *   </li>
-   *   <li>
-   *     If the server returns with {@link java.net.HttpURLConnection#HTTP_BAD_REQUEST}, the account is non-namespace enabled.
-   *   </li>
-   *   <li>
-   *     If the server call gets some other exception, then the method would throw the exception.
-   *   </li>
-   * </ol>
-   * @param abfsClient client for which namespace-enabled to be checked.
-   * @param tracingContext object to correlate Store requests.
-   * @return if the account corresponding to the given client is namespace-enabled
-   * or not.
-   * @throws AzureBlobFileSystemException throws back the exception the method receives
-   * from the {@link AbfsClient#getAclStatus(String, TracingContext)}. In case it gets
-   * {@link AbfsRestOperationException}, it checks if the exception statusCode is
-   * BAD_REQUEST or not. If not, then it will pass the exception to the calling method.
-   */
-  public static Boolean isNamespaceEnabled(final AbfsClient abfsClient,
-      final TracingContext tracingContext)
-      throws AzureBlobFileSystemException {
-    Boolean isNamespaceEnabled;
-    try {
-      LOG.debug("Get root ACL status");
-      abfsClient.getAclStatus(AbfsHttpConstants.ROOT_PATH, tracingContext);
-      isNamespaceEnabled = true;
-    } catch (AbfsRestOperationException ex) {
-      // Get ACL status is a HEAD request, its response doesn't contain
-      // errorCode
-      // So can only rely on its status code to determine its account type.
-      if (HttpURLConnection.HTTP_BAD_REQUEST != ex.getStatusCode()) {
-        throw ex;
-      }
-      isNamespaceEnabled = false;
-    } catch (AzureBlobFileSystemException ex) {
-      throw ex;
-    }
-    return isNamespaceEnabled;
-  }
-}

+ 0 - 3
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java

@@ -41,7 +41,6 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemExc
 import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
 import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
 import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
 import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
-import org.apache.hadoop.fs.azurebfs.services.AbfsClientUtils;
 import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
 import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
 import org.apache.hadoop.fs.azurebfs.services.AuthType;
 import org.apache.hadoop.fs.azurebfs.services.AuthType;
 import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient;
@@ -209,8 +208,6 @@ public abstract class AbstractAbfsIntegrationTest extends
       wasb = new NativeAzureFileSystem(azureNativeFileSystemStore);
       wasb = new NativeAzureFileSystem(azureNativeFileSystemStore);
       wasb.initialize(wasbUri, rawConfig);
       wasb.initialize(wasbUri, rawConfig);
     }
     }
-    // Todo: To be fixed in HADOOP-19137
-    AbfsClientUtils.setIsNamespaceEnabled(abfs.getAbfsClient(), true);
   }
   }
 
 
   @After
   @After

+ 63 - 23
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsCustomEncryption.java

@@ -35,7 +35,7 @@ import org.apache.hadoop.fs.azurebfs.security.EncodingHelper;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClientUtils;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClientUtils;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.assertj.core.api.Assertions;
 import org.assertj.core.api.Assertions;
-import org.junit.Assume;
+import org.assertj.core.api.Assumptions;
 import org.junit.Test;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized;
@@ -57,9 +57,11 @@ import org.apache.hadoop.fs.azurebfs.utils.EncryptionType;
 import org.apache.hadoop.fs.impl.OpenFileParameters;
 import org.apache.hadoop.fs.impl.OpenFileParameters;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.Lists;
 import org.apache.hadoop.util.Lists;
 
 
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CPK_IN_NON_HNS_ACCOUNT_ERROR_MESSAGE;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY_SHA;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY_SHA;
@@ -171,9 +173,6 @@ public class ITestAbfsCustomEncryption extends AbstractAbfsIntegrationTest {
   }
   }
 
 
   public ITestAbfsCustomEncryption() throws Exception {
   public ITestAbfsCustomEncryption() throws Exception {
-    Assume.assumeTrue("Account should be HNS enabled for CPK",
-        getConfiguration().getBoolean(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT,
-            false));
     new Random().nextBytes(cpk);
     new Random().nextBytes(cpk);
     cpkSHAEncoded = EncodingHelper.getBase64EncodedString(
     cpkSHAEncoded = EncodingHelper.getBase64EncodedString(
         EncodingHelper.getSHA256Hash(cpk));
         EncodingHelper.getSHA256Hash(cpk));
@@ -181,7 +180,13 @@ public class ITestAbfsCustomEncryption extends AbstractAbfsIntegrationTest {
 
 
   @Test
   @Test
   public void testCustomEncryptionCombinations() throws Exception {
   public void testCustomEncryptionCombinations() throws Exception {
-    AzureBlobFileSystem fs = getOrCreateFS();
+    try (AzureBlobFileSystem fs = getOrCreateFS()) {
+      validateCpkResponseHeadersForCombination(fs);
+    }
+  }
+
+  private void validateCpkResponseHeadersForCombination(final AzureBlobFileSystem fs)
+      throws Exception {
     Path testPath = path("/testFile");
     Path testPath = path("/testFile");
     String relativePath = fs.getAbfsStore().getRelativePath(testPath);
     String relativePath = fs.getAbfsStore().getRelativePath(testPath);
     MockEncryptionContextProvider ecp =
     MockEncryptionContextProvider ecp =
@@ -375,9 +380,7 @@ public class ITestAbfsCustomEncryption extends AbstractAbfsIntegrationTest {
         + getAccountName());
         + getAccountName());
     configuration.unset(FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY_SHA + "."
     configuration.unset(FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY_SHA + "."
         + getAccountName());
         + getAccountName());
-    AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(configuration);
-    fileSystemsOpenedInTest.add(fs);
-    return fs;
+    return getAzureBlobFileSystem(configuration);
   }
   }
 
 
   private AzureBlobFileSystem getCPKEnabledFS() throws IOException {
   private AzureBlobFileSystem getCPKEnabledFS() throws IOException {
@@ -390,9 +393,34 @@ public class ITestAbfsCustomEncryption extends AbstractAbfsIntegrationTest {
     conf.set(FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY_SHA + "."
     conf.set(FS_AZURE_ENCRYPTION_ENCODED_CLIENT_PROVIDED_KEY_SHA + "."
         + getAccountName(), cpkEncodedSHA);
         + getAccountName(), cpkEncodedSHA);
     conf.unset(FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE);
     conf.unset(FS_AZURE_ENCRYPTION_CONTEXT_PROVIDER_TYPE);
-    AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(conf);
-    fileSystemsOpenedInTest.add(fs);
-    return fs;
+    return getAzureBlobFileSystem(conf);
+  }
+
+  private AzureBlobFileSystem getAzureBlobFileSystem(final Configuration conf) {
+    try {
+      AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem.newInstance(
+          conf);
+      fileSystemsOpenedInTest.add(fs);
+      Assertions.assertThat(
+          getConfiguration().getBoolean(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT,
+              false))
+          .describedAs("Encryption tests should run only on namespace enabled account")
+          .isTrue();
+      return fs;
+    } catch (IOException ex) {
+      GenericTestUtils.assertExceptionContains(
+          CPK_IN_NON_HNS_ACCOUNT_ERROR_MESSAGE, ex,
+          "Exception message should contain the expected message");
+      Assertions.assertThat(
+              getConfiguration().getBoolean(FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT,
+                  false))
+          .describedAs("Encryption tests should run only on namespace enabled account")
+          .isFalse();
+
+      //Skip the test
+      Assumptions.assumeThat(true).isFalse();
+      return null;
+    }
   }
   }
 
 
   private AzureBlobFileSystem getOrCreateFS() throws Exception {
   private AzureBlobFileSystem getOrCreateFS() throws Exception {
@@ -423,18 +451,18 @@ public class ITestAbfsCustomEncryption extends AbstractAbfsIntegrationTest {
    * was used to create the x-ms-encryption-context value used for creating the file.
    * was used to create the x-ms-encryption-context value used for creating the file.
    */
    */
   private EncryptionContextProvider createEncryptedFile(Path testPath) throws Exception {
   private EncryptionContextProvider createEncryptedFile(Path testPath) throws Exception {
-    AzureBlobFileSystem fs;
-    if (getFileSystem().getAbfsClient().getEncryptionType() == fileEncryptionType) {
-      fs = getFileSystem();
-    } else {
-      fs = fileEncryptionType == ENCRYPTION_CONTEXT
-          ? getECProviderEnabledFS()
-          : getCPKEnabledFS();
-    }
-    String relativePath = fs.getAbfsStore().getRelativePath(testPath);
-    try (FSDataOutputStream out = fs.create(new Path(relativePath))) {
-      out.write(SERVER_FILE_CONTENT.getBytes());
+    try (AzureBlobFileSystem fs = getFileSystemForFileEncryption()) {
+      String relativePath = fs.getAbfsStore().getRelativePath(testPath);
+      try (FSDataOutputStream out = fs.create(new Path(relativePath))) {
+        out.write(SERVER_FILE_CONTENT.getBytes());
+      }
+      verifyFileEncryption(fs, relativePath);
+      return fs.getAbfsClient().getEncryptionContextProvider();
     }
     }
+  }
+
+  private void verifyFileEncryption(final AzureBlobFileSystem fs,
+      final String relativePath) throws Exception {
     // verify file is encrypted by calling getPathStatus (with properties)
     // verify file is encrypted by calling getPathStatus (with properties)
     // without encryption headers in request
     // without encryption headers in request
     if (fileEncryptionType != EncryptionType.NONE) {
     if (fileEncryptionType != EncryptionType.NONE) {
@@ -448,7 +476,19 @@ public class ITestAbfsCustomEncryption extends AbstractAbfsIntegrationTest {
                   getTestTracingContext(fs, false), abfsClient)));
                   getTestTracingContext(fs, false), abfsClient)));
       fs.getAbfsClient().setEncryptionType(fileEncryptionType);
       fs.getAbfsClient().setEncryptionType(fileEncryptionType);
     }
     }
-    return fs.getAbfsClient().getEncryptionContextProvider();
+  }
+
+  private AzureBlobFileSystem getFileSystemForFileEncryption() throws Exception {
+    AzureBlobFileSystem fs;
+    if (getFileSystem().getAbfsClient().getEncryptionType() == fileEncryptionType) {
+      fs = (AzureBlobFileSystem) FileSystem.newInstance(
+          getConfiguration().getRawConfiguration());
+    } else {
+      fs = fileEncryptionType == ENCRYPTION_CONTEXT
+          ? getECProviderEnabledFS()
+          : getCPKEnabledFS();
+    }
+    return fs;
   }
   }
 
 
   @Override
   @Override

+ 0 - 2
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemCreate.java

@@ -36,7 +36,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
 import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
 import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
-import org.apache.hadoop.fs.azurebfs.services.AbfsClientUtils;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -281,7 +280,6 @@ public class ITestAzureBlobFileSystemCreate extends
     final AzureBlobFileSystem fs =
     final AzureBlobFileSystem fs =
         (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
         (AzureBlobFileSystem) FileSystem.newInstance(currentFs.getUri(),
             config);
             config);
-    AbfsClientUtils.setIsNamespaceEnabled(fs.getAbfsClient(), true);
 
 
     long totalConnectionMadeBeforeTest = fs.getInstrumentationMap()
     long totalConnectionMadeBeforeTest = fs.getInstrumentationMap()
         .get(CONNECTIONS_MADE.getStatName());
         .get(CONNECTIONS_MADE.getStatName());

+ 47 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemInitAndCreate.java

@@ -21,10 +21,16 @@ package org.apache.hadoop.fs.azurebfs;
 import java.io.FileNotFoundException;
 import java.io.FileNotFoundException;
 
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 
 import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
 import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 
 
 /**
 /**
  * Test filesystem initialization and creation.
  * Test filesystem initialization and creation.
@@ -49,4 +55,45 @@ public class ITestAzureBlobFileSystemInitAndCreate extends
     final AzureBlobFileSystem fs = this.createFileSystem();
     final AzureBlobFileSystem fs = this.createFileSystem();
     FileStatus[] fileStatuses = fs.listStatus(new Path("/"));
     FileStatus[] fileStatuses = fs.listStatus(new Path("/"));
   }
   }
+
+  @Test
+  public void testGetAclCallOnHnsConfigAbsence() throws Exception {
+    AzureBlobFileSystem fs = ((AzureBlobFileSystem) FileSystem.newInstance(
+        getRawConfiguration()));
+    AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+    AbfsClient client = Mockito.spy(fs.getAbfsClient());
+    Mockito.doReturn(client).when(store).getClient();
+
+    Mockito.doThrow(TrileanConversionException.class)
+        .when(store)
+        .isNamespaceEnabled();
+
+    TracingContext tracingContext = getSampleTracingContext(fs, true);
+    Mockito.doReturn(Mockito.mock(AbfsRestOperation.class))
+        .when(client)
+        .getAclStatus(Mockito.anyString(), Mockito.any(TracingContext.class));
+    store.getIsNamespaceEnabled(tracingContext);
+
+    Mockito.verify(client, Mockito.times(1))
+        .getAclStatus(Mockito.anyString(), Mockito.any(TracingContext.class));
+  }
+
+  @Test
+  public void testNoGetAclCallOnHnsConfigPresence() throws Exception {
+    AzureBlobFileSystem fs = ((AzureBlobFileSystem) FileSystem.newInstance(
+        getRawConfiguration()));
+    AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+    AbfsClient client = Mockito.spy(fs.getAbfsClient());
+    Mockito.doReturn(client).when(store).getClient();
+
+    Mockito.doReturn(true)
+        .when(store)
+        .isNamespaceEnabled();
+
+    TracingContext tracingContext = getSampleTracingContext(fs, true);
+    store.getIsNamespaceEnabled(tracingContext);
+
+    Mockito.verify(client, Mockito.times(0))
+        .getAclStatus(Mockito.anyString(), Mockito.any(TracingContext.class));
+  }
 }
 }

+ 0 - 3
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/AbfsClientUtils.java

@@ -26,9 +26,6 @@ public final class AbfsClientUtils {
   private AbfsClientUtils() {
   private AbfsClientUtils() {
 
 
   }
   }
-  public static void setIsNamespaceEnabled(final AbfsClient abfsClient, final Boolean isNamespaceEnabled) {
-    abfsClient.setIsNamespaceEnabled(isNamespaceEnabled);
-  }
 
 
   public static void setEncryptionContextProvider(final AbfsClient abfsClient, final EncryptionContextProvider provider) {
   public static void setEncryptionContextProvider(final AbfsClient abfsClient, final EncryptionContextProvider provider) {
     abfsClient.setEncryptionContextProvider(provider);
     abfsClient.setEncryptionContextProvider(provider);