فهرست منبع

ZOOKEEPER-4747: Add synchronous sync to ease synchronous call chains (#2068)

Previously, there is no synchronous `sync` so client has to convert
asynchronous `sync` a bit to fit synchronous call chains. This is
apparently unfriendly.

Besides above, in absent of ZOOKEEPER-22, we can't issue a fire and
forget asynchronous `sync` to gain strong consistent. So it becomes
crucial for client to have a convenient synchronous `sync`.

Refs: ZOOKEEPER-1167, ZOOKEEPER-4747
Kezhu Wang 7 ماه پیش
والد
کامیت
3e2d6f3436

+ 25 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/ZooKeeper.java

@@ -2708,6 +2708,31 @@ public class ZooKeeper implements AutoCloseable {
         getEphemerals("/", cb, ctx);
     }
 
+    /**
+     * Synchronous sync. Flushes channel between process and leader.
+     *
+     * @param path the given path
+     * @throws KeeperException If the server signals an error with a non-zero error code
+     * @throws InterruptedException If the server transaction is interrupted.
+     * @throws IllegalArgumentException if an invalid path is specified
+     */
+    public void sync(final String path) throws KeeperException, InterruptedException {
+        final String clientPath = path;
+        PathUtils.validatePath(clientPath);
+
+        final String serverPath = prependChroot(clientPath);
+
+        RequestHeader h = new RequestHeader();
+        h.setType(ZooDefs.OpCode.sync);
+        SyncRequest request = new SyncRequest();
+        SyncResponse response = new SyncResponse();
+        request.setPath(serverPath);
+        ReplyHeader r = cnxn.submitRequest(h, request, response, null);
+        if (r.getErr() != 0) {
+            throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
+        }
+    }
+
     /**
      * Asynchronous sync. Flushes channel between process and leader.
      * @param path

+ 20 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/ZKTestCase.java

@@ -22,6 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import java.io.File;
 import java.time.Instant;
+import java.util.concurrent.CompletableFuture;
 import org.apache.zookeeper.metrics.MetricsUtils;
 import org.apache.zookeeper.util.ServiceUtils;
 import org.hamcrest.CustomMatcher;
@@ -58,6 +59,25 @@ public class ZKTestCase {
         return testName;
     }
 
+    public void syncClient(ZooKeeper zk, boolean synchronous) throws KeeperException {
+        if (synchronous) {
+            try {
+                zk.sync("/");
+            } catch (InterruptedException ex) {
+                throw new RuntimeException(ex);
+            }
+            return;
+        }
+        final CompletableFuture<KeeperException.Code> synced = new CompletableFuture<>();
+        zk.sync("/", (rc, path, ctx) -> {
+            synced.complete(KeeperException.Code.get(rc));
+        }, null);
+        KeeperException.Code code = synced.join();
+        if (code != KeeperException.Code.OK) {
+            throw KeeperException.create(code);
+        }
+    }
+
     @BeforeAll
     public static void before() {
         if (!testBaseDir.exists()) {

+ 2 - 16
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/EagerACLFilterTest.java

@@ -22,13 +22,11 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
-import java.util.concurrent.CompletableFuture;
 import java.util.stream.Stream;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.TestableZooKeeper;
 import org.apache.zookeeper.ZooDefs.Ids;
-import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
 import org.apache.zookeeper.test.QuorumBase;
@@ -95,18 +93,6 @@ public class EagerACLFilterTest extends QuorumBase {
         clientWatchB.waitForConnected(CONNECTION_TIMEOUT);
     }
 
-    void syncClient(ZooKeeper zk) {
-        CompletableFuture<Void> synced = new CompletableFuture<>();
-        zk.sync("/", (rc, path, ctx) -> {
-            if (rc == 0) {
-                synced.complete(null);
-            } else {
-                synced.completeExceptionally(KeeperException.create(KeeperException.Code.get(rc)));
-            }
-        }, null);
-        synced.join();
-    }
-
     @AfterEach
     public void tearDown() throws Exception {
         if (zkClient != null) {
@@ -124,7 +110,7 @@ public class EagerACLFilterTest extends QuorumBase {
         ZooKeeperServer.setEnableEagerACLCheck(enabled);
     }
 
-    private void assertTransactionState(String operation, QuorumPeer peer, long lastxid) {
+    private void assertTransactionState(String operation, QuorumPeer peer, long lastxid) throws Exception {
         if (peer == zkLeader && peer != zkConnected) {
             // The operation is performed on no leader, but we are asserting on leader.
             // There is no happen-before between `zkLeader.getLastLoggedZxid()` and
@@ -133,7 +119,7 @@ public class EagerACLFilterTest extends QuorumBase {
             // to sync leader client to go through commit and response path in leader to
             // build happen-before between `zkLeader.getLastLoggedZxid()` and side effect
             // of previous operation.
-            syncClient(zkLeaderClient);
+            syncClient(zkLeaderClient, false);
         }
         assertTrue(peer == zkLeader || peer == zkConnected);
         boolean eagerACL = ZooKeeperServer.isEnableEagerACLCheck();

+ 13 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumRequestPipelineTest.java

@@ -26,6 +26,8 @@ import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.ForkJoinTask;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Stream;
 import org.apache.zookeeper.AsyncCallback.VoidCallback;
@@ -179,4 +181,15 @@ public class QuorumRequestPipelineTest extends QuorumBase {
         assertTrue(complete, String.format("%s Sync completed", serverState));
     }
 
+    @ParameterizedTest
+    @MethodSource("data")
+    public void testSynchronousSync(ServerState serverState) throws Exception {
+        setUp(serverState);
+        create2EmptyNode(zkClient, PARENT_PATH);
+        ForkJoinTask<Void> task = ForkJoinPool.commonPool().submit(() -> {
+            zkClient.sync(PARENT_PATH);
+            return null;
+        });
+        task.get(30, TimeUnit.SECONDS);
+    }
 }

+ 19 - 22
zookeeper-server/src/test/java/org/apache/zookeeper/test/QuorumTest.java

@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.concurrent.TimeoutException;
+import org.apache.zookeeper.AsyncCallback;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.DummyWatcher;
 import org.apache.zookeeper.KeeperException;
@@ -182,15 +183,24 @@ public class QuorumTest extends ZKTestCase {
     }
 
     /**
-     * Make sure that we can change sessions
-     *  from follower to leader.
-     *
-     * @throws IOException
-     * @throws InterruptedException
-     * @throws KeeperException
+     * Make sure that we can change sessions among servers and maintain consistent view
+     * using {@link ZooKeeper#sync(String)}.
      */
     @Test
-    public void testSessionMoved() throws Exception {
+    public void testSessionMovedWithSynchronousSync() throws Exception {
+        testSessionMoved(true);
+    }
+
+    /**
+     * Make sure that we can change sessions among servers and maintain consistent view
+     * using {@link ZooKeeper#sync(String, AsyncCallback.VoidCallback, Object)}.
+     */
+    @Test
+    public void testSessionMovedWithAsynchronousSync() throws Exception {
+        testSessionMoved(false);
+    }
+
+    public void testSessionMoved(boolean synchronous_sync) throws Exception {
         String[] hostPorts = qb.hostPort.split(",");
         DisconnectableZooKeeper zk = new DisconnectableZooKeeper(
             hostPorts[0],
@@ -208,21 +218,8 @@ public class QuorumTest extends ZKTestCase {
                 zk.getSessionId(),
                 zk.getSessionPasswd());
             zknew.setData("/", new byte[1], -1);
-            final int[] result = new int[1];
-            result[0] = Integer.MAX_VALUE;
-            zknew.sync("/", (rc, path, ctx) -> {
-                synchronized (result) {
-                    result[0] = rc;
-                    result.notify();
-                }
-            }, null);
-            synchronized (result) {
-                if (result[0] == Integer.MAX_VALUE) {
-                    result.wait(5000);
-                }
-            }
-            LOG.info("{} Sync returned {}", hostPorts[(i + 1) % hostPorts.length], result[0]);
-            assertTrue(result[0] == KeeperException.Code.OK.intValue());
+            syncClient(zknew, synchronous_sync);
+            LOG.info("{} Sync succeed", hostPorts[(i + 1) % hostPorts.length]);
             try {
                 zk.setData("/", new byte[1], -1);
                 fail("Should have lost the connection");

+ 3 - 15
zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionTest.java

@@ -243,21 +243,8 @@ public class SessionTest extends ZKTestCase {
                                                                                           % hostPorts.length], CONNECTION_TIMEOUT, new MyWatcher(Integer.toString(
                     i
                             + 1)), zk.getSessionId(), zk.getSessionPasswd());
-            final int[] result = new int[1];
-            result[0] = Integer.MAX_VALUE;
-            zknew.sync("/", (rc, path, ctx) -> {
-                synchronized (result) {
-                    result[0] = rc;
-                    result.notify();
-                }
-            }, null);
-            synchronized (result) {
-                if (result[0] == Integer.MAX_VALUE) {
-                    result.wait(5000);
-                }
-            }
-            LOG.info("{} Sync returned {}", hostPorts[(i + 1) % hostPorts.length], result[0]);
-            assertTrue(result[0] == KeeperException.Code.OK.intValue());
+            zknew.sync("/");
+            LOG.info("{} Sync succeed", hostPorts[(i + 1) % hostPorts.length]);
             zknew.setData("/", new byte[1], -1);
             try {
                 zk.setData("/", new byte[1], -1);
@@ -270,6 +257,7 @@ public class SessionTest extends ZKTestCase {
         }
         zk.close();
     }
+
     /**
      * This test makes sure that duplicate state changes are not communicated
      * to the client watcher. For example we should not notify state as