|
@@ -168,6 +168,7 @@ public class FinalRequestProcessor implements RequestProcessor {
|
|
|
zks.decInProcess();
|
|
|
Code err = Code.OK;
|
|
|
Record rsp = null;
|
|
|
+ String path = null;
|
|
|
try {
|
|
|
if (request.getHdr() != null && request.getHdr().getType() == OpCode.error) {
|
|
|
/*
|
|
@@ -316,7 +317,7 @@ public class FinalRequestProcessor implements RequestProcessor {
|
|
|
ExistsRequest existsRequest = new ExistsRequest();
|
|
|
ByteBufferInputStream.byteBuffer2Record(request.request,
|
|
|
existsRequest);
|
|
|
- String path = existsRequest.getPath();
|
|
|
+ path = existsRequest.getPath();
|
|
|
if (path.indexOf('\0') != -1) {
|
|
|
throw new KeeperException.BadArgumentsException();
|
|
|
}
|
|
@@ -330,15 +331,16 @@ public class FinalRequestProcessor implements RequestProcessor {
|
|
|
GetDataRequest getDataRequest = new GetDataRequest();
|
|
|
ByteBufferInputStream.byteBuffer2Record(request.request,
|
|
|
getDataRequest);
|
|
|
- DataNode n = zks.getZKDatabase().getNode(getDataRequest.getPath());
|
|
|
+ path = getDataRequest.getPath();
|
|
|
+ DataNode n = zks.getZKDatabase().getNode(path);
|
|
|
if (n == null) {
|
|
|
throw new KeeperException.NoNodeException();
|
|
|
}
|
|
|
PrepRequestProcessor.checkACL(zks, request.cnxn, zks.getZKDatabase().aclForNode(n),
|
|
|
ZooDefs.Perms.READ,
|
|
|
- request.authInfo, getDataRequest.getPath(), null);
|
|
|
+ request.authInfo, path, null);
|
|
|
Stat stat = new Stat();
|
|
|
- byte b[] = zks.getZKDatabase().getData(getDataRequest.getPath(), stat,
|
|
|
+ byte b[] = zks.getZKDatabase().getData(path, stat,
|
|
|
getDataRequest.getWatch() ? cnxn : null);
|
|
|
rsp = new GetDataResponse(b, stat);
|
|
|
break;
|
|
@@ -362,8 +364,9 @@ public class FinalRequestProcessor implements RequestProcessor {
|
|
|
ByteBufferInputStream.byteBuffer2Record(request.request,
|
|
|
getACLRequest);
|
|
|
Stat stat = new Stat();
|
|
|
+ path = getACLRequest.getPath();
|
|
|
List<ACL> acl =
|
|
|
- zks.getZKDatabase().getACL(getACLRequest.getPath(), stat);
|
|
|
+ zks.getZKDatabase().getACL(path, stat);
|
|
|
rsp = new GetACLResponse(acl, stat);
|
|
|
break;
|
|
|
}
|
|
@@ -372,15 +375,16 @@ public class FinalRequestProcessor implements RequestProcessor {
|
|
|
GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
|
|
|
ByteBufferInputStream.byteBuffer2Record(request.request,
|
|
|
getChildrenRequest);
|
|
|
- DataNode n = zks.getZKDatabase().getNode(getChildrenRequest.getPath());
|
|
|
+ path = getChildrenRequest.getPath();
|
|
|
+ DataNode n = zks.getZKDatabase().getNode(path);
|
|
|
if (n == null) {
|
|
|
throw new KeeperException.NoNodeException();
|
|
|
}
|
|
|
PrepRequestProcessor.checkACL(zks, request.cnxn, zks.getZKDatabase().aclForNode(n),
|
|
|
ZooDefs.Perms.READ,
|
|
|
- request.authInfo, getChildrenRequest.getPath(), null);
|
|
|
+ request.authInfo, path, null);
|
|
|
List<String> children = zks.getZKDatabase().getChildren(
|
|
|
- getChildrenRequest.getPath(), null, getChildrenRequest
|
|
|
+ path, null, getChildrenRequest
|
|
|
.getWatch() ? cnxn : null);
|
|
|
rsp = new GetChildrenResponse(children);
|
|
|
break;
|
|
@@ -391,15 +395,16 @@ public class FinalRequestProcessor implements RequestProcessor {
|
|
|
ByteBufferInputStream.byteBuffer2Record(request.request,
|
|
|
getChildren2Request);
|
|
|
Stat stat = new Stat();
|
|
|
- DataNode n = zks.getZKDatabase().getNode(getChildren2Request.getPath());
|
|
|
+ path = getChildren2Request.getPath();
|
|
|
+ DataNode n = zks.getZKDatabase().getNode(path);
|
|
|
if (n == null) {
|
|
|
throw new KeeperException.NoNodeException();
|
|
|
}
|
|
|
PrepRequestProcessor.checkACL(zks, request.cnxn, zks.getZKDatabase().aclForNode(n),
|
|
|
ZooDefs.Perms.READ,
|
|
|
- request.authInfo, getChildren2Request.getPath(), null);
|
|
|
+ request.authInfo, path, null);
|
|
|
List<String> children = zks.getZKDatabase().getChildren(
|
|
|
- getChildren2Request.getPath(), stat, getChildren2Request
|
|
|
+ path, stat, getChildren2Request
|
|
|
.getWatch() ? cnxn : null);
|
|
|
rsp = new GetChildren2Response(children, stat);
|
|
|
break;
|
|
@@ -410,11 +415,12 @@ public class FinalRequestProcessor implements RequestProcessor {
|
|
|
ByteBufferInputStream.byteBuffer2Record(request.request,
|
|
|
checkWatches);
|
|
|
WatcherType type = WatcherType.fromInt(checkWatches.getType());
|
|
|
+ path = checkWatches.getPath();
|
|
|
boolean containsWatcher = zks.getZKDatabase().containsWatcher(
|
|
|
- checkWatches.getPath(), type, cnxn);
|
|
|
+ path, type, cnxn);
|
|
|
if (!containsWatcher) {
|
|
|
String msg = String.format(Locale.ENGLISH, "%s (type: %s)",
|
|
|
- checkWatches.getPath(), type);
|
|
|
+ path, type);
|
|
|
throw new KeeperException.NoWatcherException(msg);
|
|
|
}
|
|
|
break;
|
|
@@ -425,11 +431,12 @@ public class FinalRequestProcessor implements RequestProcessor {
|
|
|
ByteBufferInputStream.byteBuffer2Record(request.request,
|
|
|
removeWatches);
|
|
|
WatcherType type = WatcherType.fromInt(removeWatches.getType());
|
|
|
+ path = removeWatches.getPath();
|
|
|
boolean removed = zks.getZKDatabase().removeWatch(
|
|
|
- removeWatches.getPath(), type, cnxn);
|
|
|
+ path, type, cnxn);
|
|
|
if (!removed) {
|
|
|
String msg = String.format(Locale.ENGLISH, "%s (type: %s)",
|
|
|
- removeWatches.getPath(), type);
|
|
|
+ path, type);
|
|
|
throw new KeeperException.NoWatcherException(msg);
|
|
|
}
|
|
|
break;
|
|
@@ -468,7 +475,19 @@ public class FinalRequestProcessor implements RequestProcessor {
|
|
|
updateStats(request, lastOp, lastZxid);
|
|
|
|
|
|
try {
|
|
|
- cnxn.sendResponse(hdr, rsp, "response");
|
|
|
+ if (request.type == OpCode.getData && path != null && rsp != null) {
|
|
|
+ // Serialized read responses could be cached by the connection object.
|
|
|
+ // Cache entries are identified by their path and last modified zxid,
|
|
|
+ // so these values are passed along with the response.
|
|
|
+ GetDataResponse getDataResponse = (GetDataResponse)rsp;
|
|
|
+ Stat stat = null;
|
|
|
+ if (getDataResponse != null && getDataResponse.getStat() != null) {
|
|
|
+ stat = getDataResponse.getStat();
|
|
|
+ }
|
|
|
+ cnxn.sendResponse(hdr, rsp, "response", path, stat);
|
|
|
+ } else {
|
|
|
+ cnxn.sendResponse(hdr, rsp, "response");
|
|
|
+ }
|
|
|
if (request.type == OpCode.closeSession) {
|
|
|
cnxn.sendCloseSession();
|
|
|
}
|