Bläddra i källkod

HADOOP-14965. S3a input stream "normal" fadvise mode to be adaptive

(cherry picked from commit 1ba491ff907fc5d2618add980734a3534e2be098)
Steve Loughran 7 år sedan
förälder
incheckning
11f577f3e6

+ 24 - 4
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

@@ -84,7 +84,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
   private final S3AInstrumentation.InputStreamStatistics streamStatistics;
   private S3AEncryptionMethods serverSideEncryptionAlgorithm;
   private String serverSideEncryptionKey;
-  private final S3AInputPolicy inputPolicy;
+  private S3AInputPolicy inputPolicy;
   private long readahead = Constants.DEFAULT_READAHEAD_RANGE;
 
   /**
@@ -124,10 +124,20 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
     this.serverSideEncryptionAlgorithm =
         s3Attributes.getServerSideEncryptionAlgorithm();
     this.serverSideEncryptionKey = s3Attributes.getServerSideEncryptionKey();
-    this.inputPolicy = inputPolicy;
+    setInputPolicy(inputPolicy);
     setReadahead(readahead);
   }
 
+  /**
+   * Set/update the input policy of the stream.
+   * This updates the stream statistics.
+   * @param inputPolicy new input policy.
+   */
+  private void setInputPolicy(S3AInputPolicy inputPolicy) {
+    this.inputPolicy = inputPolicy;
+    streamStatistics.inputPolicySet(inputPolicy.ordinal());
+  }
+
   /**
    * Opens up the stream at specified target position and for given length.
    *
@@ -146,8 +156,9 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
     contentRangeFinish = calculateRequestLimit(inputPolicy, targetPos,
         length, contentLength, readahead);
     LOG.debug("reopen({}) for {} range[{}-{}], length={}," +
-        " streamPosition={}, nextReadPosition={}",
-        uri, reason, targetPos, contentRangeFinish, length,  pos, nextReadPos);
+        " streamPosition={}, nextReadPosition={}, policy={}",
+        uri, reason, targetPos, contentRangeFinish, length,  pos, nextReadPos,
+        inputPolicy);
 
     streamStatistics.streamOpened();
     try {
@@ -258,6 +269,12 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
     } else if (diff < 0) {
       // backwards seek
       streamStatistics.seekBackwards(diff);
+      // if the stream is in "Normal" mode, switch to random IO at this
+      // point, as it is indicative of columnar format IO
+      if (inputPolicy.equals(S3AInputPolicy.Normal)) {
+        LOG.info("Switching to Random IO seek policy");
+        setInputPolicy(S3AInputPolicy.Random);
+      }
     } else {
       // targetPos == pos
       if (remainingInCurrentRequest() > 0) {
@@ -427,6 +444,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
       try {
         // close or abort the stream
         closeStream("close() operation", this.contentRangeFinish, false);
+        LOG.debug("Statistics of stream {}\n{}", key, streamStatistics);
         // this is actually a no-op
         super.close();
       } finally {
@@ -697,6 +715,8 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead {
       break;
 
     case Normal:
+      // normal is considered sequential until a backwards seek switches
+      // it to 'Random'
     default:
       rangeLimit = contentLength;
 

+ 13 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java

@@ -553,6 +553,8 @@ public class S3AInstrumentation {
     public long readsIncomplete;
     public long bytesReadInClose;
     public long bytesDiscardedInAbort;
+    public long policySetCount;
+    public long inputPolicy;
 
     private InputStreamStatistics() {
     }
@@ -665,6 +667,15 @@ public class S3AInstrumentation {
       mergeInputStreamStatistics(this);
     }
 
+    /**
+     * The input policy has been switched.
+     * @param updatedPolicy enum value of new policy.
+     */
+    public void inputPolicySet(int updatedPolicy) {
+      policySetCount++;
+      inputPolicy = updatedPolicy;
+    }
+
     /**
      * String operator describes all the current statistics.
      * <b>Important: there are no guarantees as to the stability
@@ -696,6 +707,8 @@ public class S3AInstrumentation {
       sb.append(", ReadsIncomplete=").append(readsIncomplete);
       sb.append(", BytesReadInClose=").append(bytesReadInClose);
       sb.append(", BytesDiscardedInAbort=").append(bytesDiscardedInAbort);
+      sb.append(", InputPolicy=").append(inputPolicy);
+      sb.append(", InputPolicySetCount=").append(policySetCount);
       sb.append('}');
       return sb.toString();
     }

+ 12 - 1
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md

@@ -1393,7 +1393,18 @@ backward seeks.
 
 *"normal" (default)*
 
-This is currently the same as "sequential", though it may evolve in future.
+The "Normal" policy starts off reading a file  in "sequential" mode,
+but if the caller seeks backwards in the stream, it switches from
+sequential to "random".
+
+This policy effectively recognizes the initial read pattern of columnar
+storage formats (e.g. Apache ORC and Apache Parquet), which seek to the end
+of a file, read in index data and then seek backwards to selectively read
+columns. The first seeks may be be expensive compared to the random policy,
+however the overall process is much less expensive than either sequentially
+reading through a file with the "random" policy, or reading columnar data
+with the "sequential" policy. When the exact format/recommended
+seek policy of data are known in advance, this policy
 
 *"random"*
 

+ 5 - 1
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AInputStreamPerformance.java

@@ -427,7 +427,11 @@ public class ITestS3AInputStreamPerformance extends S3AScaleTestBase {
     long expectedOpenCount = RANDOM_IO_SEQUENCE.length;
     executeRandomIO(S3AInputPolicy.Normal, expectedOpenCount);
     assertEquals("streams aborted in " + streamStatistics,
-        4, streamStatistics.aborted);
+        1, streamStatistics.aborted);
+    assertEquals("policy changes in " + streamStatistics,
+        2, streamStatistics.policySetCount);
+    assertEquals("input policy in " + streamStatistics,
+        S3AInputPolicy.Random.ordinal(), streamStatistics.inputPolicy);
   }
 
   /**