|
@@ -60,6 +60,16 @@ public class Follower {
|
|
|
this.zk=zk;
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public String toString() {
|
|
|
+ StringBuffer sb = new StringBuffer();
|
|
|
+ sb.append("Follower ").append(sock);
|
|
|
+ sb.append(" lastQueuedZxid:").append(lastQueued);
|
|
|
+ sb.append(" pendingRevalidationCount:")
|
|
|
+ .append(pendingRevalidations.size());
|
|
|
+ return sb.toString();
|
|
|
+ }
|
|
|
+
|
|
|
private InputArchive leaderIs;
|
|
|
|
|
|
private OutputArchive leaderOs;
|
|
@@ -111,182 +121,193 @@ public class Follower {
|
|
|
* @throws InterruptedException
|
|
|
*/
|
|
|
void followLeader() throws InterruptedException {
|
|
|
- InetSocketAddress addr = null;
|
|
|
- // Find the leader by id
|
|
|
- Vote current = self.getCurrentVote();
|
|
|
- for (QuorumServer s : self.quorumPeers.values()) {
|
|
|
- if (s.id == current.id) {
|
|
|
- addr = s.addr;
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- if (addr == null) {
|
|
|
- LOG.warn("Couldn't find the leader with id = "
|
|
|
- + current.id);
|
|
|
- }
|
|
|
- LOG.info("Following " + addr);
|
|
|
- sock = new Socket();
|
|
|
+ zk.registerJMX(new FollowerBean(this, zk), self.jmxLocalPeerBean);
|
|
|
+
|
|
|
try {
|
|
|
- QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
|
|
|
- sock.setSoTimeout(self.tickTime * self.initLimit);
|
|
|
- for (int tries = 0; tries < 5; tries++) {
|
|
|
- try {
|
|
|
- //sock = new Socket();
|
|
|
- //sock.setSoTimeout(self.tickTime * self.initLimit);
|
|
|
- sock.connect(addr, self.tickTime * self.syncLimit);
|
|
|
- sock.setTcpNoDelay(true);
|
|
|
+ InetSocketAddress addr = null;
|
|
|
+ // Find the leader by id
|
|
|
+ Vote current = self.getCurrentVote();
|
|
|
+ for (QuorumServer s : self.quorumPeers.values()) {
|
|
|
+ if (s.id == current.id) {
|
|
|
+ addr = s.addr;
|
|
|
break;
|
|
|
- } catch (ConnectException e) {
|
|
|
- if (tries == 4) {
|
|
|
- LOG.error("Unexpected exception",e);
|
|
|
- throw e;
|
|
|
- } else {
|
|
|
- LOG.warn("Unexpected exception",e);
|
|
|
- sock = new Socket();
|
|
|
- sock.setSoTimeout(self.tickTime * self.initLimit);
|
|
|
- }
|
|
|
}
|
|
|
- Thread.sleep(1000);
|
|
|
}
|
|
|
- leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
|
|
|
- sock.getInputStream()));
|
|
|
- bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
|
|
|
- leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
|
|
|
- QuorumPacket qp = new QuorumPacket();
|
|
|
- qp.setType(Leader.LASTZXID);
|
|
|
- long sentLastZxid = self.getLastLoggedZxid();
|
|
|
- qp.setZxid(sentLastZxid);
|
|
|
- writePacket(qp);
|
|
|
- readPacket(qp);
|
|
|
- long newLeaderZxid = qp.getZxid();
|
|
|
-
|
|
|
- if (qp.getType() != Leader.NEWLEADER) {
|
|
|
- LOG.error("First packet should have been NEWLEADER");
|
|
|
- throw new IOException("First packet should have been NEWLEADER");
|
|
|
+ if (addr == null) {
|
|
|
+ LOG.warn("Couldn't find the leader with id = "
|
|
|
+ + current.id);
|
|
|
}
|
|
|
- readPacket(qp);
|
|
|
- synchronized (zk) {
|
|
|
- if (qp.getType() == Leader.DIFF) {
|
|
|
- LOG.info("Getting a diff from the leader!");
|
|
|
- zk.loadData();
|
|
|
- }
|
|
|
- else if (qp.getType() == Leader.SNAP) {
|
|
|
- LOG.info("Getting a snapshot from leader");
|
|
|
- // The leader is going to dump the database
|
|
|
- zk.deserializeSnapshot(leaderIs);
|
|
|
- String signature = leaderIs.readString("signature");
|
|
|
- if (!signature.equals("BenWasHere")) {
|
|
|
- LOG.error("Missing signature. Got " + signature);
|
|
|
- throw new IOException("Missing signature");
|
|
|
- }
|
|
|
- } else if (qp.getType() == Leader.TRUNC) {
|
|
|
- //we need to truncate the log to the lastzxid of the leader
|
|
|
- LOG.warn("Truncating log to get in sync with the leader 0x"
|
|
|
- + Long.toHexString(qp.getZxid()));
|
|
|
- boolean truncated=zk.getLogWriter().truncateLog(qp.getZxid());
|
|
|
- if (!truncated) {
|
|
|
- // not able to truncate the log
|
|
|
- LOG.error("Not able to truncate the log "
|
|
|
- + Long.toHexString(qp.getZxid()));
|
|
|
- System.exit(13);
|
|
|
+ LOG.info("Following " + addr);
|
|
|
+ sock = new Socket();
|
|
|
+ try {
|
|
|
+ QuorumPacket ack = new QuorumPacket(Leader.ACK, 0, null, null);
|
|
|
+ sock.setSoTimeout(self.tickTime * self.initLimit);
|
|
|
+ for (int tries = 0; tries < 5; tries++) {
|
|
|
+ try {
|
|
|
+ //sock = new Socket();
|
|
|
+ //sock.setSoTimeout(self.tickTime * self.initLimit);
|
|
|
+ sock.connect(addr, self.tickTime * self.syncLimit);
|
|
|
+ sock.setTcpNoDelay(true);
|
|
|
+ break;
|
|
|
+ } catch (ConnectException e) {
|
|
|
+ if (tries == 4) {
|
|
|
+ LOG.error("Unexpected exception",e);
|
|
|
+ throw e;
|
|
|
+ } else {
|
|
|
+ LOG.warn("Unexpected exception",e);
|
|
|
+ sock = new Socket();
|
|
|
+ sock.setSoTimeout(self.tickTime * self.initLimit);
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
- zk.loadData();
|
|
|
+ Thread.sleep(1000);
|
|
|
}
|
|
|
- else {
|
|
|
- LOG.error("Got unexpected packet from leader "
|
|
|
- + qp.getType() + " exiting ... " );
|
|
|
- System.exit(13);
|
|
|
+ leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
|
|
|
+ sock.getInputStream()));
|
|
|
+ bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
|
|
|
+ leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
|
|
|
+ QuorumPacket qp = new QuorumPacket();
|
|
|
+ qp.setType(Leader.LASTZXID);
|
|
|
+ long sentLastZxid = self.getLastLoggedZxid();
|
|
|
+ qp.setZxid(sentLastZxid);
|
|
|
+ writePacket(qp);
|
|
|
+ readPacket(qp);
|
|
|
+ long newLeaderZxid = qp.getZxid();
|
|
|
+
|
|
|
+ if (qp.getType() != Leader.NEWLEADER) {
|
|
|
+ LOG.error("First packet should have been NEWLEADER");
|
|
|
+ throw new IOException("First packet should have been NEWLEADER");
|
|
|
}
|
|
|
- zk.dataTree.lastProcessedZxid = newLeaderZxid;
|
|
|
- }
|
|
|
- ack.setZxid(newLeaderZxid & ~0xffffffffL);
|
|
|
- writePacket(ack);
|
|
|
- sock.setSoTimeout(self.tickTime * self.syncLimit);
|
|
|
- zk.startup();
|
|
|
- while (self.running) {
|
|
|
readPacket(qp);
|
|
|
- switch (qp.getType()) {
|
|
|
- case Leader.PING:
|
|
|
- // Send back the ping with our session data
|
|
|
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
|
|
- DataOutputStream dos = new DataOutputStream(bos);
|
|
|
- HashMap<Long, Integer> touchTable = zk
|
|
|
- .getTouchSnapshot();
|
|
|
- for (Entry<Long, Integer> entry : touchTable.entrySet()) {
|
|
|
- dos.writeLong(entry.getKey());
|
|
|
- dos.writeInt(entry.getValue());
|
|
|
+ synchronized (zk) {
|
|
|
+ if (qp.getType() == Leader.DIFF) {
|
|
|
+ LOG.info("Getting a diff from the leader!");
|
|
|
+ zk.loadData();
|
|
|
}
|
|
|
- qp.setData(bos.toByteArray());
|
|
|
- writePacket(qp);
|
|
|
- break;
|
|
|
- case Leader.PROPOSAL:
|
|
|
- TxnHeader hdr = new TxnHeader();
|
|
|
- BinaryInputArchive ia = BinaryInputArchive
|
|
|
- .getArchive(new ByteArrayInputStream(qp.getData()));
|
|
|
- Record txn = SerializeUtils.deserializeTxn(ia, hdr);
|
|
|
- if (hdr.getZxid() != lastQueued + 1) {
|
|
|
- LOG.warn("Got zxid 0x"
|
|
|
- + Long.toHexString(hdr.getZxid())
|
|
|
- + " expected 0x"
|
|
|
- + Long.toHexString(lastQueued + 1));
|
|
|
+ else if (qp.getType() == Leader.SNAP) {
|
|
|
+ LOG.info("Getting a snapshot from leader");
|
|
|
+ // The leader is going to dump the database
|
|
|
+ zk.deserializeSnapshot(leaderIs);
|
|
|
+ String signature = leaderIs.readString("signature");
|
|
|
+ if (!signature.equals("BenWasHere")) {
|
|
|
+ LOG.error("Missing signature. Got " + signature);
|
|
|
+ throw new IOException("Missing signature");
|
|
|
+ }
|
|
|
+ } else if (qp.getType() == Leader.TRUNC) {
|
|
|
+ //we need to truncate the log to the lastzxid of the leader
|
|
|
+ LOG.warn("Truncating log to get in sync with the leader 0x"
|
|
|
+ + Long.toHexString(qp.getZxid()));
|
|
|
+ boolean truncated=zk.getLogWriter().truncateLog(qp.getZxid());
|
|
|
+ if (!truncated) {
|
|
|
+ // not able to truncate the log
|
|
|
+ LOG.error("Not able to truncate the log "
|
|
|
+ + Long.toHexString(qp.getZxid()));
|
|
|
+ System.exit(13);
|
|
|
+ }
|
|
|
+
|
|
|
+ zk.loadData();
|
|
|
}
|
|
|
- lastQueued = hdr.getZxid();
|
|
|
- zk.logRequest(hdr, txn);
|
|
|
- break;
|
|
|
- case Leader.COMMIT:
|
|
|
- zk.commit(qp.getZxid());
|
|
|
- break;
|
|
|
- case Leader.UPTODATE:
|
|
|
- zk.takeSnapshot();
|
|
|
- self.cnxnFactory.setZooKeeperServer(zk);
|
|
|
- break;
|
|
|
- case Leader.REVALIDATE:
|
|
|
- ByteArrayInputStream bis = new ByteArrayInputStream(qp
|
|
|
- .getData());
|
|
|
- DataInputStream dis = new DataInputStream(bis);
|
|
|
- long sessionId = dis.readLong();
|
|
|
- boolean valid = dis.readBoolean();
|
|
|
- synchronized (pendingRevalidations) {
|
|
|
- ServerCnxn cnxn = pendingRevalidations
|
|
|
- .remove(sessionId);
|
|
|
- if (cnxn == null) {
|
|
|
- LOG.warn("Missing session 0x"
|
|
|
- + Long.toHexString(sessionId)
|
|
|
- + " for validation");
|
|
|
- } else {
|
|
|
- cnxn.finishSessionInit(valid);
|
|
|
+ else {
|
|
|
+ LOG.error("Got unexpected packet from leader "
|
|
|
+ + qp.getType() + " exiting ... " );
|
|
|
+ System.exit(13);
|
|
|
+ }
|
|
|
+ zk.dataTree.lastProcessedZxid = newLeaderZxid;
|
|
|
+ }
|
|
|
+ ack.setZxid(newLeaderZxid & ~0xffffffffL);
|
|
|
+ writePacket(ack);
|
|
|
+ sock.setSoTimeout(self.tickTime * self.syncLimit);
|
|
|
+ zk.startup();
|
|
|
+ while (self.running) {
|
|
|
+ readPacket(qp);
|
|
|
+ switch (qp.getType()) {
|
|
|
+ case Leader.PING:
|
|
|
+ // Send back the ping with our session data
|
|
|
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
|
|
+ DataOutputStream dos = new DataOutputStream(bos);
|
|
|
+ HashMap<Long, Integer> touchTable = zk
|
|
|
+ .getTouchSnapshot();
|
|
|
+ for (Entry<Long, Integer> entry : touchTable.entrySet()) {
|
|
|
+ dos.writeLong(entry.getKey());
|
|
|
+ dos.writeInt(entry.getValue());
|
|
|
+ }
|
|
|
+ qp.setData(bos.toByteArray());
|
|
|
+ writePacket(qp);
|
|
|
+ break;
|
|
|
+ case Leader.PROPOSAL:
|
|
|
+ TxnHeader hdr = new TxnHeader();
|
|
|
+ BinaryInputArchive ia = BinaryInputArchive
|
|
|
+ .getArchive(new ByteArrayInputStream(qp.getData()));
|
|
|
+ Record txn = SerializeUtils.deserializeTxn(ia, hdr);
|
|
|
+ if (hdr.getZxid() != lastQueued + 1) {
|
|
|
+ LOG.warn("Got zxid 0x"
|
|
|
+ + Long.toHexString(hdr.getZxid())
|
|
|
+ + " expected 0x"
|
|
|
+ + Long.toHexString(lastQueued + 1));
|
|
|
+ }
|
|
|
+ lastQueued = hdr.getZxid();
|
|
|
+ zk.logRequest(hdr, txn);
|
|
|
+ break;
|
|
|
+ case Leader.COMMIT:
|
|
|
+ zk.commit(qp.getZxid());
|
|
|
+ break;
|
|
|
+ case Leader.UPTODATE:
|
|
|
+ zk.takeSnapshot();
|
|
|
+ self.cnxnFactory.setZooKeeperServer(zk);
|
|
|
+ break;
|
|
|
+ case Leader.REVALIDATE:
|
|
|
+ ByteArrayInputStream bis = new ByteArrayInputStream(qp
|
|
|
+ .getData());
|
|
|
+ DataInputStream dis = new DataInputStream(bis);
|
|
|
+ long sessionId = dis.readLong();
|
|
|
+ boolean valid = dis.readBoolean();
|
|
|
+ synchronized (pendingRevalidations) {
|
|
|
+ ServerCnxn cnxn = pendingRevalidations
|
|
|
+ .remove(sessionId);
|
|
|
+ if (cnxn == null) {
|
|
|
+ LOG.warn("Missing session 0x"
|
|
|
+ + Long.toHexString(sessionId)
|
|
|
+ + " for validation");
|
|
|
+ } else {
|
|
|
+ cnxn.finishSessionInit(valid);
|
|
|
+ }
|
|
|
}
|
|
|
+ ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
|
|
|
+ "Session 0x"
|
|
|
+ + Long.toHexString(sessionId)
|
|
|
+ + " is valid: " + valid);
|
|
|
+ break;
|
|
|
+ case Leader.SYNC:
|
|
|
+ zk.sync();
|
|
|
+ break;
|
|
|
}
|
|
|
- ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
|
|
|
- "Session 0x"
|
|
|
- + Long.toHexString(sessionId)
|
|
|
- + " is valid: " + valid);
|
|
|
- break;
|
|
|
- case Leader.SYNC:
|
|
|
- zk.sync();
|
|
|
- break;
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.warn("Exception when following the leader", e);
|
|
|
+ try {
|
|
|
+ sock.close();
|
|
|
+ } catch (IOException e1) {
|
|
|
+ e1.printStackTrace();
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized (pendingRevalidations) {
|
|
|
+ // clear pending revalitions
|
|
|
+ pendingRevalidations.clear();
|
|
|
+ pendingRevalidations.notifyAll();
|
|
|
}
|
|
|
}
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.warn("Exception when following the leader", e);
|
|
|
- try {
|
|
|
- sock.close();
|
|
|
- } catch (IOException e1) {
|
|
|
- e1.printStackTrace();
|
|
|
- }
|
|
|
-
|
|
|
- synchronized (pendingRevalidations) {
|
|
|
- // clear pending revalitions
|
|
|
- pendingRevalidations.clear();
|
|
|
- pendingRevalidations.notifyAll();
|
|
|
- }
|
|
|
+ } finally {
|
|
|
+ zk.unregisterJMX(this);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private long lastQueued;
|
|
|
|
|
|
- ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations = new ConcurrentHashMap<Long, ServerCnxn>();
|
|
|
+ final ConcurrentHashMap<Long, ServerCnxn> pendingRevalidations =
|
|
|
+ new ConcurrentHashMap<Long, ServerCnxn>();
|
|
|
+
|
|
|
+ public int getPendingRevalidationsCount() {
|
|
|
+ return pendingRevalidations.size();
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* validate a seesion for a client
|
|
@@ -362,6 +383,10 @@ public class Follower {
|
|
|
}
|
|
|
return -1;
|
|
|
}
|
|
|
+
|
|
|
+ public long getLastQueued() {
|
|
|
+ return lastQueued;
|
|
|
+ }
|
|
|
|
|
|
public void shutdown() {
|
|
|
// set the zookeeper server to null
|