NIOServerCnxn.java 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869
  1. /**
  2. * Licensed to the Apache Software Foundation (ASF) under one
  3. * or more contributor license agreements. See the NOTICE file
  4. * distributed with this work for additional information
  5. * regarding copyright ownership. The ASF licenses this file
  6. * to you under the Apache License, Version 2.0 (the
  7. * "License"); you may not use this file except in compliance
  8. * with the License. You may obtain a copy of the License at
  9. *
  10. * http://www.apache.org/licenses/LICENSE-2.0
  11. *
  12. * Unless required by applicable law or agreed to in writing, software
  13. * distributed under the License is distributed on an "AS IS" BASIS,
  14. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. * See the License for the specific language governing permissions and
  16. * limitations under the License.
  17. */
  18. package org.apache.zookeeper.server;
  19. import java.io.ByteArrayOutputStream;
  20. import java.io.IOException;
  21. import java.io.InputStream;
  22. import java.net.InetAddress;
  23. import java.net.InetSocketAddress;
  24. import java.nio.ByteBuffer;
  25. import java.nio.channels.CancelledKeyException;
  26. import java.nio.channels.Channel;
  27. import java.nio.channels.SelectionKey;
  28. import java.nio.channels.Selector;
  29. import java.nio.channels.ServerSocketChannel;
  30. import java.nio.channels.SocketChannel;
  31. import java.util.ArrayList;
  32. import java.util.Collections;
  33. import java.util.HashSet;
  34. import java.util.Iterator;
  35. import java.util.LinkedList;
  36. import java.util.Set;
  37. import java.util.concurrent.LinkedBlockingQueue;
  38. import org.apache.log4j.Logger;
  39. import org.apache.jute.BinaryInputArchive;
  40. import org.apache.jute.BinaryOutputArchive;
  41. import org.apache.jute.Record;
  42. import org.apache.zookeeper.KeeperException;
  43. import org.apache.zookeeper.Version;
  44. import org.apache.zookeeper.Watcher;
  45. import org.apache.zookeeper.WatchedEvent;
  46. import org.apache.zookeeper.ZooDefs.OpCode;
  47. import org.apache.zookeeper.data.Id;
  48. import org.apache.zookeeper.proto.AuthPacket;
  49. import org.apache.zookeeper.proto.ConnectRequest;
  50. import org.apache.zookeeper.proto.ConnectResponse;
  51. import org.apache.zookeeper.proto.ReplyHeader;
  52. import org.apache.zookeeper.proto.RequestHeader;
  53. import org.apache.zookeeper.proto.WatcherEvent;
  54. import org.apache.zookeeper.server.auth.AuthenticationProvider;
  55. import org.apache.zookeeper.server.auth.ProviderRegistry;
  56. /**
  57. * This class handles communication with clients using NIO. There is one per
  58. * client, but only one thread doing the communication.
  59. */
  60. public class NIOServerCnxn implements Watcher, ServerCnxn {
  61. private static final Logger LOG = Logger.getLogger(NIOServerCnxn.class);
  62. static public class Factory extends Thread {
  63. ZooKeeperServer zks;
  64. ServerSocketChannel ss;
  65. Selector selector = Selector.open();
  66. /**
  67. * We use this buffer to do efficient socket I/O. Since there is a single
  68. * sender thread per NIOServerCnxn instance, we can use a member variable to
  69. * only allocate it once.
  70. */
  71. ByteBuffer directBuffer = ByteBuffer.allocateDirect(64 * 1024);
  72. HashSet<NIOServerCnxn> cnxns = new HashSet<NIOServerCnxn>();
  73. int outstandingLimit = 1;
  74. public Factory(int port) throws IOException {
  75. super("NIOServerCxn.Factory:" + port);
  76. setDaemon(true);
  77. this.ss = ServerSocketChannel.open();
  78. ss.socket().bind(new InetSocketAddress(port));
  79. ss.configureBlocking(false);
  80. ss.register(selector, SelectionKey.OP_ACCEPT);
  81. start();
  82. }
  83. public void startup(ZooKeeperServer zks) throws IOException,
  84. InterruptedException {
  85. zks.startup();
  86. setZooKeeperServer(zks);
  87. }
  88. public void setZooKeeperServer(ZooKeeperServer zks) {
  89. this.zks = zks;
  90. if (zks != null) {
  91. this.outstandingLimit = zks.getGlobalOutstandingLimit();
  92. zks.setServerCnxnFactory(this);
  93. } else {
  94. this.outstandingLimit = 1;
  95. }
  96. }
  97. public InetSocketAddress getLocalAddress(){
  98. return (InetSocketAddress)ss.socket().getLocalSocketAddress();
  99. }
  100. public int getLocalPort(){
  101. return ss.socket().getLocalPort();
  102. }
  103. private void addCnxn(NIOServerCnxn cnxn) {
  104. synchronized (cnxns) {
  105. cnxns.add(cnxn);
  106. }
  107. }
  108. protected NIOServerCnxn createConnection(SocketChannel sock,
  109. SelectionKey sk) throws IOException {
  110. return new NIOServerCnxn(zks, sock, sk, this);
  111. }
  112. public void run() {
  113. while (!ss.socket().isClosed()) {
  114. try {
  115. selector.select(1000);
  116. Set<SelectionKey> selected;
  117. synchronized (this) {
  118. selected = selector.selectedKeys();
  119. }
  120. ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
  121. selected);
  122. Collections.shuffle(selectedList);
  123. for (SelectionKey k : selectedList) {
  124. if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
  125. SocketChannel sc = ((ServerSocketChannel) k
  126. .channel()).accept();
  127. sc.configureBlocking(false);
  128. SelectionKey sk = sc.register(selector,
  129. SelectionKey.OP_READ);
  130. NIOServerCnxn cnxn = createConnection(sc, sk);
  131. sk.attach(cnxn);
  132. addCnxn(cnxn);
  133. } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
  134. NIOServerCnxn c = (NIOServerCnxn) k.attachment();
  135. c.doIO(k);
  136. }
  137. }
  138. selected.clear();
  139. } catch (Exception e) {
  140. LOG.error("FIXMSG",e);
  141. }
  142. }
  143. ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
  144. "NIOServerCnxn factory exitedloop.");
  145. clear();
  146. LOG.error("=====> Goodbye cruel world <======");
  147. // System.exit(0);
  148. }
  149. /**
  150. * clear all the connections in the selector
  151. *
  152. */
  153. synchronized public void clear() {
  154. selector.wakeup();
  155. synchronized (cnxns) {
  156. // got to clear all the connections that we have in the selector
  157. for (Iterator<NIOServerCnxn> it = cnxns.iterator(); it
  158. .hasNext();) {
  159. NIOServerCnxn cnxn = it.next();
  160. it.remove();
  161. try {
  162. cnxn.close();
  163. } catch (Exception e) {
  164. // Do nothing.
  165. }
  166. }
  167. }
  168. }
  169. public void shutdown() {
  170. try {
  171. ss.close();
  172. clear();
  173. this.interrupt();
  174. this.join();
  175. } catch (InterruptedException e) {
  176. LOG.warn("Interrupted",e);
  177. } catch (Exception e) {
  178. LOG.error("Unexpected exception", e);
  179. }
  180. if (zks != null) {
  181. zks.shutdown();
  182. }
  183. }
  184. synchronized void closeSession(long sessionId) {
  185. selector.wakeup();
  186. synchronized (cnxns) {
  187. for (Iterator<NIOServerCnxn> it = cnxns.iterator(); it
  188. .hasNext();) {
  189. NIOServerCnxn cnxn = it.next();
  190. if (cnxn.sessionId == sessionId) {
  191. it.remove();
  192. try {
  193. cnxn.close();
  194. } catch (Exception e) {
  195. LOG.warn("exception during session close", e);
  196. }
  197. break;
  198. }
  199. }
  200. }
  201. }
  202. }
  203. /**
  204. * The buffer will cause the connection to be close when we do a send.
  205. */
  206. static final ByteBuffer closeConn = ByteBuffer.allocate(0);
  207. Factory factory;
  208. ZooKeeperServer zk;
  209. private SocketChannel sock;
  210. private SelectionKey sk;
  211. boolean initialized;
  212. ByteBuffer lenBuffer = ByteBuffer.allocate(4);
  213. ByteBuffer incomingBuffer = lenBuffer;
  214. LinkedBlockingQueue<ByteBuffer> outgoingBuffers = new LinkedBlockingQueue<ByteBuffer>();
  215. int sessionTimeout;
  216. ArrayList<Id> authInfo = new ArrayList<Id>();
  217. LinkedList<Request> outstanding = new LinkedList<Request>();
  218. void sendBuffer(ByteBuffer bb) {
  219. synchronized (factory) {
  220. try {
  221. sk.selector().wakeup();
  222. // ZooLog.logTraceMessage(LOG,
  223. // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK,
  224. // "Add a buffer to outgoingBuffers");
  225. // ZooLog.logTraceMessage(LOG,
  226. // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK,
  227. //"sk " + sk + " is valid: " +
  228. // sk.isValid(), );
  229. outgoingBuffers.add(bb);
  230. if (sk.isValid()) {
  231. sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
  232. }
  233. } catch (RuntimeException e) {
  234. LOG.error("FIXMSG",e);
  235. throw e;
  236. }
  237. }
  238. }
  239. void doIO(SelectionKey k) throws InterruptedException {
  240. try {
  241. if (sock == null) {
  242. return;
  243. }
  244. if (k.isReadable()) {
  245. int rc = sock.read(incomingBuffer);
  246. if (rc < 0) {
  247. throw new IOException("Read error");
  248. }
  249. if (incomingBuffer.remaining() == 0) {
  250. incomingBuffer.flip();
  251. if (incomingBuffer == lenBuffer) {
  252. readLength(k);
  253. } else if (!initialized) {
  254. stats.packetsReceived++;
  255. ServerStats.getInstance().incrementPacketsReceived();
  256. readConnectRequest();
  257. lenBuffer.clear();
  258. incomingBuffer = lenBuffer;
  259. } else {
  260. stats.packetsReceived++;
  261. ServerStats.getInstance().incrementPacketsReceived();
  262. readRequest();
  263. lenBuffer.clear();
  264. incomingBuffer = lenBuffer;
  265. }
  266. }
  267. }
  268. if (k.isWritable()) {
  269. // ZooLog.logTraceMessage(LOG,
  270. // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK
  271. // "outgoingBuffers.size() = " +
  272. // outgoingBuffers.size());
  273. if (outgoingBuffers.size() > 0) {
  274. // ZooLog.logTraceMessage(LOG,
  275. // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK,
  276. // "sk " + k + " is valid: " +
  277. // k.isValid());
  278. /*
  279. * This is going to reset the buffer position to 0 and the
  280. * limit to the size of the buffer, so that we can fill it
  281. * with data from the non-direct buffers that we need to
  282. * send.
  283. */
  284. ByteBuffer directBuffer = factory.directBuffer;
  285. directBuffer.clear();
  286. for (ByteBuffer b : outgoingBuffers) {
  287. if (directBuffer.remaining() < b.remaining()) {
  288. /*
  289. * When we call put later, if the directBuffer is to
  290. * small to hold everything, nothing will be copied,
  291. * so we've got to slice the buffer if it's too big.
  292. */
  293. b = (ByteBuffer) b.slice().limit(
  294. directBuffer.remaining());
  295. }
  296. /*
  297. * put() is going to modify the positions of both
  298. * buffers, put we don't want to change the position of
  299. * the source buffers (we'll do that after the send, if
  300. * needed), so we save and reset the position after the
  301. * copy
  302. */
  303. int p = b.position();
  304. directBuffer.put(b);
  305. b.position(p);
  306. if (directBuffer.remaining() == 0) {
  307. break;
  308. }
  309. }
  310. /*
  311. * Do the flip: limit becomes position, position gets set to
  312. * 0. This sets us up for the write.
  313. */
  314. directBuffer.flip();
  315. int sent = sock.write(directBuffer);
  316. ByteBuffer bb;
  317. // Remove the buffers that we have sent
  318. while (outgoingBuffers.size() > 0) {
  319. bb = outgoingBuffers.peek();
  320. if (bb == closeConn) {
  321. throw new IOException("closing");
  322. }
  323. int left = bb.remaining() - sent;
  324. if (left > 0) {
  325. /*
  326. * We only partially sent this buffer, so we update
  327. * the position and exit the loop.
  328. */
  329. bb.position(bb.position() + sent);
  330. break;
  331. }
  332. stats.packetsSent++;
  333. /* We've sent the whole buffer, so drop the buffer */
  334. sent -= bb.remaining();
  335. ServerStats.getInstance().incrementPacketsSent();
  336. outgoingBuffers.remove();
  337. }
  338. // ZooLog.logTraceMessage(LOG,
  339. // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, "after send,
  340. // outgoingBuffers.size() = " + outgoingBuffers.size());
  341. }
  342. synchronized (this) {
  343. if (outgoingBuffers.size() == 0) {
  344. if (!initialized
  345. && (sk.interestOps() & SelectionKey.OP_READ) == 0) {
  346. throw new IOException("Responded to info probe");
  347. }
  348. sk.interestOps(sk.interestOps()
  349. & (~SelectionKey.OP_WRITE));
  350. } else {
  351. sk.interestOps(sk.interestOps()
  352. | SelectionKey.OP_WRITE);
  353. }
  354. }
  355. }
  356. } catch (CancelledKeyException e) {
  357. close();
  358. } catch (IOException e) {
  359. // LOG.error("FIXMSG",e);
  360. close();
  361. }
  362. }
  363. private void readRequest() throws IOException {
  364. // We have the request, now process and setup for next
  365. InputStream bais = new ByteBufferInputStream(incomingBuffer);
  366. BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
  367. RequestHeader h = new RequestHeader();
  368. h.deserialize(bia, "header");
  369. // Through the magic of byte buffers, txn will not be
  370. // pointing
  371. // to the start of the txn
  372. incomingBuffer = incomingBuffer.slice();
  373. if (h.getType() == OpCode.auth) {
  374. AuthPacket authPacket = new AuthPacket();
  375. ZooKeeperServer.byteBuffer2Record(incomingBuffer, authPacket);
  376. String scheme = authPacket.getScheme();
  377. AuthenticationProvider ap = ProviderRegistry.getProvider(scheme);
  378. if (ap == null
  379. || ap.handleAuthentication(this, authPacket.getAuth()) != KeeperException.Code.Ok) {
  380. if (ap == null)
  381. LOG.error("No authentication provider for scheme: "
  382. + scheme + " has " + ProviderRegistry.listProviders());
  383. else
  384. LOG.debug("Authentication failed for scheme: "
  385. + scheme);
  386. // send a response...
  387. ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
  388. KeeperException.Code.AuthFailed);
  389. sendResponse(rh, null, null);
  390. // ... and close connection
  391. sendBuffer(NIOServerCnxn.closeConn);
  392. disableRecv();
  393. } else {
  394. LOG.debug("Authentication succeeded for scheme: "
  395. + scheme);
  396. ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
  397. KeeperException.Code.Ok);
  398. sendResponse(rh, null, null);
  399. }
  400. return;
  401. } else {
  402. zk.submitRequest(this, sessionId, h.getType(), h.getXid(),
  403. incomingBuffer, authInfo);
  404. }
  405. if (h.getXid() >= 0) {
  406. synchronized (this) {
  407. outstandingRequests++;
  408. // check throttling
  409. if (zk.getInProcess() > factory.outstandingLimit) {
  410. LOG.warn("Throttling recv " + zk.getInProcess());
  411. disableRecv();
  412. // following lines should not be needed since we are already
  413. // reading
  414. // } else {
  415. // enableRecv();
  416. }
  417. }
  418. }
  419. }
  420. public void disableRecv() {
  421. sk.interestOps(sk.interestOps() & (~SelectionKey.OP_READ));
  422. }
  423. public void enableRecv() {
  424. if (sk.isValid()) {
  425. int interest = sk.interestOps();
  426. if ((interest & SelectionKey.OP_READ) == 0) {
  427. sk.interestOps(interest | SelectionKey.OP_READ);
  428. }
  429. }
  430. }
  431. private void readConnectRequest() throws IOException, InterruptedException {
  432. BinaryInputArchive bia = BinaryInputArchive
  433. .getArchive(new ByteBufferInputStream(incomingBuffer));
  434. ConnectRequest connReq = new ConnectRequest();
  435. connReq.deserialize(bia, "connect");
  436. LOG.warn("Connected to " + sock.socket().getRemoteSocketAddress()
  437. + " lastZxid " + connReq.getLastZxidSeen());
  438. if (zk == null) {
  439. throw new IOException("ZooKeeperServer not running");
  440. }
  441. if (connReq.getLastZxidSeen() > zk.dataTree.lastProcessedZxid) {
  442. LOG.error("Client has seen zxid 0x"
  443. + Long.toHexString(connReq.getLastZxidSeen())
  444. + " our last zxid is 0x"
  445. + Long.toHexString(zk.dataTree.lastProcessedZxid));
  446. throw new IOException("We are out of date");
  447. }
  448. sessionTimeout = connReq.getTimeOut();
  449. byte passwd[] = connReq.getPasswd();
  450. if (sessionTimeout < zk.tickTime * 2) {
  451. sessionTimeout = zk.tickTime * 2;
  452. }
  453. if (sessionTimeout > zk.tickTime * 20) {
  454. sessionTimeout = zk.tickTime * 20;
  455. }
  456. // We don't want to receive any packets until we are sure that the
  457. // session is setup
  458. disableRecv();
  459. if (connReq.getSessionId() != 0) {
  460. setSessionId(connReq.getSessionId());
  461. zk.reopenSession(this, sessionId, passwd, sessionTimeout);
  462. LOG.warn("Renewing session 0x" + Long.toHexString(sessionId));
  463. } else {
  464. zk.createSession(this, passwd, sessionTimeout);
  465. LOG.warn("Creating new session 0x"
  466. + Long.toHexString(sessionId));
  467. }
  468. initialized = true;
  469. }
  470. private void readLength(SelectionKey k) throws IOException {
  471. // Read the length, now get the buffer
  472. int len = lenBuffer.getInt();
  473. if (!initialized) {
  474. // We take advantage of the limited size of the length to look
  475. // for cmds. They are all 4-bytes which fits inside of an int
  476. if (len == ruokCmd) {
  477. sendBuffer(imok.duplicate());
  478. sendBuffer(NIOServerCnxn.closeConn);
  479. k.interestOps(SelectionKey.OP_WRITE);
  480. return;
  481. } else if (len == killCmd) {
  482. System.exit(0);
  483. } else if (len == getTraceMaskCmd) {
  484. long traceMask = ZooTrace.getTextTraceLevel();
  485. ByteBuffer resp = ByteBuffer.allocate(8);
  486. resp.putLong(traceMask);
  487. resp.flip();
  488. sendBuffer(resp);
  489. sendBuffer(NIOServerCnxn.closeConn);
  490. k.interestOps(SelectionKey.OP_WRITE);
  491. return;
  492. } else if (len == setTraceMaskCmd) {
  493. incomingBuffer = ByteBuffer.allocate(8);
  494. int rc = sock.read(incomingBuffer);
  495. if (rc < 0) {
  496. throw new IOException("Read error");
  497. }
  498. System.out.println("rc=" + rc);
  499. incomingBuffer.flip();
  500. long traceMask = incomingBuffer.getLong();
  501. ZooTrace.setTextTraceLevel(traceMask);
  502. ByteBuffer resp = ByteBuffer.allocate(8);
  503. resp.putLong(traceMask);
  504. resp.flip();
  505. sendBuffer(resp);
  506. sendBuffer(NIOServerCnxn.closeConn);
  507. k.interestOps(SelectionKey.OP_WRITE);
  508. return;
  509. } else if (len == dumpCmd) {
  510. if (zk == null) {
  511. sendBuffer(ByteBuffer.wrap("ZooKeeper not active \n"
  512. .getBytes()));
  513. } else {
  514. StringBuffer sb = new StringBuffer();
  515. sb.append("SessionTracker dump: \n");
  516. sb.append(zk.sessionTracker.toString()).append("\n");
  517. sb.append("ephemeral nodes dump:\n");
  518. sb.append(zk.dataTree.dumpEphemerals()).append("\n");
  519. sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
  520. }
  521. k.interestOps(SelectionKey.OP_WRITE);
  522. return;
  523. } else if (len == reqsCmd) {
  524. StringBuffer sb = new StringBuffer();
  525. sb.append("Requests:\n");
  526. synchronized (outstanding) {
  527. for (Request r : outstanding) {
  528. sb.append(r.toString());
  529. sb.append('\n');
  530. }
  531. }
  532. sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
  533. k.interestOps(SelectionKey.OP_WRITE);
  534. return;
  535. } else if (len == statCmd) {
  536. StringBuffer sb = new StringBuffer();
  537. if(zk!=null){
  538. sb.append("Zookeeper version: ").append(Version.getFullVersion())
  539. .append("\n");
  540. sb.append("Clients:\n");
  541. synchronized(factory.cnxns){
  542. for(NIOServerCnxn c : factory.cnxns){
  543. sb.append(c.getStats().toString());
  544. }
  545. }
  546. sb.append("\n");
  547. sb.append(ServerStats.getInstance().toString());
  548. sb.append("Node count: ").append(zk.dataTree.getNodeCount()).
  549. append("\n");
  550. }else
  551. sb.append("ZooKeeperServer not running\n");
  552. sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
  553. k.interestOps(SelectionKey.OP_WRITE);
  554. return;
  555. }
  556. }
  557. if (len < 0 || len > BinaryInputArchive.maxBuffer) {
  558. throw new IOException("Len error " + len);
  559. }
  560. if (zk == null) {
  561. throw new IOException("ZooKeeperServer not running");
  562. }
  563. incomingBuffer = ByteBuffer.allocate(len);
  564. }
  565. /**
  566. * The number of requests that have been submitted but not yet responded to.
  567. */
  568. int outstandingRequests;
  569. /*
  570. * (non-Javadoc)
  571. *
  572. * @see org.apache.zookeeper.server.ServerCnxnIface#getSessionTimeout()
  573. */
  574. public int getSessionTimeout() {
  575. return sessionTimeout;
  576. }
  577. /**
  578. * This is the id that uniquely identifies the session of a client. Once
  579. * this session is no longer active, the ephemeral nodes will go away.
  580. */
  581. long sessionId;
  582. static long nextSessionId = 1;
  583. public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock,
  584. SelectionKey sk, Factory factory) throws IOException {
  585. this.zk = zk;
  586. this.sock = sock;
  587. this.sk = sk;
  588. this.factory = factory;
  589. sock.socket().setTcpNoDelay(true);
  590. sock.socket().setSoLinger(true, 2);
  591. InetAddress addr = ((InetSocketAddress) sock.socket()
  592. .getRemoteSocketAddress()).getAddress();
  593. authInfo.add(new Id("ip", addr.getHostAddress()));
  594. authInfo.add(new Id("host", addr.getCanonicalHostName()));
  595. sk.interestOps(SelectionKey.OP_READ);
  596. }
  597. @Override
  598. public String toString() {
  599. return "NIOServerCnxn object with sock = " + sock + " and sk = " + sk;
  600. }
  601. boolean closed;
  602. /*
  603. * (non-Javadoc)
  604. *
  605. * @see org.apache.zookeeper.server.ServerCnxnIface#close()
  606. */
  607. public void close() {
  608. if (closed) {
  609. return;
  610. }
  611. closed = true;
  612. synchronized (factory.cnxns) {
  613. factory.cnxns.remove(this);
  614. }
  615. if (zk != null) {
  616. zk.removeCnxn(this);
  617. }
  618. ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
  619. "close NIOServerCnxn: " + sock);
  620. try {
  621. /*
  622. * The following sequence of code is stupid! You would think that
  623. * only sock.close() is needed, but alas, it doesn't work that way.
  624. * If you just do sock.close() there are cases where the socket
  625. * doesn't actually close...
  626. */
  627. sock.socket().shutdownOutput();
  628. } catch (IOException e) {
  629. // This is a relatively common exception that we can't avoid
  630. }
  631. try {
  632. sock.socket().shutdownInput();
  633. } catch (IOException e) {
  634. LOG.warn("ignoring exception during input shutdown", e);
  635. }
  636. try {
  637. sock.socket().close();
  638. } catch (IOException e) {
  639. LOG.warn("ignoring exception during socket close", e);
  640. }
  641. try {
  642. sock.close();
  643. // XXX The next line doesn't seem to be needed, but some posts
  644. // to forums suggest that it is needed. Keep in mind if errors in
  645. // this section arise.
  646. // factory.selector.wakeup();
  647. } catch (IOException e) {
  648. LOG.warn("ignoring exception during socketchannel close", e);
  649. }
  650. sock = null;
  651. if (sk != null) {
  652. try {
  653. // need to cancel this selection key from the selector
  654. sk.cancel();
  655. } catch (Exception e) {
  656. LOG.warn("ignoring exception during selectionkey cancel", e);
  657. }
  658. }
  659. }
  660. private final static byte fourBytes[] = new byte[4];
  661. /*
  662. * (non-Javadoc)
  663. *
  664. * @see org.apache.zookeeper.server.ServerCnxnIface#sendResponse(org.apache.zookeeper.proto.ReplyHeader,
  665. * org.apache.jute.Record, java.lang.String)
  666. */
  667. synchronized public void sendResponse(ReplyHeader h, Record r, String tag) {
  668. if (closed) {
  669. return;
  670. }
  671. ByteArrayOutputStream baos = new ByteArrayOutputStream();
  672. // Make space for length
  673. BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
  674. try {
  675. baos.write(fourBytes);
  676. bos.writeRecord(h, "header");
  677. if (r != null) {
  678. bos.writeRecord(r, tag);
  679. }
  680. baos.close();
  681. } catch (IOException e) {
  682. LOG.error("Error serializing response");
  683. }
  684. byte b[] = baos.toByteArray();
  685. ByteBuffer bb = ByteBuffer.wrap(b);
  686. bb.putInt(b.length - 4).rewind();
  687. sendBuffer(bb);
  688. if (h.getXid() > 0) {
  689. synchronized (this.factory) {
  690. outstandingRequests--;
  691. // check throttling
  692. if (zk.getInProcess() < factory.outstandingLimit
  693. || outstandingRequests < 1) {
  694. sk.selector().wakeup();
  695. enableRecv();
  696. }
  697. }
  698. }
  699. }
  700. /*
  701. * (non-Javadoc)
  702. *
  703. * @see org.apache.zookeeper.server.ServerCnxnIface#process(org.apache.zookeeper.proto.WatcherEvent)
  704. */
  705. synchronized public void process(WatchedEvent event) {
  706. ReplyHeader h = new ReplyHeader(-1, -1L, 0);
  707. ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
  708. "Deliver event " + event + " to 0x"
  709. + Long.toHexString(this.sessionId)
  710. + " through " + this);
  711. // Convert WatchedEvent to a type that can be sent over the wire
  712. WatcherEvent e = event.getWrapper();
  713. sendResponse(h, e, "notification");
  714. }
  715. public void finishSessionInit(boolean valid) {
  716. try {
  717. ConnectResponse rsp = new ConnectResponse(0, valid ? sessionTimeout
  718. : 0, valid ? sessionId : 0, // send 0 if session is no
  719. // longer valid
  720. valid ? zk.generatePasswd(sessionId) : new byte[16]);
  721. ByteArrayOutputStream baos = new ByteArrayOutputStream();
  722. BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
  723. bos.writeInt(-1, "len");
  724. rsp.serialize(bos, "connect");
  725. baos.close();
  726. ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
  727. bb.putInt(bb.remaining() - 4).rewind();
  728. sendBuffer(bb);
  729. LOG.warn("Finished init of 0x" + Long.toHexString(sessionId)
  730. + ": " + valid);
  731. if (!valid) {
  732. sendBuffer(closeConn);
  733. }
  734. // Now that the session is ready we can start receiving packets
  735. synchronized (this.factory) {
  736. sk.selector().wakeup();
  737. enableRecv();
  738. }
  739. } catch (Exception e) {
  740. LOG.error("FIXMSG",e);
  741. close();
  742. }
  743. }
  744. /*
  745. * (non-Javadoc)
  746. *
  747. * @see org.apache.zookeeper.server.ServerCnxnIface#getSessionId()
  748. */
  749. public long getSessionId() {
  750. return sessionId;
  751. }
  752. public void setSessionId(long sessionId) {
  753. this.sessionId = sessionId;
  754. }
  755. public ArrayList<Id> getAuthInfo() {
  756. return authInfo;
  757. }
  758. public InetSocketAddress getRemoteAddress() {
  759. return (InetSocketAddress) sock.socket().getRemoteSocketAddress();
  760. }
  761. private class CnxnStats implements ServerCnxn.Stats{
  762. long packetsReceived;
  763. long packetsSent;
  764. /**
  765. * The number of requests that have been submitted but not yet responded to.
  766. */
  767. public long getOutstandingRequests() {
  768. return outstandingRequests;
  769. }
  770. public long getPacketsReceived() {
  771. return packetsReceived;
  772. }
  773. public long getPacketsSent() {
  774. return packetsSent;
  775. }
  776. @Override
  777. public String toString(){
  778. StringBuilder sb=new StringBuilder();
  779. Channel channel = sk.channel();
  780. if (channel instanceof SocketChannel) {
  781. sb.append(" ").append(((SocketChannel)channel).socket()
  782. .getRemoteSocketAddress())
  783. .append("[").append(Integer.toHexString(sk.interestOps()))
  784. .append("](queued=").append(getOutstandingRequests())
  785. .append(",recved=").append(getPacketsReceived())
  786. .append(",sent=").append(getPacketsSent()).append(")\n");
  787. }
  788. return sb.toString();
  789. }
  790. }
  791. private CnxnStats stats=new CnxnStats();
  792. public Stats getStats() {
  793. return stats;
  794. }
  795. }