Преглед изворни кода

ZOOKEEPER-4492: Merge readOnly field into ConnectRequest and Response

According to [this comment in ZOOKEEPER-102](https://issues.apache.org/jira/browse/ZOOKEEPER-102?focusedCommentId=16977000&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16977000) I introduce a `Protocol` abstraction and going to moving all wire protocol concept into `cnxn` and this scope, so that client and server's business logics handle only deserialized/real record.

cc eolivelli maoling Randgalt

This supersedes #1832.

Author: tison <wander4096@gmail.com>

Reviewers: Enrico Olivelli <eolivelli@apache.org>, Mate Szalay-Beko <symat@apache.org>

Closes #1837 from tisonkun/protocol
tison пре 2 година
родитељ
комит
de7c5869d3

+ 2 - 0
zookeeper-jute/src/main/resources/zookeeper.jute

@@ -65,12 +65,14 @@ module org.apache.zookeeper.proto {
         int timeOut;
         long sessionId;
         buffer passwd;
+        boolean readOnly;
     }
     class ConnectResponse {
         int protocolVersion;
         int timeOut;
         long sessionId;
         buffer passwd;
+        boolean readOnly;
     }
     class SetWatches {
         long relativeZxid;

+ 5 - 26
zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java

@@ -285,8 +285,6 @@ public class ClientCnxn {
 
         WatchRegistration watchRegistration;
 
-        public boolean readOnly;
-
         WatchDeregistration watchDeregistration;
 
         /** Convenience ctor */
@@ -295,23 +293,12 @@ public class ClientCnxn {
             ReplyHeader replyHeader,
             Record request,
             Record response,
-            WatchRegistration watchRegistration) {
-            this(requestHeader, replyHeader, request, response, watchRegistration, false);
-        }
-
-        Packet(
-            RequestHeader requestHeader,
-            ReplyHeader replyHeader,
-            Record request,
-            Record response,
-            WatchRegistration watchRegistration,
-            boolean readOnly) {
-
+            WatchRegistration watchRegistration
+        ) {
             this.requestHeader = requestHeader;
             this.replyHeader = replyHeader;
             this.request = request;
             this.response = response;
-            this.readOnly = readOnly;
             this.watchRegistration = watchRegistration;
         }
 
@@ -325,8 +312,6 @@ public class ClientCnxn {
                 }
                 if (request instanceof ConnectRequest) {
                     request.serialize(boa, "connect");
-                    // append "am-I-allowed-to-be-readonly" flag
-                    boa.writeBool(readOnly, "readOnly");
                 } else if (request != null) {
                     request.serialize(boa, "request");
                 }
@@ -1008,7 +993,7 @@ public class ClientCnxn {
                 clientCnxnSocket.getRemoteSocketAddress());
             isFirstConnect = false;
             long sessId = (seenRwServerBefore) ? sessionId : 0;
-            ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);
+            ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd, readOnly);
             // We add backwards since we are pushing into the front
             // Only send if there's a pending watch
             if (!clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)) {
@@ -1088,7 +1073,7 @@ public class ClientCnxn {
                         null,
                         null));
             }
-            outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));
+            outgoingQueue.addFirst(new Packet(null, null, conReq, null, null));
             clientCnxnSocket.connectionPrimed();
             LOG.debug("Session establishment request sent on {}", clientCnxnSocket.getRemoteSocketAddress());
         }
