Procházet zdrojové kódy

ZOOKEEPER-1810. Add version to FLE notifications for trunk Germán Blanco via michim)

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1608648 13f79535-47bb-0310-9956-ffa450edef68
Michi Mutsuzaki před 11 roky
rodič
revize
5a6535bf41

+ 3 - 0
CHANGES.txt

@@ -683,6 +683,9 @@ BUGFIXES:
   ZOOKEEPER-1835. dynamic configuration file renaming fails on Windows
   (Bruno Freudensprung via rakeshr)
 
+  ZOOKEEPER-1810. Add version to FLE notifications for trunk Germán Blanco via
+  michim)
+
 IMPROVEMENTS:
 
   ZOOKEEPER-1170. Fix compiler (eclipse) warnings: unused imports,

+ 172 - 99
src/java/main/org/apache/zookeeper/server/quorum/FastLeaderElection.java

@@ -94,6 +94,13 @@ public class FastLeaderElection implements Election {
      */
 
     static public class Notification {
+        /*
+         * Format version, introduced in 3.4.6
+         */
+
+        public final static int CURRENTVERSION = 0x2;
+        int version;
+
         /*
          * Proposed leader
          */
@@ -125,9 +132,9 @@ public class FastLeaderElection implements Election {
          */
         long peerEpoch;
     }
-    
+
     static byte[] dummyData = new byte[0];
-    
+
     /**
      * Messages that a peer wants to send to other peers.
      * These messages can be both Notifications and Acks
@@ -142,16 +149,15 @@ public class FastLeaderElection implements Election {
                 long electionEpoch,
                 ServerState state,
                 long sid,
-                long peerEpoch,                
+                long peerEpoch,
                 byte[] configData) {
 
-
             this.leader = leader;
             this.zxid = zxid;
             this.electionEpoch = electionEpoch;
             this.state = state;
             this.sid = sid;
-            this.peerEpoch = peerEpoch;            
+            this.peerEpoch = peerEpoch;
             this.configData = configData;
         }
 
@@ -184,7 +190,7 @@ public class FastLeaderElection implements Election {
          * Used to send a QuorumVerifier (configuration info)
          */
         byte[] configData = dummyData;
-        
+
         /*
          * Leader epoch
          */
@@ -201,7 +207,7 @@ public class FastLeaderElection implements Election {
      * spawns a new thread.
      */
 
-    private class Messenger {
+    protected class Messenger {
 
         /**
          * Receives messages from instance of QuorumCnxManager on
@@ -223,78 +229,91 @@ public class FastLeaderElection implements Election {
                 Message response;
                 while (!stop) {
                     // Sleeps on receive
-                    try{
+                    try {
                         response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                         if(response == null) continue;
-                        
+
                         // The current protocol and two previous generations all send at least 28 bytes
                         if (response.buffer.capacity() < 28) {
                             LOG.error("Got a short response: " + response.buffer.capacity());
                             continue;
                         }
-                        
+
                         // this is the backwardCompatibility mode in place before ZK-107
                         // It is for a version of the protocol in which we didn't send peer epoch
-                        // With peer epoch the message became 36 bytes
+                        // With peer epoch and version the message became 40 bytes
                         boolean backCompatibility28 = (response.buffer.capacity() == 28);
-                        
-                        // ZK-107 sends the configuration info in every message.
-                        // So messages are 36 bytes + size of configuration info 
-                        // (variable length, shoulld be at the end of the message).
-                        boolean backCompatibility36 = (response.buffer.capacity() == 36);
 
+                        // this is the backwardCompatibility mode for no version information
+                        boolean backCompatibility40 = (response.buffer.capacity() == 40);
+                        
                         response.buffer.clear();
+
+                        // Instantiate Notification and set its attributes
+                        Notification n = new Notification();
+
                         int rstate = response.buffer.getInt();
                         long rleader = response.buffer.getLong();
                         long rzxid = response.buffer.getLong();
                         long relectionEpoch = response.buffer.getLong();
                         long rpeerepoch;
-                        
-                        if(!backCompatibility28){
-                           rpeerepoch = response.buffer.getLong();
-                        } else {
-                            if(LOG.isInfoEnabled()){
-                                LOG.info("Backward compatibility mode (28 bits), server id: " + response.sid);
+
+                        int version = 0x0;
+                        if (!backCompatibility28) {
+                            rpeerepoch = response.buffer.getLong();
+                            if (!backCompatibility40) {
+                                /*
+                                 * Version added in 3.4.6
+                                 */
+                                
+                                version = response.buffer.getInt();
+                            } else {
+                                LOG.info("Backward compatibility mode (36 bits), server id: {}", response.sid);
                             }
+                        } else {
+                            LOG.info("Backward compatibility mode (28 bits), server id: {}", response.sid);
                             rpeerepoch = ZxidUtils.getEpochFromZxid(rzxid);
                         }
-                        
+
                         QuorumVerifier rqv = null;
-                        
-                        // check if we have more than 36 bytes. If so extract config info from message.
-                        if(!backCompatibility28 && !backCompatibility36){
-                           byte b[] = new byte[response.buffer.remaining()];
-                           response.buffer.get(b);
+
+                        // check if we have a version that includes config. If so extract config info from message.
+                        if (version > 0x1) {
+                            int configLength = response.buffer.getInt();
+                            byte b[] = new byte[configLength];
+
+                            response.buffer.get(b);
                                                        
-                           synchronized(self){                             
-                               try {
-                                   rqv = self.configFromString(new String(b));
-                                   QuorumVerifier curQV = self.getQuorumVerifier();
-                                   if (rqv.getVersion() > curQV.getVersion()) {
-                                       LOG.info(self.getId() + " Received version: " + Long.toHexString(rqv.getVersion()) + " my version: " + Long.toHexString(self.getQuorumVerifier().getVersion()));
-                                       self.processReconfig(rqv, null, null, false);
-                                       if (!rqv.equals(curQV)) {
-                                           LOG.info("restarting leader election");
-                                           self.shuttingDownLE = true;
-                                           self.getElectionAlg().shutdown();
-                                           break;
+                            synchronized(self) {
+                                try {
+                                    rqv = self.configFromString(new String(b));
+                                    QuorumVerifier curQV = self.getQuorumVerifier();
+                                    if (rqv.getVersion() > curQV.getVersion()) {
+                                        LOG.info("{} Received version: {} my version: {}", self.getId(),
+                                                Long.toHexString(rqv.getVersion()),
+                                                Long.toHexString(self.getQuorumVerifier().getVersion()));
+                                        self.processReconfig(rqv, null, null, false);
+                                        if (!rqv.equals(curQV)) {
+                                            LOG.info("restarting leader election");
+                                            self.shuttingDownLE = true;
+                                            self.getElectionAlg().shutdown();
+                                            
+                                            break;
                                        }
-                                   }           
-                               } catch (IOException e) {                         
-                                   LOG.error("Something went wrong while processing config received from " + response.sid);
+                                    }
+                                } catch (IOException e) {
+                                    LOG.error("Something went wrong while processing config received from {}", response.sid);
                                } catch (ConfigException e) {
-                                   LOG.error("Something went wrong while processing config received from " + response.sid);
-                               } 
-                          }                                                       
+                                   LOG.error("Something went wrong while processing config received from {}", response.sid);
+                               }
+                            }                          
                         } else {
-                            if(LOG.isInfoEnabled()){
-                                LOG.info("Backward compatibility mode (before reconfig), server id: " + response.sid);
-                            }
+                            LOG.info("Backward compatibility mode (before reconfig), server id: {}", response.sid);
                         }
                        
                         /*
-                         * If it is from a non-voting server (such as an observer or 
-                         * a non-voting follower), respond right away. 
+                         * If it is from a non-voting server (such as an observer or
+                         * a non-voting follower), respond right away.
                          */
                         if(!self.getVotingView().containsKey(response.sid)){
                             Vote current = self.getCurrentVote();
@@ -331,17 +350,18 @@ public class FastLeaderElection implements Election {
                             case 3:
                                 ackstate = QuorumPeer.ServerState.OBSERVING;
                                 break;
+                            default:
+                                continue;
                             }
 
-                            // Instantiate Notification and set its attributes
-                            Notification n = new Notification();
                             n.leader = rleader;
                             n.zxid = rzxid;
                             n.electionEpoch = relectionEpoch;
                             n.state = ackstate;
-                            n.sid = response.sid;                            
+                            n.sid = response.sid;
                             n.peerEpoch = rpeerepoch;
-                            n.qv = rqv;                              
+                            n.version = version;
+                            n.qv = rqv;
                             /*
                              * Print notification info
                              */
@@ -383,14 +403,14 @@ public class FastLeaderElection implements Election {
                                 Vote current = self.getCurrentVote();
                                 if(ackstate == QuorumPeer.ServerState.LOOKING){
                                     if(LOG.isDebugEnabled()){
-                                        LOG.debug("Sending new notification. My id =  " +
-                                                self.getId() + " recipient=" +
-                                                response.sid + " zxid=0x" +
-                                                Long.toHexString(current.getZxid()) +
-                                                " leader=" + current.getId() + " config version = " + 
+                                        LOG.debug("Sending new notification. My id ={} recipient={} zxid=0x{} leader={} config version = {}",
+                                                self.getId(),
+                                                response.sid,
+                                                Long.toHexString(current.getZxid()),
+                                                current.getId(),
                                                 Long.toHexString(self.getQuorumVerifier().getVersion()));
                                     }
-                                    
+
                                     QuorumVerifier qv = self.getQuorumVerifier();
                                     ToSend notmsg = new ToSend(
                                             ToSend.mType.notification,
@@ -399,7 +419,7 @@ public class FastLeaderElection implements Election {
                                             current.getElectionEpoch(),
                                             self.getPeerState(),
                                             response.sid,
-                                            current.getPeerEpoch(), 
+                                            current.getPeerEpoch(),
                                             qv.toString().getBytes());
                                     sendqueue.offer(notmsg);
                                 }
@@ -414,9 +434,6 @@ public class FastLeaderElection implements Election {
             }
         }
 
-    
-  
-
         /**
          * This worker simply dequeues a message to send and
          * and queues it on the manager's queue.
@@ -451,21 +468,13 @@ public class FastLeaderElection implements Election {
              *
              * @param m     message to send
              */
-            private void process(ToSend m) {
-                byte requestBytes[] = new byte[36 + m.configData.length];
-                ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
-
-                /*
-                 * Building notification packet to send
-                 */
-
-                requestBuffer.clear();
-                requestBuffer.putInt(m.state.ordinal());
-                requestBuffer.putLong(m.leader);
-                requestBuffer.putLong(m.zxid);
-                requestBuffer.putLong(m.electionEpoch);
-                requestBuffer.putLong(m.peerEpoch);
-                requestBuffer.put(m.configData);
+            void process(ToSend m) {
+                ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
+                                                    m.leader,
+                                                    m.zxid,
+                                                    m.electionEpoch,
+                                                    m.peerEpoch,
+                                                    m.configData);
 
                 manager.toSend(m.sid, requestBuffer);
 
@@ -474,6 +483,8 @@ public class FastLeaderElection implements Election {
 
         WorkerSender ws;
         WorkerReceiver wr;
+        Thread wsThread = null;
+        Thread wrThread = null;
 
         /**
          * Constructor of class Messenger.
@@ -484,17 +495,23 @@ public class FastLeaderElection implements Election {
 
             this.ws = new WorkerSender(manager);
 
-            Thread t = new Thread(this.ws,
+            this.wsThread = new Thread(this.ws,
                     "WorkerSender[myid=" + self.getId() + "]");
-            t.setDaemon(true);
-            t.start();
+            this.wsThread.setDaemon(true);
 
             this.wr = new WorkerReceiver(manager);
 
-            t = new Thread(this.wr,
+            this.wrThread = new Thread(this.wr,
                     "WorkerReceiver[myid=" + self.getId() + "]");
-            t.setDaemon(true);
-            t.start();
+            this.wrThread.setDaemon(true);
+        }
+
+        /**
+         * Starts instances of WorkerSender and WorkerReceiver
+         */
+        void start(){
+            this.wsThread.start();
+            this.wrThread.start();
         }
 
         /**
@@ -522,6 +539,55 @@ public class FastLeaderElection implements Election {
         return logicalclock;
     }
 
+    static ByteBuffer buildMsg(int state,
+            long leader,
+            long zxid,
+            long electionEpoch,
+            long epoch) {
+        byte requestBytes[] = new byte[40];
+        ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
+
+        /*
+         * Building notification packet to send, this is called directly only in tests
+         */
+
+        requestBuffer.clear();
+        requestBuffer.putInt(state);
+        requestBuffer.putLong(leader);
+        requestBuffer.putLong(zxid);
+        requestBuffer.putLong(electionEpoch);
+        requestBuffer.putLong(epoch);
+        requestBuffer.putInt(0x1);
+
+        return requestBuffer;
+    }
+
+    static ByteBuffer buildMsg(int state,
+            long leader,
+            long zxid,
+            long electionEpoch,
+            long epoch,
+            byte[] configData) {
+        byte requestBytes[] = new byte[44 + configData.length];
+        ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
+
+        /*
+         * Building notification packet to send
+         */
+
+        requestBuffer.clear();
+        requestBuffer.putInt(state);
+        requestBuffer.putLong(leader);
+        requestBuffer.putLong(zxid);
+        requestBuffer.putLong(electionEpoch);
+        requestBuffer.putLong(epoch);
+        requestBuffer.putInt(Notification.CURRENTVERSION);
+        requestBuffer.putInt(configData.length);
+        requestBuffer.put(configData);
+
+        return requestBuffer;
+    }
+
     /**
      * Constructor of FastLeaderElection. It takes two parameters, one
      * is the QuorumPeer object that instantiated this object, and the other
@@ -557,12 +623,17 @@ public class FastLeaderElection implements Election {
         this.messenger = new Messenger(manager);
     }
 
+    /**
+     * This method starts the sender and receiver threads.
+     */
+    public void start() {
+        this.messenger.start();
+    }
+
     private void leaveInstance(Vote v) {
         if(LOG.isDebugEnabled()){
-            LOG.debug("About to leave FLE instance: leader="
-                + v.getId() + ", zxid=0x" +
-                Long.toHexString(v.getZxid()) + ", my id=" + self.getId()
-                + ", my state=" + self.getPeerState());
+            LOG.debug("About to leave FLE instance: leader={}, zxid=0x{}, my id={}, my state={}",
+                v.getId(), Long.toHexString(v.getZxid()), self.getId(), self.getPeerState());
         }
         recvqueue.clear();
     }
@@ -582,7 +653,6 @@ public class FastLeaderElection implements Election {
         messenger.halt();
         LOG.debug("FLE is down");
     }
-    
 
     /**
      * Send notifications to all peers upon a change in our vote
@@ -608,14 +678,17 @@ public class FastLeaderElection implements Election {
     }
 
     private void printNotification(Notification n){
-        LOG.info("Notification: " + n.leader + " (n.leader), 0x"
+        LOG.info("Notification: "
+                + Long.toHexString(n.version) + " (message format version), "
+                + n.leader + " (n.leader), 0x"
                 + Long.toHexString(n.zxid) + " (n.zxid), 0x"
                 + Long.toHexString(n.electionEpoch) + " (n.round), " + n.state
                 + " (n.state), " + n.sid + " (n.sid), 0x"
                 + Long.toHexString(n.peerEpoch) + " (n.peerEPoch), "
-                + self.getPeerState() + " (my state)" + (n.qv!=null ? (Long.toHexString(n.qv.getVersion()) + " (n.config version)"):""));
+                + self.getPeerState() + " (my state)"
+                + (n.qv!=null ? (Long.toHexString(n.qv.getVersion()) + " (n.config version)"):""));
     }
- 
+
 
     /**
      * Check if a pair (server id, zxid) succeeds our
@@ -630,7 +703,7 @@ public class FastLeaderElection implements Election {
         if(self.getQuorumVerifier().getWeight(newId) == 0){
             return false;
         }
-        
+
         /*
          * We return true if one of the following three cases hold:
          * 1- New epoch is higher
@@ -638,8 +711,8 @@ public class FastLeaderElection implements Election {
          * 3- New epoch is the same as current epoch, new zxid is the same
          *  as current zxid, but server id is higher.
          */
-        
-        return ((newEpoch > curEpoch) || 
+
+        return ((newEpoch > curEpoch) ||
                 ((newEpoch == curEpoch) &&
                 ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
     }
@@ -700,7 +773,7 @@ public class FastLeaderElection implements Election {
             else if(votes.get(leader).getState() != ServerState.LEADING) predicate = false;
         } else if(logicalclock != electionEpoch) {
             predicate = false;
-        } 
+        }
 
         return predicate;
     }
@@ -776,7 +849,7 @@ public class FastLeaderElection implements Election {
         	}
         else return Long.MIN_VALUE;
     }
-    
+
     /**
      * Starts a new round of leader election. Whenever our QuorumPeer
      * changes its state to LOOKING, this method is invoked, and it

+ 1 - 1
src/java/main/org/apache/zookeeper/server/quorum/QuorumCnxManager.java

@@ -70,7 +70,7 @@ public class QuorumCnxManager {
     // stale notifications to peers
     static final int SEND_CAPACITY = 1;
 
-    static final int PACKETMAXSIZE = 1024 * 1024; 
+    static final int PACKETMAXSIZE = 1024 * 512;
     /*
      * Maximum number of attempts to connect to a peer
      */

+ 3 - 1
src/java/main/org/apache/zookeeper/server/quorum/QuorumPeer.java

@@ -810,7 +810,9 @@ public class QuorumPeer extends ZooKeeperThread implements QuorumStats.Provider
             QuorumCnxManager.Listener listener = qcm.listener;
             if(listener != null){
                 listener.start();
-                le = new FastLeaderElection(this, qcm);
+                FastLeaderElection fle = new FastLeaderElection(this, qcm);
+                fle.start();
+                le = fle;
             } else {
                 LOG.error("Null listener when initializing cnx manager");
             }

+ 42 - 5
src/java/main/org/apache/zookeeper/server/quorum/Vote.java

@@ -23,7 +23,9 @@ import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
 
 public class Vote {
     
-    public Vote(long id, long zxid) {
+    public Vote(long id,
+                    long zxid) {
+        this.version = 0x0;
         this.id = id;
         this.zxid = zxid;
         this.electionEpoch = -1;
@@ -31,7 +33,10 @@ public class Vote {
         this.state = ServerState.LOOKING;
     }
     
-    public Vote(long id, long zxid, long peerEpoch) {
+    public Vote(long id,
+                    long zxid,
+                    long peerEpoch) {
+        this.version = 0x0;
         this.id = id;
         this.zxid = zxid;
         this.electionEpoch = -1;
@@ -39,7 +44,11 @@ public class Vote {
         this.state = ServerState.LOOKING;
     }
 
-    public Vote(long id, long zxid, long electionEpoch, long peerEpoch) {
+    public Vote(long id,
+                    long zxid,
+                    long electionEpoch,
+                    long peerEpoch) {
+        this.version = 0x0;
         this.id = id;
         this.zxid = zxid;
         this.electionEpoch = electionEpoch;
@@ -47,7 +56,13 @@ public class Vote {
         this.state = ServerState.LOOKING;
     }
     
-    public Vote(long id, long zxid, long electionEpoch, long peerEpoch, ServerState state) {
+    public Vote(int version,
+                    long id,
+                    long zxid,
+                    long electionEpoch,
+                    long peerEpoch,
+                    ServerState state) {
+        this.version = version;
         this.id = id;
         this.zxid = zxid;
         this.electionEpoch = electionEpoch;
@@ -55,6 +70,21 @@ public class Vote {
         this.peerEpoch = peerEpoch;
     }
     
+    public Vote(long id,
+                    long zxid,
+                    long electionEpoch,
+                    long peerEpoch,
+                    ServerState state) {
+        this.id = id;
+        this.zxid = zxid;
+        this.electionEpoch = electionEpoch;
+        this.state = state;
+        this.peerEpoch = peerEpoch;
+        this.version = 0x0;
+    }
+
+    final private int version;
+
     final private long id;
     
     final private long zxid;
@@ -63,6 +93,10 @@ public class Vote {
     
     final private long peerEpoch;
     
+    public int getVersion() {
+        return version;
+    }
+
     public long getId() {
         return id;
     }
@@ -91,7 +125,10 @@ public class Vote {
             return false;
         }
         Vote other = (Vote) o;
-        return (id == other.id && zxid == other.zxid && electionEpoch == other.electionEpoch && peerEpoch == other.peerEpoch);
+        return (id == other.id
+                    && zxid == other.zxid
+                    && electionEpoch == other.electionEpoch
+                    && peerEpoch == other.peerEpoch);
 
     }
 

+ 32 - 42
src/java/test/org/apache/zookeeper/test/FLEBackwardElectionRoundTest.java → src/java/test/org/apache/zookeeper/server/quorum/FLEBackwardElectionRoundTest.java

@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.zookeeper.test;
+package org.apache.zookeeper.server.quorum;
 
 import java.io.File;
 import java.io.IOException;
@@ -32,16 +32,15 @@ import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.Vote;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.apache.zookeeper.test.ClientBase;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.zookeeper.test.FLETestUtils.LEThread;
-
 public class FLEBackwardElectionRoundTest extends ZKTestCase {
     protected static final Logger LOG = LoggerFactory.getLogger(FLELostMessageTest.class);
-    
+
     int count;
     HashMap<Long,QuorumServer> peers;
     File tmpdir[];
@@ -67,94 +66,85 @@ public class FLEBackwardElectionRoundTest extends ZKTestCase {
             }
         }
     }
-    
+
     /**
      * This test is checking the following case. A server S is
-     * currently LOOKING and it receives notifications from 
+     * currently LOOKING and it receives notifications from
      * a quorum indicating they are following S. The election
-     * round E of S is higher than the election round E' in the 
+     * round E of S is higher than the election round E' in the
      * notification messages, so S becomes the leader and sets
      * its epoch back to E'. In the meanwhile, one or more
      * followers turn to LOOKING and elect S in election round E.
      * Having leader and followers with different election rounds
      * might prevent other servers from electing a leader because
-     * they can't get a consistent set of notifications from a 
-     * quorum. 
-     * 
+     * they can't get a consistent set of notifications from a
+     * quorum.
+     *
      * {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1514}
-     *    
-     * 
+     *
+     *
      * @throws Exception
      */
-    
+
     @Test
     public void testBackwardElectionRound() throws Exception {
-        LOG.info("TestLE: " + getTestName()+ ", " + count);
+        LOG.info("TestLE: {}, {}", getTestName(), count);
         for(int i = 0; i < count; i++) {
             int clientport = PortAssignment.unique();
             peers.put(Long.valueOf(i),
-                new QuorumServer(i,
-                    new InetSocketAddress("127.0.0.1", clientport),
-                    new InetSocketAddress(
-                        "127.0.0.1", PortAssignment.unique())));
+                    new QuorumServer(i,
+                            new InetSocketAddress(clientport),
+                            new InetSocketAddress(PortAssignment.unique())));
             tmpdir[i] = ClientBase.createTmpDir();
             port[i] = clientport;
         }
 
+        ByteBuffer initialMsg = FLETestUtils.createMsg(ServerState.FOLLOWING.ordinal(), 0, 0, 1);
+
         /*
          * Start server 0
          */
-
         QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2);
         peer.startLeaderElection();
         FLETestUtils.LEThread thread = new FLETestUtils.LEThread(peer, 0);
-        thread.start();  
-        
-        
+        thread.start();
+
         /*
          * Start mock server 1
          */
         QuorumPeer mockPeer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 1000, 2, 2);
         cnxManagers[0] = new QuorumCnxManager(mockPeer);
-        QuorumCnxManager.Listener listener = cnxManagers[0].listener;
-        listener.start();
+        cnxManagers[0].listener.start();
+
+        cnxManagers[0].toSend(0l, initialMsg);
 
-        cnxManagers[0].toSend(0l, FLETestUtils.createMsg(ServerState.FOLLOWING.ordinal(), 0, 0, 1));
-        
         /*
          * Start mock server 2
          */
         mockPeer = new QuorumPeer(peers, tmpdir[2], tmpdir[2], port[2], 3, 2, 1000, 2, 2);
         cnxManagers[1] = new QuorumCnxManager(mockPeer);
-        listener = cnxManagers[1].listener;
-        listener.start();
+        cnxManagers[1].listener.start();
+
+        cnxManagers[1].toSend(0l, initialMsg);
 
-        cnxManagers[1].toSend(0l, FLETestUtils.createMsg(ServerState.FOLLOWING.ordinal(), 0, 0, 1));
-        
         /*
          * Run another instance of leader election.
          */
         thread.join(5000);
-        Assert.assertTrue("State is not leading. Current state:"
-                + peer.getPeerState(),
-                peer.getPeerState() == ServerState.LEADING);
         thread = new FLETestUtils.LEThread(peer, 0);
         thread.start();
-        
+
         /*
          * Send the same messages, this time should not make 0 the leader.
          */
-        cnxManagers[0].toSend(0l, FLETestUtils.createMsg(ServerState.FOLLOWING.ordinal(), 0, 0, 1));
-        cnxManagers[1].toSend(0l, FLETestUtils.createMsg(ServerState.FOLLOWING.ordinal(), 0, 0, 1));
-        
-        
+        cnxManagers[0].toSend(0l, initialMsg);
+        cnxManagers[1].toSend(0l, initialMsg);
+
         thread.join(5000);
-        Assert.assertTrue("State is not looking. Current state:"
-                + peer.getPeerState(),
-                peer.getPeerState() == ServerState.LOOKING);
+
         if (!thread.isAlive()) {
             Assert.fail("Should not have joined");
         }
-        
+
     }
 }

+ 11 - 23
src/java/test/org/apache/zookeeper/test/FLELostMessageTest.java → src/java/test/org/apache/zookeeper/server/quorum/FLELostMessageTest.java

@@ -16,23 +16,23 @@
  * limitations under the License.
  */
 
-package org.apache.zookeeper.test;
+package org.apache.zookeeper.server.quorum;
 
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
-import java.nio.ByteBuffer;
 import java.util.HashMap;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.server.quorum.FastLeaderElection;
 import org.apache.zookeeper.server.quorum.QuorumCnxManager;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
-import org.apache.zookeeper.server.quorum.Vote;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
+import org.apache.zookeeper.test.ClientBase;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -41,7 +41,6 @@ import org.junit.Test;
 public class FLELostMessageTest extends ZKTestCase {
     protected static final Logger LOG = LoggerFactory.getLogger(FLELostMessageTest.class);
 
-
     int count;
     HashMap<Long,QuorumServer> peers;
     File tmpdir[];
@@ -62,18 +61,16 @@ public class FLELostMessageTest extends ZKTestCase {
     public void tearDown() throws Exception {
         cnxManager.halt();
     }
-    
+
     @Test
     public void testLostMessage() throws Exception {
-
-        LOG.info("TestLE: " + getTestName()+ ", " + count);
+        LOG.info("TestLE: {}, {}", getTestName(), count);
         for(int i = 0; i < count; i++) {
             int clientport = PortAssignment.unique();
             peers.put(Long.valueOf(i),
-                new QuorumServer(i,
-                    new InetSocketAddress("127.0.0.1", clientport),
-                    new InetSocketAddress(
-                        "127.0.0.1", PortAssignment.unique())));
+                    new QuorumServer(i,
+                            new InetSocketAddress(clientport),
+                            new InetSocketAddress(PortAssignment.unique())));
             tmpdir[i] = ClientBase.createTmpDir();
             port[i] = clientport;
         }
@@ -81,7 +78,6 @@ public class FLELostMessageTest extends ZKTestCase {
         /*
          * Start server 0
          */
-
         QuorumPeer peer = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 3, 1, 1000, 2, 2);
         peer.startLeaderElection();
         FLETestUtils.LEThread thread = new FLETestUtils.LEThread(peer, 1);
@@ -92,26 +88,18 @@ public class FLELostMessageTest extends ZKTestCase {
          */
         mockServer();
         thread.join(5000);
-        Assert.assertTrue("State is not leading. Current state:"
-                + peer.getPeerState(),
-                peer.getPeerState() == ServerState.LEADING);
         if (thread.isAlive()) {
             Assert.fail("Threads didn't join");
         }
     }
 
     void mockServer() throws InterruptedException, IOException {
-        /*
-         * Create an instance of the connection manager
-         */
         QuorumPeer peer = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 3, 0, 1000, 2, 2);
         cnxManager = new QuorumCnxManager(peer);
-        QuorumCnxManager.Listener listener = cnxManager.listener;
-        listener.start();
-
+        cnxManager.listener.start();
 
-        cnxManager.toSend(1l, FLETestUtils.createMsg(ServerState.LOOKING.ordinal(), 0, 0, 1));
+        cnxManager.toSend(1l, FLETestUtils.createMsg(ServerState.LOOKING.ordinal(), 0, 0, 0));
         cnxManager.recvQueue.take();
-        cnxManager.toSend(1L, FLETestUtils.createMsg(ServerState.FOLLOWING.ordinal(), 1, 0, 1));
+        cnxManager.toSend(1L, FLETestUtils.createMsg(ServerState.FOLLOWING.ordinal(), 1, 0, 0));
     }
 }

+ 14 - 27
src/java/test/org/apache/zookeeper/test/FLETestUtils.java → src/java/test/org/apache/zookeeper/server/quorum/FLETestUtils.java

@@ -15,10 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.zookeeper.test;
+package org.apache.zookeeper.server.quorum;
 
 import java.nio.ByteBuffer;
 
+import org.apache.zookeeper.server.quorum.FastLeaderElection;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.Vote;
 
@@ -30,8 +31,7 @@ import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
 
 public class FLETestUtils {
     protected static final Logger LOG = LoggerFactory.getLogger(FLETestUtils.class);
-    
-    
+
     /*
      * Thread to run an instance of leader election for 
      * a given quorum peer.
@@ -43,18 +43,18 @@ public class FLETestUtils {
         LEThread(QuorumPeer peer, int i) {
             this.i = i;
             this.peer = peer;
-            LOG.info("Constructor: " + getName());
+            LOG.info("Constructor: {}", getName());
 
         }
 
-        public void run(){
-            try{
+        public void run() {
+            try {
                 Vote v = null;
                 peer.setPeerState(ServerState.LOOKING);
-                LOG.info("Going to call leader election: " + i);
+                LOG.info("Going to call leader election: {}", i);
                 v = peer.getElectionAlg().lookForLeader();
 
-                if (v == null){
+                if (v == null) {
                     Assert.fail("Thread " + i + " got a null vote");
                 }
 
@@ -64,34 +64,21 @@ public class FLETestUtils {
                  */
                 peer.setCurrentVote(v);
 
-                LOG.info("Finished election: " + i + ", " + v.getId());
-                LOG.info("QuorumPeer state: " + peer.getPeerState());
+                LOG.info("Finished election: {}, {}", i, v.getId());
+
+                Assert.assertTrue("State is not leading.", peer.getPeerState() == ServerState.LEADING);
             } catch (Exception e) {
                 e.printStackTrace();
             }
             LOG.info("Joining");
         }
     }
-    
+
     /*
      * Creates a leader election notification message.
      */
-    
     static ByteBuffer createMsg(int state, long leader, long zxid, long epoch){
-        byte requestBytes[] = new byte[28];
-        ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
-
-        /*
-         * Building notification packet to send
-         */
-
-        requestBuffer.clear();
-        requestBuffer.putInt(state);
-        requestBuffer.putLong(leader);
-        requestBuffer.putLong(zxid);
-        requestBuffer.putLong(epoch);
-
-        return requestBuffer;
+        return FastLeaderElection.buildMsg(state, leader, zxid, 1, epoch);
     }
 
-}
+}

+ 1 - 3
src/java/test/org/apache/zookeeper/test/FLENewEpochTest.java

@@ -152,8 +152,6 @@ public class FLENewEpochTest extends ZKTestCase {
       @Test
       public void testLENewEpoch() throws Exception {
 
-          FastLeaderElection le[] = new FastLeaderElection[count];
-
           LOG.info("TestLE: " + getTestName()+ ", " + count);
           for(int i = 0; i < count; i++) {
               peers.put(Long.valueOf(i),
@@ -166,7 +164,7 @@ public class FLENewEpochTest extends ZKTestCase {
               port[i] = PortAssignment.unique();
           }
 
-          for(int i = 1; i < le.length; i++) {
+          for(int i = 1; i < count; i++) {
               QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2);
               peer.startLeaderElection();
               LEThread thread = new LEThread(peer, i);

+ 1 - 0
src/java/test/org/apache/zookeeper/test/FLEPredicateTest.java

@@ -78,6 +78,7 @@ public class FLEPredicateTest extends ZKTestCase {
                                         PortAssignment.unique(), 3, 0, 1000, 2, 2);
         
             MockFLE mock = new MockFLE(peer);
+            mock.start();
             
             /*
              * Lower epoch must return false

+ 1 - 2
src/java/test/org/apache/zookeeper/test/FLETest.java

@@ -297,7 +297,6 @@ public class FLETest extends ZKTestCase {
      * @throws Exception
      */
     private void runElection(int rounds) throws Exception {
-        FastLeaderElection le[] = new FastLeaderElection[count];
         ConcurrentHashMap<Long, HashSet<Integer> > quora = 
             new ConcurrentHashMap<Long, HashSet<Integer> >();
 
@@ -322,7 +321,7 @@ public class FLETest extends ZKTestCase {
         /*
          * Start one LEThread for each peer we want to run.
          */
-        for(int i = 0; i < le.length; i++) {
+        for(int i = 0; i < count; i++) {
             QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i],
                     port[i], 3, i, 1000, 2, 2);
             peer.startLeaderElection();

+ 1 - 3
src/java/test/org/apache/zookeeper/test/FLEZeroWeightTest.java

@@ -140,8 +140,6 @@ public class FLEZeroWeightTest extends ZKTestCase {
 
     @Test
     public void testZeroWeightQuorum() throws Exception {
-        FastLeaderElection le[] = new FastLeaderElection[count];
-
         LOG.info("TestZeroWeightQuorum: " + getTestName()+ ", " + count);
         for(int i = 0; i < count; i++) {
             InetSocketAddress addr1 = new InetSocketAddress("127.0.0.1",PortAssignment.unique());
@@ -153,7 +151,7 @@ public class FLEZeroWeightTest extends ZKTestCase {
             tmpdir[i] = ClientBase.createTmpDir();
         }
 
-        for(int i = 0; i < le.length; i++) {
+        for(int i = 0; i < count; i++) {
             QuorumHierarchical hq = new QuorumHierarchical(qp);
             QuorumPeer peer = new QuorumPeer(peers, tmpdir[i], tmpdir[i], port[i], 3, i, 1000, 2, 2, hq);
             peer.startLeaderElection();

+ 1 - 0
src/java/test/org/apache/zookeeper/test/LENonTerminateTest.java

@@ -37,6 +37,7 @@ import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.quorum.Election;
+import org.apache.zookeeper.server.quorum.FLELostMessageTest;
 import org.apache.zookeeper.server.quorum.LeaderElection;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.Vote;