ソースを参照

ZOOKEEPER-3104: Fix data inconsistency due to NEWLEADER being sent too early

Author: Fangmin Lyu <allenlyu@fb.com>

Reviewers: Benjamin Reed <breed@apache.org>, Norbert Kalmar <nkalmar@yahoo.com>

Closes #583 from lvfangmin/ZOOKEEPER-3104
Fangmin Lyu 7 年 前
コミット
148c2cd6ba

+ 106 - 99
src/java/main/org/apache/zookeeper/server/quorum/Leader.java

@@ -88,8 +88,7 @@ public class Leader {
         LOG.info(MAX_CONCURRENT_SNAPSHOT_TIMEOUT + " = " + maxConcurrentSnapshotTimeout);
         LOG.info(MAX_CONCURRENT_SNAPSHOT_TIMEOUT + " = " + maxConcurrentSnapshotTimeout);
     }
     }
 
 
-    private final LearnerSnapshotThrottler learnerSnapshotThrottler = 
-        new LearnerSnapshotThrottler(maxConcurrentSnapshots, maxConcurrentSnapshotTimeout);
+    private final LearnerSnapshotThrottler learnerSnapshotThrottler;
 
 
     final LeaderZooKeeperServer zk;
     final LeaderZooKeeperServer zk;
 
 
@@ -111,6 +110,12 @@ public class Leader {
         return proposalStats;
         return proposalStats;
     }
     }
 
 
+    public LearnerSnapshotThrottler createLearnerSnapshotThrottler(
+            int maxConcurrentSnapshots, long maxConcurrentSnapshotTimeout) {
+        return new LearnerSnapshotThrottler(
+                maxConcurrentSnapshots, maxConcurrentSnapshotTimeout);
+    }
+
     /**
     /**
      * Returns a copy of the current learner snapshot
      * Returns a copy of the current learner snapshot
      */
      */
@@ -207,7 +212,7 @@ public class Leader {
     /**
     /**
      * Returns true if a quorum in qv is connected and synced with the leader
      * Returns true if a quorum in qv is connected and synced with the leader
      * and false otherwise
      * and false otherwise
-     *  
+     *
      * @param qv, a QuorumVerifier
      * @param qv, a QuorumVerifier
      */
      */
     public boolean isQuorumSynced(QuorumVerifier qv) {
     public boolean isQuorumSynced(QuorumVerifier qv) {
@@ -223,7 +228,7 @@ public class Leader {
        }
        }
        return qv.containsQuorum(ids);
        return qv.containsQuorum(ids);
     }
     }
-    
+
     private final ServerSocket ss;
     private final ServerSocket ss;
 
 
     Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException {
     Leader(QuorumPeer self,LeaderZooKeeperServer zk) throws IOException {
@@ -248,6 +253,8 @@ public class Leader {
             throw e;
             throw e;
         }
         }
         this.zk = zk;
         this.zk = zk;
+        this.learnerSnapshotThrottler = createLearnerSnapshotThrottler(
+                maxConcurrentSnapshots, maxConcurrentSnapshotTimeout);
     }
     }
 
 
     /**
     /**
@@ -342,17 +349,17 @@ public class Leader {
      * This message type informs observers of a committed proposal.
      * This message type informs observers of a committed proposal.
      */
      */
     final static int INFORM = 8;
     final static int INFORM = 8;
-    
+
     /**
     /**
      * Similar to COMMIT, only for a reconfig operation.
      * Similar to COMMIT, only for a reconfig operation.
      */
      */
     final static int COMMITANDACTIVATE = 9;
     final static int COMMITANDACTIVATE = 9;
-    
+
     /**
     /**
      * Similar to INFORM, only for a reconfig operation.
      * Similar to INFORM, only for a reconfig operation.
      */
      */
     final static int INFORMANDACTIVATE = 19;
     final static int INFORMANDACTIVATE = 19;
-    
+
     final ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();
     final ConcurrentMap<Long, Proposal> outstandingProposals = new ConcurrentHashMap<Long, Proposal>();
 
 
     private final ConcurrentLinkedQueue<Proposal> toBeApplied = new ConcurrentLinkedQueue<Proposal>();
     private final ConcurrentLinkedQueue<Proposal> toBeApplied = new ConcurrentLinkedQueue<Proposal>();
