瀏覽代碼

ZOOKEEPER-3059: EventThread leak in case of Sasl AuthFailed

Since authFailed is similar to session expired and is considered a fatal event, we should clean up after ourselves once we get a AuthFailed, other wise this results in an unavoidable and un-cleanable thread leak  of EventThread since the close operation is also a no-op (we return after checking for isAlive).

Author: Abhishek Singh Chouhan <abhishekchouhan121@gmail.com>

Reviewers: Andor Molnar <andor@apache.org>

Closes #541 from abhishek-chouhan/master and squashes the following commits:

c54a83a4 [Abhishek Singh Chouhan] ZOOKEEPER-3059 EventThread leak in case of Sasl AuthFailed. Adding testcase for the scenario
c1d9d7af [Abhishek Singh Chouhan] ZOOKEEPER-3059 EventThread leak in case of Sasl AuthFailed
Abhishek Singh Chouhan 7 年之前
父節點
當前提交
1fb644662b

+ 5 - 1
src/java/main/org/apache/zookeeper/ClientCnxn.java

@@ -820,7 +820,8 @@ public class ClientCnxn {
                 if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
                 if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
                     state = States.AUTH_FAILED;                    
                     state = States.AUTH_FAILED;                    
                     eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, 
                     eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, 
-                            Watcher.Event.KeeperState.AuthFailed, null) );            		            		
+                            Watcher.Event.KeeperState.AuthFailed, null) );
+                    eventThread.queueEventOfDeath();
                 }
                 }
                 if (LOG.isDebugEnabled()) {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Got auth sessionid:0x"
                     LOG.debug("Got auth sessionid:0x"
@@ -1164,6 +1165,9 @@ public class ClientCnxn {
                                 eventThread.queueEvent(new WatchedEvent(
                                 eventThread.queueEvent(new WatchedEvent(
                                       Watcher.Event.EventType.None,
                                       Watcher.Event.EventType.None,
                                       authState,null));
                                       authState,null));
+                                if (state == States.AUTH_FAILED) {
+                                  eventThread.queueEventOfDeath();
+                                }
                             }
                             }
                         }
                         }
                         to = readTimeout - clientCnxnSocket.getIdleRecv();
                         to = readTimeout - clientCnxnSocket.getIdleRecv();

+ 42 - 3
src/java/test/org/apache/zookeeper/SaslAuthTest.java

@@ -26,8 +26,10 @@ import java.io.IOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 
+import org.apache.zookeeper.ClientCnxn.EventThread;
 import org.apache.zookeeper.ClientCnxn.SendThread;
 import org.apache.zookeeper.ClientCnxn.SendThread;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.Watcher.Event.KeeperState;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooDefs.Ids;
@@ -88,7 +90,7 @@ public class SaslAuthTest extends ClientBase {
         System.clearProperty("java.security.auth.login.config");
         System.clearProperty("java.security.auth.login.config");
     }
     }
 
 
-    private AtomicInteger authFailed = new AtomicInteger(0);
+    private final CountDownLatch authFailed = new CountDownLatch(1);
     
     
     @Override
     @Override
     protected TestableZooKeeper createClient(String hp)
     protected TestableZooKeeper createClient(String hp)
@@ -102,7 +104,7 @@ public class SaslAuthTest extends ClientBase {
         @Override
         @Override
         public synchronized void process(WatchedEvent event) {
         public synchronized void process(WatchedEvent event) {
             if (event.getState() == KeeperState.AuthFailed) {
             if (event.getState() == KeeperState.AuthFailed) {
-                authFailed.incrementAndGet();
+                authFailed.countDown();
             }
             }
             else {
             else {
                 super.process(event);
                 super.process(event);
@@ -210,4 +212,41 @@ public class SaslAuthTest extends ClientBase {
         saslLoginFailedField.setBoolean(sendThread, true);
         saslLoginFailedField.setBoolean(sendThread, true);
     }
     }
 
 
+    @Test
+    public void testThreadsShutdownOnAuthFailed() throws Exception {
+        MyWatcher watcher = new MyWatcher();
+        ZooKeeper zk = null;
+        try {
+            zk = new ZooKeeper(hostPort, CONNECTION_TIMEOUT, watcher);
+            watcher.waitForConnected(CONNECTION_TIMEOUT);
+            try {
+                zk.addAuthInfo("FOO", "BAR".getBytes());
+                zk.getData("/path1", false, null);
+                Assert.fail("Should get auth state error");
+            } catch (KeeperException.AuthFailedException e) {
+                if (!authFailed.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)) {
+                    Assert.fail("Should have called my watcher");
+                }
+            }
+            Field cnxnField = zk.getClass().getDeclaredField("cnxn");
+            cnxnField.setAccessible(true);
+            ClientCnxn clientCnxn = (ClientCnxn) cnxnField.get(zk);
+            Field sendThreadField = clientCnxn.getClass().getDeclaredField("sendThread");
+            sendThreadField.setAccessible(true);
+            SendThread sendThread = (SendThread) sendThreadField.get(clientCnxn);
+            Field eventThreadField = clientCnxn.getClass().getDeclaredField("eventThread");
+            eventThreadField.setAccessible(true);
+            EventThread eventThread = (EventThread) eventThreadField.get(clientCnxn);
+            sendThread.join(CONNECTION_TIMEOUT);
+            eventThread.join(CONNECTION_TIMEOUT);
+            Assert.assertFalse("SendThread did not shutdown after authFail", sendThread.isAlive());
+            Assert.assertFalse("EventThread did not shutdown after authFail",
+                eventThread.isAlive());
+        } finally {
+            if (zk != null) {
+                zk.close();
+            }
+        }
+    }
+
 }
 }