Browse Source

ZOOKEEPER-3970: Enable ZooKeeperServerController to expire session.

This is a follow up of ZOOKEEPER-3948. Here we enable ZooKeeperServerController to be able to expire a global or local session. This is very useful in our experience in integration testing when we want a controlled session expiration mechanism. This is done by having session tracker exposing both global and local session stats, so a zookeeper server can expire the sessions in the controller.

Author: Michael Han <hanm@apache.org>

Reviewers: Damien Diederen <ddiederen@apache.org>

Closes #1505 from hanm/ZOOKEEPER-3970
Michael Han 4 years ago
parent
commit
e41dc7dbb0

+ 10 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTracker.java

@@ -138,4 +138,14 @@ public interface SessionTracker {
     long getLocalSessionCount();
 
     boolean isLocalSessionsEnabled();
+
+    /**
+     * Get a set of global session IDs
+     */
+    Set<Long> globalSessions();
+
+    /**
+     * Get a set of local session IDs
+     */
+    Set<Long> localSessions();
 }

+ 10 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTrackerImpl.java

@@ -21,6 +21,7 @@ package org.apache.zookeeper.server;
 import java.io.PrintWriter;
 import java.io.StringWriter;
 import java.text.MessageFormat;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -49,7 +50,7 @@ public class SessionTrackerImpl extends ZooKeeperCriticalThread implements Sessi
 
     private final ExpiryQueue<SessionImpl> sessionExpiryQueue;
 
-    private final ConcurrentMap<Long, Integer> sessionsWithTimeout;
+    protected final ConcurrentMap<Long, Integer> sessionsWithTimeout;
     private final AtomicLong nextSessionId = new AtomicLong();
 
     public static class SessionImpl implements Session {
@@ -347,4 +348,12 @@ public class SessionTrackerImpl extends ZooKeeperCriticalThread implements Sessi
     public boolean isLocalSessionsEnabled() {
         return false;
     }
+
+    public Set<Long> globalSessions() {
+        return sessionsById.keySet();
+    }
+
+    public Set<Long> localSessions() {
+        return Collections.emptySet();
+    }
 }

+ 6 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java

@@ -632,6 +632,12 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         close(sessionId);
     }
 
+    public void expire(long sessionId) {
+        LOG.info("forcibly expiring session 0x{}", Long.toHexString(sessionId));
+
+        close(sessionId);
+    }
+
     public static class MissingSessionException extends IOException {
 
         private static final long serialVersionUID = 7467414635467261007L;

+ 21 - 3
zookeeper-server/src/main/java/org/apache/zookeeper/server/controller/ZooKeeperServerController.java

@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.apache.zookeeper.server.ExitCode;
 import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
 import org.apache.zookeeper.util.ServiceUtils;
@@ -122,13 +123,12 @@ public class ZooKeeperServerController {
                 }
                 break;
             case EXPIRESESSION:
-                // TODO: (hanm) implement once dependent feature is ready.
                 if (command.getParameter() == null) {
-                    // expireAllSessions();
+                    expireAllSessions();
                 } else {
                     // A single parameter should be a session id as long.
                     // Parse failure exceptions will be sent to the caller
-                    // expireSession(Long.decode(command.getParameter()));
+                    expireSession(Long.decode(command.getParameter()));
                 }
                 break;
             case REJECTCONNECTIONS:
@@ -164,5 +164,23 @@ public class ZooKeeperServerController {
         }
     }
 
+    private ZooKeeperServer getServer() {
+        return quorumPeer.getActiveServer();
+    }
+
+    private void expireSession(long sessionId) {
+        getServer().expire(sessionId);
+    }
+
+    private void expireAllSessions() {
+        for (Long sessionId : getServer().getSessionTracker().localSessions()) {
+            expireSession(sessionId);
+        }
+
+        for (Long sessionId : getServer().getSessionTracker().globalSessions()) {
+            expireSession(sessionId);
+        }
+    }
+
 }
 

+ 3 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LeaderSessionTracker.java

@@ -223,4 +223,7 @@ public class LeaderSessionTracker extends UpgradeableSessionTracker {
         return sessionExpiryMap;
     }
 
+    public Set<Long> globalSessions() {
+        return globalSessionTracker.globalSessions();
+    }
 }

+ 3 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerSessionTracker.java

@@ -225,4 +225,7 @@ public class LearnerSessionTracker extends UpgradeableSessionTracker {
         return new HashMap<Long, Set<Long>>();
     }
 
