|
@@ -32,6 +32,7 @@ import java.net.ServerSocket;
|
|
|
import java.net.Socket;
|
|
|
import java.net.SocketException;
|
|
|
import java.net.SocketTimeoutException;
|
|
|
+import java.net.UnknownHostException;
|
|
|
import java.nio.BufferUnderflowException;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.nio.channels.UnresolvedAddressException;
|
|
@@ -266,12 +267,33 @@ public class QuorumCnxManager {
|
|
|
} catch (ArrayIndexOutOfBoundsException e) {
|
|
|
throw new InitialMessageException("No port number in: %s", addr);
|
|
|
}
|
|
|
- addresses.add(new InetSocketAddress(host_port[0], port));
|
|
|
+ if (!isWildcardAddress(host_port[0])) {
|
|
|
+ addresses.add(new InetSocketAddress(host_port[0], port));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
return new InitialMessage(sid, addresses);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Returns true if the specified hostname is a wildcard address,
|
|
|
+ * like 0.0.0.0 for IPv4 or :: for IPv6
|
|
|
+ *
|
|
|
+ * (the function is package-private to be visible for testing)
|
|
|
+ */
|
|
|
+ static boolean isWildcardAddress(final String hostname) {
|
|
|
+ try {
|
|
|
+ return InetAddress.getByName(hostname).isAnyLocalAddress();
|
|
|
+ } catch (UnknownHostException e) {
|
|
|
+ // if we can not resolve, it can not be a wildcard address
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String toString() {
|
|
|
+ return "InitialMessage{sid=" + sid + ", electionAddr=" + electionAddr + '}';
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
public QuorumCnxManager(QuorumPeer self, final long mySid, Map<Long, QuorumPeer.QuorumServer> view,
|
|
@@ -415,6 +437,7 @@ public class QuorumCnxManager {
|
|
|
private boolean startConnection(Socket sock, Long sid) throws IOException {
|
|
|
DataOutputStream dout = null;
|
|
|
DataInputStream din = null;
|
|
|
+ LOG.debug("startConnection (myId:{} --> sid:{})", self.getId(), sid);
|
|
|
try {
|
|
|
// Use BufferedOutputStream to reduce the number of IP packets. This is
|
|
|
// important for x-DC scenarios.
|
|
@@ -459,13 +482,11 @@ public class QuorumCnxManager {
|
|
|
|
|
|
// If lost the challenge, then drop the new connection
|
|
|
if (sid > self.getId()) {
|
|
|
- LOG.info(
|
|
|
- "Have smaller server identifier, so dropping the connection: ({}, {})",
|
|
|
- sid,
|
|
|
- self.getId());
|
|
|
+ LOG.info("Have smaller server identifier, so dropping the connection: (myId:{} --> sid:{})", self.getId(), sid);
|
|
|
closeSocket(sock);
|
|
|
// Otherwise proceed with the connection
|
|
|
} else {
|
|
|
+ LOG.debug("Have larger server identifier, so keeping the connection: (myId:{} --> sid:{})", self.getId(), sid);
|
|
|
SendWorker sw = new SendWorker(sock, sid);
|
|
|
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
|
|
|
sw.setRecv(rw);
|
|
@@ -501,9 +522,11 @@ public class QuorumCnxManager {
|
|
|
try {
|
|
|
din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
|
|
|
|
|
|
+ LOG.debug("Sync handling of connection request received from: {}", sock.getRemoteSocketAddress());
|
|
|
handleConnection(sock, din);
|
|
|
} catch (IOException e) {
|
|
|
LOG.error("Exception handling connection, addr: {}, closing server connection", sock.getRemoteSocketAddress());
|
|
|
+ LOG.debug("Exception details: ", e);
|
|
|
closeSocket(sock);
|
|
|
}
|
|
|
}
|
|
@@ -514,10 +537,12 @@ public class QuorumCnxManager {
|
|
|
*/
|
|
|
public void receiveConnectionAsync(final Socket sock) {
|
|
|
try {
|
|
|
+ LOG.debug("Async handling of connection request received from: {}", sock.getRemoteSocketAddress());
|
|
|
connectionExecutor.execute(new QuorumConnectionReceiverThread(sock));
|
|
|
connectionThreadCnt.incrementAndGet();
|
|
|
} catch (Throwable e) {
|
|
|
LOG.error("Exception handling connection, addr: {}, closing server connection", sock.getRemoteSocketAddress());
|
|
|
+ LOG.debug("Exception details: ", e);
|
|
|
closeSocket(sock);
|
|
|
}
|
|
|
}
|
|
@@ -552,10 +577,13 @@ public class QuorumCnxManager {
|
|
|
try {
|
|
|
InitialMessage init = InitialMessage.parse(protocolVersion, din);
|
|
|
sid = init.sid;
|
|
|
- electionAddr = new MultipleAddresses(init.electionAddr,
|
|
|
- Duration.ofMillis(self.getMultiAddressReachabilityCheckTimeoutMs()));
|
|
|
+ if (!init.electionAddr.isEmpty()) {
|
|
|
+ electionAddr = new MultipleAddresses(init.electionAddr,
|
|
|
+ Duration.ofMillis(self.getMultiAddressReachabilityCheckTimeoutMs()));
|
|
|
+ }
|
|
|
+ LOG.debug("Initial message parsed by {}: {}", self.getId(), init.toString());
|
|
|
} catch (InitialMessage.InitialMessageException ex) {
|
|
|
- LOG.error(ex.toString());
|
|
|
+ LOG.error("Initial message parsing error!", ex);
|
|
|
closeSocket(sock);
|
|
|
return;
|
|
|
}
|
|
@@ -601,6 +629,10 @@ public class QuorumCnxManager {
|
|
|
connectOne(sid);
|
|
|
}
|
|
|
|
|
|
+ } else if (sid == self.getId()) {
|
|
|
+ // we saw this case in ZOOKEEPER-2164
|
|
|
+ LOG.warn("We got a connection request from a server with our own ID. "
|
|
|
+ + "This should be either a configuration error, or a bug.");
|
|
|
} else { // Otherwise start worker threads to receive data.
|
|
|
SendWorker sw = new SendWorker(sock, sid);
|
|
|
RecvWorker rw = new RecvWorker(sock, din, sid, sw);
|
|
@@ -745,6 +777,7 @@ public class QuorumCnxManager {
|
|
|
Map<Long, QuorumPeer.QuorumServer> lastProposedView = lastSeenQV.getAllMembers();
|
|
|
if (lastCommittedView.containsKey(sid)) {
|
|
|
knownId = true;
|
|
|
+ LOG.debug("Server {} knows {} already, it is in the lastCommittedView", self.getId(), sid);
|
|
|
if (connectOne(sid, lastCommittedView.get(sid).electionAddr)) {
|
|
|
return;
|
|
|
}
|
|
@@ -754,6 +787,8 @@ public class QuorumCnxManager {
|
|
|
&& (!knownId
|
|
|
|| (lastProposedView.get(sid).electionAddr != lastCommittedView.get(sid).electionAddr))) {
|
|
|
knownId = true;
|
|
|
+ LOG.debug("Server {} knows {} already, it is in the lastProposedView", self.getId(), sid);
|
|
|
+
|
|
|
if (connectOne(sid, lastProposedView.get(sid).electionAddr)) {
|
|
|
return;
|
|
|
}
|
|
@@ -821,7 +856,7 @@ public class QuorumCnxManager {
|
|
|
*/
|
|
|
public void softHalt() {
|
|
|
for (SendWorker sw : senderWorkerMap.values()) {
|
|
|
- LOG.debug("Halting sender: {}", sw);
|
|
|
+ LOG.debug("Server {} is soft-halting sender towards: {}", self.getId(), sw);
|
|
|
sw.finish();
|
|
|
}
|
|
|
}
|
|
@@ -925,6 +960,7 @@ public class QuorumCnxManager {
|
|
|
@Override
|
|
|
public void run() {
|
|
|
if (!shutdown) {
|
|
|
+ LOG.debug("Listener thread started, myId: {}", self.getId());
|
|
|
Set<InetSocketAddress> addresses;
|
|
|
|
|
|
if (self.getQuorumListenOnAllIPs()) {
|
|
@@ -977,7 +1013,7 @@ public class QuorumCnxManager {
|
|
|
* Halts this listener thread.
|
|
|
*/
|
|
|
void halt() {
|
|
|
- LOG.debug("Trying to close listeners");
|
|
|
+ LOG.debug("Halt called: Trying to close listeners");
|
|
|
if (listenerHandlers != null) {
|
|
|
LOG.debug("Closing listener: {}", QuorumCnxManager.this.mySid);
|
|
|
for (ListenerHandler handler : listenerHandlers) {
|
|
@@ -1044,12 +1080,12 @@ public class QuorumCnxManager {
|
|
|
while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
|
|
|
try {
|
|
|
serverSocket = createNewServerSocket();
|
|
|
- LOG.info("My election bind port: {}", address.toString());
|
|
|
+ LOG.info("{} is accepting connections now, my election bind port: {}", QuorumCnxManager.this.mySid, address.toString());
|
|
|
while (!shutdown) {
|
|
|
try {
|
|
|
client = serverSocket.accept();
|
|
|
setSockOpts(client);
|
|
|
- LOG.info("Received connection request {}", client.getRemoteSocketAddress());
|
|
|
+ LOG.info("Received connection request from {}", client.getRemoteSocketAddress());
|
|
|
// Receive and handle the connection request
|
|
|
// asynchronously if the quorum sasl authentication is
|
|
|
// enabled. This is required because sasl server
|
|
@@ -1173,7 +1209,7 @@ public class QuorumCnxManager {
|
|
|
}
|
|
|
|
|
|
synchronized boolean finish() {
|
|
|
- LOG.debug("Calling finish for {}", sid);
|
|
|
+ LOG.debug("Calling SendWorker.finish for {}", sid);
|
|
|
|
|
|
if (!running) {
|
|
|
/*
|
|
@@ -1240,6 +1276,7 @@ public class QuorumCnxManager {
|
|
|
LOG.error("Failed to send last message. Shutting down thread.", e);
|
|
|
this.finish();
|
|
|
}
|
|
|
+ LOG.debug("SendWorker thread started towards {}. myId: {}", sid, QuorumCnxManager.this.mySid);
|
|
|
|
|
|
try {
|
|
|
while (running && !shutdown && sock != null) {
|
|
@@ -1337,6 +1374,7 @@ public class QuorumCnxManager {
|
|
|
* @return boolean Value of variable running
|
|
|
*/
|
|
|
synchronized boolean finish() {
|
|
|
+ LOG.debug("RecvWorker.finish called. sid: {}. myId: {}", sid, QuorumCnxManager.this.mySid);
|
|
|
if (!running) {
|
|
|
/*
|
|
|
* Avoids running finish() twice.
|
|
@@ -1354,6 +1392,7 @@ public class QuorumCnxManager {
|
|
|
public void run() {
|
|
|
threadCnt.incrementAndGet();
|
|
|
try {
|
|
|
+ LOG.debug("RecvWorker thread towards {} started. myId: {}", sid, QuorumCnxManager.this.mySid);
|
|
|
while (running && !shutdown && sock != null) {
|
|
|
/**
|
|
|
* Reads the first int to determine the length of the
|
|
@@ -1377,7 +1416,7 @@ public class QuorumCnxManager {
|
|
|
QuorumCnxManager.this.mySid,
|
|
|
e);
|
|
|
} finally {
|
|
|
- LOG.warn("Interrupting SendWorker");
|
|
|
+ LOG.warn("Interrupting SendWorker thread from RecvWorker. sid: {}. myId: {}", sid, QuorumCnxManager.this.mySid);
|
|
|
sw.finish();
|
|
|
closeSocket(sock);
|
|
|
}
|