
HADOOP-18516: [ABFS][Authentication] Support Fixed SAS Token for ABFS Authentication (#6552)

Contributed by Anuj Modi
Anuj Modi, 11 months ago
parent commit 93c787be00
14 changed files with 611 additions and 72 deletions
  1. +53 -22   hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java
  2. +1 -2     hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java
  3. +1 -1     hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java
  4. +4 -1     hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java
  5. +6 -3     hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java
  6. +65 -0    hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/FixedSASTokenProvider.java
  7. +124 -25  hadoop-tools/hadoop-azure/src/site/markdown/abfs.md
  8. +20 -3    hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java
  9. +182 -0   hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemChooseSAS.java
  10. +1 -1    hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java
  11. +14 -2   hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockSASTokenProvider.java
  12. +103 -0  hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/AccountSASGenerator.java
  13. +28 -6   hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/SASGenerator.java
  14. +9 -6    hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/ServiceSASGenerator.java

+ 53 - 22
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 
 import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.azurebfs.services.FixedSASTokenProvider;
 import org.apache.hadoop.util.Preconditions;
 
 import org.apache.commons.lang3.StringUtils;
@@ -980,33 +981,63 @@ public class AbfsConfiguration{
     }
   }
 
+  /**
+   * Returns the SASTokenProvider implementation to be used to generate SAS tokens.<br>
+   * Users can choose between a custom implementation of {@link SASTokenProvider}
+   * or the in-house implementation {@link FixedSASTokenProvider}.<br>
+   * For a custom implementation, "fs.azure.sas.token.provider.type" needs to be provided.<br>
+   * For a fixed SAS token, "fs.azure.sas.fixed.token" needs to be provided.<br>
+   * In case both are provided, preference will be given to the custom implementation.<br>
+   * Avoid using a custom tokenProvider implementation just to read the configured
+   * fixed token, as this could create confusion. Also, implementing the SASTokenProvider
+   * requires relying on the raw configurations. It is more stable to depend on
+   * the AbfsConfiguration with which a filesystem is initialized, and eliminate
+   * chances of dynamic modifications and spurious situations.<br>
+   * @return sasTokenProvider object based on the configurations provided.
+   * @throws AzureBlobFileSystemException if the provider cannot be created or initialized.
+   */
   public SASTokenProvider getSASTokenProvider() throws AzureBlobFileSystemException {
     AuthType authType = getEnum(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SharedKey);
     if (authType != AuthType.SAS) {
       throw new SASTokenProviderException(String.format(
-        "Invalid auth type: %s is being used, expecting SAS", authType));
+          "Invalid auth type: %s is being used, expecting SAS.", authType));
     }
 
     try {
-      String configKey = FS_AZURE_SAS_TOKEN_PROVIDER_TYPE;
-      Class<? extends SASTokenProvider> sasTokenProviderClass =
-          getTokenProviderClass(authType, configKey, null,
-              SASTokenProvider.class);
-
-      Preconditions.checkArgument(sasTokenProviderClass != null,
-          String.format("The configuration value for \"%s\" is invalid.", configKey));
-
-      SASTokenProvider sasTokenProvider = ReflectionUtils
-          .newInstance(sasTokenProviderClass, rawConfig);
-      Preconditions.checkArgument(sasTokenProvider != null,
-          String.format("Failed to initialize %s", sasTokenProviderClass));
-
-      LOG.trace("Initializing {}", sasTokenProviderClass.getName());
-      sasTokenProvider.initialize(rawConfig, accountName);
-      LOG.trace("{} init complete", sasTokenProviderClass.getName());
-      return sasTokenProvider;
+      Class<? extends SASTokenProvider> customSasTokenProviderImplementation =
+          getTokenProviderClass(authType, FS_AZURE_SAS_TOKEN_PROVIDER_TYPE,
+              null, SASTokenProvider.class);
+      String configuredFixedToken = this.getTrimmedPasswordString(FS_AZURE_SAS_FIXED_TOKEN, EMPTY_STRING);
+
+      if (customSasTokenProviderImplementation == null && configuredFixedToken.isEmpty()) {
+        throw new SASTokenProviderException(String.format(
+            "At least one of the \"%s\" and \"%s\" must be set.",
+            FS_AZURE_SAS_TOKEN_PROVIDER_TYPE, FS_AZURE_SAS_FIXED_TOKEN));
+      }
+
+      // Prefer Custom SASTokenProvider Implementation if configured.
+      if (customSasTokenProviderImplementation != null) {
+        LOG.trace("Using Custom SASTokenProvider implementation because it is given precedence when it is set.");
+        SASTokenProvider sasTokenProvider = ReflectionUtils.newInstance(
+            customSasTokenProviderImplementation, rawConfig);
+        if (sasTokenProvider == null) {
+          throw new SASTokenProviderException(String.format(
+              "Failed to initialize %s", customSasTokenProviderImplementation));
+        }
+        LOG.trace("Initializing {}", customSasTokenProviderImplementation.getName());
+        sasTokenProvider.initialize(rawConfig, accountName);
+        LOG.trace("{} init complete", customSasTokenProviderImplementation.getName());
+        return sasTokenProvider;
+      } else {
+        LOG.trace("Using FixedSASTokenProvider implementation");
+        FixedSASTokenProvider fixedSASTokenProvider = new FixedSASTokenProvider(configuredFixedToken);
+        return fixedSASTokenProvider;
+      }
+    } catch (SASTokenProviderException e) {
+      throw e;
     } catch (Exception e) {
-      throw new TokenAccessProviderException("Unable to load SAS token provider class: " + e, e);
+      throw new SASTokenProviderException(
+          "Unable to load SAS token provider class: " + e, e);
     }
   }
 
@@ -1019,14 +1050,14 @@ public class AbfsConfiguration{
       Class<? extends EncryptionContextProvider> encryptionContextClass =
           getAccountSpecificClass(configKey, null,
               EncryptionContextProvider.class);
-      Preconditions.checkArgument(encryptionContextClass != null, String.format(
+      Preconditions.checkArgument(encryptionContextClass != null,
           "The configuration value for %s is invalid, or config key is not account-specific",
-          configKey));
+          configKey);
 
       EncryptionContextProvider encryptionContextProvider =
           ReflectionUtils.newInstance(encryptionContextClass, rawConfig);
       Preconditions.checkArgument(encryptionContextProvider != null,
-          String.format("Failed to initialize %s", encryptionContextClass));
+         "Failed to initialize %s", encryptionContextClass);
 
       LOG.trace("{} init complete", encryptionContextClass.getName());
       return encryptionContextProvider;
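
The selection logic above can be exercised purely through configuration. The following is an editor-added sketch (not part of this patch) showing a client that relies on the fixed-token path; the account name, container name, and token value are placeholders, while the configuration keys are the ones introduced by this change. Since no `fs.azure.sas.token.provider.type` is set, `getSASTokenProvider()` falls through to `FixedSASTokenProvider`.

```java
// Editor-added sketch (account, container, and token values are placeholders).
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FixedSasConfigExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // SAS auth type is mandatory; otherwise getSASTokenProvider() throws.
    conf.set("fs.azure.account.auth.type.myaccount.dfs.core.windows.net", "SAS");
    // Account-specific fixed SAS token; no custom provider class is configured,
    // so FixedSASTokenProvider is chosen.
    conf.set("fs.azure.sas.fixed.token.myaccount.dfs.core.windows.net", "<SAS_TOKEN>");

    try (FileSystem fs = FileSystem.newInstance(
        new Path("abfs://container@myaccount.dfs.core.windows.net/").toUri(), conf)) {
      fs.listStatus(new Path("/"));
    }
  }
}
```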

+ 1 - 2
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

@@ -1292,10 +1292,9 @@ public class AzureBlobFileSystem extends FileSystem
 
   /**
    * Incrementing exists() calls from superclass for statistic collection.
-   *
    * @param f source path.
    * @return true if the path exists.
-   * @throws IOException
+   * @throws IOException if some issue in checking path.
    */
   @Override
   public boolean exists(Path f) throws IOException {

+ 1 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

@@ -1729,7 +1729,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
       creds = new SharedKeyCredentials(accountName.substring(0, dotIndex),
             abfsConfiguration.getStorageAccountKey());
     } else if (authType == AuthType.SAS) {
-      LOG.trace("Fetching SAS token provider");
+      LOG.trace("Fetching SAS Token Provider");
       sasTokenProvider = abfsConfiguration.getSASTokenProvider();
     } else {
       LOG.trace("Fetching token provider");

+ 4 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

@@ -290,7 +290,10 @@ public final class ConfigurationKeys {
   public static final String FS_AZURE_ENABLE_DELEGATION_TOKEN = "fs.azure.enable.delegation.token";
   public static final String FS_AZURE_DELEGATION_TOKEN_PROVIDER_TYPE = "fs.azure.delegation.token.provider.type";
 
-  /** Key for SAS token provider **/
+  /** Key for fixed SAS token: {@value}. **/
+  public static final String FS_AZURE_SAS_FIXED_TOKEN = "fs.azure.sas.fixed.token";
+
+  /** Key for SAS token provider: {@value}. **/
   public static final String FS_AZURE_SAS_TOKEN_PROVIDER_TYPE = "fs.azure.sas.token.provider.type";
 
   /** For performance, AbfsInputStream/AbfsOutputStream re-use SAS tokens until the expiry is within this number of seconds. **/

+ 6 - 3
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

@@ -1011,6 +1011,7 @@ public class AbfsClient implements Closeable {
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position));
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_RETAIN_UNCOMMITTED_DATA, String.valueOf(retainUncommittedData));
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose));
+
     // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
     String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION,
         abfsUriQueryBuilder, cachedSasToken);
@@ -1107,6 +1108,7 @@ public class AbfsClient implements Closeable {
     }
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
+
     // AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
     String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.READ_OPERATION,
         abfsUriQueryBuilder, cachedSasToken);
@@ -1419,16 +1421,17 @@ public class AbfsClient implements Closeable {
           sasToken = cachedSasToken;
           LOG.trace("Using cached SAS token.");
         }
+
         // if SAS Token contains a prefix of ?, it should be removed
         if (sasToken.charAt(0) == '?') {
           sasToken = sasToken.substring(1);
         }
+
         queryBuilder.setSASToken(sasToken);
         LOG.trace("SAS token fetch complete for {} on {}", operation, path);
       } catch (Exception ex) {
-        throw new SASTokenProviderException(String.format("Failed to acquire a SAS token for %s on %s due to %s",
-            operation,
-            path,
+        throw new SASTokenProviderException(String.format(
+            "Failed to acquire a SAS token for %s on %s due to %s", operation, path,
             ex.toString()));
       }
     }

+ 65 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/FixedSASTokenProvider.java

@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException;
+import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
+
+/**
+ * In house implementation of {@link SASTokenProvider} to use a fixed SAS token with ABFS.
+ * Use this to avoid implementing a Custom Token Provider just to return fixed SAS.
+ * Fixed SAS Token to be provided using the config "fs.azure.sas.fixed.token".
+ */
+public class FixedSASTokenProvider implements SASTokenProvider {
+  private String fixedSASToken;
+
+  public FixedSASTokenProvider(final String fixedSASToken) throws SASTokenProviderException {
+    this.fixedSASToken = fixedSASToken;
+    if (fixedSASToken == null || fixedSASToken.isEmpty()) {
+      throw new SASTokenProviderException(
+          String.format("Configured Fixed SAS Token is Invalid: %s", fixedSASToken));
+    }
+  }
+
+  @Override
+  public void initialize(final Configuration configuration,
+      final String accountName)
+      throws IOException {
+  }
+
+  /**
+   * Returns the fixed SAS Token configured.
+   * @param account the name of the storage account.
+   * @param fileSystem the name of the fileSystem.
+   * @param path the file or directory path.
+   * @param operation the operation to be performed on the path.
+   * @return Fixed SAS Token
+   * @throws IOException never
+   */
+  @Override
+  public String getSASToken(final String account,
+      final String fileSystem,
+      final String path,
+      final String operation) throws IOException {
+    return fixedSASToken;
+  }
+}
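
As a quick illustration of the new class's contract, here is an editor-added sketch (not part of the patch): the constructor rejects a null or empty token, and `getSASToken()` returns the configured value unchanged regardless of account, filesystem, path, or operation. The token string below is a placeholder.

```java
// Editor-added sketch: exercising FixedSASTokenProvider directly.
import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
import org.apache.hadoop.fs.azurebfs.services.FixedSASTokenProvider;

public class FixedSASTokenProviderDemo {
  public static void main(String[] args) throws Exception {
    // A null or empty token makes the constructor throw SASTokenProviderException.
    SASTokenProvider provider = new FixedSASTokenProvider("sp=r&sv=2021-06-08&sig=PLACEHOLDER");

    // The same fixed token is returned for every operation on every path.
    String token = provider.getSASToken("myaccount", "mycontainer",
        "/data/file.txt", SASTokenProvider.READ_OPERATION);
    System.out.println(token);
  }
}
```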

+ 124 - 25
hadoop-tools/hadoop-azure/src/site/markdown/abfs.md

@@ -12,7 +12,7 @@
   limitations under the License. See accompanying LICENSE file.
 -->
 
-# Hadoop Azure Support: ABFS   Azure Data Lake Storage Gen2
+# Hadoop Azure Support: ABFS - Azure Data Lake Storage Gen2
 
 <!-- MACRO{toc|fromDepth=1|toDepth=3} -->
 
@@ -309,12 +309,13 @@ in different deployment situations.
 The ABFS client can be deployed in different ways, with its authentication needs
 driven by them.
 
-1. With the storage account's authentication secret in the configuration:
-"Shared Key".
-1. Using OAuth 2.0 tokens of one form or another.
-1. Deployed in-Azure with the Azure VMs providing OAuth 2.0 tokens to the application,
- "Managed Instance".
-1. Using Shared Access Signature (SAS) tokens provided by a custom implementation of the SASTokenProvider interface.
+1. With the storage account's authentication secret in the configuration: "Shared Key".
+2. Using OAuth 2.0 tokens of one form or another.
+3. Deployed in-Azure with the Azure VMs providing OAuth 2.0 tokens to the application, "Managed Instance".
+4. Using Shared Access Signature (SAS) tokens provided by a custom implementation of the SASTokenProvider interface.
+5. By directly configuring a fixed Shared Access Signature (SAS) token in the account configuration settings files.
+
+Note: SAS Based Authentication should be used only with HNS Enabled accounts.
 
 What can be changed is what secrets/credentials are used to authenticate the caller.
 
@@ -355,14 +356,14 @@ the password, "key", retrieved from the XML/JCECKs configuration files.
 
 ```xml
 <property>
-  <name>fs.azure.account.auth.type.abfswales1.dfs.core.windows.net</name>
+  <name>fs.azure.account.auth.type.ACCOUNT_NAME.dfs.core.windows.net</name>
   <value>SharedKey</value>
   <description>
   </description>
 </property>
 <property>
-  <name>fs.azure.account.key.abfswales1.dfs.core.windows.net</name>
-  <value>ZGlkIHlvdSByZWFsbHkgdGhpbmsgSSB3YXMgZ29pbmcgdG8gcHV0IGEga2V5IGluIGhlcmU/IA==</value>
+  <name>fs.azure.account.key.ACCOUNT_NAME.dfs.core.windows.net</name>
+  <value>ACCOUNT_KEY</value>
   <description>
   The secret password. Never share these.
   </description>
@@ -609,21 +610,119 @@ In case delegation token is enabled, and the config `fs.azure.delegation.token
 
 ### Shared Access Signature (SAS) Token Provider
 
-A Shared Access Signature (SAS) token provider supplies the ABFS connector with SAS
-tokens by implementing the SASTokenProvider interface.
-
-```xml
-<property>
-  <name>fs.azure.account.auth.type</name>
-  <value>SAS</value>
-</property>
-<property>
-  <name>fs.azure.sas.token.provider.type</name>
-  <value>{fully-qualified-class-name-for-implementation-of-SASTokenProvider-interface}</value>
-</property>
-```
-
-The declared class must implement `org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider`.
+A shared access signature (SAS) provides secure delegated access to resources in
+your storage account. With a SAS, you have granular control over how a client can access your data.
+To know more about how SAS Authentication works refer to
+[Grant limited access to Azure Storage resources using shared access signatures (SAS)](https://learn.microsoft.com/en-us/azure/storage/common/storage-sas-overview)
+
+There are three types of SAS supported by Azure Storage:
+- [User Delegation SAS](https://learn.microsoft.com/en-us/rest/api/storageservices/create-user-delegation-sas): Recommended for use with the ABFS Driver with HNS Enabled ADLS Gen2 accounts. It is an identity-based SAS that works at the blob/directory level.
+- [Service SAS](https://learn.microsoft.com/en-us/rest/api/storageservices/create-service-sas): Global and works at container level.
+- [Account SAS](https://learn.microsoft.com/en-us/rest/api/storageservices/create-account-sas): Global and works at account level.
+
+#### Known Issues With SAS
+- SAS Based Authentication works only with HNS Enabled ADLS Gen2 Accounts, which
+is the recommended account type to be used with ABFS.
+- Certain root level operations are known to fail with SAS Based Authentication.
+
+#### Using User Delegation SAS with ABFS
+
+- **Description**: ABFS allows you to implement your custom SAS Token Provider
+that uses your identity to create a user delegation key, which can then be used to
+create a SAS instead of the storage account key. The declared class must implement
+`org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider`.
+
+- **Configuration**: To use this method with ABFS Driver, specify the following properties in your `core-site.xml` file:
+    1. Authentication Type:
+        ```xml
+        <property>
+          <name>fs.azure.account.auth.type</name>
+          <value>SAS</value>
+        </property>
+        ```
+
+    1. Custom SAS Token Provider Class:
+        ```xml
+        <property>
+          <name>fs.azure.sas.token.provider.type</name>
+          <value>CUSTOM_SAS_TOKEN_PROVIDER_CLASS</value>
+        </property>
+        ```
+
+    Replace `CUSTOM_SAS_TOKEN_PROVIDER_CLASS` with the fully qualified class name of
+your custom token provider implementation. Depending upon the implementation, you
+might need to specify additional configurations that are required by your custom
+implementation.
+
+- **Example**: ABFS Hadoop Driver provides a [MockDelegationSASTokenProvider](https://github.com/apache/hadoop/blob/trunk/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java)
+implementation that can be used as an example of how to implement your own custom
+SASTokenProvider. This requires the application credentials to be specified using
+the following configurations apart from the above two:
+
+    1. App Service Principal Tenant Id:
+        ```xml
+        <property>
+          <name>fs.azure.test.app.service.principal.tenant.id</name>
+          <value>TENANT_ID</value>
+        </property>
+        ```
+    1. App Service Principal Object Id:
+        ```xml
+        <property>
+          <name>fs.azure.test.app.service.principal.object.id</name>
+          <value>OBJECT_ID</value>
+        </property>
+        ```
+    1. App Id:
+        ```xml
+        <property>
+          <name>fs.azure.test.app.id</name>
+          <value>APPLICATION_ID</value>
+        </property>
+        ```
+    1. App Secret:
+        ```xml
+        <property>
+          <name>fs.azure.test.app.secret</name>
+          <value>APPLICATION_SECRET</value>
+        </property>
+        ```
+
+- **Security**: More secure than Shared Key and allows granting limited access
+to data without exposing the access key. Recommended to be used only with HNS Enabled
+ADLS Gen2 storage accounts.
+
+#### Using Account/Service SAS with ABFS
+
+- **Description**: ABFS allows users to use an Account/Service SAS for authenticating
+requests. Users can specify it as a fixed SAS token to be used across all the requests.
+
+- **Configuration**: To use this method with ABFS Driver, specify the following properties in your `core-site.xml` file:
+
+    1. Authentication Type:
+        ```xml
+        <property>
+          <name>fs.azure.account.auth.type</name>
+          <value>SAS</value>
+        </property>
+        ```
+
+    1.  Fixed SAS Token:
+        ```xml
+        <property>
+          <name>fs.azure.sas.fixed.token</name>
+          <value>FIXED_SAS_TOKEN</value>
+        </property>
+        ```
+
+    Replace `FIXED_SAS_TOKEN` with a fixed Account/Service SAS. You can also
+generate a SAS from the Azure portal: Account -> Security + Networking -> Shared Access Signature.
+
+- **Security**: Account/Service SAS requires account keys to be used, which makes
+it less secure. There is no scope for delegated access to different users.
+
+*Note:* When `fs.azure.sas.token.provider.type` and `fs.azure.sas.fixed.token`
+are both configured, precedence will be given to the custom token provider implementation.
 
 ## <a name="technical"></a> Technical notes
 

+ 20 - 3
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/AbstractAbfsIntegrationTest.java

@@ -284,13 +284,30 @@ public abstract class AbstractAbfsIntegrationTest extends
     useConfiguredFileSystem = true;
   }
 
+  /**
+   * Create a filesystem for SAS tests using SharedKey authentication.
+   * We do not allow filesystem creation with SAS because certain types of SAS do not have
+   * the required permissions, and it is not known what type of SAS is configured by the user.
+   * @throws Exception
+   */
   protected void createFilesystemForSASTests() throws Exception {
-    // The SAS tests do not have permission to create a filesystem
-    // so first create temporary instance of the filesystem using SharedKey
-    // then re-use the filesystem it creates with SAS auth instead of SharedKey.
+    createFilesystemWithTestFileForSASTests(null);
+  }
+
+  /**
+   * Create a filesystem for SAS tests along with a test file using SharedKey authentication.
+   * We do not allow filesystem creation with SAS because certain types of SAS do not have
+   * the required permissions, and it is not known what type of SAS is configured by the user.
+   * @param testPath path of the test file.
+   * @throws Exception
+   */
+  protected void createFilesystemWithTestFileForSASTests(Path testPath) throws Exception {
     try (AzureBlobFileSystem tempFs = (AzureBlobFileSystem) FileSystem.newInstance(rawConfig)){
       ContractTestUtils.assertPathExists(tempFs, "This path should exist",
           new Path("/"));
+      if (testPath != null) {
+        tempFs.create(testPath).close();
+      }
       abfsConfig.set(FS_AZURE_ACCOUNT_AUTH_TYPE_PROPERTY_NAME, AuthType.SAS.name());
       usingFilesystemForSASTests = true;
     }

+ 182 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemChooseSAS.java

@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs;
+
+import java.io.IOException;
+import java.nio.file.AccessDeniedException;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Assume;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException;
+import org.apache.hadoop.fs.azurebfs.extensions.MockDelegationSASTokenProvider;
+import org.apache.hadoop.fs.azurebfs.services.AuthType;
+import org.apache.hadoop.fs.azurebfs.services.FixedSASTokenProvider;
+import org.apache.hadoop.fs.azurebfs.utils.AccountSASGenerator;
+import org.apache.hadoop.fs.azurebfs.utils.Base64;
+
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SAS_FIXED_TOKEN;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_SAS_TOKEN_PROVIDER_TYPE;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.accountProperty;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Tests to validate the choice between using a custom SASTokenProvider
+ * implementation and FixedSASTokenProvider.
+ */
+public class ITestAzureBlobFileSystemChooseSAS extends AbstractAbfsIntegrationTest{
+
+  private String accountSAS = null;
+  private static final String TEST_PATH = "testPath";
+
+  /**
+   * To differentiate which SASTokenProvider was used, we will use different types of SAS tokens.
+   * FixedSASTokenProvider will return an Account SAS with only read permissions.
+   * The custom SASTokenProvider will return a User Delegation SAS token with both read and write permissions.
+   */
+  public ITestAzureBlobFileSystemChooseSAS() throws Exception {
+    // SAS Token configured might not have permissions for creating file system.
+    // Shared Key must be configured to create one. Once created, a new instance
+    // of the same file system will be used with SAS Authentication.
+    Assume.assumeTrue(this.getAuthType() == AuthType.SharedKey);
+  }
+
+  @Override
+  public void setup() throws Exception {
+    createFilesystemWithTestFileForSASTests(new Path(TEST_PATH));
+    super.setup();
+    generateAccountSAS();
+  }
+
+  /**
+   * Generates an Account SAS Token using the Account Shared Key to be used as a fixed SAS Token.
+   * Account SAS used here will have only read permissions to resources.
+   * This will be used by individual tests to set in the configurations.
+   * @throws AzureBlobFileSystemException
+   */
+  private void generateAccountSAS() throws AzureBlobFileSystemException {
+    final String accountKey = getConfiguration().getStorageAccountKey();
+    AccountSASGenerator configAccountSASGenerator = new AccountSASGenerator(Base64.decode(accountKey));
+    // Setting only read permissions.
+    configAccountSASGenerator.setPermissions("r");
+    accountSAS = configAccountSASGenerator.getAccountSAS(getAccountName());
+  }
+
+  /**
+   * Tests the scenario where both the custom SASTokenProvider and a fixed SAS token are configured.
+   * Custom implementation of SASTokenProvider class should be chosen and User Delegation SAS should be used.
+   * @throws Exception
+   */
+  @Test
+  public void testBothProviderFixedTokenConfigured() throws Exception {
+    AbfsConfiguration testAbfsConfig = new AbfsConfiguration(
+        getRawConfiguration(), this.getAccountName());
+    removeAnyPresetConfiguration(testAbfsConfig);
+
+    // Configuring a SASTokenProvider class which provides a user delegation SAS.
+    testAbfsConfig.set(FS_AZURE_SAS_TOKEN_PROVIDER_TYPE,
+        MockDelegationSASTokenProvider.class.getName());
+
+    // configuring the Fixed SAS token which is an Account SAS.
+    testAbfsConfig.set(FS_AZURE_SAS_FIXED_TOKEN, accountSAS);
+
+    // Creating a new file system with updated configs.
+    try (AzureBlobFileSystem newTestFs = (AzureBlobFileSystem)
+        FileSystem.newInstance(testAbfsConfig.getRawConfiguration())) {
+
+      // Asserting that MockDelegationSASTokenProvider is used.
+      Assertions.assertThat(testAbfsConfig.getSASTokenProvider())
+          .describedAs("Custom SASTokenProvider Class must be used")
+          .isInstanceOf(MockDelegationSASTokenProvider.class);
+
+      // Assert that User Delegation SAS is used and both read and write operations are permitted.
+      Path testPath = path(getMethodName());
+      newTestFs.create(testPath).close();
+      newTestFs.open(testPath).close();
+    }
+  }
+
+  /**
+   * Tests the scenario where only the fixed token is configured, and no token provider class is set.
+   * The Account SAS token configured as the fixed SAS should be used.
+   * Also verifies that account-specific as well as account-agnostic fixed SAS tokens work.
+   * @throws IOException
+   */
+  @Test
+  public void testOnlyFixedTokenConfigured() throws Exception {
+    AbfsConfiguration testAbfsConfig = new AbfsConfiguration(
+        getRawConfiguration(), this.getAccountName());
+
+    // setting an Account Specific Fixed SAS token.
+    removeAnyPresetConfiguration(testAbfsConfig);
+    testAbfsConfig.set(accountProperty(FS_AZURE_SAS_FIXED_TOKEN, this.getAccountName()), accountSAS);
+    testOnlyFixedTokenConfiguredInternal(testAbfsConfig);
+
+    // setting an Account Agnostic Fixed SAS token.
+    removeAnyPresetConfiguration(testAbfsConfig);
+    testAbfsConfig.set(FS_AZURE_SAS_FIXED_TOKEN, accountSAS);
+    testOnlyFixedTokenConfiguredInternal(testAbfsConfig);
+  }
+
+  private void testOnlyFixedTokenConfiguredInternal(AbfsConfiguration testAbfsConfig) throws Exception {
+    // Creating a new filesystem with updated configs.
+    try (AzureBlobFileSystem newTestFs = (AzureBlobFileSystem)
+        FileSystem.newInstance(testAbfsConfig.getRawConfiguration())) {
+
+      // Asserting that FixedSASTokenProvider is used.
+      Assertions.assertThat(testAbfsConfig.getSASTokenProvider())
+          .describedAs("FixedSASTokenProvider Class must be used")
+          .isInstanceOf(FixedSASTokenProvider.class);
+
+      // Assert that Account SAS is used and only read operations are permitted.
+      Path testPath = path(getMethodName());
+      intercept(AccessDeniedException.class, () -> {
+        newTestFs.create(testPath);
+      });
+      // Read Operation is permitted
+      newTestFs.getFileStatus(new Path(TEST_PATH));
+    }
+  }
+
+  /**
+   * Tests the scenario where neither the token provider class nor the fixed token is configured.
+   * The code errors out at the initialization stage itself.
+   * @throws IOException
+   */
+  @Test
+  public void testBothProviderFixedTokenUnset() throws Exception {
+    AbfsConfiguration testAbfsConfig = new AbfsConfiguration(
+        getRawConfiguration(), this.getAccountName());
+    removeAnyPresetConfiguration(testAbfsConfig);
+
+    intercept(SASTokenProviderException.class, () -> {
+      FileSystem.newInstance(testAbfsConfig.getRawConfiguration());
+    });
+  }
+
+  private void removeAnyPresetConfiguration(AbfsConfiguration testAbfsConfig) {
+    testAbfsConfig.unset(FS_AZURE_SAS_TOKEN_PROVIDER_TYPE);
+    testAbfsConfig.unset(FS_AZURE_SAS_FIXED_TOKEN);
+    testAbfsConfig.unset(accountProperty(FS_AZURE_SAS_TOKEN_PROVIDER_TYPE, this.getAccountName()));
+    testAbfsConfig.unset(accountProperty(FS_AZURE_SAS_FIXED_TOKEN, this.getAccountName()));
+  }
+}

+ 1 - 1
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockDelegationSASTokenProvider.java

@@ -43,7 +43,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.D
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_HTTP_READ_TIMEOUT;
 
 /**
- * A mock SAS token provider implementation
+ * A mock SAS token provider implementation.
  */
 public class MockDelegationSASTokenProvider implements SASTokenProvider {
 

+ 14 - 2
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/extensions/MockSASTokenProvider.java

@@ -20,7 +20,11 @@ package org.apache.hadoop.fs.azurebfs.extensions;
 
 import java.io.IOException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
 import org.apache.hadoop.security.AccessControlException;
 
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
@@ -28,17 +32,25 @@ import org.apache.hadoop.fs.azurebfs.utils.Base64;
 import org.apache.hadoop.fs.azurebfs.utils.ServiceSASGenerator;
 
 /**
- * A mock SAS token provider implementation
+ * A mock SAS token provider implementation.
  */
 public class MockSASTokenProvider implements SASTokenProvider {
 
   private byte[] accountKey;
   private ServiceSASGenerator generator;
   private boolean skipAuthorizationForTestSetup = false;
+  private static final Logger LOG = LoggerFactory.getLogger(MockSASTokenProvider.class);
 
   // For testing we use a container SAS for all operations.
   private String generateSAS(byte[] accountKey, String accountName, String fileSystemName) {
-     return generator.getContainerSASWithFullControl(accountName, fileSystemName);
+    String containerSAS = "";
+    try {
+      containerSAS = generator.getContainerSASWithFullControl(accountName, fileSystemName);
+    } catch (InvalidConfigurationValueException e) {
+      LOG.debug(e.getMessage());
+      containerSAS = "";
+    }
+    return containerSAS;
   }
 
   @Override

+ 103 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/AccountSASGenerator.java

@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.utils;
+
+import java.time.Instant;
+
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
+import org.apache.hadoop.fs.azurebfs.services.AbfsUriQueryBuilder;
+
+/**
+ * Test Account SAS Generator.
+ * SAS generated by this will have only read access to storage account blob and file services.
+ */
+public class AccountSASGenerator extends SASGenerator {
+  /**
+   * Creates Account SAS from Storage Account Key.
+   * https://learn.microsoft.com/en-us/rest/api/storageservices/create-account-sas.
+   * @param accountKey: the storage account key.
+   */
+  public AccountSASGenerator(byte[] accountKey) {
+    super(accountKey);
+  }
+
+  private String permissions = "racwdl";
+
+  public String getAccountSAS(String accountName) throws
+      AzureBlobFileSystemException {
+    // retaining only the account name
+    accountName = getCanonicalAccountName(accountName);
+    String sp = permissions;
+    String sv = "2021-06-08";
+    String srt = "sco";
+
+    String st = ISO_8601_FORMATTER.format(Instant.now().minus(FIVE_MINUTES));
+    String se = ISO_8601_FORMATTER.format(Instant.now().plus(ONE_DAY));
+
+    String ss = "bf";
+    String spr = "https";
+    String signature = computeSignatureForSAS(sp, ss, srt, st, se, sv, accountName);
+
+    AbfsUriQueryBuilder qb = new AbfsUriQueryBuilder();
+    qb.addQuery("sp", sp);
+    qb.addQuery("ss", ss);
+    qb.addQuery("srt", srt);
+    qb.addQuery("st", st);
+    qb.addQuery("se", se);
+    qb.addQuery("sv", sv);
+    qb.addQuery("sig", signature);
+    return qb.toString().substring(1);
+  }
+
+  private String computeSignatureForSAS(String signedPerm, String signedService, String signedResType,
+      String signedStart, String signedExp, String signedVersion, String accountName) {
+
+    StringBuilder sb = new StringBuilder();
+    sb.append(accountName);
+    sb.append("\n");
+    sb.append(signedPerm);
+    sb.append("\n");
+    sb.append(signedService);
+    sb.append("\n");
+    sb.append(signedResType);
+    sb.append("\n");
+    sb.append(signedStart);
+    sb.append("\n");
+    sb.append(signedExp);
+    sb.append("\n");
+    sb.append("\n"); // signedIP
+    sb.append("\n"); // signedProtocol
+    sb.append(signedVersion);
+    sb.append("\n");
+    sb.append("\n"); //signed encryption scope
+
+    String stringToSign = sb.toString();
+    LOG.debug("Account SAS stringToSign: " + stringToSign.replace("\n", "."));
+    return computeHmac256(stringToSign);
+  }
+
+  /**
+   * By default, the Account SAS has all the available permissions. Use this to
+   * override the default permissions and set them as per the requirements.
+   * @param permissions the permission string to set (for example, "r" for read-only).
+   */
+  public void setPermissions(final String permissions) {
+    this.permissions = permissions;
+  }
+}
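
The `ITestAzureBlobFileSystemChooseSAS` test shown earlier drives this generator roughly as in the following editor-added sketch; it assumes the test-scope utility classes are on the classpath and that a Base64-encoded storage account key is at hand (the key and account name below are placeholders). The resulting query string can be supplied as the value of `fs.azure.sas.fixed.token`.

```java
// Editor-added sketch: generating a read-only Account SAS from a storage account key,
// mirroring generateAccountSAS() in ITestAzureBlobFileSystemChooseSAS.
import org.apache.hadoop.fs.azurebfs.utils.AccountSASGenerator;
import org.apache.hadoop.fs.azurebfs.utils.Base64;

public class AccountSASExample {
  public static void main(String[] args) throws Exception {
    String accountKey = "<BASE64_ACCOUNT_KEY>";              // placeholder
    String accountName = "myaccount.dfs.core.windows.net";   // endpoint is stripped internally

    AccountSASGenerator generator = new AccountSASGenerator(Base64.decode(accountKey));
    generator.setPermissions("r"); // restrict the SAS to read-only
    String accountSAS = generator.getAccountSAS(accountName);

    // A query string without the leading '?', usable as the fixed SAS token value.
    System.out.println(accountSAS);
  }
}
```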

+ 28 - 6
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/SASGenerator.java

@@ -29,6 +29,10 @@ import javax.crypto.spec.SecretKeySpec;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
+
 /**
  * Test SAS generator.
  */
@@ -54,10 +58,8 @@ public abstract class SASGenerator {
   protected static final Logger LOG = LoggerFactory.getLogger(SASGenerator.class);
   public static final Duration FIVE_MINUTES = Duration.ofMinutes(5);
   public static final Duration ONE_DAY = Duration.ofDays(1);
-  public static final DateTimeFormatter ISO_8601_FORMATTER =
-      DateTimeFormatter
-          .ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.ROOT)
-          .withZone(ZoneId.of("UTC"));
+  public static final DateTimeFormatter ISO_8601_FORMATTER = DateTimeFormatter
+      .ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.ROOT).withZone(ZoneId.of("UTC"));
 
   private Mac hmacSha256;
   private byte[] key;
@@ -68,7 +70,7 @@ public abstract class SASGenerator {
 
   /**
    * Called by subclasses to initialize the cryptographic SHA-256 HMAC provider.
-   * @param key - a 256-bit secret key
+   * @param key - a 256-bit secret key.
    */
   protected SASGenerator(byte[] key) {
     this.key = key;
@@ -85,6 +87,26 @@ public abstract class SASGenerator {
     }
   }
 
+  protected String getCanonicalAccountName(String accountName) throws
+      InvalidConfigurationValueException {
+    // returns the account name without the endpoint
+    // given account names with endpoint have the format accountname.endpoint
+    // For example, input of xyz.dfs.core.windows.net should return "xyz" only
+    int dotIndex = accountName.indexOf(AbfsHttpConstants.DOT);
+    if (dotIndex == 0) {
+      // case when accountname starts with a ".": endpoint is present, accountName is null
+      // for example .dfs.azure.com, which is invalid
+      throw new InvalidConfigurationValueException("Account Name is not fully qualified");
+    }
+    if (dotIndex > 0) {
+      // case when endpoint is present with accountName
+      return accountName.substring(0, dotIndex);
+    } else {
+      // case when accountName is already canonicalized
+      return accountName;
+    }
+  }
+
   protected String computeHmac256(final String stringToSign) {
     byte[] utf8Bytes;
     try {
@@ -98,4 +120,4 @@ public abstract class SASGenerator {
     }
     return Base64.encode(hmac);
   }
-}
+}

+ 9 - 6
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/ServiceSASGenerator.java

@@ -20,23 +20,26 @@ package org.apache.hadoop.fs.azurebfs.utils;
 
 import java.time.Instant;
 
+import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidConfigurationValueException;
 import org.apache.hadoop.fs.azurebfs.services.AbfsUriQueryBuilder;
 
 /**
- * Test Service SAS generator.
+ * Test Service SAS Generator.
  */
 public class ServiceSASGenerator extends SASGenerator {
 
   /**
-   * Creates a SAS Generator for Service SAS
+   * Creates a SAS Generator for Service SAS.
    * (https://docs.microsoft.com/en-us/rest/api/storageservices/create-service-sas).
-   * @param accountKey - the storage account key
+   * @param accountKey - the storage account key.
    */
   public ServiceSASGenerator(byte[] accountKey) {
     super(accountKey);
   }
 
-  public String getContainerSASWithFullControl(String accountName, String containerName) {
+  public String getContainerSASWithFullControl(String accountName, String containerName) throws
+      InvalidConfigurationValueException {
+    accountName = getCanonicalAccountName(accountName);
     String sp = "rcwdl";
     String sv = AuthenticationVersion.Feb20.toString();
     String sr = "c";
@@ -66,7 +69,7 @@ public class ServiceSASGenerator extends SASGenerator {
     sb.append("\n");
     sb.append(se);
     sb.append("\n");
-    // canonicalized resource
+    // canonicalize resource
     sb.append("/blob/");
     sb.append(accountName);
     sb.append("/");
@@ -93,4 +96,4 @@ public class ServiceSASGenerator extends SASGenerator {
     LOG.debug("Service SAS stringToSign: " + stringToSign.replace("\n", "."));
     return computeHmac256(stringToSign);
   }
-}
+}