Browse Source

ZOOKEEPER-3570: make the special client xid constant

- more details in the [ZOOKEEPER-3570](https://issues.apache.org/jira/browse/ZOOKEEPER-3570)

Author: maoling <maoling199210191@sina.com>

Reviewers: Fangmin Lyu <fangmin@apache.org>, Brian Nixon <enixon@apache.org>

Closes #1136 from maoling/ZOOKEEPER-3570
maoling 5 years ago
parent
commit
ae68c7d50e

+ 17 - 10
zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxn.java

@@ -116,6 +116,16 @@ public class ClientCnxn {
      */
      */
     private static final int SET_WATCHES_MAX_LENGTH = 128 * 1024;
     private static final int SET_WATCHES_MAX_LENGTH = 128 * 1024;
 
 
+    /* predefined xid's values recognized as special by the server */
+    // -1 means notification(WATCHER_EVENT)
+    public static final int NOTIFICATION_XID = -1;
+    // -2 is the xid for pings
+    public static final int PING_XID = -2;
+    // -4 is the xid for AuthPacket
+    public static final int AUTHPACKET_XID = -4;
+    // -8 is the xid for setWatch
+    public static final int SET_WATCHES_XID = -8;
+
     static class AuthData {
     static class AuthData {
 
 
         AuthData(String scheme, byte[] data) {
         AuthData(String scheme, byte[] data) {
@@ -857,16 +867,14 @@ public class ClientCnxn {
             ReplyHeader replyHdr = new ReplyHeader();
             ReplyHeader replyHdr = new ReplyHeader();
 
 
             replyHdr.deserialize(bbia, "header");
             replyHdr.deserialize(bbia, "header");
-            if (replyHdr.getXid() == -2) {
-                // -2 is the xid for pings
+            if (replyHdr.getXid() == PING_XID) {
                 LOG.debug(
                 LOG.debug(
                     "Got ping response for session id: 0x{} after {}ms.",
                     "Got ping response for session id: 0x{} after {}ms.",
                     Long.toHexString(sessionId),
                     Long.toHexString(sessionId),
                     ((System.nanoTime() - lastPingSentNs) / 1000000));
                     ((System.nanoTime() - lastPingSentNs) / 1000000));
                 return;
                 return;
             }
             }
-            if (replyHdr.getXid() == -4) {
-                // -4 is the xid for AuthPacket
+            if (replyHdr.getXid() == AUTHPACKET_XID) {
                 if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
                 if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
                     state = States.AUTH_FAILED;
                     state = States.AUTH_FAILED;
                     eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null));
                     eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null));
@@ -875,8 +883,7 @@ public class ClientCnxn {
                 LOG.debug("Got auth session id: 0x{}", Long.toHexString(sessionId));
                 LOG.debug("Got auth session id: 0x{}", Long.toHexString(sessionId));
                 return;
                 return;
             }
             }
-            if (replyHdr.getXid() == -1) {
-                // -1 means notification
+            if (replyHdr.getXid() == NOTIFICATION_XID) {
                 LOG.debug("Got notification session id: 0x{}", Long.toHexString(sessionId));
                 LOG.debug("Got notification session id: 0x{}", Long.toHexString(sessionId));
                 WatcherEvent event = new WatcherEvent();
                 WatcherEvent event = new WatcherEvent();
                 event.deserialize(bbia, "response");
                 event.deserialize(bbia, "response");
@@ -1048,7 +1055,7 @@ public class ClientCnxn {
                                     childWatchesBatch, persistentWatchesBatch, persistentRecursiveWatchesBatch);
                                     childWatchesBatch, persistentWatchesBatch, persistentRecursiveWatchesBatch);
                             opcode = OpCode.setWatches2;
                             opcode = OpCode.setWatches2;
                         }
                         }
-                        RequestHeader header = new RequestHeader(-8, opcode);
+                        RequestHeader header = new RequestHeader(ClientCnxn.SET_WATCHES_XID, opcode);
                         Packet packet = new Packet(header, new ReplyHeader(), record, null, null);
                         Packet packet = new Packet(header, new ReplyHeader(), record, null, null);
                         outgoingQueue.addFirst(packet);
                         outgoingQueue.addFirst(packet);
                     }
                     }
@@ -1058,7 +1065,7 @@ public class ClientCnxn {
             for (AuthData id : authInfo) {
             for (AuthData id : authInfo) {
                 outgoingQueue.addFirst(
                 outgoingQueue.addFirst(
                     new Packet(
                     new Packet(
-                        new RequestHeader(-4, OpCode.auth),
+                        new RequestHeader(ClientCnxn.AUTHPACKET_XID, OpCode.auth),
                         null,
                         null,
                         new AuthPacket(0, id.scheme, id.data),
                         new AuthPacket(0, id.scheme, id.data),
                         null,
                         null,
@@ -1088,7 +1095,7 @@ public class ClientCnxn {
 
 
         private void sendPing() {
         private void sendPing() {
             lastPingSentNs = System.nanoTime();
             lastPingSentNs = System.nanoTime();
-            RequestHeader h = new RequestHeader(-2, OpCode.ping);
+            RequestHeader h = new RequestHeader(ClientCnxn.PING_XID, OpCode.ping);
             queuePacket(h, null, null, null, null, null, null, null, null);
             queuePacket(h, null, null, null, null, null, null, null, null);
         }
         }
 
 
@@ -1657,7 +1664,7 @@ public class ClientCnxn {
         }
         }
         authInfo.add(new AuthData(scheme, auth));
         authInfo.add(new AuthData(scheme, auth));
         queuePacket(
         queuePacket(
-            new RequestHeader(-4, OpCode.auth),
+            new RequestHeader(ClientCnxn.AUTHPACKET_XID, OpCode.auth),
             null,
             null,
             new AuthPacket(0, scheme, auth),
             new AuthPacket(0, scheme, auth),
             null,
             null,

+ 2 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java

@@ -27,6 +27,7 @@ import java.util.Locale;
 import java.util.Set;
 import java.util.Set;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.jute.Record;
 import org.apache.jute.Record;
+import org.apache.zookeeper.ClientCnxn;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.KeeperException.Code;
 import org.apache.zookeeper.KeeperException.SessionMovedException;
 import org.apache.zookeeper.KeeperException.SessionMovedException;
@@ -200,7 +201,7 @@ public class FinalRequestProcessor implements RequestProcessor {
                 lastOp = "PING";
                 lastOp = "PING";
                 updateStats(request, lastOp, lastZxid);
                 updateStats(request, lastOp, lastZxid);
 
 
-                cnxn.sendResponse(new ReplyHeader(-2, lastZxid, 0), null, "response");
+                cnxn.sendResponse(new ReplyHeader(ClientCnxn.PING_XID, lastZxid, 0), null, "response");
                 return;
                 return;
             }
             }
             case OpCode.createSession: {
             case OpCode.createSession: {

+ 2 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java

@@ -34,6 +34,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.Record;
 import org.apache.jute.Record;
+import org.apache.zookeeper.ClientCnxn;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.data.Id;
@@ -681,7 +682,7 @@ public class NIOServerCnxn extends ServerCnxn {
      */
      */
     @Override
     @Override
     public void process(WatchedEvent event) {
     public void process(WatchedEvent event) {
-        ReplyHeader h = new ReplyHeader(-1, -1L, 0);
+        ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0);
         if (LOG.isTraceEnabled()) {
         if (LOG.isTraceEnabled()) {
             ZooTrace.logTraceMessage(
             ZooTrace.logTraceMessage(
                 LOG,
                 LOG,

+ 2 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java

@@ -40,6 +40,7 @@ import java.util.Arrays;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.Record;
 import org.apache.jute.Record;
+import org.apache.zookeeper.ClientCnxn;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.data.Stat;
 import org.apache.zookeeper.data.Stat;
@@ -147,7 +148,7 @@ public class NettyServerCnxn extends ServerCnxn {
 
 
     @Override
     @Override
     public void process(WatchedEvent event) {
     public void process(WatchedEvent event) {
-        ReplyHeader h = new ReplyHeader(-1, -1L, 0);
+        ReplyHeader h = new ReplyHeader(ClientCnxn.NOTIFICATION_XID, -1L, 0);
         if (LOG.isTraceEnabled()) {
         if (LOG.isTraceEnabled()) {
             ZooTrace.logTraceMessage(
             ZooTrace.logTraceMessage(
                 LOG,
                 LOG,

+ 2 - 1
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/WatchLeakTest.java

@@ -43,6 +43,7 @@ import java.util.List;
 import java.util.Random;
 import java.util.Random;
 import org.apache.jute.InputArchive;
 import org.apache.jute.InputArchive;
 import org.apache.jute.OutputArchive;
 import org.apache.jute.OutputArchive;
+import org.apache.zookeeper.ClientCnxn;
 import org.apache.zookeeper.MockPacket;
 import org.apache.zookeeper.MockPacket;
 import org.apache.zookeeper.ZKParameterized;
 import org.apache.zookeeper.ZKParameterized;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooDefs;
@@ -251,7 +252,7 @@ public class WatchLeakTest {
         SetWatches sw = new SetWatches(1L, dataWatches, existWatches, childWatches);
         SetWatches sw = new SetWatches(1L, dataWatches, existWatches, childWatches);
         RequestHeader h = new RequestHeader();
         RequestHeader h = new RequestHeader();
         h.setType(ZooDefs.OpCode.setWatches);
         h.setType(ZooDefs.OpCode.setWatches);
-        h.setXid(-8);
+        h.setXid(ClientCnxn.SET_WATCHES_XID);
         MockPacket p = new MockPacket(h, new ReplyHeader(), sw, null, null);
         MockPacket p = new MockPacket(h, new ReplyHeader(), sw, null, null);
         return p.createAndReturnBB();
         return p.createAndReturnBB();
     }
     }