Explorar o código

ZOOKEEPER-117 threading issues in Leader election

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@698743 13f79535-47bb-0310-9956-ffa450edef68
Benjamin Reed %!s(int64=16) %!d(string=hai) anos
pai
achega
d284d6fdae
Modificáronse 2 ficheiros con 60 adicións e 20 borrados
  1. 3 0
      CHANGES.txt
  2. 57 20
      src/java/main/org/apache/zookeeper/server/quorum/Leader.java

+ 3 - 0
CHANGES.txt

@@ -66,3 +66,6 @@ Backward compatibile changes:
  again. (breed via mahadev)
 
  ZOOKEEPER-137. client watcher objects can lose events (Patrick Hunt via breed)
+
+ ZOOKEEPER-117. threading issues in Leader election (Flavio Junqueira and Patrick
+ Hunt via breed)

+ 57 - 20
src/java/main/org/apache/zookeeper/server/quorum/Leader.java

@@ -24,6 +24,7 @@ import java.net.BindException;
 import java.net.ServerSocket;
 import java.net.Socket;
 import java.net.SocketAddress;
+import java.net.SocketException;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -59,6 +60,9 @@ public class Leader {
 
     QuorumPeer self;
 
+    // the follower acceptor thread
+    FollowerCnxAcceptor cnxAcceptor;
+    
     // list of all the followers
     public HashSet<FollowerHandler> followers = new HashSet<FollowerHandler>();
 
@@ -194,6 +198,42 @@ public class Leader {
     ConcurrentLinkedQueue<Proposal> toBeApplied = new ConcurrentLinkedQueue<Proposal>();
 
     Proposal newLeaderProposal = new Proposal();
+    
+    class FollowerCnxAcceptor extends Thread{
+        private volatile boolean stop = false;
+        
+        @Override
+        public void run() {
+            try {
+                while (!stop) {
+                    try{
+                        Socket s = ss.accept();
+                        s.setSoTimeout(self.tickTime * self.syncLimit);
+                        s.setTcpNoDelay(true);
+                        new FollowerHandler(s, Leader.this);
+                    } catch (SocketException e) {
+                        if (stop) {
+                            LOG.info("exception while shutting down acceptor: "
+                                    + e);
+
+                            // When Leader.shutdown() calls ss.close(),
+                            // the call to accept throws an exception.
+                            // We catch and set stop to true.
+                            stop = true;
+                        } else {
+                            throw e;
+                        }
+                    }
+                }
+            } catch (Exception e) {
+                LOG.warn("Exception while accepting follower", e);
+            }
+        }
+        
+        public void halt() {
+            stop = true;
+        }
+    }
 
     /**
      * This method is main function that is called to lead
@@ -217,21 +257,12 @@ public class Leader {
                     + newLeaderProposal.packet.getZxid());
         }
         outstandingProposals.add(newLeaderProposal);
-        new Thread() {
-            @Override
-            public void run() {
-                try {
-                    while (true) {
-                        Socket s = ss.accept();
-                        s.setSoTimeout(self.tickTime * self.syncLimit);
-                        s.setTcpNoDelay(true);
-                        new FollowerHandler(s, Leader.this);
-                    }
-                } catch (Exception e) {
-                    LOG.warn("Exception while accepting follower", e);
-                }
-            }
-        }.start();
+        
+        // Start thread that waits for connection requests from 
+        // new followers.
+        cnxAcceptor = new FollowerCnxAcceptor();
+        cnxAcceptor.start();
+        
         // We have to get at least a majority of servers in sync with
         // us. We do this by waiting for the NEWLEADER packet to get
         // acknowledged
@@ -256,9 +287,12 @@ public class Leader {
             self.cnxnFactory.setZooKeeperServer(zk);
         }
         // Everything is a go, simply start counting the ticks
-        synchronized (this) {
-            notifyAll();
-        }
+        // WARNING: I couldn't find any wait statement on a synchronized
+        // block that would be notified by this notifyAll() call, so
+        // I commented it out
+        //synchronized (this) {
+        //    notifyAll();
+        //}
         // We ping twice a tick, so we only update the tick every other
         // iteration
         boolean tickSkip = true;
@@ -299,9 +333,12 @@ public class Leader {
         if (isShutdown) {
             return;
         }
+        
+        LOG.info("Shutdown called",
+                new Exception("shutdown Leader! reason: " + reason));
 
-        LOG.error("FIXMSG",new Exception("shutdown Leader! reason: "
-                        + reason));
+        cnxAcceptor.halt();
+        
         // NIO should not accept conenctions
         self.cnxnFactory.setZooKeeperServer(null);
         // clear all the connections