Ver código fonte

HADOOP-19406. ABFS: [FNSOverBlob] Support User Delegation SAS for FNS Blob (#7523)

Contributed by Manika Joshi
Reviewed by Anmol Asrani, Manish Bhatt, Anuj Modi

Signed off by Anuj Modi<anujmodi@apache.org>
manika137 1 semana atrás
pai
commit
10cbe27d56

+ 23 - 5
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.azurebfs;
 
+import javax.annotation.Nullable;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -41,7 +42,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import javax.annotation.Nullable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -118,8 +118,24 @@ import static java.net.HttpURLConnection.HTTP_INTERNAL_ERROR;
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL;
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
-import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_APPEND;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_CREATE;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_CREATE_NON_RECURSIVE;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_DELETE;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_EXIST;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_GET_DELEGATION_TOKEN;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_GET_FILE_STATUS;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_LIST_STATUS;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_MKDIRS;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_OPEN;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.CALL_RENAME;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.DIRECTORIES_CREATED;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.DIRECTORIES_DELETED;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.ERROR_IGNORED;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.FILES_CREATED;
+import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.FILES_DELETED;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CPK_IN_NON_HNS_ACCOUNT_ERROR_MESSAGE;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType.DFS;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.DATA_BLOCKS_BUFFER;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_ACCOUNT_IS_HNS_ENABLED;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BLOCK_UPLOAD_ACTIVE_BLOCKS;
@@ -240,9 +256,10 @@ public class AzureBlobFileSystem extends FileSystem
 
     /*
      * Validates if the correct SAS Token provider is configured for non-HNS accounts.
-     * For non-HNS accounts, if the authentication type is set to SAS, only a fixed SAS Token is supported as of now.
-     * A custom SAS Token Provider should not be configured in such cases, as it will override the FixedSASTokenProvider and render it unused.
-     * If the namespace is not enabled and the FixedSASTokenProvider is not configured,
+     * For non-HNS accounts with Blob endpoint, both fixed SAS Token and custom SAS Token provider are supported.
+     * For non-HNS accounts with DFS endpoint, if the authentication type is set to SAS, only fixed SAS Token is supported as of now.
+     * A custom SAS Token Provider should not be configured in this case as it will override the FixedSASTokenProvider and render it unused.
+     * If the namespace is not enabled and the FixedSASTokenProvider is not configured for non-HNS accounts with DFS endpoint,
      * an InvalidConfigurationValueException will be thrown.
      *
      * @throws InvalidConfigurationValueException if account is not namespace enabled and FixedSASTokenProvider is not configured.
@@ -250,6 +267,7 @@ public class AzureBlobFileSystem extends FileSystem
     try {
       if (abfsConfiguration.getAuthType(abfsConfiguration.getAccountName()) == AuthType.SAS && // Auth type is SAS
           !tryGetIsNamespaceEnabled(new TracingContext(initFSTracingContext)) && // Account is FNS
+          abfsConfiguration.getFsConfiguredServiceType() == DFS && // Service type is DFS
           !abfsConfiguration.isFixedSASTokenProviderConfigured()) { // Fixed SAS Token Provider is not configured
         throw new InvalidConfigurationValueException(FS_AZURE_SAS_FIXED_TOKEN, UNAUTHORIZED_SAS);
       }

+ 4 - 2
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/extensions/SASTokenProvider.java

@@ -33,6 +33,8 @@ import org.apache.hadoop.security.AccessControlException;
 public interface SASTokenProvider {
 
   String CHECK_ACCESS_OPERATION = "check-access";
+  String COPY_BLOB_DST_OPERATION = "copy-blob-dst";
+  String COPY_BLOB_SRC_OPERATION = "copy-blob-src";
   String CREATE_DIRECTORY_OPERATION = "create-directory";
   String CREATE_FILE_OPERATION = "create-file";
   String DELETE_OPERATION = "delete";
@@ -40,7 +42,9 @@ public interface SASTokenProvider {
   String GET_ACL_OPERATION = "get-acl";
   String GET_STATUS_OPERATION = "get-status";
   String GET_PROPERTIES_OPERATION = "get-properties";
+  String LEASE_BLOB_OPERATION = "lease-blob";
   String LIST_OPERATION = "list";
+  String LIST_OPERATION_BLOB = "list-blob";
   String READ_OPERATION = "read";
   String RENAME_SOURCE_OPERATION = "rename-source";
   String RENAME_DESTINATION_OPERATION = "rename-destination";
@@ -49,8 +53,6 @@ public interface SASTokenProvider {
   String SET_PERMISSION_OPERATION = "set-permission";
   String SET_PROPERTIES_OPERATION = "set-properties";
   String WRITE_OPERATION = "write";
-  // Generic HTTP operation can be used with FixedSASTokenProvider.
-  String FIXED_SAS_STORE_OPERATION = "fixed-sas";
 
   /**
    * Initialize authorizer for Azure Blob File System.

+ 20 - 18
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java

@@ -367,7 +367,7 @@ public class AbfsBlobClient extends AbfsClient {
       abfsUriQueryBuilder.addQuery(QUERY_PARAM_DELIMITER, FORWARD_SLASH);
     }
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_MAX_RESULTS, String.valueOf(listMaxResults));
-    appendSASTokenToQuery(relativePath, SASTokenProvider.FIXED_SAS_STORE_OPERATION, abfsUriQueryBuilder);
+    appendSASTokenToQuery(relativePath, SASTokenProvider.LIST_OPERATION_BLOB, abfsUriQueryBuilder);
 
     final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = getAbfsRestOperation(
@@ -555,11 +555,14 @@ public class AbfsBlobClient extends AbfsClient {
       final ContextEncryptionAdapter contextEncryptionAdapter,
       final TracingContext tracingContext) throws AzureBlobFileSystemException {
     final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
+    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     if (isFile) {
       addEncryptionKeyRequestHeaders(path, requestHeaders, true,
           contextEncryptionAdapter, tracingContext);
+      appendSASTokenToQuery(path, SASTokenProvider.CREATE_FILE_OPERATION, abfsUriQueryBuilder);
     } else {
       requestHeaders.add(new AbfsHttpHeader(X_MS_META_HDI_ISFOLDER, TRUE));
+      appendSASTokenToQuery(path, SASTokenProvider.CREATE_DIRECTORY_OPERATION, abfsUriQueryBuilder);
     }
     requestHeaders.add(new AbfsHttpHeader(CONTENT_LENGTH, ZERO));
     if (isAppendBlob) {
@@ -574,9 +577,6 @@ public class AbfsBlobClient extends AbfsClient {
       requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag));
     }
 
-    final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
-    appendSASTokenToQuery(path, SASTokenProvider.FIXED_SAS_STORE_OPERATION, abfsUriQueryBuilder);
-
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = getAbfsRestOperation(
         AbfsRestOperationType.PutBlob,
@@ -698,7 +698,7 @@ public class AbfsBlobClient extends AbfsClient {
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, LEASE);
-    appendSASTokenToQuery(path, SASTokenProvider.FIXED_SAS_STORE_OPERATION, abfsUriQueryBuilder);
+    appendSASTokenToQuery(path, SASTokenProvider.LEASE_BLOB_OPERATION, abfsUriQueryBuilder);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = getAbfsRestOperation(
@@ -726,7 +726,7 @@ public class AbfsBlobClient extends AbfsClient {
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, LEASE);
-    appendSASTokenToQuery(path, SASTokenProvider.FIXED_SAS_STORE_OPERATION, abfsUriQueryBuilder);
+    appendSASTokenToQuery(path, SASTokenProvider.LEASE_BLOB_OPERATION, abfsUriQueryBuilder);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = getAbfsRestOperation(
@@ -754,7 +754,7 @@ public class AbfsBlobClient extends AbfsClient {
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, LEASE);
-    appendSASTokenToQuery(path, SASTokenProvider.FIXED_SAS_STORE_OPERATION, abfsUriQueryBuilder);
+    appendSASTokenToQuery(path, SASTokenProvider.LEASE_BLOB_OPERATION, abfsUriQueryBuilder);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = getAbfsRestOperation(
@@ -781,7 +781,7 @@ public class AbfsBlobClient extends AbfsClient {
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, LEASE);
-    appendSASTokenToQuery(path, SASTokenProvider.FIXED_SAS_STORE_OPERATION, abfsUriQueryBuilder);
+    appendSASTokenToQuery(path, SASTokenProvider.LEASE_BLOB_OPERATION, abfsUriQueryBuilder);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = getAbfsRestOperation(
@@ -829,6 +829,8 @@ public class AbfsBlobClient extends AbfsClient {
       if (blobRenameHandler.execute(false)) {
         final AbfsUriQueryBuilder abfsUriQueryBuilder
             = createDefaultUriQueryBuilder();
+        appendSASTokenToQuery(source, SASTokenProvider.RENAME_SOURCE_OPERATION,
+            abfsUriQueryBuilder);
         final URL url = createRequestUrl(destination,
             abfsUriQueryBuilder.toString());
         final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
@@ -902,7 +904,7 @@ public class AbfsBlobClient extends AbfsClient {
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, BLOCK);
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_BLOCKID, reqParams.getBlockId());
 
-    String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.FIXED_SAS_STORE_OPERATION,
+    String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION,
         abfsUriQueryBuilder, cachedSasToken);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
@@ -975,7 +977,7 @@ public class AbfsBlobClient extends AbfsClient {
     }
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, APPEND_BLOCK);
-    String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.FIXED_SAS_STORE_OPERATION, abfsUriQueryBuilder);
+    String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION, abfsUriQueryBuilder);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = getAbfsRestOperation(
@@ -1067,7 +1069,7 @@ public class AbfsBlobClient extends AbfsClient {
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, BLOCKLIST);
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose));
-    String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.FIXED_SAS_STORE_OPERATION,
+    String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION,
         abfsUriQueryBuilder, cachedSasToken);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
@@ -1129,7 +1131,7 @@ public class AbfsBlobClient extends AbfsClient {
 
     AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, METADATA);
-    appendSASTokenToQuery(path, SASTokenProvider.FIXED_SAS_STORE_OPERATION, abfsUriQueryBuilder);
+    appendSASTokenToQuery(path, SASTokenProvider.SET_PROPERTIES_OPERATION, abfsUriQueryBuilder);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
     final AbfsRestOperation op = getAbfsRestOperation(
@@ -1208,7 +1210,7 @@ public class AbfsBlobClient extends AbfsClient {
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN,
         String.valueOf(getAbfsConfiguration().isUpnUsed()));
-    appendSASTokenToQuery(path, SASTokenProvider.FIXED_SAS_STORE_OPERATION,
+    appendSASTokenToQuery(path, SASTokenProvider.GET_PROPERTIES_OPERATION,
         abfsUriQueryBuilder);
 
     final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
@@ -1287,7 +1289,7 @@ public class AbfsBlobClient extends AbfsClient {
     }
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
-    String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.FIXED_SAS_STORE_OPERATION,
+    String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.READ_OPERATION,
         abfsUriQueryBuilder, cachedSasToken);
 
     URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
@@ -1449,7 +1451,7 @@ public class AbfsBlobClient extends AbfsClient {
     final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
 
     final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
-    String operation = SASTokenProvider.FIXED_SAS_STORE_OPERATION;
+    String operation = SASTokenProvider.READ_OPERATION;
     appendSASTokenToQuery(path, operation, abfsUriQueryBuilder);
 
     abfsUriQueryBuilder.addQuery(QUERY_PARAM_COMP, BLOCKLIST);
@@ -1487,9 +1489,9 @@ public class AbfsBlobClient extends AbfsClient {
     String dstBlobRelativePath = destinationBlobPath.toUri().getPath();
     String srcBlobRelativePath = sourceBlobPath.toUri().getPath();
     appendSASTokenToQuery(dstBlobRelativePath,
-        SASTokenProvider.FIXED_SAS_STORE_OPERATION, abfsUriQueryBuilderDst);
+        SASTokenProvider.COPY_BLOB_DST_OPERATION, abfsUriQueryBuilderDst);
     appendSASTokenToQuery(srcBlobRelativePath,
-        SASTokenProvider.FIXED_SAS_STORE_OPERATION, abfsUriQueryBuilderSrc);
+        SASTokenProvider.COPY_BLOB_SRC_OPERATION, abfsUriQueryBuilderSrc);
     final URL url = createRequestUrl(dstBlobRelativePath,
         abfsUriQueryBuilderDst.toString());
     final String sourcePathUrl = createRequestUrl(srcBlobRelativePath,
@@ -1523,7 +1525,7 @@ public class AbfsBlobClient extends AbfsClient {
     AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
     String blobRelativePath = blobPath.toUri().getPath();
     appendSASTokenToQuery(blobRelativePath,
-        SASTokenProvider.FIXED_SAS_STORE_OPERATION, abfsUriQueryBuilder);
+        SASTokenProvider.DELETE_OPERATION, abfsUriQueryBuilder);
     final URL url = createRequestUrl(blobRelativePath,
         abfsUriQueryBuilder.toString());
     final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();

+ 2 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java

@@ -62,7 +62,8 @@ public final class AbfsErrors {
   /**
    * Exception message on filesystem init if token-provider-auth-type configs are provided
    */
-  public static final String UNAUTHORIZED_SAS = "Incorrect SAS token provider configured for non-hierarchical namespace account.";
+  public static final String UNAUTHORIZED_SAS
+      = "Incorrect SAS token provider configured for non-hierarchical namespace account with DFS service type.";
   public static final String ERR_RENAME_BLOB =
       "FNS-Blob rename was not successful for source and destination path: ";
   public static final String ERR_DELETE_BLOB =

+ 13 - 4
hadoop-tools/hadoop-azure/src/site/markdown/index.md

@@ -652,13 +652,17 @@ To know more about how SAS Authentication works refer to
 [Grant limited access to Azure Storage resources using shared access signatures (SAS)](https://learn.microsoft.com/en-us/azure/storage/common/storage-sas-overview)
 
 There are three types of SAS supported by Azure Storage:
-- [User Delegation SAS](https://learn.microsoft.com/en-us/rest/api/storageservices/create-user-delegation-sas): Recommended for use with ABFS Driver with HNS Enabled ADLS Gen2 accounts. It is Identity based SAS that works at blob/directory level)
+- [User Delegation SAS](https://learn.microsoft.com/en-us/rest/api/storageservices/create-user-delegation-sas):
+  SAS-based authentication works with HNS-enabled ADLS Gen2 accounts
+  (recommended for use with ABFS) and is also supported with non-HNS (FNS) Blob
+  accounts. However, it is **NOT SUPPORTED** with FNS-DFS accounts.
 - [Service SAS](https://learn.microsoft.com/en-us/rest/api/storageservices/create-service-sas): Global and works at container level.
 - [Account SAS](https://learn.microsoft.com/en-us/rest/api/storageservices/create-account-sas): Global and works at account level.
 
 #### Known Issues With SAS
-- SAS Based Authentication works only with HNS Enabled ADLS Gen2 Accounts which
-is a recommended account type to be used with ABFS.
+- SAS Based Authentication works with HNS Enabled ADLS Gen2 Accounts (which
+  is a recommended account type to be used with ABFS). It is also supported with
+  non-HNS (FNS) Blob accounts. It is **NOT SUPPORTED** with FNS-DFS accounts.
 - Certain root level operations are known to fail with SAS Based Authentication.
 
 #### Using User Delegation SAS with ABFS
@@ -1465,7 +1469,12 @@ Once the above properties are configured, `hdfs dfs -ls abfs://container1@abfswa
 
 Following failures are known and expected to fail as of now.
 1. AzureBlobFileSystem.setXAttr() and AzureBlobFileSystem.getXAttr() will fail when attempted on root ("/") path with `Operation failed: "The request URI is invalid.", HTTP 400 Bad Request`
-
+2. If you're using user-delegation SAS authentication:
+    - Listing operation for HNS accounts (on DFS endpoint) works with SAS token supporting either blob or directory
+      scopes (Signed Resource Type as Blob or Directory),
+      though it is intended to work only at the directory scope. It is a known bug.
+    - AzureBlobFileSystem.getFileStatus() is expected to fail at root ("/") path with
+      `Operation failed: "Server failed to authenticate the request.", HTTP 401 Unauthorized Error`
 ## <a name="testing"></a> Testing ABFS
 
 See the relevant section in [Testing Azure](testing_azure.html).

+ 154 - 11
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java

@@ -23,12 +23,14 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.AccessDeniedException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Hashtable;
 import java.util.List;
 import java.util.UUID;
 
 import org.assertj.core.api.Assertions;
 import org.junit.Assume;
 import org.junit.Test;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,10 +42,14 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
 import org.apache.hadoop.fs.azurebfs.constants.TestConfigurationKeys;
+import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
 import org.apache.hadoop.fs.azurebfs.extensions.MockDelegationSASTokenProvider;
+import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
+import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
 import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
 import org.apache.hadoop.fs.azurebfs.services.AuthType;
-import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
+import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclEntryScope;
 import org.apache.hadoop.fs.permission.AclStatus;
@@ -91,7 +97,9 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
   public void setup() throws Exception {
     isHNSEnabled = this.getConfiguration().getBoolean(
         TestConfigurationKeys.FS_AZURE_TEST_NAMESPACE_ENABLED_ACCOUNT, false);
-    Assume.assumeTrue(isHNSEnabled);
+    if (!isHNSEnabled) {
+      assumeBlobServiceType();
+    }
     createFilesystemForSASTests();
     super.setup();
   }
@@ -99,6 +107,7 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
   @Test
   // Test filesystem operations access, create, mkdirs, setOwner, getFileStatus
   public void testCheckAccess() throws Exception {
+    assumeHnsEnabled();
     final AzureBlobFileSystem fs = getFileSystem();
 
     Path rootPath = new Path("/");
@@ -217,6 +226,7 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
 
   @Test
   public void checkExceptionForRenameOverwrites() throws Exception {
+    assumeHnsEnabled();
     final AzureBlobFileSystem fs = getFileSystem();
     Path src = new Path("a/b/f1.txt");
     Path dest = new Path("a/b/f2.txt");
@@ -297,6 +307,7 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
     final AzureBlobFileSystem fs = getFileSystem();
     Path dirPath = new Path(UUID.randomUUID().toString());
     Path filePath = new Path(dirPath, UUID.randomUUID().toString());
+    Path filePath2 = new Path(dirPath, UUID.randomUUID().toString());
 
     fs.mkdirs(dirPath);
 
@@ -305,6 +316,11 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
       stream.writeBytes("hello");
     }
 
+    // create file with content "bye"
+    try (FSDataOutputStream stream = fs.create(filePath2)) {
+      stream.writeBytes("bye");
+    }
+
     fs.listStatus(filePath);
     fs.listStatus(dirPath);
     fs.listStatus(new Path("/"));
@@ -314,6 +330,7 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
   // Test filesystem operations setAcl, getAclStatus, removeAcl
   // setPermissions and getFileStatus
   public void testAcl() throws Exception {
+    assumeHnsEnabled();
     final AzureBlobFileSystem fs = getFileSystem();
     Path reqPath = new Path(UUID.randomUUID().toString());
 
@@ -343,6 +360,7 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
   @Test
   // Test getFileStatus and getAclStatus operations on root path
   public void testRootPath() throws Exception {
+    assumeHnsEnabled();
     final AzureBlobFileSystem fs = getFileSystem();
     Path rootPath = new Path(AbfsHttpConstants.ROOT_PATH);
 
@@ -410,16 +428,77 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
   }
 
   @Test
-  public void testSignatureMask() throws Exception {
+  // FileSystemProperties are not supported by delegation SAS and should throw exception
+  public void testSetFileSystemProperties() throws Exception {
     final AzureBlobFileSystem fs = getFileSystem();
-    String src = String.format("/testABC/test%s.xt", UUID.randomUUID());
-    fs.create(new Path(src)).close();
-    AbfsRestOperation abfsHttpRestOperation = fs.getAbfsClient()
-        .renamePath(src, "/testABC" + "/abc.txt", null,
-            getTestTracingContext(fs, false), null,
-            false)
-        .getOp();
-    AbfsHttpOperation result = abfsHttpRestOperation.getResult();
+    final Hashtable<String, String>
+        properties = new Hashtable<>();
+    properties.put("FileSystemProperties", "true");
+    TracingContext tracingContext = getTestTracingContext(fs, true);
+    assertThrows(IOException.class, () -> fs.getAbfsStore()
+        .setFilesystemProperties(properties, tracingContext));
+    assertThrows(IOException.class,
+        () -> fs.getAbfsStore().getFilesystemProperties(tracingContext));
+  }
+
+  @Test
+  //Test list and delete operation on implicit paths
+  public void testListAndDeleteImplicitPaths() throws Exception {
+    AzureBlobFileSystem fs = getFileSystem();
+    AbfsBlobClient client = ((AbfsBlobClient) getFileSystem().getAbfsClient());
+    assumeBlobServiceType();
+
+    Path file1 = new Path("/testDir/dir1/file1");
+    Path file2 = new Path("/testDir/dir1/file2");
+    Path implicitDir = file1.getParent();
+
+    createAzCopyFolder(implicitDir);
+    createAzCopyFile(file1);
+    createAzCopyFile(file2);
+
+    AbfsRestOperation op = client.listPath(
+        implicitDir.toString(), false, 2, null,
+        getTestTracingContext(getFileSystem(), false), null, false).getOp();
+    List<? extends ListResultEntrySchema> list = op.getResult()
+        .getListResultSchema()
+        .paths();
+    Assertions.assertThat(list).hasSize(2);
+
+    client.deletePath(implicitDir.toString(), true, "",
+        getTestTracingContext(fs, false));
+
+    Assertions.assertThat(fs.exists(file1))
+        .describedAs("Deleted file1 should not exist.").isFalse();
+    Assertions.assertThat(fs.exists(file2))
+        .describedAs("Deleted file2 should not exist.").isFalse();
+    Assertions.assertThat(fs.exists(implicitDir))
+        .describedAs("The parent dir should not exist.")
+        .isFalse();
+  }
+
+
+  /**
+   * Spies on the AzureBlobFileSystem's store and client to enable mocking and verification
+   * of client interactions in tests. It replaces the actual store and client with mocked versions.
+   *
+   * @param fs the AzureBlobFileSystem instance
+   * @return the spied AbfsClient for interaction verification
+   */
+  private AbfsClient addSpyHooksOnClient(final AzureBlobFileSystem fs) {
+    AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+    Mockito.doReturn(store).when(fs).getAbfsStore();
+    AbfsClient client = Mockito.spy(store.getClient());
+    Mockito.doReturn(client).when(store).getClient();
+    return client;
+  }
+
+  /**
+   * Asserts the signature masking in the URL and encoded URL of the AbfsRestOperation.
+   *
+   * @param op the AbfsRestOperation
+   */
+  private void checkSignatureMaskAssertions(AbfsRestOperation op) {
+    AbfsHttpOperation result = op.getResult();
     String url = result.getMaskedUrl();
     String encodedUrl = result.getMaskedEncodedUrl();
     Assertions.assertThat(url.substring(url.indexOf("sig=")))
@@ -430,6 +509,68 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
         .startsWith("sig%3DXXXXX");
   }
 
+  @Test
+  // Test masking of signature for rename operation for Blob
+  public void testSignatureMaskforBlob() throws Exception {
+    assumeBlobServiceType();
+    final AzureBlobFileSystem fs = Mockito.spy(this.getFileSystem());
+    AbfsBlobClient client = (AbfsBlobClient) addSpyHooksOnClient(fs);
+
+    fs.getAbfsStore().setClient(client);
+    String src = String.format("/testABC/test%s.xt", UUID.randomUUID());
+    String dest = "/testABC" + "/abc.txt";
+    fs.create(new Path(src)).close();
+
+    Mockito.doAnswer(answer -> {
+          Path srcCopy = answer.getArgument(0);
+          Path dstCopy = answer.getArgument(1);
+          String leaseId = answer.getArgument(2);
+          TracingContext tracingContext = answer.getArgument(3);
+          AbfsRestOperation op
+              = ((AbfsBlobClient) getFileSystem().getAbfsClient()).copyBlob(srcCopy,
+              dstCopy, leaseId, tracingContext);
+          checkSignatureMaskAssertions(op);
+          return answer.callRealMethod();
+        })
+        .when(client)
+        .copyBlob(Mockito.any(Path.class), Mockito.any(Path.class),
+            Mockito.any(String.class), Mockito.any(TracingContext.class));
+
+    Mockito.doAnswer(answer -> {
+          Path blobPath = answer.getArgument(0);
+          String leaseId = answer.getArgument(1);
+          TracingContext tracingContext = answer.getArgument(2);
+          AbfsRestOperation op
+              = ((AbfsBlobClient) getFileSystem().getAbfsClient()).deleteBlobPath(
+              blobPath,
+              leaseId, tracingContext);
+          checkSignatureMaskAssertions(op);
+          return answer.callRealMethod();
+        })
+        .when(client)
+        .deleteBlobPath(Mockito.any(Path.class), Mockito.any(String.class),
+            Mockito.any(TracingContext.class));
+
+    client.renamePath(src, dest, null,
+        getTestTracingContext(fs, false), null,
+        false);
+  }
+
+  // Test masking of signature for rename operation for DFS
+  @Test
+  public void testSignatureMask() throws Exception {
+    assumeDfsServiceType();
+    final AzureBlobFileSystem fs = getFileSystem();
+    String src = String.format("/testABC/test%s.xt", UUID.randomUUID());
+    fs.create(new Path(src)).close();
+    AbfsRestOperation abfsHttpRestOperation = fs.getAbfsClient()
+        .renamePath(src, "/testABC" + "/abc.txt", null,
+            getTestTracingContext(fs, false), null,
+            false)
+        .getOp();
+    checkSignatureMaskAssertions(abfsHttpRestOperation);
+  }
+
   @Test
   public void testSignatureMaskOnExceptionMessage() throws Exception {
     intercept(IOException.class, "sig=XXXX",
@@ -442,6 +583,7 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
   @Test
   // SetPermission should fail when saoid is not the owner and succeed when it is.
   public void testSetPermissionForNonOwner() throws Exception {
+    assumeHnsEnabled();
     final AzureBlobFileSystem fs = getFileSystem();
 
     Path rootPath = new Path("/");
@@ -477,6 +619,7 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
   @Test
   // Without saoid or suoid, setPermission should succeed with sp=p for a non-owner.
   public void testSetPermissionWithoutAgentForNonOwner() throws Exception {
+    assumeHnsEnabled();
     final AzureBlobFileSystem fs = getFileSystem();
     Path path = new Path(MockDelegationSASTokenProvider.NO_AGENT_PATH);
     fs.create(path).close();

+ 12 - 2
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DelegationSASGenerator.java

@@ -24,6 +24,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
 import org.apache.hadoop.fs.azurebfs.services.AbfsUriQueryBuilder;
 
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
 
 /**
  * Test Delegation SAS generator.
@@ -60,6 +61,8 @@ public class DelegationSASGenerator extends SASGenerator {
       case SASTokenProvider.CREATE_DIRECTORY_OPERATION:
       case SASTokenProvider.WRITE_OPERATION:
       case SASTokenProvider.SET_PROPERTIES_OPERATION:
+      case SASTokenProvider.LEASE_BLOB_OPERATION:
+      case SASTokenProvider.COPY_BLOB_DST_OPERATION:
         sp = "w";
         break;
       case SASTokenProvider.DELETE_OPERATION:
@@ -68,18 +71,25 @@ public class DelegationSASGenerator extends SASGenerator {
       case SASTokenProvider.DELETE_RECURSIVE_OPERATION:
         sp = "d";
         sr = "d";
-        sdd = Integer.toString(StringUtils.countMatches(path, "/"));
+        sdd = path.equals(ROOT_PATH)? "0": Integer.toString(StringUtils.countMatches(path, "/"));
         break;
       case SASTokenProvider.CHECK_ACCESS_OPERATION:
       case SASTokenProvider.GET_ACL_OPERATION:
       case SASTokenProvider.GET_STATUS_OPERATION:
         sp = "e";
         break;
+      case SASTokenProvider.LIST_OPERATION_BLOB:
+        sp = "l";
+        sr = "c";
+        break;
       case SASTokenProvider.LIST_OPERATION:
         sp = "l";
+        sr = "d";
+        sdd = path.equals(ROOT_PATH)? "0": Integer.toString(StringUtils.countMatches(path, "/"));
         break;
       case SASTokenProvider.GET_PROPERTIES_OPERATION:
       case SASTokenProvider.READ_OPERATION:
+      case SASTokenProvider.COPY_BLOB_SRC_OPERATION:
         sp = "r";
         break;
       case SASTokenProvider.RENAME_DESTINATION_OPERATION:
@@ -189,4 +199,4 @@ public class DelegationSASGenerator extends SASGenerator {
     LOG.debug("Delegation SAS stringToSign: " + stringToSign.replace("\n", "."));
     return computeHmac256(stringToSign);
   }
-}
+}