NIOServerCnxn.java 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848
  1. /*
  2. * Copyright 2008, Yahoo! Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package com.yahoo.zookeeper.server;
  17. import java.io.ByteArrayOutputStream;
  18. import java.io.IOException;
  19. import java.io.InputStream;
  20. import java.net.InetAddress;
  21. import java.net.InetSocketAddress;
  22. import java.nio.ByteBuffer;
  23. import java.nio.channels.CancelledKeyException;
  24. import java.nio.channels.Channel;
  25. import java.nio.channels.SelectionKey;
  26. import java.nio.channels.Selector;
  27. import java.nio.channels.ServerSocketChannel;
  28. import java.nio.channels.SocketChannel;
  29. import java.util.ArrayList;
  30. import java.util.Collections;
  31. import java.util.HashSet;
  32. import java.util.Iterator;
  33. import java.util.LinkedList;
  34. import java.util.Set;
  35. import java.util.concurrent.LinkedBlockingQueue;
  36. import org.apache.log4j.Logger;
  37. import com.yahoo.jute.BinaryInputArchive;
  38. import com.yahoo.jute.BinaryOutputArchive;
  39. import com.yahoo.jute.Record;
  40. import com.yahoo.zookeeper.KeeperException;
  41. import com.yahoo.zookeeper.Version;
  42. import com.yahoo.zookeeper.Watcher;
  43. import com.yahoo.zookeeper.ZooDefs.OpCode;
  44. import com.yahoo.zookeeper.data.Id;
  45. import com.yahoo.zookeeper.proto.AuthPacket;
  46. import com.yahoo.zookeeper.proto.ConnectRequest;
  47. import com.yahoo.zookeeper.proto.ConnectResponse;
  48. import com.yahoo.zookeeper.proto.ReplyHeader;
  49. import com.yahoo.zookeeper.proto.RequestHeader;
  50. import com.yahoo.zookeeper.proto.WatcherEvent;
  51. import com.yahoo.zookeeper.server.auth.AuthenticationProvider;
  52. import com.yahoo.zookeeper.server.auth.ProviderRegistry;
  53. /**
  54. * This class handles communication with clients using NIO. There is one per
  55. * client, but only one thread doing the communication.
  56. */
  57. public class NIOServerCnxn implements Watcher, ServerCnxn {
  58. private static final Logger LOG = Logger.getLogger(NIOServerCnxn.class);
  59. static public class Factory extends Thread {
  60. ZooKeeperServer zks;
  61. ServerSocketChannel ss;
  62. Selector selector = Selector.open();
  63. /**
  64. * We use this buffer to do efficient socket I/O. Since there is a single
  65. * sender thread per NIOServerCnxn instance, we can use a member variable to
  66. * only allocate it once.
  67. */
  68. ByteBuffer directBuffer = ByteBuffer.allocateDirect(64 * 1024);
  69. HashSet<NIOServerCnxn> cnxns = new HashSet<NIOServerCnxn>();
  70. int outstandingLimit = 1;
  71. public Factory(int port) throws IOException {
  72. super("NIOServerCxn.Factory");
  73. setDaemon(true);
  74. this.ss = ServerSocketChannel.open();
  75. ss.socket().bind(new InetSocketAddress(port));
  76. ss.configureBlocking(false);
  77. ss.register(selector, SelectionKey.OP_ACCEPT);
  78. start();
  79. }
  80. public void startup(ZooKeeperServer zks) throws IOException,
  81. InterruptedException {
  82. zks.startup();
  83. setZooKeeperServer(zks);
  84. }
  85. public void setZooKeeperServer(ZooKeeperServer zks) {
  86. this.zks = zks;
  87. if (zks != null) {
  88. this.outstandingLimit = zks.getGlobalOutstandingLimit();
  89. zks.setServerCnxnFactory(this);
  90. } else {
  91. this.outstandingLimit = 1;
  92. }
  93. }
  94. public InetSocketAddress getLocalAddress(){
  95. return (InetSocketAddress)ss.socket().getLocalSocketAddress();
  96. }
  97. private void addCnxn(NIOServerCnxn cnxn) {
  98. synchronized (cnxns) {
  99. cnxns.add(cnxn);
  100. }
  101. }
  102. protected NIOServerCnxn createConnection(SocketChannel sock,
  103. SelectionKey sk) throws IOException {
  104. return new NIOServerCnxn(zks, sock, sk, this);
  105. }
  106. public void run() {
  107. while (!ss.socket().isClosed()) {
  108. try {
  109. selector.select(1000);
  110. Set<SelectionKey> selected;
  111. synchronized (this) {
  112. selected = selector.selectedKeys();
  113. }
  114. ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
  115. selected);
  116. Collections.shuffle(selectedList);
  117. for (SelectionKey k : selectedList) {
  118. if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
  119. SocketChannel sc = ((ServerSocketChannel) k
  120. .channel()).accept();
  121. sc.configureBlocking(false);
  122. SelectionKey sk = sc.register(selector,
  123. SelectionKey.OP_READ);
  124. NIOServerCnxn cnxn = createConnection(sc, sk);
  125. sk.attach(cnxn);
  126. addCnxn(cnxn);
  127. } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
  128. NIOServerCnxn c = (NIOServerCnxn) k.attachment();
  129. c.doIO(k);
  130. }
  131. }
  132. selected.clear();
  133. } catch (Exception e) {
  134. LOG.error("FIXMSG",e);
  135. }
  136. }
  137. ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
  138. "NIOServerCnxn factory exitedloop.");
  139. clear();
  140. LOG.error("=====> Goodbye cruel world <======");
  141. // System.exit(0);
  142. }
  143. /**
  144. * clear all the connections in the selector
  145. *
  146. */
  147. synchronized public void clear() {
  148. selector.wakeup();
  149. synchronized (cnxns) {
  150. // got to clear all the connections that we have in the selector
  151. for (Iterator<NIOServerCnxn> it = cnxns.iterator(); it
  152. .hasNext();) {
  153. NIOServerCnxn cnxn = it.next();
  154. it.remove();
  155. try {
  156. cnxn.close();
  157. } catch (Exception e) {
  158. // Do nothing.
  159. }
  160. }
  161. }
  162. }
  163. public void shutdown() {
  164. try {
  165. ss.close();
  166. clear();
  167. this.interrupt();
  168. this.join();
  169. } catch (InterruptedException e) {
  170. LOG.warn("Interrupted",e);
  171. } catch (Exception e) {
  172. LOG.error("Unexpected exception", e);
  173. }
  174. if (zks != null) {
  175. zks.shutdown();
  176. }
  177. }
  178. synchronized void closeSession(long sessionId) {
  179. selector.wakeup();
  180. synchronized (cnxns) {
  181. for (Iterator<NIOServerCnxn> it = cnxns.iterator(); it
  182. .hasNext();) {
  183. NIOServerCnxn cnxn = it.next();
  184. if (cnxn.sessionId == sessionId) {
  185. it.remove();
  186. try {
  187. cnxn.close();
  188. } catch (Exception e) {
  189. }
  190. break;
  191. }
  192. }
  193. }
  194. }
  195. }
  196. /**
  197. * The buffer will cause the connection to be close when we do a send.
  198. */
  199. static final ByteBuffer closeConn = ByteBuffer.allocate(0);
  200. Factory factory;
  201. ZooKeeperServer zk;
  202. private SocketChannel sock;
  203. private SelectionKey sk;
  204. boolean initialized;
  205. ByteBuffer lenBuffer = ByteBuffer.allocate(4);
  206. ByteBuffer incomingBuffer = lenBuffer;
  207. LinkedBlockingQueue<ByteBuffer> outgoingBuffers = new LinkedBlockingQueue<ByteBuffer>();
  208. int sessionTimeout;
  209. ArrayList<Id> authInfo = new ArrayList<Id>();
  210. LinkedList<Request> outstanding = new LinkedList<Request>();
  211. void sendBuffer(ByteBuffer bb) {
  212. synchronized (factory) {
  213. try {
  214. sk.selector().wakeup();
  215. // ZooLog.logTraceMessage(LOG,
  216. // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK,
  217. // "Add a buffer to outgoingBuffers");
  218. // ZooLog.logTraceMessage(LOG,
  219. // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK,
  220. //"sk " + sk + " is valid: " +
  221. // sk.isValid(), );
  222. outgoingBuffers.add(bb);
  223. if (sk.isValid()) {
  224. sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
  225. }
  226. } catch (RuntimeException e) {
  227. LOG.error("FIXMSG",e);
  228. throw e;
  229. }
  230. }
  231. }
  232. void doIO(SelectionKey k) throws InterruptedException {
  233. try {
  234. if (sock == null) {
  235. return;
  236. }
  237. if (k.isReadable()) {
  238. int rc = sock.read(incomingBuffer);
  239. if (rc < 0) {
  240. throw new IOException("Read error");
  241. }
  242. if (incomingBuffer.remaining() == 0) {
  243. incomingBuffer.flip();
  244. if (incomingBuffer == lenBuffer) {
  245. readLength(k);
  246. } else if (!initialized) {
  247. stats.packetsReceived++;
  248. ServerStats.getInstance().incrementPacketsReceived();
  249. readConnectRequest();
  250. lenBuffer.clear();
  251. incomingBuffer = lenBuffer;
  252. } else {
  253. stats.packetsReceived++;
  254. ServerStats.getInstance().incrementPacketsReceived();
  255. readRequest();
  256. lenBuffer.clear();
  257. incomingBuffer = lenBuffer;
  258. }
  259. }
  260. }
  261. if (k.isWritable()) {
  262. // ZooLog.logTraceMessage(LOG,
  263. // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK
  264. // "outgoingBuffers.size() = " +
  265. // outgoingBuffers.size());
  266. if (outgoingBuffers.size() > 0) {
  267. // ZooLog.logTraceMessage(LOG,
  268. // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK,
  269. // "sk " + k + " is valid: " +
  270. // k.isValid());
  271. /*
  272. * This is going to reset the buffer position to 0 and the
  273. * limit to the size of the buffer, so that we can fill it
  274. * with data from the non-direct buffers that we need to
  275. * send.
  276. */
  277. ByteBuffer directBuffer = factory.directBuffer;
  278. directBuffer.clear();
  279. for (ByteBuffer b : outgoingBuffers) {
  280. if (directBuffer.remaining() < b.remaining()) {
  281. /*
  282. * When we call put later, if the directBuffer is to
  283. * small to hold everything, nothing will be copied,
  284. * so we've got to slice the buffer if it's too big.
  285. */
  286. b = (ByteBuffer) b.slice().limit(
  287. directBuffer.remaining());
  288. }
  289. /*
  290. * put() is going to modify the positions of both
  291. * buffers, put we don't want to change the position of
  292. * the source buffers (we'll do that after the send, if
  293. * needed), so we save and reset the position after the
  294. * copy
  295. */
  296. int p = b.position();
  297. directBuffer.put(b);
  298. b.position(p);
  299. if (directBuffer.remaining() == 0) {
  300. break;
  301. }
  302. }
  303. /*
  304. * Do the flip: limit becomes position, position gets set to
  305. * 0. This sets us up for the write.
  306. */
  307. directBuffer.flip();
  308. int sent = sock.write(directBuffer);
  309. ByteBuffer bb;
  310. // Remove the buffers that we have sent
  311. while (outgoingBuffers.size() > 0) {
  312. bb = outgoingBuffers.peek();
  313. if (bb == closeConn) {
  314. throw new IOException("closing");
  315. }
  316. int left = bb.remaining() - sent;
  317. if (left > 0) {
  318. /*
  319. * We only partially sent this buffer, so we update
  320. * the position and exit the loop.
  321. */
  322. bb.position(bb.position() + sent);
  323. break;
  324. }
  325. stats.packetsSent++;
  326. /* We've sent the whole buffer, so drop the buffer */
  327. sent -= bb.remaining();
  328. ServerStats.getInstance().incrementPacketsSent();
  329. outgoingBuffers.remove();
  330. }
  331. // ZooLog.logTraceMessage(LOG,
  332. // ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, "after send,
  333. // outgoingBuffers.size() = " + outgoingBuffers.size());
  334. }
  335. synchronized (this) {
  336. if (outgoingBuffers.size() == 0) {
  337. if (!initialized
  338. && (sk.interestOps() & SelectionKey.OP_READ) == 0) {
  339. throw new IOException("Responded to info probe");
  340. }
  341. sk.interestOps(sk.interestOps()
  342. & (~SelectionKey.OP_WRITE));
  343. } else {
  344. sk.interestOps(sk.interestOps()
  345. | SelectionKey.OP_WRITE);
  346. }
  347. }
  348. }
  349. } catch (CancelledKeyException e) {
  350. close();
  351. } catch (IOException e) {
  352. // LOG.error("FIXMSG",e);
  353. close();
  354. }
  355. }
  356. private void readRequest() throws IOException {
  357. // We have the request, now process and setup for next
  358. InputStream bais = new ByteBufferInputStream(incomingBuffer);
  359. BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
  360. RequestHeader h = new RequestHeader();
  361. h.deserialize(bia, "header");
  362. // Through the magic of byte buffers, txn will not be
  363. // pointing
  364. // to the start of the txn
  365. incomingBuffer = incomingBuffer.slice();
  366. if (h.getType() == OpCode.auth) {
  367. AuthPacket authPacket = new AuthPacket();
  368. ZooKeeperServer.byteBuffer2Record(incomingBuffer, authPacket);
  369. String scheme = authPacket.getScheme();
  370. AuthenticationProvider ap = ProviderRegistry.getProvider(scheme);
  371. if (ap == null
  372. || ap.handleAuthentication(this, authPacket.getAuth()) != KeeperException.Code.Ok) {
  373. if (ap == null)
  374. LOG.error("No authentication provider for scheme: "
  375. + scheme);
  376. else
  377. LOG.error("Authentication failed for scheme: "
  378. + scheme);
  379. // send a response...
  380. ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
  381. KeeperException.Code.AuthFailed);
  382. sendResponse(rh, null, null);
  383. // ... and close connection
  384. sendBuffer(NIOServerCnxn.closeConn);
  385. disableRecv();
  386. } else {
  387. LOG.error("Authentication succeeded for scheme: "
  388. + scheme);
  389. ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
  390. KeeperException.Code.Ok);
  391. sendResponse(rh, null, null);
  392. }
  393. return;
  394. } else {
  395. zk.submitRequest(this, sessionId, h.getType(), h.getXid(),
  396. incomingBuffer, authInfo);
  397. }
  398. if (h.getXid() >= 0) {
  399. synchronized (this) {
  400. outstandingRequests++;
  401. // check throttling
  402. if (zk.getInProcess() > factory.outstandingLimit) {
  403. disableRecv();
  404. // following lines should not be needed since we are already
  405. // reading
  406. // } else {
  407. // enableRecv();
  408. }
  409. }
  410. }
  411. }
  412. public void disableRecv() {
  413. sk.interestOps(sk.interestOps() & (~SelectionKey.OP_READ));
  414. }
  415. public void enableRecv() {
  416. if (sk.isValid()) {
  417. int interest = sk.interestOps();
  418. if ((interest & SelectionKey.OP_READ) == 0) {
  419. sk.interestOps(interest | SelectionKey.OP_READ);
  420. }
  421. }
  422. }
  423. private void readConnectRequest() throws IOException, InterruptedException {
  424. BinaryInputArchive bia = BinaryInputArchive
  425. .getArchive(new ByteBufferInputStream(incomingBuffer));
  426. ConnectRequest connReq = new ConnectRequest();
  427. connReq.deserialize(bia, "connect");
  428. LOG.warn("Connected to " + sock.socket().getRemoteSocketAddress()
  429. + " lastZxid " + connReq.getLastZxidSeen());
  430. if (zk == null) {
  431. throw new IOException("ZooKeeperServer not running");
  432. }
  433. if (connReq.getLastZxidSeen() > zk.dataTree.lastProcessedZxid) {
  434. LOG.error("Client has seen "
  435. + Long.toHexString(connReq.getLastZxidSeen())
  436. + " our last zxid is "
  437. + Long.toHexString(zk.dataTree.lastProcessedZxid));
  438. throw new IOException("We are out of date");
  439. }
  440. sessionTimeout = connReq.getTimeOut();
  441. byte passwd[] = connReq.getPasswd();
  442. if (sessionTimeout < zk.tickTime * 2) {
  443. sessionTimeout = zk.tickTime * 2;
  444. }
  445. if (sessionTimeout > zk.tickTime * 20) {
  446. sessionTimeout = zk.tickTime * 20;
  447. }
  448. // We don't want to receive any packets until we are sure that the
  449. // session is setup
  450. disableRecv();
  451. if (connReq.getSessionId() != 0) {
  452. setSessionId(connReq.getSessionId());
  453. zk.reopenSession(this, sessionId, passwd, sessionTimeout);
  454. LOG.warn("Renewing session " + Long.toHexString(sessionId));
  455. } else {
  456. zk.createSession(this, passwd, sessionTimeout);
  457. LOG.warn("Creating new session "
  458. + Long.toHexString(sessionId));
  459. }
  460. initialized = true;
  461. }
  462. private void readLength(SelectionKey k) throws IOException {
  463. // Read the length, now get the buffer
  464. int len = lenBuffer.getInt();
  465. if (!initialized) {
  466. // We take advantage of the limited size of the length to look
  467. // for cmds. They are all 4-bytes which fits inside of an int
  468. if (len == ruokCmd) {
  469. sendBuffer(imok.duplicate());
  470. sendBuffer(NIOServerCnxn.closeConn);
  471. k.interestOps(SelectionKey.OP_WRITE);
  472. return;
  473. } else if (len == killCmd) {
  474. System.exit(0);
  475. } else if (len == getTraceMaskCmd) {
  476. long traceMask = ZooTrace.getTextTraceLevel();
  477. ByteBuffer resp = ByteBuffer.allocate(8);
  478. resp.putLong(traceMask);
  479. resp.flip();
  480. sendBuffer(resp);
  481. sendBuffer(NIOServerCnxn.closeConn);
  482. k.interestOps(SelectionKey.OP_WRITE);
  483. return;
  484. } else if (len == setTraceMaskCmd) {
  485. incomingBuffer = ByteBuffer.allocate(8);
  486. int rc = sock.read(incomingBuffer);
  487. if (rc < 0) {
  488. throw new IOException("Read error");
  489. }
  490. System.out.println("rc=" + rc);
  491. incomingBuffer.flip();
  492. long traceMask = incomingBuffer.getLong();
  493. ZooTrace.setTextTraceLevel(traceMask);
  494. ByteBuffer resp = ByteBuffer.allocate(8);
  495. resp.putLong(traceMask);
  496. resp.flip();
  497. sendBuffer(resp);
  498. sendBuffer(NIOServerCnxn.closeConn);
  499. k.interestOps(SelectionKey.OP_WRITE);
  500. return;
  501. } else if (len == dumpCmd) {
  502. if (zk == null) {
  503. sendBuffer(ByteBuffer.wrap("ZooKeeper not active \n"
  504. .getBytes()));
  505. } else {
  506. StringBuffer sb = new StringBuffer();
  507. sb.append("SessionTracker dump: \n");
  508. sb.append(zk.sessionTracker.toString()).append("\n");
  509. sb.append("ephemeral nodes dump:\n");
  510. sb.append(zk.dataTree.dumpEphemerals()).append("\n");
  511. sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
  512. }
  513. k.interestOps(SelectionKey.OP_WRITE);
  514. return;
  515. } else if (len == reqsCmd) {
  516. StringBuffer sb = new StringBuffer();
  517. sb.append("Requests:\n");
  518. synchronized (outstanding) {
  519. for (Request r : outstanding) {
  520. sb.append(r.toString());
  521. sb.append('\n');
  522. }
  523. }
  524. sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
  525. k.interestOps(SelectionKey.OP_WRITE);
  526. return;
  527. } else if (len == statCmd) {
  528. StringBuffer sb = new StringBuffer();
  529. if(zk!=null){
  530. sb.append("Zookeeper version: ").append(Version.getFullVersion())
  531. .append("\n");
  532. sb.append("Clients:\n");
  533. synchronized(factory.cnxns){
  534. for(NIOServerCnxn c : factory.cnxns){
  535. sb.append(c.getStats().toString());
  536. }
  537. }
  538. sb.append("\n");
  539. sb.append(ServerStats.getInstance().toString());
  540. sb.append("Node count: ").append(zk.dataTree.getNodeCount()).
  541. append("\n");
  542. }else
  543. sb.append("ZooKeeperServer not running\n");
  544. sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
  545. k.interestOps(SelectionKey.OP_WRITE);
  546. return;
  547. }
  548. }
  549. if (len < 0 || len > BinaryInputArchive.maxBuffer) {
  550. throw new IOException("Len error " + len);
  551. }
  552. if (zk == null) {
  553. throw new IOException("ZooKeeperServer not running");
  554. }
  555. incomingBuffer = ByteBuffer.allocate(len);
  556. }
  557. /**
  558. * The number of requests that have been submitted but not yet responded to.
  559. */
  560. int outstandingRequests;
  561. /*
  562. * (non-Javadoc)
  563. *
  564. * @see com.yahoo.zookeeper.server.ServerCnxnIface#getSessionTimeout()
  565. */
  566. public int getSessionTimeout() {
  567. return sessionTimeout;
  568. }
  569. /**
  570. * This is the id that uniquely identifies the session of a client. Once
  571. * this session is no longer active, the ephemeral nodes will go away.
  572. */
  573. long sessionId;
  574. static long nextSessionId = 1;
  575. public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock,
  576. SelectionKey sk, Factory factory) throws IOException {
  577. this.zk = zk;
  578. this.sock = sock;
  579. this.sk = sk;
  580. this.factory = factory;
  581. sock.socket().setTcpNoDelay(true);
  582. sock.socket().setSoLinger(true, 2);
  583. InetAddress addr = ((InetSocketAddress) sock.socket()
  584. .getRemoteSocketAddress()).getAddress();
  585. authInfo.add(new Id("ip", addr.getHostAddress()));
  586. authInfo.add(new Id("host", addr.getCanonicalHostName()));
  587. sk.interestOps(SelectionKey.OP_READ);
  588. }
  589. public String toString() {
  590. return "NIOServerCnxn object with sock = " + sock + " and sk = " + sk;
  591. }
  592. boolean closed;
  593. /*
  594. * (non-Javadoc)
  595. *
  596. * @see com.yahoo.zookeeper.server.ServerCnxnIface#close()
  597. */
  598. public void close() {
  599. if (closed) {
  600. return;
  601. }
  602. closed = true;
  603. synchronized (factory.cnxns) {
  604. factory.cnxns.remove(this);
  605. }
  606. if (zk != null) {
  607. zk.removeCnxn(this);
  608. }
  609. ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
  610. "close NIOServerCnxn: " + sock);
  611. try {
  612. /*
  613. * The following sequence of code is stupid! You would think that
  614. * only sock.close() is needed, but alas, it doesn't work that way.
  615. * If you just do sock.close() there are cases where the socket
  616. * doesn't actually close...
  617. */
  618. sock.socket().shutdownOutput();
  619. } catch (IOException e) {
  620. // This is a relatively common exception that we can't avoid
  621. }
  622. try {
  623. sock.socket().shutdownInput();
  624. } catch (IOException e) {
  625. }
  626. try {
  627. sock.socket().close();
  628. } catch (IOException e) {
  629. LOG.error("FIXMSG",e);
  630. }
  631. try {
  632. sock.close();
  633. // XXX The next line doesn't seem to be needed, but some posts
  634. // to forums suggest that it is needed. Keep in mind if errors in
  635. // this section arise.
  636. // factory.selector.wakeup();
  637. } catch (IOException e) {
  638. LOG.error("FIXMSG",e);
  639. }
  640. sock = null;
  641. if (sk != null) {
  642. try {
  643. // need to cancel this selection key from the selector
  644. sk.cancel();
  645. } catch (Exception e) {
  646. }
  647. }
  648. }
  649. private final static byte fourBytes[] = new byte[4];
  650. /*
  651. * (non-Javadoc)
  652. *
  653. * @see com.yahoo.zookeeper.server.ServerCnxnIface#sendResponse(com.yahoo.zookeeper.proto.ReplyHeader,
  654. * com.yahoo.jute.Record, java.lang.String)
  655. */
  656. synchronized public void sendResponse(ReplyHeader h, Record r, String tag) {
  657. if (closed) {
  658. return;
  659. }
  660. ByteArrayOutputStream baos = new ByteArrayOutputStream();
  661. // Make space for length
  662. BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
  663. try {
  664. baos.write(fourBytes);
  665. bos.writeRecord(h, "header");
  666. if (r != null) {
  667. bos.writeRecord(r, tag);
  668. }
  669. baos.close();
  670. } catch (IOException e) {
  671. LOG.error("Error serializing response");
  672. }
  673. byte b[] = baos.toByteArray();
  674. ByteBuffer bb = ByteBuffer.wrap(b);
  675. bb.putInt(b.length - 4).rewind();
  676. sendBuffer(bb);
  677. if (h.getXid() > 0) {
  678. synchronized (this.factory) {
  679. outstandingRequests--;
  680. // check throttling
  681. if (zk.getInProcess() < factory.outstandingLimit
  682. || outstandingRequests < 1) {
  683. sk.selector().wakeup();
  684. enableRecv();
  685. }
  686. }
  687. }
  688. }
  689. /*
  690. * (non-Javadoc)
  691. *
  692. * @see com.yahoo.zookeeper.server.ServerCnxnIface#process(com.yahoo.zookeeper.proto.WatcherEvent)
  693. */
  694. synchronized public void process(WatcherEvent event) {
  695. ReplyHeader h = new ReplyHeader(-1, -1L, 0);
  696. ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
  697. "Deliver event " + event + " to "
  698. + this.sessionId + " through " + this);
  699. sendResponse(h, event, "notification");
  700. }
  701. public void finishSessionInit(boolean valid) {
  702. try {
  703. ConnectResponse rsp = new ConnectResponse(0, valid ? sessionTimeout
  704. : 0, valid ? sessionId : 0, // send 0 if session is no
  705. // longer valid
  706. valid ? zk.generatePasswd(sessionId) : new byte[16]);
  707. ByteArrayOutputStream baos = new ByteArrayOutputStream();
  708. BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
  709. bos.writeInt(-1, "len");
  710. rsp.serialize(bos, "connect");
  711. baos.close();
  712. ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
  713. bb.putInt(bb.remaining() - 4).rewind();
  714. sendBuffer(bb);
  715. LOG.warn("Finished init of " + Long.toHexString(sessionId)
  716. + ": " + valid);
  717. if (!valid) {
  718. sendBuffer(closeConn);
  719. }
  720. // Now that the session is ready we can start receiving packets
  721. synchronized (this.factory) {
  722. sk.selector().wakeup();
  723. enableRecv();
  724. }
  725. } catch (Exception e) {
  726. LOG.error("FIXMSG",e);
  727. close();
  728. }
  729. }
  730. /*
  731. * (non-Javadoc)
  732. *
  733. * @see com.yahoo.zookeeper.server.ServerCnxnIface#getSessionId()
  734. */
  735. public long getSessionId() {
  736. return sessionId;
  737. }
  738. public void setSessionId(long sessionId) {
  739. this.sessionId = sessionId;
  740. }
  741. public ArrayList<Id> getAuthInfo() {
  742. return authInfo;
  743. }
  744. public InetSocketAddress getRemoteAddress() {
  745. return (InetSocketAddress) sock.socket().getRemoteSocketAddress();
  746. }
  747. private class CnxnStats implements ServerCnxn.Stats{
  748. long packetsReceived;
  749. long packetsSent;
  750. /**
  751. * The number of requests that have been submitted but not yet responded to.
  752. */
  753. public long getOutstandingRequests() {
  754. return outstandingRequests;
  755. }
  756. public long getPacketsReceived() {
  757. return packetsReceived;
  758. }
  759. public long getPacketsSent() {
  760. return packetsSent;
  761. }
  762. public String toString(){
  763. StringBuilder sb=new StringBuilder();
  764. Channel channel = sk.channel();
  765. if (channel instanceof SocketChannel) {
  766. sb.append(" ").append(((SocketChannel)channel).socket()
  767. .getRemoteSocketAddress())
  768. .append("[").append(Integer.toHexString(sk.interestOps()))
  769. .append("](queued=").append(getOutstandingRequests())
  770. .append(",recved=").append(getPacketsReceived())
  771. .append(",sent=").append(getPacketsSent()).append(")\n");
  772. }
  773. return sb.toString();
  774. }
  775. }
  776. private CnxnStats stats=new CnxnStats();
  777. public Stats getStats() {
  778. return stats;
  779. }
  780. }