DataTree.java 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
  18. package org.apache.zookeeper.server;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import java.util.Collection;
  22. import java.util.HashSet;
  23. import java.util.List;
  24. import java.util.Set;
  25. import java.util.concurrent.ConcurrentHashMap;
  26. import org.apache.jute.InputArchive;
  27. import org.apache.jute.OutputArchive;
  28. import org.apache.jute.Record;
  29. import org.apache.log4j.Logger;
  30. import org.apache.zookeeper.KeeperException;
  31. import org.apache.zookeeper.Watcher;
  32. import org.apache.zookeeper.KeeperException.Code;
  33. import org.apache.zookeeper.Watcher.Event;
  34. import org.apache.zookeeper.Watcher.Event.EventType;
  35. import org.apache.zookeeper.ZooDefs.OpCode;
  36. import org.apache.zookeeper.data.ACL;
  37. import org.apache.zookeeper.data.Stat;
  38. import org.apache.zookeeper.data.StatPersisted;
  39. import org.apache.zookeeper.txn.CreateTxn;
  40. import org.apache.zookeeper.txn.DeleteTxn;
  41. import org.apache.zookeeper.txn.ErrorTxn;
  42. import org.apache.zookeeper.txn.SetACLTxn;
  43. import org.apache.zookeeper.txn.SetDataTxn;
  44. import org.apache.zookeeper.txn.TxnHeader;
  45. /**
  46. * This class maintains the tree data structure. It doesn't have any networking
  47. * or client connection code in it so that it can be tested in a stand alone
  48. * way.
  49. * <p>
  50. * The tree maintains two parallel data structures: a hashtable that maps from
  51. * full paths to DataNodes and a tree of DataNodes. All accesses to a path is
  52. * through the hashtable. The tree is traversed only when serializing to disk.
  53. */
  54. public class DataTree {
  55. private static final Logger LOG = Logger.getLogger(DataTree.class);
  56. /**
  57. * This hashtable provides a fast lookup to the datanodes. The tree is the
  58. * source of truth and is where all the locking occurs
  59. */
  60. private ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<String, DataNode>();
  61. private WatchManager dataWatches = new WatchManager();
  62. private WatchManager childWatches = new WatchManager();
  63. /**
  64. * This hashtable lists the paths of the ephemeral nodes of a session.
  65. */
  66. private ConcurrentHashMap<Long, HashSet<String>> ephemerals = new ConcurrentHashMap<Long, HashSet<String>>();
  67. /** A debug string * */
  68. private String debug = "debug";
  69. @SuppressWarnings("unchecked")
  70. public HashSet<String> getEphemerals(long sessionId) {
  71. HashSet<String> retv = ephemerals.get(sessionId);
  72. if (retv == null) {
  73. return new HashSet<String>();
  74. }
  75. HashSet<String> cloned = null;
  76. synchronized(retv) {
  77. cloned = (HashSet<String>) retv.clone();
  78. }
  79. return cloned;
  80. }
  81. public Collection<Long> getSessions() {
  82. return ephemerals.keySet();
  83. }
  84. public DataNode getNode(String path) {
  85. return nodes.get(path);
  86. }
  87. public int getNodeCount(){
  88. return nodes.size();
  89. }
  90. public int getWatchCount(){
  91. return dataWatches.size()+childWatches.size();
  92. }
  93. /**
  94. * This is a pointer to the root of the DataTree. It is the source of truth,
  95. * but we usually use the nodes hashmap to find nodes in the tree.
  96. */
  97. private DataNode root =
  98. new DataNode(null, new byte[0], null, new StatPersisted());
  99. public DataTree() {
  100. /* Rather than fight it, let root have an alias */
  101. nodes.put("", root);
  102. nodes.put("/", root);
  103. }
  104. static public void copyStatPersisted(StatPersisted from, StatPersisted to) {
  105. to.setAversion(from.getAversion());
  106. to.setCtime(from.getCtime());
  107. to.setCversion(from.getCversion());
  108. to.setCzxid(from.getCzxid());
  109. to.setMtime(from.getMtime());
  110. to.setMzxid(from.getMzxid());
  111. to.setVersion(from.getVersion());
  112. to.setEphemeralOwner(from.getEphemeralOwner());
  113. }
  114. static public void copyStat(Stat from, Stat to) {
  115. to.setAversion(from.getAversion());
  116. to.setCtime(from.getCtime());
  117. to.setCversion(from.getCversion());
  118. to.setCzxid(from.getCzxid());
  119. to.setMtime(from.getMtime());
  120. to.setMzxid(from.getMzxid());
  121. to.setVersion(from.getVersion());
  122. to.setEphemeralOwner(from.getEphemeralOwner());
  123. to.setDataLength(from.getDataLength());
  124. to.setNumChildren(from.getNumChildren());
  125. }
  126. // public void remooveInterest(String path, Watcher nw) {
  127. // DataNode n = nodes.get(path);
  128. // if (n == null) {
  129. // synchronized (nonExistentWatches) {
  130. // HashSet<Watcher> list = nonExistentWatches.get(path);
  131. // if (list != null) {
  132. // list.remove(nw);
  133. // }
  134. // }
  135. // }
  136. // synchronized (n) {
  137. // n.dataWatchers.remove(nw);
  138. // n.childWatchers.remove(nw);
  139. // }
  140. // }
  141. /**
  142. * @param path
  143. * @param data
  144. * @param acl
  145. * @param ephemeralOwner
  146. * the session id that owns this node. -1 indicates this is
  147. * not an ephemeral node.
  148. * @param zxid
  149. * @param time
  150. * @return the patch of the created node
  151. * @throws KeeperException
  152. */
  153. public String createNode(String path, byte data[], List<ACL> acl,
  154. long ephemeralOwner, long zxid, long time) throws KeeperException.NoNodeException, KeeperException.NodeExistsException {
  155. int lastSlash = path.lastIndexOf('/');
  156. String parentName = path.substring(0, lastSlash);
  157. String childName = path.substring(lastSlash + 1);
  158. StatPersisted stat = new StatPersisted();
  159. stat.setCtime(time);
  160. stat.setMtime(time);
  161. stat.setCzxid(zxid);
  162. stat.setMzxid(zxid);
  163. stat.setVersion(0);
  164. stat.setAversion(0);
  165. stat.setEphemeralOwner(ephemeralOwner);
  166. DataNode parent = nodes.get(parentName);
  167. if (parent == null) {
  168. throw new KeeperException.NoNodeException();
  169. }
  170. synchronized (parent) {
  171. if (parent.children.contains(childName)) {
  172. throw new KeeperException.NodeExistsException();
  173. }
  174. int cver = parent.stat.getCversion();
  175. cver++;
  176. parent.stat.setCversion(cver);
  177. DataNode child = new DataNode(parent, data, acl, stat);
  178. parent.children.add(childName);
  179. nodes.put(path, child);
  180. if (ephemeralOwner != 0) {
  181. HashSet<String> list = ephemerals.get(ephemeralOwner);
  182. if (list == null) {
  183. list = new HashSet<String>();
  184. ephemerals.put(ephemeralOwner, list);
  185. }
  186. synchronized(list) {
  187. list.add(path);
  188. }
  189. }
  190. }
  191. dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
  192. childWatches.triggerWatch(parentName.equals("")?"/":parentName, Event.EventType.NodeChildrenChanged);
  193. return path;
  194. }
  195. public void deleteNode(String path) throws KeeperException.NoNodeException {
  196. int lastSlash = path.lastIndexOf('/');
  197. String parentName = path.substring(0, lastSlash);
  198. String childName = path.substring(lastSlash + 1);
  199. DataNode node = nodes.get(path);
  200. if (node == null) {
  201. throw new KeeperException.NoNodeException();
  202. }
  203. nodes.remove(path);
  204. DataNode parent = nodes.get(parentName);
  205. if (parent == null) {
  206. throw new KeeperException.NoNodeException();
  207. }
  208. synchronized (parent) {
  209. parent.children.remove(childName);
  210. parent.stat.setCversion(parent.stat.getCversion() + 1);
  211. long eowner = node.stat.getEphemeralOwner();
  212. if (eowner != 0) {
  213. HashSet<String> nodes = ephemerals.get(eowner);
  214. if (nodes != null) {
  215. synchronized(nodes) {
  216. nodes.remove(path);
  217. }
  218. }
  219. }
  220. node.parent = null;
  221. }
  222. ZooTrace.logTraceMessage(LOG,
  223. ZooTrace.EVENT_DELIVERY_TRACE_MASK,
  224. "dataWatches.triggerWatch " + path);
  225. ZooTrace.logTraceMessage(LOG,
  226. ZooTrace.EVENT_DELIVERY_TRACE_MASK,
  227. "childWatches.triggerWatch " + parentName);
  228. Set<Watcher> processed =
  229. dataWatches.triggerWatch(path, EventType.NodeDeleted);
  230. childWatches.triggerWatch(path, EventType.NodeDeleted, processed);
  231. childWatches.triggerWatch(parentName.equals("")?"/":parentName, EventType.NodeChildrenChanged);
  232. }
  233. public Stat setData(String path, byte data[], int version, long zxid,
  234. long time) throws KeeperException.NoNodeException {
  235. Stat s = new Stat();
  236. DataNode n = nodes.get(path);
  237. if (n == null) {
  238. throw new KeeperException.NoNodeException();
  239. }
  240. synchronized (n) {
  241. n.data = data;
  242. n.stat.setMtime(time);
  243. n.stat.setMzxid(zxid);
  244. n.stat.setVersion(version);
  245. n.copyStat(s);
  246. }
  247. dataWatches.triggerWatch(path, EventType.NodeDataChanged);
  248. return s;
  249. }
  250. public byte[] getData(String path, Stat stat, Watcher watcher) throws KeeperException.NoNodeException {
  251. DataNode n = nodes.get(path);
  252. if (n == null) {
  253. throw new KeeperException.NoNodeException();
  254. }
  255. synchronized (n) {
  256. n.copyStat(stat);
  257. if (watcher != null) {
  258. dataWatches.addWatch(path, watcher);
  259. }
  260. return n.data;
  261. }
  262. }
  263. public Stat statNode(String path, Watcher watcher) throws KeeperException.NoNodeException {
  264. Stat stat = new Stat();
  265. DataNode n = nodes.get(path);
  266. if (watcher != null) {
  267. dataWatches.addWatch(path, watcher);
  268. }
  269. if (n == null) {
  270. throw new KeeperException.NoNodeException();
  271. }
  272. synchronized (n) {
  273. n.copyStat(stat);
  274. return stat;
  275. }
  276. }
  277. public List<String> getChildren(String path, Stat stat, Watcher watcher)
  278. throws KeeperException.NoNodeException {
  279. DataNode n = nodes.get(path);
  280. if (n == null) {
  281. throw new KeeperException.NoNodeException();
  282. }
  283. synchronized (n) {
  284. ArrayList<String> children = new ArrayList<String>();
  285. children.addAll(n.children);
  286. if (watcher != null) {
  287. childWatches.addWatch(path, watcher);
  288. }
  289. return children;
  290. }
  291. }
  292. public Stat setACL(String path, List<ACL> acl, int version) throws KeeperException.NoNodeException {
  293. Stat stat = new Stat();
  294. DataNode n = nodes.get(path);
  295. if (n == null) {
  296. throw new KeeperException.NoNodeException();
  297. }
  298. synchronized (n) {
  299. n.stat.setAversion(version);
  300. n.acl = acl;
  301. n.copyStat(stat);
  302. return stat;
  303. }
  304. }
  305. @SuppressWarnings("unchecked")
  306. public List<ACL> getACL(String path, Stat stat) throws KeeperException.NoNodeException {
  307. DataNode n = nodes.get(path);
  308. if (n == null) {
  309. throw new KeeperException.NoNodeException();
  310. }
  311. synchronized (n) {
  312. n.copyStat(stat);
  313. return new ArrayList<ACL>(n.acl);
  314. }
  315. }
  316. static public class ProcessTxnResult {
  317. public long clientId;
  318. public int cxid;
  319. public long zxid;
  320. public int err;
  321. public int type;
  322. public String path;
  323. public Stat stat;
  324. /**
  325. * Equality is defined as the clientId and the cxid being the same. This
  326. * allows us to use hash tables to track completion of transactions.
  327. *
  328. * @see java.lang.Object#equals(java.lang.Object)
  329. */
  330. @Override
  331. public boolean equals(Object o) {
  332. if (o instanceof ProcessTxnResult) {
  333. ProcessTxnResult other = (ProcessTxnResult) o;
  334. return other.clientId == clientId && other.cxid == cxid;
  335. }
  336. return false;
  337. }
  338. /**
  339. * See equals() to find the rational for how this hashcode is generated.
  340. *
  341. * @see ProcessTxnResult#equals(Object)
  342. * @see java.lang.Object#hashCode()
  343. */
  344. @Override
  345. public int hashCode() {
  346. return (int) ((clientId ^ cxid) % Integer.MAX_VALUE);
  347. }
  348. }
  349. public volatile long lastProcessedZxid = 0;
  350. @SuppressWarnings("unchecked")
  351. public ProcessTxnResult processTxn(TxnHeader header, Record txn) {
  352. ProcessTxnResult rc = new ProcessTxnResult();
  353. try {
  354. rc.clientId = header.getClientId();
  355. rc.cxid = header.getCxid();
  356. rc.zxid = header.getZxid();
  357. rc.type = header.getType();
  358. rc.err = 0;
  359. if (rc.zxid > lastProcessedZxid) {
  360. lastProcessedZxid = rc.zxid;
  361. }
  362. switch (header.getType()) {
  363. case OpCode.create:
  364. CreateTxn createTxn = (CreateTxn) txn;
  365. debug = "Create transaction for " + createTxn.getPath();
  366. createNode(createTxn.getPath(), createTxn.getData(), createTxn
  367. .getAcl(), createTxn.getEphemeral() ? header
  368. .getClientId() : 0, header.getZxid(), header.getTime());
  369. rc.path = createTxn.getPath();
  370. break;
  371. case OpCode.delete:
  372. DeleteTxn deleteTxn = (DeleteTxn) txn;
  373. debug = "Delete transaction for " + deleteTxn.getPath();
  374. deleteNode(deleteTxn.getPath());
  375. break;
  376. case OpCode.setData:
  377. SetDataTxn setDataTxn = (SetDataTxn) txn;
  378. debug = "Set data for transaction for " + setDataTxn.getPath();
  379. rc.stat = setData(setDataTxn.getPath(), setDataTxn.getData(),
  380. setDataTxn.getVersion(), header.getZxid(), header
  381. .getTime());
  382. break;
  383. case OpCode.setACL:
  384. SetACLTxn setACLTxn = (SetACLTxn) txn;
  385. rc.stat = setACL(setACLTxn.getPath(), setACLTxn.getAcl(),
  386. setACLTxn.getVersion());
  387. break;
  388. case OpCode.closeSession:
  389. killSession(header.getClientId());
  390. break;
  391. case OpCode.error:
  392. ErrorTxn errTxn = (ErrorTxn) txn;
  393. rc.err = errTxn.getErr();
  394. break;
  395. }
  396. } catch (KeeperException e) {
  397. // These are expected errors since we take a lazy snapshot
  398. if (initialized
  399. || (e.getCode() != Code.NoNode && e.getCode() != Code.NodeExists)) {
  400. LOG.warn(debug);
  401. LOG.error("FIXMSG",e);
  402. }
  403. }
  404. return rc;
  405. }
  406. void killSession(long session) {
  407. // the list is already removed from the ephemerals
  408. // so we do not have to worry about synchronyzing on
  409. // the list. This is only called from FinalRequestProcessor
  410. // so there is no need for synchornization. The list is not
  411. // changed here. Only create and delete change the list which
  412. // are again called from FinalRequestProcessor in sequence.
  413. HashSet<String> list = ephemerals.remove(session);
  414. if (list != null) {
  415. for (String path : list) {
  416. try {
  417. deleteNode(path);
  418. ZooTrace.logTraceMessage(LOG,
  419. ZooTrace.SESSION_TRACE_MASK,
  420. "Deleting ephemeral node "
  421. + path + " for session 0x"
  422. + Long.toHexString(session));
  423. } catch (KeeperException e) {
  424. LOG.error("FIXMSG",e);
  425. }
  426. }
  427. }
  428. }
  429. /**
  430. * this method uses a stringbuilder to create a new
  431. * path for children. This is faster than string
  432. * appends ( str1 + str2).
  433. * @param oa OutputArchive to write to.
  434. * @param path a string builder.
  435. * @throws IOException
  436. * @throws InterruptedException
  437. */
  438. void serializeNode(OutputArchive oa, StringBuilder path) throws IOException {
  439. String pathString = path.toString();
  440. DataNode node = getNode(pathString);
  441. if (node == null) {
  442. return;
  443. }
  444. String children[] = null;
  445. synchronized (node) {
  446. scount++;
  447. oa.writeString(pathString, "path");
  448. oa.writeRecord(node, "node");
  449. children = node.children.toArray(new String[node.children.size()]);
  450. }
  451. path.append('/');
  452. int off = path.length();
  453. if (children != null) {
  454. for (String child : children) {
  455. //since this is single buffer being resused
  456. // we need
  457. // to truncate the previous bytes of string.
  458. path.delete(off, Integer.MAX_VALUE);
  459. path.append(child);
  460. serializeNode(oa, path);
  461. }
  462. }
  463. }
  464. int scount;
  465. public boolean initialized = false;
  466. public void serialize(OutputArchive oa, String tag) throws IOException {
  467. scount = 0;
  468. serializeNode(oa, new StringBuilder(""));
  469. // / marks end of stream
  470. // we need to check if clear had been called in between the snapshot.
  471. if (root != null) {
  472. oa.writeString("/", "path");
  473. }
  474. }
  475. public void deserialize(InputArchive ia, String tag) throws IOException {
  476. nodes.clear();
  477. String path = ia.readString("path");
  478. while (!path.equals("/")) {
  479. DataNode node = new DataNode();
  480. ia.readRecord(node, "node");
  481. nodes.put(path, node);
  482. int lastSlash = path.lastIndexOf('/');
  483. if (lastSlash == -1) {
  484. root = node;
  485. } else {
  486. String parentPath = path.substring(0, lastSlash);
  487. node.parent = nodes.get(parentPath);
  488. node.parent.children.add(path.substring(lastSlash + 1));
  489. long eowner = node.stat.getEphemeralOwner();
  490. if (eowner != 0) {
  491. HashSet<String> list = ephemerals.get(eowner);
  492. if (list == null) {
  493. list = new HashSet<String>();
  494. ephemerals.put(eowner, list);
  495. }
  496. list.add(path);
  497. }
  498. }
  499. path = ia.readString("path");
  500. }
  501. nodes.put("/", root);
  502. }
  503. public String dumpEphemerals() {
  504. Set<Long> keys = ephemerals.keySet();
  505. StringBuffer sb = new StringBuffer("Sessions with Ephemerals ("
  506. + keys.size() + "):\n");
  507. for (long k : keys) {
  508. sb.append("0x" + Long.toHexString(k));
  509. sb.append(":\n");
  510. HashSet<String> tmp = ephemerals.get(k);
  511. synchronized(tmp) {
  512. for (String path : tmp) {
  513. sb.append("\t" + path + "\n");
  514. }
  515. }
  516. }
  517. return sb.toString();
  518. }
  519. public void removeCnxn(Watcher watcher) {
  520. dataWatches.removeWatcher(watcher);
  521. childWatches.removeWatcher(watcher);
  522. }
  523. public void clear() {
  524. root = null;
  525. nodes.clear();
  526. ephemerals.clear();
  527. // dataWatches = null;
  528. // childWatches = null;
  529. }
  530. }