|
@@ -23,6 +23,7 @@ import java.io.BufferedOutputStream;
|
|
|
import java.io.DataInputStream;
|
|
|
import java.io.DataOutputStream;
|
|
|
import java.io.IOException;
|
|
|
+import java.net.BindException;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.net.ServerSocket;
|
|
|
import java.net.Socket;
|
|
@@ -58,11 +59,11 @@ import org.slf4j.LoggerFactory;
|
|
|
* maintains one connection for every pair of servers. The tricky part is to
|
|
|
* guarantee that there is exactly one connection for every pair of servers that
|
|
|
* are operating correctly and that can communicate over the network.
|
|
|
- *
|
|
|
+ *
|
|
|
* If two servers try to start a connection concurrently, then the connection
|
|
|
* manager uses a very simple tie-breaking mechanism to decide which connection
|
|
|
- * to drop based on the IP addressed of the two parties.
|
|
|
- *
|
|
|
+ * to drop based on the IP addresses of the two parties.
|
|
|
+ *
|
|
|
* For every peer, the manager maintains a queue of messages to send. If the
|
|
|
* connection to any particular peer drops, then the sender thread puts the
|
|
|
* message back on the list. As this implementation currently uses a queue
|
|
@@ -70,7 +71,7 @@ import org.slf4j.LoggerFactory;
|
|
|
* message to the tail of the queue, thus changing the order of messages.
|
|
|
* Although this is not a problem for the leader election, it could be a problem
|
|
|
* when consolidating peer communication. This is to be verified, though.
|
|
|
- *
|
|
|
+ *
|
|
|
*/
|
|
|
|
|
|
public class QuorumCnxManager {
|
|
@@ -85,7 +86,7 @@ public class QuorumCnxManager {
|
|
|
static final int SEND_CAPACITY = 1;
|
|
|
|
|
|
static final int PACKETMAXSIZE = 1024 * 512;
|
|
|
-
|
|
|
+
|
|
|
/*
|
|
|
* Negative counter for observer server ids.
|
|
|
*/
|
|
@@ -103,9 +104,9 @@ public class QuorumCnxManager {
|
|
|
static public final int maxBuffer = 2048;
|
|
|
|
|
|
/*
|
|
|
- * Connection time out value in milliseconds
|
|
|
+ * Connection time out value in milliseconds
|
|
|
*/
|
|
|
-
|
|
|
+
|
|
|
private int cnxTO = 5000;
|
|
|
|
|
|
final QuorumPeer self;
|
|
@@ -255,12 +256,12 @@ public class QuorumCnxManager {
|
|
|
this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
|
|
|
this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
|
|
|
this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
|
|
|
-
|
|
|
+
|
|
|
String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
|
|
|
if(cnxToValue != null){
|
|
|
this.cnxTO = Integer.parseInt(cnxToValue);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
this.self = self;
|
|
|
|
|
|
this.mySid = mySid;
|
|
@@ -313,7 +314,7 @@ public class QuorumCnxManager {
|
|
|
|
|
|
/**
|
|
|
* Invokes initiateConnection for testing purposes
|
|
|
- *
|
|
|
+ *
|
|
|
* @param sid
|
|
|
*/
|
|
|
public void testInitiateConnection(long sid) throws Exception {
|
|
@@ -436,24 +437,24 @@ public class QuorumCnxManager {
|
|
|
sw.setRecv(rw);
|
|
|
|
|
|
SendWorker vsw = senderWorkerMap.get(sid);
|
|
|
-
|
|
|
+
|
|
|
if(vsw != null)
|
|
|
vsw.finish();
|
|
|
-
|
|
|
+
|
|
|
senderWorkerMap.put(sid, sw);
|
|
|
queueSendMap.putIfAbsent(sid, new ArrayBlockingQueue<ByteBuffer>(
|
|
|
SEND_CAPACITY));
|
|
|
-
|
|
|
+
|
|
|
sw.start();
|
|
|
rw.start();
|
|
|
-
|
|
|
- return true;
|
|
|
-
|
|
|
+
|
|
|
+ return true;
|
|
|
+
|
|
|
}
|
|
|
return false;
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
+
|
|
|
+
|
|
|
/**
|
|
|
* If this server receives a connection request, then it gives up on the new
|
|
|
* connection if it wins. Notice that it checks whether it has a connection
|
|
@@ -575,7 +576,7 @@ public class QuorumCnxManager {
|
|
|
sw.setRecv(rw);
|
|
|
|
|
|
SendWorker vsw = senderWorkerMap.get(sid);
|
|
|
-
|
|
|
+
|
|
|
if (vsw != null) {
|
|
|
vsw.finish();
|
|
|
}
|
|
@@ -584,14 +585,14 @@ public class QuorumCnxManager {
|
|
|
|
|
|
queueSendMap.putIfAbsent(sid,
|
|
|
new ArrayBlockingQueue<ByteBuffer>(SEND_CAPACITY));
|
|
|
-
|
|
|
+
|
|
|
sw.start();
|
|
|
rw.start();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Processes invoke this message to queue a message to send. Currently,
|
|
|
+ * Processes invoke this method to queue a message to send. Currently,
|
|
|
* only leader election uses it.
|
|
|
*/
|
|
|
public void toSend(Long sid, ByteBuffer b) {
|
|
@@ -617,13 +618,13 @@ public class QuorumCnxManager {
|
|
|
addToSendQueue(bq, b);
|
|
|
}
|
|
|
connectOne(sid);
|
|
|
-
|
|
|
+
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Try to establish a connection to server with id sid using its electionAddr.
|
|
|
- *
|
|
|
+ *
|
|
|
* @param sid server id
|
|
|
* @return boolean success indication
|
|
|
*/
|
|
@@ -666,12 +667,12 @@ public class QuorumCnxManager {
|
|
|
closeSocket(sock);
|
|
|
return false;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Try to establish a connection to server with id sid.
|
|
|
- *
|
|
|
+ *
|
|
|
* @param sid server id
|
|
|
*/
|
|
|
synchronized void connectOne(long sid){
|
|
@@ -705,22 +706,22 @@ public class QuorumCnxManager {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
+
|
|
|
+
|
|
|
/**
|
|
|
* Try to establish a connection with each server if one
|
|
|
* doesn't exist.
|
|
|
*/
|
|
|
-
|
|
|
+
|
|
|
public void connectAll(){
|
|
|
long sid;
|
|
|
for(Enumeration<Long> en = queueSendMap.keys();
|
|
|
en.hasMoreElements();){
|
|
|
sid = en.nextElement();
|
|
|
connectOne(sid);
|
|
|
- }
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
|
|
|
/**
|
|
|
* Check if all queues are empty, indicating that all messages have been delivered.
|
|
@@ -743,7 +744,7 @@ public class QuorumCnxManager {
|
|
|
shutdown = true;
|
|
|
LOG.debug("Halting listener");
|
|
|
listener.halt();
|
|
|
-
|
|
|
+
|
|
|
// Wait for the listener to terminate.
|
|
|
try {
|
|
|
listener.join();
|
|
@@ -759,7 +760,7 @@ public class QuorumCnxManager {
|
|
|
inprogressConnections.clear();
|
|
|
resetConnectionThreadCount();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* A soft halt simply finishes workers.
|
|
|
*/
|
|
@@ -772,7 +773,7 @@ public class QuorumCnxManager {
|
|
|
|
|
|
/**
|
|
|
* Helper method to set socket options.
|
|
|
- *
|
|
|
+ *
|
|
|
* @param sock
|
|
|
* Reference to socket
|
|
|
*/
|
|
@@ -784,7 +785,7 @@ public class QuorumCnxManager {
|
|
|
|
|
|
/**
|
|
|
* Helper method to close a socket.
|
|
|
- *
|
|
|
+ *
|
|
|
* @param sock
|
|
|
* Reference to socket
|
|
|
*/
|
|
@@ -842,6 +843,7 @@ public class QuorumCnxManager {
|
|
|
int numRetries = 0;
|
|
|
InetSocketAddress addr;
|
|
|
Socket client = null;
|
|
|
+ IOException exitException = null;
|
|
|
while((!shutdown) && (numRetries < 3)){
|
|
|
try {
|
|
|
ss = new ServerSocket();
|
|
@@ -886,6 +888,7 @@ public class QuorumCnxManager {
|
|
|
break;
|
|
|
}
|
|
|
LOG.error("Exception while listening", e);
|
|
|
+ exitException = e;
|
|
|
numRetries++;
|
|
|
try {
|
|
|
ss.close();
|
|
@@ -905,6 +908,12 @@ public class QuorumCnxManager {
|
|
|
+ "I won't be able to participate in leader "
|
|
|
+ "election any longer: "
|
|
|
+ self.getElectionAddress());
|
|
|
+ if (exitException instanceof BindException) {
|
|
|
+ // After leaving the listener thread, the host cannot join the
|
|
|
+ // quorum anymore; this is a severe error that we cannot
|
|
|
+ // recover from, so we need to exit
|
|
|
+ System.exit(14);
|
|
|
+ }
|
|
|
} else if (ss != null) {
|
|
|
// Clean up for shutdown.
|
|
|
try {
|
|
@@ -948,7 +957,7 @@ public class QuorumCnxManager {
|
|
|
/**
|
|
|
* An instance of this thread receives messages to send
|
|
|
* through a queue and sends them to the server sid.
|
|
|
- *
|
|
|
+ *
|
|
|
* @param sock
|
|
|
* Socket to remote peer
|
|
|
* @param sid
|
|
@@ -975,23 +984,23 @@ public class QuorumCnxManager {
|
|
|
|
|
|
/**
|
|
|
* Returns RecvWorker that pairs up with this SendWorker.
|
|
|
- *
|
|
|
- * @return RecvWorker
|
|
|
+ *
|
|
|
+ * @return RecvWorker
|
|
|
*/
|
|
|
synchronized RecvWorker getRecvWorker(){
|
|
|
return recvWorker;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
synchronized boolean finish() {
|
|
|
LOG.debug("Calling finish for " + sid);
|
|
|
-
|
|
|
+
|
|
|
if(!running){
|
|
|
/*
|
|
|
- * Avoids running finish() twice.
|
|
|
+ * Avoids running finish() twice.
|
|
|
*/
|
|
|
return running;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
running = false;
|
|
|
closeSocket(sock);
|
|
|
|
|
@@ -1006,7 +1015,7 @@ public class QuorumCnxManager {
|
|
|
threadCnt.decrementAndGet();
|
|
|
return running;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
synchronized void send(ByteBuffer b) throws IOException {
|
|
|
byte[] msgBytes = new byte[b.capacity()];
|
|
|
try {
|
|
@@ -1050,7 +1059,7 @@ public class QuorumCnxManager {
|
|
|
LOG.error("Failed to send last message. Shutting down thread.", e);
|
|
|
this.finish();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
try {
|
|
|
while (running && !shutdown && sock != null) {
|
|
|
|
|
@@ -1111,20 +1120,20 @@ public class QuorumCnxManager {
|
|
|
running = false;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Shuts down this worker
|
|
|
- *
|
|
|
+ *
|
|
|
* @return boolean Value of variable running
|
|
|
*/
|
|
|
synchronized boolean finish() {
|
|
|
if(!running){
|
|
|
/*
|
|
|
- * Avoids running finish() twice.
|
|
|
+ * Avoids running finish() twice.
|
|
|
*/
|
|
|
return running;
|
|
|
}
|
|
|
- running = false;
|
|
|
+ running = false;
|
|
|
|
|
|
this.interrupt();
|
|
|
threadCnt.decrementAndGet();
|