|
@@ -21,6 +21,7 @@ package org.apache.zookeeper.server;
|
|
|
import org.apache.jute.InputArchive;
|
|
|
import org.apache.jute.OutputArchive;
|
|
|
import org.apache.jute.Record;
|
|
|
+import org.apache.zookeeper.DigestWatcher;
|
|
|
import org.apache.zookeeper.KeeperException;
|
|
|
import org.apache.zookeeper.KeeperException.Code;
|
|
|
import org.apache.zookeeper.KeeperException.NoNodeException;
|
|
@@ -39,6 +40,7 @@ import org.apache.zookeeper.common.PathTrie;
|
|
|
import org.apache.zookeeper.data.ACL;
|
|
|
import org.apache.zookeeper.data.Stat;
|
|
|
import org.apache.zookeeper.data.StatPersisted;
|
|
|
+import org.apache.zookeeper.server.util.DigestCalculator;
|
|
|
import org.apache.zookeeper.server.watch.IWatchManager;
|
|
|
import org.apache.zookeeper.server.watch.WatchManagerFactory;
|
|
|
import org.apache.zookeeper.server.watch.WatcherOrBitSet;
|
|
@@ -59,6 +61,7 @@ import org.apache.zookeeper.txn.TxnHeader;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
+import java.io.EOFException;
|
|
|
import java.io.IOException;
|
|
|
import java.io.PrintWriter;
|
|
|
import java.nio.ByteBuffer;
|
|
@@ -67,6 +70,7 @@ import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
+import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Map.Entry;
|
|
@@ -86,12 +90,13 @@ import java.util.concurrent.atomic.AtomicLong;
|
|
|
public class DataTree {
|
|
|
private static final Logger LOG = LoggerFactory.getLogger(DataTree.class);
|
|
|
|
|
|
+ private final RateLogger RATE_LOGGER = new RateLogger(LOG, 15 * 60 * 1000);
|
|
|
+
|
|
|
/**
|
|
|
- * This hashtable provides a fast lookup to the datanodes. The tree is the
|
|
|
+ * This map provides a fast lookup to the datanodes. The tree is the
|
|
|
* source of truth and is where all the locking occurs
|
|
|
*/
|
|
|
- private final ConcurrentHashMap<String, DataNode> nodes =
|
|
|
- new ConcurrentHashMap<String, DataNode>();
|
|
|
+ private final NodeHashMap nodes = new NodeHashMapImpl();
|
|
|
|
|
|
private IWatchManager dataWatches;
|
|
|
|
|
@@ -159,6 +164,26 @@ public class DataTree {
|
|
|
|
|
|
private final ReferenceCountedACLCache aclCache = new ReferenceCountedACLCache();
|
|
|
|
|
|
+ // The maximum number of tree digests that we will keep in our history
|
|
|
+ public static final int DIGEST_LOG_LIMIT = 1024;
|
|
|
+
|
|
|
+ // Dump digest every 128 txns, in hex it's 80, which will make it easier
|
|
|
+ // to align and compare between servers.
|
|
|
+ public static final int DIGEST_LOG_INTERVAL = 128;
|
|
|
+
|
|
|
+ // If this is not null, we are actively looking for a target zxid that we
|
|
|
+ // want to validate the digest for
|
|
|
+ private ZxidDigest digestFromLoadedSnapshot;
|
|
|
+
|
|
|
+ // The digest associated with the highest zxid in the data tree.
|
|
|
+ private volatile ZxidDigest lastProcessedZxidDigest;
|
|
|
+
|
|
|
+ // Will be notified when digest mismatch event triggered.
|
|
|
+ private final List<DigestWatcher> digestWatchers = new ArrayList<>();
|
|
|
+
|
|
|
+ // The historical digests list.
|
|
|
+ private LinkedList<ZxidDigest> digestLog = new LinkedList<>();
|
|
|
+
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public Set<String> getEphemerals(long sessionId) {
|
|
|
HashSet<String> retv = ephemerals.get(sessionId);
|
|
@@ -252,7 +277,7 @@ public class DataTree {
|
|
|
public DataTree() {
|
|
|
/* Rather than fight it, let root have an alias */
|
|
|
nodes.put("", root);
|
|
|
- nodes.put(rootZookeeper, root);
|
|
|
+ nodes.putWithoutDigest(rootZookeeper, root);
|
|
|
|
|
|
/** add the proc node and quota node */
|
|
|
root.addChild(procChildZookeeper);
|
|
@@ -468,6 +493,7 @@ public class DataTree {
|
|
|
throw new KeeperException.NodeExistsException();
|
|
|
}
|
|
|
|
|
|
+ nodes.preChange(parentName, parent);
|
|
|
if (parentCVersion == -1) {
|
|
|
parentCVersion = parent.stat.getCversion();
|
|
|
parentCVersion++;
|
|
@@ -483,6 +509,7 @@ public class DataTree {
|
|
|
}
|
|
|
DataNode child = new DataNode(data, longval, stat);
|
|
|
parent.addChild(childName);
|
|
|
+ nodes.postChange(parentName, parent);
|
|
|
nodeDataSize.addAndGet(getNodeSize(path, child.data));
|
|
|
nodes.put(path, child);
|
|
|
EphemeralType ephemeralType = EphemeralType.get(ephemeralOwner);
|
|
@@ -553,6 +580,7 @@ public class DataTree {
|
|
|
throw new KeeperException.NoNodeException();
|
|
|
}
|
|
|
synchronized (parent) {
|
|
|
+ nodes.preChange(parentName, parent);
|
|
|
parent.removeChild(childName);
|
|
|
// Only update pzxid when the zxid is larger than the current pzxid,
|
|
|
// otherwise we might override some higher pzxid set by a create
|
|
@@ -560,6 +588,7 @@ public class DataTree {
|
|
|
if (zxid > parent.stat.getPzxid()) {
|
|
|
parent.stat.setPzxid(zxid);
|
|
|
}
|
|
|
+ nodes.postChange(parentName, parent);
|
|
|
}
|
|
|
|
|
|
DataNode node = nodes.get(path);
|
|
@@ -634,11 +663,13 @@ public class DataTree {
|
|
|
byte lastdata[] = null;
|
|
|
synchronized (n) {
|
|
|
lastdata = n.data;
|
|
|
+ nodes.preChange(path, n);
|
|
|
n.data = data;
|
|
|
n.stat.setMtime(time);
|
|
|
n.stat.setMzxid(zxid);
|
|
|
n.stat.setVersion(version);
|
|
|
n.copyStat(s);
|
|
|
+ nodes.postChange(path, n);
|
|
|
}
|
|
|
// now update if the path is in a quota subtree.
|
|
|
String lastPrefix = getMaxPrefixWithQuota(path);
|
|
@@ -754,9 +785,11 @@ public class DataTree {
|
|
|
}
|
|
|
synchronized (n) {
|
|
|
aclCache.removeUsage(n.acl);
|
|
|
+ nodes.preChange(path, n);
|
|
|
n.stat.setAversion(version);
|
|
|
n.acl = aclCache.convertAcls(acl);
|
|
|
n.copyStat(stat);
|
|
|
+ nodes.postChange(path, n);
|
|
|
return stat;
|
|
|
}
|
|
|
}
|
|
@@ -1008,6 +1041,39 @@ public class DataTree {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /*
|
|
|
+ * Snapshots are taken lazily. It can happen that the child
|
|
|
+ * znodes of a parent are created after the parent
|
|
|
+ * is serialized. Therefore, while replaying logs during restore, a
|
|
|
+ * create might fail because the node was already
|
|
|
+ * created.
|
|
|
+ *
|
|
|
+ * After seeing this failure, we should increment
|
|
|
+ * the cversion of the parent znode since the parent was serialized
|
|
|
+ * before its children.
|
|
|
+ *
|
|
|
+ * Note, such failures on DT should be seen only during
|
|
|
+ * restore.
|
|
|
+ */
|
|
|
+ if (header.getType() == OpCode.create &&
|
|
|
+ rc.err == Code.NODEEXISTS.intValue()) {
|
|
|
+ LOG.debug("Adjusting parent cversion for Txn: " + header.getType() +
|
|
|
+ " path:" + rc.path + " err: " + rc.err);
|
|
|
+ int lastSlash = rc.path.lastIndexOf('/');
|
|
|
+ String parentName = rc.path.substring(0, lastSlash);
|
|
|
+ CreateTxn cTxn = (CreateTxn)txn;
|
|
|
+ try {
|
|
|
+ setCversionPzxid(parentName, cTxn.getParentCVersion(),
|
|
|
+ header.getZxid());
|
|
|
+ } catch (KeeperException.NoNodeException e) {
|
|
|
+ LOG.error("Failed to set parent cversion for: " +
|
|
|
+ parentName, e);
|
|
|
+ rc.err = e.code().intValue();
|
|
|
+ }
|
|
|
+ } else if (rc.err != Code.OK.intValue()) {
|
|
|
+ LOG.debug("Ignoring processTxn failure hdr: " + header.getType() +
|
|
|
+ " : error: " + rc.err);
|
|
|
+ }
|
|
|
|
|
|
/*
|
|
|
* Things we can only update after the whole txn is applied to data
|
|
@@ -1042,41 +1108,15 @@ public class DataTree {
|
|
|
if (rc.zxid > lastProcessedZxid) {
|
|
|
lastProcessedZxid = rc.zxid;
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- /*
|
|
|
- * Snapshots are taken lazily. It can happen that the child
|
|
|
- * znodes of a parent are created after the parent
|
|
|
- * is serialized. Therefore, while replaying logs during restore, a
|
|
|
- * create might fail because the node was already
|
|
|
- * created.
|
|
|
- *
|
|
|
- * After seeing this failure, we should increment
|
|
|
- * the cversion of the parent znode since the parent was serialized
|
|
|
- * before its children.
|
|
|
- *
|
|
|
- * Note, such failures on DT should be seen only during
|
|
|
- * restore.
|
|
|
- */
|
|
|
- if (header.getType() == OpCode.create &&
|
|
|
- rc.err == Code.NODEEXISTS.intValue()) {
|
|
|
- LOG.debug("Adjusting parent cversion for Txn: " + header.getType() +
|
|
|
- " path:" + rc.path + " err: " + rc.err);
|
|
|
- int lastSlash = rc.path.lastIndexOf('/');
|
|
|
- String parentName = rc.path.substring(0, lastSlash);
|
|
|
- CreateTxn cTxn = (CreateTxn)txn;
|
|
|
- try {
|
|
|
- setCversionPzxid(parentName, cTxn.getParentCVersion(),
|
|
|
- header.getZxid());
|
|
|
- } catch (KeeperException.NoNodeException e) {
|
|
|
- LOG.error("Failed to set parent cversion for: " +
|
|
|
- parentName, e);
|
|
|
- rc.err = e.code().intValue();
|
|
|
+ if (digestFromLoadedSnapshot != null) {
|
|
|
+ compareSnapshotDigests(rc.zxid);
|
|
|
+ } else {
|
|
|
+ // only start recording digest when we're not in fuzzy state
|
|
|
+ logZxidDigest(rc.zxid, getTreeDigest());
|
|
|
}
|
|
|
- } else if (rc.err != Code.OK.intValue()) {
|
|
|
- LOG.debug("Ignoring processTxn failure hdr: " + header.getType() +
|
|
|
- " : error: " + rc.err);
|
|
|
}
|
|
|
+
|
|
|
return rc;
|
|
|
}
|
|
|
|
|
@@ -1163,7 +1203,9 @@ public class DataTree {
|
|
|
return;
|
|
|
}
|
|
|
synchronized (node) {
|
|
|
+ nodes.preChange(statPath, node);
|
|
|
node.data = strack.toString().getBytes();
|
|
|
+ nodes.postChange(statPath, node);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1317,7 +1359,9 @@ public class DataTree {
|
|
|
}
|
|
|
path = ia.readString("path");
|
|
|
}
|
|
|
- nodes.put("/", root);
|
|
|
+ // have counted digest for root node with "", ignore here to avoid
|
|
|
+ // counting twice for root node
|
|
|
+ nodes.putWithoutDigest("/", root);
|
|
|
|
|
|
nodeDataSize.set(approximateDataSize());
|
|
|
|
|
@@ -1492,8 +1536,10 @@ public class DataTree {
|
|
|
newCversion = node.stat.getCversion() + 1;
|
|
|
}
|
|
|
if (newCversion > node.stat.getCversion()) {
|
|
|
+ nodes.preChange(path, node);
|
|
|
node.stat.setCversion(newCversion);
|
|
|
node.stat.setPzxid(zxid);
|
|
|
+ nodes.postChange(path, node);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -1566,4 +1612,194 @@ public class DataTree {
|
|
|
}
|
|
|
ServerMetrics.getMetrics().WRITE_PER_NAMESPACE.add(namespace, path.length() + bytes);
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Add the digest to the historical list, and update the latest zxid digest.
|
|
|
+ */
|
|
|
+ private void logZxidDigest(long zxid, long digest) {
|
|
|
+ ZxidDigest zxidDigest = new ZxidDigest(zxid, DigestCalculator.DIGEST_VERSION, digest);
|
|
|
+ lastProcessedZxidDigest = zxidDigest;
|
|
|
+ if (zxidDigest.zxid % DIGEST_LOG_INTERVAL == 0) {
|
|
|
+ synchronized (digestLog) {
|
|
|
+ digestLog.add(zxidDigest);
|
|
|
+ if (digestLog.size() > DIGEST_LOG_LIMIT) {
|
|
|
+ digestLog.poll();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Serializing the digest to snapshot, this is done after the data tree
|
|
|
+ * is being serialized, so when we replay the txns and it hits this zxid
|
|
|
+ * we know we should be in a non-fuzzy state, and have the same digest.
|
|
|
+ *
|
|
|
+ * @param oa the output stream to write to
|
|
|
+ * @return true if the digest is serialized successfully
|
|
|
+ */
|
|
|
+ public boolean serializeZxidDigest(OutputArchive oa) throws IOException {
|
|
|
+ if (!DigestCalculator.digestEnabled()) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ ZxidDigest zxidDigest = lastProcessedZxidDigest;
|
|
|
+ if (zxidDigest == null) {
|
|
|
+ // write an empty digest
|
|
|
+ zxidDigest = new ZxidDigest();
|
|
|
+ }
|
|
|
+ zxidDigest.serialize(oa);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Deserializing the zxid digest from the input stream and update the
|
|
|
+ * digestFromLoadedSnapshot.
|
|
|
+ *
|
|
|
+ * @param ia the input stream to read from
|
|
|
+ * @return the true if it deserialized successfully
|
|
|
+ */
|
|
|
+ public boolean deserializeZxidDigest(InputArchive ia) throws IOException {
|
|
|
+ if (!DigestCalculator.digestEnabled()) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ ZxidDigest zxidDigest = new ZxidDigest();
|
|
|
+ zxidDigest.deserialize(ia);
|
|
|
+ if (zxidDigest.zxid > 0) {
|
|
|
+ digestFromLoadedSnapshot = zxidDigest;
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ } catch (EOFException e) {
|
|
|
+ LOG.warn("Got EOF exception while reading the digest, " +
|
|
|
+ "likely due to the reading an older snapshot.");
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Compares the actual tree's digest with that in the snapshot.
|
|
|
+ * Resets digestFromLoadedSnapshot after comparision.
|
|
|
+ *
|
|
|
+ * @param zxid zxid
|
|
|
+ */
|
|
|
+ public void compareSnapshotDigests(long zxid) {
|
|
|
+ if (zxid == digestFromLoadedSnapshot.zxid) {
|
|
|
+ if (DigestCalculator.DIGEST_VERSION != digestFromLoadedSnapshot.digestVersion) {
|
|
|
+ LOG.info("Digest version changed, local: {}, new: {}, " +
|
|
|
+ "skip comparing digest now.",
|
|
|
+ digestFromLoadedSnapshot.digestVersion, DigestCalculator.DIGEST_VERSION);
|
|
|
+ digestFromLoadedSnapshot = null;
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (getTreeDigest() != digestFromLoadedSnapshot.getDigest()) {
|
|
|
+ reportDigestMismatch(zxid);
|
|
|
+ }
|
|
|
+ digestFromLoadedSnapshot = null;
|
|
|
+ } else if (digestFromLoadedSnapshot.zxid != 0 && zxid > digestFromLoadedSnapshot.zxid) {
|
|
|
+ LOG.error("Watching for zxid 0x{} during snapshot recovery, " +
|
|
|
+ "but it wasn't found.",
|
|
|
+ Long.toHexString(digestFromLoadedSnapshot.zxid));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Reports any mismatch in the transaction digest.
|
|
|
+ * @param zxid zxid for which the error is being reported.
|
|
|
+ */
|
|
|
+ public void reportDigestMismatch(long zxid) {
|
|
|
+ ServerMetrics.getMetrics().DIGEST_MISMATCHES_COUNT.add(1);
|
|
|
+ RATE_LOGGER.rateLimitLog("Digests are not matching. Value is Zxid.",
|
|
|
+ String.valueOf(zxid));
|
|
|
+
|
|
|
+ for (DigestWatcher watcher: digestWatchers) {
|
|
|
+ watcher.process(zxid);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public long getTreeDigest() {
|
|
|
+ return nodes.getDigest();
|
|
|
+ }
|
|
|
+
|
|
|
+ public ZxidDigest getLastProcessedZxidDigest() {
|
|
|
+ return lastProcessedZxidDigest;
|
|
|
+ }
|
|
|
+
|
|
|
+ public ZxidDigest getDigestFromLoadedSnapshot() {
|
|
|
+ return digestFromLoadedSnapshot;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Add digest mismatch event handler.
|
|
|
+ *
|
|
|
+ * @param digestWatcher the handler to add
|
|
|
+ */
|
|
|
+ public void addDigestWatcher(DigestWatcher digestWatcher) {
|
|
|
+ digestWatchers.add(digestWatcher);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Return all the digests in the historical digest list.
|
|
|
+ */
|
|
|
+ public List<ZxidDigest> getDigestLog() {
|
|
|
+ synchronized (digestLog) {
|
|
|
+ // Return a copy of current digest log
|
|
|
+ return new LinkedList<ZxidDigest>(digestLog);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * A helper class to maintain the digest meta associated with specific
|
|
|
+ * zxid.
|
|
|
+ */
|
|
|
+ public static class ZxidDigest {
|
|
|
+
|
|
|
+ long zxid;
|
|
|
+ // the digest value associated with this zxid
|
|
|
+ long digest;
|
|
|
+ // the version when the digest was calculated
|
|
|
+ int digestVersion;
|
|
|
+
|
|
|
+ ZxidDigest() {
|
|
|
+ this(0, DigestCalculator.DIGEST_VERSION, 0);
|
|
|
+ }
|
|
|
+
|
|
|
+ ZxidDigest(long zxid, int digestVersion, long digest) {
|
|
|
+ this.zxid = zxid;
|
|
|
+ this.digestVersion = digestVersion;
|
|
|
+ this.digest = digest;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void serialize(OutputArchive oa) throws IOException {
|
|
|
+ oa.writeLong(zxid, "zxid");
|
|
|
+ oa.writeInt(digestVersion, "digestVersion");
|
|
|
+ oa.writeLong(digest, "digest");
|
|
|
+ }
|
|
|
+
|
|
|
+ public void deserialize(InputArchive ia) throws IOException {
|
|
|
+ zxid = ia.readLong("zxid");
|
|
|
+ digestVersion = ia.readInt("digestVersion");
|
|
|
+ // the old version is using hex string as the digest
|
|
|
+ if (digestVersion < 2) {
|
|
|
+ String d = ia.readString("digest");
|
|
|
+ if (d != null) {
|
|
|
+ digest = Long.parseLong(d);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ digest = ia.readLong("digest");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public long getZxid() {
|
|
|
+ return zxid;
|
|
|
+ }
|
|
|
+
|
|
|
+ public int getDigestVersion() {
|
|
|
+ return digestVersion;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Long getDigest() {
|
|
|
+ return digest;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|