Browse Source

HADOOP-19580. [ABFS][BugFix] IsNonEmptyDirectory Check should loop on listing using updated continuation token (#7716)

Contributed by Anuj Modi
Reviewed by Sneha Vijayarajan
Anuj Modi 2 days ago
parent
commit
8f78af1edc

+ 1 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java

@@ -2037,7 +2037,7 @@ public class AbfsBlobClient extends AbfsClient {
     List<FileStatus> fileStatusList = new ArrayList<>();
     // We need to loop on continuation token until we get an entry or continuation token becomes null.
     do {
-      ListResponseData listResponseData = listPath(path, false, 1, null, tracingContext, null);
+      ListResponseData listResponseData = listPath(path, false, 1, continuationToken, tracingContext, null);
       fileStatusList.addAll(listResponseData.getFileStatusList());
       continuationToken = listResponseData.getContinuationToken();
     } while (StringUtils.isNotEmpty(continuationToken) && fileStatusList.isEmpty());

+ 37 - 7
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java

@@ -391,9 +391,9 @@ public class ITestAzureBlobFileSystemListStatus extends
         true, 2, 0);
     testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, true, EMPTY_STRING,
         false, 2, 1);
-    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, true, TEST_CONTINUATION_TOKEN,
+    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN + 1, true, TEST_CONTINUATION_TOKEN + 2,
         true, 3, 0);
-    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, true, TEST_CONTINUATION_TOKEN,
+    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN + 1, true, TEST_CONTINUATION_TOKEN + 2,
         false, 3, 1);
 
     testEmptyListingInSubsequentCallInternal(EMPTY_STRING, false, EMPTY_STRING,
@@ -409,9 +409,9 @@ public class ITestAzureBlobFileSystemListStatus extends
         true, 2, 1);
     testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, false, EMPTY_STRING,
         false, 2, 2);
-    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, false, TEST_CONTINUATION_TOKEN,
+    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN + 1, false, TEST_CONTINUATION_TOKEN + 2,
         true, 3, 1);
-    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN, false, TEST_CONTINUATION_TOKEN,
+    testEmptyListingInSubsequentCallInternal(TEST_CONTINUATION_TOKEN + 1, false, TEST_CONTINUATION_TOKEN + 2,
         false, 3, 2);
   }
 
@@ -453,9 +453,23 @@ public class ITestAzureBlobFileSystemListStatus extends
     listResponseData3.setFileStatusList(new ArrayList<>());
     listResponseData3.setOp(Mockito.mock(AbfsRestOperation.class));
 
-    Mockito.doReturn(listResponseData1).doReturn(listResponseData2).doReturn(listResponseData3)
-        .when(spiedClient).listPath(eq("/testPath"), eq(false), eq(1),
-        any(), any(), any());
+    final int[] itr = new int[1];
+    final String[] continuationTokenUsed = new String[3];
+
+    Mockito.doAnswer(invocationOnMock -> {
+      if (itr[0] == 0) {
+        itr[0]++;
+        continuationTokenUsed[0] = invocationOnMock.getArgument(3);
+        return listResponseData1;
+      } else if (itr[0] == 1) {
+        itr[0]++;
+        continuationTokenUsed[1] = invocationOnMock.getArgument(3);
+        return listResponseData2;
+      }
+      continuationTokenUsed[2] = invocationOnMock.getArgument(3);
+      return listResponseData3;
+    }).when(spiedClient).listPath(eq("/testPath"), eq(false), eq(1),
+        any(), any(TracingContext.class), any());
 
     FileStatus[] list = spiedFs.listStatus(new Path("/testPath"));
 
@@ -473,6 +487,22 @@ public class ITestAzureBlobFileSystemListStatus extends
       Mockito.verify(spiedClient, times(0))
           .getPathStatus(eq("/testPath"), any(), eq(null), eq(false));
     }
+
+    Assertions.assertThat(continuationTokenUsed[0])
+        .describedAs("First continuation token used is not as expected")
+        .isNull();
+
+    if (expectedInvocations > 1) {
+      Assertions.assertThat(continuationTokenUsed[1])
+          .describedAs("Second continuation token used is not as expected")
+          .isEqualTo(firstCT);
+    }
+
+    if (expectedInvocations > 2) {
+      Assertions.assertThat(continuationTokenUsed[2])
+          .describedAs("Third continuation token used is not as expected")
+          .isEqualTo(secondCT);
+    }
   }
 
   /**

+ 38 - 4
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsClient.java

@@ -745,18 +745,18 @@ public final class ITestAbfsClient extends AbstractAbfsIntegrationTest {
         true, 2, false);
     testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN, true, EMPTY_STRING,
         false, 2, true);
-    testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN, true, TEST_CONTINUATION_TOKEN,
+    testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN + 1, true, TEST_CONTINUATION_TOKEN + 2,
         true, 3, false);
-    testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN, true, TEST_CONTINUATION_TOKEN,
+    testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN + 1, true, TEST_CONTINUATION_TOKEN + 2,
         false, 2, true);
 
     testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN, false, EMPTY_STRING,
         true, 1, true);
     testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN, false, EMPTY_STRING,
         false, 1, true);
-    testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN, false, TEST_CONTINUATION_TOKEN,
+    testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN + 1, false, TEST_CONTINUATION_TOKEN + 2,
         true, 1, true);
-    testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN, false, TEST_CONTINUATION_TOKEN,
+    testIsNonEmptyDirectoryInternal(TEST_CONTINUATION_TOKEN + 1, false, TEST_CONTINUATION_TOKEN + 2,
         false, 1, true);
   }
 
@@ -801,11 +801,45 @@ public final class ITestAbfsClient extends AbstractAbfsIntegrationTest {
         .when(spiedClient).listPath(eq("/testPath"), eq(false), eq(1),
             any(), any(), any());
 
+    final int[] itr = new int[1];
+    final String[] continuationTokenUsed = new String[3];
+
+    Mockito.doAnswer(invocationOnMock -> {
+      if (itr[0] == 0) {
+        itr[0]++;
+        continuationTokenUsed[0] = invocationOnMock.getArgument(3);
+        return listResponseData1;
+      } else if (itr[0] == 1) {
+        itr[0]++;
+        continuationTokenUsed[1] = invocationOnMock.getArgument(3);
+        return listResponseData2;
+      }
+      continuationTokenUsed[2] = invocationOnMock.getArgument(3);
+      return listResponseData3;
+    }).when(spiedClient).listPath(eq("/testPath"), eq(false), eq(1),
+        any(), any(TracingContext.class), any());
+
     Assertions.assertThat(spiedClient.isNonEmptyDirectory("/testPath",
         Mockito.mock(TracingContext.class)))
         .describedAs("isNonEmptyDirectory in client giving unexpected results")
         .isEqualTo(isNonEmpty);
 
+    Assertions.assertThat(continuationTokenUsed[0])
+        .describedAs("First continuation token used is not as expected")
+        .isNull();
+
+    if (expectedInvocations > 1) {
+      Assertions.assertThat(continuationTokenUsed[1])
+          .describedAs("Second continuation token used is not as expected")
+          .isEqualTo(firstCT);
+    }
+
+    if (expectedInvocations > 2) {
+      Assertions.assertThat(continuationTokenUsed[2])
+          .describedAs("Third continuation token used is not as expected")
+          .isEqualTo(secondCT);
+    }
+
     Mockito.verify(spiedClient, times(expectedInvocations))
         .listPath(eq("/testPath"), eq(false), eq(1),
             any(), any(TracingContext.class), any());