Преглед на файлове

ZOOKEEPER-1343. getEpochToPropose should check if lastAcceptedEpoch is greater or equal than epoch

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1227000 13f79535-47bb-0310-9956-ffa450edef68
Benjamin Reed преди 13 години
родител
ревизия
1a556bd13b
променени са 3 файла, в които са добавени 133 реда и са изтрити 49 реда
  1. 2 0
      CHANGES.txt
  2. 47 47
      src/java/main/org/apache/zookeeper/server/quorum/Leader.java
  3. 84 2
      src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java

+ 2 - 0
CHANGES.txt

@@ -89,6 +89,8 @@ BUGFIXES:
 
   ZOOKEEPER-1089. zkServer.sh status does not work due to invalid
   option of nc (Roman Shaposhnik via phunt)
+
+  ZOOKEEPER-1343. getEpochToPropose should check if lastAcceptedEpoch is greater or equal than epoch (fpj via breed)
  
 IMPROVEMENTS:
 

+ 47 - 47
src/java/main/org/apache/zookeeper/server/quorum/Leader.java

@@ -410,7 +410,7 @@ public class Leader {
         }
     }
 
-	boolean isShutdown;
+    boolean isShutdown;
 
     /**
      * Close down all the LearnerHandlers
@@ -762,54 +762,54 @@ public class Leader {
     }
 
     private final HashSet<Long> connectingFollowers = new HashSet<Long>();
-	public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
-		synchronized(connectingFollowers) {
-			if (!waitingForNewEpoch) {
-				return epoch;
-			}
-			if (lastAcceptedEpoch > epoch) {
-				epoch = lastAcceptedEpoch+1;
-			}
-			connectingFollowers.add(sid);
-			QuorumVerifier verifier = self.getQuorumVerifier();
-			if (connectingFollowers.contains(self.getId()) && verifier.containsQuorum(connectingFollowers)) 
-{
-				waitingForNewEpoch = false;
-				self.setAcceptedEpoch(epoch);
-				connectingFollowers.notifyAll();
-			} else {
-                   long start = System.currentTimeMillis();
-                   long cur = start;
+    public long getEpochToPropose(long sid, long lastAcceptedEpoch) throws InterruptedException, IOException {
+        synchronized(connectingFollowers) {
+            if (!waitingForNewEpoch) {
+                return epoch;
+            }
+            if (lastAcceptedEpoch >= epoch) {
+                epoch = lastAcceptedEpoch+1;
+            }
+            connectingFollowers.add(sid);
+            QuorumVerifier verifier = self.getQuorumVerifier();
+            if (connectingFollowers.contains(self.getId()) && 
+                                            verifier.containsQuorum(connectingFollowers)) {
+                waitingForNewEpoch = false;
+                self.setAcceptedEpoch(epoch);
+                connectingFollowers.notifyAll();
+            } else {
+                long start = System.currentTimeMillis();
+                long cur = start;
                 long end = start + self.getInitLimit()*self.getTickTime();
                 while(waitingForNewEpoch && cur < end) {
                     connectingFollowers.wait(end - cur);
                     cur = System.currentTimeMillis();
                 }
-				if (waitingForNewEpoch) {
+                if (waitingForNewEpoch) {
                     throw new InterruptedException("Timeout while waiting for epoch from quorum");
-				}
-			}
-			return epoch;
-		}
-	}
-
-	private final HashSet<Long> electingFollowers = new HashSet<Long>();
-	private boolean electionFinished = false;
-	public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException {
-		synchronized(electingFollowers) {
-			if (electionFinished) {
-				return;
-			}
-			if (ss.getCurrentEpoch() != -1) {
-				if (ss.isMoreRecentThan(leaderStateSummary)) {
-					throw new IOException("Follower is ahead of the leader");
-				}
-				electingFollowers.add(id);
-			}
-			QuorumVerifier verifier = self.getQuorumVerifier();
-			if (electingFollowers.contains(self.getId()) && verifier.containsQuorum(electingFollowers)) {
-				electionFinished = true;
-				electingFollowers.notifyAll();
+                }
+            }
+            return epoch;
+        }
+    }
+
+    private final HashSet<Long> electingFollowers = new HashSet<Long>();
+    private boolean electionFinished = false;
+    public void waitForEpochAck(long id, StateSummary ss) throws IOException, InterruptedException {
+        synchronized(electingFollowers) {
+            if (electionFinished) {
+                return;
+            }
+            if (ss.getCurrentEpoch() != -1) {
+                if (ss.isMoreRecentThan(leaderStateSummary)) {
+                    throw new IOException("Follower is ahead of the leader");
+                }
+                electingFollowers.add(id);
+            }
+            QuorumVerifier verifier = self.getQuorumVerifier();
+            if (electingFollowers.contains(self.getId()) && verifier.containsQuorum(electingFollowers)) {
+                electionFinished = true;
+                electingFollowers.notifyAll();
             } else {
                 long start = System.currentTimeMillis();
                 long cur = start;
@@ -820,8 +820,8 @@ public class Leader {
                 }
                 if (!electionFinished) {
                     throw new InterruptedException("Timeout while waiting for epoch to be acked by quorum");
-				}
-			}
-		}
-	}
+                }
+            }
+        }
+    }
 }

+ 84 - 2
src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java

@@ -48,6 +48,7 @@ import org.apache.zookeeper.server.ServerCnxnFactory;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.Leader;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
 import org.apache.zookeeper.server.util.ZxidUtils;
@@ -76,6 +77,26 @@ public class Zab1_0Test {
         }
     }
 
+    
+   private static final class MockLeader extends Leader {
+       
+       MockLeader(QuorumPeer qp, LeaderZooKeeperServer zk)
+       throws IOException {
+           super(qp, zk);
+       }
+       
+       /**
+        * This method returns the value of the variable that holds the epoch
+        * to be proposed and that has been proposed, depending on the point
+        * of the execution in which it is called. 
+        * 
+        * @return epoch
+        */
+       public long getCurrentEpochToPropose() {
+           return epoch;
+       }
+   }
+   
    public static final class FollowerMockThread extends Thread {
     	private final Leader leader;
     	private final long followerSid;
@@ -144,6 +165,55 @@ public class Zab1_0Test {
         }
     }
     
