소스 검색

ZOOKEEPER-4573: Encapsulate request bytebuffer in Request

This patch is based on #1903.

This closes #1903.

Author: tison <wander4096@gmail.com>

Reviewers: Andor Molnar <andor@apache.org>, Mate Szalay-Beko <symat@apache.org>

Closes #1904 from tisonkun/encapsulate-request-bytebuffer
tison 2 년 전
부모
커밋
a7e4dea7ab
18개의 변경된 파일156개의 추가작업 그리고 166개의 파일을 삭제
  1. 9 9
      zookeeper-jute/src/main/java/org/apache/jute/BinaryInputArchive.java
  2. 1 1
      zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java
  3. 1 3
      zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java
  4. 4 3
      zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferInputStream.java
  5. 9 3
      zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferOutputStream.java
  6. 21 24
      zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java
  7. 5 2
      zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java
  8. 4 1
      zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
  9. 18 24
      zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java
  10. 24 1
      zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java
  11. 21 28
      zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java
  12. 3 7
      zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java
  13. 2 7
      zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java
  14. 5 9
      zookeeper-server/src/test/java/org/apache/zookeeper/server/CreateContainerTest.java
  15. 1 7
      zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerCreationTest.java
  16. 9 13
      zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerTest.java
  17. 17 19
      zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServerTest.java
  18. 2 5
      zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java

+ 9 - 9
zookeeper-jute/src/main/java/org/apache/jute/BinaryInputArchive.java

@@ -47,27 +47,27 @@ public class BinaryInputArchive implements InputArchive {
         }
     }
 
-    private DataInput in;
-    private int maxBufferSize;
-    private int extraMaxBufferSize;
+    private final DataInput in;
+    private final int maxBufferSize;
+    private final int extraMaxBufferSize;
 
