Pārlūkot izejas kodu

HADOOP-18517. ABFS: Add fs.azure.enable.readahead option to disable readahead (#5103)

* HADOOP-18517. ABFS: Add fs.azure.enable.readahead option to disable readahead

Adds new config option to turn off readahead
* also allows it to be passed in through openFile(),
* extends ITestAbfsReadWriteAndSeek to use the option, including one
  replicated test...that shows that turning it off is slower.

Important: this does not address the critical data corruption issue
HADOOP-18521. ABFS ReadBufferManager buffer sharing across concurrent HTTP requests

What is does do is provide a way to completely bypass the ReadBufferManager.
To mitigate the problem, either fs.azure.enable.readahead needs to be set to false,
or set "fs.azure.readaheadqueue.depth" to 0 -this still goes near the (broken)
ReadBufferManager code, but does't trigger the bug.

For safe reading of files through the ABFS connector, readahead MUST be disabled
or the followup fix to HADOOP-18521 applied

Contributed by Steve Loughran
Steve Loughran 2 gadi atpakaļ
vecāks
revīzija
cd517eddea

+ 14 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java

@@ -302,6 +302,11 @@ public class AbfsConfiguration{
           DefaultValue = DEFAULT_ABFS_LATENCY_TRACK)
           DefaultValue = DEFAULT_ABFS_LATENCY_TRACK)
   private boolean trackLatency;
   private boolean trackLatency;
 
 
+  @BooleanConfigurationValidatorAnnotation(
+      ConfigurationKey = FS_AZURE_ENABLE_READAHEAD,
+      DefaultValue = DEFAULT_ENABLE_READAHEAD)
+  private boolean enabledReadAhead;
+
   @LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS,
   @LongConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS,
       MinValue = 0,
       MinValue = 0,
       DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS)
       DefaultValue = DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS)
@@ -915,6 +920,15 @@ public class AbfsConfiguration{
     }
     }
   }
   }
 
 
+  public boolean isReadAheadEnabled() {
+    return this.enabledReadAhead;
+  }
+
+  @VisibleForTesting
+  void setReadAheadEnabled(final boolean enabledReadAhead) {
+    this.enabledReadAhead = enabledReadAhead;
+  }
+
   public int getReadAheadRange() {
   public int getReadAheadRange() {
     return this.readAheadRange;
     return this.readAheadRange;
   }
   }

+ 1 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

@@ -808,6 +808,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
             .withReadBufferSize(abfsConfiguration.getReadBufferSize())
             .withReadBufferSize(abfsConfiguration.getReadBufferSize())
             .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
             .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
             .withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
             .withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
+            .isReadAheadEnabled(abfsConfiguration.isReadAheadEnabled())
             .withReadSmallFilesCompletely(abfsConfiguration.readSmallFilesCompletely())
             .withReadSmallFilesCompletely(abfsConfiguration.readSmallFilesCompletely())
             .withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead())
             .withOptimizeFooterRead(abfsConfiguration.optimizeFooterRead())
             .withReadAheadRange(abfsConfiguration.getReadAheadRange())
             .withReadAheadRange(abfsConfiguration.getReadAheadRange())

+ 7 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

@@ -186,6 +186,13 @@ public final class ConfigurationKeys {
   public static final String FS_AZURE_SKIP_SUPER_USER_REPLACEMENT = "fs.azure.identity.transformer.skip.superuser.replacement";
   public static final String FS_AZURE_SKIP_SUPER_USER_REPLACEMENT = "fs.azure.identity.transformer.skip.superuser.replacement";
   public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER = "fs.azure.account.keyprovider";
   public static final String AZURE_KEY_ACCOUNT_KEYPROVIDER = "fs.azure.account.keyprovider";
   public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";
   public static final String AZURE_KEY_ACCOUNT_SHELLKEYPROVIDER_SCRIPT = "fs.azure.shellkeyprovider.script";
+
+  /**
+   * Enable or disable readahead buffer in AbfsInputStream.
+   * Value: {@value}.
+   */
+  public static final String FS_AZURE_ENABLE_READAHEAD = "fs.azure.enable.readahead";
+
   /** Setting this true will make the driver use it's own RemoteIterator implementation */
   /** Setting this true will make the driver use it's own RemoteIterator implementation */
   public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator";
   public static final String FS_AZURE_ENABLE_ABFS_LIST_ITERATOR = "fs.azure.enable.abfslistiterator";
   /** Server side encryption key */
   /** Server side encryption key */

+ 1 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java

@@ -106,6 +106,7 @@ public final class FileSystemConfigurations {
   public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false;
   public static final boolean DEFAULT_ABFS_LATENCY_TRACK = false;
   public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120;
   public static final long DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS = 120;
 
 
+  public static final boolean DEFAULT_ENABLE_READAHEAD = true;
   public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING;
   public static final String DEFAULT_FS_AZURE_USER_AGENT_PREFIX = EMPTY_STRING;
   public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN";
   public static final String DEFAULT_VALUE_UNKNOWN = "UNKNOWN";
 
 

+ 6 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java

@@ -137,7 +137,7 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
     this.tolerateOobAppends = abfsInputStreamContext.isTolerateOobAppends();
     this.eTag = eTag;
     this.eTag = eTag;
     this.readAheadRange = abfsInputStreamContext.getReadAheadRange();
     this.readAheadRange = abfsInputStreamContext.getReadAheadRange();
-    this.readAheadEnabled = true;
+    this.readAheadEnabled = abfsInputStreamContext.isReadAheadEnabled();
     this.alwaysReadBufferSize
     this.alwaysReadBufferSize
         = abfsInputStreamContext.shouldReadBufferSizeAlways();
         = abfsInputStreamContext.shouldReadBufferSizeAlways();
     this.bufferedPreadDisabled = abfsInputStreamContext
     this.bufferedPreadDisabled = abfsInputStreamContext
@@ -745,6 +745,11 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     return buffer;
     return buffer;
   }
   }
 
 
