PrepRequestProcessor.java 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444
  1. /*
  2. * Copyright 2008, Yahoo! Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package com.yahoo.zookeeper.server;
  17. import java.nio.ByteBuffer;
  18. import java.util.ArrayList;
  19. import java.util.HashSet;
  20. import java.util.Iterator;
  21. import java.util.LinkedList;
  22. import java.util.List;
  23. import java.util.concurrent.LinkedBlockingQueue;
  24. import org.apache.log4j.Logger;
  25. import com.yahoo.jute.Record;
  26. import com.yahoo.zookeeper.KeeperException;
  27. import com.yahoo.zookeeper.ZooDefs;
  28. import com.yahoo.zookeeper.KeeperException.Code;
  29. import com.yahoo.zookeeper.ZooDefs.CreateFlags;
  30. import com.yahoo.zookeeper.ZooDefs.OpCode;
  31. import com.yahoo.zookeeper.data.ACL;
  32. import com.yahoo.zookeeper.data.Id;
  33. import com.yahoo.zookeeper.data.Stat;
  34. import com.yahoo.zookeeper.proto.CreateRequest;
  35. import com.yahoo.zookeeper.proto.DeleteRequest;
  36. import com.yahoo.zookeeper.proto.SetACLRequest;
  37. import com.yahoo.zookeeper.proto.SetDataRequest;
  38. import com.yahoo.zookeeper.server.ZooKeeperServer.ChangeRecord;
  39. import com.yahoo.zookeeper.server.auth.AuthenticationProvider;
  40. import com.yahoo.zookeeper.server.auth.ProviderRegistry;
  41. import com.yahoo.zookeeper.txn.CreateSessionTxn;
  42. import com.yahoo.zookeeper.txn.CreateTxn;
  43. import com.yahoo.zookeeper.txn.DeleteTxn;
  44. import com.yahoo.zookeeper.txn.ErrorTxn;
  45. import com.yahoo.zookeeper.txn.SetACLTxn;
  46. import com.yahoo.zookeeper.txn.SetDataTxn;
  47. import com.yahoo.zookeeper.txn.TxnHeader;
  48. /**
  49. * This request processor is generally at the start of a RequestProcessor
  50. * change. It sets up any transactions associated with requests that change the
  51. * state of the system. It counts on ZooKeeperServer to update
  52. * outstandingRequests, so that it can take into account transactions that are
  53. * in the queue to be applied when generating a transaction.
  54. */
  55. public class PrepRequestProcessor extends Thread implements RequestProcessor {
  56. private static final Logger LOG = Logger.getLogger(PrepRequestProcessor.class);
  57. static boolean skipACL;
  58. static {
  59. skipACL = System.getProperty("zookeeper.skipACL", "no").equals("yes");
  60. }
  61. LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();
  62. RequestProcessor nextProcessor;
  63. ZooKeeperServer zks;
  64. public PrepRequestProcessor(ZooKeeperServer zks,
  65. RequestProcessor nextProcessor) {
  66. super("ProcessThread");
  67. this.nextProcessor = nextProcessor;
  68. this.zks = zks;
  69. start();
  70. }
  71. public void run() {
  72. try {
  73. while (true) {
  74. Request request = submittedRequests.take();
  75. long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
  76. if (request.type == OpCode.ping) {
  77. traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
  78. }
  79. ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
  80. if (Request.requestOfDeath == request) {
  81. break;
  82. }
  83. pRequest(request);
  84. }
  85. } catch (InterruptedException e) {
  86. LOG.error("FIXMSG",e);
  87. }
  88. ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
  89. "PrepRequestProcessor exited loop!");
  90. }
  91. ChangeRecord getRecordForPath(String path) throws KeeperException.NoNodeException {
  92. ChangeRecord lastChange = null;
  93. synchronized (zks.outstandingChanges) {
  94. for (int i = 0; i < zks.outstandingChanges.size(); i++) {
  95. ChangeRecord c = zks.outstandingChanges.get(i);
  96. if (c.path.equals(path)) {
  97. lastChange = c;
  98. }
  99. }
  100. if (lastChange == null) {
  101. DataNode n = zks.dataTree.getNode(path);
  102. if (n != null) {
  103. lastChange = new ChangeRecord(-1, path, n.stat, n.children
  104. .size(), n.acl);
  105. }
  106. }
  107. }
  108. if (lastChange == null || lastChange.stat == null) {
  109. throw new KeeperException.NoNodeException();
  110. }
  111. return lastChange;
  112. }
  113. void addChangeRecord(ChangeRecord c) {
  114. synchronized (zks.outstandingChanges) {
  115. zks.outstandingChanges.add(c);
  116. }
  117. }
  118. static void checkACL(ZooKeeperServer zks, List<ACL> acl, int perm,
  119. List<Id> ids) throws KeeperException.NoAuthException {
  120. if (skipACL) {
  121. return;
  122. }
  123. if (acl == null || acl.size() == 0) {
  124. return;
  125. }
  126. for (ACL a : acl) {
  127. Id id = a.getId();
  128. if ((a.getPerms() & perm) != 0) {
  129. if (id.getScheme().equals("world")
  130. && id.getId().equals("anyone")) {
  131. return;
  132. }
  133. AuthenticationProvider ap = ProviderRegistry.getProvider(id
  134. .getScheme());
  135. if (ap != null) {
  136. for (Id authId : ids) {
  137. if (authId.getScheme().equals("super")) {
  138. return;
  139. }
  140. if (authId.getScheme().equals(id.getScheme())
  141. && ap.matches(authId.getId(), id.getId())) {
  142. return;
  143. }
  144. }
  145. }
  146. }
  147. }
  148. throw new KeeperException.NoAuthException();
  149. }
  150. /**
  151. * This method will be called inside the ProcessRequestThread, which is a
  152. * singleton, so there will be a single thread calling this code.
  153. *
  154. * @param request
  155. */
  156. @SuppressWarnings("unchecked")
  157. protected void pRequest(Request request) {
  158. // LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
  159. // request.type + " id = " + request.sessionId);
  160. TxnHeader txnHeader = null;
  161. Record txn = null;
  162. try {
  163. switch (request.type) {
  164. case OpCode.create:
  165. txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
  166. .getNextZxid(), zks.getTime(), OpCode.create);
  167. zks.sessionTracker.checkSession(request.sessionId);
  168. CreateRequest createRequest = new CreateRequest();
  169. ZooKeeperServer.byteBuffer2Record(request.request,
  170. createRequest);
  171. String path = createRequest.getPath();
  172. int lastSlash = path.lastIndexOf('/');
  173. if (lastSlash == -1 || path.indexOf('\0') != -1) {
  174. throw new KeeperException.BadArgumentsException();
  175. }
  176. if (!fixupACL(request.authInfo, createRequest.getAcl())) {
  177. throw new KeeperException.InvalidACLException();
  178. }
  179. String parentPath = path.substring(0, lastSlash);
  180. ChangeRecord parentRecord = getRecordForPath(parentPath);
  181. checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE,
  182. request.authInfo);
  183. int parentCVersion = parentRecord.stat.getCversion();
  184. if ((createRequest.getFlags() & CreateFlags.SEQUENCE) != 0) {
  185. path = path + parentCVersion;
  186. }
  187. try {
  188. if (getRecordForPath(path) != null) {
  189. throw new KeeperException.NodeExistsException();
  190. }
  191. } catch (KeeperException.NoNodeException e) {
  192. // ignore this one
  193. }
  194. boolean ephemeralParent = parentRecord.stat.getEphemeralOwner() != 0;
  195. if (ephemeralParent) {
  196. throw new KeeperException.NoChildrenForEphemeralsException();
  197. }
  198. txn = new CreateTxn(path, createRequest.getData(),
  199. createRequest.getAcl(),
  200. (createRequest.getFlags() & CreateFlags.EPHEMERAL) != 0);
  201. Stat s = new Stat();
  202. if ((createRequest.getFlags() & CreateFlags.EPHEMERAL) != 0) {
  203. s.setEphemeralOwner(request.sessionId);
  204. }
  205. parentRecord = parentRecord.duplicate(txnHeader.getZxid());
  206. parentRecord.childCount++;
  207. parentRecord.stat
  208. .setCversion(parentRecord.stat.getCversion() + 1);
  209. addChangeRecord(parentRecord);
  210. addChangeRecord(new ChangeRecord(txnHeader.getZxid(), path, s,
  211. 0, createRequest.getAcl()));
  212. break;
  213. case OpCode.delete:
  214. txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
  215. .getNextZxid(), zks.getTime(), OpCode.delete);
  216. zks.sessionTracker.checkSession(request.sessionId);
  217. DeleteRequest deleteRequest = new DeleteRequest();
  218. ZooKeeperServer.byteBuffer2Record(request.request,
  219. deleteRequest);
  220. path = deleteRequest.getPath();
  221. lastSlash = path.lastIndexOf('/');
  222. if (lastSlash == -1 || path.indexOf('\0') != -1
  223. || path.equals("/")) {
  224. throw new KeeperException.BadArgumentsException();
  225. }
  226. parentPath = path.substring(0, lastSlash);
  227. parentRecord = getRecordForPath(parentPath);
  228. ChangeRecord nodeRecord = getRecordForPath(path);
  229. checkACL(zks, parentRecord.acl, ZooDefs.Perms.DELETE,
  230. request.authInfo);
  231. int version = deleteRequest.getVersion();
  232. if (version != -1 && nodeRecord.stat.getVersion() != version) {
  233. throw new KeeperException.BadVersionException();
  234. }
  235. if (nodeRecord.childCount > 0) {
  236. throw new KeeperException.NotEmptyException();
  237. }
  238. txn = new DeleteTxn(path);
  239. parentRecord = parentRecord.duplicate(txnHeader.getZxid());
  240. parentRecord.childCount--;
  241. parentRecord.stat
  242. .setCversion(parentRecord.stat.getCversion() + 1);
  243. addChangeRecord(parentRecord);
  244. addChangeRecord(new ChangeRecord(txnHeader.getZxid(), path,
  245. null, -1, null));
  246. break;
  247. case OpCode.setData:
  248. txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
  249. .getNextZxid(), zks.getTime(), OpCode.setData);
  250. zks.sessionTracker.checkSession(request.sessionId);
  251. SetDataRequest setDataRequest = new SetDataRequest();
  252. ZooKeeperServer.byteBuffer2Record(request.request,
  253. setDataRequest);
  254. path = setDataRequest.getPath();
  255. nodeRecord = getRecordForPath(path);
  256. checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE,
  257. request.authInfo);
  258. version = setDataRequest.getVersion();
  259. int currentVersion = nodeRecord.stat.getVersion();
  260. if (version != -1 && version != currentVersion) {
  261. throw new KeeperException.BadVersionException();
  262. }
  263. version = currentVersion + 1;
  264. txn = new SetDataTxn(path, setDataRequest.getData(), version);
  265. nodeRecord = nodeRecord.duplicate(txnHeader.getZxid());
  266. nodeRecord.stat.setVersion(version);
  267. addChangeRecord(nodeRecord);
  268. break;
  269. case OpCode.setACL:
  270. txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
  271. .getNextZxid(), zks.getTime(), OpCode.setACL);
  272. zks.sessionTracker.checkSession(request.sessionId);
  273. SetACLRequest setAclRequest = new SetACLRequest();
  274. if (!fixupACL(request.authInfo, setAclRequest.getAcl())) {
  275. throw new KeeperException.InvalidACLException();
  276. }
  277. ZooKeeperServer.byteBuffer2Record(request.request,
  278. setAclRequest);
  279. path = setAclRequest.getPath();
  280. nodeRecord = getRecordForPath(path);
  281. checkACL(zks, nodeRecord.acl, ZooDefs.Perms.ADMIN,
  282. request.authInfo);
  283. version = setAclRequest.getVersion();
  284. currentVersion = nodeRecord.stat.getAversion();
  285. if (version != -1 && version != currentVersion) {
  286. throw new KeeperException.BadVersionException();
  287. }
  288. version = currentVersion + 1;
  289. txn = new SetACLTxn(path, setAclRequest.getAcl(), version);
  290. nodeRecord = nodeRecord.duplicate(txnHeader.getZxid());
  291. nodeRecord.stat.setAversion(version);
  292. addChangeRecord(nodeRecord);
  293. break;
  294. case OpCode.createSession:
  295. txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
  296. .getNextZxid(), zks.getTime(), OpCode.createSession);
  297. request.request.rewind();
  298. int to = request.request.getInt();
  299. txn = new CreateSessionTxn(to);
  300. request.request.rewind();
  301. zks.sessionTracker.addSession(request.sessionId, to);
  302. break;
  303. case OpCode.closeSession:
  304. txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
  305. .getNextZxid(), zks.getTime(), OpCode.closeSession);
  306. HashSet<String> es = zks.dataTree
  307. .getEphemerals(request.sessionId);
  308. synchronized (zks.outstandingChanges) {
  309. for (ChangeRecord c : zks.outstandingChanges) {
  310. if (c.stat == null) {
  311. // Doing a delete
  312. es.remove(c.path);
  313. } else if (c.stat.getEphemeralOwner() == request.sessionId) {
  314. es.add(c.path);
  315. }
  316. }
  317. for (String path2Delete : es) {
  318. addChangeRecord(new ChangeRecord(txnHeader.getZxid(),
  319. path2Delete, null, 0, null));
  320. }
  321. }
  322. LOG.info("Processed session termination request for id: "
  323. + Long.toHexString(request.sessionId));
  324. break;
  325. case OpCode.sync:
  326. case OpCode.exists:
  327. case OpCode.getData:
  328. case OpCode.getACL:
  329. case OpCode.getChildren:
  330. case OpCode.ping:
  331. break;
  332. }
  333. } catch (KeeperException e) {
  334. if (txnHeader != null) {
  335. txnHeader.setType(OpCode.error);
  336. txn = new ErrorTxn(e.getCode());
  337. }
  338. } catch (Exception e) {
  339. LOG.error("*********************************" + request);
  340. StringBuffer sb = new StringBuffer();
  341. ByteBuffer bb = request.request;
  342. if(bb!=null){
  343. bb.rewind();
  344. while (bb.hasRemaining()) {
  345. sb.append(Integer.toHexString(bb.get() & 0xff));
  346. }
  347. }else
  348. sb.append("request buffer is null");
  349. LOG.error(sb.toString());
  350. LOG.error("Unexpected exception", e);
  351. if (txnHeader != null) {
  352. txnHeader.setType(OpCode.error);
  353. txn = new ErrorTxn(Code.MarshallingError);
  354. }
  355. }
  356. request.hdr = txnHeader;
  357. request.txn = txn;
  358. if (request.hdr != null) {
  359. request.zxid = request.hdr.getZxid();
  360. }
  361. nextProcessor.processRequest(request);
  362. }
  363. /**
  364. *
  365. * @param authInfo list of ACL IDs associated with the client connection
  366. * @param acl list of ACLs being assigned to the node (create or setACL operation)
  367. * @return
  368. */
  369. private boolean fixupACL(List<Id> authInfo, List<ACL> acl) {
  370. if (skipACL) {
  371. return true;
  372. }
  373. if (acl == null || acl.size() == 0) {
  374. return false;
  375. }
  376. Iterator<ACL> it = acl.iterator();
  377. LinkedList<ACL> toAdd = null;
  378. while (it.hasNext()) {
  379. ACL a = it.next();
  380. Id id = a.getId();
  381. if (id.getScheme().equals("world") && id.getId().equals("anyone")) {
  382. } else if (id.getScheme().equals("auth")) {
  383. it.remove();
  384. if (toAdd == null) {
  385. toAdd = new LinkedList<ACL>();
  386. }
  387. for (Id cid : authInfo) {
  388. AuthenticationProvider ap = ProviderRegistry.getProvider(cid.getScheme());
  389. if (ap == null) {
  390. LOG.error("Missing AuthenticationProvider for "
  391. + cid.getScheme());
  392. } else if (ap.isAuthenticated()) {
  393. toAdd.add(new ACL(a.getPerms(), cid));
  394. }
  395. }
  396. } else {
  397. AuthenticationProvider ap = ProviderRegistry.getProvider(id
  398. .getScheme());
  399. if (ap == null) {
  400. return false;
  401. }
  402. if (!ap.isValid(id.getId())) {
  403. return false;
  404. }
  405. }
  406. }
  407. if (toAdd != null) {
  408. for (ACL a : toAdd) {
  409. acl.add(a);
  410. }
  411. }
  412. return true;
  413. }
  414. public void processRequest(Request request) {
  415. // request.addRQRec(">prep="+zks.outstandingChanges.size());
  416. submittedRequests.add(request);
  417. }
  418. public void shutdown() {
  419. submittedRequests.clear();
  420. submittedRequests.add(Request.requestOfDeath);
  421. nextProcessor.shutdown();
  422. }
  423. }