|
@@ -39,7 +39,6 @@ import java.util.Collections;
|
|
|
import java.util.Date;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
-import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
import java.util.Set;
|
|
|
import java.util.concurrent.LinkedBlockingQueue;
|
|
@@ -386,6 +385,29 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
sendBuffer(closeConn);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * send buffer without using the asynchronous
|
|
|
+ * calls to selector and then close the socket
|
|
|
+ * @param bb
|
|
|
+ */
|
|
|
+ void sendBufferSync(ByteBuffer bb) {
|
|
|
+ try {
|
|
|
+ /* configure socket to be blocking
|
|
|
+ * so that we dont have to do write in
|
|
|
+ * a tight while loop
|
|
|
+ */
|
|
|
+ sock.configureBlocking(true);
|
|
|
+ if (bb != closeConn) {
|
|
|
+ if (sock != null) {
|
|
|
+ sock.write(bb);
|
|
|
+ }
|
|
|
+ packetSent();
|
|
|
+ }
|
|
|
+ } catch (IOException ie) {
|
|
|
+ LOG.error("Error sending data synchronously ", ie);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
void sendBuffer(ByteBuffer bb) {
|
|
|
try {
|
|
|
if (bb != closeConn) {
|
|
@@ -497,6 +519,11 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
if (isPayload) { // not the case for 4letterword
|
|
|
readPayload();
|
|
|
}
|
|
|
+ else {
|
|
|
+ // four letter words take care
|
|
|
+ // need not do anything else
|
|
|
+ return;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
if (k.isWritable()) {
|
|
@@ -885,6 +912,30 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
cmd2String.put(wchsCmd, "wchs");
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * clean up the socket related to a command and also make sure we flush the
|
|
|
+ * data before we do that
|
|
|
+ *
|
|
|
+ * @param pwriter
|
|
|
+ * the pwriter for a command socket
|
|
|
+ */
|
|
|
+ private void cleanupWriterSocket(PrintWriter pwriter) {
|
|
|
+ try {
|
|
|
+ if (pwriter != null) {
|
|
|
+ pwriter.flush();
|
|
|
+ pwriter.close();
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.info("Error closing PrintWriter ", e);
|
|
|
+ } finally {
|
|
|
+ try {
|
|
|
+ close();
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Error closing a command socket ", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* This class wraps the sendBuffer method of NIOServerCnxn. It is
|
|
|
* responsible for chunking up the response to a client. Rather
|
|
@@ -893,50 +944,23 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
*/
|
|
|
private class SendBufferWriter extends Writer {
|
|
|
private StringBuffer sb = new StringBuffer();
|
|
|
-
|
|
|
- /* FYI: clearing the READ interestOps on the key results in
|
|
|
- * the cnxn being closed in doIO.
|
|
|
- */
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Check if we are ready to send another chunk.
|
|
|
* @param force force sending, even if not a full chunk
|
|
|
*/
|
|
|
private void checkFlush(boolean force) {
|
|
|
if ((force && sb.length() > 0) || sb.length() > 2048) {
|
|
|
- sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
|
|
|
- // including op_read keeps doio from closing the conn
|
|
|
- wakeup(SelectionKey.OP_READ
|
|
|
- | SelectionKey.OP_WRITE);
|
|
|
-
|
|
|
+ sendBufferSync(ByteBuffer.wrap(sb.toString().getBytes()));
|
|
|
// clear our internal buffer
|
|
|
sb.setLength(0);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Wakeup the selector. This is necessary as the cnxn is
|
|
|
- * waiting for interestOps to be satisfied. If we want the
|
|
|
- * selector to wakeup immediately (rather than the last
|
|
|
- * select(timeout) period) we need to force a wakeup.
|
|
|
- * @param sel the new interest ops
|
|
|
- */
|
|
|
- private void wakeup(int sel) {
|
|
|
- synchronized(factory) {
|
|
|
- sk.selector().wakeup();
|
|
|
- sk.interestOps(sel);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
@Override
|
|
|
public void close() throws IOException {
|
|
|
if (sb == null) return;
|
|
|
-
|
|
|
checkFlush(true);
|
|
|
-
|
|
|
- // nothing left, please close
|
|
|
- wakeup(SelectionKey.OP_WRITE);
|
|
|
-
|
|
|
sb = null; // clear out the ref to ensure no reuse
|
|
|
}
|
|
|
|
|
@@ -954,10 +978,254 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
|
|
|
private static final String ZK_NOT_SERVING =
|
|
|
"This ZooKeeper instance is not currently serving requests";
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Set of threads for commmand ports. All the 4
|
|
|
+ * letter commands are run via a thread. Each class
|
|
|
+ * maps to a correspoding 4 letter command. CommandThread
|
|
|
+ * is the abstract class from which all the others inherit.
|
|
|
+ */
|
|
|
+ private abstract class CommandThread extends Thread {
|
|
|
+ PrintWriter pw;
|
|
|
+
|
|
|
+ CommandThread(PrintWriter pw) {
|
|
|
+ this.pw = pw;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ commandRun();
|
|
|
+ } catch (IOException ie) {
|
|
|
+ LOG.error("Error in running command ", ie);
|
|
|
+ } finally {
|
|
|
+ cleanupWriterSocket(pw);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public abstract void commandRun() throws IOException;
|
|
|
+ }
|
|
|
+
|
|
|
+ private class RuokCommand extends CommandThread {
|
|
|
+ public RuokCommand(PrintWriter pw) {
|
|
|
+ super(pw);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void commandRun() {
|
|
|
+ pw.print("imok");
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private class TraceMaskCommand extends CommandThread {
|
|
|
+ TraceMaskCommand(PrintWriter pw) {
|
|
|
+ super(pw);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void commandRun() {
|
|
|
+ long traceMask = ZooTrace.getTextTraceLevel();
|
|
|
+ pw.print(traceMask);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private class SetTraceMaskCommand extends CommandThread {
|
|
|
+ long trace = 0;
|
|
|
+ SetTraceMaskCommand(PrintWriter pw, long trace) {
|
|
|
+ super(pw);
|
|
|
+ this.trace = trace;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void commandRun() {
|
|
|
+ pw.print(trace);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private class EnvCommand extends CommandThread {
|
|
|
+ EnvCommand(PrintWriter pw) {
|
|
|
+ super(pw);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void commandRun() {
|
|
|
+ List<Environment.Entry> env = Environment.list();
|
|
|
+
|
|
|
+ pw.println("Environment:");
|
|
|
+ for(Environment.Entry e : env) {
|
|
|
+ pw.print(e.getKey());
|
|
|
+ pw.print("=");
|
|
|
+ pw.println(e.getValue());
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private class ConfCommand extends CommandThread {
|
|
|
+ ConfCommand(PrintWriter pw) {
|
|
|
+ super(pw);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void commandRun() {
|
|
|
+ if (zk == null) {
|
|
|
+ pw.println(ZK_NOT_SERVING);
|
|
|
+ } else {
|
|
|
+ zk.dumpConf(pw);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private class StatResetCommand extends CommandThread {
|
|
|
+ public StatResetCommand(PrintWriter pw) {
|
|
|
+ super(pw);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void commandRun() {
|
|
|
+ if (zk == null) {
|
|
|
+ pw.println(ZK_NOT_SERVING);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ zk.serverStats().reset();
|
|
|
+ pw.println("Server stats reset.");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private class CnxnStatResetCommand extends CommandThread {
|
|
|
+ public CnxnStatResetCommand(PrintWriter pw) {
|
|
|
+ super(pw);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void commandRun() {
|
|
|
+ if (zk == null) {
|
|
|
+ pw.println(ZK_NOT_SERVING);
|
|
|
+ } else {
|
|
|
+ synchronized(factory.cnxns){
|
|
|
+ for(NIOServerCnxn c : factory.cnxns){
|
|
|
+ c.getStats().reset();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ pw.println("Connection stats reset.");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private class DumpCommand extends CommandThread {
|
|
|
+ public DumpCommand(PrintWriter pw) {
|
|
|
+ super(pw);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void commandRun() {
|
|
|
+ if (zk == null) {
|
|
|
+ pw.println(ZK_NOT_SERVING);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ pw.println("SessionTracker dump:");
|
|
|
+ zk.sessionTracker.dumpSessions(pw);
|
|
|
+ pw.println("ephemeral nodes dump:");
|
|
|
+ zk.dumpEphemerals(pw);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private class StatCommand extends CommandThread {
|
|
|
+ int len;
|
|
|
+ public StatCommand(PrintWriter pw, int len) {
|
|
|
+ super(pw);
|
|
|
+ this.len = len;
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ @Override
|
|
|
+ public void commandRun() {
|
|
|
+ if (zk == null) {
|
|
|
+ pw.println(ZK_NOT_SERVING);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ pw.print("Zookeeper version: ");
|
|
|
+ pw.println(Version.getFullVersion());
|
|
|
+ if (len == statCmd) {
|
|
|
+ LOG.info("Stat command output");
|
|
|
+ pw.println("Clients:");
|
|
|
+ // clone should be faster than iteration
|
|
|
+ // ie give up the cnxns lock faster
|
|
|
+ HashSet<NIOServerCnxn> cnxnset;
|
|
|
+ synchronized(factory.cnxns){
|
|
|
+ cnxnset = (HashSet<NIOServerCnxn>)factory
|
|
|
+ .cnxns.clone();
|
|
|
+ }
|
|
|
+ for(NIOServerCnxn c : cnxnset){
|
|
|
+ ((CnxnStats)c.getStats())
|
|
|
+ .dumpConnectionInfo(pw, true);
|
|
|
+ }
|
|
|
+ pw.println();
|
|
|
+ }
|
|
|
+ pw.print(zk.serverStats().toString());
|
|
|
+ pw.print("Node count: ");
|
|
|
+ pw.println(zk.getZKDatabase().getNodeCount());
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private class ConsCommand extends CommandThread {
|
|
|
+ public ConsCommand(PrintWriter pw) {
|
|
|
+ super(pw);
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ @Override
|
|
|
+ public void commandRun() {
|
|
|
+ if (zk == null) {
|
|
|
+ pw.println(ZK_NOT_SERVING);
|
|
|
+ } else {
|
|
|
+ // clone should be faster than iteration
|
|
|
+ // ie give up the cnxns lock faster
|
|
|
+ HashSet<NIOServerCnxn> cnxns;
|
|
|
+ synchronized (factory.cnxns) {
|
|
|
+ cnxns = (HashSet<NIOServerCnxn>) factory.cnxns.clone();
|
|
|
+ }
|
|
|
+ for (NIOServerCnxn c : cnxns) {
|
|
|
+ ((CnxnStats) c.getStats()).dumpConnectionInfo(pw, false);
|
|
|
+ }
|
|
|
+ pw.println();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private class WatchCommand extends CommandThread {
|
|
|
+ int len = 0;
|
|
|
+ public WatchCommand(PrintWriter pw, int len) {
|
|
|
+ super(pw);
|
|
|
+ this.len = len;
|
|
|
+ }
|
|
|
|
|
|
+ @Override
|
|
|
+ public void commandRun() {
|
|
|
+ if (zk == null) {
|
|
|
+ pw.println(ZK_NOT_SERVING);
|
|
|
+ } else {
|
|
|
+ DataTree dt = zk.getZKDatabase().getDataTree();
|
|
|
+ if (len == wchsCmd) {
|
|
|
+ dt.dumpWatchesSummary(pw);
|
|
|
+ } else if (len == wchpCmd) {
|
|
|
+ dt.dumpWatches(pw, true);
|
|
|
+ } else {
|
|
|
+ dt.dumpWatches(pw, false);
|
|
|
+ }
|
|
|
+ pw.println();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
/** Return if four letter word found and responded to, otw false **/
|
|
|
private boolean checkFourLetterWord(final SelectionKey k, final int len)
|
|
|
- throws IOException
|
|
|
+ throws IOException
|
|
|
{
|
|
|
// We take advantage of the limited size of the length to look
|
|
|
// for cmds. They are all 4-bytes which fits inside of an int
|
|
@@ -969,197 +1237,77 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
+ sock.socket().getRemoteSocketAddress());
|
|
|
packetReceived();
|
|
|
|
|
|
+ /** cancel the selection key to remove the socket handling
|
|
|
+ * from selector. This is to prevent netcat problem wherein
|
|
|
+ * netcat immediately closes the sending side after sending the
|
|
|
+ * commands and still keeps the receiving channel open.
|
|
|
+ * The idea is to remove the selectionkey from the selector
|
|
|
+ * so that the selector does not notice the closed read on the
|
|
|
+ * socket channel and keep the socket alive to write the data to
|
|
|
+ * and makes sure to close the socket after its done writing the data
|
|
|
+ */
|
|
|
+ if (k != null) {
|
|
|
+ try {
|
|
|
+ k.cancel();
|
|
|
+ } catch(Exception e) {
|
|
|
+ LOG.error("Error cancelling command selection key ", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
final PrintWriter pwriter = new PrintWriter(
|
|
|
new BufferedWriter(new SendBufferWriter()));
|
|
|
- boolean threadWillClosePWriter = false;
|
|
|
- try {
|
|
|
- if (len == ruokCmd) {
|
|
|
- pwriter.print("imok");
|
|
|
- return true;
|
|
|
- } else if (len == getTraceMaskCmd) {
|
|
|
- long traceMask = ZooTrace.getTextTraceLevel();
|
|
|
- pwriter.print(traceMask);
|
|
|
- return true;
|
|
|
- } else if (len == setTraceMaskCmd) {
|
|
|
- int rc = sock.read(incomingBuffer);
|
|
|
- if (rc < 0) {
|
|
|
- throw new IOException("Read error");
|
|
|
- }
|
|
|
-
|
|
|
- incomingBuffer.flip();
|
|
|
- long traceMask = incomingBuffer.getLong();
|
|
|
- ZooTrace.setTextTraceLevel(traceMask);
|
|
|
- pwriter.print(traceMask);
|
|
|
- return true;
|
|
|
- } else if (len == enviCmd) {
|
|
|
- List<Environment.Entry> env = Environment.list();
|
|
|
-
|
|
|
- pwriter.println("Environment:");
|
|
|
- for(Environment.Entry e : env) {
|
|
|
- pwriter.print(e.getKey());
|
|
|
- pwriter.print("=");
|
|
|
- pwriter.println(e.getValue());
|
|
|
- }
|
|
|
- return true;
|
|
|
- } else if (len == confCmd) {
|
|
|
- if (zk == null) {
|
|
|
- pwriter.println(ZK_NOT_SERVING);
|
|
|
- return true;
|
|
|
- }
|
|
|
- zk.dumpConf(pwriter);
|
|
|
- return true;
|
|
|
- } else if (len == srstCmd) {
|
|
|
- if (zk == null) {
|
|
|
- pwriter.println(ZK_NOT_SERVING);
|
|
|
- return true;
|
|
|
- }
|
|
|
- zk.serverStats().reset();
|
|
|
- pwriter.println("Server stats reset.");
|
|
|
- return true;
|
|
|
- } else if (len == crstCmd) {
|
|
|
- if (zk == null) {
|
|
|
- pwriter.println(ZK_NOT_SERVING);
|
|
|
- return true;
|
|
|
- }
|
|
|
- synchronized(factory.cnxns){
|
|
|
- for(NIOServerCnxn c : factory.cnxns){
|
|
|
- c.getStats().reset();
|
|
|
- }
|
|
|
- }
|
|
|
- pwriter.println("Connection stats reset.");
|
|
|
- return true;
|
|
|
- } else if (len == dumpCmd) {
|
|
|
- if (zk == null) {
|
|
|
- pwriter.println(ZK_NOT_SERVING);
|
|
|
- return true;
|
|
|
- }
|
|
|
- // this could be a long running task, spawn a thread so
|
|
|
- // that we don't block the processing of other requests
|
|
|
- threadWillClosePWriter = true;
|
|
|
- new Thread() {
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- try {
|
|
|
- pwriter.println("SessionTracker dump:");
|
|
|
- zk.sessionTracker.dumpSessions(pwriter);
|
|
|
- pwriter.println("ephemeral nodes dump:");
|
|
|
- zk.dumpEphemerals(pwriter);
|
|
|
- } finally {
|
|
|
- pwriter.flush();
|
|
|
- pwriter.close();
|
|
|
- }
|
|
|
- }
|
|
|
- }.start();
|
|
|
-
|
|
|
- return true;
|
|
|
- } else if (len == statCmd || len == srvrCmd) {
|
|
|
- if (zk == null) {
|
|
|
- pwriter.println(ZK_NOT_SERVING);
|
|
|
- return true;
|
|
|
- }
|
|
|
- // this could be a long running task, spawn a thread so
|
|
|
- // that we don't block the processing of other requests
|
|
|
- threadWillClosePWriter = true;
|
|
|
- new Thread() {
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- try {
|
|
|
- pwriter.print("Zookeeper version: ");
|
|
|
- pwriter.println(Version.getFullVersion());
|
|
|
- if (len == statCmd) {
|
|
|
- pwriter.println("Clients:");
|
|
|
- // clone should be faster than iteration
|
|
|
- // ie give up the cnxns lock faster
|
|
|
- HashSet<NIOServerCnxn> cnxns;
|
|
|
- synchronized(factory.cnxns){
|
|
|
- cnxns = (HashSet<NIOServerCnxn>)factory
|
|
|
- .cnxns.clone();
|
|
|
- }
|
|
|
- for(NIOServerCnxn c : cnxns){
|
|
|
- ((CnxnStats)c.getStats())
|
|
|
- .dumpConnectionInfo(pwriter, true);
|
|
|
- }
|
|
|
- pwriter.println();
|
|
|
- }
|
|
|
- pwriter.print(zk.serverStats().toString());
|
|
|
- pwriter.print("Node count: ");
|
|
|
- pwriter.println(zk.getZKDatabase().getNodeCount());
|
|
|
- } finally {
|
|
|
- pwriter.flush();
|
|
|
- pwriter.close();
|
|
|
- }
|
|
|
- }
|
|
|
- }.start();
|
|
|
- return true;
|
|
|
- } else if (len == consCmd) {
|
|
|
- if (zk == null) {
|
|
|
- pwriter.println(ZK_NOT_SERVING);
|
|
|
- return true;
|
|
|
- }
|
|
|
- // this could be a long running task, spawn a thread so
|
|
|
- // that we don't block the processing of other requests
|
|
|
- threadWillClosePWriter = true;
|
|
|
- new Thread() {
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- try {
|
|
|
- // clone should be faster than iteration
|
|
|
- // ie give up the cnxns lock faster
|
|
|
- HashSet<NIOServerCnxn> cnxns;
|
|
|
- synchronized(factory.cnxns){
|
|
|
- cnxns = (HashSet<NIOServerCnxn>)factory
|
|
|
- .cnxns.clone();
|
|
|
- }
|
|
|
- for(NIOServerCnxn c : cnxns){
|
|
|
- ((CnxnStats)c.getStats())
|
|
|
- .dumpConnectionInfo(pwriter, false);
|
|
|
- }
|
|
|
- pwriter.println();
|
|
|
- } finally {
|
|
|
- pwriter.flush();
|
|
|
- pwriter.close();
|
|
|
- }
|
|
|
- }
|
|
|
- }.start();
|
|
|
- return true;
|
|
|
- } else if (len == wchpCmd || len == wchcCmd || len == wchsCmd) {
|
|
|
- if (zk == null) {
|
|
|
- pwriter.println(ZK_NOT_SERVING);
|
|
|
- return true;
|
|
|
- }
|
|
|
- // this could be a long running task, spawn a thread so
|
|
|
- // that we don't block the processing of other requests
|
|
|
- threadWillClosePWriter = true;
|
|
|
- new Thread() {
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- try {
|
|
|
- DataTree dt = zk.getZKDatabase().getDataTree();
|
|
|
- if (len == wchsCmd) {
|
|
|
- dt.dumpWatchesSummary(pwriter);
|
|
|
- } else if (len == wchpCmd) {
|
|
|
- dt.dumpWatches(pwriter, true);
|
|
|
- } else {
|
|
|
- dt.dumpWatches(pwriter, false);
|
|
|
- }
|
|
|
- pwriter.println();
|
|
|
- } finally {
|
|
|
- pwriter.flush();
|
|
|
- pwriter.close();
|
|
|
- }
|
|
|
- }
|
|
|
- }.start();
|
|
|
- return true;
|
|
|
- }
|
|
|
- } finally {
|
|
|
- // if we spawned a thread it is responsible for eventually
|
|
|
- // flushing and closeing the writer
|
|
|
- if (!threadWillClosePWriter) {
|
|
|
- pwriter.flush();
|
|
|
- pwriter.close();
|
|
|
+ if (len == ruokCmd) {
|
|
|
+ RuokCommand ruok = new RuokCommand(pwriter);
|
|
|
+ ruok.start();
|
|
|
+ return true;
|
|
|
+ } else if (len == getTraceMaskCmd) {
|
|
|
+ TraceMaskCommand tmask = new TraceMaskCommand(pwriter);
|
|
|
+ tmask.start();
|
|
|
+ return true;
|
|
|
+ } else if (len == setTraceMaskCmd) {
|
|
|
+ int rc = sock.read(incomingBuffer);
|
|
|
+ if (rc < 0) {
|
|
|
+ throw new IOException("Read error");
|
|
|
}
|
|
|
+
|
|
|
+ incomingBuffer.flip();
|
|
|
+ long traceMask = incomingBuffer.getLong();
|
|
|
+ ZooTrace.setTextTraceLevel(traceMask);
|
|
|
+ SetTraceMaskCommand setMask = new SetTraceMaskCommand(pwriter, traceMask);
|
|
|
+ setMask.start();
|
|
|
+ return true;
|
|
|
+ } else if (len == enviCmd) {
|
|
|
+ EnvCommand env = new EnvCommand(pwriter);
|
|
|
+ env.start();
|
|
|
+ return true;
|
|
|
+ } else if (len == confCmd) {
|
|
|
+ ConfCommand ccmd = new ConfCommand(pwriter);
|
|
|
+ ccmd.start();
|
|
|
+ return true;
|
|
|
+ } else if (len == srstCmd) {
|
|
|
+ StatResetCommand strst = new StatResetCommand(pwriter);
|
|
|
+ strst.start();
|
|
|
+ return true;
|
|
|
+ } else if (len == crstCmd) {
|
|
|
+ CnxnStatResetCommand crst = new CnxnStatResetCommand(pwriter);
|
|
|
+ crst.start();
|
|
|
+ return true;
|
|
|
+ } else if (len == dumpCmd) {
|
|
|
+ DumpCommand dump = new DumpCommand(pwriter);
|
|
|
+ dump.start();
|
|
|
+ return true;
|
|
|
+ } else if (len == statCmd || len == srvrCmd) {
|
|
|
+ StatCommand stat = new StatCommand(pwriter, len);
|
|
|
+ stat.start();
|
|
|
+ return true;
|
|
|
+ } else if (len == consCmd) {
|
|
|
+ ConsCommand cons = new ConsCommand(pwriter);
|
|
|
+ cons.start();
|
|
|
+ return true;
|
|
|
+ } else if (len == wchpCmd || len == wchcCmd || len == wchsCmd) {
|
|
|
+ WatchCommand wcmd = new WatchCommand(pwriter, len);
|
|
|
+ wcmd.start();
|
|
|
+ return true;
|
|
|
}
|
|
|
return false;
|
|
|
}
|
|
@@ -1618,7 +1766,8 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
|
|
|
pwriter.print(((SocketChannel)channel).socket()
|
|
|
.getRemoteSocketAddress());
|
|
|
pwriter.print("[");
|
|
|
- pwriter.print(Integer.toHexString(sk.interestOps()));
|
|
|
+ pwriter.print(sk.isValid() ? Integer.toHexString(sk.interestOps())
|
|
|
+ : "0");
|
|
|
pwriter.print("](queued=");
|
|
|
pwriter.print(getOutstandingRequests());
|
|
|
pwriter.print(",recved=");
|