HADOOP-19543. [ABFS][FnsOverBlob] Remove Duplicates from Blob Endpoint Listing Across Iterations (#7614)

Contributed by Anuj Modi
Reviewed by Anmol Asrani, Manish Bhatt, Manika Joshi

Signed-off-by: Anuj Modi <anujmodi@apache.org>
Anuj Modi, 2 weeks ago
Parent
Commit
810c42f88c

+ 13 - 3
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

@@ -76,6 +76,7 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriAuthorityExc
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
 import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TrileanConversionException;
 import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
+import org.apache.hadoop.fs.azurebfs.services.AbfsBlobClient;
 import org.apache.hadoop.fs.azurebfs.services.ListResponseData;
 import org.apache.hadoop.fs.azurebfs.enums.Trilean;
 import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
@@ -117,6 +118,7 @@ import org.apache.hadoop.fs.azurebfs.utils.Base64;
 import org.apache.hadoop.fs.azurebfs.utils.CRC64;
 import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
 import org.apache.hadoop.fs.azurebfs.utils.EncryptionType;
+import org.apache.hadoop.fs.azurebfs.utils.ListUtils;
 import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
 import org.apache.hadoop.fs.azurebfs.utils.UriUtils;
 import org.apache.hadoop.fs.impl.BackReference;
@@ -1272,7 +1274,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
             : generateContinuationTokenForNonXns(relativePath, startFrom);
       }
     }
-
+    List<FileStatus> fileStatusList = new ArrayList<>();
     do {
       try (AbfsPerfInfo perfInfo = startTracking("listStatus", "listPath")) {
         ListResponseData listResponseData = listingClient.listPath(relativePath,
@@ -1281,9 +1283,9 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
         AbfsRestOperation op = listResponseData.getOp();
         perfInfo.registerResult(op.getResult());
         continuation = listResponseData.getContinuationToken();
-        List<FileStatus> fileStatusListInCurrItr = listResponseData.getFileStatusList();
+        List<VersionedFileStatus> fileStatusListInCurrItr = listResponseData.getFileStatusList();
         if (fileStatusListInCurrItr != null && !fileStatusListInCurrItr.isEmpty()) {
-          fileStatuses.addAll(fileStatusListInCurrItr);
+          fileStatusList.addAll(fileStatusListInCurrItr);
         }
         perfInfo.registerSuccess(true);
         countAggregate++;
@@ -1296,6 +1298,14 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
       }
     } while (shouldContinue);
 
+    if (listingClient instanceof AbfsBlobClient) {
+      fileStatuses.addAll(ListUtils.getUniqueListResult(fileStatusList));
+      LOG.debug("ListBlob API returned a total of {} elements including duplicates."
+          + " Number of unique elements is {}", fileStatusList.size(), fileStatuses.size());
+    } else {
+      fileStatuses.addAll(fileStatusList);
+    }
+
     return continuation;
   }
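
For context, a minimal sketch (illustrative class and paths, not production code) of why the dedup must run after the loop rather than per page: with a small page size, ListBlob can return the marker blob for an explicit directory on one page and the BlobPrefix entry for the same name on a later page, so per-page filtering never sees the pair together. The statusFor helper below is invented for the example.

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.utils.ListUtils;

public class CrossIterationDedupSketch {
  public static void main(String[] args) {
    List<FileStatus> collected = new ArrayList<>();
    // Page 1 ends with the marker blob for explicit directory /c.
    collected.add(statusFor("/c"));
    // Page 2 starts with the BlobPrefix entry for the same /c.
    collected.add(statusFor("/c"));
    collected.add(statusFor("/c.bak"));

    // Dedup once, after all pages have been collected.
    List<FileStatus> unique = ListUtils.getUniqueListResult(collected);
    System.out.println(unique.size()); // 2: /c and /c.bak
  }

  private static FileStatus statusFor(String path) {
    FileStatus status = new FileStatus();
    status.setPath(new Path(path));
    return status;
  }
}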
 

+ 8 - 26
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsBlobClient.java

@@ -42,7 +42,6 @@ import java.util.HashSet;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
 import java.util.UUID;
 
 import org.w3c.dom.Document;
@@ -52,7 +51,6 @@ import org.xml.sax.SAXException;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.fs.Path;
@@ -402,7 +400,7 @@ public class AbfsBlobClient extends AbfsClient {
       AbfsRestOperation pathStatus = this.getPathStatus(relativePath, tracingContext, null, false);
       BlobListResultSchema listResultSchema = getListResultSchemaFromPathStatus(relativePath, pathStatus);
       LOG.debug("ListBlob attempted on a file path. Returning file status.");
-      List<FileStatus> fileStatusList = new ArrayList<>();
+      List<VersionedFileStatus> fileStatusList = new ArrayList<>();
       for (BlobListResultEntrySchema entry : listResultSchema.paths()) {
         fileStatusList.add(getVersionedFileStatusFromEntry(entry, uri));
       }
@@ -1617,7 +1615,7 @@ public class AbfsBlobClient extends AbfsClient {
         LOG.debug("ListBlobs listed {} blobs with {} as continuation token",
             listResultSchema.paths().size(),
             listResultSchema.getNextMarker());
-        return filterDuplicateEntriesAndRenamePendingFiles(listResultSchema, uri);
+        return filterRenamePendingFiles(listResultSchema, uri);
       } catch (SAXException | IOException ex) {
         throw new AbfsDriverException(ERR_BLOB_LIST_PARSING, ex);
       }
@@ -1917,39 +1915,23 @@ public class AbfsBlobClient extends AbfsClient {
   });
 
   /**
-   * This is to handle duplicate listing entries returned by Blob Endpoint for
-   * implicit paths that also has a marker file created for them.
-   * This will retain entry corresponding to marker file and remove the BlobPrefix entry.
-   * This will also filter out all the rename pending json files in listing output.
+   * This will filter out all the rename pending json files in listing output.
    * @param listResultSchema List of entries returned by Blob Endpoint.
    * @param uri URI to be used for path conversion.
-   * @return List of entries after removing duplicates.
+   * @return List of entries with rename pending files filtered out.
    * @throws IOException if path conversion fails.
    */
   @VisibleForTesting
-  public ListResponseData filterDuplicateEntriesAndRenamePendingFiles(
+  public ListResponseData filterRenamePendingFiles(
       BlobListResultSchema listResultSchema, URI uri) throws IOException {
-    List<FileStatus> fileStatuses = new ArrayList<>();
+    List<VersionedFileStatus> fileStatuses = new ArrayList<>();
     Map<Path, Integer> renamePendingJsonPaths = new HashMap<>();
-    TreeMap<String, BlobListResultEntrySchema> nameToEntryMap = new TreeMap<>();
 
     for (BlobListResultEntrySchema entry : listResultSchema.paths()) {
-      if (StringUtils.isNotEmpty(entry.eTag())) {
-        // This is a blob entry. It is either a file or a marker blob.
-        // In both cases we will add this.
-        if (isRenamePendingJsonPathEntry(entry)) {
-          renamePendingJsonPaths.put(entry.path(), entry.contentLength().intValue());
-        } else {
-          nameToEntryMap.put(entry.name(), entry);
-          fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
-        }
+      if (isRenamePendingJsonPathEntry(entry)) {
+        renamePendingJsonPaths.put(entry.path(), entry.contentLength().intValue());
       } else {
-        // This is a BlobPrefix entry. It is a directory with file inside
-        // This might have already been added as a marker blob.
-        if (!nameToEntryMap.containsKey(entry.name())) {
-          nameToEntryMap.put(entry.name(), entry);
-          fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
-        }
+        fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
       }
     }
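
In isolation, the simplified method is now a single pass that diverts rename-pending metadata files and converts everything else. A hedged sketch of that shape using plain strings; the "-RenamePending.json" suffix is an assumption drawn from the atomic-rename naming convention, and the actual check lives in isRenamePendingJsonPathEntry.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RenamePendingFilterSketch {
  // Assumed suffix for illustration only.
  private static final String SUFFIX = "-RenamePending.json";

  public static void main(String[] args) {
    List<String> listing = Arrays.asList("/a", "/a-RenamePending.json", "/b");
    List<String> fileStatuses = new ArrayList<>();
    Map<String, Integer> renamePendingJsonPaths = new HashMap<>();

    for (String entry : listing) {
      if (entry.endsWith(SUFFIX)) {
        // Production stores the entry's content length here.
        renamePendingJsonPaths.put(entry, 0);
      } else {
        fileStatuses.add(entry);
      }
    }
    System.out.println(fileStatuses);           // [/a, /b]
    System.out.println(renamePendingJsonPaths); // {/a-RenamePending.json=0}
  }
}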
 

+ 1 - 2
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java

@@ -45,7 +45,6 @@ import com.fasterxml.jackson.databind.JsonMappingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.apache.hadoop.classification.VisibleForTesting;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -1353,7 +1352,7 @@ public class AbfsDfsClient extends AbfsClient {
         LOG.debug("ListPath listed {} paths with {} as continuation token",
             listResultSchema.paths().size(),
             getContinuationFromResponse(result));
-        List<FileStatus> fileStatuses = new ArrayList<>();
+        List<VersionedFileStatus> fileStatuses = new ArrayList<>();
         for (DfsListResultEntrySchema entry : listResultSchema.paths()) {
           fileStatuses.add(getVersionedFileStatusFromEntry(entry, uri));
         }

+ 8 - 9
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ListResponseData.java

@@ -21,34 +21,33 @@ package org.apache.hadoop.fs.azurebfs.services;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
 /**
  * This class is used to hold the response data for list operations.
- * It contains a list of FileStatus objects, a map of rename pending JSON paths,
+ * It contains a list of VersionedFileStatus objects, a map of rename pending JSON paths,
  * continuation token and the executed REST operation.
  */
 public class ListResponseData {
 
-  private List<FileStatus> fileStatusList;
+  private List<VersionedFileStatus> fileStatusList;
   private Map<Path, Integer> renamePendingJsonPaths;
   private AbfsRestOperation executedRestOperation;
   private String continuationToken;
 
   /**
-   * Returns the list of FileStatus objects.
-   * @return the list of FileStatus objects
+   * Returns the list of VersionedFileStatus objects.
+   * @return the list of VersionedFileStatus objects
    */
-  public List<FileStatus> getFileStatusList() {
+  public List<VersionedFileStatus> getFileStatusList() {
     return fileStatusList;
   }
 
   /**
-   * Sets the list of FileStatus objects.
-   * @param fileStatusList the list of FileStatus objects
+   * Sets the list of VersionedFileStatus objects.
+   * @param fileStatusList the list of VersionedFileStatus objects
    */
-  public void setFileStatusList(final List<FileStatus> fileStatusList) {
+  public void setFileStatusList(final List<VersionedFileStatus> fileStatusList) {
     this.fileStatusList = fileStatusList;
   }
 

+ 69 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/utils/ListUtils.java

@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.TreeMap;
+
+import org.apache.hadoop.fs.FileStatus;
+
+/**
+ * Utility class for List operations.
+ */
+public final class ListUtils {
+
+  private ListUtils() {
+    // Private constructor to prevent instantiation
+  }
+
+  /**
+   * Utility method to remove duplicates from a list of FileStatus.
+   * The ListBlob API of the Blob endpoint can return duplicate entries.
+   * @param originalList original list, possibly containing duplicates
+   * @return rectified list with no duplicates.
+   */
+  public static List<FileStatus> getUniqueListResult(List<FileStatus> originalList) {
+    if (originalList == null || originalList.isEmpty()) {
+      return originalList;
+    }
+
+    TreeMap<String, FileStatus> nameToEntryMap = new TreeMap<>();
+    String prefix = null;
+    List<FileStatus> rectifiedFileStatusList = new ArrayList<>();
+
+    for (FileStatus current : originalList) {
+      String fileName = current.getPath().getName();
+
+      if (prefix == null || !fileName.startsWith(prefix)) {
+        // Prefix pattern breaks here. Reset Map and prefix.
+        prefix = fileName;
+        nameToEntryMap.clear();
+      }
+
+      // Add the current entry if it is not already added.
+      if (!nameToEntryMap.containsKey(fileName)) {
+        nameToEntryMap.put(fileName, current);
+        rectifiedFileStatusList.add(current);
+      }
+    }
+
+    return rectifiedFileStatusList;
+  }
+}
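
A note on the design, with a hedged usage example (paths invented): blob listings arrive in lexicographic order, so the marker-blob/BlobPrefix pair for a name always appears among entries that share that name as a prefix. Once an incoming name no longer shares the tracked prefix, earlier names cannot reappear, which is why the TreeMap is cleared rather than growing with the whole listing.

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.utils.ListUtils;

public class ListUtilsUsageSketch {
  public static void main(String[] args) {
    // "/a" collapses to one entry; "/abc" shares the "a" prefix, so the
    // map is reused; "/b" breaks the prefix run and resets the map.
    List<FileStatus> input = Arrays.asList(
        statusFor("/a"), statusFor("/a"),
        statusFor("/abc"), statusFor("/abc"),
        statusFor("/b"));
    List<FileStatus> unique = ListUtils.getUniqueListResult(input);
    System.out.println(unique.size()); // 3: /a, /abc, /b
  }

  private static FileStatus statusFor(String path) {
    FileStatus status = new FileStatus();
    status.setPath(new Path(path));
    return status;
  }
}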

+ 86 - 1
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java

@@ -24,7 +24,9 @@ import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -90,6 +92,8 @@ public class ITestAzureBlobFileSystemListStatus extends
     AbstractAbfsIntegrationTest {
   private static final int TEST_FILES_NUMBER = 6000;
   public static final String TEST_CONTINUATION_TOKEN = "continuation";
+  private static final int TOTAL_NUMBER_OF_PATHS = 11;
+  private static final int NUMBER_OF_UNIQUE_PATHS = 7;
 
   public ITestAzureBlobFileSystemListStatus() throws Exception {
     super();
@@ -197,7 +201,7 @@ public class ITestAzureBlobFileSystemListStatus extends
     Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();
     Mockito.doReturn(spiedClient).when(spiedStore).getClient();
 
-    Mockito.doThrow(new SocketException(CONNECTION_RESET_MESSAGE)).when(spiedClient).filterDuplicateEntriesAndRenamePendingFiles(any(), any());
+    Mockito.doThrow(new SocketException(CONNECTION_RESET_MESSAGE)).when(spiedClient).filterRenamePendingFiles(any(), any());
     List<FileStatus> fileStatuses = new ArrayList<>();
     AbfsDriverException ex = intercept(AbfsDriverException.class,
       () -> {
@@ -532,6 +536,87 @@ public class ITestAzureBlobFileSystemListStatus extends
         .describedAs("Listing Size Not as expected").hasSize(1);
   }
 
+  /**
+   * Test to verify that listStatus returns the correct file status
+   * after removing duplicates across multiple iterations of list blobs.
+   * Also verifies that, for a non-empty explicit directory,
+   * the entry corresponding to the marker blob is returned.
+   * @throws Exception if test fails.
+   */
+  @Test
+  public void testDuplicateEntriesAcrossListBlobIterations() throws Exception {
+    AzureBlobFileSystem fs = Mockito.spy(getFileSystem());
+    AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore());
+    store.getAbfsConfiguration().setListMaxResults(1);
+    AbfsClient client = Mockito.spy(store.getClient());
+
+    Mockito.doReturn(store).when(fs).getAbfsStore();
+    Mockito.doReturn(client).when(store).getClient();
+
+    /*
+     * The following entries will be created under the root path.
+     * 0. /A - implicit directory without any marker blob
+     * 1. /a - marker file for explicit directory
+     * 2. /a/file1 - normal file inside explicit directory
+     * 3. /b - normal file inside root
+     * 4. /c - marker file for explicit directory
+     * 5. /c.bak - marker file for explicit directory
+     * 6. /c.bak/file2 - normal file inside explicit directory
+     * 7. /c/file3 - normal file inside explicit directory
+     * 8. /d - implicit directory
+     * 9. /e - marker file for explicit directory
+     * 10. /e/file4 - normal file inside explicit directory
+     */
+    // Create Path 0
+    createAzCopyFolder(new Path("/A"));
+
+    // Create Path 1 and 2.
+    fs.create(new Path("/a/file1"));
+
+    // Create Path 3
+    fs.create(new Path("/b"));
+
+    // Create Path 4 and 7
+    fs.create(new Path("/c/file3"));
+
+    // Create Path 5 and 6
+    fs.create(new Path("/c.bak/file2"));
+
+    // Create Path 8
+    createAzCopyFolder(new Path("/d"));
+
+    // Create Path 9 and 10
+    fs.create(new Path("/e/file4"));
+
+    FileStatus[] fileStatuses = fs.listStatus(new Path(ROOT_PATH));
+
+    // Assert that client.listPath was called 11 times, which verifies
+    // that the server returned 11 entries in total.
+    Mockito.verify(client, Mockito.times(TOTAL_NUMBER_OF_PATHS))
+        .listPath(eq(ROOT_PATH), eq(false), eq(1), any(), any(), any());
+
+    // Assert that after duplicate removal, only 7 unique entries are returned.
+    Assertions.assertThat(fileStatuses.length)
+        .describedAs("List size is not as expected").isEqualTo(NUMBER_OF_UNIQUE_PATHS);
+
+    // Assert that for duplicates, entry corresponding to marker blob is returned.
+    assertImplicitDirectoryFileStatus(fileStatuses[0], fs.makeQualified(new Path("/A")));
+    assertExplicitDirectoryFileStatus(fileStatuses[1], fs.makeQualified(new Path("/a")));
+    assertFilePathFileStatus(fileStatuses[2], fs.makeQualified(new Path("/b")));
+    assertExplicitDirectoryFileStatus(fileStatuses[3], fs.makeQualified(new Path("/c")));
+    assertExplicitDirectoryFileStatus(fileStatuses[4], fs.makeQualified(new Path("/c.bak")));
+    assertImplicitDirectoryFileStatus(fileStatuses[5], fs.makeQualified(new Path("/d")));
+    assertExplicitDirectoryFileStatus(fileStatuses[6], fs.makeQualified(new Path("/e")));
+
+    // Assert that there are no duplicates in the returned file statuses.
+    Set<Path> uniquePaths = new HashSet<>();
+    for (FileStatus fileStatus : fileStatuses) {
+      Assertions.assertThat(uniquePaths.add(fileStatus.getPath()))
+          .describedAs("Duplicate Entries found")
+          .isTrue();
+    }
+  }
+
   private void assertFilePathFileStatus(final FileStatus fileStatus,
       final Path qualifiedPath) {
     Assertions.assertThat(fileStatus.getPath())

+ 110 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestListUtils.java

@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.utils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Test class for ListUtils.
+ */
+public class TestListUtils {
+
+  /**
+   * Test method to check the removal of duplicates from a list of FileStatus.
+   */
+  @Test
+  public void testRemoveDuplicates() {
+    List<FileStatus> originalList = new ArrayList<>();
+    validateList(originalList, 0);
+
+    originalList = new ArrayList<>();
+    originalList.add(getFileStatusObject(new Path("/A")));
+    validateList(originalList, 1);
+
+    originalList = new ArrayList<>();
+    originalList.add(getFileStatusObject(new Path("/A")));
+    originalList.add(getFileStatusObject(new Path("/A")));
+    validateList(originalList, 1);
+
+    originalList = new ArrayList<>();
+    originalList.add(getFileStatusObject(new Path("/a")));
+    originalList.add(getFileStatusObject(new Path("/a.bak1")));
+    originalList.add(getFileStatusObject(new Path("/a.bak1.bak2")));
+    originalList.add(getFileStatusObject(new Path("/a.bak1.bak2")));
+    originalList.add(getFileStatusObject(new Path("/a.bak1")));
+    originalList.add(getFileStatusObject(new Path("/a")));
+    originalList.add(getFileStatusObject(new Path("/abc")));
+    originalList.add(getFileStatusObject(new Path("/abc.bak1")));
+    originalList.add(getFileStatusObject(new Path("/abc")));
+    validateList(originalList, 5);
+
+    originalList = new ArrayList<>();
+    originalList.add(getFileStatusObject(new Path("/a")));
+    originalList.add(getFileStatusObject(new Path("/a")));
+    originalList.add(getFileStatusObject(new Path("/a_bak1")));
+    originalList.add(getFileStatusObject(new Path("/a_bak1")));
+    originalList.add(getFileStatusObject(new Path("/a_bak1_bak2")));
+    originalList.add(getFileStatusObject(new Path("/a_bak1_bak2")));
+    originalList.add(getFileStatusObject(new Path("/abc")));
+    originalList.add(getFileStatusObject(new Path("/abc")));
+    originalList.add(getFileStatusObject(new Path("/abc_bak1")));
+    validateList(originalList, 5);
+
+    originalList = new ArrayList<>();
+    originalList.add(getFileStatusObject(new Path("/a")));
+    originalList.add(getFileStatusObject(new Path("/b")));
+    validateList(originalList, 2);
+
+    originalList = new ArrayList<>();
+    originalList.add(getFileStatusObject(new Path("/a")));
+    originalList.add(getFileStatusObject(new Path("/b")));
+    originalList.add(getFileStatusObject(new Path("/b")));
+    validateList(originalList, 2);
+  }
+
+  /**
+   * Validate the size of the list after removing duplicates.
+   * @param originalList list having duplicates
+   * @param expectedSize number of unique entries expected
+   */
+  private void validateList(List<FileStatus> originalList, int expectedSize) {
+    List<FileStatus> uniqueList = ListUtils.getUniqueListResult(originalList);
+    Assertions.assertThat(uniqueList)
+        .describedAs("List Size is not as expected after duplicate removal")
+        .hasSize(expectedSize);
+  }
+
+  /**
+   * Create a FileStatus object with the given path.
+   * @param path path to be set in the FileStatus object
+   * @return FileStatus object with the given path
+   */
+  private FileStatus getFileStatusObject(Path path) {
+    FileStatus status = new FileStatus();
+    status.setPath(path);
+    return status;
+  }
+}