|
@@ -21,10 +21,13 @@ package org.apache.zookeeper.server.quorum;
|
|
import static org.apache.zookeeper.common.NetUtils.formatInetAddr;
|
|
import static org.apache.zookeeper.common.NetUtils.formatInetAddr;
|
|
import java.io.BufferedInputStream;
|
|
import java.io.BufferedInputStream;
|
|
import java.io.BufferedOutputStream;
|
|
import java.io.BufferedOutputStream;
|
|
|
|
+import java.io.Closeable;
|
|
import java.io.DataInputStream;
|
|
import java.io.DataInputStream;
|
|
import java.io.DataOutputStream;
|
|
import java.io.DataOutputStream;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
|
|
+import java.net.InetAddress;
|
|
import java.net.InetSocketAddress;
|
|
import java.net.InetSocketAddress;
|
|
|
|
+import java.net.NoRouteToHostException;
|
|
import java.net.ServerSocket;
|
|
import java.net.ServerSocket;
|
|
import java.net.Socket;
|
|
import java.net.Socket;
|
|
import java.net.SocketException;
|
|
import java.net.SocketException;
|
|
@@ -32,20 +35,28 @@ import java.net.SocketTimeoutException;
|
|
import java.nio.BufferUnderflowException;
|
|
import java.nio.BufferUnderflowException;
|
|
import java.nio.ByteBuffer;
|
|
import java.nio.ByteBuffer;
|
|
import java.nio.channels.UnresolvedAddressException;
|
|
import java.nio.channels.UnresolvedAddressException;
|
|
|
|
+import java.util.ArrayList;
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
import java.util.Enumeration;
|
|
import java.util.Enumeration;
|
|
import java.util.HashSet;
|
|
import java.util.HashSet;
|
|
|
|
+import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
import java.util.Set;
|
|
import java.util.Set;
|
|
import java.util.concurrent.BlockingQueue;
|
|
import java.util.concurrent.BlockingQueue;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
|
+import java.util.concurrent.Executors;
|
|
import java.util.concurrent.SynchronousQueue;
|
|
import java.util.concurrent.SynchronousQueue;
|
|
import java.util.concurrent.ThreadFactory;
|
|
import java.util.concurrent.ThreadFactory;
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
+import java.util.stream.Collectors;
|
|
import javax.net.ssl.SSLSocket;
|
|
import javax.net.ssl.SSLSocket;
|
|
|
|
+import org.apache.zookeeper.common.NetUtils;
|
|
import org.apache.zookeeper.common.X509Exception;
|
|
import org.apache.zookeeper.common.X509Exception;
|
|
import org.apache.zookeeper.server.ExitCode;
|
|
import org.apache.zookeeper.server.ExitCode;
|
|
import org.apache.zookeeper.server.ZooKeeperThread;
|
|
import org.apache.zookeeper.server.ZooKeeperThread;
|
|
@@ -59,6 +70,7 @@ import org.apache.zookeeper.util.ServiceUtils;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* This class implements a connection manager for leader election using TCP. It
|
|
* This class implements a connection manager for leader election using TCP. It
|
|
* maintains one connection for every pair of servers. The tricky part is to
|
|
* maintains one connection for every pair of servers. The tricky part is to
|
|
@@ -102,7 +114,7 @@ public class QuorumCnxManager {
|
|
/*
|
|
/*
|
|
* Protocol identifier used among peers
|
|
* Protocol identifier used among peers
|
|
*/
|
|
*/
|
|
- public static final long PROTOCOL_VERSION = -65536L;
|
|
|
|
|
|
+ public static final long PROTOCOL_VERSION = -65535L;
|
|
|
|
|
|
/*
|
|
/*
|
|
* Max buffer size to be read from the network.
|
|
* Max buffer size to be read from the network.
|
|
@@ -125,7 +137,7 @@ public class QuorumCnxManager {
|
|
final Map<Long, QuorumPeer.QuorumServer> view;
|
|
final Map<Long, QuorumPeer.QuorumServer> view;
|
|
final boolean listenOnAllIPs;
|
|
final boolean listenOnAllIPs;
|
|
private ThreadPoolExecutor connectionExecutor;
|
|
private ThreadPoolExecutor connectionExecutor;
|
|
- private final Set<Long> inprogressConnections = Collections.synchronizedSet(new HashSet<Long>());
|
|
|
|
|
|
+ private final Set<Long> inprogressConnections = Collections.synchronizedSet(new HashSet<>());
|
|
private QuorumAuthServer authServer;
|
|
private QuorumAuthServer authServer;
|
|
private QuorumAuthLearner authLearner;
|
|
private QuorumAuthLearner authLearner;
|
|
private boolean quorumSaslAuthEnabled;
|
|
private boolean quorumSaslAuthEnabled;
|
|
@@ -186,11 +198,11 @@ public class QuorumCnxManager {
|
|
public static class InitialMessage {
|
|
public static class InitialMessage {
|
|
|
|
|
|
public Long sid;
|
|
public Long sid;
|
|
- public InetSocketAddress electionAddr;
|
|
|
|
|
|
+ public List<InetSocketAddress> electionAddr;
|
|
|
|
|
|
- InitialMessage(Long sid, InetSocketAddress address) {
|
|
|
|
|
|
+ InitialMessage(Long sid, List<InetSocketAddress> addresses) {
|
|
this.sid = sid;
|
|
this.sid = sid;
|
|
- this.electionAddr = address;
|
|
|
|
|
|
+ this.electionAddr = addresses;
|
|
}
|
|
}
|
|
|
|
|
|
@SuppressWarnings("serial")
|
|
@SuppressWarnings("serial")
|
|
@@ -223,33 +235,41 @@ public class QuorumCnxManager {
|
|
throw new InitialMessageException("Read only %s bytes out of %s sent by server %s", num_read, remaining, sid);
|
|
throw new InitialMessageException("Read only %s bytes out of %s sent by server %s", num_read, remaining, sid);
|
|
}
|
|
}
|
|
|
|
|
|
- String addr = new String(b);
|
|
|
|
- String[] host_port;
|
|
|
|
- try {
|
|
|
|
- host_port = ConfigUtils.getHostAndPort(addr);
|
|
|
|
- } catch (ConfigException e) {
|
|
|
|
- throw new InitialMessageException("Badly formed address: %s", addr);
|
|
|
|
- }
|
|
|
|
|
|
+ String[] addressStrings = new String(b).split("\\|");
|
|
|
|
+ List<InetSocketAddress> addresses = new ArrayList<>(addressStrings.length);
|
|
|
|
+ for (String addr : addressStrings) {
|
|
|
|
|
|
- if (host_port.length != 2) {
|
|
|
|
- throw new InitialMessageException("Badly formed address: %s", addr);
|
|
|
|
- }
|
|
|
|
|
|
+ String[] host_port;
|
|
|
|
+ try {
|
|
|
|
+ host_port = ConfigUtils.getHostAndPort(addr);
|
|
|
|
+ } catch (ConfigException e) {
|
|
|
|
+ throw new InitialMessageException("Badly formed address: %s", addr);
|
|
|
|
+ }
|
|
|
|
|
|
- int port;
|
|
|
|
- try {
|
|
|
|
- port = Integer.parseInt(host_port[1]);
|
|
|
|
- } catch (NumberFormatException e) {
|
|
|
|
- throw new InitialMessageException("Bad port number: %s", host_port[1]);
|
|
|
|
- } catch (ArrayIndexOutOfBoundsException e) {
|
|
|
|
- throw new InitialMessageException("No port number in: %s", addr);
|
|
|
|
|
|
+ if (host_port.length != 2) {
|
|
|
|
+ throw new InitialMessageException("Badly formed address: %s", addr);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ int port;
|
|
|
|
+ try {
|
|
|
|
+ port = Integer.parseInt(host_port[1]);
|
|
|
|
+ } catch (NumberFormatException e) {
|
|
|
|
+ throw new InitialMessageException("Bad port number: %s", host_port[1]);
|
|
|
|
+ } catch (ArrayIndexOutOfBoundsException e) {
|
|
|
|
+ throw new InitialMessageException("No port number in: %s", addr);
|
|
|
|
+ }
|
|
|
|
+ addresses.add(new InetSocketAddress(host_port[0], port));
|
|
}
|
|
}
|
|
|
|
|
|
- return new InitialMessage(sid, new InetSocketAddress(host_port[0], port));
|
|
|
|
|
|
+ return new InitialMessage(sid, addresses);
|
|
}
|
|
}
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
- public QuorumCnxManager(QuorumPeer self, final long mySid, Map<Long, QuorumPeer.QuorumServer> view, QuorumAuthServer authServer, QuorumAuthLearner authLearner, int socketTimeout, boolean listenOnAllIPs, int quorumCnxnThreadsSize, boolean quorumSaslAuthEnabled) {
|
|
|
|
|
|
+ public QuorumCnxManager(QuorumPeer self, final long mySid, Map<Long, QuorumPeer.QuorumServer> view,
|
|
|
|
+ QuorumAuthServer authServer, QuorumAuthLearner authLearner, int socketTimeout, boolean listenOnAllIPs,
|
|
|
|
+ int quorumCnxnThreadsSize, boolean quorumSaslAuthEnabled) {
|
|
|
|
+
|
|
this.recvQueue = new CircularBlockingQueue<>(RECV_CAPACITY);
|
|
this.recvQueue = new CircularBlockingQueue<>(RECV_CAPACITY);
|
|
this.queueSendMap = new ConcurrentHashMap<>();
|
|
this.queueSendMap = new ConcurrentHashMap<>();
|
|
this.senderWorkerMap = new ConcurrentHashMap<>();
|
|
this.senderWorkerMap = new ConcurrentHashMap<>();
|
|
@@ -274,7 +294,9 @@ public class QuorumCnxManager {
|
|
listener.setName("QuorumPeerListener");
|
|
listener.setName("QuorumPeerListener");
|
|
}
|
|
}
|
|
|
|
|
|
- private void initializeAuth(final long mySid, final QuorumAuthServer authServer, final QuorumAuthLearner authLearner, final int quorumCnxnThreadsSize, final boolean quorumSaslAuthEnabled) {
|
|
|
|
|
|
+ private void initializeAuth(final long mySid, final QuorumAuthServer authServer,
|
|
|
|
+ final QuorumAuthLearner authLearner, final int quorumCnxnThreadsSize, final boolean quorumSaslAuthEnabled) {
|
|
|
|
+
|
|
this.authServer = authServer;
|
|
this.authServer = authServer;
|
|
this.authLearner = authLearner;
|
|
this.authLearner = authLearner;
|
|
this.quorumSaslAuthEnabled = quorumSaslAuthEnabled;
|
|
this.quorumSaslAuthEnabled = quorumSaslAuthEnabled;
|
|
@@ -311,7 +333,8 @@ public class QuorumCnxManager {
|
|
LOG.debug("Opening channel to server {}", sid);
|
|
LOG.debug("Opening channel to server {}", sid);
|
|
Socket sock = new Socket();
|
|
Socket sock = new Socket();
|
|
setSockOpts(sock);
|
|
setSockOpts(sock);
|
|
- sock.connect(self.getVotingView().get(sid).electionAddr, cnxTO);
|
|
|
|
|
|
+ InetSocketAddress address = self.getVotingView().get(sid).electionAddr.getReachableOrOne();
|
|
|
|
+ sock.connect(address, cnxTO);
|
|
initiateConnection(sock, sid);
|
|
initiateConnection(sock, sid);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -324,10 +347,10 @@ public class QuorumCnxManager {
|
|
startConnection(sock, sid);
|
|
startConnection(sock, sid);
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
LOG.error(
|
|
LOG.error(
|
|
- "Exception while connecting, id: {}, addr: {}, closing learner connection",
|
|
|
|
- sid,
|
|
|
|
- sock.getRemoteSocketAddress(),
|
|
|
|
- e);
|
|
|
|
|
|
+ "Exception while connecting, id: {}, addr: {}, closing learner connection",
|
|
|
|
+ sid,
|
|
|
|
+ sock.getRemoteSocketAddress(),
|
|
|
|
+ e);
|
|
closeSocket(sock);
|
|
closeSocket(sock);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -395,7 +418,8 @@ public class QuorumCnxManager {
|
|
// represents protocol version (in other words - message type)
|
|
// represents protocol version (in other words - message type)
|
|
dout.writeLong(PROTOCOL_VERSION);
|
|
dout.writeLong(PROTOCOL_VERSION);
|
|
dout.writeLong(self.getId());
|
|
dout.writeLong(self.getId());
|
|
- String addr = formatInetAddr(self.getElectionAddress());
|
|
|
|
|
|
+ String addr = self.getElectionAddress().getAllAddresses().stream()
|
|
|
|
+ .map(NetUtils::formatInetAddr).collect(Collectors.joining("|"));
|
|
byte[] addr_bytes = addr.getBytes();
|
|
byte[] addr_bytes = addr.getBytes();
|
|
dout.writeInt(addr_bytes.length);
|
|
dout.writeInt(addr_bytes.length);
|
|
dout.write(addr_bytes);
|
|
dout.write(addr_bytes);
|
|
@@ -500,7 +524,7 @@ public class QuorumCnxManager {
|
|
|
|
|
|
private void handleConnection(Socket sock, DataInputStream din) throws IOException {
|
|
private void handleConnection(Socket sock, DataInputStream din) throws IOException {
|
|
Long sid = null, protocolVersion = null;
|
|
Long sid = null, protocolVersion = null;
|
|
- InetSocketAddress electionAddr = null;
|
|
|
|
|
|
+ MultipleAddresses electionAddr = null;
|
|
|
|
|
|
try {
|
|
try {
|
|
protocolVersion = din.readLong();
|
|
protocolVersion = din.readLong();
|
|
@@ -510,7 +534,7 @@ public class QuorumCnxManager {
|
|
try {
|
|
try {
|
|
InitialMessage init = InitialMessage.parse(protocolVersion, din);
|
|
InitialMessage init = InitialMessage.parse(protocolVersion, din);
|
|
sid = init.sid;
|
|
sid = init.sid;
|
|
- electionAddr = init.electionAddr;
|
|
|
|
|
|
+ electionAddr = new MultipleAddresses(init.electionAddr);
|
|
} catch (InitialMessage.InitialMessageException ex) {
|
|
} catch (InitialMessage.InitialMessageException ex) {
|
|
LOG.error(ex.toString());
|
|
LOG.error(ex.toString());
|
|
closeSocket(sock);
|
|
closeSocket(sock);
|
|
@@ -610,9 +634,13 @@ public class QuorumCnxManager {
|
|
* @param sid server id
|
|
* @param sid server id
|
|
* @return boolean success indication
|
|
* @return boolean success indication
|
|
*/
|
|
*/
|
|
- synchronized boolean connectOne(long sid, InetSocketAddress electionAddr) {
|
|
|
|
|
|
+ synchronized boolean connectOne(long sid, MultipleAddresses electionAddr) {
|
|
if (senderWorkerMap.get(sid) != null) {
|
|
if (senderWorkerMap.get(sid) != null) {
|
|
LOG.debug("There is a connection already for server {}", sid);
|
|
LOG.debug("There is a connection already for server {}", sid);
|
|
|
|
+ // since ZOOKEEPER-3188 we can use multiple election addresses to reach a server. It is possible, that the
|
|
|
|
+ // one we are using is already dead and we need to clean-up, so when we will create a new connection
|
|
|
|
+ // then we will choose an other one, which is actually reachable
|
|
|
|
+ senderWorkerMap.get(sid).asyncValidateIfSocketIsStillReachable();
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -625,7 +653,7 @@ public class QuorumCnxManager {
|
|
sock = new Socket();
|
|
sock = new Socket();
|
|
}
|
|
}
|
|
setSockOpts(sock);
|
|
setSockOpts(sock);
|
|
- sock.connect(electionAddr, cnxTO);
|
|
|
|
|
|
+ sock.connect(electionAddr.getReachableAddress(), cnxTO);
|
|
if (sock instanceof SSLSocket) {
|
|
if (sock instanceof SSLSocket) {
|
|
SSLSocket sslSock = (SSLSocket) sock;
|
|
SSLSocket sslSock = (SSLSocket) sock;
|
|
sslSock.startHandshake();
|
|
sslSock.startHandshake();
|
|
@@ -635,7 +663,8 @@ public class QuorumCnxManager {
|
|
sslSock.getSession().getCipherSuite());
|
|
sslSock.getSession().getCipherSuite());
|
|
}
|
|
}
|
|
|
|
|
|
- LOG.debug("Connected to server {}", sid);
|
|
|
|
|
|
+ LOG.debug("Connected to server {} using election address: {}:{}",
|
|
|
|
+ sid, sock.getInetAddress(), sock.getPort());
|
|
// Sends connection request asynchronously if the quorum
|
|
// Sends connection request asynchronously if the quorum
|
|
// sasl authentication is enabled. This is required because
|
|
// sasl authentication is enabled. This is required because
|
|
// sasl server authentication process may take few seconds to
|
|
// sasl server authentication process may take few seconds to
|
|
@@ -658,6 +687,10 @@ public class QuorumCnxManager {
|
|
LOG.warn("Cannot open secure channel to {} at election address {}", sid, electionAddr, e);
|
|
LOG.warn("Cannot open secure channel to {} at election address {}", sid, electionAddr, e);
|
|
closeSocket(sock);
|
|
closeSocket(sock);
|
|
return false;
|
|
return false;
|
|
|
|
+ } catch (NoRouteToHostException e) {
|
|
|
|
+ LOG.warn("None of the addresses ({}) are reachable for sid {}", electionAddr, sid, e);
|
|
|
|
+ closeSocket(sock);
|
|
|
|
+ return false;
|
|
} catch (IOException e) {
|
|
} catch (IOException e) {
|
|
LOG.warn("Cannot open channel to {} at election address {}", sid, electionAddr, e);
|
|
LOG.warn("Cannot open channel to {} at election address {}", sid, electionAddr, e);
|
|
closeSocket(sock);
|
|
closeSocket(sock);
|
|
@@ -673,6 +706,10 @@ public class QuorumCnxManager {
|
|
synchronized void connectOne(long sid) {
|
|
synchronized void connectOne(long sid) {
|
|
if (senderWorkerMap.get(sid) != null) {
|
|
if (senderWorkerMap.get(sid) != null) {
|
|
LOG.debug("There is a connection already for server {}", sid);
|
|
LOG.debug("There is a connection already for server {}", sid);
|
|
|
|
+ // since ZOOKEEPER-3188 we can use multiple election addresses to reach a server. It is possible, that the
|
|
|
|
+ // one we are using is already dead and we need to clean-up, so when we will create a new connection
|
|
|
|
+ // then we will choose an other one, which is actually reachable
|
|
|
|
+ senderWorkerMap.get(sid).asyncValidateIfSocketIsStillReachable();
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
synchronized (self.QV_LOCK) {
|
|
synchronized (self.QV_LOCK) {
|
|
@@ -818,7 +855,7 @@ public class QuorumCnxManager {
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Thread to listen on some port
|
|
|
|
|
|
+ * Thread to listen on some ports
|
|
*/
|
|
*/
|
|
public class Listener extends ZooKeeperThread {
|
|
public class Listener extends ZooKeeperThread {
|
|
|
|
|
|
@@ -827,25 +864,30 @@ public class QuorumCnxManager {
|
|
|
|
|
|
private final int portBindMaxRetry;
|
|
private final int portBindMaxRetry;
|
|
private Runnable socketBindErrorHandler = () -> ServiceUtils.requestSystemExit(ExitCode.UNABLE_TO_BIND_QUORUM_PORT.getValue());
|
|
private Runnable socketBindErrorHandler = () -> ServiceUtils.requestSystemExit(ExitCode.UNABLE_TO_BIND_QUORUM_PORT.getValue());
|
|
- volatile ServerSocket ss = null;
|
|
|
|
|
|
+ private List<ListenerHandler> listenerHandlers;
|
|
|
|
+ private final AtomicBoolean socketException;
|
|
|
|
+
|
|
|
|
|
|
public Listener() {
|
|
public Listener() {
|
|
// During startup of thread, thread name will be overridden to
|
|
// During startup of thread, thread name will be overridden to
|
|
// specific election address
|
|
// specific election address
|
|
super("ListenerThread");
|
|
super("ListenerThread");
|
|
|
|
|
|
|
|
+ socketException = new AtomicBoolean(false);
|
|
|
|
+
|
|
// maximum retry count while trying to bind to election port
|
|
// maximum retry count while trying to bind to election port
|
|
// see ZOOKEEPER-3320 for more details
|
|
// see ZOOKEEPER-3320 for more details
|
|
- final Integer maxRetry = Integer.getInteger(ELECTION_PORT_BIND_RETRY, DEFAULT_PORT_BIND_MAX_RETRY);
|
|
|
|
|
|
+ final Integer maxRetry = Integer.getInteger(ELECTION_PORT_BIND_RETRY,
|
|
|
|
+ DEFAULT_PORT_BIND_MAX_RETRY);
|
|
if (maxRetry >= 0) {
|
|
if (maxRetry >= 0) {
|
|
LOG.info("Election port bind maximum retries is {}", maxRetry == 0 ? "infinite" : maxRetry);
|
|
LOG.info("Election port bind maximum retries is {}", maxRetry == 0 ? "infinite" : maxRetry);
|
|
portBindMaxRetry = maxRetry;
|
|
portBindMaxRetry = maxRetry;
|
|
} else {
|
|
} else {
|
|
LOG.info(
|
|
LOG.info(
|
|
- "'{}' contains invalid value: {}(must be >= 0). Use default value of {} instead.",
|
|
|
|
- ELECTION_PORT_BIND_RETRY,
|
|
|
|
- maxRetry,
|
|
|
|
- DEFAULT_PORT_BIND_MAX_RETRY);
|
|
|
|
|
|
+ "'{}' contains invalid value: {}(must be >= 0). Use default value of {} instead.",
|
|
|
|
+ ELECTION_PORT_BIND_RETRY,
|
|
|
|
+ maxRetry,
|
|
|
|
+ DEFAULT_PORT_BIND_MAX_RETRY);
|
|
portBindMaxRetry = DEFAULT_PORT_BIND_MAX_RETRY;
|
|
portBindMaxRetry = DEFAULT_PORT_BIND_MAX_RETRY;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -857,106 +899,54 @@ public class QuorumCnxManager {
|
|
this.socketBindErrorHandler = errorHandler;
|
|
this.socketBindErrorHandler = errorHandler;
|
|
}
|
|
}
|
|
|
|
|
|
- /**
|
|
|
|
- * Sleeps on accept().
|
|
|
|
- */
|
|
|
|
@Override
|
|
@Override
|
|
public void run() {
|
|
public void run() {
|
|
- int numRetries = 0;
|
|
|
|
- InetSocketAddress addr;
|
|
|
|
- Socket client = null;
|
|
|
|
- Exception exitException = null;
|
|
|
|
- while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
|
|
|
|
- try {
|
|
|
|
- if (self.shouldUsePortUnification()) {
|
|
|
|
- LOG.info("Creating TLS-enabled quorum server socket");
|
|
|
|
- ss = new UnifiedServerSocket(self.getX509Util(), true);
|
|
|
|
- } else if (self.isSslQuorum()) {
|
|
|
|
- LOG.info("Creating TLS-only quorum server socket");
|
|
|
|
- ss = new UnifiedServerSocket(self.getX509Util(), false);
|
|
|
|
- } else {
|
|
|
|
- ss = new ServerSocket();
|
|
|
|
- }
|
|
|
|
|
|
+ if (!shutdown) {
|
|
|
|
+ Set<InetSocketAddress> addresses;
|
|
|
|
|
|
- ss.setReuseAddress(true);
|
|
|
|
|
|
+ if (self.getQuorumListenOnAllIPs()) {
|
|
|
|
+ addresses = self.getElectionAddress().getWildcardAddresses();
|
|
|
|
+ } else {
|
|
|
|
+ addresses = self.getElectionAddress().getAllAddresses();
|
|
|
|
+ }
|
|
|
|
|
|
- if (self.getQuorumListenOnAllIPs()) {
|
|
|
|
- int port = self.getElectionAddress().getPort();
|
|
|
|
- addr = new InetSocketAddress(port);
|
|
|
|
- } else {
|
|
|
|
- // Resolve hostname for this server in case the
|
|
|
|
- // underlying ip address has changed.
|
|
|
|
- self.recreateSocketAddresses(self.getId());
|
|
|
|
- addr = self.getElectionAddress();
|
|
|
|
- }
|
|
|
|
- LOG.info("My election bind port: {}", addr.toString());
|
|
|
|
- setName(addr.toString());
|
|
|
|
- ss.bind(addr);
|
|
|
|
- while (!shutdown) {
|
|
|
|
|
|
+ CountDownLatch latch = new CountDownLatch(addresses.size());
|
|
|
|
+ listenerHandlers = addresses.stream().map(address ->
|
|
|
|
+ new ListenerHandler(address, self.shouldUsePortUnification(), self.isSslQuorum(), latch))
|
|
|
|
+ .collect(Collectors.toList());
|
|
|
|
+
|
|
|
|
+ ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
|
|
|
|
+ listenerHandlers.forEach(executor::submit);
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ latch.await();
|
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
|
+ LOG.error("Interrupted while sleeping. Ignoring exception", ie);
|
|
|
|
+ } finally {
|
|
|
|
+ // Clean up for shutdown.
|
|
|
|
+ for (ListenerHandler handler : listenerHandlers) {
|
|
try {
|
|
try {
|
|
- client = ss.accept();
|
|
|
|
- setSockOpts(client);
|
|
|
|
- LOG.info("Received connection request {}", formatInetAddr((InetSocketAddress) client.getRemoteSocketAddress()));
|
|
|
|
- // Receive and handle the connection request
|
|
|
|
- // asynchronously if the quorum sasl authentication is
|
|
|
|
- // enabled. This is required because sasl server
|
|
|
|
- // authentication process may take few seconds to finish,
|
|
|
|
- // this may delay next peer connection requests.
|
|
|
|
- if (quorumSaslAuthEnabled) {
|
|
|
|
- receiveConnectionAsync(client);
|
|
|
|
- } else {
|
|
|
|
- receiveConnection(client);
|
|
|
|
- }
|
|
|
|
- numRetries = 0;
|
|
|
|
- } catch (SocketTimeoutException e) {
|
|
|
|
- LOG.warn(
|
|
|
|
- "The socket is listening for the election accepted "
|
|
|
|
- + "and it timed out unexpectedly, but will retry."
|
|
|
|
- + "see ZOOKEEPER-2836");
|
|
|
|
|
|
+ handler.close();
|
|
|
|
+ } catch (IOException ie) {
|
|
|
|
+ // Don't log an error for shutdown.
|
|
|
|
+ LOG.debug("Error closing server socket", ie);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- } catch (IOException e) {
|
|
|
|
- if (shutdown) {
|
|
|
|
- break;
|
|
|
|
- }
|
|
|
|
- LOG.error("Exception while listening", e);
|
|
|
|
- exitException = e;
|
|
|
|
- numRetries++;
|
|
|
|
- try {
|
|
|
|
- ss.close();
|
|
|
|
- Thread.sleep(1000);
|
|
|
|
- } catch (IOException ie) {
|
|
|
|
- LOG.error("Error closing server socket", ie);
|
|
|
|
- } catch (InterruptedException ie) {
|
|
|
|
- LOG.error("Interrupted while sleeping. Ignoring exception", ie);
|
|
|
|
- }
|
|
|
|
- closeSocket(client);
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
LOG.info("Leaving listener");
|
|
LOG.info("Leaving listener");
|
|
if (!shutdown) {
|
|
if (!shutdown) {
|
|
LOG.error(
|
|
LOG.error(
|
|
- "As I'm leaving the listener thread after {} errors. "
|
|
|
|
- + "I won't be able to participate in leader election any longer: {}."
|
|
|
|
- + "Use {} property to increase retry count.",
|
|
|
|
- numRetries,
|
|
|
|
- formatInetAddr(self.getElectionAddress()),
|
|
|
|
- ELECTION_PORT_BIND_RETRY);
|
|
|
|
-
|
|
|
|
- if (exitException instanceof SocketException) {
|
|
|
|
- // After leaving listener thread, the host cannot join the
|
|
|
|
- // quorum anymore, this is a severe error that we cannot
|
|
|
|
- // recover from, so we need to exit
|
|
|
|
|
|
+ "As I'm leaving the listener thread, I won't be able to participate in leader election any longer: {}",
|
|
|
|
+ self.getElectionAddress().getAllAddresses().stream()
|
|
|
|
+ .map(NetUtils::formatInetAddr)
|
|
|
|
+ .collect(Collectors.joining("|")));
|
|
|
|
+ if (socketException.get()) {
|
|
|
|
+ // After leaving listener thread, the host cannot join the quorum anymore,
|
|
|
|
+ // this is a severe error that we cannot recover from, so we need to exit
|
|
socketBindErrorHandler.run();
|
|
socketBindErrorHandler.run();
|
|
}
|
|
}
|
|
- } else if (ss != null) {
|
|
|
|
- // Clean up for shutdown.
|
|
|
|
- try {
|
|
|
|
- ss.close();
|
|
|
|
- } catch (IOException ie) {
|
|
|
|
- // Don't log an error for shutdown.
|
|
|
|
- LOG.debug("Error closing server socket", ie);
|
|
|
|
- }
|
|
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -964,14 +954,145 @@ public class QuorumCnxManager {
|
|
* Halts this listener thread.
|
|
* Halts this listener thread.
|
|
*/
|
|
*/
|
|
void halt() {
|
|
void halt() {
|
|
- try {
|
|
|
|
- LOG.debug("Trying to close listener: {}", ss);
|
|
|
|
- if (ss != null) {
|
|
|
|
- LOG.debug("Closing listener: {}", QuorumCnxManager.this.mySid);
|
|
|
|
- ss.close();
|
|
|
|
|
|
+ LOG.debug("Trying to close listeners");
|
|
|
|
+ if (listenerHandlers != null) {
|
|
|
|
+ LOG.debug("Closing listener: {}", QuorumCnxManager.this.mySid);
|
|
|
|
+ for (ListenerHandler handler : listenerHandlers) {
|
|
|
|
+ try {
|
|
|
|
+ handler.close();
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ LOG.warn("Exception when shutting down listener: ", e);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- } catch (IOException e) {
|
|
|
|
- LOG.warn("Exception when shutting down listener", e);
|
|
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ class ListenerHandler implements Runnable, Closeable {
|
|
|
|
+ private ServerSocket serverSocket;
|
|
|
|
+ private InetSocketAddress address;
|
|
|
|
+ private boolean portUnification;
|
|
|
|
+ private boolean sslQuorum;
|
|
|
|
+ private CountDownLatch latch;
|
|
|
|
+
|
|
|
|
+ ListenerHandler(InetSocketAddress address, boolean portUnification, boolean sslQuorum,
|
|
|
|
+ CountDownLatch latch) {
|
|
|
|
+ this.address = address;
|
|
|
|
+ this.portUnification = portUnification;
|
|
|
|
+ this.sslQuorum = sslQuorum;
|
|
|
|
+ this.latch = latch;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Sleeps on acceptConnections().
|
|
|
|
+ */
|
|
|
|
+ @Override
|
|
|
|
+ public void run() {
|
|
|
|
+ try {
|
|
|
|
+ Thread.currentThread().setName("ListenerHandler-" + address);
|
|
|
|
+ acceptConnections();
|
|
|
|
+ try {
|
|
|
|
+ close();
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ LOG.warn("Exception when shutting down listener: ", e);
|
|
|
|
+ }
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ // Output of unexpected exception, should never happen
|
|
|
|
+ LOG.error("Unexpected error ", e);
|
|
|
|
+ } finally {
|
|
|
|
+ latch.countDown();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public synchronized void close() throws IOException {
|
|
|
|
+ if (serverSocket != null && !serverSocket.isClosed()) {
|
|
|
|
+ LOG.debug("Trying to close listeners: {}", serverSocket);
|
|
|
|
+ serverSocket.close();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Sleeps on accept().
|
|
|
|
+ */
|
|
|
|
+ private void acceptConnections() {
|
|
|
|
+ int numRetries = 0;
|
|
|
|
+ Socket client = null;
|
|
|
|
+
|
|
|
|
+ while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
|
|
|
|
+ try {
|
|
|
|
+ serverSocket = createNewServerSocket();
|
|
|
|
+ LOG.info("My election bind port: {}", address.toString());
|
|
|
|
+ while (!shutdown) {
|
|
|
|
+ try {
|
|
|
|
+ client = serverSocket.accept();
|
|
|
|
+ setSockOpts(client);
|
|
|
|
+ LOG.info("Received connection request {}", client.getRemoteSocketAddress());
|
|
|
|
+ // Receive and handle the connection request
|
|
|
|
+ // asynchronously if the quorum sasl authentication is
|
|
|
|
+ // enabled. This is required because sasl server
|
|
|
|
+ // authentication process may take few seconds to finish,
|
|
|
|
+ // this may delay next peer connection requests.
|
|
|
|
+ if (quorumSaslAuthEnabled) {
|
|
|
|
+ receiveConnectionAsync(client);
|
|
|
|
+ } else {
|
|
|
|
+ receiveConnection(client);
|
|
|
|
+ }
|
|
|
|
+ numRetries = 0;
|
|
|
|
+ } catch (SocketTimeoutException e) {
|
|
|
|
+ LOG.warn("The socket is listening for the election accepted "
|
|
|
|
+ + "and it timed out unexpectedly, but will retry."
|
|
|
|
+ + "see ZOOKEEPER-2836");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ } catch (IOException e) {
|
|
|
|
+ if (shutdown) {
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ LOG.error("Exception while listening", e);
|
|
|
|
+
|
|
|
|
+ if (e instanceof SocketException) {
|
|
|
|
+ socketException.set(true);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ numRetries++;
|
|
|
|
+ try {
|
|
|
|
+ close();
|
|
|
|
+ Thread.sleep(1000);
|
|
|
|
+ } catch (IOException ie) {
|
|
|
|
+ LOG.error("Error closing server socket", ie);
|
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
|
+ LOG.error("Interrupted while sleeping. Ignoring exception", ie);
|
|
|
|
+ }
|
|
|
|
+ closeSocket(client);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ if (!shutdown) {
|
|
|
|
+ LOG.error(
|
|
|
|
+ "Leaving listener thread for address {} after {} errors. Use {} property to increase retry count.",
|
|
|
|
+ formatInetAddr(address),
|
|
|
|
+ numRetries,
|
|
|
|
+ ELECTION_PORT_BIND_RETRY);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private ServerSocket createNewServerSocket() throws IOException {
|
|
|
|
+ ServerSocket socket;
|
|
|
|
+
|
|
|
|
+ if (portUnification) {
|
|
|
|
+ LOG.info("Creating TLS-enabled quorum server socket");
|
|
|
|
+ socket = new UnifiedServerSocket(self.getX509Util(), true);
|
|
|
|
+ } else if (sslQuorum) {
|
|
|
|
+ LOG.info("Creating TLS-only quorum server socket");
|
|
|
|
+ socket = new UnifiedServerSocket(self.getX509Util(), false);
|
|
|
|
+ } else {
|
|
|
|
+ socket = new ServerSocket();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ socket.setReuseAddress(true);
|
|
|
|
+ socket.bind(address);
|
|
|
|
+
|
|
|
|
+ return socket;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -989,6 +1110,7 @@ public class QuorumCnxManager {
|
|
RecvWorker recvWorker;
|
|
RecvWorker recvWorker;
|
|
volatile boolean running = true;
|
|
volatile boolean running = true;
|
|
DataOutputStream dout;
|
|
DataOutputStream dout;
|
|
|
|
+ AtomicBoolean ongoingAsyncValidation = new AtomicBoolean(false);
|
|
|
|
|
|
/**
|
|
/**
|
|
* An instance of this thread receives messages to send
|
|
* An instance of this thread receives messages to send
|
|
@@ -1129,6 +1251,33 @@ public class QuorumCnxManager {
|
|
LOG.warn("Send worker leaving thread id {} my id = {}", sid, self.getId());
|
|
LOG.warn("Send worker leaving thread id {} my id = {}", sid, self.getId());
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+
|
|
|
|
+ public void asyncValidateIfSocketIsStillReachable() {
|
|
|
|
+ if (ongoingAsyncValidation.compareAndSet(false, true)) {
|
|
|
|
+ new Thread(() -> {
|
|
|
|
+ LOG.debug("validate if destination address is reachable for sid {}", sid);
|
|
|
|
+ if (sock != null) {
|
|
|
|
+ InetAddress address = sock.getInetAddress();
|
|
|
|
+ try {
|
|
|
|
+ if (address.isReachable(500)) {
|
|
|
|
+ LOG.debug("destination address {} is reachable for sid {}", address.toString(), sid);
|
|
|
|
+ ongoingAsyncValidation.set(false);
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ } catch (NullPointerException | IOException ignored) {
|
|
|
|
+ }
|
|
|
|
+ LOG.warn(
|
|
|
|
+ "destination address {} not reachable anymore, shutting down the SendWorker for sid {}",
|
|
|
|
+ address.toString(),
|
|
|
|
+ sid);
|
|
|
|
+ this.finish();
|
|
|
|
+ }
|
|
|
|
+ }).start();
|
|
|
|
+ } else {
|
|
|
|
+ LOG.debug("validation of destination address for sid {} is skipped (it is already running)", sid);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|