فهرست منبع

ZOOKEEPER-3309: Add sync processor metrics

Author: Jie Huang <jiehuang@fb.com>

Reviewers: fangmin@apache.org, andor@apache.org, eolivelli@apache.org

Closes #850 from jhuan31/ZOOKEEPER-3309
Jie Huang 6 سال پیش
والد
کامیت
5cb10bb5c0

+ 2 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java

@@ -88,6 +88,8 @@ public class Request {
 
     public long commitRecvTime = -1;
 
+    public long syncQueueStartTime;
+
     private Object owner;
 
     private KeeperException e;

+ 17 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java

@@ -202,6 +202,14 @@ public final class ServerMetrics {
         STARTUP_TXNS_LOAD_TIME = metricsContext.getSummary("startup_txns_load_time", DetailLevel.BASIC);
         STARTUP_SNAP_LOAD_TIME = metricsContext.getSummary("startup_snap_load_time", DetailLevel.BASIC);
 
+        SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME = metricsContext.getSummary("sync_processor_queue_and_flush_time_ms", DetailLevel.ADVANCED);
+        SYNC_PROCESSOR_QUEUE_SIZE = metricsContext.getSummary("sync_processor_queue_size", DetailLevel.BASIC);
+        SYNC_PROCESSOR_QUEUED = metricsContext.getCounter("sync_processor_request_queued");
+        SYNC_PROCESSOR_QUEUE_TIME = metricsContext.getSummary("sync_processor_queue_time_ms", DetailLevel.ADVANCED);
+        SYNC_PROCESSOR_FLUSH_TIME = metricsContext.getSummary("sync_processor_queue_flush_time_ms", DetailLevel.ADVANCED);
+        SYNC_PROCESS_TIME = metricsContext.getSummary("sync_process_time", DetailLevel.BASIC);
+
+        BATCH_SIZE = metricsContext.getSummary("sync_processor_batch_size", DetailLevel.BASIC);
     }
 
     /**
@@ -283,6 +291,15 @@ public final class ServerMetrics {
     public final Summary STARTUP_TXNS_LOAD_TIME;
     public final Summary STARTUP_SNAP_LOAD_TIME;
 
+    public final Summary SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME;
+    public final Summary SYNC_PROCESSOR_QUEUE_SIZE;
+    public final Counter SYNC_PROCESSOR_QUEUED;
+    public final Summary SYNC_PROCESSOR_QUEUE_TIME;
+    public final Summary SYNC_PROCESSOR_FLUSH_TIME;
+    public final Summary SYNC_PROCESS_TIME;
+
+    public final Summary BATCH_SIZE;
+
     /**
      * Fired watcher stats.
      */

+ 16 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/SyncRequestProcessor.java

@@ -165,6 +165,8 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
             resetSnapshotStats();
             lastFlushTime = Time.currentElapsedTime();
             while (true) {
+                ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_SIZE.add(queuedRequests.size());
+
                 long pollTime = Math.min(zks.getMaxWriteQueuePollTime(), getRemainingDelay());
                 Request si = queuedRequests.poll(pollTime, TimeUnit.MILLISECONDS);
                 if (si == null) {
@@ -177,6 +179,10 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
                     break;
                 }
 
+                long startProcessTime = Time.currentElapsedTime();
+                ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_TIME.add(
+                                    startProcessTime - si.syncQueueStartTime);
+
                 // track the number of records written to the log
                 if (zks.getZKDatabase().append(si)) {
                     if (shouldSnapshot()) {
@@ -217,6 +223,7 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
                 if (shouldFlush()) {
                     flush();
                 }
+                ServerMetrics.getMetrics().SYNC_PROCESS_TIME.add(Time.currentElapsedTime() - startProcessTime);
             }
         } catch (Throwable t) {
             handleException(this.getName(), t);
@@ -229,13 +236,19 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
           return;
       }
 
+      ServerMetrics.getMetrics().BATCH_SIZE.add(toFlush.size());
+
+      long flushStartTime = Time.currentElapsedTime();
       zks.getZKDatabase().commit();
+      ServerMetrics.getMetrics().SYNC_PROCESSOR_FLUSH_TIME.add(Time.currentElapsedTime() - flushStartTime);
 
       if (this.nextProcessor == null) {
         this.toFlush.clear();
       } else {
           while (!this.toFlush.isEmpty()) {
               final Request i = this.toFlush.remove();
+              long latency = Time.currentElapsedTime() - i.syncQueueStartTime;
+              ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUE_AND_FLUSH_TIME.add(latency);
               this.nextProcessor.processRequest(i);
           }
           if (this.nextProcessor instanceof Flushable) {
@@ -266,7 +279,10 @@ public class SyncRequestProcessor extends ZooKeeperCriticalThread implements
 
     public void processRequest(final Request request) {
         Objects.requireNonNull(request, "Request cannot be null");
+
+        request.syncQueueStartTime = Time.currentElapsedTime(); // enqueue timestamp; the queue-time and queue-and-flush-time metrics subtract it later
         queuedRequests.add(request);
+        ServerMetrics.getMetrics().SYNC_PROCESSOR_QUEUED.add(1); // one increment per request handed to this processor
     }
 
 }

+ 104 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/SyncRequestProcessorMetricTest.java

@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.metrics.MetricsUtils;
+import org.apache.zookeeper.server.*;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.hamcrest.number.OrderingComparison.greaterThan;
+import static org.hamcrest.number.OrderingComparison.greaterThanOrEqualTo;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Checks the metrics recorded by {@link SyncRequestProcessor} while it drains
+ * a pre-filled queue against a mocked {@link ZKDatabase}.
+ */
+public class SyncRequestProcessorMetricTest {
+    /** Number of requests queued before the processor thread is started. */
+    private static final int REQUEST_COUNT = 500;
+    /** Artificial delay of the mocked {@code commit()}, in milliseconds. */
+    private static final long COMMIT_DELAY_MS = 100L;
+    /** Upper bound on the wait for all requests to reach the next processor. */
+    private static final long FLUSH_TIMEOUT_MS = 5000L;
+
+    ZooKeeperServer zks;
+    RequestProcessor nextProcessor;
+    CountDownLatch allRequestsFlushed;
+
+    @Before
+    public void setup() throws Exception {
+        // Create the latch here so the nextProcessor stub below can never
+        // dereference a null field, whatever order a test does things in.
+        allRequestsFlushed = new CountDownLatch(REQUEST_COUNT);
+
+        ZKDatabase db = mock(ZKDatabase.class);
+        when(db.append(any(Request.class))).thenReturn(true);
+        // A slow commit gives the flush-time metric a known lower bound.
+        doAnswer(invocation -> {
+            Thread.sleep(COMMIT_DELAY_MS);
+            return null;
+        }).when(db).commit();
+        zks = mock(ZooKeeperServer.class);
+        when(zks.getZKDatabase()).thenReturn(db);
+
+        // Every request forwarded downstream releases one latch count.
+        nextProcessor = mock(RequestProcessor.class);
+        doAnswer(invocationOnMock -> {
+            allRequestsFlushed.countDown();
+            return null;
+        }).when(nextProcessor).processRequest(any(Request.class));
+    }
+
+    /** Builds a minimal setData request for the given session id and xid. */
+    private Request createRequest(long sessionId, int xid) {
+        return new Request(null, sessionId, xid, ZooDefs.OpCode.setData,
+                ByteBuffer.wrap(new byte[10]), null);
+    }
+
+    @Test
+    public void testSyncProcessorMetrics() throws Exception {
+        // Declared as long because the metric values are compared as boxed Long.
+        final long expectedCount = REQUEST_COUNT;
+
+        SyncRequestProcessor syncProcessor = new SyncRequestProcessor(zks, nextProcessor);
+        for (int i = 0; i < REQUEST_COUNT; i++) {
+            syncProcessor.processRequest(createRequest(1, i));
+        }
+
+        // The queued counter is bumped on submission, before the thread runs.
+        Map<String, Object> values = MetricsUtils.currentServerMetrics();
+        Assert.assertEquals(expectedCount, values.get("sync_processor_request_queued"));
+
+        syncProcessor.start();
+        try {
+            // Fail fast on a timeout instead of reporting misleading metric mismatches.
+            Assert.assertTrue("Timed out waiting for all requests to be flushed",
+                    allRequestsFlushed.await(FLUSH_TIMEOUT_MS, TimeUnit.MILLISECONDS));
+
+            values = MetricsUtils.currentServerMetrics();
+
+            Assert.assertEquals(expectedCount + 1, values.get("cnt_sync_processor_queue_size"));
+            Assert.assertEquals(expectedCount, values.get("max_sync_processor_queue_size"));
+            Assert.assertEquals(0L, values.get("min_sync_processor_queue_size"));
+
+            Assert.assertEquals(expectedCount, values.get("cnt_sync_processor_queue_time_ms"));
+            Assert.assertThat((long) values.get("max_sync_processor_queue_time_ms"), greaterThan(0L));
+
+            Assert.assertEquals(expectedCount, values.get("cnt_sync_processor_queue_and_flush_time_ms"));
+            Assert.assertThat((long) values.get("max_sync_processor_queue_and_flush_time_ms"), greaterThan(0L));
+
+            Assert.assertEquals(expectedCount, values.get("cnt_sync_process_time"));
+            Assert.assertThat((long) values.get("max_sync_process_time"), greaterThan(0L));
+
+            Assert.assertEquals(expectedCount, values.get("max_sync_processor_batch_size"));
+            Assert.assertEquals(1L, values.get("cnt_sync_processor_queue_flush_time_ms"));
+            Assert.assertThat((long) values.get("max_sync_processor_queue_flush_time_ms"),
+                    greaterThanOrEqualTo(COMMIT_DELAY_MS));
+        } finally {
+            // Always stop the processor thread, even when an assertion fails.
+            syncProcessor.shutdown();
+        }
+    }
+}