Browse Source

ZOOKEEPER-512. FLE election fails to elect leader (flavio via mahadev)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@830340 13f79535-47bb-0310-9956-ffa450edef68
Mahadev Konar 15 years ago
parent
commit
c472d685e0
2 changed files with 52 additions and 29 deletions
  1. 2 0
      CHANGES.txt
  2. 50 29
      src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java

+ 2 - 0
CHANGES.txt

@@ -92,6 +92,8 @@ BUGFIXES:
 
   ZOOKEEPER-554. zkpython can segfault when statting a deleted node
   (henry robinson via phunt)
+
+  ZOOKEEPER-512. FLE election fails to elect leader (flavio via mahadev)
  
 IMPROVEMENTS:
   ZOOKEEPER-473. cleanup junit tests to eliminate false positives due to

+ 50 - 29
src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java

@@ -28,6 +28,7 @@ import java.nio.channels.UnresolvedAddressException;
 import java.util.Enumeration;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.log4j.Logger;
 
@@ -314,9 +315,15 @@ public class QuorumCnxManager {
      */
     
     synchronized void connectOne(long sid){
-        if (senderWorkerMap.get(sid) == null){ 
-            InetSocketAddress electionAddr =
-                self.quorumPeers.get(sid).electionAddr;
+        if (senderWorkerMap.get(sid) == null){
+            InetSocketAddress electionAddr;
+            if(self.quorumPeers.containsKey(sid))
+                electionAddr =
+                    self.quorumPeers.get(sid).electionAddr;
+            else{
+                LOG.warn("Invalid server id: " + sid);
+                return;
+            }
             try {
                 SocketChannel channel;
                 LOG.debug("Opening channel to server "  + sid);
@@ -403,27 +410,38 @@ public class QuorumCnxManager {
          */
         @Override
         public void run() {
-            try {
-                ss = ServerSocketChannel.open();
-                int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
-                LOG.info("My election bind port: " + port);
-                ss.socket().setReuseAddress(true); 
-                ss.socket().bind(new InetSocketAddress(port));
-
-                while (!shutdown) {
-                    SocketChannel client = ss.accept();
-                    Socket sock = client.socket();
-                    sock.setTcpNoDelay(true);
+            int numRetries = 0;
+            while((!shutdown) && (numRetries < 3)){
+                try {
+                    ss = ServerSocketChannel.open();
+                    int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
+                    LOG.info("My election bind port: " + port);
+                    ss.socket().setReuseAddress(true); 
+                    ss.socket().bind(new InetSocketAddress(port));
+
+                    while (!shutdown) {
+                        SocketChannel client = ss.accept();
+                        Socket sock = client.socket();
+                        sock.setTcpNoDelay(true);
                     
-                    LOG.debug("Connection request "
-                            + sock.getRemoteSocketAddress());
+                        LOG.debug("Connection request "
+                                + sock.getRemoteSocketAddress());
                     
-                    LOG.debug("Connection request: " + self.getId());
-                    receiveConnection(client);
+                        LOG.debug("Connection request: " + self.getId());
+                        receiveConnection(client);
+                        numRetries = 0;
+                    }
+                } catch (IOException e) {
+                    LOG.error("Exception while listening", e);
+                    numRetries++;
                 }
-            } catch (IOException e) {
-                LOG.error("Listener.run: " + e.getMessage());
             }
+            LOG.info("Leaving listener");
+            if(!shutdown)
+                LOG.fatal("As I'm leaving the listener thread, " +
+                		"I won't be able to participate in leader " +
+                		"election any longer: " + 
+                		self.quorumPeers.get(self.getId()).electionAddr);
         }
         
         /**
@@ -530,7 +548,7 @@ public class QuorumCnxManager {
                 try {
                     ArrayBlockingQueue<ByteBuffer> bq = queueSendMap.get(sid);                    
                     if(bq != null) 
-                        b = bq.take();
+                        b = bq.poll(1000, TimeUnit.MILLISECONDS);
                     else {
                         LOG.error("No queue of incoming messages for server " + sid);
                         this.finish();
@@ -541,11 +559,11 @@ public class QuorumCnxManager {
                     continue;
                 }
                 
-                if(b != null)
-                    lastMessageSent.put(sid, b);
-                
                 try {
-                    if(b != null) send(b);
+                    if(b != null){
+                        lastMessageSent.put(sid, b);
+                        send(b);
+                    }
                 } catch (Exception e) {
                     LOG.warn("Exception when using channel: " + sid, e);
                     this.finish();
@@ -617,11 +635,14 @@ public class QuorumCnxManager {
                     }
                 }
 
-            } catch (IOException e) {
+            } catch (Exception e) {
                 LOG.warn("Connection broken: ", e);
-            } catch (InterruptedException e) {
-                LOG.warn("Interrupted while trying to add new "
-                        + "message to the reception queue", e);
+            } finally {
+                try{
+                    channel.socket().close();
+                } catch (IOException e) {
+                    LOG.warn("Exception while trying to close channel");
+                }
             }
         }
     }