Browse Source

ZOOKEEPER-2623: Fix database corruption caused by quorum check (#1988)

Kezhu Wang 1 year ago
parent
commit
b31f776471

+ 18 - 2
zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java

@@ -50,6 +50,7 @@ import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.proto.AddWatchRequest;
+import org.apache.zookeeper.proto.CheckVersionRequest;
 import org.apache.zookeeper.proto.CheckWatchesRequest;
 import org.apache.zookeeper.proto.Create2Response;
 import org.apache.zookeeper.proto.CreateResponse;
@@ -354,8 +355,10 @@ public class FinalRequestProcessor implements RequestProcessor {
             }
             case OpCode.check: {
                 lastOp = "CHEC";
-                rsp = new SetDataResponse(rc.stat);
-                err = Code.get(rc.err);
+                CheckVersionRequest checkVersionRequest = request.readRequestRecord(CheckVersionRequest::new);
+                path = checkVersionRequest.getPath();
+                handleCheckVersionRequest(checkVersionRequest, cnxn, request.authInfo);
+                requestPathMetricsCollector.registerRequest(request.type, path);
                 break;
             }
             case OpCode.exists: {
@@ -643,6 +646,19 @@ public class FinalRequestProcessor implements RequestProcessor {
         return new GetDataResponse(b, stat);
     }
 
+    private void handleCheckVersionRequest(CheckVersionRequest request, ServerCnxn cnxn, List<Id> authInfo) throws KeeperException { // validates a check request against the current database; returns nothing, failures surface as KeeperException
+        String path = request.getPath();
+        DataNode n = zks.getZKDatabase().getNode(path);
+        if (n == null) { // nothing stored at this path
+            throw new KeeperException.NoNodeException();
+        }
+        zks.checkACL(cnxn, zks.getZKDatabase().aclForNode(n), ZooDefs.Perms.READ, authInfo, path, null); // the check is authorised with the READ permission on the node's ACL
+        int version = request.getVersion();
+        if (version != -1 && version != n.stat.getVersion()) { // -1 skips the comparison; any other value must equal the node's current data version
+            throw new KeeperException.BadVersionException(path);
+        }
+    }
+
     private boolean closeSession(ServerCnxnFactory serverCnxnFactory, long sessionId) {
         if (serverCnxnFactory == null) {
             return false;

+ 0 - 4
zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java

@@ -798,10 +798,6 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
                 SetACLRequest setAclRequest = request.readRequestRecord(SetACLRequest::new);
                 pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest);
                 break;
-            case OpCode.check:
-                CheckVersionRequest checkRequest = request.readRequestRecord(CheckVersionRequest::new);
-                pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest);
-                break;
             case OpCode.multi:
                 MultiOperationRecord multiRequest;
                 try {

+ 0 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java

@@ -361,7 +361,6 @@ public class Request {
         case OpCode.deleteContainer:
         case OpCode.setACL:
         case OpCode.setData:
-        case OpCode.check:
         case OpCode.multi:
         case OpCode.reconfig:
             return true;

+ 0 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/CommitProcessor.java

@@ -181,7 +181,6 @@ public class CommitProcessor extends ZooKeeperCriticalThread implements RequestP
         case OpCode.reconfig:
         case OpCode.multi:
         case OpCode.setACL:
-        case OpCode.check:
             return true;
         case OpCode.sync:
             return matchSyncs;

+ 0 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerRequestProcessor.java

@@ -108,7 +108,6 @@ public class FollowerRequestProcessor extends ZooKeeperCriticalThread implements
                 case OpCode.reconfig:
                 case OpCode.setACL:
                 case OpCode.multi:
-                case OpCode.check:
                     zks.getFollower().request(request);
                     break;
                 case OpCode.createSession:

+ 0 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ObserverRequestProcessor.java

@@ -109,7 +109,6 @@ public class ObserverRequestProcessor extends ZooKeeperCriticalThread implements
                 case OpCode.reconfig:
                 case OpCode.setACL:
                 case OpCode.multi:
-                case OpCode.check:
                     zks.getObserver().request(request);
                     break;
                 case OpCode.createSession:

+ 0 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java

@@ -85,7 +85,6 @@ public class ReadOnlyRequestProcessor extends ZooKeeperCriticalThread implements
                 case OpCode.reconfig:
                 case OpCode.setACL:
                 case OpCode.multi:
-                case OpCode.check:
                     sendErrorResponse(request);
                     continue;
                 case OpCode.closeSession:

+ 0 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/util/RequestPathMetricsCollector.java

@@ -147,7 +147,6 @@ public class RequestPathMetricsCollector {
         case ZooDefs.OpCode.reconfig:
         case ZooDefs.OpCode.setACL:
         case ZooDefs.OpCode.multi:
-        case ZooDefs.OpCode.check:
             return true;
         }
         return false;

+ 114 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/test/CheckTest.java

@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.test;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.TestableZooKeeper;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.CheckVersionRequest;
+import org.apache.zookeeper.proto.ReplyHeader;
+import org.apache.zookeeper.proto.RequestHeader;
+import org.apache.zookeeper.server.quorum.QuorumPeer;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+public class CheckTest extends ClientBase { // sends OpCode.check as a standalone request (not wrapped in a multi) to a single server and to a quorum
+
+    @BeforeEach
+    public void setUp(TestInfo testInfo) throws Exception {
+        if (testInfo.getDisplayName().contains("Cluster")) { // testCluster builds its own QuorumBase, so the ClientBase setup is skipped for it
+            return;
+        }
+        super.setUp();
+    }
+
+    @AfterEach
+    public void tearDown(TestInfo testInfo) throws Exception {
+        if (testInfo.getDisplayName().contains("Cluster")) { // mirrors setUp(TestInfo): nothing from ClientBase was started for the cluster test
+            return;
+        }
+        super.tearDown();
+    }
+
+    @Override
+    public void setUp() throws Exception { // intentionally empty; setUp(TestInfo) above calls super.setUp() when needed. NOTE(review): presumably this disables an inherited lifecycle hook - confirm against ClientBase
+    }
+
+    @Override
+    public void tearDown() throws Exception { // intentionally empty; tearDown(TestInfo) above calls super.tearDown() when needed
+    }
+
+    private static void checkVersion(TestableZooKeeper zk, String path, int version) throws Exception { // submits a raw CheckVersionRequest and turns a non-zero reply error into the matching KeeperException
+        RequestHeader header = new RequestHeader();
+        header.setType(ZooDefs.OpCode.check);
+        CheckVersionRequest request = new CheckVersionRequest(path, version);
+        ReplyHeader replyHeader = zk.submitRequest(header, request, null, null);
+        if (replyHeader.getErr() != 0) { // only the error code in the reply header is inspected
+            throw KeeperException.create(KeeperException.Code.get(replyHeader.getErr()), path);
+        }
+    }
+
+    private void testOperations(TestableZooKeeper zk) throws Exception { // expects success for -1 and the current version, BadVersion for a wrong version, NoNode for a missing path
+        Stat stat = new Stat();
+        zk.getData("/", false, stat); // fills stat so the root node's current version is known
+        checkVersion(zk, "/", -1);
+        checkVersion(zk, "/", stat.getVersion());
+        assertThrows(KeeperException.BadVersionException.class, () -> {
+            checkVersion(zk, "/", stat.getVersion() + 1);
+        });
+        assertThrows(KeeperException.NoNodeException.class, () -> {
+            checkVersion(zk, "/no-node", Integer.MAX_VALUE);
+        });
+    }
+
+    @Test
+    public void testStandalone() throws Exception {
+        TestableZooKeeper zk = createClient();
+        testOperations(zk);
+        stopServer(); // restart the server after the check requests, then reconnect
+        startServer();
+        createClient();
+    }
+
+    @Test
+    public void testCluster() throws Exception {
+        QuorumBase qb = new QuorumBase();
+        try {
+            qb.setUp(true, true);
+            testOperations(qb.createClient(new CountdownWatcher(), QuorumPeer.ServerState.OBSERVING)); // same operations once per server state: OBSERVING, FOLLOWING, LEADING
+            testOperations(qb.createClient(new CountdownWatcher(), QuorumPeer.ServerState.FOLLOWING));
+            testOperations(qb.createClient(new CountdownWatcher(), QuorumPeer.ServerState.LEADING));
+            int leaderIndex = qb.getLeaderIndex(); // captured before shutdown so the same peer can be set up again and reached on its client port
+            int leaderPort = qb.getLeaderClientPort();
+            qb.shutdown(qb.getLeaderQuorumPeer());
+            qb.setupServer(leaderIndex + 1); // NOTE(review): the +1 suggests setupServer takes a 1-based id while the peer list is 0-based - confirm in QuorumBase
+            QuorumPeer quorumPeer = qb.getPeerList().get(leaderIndex);
+            quorumPeer.start();
+            qb.createClient("localhost:" + leaderPort, 2 * CONNECTION_TIMEOUT); // connects to the restarted peer with twice the usual connection timeout
+        } finally {
+            try {
+                qb.tearDown();
+            } catch (Exception ignored) {} // teardown failures are deliberately ignored
+        }
+    }
+}

+ 4 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/test/ClientBase.java

@@ -197,6 +197,10 @@ public abstract class ClientBase extends ZKTestCase {
     private List<ZooKeeper> allClients;
     private boolean allClientsSetup = false;
 
+    protected TestableZooKeeper createClient(String hp, int timeout) throws IOException, InterruptedException { // convenience overload: supplies a fresh CountdownWatcher and delegates to the three-argument createClient
+        return createClient(new CountdownWatcher(), hp, timeout);
+    }
+
     protected TestableZooKeeper createClient(CountdownWatcher watcher, String hp) throws IOException, InterruptedException {
         return createClient(watcher, hp, CONNECTION_TIMEOUT);
     }