Bladeren bron

HDFS-11131. TestThrottledAsyncChecker#testCancellation is flaky.

Arpit Agarwal 8 jaren geleden
bovenliggende
commit
75e60b6bb7

+ 9 - 16
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java

@@ -187,28 +187,21 @@ public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> {
 
   /**
    * {@inheritDoc}.
+   *
+   * The results of in-progress checks are not useful during shutdown,
+   * so we optimize for faster shutdown by interrupt all actively
+   * executing checks.
    */
   @Override
   public void shutdownAndWait(long timeout, TimeUnit timeUnit)
       throws InterruptedException {
-    // Try orderly shutdown.
-    executorService.shutdown();
-
-    if (!executorService.awaitTermination(timeout, timeUnit)) {
-      // Interrupt executing tasks and wait again.
-      executorService.shutdownNow();
-      executorService.awaitTermination(timeout, timeUnit);
-    }
     if (scheduledExecutorService != null) {
-      // Try orderly shutdown
-      scheduledExecutorService.shutdown();
-
-      if (!scheduledExecutorService.awaitTermination(timeout, timeUnit)) {
-        // Interrupt executing tasks and wait again.
-        scheduledExecutorService.shutdownNow();
-        scheduledExecutorService.awaitTermination(timeout, timeUnit);
-      }
+      scheduledExecutorService.shutdownNow();
+      scheduledExecutorService.awaitTermination(timeout, timeUnit);
     }
+
+    executorService.shutdownNow();
+    executorService.awaitTermination(timeout, timeUnit);
   }
 
   /**

+ 34 - 84
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java

@@ -20,8 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode.checker;
 
 import com.google.common.base.Optional;
 import com.google.common.base.Supplier;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.FakeTimer;
@@ -29,12 +27,9 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import static org.junit.Assert.assertFalse;
@@ -93,35 +88,9 @@ public class TestThrottledAsyncChecker {
     waitTestCheckableCheckCount(target2, 2L);
   }
 
-  @Test (timeout=60000)
-  public void testCancellation() throws Exception {
-    LatchedCheckable target = new LatchedCheckable();
-    final FakeTimer timer = new FakeTimer();
-    final LatchedCallback callback = new LatchedCallback(target);
-    ThrottledAsyncChecker<Boolean, Boolean> checker =
-        new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, 0,
-            getExecutorService());
-
-    Optional<ListenableFuture<Boolean>> olf =
-        checker.schedule(target, true);
-    if (olf.isPresent()) {
-      Futures.addCallback(olf.get(), callback);
-    }
-
-    // Request immediate cancellation.
-    checker.shutdownAndWait(0, TimeUnit.MILLISECONDS);
-    try {
-      assertFalse(olf.get().get());
-      fail("Failed to get expected InterruptedException");
-    } catch (ExecutionException ee) {
-      assertTrue(ee.getCause() instanceof InterruptedException);
-    }
-    callback.failureLatch.await();
-  }
-
   @Test (timeout=60000)
   public void testConcurrentChecks() throws Exception {
-    final LatchedCheckable target = new LatchedCheckable();
+    final StalledCheckable target = new StalledCheckable();
     final FakeTimer timer = new FakeTimer();
     final ThrottledAsyncChecker<Boolean, Boolean> checker =
         new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, 0,
@@ -136,25 +105,6 @@ public class TestThrottledAsyncChecker {
     // for the first caller.
     assertTrue(olf1.isPresent());
     assertFalse(olf2.isPresent());
-
-    // Unblock the latch and wait for it to finish execution.
-    target.latch.countDown();
-    olf1.get().get();
-
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        // We should get an absent Optional.
-        // This can take a short while until the internal callback in
-        // ThrottledAsyncChecker is scheduled for execution.
-        // Also this should not trigger a new check operation as the timer
-        // was not advanced. If it does trigger a new check then the test
-        // will fail with a timeout.
-        final Optional<ListenableFuture<Boolean>> olf3 =
-            checker.schedule(target, true);
-        return !olf3.isPresent();
-      }
-    }, 100, 10000);
   }
 
   /**
@@ -192,6 +142,32 @@ public class TestThrottledAsyncChecker {
       }
     }, 100, 10000);
   }
+
+  /**
+   * Ensure that an exception thrown by the check routine is
+   * propagated.
+   *
+   * @throws Exception
+   */
+  @Test(timeout=60000)
+  public void testExceptionIsPropagated() throws Exception {
+    final ThrowingCheckable target = new ThrowingCheckable();
+    final FakeTimer timer = new FakeTimer();
+    ThrottledAsyncChecker<Boolean, Boolean> checker =
+        new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, 0,
+            getExecutorService());
+
+    final Optional<ListenableFuture<Boolean>> olf =
+        checker.schedule(target, true);
+    assertTrue(olf.isPresent());
+    try {
+      olf.get().get();
+      fail("Failed to get expected ExecutionException");
+    } catch(ExecutionException ee) {
+      assertTrue(ee.getCause() instanceof DummyException);
+    }
+  }
+
   /**
    * Ensure that the exception from a failed check is cached
    * and returned without re-running the check when the minimum
@@ -246,6 +222,9 @@ public class TestThrottledAsyncChecker {
     }
   }
 
+  /**
+   * A Checkable that throws an exception when checked.
+   */
   private static class ThrowingCheckable
       extends TestCheckableBase {
     @Override
@@ -259,43 +238,14 @@ public class TestThrottledAsyncChecker {
   }
 
   /**
-   * A checkable that hangs until signaled.
+   * A checkable that hangs forever when checked.
    */
-  private static class LatchedCheckable
+  private static class StalledCheckable
       implements Checkable<Boolean, Boolean> {
-    private final CountDownLatch latch = new CountDownLatch(1);
-
     @Override
     public Boolean check(Boolean ignored) throws InterruptedException {
-      LOG.info("LatchedCheckable {} waiting.", this);
-      latch.await();
-      return true;  // Unreachable.
-    }
-  }
-
-  /**
-   * A {@link FutureCallback} that counts its invocations.
-   */
-  private static final class LatchedCallback
-      implements FutureCallback<Boolean> {
-    private final CountDownLatch successLatch = new CountDownLatch(1);
-    private final CountDownLatch failureLatch = new CountDownLatch(1);
-    private final Checkable target;
-
-    private LatchedCallback(Checkable target) {
-      this.target = target;
-    }
-
-    @Override
-    public void onSuccess(@Nonnull Boolean result) {
-      LOG.info("onSuccess callback invoked for {}", target);
-      successLatch.countDown();
-    }
-
-    @Override
-    public void onFailure(@Nonnull Throwable t) {
-      LOG.info("onFailure callback invoked for {} with exception", target, t);
-      failureLatch.countDown();
+      Thread.sleep(Long.MAX_VALUE);
+      return false; // Unreachable.
     }
   }
 }