123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389 |
- /*
- * Copyright 2008, Yahoo! Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package com.yahoo.zookeeper.server.quorum;
- import java.io.BufferedInputStream;
- import java.io.BufferedOutputStream;
- import java.io.ByteArrayInputStream;
- import java.io.ByteArrayOutputStream;
- import java.io.DataInputStream;
- import java.io.DataOutputStream;
- import java.io.IOException;
- import java.net.Socket;
- import java.nio.ByteBuffer;
- import java.util.concurrent.LinkedBlockingQueue;
- import org.apache.log4j.Logger;
- import com.yahoo.jute.BinaryInputArchive;
- import com.yahoo.jute.BinaryOutputArchive;
- import com.yahoo.jute.Record;
- import com.yahoo.zookeeper.ZooDefs.OpCode;
- import com.yahoo.zookeeper.server.ZooKeeperServer;
- import com.yahoo.zookeeper.server.ZooTrace;
- import com.yahoo.zookeeper.server.quorum.Leader.Proposal;
- import com.yahoo.zookeeper.txn.TxnHeader;
- /**
- * There will be an instance of this class created by the Leader for each
- * follower.All communication for a given Follower will be handled by this
- * class.
- */
- public class FollowerHandler extends Thread {
- private static final Logger LOG = Logger.getLogger(FollowerHandler.class);
- public Socket s;
- Leader leader;
- long tickOfLastAck;
- /**
- * The packets to be sent to the follower
- */
- LinkedBlockingQueue<QuorumPacket> queuedPackets = new LinkedBlockingQueue<QuorumPacket>();
- private BinaryInputArchive ia;
- private BinaryOutputArchive oa;
- private BufferedOutputStream bufferedOutput;
- FollowerHandler(Socket s, Leader leader) throws IOException {
- super("FollowerHandler-" + s.getRemoteSocketAddress());
- this.s = s;
- this.leader = leader;
- leader.addFollowerHandler(this);
- start();
- }
- /**
- * If this packet is queued, the sender thread will exit
- */
- QuorumPacket proposalOfDeath = new QuorumPacket();
- /**
- * This method will use the thread to send packets added to the
- * queuedPackets list
- *
- * @throws InterruptedException
- */
- private void sendPackets() throws InterruptedException {
- long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
- while (true) {
- QuorumPacket p;
- p = queuedPackets.take();
- if (p == proposalOfDeath) {
- // Packet of death!
- break;
- }
- if (p.getType() == Leader.PING) {
- traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
- }
- ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);
- try {
- oa.writeRecord(p, "packet");
- bufferedOutput.flush();
- } catch (IOException e) {
- if (!s.isClosed()) {
- LOG.warn("Unexpected exception",e);
- }
- break;
- }
- }
- }
- static public String packetToString(QuorumPacket p) {
- if (true)
- return null;
- String type = null;
- String mess = null;
- Record txn = null;
- switch (p.getType()) {
- case Leader.ACK:
- type = "ACK";
- break;
- case Leader.COMMIT:
- type = "COMMIT";
- break;
- case Leader.LASTZXID:
- type = "LASTZXID";
- break;
- case Leader.NEWLEADER:
- type = "NEWLEADER";
- break;
- case Leader.PING:
- type = "PING";
- break;
- case Leader.PROPOSAL:
- type = "PROPOSAL";
- BinaryInputArchive ia = BinaryInputArchive
- .getArchive(new ByteArrayInputStream(p.getData()));
- TxnHeader hdr = new TxnHeader();
- try {
- txn = ZooKeeperServer.deserializeTxn(ia, hdr);
- // mess = "transaction: " + txn.toString();
- } catch (IOException e) {
- LOG.warn("Unexpected exception",e);
- }
- break;
- case Leader.REQUEST:
- type = "REQUEST";
- break;
- case Leader.REVALIDATE:
- type = "REVALIDATE";
- ByteArrayInputStream bis = new ByteArrayInputStream(p.getData());
- DataInputStream dis = new DataInputStream(bis);
- try {
- long id = dis.readLong();
- mess = " sessionid = " + id;
- } catch (IOException e) {
- LOG.warn("Unexpected exception", e);
- }
- break;
- case Leader.UPTODATE:
- type = "UPTODATE";
- break;
- default:
- type = "UNKNOWN" + p.getType();
- }
- String entry = null;
- if (type != null) {
- entry = type + " " + Long.toHexString(p.getZxid()) + " " + mess;
- }
- return entry;
- }
- /**
- * This thread will receive packets from the follower and process them and
- * also listen to new connections from new followers.
- */
- public void run() {
- try {
- ia = BinaryInputArchive.getArchive(new BufferedInputStream(s
- .getInputStream()));
- bufferedOutput = new BufferedOutputStream(s.getOutputStream());
- oa = BinaryOutputArchive.getArchive(bufferedOutput);
- QuorumPacket qp = new QuorumPacket();
- ia.readRecord(qp, "packet");
- if (qp.getType() != Leader.LASTZXID) {
- LOG.error("First packet " + qp.toString()
- + " is not LASTZXID!");
- return;
- }
- long peerLastZxid = qp.getZxid();
- int packetToSend = Leader.SNAP;
- boolean logTxns = true;
- long zxidToSend = 0;
- // we are sending the diff
- synchronized(leader.zk.committedLog) {
- if (leader.zk.committedLog.size() != 0) {
- if ((leader.zk.maxCommittedLog >= peerLastZxid)
- && (leader.zk.minCommittedLog <= peerLastZxid)) {
- packetToSend = Leader.DIFF;
- zxidToSend = leader.zk.maxCommittedLog;
- for (Proposal propose: leader.zk.committedLog) {
- if (propose.packet.getZxid() > peerLastZxid) {
- queuePacket(propose.packet);
- QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
- null, null);
- queuePacket(qcommit);
- }
- }
- }
- }
- else {
- logTxns = false;
- } }
- long leaderLastZxid = leader.startForwarding(this, peerLastZxid);
- QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
- leaderLastZxid, null, null);
- oa.writeRecord(newLeaderQP, "packet");
- bufferedOutput.flush();
- // a special case when both the ids are the same
- if (peerLastZxid == leaderLastZxid) {
- packetToSend = Leader.DIFF;
- zxidToSend = leaderLastZxid;
- }
- //check if we decided to send a diff or we need to send a truncate
- // we avoid using epochs for truncating because epochs make things
- // complicated. Two epochs might have the last 32 bits as same.
- // only if we know that there is a committed zxid in the queue that
- // is less than the one the peer has we send a trunc else to make
- // things simple we just send sanpshot.
- if (logTxns && (peerLastZxid > leader.zk.maxCommittedLog)) {
- // this is the only case that we are sure that
- // we can ask the follower to truncate the log
- packetToSend = Leader.TRUNC;
- zxidToSend = leader.zk.maxCommittedLog;
- }
- oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
- bufferedOutput.flush();
- // only if we are not truncating or fast sycning
- if (packetToSend == Leader.SNAP) {
- LOG.warn("Sending snapshot last zxid of peer is "
- + Long.toHexString(peerLastZxid) + " " + " zxid of leader is "
- + Long.toHexString(leaderLastZxid));
- // Dump data to follower
- leader.zk.snapshot(oa);
- oa.writeString("BenWasHere", "signature");
- }
- bufferedOutput.flush();
- //
- // Mutation packets will be queued during the serialize,
- // so we need to mark when the follower can actually start
- // using the data
- //
- queuedPackets
- .add(new QuorumPacket(Leader.UPTODATE, -1, null, null));
- // Start sending packets
- new Thread() {
- public void run() {
- Thread.currentThread().setName(
- "Sender-" + s.getRemoteSocketAddress());
- try {
- sendPackets();
- } catch (InterruptedException e) {
- LOG.warn("Interrupted",e);
- }
- }
- }.start();
- while (true) {
- qp = new QuorumPacket();
- ia.readRecord(qp, "packet");
- long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
- if (qp.getType() == Leader.PING) {
- traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
- }
- ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
- tickOfLastAck = leader.self.tick;
- ByteBuffer bb;
- long sessionId;
- int cxid;
- int type;
- switch (qp.getType()) {
- case Leader.ACK:
- leader.processAck(qp.getZxid(), s.getLocalSocketAddress());
- break;
- case Leader.PING:
- // Process the touches
- ByteArrayInputStream bis = new ByteArrayInputStream(qp
- .getData());
- DataInputStream dis = new DataInputStream(bis);
- while (dis.available() > 0) {
- long sess = dis.readLong();
- int to = dis.readInt();
- leader.zk.touch(sess, to);
- }
- break;
- case Leader.REVALIDATE:
- bis = new ByteArrayInputStream(qp.getData());
- dis = new DataInputStream(bis);
- long id = dis.readLong();
- int to = dis.readInt();
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- DataOutputStream dos = new DataOutputStream(bos);
- dos.writeLong(id);
- boolean valid = leader.zk.touch(id, to);
- ZooTrace.logTraceMessage(LOG,
- ZooTrace.SESSION_TRACE_MASK,
- "Session " + Long.toHexString(id)
- + " is valid: "+ valid);
- dos.writeBoolean(valid);
- qp.setData(bos.toByteArray());
- queuedPackets.add(qp);
- break;
- case Leader.REQUEST:
- bb = ByteBuffer.wrap(qp.getData());
- sessionId = bb.getLong();
- cxid = bb.getInt();
- type = bb.getInt();
- bb = bb.slice();
- if(type == OpCode.sync){
- leader.setSyncHandler(this, sessionId);
- }
- leader.zk.submitRequest(null, sessionId, type, cxid, bb,
- qp.getAuthinfo());
- break;
- default:
- }
- }
- } catch (IOException e) {
- if (s != null && !s.isClosed()) {
- LOG.error("FIXMSG",e);
- }
- } catch (InterruptedException e) {
- LOG.error("FIXMSG",e);
- } finally {
- LOG.warn("******* GOODBYE " + s.getRemoteSocketAddress()
- + " ********");
- // Send the packet of death
- try {
- queuedPackets.put(proposalOfDeath);
- } catch (InterruptedException e) {
- LOG.error("FIXMSG",e);
- }
- shutdown();
- }
- }
- public void shutdown() {
- try {
- if (s != null && !s.isClosed()) {
- s.close();
- }
- } catch (IOException e) {
- LOG.error("FIXMSG",e);
- }
- leader.removeFollowerHandler(this);
- }
- public long tickOfLastAck() {
- return tickOfLastAck;
- }
- /**
- * ping calls from the leader to the followers
- */
- public void ping() {
- QuorumPacket ping = new QuorumPacket(Leader.PING, leader.lastProposed,
- null, null);
- queuePacket(ping);
- }
- void queuePacket(QuorumPacket p) {
- queuedPackets.add(p);
- }
- public boolean synced() {
- return isAlive()
- && tickOfLastAck >= leader.self.tick - leader.self.syncLimit;
- }
- }
|