Browse Source

HADOOP-15621 S3Guard: Implement time-based (TTL) expiry for Authoritative Directory Listing. Contributed by Gabor Bota

Aaron Fabbri 6 years ago
parent
commit
046b8768af

+ 10 - 0
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -1368,6 +1368,16 @@
     </description>
     </description>
 </property>
 </property>
 
 
+<property>
+    <name>fs.s3a.metadatastore.authoritative.dir.ttl</name>
+    <value>3600000</value>
+    <description>
+        This value sets how long a directory listing in the MS is considered as
+        authoritative. The value is in milliseconds.
+        MetadataStore should be authoritative to use this configuration knob.
+    </description>
+</property>
+
 <property>
 <property>
     <name>fs.s3a.metadatastore.impl</name>
     <name>fs.s3a.metadatastore.impl</name>
     <value>org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore</value>
     <value>org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore</value>

+ 10 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

@@ -21,6 +21,8 @@ package org.apache.hadoop.fs.s3a;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.InterfaceStability;
 
 
+import java.util.concurrent.TimeUnit;
+
 /**
 /**
  * All the constants used with the {@link S3AFileSystem}.
  * All the constants used with the {@link S3AFileSystem}.
  *
  *
@@ -327,6 +329,14 @@ public final class Constants {
       "fs.s3a.metadatastore.authoritative";
       "fs.s3a.metadatastore.authoritative";
   public static final boolean DEFAULT_METADATASTORE_AUTHORITATIVE = false;
   public static final boolean DEFAULT_METADATASTORE_AUTHORITATIVE = false;
 
 
+  /**
+   * How long a directory listing in the MS is considered as authoritative.
+   */
+  public static final String METADATASTORE_AUTHORITATIVE_DIR_TTL =
+      "fs.s3a.metadatastore.authoritative.dir.ttl";
+  public static final long DEFAULT_METADATASTORE_AUTHORITATIVE_DIR_TTL =
+      TimeUnit.MINUTES.toMillis(60);
+
   /** read ahead buffer size to prevent connection re-establishments. */
   /** read ahead buffer size to prevent connection re-establishments. */
   public static final String READAHEAD_RANGE = "fs.s3a.readahead.range";
   public static final String READAHEAD_RANGE = "fs.s3a.readahead.range";
   public static final long DEFAULT_READAHEAD_RANGE = 64 * 1024;
   public static final long DEFAULT_READAHEAD_RANGE = 64 * 1024;

+ 25 - 5
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -205,6 +205,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
 
 
   private AWSCredentialProviderList credentials;
   private AWSCredentialProviderList credentials;
 
 
+  private S3Guard.ITtlTimeProvider ttlTimeProvider;
+
   /** Add any deprecated keys. */
   /** Add any deprecated keys. */
   @SuppressWarnings("deprecation")
   @SuppressWarnings("deprecation")
   private static void addDeprecatedKeys() {
   private static void addDeprecatedKeys() {
@@ -345,6 +347,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
             getMetadataStore(), allowAuthoritative);
             getMetadataStore(), allowAuthoritative);
       }
       }
       initMultipartUploads(conf);
       initMultipartUploads(conf);
+      long authDirTtl = conf.getLong(METADATASTORE_AUTHORITATIVE_DIR_TTL,
+          DEFAULT_METADATASTORE_AUTHORITATIVE_DIR_TTL);
+      ttlTimeProvider = new S3Guard.TtlTimeProvider(authDirTtl);
     } catch (AmazonClientException e) {
     } catch (AmazonClientException e) {
       throw translateException("initializing ", new Path(name), e);
       throw translateException("initializing ", new Path(name), e);
     }
     }
@@ -1907,7 +1912,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
         key = key + '/';
         key = key + '/';
       }
       }
 
 
-      DirListingMetadata dirMeta = metadataStore.listChildren(path);
+      DirListingMetadata dirMeta =
+          S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider);
       if (allowAuthoritative && dirMeta != null && dirMeta.isAuthoritative()) {
       if (allowAuthoritative && dirMeta != null && dirMeta.isAuthoritative()) {
         return S3Guard.dirMetaToStatuses(dirMeta);
         return S3Guard.dirMetaToStatuses(dirMeta);
       }
       }
