ClientCnxn.java 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858
  1. /*
  2. * Copyright 2008, Yahoo! Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package org.apache.zookeeper;
  17. import java.io.ByteArrayOutputStream;
  18. import java.io.IOException;
  19. import java.lang.Thread.UncaughtExceptionHandler;
  20. import java.net.InetAddress;
  21. import java.net.InetSocketAddress;
  22. import java.nio.ByteBuffer;
  23. import java.nio.channels.SelectionKey;
  24. import java.nio.channels.Selector;
  25. import java.nio.channels.SocketChannel;
  26. import java.util.ArrayList;
  27. import java.util.Collections;
  28. import java.util.LinkedList;
  29. import java.util.Random;
  30. import java.util.Set;
  31. import java.util.concurrent.LinkedBlockingQueue;
  32. import org.apache.log4j.Logger;
  33. import org.apache.jute.BinaryInputArchive;
  34. import org.apache.jute.BinaryOutputArchive;
  35. import org.apache.jute.Record;
  36. import org.apache.zookeeper.AsyncCallback.ACLCallback;
  37. import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
  38. import org.apache.zookeeper.AsyncCallback.DataCallback;
  39. import org.apache.zookeeper.AsyncCallback.StatCallback;
  40. import org.apache.zookeeper.AsyncCallback.StringCallback;
  41. import org.apache.zookeeper.AsyncCallback.VoidCallback;
  42. import org.apache.zookeeper.Watcher.Event;
  43. import org.apache.zookeeper.ZooDefs.OpCode;
  44. import org.apache.zookeeper.ZooKeeper.States;
  45. import org.apache.zookeeper.ZooKeeper.WatchRegistration;
  46. import org.apache.zookeeper.proto.AuthPacket;
  47. import org.apache.zookeeper.proto.ConnectRequest;
  48. import org.apache.zookeeper.proto.ConnectResponse;
  49. import org.apache.zookeeper.proto.CreateResponse;
  50. import org.apache.zookeeper.proto.ExistsResponse;
  51. import org.apache.zookeeper.proto.GetACLResponse;
  52. import org.apache.zookeeper.proto.GetChildrenResponse;
  53. import org.apache.zookeeper.proto.GetDataResponse;
  54. import org.apache.zookeeper.proto.ReplyHeader;
  55. import org.apache.zookeeper.proto.RequestHeader;
  56. import org.apache.zookeeper.proto.SetACLResponse;
  57. import org.apache.zookeeper.proto.SetDataResponse;
  58. import org.apache.zookeeper.proto.WatcherEvent;
  59. import org.apache.zookeeper.server.ByteBufferInputStream;
  60. import org.apache.zookeeper.server.ZooKeeperServer;
  61. import org.apache.zookeeper.server.ZooTrace;
  62. /**
  63. * This class manages the socket i/o for the client. ClientCnxn maintains a list
  64. * of available servers to connect to and "transparently" switches servers it is
  65. * connected to as needed.
  66. *
  67. */
  68. class ClientCnxn {
  69. private static final Logger LOG = Logger.getLogger(ZooKeeperServer.class);
  70. private ArrayList<InetSocketAddress> serverAddrs = new ArrayList<InetSocketAddress>();
  71. static class AuthData {
  72. AuthData(String scheme, byte data[]) {
  73. this.scheme = scheme;
  74. this.data = data;
  75. }
  76. String scheme;
  77. byte data[];
  78. }
  79. private ArrayList<AuthData> authInfo = new ArrayList<AuthData>();
  80. /**
  81. * These are the packets that have been sent and are waiting for a response.
  82. */
  83. private LinkedList<Packet> pendingQueue = new LinkedList<Packet>();
  84. private LinkedBlockingQueue waitingEvents = new LinkedBlockingQueue();
  85. /**
  86. * These are the packets that need to be sent.
  87. */
  88. private LinkedList<Packet> outgoingQueue = new LinkedList<Packet>();
  89. private int nextAddrToTry = 0;
  90. private int connectTimeout;
  91. private int readTimeout;
  92. private final int sessionTimeout;
  93. private final ZooKeeper zooKeeper;
  94. private long sessionId;
  95. private byte sessionPasswd[] = new byte[16];
  96. final SendThread sendThread;
  97. final EventThread eventThread;
  98. final Selector selector = Selector.open();
  99. public long getSessionId() {
  100. return sessionId;
  101. }
  102. public byte[] getSessionPasswd() {
  103. return sessionPasswd;
  104. }
  105. public String toString() {
  106. StringBuffer sb = new StringBuffer();
  107. sb.append("sessionId: ").append(sessionId).append("\n");
  108. sb.append("lastZxid: ").append(lastZxid).append("\n");
  109. sb.append("xid: ").append(xid).append("\n");
  110. sb.append("nextAddrToTry: ").append(nextAddrToTry).append("\n");
  111. sb.append("serverAddrs: ").append(serverAddrs.get(nextAddrToTry))
  112. .append("\n");
  113. return sb.toString();
  114. }
  115. /**
  116. * This class allows us to pass the headers and the relevant records around.
  117. */
  118. static class Packet {
  119. RequestHeader header;
  120. ByteBuffer bb;
  121. String path;
  122. ReplyHeader replyHeader;
  123. Record request;
  124. Record response;
  125. boolean finished;
  126. AsyncCallback cb;
  127. Object ctx;
  128. WatchRegistration watchRegistration;
  129. Packet(RequestHeader header, ReplyHeader replyHeader, Record record,
  130. Record response, ByteBuffer bb,
  131. WatchRegistration watchRegistration) {
  132. this.header = header;
  133. this.replyHeader = replyHeader;
  134. this.request = record;
  135. this.response = response;
  136. if (bb != null) {
  137. this.bb = bb;
  138. } else {
  139. try {
  140. ByteArrayOutputStream baos = new ByteArrayOutputStream();
  141. BinaryOutputArchive boa = BinaryOutputArchive
  142. .getArchive(baos);
  143. boa.writeInt(-1, "len"); // We'll fill this in later
  144. header.serialize(boa, "header");
  145. if (record != null) {
  146. record.serialize(boa, "request");
  147. }
  148. baos.close();
  149. this.bb = ByteBuffer.wrap(baos.toByteArray());
  150. this.bb.putInt(this.bb.capacity() - 4);
  151. this.bb.rewind();
  152. } catch (IOException e) {
  153. LOG.warn("Unexpected exception",e);
  154. }
  155. }
  156. this.watchRegistration = watchRegistration;
  157. }
  158. }
  159. public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper)
  160. throws IOException {
  161. this(hosts, sessionTimeout, zooKeeper, 0, new byte[16]);
  162. }
  163. /**
  164. * Creates a connection object. The actual network connect doesn't get
  165. * established until needed.
  166. *
  167. * @param hosts
  168. * a comma separated list of hosts that can be connected to.
  169. * @param sessionTimeout
  170. * the timeout for connections.
  171. * @param zooKeeper
  172. * the zookeeper object that this connection is related to.
  173. * @throws KeeperException
  174. * @throws IOException
  175. */
  176. public ClientCnxn(String hosts, int sessionTimeout, ZooKeeper zooKeeper,
  177. long sessionId, byte[] sessionPasswd) throws IOException {
  178. this.zooKeeper = zooKeeper;
  179. this.sessionId = sessionId;
  180. this.sessionPasswd = sessionPasswd;
  181. String hostsList[] = hosts.split(",");
  182. for (String host : hostsList) {
  183. int port = 2181;
  184. String parts[] = host.split(":");
  185. if (parts.length > 1) {
  186. port = Integer.parseInt(parts[1]);
  187. host = parts[0];
  188. }
  189. InetAddress addrs[] = InetAddress.getAllByName(host);
  190. for (InetAddress addr : addrs) {
  191. serverAddrs.add(new InetSocketAddress(addr, port));
  192. }
  193. }
  194. this.sessionTimeout = sessionTimeout;
  195. connectTimeout = sessionTimeout / hostsList.length;
  196. readTimeout = sessionTimeout * 2 / 3;
  197. Collections.shuffle(serverAddrs);
  198. sendThread = new SendThread();
  199. sendThread.start();
  200. eventThread = new EventThread();
  201. eventThread.start();
  202. }
  203. WatcherEvent eventOfDeath = new WatcherEvent();
  204. final static UncaughtExceptionHandler uncaughtExceptionHandler = new UncaughtExceptionHandler() {
  205. public void uncaughtException(Thread t, Throwable e) {
  206. LOG.error("from " + t.getName(), e);
  207. }
  208. };
  209. class EventThread extends Thread {
  210. EventThread() {
  211. super("EventThread");
  212. setUncaughtExceptionHandler(uncaughtExceptionHandler);
  213. setDaemon(true);
  214. }
  215. public void run() {
  216. try {
  217. while (true) {
  218. Object event = waitingEvents.take();
  219. if (event == eventOfDeath) {
  220. break;
  221. }
  222. if (event instanceof WatcherEvent) {
  223. zooKeeper.processWatchEvent((WatcherEvent) event);
  224. } else {
  225. Packet p = (Packet) event;
  226. int rc = 0;
  227. String path = p.path;
  228. if (p.replyHeader.getErr() != 0) {
  229. rc = p.replyHeader.getErr();
  230. }
  231. if (p.cb == null) {
  232. LOG.warn("Somehow a null cb got to EventThread!");
  233. } else if (p.response instanceof ExistsResponse
  234. || p.response instanceof SetDataResponse
  235. || p.response instanceof SetACLResponse) {
  236. StatCallback cb = (StatCallback) p.cb;
  237. if (rc == 0) {
  238. if (p.response instanceof ExistsResponse) {
  239. cb.processResult(rc, path, p.ctx,
  240. ((ExistsResponse) p.response)
  241. .getStat());
  242. } else if (p.response instanceof SetDataResponse) {
  243. cb.processResult(rc, path, p.ctx,
  244. ((SetDataResponse) p.response)
  245. .getStat());
  246. } else if (p.response instanceof SetACLResponse) {
  247. cb.processResult(rc, path, p.ctx,
  248. ((SetACLResponse) p.response)
  249. .getStat());
  250. }
  251. } else {
  252. cb.processResult(rc, path, p.ctx, null);
  253. }
  254. } else if (p.response instanceof GetDataResponse) {
  255. DataCallback cb = (DataCallback) p.cb;
  256. GetDataResponse rsp = (GetDataResponse) p.response;
  257. if (rc == 0) {
  258. cb.processResult(rc, path, p.ctx,
  259. rsp.getData(), rsp.getStat());
  260. } else {
  261. cb.processResult(rc, path, p.ctx, null, null);
  262. }
  263. } else if (p.response instanceof GetACLResponse) {
  264. ACLCallback cb = (ACLCallback) p.cb;
  265. GetACLResponse rsp = (GetACLResponse) p.response;
  266. if (rc == 0) {
  267. cb.processResult(rc, path, p.ctx, rsp.getAcl(),
  268. rsp.getStat());
  269. } else {
  270. cb.processResult(rc, path, p.ctx, null, null);
  271. }
  272. } else if (p.response instanceof GetChildrenResponse) {
  273. ChildrenCallback cb = (ChildrenCallback) p.cb;
  274. GetChildrenResponse rsp = (GetChildrenResponse) p.response;
  275. if (rc == 0) {
  276. cb.processResult(rc, path, p.ctx, rsp
  277. .getChildren());
  278. } else {
  279. cb.processResult(rc, path, p.ctx, null);
  280. }
  281. } else if (p.response instanceof CreateResponse) {
  282. StringCallback cb = (StringCallback) p.cb;
  283. CreateResponse rsp = (CreateResponse) p.response;
  284. if (rc == 0) {
  285. cb
  286. .processResult(rc, path, p.ctx, rsp
  287. .getPath());
  288. } else {
  289. cb.processResult(rc, path, p.ctx, null);
  290. }
  291. } else if (p.cb instanceof VoidCallback) {
  292. VoidCallback cb = (VoidCallback) p.cb;
  293. cb.processResult(rc, path, p.ctx);
  294. }
  295. }
  296. }
  297. } catch (InterruptedException e) {
  298. }
  299. }
  300. }
  301. @SuppressWarnings("unchecked")
  302. private void finishPacket(Packet p) {
  303. if (p.watchRegistration != null) {
  304. p.watchRegistration.register(p.replyHeader.getErr());
  305. }
  306. p.finished = true;
  307. if (p.cb == null) {
  308. synchronized (p) {
  309. p.notifyAll();
  310. }
  311. } else {
  312. waitingEvents.add(p);
  313. }
  314. }
  315. private void conLossPacket(Packet p) {
  316. if (p.replyHeader == null) {
  317. return;
  318. }
  319. switch(zooKeeper.state) {
  320. case AUTH_FAILED:
  321. p.replyHeader.setErr(KeeperException.Code.AuthFailed);
  322. break;
  323. case CLOSED:
  324. p.replyHeader.setErr(KeeperException.Code.SessionExpired);
  325. break;
  326. default:
  327. p.replyHeader.setErr(KeeperException.Code.ConnectionLoss);
  328. }
  329. finishPacket(p);
  330. }
  331. long lastZxid;
  332. /**
  333. * This class services the outgoing request queue and generates the heart
  334. * beats. It also spawns the ReadThread.
  335. */
  336. class SendThread extends Thread {
  337. SelectionKey sockKey;
  338. ByteBuffer lenBuffer = ByteBuffer.allocateDirect(4);
  339. ByteBuffer incomingBuffer = lenBuffer;
  340. boolean initialized;
  341. void readLength() throws IOException {
  342. int len = incomingBuffer.getInt();
  343. if (len < 0 || len >= 4096 * 1024) {
  344. throw new IOException("Packet len" + len + " is out of range!");
  345. }
  346. incomingBuffer = ByteBuffer.allocate(len);
  347. }
  348. void readConnectResult() throws IOException {
  349. ByteBufferInputStream bbis = new ByteBufferInputStream(
  350. incomingBuffer);
  351. BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
  352. ConnectResponse conRsp = new ConnectResponse();
  353. conRsp.deserialize(bbia, "connect");
  354. int sessionTimeout = conRsp.getTimeOut();
  355. if (sessionTimeout <= 0) {
  356. zooKeeper.state = States.CLOSED;
  357. waitingEvents.add(new WatcherEvent(Watcher.Event.EventNone,
  358. Watcher.Event.KeeperStateExpired, null));
  359. throw new IOException("Session Expired");
  360. }
  361. readTimeout = sessionTimeout * 2 / 3;
  362. connectTimeout = sessionTimeout / serverAddrs.size();
  363. sessionId = conRsp.getSessionId();
  364. sessionPasswd = conRsp.getPasswd();
  365. waitingEvents.add(new WatcherEvent(Watcher.Event.EventNone,
  366. Watcher.Event.KeeperStateSyncConnected, null));
  367. }
  368. @SuppressWarnings("unchecked")
  369. void readResponse() throws IOException {
  370. ByteBufferInputStream bbis = new ByteBufferInputStream(
  371. incomingBuffer);
  372. BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
  373. ReplyHeader r = new ReplyHeader();
  374. r.deserialize(bbia, "header");
  375. if (r.getXid() == -2) {
  376. // -2 is the xid for pings
  377. return;
  378. }
  379. if (r.getXid() == -4) {
  380. // -2 is the xid for AuthPacket
  381. // TODO: process AuthPacket here
  382. return;
  383. }
  384. if (r.getXid() == -1) {
  385. // -1 means notification
  386. WatcherEvent event = new WatcherEvent();
  387. event.deserialize(bbia, "response");
  388. // System.out.println("Got an event: " + event + " for " +
  389. // sessionId + " through" + _cnxn);
  390. waitingEvents.add(event);
  391. return;
  392. }
  393. if (pendingQueue.size() == 0) {
  394. throw new IOException("Nothing in the queue, but got "
  395. + r.getXid());
  396. }
  397. Packet p = null;
  398. synchronized (pendingQueue) {
  399. p = pendingQueue.remove();
  400. }
  401. /*
  402. * Since requests are processed in order, we better get a response
  403. * to the first request!
  404. */
  405. if (p.header.getXid() != r.getXid()) {
  406. throw new IOException("Xid out of order. Got " + r.getXid()
  407. + " expected " + p.header.getXid());
  408. }
  409. p.replyHeader.setXid(r.getXid());
  410. p.replyHeader.setErr(r.getErr());
  411. p.replyHeader.setZxid(r.getZxid());
  412. lastZxid = r.getZxid();
  413. if (p.response != null && r.getErr() == 0) {
  414. p.response.deserialize(bbia, "response");
  415. }
  416. p.finished = true;
  417. finishPacket(p);
  418. }
  419. /**
  420. * @return true if a packet was received
  421. * @throws InterruptedException
  422. * @throws IOException
  423. */
  424. boolean doIO() throws InterruptedException, IOException {
  425. boolean packetReceived = false;
  426. SocketChannel sock = (SocketChannel) sockKey.channel();
  427. if (sock == null) {
  428. throw new IOException("Socket is null!");
  429. }
  430. if (sockKey.isReadable()) {
  431. int rc = sock.read(incomingBuffer);
  432. if (rc < 0) {
  433. throw new IOException("Read error rc = " + rc + " "
  434. + incomingBuffer);
  435. }
  436. if (incomingBuffer.remaining() == 0) {
  437. incomingBuffer.flip();
  438. if (incomingBuffer == lenBuffer) {
  439. readLength();
  440. } else if (!initialized) {
  441. readConnectResult();
  442. enableRead();
  443. if (outgoingQueue.size() > 0) {
  444. enableWrite();
  445. }
  446. lenBuffer.clear();
  447. incomingBuffer = lenBuffer;
  448. packetReceived = true;
  449. initialized = true;
  450. } else {
  451. readResponse();
  452. lenBuffer.clear();
  453. incomingBuffer = lenBuffer;
  454. packetReceived = true;
  455. }
  456. }
  457. }
  458. if (sockKey.isWritable()) {
  459. synchronized (outgoingQueue) {
  460. if (outgoingQueue.size() > 0) {
  461. int rc = sock.write(outgoingQueue.getFirst().bb);
  462. if (outgoingQueue.getFirst().bb.remaining() == 0) {
  463. Packet p = outgoingQueue.removeFirst();
  464. if (p.header != null
  465. && p.header.getType() != OpCode.ping
  466. && p.header.getType() != OpCode.auth) {
  467. pendingQueue.add(p);
  468. }
  469. }
  470. }
  471. }
  472. }
  473. if (outgoingQueue.size() == 0) {
  474. disableWrite();
  475. } else {
  476. enableWrite();
  477. }
  478. return packetReceived;
  479. }
  480. synchronized private void enableWrite() {
  481. int i = sockKey.interestOps();
  482. if ((i & SelectionKey.OP_WRITE) == 0) {
  483. sockKey.interestOps(i | SelectionKey.OP_WRITE);
  484. }
  485. }
  486. synchronized private void disableWrite() {
  487. int i = sockKey.interestOps();
  488. if ((i & SelectionKey.OP_WRITE) != 0) {
  489. sockKey.interestOps(i & (~SelectionKey.OP_WRITE));
  490. }
  491. }
  492. synchronized private void enableRead() {
  493. int i = sockKey.interestOps();
  494. if ((i & SelectionKey.OP_READ) == 0) {
  495. sockKey.interestOps(i | SelectionKey.OP_READ);
  496. }
  497. }
  498. synchronized private void disableRead() {
  499. int i = sockKey.interestOps();
  500. if ((i & SelectionKey.OP_READ) != 0) {
  501. sockKey.interestOps(i & (~SelectionKey.OP_READ));
  502. }
  503. }
  504. SendThread() {
  505. super("SendThread");
  506. zooKeeper.state = States.CONNECTING;
  507. setUncaughtExceptionHandler(uncaughtExceptionHandler);
  508. setDaemon(true);
  509. }
  510. private void primeConnection(SelectionKey k) throws IOException {
  511. LOG.info("Priming connection to "
  512. + ((SocketChannel) sockKey.channel()));
  513. lastConnectIndex = currentConnectIndex;
  514. ConnectRequest conReq = new ConnectRequest(0, lastZxid,
  515. sessionTimeout, sessionId, sessionPasswd);
  516. ByteArrayOutputStream baos = new ByteArrayOutputStream();
  517. BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
  518. boa.writeInt(-1, "len");
  519. conReq.serialize(boa, "connect");
  520. baos.close();
  521. ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
  522. bb.putInt(bb.capacity() - 4);
  523. bb.rewind();
  524. synchronized (outgoingQueue) {
  525. for (AuthData id : authInfo) {
  526. outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
  527. OpCode.auth), null, new AuthPacket(0, id.scheme,
  528. id.data), null, null, null));
  529. }
  530. outgoingQueue.addFirst((new Packet(null, null, null, null, bb,
  531. null)));
  532. }
  533. synchronized (this) {
  534. k.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
  535. }
  536. }
  537. private void sendPing() {
  538. RequestHeader h = new RequestHeader(-2, OpCode.ping);
  539. queuePacket(h, null, null, null, null, null, null, null);
  540. }
  541. int lastConnectIndex = -1;
  542. int currentConnectIndex;
  543. Random r = new Random(System.nanoTime());
  544. private void startConnect() throws IOException {
  545. if (lastConnectIndex == -1) {
  546. // We don't want to delay the first try at a connect, so we
  547. // start with -1 the first time around
  548. lastConnectIndex = 0;
  549. } else {
  550. try {
  551. Thread.sleep(r.nextInt(1000));
  552. } catch (InterruptedException e1) {
  553. LOG.warn("Unexpected exception", e1);
  554. }
  555. if (nextAddrToTry == lastConnectIndex) {
  556. try {
  557. // Try not to spin too fast!
  558. Thread.sleep(1000);
  559. } catch (InterruptedException e) {
  560. LOG.warn("Unexpected exception", e);
  561. }
  562. }
  563. }
  564. zooKeeper.state = States.CONNECTING;
  565. currentConnectIndex = nextAddrToTry;
  566. InetSocketAddress addr = serverAddrs.get(nextAddrToTry);
  567. nextAddrToTry++;
  568. if (nextAddrToTry == serverAddrs.size()) {
  569. nextAddrToTry = 0;
  570. }
  571. SocketChannel sock;
  572. sock = SocketChannel.open();
  573. sock.configureBlocking(false);
  574. sock.socket().setSoLinger(false, -1);
  575. sock.socket().setTcpNoDelay(true);
  576. LOG.info("Attempting connection to server " + addr);
  577. sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
  578. if (sock.connect(addr)) {
  579. primeConnection(sockKey);
  580. }
  581. initialized = false;
  582. }
  583. @Override
  584. public void run() {
  585. long now = System.currentTimeMillis();
  586. long lastHeard = now;
  587. long lastSend = now;
  588. while (zooKeeper.state.isAlive()) {
  589. try {
  590. if (sockKey == null) {
  591. startConnect();
  592. lastSend = now;
  593. lastHeard = now;
  594. }
  595. int idleRecv = (int) (now - lastHeard);
  596. int idleSend = (int) (now - lastSend);
  597. int to = readTimeout - idleRecv;
  598. if (zooKeeper.state != States.CONNECTED) {
  599. to = connectTimeout - idleRecv;
  600. }
  601. if (to <= 0) {
  602. throw new IOException("TIMED OUT");
  603. }
  604. if (zooKeeper.state == States.CONNECTED) {
  605. int timeToNextPing = readTimeout/2 - idleSend;
  606. if (timeToNextPing <= 0) {
  607. sendPing();
  608. lastSend = now;
  609. enableWrite();
  610. } else {
  611. if (timeToNextPing < to) {
  612. to = timeToNextPing;
  613. }
  614. }
  615. }
  616. selector.select(to);
  617. Set<SelectionKey> selected;
  618. synchronized (this) {
  619. selected = selector.selectedKeys();
  620. }
  621. // Everything below and until we get back to the select is
  622. // non blocking, so time is effectively a constant. That is
  623. // Why we just have to do this once, here
  624. now = System.currentTimeMillis();
  625. for (SelectionKey k : selected) {
  626. SocketChannel sc = ((SocketChannel) k.channel());
  627. if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
  628. if (sc.finishConnect()) {
  629. zooKeeper.state = States.CONNECTED;
  630. lastHeard = now;
  631. lastSend = now;
  632. primeConnection(k);
  633. LOG.info("Server connection successful");
  634. }
  635. } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
  636. if (outgoingQueue.size() > 0) {
  637. // We have something to send so it's the same
  638. // as if we do the send now.
  639. lastSend = now;
  640. }
  641. if (doIO()) {
  642. lastHeard = now;
  643. }
  644. }
  645. }
  646. if (zooKeeper.state == States.CONNECTED) {
  647. if (outgoingQueue.size() > 0) {
  648. enableWrite();
  649. } else {
  650. disableWrite();
  651. }
  652. }
  653. selected.clear();
  654. } catch (Exception e) {
  655. LOG.warn("Closing: ", e);
  656. cleanup();
  657. if (zooKeeper.state.isAlive()) {
  658. waitingEvents.add(new WatcherEvent(Event.EventNone,
  659. Event.KeeperStateDisconnected, null));
  660. }
  661. now = System.currentTimeMillis();
  662. lastHeard = now;
  663. lastSend = now;
  664. }
  665. }
  666. cleanup();
  667. ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
  668. "SendThread exitedloop.");
  669. }
  670. private void cleanup() {
  671. if (sockKey != null) {
  672. SocketChannel sock = (SocketChannel) sockKey.channel();
  673. sockKey.cancel();
  674. try {
  675. sock.socket().shutdownInput();
  676. } catch (IOException e2) {
  677. }
  678. try {
  679. sock.socket().shutdownOutput();
  680. } catch (IOException e2) {
  681. }
  682. try {
  683. sock.socket().close();
  684. } catch (IOException e1) {
  685. }
  686. try {
  687. sock.close();
  688. } catch (IOException e1) {
  689. }
  690. }
  691. try {
  692. Thread.sleep(100);
  693. } catch (InterruptedException e1) {
  694. e1.printStackTrace();
  695. }
  696. sockKey = null;
  697. synchronized (pendingQueue) {
  698. for (Packet p : pendingQueue) {
  699. conLossPacket(p);
  700. }
  701. pendingQueue.clear();
  702. }
  703. synchronized (outgoingQueue) {
  704. for (Packet p : outgoingQueue) {
  705. conLossPacket(p);
  706. }
  707. outgoingQueue.clear();
  708. }
  709. }
  710. public void close() {
  711. zooKeeper.state = States.CLOSED;
  712. synchronized (this) {
  713. selector.wakeup();
  714. }
  715. }
  716. }
  717. @SuppressWarnings("unchecked")
  718. public void close() throws IOException {
  719. long traceMask = ZooTrace.SESSION_TRACE_MASK;
  720. if (ZooTrace.isTraceEnabled(LOG, traceMask)) {
  721. ZooTrace.logTraceMessage(LOG, traceMask,
  722. "Close ClientCnxn for session: " + sessionId + "!");
  723. }
  724. sendThread.close();
  725. waitingEvents.add(eventOfDeath);
  726. }
  727. private int xid = 1;
  728. synchronized private int getXid() {
  729. return xid++;
  730. }
  731. public ReplyHeader submitRequest(RequestHeader h, Record request,
  732. Record response,
  733. WatchRegistration watchRegistration)
  734. throws InterruptedException
  735. {
  736. ReplyHeader r = new ReplyHeader();
  737. Packet packet =
  738. queuePacket(h, r, request, response, null, null, null,
  739. watchRegistration);
  740. synchronized (packet) {
  741. while (!packet.finished) {
  742. packet.wait();
  743. }
  744. }
  745. return r;
  746. }
  747. Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
  748. Record response, AsyncCallback cb, String path, Object ctx,
  749. WatchRegistration watchRegistration)
  750. {
  751. Packet packet = null;
  752. synchronized (outgoingQueue) {
  753. if (h.getType() != OpCode.ping && h.getType() != OpCode.auth) {
  754. h.setXid(getXid());
  755. }
  756. packet = new Packet(h, r, request, response, null,
  757. watchRegistration);
  758. packet.cb = cb;
  759. packet.ctx = ctx;
  760. packet.path = path;
  761. if (!zooKeeper.state.isAlive()) {
  762. conLossPacket(packet);
  763. } else {
  764. outgoingQueue.add(packet);
  765. }
  766. }
  767. synchronized (sendThread) {
  768. selector.wakeup();
  769. }
  770. return packet;
  771. }
  772. public void addAuthInfo(String scheme, byte auth[]) {
  773. authInfo.add(new AuthData(scheme, auth));
  774. if (zooKeeper.state == States.CONNECTED) {
  775. queuePacket(new RequestHeader(-4, OpCode.auth), null,
  776. new AuthPacket(0, scheme, auth), null, null, null, null,
  777. null);
  778. }
  779. }
  780. }