Pārlūkot izejas kodu

ZOOKEEPER-3183: Interrupting the WatcherCleaner thread during shutdown

…are waiting list to add watchers during the shutdown  and avoid adding the dead watchers when shut down is initiated

Author: vtumati <vtumati@paypal.com>

Reviewers: fangmin@apache.org, andor@apache.org

Closes #689 from tumativ/ZOOKEEPER-3183
vtumati 6 gadi atpakaļ
vecāks
revīzija
061e76123e

+ 11 - 5
zookeeper-server/src/main/java/org/apache/zookeeper/server/watch/WatcherCleaner.java

@@ -50,6 +50,7 @@ public class WatcherCleaner extends Thread {
 
     private volatile boolean stopped = false;
     private final Object cleanEvent = new Object();
+    private final Object processingCompletedEvent = new Object();
     private final Random r = new Random(System.nanoTime());
     private final WorkerService cleaners;
 
@@ -102,12 +103,13 @@ public class WatcherCleaner extends Thread {
                 totalDeadWatchers.get() >= maxInProcessingDeadWatchers) {
             try {
                 RATE_LOGGER.rateLimitLog("Waiting for dead watchers cleaning");
-                synchronized(totalDeadWatchers) {
-                    totalDeadWatchers.wait(100);
+                synchronized(processingCompletedEvent) {
+                    processingCompletedEvent.wait(100);
                 }
             } catch (InterruptedException e) {
                 LOG.info("Got interrupted while waiting for dead watches " +
                         "queue size");
+                break;
             }
         }
         synchronized (this) {
@@ -129,7 +131,7 @@ public class WatcherCleaner extends Thread {
                 try {
                     // add some jitter to avoid cleaning dead watchers at the
                     // same time in the quorum
-                    if (deadWatchers.size() < watcherCleanThreshold) {
+                    if (!stopped && deadWatchers.size() < watcherCleanThreshold) {
                         int maxWaitMs = (watcherCleanIntervalInSeconds +
                             r.nextInt(watcherCleanIntervalInSeconds / 2 + 1)) * 1000;
                         cleanEvent.wait(maxWaitMs);
@@ -163,8 +165,8 @@ public class WatcherCleaner extends Thread {
                         long latency = Time.currentElapsedTime() - startTime;
                         LOG.info("Takes {} to process {} watches", latency, total);
                         totalDeadWatchers.addAndGet(-total);
-                        synchronized(totalDeadWatchers) {
-                            totalDeadWatchers.notifyAll();
+                        synchronized(processingCompletedEvent) {
+                            processingCompletedEvent.notifyAll();
                         }
                     }
                 });
@@ -177,6 +179,10 @@ public class WatcherCleaner extends Thread {
         stopped = true;
         deadWatchers.clear();
         cleaners.stop();
+        this.interrupt();
+        if (LOG.isInfoEnabled()) {
+            LOG.info("WatcherCleaner thread shutdown is initiated");
+        }
     }
 
 }