@@ -415,9 +422,9 @@ public class Leader {
     long epoch = -1;
     long epoch = -1;
     boolean waitingForNewEpoch = true;
     boolean waitingForNewEpoch = true;
 
 
-    // when a reconfig occurs where the leader is removed or becomes an observer, 
+    // when a reconfig occurs where the leader is removed or becomes an observer,
    // it does not commit ops after committing the reconfig
    // it does not commit ops after committing the reconfig
-    boolean allowedToCommit = true;     
+    boolean allowedToCommit = true;
     /**
     /**
      * This method is main function that is called to lead
      * This method is main function that is called to lead
      *
      *
@@ -467,20 +474,20 @@ public class Leader {
             QuorumVerifier curQV = self.getQuorumVerifier();
             QuorumVerifier curQV = self.getQuorumVerifier();
             if (curQV.getVersion() == 0 && curQV.getVersion() == lastSeenQV.getVersion()) {
             if (curQV.getVersion() == 0 && curQV.getVersion() == lastSeenQV.getVersion()) {
                 // This was added in ZOOKEEPER-1783. The initial config has version 0 (not explicitly
                 // This was added in ZOOKEEPER-1783. The initial config has version 0 (not explicitly
-                // specified by the user; the lack of version in a config file is interpreted as version=0). 
+                // specified by the user; the lack of version in a config file is interpreted as version=0).
                 // As soon as a config is established we would like to increase its version so that it
                 // As soon as a config is established we would like to increase its version so that it
                 // takes presedence over other initial configs that were not established (such as a config
                 // takes presedence over other initial configs that were not established (such as a config
-                // of a server trying to join the ensemble, which may be a partial view of the system, not the full config). 
+                // of a server trying to join the ensemble, which may be a partial view of the system, not the full config).
                 // We chose to set the new version to the one of the NEWLEADER message. However, before we can do that
                 // We chose to set the new version to the one of the NEWLEADER message. However, before we can do that
                 // there must be agreement on the new version, so we can only change the version when sending/receiving UPTODATE,
                 // there must be agreement on the new version, so we can only change the version when sending/receiving UPTODATE,
-                // not when sending/receiving NEWLEADER. In other words, we can't change curQV here since its the committed quorum verifier, 
-                // and there's still no agreement on the new version that we'd like to use. Instead, we use 
+                // not when sending/receiving NEWLEADER. In other words, we can't change curQV here since its the committed quorum verifier,
+                // and there's still no agreement on the new version that we'd like to use. Instead, we use
                 // lastSeenQuorumVerifier which is being sent with NEWLEADER message
                 // lastSeenQuorumVerifier which is being sent with NEWLEADER message
-                // so its a good way to let followers know about the new version. (The original reason for sending 
+                // so its a good way to let followers know about the new version. (The original reason for sending
                 // lastSeenQuorumVerifier with NEWLEADER is so that the leader completes any potentially uncommitted reconfigs
                 // lastSeenQuorumVerifier with NEWLEADER is so that the leader completes any potentially uncommitted reconfigs
-                // that it finds before starting to propose operations. Here we're reusing the same code path for 
+                // that it finds before starting to propose operations. Here we're reusing the same code path for
                 // reaching consensus on the new version number.)
                 // reaching consensus on the new version number.)
-                
+
                 // It is important that this is done before the leader executes waitForEpochAck,
                 // It is important that this is done before the leader executes waitForEpochAck,
                 // so before LearnerHandlers return from their waitForEpochAck
                 // so before LearnerHandlers return from their waitForEpochAck
                 // hence before they construct the NEWLEADER message containing
                 // hence before they construct the NEWLEADER message containing
@@ -488,24 +495,24 @@ public class Leader {
                try {
                try {
                    QuorumVerifier newQV = self.configFromString(curQV.toString());
                    QuorumVerifier newQV = self.configFromString(curQV.toString());
                    newQV.setVersion(zk.getZxid());
                    newQV.setVersion(zk.getZxid());
-                   self.setLastSeenQuorumVerifier(newQV, true);    
+                   self.setLastSeenQuorumVerifier(newQV, true);
                } catch (Exception e) {
                } catch (Exception e) {
                    throw new IOException(e);
                    throw new IOException(e);
                }
                }
             }
             }
-            
+
             newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
             newLeaderProposal.addQuorumVerifier(self.getQuorumVerifier());
             if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()){
             if (self.getLastSeenQuorumVerifier().getVersion() > self.getQuorumVerifier().getVersion()){
                newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
                newLeaderProposal.addQuorumVerifier(self.getLastSeenQuorumVerifier());
             }
             }
-            
+
             // We have to get at least a majority of servers in sync with
             // We have to get at least a majority of servers in sync with
             // us. We do this by waiting for the NEWLEADER packet to get
             // us. We do this by waiting for the NEWLEADER packet to get
             // acknowledged
             // acknowledged
-                       
+
              waitForEpochAck(self.getId(), leaderStateSummary);
              waitForEpochAck(self.getId(), leaderStateSummary);
-             self.setCurrentEpoch(epoch);    
-            
+             self.setCurrentEpoch(epoch);
+
              try {
              try {
                  waitForNewLeaderAck(self.getId(), zk.getZxid());
                  waitForNewLeaderAck(self.getId(), zk.getZxid());
              } catch (InterruptedException e) {
              } catch (InterruptedException e) {
@@ -517,14 +524,14 @@ public class Leader {
                      if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())){
                      if (self.getQuorumVerifier().getVotingMembers().containsKey(f.getSid())){
                          followerSet.add(f.getSid());
                          followerSet.add(f.getSid());
                      }
                      }
-                 }    
+                 }
                  boolean initTicksShouldBeIncreased = true;
                  boolean initTicksShouldBeIncreased = true;
                  for (Proposal.QuorumVerifierAcksetPair qvAckset:newLeaderProposal.qvAcksetPairs) {
                  for (Proposal.QuorumVerifierAcksetPair qvAckset:newLeaderProposal.qvAcksetPairs) {
                      if (!qvAckset.getQuorumVerifier().containsQuorum(followerSet)) {
                      if (!qvAckset.getQuorumVerifier().containsQuorum(followerSet)) {
                          initTicksShouldBeIncreased = false;
                          initTicksShouldBeIncreased = false;
                          break;
                          break;
                      }
                      }
-                 }                  
+                 }
                  if (initTicksShouldBeIncreased) {
                  if (initTicksShouldBeIncreased) {
                      LOG.warn("Enough followers present. "+
                      LOG.warn("Enough followers present. "+
                              "Perhaps the initTicks need to be increased.");
                              "Perhaps the initTicks need to be increased.");
@@ -533,7 +540,7 @@ public class Leader {
              }
              }
 
 
              startZkServer();
              startZkServer();
-             
+
             /**
             /**
              * WARNING: do not use this for anything other than QA testing
              * WARNING: do not use this for anything other than QA testing
              * on a real cluster. Specifically to enable verification that quorum
              * on a real cluster. Specifically to enable verification that quorum
@@ -679,39 +686,39 @@ public class Leader {
 
 
     /** In a reconfig operation, this method attempts to find the best leader for next configuration.
     /** In a reconfig operation, this method attempts to find the best leader for next configuration.
      *  If the current leader is a voter in the next configuartion, then it remains the leader.
      *  If the current leader is a voter in the next configuartion, then it remains the leader.
-     *  Otherwise, choose one of the new voters that acked the reconfiguartion, such that it is as   
+     *  Otherwise, choose one of the new voters that acked the reconfiguartion, such that it is as
      * up-to-date as possible, i.e., acked as many outstanding proposals as possible.
      * up-to-date as possible, i.e., acked as many outstanding proposals as possible.
-     *  
+     *
      * @param reconfigProposal
      * @param reconfigProposal
      * @param zxid of the reconfigProposal
      * @param zxid of the reconfigProposal
      * @return server if of the designated leader
      * @return server if of the designated leader
      */
      */
-    
+
     private long getDesignatedLeader(Proposal reconfigProposal, long zxid) {
     private long getDesignatedLeader(Proposal reconfigProposal, long zxid) {
        //new configuration
        //new configuration
-       Proposal.QuorumVerifierAcksetPair newQVAcksetPair = reconfigProposal.qvAcksetPairs.get(reconfigProposal.qvAcksetPairs.size()-1);        
-       
-       //check if I'm in the new configuration with the same quorum address - 
-       // if so, I'll remain the leader    
-       if (newQVAcksetPair.getQuorumVerifier().getVotingMembers().containsKey(self.getId()) && 
-               newQVAcksetPair.getQuorumVerifier().getVotingMembers().get(self.getId()).addr.equals(self.getQuorumAddress())){  
+       Proposal.QuorumVerifierAcksetPair newQVAcksetPair = reconfigProposal.qvAcksetPairs.get(reconfigProposal.qvAcksetPairs.size()-1);
+
+       //check if I'm in the new configuration with the same quorum address -
+       // if so, I'll remain the leader
+       if (newQVAcksetPair.getQuorumVerifier().getVotingMembers().containsKey(self.getId()) &&
+               newQVAcksetPair.getQuorumVerifier().getVotingMembers().get(self.getId()).addr.equals(self.getQuorumAddress())){
            return self.getId();
            return self.getId();
        }
        }
-       // start with an initial set of candidates that are voters from new config that 
-       // acknowledged the reconfig op (there must be a quorum). Choose one of them as 
+       // start with an initial set of candidates that are voters from new config that
+       // acknowledged the reconfig op (there must be a quorum). Choose one of them as
        // current leader candidate
        // current leader candidate
        HashSet<Long> candidates = new HashSet<Long>(newQVAcksetPair.getAckset());
        HashSet<Long> candidates = new HashSet<Long>(newQVAcksetPair.getAckset());
        candidates.remove(self.getId()); // if we're here, I shouldn't be the leader
        candidates.remove(self.getId()); // if we're here, I shouldn't be the leader
        long curCandidate = candidates.iterator().next();
        long curCandidate = candidates.iterator().next();
-       
+
        //go over outstanding ops in order, and try to find a candidate that acked the most ops.
        //go over outstanding ops in order, and try to find a candidate that acked the most ops.
        //this way it will be the most up-to-date and we'll minimize the number of ops that get dropped
        //this way it will be the most up-to-date and we'll minimize the number of ops that get dropped
-       
+
        long curZxid = zxid + 1;
        long curZxid = zxid + 1;
        Proposal p = outstandingProposals.get(curZxid);
        Proposal p = outstandingProposals.get(curZxid);
-               
-       while (p!=null && !candidates.isEmpty()) {                              
-           for (Proposal.QuorumVerifierAcksetPair qvAckset: p.qvAcksetPairs){ 
+
+       while (p!=null && !candidates.isEmpty()) {
+           for (Proposal.QuorumVerifierAcksetPair qvAckset: p.qvAcksetPairs){
                //reduce the set of candidates to those that acknowledged p
                //reduce the set of candidates to those that acknowledged p
                candidates.retainAll(qvAckset.getAckset());
                candidates.retainAll(qvAckset.getAckset());
                //no candidate acked p, return the best candidate found so far
                //no candidate acked p, return the best candidate found so far
@@ -719,18 +726,18 @@ public class Leader {
                //update the current candidate, and if it is the only one remaining, return it
                //update the current candidate, and if it is the only one remaining, return it
                curCandidate = candidates.iterator().next();
                curCandidate = candidates.iterator().next();
                if (candidates.size() == 1) return curCandidate;
                if (candidates.size() == 1) return curCandidate;
-           }      
+           }
            curZxid++;
            curZxid++;
            p = outstandingProposals.get(curZxid);
            p = outstandingProposals.get(curZxid);
        }
        }
-       
+
        return curCandidate;
        return curCandidate;
     }
     }
 
 
     /**
     /**
      * @return True if committed, otherwise false.
      * @return True if committed, otherwise false.
      **/
      **/
-    synchronized public boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {       
+    synchronized public boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {
        // make sure that ops are committed in order. With reconfigurations it is now possible
        // make sure that ops are committed in order. With reconfigurations it is now possible
        // that different operations wait for different sets of acks, and we still want to enforce
        // that different operations wait for different sets of acks, and we still want to enforce
        // that they are committed in order. Currently we only permit one outstanding reconfiguration
        // that they are committed in order. Currently we only permit one outstanding reconfiguration
@@ -739,50 +746,50 @@ public class Leader {
        // for an operation without getting enough acks for preceding ops. But in the future if multiple
        // for an operation without getting enough acks for preceding ops. But in the future if multiple
        // concurrent reconfigs are allowed, this can happen.
        // concurrent reconfigs are allowed, this can happen.
        if (outstandingProposals.containsKey(zxid - 1)) return false;
        if (outstandingProposals.containsKey(zxid - 1)) return false;
-       
+
        // in order to be committed, a proposal must be accepted by a quorum.
        // in order to be committed, a proposal must be accepted by a quorum.
        //
        //
        // getting a quorum from all necessary configurations.
        // getting a quorum from all necessary configurations.
         if (!p.hasAllQuorums()) {
         if (!p.hasAllQuorums()) {
-           return false;                 
+           return false;
         }
         }
-        
+
         // commit proposals in order
         // commit proposals in order
-        if (zxid != lastCommitted+1) {    
+        if (zxid != lastCommitted+1) {
            LOG.warn("Commiting zxid 0x" + Long.toHexString(zxid)
            LOG.warn("Commiting zxid 0x" + Long.toHexString(zxid)
                     + " from " + followerAddr + " not first!");
                     + " from " + followerAddr + " not first!");
             LOG.warn("First is "
             LOG.warn("First is "
                     + (lastCommitted+1));
                     + (lastCommitted+1));
-        }     
-        
+        }
+
         outstandingProposals.remove(zxid);
         outstandingProposals.remove(zxid);
-        
+
         if (p.request != null) {
         if (p.request != null) {
              toBeApplied.add(p);
              toBeApplied.add(p);
         }
         }
 
 
         if (p.request == null) {
         if (p.request == null) {
             LOG.warn("Going to commmit null: " + p);
             LOG.warn("Going to commmit null: " + p);
-        } else if (p.request.getHdr().getType() == OpCode.reconfig) {                                   
-            LOG.debug("Committing a reconfiguration! " + outstandingProposals.size()); 
-                 
-            //if this server is voter in new config with the same quorum address, 
+        } else if (p.request.getHdr().getType() == OpCode.reconfig) {
+            LOG.debug("Committing a reconfiguration! " + outstandingProposals.size());
+
+            //if this server is voter in new config with the same quorum address,
             //then it will remain the leader
             //then it will remain the leader
             //otherwise an up-to-date follower will be designated as leader. This saves
             //otherwise an up-to-date follower will be designated as leader. This saves
-            //leader election time, unless the designated leader fails                             
+            //leader election time, unless the designated leader fails
             Long designatedLeader = getDesignatedLeader(p, zxid);
             Long designatedLeader = getDesignatedLeader(p, zxid);
             //LOG.warn("designated leader is: " + designatedLeader);
             //LOG.warn("designated leader is: " + designatedLeader);
 
 
             QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size()-1).getQuorumVerifier();
             QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size()-1).getQuorumVerifier();
-       
+
             self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
             self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
 
 
             if (designatedLeader != self.getId()) {
             if (designatedLeader != self.getId()) {
                 allowedToCommit = false;
                 allowedToCommit = false;
             }
             }
-                   
-            // we're sending the designated leader, and if the leader is changing the followers are 
-            // responsible for closing the connection - this way we are sure that at least a majority of them 
+
+            // we're sending the designated leader, and if the leader is changing the followers are
+            // responsible for closing the connection - this way we are sure that at least a majority of them
             // receive the commit message.
             // receive the commit message.
             commitAndActivate(zxid, designatedLeader);
             commitAndActivate(zxid, designatedLeader);
             informAndActivate(p, designatedLeader);
             informAndActivate(p, designatedLeader);
@@ -795,12 +802,12 @@ public class Leader {
         if(pendingSyncs.containsKey(zxid)){
         if(pendingSyncs.containsKey(zxid)){
             for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
             for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
                 sendSync(r);
                 sendSync(r);
-            }               
-        } 
-        
-        return  true;   
+            }
+        }
+
+        return  true;
     }
     }
-    
+
     /**
     /**
      * Keep a count of acks that are received by the leader for a particular
      * Keep a count of acks that are received by the leader for a particular
      * proposal
      * proposal
@@ -809,9 +816,9 @@ public class Leader {
      * @param sid, the id of the server that sent the ack
      * @param sid, the id of the server that sent the ack
      * @param followerAddr
      * @param followerAddr
      */
      */
-    synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {        
-        if (!allowedToCommit) return; // last op committed was a leader change - from now on 
-                                     // the new leader should commit        
+    synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
+        if (!allowedToCommit) return; // last op committed was a leader change - from now on
+                                     // the new leader should commit
         if (LOG.isTraceEnabled()) {
         if (LOG.isTraceEnabled()) {
             LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
             LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
             for (Proposal p : outstandingProposals.values()) {
             for (Proposal p : outstandingProposals.values()) {
@@ -821,7 +828,7 @@ public class Leader {
             }
             }
             LOG.trace("outstanding proposals all");
             LOG.trace("outstanding proposals all");
         }
         }
-        
+
         if ((zxid & 0xffffffffL) == 0) {
         if ((zxid & 0xffffffffL) == 0) {
             /*
             /*
              * We no longer process NEWLEADER ack with this method. However,
              * We no longer process NEWLEADER ack with this method. However,
@@ -830,8 +837,8 @@ public class Leader {
              */
              */
             return;
             return;
         }
         }
-            
-            
+
+
         if (outstandingProposals.size() == 0) {
         if (outstandingProposals.size() == 0) {
             if (LOG.isDebugEnabled()) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("outstanding is 0");
                 LOG.debug("outstanding is 0");
@@ -852,13 +859,13 @@ public class Leader {
                     Long.toHexString(zxid), followerAddr);
                     Long.toHexString(zxid), followerAddr);
             return;
             return;
         }
         }
-        
-        p.addAck(sid);        
+
+        p.addAck(sid);
         /*if (LOG.isDebugEnabled()) {
         /*if (LOG.isDebugEnabled()) {
             LOG.debug("Count for zxid: 0x{} is {}",
             LOG.debug("Count for zxid: 0x{} is {}",
                     Long.toHexString(zxid), p.ackSet.size());
                     Long.toHexString(zxid), p.ackSet.size());
         }*/
         }*/
-        
+
         boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
         boolean hasCommitted = tryToCommit(p, zxid, followerAddr);
 
 
         // If p is a reconfiguration, multiple other operations may be ready to be committed,
         // If p is a reconfiguration, multiple other operations may be ready to be committed,
@@ -875,11 +882,11 @@ public class Leader {
            while (allowedToCommit && hasCommitted && p!=null){
            while (allowedToCommit && hasCommitted && p!=null){
                curZxid++;
                curZxid++;
                p = outstandingProposals.get(curZxid);
                p = outstandingProposals.get(curZxid);
-               if (p !=null) hasCommitted = tryToCommit(p, curZxid, null);             
+               if (p !=null) hasCommitted = tryToCommit(p, curZxid, null);
            }
            }
         }
         }
     }
     }
