Bladeren bron

HADOOP-16379: S3AInputStream.unbuffer should merge input stream stats into fs-wide stats

Contributed by Sahil Takiar

Change-Id: I2bcfaaea00d12c633757069402dcd0b91a5f5c05
Sahil Takiar 6 jaren geleden
bovenliggende
commit
28291a9e8a

+ 9 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

@@ -823,9 +823,17 @@ public class S3AInputStream extends FSInputStream implements  CanSetReadahead,
     }
   }
 
+  /**
+   * Closes the underlying S3 stream, and merges the {@link #streamStatistics}
+   * instance associated with the stream.
+   * <p>
+   * The merge is issued from a {@code finally} block, so it is attempted even
+   * when closing the stream throws. It is called as {@code merge(false)}:
+   * the statistics are not being closed here, so they can keep accumulating
+   * and be merged again by a later {@code unbuffer()} or by {@code close()}.
+   */
   @Override
   public synchronized void unbuffer() {
-    closeStream("unbuffer()", contentRangeFinish, false);
+    try {
+      closeStream("unbuffer()", contentRangeFinish, false);
+    } finally {
+      streamStatistics.merge(false);
+    }
   }
 
   @Override

+ 84 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java

@@ -647,6 +647,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
     public long inputPolicy;
     /** This is atomic so that it can be passed as a reference. */
     private final AtomicLong versionMismatches = new AtomicLong(0);
+    private InputStreamStatistics mergedStats;
 
     private InputStreamStatistics() {
     }
