瀏覽代碼

HADOOP-18175. fix test failures with prefetching s3a input stream (#4212)

Contributed by Monthon Klongklaew
monthonk 3 年之前
父節點
當前提交
9abc77b19e

+ 1 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3File.java

@@ -164,6 +164,7 @@ public class S3File implements Closeable {
     Validate.checkLessOrEqual(offset, "offset", size(), "size()");
     Validate.checkLessOrEqual(size, "size", size() - offset, "size() - offset");
 
+    streamStatistics.streamOpened();
     final GetObjectRequest request = client.newGetRequest(this.s3Attributes.getKey())
         .withRange(offset, offset + size - 1);
     this.changeTracker.maybeApplyConstraint(request);

+ 8 - 10
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3InputStream.java

@@ -254,6 +254,10 @@ public abstract class S3InputStream
   public int read() throws IOException {
     this.throwIfClosed();
 
+    if (this.s3File.size() == 0 || this.seekTargetPos >= this.s3File.size()) {
+      return -1;
+    }
+
     if (!ensureCurrentBuffer()) {
       return -1;
     }
@@ -296,6 +300,10 @@ public abstract class S3InputStream
       return 0;
     }
 
+    if (this.s3File.size() == 0 || this.seekTargetPos >= this.s3File.size()) {
+      return -1;
+    }
+
     if (!ensureCurrentBuffer()) {
       return -1;
     }
@@ -427,18 +435,8 @@ public abstract class S3InputStream
   }
 
   protected void throwIfInvalidSeek(long pos) throws EOFException {
-    long fileSize = this.s3File.size();
     if (pos < 0) {
       throw new EOFException(FSExceptionMessages.NEGATIVE_SEEK + " " + pos);
-    } else {
-      if (fileSize == 0 && pos == 0) {
-        // Do nothing. Valid combination.
-        return;
-      }
-
-      if (pos >= fileSize) {
-        throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF + " " + pos);
-      }
     }
   }
 

+ 1 - 2
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/read/S3PrefetchingInputStream.java

@@ -103,8 +103,7 @@ public class S3PrefetchingInputStream
    */
   @Override
   public synchronized long getPos() throws IOException {
-    this.throwIfClosed();
-    return this.inputStream.getPos();
+    return this.isClosed() ? 0 : this.inputStream.getPos();
   }
 
   /**

+ 11 - 5
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.statistics.IOStatisticAssertions;
 import org.apache.hadoop.fs.statistics.StreamStatisticNames;
 
 import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
@@ -78,11 +79,16 @@ public class ITestS3ARequesterPays extends AbstractS3ATestBase {
       inputStream.seek(0);
       inputStream.readByte();
 
-      // Verify > 1 call was made, so we're sure it is correctly configured for each request
-      IOStatisticAssertions
-          .assertThatStatisticCounter(inputStream.getIOStatistics(),
-              StreamStatisticNames.STREAM_READ_OPENED)
-          .isGreaterThan(1);
+      if (conf.getBoolean(PREFETCH_ENABLED_KEY, true)) {
+        // For S3PrefetchingInputStream, verify a call was made
+        IOStatisticAssertions.assertThatStatisticCounter(inputStream.getIOStatistics(),
+            StreamStatisticNames.STREAM_READ_OPENED).isEqualTo(1);
+      } else {
+        // For S3AInputStream, verify > 1 call was made,
+        // so we're sure it is correctly configured for each request
+        IOStatisticAssertions.assertThatStatisticCounter(inputStream.getIOStatistics(),
+            StreamStatisticNames.STREAM_READ_OPENED).isGreaterThan(1);
+      }
 
       // Check list calls work without error
       fs.listFiles(requesterPaysPath.getParent(), false);

+ 10 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AUnbuffer.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.s3a;
 
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 import org.apache.hadoop.fs.statistics.IOStatistics;
@@ -33,6 +34,7 @@ import org.junit.Test;
 
 import java.io.IOException;
 
+import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_BYTES_READ_CLOSE;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_TOTAL_BYTES;
@@ -72,6 +74,7 @@ public class ITestS3AUnbuffer extends AbstractS3ATestBase {
     IOStatisticsSnapshot iostats = new IOStatisticsSnapshot();
     // Open file, read half the data, and then call unbuffer
     try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      skipIfCannotUnbuffer(inputStream);
       assertTrue(inputStream.getWrappedStream() instanceof S3AInputStream);
       int bytesToRead = 8;
       readAndAssertBytesRead(inputStream, bytesToRead);
@@ -138,6 +141,7 @@ public class ITestS3AUnbuffer extends AbstractS3ATestBase {
     Object streamStatsStr;
     try {
       inputStream = fs.open(dest);
+      skipIfCannotUnbuffer(inputStream);
       streamStatsStr = demandStringifyIOStatisticsSource(inputStream);
 
       LOG.info("initial stream statistics {}", streamStatsStr);
@@ -192,6 +196,12 @@ public class ITestS3AUnbuffer extends AbstractS3ATestBase {
     return ((S3AInputStream) inputStream.getWrappedStream()).isObjectStreamOpen();
   }
 
+  private void skipIfCannotUnbuffer(FSDataInputStream inputStream) {
+    if (!inputStream.hasCapability(StreamCapabilities.UNBUFFER)) {
+      skip("input stream does not support unbuffer");
+    }
+  }
+
   /**
    * Read the specified number of bytes from the given
    * {@link FSDataInputStream} and assert that

+ 0 - 5
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/read/TestS3InputStream.java

@@ -169,11 +169,6 @@ public class TestS3InputStream extends AbstractHadoopTestBase {
         EOFException.class,
         FSExceptionMessages.NEGATIVE_SEEK,
         () -> inputStream.seek(-1));
-
-    ExceptionAsserts.assertThrows(
-        EOFException.class,
-        FSExceptionMessages.CANNOT_SEEK_PAST_EOF,
-        () -> inputStream.seek(fileSize + 1));
   }
 
   @Test