Jelajahi Sumber

ZOOKEEPER-1367. Data inconsistencies and unexpired ephemeral nodes after cluster restart. (Benjamin Reed via mahadev)

git-svn-id: https://svn.apache.org/repos/asf/zookeeper/trunk@1238176 13f79535-47bb-0310-9956-ffa450edef68
Mahadev Konar 13 tahun lalu
induk
melakukan
73af50a8a9

+ 4 - 1
CHANGES.txt

@@ -104,7 +104,10 @@ BUGFIXES:
 
 
   ZOOKEEPER-973. bind() could fail on Leader because it does not
   ZOOKEEPER-973. bind() could fail on Leader because it does not
   setReuseAddress on its ServerSocket (Harsh J via phunt)
   setReuseAddress on its ServerSocket (Harsh J via phunt)
-  
+ 
+  ZOOKEEPER-1367. Data inconsistencies and unexpired ephemeral nodes after cluster restart.
+  (Benjamin Reed via mahadev)
+ 
 IMPROVEMENTS:
 IMPROVEMENTS:
 
 
   ZOOKEEPER-1170. Fix compiler (eclipse) warnings: unused imports,
   ZOOKEEPER-1170. Fix compiler (eclipse) warnings: unused imports,

+ 5 - 14
src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java

@@ -54,6 +54,7 @@ import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
 import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
 import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
 import org.apache.zookeeper.txn.CreateSessionTxn;
 import org.apache.zookeeper.txn.CreateSessionTxn;
 import org.apache.zookeeper.txn.ErrorTxn;
 import org.apache.zookeeper.txn.ErrorTxn;
+import org.apache.zookeeper.txn.TxnHeader;
 
 
 import org.apache.zookeeper.OpResult;
 import org.apache.zookeeper.OpResult;
 import org.apache.zookeeper.OpResult.CheckResult;
 import org.apache.zookeeper.OpResult.CheckResult;
