Sfoglia il codice sorgente

HADOOP-17281 Implement FileSystem.listStatusIterator() in S3AFileSystem (#2354)

Contains HADOOP-17300: FileSystem.DirListingIterator.next() call should 
return NoSuchElementException

Contributed by Mukund Thakur
Mukund Thakur 4 anni fa
parent
commit
82522d60fb

+ 3 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java

@@ -2229,7 +2229,9 @@ public abstract class FileSystem extends Configured
     @Override
     @SuppressWarnings("unchecked")
     public T next() throws IOException {
-      Preconditions.checkState(hasNext(), "No more items in iterator");
+      if (!hasNext()) {
+        throw new NoSuchElementException("No more items in iterator");
+      }
       if (i == entries.getEntries().length) {
         fetchMore();
       }

+ 18 - 0
hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md

@@ -294,6 +294,24 @@ any optimizations.
 The atomicity and consistency constraints are as for
 `listStatus(Path, PathFilter)`.
 
+### `RemoteIterator<FileStatus> listStatusIterator(Path p)`
+
+Return an iterator enumerating the `FileStatus` entries under
+a path. This is similar to `listStatus(Path)` except the fact that
+rather than returning an entire list, an iterator is returned.
+The result is exactly the same as `listStatus(Path)`, provided no other
+caller updates the directory during the listing. Having said that, this does
+not guarantee atomicity if other callers are adding/deleting the files
+inside the directory while listing is being performed. Different filesystems
+may provide a more efficient implementation, for example S3A does the
+listing in pages and fetches the next pages asynchronously while a
+page is getting processed.
+
+Note that now since the initial listing is async, bucket/path existence
+exception may show up later during next() call.
+
+Callers should prefer using listStatusIterator over listStatus as it
+is incremental in nature.
 
 ### `FileStatus[] listStatus(Path[] paths)`
 

+ 70 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractGetFileStatusTest.java

@@ -24,6 +24,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 
+import org.assertj.core.api.Assertions;
+
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FilterFileSystem;
@@ -148,6 +150,7 @@ public abstract class AbstractContractGetFileStatusTest extends
   public void testComplexDirActions() throws Throwable {
     TreeScanResults tree = createTestTree();
     checkListStatusStatusComplexDir(tree);
+    checkListStatusIteratorComplexDir(tree);
     checkListLocatedStatusStatusComplexDir(tree);
     checkListFilesComplexDirNonRecursive(tree);
     checkListFilesComplexDirRecursive(tree);
@@ -169,6 +172,34 @@ public abstract class AbstractContractGetFileStatusTest extends
     listing.assertSizeEquals("listStatus()", TREE_FILES, TREE_WIDTH, 0);
   }
 
+  /**
+   * Test {@link FileSystem#listStatusIterator(Path)} on a complex
+   * directory tree.
+   * @param tree directory tree to list.
+   * @throws Throwable
+   */
+  protected void checkListStatusIteratorComplexDir(TreeScanResults tree)
+          throws Throwable {
+    describe("Expect listStatusIterator to list all entries in top dir only");
+
+    FileSystem fs = getFileSystem();
+    TreeScanResults listing = new TreeScanResults(
+            fs.listStatusIterator(tree.getBasePath()));
+    listing.assertSizeEquals("listStatus()", TREE_FILES, TREE_WIDTH, 0);
+
+    List<FileStatus> resWithoutCheckingHasNext =
+            iteratorToListThroughNextCallsAlone(fs
+                    .listStatusIterator(tree.getBasePath()));
+
+    List<FileStatus> resWithCheckingHasNext = iteratorToList(fs
+                    .listStatusIterator(tree.getBasePath()));
+    Assertions.assertThat(resWithCheckingHasNext)
+            .describedAs("listStatusIterator() should return correct " +
+                    "results even if hasNext() calls are not made.")
+            .hasSameElementsAs(resWithoutCheckingHasNext);
+
+  }
+
   /**
    * Test {@link FileSystem#listLocatedStatus(Path)} on a complex
    * directory tree.
@@ -322,6 +353,45 @@ public abstract class AbstractContractGetFileStatusTest extends
     verifyStatusArrayMatchesFile(f, getFileSystem().listStatus(f));
   }
 
+  @Test
+  public void testListStatusIteratorFile() throws Throwable {
+    describe("test the listStatusIterator(path) on a file");
+    Path f = touchf("listStItrFile");
+
+    List<FileStatus> statusList = (List<FileStatus>) iteratorToList(
+            getFileSystem().listStatusIterator(f));
+    validateListingForFile(f, statusList, false);
+
+    List<FileStatus> statusList2 =
+            (List<FileStatus>) iteratorToListThroughNextCallsAlone(
+                    getFileSystem().listStatusIterator(f));
+    validateListingForFile(f, statusList2, true);
+  }
+
+  /**
+   * Validate listing result for an input path which is file.
+   * @param f file.
+   * @param statusList list status of a file.
+   * @param nextCallAlone whether the listing generated just using
+   *                      next() calls.
+   */
+  private void validateListingForFile(Path f,
+                                      List<FileStatus> statusList,
+                                      boolean nextCallAlone) {
+    String msg = String.format("size of file list returned using %s should " +
+            "be 1", nextCallAlone ?
+            "next() calls alone" : "hasNext() and next() calls");
+    Assertions.assertThat(statusList)
+            .describedAs(msg)
+            .hasSize(1);
+    Assertions.assertThat(statusList.get(0).getPath())
+            .describedAs("path returned should match with the input path")
+            .isEqualTo(f);
+    Assertions.assertThat(statusList.get(0).isFile())
+            .describedAs("path returned should be a file")
+            .isEqualTo(true);
+  }
+
   @Test
   public void testListFilesFile() throws Throwable {
     describe("test the listStatus(path) on a file");

+ 11 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractRootDirectoryTest.java

@@ -22,13 +22,16 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.junit.Test;
+import org.assertj.core.api.Assertions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -39,6 +42,7 @@ import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.deleteChildren;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.dumpStats;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.iteratorToList;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.listChildren;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.toList;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.treeWalk;
@@ -242,6 +246,13 @@ public abstract class AbstractContractRootDirectoryTest extends AbstractFSContra
             + "listStatus = " + listStatusResult
             + "listFiles = " + listFilesResult,
         fileList.size() <= statuses.length);
+    List<FileStatus> statusList = (List<FileStatus>) iteratorToList(
+            fs.listStatusIterator(root));
+    Assertions.assertThat(statusList)
+            .describedAs("Result of listStatus(/) and listStatusIterator(/)"
+                    + " must match")
+            .hasSameElementsAs(Arrays.stream(statuses)
+                    .collect(Collectors.toList()));
   }
 
   @Test

+ 47 - 1
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java

@@ -1453,6 +1453,52 @@ public class ContractTestUtils extends Assert {
     return list;
   }
 
+  /**
+   * Convert a remote iterator over file status results into a list.
+   * The utility equivalents in commons collection and guava cannot be
+   * used here, as this is a different interface, one whose operators
+   * can throw IOEs.
+   * @param iterator input iterator
+   * @return the file status entries as a list.
+   * @throws IOException
+   */
+  public static <T extends FileStatus> List<T> iteratorToList(
+          RemoteIterator<T> iterator) throws IOException {
+    List<T> list = new ArrayList<>();
+    while (iterator.hasNext()) {
+      list.add(iterator.next());
+    }
+    return list;
+  }
+
+
+  /**
+   * Convert a remote iterator over file status results into a list.
+   * This uses {@link RemoteIterator#next()} calls only, expecting
+   * a raised {@link NoSuchElementException} exception to indicate that
+   * the end of the listing has been reached. This iteration strategy is
+   * designed to verify that the implementation of the remote iterator
+   * generates results and terminates consistently with the {@code hasNext/next}
+   * iteration. More succinctly "verifies that the {@code next()} operator
+   * isn't relying on {@code hasNext()} to always be called during an iteration.
+   * @param iterator input iterator
+   * @return the status entries as a list.
+   * @throws IOException IO problems
+   */
+  @SuppressWarnings("InfiniteLoopStatement")
+  public static <T extends FileStatus> List<T> iteratorToListThroughNextCallsAlone(
+          RemoteIterator<T> iterator) throws IOException {
+    List<T> list = new ArrayList<>();
+    try {
+      while (true) {
+        list.add(iterator.next());
+      }
+    } catch (NoSuchElementException expected) {
+      // ignored
+    }
+    return list;
+  }
+
   /**
    * Convert a remote iterator over file status results into a list.
    * This uses {@link RemoteIterator#next()} calls only, expecting
@@ -1602,7 +1648,7 @@ public class ContractTestUtils extends Assert {
      * @param results results of the listFiles/listStatus call.
      * @throws IOException IO problems during the iteration.
      */
-    public TreeScanResults(RemoteIterator<LocatedFileStatus> results)
+    public TreeScanResults(RemoteIterator<? extends FileStatus> results)
         throws IOException {
       while (results.hasNext()) {
         add(results.next());

+ 24 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -2643,6 +2643,30 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     }
   }
 
+  /**
+   * Override subclass such that we benefit for async listing done
+   * in {@code S3AFileSystem}. See {@code Listing#ObjectListingIterator}.
+   * {@inheritDoc}
+   *
+   */
+  @Override
+  public RemoteIterator<FileStatus> listStatusIterator(Path p)
+          throws FileNotFoundException, IOException {
+    RemoteIterator<S3AFileStatus> listStatusItr = once("listStatus",
+            p.toString(), () -> innerListStatus(p));
+    return new RemoteIterator<FileStatus>() {
+      @Override
+      public boolean hasNext() throws IOException {
+        return listStatusItr.hasNext();
+      }
+
+      @Override
+      public FileStatus next() throws IOException {
+        return listStatusItr.next();
+      }
+    };
+  }
+
   /**
    * List the statuses of the files/directories in the given path if the path is
    * a directory.

+ 18 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java

@@ -231,6 +231,24 @@ public class ITestS3ADirectoryPerformance extends S3AScaleTestBase {
                       "match with original list of files")
               .hasSameElementsAs(originalListOfFiles)
               .hasSize(numOfPutRequests);
+      // Validate listing using listStatusIterator().
+      NanoTimer timeUsingListStatusItr = new NanoTimer();
+      RemoteIterator<FileStatus> lsItr = fs.listStatusIterator(dir);
+      List<String> listUsingListStatusItr = new ArrayList<>();
+      while (lsItr.hasNext()) {
+        listUsingListStatusItr.add(lsItr.next().getPath().toString());
+        Thread.sleep(eachFileProcessingTime);
+      }
+      timeUsingListStatusItr.end("listing %d files using " +
+                      "listStatusIterator() api with batch size of %d " +
+                      "including %dms of processing time for each file",
+              numOfPutRequests, batchSize, eachFileProcessingTime);
+      Assertions.assertThat(listUsingListStatusItr)
+              .describedAs("Listing results using listStatusIterator() must" +
+                      "match with original list of files")
+              .hasSameElementsAs(originalListOfFiles)
+              .hasSize(numOfPutRequests);
+
     } finally {
       executorService.shutdown();
     }