123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563 |
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.zookeeper.server;
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.Collection;
- import java.util.HashSet;
- import java.util.List;
- import java.util.Set;
- import java.util.concurrent.ConcurrentHashMap;
- import org.apache.jute.InputArchive;
- import org.apache.jute.OutputArchive;
- import org.apache.jute.Record;
- import org.apache.log4j.Logger;
- import org.apache.zookeeper.KeeperException;
- import org.apache.zookeeper.Watcher;
- import org.apache.zookeeper.KeeperException.Code;
- import org.apache.zookeeper.Watcher.Event;
- import org.apache.zookeeper.ZooDefs.OpCode;
- import org.apache.zookeeper.data.ACL;
- import org.apache.zookeeper.data.Stat;
- import org.apache.zookeeper.txn.CreateTxn;
- import org.apache.zookeeper.txn.DeleteTxn;
- import org.apache.zookeeper.txn.ErrorTxn;
- import org.apache.zookeeper.txn.SetACLTxn;
- import org.apache.zookeeper.txn.SetDataTxn;
- import org.apache.zookeeper.txn.TxnHeader;
- /**
- * This class maintains the tree data structure. It doesn't have any networking
- * or client connection code in it so that it can be tested in a stand alone
- * way.
- * <p>
- * The tree maintains two parallel data structures: a hashtable that maps from
- * full paths to DataNodes and a tree of DataNodes. All accesses to a path is
- * through the hashtable. The tree is traversed only when serializing to disk.
- */
- public class DataTree {
- private static final Logger LOG = Logger.getLogger(DataTree.class);
- /**
- * This hashtable provides a fast lookup to the datanodes. The tree is the
- * source of truth and is where all the locking occurs
- */
- private ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<String, DataNode>();
- private WatchManager dataWatches = new WatchManager();
- private WatchManager childWatches = new WatchManager();
- /**
- * This hashtable lists the paths of the ephemeral nodes of a session.
- */
- private ConcurrentHashMap<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<Long, HashSet<String>>();
- /** A debug string * */
- private String debug = "debug";
- public HashSet<String> getEphemerals(long sessionId) {
- HashSet<String> retv = ephemerals.get(sessionId);
- if (retv == null) {
- return new HashSet<String>();
- }
- HashSet<String> cloned = null;
- synchronized(retv) {
- cloned = (HashSet<String>) retv.clone();
- }
- return cloned;
- }
- public Collection<Long> getSessions() {
- return ephemerals.keySet();
- }
- public DataNode getNode(String path) {
- return nodes.get(path);
- }
- public int getNodeCount(){
- return nodes.size();
- }
- public int getWatchCount(){
- return dataWatches.size()+childWatches.size();
- }
- /**
- * This is a pointer to the root of the DataTree. It is the source of truth,
- * but we usually use the nodes hashmap to find nodes in the tree.
- */
- private DataNode root = new DataNode(null, new byte[0], null, new Stat());
- public DataTree() {
- /* Rather than fight it, let root have an alias */
- nodes.put("", root);
- nodes.put("/", root);
- }
- static public void copyStat(Stat from, Stat to) {
- to.setAversion(from.getAversion());
- to.setCtime(from.getCtime());
- to.setCversion(from.getCversion());
- to.setCzxid(from.getCzxid());
- to.setMtime(from.getMtime());
- to.setMzxid(from.getMzxid());
- to.setVersion(from.getVersion());
- to.setEphemeralOwner(from.getEphemeralOwner());
- }
- // public void remooveInterest(String path, Watcher nw) {
- // DataNode n = nodes.get(path);
- // if (n == null) {
- // synchronized (nonExistentWatches) {
- // HashSet<Watcher> list = nonExistentWatches.get(path);
- // if (list != null) {
- // list.remove(nw);
- // }
- // }
- // }
- // synchronized (n) {
- // n.dataWatchers.remove(nw);
- // n.childWatchers.remove(nw);
- // }
- // }
- /**
- * @param path
- * @param data
- * @param acl
- * @param ephemeralOwner
- * the session id that owns this node. -1 indicates this is
- * not an ephemeral node.
- * @param zxid
- * @param time
- * @return
- * @throws KeeperException
- */
- public String createNode(String path, byte data[], List<ACL> acl,
- long ephemeralOwner, long zxid, long time) throws KeeperException.NoNodeException, KeeperException.NodeExistsException {
- int lastSlash = path.lastIndexOf('/');
- String parentName = path.substring(0, lastSlash);
- String childName = path.substring(lastSlash + 1);
- Stat stat = new Stat();
- stat.setCtime(time);
- stat.setMtime(time);
- stat.setCzxid(zxid);
- stat.setMzxid(zxid);
- stat.setVersion(0);
- stat.setAversion(0);
- stat.setEphemeralOwner(ephemeralOwner);
- DataNode parent = nodes.get(parentName);
- if (parent == null) {
- throw new KeeperException.NoNodeException();
- }
- synchronized (parent) {
- if (parent.children.contains(childName)) {
- throw new KeeperException.NodeExistsException();
- }
- int cver = parent.stat.getCversion();
- cver++;
- parent.stat.setCversion(cver);
- DataNode child = new DataNode(parent, data, acl, stat);
- parent.children.add(childName);
- nodes.put(path, child);
- if (ephemeralOwner != 0) {
- HashSet<String> list = ephemerals.get(ephemeralOwner);
- if (list == null) {
- list = new HashSet<String>();
- ephemerals.put(ephemeralOwner, list);
- }
- synchronized(list) {
- list.add(path);
- }
- }
- }
- dataWatches.triggerWatch(path, Event.EventNodeCreated);
- childWatches.triggerWatch(parentName.equals("")?"/":parentName, Event.EventNodeChildrenChanged);
- return path;
- }
- public void deleteNode(String path) throws KeeperException.NoNodeException {
- int lastSlash = path.lastIndexOf('/');
- String parentName = path.substring(0, lastSlash);
- String childName = path.substring(lastSlash + 1);
- DataNode node = nodes.get(path);
- if (node == null) {
- throw new KeeperException.NoNodeException();
- }
- nodes.remove(path);
- DataNode parent = nodes.get(parentName);
- if (parent == null) {
- throw new KeeperException.NoNodeException();
- }
- synchronized (parent) {
- parent.children.remove(childName);
- parent.stat.setCversion(parent.stat.getCversion() + 1);
- long eowner = node.stat.getEphemeralOwner();
- if (eowner != 0) {
- HashSet<String> nodes = ephemerals.get(eowner);
- if (nodes != null) {
- synchronized(nodes) {
- nodes.remove(path);
- }
- }
- }
- node.parent = null;
- }
- ZooTrace.logTraceMessage(LOG,
- ZooTrace.EVENT_DELIVERY_TRACE_MASK,
- "dataWatches.triggerWatch " + path);
- ZooTrace.logTraceMessage(LOG,
- ZooTrace.EVENT_DELIVERY_TRACE_MASK,
- "childWatches.triggerWatch " + parentName);
- Set<Watcher> processed =
- dataWatches.triggerWatch(path, Event.EventNodeDeleted);
- childWatches.triggerWatch(path, Event.EventNodeDeleted, processed);
- childWatches.triggerWatch(parentName.equals("")?"/":parentName, Event.EventNodeChildrenChanged);
- }
- public Stat setData(String path, byte data[], int version, long zxid,
- long time) throws KeeperException.NoNodeException {
- Stat s = new Stat();
- DataNode n = nodes.get(path);
- if (n == null) {
- throw new KeeperException.NoNodeException();
- }
- synchronized (n) {
- n.data = data;
- n.stat.setMtime(time);
- n.stat.setMzxid(zxid);
- n.stat.setVersion(version);
- copyStat(n.stat, s);
- }
- dataWatches.triggerWatch(path, Event.EventNodeDataChanged);
- return s;
- }
- public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
- DataNode n = nodes.get(path);
- if (n == null) {
- throw new KeeperException.NoNodeException();
- }
- synchronized (n) {
- copyStat(n.stat, stat);
- if (watcher != null) {
- dataWatches.addWatch(path, watcher);
- }
- return n.data;
- }
- }
- public Stat statNode(String path, Watcher watcher) throws KeeperException.NoNodeException {
- Stat stat = new Stat();
- DataNode n = nodes.get(path);
- if (watcher != null) {
- dataWatches.addWatch(path, watcher);
- }
- if (n == null) {
- throw new KeeperException.NoNodeException();
- }
- synchronized (n) {
- copyStat(n.stat, stat);
- return stat;
- }
- }
- public ArrayList<String> getChildren(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
- DataNode n = nodes.get(path);
- if (n == null) {
- throw new KeeperException.NoNodeException();
- }
- synchronized (n) {
- ArrayList<String> children = new ArrayList<String>();
- children.addAll(n.children);
- if (watcher != null) {
- childWatches.addWatch(path, watcher);
- }
- return children;
- }
- }
- public Stat setACL(String path, List<ACL> acl, int version) throws KeeperException.NoNodeException {
- Stat stat = new Stat();
- DataNode n = nodes.get(path);
- if (n == null) {
- throw new KeeperException.NoNodeException();
- }
- synchronized (n) {
- n.stat.setAversion(version);
- n.acl = acl;
- copyStat(n.stat, stat);
- return stat;
- }
- }
- @SuppressWarnings("unchecked")
- public List<ACL> getACL(String path, Stat stat) throws KeeperException.NoNodeException {
- DataNode n = nodes.get(path);
- if (n == null) {
- throw new KeeperException.NoNodeException();
- }
- synchronized (n) {
- copyStat(n.stat, stat);
- return new ArrayList<ACL>(n.acl);
- }
- }
- static public class ProcessTxnResult {
- public long clientId;
- public int cxid;
- public long zxid;
- public int err;
- public int type;
- public String path;
- public Stat stat;
- /**
- * Equality is defined as the clientId and the cxid being the same. This
- * allows us to use hash tables to track completion of transactions.
- *
- * @see java.lang.Object#equals(java.lang.Object)
- */
- @Override
- public boolean equals(Object o) {
- if (o instanceof ProcessTxnResult) {
- ProcessTxnResult other = (ProcessTxnResult) o;
- return other.clientId == clientId && other.cxid == cxid;
- }
- return false;
- }
- /**
- * See equals() to find the rational for how this hashcode is generated.
- *
- * @see ProcessTxnResult#equals(Object)
- * @see java.lang.Object#hashCode()
- */
- @Override
- public int hashCode() {
- return (int) ((clientId ^ cxid) % Integer.MAX_VALUE);
- }
- }
- public volatile long lastProcessedZxid = 0;
- @SuppressWarnings("unchecked")
- public ProcessTxnResult processTxn(TxnHeader header, Record txn) {
- ProcessTxnResult rc = new ProcessTxnResult();
- try {
- rc.clientId = header.getClientId();
- rc.cxid = header.getCxid();
- rc.zxid = header.getZxid();
- rc.type = header.getType();
- rc.err = 0;
- if (rc.zxid > lastProcessedZxid) {
- lastProcessedZxid = rc.zxid;
- }
- switch (header.getType()) {
- case OpCode.create:
- CreateTxn createTxn = (CreateTxn) txn;
- debug = "Create transaction for " + createTxn.getPath();
- createNode(createTxn.getPath(), createTxn.getData(), createTxn
- .getAcl(), createTxn.getEphemeral() ? header
- .getClientId() : 0, header.getZxid(), header.getTime());
- rc.path = createTxn.getPath();
- break;
- case OpCode.delete:
- DeleteTxn deleteTxn = (DeleteTxn) txn;
- debug = "Delete transaction for " + deleteTxn.getPath();
- deleteNode(deleteTxn.getPath());
- break;
- case OpCode.setData:
- SetDataTxn setDataTxn = (SetDataTxn) txn;
- debug = "Set data for transaction for " + setDataTxn.getPath();
- rc.stat = setData(setDataTxn.getPath(), setDataTxn.getData(),
- setDataTxn.getVersion(), header.getZxid(), header
- .getTime());
- break;
- case OpCode.setACL:
- SetACLTxn setACLTxn = (SetACLTxn) txn;
- rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(),
- setACLTxn.getVersion());
- break;
- case OpCode.closeSession:
- killSession(header.getClientId());
- break;
- case OpCode.error:
- ErrorTxn errTxn = (ErrorTxn) txn;
- rc.err = errTxn.getErr();
- break;
- }
- } catch (KeeperException e) {
- // These are expected errors since we take a lazy snapshot
- if (initialized
- || (e.getCode() != Code.NoNode && e.getCode() != Code.NodeExists)) {
- LOG.warn(debug);
- LOG.error("FIXMSG",e);
- }
- }
- return rc;
- }
- void killSession(long session) {
- // the list is already removed from the ephemerals
- // so we do not have to worry about synchronyzing on
- // the list. This is only called from FinalRequestProcessor
- // so there is no need for synchornization. The list is not
- // changed here. Only create and delete change the list which
- // are again called from FinalRequestProcessor in sequence.
- HashSet<String> list = ephemerals.remove(session);
- if (list != null) {
- for (String path : list) {
- try {
- deleteNode(path);
- ZooTrace.logTraceMessage(LOG,
- ZooTrace.SESSION_TRACE_MASK,
- "Deleting ephemeral node "
- + path + " for session 0x"
- + Long.toHexString(session));
- } catch (KeeperException e) {
- LOG.error("FIXMSG",e);
- }
- }
- }
- }
- /**
- * this method uses a stringbuilder to create a new
- * path for children. This is faster than string
- * appends ( str1 + str2).
- * @param oa OutputArchive to write to.
- * @param path a string builder.
- * @throws IOException
- * @throws InterruptedException
- */
- void serializeNode(OutputArchive oa, StringBuilder path)
- throws IOException, InterruptedException {
- String pathString = path.toString();
- DataNode node = getNode(pathString);
- if (node == null) {
- return;
- }
- String children[] = null;
- synchronized (node) {
- scount++;
- oa.writeString(pathString, "path");
- oa.writeRecord(node, "node");
- children = node.children.toArray(new String[node.children.size()]);
- }
- path.append('/');
- int off = path.length();
- if (children != null) {
- for (String child : children) {
- //since this is single buffer being resused
- // we need
- // to truncate the previous bytes of string.
- path.delete(off, Integer.MAX_VALUE);
- path.append(child);
- serializeNode(oa, path);
- }
- }
- }
- int scount;
- public boolean initialized = false;
- public void serialize(OutputArchive oa, String tag) throws IOException,
- InterruptedException {
- scount = 0;
- serializeNode(oa, new StringBuilder(""));
- // / marks end of stream
- // we need to check if clear had been called in between the snapshot.
- if (root != null) {
- oa.writeString("/", "path");
- }
- }
- public void deserialize(InputArchive ia, String tag) throws IOException {
- nodes.clear();
- String path = ia.readString("path");
- while (!path.equals("/")) {
- DataNode node = new DataNode();
- ia.readRecord(node, "node");
- nodes.put(path, node);
- int lastSlash = path.lastIndexOf('/');
- if (lastSlash == -1) {
- root = node;
- } else {
- String parentPath = path.substring(0, lastSlash);
- node.parent = nodes.get(parentPath);
- node.parent.children.add(path.substring(lastSlash + 1));
- long eowner = node.stat.getEphemeralOwner();
- if (eowner != 0) {
- HashSet<String> list = ephemerals.get(eowner);
- if (list == null) {
- list = new HashSet<String>();
- ephemerals.put(eowner, list);
- }
- list.add(path);
- }
- }
- path = ia.readString("path");
- }
- nodes.put("/", root);
- }
- public String dumpEphemerals() {
- Set<Long> keys = ephemerals.keySet();
- StringBuffer sb = new StringBuffer("Sessions with Ephemerals ("
- + keys.size() + "):\n");
- for (long k : keys) {
- sb.append("0x" + Long.toHexString(k));
- sb.append(":\n");
- HashSet<String> tmp = ephemerals.get(k);
- synchronized(tmp) {
- for (String path : tmp) {
- sb.append("\t" + path + "\n");
- }
- }
- }
- return sb.toString();
- }
- public void removeCnxn(Watcher watcher) {
- dataWatches.removeWatcher(watcher);
- childWatches.removeWatcher(watcher);
- }
- public void clear() {
- root = null;
- nodes.clear();
- ephemerals.clear();
- // dataWatches = null;
- // childWatches = null;
- }
- }
|