|
@@ -56,6 +56,7 @@ import org.apache.zookeeper.proto.ReconfigRequest;
|
|
|
import org.apache.zookeeper.proto.SetACLRequest;
|
|
|
import org.apache.zookeeper.proto.SetDataRequest;
|
|
|
import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
|
|
|
+import org.apache.zookeeper.server.ZooKeeperServer.PrecalculatedDigest;
|
|
|
import org.apache.zookeeper.server.auth.ProviderRegistry;
|
|
|
import org.apache.zookeeper.server.auth.ServerAuthenticationProvider;
|
|
|
import org.apache.zookeeper.server.quorum.LeaderZooKeeperServer;
|
|
@@ -76,6 +77,7 @@ import org.apache.zookeeper.txn.MultiTxn;
|
|
|
import org.apache.zookeeper.txn.SetACLTxn;
|
|
|
import org.apache.zookeeper.txn.SetDataTxn;
|
|
|
import org.apache.zookeeper.txn.Txn;
|
|
|
+import org.apache.zookeeper.txn.TxnDigest;
|
|
|
import org.apache.zookeeper.txn.TxnHeader;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -100,9 +102,15 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
|
|
|
LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();
|
|
|
|
|
|
private final RequestProcessor nextProcessor;
|
|
|
+ private final boolean digestEnabled;
|
|
|
+ private DigestCalculator digestCalculator;
|
|
|
|
|
|
ZooKeeperServer zks;
|
|
|
|
|
|
+ public enum DigestOpCode {
|
|
|
+ NOOP, ADD, REMOVE, UPDATE;
|
|
|
+ }
|
|
|
+
|
|
|
public PrepRequestProcessor(ZooKeeperServer zks, RequestProcessor nextProcessor) {
|
|
|
super(
|
|
|
"ProcessThread(sid:" + zks.getServerId()
|
|
@@ -110,6 +118,10 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
|
|
|
+ "):", zks.getZooKeeperServerListener());
|
|
|
this.nextProcessor = nextProcessor;
|
|
|
this.zks = zks;
|
|
|
+ this.digestEnabled = ZooKeeperServer.isDigestEnabled();
|
|
|
+ if (this.digestEnabled) {
|
|
|
+ this.digestCalculator = new DigestCalculator();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -159,6 +171,12 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
|
|
|
children = n.getChildren();
|
|
|
}
|
|
|
lastChange = new ChangeRecord(-1, path, n.stat, children.size(), zks.getZKDatabase().aclForNode(n));
|
|
|
+
|
|
|
+ if (digestEnabled) {
|
|
|
+ lastChange.precalculatedDigest = new PrecalculatedDigest(
|
|
|
+ digestCalculator.calculateDigest(path, n), 0);
|
|
|
+ }
|
|
|
+ lastChange.data = n.getData();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -297,8 +315,12 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
|
|
|
* @param record
|
|
|
*/
|
|
|
protected void pRequest2Txn(int type, long zxid, Request request, Record record, boolean deserialize) throws KeeperException, IOException, RequestProcessorException {
|
|
|
- request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type));
|
|
|
+ if (request.getHdr() == null) {
|
|
|
+ request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
|
|
|
+ Time.currentWallTime(), type));
|
|
|
+ }
|
|
|
|
|
|
+ PrecalculatedDigest precalculatedDigest;
|
|
|
switch (type) {
|
|
|
case OpCode.create:
|
|
|
case OpCode.create2:
|
|
@@ -321,8 +343,15 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
|
|
|
request.setTxn(new DeleteTxn(path));
|
|
|
parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
|
|
|
parentRecord.childCount--;
|
|
|
+ parentRecord.stat.setPzxid(request.getHdr().getZxid());
|
|
|
+ parentRecord.precalculatedDigest = precalculateDigest(
|
|
|
+ DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
|
|
|
addChangeRecord(parentRecord);
|
|
|
- addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, null, -1, null));
|
|
|
+
|
|
|
+ nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, null, -1, null);
|
|
|
+ nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.REMOVE, path);
|
|
|
+ setTxnDigest(request, nodeRecord.precalculatedDigest);
|
|
|
+ addChangeRecord(nodeRecord);
|
|
|
break;
|
|
|
}
|
|
|
case OpCode.delete:
|
|
@@ -343,8 +372,15 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
|
|
|
request.setTxn(new DeleteTxn(path));
|
|
|
parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
|
|
|
parentRecord.childCount--;
|
|
|
+ parentRecord.stat.setPzxid(request.getHdr().getZxid());
|
|
|
+ parentRecord.precalculatedDigest = precalculateDigest(
|
|
|
+ DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
|
|
|
addChangeRecord(parentRecord);
|
|
|
- addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, null, -1, null));
|
|
|
+
|
|
|
+ nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, null, -1, null);
|
|
|
+ nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.REMOVE, path);
|
|
|
+ setTxnDigest(request, nodeRecord.precalculatedDigest);
|
|
|
+ addChangeRecord(nodeRecord);
|
|
|
break;
|
|
|
case OpCode.setData:
|
|
|
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
|
|
@@ -360,6 +396,12 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
|
|
|
request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
|
|
|
nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
|
|
|
nodeRecord.stat.setVersion(newVersion);
|
|
|
+ nodeRecord.stat.setMtime(request.getHdr().getTime());
|
|
|
+ nodeRecord.stat.setMzxid(zxid);
|
|
|
+ nodeRecord.data = setDataRequest.getData();
|
|
|
+ nodeRecord.precalculatedDigest = precalculateDigest(
|
|
|
+ DigestOpCode.UPDATE, path, nodeRecord.data, nodeRecord.stat);
|
|
|
+ setTxnDigest(request, nodeRecord.precalculatedDigest);
|
|
|
addChangeRecord(nodeRecord);
|
|
|
break;
|
|
|
case OpCode.reconfig:
|
|
@@ -490,10 +532,20 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
|
|
|
|
|
|
nodeRecord = getRecordForPath(ZooDefs.CONFIG_NODE);
|
|
|
zks.checkACL(request.cnxn, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo, null, null);
|
|
|
- request.setTxn(new SetDataTxn(ZooDefs.CONFIG_NODE, request.qv.toString().getBytes(), -1));
|
|
|
+ SetDataTxn setDataTxn = new SetDataTxn(ZooDefs.CONFIG_NODE, request.qv.toString().getBytes(), -1);
|
|
|
+ request.setTxn(setDataTxn);
|
|
|
nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
|
|
|
nodeRecord.stat.setVersion(-1);
|
|
|
+ nodeRecord.stat.setMtime(request.getHdr().getTime());
|
|
|
+ nodeRecord.stat.setMzxid(zxid);
|
|
|
+ nodeRecord.data = setDataTxn.getData();
|
|
|
+ // Reconfig is currently a noop from digest computation
|
|
|
+ // perspective since config node is not covered by the digests.
|
|
|
+ nodeRecord.precalculatedDigest = precalculateDigest(
|
|
|
+ DigestOpCode.NOOP, ZooDefs.CONFIG_NODE, nodeRecord.data, nodeRecord.stat);
|
|
|
+ setTxnDigest(request, nodeRecord.precalculatedDigest);
|
|
|
addChangeRecord(nodeRecord);
|
|
|
+
|
|
|
break;
|
|
|
case OpCode.setACL:
|
|
|
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
|
|
@@ -510,6 +562,9 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
|
|
|
request.setTxn(new SetACLTxn(path, listACL, newVersion));
|
|
|
nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
|
|
|
nodeRecord.stat.setAversion(newVersion);
|
|
|
+ nodeRecord.precalculatedDigest = precalculateDigest(
|
|
|
+ DigestOpCode.UPDATE, path, nodeRecord.data, nodeRecord.stat);
|
|
|
+ setTxnDigest(request, nodeRecord.precalculatedDigest);
|
|
|
addChangeRecord(nodeRecord);
|
|
|
break;
|
|
|
case OpCode.createSession:
|
|
@@ -542,7 +597,20 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
|
|
|
}
|
|
|
}
|
|
|
for (String path2Delete : es) {
|
|
|
- addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path2Delete, null, 0, null));
|
|
|
+ if (digestEnabled) {
|
|
|
+ parentPath = getParentPathAndValidate(path2Delete);
|
|
|
+ parentRecord = getRecordForPath(parentPath);
|
|
|
+ parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
|
|
|
+ parentRecord.stat.setPzxid(request.getHdr().getZxid());
|
|
|
+ parentRecord.precalculatedDigest = precalculateDigest(
|
|
|
+ DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
|
|
|
+ addChangeRecord(parentRecord);
|
|
|
+ }
|
|
|
+ nodeRecord = new ChangeRecord(
|
|
|
+ request.getHdr().getZxid(), path2Delete, null, 0, null);
|
|
|
+ nodeRecord.precalculatedDigest = precalculateDigest(
|
|
|
+ DigestOpCode.REMOVE, path2Delete);
|
|
|
+ addChangeRecord(nodeRecord);
|
|
|
}
|
|
|
if (ZooKeeperServer.isCloseSessionTxnEnabled()) {
|
|
|
request.setTxn(new CloseSessionTxn(new ArrayList<String>(es)));
|
|
@@ -569,6 +637,12 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
|
|
|
LOG.warn("unknown type {}", type);
|
|
|
break;
|
|
|
}
|
|
|
+
|
|
|
+ // If the txn is not going to mutate anything, like createSession,
|
|
|
+ // we just set the current tree digest in it
|
|
|
+ if (request.getTxnDigest() == null && digestEnabled) {
|
|
|
+ setTxnDigest(request);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void pRequest2TxnCreate(int type, Request request, Record record, boolean deserialize) throws IOException, KeeperException {
|
|
@@ -628,15 +702,31 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
|
|
|
} else {
|
|
|
request.setTxn(new CreateTxn(path, data, listACL, createMode.isEphemeral(), newCversion));
|
|
|
}
|
|
|
- StatPersisted s = new StatPersisted();
|
|
|
- if (createMode.isEphemeral()) {
|
|
|
- s.setEphemeralOwner(request.sessionId);
|
|
|
+
|
|
|
+ TxnHeader hdr = request.getHdr();
|
|
|
+ long ephemeralOwner = 0;
|
|
|
+ if (createMode.isContainer()) {
|
|
|
+ ephemeralOwner = EphemeralType.CONTAINER_EPHEMERAL_OWNER;
|
|
|
+ } else if (createMode.isTTL()) {
|
|
|
+ ephemeralOwner = EphemeralType.TTL.toEphemeralOwner(ttl);
|
|
|
+ } else if (createMode.isEphemeral()) {
|
|
|
+ ephemeralOwner = request.sessionId;
|
|
|
}
|
|
|
+ StatPersisted s = DataTree.createStat(hdr.getZxid(), hdr.getTime(), ephemeralOwner);
|
|
|
parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
|
|
|
parentRecord.childCount++;
|
|
|
parentRecord.stat.setCversion(newCversion);
|
|
|
+ parentRecord.stat.setPzxid(request.getHdr().getZxid());
|
|
|
+ parentRecord.precalculatedDigest = precalculateDigest(
|
|
|
+ DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
|
|
|
addChangeRecord(parentRecord);
|
|
|
- addChangeRecord(new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL));
|
|
|
+ ChangeRecord nodeRecord = new ChangeRecord(
|
|
|
+ request.getHdr().getZxid(), path, s, 0, listACL);
|
|
|
+ nodeRecord.data = data;
|
|
|
+ nodeRecord.precalculatedDigest = precalculateDigest(
|
|
|
+ DigestOpCode.ADD, path, nodeRecord.data, s);
|
|
|
+ setTxnDigest(request, nodeRecord.precalculatedDigest);
|
|
|
+ addChangeRecord(nodeRecord);
|
|
|
}
|
|
|
|
|
|
private void validatePath(String path, long sessionId) throws BadArgumentsException {
|
|
@@ -724,6 +814,8 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
|
|
|
|
|
|
//Store off current pending change records in case we need to rollback
|
|
|
Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);
|
|
|
+ request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid,
|
|
|
+ Time.currentWallTime(), request.type));
|
|
|
|
|
|
for (Op op : multiRequest) {
|
|
|
Record subrequest = op.toRequestRecord();
|
|
@@ -741,7 +833,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
|
|
|
/* Prep the request and convert to a Txn */
|
|
|
try {
|
|
|
pRequest2Txn(op.getType(), zxid, request, subrequest, false);
|
|
|
- type = request.getHdr().getType();
|
|
|
+ type = op.getType();
|
|
|
txn = request.getTxn();
|
|
|
} catch (KeeperException e) {
|
|
|
ke = e;
|
|
@@ -774,8 +866,10 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- request.setHdr(new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), request.type));
|
|
|
request.setTxn(new MultiTxn(txns));
|
|
|
+ if (digestEnabled) {
|
|
|
+ setTxnDigest(request);
|
|
|
+ }
|
|
|
|
|
|
break;
|
|
|
|
|
@@ -956,4 +1050,81 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements Req
|
|
|
nextProcessor.shutdown();
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Calculate the node digest and tree digest after the change.
|
|
|
+ *
|
|
|
+ * @param type the type of operations about the digest change
|
|
|
+ * @param path the path of the node
|
|
|
+ * @param data the data of the node
|
|
|
+ * @param s the stat of the node
|
|
|
+ *
|
|
|
+ * @return PrecalculatedDigest the pair of node and tree digest
|
|
|
+ */
|
|
|
+ private PrecalculatedDigest precalculateDigest(DigestOpCode type, String path,
|
|
|
+ byte[] data, StatPersisted s) throws KeeperException.NoNodeException {
|
|
|
+
|
|
|
+ if (!digestEnabled) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ long prevNodeDigest;
|
|
|
+ long newNodeDigest;
|
|
|
+
|
|
|
+ switch (type) {
|
|
|
+ case ADD:
|
|
|
+ prevNodeDigest = 0;
|
|
|
+ newNodeDigest = digestCalculator.calculateDigest(path, data, s);
|
|
|
+ break;
|
|
|
+ case REMOVE:
|
|
|
+ prevNodeDigest = getRecordForPath(path).precalculatedDigest.nodeDigest;
|
|
|
+ newNodeDigest = 0;
|
|
|
+ break;
|
|
|
+ case UPDATE:
|
|
|
+ prevNodeDigest = getRecordForPath(path).precalculatedDigest.nodeDigest;
|
|
|
+ newNodeDigest = digestCalculator.calculateDigest(path, data, s);
|
|
|
+ break;
|
|
|
+ case NOOP:
|
|
|
+ newNodeDigest = prevNodeDigest = 0;
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ long treeDigest = getCurrentTreeDigest() - prevNodeDigest + newNodeDigest;
|
|
|
+ return new PrecalculatedDigest(newNodeDigest, treeDigest);
|
|
|
+ }
|
|
|
+
|
|
|
+ private PrecalculatedDigest precalculateDigest(
|
|
|
+ DigestOpCode type, String path) throws KeeperException.NoNodeException {
|
|
|
+ return precalculateDigest(type, path, null, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Query the current tree digest from DataTree or outstandingChanges list.
|
|
|
+ *
|
|
|
+ * @return current tree digest
|
|
|
+ */
|
|
|
+ private long getCurrentTreeDigest() {
|
|
|
+ long digest;
|
|
|
+ synchronized (zks.outstandingChanges) {
|
|
|
+ if (zks.outstandingChanges.isEmpty()) {
|
|
|
+ digest = zks.getZKDatabase().getDataTree().getTreeDigest();
|
|
|
+ LOG.debug("Digest got from data tree is: {}", digest);
|
|
|
+ } else {
|
|
|
+ digest = zks.outstandingChanges.peekLast().precalculatedDigest.treeDigest;
|
|
|
+ LOG.debug("Digest got from outstandingChanges is: {}", digest);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return digest;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void setTxnDigest(Request request) {
|
|
|
+ request.setTxnDigest(new TxnDigest(digestCalculator.getDigestVersion(), getCurrentTreeDigest()));
|
|
|
+ }
|
|
|
+
|
|
|
+ private void setTxnDigest(Request request, PrecalculatedDigest preCalculatedDigest) {
|
|
|
+ if (preCalculatedDigest == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ request.setTxnDigest(new TxnDigest(digestCalculator.getDigestVersion(), preCalculatedDigest.treeDigest));
|
|
|
+ }
|
|
|
}
|