|
@@ -77,25 +77,16 @@ public class NIOServerCnxn extends ServerCnxn {
|
|
|
|
|
|
private int sessionTimeout;
|
|
|
|
|
|
- private final ZooKeeperServer zkServer;
|
|
|
-
|
|
|
- /**
|
|
|
- * The number of requests that have been submitted but not yet responded to.
|
|
|
- */
|
|
|
- private final AtomicInteger outstandingRequests = new AtomicInteger(0);
|
|
|
-
|
|
|
/**
|
|
|
* This is the id that uniquely identifies the session of a client. Once
|
|
|
* this session is no longer active, the ephemeral nodes will go away.
|
|
|
*/
|
|
|
private long sessionId;
|
|
|
|
|
|
- private final int outstandingLimit;
|
|
|
-
|
|
|
public NIOServerCnxn(ZooKeeperServer zk, SocketChannel sock,
|
|
|
SelectionKey sk, NIOServerCnxnFactory factory,
|
|
|
SelectorThread selectorThread) throws IOException {
|
|
|
- this.zkServer = zk;
|
|
|
+ super(zk);
|
|
|
this.sock = sock;
|
|
|
this.sk = sk;
|
|
|
this.factory = factory;
|
|
@@ -103,11 +94,6 @@ public class NIOServerCnxn extends ServerCnxn {
|
|
|
if (this.factory.login != null) {
|
|
|
this.zooKeeperSaslServer = new ZooKeeperSaslServer(factory.login);
|
|
|
}
|
|
|
- if (zk != null) {
|
|
|
- outstandingLimit = zk.getGlobalOutstandingLimit();
|
|
|
- } else {
|
|
|
- outstandingLimit = 1;
|
|
|
- }
|
|
|
sock.socket().setTcpNoDelay(true);
|
|
|
/* set socket linger to false, so that socket close does not block */
|
|
|
sock.socket().setSoLinger(false, -1);
|
|
@@ -380,21 +366,6 @@ public class NIOServerCnxn extends ServerCnxn {
|
|
|
zkServer.processPacket(this, incomingBuffer);
|
|
|
}
|
|
|
|
|
|
- // Only called as callback from zkServer.processPacket()
|
|
|
- protected void incrOutstandingRequests(RequestHeader h) {
|
|
|
- if (h.getXid() >= 0) {
|
|
|
- outstandingRequests.incrementAndGet();
|
|
|
- // check throttling
|
|
|
- int inProcess = zkServer.getInProcess();
|
|
|
- if (inProcess > outstandingLimit) {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Throttling recv " + inProcess);
|
|
|
- }
|
|
|
- disableRecv();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
// returns whether we are interested in writing, which is determined
|
|
|
// by whether we have any pending buffers on the output queue or not
|
|
|
private boolean getWriteInterest() {
|
|
@@ -411,7 +382,9 @@ public class NIOServerCnxn extends ServerCnxn {
|
|
|
|
|
|
// Throttle acceptance of new requests. If this entailed a state change,
|
|
|
// register an interest op update request with the selector.
|
|
|
- public void disableRecv() {
|
|
|
+ //
|
|
|
+ // Don't support wait disable receive in NIO, ignore the parameter
|
|
|
+ public void disableRecv(boolean waitDisableRecv) {
|
|
|
if (throttled.compareAndSet(false, true)) {
|
|
|
requestInterestOpsUpdate();
|
|
|
}
|
|
@@ -566,10 +539,6 @@ public class NIOServerCnxn extends ServerCnxn {
|
|
|
return zkServer != null && zkServer.isRunning();
|
|
|
}
|
|
|
|
|
|
- public long getOutstandingRequests() {
|
|
|
- return outstandingRequests.get();
|
|
|
- }
|
|
|
-
|
|
|
/*
|
|
|
* (non-Javadoc)
|
|
|
*
|
|
@@ -689,13 +658,7 @@ public class NIOServerCnxn extends ServerCnxn {
|
|
|
public void sendResponse(ReplyHeader h, Record r, String tag) {
|
|
|
try {
|
|
|
super.sendResponse(h, r, tag);
|
|
|
- if (h.getXid() > 0) {
|
|
|
- // check throttling
|
|
|
- if (outstandingRequests.decrementAndGet() < 1 ||
|
|
|
- zkServer.getInProcess() < outstandingLimit) {
|
|
|
- enableRecv();
|
|
|
- }
|
|
|
- }
|
|
|
+ decrOutstandingAndCheckThrottle(h);
|
|
|
} catch(Exception e) {
|
|
|
LOG.warn("Unexpected exception. Destruction averted.", e);
|
|
|
}
|