Browse Source

[ZOOKEEPER-2368] Send a watch event is when a client is closed

Currently, if the client is closed (rather than being remotely disconnected) there is no notification to the watcher. This means that asynchronous clients can end up waiting indefinitely for events that will never come. Watchers need to be aware that the client is closed for good.

Signed-off-by: Tim Ward <timothyjwardapache.org>

Note that this is a variation on a patch I produced some time ago, which was broadly accepted as a good idea, and didn't cause any problems for Curator, but was deemed by some to be too risky because it reused an existing KeeperState. This patch is therefore updated to use a new `Closed` KeeperState. Fixing this would allow me to avoid maintaining a separate fork of Zookeeper just to support this one feature!

Author: Tim Ward <timothyjward@apache.org>

Reviewers: Andor Molnar <andor@apache.org>

Closes #529 from timothyjward/ZOOKEEPER-2368 and squashes the following commits:

d7196d19 [Tim Ward] Review comments from @anmolnar
088056b4 [Tim Ward] Review comments from @enixon
7fad1d36 [Tim Ward] [ZOOKEEPER-2368] Send a watch event is when a client is closed

(cherry picked from commit 6748a0e3f58f2a398dec4c6988bc70ea4363b807)
Signed-off-by: Andor Molnar <andor@cloudera.com>
Tim Ward 7 years ago
parent
commit
2a951868bd

+ 7 - 0
src/java/main/org/apache/zookeeper/ClientCnxn.java

@@ -1264,6 +1264,8 @@ public class ClientCnxn {
                 eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
                         Event.KeeperState.Disconnected, null));
             }
+            eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
+                        Event.KeeperState.Closed, null));
             ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
                     "SendThread exited loop for session: 0x"
                            + Long.toHexString(getSessionId()));
@@ -1438,6 +1440,11 @@ public class ClientCnxn {
         }
 
         sendThread.close();
+        try {
+            sendThread.join();
+        } catch (InterruptedException ex) {
+            LOG.warn("Got interrupted while waiting for the sender thread to close", ex);
+        }
         eventThread.queueEventOfDeath();
         if (zooKeeperSaslClient != null) {
             zooKeeperSaslClient.shutdown();

+ 9 - 1
src/java/main/org/apache/zookeeper/Watcher.java

@@ -84,7 +84,14 @@ public interface Watcher {
              * client connection (the session) is no longer valid. You must
              * create a new client connection (instantiate a new ZooKeeper
              * instance) if you with to access the ensemble. */
-            Expired (-112);
+            Expired (-112),
+            
+            /** 
+             * The client has been closed. This state is never generated by
+             * the server, but is generated locally when a client calls
+             * {@link ZooKeeper#close()} or {@link ZooKeeper#close(int)}
+             */
+            Closed (7);
 
             private final int intValue;     // Integer representation of value
                                             // for sending over wire
@@ -107,6 +114,7 @@ public interface Watcher {
                     case    5: return KeeperState.ConnectedReadOnly;
                     case    6: return KeeperState.SaslAuthenticated;
                     case -112: return KeeperState.Expired;
+                    case   7: return KeeperState.Closed;
 
                     default:
                         throw new RuntimeException("Invalid integer value for conversion to KeeperState");

+ 53 - 0
src/java/test/org/apache/zookeeper/test/WatcherTest.java

@@ -19,6 +19,7 @@
 package org.apache.zookeeper.test;
 
 import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -32,6 +33,7 @@ import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.AsyncCallback.StatCallback;
 import org.apache.zookeeper.AsyncCallback.VoidCallback;
+import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.Watcher.Event;
 import org.apache.zookeeper.Watcher.Event.EventType;
 import org.apache.zookeeper.ZooDefs.Ids;
@@ -140,6 +142,57 @@ public class WatcherTest extends ClientBase {
             }
         }
     }
+    
+    @Test
+    public void testWatcherDisconnectOnClose() 
+        throws IOException, InterruptedException, KeeperException 
+    {
+        ZooKeeper zk = null;
+        try {
+            final BlockingQueue<WatchedEvent> queue = new LinkedBlockingQueue<>();
+            
+            MyWatcher connWatcher = new MyWatcher();
+            
+            Watcher watcher = new Watcher(){
+                @Override
+                public void process(WatchedEvent event) {
+                    try {
+                        queue.put(event);
+                    } catch (InterruptedException e) {
+                        // Oh well, never mind
+                    }
+                }
+                
+            };
+            
+            zk = createClient(connWatcher, hostPort);
+    
+            StatCallback scb = new StatCallback() {
+                public void processResult(int rc, String path, Object ctx,
+                        Stat stat) {
+                    // don't do anything
+                }
+            };
+            
+            // Register a watch on the node
+            zk.exists("/missing", watcher, scb, null);
+            
+            // Close the client without changing the node
+            zk.close();
+            
+            
+            WatchedEvent event = queue.poll(10, TimeUnit.SECONDS);
+            
+            Assert.assertNotNull("No watch event was received after closing the Zookeeper client. A 'Closed' event should have occurred", event);
+            Assert.assertEquals("Closed events are not generated by the server, and so should have a type of 'None'", Event.EventType.None, event.getType());
+            Assert.assertEquals("A 'Closed' event was expected as the Zookeeper client was closed without altering the node it was watching", Event.KeeperState.Closed, event.getState());
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+
+    }
 
     @Test
     public void testWatcherCount()