|
@@ -208,7 +208,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
case OpCode.create:
|
|
|
txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
|
|
|
.getNextZxid(), zks.getTime(), OpCode.create);
|
|
|
- zks.sessionTracker.checkSession(request.sessionId);
|
|
|
+ zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
|
|
|
CreateRequest createRequest = new CreateRequest();
|
|
|
ZooKeeperServer.byteBuffer2Record(request.request,
|
|
|
createRequest);
|
|
@@ -270,7 +270,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
case OpCode.delete:
|
|
|
txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
|
|
|
.getNextZxid(), zks.getTime(), OpCode.delete);
|
|
|
- zks.sessionTracker.checkSession(request.sessionId);
|
|
|
+ zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
|
|
|
DeleteRequest deleteRequest = new DeleteRequest();
|
|
|
ZooKeeperServer.byteBuffer2Record(request.request,
|
|
|
deleteRequest);
|
|
@@ -304,7 +304,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
case OpCode.setData:
|
|
|
txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
|
|
|
.getNextZxid(), zks.getTime(), OpCode.setData);
|
|
|
- zks.sessionTracker.checkSession(request.sessionId);
|
|
|
+ zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
|
|
|
SetDataRequest setDataRequest = new SetDataRequest();
|
|
|
ZooKeeperServer.byteBuffer2Record(request.request,
|
|
|
setDataRequest);
|
|
@@ -326,7 +326,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
case OpCode.setACL:
|
|
|
txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
|
|
|
.getNextZxid(), zks.getTime(), OpCode.setACL);
|
|
|
- zks.sessionTracker.checkSession(request.sessionId);
|
|
|
+ zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
|
|
|
SetACLRequest setAclRequest = new SetACLRequest();
|
|
|
ZooKeeperServer.byteBuffer2Record(request.request,
|
|
|
setAclRequest);
|
|
@@ -356,10 +356,12 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
txn = new CreateSessionTxn(to);
|
|
|
request.request.rewind();
|
|
|
zks.sessionTracker.addSession(request.sessionId, to);
|
|
|
+ zks.sessionTracker.setOwner(request.sessionId, request.getOwner());
|
|
|
break;
|
|
|
case OpCode.closeSession:
|
|
|
txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
|
|
|
.getNextZxid(), zks.getTime(), OpCode.closeSession);
|
|
|
+ zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
|
|
|
HashSet<String> es = zks.dataTree
|
|
|
.getEphemerals(request.sessionId);
|
|
|
synchronized (zks.outstandingChanges) {
|
|
@@ -386,6 +388,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
case OpCode.getChildren:
|
|
|
case OpCode.ping:
|
|
|
case OpCode.setWatches:
|
|
|
+ zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
|
|
|
break;
|
|
|
}
|
|
|
} catch (KeeperException e) {
|
|
@@ -393,6 +396,8 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
txnHeader.setType(OpCode.error);
|
|
|
txn = new ErrorTxn(e.code().intValue());
|
|
|
}
|
|
|
+ LOG.warn("Got exception when processing " + request.toString(), e);
|
|
|
+ request.setException(e);
|
|
|
} catch (Exception e) {
|
|
|
// log at error level as we are returning a marshalling
|
|
|
// error to the user
|