瀏覽代碼

ZOOKEEPER-108. Fix sync operation reordering on a Quorum. (Flavio Paiva Junqueira via mahadev)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@683707 13f79535-47bb-0310-9956-ffa450edef68
Mahadev Konar 17 年之前
父節點
當前提交
8a22ee2a08

+ 1 - 0
src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java

@@ -158,6 +158,7 @@ public class FinalRequestProcessor implements RequestProcessor {
                 err = rc.err;
                 break;
             case OpCode.sync:
+                LOG.debug("OpCode.sync " + request);
                 SyncRequest syncRequest = new SyncRequest();
                 ZooKeeperServer.byteBuffer2Record(request.request,
                         syncRequest);

+ 2 - 6
src/java/main/org/apache/zookeeper/server/quorum/CommitProcessor.java

@@ -47,11 +47,6 @@ public class CommitProcessor extends Thread implements RequestProcessor {
      */
     LinkedList<Request> committedRequests = new LinkedList<Request>();
 
-    /*
-     * Pending sync requests
-     */
-    LinkedList<Request> pendingSyncs = new LinkedList<Request>();
-
     RequestProcessor nextProcessor;
 
     public CommitProcessor(RequestProcessor nextProcessor) {
@@ -127,7 +122,7 @@ public class CommitProcessor extends Thread implements RequestProcessor {
                             break;
                         case OpCode.sync:
                             nextPending = request;
-                            pendingSyncs.add(request);
+                            //pendingSyncs.add(request);
                             break;
                         default:
                             toProcess.add(request);
@@ -149,6 +144,7 @@ public class CommitProcessor extends Thread implements RequestProcessor {
                          new Exception("committing a null! "));
                 return;
             }
+            LOG.debug("Committing" + request.cxid);
             committedRequests.add(request);
             notifyAll();
         }

+ 2 - 1
src/java/main/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java

@@ -64,10 +64,11 @@ public class FollowerRequestProcessor extends Thread implements
                 // the response
                 nextProcessor.processRequest(request);
                 switch (request.type) {
+                case OpCode.sync:
+                    zks.pendingSyncs.add(request);
                 case OpCode.create:
                 case OpCode.delete:
                 case OpCode.setData:
-                case OpCode.sync:
                 case OpCode.setACL:
                 case OpCode.createSession:
                 case OpCode.closeSession:

+ 9 - 2
src/java/main/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java

@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.log4j.Logger;
 
@@ -50,6 +51,11 @@ public class FollowerZooKeeperServer extends ZooKeeperServer {
 
     SyncRequestProcessor syncProcessor;
 
+    /*
+     * Pending sync requests
+     */
+    ConcurrentLinkedQueue<Request> pendingSyncs;
+    
     /**
      * @param port
      * @param dataDir
@@ -59,6 +65,7 @@ public class FollowerZooKeeperServer extends ZooKeeperServer {
             QuorumPeer self,DataTreeBuilder treeBuilder) throws IOException {
         super(dataDir, dataLogDir, self.tickTime,treeBuilder);
         this.self = self;
+        this.pendingSyncs = new ConcurrentLinkedQueue<Request>();
     }
 
     public Follower getFollower(){
@@ -131,12 +138,12 @@ public class FollowerZooKeeperServer extends ZooKeeperServer {
     }
     
     public void sync(){
-        if(commitProcessor.pendingSyncs.size() ==0){
+        if(pendingSyncs.size() ==0){
             LOG.warn("Not expecting a sync.");
             return;
         }
                 
-        commitProcessor.commit(commitProcessor.pendingSyncs.remove());
+        commitProcessor.commit(pendingSyncs.remove());
     }
              
          

+ 13 - 8
src/java/main/org/apache/zookeeper/server/quorum/Leader.java

@@ -373,6 +373,11 @@ public class Leader {
                         }
                         commit(zxid);
                         zk.commitProcessor.commit(p.request);
+                        if(pendingSyncs.containsKey(zxid)){
+                            sendSync(syncHandler.get(pendingSyncs.get(zxid).sessionId), pendingSyncs.get(zxid));
+                            syncHandler.remove(pendingSyncs.get(zxid));
+                            pendingSyncs.remove(zxid);
+                        }
                     }
                 }
                 return;
@@ -462,12 +467,6 @@ public class Leader {
         lastCommitted = zxid;
         QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
         sendPacket(qp);
-               
-        if(pendingSyncs.containsKey(zxid)){
-            sendSync(syncHandler.get(pendingSyncs.get(zxid).sessionId), pendingSyncs.get(zxid));
-            syncHandler.remove(pendingSyncs.get(zxid));
-            pendingSyncs.remove(zxid);
-        }
     }
 
     long lastProposed;
@@ -544,8 +543,14 @@ public class Leader {
      */
             
     public void sendSync(FollowerHandler f, Request r){
-        QuorumPacket qp = new QuorumPacket(Leader.SYNC, 0, null, null);
-        f.queuePacket(qp);
+        if(f != null){
+            QuorumPacket qp = new QuorumPacket(Leader.SYNC, 0, null, null);
+            f.queuePacket(qp);
+        }
+        else{
+            LOG.warn("Committing sync: " + r.cxid );
+            zk.commitProcessor.commit(r);
+        }
     }
                 
     /**

+ 13 - 5
src/java/main/org/apache/zookeeper/server/quorum/ProposalRequestProcessor.java

@@ -48,14 +48,22 @@ public class ProposalRequestProcessor implements RequestProcessor {
         // request.addRQRec(">prop");
                 
         
+        /* In the following IF-THEN-ELSE block, we process syncs on the leader. 
+         * If the sync is coming from a follower, then the follower
+         * handler adds it to syncHandler. Otherwise, if it is a client of
+         * the leader that issued the sync command, then syncHandler won't 
+         * contain the handler. In this case, we add it to syncHandler, and 
+         * call processRequest on the next processor.
+         */
+        
         if(request.type == ZooDefs.OpCode.sync){
-            if(zks.getLeader().syncHandler.containsKey(request.sessionId)){
-                zks.getLeader().processSync(request);
-            }
-            else{
+            zks.getLeader().processSync(request);
+
+            if(!zks.getLeader().syncHandler.containsKey(request.sessionId)){
+                zks.getLeader().syncHandler.put(request.sessionId, null);
                 nextProcessor.processRequest(request);
-                zks.commitProcessor.commit(request);
             }
+            
         }
         else{
             nextProcessor.processRequest(request);