|
@@ -298,6 +298,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
|
|
|
switch (type) {
|
|
switch (type) {
|
|
case OpCode.create:
|
|
case OpCode.create:
|
|
|
|
+ zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
|
|
CreateRequest createRequest = (CreateRequest)record;
|
|
CreateRequest createRequest = (CreateRequest)record;
|
|
if (deserialize) {
|
|
if (deserialize) {
|
|
ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
|
|
ByteBufferInputStream.byteBuffer2Record(request.request, createRequest);
|
|
@@ -354,6 +355,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL));
|
|
addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL));
|
|
break;
|
|
break;
|
|
case OpCode.delete:
|
|
case OpCode.delete:
|
|
|
|
+ zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
|
|
DeleteRequest deleteRequest = (DeleteRequest)record;
|
|
DeleteRequest deleteRequest = (DeleteRequest)record;
|
|
if(deserialize)
|
|
if(deserialize)
|
|
ByteBufferInputStream.byteBuffer2Record(request.request, deleteRequest);
|
|
ByteBufferInputStream.byteBuffer2Record(request.request, deleteRequest);
|
|
@@ -378,6 +380,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, null, -1, null));
|
|
addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, null, -1, null));
|
|
break;
|
|
break;
|
|
case OpCode.setData:
|
|
case OpCode.setData:
|
|
|
|
+ zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
|
|
SetDataRequest setDataRequest = (SetDataRequest)record;
|
|
SetDataRequest setDataRequest = (SetDataRequest)record;
|
|
if(deserialize)
|
|
if(deserialize)
|
|
ByteBufferInputStream.byteBuffer2Record(request.request, setDataRequest);
|
|
ByteBufferInputStream.byteBuffer2Record(request.request, setDataRequest);
|
|
@@ -391,6 +394,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
addChangeRecord(nodeRecord);
|
|
addChangeRecord(nodeRecord);
|
|
break;
|
|
break;
|
|
case OpCode.setACL:
|
|
case OpCode.setACL:
|
|
|
|
+ zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
|
|
SetACLRequest setAclRequest = (SetACLRequest)record;
|
|
SetACLRequest setAclRequest = (SetACLRequest)record;
|
|
if(deserialize)
|
|
if(deserialize)
|
|
ByteBufferInputStream.byteBuffer2Record(request.request, setAclRequest);
|
|
ByteBufferInputStream.byteBuffer2Record(request.request, setAclRequest);
|
|
@@ -439,6 +443,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
+ Long.toHexString(request.sessionId));
|
|
+ Long.toHexString(request.sessionId));
|
|
break;
|
|
break;
|
|
case OpCode.check:
|
|
case OpCode.check:
|
|
|
|
+ zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
|
|
CheckVersionRequest checkVersionRequest = (CheckVersionRequest)record;
|
|
CheckVersionRequest checkVersionRequest = (CheckVersionRequest)record;
|
|
if(deserialize)
|
|
if(deserialize)
|
|
ByteBufferInputStream.byteBuffer2Record(request.request, checkVersionRequest);
|
|
ByteBufferInputStream.byteBuffer2Record(request.request, checkVersionRequest);
|
|
@@ -472,10 +477,6 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
request.setTxn(null);
|
|
request.setTxn(null);
|
|
|
|
|
|
try {
|
|
try {
|
|
- if(request.type != OpCode.createSession && request.type != OpCode.closeSession) {
|
|
|
|
- zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
switch (request.type) {
|
|
switch (request.type) {
|
|
case OpCode.create:
|
|
case OpCode.create:
|
|
CreateRequest createRequest = new CreateRequest();
|
|
CreateRequest createRequest = new CreateRequest();
|
|
@@ -571,6 +572,19 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
case OpCode.closeSession:
|
|
case OpCode.closeSession:
|
|
pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
|
|
pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
|
|
break;
|
|
break;
|
|
|
|
+
|
|
|
|
+ //All the rest don't need to create a Txn - just verify session
|
|
|
|
+ case OpCode.sync:
|
|
|
|
+ case OpCode.exists:
|
|
|
|
+ case OpCode.getData:
|
|
|
|
+ case OpCode.getACL:
|
|
|
|
+ case OpCode.getChildren:
|
|
|
|
+ case OpCode.getChildren2:
|
|
|
|
+ case OpCode.ping:
|
|
|
|
+ case OpCode.setWatches:
|
|
|
|
+ zks.sessionTracker.checkSession(request.sessionId,
|
|
|
|
+ request.getOwner());
|
|
|
|
+ break;
|
|
}
|
|
}
|
|
} catch (KeeperException e) {
|
|
} catch (KeeperException e) {
|
|
if (request.getHdr() != null) {
|
|
if (request.getHdr() != null) {
|