فهرست منبع

HADOOP-17040. Fix intermittent failure of ITestBlockingThreadPoolExecutorService. (#2020)

(cherry picked from commit 968531463375ebf29ba3186c13b5f8685df10d25)
Masatake Iwasaki 5 سال پیش
والد
کامیت
77e69a73da

+ 24 - 13
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestBlockingThreadPoolExecutorService.java

@@ -31,11 +31,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 
 /**
  * Basic test for S3A's blocking executor service.
@@ -92,11 +92,12 @@ public class ITestBlockingThreadPoolExecutorService {
    */
   protected void verifyQueueSize(ExecutorService executorService,
       int expectedQueueSize) {
-    StopWatch stopWatch = new StopWatch().start();
+    CountDownLatch latch = new CountDownLatch(1);
     for (int i = 0; i < expectedQueueSize; i++) {
-      executorService.submit(sleeper);
-      assertDidntBlock(stopWatch);
+      executorService.submit(new LatchedSleeper(latch));
     }
+    StopWatch stopWatch = new StopWatch().start();
+    latch.countDown();
     executorService.submit(sleeper);
     assertDidBlock(stopWatch);
   }
@@ -124,15 +125,6 @@ public class ITestBlockingThreadPoolExecutorService {
 
   // Helper functions, etc.
 
-  private void assertDidntBlock(StopWatch sw) {
-    try {
-      assertFalse("Non-blocking call took too long.",
-          sw.now(TimeUnit.MILLISECONDS) > BLOCKING_THRESHOLD_MSEC);
-    } finally {
-      sw.reset().start();
-    }
-  }
-
   private void assertDidBlock(StopWatch sw) {
     try {
       if (sw.now(TimeUnit.MILLISECONDS) < BLOCKING_THRESHOLD_MSEC) {
@@ -164,6 +156,25 @@ public class ITestBlockingThreadPoolExecutorService {
     }
   };
 
+  private class LatchedSleeper implements Runnable {
+    private final CountDownLatch latch;
+
+    LatchedSleeper(CountDownLatch latch) {
+      this.latch = latch;
+    }
+
+    @Override
+    public void run() {
+      try {
+        latch.await();
+        Thread.sleep(TASK_SLEEP_MSEC);
+      } catch (InterruptedException e) {
+        LOG.info("Thread {} interrupted.", Thread.currentThread().getName());
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
   /**
    * Helper function to create thread pool under test.
    */