@@ -1925,7 +1931,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
         result.add(files.next());
         result.add(files.next());
       }
       }
       return S3Guard.dirListingUnion(metadataStore, path, result, dirMeta,
       return S3Guard.dirListingUnion(metadataStore, path, result, dirMeta,
-          allowAuthoritative);
+          allowAuthoritative, ttlTimeProvider);
     } else {
     } else {
       LOG.debug("Adding: rd (not a dir): {}", path);
       LOG.debug("Adding: rd (not a dir): {}", path);
       FileStatus[] stats = new FileStatus[1];
       FileStatus[] stats = new FileStatus[1];
@@ -2135,7 +2141,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
           // We have a definitive true / false from MetadataStore, we are done.
           // We have a definitive true / false from MetadataStore, we are done.
           return S3AFileStatus.fromFileStatus(msStatus, pm.isEmptyDirectory());
           return S3AFileStatus.fromFileStatus(msStatus, pm.isEmptyDirectory());
         } else {
         } else {
-          DirListingMetadata children = metadataStore.listChildren(path);
+          DirListingMetadata children =
+              S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider);
           if (children != null) {
           if (children != null) {
             tombstones = children.listTombstones();
             tombstones = children.listTombstones();
           }
           }
@@ -3122,7 +3129,8 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
           tombstones = metadataStoreListFilesIterator.listTombstones();
           tombstones = metadataStoreListFilesIterator.listTombstones();
           cachedFilesIterator = metadataStoreListFilesIterator;
           cachedFilesIterator = metadataStoreListFilesIterator;
         } else {
         } else {
-          DirListingMetadata meta = metadataStore.listChildren(path);
+          DirListingMetadata meta =
+              S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider);
           if (meta != null) {
           if (meta != null) {
             tombstones = meta.listTombstones();
             tombstones = meta.listTombstones();
           } else {
           } else {
@@ -3195,7 +3203,9 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
             final String key = maybeAddTrailingSlash(pathToKey(path));
             final String key = maybeAddTrailingSlash(pathToKey(path));
             final Listing.FileStatusAcceptor acceptor =
             final Listing.FileStatusAcceptor acceptor =
                 new Listing.AcceptAllButSelfAndS3nDirs(path);
                 new Listing.AcceptAllButSelfAndS3nDirs(path);
-            DirListingMetadata meta = metadataStore.listChildren(path);
+            DirListingMetadata meta =
+                S3Guard.listChildrenWithTtl(metadataStore, path,
+                    ttlTimeProvider);
             final RemoteIterator<FileStatus> cachedFileStatusIterator =
             final RemoteIterator<FileStatus> cachedFileStatusIterator =
                 listing.createProvidedFileStatusIterator(
                 listing.createProvidedFileStatusIterator(
                     S3Guard.dirMetaToStatuses(meta), filter, acceptor);
                     S3Guard.dirMetaToStatuses(meta), filter, acceptor);
@@ -3346,4 +3356,14 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities {
     LOG.debug("Sharing credentials for: {}", purpose);
     LOG.debug("Sharing credentials for: {}", purpose);
     return credentials.share();
     return credentials.share();
   }
   }
+
+  @VisibleForTesting
+  protected S3Guard.ITtlTimeProvider getTtlTimeProvider() {
+    return ttlTimeProvider;
+  }
+
+  @VisibleForTesting
+  protected void setTtlTimeProvider(S3Guard.ITtlTimeProvider ttlTimeProvider) {
+    this.ttlTimeProvider = ttlTimeProvider;
+  }
 }
 }

+ 10 - 6
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DDBPathMetadata.java

