|
@@ -26,6 +26,7 @@ import java.nio.channels.SocketChannel;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.concurrent.Semaphore;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
import junit.framework.TestCase;
|
|
|
|
|
@@ -33,6 +34,7 @@ import org.apache.log4j.Logger;
|
|
|
import org.apache.zookeeper.PortAssignment;
|
|
|
import org.apache.zookeeper.server.quorum.FastLeaderElection;
|
|
|
import org.apache.zookeeper.server.quorum.QuorumCnxManager;
|
|
|
+import org.apache.zookeeper.server.quorum.QuorumCnxManager.Message;
|
|
|
import org.apache.zookeeper.server.quorum.QuorumPeer;
|
|
|
import org.apache.zookeeper.server.quorum.Vote;
|
|
|
import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
|
|
@@ -53,6 +55,7 @@ import org.junit.Test;
|
|
|
*/
|
|
|
public class CnxManagerTest extends TestCase {
|
|
|
protected static final Logger LOG = Logger.getLogger(FLENewEpochTest.class);
|
|
|
+ protected static final int THRESHOLD = 4;
|
|
|
|
|
|
int count;
|
|
|
HashMap<Long,QuorumServer> peers;
|
|
@@ -101,7 +104,10 @@ public class CnxManagerTest extends TestCase {
|
|
|
|
|
|
class CnxManagerThread extends Thread {
|
|
|
|
|
|
- CnxManagerThread(){}
|
|
|
+ boolean failed;
|
|
|
+ CnxManagerThread(){
|
|
|
+ failed = false;
|
|
|
+ }
|
|
|
|
|
|
public void run(){
|
|
|
try {
|
|
@@ -116,10 +122,26 @@ public class CnxManagerTest extends TestCase {
|
|
|
|
|
|
long sid = 1;
|
|
|
cnxManager.toSend(sid, createMsg(ServerState.LOOKING.ordinal(), 0, -1, 1));
|
|
|
- cnxManager.recvQueue.take();
|
|
|
+
|
|
|
+ Message m = null;
|
|
|
+ int numRetries = 1;
|
|
|
+ while((m == null) && (numRetries++ <= THRESHOLD)){
|
|
|
+ m = cnxManager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
|
|
|
+ if(m == null) cnxManager.connectAll();
|
|
|
+ }
|
|
|
+
|
|
|
+ if(numRetries > THRESHOLD){
|
|
|
+ failed = true;
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
cnxManager.testInitiateConnection(sid);
|
|
|
|
|
|
- cnxManager.recvQueue.take();
|
|
|
+ m = cnxManager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
|
|
|
+ if(m == null){
|
|
|
+ failed = true;
|
|
|
+ return;
|
|
|
+ }
|
|
|
} catch (Exception e) {
|
|
|
LOG.error("Exception while running mock thread", e);
|
|
|
fail("Unexpected exception");
|
|
@@ -129,7 +151,7 @@ public class CnxManagerTest extends TestCase {
|
|
|
|
|
|
@Test
|
|
|
public void testCnxManager() throws Exception {
|
|
|
- Thread thread = new CnxManagerThread();
|
|
|
+ CnxManagerThread thread = new CnxManagerThread();
|
|
|
|
|
|
thread.start();
|
|
|
|
|
@@ -143,11 +165,22 @@ public class CnxManagerTest extends TestCase {
|
|
|
}
|
|
|
|
|
|
cnxManager.toSend(new Long(0), createMsg(ServerState.LOOKING.ordinal(), 1, -1, 1));
|
|
|
- cnxManager.recvQueue.take();
|
|
|
|
|
|
+ Message m = null;
|
|
|
+ int numRetries = 1;
|
|
|
+ while((m == null) && (numRetries++ <= THRESHOLD)){
|
|
|
+ m = cnxManager.recvQueue.poll(3000, TimeUnit.MILLISECONDS);
|
|
|
+ if(m == null) cnxManager.connectAll();
|
|
|
+ }
|
|
|
+
|
|
|
+ assertTrue("Exceeded number of retries", numRetries <= THRESHOLD);
|
|
|
+
|
|
|
thread.join(5000);
|
|
|
if (thread.isAlive()) {
|
|
|
- fail("Threads didn't join");
|
|
|
+ fail("Thread didn't join");
|
|
|
+ } else {
|
|
|
+ if(thread.failed)
|
|
|
+ fail("Did not receive expected message");
|
|
|
}
|
|
|
}
|
|
|
|