|
@@ -23,6 +23,7 @@ import java.io.IOException;
|
|
|
import java.lang.Thread.UncaughtExceptionHandler;
|
|
|
import java.net.InetAddress;
|
|
|
import java.net.InetSocketAddress;
|
|
|
+import java.net.SocketAddress;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.nio.channels.SelectionKey;
|
|
|
import java.nio.channels.Selector;
|
|
@@ -39,8 +40,8 @@ import org.apache.jute.BinaryOutputArchive;
|
|
|
import org.apache.jute.Record;
|
|
|
import org.apache.log4j.Logger;
|
|
|
import org.apache.zookeeper.AsyncCallback.ACLCallback;
|
|
|
-import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
|
|
|
import org.apache.zookeeper.AsyncCallback.Children2Callback;
|
|
|
+import org.apache.zookeeper.AsyncCallback.ChildrenCallback;
|
|
|
import org.apache.zookeeper.AsyncCallback.DataCallback;
|
|
|
import org.apache.zookeeper.AsyncCallback.StatCallback;
|
|
|
import org.apache.zookeeper.AsyncCallback.StringCallback;
|
|
@@ -58,8 +59,8 @@ import org.apache.zookeeper.proto.ConnectResponse;
|
|
|
import org.apache.zookeeper.proto.CreateResponse;
|
|
|
import org.apache.zookeeper.proto.ExistsResponse;
|
|
|
import org.apache.zookeeper.proto.GetACLResponse;
|
|
|
-import org.apache.zookeeper.proto.GetChildrenResponse;
|
|
|
import org.apache.zookeeper.proto.GetChildren2Response;
|
|
|
+import org.apache.zookeeper.proto.GetChildrenResponse;
|
|
|
import org.apache.zookeeper.proto.GetDataResponse;
|
|
|
import org.apache.zookeeper.proto.ReplyHeader;
|
|
|
import org.apache.zookeeper.proto.RequestHeader;
|
|
@@ -167,15 +168,57 @@ public class ClientCnxn {
|
|
|
@Override
|
|
|
public String toString() {
|
|
|
StringBuffer sb = new StringBuffer();
|
|
|
- sb.append("sessionId: 0x").append(Long.toHexString(getSessionId()))
|
|
|
- .append("\n");
|
|
|
- sb.append("lastZxid: ").append(lastZxid).append("\n");
|
|
|
- sb.append("xid: ").append(xid).append("\n");
|
|
|
- sb.append("nextAddrToTry: ").append(nextAddrToTry).append("\n");
|
|
|
- sb.append("serverAddrs: ").append(serverAddrs.get(nextAddrToTry))
|
|
|
- .append("\n");
|
|
|
+
|
|
|
+ SocketAddress local = getLocalSocketAddress();
|
|
|
+ SocketAddress remote = getRemoteSocketAddress();
|
|
|
+ sb
|
|
|
+ .append("sessionid:0x").append(Long.toHexString(getSessionId()))
|
|
|
+ .append(" local:").append(local)
|
|
|
+ .append(" remoteserver:").append(remote)
|
|
|
+ .append(" lastZxid:").append(lastZxid)
|
|
|
+ .append(" xid:").append(xid)
|
|
|
+ .append(" sent:").append(sendThread.sentCount)
|
|
|
+ .append(" recv:").append(sendThread.recvCount)
|
|
|
+ .append(" queuedpkts:").append(outgoingQueue.size())
|
|
|
+ .append(" pendingresp:").append(pendingQueue.size())
|
|
|
+ .append(" queuedevents:").append(eventThread.waitingEvents.size());
|
|
|
+
|
|
|
return sb.toString();
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Returns the address to which the socket is connected.
|
|
|
+ * @return ip address of the remote side of the connection or null if
|
|
|
+ * not connected
|
|
|
+ */
|
|
|
+ SocketAddress getRemoteSocketAddress() {
|
|
|
+ // a lot could go wrong here, so rather than put in a bunch of code
|
|
|
+ // to check for nulls all down the chain let's do it the simple
|
|
|
+ // yet bulletproof way
|
|
|
+ try {
|
|
|
+ return ((SocketChannel)sendThread.sockKey.channel())
|
|
|
+ .socket().getRemoteSocketAddress();
|
|
|
+ } catch (NullPointerException e) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Returns the local address to which the socket is bound.
|
|
|
+ * @return ip address of the remote side of the connection or null if
|
|
|
+ * not connected
|
|
|
+ */
|
|
|
+ SocketAddress getLocalSocketAddress() {
|
|
|
+ // a lot could go wrong here, so rather than put in a bunch of code
|
|
|
+ // to check for nulls all down the chain let's do it the simple
|
|
|
+ // yet bulletproof way
|
|
|
+ try {
|
|
|
+ return ((SocketChannel)sendThread.sockKey.channel())
|
|
|
+ .socket().getLocalSocketAddress();
|
|
|
+ } catch (NullPointerException e) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
|
* This class allows us to pass the headers and the relevant records around.
|
|
@@ -608,6 +651,9 @@ public class ClientCnxn {
|
|
|
|
|
|
private long lastPingSentNs;
|
|
|
|
|
|
+ long sentCount = 0;
|
|
|
+ long recvCount = 0;
|
|
|
+
|
|
|
void readLength() throws IOException {
|
|
|
int len = incomingBuffer.getInt();
|
|
|
if (len < 0 || len >= packetLen) {
|
|
@@ -758,14 +804,15 @@ public class ClientCnxn {
|
|
|
+ Long.toHexString(sessionId)
|
|
|
+ ", likely server has closed socket");
|
|
|
}
|
|
|
- if (incomingBuffer.remaining() == 0) {
|
|
|
+ if (!incomingBuffer.hasRemaining()) {
|
|
|
incomingBuffer.flip();
|
|
|
if (incomingBuffer == lenBuffer) {
|
|
|
+ recvCount++;
|
|
|
readLength();
|
|
|
} else if (!initialized) {
|
|
|
readConnectResult();
|
|
|
enableRead();
|
|
|
- if (outgoingQueue.size() > 0) {
|
|
|
+ if (!outgoingQueue.isEmpty()) {
|
|
|
enableWrite();
|
|
|
}
|
|
|
lenBuffer.clear();
|
|
@@ -782,9 +829,11 @@ public class ClientCnxn {
|
|
|
}
|
|
|
if (sockKey.isWritable()) {
|
|
|
synchronized (outgoingQueue) {
|
|
|
- if (outgoingQueue.size() > 0) {
|
|
|
- sock.write(outgoingQueue.getFirst().bb);
|
|
|
- if (outgoingQueue.getFirst().bb.remaining() == 0) {
|
|
|
+ if (!outgoingQueue.isEmpty()) {
|
|
|
+ ByteBuffer pbb = outgoingQueue.getFirst().bb;
|
|
|
+ sock.write(pbb);
|
|
|
+ if (!pbb.hasRemaining()) {
|
|
|
+ sentCount++;
|
|
|
Packet p = outgoingQueue.removeFirst();
|
|
|
if (p.header != null
|
|
|
&& p.header.getType() != OpCode.ping
|
|
@@ -795,7 +844,7 @@ public class ClientCnxn {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- if (outgoingQueue.size() == 0) {
|
|
|
+ if (outgoingQueue.isEmpty()) {
|
|
|
disableWrite();
|
|
|
} else {
|
|
|
enableWrite();
|