|
@@ -19,6 +19,7 @@
|
|
|
package org.apache.zookeeper.server;
|
|
|
|
|
|
import java.io.ByteArrayOutputStream;
|
|
|
+import java.io.IOException;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.HashMap;
|
|
@@ -289,12 +290,18 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
* @param request
|
|
|
* @param record
|
|
|
*/
|
|
|
- protected void pRequest2Txn(int type, long zxid, Request request, Record record) throws KeeperException {
|
|
|
+ protected void pRequest2Txn(int type, long zxid, Request request,
|
|
|
+ Record record, boolean deserialize)
|
|
|
+ throws KeeperException, IOException
|
|
|
+ {
|
|
|
request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, zks.getTime(), type));
|
|
|
|
|
|
switch (type) {
|
|
|
case OpCode.create:
|
|
|
CreateRequest createRequest = (CreateRequest)record;
|
|
|
+ if (deserialize) {
|
|
|
+ ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
|
|
|
+ }
|
|
|
String path = createRequest.getPath();
|
|
|
int lastSlash = path.lastIndexOf('/');
|
|
|
if (lastSlash == -1 || path.indexOf('\0') != -1 || failCreate) {
|
|
@@ -348,6 +355,8 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
break;
|
|
|
case OpCode.delete:
|
|
|
DeleteRequest deleteRequest = (DeleteRequest)record;
|
|
|
+ if(deserialize)
|
|
|
+ ByteBufferInputStream.byteBuffer2Record(request.request, deleteRequest);
|
|
|
path = deleteRequest.getPath();
|
|
|
lastSlash = path.lastIndexOf('/');
|
|
|
if (lastSlash == -1 || path.indexOf('\0') != -1
|
|
@@ -370,6 +379,8 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
break;
|
|
|
case OpCode.setData:
|
|
|
SetDataRequest setDataRequest = (SetDataRequest)record;
|
|
|
+ if(deserialize)
|
|
|
+ ByteBufferInputStream.byteBuffer2Record(request.request, setDataRequest);
|
|
|
path = setDataRequest.getPath();
|
|
|
nodeRecord = getRecordForPath(path);
|
|
|
checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo);
|
|
@@ -381,6 +392,8 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
break;
|
|
|
case OpCode.setACL:
|
|
|
SetACLRequest setAclRequest = (SetACLRequest)record;
|
|
|
+ if(deserialize)
|
|
|
+ ByteBufferInputStream.byteBuffer2Record(request.request, setAclRequest);
|
|
|
path = setAclRequest.getPath();
|
|
|
listACL = removeDuplicates(setAclRequest.getAcl());
|
|
|
if (!fixupACL(request.authInfo, listACL)) {
|
|
@@ -427,6 +440,8 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
break;
|
|
|
case OpCode.check:
|
|
|
CheckVersionRequest checkVersionRequest = (CheckVersionRequest)record;
|
|
|
+ if(deserialize)
|
|
|
+ ByteBufferInputStream.byteBuffer2Record(request.request, checkVersionRequest);
|
|
|
path = checkVersionRequest.getPath();
|
|
|
nodeRecord = getRecordForPath(path);
|
|
|
checkACL(zks, nodeRecord.acl, ZooDefs.Perms.READ, request.authInfo);
|
|
@@ -464,34 +479,34 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
switch (request.type) {
|
|
|
case OpCode.create:
|
|
|
CreateRequest createRequest = new CreateRequest();
|
|
|
- ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
|
|
|
- pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest);
|
|
|
+ pRequest2Txn(request.type, zks.getNextZxid(), request, createRequest, true);
|
|
|
break;
|
|
|
case OpCode.delete:
|
|
|
- DeleteRequest deleteRequest = new DeleteRequest();
|
|
|
- ByteBufferInputStream.byteBuffer2Record(request.request, deleteRequest);
|
|
|
- pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest);
|
|
|
+ DeleteRequest deleteRequest = new DeleteRequest();
|
|
|
+ pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
|
|
|
break;
|
|
|
case OpCode.setData:
|
|
|
- SetDataRequest setDataRequest = new SetDataRequest();
|
|
|
- ByteBufferInputStream.byteBuffer2Record(request.request, setDataRequest);
|
|
|
- pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest);
|
|
|
+ SetDataRequest setDataRequest = new SetDataRequest();
|
|
|
+ pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
|
|
|
break;
|
|
|
case OpCode.setACL:
|
|
|
- SetACLRequest setAclRequest = new SetACLRequest();
|
|
|
- ByteBufferInputStream.byteBuffer2Record(request.request, setAclRequest);
|
|
|
- pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest);
|
|
|
+ SetACLRequest setAclRequest = new SetACLRequest();
|
|
|
+ pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
|
|
|
break;
|
|
|
case OpCode.check:
|
|
|
- CheckVersionRequest checkRequest = new CheckVersionRequest();
|
|
|
- ByteBufferInputStream.byteBuffer2Record(request.request, checkRequest);
|
|
|
- pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest);
|
|
|
+ CheckVersionRequest checkRequest = new CheckVersionRequest();
|
|
|
+ pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
|
|
|
break;
|
|
|
case OpCode.multi:
|
|
|
MultiTransactionRecord multiRequest = new MultiTransactionRecord();
|
|
|
- ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
|
|
|
+ try {
|
|
|
+ ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
|
|
|
+ } catch(IOException e) {
|
|
|
+ request.setHdr(new TxnHeader(request.sessionId, request.cxid, zks.getNextZxid(),
|
|
|
+ zks.getTime(), OpCode.multi));
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
List<Txn> txns = new ArrayList<Txn>();
|
|
|
-
|
|
|
//Each op in a multi-op must have the same zxid!
|
|
|
long zxid = zks.getNextZxid();
|
|
|
KeeperException ke = null;
|
|
@@ -516,7 +531,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
/* Prep the request and convert to a Txn */
|
|
|
else {
|
|
|
try {
|
|
|
- pRequest2Txn(op.getType(), zxid, request, subrequest);
|
|
|
+ pRequest2Txn(op.getType(), zxid, request, subrequest, false);
|
|
|
type = request.getHdr().getType();
|
|
|
txn = request.getTxn();
|
|
|
} catch (KeeperException e) {
|
|
@@ -554,7 +569,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
//create/close session don't require request record
|
|
|
case OpCode.createSession:
|
|
|
case OpCode.closeSession:
|
|
|
- pRequest2Txn(request.type, zks.getNextZxid(), request, null);
|
|
|
+ pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
|
|
|
break;
|
|
|
}
|
|
|
} catch (KeeperException e) {
|