浏览代码

ZOOKEEPER-337. improve logging in leader election lookForLeader method when address resolution fails (phunt via mahadev)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@765797 13f79535-47bb-0310-9956-ffa450edef68
Mahadev Konar 16 年之前
父节点
当前提交
f6521f7c29

+ 3 - 0
CHANGES.txt

@@ -51,6 +51,9 @@ via mahadev)
   ZOOKEEPER-374. Uninitialized struct variable in C causes warning which 
 is treated as an error (phunt via mahadev)
 
+  ZOOKEEPER-337. improve logging in leader election lookForLeader method when
+address resolution fails (phunt via mahadev)
+
 IMPROVEMENTS:
   ZOOKEEPER-308. improve the atomic broadcast performance 3x.
   (breed via mahadev)

+ 178 - 111
src/java/main/org/apache/zookeeper/server/quorum/AuthFastLeaderElection.java

@@ -32,6 +32,7 @@ import java.util.Random;
 
 import org.apache.log4j.Logger;
 
+import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.server.quorum.Election;
 import org.apache.zookeeper.server.quorum.Vote;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
@@ -452,7 +453,17 @@ public class AuthFastLeaderElection implements Election {
                     requestBuffer.put(zeroes);
 
                     requestPacket.setLength(48);
-                    requestPacket.setSocketAddress(m.addr);
+                    try {
+                        requestPacket.setSocketAddress(m.addr);
+                    } catch (IllegalArgumentException e) {
+                        // Sun doesn't include the address that causes this
+                        // exception to be thrown, so we wrap the exception
+                        // in order to capture this critical detail.
+                        throw new IllegalArgumentException(
+                                "Unable to set socket address on packet, msg:"
+                                + e.getMessage() + " with addr:" + m.addr,
+                                e);
+                    }
 
                     try {
                         if (challengeMap.get(m.tag) == null) {
@@ -486,7 +497,18 @@ public class AuthFastLeaderElection implements Election {
                     requestBuffer.put(zeroes);
 
                     requestPacket.setLength(48);
-                    requestPacket.setSocketAddress(m.addr);
+                    try {
+                        requestPacket.setSocketAddress(m.addr);
+                    } catch (IllegalArgumentException e) {
+                        // Sun doesn't include the address that causes this
+                        // exception to be thrown, so we wrap the exception
+                        // in order to capture this critical detail.
+                        throw new IllegalArgumentException(
+                                "Unable to set socket address on packet, msg:"
+                                + e.getMessage() + " with addr:" + m.addr,
+                                e);
+                    }
+
 
                     try {
                         mySocket.send(requestPacket);
@@ -512,7 +534,18 @@ public class AuthFastLeaderElection implements Election {
                     requestBuffer.put(zeroes);
 
                     requestPacket.setLength(48);
-                    requestPacket.setSocketAddress(m.addr);
+                    try {
+                        requestPacket.setSocketAddress(m.addr);
+                    } catch (IllegalArgumentException e) {
+                        // Sun doesn't include the address that causes this
+                        // exception to be thrown, so we wrap the exception
+                        // in order to capture this critical detail.
+                        throw new IllegalArgumentException(
+                                "Unable to set socket address on packet, msg:"
+                                + e.getMessage() + " with addr:" + m.addr,
+                                e);
+                    }
+
 
                     boolean myChallenge = false;
                     boolean myAck = false;
@@ -629,7 +662,18 @@ public class AuthFastLeaderElection implements Election {
                     requestBuffer.putLong(m.epoch);
 
                     requestPacket.setLength(48);
-                    requestPacket.setSocketAddress(m.addr);
+                    try {
+                        requestPacket.setSocketAddress(m.addr);
+                    } catch (IllegalArgumentException e) {
+                        // Sun doesn't include the address that causes this
+                        // exception to be thrown, so we wrap the exception
+                        // in order to capture this critical detail.
+                        throw new IllegalArgumentException(
+                                "Unable to set socket address on packet, msg:"
+                                + e.getMessage() + " with addr:" + m.addr,
+                                e);
+                    }
+
 
                     try {
                         mySocket.send(requestPacket);
@@ -764,129 +808,152 @@ public class AuthFastLeaderElection implements Election {
     }
 
     public Vote lookForLeader() throws InterruptedException {
-        HashMap<InetSocketAddress, Vote> recvset = 
-            new HashMap<InetSocketAddress, Vote>();
-
-        HashMap<InetSocketAddress, Vote> outofelection = 
-            new HashMap<InetSocketAddress, Vote>();
-
-        logicalclock++;
-
-        proposedLeader = self.getId();
-        proposedZxid = self.getLastLoggedZxid();
-
-        LOG.info("Election tally");
-        sendNotifications();
-
-        /*
-         * Loop in which we exchange notifications until we find a leader
-         */
-
-        while (self.getPeerState() == ServerState.LOOKING) {
-            /*
-             * Remove next notification from queue, times out after 2 times the
-             * termination time
-             */
-            Notification n = recvqueue.poll(2 * finalizeWait,
-                    TimeUnit.MILLISECONDS);
+        try {
+            self.jmxLeaderElectionBean = new LeaderElectionBean();
+            MBeanRegistry.getInstance().register(
+                    self.jmxLeaderElectionBean, self.jmxLocalPeerBean);        
+        } catch (Exception e) {
+            LOG.warn("Failed to register with JMX", e);
+            self.jmxLeaderElectionBean = null;
+        }
 
+        try {
+            HashMap<InetSocketAddress, Vote> recvset = 
+                new HashMap<InetSocketAddress, Vote>();
+    
+            HashMap<InetSocketAddress, Vote> outofelection = 
+                new HashMap<InetSocketAddress, Vote>();
+    
+            logicalclock++;
+    
+            proposedLeader = self.getId();
+            proposedZxid = self.getLastLoggedZxid();
+    
+            LOG.info("Election tally");
+            sendNotifications();
+    
             /*
-             * Sends more notifications if haven't received enough. Otherwise
-             * processes new notification.
+             * Loop in which we exchange notifications until we find a leader
              */
-            if (n == null) {
-                if (((!outofelection.isEmpty()) || (recvset.size() > 1)))
-                    sendNotifications();
-            } else
-                switch (n.state) {
-                case LOOKING:
-                    if (n.epoch > logicalclock) {
-                        logicalclock = n.epoch;
-                        recvset.clear();
-                        if (totalOrderPredicate(n.leader, n.zxid)) {
+    
+            while (self.getPeerState() == ServerState.LOOKING) {
+                /*
+                 * Remove next notification from queue, times out after 2 times
+                 * the termination time
+                 */
+                Notification n = recvqueue.poll(2 * finalizeWait,
+                        TimeUnit.MILLISECONDS);
+    
+                /*
+                 * Sends more notifications if haven't received enough.
+                 * Otherwise processes new notification.
+                 */
+                if (n == null) {
+                    if (((!outofelection.isEmpty()) || (recvset.size() > 1)))
+                        sendNotifications();
+                } else
+                    switch (n.state) {
+                    case LOOKING:
+                        if (n.epoch > logicalclock) {
+                            logicalclock = n.epoch;
+                            recvset.clear();
+                            if (totalOrderPredicate(n.leader, n.zxid)) {
+                                proposedLeader = n.leader;
+                                proposedZxid = n.zxid;
+                            }
+                            sendNotifications();
+                        } else if (n.epoch < logicalclock) {
+                            break;
+                        } else if (totalOrderPredicate(n.leader, n.zxid)) {
                             proposedLeader = n.leader;
                             proposedZxid = n.zxid;
+    
+                            sendNotifications();
                         }
-                        sendNotifications();
-                    } else if (n.epoch < logicalclock) {
-                        break;
-                    } else if (totalOrderPredicate(n.leader, n.zxid)) {
-                        proposedLeader = n.leader;
-                        proposedZxid = n.zxid;
-
-                        sendNotifications();
-                    }
-
-                    recvset.put(n.addr, new Vote(n.leader, n.zxid));
-
-                    // If have received from all nodes, then terminate
-                    if (self.quorumPeers.size() == recvset.size()) {
-                        self.setPeerState((proposedLeader == self.getId()) ? 
-                                ServerState.LEADING: ServerState.FOLLOWING);
-                        // if (self.state == ServerState.FOLLOWING) {
-                        // Thread.sleep(100);
-                        // }
-                        leaveInstance();
-                        return new Vote(proposedLeader, proposedZxid);
-
-                    } else if (termPredicate(recvset, proposedLeader,
-                            proposedZxid)) {
-                        // Otherwise, wait for a fixed amount of time
-                        LOG.info("Passed predicate");
-                        Thread.sleep(finalizeWait);
-
-                        // Notification probe = recvqueue.peek();
-
-                        // Verify if there is any change in the proposed leader
-                        while ((!recvqueue.isEmpty())
-                                && !totalOrderPredicate(
-                                        recvqueue.peek().leader, recvqueue
-                                                .peek().zxid)) {
-                            recvqueue.poll();
-                        }
-                        if (recvqueue.isEmpty()) {
-                            // LOG.warn("Proposed leader: " +
-                            // proposedLeader);
+    
+                        recvset.put(n.addr, new Vote(n.leader, n.zxid));
+    
+                        // If have received from all nodes, then terminate
+                        if (self.quorumPeers.size() == recvset.size()) {
                             self.setPeerState((proposedLeader == self.getId()) ? 
                                     ServerState.LEADING: ServerState.FOLLOWING);
                             // if (self.state == ServerState.FOLLOWING) {
                             // Thread.sleep(100);
                             // }
-
                             leaveInstance();
                             return new Vote(proposedLeader, proposedZxid);
+    
+                        } else if (termPredicate(recvset, proposedLeader,
+                                proposedZxid)) {
+                            // Otherwise, wait for a fixed amount of time
+                            LOG.info("Passed predicate");
+                            Thread.sleep(finalizeWait);
+    
+                            // Notification probe = recvqueue.peek();
+    
+                            // Verify if there is any change in the proposed leader
+                            while ((!recvqueue.isEmpty())
+                                    && !totalOrderPredicate(
+                                            recvqueue.peek().leader, recvqueue
+                                                    .peek().zxid)) {
+                                recvqueue.poll();
+                            }
+                            if (recvqueue.isEmpty()) {
+                                // LOG.warn("Proposed leader: " +
+                                // proposedLeader);
+                                self.setPeerState(
+                                        (proposedLeader == self.getId()) ? 
+                                         ServerState.LEADING :
+                                         ServerState.FOLLOWING);
+                                // if (self.state == ServerState.FOLLOWING) {
+                                // Thread.sleep(100);
+                                // }
+    
+                                leaveInstance();
+                                return new Vote(proposedLeader, proposedZxid);
+                            }
                         }
+                        break;
+                    case LEADING:
+                        outofelection.put(n.addr, new Vote(n.leader, n.zxid));
+    
+                        if (termPredicate(outofelection, n.leader, n.zxid)) {
+    
+                            self.setPeerState((n.leader == self.getId()) ? 
+                                    ServerState.LEADING: ServerState.FOLLOWING);
+    
+                            leaveInstance();
+                            return new Vote(n.leader, n.zxid);
+                        }
+                        break;
+                    case FOLLOWING:
+                        outofelection.put(n.addr, new Vote(n.leader, n.zxid));
+    
+                        if (termPredicate(outofelection, n.leader, n.zxid)) {
+    
+                            self.setPeerState((n.leader == self.getId()) ? 
+                                    ServerState.LEADING: ServerState.FOLLOWING);
+    
+                            leaveInstance();
+                            return new Vote(n.leader, n.zxid);
+                        }
+                        break;
+                    default:
+                        break;
                     }
-                    break;
-                case LEADING:
-                    outofelection.put(n.addr, new Vote(n.leader, n.zxid));
-
-                    if (termPredicate(outofelection, n.leader, n.zxid)) {
-
-                        self.setPeerState((n.leader == self.getId()) ? 
-                                ServerState.LEADING: ServerState.FOLLOWING);
-
-                        leaveInstance();
-                        return new Vote(n.leader, n.zxid);
-                    }
-                    break;
-                case FOLLOWING:
-                    outofelection.put(n.addr, new Vote(n.leader, n.zxid));
-
-                    if (termPredicate(outofelection, n.leader, n.zxid)) {
-
-                        self.setPeerState((n.leader == self.getId()) ? 
-                                ServerState.LEADING: ServerState.FOLLOWING);
-
-                        leaveInstance();
-                        return new Vote(n.leader, n.zxid);
-                    }
-                    break;
-                default:
-                    break;
+            }
+    
+            return null;
+        } finally {
+            try {
+                if(self.jmxLeaderElectionBean != null){
+                    MBeanRegistry.getInstance().unregister(
+                            self.jmxLeaderElectionBean);
                 }
+            } catch (Exception e) {
+                LOG.warn("Failed to unregister with JMX", e);
+            }
+            self.jmxLeaderElectionBean = null;
         }
-
-        return null;
     }
 }

+ 161 - 123
src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java

@@ -26,6 +26,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.log4j.Logger;
+import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
@@ -510,146 +511,183 @@ public class FastLeaderElection implements Election {
      * sends notifications to al other peers.
      */
     public Vote lookForLeader() throws InterruptedException {
-        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
-
-        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
-
-        int notTimeout = finalizeWait;
-        
-        synchronized(this){
-            logicalclock++;
-            updateProposal(self.getId(), self.getLastLoggedZxid());
+        try {
+            self.jmxLeaderElectionBean = new LeaderElectionBean();
+            MBeanRegistry.getInstance().register(
+                    self.jmxLeaderElectionBean, self.jmxLocalPeerBean);        
+        } catch (Exception e) {
+            LOG.warn("Failed to register with JMX", e);
+            self.jmxLeaderElectionBean = null;
         }
-        
-        LOG.info("New election: " + proposedZxid);
-        sendNotifications();
-
-        /*
-         * Loop in which we exchange notifications until we find a leader
-         */
 
-        while (self.getPeerState() == ServerState.LOOKING) {
-            /*
-             * Remove next notification from queue, times out after 2 times
-             * the termination time
-             */
-            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
+        try {
+            HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
+    
+            HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
+    
+            int notTimeout = finalizeWait;
+            
+            synchronized(this){
+                logicalclock++;
+                updateProposal(self.getId(), self.getLastLoggedZxid());
+            }
             
+            LOG.info("New election: " + proposedZxid);
+            sendNotifications();
+    
             /*
-             * Sends more notifications if haven't received enough.
-             * Otherwise processes new notification.
+             * Loop in which we exchange notifications until we find a leader
              */
-            if(n == null){
-            	if(manager.haveDelivered()){
-            		sendNotifications();
-            	} else {
-            	    manager.connectAll();
-            	}
-            	
-            	/*
-            	 * Exponential backoff
-            	 */
-            	int tmpTimeOut = notTimeout*2;
-            	notTimeout = (tmpTimeOut < maxNotificationInterval? tmpTimeOut : maxNotificationInterval);
-            	LOG.info("Notification time out: " + notTimeout);
-            }
-            else { 
-                //notTimeout = finalizeWait;
-                switch (n.state) { 
-                case LOOKING:
-                    // If notification > current, replace and send messages out
-                    LOG.info("Notification: " + n.leader + ", " + n.zxid + ", " + 
-                            n.epoch + ", " + self.getId() + ", " + self.getPeerState() + 
-                            ", " + n.state + ", " + n.sid);
-                    if (n.epoch > logicalclock) {
-                        LOG.debug("Increasing logical clock: " + n.epoch);
-                        logicalclock = n.epoch;
-                        recvset.clear();
-                        if(totalOrderPredicate(n.leader, n.zxid, self.getId(), self.getLastLoggedZxid()))
-                            updateProposal(n.leader, n.zxid);
-                        else
-                            updateProposal(self.getId(), self.getLastLoggedZxid());
-                        sendNotifications();
-                    } else if (n.epoch < logicalclock) {
-                        LOG.info("n.epoch < logicalclock");
-                        break;
-                    } else if (totalOrderPredicate(n.leader, n.zxid, proposedLeader, proposedZxid)) {
-                        LOG.info("Updating proposal");
-                        updateProposal(n.leader, n.zxid);
-                        sendNotifications();
-                    }
+    
+            while (self.getPeerState() == ServerState.LOOKING) {
+                /*
+                 * Remove next notification from queue, times out after 2 times
+                 * the termination time
+                 */
+                Notification n = recvqueue.poll(notTimeout,
+                        TimeUnit.MILLISECONDS);
                 
-                    LOG.info("Adding vote");
-                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
-
-                    //If have received from all nodes, then terminate
-                    if (self.quorumPeers.size() == recvset.size()) {
-                        self.setPeerState((proposedLeader == self.getId()) ? 
-                                ServerState.LEADING: ServerState.FOLLOWING);
-                        leaveInstance();
-                        return new Vote(proposedLeader, proposedZxid);
-                        
-                    } else if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid, logicalclock))) {
-                        //Otherwise, wait for a fixed amount of time
-                        LOG.debug("Passed predicate");
-
-                        // Verify if there is any change in the proposed leader
-                        while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null){
-                            if(totalOrderPredicate(n.leader, n.zxid, proposedLeader, proposedZxid)){
-                                recvqueue.put(n);
-                                break;
-                            }
+                /*
+                 * Sends more notifications if haven't received enough.
+                 * Otherwise processes new notification.
+                 */
+                if(n == null){
+                	if(manager.haveDelivered()){
+                		sendNotifications();
+                	} else {
+                	    manager.connectAll();
+                	}
+                	
+                	/*
+                	 * Exponential backoff
+                	 */
+                	int tmpTimeOut = notTimeout*2;
+                	notTimeout = (tmpTimeOut < maxNotificationInterval?
+                	        tmpTimeOut : maxNotificationInterval);
+                	LOG.info("Notification time out: " + notTimeout);
+                }
+                else { 
+                    //notTimeout = finalizeWait;
+                    switch (n.state) { 
+                    case LOOKING:
+                        // If notification > current, replace and send messages out
+                        LOG.info("Notification: " + n.leader + ", " + n.zxid
+                                + ", " + n.epoch + ", " + self.getId() + ", "
+                                + self.getPeerState() + ", " + n.state + ", "
+                                + n.sid);
+                        if (n.epoch > logicalclock) {
+                            logicalclock = n.epoch;
+                            recvset.clear();
+                            if(totalOrderPredicate(n.leader, n.zxid,
+                                    self.getId(), self.getLastLoggedZxid()))
+                                updateProposal(n.leader, n.zxid);
+                            else
+                                updateProposal(self.getId(),
+                                        self.getLastLoggedZxid());
+                            sendNotifications();
+                        } else if (n.epoch < logicalclock) {
+                            LOG.info("n.epoch < logicalclock");
+                            break;
+                        } else if (totalOrderPredicate(n.leader, n.zxid,
+                                proposedLeader, proposedZxid)) {
+                            LOG.info("Updating proposal");
+                            updateProposal(n.leader, n.zxid);
+                            sendNotifications();
                         }
                     
-                        if (n == null) {
+                        LOG.info("Adding vote");
+                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.epoch));
+    
+                        //If have received from all nodes, then terminate
+                        if (self.quorumPeers.size() == recvset.size()) {
                             self.setPeerState((proposedLeader == self.getId()) ? 
-                                ServerState.LEADING: ServerState.FOLLOWING);
-                            LOG.info("About to leave instance:" + proposedLeader + ", " + 
-                                    proposedZxid + ", " + self.getId() + ", " + self.getPeerState());
+                                    ServerState.LEADING: ServerState.FOLLOWING);
                             leaveInstance();
-                            return new Vote(proposedLeader,
-                                proposedZxid);
+                            return new Vote(proposedLeader, proposedZxid);
+                            
+                        } else if (termPredicate(recvset,
+                                new Vote(proposedLeader, proposedZxid,
+                                        logicalclock))) {
+                            //Otherwise, wait for a fixed amount of time
+                            LOG.debug("Passed predicate");
+    
+                            // Verify if there is any change in the proposed leader
+                            while((n = recvqueue.poll(finalizeWait,
+                                    TimeUnit.MILLISECONDS)) != null){
+                                if(totalOrderPredicate(n.leader, n.zxid,
+                                        proposedLeader, proposedZxid)){
+                                    recvqueue.put(n);
+                                    break;
+                                }
+                            }
+                        
+                            if (n == null) {
+                                self.setPeerState((proposedLeader == self.getId()) ? 
+                                    ServerState.LEADING: ServerState.FOLLOWING);
+                                LOG.info("About to leave instance:"
+                                        + proposedLeader + ", " + 
+                                        proposedZxid + ", " + self.getId()
+                                        + ", " + self.getPeerState());
+                                leaveInstance();
+                                return new Vote(proposedLeader,
+                                    proposedZxid);
+                            }
                         }
-                    }
-                    break;
-                case LEADING:
-                    /*
-                     * There is at most one leader for each epoch, so if a peer claims to
-                     * be the leader for an epoch, then that peer must be the leader (no
-                     * arbitrary failures assumed). Now, if there is no quorum supporting 
-                     * this leader, then processes will naturally move to a new epoch.
-                     */
-                    if(n.epoch == logicalclock){
-                        self.setPeerState((n.leader == self.getId()) ? 
-                                ServerState.LEADING: ServerState.FOLLOWING);
-                   
-                        leaveInstance();
-                        return new Vote(n.leader, n.zxid);
-                    }
-                case FOLLOWING:
-                    LOG.info("Notification: " + n.leader + ", " + n.zxid + 
-                            ", " + n.epoch + ", " + self.getId() + ", " + 
-                            self.getPeerState() + ", " + n.state + ", " + n.sid);
-       
-                    outofelection.put(n.sid, new Vote(n.leader, n.zxid, n.epoch, n.state));
-
-                    if (termPredicate(outofelection, new Vote(n.leader, n.zxid, n.epoch, n.state))
-                            && checkLeader(outofelection, n.leader, n.epoch)) {
-                        synchronized(this){
-                            logicalclock = n.epoch;
+                        break;
+                    case LEADING:
+                        /*
+                         * There is at most one leader for each epoch, so if a
+                         * peer claims to be the leader for an epoch, then that
+                         * peer must be the leader (no* arbitrary failures
+                         * assumed). Now, if there is no quorum supporting 
+                         * this leader, then processes will naturally move
+                         * to a new epoch.
+                         */
+                        if(n.epoch == logicalclock){
                             self.setPeerState((n.leader == self.getId()) ? 
                                     ServerState.LEADING: ServerState.FOLLOWING);
+                       
+                            leaveInstance();
+                            return new Vote(n.leader, n.zxid);
+                        }
+                    case FOLLOWING:
+                        LOG.info("Notification: " + n.leader + ", " + n.zxid + 
+                                ", " + n.epoch + ", " + self.getId() + ", " + 
+                                self.getPeerState() + ", " + n.state + ", "
+                                + n.sid);
+           
+                        outofelection.put(n.sid, new Vote(n.leader, n.zxid,
+                                n.epoch, n.state));
+    
+                        if (termPredicate(outofelection, new Vote(n.leader,
+                                n.zxid, n.epoch, n.state))
+                                && checkLeader(outofelection, n.leader, n.epoch)) {
+                            synchronized(this){
+                                logicalclock = n.epoch;
+                                self.setPeerState((n.leader == self.getId()) ? 
+                                        ServerState.LEADING: ServerState.FOLLOWING);
+                            }
+                            leaveInstance();
+                            return new Vote(n.leader, n.zxid);
                         }
-                        leaveInstance();
-                        return new Vote(n.leader, n.zxid);
+                        break;
+                    default:
+                        break;
                     }
-                    break;
-                default:
-                    break;
                 }
             }
+    
+            return null;
+        } finally {
+            try {
+                if(self.jmxLeaderElectionBean != null){
+                    MBeanRegistry.getInstance().unregister(
+                            self.jmxLeaderElectionBean);
+                }
+            } catch (Exception e) {
+                LOG.warn("Failed to unregister with JMX", e);
+            }
+            self.jmxLeaderElectionBean = null;
         }
-
-        return null;
     }
 }

+ 109 - 73
src/java/main/org/apache/zookeeper/server/quorum/LeaderElection.java

@@ -33,6 +33,7 @@ import java.util.Map.Entry;
 
 import org.apache.log4j.Logger;
 
+import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.server.quorum.Vote;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
@@ -40,7 +41,7 @@ import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
 public class LeaderElection implements Election  {
     private static final Logger LOG = Logger.getLogger(LeaderElection.class);
     private static Random epochGen = new Random();
-    
+
     QuorumPeer self;
 
     public LeaderElection(QuorumPeer self) {
@@ -80,7 +81,7 @@ public class LeaderElection implements Election  {
                 }
             }
         }
-        
+
         HashMap<Vote, Integer> countTable = new HashMap<Vote, Integer>();
         // Now do the tally
         for (Vote v : votesCast) {
@@ -110,85 +111,120 @@ public class LeaderElection implements Election  {
     }
 
     public Vote lookForLeader() throws InterruptedException {
-        self.setCurrentVote(new Vote(self.getId(), self.getLastLoggedZxid()));
-        // We are going to look for a leader by casting a vote for ourself
-        byte requestBytes[] = new byte[4];
-        ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
-        byte responseBytes[] = new byte[28];
-        ByteBuffer responseBuffer = ByteBuffer.wrap(responseBytes);
-        /* The current vote for the leader. Initially me! */
-        DatagramSocket s = null;
         try {
-            s = new DatagramSocket();
-            s.setSoTimeout(200);
-        } catch (SocketException e1) {
-            e1.printStackTrace();
-            System.exit(4);
+            self.jmxLeaderElectionBean = new LeaderElectionBean();
+            MBeanRegistry.getInstance().register(
+                    self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
+        } catch (Exception e) {
+            LOG.warn("Failed to register with JMX", e);
+            self.jmxLeaderElectionBean = null;
         }
-        DatagramPacket requestPacket = new DatagramPacket(requestBytes,
-                requestBytes.length);
-        DatagramPacket responsePacket = new DatagramPacket(responseBytes,
-                responseBytes.length);
-        HashMap<InetSocketAddress, Vote> votes = new HashMap<InetSocketAddress, Vote>(
-                self.quorumPeers.size());
-        int xid = epochGen.nextInt();
-        while (self.running) {
-            votes.clear();
-            requestBuffer.clear();
-            requestBuffer.putInt(xid);
-            requestPacket.setLength(4);
-            HashSet<Long> heardFrom = new HashSet<Long>();
-            for (QuorumServer server : self.quorumPeers.values()) {
-                requestPacket.setSocketAddress(server.addr);
-                LOG.info("Server address: " + server.addr);
-                try {
-                    s.send(requestPacket);
-                    responsePacket.setLength(responseBytes.length);
-                    s.receive(responsePacket);
-                    if (responsePacket.getLength() != responseBytes.length) {
-                        LOG.error("Got a short response: "
-                                + responsePacket.getLength());
-                        continue;
+
+        try {
+            self.setCurrentVote(new Vote(self.getId(),
+                    self.getLastLoggedZxid()));
+            // We are going to look for a leader by casting a vote for ourself
+            byte requestBytes[] = new byte[4];
+            ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
+            byte responseBytes[] = new byte[28];
+            ByteBuffer responseBuffer = ByteBuffer.wrap(responseBytes);
+            /* The current vote for the leader. Initially me! */
+            DatagramSocket s = null;
+            try {
+                s = new DatagramSocket();
+                s.setSoTimeout(200);
+            } catch (SocketException e1) {
+                e1.printStackTrace();
+                System.exit(4);
+            }
+            DatagramPacket requestPacket = new DatagramPacket(requestBytes,
+                    requestBytes.length);
+            DatagramPacket responsePacket = new DatagramPacket(responseBytes,
+                    responseBytes.length);
+            HashMap<InetSocketAddress, Vote> votes =
+                new HashMap<InetSocketAddress, Vote>(self.quorumPeers.size());
+            int xid = epochGen.nextInt();
+            while (self.running) {
+                votes.clear();
+                requestBuffer.clear();
+                requestBuffer.putInt(xid);
+                requestPacket.setLength(4);
+                HashSet<Long> heardFrom = new HashSet<Long>();
+                for (QuorumServer server : self.quorumPeers.values()) {
+                    LOG.info("Server address: " + server.addr);
+                    try {
+                        requestPacket.setSocketAddress(server.addr);
+                    } catch (IllegalArgumentException e) {
+                        // Sun doesn't include the address that causes this
+                        // exception to be thrown, so we wrap the exception
+                        // in order to capture this critical detail.
+                        throw new IllegalArgumentException(
+                                "Unable to set socket address on packet, msg:"
+                                + e.getMessage() + " with addr:" + server.addr,
+                                e);
                     }
-                    responseBuffer.clear();
-                    int recvedXid = responseBuffer.getInt();
-                    if (recvedXid != xid) {
-                        LOG.error("Got bad xid: expected " + xid
-                                + " got " + recvedXid);
-                        continue;
+
+                    try {
+                        s.send(requestPacket);
+                        responsePacket.setLength(responseBytes.length);
+                        s.receive(responsePacket);
+                        if (responsePacket.getLength() != responseBytes.length) {
+                            LOG.error("Got a short response: "
+                                    + responsePacket.getLength());
+                            continue;
+                        }
+                        responseBuffer.clear();
+                        int recvedXid = responseBuffer.getInt();
+                        if (recvedXid != xid) {
+                            LOG.error("Got bad xid: expected " + xid
+                                    + " got " + recvedXid);
+                            continue;
+                        }
+                        long peerId = responseBuffer.getLong();
+                        heardFrom.add(peerId);
+                        //if(server.id != peerId){
+                            Vote vote = new Vote(responseBuffer.getLong(),
+                                responseBuffer.getLong());
+                            InetSocketAddress addr =
+                                (InetSocketAddress) responsePacket
+                                .getSocketAddress();
+                            votes.put(addr, vote);
+                        //}
+                    } catch (IOException e) {
+                        LOG.warn("Ignoring exception while looking for leader",
+                                e);
+                        // Errors are okay, since hosts may be
+                        // down
                     }
-                    long peerId = responseBuffer.getLong();
-                    heardFrom.add(peerId);
-                    //if(server.id != peerId){
-                        Vote vote = new Vote(responseBuffer.getLong(),
-                            responseBuffer.getLong());
-                        InetSocketAddress addr = (InetSocketAddress) responsePacket
-                            .getSocketAddress();
-                        votes.put(addr, vote);
-                    //}
-                } catch (IOException e) {
-                    LOG.warn("Ignoring exception while looking for leader", e);
-                    // Errors are okay, since hosts may be
-                    // down
                 }
-            }
-            ElectionResult result = countVotes(votes, heardFrom);
-            if (result.winner.id >= 0) {
-                self.setCurrentVote(result.vote);
-                if (result.winningCount > (self.quorumPeers.size() / 2)) {
-                    self.setCurrentVote(result.winner);
-                    s.close();
-                    Vote current = self.getCurrentVote();
-                    self.setPeerState((current.id == self.getId()) 
-                            ? ServerState.LEADING: ServerState.FOLLOWING);
-                    if (self.getPeerState() == ServerState.FOLLOWING) {
-                        Thread.sleep(100);
+                ElectionResult result = countVotes(votes, heardFrom);
+                if (result.winner.id >= 0) {
+                    self.setCurrentVote(result.vote);
+                    if (result.winningCount > (self.quorumPeers.size() / 2)) {
+                        self.setCurrentVote(result.winner);
+                        s.close();
+                        Vote current = self.getCurrentVote();
+                        self.setPeerState((current.id == self.getId())
+                                ? ServerState.LEADING: ServerState.FOLLOWING);
+                        if (self.getPeerState() == ServerState.FOLLOWING) {
+                            Thread.sleep(100);
+                        }
+                        return current;
                     }
-                    return current;
                 }
+                Thread.sleep(1000);
+            }
+            return null;
+        } finally {
+            try {
+                if(self.jmxLeaderElectionBean != null){
+                    MBeanRegistry.getInstance().unregister(
+                            self.jmxLeaderElectionBean);
+                }
+            } catch (Exception e) {
+                LOG.warn("Failed to unregister with JMX", e);
             }
-            Thread.sleep(1000);
+            self.jmxLeaderElectionBean = null;
         }
-        return null;
     }
 }

+ 1 - 8
src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java

@@ -366,14 +366,7 @@ public class QuorumPeer extends Thread implements QuorumStats.Provider {
     }
 
     protected Election makeLEStrategy(){
-        LOG.debug("Running leader election protocol...");
-        try {
-            jmxLeaderElectionBean = new LeaderElectionBean();
-            MBeanRegistry.getInstance().register(jmxLeaderElectionBean, jmxLocalPeerBean);        
-        } catch (Exception e) {
-            LOG.warn("Failed to register with JMX", e);
-            jmxLeaderElectionBean = null;
-        }
+        LOG.debug("Initializing leader election protocol...");
 
         if(electionAlg==null)
             return new LeaderElection(this);

+ 1 - 1
src/java/test/org/apache/zookeeper/server/CRCTest.java

@@ -91,7 +91,7 @@ public class CRCTest extends TestCase implements Watcher{
         LOG.info("starting up the zookeeper server .. waiting");
         assertTrue("waiting for server being up", 
                 ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT));
-        ZooKeeper zk = new ZooKeeper(HOSTPORT, 20000, this);
+        ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);
         for (int i =0; i < 2000; i++) {
             zk.create("/crctest- " + i , ("/crctest- " + i).getBytes(), 
                     Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

+ 1 - 1
src/java/test/org/apache/zookeeper/server/InvalidSnapshotTest.java

@@ -58,7 +58,7 @@ public class InvalidSnapshotTest extends TestCase implements Watcher {
        f.startup(zks);
        assertTrue("waiting for server being up ", 
                ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT));
-       ZooKeeper zk = new ZooKeeper(HOSTPORT, 20000, this);
+       ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);
        for (int i=0; i< 2000; i++) {
            zk.create("/invalidsnap-" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, 
                    CreateMode.PERSISTENT);

+ 69 - 0
src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java

@@ -20,13 +20,20 @@ package org.apache.zookeeper.server.quorum;
 
 import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.io.LineNumberReader;
+import java.io.StringReader;
+import java.util.regex.Pattern;
 
 import junit.framework.TestCase;
 
+import org.apache.log4j.Layout;
+import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
+import org.apache.log4j.WriterAppender;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
@@ -35,6 +42,7 @@ import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.test.ClientBase;
 import org.junit.Test;
 
+
 /**
  * Test stand-alone server.
  *
@@ -159,6 +167,67 @@ public class QuorumPeerMainTest extends TestCase implements Watcher {
                         ClientBase.CONNECTION_TIMEOUT));
     }
 
+    /**
+     * Verify handling of bad quorum address
+     */
+    @Test
+    public void testBadPeerAddressInQuorum() throws Exception {
+        LOG.info("STARTING " + getName());
+        ClientBase.setupTestEnv();
+
+        // setup the logger to capture all logs
+        Layout layout =
+            Logger.getRootLogger().getAppender("CONSOLE").getLayout();
+        ByteArrayOutputStream os = new ByteArrayOutputStream();
+        WriterAppender appender = new WriterAppender(layout, os);
+        appender.setThreshold(Level.WARN);
+        Logger.getLogger(org.apache.zookeeper.server.quorum.QuorumPeer.class)
+            .addAppender(appender);
+
+        try {
+            final int CLIENT_PORT_QP1 = 3181;
+            final int CLIENT_PORT_QP2 = CLIENT_PORT_QP1 + 3;
+
+            String quorumCfgSection =
+                "server.1=localhost:" + (CLIENT_PORT_QP1 + 1)
+                + ":" + (CLIENT_PORT_QP1 + 2)
+                + "\nserver.2=fee.fii.foo.fum:" + (CLIENT_PORT_QP2 + 1)
+                + ":" + (CLIENT_PORT_QP2 + 2);
+
+            MainThread q1 = new MainThread(1, CLIENT_PORT_QP1, quorumCfgSection);
+            q1.start();
+
+            boolean isup =
+                ClientBase.waitForServerUp("localhost:" + CLIENT_PORT_QP1,
+                        5000);
+
+            assertFalse(isup);
+            
+            q1.shutdown();
+
+            assertTrue("waiting for server 1 down",
+                    ClientBase.waitForServerDown("localhost:" + CLIENT_PORT_QP1,
+                            ClientBase.CONNECTION_TIMEOUT));
+            
+        } finally {
+            Logger.getLogger(org.apache.zookeeper.server.quorum.QuorumPeer.class)
+                .removeAppender(appender);
+        }
+
+        LineNumberReader r = new LineNumberReader(new StringReader(os.toString()));
+        String line;
+        boolean found = false;
+        Pattern p =
+            Pattern.compile(".*IllegalArgumentException.*fee.fii.foo.fum.*");
+        while ((line = r.readLine()) != null) {
+            found = p.matcher(line).matches();
+            if (found) {
+                break;
+            }
+        }
+        assertTrue("complains about host", found);
+    }
+
     public void process(WatchedEvent event) {
         // ignore for this test
     }

+ 1 - 1
src/java/test/org/apache/zookeeper/test/ACLTest.java

@@ -71,7 +71,7 @@ public class ACLTest extends TestCase implements Watcher {
         LOG.info("starting up the zookeeper server .. waiting");
         assertTrue("waiting for server being up", 
                 ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT));
-        ZooKeeper zk = new ZooKeeper(HOSTPORT, 20000, this);
+        ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);
         String path;
         LOG.info("starting creating acls");
         for (int i = 0; i < 100; i++) {

+ 2 - 2
src/java/test/org/apache/zookeeper/test/AsyncTest.java

@@ -91,7 +91,7 @@ public class AsyncTest extends TestCase
         throws IOException, InterruptedException
     {
         CountdownWatcher watcher = new CountdownWatcher();
-        ZooKeeper zk = new ZooKeeper(hp, 30000, watcher);
+        ZooKeeper zk = new ZooKeeper(hp, CONNECTION_TIMEOUT, watcher);
         if(!watcher.clientConnected.await(CONNECTION_TIMEOUT,
                 TimeUnit.MILLISECONDS))
         {
@@ -117,7 +117,7 @@ public class AsyncTest extends TestCase
 
         public void run() {
             try {
-                zk = new ZooKeeper(qb.hostPort, 30000, this);
+                zk = new ZooKeeper(qb.hostPort, CONNECTION_TIMEOUT, this);
                 while(bang) {
                     incOutstanding(); // before create otw race
                     zk.create("/test-", new byte[0], Ids.OPEN_ACL_UNSAFE,

+ 2 - 1
src/java/test/org/apache/zookeeper/test/ClientBase.java

@@ -129,7 +129,8 @@ public abstract class ClientBase extends TestCase {
     protected TestableZooKeeper createClient(CountdownWatcher watcher, String hp)
         throws IOException, InterruptedException
     {
-        TestableZooKeeper zk = new TestableZooKeeper(hp, 9000, watcher);
+        TestableZooKeeper zk =
+            new TestableZooKeeper(hp, CONNECTION_TIMEOUT, watcher);
         if (!watcher.clientConnected.await(CONNECTION_TIMEOUT,
                 TimeUnit.MILLISECONDS))
         {

+ 1 - 1
src/java/test/org/apache/zookeeper/test/IntegrityCheck.java

@@ -82,7 +82,7 @@ public class IntegrityCheck implements Watcher, StatCallback, DataCallback {
 
     IntegrityCheck(String hostPort, String path, int count) throws
             IOException {
-        zk = new ZooKeeper(hostPort, 15000, this);
+        zk = new ZooKeeper(hostPort, 30000, this);
         this.path = path;
         this.count = count;
     }

+ 8 - 4
src/java/test/org/apache/zookeeper/test/OOMTest.java

@@ -104,7 +104,8 @@ public class OOMTest extends TestCase implements Watcher {
     }
 
     private void utestExists() throws IOException, InterruptedException, KeeperException {
-        ZooKeeper zk = new ZooKeeper("127.0.0.1:33221", 30000, this);
+        ZooKeeper zk =
+            new ZooKeeper("127.0.0.1:33221", CONNECTION_TIMEOUT, this);
         for (int i = 0; i < 10000; i++) {
             zk.exists("/this/path/doesnt_exist!", true);
         }
@@ -113,7 +114,8 @@ public class OOMTest extends TestCase implements Watcher {
 
     private void utestPrep() throws IOException,
             InterruptedException, KeeperException {
-        ZooKeeper zk = new ZooKeeper("127.0.0.1:33221", 30000, this);
+        ZooKeeper zk =
+            new ZooKeeper("127.0.0.1:33221", CONNECTION_TIMEOUT, this);
         for (int i = 0; i < 10000; i++) {
             zk.create("/" + i, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
         }
@@ -121,7 +123,8 @@ public class OOMTest extends TestCase implements Watcher {
     }
 
     private void utestGet() throws IOException, InterruptedException, KeeperException {
-        ZooKeeper zk = new ZooKeeper("127.0.0.1:33221", 30000, this);
+        ZooKeeper zk =
+            new ZooKeeper("127.0.0.1:33221", CONNECTION_TIMEOUT, this);
         for (int i = 0; i < 10000; i++) {
             Stat stat = new Stat();
             zk.getData("/" + i, true, stat);
@@ -130,7 +133,8 @@ public class OOMTest extends TestCase implements Watcher {
     }
 
     private void utestChildren() throws IOException, InterruptedException, KeeperException {
-        ZooKeeper zk = new ZooKeeper("127.0.0.1:33221", 30000, this);
+        ZooKeeper zk =
+            new ZooKeeper("127.0.0.1:33221", CONNECTION_TIMEOUT, this);
         for (int i = 0; i < 10000; i++) {
             zk.getChildren("/" + i, true);
         }

+ 1 - 1
src/java/test/org/apache/zookeeper/test/PurgeTxnTest.java

@@ -58,7 +58,7 @@ public class PurgeTxnTest extends TestCase implements  Watcher {
         f.startup(zks);
         assertTrue("waiting for server being up ", 
                 ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT));
-        ZooKeeper zk = new ZooKeeper(HOSTPORT, 20000, this);
+        ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);
         for (int i=0; i< 2000; i++) {
             zk.create("/invalidsnap-" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, 
                     CreateMode.PERSISTENT);

+ 1 - 1
src/java/test/org/apache/zookeeper/test/RecoveryTest.java

@@ -87,7 +87,7 @@ public class RecoveryTest extends TestCase implements Watcher {
                                        CONNECTION_TIMEOUT));
 
             startSignal = new CountDownLatch(1);
-            ZooKeeper zk = new ZooKeeper(HOSTPORT, 20000, this);
+            ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);
             startSignal.await(CONNECTION_TIMEOUT,
                     TimeUnit.MILLISECONDS);
             assertTrue("count == 0", startSignal.getCount() == 0);

+ 1 - 1
src/java/test/org/apache/zookeeper/test/UpgradeTest.java

@@ -73,7 +73,7 @@ public class UpgradeTest extends TestCase implements Watcher {
         LOG.info("starting up the zookeeper server .. waiting");
         assertTrue("waiting for server being up", 
                 ClientBase.waitForServerUp(HOSTPORT,CONNECTION_TIMEOUT));
-        ZooKeeper zk = new ZooKeeper(HOSTPORT, 20000, this);
+        ZooKeeper zk = new ZooKeeper(HOSTPORT, CONNECTION_TIMEOUT, this);
         Stat stat = zk.exists("/", false);
         List<String> children = zk.getChildren("/", false);
         Collections.sort(children);

+ 1 - 1
src/java/test/org/apache/zookeeper/test/WatcherFuncTest.java

@@ -108,7 +108,7 @@ public class WatcherFuncTest extends ClientBase {
     protected ZooKeeper createClient(Watcher watcher, CountDownLatch latch)
         throws IOException, InterruptedException
     {
-        ZooKeeper zk = new ZooKeeper(hostPort, 20000, watcher);
+        ZooKeeper zk = new ZooKeeper(hostPort, CONNECTION_TIMEOUT, watcher);
         if(!latch.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)){
             fail("Unable to connect to server");
         }