HADOOP-18399. S3A Prefetch - SingleFilePerBlockCache to use LocalDirAllocator (#5054)


Contributed by Viraj Jasani
Viraj Jasani 2 years ago
parent
commit
eb326403ac

+ 7 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BlockCache.java

@@ -23,6 +23,9 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+
 /**
  * Provides functionality necessary for caching blocks of data read from FileSystem.
  */
@@ -64,7 +67,10 @@ public interface BlockCache extends Closeable {
    *
    * @param blockNumber the id of the given block.
    * @param buffer contents of the given block to be added to this cache.
+   * @param conf the configuration.
+   * @param localDirAllocator the local dir allocator instance.
    * @throws IOException if there is an error writing the given block.
    */
-  void put(int blockNumber, ByteBuffer buffer) throws IOException;
+  void put(int blockNumber, ByteBuffer buffer, Configuration conf,
+      LocalDirAllocator localDirAllocator) throws IOException;
 }
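
For context, a minimal sketch of the new put() contract in use. It assumes a SingleFilePerBlockCache instance named cache plus a block number and buffer already produced by the prefetcher; the hadoop.tmp.dir allocator key mirrors the TestBlockCache change below.

    // Callers now supply the configuration and a LocalDirAllocator so the
    // cache file is created under a configured local directory instead of
    // the JVM default temp dir.
    Configuration conf = new Configuration();
    LocalDirAllocator allocator = new LocalDirAllocator("hadoop.tmp.dir");
    ByteBuffer buffer = ByteBuffer.allocate(blockSize);
    // ... fill buffer with the block's contents; its limit must be positive ...
    cache.put(blockNumber, buffer, conf, allocator);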

+ 16 - 4
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java

@@ -33,6 +33,8 @@ import java.util.function.Supplier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.statistics.DurationTracker;
 
 import static java.util.Objects.requireNonNull;
@@ -95,6 +97,10 @@ public abstract class CachingBlockManager extends BlockManager {
 
   private final PrefetchingStatistics prefetchingStatistics;
 
+  private final Configuration conf;
+
+  private final LocalDirAllocator localDirAllocator;
+
   /**
    * Constructs an instance of a {@code CachingBlockManager}.
    *
@@ -102,14 +108,17 @@ public abstract class CachingBlockManager extends BlockManager {
    * @param blockData information about each block of the underlying file.
    * @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
    * @param prefetchingStatistics statistics for this stream.
-   *
+   * @param conf the configuration.
+   * @param localDirAllocator the local dir allocator instance.
    * @throws IllegalArgumentException if bufferPoolSize is zero or negative.
    */
   public CachingBlockManager(
       ExecutorServiceFuturePool futurePool,
       BlockData blockData,
       int bufferPoolSize,
-      PrefetchingStatistics prefetchingStatistics) {
+      PrefetchingStatistics prefetchingStatistics,
+      Configuration conf,
+      LocalDirAllocator localDirAllocator) {
     super(blockData);
 
     Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize");
@@ -129,6 +138,8 @@ public abstract class CachingBlockManager extends BlockManager {
 
     this.ops = new BlockOperations();
     this.ops.setDebug(false);
+    this.conf = requireNonNull(conf);
+    this.localDirAllocator = localDirAllocator;
   }
 
   /**
@@ -468,7 +479,8 @@ public abstract class CachingBlockManager extends BlockManager {
         blockFuture = cf;
       }
 
-      CachePutTask task = new CachePutTask(data, blockFuture, this, Instant.now());
+      CachePutTask task =
+          new CachePutTask(data, blockFuture, this, Instant.now());
       Future<Void> actionFuture = futurePool.executeFunction(task);
       data.setCaching(actionFuture);
       ops.end(op);
@@ -554,7 +566,7 @@ public abstract class CachingBlockManager extends BlockManager {
       return;
     }
 
-    cache.put(blockNumber, buffer);
+    cache.put(blockNumber, buffer, conf, localDirAllocator);
   }
 
   private static class CachePutTask implements Supplier<Void> {
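
Concrete managers pass the two new arguments straight through to super(); a minimal construction sketch using the S3A subclass changed later in this commit (futurePool, reader, blockData, bufferPoolSize and streamStatistics are assumed to be set up elsewhere):

    // The allocator key falls back to hadoop.tmp.dir when fs.s3a.buffer.dir
    // is unset, matching the S3AFileSystem and S3APrefetchFakes wiring in
    // this commit.
    Configuration conf = new Configuration();
    String dirKey = conf.get("fs.s3a.buffer.dir") != null
        ? "fs.s3a.buffer.dir" : "hadoop.tmp.dir";
    LocalDirAllocator allocator = new LocalDirAllocator(dirKey);
    S3ACachingBlockManager blockManager = new S3ACachingBlockManager(
        futurePool, reader, blockData, bufferPoolSize,
        streamStatistics, conf, allocator);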

+ 66 - 21
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java

@@ -27,10 +27,9 @@ import java.nio.channels.WritableByteChannel;
 import java.nio.file.Files;
 import java.nio.file.OpenOption;
 import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.nio.file.StandardOpenOption;
-import java.nio.file.attribute.FileAttribute;
 import java.nio.file.attribute.PosixFilePermission;
-import java.nio.file.attribute.PosixFilePermissions;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -39,9 +38,13 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+
 import static java.util.Objects.requireNonNull;
 import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNull;
 
@@ -67,6 +70,12 @@ public class SingleFilePerBlockCache implements BlockCache {
 
   private final PrefetchingStatistics prefetchingStatistics;
 
+  /**
+   * File attributes attached to any intermediate temporary file created during index creation.
+   */
+  private static final Set<PosixFilePermission> TEMP_FILE_ATTRS =
+      ImmutableSet.of(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE);
+
   /**
    * Cache entry.
    * Each block is stored as a separate file.
@@ -172,11 +181,17 @@ public class SingleFilePerBlockCache implements BlockCache {
   /**
    * Puts the given block in this cache.
    *
-   * @throws IllegalArgumentException if buffer is null.
-   * @throws IllegalArgumentException if buffer.limit() is zero or negative.
+   * @param blockNumber the block number, used as a key for blocks map.
+   * @param buffer buffer contents of the given block to be added to this cache.
+   * @param conf the configuration.
+   * @param localDirAllocator the local dir allocator instance.
+   * @throws IOException if either local dir allocator fails to allocate file or if IO error
+   * occurs while writing the buffer content to the file.
+   * @throws IllegalArgumentException if buffer is null, or if buffer.limit() is zero or negative.
    */
   @Override
-  public void put(int blockNumber, ByteBuffer buffer) throws IOException {
+  public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
+      LocalDirAllocator localDirAllocator) throws IOException {
     if (closed) {
       return;
     }
@@ -191,7 +206,7 @@ public class SingleFilePerBlockCache implements BlockCache {
 
     Validate.checkPositiveInteger(buffer.limit(), "buffer.limit()");
 
-    Path blockFilePath = getCacheFilePath();
+    Path blockFilePath = getCacheFilePath(conf, localDirAllocator);
     long size = Files.size(blockFilePath);
     if (size != 0) {
       String message =
@@ -221,8 +236,19 @@ public class SingleFilePerBlockCache implements BlockCache {
     writeChannel.close();
   }
 
-  protected Path getCacheFilePath() throws IOException {
-    return getTempFilePath();
+  /**
+   * Return temporary file created based on the file path retrieved from local dir allocator.
+   *
+   * @param conf The configuration object.
+   * @param localDirAllocator Local dir allocator instance.
+   * @return Path of the temporary file created.
+   * @throws IOException if IO error occurs while local dir allocator tries to retrieve path
+   * from local FS or file creation fails or permission set fails.
+   */
+  protected Path getCacheFilePath(final Configuration conf,
+      final LocalDirAllocator localDirAllocator)
+      throws IOException {
+    return getTempFilePath(conf, localDirAllocator);
   }
 
   @Override
@@ -323,9 +349,19 @@ public class SingleFilePerBlockCache implements BlockCache {
 
   private static final String CACHE_FILE_PREFIX = "fs-cache-";
 
-  public static boolean isCacheSpaceAvailable(long fileSize) {
+  /**
+   * Determine if the cache space is available on the local FS.
+   *
+   * @param fileSize The size of the file.
+   * @param conf The configuration.
+   * @param localDirAllocator Local dir allocator instance.
+   * @return True if the given file size is less than the available free space on local FS,
+   * False otherwise.
+   */
+  public static boolean isCacheSpaceAvailable(long fileSize, Configuration conf,
+      LocalDirAllocator localDirAllocator) {
     try {
-      Path cacheFilePath = getTempFilePath();
+      Path cacheFilePath = getTempFilePath(conf, localDirAllocator);
       long freeSpace = new File(cacheFilePath.toString()).getUsableSpace();
       LOG.info("fileSize = {}, freeSpace = {}", fileSize, freeSpace);
       Files.deleteIfExists(cacheFilePath);
@@ -339,16 +375,25 @@ public class SingleFilePerBlockCache implements BlockCache {
   // The suffix (file extension) of each serialized index file.
   private static final String BINARY_FILE_SUFFIX = ".bin";
 
-  // File attributes attached to any intermediate temporary file created during index creation.
-  private static final FileAttribute<Set<PosixFilePermission>> TEMP_FILE_ATTRS =
-      PosixFilePermissions.asFileAttribute(EnumSet.of(PosixFilePermission.OWNER_READ,
-          PosixFilePermission.OWNER_WRITE));
-
-  private static Path getTempFilePath() throws IOException {
-    return Files.createTempFile(
-        CACHE_FILE_PREFIX,
-        BINARY_FILE_SUFFIX,
-        TEMP_FILE_ATTRS
-    );
+  /**
+   * Create temporary file based on the file path retrieved from local dir allocator
+   * instance. The file is created with .bin suffix. The created file has been granted
+   * posix file permissions available in TEMP_FILE_ATTRS.
+   *
+   * @param conf the configuration.
+   * @param localDirAllocator the local dir allocator instance.
+   * @return path of the file created.
+   * @throws IOException if IO error occurs while local dir allocator tries to retrieve path
+   * from local FS or file creation fails or permission set fails.
+   */
+  private static Path getTempFilePath(final Configuration conf,
+      final LocalDirAllocator localDirAllocator) throws IOException {
+    org.apache.hadoop.fs.Path path =
+        localDirAllocator.getLocalPathForWrite(CACHE_FILE_PREFIX, conf);
+    File dir = new File(path.getParent().toUri().getPath());
+    String prefix = path.getName();
+    File tmpFile = File.createTempFile(prefix, BINARY_FILE_SUFFIX, dir);
+    Path tmpFilePath = Paths.get(tmpFile.toURI());
+    return Files.setPosixFilePermissions(tmpFilePath, TEMP_FILE_ATTRS);
   }
 }
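
The allocator-backed temp file logic above can be exercised on its own; a minimal sketch mirroring getTempFilePath(), assuming the java.nio.file and Hadoop Configuration/LocalDirAllocator imports and at least one writable directory under the configured key:

    // Ask the allocator for a write path, create the real temp file beside
    // it, then restrict it to owner read/write as TEMP_FILE_ATTRS does.
    Configuration conf = new Configuration();
    LocalDirAllocator allocator = new LocalDirAllocator("hadoop.tmp.dir");
    org.apache.hadoop.fs.Path path =
        allocator.getLocalPathForWrite("fs-cache-", conf);
    File dir = new File(path.getParent().toUri().getPath());
    File tmpFile = File.createTempFile(path.getName(), ".bin", dir);
    Files.setPosixFilePermissions(tmpFile.toPath(),
        EnumSet.of(PosixFilePermission.OWNER_READ,
            PosixFilePermission.OWNER_WRITE));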

+ 8 - 3
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBlockCache.java

@@ -23,8 +23,11 @@ import java.nio.ByteBuffer;
 
 import org.junit.Test;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.test.AbstractHadoopTestBase;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_TMP_DIR;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -36,6 +39,8 @@ public class TestBlockCache extends AbstractHadoopTestBase {
 
   private static final int BUFFER_SIZE = 16;
 
+  private static final Configuration CONF = new Configuration();
+
   @Test
   public void testArgChecks() throws Exception {
     // Should not throw.
@@ -46,7 +51,7 @@ public class TestBlockCache extends AbstractHadoopTestBase {
 
     // Verify it throws correctly.
     intercept(IllegalArgumentException.class, "'buffer' must not be null",
-        () -> cache.put(42, null));
+        () -> cache.put(42, null, null, null));
 
 
     intercept(NullPointerException.class, null,
@@ -67,7 +72,7 @@ public class TestBlockCache extends AbstractHadoopTestBase {
 
     assertEquals(0, cache.size());
     assertFalse(cache.containsBlock(0));
-    cache.put(0, buffer1);
+    cache.put(0, buffer1, CONF, new LocalDirAllocator(HADOOP_TMP_DIR));
     assertEquals(1, cache.size());
     assertTrue(cache.containsBlock(0));
     ByteBuffer buffer2 = ByteBuffer.allocate(BUFFER_SIZE);
@@ -77,7 +82,7 @@ public class TestBlockCache extends AbstractHadoopTestBase {
 
     assertEquals(1, cache.size());
     assertFalse(cache.containsBlock(1));
-    cache.put(1, buffer1);
+    cache.put(1, buffer1, CONF, new LocalDirAllocator(HADOOP_TMP_DIR));
     assertEquals(2, cache.size());
     assertTrue(cache.containsBlock(1));
     ByteBuffer buffer3 = ByteBuffer.allocate(BUFFER_SIZE);

+ 5 - 0
hadoop-tools/hadoop-aws/pom.xml

@@ -198,6 +198,9 @@
                     <exclude>**/ITestMarkerToolRootOperations.java</exclude>
                     <!-- leave this until the end for better statistics -->
                     <exclude>**/ITestAggregateIOStatistics.java</exclude>
+                    <!-- cache file based assertions cannot be properly achieved with parallel
+                         execution, let this be sequential -->
+                    <exclude>**/ITestS3APrefetchingCacheFiles.java</exclude>
                   </excludes>
                 </configuration>
               </execution>
@@ -244,6 +247,8 @@
                     <include>**/ITestS3AContractRootDir.java</include>
                     <!-- leave this until the end for better statistics -->
                     <include>**/ITestAggregateIOStatistics.java</include>
+                    <!-- sequential execution for the better cleanup -->
+                    <include>**/ITestS3APrefetchingCacheFiles.java</include>
                   </includes>
                 </configuration>
               </execution>

+ 20 - 7
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -1363,6 +1363,21 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
    */
   File createTmpFileForWrite(String pathStr, long size,
       Configuration conf) throws IOException {
+    initLocalDirAllocatorIfNotInitialized(conf);
+    Path path = directoryAllocator.getLocalPathForWrite(pathStr,
+        size, conf);
+    File dir = new File(path.getParent().toUri().getPath());
+    String prefix = path.getName();
+    // create a temp file on this directory
+    return File.createTempFile(prefix, null, dir);
+  }
+
+  /**
+   * Initialize dir allocator if not already initialized.
+   *
+   * @param conf The Configuration object.
+   */
+  private void initLocalDirAllocatorIfNotInitialized(Configuration conf) {
     if (directoryAllocator == null) {
       synchronized (this) {
         String bufferDir = conf.get(BUFFER_DIR) != null
@@ -1370,12 +1385,6 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         directoryAllocator = new LocalDirAllocator(bufferDir);
       }
     }
-    Path path = directoryAllocator.getLocalPathForWrite(pathStr,
-        size, conf);
-    File dir = new File(path.getParent().toUri().getPath());
-    String prefix = path.getName();
-    // create a temp file on this directory
-    return File.createTempFile(prefix, null, dir);
   }
 
   /**
@@ -1568,12 +1577,16 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     LOG.debug("Opening '{}'", readContext);
 
     if (this.prefetchEnabled) {
+      Configuration configuration = getConf();
+      initLocalDirAllocatorIfNotInitialized(configuration);
       return new FSDataInputStream(
           new S3APrefetchingInputStream(
               readContext.build(),
               createObjectAttributes(path, fileStatus),
               createInputStreamCallbacks(auditSpan),
-              inputStreamStats));
+              inputStreamStats,
+              configuration,
+              directoryAllocator));
     } else {
       return new FSDataInputStream(
           new S3AInputStream(
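
With this change the prefetch block cache honours the same local directory setting as the rest of S3A; a minimal usage sketch, to run inside a method declared to throw IOException (the bucket, object and directory names are illustrative):

    // fs.s3a.buffer.dir now also controls where prefetched block cache
    // files land; when unset, S3A falls back to hadoop.tmp.dir.
    Configuration conf = new Configuration();
    conf.setBoolean("fs.s3a.prefetch.enabled", true);
    conf.set("fs.s3a.buffer.dir", "/mnt/fast-disk/s3a-buffer");
    FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf);
    try (FSDataInputStream in = fs.open(new Path("/data/input.csv"))) {
      in.read(new byte[4096]);  // triggers prefetch and local caching
    }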

+ 9 - 3
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java

@@ -25,6 +25,8 @@ import java.nio.ByteBuffer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.impl.prefetch.BlockData;
 import org.apache.hadoop.fs.impl.prefetch.CachingBlockManager;
 import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
@@ -52,7 +54,8 @@ public class S3ACachingBlockManager extends CachingBlockManager {
    * @param blockData information about each block of the S3 file.
    * @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
    * @param streamStatistics statistics for this stream.
-   *
+   * @param conf the configuration.
+   * @param localDirAllocator the local dir allocator instance.
    * @throws IllegalArgumentException if reader is null.
    */
   public S3ACachingBlockManager(
@@ -60,8 +63,11 @@ public class S3ACachingBlockManager extends CachingBlockManager {
       S3ARemoteObjectReader reader,
       BlockData blockData,
       int bufferPoolSize,
-      S3AInputStreamStatistics streamStatistics) {
-    super(futurePool, blockData, bufferPoolSize, streamStatistics);
+      S3AInputStreamStatistics streamStatistics,
+      Configuration conf,
+      LocalDirAllocator localDirAllocator) {
+
+    super(futurePool, blockData, bufferPoolSize, streamStatistics, conf, localDirAllocator);
 
     Validate.checkNotNull(reader, "reader");
 

+ 19 - 6
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingInputStream.java

@@ -24,6 +24,8 @@ import java.io.IOException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.impl.prefetch.BlockData;
 import org.apache.hadoop.fs.impl.prefetch.BlockManager;
 import org.apache.hadoop.fs.impl.prefetch.BufferData;
@@ -61,7 +63,8 @@ public class S3ACachingInputStream extends S3ARemoteInputStream {
    * @param s3Attributes attributes of the S3 object being read.
    * @param client callbacks used for interacting with the underlying S3 client.
    * @param streamStatistics statistics for this stream.
-   *
+   * @param conf the configuration.
+   * @param localDirAllocator the local dir allocator instance.
    * @throws IllegalArgumentException if context is null.
    * @throws IllegalArgumentException if s3Attributes is null.
    * @throws IllegalArgumentException if client is null.
@@ -70,7 +73,9 @@ public class S3ACachingInputStream extends S3ARemoteInputStream {
       S3AReadOpContext context,
       S3ObjectAttributes s3Attributes,
       S3AInputStream.InputStreamCallbacks client,
-      S3AInputStreamStatistics streamStatistics) {
+      S3AInputStreamStatistics streamStatistics,
+      Configuration conf,
+      LocalDirAllocator localDirAllocator) {
     super(context, s3Attributes, client, streamStatistics);
 
     this.numBlocksToPrefetch = this.getContext().getPrefetchBlockCount();
@@ -79,7 +84,9 @@ public class S3ACachingInputStream extends S3ARemoteInputStream {
         this.getContext().getFuturePool(),
         this.getReader(),
         this.getBlockData(),
-        bufferPoolSize);
+        bufferPoolSize,
+        conf,
+        localDirAllocator);
     int fileSize = (int) s3Attributes.getLen();
     LOG.debug("Created caching input stream for {} (size = {})", this.getName(),
         fileSize);
@@ -176,9 +183,15 @@ public class S3ACachingInputStream extends S3ARemoteInputStream {
       ExecutorServiceFuturePool futurePool,
       S3ARemoteObjectReader reader,
       BlockData blockData,
-      int bufferPoolSize) {
-    return new S3ACachingBlockManager(futurePool, reader, blockData,
+      int bufferPoolSize,
+      Configuration conf,
+      LocalDirAllocator localDirAllocator) {
+    return new S3ACachingBlockManager(futurePool,
+        reader,
+        blockData,
         bufferPoolSize,
-        getS3AStreamStatistics());
+        getS3AStreamStatistics(),
+        conf,
+        localDirAllocator);
   }
 }

+ 10 - 3
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java

@@ -27,9 +27,11 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CanSetReadahead;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.impl.prefetch.Validate;
 import org.apache.hadoop.fs.s3a.S3AInputStream;
@@ -79,7 +81,8 @@ public class S3APrefetchingInputStream
    * @param s3Attributes attributes of the S3 object being read.
    * @param client callbacks used for interacting with the underlying S3 client.
    * @param streamStatistics statistics for this stream.
-   *
+   * @param conf the configuration.
+   * @param localDirAllocator the local dir allocator instance retrieved from S3A FS.
    * @throws IllegalArgumentException if context is null.
    * @throws IllegalArgumentException if s3Attributes is null.
    * @throws IllegalArgumentException if client is null.
@@ -88,7 +91,9 @@ public class S3APrefetchingInputStream
       S3AReadOpContext context,
       S3ObjectAttributes s3Attributes,
       S3AInputStream.InputStreamCallbacks client,
-      S3AInputStreamStatistics streamStatistics) {
+      S3AInputStreamStatistics streamStatistics,
+      Configuration conf,
+      LocalDirAllocator localDirAllocator) {
 
     Validate.checkNotNull(context, "context");
     Validate.checkNotNull(s3Attributes, "s3Attributes");
@@ -114,7 +119,9 @@ public class S3APrefetchingInputStream
           context,
           s3Attributes,
           client,
-          streamStatistics);
+          streamStatistics,
+          conf,
+          localDirAllocator);
     }
   }
 

+ 144 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingCacheFiles.java

@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.File;
+import java.net.URI;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
+
+import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+
+/**
+ * Test the cache file behaviour with prefetching input stream.
+ */
+public class ITestS3APrefetchingCacheFiles extends AbstractS3ACostTest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestS3APrefetchingCacheFiles.class);
+
+  private Path testFile;
+  private FileSystem fs;
+  private int prefetchBlockSize;
+  private Configuration conf;
+
+  public ITestS3APrefetchingCacheFiles() {
+    super(true);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    super.setup();
+    // Sets BUFFER_DIR by calling S3ATestUtils#prepareTestConfiguration
+    conf = createConfiguration();
+    String testFileUri = S3ATestUtils.getCSVTestFile(conf);
+
+    testFile = new Path(testFileUri);
+    prefetchBlockSize = conf.getInt(PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE);
+    fs = getFileSystem();
+    fs.initialize(new URI(testFileUri), conf);
+  }
+
+  @Override
+  public Configuration createConfiguration() {
+    Configuration configuration = super.createConfiguration();
+    S3ATestUtils.removeBaseAndBucketOverrides(configuration, PREFETCH_ENABLED_KEY);
+    configuration.setBoolean(PREFETCH_ENABLED_KEY, true);
+    return configuration;
+  }
+
+  @Override
+  public synchronized void teardown() throws Exception {
+    super.teardown();
+    File tmpFileDir = new File(conf.get(BUFFER_DIR));
+    File[] tmpFiles = tmpFileDir.listFiles();
+    if (tmpFiles != null) {
+      for (File filePath : tmpFiles) {
+        String path = filePath.getPath();
+        if (path.endsWith(".bin") && path.contains("fs-cache-")) {
+          filePath.delete();
+        }
+      }
+    }
+    cleanupWithLogger(LOG, fs);
+    fs = null;
+    testFile = null;
+  }
+
+  /**
+   * Test to verify the existence of the cache file.
+   * Tries to perform inputStream read and seek ops to make the prefetching take place and
+   * asserts whether file with .bin suffix is present. It also verifies certain file stats.
+   */
+  @Test
+  public void testCacheFileExistence() throws Throwable {
+    describe("Verify that FS cache files exist on local FS");
+
+    try (FSDataInputStream in = fs.open(testFile)) {
+      byte[] buffer = new byte[prefetchBlockSize];
+
+      in.read(buffer, 0, prefetchBlockSize - 10240);
+      in.seek(prefetchBlockSize * 2);
+      in.read(buffer, 0, prefetchBlockSize);
+
+      File tmpFileDir = new File(conf.get(BUFFER_DIR));
+      assertTrue("The dir to keep cache files must exist", tmpFileDir.exists());
+      File[] tmpFiles = tmpFileDir
+          .listFiles((dir, name) -> name.endsWith(".bin") && name.contains("fs-cache-"));
+      boolean isCacheFileForBlockFound = tmpFiles != null && tmpFiles.length > 0;
+      if (!isCacheFileForBlockFound) {
+        LOG.warn("No cache files found under " + tmpFileDir);
+      }
+      assertTrue("File to cache block data must exist", isCacheFileForBlockFound);
+
+      for (File tmpFile : tmpFiles) {
+        Path path = new Path(tmpFile.getAbsolutePath());
+        try (FileSystem localFs = FileSystem.getLocal(conf)) {
+          FileStatus stat = localFs.getFileStatus(path);
+          ContractTestUtils.assertIsFile(path, stat);
+          assertEquals("File length not matching with prefetchBlockSize", prefetchBlockSize,
+              stat.getLen());
+          assertEquals("User permissions should be RW", FsAction.READ_WRITE,
+              stat.getPermission().getUserAction());
+          assertEquals("Group permissions should be NONE", FsAction.NONE,
+              stat.getPermission().getGroupAction());
+          assertEquals("Other permissions should be NONE", FsAction.NONE,
+              stat.getPermission().getOtherAction());
+        }
+      }
+    }
+  }
+
+}

+ 21 - 6
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java

@@ -36,7 +36,9 @@ import com.amazonaws.services.s3.model.ObjectMetadata;
 import com.amazonaws.services.s3.model.S3Object;
 import com.amazonaws.services.s3.model.S3ObjectInputStream;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.impl.prefetch.BlockCache;
 import org.apache.hadoop.fs.impl.prefetch.BlockData;
 import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
@@ -60,6 +62,8 @@ import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.util.functional.CallableRaisingIOE;
 
+import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
+import static org.apache.hadoop.fs.s3a.Constants.HADOOP_TMP_DIR;
 import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.emptyStatisticsStore;
 
 /**
@@ -86,6 +90,8 @@ public final class S3APrefetchFakes {
 
   public static final long MODIFICATION_TIME = 0L;
 
+  private static final Configuration CONF = new Configuration();
+
   public static final ChangeDetectionPolicy CHANGE_POLICY =
       ChangeDetectionPolicy.createPolicy(
           ChangeDetectionPolicy.Mode.None,
@@ -335,7 +341,9 @@ public final class S3APrefetchFakes {
     private long fileCount = 0;
 
     @Override
-    protected Path getCacheFilePath() throws IOException {
+    protected Path getCacheFilePath(final Configuration conf,
+        final LocalDirAllocator localDirAllocator)
+        throws IOException {
       fileCount++;
       return Paths.get(Long.toString(fileCount));
     }
@@ -363,9 +371,12 @@ public final class S3APrefetchFakes {
         ExecutorServiceFuturePool futurePool,
         S3ARemoteObjectReader reader,
         BlockData blockData,
-        int bufferPoolSize) {
+        int bufferPoolSize,
+        Configuration conf,
+        LocalDirAllocator localDirAllocator) {
       super(futurePool, reader, blockData, bufferPoolSize,
-          new EmptyS3AStatisticsContext().newInputStreamStatistics());
+          new EmptyS3AStatisticsContext().newInputStreamStatistics(),
+          conf, localDirAllocator);
     }
 
     @Override
@@ -390,7 +401,9 @@ public final class S3APrefetchFakes {
         S3ObjectAttributes s3Attributes,
         S3AInputStream.InputStreamCallbacks client,
         S3AInputStreamStatistics streamStatistics) {
-      super(context, s3Attributes, client, streamStatistics);
+      super(context, s3Attributes, client, streamStatistics, CONF,
+          new LocalDirAllocator(
+              CONF.get(BUFFER_DIR) != null ? BUFFER_DIR : HADOOP_TMP_DIR));
     }
 
     @Override
@@ -405,9 +418,11 @@ public final class S3APrefetchFakes {
         ExecutorServiceFuturePool futurePool,
         S3ARemoteObjectReader reader,
         BlockData blockData,
-        int bufferPoolSize) {
+        int bufferPoolSize,
+        Configuration conf,
+        LocalDirAllocator localDirAllocator) {
       return new FakeS3ACachingBlockManager(futurePool, reader, blockData,
-          bufferPoolSize);
+          bufferPoolSize, conf, localDirAllocator);
     }
   }
 }

+ 23 - 11
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ACachingBlockManager.java

@@ -26,13 +26,18 @@ import java.util.concurrent.Executors;
 
 import org.junit.Test;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.impl.prefetch.BlockData;
 import org.apache.hadoop.fs.impl.prefetch.BufferData;
 import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
 import org.apache.hadoop.test.AbstractHadoopTestBase;
 
+import static org.apache.hadoop.fs.s3a.Constants.BUFFER_DIR;
+import static org.apache.hadoop.fs.s3a.Constants.HADOOP_TMP_DIR;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 import static org.junit.Assert.assertEquals;
 
@@ -59,44 +64,45 @@ public class TestS3ACachingBlockManager extends AbstractHadoopTestBase {
     MockS3ARemoteObject s3File = new MockS3ARemoteObject(FILE_SIZE, false);
     S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File);
 
+    Configuration conf = new Configuration();
     // Should not throw.
     S3ACachingBlockManager blockManager =
         new S3ACachingBlockManager(futurePool, reader, blockData, POOL_SIZE,
-            streamStatistics);
+            streamStatistics, conf, null);
 
     // Verify it throws correctly.
     intercept(
         NullPointerException.class,
         () -> new S3ACachingBlockManager(null, reader, blockData, POOL_SIZE,
-            streamStatistics));
+            streamStatistics, conf, null));
 
     intercept(
         IllegalArgumentException.class,
         "'reader' must not be null",
         () -> new S3ACachingBlockManager(futurePool, null, blockData, POOL_SIZE,
-            streamStatistics));
+            streamStatistics, conf, null));
 
     intercept(
         IllegalArgumentException.class,
         "'blockData' must not be null",
         () -> new S3ACachingBlockManager(futurePool, reader, null, POOL_SIZE,
-            streamStatistics));
+            streamStatistics, conf, null));
 
     intercept(
         IllegalArgumentException.class,
         "'bufferPoolSize' must be a positive integer",
         () -> new S3ACachingBlockManager(futurePool, reader, blockData, 0,
-            streamStatistics));
+            streamStatistics, conf, null));
 
     intercept(
         IllegalArgumentException.class,
         "'bufferPoolSize' must be a positive integer",
         () -> new S3ACachingBlockManager(futurePool, reader, blockData, -1,
-            streamStatistics));
+            streamStatistics, conf, null));
 
     intercept(NullPointerException.class,
         () -> new S3ACachingBlockManager(futurePool, reader, blockData,
-            POOL_SIZE, null));
+            POOL_SIZE, null, conf, null));
 
     intercept(
         IllegalArgumentException.class,
@@ -125,13 +131,17 @@ public class TestS3ACachingBlockManager extends AbstractHadoopTestBase {
   private static final class BlockManagerForTesting
       extends S3ACachingBlockManager {
 
+    private static final Configuration CONF =
+        S3ATestUtils.prepareTestConfiguration(new Configuration());
+
     BlockManagerForTesting(
         ExecutorServiceFuturePool futurePool,
         S3ARemoteObjectReader reader,
         BlockData blockData,
         int bufferPoolSize,
         S3AInputStreamStatistics streamStatistics) {
-      super(futurePool, reader, blockData, bufferPoolSize, streamStatistics);
+      super(futurePool, reader, blockData, bufferPoolSize, streamStatistics, CONF,
+          new LocalDirAllocator(HADOOP_TMP_DIR));
     }
 
     // If true, forces the next read operation to fail.
@@ -154,8 +164,8 @@ public class TestS3ACachingBlockManager extends AbstractHadoopTestBase {
     private boolean forceNextCachePutToFail;
 
     @Override
-    protected void cachePut(int blockNumber, ByteBuffer buffer)
-        throws IOException {
+    protected void cachePut(int blockNumber,
+        ByteBuffer buffer) throws IOException {
       if (forceNextCachePutToFail) {
         forceNextCachePutToFail = false;
         throw new RuntimeException("bar");
@@ -262,9 +272,11 @@ public class TestS3ACachingBlockManager extends AbstractHadoopTestBase {
       throws IOException, InterruptedException {
     MockS3ARemoteObject s3File = new MockS3ARemoteObject(FILE_SIZE, false);
     S3ARemoteObjectReader reader = new S3ARemoteObjectReader(s3File);
+    Configuration conf = new Configuration();
     S3ACachingBlockManager blockManager =
         new S3ACachingBlockManager(futurePool, reader, blockData, POOL_SIZE,
-            streamStatistics);
+            streamStatistics, conf, new LocalDirAllocator(
+            conf.get(BUFFER_DIR) != null ? BUFFER_DIR : HADOOP_TMP_DIR));
     assertInitialState(blockManager);
 
     for (int b = 0; b < blockData.getNumBlocks(); b++) {

+ 8 - 5
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/TestS3ARemoteInputStream.java

@@ -27,11 +27,13 @@ import java.util.concurrent.Executors;
 
 import org.junit.Test;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.impl.prefetch.ExceptionAsserts;
 import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
 import org.apache.hadoop.fs.s3a.S3AInputStream;
 import org.apache.hadoop.fs.s3a.S3AReadOpContext;
+import org.apache.hadoop.fs.s3a.S3ATestUtils;
 import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 import org.apache.hadoop.test.AbstractHadoopTestBase;
@@ -63,24 +65,25 @@ public class TestS3ARemoteInputStream extends AbstractHadoopTestBase {
     S3AInputStreamStatistics stats =
         readContext.getS3AStatisticsContext().newInputStreamStatistics();
 
+    Configuration conf = S3ATestUtils.prepareTestConfiguration(new Configuration());
     // Should not throw.
-    new S3ACachingInputStream(readContext, attrs, client, stats);
+    new S3ACachingInputStream(readContext, attrs, client, stats, conf, null);
 
     ExceptionAsserts.assertThrows(
         NullPointerException.class,
-        () -> new S3ACachingInputStream(null, attrs, client, stats));
+        () -> new S3ACachingInputStream(null, attrs, client, stats, conf, null));
 
     ExceptionAsserts.assertThrows(
         NullPointerException.class,
-        () -> new S3ACachingInputStream(readContext, null, client, stats));
+        () -> new S3ACachingInputStream(readContext, null, client, stats, conf, null));
 
     ExceptionAsserts.assertThrows(
         NullPointerException.class,
-        () -> new S3ACachingInputStream(readContext, attrs, null, stats));
+        () -> new S3ACachingInputStream(readContext, attrs, null, stats, conf, null));
 
     ExceptionAsserts.assertThrows(
         NullPointerException.class,
-        () -> new S3ACachingInputStream(readContext, attrs, client, null));
+        () -> new S3ACachingInputStream(readContext, attrs, client, null, conf, null));
   }
 
   @Test