
HADOOP-13914. S3guard: Improve S3AFileStatus#isEmptyDirectory handling. Contributed by Aaron Fabbri

Mingliang Liu 8 years ago
Parent
Commit
28319eb72c
27 changed files with 551 additions and 189 deletions
  1. + 12 - 2
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemContractBaseTest.java
  2. + 6 - 0
      hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
  3. + 1 - 1
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java
  4. + 44 - 6
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
  5. + 92 - 29
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
  6. + 1 - 1
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
  7. + 32 - 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Tristate.java
  8. + 21 - 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java
  9. + 47 - 10
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java
  10. + 29 - 83
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java
  11. + 15 - 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java
  12. + 6 - 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java
  13. + 24 - 2
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadata.java
  14. + 3 - 3
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java
  15. + 6 - 5
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java
  16. + 12 - 16
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java
  17. + 2 - 1
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java
  18. + 2 - 1
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ACredentialsInURL.java
  19. + 13 - 8
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEmptyDirectory.java
  20. + 9 - 7
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java
  21. + 8 - 1
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java
  22. + 83 - 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardEmptyDirs.java
  23. + 63 - 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java
  24. + 10 - 2
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java
  25. + 2 - 2
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestPathMetadataDynamoDBTranslation.java
  26. + 6 - 7
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java
  27. + 2 - 2
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java

+ 12 - 2
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemContractBaseTest.java

@@ -54,12 +54,22 @@ public abstract class FileSystemContractBaseTest extends TestCase {
 
   @Override
   protected void tearDown() throws Exception {
+    if (fs != null) {
+      // some cases use this absolute path
+      cleanupDir(path("/test"));
+      // others use this relative path
+      cleanupDir(path("test"));
+    }
+  }
+
+  private void cleanupDir(Path p) {
     try {
       if (fs != null) {
-        fs.delete(path("/test"), true);
+        LOG.info("Deleting " + p);
+        fs.delete(p, true);
       }
     } catch (IOException e) {
-      LOG.error("Error deleting /test: " + e, e);
+      LOG.error("Error deleting test dir: " + p, e);
     }
   }
   

+ 6 - 0
hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml

@@ -26,4 +26,10 @@
   <Match>
     <Class name="org.apache.hadoop.fs.s3.INode" />
   </Match>
+  <!-- Redundant null check makes code clearer, future-proof here. -->
+  <Match>
+    <Class name="org.apache.hadoop.fs.s3a.S3AFileSystem" />
+    <Method name="s3Exists" />
+    <Bug pattern="RCN_REDUNDANT_NULLCHECK_OF_NONNULL_VALUE" />
+  </Match>
 </FindBugsFilter>

+ 1 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java

@@ -312,7 +312,7 @@ public class Listing {
       for (String prefix : objects.getCommonPrefixes()) {
         Path keyPath = owner.keyToQualifiedPath(prefix);
         if (acceptor.accept(keyPath, prefix) && filter.accept(keyPath)) {
-          FileStatus status = new S3AFileStatus(false, keyPath,
+          FileStatus status = new S3AFileStatus(Tristate.FALSE, keyPath,
               owner.getUsername());
           LOG.debug("Adding directory: {}", status);
           added++;

+ 44 - 6
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java

@@ -31,7 +31,7 @@ import org.apache.hadoop.fs.Path;
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class S3AFileStatus extends FileStatus {
-  private boolean isEmptyDirectory;
+  private Tristate isEmptyDirectory;
 
   /**
    * Create a directory status.
@@ -42,6 +42,18 @@ public class S3AFileStatus extends FileStatus {
   public S3AFileStatus(boolean isemptydir,
       Path path,
       String owner) {
+    this(Tristate.fromBool(isemptydir), path, owner);
+  }
+
+  /**
+   * Create a directory status.
+   * @param isemptydir is this an empty directory?
+   * @param path the path
+   * @param owner the owner
+   */
+  public S3AFileStatus(Tristate isemptydir,
+      Path path,
+      String owner) {
     super(0, true, 1, 0, 0, path);
     isEmptyDirectory = isemptydir;
     setOwner(owner);
@@ -59,12 +71,37 @@ public class S3AFileStatus extends FileStatus {
   public S3AFileStatus(long length, long modification_time, Path path,
       long blockSize, String owner) {
     super(length, false, 1, blockSize, modification_time, path);
-    isEmptyDirectory = false;
+    isEmptyDirectory = Tristate.FALSE;
     setOwner(owner);
     setGroup(owner);
   }
 
-  public boolean isEmptyDirectory() {
+  /**
+   * Convenience constructor for creating from a vanilla FileStatus plus
+   * an isEmptyDirectory flag.
+   * @param source FileStatus to convert to S3AFileStatus
+   * @param isEmptyDirectory TRUE/FALSE if known to be / not be an empty
+   *     directory, UNKNOWN if that information was not computed.
+   * @return a new S3AFileStatus
+   */
+  public static S3AFileStatus fromFileStatus(FileStatus source,
+      Tristate isEmptyDirectory) {
+    if (source.isDirectory()) {
+      return new S3AFileStatus(isEmptyDirectory, source.getPath(),
+          source.getOwner());
+    } else {
+      return new S3AFileStatus(source.getLen(), source.getModificationTime(),
+          source.getPath(), source.getBlockSize(), source.getOwner());
+    }
+  }
+
+
+  /**
+   * @return FALSE if status is not a directory, or it is a dir but known
+   * not to be empty.  TRUE if it is an empty directory.  UNKNOWN if it is a
+   * directory, but we have not computed whether or not it is empty.
+   */
+  public Tristate isEmptyDirectory() {
     return isEmptyDirectory;
   }
 
@@ -72,9 +109,10 @@ public class S3AFileStatus extends FileStatus {
    * Should not be called by clients.  Only used so {@link org.apache.hadoop
    * .fs.s3a.s3guard.MetadataStore} can maintain this flag when caching
    * FileStatuses on behalf of s3a.
-   * @param value true iff empty
+   * @param value for directories: TRUE / FALSE if known empty/not-empty,
+   *              UNKNOWN otherwise
    */
-  public void setIsEmptyDirectory(boolean value) {
+  public void setIsEmptyDirectory(Tristate value) {
     isEmptyDirectory = value;
   }
 
@@ -120,7 +158,7 @@ public class S3AFileStatus extends FileStatus {
   @Override
   public String toString() {
     return super.toString() +
-        String.format(" isEmptyDirectory=%s", isEmptyDirectory());
+        String.format(" isEmptyDirectory=%s", isEmptyDirectory().name());
   }
 
 }
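
As a quick illustration of the new API surface (a hedged sketch, not part of this patch; the helper class and the incoming FileStatus are made up), a directory status can now carry an explicit "unknown" emptiness value, and callers compare against a Tristate rather than a boolean:

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.s3a.S3AFileStatus;
    import org.apache.hadoop.fs.s3a.Tristate;

    class EmptyDirExample {                                   // hypothetical helper
      static boolean knownEmpty(FileStatus raw) {
        // Wrap a plain FileStatus whose emptiness was never computed.
        S3AFileStatus st = S3AFileStatus.fromFileStatus(raw, Tristate.UNKNOWN);
        // Only an explicit TRUE means the directory is known to be empty.
        return st.isEmptyDirectory() == Tristate.TRUE;
      }
    }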

+ 92 - 29
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -603,7 +603,7 @@ public class S3AFileSystem extends FileSystem {
       boolean overwrite, int bufferSize, short replication, long blockSize,
       Progressable progress) throws IOException {
     String key = pathToKey(f);
-    S3AFileStatus status = null;
+    FileStatus status = null;
     try {
       // get the status or throw an FNFE
       status = getFileStatus(f);
@@ -762,7 +762,7 @@ public class S3AFileSystem extends FileSystem {
 
     // get the source file status; this raises a FNFE if there is no source
     // file.
-    S3AFileStatus srcStatus = getFileStatus(src);
+    S3AFileStatus srcStatus = innerGetFileStatus(src, true);
 
     if (srcKey.equals(dstKey)) {
       LOG.debug("rename: src and dest refer to the same file or directory: {}",
@@ -774,7 +774,7 @@ public class S3AFileSystem extends FileSystem {
 
     S3AFileStatus dstStatus = null;
     try {
-      dstStatus = getFileStatus(dst);
+      dstStatus = innerGetFileStatus(dst, true);
       // if there is no destination entry, an exception is raised.
       // hence this code sequence can assume that there is something
       // at the end of the path; the only detail being what it is and
@@ -784,7 +784,7 @@ public class S3AFileSystem extends FileSystem {
           throw new RenameFailedException(src, dst,
               "source is a directory and dest is a file")
               .withExitCode(srcStatus.isFile());
-        } else if (!dstStatus.isEmptyDirectory()) {
+        } else if (dstStatus.isEmptyDirectory() != Tristate.TRUE) {
           throw new RenameFailedException(src, dst,
               "Destination is a non-empty directory")
               .withExitCode(false);
@@ -806,7 +806,8 @@ public class S3AFileSystem extends FileSystem {
       Path parent = dst.getParent();
       if (!pathToKey(parent).isEmpty()) {
         try {
-          S3AFileStatus dstParentStatus = getFileStatus(dst.getParent());
+          S3AFileStatus dstParentStatus = innerGetFileStatus(dst.getParent(),
+              false);
           if (!dstParentStatus.isDirectory()) {
             throw new RenameFailedException(src, dst,
                 "destination parent is not a directory");
@@ -869,7 +870,7 @@ public class S3AFileSystem extends FileSystem {
       }
 
       List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>();
-      if (dstStatus != null && dstStatus.isEmptyDirectory()) {
+      if (dstStatus != null && dstStatus.isEmptyDirectory() == Tristate.TRUE) {
         // delete unnecessary fake directory.
         keysToDelete.add(new DeleteObjectsRequest.KeyVersion(dstKey));
       }
@@ -949,6 +950,17 @@ public class S3AFileSystem extends FileSystem {
     return !S3Guard.isNullMetadataStore(metadataStore);
   }
 
+  @VisibleForTesting
+  MetadataStore getMetadataStore() {
+    return metadataStore;
+  }
+
+  /** For testing only.  See ITestS3GuardEmptyDirs. */
+  @VisibleForTesting
+  void setMetadataStore(MetadataStore ms) {
+    metadataStore = ms;
+  }
+
   /**
    * Increment a statistic by 1.
    * @param statistic The operation to increment
@@ -1338,7 +1350,7 @@ public class S3AFileSystem extends FileSystem {
    */
   public boolean delete(Path f, boolean recursive) throws IOException {
     try {
-      return innerDelete(getFileStatus(f), recursive);
+      return innerDelete(innerGetFileStatus(f, true), recursive);
     } catch (FileNotFoundException e) {
       LOG.debug("Couldn't delete {} - does not exist", f);
       instrumentation.errorIgnored();
@@ -1368,6 +1380,9 @@ public class S3AFileSystem extends FileSystem {
 
     if (status.isDirectory()) {
       LOG.debug("delete: Path is a directory: {}", f);
+      Preconditions.checkArgument(
+          status.isEmptyDirectory() != Tristate.UNKNOWN,
+          "File status must have directory emptiness computed");
 
       if (!key.endsWith("/")) {
         key = key + "/";
@@ -1377,11 +1392,11 @@ public class S3AFileSystem extends FileSystem {
         return rejectRootDirectoryDelete(status, recursive);
       }
 
-      if (!recursive && !status.isEmptyDirectory()) {
+      if (!recursive && status.isEmptyDirectory() == Tristate.FALSE) {
         throw new PathIsNotEmptyDirectoryException(f.toString());
       }
 
-      if (status.isEmptyDirectory()) {
+      if (status.isEmptyDirectory() == Tristate.TRUE) {
         LOG.debug("Deleting fake empty directory {}", key);
         // HADOOP-13761 s3guard: retries here
         deleteObject(key);
@@ -1446,7 +1461,7 @@ public class S3AFileSystem extends FileSystem {
   private boolean rejectRootDirectoryDelete(S3AFileStatus status,
       boolean recursive) throws IOException {
     LOG.info("s3a delete the {} root directory of {}", bucket, recursive);
-    boolean emptyRoot = status.isEmptyDirectory();
+    boolean emptyRoot = status.isEmptyDirectory() == Tristate.TRUE;
     if (emptyRoot) {
       return true;
     }
@@ -1461,7 +1476,7 @@ public class S3AFileSystem extends FileSystem {
   private void createFakeDirectoryIfNecessary(Path f)
       throws IOException, AmazonClientException {
     String key = pathToKey(f);
-    if (!key.isEmpty() && !exists(f)) {
+    if (!key.isEmpty() && !s3Exists(f)) {
       LOG.debug("Creating new fake directory at {}", f);
       createFakeDirectory(key);
     }
@@ -1678,35 +1693,72 @@ public class S3AFileSystem extends FileSystem {
    * @throws java.io.FileNotFoundException when the path does not exist;
    * @throws IOException on other problems.
    */
-  public S3AFileStatus getFileStatus(final Path f) throws IOException {
+  public FileStatus getFileStatus(final Path f) throws IOException {
+    return innerGetFileStatus(f, false);
+  }
+
+  /**
+   * Internal version of getFileStatus().
+   * @param f The path we want information from
+   * @param needEmptyDirectoryFlag if true, implementation will calculate
+   *        a TRUE or FALSE value for {@link S3AFileStatus#isEmptyDirectory()}
+   * @return a S3AFileStatus object
+   * @throws java.io.FileNotFoundException when the path does not exist;
+   * @throws IOException on other problems.
+   */
+  @VisibleForTesting
+  S3AFileStatus innerGetFileStatus(final Path f,
+      boolean needEmptyDirectoryFlag) throws IOException {
     incrementStatistic(INVOCATION_GET_FILE_STATUS);
     final Path path = qualify(f);
     String key = pathToKey(path);
-    LOG.debug("Getting path status for {}  ({})", path , key);
+    LOG.debug("Getting path status for {}  ({})", path, key);
 
     // Check MetadataStore, if any.
-    PathMetadata pm = metadataStore.get(path);
+    PathMetadata pm = metadataStore.get(path, needEmptyDirectoryFlag);
     if (pm != null) {
       // HADOOP-13760: handle deleted files, i.e. PathMetadata#isDeleted() here
-      return (S3AFileStatus)pm.getFileStatus();
+
+      FileStatus msStatus = pm.getFileStatus();
+      if (needEmptyDirectoryFlag && msStatus.isDirectory()) {
+        if (pm.isEmptyDirectory() != Tristate.UNKNOWN) {
+          // We have a definitive true / false from MetadataStore, we are done.
+          return S3AFileStatus.fromFileStatus(msStatus, pm.isEmptyDirectory());
+        } else {
+          LOG.debug("MetadataStore doesn't know if dir is empty, using S3.");
+        }
+      } else {
+        // Either this is not a directory, or we don't care if it is empty
+        return S3AFileStatus.fromFileStatus(msStatus, pm.isEmptyDirectory());
+      }
     }
+    return S3Guard.putAndReturn(metadataStore, s3GetFileStatus(path, key));
+  }
 
+  /**
+   * Raw get file status that only uses S3.  Used to implement
+   * innerGetFileStatus, and for direct management of empty directory blobs.
+   * @param path Qualified path
+   * @param key  Key string for the path
+   * @return Status
+   * @throws IOException
+   */
+  private S3AFileStatus s3GetFileStatus(final Path path, String key)
+      throws IOException {
     if (!key.isEmpty()) {
       try {
         ObjectMetadata meta = getObjectMetadata(key);
 
         if (objectRepresentsDirectory(key, meta.getContentLength())) {
           LOG.debug("Found exact file: fake directory");
-          return S3Guard.putAndReturn(metadataStore,
-              new S3AFileStatus(true, path, username));
+          return new S3AFileStatus(Tristate.TRUE, path, username);
         } else {
           LOG.debug("Found exact file: normal file");
-          return S3Guard.putAndReturn(metadataStore,
-              new S3AFileStatus(meta.getContentLength(),
+          return new S3AFileStatus(meta.getContentLength(),
                   dateToLong(meta.getLastModified()),
                   path,
                   getDefaultBlockSize(path),
-                  username));
+                  username);
         }
       } catch (AmazonServiceException e) {
         if (e.getStatusCode() != 404) {
@@ -1724,18 +1776,16 @@ public class S3AFileSystem extends FileSystem {
 
           if (objectRepresentsDirectory(newKey, meta.getContentLength())) {
             LOG.debug("Found file (with /): fake directory");
-            return S3Guard.putAndReturn(metadataStore,
-                new S3AFileStatus(true, path, username));
+            return new S3AFileStatus(Tristate.TRUE, path, username);
           } else {
             LOG.warn("Found file (with /): real file? should not happen: {}",
                 key);
 
-            return S3Guard.putAndReturn(metadataStore,
-                new S3AFileStatus(meta.getContentLength(),
+            return new S3AFileStatus(meta.getContentLength(),
                     dateToLong(meta.getLastModified()),
                     path,
                     getDefaultBlockSize(path),
-                    username));
+                    username);
           }
         } catch (AmazonServiceException e) {
           if (e.getStatusCode() != 404) {
@@ -1772,12 +1822,10 @@ public class S3AFileSystem extends FileSystem {
           }
         }
 
-        return S3Guard.putAndReturn(metadataStore,
-            new S3AFileStatus(false, path, username));
+        return new S3AFileStatus(Tristate.FALSE, path, username);
       } else if (key.isEmpty()) {
         LOG.debug("Found root directory");
-        return S3Guard.putAndReturn(metadataStore,
-            new S3AFileStatus(true, path, username));
+        return new S3AFileStatus(Tristate.TRUE, path, username);
       }
     } catch (AmazonServiceException e) {
       if (e.getStatusCode() != 404) {
@@ -1791,6 +1839,21 @@ public class S3AFileSystem extends FileSystem {
     throw new FileNotFoundException("No such file or directory: " + path);
   }
 
+  /**
+   * Raw version of {@link FileSystem#exists(Path)} which uses S3 only:
+   * S3Guard MetadataStore, if any, will be skipped.
+   * @return true if path exists in S3
+   */
+  private boolean s3Exists(final Path f) throws IOException {
+    Path path = qualify(f);
+    String key = pathToKey(path);
+    try {
+      return s3GetFileStatus(path, key) != null;
+    } catch (FileNotFoundException e) {
+      return false;
+    }
+  }
+
   /**
    * The src file is on the local disk.  Add it to FS at
    * the given dst name.
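
To make the new split concrete, here is a hedged sketch of the two call paths (fs is assumed to be an initialized S3AFileSystem and path a qualified Path; innerGetFileStatus is package-private, so a real caller sits in org.apache.hadoop.fs.s3a, as the new tests do):

    // Cheap public call: for directories the emptiness flag may be UNKNOWN.
    FileStatus plain = fs.getFileStatus(path);

    // Definitive package-private call: forces a TRUE/FALSE answer for directories,
    // consulting the MetadataStore first and falling back to S3 if it cannot say.
    S3AFileStatus full = fs.innerGetFileStatus(path, true);
    if (full.isDirectory() && full.isEmptyDirectory() == Tristate.TRUE) {
      // e.g. rename/delete can now safely remove the fake directory marker
    }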

+ 1 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java

@@ -306,7 +306,7 @@ public final class S3AUtils {
   private static S3AFileStatus createFileStatus(Path keyPath, boolean isDir,
       long size, Date modified, long blockSize, String owner) {
     if (isDir) {
-      return new S3AFileStatus(true, keyPath, owner);
+      return new S3AFileStatus(Tristate.UNKNOWN, keyPath, owner);
     } else {
       return new S3AFileStatus(size, dateToLong(modified), keyPath, blockSize,
           owner);

+ 32 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Tristate.java

@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+/**
+ * Simple enum to express {true, false, don't know}.
+ */
+public enum Tristate {
+  // Do not add additional values here.  Logic will assume there are exactly
+  // three possibilities.
+  TRUE, FALSE, UNKNOWN;
+
+  public static Tristate fromBool(boolean v) {
+    return v ? TRUE : FALSE;
+  }
+}
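
A minimal sketch of the intended usage: map a known boolean via fromBool() and treat UNKNOWN as "not computed yet" rather than as false (haveListing and listingIsEmpty are hypothetical locals):

    Tristate empty = haveListing ? Tristate.fromBool(listingIsEmpty) : Tristate.UNKNOWN;
    switch (empty) {
    case TRUE:    /* directory known to be empty */               break;
    case FALSE:   /* directory known to have children */          break;
    case UNKNOWN: /* answer not computed; ask S3 if it matters */ break;
    }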

+ 21 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DirListingMetadata.java

@@ -31,6 +31,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.s3a.Tristate;
 
 /**
  * {@code DirListingMetadata} models a directory listing stored in a
@@ -110,6 +111,26 @@ public class DirListingMetadata {
     return isAuthoritative;
   }
 
+
+  /**
+   * Is the underlying directory known to be empty?
+   * @return FALSE if directory is known to have a child entry, TRUE if
+   * directory is known to be empty, UNKNOWN otherwise.
+   */
+  public Tristate isEmpty() {
+    if (getListing().isEmpty()) {
+      if (isAuthoritative()) {
+        return Tristate.TRUE;
+      } else {
+        // This listing is empty, but may not be full list of underlying dir.
+        return Tristate.UNKNOWN;
+      }
+    } else { // not empty listing
+      // There exists at least one child, dir not empty.
+      return Tristate.FALSE;
+    }
+  }
+
   /**
    * Marks this directory listing as full and authoritative.
    * @param authoritative see {@link #isAuthoritative()}.
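
The three outcomes of isEmpty() follow directly from the listing contents and the authoritative bit; a small sketch of the mapping (path here is an assumed qualified directory path):

    // listing has entries             -> Tristate.FALSE
    // listing empty + authoritative   -> Tristate.TRUE
    // listing empty + unauthoritative -> Tristate.UNKNOWN (entries may be missing)
    DirListingMetadata meta =
        new DirListingMetadata(path, DirListingMetadata.EMPTY_DIR, true);
    assert meta.isEmpty() == Tristate.TRUE;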

+ 47 - 10
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java

@@ -55,7 +55,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.Tristate;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.slf4j.Logger;
@@ -66,7 +68,6 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.s3a.S3AFileStatus;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -316,6 +317,12 @@ public class DynamoDBMetadataStore implements MetadataStore {
 
   @Override
   public PathMetadata get(Path path) throws IOException {
+    return get(path, false);
+  }
+
+  @Override
+  public PathMetadata get(Path path, boolean wantEmptyDirectoryFlag)
+      throws IOException {
     path = checkPath(path);
     LOG.debug("Get from table {} in region {}: {}", tableName, region, path);
 
@@ -323,7 +330,7 @@ public class DynamoDBMetadataStore implements MetadataStore {
       final PathMetadata meta;
       if (path.isRoot()) {
         // Root does not persist in the table
-        meta = new PathMetadata(new S3AFileStatus(true, path, username));
+        meta = new PathMetadata(makeDirStatus(username, path));
       } else {
         final GetItemSpec spec = new GetItemSpec()
             .withPrimaryKey(pathToKey(path))
@@ -334,8 +341,8 @@ public class DynamoDBMetadataStore implements MetadataStore {
             tableName, region, path, meta);
       }
 
-      if (meta != null) {
-        final S3AFileStatus status = (S3AFileStatus) meta.getFileStatus();
+      if (wantEmptyDirectoryFlag && meta != null) {
+        final FileStatus status = meta.getFileStatus();
         // for directory, we query its direct children to determine isEmpty bit
         if (status.isDirectory()) {
           final QuerySpec spec = new QuerySpec()
@@ -343,7 +350,13 @@ public class DynamoDBMetadataStore implements MetadataStore {
               .withConsistentRead(true)
               .withMaxResultSize(1); // limit 1
           final ItemCollection<QueryOutcome> items = table.query(spec);
-          status.setIsEmptyDirectory(!(items.iterator().hasNext()));
+          boolean hasChildren = items.iterator().hasNext();
+          // When this class has support for authoritative
+          // (fully-cached) directory listings, we may also be able to answer
+          // TRUE here.  Until then, we don't know if we have full listing or
+          // not, thus the UNKNOWN here:
+          meta.setIsEmptyDirectory(
+              hasChildren ? Tristate.FALSE : Tristate.UNKNOWN);
         }
       }
 
@@ -353,6 +366,19 @@ public class DynamoDBMetadataStore implements MetadataStore {
     }
   }
 
+  /**
+   * Make a FileStatus object for a directory at given path.  The FileStatus
+   * only contains what S3A needs, and omits mod time since S3A uses its own
+   * implementation which returns current system time.
+   * @param owner  username of owner
+   * @param path   path to dir
+   * @return new FileStatus
+   */
+  private FileStatus makeDirStatus(String owner, Path path) {
+    return new FileStatus(0, true, 1, 0, 0, 0, null,
+            owner, null, path);
+  }
+
   @Override
   public DirListingMetadata listChildren(Path path) throws IOException {
     path = checkPath(path);
@@ -507,8 +533,8 @@ public class DynamoDBMetadataStore implements MetadataStore {
           .withConsistentRead(true); // strictly consistent read
       final Item item = table.getItem(spec);
       if (item == null) {
-        final S3AFileStatus status = new S3AFileStatus(false, path, username);
-        metasToPut.add(new PathMetadata(status));
+        final FileStatus status = makeDirStatus(path, username);
+        metasToPut.add(new PathMetadata(status, Tristate.FALSE));
         path = path.getParent();
       } else {
         break;
@@ -517,14 +543,25 @@ public class DynamoDBMetadataStore implements MetadataStore {
     return metasToPut;
   }
 
+  /** Create a directory FileStatus using current system time as mod time. */
+  static FileStatus makeDirStatus(Path f, String owner) {
+    return  new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
+        null, owner, owner, f);
+  }
+
   @Override
   public void put(DirListingMetadata meta) throws IOException {
     LOG.debug("Saving to table {} in region {}: {}", tableName, region, meta);
 
     // directory path
-    final Collection<PathMetadata> metasToPut = fullPathsToPut(
-        new PathMetadata(new S3AFileStatus(false, meta.getPath(), username)));
-    // all children of the directory
+    PathMetadata p = new PathMetadata(
+        makeDirStatus(meta.getPath(), username),
+        meta.isEmpty());
+
+    // First add any missing ancestors...
+    final Collection<PathMetadata> metasToPut = fullPathsToPut(p);
+
+    // next add all children of the directory
     metasToPut.addAll(meta.getListing());
 
     try {

+ 29 - 83
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/LocalMetadataStore.java

@@ -24,8 +24,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.s3a.S3AFileStatus;
-import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.Tristate;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,7 +63,6 @@ public class LocalMetadataStore implements MetadataStore {
   private LruHashMap<Path, DirListingMetadata> dirHash;
 
   private FileSystem fs;
-  private boolean isS3A;
   /* Null iff this FS does not have an associated URI host. */
   private String uriHost;
 
@@ -72,14 +70,6 @@ public class LocalMetadataStore implements MetadataStore {
   public void initialize(FileSystem fileSystem) throws IOException {
     Preconditions.checkNotNull(fileSystem);
     fs = fileSystem;
-
-    // Hate to take a dependency on S3A, but if the MetadataStore has to
-    // maintain S3AFileStatus#isEmptyDirectory, best to be able to to that
-    // under our own lock.
-    if (fs instanceof S3AFileSystem) {
-      isS3A = true;
-    }
-
     URI fsURI = fs.getUri();
     uriHost = fsURI.getHost();
     if (uriHost != null && uriHost.equals("")) {
@@ -105,7 +95,6 @@ public class LocalMetadataStore implements MetadataStore {
   public String toString() {
     final StringBuilder sb = new StringBuilder(
         "LocalMetadataStore{");
-    sb.append("isS3A=").append(isS3A);
     sb.append(", uriHost='").append(uriHost).append('\'');
     sb.append('}');
     return sb.toString();
@@ -140,10 +129,35 @@ public class LocalMetadataStore implements MetadataStore {
 
   @Override
   public synchronized PathMetadata get(Path p) throws IOException {
+    return get(p, false);
+  }
+
+  @Override
+  public PathMetadata get(Path p, boolean wantEmptyDirectoryFlag)
+      throws IOException {
     Path path = standardize(p);
-    PathMetadata m = fileHash.mruGet(path);
-    LOG.debug("get({}) -> {}", path, m == null ? "null" : m.prettyPrint());
-    return m;
+    synchronized (this) {
+      PathMetadata m = fileHash.mruGet(path);
+
+      if (wantEmptyDirectoryFlag && m != null &&
+          m.getFileStatus().isDirectory()) {
+        m.setIsEmptyDirectory(isEmptyDirectory(p));
+      }
+
+      LOG.debug("get({}) -> {}", path, m == null ? "null" : m.prettyPrint());
+      return m;
+    }
+  }
+
+  /**
+   * Determine if directory is empty.
+   * Call with lock held.
+   * @param p a Path, already filtered through standardize()
+   * @return TRUE / FALSE if known empty / not-empty, UNKNOWN otherwise.
+   */
+  private Tristate isEmptyDirectory(Path p) {
+    DirListingMetadata dirMeta = dirHash.get(p);
+    return dirMeta.isEmpty();
   }
 
   @Override
@@ -240,11 +254,6 @@ public class LocalMetadataStore implements MetadataStore {
           parent = new DirListingMetadata(parentPath,
               DirListingMetadata.EMPTY_DIR, false);
           dirHash.put(parentPath, parent);
-        } else {
-          // S3A-specific logic to maintain S3AFileStatus#isEmptyDirectory()
-          if (isS3A) {
-            setS3AIsEmpty(parentPath, false);
-          }
         }
         parent.put(status);
       }
@@ -360,69 +369,6 @@ public class LocalMetadataStore implements MetadataStore {
       if (dir != null) {
         LOG.debug("removing parent's entry for {} ", path);
         dir.remove(path);
-
-        // S3A-specific logic dealing with S3AFileStatus#isEmptyDirectory()
-        if (isS3A) {
-          if (dir.isAuthoritative() && dir.numEntries() == 0) {
-            setS3AIsEmpty(parent, true);
-          } else if (dir.numEntries() == 0) {
-            // We do not know of any remaining entries in parent directory.
-            // However, we do not have authoritative listing, so there may
-            // still be some entries in the dir.  Since we cannot know the
-            // proper state of the parent S3AFileStatus#isEmptyDirectory, we
-            // will invalidate our entries for it.
-            // Better than deleting entries would be marking them as "missing
-            // metadata".  Deleting them means we lose consistent listing and
-            // ability to retry for eventual consistency for the parent path.
-
-            // TODO implement missing metadata feature
-            invalidateFileStatus(parent);
-          }
-          // else parent directory still has entries in it, isEmptyDirectory
-          // does not change
-        }
-
-      }
-    }
-  }
-
-  /**
-   * Invalidate any stored FileStatus's for path.  Call with lock held.
-   * @param path path to invalidate
-   */
-  private void invalidateFileStatus(Path path) {
-    // TODO implement missing metadata feature
-    fileHash.remove(path);
-    Path parent = path.getParent();
-    if (parent != null) {
-      DirListingMetadata parentListing = dirHash.get(parent);
-      if (parentListing != null) {
-        parentListing.remove(path);
-      }
-    }
-  }
-
-  private void setS3AIsEmpty(Path path, boolean isEmpty) {
-    // Update any file statuses in fileHash
-    PathMetadata meta = fileHash.get(path);
-    if (meta != null) {
-      S3AFileStatus s3aStatus =  (S3AFileStatus)meta.getFileStatus();
-      LOG.debug("Setting S3AFileStatus is empty dir ({}) for key {}, {}",
-          isEmpty, path, s3aStatus.getPath());
-      s3aStatus.setIsEmptyDirectory(isEmpty);
-    }
-    // Update any file statuses in dirHash
-    Path parent = path.getParent();
-    if (parent != null) {
-      DirListingMetadata dirMeta = dirHash.get(parent);
-      if (dirMeta != null) {
-        PathMetadata entry = dirMeta.get(path);
-        if (entry != null) {
-          S3AFileStatus s3aStatus =  (S3AFileStatus)entry.getFileStatus();
-          LOG.debug("Setting S3AFileStatus is empty dir ({}) for key " +
-                  "{}, {}", isEmpty, path, s3aStatus.getPath());
-          s3aStatus.setIsEmptyDirectory(isEmpty);
-        }
       }
     }
   }

+ 15 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStore.java

@@ -83,6 +83,21 @@ public interface MetadataStore extends Closeable {
    */
   PathMetadata get(Path path) throws IOException;
 
+  /**
+   * Gets metadata for a path.  Alternate method that includes a hint
+   * whether or not the MetadataStore should do work to compute the value for
+   * {@link PathMetadata#isEmptyDirectory()}.  Since determining emptiness
+   * may be an expensive operation, this can save wasted work.
+   *
+   * @param path the path to get
+   * @param wantEmptyDirectoryFlag Set to true to give a hint to the
+   *   MetadataStore that it should try to compute the empty directory flag.
+   * @return metadata for {@code path}, {@code null} if not found
+   * @throws IOException if there is an error
+   */
+  PathMetadata get(Path path, boolean wantEmptyDirectoryFlag)
+      throws IOException;
+
   /**
    * Lists metadata for all direct children of a path.
    *
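
From a caller's perspective the new overload behaves as sketched below (a hedged example; ms is an initialized MetadataStore and path a qualified Path):

    PathMetadata pm = ms.get(path, true /* wantEmptyDirectoryFlag */);
    if (pm != null && pm.getFileStatus().isDirectory()
        && pm.isEmptyDirectory() == Tristate.UNKNOWN) {
      // The store could not answer definitively; S3AFileSystem#innerGetFileStatus
      // falls back to listing S3 in exactly this case.
    }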

+ 6 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/NullMetadataStore.java

@@ -61,6 +61,12 @@ public class NullMetadataStore implements MetadataStore {
     return null;
   }
 
+  @Override
+  public PathMetadata get(Path path, boolean wantEmptyDirectoryFlag)
+      throws IOException {
+    return null;
+  }
+
   @Override
   public DirListingMetadata listChildren(Path path) throws IOException {
     return null;

+ 24 - 2
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadata.java

@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.s3a.Tristate;
 
 /**
  * {@code PathMetadata} models path metadata stored in the
@@ -32,18 +33,24 @@ import org.apache.hadoop.fs.FileStatus;
 public class PathMetadata {
 
   private final FileStatus fileStatus;
+  private Tristate isEmptyDirectory;
 
   /**
    * Creates a new {@code PathMetadata} containing given {@code FileStatus}.
    * @param fileStatus file status containing an absolute path.
    */
   public PathMetadata(FileStatus fileStatus) {
+    this(fileStatus, Tristate.UNKNOWN);
+  }
+
+  public PathMetadata(FileStatus fileStatus, Tristate isEmptyDir) {
     Preconditions.checkNotNull(fileStatus, "fileStatus must be non-null");
     Preconditions.checkNotNull(fileStatus.getPath(), "fileStatus path must be" +
         " non-null");
     Preconditions.checkArgument(fileStatus.getPath().isAbsolute(), "path must" +
         " be absolute");
     this.fileStatus = fileStatus;
+    this.isEmptyDirectory = isEmptyDir;
   }
 
   /**
@@ -53,6 +60,19 @@ public class PathMetadata {
     return fileStatus;
   }
 
+  /**
+   * @return Tristate.TRUE if this is known to be an empty directory,
+   * Tristate.FALSE if known not to be empty, and Tristate.UNKNOWN if the
+   * MetadataStore does not have enough information to determine either way.
+   */
+  public Tristate isEmptyDirectory() {
+    return isEmptyDirectory;
+  }
+
+  void setIsEmptyDirectory(Tristate isEmptyDirectory) {
+    this.isEmptyDirectory = isEmptyDirectory;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (!(o instanceof PathMetadata)) {
@@ -70,6 +90,7 @@ public class PathMetadata {
   public String toString() {
     return "PathMetadata{" +
         "fileStatus=" + fileStatus +
+        "; isEmptyDirectory=" + isEmptyDirectory +
         '}';
   }
 
@@ -78,9 +99,10 @@ public class PathMetadata {
    * @param sb target StringBuilder
    */
   public void prettyPrint(StringBuilder sb) {
-    sb.append(String.format("%-5s %-20s %-7d",
+    sb.append(String.format("%-5s %-20s %-7d %s",
         fileStatus.isDirectory() ? "dir" : "file",
-        fileStatus.getPath().toString(), fileStatus.getLen()));
+        fileStatus.getPath().toString(), fileStatus.getLen(),
+        isEmptyDirectory.name()));
     sb.append(fileStatus);
   }
 

+ 3 - 3
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/PathMetadataDynamoDBTranslation.java

@@ -39,7 +39,6 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.Constants;
-import org.apache.hadoop.fs.s3a.S3AFileStatus;
 
 /**
  * Defines methods for translating between domain model objects and their
@@ -126,12 +125,13 @@ final class PathMetadataDynamoDBTranslation {
     boolean isDir = item.hasAttribute(IS_DIR) && item.getBoolean(IS_DIR);
     final FileStatus fileStatus;
     if (isDir) {
-      fileStatus = new S3AFileStatus(true, path, username);
+      fileStatus = DynamoDBMetadataStore.makeDirStatus(path, username);
     } else {
       long len = item.hasAttribute(FILE_LENGTH) ? item.getLong(FILE_LENGTH) : 0;
       long modTime = item.hasAttribute(MOD_TIME) ? item.getLong(MOD_TIME) : 0;
       long block = item.hasAttribute(BLOCK_SIZE) ? item.getLong(BLOCK_SIZE) : 0;
-      fileStatus = new S3AFileStatus(len, modTime, path, block, username);
+      fileStatus = new FileStatus(len, false, 1, block, modTime, 0, null,
+          username, username, path);
     }
 
     return new PathMetadata(fileStatus);

+ 6 - 5
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3Guard.java

@@ -246,13 +246,13 @@ public final class S3Guard {
      *    [/a/b/file0, /a/b/file1, /a/b/file2, /a/b/file3], isAuthoritative =
      *    true
      */
-    S3AFileStatus prevStatus = null;
+    FileStatus prevStatus = null;
     // Iterate from leaf to root
     for (int i = 0; i < dirs.size(); i++) {
       boolean isLeaf = (prevStatus == null);
       Path f = dirs.get(i);
       assertQualified(f);
-      S3AFileStatus status = new S3AFileStatus(isLeaf, f, owner);
+      FileStatus status = S3AUtils.createUploadFileStatus(f, true, 0, 0, owner);
       Collection<PathMetadata> children;
       if (isLeaf) {
         children = DirListingMetadata.EMPTY_DIR;
@@ -291,7 +291,8 @@ public final class S3Guard {
     }
     assertQualified(srcPath);
     assertQualified(dstPath);
-    S3AFileStatus dstStatus = new S3AFileStatus(true, dstPath, owner);
+    FileStatus dstStatus = S3AUtils.createUploadFileStatus(dstPath, true, 0,
+        0, owner);
     addMoveStatus(srcPaths, dstMetas, srcPath, dstStatus);
   }
 
@@ -316,13 +317,13 @@ public final class S3Guard {
     }
     assertQualified(srcPath);
     assertQualified(dstPath);
-    S3AFileStatus dstStatus = S3AUtils.createUploadFileStatus(dstPath, false,
+    FileStatus dstStatus = S3AUtils.createUploadFileStatus(dstPath, false,
         size, blockSize, owner);
     addMoveStatus(srcPaths, dstMetas, srcPath, dstStatus);
   }
 
   private static void addMoveStatus(Collection<Path> srcPaths,
-      Collection<PathMetadata> dstMetas, Path srcPath, S3AFileStatus dstStatus)
+      Collection<PathMetadata> dstMetas, Path srcPath, FileStatus dstStatus)
   {
     srcPaths.add(srcPath);
     dstMetas.add(new PathMetadata(dstStatus));

+ 12 - 16
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/S3GuardTool.java

@@ -424,14 +424,15 @@ public abstract class S3GuardTool extends Configured implements Tool {
      * @param f the file or an empty directory.
      * @throws IOException on I/O errors.
      */
-    private void putParentsIfNotPresent(S3AFileStatus f) throws IOException {
+    private void putParentsIfNotPresent(FileStatus f) throws IOException {
       Preconditions.checkNotNull(f);
       Path parent = f.getPath().getParent();
       while (parent != null) {
         if (dirCache.contains(parent)) {
           return;
         }
-        S3AFileStatus dir = new S3AFileStatus(false, parent, f.getOwner());
+        FileStatus dir = DynamoDBMetadataStore.makeDirStatus(parent,
+            f.getOwner());
         ms.put(new PathMetadata(dir));
         dirCache.add(parent);
         parent = parent.getParent();
@@ -441,21 +442,16 @@ public abstract class S3GuardTool extends Configured implements Tool {
     /**
      * Recursively import every path under path
      */
-    private void importDir(S3AFileStatus status) throws IOException {
+    private void importDir(FileStatus status) throws IOException {
       Preconditions.checkArgument(status.isDirectory());
       RemoteIterator<LocatedFileStatus> it =
           s3a.listFiles(status.getPath(), true);
 
       while (it.hasNext()) {
         LocatedFileStatus located = it.next();
-        S3AFileStatus child;
+        FileStatus child;
         if (located.isDirectory()) {
-          // Note that {@link S3AFileStatus#isEmptyDirectory} is erased in
-          // {@link LocatedFileStatus}, the metadata store impl can choose
-          // how to set isEmptyDir when we import the subfiles after creating
-          // the directory in the metadata store.
-          final boolean isEmptyDir = true;
-          child = new S3AFileStatus(isEmptyDir, located.getPath(),
+          child = DynamoDBMetadataStore.makeDirStatus(located.getPath(),
               located.getOwner());
           dirCache.add(child.getPath());
         } else {
@@ -493,7 +489,7 @@ public abstract class S3GuardTool extends Configured implements Tool {
         filePath = "/";
       }
       Path path = new Path(filePath);
-      S3AFileStatus status = s3a.getFileStatus(path);
+      FileStatus status = s3a.getFileStatus(path);
 
       initMetadataStore(false);
 
@@ -557,7 +553,7 @@ public abstract class S3GuardTool extends Configured implements Tool {
      * @param status the status to print.
      * @return the string of output.
      */
-    private static String formatFileStatus(S3AFileStatus status) {
+    private static String formatFileStatus(FileStatus status) {
       return String.format("%s%s%s",
           status.isDirectory() ? "D" : "F",
           SEP,
@@ -571,8 +567,8 @@ public abstract class S3GuardTool extends Configured implements Tool {
      * @param statusFromS3 file status from S3.
      * @param out output stream.
      */
-    private static void printDiff(S3AFileStatus statusFromMS,
-                                  S3AFileStatus statusFromS3,
+    private static void printDiff(FileStatus statusFromMS,
+                                  FileStatus statusFromS3,
                                   PrintStream out) {
       Preconditions.checkArgument(
           !(statusFromMS == null && statusFromS3 == null));
@@ -599,7 +595,7 @@ public abstract class S3GuardTool extends Configured implements Tool {
      * @param out the output stream to generate diff results.
      * @throws IOException
      */
-    private void compareDir(S3AFileStatus msDir, S3AFileStatus s3Dir,
+    private void compareDir(FileStatus msDir, FileStatus s3Dir,
                             PrintStream out) throws IOException {
       if (msDir == null && s3Dir == null) {
         return;
@@ -656,7 +652,7 @@ public abstract class S3GuardTool extends Configured implements Tool {
      */
     private void compare(Path path, PrintStream out) throws IOException {
       Path qualified = s3a.qualify(path);
-      S3AFileStatus s3Status = null;
+      FileStatus s3Status = null;
       try {
         s3Status = s3a.getFileStatus(qualified);
       } catch (FileNotFoundException e) {

+ 2 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AConfiguration.java

@@ -25,6 +25,7 @@ import com.amazonaws.services.s3.S3ClientOptions;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.reflect.FieldUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3native.S3xLoginHelper;
@@ -483,7 +484,7 @@ public class ITestS3AConfiguration {
       }
     });
     assertEquals("username", alice, fs.getUsername());
-    S3AFileStatus status = fs.getFileStatus(new Path("/"));
+    FileStatus status = fs.getFileStatus(new Path("/"));
     assertEquals("owner in " + status, alice, status.getOwner());
     assertEquals("group in " + status, alice, status.getGroup());
   }

+ 2 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ACredentialsInURL.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.fs.s3a;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.junit.After;
@@ -133,7 +134,7 @@ public class ITestS3ACredentialsInURL extends Assert {
     conf.set(TEST_FS_S3A_NAME, testURI.toString());
     try {
       fs = S3ATestUtils.createTestFileSystem(conf);
-      S3AFileStatus status = fs.getFileStatus(new Path("/"));
+      FileStatus status = fs.getFileStatus(new Path("/"));
       fail("Expected an AccessDeniedException, got " + status);
     } catch (AccessDeniedException e) {
       // expected

+ 13 - 8
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AEmptyDirectory.java

@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.fs.s3a;
 
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.junit.Test;
@@ -40,13 +39,21 @@ public class ITestS3AEmptyDirectory extends AbstractS3ATestBase {
     mkdirs(child);
 
     S3AFileStatus status = getS3AFileStatus(fs, dir);
-    assertFalse("Dir status should not be empty", status.isEmptyDirectory());
+    assertEmptyDirectory(false, status);
 
     // 2. Make testEmptyDir empty
     assertDeleted(child, false);
     status = getS3AFileStatus(fs, dir);
 
-    assertTrue("Dir status should be empty", status.isEmptyDirectory());
+    assertEmptyDirectory(true, status);
+  }
+
+  private static void assertEmptyDirectory(boolean isEmpty, S3AFileStatus s) {
+    String msg = "dir is empty";
+    // Should *not* be Tristate.UNKNOWN since we request a definitive value
+    // in getS3AFileStatus() below
+    Tristate expected = Tristate.fromBool(isEmpty);
+    assertEquals(msg, expected, s.isEmptyDirectory());
   }
 
   @Test
@@ -58,21 +65,19 @@ public class ITestS3AEmptyDirectory extends AbstractS3ATestBase {
     mkdirs(dir);
 
     S3AFileStatus status = getS3AFileStatus(fs, dir);
-    assertTrue("Dir status should be empty", status.isEmptyDirectory());
+    assertEmptyDirectory(true, status);
 
     // 2. Make testEmptyDir non-empty
 
     ContractTestUtils.touch(fs, path("testEmptyDir/file1"));
     status = getS3AFileStatus(fs, dir);
 
-    assertFalse("Dir status should not be empty", status.isEmptyDirectory());
+    assertEmptyDirectory(false, status);
   }
 
   private S3AFileStatus getS3AFileStatus(S3AFileSystem fs, Path p) throws
       IOException {
-    FileStatus s = fs.getFileStatus(p);
-    assertTrue(s instanceof S3AFileStatus);
-    return (S3AFileStatus)s;
+    return fs.innerGetFileStatus(p, true /* want isEmptyDirectory value */);
   }
 
 }

+ 9 - 7
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileOperationCost.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.s3a;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
@@ -62,7 +63,7 @@ public class ITestS3AFileOperationCost extends AbstractS3ATestBase {
     S3AFileSystem fs = getFileSystem();
     touch(fs, simpleFile);
     resetMetricDiffs();
-    S3AFileStatus status = fs.getFileStatus(simpleFile);
+    FileStatus status = fs.getFileStatus(simpleFile);
     assertTrue("not a file: " + status, status.isFile());
     if (!fs.hasMetadataStore()) {
       metadataRequests.assertDiffEquals(1);
@@ -81,8 +82,9 @@ public class ITestS3AFileOperationCost extends AbstractS3ATestBase {
     Path dir = path("empty");
     fs.mkdirs(dir);
     resetMetricDiffs();
-    S3AFileStatus status = fs.getFileStatus(dir);
-    assertTrue("not empty: " + status, status.isEmptyDirectory());
+    S3AFileStatus status = fs.innerGetFileStatus(dir, true);
+    assertTrue("not empty: " + status,
+        status.isEmptyDirectory() == Tristate.TRUE);
 
     if (!fs.hasMetadataStore()) {
       metadataRequests.assertDiffEquals(2);
@@ -97,7 +99,7 @@ public class ITestS3AFileOperationCost extends AbstractS3ATestBase {
     Path path = path("missing");
     resetMetricDiffs();
     try {
-      S3AFileStatus status = fs.getFileStatus(path);
+      FileStatus status = fs.getFileStatus(path);
       fail("Got a status back from a missing file path " + status);
     } catch (FileNotFoundException expected) {
       // expected
@@ -113,7 +115,7 @@ public class ITestS3AFileOperationCost extends AbstractS3ATestBase {
     Path path = path("missingdir/missingpath");
     resetMetricDiffs();
     try {
-      S3AFileStatus status = fs.getFileStatus(path);
+      FileStatus status = fs.getFileStatus(path);
       fail("Got a status back from a missing file path " + status);
     } catch (FileNotFoundException expected) {
       // expected
@@ -131,8 +133,8 @@ public class ITestS3AFileOperationCost extends AbstractS3ATestBase {
     Path simpleFile = new Path(dir, "simple.txt");
     touch(fs, simpleFile);
     resetMetricDiffs();
-    S3AFileStatus status = fs.getFileStatus(dir);
-    if (status.isEmptyDirectory()) {
+    S3AFileStatus status = fs.innerGetFileStatus(dir, true);
+    if (status.isEmptyDirectory() == Tristate.TRUE) {
       // erroneous state
       String fsState = fs.toString();
       fail("FileStatus says directory isempty: " + status

+ 8 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AFileSystemContract.java

@@ -27,6 +27,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystemContractBaseTest;
 import org.apache.hadoop.fs.Path;
 
+import java.io.IOException;
+
 /**
  *  Tests a live S3 system. If your keys and bucket aren't specified, all tests
  *  are marked as passed.
@@ -78,7 +80,12 @@ public class ITestS3AFileSystemContract extends FileSystemContractBaseTest {
   @Override
   protected void tearDown() throws Exception {
     if (fs != null) {
-      fs.delete(basePath, true);
+      try {
+        LOG.info("Deleting {}", basePath);
+        fs.delete(basePath, true);
+      } catch (IOException e) {
+        LOG.info("Failed to delete {}", basePath);
+      }
     }
     super.tearDown();
   }

+ 83 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3GuardEmptyDirs.java

@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.s3guard.MetadataStore;
+import org.apache.hadoop.fs.s3a.s3guard.NullMetadataStore;
+import org.junit.Assume;
+import org.junit.Test;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
+
+/**
+ * Test logic around whether or not a directory is empty, with S3Guard enabled.
+ * The fact that S3AFileStatus has an isEmptyDirectory flag in it makes caching
+ * S3AFileStatus's really tricky, as the flag can change as a side effect of
+ * changes to other paths.
+ * After S3Guard is merged to trunk, we should try to remove the
+ * isEmptyDirectory flag from S3AFileStatus, or maintain it outside
+ * of the MetadataStore.
+ */
+public class ITestS3GuardEmptyDirs extends AbstractS3ATestBase {
+
+  @Test
+  public void testEmptyDirs() throws Exception {
+
+    S3AFileSystem fs = getFileSystem();
+    Assume.assumeTrue(fs.hasMetadataStore());
+    MetadataStore configuredMs = fs.getMetadataStore();
+
+    // 1. Simulate files already existing in the bucket before we started our
+    // cluster.  Temporarily disable the MetadataStore so it doesn't witness
+    // us creating these files.
+
+    fs.setMetadataStore(new NullMetadataStore());
+
+    Path existingDir = path("existing-dir");
+    assertTrue(fs.mkdirs(existingDir));
+    Path existingFile = path("existing-dir/existing-file");
+    touch(fs, existingFile);
+
+
+    // 2. Simulate (from MetadataStore's perspective) starting our cluster and
+    // creating a file in an existing directory.
+    fs.setMetadataStore(configuredMs);  // "start cluster"
+    Path newFile = path("existing-dir/new-file");
+    touch(fs, newFile);
+
+    S3AFileStatus status = fs.innerGetFileStatus(existingDir, true);
+    assertEquals("Should not be empty dir", Tristate.FALSE,
+        status.isEmptyDirectory());
+
+    // 3. Assert that removing the only file the MetadataStore witnessed
+    // being created doesn't cause it to think the directory is now empty.
+    fs.delete(newFile, false);
+    status = fs.innerGetFileStatus(existingDir, true);
+    assertEquals("Should not be empty dir", Tristate.FALSE,
+        status.isEmptyDirectory());
+
+    // 4. Assert that removing the final file, that existed "before"
+    // MetadataStore started, *does* cause the directory to be marked empty.
+    fs.delete(existingFile, false);
+    status = fs.innerGetFileStatus(existingDir, true);
+    assertEquals("Should be empty dir now", Tristate.TRUE,
+        status.isEmptyDirectory());
+  }
+}

+ 63 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/MetadataStoreTestBase.java

@@ -22,6 +22,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.Tristate;
 import org.apache.hadoop.io.IOUtils;
 
 import com.google.common.collect.Sets;
@@ -350,6 +351,68 @@ public abstract class MetadataStoreTestBase extends Assert {
     assertNull("Don't get non-existent file", meta);
   }
 
+  @Test
+  public void testGetEmptyDir() throws Exception {
+    final String dirPath = "/a1/b1/c1/d1";
+    // Creates /a1/b1/c1/d1 as an empty dir
+    setupListStatus();
+
+    // 1. Tell MetadataStore (MS) that there are zero children
+    putListStatusFiles(dirPath, true /* authoritative */
+        /* zero children */);
+
+    // 2. Request a file status for dir, including whether or not the dir
+    // is empty.
+    PathMetadata meta = ms.get(strToPath(dirPath), true);
+
+    // 3. Check that either (a) the MS doesn't track whether or not it is
+    // empty (which is allowed), or (b) the MS knows the dir is empty.
+    if (!allowMissing() || meta != null) {
+      assertNotNull("Get should find meta for dir", meta);
+      assertNotEquals("Dir is empty or unknown", Tristate.FALSE,
+          meta.isEmptyDirectory());
+    }
+  }
+
+  @Test
+  public void testGetNonEmptyDir() throws Exception {
+    final String dirPath = "/a1/b1/c1";
+    // Creates /a1/b1/c1 as a non-empty dir
+    setupListStatus();
+
+    // Request a file status for dir, including whether or not the dir
+    // is empty.
+    PathMetadata meta = ms.get(strToPath(dirPath), true);
+
+    // MetadataStore knows /a1/b1/c1 has at least one child.  It is valid
+    // for it to answer either (a) UNKNOWN: the MS doesn't track whether
+    // or not the dir is empty, or (b) the MS knows the dir is non-empty.
+    if (!allowMissing() || meta != null) {
+      assertNotNull("Get should find meta for dir", meta);
+      assertNotEquals("Dir is non-empty or unknown", Tristate.TRUE,
+          meta.isEmptyDirectory());
+    }
+  }
+
+  @Test
+  public void testGetDirUnknownIfEmpty() throws Exception {
+    final String dirPath = "/a1/b1/c1/d1";
+    // 1. Create /a1/b1/c1/d1 as an empty dir, but do not tell MetadataStore
+    // (MS) whether or not it has any children.
+    setupListStatus();
+
+    // 2. Request a file status for dir, including whether or not the dir
+    // is empty.
+    PathMetadata meta = ms.get(strToPath(dirPath), true);
+
+    // 3. Assert MS reports isEmptyDir as UNKNOWN: We haven't told MS
+    // whether or not the directory has any children.
+    if (!allowMissing() || meta != null) {
+      assertNotNull("Get should find meta for dir", meta);
+      assertEquals("Dir empty is unknown", Tristate.UNKNOWN,
+          meta.isEmptyDirectory());
+    }
+  }
 
   @Test
   public void testListChildren() throws Exception {
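
The three tests above pin down the contract for MetadataStore.get(path, true): the boolean argument asks the store to also report emptiness, and UNKNOWN is always an acceptable answer. Any consumer therefore has to treat UNKNOWN as "go and check yourself". A sketch of that consumption pattern follows; the class, the helper names and the S3 fallback are illustrative assumptions, not code from this patch.

package org.apache.hadoop.fs.s3a.s3guard;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.Tristate;

/** Illustrative sketch of consuming the isEmptyDirectory() hint. */
public final class EmptyDirResolutionSketch {

  private EmptyDirResolutionSketch() {
  }

  /**
   * Ask the store whether {@code path} is an empty directory, falling
   * back to a (hypothetical) S3 listing when the store does not know.
   */
  public static boolean isEmptyDir(MetadataStore ms, Path path)
      throws IOException {
    // second argument: also report whether the directory is empty
    PathMetadata meta = ms.get(path, true);
    Tristate empty =
        (meta == null) ? Tristate.UNKNOWN : meta.isEmptyDirectory();
    switch (empty) {
    case TRUE:
      return true;     // store knows there are no children
    case FALSE:
      return false;    // store has witnessed at least one child
    default:
      return listShowsNoChildren(path);  // UNKNOWN: consult S3 directly
    }
  }

  /** Placeholder for a bounded LIST against the bucket (assumption). */
  private static boolean listShowsNoChildren(Path path) {
    return false;
  }
}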

+ 10 - 2
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestDynamoDBMetadataStore.java

@@ -40,6 +40,7 @@ import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
 import com.amazonaws.services.dynamodbv2.model.TableDescription;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.fs.s3a.Tristate;
 import org.apache.log4j.Level;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
@@ -460,10 +461,17 @@ public class TestDynamoDBMetadataStore extends MetadataStoreTestBase {
 
   private void verifyRootDirectory(PathMetadata rootMeta, boolean isEmpty) {
     assertNotNull(rootMeta);
-    final S3AFileStatus status = (S3AFileStatus) rootMeta.getFileStatus();
+    final FileStatus status = rootMeta.getFileStatus();
     assertNotNull(status);
     assertTrue(status.isDirectory());
-    assertEquals(isEmpty, status.isEmptyDirectory());
+    // UNKNOWN is always valid; TRUE / FALSE must not contradict isEmpty
+    if (isEmpty) {
+      assertTrue("Should not be marked non-empty",
+          rootMeta.isEmptyDirectory() != Tristate.FALSE);
+    } else {
+      assertTrue("Should not be marked empty",
+          rootMeta.isEmptyDirectory() != Tristate.TRUE);
+    }
   }
 
   @Test
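
verifyRootDirectory() now performs a weaker, non-contradiction check: UNKNOWN always passes, while a definite TRUE or FALSE must agree with the expected state. If other tests grow the same pattern, it could be factored into a small helper along these lines (the helper name and its placement are illustrative, not part of the patch):

import static org.junit.Assert.assertNotEquals;

import org.apache.hadoop.fs.s3a.Tristate;

/** Sketch of a reusable "must not contradict" assertion for Tristate. */
final class TristateAsserts {

  private TristateAsserts() {
  }

  static void assertDoesNotContradict(boolean expectedEmpty,
      Tristate actual) {
    if (expectedEmpty) {
      // empty expected: a definite "non-empty" answer would be wrong
      assertNotEquals("Should not be marked non-empty",
          Tristate.FALSE, actual);
    } else {
      // non-empty expected: a definite "empty" answer would be wrong
      assertNotEquals("Should not be marked empty",
          Tristate.TRUE, actual);
    }
  }
}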

+ 2 - 2
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/s3guard/TestPathMetadataDynamoDBTranslation.java

@@ -28,6 +28,7 @@ import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
 import com.amazonaws.services.dynamodbv2.model.AttributeDefinition;
 import com.amazonaws.services.dynamodbv2.model.KeySchemaElement;
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.fs.FileStatus;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
@@ -138,8 +139,7 @@ public class TestPathMetadataDynamoDBTranslation {
    */
   private static void verify(Item item, PathMetadata meta) {
     assertNotNull(meta);
-    assert meta.getFileStatus() instanceof S3AFileStatus;
-    final S3AFileStatus status = (S3AFileStatus) meta.getFileStatus();
+    final FileStatus status = meta.getFileStatus();
     final Path path = status.getPath();
     assertEquals(item.get(PARENT), pathToParentKey(path.getParent()));
     assertEquals(item.get(CHILD), path.getName());

+ 6 - 7
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/AbstractSTestS3AHugeFiles.java

@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import com.amazonaws.event.ProgressEvent;
 import com.amazonaws.event.ProgressEventType;
 import com.amazonaws.event.ProgressListener;
+import org.apache.hadoop.fs.FileStatus;
 import org.junit.FixMethodOrder;
 import org.junit.Test;
 import org.junit.runners.MethodSorters;
@@ -34,11 +35,9 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageStatistics;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
-import org.apache.hadoop.fs.s3a.S3AFileStatus;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3AInstrumentation;
 import org.apache.hadoop.fs.s3a.Statistic;
@@ -222,7 +221,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
     assertEquals("active put requests in \n" + fs,
         0, gaugeValue(putRequestsActive));
     ContractTestUtils.assertPathExists(fs, "Huge file", hugefile);
-    S3AFileStatus status = fs.getFileStatus(hugefile);
+    FileStatus status = fs.getFileStatus(hugefile);
     ContractTestUtils.assertIsFile(hugefile, status);
     assertEquals("File size in " + status, filesize, status.getLen());
     if (progress != null) {
@@ -324,7 +323,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
     String filetype = encrypted ? "encrypted file" : "file";
     describe("Positioned reads of %s %s", filetype, hugefile);
     S3AFileSystem fs = getFileSystem();
-    S3AFileStatus status = fs.getFileStatus(hugefile);
+    FileStatus status = fs.getFileStatus(hugefile);
     long filesize = status.getLen();
     int ops = 0;
     final int bufferSize = 8192;
@@ -364,7 +363,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
     assumeHugeFileExists();
     describe("Reading %s", hugefile);
     S3AFileSystem fs = getFileSystem();
-    S3AFileStatus status = fs.getFileStatus(hugefile);
+    FileStatus status = fs.getFileStatus(hugefile);
     long filesize = status.getLen();
     long blocks = filesize / uploadBlockSize;
     byte[] data = new byte[uploadBlockSize];
@@ -390,7 +389,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
     assumeHugeFileExists();
     describe("renaming %s to %s", hugefile, hugefileRenamed);
     S3AFileSystem fs = getFileSystem();
-    S3AFileStatus status = fs.getFileStatus(hugefile);
+    FileStatus status = fs.getFileStatus(hugefile);
     long filesize = status.getLen();
     fs.delete(hugefileRenamed, false);
     ContractTestUtils.NanoTimer timer = new ContractTestUtils.NanoTimer();
@@ -401,7 +400,7 @@ public abstract class AbstractSTestS3AHugeFiles extends S3AScaleTestBase {
         toHuman(timer.nanosPerOperation(mb)));
     bandwidth(timer, filesize);
     logFSState();
-    S3AFileStatus destFileStatus = fs.getFileStatus(hugefileRenamed);
+    FileStatus destFileStatus = fs.getFileStatus(hugefileRenamed);
     assertEquals(filesize, destFileStatus.getLen());
 
     // rename back

+ 2 - 2
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java

@@ -20,10 +20,10 @@ package org.apache.hadoop.fs.s3a.scale;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
-import org.apache.hadoop.fs.s3a.S3AFileStatus;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3AInputPolicy;
 import org.apache.hadoop.fs.s3a.S3AInputStream;
@@ -56,7 +56,7 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
 
   private S3AFileSystem s3aFS;
   private Path testData;
-  private S3AFileStatus testDataStatus;
+  private FileStatus testDataStatus;
   private FSDataInputStream in;
   private S3AInstrumentation.InputStreamStatistics streamStatistics;
   public static final int BLOCK_SIZE = 32 * 1024;