
HADOOP-18959. Use builder for prefetch CachingBlockManager. (#6240) Contributed by Viraj Jasani

Viraj Jasani, 1 year ago
parent
commit f1e4376626
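
This change replaces the long positional constructor of CachingBlockManager with a BlockManagerParameters builder. A minimal sketch of the new call site, modelled on how S3ACachingInputStream wires it up in this commit (the variables futurePool, blockData, bufferPoolSize, conf, dirAllocator, streamStatistics and reader are assumed to exist in the caller, and the constants come from org.apache.hadoop.fs.s3a.Constants):

    // Sketch only: assumes the surrounding stream already holds these objects.
    BlockManagerParameters params = new BlockManagerParameters()
        .withFuturePool(futurePool)            // pool for async prefetch tasks
        .withBlockData(blockData)              // per-block info for the underlying file
        .withBufferPoolSize(bufferPoolSize)    // in-memory cache size, in blocks
        .withConf(conf)
        .withLocalDirAllocator(dirAllocator)
        .withMaxBlocksCount(
            conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT))
        .withPrefetchingStatistics(streamStatistics)
        .withTrackerFactory(streamStatistics);

    // The block manager now takes the parameters object plus the S3 reader.
    BlockManager blockManager = new S3ACachingBlockManager(params, reader);

Unset parameters simply stay null (or zero), and CachingBlockManager continues to validate the ones it requires, as the constructor diff below shows.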

+ 227 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BlockManagerParameters.java

@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.fs.impl.prefetch;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
+
+/**
+ * This class is used to provide parameters to {@link BlockManager}.
+ */
+@InterfaceAudience.Private
+public final class BlockManagerParameters {
+
+  /**
+   * Asynchronous tasks are performed in this pool.
+   */
+  private ExecutorServiceFuturePool futurePool;
+
+  /**
+   * Information about each block of the underlying file.
+   */
+  private BlockData blockData;
+
+  /**
+   * Size of the in-memory cache in terms of number of blocks.
+   */
+  private int bufferPoolSize;
+
+  /**
+   * Statistics for the stream.
+   */
+  private PrefetchingStatistics prefetchingStatistics;
+
+  /**
+   * The configuration object.
+   */
+  private Configuration conf;
+
+  /**
+   * The local dir allocator instance.
+   */
+  private LocalDirAllocator localDirAllocator;
+
+  /**
+   * Max blocks count to be kept in cache at any time.
+   */
+  private int maxBlocksCount;
+
+  /**
+   * Tracker with statistics to update.
+   */
+  private DurationTrackerFactory trackerFactory;
+
+  /**
+   * @return The Executor future pool to perform async prefetch tasks.
+   */
+  public ExecutorServiceFuturePool getFuturePool() {
+    return futurePool;
+  }
+
+  /**
+   * @return The object holding blocks data info for the underlying file.
+   */
+  public BlockData getBlockData() {
+    return blockData;
+  }
+
+  /**
+   * @return The size of the in-memory cache.
+   */
+  public int getBufferPoolSize() {
+    return bufferPoolSize;
+  }
+
+  /**
+   * @return The prefetching statistics for the stream.
+   */
+  public PrefetchingStatistics getPrefetchingStatistics() {
+    return prefetchingStatistics;
+  }
+
+  /**
+   * @return The configuration object.
+   */
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * @return The local dir allocator instance.
+   */
+  public LocalDirAllocator getLocalDirAllocator() {
+    return localDirAllocator;
+  }
+
+  /**
+   * @return The max blocks count to be kept in cache at any time.
+   */
+  public int getMaxBlocksCount() {
+    return maxBlocksCount;
+  }
+
+  /**
+   * @return The duration tracker with statistics to update.
+   */
+  public DurationTrackerFactory getTrackerFactory() {
+    return trackerFactory;
+  }
+
+  /**
+   * Sets the executor service future pool that is later used to perform
+   * async prefetch tasks.
+   *
+   * @param pool The future pool.
+   * @return The builder.
+   */
+  public BlockManagerParameters withFuturePool(
+      final ExecutorServiceFuturePool pool) {
+    this.futurePool = pool;
+    return this;
+  }
+
+  /**
+   * Sets the object holding blocks data info for the underlying file.
+   *
+   * @param data The block data object.
+   * @return The builder.
+   */
+  public BlockManagerParameters withBlockData(
+      final BlockData data) {
+    this.blockData = data;
+    return this;
+  }
+
+  /**
+   * Sets the in-memory cache size as number of blocks.
+   *
+   * @param poolSize The buffer pool size as number of blocks.
+   * @return The builder.
+   */
+  public BlockManagerParameters withBufferPoolSize(
+      final int poolSize) {
+    this.bufferPoolSize = poolSize;
+    return this;
+  }
+
+  /**
+   * Sets the prefetching statistics for the stream.
+   *
+   * @param statistics The prefetching statistics.
+   * @return The builder.
+   */
+  public BlockManagerParameters withPrefetchingStatistics(
+      final PrefetchingStatistics statistics) {
+    this.prefetchingStatistics = statistics;
+    return this;
+  }
+
+  /**
+   * Sets the configuration object.
+   *
+   * @param configuration The configuration object.
+   * @return The builder.
+   */
+  public BlockManagerParameters withConf(
+      final Configuration configuration) {
+    this.conf = configuration;
+    return this;
+  }
+
+  /**
+   * Sets the local dir allocator for round-robin disk allocation
+   * while creating files.
+   *
+   * @param dirAllocator The local dir allocator object.
+   * @return The builder.
+   */
+  public BlockManagerParameters withLocalDirAllocator(
+      final LocalDirAllocator dirAllocator) {
+    this.localDirAllocator = dirAllocator;
+    return this;
+  }
+
+  /**
+   * Sets the max blocks count to be kept in cache at any time.
+   *
+   * @param blocksCount The max blocks count.
+   * @return The builder.
+   */
+  public BlockManagerParameters withMaxBlocksCount(
+      final int blocksCount) {
+    this.maxBlocksCount = blocksCount;
+    return this;
+  }
+
+  /**
+   * Sets the duration tracker with statistics to update.
+   *
+   * @param factory The tracker factory object.
+   * @return The builder.
+   */
+  public BlockManagerParameters withTrackerFactory(
+      final DurationTrackerFactory factory) {
+    this.trackerFactory = factory;
+    return this;
+  }
+
+}

+ 16 - 28
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java

@@ -30,6 +30,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 
+import javax.annotation.Nonnull;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -105,47 +107,33 @@ public abstract class CachingBlockManager extends BlockManager {
   /**
    * Constructs an instance of a {@code CachingBlockManager}.
    *
-   * @param futurePool asynchronous tasks are performed in this pool.
-   * @param blockData information about each block of the underlying file.
-   * @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
-   * @param prefetchingStatistics statistics for this stream.
-   * @param conf the configuration.
-   * @param localDirAllocator the local dir allocator instance.
-   * @param maxBlocksCount max blocks count to be kept in cache at any time.
-   * @param trackerFactory tracker with statistics to update.
+   * @param blockManagerParameters params for block manager.
    * @throws IllegalArgumentException if bufferPoolSize is zero or negative.
    */
-  @SuppressWarnings("checkstyle:parameternumber")
-  public CachingBlockManager(
-      ExecutorServiceFuturePool futurePool,
-      BlockData blockData,
-      int bufferPoolSize,
-      PrefetchingStatistics prefetchingStatistics,
-      Configuration conf,
-      LocalDirAllocator localDirAllocator,
-      int maxBlocksCount,
-      DurationTrackerFactory trackerFactory) {
-    super(blockData);
-
-    Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize");
-
-    this.futurePool = requireNonNull(futurePool);
-    this.bufferPoolSize = bufferPoolSize;
+  public CachingBlockManager(@Nonnull final BlockManagerParameters blockManagerParameters) {
+    super(blockManagerParameters.getBlockData());
+
+    Validate.checkPositiveInteger(blockManagerParameters.getBufferPoolSize(), "bufferPoolSize");
+
+    this.futurePool = requireNonNull(blockManagerParameters.getFuturePool());
+    this.bufferPoolSize = blockManagerParameters.getBufferPoolSize();
     this.numCachingErrors = new AtomicInteger();
     this.numReadErrors = new AtomicInteger();
     this.cachingDisabled = new AtomicBoolean();
-    this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
-    this.conf = requireNonNull(conf);
+    this.prefetchingStatistics = requireNonNull(
+        blockManagerParameters.getPrefetchingStatistics());
+    this.conf = requireNonNull(blockManagerParameters.getConf());
 
     if (this.getBlockData().getFileSize() > 0) {
       this.bufferPool = new BufferPool(bufferPoolSize, this.getBlockData().getBlockSize(),
           this.prefetchingStatistics);
-      this.cache = this.createCache(maxBlocksCount, trackerFactory);
+      this.cache = this.createCache(blockManagerParameters.getMaxBlocksCount(),
+          blockManagerParameters.getTrackerFactory());
     }
 
     this.ops = new BlockOperations();
     this.ops.setDebug(false);
-    this.localDirAllocator = localDirAllocator;
+    this.localDirAllocator = blockManagerParameters.getLocalDirAllocator();
   }
 
   /**

+ 6 - 34
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java

@@ -22,28 +22,17 @@ package org.apache.hadoop.fs.s3a.prefetch;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import javax.annotation.Nonnull;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.impl.prefetch.BlockData;
+import org.apache.hadoop.fs.impl.prefetch.BlockManagerParameters;
 import org.apache.hadoop.fs.impl.prefetch.CachingBlockManager;
-import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
 import org.apache.hadoop.fs.impl.prefetch.Validate;
-import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
-
-import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PREFETCH_MAX_BLOCKS_COUNT;
-import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT;
 
 /**
  * Provides access to S3 file one block at a time.
  */
 public class S3ACachingBlockManager extends CachingBlockManager {
 
-  private static final Logger LOG = LoggerFactory.getLogger(
-      S3ACachingBlockManager.class);
-
   /**
    * Reader that reads from S3 file.
    */
@@ -52,32 +41,15 @@ public class S3ACachingBlockManager extends CachingBlockManager {
   /**
    * Constructs an instance of a {@code S3ACachingBlockManager}.
    *
-   * @param futurePool asynchronous tasks are performed in this pool.
+   * @param blockManagerParameters params for block manager.
    * @param reader reader that reads from S3 file.
-   * @param blockData information about each block of the S3 file.
-   * @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
-   * @param streamStatistics statistics for this stream.
-   * @param conf the configuration.
-   * @param localDirAllocator the local dir allocator instance.
    * @throws IllegalArgumentException if reader is null.
    */
   public S3ACachingBlockManager(
-      ExecutorServiceFuturePool futurePool,
-      S3ARemoteObjectReader reader,
-      BlockData blockData,
-      int bufferPoolSize,
-      S3AInputStreamStatistics streamStatistics,
-      Configuration conf,
-      LocalDirAllocator localDirAllocator) {
+      @Nonnull final BlockManagerParameters blockManagerParameters,
+      final S3ARemoteObjectReader reader) {
 
-    super(futurePool,
-        blockData,
-        bufferPoolSize,
-        streamStatistics,
-        conf,
-        localDirAllocator,
-        conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT),
-        streamStatistics);
+    super(blockManagerParameters);
 
     Validate.checkNotNull(reader, "reader");
 

+ 21 - 22
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java

@@ -21,21 +21,24 @@ package org.apache.hadoop.fs.s3a.prefetch;
 
 import java.io.IOException;
 
+import javax.annotation.Nonnull;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.impl.prefetch.BlockData;
 import org.apache.hadoop.fs.impl.prefetch.BlockManager;
+import org.apache.hadoop.fs.impl.prefetch.BlockManagerParameters;
 import org.apache.hadoop.fs.impl.prefetch.BufferData;
-import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
 import org.apache.hadoop.fs.impl.prefetch.FilePosition;
 import org.apache.hadoop.fs.s3a.S3AInputStream;
 import org.apache.hadoop.fs.s3a.S3AReadOpContext;
 import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 
+import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_PREFETCH_MAX_BLOCKS_COUNT;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCK_ACQUIRE_AND_READ;
 import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
 
@@ -80,13 +83,19 @@ public class S3ACachingInputStream extends S3ARemoteInputStream {
 
     this.numBlocksToPrefetch = this.getContext().getPrefetchBlockCount();
     int bufferPoolSize = this.numBlocksToPrefetch + 1;
-    this.blockManager = this.createBlockManager(
-        this.getContext().getFuturePool(),
-        this.getReader(),
-        this.getBlockData(),
-        bufferPoolSize,
-        conf,
-        localDirAllocator);
+    BlockManagerParameters blockManagerParamsBuilder =
+        new BlockManagerParameters()
+            .withFuturePool(this.getContext().getFuturePool())
+            .withBlockData(this.getBlockData())
+            .withBufferPoolSize(bufferPoolSize)
+            .withConf(conf)
+            .withLocalDirAllocator(localDirAllocator)
+            .withMaxBlocksCount(
+                conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT))
+            .withPrefetchingStatistics(getS3AStreamStatistics())
+            .withTrackerFactory(getS3AStreamStatistics());
+    this.blockManager = this.createBlockManager(blockManagerParamsBuilder,
+        this.getReader());
     int fileSize = (int) s3Attributes.getLen();
     LOG.debug("Created caching input stream for {} (size = {})", this.getName(),
         fileSize);
@@ -180,18 +189,8 @@ public class S3ACachingInputStream extends S3ARemoteInputStream {
   }
 
   protected BlockManager createBlockManager(
-      ExecutorServiceFuturePool futurePool,
-      S3ARemoteObjectReader reader,
-      BlockData blockData,
-      int bufferPoolSize,
-      Configuration conf,
-      LocalDirAllocator localDirAllocator) {
-    return new S3ACachingBlockManager(futurePool,
-        reader,
-        blockData,
-        bufferPoolSize,
-        getS3AStreamStatistics(),
-        conf,
-        localDirAllocator);
+      @Nonnull final BlockManagerParameters blockManagerParameters,
+      final S3ARemoteObjectReader reader) {
+    return new S3ACachingBlockManager(blockManagerParameters, reader);
   }
 }

+ 1 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java

@@ -75,6 +75,7 @@ public class ITestS3APrefetchingCacheFiles extends AbstractS3ACostTest {
   public Configuration createConfiguration() {
     Configuration configuration = super.createConfiguration();
     S3ATestUtils.removeBaseAndBucketOverrides(configuration, PREFETCH_ENABLED_KEY);
+    S3ATestUtils.removeBaseAndBucketOverrides(configuration, PREFETCH_BLOCK_SIZE_KEY);
     configuration.setBoolean(PREFETCH_ENABLED_KEY, true);
     return configuration;
   }

+ 11 - 19
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java

@@ -31,6 +31,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 
+import javax.annotation.Nonnull;
+
 import software.amazon.awssdk.core.ResponseInputStream;
 import software.amazon.awssdk.http.AbortableInputStream;
 import software.amazon.awssdk.services.s3.model.GetObjectRequest;
@@ -40,7 +42,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.impl.prefetch.BlockCache;
-import org.apache.hadoop.fs.impl.prefetch.BlockData;
+import org.apache.hadoop.fs.impl.prefetch.BlockManager;
+import org.apache.hadoop.fs.impl.prefetch.BlockManagerParameters;
 import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
 import org.apache.hadoop.fs.impl.prefetch.SingleFilePerBlockCache;
 import org.apache.hadoop.fs.impl.prefetch.Validate;
@@ -363,15 +366,9 @@ public final class S3APrefetchFakes {
       extends S3ACachingBlockManager {
 
     public FakeS3ACachingBlockManager(
-        ExecutorServiceFuturePool futurePool,
-        S3ARemoteObjectReader reader,
-        BlockData blockData,
-        int bufferPoolSize,
-        Configuration conf,
-        LocalDirAllocator localDirAllocator) {
-      super(futurePool, reader, blockData, bufferPoolSize,
-          new EmptyS3AStatisticsContext().newInputStreamStatistics(),
-          conf, localDirAllocator);
+        @Nonnull final BlockManagerParameters blockManagerParameters,
+        final S3ARemoteObjectReader reader) {
+      super(blockManagerParameters, reader);
     }
 
     @Override
@@ -409,15 +406,10 @@ public final class S3APrefetchFakes {
     }
 
     @Override
-    protected S3ACachingBlockManager createBlockManager(
-        ExecutorServiceFuturePool futurePool,
-        S3ARemoteObjectReader reader,
-        BlockData blockData,
-        int bufferPoolSize,
-        Configuration conf,
-        LocalDirAllocator localDirAllocator) {
-      return new FakeS3ACachingBlockManager(futurePool, reader, blockData,
-          bufferPoolSize, conf, localDirAllocator);
+    protected BlockManager createBlockManager(
+        @Nonnull final BlockManagerParameters blockManagerParameters,
+        final S3ARemoteObjectReader reader) {
+      return new FakeS3ACachingBlockManager(blockManagerParameters, reader);
     }
   }
 }

+ 147 - 54
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ACachingBlockManager.java

@@ -29,6 +29,7 @@ import org.junit.Test;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.impl.prefetch.BlockData;
+import org.apache.hadoop.fs.impl.prefetch.BlockManagerParameters;
 import org.apache.hadoop.fs.impl.prefetch.BufferData;
 import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
@@ -43,6 +44,9 @@ import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests to perform read from S3ACachingBlockManager.
+ */
 public class TestS3ACachingBlockManager extends AbstractHadoopTestBase {
 
   static final int FILE_SIZE = 15;
@@ -61,50 +65,126 @@ public class TestS3ACachingBlockManager extends AbstractHadoopTestBase {
 
   private final BlockData blockData = new BlockData(FILE_SIZE, BLOCK_SIZE);
 
+  private static final Configuration CONF =
+      S3ATestUtils.prepareTestConfiguration(new Configuration());
+
   @Test
-  public void testArgChecks() throws Exception {
+  public void testFuturePoolNull() throws Exception {
     MockS3ARemoteObject s3File = new MockS3ARemoteObject(FILE_SIZE, false);
-    S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File);
+    Configuration conf = new Configuration();
+    try (S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File)) {
+      BlockManagerParameters blockManagerParams =
+          new BlockManagerParameters()
+              .withBlockData(blockData)
+              .withBufferPoolSize(POOL_SIZE)
+              .withPrefetchingStatistics(streamStatistics)
+              .withConf(conf);
+
+      intercept(NullPointerException.class,
+          () -> new S3ACachingBlockManager(blockManagerParams, reader));
+    }
+  }
 
+  @Test
+  public void testNullReader() throws Exception {
     Configuration conf = new Configuration();
-    // Should not throw.
-    S3ACachingBlockManager blockManager =
-        new S3ACachingBlockManager(futurePool, reader, blockData, POOL_SIZE,
-            streamStatistics, conf, null);
+    BlockManagerParameters blockManagerParams =
+        new BlockManagerParameters()
+            .withFuturePool(futurePool)
+            .withBlockData(blockData)
+            .withBufferPoolSize(POOL_SIZE)
+            .withPrefetchingStatistics(streamStatistics)
+            .withConf(conf)
+            .withMaxBlocksCount(
+                conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT));
+
+    intercept(IllegalArgumentException.class, "'reader' must not be null",
+        () -> new S3ACachingBlockManager(blockManagerParams, null));
+  }
 
-    // Verify it throws correctly.
-    intercept(
-        NullPointerException.class,
-        () -> new S3ACachingBlockManager(null, reader, blockData, POOL_SIZE,
-            streamStatistics, conf, null));
+  @Test
+  public void testNullBlockData() throws Exception {
+    MockS3ARemoteObject s3File = new MockS3ARemoteObject(FILE_SIZE, false);
+    Configuration conf = new Configuration();
+    try (S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File)) {
+      BlockManagerParameters blockManagerParams =
+          new BlockManagerParameters()
+              .withFuturePool(futurePool)
+              .withBufferPoolSize(POOL_SIZE)
+              .withPrefetchingStatistics(streamStatistics)
+              .withConf(conf);
+
+      intercept(IllegalArgumentException.class, "'blockData' must not be null",
+          () -> new S3ACachingBlockManager(blockManagerParams, reader));
+    }
+  }
 
-    intercept(
-        IllegalArgumentException.class,
-        "'reader' must not be null",
-        () -> new S3ACachingBlockManager(futurePool, null, blockData, POOL_SIZE,
-            streamStatistics, conf, null));
+  @Test
+  public void testNonPositiveBufferPoolSize() throws Exception {
+    MockS3ARemoteObject s3File = new MockS3ARemoteObject(FILE_SIZE, false);
+    Configuration conf = new Configuration();
+    try (S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File)) {
+      BlockManagerParameters blockManagerParams =
+          new BlockManagerParameters()
+              .withFuturePool(futurePool)
+              .withBlockData(blockData)
+              .withBufferPoolSize(0)
+              .withPrefetchingStatistics(streamStatistics)
+              .withConf(conf);
+
+      intercept(IllegalArgumentException.class, "'bufferPoolSize' must be a positive integer",
+          () -> new S3ACachingBlockManager(blockManagerParams, reader));
+
+      BlockManagerParameters blockManagerParamsWithNegativeSize =
+          new BlockManagerParameters()
+              .withFuturePool(futurePool)
+              .withBlockData(blockData)
+              .withBufferPoolSize(-1)
+              .withPrefetchingStatistics(streamStatistics)
+              .withConf(conf);
+
+      intercept(IllegalArgumentException.class, "'bufferPoolSize' must be a positive integer",
+          () -> new S3ACachingBlockManager(blockManagerParamsWithNegativeSize, reader));
+    }
+  }
 
-    intercept(
-        IllegalArgumentException.class,
-        "'blockData' must not be null",
-        () -> new S3ACachingBlockManager(futurePool, reader, null, POOL_SIZE,
-            streamStatistics, conf, null));
+  @Test
+  public void testNullPrefetchingStatistics() throws Exception {
+    MockS3ARemoteObject s3File = new MockS3ARemoteObject(FILE_SIZE, false);
+    Configuration conf = new Configuration();
+    try (S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File)) {
 
-    intercept(
-        IllegalArgumentException.class,
-        "'bufferPoolSize' must be a positive integer",
-        () -> new S3ACachingBlockManager(futurePool, reader, blockData, 0,
-            streamStatistics, conf, null));
+      BlockManagerParameters blockManagerParamsBuilder7 =
+          new BlockManagerParameters()
+              .withFuturePool(futurePool)
+              .withBlockData(blockData)
+              .withBufferPoolSize(POOL_SIZE)
+              .withConf(conf);
 
-    intercept(
-        IllegalArgumentException.class,
-        "'bufferPoolSize' must be a positive integer",
-        () -> new S3ACachingBlockManager(futurePool, reader, blockData, -1,
-            streamStatistics, conf, null));
+      intercept(NullPointerException.class,
+          () -> new S3ACachingBlockManager(blockManagerParamsBuilder7, reader));
+    }
+  }
 
-    intercept(NullPointerException.class,
-        () -> new S3ACachingBlockManager(futurePool, reader, blockData,
-            POOL_SIZE, null, conf, null));
+  @Test
+  public void testArgChecks() throws Exception {
+    MockS3ARemoteObject s3File = new MockS3ARemoteObject(FILE_SIZE, false);
+    S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File);
+
+    Configuration conf = new Configuration();
+    BlockManagerParameters blockManagerParams =
+        new BlockManagerParameters()
+            .withFuturePool(futurePool)
+            .withBlockData(blockData)
+            .withBufferPoolSize(POOL_SIZE)
+            .withPrefetchingStatistics(streamStatistics)
+            .withConf(conf)
+            .withMaxBlocksCount(
+                conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT));
+
+    // Should not throw.
+    S3ACachingBlockManager blockManager =
+        new S3ACachingBlockManager(blockManagerParams, reader);
 
     intercept(
         IllegalArgumentException.class,
@@ -133,17 +213,9 @@ public class TestS3ACachingBlockManager extends AbstractHadoopTestBase {
   private static final class BlockManagerForTesting
       extends S3ACachingBlockManager {
 
-    private static final Configuration CONF =
-        S3ATestUtils.prepareTestConfiguration(new Configuration());
-
-    BlockManagerForTesting(
-        ExecutorServiceFuturePool futurePool,
-        S3ARemoteObjectReader reader,
-        BlockData blockData,
-        int bufferPoolSize,
-        S3AInputStreamStatistics streamStatistics) {
-      super(futurePool, reader, blockData, bufferPoolSize, streamStatistics, CONF,
-          new LocalDirAllocator(HADOOP_TMP_DIR));
+    BlockManagerForTesting(BlockManagerParameters blockManagerParameters,
+        S3ARemoteObjectReader reader) {
+      super(blockManagerParameters, reader);
     }
 
     // If true, forces the next read operation to fail.
@@ -196,9 +268,9 @@ public class TestS3ACachingBlockManager extends AbstractHadoopTestBase {
   private void testGetHelper(boolean forceReadFailure) throws Exception {
     MockS3ARemoteObject s3File = new MockS3ARemoteObject(FILE_SIZE, true);
     S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File);
+    BlockManagerParameters blockManagerParams = getBlockManagerParameters();
     BlockManagerForTesting blockManager =
-        new BlockManagerForTesting(futurePool, reader, blockData, POOL_SIZE,
-            streamStatistics);
+        new BlockManagerForTesting(blockManagerParams, reader);
 
     for (int b = 0; b < blockData.getNumBlocks(); b++) {
       // We simulate caching failure for all even numbered blocks.
@@ -244,9 +316,9 @@ public class TestS3ACachingBlockManager extends AbstractHadoopTestBase {
       throws IOException, InterruptedException {
     MockS3ARemoteObject s3File = new MockS3ARemoteObject(FILE_SIZE, false);
     S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File);
+    BlockManagerParameters blockManagerParams = getBlockManagerParameters();
     BlockManagerForTesting blockManager =
-        new BlockManagerForTesting(futurePool, reader, blockData, POOL_SIZE,
-            streamStatistics);
+        new BlockManagerForTesting(blockManagerParams, reader);
     assertInitialState(blockManager);
 
     int expectedNumErrors = 0;
@@ -272,6 +344,18 @@ public class TestS3ACachingBlockManager extends AbstractHadoopTestBase {
     assertEquals(expectedNumSuccesses, blockManager.numCached());
   }
 
+  private BlockManagerParameters getBlockManagerParameters() {
+    return new BlockManagerParameters()
+        .withFuturePool(futurePool)
+        .withBlockData(blockData)
+        .withBufferPoolSize(POOL_SIZE)
+        .withPrefetchingStatistics(streamStatistics)
+        .withLocalDirAllocator(new LocalDirAllocator(HADOOP_TMP_DIR))
+        .withConf(CONF)
+        .withMaxBlocksCount(
+            CONF.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT));
+  }
+
   // @Ignore
   @Test
   public void testCachingOfPrefetched()
@@ -279,10 +363,19 @@ public class TestS3ACachingBlockManager extends AbstractHadoopTestBase {
     MockS3ARemoteObject s3File = new MockS3ARemoteObject(FILE_SIZE, false);
     S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File);
     Configuration conf = new Configuration();
+    BlockManagerParameters blockManagerParamsBuilder =
+        new BlockManagerParameters()
+            .withFuturePool(futurePool)
+            .withBlockData(blockData)
+            .withBufferPoolSize(POOL_SIZE)
+            .withPrefetchingStatistics(streamStatistics)
+            .withLocalDirAllocator(
+                new LocalDirAllocator(conf.get(BUFFER_DIR) != null ? BUFFER_DIR : HADOOP_TMP_DIR))
+            .withConf(conf)
+            .withMaxBlocksCount(
+                conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT));
     S3ACachingBlockManager blockManager =
-        new S3ACachingBlockManager(futurePool, reader, blockData, POOL_SIZE,
-            streamStatistics, conf, new LocalDirAllocator(
-            conf.get(BUFFER_DIR) != null ? BUFFER_DIR : HADOOP_TMP_DIR));
+        new S3ACachingBlockManager(blockManagerParamsBuilder, reader);
     assertInitialState(blockManager);
 
     for (int b = 0; b < blockData.getNumBlocks(); b++) {
@@ -316,9 +409,9 @@ public class TestS3ACachingBlockManager extends AbstractHadoopTestBase {
       throws IOException, InterruptedException {
     MockS3ARemoteObject s3File = new MockS3ARemoteObject(FILE_SIZE, false);
     S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File);
+    BlockManagerParameters blockManagerParams = getBlockManagerParameters();
     BlockManagerForTesting blockManager =
-        new BlockManagerForTesting(futurePool, reader, blockData, POOL_SIZE,
-            streamStatistics);
+        new BlockManagerForTesting(blockManagerParams, reader);
     assertInitialState(blockManager);
 
     int expectedNumErrors = 0;