|
@@ -18,6 +18,9 @@
|
|
|
|
|
|
package org.apache.zookeeper.test;
|
|
|
|
|
|
+import static org.hamcrest.MatcherAssert.assertThat;
|
|
|
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
|
|
+import static org.hamcrest.Matchers.lessThan;
|
|
|
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
|
|
import static org.junit.jupiter.api.Assertions.assertNull;
|
|
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
|
@@ -31,12 +34,15 @@ import java.util.List;
|
|
|
import java.util.concurrent.CompletableFuture;
|
|
|
import java.util.concurrent.CountDownLatch;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.TimeoutException;
|
|
|
import org.apache.zookeeper.CreateMode;
|
|
|
import org.apache.zookeeper.KeeperException;
|
|
|
import org.apache.zookeeper.TestableZooKeeper;
|
|
|
import org.apache.zookeeper.WatchedEvent;
|
|
|
import org.apache.zookeeper.Watcher;
|
|
|
import org.apache.zookeeper.ZooDefs;
|
|
|
+import org.apache.zookeeper.ZooKeeper;
|
|
|
+import org.apache.zookeeper.common.Time;
|
|
|
import org.junit.jupiter.api.BeforeEach;
|
|
|
import org.junit.jupiter.api.Test;
|
|
|
import org.slf4j.Logger;
|
|
@@ -54,6 +60,21 @@ public class SessionTimeoutTest extends ClientBase {
|
|
|
zk = createClient();
|
|
|
}
|
|
|
|
|
|
+ private static class ExpiredWatcher implements Watcher {
|
|
|
+ public volatile CompletableFuture<Void> expired = new CompletableFuture<>();
|
|
|
+
|
|
|
+ synchronized void reset() {
|
|
|
+ expired = new CompletableFuture<>();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public synchronized void process(WatchedEvent event) {
|
|
|
+ if (event.getState() == Event.KeeperState.Expired) {
|
|
|
+ expired.complete(null);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
private static class BusyServer implements AutoCloseable {
|
|
|
private final ServerSocket server;
|
|
|
private final Socket client;
|
|
@@ -143,17 +164,24 @@ public class SessionTimeoutTest extends ClientBase {
|
|
|
// stop client also to gain less distraction
|
|
|
zk.close();
|
|
|
|
|
|
- // small connection timeout to gain quick ci feedback
|
|
|
- int sessionTimeout = 3000;
|
|
|
- CompletableFuture<Void> expired = new CompletableFuture<>();
|
|
|
+ // given: established session
|
|
|
+ int sessionTimeout = 3000; // small connection timeout to gain quick ci feedback
|
|
|
+ ExpiredWatcher watcher = new ExpiredWatcher();
|
|
|
zk = createClient(new CountdownWatcher(), hostPort, sessionTimeout);
|
|
|
- zk.register(event -> {
|
|
|
- if (event.getState() == Watcher.Event.KeeperState.Expired) {
|
|
|
- expired.complete(null);
|
|
|
- }
|
|
|
- });
|
|
|
+ zk.register(watcher);
|
|
|
+
|
|
|
+ // when: all server down
|
|
|
+ long start = Time.currentElapsedTime();
|
|
|
+ zk.sync("/"); // touch timeout counts
|
|
|
stopServer();
|
|
|
- expired.join();
|
|
|
+
|
|
|
+ // then: get Expired after session timeout
|
|
|
+ watcher.expired.join();
|
|
|
+ long elapsed = Time.currentElapsedTime() - start;
|
|
|
+ assertThat(elapsed, greaterThanOrEqualTo((long) zk.getSessionTimeout()));
|
|
|
+ assertThat(elapsed, lessThan(zk.getSessionTimeout() * 10L));
|
|
|
+
|
|
|
+ // then: future request will get SessionExpiredException
|
|
|
assertThrows(KeeperException.SessionExpiredException.class, () -> zk.exists("/", null));
|
|
|
}
|
|
|
|
|
@@ -162,18 +190,17 @@ public class SessionTimeoutTest extends ClientBase {
|
|
|
// stop client also to gain less distraction
|
|
|
zk.close();
|
|
|
|
|
|
+ // given: unavailable cluster
|
|
|
stopServer();
|
|
|
|
|
|
- // small connection timeout to gain quick ci feedback
|
|
|
- int sessionTimeout = 3000;
|
|
|
- CompletableFuture<Void> expired = new CompletableFuture<>();
|
|
|
- new TestableZooKeeper(hostPort, sessionTimeout, event -> {
|
|
|
- if (event.getState() == Watcher.Event.KeeperState.Expired) {
|
|
|
- expired.complete(null);
|
|
|
- }
|
|
|
- });
|
|
|
- expired.join();
|
|
|
- assertThrows(KeeperException.SessionExpiredException.class, () -> zk.exists("/", null));
|
|
|
+ // when: try to establish a brand-new session
|
|
|
+ int sessionTimeout = 300; // small connection timeout to gain quick ci feedback
|
|
|
+ ExpiredWatcher watcher = new ExpiredWatcher();
|
|
|
+ try (ZooKeeper zk = new ZooKeeper(hostPort, sessionTimeout, watcher)) {
|
|
|
+ // then: never Expired
|
|
|
+ assertThrows(TimeoutException.class, () -> watcher.expired.get(3 * sessionTimeout, TimeUnit.MILLISECONDS));
|
|
|
+ assertThrows(KeeperException.ConnectionLossException.class, () -> zk.exists("/", null));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Test
|