|
@@ -18,6 +18,7 @@
|
|
|
|
|
|
package org.apache.zookeeper.server.quorum;
|
|
|
|
|
|
+import java.io.BufferedInputStream;
|
|
|
import java.io.BufferedOutputStream;
|
|
|
import java.io.DataInputStream;
|
|
|
import java.io.DataOutputStream;
|
|
@@ -30,15 +31,24 @@ import java.net.SocketTimeoutException;
|
|
|
import java.nio.BufferUnderflowException;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.nio.channels.UnresolvedAddressException;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.Enumeration;
|
|
|
+import java.util.HashSet;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
import java.util.concurrent.ArrayBlockingQueue;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.SynchronousQueue;
|
|
|
+import java.util.concurrent.ThreadFactory;
|
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.NoSuchElementException;
|
|
|
import java.util.concurrent.atomic.AtomicInteger;
|
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
import org.apache.zookeeper.server.ZooKeeperThread;
|
|
|
+import org.apache.zookeeper.server.quorum.auth.QuorumAuthLearner;
|
|
|
+import org.apache.zookeeper.server.quorum.auth.QuorumAuthServer;
|
|
|
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -79,8 +89,8 @@ public class QuorumCnxManager {
|
|
|
/*
|
|
|
* Negative counter for observer server ids.
|
|
|
*/
|
|
|
-
|
|
|
- private long observerCounter = -1;
|
|
|
+
|
|
|
+ private AtomicLong observerCounter = new AtomicLong(-1);
|
|
|
|
|
|
/*
|
|
|
* Protocol identifier used among peers
|
|
@@ -97,11 +107,26 @@ public class QuorumCnxManager {
|
|
|
*/
|
|
|
|
|
|
private int cnxTO = 5000;
|
|
|
-
|
|
|
+
|
|
|
+ final QuorumPeer self;
|
|
|
+
|
|
|
/*
|
|
|
* Local IP address
|
|
|
*/
|
|
|
- final QuorumPeer self;
|
|
|
+ final long mySid;
|
|
|
+ final int socketTimeout;
|
|
|
+ final Map<Long, QuorumPeer.QuorumServer> view;
|
|
|
+ final boolean listenOnAllIPs;
|
|
|
+ private ThreadPoolExecutor connectionExecutor;
|
|
|
+ private final Set<Long> inprogressConnections = Collections
|
|
|
+ .synchronizedSet(new HashSet<Long>());
|
|
|
+ private QuorumAuthServer authServer;
|
|
|
+ private QuorumAuthLearner authLearner;
|
|
|
+ private boolean quorumSaslAuthEnabled;
|
|
|
+ /*
|
|
|
+ * Counter to count connection processing threads.
|
|
|
+ */
|
|
|
+ private AtomicInteger connectionThreadCnt = new AtomicInteger(0);
|
|
|
|
|
|
/*
|
|
|
* Mapping from Peer to Thread number
|
|
@@ -217,7 +242,15 @@ public class QuorumCnxManager {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public QuorumCnxManager(QuorumPeer self) {
|
|
|
+ public QuorumCnxManager(QuorumPeer self,
|
|
|
+ final long mySid,
|
|
|
+ Map<Long,QuorumPeer.QuorumServer> view,
|
|
|
+ QuorumAuthServer authServer,
|
|
|
+ QuorumAuthLearner authLearner,
|
|
|
+ int socketTimeout,
|
|
|
+ boolean listenOnAllIPs,
|
|
|
+ int quorumCnxnThreadsSize,
|
|
|
+ boolean quorumSaslAuthEnabled) {
|
|
|
this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
|
|
|
this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
|
|
|
this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
|
|
@@ -230,11 +263,54 @@ public class QuorumCnxManager {
|
|
|
|
|
|
this.self = self;
|
|
|
|
|
|
- // Starts listener thread that waits for connection requests
|
|
|
+ this.mySid = mySid;
|
|
|
+ this.socketTimeout = socketTimeout;
|
|
|
+ this.view = view;
|
|
|
+ this.listenOnAllIPs = listenOnAllIPs;
|
|
|
+
|
|
|
+ initializeAuth(mySid, authServer, authLearner, quorumCnxnThreadsSize,
|
|
|
+ quorumSaslAuthEnabled);
|
|
|
+
|
|
|
+ // Starts listener thread that waits for connection requests
|
|
|
listener = new Listener();
|
|
|
listener.setName("QuorumPeerListener");
|
|
|
}
|
|
|
|
|
|
+ private void initializeAuth(final long mySid,
|
|
|
+ final QuorumAuthServer authServer,
|
|
|
+ final QuorumAuthLearner authLearner,
|
|
|
+ final int quorumCnxnThreadsSize,
|
|
|
+ final boolean quorumSaslAuthEnabled) {
|
|
|
+ this.authServer = authServer;
|
|
|
+ this.authLearner = authLearner;
|
|
|
+ this.quorumSaslAuthEnabled = quorumSaslAuthEnabled;
|
|
|
+ if (!this.quorumSaslAuthEnabled) {
|
|
|
+ LOG.debug("Not initializing connection executor as quorum sasl auth is disabled");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ // init connection executors
|
|
|
+ final AtomicInteger threadIndex = new AtomicInteger(1);
|
|
|
+ SecurityManager s = System.getSecurityManager();
|
|
|
+ final ThreadGroup group = (s != null) ? s.getThreadGroup()
|
|
|
+ : Thread.currentThread().getThreadGroup();
|
|
|
+ ThreadFactory daemonThFactory = new ThreadFactory() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Thread newThread(Runnable r) {
|
|
|
+ Thread t = new Thread(group, r, "QuorumConnectionThread-"
|
|
|
+ + "[myid=" + mySid + "]-"
|
|
|
+ + threadIndex.getAndIncrement());
|
|
|
+ return t;
|
|
|
+ }
|
|
|
+ };
|
|
|
+ this.connectionExecutor = new ThreadPoolExecutor(3,
|
|
|
+ quorumCnxnThreadsSize, 60, TimeUnit.SECONDS,
|
|
|
+ new SynchronousQueue<Runnable>(), daemonThFactory);
|
|
|
+ this.connectionExecutor.allowCoreThreadTimeOut(true);
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
/**
|
|
|
* Invokes initiateConnection for testing purposes
|
|
|
*
|
|
@@ -247,17 +323,80 @@ public class QuorumCnxManager {
|
|
|
sock.connect(self.getVotingView().get(sid).electionAddr, cnxTO);
|
|
|
initiateConnection(sock, sid);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* If this server has initiated the connection, then it gives up on the
|
|
|
* connection if it loses challenge. Otherwise, it keeps the connection.
|
|
|
*/
|
|
|
- public boolean initiateConnection(Socket sock, Long sid) {
|
|
|
+ public void initiateConnection(final Socket sock, final Long sid) {
|
|
|
+ try {
|
|
|
+ startConnection(sock, sid);
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error("Exception while connecting, id: {}, addr: {}, closing learner connection",
|
|
|
+ new Object[] { sid, sock.getRemoteSocketAddress() }, e);
|
|
|
+ closeSocket(sock);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Server will initiate the connection request to its peer server
|
|
|
+ * asynchronously via separate connection thread.
|
|
|
+ */
|
|
|
+ public void initiateConnectionAsync(final Socket sock, final Long sid) {
|
|
|
+ if(!inprogressConnections.add(sid)){
|
|
|
+ // simply return as there is a connection request to
|
|
|
+ // server 'sid' already in progress.
|
|
|
+ LOG.debug("Connection request to server id: {} is already in progress, so skipping this request",
|
|
|
+ sid);
|
|
|
+ closeSocket(sock);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ connectionExecutor.execute(
|
|
|
+ new QuorumConnectionReqThread(sock, sid));
|
|
|
+ connectionThreadCnt.incrementAndGet();
|
|
|
+ } catch (Throwable e) {
|
|
|
+ // Imp: Safer side catching all type of exceptions and remove 'sid'
|
|
|
+ // from inprogress connections. This is to avoid blocking further
|
|
|
+ // connection requests from this 'sid' in case of errors.
|
|
|
+ inprogressConnections.remove(sid);
|
|
|
+ LOG.error("Exception while submitting quorum connection request", e);
|
|
|
+ closeSocket(sock);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Thread to send connection request to peer server.
|
|
|
+ */
|
|
|
+ private class QuorumConnectionReqThread extends ZooKeeperThread {
|
|
|
+ final Socket sock;
|
|
|
+ final Long sid;
|
|
|
+ QuorumConnectionReqThread(final Socket sock, final Long sid) {
|
|
|
+ super("QuorumConnectionReqThread-" + sid);
|
|
|
+ this.sock = sock;
|
|
|
+ this.sid = sid;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ try{
|
|
|
+ initiateConnection(sock, sid);
|
|
|
+ } finally {
|
|
|
+ inprogressConnections.remove(sid);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean startConnection(Socket sock, Long sid)
|
|
|
+ throws IOException {
|
|
|
+ DataOutputStream dout = null;
|
|
|
+ DataInputStream din = null;
|
|
|
try {
|
|
|
// Use BufferedOutputStream to reduce the number of IP packets. This is
|
|
|
// important for x-DC scenarios.
|
|
|
BufferedOutputStream buf = new BufferedOutputStream(sock.getOutputStream());
|
|
|
- DataOutputStream dout = new DataOutputStream(buf);
|
|
|
+ dout = new DataOutputStream(buf);
|
|
|
|
|
|
// Sending id and challenge
|
|
|
|
|
@@ -269,12 +408,22 @@ public class QuorumCnxManager {
|
|
|
dout.writeInt(addr_bytes.length);
|
|
|
dout.write(addr_bytes);
|
|
|
dout.flush();
|
|
|
+
|
|
|
+ din = new DataInputStream(
|
|
|
+ new BufferedInputStream(sock.getInputStream()));
|
|
|
} catch (IOException e) {
|
|
|
LOG.warn("Ignoring exception reading or writing challenge: ", e);
|
|
|
closeSocket(sock);
|
|
|
return false;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ // authenticate learner
|
|
|
+ QuorumPeer.QuorumServer qps = self.getVotingView().get(sid);
|
|
|
+ if (qps != null) {
|
|
|
+ // TODO - investigate why reconfig makes qps null.
|
|
|
+ authLearner.authenticate(sock, qps.hostname);
|
|
|
+ }
|
|
|
+
|
|
|
// If lost the challenge, then drop the new connection
|
|
|
if (sid > self.getId()) {
|
|
|
LOG.info("Have smaller server identifier, so dropping the " +
|
|
@@ -283,7 +432,7 @@ public class QuorumCnxManager {
|
|
|
// Otherwise proceed with the connection
|
|
|
} else {
|
|
|
SendWorker sw = new SendWorker(sock, sid);
|
|
|
- RecvWorker rw = new RecvWorker(sock, sid, sw);
|
|
|
+ RecvWorker rw = new RecvWorker(sock, din, sid, sw);
|
|
|
sw.setRecv(rw);
|
|
|
|
|
|
SendWorker vsw = senderWorkerMap.get(sid);
|
|
@@ -312,13 +461,58 @@ public class QuorumCnxManager {
|
|
|
* possible long value to lose the challenge.
|
|
|
*
|
|
|
*/
|
|
|
- public void receiveConnection(Socket sock) {
|
|
|
+ public void receiveConnection(final Socket sock) {
|
|
|
+ DataInputStream din = null;
|
|
|
+ try {
|
|
|
+ din = new DataInputStream(
|
|
|
+ new BufferedInputStream(sock.getInputStream()));
|
|
|
+
|
|
|
+ handleConnection(sock, din);
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error("Exception handling connection, addr: {}, closing server connection",
|
|
|
+ sock.getRemoteSocketAddress());
|
|
|
+ closeSocket(sock);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Server receives a connection request and handles it asynchronously via
|
|
|
+ * separate thread.
|
|
|
+ */
|
|
|
+ public void receiveConnectionAsync(final Socket sock) {
|
|
|
+ try {
|
|
|
+ connectionExecutor.execute(
|
|
|
+ new QuorumConnectionReceiverThread(sock));
|
|
|
+ connectionThreadCnt.incrementAndGet();
|
|
|
+ } catch (Throwable e) {
|
|
|
+ LOG.error("Exception handling connection, addr: {}, closing server connection",
|
|
|
+ sock.getRemoteSocketAddress());
|
|
|
+ closeSocket(sock);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Thread to receive connection request from peer server.
|
|
|
+ */
|
|
|
+ private class QuorumConnectionReceiverThread extends ZooKeeperThread {
|
|
|
+ private final Socket sock;
|
|
|
+ QuorumConnectionReceiverThread(final Socket sock) {
|
|
|
+ super("QuorumConnectionReceiverThread-" + sock.getRemoteSocketAddress());
|
|
|
+ this.sock = sock;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ receiveConnection(sock);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void handleConnection(Socket sock, DataInputStream din)
|
|
|
+ throws IOException {
|
|
|
Long sid = null, protocolVersion = null;
|
|
|
InetSocketAddress electionAddr = null;
|
|
|
|
|
|
try {
|
|
|
- DataInputStream din = new DataInputStream(sock.getInputStream());
|
|
|
-
|
|
|
protocolVersion = din.readLong();
|
|
|
if (protocolVersion >= 0) { // this is a server id and not a protocol version
|
|
|
sid = protocolVersion;
|
|
@@ -339,16 +533,18 @@ public class QuorumCnxManager {
|
|
|
* Choose identifier at random. We need a value to identify
|
|
|
* the connection.
|
|
|
*/
|
|
|
-
|
|
|
- sid = observerCounter--;
|
|
|
- LOG.info("Setting arbitrary identifier to observer: {}", sid);
|
|
|
+ sid = observerCounter.getAndDecrement();
|
|
|
+ LOG.info("Setting arbitrary identifier to observer: " + sid);
|
|
|
}
|
|
|
} catch (IOException e) {
|
|
|
closeSocket(sock);
|
|
|
LOG.warn("Exception reading or writing challenge: {}", e.toString());
|
|
|
return;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ // do authenticating learner
|
|
|
+ authServer.authenticate(sock, din);
|
|
|
+
|
|
|
//If wins the challenge, then close the new connection.
|
|
|
if (sid < self.getId()) {
|
|
|
/*
|
|
@@ -375,7 +571,7 @@ public class QuorumCnxManager {
|
|
|
|
|
|
} else { // Otherwise start worker threads to receive data.
|
|
|
SendWorker sw = new SendWorker(sock, sid);
|
|
|
- RecvWorker rw = new RecvWorker(sock, sid, sw);
|
|
|
+ RecvWorker rw = new RecvWorker(sock, din, sid, sw);
|
|
|
sw.setRecv(rw);
|
|
|
|
|
|
SendWorker vsw = senderWorkerMap.get(sid);
|
|
@@ -402,7 +598,7 @@ public class QuorumCnxManager {
|
|
|
/*
|
|
|
* If sending message to myself, then simply enqueue it (loopback).
|
|
|
*/
|
|
|
- if (self.getId() == sid) {
|
|
|
+ if (this.mySid == sid) {
|
|
|
b.position(0);
|
|
|
addToRecvQueue(new Message(b.duplicate(), sid));
|
|
|
/*
|
|
@@ -444,7 +640,15 @@ public class QuorumCnxManager {
|
|
|
setSockOpts(sock);
|
|
|
sock.connect(electionAddr, cnxTO);
|
|
|
LOG.debug("Connected to server " + sid);
|
|
|
- initiateConnection(sock, sid);
|
|
|
+ // Sends connection request asynchronously if the quorum
|
|
|
+ // sasl authentication is enabled. This is required because
|
|
|
+ // sasl server authentication process may take few seconds to
|
|
|
+ // finish, this may delay next peer connection requests.
|
|
|
+ if (quorumSaslAuthEnabled) {
|
|
|
+ initiateConnectionAsync(sock, sid);
|
|
|
+ } else {
|
|
|
+ initiateConnection(sock, sid);
|
|
|
+ }
|
|
|
return true;
|
|
|
} catch (UnresolvedAddressException e) {
|
|
|
// Sun doesn't include the address that causes this
|
|
@@ -547,6 +751,13 @@ public class QuorumCnxManager {
|
|
|
LOG.warn("Got interrupted before joining the listener", ex);
|
|
|
}
|
|
|
softHalt();
|
|
|
+
|
|
|
+ // clear data structures used for auth
|
|
|
+ if (connectionExecutor != null) {
|
|
|
+ connectionExecutor.shutdown();
|
|
|
+ }
|
|
|
+ inprogressConnections.clear();
|
|
|
+ resetConnectionThreadCount();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -595,11 +806,19 @@ public class QuorumCnxManager {
|
|
|
public long getThreadCount() {
|
|
|
return threadCnt.get();
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Return number of connection processing threads.
|
|
|
+ */
|
|
|
+ public long getConnectionThreadCount() {
|
|
|
+ return connectionThreadCnt.get();
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
- * Return reference to QuorumPeer
|
|
|
+ * Reset the value of connection processing threads count to zero.
|
|
|
*/
|
|
|
- public QuorumPeer getQuorumPeer() {
|
|
|
- return self;
|
|
|
+ private void resetConnectionThreadCount() {
|
|
|
+ connectionThreadCnt.set(0);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -645,7 +864,16 @@ public class QuorumCnxManager {
|
|
|
setSockOpts(client);
|
|
|
LOG.info("Received connection request "
|
|
|
+ client.getRemoteSocketAddress());
|
|
|
- receiveConnection(client);
|
|
|
+ // Receive and handle the connection request
|
|
|
+ // asynchronously if the quorum sasl authentication is
|
|
|
+ // enabled. This is required because sasl server
|
|
|
+ // authentication process may take few seconds to finish,
|
|
|
+ // this may delay next peer connection requests.
|
|
|
+ if (quorumSaslAuthEnabled) {
|
|
|
+ receiveConnectionAsync(client);
|
|
|
+ } else {
|
|
|
+ receiveConnection(client);
|
|
|
+ }
|
|
|
numRetries = 0;
|
|
|
} catch (SocketTimeoutException e) {
|
|
|
LOG.warn("The socket is listening for the election accepted "
|
|
@@ -695,7 +923,8 @@ public class QuorumCnxManager {
|
|
|
try{
|
|
|
LOG.debug("Trying to close listener: " + ss);
|
|
|
if(ss != null) {
|
|
|
- LOG.debug("Closing listener: " + self.getId());
|
|
|
+ LOG.debug("Closing listener: "
|
|
|
+ + QuorumCnxManager.this.mySid);
|
|
|
ss.close();
|
|
|
}
|
|
|
} catch (IOException e){
|
|
@@ -847,8 +1076,9 @@ public class QuorumCnxManager {
|
|
|
}
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
- LOG.warn("Exception when using channel: for id " + sid + " my id = " +
|
|
|
- self.getId() + " error = " + e);
|
|
|
+ LOG.warn("Exception when using channel: for id " + sid
|
|
|
+ + " my id = " + QuorumCnxManager.this.mySid
|
|
|
+ + " error = " + e);
|
|
|
}
|
|
|
this.finish();
|
|
|
LOG.warn("Send worker leaving thread " + " id " + sid + " my id = " + self.getId());
|
|
@@ -863,16 +1093,16 @@ public class QuorumCnxManager {
|
|
|
Long sid;
|
|
|
Socket sock;
|
|
|
volatile boolean running = true;
|
|
|
- DataInputStream din;
|
|
|
+ final DataInputStream din;
|
|
|
final SendWorker sw;
|
|
|
|
|
|
- RecvWorker(Socket sock, Long sid, SendWorker sw) {
|
|
|
+ RecvWorker(Socket sock, DataInputStream din, Long sid, SendWorker sw) {
|
|
|
super("RecvWorker:" + sid);
|
|
|
this.sid = sid;
|
|
|
this.sock = sock;
|
|
|
this.sw = sw;
|
|
|
+ this.din = din;
|
|
|
try {
|
|
|
- din = new DataInputStream(sock.getInputStream());
|
|
|
// OK to wait until socket disconnects while reading.
|
|
|
sock.setSoTimeout(0);
|
|
|
} catch (IOException e) {
|
|
@@ -925,8 +1155,8 @@ public class QuorumCnxManager {
|
|
|
addToRecvQueue(new Message(message.duplicate(), sid));
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
- LOG.warn("Connection broken for id " + sid + ", my id = " +
|
|
|
- self.getId() + ", error = " , e);
|
|
|
+ LOG.warn("Connection broken for id " + sid + ", my id = "
|
|
|
+ + QuorumCnxManager.this.mySid + ", error = " , e);
|
|
|
} finally {
|
|
|
LOG.warn("Interrupting SendWorker");
|
|
|
sw.finish();
|
|
@@ -1046,4 +1276,8 @@ public class QuorumCnxManager {
|
|
|
throws InterruptedException {
|
|
|
return recvQueue.poll(timeout, unit);
|
|
|
}
|
|
|
+
|
|
|
+ public boolean connectedToPeer(long peerSid) {
|
|
|
+ return senderWorkerMap.get(peerSid) != null;
|
|
|
+ }
|
|
|
}
|