|
@@ -21,13 +21,13 @@ package org.apache.zookeeper.server;
|
|
|
import java.io.ByteArrayOutputStream;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.util.ArrayList;
|
|
|
-import java.util.HashSet;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Iterator;
|
|
|
import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
import java.util.ListIterator;
|
|
|
import java.util.Locale;
|
|
|
+import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
|
@@ -92,12 +92,11 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
|
|
|
LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();
|
|
|
|
|
|
- RequestProcessor nextProcessor;
|
|
|
+ private final RequestProcessor nextProcessor;
|
|
|
|
|
|
ZooKeeperServer zks;
|
|
|
|
|
|
- public PrepRequestProcessor(ZooKeeperServer zks,
|
|
|
- RequestProcessor nextProcessor) {
|
|
|
+ public PrepRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {
|
|
|
super("ProcessThread(sid:" + zks.getServerId()
|
|
|
+ " cport:" + zks.getClientPort() + "):");
|
|
|
this.nextProcessor = nextProcessor;
|
|
@@ -134,7 +133,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
LOG.info("PrepRequestProcessor exited loop!");
|
|
|
}
|
|
|
|
|
|
- ChangeRecord getRecordForPath(String path) throws KeeperException.NoNodeException {
|
|
|
+ private ChangeRecord getRecordForPath(String path) throws KeeperException.NoNodeException {
|
|
|
ChangeRecord lastChange = null;
|
|
|
synchronized (zks.outstandingChanges) {
|
|
|
lastChange = zks.outstandingChangesForPath.get(path);
|
|
@@ -167,7 +166,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
return lastChange;
|
|
|
}
|
|
|
|
|
|
- void addChangeRecord(ChangeRecord c) {
|
|
|
+ private void addChangeRecord(ChangeRecord c) {
|
|
|
synchronized (zks.outstandingChanges) {
|
|
|
zks.outstandingChanges.add(c);
|
|
|
zks.outstandingChangesForPath.put(c.path, c);
|
|
@@ -182,21 +181,21 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
*
|
|
|
* @param multiRequest
|
|
|
*/
|
|
|
- HashMap<String, ChangeRecord> getPendingChanges(MultiTransactionRecord multiRequest) {
|
|
|
- HashMap<String, ChangeRecord> pendingChangeRecords = new HashMap<String, ChangeRecord>();
|
|
|
+ private Map<String, ChangeRecord> getPendingChanges(MultiTransactionRecord multiRequest) {
|
|
|
+ HashMap<String, ChangeRecord> pendingChangeRecords = new HashMap<String, ChangeRecord>();
|
|
|
|
|
|
for(Op op: multiRequest) {
|
|
|
- String path = op.getPath();
|
|
|
-
|
|
|
- try {
|
|
|
- ChangeRecord cr = getRecordForPath(path);
|
|
|
- if (cr != null) {
|
|
|
- pendingChangeRecords.put(path, cr);
|
|
|
- }
|
|
|
- } catch (KeeperException.NoNodeException e) {
|
|
|
- // ignore this one
|
|
|
- }
|
|
|
- }
|
|
|
+ String path = op.getPath();
|
|
|
+
|
|
|
+ try {
|
|
|
+ ChangeRecord cr = getRecordForPath(path);
|
|
|
+ if (cr != null) {
|
|
|
+ pendingChangeRecords.put(path, cr);
|
|
|
+ }
|
|
|
+ } catch (KeeperException.NoNodeException e) {
|
|
|
+ // ignore this one
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
return pendingChangeRecords;
|
|
|
}
|
|
@@ -211,7 +210,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
* @param zxid
|
|
|
* @param pendingChangeRecords
|
|
|
*/
|
|
|
- void rollbackPendingChanges(long zxid, HashMap<String, ChangeRecord>pendingChangeRecords) {
|
|
|
+ void rollbackPendingChanges(long zxid, Map<String, ChangeRecord>pendingChangeRecords) {
|
|
|
|
|
|
synchronized (zks.outstandingChanges) {
|
|
|
// Grab a list iterator starting at the END of the list so we can iterate in reverse
|
|
@@ -291,8 +290,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
* @param record
|
|
|
*/
|
|
|
protected void pRequest2Txn(int type, long zxid, Request request, Record record) throws KeeperException {
|
|
|
- request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid,
|
|
|
- zks.getTime(), type);
|
|
|
+ request.hdr = new TxnHeader(request.sessionId, request.cxid, zxid, zks.getTime(), type);
|
|
|
|
|
|
switch (type) {
|
|
|
case OpCode.create:
|
|
@@ -312,11 +310,9 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
String parentPath = path.substring(0, lastSlash);
|
|
|
ChangeRecord parentRecord = getRecordForPath(parentPath);
|
|
|
|
|
|
- checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE,
|
|
|
- request.authInfo);
|
|
|
+ checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE, request.authInfo);
|
|
|
int parentCVersion = parentRecord.stat.getCversion();
|
|
|
- CreateMode createMode =
|
|
|
- CreateMode.fromFlag(createRequest.getFlags());
|
|
|
+ CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
|
|
|
if (createMode.isSequential()) {
|
|
|
path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
|
|
|
}
|
|
@@ -339,8 +335,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
throw new KeeperException.NoChildrenForEphemeralsException(path);
|
|
|
}
|
|
|
int newCversion = parentRecord.stat.getCversion()+1;
|
|
|
- request.txn = new CreateTxn(path, createRequest.getData(),
|
|
|
- listACL,
|
|
|
+ request.txn = new CreateTxn(path, createRequest.getData(), listACL,
|
|
|
createMode.isEphemeral(), newCversion);
|
|
|
StatPersisted s = new StatPersisted();
|
|
|
if (createMode.isEphemeral()) {
|
|
@@ -350,8 +345,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
parentRecord.childCount++;
|
|
|
parentRecord.stat.setCversion(newCversion);
|
|
|
addChangeRecord(parentRecord);
|
|
|
- addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path, s,
|
|
|
- 0, listACL));
|
|
|
+ addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path, s, 0, listACL));
|
|
|
break;
|
|
|
case OpCode.delete:
|
|
|
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
|
|
@@ -365,12 +359,8 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
parentPath = path.substring(0, lastSlash);
|
|
|
parentRecord = getRecordForPath(parentPath);
|
|
|
ChangeRecord nodeRecord = getRecordForPath(path);
|
|
|
- checkACL(zks, parentRecord.acl, ZooDefs.Perms.DELETE,
|
|
|
- request.authInfo);
|
|
|
- int version = deleteRequest.getVersion();
|
|
|
- if (version != -1 && nodeRecord.stat.getVersion() != version) {
|
|
|
- throw new KeeperException.BadVersionException(path);
|
|
|
- }
|
|
|
+ checkACL(zks, parentRecord.acl, ZooDefs.Perms.DELETE, request.authInfo);
|
|
|
+ checkAndIncVersion(nodeRecord.stat.getVersion(), deleteRequest.getVersion(), path);
|
|
|
if (nodeRecord.childCount > 0) {
|
|
|
throw new KeeperException.NotEmptyException(path);
|
|
|
}
|
|
@@ -378,25 +368,18 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
parentRecord = parentRecord.duplicate(request.hdr.getZxid());
|
|
|
parentRecord.childCount--;
|
|
|
addChangeRecord(parentRecord);
|
|
|
- addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path,
|
|
|
- null, -1, null));
|
|
|
+ addChangeRecord(new ChangeRecord(request.hdr.getZxid(), path, null, -1, null));
|
|
|
break;
|
|
|
case OpCode.setData:
|
|
|
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
|
|
|
SetDataRequest setDataRequest = (SetDataRequest)record;
|
|
|
path = setDataRequest.getPath();
|
|
|
nodeRecord = getRecordForPath(path);
|
|
|
- checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE,
|
|
|
- request.authInfo);
|
|
|
- version = setDataRequest.getVersion();
|
|
|
- int currentVersion = nodeRecord.stat.getVersion();
|
|
|
- if (version != -1 && version != currentVersion) {
|
|
|
- throw new KeeperException.BadVersionException(path);
|
|
|
- }
|
|
|
- version = currentVersion + 1;
|
|
|
- request.txn = new SetDataTxn(path, setDataRequest.getData(), version);
|
|
|
+ checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo);
|
|
|
+ int newVersion = checkAndIncVersion(nodeRecord.stat.getVersion(), setDataRequest.getVersion(), path);
|
|
|
+ request.txn = new SetDataTxn(path, setDataRequest.getData(), newVersion);
|
|
|
nodeRecord = nodeRecord.duplicate(request.hdr.getZxid());
|
|
|
- nodeRecord.stat.setVersion(version);
|
|
|
+ nodeRecord.stat.setVersion(newVersion);
|
|
|
addChangeRecord(nodeRecord);
|
|
|
break;
|
|
|
case OpCode.setACL:
|
|
@@ -408,17 +391,11 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
throw new KeeperException.InvalidACLException(path);
|
|
|
}
|
|
|
nodeRecord = getRecordForPath(path);
|
|
|
- checkACL(zks, nodeRecord.acl, ZooDefs.Perms.ADMIN,
|
|
|
- request.authInfo);
|
|
|
- version = setAclRequest.getVersion();
|
|
|
- currentVersion = nodeRecord.stat.getAversion();
|
|
|
- if (version != -1 && version != currentVersion) {
|
|
|
- throw new KeeperException.BadVersionException(path);
|
|
|
- }
|
|
|
- version = currentVersion + 1;
|
|
|
- request.txn = new SetACLTxn(path, listACL, version);
|
|
|
+ checkACL(zks, nodeRecord.acl, ZooDefs.Perms.ADMIN, request.authInfo);
|
|
|
+ newVersion = checkAndIncVersion(nodeRecord.stat.getAversion(), setAclRequest.getVersion(), path);
|
|
|
+ request.txn = new SetACLTxn(path, listACL, newVersion);
|
|
|
nodeRecord = nodeRecord.duplicate(request.hdr.getZxid());
|
|
|
- nodeRecord.stat.setAversion(version);
|
|
|
+ nodeRecord.stat.setAversion(newVersion);
|
|
|
addChangeRecord(nodeRecord);
|
|
|
break;
|
|
|
case OpCode.createSession:
|
|
@@ -458,19 +435,21 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
CheckVersionRequest checkVersionRequest = (CheckVersionRequest)record;
|
|
|
path = checkVersionRequest.getPath();
|
|
|
nodeRecord = getRecordForPath(path);
|
|
|
- checkACL(zks, nodeRecord.acl, ZooDefs.Perms.READ,
|
|
|
- request.authInfo);
|
|
|
- version = checkVersionRequest.getVersion();
|
|
|
- currentVersion = nodeRecord.stat.getVersion();
|
|
|
- if (version != -1 && version != currentVersion) {
|
|
|
- throw new KeeperException.BadVersionException(path);
|
|
|
- }
|
|
|
- version = currentVersion + 1;
|
|
|
- request.txn = new CheckVersionTxn(path, version);
|
|
|
+ checkACL(zks, nodeRecord.acl, ZooDefs.Perms.READ, request.authInfo);
|
|
|
+ request.txn = new CheckVersionTxn(path,
|
|
|
+ checkAndIncVersion(nodeRecord.stat.getVersion(), checkVersionRequest.getVersion(), path));
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private static int checkAndIncVersion(int currentVersion, int expectedVersion, String path)
|
|
|
+ throws KeeperException.BadVersionException {
|
|
|
+ if (expectedVersion != -1 && expectedVersion != currentVersion) {
|
|
|
+ throw new KeeperException.BadVersionException(path);
|
|
|
+ }
|
|
|
+ return currentVersion + 1;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* This method will be called inside the ProcessRequestThread, which is a
|
|
|
* singleton, so there will be a single thread calling this code.
|
|
@@ -520,7 +499,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
KeeperException ke = null;
|
|
|
|
|
|
//Store off current pending change records in case we need to rollback
|
|
|
- HashMap<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);
|
|
|
+ Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);
|
|
|
|
|
|
for(Op op: multiRequest) {
|
|
|
Record subrequest = op.toRequestRecord() ;
|
|
@@ -688,8 +667,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
return false;
|
|
|
}
|
|
|
} else {
|
|
|
- AuthenticationProvider ap = ProviderRegistry.getProvider(id
|
|
|
- .getScheme());
|
|
|
+ AuthenticationProvider ap = ProviderRegistry.getProvider(id.getScheme());
|
|
|
if (ap == null) {
|
|
|
return false;
|
|
|
}
|
|
@@ -707,7 +685,6 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
|
|
|
}
|
|
|
|
|
|
public void processRequest(Request request) {
|
|
|
- // request.addRQRec(">prep="+zks.outstandingChanges.size());
|
|
|
submittedRequests.add(request);
|
|
|
}
|
|
|
|