+  @VisibleForTesting
+  public boolean isReadAheadEnabled() {
+    return readAheadEnabled;
+  }
+
   @VisibleForTesting
   @VisibleForTesting
   public int getReadAheadRange() {
   public int getReadAheadRange() {
     return readAheadRange;
     return readAheadRange;

+ 12 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java

@@ -35,6 +35,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
 
 
   private boolean tolerateOobAppends;
   private boolean tolerateOobAppends;
 
 
+  private boolean isReadAheadEnabled = true;
+
   private boolean alwaysReadBufferSize;
   private boolean alwaysReadBufferSize;
 
 
   private int readAheadBlockSize;
   private int readAheadBlockSize;
@@ -72,6 +74,12 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
     return this;
     return this;
   }
   }
 
 
+  public AbfsInputStreamContext isReadAheadEnabled(
+          final boolean isReadAheadEnabled) {
+    this.isReadAheadEnabled = isReadAheadEnabled;
+    return this;
+  }
+
   public AbfsInputStreamContext withReadAheadRange(
   public AbfsInputStreamContext withReadAheadRange(
           final int readAheadRange) {
           final int readAheadRange) {
     this.readAheadRange = readAheadRange;
     this.readAheadRange = readAheadRange;
@@ -141,6 +149,10 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
     return tolerateOobAppends;
     return tolerateOobAppends;
   }
   }
 
 
+  public boolean isReadAheadEnabled() {
+    return isReadAheadEnabled;
+  }
+
   public int getReadAheadRange() {
   public int getReadAheadRange() {
     return readAheadRange;
     return readAheadRange;
   }
   }

+ 23 - 9
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAbfsReadWriteAndSeek.java

@@ -32,7 +32,6 @@ import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
 import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
 import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
 import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
 import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
 import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
 import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
-import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
 import org.apache.hadoop.fs.statistics.IOStatisticsSource;
 import org.apache.hadoop.fs.statistics.IOStatisticsSource;
 
 
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO;
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_INFO;
@@ -40,6 +39,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.A
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_BUFFER_SIZE;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MAX_BUFFER_SIZE;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
 import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.MIN_BUFFER_SIZE;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
 
 
 /**
 /**
  * Test read, write and seek.
  * Test read, write and seek.
@@ -50,18 +50,27 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.M
 public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
 public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
   private static final String TEST_PATH = "/testfile";
   private static final String TEST_PATH = "/testfile";
 
 
-  @Parameterized.Parameters(name = "Size={0}")
+  /**
+   * Parameterize on read buffer size and readahead.
+   * For test performance, a full x*y test matrix is not used.
+   * @return the test parameters
+   */
+  @Parameterized.Parameters(name = "Size={0}-readahead={1}")
   public static Iterable<Object[]> sizes() {
   public static Iterable<Object[]> sizes() {
-    return Arrays.asList(new Object[][]{{MIN_BUFFER_SIZE},
-        {DEFAULT_READ_BUFFER_SIZE},
-        {APPENDBLOB_MAX_WRITE_BUFFER_SIZE},
-        {MAX_BUFFER_SIZE}});
+    return Arrays.asList(new Object[][]{{MIN_BUFFER_SIZE, true},
+        {DEFAULT_READ_BUFFER_SIZE, false},
+        {DEFAULT_READ_BUFFER_SIZE, true},
+        {APPENDBLOB_MAX_WRITE_BUFFER_SIZE, false},
+        {MAX_BUFFER_SIZE, true}});
   }
   }
 
 
   private final int size;
   private final int size;
+  private final boolean readaheadEnabled;
 
 
-  public ITestAbfsReadWriteAndSeek(final int size) throws Exception {
+  public ITestAbfsReadWriteAndSeek(final int size,
+      final boolean readaheadEnabled) throws Exception {
     this.size = size;
     this.size = size;
+    this.readaheadEnabled = readaheadEnabled;
   }
   }
 
 
   @Test
   @Test
