Browse Source

HADOOP-15729. [s3a] Allow core threads to time out. (#1075)

Sean Mackrory 5 years ago
parent
commit
5672efa5c7

+ 2 - 2
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

@@ -42,7 +42,6 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -213,7 +212,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   private boolean enableMultiObjectsDelete;
   private TransferManager transfers;
   private ListeningExecutorService boundedThreadPool;
-  private ExecutorService unboundedThreadPool;
+  private ThreadPoolExecutor unboundedThreadPool;
   private int executorCapacity;
   private long multiPartThreshold;
   public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
@@ -440,6 +439,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
         new LinkedBlockingQueue<>(),
         BlockingThreadPoolExecutorService.newDaemonThreadFactory(
             "s3a-transfer-unbounded"));
+    unboundedThreadPool.allowCoreThreadTimeOut(true);
     executorCapacity = intOption(conf,
         EXECUTOR_CAPACITY, DEFAULT_EXECUTOR_CAPACITY, 1);
   }

+ 110 - 65
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3AConcurrentOps.java

@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.contract.ContractTestUtils;
 import org.apache.hadoop.fs.contract.ContractTestUtils.NanoTimer;
+import org.apache.hadoop.fs.s3a.Constants;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
 
@@ -52,10 +53,8 @@ public class ITestS3AConcurrentOps extends S3AScaleTestBase {
   private static final Logger LOG = LoggerFactory.getLogger(
       ITestS3AConcurrentOps.class);
   private final int concurrentRenames = 10;
+  private final int fileSize = 1024 * 1024;
   private Path testRoot;
-  private Path[] source = new Path[concurrentRenames];
-  private Path[] target = new Path[concurrentRenames];
-  private S3AFileSystem fs;
   private S3AFileSystem auxFs;
 
   @Override
@@ -66,20 +65,44 @@ public class ITestS3AConcurrentOps extends S3AScaleTestBase {
   @Override
   public void setup() throws Exception {
     super.setup();
-    fs = getRestrictedFileSystem();
     auxFs = getNormalFileSystem();
 
     testRoot = path("/ITestS3AConcurrentOps");
     testRoot = S3ATestUtils.createTestPath(testRoot);
+  }
+
+  private S3AFileSystem getNormalFileSystem() throws Exception {
+    S3AFileSystem s3a = new S3AFileSystem();
+    Configuration conf = createScaleConfiguration();
+    URI rootURI = new URI(conf.get(TEST_FS_S3A_NAME));
+    s3a.initialize(rootURI, conf);
+    return s3a;
+  }
+
+  @After
+  public void teardown() throws Exception {
+    super.teardown();
+    if (auxFs != null) {
+      auxFs.delete(testRoot, true);
+      auxFs.close();
+    }
+  }
+
+  private void parallelRenames(int concurrentRenames, final S3AFileSystem fs,
+      String sourceNameBase, String targetNameBase) throws ExecutionException,
+      InterruptedException, IOException {
+
+    Path[] source = new Path[concurrentRenames];
+    Path[] target = new Path[concurrentRenames];
 
     for (int i = 0; i < concurrentRenames; i++){
-      source[i] = new Path(testRoot, "source" + i);
-      target[i] = new Path(testRoot, "target" + i);
+      source[i] = new Path(testRoot, sourceNameBase + i);
+      target[i] = new Path(testRoot, targetNameBase + i);
     }
 
     LOG.info("Generating data...");
     auxFs.mkdirs(testRoot);
-    byte[] zeroes = ContractTestUtils.dataset(1024*1024, 0, Integer.MAX_VALUE);
+    byte[] zeroes = ContractTestUtils.dataset(fileSize, 0, Integer.MAX_VALUE);
     for (Path aSource : source) {
       try(FSDataOutputStream out = auxFs.create(aSource)) {
         for (int mb = 0; mb < 20; mb++) {
@@ -89,38 +112,48 @@ public class ITestS3AConcurrentOps extends S3AScaleTestBase {
       }
     }
     LOG.info("Data generated...");
-  }
-
-  private S3AFileSystem getRestrictedFileSystem() throws Exception {
-    Configuration conf = getConfiguration();
-    conf.setInt(MAX_THREADS, 2);
-    conf.setInt(MAX_TOTAL_TASKS, 1);
 
-    conf.set(MIN_MULTIPART_THRESHOLD, "10M");
-    conf.set(MULTIPART_SIZE, "5M");
-
-    S3AFileSystem s3a = getFileSystem();
-    URI rootURI = new URI(conf.get(TEST_FS_S3A_NAME));
-    s3a.initialize(rootURI, conf);
-    return s3a;
-  }
-
-  private S3AFileSystem getNormalFileSystem() throws Exception {
-    S3AFileSystem s3a = new S3AFileSystem();
-    Configuration conf = createScaleConfiguration();
-    URI rootURI = new URI(conf.get(TEST_FS_S3A_NAME));
-    s3a.initialize(rootURI, conf);
-    return s3a;
-  }
+    ExecutorService executor = Executors.newFixedThreadPool(
+        concurrentRenames, new ThreadFactory() {
+          private AtomicInteger count = new AtomicInteger(0);
 
-  @After
-  public void teardown() throws Exception {
-    super.teardown();
-    if (auxFs != null) {
-      auxFs.delete(testRoot, true);
+          public Thread newThread(Runnable r) {
+            return new Thread(r,
+                "testParallelRename" + count.getAndIncrement());
+          }
+        });
+    try {
+      ((ThreadPoolExecutor)executor).prestartAllCoreThreads();
+      Future<Boolean>[] futures = new Future[concurrentRenames];
+      for (int i = 0; i < concurrentRenames; i++) {
+        final int index = i;
+        futures[i] = executor.submit(new Callable<Boolean>() {
+          @Override
+          public Boolean call() throws Exception {
+            NanoTimer timer = new NanoTimer();
+            boolean result = fs.rename(source[index], target[index]);
+            timer.end("parallel rename %d", index);
+            LOG.info("Rename {} ran from {} to {}", index,
+                timer.getStartTime(), timer.getEndTime());
+            return result;
+          }
+        });
+      }
+      LOG.info("Waiting for tasks to complete...");
+      LOG.info("Deadlock may have occurred if nothing else is logged" +
+          " or the test times out");
+      for (int i = 0; i < concurrentRenames; i++) {
+        assertTrue("No future " + i, futures[i].get());
+        assertPathExists("target path", target[i]);
+        assertPathDoesNotExist("source path", source[i]);
+      }
+      LOG.info("All tasks have completed successfully");
+    } finally {
+      executor.shutdown();
     }
   }
 
+
   /**
    * Attempts to trigger a deadlock that would happen if any bounded resource
    * pool became saturated with control tasks that depended on other tasks
@@ -130,39 +163,51 @@ public class ITestS3AConcurrentOps extends S3AScaleTestBase {
   @SuppressWarnings("unchecked")
   public void testParallelRename() throws InterruptedException,
       ExecutionException, IOException {
-    ExecutorService executor = Executors.newFixedThreadPool(
-        concurrentRenames, new ThreadFactory() {
-          private AtomicInteger count = new AtomicInteger(0);
 
-          public Thread newThread(Runnable r) {
-            return new Thread(r,
-                "testParallelRename" + count.getAndIncrement());
-          }
-        });
-    ((ThreadPoolExecutor)executor).prestartAllCoreThreads();
-    Future<Boolean>[] futures = new Future[concurrentRenames];
-    for (int i = 0; i < concurrentRenames; i++) {
-      final int index = i;
-      futures[i] = executor.submit(new Callable<Boolean>() {
-        @Override
-        public Boolean call() throws Exception {
-          NanoTimer timer = new NanoTimer();
-          boolean result = fs.rename(source[index], target[index]);
-          timer.end("parallel rename %d", index);
-          LOG.info("Rename {} ran from {} to {}", index,
-              timer.getStartTime(), timer.getEndTime());
-          return result;
-        }
-      });
+    Configuration conf = getConfiguration();
+    conf.setInt(MAX_THREADS, 2);
+    conf.setInt(MAX_TOTAL_TASKS, 1);
+
+    conf.set(MIN_MULTIPART_THRESHOLD, "10K");
+    conf.set(MULTIPART_SIZE, "5K");
+
+    try (S3AFileSystem tinyThreadPoolFs = new S3AFileSystem()) {
+      tinyThreadPoolFs.initialize(auxFs.getUri(), conf);
+
+      parallelRenames(concurrentRenames, tinyThreadPoolFs,
+          "testParallelRename-source", "testParallelRename-target");
     }
-    LOG.info("Waiting for tasks to complete...");
-    LOG.info("Deadlock may have occurred if nothing else is logged" +
-        " or the test times out");
-    for (int i = 0; i < concurrentRenames; i++) {
-      assertTrue("No future " + i, futures[i].get());
-      assertPathExists("target path", target[i]);
-      assertPathDoesNotExist("source path", source[i]);
+  }
+
+  @Test
+  public void testThreadPoolCoolDown() throws InterruptedException,
+      ExecutionException, IOException {
+
+    int hotThreads = 0;
+    int coldThreads = 0;
+
+    parallelRenames(concurrentRenames, auxFs,
+        "testThreadPoolCoolDown-source", "testThreadPoolCoolDown-target");
+
+    for (Thread t : Thread.getAllStackTraces().keySet()) {
+      if (t.getName().startsWith("s3a-transfer")) {
+        hotThreads++;
+      }
+    }
+
+    int timeoutMs = Constants.DEFAULT_KEEPALIVE_TIME * 1000;
+    Thread.sleep((int)(1.1 * timeoutMs));
+
+    for (Thread t : Thread.getAllStackTraces().keySet()) {
+      if (t.getName().startsWith("s3a-transfer")) {
+        coldThreads++;
+      }
     }
-    LOG.info("All tasks have completed successfully");
+
+    assertNotEquals("Failed to find threads in active FS - test is flawed",
+        hotThreads, 0);
+    assertTrue("s3a-transfer threads went from " + hotThreads + " to " +
+        coldThreads + ", should have gone to 0", 0 == coldThreads);
+
   }
 }