Ver código fonte

ZOOKEEPER-558. server "sent" stats not being updated

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@833620 13f79535-47bb-0310-9956-ffa450edef68
Benjamin Reed 15 anos atrás
pai
commit
e3b97366f8

+ 121 - 31
src/java/main/org/apache/zookeeper/ClientCnxn.java

@@ -91,7 +91,10 @@ public class ClientCnxn {
         // to test
         disableAutoWatchReset =
             Boolean.getBoolean("zookeeper.disableAutoWatchReset");
-        LOG.info("zookeeper.disableAutoWatchReset is " + disableAutoWatchReset);
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("zookeeper.disableAutoWatchReset is "
+                    + disableAutoWatchReset);
+        }
         packetLen = Integer.getInteger("jute.maxbuffer", 4096 * 1024);
     }
 
@@ -416,7 +419,7 @@ public class ClientCnxn {
                     Object event = waitingEvents.take();
                     try {
                         if (event == eventOfDeath) {
-                            break;
+                            return;
                         }
 
                         if (event instanceof WatcherSetEventPair) {
@@ -561,6 +564,34 @@ public class ClientCnxn {
 
     volatile long lastZxid;
 
+    private static class EndOfStreamException extends IOException {
+        private static final long serialVersionUID = -5438877188796231422L;
+
+        public EndOfStreamException(String msg) {
+            super(msg);
+        }
+        
+        public String toString() {
+            return "EndOfStreamException: " + getMessage();
+        }
+    }
+
+    private static class SessionTimeoutException extends IOException {
+        private static final long serialVersionUID = 824482094072071178L;
+
+        public SessionTimeoutException(String msg) {
+            super(msg);
+        }
+    }
+    
+    private static class SessionExpiredException extends IOException {
+        private static final long serialVersionUID = -1388816932076193249L;
+
+        public SessionExpiredException(String msg) {
+            super(msg);
+        }
+    }
+    
     /**
      * This class services the outgoing request queue and generates the heart
      * beats. It also spawns the ReadThread.
@@ -597,13 +628,17 @@ public class ClientCnxn {
                 eventThread.queueEvent(new WatchedEvent(
                         Watcher.Event.EventType.None,
                         Watcher.Event.KeeperState.Expired, null));
-                throw new IOException("Session Expired");
+                throw new SessionExpiredException(
+                        "Unable to reconnect to ZooKeeper service, session 0x"
+                        + Long.toHexString(sessionId) + " has expired");
             }
             readTimeout = sessionTimeout * 2 / 3;
             connectTimeout = sessionTimeout / serverAddrs.size();
             sessionId = conRsp.getSessionId();
             sessionPasswd = conRsp.getPasswd();
             zooKeeper.state = States.CONNECTED;
+            LOG.info("Session establishment complete, sessionid = 0x"
+                    + Long.toHexString(sessionId));
             eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
                     Watcher.Event.KeeperState.SyncConnected, null));
         }
@@ -618,7 +653,7 @@ public class ClientCnxn {
             if (replyHdr.getXid() == -2) {
                 // -2 is the xid for pings
                 if (LOG.isDebugEnabled()) {
-                    LOG.debug("Got ping response for sessionid:0x"
+                    LOG.debug("Got ping response for sessionid: 0x"
                             + Long.toHexString(sessionId)
                             + " after "
                             + ((System.nanoTime() - lastPingSentNs) / 1000000)
@@ -629,14 +664,18 @@ public class ClientCnxn {
             if (replyHdr.getXid() == -4) {
                 // -4 is the xid for AuthPacket
                 // TODO: process AuthPacket here
-                LOG.debug("Got auth sessionid:0x"
-                        + Long.toHexString(sessionId));
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Got auth sessionid:0x"
+                            + Long.toHexString(sessionId));
+                }
                 return;
             }
             if (replyHdr.getXid() == -1) {
                 // -1 means notification
-                LOG.debug("Got notification sessionid:0x"
-                    + Long.toHexString(sessionId));
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Got notification sessionid:0x"
+                        + Long.toHexString(sessionId));
+                }
                 WatcherEvent event = new WatcherEvent();
                 event.deserialize(bbia, "response");
 
@@ -709,8 +748,10 @@ public class ClientCnxn {
             if (sockKey.isReadable()) {
                 int rc = sock.read(incomingBuffer);
                 if (rc < 0) {
-                    throw new IOException("Read error rc = " + rc + " "
-                            + incomingBuffer);
+                    throw new EndOfStreamException(
+                            "Unable to read additional data from server sessionid 0x"
+                            + Long.toHexString(sessionId)
+                            + ", likely server has closed socket");
                 }
                 if (incomingBuffer.remaining() == 0) {
                     incomingBuffer.flip();
@@ -793,7 +834,10 @@ public class ClientCnxn {
         }
 
         private void primeConnection(SelectionKey k) throws IOException {
-            LOG.info("Priming connection to " + sockKey.channel());
+            LOG.info("Socket connection established to "
+                    + ((SocketChannel)sockKey.channel())
+                        .socket().getRemoteSocketAddress()
+                    + ", initiating session");
             lastConnectIndex = currentConnectIndex;
             ConnectRequest conReq = new ConnectRequest(0, lastZxid,
                     sessionTimeout, sessionId, sessionPasswd);
@@ -831,6 +875,11 @@ public class ClientCnxn {
             synchronized (this) {
                 k.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
             }
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Session establishment request sent on "
+                        + ((SocketChannel)sockKey.channel())
+                            .socket().getRemoteSocketAddress());
+            }
         }
 
         private void sendPing() {
@@ -872,12 +921,12 @@ public class ClientCnxn {
             if (nextAddrToTry == serverAddrs.size()) {
                 nextAddrToTry = 0;
             }
+            LOG.info("Opening socket connection to server " + addr);
             SocketChannel sock;
             sock = SocketChannel.open();
             sock.configureBlocking(false);
             sock.socket().setSoLinger(false, -1);
             sock.socket().setTcpNoDelay(true);
-            LOG.info("Attempting connection to server " + addr);
             setName(getName().replaceAll("\\(.*\\)",
                     "(" + addr.getHostName() + ":" + addr.getPort() + ")"));
             sockKey = sock.register(selector, SelectionKey.OP_CONNECT);
@@ -893,6 +942,9 @@ public class ClientCnxn {
             incomingBuffer = lenBuffer;
         }
 
+        private static final String RETRY_CONN_MSG =
+            ", closing socket connection and attempting reconnect";
+        
         @Override
         public void run() {
             long now = System.currentTimeMillis();
@@ -916,7 +968,11 @@ public class ClientCnxn {
                         to = connectTimeout - idleRecv;
                     }
                     if (to <= 0) {
-                        throw new IOException("TIMED OUT");
+                        throw new SessionTimeoutException(
+                                "Client session timed out, have not heard from server in "
+                                + idleRecv + "ms"
+                                + " for sessionid 0x"
+                                + Long.toHexString(sessionId));
                     }
                     if (zooKeeper.state == States.CONNECTED) {
                         int timeToNextPing = readTimeout/2 - idleSend;
@@ -947,7 +1003,6 @@ public class ClientCnxn {
                                 lastHeard = now;
                                 lastSend = now;
                                 primeConnection(k);
-                                LOG.info("Server connection successful");
                             }
                         } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                             if (outgoingQueue.size() > 0) {
@@ -970,16 +1025,31 @@ public class ClientCnxn {
                     selected.clear();
                 } catch (Exception e) {
                     if (closing) {
-                        // closing so this is expected
-                        LOG
-                           .info("Exception while closing send thread for session 0x"
-                                + Long.toHexString(getSessionId())
-                                + " : " + e.getMessage());
+                        if (LOG.isDebugEnabled()) {
+                            // closing so this is expected
+                            LOG.debug("An exception was thrown while closing send thread for session 0x"
+                                    + Long.toHexString(getSessionId())
+                                    + " : " + e.getMessage());
+                        }
                         break;
                     } else {
-                        LOG.warn("Exception closing session 0x"
-                                + Long.toHexString(getSessionId()) + " to "
-                                + sockKey, e);
+                        // this is ugly, you have a better way speak up
+                        if (e instanceof SessionExpiredException) {
+                            LOG.info(e.getMessage() + ", closing socket connection");
+                        } else if (e instanceof SessionTimeoutException) {
+                            LOG.info(e.getMessage() + RETRY_CONN_MSG);
+                        } else if (e instanceof EndOfStreamException) {
+                            LOG.info(e.getMessage() + RETRY_CONN_MSG);
+                        } else {
+                            LOG.warn("Session 0x"
+                                    + Long.toHexString(getSessionId())
+                                    + " for server "
+                                    + ((SocketChannel)sockKey.channel())
+                                        .socket().getRemoteSocketAddress()
+                                    + ", unexpected error"
+                                    + RETRY_CONN_MSG,
+                                    e);
+                        }
                         cleanup();
                         if (zooKeeper.state.isAlive()) {
                             eventThread.queueEvent(new WatchedEvent(
@@ -1000,6 +1070,12 @@ public class ClientCnxn {
             } catch (IOException e) {
                 LOG.warn("Ignoring exception during selector close", e);
             }
+            if (zooKeeper.state.isAlive()) {
+                eventThread.queueEvent(new WatchedEvent(
+                        Event.EventType.None,
+                        Event.KeeperState.Disconnected,
+                        null));
+            }
             ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
                                      "SendThread exitedloop.");
         }
@@ -1011,28 +1087,38 @@ public class ClientCnxn {
                 try {
                     sock.socket().shutdownInput();
                 } catch (IOException e) {
-                    LOG.warn("Ignoring exception during shutdown input", e);
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Ignoring exception during shutdown input", e);
+                    }
                 }
                 try {
                     sock.socket().shutdownOutput();
                 } catch (IOException e) {
-                    LOG.warn("Ignoring exception during shutdown output", e);
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Ignoring exception during shutdown output", e);
+                    }
                 }
                 try {
                     sock.socket().close();
                 } catch (IOException e) {
-                    LOG.warn("Ignoring exception during socket close", e);
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Ignoring exception during socket close", e);
+                    }
                 }
                 try {
                     sock.close();
                 } catch (IOException e) {
-                    LOG.warn("Ignoring exception during channel close", e);
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Ignoring exception during channel close", e);
+                    }
                 }
             }
             try {
                 Thread.sleep(100);
             } catch (InterruptedException e) {
-                LOG.warn("SendThread interrupted during sleep, ignoring");
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("SendThread interrupted during sleep, ignoring");
+                }
             }
             sockKey = null;
             synchronized (pendingQueue) {
@@ -1064,8 +1150,10 @@ public class ClientCnxn {
      * behavior.
      */
     public void disconnect() {
-        LOG.info("Disconnecting ClientCnxn for session: 0x"
-                + Long.toHexString(getSessionId()));
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Disconnecting client for session: 0x"
+                      + Long.toHexString(getSessionId()));
+        }
 
         sendThread.close();
         eventThread.queueEventOfDeath();
@@ -1078,8 +1166,10 @@ public class ClientCnxn {
      * @throws IOException
      */
     public void close() throws IOException {
-        LOG.info("Closing ClientCnxn for session: 0x"
-                + Long.toHexString(getSessionId()));
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Closing client for session: 0x"
+                      + Long.toHexString(getSessionId()));
+        }
 
         closing = true;
 

+ 35 - 2
src/java/main/org/apache/zookeeper/ZooKeeper.java

@@ -502,12 +502,23 @@ public class ZooKeeper {
      * @throws InterruptedException
      */
     public synchronized void close() throws InterruptedException {
-        LOG.info("Closing session: 0x" + Long.toHexString(getSessionId()));
+        if (!state.isAlive()) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Close called on already closed client");
+            }
+            return;
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Closing session: 0x" + Long.toHexString(getSessionId()));
+        }
 
         try {
             cnxn.close();
         } catch (IOException e) {
-            LOG.warn("Ignoring unexpected exception", e);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Ignoring unexpected exception during close", e);
+            }
         }
 
         LOG.info("Session: 0x" + Long.toHexString(getSessionId()) + " closed");
@@ -1436,4 +1447,26 @@ public class ZooKeeper {
     public States getState() {
         return state;
     }
+    
+    /*
+     * Methods to aid in testing follow.
+     * 
+     * THESE METHODS ARE EXPECTED TO BE USED FOR TESTING ONLY!!!
+     */
+
+    /**
+     * Wait up to {@code wait} milliseconds per thread for the underlying threads to shut down.
+     * THIS METHOD IS EXPECTED TO BE USED FOR TESTING ONLY!!!
+     * @param wait max wait in milliseconds
+     * @return true iff all threads have shut down, otherwise false
+     */
+    protected boolean testableWaitForShutdown(int wait)
+        throws InterruptedException
+    {
+        cnxn.sendThread.join(wait);
+        if (cnxn.sendThread.isAlive()) return false;
+        cnxn.eventThread.join(wait);
+        if (cnxn.eventThread.isAlive()) return false;
+        return true;
+    }
 }

+ 1 - 1
src/java/main/org/apache/zookeeper/server/ConnectionBean.java

@@ -93,7 +93,7 @@ public class ConnectionBean implements ConnectionMXBean, ZKMBeanInfo {
     }
     
     public void terminateConnection() {
-        connection.close();
+        connection.sendCloseSession();
     }
 
     @Override

+ 11 - 1
src/java/main/org/apache/zookeeper/server/FinalRequestProcessor.java

@@ -123,8 +123,13 @@ public class FinalRequestProcessor implements RequestProcessor {
             Factory scxn = zks.getServerCnxnFactory();
             // this might be possible since
             // we might just be playing diffs from the leader
-            if (scxn != null) {
+            if (scxn != null && request.cnxn == null) {
+                // calling this if we have the cnxn results in the client's
+                // close session response being lost - we've already closed
+                // the session/socket here before we can send the closeSession
+                // in the switch block below
                 scxn.closeSession(request.sessionId);
+                return;
             }
         }
 
@@ -134,6 +139,7 @@ public class FinalRequestProcessor implements RequestProcessor {
         zks.decInProcess();
         Code err = Code.OK;
         Record rsp = null;
+        boolean closeSession = false;
         try {
             if (request.hdr != null && request.hdr.getType() == OpCode.error) {
                 throw KeeperException.create(KeeperException.Code.get((
@@ -178,6 +184,7 @@ public class FinalRequestProcessor implements RequestProcessor {
                 break;
             }
             case OpCode.closeSession: {
+                closeSession = true;
                 err = Code.get(rc.err);
                 break;
             }
@@ -309,6 +316,9 @@ public class FinalRequestProcessor implements RequestProcessor {
         zks.serverStats().updateLatency(request.createTime);
         try {
             request.cnxn.sendResponse(hdr, rsp, "response");
+            if (closeSession) {
+                request.cnxn.sendCloseSession();
+            }
         } catch (IOException e) {
             LOG.error("FIXMSG",e);
         }

+ 325 - 179
src/java/main/org/apache/zookeeper/server/NIOServerCnxn.java

@@ -70,7 +70,7 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
 
     private ConnectionBean jmxConnectionBean;
 
-   
+
     static public class Factory extends Thread {
         ZooKeeperServer zks;
 
@@ -210,6 +210,8 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
                                          + " - max is " + maxClientCnxns );
                                 sc.close();
                             } else {
+                                LOG.info("Accepted socket connection from "
+                                        + sc.socket().getRemoteSocketAddress());
                                 sc.configureBlocking(false);
                                 SelectionKey sk = sc.register(selector,
                                         SelectionKey.OP_READ);
@@ -228,6 +230,8 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
                         }
                     }
                     selected.clear();
+                } catch (RuntimeException e) {
+                    LOG.warn("Ignoring unexpected runtime exception", e);
                 } catch (Exception e) {
                     LOG.warn("Ignoring exception", e);
                 }
@@ -312,7 +316,7 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
 
     Factory factory;
 
-    ZooKeeperServer zk;
+    private final ZooKeeperServer zk;
 
     private SocketChannel sock;
 
@@ -332,19 +336,30 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
 
     LinkedList<Request> outstanding = new LinkedList<Request>();
 
+    /* Send close connection packet to the client, doIO will eventually
+     * close the underlying machinery (like socket, selectorkey, etc...)
+     */
+    public void sendCloseSession() {
+        sendBuffer(closeConn);
+    }
+
     void sendBuffer(ByteBuffer bb) {
-        // We check if write interest here because if it is NOT set, nothing is queued, so
-        // we can try to send the buffer right away without waking up the selector
-        if ((sk.interestOps()&SelectionKey.OP_WRITE) == 0) {
-            try {
-                sock.write(bb);
-            } catch (IOException e) {
-                // we are just doing best effort right now
+        if (bb != closeConn) {
+            // We check if write interest here because if it is NOT set,
+            // nothing is queued, so we can try to send the buffer right
+            // away without waking up the selector
+            if ((sk.interestOps() & SelectionKey.OP_WRITE) == 0) {
+                try {
+                    sock.write(bb);
+                } catch (IOException e) {
+                    // we are just doing best effort right now
+                }
+            }
+            // if there is nothing left to send, we are done
+            if (bb.remaining() == 0) {
+                packetSent();
+                return;
             }
-        }
-        // if there is nothing left to send, we are done
-        if (bb.remaining() == 0) {
-            return;
         }
         synchronized (factory) {
             sk.selector().wakeup();
@@ -359,6 +374,51 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
         }
     }
 
+    private static class CloseRequestException extends IOException {
+        private static final long serialVersionUID = -7854505709816442681L;
+
+        public CloseRequestException(String msg) {
+            super(msg);
+        }
+    }
+
+    private static class EndOfStreamException extends IOException {
+        private static final long serialVersionUID = -8255690282104294178L;
+
+        public EndOfStreamException(String msg) {
+            super(msg);
+        }
+        
+        public String toString() {
+            return "EndOfStreamException: " + getMessage();
+        }
+    }
+
+    /** Read the request payload (everything following the length prefix) */
+    private void readPayload() throws IOException, InterruptedException {
+        if (incomingBuffer.remaining() != 0) { // have we read length bytes?
+            int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
+            if (rc < 0) {
+                throw new EndOfStreamException(
+                        "Unable to read additional data from client sessionid 0x"
+                        + Long.toHexString(sessionId)
+                        + ", likely client has closed socket");
+            }
+        }
+
+        if (incomingBuffer.remaining() == 0) { // have we read length bytes?
+            packetReceived();
+            incomingBuffer.flip();
+            if (!initialized) {
+                readConnectRequest();
+            } else {
+                readRequest();
+            }
+            lenBuffer.clear();
+            incomingBuffer = lenBuffer;
+        }
+    }
+    
     void doIO(SelectionKey k) throws InterruptedException {
         try {
             if (sock == null) {
@@ -370,24 +430,22 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
             if (k.isReadable()) {
                 int rc = sock.read(incomingBuffer);
                 if (rc < 0) {
-                    throw new IOException("Read error");
+                    throw new EndOfStreamException(
+                            "Unable to read additional data from client sessionid 0x"
+                            + Long.toHexString(sessionId)
+                            + ", likely client has closed socket");
                 }
                 if (incomingBuffer.remaining() == 0) {
-                    incomingBuffer.flip();
-                    if (incomingBuffer == lenBuffer) {
-                        readLength(k);
-                    } else if (!initialized) {
-                        stats.packetsReceived++;
-                        zk.serverStats().incrementPacketsReceived();
-                        readConnectRequest();
-                        lenBuffer.clear();
-                        incomingBuffer = lenBuffer;
+                    boolean isPayload;
+                    if (incomingBuffer == lenBuffer) { // start of next request
+                        incomingBuffer.flip();
+                        isPayload = readLength(k);
                     } else {
-                        stats.packetsReceived++;
-                        zk.serverStats().incrementPacketsReceived();
-                        readRequest();
-                        lenBuffer.clear();
-                        incomingBuffer = lenBuffer;
+                        // continuation
+                        isPayload = true;
+                    }
+                    if (isPayload) { // not the case for 4letterword
+                        readPayload();
                     }
                 }
             }
@@ -448,7 +506,7 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
                     while (outgoingBuffers.size() > 0) {
                         bb = outgoingBuffers.peek();
                         if (bb == closeConn) {
-                            throw new IOException("closing");
+                            throw new CloseRequestException("close requested");
                         }
                         int left = bb.remaining() - sent;
                         if (left > 0) {
@@ -459,12 +517,9 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
                             bb.position(bb.position() + sent);
                             break;
                         }
-                        stats.packetsSent++;
+                        packetSent();
                         /* We've sent the whole buffer, so drop the buffer */
                         sent -= bb.remaining();
-                        if (zk != null) {
-                            zk.serverStats().incrementPacketsSent();
-                        }
                         outgoingBuffers.remove();
                     }
                     // ZooLog.logTraceMessage(LOG,
@@ -475,7 +530,7 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
                     if (outgoingBuffers.size() == 0) {
                         if (!initialized
                                 && (sk.interestOps() & SelectionKey.OP_READ) == 0) {
-                            throw new IOException("Responded to info probe");
+                            throw new CloseRequestException("responded to info probe");
                         }
                         sk.interestOps(sk.interestOps()
                                 & (~SelectionKey.OP_WRITE));
@@ -489,13 +544,25 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
             LOG.warn("Exception causing close of session 0x"
                     + Long.toHexString(sessionId)
                     + " due to " + e);
-            LOG.debug("CancelledKeyException stack trace", e);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("CancelledKeyException stack trace", e);
+            }
+            close();
+        } catch (CloseRequestException e) {
+            // expecting close to log session closure
+            close();
+        } catch (EndOfStreamException e) {
+            LOG.warn(e); // tell user why
+
+            // expecting close to log session closure
             close();
         } catch (IOException e) {
             LOG.warn("Exception causing close of session 0x"
                     + Long.toHexString(sessionId)
                     + " due to " + e);
-            LOG.debug("IOException stack trace", e);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("IOException stack trace", e);
+            }
             close();
         }
     }
@@ -530,11 +597,13 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
                         KeeperException.Code.AUTHFAILED.intValue());
                 sendResponse(rh, null, null);
                 // ... and close connection
-                sendBuffer(NIOServerCnxn.closeConn);
+                sendCloseSession();
                 disableRecv();
             } else {
-                LOG.debug("Authentication succeeded for scheme: "
-                        + scheme);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Authentication succeeded for scheme: "
+                              + scheme);
+                }
                 ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
                         KeeperException.Code.OK.intValue());
                 sendResponse(rh, null, null);
@@ -551,7 +620,9 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
                     outstandingRequests++;
                     // check throttling
                     if (zk.getInProcess() > factory.outstandingLimit) {
-                        LOG.debug("Throttling recv " + zk.getInProcess());
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Throttling recv " + zk.getInProcess());
+                        }
                         disableRecv();
                         // following lines should not be needed since we are
                         // already reading
@@ -581,19 +652,26 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
                 .getArchive(new ByteBufferInputStream(incomingBuffer));
         ConnectRequest connReq = new ConnectRequest();
         connReq.deserialize(bia, "connect");
-        LOG.info("Connected to " + sock.socket().getRemoteSocketAddress()
-                + " lastZxid " + connReq.getLastZxidSeen());
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Session establishment request from client "
+                    + sock.socket().getRemoteSocketAddress()
+                    + " client's lastZxid is 0x"
+                    + Long.toHexString(connReq.getLastZxidSeen()));
+        }
         if (zk == null) {
             throw new IOException("ZooKeeperServer not running");
         }
         if (connReq.getLastZxidSeen() > zk.dataTree.lastProcessedZxid) {
-            String msg = "Client has seen zxid 0x"
+            String msg = "Refusing session request for client "
+                + sock.socket().getRemoteSocketAddress()
+                + " as it has seen zxid 0x"
                 + Long.toHexString(connReq.getLastZxidSeen())
                 + " our last zxid is 0x"
-                + Long.toHexString(zk.dataTree.lastProcessedZxid);
+                + Long.toHexString(zk.dataTree.lastProcessedZxid)
+                + " client must try another server";
 
-            LOG.warn(msg);
-            throw new IOException(msg);
+            LOG.info(msg);
+            throw new CloseRequestException(msg);
         }
         sessionTimeout = connReq.getTimeOut();
         byte passwd[] = connReq.getPasswd();
@@ -607,145 +685,186 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
         // session is setup
         disableRecv();
         if (connReq.getSessionId() != 0) {
-            factory.closeSessionWithoutWakeup(connReq.getSessionId());
-            setSessionId(connReq.getSessionId());
+            long clientSessionId = connReq.getSessionId();
+            LOG.info("Client attempting to renew session 0x"
+                    + Long.toHexString(clientSessionId)
+                    + " at " + sock.socket().getRemoteSocketAddress());
+            factory.closeSessionWithoutWakeup(clientSessionId);
+            setSessionId(clientSessionId);
             zk.reopenSession(this, sessionId, passwd, sessionTimeout);
-            LOG.info("Renewing session 0x" + Long.toHexString(sessionId));
         } else {
+            LOG.info("Client attempting to establish new session at "
+                    + sock.socket().getRemoteSocketAddress());
             zk.createSession(this, passwd, sessionTimeout);
-            LOG.info("Creating new session 0x" + Long.toHexString(sessionId));
         }
         initialized = true;
     }
 
-    private void readLength(SelectionKey k) throws IOException {
-        // Read the length, now get the buffer
-        int len = lenBuffer.getInt();
-        if (!initialized) {
-            // We take advantage of the limited size of the length to look
-            // for cmds. They are all 4-bytes which fits inside of an int
-            if (len == ruokCmd) {
-                LOG.info("Processing ruok command from "
-                        + sock.socket().getRemoteSocketAddress());
+    private void packetReceived() {
+        stats.packetsReceived++;
+        if (zk != null) {
+            zk.serverStats().incrementPacketsReceived();
+        }
+    }
 
-                sendBuffer(imok.duplicate());
-                sendBuffer(NIOServerCnxn.closeConn);
-                k.interestOps(SelectionKey.OP_WRITE);
-                return;
-            } else if (len == getTraceMaskCmd) {
-                LOG.info("Processing getracemask command from "
-                        + sock.socket().getRemoteSocketAddress());
-                long traceMask = ZooTrace.getTextTraceLevel();
-                ByteBuffer resp = ByteBuffer.allocate(8);
-                resp.putLong(traceMask);
-                resp.flip();
-                sendBuffer(resp);
-                sendBuffer(NIOServerCnxn.closeConn);
-                k.interestOps(SelectionKey.OP_WRITE);
-                return;
-            } else if (len == setTraceMaskCmd) {
-                LOG.info("Processing settracemask command from "
-                        + sock.socket().getRemoteSocketAddress());
-                incomingBuffer = ByteBuffer.allocate(8);
+    private void packetSent() {
+        stats.packetsSent++;
+        if (zk != null) {
+            zk.serverStats().incrementPacketsSent();
+        }
+    }
 
-                int rc = sock.read(incomingBuffer);
-                if (rc < 0) {
-                    throw new IOException("Read error");
-                }
-                System.out.println("rc=" + rc);
-                incomingBuffer.flip();
-                long traceMask = incomingBuffer.getLong();
-                ZooTrace.setTextTraceLevel(traceMask);
-                ByteBuffer resp = ByteBuffer.allocate(8);
-                resp.putLong(traceMask);
-                resp.flip();
-                sendBuffer(resp);
-                sendBuffer(NIOServerCnxn.closeConn);
-                k.interestOps(SelectionKey.OP_WRITE);
-                return;
-            } else if (len == dumpCmd) {
-                LOG.info("Processing dump command from "
-                        + sock.socket().getRemoteSocketAddress());
-                if (zk == null) {
-                    sendBuffer(ByteBuffer.wrap("ZooKeeper not active \n"
-                            .getBytes()));
-                } else {
-                    StringBuffer sb = new StringBuffer();
-                    sb.append("SessionTracker dump: \n");
-                    sb.append(zk.sessionTracker.toString()).append("\n");
-                    sb.append("ephemeral nodes dump:\n");
-                    sb.append(zk.dataTree.dumpEphemerals()).append("\n");
-                    sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
-                }
-                sendBuffer(NIOServerCnxn.closeConn);
-                k.interestOps(SelectionKey.OP_WRITE);
-                return;
-            } else if (len == reqsCmd) {
-                LOG.info("Processing reqs command from "
-                        + sock.socket().getRemoteSocketAddress());
+    /** Return true if four letter word found and responded to, otherwise false **/
+    private boolean checkFourLetterWord(SelectionKey k, int len)
+        throws IOException
+    {
+        // We take advantage of the limited size of the length to look
+        // for cmds. They are all 4-bytes which fits inside of an int
+        if (len == ruokCmd) {
+            LOG.info("Processing ruok command from "
+                    + sock.socket().getRemoteSocketAddress());
+            packetReceived();
+
+            sendBuffer(imok.duplicate());
+            k.interestOps(SelectionKey.OP_WRITE);
+            return true;
+        } else if (len == getTraceMaskCmd) {
+            LOG.info("Processing getracemask command from "
+                    + sock.socket().getRemoteSocketAddress());
+            packetReceived();
+
+            long traceMask = ZooTrace.getTextTraceLevel();
+            ByteBuffer resp = ByteBuffer.allocate(8);
+            resp.putLong(traceMask);
+            resp.flip();
+            sendBuffer(resp);
+            k.interestOps(SelectionKey.OP_WRITE);
+            return true;
+        } else if (len == setTraceMaskCmd) {
+            LOG.info("Processing settracemask command from "
+                    + sock.socket().getRemoteSocketAddress());
+            incomingBuffer = ByteBuffer.allocate(8);
+
+            int rc = sock.read(incomingBuffer);
+            if (rc < 0) {
+                throw new IOException("Read error");
+            }
+            packetReceived();
+
+            System.out.println("rc=" + rc);
+            incomingBuffer.flip();
+            long traceMask = incomingBuffer.getLong();
+            ZooTrace.setTextTraceLevel(traceMask);
+            ByteBuffer resp = ByteBuffer.allocate(8);
+            resp.putLong(traceMask);
+            resp.flip();
+            sendBuffer(resp);
+            k.interestOps(SelectionKey.OP_WRITE);
+            return true;
+        } else if (len == dumpCmd) {
+            LOG.info("Processing dump command from "
+                    + sock.socket().getRemoteSocketAddress());
+            packetReceived();
+
+            if (zk == null) {
+                sendBuffer(ByteBuffer.wrap("ZooKeeper not active \n"
+                        .getBytes()));
+            } else {
                 StringBuffer sb = new StringBuffer();
-                sb.append("Requests:\n");
-                synchronized (outstanding) {
-                    for (Request r : outstanding) {
-                        sb.append(r.toString());
-                        sb.append('\n');
-                    }
-                }
+                sb.append("SessionTracker dump: \n");
+                sb.append(zk.sessionTracker.toString()).append("\n");
+                sb.append("ephemeral nodes dump:\n");
+                sb.append(zk.dataTree.dumpEphemerals()).append("\n");
                 sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
-                sendBuffer(NIOServerCnxn.closeConn);
-                k.interestOps(SelectionKey.OP_WRITE);
-                return;
-            } else if (len == statCmd) {
-                LOG.info("Processing stat command from "
-                        + sock.socket().getRemoteSocketAddress());
-                StringBuffer sb = new StringBuffer();
-                if(zk!=null){
-                    sb.append("Zookeeper version: ").append(Version.getFullVersion())
-                        .append("\n");
-                    sb.append("Clients:\n");
-                    synchronized(factory.cnxns){
-                        for(NIOServerCnxn c : factory.cnxns){
-                            sb.append(c.getStats().toString());
-                        }
+            }
+            k.interestOps(SelectionKey.OP_WRITE);
+            return true;
+        } else if (len == reqsCmd) {
+            LOG.info("Processing reqs command from "
+                    + sock.socket().getRemoteSocketAddress());
+            packetReceived();
+
+            StringBuffer sb = new StringBuffer();
+            sb.append("Requests:\n");
+            synchronized (outstanding) {
+                for (Request r : outstanding) {
+                    sb.append(r.toString());
+                    sb.append('\n');
+                }
+            }
+            sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
+            k.interestOps(SelectionKey.OP_WRITE);
+            return true;
+        } else if (len == statCmd) {
+            LOG.info("Processing stat command from "
+                    + sock.socket().getRemoteSocketAddress());
+            packetReceived();
+
+            StringBuffer sb = new StringBuffer();
+            if(zk != null){
+                sb.append("Zookeeper version: ").append(Version.getFullVersion())
+                    .append("\n");
+                sb.append("Clients:\n");
+                synchronized(factory.cnxns){
+                    for(NIOServerCnxn c : factory.cnxns){
+                        sb.append(c.getStats().toString());
                     }
-                    sb.append("\n");
-                    sb.append(zk.serverStats().toString());
-                    sb.append("Node count: ").append(zk.dataTree.getNodeCount()).
-                        append("\n");
-                }else
-                    sb.append("ZooKeeperServer not running\n");
-
-                sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
-                sendBuffer(NIOServerCnxn.closeConn);
-                k.interestOps(SelectionKey.OP_WRITE);
-                return;
-            } else if (len == enviCmd) {
-                LOG.info("Processing envi command from "
-                        + sock.socket().getRemoteSocketAddress());
-                StringBuffer sb = new StringBuffer();
+                }
+                sb.append("\n");
+                sb.append(zk.serverStats().toString());
+                sb.append("Node count: ").append(zk.dataTree.getNodeCount()).
+                    append("\n");
+            } else {
+                sb.append("ZooKeeperServer not running\n");
+            }
 
-                List<Environment.Entry> env = Environment.list();
+            sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
+            k.interestOps(SelectionKey.OP_WRITE);
+            return true;
+        } else if (len == enviCmd) {
+            LOG.info("Processing envi command from "
+                    + sock.socket().getRemoteSocketAddress());
+            packetReceived();
 
-                sb.append("Environment:\n");
-                for(Environment.Entry e : env) {
-                    sb.append(e.getKey()).append("=").append(e.getValue())
-                        .append("\n");
-                }
+            StringBuffer sb = new StringBuffer();
 
-                sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
-                sendBuffer(NIOServerCnxn.closeConn);
-                k.interestOps(SelectionKey.OP_WRITE);
-                return;
-            } else if (len == srstCmd) {
-                LOG.info("Processing srst command from "
-                        + sock.socket().getRemoteSocketAddress());
-                zk.serverStats().reset();
+            List<Environment.Entry> env = Environment.list();
 
-                sendBuffer(ByteBuffer.wrap("Stats reset.\n".getBytes()));
-                sendBuffer(NIOServerCnxn.closeConn);
-                k.interestOps(SelectionKey.OP_WRITE);
-                return;
+            sb.append("Environment:\n");
+            for(Environment.Entry e : env) {
+                sb.append(e.getKey()).append("=").append(e.getValue())
+                    .append("\n");
             }
+
+            sendBuffer(ByteBuffer.wrap(sb.toString().getBytes()));
+            k.interestOps(SelectionKey.OP_WRITE);
+            return true;
+        } else if (len == srstCmd) {
+            LOG.info("Processing srst command from "
+                    + sock.socket().getRemoteSocketAddress());
+            packetReceived();
+
+            zk.serverStats().reset();
+
+            sendBuffer(ByteBuffer.wrap("Stats reset.\n".getBytes()));
+            k.interestOps(SelectionKey.OP_WRITE);
+            return true;
+        }
+        return false;
+    }
+
+    /** Reads the first 4 bytes of lenBuffer, which could be true length or
+     *  four letter word.
+     *
+     * @param k selection key
+     * @return true if length read, otherwise false (it was a four letter word, not really the length)
+     * @throws IOException if buffer size exceeds maxBuffer size
+     */
+    private boolean readLength(SelectionKey k) throws IOException {
+        // Read the length, now get the buffer
+        int len = lenBuffer.getInt();
+        if (!initialized && checkFourLetterWord(k, len)) {
+            return false;
         }
         if (len < 0 || len > BinaryInputArchive.maxBuffer) {
             throw new IOException("Len error " + len);
@@ -754,6 +873,7 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
             throw new IOException("ZooKeeperServer not running");
         }
         incomingBuffer = ByteBuffer.allocate(len);
+        return true;
     }
 
     /**
@@ -831,8 +951,11 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
             zk.removeCnxn(this);
         }
 
-        LOG.info("closing session:0x" + Long.toHexString(sessionId)
-                + " NIOServerCnxn: " + sock);
+        LOG.info("Closed socket connection for client "
+                + sock.socket().getRemoteSocketAddress()
+                + (sessionId != 0 ?
+                        " which had sessionid 0x" + Long.toHexString(sessionId) :
+                        " (no session established for client)"));
         try {
             /*
              * The following sequence of code is stupid! You would think that
@@ -843,18 +966,24 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
             sock.socket().shutdownOutput();
         } catch (IOException e) {
             // This is a relatively common exception that we can't avoid
-            LOG.debug("ignoring exception during output shutdown", e);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("ignoring exception during output shutdown", e);
+            }
         }
         try {
             sock.socket().shutdownInput();
         } catch (IOException e) {
             // This is a relatively common exception that we can't avoid
-            LOG.debug("ignoring exception during input shutdown", e);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("ignoring exception during input shutdown", e);
+            }
         }
         try {
             sock.socket().close();
         } catch (IOException e) {
-            LOG.warn("ignoring exception during socket close", e);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("ignoring exception during socket close", e);
+            }
         }
         try {
             sock.close();
@@ -863,7 +992,9 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
             // this section arise.
             // factory.selector.wakeup();
         } catch (IOException e) {
-            LOG.warn("ignoring exception during socketchannel close", e);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("ignoring exception during socketchannel close", e);
+            }
         }
         sock = null;
         if (sk != null) {
@@ -871,7 +1002,9 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
                 // need to cancel this selection key from the selector
                 sk.cancel();
             } catch (Exception e) {
-                LOG.warn("ignoring exception during selectionkey cancel", e);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("ignoring exception during selectionkey cancel", e);
+                }
             }
         }
     }
@@ -886,6 +1019,11 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
      */
     synchronized public void sendResponse(ReplyHeader h, Record r, String tag) {
         if (closed) {
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("send called on closed session 0x"
+                          + Long.toHexString(sessionId)
+                          + " with record " + r);
+            }
             return;
         }
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -962,12 +1100,20 @@ public class NIOServerCnxn implements Watcher, ServerCnxn {
             bb.putInt(bb.remaining() - 4).rewind();
             sendBuffer(bb);
 
-            LOG.info("Finished init of 0x" + Long.toHexString(sessionId)
-                    + " valid:" + valid);
-
             if (!valid) {
-                sendBuffer(closeConn);
+                LOG.info("Invalid session 0x"
+                        + Long.toHexString(sessionId)
+                        + " for client "
+                        + sock.socket().getRemoteSocketAddress()
+                        + ", probably expired");
+                sendCloseSession();
+            } else {
+                LOG.info("Established session 0x"
+                        + Long.toHexString(sessionId)
+                        + " for client "
+                        + sock.socket().getRemoteSocketAddress());
             }
+
             // Now that the session is ready we can start receiving packets
             synchronized (this.factory) {
                 sk.selector().wakeup();

+ 11 - 10
src/java/main/org/apache/zookeeper/server/PrepRequestProcessor.java

@@ -69,13 +69,13 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
             LOG.info("zookeeper.skipACL==\"yes\", ACL checks will be skipped");
         }
     }
-    
+
     /**
      * this is only for testing purposes.
      * should never be used otherwise
      */
     private static  boolean failCreate = false;
-    
+
     LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();
 
     RequestProcessor nextProcessor;
@@ -88,7 +88,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
         this.nextProcessor = nextProcessor;
         this.zks = zks;
     }
-    
+
     /**
      * method for tests to set failCreate
      * @param b
@@ -215,7 +215,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
                 String path = createRequest.getPath();
                 int lastSlash = path.lastIndexOf('/');
                 if (lastSlash == -1 || path.indexOf('\0') != -1 || failCreate) {
-                    LOG.warn("Invalid path " + path + " with session " +
+                    LOG.info("Invalid path " + path + " with session " +
                             Long.toHexString(request.sessionId));
                     throw new KeeperException.BadArgumentsException();
                 }
@@ -236,7 +236,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
                 try {
                     PathUtils.validatePath(path);
                 } catch(IllegalArgumentException ie) {
-                    LOG.warn("Invalid path " + path + " with session " +
+                    LOG.info("Invalid path " + path + " with session " +
                             Long.toHexString(request.sessionId));
                     throw new KeeperException.BadArgumentsException();
                 }
@@ -381,7 +381,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
                                 path2Delete, null, 0, null));
                     }
                 }
-                LOG.info("Processed session termination request for id: 0x"
+                LOG.info("Processed session termination for sessionid: 0x"
                         + Long.toHexString(request.sessionId));
                 break;
             case OpCode.sync:
@@ -392,7 +392,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
             case OpCode.getChildren2:
             case OpCode.ping:
             case OpCode.setWatches:
-            	zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
+                zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
                 break;
             }
         } catch (KeeperException e) {
@@ -400,13 +400,14 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
                 txnHeader.setType(OpCode.error);
                 txn = new ErrorTxn(e.code().intValue());
             }
-            LOG.warn("Got exception when processing " + request.toString(), e);
+            LOG.info("Got user-level KeeperException when processing "
+                    + request.toString() + " Error:" + e.getMessage());
             request.setException(e);
         } catch (Exception e) {
             // log at error level as we are returning a marshalling
             // error to the user
             LOG.error("Failed to process " + request, e);
-            
+
             StringBuffer sb = new StringBuffer();
             ByteBuffer bb = request.request;
             if(bb != null){
@@ -417,7 +418,7 @@ public class PrepRequestProcessor extends Thread implements RequestProcessor {
             } else {
                 sb.append("request buffer is null");
             }
-            
+
             LOG.error("Dumping request buffer: 0x" + sb.toString());
             if (txnHeader != null) {
                 txnHeader.setType(OpCode.error);

+ 13 - 13
src/java/main/org/apache/zookeeper/server/ServerCnxn.java

@@ -59,30 +59,30 @@ public interface ServerCnxn extends Watcher {
     // (aka owned by) this class
     final public static Object me = new Object();
 
-    public abstract int getSessionTimeout();
+    int getSessionTimeout();
 
-    public abstract void close();
+    void sendResponse(ReplyHeader h, Record r, String tag) throws IOException;
 
-    public abstract void sendResponse(ReplyHeader h, Record r, String tag)
-            throws IOException;
-
-    public void finishSessionInit(boolean valid);
+    /* notify the client the session is closing and close/cleanup socket */
+    void sendCloseSession(); 
+    
+    void finishSessionInit(boolean valid);
 
-    public abstract void process(WatchedEvent event);
+    void process(WatchedEvent event);
 
-    public abstract long getSessionId();
+    long getSessionId();
 
-    public abstract void setSessionId(long sessionId);
+    void setSessionId(long sessionId);
 
-    public abstract ArrayList<Id> getAuthInfo();
+    ArrayList<Id> getAuthInfo();
 
-    public InetSocketAddress getRemoteAddress();
+    InetSocketAddress getRemoteAddress();
     
-    public interface Stats{
+    interface Stats {
         public long getOutstandingRequests();
         public long getPacketsReceived();
         public long getPacketsSent();
     }
     
-    public Stats getStats();
+    Stats getStats();
 }

+ 6 - 2
src/java/main/org/apache/zookeeper/server/SessionTracker.java

@@ -29,10 +29,14 @@ import org.apache.zookeeper.KeeperException.SessionMovedException;
  * shell to track information to be forwarded to the leader.
  */
 public interface SessionTracker {
+    public static interface Session {
+        long getSessionId();
+        int getTimeout();
+    }
     public static interface SessionExpirer {
-        public void expire(long sessionId);
+        void expire(Session session);
 
-        public long getServerId();
+        long getServerId();
     }
 
     long createSession(int sessionTimeout);

+ 18 - 17
src/java/main/org/apache/zookeeper/server/SessionTrackerImpl.java

@@ -29,7 +29,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.SessionExpiredException;
-import org.apache.zookeeper.KeeperException.SessionMovedException;
 
 /**
  * This is a full featured SessionTracker. It tracks session in grouped by tick
@@ -40,7 +39,7 @@ import org.apache.zookeeper.KeeperException.SessionMovedException;
 public class SessionTrackerImpl extends Thread implements SessionTracker {
     private static final Logger LOG = Logger.getLogger(SessionTrackerImpl.class);
 
-    HashMap<Long, Session> sessionsById = new HashMap<Long, Session>();
+    HashMap<Long, SessionImpl> sessionsById = new HashMap<Long, SessionImpl>();
 
     HashMap<Long, SessionSet> sessionSets = new HashMap<Long, SessionSet>();
 
@@ -50,17 +49,21 @@ public class SessionTrackerImpl extends Thread implements SessionTracker {
 
     int expirationInterval;
 
-    public static class Session {
-        Session(long sessionId, long expireTime) {
+    public static class SessionImpl implements Session {
+        SessionImpl(long sessionId, int timeout, long expireTime) {
             this.sessionId = sessionId;
+            this.timeout = timeout;
             this.tickTime = expireTime;
         }
 
+        final long sessionId;
+        final int timeout;
         long tickTime;
 
-        long sessionId;
-        
         Object owner;
+
+        public long getSessionId() { return sessionId; }
+        public int getTimeout() { return timeout; }
     }
 
     public static long initializeNextSession(long id) {
@@ -71,7 +74,7 @@ public class SessionTrackerImpl extends Thread implements SessionTracker {
     }
 
     static class SessionSet {
-        HashSet<Session> sessions = new HashSet<Session>();
+        HashSet<SessionImpl> sessions = new HashSet<SessionImpl>();
     }
 
     SessionExpirer expirer;
@@ -109,7 +112,7 @@ public class SessionTrackerImpl extends Thread implements SessionTracker {
         for (long time : keys) {
             sb.append(sessionSets.get(time).sessions.size() + " expire at "
                     + new Date(time) + ":\n");
-            for (Session s : sessionSets.get(time).sessions) {
+            for (SessionImpl s : sessionSets.get(time).sessions) {
                 sb.append("\t" + s.sessionId + "\n");
             }
         }
@@ -128,11 +131,9 @@ public class SessionTrackerImpl extends Thread implements SessionTracker {
                 SessionSet set;
                 set = sessionSets.remove(nextExpirationTime);
                 if (set != null) {
-                    for (Session s : set.sessions) {
+                    for (SessionImpl s : set.sessions) {
                         sessionsById.remove(s.sessionId);
-                        LOG.info("Expiring session 0x"
-                                + Long.toHexString(s.sessionId));
-                        expirer.expire(s.sessionId);
+                        expirer.expire(s);
                     }
                 }
                 nextExpirationTime += expirationInterval;
@@ -150,7 +151,7 @@ public class SessionTrackerImpl extends Thread implements SessionTracker {
                                      "SessionTrackerImpl --- Touch session: 0x"
                     + Long.toHexString(sessionId) + " with timeout " + timeout);
         }
-        Session s = sessionsById.get(sessionId);
+        SessionImpl s = sessionsById.get(sessionId);
         if (s == null) {
             return false;
         }
@@ -174,7 +175,7 @@ public class SessionTrackerImpl extends Thread implements SessionTracker {
     }
 
     synchronized public void removeSession(long sessionId) {
-        Session s = sessionsById.remove(sessionId);
+        SessionImpl s = sessionsById.remove(sessionId);
         sessionsWithTimeout.remove(sessionId);
         if (LOG.isTraceEnabled()) {
             ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
@@ -203,7 +204,7 @@ public class SessionTrackerImpl extends Thread implements SessionTracker {
     synchronized public void addSession(long id, int sessionTimeout) {
         sessionsWithTimeout.put(id, sessionTimeout);
         if (sessionsById.get(id) == null) {
-            Session s = new Session(id, 0);
+            SessionImpl s = new SessionImpl(id, sessionTimeout, 0);
             sessionsById.put(id, s);
             if (LOG.isTraceEnabled()) {
                 ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK,
@@ -221,7 +222,7 @@ public class SessionTrackerImpl extends Thread implements SessionTracker {
     }
 
     synchronized public void checkSession(long sessionId, Object owner) throws KeeperException.SessionExpiredException, KeeperException.SessionMovedException {
-        Session session = sessionsById.get(sessionId);
+        SessionImpl session = sessionsById.get(sessionId);
 		if (session == null) {
             throw new KeeperException.SessionExpiredException();
         }
@@ -233,7 +234,7 @@ public class SessionTrackerImpl extends Thread implements SessionTracker {
     }
 
 	synchronized public void setOwner(long id, Object owner) throws SessionExpiredException {
-		Session session = sessionsById.get(id);
+		SessionImpl session = sessionsById.get(id);
 		if (session == null) {
             throw new KeeperException.SessionExpiredException();
         }

+ 21 - 7
src/java/main/org/apache/zookeeper/server/ZooKeeperServer.java

@@ -29,7 +29,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.jute.BinaryInputArchive;
 import org.apache.jute.BinaryOutputArchive;
@@ -45,6 +44,7 @@ import org.apache.zookeeper.data.Id;
 import org.apache.zookeeper.data.StatPersisted;
 import org.apache.zookeeper.jmx.MBeanRegistry;
 import org.apache.zookeeper.proto.RequestHeader;
+import org.apache.zookeeper.server.SessionTracker.Session;
 import org.apache.zookeeper.server.SessionTracker.SessionExpirer;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog.PlayBackListener;
@@ -322,19 +322,31 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
         }
     }
 
-    public void expire(long sessionId) {
-        LOG.info("Expiring session 0x" + Long.toHexString(sessionId));
+    public void expire(Session session) {
+        long sessionId = session.getSessionId();
+        LOG.info("Expiring session 0x" + Long.toHexString(sessionId)
+                + ", timeout of " + session.getTimeout() + "ms exceeded");
         close(sessionId);
     }
 
-    void touch(ServerCnxn cnxn) throws IOException {
+    public static class MissingSessionException extends IOException {
+        private static final long serialVersionUID = 7467414635467261007L;
+
+        public MissingSessionException(String msg) {
+            super(msg);
+        }
+    }
+    
+    void touch(ServerCnxn cnxn) throws MissingSessionException {
         if (cnxn == null) {
             return;
         }
         long id = cnxn.getSessionId();
         int to = cnxn.getSessionTimeout();
         if (!sessionTracker.touchSession(id, to)) {
-            throw new IOException("Missing session 0x" + Long.toHexString(id));
+            throw new MissingSessionException(
+                    "No session with sessionid 0x" + Long.toHexString(id)
+                    + " exists, probably expired and removed");
         }
     }
 
@@ -578,8 +590,10 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
                 LOG.warn("Dropping packet at server of type " + si.type);
                 // if invalid packet drop the packet.
             }
-        } catch (IOException e) {
-            LOG.warn("Ignoring unexpected exception", e);
+        } catch (MissingSessionException e) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Dropping request: " + e.getMessage());
+            }
         }
     }
 

+ 6 - 0
src/java/test/org/apache/zookeeper/TestableZooKeeper.java

@@ -69,4 +69,10 @@ public class TestableZooKeeper extends ZooKeeper {
             }
         }.start();
     }
+    
+    public boolean testableWaitForShutdown(int wait)
+        throws InterruptedException
+    {
+        return super.testableWaitForShutdown(wait);
+    }
 }

+ 2 - 2
src/java/test/org/apache/zookeeper/test/ClientBase.java

@@ -126,13 +126,13 @@ public abstract class ClientBase extends TestCase {
         }
     }
 
-    protected ZooKeeper createClient()
+    protected TestableZooKeeper createClient()
         throws IOException, InterruptedException
     {
         return createClient(hostPort);
     }
 
-    protected ZooKeeper createClient(String hp)
+    protected TestableZooKeeper createClient(String hp)
         throws IOException, InterruptedException
     {
         CountdownWatcher watcher = new CountdownWatcher();

+ 8 - 3
src/java/test/org/apache/zookeeper/test/ClientTest.java

@@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.TestableZooKeeper;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.KeeperException.Code;
@@ -614,8 +615,12 @@ public class ClientTest extends ClientBase {
         public void run() {
             try {
                 for (; current < count; current++) {
-                    ZooKeeper zk = createClient();
+                    TestableZooKeeper zk = createClient();
                     zk.close();
+                    // we've asked to close, wait for it to finish closing
+                    // all the sub-threads; otherwise the selector may not be
+                    // closed when we check (false positive on test failure)
+                    zk.testableWaitForShutdown(CONNECTION_TIMEOUT);
                 }
             } catch (Throwable t) {
                 LOG.error("test failed", t);
@@ -661,10 +666,10 @@ public class ClientTest extends ClientBase {
         }
 
         for (int i = 0; i < threads.length; i++) {
-            threads[i].join(600000);
+            threads[i].join(CONNECTION_TIMEOUT);
             assertTrue(threads[i].current == threads[i].count);
         }
-        
+
         // if this fails it means we are not cleaning up after the closed
         // sessions.
         assertTrue("open fds after test are not significantly higher than before",

+ 3 - 3
src/java/test/org/apache/zookeeper/test/HierarchicalQuorumTest.java

@@ -28,7 +28,7 @@ import java.util.Set;
 
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.PortAssignment;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.TestableZooKeeper;
 import org.apache.zookeeper.server.quorum.FastLeaderElection;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
@@ -251,13 +251,13 @@ public class HierarchicalQuorumTest extends ClientBase {
         }
     }
 
-    protected ZooKeeper createClient()
+    protected TestableZooKeeper createClient()
         throws IOException, InterruptedException
     {
         return createClient(hostPort);
     }
 
-    protected ZooKeeper createClient(String hp)
+    protected TestableZooKeeper createClient(String hp)
         throws IOException, InterruptedException
     {
         CountdownWatcher watcher = new CountdownWatcher();

+ 3 - 3
src/java/test/org/apache/zookeeper/test/QuorumBase.java

@@ -29,7 +29,7 @@ import java.util.Set;
 
 import org.apache.log4j.Logger;
 import org.apache.zookeeper.PortAssignment;
-import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.TestableZooKeeper;
 import org.apache.zookeeper.server.quorum.QuorumPeer;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.junit.After;
@@ -200,13 +200,13 @@ public class QuorumBase extends ClientBase {
         }
     }
 
-    protected ZooKeeper createClient()
+    protected TestableZooKeeper createClient()
         throws IOException, InterruptedException
     {
         return createClient(hostPort);
     }
 
-    protected ZooKeeper createClient(String hp)
+    protected TestableZooKeeper createClient(String hp)
         throws IOException, InterruptedException
     {
         CountdownWatcher watcher = new CountdownWatcher();