|
@@ -18,9 +18,13 @@
|
|
|
|
|
|
package org.apache.zookeeper.server;
|
|
|
|
|
|
+import java.io.BufferedWriter;
|
|
|
import java.io.ByteArrayOutputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.io.InputStream;
|
|
|
+import java.io.PrintWriter;
|
|
|
+import java.io.StringWriter;
|
|
|
+import java.io.Writer;
|
|
|
import java.net.InetAddress;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.nio.ByteBuffer;
|
|
@@ -188,10 +192,17 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
InetAddress addr = cnxn.sock.socket().getInetAddress();
|
|
|
Set<NIOServerCnxn> s = ipMap.get(addr);
|
|
|
if (s == null) {
|
|
|
- s = new HashSet<NIOServerCnxn>();
|
|
|
+ // in general we will see 1 connection from each
|
|
|
+ // host, setting the initial cap to 2 allows us
|
|
|
+ // to minimize mem usage in the common case
|
|
|
+ // of 1 entry -- we need to set the initial cap
|
|
|
+ // to 2 to avoid rehash when the first entry is added
|
|
|
+ s = new HashSet<NIOServerCnxn>(2);
|
|
|
+ s.add(cnxn);
|
|
|
+ ipMap.put(addr,s);
|
|
|
+ } else {
|
|
|
+ s.add(cnxn);
|
|
|
}
|
|
|
- s.add(cnxn);
|
|
|
- ipMap.put(addr,s);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -399,7 +410,7 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
LOG.error("Unexpected Exception: ", e);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
private static class CloseRequestException extends IOException {
|
|
|
private static final long serialVersionUID = -7854505709816442681L;
|
|
|
|
|
@@ -466,6 +477,7 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
if (incomingBuffer == lenBuffer) { // start of next request
|
|
|
incomingBuffer.flip();
|
|
|
isPayload = readLength(k);
|
|
|
+ incomingBuffer.clear();
|
|
|
} else {
|
|
|
// continuation
|
|
|
isPayload = true;
|
|
@@ -740,162 +752,384 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /** Return if four letter word found and responded to, otw false **/
|
|
|
- private boolean checkFourLetterWord(SelectionKey k, int len)
|
|
|
- throws IOException
|
|
|
- {
|
|
|
- // We take advantage of the limited size of the length to look
|
|
|
- // for cmds. They are all 4-bytes which fits inside of an int
|
|
|
- if (len == ruokCmd) {
|
|
|
- LOG.info("Processing ruok command from "
|
|
|
- + sock.socket().getRemoteSocketAddress());
|
|
|
- packetReceived();
|
|
|
+ /*
|
|
|
+ * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
|
|
|
+ * Zk Admin</a>. this link is for all the commands.
|
|
|
+ */
|
|
|
+ private final static int consCmd =
|
|
|
+ ByteBuffer.wrap("cons".getBytes()).getInt();
|
|
|
|
|
|
- sendBuffer(imok.duplicate());
|
|
|
- k.interestOps(SelectionKey.OP_WRITE);
|
|
|
- return true;
|
|
|
- } else if (len == getTraceMaskCmd) {
|
|
|
- LOG.info("Processing getracemask command from "
|
|
|
- + sock.socket().getRemoteSocketAddress());
|
|
|
- packetReceived();
|
|
|
+ /*
|
|
|
+ * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
|
|
|
+ * Zk Admin</a>. this link is for all the commands.
|
|
|
+ */
|
|
|
+ private final static int crstCmd =
|
|
|
+ ByteBuffer.wrap("crst".getBytes()).getInt();
|
|
|
|
|
|
- long traceMask = ZooTrace.getTextTraceLevel();
|
|
|
- ByteBuffer resp = ByteBuffer.allocate(8);
|
|
|
- resp.putLong(traceMask);
|
|
|
- resp.flip();
|
|
|
- sendBuffer(resp);
|
|
|
- k.interestOps(SelectionKey.OP_WRITE);
|
|
|
- return true;
|
|
|
- } else if (len == setTraceMaskCmd) {
|
|
|
- LOG.info("Processing settracemask command from "
|
|
|
- + sock.socket().getRemoteSocketAddress());
|
|
|
- incomingBuffer = ByteBuffer.allocate(8);
|
|
|
+ /*
|
|
|
+ * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
|
|
|
+ * Zk Admin</a>. this link is for all the commands.
|
|
|
+ */
|
|
|
+ private final static int dumpCmd =
|
|
|
+ ByteBuffer.wrap("dump".getBytes()).getInt();
|
|
|
|
|
|
- int rc = sock.read(incomingBuffer);
|
|
|
- if (rc < 0) {
|
|
|
- throw new IOException("Read error");
|
|
|
- }
|
|
|
- packetReceived();
|
|
|
+ /*
|
|
|
+ * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
|
|
|
+ * Zk Admin</a>. this link is for all the commands.
|
|
|
+ */
|
|
|
+ private final static int enviCmd =
|
|
|
+ ByteBuffer.wrap("envi".getBytes()).getInt();
|
|
|
|
|
|
- System.out.println("rc=" + rc);
|
|
|
- incomingBuffer.flip();
|
|
|
- long traceMask = incomingBuffer.getLong();
|
|
|
- ZooTrace.setTextTraceLevel(traceMask);
|
|
|
- ByteBuffer resp = ByteBuffer.allocate(8);
|
|
|
- resp.putLong(traceMask);
|
|
|
- resp.flip();
|
|
|
- sendBuffer(resp);
|
|
|
- k.interestOps(SelectionKey.OP_WRITE);
|
|
|
- return true;
|
|
|
- } else if (len == dumpCmd) {
|
|
|
- LOG.info("Processing dump command from "
|
|
|
- + sock.socket().getRemoteSocketAddress());
|
|
|
- packetReceived();
|
|
|
+ /*
|
|
|
+ * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
|
|
|
+ * Zk Admin</a>. this link is for all the commands.
|
|
|
+ */
|
|
|
+ private final static int getTraceMaskCmd =
|
|
|
+ ByteBuffer.wrap("gtmk".getBytes()).getInt();
|
|
|
|
|
|
- if (zk == null) {
|
|
|
- sendBuffer(ByteBuffer.wrap("ZooKeeper not active \n"
|
|
|
- .getBytes()));
|
|
|
- } else {
|
|
|
- StringBuilder sb = new StringBuilder();
|
|
|
- sb.append("SessionTracker dump: \n");
|
|
|
- sb.append(zk.sessionTracker.toString()).append("\n");
|
|
|
- sb.append("ephemeral nodes dump:\n");
|
|
|
- sb.append(zk.getZKDatabase().dumpEphemerals()).append("\n");
|
|
|
+ /*
|
|
|
+ * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
|
|
|
+ * Zk Admin</a>. this link is for all the commands.
|
|
|
+ */
|
|
|
+ private final static int ruokCmd =
|
|
|
+ ByteBuffer.wrap("ruok".getBytes()).getInt();
|
|
|
+ /*
|
|
|
+ * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
|
|
|
+ * Zk Admin</a>. this link is for all the commands.
|
|
|
+ */
|
|
|
+ private final static int setTraceMaskCmd =
|
|
|
+ ByteBuffer.wrap("stmk".getBytes()).getInt();
|
|
|
+
|
|
|
+ /*
|
|
|
+ * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
|
|
|
+ * Zk Admin</a>. this link is for all the commands.
|
|
|
+ */
|
|
|
+ private final static int srvrCmd =
|
|
|
+ ByteBuffer.wrap("srvr".getBytes()).getInt();
|
|
|
+
|
|
|
+ /*
|
|
|
+ * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
|
|
|
+ * Zk Admin</a>. this link is for all the commands.
|
|
|
+ */
|
|
|
+ private final static int srstCmd =
|
|
|
+ ByteBuffer.wrap("srst".getBytes()).getInt();
|
|
|
+
|
|
|
+ /*
|
|
|
+ * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
|
|
|
+ * Zk Admin</a>. this link is for all the commands.
|
|
|
+ */
|
|
|
+ private final static int statCmd =
|
|
|
+ ByteBuffer.wrap("stat".getBytes()).getInt();
|
|
|
+
|
|
|
+ /*
|
|
|
+ * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
|
|
|
+ * Zk Admin</a>. this link is for all the commands.
|
|
|
+ */
|
|
|
+ private final static int wchcCmd =
|
|
|
+ ByteBuffer.wrap("wchc".getBytes()).getInt();
|
|
|
+
|
|
|
+ /*
|
|
|
+ * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
|
|
|
+ * Zk Admin</a>. this link is for all the commands.
|
|
|
+ */
|
|
|
+ private final static int wchpCmd =
|
|
|
+ ByteBuffer.wrap("wchp".getBytes()).getInt();
|
|
|
+
|
|
|
+ /*
|
|
|
+ * See <a href="{@docRoot}/../../../docs/zookeeperAdmin.html#sc_zkCommands">
|
|
|
+ * Zk Admin</a>. this link is for all the commands.
|
|
|
+ */
|
|
|
+ private final static int wchsCmd =
|
|
|
+ ByteBuffer.wrap("wchs".getBytes()).getInt();
|
|
|
+
|
|
|
+ private final static HashMap<Integer, String> cmd2String =
|
|
|
+ new HashMap<Integer, String>();
|
|
|
+
|
|
|
+ // specify all of the commands that are available
|
|
|
+ static {
|
|
|
+ cmd2String.put(consCmd, "cons");
|
|
|
+ cmd2String.put(crstCmd, "crst");
|
|
|
+ cmd2String.put(dumpCmd, "dump");
|
|
|
+ cmd2String.put(enviCmd, "envi");
|
|
|
+ cmd2String.put(getTraceMaskCmd, "gtmk");
|
|
|
+ cmd2String.put(ruokCmd, "ruok");
|
|
|
+ cmd2String.put(setTraceMaskCmd, "stmk");
|
|
|
+ cmd2String.put(srstCmd, "srst");
|
|
|
+ cmd2String.put(srvrCmd, "srvr");
|
|
|
+ cmd2String.put(statCmd, "stat");
|
|
|
+ cmd2String.put(wchcCmd, "wchc");
|
|
|
+ cmd2String.put(wchpCmd, "wchp");
|
|
|
+ cmd2String.put(wchsCmd, "wchs");
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * This class wraps the sendBuffer method of NIOServerCnxn. It is
|
|
|
+ * responsible for chunking up the response to a client. Rather
|
|
|
+ * than cons'ing up a response fully in memory, which may be large
|
|
|
+ * for some commands, this class chunks up the result.
|
|
|
+ */
|
|
|
+ private class SendBufferWriter extends Writer {
|
|
|
+ private StringBuffer sb = new StringBuffer();
|
|
|
+
|
|
|
+ /* FYI: clearing the READ interestOps on the key results in
|
|
|
+ * the cnxn being closed in doIO.
|
|
|
+ */
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Check if we are ready to send another chunk.
|
|
|
+ * @param force force sending, even if not a full chunk
|
|
|
+ */
|
|
|
+ private void checkFlush(boolean force) {
|
|
|
+ if ((force && sb.length() > 0) || sb.length() > 2048) {
|
|
|
sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
|
|
|
+ // including op_read keeps doio from closing the conn
|
|
|
+ wakeup(SelectionKey.OP_READ
|
|
|
+ | SelectionKey.OP_WRITE);
|
|
|
+
|
|
|
+ // clear our internal buffer
|
|
|
+ sb.setLength(0);
|
|
|
}
|
|
|
- k.interestOps(SelectionKey.OP_WRITE);
|
|
|
- return true;
|
|
|
- } else if (len == statCmd || len == srvrCmd) {
|
|
|
- LOG.info("Processing " + (len == statCmd ? "stat" : "srvr")
|
|
|
- + " command from "
|
|
|
- + sock.socket().getRemoteSocketAddress());
|
|
|
- packetReceived();
|
|
|
+ }
|
|
|
|
|
|
- StringBuilder sb = new StringBuilder();
|
|
|
- if (zk != null){
|
|
|
- sb.append("Zookeeper version: ").append(Version.getFullVersion())
|
|
|
- .append("\n");
|
|
|
- if (len == statCmd) {
|
|
|
- sb.append("Clients:\n");
|
|
|
- synchronized(factory.cnxns){
|
|
|
- for(NIOServerCnxn c : factory.cnxns){
|
|
|
- sb.append(((CnxnStats)c.getStats()).toString(true));
|
|
|
- }
|
|
|
- }
|
|
|
- sb.append("\n");
|
|
|
- }
|
|
|
- sb.append(zk.serverStats().toString());
|
|
|
- sb.append("Node count: ").append(zk.getZKDatabase().getNodeCount()).
|
|
|
- append("\n");
|
|
|
- } else {
|
|
|
- sb.append("ZooKeeperServer not running\n");
|
|
|
+ /**
|
|
|
+ * Wakeup the selector. This is necessary as the cnxn is
|
|
|
+ * waiting for interestOps to be satisfied. If we want the
|
|
|
+ * selector to wakeup immediately (rather than the last
|
|
|
+ * select(timeout) period) we need to force a wakeup.
|
|
|
+ * @param sel the new interest ops
|
|
|
+ */
|
|
|
+ private void wakeup(int sel) {
|
|
|
+ synchronized(factory) {
|
|
|
+ sk.selector().wakeup();
|
|
|
+ sk.interestOps(sel);
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
|
|
|
- k.interestOps(SelectionKey.OP_WRITE);
|
|
|
- return true;
|
|
|
- } else if (len == consCmd) {
|
|
|
- LOG.info("Processing cons command from "
|
|
|
- + sock.socket().getRemoteSocketAddress());
|
|
|
- packetReceived();
|
|
|
+ @Override
|
|
|
+ public void close() throws IOException {
|
|
|
+ if (sb == null) return;
|
|
|
|
|
|
- StringBuilder sb = new StringBuilder();
|
|
|
- if (zk != null){
|
|
|
- synchronized(factory.cnxns){
|
|
|
- for(NIOServerCnxn c : factory.cnxns){
|
|
|
- sb.append(((CnxnStats)c.getStats()).toString(false));
|
|
|
- }
|
|
|
- }
|
|
|
- sb.append("\n");
|
|
|
- } else {
|
|
|
- sb.append("ZooKeeperServer not running\n");
|
|
|
- }
|
|
|
+ checkFlush(true);
|
|
|
|
|
|
- sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
|
|
|
- k.interestOps(SelectionKey.OP_WRITE);
|
|
|
- return true;
|
|
|
- } else if (len == enviCmd) {
|
|
|
- LOG.info("Processing envi command from "
|
|
|
- + sock.socket().getRemoteSocketAddress());
|
|
|
- packetReceived();
|
|
|
+ // nothing left, please close
|
|
|
+ wakeup(SelectionKey.OP_WRITE);
|
|
|
|
|
|
- StringBuilder sb = new StringBuilder();
|
|
|
+ sb = null; // clear out the ref to ensure no reuse
|
|
|
+ }
|
|
|
|
|
|
- List<Environment.Entry> env = Environment.list();
|
|
|
+ @Override
|
|
|
+ public void flush() throws IOException {
|
|
|
+ checkFlush(true);
|
|
|
+ }
|
|
|
|
|
|
- sb.append("Environment:\n");
|
|
|
- for(Environment.Entry e : env) {
|
|
|
- sb.append(e.getKey()).append("=").append(e.getValue())
|
|
|
- .append("\n");
|
|
|
- }
|
|
|
+ @Override
|
|
|
+ public void write(char[] cbuf, int off, int len) throws IOException {
|
|
|
+ sb.append(cbuf, off, len);
|
|
|
+ checkFlush(false);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
|
|
|
- k.interestOps(SelectionKey.OP_WRITE);
|
|
|
- return true;
|
|
|
- } else if (len == srstCmd) {
|
|
|
- LOG.info("Processing srst command from "
|
|
|
- + sock.socket().getRemoteSocketAddress());
|
|
|
- packetReceived();
|
|
|
+ private static final String ZK_NOT_SERVING =
|
|
|
+ "This ZooKeeper instance is not currently serving requests";
|
|
|
|
|
|
- zk.serverStats().reset();
|
|
|
+ /** Return if four letter word found and responded to, otw false **/
|
|
|
+ private boolean checkFourLetterWord(final SelectionKey k, final int len)
|
|
|
+ throws IOException
|
|
|
+ {
|
|
|
+ // We take advantage of the limited size of the length to look
|
|
|
+ // for cmds. They are all 4-bytes which fits inside of an int
|
|
|
+ String cmd = cmd2String.get(len);
|
|
|
+ if (cmd == null) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ LOG.info("Processing " + cmd + " command from "
|
|
|
+ + sock.socket().getRemoteSocketAddress());
|
|
|
+ packetReceived();
|
|
|
|
|
|
- sendBuffer(ByteBuffer.wrap("Server stats reset.\n".getBytes()));
|
|
|
- k.interestOps(SelectionKey.OP_WRITE);
|
|
|
- return true;
|
|
|
- } else if (len == crstCmd) {
|
|
|
- LOG.info("Processing crst command from "
|
|
|
- + sock.socket().getRemoteSocketAddress());
|
|
|
- packetReceived();
|
|
|
+ final PrintWriter pwriter = new PrintWriter(
|
|
|
+ new BufferedWriter(new SendBufferWriter()));
|
|
|
+ boolean threadWillClosePWriter = false;
|
|
|
+ try {
|
|
|
+ if (len == ruokCmd) {
|
|
|
+ pwriter.print("imok");
|
|
|
+ return true;
|
|
|
+ } else if (len == getTraceMaskCmd) {
|
|
|
+ long traceMask = ZooTrace.getTextTraceLevel();
|
|
|
+ pwriter.print(traceMask);
|
|
|
+ return true;
|
|
|
+ } else if (len == setTraceMaskCmd) {
|
|
|
+ int rc = sock.read(incomingBuffer);
|
|
|
+ if (rc < 0) {
|
|
|
+ throw new IOException("Read error");
|
|
|
+ }
|
|
|
|
|
|
- synchronized(factory.cnxns){
|
|
|
- for(NIOServerCnxn c : factory.cnxns){
|
|
|
- c.getStats().reset();
|
|
|
+ incomingBuffer.flip();
|
|
|
+ long traceMask = incomingBuffer.getLong();
|
|
|
+ ZooTrace.setTextTraceLevel(traceMask);
|
|
|
+ pwriter.print(traceMask);
|
|
|
+ return true;
|
|
|
+ } else if (len == enviCmd) {
|
|
|
+ List<Environment.Entry> env = Environment.list();
|
|
|
+
|
|
|
+ pwriter.println("Environment:");
|
|
|
+ for(Environment.Entry e : env) {
|
|
|
+ pwriter.print(e.getKey());
|
|
|
+ pwriter.print("=");
|
|
|
+ pwriter.println(e.getValue());
|
|
|
}
|
|
|
- }
|
|
|
+ return true;
|
|
|
+ } else if (len == srstCmd) {
|
|
|
+ if (zk == null) {
|
|
|
+ pwriter.println(ZK_NOT_SERVING);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ zk.serverStats().reset();
|
|
|
+ pwriter.println("Server stats reset.");
|
|
|
+ return true;
|
|
|
+ } else if (len == crstCmd) {
|
|
|
+ if (zk == null) {
|
|
|
+ pwriter.println(ZK_NOT_SERVING);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ synchronized(factory.cnxns){
|
|
|
+ for(NIOServerCnxn c : factory.cnxns){
|
|
|
+ c.getStats().reset();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ pwriter.println("Connection stats reset.");
|
|
|
+ return true;
|
|
|
+ } else if (len == dumpCmd) {
|
|
|
+ if (zk == null) {
|
|
|
+ pwriter.println(ZK_NOT_SERVING);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ // this could be a long running task, spawn a thread so
|
|
|
+ // that we don't block the processing of other requests
|
|
|
+ threadWillClosePWriter = true;
|
|
|
+ new Thread() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ pwriter.println("SessionTracker dump:");
|
|
|
+ zk.sessionTracker.dumpSessions(pwriter);
|
|
|
+ pwriter.println("ephemeral nodes dump:");
|
|
|
+ zk.dumpEphemerals(pwriter);
|
|
|
+ } finally {
|
|
|
+ pwriter.flush();
|
|
|
+ pwriter.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }.start();
|
|
|
|
|
|
- sendBuffer(ByteBuffer.wrap("Connection stats reset.\n".getBytes()));
|
|
|
- k.interestOps(SelectionKey.OP_WRITE);
|
|
|
- return true;
|
|
|
+ return true;
|
|
|
+ } else if (len == statCmd || len == srvrCmd) {
|
|
|
+ if (zk == null) {
|
|
|
+ pwriter.println(ZK_NOT_SERVING);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ // this could be a long running task, spawn a thread so
|
|
|
+ // that we don't block the processing of other requests
|
|
|
+ threadWillClosePWriter = true;
|
|
|
+ new Thread() {
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ pwriter.print("Zookeeper version: ");
|
|
|
+ pwriter.println(Version.getFullVersion());
|
|
|
+ if (len == statCmd) {
|
|
|
+ pwriter.println("Clients:");
|
|
|
+ // clone should be faster than iteration
|
|
|
+ // ie give up the cnxns lock faster
|
|
|
+ HashSet<NIOServerCnxn> cnxns;
|
|
|
+ synchronized(factory.cnxns){
|
|
|
+ cnxns = (HashSet<NIOServerCnxn>)factory
|
|
|
+ .cnxns.clone();
|
|
|
+ }
|
|
|
+ for(NIOServerCnxn c : cnxns){
|
|
|
+ ((CnxnStats)c.getStats())
|
|
|
+ .dumpConnectionInfo(pwriter, true);
|
|
|
+ }
|
|
|
+ pwriter.println();
|
|
|
+ }
|
|
|
+ pwriter.print(zk.serverStats().toString());
|
|
|
+ pwriter.print("Node count: ");
|
|
|
+ pwriter.println(zk.getZKDatabase().getNodeCount());
|
|
|
+ } finally {
|
|
|
+ pwriter.flush();
|
|
|
+ pwriter.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }.start();
|
|
|
+ return true;
|
|
|
+ } else if (len == consCmd) {
|
|
|
+ if (zk == null) {
|
|
|
+ pwriter.println(ZK_NOT_SERVING);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ // this could be a long running task, spawn a thread so
|
|
|
+ // that we don't block the processing of other requests
|
|
|
+ threadWillClosePWriter = true;
|
|
|
+ new Thread() {
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ // clone should be faster than iteration
|
|
|
+ // ie give up the cnxns lock faster
|
|
|
+ HashSet<NIOServerCnxn> cnxns;
|
|
|
+ synchronized(factory.cnxns){
|
|
|
+ cnxns = (HashSet<NIOServerCnxn>)factory
|
|
|
+ .cnxns.clone();
|
|
|
+ }
|
|
|
+ for(NIOServerCnxn c : cnxns){
|
|
|
+ ((CnxnStats)c.getStats())
|
|
|
+ .dumpConnectionInfo(pwriter, false);
|
|
|
+ }
|
|
|
+ pwriter.println();
|
|
|
+ } finally {
|
|
|
+ pwriter.flush();
|
|
|
+ pwriter.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }.start();
|
|
|
+ return true;
|
|
|
+ } else if (len == wchpCmd || len == wchcCmd || len == wchsCmd) {
|
|
|
+ if (zk == null) {
|
|
|
+ pwriter.println(ZK_NOT_SERVING);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ // this could be a long running task, spawn a thread so
|
|
|
+ // that we don't block the processing of other requests
|
|
|
+ threadWillClosePWriter = true;
|
|
|
+ new Thread() {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ DataTree dt = zk.getZKDatabase().getDataTree();
|
|
|
+ if (len == wchsCmd) {
|
|
|
+ dt.dumpWatchesSummary(pwriter);
|
|
|
+ } else if (len == wchpCmd) {
|
|
|
+ dt.dumpWatches(pwriter, true);
|
|
|
+ } else {
|
|
|
+ dt.dumpWatches(pwriter, false);
|
|
|
+ }
|
|
|
+ pwriter.println();
|
|
|
+ } finally {
|
|
|
+ pwriter.flush();
|
|
|
+ pwriter.close();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }.start();
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ // if we spawned a thread it is responsible for eventually
|
|
|
+ // flushing and closeing the writer
|
|
|
+ if (!threadWillClosePWriter) {
|
|
|
+ pwriter.flush();
|
|
|
+ pwriter.close();
|
|
|
+ }
|
|
|
}
|
|
|
return false;
|
|
|
}
|
|
@@ -1303,13 +1537,19 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
return lastLatency;
|
|
|
}
|
|
|
|
|
|
- /** Prints brief stats information for the connection.
|
|
|
- *
|
|
|
- * @see toString(boolean) for detailed stats
|
|
|
+ /**
|
|
|
+ * Prints detailed stats information for the connection.
|
|
|
+ *
|
|
|
+ * @see dumpConnectionInfo(PrintWriter, boolean) for brief stats
|
|
|
*/
|
|
|
@Override
|
|
|
public String toString() {
|
|
|
- return toString(false);
|
|
|
+ StringWriter sw = new StringWriter();
|
|
|
+ PrintWriter pwriter = new PrintWriter(sw);
|
|
|
+ dumpConnectionInfo(pwriter, false);
|
|
|
+ pwriter.flush();
|
|
|
+ pwriter.close();
|
|
|
+ return sw.toString();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1317,41 +1557,55 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
* @param brief iff true prints brief details, otw full detail
|
|
|
* @return information about this connection
|
|
|
*/
|
|
|
- public String toString(boolean brief) {
|
|
|
- StringBuilder sb = new StringBuilder();
|
|
|
+ public synchronized void
|
|
|
+ dumpConnectionInfo(PrintWriter pwriter, boolean brief)
|
|
|
+ {
|
|
|
Channel channel = sk.channel();
|
|
|
if (channel instanceof SocketChannel) {
|
|
|
- sb.append(" ").append(((SocketChannel)channel).socket()
|
|
|
- .getRemoteSocketAddress())
|
|
|
- .append("[").append(Integer.toHexString(sk.interestOps()))
|
|
|
- .append("](queued=").append(getOutstandingRequests())
|
|
|
- .append(",recved=").append(getPacketsReceived())
|
|
|
- .append(",sent=").append(getPacketsSent());
|
|
|
+ pwriter.print(" ");
|
|
|
+ pwriter.print(((SocketChannel)channel).socket()
|
|
|
+ .getRemoteSocketAddress());
|
|
|
+ pwriter.print("[");
|
|
|
+ pwriter.print(Integer.toHexString(sk.interestOps()));
|
|
|
+ pwriter.print("](queued=");
|
|
|
+ pwriter.print(getOutstandingRequests());
|
|
|
+ pwriter.print(",recved=");
|
|
|
+ pwriter.print(getPacketsReceived());
|
|
|
+ pwriter.print(",sent=");
|
|
|
+ pwriter.print(getPacketsSent());
|
|
|
|
|
|
if (!brief) {
|
|
|
long sessionId = getSessionId();
|
|
|
if (sessionId != 0) {
|
|
|
- sb.append(",sid=0x").append(Long.toHexString(sessionId))
|
|
|
- .append(",lop=").append(getLastOperation())
|
|
|
- .append(",est=").append(getEstablished().getTime())
|
|
|
- .append(",to=").append(getSessionTimeout());
|
|
|
+ pwriter.print(",sid=0x");
|
|
|
+ pwriter.print(Long.toHexString(sessionId));
|
|
|
+ pwriter.print(",lop=");
|
|
|
+ pwriter.print(getLastOperation());
|
|
|
+ pwriter.print(",est=");
|
|
|
+ pwriter.print(getEstablished().getTime());
|
|
|
+ pwriter.print(",to=");
|
|
|
+ pwriter.print(getSessionTimeout());
|
|
|
long lastCxid = getLastCxid();
|
|
|
if (lastCxid >= 0) {
|
|
|
- sb.append(",lcxid=0x")
|
|
|
- .append(Long.toHexString(lastCxid));
|
|
|
+ pwriter.print(",lcxid=0x");
|
|
|
+ pwriter.print(Long.toHexString(lastCxid));
|
|
|
}
|
|
|
- sb.append(",lzxid=0x")
|
|
|
- .append(Long.toHexString(getLastZxid()))
|
|
|
- .append(",lresp=").append(getLastResponseTime())
|
|
|
- .append(",llat=").append(getLastLatency())
|
|
|
- .append(",minlat=").append(getMinLatency())
|
|
|
- .append(",avglat=").append(getAvgLatency())
|
|
|
- .append(",maxlat=").append(getMaxLatency());
|
|
|
+ pwriter.print(",lzxid=0x");
|
|
|
+ pwriter.print(Long.toHexString(getLastZxid()));
|
|
|
+ pwriter.print(",lresp=");
|
|
|
+ pwriter.print(getLastResponseTime());
|
|
|
+ pwriter.print(",llat=");
|
|
|
+ pwriter.print(getLastLatency());
|
|
|
+ pwriter.print(",minlat=");
|
|
|
+ pwriter.print(getMinLatency());
|
|
|
+ pwriter.print(",avglat=");
|
|
|
+ pwriter.print(getAvgLatency());
|
|
|
+ pwriter.print(",maxlat=");
|
|
|
+ pwriter.print(getMaxLatency());
|
|
|
}
|
|
|
}
|
|
|
- sb.append(")\n");
|
|
|
+ pwriter.println(")");
|
|
|
}
|
|
|
- return sb.toString();
|
|
|
}
|
|
|
}
|
|
|
|