|
@@ -23,23 +23,212 @@ import java.io.IOException;
|
|
|
import java.net.DatagramPacket;
|
|
|
import java.net.DatagramSocket;
|
|
|
import java.net.InetSocketAddress;
|
|
|
+import java.net.SocketException;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.util.HashMap;
|
|
|
+import java.util.HashSet;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import junit.framework.TestCase;
|
|
|
|
|
|
import org.apache.log4j.Logger;
|
|
|
import org.apache.zookeeper.PortAssignment;
|
|
|
+import org.apache.zookeeper.jmx.MBeanRegistry;
|
|
|
+import org.apache.zookeeper.server.NIOServerCnxn;
|
|
|
+import org.apache.zookeeper.server.quorum.Election;
|
|
|
+import org.apache.zookeeper.server.quorum.LeaderElection;
|
|
|
+import org.apache.zookeeper.server.quorum.LeaderElectionBean;
|
|
|
import org.apache.zookeeper.server.quorum.QuorumPeer;
|
|
|
import org.apache.zookeeper.server.quorum.Vote;
|
|
|
+import org.apache.zookeeper.server.quorum.LeaderElection.ElectionResult;
|
|
|
+import org.apache.zookeeper.server.quorum.QuorumPeer.LearnerType;
|
|
|
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
|
|
|
import org.apache.zookeeper.server.quorum.QuorumPeer.ServerState;
|
|
|
+import org.apache.zookeeper.server.quorum.flexible.QuorumMaj;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
/**
|
|
|
* Tests that a particular run of LeaderElection terminates correctly.
|
|
|
*/
|
|
|
public class LENonTerminateTest extends TestCase {
|
|
|
+ public class MockLeaderElection extends LeaderElection {
|
|
|
+ public MockLeaderElection(QuorumPeer self) {
|
|
|
+ super(self);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Temporary for 3.3.0 - we want to ensure that a round of voting happens
|
|
|
+ * before any of the peers update their votes. The easiest way to do that
|
|
|
+ * is to add a latch that all wait on after counting their votes.
|
|
|
+ *
|
|
|
+ * In 3.4.0 we intend to make this class more testable, and therefore
|
|
|
+ * there should be much less duplicated code.
|
|
|
+ *
|
|
|
+ * JMX bean method calls are removed to reduce noise.
|
|
|
+ */
|
|
|
+ public Vote lookForLeader() throws InterruptedException {
|
|
|
+ self.setCurrentVote(new Vote(self.getId(),
|
|
|
+ self.getLastLoggedZxid()));
|
|
|
+ // We are going to look for a leader by casting a vote for ourself
|
|
|
+ byte requestBytes[] = new byte[4];
|
|
|
+ ByteBuffer requestBuffer = ByteBuffer.wrap(requestBytes);
|
|
|
+ byte responseBytes[] = new byte[28];
|
|
|
+ ByteBuffer responseBuffer = ByteBuffer.wrap(responseBytes);
|
|
|
+ /* The current vote for the leader. Initially me! */
|
|
|
+ DatagramSocket s = null;
|
|
|
+ try {
|
|
|
+ s = new DatagramSocket();
|
|
|
+ s.setSoTimeout(200);
|
|
|
+ } catch (SocketException e1) {
|
|
|
+ LOG.error("Socket exception when creating socket for leader election", e1);
|
|
|
+ System.exit(4);
|
|
|
+ }
|
|
|
+ DatagramPacket requestPacket = new DatagramPacket(requestBytes,
|
|
|
+ requestBytes.length);
|
|
|
+ DatagramPacket responsePacket = new DatagramPacket(responseBytes,
|
|
|
+ responseBytes.length);
|
|
|
+ HashMap<InetSocketAddress, Vote> votes =
|
|
|
+ new HashMap<InetSocketAddress, Vote>(self.getVotingView().size());
|
|
|
+ int xid = epochGen.nextInt();
|
|
|
+ while (self.isRunning()) {
|
|
|
+ votes.clear();
|
|
|
+ requestBuffer.clear();
|
|
|
+ requestBuffer.putInt(xid);
|
|
|
+ requestPacket.setLength(4);
|
|
|
+ HashSet<Long> heardFrom = new HashSet<Long>();
|
|
|
+ for (QuorumServer server : self.getVotingView().values()) {
|
|
|
+ LOG.info("Server address: " + server.addr);
|
|
|
+ try {
|
|
|
+ requestPacket.setSocketAddress(server.addr);
|
|
|
+ } catch (IllegalArgumentException e) {
|
|
|
+ // Sun doesn't include the address that causes this
|
|
|
+ // exception to be thrown, so we wrap the exception
|
|
|
+ // in order to capture this critical detail.
|
|
|
+ throw new IllegalArgumentException(
|
|
|
+ "Unable to set socket address on packet, msg:"
|
|
|
+ + e.getMessage() + " with addr:" + server.addr,
|
|
|
+ e);
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ s.send(requestPacket);
|
|
|
+ responsePacket.setLength(responseBytes.length);
|
|
|
+ s.receive(responsePacket);
|
|
|
+ if (responsePacket.getLength() != responseBytes.length) {
|
|
|
+ LOG.error("Got a short response: "
|
|
|
+ + responsePacket.getLength());
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ responseBuffer.clear();
|
|
|
+ int recvedXid = responseBuffer.getInt();
|
|
|
+ if (recvedXid != xid) {
|
|
|
+ LOG.error("Got bad xid: expected " + xid
|
|
|
+ + " got " + recvedXid);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ long peerId = responseBuffer.getLong();
|
|
|
+ heardFrom.add(peerId);
|
|
|
+ //if(server.id != peerId){
|
|
|
+ Vote vote = new Vote(responseBuffer.getLong(),
|
|
|
+ responseBuffer.getLong());
|
|
|
+ InetSocketAddress addr =
|
|
|
+ (InetSocketAddress) responsePacket
|
|
|
+ .getSocketAddress();
|
|
|
+ votes.put(addr, vote);
|
|
|
+ //}
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.warn("Ignoring exception while looking for leader",
|
|
|
+ e);
|
|
|
+ // Errors are okay, since hosts may be
|
|
|
+ // down
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ ElectionResult result = countVotes(votes, heardFrom);
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This is the only difference from LeaderElection - wait for
|
|
|
+ * this latch on the first time through this method. This ensures
|
|
|
+ * that the first round of voting happens before setCurrentVote
|
|
|
+ * is called below.
|
|
|
+ */
|
|
|
+ LOG.info("Waiting for first round of voting to complete");
|
|
|
+ latch.countDown();
|
|
|
+ assertTrue("Thread timed out waiting for latch",
|
|
|
+ latch.await(10000, TimeUnit.MILLISECONDS));
|
|
|
+
|
|
|
+ // ZOOKEEPER-569:
|
|
|
+ // If no votes are received for live peers, reset to voting
|
|
|
+ // for ourselves as otherwise we may hang on to a vote
|
|
|
+ // for a dead peer
|
|
|
+ if (votes.size() == 0) {
|
|
|
+ self.setCurrentVote(new Vote(self.getId(),
|
|
|
+ self.getLastLoggedZxid()));
|
|
|
+ } else {
|
|
|
+ if (result.winner.id >= 0) {
|
|
|
+ self.setCurrentVote(result.vote);
|
|
|
+ // To do: this doesn't use a quorum verifier
|
|
|
+ if (result.winningCount > (self.getVotingView().size() / 2)) {
|
|
|
+ self.setCurrentVote(result.winner);
|
|
|
+ s.close();
|
|
|
+ Vote current = self.getCurrentVote();
|
|
|
+ LOG.info("Found leader: my type is: " + self.getPeerType());
|
|
|
+ /*
|
|
|
+ * We want to make sure we implement the state machine
|
|
|
+ * correctly. If we are a PARTICIPANT, once a leader
|
|
|
+ * is elected we can move either to LEADING or
|
|
|
+ * FOLLOWING. However if we are an OBSERVER, it is an
|
|
|
+ * error to be elected as a Leader.
|
|
|
+ */
|
|
|
+ if (self.getPeerType() == LearnerType.OBSERVER) {
|
|
|
+ if (current.id == self.getId()) {
|
|
|
+ // This should never happen!
|
|
|
+ LOG.error("OBSERVER elected as leader!");
|
|
|
+ Thread.sleep(100);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ self.setPeerState(ServerState.OBSERVING);
|
|
|
+ Thread.sleep(100);
|
|
|
+ return current;
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ self.setPeerState((current.id == self.getId())
|
|
|
+ ? ServerState.LEADING: ServerState.FOLLOWING);
|
|
|
+ if (self.getPeerState() == ServerState.FOLLOWING) {
|
|
|
+ Thread.sleep(100);
|
|
|
+ }
|
|
|
+ return current;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ Thread.sleep(1000);
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public class MockQuorumPeer extends QuorumPeer {
|
|
|
+ public MockQuorumPeer(Map<Long,QuorumServer> quorumPeers, File snapDir,
|
|
|
+ File logDir, int clientPort, int electionAlg,
|
|
|
+ long myid, int tickTime, int initLimit, int syncLimit)
|
|
|
+ throws IOException
|
|
|
+ {
|
|
|
+ super(quorumPeers, snapDir, logDir, electionAlg,
|
|
|
+ myid,tickTime, initLimit,syncLimit,
|
|
|
+ new NIOServerCnxn.Factory(new InetSocketAddress(clientPort)),
|
|
|
+ new QuorumMaj(countParticipants(quorumPeers)));
|
|
|
+ }
|
|
|
+
|
|
|
+ protected Election createElectionAlgorithm(int electionAlgorithm){
|
|
|
+ LOG.info("Returning mocked leader election");
|
|
|
+ return new MockLeaderElection(this);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
// Keyed on this test class so log output is attributed to LENonTerminateTest.
protected static final Logger LOG = Logger.getLogger(LENonTerminateTest.class);
|
|
|
|
|
|
int count;
|
|
@@ -62,11 +251,14 @@ public class LENonTerminateTest extends TestCase {
|
|
|
public void tearDown() throws Exception {
|
|
|
LOG.info("FINISHED " + getName());
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
// Counted down and then awaited (10s timeout) inside
// MockLeaderElection.lookForLeader(); the count of 2 matches the two
// MockQuorumPeer instances (peer1, peer2) the test creates.
+ static final CountDownLatch latch = new CountDownLatch(2);
|
|
|
// Counted down by the mock server thread once its UDP socket is created;
// the test awaits it (5s timeout) before starting the election threads.
+ static final CountDownLatch mockLatch = new CountDownLatch(1);
|
|
|
|
|
|
class LEThread extends Thread {
|
|
|
int i;
|
|
|
QuorumPeer peer;
|
|
|
+
|
|
|
|
|
|
LEThread(QuorumPeer peer, int i) {
|
|
|
this.i = i;
|
|
@@ -84,7 +276,7 @@ public class LENonTerminateTest extends TestCase {
|
|
|
|
|
|
if (v == null){
|
|
|
fail("Thread " + i + " got a null vote");
|
|
|
- }
|
|
|
+ }
|
|
|
|
|
|
/*
|
|
|
* A real zookeeper would take care of setting the current vote. Here
|
|
@@ -125,15 +317,13 @@ public class LENonTerminateTest extends TestCase {
|
|
|
/*
|
|
|
* peer1 and peer2 are A and B in the above example.
|
|
|
*/
|
|
|
- QuorumPeer peer1 = new QuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 0, 0, 2, 2, 2);
|
|
|
+ QuorumPeer peer1 = new MockQuorumPeer(peers, tmpdir[0], tmpdir[0], port[0], 0, 0, 2, 2, 2);
|
|
|
peer1.startLeaderElection();
|
|
|
LEThread thread1 = new LEThread(peer1, 0);
|
|
|
- thread1.start();
|
|
|
|
|
|
- QuorumPeer peer2 = new QuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 0, 1, 2, 2, 2);
|
|
|
+ QuorumPeer peer2 = new MockQuorumPeer(peers, tmpdir[1], tmpdir[1], port[1], 0, 1, 2, 2, 2);
|
|
|
peer2.startLeaderElection();
|
|
|
LEThread thread2 = new LEThread(peer2, 1);
|
|
|
- thread2.start();
|
|
|
|
|
|
/*
|
|
|
* Start mock server.
|
|
@@ -150,6 +340,10 @@ public class LENonTerminateTest extends TestCase {
|
|
|
};
|
|
|
|
|
|
thread3.start();
|
|
|
+ assertTrue("mockServer did not start in 5s",
|
|
|
+ mockLatch.await(5000, TimeUnit.MILLISECONDS));
|
|
|
+ thread1.start();
|
|
|
+ thread2.start();
|
|
|
/*
|
|
|
* Occasionally seen false negatives with a 5s timeout.
|
|
|
*/
|
|
@@ -171,9 +365,14 @@ public class LENonTerminateTest extends TestCase {
|
|
|
DatagramPacket packet = new DatagramPacket(b, b.length);
|
|
|
QuorumServer server = peers.get(Long.valueOf(2));
|
|
|
DatagramSocket udpSocket = new DatagramSocket(server.addr.getPort());
|
|
|
+ LOG.info("In MockServer");
|
|
|
+ mockLatch.countDown();
|
|
|
Vote current = new Vote(2, 1);
|
|
|
for (int i=0;i<2;++i) {
|
|
|
udpSocket.receive(packet);
|
|
|
+ responseBuffer.rewind();
|
|
|
+ LOG.info("Received " + responseBuffer.getInt() + " " + responseBuffer.getLong() + " " + responseBuffer.getLong());
|
|
|
+ LOG.info("From " + packet.getSocketAddress());
|
|
|
responseBuffer.clear();
|
|
|
responseBuffer.getInt(); // Skip the xid
|
|
|
responseBuffer.putLong(2);
|