浏览代码

[ 1947090 ] Improper timeout tracking at clients

git-svn-id: https://svn.apache.org/repos/asf/hadoop/zookeeper/trunk@670933 13f79535-47bb-0310-9956-ffa450edef68
Benjamin Reed 17 年之前
父节点
当前提交
1a85483357

+ 68 - 50
zookeeper/java/src/com/yahoo/zookeeper/ClientCnxn.java

@@ -69,7 +69,7 @@ import com.yahoo.zookeeper.server.ZooLog;
  * connected to as needed.
  * 
  */
-public class ClientCnxn {
+class ClientCnxn {
     private static final Logger LOG = Logger.getLogger(ZooKeeperServer.class);
 
     private ArrayList<InetSocketAddress> serverAddrs = new ArrayList<InetSocketAddress>();
@@ -107,8 +107,6 @@ public class ClientCnxn {
 
     private int sessionTimeout;
 
-    private int timeout;
-
     private ZooKeeper zooKeeper;
 
     private long sessionId;
@@ -342,6 +340,35 @@ public class ClientCnxn {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    private void finishPacket(Packet p) {
+        p.finished = true;
+        if (p.cb == null) {
+            synchronized (p) {
+                p.notifyAll();
+            }
+        } else {
+            waitingEvents.add(p);
+        }
+    }
+
+    private void conLossPacket(Packet p) {
+        if (p.replyHeader == null) {
+            return;
+        }
+        switch(zooKeeper.state) {
+        case AUTH_FAILED:
+            p.replyHeader.setErr(KeeperException.Code.AuthFailed);
+            break;
+        case CLOSED:
+            p.replyHeader.setErr(KeeperException.Code.SessionExpired);
+            break;
+        default:
+            p.replyHeader.setErr(KeeperException.Code.ConnectionLoss);
+        }
+        finishPacket(p);
+    }
+
     long lastZxid;
 
     /**
@@ -376,11 +403,10 @@ public class ClientCnxn {
                 zooKeeper.state = States.CLOSED;
                 waitingEvents.add(new WatcherEvent(Watcher.Event.EventNone,
                         Watcher.Event.KeeperStateExpired, null));
-                throw new IOException("Connect failed");
+                throw new IOException("Session Expired");
             }
             readTimeout = sessionTimeout * 2 / 3;
             connectTimeout = sessionTimeout / serverAddrs.size();
-            timeout = readTimeout / 2;
             sessionId = conRsp.getSessionId();
             sessionPasswd = conRsp.getPasswd();
             waitingEvents.add(new WatcherEvent(Watcher.Event.EventNone,
@@ -389,7 +415,6 @@ public class ClientCnxn {
 
         @SuppressWarnings("unchecked")
         void readResponse() throws IOException {
-            timeout = readTimeout / 2;
             ByteBufferInputStream bbis = new ByteBufferInputStream(
                     incomingBuffer);
             BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
@@ -441,18 +466,6 @@ public class ClientCnxn {
             finishPacket(p);
         }
 
-        @SuppressWarnings("unchecked")
-        private void finishPacket(Packet p) {
-            p.finished = true;
-            if (p.cb == null) {
-                synchronized (p) {
-                    p.notifyAll();
-                }
-            } else {
-                waitingEvents.add(p);
-            }
-        }
-
         /**
          * @return true if a packet was received
          * @param k
@@ -579,14 +592,6 @@ public class ClientCnxn {
             }
         }
 
-        private void conLossPacket(Packet p) {
-            if (p.replyHeader == null) {
-                return;
-            }
-            p.replyHeader.setErr(KeeperException.Code.ConnectionLoss);
-            finishPacket(p);
-        }
-
         private void sendPing() {
             RequestHeader h = new RequestHeader(-2, OpCode.ping);
             queuePacket(h, null, null, null, null, null, null);
@@ -641,77 +646,86 @@ public class ClientCnxn {
 
         @Override
         public void run() {
-            timeout = connectTimeout;
             long now = System.currentTimeMillis();
             long lastHeard = now;
-            int idle = 0;
+            long lastSend = now;
             while (zooKeeper.state.isAlive()) {
                 try {
                     if (sockKey == null) {
                         startConnect();
+                        lastSend = now;
                         lastHeard = now;
                     }
-                    int to = (int) (timeout - idle);
+                    int idleRecv = (int) (now - lastHeard);
+                    int idleSend = (int) (now - lastSend);
+                    int to = readTimeout - idleRecv;
+                    if (zooKeeper.state != States.CONNECTED) {
+                        to = connectTimeout - idleRecv;
+                    }
                     if (to <= 0) {
                         throw new IOException("TIMED OUT");
                     }
+                    if (zooKeeper.state == States.CONNECTED) {
+                        int timeToNextPing = readTimeout/2 - idleSend;
+                        if (timeToNextPing <= 0) {
+                            sendPing();
+                            lastSend = now;
+                            enableWrite();
+                        } else {
+                            if (timeToNextPing < to) {
+                                to = timeToNextPing;
+                            }
+                        }
+                    }
+                    
                     selector.select(to);
                     Set<SelectionKey> selected;
                     synchronized (this) {
                         selected = selector.selectedKeys();
                     }
+                    // Everything below and until we get back to the select is
+                    // non blocking, so time is effectively a constant. That is
+                    // Why we just have to do this once, here
                     now = System.currentTimeMillis();
-                    idle = (int) (now - lastHeard);
                     for (SelectionKey k : selected) {
                         SocketChannel sc = ((SocketChannel) k.channel());
                         if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                             if (sc.finishConnect()) {
                                 zooKeeper.state = States.CONNECTED;
-                                timeout = readTimeout / 2;
                                 lastHeard = now;
+                                lastSend = now;
                                 primeConnection(k);
                             }
                         } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
+                            if (outgoingQueue.size() > 0) {
+                                // We have something to send so it's the same
+                                // as if we do the send now.
+                                lastSend = now;
+                            }
                             if (doIO()) {
                                 lastHeard = now;
                             }
                         }
                     }
-                    // ZooLog.logWarn("interest = " +
-                    // Integer.toHexString(sockKey.interestOps()) + " ready = "
-                    // + Integer.toHexString(sockKey.readyOps()) + " PQq = " +
-                    // pendingQueue.size() + " timout = " + timeout + "
-                    // outgoingQueue = " + outgoingQueue.size());
                     if (zooKeeper.state == States.CONNECTED) {
-                        if (pendingQueue.size() == 0) {
-                            if (idle >= timeout && timeout == readTimeout / 2) {
-                                sendPing();
-                                timeout = readTimeout;
-                            }
-                        } else {
-                            timeout = readTimeout;
-                        }
                         if (outgoingQueue.size() > 0) {
                             enableWrite();
                         } else {
                             disableWrite();
                         }
-                    } else {
-                        timeout = connectTimeout;
                     }
                     selected.clear();
                 } catch (Exception e) {
-                    LOG.warn("Closing: " + e.getMessage());
+                    LOG.warn("Closing: ", e);
                     cleanup();
                     if (zooKeeper.state.isAlive()) {
                         waitingEvents.add(new WatcherEvent(Event.EventNone,
                                 Event.KeeperStateDisconnected, null));
                     }
 
-                    timeout = connectTimeout;
                     now = System.currentTimeMillis();
                     lastHeard = now;
-                    idle = 0;
+                    lastSend = now;
                 }
             }
             cleanup();
@@ -805,7 +819,11 @@ public class ClientCnxn {
             packet.cb = cb;
             packet.ctx = ctx;
             packet.path = path;
-            outgoingQueue.add(packet);
+            if (!zooKeeper.state.isAlive()) {
+                conLossPacket(packet);
+            } else {
+                outgoingQueue.add(packet);
+            }
         }
         synchronized (sendThread) {
             selector.wakeup();

+ 2 - 0
zookeeper/java/src/com/yahoo/zookeeper/KeeperException.java

@@ -100,6 +100,8 @@ public class KeeperException extends Exception {
             return "InvalidACL";
         case Code.AuthFailed:
             return "AuthFailed";
+        case Code.SessionExpired:
+            return "SessionExpired";
         default:
             return "Unknown error " + code;
         }

+ 1 - 1
zookeeper/java/src/com/yahoo/zookeeper/ZooKeeper.java

@@ -113,7 +113,7 @@ public class ZooKeeper {
         }
     }
 
-    States state;
+    volatile States state;
 
     ClientCnxn cnxn;
 

+ 31 - 2
zookeeper/test/com/yahoo/zookeeper/test/ClientTest.java

@@ -35,7 +35,7 @@ public class ClientTest extends TestCase implements Watcher {
     static File baseTest = new File(System.getProperty("build.test.dir", "build"));
     NIOServerCnxn.Factory f = null;
     File tmpDir = null;
-    private CountDownLatch clientConnected;
+    volatile private CountDownLatch clientConnected;
 
     protected void setUp() throws Exception {
         LOG.error("Client test setup");
@@ -76,13 +76,42 @@ public class ClientTest extends TestCase implements Watcher {
 
     private ZooKeeper createClient() throws KeeperException, IOException,InterruptedException{
         clientConnected=new CountDownLatch(1);
-		ZooKeeper zk = new ZooKeeper(hostPort, 30000, this);
+		ZooKeeper zk = new ZooKeeper(hostPort, 20000, this);
 		if(!clientConnected.await(CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS)){
 			fail("Unable to connect to server");
 		}
 		return zk;
     }
     
+    @Test
+    public void testPing() throws Exception {
+        ZooKeeper zkIdle = null;
+        ZooKeeper zkWatchCreator = null;
+        try {
+            zkIdle = createClient();
+            zkWatchCreator = createClient();
+            for(int i = 0 ; i < 30; i++) {
+                zkWatchCreator.create("/" + i, new byte[0], Ids.OPEN_ACL_UNSAFE, 0);
+            }
+            for(int i = 0 ; i < 30; i++) {
+                zkIdle.exists("/" + i, true);
+            }
+            for(int i = 0; i < 30; i++) {
+                Thread.sleep(1000);
+                zkWatchCreator.delete("/"+i, -1);
+            }
+            // The bug will manifest itself here because zkIdle will expire
+            zkIdle.exists("/0", false);
+        } finally {
+            if (zkIdle != null) {
+                zkIdle.close();
+            }
+            if (zkWatchCreator != null) {
+                zkWatchCreator.close();
+            }
+        }
+    }
+
     @Test
     public void testClient() throws KeeperException, IOException,
             InterruptedException {