-    
+
     static class ToBeAppliedRequestProcessor implements RequestProcessor {
     static class ToBeAppliedRequestProcessor implements RequestProcessor {
         private final RequestProcessor next;
         private final RequestProcessor next;
 
 
@@ -988,11 +995,11 @@ public class Leader {
         synchronized(this){
         synchronized(this){
             lastCommitted = zxid;
             lastCommitted = zxid;
         }
         }
-        
+
         byte data[] = new byte[8];
         byte data[] = new byte[8];
-        ByteBuffer buffer = ByteBuffer.wrap(data);                            
+        ByteBuffer buffer = ByteBuffer.wrap(data);
        buffer.putLong(designatedLeader);
        buffer.putLong(designatedLeader);
-       
+
         QuorumPacket qp = new QuorumPacket(Leader.COMMITANDACTIVATE, zxid, data, null);
         QuorumPacket qp = new QuorumPacket(Leader.COMMITANDACTIVATE, zxid, data, null);
         sendPacket(qp);
         sendPacket(qp);
     }
     }
@@ -1006,17 +1013,17 @@ public class Leader {
         sendObserverPacket(qp);
         sendObserverPacket(qp);
     }
     }
 
 
-    
+
     /**
     /**
      * Create an inform&activate packet and send it to all observers.
      * Create an inform&activate packet and send it to all observers.
      */
      */
     public void informAndActivate(Proposal proposal, long designatedLeader) {
     public void informAndActivate(Proposal proposal, long designatedLeader) {
        byte[] proposalData = proposal.packet.getData();
        byte[] proposalData = proposal.packet.getData();
         byte[] data = new byte[proposalData.length + 8];
         byte[] data = new byte[proposalData.length + 8];
-        ByteBuffer buffer = ByteBuffer.wrap(data);                            
+        ByteBuffer buffer = ByteBuffer.wrap(data);
        buffer.putLong(designatedLeader);
        buffer.putLong(designatedLeader);
        buffer.put(proposalData);
        buffer.put(proposalData);
-       
+
         QuorumPacket qp = new QuorumPacket(Leader.INFORMANDACTIVATE, proposal.request.zxid, data, null);
         QuorumPacket qp = new QuorumPacket(Leader.INFORMANDACTIVATE, proposal.request.zxid, data, null);
         sendObserverPacket(qp);
         sendObserverPacket(qp);
     }
     }
