Преглед изворни кода

ZOOKEEPER-795. eventThread isn't shutdown after a connection "session expired" event coming (Sergey Doroshenko and Ben via mahadev)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@982485 13f79535-47bb-0310-9956-ffa450edef68
Mahadev Konar пре 15 година
родитељ
комит
b95f1ca96a

+ 3 - 0
CHANGES.txt

@@ -75,6 +75,9 @@ BUGFIXES:
   ZOOKEEPER-790.  Last processed zxid set prematurely while establishing 
   ZOOKEEPER-790.  Last processed zxid set prematurely while establishing 
   leadership (flavio via mahadev)
   leadership (flavio via mahadev)
 
 
+  ZOOKEEPER-795. eventThread isn't shutdown after a connection 
+  "session expired" event coming (Sergey Doroshenko and Ben via mahadev)
+
 IMPROVEMENTS:
 IMPROVEMENTS:
   ZOOKEEPER-724. Improve junit test integration - log harness information 
   ZOOKEEPER-724. Improve junit test integration - log harness information 
   (phunt via mahadev)
   (phunt via mahadev)

+ 14 - 2
src/java/main/org/apache/zookeeper/ClientCnxn.java

@@ -430,6 +430,17 @@ public class ClientCnxn {
         }
         }
     }
     }
 
 
+    /**
+     * Guard against creating "-EventThread-EventThread-EventThread-..." thread
+     * names when ZooKeeper object is being created from within a watcher.
+     * See ZOOKEEPER-795 for details.
+     */
+    private static String makeThreadName(String suffix) {
+        String name = Thread.currentThread().getName().
+            replaceAll("-EventThread", "");
+        return name + suffix;
+    }
+
     class EventThread extends Thread {
     class EventThread extends Thread {
         private final LinkedBlockingQueue<Object> waitingEvents =
         private final LinkedBlockingQueue<Object> waitingEvents =
             new LinkedBlockingQueue<Object>();
             new LinkedBlockingQueue<Object>();
@@ -441,7 +452,7 @@ public class ClientCnxn {
         private volatile KeeperState sessionState = KeeperState.Disconnected;
         private volatile KeeperState sessionState = KeeperState.Disconnected;
 
 
         EventThread() {
         EventThread() {
-            super(currentThread().getName() + "-EventThread");
+            super(makeThreadName("-EventThread"));
             setUncaughtExceptionHandler(uncaughtExceptionHandler);
             setUncaughtExceptionHandler(uncaughtExceptionHandler);
             setDaemon(true);
             setDaemon(true);
         }
         }
@@ -689,6 +700,7 @@ public class ClientCnxn {
                 eventThread.queueEvent(new WatchedEvent(
                 eventThread.queueEvent(new WatchedEvent(
                         Watcher.Event.EventType.None,
                         Watcher.Event.EventType.None,
                         Watcher.Event.KeeperState.Expired, null));
                         Watcher.Event.KeeperState.Expired, null));
+                eventThread.queueEventOfDeath();
                 throw new SessionExpiredException(
                 throw new SessionExpiredException(
                         "Unable to reconnect to ZooKeeper service, session 0x"
                         "Unable to reconnect to ZooKeeper service, session 0x"
                         + Long.toHexString(sessionId) + " has expired");
                         + Long.toHexString(sessionId) + " has expired");
@@ -898,7 +910,7 @@ public class ClientCnxn {
         }
         }
 
 
         SendThread() {
         SendThread() {
-            super(currentThread().getName() + "-SendThread()");
+            super(makeThreadName("-SendThread()"));
             zooKeeper.state = States.CONNECTING;
             zooKeeper.state = States.CONNECTING;
             setUncaughtExceptionHandler(uncaughtExceptionHandler);
             setUncaughtExceptionHandler(uncaughtExceptionHandler);
             setDaemon(true);
             setDaemon(true);

+ 40 - 1
src/java/test/org/apache/zookeeper/test/SessionTest.java

@@ -23,7 +23,9 @@ import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
 import java.io.File;
 import java.io.File;
 import java.io.IOException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.LinkedList;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
 
 
@@ -194,18 +196,55 @@ public class SessionTest extends ZKTestCase implements Watcher {
         zk.close();
         zk.close();
     }
     }
 
 
+    private List<Thread> findThreads(String name) {
+        int threadCount = Thread.activeCount();
+        Thread threads[] = new Thread[threadCount*2];
+        threadCount = Thread.enumerate(threads);
+        ArrayList<Thread> list = new ArrayList<Thread>();
+        for(int i = 0; i < threadCount; i++) {
+            if (threads[i].getName().indexOf(name) != -1) {
+                list.add(threads[i]);
+            }
+        }
+        return list;
+    }
+
     /**
     /**
      * Make sure ephemerals get cleaned up when a session times out.
      * Make sure ephemerals get cleaned up when a session times out.
      */
      */
     @Test
     @Test
     public void testSessionTimeout() throws Exception {
     public void testSessionTimeout() throws Exception {
         final int TIMEOUT = 5000;
         final int TIMEOUT = 5000;
+        List<Thread> etBefore = findThreads("EventThread");
+        List<Thread> stBefore = findThreads("SendThread");
         DisconnectableZooKeeper zk = createClient(TIMEOUT);
         DisconnectableZooKeeper zk = createClient(TIMEOUT);
         zk.create("/stest", new byte[0], Ids.OPEN_ACL_UNSAFE,
         zk.create("/stest", new byte[0], Ids.OPEN_ACL_UNSAFE,
                 CreateMode.EPHEMERAL);
                 CreateMode.EPHEMERAL);
-        zk.disconnect();
+
+        // Find the new event and send threads
+        List<Thread> etAfter = findThreads("EventThread");
+        List<Thread> stAfter = findThreads("SendThread");
+        Thread eventThread = null;
+        Thread sendThread = null;
+        for(Thread t: etAfter) {
+            if (!etBefore.contains(t)) {
+                eventThread = t;
+                break;
+            }
+        }
+        for(Thread t: stAfter) {
+            if (!stBefore.contains(t)) {
+                sendThread = t;
+                break;
+            }
+        }
+        sendThread.suspend();
+        //zk.disconnect();
 
 
         Thread.sleep(TIMEOUT*2);
         Thread.sleep(TIMEOUT*2);
+        sendThread.resume();
+        eventThread.join(TIMEOUT);
+        Assert.assertFalse("EventThread is still running", eventThread.isAlive());
 
 
         zk = createClient(TIMEOUT);
         zk = createClient(TIMEOUT);
         zk.create("/stest", new byte[0], Ids.OPEN_ACL_UNSAFE,
         zk.create("/stest", new byte[0], Ids.OPEN_ACL_UNSAFE,