فهرست منبع

ZOOKEEPER-3326: Add session/connection related metrics

Author: Jie Huang <jiehuang@fb.com>

Reviewers: eolivelli@apache.org, fangmin@apache.org

Closes #861 from jhuan31/ZOOKEEPER-3326
Jie Huang 6 سال پیش
والد
کامیت
d8db88914e

+ 2 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxn.java

@@ -155,6 +155,7 @@ public class NIOServerCnxn extends ServerCnxn {
         if (incomingBuffer.remaining() != 0) { // have we read length bytes?
             int rc = sock.read(incomingBuffer); // sock is non-blocking, so ok
             if (rc < 0) {
+                ServerMetrics.getMetrics().CONNECTION_DROP_COUNT.add(1);
                 throw new EndOfStreamException(
                         "Unable to read additional data from client sessionid 0x"
                         + Long.toHexString(sessionId)
@@ -314,6 +315,7 @@ public class NIOServerCnxn extends ServerCnxn {
             if (k.isReadable()) {
                 int rc = sock.read(incomingBuffer);
                 if (rc < 0) {
+                    ServerMetrics.getMetrics().CONNECTION_DROP_COUNT.add(1);
                     throw new EndOfStreamException(
                             "Unable to read additional data from client sessionid 0x"
                             + Long.toHexString(sessionId)

+ 1 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/NIOServerCnxnFactory.java

@@ -576,6 +576,7 @@ public class NIOServerCnxnFactory extends ServerCnxnFactory {
                         continue;
                     }
                     for (NIOServerCnxn conn : cnxnExpiryQueue.poll()) {
+                        ServerMetrics.getMetrics().SESSIONLESS_CONNECTIONS_EXPIRED.add(1);
                         conn.close();
                     }
                 }

+ 1 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java

@@ -124,6 +124,7 @@ public class NettyServerCnxn extends ServerCnxn {
                 }
             });
         } else {
+            ServerMetrics.getMetrics().CONNECTION_DROP_COUNT.add(1);
             channel.eventLoop().execute(this::releaseQueuedBuffer);
         }
     }

+ 18 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java

@@ -122,6 +122,15 @@ public final class ServerMetrics {
         OUTSTANDING_CHANGES_REMOVED = metricsContext.getCounter("outstanding_changes_removed");
         PREP_PROCESS_TIME = metricsContext.getSummary("prep_process_time", DetailLevel.BASIC);
         CLOSE_SESSION_PREP_TIME = metricsContext.getSummary("close_session_prep_time", DetailLevel.ADVANCED);
+
+        REVALIDATE_COUNT = metricsContext.getCounter("revalidate_count");
+        CONNECTION_DROP_COUNT = metricsContext.getCounter("connection_drop_count");
+        CONNECTION_REVALIDATE_COUNT = metricsContext.getCounter("connection_revalidate_count");
+
+        // Expiry queue stats
+        SESSIONLESS_CONNECTIONS_EXPIRED = metricsContext.getCounter("sessionless_connections_expired");
+        STALE_SESSIONS_EXPIRED = metricsContext.getCounter("stale_sessions_expired");
+
     }
 
     /**
@@ -167,6 +176,15 @@ public final class ServerMetrics {
     public final Counter SNAP_COUNT;
     public final Counter COMMIT_COUNT;
     public final Counter CONNECTION_REQUEST_COUNT;
+
+    public final Counter REVALIDATE_COUNT;
+    public final Counter CONNECTION_DROP_COUNT;
+    public final Counter CONNECTION_REVALIDATE_COUNT;
+
+    // Expiry queue stats
+    public final Counter SESSIONLESS_CONNECTIONS_EXPIRED;
+    public final Counter STALE_SESSIONS_EXPIRED;
+
     // Connection throttling related
     public final Summary CONNECTION_TOKEN_DEFICIT;
     public final Counter CONNECTION_REJECTED;

+ 1 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/SessionTrackerImpl.java

@@ -153,6 +153,7 @@ public class SessionTrackerImpl extends ZooKeeperCriticalThread implements
                 }
 
                 for (SessionImpl s : sessionExpiryQueue.poll()) {
+                    ServerMetrics.getMetrics().STALE_SESSIONS_EXPIRED.add(1);
                     setSessionClosing(s.sessionId);
                     expirer.expire(s);
                 }

+ 2 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ZooKeeperServer.java

@@ -1180,6 +1180,8 @@ public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
             }
             cnxn.setSessionId(sessionId);
             reopenSession(cnxn, sessionId, passwd, sessionTimeout);
+            ServerMetrics.getMetrics().CONNECTION_REVALIDATE_COUNT.add(1);
+
         }
     }
 

+ 1 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/LearnerHandler.java

@@ -582,6 +582,7 @@ public class LearnerHandler extends ZooKeeperThread {
                     }
                     break;
                 case Leader.REVALIDATE:
+                    ServerMetrics.getMetrics().REVALIDATE_COUNT.add(1);
                     learnerMaster.revalidateSession(qp, this);
                     break;
                 case Leader.REQUEST:

+ 239 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/ConnectionMetricsTest.java

@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import io.netty.channel.Channel;
+import io.netty.channel.EventLoop;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZKTestCase;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.metrics.MetricsUtils;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.QuorumUtil;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.nio.channels.*;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.apache.zookeeper.server.NIOServerCnxnFactory.ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ConnectionMetricsTest extends ZKTestCase {
+    protected static final Logger LOG = LoggerFactory.getLogger(ConnectionMetricsTest.class);
+
+    @Test
+    public void testRevalidateCount() throws  Exception {
+        ServerMetrics.getMetrics().resetAll();
+        QuorumUtil util = new QuorumUtil(1); // create a quorum of 3 servers
+        // disable local session to make sure we create a global session
+        util.enableLocalSession(false);
+        util.startAll();
+
+        int follower1 = (int)util.getFollowerQuorumPeers().get(0).getId();
+        int follower2 = (int)util.getFollowerQuorumPeers().get(1).getId();
+        LOG.info("connecting to server: {}", follower1);
+        ClientBase.CountdownWatcher watcher = new ClientBase.CountdownWatcher();
+        // create a connection to follower
+        ZooKeeper zk = new ZooKeeper(util.getConnectionStringForServer(follower1), ClientBase.CONNECTION_TIMEOUT, watcher);
+        watcher.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+        LOG.info("connected");
+
+        // update the connection to allow to connect to the other follower
+        zk.updateServerList(util.getConnectionStringForServer(follower2));
+
+        // follower is shut down and zk should be disconnected
+        util.shutdown(follower1);
+        watcher.waitForDisconnected(ClientBase.CONNECTION_TIMEOUT);
+        LOG.info("disconnected");
+        // should reconnect to another follower, will ask leader to revalidate
+        watcher.waitForConnected(ClientBase.CONNECTION_TIMEOUT);
+        LOG.info("reconnected");
+
+        Map<String, Object> values = MetricsUtils.currentServerMetrics();
+        Assert.assertEquals(1L, values.get("connection_revalidate_count"));
+        Assert.assertEquals(1L, values.get("revalidate_count"));
+
+        zk.close();
+        util.shutdownAll();
+    }
+
+
+    private class MockNIOServerCnxn extends  NIOServerCnxn {
+        public MockNIOServerCnxn(ZooKeeperServer zk, SocketChannel sock,
+                                 SelectionKey sk, NIOServerCnxnFactory factory,
+                                 NIOServerCnxnFactory.SelectorThread selectorThread) throws IOException {
+            super(zk, sock, sk, factory, selectorThread);
+        }
+
+        @Override
+        protected boolean isSocketOpen() {
+            return true;
+        }
+    }
+
+    private static class FakeSK extends SelectionKey {
+
+        @Override
+        public SelectableChannel channel() {
+            return null;
+        }
+
+        @Override
+        public Selector selector() {
+            return mock(Selector.class);
+        }
+
+        @Override
+        public boolean isValid() {
+            return true;
+        }
+
+        @Override
+        public void cancel() {
+        }
+
+        @Override
+        public int interestOps() {
+            return ops;
+        }
+
+        private int ops = OP_WRITE + OP_READ;
+
+        @Override
+        public SelectionKey interestOps(int ops) {
+            this.ops = ops;
+            return this;
+        }
+
+        @Override
+        public int readyOps() {
+            return ops;
+        }
+
+    }
+
+    private NIOServerCnxn createMockNIOCnxn() throws IOException {
+        InetSocketAddress socketAddr = new InetSocketAddress(80);
+        Socket socket = mock(Socket.class);
+        when(socket.getRemoteSocketAddress()).thenReturn(socketAddr);
+        SocketChannel sock = mock(SocketChannel.class);
+        when(sock.socket()).thenReturn(socket);
+        when(sock.read(any(ByteBuffer.class))).thenReturn(-1);
+
+        return new MockNIOServerCnxn(mock(ZooKeeperServer.class), sock, null, mock(NIOServerCnxnFactory.class), null);
+    }
+
+    @Test
+    public void testNIOConnectionDropCount() throws Exception {
+        ServerMetrics.getMetrics().resetAll();
+
+        NIOServerCnxn cnxn = createMockNIOCnxn();
+        cnxn.doIO(new FakeSK());
+
+        Map<String, Object> values = MetricsUtils.currentServerMetrics();
+        Assert.assertEquals(1L, values.get("connection_drop_count"));
+    }
+
+    @Test
+    public void testNettyConnectionDropCount() throws Exception {
+        InetSocketAddress socketAddr = new InetSocketAddress(80);
+        Channel channel = mock(Channel.class);
+        when(channel.isOpen()).thenReturn(false);
+        when(channel.remoteAddress()).thenReturn(socketAddr);
+        EventLoop eventLoop = mock(EventLoop.class);
+        when(channel.eventLoop()).thenReturn(eventLoop);
+
+        ServerMetrics.getMetrics().resetAll();
+
+        NettyServerCnxnFactory factory = new NettyServerCnxnFactory();
+        NettyServerCnxn cnxn = new NettyServerCnxn(channel, mock(ZooKeeperServer.class), factory);
+
+        // pretend it's connected
+        factory.cnxns.add(cnxn);
+        cnxn.close();
+
+        Map<String, Object> values = MetricsUtils.currentServerMetrics();
+        Assert.assertEquals(1L, values.get("connection_drop_count"));
+    }
+
+    @Test
+    public void testSessionlessConnectionsExpired() throws Exception {
+        ServerCnxnFactory factory = new NIOServerCnxnFactory();
+        factory.configure(new InetSocketAddress(PortAssignment.unique()), 1000);
+        factory.start();
+        int timeout = Integer.getInteger(
+                ZOOKEEPER_NIO_SESSIONLESS_CNXN_TIMEOUT, 10000);
+
+        ServerMetrics.getMetrics().resetAll();
+        // add two connections w/o touching them so they will expire
+        ((NIOServerCnxnFactory) factory).touchCnxn(createMockNIOCnxn());
+        ((NIOServerCnxnFactory) factory).touchCnxn(createMockNIOCnxn());
+
+        Map<String, Object> values = MetricsUtils.currentServerMetrics();
+        int sleptTime = 0;
+        while (values.get("sessionless_connections_expired") == null || sleptTime < 2*timeout){
+            Thread.sleep(100);
+            sleptTime += 100;
+            values = MetricsUtils.currentServerMetrics();
+        }
+
+        Assert.assertEquals(2L, values.get("sessionless_connections_expired"));
+
+        factory.shutdown();
+    }
+
+    @Test
+    public void testStaleSessionsExpired() throws Exception {
+        int tickTime = 1000;
+        SessionTrackerImpl tracker = new SessionTrackerImpl(mock(ZooKeeperServer.class),
+                new ConcurrentHashMap<>(), tickTime, 1L, null);
+
+        tracker.sessionsById.put(1L, mock(SessionTrackerImpl.SessionImpl.class));
+        tracker.sessionsById.put(2L, mock(SessionTrackerImpl.SessionImpl.class));
+
+        tracker.touchSession(1L, tickTime);
+        tracker.touchSession(2L, tickTime);
+
+        ServerMetrics.getMetrics().resetAll();
+
+        tracker.start();
+
+        Map<String, Object> values = MetricsUtils.currentServerMetrics();
+        int sleptTime = 0;
+        while (values.get("stale_sessions_expired") == null || sleptTime < 2*tickTime) {
+            Thread.sleep(100);
+            sleptTime += 100;
+            values = MetricsUtils.currentServerMetrics();
+        }
+
+        Assert.assertEquals(2L, values.get("stale_sessions_expired"));
+
+        tracker.shutdown();
+    }
+}