@@ -759,7 +760,7 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
      */
     @Override
     public void close() {
-      mergeInputStreamStatistics(this);
+      merge(true); // final merge; no new snapshot is taken
     }
 
     /**
@@ -816,6 +817,88 @@ public class S3AInstrumentation implements Closeable, MetricsSource {
       sb.append('}');
       return sb.toString();
     }
+
+    /**
+     * Merge the statistics into the filesystem's instrumentation instance.
+     * Takes a diff between the current version of the stats and the
+     * version of the stats when merge was last called, and merges the diff
+     * into the instrumentation instance. Used to periodically merge the
+     * stats into the fs-wide stats. <b>Behavior is undefined if called on a
+     * closed instance.</b>
+     */
+    void merge(boolean isClosed) {
+      if (mergedStats != null) {
+        mergeInputStreamStatistics(diff(mergedStats));
+      } else {
+        mergeInputStreamStatistics(this);
+      }
+      // If stats are closed, no need to create another copy
+      if (!isClosed) {
+        mergedStats = copy();
+      }
+    }
+
+    /**
+     * Returns a diff between this {@link InputStreamStatistics} instance and
+     * the given {@link InputStreamStatistics} instance.
+     */
+    private InputStreamStatistics diff(InputStreamStatistics inputStats) {
+      InputStreamStatistics diff = new InputStreamStatistics();
+      diff.openOperations = openOperations - inputStats.openOperations;
+      diff.closeOperations = closeOperations - inputStats.closeOperations;
+      diff.closed = closed - inputStats.closed;
+      diff.aborted = aborted - inputStats.aborted;
+      diff.seekOperations = seekOperations - inputStats.seekOperations;
+      diff.readExceptions = readExceptions - inputStats.readExceptions;
+      diff.forwardSeekOperations =
+              forwardSeekOperations - inputStats.forwardSeekOperations;
+      diff.backwardSeekOperations =
+              backwardSeekOperations - inputStats.backwardSeekOperations;
+      diff.bytesRead = bytesRead - inputStats.bytesRead;
+      diff.bytesSkippedOnSeek =
+              bytesSkippedOnSeek - inputStats.bytesSkippedOnSeek;
+      diff.bytesBackwardsOnSeek =
+              bytesBackwardsOnSeek - inputStats.bytesBackwardsOnSeek;
+      diff.readOperations = readOperations - inputStats.readOperations;
+      diff.readFullyOperations =
+              readFullyOperations - inputStats.readFullyOperations;
+      diff.readsIncomplete = readsIncomplete - inputStats.readsIncomplete;
+      diff.bytesReadInClose = bytesReadInClose - inputStats.bytesReadInClose;
+      diff.bytesDiscardedInAbort =
+              bytesDiscardedInAbort - inputStats.bytesDiscardedInAbort;
+      diff.policySetCount = policySetCount - inputStats.policySetCount;
+      diff.inputPolicy = inputPolicy - inputStats.inputPolicy;
+      diff.versionMismatches.set(versionMismatches.longValue() -
+              inputStats.versionMismatches.longValue());
+      return diff;
+    }
+
+    /**
+     * Returns a new {@link InputStreamStatistics} instance with all the same
+     * values as this {@link InputStreamStatistics}, including the
+     * {@code versionMismatches} counter.
+     * <p>
+     * The result is a detached snapshot: {@code versionMismatches} is copied
+     * by value into the new instance's own {@link AtomicLong}, and
+     * {@code mergedStats} is not carried over.
+     *
+     * @return an independent snapshot of the current values
+     */
+    private InputStreamStatistics copy() {
+      InputStreamStatistics copy = new InputStreamStatistics();
+      copy.openOperations = openOperations;
+      copy.closeOperations = closeOperations;
+      copy.closed = closed;
+      copy.aborted = aborted;
+      copy.seekOperations = seekOperations;
+      copy.readExceptions = readExceptions;
+      copy.forwardSeekOperations = forwardSeekOperations;
+      copy.backwardSeekOperations = backwardSeekOperations;
+      copy.bytesRead = bytesRead;
+      copy.bytesSkippedOnSeek = bytesSkippedOnSeek;
+      copy.bytesBackwardsOnSeek = bytesBackwardsOnSeek;
+      copy.readOperations = readOperations;
+      copy.readFullyOperations = readFullyOperations;
+      copy.readsIncomplete = readsIncomplete;
+      copy.bytesReadInClose = bytesReadInClose;
+      copy.bytesDiscardedInAbort = bytesDiscardedInAbort;
+      copy.policySetCount = policySetCount;
+      copy.inputPolicy = inputPolicy;
+      // versionMismatches is a final AtomicLong, so copy its value rather than
+      // the reference. Without this the snapshot always holds 0, and diff()
+      // reports the full mismatch count again on every later merge.
+      copy.versionMismatches.set(versionMismatches.get());
+      return copy;
+    }
   }
 
   /**

+ 71 - 8
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java

@@ -19,14 +19,16 @@
 package org.apache.hadoop.fs.s3a;
 
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.io.IOUtils;
 
 import org.junit.Test;
 
 import java.io.IOException;
 
+import static org.apache.hadoop.fs.s3a.Statistic.STREAM_SEEK_BYTES_READ;
+
 /**
  * Integration test for calling
  * {@link org.apache.hadoop.fs.CanUnbuffer#unbuffer} on {@link S3AInputStream}.
@@ -38,20 +40,27 @@ import java.io.IOException;
  */
 public class ITestS3AUnbuffer extends AbstractS3ATestBase {
 
+  private Path dest;
+
+  /**
+   * Runs the parent setup, then writes the 16-byte dataset that the tests in
+   * this class read back, storing its location in {@link #dest}.
+   *
+   * @throws Exception if the parent setup or the dataset write fails
+   */
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+    final String name = "ITestS3AUnbuffer";
+    dest = path(name);
+    describe(name);
+
+    final int length = 16;
+    final byte[] contents = ContractTestUtils.dataset(length, 'a', 26);
+    ContractTestUtils.writeDataset(getFileSystem(), dest, contents,
+        contents.length, length, true);
+  }
+
   @Test
   public void testUnbuffer() throws IOException {
-    // Setup test file
-    Path dest = path("testUnbuffer");
     describe("testUnbuffer");
-    try (FSDataOutputStream outputStream = getFileSystem().create(dest, true)) {
-      byte[] data = ContractTestUtils.dataset(16, 'a', 26);
-      outputStream.write(data);
-    }
 
     // Open file, read half the data, and then call unbuffer
     try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
       assertTrue(inputStream.getWrappedStream() instanceof S3AInputStream);
-      assertEquals(8, inputStream.read(new byte[8]));
+      readAndAssertBytesRead(inputStream, 8);
       assertTrue(isObjectStreamOpen(inputStream));
       inputStream.unbuffer();
 
@@ -60,7 +69,61 @@ public class ITestS3AUnbuffer extends AbstractS3ATestBase {
     }
   }
 
