
HADOOP-13028. Add low-level counter metrics for S3A; use in read performance tests. Contributed by stevel.
Patch includes:
HADOOP-12844 Recover when S3A fails on IOException in read()
HADOOP-13058 S3A FS fails during init against a read-only FS if multipart purge
HADOOP-13047 S3a Forward seek in stream length to be configurable

Steve Loughran, 9 years ago
parent
current commit
27c4e90efc
21 files changed, with 1753 additions and 667 deletions
  1. +9 -0    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
  2. +141 -0  hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricStringBuilder.java
  3. +1 -1    hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableCounterLong.java
  4. +9 -1    hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
  5. +2 -355  hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml
  6. +2 -2    hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/FileSystemStore.java
  7. +2 -0    hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Credentials.java
  8. +4 -0    hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AnonymousAWSCredentialsProvider.java
  9. +4 -0    hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BasicAWSCredentialsProvider.java
  10. +17 -1   hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java
  11. +7 -4    hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java
  12. +11 -1   hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java
  13. +271 -188 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
  14. +259 -83 hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
  15. +457 -0  hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
  16. +22 -18  hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java
  17. +65 -3   hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md
  18. +182 -9  hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java
  19. +0 -1    hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java
  20. +285 -0  hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java
  21. +3 -0    hadoop-tools/hadoop-aws/src/test/resources/log4j.properties

+ 9 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java

@@ -234,4 +234,13 @@ public class FSDataInputStream extends DataInputStream
           "support unbuffering.");
     }
   }
+
+  /**
+   * String value. Includes the string value of the inner stream
+   * @return the stream
+   */
+  @Override
+  public String toString() {
+    return super.toString() + ": " + in;
+  }
 }
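
A minimal sketch, not part of the patch, of what the new toString() buys a caller; the bucket and object names are placeholders. Printing an FSDataInputStream now also prints the wrapped stream, so for an S3A stream the per-stream read statistics added later in this patch surface in ordinary diagnostics:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class StreamToStringExample {
  public static void main(String[] args) throws Exception {
    // "example-bucket" and the object key are placeholders, not from the patch
    FileSystem fs = FileSystem.get(new URI("s3a://example-bucket/"),
        new Configuration());
    try (FSDataInputStream in = fs.open(new Path("s3a://example-bucket/data.bin"))) {
      in.read(new byte[4096]);
      // prints super.toString() + ": " + the wrapped stream's own toString()
      System.out.println(in);
    }
  }
}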

+ 141 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/MetricStringBuilder.java

@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Build a string dump of the metrics.
+ *
+ * The {@link #toString()} operator dumps out all values collected.
+ *
+ * Every entry is formatted as
+ * {@code prefix + name + separator + value + suffix}
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class MetricStringBuilder extends MetricsRecordBuilder {
+
+  private final StringBuilder builder = new StringBuilder(256);
+
+  private final String prefix;
+  private final String suffix;
+  private final String separator;
+  private final MetricsCollector parent;
+
+  /**
+   * Build an instance.
+   * @param parent parent collector. Unused in this instance; only used for
+   * the {@link #parent()} method
+   * @param prefix string before each entry
+   * @param separator separator between name and value
+   * @param suffix suffix after each entry
+   */
+  public MetricStringBuilder(MetricsCollector parent,
+      String prefix,
+      String separator,
+      String suffix) {
+    this.parent = parent;
+    this.prefix = prefix;
+    this.suffix = suffix;
+    this.separator = separator;
+  }
+
+  public MetricStringBuilder add(MetricsInfo info, Object value) {
+    return tuple(info.name(), value.toString());
+  }
+
+  /**
+   * Add any key,val pair to the string, between the prefix and suffix,
+   * separated by the separator.
+   * @param key key
+   * @param value value
+   * @return this instance
+   */
+  public MetricStringBuilder tuple(String key, String value) {
+    builder.append(prefix)
+        .append(key)
+        .append(separator)
+        .append(value)
+        .append(suffix);
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilder tag(MetricsInfo info, String value) {
+    return add(info, value);
+  }
+
+  @Override
+  public MetricsRecordBuilder add(MetricsTag tag) {
+    return tuple(tag.name(), tag.value());
+  }
+
+  @Override
+  public MetricsRecordBuilder add(AbstractMetric metric) {
+    add(metric.info(), metric.toString());
+    return this;
+  }
+
+  @Override
+  public MetricsRecordBuilder setContext(String value) {
+    return tuple("context", value);
+  }
+
+  @Override
+  public MetricsRecordBuilder addCounter(MetricsInfo info, int value) {
+    return add(info, value);
+  }
+
+  @Override
+  public MetricsRecordBuilder addCounter(MetricsInfo info, long value) {
+    return add(info, value);
+  }
+
+  @Override
+  public MetricsRecordBuilder addGauge(MetricsInfo info, int value) {
+    return add(info, value);
+  }
+
+  @Override
+  public MetricsRecordBuilder addGauge(MetricsInfo info, long value) {
+    return add(info, value);
+  }
+
+  @Override
+  public MetricsRecordBuilder addGauge(MetricsInfo info, float value) {
+    return add(info, value);
+  }
+
+  @Override
+  public MetricsRecordBuilder addGauge(MetricsInfo info, double value) {
+    return add(info, value);
+  }
+
+  @Override
+  public MetricsCollector parent() {
+    return parent;
+  }
+
+  @Override
+  public String toString() {
+    return builder.toString();
+  }
+}
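
A usage sketch for the new builder, not taken from the patch; the registry name and counter are illustrative. Snapshotting a MetricsRegistry into a MetricStringBuilder produces one prefix + name + separator + value + suffix entry per metric, which is how an instrumentation class can expose its counters through toString():

import org.apache.hadoop.metrics2.MetricStringBuilder;
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;

public class MetricsDumpExample {
  private final MetricsRegistry registry = new MetricsRegistry("example");
  private final MutableCounterLong reads =
      registry.newCounter("streamReads", "number of read() calls", 0L);

  public void onRead() {
    reads.incr();
  }

  public String dump() {
    MetricStringBuilder sb = new MetricStringBuilder(null,
        "  ",    // prefix before every entry
        " = ",   // separator between name and value
        "\n");   // suffix after every entry
    registry.snapshot(sb, true);   // write all registered metrics into the builder
    return sb.toString();
  }
}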

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableCounterLong.java

@@ -34,7 +34,7 @@ public class MutableCounterLong extends MutableCounter {
 
   private AtomicLong value = new AtomicLong();
 
-  MutableCounterLong(MetricsInfo info, long initValue) {
+  public MutableCounterLong(MetricsInfo info, long initValue) {
     super(info);
     this.value.set(initValue);
   }
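
With the constructor now public, code outside org.apache.hadoop.metrics2.lib can build counters directly instead of registering them through a MetricsRegistry. A small illustrative sketch; the names are hypothetical, not from the patch:

import static org.apache.hadoop.metrics2.lib.Interns.info;

import org.apache.hadoop.metrics2.lib.MutableCounterLong;

public class DirectCounterExample {
  private final MutableCounterLong bytesRead =
      new MutableCounterLong(info("streamBytesRead", "bytes read from the stream"), 0);

  public void onRead(long n) {
    bytesRead.incr(n);   // thread-safe; backed by the AtomicLong above
  }

  public long bytesReadSoFar() {
    return bytesRead.value();
  }
}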

+ 9 - 1
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -938,7 +938,15 @@
     uploading (fs.s3a.threads.max) or queueing (fs.s3a.max.total.tasks)</description>
 </property>
 
-  <property>
+<property>
+  <name>fs.s3a.readahead.range</name>
+  <value>65536</value>
+  <description>Bytes to read ahead during a seek() before closing and
+  re-opening the S3 HTTP connection. This option will be overridden if
+  any call to setReadahead() is made to an open stream.</description>
+</property>
+
+<property>
   <name>fs.s3a.fast.buffer.size</name>
   <value>1048576</value>
   <description>Size of initial memory buffer in bytes allocated for an
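
A sketch, not part of the patch, of how the new option interacts with setReadahead(); the bucket and key are placeholders. The configuration key sets the default for every stream the filesystem opens, while setReadahead() on an open stream overrides it:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ReadaheadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setLong("fs.s3a.readahead.range", 128 * 1024);  // double the 64 KB default

    FileSystem fs = FileSystem.get(new URI("s3a://example-bucket/"), conf);
    try (FSDataInputStream in = fs.open(new Path("s3a://example-bucket/logs/app.log"))) {
      in.setReadahead(256 * 1024L);  // per-stream override of the configured range
      in.seek(100 * 1024);           // forward seek shorter than the readahead range,
      in.read(new byte[8192]);       // so no need to close and reopen the connection
    }
  }
}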

+ 2 - 355
hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml

@@ -15,361 +15,8 @@
    limitations under the License.
 -->
 <FindBugsFilter>
-     <Match>
-       <Package name="org.apache.hadoop.security.proto" />
-     </Match>
-     <Match>
-       <Package name="org.apache.hadoop.tools.proto" />
-     </Match>
-     <Match>
-       <Bug pattern="EI_EXPOSE_REP" />
-     </Match>
-     <Match>
-       <Bug pattern="EI_EXPOSE_REP2" />
-     </Match>
-     <Match>
-       <Bug pattern="SE_COMPARATOR_SHOULD_BE_SERIALIZABLE" />
-     </Match>
-     <Match>
-       <Class name="~.*_jsp" />
-       <Bug pattern="DLS_DEAD_LOCAL_STORE" />
-     </Match>
-     <Match>
-       <Field name="_jspx_dependants" />
-       <Bug pattern="UWF_UNWRITTEN_FIELD" />
-     </Match>
-     <!-- 
-       Inconsistent synchronization for Client.Connection.out is
-       is intentional to make a connection to be closed instantly. 
-     --> 
-     <Match>
-       <Class name="org.apache.hadoop.ipc.Client$Connection" />
-       <Field name="out" />
-       <Bug pattern="IS2_INCONSISTENT_SYNC" />
-     </Match>
-     <!-- 
-       Further SaslException should be ignored during cleanup and
-       original exception should be re-thrown.
-     --> 
-     <Match>
-       <Class name="org.apache.hadoop.security.SaslRpcClient" />
-       <Bug pattern="DE_MIGHT_IGNORE" />
-     </Match>
-     <!-- 
-       Ignore Cross Scripting Vulnerabilities
-     -->
-     <Match>
-       <Package name="~org.apache.hadoop.mapred.*" />
-       <Bug code="XSS" />
-     </Match>
-     <Match>
-       <Class name="org.apache.hadoop.mapred.taskdetails_jsp" />
-       <Bug code="HRS" />
-     </Match>
-     <!--
-       Ignore warnings where child class has the same name as
-       super class. Classes based on Old API shadow names from
-       new API. Should go off after HADOOP-1.0
-     -->
-     <Match>
-       <Class name="~org.apache.hadoop.mapred.*" />
-       <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS" />
-     </Match>
-     <!--
-       Ignore warnings for usage of System.exit. This is
-       required and have been well thought out
-     -->
-     <Match>
-       <Class name="org.apache.hadoop.mapred.Child$2" />
-       <Method name="run" />
-       <Bug pattern="DM_EXIT" />
-     </Match>
-     <Match>
-       <Class name="org.apache.hadoop.mapred.JobTracker" />
-       <Method name="addHostToNodeMapping" />
-       <Bug pattern="DM_EXIT" />
-     </Match>
-     <Match>
-       <Class name="org.apache.hadoop.mapred.Task" />
-       <Or>
-       <Method name="done" />
-       <Method name="commit" />
-       <Method name="statusUpdate" />
-       </Or>
-       <Bug pattern="DM_EXIT" />
-     </Match>
-     <Match>
-       <Class name="org.apache.hadoop.mapred.Task$TaskReporter" />
-       <Method name="run" />
-       <Bug pattern="DM_EXIT" />
-     </Match>
-     <Match>
-       <Class name="org.apache.hadoop.util.ProgramDriver" />
-       <Method name="driver" />
-       <Bug pattern="DM_EXIT" />
-     </Match>
-     <Match>
-       <Class name="org.apache.hadoop.util.RunJar" />
-       <Method name="run" />
-       <Bug pattern="DM_EXIT" />
-     </Match>
-     <!--
-       We need to cast objects between old and new api objects
-     -->
-     <Match>
-       <Class name="org.apache.hadoop.mapred.OutputCommitter" />
-       <Bug pattern="BC_UNCONFIRMED_CAST" />
-     </Match>
-     <!--
-       We intentionally do the get name from the inner class
-     -->
-     <Match>
-       <Class name="org.apache.hadoop.mapred.TaskTracker$MapEventsFetcherThread" />
-       <Method name="run" />
-       <Bug pattern="IA_AMBIGUOUS_INVOCATION_OF_INHERITED_OR_OUTER_METHOD" />
-     </Match>
-     <Match>
-       <Class name="org.apache.hadoop.mapred.FileOutputCommitter" />
-       <Bug pattern="NM_WRONG_PACKAGE_INTENTIONAL" />
-     </Match>
-     <!--
-       Ignoring this warning as resolving this would need a non-trivial change in code 
-     -->
-     <Match>
-       <Class name="org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorBaseDescriptor" />
-       <Method name="configure" />
-       <Field name="maxNumItems" />
-       <Bug pattern="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD" />
-     </Match>
-     <!--
-       Comes from org.apache.jasper.runtime.ResourceInjector. Cannot do much.
-     -->
-     <Match>
-       <Class name="org.apache.hadoop.mapred.jobqueue_005fdetails_jsp" />
-       <Field name="_jspx_resourceInjector" />
-       <Bug pattern="SE_BAD_FIELD" />
-     </Match>
-     <!--
-       Storing textInputFormat and then passing it as a parameter. Safe to ignore.
-     -->
-     <Match>
-       <Class name="org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorJob" />
-       <Method name="createValueAggregatorJob" />
-       <Bug pattern="DLS_DEAD_STORE_OF_CLASS_LITERAL" />
-     </Match>
-     <!--
-       Can remove this after the upgrade to findbugs1.3.8
-     -->
-     <Match>
-       <Class name="org.apache.hadoop.mapred.lib.db.DBInputFormat" />
-       <Method name="getSplits" />
-       <Bug pattern="DLS_DEAD_LOCAL_STORE" />
-     </Match>
-    <!--
-      This is a spurious warning. Just ignore
-    -->
-     <Match>
-       <Class name="org.apache.hadoop.mapred.MapTask$MapOutputBuffer" />
-       <Field name="kvindex" />
-       <Bug pattern="IS2_INCONSISTENT_SYNC" />
-     </Match>
-
-     <!-- 
-        core changes 
-     -->
-     <Match>
-       <Class name="~org.apache.hadoop.*" />
-       <Bug code="MS" />
-     </Match>
-
-     <Match>
-       <Class name="org.apache.hadoop.fs.FileSystem" />
-       <Method name="checkPath" />
-       <Bug pattern="ES_COMPARING_STRINGS_WITH_EQ" />
-     </Match>
-
-     <Match>
-       <Class name="org.apache.hadoop.io.Closeable" />
-       <Bug pattern="NM_SAME_SIMPLE_NAME_AS_INTERFACE" />
-     </Match>
-
-     <Match>
-       <Class name="org.apache.hadoop.security.AccessControlException" />
-       <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS" />
-     </Match>
-
-     <Match>
-       <Class name="org.apache.hadoop.util.ProcfsBasedProcessTree" />
-       <Bug pattern="DMI_HARDCODED_ABSOLUTE_FILENAME" />
-     </Match>
-
-     <!--
-       Streaming, Examples
-     -->
-     <Match>
-       <Class name="org.apache.hadoop.streaming.StreamUtil$TaskId" />
-       <Bug pattern="URF_UNREAD_FIELD" />
-     </Match>
-
-     <Match>
-       <Class name="org.apache.hadoop.examples.DBCountPageView" />
-       <Method name="verify" />
-       <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
-     </Match>
-
-     <Match>
-       <Class name="org.apache.hadoop.examples.ContextFactory" />
-       <Method name="setAttributes" />
-       <Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
-     </Match>
-
-     <!--
-       TFile
-     -->
-      <Match>
-       <Class name="org.apache.hadoop.io.file.tfile.Chunk$ChunkDecoder" />
-       <Method name="close" />
-       <Bug pattern="SR_NOT_CHECKED" />
-      </Match>
-    <!--
-      The purpose of skip() is to drain remaining bytes of the chunk-encoded
-	  stream (one chunk at a time). The termination condition is checked by
-	  checkEOF().
-    -->
-     <Match>
-       <Class name="org.apache.hadoop.io.file.tfile.Utils" />
-       <Method name="writeVLong" />
-       <Bug pattern="SF_SWITCH_FALLTHROUGH" />
-     </Match>
-    <!--
-	  The switch condition fall through is intentional and for performance
-	  purposes.
-    -->
-
-    <Match>
-      <Class name="org.apache.hadoop.log.EventCounter"/>
-      <!-- backward compatibility -->
-      <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS"/>
-    </Match>
-    <Match>
-      <Class name="org.apache.hadoop.metrics.jvm.EventCounter"/>
-      <!-- backward compatibility -->
-      <Bug pattern="NM_SAME_SIMPLE_NAME_AS_SUPERCLASS"/>
-    </Match>
-    <Match>
-      <!-- protobuf generated code -->
-      <Class name="~org\.apache\.hadoop\.ipc\.protobuf\.ProtobufRpcEngineProtos.*"/>
-    </Match>
-    <Match>
-      <!-- protobuf generated code -->
-      <Class name="~org\.apache\.hadoop\.ipc\.protobuf\.ProtocolInfoProtos.*"/>
-    </Match>
-    <Match>
-      <!-- protobuf generated code -->
-      <Class name="~org\.apache\.hadoop\.ipc\.protobuf\.IpcConnectionContextProtos.*"/>
-    </Match>
-        <Match>
-      <!-- protobuf generated code -->
-      <Class name="~org\.apache\.hadoop\.ipc\.protobuf\.RpcHeaderProtos.*"/>
-    </Match>
-    <Match>
-      <!-- protobuf generated code -->
-      <Class name="~org\.apache\.hadoop\.ha\.proto\.HAServiceProtocolProtos.*"/>
-    </Match>
-    <Match>
-      <!-- protobuf generated code -->
-      <Class name="~org\.apache\.hadoop\.ha\.proto\.ZKFCProtocolProtos.*"/>
-    </Match>
-    <Match>
-      <!-- protobuf generated code -->
-      <Class name="~org\.apache\.hadoop\.security\.proto\.SecurityProtos.*"/>
-    </Match>
-    <Match>
-      <!-- protobuf generated code -->
-      <Class name="~org\.apache\.hadoop\.ipc\.protobuf\.TestProtos.*"/>
-    </Match>
-    <Match>
-      <!-- protobuf generated code -->
-      <Class name="~org\.apache\.hadoop\.ipc\.proto\.RefreshCallQueueProtocolProtos.*"/>
-    </Match>
-    <Match>
-      <!-- protobuf generated code -->
-      <Class name="~org\.apache\.hadoop\.ipc\.proto\.GenericRefreshProtocolProtos.*"/>
-    </Match>
-
-    <!--
-       Manually checked, misses child thread manually syncing on parent's intrinsic lock.
-    -->
-     <Match>
-       <Class name="org.apache.hadoop.metrics2.lib.MutableQuantiles" />
-       <Field name="previousSnapshot" />
-       <Bug pattern="IS2_INCONSISTENT_SYNC" />
-     </Match>
-     <!--
-       The method uses a generic type T that extends two other types
-       T1 and T2. Findbugs complains of a cast from T1 to T2.
-     -->
-     <Match>
-       <Class name="org.apache.hadoop.fs.DelegationTokenRenewer" />
-       <Method name="removeRenewAction" />
-       <Bug pattern="BC_UNCONFIRMED_CAST" />
-     </Match>
-     
-     <!-- Inconsistent synchronization flagged by findbugs is not valid. -->
-     <Match>
-       <Class name="org.apache.hadoop.ipc.Client$Connection" />
-       <Field name="in" />
-       <Bug pattern="IS2_INCONSISTENT_SYNC" />
-     </Match>
-     <!-- 
-       The switch condition for INITIATE is expected to fallthru to RESPONSE
-       to process initial sasl response token included in the INITIATE
-     -->
-     <Match>
-       <Class name="org.apache.hadoop.ipc.Server$Connection" />
-       <Method name="processSaslMessage" />
-       <Bug pattern="SF_SWITCH_FALLTHROUGH" />
-     </Match>
-
-     <!-- Synchronization performed on util.concurrent instance. -->
-     <Match>
-       <Class name="org.apache.hadoop.service.AbstractService" />
-       <Method name="stop" />
-       <Bug code="JLM" />
-     </Match>
-
-     <Match>
-       <Class name="org.apache.hadoop.service.AbstractService" />
-       <Method name="waitForServiceToStop" />
-       <Bug code="JLM" />
-     </Match>
-
-  <!--
-  OpenStack Swift FS module -closes streams in a different method
-  from where they are opened.
-  -->
-    <Match>
-      <Class name="org.apache.hadoop.fs.swift.snative.SwiftNativeOutputStream"/>
-      <Method name="uploadFileAttempt"/>
-      <Bug pattern="OBL_UNSATISFIED_OBLIGATION"/>
-    </Match>
-    <Match>
-      <Class name="org.apache.hadoop.fs.swift.snative.SwiftNativeOutputStream"/>
-      <Method name="uploadFilePartAttempt"/>
-      <Bug pattern="OBL_UNSATISFIED_OBLIGATION"/>
-    </Match>
-
-     <!-- code from maven source, null value is checked at callee side. -->
-     <Match>
-       <Class name="org.apache.hadoop.util.ComparableVersion$ListItem" />
-       <Method name="compareTo" />
-       <Bug code="NP" />
-     </Match>
-
+  <!-- S3n warnings about malicious code aren't that relevant given its limited future. -->
   <Match>
-    <Class name="org.apache.hadoop.util.HttpExceptionUtils"/>
-    <Method name="validateResponse"/>
-    <Bug pattern="REC_CATCH_EXCEPTION"/>
+    <Class name="org.apache.hadoop.fs.s3.INode" />
   </Match>
-
 </FindBugsFilter>

+ 2 - 2
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/FileSystemStore.java

@@ -55,13 +55,13 @@ public interface FileSystemStore {
 
   /**
    * Delete everything. Used for testing.
-   * @throws IOException
+   * @throws IOException on any problem
    */
   void purge() throws IOException;
   
   /**
    * Diagnostic method to dump all INodes to the console.
-   * @throws IOException
+   * @throws IOException on any problem
    */
   void dump() throws IOException;
 }

+ 2 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3/S3Credentials.java

@@ -38,6 +38,8 @@ public class S3Credentials {
   private String secretAccessKey; 
 
   /**
+   * @param uri bucket URI optionally containing username and password.
+   * @param conf configuration
    * @throws IllegalArgumentException if credentials for S3 cannot be
    * determined.
    * @throws IOException if credential providers are misconfigured and we have

+ 4 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/AnonymousAWSCredentialsProvider.java

@@ -21,7 +21,11 @@ package org.apache.hadoop.fs.s3a;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.AnonymousAWSCredentials;
 import com.amazonaws.auth.AWSCredentials;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 
+@InterfaceAudience.Private
+@InterfaceStability.Stable
 public class AnonymousAWSCredentialsProvider implements AWSCredentialsProvider {
   public AWSCredentials getCredentials() {
     return new AnonymousAWSCredentials();

+ 4 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BasicAWSCredentialsProvider.java

@@ -23,7 +23,11 @@ import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.auth.AWSCredentials;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 
+@InterfaceAudience.Private
+@InterfaceStability.Stable
 public class BasicAWSCredentialsProvider implements AWSCredentialsProvider {
   private final String accessKey;
   private final String secretKey;

+ 17 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

@@ -18,7 +18,19 @@
 
 package org.apache.hadoop.fs.s3a;
 
-public class Constants {
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * All the constants used with the {@link S3AFileSystem}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class Constants {
+
+  private Constants() {
+  }
+
   // s3 access key
   public static final String ACCESS_KEY = "fs.s3a.access.key";
 
@@ -124,4 +136,8 @@ public class Constants {
   public static final int S3A_DEFAULT_PORT = -1;
 
   public static final String USER_AGENT_PREFIX = "fs.s3a.user.agent.prefix";
+
+  /** read ahead buffer size to prevent connection re-establishments. */
+  public static final String READAHEAD_RANGE = "fs.s3a.readahead.range";
+  public static final long DEFAULT_READAHEAD_RANGE = 64 * 1024;
 }
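
The new READAHEAD_RANGE key is read in the S3AFileSystem hunks further down through intOption()/longOption() helpers whose bodies are not shown in this diff. A hedged sketch of what such a bounded option reader might look like; the class name, message text and use of Guava Preconditions are assumptions, not the committed code:

import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;

final class OptionSketch {
  private OptionSketch() {
  }

  /** Read a long option and enforce a minimum value. */
  static long longOption(Configuration conf, String key, long defVal, long min) {
    long v = conf.getLong(key, defVal);
    Preconditions.checkArgument(v >= min,
        "Value of %s: %s is below the minimum %s", key, v, min);
    return v;
  }
}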

+ 7 - 4
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java

@@ -37,6 +37,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.util.Progressable;
@@ -64,6 +65,7 @@ import java.util.concurrent.ExecutorService;
  * <p>
  * Unstable: statistics and error handling might evolve
  */
+@InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class S3AFastOutputStream extends OutputStream {
 
@@ -102,7 +104,8 @@ public class S3AFastOutputStream extends OutputStream {
    * @param partSize size of a single part in a multi-part upload (except
    * last part)
    * @param multiPartThreshold files at least this size use multi-part upload
-   * @throws IOException
+   * @param threadPoolExecutor thread factory
+   * @throws IOException on any problem
    */
   public S3AFastOutputStream(AmazonS3Client client, S3AFileSystem fs,
       String bucket, String key, Progressable progress,
@@ -159,7 +162,7 @@ public class S3AFastOutputStream extends OutputStream {
    * Writes a byte to the memory buffer. If this causes the buffer to reach
    * its limit, the actual upload is submitted to the threadpool.
    * @param b the int of which the lowest byte is written
-   * @throws IOException
+   * @throws IOException on any problem
    */
   @Override
   public synchronized void write(int b) throws IOException {
@@ -177,10 +180,10 @@ public class S3AFastOutputStream extends OutputStream {
    * @param b byte array containing
    * @param off offset in array where to start
    * @param len number of bytes to be written
-   * @throws IOException
+   * @throws IOException on any problem
    */
   @Override
-  public synchronized void write(byte b[], int off, int len)
+  public synchronized void write(byte[] b, int off, int len)
       throws IOException {
     if (b == null) {
       throw new NullPointerException();

+ 11 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java

@@ -17,9 +17,19 @@
  */
 package org.apache.hadoop.fs.s3a;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 
+/**
+ * File status for an S3A "file".
+ * Modification time is trouble, see {@link #getModificationTime()}.
+ *
+ * The subclass is private as it should not be created directly.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
 public class S3AFileStatus extends FileStatus {
   private boolean isEmptyDirectory;
 
@@ -45,7 +55,7 @@ public class S3AFileStatus extends FileStatus {
     return System.getProperty("user.name");
   }
 
-  /** Compare if this object is equal to another object
+  /** Compare if this object is equal to another object.
    * @param   o the object to be compared.
    * @return  true if two file status has the same path name; false if not.
    */

+ 271 - 188
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.InterruptedIOException;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.Date;
@@ -56,8 +57,11 @@ import com.amazonaws.event.ProgressListener;
 import com.amazonaws.event.ProgressEvent;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
 
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -76,9 +80,24 @@ import static org.apache.hadoop.fs.s3a.Constants.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * The core S3A Filesystem implementation.
+ *
+ * This subclass is marked as private as code should not be creating it
+ * directly; use {@link FileSystem#get(Configuration)} and variants to
+ * create one.
+ *
+ * If cast to {@code S3AFileSystem}, extra methods and features may be accessed.
+ * Consider those private and unstable.
+ *
+ * Because it prints some of the state of the instrumentation,
+ * the output of {@link #toString()} must also be considered unstable.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
 public class S3AFileSystem extends FileSystem {
   /**
-   * Default blocksize as used in blocksize and FS status queries
+   * Default blocksize as used in blocksize and FS status queries.
    */
   public static final int DEFAULT_BLOCKSIZE = 32 * 1024 * 1024;
   private URI uri;
@@ -94,6 +113,8 @@ public class S3AFileSystem extends FileSystem {
   public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
   private CannedAccessControlList cannedACL;
   private String serverSideEncryptionAlgorithm;
+  private S3AInstrumentation instrumentation;
+  private long readAhead;
 
   // The maximum number of entries that can be deleted in any call to s3
   private static final int MAX_ENTRIES_TO_DELETE = 1000;
@@ -105,10 +126,12 @@ public class S3AFileSystem extends FileSystem {
    */
   public void initialize(URI name, Configuration conf) throws IOException {
     super.initialize(name, conf);
+    setConf(conf);
+    instrumentation = new S3AInstrumentation(name);
 
     uri = URI.create(name.getScheme() + "://" + name.getAuthority());
-    workingDir = new Path("/user", System.getProperty("user.name")).makeQualified(this.uri,
-        this.getWorkingDirectory());
+    workingDir = new Path("/user", System.getProperty("user.name"))
+        .makeQualified(this.uri, this.getWorkingDirectory());
 
     AWSAccessKeys creds = getAWSAccessKeys(name, conf);
 
@@ -122,19 +145,20 @@ public class S3AFileSystem extends FileSystem {
     bucket = name.getHost();
 
     ClientConfiguration awsConf = new ClientConfiguration();
-    awsConf.setMaxConnections(conf.getInt(MAXIMUM_CONNECTIONS,
-      DEFAULT_MAXIMUM_CONNECTIONS));
+    awsConf.setMaxConnections(intOption(conf, MAXIMUM_CONNECTIONS,
+        DEFAULT_MAXIMUM_CONNECTIONS, 1));
     boolean secureConnections = conf.getBoolean(SECURE_CONNECTIONS,
         DEFAULT_SECURE_CONNECTIONS);
     awsConf.setProtocol(secureConnections ?  Protocol.HTTPS : Protocol.HTTP);
-    awsConf.setMaxErrorRetry(conf.getInt(MAX_ERROR_RETRIES,
-      DEFAULT_MAX_ERROR_RETRIES));
-    awsConf.setConnectionTimeout(conf.getInt(ESTABLISH_TIMEOUT,
-        DEFAULT_ESTABLISH_TIMEOUT));
-    awsConf.setSocketTimeout(conf.getInt(SOCKET_TIMEOUT,
-      DEFAULT_SOCKET_TIMEOUT));
+    awsConf.setMaxErrorRetry(intOption(conf, MAX_ERROR_RETRIES,
+        DEFAULT_MAX_ERROR_RETRIES, 0));
+    awsConf.setConnectionTimeout(intOption(conf, ESTABLISH_TIMEOUT,
+        DEFAULT_ESTABLISH_TIMEOUT, 0));
+    awsConf.setSocketTimeout(intOption(conf, SOCKET_TIMEOUT,
+        DEFAULT_SOCKET_TIMEOUT, 0));
     String signerOverride = conf.getTrimmed(SIGNING_ALGORITHM, "");
-    if(!signerOverride.isEmpty()) {
+    if (!signerOverride.isEmpty()) {
+      LOG.debug("Signer override = {}", signerOverride);
       awsConf.setSignerOverride(signerOverride);
     }
 
@@ -144,21 +168,23 @@ public class S3AFileSystem extends FileSystem {
 
     initAmazonS3Client(conf, credentials, awsConf);
 
-    maxKeys = conf.getInt(MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS);
+    maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
     partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
-    multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD,
-      DEFAULT_MIN_MULTIPART_THRESHOLD);
-    enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
-
     if (partSize < 5 * 1024 * 1024) {
       LOG.error(MULTIPART_SIZE + " must be at least 5 MB");
       partSize = 5 * 1024 * 1024;
     }
 
+    multiPartThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD,
+        DEFAULT_MIN_MULTIPART_THRESHOLD);
     if (multiPartThreshold < 5 * 1024 * 1024) {
       LOG.error(MIN_MULTIPART_THRESHOLD + " must be at least 5 MB");
       multiPartThreshold = 5 * 1024 * 1024;
     }
+    //check but do not store the block size
+    longOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
+    enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
+    readAhead = longOption(conf, READAHEAD_RANGE, DEFAULT_READAHEAD_RANGE, 0);
 
     int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
     if (maxThreads < 2) {
@@ -180,19 +206,17 @@ public class S3AFileSystem extends FileSystem {
     initCannedAcls(conf);
 
     if (!s3.doesBucketExist(bucket)) {
-      throw new IOException("Bucket " + bucket + " does not exist");
+      throw new FileNotFoundException("Bucket " + bucket + " does not exist");
     }
 
     initMultipartUploads(conf);
 
     serverSideEncryptionAlgorithm = conf.get(SERVER_SIDE_ENCRYPTION_ALGORITHM);
 
-    setConf(conf);
   }
 
   void initProxySupport(Configuration conf, ClientConfiguration awsConf,
-      boolean secureConnections) throws IllegalArgumentException,
-      IllegalArgumentException {
+      boolean secureConnections) throws IllegalArgumentException {
     String proxyHost = conf.getTrimmed(PROXY_HOST, "");
     int proxyPort = conf.getInt(PROXY_PORT, -1);
     if (!proxyHost.isEmpty()) {
@@ -223,7 +247,8 @@ public class S3AFileSystem extends FileSystem {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Using proxy server {}:{} as user {} with password {} on " +
                 "domain {} as workstation {}", awsConf.getProxyHost(),
-            awsConf.getProxyPort(), String.valueOf(awsConf.getProxyUsername()),
+            awsConf.getProxyPort(),
+            String.valueOf(awsConf.getProxyUsername()),
             awsConf.getProxyPassword(), awsConf.getProxyDomain(),
             awsConf.getProxyWorkstation());
       }
@@ -258,7 +283,7 @@ public class S3AFileSystem extends FileSystem {
       AWSCredentialsProviderChain credentials, ClientConfiguration awsConf)
       throws IllegalArgumentException {
     s3 = new AmazonS3Client(credentials, awsConf);
-    String endPoint = conf.getTrimmed(ENDPOINT,"");
+    String endPoint = conf.getTrimmed(ENDPOINT, "");
     if (!endPoint.isEmpty()) {
       try {
         s3.setEndpoint(endPoint);
@@ -301,14 +326,25 @@ public class S3AFileSystem extends FileSystem {
 
   private void initMultipartUploads(Configuration conf) {
     boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART,
-      DEFAULT_PURGE_EXISTING_MULTIPART);
-    long purgeExistingMultipartAge = conf.getLong(PURGE_EXISTING_MULTIPART_AGE,
-      DEFAULT_PURGE_EXISTING_MULTIPART_AGE);
+        DEFAULT_PURGE_EXISTING_MULTIPART);
+    long purgeExistingMultipartAge = longOption(conf,
+        PURGE_EXISTING_MULTIPART_AGE, DEFAULT_PURGE_EXISTING_MULTIPART_AGE, 0);
 
     if (purgeExistingMultipart) {
-      Date purgeBefore = new Date(new Date().getTime() - purgeExistingMultipartAge*1000);
+      Date purgeBefore =
+          new Date(new Date().getTime() - purgeExistingMultipartAge * 1000);
 
-      transfers.abortMultipartUploads(bucket, purgeBefore);
+      try {
+        transfers.abortMultipartUploads(bucket, purgeBefore);
+      } catch (AmazonServiceException e) {
+        if (e.getStatusCode() == 403) {
+          instrumentation.errorIgnored();
+          LOG.debug("Failed to abort multipart uploads against {}," +
+              " FS may be read only", bucket, e);
+        } else {
+          throw e;
+        }
+      }
     }
   }
 
@@ -421,16 +457,15 @@ public class S3AFileSystem extends FileSystem {
   public FSDataInputStream open(Path f, int bufferSize)
       throws IOException {
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Opening '{}' for reading.", f);
-    }
+    LOG.debug("Opening '{}' for reading.", f);
     final FileStatus fileStatus = getFileStatus(f);
     if (fileStatus.isDirectory()) {
-      throw new FileNotFoundException("Can't open " + f + " because it is a directory");
+      throw new FileNotFoundException("Can't open " + f
+          + " because it is a directory");
     }
 
     return new FSDataInputStream(new S3AInputStream(bucket, pathToKey(f),
-      fileStatus.getLen(), s3, statistics));
+      fileStatus.getLen(), s3, statistics, instrumentation, readAhead));
   }
 
   /**
@@ -456,16 +491,26 @@ public class S3AFileSystem extends FileSystem {
     if (!overwrite && exists(f)) {
       throw new FileAlreadyExistsException(f + " already exists");
     }
+    instrumentation.fileCreated();
     if (getConf().getBoolean(FAST_UPLOAD, DEFAULT_FAST_UPLOAD)) {
       return new FSDataOutputStream(new S3AFastOutputStream(s3, this, bucket,
           key, progress, statistics, cannedACL,
           serverSideEncryptionAlgorithm, partSize, multiPartThreshold,
           threadPoolExecutor), statistics);
     }
-    // We pass null to FSDataOutputStream so it won't count writes that are being buffered to a file
-    return new FSDataOutputStream(new S3AOutputStream(getConf(), transfers, this,
-      bucket, key, progress, cannedACL, statistics,
-      serverSideEncryptionAlgorithm), null);
+    // We pass null to FSDataOutputStream so it won't count writes that
+    // are being buffered to a file
+    return new FSDataOutputStream(
+        new S3AOutputStream(getConf(),
+            transfers,
+            this,
+            bucket,
+            key,
+            progress,
+            cannedACL,
+            statistics,
+            serverSideEncryptionAlgorithm),
+        null);
   }
 
   /**
@@ -476,7 +521,7 @@ public class S3AFileSystem extends FileSystem {
    * @throws IOException indicating that append is not supported.
    */
   public FSDataOutputStream append(Path f, int bufferSize,
-    Progressable progress) throws IOException {
+      Progressable progress) throws IOException {
     throw new IOException("Not supported");
   }
 
@@ -501,17 +546,13 @@ public class S3AFileSystem extends FileSystem {
    * @return true if rename is successful
    */
   public boolean rename(Path src, Path dst) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Rename path {} to {}", src, dst);
-    }
+    LOG.debug("Rename path {} to {}", src, dst);
 
     String srcKey = pathToKey(src);
     String dstKey = pathToKey(dst);
 
     if (srcKey.isEmpty() || dstKey.isEmpty()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("rename: src or dst are empty");
-      }
+      LOG.debug("rename: source {} or dest {}, is empty", srcKey, dstKey);
       return false;
     }
 
@@ -524,9 +565,8 @@ public class S3AFileSystem extends FileSystem {
     }
 
     if (srcKey.equals(dstKey)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("rename: src and dst refer to the same file or directory");
-      }
+      LOG.debug("rename: src and dst refer to the same file or directory: {}",
+          dst);
       return srcStatus.isFile();
     }
 
@@ -535,9 +575,8 @@ public class S3AFileSystem extends FileSystem {
       dstStatus = getFileStatus(dst);
 
       if (srcStatus.isDirectory() && dstStatus.isFile()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("rename: src is a directory and dst is a file");
-        }
+        LOG.debug("rename: src {} is a directory and dst {} is a file",
+            src, dst);
         return false;
       }
 
@@ -545,6 +584,7 @@ public class S3AFileSystem extends FileSystem {
         return false;
       }
     } catch (FileNotFoundException e) {
+      LOG.debug("rename: destination path {} not found", dst);
       // Parent must exist
       Path parent = dst.getParent();
       if (!pathToKey(parent).isEmpty()) {
@@ -554,6 +594,8 @@ public class S3AFileSystem extends FileSystem {
             return false;
           }
         } catch (FileNotFoundException e2) {
+          LOG.debug("rename: destination path {} has no parent {}",
+              dst, parent);
           return false;
         }
       }
@@ -561,9 +603,7 @@ public class S3AFileSystem extends FileSystem {
 
     // Ok! Time to start
     if (srcStatus.isFile()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("rename: renaming file " + src + " to " + dst);
-      }
+      LOG.debug("rename: renaming file {} to {}", src, dst);
       if (dstStatus != null && dstStatus.isDirectory()) {
         String newDstKey = dstKey;
         if (!newDstKey.endsWith("/")) {
@@ -572,15 +612,13 @@ public class S3AFileSystem extends FileSystem {
         String filename =
             srcKey.substring(pathToKey(src.getParent()).length()+1);
         newDstKey = newDstKey + filename;
-        copyFile(srcKey, newDstKey);
+        copyFile(srcKey, newDstKey, srcStatus.getLen());
       } else {
-        copyFile(srcKey, dstKey);
+        copyFile(srcKey, dstKey, srcStatus.getLen());
       }
       delete(src, false);
     } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("rename: renaming directory " + src + " to " + dst);
-      }
+      LOG.debug("rename: renaming directory {} to {}", src, dst);
 
       // This is a directory to directory copy
       if (!dstKey.endsWith("/")) {
@@ -593,14 +631,12 @@ public class S3AFileSystem extends FileSystem {
 
       //Verify dest is not a child of the source directory
       if (dstKey.startsWith(srcKey)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("cannot rename a directory to a subdirectory of self");
-        }
+        LOG.debug("cannot rename a directory {}" +
+              " to a subdirectory of self: {}", srcKey, dstKey);
         return false;
       }
 
-      List<DeleteObjectsRequest.KeyVersion> keysToDelete =
-        new ArrayList<>();
+      List<DeleteObjectsRequest.KeyVersion> keysToDelete = new ArrayList<>();
       if (dstStatus != null && dstStatus.isEmptyDirectory()) {
         // delete unnecessary fake directory.
         keysToDelete.add(new DeleteObjectsRequest.KeyVersion(dstKey));
@@ -618,7 +654,7 @@ public class S3AFileSystem extends FileSystem {
         for (S3ObjectSummary summary : objects.getObjectSummaries()) {
           keysToDelete.add(new DeleteObjectsRequest.KeyVersion(summary.getKey()));
           String newDstKey = dstKey + summary.getKey().substring(srcKey.length());
-          copyFile(summary.getKey(), newDstKey);
+          copyFile(summary.getKey(), newDstKey, summary.getSize());
 
           if (keysToDelete.size() == MAX_ENTRIES_TO_DELETE) {
             removeKeys(keysToDelete, true);
@@ -657,6 +693,7 @@ public class S3AFileSystem extends FileSystem {
       DeleteObjectsRequest deleteRequest
           = new DeleteObjectsRequest(bucket).withKeys(keysToDelete);
       s3.deleteObjects(deleteRequest);
+      instrumentation.fileDeleted(keysToDelete.size());
       statistics.incrementWriteOps(1);
     } else {
       int writeops = 0;
@@ -666,7 +703,7 @@ public class S3AFileSystem extends FileSystem {
             new DeleteObjectRequest(bucket, keyVersion.getKey()));
         writeops++;
       }
-
+      instrumentation.fileDeleted(keysToDelete.size());
       statistics.incrementWriteOps(writeops);
     }
     if (clearKeys) {
@@ -684,25 +721,20 @@ public class S3AFileSystem extends FileSystem {
    * @throws IOException due to inability to delete a directory or file.
    */
   public boolean delete(Path f, boolean recursive) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Delete path " + f + " - recursive " + recursive);
-    }
+    LOG.debug("Delete path {} - recursive {}", f , recursive);
     S3AFileStatus status;
     try {
       status = getFileStatus(f);
     } catch (FileNotFoundException e) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Couldn't delete " + f + " - does not exist");
-      }
+      LOG.debug("Couldn't delete {} - does not exist", f);
+      instrumentation.errorIgnored();
       return false;
     }
 
     String key = pathToKey(f);
 
     if (status.isDirectory()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("delete: Path is a directory");
-      }
+      LOG.debug("delete: Path is a directory: {}", f);
 
       if (!recursive && !status.isEmptyDirectory()) {
         throw new IOException("Path is a folder: " + f +
@@ -719,15 +751,12 @@ public class S3AFileSystem extends FileSystem {
       }
 
       if (status.isEmptyDirectory()) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Deleting fake empty directory");
-        }
+        LOG.debug("Deleting fake empty directory {}", key);
         s3.deleteObject(bucket, key);
+        instrumentation.directoryDeleted();
         statistics.incrementWriteOps(1);
       } else {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Getting objects for directory prefix " + key + " to delete");
-        }
+        LOG.debug("Getting objects for directory prefix {} to delete", key);
 
         ListObjectsRequest request = new ListObjectsRequest();
         request.setBucketName(bucket);
@@ -736,16 +765,13 @@ public class S3AFileSystem extends FileSystem {
         //request.setDelimiter("/");
         request.setMaxKeys(maxKeys);
 
-        List<DeleteObjectsRequest.KeyVersion> keys =
-          new ArrayList<>();
+        List<DeleteObjectsRequest.KeyVersion> keys = new ArrayList<>();
         ObjectListing objects = s3.listObjects(request);
         statistics.incrementReadOps(1);
         while (true) {
           for (S3ObjectSummary summary : objects.getObjectSummaries()) {
             keys.add(new DeleteObjectsRequest.KeyVersion(summary.getKey()));
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Got object to delete " + summary.getKey());
-            }
+            LOG.debug("Got object to delete {}", summary.getKey());
 
             if (keys.size() == MAX_ENTRIES_TO_DELETE) {
               removeKeys(keys, true);
@@ -764,10 +790,9 @@ public class S3AFileSystem extends FileSystem {
         }
       }
     } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("delete: Path is a file");
-      }
+      LOG.debug("delete: Path is a file");
       s3.deleteObject(bucket, key);
+      instrumentation.fileDeleted(1);
       statistics.incrementWriteOps(1);
     }
 
@@ -779,9 +804,7 @@ public class S3AFileSystem extends FileSystem {
   private void createFakeDirectoryIfNecessary(Path f) throws IOException {
     String key = pathToKey(f);
     if (!key.isEmpty() && !exists(f)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Creating new fake directory at " + f);
-      }
+      LOG.debug("Creating new fake directory at {}", f);
       createFakeDirectory(bucket, key);
     }
   }
@@ -798,9 +821,7 @@ public class S3AFileSystem extends FileSystem {
   public FileStatus[] listStatus(Path f) throws FileNotFoundException,
       IOException {
     String key = pathToKey(f);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("List status for path: " + f);
-    }
+    LOG.debug("List status for path: {}", f);
 
     final List<FileStatus> result = new ArrayList<FileStatus>();
     final FileStatus fileStatus =  getFileStatus(f);
@@ -816,9 +837,7 @@ public class S3AFileSystem extends FileSystem {
       request.setDelimiter("/");
       request.setMaxKeys(maxKeys);
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("listStatus: doing listObjects for directory " + key);
-      }
+      LOG.debug("listStatus: doing listObjects for directory {}", key);
 
       ObjectListing objects = s3.listObjects(request);
       statistics.incrementReadOps(1);
@@ -831,24 +850,18 @@ public class S3AFileSystem extends FileSystem {
           // Skip over keys that are ourselves and old S3N _$folder$ files
           if (keyPath.equals(fQualified) ||
               summary.getKey().endsWith(S3N_FOLDER_SUFFIX)) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Ignoring: " + keyPath);
-            }
+            LOG.debug("Ignoring: {}", keyPath);
             continue;
           }
 
           if (objectRepresentsDirectory(summary.getKey(), summary.getSize())) {
             result.add(new S3AFileStatus(true, true, keyPath));
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Adding: fd: " + keyPath);
-            }
+            LOG.debug("Adding: fd: {}", keyPath);
           } else {
             result.add(new S3AFileStatus(summary.getSize(),
                 dateToLong(summary.getLastModified()), keyPath,
                 getDefaultBlockSize(fQualified)));
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Adding: fi: " + keyPath);
-            }
+            LOG.debug("Adding: fi: {}", keyPath);
           }
         }
 
@@ -858,16 +871,11 @@ public class S3AFileSystem extends FileSystem {
             continue;
           }
           result.add(new S3AFileStatus(true, false, keyPath));
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Adding: rd: " + keyPath);
-          }
+          LOG.debug("Adding: rd: {}", keyPath);
         }
 
         if (objects.isTruncated()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("listStatus: list truncated - getting next batch");
-          }
-
+          LOG.debug("listStatus: list truncated - getting next batch");
           objects = s3.listNextBatchOfObjects(objects);
           statistics.incrementReadOps(1);
         } else {
@@ -875,9 +883,7 @@ public class S3AFileSystem extends FileSystem {
         }
       }
     } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Adding: rd (not a dir): " + f);
-      }
+      LOG.debug("Adding: rd (not a dir): {}", f);
       result.add(fileStatus);
     }
 
@@ -890,14 +896,14 @@ public class S3AFileSystem extends FileSystem {
    * Set the current working directory for the given file system. All relative
    * paths will be resolved relative to it.
    *
-   * @param new_dir the current working directory.
+   * @param newDir the current working directory.
    */
-  public void setWorkingDirectory(Path new_dir) {
-    workingDir = new_dir;
+  public void setWorkingDirectory(Path newDir) {
+    workingDir = newDir;
   }
 
   /**
-   * Get the current working directory for the given file system
+   * Get the current working directory for the given file system.
    * @return the directory pathname
    */
   public Path getWorkingDirectory() {
@@ -914,10 +920,7 @@ public class S3AFileSystem extends FileSystem {
   // TODO: If we have created an empty file at /foo/bar and we then call
   // mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/?
   public boolean mkdirs(Path f, FsPermission permission) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Making directory: " + f);
-    }
-
+    LOG.debug("Making directory: {}", f);
 
     try {
       FileStatus fileStatus = getFileStatus(f);
@@ -938,6 +941,7 @@ public class S3AFileSystem extends FileSystem {
                 fPart));
           }
         } catch (FileNotFoundException fnfe) {
+          instrumentation.errorIgnored();
         }
         fPart = fPart.getParent();
       } while (fPart != null);
@@ -957,10 +961,7 @@ public class S3AFileSystem extends FileSystem {
    */
   public S3AFileStatus getFileStatus(Path f) throws IOException {
     String key = pathToKey(f);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Getting path status for " + f + " (" + key + ")");
-    }
-
+    LOG.debug("Getting path status for {}  ({})", f , key);
 
     if (!key.isEmpty()) {
       try {
@@ -968,15 +969,11 @@ public class S3AFileSystem extends FileSystem {
         statistics.incrementReadOps(1);
 
         if (objectRepresentsDirectory(key, meta.getContentLength())) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Found exact file: fake directory");
-          }
+          LOG.debug("Found exact file: fake directory");
           return new S3AFileStatus(true, true,
               f.makeQualified(uri, workingDir));
         } else {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Found exact file: normal file");
-          }
+          LOG.debug("Found exact file: normal file");
           return new S3AFileStatus(meta.getContentLength(),
               dateToLong(meta.getLastModified()),
               f.makeQualified(uri, workingDir),
@@ -984,25 +981,23 @@ public class S3AFileSystem extends FileSystem {
         }
       } catch (AmazonServiceException e) {
         if (e.getStatusCode() != 404) {
-          printAmazonServiceException(e);
+          printAmazonServiceException(f.toString(), e);
           throw e;
         }
       } catch (AmazonClientException e) {
-        printAmazonClientException(e);
+        printAmazonClientException(f.toString(), e);
         throw e;
       }
 
       // Necessary?
       if (!key.endsWith("/")) {
+        String newKey = key + "/";
         try {
-          String newKey = key + "/";
           ObjectMetadata meta = s3.getObjectMetadata(bucket, newKey);
           statistics.incrementReadOps(1);
 
           if (objectRepresentsDirectory(newKey, meta.getContentLength())) {
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("Found file (with /): fake directory");
-            }
+            LOG.debug("Found file (with /): fake directory");
             return new S3AFileStatus(true, true, f.makeQualified(uri, workingDir));
           } else {
             LOG.warn("Found file (with /): real file? should not happen: {}", key);
@@ -1014,11 +1009,11 @@ public class S3AFileSystem extends FileSystem {
           }
         } catch (AmazonServiceException e) {
           if (e.getStatusCode() != 404) {
-            printAmazonServiceException(e);
+            printAmazonServiceException(newKey, e);
             throw e;
           }
         } catch (AmazonClientException e) {
-          printAmazonClientException(e);
+          printAmazonClientException(newKey, e);
           throw e;
         }
       }
@@ -1038,17 +1033,17 @@ public class S3AFileSystem extends FileSystem {
       statistics.incrementReadOps(1);
 
       if (!objects.getCommonPrefixes().isEmpty()
-          || objects.getObjectSummaries().size() > 0) {
+          || !objects.getObjectSummaries().isEmpty()) {
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Found path as directory (with /): " +
-              objects.getCommonPrefixes().size() + "/" +
+          LOG.debug("Found path as directory (with /): {}/{}",
+              objects.getCommonPrefixes().size() ,
               objects.getObjectSummaries().size());
 
           for (S3ObjectSummary summary : objects.getObjectSummaries()) {
-            LOG.debug("Summary: " + summary.getKey() + " " + summary.getSize());
+            LOG.debug("Summary: {} {}", summary.getKey(), summary.getSize());
           }
           for (String prefix : objects.getCommonPrefixes()) {
-            LOG.debug("Prefix: " + prefix);
+            LOG.debug("Prefix: {}", prefix);
           }
         }
 
@@ -1060,17 +1055,15 @@ public class S3AFileSystem extends FileSystem {
       }
     } catch (AmazonServiceException e) {
       if (e.getStatusCode() != 404) {
-        printAmazonServiceException(e);
+        printAmazonServiceException(key, e);
         throw e;
       }
     } catch (AmazonClientException e) {
-      printAmazonClientException(e);
+      printAmazonClientException(key, e);
       throw e;
     }
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Not Found: " + f);
-    }
+    LOG.debug("Not Found: {}", f);
     throw new FileNotFoundException("No such file or directory: " + f);
   }
 
@@ -1089,15 +1082,13 @@ public class S3AFileSystem extends FileSystem {
    */
   @Override
   public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
-    Path dst) throws IOException {
+      Path dst) throws IOException {
     String key = pathToKey(dst);
 
     if (!overwrite && exists(dst)) {
-      throw new IOException(dst + " already exists");
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Copying local file from " + src + " to " + dst);
+      throw new FileAlreadyExistsException(dst + " already exists");
     }
+    LOG.debug("Copying local file from {} to {}", src, dst);
 
     // Since we have a local file, we don't need to stream into a temporary file
     LocalFileSystem local = getLocal(getConf());
@@ -1123,13 +1114,14 @@ public class S3AFileSystem extends FileSystem {
       }
     };
 
+    statistics.incrementWriteOps(1);
     Upload up = transfers.upload(putObjectRequest);
     up.addProgressListener(progressListener);
     try {
       up.waitForUploadResult();
-      statistics.incrementWriteOps(1);
     } catch (InterruptedException e) {
-      throw new IOException("Got interrupted, cancelling");
+      throw new InterruptedIOException("Interrupted copying " + src
+          + " to "  + dst + ", cancelling");
     }
 
     // This will delete unnecessary fake parent directories
@@ -1153,7 +1145,7 @@ public class S3AFileSystem extends FileSystem {
   }
 
   /**
-  * Override getCononicalServiceName because we don't support token in S3A
+  * Override getCanonicalServiceName because we don't support tokens in S3A.
   */
   @Override
   public String getCanonicalServiceName() {
@@ -1161,17 +1153,17 @@ public class S3AFileSystem extends FileSystem {
     return null;
   }
 
-  private void copyFile(String srcKey, String dstKey) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("copyFile " + srcKey + " -> " + dstKey);
-    }
+  private void copyFile(String srcKey, String dstKey, long size)
+      throws IOException {
+    LOG.debug("copyFile {} -> {} ", srcKey, dstKey);
 
     ObjectMetadata srcom = s3.getObjectMetadata(bucket, srcKey);
     ObjectMetadata dstom = cloneObjectMetadata(srcom);
     if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
       dstom.setSSEAlgorithm(serverSideEncryptionAlgorithm);
     }
-    CopyObjectRequest copyObjectRequest = new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
+    CopyObjectRequest copyObjectRequest =
+        new CopyObjectRequest(bucket, srcKey, bucket, dstKey);
     copyObjectRequest.setCannedAccessControlList(cannedACL);
     copyObjectRequest.setNewObjectMetadata(dstom);
 
@@ -1192,13 +1184,17 @@ public class S3AFileSystem extends FileSystem {
     try {
       copy.waitForCopyResult();
       statistics.incrementWriteOps(1);
+      instrumentation.filesCopied(1, size);
     } catch (InterruptedException e) {
-      throw new IOException("Got interrupted, cancelling");
+      throw new InterruptedIOException("Interrupted copying " + srcKey
+          + " to " + dstKey + ", cancelling");
     }
   }
 
   private boolean objectRepresentsDirectory(final String name, final long size) {
-    return !name.isEmpty() && name.charAt(name.length() - 1) == '/' && size == 0L;
+    return !name.isEmpty()
+        && name.charAt(name.length() - 1) == '/'
+        && size == 0L;
   }
 
   // Handles null Dates that can be returned by AWS
@@ -1216,8 +1212,9 @@ public class S3AFileSystem extends FileSystem {
 
   private void deleteUnnecessaryFakeDirectories(Path f) throws IOException {
     while (true) {
+      String key = "";
       try {
-        String key = pathToKey(f);
+        key = pathToKey(f);
         if (key.isEmpty()) {
           break;
         }
@@ -1225,13 +1222,13 @@ public class S3AFileSystem extends FileSystem {
         S3AFileStatus status = getFileStatus(f);
 
         if (status.isDirectory() && status.isEmptyDirectory()) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Deleting fake directory " + key + "/");
-          }
+          LOG.debug("Deleting fake directory {}/", key);
           s3.deleteObject(bucket, key + "/");
           statistics.incrementWriteOps(1);
         }
       } catch (FileNotFoundException | AmazonServiceException e) {
+        LOG.debug("While deleting key {} ", key, e);
+        instrumentation.errorIgnored();
       }
 
       if (f.isRoot()) {
@@ -1267,10 +1264,12 @@ public class S3AFileSystem extends FileSystem {
     if (StringUtils.isNotBlank(serverSideEncryptionAlgorithm)) {
       om.setSSEAlgorithm(serverSideEncryptionAlgorithm);
     }
-    PutObjectRequest putObjectRequest = new PutObjectRequest(bucketName, objectName, im, om);
+    PutObjectRequest putObjectRequest =
+        new PutObjectRequest(bucketName, objectName, im, om);
     putObjectRequest.setCannedAcl(cannedACL);
     s3.putObject(putObjectRequest);
     statistics.incrementWriteOps(1);
+    instrumentation.directoryCreated();
   }
 
   /**
@@ -1342,31 +1341,115 @@ public class S3AFileSystem extends FileSystem {
 
   /**
    * Return the number of bytes that large input files should be optimally
-   * be split into to minimize i/o time.
+   * be split into to minimize I/O time.
    * @deprecated use {@link #getDefaultBlockSize(Path)} instead
    */
   @Deprecated
   public long getDefaultBlockSize() {
-    // default to 32MB: large enough to minimize the impact of seeks
     return getConf().getLong(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE);
   }
 
-  private void printAmazonServiceException(AmazonServiceException ase) {
-    LOG.info("Caught an AmazonServiceException, which means your request made it " +
-        "to Amazon S3, but was rejected with an error response for some reason.");
-    LOG.info("Error Message: " + ase.getMessage());
-    LOG.info("HTTP Status Code: " + ase.getStatusCode());
-    LOG.info("AWS Error Code: " + ase.getErrorCode());
-    LOG.info("Error Type: " + ase.getErrorType());
-    LOG.info("Request ID: " + ase.getRequestId());
-    LOG.info("Class Name: " + ase.getClass().getName());
+  private void printAmazonServiceException(String target,
+      AmazonServiceException ase) {
+    LOG.info("{}: caught an AmazonServiceException {}", target, ase);
+    LOG.info("This means your request made it to Amazon S3," +
+        " but was rejected with an error response for some reason.");
+    LOG.info("Error Message: {}", ase.getMessage());
+    LOG.info("HTTP Status Code: {}", ase.getStatusCode());
+    LOG.info("AWS Error Code: {}", ase.getErrorCode());
+    LOG.info("Error Type: {}", ase.getErrorType());
+    LOG.info("Request ID: {}", ase.getRequestId());
+    LOG.info("Class Name: {}", ase.getClass().getName());
+    LOG.info("Exception", ase);
+  }
+
+  private void printAmazonClientException(String target,
+      AmazonClientException ace) {
+    LOG.info("{}: caught an AmazonClientException {}", target, ace);
+    LOG.info("This means the client encountered " +
+        "a problem while trying to communicate with S3, " +
+        "such as not being able to access the network.", ace);
   }
 
-  private void printAmazonClientException(AmazonClientException ace) {
-    LOG.info("Caught an AmazonClientException, which means the client encountered " +
-        "a serious internal problem while trying to communicate with S3, " +
-        "such as not being able to access the network.");
-    LOG.info("Error Message: {}" + ace, ace);
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(
+        "S3AFileSystem{");
+    sb.append("uri=").append(uri);
+    sb.append(", workingDir=").append(workingDir);
+    sb.append(", partSize=").append(partSize);
+    sb.append(", enableMultiObjectsDelete=").append(enableMultiObjectsDelete);
+    sb.append(", maxKeys=").append(maxKeys);
+    sb.append(", cannedACL=").append(cannedACL.toString());
+    sb.append(", readAhead=").append(readAhead);
+    sb.append(", blockSize=").append(getDefaultBlockSize());
+    sb.append(", multiPartThreshold=").append(multiPartThreshold);
+    if (serverSideEncryptionAlgorithm != null) {
+      sb.append(", serverSideEncryptionAlgorithm='")
+          .append(serverSideEncryptionAlgorithm)
+          .append('\'');
+    }
+    sb.append(", statistics {")
+        .append(statistics.toString())
+        .append("}");
+    sb.append(", metrics {")
+        .append(instrumentation.dump("{", "=", "} ", true))
+        .append("}");
+    sb.append('}');
+    return sb.toString();
+  }
+
+  /**
+   * Get the partition size for multipart operations.
+   * @return the value as set during initialization
+   */
+  public long getPartitionSize() {
+    return partSize;
+  }
+
+  /**
+   * Get the threshold for multipart files.
+   * @return the value as set during initialization
+   */
+  public long getMultiPartThreshold() {
+    return multiPartThreshold;
+  }
+
+  /**
+   * Get an integer option >= the minimum allowed value.
+   * @param conf configuration
+   * @param key key to look up
+   * @param defVal default value
+   * @param min minimum value
+   * @return the value
+   * @throws IllegalArgumentException if the value is below the minimum
+   */
+  static int intOption(Configuration conf, String key, int defVal, int min) {
+    int v = conf.getInt(key, defVal);
+    Preconditions.checkArgument(v >= min,
+        String.format("Value of %s: %d is below the minimum value %d",
+            key, v, min));
+    return v;
+  }
+
+  /**
+   * Get a long option >= the minimum allowed value.
+   * @param conf configuration
+   * @param key key to look up
+   * @param defVal default value
+   * @param min minimum value
+   * @return the value
+   * @throws IllegalArgumentException if the value is below the minimum
+   */
+  static long longOption(Configuration conf,
+      String key,
+      long defVal,
+      long min) {
+    long v = conf.getLong(key, defVal);
+    Preconditions.checkArgument(v >= min,
+        String.format("Value of %s: %d is below the minimum value %d",
+            key, v, min));
+    return v;
   }
 
   /**

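Before the diff moves on to the input stream, a short illustration of the option-validation contract added above. This is a standalone sketch, not code from the patch: the `longOption` body is duplicated so the snippet compiles on its own, and the key name `fs.s3a.readahead.range` is taken from the documentation change later in this commit.

    import com.google.common.base.Preconditions;
    import org.apache.hadoop.conf.Configuration;

    /** Sketch only: demonstrates how longOption() rejects out-of-range values. */
    public final class OptionValidationSketch {

      static long longOption(Configuration conf, String key, long defVal, long min) {
        long v = conf.getLong(key, defVal);
        Preconditions.checkArgument(v >= min,
            String.format("Value of %s: %d is below the minimum value %d", key, v, min));
        return v;
      }

      public static void main(String[] args) {
        Configuration conf = new Configuration(false);
        conf.setLong("fs.s3a.readahead.range", 65536);
        // in range: the configured value is returned
        System.out.println(longOption(conf, "fs.s3a.readahead.range", 65536, 0));

        conf.setLong("fs.s3a.readahead.range", -1);
        try {
          // below the minimum: initialization fails fast
          longOption(conf, "fs.s3a.readahead.range", 65536, 0);
        } catch (IllegalArgumentException expected) {
          System.out.println("rejected: " + expected.getMessage());
        }
      }
    }
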
+ 259 - 83
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

@@ -21,20 +21,50 @@ package org.apache.hadoop.fs.s3a;
 import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.CanSetReadahead;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.fs.FileSystem;
-
 import org.slf4j.Logger;
 
 import java.io.EOFException;
 import java.io.IOException;
-import java.net.SocketTimeoutException;
-import java.net.SocketException;
 
-public class S3AInputStream extends FSInputStream {
+/**
+ * The input stream for an S3A object.
+ *
+ * As this stream seeks within an object, it may close then re-open the stream.
+ * When this happens, any updated stream data may be retrieved, and, given
+ * the consistency model of Amazon S3, outdated data may in fact be picked up.
+ *
+ * As a result, the outcome of reading from a stream of an object which is
+ * actively manipulated during the read process is "undefined".
+ *
+ * The class is marked as private as code should not be creating instances
+ * themselves. Any extra feature (e.g. instrumentation) should be considered
+ * unstable.
+ *
+ * Because it prints some of the state of the instrumentation,
+ * the output of {@link #toString()} must also be considered unstable.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class S3AInputStream extends FSInputStream implements CanSetReadahead {
+  /**
+   * This is the public position; the one set in {@link #seek(long)}
+   * and returned in {@link #getPos()}.
+   */
   private long pos;
-  private boolean closed;
+  /**
+   * Closed bit. Volatile so reads are non-blocking.
+   * Updates must be in a synchronized block to guarantee an atomic check and
+   * set
+   */
+  private volatile boolean closed;
   private S3ObjectInputStream wrappedStream;
   private final FileSystem.Statistics stats;
   private final AmazonS3Client client;
@@ -44,62 +74,65 @@ public class S3AInputStream extends FSInputStream {
   private final String uri;
   public static final Logger LOG = S3AFileSystem.LOG;
   public static final long CLOSE_THRESHOLD = 4096;
+  private final S3AInstrumentation.InputStreamStatistics streamStatistics;
+  private long readahead;
 
-  // Used by lazy seek
+  /**
+   * This is the actual position within the object, used by
+   * lazy seek to decide whether to seek on the next read or not.
+   */
   private long nextReadPos;
 
-  //Amount of data requested from the request
+  /* Amount of data desired from the request */
   private long requestedStreamLen;
 
-  public S3AInputStream(String bucket, String key, long contentLength,
-      AmazonS3Client client, FileSystem.Statistics stats) {
+  public S3AInputStream(String bucket,
+      String key,
+      long contentLength,
+      AmazonS3Client client,
+      FileSystem.Statistics stats,
+      S3AInstrumentation instrumentation,
+      long readahead) {
+    Preconditions.checkArgument(StringUtils.isNotEmpty(bucket), "No Bucket");
+    Preconditions.checkArgument(StringUtils.isNotEmpty(key), "No Key");
+    Preconditions.checkArgument(contentLength >= 0, "Negative content length");
     this.bucket = bucket;
     this.key = key;
     this.contentLength = contentLength;
     this.client = client;
     this.stats = stats;
-    this.pos = 0;
-    this.nextReadPos = 0;
-    this.closed = false;
-    this.wrappedStream = null;
     this.uri = "s3a://" + this.bucket + "/" + this.key;
+    this.streamStatistics = instrumentation.newInputStreamStatistics();
+    setReadahead(readahead);
   }
 
   /**
    * Opens up the stream at specified target position and for given length.
    *
+   * @param reason reason for reopen
    * @param targetPos target position
    * @param length length requested
    * @throws IOException
    */
-  private synchronized void reopen(long targetPos, long length)
+  private synchronized void reopen(String reason, long targetPos, long length)
       throws IOException {
-    requestedStreamLen = (length < 0) ? this.contentLength :
-        Math.max(this.contentLength, (CLOSE_THRESHOLD + (targetPos + length)));
+    requestedStreamLen = this.contentLength;
 
     if (wrappedStream != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Closing the previous stream");
-      }
-      closeStream(requestedStreamLen);
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Requesting for "
-          + "targetPos=" + targetPos
-          + ", length=" + length
-          + ", requestedStreamLen=" + requestedStreamLen
-          + ", streamPosition=" + pos
-          + ", nextReadPosition=" + nextReadPos
-      );
+      closeStream("reopen(" + reason + ")", requestedStreamLen);
     }
+    LOG.debug("reopen({}) for {} at targetPos={}, length={}," +
+        " requestedStreamLen={}, streamPosition={}, nextReadPosition={}",
+        uri, reason, targetPos, length, requestedStreamLen, pos, nextReadPos);
 
+    streamStatistics.streamOpened();
     GetObjectRequest request = new GetObjectRequest(bucket, key)
         .withRange(targetPos, requestedStreamLen);
     wrappedStream = client.getObject(request).getObjectContent();
 
     if (wrappedStream == null) {
-      throw new IOException("Null IO stream");
+      throw new IOException("Null IO stream from reopen of (" + reason +  ") "
+          + uri);
     }
 
     this.pos = targetPos;
@@ -128,6 +161,20 @@ public class S3AInputStream extends FSInputStream {
     nextReadPos = targetPos;
   }
 
+  /**
+   * Seek without raising any exception. This is for use in
+   * {@code finally} clauses
+   * @param positiveTargetPos a target position which must be positive.
+   */
+  private void seekQuietly(long positiveTargetPos) {
+    try {
+      seek(positiveTargetPos);
+    } catch (IOException ioe) {
+      LOG.debug("Ignoring IOE on seek of {} to {}",
+          uri, positiveTargetPos, ioe);
+    }
+  }
+
   /**
    * Adjust the stream to a specific position.
    *
@@ -140,23 +187,50 @@ public class S3AInputStream extends FSInputStream {
     if (wrappedStream == null) {
       return;
     }
-
     // compute how much more to skip
     long diff = targetPos - pos;
-    if (targetPos > pos) {
-      if ((diff + length) <= wrappedStream.available()) {
-        // already available in buffer
-        pos += wrappedStream.skip(diff);
-        if (pos != targetPos) {
-          throw new IOException("Failed to seek to " + targetPos
-              + ". Current position " + pos);
+    if (diff > 0) {
+      // forward seek -this is where data can be skipped
+
+      int available = wrappedStream.available();
+      // always seek at least as far as what is available
+      long forwardSeekRange = Math.max(readahead, available);
+      // work out how much is actually left in the stream
+      // then choose whichever comes first: the range or the EOF
+      long forwardSeekLimit = Math.min(remaining(), forwardSeekRange);
+      if (diff <= forwardSeekLimit) {
+        // the forward seek range is within the limits
+        LOG.debug("Forward seek on {}, of {} bytes", uri, diff);
+        streamStatistics.seekForwards(diff);
+        long skipped = wrappedStream.skip(diff);
+        if (skipped > 0) {
+          pos += skipped;
+          // as these bytes have been read, they are included in the counter
+          incrementBytesRead(diff);
+        }
+
+        if (pos == targetPos) {
+          // all is well
+          return;
+        } else {
+          // log a warning; continue to attempt to re-open
+          LOG.warn("Failed to seek on {} to {}. Current position {}",
+              uri, targetPos,  pos);
         }
-        return;
       }
+    } else if (diff < 0) {
+      // backwards seek
+      streamStatistics.seekBackwards(diff);
+    } else {
+      // targetPos == pos
+      // this should never happen as the caller filters it out.
+      // Retained just in case
+      LOG.debug("Ignoring seek {} to {} as target position == current",
+          uri, targetPos);
     }
 
     // close the stream; if read the object will be opened at the new pos
-    closeStream(this.requestedStreamLen);
+    closeStream("seekInStream()", this.requestedStreamLen);
     pos = targetPos;
   }
 
@@ -179,7 +253,19 @@ public class S3AInputStream extends FSInputStream {
 
     //re-open at specific location if needed
     if (wrappedStream == null) {
-      reopen(targetPos, len);
+      reopen("read from new offset", targetPos, len);
+    }
+  }
+
+  /**
+   * Increment the bytes read counter if there is a stats instance
+   * and the number of bytes read is more than zero.
+   * @param bytesRead number of bytes read
+   */
+  private void incrementBytesRead(long bytesRead) {
+    streamStatistics.bytesRead(bytesRead);
+    if (stats != null && bytesRead > 0) {
+      stats.incrementBytesRead(bytesRead);
     }
   }
 
@@ -195,13 +281,11 @@ public class S3AInputStream extends FSInputStream {
     int byteRead;
     try {
       byteRead = wrappedStream.read();
-    } catch (SocketTimeoutException | SocketException e) {
-      LOG.info("Got exception while trying to read from stream,"
-          + " trying to recover " + e);
-      reopen(pos, 1);
-      byteRead = wrappedStream.read();
     } catch (EOFException e) {
       return -1;
+    } catch (IOException e) {
+      onReadFailure(e, 1);
+      byteRead = wrappedStream.read();
     }
 
     if (byteRead >= 0) {
@@ -209,12 +293,36 @@ public class S3AInputStream extends FSInputStream {
       nextReadPos++;
     }
 
-    if (stats != null && byteRead >= 0) {
-      stats.incrementBytesRead(1);
+    if (byteRead >= 0) {
+      incrementBytesRead(1);
     }
     return byteRead;
   }
 
+  /**
+   * Handle an IOE on a read by attempting to re-open the stream.
+   * The filesystem's readException count will be incremented.
+   * @param ioe exception caught.
+   * @param length length of data being attempted to read
+   * @throws IOException any exception thrown on the re-open attempt.
+   */
+  private void onReadFailure(IOException ioe, int length) throws IOException {
+    LOG.info("Got exception while trying to read from stream {},"
+        + " trying to recover: {}", uri, ioe);
+    LOG.debug("While trying to read from stream {}", uri, ioe);
+    streamStatistics.readException();
+    reopen("failure recovery", pos, length);
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * This updates the statistics on read operations started and whether
+   * or not the read operation "completed", that is: returned the exact
+   * number of bytes requested.
+   * @throws EOFException if there is no more data
+   * @throws IOException if there are other problems
+   */
   @Override
   public synchronized int read(byte[] buf, int off, int len)
       throws IOException {
@@ -230,61 +338,85 @@ public class S3AInputStream extends FSInputStream {
     }
 
     lazySeek(nextReadPos, len);
+    streamStatistics.readOperationStarted(nextReadPos, len);
 
-    int byteRead;
+    int bytesRead;
     try {
-      byteRead = wrappedStream.read(buf, off, len);
-    } catch (SocketTimeoutException | SocketException e) {
-      LOG.info("Got exception while trying to read from stream,"
-          + " trying to recover " + e);
-      reopen(pos, len);
-      byteRead = wrappedStream.read(buf, off, len);
-    }
-
-    if (byteRead > 0) {
-      pos += byteRead;
-      nextReadPos += byteRead;
+      bytesRead = wrappedStream.read(buf, off, len);
+    } catch (EOFException e) {
+      throw e;
+    } catch (IOException e) {
+      onReadFailure(e, len);
+      bytesRead = wrappedStream.read(buf, off, len);
     }
 
-    if (stats != null && byteRead > 0) {
-      stats.incrementBytesRead(byteRead);
+    if (bytesRead > 0) {
+      pos += bytesRead;
+      nextReadPos += bytesRead;
     }
-
-    return byteRead;
+    incrementBytesRead(bytesRead);
+    streamStatistics.readOperationCompleted(len, bytesRead);
+    return bytesRead;
   }
 
+  /**
+   * Verify that the input stream is open. Non blocking; this gives
+   * the last state of the volatile {@link #closed} field.
+   * @throws IOException if the connection is closed.
+   */
   private void checkNotClosed() throws IOException {
     if (closed) {
-      throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+      throw new IOException(uri + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
     }
   }
 
+  /**
+   * Close the stream.
+   * This triggers publishing of the stream statistics back to the filesystem
+   * statistics.
+   * This operation is synchronized, so that only one thread can attempt to
+   * close the connection; all later/blocked calls are no-ops.
+   * @throws IOException on any problem
+   */
   @Override
   public synchronized void close() throws IOException {
-    super.close();
-    closed = true;
-    closeStream(this.contentLength);
+    if (!closed) {
+      closed = true;
+      try {
+        // close or abort the stream
+        closeStream("close() operation", this.contentLength);
+        // this is actually a no-op
+        super.close();
+      } finally {
+        // merge the statistics back into the FS statistics.
+        streamStatistics.close();
+      }
+    }
   }
 
   /**
    * Close a stream: decide whether to abort or close, based on
    * the length of the stream and the current position.
+   * If a close() is attempted and fails, the operation escalates to
+   * an abort.
    *
    * This does not set the {@link #closed} flag.
+   *
+   * @param reason reason for stream being closed; used in messages
    * @param length length of the stream.
-   * @throws IOException
    */
-  private void closeStream(long length) throws IOException {
+  private void closeStream(String reason, long length) {
     if (wrappedStream != null) {
-      String reason = null;
       boolean shouldAbort = length - pos > CLOSE_THRESHOLD;
       if (!shouldAbort) {
         try {
-          reason = "Closed stream";
+          // clean close. This will read to the end of the stream,
+          // so, while cleaner, can be pathological on a multi-GB object
           wrappedStream.close();
+          streamStatistics.streamClose(false);
         } catch (IOException e) {
           // exception escalates to an abort
-          LOG.debug("When closing stream", e);
+          LOG.debug("When closing {} stream for {}", uri, reason, e);
           shouldAbort = true;
         }
       }
@@ -292,13 +424,12 @@ public class S3AInputStream extends FSInputStream {
         // Abort, rather than just close, the underlying stream.  Otherwise, the
         // remaining object payload is read from S3 while closing the stream.
         wrappedStream.abort();
-        reason = "Closed stream with abort";
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(reason + "; streamPos=" + pos
-            + ", nextReadPos=" + nextReadPos
-            + ", contentLength=" + length);
+        streamStatistics.streamClose(true);
       }
+      LOG.debug("Stream {} {}: {}; streamPos={}, nextReadPos={}," +
+          " length={}",
+          uri, (shouldAbort ? "aborted":"closed"), reason, pos, nextReadPos,
+          length);
       wrappedStream = null;
     }
   }
@@ -307,19 +438,34 @@ public class S3AInputStream extends FSInputStream {
   public synchronized int available() throws IOException {
     checkNotClosed();
 
-    long remaining = this.contentLength - this.pos;
+    long remaining = remaining();
     if (remaining > Integer.MAX_VALUE) {
       return Integer.MAX_VALUE;
     }
     return (int)remaining;
   }
 
+  /**
+   * Bytes left in stream.
+   * @return how many bytes are left to read
+   */
+  protected long remaining() {
+    return this.contentLength - this.pos;
+  }
+
   @Override
   public boolean markSupported() {
     return false;
   }
 
+  /**
+   * String value includes statistics as well as stream state.
+   * <b>Important: there are no guarantees as to the stability
+   * of this value.</b>
+   * @return a string value for printing in logs/diagnostics
+   */
   @Override
+  @InterfaceStability.Unstable
   public String toString() {
     final StringBuilder sb = new StringBuilder(
         "S3AInputStream{");
@@ -327,6 +473,7 @@ public class S3AInputStream extends FSInputStream {
     sb.append(" pos=").append(pos);
     sb.append(" nextReadPos=").append(nextReadPos);
     sb.append(" contentLength=").append(contentLength);
+    sb.append(" ").append(streamStatistics.toString());
     sb.append('}');
     return sb.toString();
   }
@@ -348,6 +495,7 @@ public class S3AInputStream extends FSInputStream {
       throws IOException {
     checkNotClosed();
     validatePositionedReadArgs(position, buffer, offset, length);
+    streamStatistics.readFullyOperationStarted(position, length);
     if (length == 0) {
       return;
     }
@@ -363,10 +511,38 @@ public class S3AInputStream extends FSInputStream {
           }
           nread += nbytes;
         }
-
       } finally {
-        seek(oldPos);
+        seekQuietly(oldPos);
       }
     }
   }
+
+  /**
+   * Access the input stream statistics.
+   * This is for internal testing and may be removed without warning.
+   * @return the statistics for this input stream
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public S3AInstrumentation.InputStreamStatistics getS3AStreamStatistics() {
+    return streamStatistics;
+  }
+
+  @Override
+  public void setReadahead(Long readahead) {
+    if (readahead == null) {
+      this.readahead = Constants.DEFAULT_READAHEAD_RANGE;
+    } else {
+      Preconditions.checkArgument(readahead >= 0, "Negative readahead value");
+      this.readahead = readahead;
+    }
+  }
+
+  /**
+   * Get the current readahead value.
+   * @return a non-negative readahead value
+   */
+  public long getReadahead() {
+    return readahead;
+  }
 }

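With `S3AInputStream` implementing `CanSetReadahead`, the forward-seek threshold can be tuned per stream through the `FSDataInputStream` wrapper. A sketch of the intended usage follows; the bucket and object names are placeholders and the sizes are arbitrary examples, so this is illustrative rather than test code from the patch.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    /** Sketch only: seeking and readahead tuning on an S3A stream. */
    public final class ReadaheadUsageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path("s3a://example-bucket/large-object.csv");  // placeholder
        FileSystem fs = path.getFileSystem(conf);
        try (FSDataInputStream in = fs.open(path)) {
          // forward seeks shorter than the readahead range are satisfied by
          // skipping bytes on the open HTTP connection instead of reopening it
          in.setReadahead(256 * 1024L);
          byte[] buffer = new byte[8192];
          in.readFully(0, buffer);           // positioned read; file position is unchanged
          in.seek(128 * 1024);               // forward seek within the readahead range
          in.read(buffer, 0, buffer.length);
          // toString() of the wrapper now includes the inner stream and its statistics
          System.out.println(in);
        }
      }
    }
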
+ 457 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java

@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.MetricStringBuilder;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.Interns;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Instrumentation of S3a.
+ * Derived from the {@code AzureFileSystemInstrumentation}
+ */
+@Metrics(about = "Metrics for S3a", context = "S3AFileSystem")
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class S3AInstrumentation {
+  public static final String CONTEXT = "S3AFileSystem";
+
+  public static final String STREAM_OPENED = "streamOpened";
+  public static final String STREAM_CLOSE_OPERATIONS = "streamCloseOperations";
+  public static final String STREAM_CLOSED = "streamClosed";
+  public static final String STREAM_ABORTED = "streamAborted";
+  public static final String STREAM_READ_EXCEPTIONS = "streamReadExceptions";
+  public static final String STREAM_SEEK_OPERATIONS = "streamSeekOperations";
+  public static final String STREAM_FORWARD_SEEK_OPERATIONS
+      = "streamForwardSeekOperations";
+  public static final String STREAM_BACKWARD_SEEK_OPERATIONS
+      = "streamBackwardSeekOperations";
+  public static final String STREAM_SEEK_BYTES_SKIPPED =
+      "streamBytesSkippedOnSeek";
+  public static final String STREAM_SEEK_BYTES_BACKWARDS =
+      "streamBytesBackwardsOnSeek";
+  public static final String STREAM_SEEK_BYTES_READ = "streamBytesRead";
+  public static final String STREAM_READ_OPERATIONS = "streamReadOperations";
+  public static final String STREAM_READ_FULLY_OPERATIONS
+      = "streamReadFullyOperations";
+  public static final String STREAM_READ_OPERATIONS_INCOMPLETE
+      = "streamReadOperationsIncomplete";
+  public static final String FILES_CREATED = "files_created";
+  public static final String FILES_COPIED = "files_copied";
+  public static final String FILES_COPIED_BYTES = "files_copied_bytes";
+  public static final String FILES_DELETED = "files_deleted";
+  public static final String DIRECTORIES_CREATED = "directories_created";
+  public static final String DIRECTORIES_DELETED = "directories_deleted";
+  public static final String IGNORED_ERRORS = "ignored_errors";
+  private final MetricsRegistry registry =
+      new MetricsRegistry("S3AFileSystem").setContext(CONTEXT);
+  private final MutableCounterLong streamOpenOperations;
+  private final MutableCounterLong streamCloseOperations;
+  private final MutableCounterLong streamClosed;
+  private final MutableCounterLong streamAborted;
+  private final MutableCounterLong streamSeekOperations;
+  private final MutableCounterLong streamReadExceptions;
+  private final MutableCounterLong streamForwardSeekOperations;
+  private final MutableCounterLong streamBackwardSeekOperations;
+  private final MutableCounterLong streamBytesSkippedOnSeek;
+  private final MutableCounterLong streamBytesBackwardsOnSeek;
+  private final MutableCounterLong streamBytesRead;
+  private final MutableCounterLong streamReadOperations;
+  private final MutableCounterLong streamReadFullyOperations;
+  private final MutableCounterLong streamReadsIncomplete;
+  private final MutableCounterLong ignoredErrors;
+
+  private final MutableCounterLong numberOfFilesCreated;
+  private final MutableCounterLong numberOfFilesCopied;
+  private final MutableCounterLong bytesOfFilesCopied;
+  private final MutableCounterLong numberOfFilesDeleted;
+  private final MutableCounterLong numberOfDirectoriesCreated;
+  private final MutableCounterLong numberOfDirectoriesDeleted;
+  private final Map<String, MutableCounterLong> streamMetrics = new HashMap<>();
+
+  public S3AInstrumentation(URI name) {
+    UUID fileSystemInstanceId = UUID.randomUUID();
+    registry.tag("FileSystemId",
+        "A unique identifier for the FS ",
+        fileSystemInstanceId.toString() + "-" + name.getHost());
+    registry.tag("fsURI",
+        "URI of this filesystem",
+        name.toString());
+    streamOpenOperations = streamCounter(STREAM_OPENED,
+        "Total count of times an input stream to object store was opened");
+    streamCloseOperations = streamCounter(STREAM_CLOSE_OPERATIONS,
+        "Total count of times an attempt to close a data stream was made");
+    streamClosed = streamCounter(STREAM_CLOSED,
+        "Count of times the TCP stream was closed");
+    streamAborted = streamCounter(STREAM_ABORTED,
+        "Count of times the TCP stream was aborted");
+    streamSeekOperations = streamCounter(STREAM_SEEK_OPERATIONS,
+        "Number of seek operations invoked on input streams");
+    streamReadExceptions = streamCounter(STREAM_READ_EXCEPTIONS,
+        "Number of read exceptions caught and attempted to recover from");
+    streamForwardSeekOperations = streamCounter(STREAM_FORWARD_SEEK_OPERATIONS,
+        "Number of executed seek operations which went forward in a stream");
+    streamBackwardSeekOperations = streamCounter(
+        STREAM_BACKWARD_SEEK_OPERATIONS,
+        "Number of executed seek operations which went backwards in a stream");
+    streamBytesSkippedOnSeek = streamCounter(STREAM_SEEK_BYTES_SKIPPED,
+        "Count of bytes skipped during forward seek operations");
+    streamBytesBackwardsOnSeek = streamCounter(STREAM_SEEK_BYTES_BACKWARDS,
+        "Count of bytes moved backwards during seek operations");
+    streamBytesRead = streamCounter(STREAM_SEEK_BYTES_READ,
+        "Count of bytes read during seek() in stream operations");
+    streamReadOperations = streamCounter(STREAM_READ_OPERATIONS,
+        "Count of read() operations in streams");
+    streamReadFullyOperations = streamCounter(STREAM_READ_FULLY_OPERATIONS,
+        "Count of readFully() operations in streams");
+    streamReadsIncomplete = streamCounter(STREAM_READ_OPERATIONS_INCOMPLETE,
+        "Count of incomplete read() operations in streams");
+
+    numberOfFilesCreated = counter(FILES_CREATED,
+            "Total number of files created through the object store.");
+    numberOfFilesCopied = counter(FILES_COPIED,
+            "Total number of files copied within the object store.");
+    bytesOfFilesCopied = counter(FILES_COPIED_BYTES,
+            "Total number of bytes copied within the object store.");
+    numberOfFilesDeleted = counter(FILES_DELETED,
+            "Total number of files deleted from the object store.");
+    numberOfDirectoriesCreated = counter(DIRECTORIES_CREATED,
+        "Total number of directories created through the object store.");
+    numberOfDirectoriesDeleted = counter(DIRECTORIES_DELETED,
+        "Total number of directories deleted through the object store.");
+    ignoredErrors = counter(IGNORED_ERRORS,
+        "Total number of errors caught and ignored.");
+  }
+
+  /**
+   * Create a counter in the registry.
+   * @param name counter name
+   * @param desc counter description
+   * @return a new counter
+   */
+  protected final MutableCounterLong counter(String name, String desc) {
+    return registry.newCounter(name, desc, 0L);
+  }
+
+  /**
+   * Create a counter in the stream map: these are not registered in the
+   * public metrics.
+   * @param name counter name
+   * @param desc counter description
+   * @return a new counter
+   */
+  protected final MutableCounterLong streamCounter(String name, String desc) {
+    MutableCounterLong counter = new MutableCounterLong(
+        Interns.info(name, desc), 0L);
+    streamMetrics.put(name, counter);
+    return counter;
+  }
+
+  /**
+   * Create a gauge in the registry.
+   * @param name name gauge name
+   * @param desc description
+   * @return the gauge
+   */
+  protected final MutableGaugeLong gauge(String name, String desc) {
+    return registry.newGauge(name, desc, 0L);
+  }
+
+  /**
+   * Get the metrics registry.
+   * @return the registry
+   */
+  public MetricsRegistry getRegistry() {
+    return registry;
+  }
+
+  /**
+   * Dump all the metrics to a string.
+   * @param prefix prefix before every entry
+   * @param separator separator between name and value
+   * @param suffix suffix
+   * @param all get all the metrics even if the values are not changed.
+   * @return a string dump of the metrics
+   */
+  public String dump(String prefix,
+      String separator,
+      String suffix,
+      boolean all) {
+    MetricStringBuilder metricBuilder = new MetricStringBuilder(null,
+        prefix,
+        separator, suffix);
+    registry.snapshot(metricBuilder, all);
+    for (Map.Entry<String, MutableCounterLong> entry:
+        streamMetrics.entrySet()) {
+      metricBuilder.tuple(entry.getKey(),
+          Long.toString(entry.getValue().value()));
+    }
+    return metricBuilder.toString();
+  }
+
+  /**
+   * Indicate that S3A created a file.
+   */
+  public void fileCreated() {
+    numberOfFilesCreated.incr();
+  }
+
+  /**
+   * Indicate that S3A deleted one or more files.
+   * @param count number of files.
+   */
+  public void fileDeleted(int count) {
+    numberOfFilesDeleted.incr(count);
+  }
+
+  /**
+   * Indicate that S3A created a directory.
+   */
+  public void directoryCreated() {
+    numberOfDirectoriesCreated.incr();
+  }
+
+  /**
+   * Indicate that S3A just deleted a directory.
+   */
+  public void directoryDeleted() {
+    numberOfDirectoriesDeleted.incr();
+  }
+
+  /**
+   * Indicate that S3A copied some files within the store.
+   *
+   * @param files number of files
+   * @param size total size in bytes
+   */
+  public void filesCopied(int files, long size) {
+    numberOfFilesCopied.incr(files);
+    bytesOfFilesCopied.incr(size);
+  }
+
+  /**
+   * Note that an error was ignored.
+   */
+  public void errorIgnored() {
+    ignoredErrors.incr();
+  }
+
+  /**
+   * Create a stream input statistics instance.
+   * @return the new instance
+   */
+  InputStreamStatistics newInputStreamStatistics() {
+    return new InputStreamStatistics();
+  }
+
+  /**
+   * Merge in the statistics of a single input stream into
+   * the filesystem-wide statistics.
+   * @param statistics stream statistics
+   */
+  private void mergeInputStreamStatistics(InputStreamStatistics statistics) {
+    streamOpenOperations.incr(statistics.openOperations);
+    streamCloseOperations.incr(statistics.closeOperations);
+    streamClosed.incr(statistics.closed);
+    streamAborted.incr(statistics.aborted);
+    streamSeekOperations.incr(statistics.seekOperations);
+    streamReadExceptions.incr(statistics.readExceptions);
+    streamForwardSeekOperations.incr(statistics.forwardSeekOperations);
+    streamBytesSkippedOnSeek.incr(statistics.bytesSkippedOnSeek);
+    streamBackwardSeekOperations.incr(statistics.backwardSeekOperations);
+    streamBytesBackwardsOnSeek.incr(statistics.bytesBackwardsOnSeek);
+    streamBytesRead.incr(statistics.bytesRead);
+    streamReadOperations.incr(statistics.readOperations);
+    streamReadFullyOperations.incr(statistics.readFullyOperations);
+    streamReadsIncomplete.incr(statistics.readsIncomplete);
+  }
+
+  /**
+   * Statistics updated by an input stream during its actual operation.
+   * These counters are not thread-safe and are for use in a single instance
+   * of a stream.
+   */
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  public final class InputStreamStatistics implements AutoCloseable {
+    public long openOperations;
+    public long closeOperations;
+    public long closed;
+    public long aborted;
+    public long seekOperations;
+    public long readExceptions;
+    public long forwardSeekOperations;
+    public long backwardSeekOperations;
+    public long bytesRead;
+    public long bytesSkippedOnSeek;
+    public long bytesBackwardsOnSeek;
+    public long readOperations;
+    public long readFullyOperations;
+    public long readsIncomplete;
+
+    private InputStreamStatistics() {
+    }
+
+    /**
+     * Seek backwards, incrementing the seek and backward seek counters.
+     * @param negativeOffset how far was the seek?
+     * This is expected to be negative.
+     */
+    public void seekBackwards(long negativeOffset) {
+      seekOperations++;
+      backwardSeekOperations++;
+      bytesBackwardsOnSeek -= negativeOffset;
+    }
+
+    /**
+     * Record a forward seek, adding a seek operation, a forward
+     * seek operation, and any bytes skipped.
+     * @param skipped number of bytes skipped by reading from the stream.
+     * If the seek was implemented by a close + reopen, set this to zero.
+     */
+    public void seekForwards(long skipped) {
+      seekOperations++;
+      forwardSeekOperations++;
+      if (skipped > 0) {
+        bytesSkippedOnSeek += skipped;
+      }
+    }
+
+    /**
+     * The inner stream was opened.
+     */
+    public void streamOpened() {
+      openOperations++;
+    }
+
+    /**
+     * The inner stream was closed.
+     * @param abortedConnection flag to indicate the stream was aborted,
+     * rather than closed cleanly
+     */
+    public void streamClose(boolean abortedConnection) {
+      closeOperations++;
+      if (abortedConnection) {
+        this.aborted++;
+      } else {
+        closed++;
+      }
+    }
+
+    /**
+     * An ignored stream read exception was received.
+     */
+    public void readException() {
+      readExceptions++;
+    }
+
+    /**
+     * Increment the bytes read counter by the number of bytes;
+     * no-op if the argument is negative.
+     * @param bytes number of bytes read
+     */
+    public void bytesRead(long bytes) {
+      if (bytes > 0) {
+        bytesRead += bytes;
+      }
+    }
+
+    /**
+     * A {@code read(byte[] buf, int off, int len)} operation has started.
+     * @param pos starting position of the read
+     * @param len length of bytes to read
+     */
+    public void readOperationStarted(long pos, long len) {
+      readOperations++;
+    }
+
+    /**
+     * A {@code PositionedRead.read(position, buffer, offset, length)}
+     * operation has just started.
+     * @param pos starting position of the read
+     * @param len length of bytes to read
+     */
+    public void readFullyOperationStarted(long pos, long len) {
+      readFullyOperations++;
+    }
+
+    /**
+     * A read operation has completed.
+     * @param requested number of requested bytes
+     * @param actual the actual number of bytes
+     */
+    public void readOperationCompleted(int requested, int actual) {
+      if (requested > actual) {
+        readsIncomplete++;
+      }
+    }
+
+    /**
+     * Close triggers the merge of statistics into the filesystem's
+     * instrumentation instance.
+     */
+    @Override
+    public void close() {
+      mergeInputStreamStatistics(this);
+    }
+
+    /**
+     * The string value describes all the current statistics.
+     * <b>Important: there are no guarantees as to the stability
+     * of this value.</b>
+     * @return the current values of the stream statistics.
+     */
+    @Override
+    @InterfaceStability.Unstable
+    public String toString() {
+      final StringBuilder sb = new StringBuilder(
+          "StreamStatistics{");
+      sb.append("OpenOperations=").append(openOperations);
+      sb.append(", CloseOperations=").append(closeOperations);
+      sb.append(", Closed=").append(closed);
+      sb.append(", Aborted=").append(aborted);
+      sb.append(", SeekOperations=").append(seekOperations);
+      sb.append(", ReadExceptions=").append(readExceptions);
+      sb.append(", ForwardSeekOperations=")
+          .append(forwardSeekOperations);
+      sb.append(", BackwardSeekOperations=")
+          .append(backwardSeekOperations);
+      sb.append(", BytesSkippedOnSeek=").append(bytesSkippedOnSeek);
+      sb.append(", BytesBackwardsOnSeek=").append(bytesBackwardsOnSeek);
+      sb.append(", BytesRead=").append(bytesRead);
+      sb.append(", BytesRead excluding skipped=")
+          .append(bytesRead - bytesSkippedOnSeek);
+      sb.append(", ReadOperations=").append(readOperations);
+      sb.append(", ReadFullyOperations=").append(readFullyOperations);
+      sb.append(", ReadsIncomplete=").append(readsIncomplete);
+      sb.append('}');
+      return sb.toString();
+    }
+  }
+}

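The per-stream counters only reach the filesystem-wide metrics once `InputStreamStatistics.close()` merges them back. Below is a sketch of that life cycle; it sits in the `org.apache.hadoop.fs.s3a` package because `newInputStreamStatistics()` is package-private, and the URI is a placeholder.

    package org.apache.hadoop.fs.s3a;

    import java.net.URI;

    /** Sketch only: life cycle of the input stream statistics. */
    public final class InstrumentationSketch {
      public static void main(String[] args) throws Exception {
        S3AInstrumentation instrumentation =
            new S3AInstrumentation(new URI("s3a://example-bucket"));  // placeholder URI
        S3AInstrumentation.InputStreamStatistics stats =
            instrumentation.newInputStreamStatistics();
        stats.streamOpened();
        stats.seekForwards(4096);     // 4 KB skipped inside the open stream
        stats.bytesRead(8192);
        stats.streamClose(false);     // clean close, not an abort
        stats.close();                // merge into the filesystem-wide counters
        // dump() renders every counter, including the unregistered stream counters
        System.out.println(instrumentation.dump("(", "=", ") ", true));
      }
    }
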
+ 22 - 18
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AOutputStream.java

@@ -21,14 +21,14 @@ package org.apache.hadoop.fs.s3a;
 import com.amazonaws.event.ProgressEvent;
 import com.amazonaws.event.ProgressEventType;
 import com.amazonaws.event.ProgressListener;
-import com.amazonaws.services.s3.AmazonS3Client;
 import com.amazonaws.services.s3.model.CannedAccessControlList;
 import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.PutObjectRequest;
 import com.amazonaws.services.s3.transfer.TransferManager;
-import com.amazonaws.services.s3.transfer.TransferManagerConfiguration;
 import com.amazonaws.services.s3.transfer.Upload;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
@@ -46,6 +46,11 @@ import static com.amazonaws.event.ProgressEventType.TRANSFER_COMPLETED_EVENT;
 import static com.amazonaws.event.ProgressEventType.TRANSFER_PART_STARTED_EVENT;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 
+/**
+ * Output stream to save data to S3.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
 public class S3AOutputStream extends OutputStream {
   private OutputStream backupStream;
   private File backupFile;
@@ -65,9 +70,9 @@ public class S3AOutputStream extends OutputStream {
   public static final Logger LOG = S3AFileSystem.LOG;
 
   public S3AOutputStream(Configuration conf, TransferManager transfers,
-    S3AFileSystem fs, String bucket, String key, Progressable progress, 
-    CannedAccessControlList cannedACL, FileSystem.Statistics statistics, 
-    String serverSideEncryptionAlgorithm)
+      S3AFileSystem fs, String bucket, String key, Progressable progress,
+      CannedAccessControlList cannedACL, FileSystem.Statistics statistics,
+      String serverSideEncryptionAlgorithm)
       throws IOException {
     this.bucket = bucket;
     this.key = key;
@@ -78,9 +83,8 @@ public class S3AOutputStream extends OutputStream {
     this.statistics = statistics;
     this.serverSideEncryptionAlgorithm = serverSideEncryptionAlgorithm;
 
-    partSize = conf.getLong(MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
-    partSizeThreshold = conf.getLong(MIN_MULTIPART_THRESHOLD,
-        DEFAULT_MIN_MULTIPART_THRESHOLD);
+    partSize = fs.getPartitionSize();
+    partSizeThreshold = fs.getMultiPartThreshold();
 
     if (conf.get(BUFFER_DIR, null) != null) {
       lDirAlloc = new LocalDirAllocator(BUFFER_DIR);
@@ -91,10 +95,8 @@ public class S3AOutputStream extends OutputStream {
     backupFile = lDirAlloc.createTmpFileForWrite("output-", LocalDirAllocator.SIZE_UNKNOWN, conf);
     closed = false;
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("OutputStream for key '" + key + "' writing to tempfile: " +
-                this.backupFile);
-    }
+    LOG.debug("OutputStream for key '{}' writing to tempfile: {}",
+        key, backupFile);
 
     this.backupStream = new BufferedOutputStream(new FileOutputStream(backupFile));
   }
@@ -111,10 +113,9 @@ public class S3AOutputStream extends OutputStream {
     }
 
     backupStream.close();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("OutputStream for key '" + key + "' closed. Now beginning upload");
-      LOG.debug("Minimum upload part size: " + partSize + " threshold " + partSizeThreshold);
-    }
+    LOG.debug("OutputStream for key '{}' closed. Now beginning upload", key);
+    LOG.debug("Minimum upload part size: {} threshold {}", partSize,
+        partSizeThreshold);
 
 
     try {
@@ -129,7 +130,7 @@ public class S3AOutputStream extends OutputStream {
       Upload upload = transfers.upload(putObjectRequest);
 
       ProgressableProgressListener listener = 
-        new ProgressableProgressListener(upload, progress, statistics);
+          new ProgressableProgressListener(upload, progress, statistics);
       upload.addProgressListener(listener);
 
       upload.waitForUploadResult();
@@ -168,6 +169,9 @@ public class S3AOutputStream extends OutputStream {
     backupStream.write(b, off, len);
   }
 
+  /**
+   * Listener to progress from AWS regarding transfers.
+   */
   public static class ProgressableProgressListener implements ProgressListener {
     private Progressable progress;
     private FileSystem.Statistics statistics;
@@ -175,7 +179,7 @@ public class S3AOutputStream extends OutputStream {
     private Upload upload;
 
     public ProgressableProgressListener(Upload upload, Progressable progress, 
-      FileSystem.Statistics statistics) {
+        FileSystem.Statistics statistics) {
       this.upload = upload;
       this.progress = progress;
       this.statistics = statistics;

+ 65 - 3
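`S3AOutputStream` spools all data to a local temporary file and starts the upload only when the stream is closed, so `close()` is where the time goes for large writes. A sketch of the write path as seen through the FileSystem API; the destination path is a placeholder and setting `fs.s3a.buffer.dir` is optional (if unset, the stream falls back to a directory under `hadoop.tmp.dir`).

    import java.nio.charset.StandardCharsets;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    /** Sketch only: writes are spooled locally and uploaded on close(). */
    public final class OutputStreamUsageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.s3a.buffer.dir", "/tmp/s3a");   // where the temporary file is written
        Path dest = new Path("s3a://example-bucket/output/part-0000");  // placeholder
        FileSystem fs = dest.getFileSystem(conf);
        try (FSDataOutputStream out = fs.create(dest, true)) {
          out.write("hello, s3a".getBytes(StandardCharsets.UTF_8));
        }   // close() hands the temp file to the TransferManager for the actual upload
      }
    }
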
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md

@@ -434,6 +434,14 @@ this capability.
       <description>The implementation class of the S3A AbstractFileSystem.</description>
     </property>
 
+    <property>
+      <name>fs.s3a.readahead.range</name>
+      <value>65536</value>
+      <description>Bytes to read ahead during a seek() before closing and
+      re-opening the S3 HTTP connection. This option will be overridden if
+      any call to setReadahead() is made to an open stream.</description>
+    </property>
+
 ### S3AFastOutputStream
  **Warning: NEW in hadoop 2.7. UNSTABLE, EXPERIMENTAL: use at own risk**
 
@@ -644,7 +652,7 @@ Example:
     <configuration>
     
       <include xmlns="http://www.w3.org/2001/XInclude"
-        href="auth-keys.xml"/>
+        href="/home/testuser/.ssh/auth-keys.xml"/>
     
       <property>
         <name>fs.contract.test.fs.s3</name>
@@ -664,7 +672,61 @@ Example:
 
     </configuration>
 
-This example pulls in the `auth-keys.xml` file for the credentials. 
+This example pulls in the `~/.ssh/auth-keys.xml` file for the credentials.
 This provides one single place to keep the keys up to date —and means
 that the file `contract-test-options.xml` does not contain any
-secret credentials itself.
+secret credentials itself. As the auth keys XML file is kept out of the
+source code tree, it will not be accidentally committed.
+
+### Running Performance Tests against non-AWS storage infrastructures
+
+
+#### CSV Data source
+
+The `TestS3AInputStreamPerformance` tests require read access to a multi-MB
+text file. The default file for these tests is one published by Amazon,
+[s3a://landsat-pds/scene_list.gz](http://landsat-pds.s3.amazonaws.com/scene_list.gz).
+This is a gzipped CSV index of other files which Amazon serves for open use.
+
+The path to this object is set in the option `fs.s3a.scale.test.csvfile`:
+
+    <property>
+      <name>fs.s3a.scale.test.csvfile</name>
+      <value>s3a://landsat-pds/scene_list.gz</value>
+    </property>
+
+1. If the option is not overridden, the default value is used. The default
+file is hosted in Amazon's US-east datacenter.
+1. If the property is empty, tests which require it will be skipped.
+1. If the data cannot be read for any reason then the test will fail.
+1. If the property is set to a different path, then that data must be readable
+and "sufficiently" large.
+
+To test against different S3 endpoints, or alternate infrastructures supporting
+the same APIs, the option `fs.s3a.scale.test.csvfile` must therefore either be
+set to " " (so that tests which need it are skipped), or an object of at least
+10MB must be uploaded to the object store and the option set to its path.
+
+      <property>
+        <name>fs.s3a.scale.test.csvfile</name>
+        <value> </value>
+      </property>
+
+
+#### Scale test operation count
+
+Some scale tests perform multiple operations (such as creating many directories).
+
+The exact number of operations to perform is configurable in the option
+`scale.test.operation.count`
+
+      <property>
+        <name>scale.test.operation.count</name>
+        <value>10</value>
+      </property>
+
+Larger values generate more load, and are recommended when testing locally,
+or in batch runs.
+
+Smaller values should result in faster test runs, especially when the object
+store is a long way away.

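A sketch of how a scale test can honour the empty-value convention described above, skipping rather than failing when no CSV source is configured. The class and method names here are illustrative; the actual handling lives in the test sources added in this patch.

    import static org.junit.Assume.assumeTrue;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    /** Sketch only: resolve the CSV test file, skipping when it is blanked out. */
    public final class CsvTestFileResolver {
      public static final String KEY_CSVTEST_FILE = "fs.s3a.scale.test.csvfile";
      public static final String DEFAULT_CSVTEST_FILE =
          "s3a://landsat-pds/scene_list.gz";

      public static Path resolve(Configuration conf) {
        // getTrimmed() turns a deliberately blank value (" ") into ""
        String csv = conf.getTrimmed(KEY_CSVTEST_FILE, DEFAULT_CSVTEST_FILE);
        assumeTrue("CSV test file is unset; skipping", !csv.isEmpty());
        return new Path(csv);
      }
    }
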
+ 182 - 9
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/S3AScaleTestBase.java

@@ -19,18 +19,25 @@
 package org.apache.hadoop.fs.s3a.scale;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.Path;
 
+import org.apache.hadoop.fs.s3a.S3AInputStream;
+import org.apache.hadoop.fs.s3a.S3AInstrumentation;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.net.URI;
+import java.io.InputStream;
+import java.util.Locale;
 
 import static org.junit.Assume.assumeTrue;
 
@@ -38,30 +45,61 @@ import static org.junit.Assume.assumeTrue;
  * Base class for scale tests; here is where the common scale configuration
  * keys are defined.
  */
-public class S3AScaleTestBase {
+public class S3AScaleTestBase extends Assert {
 
   public static final String SCALE_TEST = "scale.test.";
 
+  public static final String S3A_SCALE_TEST = "fs.s3a.scale.test.";
+
+  @Rule
+  public TestName methodName = new TestName();
+
+  @BeforeClass
+  public static void nameThread() {
+    Thread.currentThread().setName("JUnit");
+  }
+
   /**
-   * The number of operations to perform: {@value}
+   * The number of operations to perform: {@value}.
    */
   public static final String KEY_OPERATION_COUNT =
       SCALE_TEST + "operation.count";
 
   /**
-   * The default number of operations to perform: {@value}
+   * The read buffer size option: {@value}.
+   */
+  public static final String KEY_READ_BUFFER_SIZE =
+      S3A_SCALE_TEST + "read.buffer.size";
+
+  public static final int DEFAULT_READ_BUFFER_SIZE = 16384;
+
+  /**
+   * Key for a multi MB test file: {@value}.
+   */
+  public static final String KEY_CSVTEST_FILE =
+      S3A_SCALE_TEST + "csvfile";
+
+  /**
+   * Default path for the multi MB test file: {@value}.
+   */
+  public static final String DEFAULT_CSVTEST_FILE
+      = "s3a://landsat-pds/scene_list.gz";
+
+  /**
+   * The default number of operations to perform: {@value}.
    */
   public static final long DEFAULT_OPERATION_COUNT = 2005;
 
   protected S3AFileSystem fs;
-  private static final Logger LOG =
+
+  protected static final Logger LOG =
       LoggerFactory.getLogger(S3AScaleTestBase.class);
 
   private Configuration conf;
 
   /**
    * Configuration generator. May be overridden to inject
-   * some custom options
+   * some custom options.
    * @return a configuration with which to create FS instances
    */
   protected Configuration createConfiguration() {
@@ -69,7 +107,7 @@ public class S3AScaleTestBase {
   }
 
   /**
-   * Get the configuration used to set up the FS
+   * Get the configuration used to set up the FS.
    * @return the configuration
    */
   public Configuration getConf() {
@@ -79,7 +117,7 @@ public class S3AScaleTestBase {
   @Before
   public void setUp() throws Exception {
     conf = createConfiguration();
-    LOG.info("Scale test operation count = {}", getOperationCount());
+    LOG.debug("Scale test operation count = {}", getOperationCount());
     fs = S3ATestUtils.createTestFileSystem(conf);
   }
 
@@ -95,4 +133,139 @@ public class S3AScaleTestBase {
   protected long getOperationCount() {
     return getConf().getLong(KEY_OPERATION_COUNT, DEFAULT_OPERATION_COUNT);
   }
+
+  /**
+   * Describe a test in the logs.
+   * @param text text to print
+   * @param args arguments to format in the printing
+   */
+  protected void describe(String text, Object... args) {
+    LOG.info("\n\n{}: {}\n",
+        methodName.getMethodName(),
+        String.format(text, args));
+  }
+
+  /**
+   * Get the input stream statistics of an input stream.
+   * Raises an exception if the inner stream is not an S3A input stream.
+   * @param in wrapper
+   * @return the statistics for the inner stream
+   */
+  protected S3AInstrumentation.InputStreamStatistics getInputStreamStatistics(
+      FSDataInputStream in) {
+    InputStream inner = in.getWrappedStream();
+    if (inner instanceof S3AInputStream) {
+      S3AInputStream s3a = (S3AInputStream) inner;
+      return s3a.getS3AStreamStatistics();
+    } else {
+      Assert.fail("Not an S3AInputStream: " + inner);
+      // never reached
+      return null;
+    }
+  }
+
+  /**
+   * Make times more readable by adding a "," every three digits.
+   * @param nanos nanos or other large number
+   * @return a string for logging
+   */
+  protected static String toHuman(long nanos) {
+    return String.format(Locale.ENGLISH, "%,d", nanos);
+  }
+
+  /**
+   * Log the bandwidth of a timer as inferred from the number of
+   * bytes processed.
+   * @param timer timer
+   * @param bytes bytes processed in the time period
+   */
+  protected void bandwidth(NanoTimer timer, long bytes) {
+    LOG.info("Bandwidth = {}  MB/S",
+        timer.bandwidthDescription(bytes));
+  }
+
+  /**
+   * Work out the bandwidth in MB/s.
+   * @param bytes bytes
+   * @param durationNS duration in nanos
+   * @return the number of megabytes/second of the recorded operation
+   */
+  public static double bandwidthMBs(long bytes, long durationNS) {
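+    // bytes / durationNS gives bytes per nanosecond; multiplying by
+    // 10^9 / 10^6 (i.e. 1000) converts that to megabytes per second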
+    return (bytes * 1000.0) / durationNS;
+  }
+
+  /**
+   * A simple class for timing operations in nanoseconds, and for
+   * printing some useful results in the process.
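+   * <p>
+   * Typical use (illustrative only):
+   * <pre>
+   *   NanoTimer timer = new NanoTimer();
+   *   // ... the operation being timed ...
+   *   timer.end("reading %d bytes", count);
+   * </pre>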
+   */
+  protected static class NanoTimer {
+    final long startTime;
+    long endTime;
+
+    public NanoTimer() {
+      startTime = now();
+    }
+
+    /**
+     * End the operation.
+     * @return the duration of the operation
+     */
+    public long end() {
+      endTime = now();
+      return duration();
+    }
+
+    /**
+     * End the operation; log the duration.
+     * @param format message
+     * @param args any arguments
+     * @return the duration of the operation
+     */
+    public long end(String format, Object... args) {
+      long d = end();
+      LOG.info("Duration of {}: {} nS",
+          String.format(format, args), toHuman(d));
+      return d;
+    }
+
+    long now() {
+      return System.nanoTime();
+    }
+
+    long duration() {
+      return endTime - startTime;
+    }
+
+    double bandwidth(long bytes) {
+      return S3AScaleTestBase.bandwidthMBs(bytes, duration());
+    }
+
+    /**
+     * Bandwidth in bytes processed per nanosecond of duration.
+     * @param bytes bytes in
+     * @return the number of bytes processed per nanosecond of this operation.
+     */
+    double bandwidthBytes(long bytes) {
+      return (bytes * 1.0) / duration();
+    }
+
+    /**
+     * How many nanoseconds per byte.
+     * @param bytes bytes processed in this time period
+     * @return the nanoseconds it took each byte to be processed
+     */
+    long nanosPerByte(long bytes) {
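+      // integer division: callers are expected to pass a non-zero byte count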
+      return duration() / bytes;
+    }
+
+    /**
+     * Get a description of the bandwidth, even down to fractions of
+     * a MB.
+     * @param bytes bytes processed
+     * @return bandwidth
+     */
+    String bandwidthDescription(long bytes) {
+      return String.format("%,.6f", bandwidth(bytes));
+    }
+  }
 }

+ 0 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3ADeleteManyFiles.java

@@ -40,7 +40,6 @@ public class TestS3ADeleteManyFiles extends S3AScaleTestBase {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestS3ADeleteManyFiles.class);
 
-
   @Rule
   public Timeout testTimeout = new Timeout(30 * 60 * 1000);
 

+ 285 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/TestS3AInputStreamPerformance.java

@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.scale;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.Constants;
+import org.apache.hadoop.fs.s3a.S3AFileStatus;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.fs.s3a.S3AInstrumentation;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.After;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Look at the performance of S3A input stream operations.
+ */
+public class TestS3AInputStreamPerformance extends S3AScaleTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(
+      TestS3AInputStreamPerformance.class);
+
+  private S3AFileSystem s3aFS;
+  private Path testData;
+  private S3AFileStatus testDataStatus;
+  private FSDataInputStream in;
+  private S3AInstrumentation.InputStreamStatistics streamStatistics;
+  public static final int BLOCK_SIZE = 32 * 1024;
+  public static final int BIG_BLOCK_SIZE = 256 * 1024;
+
+  /** Tests only run if there is a named test file that can be read. */
+  private boolean testDataAvailable = true;
+  private String assumptionMessage = "test file";
+
+  /**
+   * Open the FS and the test data. The input stream is always set up here.
+   * @throws IOException
+   */
+  @Before
+  public void openFS() throws IOException {
+    Configuration conf = getConf();
+    String testFile = conf.getTrimmed(KEY_CSVTEST_FILE, DEFAULT_CSVTEST_FILE);
+    if (testFile.isEmpty()) {
+      assumptionMessage = "Empty test property: " + KEY_CSVTEST_FILE;
+      testDataAvailable = false;
+    } else {
+      testData = new Path(testFile);
+      s3aFS = (S3AFileSystem) FileSystem.newInstance(testData.toUri(), conf);
+      try {
+        testDataStatus = s3aFS.getFileStatus(testData);
+      } catch (IOException e) {
+        LOG.warn("Failed to read file {} specified in {}",
+            testFile, KEY_CSVTEST_FILE, e);
+        throw e;
+      }
+    }
+  }
+
+  /**
+   * Cleanup: close the stream, close the FS.
+   */
+  @After
+  public void cleanup() {
+    IOUtils.closeStream(in);
+    IOUtils.closeStream(s3aFS);
+  }
+
+  /**
+   * Declare that the test requires the CSV test dataset.
+   */
+  private void requireCSVTestData() {
+    Assume.assumeTrue(assumptionMessage, testDataAvailable);
+  }
+
+  /**
+   * Open the test file with the read buffer specified in the setting
+   * {@link #KEY_READ_BUFFER_SIZE}.
+   * @return the stream, wrapping an S3a one
+   * @throws IOException
+   */
+  FSDataInputStream openTestFile() throws IOException {
+    int bufferSize = getConf().getInt(KEY_READ_BUFFER_SIZE,
+        DEFAULT_READ_BUFFER_SIZE);
+    FSDataInputStream stream = s3aFS.open(testData, bufferSize);
+    streamStatistics = getInputStreamStatistics(stream);
+    return stream;
+  }
+
+  /**
+   * Assert that the stream was only ever opened once.
+   */
+  protected void assertStreamOpenedExactlyOnce() {
+    assertOpenOperationCount(1);
+  }
+
+  /**
+   * Make an assertion about the number of open operations.
+   * @param expected the expected number
+   */
+  private void assertOpenOperationCount(int expected) {
+    assertEquals("open operations in " + streamStatistics,
+        expected, streamStatistics.openOperations);
+  }
+
+  /**
+   * Log how long an IOP took by dividing the total time by the
+   * count of operations, printing the result in a human-readable form.
+   * @param timer timing data
+   * @param count IOP count.
+   */
+  protected void logTimePerIOP(NanoTimer timer, long count) {
+    LOG.info("Time per IOP: {} nS", toHuman(timer.duration() / count));
+  }
+
+  @Test
+  public void testTimeToOpenAndReadWholeFileByByte() throws Throwable {
+    requireCSVTestData();
+    describe("Open the test file %s and read it byte by byte", testData);
+    long len = testDataStatus.getLen();
+    NanoTimer timeOpen = new NanoTimer();
+    in = openTestFile();
+    timeOpen.end("Open stream");
+    NanoTimer readTimer = new NanoTimer();
+    long count = 0;
+    while (in.read() >= 0) {
+      count++;
+    }
+    readTimer.end("Time to read %d bytes", len);
+    bandwidth(readTimer, count);
+    assertEquals("Not enough bytes were read)", len, count);
+    long nanosPerByte = readTimer.nanosPerByte(count);
+    LOG.info("An open() call has the equivalent duration of reading {} bytes",
+        toHuman( timeOpen.duration() / nanosPerByte));
+  }
+
+  @Test
+  public void testTimeToOpenAndReadWholeFileBlocks() throws Throwable {
+    requireCSVTestData();
+    describe("Open the test file %s and read it in blocks of size %d",
+        testData, BLOCK_SIZE);
+    long len = testDataStatus.getLen();
+    in = openTestFile();
+    byte[] block = new byte[BLOCK_SIZE];
+    NanoTimer timer2 = new NanoTimer();
+    long count = 0;
+    // implicitly rounding down here
+    long blockCount = len / BLOCK_SIZE;
+    for (long i = 0; i < blockCount; i++) {
+      int offset = 0;
+      int remaining = BLOCK_SIZE;
+      NanoTimer blockTimer = new NanoTimer();
+      int reads = 0;
+      while (remaining > 0) {
+        int bytesRead = in.read(block, offset, remaining);
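+        // a read may return fewer bytes than requested; a negative
+        // value signals the end of the stream before the block was filled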
+        reads++;
+        if (bytesRead < 0) {
+          break;
+        }
+        remaining -= bytesRead;
+        offset += bytesRead;
+        count += bytesRead;
+      }
+      blockTimer.end("Reading block %d in %d reads", i, reads);
+    }
+    timer2.end("Time to read %d bytes in %d blocks", len, blockCount );
+    bandwidth(timer2, count);
+    LOG.info("{}", streamStatistics);
+  }
+
+  @Test
+  public void testLazySeekEnabled() throws Throwable {
+    requireCSVTestData();
+    describe("Verify that seeks do not trigger any IO");
+    long len = testDataStatus.getLen();
+    in = openTestFile();
+    NanoTimer timer = new NanoTimer();
+    long blockCount = len / BLOCK_SIZE;
+    for (long i = 0; i < blockCount; i++) {
+      in.seek(in.getPos() + BLOCK_SIZE - 1);
+    }
+    in.seek(0);
+    blockCount++;
+    timer.end("Time to execute %d seeks", blockCount);
+    logTimePerIOP(timer, blockCount);
+    LOG.info("{}", streamStatistics);
+    assertOpenOperationCount(0);
+    assertEquals("bytes read", 0, streamStatistics.bytesRead);
+  }
+
+  @Test
+  public void testReadAheadDefault() throws Throwable {
+    requireCSVTestData();
+    describe("Verify that a series of forward skips within the readahead" +
+        " range do not close and reopen the stream");
+    executeSeekReadSequence(BLOCK_SIZE, Constants.DEFAULT_READAHEAD_RANGE);
+    assertStreamOpenedExactlyOnce();
+  }
+
+  @Test
+  public void testReadaheadOutOfRange() throws Throwable {
+    requireCSVTestData();
+    try {
+      in = openTestFile();
+      in.setReadahead(-1L);
+      fail("Stream should have rejected the request "+ in);
+    } catch (IllegalArgumentException e) {
+      // expected
+    }
+
+  }
+
+  @Test
+  public void testReadBigBlocksAvailableReadahead() throws Throwable {
+    requireCSVTestData();
+    describe("set readahead to available bytes only");
+    executeSeekReadSequence(BIG_BLOCK_SIZE, 0);
+    // expect that the stream will have had lots of opens
+    assertTrue("not enough open operations in " + streamStatistics,
+        streamStatistics.openOperations > 1);
+  }
+
+  @Test
+  public void testReadBigBlocksBigReadahead() throws Throwable {
+    requireCSVTestData();
+    describe("Read big blocks with a big readahead");
+    executeSeekReadSequence(BIG_BLOCK_SIZE, BIG_BLOCK_SIZE * 2);
+    assertStreamOpenedExactlyOnce();
+  }
+
+  /**
+   * Execute a seek+read sequence
+   * @param blockSize block size for seeks
+   * @param readahead what the readahead value of the stream should be
+   * @throws IOException IO problems
+   */
+  protected void executeSeekReadSequence(long blockSize,
+      long readahead) throws IOException {
+    requireCSVTestData();
+    long len = testDataStatus.getLen();
+    in = openTestFile();
+    in.setReadahead(readahead);
+    NanoTimer timer = new NanoTimer();
+    long blockCount = len / blockSize;
+    LOG.info("Reading {} blocks, readahead = {}",
+        blockCount, readahead);
+    for (long i = 0; i < blockCount; i++) {
+      in.seek(in.getPos() + blockSize - 1);
+      // the read triggers the lazy seek and any reopening of the stream
+      assertTrue(in.read() >= 0);
+    }
+    timer.end("Time to execute %d seeks of distance %d with readahead = %d",
+        blockCount,
+        blockSize,
+        readahead);
+    logTimePerIOP(timer, blockCount);
+    LOG.info("Effective bandwidth {} MB/S",
+        timer.bandwidthDescription(streamStatistics.bytesRead -
+            streamStatistics.bytesSkippedOnSeek));
+    LOG.info("{}", streamStatistics);
+  }
+
+}

+ 3 - 0
hadoop-tools/hadoop-aws/src/test/resources/log4j.properties

@@ -16,3 +16,6 @@ log4j.threshold=ALL
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n
+
+# for debugging low level S3a operations, uncomment this line
+# log4j.logger.org.apache.hadoop.fs.s3a=DEBUG