
HADOOP-17038 Support disabling buffered reads in ABFS positional reads. (#2646)

- Contributed by @anoopsjohn
Anoop Sam John · 4 years ago · commit 1bb4101b59

+ 27 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java

@@ -30,9 +30,12 @@ import java.nio.file.AccessDeniedException;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumSet;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -73,6 +76,8 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderExcept
 import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode;
 import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager;
 import org.apache.hadoop.fs.azurebfs.services.AbfsCounters;
+import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
 import org.apache.hadoop.fs.permission.AclEntry;
 import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.FsAction;
@@ -82,6 +87,7 @@ import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.functional.RemoteIterators;
+import org.apache.hadoop.util.LambdaUtils;
 import org.apache.hadoop.util.Progressable;
 
 import static org.apache.hadoop.fs.azurebfs.AbfsStatistic.*;
@@ -176,11 +182,18 @@ public class AzureBlobFileSystem extends FileSystem {
   @Override
   public FSDataInputStream open(final Path path, final int bufferSize) throws IOException {
     LOG.debug("AzureBlobFileSystem.open path: {} bufferSize: {}", path, bufferSize);
+    // bufferSize is unused.
+    return open(path, Optional.empty());
+  }
+
+  private FSDataInputStream open(final Path path,
+      final Optional<Configuration> options) throws IOException {
     statIncrement(CALL_OPEN);
     Path qualifiedPath = makeQualified(path);
 
     try {
-      InputStream inputStream = abfsStore.openFileForRead(qualifiedPath, statistics);
+      InputStream inputStream = abfsStore.openFileForRead(qualifiedPath,
+          options, statistics);
       return new FSDataInputStream(inputStream);
     } catch(AzureBlobFileSystemException ex) {
       checkException(path, ex);
@@ -188,6 +201,19 @@ public class AzureBlobFileSystem extends FileSystem {
     }
   }
 
+  @Override
+  protected CompletableFuture<FSDataInputStream> openFileWithOptions(
+      final Path path, final OpenFileParameters parameters) throws IOException {
+    LOG.debug("AzureBlobFileSystem.openFileWithOptions path: {}", path);
+    AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
+        parameters.getMandatoryKeys(),
+        Collections.emptySet(),
+        "for " + path);
+    return LambdaUtils.eval(
+        new CompletableFuture<>(), () ->
+            open(path, Optional.of(parameters.getOptions())));
+  }
+
   @Override
   public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize,
       final short replication, final long blockSize, final Progressable progress) throws IOException {

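For orientation, the `LambdaUtils.eval` call in `openFileWithOptions` evaluates a callable and completes the supplied future with its result or its exception. A minimal sketch of that behaviour, assumed from the usage above rather than copied from the utility's source:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;

final class LambdaUtilsSketch {
  // Run the callable synchronously; complete the future with its result,
  // or complete it exceptionally if the callable throws.
  static <T> CompletableFuture<T> eval(CompletableFuture<T> result,
      Callable<T> call) {
    try {
      result.complete(call.call());
    } catch (Throwable t) {
      result.completeExceptionally(t);
    }
    return result;
  }
}
```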
+ 18 - 3
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java

@@ -46,6 +46,7 @@ import java.util.Hashtable;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 
 import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -125,6 +126,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PAT
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
 import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.TOKEN_VERSION;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_ABFS_ENDPOINT;
+import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE;
 import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_IDENTITY_TRANSFORM_CLASS;
 
 /**
@@ -606,7 +608,15 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
     }
   }
 
-  public AbfsInputStream openFileForRead(final Path path, final FileSystem.Statistics statistics)
+  public AbfsInputStream openFileForRead(final Path path,
+      final FileSystem.Statistics statistics)
+      throws AzureBlobFileSystemException {
+    return openFileForRead(path, Optional.empty(), statistics);
+  }
+
+  public AbfsInputStream openFileForRead(final Path path,
+      final Optional<Configuration> options,
+      final FileSystem.Statistics statistics)
       throws AzureBlobFileSystemException {
     try (AbfsPerfInfo perfInfo = startTracking("openFileForRead", "getPathStatus")) {
       LOG.debug("openFileForRead filesystem: {} path: {}",
@@ -635,12 +645,16 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
       // Add statistics for InputStream
       return new AbfsInputStream(client, statistics,
               relativePath, contentLength,
-              populateAbfsInputStreamContext(),
+              populateAbfsInputStreamContext(options),
               eTag);
     }
   }
 
-  private AbfsInputStreamContext populateAbfsInputStreamContext() {
+  private AbfsInputStreamContext populateAbfsInputStreamContext(
+      Optional<Configuration> options) {
+    boolean bufferedPreadDisabled = options
+        .map(c -> c.getBoolean(FS_AZURE_BUFFERED_PREAD_DISABLE, false))
+        .orElse(false);
     return new AbfsInputStreamContext(abfsConfiguration.getSasTokenRenewPeriodForStreamsInSeconds())
             .withReadBufferSize(abfsConfiguration.getReadBufferSize())
             .withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
@@ -651,6 +665,7 @@ public class AzureBlobFileSystemStore implements Closeable, ListingSupport {
             .withShouldReadBufferSizeAlways(
                 abfsConfiguration.shouldReadBufferSizeAlways())
             .withReadAheadBlockSize(abfsConfiguration.getReadAheadBlockSize())
+            .withBufferedPreadDisabled(bufferedPreadDisabled)
             .build();
   }
 

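A note on the `Optional<Configuration>` plumbing above: the option only takes effect when the stream is opened through the builder API; the plain `open(path)` path passes `Optional.empty()`, so the flag falls back to false. A small self-contained sketch of that defaulting logic (class and method names here are illustrative, not from the patch):

```java
import java.util.Optional;

import org.apache.hadoop.conf.Configuration;

public class PreadOptionDefaulting {
  static final String KEY = "fs.azure.buffered.pread.disable";

  // Mirrors populateAbfsInputStreamContext: absent options, or an absent
  // key inside them, both yield the default of false.
  static boolean bufferedPreadDisabled(Optional<Configuration> options) {
    return options.map(c -> c.getBoolean(KEY, false)).orElse(false);
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration(false);
    conf.setBoolean(KEY, true);
    System.out.println(bufferedPreadDisabled(Optional.empty()));  // false
    System.out.println(bufferedPreadDisabled(Optional.of(conf))); // true
  }
}
```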
+ 8 - 1
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.azurebfs.constants;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
 
 /**
  * Responsible to keep all the Azure Blob File System configurations keys in Hadoop configuration file.
@@ -181,6 +182,12 @@ public final class ConfigurationKeys {
   public static final String FS_AZURE_LOCAL_USER_SP_MAPPING_FILE_PATH = "fs.azure.identity.transformer.local.service.principal.mapping.file.path";
   /** Key for Local Group to Service Group file location. */
   public static final String FS_AZURE_LOCAL_GROUP_SG_MAPPING_FILE_PATH = "fs.azure.identity.transformer.local.service.group.mapping.file.path";
-
+  /**
+   * Optional config to enable a lock-free pread that bypasses the buffer
+   * in AbfsInputStream. This is not a config that can be set at cluster
+   * level: it can only be used as an option on FutureDataInputStreamBuilder.
+   * @see FileSystem#openFile(org.apache.hadoop.fs.Path)
+   */
+  public static final String FS_AZURE_BUFFERED_PREAD_DISABLE = "fs.azure.buffered.pread.disable";
   private ConfigurationKeys() {}
 }

+ 45 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java

@@ -70,6 +70,14 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
   private final boolean tolerateOobAppends; // whether tolerate Oob Appends
   private final boolean readAheadEnabled; // whether enable readAhead;
   private final boolean alwaysReadBufferSize;
+  /*
+   * By default the pread API does a seek + read, as in FSInputStream, and
+   * keeps the data it reads in a buffer. When bufferedPreadDisabled is true,
+   * the pread API reads only the requested amount of data from the given
+   * offset and the buffer is not used at all.
+   * @see #read(long, byte[], int, int)
+   */
+  private final boolean bufferedPreadDisabled;
 
   private boolean firstRead = true;
   // SAS tokens can be re-used until they expire
@@ -117,6 +125,8 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     this.readAheadEnabled = true;
     this.alwaysReadBufferSize
         = abfsInputStreamContext.shouldReadBufferSizeAlways();
+    this.bufferedPreadDisabled = abfsInputStreamContext
+        .isBufferedPreadDisabled();
     this.cachedSasToken = new CachedSASToken(
         abfsInputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
     this.streamStatistics = abfsInputStreamContext.getStreamStatistics();
@@ -135,6 +145,41 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
     return path;
   }
 
+  @Override
+  public int read(long position, byte[] buffer, int offset, int length)
+      throws IOException {
+    // When bufferedPreadDisabled = true, this API does not touch the shared
+    // buffer, cursor position etc., so it is deliberately NOT synchronized.
+    // HBase-style random reads on a shared file input stream benefit greatly
+    // from this. The closed check is strict only at the start of the call,
+    // not across the entire flow.
+    synchronized (this) {
+      if (closed) {
+        throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
+      }
+    }
+    LOG.debug("pread requested offset = {} len = {} bufferedPreadDisabled = {}",
+        offset, length, bufferedPreadDisabled);
+    if (!bufferedPreadDisabled) {
+      return super.read(position, buffer, offset, length);
+    }
+    validatePositionedReadArgs(position, buffer, offset, length);
+    if (length == 0) {
+      return 0;
+    }
+    if (streamStatistics != null) {
+      streamStatistics.readOperationStarted();
+    }
+    int bytesRead = readRemote(position, buffer, offset, length);
+    if (statistics != null) {
+      statistics.incrementBytesRead(bytesRead);
+    }
+    if (streamStatistics != null) {
+      streamStatistics.bytesRead(bytesRead);
+    }
+    return bytesRead;
+  }
+
   @Override
   public int read() throws IOException {
     byte[] b = new byte[1];

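Since the bypass path is not synchronized, several threads can issue preads against one shared stream at once, which is the HBase-style pattern the comment above refers to. A hedged sketch of that usage (the pool size, offsets, and error handling are illustrative assumptions):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.fs.FSDataInputStream;

final class SharedStreamPreads {
  // Each task does an independent positional read on the one shared stream;
  // pread never moves the stream's seek position, so no coordination needed.
  static void parallelPreads(final FSDataInputStream in, long[] offsets,
      final int len) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    for (final long offset : offsets) {
      pool.execute(() -> {
        byte[] buf = new byte[len];
        try {
          in.readFully(offset, buf, 0, len);
        } catch (Exception e) {
          e.printStackTrace();
        }
      });
    }
    pool.shutdown();
    pool.awaitTermination(1, TimeUnit.MINUTES);
  }
}
```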
+ 11 - 0
hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java

@@ -44,6 +44,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
 
   private boolean optimizeFooterRead;
 
+  private boolean bufferedPreadDisabled;
+
   public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
     super(sasTokenRenewPeriodForStreamsInSeconds);
   }
@@ -97,6 +99,12 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
     return this;
   }
 
+  public AbfsInputStreamContext withBufferedPreadDisabled(
+      final boolean bufferedPreadDisabled) {
+    this.bufferedPreadDisabled = bufferedPreadDisabled;
+    return this;
+  }
+
   public AbfsInputStreamContext build() {
     if (readBufferSize > readAheadBlockSize) {
       LOG.debug(
@@ -142,4 +150,7 @@ public class AbfsInputStreamContext extends AbfsStreamContext {
     return readAheadBlockSize;
   }
 
+  public boolean isBufferedPreadDisabled() {
+    return bufferedPreadDisabled;
+  }
 }

+ 10 - 0
hadoop-tools/hadoop-azure/src/site/markdown/abfs.md

@@ -812,6 +812,16 @@ aheads. Specify the value in bytes. The value should be between 16384 to
 104857600 both inclusive (16 KB to 100 MB). The default value will be
 4194304 (4 MB).
 
+`fs.azure.buffered.pread.disable`: By default the positional read API does a
+seek and read on the input stream. That read fills the buffer cache in
+AbfsInputStream and updates the cursor positions. When this option is set to
+true, the buffer is bypassed and a lock-free REST call reads exactly the
+requested bytes from the blob. This is particularly helpful for HBase-style
+short random reads over a shared AbfsInputStream instance.
+Note: this is not a config which can be set at cluster level. It can only be
+used as an option on FutureDataInputStreamBuilder.
+See FileSystem#openFile(Path path)
+
 To run under limited memory situations configure the following. Especially
 when there are too many writes from the same process.
 

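To make the doc change concrete, a minimal sketch of opening a file with this option through the builder API (the ABFS URI is a placeholder and error handling is simplified):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BufferedPreadDisableExample {
  public static void main(String[] args) throws Exception {
    // Placeholder ABFS URI; substitute a real container and account.
    Path path = new Path("abfs://container@account.dfs.core.windows.net/data");
    FileSystem fs = path.getFileSystem(new Configuration());
    // The option is set per stream on the openFile() builder,
    // not in cluster configuration.
    try (FSDataInputStream in = fs.openFile(path)
        .opt("fs.azure.buffered.pread.disable", true)
        .build().get()) {
      byte[] buf = new byte[64];
      // With the option on, exactly these bytes are fetched from the service;
      // the AbfsInputStream buffer is never populated by this call.
      int n = in.read(1024L, buf, 0, buf.length);
      System.out.println("read " + n + " bytes");
    }
  }
}
```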
+ 233 - 0
hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestAbfsPositionedRead.java

@@ -0,0 +1,233 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.services;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Rule;
+import org.junit.rules.TestName;
+import org.junit.Test;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.StreamStatisticNames;
+import org.assertj.core.api.Assertions;
+
+public class ITestAbfsPositionedRead extends AbstractAbfsIntegrationTest {
+
+  private static final int TEST_FILE_DATA_SIZE = 100;
+
+  @Rule
+  public TestName methodName = new TestName();
+
+  public ITestAbfsPositionedRead() throws Exception {
+  }
+
+  @Test
+  public void testPositionedRead() throws IOException {
+    describe("Testing positioned reads in AbfsInputStream");
+    Path dest = path(methodName.getMethodName());
+
+    byte[] data = ContractTestUtils.dataset(TEST_FILE_DATA_SIZE, 'a', 'z');
+    ContractTestUtils.writeDataset(getFileSystem(), dest, data, data.length,
+        TEST_FILE_DATA_SIZE, true);
+    int bytesToRead = 10;
+    try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
+      assertTrue(
+          "unexpected stream type "
+              + inputStream.getWrappedStream().getClass().getSimpleName(),
+          inputStream.getWrappedStream() instanceof AbfsInputStream);
+      byte[] readBuffer = new byte[bytesToRead];
+      int readPos = 0;
+      Assertions
+          .assertThat(inputStream.read(readPos, readBuffer, 0, bytesToRead))
+          .describedAs(
+              "AbfsInputStream pread did not read the correct number of bytes")
+          .isEqualTo(bytesToRead);
+      Assertions.assertThat(readBuffer)
+          .describedAs("AbfsInputStream pread did not read correct data")
+          .containsExactly(
+              Arrays.copyOfRange(data, readPos, readPos + bytesToRead));
+      // We read only 10 bytes from offset 0, but by default pread does a
+      // seek + read, so the entire 100 bytes get read into the
+      // AbfsInputStream buffer.
+      Assertions
+          .assertThat(Arrays.copyOfRange(
+              ((AbfsInputStream) inputStream.getWrappedStream()).getBuffer(), 0,
+              TEST_FILE_DATA_SIZE))
+          .describedAs(
+              "AbfsInputStream pread did not read more data into its buffer")
+          .containsExactly(data);
+      // Check statistics
+      assertStatistics(inputStream.getIOStatistics(), bytesToRead, 1, 1,
+          TEST_FILE_DATA_SIZE);
+
+      readPos = 50;
+      Assertions
+          .assertThat(inputStream.read(readPos, readBuffer, 0, bytesToRead))
+          .describedAs(
+              "AbfsInputStream pread did not read the correct number of bytes")
+          .isEqualTo(bytesToRead);
+      Assertions.assertThat(readBuffer)
+          .describedAs("AbfsInputStream pread did not read correct data")
+          .containsExactly(
+              Arrays.copyOfRange(data, readPos, readPos + bytesToRead));
+      // Check statistics
+      assertStatistics(inputStream.getIOStatistics(), 2 * bytesToRead, 2, 1,
+          TEST_FILE_DATA_SIZE);
+      // We did positioned reads from pos 0 and then 50, but the stream
+      // position should remain at 0.
+      Assertions.assertThat(inputStream.getPos())
+          .describedAs("AbfsInputStream positioned reads moved stream position")
+          .isEqualTo(0);
+    }
+  }
+
+  private void assertStatistics(IOStatistics ioStatistics,
+      long expectedBytesRead, long expectedReadOps, long expectedRemoteReadOps,
+      long expectedRemoteReadBytes) {
+    Assertions
+        .assertThat(ioStatistics.counters()
+            .get(StreamStatisticNames.STREAM_READ_BYTES).longValue())
+        .describedAs("Mismatch in bytesRead statistics")
+        .isEqualTo(expectedBytesRead);
+    Assertions
+        .assertThat(ioStatistics.counters()
+            .get(StreamStatisticNames.STREAM_READ_OPERATIONS).longValue())
+        .describedAs("Mismatch in readOps statistics")
+        .isEqualTo(expectedReadOps);
+    Assertions
+        .assertThat(ioStatistics.counters()
+            .get(StreamStatisticNames.REMOTE_READ_OP).longValue())
+        .describedAs("Mismatch in remoteReadOps statistics")
+        .isEqualTo(expectedRemoteReadOps);
+    Assertions
+        .assertThat(ioStatistics.counters()
+            .get(StreamStatisticNames.REMOTE_BYTES_READ).longValue())
+        .describedAs("Mismatch in remoteReadBytes statistics")
+        .isEqualTo(expectedRemoteReadBytes);
+  }
+
+  @Test
+  public void testPositionedReadWithBufferedReadDisabled() throws IOException {
+    describe("Testing positioned reads in AbfsInputStream with BufferedReadDisabled");
+    Path dest = path(methodName.getMethodName());
+    byte[] data = ContractTestUtils.dataset(TEST_FILE_DATA_SIZE, 'a', 'z');
+    ContractTestUtils.writeDataset(getFileSystem(), dest, data, data.length,
+        TEST_FILE_DATA_SIZE, true);
+    FutureDataInputStreamBuilder builder = getFileSystem().openFile(dest);
+    builder.opt(ConfigurationKeys.FS_AZURE_BUFFERED_PREAD_DISABLE, true);
+    FSDataInputStream inputStream = null;
+    try {
+      inputStream = builder.build().get();
+    } catch (IllegalArgumentException | UnsupportedOperationException
+        | InterruptedException | ExecutionException e) {
+      throw new IOException(
+          "Exception opening " + dest + " with FutureDataInputStreamBuilder",
+          e);
+    }
+    assertNotNull("Null InputStream over " + dest, inputStream);
+    int bytesToRead = 10;
+    try {
+      AbfsInputStream abfsIs = (AbfsInputStream) inputStream.getWrappedStream();
+      byte[] readBuffer = new byte[bytesToRead];
+      int readPos = 10;
+      Assertions
+          .assertThat(inputStream.read(readPos, readBuffer, 0, bytesToRead))
+          .describedAs(
+              "AbfsInputStream pread did not read the correct number of bytes")
+          .isEqualTo(bytesToRead);
+      Assertions.assertThat(readBuffer)
+          .describedAs("AbfsInputStream pread did not read correct data")
+          .containsExactly(
+              Arrays.copyOfRange(data, readPos, readPos + bytesToRead));
+      // Read only 10 bytes from offset 10. This time, since buffered pread is
+      // disabled, only the exact requested bytes are read and no data gets
+      // read into AbfsInputStream#buffer. In fact, the buffer won't even get
+      // initialized.
+      assertNull("AbfsInputStream pread caused the internal buffer creation",
+          abfsIs.getBuffer());
+      // Check statistics
+      assertStatistics(inputStream.getIOStatistics(), bytesToRead, 1, 1,
+          bytesToRead);
+      readPos = 40;
+      Assertions
+          .assertThat(inputStream.read(readPos, readBuffer, 0, bytesToRead))
+          .describedAs(
+              "AbfsInputStream pread did not read the correct number of bytes")
+          .isEqualTo(bytesToRead);
+      Assertions.assertThat(readBuffer)
+          .describedAs("AbfsInputStream pread did not read correct data")
+          .containsExactly(
+              Arrays.copyOfRange(data, readPos, readPos + bytesToRead));
+      assertStatistics(inputStream.getIOStatistics(), 2 * bytesToRead, 2, 2,
+          2 * bytesToRead);
+      // Now do a seek and read so that the internal buffer gets created.
+      inputStream.seek(0);
+      Assertions.assertThat(inputStream.read(readBuffer)).describedAs(
+          "AbfsInputStream seek+read did not read the correct number of bytes")
+          .isEqualTo(bytesToRead);
+      // This read will have fetched all 100 bytes into the internal buffer.
+      Assertions
+          .assertThat(Arrays.copyOfRange(
+              ((AbfsInputStream) inputStream.getWrappedStream()).getBuffer(), 0,
+              TEST_FILE_DATA_SIZE))
+          .describedAs(
+              "AbfsInputStream seek+read did not read more data into its buffer")
+          .containsExactly(data);
+      assertStatistics(inputStream.getIOStatistics(), 3 * bytesToRead, 3, 3,
+          TEST_FILE_DATA_SIZE + 2 * bytesToRead);
+      resetBuffer(abfsIs.getBuffer());
+      // Do a positional read again and make sure no extra data is fetched.
+      readPos = 0;
+      Assertions
+          .assertThat(inputStream.read(readPos, readBuffer, 0, bytesToRead))
+          .describedAs(
+              "AbfsInputStream pread did not read the correct number of bytes")
+          .isEqualTo(bytesToRead);
+      Assertions.assertThat(readBuffer)
+          .describedAs("AbfsInputStream pread did not read correct data")
+          .containsExactly(
+              Arrays.copyOfRange(data, readPos, readPos + bytesToRead));
+      Assertions
+          .assertThat(Arrays.copyOfRange(
+              ((AbfsInputStream) inputStream.getWrappedStream()).getBuffer(), 0,
+              TEST_FILE_DATA_SIZE))
+          .describedAs(
+              "AbfsInputStream pread read more data into its buffer than expected")
+          .doesNotContain(data);
+      assertStatistics(inputStream.getIOStatistics(), 4 * bytesToRead, 4, 4,
+          TEST_FILE_DATA_SIZE + 3 * bytesToRead);
+    } finally {
+      inputStream.close();
+    }
+  }
+
+  private void resetBuffer(byte[] buf) {
+    for (int i = 0; i < buf.length; i++) {
+      buf[i] = (byte) 0;
+    }
+  }
+}