Bläddra i källkod

ZOOKEEPER-4909: Fix exceeded request timeout in case of spurious wakeup

Closes #2237

Co-authored-by: Kezhu Wang <kezhuw@apache.org>
Signed-off-by: Kezhu Wang <kezhuw@apache.org>
Signed-off-by: Chris Nauroth <cnauroth@apache.org>
luozongle01 1 månad sedan
förälder
incheckning
9cc30438a4

+ 10 - 8
zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java

@@ -1540,14 +1540,16 @@ public class ClientCnxn {
      * Wait for request completion with timeout.
      */
     private void waitForPacketFinish(ReplyHeader r, Packet packet) throws InterruptedException {
-        long waitStartTime = Time.currentElapsedTime();
-        while (!packet.finished) {
-            packet.wait(requestTimeout);
-            if (!packet.finished && ((Time.currentElapsedTime() - waitStartTime) >= requestTimeout)) {
-                LOG.error("Timeout error occurred for the packet '{}'.", packet);
-                r.setErr(Code.REQUESTTIMEOUT.intValue());
-                break;
-            }
+        long remainingTime = requestTimeout;
+        while (!packet.finished && remainingTime > 0) {
+            long waitStartTime = Time.currentElapsedTime();
+            packet.wait(remainingTime);
+            remainingTime -= (Time.currentElapsedTime() - waitStartTime);
+        }
+
+        if (!packet.finished) {
+            LOG.error("Timeout error occurred for the packet '{}'.", packet);
+            r.setErr(Code.REQUESTTIMEOUT.intValue());
         }
     }
 

+ 131 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/ClientRequestTimeoutTest.java

@@ -19,13 +19,21 @@
 package org.apache.zookeeper;
 
 import static org.apache.zookeeper.test.ClientBase.CONNECTION_TIMEOUT;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThan;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.jute.Record;
 import org.apache.zookeeper.ZooDefs.Ids;
 import org.apache.zookeeper.client.HostProvider;
 import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.common.Time;
+import org.apache.zookeeper.proto.ReplyHeader;
+import org.apache.zookeeper.proto.RequestHeader;
 import org.apache.zookeeper.server.quorum.QuorumPeerTestBase;
 import org.apache.zookeeper.test.ClientBase;
 import org.apache.zookeeper.test.ClientBase.CountdownWatcher;
@@ -37,6 +45,9 @@ public class ClientRequestTimeoutTest extends QuorumPeerTestBase {
     private static final int SERVER_COUNT = 3;
     private boolean dropPacket = false;
     private int dropPacketType = ZooDefs.OpCode.create;
+    private boolean capturePacket = false;
+    private int capturePacketType = ZooDefs.OpCode.create;
+    private ClientCnxn.Packet capturedPacket = null;
 
     @Test
     @Timeout(value = 120)
@@ -94,6 +105,105 @@ public class ClientRequestTimeoutTest extends QuorumPeerTestBase {
         }
     }
 
+    @Test
+    void testClientRequestTimeoutTime() throws Exception {
+        long requestTimeout = TimeUnit.SECONDS.toMillis(5);
+        System.setProperty("zookeeper.request.timeout", Long.toString(requestTimeout));
+
+        CustomZooKeeper zk = null;
+        int clientPort = PortAssignment.unique();
+        MainThread mainThread = new MainThread(0, clientPort, "", false);
+        mainThread.start();
+        try {
+            assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPort, CONNECTION_TIMEOUT),
+                    "waiting for server 0 being up");
+
+            CountdownWatcher watch = new CountdownWatcher();
+            zk = new CustomZooKeeper(getCxnString(new int[]{clientPort}), ClientBase.CONNECTION_TIMEOUT, watch);
+            watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+
+            dropPacket = true;
+            dropPacketType = ZooDefs.OpCode.create;
+
+            String data = "originalData";
+            long startTime = Time.currentElapsedTime();
+            try {
+                zk.create("/testClientRequestTimeout", data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
+                fail("KeeperException is expected.");
+            } catch (KeeperException exception) {
+                long cost = Time.currentElapsedTime() - startTime;
+                assertEquals(KeeperException.Code.REQUESTTIMEOUT, exception.code());
+                LOG.info("testClientRequestTimeoutTime cost:{}", cost);
+                assertThat(cost, greaterThanOrEqualTo(requestTimeout));
+                assertThat(cost, lessThan(requestTimeout + 500));
+            }
+        } finally {
+            mainThread.shutdown();
+            if (zk != null) {
+                zk.close();
+            }
+        }
+    }
+
+
+    @Test
+    void testClientRequestTimeoutTimeSimulatingSpuriousWakeup() throws Exception {
+        long requestTimeout = TimeUnit.SECONDS.toMillis(5);
+        System.setProperty("zookeeper.request.timeout", Long.toString(requestTimeout));
+
+        CustomZooKeeper zk = null;
+        int clientPort = PortAssignment.unique();
+        MainThread mainThread = new MainThread(0, clientPort, "", false);
+        mainThread.start();
+        try {
+            assertTrue(ClientBase.waitForServerUp("127.0.0.1:" + clientPort, CONNECTION_TIMEOUT),
+                    "waiting for server 0 being up");
+
+            CountdownWatcher watch = new CountdownWatcher();
+            zk = new CustomZooKeeper(getCxnString(new int[]{clientPort}), ClientBase.CONNECTION_TIMEOUT, watch);
+            watch.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+
+            dropPacket = true;
+            dropPacketType = ZooDefs.OpCode.create;
+            capturePacket = true;
+            capturePacketType = ZooDefs.OpCode.create;
+
+            // Simulating spurious wakeup
+            new Thread(() -> {
+                try {
+                    TimeUnit.MILLISECONDS.sleep(requestTimeout / 2);
+                    if (capturedPacket != null) {
+                        synchronized (capturedPacket) {
+                            capturedPacket.notifyAll();
+                        }
+                    }
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }).start();
+
+            String data = "originalData";
+            long startTime = Time.currentElapsedTime();
+            try {
+                zk.create("/testClientRequestTimeout", data.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);
+                fail("KeeperException is expected.");
+            } catch (KeeperException exception) {
+                long cost = Time.currentElapsedTime() - startTime;
+                assertEquals(KeeperException.Code.REQUESTTIMEOUT, exception.code());
+                LOG.info("testClientRequestTimeoutTimeSimulatingSpuriousWakeup cost:{}", cost);
+                assertThat(cost, greaterThanOrEqualTo(requestTimeout));
+                assertThat(cost, lessThan(requestTimeout + 500));
+            }
+        } finally {
+            capturePacket = false;
+            capturedPacket = null;
+            mainThread.shutdown();
+            if (zk != null) {
+                zk.close();
+            }
+        }
+    }
+
     /**
      * @return connection string in the form of
      *         127.0.0.1:port1,127.0.0.1:port2,127.0.0.1:port3
@@ -143,6 +253,27 @@ public class ClientRequestTimeoutTest extends QuorumPeerTestBase {
             super.finishPacket(p);
         }
 
+        @Override
+        public Packet queuePacket(
+                RequestHeader h,
+                ReplyHeader r,
+                Record request,
+                Record response,
+                AsyncCallback cb,
+                String clientPath,
+                String serverPath,
+                Object ctx,
+                ZooKeeper.WatchRegistration watchRegistration,
+                WatchDeregistration watchDeregistration) {
+            Packet packet = super.queuePacket(h, r, request, response, cb, clientPath, serverPath,
+                    ctx, watchRegistration, watchDeregistration);
+
+            if (capturePacket && h != null && h.getType() == capturePacketType) {
+                capturedPacket = packet;
+            }
+            return packet;
+        }
+
     }
 
     class CustomZooKeeper extends ZooKeeper {