Bläddra i källkod

ZOOKEEPER-3310: Add metrics for prep processor

Author: Jie Huang <jiehuang@fb.com>

Reviewers: andor@apache.org

Closes #855 from jhuan31/ZOOKEEPER-3310 and squashes the following commits:

ab0b81c2b [Jie Huang] move to the new metric framework
8f4d53942 [Jie Huang] remove sleep in tests
9cf1fc064 [Jie Huang] Reconstructed unit test. Add left out metric
b07aa8c24 [Jie Huang] ZOOKEEPER-3310: Add metrics for prep processor
Jie Huang 6 år sedan
förälder
incheckning
ff47fc3dfd

+ 1 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/FinalRequestProcessor.java

@@ -126,6 +126,7 @@ public class FinalRequestProcessor implements RequestProcessor {
                 while (!zks.outstandingChanges.isEmpty()
                        && zks.outstandingChanges.peek().zxid <= zxid) {
                     ChangeRecord cr = zks.outstandingChanges.remove();
+                    ServerMetrics.getMetrics().OUTSTANDING_CHANGES_REMOVED.add(1);
                     if (cr.zxid < zxid) {
                         LOG.warn("Zxid outstanding " + cr.zxid
                                  + " is less than current " + zxid);

+ 10 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/PrepRequestProcessor.java

@@ -131,6 +131,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
     public void run() {
         try {
             while (true) {
+                ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_SIZE.add(submittedRequests.size());
                 Request request = submittedRequests.take();
                 long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
                 if (request.type == OpCode.ping) {
@@ -142,7 +143,9 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
                 if (Request.requestOfDeath == request) {
                     break;
                 }
+                long prepStartTime = Time.currentElapsedTime();
                 pRequest(request);
+                ServerMetrics.getMetrics().PREP_PROCESS_TIME.add(Time.currentElapsedTime() - prepStartTime);
             }
         } catch (RequestProcessorException e) {
             if (e.getCause() instanceof XidRolloverException) {
@@ -183,10 +186,11 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
         }
     }
 
-    private void addChangeRecord(ChangeRecord c) {
+    protected void addChangeRecord(ChangeRecord c) {
         synchronized (zks.outstandingChanges) {
             zks.outstandingChanges.add(c);
             zks.outstandingChangesForPath.put(c.path, c);
+            ServerMetrics.getMetrics().OUTSTANDING_CHANGES_QUEUED.add(1);
         }
     }
 
@@ -588,6 +592,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
                 // queues up this operation without being the session owner.
                 // this request is the last of the session so it should be ok
                 //zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
+                long startTime =  Time.currentElapsedTime();
                 Set<String> es = zks.getZKDatabase()
                         .getEphemerals(request.sessionId);
                 synchronized (zks.outstandingChanges) {
@@ -605,6 +610,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
 
                     zks.sessionTracker.setSessionClosing(request.sessionId);
                 }
+                ServerMetrics.getMetrics().CLOSE_SESSION_PREP_TIME.add(Time.currentElapsedTime() - startTime);
                 break;
             case OpCode.check:
                 zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
@@ -902,6 +908,7 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
             }
         }
         request.zxid = zks.getZxid();
+        ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUE_TIME.add(Time.currentElapsedTime() - request.prepQueueStartTime);
         nextProcessor.processRequest(request);
     }
 
@@ -1005,7 +1012,9 @@ public class PrepRequestProcessor extends ZooKeeperCriticalThread implements
     }
 
     public void processRequest(Request request) {
+        request.prepQueueStartTime =  Time.currentElapsedTime();
         submittedRequests.add(request);
+        ServerMetrics.getMetrics().PREP_PROCESSOR_QUEUED.add(1);
     }
 
     public void shutdown() {

+ 2 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java

@@ -78,6 +78,8 @@ public class Request {
 
     public final long createTime = Time.currentElapsedTime();
 
+    public long prepQueueStartTime= -1;
+
     private Object owner;
 
     private KeeperException e;

+ 16 - 1
zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java

@@ -99,7 +99,7 @@ public final class ServerMetrics {
 
 
         /*
-     * Number of dead watchers in DeadWatcherListener
+         * Number of dead watchers in DeadWatcherListener
          */
         ADD_DEAD_WATCHER_STALL_TIME = metricsContext.getCounter("add_dead_watcher_stall_time");
         DEAD_WATCHERS_QUEUED = metricsContext.getCounter("dead_watchers_queued");
@@ -115,6 +115,13 @@ public final class ServerMetrics {
 
         ENSEMBLE_AUTH_SKIP = metricsContext.getCounter("ensemble_auth_skip");
 
+        PREP_PROCESSOR_QUEUE_TIME = metricsContext.getSummary("prep_processor_queue_time_ms", DetailLevel.ADVANCED);
+        PREP_PROCESSOR_QUEUE_SIZE = metricsContext.getSummary("prep_processor_queue_size", DetailLevel.BASIC);
+        PREP_PROCESSOR_QUEUED = metricsContext.getCounter("prep_processor_request_queued");
+        OUTSTANDING_CHANGES_QUEUED = metricsContext.getCounter("outstanding_changes_queued");
+        OUTSTANDING_CHANGES_REMOVED = metricsContext.getCounter("outstanding_changes_removed");
+        PREP_PROCESS_TIME = metricsContext.getSummary("prep_process_time", DetailLevel.BASIC);
+        CLOSE_SESSION_PREP_TIME = metricsContext.getSummary("close_session_prep_time", DetailLevel.ADVANCED);
     }
 
     /**
@@ -169,6 +176,14 @@ public final class ServerMetrics {
     public final SummarySet READ_PER_NAMESPACE;
     public final Counter BYTES_RECEIVED_COUNT;
 
+    public final Summary PREP_PROCESSOR_QUEUE_TIME;
+    public final Summary PREP_PROCESSOR_QUEUE_SIZE;
+    public final Counter PREP_PROCESSOR_QUEUED;
+    public final Counter OUTSTANDING_CHANGES_QUEUED;
+    public final Counter OUTSTANDING_CHANGES_REMOVED;
+    public final Summary PREP_PROCESS_TIME;
+    public final Summary CLOSE_SESSION_PREP_TIME;
+
     /**
      * Fired watcher stats.
      */

+ 176 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/PrepRequestProcessorMetricsTest.java

@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server;
+
+import org.apache.jute.BinaryOutputArchive;
+import org.apache.jute.Record;
+import org.apache.zookeeper.*;
+import org.apache.zookeeper.data.StatPersisted;
+import org.apache.zookeeper.metrics.MetricsUtils;
+import org.apache.zookeeper.proto.DeleteRequest;
+import org.apache.zookeeper.proto.SetDataRequest;
+import org.apache.zookeeper.test.ClientBase;
+import org.apache.zookeeper.test.QuorumUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.*;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.number.OrderingComparison.greaterThan;
+import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.*;
+
+public class PrepRequestProcessorMetricsTest extends ZKTestCase {
+    private static final Logger LOG = LoggerFactory.getLogger(PrepRequestProcessorMetricsTest.class);
+
+    ZooKeeperServer zks;
+    RequestProcessor nextProcessor;
+
+    @Before
+    public void setup() {
+        zks = spy(new ZooKeeperServer());
+        zks.sessionTracker = mock(SessionTracker.class);
+
+        ZKDatabase db = mock(ZKDatabase.class);
+        when(zks.getZKDatabase()).thenReturn(db);
+
+        DataNode node = new DataNode(new byte[1], null, mock(StatPersisted.class));
+        when(db.getNode(anyString())).thenReturn(node);
+
+        Set<String> ephemerals = new HashSet<>();
+        ephemerals.add("/crystalmountain");
+        ephemerals.add("/stevenspass");
+        when(db.getEphemerals(anyLong())).thenReturn(ephemerals);
+
+        nextProcessor = mock(RequestProcessor.class);
+        ServerMetrics.getMetrics().resetAll();
+    }
+
+    private Request createRequest(Record record, int opCode) throws IOException {
+        // encoding
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
+        record.serialize(boa, "request");
+        baos.close();
+        return new Request(null, 1l, 0, opCode, ByteBuffer.wrap(baos.toByteArray()), null);
+    }
+
+    private Request createRequest(String path, int opCode) throws IOException {
+        Record record;
+        switch (opCode) {
+            case ZooDefs.OpCode.setData:
+                record = new SetDataRequest(path, new byte[0], -1);
+                break;
+            case ZooDefs.OpCode.delete:
+                record = new DeleteRequest(path, -1);
+                break;
+            default:
+                record = new DeleteRequest(path, -1);
+                break;
+        }
+
+        return createRequest(record, opCode);
+    }
+
+    private Request createRequest(long sessionId, int opCode) {
+        return new Request(null, sessionId, 0, opCode, null, null);
+    }
+
+    @Test
+    public void testPrepRequestProcessorMetrics() throws Exception {
+        CountDownLatch threeRequests = new CountDownLatch(3);
+        doAnswer(invocationOnMock -> {
+            threeRequests.countDown();
+            return  null;}).when(nextProcessor).processRequest(any(Request.class));
+
+        PrepRequestProcessor prepRequestProcessor = new PrepRequestProcessor(zks, nextProcessor);
+        PrepRequestProcessor.skipACL = true;
+
+        //setData will generate one change
+        prepRequestProcessor.processRequest(createRequest("/foo", ZooDefs.OpCode.setData));
+        //delete will generate two changes, one for itself, one for its parent
+        prepRequestProcessor.processRequest(createRequest("/foo/bar", ZooDefs.OpCode.delete));
+        //mocking two ephemeral nodes exists for this session so two changes
+        prepRequestProcessor.processRequest(createRequest(2, ZooDefs.OpCode.closeSession));
+
+        Map<String, Object> values = MetricsUtils.currentServerMetrics();
+        Assert.assertEquals(3L, values.get("prep_processor_request_queued"));
+
+        prepRequestProcessor.start();
+
+        threeRequests.await(500, TimeUnit.MILLISECONDS);
+
+        values = MetricsUtils.currentServerMetrics();
+        Assert.assertEquals(3L, values.get("max_prep_processor_queue_size"));
+
+        Assert.assertThat((long)values.get("min_prep_processor_queue_time_ms"), greaterThan(0l));
+        Assert.assertEquals(3L, values.get("cnt_prep_processor_queue_time_ms"));
+
+        Assert.assertEquals(3L, values.get("cnt_prep_process_time"));
+        Assert.assertThat((long)values.get("max_prep_process_time"), greaterThan(0l));
+
+        Assert.assertEquals(1L, values.get("cnt_close_session_prep_time"));
+        Assert.assertThat((long)values.get("max_close_session_prep_time"), greaterThanOrEqualTo(0L));
+
+        Assert.assertEquals(5L, values.get("outstanding_changes_queued"));
+    }
+
+    private class SimpleWatcher implements Watcher {
+        CountDownLatch created;
+        public SimpleWatcher(CountDownLatch latch) {
+            this.created = latch;
+        }
+        @Override
+        public void process(WatchedEvent e) {
+            created.countDown();
+        }
+    }
+
+    @Test
+    public void testOutstandingChangesRemoved() throws Exception {
+        // this metric is currently recorded in FinalRequestProcessor but it is tightly related to the Prep metrics
+        QuorumUtil util = new QuorumUtil(1);
+        util.startAll();
+
+        ServerMetrics.getMetrics().resetAll();
+
+        ZooKeeper zk = ClientBase.createZKClient(util.getConnString());
+        zk.create("/test", new byte[50], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        CountDownLatch created = new CountDownLatch(1);
+        zk.exists("/test", new SimpleWatcher(created));
+        created.await(200, TimeUnit.MILLISECONDS);
+
+        Map<String, Object> values = MetricsUtils.currentServerMetrics();
+        Assert.assertThat((long)values.get("outstanding_changes_removed"), greaterThan(0L));
+
+        util.shutdownAll();
+    }
+}