Просмотр исходного кода

HADOOP-13139 Branch-2: S3a to use thread pool that blocks clients. Contributed by Pieter Reuse.

Steve Loughran 8 лет назад
Родитель
Сommit
89121745c0

+ 2 - 8
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -925,17 +925,11 @@
 
 <property>
   <name>fs.s3a.threads.max</name>
-  <value>256</value>
+  <value>10</value>
   <description> Maximum number of concurrent active (part)uploads,
     which each use a thread from the threadpool.</description>
 </property>
 
-<property>
-  <name>fs.s3a.threads.core</name>
-  <value>15</value>
-  <description>Number of core threads in the threadpool.</description>
-</property>
-
 <property>
   <name>fs.s3a.threads.keepalivetime</name>
   <value>60</value>
@@ -945,7 +939,7 @@
 
 <property>
   <name>fs.s3a.max.total.tasks</name>
-  <value>1000</value>
+  <value>5</value>
   <description>Number of (part)uploads allowed to the queue before
     blocking additional uploads.</description>
 </property>

+ 272 - 0
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/BlockingThreadPoolExecutorService.java

@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.util.concurrent.ForwardingListeningExecutorService;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+
+/**
+ * This ExecutorService blocks the submission of new tasks when its queue is
+ * already full by using a semaphore. Task submissions require permits, task
+ * completions release permits.
+ * <p>
+ * This is inspired by <a href="https://github.com/apache/incubator-s4/blob/master/subprojects/s4-comm/src/main/java/org/apache/s4/comm/staging/BlockingThreadPoolExecutorService.java">
+ * this s4 threadpool</a>
+ */
+public class BlockingThreadPoolExecutorService
+    extends ForwardingListeningExecutorService {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(BlockingThreadPoolExecutorService.class);
+
+  private Semaphore queueingPermits;
+  private ListeningExecutorService executorDelegatee;
+
+  private static final AtomicInteger POOLNUMBER = new AtomicInteger(1);
+
+  /**
+   * Returns a {@link java.util.concurrent.ThreadFactory} that names each
+   * created thread uniquely,
+   * with a common prefix.
+   *
+   * @param prefix The prefix of every created Thread's name
+   * @return a {@link java.util.concurrent.ThreadFactory} that names threads
+   */
+  public static ThreadFactory getNamedThreadFactory(final String prefix) {
+    SecurityManager s = System.getSecurityManager();
+    final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() :
+        Thread.currentThread().getThreadGroup();
+
+    return new ThreadFactory() {
+      private final AtomicInteger threadNumber = new AtomicInteger(1);
+      private final int poolNum = POOLNUMBER.getAndIncrement();
+      private final ThreadGroup group = threadGroup;
+
+      @Override
+      public Thread newThread(Runnable r) {
+        final String name =
+            prefix + "-pool" + poolNum + "-t" + threadNumber.getAndIncrement();
+        return new Thread(group, r, name);
+      }
+    };
+  }
+
+  /**
+   * Get a named {@link ThreadFactory} that just builds daemon threads.
+   *
+   * @param prefix name prefix for all threads created from the factory
+   * @return a thread factory that creates named, daemon threads with
+   * the supplied exception handler and normal priority
+   */
+  private static ThreadFactory newDaemonThreadFactory(final String prefix) {
+    final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
+    return new ThreadFactory() {
+      @Override
+      public Thread newThread(Runnable r) {
+        Thread t = namedFactory.newThread(r);
+        if (!t.isDaemon()) {
+          t.setDaemon(true);
+        }
+        if (t.getPriority() != Thread.NORM_PRIORITY) {
+          t.setPriority(Thread.NORM_PRIORITY);
+        }
+        return t;
+      }
+
+    };
+  }
+
+
+  /**
+   * A thread pool that that blocks clients submitting additional tasks if
+   * there are already {@code activeTasks} running threads and {@code
+   * waitingTasks} tasks waiting in its queue.
+   *
+   * @param activeTasks maximum number of active tasks
+   * @param waitingTasks maximum number of waiting tasks
+   * @param keepAliveTime time until threads are cleaned up in {@code unit}
+   * @param unit time unit
+   * @param prefixName prefix of name for threads
+   */
+  public BlockingThreadPoolExecutorService(int activeTasks, int waitingTasks,
+      long keepAliveTime, TimeUnit unit, String prefixName) {
+    super();
+    queueingPermits = new Semaphore(waitingTasks + activeTasks, false);
+    /* Although we generally only expect up to waitingTasks tasks in the
+    queue, we need to be able to buffer all tasks in case dequeueing is
+    slower than enqueueing. */
+    final BlockingQueue<Runnable> workQueue =
+        new LinkedBlockingQueue<>(waitingTasks + activeTasks);
+    ThreadPoolExecutor eventProcessingExecutor =
+        new ThreadPoolExecutor(activeTasks, activeTasks, keepAliveTime, unit,
+            workQueue, newDaemonThreadFactory(prefixName),
+            new RejectedExecutionHandler() {
+          @Override
+            public void rejectedExecution(Runnable r,
+                ThreadPoolExecutor executor) {
+              // This is not expected to happen.
+              LOG.error("Could not submit task to executor {}",
+                  executor.toString());
+            }
+          });
+    eventProcessingExecutor.allowCoreThreadTimeOut(true);
+    executorDelegatee =
+        MoreExecutors.listeningDecorator(eventProcessingExecutor);
+
+  }
+
+  @Override
+  protected ListeningExecutorService delegate() {
+    return executorDelegatee;
+  }
+
+  @Override
+  public <T> ListenableFuture<T> submit(Callable<T> task) {
+    try {
+      queueingPermits.acquire();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return Futures.immediateFailedCheckedFuture(e);
+    }
+    return super.submit(new CallableWithPermitRelease<T>(task));
+  }
+
+  @Override
+  public <T> ListenableFuture<T> submit(Runnable task, T result) {
+    try {
+      queueingPermits.acquire();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return Futures.immediateFailedCheckedFuture(e);
+    }
+    return super.submit(new RunnableWithPermitRelease(task), result);
+  }
+
+  @Override
+  public ListenableFuture<?> submit(Runnable task) {
+    try {
+      queueingPermits.acquire();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      return Futures.immediateFailedCheckedFuture(e);
+    }
+    return super.submit(new RunnableWithPermitRelease(task));
+  }
+
+  @Override
+  public void execute(Runnable command) {
+    try {
+      queueingPermits.acquire();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+    super.execute(new RunnableWithPermitRelease(command));
+  }
+
+  /**
+   * Releases a permit after the task is executed.
+   */
+  class RunnableWithPermitRelease implements Runnable {
+
+    private Runnable delegatee;
+
+    public RunnableWithPermitRelease(Runnable delegatee) {
+      this.delegatee = delegatee;
+    }
+
+    @Override
+    public void run() {
+      try {
+        delegatee.run();
+      } finally {
+        queueingPermits.release();
+      }
+
+    }
+  }
+
+  /**
+   * Releases a permit after the task is completed.
+   */
+  class CallableWithPermitRelease<T> implements Callable<T> {
+
+    private Callable<T> delegatee;
+
+    public CallableWithPermitRelease(Callable<T> delegatee) {
+      this.delegatee = delegatee;
+    }
+
+    @Override
+    public T call() throws Exception {
+      try {
+        return delegatee.call();
+      } finally {
+        queueingPermits.release();
+      }
+    }
+
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
+      long timeout, TimeUnit unit) throws InterruptedException {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
+      throws InterruptedException, ExecutionException {
+    throw new RuntimeException("Not implemented");
+  }
+
+  @Override
+  public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout,
+      TimeUnit unit)
+      throws InterruptedException, ExecutionException, TimeoutException {
+    throw new RuntimeException("Not implemented");
+  }
+
+}

+ 7 - 7
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

@@ -88,20 +88,20 @@ public final class Constants {
 
   // the maximum number of threads to allow in the pool used by TransferManager
   public static final String MAX_THREADS = "fs.s3a.threads.max";
-  public static final int DEFAULT_MAX_THREADS = 256;
+  public static final int DEFAULT_MAX_THREADS = 10;
 
-  // the number of threads to keep in the pool used by TransferManager
+  // unused option: maintained for compile-time compatibility.
+  // if set, a warning is logged in S3A during init
+  @Deprecated
   public static final String CORE_THREADS = "fs.s3a.threads.core";
-  public static final int DEFAULT_CORE_THREADS = DEFAULT_MAXIMUM_CONNECTIONS;
 
-  // when the number of threads is greater than the core, this is the maximum time
-  // that excess idle threads will wait for new tasks before terminating.
+  // the time an idle thread waits before terminating
   public static final String KEEPALIVE_TIME = "fs.s3a.threads.keepalivetime";
   public static final int DEFAULT_KEEPALIVE_TIME = 60;
 
-  // the maximum number of tasks that the LinkedBlockingQueue can hold
+  // the maximum number of tasks cached if all threads are already uploading
   public static final String MAX_TOTAL_TASKS = "fs.s3a.max.total.tasks";
-  public static final int DEFAULT_MAX_TOTAL_TASKS = 1000;
+  public static final int DEFAULT_MAX_TOTAL_TASKS = 5;
 
   // size of each of or multipart pieces in bytes
   public static final String MULTIPART_SIZE = "fs.s3a.multipart.size";

+ 2 - 2
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFastOutputStream.java

@@ -49,7 +49,7 @@ import java.util.List;
 
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ExecutorService;
 
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 import static org.apache.hadoop.fs.s3a.Statistic.*;
@@ -110,7 +110,7 @@ public class S3AFastOutputStream extends OutputStream {
       CannedAccessControlList cannedACL,
       long partSize,
       long multiPartThreshold,
-      ThreadPoolExecutor threadPoolExecutor)
+      ExecutorService threadPoolExecutor)
       throws IOException {
     this.bucket = bucket;
     this.key = key;

+ 24 - 76
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -29,11 +29,9 @@ import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
 import com.amazonaws.AmazonClientException;
@@ -121,7 +119,7 @@ public class S3AFileSystem extends FileSystem {
   private long partSize;
   private boolean enableMultiObjectsDelete;
   private TransferManager transfers;
-  private ThreadPoolExecutor threadPoolExecutor;
+  private ExecutorService threadPoolExecutor;
   private long multiPartThreshold;
   public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
   private CannedAccessControlList cannedACL;
@@ -130,62 +128,12 @@ public class S3AFileSystem extends FileSystem {
   private S3AStorageStatistics storageStatistics;
   private long readAhead;
   private S3AInputPolicy inputPolicy;
+  private static final AtomicBoolean warnedOfCoreThreadDeprecation =
+      new AtomicBoolean(false);
 
   // The maximum number of entries that can be deleted in any call to s3
   private static final int MAX_ENTRIES_TO_DELETE = 1000;
 
-  private static final AtomicInteger poolNumber = new AtomicInteger(1);
-
-  /**
-   * Returns a {@link java.util.concurrent.ThreadFactory} that names each created thread uniquely,
-   * with a common prefix.
-   * @param prefix The prefix of every created Thread's name
-   * @return a {@link java.util.concurrent.ThreadFactory} that names threads
-   */
-  public static ThreadFactory getNamedThreadFactory(final String prefix) {
-    SecurityManager s = System.getSecurityManager();
-    final ThreadGroup threadGroup = (s != null)
-        ? s.getThreadGroup()
-        : Thread.currentThread().getThreadGroup();
-
-    return new ThreadFactory() {
-      private final AtomicInteger threadNumber = new AtomicInteger(1);
-      private final int poolNum = poolNumber.getAndIncrement();
-      private final ThreadGroup group = threadGroup;
-
-      @Override
-      public Thread newThread(Runnable r) {
-        final String name = String.format("%s-pool%03d-t%04d",
-            prefix, poolNum, threadNumber.getAndIncrement());
-        return new Thread(group, r, name);
-      }
-    };
-  }
-
-  /**
-   * Get a named {@link ThreadFactory} that just builds daemon threads.
-   * @param prefix name prefix for all threads created from the factory
-   * @return a thread factory that creates named, daemon threads with
-   *         the supplied exception handler and normal priority
-   */
-  private static ThreadFactory newDaemonThreadFactory(final String prefix) {
-    final ThreadFactory namedFactory = getNamedThreadFactory(prefix);
-    return new ThreadFactory() {
-      @Override
-      public Thread newThread(Runnable r) {
-        Thread t = namedFactory.newThread(r);
-        if (!t.isDaemon()) {
-          t.setDaemon(true);
-        }
-        if (t.getPriority() != Thread.NORM_PRIORITY) {
-          t.setPriority(Thread.NORM_PRIORITY);
-        }
-        return t;
-      }
-
-    };
-  }
-
   /** Called after a new FileSystem instance is constructed.
    * @param name a uri whose authority section names the host, port, etc.
    *   for this FileSystem
@@ -258,27 +206,27 @@ public class S3AFileSystem extends FileSystem {
                     }
                   });
 
-      int maxThreads = intOption(conf, MAX_THREADS, DEFAULT_MAX_THREADS, 0);
-      int coreThreads = intOption(conf, CORE_THREADS, DEFAULT_CORE_THREADS, 0);
-      if (maxThreads == 0) {
-        maxThreads = Runtime.getRuntime().availableProcessors() * 8;
+      if (conf.get("fs.s3a.threads.core") != null &&
+          warnedOfCoreThreadDeprecation.compareAndSet(false, true)) {
+        LoggerFactory.getLogger(
+            "org.apache.hadoop.conf.Configuration.deprecation")
+            .warn("Unsupported option \"fs.s3a.threads.core\"" +
+                " will be ignored {}", conf.get("fs.s3a.threads.core"));
+      }
+      int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
+      if (maxThreads < 2) {
+        LOG.warn(MAX_THREADS + " must be at least 2: forcing to 2.");
+        maxThreads = 2;
       }
-      if (coreThreads == 0) {
-        coreThreads = Runtime.getRuntime().availableProcessors() * 8;
+      int totalTasks = conf.getInt(MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS);
+      if (totalTasks < 1) {
+        LOG.warn(MAX_TOTAL_TASKS + "must be at least 1: forcing to 1.");
+        totalTasks = 1;
       }
-      long keepAliveTime = longOption(conf, KEEPALIVE_TIME,
-          DEFAULT_KEEPALIVE_TIME, 0);
-      LinkedBlockingQueue<Runnable> workQueue =
-          new LinkedBlockingQueue<>(maxThreads *
-              intOption(conf, MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS, 1));
-      threadPoolExecutor = new ThreadPoolExecutor(
-          coreThreads,
-          maxThreads,
-          keepAliveTime,
-          TimeUnit.SECONDS,
-          workQueue,
-          newDaemonThreadFactory("s3a-transfer-shared-"));
-      threadPoolExecutor.allowCoreThreadTimeOut(true);
+      long keepAliveTime = conf.getLong(KEEPALIVE_TIME, DEFAULT_KEEPALIVE_TIME);
+      threadPoolExecutor = new BlockingThreadPoolExecutorService(maxThreads,
+          maxThreads + totalTasks, keepAliveTime, TimeUnit.SECONDS,
+          "s3a-transfer-shared");
 
       initTransferManager();
 

+ 2 - 8
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/index.md

@@ -495,17 +495,11 @@ this capability.
 
     <property>
       <name>fs.s3a.threads.max</name>
-      <value>256</value>
+      <value>10</value>
       <description> Maximum number of concurrent active (part)uploads,
       which each use a thread from the threadpool.</description>
     </property>
 
-    <property>
-      <name>fs.s3a.threads.core</name>
-      <value>15</value>
-      <description>Number of core threads in the threadpool.</description>
-    </property>
-
     <property>
       <name>fs.s3a.threads.keepalivetime</name>
       <value>60</value>
@@ -515,7 +509,7 @@ this capability.
 
     <property>
       <name>fs.s3a.max.total.tasks</name>
-      <value>1000</value>
+      <value>5</value>
       <description>Number of (part)uploads allowed to the queue before
       blocking additional uploads.</description>
     </property>

+ 182 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestBlockingThreadPoolExecutorService.java

@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.hadoop.util.StopWatch;
+import org.junit.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+
+/**
+ * Basic unit test for S3A's blocking executor service.
+ */
+public class TestBlockingThreadPoolExecutorService {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      BlockingThreadPoolExecutorService.class);
+
+  private static final int NUM_ACTIVE_TASKS = 4;
+  private static final int NUM_WAITING_TASKS = 2;
+  private static final int TASK_SLEEP_MSEC = 100;
+  private static final int SHUTDOWN_WAIT_MSEC = 200;
+  private static final int SHUTDOWN_WAIT_TRIES = 5;
+  private static final int BLOCKING_THRESHOLD_MSEC = 50;
+
+  private static final Integer SOME_VALUE = 1337;
+
+  private static BlockingThreadPoolExecutorService tpe = null;
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    ensureDestroyed();
+  }
+
+  /**
+   * Basic test of running one trivial task.
+   */
+  @Test
+  public void testSubmitCallable() throws Exception {
+    ensureCreated();
+    ListenableFuture<Integer> f = tpe.submit(callableSleeper);
+    Integer v = f.get();
+    assertEquals(SOME_VALUE, v);
+  }
+
+  /**
+   * More involved test, including detecting blocking when at capacity.
+   */
+  @Test
+  public void testSubmitRunnable() throws Exception {
+    ensureCreated();
+    int totalTasks = NUM_ACTIVE_TASKS + NUM_WAITING_TASKS;
+    StopWatch stopWatch = new StopWatch().start();
+    for (int i = 0; i < totalTasks; i++) {
+      tpe.submit(sleeper);
+      assertDidntBlock(stopWatch);
+    }
+    tpe.submit(sleeper);
+    assertDidBlock(stopWatch);
+  }
+
+  @Test
+  public void testShutdown() throws Exception {
+    // Cover create / destroy, regardless of when this test case runs
+    ensureCreated();
+    ensureDestroyed();
+
+    // Cover create, execute, destroy, regardless of when test case runs
+    ensureCreated();
+    testSubmitRunnable();
+    ensureDestroyed();
+  }
+
+  // Helper functions, etc.
+
+  private void assertDidntBlock(StopWatch sw) {
+    try {
+      assertFalse("Non-blocking call took too long.",
+          sw.now(TimeUnit.MILLISECONDS) > BLOCKING_THRESHOLD_MSEC);
+    } finally {
+      sw.reset().start();
+    }
+  }
+
+  private void assertDidBlock(StopWatch sw) {
+    try {
+      if (sw.now(TimeUnit.MILLISECONDS) < BLOCKING_THRESHOLD_MSEC) {
+        throw new RuntimeException("Blocking call returned too fast.");
+      }
+    } finally {
+      sw.reset().start();
+    }
+  }
+
+  private Runnable sleeper = new Runnable() {
+    @Override
+    public void run() {
+      String name = Thread.currentThread().getName();
+      try {
+        Thread.sleep(TASK_SLEEP_MSEC);
+      } catch (InterruptedException e) {
+        LOG.info("Thread {} interrupted.", name);
+        Thread.currentThread().interrupt();
+      }
+    }
+  };
+
+  private Callable<Integer> callableSleeper = new Callable<Integer>() {
+    @Override
+    public Integer call() throws Exception {
+      sleeper.run();
+      return SOME_VALUE;
+    }
+  };
+
+  /**
+   * Helper function to create thread pool under test.
+   */
+  private static void ensureCreated() throws Exception {
+    if (tpe == null) {
+      LOG.debug("Creating thread pool");
+      tpe = new BlockingThreadPoolExecutorService(NUM_ACTIVE_TASKS,
+          NUM_WAITING_TASKS, 1, TimeUnit.SECONDS, "btpetest");
+    }
+  }
+
+  /**
+   * Helper function to terminate thread pool under test, asserting that
+   * shutdown -> terminate works as expected.
+   */
+  private static void ensureDestroyed() throws Exception {
+    if (tpe == null) {
+      return;
+    }
+    int shutdownTries = SHUTDOWN_WAIT_TRIES;
+
+    tpe.shutdown();
+    if (!tpe.isShutdown()) {
+      throw new RuntimeException("Shutdown had no effect.");
+    }
+
+    while (!tpe.awaitTermination(SHUTDOWN_WAIT_MSEC,
+        TimeUnit.MILLISECONDS)) {
+      LOG.info("Waiting for thread pool shutdown.");
+      if (shutdownTries-- <= 0) {
+        LOG.error("Failed to terminate thread pool gracefully.");
+        break;
+      }
+    }
+    if (!tpe.isTerminated()) {
+      tpe.shutdownNow();
+      if (!tpe.awaitTermination(SHUTDOWN_WAIT_MSEC,
+          TimeUnit.MILLISECONDS)) {
+        throw new RuntimeException(
+            "Failed to terminate thread pool in timely manner.");
+      }
+    }
+    tpe = null;
+  }
+}

+ 80 - 0
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3ABlockingThreadPool.java

@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+
+/**
+ * Demonstrate that the threadpool blocks additional client requests if
+ * its queue is full (rather than throwing an exception) by initiating an
+ * upload consisting of 4 parts with 2 threads and 1 spot in the queue. The
+ * 4th part should not trigger an exception as it would with a
+ * non-blocking threadpool.
+ */
+public class TestS3ABlockingThreadPool {
+
+  private Configuration conf;
+  private S3AFileSystem fs;
+
+  @Rule
+  public Timeout testTimeout = new Timeout(30 * 60 * 1000);
+
+  protected Path getTestPath() {
+    return new Path("/tests3a");
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+    conf.setLong(Constants.MIN_MULTIPART_THRESHOLD, 5 * 1024 * 1024);
+    conf.setLong(Constants.MULTIPART_SIZE, 5 * 1024 * 1024);
+    conf.setInt(Constants.MAX_THREADS, 2);
+    conf.setInt(Constants.MAX_TOTAL_TASKS, 1);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (fs != null) {
+      fs.delete(getTestPath(), true);
+    }
+  }
+
+  @Test
+  public void testRegularMultiPartUpload() throws Exception {
+    fs = S3ATestUtils.createTestFileSystem(conf);
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 16 * 1024 *
+        1024);
+  }
+
+  @Test
+  public void testFastMultiPartUpload() throws Exception {
+    conf.setBoolean(Constants.FAST_UPLOAD, true);
+    fs = S3ATestUtils.createTestFileSystem(conf);
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(), 16 * 1024 *
+        1024);
+
+  }
+}