@@ -1406,12 +1391,6 @@ public class ClientCnxn {
         /**
          * Callback invoked by the ClientCnxnSocket once a connection has been
          * established.
-         *
-         * @param _negotiatedSessionTimeout
-         * @param _sessionId
-         * @param _sessionPasswd
-         * @param isRO
-         * @throws IOException
          */
         void onConnected(
             int _negotiatedSessionTimeout,
@@ -1629,7 +1608,7 @@ public class ClientCnxn {
         ReplyHeader r = new ReplyHeader();
         r.setXid(xid);
 
-        Packet p = new Packet(h, r, request, response, null, false);
+        Packet p = new Packet(h, r, request, response, null);
         p.cb = cb;
         sendThread.sendPacket(p);
     }

+ 7 - 13
zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java

@@ -31,6 +31,7 @@ import org.apache.zookeeper.ClientCnxn.Packet;
 import org.apache.zookeeper.client.ZKClientConfig;
 import org.apache.zookeeper.common.Time;
 import org.apache.zookeeper.common.ZKConfig;
+import org.apache.zookeeper.compat.ProtocolManager;
 import org.apache.zookeeper.proto.ConnectResponse;
 import org.apache.zookeeper.server.ByteBufferInputStream;
 import org.slf4j.Logger;
@@ -48,6 +49,8 @@ abstract class ClientCnxnSocket {
 
     private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocket.class);
 
+    private final ProtocolManager protocolManager = new ProtocolManager();
+
     protected boolean initialized;
 
     /**
@@ -131,27 +134,18 @@ abstract class ClientCnxnSocket {
             }
             buf.append("]");
             if (LOG.isTraceEnabled()) {
-                LOG.trace("readConnectResult {} {}", incomingBuffer.remaining(), buf.toString());
+                LOG.trace("readConnectResult {} {}", incomingBuffer.remaining(), buf);
             }
         }
 
         ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
         BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
-        ConnectResponse conRsp = new ConnectResponse();
-        conRsp.deserialize(bbia, "connect");
-
-        // read "is read-only" flag
-        boolean isRO = false;
-        try {
-            isRO = bbia.readBool("readOnly");
-        } catch (IOException e) {
-            // this is ok -- just a packet from an old server which
-            // doesn't contain readOnly field
+        ConnectResponse conRsp = protocolManager.deserializeConnectResponse(bbia);
+        if (protocolManager.isReadonlyAvailable()) {
             LOG.warn("Connected to an old server; r-o mode will be unavailable");
         }
-
         this.sessionId = conRsp.getSessionId();
-        sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
+        sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), conRsp.getReadOnly());
     }
 
     abstract boolean isConnected();

+ 121 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/compat/ProtocolManager.java

@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.compat;
+
+import java.io.IOException;
+import org.apache.jute.InputArchive;
+import org.apache.zookeeper.proto.ConnectRequest;
+import org.apache.zookeeper.proto.ConnectResponse;
+
+/**
+ * A manager for switching behaviours between difference wire protocol.
+ * <p>
+ * Basically, wire protocol should be backward and forward compatible between minor versions.
+ * However, there are several cases that it's different due to Jute's limitations.
+ */
+public final class ProtocolManager {
+    private volatile Boolean isReadonlyAvailable = null;
+
+    public boolean isReadonlyAvailable() {
+        return isReadonlyAvailable != null && isReadonlyAvailable;
+    }
+
+    /**
+     * Deserializing {@link ConnectRequest} should be specially handled for request from client
+     * version before and including ZooKeeper 3.3 which doesn't understand readOnly field.
+     */
+    public ConnectRequest deserializeConnectRequest(InputArchive inputArchive) throws IOException {
+        if (isReadonlyAvailable != null) {
+            if (isReadonlyAvailable) {
+                return deserializeConnectRequestWithReadonly(inputArchive);
+            } else {
+                return deserializeConnectRequestWithoutReadonly(inputArchive);
+            }
+        }
+
+        final ConnectRequest request = deserializeConnectRequestWithoutReadonly(inputArchive);
+        try {
+            request.setReadOnly(inputArchive.readBool("readOnly"));
+            this.isReadonlyAvailable = true;
+        } catch (Exception e) {
+            request.setReadOnly(false); // old version doesn't have readonly concept
+            this.isReadonlyAvailable = false;
+        }
+        return request;
+    }
+
+    private ConnectRequest deserializeConnectRequestWithReadonly(InputArchive inputArchive) throws IOException {
+        final ConnectRequest request = new ConnectRequest();
+        request.deserialize(inputArchive, "connect");
+        return request;
+    }
+
+    private ConnectRequest deserializeConnectRequestWithoutReadonly(InputArchive inputArchive) throws IOException {
+        final ConnectRequest request = new ConnectRequest();
+        inputArchive.startRecord("connect");
+        request.setProtocolVersion(inputArchive.readInt("protocolVersion"));
+        request.setLastZxidSeen(inputArchive.readLong("lastZxidSeen"));
+        request.setTimeOut(inputArchive.readInt("timeOut"));
+        request.setSessionId(inputArchive.readLong("sessionId"));
+        request.setPasswd(inputArchive.readBuffer("passwd"));
+        inputArchive.endRecord("connect");
+        return request;
+    }
+
+    /**
+     * Deserializing {@link ConnectResponse} should be specially handled for response from server
+     * version before and including ZooKeeper 3.3 which doesn't understand readOnly field.
+     */
+    public ConnectResponse deserializeConnectResponse(InputArchive inputArchive) throws IOException {
+        if (isReadonlyAvailable != null) {
+            if (isReadonlyAvailable) {
+                return deserializeConnectResponseWithReadonly(inputArchive);
+            } else {
+                return deserializeConnectResponseWithoutReadonly(inputArchive);
+            }
+        }
+
+        final ConnectResponse response = deserializeConnectResponseWithoutReadonly(inputArchive);
+        try {
+            response.setReadOnly(inputArchive.readBool("readOnly"));
+            this.isReadonlyAvailable = true;
+        } catch (Exception e) {
+            response.setReadOnly(false); // old version doesn't have readonly concept
+            this.isReadonlyAvailable = false;
+        }
+        return response;
+    }
+
+    private ConnectResponse deserializeConnectResponseWithReadonly(InputArchive inputArchive) throws IOException {
+        final ConnectResponse response = new ConnectResponse();
+        response.deserialize(inputArchive, "connect");
+        return response;
+    }
+
+    private ConnectResponse deserializeConnectResponseWithoutReadonly(InputArchive inputArchive) throws IOException {
+        final ConnectResponse response = new ConnectResponse();
+        inputArchive.startRecord("connect");
+        response.setProtocolVersion(inputArchive.readInt("protocolVersion"));
+        response.setTimeOut(inputArchive.readInt("timeOut"));
+        response.setSessionId(inputArchive.readLong("sessionId"));
+        response.setPasswd(inputArchive.readBuffer("passwd"));
+        inputArchive.endRecord("connect");
+        return response;
+    }
+}

+ 4 - 10
zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerCnxn.java

@@ -41,6 +41,7 @@ import org.apache.zookeeper.Quotas;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.ZooDefs.OpCode;
+import org.apache.zookeeper.compat.ProtocolManager;
 import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.metrics.Counter;
@@ -60,16 +61,9 @@ public abstract class ServerCnxn implements Stats, Watcher {
     public static final Object me = new Object();
     private static final Logger LOG = LoggerFactory.getLogger(ServerCnxn.class);
 
-    private Set<Id> authInfo = Collections.newSetFromMap(new ConcurrentHashMap<Id, Boolean>());
-
-    /**
-     * If the client is of old version, we don't send r-o mode info to it.
-     * The reason is that if we would, old C client doesn't read it, which
-     * results in TCP RST packet, i.e. "connection reset by peer".
-     */
-    boolean isOldClient = true;
-
-    AtomicLong outstandingCount = new AtomicLong();
+    public final ProtocolManager protocolManager = new ProtocolManager();
+    private final Set<Id> authInfo = Collections.newSetFromMap(new ConcurrentHashMap<>());
+    private final AtomicLong outstandingCount = new AtomicLong();
 
     /** The ZooKeeperServer for this connection. May be null if the server
      * is not currently serving requests (for example if the server is not

+ 6 - 15
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java

@@ -1073,14 +1073,12 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
                 valid ? cnxn.getSessionTimeout() : 0,
                 valid ? cnxn.getSessionId() : 0, // send 0 if session is no
                 // longer valid
-                valid ? generatePasswd(cnxn.getSessionId()) : new byte[16]);
+                valid ? generatePasswd(cnxn.getSessionId()) : new byte[16],
+                this instanceof ReadOnlyZooKeeperServer);
             ByteArrayOutputStream baos = new ByteArrayOutputStream();
             BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
             bos.writeInt(-1, "len");
             rsp.serialize(bos, "connect");
-            if (!cnxn.isOldClient) {
-                bos.writeBool(this instanceof ReadOnlyZooKeeperServer, "readOnly");
-            }
             baos.close();
             ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
             bb.putInt(bb.remaining() - 4).rewind();
@@ -1381,8 +1379,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         throws IOException, ClientCnxnLimitException {
 
         BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
-        ConnectRequest connReq = new ConnectRequest();
-        connReq.deserialize(bia, "connect");
+        ConnectRequest connReq = cnxn.protocolManager.deserializeConnectRequest(bia);
         LOG.debug(
             "Session establishment request from client {} client's lastZxid is 0x{}",
             cnxn.getRemoteSocketAddress(),
@@ -1406,21 +1403,15 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
             throw new ClientCnxnLimitException();
         }
         ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit());
-
         ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1);
 
-        boolean readOnly = false;
-        try {
-            readOnly = bia.readBool("readOnly");
-            cnxn.isOldClient = false;
-        } catch (IOException e) {
-            // this is ok -- just a packet from an old client which
-            // doesn't contain readOnly field
+        if (cnxn.protocolManager.isReadonlyAvailable()) {
             LOG.warn(
                 "Connection request from old client {}; will be dropped if server is in r-o mode",
                 cnxn.getRemoteSocketAddress());
         }
-        if (!readOnly && this instanceof ReadOnlyZooKeeperServer) {
+
+        if (!connReq.getReadOnly() && this instanceof ReadOnlyZooKeeperServer) {
             String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress();
             LOG.info(msg);
             throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);

+ 0 - 10
zookeeper-server/src/test/java/org/apache/zookeeper/MockPacket.java

@@ -35,16 +35,6 @@ public class MockPacket extends ClientCnxn.Packet {
         super(requestHeader, replyHeader, request, response, watchRegistration);
     }
 
-    public MockPacket(
-        RequestHeader requestHeader,
-        ReplyHeader replyHeader,
-        Record request,
-        Record response,
-        WatchRegistration watchRegistration,
-        boolean readOnly) {
-        super(requestHeader, replyHeader, request, response, watchRegistration, readOnly);
-    }
-
     public ByteBuffer createAndReturnBB() {
         createBB();
         return this.bb;

+ 0 - 1
zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerCreationTest.java

@@ -48,7 +48,6 @@ public class ZooKeeperServerCreationTest {
         zks.setZKDatabase(new ZKDatabase(fileTxnSnapLog));
         zks.createSessionTracker();
 
-        ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
         ServerCnxn cnxn = new MockServerCnxn();
 
         ConnectRequest connReq = new ConnectRequest();

+ 3 - 4
zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerTest.java

@@ -162,10 +162,9 @@ public class ZooKeeperServerTest extends ZKTestCase {
         output.put((byte) 1);
         output.flip();
 
-        ServerCnxn.CloseRequestException e = assertThrows(ServerCnxn.CloseRequestException.class, () -> {
-            final NIOServerCnxn nioServerCnxn = mock(NIOServerCnxn.class);
-            zooKeeperServer.processConnectRequest(nioServerCnxn, output);
-        });
+        ServerCnxn.CloseRequestException e = assertThrows(
+                ServerCnxn.CloseRequestException.class,
+                () -> zooKeeperServer.processConnectRequest(new MockServerCnxn(), output));
         assertEquals(e.getReason(), ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
     }
 

+ 2 - 3
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServerTest.java

@@ -22,7 +22,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.mock;
 import java.nio.ByteBuffer;
-import org.apache.zookeeper.server.NIOServerCnxn;
+import org.apache.zookeeper.server.MockServerCnxn;
 import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.ZooKeeperServer;
@@ -55,8 +55,7 @@ public class ReadOnlyZooKeeperServerTest {
         output.flip();
 
         ServerCnxn.CloseRequestException e = assertThrows(ServerCnxn.CloseRequestException.class, () -> {
-            final NIOServerCnxn nioServerCnxn = mock(NIOServerCnxn.class);
-            readOnlyZooKeeperServer.processConnectRequest(nioServerCnxn, output);
+            readOnlyZooKeeperServer.processConnectRequest(new MockServerCnxn(), output);
         });
         assertEquals(e.getReason(), ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
     }

+ 2 - 2
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/WatchLeakTest.java

@@ -255,8 +255,8 @@ public class WatchLeakTest {
         Random r = new Random(SESSION_ID ^ superSecret);
         byte[] p = new byte[16];
         r.nextBytes(p);
-        ConnectRequest conReq = new ConnectRequest(0, 1L, 30000, SESSION_ID, p);
-        MockPacket packet = new MockPacket(null, null, conReq, null, null, false);
+        ConnectRequest conReq = new ConnectRequest(0, 1L, 30000, SESSION_ID, p, false);
+        MockPacket packet = new MockPacket(null, null, conReq, null, null);
         return packet.createAndReturnBB();
     }
 

+ 2 - 12
zookeeper-server/src/test/java/org/apache/zookeeper/test/MaxCnxnsTest.java

@@ -51,18 +51,16 @@ public class MaxCnxnsTest extends ClientBase {
         }
 
         public void run() {
-            SocketChannel sChannel = null;
-            try {
+            try (SocketChannel sChannel = SocketChannel.open()) {
                 /*
                  * For future unwary socket programmers: although connect 'blocks' it
                  * does not require an accept on the server side to return. Therefore
                  * you can not assume that all the sockets are connected at the end of
                  * this for loop.
                  */
-                sChannel = SocketChannel.open();
                 sChannel.connect(new InetSocketAddress(host, port));
                 // Construct a connection request
-                ConnectRequest conReq = new ConnectRequest(0, 0, 10000, 0, "password".getBytes());
+                ConnectRequest conReq = new ConnectRequest(0, 0, 10000, 0, "password".getBytes(), false);
                 ByteArrayOutputStream baos = new ByteArrayOutputStream();
                 BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                 boa.writeInt(-1, "len");
@@ -95,14 +93,6 @@ public class MaxCnxnsTest extends ClientBase {
                 }
             } catch (IOException io) {
                 // "Connection reset by peer"
-            } finally {
-                if (sChannel != null) {
-                    try {
-                        sChannel.close();
-                    } catch (Exception e) {
-                        // Do nothing
-                    }
-                }
             }
         }
 

+ 1 - 1
zookeeper-server/src/test/java/org/apache/zookeeper/test/SessionInvalidationTest.java

@@ -53,7 +53,7 @@ public class SessionInvalidationTest extends ClientBase {
 
             // open a connection
             boa.writeInt(44, "len");
-            ConnectRequest conReq = new ConnectRequest(0, 0, 30000, 0, new byte[16]);
+            ConnectRequest conReq = new ConnectRequest(0, 0, 30000, 0, new byte[16], false);
             conReq.serialize(boa, "connect");
 
             // close connection