@@ -30,14 +30,10 @@ public class DDBPathMetadata extends PathMetadata {
 
 
   private boolean isAuthoritativeDir;
   private boolean isAuthoritativeDir;
 
 
-  public DDBPathMetadata(PathMetadata pmd, boolean isAuthoritativeDir) {
-    super(pmd.getFileStatus(), pmd.isEmptyDirectory(), pmd.isDeleted());
-    this.isAuthoritativeDir = isAuthoritativeDir;
-  }
-
   public DDBPathMetadata(PathMetadata pmd) {
   public DDBPathMetadata(PathMetadata pmd) {
     super(pmd.getFileStatus(), pmd.isEmptyDirectory(), pmd.isDeleted());
     super(pmd.getFileStatus(), pmd.isEmptyDirectory(), pmd.isDeleted());
     this.isAuthoritativeDir = false;
     this.isAuthoritativeDir = false;
+    this.setLastUpdated(pmd.getLastUpdated());
   }
   }
 
 
   public DDBPathMetadata(FileStatus fileStatus) {
   public DDBPathMetadata(FileStatus fileStatus) {
@@ -52,9 +48,10 @@ public class DDBPathMetadata extends PathMetadata {
   }
   }
 
 
   public DDBPathMetadata(FileStatus fileStatus, Tristate isEmptyDir,
   public DDBPathMetadata(FileStatus fileStatus, Tristate isEmptyDir,
-      boolean isDeleted, boolean isAuthoritativeDir) {
+      boolean isDeleted, boolean isAuthoritativeDir, long lastUpdated) {
     super(fileStatus, isEmptyDir, isDeleted);
     super(fileStatus, isEmptyDir, isDeleted);
     this.isAuthoritativeDir = isAuthoritativeDir;
     this.isAuthoritativeDir = isAuthoritativeDir;
+    this.setLastUpdated(lastUpdated);
   }
   }
 
 
   public boolean isAuthoritativeDir() {
   public boolean isAuthoritativeDir() {
@@ -74,4 +71,11 @@ public class DDBPathMetadata extends PathMetadata {
     return super.hashCode();
     return super.hashCode();
   }
   }
 
 
+  @Override public String toString() {
+    return "DDBPathMetadata{" +
+        "isAuthoritativeDir=" + isAuthoritativeDir +
+        ", lastUpdated=" + this.getLastUpdated() +
+        ", PathMetadata=" + super.toString() +
+        '}';
+  }
 }
 }

+ 12 - 3
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java

@@ -42,7 +42,7 @@ import org.apache.hadoop.fs.s3a.Tristate;
  */
  */
 @InterfaceAudience.Private
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 @InterfaceStability.Evolving
-public class DirListingMetadata {
+public class DirListingMetadata extends ExpirableMetadata {
 
 
   /**
   /**
    * Convenience parameter for passing into constructor.
    * Convenience parameter for passing into constructor.
@@ -69,7 +69,7 @@ public class DirListingMetadata {
    *     the full and authoritative listing of all files in the directory.
    *     the full and authoritative listing of all files in the directory.
    */
    */
   public DirListingMetadata(Path path, Collection<PathMetadata> listing,
   public DirListingMetadata(Path path, Collection<PathMetadata> listing,
-      boolean isAuthoritative) {
+      boolean isAuthoritative, long lastUpdated) {
 
 
     checkPathAbsolute(path);
     checkPathAbsolute(path);
     this.path = path;
     this.path = path;
@@ -82,6 +82,12 @@ public class DirListingMetadata {
       }
       }
     }
     }
     this.isAuthoritative  = isAuthoritative;
     this.isAuthoritative  = isAuthoritative;
+    this.setLastUpdated(lastUpdated);
+  }
+
+  public DirListingMetadata(Path path, Collection<PathMetadata> listing,
+      boolean isAuthoritative) {
+    this(path, listing, isAuthoritative, 0);
   }
   }
 
 
   /**
   /**
@@ -91,6 +97,7 @@ public class DirListingMetadata {
   public DirListingMetadata(DirListingMetadata d) {
   public DirListingMetadata(DirListingMetadata d) {
     path = d.path;
     path = d.path;
     isAuthoritative = d.isAuthoritative;
     isAuthoritative = d.isAuthoritative;
+    this.setLastUpdated(d.getLastUpdated());
     listMap = new ConcurrentHashMap<>(d.listMap);
     listMap = new ConcurrentHashMap<>(d.listMap);
   }
   }
 
 
@@ -125,7 +132,8 @@ public class DirListingMetadata {
         filteredList.add(meta);
         filteredList.add(meta);
       }
       }
     }
     }
-    return new DirListingMetadata(path, filteredList, isAuthoritative);
+    return new DirListingMetadata(path, filteredList, isAuthoritative,
+        this.getLastUpdated());
   }
   }
 
 
   /**
   /**
@@ -231,6 +239,7 @@ public class DirListingMetadata {
         "path=" + path +
         "path=" + path +
         ", listMap=" + listMap +
         ", listMap=" + listMap +
         ", isAuthoritative=" + isAuthoritative +
         ", isAuthoritative=" + isAuthoritative +
+        ", lastUpdated=" + this.getLastUpdated() +
         '}';
         '}';
   }
   }
 
 

+ 4 - 3
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java

@@ -632,7 +632,8 @@ public class DynamoDBMetadataStore implements MetadataStore {
 
 
           return (metas.isEmpty() && dirPathMeta == null)
           return (metas.isEmpty() && dirPathMeta == null)
               ? null
               ? null
-              : new DirListingMetadata(path, metas, isAuthoritative);
+              : new DirListingMetadata(path, metas, isAuthoritative,
+              dirPathMeta.getLastUpdated());
         });
         });
   }
   }
 
 
@@ -864,7 +865,7 @@ public class DynamoDBMetadataStore implements MetadataStore {
       if (!itemExists(item)) {
       if (!itemExists(item)) {
         final FileStatus status = makeDirStatus(path, username);
         final FileStatus status = makeDirStatus(path, username);
         metasToPut.add(new DDBPathMetadata(status, Tristate.FALSE, false,
         metasToPut.add(new DDBPathMetadata(status, Tristate.FALSE, false,
-            meta.isAuthoritativeDir()));
+            meta.isAuthoritativeDir(), meta.getLastUpdated()));
         path = path.getParent();
         path = path.getParent();
       } else {
       } else {
         break;
         break;
@@ -907,7 +908,7 @@ public class DynamoDBMetadataStore implements MetadataStore {
     Path path = meta.getPath();
     Path path = meta.getPath();
     DDBPathMetadata ddbPathMeta =
     DDBPathMetadata ddbPathMeta =
         new DDBPathMetadata(makeDirStatus(path, username), meta.isEmpty(),
         new DDBPathMetadata(makeDirStatus(path, username), meta.isEmpty(),
-            false, meta.isAuthoritative());
+            false, meta.isAuthoritative(), meta.getLastUpdated());
 
 
     // First add any missing ancestors...
     // First add any missing ancestors...
     final Collection<DDBPathMetadata> metasToPut = fullPathsToPut(ddbPathMeta);
     final Collection<DDBPathMetadata> metasToPut = fullPathsToPut(ddbPathMeta);

+ 1 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadata.java

@@ -31,7 +31,7 @@ import org.apache.hadoop.fs.s3a.Tristate;
  */
  */
 @InterfaceAudience.Private
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 @InterfaceStability.Evolving
-public class PathMetadata {
+public class PathMetadata extends ExpirableMetadata {
 
 
   private final FileStatus fileStatus;
   private final FileStatus fileStatus;
   private Tristate isEmptyDirectory;
   private Tristate isEmptyDirectory;

+ 25 - 36
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java

@@ -22,7 +22,9 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URI;
 import java.util.Arrays;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.List;
+import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
 
 
 import com.amazonaws.services.dynamodbv2.document.Item;
 import com.amazonaws.services.dynamodbv2.document.Item;
@@ -67,6 +69,11 @@ final class PathMetadataDynamoDBTranslation {
   static final String BLOCK_SIZE = "block_size";
   static final String BLOCK_SIZE = "block_size";
   static final String IS_DELETED = "is_deleted";
   static final String IS_DELETED = "is_deleted";
   static final String IS_AUTHORITATIVE = "is_authoritative";
   static final String IS_AUTHORITATIVE = "is_authoritative";
+  static final String LAST_UPDATED = "last_updated";
+
+  /** Used while testing backward compatibility. */
+  @VisibleForTesting
+  static final Set<String> IGNORED_FIELDS = new HashSet<>();
 
 
   /** Table version field {@value} in version marker item. */
   /** Table version field {@value} in version marker item. */
   @VisibleForTesting
   @VisibleForTesting
@@ -107,23 +114,7 @@ final class PathMetadataDynamoDBTranslation {
    * @param item DynamoDB item to convert
    * @param item DynamoDB item to convert
    * @return {@code item} converted to a {@link DDBPathMetadata}
    * @return {@code item} converted to a {@link DDBPathMetadata}
    */
    */
-  static DDBPathMetadata itemToPathMetadata(Item item, String username)
-      throws IOException {
-    return itemToPathMetadata(item, username, false);
-  }
-
-  /**
-   * Converts a DynamoDB item to a {@link DDBPathMetadata}.
-   * Can ignore {@code IS_AUTHORITATIVE} flag if {@code ignoreIsAuthFlag} is
-   * true.
-   *
-   * @param item DynamoDB item to convert
-   * @param ignoreIsAuthFlag if true, ignore the authoritative flag on item
-   * @return {@code item} converted to a {@link DDBPathMetadata}
-   */
-  static DDBPathMetadata itemToPathMetadata(Item item, String username,
-      boolean ignoreIsAuthFlag)
-      throws IOException {
+  static DDBPathMetadata itemToPathMetadata(Item item, String username) {
     if (item == null) {
     if (item == null) {
       return null;
       return null;
     }
     }
@@ -145,11 +136,11 @@ final class PathMetadataDynamoDBTranslation {
     boolean isDir = item.hasAttribute(IS_DIR) && item.getBoolean(IS_DIR);
     boolean isDir = item.hasAttribute(IS_DIR) && item.getBoolean(IS_DIR);
     boolean isAuthoritativeDir = false;
     boolean isAuthoritativeDir = false;
     final FileStatus fileStatus;
     final FileStatus fileStatus;
+    long lastUpdated = 0;
     if (isDir) {
     if (isDir) {
-      if (!ignoreIsAuthFlag) {
-        isAuthoritativeDir = item.hasAttribute(IS_AUTHORITATIVE)
-            && item.getBoolean(IS_AUTHORITATIVE);
-      }
+      isAuthoritativeDir = !IGNORED_FIELDS.contains(IS_AUTHORITATIVE)
+          && item.hasAttribute(IS_AUTHORITATIVE)
+          && item.getBoolean(IS_AUTHORITATIVE);
       fileStatus = DynamoDBMetadataStore.makeDirStatus(path, username);
       fileStatus = DynamoDBMetadataStore.makeDirStatus(path, username);
     } else {
     } else {
       long len = item.hasAttribute(FILE_LENGTH) ? item.getLong(FILE_LENGTH) : 0;
       long len = item.hasAttribute(FILE_LENGTH) ? item.getLong(FILE_LENGTH) : 0;
@@ -158,21 +149,16 @@ final class PathMetadataDynamoDBTranslation {
       fileStatus = new FileStatus(len, false, 1, block, modTime, 0, null,
       fileStatus = new FileStatus(len, false, 1, block, modTime, 0, null,
           username, username, path);
           username, username, path);
     }
     }
+    lastUpdated =
+        !IGNORED_FIELDS.contains(LAST_UPDATED)
+            && item.hasAttribute(LAST_UPDATED)
+            ? item.getLong(LAST_UPDATED) : 0;
+
     boolean isDeleted =
     boolean isDeleted =
         item.hasAttribute(IS_DELETED) && item.getBoolean(IS_DELETED);
         item.hasAttribute(IS_DELETED) && item.getBoolean(IS_DELETED);
 
 
     return new DDBPathMetadata(fileStatus, Tristate.UNKNOWN, isDeleted,
     return new DDBPathMetadata(fileStatus, Tristate.UNKNOWN, isDeleted,
-        isAuthoritativeDir);
-  }
-
-  /**
-   * Converts a {@link DDBPathMetadata} to a DynamoDB item.
-   *
-   * @param meta {@link DDBPathMetadata} to convert
-   * @return {@code meta} converted to DynamoDB item
-   */
-  static Item pathMetadataToItem(DDBPathMetadata meta) {
-    return pathMetadataToItem(meta, false);
+        isAuthoritativeDir, lastUpdated);
   }
   }
 
 
   /**
   /**
@@ -182,17 +168,15 @@ final class PathMetadataDynamoDBTranslation {
    * true.
    * true.
    *
    *
    * @param meta {@link DDBPathMetadata} to convert
    * @param meta {@link DDBPathMetadata} to convert
-   * @param ignoreIsAuthFlag if true, ignore the authoritative flag on item
    * @return {@code meta} converted to DynamoDB item
    * @return {@code meta} converted to DynamoDB item
    */
    */
-  static Item pathMetadataToItem(DDBPathMetadata meta,
-      boolean ignoreIsAuthFlag) {
+  static Item pathMetadataToItem(DDBPathMetadata meta) {
     Preconditions.checkNotNull(meta);
     Preconditions.checkNotNull(meta);
     final FileStatus status = meta.getFileStatus();
     final FileStatus status = meta.getFileStatus();
     final Item item = new Item().withPrimaryKey(pathToKey(status.getPath()));
     final Item item = new Item().withPrimaryKey(pathToKey(status.getPath()));
     if (status.isDirectory()) {
     if (status.isDirectory()) {
       item.withBoolean(IS_DIR, true);
       item.withBoolean(IS_DIR, true);
-      if (!ignoreIsAuthFlag) {
+      if (!IGNORED_FIELDS.contains(IS_AUTHORITATIVE)) {
         item.withBoolean(IS_AUTHORITATIVE, meta.isAuthoritativeDir());
         item.withBoolean(IS_AUTHORITATIVE, meta.isAuthoritativeDir());
       }
       }
     } else {
     } else {
@@ -201,6 +185,11 @@ final class PathMetadataDynamoDBTranslation {
           .withLong(BLOCK_SIZE, status.getBlockSize());
           .withLong(BLOCK_SIZE, status.getBlockSize());
     }
     }
     item.withBoolean(IS_DELETED, meta.isDeleted());
     item.withBoolean(IS_DELETED, meta.isDeleted());
+
+    if(!IGNORED_FIELDS.contains(LAST_UPDATED)) {
+      item.withLong(LAST_UPDATED, meta.getLastUpdated());
+    }
+
     return item;
     return item;
   }
   }
 
 

+ 57 - 4
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java

@@ -194,7 +194,8 @@ public final class S3Guard {
    */
    */
   public static FileStatus[] dirListingUnion(MetadataStore ms, Path path,
   public static FileStatus[] dirListingUnion(MetadataStore ms, Path path,
       List<FileStatus> backingStatuses, DirListingMetadata dirMeta,
       List<FileStatus> backingStatuses, DirListingMetadata dirMeta,
-      boolean isAuthoritative) throws IOException {
+      boolean isAuthoritative, ITtlTimeProvider timeProvider)
+      throws IOException {
 
 
     // Fast-path for NullMetadataStore
     // Fast-path for NullMetadataStore
     if (isNullMetadataStore(ms)) {
     if (isNullMetadataStore(ms)) {
@@ -241,7 +242,7 @@ public final class S3Guard {
 
 
     if (changed && isAuthoritative) {
     if (changed && isAuthoritative) {
       dirMeta.setAuthoritative(true); // This is the full directory contents
       dirMeta.setAuthoritative(true); // This is the full directory contents
-      ms.put(dirMeta);
+      S3Guard.putWithTtl(ms, dirMeta, timeProvider);
     }
     }
 
 
     return dirMetaToStatuses(dirMeta);
     return dirMetaToStatuses(dirMeta);
@@ -282,7 +283,7 @@ public final class S3Guard {
   @Deprecated
   @Deprecated
   @Retries.OnceExceptionsSwallowed
   @Retries.OnceExceptionsSwallowed
   public static void makeDirsOrdered(MetadataStore ms, List<Path> dirs,
   public static void makeDirsOrdered(MetadataStore ms, List<Path> dirs,
-      String owner, boolean authoritative) {
+      String owner, boolean authoritative, ITtlTimeProvider timeProvider) {
     if (dirs == null) {
     if (dirs == null) {
       return;
       return;
     }
     }
@@ -326,7 +327,7 @@ public final class S3Guard {
             children.add(new PathMetadata(prevStatus));
             children.add(new PathMetadata(prevStatus));
           }
           }
           dirMeta = new DirListingMetadata(f, children, authoritative);
           dirMeta = new DirListingMetadata(f, children, authoritative);
-          ms.put(dirMeta);
+          S3Guard.putWithTtl(ms, dirMeta, timeProvider);
         }
         }
 
 
         pathMetas.add(new PathMetadata(status));
         pathMetas.add(new PathMetadata(status));
@@ -487,4 +488,56 @@ public final class S3Guard {
       assertQualified(path);
       assertQualified(path);
     }
     }
   }
   }
+
+  /**
+   * This interface is defined for testing purposes.
+   * TTL can be tested by implementing this interface and setting is as
+   * {@code S3Guard.ttlTimeProvider}. By doing this, getNow() can return any
+   * value preferred and flaky tests could be avoided.
+   */
+  public interface ITtlTimeProvider {
+    long getNow();
+    long getAuthoritativeDirTtl();
+  }
+
+  /**
+   * Runtime implementation for TTL Time Provider interface.
+   */
+  public static class TtlTimeProvider implements ITtlTimeProvider {
+    private long authoritativeDirTtl;
+
+    public TtlTimeProvider(long authoritativeDirTtl) {
+      this.authoritativeDirTtl = authoritativeDirTtl;
+    }
+
+    @Override
+    public long getNow() {
+      return System.currentTimeMillis();
+    }
+
+    @Override public long getAuthoritativeDirTtl() {
+      return authoritativeDirTtl;
+    }
+  }
+
+  public static void putWithTtl(MetadataStore ms, DirListingMetadata dirMeta,
+      ITtlTimeProvider timeProvider)
+      throws IOException {
+    dirMeta.setLastUpdated(timeProvider.getNow());
+    ms.put(dirMeta);
+  }
+
+  public static DirListingMetadata listChildrenWithTtl(MetadataStore ms,
+      Path path, ITtlTimeProvider timeProvider)
+      throws IOException {
+    long ttl = timeProvider.getAuthoritativeDirTtl();
+
+    DirListingMetadata dlm = ms.listChildren(path);
+
+    if(dlm != null && dlm.isAuthoritative()
+        && dlm.isExpired(ttl, timeProvider.getNow())) {
+      dlm.setAuthoritative(false);
+    }
+    return dlm;
+  }
 }
 }

+ 11 - 0
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/s3guard.md

@@ -165,6 +165,17 @@ In particular: **If the Metadata Store is declared as authoritative,
 all interactions with the S3 bucket(s) must be through S3A clients sharing
 all interactions with the S3 bucket(s) must be through S3A clients sharing
 the same Metadata Store**
 the same Metadata Store**
 
 
+It can be configured how long a directory listing in the MetadataStore is
+considered as authoritative. If `((lastUpdated + ttl) <= now)` is false, the
+directory  listing is no longer considered authoritative, so the flag will be
+removed on `S3AFileSystem` level.
+
+```xml
+<property>
+    <name>fs.s3a.metadatastore.authoritative.dir.ttl</name>
+    <value>3600000</value>
+</property>
+```
 
 
 ### 3. Configure the Metadata Store.
 ### 3. Configure the Metadata Store.
 
 

+ 13 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java

@@ -31,6 +31,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 import org.apache.hadoop.fs.s3a.commit.CommitConstants;
 
 
+import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
+import org.apache.hadoop.fs.s3a.s3guard.MetadataStoreCapabilities;
 import org.hamcrest.core.Is;
 import org.hamcrest.core.Is;
 import org.junit.Assert;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Assume;
@@ -46,6 +48,7 @@ import java.net.URISyntaxException;
 import java.text.DateFormat;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
 import java.text.SimpleDateFormat;
 import java.util.List;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Callable;
 
 
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
@@ -906,4 +909,14 @@ public final class S3ATestUtils {
         .contains(providerClassname);
         .contains(providerClassname);
   }
   }
 
 
+  public static boolean metadataStorePersistsAuthoritativeBit(MetadataStore ms)
+      throws IOException {
+    Map<String, String> diags = ms.getDiagnostics();
+    String persists =
+        diags.get(MetadataStoreCapabilities.PERSISTS_AUTHORITATIVE_BIT);
+    if(persists == null){
+      return false;
+    }
+    return Boolean.valueOf(persists);
+  }
 }
 }

+ 5 - 11
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java

@@ -24,7 +24,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.Set;
-import java.util.Map;
 
 
 import com.google.common.collect.Sets;
 import com.google.common.collect.Sets;
 import org.junit.After;
 import org.junit.After;
@@ -44,6 +43,9 @@ import org.apache.hadoop.fs.s3a.Tristate;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.HadoopTestBase;
 import org.apache.hadoop.test.HadoopTestBase;
 
 
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.isMetadataStoreAuthoritative;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.metadataStorePersistsAuthoritativeBit;
+
 /**
 /**
  * Main test class for MetadataStore implementations.
  * Main test class for MetadataStore implementations.
  * Implementations should each create a test by subclassing this and
  * Implementations should each create a test by subclassing this and
@@ -511,21 +513,13 @@ public abstract class MetadataStoreTestBase extends HadoopTestBase {
     }
     }
   }
   }
 
 
-  private boolean isMetadataStoreAuthoritative() throws IOException {
-    Map<String, String> diags = ms.getDiagnostics();
-    String isAuth =
-        diags.get(MetadataStoreCapabilities.PERSISTS_AUTHORITATIVE_BIT);
-    if(isAuth == null){
-      return false;
-    }
-    return Boolean.valueOf(isAuth);
-  }
+
 
 
   @Test
   @Test
   public void testListChildrenAuthoritative() throws IOException {
   public void testListChildrenAuthoritative() throws IOException {
     Assume.assumeTrue("MetadataStore should be capable for authoritative "
     Assume.assumeTrue("MetadataStore should be capable for authoritative "
         + "storage of directories to run this test.",
         + "storage of directories to run this test.",
-        isMetadataStoreAuthoritative());
+        metadataStorePersistsAuthoritativeBit(ms));
 
 
     setupListStatus();
     setupListStatus();
 
 

+ 46 - 2
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestPathMetadataDynamoDBTranslation.java

@@ -29,6 +29,7 @@ import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
 import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
 import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
 import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
 import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Preconditions;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Rule;
@@ -114,6 +115,11 @@ public class TestPathMetadataDynamoDBTranslation extends Assert {
     }
     }
   }
   }
 
 
+  @After
+  public void tearDown() {
+    PathMetadataDynamoDBTranslation.IGNORED_FIELDS.clear();
+  }
+
   @Test
   @Test
   public void testAttributeDefinitions() {
   public void testAttributeDefinitions() {
     final Collection<AttributeDefinition> attrs =
     final Collection<AttributeDefinition> attrs =
@@ -248,10 +254,11 @@ public class TestPathMetadataDynamoDBTranslation extends Assert {
       throws Exception {
       throws Exception {
     Item item = Mockito.spy(TEST_DIR_ITEM);
     Item item = Mockito.spy(TEST_DIR_ITEM);
     item.withBoolean(IS_AUTHORITATIVE, true);
     item.withBoolean(IS_AUTHORITATIVE, true);
+    PathMetadataDynamoDBTranslation.IGNORED_FIELDS.add(IS_AUTHORITATIVE);
 
 
     final String user =
     final String user =
         UserGroupInformation.getCurrentUser().getShortUserName();
         UserGroupInformation.getCurrentUser().getShortUserName();
-    DDBPathMetadata meta = itemToPathMetadata(item, user, true);
+    DDBPathMetadata meta = itemToPathMetadata(item, user);
 
 
     Mockito.verify(item, Mockito.never()).getBoolean(IS_AUTHORITATIVE);
     Mockito.verify(item, Mockito.never()).getBoolean(IS_AUTHORITATIVE);
     assertFalse(meta.isAuthoritativeDir());
     assertFalse(meta.isAuthoritativeDir());
@@ -265,11 +272,48 @@ public class TestPathMetadataDynamoDBTranslation extends Assert {
   public void testIsAuthoritativeCompatibilityPathMetadataToItem() {
   public void testIsAuthoritativeCompatibilityPathMetadataToItem() {
     DDBPathMetadata meta = Mockito.spy(testFilePathMetadata);
     DDBPathMetadata meta = Mockito.spy(testFilePathMetadata);
     meta.setAuthoritativeDir(true);
     meta.setAuthoritativeDir(true);
+    PathMetadataDynamoDBTranslation.IGNORED_FIELDS.add(IS_AUTHORITATIVE);
 
 
-    Item item = pathMetadataToItem(meta, true);
+    Item item = pathMetadataToItem(meta);
 
 
     Mockito.verify(meta, never()).isAuthoritativeDir();
     Mockito.verify(meta, never()).isAuthoritativeDir();
     assertFalse(item.hasAttribute(IS_AUTHORITATIVE));
     assertFalse(item.hasAttribute(IS_AUTHORITATIVE));
   }
   }
 
 
+
+  /**
+   * Test when translating an {@link Item} to {@link DDBPathMetadata} works
+   * if {@code LAST_UPDATED} flag is ignored.
+   */
+  @Test
+  public void testIsLastUpdatedCompatibilityItemToPathMetadata()
+      throws Exception {
+    Item item = Mockito.spy(TEST_DIR_ITEM);
+    item.withLong(LAST_UPDATED, 100);
+    PathMetadataDynamoDBTranslation.IGNORED_FIELDS.add(LAST_UPDATED);
+
+    final String user =
+        UserGroupInformation.getCurrentUser().getShortUserName();
+    DDBPathMetadata meta = itemToPathMetadata(item, user);
+
+    Mockito.verify(item, Mockito.never()).getLong(LAST_UPDATED);
+    assertFalse(meta.isAuthoritativeDir());
+  }
+
+  /**
+   * Test when translating an {@link DDBPathMetadata} to {@link Item} works
+   * if {@code LAST_UPDATED} flag is ignored.
+   */
+  @Test
+  public void testIsLastUpdatedCompatibilityPathMetadataToItem() {
+    DDBPathMetadata meta = Mockito.spy(testFilePathMetadata);
+    meta.setLastUpdated(100);
+    PathMetadataDynamoDBTranslation.IGNORED_FIELDS.add(LAST_UPDATED);
+
+    Item item = pathMetadataToItem(meta);
+
+    Mockito.verify(meta, never()).getLastUpdated();
+    assertFalse(item.hasAttribute(LAST_UPDATED));
+  }
+
 }
 }

+ 5 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestS3Guard.java

@@ -27,6 +27,8 @@ import org.junit.Test;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.Path;
 
 
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_AUTHORITATIVE_DIR_TTL;
+
 /**
 /**
  * Tests for the {@link S3Guard} utility class.
  * Tests for the {@link S3Guard} utility class.
  */
  */
@@ -54,8 +56,10 @@ public class TestS3Guard extends Assert {
         makeFileStatus("s3a://bucket/dir/s3-file4", false)
         makeFileStatus("s3a://bucket/dir/s3-file4", false)
     );
     );
 
 
+    S3Guard.ITtlTimeProvider timeProvider = new S3Guard.TtlTimeProvider(
+        DEFAULT_METADATASTORE_AUTHORITATIVE_DIR_TTL);
     FileStatus[] result = S3Guard.dirListingUnion(ms, dirPath, s3Listing,
     FileStatus[] result = S3Guard.dirListingUnion(ms, dirPath, s3Listing,
-        dirMeta, false);
+        dirMeta, false, timeProvider);
 
 
     assertEquals("listing length", 4, result.length);
     assertEquals("listing length", 4, result.length);
     assertContainsPath(result, "s3a://bucket/dir/ms-file1");
     assertContainsPath(result, "s3a://bucket/dir/ms-file1");