Prechádzať zdrojové kódy

ZOOKEEPER-880. QuorumCnxManager$SendWorker grows without bounds

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1082260 13f79535-47bb-0310-9956-ffa450edef68
Benjamin Reed 14 rokov pred
rodič
commit
858e308a70

+ 2 - 0
CHANGES.txt

@@ -192,6 +192,8 @@ BUGFIXES:
 
   ZOOKEEPER-1012. support distinct JVMFLAGS for zookeeper server in zkServer.sh and zookeeper client in zkCli.sh (Eugene Koontz via breed)
 
+  ZOOKEEPER-880. QuorumCnxManager$SendWorker grows without bounds (vishal via breed)
+
 IMPROVEMENTS:
   ZOOKEEPER-724. Improve junit test integration - log harness information 
   (phunt via mahadev)

+ 32 - 4
src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java

@@ -32,6 +32,8 @@ import java.util.Enumeration;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Date;
 
 import org.apache.log4j.Logger;
 
@@ -110,6 +112,11 @@ public class QuorumCnxManager {
      */
     public final Listener listener;
 
+    /*
+     * Counter of worker threads. It is incremented at the top of a worker's
+     * run() and decremented again when that worker is stopped. The reference
+     * is final: the counter object itself is never replaced, only updated
+     * through its atomic operations.
+     */
+    private final AtomicInteger threadCnt = new AtomicInteger(0);
+
     static public class Message {
         Message(ByteBuffer buffer, long sid) {
             this.buffer = buffer;
@@ -177,7 +184,7 @@ public class QuorumCnxManager {
             // Otherwise proceed with the connection
         } else {
             SendWorker sw = new SendWorker(sock, sid);
-            RecvWorker rw = new RecvWorker(sock, sid);
+            RecvWorker rw = new RecvWorker(sock, sid, sw);
             sw.setRecv(rw);
 
             SendWorker vsw = senderWorkerMap.get(sid);
@@ -253,7 +260,7 @@ public class QuorumCnxManager {
             // Otherwise start worker threads to receive data.
         } else {
             SendWorker sw = new SendWorker(sock, sid);
-            RecvWorker rw = new RecvWorker(sock, sid);
+            RecvWorker rw = new RecvWorker(sock, sid, sw);
             sw.setRecv(rw);
 
             SendWorker vsw = senderWorkerMap.get(sid);
@@ -447,6 +454,19 @@ public class QuorumCnxManager {
         }
     }
 
+    /**
+     * Reports the current value of the worker thread counter.
+     *
+     * @return the counter value, widened from int to long
+     */
+    public long getThreadCount() {
+        final int workers = threadCnt.get();
+        return workers;
+    }
+    /**
+     * Exposes the QuorumPeer this connection manager holds in its
+     * {@code self} field.
+     *
+     * @return the QuorumPeer reference stored in {@code self}
+     */
+    public QuorumPeer getQuorumPeer() {
+        return this.self;
+    }
+
     /**
      * Thread to listen on some port
      */
@@ -591,6 +611,7 @@ public class QuorumCnxManager {
                 LOG.debug("Removing entry from senderWorkerMap sid=" + sid);
             }
             senderWorkerMap.remove(sid, this);
+            threadCnt.decrementAndGet();
             return running;
         }
         
@@ -610,6 +631,7 @@ public class QuorumCnxManager {
 
         @Override
         public void run() {
+            threadCnt.incrementAndGet();
             try {
                 ByteBuffer b = lastMessageSent.get(sid);
                 if (b != null) {
@@ -662,10 +684,12 @@ public class QuorumCnxManager {
         Socket sock;
         volatile boolean running = true;
         DataInputStream din;
+        final SendWorker sw;
 
-        RecvWorker(Socket sock, Long sid) {
+        RecvWorker(Socket sock, Long sid, SendWorker sw) {
             this.sid = sid;
             this.sock = sock;
+            this.sw = sw;
             try {
                 din = new DataInputStream(sock.getInputStream());
                 // OK to wait until socket disconnects while reading.
@@ -692,11 +716,13 @@ public class QuorumCnxManager {
             running = false;            
 
             this.interrupt();
+            threadCnt.decrementAndGet();
             return running;
         }
 
         @Override
         public void run() {
+            threadCnt.incrementAndGet();
             try {
                 while (running && !shutdown && sock != null) {
                     /**
@@ -719,8 +745,10 @@ public class QuorumCnxManager {
                 }
             } catch (Exception e) {
                 LOG.warn("Connection broken for id " + sid + ", my id = " + 
-                        self.getId() + ", error = " + e);
+                        self.getId() + ", error = " , e);
             } finally {
+                LOG.warn("Interrupting SendWorker");
+                sw.finish();
                 if (sock != null) {
                     closeSocket(sock);
                 }

+ 11 - 3
src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java

@@ -74,6 +74,7 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
     QuorumBean jmxQuorumBean;
     LocalPeerBean jmxLocalPeerBean;
     LeaderElectionBean jmxLeaderElectionBean;
+    QuorumCnxManager qcm;
 
     /* ZKDatabase is a top level member of quorumpeer 
      * which will be used in all the zookeeperservers
@@ -524,11 +525,11 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
             le = new AuthFastLeaderElection(this, true);
             break;
         case 3:
-            QuorumCnxManager mng = new QuorumCnxManager(this);
-            QuorumCnxManager.Listener listener = mng.listener;
+            qcm = new QuorumCnxManager(this);
+            QuorumCnxManager.Listener listener = qcm.listener;
             if(listener != null){
                 listener.start();
-                le = new FastLeaderElection(this,mng);
+                le = new FastLeaderElection(this, qcm);
             } else {
                 LOG.error("Null listener when initializing cnx manager");
             }
@@ -957,4 +958,11 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
     public boolean isRunning() {
         return running;
     }
+
+    /**
+     * Returns the QuorumCnxManager held by this peer.
+     *
+     * @return the connection manager stored in {@code qcm}, or null if it has
+     *         not been assigned. In the code shown here it is only assigned in
+     *         the {@code case 3} branch that creates the FastLeaderElection.
+     */
+    public QuorumCnxManager getQuorumCnxManager() {
+        return qcm;
+    }
 }

+ 104 - 25
src/java/test/org/apache/zookeeper/test/CnxManagerTest.java

@@ -22,18 +22,19 @@ import java.io.File;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
-import java.net.Socket;
+import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
-import java.io.*;
+import java.net.Socket;
 
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.server.quorum.QuorumCnxManager;
-import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
 import org.junit.Assert;
@@ -46,25 +47,26 @@ public class CnxManagerTest extends ZKTestCase {
 
     int count;
     HashMap<Long,QuorumServer> peers;
-    File tmpdir[];
-    int port[];
-
+    File peerTmpdir[];
+    int peerQuorumPort[];
+    int peerClientPort[];
     @Before
     public void setUp() throws Exception {
 
         this.count = 3;
-        this.peers = new HashMap<Long,QuorumServer>(count);
-        tmpdir = new File[count];
-        port = new int[count];
-
+        this.peers = new HashMap<Long,QuorumServer>(count); 
+        peerTmpdir = new File[count];
+        peerQuorumPort = new int[count];
+        peerClientPort = new int[count];
+        
         for(int i = 0; i < count; i++) {
-            int clientport = PortAssignment.unique();
+            peerQuorumPort[i] = PortAssignment.unique();
+            peerClientPort[i] = PortAssignment.unique();
             peers.put(Long.valueOf(i),
                     new QuorumServer(i,
-                            new InetSocketAddress(clientport),
+                            new InetSocketAddress(peerQuorumPort[i]),
                     new InetSocketAddress(PortAssignment.unique())));
-            tmpdir[i] = ClientBase.createTmpDir();
-            port[i] = clientport;
+            peerTmpdir[i] = ClientBase.createTmpDir();
         }
     }
 
@@ -94,7 +96,7 @@ public class CnxManagerTest extends ZKTestCase {
 
         public void run(){
             try {
-                QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 2, 2, 2);
+                QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[0], peerTmpdir[0], peerClientPort[0], 3, 0, 2, 2, 2);
                 QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
                 QuorumCnxManager.Listener listener = cnxManager.listener;
                 if(listener != null){
@@ -137,8 +139,8 @@ public class CnxManagerTest extends ZKTestCase {
         CnxManagerThread thread = new CnxManagerThread();
 
         thread.start();
-
-        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2, 2, 2);
+        
+        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 2, 2, 2);
         QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
         QuorumCnxManager.Listener listener = cnxManager.listener;
         if(listener != null){
@@ -155,7 +157,7 @@ public class CnxManagerTest extends ZKTestCase {
             m = cnxManager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
             if(m == null) cnxManager.connectAll();
         }
-
+        
         Assert.assertTrue("Exceeded number of retries", numRetries <= THRESHOLD);
 
         thread.join(5000);
@@ -181,10 +183,9 @@ public class CnxManagerTest extends ZKTestCase {
                 new QuorumServer(2,
                         new InetSocketAddress(deadAddress, deadPort),
                         new InetSocketAddress(deadAddress, PortAssignment.unique())));
-        tmpdir[2] = ClientBase.createTmpDir();
-        port[2] = deadPort;
-            
-        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2, 2, 2);
+        peerTmpdir[2] = ClientBase.createTmpDir();
+    
+        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 2, 2, 2);
         QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
         QuorumCnxManager.Listener listener = cnxManager.listener;
         if(listener != null){
@@ -211,7 +212,7 @@ public class CnxManagerTest extends ZKTestCase {
      */
     @Test
     public void testCnxManagerSpinLock() throws Exception {               
-        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2, 2, 2);
+        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 2, 2, 2);
         QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
         QuorumCnxManager.Listener listener = cnxManager.listener;
         if(listener != null){
@@ -258,14 +259,16 @@ public class CnxManagerTest extends ZKTestCase {
         } catch (Exception e) {
             LOG.info("Socket has been closed as expected");
         }
-    }
+        peer.shutdown();
+        cnxManager.halt();
+    }   
 
     /*
      * Test if a receiveConnection is able to timeout on socket errors
      */
     @Test
     public void testSocketTimeout() throws Exception {
-        QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 2000, 2, 2);
+        QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[1], peerTmpdir[1], peerClientPort[1], 3, 1, 2000, 2, 2);
         QuorumCnxManager cnxManager = new QuorumCnxManager(peer);
         QuorumCnxManager.Listener listener = cnxManager.listener;
         if(listener != null){
@@ -286,4 +289,80 @@ public class CnxManagerTest extends ZKTestCase {
         long end = System.currentTimeMillis();
         if((end - begin) > ((peer.getSyncLimit() * peer.getTickTime()) + 500)) Assert.fail("Waited more than necessary");
     }
+
+    /*
+     * Test if Worker threads are getting killed after connection loss.
+     *
+     * Starts three peers and expects each of them to report 4 worker threads.
+     * Then, five times for every peer, shuts that peer down (the remaining
+     * peers must drop to 2 worker threads) and restarts it (all peers must
+     * return to 4 worker threads).
+     */
+    @Test
+    public void testWorkerThreads() throws Exception {
+        ArrayList<QuorumPeer> peerList = new ArrayList<QuorumPeer>();
+
+        try {
+            for (int sid = 0; sid < 3; sid++) {
+                QuorumPeer peer = new QuorumPeer(peers, peerTmpdir[sid], peerTmpdir[sid],
+                                                 peerClientPort[sid], 3, sid, 1000, 2, 2);
+                LOG.info("Starting peer " + peer.getId());
+                peer.start();
+                peerList.add(sid, peer);
+            }
+            String failure = verifyThreadCount(peerList, 4);
+            if (failure != null) {
+                Assert.fail(failure);
+            }
+            for (int myid = 0; myid < 3; myid++) {
+                for (int i = 0; i < 5; i++) {
+                    // halt one of the listeners and verify count
+                    QuorumPeer peer = peerList.get(myid);
+                    LOG.info("Round " + i + ", halting peer " + peer.getId());
+                    peer.shutdown();
+                    peerList.remove(myid);
+                    failure = verifyThreadCount(peerList, 2);
+                    if (failure != null) {
+                        Assert.fail(failure);
+                    }
+
+                    // Restart halted node and verify count
+                    peer = new QuorumPeer(peers, peerTmpdir[myid], peerTmpdir[myid],
+                                            peerClientPort[myid], 3, myid, 1000, 2, 2);
+                    LOG.info("Round " + i + ", restarting peer " + peer.getId());
+                    peer.start();
+                    peerList.add(myid, peer);
+                    failure = verifyThreadCount(peerList, 4);
+                    if (failure != null) {
+                        Assert.fail(failure);
+                    }
+                }
+            }
+        } finally {
+            // Stop every peer still tracked in the list so that started peers
+            // do not outlive this test, including when an assertion fails.
+            for (QuorumPeer peer : peerList) {
+                peer.shutdown();
+            }
+        }
+    }
+
+    /**
+     * Polls the peers until each of them reports the expected number of
+     * worker threads. Sleeps 500 ms before every check and gives up after
+     * 120 checks.
+     *
+     * @param peerList peers whose connection managers are inspected
+     * @param ecnt expected worker thread count for every peer
+     * @return null on success, otherwise the message associated with the
+     *         last failed check
+     * @throws InterruptedException if the sleep between checks is interrupted
+     */
+    public String verifyThreadCount(ArrayList<QuorumPeer> peerList, long ecnt)
+        throws InterruptedException
+    {
+        int attemptsLeft = 120;
+        String lastFailure;
+        do {
+            Thread.sleep(500);
+            lastFailure = _verifyThreadCount(peerList, ecnt);
+            attemptsLeft--;
+        } while (lastFailure != null && attemptsLeft > 0);
+        return lastFailure;
+    }
+    /**
+     * Checks once whether every peer in the list reports the expected number
+     * of worker threads.
+     *
+     * @param peerList peers whose connection managers are inspected
+     * @param ecnt expected worker thread count for every peer
+     * @return null if every peer matches, otherwise a message describing the
+     *         first mismatch found
+     */
+    public String _verifyThreadCount(ArrayList<QuorumPeer> peerList, long ecnt) {
+        for (QuorumPeer peer : peerList) {
+            QuorumCnxManager cnxManager = peer.getQuorumCnxManager();
+            long cnt = cnxManager.getThreadCount();
+            if (cnt != ecnt) {
+                // Report the peer's own id rather than its position in the
+                // list: once a peer has been removed from the list, the index
+                // no longer matches the sid.
+                return new Date()
+                    + " Incorrect number of Worker threads for sid=" + peer.getId()
+                    + " expected " + ecnt + " found " + cnt;
+            }
+        }
+        return null;
+    }
 }