@@ -107,20 +108,10 @@ public class FinalRequestProcessor implements RequestProcessor {
                 }
                 }
             }
             }
             if (request.getHdr() != null) {
             if (request.getHdr() != null) {
-                rc = zks.getZKDatabase().processTxn(request.getHdr(), request.getTxn());
-                if (request.type == OpCode.createSession) {
-                    if (request.getTxn() instanceof CreateSessionTxn) {
-                        CreateSessionTxn cst = (CreateSessionTxn) request.getTxn();
-                        zks.sessionTracker.addSession(request.sessionId, cst
-                                .getTimeOut());
-                    } else {
-                        LOG.warn("*****>>>>> Got "
-                                + request.getTxn().getClass() + " "
-                                + request.getTxn().toString());
-                    }
-                } else if (request.type == OpCode.closeSession) {
-                    zks.sessionTracker.removeSession(request.sessionId);
-                }
+                TxnHeader hdr = request.getHdr();
+                Record txn = request.getTxn();
+                
+                rc = zks.processTxn(hdr, txn);
             }
             }
             // do not add non quorum packets to the queue.
             // do not add non quorum packets to the queue.
             if (Request.isQuorum(request.type)) {
             if (Request.isQuorum(request.type)) {

+ 34 - 4
src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java

@@ -53,6 +53,7 @@ import org.apache.zookeeper.proto.GetSASLRequest;
 import org.apache.zookeeper.proto.ReplyHeader;
 import org.apache.zookeeper.proto.ReplyHeader;
 import org.apache.zookeeper.proto.RequestHeader;
 import org.apache.zookeeper.proto.RequestHeader;
 import org.apache.zookeeper.proto.SetSASLResponse;
 import org.apache.zookeeper.proto.SetSASLResponse;
+import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
 import org.apache.zookeeper.server.ServerCnxn.CloseRequestException;
 import org.apache.zookeeper.server.ServerCnxn.CloseRequestException;
 import org.apache.zookeeper.server.SessionTracker.Session;
 import org.apache.zookeeper.server.SessionTracker.Session;
 import org.apache.zookeeper.server.SessionTracker.SessionExpirer;
 import org.apache.zookeeper.server.SessionTracker.SessionExpirer;
@@ -60,6 +61,9 @@ import org.apache.zookeeper.server.auth.AuthenticationProvider;
 import org.apache.zookeeper.server.auth.ProviderRegistry;
 import org.apache.zookeeper.server.auth.ProviderRegistry;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
 import org.apache.zookeeper.server.quorum.ReadOnlyZooKeeperServer;
+import org.apache.zookeeper.txn.CreateSessionTxn;
+import org.apache.zookeeper.txn.TxnHeader;
+
 import javax.security.sasl.SaslException;
 import javax.security.sasl.SaslException;
 
 
 /**
 /**
@@ -87,7 +91,6 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     protected int maxSessionTimeout = -1;
     protected int maxSessionTimeout = -1;
     protected SessionTracker sessionTracker;
     protected SessionTracker sessionTracker;
     private FileTxnSnapLog txnLogFactory = null;
     private FileTxnSnapLog txnLogFactory = null;
-    private ConcurrentHashMap<Long, Integer> sessionsWithTimeouts;
     private ZKDatabase zkDb;
     private ZKDatabase zkDb;
     protected long hzxid = 0;
     protected long hzxid = 0;
     public final static Exception ok = new Exception("No prob");
     public final static Exception ok = new Exception("No prob");
@@ -228,8 +231,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         // Clean up dead sessions
         // Clean up dead sessions
         LinkedList<Long> deadSessions = new LinkedList<Long>();
         LinkedList<Long> deadSessions = new LinkedList<Long>();
         for (Long session : zkDb.getSessions()) {
         for (Long session : zkDb.getSessions()) {
-            sessionsWithTimeouts = zkDb.getSessionWithTimeOuts();
-            if (sessionsWithTimeouts.get(session) == null) {
+            if (zkDb.getSessionWithTimeOuts().get(session) == null) {
                 deadSessions.add(session);
                 deadSessions.add(session);
             }
             }
         }
         }
@@ -357,7 +359,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     }
     }
 
 
     public void startup() {
     public void startup() {
-        createSessionTracker();
+        if (sessionTracker == null) {
+            createSessionTracker();
+        }
+        startSessionTracker();
         setupRequestProcessors();
         setupRequestProcessors();
 
 
         registerJMX();
         registerJMX();
@@ -380,6 +385,9 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     protected void createSessionTracker() {
     protected void createSessionTracker() {
         sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
         sessionTracker = new SessionTrackerImpl(this, zkDb.getSessionWithTimeOuts(),
                 tickTime, 1);
                 tickTime, 1);
+    }
+    
+    protected void startSessionTracker() {
         ((SessionTrackerImpl)sessionTracker).start();
         ((SessionTrackerImpl)sessionTracker).start();
     }
     }
 
 
@@ -918,4 +926,26 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         // wrap SASL response token to client inside a Response object.
         // wrap SASL response token to client inside a Response object.
         return new SetSASLResponse(responseToken);
         return new SetSASLResponse(responseToken);
     }
     }
+    
+    public ProcessTxnResult processTxn(TxnHeader hdr, Record txn) {
+        ProcessTxnResult rc;
+        int opCode = hdr.getType();
+        long sessionId = hdr.getClientId();
+        rc = getZKDatabase().processTxn(hdr, txn);
+        if (opCode == OpCode.createSession) {
+            if (txn instanceof CreateSessionTxn) {
+                CreateSessionTxn cst = (CreateSessionTxn) txn;
+                sessionTracker.addSession(sessionId, cst
+                        .getTimeOut());
+            } else {
+                LOG.warn("*****>>>>> Got "
+                        + txn.getClass() + " "
+                        + txn.toString());
+            }
+        } else if (opCode == OpCode.closeSession) {
+            sessionTracker.removeSession(sessionId);
+        }
+        return rc;
+    }
+
 }
 }

+ 5 - 1
src/java/main/org/apache/zookeeper/server/quorum/LeaderZooKeeperServer.java

@@ -74,9 +74,13 @@ public class LeaderZooKeeperServer extends QuorumZooKeeperServer {
     }
     }
 
 
     @Override
     @Override
-    protected void createSessionTracker() {
+    public void createSessionTracker() {
         sessionTracker = new SessionTrackerImpl(this, getZKDatabase().getSessionWithTimeOuts(),
         sessionTracker = new SessionTrackerImpl(this, getZKDatabase().getSessionWithTimeOuts(),
                 tickTime, self.getId());
                 tickTime, self.getId());
+    }
+    
+    @Override
+    protected void startSessionTracker() {
         ((SessionTrackerImpl)sessionTracker).start();
         ((SessionTrackerImpl)sessionTracker).start();
     }
     }
 
 

+ 4 - 3
src/java/main/org/apache/zookeeper/server/quorum/Learner.java

@@ -352,7 +352,8 @@ public class Learner {
 
 
             }
             }
             zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
             zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());
-                        
+            zk.createSessionTracker();            
+            
             long lastQueued = 0;
             long lastQueued = 0;
 
 
             // in V1.0 we take a snapshot when we get the NEWLEADER message, but in pre V1.0
             // in V1.0 we take a snapshot when we get the NEWLEADER message, but in pre V1.0
@@ -383,7 +384,7 @@ public class Learner {
                         if (pif.hdr.getZxid() != qp.getZxid()) {
                         if (pif.hdr.getZxid() != qp.getZxid()) {
                             LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
                             LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
                         } else {
                         } else {
-                            zk.getZKDatabase().processTxn(pif.hdr, pif.rec);
+                            zk.processTxn(pif.hdr, pif.rec);
                             packetsNotCommitted.remove();
                             packetsNotCommitted.remove();
                         }
                         }
                     } else {
                     } else {
@@ -393,7 +394,7 @@ public class Learner {
                 case Leader.INFORM:
                 case Leader.INFORM:
                     TxnHeader hdr = new TxnHeader();
                     TxnHeader hdr = new TxnHeader();
                     Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
                     Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
-                    zk.getZKDatabase().processTxn(hdr, txn);
+                    zk.processTxn(hdr, txn);
                     break;
                     break;
                 case Leader.UPTODATE:
                 case Leader.UPTODATE:
                     if (!snapshotTaken) { // true for the pre v1.0 case
                     if (!snapshotTaken) { // true for the pre v1.0 case

+ 4 - 1
src/java/main/org/apache/zookeeper/server/quorum/LearnerZooKeeperServer.java

@@ -69,11 +69,14 @@ public abstract class LearnerZooKeeperServer extends QuorumZooKeeperServer {
     }    
     }    
     
     
     @Override
     @Override
-    protected void createSessionTracker() {
+    public void createSessionTracker() {
         sessionTracker = new LearnerSessionTracker(this, getZKDatabase().getSessionWithTimeOuts(),
         sessionTracker = new LearnerSessionTracker(this, getZKDatabase().getSessionWithTimeOuts(),
                 self.getId());
                 self.getId());
     }
     }
     
     
+    @Override
+    protected void startSessionTracker() {}
+    
     @Override
     @Override
     protected void revalidateSession(ServerCnxn cnxn, long sessionId,
     protected void revalidateSession(ServerCnxn cnxn, long sessionId,
             int sessionTimeout) throws IOException {
             int sessionTimeout) throws IOException {

+ 3 - 3
src/java/test/org/apache/zookeeper/server/quorum/LearnerTest.java

@@ -44,8 +44,8 @@ public class LearnerTest extends ZKTestCase {
 
 
         Learner learner;
         Learner learner;
 
 
-        public SimpleLearnerZooKeeperServer(FileTxnSnapLog ftsl) throws IOException {
-            super(ftsl, 2000, 2000, 2000, new ZKDatabase(ftsl), null);
+        public SimpleLearnerZooKeeperServer(FileTxnSnapLog ftsl, QuorumPeer self) throws IOException {
+            super(ftsl, 2000, 2000, 2000, new ZKDatabase(ftsl), self);
         }
         }
 
 
         @Override
         @Override
@@ -57,7 +57,7 @@ public class LearnerTest extends ZKTestCase {
     static class SimpleLearner extends Learner {
     static class SimpleLearner extends Learner {
         SimpleLearner(FileTxnSnapLog ftsl) throws IOException {
         SimpleLearner(FileTxnSnapLog ftsl) throws IOException {
             self = new QuorumPeer();
             self = new QuorumPeer();
-            zk = new SimpleLearnerZooKeeperServer(ftsl);
+            zk = new SimpleLearnerZooKeeperServer(ftsl, self);
             ((SimpleLearnerZooKeeperServer) zk).learner = this;
             ((SimpleLearnerZooKeeperServer) zk).learner = this;
         }
         }
     }
     }

+ 105 - 0
src/java/test/org/apache/zookeeper/server/quorum/Zab1_0Test.java

@@ -52,6 +52,7 @@ import org.apache.zookeeper.server.quorum.Leader;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
 import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
 import org.apache.zookeeper.server.util.ZxidUtils;
 import org.apache.zookeeper.server.util.ZxidUtils;
+import org.apache.zookeeper.txn.CreateSessionTxn;
 import org.apache.zookeeper.txn.CreateTxn;
 import org.apache.zookeeper.txn.CreateTxn;
 import org.apache.zookeeper.txn.SetDataTxn;
 import org.apache.zookeeper.txn.SetDataTxn;
 import org.apache.zookeeper.txn.TxnHeader;
 import org.apache.zookeeper.txn.TxnHeader;
@@ -548,6 +549,110 @@ public class Zab1_0Test {
         });
         });
     }
     }
     
     
+    @Test
+    public void testNormalFollowerRunWithDiff() throws Exception {
+        testFollowerConversation(new FollowerConversation() {
+            @Override
+            public void converseWithFollower(InputArchive ia, OutputArchive oa,
+                    Follower f) throws Exception {
+                File tmpDir = File.createTempFile("test", "dir");
+                tmpDir.delete();
+                tmpDir.mkdir();
+                File logDir = f.fzk.getTxnLogFactory().getDataDir().getParentFile();
+                File snapDir = f.fzk.getTxnLogFactory().getSnapDir().getParentFile();
+                try {
+                    Assert.assertEquals(0, f.self.getAcceptedEpoch());
+                    Assert.assertEquals(0, f.self.getCurrentEpoch());
+
+                    // Setup a database with a single /foo node
+                    ZKDatabase zkDb = new ZKDatabase(new FileTxnSnapLog(tmpDir, tmpDir));
+                    final long firstZxid = ZxidUtils.makeZxid(1, 1);
+                    zkDb.processTxn(new TxnHeader(13, 1313, firstZxid, 33, ZooDefs.OpCode.create), new CreateTxn("/foo", "data1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 1));
+                    Stat stat = new Stat();
+                    Assert.assertEquals("data1", new String(zkDb.getData("/foo", stat, null)));
+
+                    QuorumPacket qp = new QuorumPacket();
+                    readPacketSkippingPing(ia, qp);
+                    Assert.assertEquals(Leader.FOLLOWERINFO, qp.getType());
+                    Assert.assertEquals(qp.getZxid(), 0);
+                    LearnerInfo learnInfo = new LearnerInfo();
+                    ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(qp.getData()), learnInfo);
+                    Assert.assertEquals(learnInfo.getProtocolVersion(), 0x10000);
+                    Assert.assertEquals(learnInfo.getServerid(), 0);
+                
+                    // We are simulating an established leader, so the epoch is 1
+                    qp.setType(Leader.LEADERINFO);
+                    qp.setZxid(ZxidUtils.makeZxid(1, 0));
+                    byte protoBytes[] = new byte[4];
+                    ByteBuffer.wrap(protoBytes).putInt(0x10000);
+                    qp.setData(protoBytes);
+                    oa.writeRecord(qp, null);
+                
+                    readPacketSkippingPing(ia, qp);
+                    Assert.assertEquals(Leader.ACKEPOCH, qp.getType());
+                    Assert.assertEquals(0, qp.getZxid());
+                    Assert.assertEquals(ZxidUtils.makeZxid(0, 0), ByteBuffer.wrap(qp.getData()).getInt());
+                    Assert.assertEquals(1, f.self.getAcceptedEpoch());
+                    Assert.assertEquals(0, f.self.getCurrentEpoch());
+                    
+                    // Send a diff
+                    qp.setType(Leader.DIFF);
+                    qp.setData(new byte[0]);
+                    qp.setZxid(zkDb.getDataTreeLastProcessedZxid());
+                    oa.writeRecord(qp, null);
+                    final long createSessionZxid = ZxidUtils.makeZxid(1, 2);
+                    proposeNewSession(qp, createSessionZxid, 0x333);
+                    oa.writeRecord(qp, null);
+                    qp.setType(Leader.COMMIT);
+                    qp.setZxid(createSessionZxid);
+                    oa.writeRecord(qp, null);
+                    qp.setType(Leader.NEWLEADER);
+                    qp.setZxid(ZxidUtils.makeZxid(1, 0));
+                    oa.writeRecord(qp, null);
+                    qp.setType(Leader.UPTODATE);
+                    qp.setZxid(0);
+                    oa.writeRecord(qp, null);
+                    
+                    // Read the uptodate ack
+                    readPacketSkippingPing(ia, qp);
+                    Assert.assertEquals(Leader.ACK, qp.getType());
+                    Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+                    
+                  
+                    // Get the ack of the new leader
+                    readPacketSkippingPing(ia, qp);
+                    Assert.assertEquals(Leader.ACK, qp.getType());
+                    Assert.assertEquals(ZxidUtils.makeZxid(1, 0), qp.getZxid());
+                    Assert.assertEquals(1, f.self.getAcceptedEpoch());
+                    Assert.assertEquals(1, f.self.getCurrentEpoch());
+                    
+                    Assert.assertEquals(createSessionZxid, f.fzk.getLastProcessedZxid());
+                    
+                    // Make sure the data was recorded in the filesystem ok
+                    ZKDatabase zkDb2 = new ZKDatabase(new FileTxnSnapLog(logDir, snapDir));
+                    zkDb2.loadDataBase();
+                    System.out.println(zkDb2.getSessions());
+                    Assert.assertNotNull(zkDb2.getSessionWithTimeOuts().get(4L));
+                } finally {
+                    recursiveDelete(tmpDir);
+                }
+                
+            }
+
+            private void proposeNewSession(QuorumPacket qp, long zxid, long sessionId) throws IOException {
+                qp.setType(Leader.PROPOSAL);
+                qp.setZxid(zxid);
+                TxnHeader hdr = new TxnHeader(4, 1414, qp.getZxid(), 55, ZooDefs.OpCode.createSession);
+                CreateSessionTxn cst = new CreateSessionTxn(30000);
+                ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                OutputArchive boa = BinaryOutputArchive.getArchive(baos);
+                boa.writeRecord(hdr, null);
+                boa.writeRecord(cst, null);
+                qp.setData(baos.toByteArray());
+            }
+        });
+    }
+    
     @Test
     @Test
     public void testNormalRun() throws Exception {
     public void testNormalRun() throws Exception {
         testLeaderConversation(new LeaderConversation() {
         testLeaderConversation(new LeaderConversation() {