Parcourir la source

HADOOP-19572. [ABFS][BugFix] Empty Page Issue on Subsequent ListBlob call with NextMarker (#7698) (#7704)

Contributed by Anuj Modi
Reviewed by Anmol Asrani, Manish Bhatt, Manika Joshi

Signed off by Anuj Mod<anujmodi@apache.org>
Anuj Modi il y a 6 jours
Parent
commit
4e14cd9fb2

+ 2 - 9
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

@@ -76,7 +76,6 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriAuthorityExc
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
-import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
 import org.apache.hadoop.fs.azurebfs.services.ListResponseData;
 import org.apache.hadoop.fs.azurebfs.enums.Trilean;
 import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
@@ -118,7 +117,6 @@ import org.apache.hadoop.fs.azurebfs.utils.Base64;
 import org.apache.hadoop.fs.azurebfs.utils.CRC64;
 import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
 import org.apache.hadoop.fs.azurebfs.utils.EncryptionType;
-import org.apache.hadoop.fs.azurebfs.utils.ListUtils;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
 import org.apache.hadoop.fs.impl.BackReference;
@@ -1298,13 +1296,8 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
       }
     } while (shouldContinue);
 
-    if (listingClient instanceof AbfsBlobClient) {
-      fileStatuses.addAll(ListUtils.getUniqueListResult(fileStatusList));
-      LOG.debug("ListBlob API returned a total of {} elements including duplicates."
-          + "Number of unique Elements are {}", fileStatusList.size(), fileStatuses.size());
-    } else {
-      fileStatuses.addAll(fileStatusList);
-    }
+    fileStatuses.addAll(listingClient.postListProcessing(
+        relativePath, fileStatusList, tracingContext, uri));
 
     return continuation;
   }

+ 45 - 47
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java

@@ -51,6 +51,7 @@ import org.xml.sax.SAXException;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.Path;
@@ -77,6 +78,7 @@ import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
 import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
 import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
 import org.apache.hadoop.fs.azurebfs.security.ContextEncryptionAdapter;
+import org.apache.hadoop.fs.azurebfs.utils.ListUtils;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 
 import static java.net.HttpURLConnection.HTTP_CONFLICT;
@@ -348,13 +350,9 @@ public class AbfsBlobClient extends AbfsClient {
    */
   @Override
   public ListResponseData listPath(final String relativePath, final boolean recursive,
-      final int listMaxResults, final String continuation, TracingContext tracingContext, URI uri) throws IOException {
-    return listPath(relativePath, recursive, listMaxResults, continuation, tracingContext, uri, true);
-  }
-
-  public ListResponseData listPath(final String relativePath, final boolean recursive,
-      final int listMaxResults, final String continuation, TracingContext tracingContext, URI uri, boolean is404CheckRequired)
+      final int listMaxResults, final String continuation, TracingContext tracingContext, URI uri)
       throws AzureBlobFileSystemException {
+
     final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
 
     AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
@@ -400,37 +398,46 @@ public class AbfsBlobClient extends AbfsClient {
         listResponseData.setOp(retryListOp);
       }
     }
+    return listResponseData;
+  }
 
