|
@@ -395,7 +395,8 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- synchronized (factory) {
|
|
|
|
|
|
+
|
|
|
|
+ synchronized(this.factory){
|
|
sk.selector().wakeup();
|
|
sk.selector().wakeup();
|
|
if (LOG.isTraceEnabled()) {
|
|
if (LOG.isTraceEnabled()) {
|
|
LOG.trace("Add a buffer to outgoingBuffers, sk " + sk
|
|
LOG.trace("Add a buffer to outgoingBuffers, sk " + sk
|
|
@@ -406,6 +407,7 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
|
|
sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
} catch(Exception e) {
|
|
} catch(Exception e) {
|
|
LOG.error("Unexpected Exception: ", e);
|
|
LOG.error("Unexpected Exception: ", e);
|
|
}
|
|
}
|
|
@@ -564,7 +566,8 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
// ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, "after send,
|
|
// ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, "after send,
|
|
// outgoingBuffers.size() = " + outgoingBuffers.size());
|
|
// outgoingBuffers.size() = " + outgoingBuffers.size());
|
|
}
|
|
}
|
|
- synchronized (this) {
|
|
|
|
|
|
+
|
|
|
|
+ synchronized(this.factory){
|
|
if (outgoingBuffers.size() == 0) {
|
|
if (outgoingBuffers.size() == 0) {
|
|
if (!initialized
|
|
if (!initialized
|
|
&& (sk.interestOps() & SelectionKey.OP_READ) == 0) {
|
|
&& (sk.interestOps() & SelectionKey.OP_READ) == 0) {
|
|
@@ -574,7 +577,7 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
& (~SelectionKey.OP_WRITE));
|
|
& (~SelectionKey.OP_WRITE));
|
|
} else {
|
|
} else {
|
|
sk.interestOps(sk.interestOps()
|
|
sk.interestOps(sk.interestOps()
|
|
- | SelectionKey.OP_WRITE);
|
|
|
|
|
|
+ | SelectionKey.OP_WRITE);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -654,19 +657,19 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
}
|
|
}
|
|
if (h.getXid() >= 0) {
|
|
if (h.getXid() >= 0) {
|
|
synchronized (this) {
|
|
synchronized (this) {
|
|
- synchronized (this.factory) {
|
|
|
|
- outstandingRequests++;
|
|
|
|
- // check throttling
|
|
|
|
- if (zk.getInProcess() > factory.outstandingLimit) {
|
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debug("Throttling recv " + zk.getInProcess());
|
|
|
|
- }
|
|
|
|
- disableRecv();
|
|
|
|
- // following lines should not be needed since we are
|
|
|
|
- // already reading
|
|
|
|
- // } else {
|
|
|
|
- // enableRecv();
|
|
|
|
|
|
+ outstandingRequests++;
|
|
|
|
+ }
|
|
|
|
+ synchronized (this.factory) {
|
|
|
|
+ // check throttling
|
|
|
|
+ if (zk.getInProcess() > factory.outstandingLimit) {
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debug("Throttling recv " + zk.getInProcess());
|
|
}
|
|
}
|
|
|
|
+ disableRecv();
|
|
|
|
+ // following lines should not be needed since we are
|
|
|
|
+ // already reading
|
|
|
|
+ // } else {
|
|
|
|
+ // enableRecv();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -1312,9 +1315,11 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
bb.putInt(b.length - 4).rewind();
|
|
bb.putInt(b.length - 4).rewind();
|
|
sendBuffer(bb);
|
|
sendBuffer(bb);
|
|
if (h.getXid() > 0) {
|
|
if (h.getXid() > 0) {
|
|
- synchronized (this.factory) {
|
|
|
|
|
|
+ synchronized(this){
|
|
outstandingRequests--;
|
|
outstandingRequests--;
|
|
- // check throttling
|
|
|
|
|
|
+ }
|
|
|
|
+ // check throttling
|
|
|
|
+ synchronized (this.factory) {
|
|
if (zk.getInProcess() < factory.outstandingLimit
|
|
if (zk.getInProcess() < factory.outstandingLimit
|
|
|| outstandingRequests < 1) {
|
|
|| outstandingRequests < 1) {
|
|
sk.selector().wakeup();
|
|
sk.selector().wakeup();
|