@@ -1064,19 +1071,19 @@ public class Leader {
 
 
         Proposal p = new Proposal();
         Proposal p = new Proposal();
         p.packet = pp;
         p.packet = pp;
-        p.request = request;                
-        
+        p.request = request;
+
         synchronized(this) {
         synchronized(this) {
            p.addQuorumVerifier(self.getQuorumVerifier());
            p.addQuorumVerifier(self.getQuorumVerifier());
-                   
+
            if (request.getHdr().getType() == OpCode.reconfig){
            if (request.getHdr().getType() == OpCode.reconfig){
-               self.setLastSeenQuorumVerifier(request.qv, true);                       
+               self.setLastSeenQuorumVerifier(request.qv, true);
            }
            }
-           
+
            if (self.getQuorumVerifier().getVersion()<self.getLastSeenQuorumVerifier().getVersion()) {
            if (self.getQuorumVerifier().getVersion()<self.getLastSeenQuorumVerifier().getVersion()) {
                p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
                p.addQuorumVerifier(self.getLastSeenQuorumVerifier());
            }
            }
-                   
+
             if (LOG.isDebugEnabled()) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Proposing:: " + request);
                 LOG.debug("Proposing:: " + request);
             }
             }
@@ -1087,7 +1094,7 @@ public class Leader {
         }
         }
         return p;
         return p;
     }
     }
