
HADOOP-13171. Add StorageStatistics to S3A; instrument some more operations. Contributed by Steve Loughran.

Chris Nauroth, 9 years ago
Commit c58a59f708
18 changed files with 1,984 additions and 439 deletions
  1. +420 -0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java
  2. +94 -0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ProgressableProgressListener.java
  3. +34 -31
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
  4. +7 -0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
  5. +377 -130
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
  6. +145 -73
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
  7. +11 -87
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
  8. +104 -0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java
  9. +48 -0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
  10. +143 -0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
  11. +11 -1
      hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
  12. +153 -0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java
  13. +191 -0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AFileOperationCost.java
  14. +47 -107
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
  15. +3 -7
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java
  16. +189 -0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADirectoryPerformance.java
  17. +4 -2
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java
  18. +3 -1
      hadoop-tools/hadoop-aws/src/test/resources/log4j.properties

+ 420 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java

@@ -22,7 +22,9 @@ import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.io.IOUtils;
 import org.junit.Assert;
 import org.junit.internal.AssumptionViolatedException;
@@ -34,8 +36,14 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
 import java.util.Properties;
+import java.util.Set;
 import java.util.UUID;
 
 /**
@@ -892,4 +900,416 @@ public class ContractTestUtils extends Assert {
       fs.delete(objectPath, false);
     }
   }
+
+  /**
+   * Make times more readable, by adding a "," every three digits.
+   * @param nanos nanos or other large number
+   * @return a string for logging
+   */
+  public static String toHuman(long nanos) {
+    return String.format(Locale.ENGLISH, "%,d", nanos);
+  }
+
+  /**
+   * Log the bandwidth of a timer as inferred from the number of
+   * bytes processed.
+   * @param timer timer
+   * @param bytes bytes processed in the time period
+   */
+  public static void bandwidth(NanoTimer timer, long bytes) {
+    LOG.info("Bandwidth = {}  MB/S",
+        timer.bandwidthDescription(bytes));
+  }
+
+  /**
+   * Work out the bandwidth in MB/s.
+   * @param bytes bytes
+   * @param durationNS duration in nanos
+   * @return the number of megabytes/second of the recorded operation
+   */
+  public static double bandwidthMBs(long bytes, long durationNS) {
+    return (bytes * 1000.0) / durationNS;
+  }
+
+  /**
+   * Recursively create a directory tree.
+   * Return the details about the created tree. The files and directories
+   * are those created under the path, not the base directory created. That
+   * is retrievable via {@link TreeScanResults#getBasePath()}.
+   * @param fs filesystem
+   * @param current parent dir
+   * @param depth depth of directory tree
+   * @param width width: subdirs per entry
+   * @param files number of files per entry
+   * @param filesize size of files to create in bytes.
+   * @return the details about the created tree.
+   * @throws IOException IO Problems
+   */
+  public static TreeScanResults createSubdirs(FileSystem fs,
+      Path current,
+      int depth,
+      int width,
+      int files,
+      int filesize) throws IOException {
+    return createSubdirs(fs, current, depth, width, files,
+        filesize, "dir-", "file-", "0");
+  }
+
+  /**
+   * Recursively create a directory tree.
+   * @param fs filesystem
+   * @param current the current dir in the walk
+   * @param depth depth of directory tree
+   * @param width width: subdirs per entry
+   * @param files number of files per entry
+   * @param filesize size of files to create in bytes.
+   * @param dirPrefix prefix for directory entries
+   * @param filePrefix prefix for file entries
+   * @param marker string which is slowly built up to uniquely name things
+   * @return the details about the created tree.
+   * @throws IOException IO Problems
+   */
+  public static TreeScanResults createSubdirs(FileSystem fs,
+      Path current,
+      int depth,
+      int width,
+      int files,
+      int filesize,
+      String dirPrefix,
+      String filePrefix,
+      String marker) throws IOException {
+    fs.mkdirs(current);
+    TreeScanResults results = new TreeScanResults(current);
+    if (depth > 0) {
+      byte[] data = dataset(filesize, 'a', 'z');
+      for (int i = 0; i < files; i++) {
+        String name = String.format("%s-%s-%04d.txt", filePrefix, marker, i);
+        Path path = new Path(current, name);
+        createFile(fs, path, true, data);
+        results.add(fs, path);
+      }
+      for (int w = 0; w < width; w++) {
+        String marker2 = String.format("%s-%04d", marker, w);
+        Path child = new Path(current, dirPrefix + marker2);
+        results.add(createSubdirs(fs, child, depth - 1, width, files,
+            filesize, dirPrefix, filePrefix, marker2));
+        results.add(fs, child);
+      }
+    }
+    return results;
+  }
+
+  /**
+   * Predicate to determine if two lists are equivalent, that is, they
+   * contain the same entries.
+   * @param left first collection of paths
+   * @param right second collection of paths
+   * @return true if all entries are in each collection of paths.
+   */
+  public static boolean collectionsEquivalent(Collection<Path> left,
+      Collection<Path> right) {
+    Set<Path> leftSet = new HashSet<>(left);
+    Set<Path> rightSet = new HashSet<>(right);
+    return leftSet.containsAll(right) && rightSet.containsAll(left);
+  }
+
+  /**
+   * Predicate to determine if two lists are equivalent, that is, they
+   * contain the same entries and neither collection contains duplicate
+   * entries.
+   * @param left first collection of paths
+   * @param right second collection of paths
+   * @return true if all entries are in each collection and neither
+   * collection contains duplicates.
+   */
+  public static boolean collectionsEquivalentNoDuplicates(Collection<Path> left,
+      Collection<Path> right) {
+    return collectionsEquivalent(left, right) &&
+        !containsDuplicates(left) && !containsDuplicates(right);
+  }
+
+
+  /**
+   * Predicate to test for a collection of paths containing duplicate entries.
+   * @param paths collection of paths
+   * @return true if there are duplicates.
+   */
+  public static boolean containsDuplicates(Collection<Path> paths) {
+    return new HashSet<>(paths).size() != paths.size();
+  }
+
+  /**
+   * Recursively list all entries, with a depth first traversal of the
+   * directory tree.
+   * @param fs filesystem to scan
+   * @param path path to scan
+   * @return the results of the scan
+   * @throws IOException IO problems
+   */
+  public static TreeScanResults treeWalk(FileSystem fs, Path path)
+      throws IOException {
+    TreeScanResults dirsAndFiles = new TreeScanResults();
+
+    FileStatus[] statuses = fs.listStatus(path);
+    for (FileStatus status : statuses) {
+      LOG.info("{}{}", status.getPath(), status.isDirectory() ? "*" : "");
+    }
+    for (FileStatus status : statuses) {
+      dirsAndFiles.add(status);
+      if (status.isDirectory()) {
+        dirsAndFiles.add(treeWalk(fs, status.getPath()));
+      }
+    }
+    return dirsAndFiles;
+  }
+
+  /**
+   * Results of recursive directory creation/scan operations.
+   */
+  public static final class TreeScanResults {
+
+    private Path basePath;
+    private final List<Path> files = new ArrayList<>();
+    private final List<Path> directories = new ArrayList<>();
+    private final List<Path> other = new ArrayList<>();
+
+
+    public TreeScanResults() {
+    }
+
+    public TreeScanResults(Path basePath) {
+      this.basePath = basePath;
+    }
+
+    /**
+     * Build from a located file status iterator.
+     * @param results results of the listFiles/listStatus call.
+     * @throws IOException IO problems during the iteration.
+     */
+    public TreeScanResults(RemoteIterator<LocatedFileStatus> results)
+        throws IOException {
+      while (results.hasNext()) {
+        add(results.next());
+      }
+    }
+
+    /**
+     * Construct results from an array of statistics.
+     * @param stats statistics array. Must not be null.
+     */
+    public TreeScanResults(FileStatus[] stats) {
+      assertNotNull("Null file status array", stats);
+      for (FileStatus stat : stats) {
+        add(stat);
+      }
+    }
+
+    /**
+     * Add all paths in the other set of results to this instance.
+     * @param that the other instance
+     * @return this instance
+     */
+    public TreeScanResults add(TreeScanResults that) {
+      files.addAll(that.files);
+      directories.addAll(that.directories);
+      other.addAll(that.other);
+      return this;
+    }
+
+    /**
+     * Increment the counters based on the file status.
+     * @param status path status to count.
+     */
+    public void add(FileStatus status) {
+      if (status.isFile()) {
+        files.add(status.getPath());
+      } else if (status.isDirectory()) {
+        directories.add(status.getPath());
+      } else {
+        other.add(status.getPath());
+      }
+    }
+
+    public void add(FileSystem fs, Path path) throws IOException {
+      add(fs.getFileStatus(path));
+    }
+
+    @Override
+    public String toString() {
+      return String.format("%d director%s and %d file%s",
+          getDirCount(),
+          getDirCount() == 1 ? "y" : "ies",
+          getFileCount(),
+          getFileCount() == 1 ? "" : "s");
+    }
+
+    /**
+     * Assert that the state of a listing has the specific number of files,
+     * directories and other entries. The error text will include
+     * the {@code text} param, the field in question, and the entire object's
+     * string value.
+     * @param text text prefix for assertions.
+     * @param f expected file count
+     * @param d expected directory count
+     * @param o expected other entries.
+     */
+    public void assertSizeEquals(String text, long f, long d, long o) {
+      String self = toString();
+      Assert.assertEquals(text + ": file count in " + self,
+          f, getFileCount());
+      Assert.assertEquals(text + ": directory count in " + self,
+          d, getDirCount());
+      Assert.assertEquals(text + ": 'other' count in " + self,
+          o, getOtherCount());
+    }
+
+    /**
+     * Assert that the trees are equivalent: that every list matches (and
+     * that neither has any duplicates).
+     * @param that the other entry
+     */
+    public void assertEquivalent(TreeScanResults that) {
+      assertFieldsEquivalent("files", that, files, that.files);
+      assertFieldsEquivalent("directories", that,
+          directories, that.directories);
+      assertFieldsEquivalent("other", that, other, that.other);
+    }
+
+    /**
+     * Assert that a field in two instances are equivalent.
+     * @param fieldname field name for error messages
+     * @param that the other instance to scan
+     * @param ours our field's contents
+     * @param theirs the other instance's field contents
+     */
+    public void assertFieldsEquivalent(String fieldname,
+        TreeScanResults that,
+        List<Path> ours, List<Path> theirs) {
+      assertFalse("Duplicate  " + files + " in " + this,
+          containsDuplicates(ours));
+      assertFalse("Duplicate  " + files + " in other " + that,
+          containsDuplicates(theirs));
+      assertTrue(fieldname + " mismatch: between {" + this + "}" +
+              " and {" + that + "}",
+          collectionsEquivalent(files, that.files));
+    }
+
+    public List<Path> getFiles() {
+      return files;
+    }
+
+    public List<Path> getDirectories() {
+      return directories;
+    }
+
+    public List<Path> getOther() {
+      return other;
+    }
+
+    public Path getBasePath() {
+      return basePath;
+    }
+
+    public long getFileCount() {
+      return files.size();
+    }
+
+    public long getDirCount() {
+      return directories.size();
+    }
+
+    public long getOtherCount() {
+      return other.size();
+    }
+
+    /**
+     * Total count of entries.
+     * @return the total number of entries
+     */
+    public long totalCount() {
+      return getFileCount() + getDirCount() + getOtherCount();
+    }
+
+  }
+
+  /**
+   * A simple class for timing operations in nanoseconds, and for
+   * printing some useful results in the process.
+   */
+  public static final class NanoTimer {
+    private final long startTime;
+    private long endTime;
+
+    public NanoTimer() {
+      startTime = now();
+    }
+
+    /**
+     * End the operation.
+     * @return the duration of the operation
+     */
+    public long end() {
+      endTime = now();
+      return duration();
+    }
+
+    /**
+     * End the operation; log the duration.
+     * @param format message
+     * @param args any arguments
+     * @return the duration of the operation
+     */
+    public long end(String format, Object... args) {
+      long d = end();
+      LOG.info("Duration of {}: {} nS",
+          String.format(format, args), toHuman(d));
+      return d;
+    }
+
+    public long now() {
+      return System.nanoTime();
+    }
+
+    public long duration() {
+      return endTime - startTime;
+    }
+
+    public double bandwidth(long bytes) {
+      return bandwidthMBs(bytes, duration());
+    }
+
+    /**
+     * Bandwidth as bytes per nanosecond; the timer measures in
+     * nanoseconds, so no scaling is applied.
+     * @param bytes bytes processed
+     * @return the number of bytes processed per nanosecond of this operation.
+     */
+    public double bandwidthBytes(long bytes) {
+      return (bytes * 1.0) / duration();
+    }
+
+    /**
+     * How many nanoseconds per IOP, byte, etc.
+     * @param operations operations processed in this time period
+     * @return the nanoseconds it took each operation to be processed
+     */
+    public long nanosPerOperation(long operations) {
+      return duration() / operations;
+    }
+
+    /**
+     * Get a description of the bandwidth, even down to fractions of
+     * a MB.
+     * @param bytes bytes processed
+     * @return bandwidth
+     */
+    public String bandwidthDescription(long bytes) {
+      return String.format("%,.6f", bandwidth(bytes));
+    }
+
+    public long getStartTime() {
+      return startTime;
+    }
+
+    public long getEndTime() {
+      return endTime;
+    }
+  }
 }
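
The new ContractTestUtils helpers are meant to be composed in contract and scale tests: build a tree with createSubdirs(), time a recursive listing with NanoTimer, and compare the outcomes through TreeScanResults. A minimal sketch, assuming an initially empty test path and a FileSystem supplied by the surrounding test (depth, width and file sizes here are illustrative, not values from this patch):

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import static org.apache.hadoop.fs.contract.ContractTestUtils.*;

    final class DirectoryTreeExample {
      /** Create a small tree, time a full scan, and verify the listing. */
      static void scanTree(FileSystem fs, Path base) throws Exception {
        // depth 2, 3 subdirectories per level, 4 files of 1 KB per directory
        TreeScanResults created = createSubdirs(fs, base, 2, 3, 4, 1024);

        NanoTimer timer = new NanoTimer();
        TreeScanResults listed = treeWalk(fs, base);
        timer.end("treewalk of %s", base);

        // the walk must find exactly the entries that were created,
        // assuming base was empty beforehand
        created.assertEquivalent(listed);
      }
    }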

+ 94 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/ProgressableProgressListener.java

@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.amazonaws.event.ProgressEvent;
+import com.amazonaws.event.ProgressEventType;
+import com.amazonaws.event.ProgressListener;
+import com.amazonaws.services.s3.transfer.Upload;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+
+import static com.amazonaws.event.ProgressEventType.TRANSFER_COMPLETED_EVENT;
+import static com.amazonaws.event.ProgressEventType.TRANSFER_PART_STARTED_EVENT;
+
+/**
+ * Listener to progress from AWS regarding transfers.
+ */
+public class ProgressableProgressListener implements ProgressListener {
+  private static final Logger LOG = S3AFileSystem.LOG;
+  private final S3AFileSystem fs;
+  private final String key;
+  private final Progressable progress;
+  private long lastBytesTransferred;
+  private final Upload upload;
+
+  /**
+   * Instantiate.
+   * @param fs filesystem: will be invoked with statistics updates
+   * @param key key for the upload
+   * @param upload source of events
+   * @param progress optional callback for progress.
+   */
+  public ProgressableProgressListener(S3AFileSystem fs,
+      String key,
+      Upload upload,
+      Progressable progress) {
+    this.fs = fs;
+    this.key = key;
+    this.upload = upload;
+    this.progress = progress;
+    this.lastBytesTransferred = 0;
+  }
+
+  @Override
+  public void progressChanged(ProgressEvent progressEvent) {
+    if (progress != null) {
+      progress.progress();
+    }
+
+    // There are 3 http ops here, but this should be close enough for now
+    ProgressEventType pet = progressEvent.getEventType();
+    if (pet == TRANSFER_PART_STARTED_EVENT ||
+        pet == TRANSFER_COMPLETED_EVENT) {
+      fs.incrementWriteOperations();
+    }
+
+    long transferred = upload.getProgress().getBytesTransferred();
+    long delta = transferred - lastBytesTransferred;
+    fs.incrementPutProgressStatistics(key, delta);
+    lastBytesTransferred = transferred;
+  }
+
+  /**
+   * Method to invoke after upload has completed.
+   * This can handle race conditions in setup/teardown.
+   * @return the number of bytes which were transferred after the notification
+   */
+  public long uploadCompleted() {
+    long delta = upload.getProgress().getBytesTransferred() -
+        lastBytesTransferred;
+    if (delta > 0) {
+      LOG.debug("S3A write delta changed after finished: {} bytes", delta);
+      fs.incrementPutProgressStatistics(key, delta);
+    }
+    return delta;
+  }
+
+}
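
The listener bridges the Hadoop Progressable callback and the AWS transfer events, so byte counts flow into the filesystem statistics while an upload is in flight. A rough sketch of the intended wiring, with fs, transfers, key and request standing in for whatever the caller holds (the real use is in S3AFileSystem.innerCopyFromLocalFile further down in this change):

    import com.amazonaws.services.s3.model.PutObjectRequest;
    import com.amazonaws.services.s3.transfer.TransferManager;
    import com.amazonaws.services.s3.transfer.Upload;
    import org.apache.hadoop.fs.s3a.ProgressableProgressListener;
    import org.apache.hadoop.fs.s3a.S3AFileSystem;
    import org.apache.hadoop.util.Progressable;

    final class UploadWithProgress {
      /** Upload a request while feeding progress into S3A statistics. */
      static void upload(S3AFileSystem fs, TransferManager transfers,
          String key, PutObjectRequest request, Progressable progress)
          throws InterruptedException {
        Upload upload = transfers.upload(request);
        ProgressableProgressListener listener =
            new ProgressableProgressListener(fs, key, upload, progress);
        upload.addProgressListener(listener);
        upload.waitForUploadResult();
        // account for any bytes reported after the final progress event
        listener.uploadCompleted();
      }
    }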

+ 34 - 31
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java

@@ -35,10 +35,8 @@ import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.util.Progressable;
 import org.slf4j.Logger;
 
@@ -54,6 +52,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.fs.s3a.Statistic.*;
 
 /**
  * Upload files/parts asap directly from a memory buffer (instead of buffering
@@ -77,8 +76,6 @@ public class S3AFastOutputStream extends OutputStream {
   private final int multiPartThreshold;
   private final S3AFileSystem fs;
   private final CannedAccessControlList cannedACL;
-  private final FileSystem.Statistics statistics;
-  private final String serverSideEncryptionAlgorithm;
   private final ProgressListener progressListener;
   private final ListeningExecutorService executorService;
   private MultiPartUpload multiPartUpload;
@@ -98,28 +95,28 @@ public class S3AFastOutputStream extends OutputStream {
    * @param bucket S3 bucket name
    * @param key S3 key name
    * @param progress report progress in order to prevent timeouts
-   * @param statistics track FileSystem.Statistics on the performed operations
    * @param cannedACL used CannedAccessControlList
-   * @param serverSideEncryptionAlgorithm algorithm for server side encryption
    * @param partSize size of a single part in a multi-part upload (except
    * last part)
    * @param multiPartThreshold files at least this size use multi-part upload
    * @param threadPoolExecutor thread factory
    * @throws IOException on any problem
    */
-  public S3AFastOutputStream(AmazonS3Client client, S3AFileSystem fs,
-      String bucket, String key, Progressable progress,
-      FileSystem.Statistics statistics, CannedAccessControlList cannedACL,
-      String serverSideEncryptionAlgorithm, long partSize,
-      long multiPartThreshold, ExecutorService threadPoolExecutor)
+  public S3AFastOutputStream(AmazonS3Client client,
+      S3AFileSystem fs,
+      String bucket,
+      String key,
+      Progressable progress,
+      CannedAccessControlList cannedACL,
+      long partSize,
+      long multiPartThreshold,
+      ExecutorService threadPoolExecutor)
       throws IOException {
     this.bucket = bucket;
     this.key = key;
     this.client = client;
     this.fs = fs;
     this.cannedACL = cannedACL;
-    this.statistics = statistics;
-    this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
     //Ensure limit as ByteArrayOutputStream size cannot exceed Integer.MAX_VALUE
     if (partSize > Integer.MAX_VALUE) {
       this.partSize = Integer.MAX_VALUE;
@@ -246,16 +243,17 @@ public class S3AFastOutputStream extends OutputStream {
       if (multiPartUpload == null) {
         putObject();
       } else {
-        if (buffer.size() > 0) {
+        int size = buffer.size();
+        if (size > 0) {
+          fs.incrementPutStartStatistics(size);
           //send last part
           multiPartUpload.uploadPartAsync(new ByteArrayInputStream(buffer
-              .toByteArray()), buffer.size());
+              .toByteArray()), size);
         }
         final List<PartETag> partETags = multiPartUpload
             .waitForAllPartUploads();
         multiPartUpload.complete(partETags);
       }
-      statistics.incrementWriteOps(1);
       // This will delete unnecessary fake parent directories
       fs.finishedWrite(key);
       LOG.debug("Upload complete for bucket '{}' key '{}'", bucket, key);
@@ -265,18 +263,19 @@ public class S3AFastOutputStream extends OutputStream {
     }
   }
 
+  /**
+   * Create the default metadata for a multipart upload operation.
+   * @return the metadata to use/extend.
+   */
   private ObjectMetadata createDefaultMetadata() {
-    ObjectMetadata om = new ObjectMetadata();
-    if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
-      om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
-    }
-    return om;
+    return fs.newObjectMetadata();
   }
 
   private MultiPartUpload initiateMultiPartUpload() throws IOException {
-    final ObjectMetadata om = createDefaultMetadata();
     final InitiateMultipartUploadRequest initiateMPURequest =
-        new InitiateMultipartUploadRequest(bucket, key, om);
+        new InitiateMultipartUploadRequest(bucket,
+            key,
+            createDefaultMetadata());
     initiateMPURequest.setCannedACL(cannedACL);
     try {
       return new MultiPartUpload(
@@ -290,15 +289,18 @@ public class S3AFastOutputStream extends OutputStream {
     LOG.debug("Executing regular upload for bucket '{}' key '{}'",
         bucket, key);
     final ObjectMetadata om = createDefaultMetadata();
-    om.setContentLength(buffer.size());
-    final PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
-        new ByteArrayInputStream(buffer.toByteArray()), om);
-    putObjectRequest.setCannedAcl(cannedACL);
+    final int size = buffer.size();
+    om.setContentLength(size);
+    final PutObjectRequest putObjectRequest =
+        fs.newPutObjectRequest(key,
+            om,
+            new ByteArrayInputStream(buffer.toByteArray()));
     putObjectRequest.setGeneralProgressListener(progressListener);
     ListenableFuture<PutObjectResult> putObjectResult =
         executorService.submit(new Callable<PutObjectResult>() {
           @Override
           public PutObjectResult call() throws Exception {
+            fs.incrementPutStartStatistics(size);
             return client.putObject(putObjectRequest);
           }
         });
@@ -306,7 +308,7 @@ public class S3AFastOutputStream extends OutputStream {
     try {
       putObjectResult.get();
     } catch (InterruptedException ie) {
-      LOG.warn("Interrupted object upload:" + ie, ie);
+      LOG.warn("Interrupted object upload: {}", ie, ie);
       Thread.currentThread().interrupt();
     } catch (ExecutionException ee) {
       throw extractException("regular upload", key, ee);
@@ -339,7 +341,7 @@ public class S3AFastOutputStream extends OutputStream {
             public PartETag call() throws Exception {
               LOG.debug("Uploading part {} for id '{}'", currentPartNumber,
                   uploadId);
-              return client.uploadPart(request).getPartETag();
+              return fs.uploadPart(request).getPartETag();
             }
           });
       partETagsFutures.add(partETagFuture);
@@ -349,7 +351,7 @@ public class S3AFastOutputStream extends OutputStream {
       try {
         return Futures.allAsList(partETagsFutures).get();
       } catch (InterruptedException ie) {
-        LOG.warn("Interrupted partUpload:" + ie, ie);
+        LOG.warn("Interrupted partUpload: {}", ie, ie);
         Thread.currentThread().interrupt();
         return null;
       } catch (ExecutionException ee) {
@@ -382,11 +384,12 @@ public class S3AFastOutputStream extends OutputStream {
     public void abort() {
       LOG.warn("Aborting multi-part upload with id '{}'", uploadId);
       try {
+        fs.incrementStatistic(OBJECT_MULTIPART_UPLOAD_ABORTED);
         client.abortMultipartUpload(new AbortMultipartUploadRequest(bucket,
             key, uploadId));
       } catch (Exception e2) {
         LOG.warn("Unable to abort multipart upload, you may need to purge  " +
-            "uploaded parts: " + e2, e2);
+            "uploaded parts: {}", e2, e2);
       }
     }
   }
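
Whether this fast path is used at all is decided in S3AFileSystem.create(), which consults the fs.s3a.fast.upload flag (the FAST_UPLOAD constant referenced later in this change). A sketch of enabling it on the client side, assuming an otherwise standard S3A configuration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    final class FastUploadExample {
      /** Open an S3A filesystem with the in-memory fast upload path enabled. */
      static FileSystem openWithFastUpload(Path s3aPath) throws Exception {
        Configuration conf = new Configuration();
        // route writes through S3AFastOutputStream rather than the
        // buffer-to-local-disk S3AOutputStream
        conf.setBoolean("fs.s3a.fast.upload", true);
        return s3aPath.getFileSystem(conf);
      }
    }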

+ 7 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java

@@ -93,4 +93,11 @@ public class S3AFileStatus extends FileStatus {
       return super.getModificationTime();
     }
   }
+
+  @Override
+  public String toString() {
+    return super.toString() +
+        String.format(" isEmptyDirectory=%s", isEmptyDirectory());
+  }
+
 }

+ 377 - 130
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -42,7 +42,6 @@ import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.S3ClientOptions;
 import com.amazonaws.services.s3.model.AmazonS3Exception;
 import com.amazonaws.services.s3.model.CannedAccessControlList;
-import com.amazonaws.services.s3.model.DeleteObjectRequest;
 import com.amazonaws.services.s3.model.DeleteObjectsRequest;
 import com.amazonaws.services.s3.model.ListObjectsRequest;
 import com.amazonaws.services.s3.model.ObjectListing;
@@ -50,6 +49,8 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.amazonaws.services.s3.model.CopyObjectRequest;
 import com.amazonaws.services.s3.model.S3ObjectSummary;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.amazonaws.services.s3.model.UploadPartResult;
 import com.amazonaws.services.s3.transfer.Copy;
 import com.amazonaws.services.s3.transfer.TransferManager;
 import com.amazonaws.services.s3.transfer.TransferManagerConfiguration;
@@ -68,8 +69,13 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.GlobalStorageStatistics;
 import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.StorageStatistics;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.ProviderUtils;
 import org.apache.hadoop.util.Progressable;
@@ -77,6 +83,7 @@ import org.apache.hadoop.util.VersionInfo;
 
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.fs.s3a.Statistic.*;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -115,6 +122,7 @@ public class S3AFileSystem extends FileSystem {
   private CannedAccessControlList cannedACL;
   private String serverSideEncryptionAlgorithm;
   private S3AInstrumentation instrumentation;
+  private S3AStorageStatistics storageStatistics;
   private long readAhead;
 
   // The maximum number of entries that can be deleted in any call to s3
@@ -182,6 +190,15 @@ public class S3AFileSystem extends FileSystem {
       enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
 
       readAhead = longOption(conf, READAHEAD_RANGE, DEFAULT_READAHEAD_RANGE, 0);
+      storageStatistics = (S3AStorageStatistics)
+          GlobalStorageStatistics.INSTANCE
+              .put(S3AStorageStatistics.NAME,
+                  new GlobalStorageStatistics.StorageStatisticsProvider() {
+                    @Override
+                    public StorageStatistics provide() {
+                      return new S3AStorageStatistics();
+                    }
+                  });
 
       int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
       if (maxThreads < 2) {
@@ -283,6 +300,14 @@ public class S3AFileSystem extends FileSystem {
     }
   }
 
+  /**
+   * Get S3A Instrumentation. For test purposes.
+   * @return this instance's instrumentation.
+   */
+  public S3AInstrumentation getInstrumentation() {
+    return instrumentation;
+  }
+
   /**
    * Initializes the User-Agent header to send in HTTP requests to the S3
    * back-end.  We always include the Hadoop version number.  The user also may
@@ -559,23 +584,26 @@ public class S3AFileSystem extends FileSystem {
     }
     instrumentation.fileCreated();
     if (getConf().getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD)) {
-      return new FSDataOutputStream(new S3AFastOutputStream(s3, this, bucket,
-          key, progress, statistics, cannedACL,
-          serverSideEncryptionAlgorithm, partSize, multiPartThreshold,
-          threadPoolExecutor), statistics);
+      return new FSDataOutputStream(
+          new S3AFastOutputStream(s3,
+              this,
+              bucket,
+              key,
+              progress,
+              cannedACL,
+              partSize,
+              multiPartThreshold,
+              threadPoolExecutor),
+          statistics);
     }
     // We pass null to FSDataOutputStream so it won't count writes that
     // are being buffered to a file
     return new FSDataOutputStream(
         new S3AOutputStream(getConf(),
-            transfers,
             this,
-            bucket,
             key,
-            progress,
-            cannedACL,
-            statistics,
-            serverSideEncryptionAlgorithm),
+            progress
+        ),
         null);
   }
 
@@ -631,6 +659,7 @@ public class S3AFileSystem extends FileSystem {
   private boolean innerRename(Path src, Path dst) throws IOException,
       AmazonClientException {
     LOG.debug("Rename path {} to {}", src, dst);
+    incrementStatistic(INVOCATION_RENAME);
 
     String srcKey = pathToKey(src);
     String dstKey = pathToKey(dst);
@@ -731,8 +760,7 @@ public class S3AFileSystem extends FileSystem {
       request.setPrefix(srcKey);
       request.setMaxKeys(maxKeys);
 
-      ObjectListing objects = s3.listObjects(request);
-      statistics.incrementReadOps(1);
+      ObjectListing objects = listObjects(request);
 
       while (true) {
         for (S3ObjectSummary summary : objects.getObjectSummaries()) {
@@ -746,8 +774,7 @@ public class S3AFileSystem extends FileSystem {
         }
 
         if (objects.isTruncated()) {
-          objects = s3.listNextBatchOfObjects(objects);
-          statistics.incrementReadOps(1);
+          objects = continueListObjects(objects);
         } else {
           if (!keysToDelete.isEmpty()) {
             removeKeys(keysToDelete, false);
@@ -775,17 +802,223 @@ public class S3AFileSystem extends FileSystem {
     return getObjectMetadata(pathToKey(path));
   }
 
+  /**
+   * Increment a statistic by 1.
+   * @param statistic The operation to increment
+   */
+  protected void incrementStatistic(Statistic statistic) {
+    incrementStatistic(statistic, 1);
+  }
+
+  /**
+   * Increment a statistic by a specific value.
+   * @param statistic The operation to increment
+   * @param count the count to increment
+   */
+  protected void incrementStatistic(Statistic statistic, long count) {
+    instrumentation.incrementCounter(statistic, count);
+    storageStatistics.incrementCounter(statistic, count);
+  }
+
   /**
    * Request object metadata; increments counters in the process.
    * @param key key
    * @return the metadata
    */
-  private ObjectMetadata getObjectMetadata(String key) {
+  protected ObjectMetadata getObjectMetadata(String key) {
+    incrementStatistic(OBJECT_METADATA_REQUESTS);
     ObjectMetadata meta = s3.getObjectMetadata(bucket, key);
-    statistics.incrementReadOps(1);
+    incrementReadOperations();
     return meta;
   }
 
+  /**
+   * Initiate a {@code listObjects} operation, incrementing metrics
+   * in the process.
+   * @param request request to initiate
+   * @return the results
+   */
+  protected ObjectListing listObjects(ListObjectsRequest request) {
+    incrementStatistic(OBJECT_LIST_REQUESTS);
+    incrementReadOperations();
+    return s3.listObjects(request);
+  }
+
+  /**
+   * List the next set of objects.
+   * @param objects paged result
+   * @return the next result object
+   */
+  protected ObjectListing continueListObjects(ObjectListing objects) {
+    incrementStatistic(OBJECT_LIST_REQUESTS);
+    incrementReadOperations();
+    return s3.listNextBatchOfObjects(objects);
+  }
+
+  /**
+   * Increment read operations.
+   */
+  public void incrementReadOperations() {
+    statistics.incrementReadOps(1);
+  }
+
+  /**
+   * Increment the write operation counter.
+   * This is somewhat inaccurate, as it appears to be invoked more
+   * often than needed in progress callbacks.
+   */
+  public void incrementWriteOperations() {
+    statistics.incrementWriteOps(1);
+  }
+
+  /**
+   * Delete an object.
+   * Increments the {@code OBJECT_DELETE_REQUESTS} and write
+   * operation statistics.
+   * @param key key to blob to delete.
+   */
+  private void deleteObject(String key) {
+    incrementWriteOperations();
+    incrementStatistic(OBJECT_DELETE_REQUESTS);
+    s3.deleteObject(bucket, key);
+  }
+
+  /**
+   * Perform a bulk object delete operation.
+   * Increments the {@code OBJECT_DELETE_REQUESTS} and write
+   * operation statistics.
+   * @param deleteRequest keys to delete on the s3-backend
+   */
+  private void deleteObjects(DeleteObjectsRequest deleteRequest) {
+    incrementWriteOperations();
+    incrementStatistic(OBJECT_DELETE_REQUESTS, 1);
+    s3.deleteObjects(deleteRequest);
+  }
+
+  /**
+   * Create a putObject request.
+   * Adds the ACL and metadata
+   * @param key key of object
+   * @param metadata metadata header
+   * @param srcfile source file
+   * @return the request
+   */
+  public PutObjectRequest newPutObjectRequest(String key,
+      ObjectMetadata metadata, File srcfile) {
+    PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
+        srcfile);
+    putObjectRequest.setCannedAcl(cannedACL);
+    putObjectRequest.setMetadata(metadata);
+    return putObjectRequest;
+  }
+
+  /**
+   * Create a {@link PutObjectRequest} request.
+   * The metadata is assumed to have been configured with the size of the
+   * operation.
+   * @param key key of object
+   * @param metadata metadata header
+   * @param inputStream source data.
+   * @return the request
+   */
+  PutObjectRequest newPutObjectRequest(String key,
+      ObjectMetadata metadata, InputStream inputStream) {
+    PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key,
+        inputStream, metadata);
+    putObjectRequest.setCannedAcl(cannedACL);
+    return putObjectRequest;
+  }
+
+  /**
+   * Create a new object metadata instance.
+   * Any standard metadata headers are added here, for example:
+   * encryption.
+   * @return a new metadata instance
+   */
+  public ObjectMetadata newObjectMetadata() {
+    final ObjectMetadata om = new ObjectMetadata();
+    if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
+      om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
+    }
+    return om;
+  }
+
+  /**
+   * Create a new object metadata instance.
+   * Any standard metadata headers are added here, for example:
+   * encryption.
+   *
+   * @param length length of data to set in header.
+   * @return a new metadata instance
+   */
+  public ObjectMetadata newObjectMetadata(long length) {
+    final ObjectMetadata om = newObjectMetadata();
+    om.setContentLength(length);
+    return om;
+  }
+
+  /**
+   * PUT an object, incrementing the put requests and put bytes
+   * counters.
+   * It does not update the other counters,
+   * as existing code does that as progress callbacks come in.
+   * Byte length is calculated from the file length, or, if there is no
+   * file, from the content length of the header.
+   * @param putObjectRequest the request
+   * @return the upload initiated
+   */
+  public Upload putObject(PutObjectRequest putObjectRequest) {
+    long len;
+    if (putObjectRequest.getFile() != null) {
+      len = putObjectRequest.getFile().length();
+    } else {
+      len = putObjectRequest.getMetadata().getContentLength();
+    }
+    incrementPutStartStatistics(len);
+    return transfers.upload(putObjectRequest);
+  }
+
+  /**
+   * Upload part of a multi-partition file.
+   * Increments the write and put counters
+   * @param request request
+   * @return the result of the operation.
+   */
+  public UploadPartResult uploadPart(UploadPartRequest request) {
+    incrementPutStartStatistics(request.getPartSize());
+    return s3.uploadPart(request);
+  }
+
+  /**
+   * At the start of a put/multipart upload operation, update the
+   * relevant counters.
+   *
+   * @param bytes bytes in the request.
+   */
+  public void incrementPutStartStatistics(long bytes) {
+    LOG.debug("PUT start {} bytes", bytes);
+    incrementWriteOperations();
+    incrementStatistic(OBJECT_PUT_REQUESTS);
+    if (bytes > 0) {
+      incrementStatistic(OBJECT_PUT_BYTES, bytes);
+    }
+  }
+
+  /**
+   * Callback for use in progress callbacks from put/multipart upload events.
+   * Increments those statistics which are expected to be updated during
+   * the ongoing upload operation.
+   * @param key key to file that is being written (for logging)
+   * @param bytes bytes successfully uploaded.
+   */
+  public void incrementPutProgressStatistics(String key, long bytes) {
+    LOG.debug("PUT {}: {} bytes", key, bytes);
+    incrementWriteOperations();
+    if (bytes > 0) {
+      statistics.incrementBytesWritten(bytes);
+    }
+  }
+
   /**
    * A helper method to delete a list of keys on a s3-backend.
    *
@@ -796,21 +1029,13 @@ public class S3AFileSystem extends FileSystem {
   private void removeKeys(List<DeleteObjectsRequest.KeyVersion> keysToDelete,
           boolean clearKeys) throws AmazonClientException {
     if (enableMultiObjectsDelete) {
-      DeleteObjectsRequest deleteRequest
-          = new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
-      s3.deleteObjects(deleteRequest);
+      deleteObjects(new DeleteObjectsRequest(bucket).withKeys(keysToDelete));
       instrumentation.fileDeleted(keysToDelete.size());
-      statistics.incrementWriteOps(1);
     } else {
-      int writeops = 0;
-
       for (DeleteObjectsRequest.KeyVersion keyVersion : keysToDelete) {
-        s3.deleteObject(
-            new DeleteObjectRequest(bucket, keyVersion.getKey()));
-        writeops++;
+        deleteObject(keyVersion.getKey());
       }
       instrumentation.fileDeleted(keysToDelete.size());
-      statistics.incrementWriteOps(writeops);
     }
     if (clearKeys) {
       keysToDelete.clear();
@@ -880,9 +1105,8 @@ public class S3AFileSystem extends FileSystem {
 
       if (status.isEmptyDirectory()) {
         LOG.debug("Deleting fake empty directory {}", key);
-        s3.deleteObject(bucket, key);
+        deleteObject(key);
         instrumentation.directoryDeleted();
-        statistics.incrementWriteOps(1);
       } else {
         LOG.debug("Getting objects for directory prefix {} to delete", key);
 
@@ -893,9 +1117,9 @@ public class S3AFileSystem extends FileSystem {
         //request.setDelimiter("/");
         request.setMaxKeys(maxKeys);
 
-        List<DeleteObjectsRequest.KeyVersion> keys = new ArrayList<>();
-        ObjectListing objects = s3.listObjects(request);
-        statistics.incrementReadOps(1);
+        ObjectListing objects = listObjects(request);
+        List<DeleteObjectsRequest.KeyVersion> keys =
+            new ArrayList<>(objects.getObjectSummaries().size());
         while (true) {
           for (S3ObjectSummary summary : objects.getObjectSummaries()) {
             keys.add(new DeleteObjectsRequest.KeyVersion(summary.getKey()));
@@ -907,8 +1131,7 @@ public class S3AFileSystem extends FileSystem {
           }
 
           if (objects.isTruncated()) {
-            objects = s3.listNextBatchOfObjects(objects);
-            statistics.incrementReadOps(1);
+            objects = continueListObjects(objects);
           } else {
             if (!keys.isEmpty()) {
               removeKeys(keys, false);
@@ -919,13 +1142,11 @@ public class S3AFileSystem extends FileSystem {
       }
     } else {
       LOG.debug("delete: Path is a file");
-      s3.deleteObject(bucket, key);
       instrumentation.fileDeleted(1);
-      statistics.incrementWriteOps(1);
+      deleteObject(key);
     }
 
     createFakeDirectoryIfNecessary(f.getParent());
-
     return true;
   }
 
@@ -934,7 +1155,7 @@ public class S3AFileSystem extends FileSystem {
     String key = pathToKey(f);
     if (!key.isEmpty() && !exists(f)) {
       LOG.debug("Creating new fake directory at {}", f);
-      createFakeDirectory(bucket, key);
+      createFakeDirectory(key);
     }
   }
 
@@ -970,6 +1191,7 @@ public class S3AFileSystem extends FileSystem {
       IOException, AmazonClientException {
     String key = pathToKey(f);
     LOG.debug("List status for path: {}", f);
+    incrementStatistic(INVOCATION_LIST_STATUS);
 
     final List<FileStatus> result = new ArrayList<FileStatus>();
     final FileStatus fileStatus =  getFileStatus(f);
@@ -987,8 +1209,7 @@ public class S3AFileSystem extends FileSystem {
 
       LOG.debug("listStatus: doing listObjects for directory {}", key);
 
-      ObjectListing objects = s3.listObjects(request);
-      statistics.incrementReadOps(1);
+      ObjectListing objects = listObjects(request);
 
       Path fQualified = f.makeQualified(uri, workingDir);
 
@@ -999,33 +1220,25 @@ public class S3AFileSystem extends FileSystem {
           if (keyPath.equals(fQualified) ||
               summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) {
             LOG.debug("Ignoring: {}", keyPath);
-            continue;
-          }
-
-          if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) {
-            result.add(new S3AFileStatus(true, true, keyPath));
-            LOG.debug("Adding: fd: {}", keyPath);
           } else {
-            result.add(new S3AFileStatus(summary.getSize(),
-                dateToLong(summary.getLastModified()), keyPath,
-                getDefaultBlockSize(fQualified)));
-            LOG.debug("Adding: fi: {}", keyPath);
+            S3AFileStatus status = createFileStatus(keyPath, summary,
+                getDefaultBlockSize(keyPath));
+            result.add(status);
+            LOG.debug("Adding: {}", status);
           }
         }
 
         for (String prefix : objects.getCommonPrefixes()) {
           Path keyPath = keyToPath(prefix).makeQualified(uri, workingDir);
-          if (keyPath.equals(f)) {
-            continue;
+          if (!keyPath.equals(f)) {
+            result.add(new S3AFileStatus(true, false, keyPath));
+            LOG.debug("Adding: rd: {}", keyPath);
           }
-          result.add(new S3AFileStatus(true, false, keyPath));
-          LOG.debug("Adding: rd: {}", keyPath);
         }
 
         if (objects.isTruncated()) {
           LOG.debug("listStatus: list truncated - getting next batch");
-          objects = s3.listNextBatchOfObjects(objects);
-          statistics.incrementReadOps(1);
+          objects = continueListObjects(objects);
         } else {
           break;
         }
@@ -1038,8 +1251,6 @@ public class S3AFileSystem extends FileSystem {
     return result.toArray(new FileStatus[result.size()]);
   }
 
-
-
   /**
    * Set the current working directory for the given file system. All relative
    * paths will be resolved relative to it.
@@ -1061,7 +1272,7 @@ public class S3AFileSystem extends FileSystem {
   /**
    *
    * Make the given path and all non-existent parents into
-   * directories. Has the semantics of Unix @{code 'mkdir -p'}.
+   * directories. Has the semantics of Unix {@code 'mkdir -p'}.
    * Existence of the directory hierarchy is not an error.
    * @param path path to create
    * @param permission to apply to f
@@ -1096,7 +1307,7 @@ public class S3AFileSystem extends FileSystem {
   private boolean innerMkdirs(Path f, FsPermission permission)
       throws IOException, FileAlreadyExistsException, AmazonClientException {
     LOG.debug("Making directory: {}", f);
-
+    incrementStatistic(INVOCATION_MKDIRS);
     try {
       FileStatus fileStatus = getFileStatus(f);
 
@@ -1125,7 +1336,7 @@ public class S3AFileSystem extends FileSystem {
       } while (fPart != null);
 
       String key = pathToKey(f);
-      createFakeDirectory(bucket, key);
+      createFakeDirectory(key);
       return true;
     }
   }
@@ -1139,12 +1350,12 @@ public class S3AFileSystem extends FileSystem {
    */
   public S3AFileStatus getFileStatus(Path f) throws IOException {
     String key = pathToKey(f);
+    incrementStatistic(INVOCATION_GET_FILE_STATUS);
     LOG.debug("Getting path status for {}  ({})", f , key);
 
     if (!key.isEmpty()) {
       try {
-        ObjectMetadata meta = s3.getObjectMetadata(bucket, key);
-        statistics.incrementReadOps(1);
+        ObjectMetadata meta = getObjectMetadata(key);
 
         if (objectRepresentsDirectory(key, meta.getContentLength())) {
           LOG.debug("Found exact file: fake directory");
@@ -1169,8 +1380,7 @@ public class S3AFileSystem extends FileSystem {
       if (!key.endsWith("/")) {
         String newKey = key + "/";
         try {
-          ObjectMetadata meta = s3.getObjectMetadata(bucket, newKey);
-          statistics.incrementReadOps(1);
+          ObjectMetadata meta = getObjectMetadata(newKey);
 
           if (objectRepresentsDirectory(newKey, meta.getContentLength())) {
             LOG.debug("Found file (with /): fake directory");
@@ -1203,8 +1413,7 @@ public class S3AFileSystem extends FileSystem {
       request.setDelimiter("/");
       request.setMaxKeys(1);
 
-      ObjectListing objects = s3.listObjects(request);
-      statistics.incrementReadOps(1);
+      ObjectListing objects = listObjects(request);
 
       if (!objects.getCommonPrefixes().isEmpty()
           || !objects.getObjectSummaries().isEmpty()) {
@@ -1287,7 +1496,8 @@ public class S3AFileSystem extends FileSystem {
   private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite,
       Path src, Path dst)
       throws IOException, FileAlreadyExistsException, AmazonClientException {
-    String key = pathToKey(dst);
+    incrementStatistic(INVOCATION_COPY_FROM_LOCAL_FILE);
+    final String key = pathToKey(dst);
 
     if (!overwrite && exists(dst)) {
       throw new FileAlreadyExistsException(dst + " already exists");
@@ -1298,35 +1508,19 @@ public class S3AFileSystem extends FileSystem {
     LocalFileSystem local = getLocal(getConf());
     File srcfile = local.pathToFile(src);
 
-    final ObjectMetadata om = new ObjectMetadata();
-    if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
-      om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
-    }
-    PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, srcfile);
-    putObjectRequest.setCannedAcl(cannedACL);
-    putObjectRequest.setMetadata(om);
-
-    ProgressListener progressListener = new ProgressListener() {
-      public void progressChanged(ProgressEvent progressEvent) {
-        switch (progressEvent.getEventType()) {
-          case TRANSFER_PART_COMPLETED_EVENT:
-            statistics.incrementWriteOps(1);
-            break;
-          default:
-            break;
-        }
-      }
-    };
-
-    statistics.incrementWriteOps(1);
-    Upload up = transfers.upload(putObjectRequest);
-    up.addProgressListener(progressListener);
+    final ObjectMetadata om = newObjectMetadata();
+    PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, srcfile);
+    Upload up = putObject(putObjectRequest);
+    ProgressableProgressListener listener = new ProgressableProgressListener(
+        this, key, up, null);
+    up.addProgressListener(listener);
     try {
       up.waitForUploadResult();
     } catch (InterruptedException e) {
       throw new InterruptedIOException("Interrupted copying " + src
           + " to "  + dst + ", cancelling");
     }
+    listener.uploadCompleted();
 
     // This will delete unnecessary fake parent directories
     finishedWrite(key);
@@ -1375,7 +1569,7 @@ public class S3AFileSystem extends FileSystem {
     LOG.debug("copyFile {} -> {} ", srcKey, dstKey);
 
     try {
-      ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey);
+      ObjectMetadata srcom = getObjectMetadata(srcKey);
       ObjectMetadata dstom = cloneObjectMetadata(srcom);
       if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
         dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm);
@@ -1389,7 +1583,7 @@ public class S3AFileSystem extends FileSystem {
         public void progressChanged(ProgressEvent progressEvent) {
           switch (progressEvent.getEventType()) {
             case TRANSFER_PART_COMPLETED_EVENT:
-              statistics.incrementWriteOps(1);
+              incrementWriteOperations();
               break;
             default:
               break;
@@ -1401,7 +1595,7 @@ public class S3AFileSystem extends FileSystem {
       copy.addProgressListener(progressListener);
       try {
         copy.waitForCopyResult();
-        statistics.incrementWriteOps(1);
+        incrementWriteOperations();
         instrumentation.filesCopied(1, size);
       } catch (InterruptedException e) {
         throw new InterruptedIOException("Interrupted copying " + srcKey
@@ -1413,26 +1607,12 @@ public class S3AFileSystem extends FileSystem {
     }
   }
 
-  private boolean objectRepresentsDirectory(final String name, final long size) {
-    return !name.isEmpty()
-        && name.charAt(name.length() - 1) == '/'
-        && size == 0L;
-  }
-
-  // Handles null Dates that can be returned by AWS
-  private static long dateToLong(final Date date) {
-    if (date == null) {
-      return 0L;
-    }
-
-    return date.getTime();
-  }
-
   /**
    * Perform post-write actions.
    * @param key key written to
    */
   public void finishedWrite(String key) {
+    LOG.debug("Finished write to {}", key);
     deleteUnnecessaryFakeDirectories(keyToPath(key).getParent());
   }
 
@@ -1454,8 +1634,7 @@ public class S3AFileSystem extends FileSystem {
 
         if (status.isDirectory() && status.isEmptyDirectory()) {
           LOG.debug("Deleting fake directory {}/", key);
-          s3.deleteObject(bucket, key + "/");
-          statistics.incrementWriteOps(1);
+          deleteObject(key + "/");
         }
       } catch (IOException | AmazonClientException e) {
         LOG.debug("While deleting key {} ", key, e);
@@ -1471,18 +1650,20 @@ public class S3AFileSystem extends FileSystem {
   }
 
 
-  private void createFakeDirectory(final String bucketName, final String objectName)
-      throws AmazonClientException, AmazonServiceException {
+  private void createFakeDirectory(final String objectName)
+      throws AmazonClientException, AmazonServiceException,
+      InterruptedIOException {
     if (!objectName.endsWith("/")) {
-      createEmptyObject(bucketName, objectName + "/");
+      createEmptyObject(objectName + "/");
     } else {
-      createEmptyObject(bucketName, objectName);
+      createEmptyObject(objectName);
     }
   }
 
   // Used to create an empty file that represents an empty directory
-  private void createEmptyObject(final String bucketName, final String objectName)
-      throws AmazonClientException, AmazonServiceException {
+  private void createEmptyObject(final String objectName)
+      throws AmazonClientException, AmazonServiceException,
+      InterruptedIOException {
     final InputStream im = new InputStream() {
       @Override
       public int read() throws IOException {
@@ -1490,16 +1671,16 @@ public class S3AFileSystem extends FileSystem {
       }
     };
 
-    final ObjectMetadata om = new ObjectMetadata();
-    om.setContentLength(0L);
-    if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
-      om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
+    PutObjectRequest putObjectRequest = newPutObjectRequest(objectName,
+        newObjectMetadata(0L),
+        im);
+    Upload upload = putObject(putObjectRequest);
+    try {
+      upload.waitForUploadResult();
+    } catch (InterruptedException e) {
+      throw new InterruptedIOException("Interrupted creating " + objectName);
     }
-    PutObjectRequest putObjectRequest =
-        new PutObjectRequest(bucketName, objectName, im, om);
-    putObjectRequest.setCannedAcl(cannedACL);
-    s3.putObject(putObjectRequest);
-    statistics.incrementWriteOps(1);
+    incrementPutProgressStatistics(objectName, 0);
     instrumentation.directoryCreated();
   }
 
@@ -1514,10 +1695,7 @@ public class S3AFileSystem extends FileSystem {
     // This approach may be too brittle, especially if
     // in future there are new attributes added to ObjectMetadata
     // that we do not explicitly call to set here
-    ObjectMetadata ret = new ObjectMetadata();
-
-    // Non null attributes
-    ret.setContentLength(source.getContentLength());
+    ObjectMetadata ret = newObjectMetadata(source.getContentLength());
 
     // Possibly null attributes
     // Allowing nulls to pass breaks it during later use
@@ -1626,6 +1804,75 @@ public class S3AFileSystem extends FileSystem {
     return multiPartThreshold;
   }
 
+  /**
+   * Override superclass so as to add statistic collection.
+   * {@inheritDoc}
+   */
+  @Override
+  public FileStatus[] globStatus(Path pathPattern) throws IOException {
+    incrementStatistic(INVOCATION_GLOB_STATUS);
+    return super.globStatus(pathPattern);
+  }
+
+  /**
+   * Override superclass so as to add statistic collection.
+   * {@inheritDoc}
+   */
+  @Override
+  public FileStatus[] globStatus(Path pathPattern, PathFilter filter)
+      throws IOException {
+    incrementStatistic(INVOCATION_GLOB_STATUS);
+    return super.globStatus(pathPattern, filter);
+  }
+
+  /**
+   * Override superclass so as to add statistic collection.
+   * {@inheritDoc}
+   */
+  @Override
+  public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
+      throws FileNotFoundException, IOException {
+    incrementStatistic(INVOCATION_LIST_LOCATED_STATUS);
+    return super.listLocatedStatus(f);
+  }
+
+  @Override
+  public RemoteIterator<LocatedFileStatus> listFiles(Path f,
+      boolean recursive) throws FileNotFoundException, IOException {
+    incrementStatistic(INVOCATION_LIST_FILES);
+    return super.listFiles(f, recursive);
+  }
+
+  /**
+   * Override superclass so as to add statistic collection.
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean exists(Path f) throws IOException {
+    incrementStatistic(INVOCATION_EXISTS);
+    return super.exists(f);
+  }
+
+  /**
+   * Override superclass so as to add statistic collection.
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean isDirectory(Path f) throws IOException {
+    incrementStatistic(INVOCATION_IS_DIRECTORY);
+    return super.isDirectory(f);
+  }
+
+  /**
+   * Override superclass so as to add statistic collection.
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean isFile(Path f) throws IOException {
+    incrementStatistic(INVOCATION_IS_FILE);
+    return super.isFile(f);
+  }
+
   /**
    * Get a integer option >= the minimum allowed value.
    * @param conf configuration
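
To see the new counters from the caller's side, here is a minimal, illustrative sketch. It assumes S3AFileSystem exposes its S3AStorageStatistics through FileSystem#getStorageStatistics() (that part of the change is not visible in the hunks above); the bucket name and path are invented.

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.StorageStatistics;

    public class S3AStatsProbe {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new URI("s3a://example-bucket/"),
            new Configuration());
        fs.exists(new Path("/probe"));   // increments invocations_exists
        StorageStatistics stats = fs.getStorageStatistics();
        // Expected to print 1 if the S3A statistics are wired up as assumed.
        System.out.println("exists() calls: "
            + stats.getLong("invocations_exists"));
        fs.close();
      }
    }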

+ 145 - 73
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.fs.s3a;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.metrics2.MetricStringBuilder;
@@ -26,49 +27,30 @@ import org.apache.hadoop.metrics2.lib.Interns;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+import org.apache.hadoop.metrics2.lib.MutableMetric;
 
 import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 
+import static org.apache.hadoop.fs.s3a.Statistic.*;
+
 /**
  * Instrumentation of S3a.
- * Derived from the {@code AzureFileSystemInstrumentation}
+ * Derived from the {@code AzureFileSystemInstrumentation}.
+ *
+ * Counters and metrics are generally addressed in code by their name or
+ * {@link Statistic} key. There <i>may</i> be some Statistics which do
+ * not have an entry here. To avoid attempts to access such counters failing,
+ * the operations to increment/query metric values are designed to handle
+ * lookup failures.
  */
 @Metrics(about = "Metrics for S3a", context = "S3AFileSystem")
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public class S3AInstrumentation {
   public static final String CONTEXT = "S3AFileSystem";
-
-  public static final String STREAM_OPENED = "streamOpened";
-  public static final String STREAM_CLOSE_OPERATIONS = "streamCloseOperations";
-  public static final String STREAM_CLOSED = "streamClosed";
-  public static final String STREAM_ABORTED = "streamAborted";
-  public static final String STREAM_READ_EXCEPTIONS = "streamReadExceptions";
-  public static final String STREAM_SEEK_OPERATIONS = "streamSeekOperations";
-  public static final String STREAM_FORWARD_SEEK_OPERATIONS
-      = "streamForwardSeekOperations";
-  public static final String STREAM_BACKWARD_SEEK_OPERATIONS
-      = "streamBackwardSeekOperations";
-  public static final String STREAM_SEEK_BYTES_SKIPPED =
-      "streamBytesSkippedOnSeek";
-  public static final String STREAM_SEEK_BYTES_BACKWARDS =
-      "streamBytesBackwardsOnSeek";
-  public static final String STREAM_SEEK_BYTES_READ = "streamBytesRead";
-  public static final String STREAM_READ_OPERATIONS = "streamReadOperations";
-  public static final String STREAM_READ_FULLY_OPERATIONS
-      = "streamReadFullyOperations";
-  public static final String STREAM_READ_OPERATIONS_INCOMPLETE
-      = "streamReadOperationsIncomplete";
-  public static final String FILES_CREATED = "files_created";
-  public static final String FILES_COPIED = "files_copied";
-  public static final String FILES_COPIED_BYTES = "files_copied_bytes";
-  public static final String FILES_DELETED = "files_deleted";
-  public static final String DIRECTORIES_CREATED = "directories_created";
-  public static final String DIRECTORIES_DELETED = "directories_deleted";
-  public static final String IGNORED_ERRORS = "ignored_errors";
   private final MetricsRegistry registry =
       new MetricsRegistry("S3AFileSystem").setContext(CONTEXT);
   private final MutableCounterLong streamOpenOperations;
@@ -95,6 +77,27 @@ public class S3AInstrumentation {
   private final MutableCounterLong numberOfDirectoriesDeleted;
   private final Map<String, MutableCounterLong> streamMetrics = new HashMap<>();
 
+  private static final Statistic[] COUNTERS_TO_CREATE = {
+      INVOCATION_COPY_FROM_LOCAL_FILE,
+      INVOCATION_EXISTS,
+      INVOCATION_GET_FILE_STATUS,
+      INVOCATION_GLOB_STATUS,
+      INVOCATION_IS_DIRECTORY,
+      INVOCATION_IS_FILE,
+      INVOCATION_LIST_FILES,
+      INVOCATION_LIST_LOCATED_STATUS,
+      INVOCATION_LIST_STATUS,
+      INVOCATION_MKDIRS,
+      INVOCATION_RENAME,
+      OBJECT_COPY_REQUESTS,
+      OBJECT_DELETE_REQUESTS,
+      OBJECT_LIST_REQUESTS,
+      OBJECT_METADATA_REQUESTS,
+      OBJECT_MULTIPART_UPLOAD_ABORTED,
+      OBJECT_PUT_BYTES,
+      OBJECT_PUT_REQUESTS
+  };
+
   public S3AInstrumentation(URI name) {
     UUID fileSystemInstanceId = UUID.randomUUID();
     registry.tag("FileSystemId",
@@ -103,50 +106,35 @@ public class S3AInstrumentation {
     registry.tag("fsURI",
         "URI of this filesystem",
         name.toString());
-    streamOpenOperations = streamCounter(STREAM_OPENED,
-        "Total count of times an input stream to object store was opened");
-    streamCloseOperations = streamCounter(STREAM_CLOSE_OPERATIONS,
-        "Total count of times an attempt to close a data stream was made");
-    streamClosed = streamCounter(STREAM_CLOSED,
-        "Count of times the TCP stream was closed");
-    streamAborted = streamCounter(STREAM_ABORTED,
-        "Count of times the TCP stream was aborted");
-    streamSeekOperations = streamCounter(STREAM_SEEK_OPERATIONS,
-        "Number of seek operations invoked on input streams");
-    streamReadExceptions = streamCounter(STREAM_READ_EXCEPTIONS,
-        "Number of read exceptions caught and attempted to recovered from");
-    streamForwardSeekOperations = streamCounter(STREAM_FORWARD_SEEK_OPERATIONS,
-        "Number of executed seek operations which went forward in a stream");
-    streamBackwardSeekOperations = streamCounter(
-        STREAM_BACKWARD_SEEK_OPERATIONS,
-        "Number of executed seek operations which went backwards in a stream");
-    streamBytesSkippedOnSeek = streamCounter(STREAM_SEEK_BYTES_SKIPPED,
-        "Count of bytes skipped during forward seek operations");
-    streamBytesBackwardsOnSeek = streamCounter(STREAM_SEEK_BYTES_BACKWARDS,
-        "Count of bytes moved backwards during seek operations");
-    streamBytesRead = streamCounter(STREAM_SEEK_BYTES_READ,
-        "Count of bytes read during seek() in stream operations");
-    streamReadOperations = streamCounter(STREAM_READ_OPERATIONS,
-        "Count of read() operations in streams");
-    streamReadFullyOperations = streamCounter(STREAM_READ_FULLY_OPERATIONS,
-        "Count of readFully() operations in streams");
-    streamReadsIncomplete = streamCounter(STREAM_READ_OPERATIONS_INCOMPLETE,
-        "Count of incomplete read() operations in streams");
-
-    numberOfFilesCreated = counter(FILES_CREATED,
-            "Total number of files created through the object store.");
-    numberOfFilesCopied = counter(FILES_COPIED,
-            "Total number of files copied within the object store.");
-    bytesOfFilesCopied = counter(FILES_COPIED_BYTES,
-            "Total number of bytes copied within the object store.");
-    numberOfFilesDeleted = counter(FILES_DELETED,
-            "Total number of files deleted through from the object store.");
-    numberOfDirectoriesCreated = counter(DIRECTORIES_CREATED,
-        "Total number of directories created through the object store.");
-    numberOfDirectoriesDeleted = counter(DIRECTORIES_DELETED,
-        "Total number of directories deleted through the object store.");
-    ignoredErrors = counter(IGNORED_ERRORS,
-        "Total number of errors caught and ingored.");
+    streamOpenOperations = streamCounter(STREAM_OPENED);
+    streamCloseOperations = streamCounter(STREAM_CLOSE_OPERATIONS);
+    streamClosed = streamCounter(STREAM_CLOSED);
+    streamAborted = streamCounter(STREAM_ABORTED);
+    streamSeekOperations = streamCounter(STREAM_SEEK_OPERATIONS);
+    streamReadExceptions = streamCounter(STREAM_READ_EXCEPTIONS);
+    streamForwardSeekOperations =
+        streamCounter(STREAM_FORWARD_SEEK_OPERATIONS);
+    streamBackwardSeekOperations =
+        streamCounter(STREAM_BACKWARD_SEEK_OPERATIONS);
+    streamBytesSkippedOnSeek = streamCounter(STREAM_SEEK_BYTES_SKIPPED);
+    streamBytesBackwardsOnSeek =
+        streamCounter(STREAM_SEEK_BYTES_BACKWARDS);
+    streamBytesRead = streamCounter(STREAM_SEEK_BYTES_READ);
+    streamReadOperations = streamCounter(STREAM_READ_OPERATIONS);
+    streamReadFullyOperations =
+        streamCounter(STREAM_READ_FULLY_OPERATIONS);
+    streamReadsIncomplete =
+        streamCounter(STREAM_READ_OPERATIONS_INCOMPLETE);
+    numberOfFilesCreated = counter(FILES_CREATED);
+    numberOfFilesCopied = counter(FILES_COPIED);
+    bytesOfFilesCopied = counter(FILES_COPIED_BYTES);
+    numberOfFilesDeleted = counter(FILES_DELETED);
+    numberOfDirectoriesCreated = counter(DIRECTORIES_CREATED);
+    numberOfDirectoriesDeleted = counter(DIRECTORIES_DELETED);
+    ignoredErrors = counter(IGNORED_ERRORS);
+    for (Statistic statistic : COUNTERS_TO_CREATE) {
+      counter(statistic);
+    }
   }
 
   /**
@@ -173,6 +161,25 @@ public class S3AInstrumentation {
     return counter;
   }
 
+  /**
+   * Create a counter in the registry.
+   * @param op statistic to count
+   * @return a new counter
+   */
+  protected final MutableCounterLong counter(Statistic op) {
+    return counter(op.getSymbol(), op.getDescription());
+  }
+
+  /**
+   * Create a counter in the stream map: these are unregistered in the public
+   * metrics.
+   * @param op statistic to count
+   * @return a new counter
+   */
+  protected final MutableCounterLong streamCounter(Statistic op) {
+    return streamCounter(op.getSymbol(), op.getDescription());
+  }
+
   /**
    * Create a gauge in the registry.
    * @param name name gauge name
@@ -215,6 +222,58 @@ public class S3AInstrumentation {
     return metricBuilder.toString();
   }
 
+  /**
+   * Get the value of a counter.
+   * @param statistic the operation
+   * @return its value, or 0 if not found.
+   */
+  public long getCounterValue(Statistic statistic) {
+    return getCounterValue(statistic.getSymbol());
+  }
+
+  /**
+   * Get the value of a counter.
+   * If the counter is null, return 0.
+   * @param name the name of the counter
+   * @return its value.
+   */
+  public long getCounterValue(String name) {
+    MutableCounterLong counter = lookupCounter(name);
+    return counter == null ? 0 : counter.value();
+  }
+
+  /**
+   * Lookup a counter by name. Return null if it is not known.
+   * @param name counter name
+   * @return the counter
+   */
+  private MutableCounterLong lookupCounter(String name) {
+    MutableMetric metric = lookupMetric(name);
+    if (metric == null) {
+      return null;
+    }
+    Preconditions.checkNotNull(metric, "not found: " + name);
+    if (!(metric instanceof MutableCounterLong)) {
+      throw new IllegalStateException("Metric " + name
+          + " is not a MutableCounterLong: " + metric);
+    }
+    return (MutableCounterLong) metric;
+  }
+
+  /**
+   * Look up a metric from both the registered set and the lighter weight
+   * stream entries.
+   * @param name metric name
+   * @return the metric or null
+   */
+  public MutableMetric lookupMetric(String name) {
+    MutableMetric metric = getRegistry().get(name);
+    if (metric == null) {
+      metric = streamMetrics.get(name);
+    }
+    return metric;
+  }
+
   /**
    * Indicate that S3A created a file.
    */
@@ -262,6 +321,19 @@ public class S3AInstrumentation {
     ignoredErrors.incr();
   }
 
+  /**
+   * Increment a specific counter.
+   * No-op if not defined.
+   * @param op operation
+   * @param count increment value
+   */
+  public void incrementCounter(Statistic op, long count) {
+    MutableCounterLong counter = lookupCounter(op.getSymbol());
+    if (counter != null) {
+      counter.incr(count);
+    }
+  }
+
   /**
    * Create a stream input statistics instance.
    * @return the new instance
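
As a standalone sketch of the Statistic-keyed counter API added above (the URI is arbitrary; the values in comments follow from the calls made here):

    import java.net.URI;
    import org.apache.hadoop.fs.s3a.S3AInstrumentation;
    import org.apache.hadoop.fs.s3a.Statistic;

    public class InstrumentationSketch {
      public static void main(String[] args) throws Exception {
        S3AInstrumentation instrumentation =
            new S3AInstrumentation(new URI("s3a://example-bucket/"));

        // FILES_CREATED is registered in the constructor, so the lookup resolves.
        instrumentation.incrementCounter(Statistic.FILES_CREATED, 1);
        System.out.println(
            instrumentation.getCounterValue(Statistic.FILES_CREATED)); // 1

        // Stream statistics live in the unregistered stream map;
        // lookupMetric() checks both maps, so they resolve the same way.
        System.out.println(
            instrumentation.getCounterValue(Statistic.STREAM_OPENED)); // 0
      }
    }

A statistic with no counter anywhere is simply skipped by incrementCounter() and reported as 0 by getCounterValue(), which is the lookup-failure handling the class javadoc describes.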

+ 11 - 87
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java

@@ -19,19 +19,11 @@
 package org.apache.hadoop.fs.s3a;
 
 import com.amazonaws.AmazonClientException;
-import com.amazonaws.event.ProgressEvent;
-import com.amazonaws.event.ProgressEventType;
-import com.amazonaws.event.ProgressListener;
-import com.amazonaws.services.s3.model.CannedAccessControlList;
 import com.amazonaws.services.s3.model.ObjectMetadata;
-import com.amazonaws.services.s3.model.PutObjectRequest;
-import com.amazonaws.services.s3.transfer.TransferManager;
 import com.amazonaws.services.s3.transfer.Upload;
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.util.Progressable;
 
@@ -44,8 +36,6 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.OutputStream;
 
-import static com.amazonaws.event.ProgressEventType.TRANSFER_COMPLETED_EVENT;
-import static com.amazonaws.event.ProgressEventType.TRANSFER_PART_STARTED_EVENT;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 
@@ -59,32 +49,20 @@ public class S3AOutputStream extends OutputStream {
   private File backupFile;
   private boolean closed;
   private String key;
-  private String bucket;
-  private TransferManager transfers;
   private Progressable progress;
   private long partSize;
   private long partSizeThreshold;
   private S3AFileSystem fs;
-  private CannedAccessControlList cannedACL;
-  private FileSystem.Statistics statistics;
   private LocalDirAllocator lDirAlloc;
-  private String serverSideEncryptionAlgorithm;
 
   public static final Logger LOG = S3AFileSystem.LOG;
 
-  public S3AOutputStream(Configuration conf, TransferManager transfers,
-      S3AFileSystem fs, String bucket, String key, Progressable progress,
-      CannedAccessControlList cannedACL, FileSystem.Statistics statistics,
-      String serverSideEncryptionAlgorithm)
+  public S3AOutputStream(Configuration conf,
+      S3AFileSystem fs, String key, Progressable progress)
       throws IOException {
-    this.bucket = bucket;
     this.key = key;
-    this.transfers = transfers;
     this.progress = progress;
     this.fs = fs;
-    this.cannedACL = cannedACL;
-    this.statistics = statistics;
-    this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
 
     partSize = fs.getPartitionSize();
     partSizeThreshold = fs.getMultiPartThreshold();
@@ -124,30 +102,18 @@ public class S3AOutputStream extends OutputStream {
 
 
     try {
-      final ObjectMetadata om = new ObjectMetadata();
-      if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
-        om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
-      }
-      PutObjectRequest putObjectRequest =
-          new PutObjectRequest(bucket, key, backupFile);
-      putObjectRequest.setCannedAcl(cannedACL);
-      putObjectRequest.setMetadata(om);
-
-      Upload upload = transfers.upload(putObjectRequest);
-
-      ProgressableProgressListener listener = 
-          new ProgressableProgressListener(upload, progress, statistics);
+      final ObjectMetadata om = fs.newObjectMetadata();
+      Upload upload = fs.putObject(
+          fs.newPutObjectRequest(
+              key,
+              om,
+              backupFile));
+      ProgressableProgressListener listener =
+          new ProgressableProgressListener(fs, key, upload, progress);
       upload.addProgressListener(listener);
 
       upload.waitForUploadResult();
-
-      long delta = upload.getProgress().getBytesTransferred() -
-          listener.getLastBytesTransferred();
-      if (statistics != null && delta != 0) {
-        LOG.debug("S3A write delta changed after finished: {} bytes", delta);
-        statistics.incrementBytesWritten(delta);
-      }
-
+      listener.uploadCompleted();
       // This will delete unnecessary fake parent directories
       fs.finishedWrite(key);
     } catch (InterruptedException e) {
@@ -175,46 +141,4 @@ public class S3AOutputStream extends OutputStream {
     backupStream.write(b, off, len);
   }
 
-  /**
-   * Listener to progress from AWS regarding transfers.
-   */
-  public static class ProgressableProgressListener implements ProgressListener {
-    private Progressable progress;
-    private FileSystem.Statistics statistics;
-    private long lastBytesTransferred;
-    private Upload upload;
-
-    public ProgressableProgressListener(Upload upload, Progressable progress, 
-        FileSystem.Statistics statistics) {
-      this.upload = upload;
-      this.progress = progress;
-      this.statistics = statistics;
-      this.lastBytesTransferred = 0;
-    }
-
-    public void progressChanged(ProgressEvent progressEvent) {
-      if (progress != null) {
-        progress.progress();
-      }
-
-      // There are 3 http ops here, but this should be close enough for now
-      ProgressEventType pet = progressEvent.getEventType();
-      if (pet == TRANSFER_PART_STARTED_EVENT ||
-          pet == TRANSFER_COMPLETED_EVENT) {
-        statistics.incrementWriteOps(1);
-      }
-
-      long transferred = upload.getProgress().getBytesTransferred();
-      long delta = transferred - lastBytesTransferred;
-      if (statistics != null && delta != 0) {
-        statistics.incrementBytesWritten(delta);
-      }
-
-      lastBytesTransferred = transferred;
-    }
-
-    public long getLastBytesTransferred() {
-      return lastBytesTransferred;
-    }
-  }
 }

+ 104 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AStorageStatistics.java

@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.StorageStatistics;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Storage statistics for S3A.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class S3AStorageStatistics extends StorageStatistics {
+  private static final Logger LOG = S3AFileSystem.LOG;
+
+  public static final String NAME = "S3AStorageStatistics";
+  private final Map<Statistic, AtomicLong> opsCount =
+      new EnumMap<>(Statistic.class);
+
+  public S3AStorageStatistics() {
+    super(NAME);
+    for (Statistic opType : Statistic.values()) {
+      opsCount.put(opType, new AtomicLong(0));
+    }
+  }
+
+  /**
+   * Increment a specific counter.
+   * @param op operation
+   * @param count increment value
+   * @return the new value
+   */
+  public long incrementCounter(Statistic op, long count) {
+    long updated = opsCount.get(op).addAndGet(count);
+    LOG.debug("{} += {}  ->  {}", op, count, updated);
+    return updated;
+  }
+
+  private class LongIterator implements Iterator<LongStatistic> {
+    private Iterator<Map.Entry<Statistic, AtomicLong>> iterator =
+        Collections.unmodifiableSet(opsCount.entrySet()).iterator();
+
+    @Override
+    public boolean hasNext() {
+      return iterator.hasNext();
+    }
+
+    @Override
+    public LongStatistic next() {
+      if (!iterator.hasNext()) {
+        throw new NoSuchElementException();
+      }
+      final Map.Entry<Statistic, AtomicLong> entry = iterator.next();
+      return new LongStatistic(entry.getKey().name(), entry.getValue().get());
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException();
+    }
+  }
+
+  @Override
+  public Iterator<LongStatistic> getLongStatistics() {
+    return new LongIterator();
+  }
+
+  @Override
+  public Long getLong(String key) {
+    final Statistic type = Statistic.fromSymbol(key);
+    return type == null ? null : opsCount.get(type).get();
+  }
+
+  @Override
+  public boolean isTracked(String key) {
+    return Statistic.fromSymbol(key) != null;
+  }
+
+}
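
A minimal sketch, using only the class as shown, of incrementing, enumerating and looking up the statistics (the counts are illustrative):

    import java.util.Iterator;
    import org.apache.hadoop.fs.StorageStatistics.LongStatistic;
    import org.apache.hadoop.fs.s3a.S3AStorageStatistics;
    import org.apache.hadoop.fs.s3a.Statistic;

    public class StorageStatisticsSketch {
      public static void main(String[] args) {
        S3AStorageStatistics stats = new S3AStorageStatistics();
        stats.incrementCounter(Statistic.OBJECT_PUT_REQUESTS, 3);

        // Iteration reports every Statistic by its enum name.
        Iterator<LongStatistic> it = stats.getLongStatistics();
        while (it.hasNext()) {
          LongStatistic s = it.next();
          if (s.getValue() > 0) {
            System.out.println(s.getName() + " = " + s.getValue());
          }
        }

        // Lookup by symbol; unknown keys yield null rather than an exception.
        System.out.println(stats.getLong("object_put_requests"));  // 3
        System.out.println(stats.getLong("no_such_statistic"));    // null
      }
    }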

+ 48 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.fs.s3a;
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.s3.model.AmazonS3Exception;
+import com.amazonaws.services.s3.model.S3ObjectSummary;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.Path;
@@ -29,6 +30,7 @@ import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.nio.file.AccessDeniedException;
+import java.util.Date;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 
@@ -186,4 +188,50 @@ public final class S3AUtils {
     }
     return builder.toString();
   }
+
+  /**
+   * Create a file status instance from a listing.
+   * @param keyPath path to entry
+   * @param summary summary from AWS
+   * @param blockSize block size to declare.
+   * @return a status entry
+   */
+  public static S3AFileStatus createFileStatus(Path keyPath,
+      S3ObjectSummary summary,
+      long blockSize) {
+    if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) {
+      return new S3AFileStatus(true, true, keyPath);
+    } else {
+      return new S3AFileStatus(summary.getSize(),
+          dateToLong(summary.getLastModified()), keyPath,
+          blockSize);
+    }
+  }
+
+  /**
+   * Predicate: does the object represent a directory?
+   * @param name object name
+   * @param size object size
+   * @return true if it meets the criteria for being a directory marker
+   */
+  public static boolean objectRepresentsDirectory(final String name,
+      final long size) {
+    return !name.isEmpty()
+        && name.charAt(name.length() - 1) == '/'
+        && size == 0L;
+  }
+
+  /**
+   * Date to long conversion.
+   * Handles null Dates that can be returned by AWS by returning 0.
+   * @param date date from AWS query
+   * @return timestamp of the object
+   */
+  public static long dateToLong(final Date date) {
+    if (date == null) {
+      return 0L;
+    }
+
+    return date.getTime();
+  }
 }
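
To make the directory-marker convention concrete, a quick illustration of the new helpers; the object keys are invented for the example:

    import org.apache.hadoop.fs.s3a.S3AUtils;

    public class S3AUtilsSketch {
      public static void main(String[] args) {
        // A zero-byte object whose key ends in "/" stands in for a directory.
        System.out.println(S3AUtils.objectRepresentsDirectory("data/logs/", 0L));       // true
        System.out.println(S3AUtils.objectRepresentsDirectory("data/logs/x.txt", 42L)); // false
        System.out.println(S3AUtils.objectRepresentsDirectory("data/logs/", 5L));       // false

        // dateToLong() tolerates the null Date AWS can return.
        System.out.println(S3AUtils.dateToLong(null));  // 0
      }
    }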

+ 143 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java

@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+/**
+ * Statistics which are collected in S3A.
+ * These statistics are available at a low level in {@link S3AStorageStatistics}
+ * and as metrics in {@link S3AInstrumentation}.
+ */
+public enum Statistic {
+
+  DIRECTORIES_CREATED("directories_created",
+      "Total number of directories created through the object store."),
+  DIRECTORIES_DELETED("directories_deleted",
+      "Total number of directories deleted through the object store."),
+  FILES_COPIED("files_copied",
+      "Total number of files copied within the object store."),
+  FILES_COPIED_BYTES("files_copied_bytes",
+      "Total number of bytes copied within the object store."),
+  FILES_CREATED("files_created",
+      "Total number of files created through the object store."),
+  FILES_DELETED("files_deleted",
+      "Total number of files deleted from the object store."),
+  IGNORED_ERRORS("ignored_errors", "Errors caught and ignored"),
+  INVOCATION_COPY_FROM_LOCAL_FILE("invocations_copyfromlocalfile",
+      "Calls of copyFromLocalFile()"),
+  INVOCATION_EXISTS("invocations_exists",
+      "Calls of exists()"),
+  INVOCATION_GET_FILE_STATUS("invocations_getfilestatus",
+      "Calls of getFileStatus()"),
+  INVOCATION_GLOB_STATUS("invocations_globstatus",
+      "Calls of globStatus()"),
+  INVOCATION_IS_DIRECTORY("invocations_is_directory",
+      "Calls of isDirectory()"),
+  INVOCATION_IS_FILE("invocations_is_file",
+      "Calls of isFile()"),
+  INVOCATION_LIST_FILES("invocations_listfiles",
+      "Calls of listFiles()"),
+  INVOCATION_LIST_LOCATED_STATUS("invocations_listlocatedstatus",
+      "Calls of listLocatedStatus()"),
+  INVOCATION_LIST_STATUS("invocations_liststatus",
+      "Calls of listStatus()"),
+  INVOCATION_MKDIRS("invocations_mkdirs",
+      "Calls of mkdirs()"),
+  INVOCATION_RENAME("invocations_rename",
+      "Calls of rename()"),
+  OBJECT_COPY_REQUESTS("object_copy_requests", "Object copy requests"),
+  OBJECT_DELETE_REQUESTS("object_delete_requests", "Object delete requests"),
+  OBJECT_LIST_REQUESTS("object_list_requests",
+      "Number of object listings made"),
+  OBJECT_METADATA_REQUESTS("object_metadata_requests",
+      "Number of requests for object metadata"),
+  OBJECT_MULTIPART_UPLOAD_ABORTED("object_multipart_aborted",
+      "Object multipart upload aborted"),
+  OBJECT_PUT_REQUESTS("object_put_requests",
+      "Object put/multipart upload count"),
+  OBJECT_PUT_BYTES("object_put_bytes", "Number of bytes uploaded"),
+  STREAM_ABORTED("streamAborted",
+      "Count of times the TCP stream was aborted"),
+  STREAM_BACKWARD_SEEK_OPERATIONS("streamBackwardSeekOperations",
+      "Number of executed seek operations which went backwards in a stream"),
+  STREAM_CLOSED("streamClosed", "Count of times the TCP stream was closed"),
+  STREAM_CLOSE_OPERATIONS("streamCloseOperations",
+      "Total count of times an attempt to close a data stream was made"),
+  STREAM_FORWARD_SEEK_OPERATIONS("streamForwardSeekOperations",
+      "Number of executed seek operations which went forward in a stream"),
+  STREAM_OPENED("streamOpened",
+      "Total count of times an input stream to object store was opened"),
+  STREAM_READ_EXCEPTIONS("streamReadExceptions",
+      "Number of read exceptions caught and attempted to recover from"),
+  STREAM_READ_FULLY_OPERATIONS("streamReadFullyOperations",
+      "Count of readFully() operations in streams"),
+  STREAM_READ_OPERATIONS("streamReadOperations",
+      "Count of read() operations in streams"),
+  STREAM_READ_OPERATIONS_INCOMPLETE("streamReadOperationsIncomplete",
+      "Count of incomplete read() operations in streams"),
+  STREAM_SEEK_BYTES_BACKWARDS("streamBytesBackwardsOnSeek",
+      "Count of bytes moved backwards during seek operations"),
+  STREAM_SEEK_BYTES_READ("streamBytesRead",
+      "Count of bytes read during seek() in stream operations"),
+  STREAM_SEEK_BYTES_SKIPPED("streamBytesSkippedOnSeek",
+      "Count of bytes skipped during forward seek operations"),
+  STREAM_SEEK_OPERATIONS("streamSeekOperations",
+      "Number of seek operations invoked on input streams");
+
+  Statistic(String symbol, String description) {
+    this.symbol = symbol;
+    this.description = description;
+  }
+
+  private final String symbol;
+  private final String description;
+
+  public String getSymbol() {
+    return symbol;
+  }
+
+  /**
+   * Get a statistic from a symbol.
+   * @param symbol statistic to look up
+   * @return the value or null.
+   */
+  public static Statistic fromSymbol(String symbol) {
+    if (symbol != null) {
+      for (Statistic opType : values()) {
+        if (opType.getSymbol().equals(symbol)) {
+          return opType;
+        }
+      }
+    }
+    return null;
+  }
+
+  public String getDescription() {
+    return description;
+  }
+
+  /**
+   * The string value is simply the symbol.
+   * This makes this operation very low cost.
+   * @return the symbol of this statistic.
+   */
+  @Override
+  public String toString() {
+    return symbol;
+  }
+}
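
A tiny sketch of the symbol round trip; note that the lookup key is the symbol, not the enum constant's name:

    import org.apache.hadoop.fs.s3a.Statistic;

    public class StatisticSketch {
      public static void main(String[] args) {
        Statistic s = Statistic.fromSymbol("object_list_requests");
        System.out.println(s == Statistic.OBJECT_LIST_REQUESTS);    // true
        System.out.println(s.getSymbol());                           // object_list_requests
        System.out.println(s);                                       // toString() is the symbol
        System.out.println(Statistic.fromSymbol("unknown_symbol"));  // null
      }
    }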

+ 11 - 1
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md

@@ -735,9 +735,19 @@ The exact number of operations to perform is configurable in the option
 Larger values generate more load, and are recommended when testing locally,
 or in batch runs.
 
-Smaller values should result in faster test runs, especially when the object
+Smaller values result in faster test runs, especially when the object
 store is a long way away.
 
+Operations which work on directories have a separate option: this controls
+the width and depth of the directory trees created by the tests. Larger
+values create exponentially more directories, with consequent performance
+impact.
+
+      <property>
+        <name>scale.test.directory.count</name>
+        <value>2</value>
+      </property>
+
 DistCp tests targeting S3A support a configurable file size.  The default is
 10 MB, but the configuration value is expressed in KB so that it can be tuned
 smaller to achieve faster test runs.

+ 153 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java

@@ -21,7 +21,9 @@ package org.apache.hadoop.fs.s3a;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
+import org.junit.Assert;
 import org.junit.internal.AssumptionViolatedException;
+import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.net.URI;
@@ -190,4 +192,155 @@ public class S3ATestUtils {
     }
   }
 
+  /**
+   * Reset all metrics in a list.
+   * @param metrics metrics to reset
+   */
+  public static void reset(S3ATestUtils.MetricDiff... metrics) {
+    for (S3ATestUtils.MetricDiff metric : metrics) {
+      metric.reset();
+    }
+  }
+
+  /**
+   * Print all metrics in a list.
+   * @param log log to print the metrics to.
+   * @param metrics metrics to process
+   */
+  public static void print(Logger log, S3ATestUtils.MetricDiff... metrics) {
+    for (S3ATestUtils.MetricDiff metric : metrics) {
+      log.info(metric.toString());
+    }
+  }
+
+  /**
+   * Print all metrics in a list, then reset them.
+   * @param log log to print the metrics to.
+   * @param metrics metrics to process
+   */
+  public static void printThenReset(Logger log,
+      S3ATestUtils.MetricDiff... metrics) {
+    print(log, metrics);
+    reset(metrics);
+  }
+
+  /**
+   * Helper class to do diffs of metrics.
+   */
+  public static final class MetricDiff {
+    private final S3AFileSystem fs;
+    private final Statistic statistic;
+    private long startingValue;
+
+    /**
+     * Constructor.
+     * Invokes {@link #reset()} so it is immediately capable of measuring the
+     * difference in metric values.
+     *
+     * @param fs the filesystem to monitor
+     * @param statistic the statistic to monitor.
+     */
+    public MetricDiff(S3AFileSystem fs, Statistic statistic) {
+      this.fs = fs;
+      this.statistic = statistic;
+      reset();
+    }
+
+    /**
+     * Reset the starting value to the current value.
+     * Diffs will be against this new value.
+     */
+    public void reset() {
+      startingValue = currentValue();
+    }
+
+    /**
+     * Get the current value of the metric.
+     * @return the latest value.
+     */
+    public long currentValue() {
+      return fs.getInstrumentation().getCounterValue(statistic);
+    }
+
+    /**
+     * Get the difference between the current value and
+     * {@link #startingValue}.
+     * @return the difference.
+     */
+    public long diff() {
+      return currentValue() - startingValue;
+    }
+
+    @Override
+    public String toString() {
+      long c = currentValue();
+      final StringBuilder sb = new StringBuilder(statistic.getSymbol());
+      sb.append(" starting=").append(startingValue);
+      sb.append(" current=").append(c);
+      sb.append(" diff=").append(c - startingValue);
+      return sb.toString();
+    }
+
+    /**
+     * Assert that the value of {@link #diff()} matches that expected.
+     * @param expected expected value.
+     */
+    public void assertDiffEquals(long expected) {
+      Assert.assertEquals("Count of " + this,
+          expected, diff());
+    }
+
+    /**
+     * Assert that the value of {@link #diff()} matches that of another
+     * instance.
+     * @param that the other metric diff instance.
+     */
+    public void assertDiffEquals(MetricDiff that) {
+      Assert.assertEquals(this.toString() + " != " + that,
+          this.diff(), that.diff());
+    }
+
+    /**
+     * Comparator for assertions.
+     * @param that other metric diff
+     * @return true if the value is {@code ==} the other's
+     */
+    public boolean diffEquals(MetricDiff that) {
+      return this.currentValue() == that.currentValue();
+    }
+
+    /**
+     * Comparator for assertions.
+     * @param that other metric diff
+     * @return true if the value is {@code <} the other's
+     */
+    public boolean diffLessThan(MetricDiff that) {
+      return this.currentValue() < that.currentValue();
+    }
+
+    /**
+     * Comparator for assertions.
+     * @param that other metric diff
+     * @return true if the value is {@code <=} the other's
+     */
+    public boolean diffLessThanOrEquals(MetricDiff that) {
+      return this.currentValue() <= that.currentValue();
+    }
+
+    /**
+     * Get the statistic.
+     * @return the statistic
+     */
+    public Statistic getStatistic() {
+      return statistic;
+    }
+
+    /**
+     * Get the starting value, i.e. the value captured in the last {@link #reset()}.
+     * @return the starting value for diffs.
+     */
+    public long getStartingValue() {
+      return startingValue;
+    }
+  }
 }

+ 191 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AFileOperationCost.java

@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.AbstractFSContract;
+import org.apache.hadoop.fs.contract.AbstractFSContractTestBase;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.contract.s3a.S3AContract;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.net.URI;
+
+import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+import static org.apache.hadoop.fs.s3a.Statistic.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.MetricDiff;
+import static org.apache.hadoop.test.GenericTestUtils.getTestDir;
+
+/**
+ * Use metrics to assert about the cost of file status queries,
+ * especially {@link S3AFileSystem#getFileStatus(Path)}.
+ */
+public class TestS3AFileOperationCost extends AbstractFSContractTestBase {
+
+  private MetricDiff metadataRequests;
+  private MetricDiff listRequests;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestS3AFileOperationCost.class);
+
+  @Override
+  protected AbstractFSContract createContract(Configuration conf) {
+    return new S3AContract(conf);
+  }
+
+  @Override
+  public S3AFileSystem getFileSystem() {
+    return (S3AFileSystem) super.getFileSystem();
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    S3AFileSystem fs = getFileSystem();
+    metadataRequests = new MetricDiff(fs, OBJECT_METADATA_REQUESTS);
+    listRequests = new MetricDiff(fs, OBJECT_LIST_REQUESTS);
+  }
+
+  @Test
+  public void testCostOfGetFileStatusOnFile() throws Throwable {
+    describe("performing getFileStatus on a file");
+    Path simpleFile = path("simple.txt");
+    S3AFileSystem fs = getFileSystem();
+    touch(fs, simpleFile);
+    resetMetricDiffs();
+    S3AFileStatus status = fs.getFileStatus(simpleFile);
+    assertTrue("not a file: " + status, status.isFile());
+    metadataRequests.assertDiffEquals(1);
+    listRequests.assertDiffEquals(0);
+  }
+
+  private void resetMetricDiffs() {
+    reset(metadataRequests, listRequests);
+  }
+
+  @Test
+  public void testCostOfGetFileStatusOnEmptyDir() throws Throwable {
+    describe("performing getFileStatus on an empty directory");
+    S3AFileSystem fs = getFileSystem();
+    Path dir = path("empty");
+    fs.mkdirs(dir);
+    resetMetricDiffs();
+    S3AFileStatus status = fs.getFileStatus(dir);
+    assertTrue("not empty: " + status, status.isEmptyDirectory());
+    metadataRequests.assertDiffEquals(2);
+    listRequests.assertDiffEquals(0);
+  }
+
+  @Test
+  public void testCostOfGetFileStatusOnMissingFile() throws Throwable {
+    describe("performing getFileStatus on a missing file");
+    S3AFileSystem fs = getFileSystem();
+    Path path = path("missing");
+    resetMetricDiffs();
+    try {
+      S3AFileStatus status = fs.getFileStatus(path);
+      fail("Got a status back from a missing file path " + status);
+    } catch (FileNotFoundException expected) {
+      // expected
+    }
+    metadataRequests.assertDiffEquals(2);
+    listRequests.assertDiffEquals(1);
+  }
+
+  @Test
+  public void testCostOfGetFileStatusOnMissingSubPath() throws Throwable {
+    describe("performing getFileStatus on a missing subpath");
+    S3AFileSystem fs = getFileSystem();
+    Path path = path("missingdir/missingpath");
+    resetMetricDiffs();
+    try {
+      S3AFileStatus status = fs.getFileStatus(path);
+      fail("Got a status back from a missing file path " + status);
+    } catch (FileNotFoundException expected) {
+      // expected
+    }
+    metadataRequests.assertDiffEquals(2);
+    listRequests.assertDiffEquals(1);
+  }
+
+  @Test
+  public void testCostOfGetFileStatusOnNonEmptyDir() throws Throwable {
+    describe("performing getFileStatus on a non-empty directory");
+    S3AFileSystem fs = getFileSystem();
+    Path dir = path("empty");
+    fs.mkdirs(dir);
+    Path simpleFile = new Path(dir, "simple.txt");
+    touch(fs, simpleFile);
+    resetMetricDiffs();
+    S3AFileStatus status = fs.getFileStatus(dir);
+    if (status.isEmptyDirectory()) {
+      // erroneous state
+      String fsState = fs.toString();
+      fail("FileStatus says directory is empty: " + status
+          + "\n" + ContractTestUtils.ls(fs, dir)
+          + "\n" + fsState);
+    }
+    metadataRequests.assertDiffEquals(2);
+    listRequests.assertDiffEquals(1);
+  }
+
+  @Test
+  public void testCostOfCopyFromLocalFile() throws Throwable {
+    describe("testCostOfCopyFromLocalFile");
+    File localTestDir = getTestDir("tmp");
+    localTestDir.mkdirs();
+    File tmpFile = File.createTempFile("tests3acost", ".txt",
+        localTestDir);
+    tmpFile.delete();
+    try {
+      URI localFileURI = tmpFile.toURI();
+      FileSystem localFS = FileSystem.get(localFileURI,
+          getFileSystem().getConf());
+      Path localPath = new Path(localFileURI);
+      int len = 10 * 1024;
+      byte[] data = dataset(len, 'A', 'Z');
+      writeDataset(localFS, localPath, data, len, 1024, true);
+      S3AFileSystem s3a = getFileSystem();
+      MetricDiff copyLocalOps = new MetricDiff(s3a,
+          INVOCATION_COPY_FROM_LOCAL_FILE);
+      MetricDiff putRequests = new MetricDiff(s3a,
+          OBJECT_PUT_REQUESTS);
+      MetricDiff putBytes = new MetricDiff(s3a,
+          OBJECT_PUT_BYTES);
+
+      Path remotePath = path("copied");
+      s3a.copyFromLocalFile(false, true, localPath, remotePath);
+      verifyFileContents(s3a, remotePath, data);
+      copyLocalOps.assertDiffEquals(1);
+      putRequests.assertDiffEquals(1);
+      putBytes.assertDiffEquals(len);
+      // print final stats
+      LOG.info("Filesystem {}", s3a);
+    } finally {
+      tmpFile.delete();
+    }
+  }
+}

+ 47 - 107
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java

@@ -34,13 +34,11 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.rules.TestName;
+import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.InputStream;
-import java.util.Locale;
-
-import static org.junit.Assume.assumeTrue;
 
 /**
  * Base class for scale tests; here is where the common scale configuration
@@ -51,11 +49,57 @@ public class S3AScaleTestBase extends Assert implements S3ATestConstants {
   @Rule
   public TestName methodName = new TestName();
 
+  @Rule
+  public Timeout testTimeout = new Timeout(30 * 60 * 1000);
+
   @BeforeClass
   public static void nameThread() {
     Thread.currentThread().setName("JUnit");
   }
 
+  /**
+   * The number of operations to perform: {@value}.
+   */
+  public static final String KEY_OPERATION_COUNT =
+      SCALE_TEST + "operation.count";
+
+  /**
+   * The number of directory operations to perform: {@value}.
+   */
+  public static final String KEY_DIRECTORY_COUNT =
+      SCALE_TEST + "directory.count";
+
+  /**
+   * The readahead buffer: {@value}.
+   */
+  public static final String KEY_READ_BUFFER_SIZE =
+      S3A_SCALE_TEST + "read.buffer.size";
+
+  public static final int DEFAULT_READ_BUFFER_SIZE = 16384;
+
+  /**
+   * Key for a multi MB test file: {@value}.
+   */
+  public static final String KEY_CSVTEST_FILE =
+      S3A_SCALE_TEST + "csvfile";
+
+  /**
+   * Default path for the multi MB test file: {@value}.
+   */
+  public static final String DEFAULT_CSVTEST_FILE
+      = "s3a://landsat-pds/scene_list.gz";
+
+  /**
+   * The default number of operations to perform: {@value}.
+   */
+  public static final long DEFAULT_OPERATION_COUNT = 2005;
+
+  /**
+   * Default number of directories to create when performing
+   * directory performance/scale tests.
+   */
+  public static final int DEFAULT_DIRECTORY_COUNT = 2;
+
   protected S3AFileSystem fs;
 
   protected static final Logger LOG =
@@ -132,108 +176,4 @@ public class S3AScaleTestBase extends Assert implements S3ATestConstants {
     }
   }
 
-  /**
-   * Make times more readable, by adding a "," every three digits.
-   * @param nanos nanos or other large number
-   * @return a string for logging
-   */
-  protected static String toHuman(long nanos) {
-    return String.format(Locale.ENGLISH, "%,d", nanos);
-  }
-
-  /**
-   * Log the bandwidth of a timer as inferred from the number of
-   * bytes processed.
-   * @param timer timer
-   * @param bytes bytes processed in the time period
-   */
-  protected void bandwidth(NanoTimer timer, long bytes) {
-    LOG.info("Bandwidth = {}  MB/S",
-        timer.bandwidthDescription(bytes));
-  }
-
-  /**
-   * Work out the bandwidth in MB/s
-   * @param bytes bytes
-   * @param durationNS duration in nanos
-   * @return the number of megabytes/second of the recorded operation
-   */
-  public static double bandwidthMBs(long bytes, long durationNS) {
-    return (bytes * 1000.0 ) / durationNS;
-  }
-
-  /**
-   * A simple class for timing operations in nanoseconds, and for
-   * printing some useful results in the process.
-   */
-  protected static class NanoTimer {
-    final long startTime;
-    long endTime;
-
-    public NanoTimer() {
-      startTime = now();
-    }
-
-    /**
-     * End the operation
-     * @return the duration of the operation
-     */
-    public long end() {
-      endTime = now();
-      return duration();
-    }
-
-    /**
-     * End the operation; log the duration
-     * @param format message
-     * @param args any arguments
-     * @return the duration of the operation
-     */
-    public long end(String format, Object... args) {
-      long d = end();
-      LOG.info("Duration of {}: {} nS",
-          String.format(format, args), toHuman(d));
-      return d;
-    }
-
-    long now() {
-      return System.nanoTime();
-    }
-
-    long duration() {
-      return endTime - startTime;
-    }
-
-    double bandwidth(long bytes) {
-      return S3AScaleTestBase.bandwidthMBs(bytes, duration());
-    }
-
-    /**
-     * Bandwidth as bytes per second
-     * @param bytes bytes in
-     * @return the number of bytes per second this operation timed.
-     */
-    double bandwidthBytes(long bytes) {
-      return (bytes * 1.0 ) / duration();
-    }
-
-    /**
-     * How many nanoseconds per byte
-     * @param bytes bytes processed in this time period
-     * @return the nanoseconds it took each byte to be processed
-     */
-    long nanosPerByte(long bytes) {
-      return duration() / bytes;
-    }
-
-    /**
-     * Get a description of the bandwidth, even down to fractions of
-     * a MB
-     * @param bytes bytes processed
-     * @return bandwidth
-     */
-    String bandwidthDescription(long bytes) {
-      return String.format("%,.6f", bandwidth(bytes));
-    }
-  }
 }

+ 3 - 7
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java

@@ -20,9 +20,7 @@ package org.apache.hadoop.fs.s3a.scale;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -34,15 +32,13 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-import static org.junit.Assert.assertEquals;
-
+/**
+ * Test some scalable operations related to file renaming and deletion.
+ */
 public class TestS3ADeleteManyFiles extends S3AScaleTestBase {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestS3ADeleteManyFiles.class);
 
-  @Rule
-  public Timeout testTimeout = new Timeout(30 * 60 * 1000);
-
   /**
    * CAUTION: If this test starts failing, please make sure that the
    * {@link org.apache.hadoop.fs.s3a.Constants#MAX_THREADS} configuration is not

+ 189 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADirectoryPerformance.java

@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Statistic;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.fs.s3a.Statistic.*;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+
+/**
+ * Test the performance of listing files/directories.
+ */
+public class TestS3ADirectoryPerformance extends S3AScaleTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      TestS3ADirectoryPerformance.class);
+
+  @Test
+  public void testListOperations() throws Throwable {
+    describe("Test recursive list operations");
+    final Path scaleTestDir = getTestPath();
+    final Path listDir = new Path(scaleTestDir, "lists");
+
+    // scale factor.
+    int scale = getConf().getInt(KEY_DIRECTORY_COUNT, DEFAULT_DIRECTORY_COUNT);
+    int width = scale;
+    int depth = scale;
+    int files = scale;
+    MetricDiff metadataRequests = new MetricDiff(fs, OBJECT_METADATA_REQUESTS);
+    MetricDiff listRequests = new MetricDiff(fs, OBJECT_LIST_REQUESTS);
+    MetricDiff listStatusCalls = new MetricDiff(fs, INVOCATION_LIST_FILES);
+    MetricDiff getFileStatusCalls =
+        new MetricDiff(fs, INVOCATION_GET_FILE_STATUS);
+    NanoTimer createTimer = new NanoTimer();
+    TreeScanResults created =
+        createSubdirs(fs, listDir, depth, width, files, 0);
+    // add some empty directories
+    int emptyDepth = 1 * scale;
+    int emptyWidth = 3 * scale;
+
+    created.add(createSubdirs(fs, listDir, emptyDepth, emptyWidth, 0,
+        0, "empty", "f-", ""));
+    createTimer.end("Time to create %s", created);
+    LOG.info("Time per operation: {}",
+        toHuman(createTimer.nanosPerOperation(created.totalCount())));
+    printThenReset(LOG,
+        metadataRequests,
+        listRequests,
+        listStatusCalls,
+        getFileStatusCalls);
+
+    try {
+      // Scan the directory via an explicit tree walk.
+      // This is the baseline for any listing speedups.
+      MetricDiff treewalkMetadataRequests =
+          new MetricDiff(fs, OBJECT_METADATA_REQUESTS);
+      MetricDiff treewalkListRequests = new MetricDiff(fs,
+          OBJECT_LIST_REQUESTS);
+      MetricDiff treewalkListStatusCalls = new MetricDiff(fs,
+          INVOCATION_LIST_FILES);
+      MetricDiff treewalkGetFileStatusCalls =
+          new MetricDiff(fs, INVOCATION_GET_FILE_STATUS);
+      NanoTimer treeWalkTimer = new NanoTimer();
+      TreeScanResults treewalkResults = treeWalk(fs, listDir);
+      treeWalkTimer.end("List status via treewalk");
+
+      print(LOG,
+          treewalkMetadataRequests,
+          treewalkListRequests,
+          treewalkListStatusCalls,
+          treewalkGetFileStatusCalls);
+      assertEquals("Files found in treewalk " +
+              " created=" + created + " listed=" + treewalkResults,
+          created.getFileCount(), treewalkResults.getFileCount());
+
+
+      // listFiles() does the recursion internally
+      NanoTimer listFilesRecursiveTimer = new NanoTimer();
+
+      TreeScanResults listFilesResults = new TreeScanResults(
+          fs.listFiles(listDir, true));
+
+      listFilesRecursiveTimer.end("listFiles(recursive=true) of %s", created);
+      assertEquals("Files found in listFiles(recursive=true) " +
+          " created=" + created  + " listed=" + listFilesResults,
+          created.getFileCount(), listFilesResults.getFileCount());
+
+      treewalkListRequests.assertDiffEquals(listRequests);
+      printThenReset(LOG,
+          metadataRequests, listRequests,
+          listStatusCalls, getFileStatusCalls);
+
+      NanoTimer globStatusTimer = new NanoTimer();
+      FileStatus[] globStatusFiles = fs.globStatus(listDir);
+      globStatusTimer.end("Time to globStatus() %s", listDir);
+      LOG.info("Time for glob status {} entries: {}",
+          globStatusFiles.length,
+          toHuman(globStatusTimer.duration()));
+      printThenReset(LOG,
+          metadataRequests,
+          listRequests,
+          listStatusCalls,
+          getFileStatusCalls);
+
+    } finally {
+      // deletion at the end of the run
+      NanoTimer deleteTimer = new NanoTimer();
+      fs.delete(listDir, true);
+      deleteTimer.end("Deleting directory tree");
+      printThenReset(LOG,
+          metadataRequests, listRequests,
+          listStatusCalls, getFileStatusCalls);
+    }
+  }
+
+  @Test
+  public void testTimeToStatEmptyDirectory() throws Throwable {
+    describe("Time to stat an empty directory");
+    Path path = new Path(getTestPath(), "empty");
+    fs.mkdirs(path);
+    timeToStatPath(path);
+  }
+
+  @Test
+  public void testTimeToStatNonEmptyDirectory() throws Throwable {
+    describe("Time to stat a non-empty directory");
+    Path path = new Path(getTestPath(), "dir");
+    fs.mkdirs(path);
+    touch(fs, new Path(path, "file"));
+    timeToStatPath(path);
+  }
+
+  @Test
+  public void testTimeToStatFile() throws Throwable {
+    describe("Time to stat a simple file");
+    Path path = new Path(getTestPath(), "file");
+    touch(fs, path);
+    timeToStatPath(path);
+  }
+
+  @Test
+  public void testTimeToStatRoot() throws Throwable {
+    describe("Time to stat the root path");
+    timeToStatPath(new Path("/"));
+  }
+
+  private void timeToStatPath(Path path) throws IOException {
+    describe("Timing getFileStatus(\"%s\")", path);
+    MetricDiff metadataRequests =
+        new MetricDiff(fs, Statistic.OBJECT_METADATA_REQUESTS);
+    MetricDiff listRequests =
+        new MetricDiff(fs, Statistic.OBJECT_LIST_REQUESTS);
+    long attempts = getOperationCount();
+    NanoTimer timer = new NanoTimer();
+    for (long l = 0; l < attempts; l++) {
+      fs.getFileStatus(path);
+    }
+    timer.end("Time to execute %d getFileStatusCalls", attempts);
+    LOG.info("Time per call: {}", toHuman(timer.nanosPerOperation(attempts)));
+    LOG.info("metadata: {}", metadataRequests);
+    LOG.info("metadata per operation {}", metadataRequests.diff() / attempts);
+    LOG.info("listObjects: {}", listRequests);
+    LOG.info("listObjects: per operation {}", listRequests.diff() / attempts);
+  }
+
+}

+ 4 - 2
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java

@@ -36,8 +36,10 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
+import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
+
 /**
- * Look at the performance of S3a operations
+ * Look at the performance of S3a operations.
  */
 public class TestS3AInputStreamPerformance extends S3AScaleTestBase {
   private static final Logger LOG = LoggerFactory.getLogger(
@@ -151,7 +153,7 @@ public class TestS3AInputStreamPerformance extends S3AScaleTestBase {
     readTimer.end("Time to read %d bytes", len);
     bandwidth(readTimer, count);
     assertEquals("Not enough bytes were read)", len, count);
-    long nanosPerByte = readTimer.nanosPerByte(count);
+    long nanosPerByte = readTimer.nanosPerOperation(count);
     LOG.info("An open() call has the equivalent duration of reading {} bytes",
         toHuman( timeOpen.duration() / nanosPerByte));
   }

+ 3 - 1
hadoop-tools/hadoop-aws/src/test/resources/log4j.properties

@@ -15,7 +15,9 @@ log4j.rootLogger=info,stdout
 log4j.threshold=ALL
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} (%F:%M(%L)) - %m%n
+
+log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR
 
 # for debugging low level S3a operations, uncomment this line
 # log4j.logger.org.apache.hadoop.fs.s3a=DEBUG