|
|
@@ -18,6 +18,8 @@
|
|
|
|
|
|
package org.apache.zookeeper.server.quorum;
|
|
|
|
|
|
+import static org.apache.zookeeper.common.NetUtils.formatInetAddr;
|
|
|
+
|
|
|
import java.io.BufferedInputStream;
|
|
|
import java.io.BufferedOutputStream;
|
|
|
import java.io.DataInputStream;
|
|
|
@@ -27,6 +29,7 @@ import java.net.InetSocketAddress;
|
|
|
import java.net.ServerSocket;
|
|
|
import java.net.Socket;
|
|
|
import java.net.SocketException;
|
|
|
+import java.net.SocketTimeoutException;
|
|
|
import java.nio.BufferUnderflowException;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.nio.channels.UnresolvedAddressException;
|
|
|
@@ -34,7 +37,6 @@ import java.util.Collections;
|
|
|
import java.util.Enumeration;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.Map;
|
|
|
-import java.util.NoSuchElementException;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.ArrayBlockingQueue;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
@@ -42,12 +44,12 @@ import java.util.concurrent.SynchronousQueue;
|
|
|
import java.util.concurrent.ThreadFactory;
|
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.NoSuchElementException;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
-
|
|
|
import javax.net.ssl.SSLSocket;
|
|
|
-
|
|
|
import org.apache.zookeeper.common.X509Exception;
|
|
|
+import org.apache.zookeeper.server.ExitCode;
|
|
|
import org.apache.zookeeper.server.ZooKeeperThread;
|
|
|
import org.apache.zookeeper.server.quorum.auth.QuorumAuthLearner;
|
|
|
import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
|
|
|
@@ -60,11 +62,11 @@ import org.slf4j.LoggerFactory;
|
|
|
* maintains one connection for every pair of servers. The tricky part is to
|
|
|
* guarantee that there is exactly one connection for every pair of servers that
|
|
|
* are operating correctly and that can communicate over the network.
|
|
|
- *
|
|
|
+ *
|
|
|
* If two servers try to start a connection concurrently, then the connection
|
|
|
* manager uses a very simple tie-breaking mechanism to decide which connection
|
|
|
- * to drop based on the IP addressed of the two parties.
|
|
|
- *
|
|
|
+ * to drop based on the IP addresses of the two parties.
|
|
|
+ *
|
|
|
* For every peer, the manager maintains a queue of messages to send. If the
|
|
|
* connection to any particular peer drops, then the sender thread puts the
|
|
|
* message back on the list. As this implementation currently uses a queue
|
|
|
@@ -72,7 +74,7 @@ import org.slf4j.LoggerFactory;
|
|
|
* message to the tail of the queue, thus changing the order of messages.
|
|
|
* Although this is not a problem for the leader election, it could be a problem
|
|
|
* when consolidating peer communication. This is to be verified, though.
|
|
|
- *
|
|
|
+ *
|
|
|
*/
|
|
|
|
|
|
public class QuorumCnxManager {
|
|
|
@@ -87,7 +89,7 @@ public class QuorumCnxManager {
|
|
|
static final int SEND_CAPACITY = 1;
|
|
|
|
|
|
static final int PACKETMAXSIZE = 1024 * 512;
|
|
|
-
|
|
|
+
|
|
|
/*
|
|
|
* Negative counter for observer server ids.
|
|
|
*/
|
|
|
@@ -105,9 +107,9 @@ public class QuorumCnxManager {
|
|
|
static public final int maxBuffer = 2048;
|
|
|
|
|
|
/*
|
|
|
- * Connection time out value in milliseconds
|
|
|
+ * Connection time out value in milliseconds
|
|
|
*/
|
|
|
-
|
|
|
+
|
|
|
private int cnxTO = 5000;
|
|
|
|
|
|
final QuorumPeer self;
|
|
|
@@ -256,7 +258,7 @@ public class QuorumCnxManager {
|
|
|
this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
|
|
|
this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
|
|
|
this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
|
|
|
-
|
|
|
+
|
|
|
String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
|
|
|
if(cnxToValue != null){
|
|
|
this.cnxTO = Integer.parseInt(cnxToValue);
|
|
|
@@ -314,7 +316,7 @@ public class QuorumCnxManager {
|
|
|
|
|
|
/**
|
|
|
* Invokes initiateConnection for testing purposes
|
|
|
- *
|
|
|
+ *
|
|
|
* @param sid
|
|
|
*/
|
|
|
public void testInitiateConnection(long sid) throws Exception {
|
|
|
@@ -590,7 +592,7 @@ public class QuorumCnxManager {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Processes invoke this message to queue a message to send. Currently,
|
|
|
+ * Processes invoke this method to queue a message to send. Currently,
|
|
|
* only leader election uses it.
|
|
|
*/
|
|
|
public void toSend(Long sid, ByteBuffer b) {
|
|
|
@@ -616,13 +618,13 @@ public class QuorumCnxManager {
|
|
|
addToSendQueue(bq, b);
|
|
|
}
|
|
|
connectOne(sid);
|
|
|
-
|
|
|
+
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Try to establish a connection to server with id sid using its electionAddr.
|
|
|
- *
|
|
|
+ *
|
|
|
* @param sid server id
|
|
|
* @return boolean success indication
|
|
|
*/
|
|
|
@@ -681,10 +683,10 @@ public class QuorumCnxManager {
|
|
|
return false;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Try to establish a connection to server with id sid.
|
|
|
- *
|
|
|
+ *
|
|
|
* @param sid server id
|
|
|
*/
|
|
|
synchronized void connectOne(long sid){
|
|
|
@@ -718,22 +720,22 @@ public class QuorumCnxManager {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
+
|
|
|
+
|
|
|
/**
|
|
|
* Try to establish a connection with each server if one
|
|
|
* doesn't exist.
|
|
|
*/
|
|
|
-
|
|
|
+
|
|
|
public void connectAll(){
|
|
|
long sid;
|
|
|
for(Enumeration<Long> en = queueSendMap.keys();
|
|
|
en.hasMoreElements();){
|
|
|
sid = en.nextElement();
|
|
|
connectOne(sid);
|
|
|
- }
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
|
|
|
/**
|
|
|
* Check if all queues are empty, indicating that all messages have been delivered.
|
|
|
@@ -756,7 +758,7 @@ public class QuorumCnxManager {
|
|
|
shutdown = true;
|
|
|
LOG.debug("Halting listener");
|
|
|
listener.halt();
|
|
|
-
|
|
|
+
|
|
|
// Wait for the listener to terminate.
|
|
|
try {
|
|
|
listener.join();
|
|
|
@@ -772,7 +774,7 @@ public class QuorumCnxManager {
|
|
|
inprogressConnections.clear();
|
|
|
resetConnectionThreadCount();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* A soft halt simply finishes workers.
|
|
|
*/
|
|
|
@@ -785,7 +787,7 @@ public class QuorumCnxManager {
|
|
|
|
|
|
/**
|
|
|
* Helper method to set socket options.
|
|
|
- *
|
|
|
+ *
|
|
|
* @param sock
|
|
|
* Reference to socket
|
|
|
*/
|
|
|
@@ -797,7 +799,7 @@ public class QuorumCnxManager {
|
|
|
|
|
|
/**
|
|
|
* Helper method to close a socket.
|
|
|
- *
|
|
|
+ *
|
|
|
* @param sock
|
|
|
* Reference to socket
|
|
|
*/
|
|
|
@@ -839,12 +841,39 @@ public class QuorumCnxManager {
|
|
|
*/
|
|
|
public class Listener extends ZooKeeperThread {
|
|
|
|
|
|
+ private static final String ELECTION_PORT_BIND_RETRY = "zookeeper.electionPortBindRetry";
|
|
|
+ private static final int DEFAULT_PORT_BIND_MAX_RETRY = 3;
|
|
|
+
|
|
|
+ private final int portBindMaxRetry;
|
|
|
+ private Runnable socketBindErrorHandler = () -> System.exit(ExitCode.UNABLE_TO_BIND_QUORUM_PORT.getValue());
|
|
|
volatile ServerSocket ss = null;
|
|
|
|
|
|
public Listener() {
    // The thread starts out with a generic name; it is renamed to the
    // concrete election address once the thread is running.
    super("ListenerThread");

    // Upper bound on the number of attempts to bind the election port,
    // read from a system property (see ZOOKEEPER-3320). Zero means
    // "retry forever"; negative values are rejected in favour of the default.
    final int requestedRetries =
            Integer.getInteger(ELECTION_PORT_BIND_RETRY, DEFAULT_PORT_BIND_MAX_RETRY);
    final boolean accepted = requestedRetries >= 0;

    if (!accepted) {
        LOG.info("'{}' contains invalid value: {}(must be >= 0). "
                + "Use default value of {} instead.",
                ELECTION_PORT_BIND_RETRY, requestedRetries, DEFAULT_PORT_BIND_MAX_RETRY);
    } else {
        final Object limitForLog =
                (requestedRetries == 0) ? "infinite" : Integer.valueOf(requestedRetries);
        LOG.info("Election port bind maximum retries is {}", limitForLog);
    }

    portBindMaxRetry = accepted ? requestedRetries : DEFAULT_PORT_BIND_MAX_RETRY;
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Change socket bind error handler. Used for testing.
|
|
|
+ */
|
|
|
+ public void setSocketBindErrorHandler(Runnable errorHandler) {
|
|
|
+ this.socketBindErrorHandler = errorHandler;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -855,7 +884,8 @@ public class QuorumCnxManager {
|
|
|
int numRetries = 0;
|
|
|
InetSocketAddress addr;
|
|
|
Socket client = null;
|
|
|
- while((!shutdown) && (numRetries < 3)){
|
|
|
+ Exception exitException = null;
|
|
|
+ while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
|
|
|
try {
|
|
|
if (self.shouldUsePortUnification()) {
|
|
|
LOG.info("Creating TLS-enabled quorum server socket");
|
|
|
@@ -882,28 +912,34 @@ public class QuorumCnxManager {
|
|
|
setName(addr.toString());
|
|
|
ss.bind(addr);
|
|
|
while (!shutdown) {
|
|
|
- client = ss.accept();
|
|
|
-
|
|
|
- setSockOpts(client);
|
|
|
- LOG.info("Received connection request "
|
|
|
- + client.getRemoteSocketAddress());
|
|
|
- // Receive and handle the connection request
|
|
|
- // asynchronously if the quorum sasl authentication is
|
|
|
- // enabled. This is required because sasl server
|
|
|
- // authentication process may take few seconds to finish,
|
|
|
- // this may delay next peer connection requests.
|
|
|
- if (quorumSaslAuthEnabled) {
|
|
|
- receiveConnectionAsync(client);
|
|
|
- } else {
|
|
|
- receiveConnection(client);
|
|
|
+ try {
|
|
|
+ client = ss.accept();
|
|
|
+ setSockOpts(client);
|
|
|
+ LOG.info("Received connection request "
|
|
|
+ + formatInetAddr((InetSocketAddress)client.getRemoteSocketAddress()));
|
|
|
+ // Receive and handle the connection request
|
|
|
+ // asynchronously if the quorum sasl authentication is
|
|
|
+ // enabled. This is required because sasl server
|
|
|
+ // authentication process may take few seconds to finish,
|
|
|
+ // this may delay next peer connection requests.
|
|
|
+ if (quorumSaslAuthEnabled) {
|
|
|
+ receiveConnectionAsync(client);
|
|
|
+ } else {
|
|
|
+ receiveConnection(client);
|
|
|
+ }
|
|
|
+ numRetries = 0;
|
|
|
+ } catch (SocketTimeoutException e) {
|
|
|
+ LOG.warn("The socket is listening for the election accepted "
|
|
|
+ + "and it timed out unexpectedly, but will retry."
|
|
|
+ + "see ZOOKEEPER-2836");
|
|
|
}
|
|
|
- numRetries = 0;
|
|
|
}
|
|
|
} catch (IOException e) {
|
|
|
if (shutdown) {
|
|
|
break;
|
|
|
}
|
|
|
LOG.error("Exception while listening", e);
|
|
|
+ exitException = e;
|
|
|
numRetries++;
|
|
|
try {
|
|
|
ss.close();
|
|
|
@@ -919,10 +955,19 @@ public class QuorumCnxManager {
|
|
|
}
|
|
|
LOG.info("Leaving listener");
|
|
|
if (!shutdown) {
|
|
|
- LOG.error("As I'm leaving the listener thread, "
|
|
|
- + "I won't be able to participate in leader "
|
|
|
- + "election any longer: "
|
|
|
- + self.getElectionAddress());
|
|
|
+ LOG.error("As I'm leaving the listener thread after "
|
|
|
+ + numRetries + " errors. "
|
|
|
+ + "I won't be able to participate in leader "
|
|
|
+ + "election any longer: "
|
|
|
+ + formatInetAddr(self.getElectionAddress())
|
|
|
+ + ". Use " + ELECTION_PORT_BIND_RETRY + " property to "
|
|
|
+ + "increase retry count.");
|
|
|
+ if (exitException instanceof SocketException) {
|
|
|
+ // After leaving listener thread, the host cannot join the
|
|
|
+ // quorum anymore, this is a severe error that we cannot
|
|
|
+ // recover from, so we need to exit
|
|
|
+ socketBindErrorHandler.run();
|
|
|
+ }
|
|
|
} else if (ss != null) {
|
|
|
// Clean up for shutdown.
|
|
|
try {
|
|
|
@@ -966,7 +1011,7 @@ public class QuorumCnxManager {
|
|
|
/**
|
|
|
* An instance of this thread receives messages to send
|
|
|
* through a queue and sends them to the server sid.
|
|
|
- *
|
|
|
+ *
|
|
|
* @param sock
|
|
|
* Socket to remote peer
|
|
|
* @param sid
|
|
|
@@ -993,23 +1038,23 @@ public class QuorumCnxManager {
|
|
|
|
|
|
/**
|
|
|
* Returns RecvWorker that pairs up with this SendWorker.
|
|
|
- *
|
|
|
- * @return RecvWorker
|
|
|
+ *
|
|
|
+ * @return RecvWorker
|
|
|
*/
|
|
|
synchronized RecvWorker getRecvWorker(){
|
|
|
return recvWorker;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
synchronized boolean finish() {
|
|
|
LOG.debug("Calling finish for " + sid);
|
|
|
-
|
|
|
+
|
|
|
if(!running){
|
|
|
/*
|
|
|
- * Avoids running finish() twice.
|
|
|
+ * Avoids running finish() twice.
|
|
|
*/
|
|
|
return running;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
running = false;
|
|
|
closeSocket(sock);
|
|
|
|
|
|
@@ -1024,7 +1069,7 @@ public class QuorumCnxManager {
|
|
|
threadCnt.decrementAndGet();
|
|
|
return running;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
synchronized void send(ByteBuffer b) throws IOException {
|
|
|
byte[] msgBytes = new byte[b.capacity()];
|
|
|
try {
|
|
|
@@ -1068,7 +1113,7 @@ public class QuorumCnxManager {
|
|
|
LOG.error("Failed to send last message. Shutting down thread.", e);
|
|
|
this.finish();
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
try {
|
|
|
while (running && !shutdown && sock != null) {
|
|
|
|
|
|
@@ -1129,20 +1174,20 @@ public class QuorumCnxManager {
|
|
|
running = false;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Shuts down this worker
|
|
|
- *
|
|
|
+ *
|
|
|
* @return boolean Value of variable running
|
|
|
*/
|
|
|
synchronized boolean finish() {
|
|
|
if(!running){
|
|
|
/*
|
|
|
- * Avoids running finish() twice.
|
|
|
+ * Avoids running finish() twice.
|
|
|
*/
|
|
|
return running;
|
|
|
}
|
|
|
- running = false;
|
|
|
+ running = false;
|
|
|
|
|
|
this.interrupt();
|
|
|
threadCnt.decrementAndGet();
|