-    
+
     public LearnerSnapshotThrottler getLearnerSnapshotThrottler() {
     public LearnerSnapshotThrottler getLearnerSnapshotThrottler() {
         return learnerSnapshotThrottler;
         return learnerSnapshotThrottler;
     }
     }
@@ -1125,7 +1132,7 @@ public class Leader {
      *
      *
      * @param handler handler of the follower
      * @param handler handler of the follower
      * @return last proposed zxid
      * @return last proposed zxid
-     * @throws InterruptedException 
+     * @throws InterruptedException
      */
      */
     synchronized public long startForwarding(LearnerHandler handler,
     synchronized public long startForwarding(LearnerHandler handler,
             long lastSeenZxid) {
             long lastSeenZxid) {
@@ -1209,7 +1216,7 @@ public class Leader {
             }
             }
             if (ss.getCurrentEpoch() != -1) {
             if (ss.getCurrentEpoch() != -1) {
                 if (ss.isMoreRecentThan(leaderStateSummary)) {
                 if (ss.isMoreRecentThan(leaderStateSummary)) {
-                    throw new IOException("Follower is ahead of the leader, leader summary: " 
+                    throw new IOException("Follower is ahead of the leader, leader summary: "
                                                     + leaderStateSummary.getCurrentEpoch()
                                                     + leaderStateSummary.getCurrentEpoch()
                                                     + " (current epoch), "
                                                     + " (current epoch), "
                                                     + leaderStateSummary.getLastZxid()
                                                     + leaderStateSummary.getLastZxid()
@@ -1237,9 +1244,9 @@ public class Leader {
             }
             }
         }
         }
     }
     }
-    
+
     /**
     /**
-     * Return a list of sid in set as string  
+     * Return a list of sid in set as string
      */
      */
     private String getSidSetString(Set<Long> sidSet) {
     private String getSidSetString(Set<Long> sidSet) {
         StringBuilder sids = new StringBuilder();
         StringBuilder sids = new StringBuilder();
@@ -1264,7 +1271,7 @@ public class Leader {
                 + newLeaderProposal.ackSetsToString()
                 + newLeaderProposal.ackSetsToString()
                 + " ]; starting up and setting last processed zxid: 0x{}",
                 + " ]; starting up and setting last processed zxid: 0x{}",
                 Long.toHexString(zk.getZxid()));
                 Long.toHexString(zk.getZxid()));
-        
+
         /*
         /*
          * ZOOKEEPER-1324. the leader sends the new config it must complete
          * ZOOKEEPER-1324. the leader sends the new config it must complete
          *  to others inside a NEWLEADER message (see LearnerHandler where
          *  to others inside a NEWLEADER message (see LearnerHandler where
@@ -1273,20 +1280,20 @@ public class Leader {
          *  config to itself.
          *  config to itself.
          */
          */
         QuorumVerifier newQV = self.getLastSeenQuorumVerifier();
         QuorumVerifier newQV = self.getLastSeenQuorumVerifier();
-        
-        Long designatedLeader = getDesignatedLeader(newLeaderProposal, zk.getZxid());                                         
+
+        Long designatedLeader = getDesignatedLeader(newLeaderProposal, zk.getZxid());
 
 
         self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
         self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);
         if (designatedLeader != self.getId()) {
         if (designatedLeader != self.getId()) {
             allowedToCommit = false;
             allowedToCommit = false;
         }
         }
-        
+
         zk.startup();
         zk.startup();
         /*
         /*
          * Update the election vote here to ensure that all members of the
          * Update the election vote here to ensure that all members of the
          * ensemble report the same vote to new servers that start up and
          * ensemble report the same vote to new servers that start up and
          * send leader election notifications to the ensemble.
          * send leader election notifications to the ensemble.
-         * 
+         *
          * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
          * @see https://issues.apache.org/jira/browse/ZOOKEEPER-1732
          */
          */
         self.updateElectionVote(getEpoch());
         self.updateElectionVote(getEpoch());
@@ -1378,7 +1385,7 @@ public class Leader {
         case COMMIT:
         case COMMIT:
             return "COMMIT";
             return "COMMIT";
         case COMMITANDACTIVATE:
         case COMMITANDACTIVATE:
-            return "COMMITANDACTIVATE";           
+            return "COMMITANDACTIVATE";
         case PING:
         case PING:
             return "PING";
             return "PING";
         case REVALIDATE:
         case REVALIDATE:

+ 38 - 33
src/java/main/org/apache/zookeeper/server/quorum/LearnerHandler.java

@@ -74,7 +74,7 @@ public class LearnerHandler extends ZooKeeperThread {
      * on the syncLimit. Once the deadline is past this learner should
      * on the syncLimit. Once the deadline is past this learner should
      * be considered no longer "sync'd" with the leader. */
      * be considered no longer "sync'd" with the leader. */
     volatile long tickOfNextAckDeadline;
     volatile long tickOfNextAckDeadline;
-    
+
     /**
     /**
      * ZooKeeper server identifier of this learner
      * ZooKeeper server identifier of this learner
      */
      */
@@ -159,7 +159,7 @@ public class LearnerHandler extends ZooKeeperThread {
 
 
     private final BufferedInputStream bufferedInput;
     private final BufferedInputStream bufferedInput;
     private BufferedOutputStream bufferedOutput;
     private BufferedOutputStream bufferedOutput;
-    
+
     /**
     /**
      * Keep track of whether we have started send packets thread
      * Keep track of whether we have started send packets thread
      */
      */
@@ -176,7 +176,7 @@ public class LearnerHandler extends ZooKeeperThread {
      * that we are going to blast it to the learner
      * that we are going to blast it to the learner
      */
      */
     private boolean needOpPacket = true;
     private boolean needOpPacket = true;
-    
+
     /**
     /**
      * Last zxid sent to the learner as part of synchronization
      * Last zxid sent to the learner as part of synchronization
      */
      */
@@ -188,6 +188,11 @@ public class LearnerHandler extends ZooKeeperThread {
         this.leader = leader;
         this.leader = leader;
         this.bufferedInput = bufferedInput;
         this.bufferedInput = bufferedInput;
 
 
+        if (Boolean.getBoolean(FORCE_SNAP_SYNC)) {
+            forceSnapSync = true;
+            LOG.info("Forcing snapshot sync is enabled");
+        }
+
         try {
         try {
             if (leader.self != null) {
             if (leader.self != null) {
                 leader.self.authServer.authenticate(sock,
                 leader.self.authServer.authenticate(sock,
@@ -403,7 +408,7 @@ public class LearnerHandler extends ZooKeeperThread {
             } else {
             } else {
                 LOG.info("Follower sid: " + this.sid + " not in the current config " + Long.toHexString(leader.self.getQuorumVerifier().getVersion()));
                 LOG.info("Follower sid: " + this.sid + " not in the current config " + Long.toHexString(leader.self.getQuorumVerifier().getVersion()));
             }
             }
-                        
+
             if (qp.getType() == Leader.OBSERVERINFO) {
             if (qp.getType() == Leader.OBSERVERINFO) {
                   learnerType = LearnerType.OBSERVER;
                   learnerType = LearnerType.OBSERVER;
             }
             }
@@ -440,31 +445,15 @@ public class LearnerHandler extends ZooKeeperThread {
                 leader.waitForEpochAck(this.getSid(), ss);
                 leader.waitForEpochAck(this.getSid(), ss);
             }
             }
             peerLastZxid = ss.getLastZxid();
             peerLastZxid = ss.getLastZxid();
-           
+
             // Take any necessary action if we need to send TRUNC or DIFF
             // Take any necessary action if we need to send TRUNC or DIFF
             // startForwarding() will be called in all cases
             // startForwarding() will be called in all cases
             boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
             boolean needSnap = syncFollower(peerLastZxid, leader.zk.getZKDatabase(), leader);
-            
-            LOG.debug("Sending NEWLEADER message to " + sid);
-            // the version of this quorumVerifier will be set by leader.lead() in case
-            // the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
-            // we got here, so the version was set
-            if (getVersion() < 0x10000) {
-                QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
-                        newLeaderZxid, null, null);
-                oa.writeRecord(newLeaderQP, "packet");
-            } else {
-                QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
-                        newLeaderZxid, leader.self.getLastSeenQuorumVerifier()
-                                .toString().getBytes(), null);
-                queuedPackets.add(newLeaderQP);
-            }
-            bufferedOutput.flush();
 
 
             /* if we are not truncating or sending a diff just send a snapshot */
             /* if we are not truncating or sending a diff just send a snapshot */
             if (needSnap) {
             if (needSnap) {
                 boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
                 boolean exemptFromThrottle = getLearnerType() != LearnerType.OBSERVER;
-                LearnerSnapshot snapshot = 
+                LearnerSnapshot snapshot =
                         leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
                         leader.getLearnerSnapshotThrottler().beginSnapshot(exemptFromThrottle);
                 try {
                 try {
                     long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
                     long zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
@@ -472,11 +461,11 @@ public class LearnerHandler extends ZooKeeperThread {
                     bufferedOutput.flush();
                     bufferedOutput.flush();
 
 
                     LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
                     LOG.info("Sending snapshot last zxid of peer is 0x{}, zxid of leader is 0x{}, "
-                            + "send zxid of db as 0x{}, {} concurrent snapshots, " 
+                            + "send zxid of db as 0x{}, {} concurrent snapshots, "
                             + "snapshot was {} from throttle",
                             + "snapshot was {} from throttle",
-                            Long.toHexString(peerLastZxid), 
+                            Long.toHexString(peerLastZxid),
                             Long.toHexString(leaderLastZxid),
                             Long.toHexString(leaderLastZxid),
-                            Long.toHexString(zxidToSend), 
+                            Long.toHexString(zxidToSend),
                             snapshot.getConcurrentSnapshotNumber(),
                             snapshot.getConcurrentSnapshotNumber(),
                             snapshot.isEssential() ? "exempt" : "not exempt");
                             snapshot.isEssential() ? "exempt" : "not exempt");
                     // Dump data to peer
                     // Dump data to peer
@@ -488,9 +477,25 @@ public class LearnerHandler extends ZooKeeperThread {
                 }
                 }
             }
             }
 
 
+            LOG.debug("Sending NEWLEADER message to " + sid);
+            // the version of this quorumVerifier will be set by leader.lead() in case
+            // the leader is just being established. waitForEpochAck makes sure that readyToStart is true if
+            // we got here, so the version was set
+            if (getVersion() < 0x10000) {
+                QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
+                        newLeaderZxid, null, null);
+                oa.writeRecord(newLeaderQP, "packet");
+            } else {
+                QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
+                        newLeaderZxid, leader.self.getLastSeenQuorumVerifier()
+                                .toString().getBytes(), null);
+                queuedPackets.add(newLeaderQP);
+            }
+            bufferedOutput.flush();
+
             // Start thread that blast packets in the queue to learner
             // Start thread that blast packets in the queue to learner
             startSendingPackets();
             startSendingPackets();
-            
+
             /*
             /*
              * Have to wait for the first ACK, wait until
              * Have to wait for the first ACK, wait until
              * the leader is ready, and only then we can
              * the leader is ready, and only then we can
@@ -505,12 +510,12 @@ public class LearnerHandler extends ZooKeeperThread {
             }
             }
 
 
             if(LOG.isDebugEnabled()){
             if(LOG.isDebugEnabled()){
-            	LOG.debug("Received NEWLEADER-ACK message from " + sid);   
+            	LOG.debug("Received NEWLEADER-ACK message from " + sid);
             }
             }
             leader.waitForNewLeaderAck(getSid(), qp.getZxid());
             leader.waitForNewLeaderAck(getSid(), qp.getZxid());
 
 
             syncLimitCheck.start();
             syncLimitCheck.start();
-            
+
             // now that the ack has been processed expect the syncLimit
             // now that the ack has been processed expect the syncLimit
             sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);
             sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);
 
 
@@ -526,7 +531,7 @@ public class LearnerHandler extends ZooKeeperThread {
             // so we need to mark when the peer can actually start
             // so we need to mark when the peer can actually start
             // using the data
             // using the data
             //
             //
-            LOG.debug("Sending UPTODATE message to " + sid);      
+            LOG.debug("Sending UPTODATE message to " + sid);
             queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
             queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
 
 
             while (true) {
             while (true) {
@@ -920,8 +925,8 @@ public class LearnerHandler extends ZooKeeperThread {
         }
         }
 
 
         return queuedZxid;
         return queuedZxid;
-    }    
-    
+    }
+
     public void shutdown() {
     public void shutdown() {
         // Send the packet of death
         // Send the packet of death
         try {
         try {
@@ -975,7 +980,7 @@ public class LearnerHandler extends ZooKeeperThread {
         QuorumPacket packet = new QuorumPacket(type, zxid, null, null);
         QuorumPacket packet = new QuorumPacket(type, zxid, null, null);
         queuePacket(packet);
         queuePacket(packet);
     }
     }
-    
+
     void queuePacket(QuorumPacket p) {
     void queuePacket(QuorumPacket p) {
         queuedPackets.add(p);
         queuedPackets.add(p);
     }
     }
@@ -984,7 +989,7 @@ public class LearnerHandler extends ZooKeeperThread {
         return isAlive()
         return isAlive()
         && leader.self.tick.get() <= tickOfNextAckDeadline;
         && leader.self.tick.get() <= tickOfNextAckDeadline;
     }
     }
-    
+
     /**
     /**
      * For testing, return packet queue
      * For testing, return packet queue
      * @return
      * @return

+ 299 - 2
src/java/test/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java

@@ -36,14 +36,18 @@ import java.nio.channels.SocketChannel;
 import java.nio.file.Paths;
 import java.nio.file.Paths;
 import java.util.HashMap;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeoutException;
 import java.util.regex.Pattern;
 import java.util.regex.Pattern;
+import javax.security.sasl.SaslException;
 
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.log4j.Level;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.Logger;
 import org.apache.log4j.PatternLayout;
 import org.apache.log4j.PatternLayout;
 import org.apache.log4j.WriterAppender;
 import org.apache.log4j.WriterAppender;
+import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.PortAssignment;
 import org.apache.zookeeper.PortAssignment;
@@ -53,6 +57,7 @@ import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.ZooKeeper.States;
 import org.apache.zookeeper.ZooKeeper.States;
 import org.apache.zookeeper.common.Time;
 import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.quorum.Leader.Proposal;
 import org.apache.zookeeper.server.quorum.Leader.Proposal;
 import org.apache.zookeeper.test.ClientBase;
 import org.apache.zookeeper.test.ClientBase;
@@ -481,7 +486,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
             Thread.sleep(1000);
             Thread.sleep(1000);
         }
         }
     }
     }
-    
+
     private void logStates(ZooKeeper[] zks) {
     private void logStates(ZooKeeper[] zks) {
             StringBuilder sbBuilder = new StringBuilder("Connection States: {");
             StringBuilder sbBuilder = new StringBuilder("Connection States: {");
            for (int i = 0; i < zks.length; i++) {
            for (int i = 0; i < zks.length; i++) {
@@ -1205,7 +1210,7 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
             }
             }
         }
         }
     }
     }
-    
+
     private Proposal findProposalOfType(Map<Long, Proposal> proposals, int type) {
     private Proposal findProposalOfType(Map<Long, Proposal> proposals, int type) {
         for (Proposal proposal : proposals.values()) {
         for (Proposal proposal : proposals.values()) {
             if (proposal.request.getHdr().getType() == type) {
             if (proposal.request.getHdr().getType() == type) {
@@ -1214,4 +1219,296 @@ public class QuorumPeerMainTest extends QuorumPeerTestBase {
         }
         }
         return null;
         return null;
     }
     }
+
+    /**
+     * Currently, in SNAP sync, the leader will start queuing the
+     * proposal/commits and the NEWLEADER packet before sending
+     * over the snapshot over wire. So it's possible that the zxid
+     * associated with the snapshot might be higher than all the
+     * packets queued before NEWLEADER.
+     *
+     * When the follower received the snapshot, it will apply all
+     * the txns queued before NEWLEADER, which may not cover all
+     * the txns up to the zxid in the snapshot. After that, it
+     * will write the snapshot out to disk with the zxid associated
+     * with the snapshot. In case the server crashed after writing
+     * this out, when loading the data from disk, it will use zxid
+     * of the snapshot file to sync with leader, and it could cause
+     * data inconsistent, because we only replayed partial of the
+     * historical data during previous syncing.
+     *
+     * This test case is going to cover and simulate this scenario
+     * and make sure there is no data inconsistency issue after fix.
+     */
+    @Test
+    public void testInconsistentDueToNewLeaderOrder() throws Exception {
+
+        // 1. set up an ensemble with 3 servers
+        final int ENSEMBLE_SERVERS = 3;
+        final int clientPorts[] = new int[ENSEMBLE_SERVERS];
+        StringBuilder sb = new StringBuilder();
+        String server;
+
+        for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+            clientPorts[i] = PortAssignment.unique();
+            server = "server." + i + "=127.0.0.1:" + PortAssignment.unique()
+                    + ":" + PortAssignment.unique() + ":participant;127.0.0.1:"
+                    + clientPorts[i];
+            sb.append(server + "\n");
+        }
+        String currentQuorumCfgSection = sb.toString();
+
+        // start servers
+        MainThread[] mt = new MainThread[ENSEMBLE_SERVERS];
+        ZooKeeper zk[] = new ZooKeeper[ENSEMBLE_SERVERS];
+        Context contexts[] = new Context[ENSEMBLE_SERVERS];
+        for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+            final Context context = new Context();
+            contexts[i] = context;
+            mt[i] = new MainThread(i, clientPorts[i], currentQuorumCfgSection,
+                    false) {
+                @Override
+                public TestQPMain getTestQPMain() {
+                    return new CustomizedQPMain(context);
+                }
+            };
+            mt[i].start();
+            zk[i] = new ZooKeeper("127.0.0.1:" + clientPorts[i],
+                    ClientBase.CONNECTION_TIMEOUT, this);
+        }
+        waitForAll(zk, States.CONNECTED);
+        LOG.info("all servers started");
+
+        String nodePath = "/testInconsistentDueToNewLeader";
+
+        int leaderId = -1;
+        int followerA = -1;
+        for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+            if (mt[i].main.quorumPeer.leader != null) {
+                leaderId = i;
+            } else if (followerA == -1) {
+                followerA = i;
+            }
+        }
+        LOG.info("shutdown follower {}", followerA);
+        mt[followerA].shutdown();
+        waitForOne(zk[followerA], States.CONNECTING);
+
+        try {
+            // 2. set force snapshot to be true
+            LOG.info("force snapshot sync");
+            System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "true");
+
+            // 3. create a node
+            String initialValue = "1";
+            final ZooKeeper leaderZk = zk[leaderId];
+            leaderZk.create(nodePath, initialValue.getBytes(), Ids.OPEN_ACL_UNSAFE,
+                    CreateMode.PERSISTENT);
+            LOG.info("created node {} with value {}", nodePath, initialValue);
+
+            CustomQuorumPeer leaderQuorumPeer =
+                    (CustomQuorumPeer) mt[leaderId].main.quorumPeer;
+
+            // 4. on the customized leader catch the startForwarding call
+            //    (without synchronized), set the node to value v1, then
+            //    call the super.startForwarding to generate the ongoing
+            //    txn proposal and commit for v1 value update
+            leaderQuorumPeer.setStartForwardingListener(
+                    new StartForwardingListener() {
+                @Override
+                public void start() {
+                    if (!Boolean.getBoolean(LearnerHandler.FORCE_SNAP_SYNC)) {
+                        return;
+                    }
+                    final String value = "2";
+                    LOG.info("start forwarding, set {} to {}", nodePath, value);
+                    // use async, otherwise it will block the logLock in
+                    // ZKDatabase and the setData request will timeout
+                    try {
+                        leaderZk.setData(nodePath, value.getBytes(), -1,
+                                new AsyncCallback.StatCallback() {
+                            public void processResult(int rc, String path,
+                                   Object ctx, Stat stat) {}
+                        }, null);
+                        // wait for the setData txn being populated
+                        Thread.sleep(1000);
+                    } catch (Exception e) {
+                        LOG.error("error when set {} to {}", nodePath, value, e);
+                    }
+                }
+            });
+
+            // 5. on the customized leader catch the beginSnapshot call in
+            //    LearnerSnapshotThrottler to set the node to value v2,
+            //    wait it hit data tree
+            leaderQuorumPeer.setBeginSnapshotListener(new BeginSnapshotListener() {
+                @Override
+                public void start() {
+                    String value = "3";
+                    LOG.info("before sending snapshot, set {} to {}",
+                            nodePath, value);
+                    try {
+                        leaderZk.setData(nodePath, value.getBytes(), -1);
+                        LOG.info("successfully set {} to {}", nodePath, value);
+                    } catch (Exception e) {
+                        LOG.error("error when set {} to {}, {}", nodePath, value, e);
+                    }
+                }
+            });
+
+            // 6. exit follower A after taking snapshot
+            CustomQuorumPeer followerAQuorumPeer =
+                    ((CustomQuorumPeer) mt[followerA].main.quorumPeer);
+            LOG.info("set exit when ack new leader packet on {}", followerA);
+            contexts[followerA].exitWhenAckNewLeader = true;
+            CountDownLatch latch = new CountDownLatch(1);
+            final MainThread followerAMT = mt[followerA];
+            contexts[followerA].newLeaderAckCallback = new NewLeaderAckCallback() {
+                @Override
+                public void start() {
+                    try {
+                        latch.countDown();
+                        followerAMT.shutdown();
+                    } catch (Exception e) {}
+                }
+            };
+
+            // 7. start follower A to do snapshot sync
+            LOG.info("starting follower {}", followerA);
+            mt[followerA].start();
+            Assert.assertTrue(latch.await(30, TimeUnit.SECONDS));
+
+            // 8. now we have invalid data on disk, let's load it and verify
+            LOG.info("disable exit when ack new leader packet on {}", followerA);
+            System.setProperty(LearnerHandler.FORCE_SNAP_SYNC, "false");
+            contexts[followerA].exitWhenAckNewLeader = true;
+            contexts[followerA].newLeaderAckCallback = null;
+
+            LOG.info("restarting follower {}", followerA);
+            mt[followerA].start();
+            zk[followerA].close();
+
+            zk[followerA] = new ZooKeeper("127.0.0.1:" + clientPorts[followerA],
+                    ClientBase.CONNECTION_TIMEOUT, this);
+
+            // 9. start follower A, after it's in broadcast state, make sure
+            //    the node value is same as what we have on leader
+            waitForOne(zk[followerA], States.CONNECTED);
+            Assert.assertEquals(
+                new String(zk[followerA].getData(nodePath, null, null)),
+                new String(zk[leaderId].getData(nodePath, null, null))
+            );
+        } finally {
+            System.clearProperty(LearnerHandler.FORCE_SNAP_SYNC);
+            for (int i = 0; i < ENSEMBLE_SERVERS; i++) {
+                mt[i].shutdown();
+                zk[i].close();
+            }
+        }
+    }
+
+    static class Context {
+        boolean quitFollowing = false;
+        boolean exitWhenAckNewLeader = false;
+        NewLeaderAckCallback newLeaderAckCallback = null;
+    }
+
+    static interface NewLeaderAckCallback {
+        public void start();
+    }
+
+    static interface StartForwardingListener {
+        public void start();
+    }
+
+    static interface BeginSnapshotListener {
+        public void start();
+    }
+
+    static class CustomizedQPMain extends TestQPMain {
+
+        private Context context;
+
+        public CustomizedQPMain(Context context) {
+            this.context = context;
+        }
+
+        @Override
+        protected QuorumPeer getQuorumPeer() throws SaslException {
+            return new CustomQuorumPeer(context);
+        }
+    }
+
+    static class CustomQuorumPeer extends QuorumPeer {
+        private Context context;
+
+        private StartForwardingListener startForwardingListener;
+        private BeginSnapshotListener beginSnapshotListener;
+
+        public CustomQuorumPeer(Context context)
+                throws SaslException {
+            this.context = context;
+        }
+
+        public void setStartForwardingListener(
+                StartForwardingListener startForwardingListener) {
+            this.startForwardingListener = startForwardingListener;
+        }
+
+        public void setBeginSnapshotListener(
+                BeginSnapshotListener beginSnapshotListener) {
+            this.beginSnapshotListener = beginSnapshotListener;
+        }
+
+        @Override
+        protected Follower makeFollower(FileTxnSnapLog logFactory)
+                throws IOException {
+            return new Follower(this, new FollowerZooKeeperServer(logFactory,
+                    this, this.getZkDb())) {
+
+                @Override
+                void writePacket(QuorumPacket pp, boolean flush) throws IOException {
+                    if (pp != null && pp.getType() == Leader.ACK
+                            && context.exitWhenAckNewLeader) {
+                        if (context.newLeaderAckCallback != null) {
+                            context.newLeaderAckCallback.start();
+                        }
+                    }
+                    super.writePacket(pp, flush);
+                }
+            };
+        }
+
+        @Override
+        protected Leader makeLeader(FileTxnSnapLog logFactory) throws IOException {
+            return new Leader(this, new LeaderZooKeeperServer(logFactory,
+                    this, this.getZkDb())) {
+                @Override
+                public long startForwarding(LearnerHandler handler,
+                        long lastSeenZxid) {
+                    if (startForwardingListener != null) {
+                        startForwardingListener.start();
+                    }
+                    return super.startForwarding(handler, lastSeenZxid);
+                }
+
+                @Override
+                public LearnerSnapshotThrottler createLearnerSnapshotThrottler(
+                        int maxConcurrentSnapshots, long maxConcurrentSnapshotTimeout) {
+                    return new LearnerSnapshotThrottler(
+                            maxConcurrentSnapshots, maxConcurrentSnapshotTimeout) {
+
+                        @Override
+                        public LearnerSnapshot beginSnapshot(boolean essential)
+                                throws SnapshotThrottleException, InterruptedException {
+                            if (beginSnapshotListener != null) {
+                                beginSnapshotListener.start();
+                            }
+                            return super.beginSnapshot(essential);
+                        }
+                    };
+                }
+            };
+        }
+    }
 }
 }