Browse Source

HADOOP-18455. S3A prefetching executor should be closed (#4879)

follow-on patch to HADOOP-18186. 

Contributed by: Viraj Jasani
Viraj Jasani 2 years ago
parent
commit
084b68e380

+ 19 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/ExecutorServiceFuturePool.java

@@ -22,8 +22,13 @@ package org.apache.hadoop.fs.impl.prefetch;
 import java.util.Locale;
 import java.util.Locale;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
 import java.util.function.Supplier;
 
 
+import org.slf4j.Logger;
+
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+
 /**
 /**
  * A FuturePool implementation backed by a java.util.concurrent.ExecutorService.
  * A FuturePool implementation backed by a java.util.concurrent.ExecutorService.
  *
  *
@@ -37,7 +42,8 @@ import java.util.function.Supplier;
  *
  *
  */
  */
 public class ExecutorServiceFuturePool {
 public class ExecutorServiceFuturePool {
-  private ExecutorService executor;
+
+  private final ExecutorService executor;
 
 
   public ExecutorServiceFuturePool(ExecutorService executor) {
   public ExecutorServiceFuturePool(ExecutorService executor) {
     this.executor = executor;
     this.executor = executor;
@@ -64,6 +70,18 @@ public class ExecutorServiceFuturePool {
     return (Future<Void>) executor.submit(r::run);
     return (Future<Void>) executor.submit(r::run);
   }
   }
 
 
+  /**
+   * Utility to shutdown the {@link ExecutorService} used by this class. Will wait up to a
+   * certain timeout for the ExecutorService to gracefully shutdown.
+   *
+   * @param logger Logger
+   * @param timeout the maximum time to wait
+   * @param unit the time unit of the timeout argument
+   */
+  public void shutdown(Logger logger, long timeout, TimeUnit unit) {
+    HadoopExecutors.shutdown(executor, logger, timeout, unit);
+  }
+
   public String toString() {
   public String toString() {
     return String.format(Locale.ROOT, "ExecutorServiceFuturePool(executor=%s)", executor);
     return String.format(Locale.ROOT, "ExecutorServiceFuturePool(executor=%s)", executor);
   }
   }

+ 2 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/concurrent/HadoopExecutors.java

@@ -115,9 +115,8 @@ public final class HadoopExecutors {
     try {
     try {
       executorService.shutdown();
       executorService.shutdown();
 
 
-      logger.debug(
-          "Gracefully shutting down executor service. Waiting max {} {}",
-          timeout, unit);
+      logger.debug("Gracefully shutting down executor service {}. Waiting max {} {}",
+          executorService, timeout, unit);
       if (!executorService.awaitTermination(timeout, unit)) {
       if (!executorService.awaitTermination(timeout, unit)) {
         logger.debug(
         logger.debug(
             "Executor service has not shutdown yet. Forcing. "
             "Executor service has not shutdown yet. Forcing. "

+ 4 - 0
hadoop-tools/hadoop-aws/dev-support/findbugs-exclude.xml

@@ -59,6 +59,10 @@
     <Method name="openFileWithOptions"/>
     <Method name="openFileWithOptions"/>
     <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
     <Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
   </Match>
   </Match>
+  <Match>
+    <Class name="org.apache.hadoop.fs.s3a.S3AFileSystem"/>
+    <Bug pattern="IS2_INCONSISTENT_SYNC"/>
+  </Match>
   <Match>
   <Match>
     <Class name="org.apache.hadoop.fs.s3a.s3guard.S3GuardTool$BucketInfo"/>
     <Class name="org.apache.hadoop.fs.s3a.s3guard.S3GuardTool$BucketInfo"/>
     <Method name="run"/>
     <Method name="run"/>

+ 4 - 6
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -633,17 +633,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
       // amazon client exception: stop all services then throw the translation
       // amazon client exception: stop all services then throw the translation
       cleanupWithLogger(LOG, span);
       cleanupWithLogger(LOG, span);
       stopAllServices();
       stopAllServices();
-      if (this.futurePool != null) {
-        this.futurePool = null;
-      }
       throw translateException("initializing ", new Path(name), e);
       throw translateException("initializing ", new Path(name), e);
     } catch (IOException | RuntimeException e) {
     } catch (IOException | RuntimeException e) {
       // other exceptions: stop the services.
       // other exceptions: stop the services.
       cleanupWithLogger(LOG, span);
       cleanupWithLogger(LOG, span);
       stopAllServices();
       stopAllServices();
-      if (this.futurePool != null) {
-        this.futurePool = null;
-      }
       throw e;
       throw e;
     }
     }
   }
   }
@@ -4038,6 +4032,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
     HadoopExecutors.shutdown(unboundedThreadPool, LOG,
     HadoopExecutors.shutdown(unboundedThreadPool, LOG,
         THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
         THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
     unboundedThreadPool = null;
     unboundedThreadPool = null;
+    if (futurePool != null) {
+      futurePool.shutdown(LOG, THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
+      futurePool = null;
+    }
     // other services are shutdown.
     // other services are shutdown.
     cleanupWithLogger(LOG,
     cleanupWithLogger(LOG,
         instrumentation,
         instrumentation,