|
@@ -19,25 +19,73 @@
|
|
|
package org.apache.zookeeper.server;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.io.PrintWriter;
|
|
|
import java.net.InetAddress;
|
|
|
import java.net.InetSocketAddress;
|
|
|
+import java.net.SocketException;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.nio.channels.SelectionKey;
|
|
|
import java.nio.channels.Selector;
|
|
|
import java.nio.channels.ServerSocketChannel;
|
|
|
import java.nio.channels.SocketChannel;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
+import java.util.Iterator;
|
|
|
+import java.util.Queue;
|
|
|
import java.util.Set;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.LinkedBlockingQueue;
|
|
|
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
-public class NIOServerCnxnFactory extends ServerCnxnFactory implements Runnable {
|
|
|
+/**
|
|
|
+ * NIOServerCnxnFactory implements a multi-threaded ServerCnxnFactory using
|
|
|
+ * NIO non-blocking socket calls. Communication between threads is handled via
|
|
|
+ * queues.
|
|
|
+ *
|
|
|
+ * - 1 accept thread, which accepts new connections and assigns to a
|
|
|
+ * selector thread
|
|
|
+ * - 1-N selector threads, each of which selects on 1/N of the connections.
|
|
|
+ * The reason the factory supports more than one selector thread is that
|
|
|
+ * with large numbers of connections, select() itself can become a
|
|
|
+ * performance bottleneck.
|
|
|
+ * - 0-M socket I/O worker threads, which perform basic socket reads and
|
|
|
+ * writes. If configured with 0 worker threads, the selector threads
|
|
|
+ * do the socket I/O directly.
|
|
|
+ * - 1 connection expiration thread, which closes idle connections; this is
|
|
|
+ * necessary to expire connections on which no session is established.
|
|
|
+ *
|
|
|
+ * Typical (default) thread counts are: on a 32 core machine, 1 accept thread,
|
|
|
+ * 1 connection expiration thread, 4 selector threads, and 64 worker threads.
|
|
|
+ */
|
|
|
+public class NIOServerCnxnFactory extends ServerCnxnFactory {
|
|
|
private static final Logger LOG = LoggerFactory.getLogger(NIOServerCnxnFactory.class);
|
|
|
|
|
|
+ /** Default sessionless connection timeout in ms: 10000 (10s) */
|
|
|
+ public static final String ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT =
|
|
|
+ "zookeeper.nio.sessionlessCnxnTimeout";
|
|
|
+ /**
|
|
|
+ * With 500 connections to an observer with watchers firing on each, a
|
|
|
+ * server is unable to exceed 1GigE rates with only 1 selector thread.
|
|
|
+ * Defaults to using 2 selector threads with 8 cores and 4 with 32 cores.
|
|
|
+ * Expressed as sqrt(numCores/2). Must have at least 1 selector thread.
|
|
|
+ */
|
|
|
+ public static final String ZOOKEEPER_NIO_NUM_SELECTOR_THREADS =
|
|
|
+ "zookeeper.nio.numSelectorThreads";
|
|
|
+ /** Default: 2 * numCores */
|
|
|
+ public static final String ZOOKEEPER_NIO_NUM_WORKER_THREADS =
|
|
|
+ "zookeeper.nio.numWorkerThreads";
|
|
|
+ /** Default: 64kB */
|
|
|
+ public static final String ZOOKEEPER_NIO_DIRECT_BUFFER_BYTES =
|
|
|
+ "zookeeper.nio.directBufferBytes";
|
|
|
+ /** Default worker pool shutdown timeout in ms: 5000 (5s) */
|
|
|
+ public static final String ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT =
|
|
|
+ "zookeeper.nio.shutdownTimeout";
|
|
|
+
|
|
|
static {
|
|
|
Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
|
|
|
public void uncaughtException(Thread t, Throwable e) {
|
|
@@ -54,47 +102,565 @@ public class NIOServerCnxnFactory extends ServerCnxnFactory implements Runnable
|
|
|
} catch(IOException ie) {
|
|
|
LOG.error("Selector failed to open", ie);
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Value of 0 disables use of direct buffers and instead uses
|
|
|
+ * gathered write call.
|
|
|
+ *
|
|
|
+ * Default to using 64k direct buffers.
|
|
|
+ */
|
|
|
+ directBufferBytes = Integer.getInteger(
|
|
|
+ ZOOKEEPER_NIO_DIRECT_BUFFER_BYTES, 64 * 1024);
|
|
|
}
|
|
|
|
|
|
- ServerSocketChannel ss;
|
|
|
+ /**
|
|
|
+ * AbstractSelectThread is an abstract base class containing a few bits
|
|
|
+ * of code shared by the AcceptThread (which selects on the listen socket)
|
|
|
+ * and SelectorThread (which selects on client connections) classes.
|
|
|
+ */
|
|
|
+ private abstract class AbstractSelectThread extends Thread {
|
|
|
+ protected final Selector selector;
|
|
|
+
|
|
|
+ public AbstractSelectThread(String name) throws IOException {
|
|
|
+ super(name);
|
|
|
+ // Allows the JVM to shutdown even if this thread is still running.
|
|
|
+ setDaemon(true);
|
|
|
+ this.selector = Selector.open();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void wakeupSelector() {
|
|
|
+ selector.wakeup();
|
|
|
+ }
|
|
|
+
|
|
|
+ protected void cleanupSelectionKey(SelectionKey key) {
|
|
|
+ if (key != null) {
|
|
|
+ try {
|
|
|
+ key.cancel();
|
|
|
+ } catch (Exception ex) {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("ignoring exception during selectionkey cancel", ex);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ protected void fastCloseSock(SocketChannel sc) {
|
|
|
+ if (sc != null) {
|
|
|
+ try {
|
|
|
+ // Hard close immediately, discarding buffers
|
|
|
+ sc.socket().setSoLinger(true, 0);
|
|
|
+ } catch (SocketException e) {
|
|
|
+ LOG.warn("Unable to set socket linger to 0, socket close"
|
|
|
+ + " may stall in CLOSE_WAIT", e);
|
|
|
+ }
|
|
|
+ NIOServerCnxn.closeSock(sc);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * There is a single AcceptThread which accepts new connections and assigns
|
|
|
+ * them to a SelectorThread using a simple round-robin scheme to spread
|
|
|
+ * them across the SelectorThreads. It enforces maximum number of
|
|
|
+ * connections per IP and attempts to cope with running out of file
|
|
|
+ * descriptors by briefly sleeping before retrying.
|
|
|
+ */
|
|
|
+ private class AcceptThread extends AbstractSelectThread {
|
|
|
+ private final ServerSocketChannel acceptSocket;
|
|
|
+ private final SelectionKey acceptKey;
|
|
|
+ private final RateLogger acceptErrorLogger = new RateLogger(LOG);
|
|
|
+ private final Collection<SelectorThread> selectorThreads;
|
|
|
+ private Iterator<SelectorThread> selectorIterator;
|
|
|
+
|
|
|
+ public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr,
|
|
|
+ Set<SelectorThread> selectorThreads) throws IOException {
|
|
|
+ super("NIOServerCxnFactory.AcceptThread:" + addr);
|
|
|
+ this.acceptSocket = ss;
|
|
|
+ this.acceptKey =
|
|
|
+ acceptSocket.register(selector, SelectionKey.OP_ACCEPT);
|
|
|
+ this.selectorThreads = Collections.unmodifiableList(
|
|
|
+ new ArrayList<SelectorThread>(selectorThreads));
|
|
|
+ selectorIterator = this.selectorThreads.iterator();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ while (!stopped && !acceptSocket.socket().isClosed()) {
|
|
|
+ try {
|
|
|
+ select();
|
|
|
+ } catch (RuntimeException e) {
|
|
|
+ LOG.warn("Ignoring unexpected runtime exception", e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.warn("Ignoring unexpected exception", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ // This will wake up the selector threads, and tell the
|
|
|
+ // worker thread pool to begin shutdown.
|
|
|
+ NIOServerCnxnFactory.this.stop();
|
|
|
+ LOG.info("accept thread exited run method");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void select() {
|
|
|
+ try {
|
|
|
+ selector.select();
|
|
|
+
|
|
|
+ Iterator<SelectionKey> selectedKeys =
|
|
|
+ selector.selectedKeys().iterator();
|
|
|
+ while (!stopped && selectedKeys.hasNext()) {
|
|
|
+ SelectionKey key = selectedKeys.next();
|
|
|
+ selectedKeys.remove();
|
|
|
+
|
|
|
+ if (!key.isValid()) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if (key.isAcceptable()) {
|
|
|
+ if (!doAccept()) {
|
|
|
+ // If unable to pull a new connection off the accept
|
|
|
+ // queue, pause accepting to give us time to free
|
|
|
+ // up file descriptors and so the accept thread
|
|
|
+ // doesn't spin in a tight loop.
|
|
|
+ pauseAccept(10);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ LOG.warn("Unexpected ops in accept select "
|
|
|
+ + key.readyOps());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.warn("Ignoring IOException while selecting", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Mask off the listen socket interest ops and use select() to sleep
|
|
|
+ * so that other threads can wake us up by calling wakeup() on the
|
|
|
+ * selector.
|
|
|
+ */
|
|
|
+ private void pauseAccept(long millisecs) {
|
|
|
+ acceptKey.interestOps(0);
|
|
|
+ try {
|
|
|
+ selector.select(millisecs);
|
|
|
+ } catch (IOException e) {
|
|
|
+ // ignore
|
|
|
+ } finally {
|
|
|
+ acceptKey.interestOps(SelectionKey.OP_ACCEPT);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Accept new socket connections. Enforces maximum number of connections
|
|
|
+ * per client IP address. Round-robin assigns to selector thread for
|
|
|
+ * handling. Returns whether pulled a connection off the accept queue
|
|
|
+ * or not. If encounters an error attempts to fast close the socket.
|
|
|
+ *
|
|
|
+ * @return whether was able to accept a connection or not
|
|
|
+ */
|
|
|
+ private boolean doAccept() {
|
|
|
+ boolean accepted = false;
|
|
|
+ SocketChannel sc = null;
|
|
|
+ try {
|
|
|
+ sc = acceptSocket.accept();
|
|
|
+ accepted = true;
|
|
|
+ InetAddress ia = sc.socket().getInetAddress();
|
|
|
+ int cnxncount = getClientCnxnCount(ia);
|
|
|
+
|
|
|
+ if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
|
|
|
+ throw new IOException("Too many connections from " + ia
|
|
|
+ + " - max is " + maxClientCnxns );
|
|
|
+ }
|
|
|
+
|
|
|
+ LOG.info("Accepted socket connection from "
|
|
|
+ + sc.socket().getRemoteSocketAddress());
|
|
|
+ sc.configureBlocking(false);
|
|
|
+
|
|
|
+ // Round-robin assign this connection to a selector thread
|
|
|
+ if (!selectorIterator.hasNext()) {
|
|
|
+ selectorIterator = selectorThreads.iterator();
|
|
|
+ }
|
|
|
+ SelectorThread selectorThread = selectorIterator.next();
|
|
|
+ if (!selectorThread.addAcceptedConnection(sc)) {
|
|
|
+ throw new IOException(
|
|
|
+ "Unable to add connection to selector queue"
|
|
|
+ + (stopped ? " (shutdown in progress)" : ""));
|
|
|
+ }
|
|
|
+ acceptErrorLogger.flush();
|
|
|
+ } catch (IOException e) {
|
|
|
+ // accept, maxClientCnxns, configureBlocking
|
|
|
+ acceptErrorLogger.rateLimitLog(
|
|
|
+ "Error accepting new connection: " + e.getMessage());
|
|
|
+ fastCloseSock(sc);
|
|
|
+ }
|
|
|
+ return accepted;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * The SelectorThread receives newly accepted connections from the
|
|
|
+ * AcceptThread and is responsible for selecting for I/O readiness
|
|
|
+ * across the connections. This thread is the only thread that performs
|
|
|
+ * any non-threadsafe or potentially blocking calls on the selector
|
|
|
+ * (registering new connections and reading/writing interest ops).
|
|
|
+ *
|
|
|
+ * Assignment of a connection to a SelectorThread is permanent and only
|
|
|
+ * one SelectorThread will ever interact with the connection. There are
|
|
|
+ * 1-N SelectorThreads, with connections evenly apportioned between the
|
|
|
+ * SelectorThreads.
|
|
|
+ *
|
|
|
+ * If there is a worker thread pool, when a connection has I/O to perform
|
|
|
+ * the SelectorThread removes it from selection by clearing its interest
|
|
|
+ * ops and schedules the I/O for processing by a worker thread. When the
|
|
|
+ * work is complete, the connection is placed on the ready queue to have
|
|
|
+ * its interest ops restored and resume selection.
|
|
|
+ *
|
|
|
+ * If there is no worker thread pool, the SelectorThread performs the I/O
|
|
|
+ * directly.
|
|
|
+ */
|
|
|
+ class SelectorThread extends AbstractSelectThread {
|
|
|
+ private final int id;
|
|
|
+ private final Queue<SocketChannel> acceptedQueue;
|
|
|
+ private final Queue<SelectionKey> updateQueue;
|
|
|
+
|
|
|
+ public SelectorThread(int id) throws IOException {
|
|
|
+ super("NIOServerCxnFactory.SelectorThread-" + id);
|
|
|
+ this.id = id;
|
|
|
+ acceptedQueue = new LinkedBlockingQueue<SocketChannel>();
|
|
|
+ updateQueue = new LinkedBlockingQueue<SelectionKey>();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Place new accepted connection onto a queue for adding. Do this
|
|
|
+ * so only the selector thread modifies what keys are registered
|
|
|
+ * with the selector.
|
|
|
+ */
|
|
|
+ public boolean addAcceptedConnection(SocketChannel accepted) {
|
|
|
+ if (stopped || !acceptedQueue.offer(accepted)) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ wakeupSelector();
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Place interest op update requests onto a queue so that only the
|
|
|
+ * selector thread modifies interest ops, because interest ops
|
|
|
+ * reads/sets are potentially blocking operations if other select
|
|
|
+ * operations are happening.
|
|
|
+ */
|
|
|
+ public boolean addInterestOpsUpdateRequest(SelectionKey sk) {
|
|
|
+ if (stopped || !updateQueue.offer(sk)) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ wakeupSelector();
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * The main loop for the thread selects() on the connections and
|
|
|
+ * dispatches ready I/O work requests, then registers all pending
|
|
|
+ * newly accepted connections and updates any interest ops on the
|
|
|
+ * queue.
|
|
|
+ */
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ while (!stopped) {
|
|
|
+ try {
|
|
|
+ select();
|
|
|
+ processAcceptedConnections();
|
|
|
+ processInterestOpsUpdateRequests();
|
|
|
+ } catch (RuntimeException e) {
|
|
|
+ LOG.warn("Ignoring unexpected runtime exception", e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.warn("Ignoring unexpected exception", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Close connections still pending on the selector. Any others
|
|
|
+ // with in-flight work, let drain out of the work queue.
|
|
|
+ for (SelectionKey key : selector.keys()) {
|
|
|
+ NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
|
|
|
+ if (cnxn.isSelectable()) {
|
|
|
+ cnxn.close();
|
|
|
+ }
|
|
|
+ cleanupSelectionKey(key);
|
|
|
+ }
|
|
|
+ SocketChannel accepted;
|
|
|
+ while ((accepted = acceptedQueue.poll()) != null) {
|
|
|
+ fastCloseSock(accepted);
|
|
|
+ }
|
|
|
+ updateQueue.clear();
|
|
|
+ } finally {
|
|
|
+ // This will wake up the accept thread and the other selector
|
|
|
+ // threads, and tell the worker thread pool to begin shutdown.
|
|
|
+ NIOServerCnxnFactory.this.stop();
|
|
|
+ LOG.info("selector thread exited run method");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void select() {
|
|
|
+ try {
|
|
|
+ selector.select();
|
|
|
+
|
|
|
+ Set<SelectionKey> selected = selector.selectedKeys();
|
|
|
+ ArrayList<SelectionKey> selectedList =
|
|
|
+ new ArrayList<SelectionKey>(selected);
|
|
|
+ Collections.shuffle(selectedList);
|
|
|
+ Iterator<SelectionKey> selectedKeys = selectedList.iterator();
|
|
|
+ while(!stopped && selectedKeys.hasNext()) {
|
|
|
+ SelectionKey key = selectedKeys.next();
|
|
|
+ selected.remove(key);
|
|
|
+
|
|
|
+ if (!key.isValid()) {
|
|
|
+ cleanupSelectionKey(key);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if (key.isReadable() || key.isWritable()) {
|
|
|
+ handleIO(key);
|
|
|
+ } else {
|
|
|
+ LOG.warn("Unexpected ops in select " + key.readyOps());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.warn("Ignoring IOException while selecting", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Schedule I/O for processing on the connection associated with
|
|
|
+ * the given SelectionKey. If a worker thread pool is not being used,
|
|
|
+ * I/O is run directly by this thread.
|
|
|
+ */
|
|
|
+ private void handleIO(SelectionKey key) {
|
|
|
+ IOWorkRequest workRequest = new IOWorkRequest(this, key);
|
|
|
+ NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
|
|
|
+
|
|
|
+ // Stop selecting this key while processing on its
|
|
|
+ // connection
|
|
|
+ cnxn.disableSelectable();
|
|
|
+ key.interestOps(0);
|
|
|
+ touchCnxn(cnxn);
|
|
|
+ workerPool.schedule(workRequest);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Iterate over the queue of accepted connections that have been
|
|
|
+ * assigned to this thread but not yet placed on the selector.
|
|
|
+ */
|
|
|
+ private void processAcceptedConnections() {
|
|
|
+ SocketChannel accepted;
|
|
|
+ while (!stopped && (accepted = acceptedQueue.poll()) != null) {
|
|
|
+ SelectionKey key = null;
|
|
|
+ try {
|
|
|
+ key = accepted.register(selector, SelectionKey.OP_READ);
|
|
|
+ NIOServerCnxn cnxn = createConnection(accepted, key, this);
|
|
|
+ key.attach(cnxn);
|
|
|
+ addCnxn(cnxn);
|
|
|
+ } catch (IOException e) {
|
|
|
+ // register, createConnection
|
|
|
+ cleanupSelectionKey(key);
|
|
|
+ fastCloseSock(accepted);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Iterate over the queue of connections ready to resume selection,
|
|
|
+ * and restore their interest ops selection mask. NOTE(review): an invalid key falls through to interestOps() after cleanupSelectionKey(), risking CancelledKeyException — consider adding a continue after cleanup.
|
|
|
+ */
|
|
|
+ private void processInterestOpsUpdateRequests() {
|
|
|
+ SelectionKey key;
|
|
|
+ while (!stopped && (key = updateQueue.poll()) != null) {
|
|
|
+ if (!key.isValid()) {
|
|
|
+ cleanupSelectionKey(key);
|
|
|
+ }
|
|
|
+ NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
|
|
|
+ if (cnxn.isSelectable()) {
|
|
|
+ key.interestOps(cnxn.getInterestOps());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * IOWorkRequest is a small wrapper class to allow doIO() calls to be
|
|
|
+ * run on a connection using a WorkerService.
|
|
|
+ */
|
|
|
+ private class IOWorkRequest extends WorkerService.WorkRequest {
|
|
|
+ private final SelectorThread selectorThread;
|
|
|
+ private final SelectionKey key;
|
|
|
+ private final NIOServerCnxn cnxn;
|
|
|
+
|
|
|
+ IOWorkRequest(SelectorThread selectorThread, SelectionKey key) {
|
|
|
+ this.selectorThread = selectorThread;
|
|
|
+ this.key = key;
|
|
|
+ this.cnxn = (NIOServerCnxn) key.attachment();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void doWork() throws InterruptedException {
|
|
|
+ if (!key.isValid()) {
|
|
|
+ selectorThread.cleanupSelectionKey(key);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (key.isReadable() || key.isWritable()) {
|
|
|
+ cnxn.doIO(key);
|
|
|
+
|
|
|
+ // Check if we shutdown or doIO() closed this connection
|
|
|
+ if (stopped) {
|
|
|
+ cnxn.close();
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (!key.isValid()) {
|
|
|
+ selectorThread.cleanupSelectionKey(key);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ touchCnxn(cnxn);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Mark this connection as once again ready for selection
|
|
|
+ cnxn.enableSelectable();
|
|
|
+ // Push an update request on the queue to resume selecting
|
|
|
+ // on the current set of interest ops, which may have changed
|
|
|
+ // as a result of the I/O operations we just performed.
|
|
|
+ if (!selectorThread.addInterestOpsUpdateRequest(key)) {
|
|
|
+ cnxn.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- final Selector selector = Selector.open();
|
|
|
+ @Override
|
|
|
+ public void cleanup() {
|
|
|
+ cnxn.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
- * We use this buffer to do efficient socket I/O. Since there is a single
|
|
|
- * sender thread per NIOServerCnxn instance, we can use a member variable to
|
|
|
- * only allocate it once.
|
|
|
- */
|
|
|
- final ByteBuffer directBuffer = ByteBuffer.allocateDirect(64 * 1024);
|
|
|
+ * This thread is responsible for closing stale connections so that
|
|
|
+ * connections on which no session is established are properly expired.
|
|
|
+ */
|
|
|
+ private class ConnectionExpirerThread extends Thread {
|
|
|
+ ConnectionExpirerThread() {
|
|
|
+ super("ConnectionExpirer");
|
|
|
+ }
|
|
|
|
|
|
- final HashMap<InetAddress, Set<NIOServerCnxn>> ipMap =
|
|
|
- new HashMap<InetAddress, Set<NIOServerCnxn>>( );
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ while (!stopped) {
|
|
|
+ long waitTime = cnxnExpiryQueue.getWaitTime();
|
|
|
+ if (waitTime > 0) {
|
|
|
+ Thread.sleep(waitTime);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ for (NIOServerCnxn conn : cnxnExpiryQueue.poll()) {
|
|
|
+ conn.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- int maxClientCnxns = 60;
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ LOG.info("ConnectionExpirerThread interrupted");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ ServerSocketChannel ss;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * We use this buffer to do efficient socket I/O. Because I/O is handled
|
|
|
+ * by the worker threads (or the selector threads directly, if no worker
|
|
|
+ * thread pool is created), we can create a fixed set of these to be
|
|
|
+ * shared by connections.
|
|
|
+ */
|
|
|
+ private static final ThreadLocal<ByteBuffer> directBuffer =
|
|
|
+ new ThreadLocal<ByteBuffer>() {
|
|
|
+ @Override protected ByteBuffer initialValue() {
|
|
|
+ return ByteBuffer.allocateDirect(directBufferBytes);
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ public static ByteBuffer getDirectBuffer() {
|
|
|
+ return directBufferBytes > 0 ? directBuffer.get() : null;
|
|
|
+ }
|
|
|
+
|
|
|
+ // sessionMap is used by closeSession()
|
|
|
+ private final ConcurrentHashMap<Long, NIOServerCnxn> sessionMap =
|
|
|
+ new ConcurrentHashMap<Long, NIOServerCnxn>();
|
|
|
+ // ipMap is used to limit connections per IP
|
|
|
+ private final ConcurrentHashMap<InetAddress, Set<NIOServerCnxn>> ipMap =
|
|
|
+ new ConcurrentHashMap<InetAddress, Set<NIOServerCnxn>>( );
|
|
|
+
|
|
|
+ protected int maxClientCnxns = 60;
|
|
|
+
|
|
|
+ int sessionlessCnxnTimeout;
|
|
|
+ private ExpiryQueue<NIOServerCnxn> cnxnExpiryQueue;
|
|
|
+
|
|
|
+
|
|
|
+ protected WorkerService workerPool;
|
|
|
+
|
|
|
+ private static int directBufferBytes;
|
|
|
+ private int numSelectorThreads;
|
|
|
+ private int numWorkerThreads;
|
|
|
+ private long workerShutdownTimeoutMS;
|
|
|
|
|
|
/**
|
|
|
* Construct a new server connection factory which will accept an unlimited number
|
|
|
* of concurrent connections from each client (up to the file descriptor
|
|
|
* limits of the operating system). startup(zks) must be called subsequently.
|
|
|
- * @throws IOException
|
|
|
*/
|
|
|
- public NIOServerCnxnFactory() throws IOException {
|
|
|
+ public NIOServerCnxnFactory() {
|
|
|
}
|
|
|
|
|
|
- Thread thread;
|
|
|
+ private volatile boolean stopped = true;
|
|
|
+ private ConnectionExpirerThread expirerThread;
|
|
|
+ private AcceptThread acceptThread;
|
|
|
+ private final Set<SelectorThread> selectorThreads =
|
|
|
+ new HashSet<SelectorThread>();
|
|
|
+
|
|
|
@Override
|
|
|
public void configure(InetSocketAddress addr, int maxcc) throws IOException {
|
|
|
configureSaslLogin();
|
|
|
|
|
|
- thread = new Thread(this, "NIOServerCxn.Factory:" + addr);
|
|
|
- thread.setDaemon(true);
|
|
|
maxClientCnxns = maxcc;
|
|
|
+ sessionlessCnxnTimeout = Integer.getInteger(
|
|
|
+ ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
|
|
|
+ // We also use the sessionlessCnxnTimeout as expiring interval for
|
|
|
+ // cnxnExpiryQueue. These don't need to be the same, but the expiring
|
|
|
+ // interval passed into the ExpiryQueue() constructor below should be
|
|
|
+ // less than or equal to the timeout.
|
|
|
+ cnxnExpiryQueue =
|
|
|
+ new ExpiryQueue<NIOServerCnxn>(sessionlessCnxnTimeout);
|
|
|
+ expirerThread = new ConnectionExpirerThread();
|
|
|
+
|
|
|
+ int numCores = Runtime.getRuntime().availableProcessors();
|
|
|
+ // 32 cores sweet spot seems to be 4 selector threads
|
|
|
+ numSelectorThreads = Integer.getInteger(
|
|
|
+ ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
|
|
|
+ Math.max((int) Math.sqrt((float) numCores/2), 1));
|
|
|
+ if (numSelectorThreads < 1) {
|
|
|
+ throw new IOException("numSelectorThreads must be at least 1");
|
|
|
+ }
|
|
|
+
|
|
|
+ numWorkerThreads = Integer.getInteger(
|
|
|
+ ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
|
|
|
+ workerShutdownTimeoutMS = Long.getLong(
|
|
|
+ ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);
|
|
|
+
|
|
|
+ LOG.info("Configuring NIO connection handler with "
|
|
|
+ + (sessionlessCnxnTimeout/1000) + "s sessionless connection"
|
|
|
+ + " timeout, " + numSelectorThreads + " selector thread(s), "
|
|
|
+ + (numWorkerThreads > 0 ? numWorkerThreads : "no")
|
|
|
+ + " worker threads, and "
|
|
|
+ + (directBufferBytes == 0 ? "gathered writes." :
|
|
|
+ ("" + (directBufferBytes/1024) + " kB direct buffers.")));
|
|
|
+ for(int i=0; i<numSelectorThreads; ++i) {
|
|
|
+ selectorThreads.add(new SelectorThread(i));
|
|
|
+ }
|
|
|
+
|
|
|
this.ss = ServerSocketChannel.open();
|
|
|
ss.socket().setReuseAddress(true);
|
|
|
LOG.info("binding to port " + addr);
|
|
|
ss.socket().bind(addr);
|
|
|
ss.configureBlocking(false);
|
|
|
- ss.register(selector, SelectionKey.OP_ACCEPT);
|
|
|
+ acceptThread = new AcceptThread(ss, addr, selectorThreads);
|
|
|
}
|
|
|
|
|
|
/** {@inheritDoc} */
|
|
@@ -109,9 +675,22 @@ public class NIOServerCnxnFactory extends ServerCnxnFactory implements Runnable
|
|
|
|
|
|
@Override
|
|
|
public void start() {
|
|
|
+ stopped = false;
|
|
|
+ if (workerPool == null) {
|
|
|
+ workerPool = new WorkerService(
|
|
|
+ "NIOWorker", numWorkerThreads, false);
|
|
|
+ }
|
|
|
+ for(SelectorThread thread : selectorThreads) {
|
|
|
+ if (thread.getState() == Thread.State.NEW) {
|
|
|
+ thread.start();
|
|
|
+ }
|
|
|
+ }
|
|
|
// ensure thread is started once and only once
|
|
|
- if (thread.getState() == Thread.State.NEW) {
|
|
|
- thread.start();
|
|
|
+ if (acceptThread.getState() == Thread.State.NEW) {
|
|
|
+ acceptThread.start();
|
|
|
+ }
|
|
|
+ if (expirerThread.getState() == Thread.State.NEW) {
|
|
|
+ expirerThread.start();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -134,94 +713,79 @@ public class NIOServerCnxnFactory extends ServerCnxnFactory implements Runnable
|
|
|
return ss.socket().getLocalPort();
|
|
|
}
|
|
|
|
|
|
- private void addCnxn(NIOServerCnxn cnxn) {
|
|
|
- synchronized (cnxns) {
|
|
|
- cnxns.add(cnxn);
|
|
|
- synchronized (ipMap){
|
|
|
- InetAddress addr = cnxn.sock.socket().getInetAddress();
|
|
|
- Set<NIOServerCnxn> s = ipMap.get(addr);
|
|
|
- if (s == null) {
|
|
|
- // in general we will see 1 connection from each
|
|
|
- // host, setting the initial cap to 2 allows us
|
|
|
- // to minimize mem usage in the common case
|
|
|
- // of 1 entry -- we need to set the initial cap
|
|
|
- // to 2 to avoid rehash when the first entry is added
|
|
|
- s = new HashSet<NIOServerCnxn>(2);
|
|
|
- s.add(cnxn);
|
|
|
- ipMap.put(addr,s);
|
|
|
- } else {
|
|
|
- s.add(cnxn);
|
|
|
- }
|
|
|
+ /**
|
|
|
+ * De-registers the connection from the various mappings maintained
|
|
|
+ * by the factory.
|
|
|
+ */
|
|
|
+ public boolean removeCnxn(NIOServerCnxn cnxn) {
|
|
|
+ // If the connection is not in the master list it's already been closed
|
|
|
+ if (!cnxns.remove(cnxn)) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ cnxnExpiryQueue.remove(cnxn);
|
|
|
+
|
|
|
+ long sessionId = cnxn.getSessionId();
|
|
|
+ if (sessionId != 0) {
|
|
|
+ sessionMap.remove(sessionId);
|
|
|
+ }
|
|
|
+
|
|
|
+ InetAddress addr = cnxn.getSocketAddress();
|
|
|
+ if (addr != null) {
|
|
|
+ Set<NIOServerCnxn> set = ipMap.get(addr);
|
|
|
+ if (set != null) {
|
|
|
+ set.remove(cnxn);
|
|
|
+ // Note that we make no effort here to remove empty mappings
|
|
|
+ // from ipMap.
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- protected NIOServerCnxn createConnection(SocketChannel sock,
|
|
|
- SelectionKey sk) throws IOException {
|
|
|
- return new NIOServerCnxn(zkServer, sock, sk, this);
|
|
|
+ // unregister from JMX
|
|
|
+ unregisterConnection(cnxn);
|
|
|
+ return true;
|
|
|
}
|
|
|
|
|
|
- private int getClientCnxnCount(InetAddress cl) {
|
|
|
- // The ipMap lock covers both the map, and its contents
|
|
|
- // (that is, the cnxn sets shouldn't be modified outside of
|
|
|
- // this lock)
|
|
|
- synchronized (ipMap) {
|
|
|
- Set<NIOServerCnxn> s = ipMap.get(cl);
|
|
|
- if (s == null) return 0;
|
|
|
- return s.size();
|
|
|
- }
|
|
|
+ /**
|
|
|
+ * Add or update cnxn in our cnxnExpiryQueue
|
|
|
+ * @param cnxn
|
|
|
+ */
|
|
|
+ public void touchCnxn(NIOServerCnxn cnxn) {
|
|
|
+ cnxnExpiryQueue.update(cnxn, cnxn.getSessionTimeout());
|
|
|
}
|
|
|
|
|
|
- public void run() {
|
|
|
- while (!ss.socket().isClosed()) {
|
|
|
- try {
|
|
|
- selector.select(1000);
|
|
|
- Set<SelectionKey> selected;
|
|
|
- synchronized (this) {
|
|
|
- selected = selector.selectedKeys();
|
|
|
- }
|
|
|
- ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
|
|
|
- selected);
|
|
|
- Collections.shuffle(selectedList);
|
|
|
- for (SelectionKey k : selectedList) {
|
|
|
- if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
|
|
|
- SocketChannel sc = ((ServerSocketChannel) k
|
|
|
- .channel()).accept();
|
|
|
- InetAddress ia = sc.socket().getInetAddress();
|
|
|
- int cnxncount = getClientCnxnCount(ia);
|
|
|
- if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns){
|
|
|
- LOG.warn("Too many connections from " + ia
|
|
|
- + " - max is " + maxClientCnxns );
|
|
|
- sc.close();
|
|
|
- } else {
|
|
|
- LOG.info("Accepted socket connection from "
|
|
|
- + sc.socket().getRemoteSocketAddress());
|
|
|
- sc.configureBlocking(false);
|
|
|
- SelectionKey sk = sc.register(selector,
|
|
|
- SelectionKey.OP_READ);
|
|
|
- NIOServerCnxn cnxn = createConnection(sc, sk);
|
|
|
- sk.attach(cnxn);
|
|
|
- addCnxn(cnxn);
|
|
|
- }
|
|
|
- } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
|
|
|
- NIOServerCnxn c = (NIOServerCnxn) k.attachment();
|
|
|
- c.doIO(k);
|
|
|
- } else {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Unexpected ops in select "
|
|
|
- + k.readyOps());
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- selected.clear();
|
|
|
- } catch (RuntimeException e) {
|
|
|
- LOG.warn("Ignoring unexpected runtime exception", e);
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.warn("Ignoring exception", e);
|
|
|
+ private void addCnxn(NIOServerCnxn cnxn) {
|
|
|
+ InetAddress addr = cnxn.getSocketAddress();
|
|
|
+ Set<NIOServerCnxn> set = ipMap.get(addr);
|
|
|
+ if (set == null) {
|
|
|
+ // in general we will see 1 connection from each
|
|
|
+ // host, setting the initial cap to 2 allows us
|
|
|
+ // to minimize mem usage in the common case
|
|
|
+ // of 1 entry -- we need to set the initial cap
|
|
|
+ // to 2 to avoid rehash when the first entry is added
|
|
|
+ // Construct a ConcurrentHashSet using a ConcurrentHashMap
|
|
|
+ set = Collections.newSetFromMap(
|
|
|
+ new ConcurrentHashMap<NIOServerCnxn, Boolean>(2));
|
|
|
+ // Put the new set in the map, but only if another thread
|
|
|
+ // hasn't beaten us to it
|
|
|
+ Set<NIOServerCnxn> existingSet = ipMap.putIfAbsent(addr, set);
|
|
|
+ if (existingSet != null) {
|
|
|
+ set = existingSet;
|
|
|
}
|
|
|
}
|
|
|
- closeAll();
|
|
|
- LOG.info("NIOServerCnxn factory exited run method");
|
|
|
+ set.add(cnxn);
|
|
|
+
|
|
|
+ cnxns.add(cnxn);
|
|
|
+ touchCnxn(cnxn);
|
|
|
+ }
|
|
|
+
|
|
|
+ protected NIOServerCnxn createConnection(SocketChannel sock,
|
|
|
+ SelectionKey sk, SelectorThread selectorThread) throws IOException {
|
|
|
+ return new NIOServerCnxn(zkServer, sock, sk, this, selectorThread);
|
|
|
+ }
|
|
|
+
|
|
|
+ private int getClientCnxnCount(InetAddress cl) {
|
|
|
+ Set<NIOServerCnxn> s = ipMap.get(cl);
|
|
|
+ if (s == null) return 0;
|
|
|
+ return s.size();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -230,30 +794,54 @@ public class NIOServerCnxnFactory extends ServerCnxnFactory implements Runnable
|
|
|
*/
|
|
|
@Override
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- synchronized public void closeAll() {
|
|
|
- selector.wakeup();
|
|
|
- HashSet<NIOServerCnxn> cnxns;
|
|
|
- synchronized (this.cnxns) {
|
|
|
- cnxns = (HashSet<NIOServerCnxn>)this.cnxns.clone();
|
|
|
- }
|
|
|
- // got to clear all the connections that we have in the selector
|
|
|
- for (NIOServerCnxn cnxn: cnxns) {
|
|
|
+ public void closeAll() {
|
|
|
+ // clear all the connections on which we are selecting
|
|
|
+ for (ServerCnxn cnxn : cnxns) {
|
|
|
try {
|
|
|
- // don't hold this.cnxns lock as deadlock may occur
|
|
|
+ // This will remove the cnxn from cnxns
|
|
|
cnxn.close();
|
|
|
} catch (Exception e) {
|
|
|
LOG.warn("Ignoring exception closing cnxn sessionid 0x"
|
|
|
- + Long.toHexString(cnxn.sessionId), e);
|
|
|
+ + Long.toHexString(cnxn.getSessionId()), e);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void shutdown() {
|
|
|
+ public void stop() {
|
|
|
+ stopped = true;
|
|
|
+
|
|
|
+ // Stop queuing connection attempts
|
|
|
try {
|
|
|
ss.close();
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.warn("Error closing listen socket", e);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (acceptThread != null) {
|
|
|
+ acceptThread.wakeupSelector();
|
|
|
+ }
|
|
|
+ if (expirerThread != null) {
|
|
|
+ expirerThread.interrupt();
|
|
|
+ }
|
|
|
+ for (SelectorThread thread : selectorThreads) {
|
|
|
+ thread.wakeupSelector();
|
|
|
+ }
|
|
|
+ if (workerPool != null) {
|
|
|
+ workerPool.stop();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void shutdown() {
|
|
|
+ try {
|
|
|
+ // close listen socket and signal selector threads to stop
|
|
|
+ stop();
|
|
|
+
|
|
|
+ // wait for selector and worker threads to shutdown
|
|
|
+ join();
|
|
|
+
|
|
|
+ // close all open connections
|
|
|
closeAll();
|
|
|
- thread.interrupt();
|
|
|
- thread.join();
|
|
|
+
|
|
|
if (login != null) {
|
|
|
login.shutdown();
|
|
|
}
|
|
@@ -262,48 +850,45 @@ public class NIOServerCnxnFactory extends ServerCnxnFactory implements Runnable
|
|
|
} catch (Exception e) {
|
|
|
LOG.warn("Ignoring unexpected exception during shutdown", e);
|
|
|
}
|
|
|
- try {
|
|
|
- selector.close();
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.warn("Selector closing", e);
|
|
|
- }
|
|
|
+
|
|
|
if (zkServer != null) {
|
|
|
zkServer.shutdown();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public synchronized void closeSession(long sessionId) {
|
|
|
- selector.wakeup();
|
|
|
- closeSessionWithoutWakeup(sessionId);
|
|
|
+ public void addSession(long sessionId, NIOServerCnxn cnxn) {
|
|
|
+ sessionMap.put(sessionId, cnxn);
|
|
|
}
|
|
|
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- private void closeSessionWithoutWakeup(long sessionId) {
|
|
|
- HashSet<NIOServerCnxn> cnxns;
|
|
|
- synchronized (this.cnxns) {
|
|
|
- cnxns = (HashSet<NIOServerCnxn>)this.cnxns.clone();
|
|
|
- }
|
|
|
-
|
|
|
- for (NIOServerCnxn cnxn : cnxns) {
|
|
|
- if (cnxn.getSessionId() == sessionId) {
|
|
|
- try {
|
|
|
- cnxn.close();
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.warn("exception during session close", e);
|
|
|
- }
|
|
|
- break;
|
|
|
- }
|
|
|
+ @Override
|
|
|
+ public void closeSession(long sessionId) {
|
|
|
+ NIOServerCnxn cnxn = sessionMap.remove(sessionId);
|
|
|
+ if (cnxn != null) {
|
|
|
+ cnxn.close();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void join() throws InterruptedException {
|
|
|
- thread.join();
|
|
|
+ if (acceptThread != null) {
|
|
|
+ acceptThread.join();
|
|
|
+ }
|
|
|
+ for (SelectorThread thread : selectorThreads) {
|
|
|
+ thread.join();
|
|
|
+ }
|
|
|
+ if (workerPool != null) {
|
|
|
+ workerPool.join(workerShutdownTimeoutMS);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public Iterable<ServerCnxn> getConnections() {
|
|
|
return cnxns;
|
|
|
}
|
|
|
+
|
|
|
+ public void dumpConnections(PrintWriter pwriter) {
|
|
|
+ pwriter.print("Connections ");
|
|
|
+ cnxnExpiryQueue.dump(pwriter);
|
|
|
+ }
|
|
|
+
|
|
|
}
|