فهرست منبع

HADOOP-18740. S3A prefetch cache blocks should be accessed by RW locks (#5675)

Contributed by Viraj Jasani
Viraj Jasani 1 سال پیش
والد
کامیت
afb863acf4

+ 91 - 6
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java

@@ -37,6 +37,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
 import org.slf4j.Logger;
@@ -70,6 +72,16 @@ public class SingleFilePerBlockCache implements BlockCache {
 
   private final PrefetchingStatistics prefetchingStatistics;
 
+  /**
+   * Timeout to be used by close, while acquiring prefetch block write lock.
+   */
+  private static final int PREFETCH_WRITE_LOCK_TIMEOUT = 5;
+
+  /**
+   * Lock timeout unit to be used by the thread while acquiring prefetch block write lock.
+   */
+  private static final TimeUnit PREFETCH_WRITE_LOCK_TIMEOUT_UNIT = TimeUnit.SECONDS;
+
   /**
    * File attributes attached to any intermediate temporary file created during index creation.
    */
@@ -85,12 +97,18 @@ public class SingleFilePerBlockCache implements BlockCache {
     private final Path path;
     private final int size;
     private final long checksum;
+    private final ReentrantReadWriteLock lock;
+    private enum LockType {
+      READ,
+      WRITE
+    }
 
     Entry(int blockNumber, Path path, int size, long checksum) {
       this.blockNumber = blockNumber;
       this.path = path;
       this.size = size;
       this.checksum = checksum;
+      this.lock = new ReentrantReadWriteLock();
     }
 
     @Override
@@ -99,6 +117,54 @@ public class SingleFilePerBlockCache implements BlockCache {
           "([%03d] %s: size = %d, checksum = %d)",
           blockNumber, path, size, checksum);
     }
+
+    /**
+     * Take the read or write lock.
+     *
+     * @param lockType type of the lock.
+     */
+    private void takeLock(LockType lockType) {
+      if (LockType.READ == lockType) {
+        lock.readLock().lock();
+      } else if (LockType.WRITE == lockType) {
+        lock.writeLock().lock();
+      }
+    }
+
+    /**
+     * Release the read or write lock.
+     *
+     * @param lockType type of the lock.
+     */
+    private void releaseLock(LockType lockType) {
+      if (LockType.READ == lockType) {
+        lock.readLock().unlock();
+      } else if (LockType.WRITE == lockType) {
+        lock.writeLock().unlock();
+      }
+    }
+
+    /**
+     * Try to take the read or write lock within the given timeout.
+     *
+     * @param lockType type of the lock.
+     * @param timeout the time to wait for the given lock.
+     * @param unit the time unit of the timeout argument.
+     * @return true if the lock of the given lock type was acquired.
+     */
+    private boolean takeLock(LockType lockType, long timeout, TimeUnit unit) {
+      try {
+        if (LockType.READ == lockType) {
+          return lock.readLock().tryLock(timeout, unit);
+        } else if (LockType.WRITE == lockType) {
+          return lock.writeLock().tryLock(timeout, unit);
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("Thread interrupted while trying to acquire {} lock", lockType, e);
+        Thread.currentThread().interrupt();
+      }
+      return false;
+    }
   }
 
   /**
@@ -148,11 +214,15 @@ public class SingleFilePerBlockCache implements BlockCache {
     checkNotNull(buffer, "buffer");
 
     Entry entry = getEntry(blockNumber);
-    buffer.clear();
-    readFile(entry.path, buffer);
-    buffer.rewind();
-
-    validateEntry(entry, buffer);
+    entry.takeLock(Entry.LockType.READ);
+    try {
+      buffer.clear();
+      readFile(entry.path, buffer);
+      buffer.rewind();
+      validateEntry(entry, buffer);
+    } finally {
+      entry.releaseLock(Entry.LockType.READ);
+    }
   }
 
   protected int readFile(Path path, ByteBuffer buffer) throws IOException {
@@ -200,7 +270,12 @@ public class SingleFilePerBlockCache implements BlockCache {
 
     if (blocks.containsKey(blockNumber)) {
       Entry entry = blocks.get(blockNumber);
-      validateEntry(entry, buffer);
+      entry.takeLock(Entry.LockType.READ);
+      try {
+        validateEntry(entry, buffer);
+      } finally {
+        entry.releaseLock(Entry.LockType.READ);
+      }
       return;
     }
 
@@ -268,12 +343,22 @@ public class SingleFilePerBlockCache implements BlockCache {
     int numFilesDeleted = 0;
 
     for (Entry entry : blocks.values()) {
+      boolean lockAcquired = entry.takeLock(Entry.LockType.WRITE, PREFETCH_WRITE_LOCK_TIMEOUT,
+          PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
+      if (!lockAcquired) {
+        LOG.error("Cache file {} deletion would not be attempted as write lock could not"
+                + " be acquired within {} {}", entry.path, PREFETCH_WRITE_LOCK_TIMEOUT,
+            PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
+        continue;
+      }
       try {
         Files.deleteIfExists(entry.path);
         prefetchingStatistics.blockRemovedFromFileCache();
         numFilesDeleted++;
       } catch (IOException e) {
         LOG.debug("Failed to delete cache file {}", entry.path, e);
+      } finally {
+        entry.releaseLock(Entry.LockType.WRITE);
       }
     }