PrepRequestProcessor.java 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. package org.apache.zookeeper.server;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.jute.Record;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.data.StatPersisted;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.proto.DeleteRequest;
import org.apache.zookeeper.proto.SetACLRequest;
import org.apache.zookeeper.proto.SetDataRequest;
import org.apache.zookeeper.server.ZooKeeperServer.ChangeRecord;
import org.apache.zookeeper.server.auth.AuthenticationProvider;
import org.apache.zookeeper.server.auth.ProviderRegistry;
import org.apache.zookeeper.txn.CreateSessionTxn;
import org.apache.zookeeper.txn.CreateTxn;
import org.apache.zookeeper.txn.DeleteTxn;
import org.apache.zookeeper.txn.ErrorTxn;
import org.apache.zookeeper.txn.SetACLTxn;
import org.apache.zookeeper.txn.SetDataTxn;
import org.apache.zookeeper.txn.TxnHeader;
  50. /**
  51. * This request processor is generally at the start of a RequestProcessor
  52. * change. It sets up any transactions associated with requests that change the
  53. * state of the system. It counts on ZooKeeperServer to update
  54. * outstandingRequests, so that it can take into account transactions that are
  55. * in the queue to be applied when generating a transaction.
  56. */
  57. public class PrepRequestProcessor extends Thread implements RequestProcessor {
  58. private static final Logger LOG = Logger.getLogger(PrepRequestProcessor.class);
  59. static boolean skipACL;
  60. static {
  61. skipACL = System.getProperty("zookeeper.skipACL", "no").equals("yes");
  62. if (skipACL) {
  63. LOG.info("zookeeper.skipACL==\"yes\", ACL checks will be skipped");
  64. }
  65. }
  66. LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();
  67. RequestProcessor nextProcessor;
  68. ZooKeeperServer zks;
  69. public PrepRequestProcessor(ZooKeeperServer zks,
  70. RequestProcessor nextProcessor) {
  71. super("ProcessThread:" + zks.getClientPort());
  72. this.nextProcessor = nextProcessor;
  73. this.zks = zks;
  74. start();
  75. }
  76. @Override
  77. public void run() {
  78. try {
  79. while (true) {
  80. Request request = submittedRequests.take();
  81. long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
  82. if (request.type == OpCode.ping) {
  83. traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
  84. }
  85. ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
  86. if (Request.requestOfDeath == request) {
  87. break;
  88. }
  89. pRequest(request);
  90. }
  91. } catch (InterruptedException e) {
  92. LOG.error("FIXMSG",e);
  93. }
  94. ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
  95. "PrepRequestProcessor exited loop!");
  96. }
  97. ChangeRecord getRecordForPath(String path) throws KeeperException.NoNodeException {
  98. ChangeRecord lastChange = null;
  99. synchronized (zks.outstandingChanges) {
  100. for (int i = 0; i < zks.outstandingChanges.size(); i++) {
  101. ChangeRecord c = zks.outstandingChanges.get(i);
  102. if (c.path.equals(path)) {
  103. lastChange = c;
  104. }
  105. }
  106. if (lastChange == null) {
  107. DataNode n = zks.dataTree.getNode(path);
  108. if (n != null) {
  109. lastChange = new ChangeRecord(-1, path, n.stat, n.children
  110. .size(), n.acl);
  111. }
  112. }
  113. }
  114. if (lastChange == null || lastChange.stat == null) {
  115. throw new KeeperException.NoNodeException();
  116. }
  117. return lastChange;
  118. }
  119. void addChangeRecord(ChangeRecord c) {
  120. synchronized (zks.outstandingChanges) {
  121. zks.outstandingChanges.add(c);
  122. }
  123. }
  124. static void checkACL(ZooKeeperServer zks, List<ACL> acl, int perm,
  125. List<Id> ids) throws KeeperException.NoAuthException {
  126. if (skipACL) {
  127. return;
  128. }
  129. if (acl == null || acl.size() == 0) {
  130. return;
  131. }
  132. for (ACL a : acl) {
  133. Id id = a.getId();
  134. if ((a.getPerms() & perm) != 0) {
  135. if (id.getScheme().equals("world")
  136. && id.getId().equals("anyone")) {
  137. return;
  138. }
  139. AuthenticationProvider ap = ProviderRegistry.getProvider(id
  140. .getScheme());
  141. if (ap != null) {
  142. for (Id authId : ids) {
  143. if (authId.getScheme().equals("super")) {
  144. return;
  145. }
  146. if (authId.getScheme().equals(id.getScheme())
  147. && ap.matches(authId.getId(), id.getId())) {
  148. return;
  149. }
  150. }
  151. }
  152. }
  153. }
  154. throw new KeeperException.NoAuthException();
  155. }
  156. /**
  157. * This method will be called inside the ProcessRequestThread, which is a
  158. * singleton, so there will be a single thread calling this code.
  159. *
  160. * @param request
  161. */
  162. @SuppressWarnings("unchecked")
  163. protected void pRequest(Request request) {
  164. // LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
  165. // request.type + " id = 0x" + Long.toHexString(request.sessionId));
  166. TxnHeader txnHeader = null;
  167. Record txn = null;
  168. try {
  169. switch (request.type) {
  170. case OpCode.create:
  171. txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
  172. .getNextZxid(), zks.getTime(), OpCode.create);
  173. zks.sessionTracker.checkSession(request.sessionId);
  174. CreateRequest createRequest = new CreateRequest();
  175. ZooKeeperServer.byteBuffer2Record(request.request,
  176. createRequest);
  177. String path = createRequest.getPath();
  178. int lastSlash = path.lastIndexOf('/');
  179. if (lastSlash == -1 || path.indexOf('\0') != -1) {
  180. throw new KeeperException.BadArgumentsException();
  181. }
  182. if (!fixupACL(request.authInfo, createRequest.getAcl())) {
  183. throw new KeeperException.InvalidACLException();
  184. }
  185. String parentPath = path.substring(0, lastSlash);
  186. ChangeRecord parentRecord = getRecordForPath(parentPath);
  187. checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE,
  188. request.authInfo);
  189. int parentCVersion = parentRecord.stat.getCversion();
  190. CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
  191. if (createMode.isSequential()) {
  192. path = path + String.format("%010d", parentCVersion);
  193. }
  194. try {
  195. if (getRecordForPath(path) != null) {
  196. throw new KeeperException.NodeExistsException();
  197. }
  198. } catch (KeeperException.NoNodeException e) {
  199. // ignore this one
  200. }
  201. boolean ephemeralParent = parentRecord.stat.getEphemeralOwner() != 0;
  202. if (ephemeralParent) {
  203. throw new KeeperException.NoChildrenForEphemeralsException();
  204. }
  205. txn = new CreateTxn(path, createRequest.getData(),
  206. createRequest.getAcl(),
  207. createMode.isEphemeral());
  208. StatPersisted s = new StatPersisted();
  209. if (createMode.isEphemeral()) {
  210. s.setEphemeralOwner(request.sessionId);
  211. }
  212. parentRecord = parentRecord.duplicate(txnHeader.getZxid());
  213. parentRecord.childCount++;
  214. parentRecord.stat
  215. .setCversion(parentRecord.stat.getCversion() + 1);
  216. addChangeRecord(parentRecord);
  217. addChangeRecord(new ChangeRecord(txnHeader.getZxid(), path, s,
  218. 0, createRequest.getAcl()));
  219. break;
  220. case OpCode.delete:
  221. txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
  222. .getNextZxid(), zks.getTime(), OpCode.delete);
  223. zks.sessionTracker.checkSession(request.sessionId);
  224. DeleteRequest deleteRequest = new DeleteRequest();
  225. ZooKeeperServer.byteBuffer2Record(request.request,
  226. deleteRequest);
  227. path = deleteRequest.getPath();
  228. lastSlash = path.lastIndexOf('/');
  229. if (lastSlash == -1 || path.indexOf('\0') != -1
  230. || path.equals("/")) {
  231. throw new KeeperException.BadArgumentsException();
  232. }
  233. parentPath = path.substring(0, lastSlash);
  234. parentRecord = getRecordForPath(parentPath);
  235. ChangeRecord nodeRecord = getRecordForPath(path);
  236. checkACL(zks, parentRecord.acl, ZooDefs.Perms.DELETE,
  237. request.authInfo);
  238. int version = deleteRequest.getVersion();
  239. if (version != -1 && nodeRecord.stat.getVersion() != version) {
  240. throw new KeeperException.BadVersionException();
  241. }
  242. if (nodeRecord.childCount > 0) {
  243. throw new KeeperException.NotEmptyException();
  244. }
  245. txn = new DeleteTxn(path);
  246. parentRecord = parentRecord.duplicate(txnHeader.getZxid());
  247. parentRecord.childCount--;
  248. parentRecord.stat
  249. .setCversion(parentRecord.stat.getCversion() + 1);
  250. addChangeRecord(parentRecord);
  251. addChangeRecord(new ChangeRecord(txnHeader.getZxid(), path,
  252. null, -1, null));
  253. break;
  254. case OpCode.setData:
  255. txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
  256. .getNextZxid(), zks.getTime(), OpCode.setData);
  257. zks.sessionTracker.checkSession(request.sessionId);
  258. SetDataRequest setDataRequest = new SetDataRequest();
  259. ZooKeeperServer.byteBuffer2Record(request.request,
  260. setDataRequest);
  261. path = setDataRequest.getPath();
  262. nodeRecord = getRecordForPath(path);
  263. checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE,
  264. request.authInfo);
  265. version = setDataRequest.getVersion();
  266. int currentVersion = nodeRecord.stat.getVersion();
  267. if (version != -1 && version != currentVersion) {
  268. throw new KeeperException.BadVersionException();
  269. }
  270. version = currentVersion + 1;
  271. txn = new SetDataTxn(path, setDataRequest.getData(), version);
  272. nodeRecord = nodeRecord.duplicate(txnHeader.getZxid());
  273. nodeRecord.stat.setVersion(version);
  274. addChangeRecord(nodeRecord);
  275. break;
  276. case OpCode.setACL:
  277. txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
  278. .getNextZxid(), zks.getTime(), OpCode.setACL);
  279. zks.sessionTracker.checkSession(request.sessionId);
  280. SetACLRequest setAclRequest = new SetACLRequest();
  281. ZooKeeperServer.byteBuffer2Record(request.request,
  282. setAclRequest);
  283. if (!fixupACL(request.authInfo, setAclRequest.getAcl())) {
  284. throw new KeeperException.InvalidACLException();
  285. }
  286. path = setAclRequest.getPath();
  287. nodeRecord = getRecordForPath(path);
  288. checkACL(zks, nodeRecord.acl, ZooDefs.Perms.ADMIN,
  289. request.authInfo);
  290. version = setAclRequest.getVersion();
  291. currentVersion = nodeRecord.stat.getAversion();
  292. if (version != -1 && version != currentVersion) {
  293. throw new KeeperException.BadVersionException();
  294. }
  295. version = currentVersion + 1;
  296. txn = new SetACLTxn(path, setAclRequest.getAcl(), version);
  297. nodeRecord = nodeRecord.duplicate(txnHeader.getZxid());
  298. nodeRecord.stat.setAversion(version);
  299. addChangeRecord(nodeRecord);
  300. break;
  301. case OpCode.createSession:
  302. txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
  303. .getNextZxid(), zks.getTime(), OpCode.createSession);
  304. request.request.rewind();
  305. int to = request.request.getInt();
  306. txn = new CreateSessionTxn(to);
  307. request.request.rewind();
  308. zks.sessionTracker.addSession(request.sessionId, to);
  309. break;
  310. case OpCode.closeSession:
  311. txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
  312. .getNextZxid(), zks.getTime(), OpCode.closeSession);
  313. HashSet<String> es = zks.dataTree
  314. .getEphemerals(request.sessionId);
  315. synchronized (zks.outstandingChanges) {
  316. for (ChangeRecord c : zks.outstandingChanges) {
  317. if (c.stat == null) {
  318. // Doing a delete
  319. es.remove(c.path);
  320. } else if (c.stat.getEphemeralOwner() == request.sessionId) {
  321. es.add(c.path);
  322. }
  323. }
  324. for (String path2Delete : es) {
  325. addChangeRecord(new ChangeRecord(txnHeader.getZxid(),
  326. path2Delete, null, 0, null));
  327. }
  328. }
  329. LOG.info("Processed session termination request for id: 0x"
  330. + Long.toHexString(request.sessionId));
  331. break;
  332. case OpCode.sync:
  333. case OpCode.exists:
  334. case OpCode.getData:
  335. case OpCode.getACL:
  336. case OpCode.getChildren:
  337. case OpCode.ping:
  338. break;
  339. }
  340. } catch (KeeperException e) {
  341. if (txnHeader != null) {
  342. txnHeader.setType(OpCode.error);
  343. txn = new ErrorTxn(e.getCode());
  344. }
  345. } catch (Exception e) {
  346. LOG.error("*********************************" + request);
  347. StringBuffer sb = new StringBuffer();
  348. ByteBuffer bb = request.request;
  349. if(bb!=null){
  350. bb.rewind();
  351. while (bb.hasRemaining()) {
  352. sb.append(Integer.toHexString(bb.get() & 0xff));
  353. }
  354. }else
  355. sb.append("request buffer is null");
  356. LOG.error(sb.toString());
  357. LOG.error("Unexpected exception", e);
  358. if (txnHeader != null) {
  359. txnHeader.setType(OpCode.error);
  360. txn = new ErrorTxn(Code.MarshallingError);
  361. }
  362. }
  363. request.hdr = txnHeader;
  364. request.txn = txn;
  365. if (request.hdr != null) {
  366. request.zxid = request.hdr.getZxid();
  367. }
  368. nextProcessor.processRequest(request);
  369. }
  370. /**
  371. * This method checks out the acl making sure it isn't null or empty,
  372. * it has valid schemes and ids, and expanding any relative ids that
  373. * depend on the requestor's authentication information.
  374. *
  375. * @param authInfo list of ACL IDs associated with the client connection
  376. * @param acl list of ACLs being assigned to the node (create or setACL operation)
  377. * @return
  378. */
  379. private boolean fixupACL(List<Id> authInfo, List<ACL> acl) {
  380. if (skipACL) {
  381. return true;
  382. }
  383. if (acl == null || acl.size() == 0) {
  384. return false;
  385. }
  386. Iterator<ACL> it = acl.iterator();
  387. LinkedList<ACL> toAdd = null;
  388. while (it.hasNext()) {
  389. ACL a = it.next();
  390. Id id = a.getId();
  391. if (id.getScheme().equals("world") && id.getId().equals("anyone")) {
  392. // wide open
  393. } else if (id.getScheme().equals("auth")) {
  394. // This is the "auth" id, so we have to expand it to the
  395. // authenticated ids of the requestor
  396. it.remove();
  397. if (toAdd == null) {
  398. toAdd = new LinkedList<ACL>();
  399. }
  400. boolean authIdValid = false;
  401. for (Id cid : authInfo) {
  402. AuthenticationProvider ap = ProviderRegistry.getProvider(cid.getScheme());
  403. if (ap == null) {
  404. LOG.error("Missing AuthenticationProvider for "
  405. + cid.getScheme());
  406. } else if (ap.isAuthenticated()) {
  407. authIdValid = true;
  408. toAdd.add(new ACL(a.getPerms(), cid));
  409. }
  410. }
  411. if (!authIdValid) {
  412. return false;
  413. }
  414. } else {
  415. AuthenticationProvider ap = ProviderRegistry.getProvider(id
  416. .getScheme());
  417. if (ap == null) {
  418. return false;
  419. }
  420. if (!ap.isValid(id.getId())) {
  421. return false;
  422. }
  423. }
  424. }
  425. if (toAdd != null) {
  426. for (ACL a : toAdd) {
  427. acl.add(a);
  428. }
  429. }
  430. return acl.size() > 0;
  431. }
  432. public void processRequest(Request request) {
  433. // request.addRQRec(">prep="+zks.outstandingChanges.size());
  434. submittedRequests.add(request);
  435. }
  436. public void shutdown() {
  437. submittedRequests.clear();
  438. submittedRequests.add(Request.requestOfDeath);
  439. nextProcessor.shutdown();
  440. }
  441. }