
HADOOP-18829. S3A prefetch LRU cache eviction metrics (#5893) (#6111)

Contributed by: Viraj Jasani
Viraj Jasani, 1 year ago
commit 9289b552d2
13 changed files with 145 additions and 29 deletions
  1. + 8 - 4
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java
  2. + 5 - 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/EmptyPrefetchingStatistics.java
  3. + 5 - 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/PrefetchingStatistics.java
  4. + 34 - 17
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java
  5. + 12 - 0
      hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
  6. + 3 - 3
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBlockCache.java
  7. + 37 - 0
      hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/IOStatisticAssertions.java
  8. + 9 - 2
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java
  9. + 10 - 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java
  10. + 2 - 1
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java
  11. + 5 - 0
      hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java
  12. + 12 - 0
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java
  13. + 3 - 2
      hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java

+ 8 - 4
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java

@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.statistics.DurationTracker;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
 
 import static java.util.Objects.requireNonNull;
 
@@ -111,8 +112,10 @@ public abstract class CachingBlockManager extends BlockManager {
    * @param conf the configuration.
    * @param localDirAllocator the local dir allocator instance.
    * @param maxBlocksCount max blocks count to be kept in cache at any time.
+   * @param trackerFactory tracker with statistics to update.
    * @throws IllegalArgumentException if bufferPoolSize is zero or negative.
    */
+  @SuppressWarnings("checkstyle:parameternumber")
   public CachingBlockManager(
       ExecutorServiceFuturePool futurePool,
       BlockData blockData,
@@ -120,7 +123,8 @@ public abstract class CachingBlockManager extends BlockManager {
       PrefetchingStatistics prefetchingStatistics,
       Configuration conf,
       LocalDirAllocator localDirAllocator,
-      int maxBlocksCount) {
+      int maxBlocksCount,
+      DurationTrackerFactory trackerFactory) {
     super(blockData);
 
     Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize");
@@ -136,7 +140,7 @@ public abstract class CachingBlockManager extends BlockManager {
     if (this.getBlockData().getFileSize() > 0) {
       this.bufferPool = new BufferPool(bufferPoolSize, this.getBlockData().getBlockSize(),
           this.prefetchingStatistics);
-      this.cache = this.createCache(maxBlocksCount);
+      this.cache = this.createCache(maxBlocksCount, trackerFactory);
     }
 
     this.ops = new BlockOperations();
@@ -559,8 +563,8 @@ public abstract class CachingBlockManager extends BlockManager {
     }
   }
 
-  protected BlockCache createCache(int maxBlocksCount) {
-    return new SingleFilePerBlockCache(prefetchingStatistics, maxBlocksCount);
+  protected BlockCache createCache(int maxBlocksCount, DurationTrackerFactory trackerFactory) {
+    return new SingleFilePerBlockCache(prefetchingStatistics, maxBlocksCount, trackerFactory);
   }
 
   protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException {
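
Note: the new trackerFactory argument is threaded straight through to createCache(), so the cache itself can time evictions. Callers with no statistics store to update can pass the no-op stub factory. A minimal sketch of constructing the cache directly under that assumption (the class name and block count here are illustrative, not part of the commit):

import org.apache.hadoop.fs.impl.prefetch.BlockCache;
import org.apache.hadoop.fs.impl.prefetch.EmptyPrefetchingStatistics;
import org.apache.hadoop.fs.impl.prefetch.SingleFilePerBlockCache;

import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTrackerFactory;

public final class CreateCacheSketch {
  public static void main(String[] args) {
    // Same wiring as createCache() above, but with a no-op tracker factory
    // for callers that have no statistics to update.
    BlockCache cache = new SingleFilePerBlockCache(
        EmptyPrefetchingStatistics.getInstance(),
        8,                             // maxBlocksCount (illustrative)
        stubDurationTrackerFactory()); // records nothing
  }
}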

+ 5 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/EmptyPrefetchingStatistics.java

@@ -57,6 +57,11 @@ public final class EmptyPrefetchingStatistics
 
   }
 
+  @Override
+  public void blockEvictedFromFileCache() {
+
+  }
+
   @Override
   public void prefetchOperationCompleted() {
 

+ 5 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/PrefetchingStatistics.java

@@ -42,6 +42,11 @@ public interface PrefetchingStatistics extends IOStatisticsSource {
    */
   void blockRemovedFromFileCache();
 
+  /**
+   * A block has been evicted from the file cache.
+   */
+  void blockEvictedFromFileCache();
+
   /**
    * A prefetch operation has completed.
    */

+ 34 - 17
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java

@@ -47,10 +47,14 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.statistics.DurationTracker;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
 import org.apache.hadoop.util.Preconditions;
 
 import static java.util.Objects.requireNonNull;
 import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNull;
+import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTrackerFactory;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_FILE_CACHE_EVICTION;
 
 /**
  * Provides functionality necessary for caching blocks of data read from FileSystem.
@@ -99,6 +103,11 @@ public class SingleFilePerBlockCache implements BlockCache {
 
   private final PrefetchingStatistics prefetchingStatistics;
 
+  /**
+   * Duration tracker factory required to track the duration of some operations.
+   */
+  private final DurationTrackerFactory trackerFactory;
+
   /**
    * File attributes attached to any intermediate temporary file created during index creation.
    */
@@ -209,14 +218,19 @@ public class SingleFilePerBlockCache implements BlockCache {
    *
    * @param prefetchingStatistics statistics for this stream.
    * @param maxBlocksCount max blocks count to be kept in cache at any time.
+   * @param trackerFactory tracker with statistics to update
    */
-  public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics, int maxBlocksCount) {
+  public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics,
+      int maxBlocksCount,
+      DurationTrackerFactory trackerFactory) {
     this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
     this.closed = new AtomicBoolean(false);
     this.maxBlocksCount = maxBlocksCount;
     Preconditions.checkArgument(maxBlocksCount > 0, "maxBlocksCount should be more than 0");
     blocks = new ConcurrentHashMap<>();
     blocksLock = new ReentrantReadWriteLock();
+    this.trackerFactory = trackerFactory != null
+        ? trackerFactory : stubDurationTrackerFactory();
   }
 
   /**
@@ -430,25 +444,28 @@ public class SingleFilePerBlockCache implements BlockCache {
    * @param elementToPurge Block entry to evict.
    */
   private void deleteBlockFileAndEvictCache(Entry elementToPurge) {
-    boolean lockAcquired = elementToPurge.takeLock(Entry.LockType.WRITE,
-        PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT,
-        PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
-    if (!lockAcquired) {
-      LOG.error("Cache file {} deletion would not be attempted as write lock could not"
-              + " be acquired within {} {}", elementToPurge.path,
+    try (DurationTracker ignored = trackerFactory.trackDuration(STREAM_FILE_CACHE_EVICTION)) {
+      boolean lockAcquired = elementToPurge.takeLock(Entry.LockType.WRITE,
           PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT,
           PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
-    } else {
-      try {
-        if (Files.deleteIfExists(elementToPurge.path)) {
-          entryListSize--;
-          prefetchingStatistics.blockRemovedFromFileCache();
-          blocks.remove(elementToPurge.blockNumber);
+      if (!lockAcquired) {
+        LOG.error("Cache file {} deletion would not be attempted as write lock could not"
+                + " be acquired within {} {}", elementToPurge.path,
+            PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT,
+            PrefetchConstants.PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
+      } else {
+        try {
+          if (Files.deleteIfExists(elementToPurge.path)) {
+            entryListSize--;
+            prefetchingStatistics.blockRemovedFromFileCache();
+            blocks.remove(elementToPurge.blockNumber);
+            prefetchingStatistics.blockEvictedFromFileCache();
+          }
+        } catch (IOException e) {
+          LOG.warn("Failed to delete cache file {}", elementToPurge.path, e);
+        } finally {
+          elementToPurge.releaseLock(Entry.LockType.WRITE);
         }
-      } catch (IOException e) {
-        LOG.warn("Failed to delete cache file {}", elementToPurge.path, e);
-      } finally {
-        elementToPurge.releaseLock(Entry.LockType.WRITE);
       }
     }
   }
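
The eviction now runs inside try-with-resources: trackDuration() returns a DurationTracker that starts timing on creation and records the elapsed time when closed, so both the success path and the lock-timeout path are timed. A self-contained sketch of that idiom, assuming the no-op stub factory (the sleep stands in for the real file-delete-and-evict work):

import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;

import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTrackerFactory;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_FILE_CACHE_EVICTION;

public final class TrackDurationSketch {
  public static void main(String[] args) throws Exception {
    DurationTrackerFactory factory = stubDurationTrackerFactory(); // no-op stand-in
    // The tracker times everything inside the block; close() records it.
    try (DurationTracker ignored = factory.trackDuration(STREAM_FILE_CACHE_EVICTION)) {
      Thread.sleep(10); // stand-in for the eviction work
    }
  }
}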

+ 12 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java

@@ -455,6 +455,18 @@ public final class StreamStatisticNames {
   public static final String STREAM_READ_BLOCK_ACQUIRE_AND_READ
       = "stream_read_block_acquire_read";
 
+  /**
+   * Total number of blocks evicted from the disk cache.
+   */
+  public static final String STREAM_EVICT_BLOCKS_FROM_FILE_CACHE
+      = "stream_evict_blocks_from_cache";
+
+  /**
+   * Track duration of LRU cache eviction for disk cache.
+   */
+  public static final String STREAM_FILE_CACHE_EVICTION
+      = "stream_file_cache_eviction";
+
   private StreamStatisticNames() {
   }
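
Both names surface in the stream's IOStatistics: STREAM_EVICT_BLOCKS_FROM_FILE_CACHE as a plain counter, and STREAM_FILE_CACHE_EVICTION as a tracked duration, which also registers an invocation counter under the same key. A small sketch of reading the eviction counter back out of a snapshot (the helper class is illustrative):

import java.util.Map;

import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;

public final class EvictionStatsLookup {
  // Returns the number of blocks evicted from the disk cache, or 0 when
  // the statistic was never registered on this stream.
  public static long evictedBlocks(IOStatistics stats) {
    Map<String, Long> counters = stats.counters();
    return counters.getOrDefault(
        StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE, 0L);
  }
}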
 

+ 3 - 3
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBlockCache.java

@@ -45,7 +45,7 @@ public class TestBlockCache extends AbstractHadoopTestBase {
   public void testArgChecks() throws Exception {
     // Should not throw.
     BlockCache cache =
-        new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance(), 2);
+        new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance(), 2, null);
 
     ByteBuffer buffer = ByteBuffer.allocate(16);
 
@@ -55,7 +55,7 @@ public class TestBlockCache extends AbstractHadoopTestBase {
 
 
     intercept(NullPointerException.class, null,
-        () -> new SingleFilePerBlockCache(null, 2));
+        () -> new SingleFilePerBlockCache(null, 2, null));
 
   }
 
@@ -63,7 +63,7 @@ public class TestBlockCache extends AbstractHadoopTestBase {
   @Test
   public void testPutAndGet() throws Exception {
     BlockCache cache =
-        new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance(), 2);
+        new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance(), 2, null);
 
     ByteBuffer buffer1 = ByteBuffer.allocate(BUFFER_SIZE);
     for (byte i = 0; i < BUFFER_SIZE; i++) {

+ 37 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/statistics/IOStatisticAssertions.java

@@ -176,6 +176,23 @@ public final class IOStatisticAssertions {
         verifyStatisticsNotNull(stats).counters(), value);
   }
 
+  /**
+   * Assert that two counters have similar values.
+   *
+   * @param stats statistics source.
+   * @param key1 statistic first key.
+   * @param key2 statistic second key.
+   */
+  public static void verifyStatisticCounterValues(
+      final IOStatistics stats,
+      final String key1,
+      final String key2) {
+    verifyStatisticValues(COUNTER,
+        key1,
+        key2,
+        verifyStatisticsNotNull(stats).counters());
+  }
+
   /**
    * Assert that a gauge has an expected value.
    * @param stats statistics source
@@ -258,6 +275,26 @@ public final class IOStatisticAssertions {
     return statistic;
   }
 
+  /**
+   * Assert that the given two statistics have same values.
+   *
+   * @param type type of the statistics.
+   * @param key1 statistic first key.
+   * @param key2 statistic second key.
+   * @param map map to look up.
+   * @param <E> type of map element.
+   */
+  private static <E> void verifyStatisticValues(
+      final String type,
+      final String key1,
+      final String key2,
+      final Map<String, E> map) {
+    final E statistic1 = lookupStatistic(type, key1, map);
+    final E statistic2 = lookupStatistic(type, key2, map);
+    assertThat(statistic1)
+        .describedAs("%s named %s and %s named %s", type, key1, type, key2)
+        .isEqualTo(statistic2);
+  }
 
   /**
    * Assert that a given statistic has an expected value.
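
A usage sketch of the new two-key helper, mirroring the call made in ITestS3APrefetchingLruEviction further down, where the eviction counter and the eviction-duration invocation counter are expected to advance in lockstep (ioStats is assumed to be a snapshot from a prefetching stream):

import org.apache.hadoop.fs.statistics.IOStatistics;

import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValues;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_FILE_CACHE_EVICTION;

public final class EvictionAssertionSketch {
  // Fails if the two counters diverge, i.e. if an eviction was counted
  // without its duration being tracked (or vice versa).
  static void assertEvictionCountsMatch(IOStatistics ioStats) {
    verifyStatisticCounterValues(ioStats,
        STREAM_EVICT_BLOCKS_FROM_FILE_CACHE,
        STREAM_FILE_CACHE_EVICTION);
  }
}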

+ 9 - 2
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java

@@ -886,7 +886,8 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
               StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES,
               StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
               StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
-              StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES)
+              StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES,
+              StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE)
           .withGauges(STREAM_READ_GAUGE_INPUT_POLICY,
               STREAM_READ_BLOCKS_IN_FILE_CACHE.getSymbol(),
               STREAM_READ_ACTIVE_PREFETCH_OPERATIONS.getSymbol(),
@@ -899,7 +900,8 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
               StreamStatisticNames.STREAM_READ_REMOTE_STREAM_DRAINED,
               StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS,
               StreamStatisticNames.STREAM_READ_REMOTE_BLOCK_READ,
-              StreamStatisticNames.STREAM_READ_BLOCK_ACQUIRE_AND_READ)
+              StreamStatisticNames.STREAM_READ_BLOCK_ACQUIRE_AND_READ,
+              StreamStatisticNames.STREAM_FILE_CACHE_EVICTION)
           .build();
       setIOStatistics(st);
       aborted = st.getCounterReference(
@@ -1395,6 +1397,11 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
       incAllGauges(STREAM_READ_BLOCKS_IN_FILE_CACHE, -1);
     }
 
+    @Override
+    public void blockEvictedFromFileCache() {
+      increment(StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE);
+    }
+
     @Override
     public void prefetchOperationCompleted() {
       incAllGauges(STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, -1);
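
Both statistics have to be registered when the stream's statistics store is built, otherwise updates to them are not recorded. A minimal sketch of the registration idiom above, reduced to just the two new names (assuming the IOStatisticsBinding builder; the class name is illustrative):

import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;

import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;

public final class RegistrationSketch {
  // withCounters() registers a plain counter; withDurationTracking()
  // registers the invocation counter plus min/mean/max duration entries.
  static IOStatisticsStore newStore() {
    return iostatisticsStore()
        .withCounters(StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE)
        .withDurationTracking(StreamStatisticNames.STREAM_FILE_CACHE_EVICTION)
        .build();
  }
}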

+ 10 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java

@@ -447,6 +447,16 @@ public enum Statistic {
       "Total queue duration of all block uploads",
       TYPE_DURATION),
 
+  /* Stream prefetch file cache eviction */
+  STREAM_EVICT_BLOCKS_FROM_FILE_CACHE(
+      StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE,
+      "Count of blocks evicted from the disk cache",
+      TYPE_COUNTER),
+  STREAM_FILE_CACHE_EVICTION(
+      StreamStatisticNames.STREAM_FILE_CACHE_EVICTION,
+      "Duration of the eviction of an element from LRU cache that holds disk cache blocks",
+      TYPE_DURATION),
+
   /* committer stats */
   COMMITTER_COMMITS_CREATED(
       "committer_commits_created",

+ 2 - 1
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java

@@ -76,7 +76,8 @@ public class S3ACachingBlockManager extends CachingBlockManager {
         streamStatistics,
         conf,
         localDirAllocator,
-        conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT));
+        conf.getInt(PREFETCH_MAX_BLOCKS_COUNT, DEFAULT_PREFETCH_MAX_BLOCKS_COUNT),
+        streamStatistics);
 
     Validate.checkNotNull(reader, "reader");
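
Note that streamStatistics is passed twice here: once as the PrefetchingStatistics callback sink and once as the new DurationTrackerFactory argument, since the S3A input stream statistics object fills both roles.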
 

+ 5 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java

@@ -241,6 +241,11 @@ public final class EmptyS3AStatisticsContext implements S3AStatisticsContext {
 
     }
 
+    @Override
+    public void blockEvictedFromFileCache() {
+
+    }
+
     @Override
     public void executorAcquired(Duration timeInQueue) {
 

+ 12 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingLruEviction.java

@@ -45,7 +45,11 @@ import org.apache.hadoop.test.LambdaTestUtils;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_SIZE_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_MAX_BLOCKS_COUNT;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertThatStatisticCounter;
+import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValues;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticGaugeValue;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE;
+import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_FILE_CACHE_EVICTION;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BLOCKS_IN_FILE_CACHE;
 
 /**
@@ -172,6 +176,14 @@ public class ITestS3APrefetchingLruEviction extends AbstractS3ACostTest {
     LambdaTestUtils.eventually(TIMEOUT_MILLIS, INTERVAL_MILLIS, () -> {
       LOG.info("IO stats: {}", ioStats);
       verifyStatisticGaugeValue(ioStats, STREAM_READ_BLOCKS_IN_FILE_CACHE, 0);
+      // stream_evict_blocks_from_cache is expected to be higher than 4, however we might face
+      // transient failures due to async prefetch get cancel issues. While TIMEOUT_MILLIS is
+      // sufficient wait time, consider re-running the test if stream_evict_blocks_from_cache
+      // value stays lower than 4.
+      assertThatStatisticCounter(ioStats,
+          STREAM_EVICT_BLOCKS_FROM_FILE_CACHE).isGreaterThanOrEqualTo(4);
+      verifyStatisticCounterValues(ioStats, STREAM_EVICT_BLOCKS_FROM_FILE_CACHE,
+          STREAM_FILE_CACHE_EVICTION);
     });
   }
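
For context, the ioStats snapshot asserted on above comes straight off the input stream, since FSDataInputStream implements IOStatisticsSource. A hedged sketch (fs and testFile are illustrative; the real test reads through the file first to force prefetching and eviction):

import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.statistics.IOStatistics;

public final class StreamStatsSketch {
  // Opens the file and returns its statistics snapshot.
  static IOStatistics statsOf(FileSystem fs, Path testFile) throws IOException {
    try (FSDataInputStream in = fs.open(testFile)) {
      return in.getIOStatistics();
    }
  }
}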
 

+ 3 - 2
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchFakes.java

@@ -59,6 +59,7 @@ import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
 import org.apache.hadoop.fs.s3a.statistics.impl.CountingChangeTracker;
 import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
+import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.util.functional.CallableRaisingIOE;
@@ -316,7 +317,7 @@ public final class S3APrefetchFakes {
 
     public FakeS3FilePerBlockCache(int readDelay, int writeDelay) {
       super(new EmptyS3AStatisticsContext().newInputStreamStatistics(),
-          Constants.DEFAULT_PREFETCH_MAX_BLOCKS_COUNT);
+          Constants.DEFAULT_PREFETCH_MAX_BLOCKS_COUNT, null);
       this.files = new ConcurrentHashMap<>();
       this.readDelay = readDelay;
       this.writeDelay = writeDelay;
@@ -389,7 +390,7 @@ public final class S3APrefetchFakes {
     }
 
     @Override
-    protected BlockCache createCache(int maxBlocksCount) {
+    protected BlockCache createCache(int maxBlocksCount, DurationTrackerFactory trackerFactory) {
       final int readDelayMs = 50;
       final int writeDelayMs = 200;
       return new FakeS3FilePerBlockCache(readDelayMs, writeDelayMs);
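
The fake passes null for the tracker factory and its createCache() override ignores the parameter entirely; this is safe because, as shown in the SingleFilePerBlockCache constructor above, a null factory falls back to stubDurationTrackerFactory().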