+  /**
+   * Test that calling {@link S3AInputStream#unbuffer()} merges a stream's
+   * {@link org.apache.hadoop.fs.s3a.S3AInstrumentation.InputStreamStatistics}
+   * into the {@link S3AFileSystem}'s {@link S3AInstrumentation} instance.
+   * <p>
+   * The filesystem-level metric is checked after each of two read/unbuffer
+   * rounds (8 bytes, then 4 more) and once more after the stream is closed.
+   * The expected totals of 8, 12 and 12 show that each merge publishes only
+   * the bytes read since the previous merge, and that closing the stream
+   * does not publish them a second time.
+   *
+   * @throws IOException if opening or reading the test file fails
+   */
+  @Test
+  public void testUnbufferStreamStatistics() throws IOException {
+    describe("testUnbufferStreamStatistics");
+
+    // Validate bytesRead is updated correctly.
+    // NOTE(review): the metric observed is STREAM_SEEK_BYTES_READ; confirm
+    // that this is the fs-wide counter fed by the stream's bytesRead.
+    S3ATestUtils.MetricDiff bytesRead = new S3ATestUtils.MetricDiff(
+            getFileSystem(), STREAM_SEEK_BYTES_READ);
+
+    // Open file, read half the data, and then call unbuffer
+    FSDataInputStream inputStream = null;
+    try {
+      inputStream = getFileSystem().open(dest);
+
+      readAndAssertBytesRead(inputStream, 8);
+      inputStream.unbuffer();
+
+      // Validate that calling unbuffer updates the input stream statistics
+      bytesRead.assertDiffEquals(8);
+
+      // Validate that a second read followed by a second unbuffer adds only
+      // the 4 newly read bytes, rather than re-adding the first 8
+      readAndAssertBytesRead(inputStream, 4);
+      inputStream.unbuffer();
+      bytesRead.assertDiffEquals(12);
+    } finally {
+      IOUtils.closeStream(inputStream);
+    }
+
+    // Validate that closing the file does not further change the statistics
+    bytesRead.assertDiffEquals(12);
+
+    // Validate that the stream's own statistics object, read after the
+    // stream has been closed, still holds the full 12 bytes
+    assertEquals("S3AInputStream statistics were not updated properly", 12,
+            ((S3AInputStream) inputStream.getWrappedStream())
+                    .getS3AStreamStatistics().bytesRead);
+  }
+
   /**
    * Reports whether the {@link S3AInputStream} wrapped by the given stream
    * says its object stream is open.
    */
   private boolean isObjectStreamOpen(FSDataInputStream inputStream) {
     S3AInputStream wrapped = (S3AInputStream) inputStream.getWrappedStream();
     return wrapped.isObjectStreamOpen();
   }
+
+  /**
+   * Read the specified number of bytes from the given
+   * {@link FSDataInputStream} and assert that
+   * {@link FSDataInputStream#read(byte[])} read the specified number of bytes.
+   */
+  private static void readAndAssertBytesRead(FSDataInputStream inputStream,
+                                        int bytesToRead) throws IOException {
+    assertEquals("S3AInputStream#read did not read the correct number of " +
+                    "bytes", bytesToRead,
+            inputStream.read(new byte[bytesToRead]));
+  }
 }