
HADOOP-17022. Tune S3AFileSystem.listFiles() API.

Contributed by Mukund Thakur.

Change-Id: I17f5cfdcd25670ce3ddb62c13378c7e2dc06ba52
Mukund Thakur 4 years ago
parent
commit
4647a60430

+ 110 - 64
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -4206,79 +4206,125 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     Path path = qualify(f);
     LOG.debug("listFiles({}, {})", path, recursive);
     try {
-      // if a status was given, that is used, otherwise
-      // call getFileStatus, which triggers an existence check
-      final S3AFileStatus fileStatus = status != null
-          ? status
-          : (S3AFileStatus) getFileStatus(path);
-      if (fileStatus.isFile()) {
+      // if a status was given and it is a file.
+      if (status != null && status.isFile()) {
         // simple case: File
-        LOG.debug("Path is a file");
+        LOG.debug("Path is a file: {}", path);
         return new Listing.SingleStatusRemoteIterator(
-            toLocatedFileStatus(fileStatus));
-      } else {
-        // directory: do a bulk operation
-        String key = maybeAddTrailingSlash(pathToKey(path));
-        String delimiter = recursive ? null : "/";
-        LOG.debug("Requesting all entries under {} with delimiter '{}'",
-            key, delimiter);
-        final RemoteIterator<S3AFileStatus> cachedFilesIterator;
-        final Set<Path> tombstones;
-        boolean allowAuthoritative = allowAuthoritative(f);
-        if (recursive) {
-          final PathMetadata pm = metadataStore.get(path, true);
-          // shouldn't need to check pm.isDeleted() because that will have
-          // been caught by getFileStatus above.
-          MetadataStoreListFilesIterator metadataStoreListFilesIterator =
-              new MetadataStoreListFilesIterator(metadataStore, pm,
-                  allowAuthoritative);
-          tombstones = metadataStoreListFilesIterator.listTombstones();
-          // if all of the below is true
-          //  - authoritative access is allowed for this metadatastore for this directory,
-          //  - all the directory listings are authoritative on the client
-          //  - the caller does not force non-authoritative access
-          // return the listing without any further s3 access
-          if (!forceNonAuthoritativeMS &&
-              allowAuthoritative &&
-              metadataStoreListFilesIterator.isRecursivelyAuthoritative()) {
-            S3AFileStatus[] statuses = S3Guard.iteratorToStatuses(
-                metadataStoreListFilesIterator, tombstones);
-            cachedFilesIterator = listing.createProvidedFileStatusIterator(
-                statuses, ACCEPT_ALL, acceptor);
-            return listing.createLocatedFileStatusIterator(cachedFilesIterator);
-          }
-          cachedFilesIterator = metadataStoreListFilesIterator;
-        } else {
-          DirListingMetadata meta =
-              S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider,
-                  allowAuthoritative);
-          if (meta != null) {
-            tombstones = meta.listTombstones();
-          } else {
-            tombstones = null;
-          }
-          cachedFilesIterator = listing.createProvidedFileStatusIterator(
-              S3Guard.dirMetaToStatuses(meta), ACCEPT_ALL, acceptor);
-          if (allowAuthoritative && meta != null && meta.isAuthoritative()) {
-            // metadata listing is authoritative, so return it directly
-            return listing.createLocatedFileStatusIterator(cachedFilesIterator);
-          }
+            toLocatedFileStatus(status));
+      }
+      // Assuming the path to be a directory
+      // do a bulk operation.
+      RemoteIterator<S3ALocatedFileStatus> listFilesAssumingDir =
+              getListFilesAssumingDir(path,
+                      recursive,
+                      acceptor,
+                      collectTombstones,
+                      forceNonAuthoritativeMS);
+      // If there are no list entries present, we
+      // fallback to file existence check as the path
+      // can be a file or empty directory.
+      if (!listFilesAssumingDir.hasNext()) {
+        // If file status was already passed, reuse it.
+        final S3AFileStatus fileStatus = status != null
+                ? status
+                : (S3AFileStatus) getFileStatus(path);
+        if (fileStatus.isFile()) {
+          return new Listing.SingleStatusRemoteIterator(
+                  toLocatedFileStatus(fileStatus));
         }
-        return listing.createTombstoneReconcilingIterator(
-            listing.createLocatedFileStatusIterator(
-                listing.createFileStatusListingIterator(path,
-                    createListObjectsRequest(key, delimiter),
-                    ACCEPT_ALL,
-                    acceptor,
-                    cachedFilesIterator)),
-            collectTombstones ? tombstones : null);
       }
+      // If we have reached here, it means either there are files
+      // in this directory or it is empty.
+      return listFilesAssumingDir;
     } catch (AmazonClientException e) {
-      // TODO S3Guard: retry on file not found exception
       throw translateException("listFiles", path, e);
     }
   }
 
+  /**
+   * List files under a path assuming the path to be a directory.
+   * @param path input path.
+   * @param recursive recursive listing?
+   * @param acceptor file status filter
+   * @param collectTombstones should tombstones be collected from S3Guard?
+   * @param forceNonAuthoritativeMS forces metadata store to act like non
+   *                                authoritative. This is useful when
+   *                                listFiles output is used by import tool.
+   * @return an iterator over listing.
+   * @throws IOException any exception.
+   */
+  private RemoteIterator<S3ALocatedFileStatus> getListFilesAssumingDir(
+          Path path,
+          boolean recursive, Listing.FileStatusAcceptor acceptor,
+          boolean collectTombstones,
+          boolean forceNonAuthoritativeMS) throws IOException {
+
+    String key = maybeAddTrailingSlash(pathToKey(path));
+    String delimiter = recursive ? null : "/";
+    LOG.debug("Requesting all entries under {} with delimiter '{}'",
+        key, delimiter);
+    final RemoteIterator<S3AFileStatus> cachedFilesIterator;
+    final Set<Path> tombstones;
+    boolean allowAuthoritative = allowAuthoritative(path);
+    if (recursive) {
+      final PathMetadata pm = metadataStore.get(path, true);
+      if (pm != null) {
+        if (pm.isDeleted()) {
+          OffsetDateTime deletedAt = OffsetDateTime
+                  .ofInstant(Instant.ofEpochMilli(
+                          pm.getFileStatus().getModificationTime()),
+                          ZoneOffset.UTC);
+          throw new FileNotFoundException("Path " + path + " is recorded as " +
+                  "deleted by S3Guard at " + deletedAt);
+        }
+      }
+      MetadataStoreListFilesIterator metadataStoreListFilesIterator =
+          new MetadataStoreListFilesIterator(metadataStore, pm,
+              allowAuthoritative);
+      tombstones = metadataStoreListFilesIterator.listTombstones();
+      // if all of the below is true
+      //  - authoritative access is allowed for this metadatastore
+      //  for this directory,
+      //  - all the directory listings are authoritative on the client
+      //  - the caller does not force non-authoritative access
+      // return the listing without any further s3 access
+      if (!forceNonAuthoritativeMS &&
+          allowAuthoritative &&
+          metadataStoreListFilesIterator.isRecursivelyAuthoritative()) {
+        S3AFileStatus[] statuses = S3Guard.iteratorToStatuses(
+            metadataStoreListFilesIterator, tombstones);
+        cachedFilesIterator = listing.createProvidedFileStatusIterator(
+            statuses, ACCEPT_ALL, acceptor);
+        return listing.createLocatedFileStatusIterator(cachedFilesIterator);
+      }
+      cachedFilesIterator = metadataStoreListFilesIterator;
+    } else {
+      DirListingMetadata meta =
+          S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider,
+              allowAuthoritative);
+      if (meta != null) {
+        tombstones = meta.listTombstones();
+      } else {
+        tombstones = null;
+      }
+      cachedFilesIterator = listing.createProvidedFileStatusIterator(
+          S3Guard.dirMetaToStatuses(meta), ACCEPT_ALL, acceptor);
+      if (allowAuthoritative && meta != null && meta.isAuthoritative()) {
+        // metadata listing is authoritative, so return it directly
+        return listing.createLocatedFileStatusIterator(cachedFilesIterator);
+      }
+    }
+    return listing.createTombstoneReconcilingIterator(
+        listing.createLocatedFileStatusIterator(
+            listing.createFileStatusListingIterator(path,
+                createListObjectsRequest(key, delimiter),
+                ACCEPT_ALL,
+                acceptor,
+                cachedFilesIterator)),
+        collectTombstones ? tombstones : null);
+  }
+
   /**
    * Override superclass so as to add statistic collection.
    * {@inheritDoc}
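
For context, a minimal caller-side sketch of the API this commit tunes. It is not part of the patch: the class name and the s3a:// bucket/path are placeholders, and it simply consumes the RemoteIterator returned by FileSystem.listFiles(), which is where the reordered probe logic above saves a HEAD request when the path is a directory.

// Hypothetical caller-side sketch (not part of this change).
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class ListFilesExample {
  public static void main(String[] args) throws Exception {
    Path root = new Path("s3a://example-bucket/data");   // placeholder bucket/path
    try (FileSystem fs = FileSystem.newInstance(root.toUri(), new Configuration())) {
      // recursive=true walks the whole tree; after this patch the directory case
      // goes straight to the LIST call instead of issuing getFileStatus() first.
      RemoteIterator<LocatedFileStatus> it = fs.listFiles(root, true);
      while (it.hasNext()) {
        LocatedFileStatus st = it.next();
        System.out.println(st.getPath() + " " + st.getLen());
      }
    }
  }
}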

+ 70 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java

@@ -168,6 +168,76 @@ public class ITestS3AFileOperationCost extends AbstractS3ATestBase {
     }
   }
 
+  @Test
+  public void testCostOfListFilesOnFile() throws Throwable {
+    describe("Performing listFiles() on a file");
+    Path file = path(getMethodName() + ".txt");
+    S3AFileSystem fs = getFileSystem();
+    touch(fs, file);
+    resetMetricDiffs();
+    fs.listFiles(file, true);
+    if (!fs.hasMetadataStore()) {
+      metadataRequests.assertDiffEquals(1);
+    } else {
+      if (fs.allowAuthoritative(file)) {
+        listRequests.assertDiffEquals(0);
+      } else {
+        listRequests.assertDiffEquals(1);
+      }
+    }
+  }
+
+  @Test
+  public void testCostOfListFilesOnEmptyDir() throws Throwable {
+    describe("Performing listFiles() on an empty dir");
+    Path dir = path(getMethodName());
+    S3AFileSystem fs = getFileSystem();
+    fs.mkdirs(dir);
+    resetMetricDiffs();
+    fs.listFiles(dir, true);
+    if (!fs.hasMetadataStore()) {
+      verifyOperationCount(2, 1);
+    } else {
+      if (fs.allowAuthoritative(dir)) {
+        verifyOperationCount(0, 0);
+      } else {
+        verifyOperationCount(0, 1);
+      }
+    }
+  }
+
+  @Test
+  public void testCostOfListFilesOnNonEmptyDir() throws Throwable {
+    describe("Performing listFiles() on a non empty dir");
+    Path dir = path(getMethodName());
+    S3AFileSystem fs = getFileSystem();
+    fs.mkdirs(dir);
+    Path file = new Path(dir, "file.txt");
+    touch(fs, file);
+    resetMetricDiffs();
+    fs.listFiles(dir, true);
+    if (!fs.hasMetadataStore()) {
+      verifyOperationCount(0, 1);
+    } else {
+      if (fs.allowAuthoritative(dir)) {
+        verifyOperationCount(0, 0);
+      } else {
+        verifyOperationCount(0, 1);
+      }
+    }
+  }
+
+  @Test
+  public void testCostOfListFilesOnNonExistingDir() throws Throwable {
+    describe("Performing listFiles() on a non existing dir");
+    Path dir = path(getMethodName());
+    S3AFileSystem fs = getFileSystem();
+    resetMetricDiffs();
+    intercept(FileNotFoundException.class,
+        () -> fs.listFiles(dir, true));
+    verifyOperationCount(2, 2);
+  }
+
   @Test
   public void testCostOfGetFileStatusOnFile() throws Throwable {
     describe("performing getFileStatus on a file");

+ 5 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardListConsistency.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.contract.s3a.S3AContract;
 
 import com.google.common.collect.Lists;
@@ -271,7 +272,10 @@ public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
     assertTrue(fs.delete(testDirs[1], false));
     assertTrue(fs.delete(testDirs[2], false));
 
-    fs.rename(path("a"), path("a3"));
+    ContractTestUtils.rename(fs, path("a"), path("a3"));
+    ContractTestUtils.assertPathsDoNotExist(fs,
+            "Source paths shouldn't exist post rename operation",
+            testDirs[0], testDirs[1], testDirs[2]);
     FileStatus[] paths = fs.listStatus(path("a3/b"));
     List<Path> list = new ArrayList<>();
     for (FileStatus fileState : paths) {

+ 4 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/auth/ITestAssumeRole.java

@@ -397,6 +397,10 @@ public class ITestAssumeRole extends AbstractS3ATestBase {
       }
       forbidden("",
           () -> fs.listStatus(ROOT));
+      forbidden("",
+          () -> fs.listFiles(ROOT, true));
+      forbidden("",
+          () -> fs.listLocatedStatus(ROOT));
       forbidden("",
           () -> fs.mkdirs(path("testAssumeRoleFS")));
     }