FinalRequestProcessor.java 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. /*
  2. * Copyright 2008, Yahoo! Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package org.apache.zookeeper.server;
  17. import java.io.IOException;
  18. import java.nio.ByteBuffer;
  19. import java.util.ArrayList;
  20. import java.util.List;
  21. import org.apache.log4j.Logger;
  22. import org.apache.jute.Record;
  23. import org.apache.zookeeper.KeeperException;
  24. import org.apache.zookeeper.ZooDefs;
  25. import org.apache.zookeeper.KeeperException.Code;
  26. import org.apache.zookeeper.ZooDefs.OpCode;
  27. import org.apache.zookeeper.data.ACL;
  28. import org.apache.zookeeper.data.Stat;
  29. import org.apache.zookeeper.proto.CreateResponse;
  30. import org.apache.zookeeper.proto.ExistsRequest;
  31. import org.apache.zookeeper.proto.ExistsResponse;
  32. import org.apache.zookeeper.proto.GetACLRequest;
  33. import org.apache.zookeeper.proto.GetACLResponse;
  34. import org.apache.zookeeper.proto.GetChildrenRequest;
  35. import org.apache.zookeeper.proto.GetChildrenResponse;
  36. import org.apache.zookeeper.proto.GetDataRequest;
  37. import org.apache.zookeeper.proto.GetDataResponse;
  38. import org.apache.zookeeper.proto.ReplyHeader;
  39. import org.apache.zookeeper.proto.SetACLResponse;
  40. import org.apache.zookeeper.proto.SetDataResponse;
  41. import org.apache.zookeeper.proto.SyncRequest;
  42. import org.apache.zookeeper.proto.SyncResponse;
  43. import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
  44. import org.apache.zookeeper.server.NIOServerCnxn.Factory;
  45. import org.apache.zookeeper.txn.CreateSessionTxn;
  46. import org.apache.zookeeper.txn.ErrorTxn;
  47. /**
  48. * This Request processor actually applies any transaction associated with a
  49. * request and services any queries. It is always at the end of a
  50. * RequestProcessor chain (hence the name), so it does not have a nextProcessor
  51. * member.
  52. *
  53. * This RequestProcessor counts on ZooKeeperServer to populate the
  54. * outstandingRequests member of ZooKeeperServer.
  55. */
  56. public class FinalRequestProcessor implements RequestProcessor {
  57. private static final Logger LOG = Logger.getLogger(FinalRequestProcessor.class);
  58. ZooKeeperServer zks;
  59. public FinalRequestProcessor(ZooKeeperServer zks) {
  60. this.zks = zks;
  61. }
  62. public void processRequest(Request request) {
  63. // LOG.info("Zoo>>> cxid = " + request.cxid + " type = " +
  64. // request.type + " id = " + request.sessionId + " cnxn " +
  65. // request.cnxn);
  66. // request.addRQRec(">final");
  67. long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
  68. if (request.type == OpCode.ping) {
  69. traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
  70. }
  71. ZooTrace.logRequest(LOG, traceMask, 'E', request, "");
  72. ProcessTxnResult rc = null;
  73. synchronized (zks.outstandingChanges) {
  74. while (!zks.outstandingChanges.isEmpty()
  75. && zks.outstandingChanges.get(0).zxid <= request.zxid) {
  76. if (zks.outstandingChanges.get(0).zxid < request.zxid) {
  77. LOG.warn("Zxid outstanding "
  78. + zks.outstandingChanges.get(0).zxid
  79. + " is less than current " + request.zxid);
  80. }
  81. zks.outstandingChanges.remove(0);
  82. }
  83. if (request.hdr != null) {
  84. rc = zks.dataTree.processTxn(request.hdr, request.txn);
  85. if (request.type == OpCode.createSession) {
  86. if (request.txn instanceof CreateSessionTxn) {
  87. CreateSessionTxn cst = (CreateSessionTxn) request.txn;
  88. zks.sessionTracker.addSession(request.sessionId, cst
  89. .getTimeOut());
  90. } else {
  91. LOG.warn("*****>>>>> Got "
  92. + request.txn.getClass() + " "
  93. + request.txn.toString());
  94. }
  95. } else if (request.type == OpCode.closeSession) {
  96. zks.sessionTracker.removeSession(request.sessionId);
  97. }
  98. }
  99. // do not add non quorum packets to the queue.
  100. if (Request.isQuorum(request.type)) {
  101. zks.addCommittedProposal(request);
  102. }
  103. }
  104. if (request.hdr != null && request.hdr.getType() == OpCode.closeSession) {
  105. Factory scxn = zks.getServerCnxnFactory();
  106. // this might be possible since
  107. // we might just be playing diffs from the leader
  108. if (scxn != null) {
  109. scxn.closeSession(request.sessionId);
  110. }
  111. }
  112. if (request.cnxn == null) {
  113. return;
  114. }
  115. zks.decInProcess();
  116. int err = 0;
  117. Record rsp = null;
  118. try {
  119. if (request.hdr != null && request.hdr.getType() == OpCode.error) {
  120. throw KeeperException.create(((ErrorTxn) request.txn).getErr());
  121. }
  122. switch (request.type) {
  123. case OpCode.ping:
  124. request.cnxn.sendResponse(new ReplyHeader(-2,
  125. zks.dataTree.lastProcessedZxid, 0), null, "response");
  126. return;
  127. case OpCode.createSession:
  128. request.cnxn.finishSessionInit(true);
  129. return;
  130. case OpCode.create:
  131. rsp = new CreateResponse(rc.path);
  132. err = rc.err;
  133. break;
  134. case OpCode.delete:
  135. err = rc.err;
  136. break;
  137. case OpCode.setData:
  138. rsp = new SetDataResponse(rc.stat);
  139. err = rc.err;
  140. break;
  141. case OpCode.setACL:
  142. rsp = new SetACLResponse(rc.stat);
  143. err = rc.err;
  144. break;
  145. case OpCode.closeSession:
  146. err = rc.err;
  147. break;
  148. case OpCode.sync:
  149. SyncRequest syncRequest = new SyncRequest();
  150. ZooKeeperServer.byteBuffer2Record(request.request,
  151. syncRequest);
  152. rsp = new SyncResponse(syncRequest.getPath());
  153. break;
  154. case OpCode.exists:
  155. // TODO we need to figure out the security requirement for this!
  156. ExistsRequest existsRequest = new ExistsRequest();
  157. ZooKeeperServer.byteBuffer2Record(request.request,
  158. existsRequest);
  159. String path = existsRequest.getPath();
  160. if (path.indexOf('\0') != -1) {
  161. throw new KeeperException.BadArgumentsException();
  162. }
  163. Stat stat = zks.dataTree.statNode(path, existsRequest
  164. .getWatch() ? request.cnxn : null);
  165. rsp = new ExistsResponse(stat);
  166. break;
  167. case OpCode.getData:
  168. GetDataRequest getDataRequest = new GetDataRequest();
  169. ZooKeeperServer.byteBuffer2Record(request.request,
  170. getDataRequest);
  171. DataNode n = zks.dataTree.getNode(getDataRequest.getPath());
  172. if (n == null) {
  173. throw new KeeperException.NoNodeException();
  174. }
  175. PrepRequestProcessor.checkACL(zks, n.acl, ZooDefs.Perms.READ,
  176. request.authInfo);
  177. stat = new Stat();
  178. byte b[] = zks.dataTree.getData(getDataRequest.getPath(), stat,
  179. getDataRequest.getWatch() ? request.cnxn : null);
  180. rsp = new GetDataResponse(b, stat);
  181. break;
  182. case OpCode.getACL:
  183. GetACLRequest getACLRequest = new GetACLRequest();
  184. ZooKeeperServer.byteBuffer2Record(request.request,
  185. getACLRequest);
  186. stat = new Stat();
  187. List<ACL> acl =
  188. zks.dataTree.getACL(getACLRequest.getPath(), stat);
  189. rsp = new GetACLResponse(acl, stat);
  190. break;
  191. case OpCode.getChildren:
  192. GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
  193. ZooKeeperServer.byteBuffer2Record(request.request,
  194. getChildrenRequest);
  195. stat = new Stat();
  196. n = zks.dataTree.getNode(getChildrenRequest.getPath());
  197. if (n == null) {
  198. throw new KeeperException.NoNodeException();
  199. }
  200. PrepRequestProcessor.checkACL(zks, n.acl, ZooDefs.Perms.READ,
  201. request.authInfo);
  202. ArrayList<String> children = zks.dataTree.getChildren(
  203. getChildrenRequest.getPath(), stat, getChildrenRequest
  204. .getWatch() ? request.cnxn : null);
  205. rsp = new GetChildrenResponse(children);
  206. break;
  207. }
  208. } catch (KeeperException e) {
  209. err = e.getCode();
  210. } catch (Exception e) {
  211. LOG.warn("****************************** " + request);
  212. StringBuffer sb = new StringBuffer();
  213. ByteBuffer bb = request.request;
  214. bb.rewind();
  215. while (bb.hasRemaining()) {
  216. sb.append(Integer.toHexString(bb.get() & 0xff));
  217. }
  218. LOG.warn(sb.toString());
  219. LOG.error("FIXMSG",e);
  220. err = Code.MarshallingError;
  221. }
  222. ReplyHeader hdr = new ReplyHeader(request.cxid, request.zxid, err);
  223. ServerStats.getInstance().updateLatency(request.createTime);
  224. try {
  225. request.cnxn.sendResponse(hdr, rsp, "response");
  226. } catch (IOException e) {
  227. LOG.error("FIXMSG",e);
  228. }
  229. }
  230. public void shutdown() {
  231. }
  232. }