-    if (isEmptyListResults(listResponseData) && is404CheckRequired) {
+  /**
+   * Post-processing of the list operation on Blob endpoint.
+   * There are two client handing to be done on list output.
+   * 1. Empty List returned on server could potentially mean path is a file.
+   * 2. There can be duplicates returned from the server for explicit non-empty directory.
+   * @param relativePath relative path to be listed.
+   * @param fileStatuses list of file statuses returned from the server.
+   * @param tracingContext tracing context to trace server calls.
+   * @param uri URI to be used for path conversion.
+   * @return rectified list of file statuses.
+   * @throws AzureBlobFileSystemException if any failure occurs.
+   */
+  @Override
+  public List<FileStatus> postListProcessing(String relativePath, List<FileStatus> fileStatuses,
+      TracingContext tracingContext, URI uri) throws AzureBlobFileSystemException {
+    List<FileStatus> rectifiedFileStatuses = new ArrayList<>();
+    if (fileStatuses.isEmpty() && !ROOT_PATH.equals(relativePath)) {
       // If the list operation returns no paths, we need to check if the path is a file.
       // If it is a file, we need to return the file in the list.
+      // If it is a directory or root path, we need to return an empty list.
       // If it is a non-existing path, we need to throw a FileNotFoundException.
-      if (relativePath.equals(ROOT_PATH)) {
-        // Root Always exists as directory. It can be an empty listing.
-        return listResponseData;
-      }
       AbfsRestOperation pathStatus = this.getPathStatus(relativePath, tracingContext, null, false);
       BlobListResultSchema listResultSchema = getListResultSchemaFromPathStatus(relativePath, pathStatus);
-      LOG.debug("ListBlob attempted on a file path. Returning file status.");
-      List<VersionedFileStatus> fileStatusList = new ArrayList<>();
+      LOG.debug("ListStatus attempted on a file path {}. Returning file status.", relativePath);
       for (BlobListResultEntrySchema entry : listResultSchema.paths()) {
-        fileStatusList.add(getVersionedFileStatusFromEntry(entry, uri));
+        rectifiedFileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
       }
-      AbfsRestOperation listOp = getAbfsRestOperation(
-          AbfsRestOperationType.ListBlobs,
-          HTTP_METHOD_GET,
-          url,
-          requestHeaders);
-      listOp.hardSetGetListStatusResult(HTTP_OK, listResultSchema);
-      listResponseData.setFileStatusList(fileStatusList);
-      listResponseData.setContinuationToken(null);
-      listResponseData.setRenamePendingJsonPaths(null);
-      listResponseData.setOp(listOp);
-      return listResponseData;
+    } else {
+      // Remove duplicates from the non-empty list output only.
+      rectifiedFileStatuses.addAll(ListUtils.getUniqueListResult(fileStatuses));
+      LOG.debug(
+          "ListBlob API returned a total of {} elements including duplicates."
+              + "Number of unique Elements are {}", fileStatuses.size(),
+          rectifiedFileStatuses.size());
     }
-    return listResponseData;
+    return rectifiedFileStatuses;
   }
-
   /**
    * Filter the paths for which no rename redo operation is performed.
    * Update BlobListResultSchema path with filtered entries.
@@ -2013,6 +2020,8 @@ public class AbfsBlobClient extends AbfsClient {
 
   /**
    * Checks if the listing of the specified path is non-empty.
+   * Since listing is incomplete as long as continuation token is returned by server,
+   * we need to iterate until either we get one entry or continuation token becomes null.
    *
    * @param path The path to be listed.
    * @param tracingContext The tracing context for tracking the operation.
@@ -2024,26 +2033,15 @@ public class AbfsBlobClient extends AbfsClient {
       TracingContext tracingContext) throws AzureBlobFileSystemException {
     // This method is only called internally to determine state of a path
     // and hence don't need identity transformation to happen.
-    ListResponseData listResponseData = listPath(path, false, 1, null, tracingContext, null, false);
-    return !isEmptyListResults(listResponseData);
-  }
-
-  /**
-   * Check if the list call returned empty results without any continuation token.
-   * @param listResponseData The response of listing API from the server.
-   * @return True if empty results without continuation token.
-   */
-  private boolean isEmptyListResults(ListResponseData listResponseData) {
-    AbfsHttpOperation result = listResponseData.getOp().getResult();
-    boolean isEmptyList = result != null && result.getStatusCode() == HTTP_OK && // List Call was successful
-        result.getListResultSchema() != null && // Parsing of list response was successful
-        listResponseData.getFileStatusList().isEmpty() && listResponseData.getRenamePendingJsonPaths().isEmpty() &&// No paths were returned
-        StringUtils.isEmpty(listResponseData.getContinuationToken()); // No continuation token was returned
-    if (isEmptyList) {
-      LOG.debug("List call returned empty results without any continuation token.");
-      return true;
-    }
-    return false;
+    String continuationToken = null;
+    List<FileStatus> fileStatusList = new ArrayList<>();
+    // We need to loop on continuation token until we get an entry or continuation token becomes null.
+    do {
+      ListResponseData listResponseData = listPath(path, false, 1, null, tracingContext, null);
+      fileStatusList.addAll(listResponseData.getFileStatusList());
+      continuationToken = listResponseData.getContinuationToken();
+    } while (StringUtils.isNotEmpty(continuationToken) && fileStatusList.isEmpty());
+    return !fileStatusList.isEmpty();
   }
 
   /**

+ 35 - 35
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

@@ -51,6 +51,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
@@ -164,6 +165,8 @@ public abstract class AbfsClient implements Closeable {
   private String clientProvidedEncryptionKey = null;
   private String clientProvidedEncryptionKeySHA = null;
   private final IdentityTransformerInterface identityTransformer;
+  private final String userName;
+  private String primaryUserGroup;
 
   private final String accountName;
   private final AuthType authType;
@@ -305,6 +308,22 @@ public abstract class AbfsClient implements Closeable {
       throw new IOException(e);
     }
     LOG.trace("IdentityTransformer init complete");
+
+    UserGroupInformation userGroupInformation = UserGroupInformation.getCurrentUser();
+    this.userName = userGroupInformation.getShortUserName();
+    LOG.trace("UGI init complete");
+    if (!abfsConfiguration.getSkipUserGroupMetadataDuringInitialization()) {
+      try {
+        this.primaryUserGroup = userGroupInformation.getPrimaryGroupName();
+      } catch (IOException ex) {
+        LOG.error("Failed to get primary group for {}, using user name as primary group name", userName);
+        this.primaryUserGroup = userName;
+      }
+    } else {
+      //Provide a default group name
+      this.primaryUserGroup = userName;
+    }
+    LOG.trace("primaryUserGroup is {}", this.primaryUserGroup);
   }
 
   public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
@@ -528,6 +547,18 @@ public abstract class AbfsClient implements Closeable {
   public abstract ListResponseData listPath(String relativePath, boolean recursive,
       int listMaxResults, String continuation, TracingContext tracingContext, URI uri) throws IOException;
 
+  /**
+   * Post-processing of the list operation.
+   * @param relativePath which is used to list the blobs.
+   * @param fileStatuses list of file statuses to be processed.
+   * @param tracingContext for tracing the server calls.
+   * @param uri to be used for the path conversion.
+   * @return list of file statuses to be returned.
+   * @throws AzureBlobFileSystemException if rest operation fails.
+   */
+  public abstract List<FileStatus> postListProcessing(String relativePath,
+      List<FileStatus> fileStatuses, TracingContext tracingContext, URI uri) throws AzureBlobFileSystemException;
+
   /**
    * Retrieves user-defined metadata on filesystem.
    * @param tracingContext for tracing the server calls.
@@ -1776,37 +1807,6 @@ public abstract class AbfsClient implements Closeable {
     return successOp;
   }
 
-  /**
-   * Get the primary user group name.
-   * @return primary user group name
-   * @throws AzureBlobFileSystemException if unable to get the primary user group
-   */
-  private String getPrimaryUserGroup() throws AzureBlobFileSystemException {
-    if (!getAbfsConfiguration().getSkipUserGroupMetadataDuringInitialization()) {
-      try {
-        return UserGroupInformation.getCurrentUser().getPrimaryGroupName();
-      } catch (IOException ex) {
-        LOG.error("Failed to get primary group for {}, using user name as primary group name",
-            getPrimaryUser());
-      }
-    }
-    //Provide a default group name
-    return getPrimaryUser();
-  }
-
-  /**
-   * Get the primary username.
-   * @return primary username
-   * @throws AzureBlobFileSystemException if unable to get the primary user
-   */
-  private String getPrimaryUser() throws AzureBlobFileSystemException {
-    try {
-      return UserGroupInformation.getCurrentUser().getUserName();
-    } catch (IOException ex) {
-      throw new AbfsDriverException(ex);
-    }
-  }
-
   /**
    * Creates a VersionedFileStatus object from the ListResultEntrySchema.
    * @param entry ListResultEntrySchema object.
@@ -1820,10 +1820,10 @@ public abstract class AbfsClient implements Closeable {
     String owner = null, group = null;
     try{
       if (identityTransformer != null) {
-        owner = identityTransformer.transformIdentityForGetRequest(
-            entry.owner(), true, getPrimaryUser());
-        group = identityTransformer.transformIdentityForGetRequest(
-            entry.group(), false, getPrimaryUserGroup());
+        owner = identityTransformer.transformIdentityForGetRequest(entry.owner(),
+            true, userName);
+        group = identityTransformer.transformIdentityForGetRequest(entry.group(),
+            false, primaryUserGroup);
       }
     } catch (IOException ex) {
       LOG.error("Failed to get owner/group for path {}", entry.name(), ex);

+ 16 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java

@@ -45,6 +45,7 @@ import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -348,6 +349,21 @@ public class AbfsDfsClient extends AbfsClient {
     return listResponseData;
   }
 
+  /**
+   * Non-functional implementation.
+   * Client side handling to remove duplicates not needed in DFSClient.
+   * @param relativePath on which listing was attempted.
+   * @param fileStatuses result of listing operation.
+   * @param tracingContext for tracing the server calls.
+   * @param uri to be used for path conversion.
+   * @return fileStatuses as it is without any processing.
+   */
+  @Override
+  public List<FileStatus> postListProcessing(String relativePath,
+      List<FileStatus> fileStatuses, TracingContext tracingContext, URI uri){
+    return fileStatuses;
+  }
+
   /**
    * Get Rest Operation for API
    * <a href="https://learn.microsoft.com/en-us/rest/api/storageservices/datalakestoragegen2/path/create">

+ 2 - 2
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelegationSAS.java

@@ -445,8 +445,8 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
   //Test list and delete operation on implicit paths
   public void testListAndDeleteImplicitPaths() throws Exception {
     AzureBlobFileSystem fs = getFileSystem();
-    AbfsBlobClient client = ((AbfsBlobClient) getFileSystem().getAbfsClient());
     assumeBlobServiceType();
+    AbfsBlobClient client = ((AbfsBlobClient) getFileSystem().getAbfsClient());
 
     Path file1 = new Path("/testDir/dir1/file1");
     Path file2 = new Path("/testDir/dir1/file2");
@@ -458,7 +458,7 @@ public class ITestAzureBlobFileSystemDelegationSAS extends AbstractAbfsIntegrati
 
     AbfsRestOperation op = client.listPath(
         implicitDir.toString(), false, 2, null,
-        getTestTracingContext(getFileSystem(), false), null, false).getOp();
+        getTestTracingContext(getFileSystem(), false), null).getOp();
     List<? extends ListResultEntrySchema> list = op.getResult()
         .getListResultSchema()
         .paths();

+ 1 - 1
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java

@@ -316,7 +316,7 @@ public class ITestAzureBlobFileSystemDelete extends
       doCallRealMethod().when((AbfsBlobClient) mockClient)
               .listPath(Mockito.nullable(String.class), Mockito.anyBoolean(),
                       Mockito.anyInt(), Mockito.nullable(String.class),
-                      Mockito.nullable(TracingContext.class), Mockito.nullable(URI.class), Mockito.anyBoolean());
+                      Mockito.nullable(TracingContext.class), Mockito.nullable(URI.class));
       doCallRealMethod().when((AbfsBlobClient) mockClient)
               .getPathStatus(Mockito.nullable(String.class), Mockito.nullable(TracingContext.class),
                       Mockito.nullable(ContextEncryptionAdapter.class), Mockito.anyBoolean());

+ 1 - 1
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemFileStatus.java

@@ -259,7 +259,7 @@ public class ITestAzureBlobFileSystemFileStatus extends
     fs.getFileStatus(implicitPath);
 
     Mockito.verify(abfsClient, Mockito.times(1)).getPathStatus(any(), eq(false), any(), any());
-    Mockito.verify(abfsClient, Mockito.times(1)).listPath(any(), eq(false), eq(1), any(), any(), any(), eq(false));
+    Mockito.verify(abfsClient, Mockito.times(1)).listPath(any(), eq(false), eq(1), any(), any(), any());
   }
 
   /**

+ 100 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java

@@ -51,10 +51,12 @@ import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClientHandler;
 import org.apache.hadoop.fs.azurebfs.services.AbfsHttpHeader;
 import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
+import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
 import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperationType;
 import org.apache.hadoop.fs.azurebfs.services.ListResponseData;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
 import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil;
+import org.apache.hadoop.fs.azurebfs.services.VersionedFileStatus;
 import org.apache.hadoop.fs.azurebfs.utils.DirectoryStateHelper;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
@@ -62,6 +64,7 @@ import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 
 import static java.net.HttpURLConnection.HTTP_OK;
+import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PUT;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TRUE;
@@ -373,7 +376,104 @@ public class ITestAzureBlobFileSystemListStatus extends
         + " throw IllegalArgumentException", exceptionThrown);
   }
 
+  @Test
+  public void testEmptyListingInSubsequentCall() throws IOException {
+    testEmptyListingInSubsequentCallInternal(EMPTY_STRING, true, EMPTY_STRING,
+        true, 1, 0);
+    testEmptyListingInSubsequentCallInternal(EMPTY_STRING, true, EMPTY_STRING,
+        false, 1, 0);
+    testEmptyListingInSubsequentCallInternal(EMPTY_STRING, true, TEST_CONTINUATION_TOKEN,
+        true, 1, 0);
+    testEmptyListingInSubsequentCallInternal(EMPTY_STRING, true, TEST_CONTINUATION_TOKEN,
+        false, 1, 0);
+
+    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, true, EMPTY_STRING,
+        true, 2, 0);
+    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, true, EMPTY_STRING,
+        false, 2, 1);
+    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, true, TEST_CONTINUATION_TOKEN,
+        true, 3, 0);
+    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, true, TEST_CONTINUATION_TOKEN,
+        false, 3, 1);
+
+    testEmptyListingInSubsequentCallInternal(EMPTY_STRING, false, EMPTY_STRING,
+        true, 1, 1);
+    testEmptyListingInSubsequentCallInternal(EMPTY_STRING, false, EMPTY_STRING,
+        false, 1, 1);
+    testEmptyListingInSubsequentCallInternal(EMPTY_STRING, false, TEST_CONTINUATION_TOKEN,
+        true, 1, 1);
+    testEmptyListingInSubsequentCallInternal(EMPTY_STRING, false, TEST_CONTINUATION_TOKEN,
+        false, 1, 1);
+
+    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, false, EMPTY_STRING,
+        true, 2, 1);
+    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, false, EMPTY_STRING,
+        false, 2, 2);
+    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, false, TEST_CONTINUATION_TOKEN,
+        true, 3, 1);
+    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, false, TEST_CONTINUATION_TOKEN,
+        false, 3, 2);
+  }
 
+  private void testEmptyListingInSubsequentCallInternal(String firstCT,
+      boolean isfirstEmpty, String secondCT, boolean isSecondEmpty,
+      int expectedInvocations, int expectedSize) throws IOException {
+    assumeBlobServiceType();
+    AzureBlobFileSystem spiedFs = Mockito.spy(getFileSystem());
+    AzureBlobFileSystemStore spiedStore = Mockito.spy(spiedFs.getAbfsStore());
+    spiedStore.getAbfsConfiguration().setListMaxResults(1);
+    AbfsBlobClient spiedClient = Mockito.spy(spiedStore.getClientHandler().getBlobClient());
+    Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();
+    Mockito.doReturn(spiedClient).when(spiedStore).getClient();
+    spiedFs.mkdirs(new Path("/testPath"));
+    VersionedFileStatus status1 = new VersionedFileStatus(
+        "owner", "group", null, false, 0, false, 0, 0, 0,
+        new Path("/testPath/file1"), "version", "encryptionContext");
+    VersionedFileStatus status2 = new VersionedFileStatus(
+        "owner", "group", null, false, 0, false, 0, 0, 0,
+        new Path("/testPath/file2"), "version", "encryptionContext");
+
+    List<VersionedFileStatus> mockedList1 = new ArrayList<>();
+    mockedList1.add(status1);
+    List<VersionedFileStatus> mockedList2 = new ArrayList<>();
+    mockedList2.add(status2);
+
+    ListResponseData listResponseData1 = new ListResponseData();
+    listResponseData1.setContinuationToken(firstCT);
+    listResponseData1.setFileStatusList(isfirstEmpty ? new ArrayList<>() : mockedList1);
+    listResponseData1.setOp(Mockito.mock(AbfsRestOperation.class));
+
+    ListResponseData listResponseData2 = new ListResponseData();
+    listResponseData2.setContinuationToken(secondCT);
+    listResponseData2.setFileStatusList(isSecondEmpty ? new ArrayList<>() : mockedList2);
+    listResponseData2.setOp(Mockito.mock(AbfsRestOperation.class));
+
+    ListResponseData listResponseData3 = new ListResponseData();
+    listResponseData3.setContinuationToken(EMPTY_STRING);
+    listResponseData3.setFileStatusList(new ArrayList<>());
+    listResponseData3.setOp(Mockito.mock(AbfsRestOperation.class));
+
+    Mockito.doReturn(listResponseData1).doReturn(listResponseData2).doReturn(listResponseData3)
+        .when(spiedClient).listPath(eq("/testPath"), eq(false), eq(1),
+        any(), any(), any());
+
+    FileStatus[] list = spiedFs.listStatus(new Path("/testPath"));
+
+    Mockito.verify(spiedClient, times(expectedInvocations))
+        .listPath(eq("/testPath"), eq(false), eq(1),
+        any(), any(TracingContext.class), any());
+    Mockito.verify(spiedClient, times(1))
+        .postListProcessing(eq("/testPath"), any(), any(), any());
+    Assertions.assertThat(list).hasSize(expectedSize);
+
+    if (expectedSize == 0) {
+      Mockito.verify(spiedClient, times(1))
+          .getPathStatus(eq("/testPath"), any(), eq(null), eq(false));
+    } else {
+      Mockito.verify(spiedClient, times(0))
+          .getPathStatus(eq("/testPath"), any(), eq(null), eq(false));
+    }
+  }
 
   /**
    * Test to verify that listStatus returns the correct file status all types

+ 40 - 2
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/contract/TestBlobListXmlParser.java

@@ -101,7 +101,7 @@ public class TestBlobListXmlParser {
         + "<OrMetadata />"
         + "</Blob>"
         + "</Blobs>"
-        + "<NextMarker />"
+        + "<NextMarker>TEST_CONTINUATION_TOKEN</NextMarker>"
         + "</EnumerationResults>";
     BlobListResultSchema listResultSchema = getResultSchema(xmlResponseWithDelimiter);
     List<BlobListResultEntrySchema> paths = listResultSchema.paths();
@@ -110,10 +110,11 @@ public class TestBlobListXmlParser {
     Assertions.assertThat(paths.get(1).isDirectory()).isEqualTo(true);
     Assertions.assertThat(paths.get(2).isDirectory()).isEqualTo(true);
     Assertions.assertThat(paths.get(3).isDirectory()).isEqualTo(false);
+    Assertions.assertThat(listResultSchema.getNextMarker()).isNotNull();
   }
 
   @Test
-  public void testEmptyBlobList() throws Exception {
+  public void testEmptyBlobListNullCT() throws Exception {
     String xmlResponse = ""
         + "<?xml version=\"1.0\" encoding=\"utf-8\"?><"
         + "EnumerationResults ServiceEndpoint=\"https://anujtestfns.blob.core.windows.net/\" ContainerName=\"manualtest\">"
@@ -124,6 +125,43 @@ public class TestBlobListXmlParser {
     BlobListResultSchema listResultSchema = getResultSchema(xmlResponse);
     List<BlobListResultEntrySchema> paths = listResultSchema.paths();
     Assertions.assertThat(paths.size()).isEqualTo(0);
+    Assertions.assertThat(listResultSchema.getNextMarker()).isNull();
+  }
+
+  @Test
+  public void testEmptyBlobListValidCT() throws Exception {
+    String xmlResponse = ""
+        + "<?xml version=\"1.0\" encoding=\"utf-8\"?><"
+        + "EnumerationResults ServiceEndpoint=\"https://anujtestfns.blob.core.windows.net/\" ContainerName=\"manualtest\">"
+        + "<Prefix>abc/</Prefix>"
+        + "<Delimiter>/</Delimiter>"
+        + "<Blobs />"
+        + "<NextMarker>TEST_CONTINUATION_TOKEN</NextMarker>"
+        + "</EnumerationResults>";
+    BlobListResultSchema listResultSchema = getResultSchema(xmlResponse);
+    List<BlobListResultEntrySchema> paths = listResultSchema.paths();
+    Assertions.assertThat(paths.size()).isEqualTo(0);
+    Assertions.assertThat(listResultSchema.getNextMarker()).isNotNull();
+  }
+
+  @Test
+  public void testNonEmptyBlobListNullCT() throws Exception {
+    String xmlResponse = ""
+        + "<?xml version=\"1.0\" encoding=\"utf-8\"?><"
+        + "EnumerationResults ServiceEndpoint=\"https://anujtestfns.blob.core.windows.net/\" ContainerName=\"manualtest\">"
+        + "<Prefix>abc/</Prefix>"
+        + "<Delimiter>/</Delimiter>"
+        + "<Blobs>"
+        + "<BlobPrefix>"
+        + "<Name>bye/</Name>"
+        + "</BlobPrefix>"
+        + "</Blobs>"
+        + "<NextMarker />"
+        + "</EnumerationResults>";
+    BlobListResultSchema listResultSchema = getResultSchema(xmlResponse);
+    List<BlobListResultEntrySchema> paths = listResultSchema.paths();
+    Assertions.assertThat(paths.size()).isEqualTo(1);
+    Assertions.assertThat(listResultSchema.getNextMarker()).isNull();
   }
 
   private static final ThreadLocal<SAXParser> SAX_PARSER_THREAD_LOCAL

+ 81 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java

@@ -23,6 +23,7 @@ import java.net.ProtocolException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
@@ -42,6 +43,7 @@ import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
 import org.apache.hadoop.fs.azurebfs.AbfsCountersImpl;
 import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
 import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem;
+import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore;
 import org.apache.hadoop.fs.azurebfs.TestAbfsConfigurationFieldsValidation;
 import org.apache.hadoop.fs.azurebfs.constants.AbfsServiceType;
 import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
@@ -58,6 +60,7 @@ import org.apache.hadoop.test.ReflectionUtils;
 import org.apache.http.HttpResponse;
 
 import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+import static org.apache.hadoop.fs.azurebfs.ITestAzureBlobFileSystemListStatus.TEST_CONTINUATION_TOKEN;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APPEND_ACTION;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EXPECT_100_JDK_ERROR;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.HTTP_METHOD_PATCH;
@@ -75,6 +78,7 @@ import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.when;
 
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.APN_VERSION;
@@ -729,4 +733,81 @@ public final class ITestAbfsClient extends AbstractAbfsIntegrationTest {
             .describedAs("The expect header is not false")
             .isFalse();
   }
+
+  @Test
+  public void testIsNonEmptyDirectory() throws IOException {
+    testIsNonEmptyDirectoryInternal(EMPTY_STRING, true, EMPTY_STRING,
+        true, 1, false);
+    testIsNonEmptyDirectoryInternal(EMPTY_STRING, false, EMPTY_STRING,
+        false, 1, true);
+
+    testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN, true, EMPTY_STRING,
+        true, 2, false);
+    testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN, true, EMPTY_STRING,
+        false, 2, true);
+    testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN, true, TEST_CONTINUATION_TOKEN,
+        true, 3, false);
+    testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN, true, TEST_CONTINUATION_TOKEN,
+        false, 2, true);
+
+    testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN, false, EMPTY_STRING,
+        true, 1, true);
+    testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN, false, EMPTY_STRING,
+        false, 1, true);
+    testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN, false, TEST_CONTINUATION_TOKEN,
+        true, 1, true);
+    testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN, false, TEST_CONTINUATION_TOKEN,
+        false, 1, true);
+  }
+
+  private void testIsNonEmptyDirectoryInternal(String firstCT,
+      boolean isfirstEmpty, String secondCT, boolean isSecondEmpty,
+      int expectedInvocations, boolean isNonEmpty) throws IOException {
+
+    assumeBlobServiceType();
+    AzureBlobFileSystem spiedFs = Mockito.spy(getFileSystem());
+    AzureBlobFileSystemStore spiedStore = Mockito.spy(spiedFs.getAbfsStore());
+    AbfsBlobClient spiedClient = Mockito.spy(spiedStore.getClientHandler().getBlobClient());
+    Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();
+    Mockito.doReturn(spiedClient).when(spiedStore).getClient();
+    VersionedFileStatus status1 = new VersionedFileStatus(
+        "owner", "group", null, false, 0, false, 0, 0, 0,
+        new Path("/testPath/file1"), "version", "encryptionContext");
+    VersionedFileStatus status2 = new VersionedFileStatus(
+        "owner", "group", null, false, 0, false, 0, 0, 0,
+        new Path("/testPath/file2"), "version", "encryptionContext");
+
+    List<VersionedFileStatus> mockedList1 = new ArrayList<>();
+    mockedList1.add(status1);
+    List<VersionedFileStatus> mockedList2 = new ArrayList<>();
+    mockedList2.add(status2);
+
+    ListResponseData listResponseData1 = new ListResponseData();
+    listResponseData1.setContinuationToken(firstCT);
+    listResponseData1.setFileStatusList(isfirstEmpty ? new ArrayList<>() : mockedList1);
+    listResponseData1.setOp(Mockito.mock(AbfsRestOperation.class));
+
+    ListResponseData listResponseData2 = new ListResponseData();
+    listResponseData2.setContinuationToken(secondCT);
+    listResponseData2.setFileStatusList(isSecondEmpty ? new ArrayList<>() : mockedList2);
+    listResponseData2.setOp(Mockito.mock(AbfsRestOperation.class));
+
+    ListResponseData listResponseData3 = new ListResponseData();
+    listResponseData3.setContinuationToken(EMPTY_STRING);
+    listResponseData3.setFileStatusList(new ArrayList<>());
+    listResponseData3.setOp(Mockito.mock(AbfsRestOperation.class));
+
+    Mockito.doReturn(listResponseData1).doReturn(listResponseData2).doReturn(listResponseData3)
+        .when(spiedClient).listPath(eq("/testPath"), eq(false), eq(1),
+            any(), any(), any());
+
+    Assertions.assertThat(spiedClient.isNonEmptyDirectory("/testPath",
+        Mockito.mock(TracingContext.class)))
+        .describedAs("isNonEmptyDirectory in client giving unexpected results")
+        .isEqualTo(isNonEmpty);
+
+    Mockito.verify(spiedClient, times(expectedInvocations))
+        .listPath(eq("/testPath"), eq(false), eq(1),
+            any(), any(TracingContext.class), any());
+  }
 }

+ 1 - 1
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/DirectoryStateHelper.java

@@ -73,7 +73,7 @@ public final class DirectoryStateHelper {
 
     // 2nd condition: listPaths should return some entries.
     AbfsRestOperation op = client.listPath(
-        relativePath, false, 1, null, testTracingContext, null, false).getOp();
+        relativePath, false, 1, null, testTracingContext, null).getOp();
     if (op != null && op.getResult() != null) {
       int listSize = op.getResult().getListResultSchema().paths().size();
       if (listSize > 0) {