浏览代码

ZOOKEEPER-3863: Do not track global sessions in ReadOnlyZooKeeperServer

Author: Jie Huang <jiehuang@fb.com>

Reviewers: Enrico Olivelli <eolivelli@apache.org>, Michael Han <hanm@apache.org>

Closes #1380 from jhuan31/ZOOKEEPER-3863
Jie Huang 4 年之前
父节点
当前提交
c47ef905e0

+ 2 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java

@@ -101,6 +101,7 @@ public abstract class ServerCnxn implements Stats, Watcher {
         CLOSE_CONNECTION_COMMAND("close_connection_command"),
         CLEAN_UP("clean_up"),
         CONNECTION_MODE_CHANGED("connection_mode_changed"),
+        RENEW_GLOBAL_SESSION_IN_RO_MODE("renew a global session in readonly mode"),
         // Below reasons are NettyServerCnxnFactory only
         CHANNEL_DISCONNECTED("channel disconnected"),
         CHANNEL_CLOSED_EXCEPTION("channel_closed_exception"),
@@ -298,7 +299,7 @@ public abstract class ServerCnxn implements Stats, Watcher {
 
     protected ZooKeeperSaslServer zooKeeperSaslServer = null;
 
-    protected static class CloseRequestException extends IOException {
+    public static class CloseRequestException extends IOException {
 
         private static final long serialVersionUID = -7854505709816442681L;
         private DisconnectReason reason;

+ 18 - 7
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java

@@ -1412,13 +1412,13 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
                 connReq.getTimeOut(),
                 cnxn.getRemoteSocketAddress());
         } else {
-            long clientSessionId = connReq.getSessionId();
-                LOG.debug(
-                    "Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
-                    Long.toHexString(clientSessionId),
-                    Long.toHexString(connReq.getLastZxidSeen()),
-                    connReq.getTimeOut(),
-                    cnxn.getRemoteSocketAddress());
+            validateSession(cnxn, sessionId);
+            LOG.debug(
+                "Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
+                Long.toHexString(sessionId),
+                Long.toHexString(connReq.getLastZxidSeen()),
+                connReq.getTimeOut(),
+                cnxn.getRemoteSocketAddress());
             if (serverCnxnFactory != null) {
                 serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
             }
@@ -1432,6 +1432,17 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         }
     }
 
+    /**
+     * Validate if a particular session can be reestablished.
+     *
+     * @param cnxn
+     * @param sessionId
+     */
+    protected void validateSession(ServerCnxn cnxn, long sessionId)
+            throws IOException {
+        // do nothing
+    }
+
     public boolean shouldThrottle(long outStandingCount) {
         int globalOutstandingLimit = getGlobalOutstandingLimit();
         if (globalOutstandingLimit < getInflight() || globalOutstandingLimit < getInProcess()) {

+ 19 - 9
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyRequestProcessor.java

@@ -86,16 +86,14 @@ public class ReadOnlyRequestProcessor extends ZooKeeperCriticalThread implements
                 case OpCode.setACL:
                 case OpCode.multi:
                 case OpCode.check:
-                    ReplyHeader hdr = new ReplyHeader(
-                        request.cxid,
-                        zks.getZKDatabase().getDataTreeLastProcessedZxid(),
-                        Code.NOTREADONLY.intValue());
-                    try {
-                        request.cnxn.sendResponse(hdr, null, null);
-                    } catch (IOException e) {
-                        LOG.error("IO exception while sending response", e);
-                    }
+                    sendErrorResponse(request);
                     continue;
+                case OpCode.closeSession:
+                case OpCode.createSession:
+                    if (!request.isLocalSession()) {
+                        sendErrorResponse(request);
+                        continue;
+                    }
                 }
 
                 // proceed to the next processor
@@ -109,6 +107,18 @@ public class ReadOnlyRequestProcessor extends ZooKeeperCriticalThread implements
         LOG.info("ReadOnlyRequestProcessor exited loop!");
     }
 
+    private void sendErrorResponse(Request request) {
+        ReplyHeader hdr = new ReplyHeader(
+                request.cxid,
+                zks.getZKDatabase().getDataTreeLastProcessedZxid(),
+                Code.NOTREADONLY.intValue());
+        try {
+            request.cnxn.sendResponse(hdr, null, null);
+        } catch (IOException e) {
+            LOG.error("IO exception while sending response", e);
+        }
+    }
+
     @Override
     public void processRequest(Request request) {
         if (!finished) {

+ 47 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServer.java

@@ -18,14 +18,18 @@
 
 package org.apache.zookeeper.server.quorum;
 
+import java.io.IOException;
 import java.io.PrintWriter;
 import java.util.Objects;
 import java.util.stream.Collectors;
+import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.server.DataTreeBean;
 import org.apache.zookeeper.server.FinalRequestProcessor;
 import org.apache.zookeeper.server.PrepRequestProcessor;
+import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
 import org.apache.zookeeper.server.ZooKeeperServerBean;
@@ -80,6 +84,49 @@ public class ReadOnlyZooKeeperServer extends ZooKeeperServer {
         LOG.info("Read-only server started");
     }
 
+    @Override
+    public void createSessionTracker() {
+        sessionTracker = new LearnerSessionTracker(
+                this, getZKDatabase().getSessionWithTimeOuts(),
+                this.tickTime, self.getId(), self.areLocalSessionsEnabled(),
+                getZooKeeperServerListener());
+    }
+
+    @Override
+    protected void startSessionTracker() {
+        ((LearnerSessionTracker) sessionTracker).start();
+    }
+
+    @Override
+    protected void setLocalSessionFlag(Request si) {
+        switch (si.type) {
+            case OpCode.createSession:
+                if (self.areLocalSessionsEnabled()) {
+                    si.setLocalSession(true);
+                }
+                break;
+            case OpCode.closeSession:
+                if (((UpgradeableSessionTracker) sessionTracker).isLocalSession(si.sessionId)) {
+                    si.setLocalSession(true);
+                } else {
+                    LOG.warn("Submitting global closeSession request for session 0x{} in ReadOnly mode",
+                            Long.toHexString(si.sessionId));
+                }
+                break;
+            default:
+                break;
+        }
+    }
+
+    @Override
+    protected void validateSession(ServerCnxn cnxn, long sessionId) throws IOException {
+        if (((LearnerSessionTracker) sessionTracker).isGlobalSession(sessionId)) {
+            String msg = "Refusing global session reconnection in RO mode " + cnxn.getRemoteSocketAddress();
+            LOG.info(msg);
+            throw new ServerCnxn.CloseRequestException(msg, ServerCnxn.DisconnectReason.RENEW_GLOBAL_SESSION_IN_RO_MODE);
+        }
+    }
+
     @Override
     protected void registerJMX() {
         // register with JMX

+ 56 - 1
zookeeper-server/src/test/java/org/apache/zookeeper/test/ReadOnlyModeTest.java

@@ -27,6 +27,7 @@ import static org.junit.jupiter.api.Assertions.fail;
 import java.io.ByteArrayOutputStream;
 import java.io.LineNumberReader;
 import java.io.StringReader;
+import java.util.concurrent.TimeoutException;
 import java.util.regex.Pattern;
 import org.apache.log4j.Layout;
 import org.apache.log4j.Level;
@@ -57,7 +58,6 @@ public class ReadOnlyModeTest extends ZKTestCase {
     @BeforeEach
     public void setUp() throws Exception {
         System.setProperty("readonlymode.enabled", "true");
-        qu.startQuorum();
     }
 
     @AfterEach
@@ -72,6 +72,9 @@ public class ReadOnlyModeTest extends ZKTestCase {
     @Test
     @Timeout(value = 90)
     public void testMultiTransaction() throws Exception {
+        qu.enableLocalSession(true);
+        qu.startQuorum();
+
         CountdownWatcher watcher = new CountdownWatcher();
         ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, true);
         watcher.waitForConnected(CONNECTION_TIMEOUT); // ensure zk got connected
@@ -80,9 +83,12 @@ public class ReadOnlyModeTest extends ZKTestCase {
         final String node1 = "/tnode1";
         final String node2 = "/tnode2";
         zk.create(node1, data.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+        zk.close();
+        watcher.waitForDisconnected(CONNECTION_TIMEOUT);
 
         watcher.reset();
         qu.shutdown(2);
+        zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, true);
         watcher.waitForConnected(CONNECTION_TIMEOUT);
         assertEquals(States.CONNECTEDREADONLY, zk.getState(), "Should be in r-o mode");
 
@@ -110,6 +116,9 @@ public class ReadOnlyModeTest extends ZKTestCase {
     @Test
     @Timeout(value = 90)
     public void testReadOnlyClient() throws Exception {
+        qu.enableLocalSession(true);
+        qu.startQuorum();
+
         CountdownWatcher watcher = new CountdownWatcher();
         ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, true);
         watcher.waitForConnected(CONNECTION_TIMEOUT); // ensure zk got connected
@@ -161,6 +170,9 @@ public class ReadOnlyModeTest extends ZKTestCase {
     @Test
     @Timeout(value = 90)
     public void testConnectionEvents() throws Exception {
+        qu.enableLocalSession(true);
+        qu.startQuorum();
+
         CountdownWatcher watcher = new CountdownWatcher();
         ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, true);
         boolean success = false;
@@ -202,6 +214,9 @@ public class ReadOnlyModeTest extends ZKTestCase {
     @Test
     @Timeout(value = 90)
     public void testSessionEstablishment() throws Exception {
+        qu.enableLocalSession(true);
+        qu.startQuorum();
+
         qu.shutdown(2);
 
         CountdownWatcher watcher = new CountdownWatcher();
@@ -230,6 +245,43 @@ public class ReadOnlyModeTest extends ZKTestCase {
         zk.close();
     }
 
+    @Test(timeout = 90000)
+    public void testGlobalSessionInRO() throws Exception {
+        qu.startQuorum();
+
+        CountdownWatcher watcher = new CountdownWatcher();
+        ZooKeeper zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, true);
+        watcher.waitForConnected(CONNECTION_TIMEOUT);
+        LOG.info("global session created 0x{}", Long.toHexString(zk.getSessionId()));
+
+        watcher.reset();
+        qu.shutdown(2);
+        try {
+            watcher.waitForConnected(CONNECTION_TIMEOUT);
+            fail("Should not be able to renew a global session");
+        } catch (TimeoutException e) {
+        }
+        zk.close();
+
+        watcher.reset();
+        zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, true);
+        try {
+            watcher.waitForConnected(CONNECTION_TIMEOUT);
+            fail("Should not be able to create a global session");
+        } catch (TimeoutException e) {
+        }
+        zk.close();
+
+        qu.getPeer(1).peer.enableLocalSessions(true);
+        zk = new ZooKeeper(qu.getConnString(), CONNECTION_TIMEOUT, watcher, true);
+        try {
+            watcher.waitForConnected(CONNECTION_TIMEOUT);
+        } catch (TimeoutException e) {
+            fail("Should be able to create a local session");
+        }
+        zk.close();
+    }
+
     /**
      * Ensures that client seeks for r/w servers while it's connected to r/o
      * server.
@@ -238,6 +290,9 @@ public class ReadOnlyModeTest extends ZKTestCase {
     @Test
     @Timeout(value = 90)
     public void testSeekForRwServer() throws Exception {
+        qu.enableLocalSession(true);
+        qu.startQuorum();
+
         // setup the logger to capture all logs
         Layout layout = Logger.getRootLogger().getAppender("CONSOLE").getLayout();
         ByteArrayOutputStream os = new ByteArrayOutputStream();