+    public Set<Long> globalSessions() {
+        return globalSessionsWithTimeouts.keySet();
+    }
 }

+ 4 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LocalSessionTracker.java

@@ -18,6 +18,7 @@
 
 package org.apache.zookeeper.server.quorum;
 
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.zookeeper.server.SessionTrackerImpl;
 import org.apache.zookeeper.server.ZooKeeperServerListener;
@@ -26,7 +27,6 @@ import org.apache.zookeeper.server.ZooKeeperServerListener;
  * Local session tracker.
  */
 public class LocalSessionTracker extends SessionTrackerImpl {
-
     public LocalSessionTracker(SessionExpirer expirer, ConcurrentMap<Long, Integer> sessionsWithTimeouts, int tickTime, long id, ZooKeeperServerListener listener) {
         super(expirer, sessionsWithTimeouts, tickTime, id, listener);
     }
@@ -45,4 +45,7 @@ public class LocalSessionTracker extends SessionTrackerImpl {
         return sessionId;
     }
 
+    public Set<Long> localSessions() {
+        return sessionsWithTimeout.keySet();
+    }
 }

+ 8 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UpgradeableSessionTracker.java

@@ -18,6 +18,8 @@
 
 package org.apache.zookeeper.server.quorum;
 
+import java.util.Collections;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import org.apache.zookeeper.KeeperException;
@@ -111,7 +113,8 @@ public abstract class UpgradeableSessionTracker implements SessionTracker {
         localSessionTracker.removeSession(sessionId);
     }
 
-    public void checkGlobalSession(long sessionId, Object owner) throws KeeperException.SessionExpiredException, KeeperException.SessionMovedException {
+    public void checkGlobalSession(long sessionId, Object owner)
+        throws KeeperException.SessionExpiredException, KeeperException.SessionMovedException {
         throw new UnsupportedOperationException();
     }
 
@@ -122,4 +125,8 @@ public abstract class UpgradeableSessionTracker implements SessionTracker {
         return localSessionsWithTimeouts.size();
     }
 
+    public Set<Long> localSessions() {
+        return (localSessionTracker == null) ? Collections.<Long>emptySet()
+            : localSessionTracker.localSessions();
+    }
 }

+ 9 - 1
zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorTest.java

@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.io.PrintWriter;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -394,6 +395,13 @@ public class PrepRequestProcessorTest extends ClientBase {
         public boolean isLocalSessionsEnabled() {
             return false;
         }
-    }
 
+        public Set<Long> globalSessions() {
+            return Collections.emptySet();
+        }
+
+        public Set<Long> localSessions() {
+            return Collections.emptySet();
+        }
+    }
 }

+ 1 - 5
zookeeper-server/src/test/java/org/apache/zookeeper/server/controller/ControllerClientServerTest.java

@@ -19,7 +19,6 @@
 package org.apache.zookeeper.server.controller;
 
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 
 public class ControllerClientServerTest extends ControllerTestBase {
@@ -41,10 +40,7 @@ public class ControllerClientServerTest extends ControllerTestBase {
         Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.CLOSECONNECTION));
     }
 
-    // TODO (hanm): this depends on the expiration session feature which
-    // is not part of this patch. This test will be enabled once that
-    // feature is upstreamed.
-    @Ignore
+    @Test
     public void verifyExpireSessionCommand() {
         // Valid long session ids should be accepted.
         Assert.assertTrue(commandClient.trySendCommand(ControlCommand.Action.EXPIRESESSION, "0x1234"));

+ 2 - 2
zookeeper-server/src/test/java/org/apache/zookeeper/server/controller/ZooKeeperServerControllerEndToEndTest.java

@@ -84,7 +84,7 @@ public class ZooKeeperServerControllerEndToEndTest extends ControllerTestBase {
         watcher.waitForEvent();
     }
 
-    @Ignore
+    @Test
     public void verifySessionExpiration() throws Exception {
         // Setup: First connect to the server and wait for connected.
         BlockingStateWatcher watcher = new BlockingStateWatcher(Watcher.Event.KeeperState.SyncConnected);
@@ -104,7 +104,7 @@ public class ZooKeeperServerControllerEndToEndTest extends ControllerTestBase {
         watcher.waitForEvent();
     }
 
-    @Ignore
+    @Test
     public void verifyGlobalSessionExpiration() throws Exception {
         // Step 1: Connect.
         BlockingStateWatcher stateWatcher = new BlockingStateWatcher(Watcher.Event.KeeperState.SyncConnected);