|
@@ -32,13 +32,14 @@ import java.nio.channels.ServerSocketChannel;
|
|
|
import java.nio.channels.SocketChannel;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collections;
|
|
|
+import java.util.Date;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.Iterator;
|
|
|
-import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
|
+import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
import org.apache.jute.BinaryInputArchive;
|
|
|
import org.apache.jute.BinaryOutputArchive;
|
|
@@ -334,8 +335,6 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
|
|
|
ArrayList<Id> authInfo = new ArrayList<Id>();
|
|
|
|
|
|
- LinkedList<Request> outstanding = new LinkedList<Request>();
|
|
|
-
|
|
|
/* Send close connection packet to the client, doIO will eventually
|
|
|
* close the underlying machinery (like socket, selectorkey, etc...)
|
|
|
*/
|
|
@@ -388,7 +387,7 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
public EndOfStreamException(String msg) {
|
|
|
super(msg);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public String toString() {
|
|
|
return "EndOfStreamException: " + getMessage();
|
|
|
}
|
|
@@ -418,7 +417,7 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
incomingBuffer = lenBuffer;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
void doIO(SelectionKey k) throws InterruptedException {
|
|
|
try {
|
|
|
if (sock == null) {
|
|
@@ -701,14 +700,14 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
}
|
|
|
|
|
|
private void packetReceived() {
|
|
|
- stats.packetsReceived++;
|
|
|
+ stats.incrPacketsReceived();
|
|
|
if (zk != null) {
|
|
|
zk.serverStats().incrementPacketsReceived();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private void packetSent() {
|
|
|
- stats.packetsSent++;
|
|
|
+ stats.incrPacketsSent();
|
|
|
if (zk != null) {
|
|
|
zk.serverStats().incrementPacketsSent();
|
|
|
}
|
|
@@ -779,41 +778,48 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
}
|
|
|
k.interestOps(SelectionKey.OP_WRITE);
|
|
|
return true;
|
|
|
- } else if (len == reqsCmd) {
|
|
|
- LOG.info("Processing reqs command from "
|
|
|
+ } else if (len == statCmd || len == srvrCmd) {
|
|
|
+ LOG.info("Processing " + (len == statCmd ? "stat" : "srvr")
|
|
|
+ + " command from "
|
|
|
+ sock.socket().getRemoteSocketAddress());
|
|
|
packetReceived();
|
|
|
|
|
|
StringBuffer sb = new StringBuffer();
|
|
|
- sb.append("Requests:\n");
|
|
|
- synchronized (outstanding) {
|
|
|
- for (Request r : outstanding) {
|
|
|
- sb.append(r.toString());
|
|
|
- sb.append('\n');
|
|
|
+ if (zk != null){
|
|
|
+ sb.append("Zookeeper version: ").append(Version.getFullVersion())
|
|
|
+ .append("\n");
|
|
|
+ if (len == statCmd) {
|
|
|
+ sb.append("Clients:\n");
|
|
|
+ synchronized(factory.cnxns){
|
|
|
+ for(NIOServerCnxn c : factory.cnxns){
|
|
|
+ sb.append(((CnxnStats)c.getStats()).toString(true));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ sb.append("\n");
|
|
|
}
|
|
|
+ sb.append(zk.serverStats().toString());
|
|
|
+ sb.append("Node count: ").append(zk.dataTree.getNodeCount()).
|
|
|
+ append("\n");
|
|
|
+ } else {
|
|
|
+ sb.append("ZooKeeperServer not running\n");
|
|
|
}
|
|
|
+
|
|
|
sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
|
|
|
k.interestOps(SelectionKey.OP_WRITE);
|
|
|
return true;
|
|
|
- } else if (len == statCmd) {
|
|
|
- LOG.info("Processing stat command from "
|
|
|
+ } else if (len == consCmd) {
|
|
|
+ LOG.info("Processing cons command from "
|
|
|
+ sock.socket().getRemoteSocketAddress());
|
|
|
packetReceived();
|
|
|
|
|
|
StringBuffer sb = new StringBuffer();
|
|
|
- if(zk != null){
|
|
|
- sb.append("Zookeeper version: ").append(Version.getFullVersion())
|
|
|
- .append("\n");
|
|
|
- sb.append("Clients:\n");
|
|
|
+ if (zk != null){
|
|
|
synchronized(factory.cnxns){
|
|
|
for(NIOServerCnxn c : factory.cnxns){
|
|
|
- sb.append(c.getStats().toString());
|
|
|
+ sb.append(((CnxnStats)c.getStats()).toString(false));
|
|
|
}
|
|
|
}
|
|
|
sb.append("\n");
|
|
|
- sb.append(zk.serverStats().toString());
|
|
|
- sb.append("Node count: ").append(zk.dataTree.getNodeCount()).
|
|
|
- append("\n");
|
|
|
} else {
|
|
|
sb.append("ZooKeeperServer not running\n");
|
|
|
}
|
|
@@ -846,7 +852,21 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
|
|
|
zk.serverStats().reset();
|
|
|
|
|
|
- sendBuffer(ByteBuffer.wrap("Stats reset.\n".getBytes()));
|
|
|
+ sendBuffer(ByteBuffer.wrap("Server stats reset.\n".getBytes()));
|
|
|
+ k.interestOps(SelectionKey.OP_WRITE);
|
|
|
+ return true;
|
|
|
+ } else if (len == crstCmd) {
|
|
|
+ LOG.info("Processing crst command from "
|
|
|
+ + sock.socket().getRemoteSocketAddress());
|
|
|
+ packetReceived();
|
|
|
+
|
|
|
+ synchronized(factory.cnxns){
|
|
|
+ for(NIOServerCnxn c : factory.cnxns){
|
|
|
+ c.getStats().reset();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ sendBuffer(ByteBuffer.wrap("Connection stats reset.\n".getBytes()));
|
|
|
k.interestOps(SelectionKey.OP_WRITE);
|
|
|
return true;
|
|
|
}
|
|
@@ -1146,13 +1166,77 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
return (InetSocketAddress) sock.socket().getRemoteSocketAddress();
|
|
|
}
|
|
|
|
|
|
- private class CnxnStats implements ServerCnxn.Stats{
|
|
|
- long packetsReceived;
|
|
|
- long packetsSent;
|
|
|
+ class CnxnStats implements ServerCnxn.Stats {
|
|
|
+ private final Date established = new Date();
|
|
|
+
|
|
|
+ private final AtomicLong packetsReceived = new AtomicLong();
|
|
|
+ private final AtomicLong packetsSent = new AtomicLong();
|
|
|
+
|
|
|
+ private long minLatency;
|
|
|
+ private long maxLatency;
|
|
|
+ private String lastOp;
|
|
|
+ private long lastCxid;
|
|
|
+ private long lastZxid;
|
|
|
+ private long lastResponseTime;
|
|
|
+ private long lastLatency;
|
|
|
+
|
|
|
+ private long count;
|
|
|
+ private long totalLatency;
|
|
|
+
|
|
|
+ CnxnStats() {
|
|
|
+ reset();
|
|
|
+ }
|
|
|
+
|
|
|
+ public synchronized void reset() {
|
|
|
+ packetsReceived.set(0);
|
|
|
+ packetsSent.set(0);
|
|
|
+ minLatency = Long.MAX_VALUE;
|
|
|
+ maxLatency = 0;
|
|
|
+ lastOp = "NA";
|
|
|
+ lastCxid = -1;
|
|
|
+ lastZxid = -1;
|
|
|
+ lastResponseTime = 0;
|
|
|
+ lastLatency = 0;
|
|
|
+
|
|
|
+ count = 0;
|
|
|
+ totalLatency = 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ long incrPacketsReceived() {
|
|
|
+ return packetsReceived.incrementAndGet();
|
|
|
+ }
|
|
|
+
|
|
|
+ long incrPacketsSent() {
|
|
|
+ return packetsSent.incrementAndGet();
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized void updateForResponse(long cxid, long zxid, String op,
|
|
|
+ long start, long end)
|
|
|
+ {
|
|
|
+ // don't overwrite with "special" xids - we're interested
|
|
|
+ // in the clients last real operation
|
|
|
+ if (cxid >= 0) {
|
|
|
+ lastCxid = cxid;
|
|
|
+ }
|
|
|
+ lastZxid = zxid;
|
|
|
+ lastOp = op;
|
|
|
+ lastResponseTime = end;
|
|
|
+ long elapsed = end - start;
|
|
|
+ lastLatency = elapsed;
|
|
|
+ if (elapsed < minLatency) {
|
|
|
+ minLatency = elapsed;
|
|
|
+ }
|
|
|
+ if (elapsed > maxLatency) {
|
|
|
+ maxLatency = elapsed;
|
|
|
+ }
|
|
|
+ count++;
|
|
|
+ totalLatency += elapsed;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Date getEstablished() {
|
|
|
+ return established;
|
|
|
+ }
|
|
|
|
|
|
- /**
|
|
|
- * The number of requests that have been submitted but not yet responded to.
|
|
|
- */
|
|
|
public long getOutstandingRequests() {
|
|
|
synchronized (NIOServerCnxn.this) {
|
|
|
synchronized (NIOServerCnxn.this.factory) {
|
|
@@ -1162,16 +1246,61 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
}
|
|
|
|
|
|
public long getPacketsReceived() {
|
|
|
- return packetsReceived;
|
|
|
+ return packetsReceived.longValue();
|
|
|
}
|
|
|
|
|
|
public long getPacketsSent() {
|
|
|
- return packetsSent;
|
|
|
+ return packetsSent.longValue();
|
|
|
+ }
|
|
|
+
|
|
|
+ public synchronized long getMinLatency() {
|
|
|
+ return minLatency == Long.MAX_VALUE ? 0 : minLatency;
|
|
|
+ }
|
|
|
+
|
|
|
+ public synchronized long getAvgLatency() {
|
|
|
+ return count == 0 ? 0 : totalLatency / count;
|
|
|
+ }
|
|
|
+
|
|
|
+ public synchronized long getMaxLatency() {
|
|
|
+ return maxLatency;
|
|
|
+ }
|
|
|
+
|
|
|
+ public synchronized String getLastOperation() {
|
|
|
+ return lastOp;
|
|
|
}
|
|
|
|
|
|
+ public synchronized long getLastCxid() {
|
|
|
+ return lastCxid;
|
|
|
+ }
|
|
|
+
|
|
|
+ public synchronized long getLastZxid() {
|
|
|
+ return lastZxid;
|
|
|
+ }
|
|
|
+
|
|
|
+ public synchronized long getLastResponseTime() {
|
|
|
+ return lastResponseTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ public synchronized long getLastLatency() {
|
|
|
+ return lastLatency;
|
|
|
+ }
|
|
|
+
|
|
|
+ /** Prints brief stats information for the connection.
|
|
|
+ *
|
|
|
+ * @see toString(boolean) for detailed stats
|
|
|
+ */
|
|
|
@Override
|
|
|
- public String toString(){
|
|
|
- StringBuilder sb=new StringBuilder();
|
|
|
+ public String toString() {
|
|
|
+ return toString(false);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Print information about the connection.
|
|
|
+ * @param brief iff true prints brief details, otw full detail
|
|
|
+ * @return information about this connection
|
|
|
+ */
|
|
|
+ public String toString(boolean brief) {
|
|
|
+ StringBuilder sb = new StringBuilder();
|
|
|
Channel channel = sk.channel();
|
|
|
if (channel instanceof SocketChannel) {
|
|
|
sb.append(" ").append(((SocketChannel)channel).socket()
|
|
@@ -1179,15 +1308,37 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
.append("[").append(Integer.toHexString(sk.interestOps()))
|
|
|
.append("](queued=").append(getOutstandingRequests())
|
|
|
.append(",recved=").append(getPacketsReceived())
|
|
|
- .append(",sent=").append(getPacketsSent()).append(")\n");
|
|
|
+ .append(",sent=").append(getPacketsSent());
|
|
|
+
|
|
|
+ if (!brief) {
|
|
|
+ long sessionId = getSessionId();
|
|
|
+ if (sessionId != 0) {
|
|
|
+ sb.append(",sid=0x").append(Long.toHexString(sessionId))
|
|
|
+ .append(",lop=").append(getLastOperation())
|
|
|
+ .append(",est=").append(getEstablished().getTime())
|
|
|
+ .append(",to=").append(getSessionTimeout());
|
|
|
+ long lastCxid = getLastCxid();
|
|
|
+ if (lastCxid >= 0) {
|
|
|
+ sb.append(",lcxid=0x")
|
|
|
+ .append(Long.toHexString(lastCxid));
|
|
|
+ }
|
|
|
+ sb.append(",lzxid=0x")
|
|
|
+ .append(Long.toHexString(getLastZxid()))
|
|
|
+ .append(",lresp=").append(getLastResponseTime())
|
|
|
+ .append(",llat=").append(getLastLatency())
|
|
|
+ .append(",minlat=").append(getMinLatency())
|
|
|
+ .append(",avglat=").append(getAvgLatency())
|
|
|
+ .append(",maxlat=").append(getMaxLatency());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ sb.append(")\n");
|
|
|
}
|
|
|
return sb.toString();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private CnxnStats stats=new CnxnStats();
|
|
|
+ private final CnxnStats stats = new CnxnStats();
|
|
|
public Stats getStats() {
|
|
|
return stats;
|
|
|
}
|
|
|
-
|
|
|
}
|