Forráskód Böngészése

ZOOKEEPER-1191. Synchronization issue - wait not in guarded block
ZOOKEEPER-1192. Leader.waitForEpochAck() checks waitingForNewEpoch instead of checking electionFinished


git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1173949 13f79535-47bb-0310-9956-ffa450edef68

Benjamin Reed 13 éve
szülő
commit
b182dd4d76

+ 4 - 0
CHANGES.txt

@@ -11,6 +11,10 @@ BUGFIXES:
   ZOOKEEPER-786. Exception in ZooKeeper.toString
   (Thomas Koch via phunt)
 
+  ZOOKEEPER-1191. Synchronization issue - wait not in guarded block (Alex Shraer via breed)
+
+  ZOOKEEPER-1192. Leader.waitForEpochAck() checks waitingForNewEpoch instead of checking electionFinished (Alex Shraer via breed)
+
 IMPROVEMENTS:
 
   ZOOKEEPER-1170. Fix compiler (eclipse) warnings: unused imports,

+ 18 - 6
src/java/main/org/apache/zookeeper/server/quorum/Leader.java

@@ -781,9 +781,15 @@ public class Leader {
 				waitingForNewEpoch = false;
 				connectingFollowers.notifyAll();
 			} else {
-				connectingFollowers.wait(self.getInitLimit()*self.getTickTime());
+                   long start = System.currentTimeMillis();
+                   long cur = start;
+                long end = start + self.getInitLimit()*self.getTickTime();
+                while(waitingForNewEpoch && cur < end) {
+                    connectingFollowers.wait(end - cur);
+                    cur = System.currentTimeMillis();
+                }
 				if (waitingForNewEpoch) {
-					throw new InterruptedException("Out of time to propose an epoch");
+                    throw new InterruptedException("Timeout while waiting for epoch from quorum");        
 				}
 			}
 			return epoch;
@@ -807,10 +813,16 @@ public class Leader {
 			if (readyToStart && verifier.containsQuorum(electingFollowers)) {
 				electionFinished = true;
 				electingFollowers.notifyAll();
-			} else {
-				electingFollowers.wait(self.getInitLimit()*self.getTickTime());
-				if (waitingForNewEpoch) {
-					throw new InterruptedException("Out of time to propose an epoch");
+            } else {                
+                long start = System.currentTimeMillis();
+                long cur = start;
+                long end = start + self.getInitLimit()*self.getTickTime();
+                while(!electionFinished && cur < end) {
+                    electingFollowers.wait(end - cur);
+                    cur = System.currentTimeMillis();
+                }
+                if (!electionFinished) {
+                    throw new InterruptedException("Timeout while waiting for epoch to be acked by quorum");
 				}
 			}
 		}

+ 39 - 5
src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java

@@ -44,7 +44,10 @@ import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooKeeperServer.DataTreeBuilder;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.quorum.Leader;
+import org.apache.zookeeper.server.quorum.LearnerInfo;
+import org.apache.zookeeper.server.quorum.QuorumPacket;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
+import org.apache.zookeeper.server.quorum.Zab1_0Test.LeaderConversation;
 import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
 import org.apache.zookeeper.server.util.ZxidUtils;
 import org.junit.Assert;
@@ -124,7 +127,7 @@ public class Zab1_0Test {
     }
     
     static public interface LeaderConversation {
-        void converseWithLeader(InputArchive ia, OutputArchive oa) throws Exception;
+        void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l) throws Exception;
     }
     
     static public interface FollowerConversation {
@@ -160,7 +163,7 @@ public class Zab1_0Test {
             OutputArchive oa = BinaryOutputArchive.getArchive(followerSocket
                     .getOutputStream());
 
-            conversation.converseWithLeader(ia, oa);
+            conversation.converseWithLeader(ia, oa, leader);
         } finally {
             recursiveDelete(tmpDir);
             if (leader != null) {
@@ -176,7 +179,7 @@ public class Zab1_0Test {
     @Test
     public void testNormalRun() throws Exception {
         testConversation(new LeaderConversation() {
-            public void converseWithLeader(InputArchive ia, OutputArchive oa)
+            public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
                     throws IOException {
                 /* we test a normal run. everything should work out well. */
                 LearnerInfo li = new LearnerInfo(1, 0x10000);
@@ -209,7 +212,7 @@ public class Zab1_0Test {
     @Test
     public void testLeaderBehind() throws Exception {
         testConversation(new LeaderConversation() {
-            public void converseWithLeader(InputArchive ia, OutputArchive oa)
+            public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
                     throws IOException {
                 /* we test a normal run. everything should work out well. */
                 LearnerInfo li = new LearnerInfo(1, 0x10000);
@@ -240,7 +243,38 @@ public class Zab1_0Test {
         });
     }
 
-
+    /**
+     * Tests that when a quorum of followers send LearnerInfo but do not ack the epoch (which is sent
+     * by the leader upon receipt of LearnerInfo from a quorum), the leader does not start using this epoch
+     * as it would in the normal case (when a quorum do ack the epoch). This tests ZK-1192
+     * @throws Exception
+     */
+    @Test
+    public void testAbandonBeforeACKEpoch() throws Exception {
+        testConversation(new LeaderConversation() {
+            public void converseWithLeader(InputArchive ia, OutputArchive oa, Leader l)
+                    throws IOException, InterruptedException {
+                /* we test a normal run. everything should work out well. */            	
+                LearnerInfo li = new LearnerInfo(1, 0x10000);
+                byte liBytes[] = new byte[12];
+                ByteBufferOutputStream.record2ByteBuffer(li,
+                        ByteBuffer.wrap(liBytes));
+                QuorumPacket qp = new QuorumPacket(Leader.FOLLOWERINFO, 0,
+                        liBytes, null);
+                oa.writeRecord(qp, null);
+                readPacketSkippingPing(ia, qp);
+                Assert.assertEquals(Leader.LEADERINFO, qp.getType());
+                Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+                Assert.assertEquals(ByteBuffer.wrap(qp.getData()).getInt(),
+                        0x10000);                
+                Thread.sleep(l.self.getInitLimit()*l.self.getTickTime() + 5000);
+                
+                // The leader didn't get a quorum of acks - make sure that leader's current epoch is not advanced
+                Assert.assertEquals(0, l.self.getCurrentEpoch());			
+            }
+        });
+    }
+    
     private void recursiveDelete(File file) {
         if (file.isFile()) {
             file.delete();