|
@@ -18,7 +18,6 @@
|
|
|
|
|
|
package org.apache.zookeeper.server;
|
|
|
|
|
|
-import static java.nio.charset.StandardCharsets.UTF_8;
|
|
|
import java.io.ByteArrayOutputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.io.StringReader;
|
|
@@ -36,6 +35,7 @@ import java.util.concurrent.LinkedBlockingQueue;
|
|
|
import org.apache.jute.BinaryOutputArchive;
|
|
|
import org.apache.jute.Record;
|
|
|
import org.apache.zookeeper.CreateMode;
|
|
|
+import org.apache.zookeeper.DeleteContainerRequest;
|
|
|
import org.apache.zookeeper.KeeperException;
|
|
|
import org.apache.zookeeper.KeeperException.BadArgumentsException;
|
|
|
import org.apache.zookeeper.KeeperException.Code;
|
|
@@ -101,7 +101,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
|
|
|
*/
|
|
|
private static boolean failCreate = false;
|
|
|
|
|
|
- LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();
|
|
|
+ LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<>();
|
|
|
|
|
|
private final RequestProcessor nextProcessor;
|
|
|
private final boolean digestEnabled;
|
|
@@ -311,13 +311,8 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
|
|
|
/**
|
|
|
* This method will be called inside the ProcessRequestThread, which is a
|
|
|
* singleton, so there will be a single thread calling this code.
|
|
|
- *
|
|
|
- * @param type
|
|
|
- * @param zxid
|
|
|
- * @param request
|
|
|
- * @param record
|
|
|
*/
|
|
|
- protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException {
|
|
|
+ protected void pRequest2Txn(int type, long zxid, Request request, Record record) throws KeeperException, IOException, RequestProcessorException {
|
|
|
if (request.getHdr() == null) {
|
|
|
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
|
|
|
Time.currentWallTime(), type));
|
|
@@ -328,11 +323,12 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
|
|
|
case OpCode.create2:
|
|
|
case OpCode.createTTL:
|
|
|
case OpCode.createContainer: {
|
|
|
- pRequest2TxnCreate(type, request, record, deserialize);
|
|
|
+ pRequest2TxnCreate(type, request, record);
|
|
|
break;
|
|
|
}
|
|
|
case OpCode.deleteContainer: {
|
|
|
- String path = new String(request.readRequestBytes(), UTF_8);
|
|
|
+ DeleteContainerRequest txn = (DeleteContainerRequest) record;
|
|
|
+ String path = txn.getPath();
|
|
|
String parentPath = getParentPathAndValidate(path);
|
|
|
ChangeRecord nodeRecord = getRecordForPath(path);
|
|
|
if (nodeRecord.childCount > 0) {
|
|
@@ -359,9 +355,6 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
|
|
|
case OpCode.delete:
|
|
|
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
|
|
|
DeleteRequest deleteRequest = (DeleteRequest) record;
|
|
|
- if (deserialize) {
|
|
|
- request.readRequestRecord(deleteRequest);
|
|
|
- }
|
|
|
String path = deleteRequest.getPath();
|
|
|
String parentPath = getParentPathAndValidate(path);
|
|
|
ChangeRecord parentRecord = getRecordForPath(parentPath);
|
|
@@ -387,9 +380,6 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
|
|
|
case OpCode.setData:
|
|
|
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
|
|
|
SetDataRequest setDataRequest = (SetDataRequest) record;
|
|
|
- if (deserialize) {
|
|
|
- request.readRequestRecord(setDataRequest);
|
|
|
- }
|
|
|
path = setDataRequest.getPath();
|
|
|
validatePath(path, request.sessionId);
|
|
|
nodeRecord = getRecordForPath(path);
|
|
@@ -559,9 +549,6 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
|
|
|
case OpCode.setACL:
|
|
|
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
|
|
|
SetACLRequest setAclRequest = (SetACLRequest) record;
|
|
|
- if (deserialize) {
|
|
|
- request.readRequestRecord(setAclRequest);
|
|
|
- }
|
|
|
path = setAclRequest.getPath();
|
|
|
validatePath(path, request.sessionId);
|
|
|
List<ACL> listACL = fixupACL(path, request.authInfo, setAclRequest.getAcl());
|
|
@@ -577,8 +564,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
|
|
|
addChangeRecord(nodeRecord);
|
|
|
break;
|
|
|
case OpCode.createSession:
|
|
|
- CreateSessionTxn createSessionTxn = new CreateSessionTxn();
|
|
|
- request.readRequestRecord(createSessionTxn);
|
|
|
+ CreateSessionTxn createSessionTxn = request.readRequestRecord(CreateSessionTxn::new);
|
|
|
request.setTxn(createSessionTxn);
|
|
|
// only add the global session tracker but not to ZKDb
|
|
|
zks.sessionTracker.trackSession(request.sessionId, createSessionTxn.getTimeOut());
|
|
@@ -630,9 +616,6 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
|
|
|
case OpCode.check:
|
|
|
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
|
|
|
CheckVersionRequest checkVersionRequest = (CheckVersionRequest) record;
|
|
|
- if (deserialize) {
|
|
|
- request.readRequestRecord(checkVersionRequest);
|
|
|
- }
|
|
|
path = checkVersionRequest.getPath();
|
|
|
validatePath(path, request.sessionId);
|
|
|
nodeRecord = getRecordForPath(path);
|
|
@@ -653,11 +636,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize) throws IOException, KeeperException {
|
|
|
- if (deserialize) {
|
|
|
- request.readRequestRecord(record);
|
|
|
- }
|
|
|
-
|
|
|
+ private void pRequest2TxnCreate(int type, Request request, Record record) throws IOException, KeeperException {
|
|
|
int flags;
|
|
|
String path;
|
|
|
List<ACL> acl;
|
|
@@ -792,39 +771,41 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
|
|
|
case OpCode.createContainer:
|
|
|
case OpCode.create:
|
|
|
case OpCode.create2:
|
|
|
- CreateRequest create2Request = new CreateRequest();
|
|
|
- pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
|
|
|
+ CreateRequest create2Request = request.readRequestRecord(CreateRequest::new);
|
|
|
+ pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request);
|
|
|
break;
|
|
|
case OpCode.createTTL:
|
|
|
- CreateTTLRequest createTtlRequest = new CreateTTLRequest();
|
|
|
- pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest, true);
|
|
|
+ CreateTTLRequest createTtlRequest = request.readRequestRecord(CreateTTLRequest::new);
|
|
|
+ pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest);
|
|
|
break;
|
|
|
case OpCode.deleteContainer:
|
|
|
+ DeleteContainerRequest deleteContainerRequest = request.readRequestRecord(DeleteContainerRequest::new);
|
|
|
+ pRequest2Txn(request.type, zks.getNextZxid(), request, deleteContainerRequest);
|
|
|
+ break;
|
|
|
case OpCode.delete:
|
|
|
- DeleteRequest deleteRequest = new DeleteRequest();
|
|
|
- pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
|
|
|
+ DeleteRequest deleteRequest = request.readRequestRecord(DeleteRequest::new);
|
|
|
+ pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest);
|
|
|
break;
|
|
|
case OpCode.setData:
|
|
|
- SetDataRequest setDataRequest = new SetDataRequest();
|
|
|
- pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
|
|
|
+ SetDataRequest setDataRequest = request.readRequestRecord(SetDataRequest::new);
|
|
|
+ pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest);
|
|
|
break;
|
|
|
case OpCode.reconfig:
|
|
|
- ReconfigRequest reconfigRequest = new ReconfigRequest();
|
|
|
- request.readRequestRecord(reconfigRequest);
|
|
|
- pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
|
|
|
+ ReconfigRequest reconfigRequest = request.readRequestRecord(ReconfigRequest::new);
|
|
|
+ pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest);
|
|
|
break;
|
|
|
case OpCode.setACL:
|
|
|
- SetACLRequest setAclRequest = new SetACLRequest();
|
|
|
- pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
|
|
|
+ SetACLRequest setAclRequest = request.readRequestRecord(SetACLRequest::new);
|
|
|
+ pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest);
|
|
|
break;
|
|
|
case OpCode.check:
|
|
|
- CheckVersionRequest checkRequest = new CheckVersionRequest();
|
|
|
- pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
|
|
|
+ CheckVersionRequest checkRequest = request.readRequestRecord(CheckVersionRequest::new);
|
|
|
+ pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest);
|
|
|
break;
|
|
|
case OpCode.multi:
|
|
|
- MultiOperationRecord multiRequest = new MultiOperationRecord();
|
|
|
+ MultiOperationRecord multiRequest;
|
|
|
try {
|
|
|
- request.readRequestRecord(multiRequest);
|
|
|
+ multiRequest = request.readRequestRecord(MultiOperationRecord::new);
|
|
|
} catch (IOException e) {
|
|
|
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(), Time.currentWallTime(), OpCode.multi));
|
|
|
throw e;
|
|
@@ -854,7 +835,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
|
|
|
} else {
|
|
|
/* Prep the request and convert to a Txn */
|
|
|
try {
|
|
|
- pRequest2Txn(op.getType(), zxid, request, subrequest, false);
|
|
|
+ pRequest2Txn(op.getType(), zxid, request, subrequest);
|
|
|
type = op.getType();
|
|
|
txn = request.getTxn();
|
|
|
} catch (KeeperException e) {
|
|
@@ -899,7 +880,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
|
|
|
case OpCode.createSession:
|
|
|
case OpCode.closeSession:
|
|
|
if (!request.isLocalSession()) {
|
|
|
- pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
|
|
|
+ pRequest2Txn(request.type, zks.getNextZxid(), request, null);
|
|
|
}
|
|
|
break;
|
|
|
|
|
@@ -944,20 +925,14 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
|
|
|
// log at error level as we are returning a marshalling
|
|
|
// error to the user
|
|
|
LOG.error("Failed to process {}", request, e);
|
|
|
- StringBuilder sb = new StringBuilder();
|
|
|
- byte[] payload = request.readRequestBytes();
|
|
|
- if (payload != null) {
|
|
|
- for (byte b : payload) {
|
|
|
- sb.append(String.format("%02x", (0xff & b)));
|
|
|
- }
|
|
|
- } else {
|
|
|
- sb.append("request buffer is null");
|
|
|
- }
|
|
|
- LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), sb);
|
|
|
- if (request.getHdr() != null) {
|
|
|
- request.getHdr().setType(OpCode.error);
|
|
|
- request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
|
|
|
+ String digest = request.requestDigest();
|
|
|
+ LOG.error("Dumping request buffer for request type {}: 0x{}", Request.op2String(request.type), digest);
|
|
|
+ if (request.getHdr() == null) {
|
|
|
+ request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getZxid(), Time.currentWallTime(), request.type));
|
|
|
}
|
|
|
+
|
|
|
+ request.getHdr().setType(OpCode.error);
|
|
|
+ request.setTxn(new ErrorTxn(Code.MARSHALLINGERROR.intValue()));
|
|
|
}
|
|
|
}
|
|
|
|