Follower.java 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. /*
  2. * Copyright 2008, Yahoo! Inc.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License");
  5. * you may not use this file except in compliance with the License.
  6. * You may obtain a copy of the License at
  7. *
  8. * http://www.apache.org/licenses/LICENSE-2.0
  9. *
  10. * Unless required by applicable law or agreed to in writing, software
  11. * distributed under the License is distributed on an "AS IS" BASIS,
  12. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. * See the License for the specific language governing permissions and
  14. * limitations under the License.
  15. */
  16. /**
  17. * This class has the control logic for the Follower.
  18. */
  19. package com.yahoo.zookeeper.server.quorum;
  20. import java.io.BufferedInputStream;
  21. import java.io.BufferedOutputStream;
  22. import java.io.ByteArrayInputStream;
  23. import java.io.ByteArrayOutputStream;
  24. import java.io.DataInputStream;
  25. import java.io.DataOutputStream;
  26. import java.io.IOException;
  27. import java.net.ConnectException;
  28. import java.net.InetSocketAddress;
  29. import java.net.Socket;
  30. import java.util.HashMap;
  31. import java.util.Map.Entry;
  32. import java.util.concurrent.ConcurrentHashMap;
  33. import org.apache.log4j.Logger;
  34. import com.yahoo.jute.BinaryInputArchive;
  35. import com.yahoo.jute.BinaryOutputArchive;
  36. import com.yahoo.jute.InputArchive;
  37. import com.yahoo.jute.OutputArchive;
  38. import com.yahoo.jute.Record;
  39. import com.yahoo.zookeeper.server.Request;
  40. import com.yahoo.zookeeper.server.ServerCnxn;
  41. import com.yahoo.zookeeper.server.ZooKeeperServer;
  42. import com.yahoo.zookeeper.server.ZooTrace;
  43. import com.yahoo.zookeeper.server.quorum.QuorumPeer.QuorumServer;
  44. import com.yahoo.zookeeper.txn.TxnHeader;
  45. public class Follower {
  46. private static final Logger LOG = Logger.getLogger(Follower.class);
  47. QuorumPeer self;
  48. FollowerZooKeeperServer zk;
  49. Follower(QuorumPeer self,FollowerZooKeeperServer zk) {
  50. this.self = self;
  51. this.zk=zk;
  52. }
  53. private InputArchive leaderIs;
  54. private OutputArchive leaderOs;
  55. private BufferedOutputStream bufferedOutput;
  56. public Socket sock;
  57. /**
  58. * write a packet to the leader
  59. *
  60. * @param pp
  61. * the proposal packet to be sent to the leader
  62. * @throws IOException
  63. */
  64. void writePacket(QuorumPacket pp) throws IOException {
  65. long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
  66. if (pp.getType() == Leader.PING) {
  67. traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
  68. }
  69. ZooTrace.logQuorumPacket(LOG, traceMask, 'o', pp);
  70. synchronized (leaderOs) {
  71. leaderOs.writeRecord(pp, "packet");
  72. bufferedOutput.flush();
  73. }
  74. }
  75. /**
  76. * read a packet from the leader
  77. *
  78. * @param pp
  79. * the packet to be instantiated
  80. * @throws IOException
  81. */
  82. void readPacket(QuorumPacket pp) throws IOException {
  83. synchronized (leaderIs) {
  84. leaderIs.readRecord(pp, "packet");
  85. }
  86. long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
  87. if (pp.getType() == Leader.PING) {
  88. traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
  89. }
  90. ZooTrace.logQuorumPacket(LOG, traceMask, 'i', pp);
  91. }
  92. /**
  93. * the main method called by the follower to follow the leader
  94. *
  95. * @throws InterruptedException
  96. */
  97. void followLeader() throws InterruptedException {
  98. InetSocketAddress addr = null;
  99. // Find the leader by id
  100. for (QuorumServer s : self.quorumPeers) {
  101. if (s.id == self.currentVote.id) {
  102. addr = s.addr;
  103. break;
  104. }
  105. }
  106. if (addr == null) {
  107. LOG.warn("Couldn't find the leader with id = "
  108. + self.currentVote.id);
  109. }
  110. LOG.info("Following " + addr);
  111. sock = new Socket();
  112. try {
  113. QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
  114. sock.setSoTimeout(self.tickTime * self.initLimit);
  115. for (int tries = 0; tries < 5; tries++) {
  116. try {
  117. //sock = new Socket();
  118. //sock.setSoTimeout(self.tickTime * self.initLimit);
  119. sock.connect(addr, self.tickTime * self.syncLimit);
  120. sock.setTcpNoDelay(true);
  121. break;
  122. } catch (ConnectException e) {
  123. if (tries == 4) {
  124. LOG.error("Unexpected exception",e);
  125. throw e;
  126. } else {
  127. LOG.warn("Unexpected exception",e);
  128. sock = new Socket();
  129. sock.setSoTimeout(self.tickTime * self.initLimit);
  130. }
  131. }
  132. Thread.sleep(1000);
  133. }
  134. leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
  135. sock.getInputStream()));
  136. bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
  137. leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
  138. QuorumPacket qp = new QuorumPacket();
  139. qp.setType(Leader.LASTZXID);
  140. long sentLastZxid = self.getLastLoggedZxid();
  141. qp.setZxid(sentLastZxid);
  142. writePacket(qp);
  143. readPacket(qp);
  144. long newLeaderZxid = qp.getZxid();
  145. if (qp.getType() != Leader.NEWLEADER) {
  146. LOG.error("First packet should have been NEWLEADER");
  147. throw new IOException("First packet should have been NEWLEADER");
  148. }
  149. readPacket(qp);
  150. synchronized (zk) {
  151. if (qp.getType() == Leader.DIFF) {
  152. LOG.info("Getting a diff from the leader!");
  153. zk.loadData();
  154. }
  155. else if (qp.getType() == Leader.SNAP) {
  156. LOG.info("Getting a snapshot from leader");
  157. // The leader is going to dump the database
  158. zk.loadData(leaderIs);
  159. String signature = leaderIs.readString("signature");
  160. if (!signature.equals("BenWasHere")) {
  161. LOG.error("Missing signature. Got " + signature);
  162. throw new IOException("Missing signature");
  163. }
  164. } else if (qp.getType() == Leader.TRUNC) {
  165. //we need to truncate the log to the lastzxid of the leader
  166. LOG.warn("Truncating log to get in sync with the leader "
  167. + Long.toHexString(qp.getZxid()));
  168. zk.truncateLog(qp.getZxid());
  169. zk.loadData();
  170. }
  171. else {
  172. LOG.error("Got unexpected packet from leader "
  173. + qp.getType() + " exiting ... " );
  174. System.exit(13);
  175. }
  176. zk.dataTree.lastProcessedZxid = newLeaderZxid;
  177. }
  178. ack.setZxid(newLeaderZxid & ~0xffffffffL);
  179. writePacket(ack);
  180. sock.setSoTimeout(self.tickTime * self.syncLimit);
  181. zk.startup();
  182. while (self.running) {
  183. readPacket(qp);
  184. switch (qp.getType()) {
  185. case Leader.PING:
  186. // Send back the ping with our session data
  187. ByteArrayOutputStream bos = new ByteArrayOutputStream();
  188. DataOutputStream dos = new DataOutputStream(bos);
  189. HashMap<Long, Integer> touchTable = ((FollowerZooKeeperServer) zk)
  190. .getTouchSnapshot();
  191. for (Entry<Long, Integer> entry : touchTable.entrySet()) {
  192. dos.writeLong(entry.getKey());
  193. dos.writeInt(entry.getValue());
  194. }
  195. qp.setData(bos.toByteArray());
  196. writePacket(qp);
  197. break;
  198. case Leader.PROPOSAL:
  199. TxnHeader hdr = new TxnHeader();
  200. BinaryInputArchive ia = BinaryInputArchive
  201. .getArchive(new ByteArrayInputStream(qp.getData()));
  202. Record txn = ZooKeeperServer.deserializeTxn(ia, hdr);
  203. if (hdr.getZxid() != lastQueued + 1) {
  204. LOG.warn("Got zxid "
  205. + Long.toHexString(hdr.getZxid())
  206. + " expected "
  207. + Long.toHexString(lastQueued + 1));
  208. }
  209. lastQueued = hdr.getZxid();
  210. zk.logRequest(hdr, txn);
  211. break;
  212. case Leader.COMMIT:
  213. zk.commit(qp.getZxid());
  214. break;
  215. case Leader.UPTODATE:
  216. zk.snapshot();
  217. self.cnxnFactory.setZooKeeperServer(zk);
  218. break;
  219. case Leader.REVALIDATE:
  220. ByteArrayInputStream bis = new ByteArrayInputStream(qp
  221. .getData());
  222. DataInputStream dis = new DataInputStream(bis);
  223. long sessionId = dis.readLong();
  224. boolean valid = dis.readBoolean();
  225. synchronized (pendingRevalidations) {
  226. ServerCnxn cnxn = pendingRevalidations
  227. .remove(sessionId);
  228. if (cnxn == null) {
  229. LOG.warn("Missing "
  230. + Long.toHexString(sessionId)
  231. + " for validation");
  232. } else {
  233. cnxn.finishSessionInit(valid);
  234. }
  235. }
  236. ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
  237. "Session " + sessionId
  238. + " is valid: " + valid);
  239. break;
  240. case Leader.SYNC:
  241. zk.sync();
  242. break;
  243. }
  244. }
  245. } catch (IOException e) {
  246. e.printStackTrace();
  247. try {
  248. sock.close();
  249. } catch (IOException e1) {
  250. e1.printStackTrace();
  251. }
  252. synchronized (pendingRevalidations) {
  253. // clear pending revalitions
  254. pendingRevalidations.clear();
  255. pendingRevalidations.notifyAll();
  256. }
  257. }
  258. }
  259. private long lastQueued;
  260. ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<Long, ServerCnxn>();
  261. /**
  262. * validate a seesion for a client
  263. *
  264. * @param clientId
  265. * the client to be revailidated
  266. * @param timeout
  267. * the timeout for which the session is valid
  268. * @return
  269. * @throws IOException
  270. * @throws InterruptedException
  271. */
  272. void validateSession(ServerCnxn cnxn, long clientId, int timeout)
  273. throws IOException, InterruptedException {
  274. ByteArrayOutputStream baos = new ByteArrayOutputStream();
  275. DataOutputStream dos = new DataOutputStream(baos);
  276. dos.writeLong(clientId);
  277. dos.writeInt(timeout);
  278. dos.close();
  279. QuorumPacket qp = new QuorumPacket(Leader.REVALIDATE, -1, baos
  280. .toByteArray(), null);
  281. pendingRevalidations.put(clientId, cnxn);
  282. ZooTrace.logTraceMessage(LOG,
  283. ZooTrace.SESSION_TRACE_MASK,
  284. "To validate session "
  285. + Long.toHexString(clientId));
  286. writePacket(qp);
  287. }
  288. /**
  289. * send a request packet to the leader
  290. *
  291. * @param request
  292. * the request from the client
  293. * @throws IOException
  294. */
  295. void request(Request request) throws IOException {
  296. ByteArrayOutputStream baos = new ByteArrayOutputStream();
  297. DataOutputStream oa = new DataOutputStream(baos);
  298. oa.writeLong(request.sessionId);
  299. oa.writeInt(request.cxid);
  300. oa.writeInt(request.type);
  301. if (request.request != null) {
  302. request.request.rewind();
  303. int len = request.request.remaining();
  304. byte b[] = new byte[len];
  305. request.request.get(b);
  306. request.request.rewind();
  307. oa.write(b);
  308. }
  309. oa.close();
  310. QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos
  311. .toByteArray(), request.authInfo);
  312. // QuorumPacket qp;
  313. // if(request.type == OpCode.sync){
  314. // qp = new QuorumPacket(Leader.SYNC, -1, baos
  315. // .toByteArray(), request.authInfo);
  316. // }
  317. // else{
  318. // qp = new QuorumPacket(Leader.REQUEST, -1, baos
  319. // .toByteArray(), request.authInfo);
  320. // }
  321. writePacket(qp);
  322. }
  323. public long getZxid() {
  324. try {
  325. synchronized (zk) {
  326. return zk.getZxid();
  327. }
  328. } catch (NullPointerException e) {
  329. }
  330. return -1;
  331. }
  332. public void shutdown() {
  333. // set the zookeeper server to null
  334. self.cnxnFactory.setZooKeeperServer(null);
  335. // clear all the connections
  336. self.cnxnFactory.clear();
  337. // shutdown previous zookeeper
  338. if (zk != null) {
  339. zk.shutdown();
  340. }
  341. LOG.error("FIXMSG",new Exception("shutdown Follower"));
  342. }
  343. }