Browse Source

HADOOP-18756. S3A prefetch - CachingBlockManager to use AtomicBoolean for closed flag (#5718)

Contributed by Viraj Jasani
Viraj Jasani 1 năm trước cách đây
mục cha
commit
51a7f7b024

+ 26 - 30
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java

@@ -38,6 +38,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
@@ -68,7 +69,7 @@ public class SingleFilePerBlockCache implements BlockCache {
    */
   private int numGets = 0;
 
-  private boolean closed;
+  private final AtomicBoolean closed;
 
   private final PrefetchingStatistics prefetchingStatistics;
 
@@ -174,6 +175,7 @@ public class SingleFilePerBlockCache implements BlockCache {
    */
   public SingleFilePerBlockCache(PrefetchingStatistics prefetchingStatistics) {
     this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
+    this.closed = new AtomicBoolean(false);
   }
 
   /**
@@ -207,7 +209,7 @@ public class SingleFilePerBlockCache implements BlockCache {
    */
   @Override
   public void get(int blockNumber, ByteBuffer buffer) throws IOException {
-    if (closed) {
+    if (closed.get()) {
       return;
     }
 
@@ -262,7 +264,7 @@ public class SingleFilePerBlockCache implements BlockCache {
   @Override
   public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
       LocalDirAllocator localDirAllocator) throws IOException {
-    if (closed) {
+    if (closed.get()) {
       return;
     }
 
@@ -333,37 +335,31 @@ public class SingleFilePerBlockCache implements BlockCache {
 
   @Override
   public void close() throws IOException {
-    if (closed) {
-      return;
-    }
-
-    closed = true;
+    if (closed.compareAndSet(false, true)) {
+      LOG.debug(getStats());
+      int numFilesDeleted = 0;
 
-    LOG.info(getStats());
-    int numFilesDeleted = 0;
-
-    for (Entry entry : blocks.values()) {
-      boolean lockAcquired = entry.takeLock(Entry.LockType.WRITE, PREFETCH_WRITE_LOCK_TIMEOUT,
-          PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
-      if (!lockAcquired) {
-        LOG.error("Cache file {} deletion would not be attempted as write lock could not"
-                + " be acquired within {} {}", entry.path, PREFETCH_WRITE_LOCK_TIMEOUT,
+      for (Entry entry : blocks.values()) {
+        boolean lockAcquired = entry.takeLock(Entry.LockType.WRITE, PREFETCH_WRITE_LOCK_TIMEOUT,
             PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
-        continue;
-      }
-      try {
-        Files.deleteIfExists(entry.path);
-        prefetchingStatistics.blockRemovedFromFileCache();
-        numFilesDeleted++;
-      } catch (IOException e) {
-        LOG.debug("Failed to delete cache file {}", entry.path, e);
-      } finally {
-        entry.releaseLock(Entry.LockType.WRITE);
+        if (!lockAcquired) {
+          LOG.error("Cache file {} deletion would not be attempted as write lock could not"
+                  + " be acquired within {} {}", entry.path, PREFETCH_WRITE_LOCK_TIMEOUT,
+              PREFETCH_WRITE_LOCK_TIMEOUT_UNIT);
+          continue;
+        }
+        try {
+          Files.deleteIfExists(entry.path);
+          prefetchingStatistics.blockRemovedFromFileCache();
+          numFilesDeleted++;
+        } catch (IOException e) {
+          LOG.warn("Failed to delete cache file {}", entry.path, e);
+        } finally {
+          entry.releaseLock(Entry.LockType.WRITE);
+        }
       }
-    }
 
-    if (numFilesDeleted > 0) {
-      LOG.info("Deleted {} cache files", numFilesDeleted);
+      LOG.debug("Prefetch cache close: Deleted {} cache files", numFilesDeleted);
     }
   }