@@ -74,6 +83,7 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
     final AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
     final AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
     abfsConfiguration.setWriteBufferSize(bufferSize);
     abfsConfiguration.setWriteBufferSize(bufferSize);
     abfsConfiguration.setReadBufferSize(bufferSize);
     abfsConfiguration.setReadBufferSize(bufferSize);
+    abfsConfiguration.setReadAheadEnabled(readaheadEnabled);
 
 
     final byte[] b = new byte[2 * bufferSize];
     final byte[] b = new byte[2 * bufferSize];
     new Random().nextBytes(b);
     new Random().nextBytes(b);
@@ -85,7 +95,7 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
     } finally{
     } finally{
     stream.close();
     stream.close();
     }
     }
-    IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream);
+    logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream);
 
 
     final byte[] readBuffer = new byte[2 * bufferSize];
     final byte[] readBuffer = new byte[2 * bufferSize];
     int result;
     int result;
@@ -109,7 +119,7 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
       inputStream.seek(0);
       inputStream.seek(0);
       result = inputStream.read(readBuffer, 0, bufferSize);
       result = inputStream.read(readBuffer, 0, bufferSize);
     }
     }
-    IOStatisticsLogging.logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, statisticsSource);
+    logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, statisticsSource);
 
 
     assertNotEquals("data read in final read()", -1, result);
     assertNotEquals("data read in final read()", -1, result);
     assertArrayEquals(readBuffer, b);
     assertArrayEquals(readBuffer, b);
@@ -121,6 +131,7 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
     final AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
     final AbfsConfiguration abfsConfiguration = fs.getAbfsStore().getAbfsConfiguration();
     int bufferSize = MIN_BUFFER_SIZE;
     int bufferSize = MIN_BUFFER_SIZE;
     abfsConfiguration.setReadBufferSize(bufferSize);
     abfsConfiguration.setReadBufferSize(bufferSize);
+    abfsConfiguration.setReadAheadEnabled(readaheadEnabled);
 
 
     final byte[] b = new byte[bufferSize * 10];
     final byte[] b = new byte[bufferSize * 10];
     new Random().nextBytes(b);
     new Random().nextBytes(b);
@@ -132,8 +143,10 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
               ((AbfsOutputStream) stream.getWrappedStream())
               ((AbfsOutputStream) stream.getWrappedStream())
                   .getStreamID()));
                   .getStreamID()));
       stream.write(b);
       stream.write(b);
+      logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, stream);
     }
     }
 
 
+
     final byte[] readBuffer = new byte[4 * bufferSize];
     final byte[] readBuffer = new byte[4 * bufferSize];
     int result;
     int result;
     fs.registerListener(
     fs.registerListener(
@@ -146,6 +159,7 @@ public class ITestAbfsReadWriteAndSeek extends AbstractAbfsScaleTest {
               ((AbfsInputStream) inputStream.getWrappedStream())
               ((AbfsInputStream) inputStream.getWrappedStream())
                   .getStreamID()));
                   .getStreamID()));
       result = inputStream.read(readBuffer, 0, bufferSize*4);
       result = inputStream.read(readBuffer, 0, bufferSize*4);
+      logIOStatisticsAtLevel(LOG, IOSTATISTICS_LOGGING_LEVEL_INFO, inputStream);
     }
     }
     fs.registerListener(null);
     fs.registerListener(null);
   }
   }

+ 2 - 2
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/TestTracingContext.java

@@ -130,10 +130,10 @@ public class TestTracingContext extends AbstractAbfsIntegrationTest {
 
 
     testClasses.put(new ITestAzureBlobFileSystemListStatus(), //liststatus
     testClasses.put(new ITestAzureBlobFileSystemListStatus(), //liststatus
         ITestAzureBlobFileSystemListStatus.class.getMethod("testListPath"));
         ITestAzureBlobFileSystemListStatus.class.getMethod("testListPath"));
-    testClasses.put(new ITestAbfsReadWriteAndSeek(MIN_BUFFER_SIZE), //open,
+    testClasses.put(new ITestAbfsReadWriteAndSeek(MIN_BUFFER_SIZE, true), //open,
         // read, write
         // read, write
         ITestAbfsReadWriteAndSeek.class.getMethod("testReadAheadRequestID"));
         ITestAbfsReadWriteAndSeek.class.getMethod("testReadAheadRequestID"));
-    testClasses.put(new ITestAbfsReadWriteAndSeek(MIN_BUFFER_SIZE), //read (bypassreadahead)
+    testClasses.put(new ITestAbfsReadWriteAndSeek(MIN_BUFFER_SIZE, false), //read (bypassreadahead)
         ITestAbfsReadWriteAndSeek.class
         ITestAbfsReadWriteAndSeek.class
             .getMethod("testReadAndWriteWithDifferentBufferSizesAndSeek"));
             .getMethod("testReadAndWriteWithDifferentBufferSizesAndSeek"));
     testClasses.put(new ITestAzureBlobFileSystemAppend(), //append
     testClasses.put(new ITestAzureBlobFileSystemAppend(), //append