|
@@ -58,8 +58,7 @@ public class ClientCnxnSocketNIO extends ClientCnxnSocket {
|
|
|
* @throws InterruptedException
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- boolean doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue) throws InterruptedException, IOException {
|
|
|
- boolean packetReceived = false;
|
|
|
+ void doIO(List<Packet> pendingQueue, LinkedList<Packet> outgoingQueue) throws InterruptedException, IOException {
|
|
|
SocketChannel sock = (SocketChannel) sockKey.channel();
|
|
|
if (sock == null) {
|
|
|
throw new IOException("Socket is null!");
|
|
@@ -85,19 +84,21 @@ public class ClientCnxnSocketNIO extends ClientCnxnSocket {
|
|
|
}
|
|
|
lenBuffer.clear();
|
|
|
incomingBuffer = lenBuffer;
|
|
|
- packetReceived = true;
|
|
|
+ updateLastHeard();
|
|
|
initialized = true;
|
|
|
} else {
|
|
|
sendThread.readResponse(incomingBuffer);
|
|
|
lenBuffer.clear();
|
|
|
incomingBuffer = lenBuffer;
|
|
|
- packetReceived = true;
|
|
|
+ updateLastHeard();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
if (sockKey.isWritable()) {
|
|
|
+ LinkedList<Packet> pending = new LinkedList<Packet>();
|
|
|
synchronized (outgoingQueue) {
|
|
|
if (!outgoingQueue.isEmpty()) {
|
|
|
+ updateLastSend();
|
|
|
ByteBuffer pbb = outgoingQueue.getFirst().bb;
|
|
|
sock.write(pbb);
|
|
|
if (!pbb.hasRemaining()) {
|
|
@@ -106,18 +107,15 @@ public class ClientCnxnSocketNIO extends ClientCnxnSocket {
|
|
|
if (p.requestHeader != null
|
|
|
&& p.requestHeader.getType() != OpCode.ping
|
|
|
&& p.requestHeader.getType() != OpCode.auth) {
|
|
|
- pendingQueue.add(p);
|
|
|
+ pending.add(p);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ synchronized(pendingQueue) {
|
|
|
+ pendingQueue.addAll(pending);
|
|
|
+ }
|
|
|
}
|
|
|
- if (outgoingQueue.isEmpty()) {
|
|
|
- disableWrite();
|
|
|
- } else {
|
|
|
- enableWrite();
|
|
|
- }
|
|
|
- return packetReceived;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -263,21 +261,16 @@ public class ClientCnxnSocketNIO extends ClientCnxnSocket {
|
|
|
sendThread.primeConnection();
|
|
|
}
|
|
|
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
|
|
|
- if (outgoingQueue.size() > 0) {
|
|
|
- // We have something to send so it's the same
|
|
|
- // as if we do the send now.
|
|
|
- updateLastSend();
|
|
|
- }
|
|
|
- if (doIO(pendingQueue, outgoingQueue)) {
|
|
|
- updateLastHeard();
|
|
|
- }
|
|
|
+ doIO(pendingQueue, outgoingQueue);
|
|
|
}
|
|
|
}
|
|
|
if (sendThread.getZkState().isConnected()) {
|
|
|
- if (outgoingQueue.size() > 0) {
|
|
|
- enableWrite();
|
|
|
- } else {
|
|
|
- disableWrite();
|
|
|
+ synchronized(outgoingQueue) {
|
|
|
+ if (!outgoingQueue.isEmpty()) {
|
|
|
+ enableWrite();
|
|
|
+ } else {
|
|
|
+ disableWrite();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
selected.clear();
|