|
@@ -26,16 +26,16 @@ import java.nio.ByteBuffer;
|
|
|
import java.nio.channels.SelectionKey;
|
|
|
import java.nio.channels.Selector;
|
|
|
import java.nio.channels.SocketChannel;
|
|
|
-import java.util.Iterator;
|
|
|
import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
+import java.util.ListIterator;
|
|
|
import java.util.Set;
|
|
|
|
|
|
-import org.slf4j.Logger;
|
|
|
-import org.slf4j.LoggerFactory;
|
|
|
import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
|
|
|
import org.apache.zookeeper.ClientCnxn.Packet;
|
|
|
import org.apache.zookeeper.ZooDefs.OpCode;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
|
|
public class ClientCnxnSocketNIO extends ClientCnxnSocket {
|
|
|
private static final Logger LOG = LoggerFactory
|
|
@@ -104,83 +104,85 @@ public class ClientCnxnSocketNIO extends ClientCnxnSocket {
|
|
|
}
|
|
|
}
|
|
|
if (sockKey.isWritable()) {
|
|
|
- LinkedList<Packet> pending = new LinkedList<Packet>();
|
|
|
- Packet p = null;
|
|
|
synchronized(outgoingQueue) {
|
|
|
- p = findSendablePacket(outgoingQueue,
|
|
|
+ Packet p = findSendablePacket(outgoingQueue,
|
|
|
cnxn.sendThread.clientTunneledAuthenticationInProgress());
|
|
|
|
|
|
if (p != null) {
|
|
|
- outgoingQueue.removeFirstOccurrence(p);
|
|
|
updateLastSend();
|
|
|
- if ((p.requestHeader != null) &&
|
|
|
- (p.requestHeader.getType() != OpCode.ping) &&
|
|
|
- (p.requestHeader.getType() != OpCode.auth)) {
|
|
|
- p.requestHeader.setXid(cnxn.getXid());
|
|
|
+ // If we already started writing p, p.bb will already exist
|
|
|
+ if (p.bb == null) {
|
|
|
+ if ((p.requestHeader != null) &&
|
|
|
+ (p.requestHeader.getType() != OpCode.ping) &&
|
|
|
+ (p.requestHeader.getType() != OpCode.auth)) {
|
|
|
+ p.requestHeader.setXid(cnxn.getXid());
|
|
|
+ }
|
|
|
+ p.createBB();
|
|
|
}
|
|
|
- p.createBB();
|
|
|
- ByteBuffer pbb = p.bb;
|
|
|
- sock.write(pbb);
|
|
|
- if (!pbb.hasRemaining()) {
|
|
|
+ sock.write(p.bb);
|
|
|
+ if (!p.bb.hasRemaining()) {
|
|
|
sentCount++;
|
|
|
+ outgoingQueue.removeFirstOccurrence(p);
|
|
|
if (p.requestHeader != null
|
|
|
&& p.requestHeader.getType() != OpCode.ping
|
|
|
&& p.requestHeader.getType() != OpCode.auth) {
|
|
|
- pending.add(p);
|
|
|
+ synchronized (pendingQueue) {
|
|
|
+ pendingQueue.add(p);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
- } else {
|
|
|
- // No suitable packet to send: turn off write interest flag.
|
|
|
+ }
|
|
|
+ if (outgoingQueue.isEmpty()) {
|
|
|
+ // No more packets to send: turn off write interest flag.
|
|
|
// Will be turned on later by a later call to enableWrite(),
|
|
|
// from within ZooKeeperSaslClient (if client is configured
|
|
|
// to attempt SASL authentication), or in either doIO() or
|
|
|
// in doTransport() if not.
|
|
|
disableWrite();
|
|
|
+ } else {
|
|
|
+ // Just in case
|
|
|
+ enableWrite();
|
|
|
}
|
|
|
}
|
|
|
- synchronized(pendingQueue) {
|
|
|
- pendingQueue.addAll(pending);
|
|
|
- }
|
|
|
-
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private Packet findSendablePacket(LinkedList<Packet> outgoingQueue,
|
|
|
boolean clientTunneledAuthenticationInProgress) {
|
|
|
synchronized (outgoingQueue) {
|
|
|
- if (!outgoingQueue.isEmpty()) {
|
|
|
- if (clientTunneledAuthenticationInProgress) {
|
|
|
- Packet p = null;
|
|
|
- // Since client's authentication with server is in progress,
|
|
|
- // send only the null-header packet queued by primeConnection().
|
|
|
- // This packet must be sent so that the SASL authentication process
|
|
|
- // can proceed, but all other packets should wait until
|
|
|
- // SASL authentication completes.
|
|
|
- Iterator<Packet> iter = outgoingQueue.listIterator();
|
|
|
- while(iter.hasNext()) {
|
|
|
- p = iter.next();
|
|
|
- if (p.requestHeader == null) {
|
|
|
- // We've found the priming-packet.
|
|
|
- return p;
|
|
|
- } else {
|
|
|
- // Non-priming packet: defer it until later, leaving it in the queue
|
|
|
- // until authentication completes.
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("deferring non-priming packet: " + p +
|
|
|
- "until SASL authentication completes.");
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- // no sendable packet found.
|
|
|
- return null;
|
|
|
+ if (outgoingQueue.isEmpty()) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ if (outgoingQueue.getFirst().bb != null // If we've already starting sending the first packet, we better finish
|
|
|
+ || !clientTunneledAuthenticationInProgress) {
|
|
|
+ return outgoingQueue.getFirst();
|
|
|
+ }
|
|
|
+
|
|
|
+ // Since client's authentication with server is in progress,
|
|
|
+ // send only the null-header packet queued by primeConnection().
|
|
|
+ // This packet must be sent so that the SASL authentication process
|
|
|
+ // can proceed, but all other packets should wait until
|
|
|
+ // SASL authentication completes.
|
|
|
+ ListIterator<Packet> iter = outgoingQueue.listIterator();
|
|
|
+ while (iter.hasNext()) {
|
|
|
+ Packet p = iter.next();
|
|
|
+ if (p.requestHeader == null) {
|
|
|
+ // We've found the priming-packet. Move it to the beginning of the queue.
|
|
|
+ iter.remove();
|
|
|
+ outgoingQueue.add(0, p);
|
|
|
+ return p;
|
|
|
} else {
|
|
|
- // Tunnelled authentication is not in progress: just
|
|
|
- // send the first packet in the queue.
|
|
|
- return outgoingQueue.getFirst();
|
|
|
+ // Non-priming packet: defer it until later, leaving it in the queue
|
|
|
+ // until authentication completes.
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("deferring non-priming packet: " + p +
|
|
|
+ "until SASL authentication completes.");
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
+ // no sendable packet found.
|
|
|
+ return null;
|
|
|
}
|
|
|
- return null;
|
|
|
}
|
|
|
|
|
|
@Override
|