FollowerHandler.java 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389
  1. /*
  2. * Copyright 2008, Yahoo! Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. package com.yahoo.zookeeper.server.quorum;
  17. import java.io.BufferedInputStream;
  18. import java.io.BufferedOutputStream;
  19. import java.io.ByteArrayInputStream;
  20. import java.io.ByteArrayOutputStream;
  21. import java.io.DataInputStream;
  22. import java.io.DataOutputStream;
  23. import java.io.IOException;
  24. import java.net.Socket;
  25. import java.nio.ByteBuffer;
  26. import java.util.concurrent.LinkedBlockingQueue;
  27. import org.apache.log4j.Logger;
  28. import com.yahoo.jute.BinaryInputArchive;
  29. import com.yahoo.jute.BinaryOutputArchive;
  30. import com.yahoo.jute.Record;
  31. import com.yahoo.zookeeper.ZooDefs.OpCode;
  32. import com.yahoo.zookeeper.server.ZooKeeperServer;
  33. import com.yahoo.zookeeper.server.ZooTrace;
  34. import com.yahoo.zookeeper.server.quorum.Leader.Proposal;
  35. import com.yahoo.zookeeper.txn.TxnHeader;
  36. /**
  37. * There will be an instance of this class created by the Leader for each
  38. * follower.All communication for a given Follower will be handled by this
  39. * class.
  40. */
  41. public class FollowerHandler extends Thread {
  42. private static final Logger LOG = Logger.getLogger(FollowerHandler.class);
  43. public Socket s;
  44. Leader leader;
  45. long tickOfLastAck;
  46. /**
  47. * The packets to be sent to the follower
  48. */
  49. LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<QuorumPacket>();
  50. private BinaryInputArchive ia;
  51. private BinaryOutputArchive oa;
  52. private BufferedOutputStream bufferedOutput;
  53. FollowerHandler(Socket s, Leader leader) throws IOException {
  54. super("FollowerHandler-" + s.getRemoteSocketAddress());
  55. this.s = s;
  56. this.leader = leader;
  57. leader.addFollowerHandler(this);
  58. start();
  59. }
  60. /**
  61. * If this packet is queued, the sender thread will exit
  62. */
  63. QuorumPacket proposalOfDeath = new QuorumPacket();
  64. /**
  65. * This method will use the thread to send packets added to the
  66. * queuedPackets list
  67. *
  68. * @throws InterruptedException
  69. */
  70. private void sendPackets() throws InterruptedException {
  71. long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
  72. while (true) {
  73. QuorumPacket p;
  74. p = queuedPackets.take();
  75. if (p == proposalOfDeath) {
  76. // Packet of death!
  77. break;
  78. }
  79. if (p.getType() == Leader.PING) {
  80. traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
  81. }
  82. ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);
  83. try {
  84. oa.writeRecord(p, "packet");
  85. bufferedOutput.flush();
  86. } catch (IOException e) {
  87. if (!s.isClosed()) {
  88. LOG.warn("Unexpected exception",e);
  89. }
  90. break;
  91. }
  92. }
  93. }
  94. static public String packetToString(QuorumPacket p) {
  95. if (true)
  96. return null;
  97. String type = null;
  98. String mess = null;
  99. Record txn = null;
  100. switch (p.getType()) {
  101. case Leader.ACK:
  102. type = "ACK";
  103. break;
  104. case Leader.COMMIT:
  105. type = "COMMIT";
  106. break;
  107. case Leader.LASTZXID:
  108. type = "LASTZXID";
  109. break;
  110. case Leader.NEWLEADER:
  111. type = "NEWLEADER";
  112. break;
  113. case Leader.PING:
  114. type = "PING";
  115. break;
  116. case Leader.PROPOSAL:
  117. type = "PROPOSAL";
  118. BinaryInputArchive ia = BinaryInputArchive
  119. .getArchive(new ByteArrayInputStream(p.getData()));
  120. TxnHeader hdr = new TxnHeader();
  121. try {
  122. txn = ZooKeeperServer.deserializeTxn(ia, hdr);
  123. // mess = "transaction: " + txn.toString();
  124. } catch (IOException e) {
  125. LOG.warn("Unexpected exception",e);
  126. }
  127. break;
  128. case Leader.REQUEST:
  129. type = "REQUEST";
  130. break;
  131. case Leader.REVALIDATE:
  132. type = "REVALIDATE";
  133. ByteArrayInputStream bis = new ByteArrayInputStream(p.getData());
  134. DataInputStream dis = new DataInputStream(bis);
  135. try {
  136. long id = dis.readLong();
  137. mess = " sessionid = " + id;
  138. } catch (IOException e) {
  139. LOG.warn("Unexpected exception", e);
  140. }
  141. break;
  142. case Leader.UPTODATE:
  143. type = "UPTODATE";
  144. break;
  145. default:
  146. type = "UNKNOWN" + p.getType();
  147. }
  148. String entry = null;
  149. if (type != null) {
  150. entry = type + " " + Long.toHexString(p.getZxid()) + " " + mess;
  151. }
  152. return entry;
  153. }
  154. /**
  155. * This thread will receive packets from the follower and process them and
  156. * also listen to new connections from new followers.
  157. */
  158. public void run() {
  159. try {
  160. ia = BinaryInputArchive.getArchive(new BufferedInputStream(s
  161. .getInputStream()));
  162. bufferedOutput = new BufferedOutputStream(s.getOutputStream());
  163. oa = BinaryOutputArchive.getArchive(bufferedOutput);
  164. QuorumPacket qp = new QuorumPacket();
  165. ia.readRecord(qp, "packet");
  166. if (qp.getType() != Leader.LASTZXID) {
  167. LOG.error("First packet " + qp.toString()
  168. + " is not LASTZXID!");
  169. return;
  170. }
  171. long peerLastZxid = qp.getZxid();
  172. int packetToSend = Leader.SNAP;
  173. boolean logTxns = true;
  174. long zxidToSend = 0;
  175. // we are sending the diff
  176. synchronized(leader.zk.committedLog) {
  177. if (leader.zk.committedLog.size() != 0) {
  178. if ((leader.zk.maxCommittedLog >= peerLastZxid)
  179. && (leader.zk.minCommittedLog <= peerLastZxid)) {
  180. packetToSend = Leader.DIFF;
  181. zxidToSend = leader.zk.maxCommittedLog;
  182. for (Proposal propose: leader.zk.committedLog) {
  183. if (propose.packet.getZxid() > peerLastZxid) {
  184. queuePacket(propose.packet);
  185. QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
  186. null, null);
  187. queuePacket(qcommit);
  188. }
  189. }
  190. }
  191. }
  192. else {
  193. logTxns = false;
  194. } }
  195. long leaderLastZxid = leader.startForwarding(this, peerLastZxid);
  196. QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
  197. leaderLastZxid, null, null);
  198. oa.writeRecord(newLeaderQP, "packet");
  199. bufferedOutput.flush();
  200. // a special case when both the ids are the same
  201. if (peerLastZxid == leaderLastZxid) {
  202. packetToSend = Leader.DIFF;
  203. zxidToSend = leaderLastZxid;
  204. }
  205. //check if we decided to send a diff or we need to send a truncate
  206. // we avoid using epochs for truncating because epochs make things
  207. // complicated. Two epochs might have the last 32 bits as same.
  208. // only if we know that there is a committed zxid in the queue that
  209. // is less than the one the peer has we send a trunc else to make
  210. // things simple we just send sanpshot.
  211. if (logTxns && (peerLastZxid > leader.zk.maxCommittedLog)) {
  212. // this is the only case that we are sure that
  213. // we can ask the follower to truncate the log
  214. packetToSend = Leader.TRUNC;
  215. zxidToSend = leader.zk.maxCommittedLog;
  216. }
  217. oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
  218. bufferedOutput.flush();
  219. // only if we are not truncating or fast sycning
  220. if (packetToSend == Leader.SNAP) {
  221. LOG.warn("Sending snapshot last zxid of peer is "
  222. + Long.toHexString(peerLastZxid) + " " + " zxid of leader is "
  223. + Long.toHexString(leaderLastZxid));
  224. // Dump data to follower
  225. leader.zk.snapshot(oa);
  226. oa.writeString("BenWasHere", "signature");
  227. }
  228. bufferedOutput.flush();
  229. //
  230. // Mutation packets will be queued during the serialize,
  231. // so we need to mark when the follower can actually start
  232. // using the data
  233. //
  234. queuedPackets
  235. .add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
  236. // Start sending packets
  237. new Thread() {
  238. public void run() {
  239. Thread.currentThread().setName(
  240. "Sender-" + s.getRemoteSocketAddress());
  241. try {
  242. sendPackets();
  243. } catch (InterruptedException e) {
  244. LOG.warn("Interrupted",e);
  245. }
  246. }
  247. }.start();
  248. while (true) {
  249. qp = new QuorumPacket();
  250. ia.readRecord(qp, "packet");
  251. long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
  252. if (qp.getType() == Leader.PING) {
  253. traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
  254. }
  255. ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
  256. tickOfLastAck = leader.self.tick;
  257. ByteBuffer bb;
  258. long sessionId;
  259. int cxid;
  260. int type;
  261. switch (qp.getType()) {
  262. case Leader.ACK:
  263. leader.processAck(qp.getZxid(), s.getLocalSocketAddress());
  264. break;
  265. case Leader.PING:
  266. // Process the touches
  267. ByteArrayInputStream bis = new ByteArrayInputStream(qp
  268. .getData());
  269. DataInputStream dis = new DataInputStream(bis);
  270. while (dis.available() > 0) {
  271. long sess = dis.readLong();
  272. int to = dis.readInt();
  273. leader.zk.touch(sess, to);
  274. }
  275. break;
  276. case Leader.REVALIDATE:
  277. bis = new ByteArrayInputStream(qp.getData());
  278. dis = new DataInputStream(bis);
  279. long id = dis.readLong();
  280. int to = dis.readInt();
  281. ByteArrayOutputStream bos = new ByteArrayOutputStream();
  282. DataOutputStream dos = new DataOutputStream(bos);
  283. dos.writeLong(id);
  284. boolean valid = leader.zk.touch(id, to);
  285. ZooTrace.logTraceMessage(LOG,
  286. ZooTrace.SESSION_TRACE_MASK,
  287. "Session " + Long.toHexString(id)
  288. + " is valid: "+ valid);
  289. dos.writeBoolean(valid);
  290. qp.setData(bos.toByteArray());
  291. queuedPackets.add(qp);
  292. break;
  293. case Leader.REQUEST:
  294. bb = ByteBuffer.wrap(qp.getData());
  295. sessionId = bb.getLong();
  296. cxid = bb.getInt();
  297. type = bb.getInt();
  298. bb = bb.slice();
  299. if(type == OpCode.sync){
  300. leader.setSyncHandler(this, sessionId);
  301. }
  302. leader.zk.submitRequest(null, sessionId, type, cxid, bb,
  303. qp.getAuthinfo());
  304. break;
  305. default:
  306. }
  307. }
  308. } catch (IOException e) {
  309. if (s != null && !s.isClosed()) {
  310. LOG.error("FIXMSG",e);
  311. }
  312. } catch (InterruptedException e) {
  313. LOG.error("FIXMSG",e);
  314. } finally {
  315. LOG.warn("******* GOODBYE " + s.getRemoteSocketAddress()
  316. + " ********");
  317. // Send the packet of death
  318. try {
  319. queuedPackets.put(proposalOfDeath);
  320. } catch (InterruptedException e) {
  321. LOG.error("FIXMSG",e);
  322. }
  323. shutdown();
  324. }
  325. }
  326. public void shutdown() {
  327. try {
  328. if (s != null && !s.isClosed()) {
  329. s.close();
  330. }
  331. } catch (IOException e) {
  332. LOG.error("FIXMSG",e);
  333. }
  334. leader.removeFollowerHandler(this);
  335. }
  336. public long tickOfLastAck() {
  337. return tickOfLastAck;
  338. }
  339. /**
  340. * ping calls from the leader to the followers
  341. */
  342. public void ping() {
  343. QuorumPacket ping = new QuorumPacket(Leader.PING, leader.lastProposed,
  344. null, null);
  345. queuePacket(ping);
  346. }
  347. void queuePacket(QuorumPacket p) {
  348. queuedPackets.add(p);
  349. }
  350. public boolean synced() {
  351. return isAlive()
  352. && tickOfLastAck >= leader.self.tick - leader.self.syncLimit;
  353. }
  354. }