瀏覽代碼

HADOOP-16709. S3Guard: Make authoritative mode exclusive for metadata - don't check for expiry for authoritative paths (#1721). Contributed by Gabor Bota.

Gabor Bota 5 年之前
父節點
當前提交
ea25f4de23

+ 12 - 10
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -2415,9 +2415,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         key = key + '/';
       }
 
-      DirListingMetadata dirMeta =
-          S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider);
       boolean allowAuthoritative = allowAuthoritative(f);
+      DirListingMetadata dirMeta =
+          S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider,
+              allowAuthoritative);
       if (allowAuthoritative && dirMeta != null && dirMeta.isAuthoritative()) {
         return S3Guard.dirMetaToStatuses(dirMeta);
       }
@@ -2649,11 +2650,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     String key = pathToKey(path);
     LOG.debug("Getting path status for {}  ({})", path, key);
 
+    boolean allowAuthoritative = allowAuthoritative(path);
     // Check MetadataStore, if any.
     PathMetadata pm = null;
     if (hasMetadataStore()) {
       pm = S3Guard.getWithTtl(metadataStore, path, ttlTimeProvider,
-          needEmptyDirectoryFlag);
+          needEmptyDirectoryFlag, allowAuthoritative);
     }
     Set<Path> tombstones = Collections.emptySet();
     if (pm != null) {
@@ -2669,9 +2671,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       // modification - compare the modTime to check if metadata is up to date
       // Skip going to s3 if the file checked is a directory. Because if the
       // dest is also a directory, there's no difference.
-      // TODO After HADOOP-16085 the modification detection can be done with
-      //  etags or object version instead of modTime
-      boolean allowAuthoritative = allowAuthoritative(path);
+
       if (!pm.getFileStatus().isDirectory() &&
           !allowAuthoritative &&
           probes.contains(StatusProbeEnum.Head)) {
@@ -2709,7 +2709,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
           return msStatus;
         } else {
           DirListingMetadata children =
-              S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider);
+              S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider,
+                  allowAuthoritative);
           if (children != null) {
             tombstones = children.listTombstones();
           }
@@ -3995,7 +3996,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
           cachedFilesIterator = metadataStoreListFilesIterator;
         } else {
           DirListingMetadata meta =
-              S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider);
+              S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider,
+                  allowAuthoritative);
           if (meta != null) {
             tombstones = meta.listTombstones();
           } else {
@@ -4070,13 +4072,13 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
               final String key = maybeAddTrailingSlash(pathToKey(path));
               final Listing.FileStatusAcceptor acceptor =
                   new Listing.AcceptAllButSelfAndS3nDirs(path);
+              boolean allowAuthoritative = allowAuthoritative(f);
               DirListingMetadata meta =
                   S3Guard.listChildrenWithTtl(metadataStore, path,
-                      ttlTimeProvider);
+                      ttlTimeProvider, allowAuthoritative);
               final RemoteIterator<S3AFileStatus> cachedFileStatusIterator =
                   listing.createProvidedFileStatusIterator(
                       S3Guard.dirMetaToStatuses(meta), filter, acceptor);
-              boolean allowAuthoritative = allowAuthoritative(f);
               return (allowAuthoritative && meta != null
                   && meta.isAuthoritative())
                   ? listing.createLocatedFileStatusIterator(

+ 4 - 23
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java

@@ -805,15 +805,9 @@ public class DynamoDBMetadataStore implements MetadataStore,
           // get a null in DDBPathMetadata.
           DDBPathMetadata dirPathMeta = get(path);
 
-          // Filter expired entries.
           final DirListingMetadata dirListing =
               getDirListingMetadataFromDirMetaAndList(path, metas,
                   dirPathMeta);
-          if(dirListing != null) {
-            dirListing.removeExpiredEntriesFromListing(
-                ttlTimeProvider.getMetadataTtl(),
-                ttlTimeProvider.getNow());
-          }
           return dirListing;
         });
   }
@@ -1001,7 +995,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
     if (!newDirs.isEmpty()) {
       // patch up the time.
       patchLastUpdated(newDirs, ttlTimeProvider);
-      innerPut(newDirs, operationState, ttlTimeProvider);
+      innerPut(newDirs, operationState);
     }
   }
 