+    /**
+     * In this test, the leader sets the last accepted epoch to 5. The call
+     * to getEpochToPropose should set epoch to 6 and wait until another 
+     * follower executes it. If in getEpochToPropose we don't check if
+     * lastAcceptedEpoch == epoch, then the call from the subsequent
+     * follower with lastAcceptedEpoch = 6 doesn't change the value
+     * of epoch, and the test fails. It passes with the fix to predicate.
+     * 
+     * {@link https://issues.apache.org/jira/browse/ZOOKEEPER-1343}
+     * 
+     * 
+     * @throws Exception
+     */
+    
+    @Test
+    public void testLastAcceptedEpoch() throws Exception {    
+        File tmpDir = File.createTempFile("test", "dir");
+        tmpDir.delete();
+        tmpDir.mkdir();
+        Leader leader = null;
+        LeadThread leadThread = null;
+        try {
+            QuorumPeer peer = createQuorumPeer(tmpDir);
+            leader = createMockLeader(tmpDir, peer);
+            peer.leader = leader;
+            peer.setAcceptedEpoch(5);
+            leadThread = new LeadThread(leader); 
+            leadThread.start();
+            
+            while(((MockLeader) leader).getCurrentEpochToPropose() != 6){
+                Thread.sleep(20);
+            }
+            
+            try {
+                long epoch = leader.getEpochToPropose(1, 6);
+                Assert.assertEquals("New proposed epoch is wrong", 7, epoch);  
+            } catch (Exception e){ 
+                Assert.fail("Timed out in getEpochToPropose");
+            }
+            
+        } finally {
+            recursiveDelete(tmpDir);
+            if (leader != null) {
+                leader.shutdown("end of test");
+            }
+        }
+    }
+    
+    
     @Test
     public void testLeaderInElectingFollowers() throws Exception {    
         File tmpDir = File.createTempFile("test", "dir");
@@ -632,6 +702,18 @@ public class Zab1_0Test {
     }
 
     private Leader createLeader(File tmpDir, QuorumPeer peer)
+    throws IOException, NoSuchFieldException, IllegalAccessException{
+        LeaderZooKeeperServer zk = prepareLeader(tmpDir, peer);
+        return new Leader(peer, zk);
+    }
+    
+    private Leader createMockLeader(File tmpDir, QuorumPeer peer)
+    throws IOException, NoSuchFieldException, IllegalAccessException{
+        LeaderZooKeeperServer zk = prepareLeader(tmpDir, peer);
+        return new MockLeader(peer, zk);
+    }
+    
+    private LeaderZooKeeperServer prepareLeader(File tmpDir, QuorumPeer peer)
             throws IOException, NoSuchFieldException, IllegalAccessException {
         FileTxnSnapLog logFactory = new FileTxnSnapLog(tmpDir, tmpDir);
         peer.setTxnFactory(logFactory);
@@ -640,9 +722,9 @@ public class Zab1_0Test {
         addrField.set(peer, new InetSocketAddress(33556));
         ZKDatabase zkDb = new ZKDatabase(logFactory);
         LeaderZooKeeperServer zk = new LeaderZooKeeperServer(logFactory, peer, zkDb);
-        return new Leader(peer, zk);
+        return zk;
     }
-
+    
     static class ConversableFollower extends Follower {
 
         ConversableFollower(QuorumPeer self, FollowerZooKeeperServer zk) {