
HADOOP-18291. S3A prefetch - Implement thread-safe LRU cache for SingleFilePerBlockCache (#5754)

Contributed by Viraj Jasani
Viraj Jasani, 1 year ago
Parent
Current commit: e7d74f3d59

+ 7 - 5
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java

@@ -110,6 +110,7 @@ public abstract class CachingBlockManager extends BlockManager {
    * @param prefetchingStatistics statistics for this stream.
    * @param conf the configuration.
    * @param localDirAllocator the local dir allocator instance.
+   * @param maxBlocksCount max blocks count to be kept in cache at any time.
    * @throws IllegalArgumentException if bufferPoolSize is zero or negative.
    */
   public CachingBlockManager(
@@ -118,7 +119,8 @@ public abstract class CachingBlockManager extends BlockManager {
       int bufferPoolSize,
       PrefetchingStatistics prefetchingStatistics,
       Configuration conf,
-      LocalDirAllocator localDirAllocator) {
+      LocalDirAllocator localDirAllocator,
+      int maxBlocksCount) {
     super(blockData);
 
     Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize");
@@ -129,16 +131,16 @@ public abstract class CachingBlockManager extends BlockManager {
     this.numReadErrors = new AtomicInteger();
     this.cachingDisabled = new AtomicBoolean();
     this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
+    this.conf = requireNonNull(conf);
 
     if (this.getBlockData().getFileSize() > 0) {
       this.bufferPool = new BufferPool(bufferPoolSize, this.getBlockData().getBlockSize(),
           this.prefetchingStatistics);
-      this.cache = this.createCache();
+      this.cache = this.createCache(maxBlocksCount);
     }
 
     this.ops = new BlockOperations();
     this.ops.setDebug(false);
-    this.conf = requireNonNull(conf);
     this.localDirAllocator = localDirAllocator;
   }
 
@@ -557,8 +559,8 @@ public abstract class CachingBlockManager extends BlockManager {
     }
   }
 
-  protected BlockCache createCache() {
-    return new SingleFilePerBlockCache(prefetchingStatistics);
+  protected BlockCache createCache(int maxBlocksCount) {
+    return new SingleFilePerBlockCache(prefetchingStatistics, maxBlocksCount);
   }
 
   protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException {
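
To show how the widened constructor and the new createCache(int) hook fit together, here is a minimal sketch of a subclass. The class name is hypothetical and the package locations of the imports are assumed from the surrounding diff, so treat this as an illustration rather than part of the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.impl.prefetch.BlockCache;
import org.apache.hadoop.fs.impl.prefetch.BlockData;
import org.apache.hadoop.fs.impl.prefetch.CachingBlockManager;
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
import org.apache.hadoop.fs.impl.prefetch.PrefetchingStatistics;
import org.apache.hadoop.fs.impl.prefetch.SingleFilePerBlockCache;

// Hypothetical subclass, kept abstract because CachingBlockManager declares
// further abstract read methods that are not part of this diff.
public abstract class BoundedCachingBlockManager extends CachingBlockManager {

  private final PrefetchingStatistics statistics;

  public BoundedCachingBlockManager(
      ExecutorServiceFuturePool futurePool,
      BlockData blockData,
      int bufferPoolSize,
      PrefetchingStatistics statistics,
      Configuration conf,
      LocalDirAllocator localDirAllocator,
      int maxBlocksCount) {
    // maxBlocksCount bounds how many blocks the on-disk cache may hold.
    super(futurePool, blockData, bufferPoolSize, statistics, conf,
        localDirAllocator, maxBlocksCount);
    this.statistics = statistics;
  }

  @Override
  protected BlockCache createCache(int maxBlocksCount) {
    // Same shape as the default implementation: an LRU-bounded
    // single-file-per-block cache.
    return new SingleFilePerBlockCache(statistics, maxBlocksCount);
  }
}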

+ 44 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/PrefetchConstants.java

@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.impl.prefetch;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Constants used by prefetch implementations.
+ */
+public final class PrefetchConstants {
+
+  private PrefetchConstants() {
+  }
+
+  /**
+   * Timeout to be used by close, while acquiring prefetch block write lock.
+   * Value = {@value PREFETCH_WRITE_LOCK_TIMEOUT}
+   */
+  static final int PREFETCH_WRITE_LOCK_TIMEOUT = 5;
+
+  /**
+   * Lock timeout unit to be used by the thread while acquiring prefetch block write lock.
+   * Value = {@value PREFETCH_WRITE_LOCK_TIMEOUT_UNIT}
+   */
+  static final TimeUnit PREFETCH_WRITE_LOCK_TIMEOUT_UNIT = TimeUnit.SECONDS;
+
+}

+ 189 - 32
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java

@@ -47,6 +47,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.util.Preconditions;
 
 import static java.util.Objects.requireNonNull;
 import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNull;
@@ -61,27 +62,42 @@ public class SingleFilePerBlockCache implements BlockCache {
   /**
    * Blocks stored in this cache.
    */
-  private final Map<Integer, Entry> blocks = new ConcurrentHashMap<>();
+  private final Map<Integer, Entry> blocks;
 
   /**
-   * Number of times a block was read from this cache.
-   * Used for determining cache utilization factor.
+   * Total max blocks count, to be considered as baseline for LRU cache eviction.
    */
-  private int numGets = 0;
+  private final int maxBlocksCount;
 
-  private final AtomicBoolean closed;
+  /**
+   * The lock to be shared by LRU based linked list updates.
+   */
+  private final ReentrantReadWriteLock blocksLock;
 
-  private final PrefetchingStatistics prefetchingStatistics;
+  /**
+   * Head of the linked list.
+   */
+  private Entry head;
+
+  /**
+   * Tail of the linked list.
+   */
+  private Entry tail;
 
   /**
-   * Timeout to be used by close, while acquiring prefetch block write lock.
+   * Total size of the linked list.
    */
-  private static final int PREFETCH_WRITE_LOCK_TIMEOUT = 5;
+  private int entryListSize;
 
   /**
-   * Lock timeout unit to be used by the thread while acquiring prefetch block write lock.
+   * Number of times a block was read from this cache.
+   * Used for determining cache utilization factor.
    */
-  private static final TimeUnit PREFETCH_WRITE_LOCK_TIMEOUT_UNIT = TimeUnit.SECONDS;
+  private int numGets = 0;
+
+  private final AtomicBoolean closed;
+
+  private final PrefetchingStatistics prefetchingStatistics;
 
   /**
    * File attributes attached to any intermediate temporary file created during index creation.
@@ -103,6 +119,8 @@ public class SingleFilePerBlockCache implements BlockCache {
       READ,
       WRITE
     }
+    private Entry previous;
+    private Entry next;
 
     Entry(int blockNumber, Path path, int size, long checksum) {
       this.blockNumber = blockNumber;
@@ -110,6 +128,8 @@ public class SingleFilePerBlockCache implements BlockCache {
       this.size = size;
       this.checksum = checksum;
       this.lock = new ReentrantReadWriteLock();
+      this.previous = null;
+      this.next = null;
     }
 
     @Override
@@ -166,16 +186,37 @@ public class SingleFilePerBlockCache implements BlockCache {
       }
       return false;
     }
+
+    private Entry getPrevious() {
+      return previous;
+    }
+
+    private void setPrevious(Entry previous) {
+      this.previous = previous;
+    }
+
+    private Entry getNext() {
+      return next;
+    }
+
+    private void setNext(Entry next) {
+      this.next = next;
+    }
   }
 
   /**
    * Constructs an instance of a {@code SingleFilePerBlockCache}.
    *
    * @param prefetchingStatistics statistics for this stream.
+   * @param maxBlocksCount max blocks count to be kept in cache at any time.
    */
-  public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics) {
+  public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics, int maxBlocksCount) {
     this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
     this.closed = new AtomicBoolean(false);
+    this.maxBlocksCount = maxBlocksCount;
+    Preconditions.checkArgument(maxBlocksCount > 0, "maxBlocksCount should be more than 0");
+    blocks = new ConcurrentHashMap<>();
+    blocksLock = new ReentrantReadWriteLock();
   }
 
   /**
@@ -247,9 +288,60 @@ public class SingleFilePerBlockCache implements BlockCache {
       throw new IllegalStateException(String.format("block %d not found in cache", blockNumber));
     }
     numGets++;
+    addToLinkedListHead(entry);
     return entry;
   }
 
+  /**
+   * Helper method to add the given entry to the head of the linked list.
+   *
+   * @param entry Block entry to add.
+   */
+  private void addToLinkedListHead(Entry entry) {
+    blocksLock.writeLock().lock();
+    try {
+      addToHeadOfLinkedList(entry);
+    } finally {
+      blocksLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Add the given entry to the head of the linked list.
+   *
+   * @param entry Block entry to add.
+   */
+  private void addToHeadOfLinkedList(Entry entry) {
+    if (head == null) {
+      head = entry;
+      tail = entry;
+    }
+    LOG.debug(
+        "Block num {} to be added to the head. Current head block num: {} and tail block num: {}",
+        entry.blockNumber, head.blockNumber, tail.blockNumber);
+    if (entry != head) {
+      Entry prev = entry.getPrevious();
+      Entry nxt = entry.getNext();
+      // no-op if the block is already evicted
+      if (!blocks.containsKey(entry.blockNumber)) {
+        return;
+      }
+      if (prev != null) {
+        prev.setNext(nxt);
+      }
+      if (nxt != null) {
+        nxt.setPrevious(prev);
+      }
+      entry.setPrevious(null);
+      entry.setNext(head);
+      head.setPrevious(entry);
+      head = entry;
+      if (prev != null && prev.getNext() == null) {
+        tail = prev;
+      }
+    }
+  }
+
   /**
    * Puts the given block in this cache.
    *
@@ -278,6 +370,7 @@ public class SingleFilePerBlockCache implements BlockCache {
       } finally {
         entry.releaseLock(Entry.LockType.READ);
       }
+      addToLinkedListHead(entry);
       return;
     }
 
@@ -299,9 +392,65 @@ public class SingleFilePerBlockCache implements BlockCache {
     // Update stream_read_blocks_in_cache stats only after blocks map is updated with new file
     // entry to avoid any discrepancy related to the value of stream_read_blocks_in_cache.
     // If stream_read_blocks_in_cache is updated before updating the blocks map here, closing of
-    // the input stream can lead to the removal of the cache file even before blocks is added with
-    // the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
+    // the input stream can lead to the removal of the cache file even before blocks is added
+    // with the new cache file, leading to incorrect value of stream_read_blocks_in_cache.
     prefetchingStatistics.blockAddedToFileCache();
+    addToLinkedListAndEvictIfRequired(entry);
+  }
+
+  /**
+   * Add the given entry to the head of the linked list and if the LRU cache size
+   * exceeds the max limit, evict tail of the LRU linked list.
+   *
+   * @param entry Block entry to add.
+   */
+  private void addToLinkedListAndEvictIfRequired(Entry entry) {
+    blocksLock.writeLock().lock();
+    try {
+      addToHeadOfLinkedList(entry);
+      entryListSize++;
+      if (entryListSize > maxBlocksCount && !closed.get()) {
+        Entry elementToPurge = tail;
+        tail = tail.getPrevious();
+        if (tail == null) {
+          tail = head;
+        }
+        tail.setNext(null);
+        elementToPurge.setPrevious(null);
+        deleteBlockFileAndEvictCache(elementToPurge);
+      }
+    } finally {
+      blocksLock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Delete cache file as part of the block cache LRU eviction.
+   *
+   * @param elementToPurge Block entry to evict.
+   */
+  private void deleteBlockFileAndEvictCache(Entry elementToPurge) {
+    boolean lockAcquired = elementToPurge.takeLock(Entry.LockType.WRITE,
+        PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT,
+        PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
+    if (!lockAcquired) {
+      LOG.error("Cache file {} deletion would not be attempted as write lock could not"
+              + " be acquired within {} {}", elementToPurge.path,
+          PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT,
+          PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
+    } else {
+      try {
+        if (Files.deleteIfExists(elementToPurge.path)) {
+          entryListSize--;
+          prefetchingStatistics.blockRemovedFromFileCache();
+          blocks.remove(elementToPurge.blockNumber);
+        }
+      } catch (IOException e) {
+        LOG.warn("Failed to delete cache file {}", elementToPurge.path, e);
+      } finally {
+        elementToPurge.releaseLock(Entry.LockType.WRITE);
+      }
+    }
   }
 
   private static final Set<? extends OpenOption> CREATE_OPTIONS =
@@ -337,30 +486,38 @@ public class SingleFilePerBlockCache implements BlockCache {
   public void close() throws IOException {
     if (closed.compareAndSet(false, true)) {
       LOG.debug(getStats());
-      int numFilesDeleted = 0;
-
-      for (Entry entry : blocks.values()) {
-        boolean lockAcquired = entry.takeLock(Entry.LockType.WRITE, PREFETCH_WRITE_LOCK_TIMEOUT,
-            PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
-        if (!lockAcquired) {
-          LOG.error("Cache file {} deletion would not be attempted as write lock could not"
-                  + " be acquired within {} {}", entry.path, PREFETCH_WRITE_LOCK_TIMEOUT,
-              PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
-          continue;
-        }
-        try {
-          Files.deleteIfExists(entry.path);
+      deleteCacheFiles();
+    }
+  }
+
+  /**
+   * Delete cache files as part of the close call.
+   */
+  private void deleteCacheFiles() {
+    int numFilesDeleted = 0;
+    for (Entry entry : blocks.values()) {
+      boolean lockAcquired =
+          entry.takeLock(Entry.LockType.WRITE, PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT,
+              PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
+      if (!lockAcquired) {
+        LOG.error("Cache file {} deletion would not be attempted as write lock could not"
+                + " be acquired within {} {}", entry.path,
+            PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT,
+            PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
+        continue;
+      }
+      try {
+        if (Files.deleteIfExists(entry.path)) {
           prefetchingStatistics.blockRemovedFromFileCache();
           numFilesDeleted++;
-        } catch (IOException e) {
-          LOG.warn("Failed to delete cache file {}", entry.path, e);
-        } finally {
-          entry.releaseLock(Entry.LockType.WRITE);
         }
+      } catch (IOException e) {
+        LOG.warn("Failed to delete cache file {}", entry.path, e);
+      } finally {
+        entry.releaseLock(Entry.LockType.WRITE);
       }
-
-      LOG.debug("Prefetch cache close: Deleted {} cache files", numFilesDeleted);
     }
+    LOG.debug("Prefetch cache close: Deleted {} cache files", numFilesDeleted);
   }
 
   @Override
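
For readers who want the LRU mechanics above in isolation, the following self-contained sketch captures the same pattern: a ConcurrentHashMap index plus a doubly linked list guarded by a ReentrantReadWriteLock, with the tail evicted once the configured limit is exceeded. It deliberately leaves out the per-entry locks, checksums, and cache-file deletion that SingleFilePerBlockCache also performs, so it is an illustration of the approach rather than a drop-in replacement; the class and method names are made up.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Simplified illustration of the LRU bookkeeping: a map for O(1) lookup plus
// a doubly linked list ordered from most- to least-recently used.
final class LruIndex<K, V> {

  private final class Node {
    final K key;
    V value;
    Node prev;
    Node next;
    Node(K key, V value) { this.key = key; this.value = value; }
  }

  private final int maxEntries;
  private final Map<K, Node> index = new ConcurrentHashMap<>();
  private final ReentrantReadWriteLock listLock = new ReentrantReadWriteLock();
  private Node head;
  private Node tail;

  LruIndex(int maxEntries) {
    this.maxEntries = maxEntries;
  }

  V get(K key) {
    Node node = index.get(key);
    if (node == null) {
      return null;
    }
    moveToHead(node);          // a cache hit refreshes recency
    return node.value;
  }

  void put(K key, V value) {
    listLock.writeLock().lock();
    try {
      Node node = index.get(key);
      if (node != null) {
        node.value = value;
        unlink(node);
        linkAtHead(node);
        return;
      }
      node = new Node(key, value);
      index.put(key, node);
      linkAtHead(node);
      if (index.size() > maxEntries) {
        Node evicted = tail;   // least recently used entry
        unlink(evicted);
        index.remove(evicted.key);
      }
    } finally {
      listLock.writeLock().unlock();
    }
  }

  private void moveToHead(Node node) {
    listLock.writeLock().lock();
    try {
      unlink(node);
      linkAtHead(node);
    } finally {
      listLock.writeLock().unlock();
    }
  }

  private void linkAtHead(Node node) {
    node.prev = null;
    node.next = head;
    if (head != null) {
      head.prev = node;
    }
    head = node;
    if (tail == null) {
      tail = node;
    }
  }

  private void unlink(Node node) {
    if (node.prev != null) {
      node.prev.next = node.next;
    } else {
      head = node.next;
    }
    if (node.next != null) {
      node.next.prev = node.prev;
    } else {
      tail = node.prev;
    }
    node.prev = null;
    node.next = null;
  }
}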

+ 3 - 3
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBlockCache.java

@@ -45,7 +45,7 @@ public class TestBlockCache extends AbstractHadoopTestBase {
   public void testArgChecks() throws Exception {
     // Should not throw.
     BlockCache cache =
-        new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance());
+        new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance(), 2);
 
     ByteBuffer buffer = ByteBuffer.allocate(16);
 
@@ -55,7 +55,7 @@ public class TestBlockCache extends AbstractHadoopTestBase {
 
 
     intercept(NullPointerException.class, null,
-        () -> new SingleFilePerBlockCache(null));
+        () -> new SingleFilePerBlockCache(null, 2));
 
   }
 
@@ -63,7 +63,7 @@ public class TestBlockCache extends AbstractHadoopTestBase {
   @Test
   public void testPutAndGet() throws Exception {
     BlockCache cache =
-        new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance());
+        new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance(), 2);
 
     ByteBuffer buffer1 = ByteBuffer.allocate(BUFFER_SIZE);
     for (byte i = 0; i < BUFFER_SIZE; i++) {

+ 12 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

@@ -1276,4 +1276,16 @@ public final class Constants {
   public static final String STORE_CAPABILITY_DIRECTORY_MARKER_MULTIPART_UPLOAD_ENABLED =
       "fs.s3a.capability.multipart.uploads.enabled";
 
+  /**
+   * Prefetch max blocks count config.
+   * Value = {@value}
+   */
+  public static final String PREFETCH_MAX_BLOCKS_COUNT = "fs.s3a.prefetch.max.blocks.count";
+
+  /**
+   * Default value for max blocks count config.
+   * Value = {@value}
+   */
+  public static final int DEFAULT_PREFETCH_MAX_BLOCKS_COUNT = 4;
+
 }
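
As a usage note, this key bounds how many prefetched blocks are kept in the on-disk cache at once; once the bound is exceeded, the least recently used block file is deleted. The snippet below is an assumed example (class and method names are made up for illustration) of wiring the setting into a Configuration:

import org.apache.hadoop.conf.Configuration;

import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT;

public final class PrefetchCacheSettingsExample {
  public static Configuration withBoundedPrefetchCache() {
    Configuration conf = new Configuration();
    // Enable the prefetching input stream.
    conf.setBoolean(PREFETCH_ENABLED_KEY, true);
    // Keep at most 8 prefetched blocks on local disk at any time
    // ("fs.s3a.prefetch.max.blocks.count", default 4); older blocks
    // are evicted LRU-first once the limit is exceeded.
    conf.setInt(PREFETCH_MAX_BLOCKS_COUNT, 8);
    return conf;
  }
}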

+ 10 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java

@@ -33,6 +33,9 @@ import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
 import org.apache.hadoop.fs.impl.prefetch.Validate;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PREFETCH_MAX_BLOCKS_COUNT;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT;
+
 /**
  * Provides access to S3 file one block at a time.
  */
@@ -67,7 +70,13 @@ public class S3ACachingBlockManager extends CachingBlockManager {
       Configuration conf,
       LocalDirAllocator localDirAllocator) {
 
-    super(futurePool, blockData, bufferPoolSize, streamStatistics, conf, localDirAllocator);
+    super(futurePool,
+        blockData,
+        bufferPoolSize,
+        streamStatistics,
+        conf,
+        localDirAllocator,
+        conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT));
 
     Validate.checkNotNull(reader, "reader");
 

+ 243 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java

@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.test.LambdaTestUtils;
+
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+
+/**
+ * Test the prefetching input stream with LRU cache eviction on S3ACachingInputStream.
+ */
+@RunWith(Parameterized.class)
+public class ITestS3APrefetchingLruEviction extends AbstractS3ACostTest {
+
+  private final String maxBlocks;
+
+  @Parameterized.Parameters(name = "max-blocks-{0}")
+  public static Collection<Object[]> params() {
+    return Arrays.asList(new Object[][]{
+        {"1"},
+        {"2"},
+        {"3"},
+        {"4"}
+    });
+  }
+
+  public ITestS3APrefetchingLruEviction(final String maxBlocks) {
+    super(true);
+    this.maxBlocks = maxBlocks;
+  }
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestS3APrefetchingLruEviction.class);
+
+  private static final int S_1K = 1024;
+  // Path for file which should have length > block size so S3ACachingInputStream is used
+  private Path largeFile;
+  private FileSystem largeFileFS;
+  private int blockSize;
+
+  private static final int TIMEOUT_MILLIS = 5000;
+  private static final int INTERVAL_MILLIS = 500;
+
+  @Override
+  public Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_ENABLED_KEY);
+    S3ATestUtils.removeBaseAndBucketOverrides(conf, PREFETCH_MAX_BLOCKS_COUNT);
+    conf.setBoolean(PREFETCH_ENABLED_KEY, true);
+    conf.setInt(PREFETCH_MAX_BLOCKS_COUNT, Integer.parseInt(maxBlocks));
+    return conf;
+  }
+
+  @Override
+  public void teardown() throws Exception {
+    super.teardown();
+    cleanupWithLogger(LOG, largeFileFS);
+    largeFileFS = null;
+  }
+
+  private void openFS() throws Exception {
+    Configuration conf = getConfiguration();
+    String largeFileUri = S3ATestUtils.getCSVTestFile(conf);
+
+    largeFile = new Path(largeFileUri);
+    blockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE);
+    largeFileFS = new S3AFileSystem();
+    largeFileFS.initialize(new URI(largeFileUri), getConfiguration());
+  }
+
+  @Test
+  public void testSeeksWithLruEviction() throws Throwable {
+    IOStatistics ioStats;
+    openFS();
+
+    ExecutorService executorService = Executors.newFixedThreadPool(5,
+        new ThreadFactoryBuilder()
+            .setDaemon(true)
+            .setNameFormat("testSeeksWithLruEviction-%d")
+            .build());
+    CountDownLatch countDownLatch = new CountDownLatch(7);
+
+    try (FSDataInputStream in = largeFileFS.open(largeFile)) {
+      ioStats = in.getIOStatistics();
+      // tests to add multiple blocks in the prefetch cache
+      // and let LRU eviction take place as more cache entries
+      // are added with multiple block reads.
+
+      // Don't read block 0 completely
+      executorService.submit(() -> readFullyWithPositionedRead(countDownLatch,
+          in,
+          0,
+          blockSize - S_1K * 10));
+
+      // Seek to block 1 and don't read completely
+      executorService.submit(() -> readFullyWithPositionedRead(countDownLatch,
+          in,
+          blockSize,
+          2 * S_1K));
+
+      // Seek to block 2 and don't read completely
+      executorService.submit(() -> readFullyWithSeek(countDownLatch,
+          in,
+          blockSize * 2L,
+          2 * S_1K));
+
+      // Seek to block 3 and don't read completely
+      executorService.submit(() -> readFullyWithPositionedRead(countDownLatch,
+          in,
+          blockSize * 3L,
+          2 * S_1K));
+
+      // Seek to block 4 and don't read completely
+      executorService.submit(() -> readFullyWithSeek(countDownLatch,
+          in,
+          blockSize * 4L,
+          2 * S_1K));
+
+      // Seek to block 5 and don't read completely
+      executorService.submit(() -> readFullyWithPositionedRead(countDownLatch,
+          in,
+          blockSize * 5L,
+          2 * S_1K));
+
+      // backward seek, can't use block 0 as it is evicted
+      executorService.submit(() -> readFullyWithSeek(countDownLatch,
+          in,
+          S_1K * 5,
+          2 * S_1K));
+
+      countDownLatch.await();
+
+      // expect 3 blocks as rest are to be evicted by LRU
+      LambdaTestUtils.eventually(TIMEOUT_MILLIS, INTERVAL_MILLIS, () -> {
+        LOG.info("IO stats: {}", ioStats);
+        verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE,
+            Integer.parseInt(maxBlocks));
+      });
+      // let LRU evictions settle down, if any
+      Thread.sleep(TIMEOUT_MILLIS);
+    } finally {
+      executorService.shutdownNow();
+      executorService.awaitTermination(5, TimeUnit.SECONDS);
+    }
+    LambdaTestUtils.eventually(TIMEOUT_MILLIS, INTERVAL_MILLIS, () -> {
+      LOG.info("IO stats: {}", ioStats);
+      verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 0);
+    });
+  }
+
+  /**
+   * Read the bytes from the given position in the stream to a new buffer using the positioned
+   * readable.
+   *
+   * @param countDownLatch count down latch to mark the operation completed.
+   * @param in input stream.
+   * @param position position in the given input stream to seek from.
+   * @param len the number of bytes to read.
+   * @return true if the read operation is successful.
+   */
+  private boolean readFullyWithPositionedRead(CountDownLatch countDownLatch, FSDataInputStream in,
+      long position, int len) {
+    byte[] buffer = new byte[blockSize];
+    // Don't read block 0 completely
+    try {
+      in.readFully(position, buffer, 0, len);
+      countDownLatch.countDown();
+      return true;
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  /**
+   * Read the bytes from the given position in the stream to a new buffer using seek followed by
+   * input stream read.
+   *
+   * @param countDownLatch count down latch to mark the operation completed.
+   * @param in input stream.
+   * @param position position in the given input stream to seek from.
+   * @param len the number of bytes to read.
+   * @return true if the read operation is successful.
+   */
+  private boolean readFullyWithSeek(CountDownLatch countDownLatch, FSDataInputStream in,
+      long position, int len) {
+    byte[] buffer = new byte[blockSize];
+    // Don't read block 0 completely
+    try {
+      in.seek(position);
+      in.readFully(buffer, 0, len);
+      countDownLatch.countDown();
+      return true;
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+  }
+
+}

+ 4 - 2
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java

@@ -44,6 +44,7 @@ import org.apache.hadoop.fs.impl.prefetch.BlockData;
 import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
 import org.apache.hadoop.fs.impl.prefetch.SingleFilePerBlockCache;
 import org.apache.hadoop.fs.impl.prefetch.Validate;
+import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.Invoker;
 import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
 import org.apache.hadoop.fs.s3a.S3AFileStatus;
@@ -314,7 +315,8 @@ public final class S3APrefetchFakes {
     private final int writeDelay;
 
     public FakeS3FilePerBlockCache(int readDelay, int writeDelay) {
-      super(new EmptyS3AStatisticsContext().newInputStreamStatistics());
+      super(new EmptyS3AStatisticsContext().newInputStreamStatistics(),
+          Constants.DEFAULT_PREFETCH_MAX_BLOCKS_COUNT);
       this.files = new ConcurrentHashMap<>();
       this.readDelay = readDelay;
       this.writeDelay = writeDelay;
@@ -387,7 +389,7 @@ public final class S3APrefetchFakes {
     }
 
     @Override
-    protected BlockCache createCache() {
+    protected BlockCache createCache(int maxBlocksCount) {
       final int readDelayMs = 50;
       final int writeDelayMs = 200;
       return new FakeS3FilePerBlockCache(readDelayMs, writeDelayMs);

+ 16 - 4
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ACachingBlockManager.java

@@ -37,7 +37,9 @@ import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
 import org.apache.hadoop.test.AbstractHadoopTestBase;
 
 import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PREFETCH_MAX_BLOCKS_COUNT;
 import static org.apache.hadoop.fs.s3a.Constants.HADOOP_TMP_DIR;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.junit.Assert.assertEquals;
 
@@ -173,6 +175,10 @@ public class TestS3ACachingBlockManager extends AbstractHadoopTestBase {
         super.cachePut(blockNumber, buffer);
       }
     }
+
+    public Configuration getConf() {
+      return CONF;
+    }
   }
 
   // @Ignore
@@ -285,8 +291,11 @@ public class TestS3ACachingBlockManager extends AbstractHadoopTestBase {
       blockManager.requestCaching(data);
     }
 
-    waitForCaching(blockManager, blockData.getNumBlocks());
-    assertEquals(blockData.getNumBlocks(), blockManager.numCached());
+    waitForCaching(blockManager, Math.min(blockData.getNumBlocks(),
+        conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT)));
+    assertEquals(Math.min(blockData.getNumBlocks(),
+            conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT)),
+        blockManager.numCached());
     assertEquals(0, this.totalErrors(blockManager));
   }
 
@@ -330,8 +339,11 @@ public class TestS3ACachingBlockManager extends AbstractHadoopTestBase {
       }
 
       blockManager.requestCaching(data);
-      waitForCaching(blockManager, expectedNumSuccesses);
-      assertEquals(expectedNumSuccesses, blockManager.numCached());
+      waitForCaching(blockManager, Math.min(expectedNumSuccesses, blockManager.getConf()
+          .getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT)));
+      assertEquals(Math.min(expectedNumSuccesses, blockManager.getConf()
+              .getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT)),
+          blockManager.numCached());
 
       if (forceCachingFailure) {
         assertEquals(expectedNumErrors, this.totalErrors(blockManager));