-    public static BinaryInputArchive getArchive(InputStream strm) {
-        return new BinaryInputArchive(new DataInputStream(strm));
+    public static BinaryInputArchive getArchive(InputStream stream) {
+        return new BinaryInputArchive(new DataInputStream(stream));
     }
 
     private static class BinaryIndex implements Index {
-        private int nelems;
+        private int n;
 
         BinaryIndex(int nelems) {
-            this.nelems = nelems;
+            this.n = nelems;
         }
 
         public boolean done() {
-            return (nelems <= 0);
+            return (n <= 0);
         }
 
         public void incr() {
-            nelems--;
+            n--;
         }
     }
 

+ 1 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java

@@ -141,7 +141,7 @@ abstract class ClientCnxnSocket {
         ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
         BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
         ConnectResponse conRsp = protocolManager.deserializeConnectResponse(bbia);
-        if (protocolManager.isReadonlyAvailable()) {
+        if (!protocolManager.isReadonlyAvailable()) {
             LOG.warn("Connected to an old server; r-o mode will be unavailable");
         }
         this.sessionId = conRsp.getSessionId();

+ 1 - 3
zookeeper-server/src/main/java/org/apache/zookeeper/audit/AuditHelper.java

@@ -32,7 +32,6 @@ import org.apache.zookeeper.proto.CreateRequest;
 import org.apache.zookeeper.proto.DeleteRequest;
 import org.apache.zookeeper.proto.SetACLRequest;
 import org.apache.zookeeper.proto.SetDataRequest;
-import org.apache.zookeeper.server.ByteBufferInputStream;
 import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
 import org.apache.zookeeper.server.Request;
 import org.slf4j.Logger;
@@ -127,8 +126,7 @@ public final class AuditHelper {
     }
 
     private static void deserialize(Request request, Record record) throws IOException {
-        request.request.rewind();
-        ByteBufferInputStream.byteBuffer2Record(request.request.slice(), record);
+        request.readRequestRecord(record);
     }
 
     private static Result getResult(ProcessTxnResult rc, boolean failedTxn) {

+ 4 - 3
zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferInputStream.java

@@ -21,12 +21,13 @@ package org.apache.zookeeper.server;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import javax.annotation.Nonnull;
 import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.Record;
 
 public class ByteBufferInputStream extends InputStream {
 
-    ByteBuffer bb;
+    private final ByteBuffer bb;
 
     public ByteBufferInputStream(ByteBuffer bb) {
         this.bb = bb;
@@ -46,7 +47,7 @@ public class ByteBufferInputStream extends InputStream {
     }
 
     @Override
-    public int read(byte[] b, int off, int len) throws IOException {
+    public int read(@Nonnull byte[] b, int off, int len) throws IOException {
         if (bb.remaining() == 0) {
             return -1;
         }
@@ -58,7 +59,7 @@ public class ByteBufferInputStream extends InputStream {
     }
 
     @Override
-    public int read(byte[] b) throws IOException {
+    public int read(@Nonnull byte[] b) throws IOException {
         return read(b, 0, b.length);
     }
 

+ 9 - 3
zookeeper-server/src/main/java/org/apache/zookeeper/server/ByteBufferOutputStream.java

@@ -21,27 +21,33 @@ package org.apache.zookeeper.server;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import javax.annotation.Nonnull;
 import org.apache.jute.BinaryOutputArchive;
 import org.apache.jute.Record;
 
 public class ByteBufferOutputStream extends OutputStream {
 
-    ByteBuffer bb;
+    private final ByteBuffer bb;
+
     public ByteBufferOutputStream(ByteBuffer bb) {
         this.bb = bb;
     }
+
     @Override
     public void write(int b) throws IOException {
         bb.put((byte) b);
     }
+
     @Override
-    public void write(byte[] b) throws IOException {
+    public void write(@Nonnull byte[] b) throws IOException {
         bb.put(b);
     }
+
     @Override
-    public void write(byte[] b, int off, int len) throws IOException {
+    public void write(@Nonnull byte[] b, int off, int len) throws IOException {
         bb.put(b, off, len);
     }
+
     public static void record2ByteBuffer(Record record, ByteBuffer bb) throws IOException {
         BinaryOutputArchive oa;
         oa = BinaryOutputArchive.getArchive(new ByteBufferOutputStream(bb));

+ 21 - 24
zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java

@@ -20,7 +20,6 @@ package org.apache.zookeeper.server;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -271,7 +270,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             case OpCode.multiRead: {
                 lastOp = "MLTR";
                 MultiOperationRecord multiReadRecord = new MultiOperationRecord();
-                ByteBufferInputStream.byteBuffer2Record(request.request, multiReadRecord);
+                request.readRequestRecord(multiReadRecord);
                 rsp = new MultiResponse();
                 OpResult subResult;
                 for (Op readOp : multiReadRecord) {
@@ -350,7 +349,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             case OpCode.sync: {
                 lastOp = "SYNC";
                 SyncRequest syncRequest = new SyncRequest();
-                ByteBufferInputStream.byteBuffer2Record(request.request, syncRequest);
+                request.readRequestRecord(syncRequest);
                 rsp = new SyncResponse(syncRequest.getPath());
                 requestPathMetricsCollector.registerRequest(request.type, syncRequest.getPath());
                 break;
@@ -365,7 +364,7 @@ public class FinalRequestProcessor implements RequestProcessor {
                 lastOp = "EXIS";
                 // TODO we need to figure out the security requirement for this!
                 ExistsRequest existsRequest = new ExistsRequest();
-                ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest);
+                request.readRequestRecord(existsRequest);
                 path = existsRequest.getPath();
                 if (path.indexOf('\0') != -1) {
                     throw new KeeperException.BadArgumentsException();
@@ -378,7 +377,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             case OpCode.getData: {
                 lastOp = "GETD";
                 GetDataRequest getDataRequest = new GetDataRequest();
-                ByteBufferInputStream.byteBuffer2Record(request.request, getDataRequest);
+                request.readRequestRecord(getDataRequest);
                 path = getDataRequest.getPath();
                 rsp = handleGetDataRequest(getDataRequest, cnxn, request.authInfo);
                 requestPathMetricsCollector.registerRequest(request.type, path);
@@ -387,9 +386,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             case OpCode.setWatches: {
                 lastOp = "SETW";
                 SetWatches setWatches = new SetWatches();
-                // TODO we really should not need this
-                request.request.rewind();
-                ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
+                request.readRequestRecord(setWatches);
                 long relativeZxid = setWatches.getRelativeZxid();
                 zks.getZKDatabase()
                    .setWatches(
@@ -405,9 +402,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             case OpCode.setWatches2: {
                 lastOp = "STW2";
                 SetWatches2 setWatches = new SetWatches2();
-                // TODO we really should not need this
-                request.request.rewind();
-                ByteBufferInputStream.byteBuffer2Record(request.request, setWatches);
+                request.readRequestRecord(setWatches);
                 long relativeZxid = setWatches.getRelativeZxid();
                 zks.getZKDatabase().setWatches(relativeZxid,
                         setWatches.getDataWatches(),
@@ -421,8 +416,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             case OpCode.addWatch: {
                 lastOp = "ADDW";
                 AddWatchRequest addWatcherRequest = new AddWatchRequest();
-                ByteBufferInputStream.byteBuffer2Record(request.request,
-                        addWatcherRequest);
+                request.readRequestRecord(addWatcherRequest);
                 zks.getZKDatabase().addWatch(addWatcherRequest.getPath(), cnxn, addWatcherRequest.getMode());
                 rsp = new ErrorResponse(0);
                 break;
@@ -430,7 +424,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             case OpCode.getACL: {
                 lastOp = "GETA";
                 GetACLRequest getACLRequest = new GetACLRequest();
-                ByteBufferInputStream.byteBuffer2Record(request.request, getACLRequest);
+                request.readRequestRecord(getACLRequest);
                 path = getACLRequest.getPath();
                 DataNode n = zks.getZKDatabase().getNode(path);
                 if (n == null) {
@@ -473,7 +467,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             case OpCode.getChildren: {
                 lastOp = "GETC";
                 GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
-                ByteBufferInputStream.byteBuffer2Record(request.request, getChildrenRequest);
+                request.readRequestRecord(getChildrenRequest);
                 path = getChildrenRequest.getPath();
                 rsp = handleGetChildrenRequest(getChildrenRequest, cnxn, request.authInfo);
                 requestPathMetricsCollector.registerRequest(request.type, path);
@@ -482,7 +476,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             case OpCode.getAllChildrenNumber: {
                 lastOp = "GETACN";
                 GetAllChildrenNumberRequest getAllChildrenNumberRequest = new GetAllChildrenNumberRequest();
-                ByteBufferInputStream.byteBuffer2Record(request.request, getAllChildrenNumberRequest);
+                request.readRequestRecord(getAllChildrenNumberRequest);
                 path = getAllChildrenNumberRequest.getPath();
                 DataNode n = zks.getZKDatabase().getNode(path);
                 if (n == null) {
@@ -502,7 +496,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             case OpCode.getChildren2: {
                 lastOp = "GETC";
                 GetChildren2Request getChildren2Request = new GetChildren2Request();
-                ByteBufferInputStream.byteBuffer2Record(request.request, getChildren2Request);
+                request.readRequestRecord(getChildren2Request);
                 Stat stat = new Stat();
                 path = getChildren2Request.getPath();
                 DataNode n = zks.getZKDatabase().getNode(path);
@@ -524,7 +518,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             case OpCode.checkWatches: {
                 lastOp = "CHKW";
                 CheckWatchesRequest checkWatches = new CheckWatchesRequest();
-                ByteBufferInputStream.byteBuffer2Record(request.request, checkWatches);
+                request.readRequestRecord(checkWatches);
                 WatcherType type = WatcherType.fromInt(checkWatches.getType());
                 path = checkWatches.getPath();
                 boolean containsWatcher = zks.getZKDatabase().containsWatcher(path, type, cnxn);
@@ -538,7 +532,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             case OpCode.removeWatches: {
                 lastOp = "REMW";
                 RemoveWatchesRequest removeWatches = new RemoveWatchesRequest();
-                ByteBufferInputStream.byteBuffer2Record(request.request, removeWatches);
+                request.readRequestRecord(removeWatches);
                 WatcherType type = WatcherType.fromInt(removeWatches.getType());
                 path = removeWatches.getPath();
                 boolean removed = zks.getZKDatabase().removeWatch(path, type, cnxn);
@@ -557,7 +551,7 @@ public class FinalRequestProcessor implements RequestProcessor {
             case OpCode.getEphemerals: {
                 lastOp = "GETE";
                 GetEphemeralsRequest getEphemerals = new GetEphemeralsRequest();
-                ByteBufferInputStream.byteBuffer2Record(request.request, getEphemerals);
+                request.readRequestRecord(getEphemerals);
                 String prefixPath = getEphemerals.getPrefixPath();
                 Set<String> allEphems = zks.getZKDatabase().getDataTree().getEphemerals(request.sessionId);
                 List<String> ephemerals = new ArrayList<>();
@@ -592,10 +586,13 @@ public class FinalRequestProcessor implements RequestProcessor {
             // error to the user
             LOG.error("Failed to process {}", request, e);
             StringBuilder sb = new StringBuilder();
-            ByteBuffer bb = request.request;
-            bb.rewind();
-            while (bb.hasRemaining()) {
-                sb.append(String.format("%02x", (0xff & bb.get())));
+            byte[] payload = request.readRequestBytes();
+            if (payload != null) {
+                for (byte b : payload) {
+                    sb.append(String.format("%02x", (0xff & b)));
+                }
+            } else {
+                sb.append("request buffer is null");
             }
             LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), sb);
             err = Code.MARSHALLINGERROR;

+ 5 - 2
zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java

@@ -40,6 +40,7 @@ import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.ConnectRequest;
 import org.apache.zookeeper.proto.ReplyHeader;
 import org.apache.zookeeper.proto.WatcherEvent;
 import org.apache.zookeeper.server.NIOServerCnxnFactory.SelectorThread;
@@ -427,11 +428,13 @@ public class NIOServerCnxn extends ServerCnxn {
         }
     }
 
-    private void readConnectRequest() throws IOException, InterruptedException, ClientCnxnLimitException {
+    private void readConnectRequest() throws IOException, ClientCnxnLimitException {
         if (!isZKServerRunning()) {
             throw new IOException("ZooKeeperServer not running");
         }
-        zkServer.processConnectRequest(this, incomingBuffer);
+        BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
+        ConnectRequest request = protocolManager.deserializeConnectRequest(bia);
+        zkServer.processConnectRequest(this, request);
         initialized = true;
     }
 

+ 4 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java

@@ -45,6 +45,7 @@ import org.apache.zookeeper.ClientCnxn;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.data.Stat;
+import org.apache.zookeeper.proto.ConnectRequest;
 import org.apache.zookeeper.proto.ReplyHeader;
 import org.apache.zookeeper.proto.WatcherEvent;
 import org.apache.zookeeper.server.command.CommandExecutor;
@@ -482,7 +483,9 @@ public class NettyServerCnxn extends ServerCnxn {
                             zks.processPacket(this, bb);
                         } else {
                             LOG.debug("got conn req request from {}", getRemoteSocketAddress());
-                            zks.processConnectRequest(this, bb);
+                            BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(bb));
+                            ConnectRequest request = protocolManager.deserializeConnectRequest(bia);
+                            zks.processConnectRequest(this, request);
                             initialized = true;
                         }
                         bb = null;

+ 18 - 24
zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java

@@ -332,7 +332,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
             break;
         }
         case OpCode.deleteContainer: {
-            String path = new String(request.request.array(), UTF_8);
+            String path = new String(request.readRequestBytes(), UTF_8);
             String parentPath = getParentPathAndValidate(path);
             ChangeRecord nodeRecord = getRecordForPath(path);
             if (nodeRecord.childCount > 0) {
@@ -360,7 +360,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
             zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
             DeleteRequest deleteRequest = (DeleteRequest) record;
             if (deserialize) {
-                ByteBufferInputStream.byteBuffer2Record(request.request, deleteRequest);
+                request.readRequestRecord(deleteRequest);
             }
             String path = deleteRequest.getPath();
             String parentPath = getParentPathAndValidate(path);
@@ -388,7 +388,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
             zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
             SetDataRequest setDataRequest = (SetDataRequest) record;
             if (deserialize) {
-                ByteBufferInputStream.byteBuffer2Record(request.request, setDataRequest);
+                request.readRequestRecord(setDataRequest);
             }
             path = setDataRequest.getPath();
             validatePath(path, request.sessionId);
@@ -560,7 +560,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
             zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
             SetACLRequest setAclRequest = (SetACLRequest) record;
             if (deserialize) {
-                ByteBufferInputStream.byteBuffer2Record(request.request, setAclRequest);
+                request.readRequestRecord(setAclRequest);
             }
             path = setAclRequest.getPath();
             validatePath(path, request.sessionId);
@@ -577,12 +577,11 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
             addChangeRecord(nodeRecord);
             break;
         case OpCode.createSession:
-            request.request.rewind();
-            int to = request.request.getInt();
-            request.setTxn(new CreateSessionTxn(to));
-            request.request.rewind();
+            CreateSessionTxn createSessionTxn = new CreateSessionTxn();
+            request.readRequestRecord(createSessionTxn);
+            request.setTxn(createSessionTxn);
             // only add the global session tracker but not to ZKDb
-            zks.sessionTracker.trackSession(request.sessionId, to);
+            zks.sessionTracker.trackSession(request.sessionId, createSessionTxn.getTimeOut());
             zks.setOwner(request.sessionId, request.getOwner());
             break;
         case OpCode.closeSession:
@@ -632,7 +631,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
             zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
             CheckVersionRequest checkVersionRequest = (CheckVersionRequest) record;
             if (deserialize) {
-                ByteBufferInputStream.byteBuffer2Record(request.request, checkVersionRequest);
+                request.readRequestRecord(checkVersionRequest);
             }
             path = checkVersionRequest.getPath();
             validatePath(path, request.sessionId);
@@ -656,7 +655,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
 
     private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize) throws IOException, KeeperException {
         if (deserialize) {
-            ByteBufferInputStream.byteBuffer2Record(request.request, record);
+            request.readRequestRecord(record);
         }
 
         int flags;
@@ -786,10 +785,8 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
 
     /**
      * This method is a helper to pRequest method
-     *
-     * @param request
      */
-    private void pRequestHelper(Request request) throws RequestProcessorException {
+    private void pRequestHelper(Request request) {
         try {
             switch (request.type) {
             case OpCode.createContainer:
@@ -813,7 +810,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
                 break;
             case OpCode.reconfig:
                 ReconfigRequest reconfigRequest = new ReconfigRequest();
-                ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);
+                request.readRequestRecord(reconfigRequest);
                 pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
                 break;
             case OpCode.setACL:
@@ -827,12 +824,12 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
             case OpCode.multi:
                 MultiOperationRecord multiRequest = new MultiOperationRecord();
                 try {
-                    ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
+                    request.readRequestRecord(multiRequest);
                 } catch (IOException e) {
                     request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(), Time.currentWallTime(), OpCode.multi));
                     throw e;
                 }
-                List<Txn> txns = new ArrayList<Txn>();
+                List<Txn> txns = new ArrayList<>();
                 //Each op in a multi-op must have the same zxid!
                 long zxid = zks.getNextZxid();
                 KeeperException ke = null;
@@ -947,18 +944,15 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
             // log at error level as we are returning a marshalling
             // error to the user
             LOG.error("Failed to process {}", request, e);
-
             StringBuilder sb = new StringBuilder();
-            ByteBuffer bb = request.request;
-            if (bb != null) {
-                bb.rewind();
-                while (bb.hasRemaining()) {
-                    sb.append(String.format("%02x", (0xff & bb.get())));
+            byte[] payload = request.readRequestBytes();
+            if (payload != null) {
+                for (byte b : payload) {
+                    sb.append(String.format("%02x", (0xff & b)));
                 }
             } else {
                 sb.append("request buffer is null");
             }
-
             LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), sb);
             if (request.getHdr() != null) {
                 request.getHdr().setType(OpCode.error);

+ 24 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java

@@ -19,6 +19,7 @@
 package org.apache.zookeeper.server;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 import org.apache.jute.Record;
@@ -78,7 +79,29 @@ public class Request {
 
     public final int type;
 
-    public final ByteBuffer request;
+    private final ByteBuffer request;
+
+    public void readRequestRecord(Record record) throws IOException {
+        if (request != null) {
+            request.rewind();
+            ByteBufferInputStream.byteBuffer2Record(request, record);
+            request.rewind();
+            return;
+        }
+        throw new IOException(new NullPointerException("request"));
+    }
+
+    public byte[] readRequestBytes() {
+        if (request != null) {
+            request.rewind();
+            int len = request.remaining();
+            byte[] b = new byte[len];
+            request.get(b);
+            request.rewind();
+            return b;
+        }
+        return null;
+    }
 
     public final ServerCnxn cnxn;
 

+ 21 - 28
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java

@@ -1375,17 +1375,13 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
     }
 
     @SuppressFBWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "the value won't change after startup")
-    public void processConnectRequest(ServerCnxn cnxn, ByteBuffer incomingBuffer)
-        throws IOException, ClientCnxnLimitException {
-
-        BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
-        ConnectRequest connReq = cnxn.protocolManager.deserializeConnectRequest(bia);
+    public void processConnectRequest(ServerCnxn cnxn, ConnectRequest request) throws IOException, ClientCnxnLimitException {
         LOG.debug(
             "Session establishment request from client {} client's lastZxid is 0x{}",
             cnxn.getRemoteSocketAddress(),
-            Long.toHexString(connReq.getLastZxidSeen()));
+            Long.toHexString(request.getLastZxidSeen()));
 
-        long sessionId = connReq.getSessionId();
+        long sessionId = request.getSessionId();
         int tokensNeeded = 1;
         if (connThrottle.isConnectionWeightEnabled()) {
             if (sessionId == 0) {
@@ -1405,22 +1401,22 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         ServerMetrics.getMetrics().CONNECTION_TOKEN_DEFICIT.add(connThrottle.getDeficit());
         ServerMetrics.getMetrics().CONNECTION_REQUEST_COUNT.add(1);
 
-        if (cnxn.protocolManager.isReadonlyAvailable()) {
+        if (!cnxn.protocolManager.isReadonlyAvailable()) {
             LOG.warn(
                 "Connection request from old client {}; will be dropped if server is in r-o mode",
                 cnxn.getRemoteSocketAddress());
         }
 
-        if (!connReq.getReadOnly() && this instanceof ReadOnlyZooKeeperServer) {
+        if (!request.getReadOnly() && this instanceof ReadOnlyZooKeeperServer) {
             String msg = "Refusing session request for not-read-only client " + cnxn.getRemoteSocketAddress();
             LOG.info(msg);
             throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
         }
-        if (connReq.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
+        if (request.getLastZxidSeen() > zkDb.dataTree.lastProcessedZxid) {
             String msg = "Refusing session request for client "
                          + cnxn.getRemoteSocketAddress()
                          + " as it has seen zxid 0x"
-                         + Long.toHexString(connReq.getLastZxidSeen())
+                         + Long.toHexString(request.getLastZxidSeen())
                          + " our last zxid is 0x"
                          + Long.toHexString(getZKDatabase().getDataTreeLastProcessedZxid())
                          + " client must try another server";
@@ -1428,8 +1424,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
             LOG.info(msg);
             throw new CloseRequestException(msg, ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
         }
-        int sessionTimeout = connReq.getTimeOut();
-        byte[] passwd = connReq.getPasswd();
+        int sessionTimeout = request.getTimeOut();
+        byte[] passwd = request.getPasswd();
         int minSessionTimeout = getMinSessionTimeout();
         if (sessionTimeout < minSessionTimeout) {
             sessionTimeout = minSessionTimeout;
@@ -1447,16 +1443,16 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
             LOG.debug(
                 "Client attempting to establish new session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
                 Long.toHexString(id),
-                Long.toHexString(connReq.getLastZxidSeen()),
-                connReq.getTimeOut(),
+                Long.toHexString(request.getLastZxidSeen()),
+                request.getTimeOut(),
                 cnxn.getRemoteSocketAddress());
         } else {
             validateSession(cnxn, sessionId);
             LOG.debug(
                 "Client attempting to renew session: session = 0x{}, zxid = 0x{}, timeout = {}, address = {}",
                 Long.toHexString(sessionId),
-                Long.toHexString(connReq.getLastZxidSeen()),
-                connReq.getTimeOut(),
+                Long.toHexString(request.getLastZxidSeen()),
+                request.getTimeOut(),
                 cnxn.getRemoteSocketAddress());
             if (serverCnxnFactory != null) {
                 serverCnxnFactory.closeSession(sessionId, ServerCnxn.DisconnectReason.CLIENT_RECONNECT);
@@ -2182,7 +2178,7 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         case OpCode.create:
         case OpCode.create2: {
             CreateRequest req = new CreateRequest();
-            if (buffer2Record(request.request, req)) {
+            if (readRequestRecord(request, req)) {
                 mustCheckACL = true;
                 acl = req.getAcl();
                 path = parentPath(req.getPath());
@@ -2191,21 +2187,21 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         }
         case OpCode.delete: {
             DeleteRequest req = new DeleteRequest();
-            if (buffer2Record(request.request, req)) {
+            if (readRequestRecord(request, req)) {
                 path = parentPath(req.getPath());
             }
             break;
         }
         case OpCode.setData: {
             SetDataRequest req = new SetDataRequest();
-            if (buffer2Record(request.request, req)) {
+            if (readRequestRecord(request, req)) {
                 path = req.getPath();
             }
             break;
         }
         case OpCode.setACL: {
             SetACLRequest req = new SetACLRequest();
-            if (buffer2Record(request.request, req)) {
+            if (readRequestRecord(request, req)) {
                 mustCheckACL = true;
                 acl = req.getAcl();
                 path = req.getPath();
@@ -2299,16 +2295,13 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         return err == KeeperException.Code.OK.intValue();
     }
 
-    private boolean buffer2Record(ByteBuffer request, Record record) {
-        boolean rv = false;
+    private boolean readRequestRecord(Request request, Record record) {
         try {
-            ByteBufferInputStream.byteBuffer2Record(request, record);
-            request.rewind();
-            rv = true;
+            request.readRequestRecord(record);
+            return true;
         } catch (IOException ex) {
+            return false;
         }
-
-        return rv;
     }
 
     public int getOutstandingHandshakeNum() {

+ 3 - 7
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Learner.java

@@ -254,13 +254,9 @@ public class Learner {
         oa.writeLong(request.sessionId);
         oa.writeInt(request.cxid);
         oa.writeInt(request.type);
-        if (request.request != null) {
-            request.request.rewind();
-            int len = request.request.remaining();
-            byte[] b = new byte[len];
-            request.request.get(b);
-            request.request.rewind();
-            oa.write(b);
+        byte[] payload = request.readRequestBytes();
+        if (payload != null) {
+            oa.write(payload);
         }
         oa.close();
         QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);

+ 2 - 7
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java

@@ -31,7 +31,6 @@ import org.apache.zookeeper.Op;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.metrics.MetricsContext;
 import org.apache.zookeeper.proto.CreateRequest;
-import org.apache.zookeeper.server.ByteBufferInputStream;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.server.ZKDatabase;
@@ -78,9 +77,7 @@ public abstract class QuorumZooKeeperServer extends ZooKeeperServer {
 
         if (OpCode.multi == request.type) {
             MultiOperationRecord multiTransactionRecord = new MultiOperationRecord();
-            request.request.rewind();
-            ByteBufferInputStream.byteBuffer2Record(request.request, multiTransactionRecord);
-            request.request.rewind();
+            request.readRequestRecord(multiTransactionRecord);
             boolean containsEphemeralCreate = false;
             for (Op op : multiTransactionRecord) {
                 if (op.getType() == OpCode.create || op.getType() == OpCode.create2) {
@@ -97,9 +94,7 @@ public abstract class QuorumZooKeeperServer extends ZooKeeperServer {
             }
         } else {
             CreateRequest createRequest = new CreateRequest();
-            request.request.rewind();
-            ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
-            request.request.rewind();
+            request.readRequestRecord(createRequest);
             CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
             if (!createMode.isEphemeral()) {
                 return null;

+ 5 - 9
zookeeper-server/src/test/java/org/apache/zookeeper/server/CreateContainerTest.java

@@ -27,7 +27,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -221,11 +220,11 @@ public class CreateContainerTest extends ClientBase {
     @Test
     @Timeout(value = 30)
     public void testMaxPerMinute() throws InterruptedException {
-        final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
+        final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
         RequestProcessor processor = new RequestProcessor() {
             @Override
             public void processRequest(Request request) {
-                queue.add(new String(request.request.array()));
+                queue.add(new String(request.readRequestBytes()));
             }
 
             @Override
@@ -243,12 +242,9 @@ public class CreateContainerTest extends ClientBase {
                 return Arrays.asList("/one", "/two", "/three", "/four");
             }
         };
-        Executors.newSingleThreadExecutor().submit(new Callable<Void>() {
-            @Override
-            public Void call() throws Exception {
-                containerManager.checkContainers();
-                return null;
-            }
+        Executors.newSingleThreadExecutor().submit(() -> {
+            containerManager.checkContainers();
+            return null;
         });
         assertEquals(queue.poll(5, TimeUnit.SECONDS), "/one");
         assertEquals(queue.poll(5, TimeUnit.SECONDS), "/two");

+ 1 - 7
zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerCreationTest.java

@@ -18,10 +18,7 @@
 
 package org.apache.zookeeper.server;
 
-import java.io.ByteArrayOutputStream;
 import java.io.File;
-import java.nio.ByteBuffer;
-import org.apache.jute.BinaryOutputArchive;
 import org.apache.zookeeper.proto.ConnectRequest;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.test.ClientBase;
@@ -51,10 +48,7 @@ public class ZooKeeperServerCreationTest {
         ServerCnxn cnxn = new MockServerCnxn();
 
         ConnectRequest connReq = new ConnectRequest();
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
-        connReq.serialize(boa, "connect");
-        zks.processConnectRequest(cnxn, ByteBuffer.wrap(baos.toByteArray()));
+        zks.processConnectRequest(cnxn, connReq);
     }
 
 }

+ 9 - 13
zookeeper-server/src/test/java/org/apache/zookeeper/server/ZooKeeperServerTest.java

@@ -26,12 +26,12 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 import java.io.File;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.zookeeper.ZKTestCase;
 import org.apache.zookeeper.metrics.MetricsUtils;
+import org.apache.zookeeper.proto.ConnectRequest;
 import org.apache.zookeeper.server.persistence.FileTxnLog;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.persistence.SnapStream;
@@ -150,21 +150,17 @@ public class ZooKeeperServerTest extends ZKTestCase {
         final ZKDatabase zkDatabase = new ZKDatabase(mock(FileTxnSnapLog.class));
         zooKeeperServer.setZKDatabase(zkDatabase);
 
-        final ByteBuffer output = ByteBuffer.allocate(30);
-        // serialize a connReq
-        output.putInt(1);
-        // lastZxid
-        output.putLong(99L);
-        output.putInt(500);
-        output.putLong(123L);
-        output.putInt(1);
-        output.put((byte) 1);
-        output.put((byte) 1);
-        output.flip();
+        final ConnectRequest request = new ConnectRequest();
+        request.setProtocolVersion(1);
+        request.setLastZxidSeen(99L);
+        request.setTimeOut(500);
+        request.setSessionId(123L);
+        request.setPasswd(new byte[]{ 1 });
+        request.setReadOnly(true);
 
         ServerCnxn.CloseRequestException e = assertThrows(
                 ServerCnxn.CloseRequestException.class,
-                () -> zooKeeperServer.processConnectRequest(new MockServerCnxn(), output));
+                () -> zooKeeperServer.processConnectRequest(new MockServerCnxn(), request));
         assertEquals(e.getReason(), ServerCnxn.DisconnectReason.CLIENT_ZXID_AHEAD);
     }
 

+ 17 - 19
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/ReadOnlyZooKeeperServerTest.java

@@ -21,7 +21,7 @@ package org.apache.zookeeper.server.quorum;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.mock;
-import java.nio.ByteBuffer;
+import org.apache.zookeeper.proto.ConnectRequest;
 import org.apache.zookeeper.server.MockServerCnxn;
 import org.apache.zookeeper.server.ServerCnxn;
 import org.apache.zookeeper.server.ZKDatabase;
@@ -35,28 +35,26 @@ import org.junit.jupiter.api.Test;
 public class ReadOnlyZooKeeperServerTest {
 
     /**
-     * test method {@link ZooKeeperServer#processConnectRequest(org.apache.zookeeper.server.ServerCnxn, java.nio.ByteBuffer)}
+     * test method {@link ZooKeeperServer#processConnectRequest(ServerCnxn, ConnectRequest)}
      */
     @Test
     public void testReadOnlyZookeeperServer() {
         ReadOnlyZooKeeperServer readOnlyZooKeeperServer = new ReadOnlyZooKeeperServer(
-                mock(FileTxnSnapLog.class), mock(QuorumPeer.class), mock(ZKDatabase.class));
-
-        final ByteBuffer output = ByteBuffer.allocate(30);
-        // serialize a connReq
-        output.putInt(1);
-        output.putLong(1L);
-        output.putInt(500);
-        output.putLong(123L);
-        output.putInt(1);
-        output.put((byte) 1);
-        // set readOnly false
-        output.put((byte) 0);
-        output.flip();
-
-        ServerCnxn.CloseRequestException e = assertThrows(ServerCnxn.CloseRequestException.class, () -> {
-            readOnlyZooKeeperServer.processConnectRequest(new MockServerCnxn(), output);
-        });
+                mock(FileTxnSnapLog.class),
+                mock(QuorumPeer.class),
+                mock(ZKDatabase.class));
+
+        final ConnectRequest request = new ConnectRequest();
+        request.setProtocolVersion(1);
+        request.setLastZxidSeen(99L);
+        request.setTimeOut(500);
+        request.setSessionId(123L);
+        request.setPasswd(new byte[]{ 1 });
+        request.setReadOnly(false);
+
+        ServerCnxn.CloseRequestException e = assertThrows(
+                ServerCnxn.CloseRequestException.class,
+                () -> readOnlyZooKeeperServer.processConnectRequest(new MockServerCnxn(), request));
         assertEquals(e.getReason(), ServerCnxn.DisconnectReason.NOT_READ_ONLY_CLIENT);
     }
 

+ 2 - 5
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SessionUpgradeQuorumTest.java

@@ -39,7 +39,6 @@ import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.ZooKeeper.States;
 import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.proto.CreateRequest;
-import org.apache.zookeeper.server.ByteBufferInputStream;
 import org.apache.zookeeper.server.Request;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.test.ClientBase;
@@ -319,15 +318,13 @@ public class SessionUpgradeQuorumTest extends QuorumPeerTestBase {
 
                             if (request.type == ZooDefs.OpCode.create && request.cnxn != null) {
                                 CreateRequest createRequest = new CreateRequest();
-                                request.request.rewind();
-                                ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
-                                request.request.rewind();
+                                request.readRequestRecord(createRequest);
                                 try {
                                     CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
                                     if (createMode.isEphemeral()) {
                                         request.cnxn.sendCloseSession();
                                     }
-                                } catch (KeeperException e) {
+                                } catch (KeeperException ignore) {
                                 }
                                 return;
                             }