@@ -1244,7 +1238,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
   public void put(
       final Collection<? extends PathMetadata> metas,
       @Nullable final BulkOperationState operationState) throws IOException {
-    innerPut(pathMetaToDDBPathMeta(metas), operationState, ttlTimeProvider);
+    innerPut(pathMetaToDDBPathMeta(metas), operationState);
   }
 
   /**
@@ -1258,15 +1252,13 @@ public class DynamoDBMetadataStore implements MetadataStore,
    * create entries in the table without parents.
    * @param metas metadata entries to write.
    * @param operationState (nullable) operational state for a bulk update
-   * @param ttlTp The time provider for metadata expiry
    * @throws IOException failure.
    */
   @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
   @Retries.RetryTranslated
   private void innerPut(
       final Collection<DDBPathMetadata> metas,
-      @Nullable final BulkOperationState operationState,
-      final ITtlTimeProvider ttlTp) throws IOException {
+      @Nullable final BulkOperationState operationState) throws IOException {
     if (metas.isEmpty()) {
       // Happens when someone calls put() with an empty list.
       LOG.debug("Ignoring empty list of entries to put");
@@ -1641,7 +1633,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
     try {
       LOG.debug("innerPut on metas: {}", metas);
       if (!metas.isEmpty()) {
-        innerPut(metas, state, ttlTimeProvider);
+        innerPut(metas, state);
       }
     } catch (IOException e) {
       String msg = String.format("IOException while setting false "
@@ -2003,17 +1995,6 @@ public class DynamoDBMetadataStore implements MetadataStore,
     this.ttlTimeProvider = ttlTimeProvider;
   }
 
-  /**
-   * Extract a time provider from the argument or fall back to the
-   * one in the constructor.
-   * @param ttlTp nullable time source passed in as an argument.
-   * @return a non-null time source.
-   */
-  private ITtlTimeProvider extractTimeProvider(
-      @Nullable ITtlTimeProvider ttlTp) {
-    return ttlTp != null ? ttlTp : this.ttlTimeProvider;
-  }
-
   /**
    * Username.
    * @return the current username

+ 0 - 4
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java

@@ -213,10 +213,6 @@ public class LocalMetadataStore implements MetadataStore {
     }
 
     if (listing != null) {
-      listing.removeExpiredEntriesFromListing(
-          ttlTimeProvider.getMetadataTtl(), ttlTimeProvider.getNow());
-      LOG.debug("listChildren [after removing expired entries] ({}) -> {}",
-          path, listing.prettyPrint());
       // Make a copy so callers can mutate without affecting our state
       return new DirListingMetadata(listing);
     }

+ 30 - 7
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java

@@ -712,17 +712,22 @@ public final class S3Guard {
 
   /**
    * Get a path entry provided it is not considered expired.
+   * If the allowAuthoritative flag is true, return without
+   * checking for TTL expiry.
    * @param ms metastore
    * @param path path to look up.
    * @param timeProvider nullable time provider
    * @param needEmptyDirectoryFlag if true, implementation will
    * return known state of directory emptiness.
+   * @param allowAuthoritative if this flag is true, the ttl won't apply to the
+   * metadata - so it will be returned regardless of it's expiry.
    * @return the metadata or null if there as no entry.
    * @throws IOException failure.
    */
   public static PathMetadata getWithTtl(MetadataStore ms, Path path,
       @Nullable ITtlTimeProvider timeProvider,
-      final boolean needEmptyDirectoryFlag) throws IOException {
+      final boolean needEmptyDirectoryFlag,
+      final boolean allowAuthoritative) throws IOException {
     final PathMetadata pathMetadata = ms.get(path, needEmptyDirectoryFlag);
     // if timeProvider is null let's return with what the ms has
     if (timeProvider == null) {
@@ -730,6 +735,12 @@ public final class S3Guard {
       return pathMetadata;
     }
 
+    // authoritative mode is enabled for this directory, return what the ms has
+    if (allowAuthoritative) {
+      LOG.debug("allowAuthoritative is true, returning pathMetadata as is");
+      return pathMetadata;
+    }
+
     long ttl = timeProvider.getMetadataTtl();
 
     if (pathMetadata != null) {
@@ -755,15 +766,21 @@ public final class S3Guard {
 
   /**
    * List children; mark the result as non-auth if the TTL has expired.
+   * If the allowAuthoritative flag is true, return without filtering or
+   * checking for TTL expiry.
    * @param ms metastore
    * @param path path to look up.
    * @param timeProvider nullable time provider
+   * @param allowAuthoritative if this flag is true, the ttl won't apply to the
+   * metadata - so it will be returned regardless of it's expiry.
    * @return the listing of entries under a path, or null if there as no entry.
    * @throws IOException failure.
    */
-  @Retries.RetryTranslated
+  @RetryTranslated
   public static DirListingMetadata listChildrenWithTtl(MetadataStore ms,
-      Path path, @Nullable ITtlTimeProvider timeProvider)
+      Path path,
+      @Nullable ITtlTimeProvider timeProvider,
+      boolean allowAuthoritative)
       throws IOException {
     DirListingMetadata dlm = ms.listChildren(path);
 
@@ -772,12 +789,18 @@ public final class S3Guard {
       return dlm;
     }
 
-    long ttl = timeProvider.getMetadataTtl();
+    if (allowAuthoritative) {
+      LOG.debug("allowAuthoritative is true, returning pathMetadata as is");
+      return dlm;
+    }
 
-    if (dlm != null && dlm.isAuthoritative()
-        && dlm.isExpired(ttl, timeProvider.getNow())) {
-      dlm.setAuthoritative(false);
+    // filter expired entries
+    if (dlm != null) {
+      dlm.removeExpiredEntriesFromListing(
+          timeProvider.getMetadataTtl(),
+          timeProvider.getNow());
     }
+
     return dlm;
   }
 

+ 10 - 4
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md

@@ -135,6 +135,8 @@ two different reasons:
     * All interactions with the S3 bucket(s) must be through S3A clients sharing
     the same metadata store.
     * This is independent from which metadata store implementation is used.
+    * In authoritative mode the metadata TTL metadata expiry is not effective.
+    This means that the metadata entries won't expire on authoritative paths.
 
 * Authoritative directory listings (isAuthoritative bit)
     * Tells if the stored directory listing metadata is complete.
@@ -193,10 +195,14 @@ In particular: **If the Metadata Store is declared as authoritative,
 all interactions with the S3 bucket(s) must be through S3A clients sharing
 the same Metadata Store**
 
-It can be configured how long a directory listing in the MetadataStore is
-considered as authoritative. If `((lastUpdated + ttl) <= now)` is false, the
-directory  listing is no longer considered authoritative, so the flag will be
-removed on `S3AFileSystem` level.
+#### TTL metadata expiry
+
+It can be configured how long an entry is valid in the MetadataStore
+**if the authoritative mode is turned off**, or the path is not
+configured to be authoritative.
+If `((lastUpdated + ttl) <= now)` is false for an entry, the entry will
+be expired, so the S3 bucket will be queried for fresh metadata.
+The time for expiry of metadata can be set as the following:
 
 ```xml
 <property>

+ 48 - 15
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardOutOfBandOperations.java

@@ -365,16 +365,25 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
           testTimeProvider.getNow(), testTimeProvider.getMetadataTtl());
 
       // READ GUARDED
-      String newRead = readBytesToString(guardedFs, testFilePath,
-          newText.length());
+      // This should fail in authoritative mode since we trust the metadatastore
+      // despite of the expiry. The metadata will not expire.
+      if (authoritative) {
+        intercept(FileNotFoundException.class, testFilePath.toString(),
+            "File should not be present in the metedatastore in authoritative mode.",
+            () -> readBytesToString(guardedFs, testFilePath, newText.length()));
+      } else {
+        String newRead = readBytesToString(guardedFs, testFilePath,
+            newText.length());
+
+        // CHECK LISTING - THE FILE SHOULD BE THERE, TOMBSTONE EXPIRED
+        checkListingContainsPath(guardedFs, testFilePath);
 
-      // CHECK LISTING - THE FILE SHOULD BE THERE, TOMBSTONE EXPIRED
-      checkListingContainsPath(guardedFs, testFilePath);
+        // we can assert that the originalText is the new one, which created raw
+        LOG.info("Old: {}, New: {}, Read: {}", originalText, newText, newRead);
+        assertEquals("The text should be modified with a new.", newText,
+            newRead);
+      }
 
-      // we can assert that the originalText is the new one, which created raw
-      LOG.info("Old: {}, New: {}, Read: {}", originalText, newText, newRead);
-      assertEquals("The text should be modified with a new.", newText,
-          newRead);
     } finally {
       guardedFs.delete(testFilePath, true);
       guardedFs.setTtlTimeProvider(originalTimeProvider);
@@ -448,8 +457,16 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
       // SET TIME SO METADATA EXPIRES
       when(mockTimeProvider.getNow()).thenReturn(110L);
 
-      // WRITE TO DELETED DIRECTORY - SUCCESS
-      createNonRecursive(guardedFs, filePath);
+      // WRITE TO DELETED DIRECTORY
+      // - FAIL ON AUTH = TRUE
+      // - SUCCESS ON AUTH = FALSE
+      if (authoritative) {
+        intercept(FileNotFoundException.class, filePath.getParent().toString(),
+            "Parent does not exist, so in authoritative mode this should fail.",
+            () -> createNonRecursive(guardedFs, filePath));
+      } else {
+        createNonRecursive(guardedFs, filePath);
+      }
 
     } finally {
       guardedFs.delete(filePath, true);
@@ -546,13 +563,24 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
       when(mockTimeProvider.getNow()).thenReturn(100L + 2 * ttl);
 
       // DELETE IN GUARDED FS
+      // NOTE: in auth this will be ineffective:
+      //  we already have the tombstone marker on the item, it won't expire,
+      //  so we don't delete the raw S3 file.
       guardedFs.delete(filePath, true);
 
       // FILE MUST NOT EXIST IN RAW
-      intercept(FileNotFoundException.class, filePath.toString(),
-          "This file should throw FNFE when reading through "
-              + "the raw fs, and the guarded fs deleted the file.",
-          () -> rawFS.getFileStatus(filePath));
+      // If authoritative, the file status can be retrieved raw:
+      //    deleting with guarded FS won't do anything because the tombstone
+      //    marker won't expire in auth mode.
+      // If not authoritative, we go to the S3 bucket and get an FNFE
+      if (authoritative) {
+        rawFS.getFileStatus(filePath);
+      } else {
+        intercept(FileNotFoundException.class, filePath.toString(),
+            "This file should throw FNFE when reading through "
+                + "the raw fs, and the guarded fs deleted the file.",
+            () -> rawFS.getFileStatus(filePath));
+      }
 
     } finally {
       guardedFs.delete(filePath, true);
@@ -592,8 +620,13 @@ public class ITestS3GuardOutOfBandOperations extends AbstractS3ATestBase {
       checkListingDoesNotContainPath(guardedFs, testFile);
 
       // the tombstone is expired, so we should detect the file
+      // in non-authoritative mode
       when(mockTimeProvider.getNow()).thenReturn(100 + ttl);
-      checkListingContainsPath(guardedFs, testFile);
+      if (authoritative) {
+        checkListingDoesNotContainPath(guardedFs, testFile);
+      } else {
+        checkListingContainsPath(guardedFs, testFile);
+      }
     } finally {
       // cleanup
       guardedFs.delete(base, true);

+ 42 - 17
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardTtl.java

@@ -59,7 +59,7 @@ public class ITestS3GuardTtl extends AbstractS3ATestBase {
    * Test array for parameterized test runs.
    * @return a list of parameter tuples.
    */
-  @Parameterized.Parameters
+  @Parameterized.Parameters(name = "auth={0}")
   public static Collection<Object[]> params() {
     return Arrays.asList(new Object[][]{
         {true}, {false}
@@ -133,21 +133,30 @@ public class ITestS3GuardTtl extends AbstractS3ATestBase {
 
       // get an authoritative listing in ms
       fs.listStatus(dir);
+
       // check if authoritative
       DirListingMetadata dirListing =
-          S3Guard.listChildrenWithTtl(ms, dir, mockTimeProvider);
+          S3Guard.listChildrenWithTtl(ms, dir, mockTimeProvider, authoritative);
       assertTrue("Listing should be authoritative.",
           dirListing.isAuthoritative());
       // change the time, and assume it's not authoritative anymore
+      // if the metadatastore is not authoritative.
       when(mockTimeProvider.getNow()).thenReturn(102L);
-      dirListing = S3Guard.listChildrenWithTtl(ms, dir, mockTimeProvider);
-      assertFalse("Listing should not be authoritative.",
-          dirListing.isAuthoritative());
+      dirListing = S3Guard.listChildrenWithTtl(ms, dir, mockTimeProvider,
+          authoritative);
+      if (authoritative) {
+        assertTrue("Listing should be authoritative.",
+            dirListing.isAuthoritative());
+      } else {
+        assertFalse("Listing should not be authoritative.",
+            dirListing.isAuthoritative());
+      }
 
       // get an authoritative listing in ms again - retain test
       fs.listStatus(dir);
       // check if authoritative
-      dirListing = S3Guard.listChildrenWithTtl(ms, dir, mockTimeProvider);
+      dirListing = S3Guard.listChildrenWithTtl(ms, dir, mockTimeProvider,
+          authoritative);
       assertTrue("Listing shoud be authoritative after listStatus.",
           dirListing.isAuthoritative());
     } finally {
@@ -189,16 +198,24 @@ public class ITestS3GuardTtl extends AbstractS3ATestBase {
       when(mockTimeProvider.getNow()).thenReturn(110L);
 
       // metadata is expired so this should refresh the metadata with
-      // last_updated to the getNow()
+      // last_updated to the getNow() if the store is not authoritative
       final FileStatus fileExpire1Status = fs.getFileStatus(fileExpire1);
       assertNotNull(fileExpire1Status);
-      assertEquals(110L, ms.get(fileExpire1).getLastUpdated());
+      if (authoritative) {
+        assertEquals(100L, ms.get(fileExpire1).getLastUpdated());
+      } else {
+        assertEquals(110L, ms.get(fileExpire1).getLastUpdated());
+      }
 
       // metadata is expired so this should refresh the metadata with
-      // last_updated to the getNow()
+      // last_updated to the getNow() if the store is not authoritative
       final FileStatus fileExpire2Status = fs.getFileStatus(fileExpire2);
       assertNotNull(fileExpire2Status);
-      assertEquals(110L, ms.get(fileExpire2).getLastUpdated());
+      if (authoritative) {
+        assertEquals(101L, ms.get(fileExpire2).getLastUpdated());
+      } else {
+        assertEquals(110L, ms.get(fileExpire2).getLastUpdated());
+      }
 
       final FileStatus fileRetainStatus = fs.getFileStatus(fileRetain);
       assertEquals("Modification time of these files should be equal.",
@@ -347,17 +364,25 @@ public class ITestS3GuardTtl extends AbstractS3ATestBase {
           .hasSize(11)
           .contains(tombstonedPath);
 
-      // listing will be filtered, and won't contain the tombstone with oldtime
+      // listing will be filtered if the store is not authoritative,
+      // and won't contain the tombstone with oldtime
       when(mockTimeProvider.getNow()).thenReturn(newTime);
-      final DirListingMetadata filteredDLM = getDirListingMetadata(ms,
-          baseDirPath);
+      final DirListingMetadata filteredDLM = S3Guard.listChildrenWithTtl(
+          ms, baseDirPath, mockTimeProvider, authoritative);
       containedPaths = filteredDLM.getListing().stream()
           .map(pm -> pm.getFileStatus().getPath())
           .collect(Collectors.toList());
-      Assertions.assertThat(containedPaths)
-          .describedAs("Full listing of path %s", baseDirPath)
-          .hasSize(10)
-          .doesNotContain(tombstonedPath);
+      if (authoritative) {
+        Assertions.assertThat(containedPaths)
+            .describedAs("Full listing of path %s", baseDirPath)
+            .hasSize(11)
+            .contains(tombstonedPath);
+      } else {
+        Assertions.assertThat(containedPaths)
+            .describedAs("Full listing of path %s", baseDirPath)
+            .hasSize(10)
+            .doesNotContain(tombstonedPath);
+      }
     } finally {
       fs.delete(baseDirPath, true);
       fs.setTtlTimeProvider(originalTimeProvider);

+ 3 - 3
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java

@@ -168,7 +168,7 @@ public class TestS3Guard extends Assert {
 
     // act
     final PathMetadata pmExpired = S3Guard.getWithTtl(ms, path, timeProvider,
-        false);
+        false, false);
 
     // assert
     assertNull(pmExpired);
@@ -193,7 +193,7 @@ public class TestS3Guard extends Assert {
 
     // act
     final PathMetadata pmNotExpired =
-        S3Guard.getWithTtl(ms, path, timeProvider, false);
+        S3Guard.getWithTtl(ms, path, timeProvider, false, false);
 
     // assert
     assertNotNull(pmNotExpired);
@@ -220,7 +220,7 @@ public class TestS3Guard extends Assert {
 
     // act
     final PathMetadata pmExpired = S3Guard.getWithTtl(ms, path, timeProvider,
-        false);
+        false, false);
 
     // assert
     assertNotNull(pmExpired);