فهرست منبع

ZOOKEEPER-3319: Add metrics for follower and observer

Author: Jie Huang <jiehuang@fb.com>

Reviewers: eolivelli@apache.org, fangmin@apache.org

Closes #856 from jhuan31/ZOOKEEPER-3319
Jie Huang 6 سال پیش
والد
کامیت
36bca12d5e

+ 36 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/Request.java

@@ -26,6 +26,8 @@ import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.ZooDefs.OpCode;
 import org.apache.zookeeper.common.Time;
 import org.apache.zookeeper.data.Id;
+import org.apache.zookeeper.metrics.Summary;
+import org.apache.zookeeper.metrics.SummarySet;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.txn.TxnHeader;
 
@@ -311,4 +313,38 @@ public class Request {
     public KeeperException getException() {
         return e;
     }
+
+    public void logLatency(Summary metric) {
+        logLatency(metric, Time.currentWallTime());
+    }
+
+    public void logLatency(Summary metric, long currentTime){
+        if (hdr != null) {
+            /* Request header is created by leader. If there is clock drift
+             * latency might be negative. Headers use wall time, not
+             * CLOCK_MONOTONIC.
+             */
+            long latency = currentTime - hdr.getTime();
+            if (latency > 0) {
+                metric.add(latency);
+            }
+        }
+    }
+
+    public void logLatency(SummarySet metric, String key, long currentTime) {
+        if (hdr != null) {
+            /* Request header is created by leader. If there is clock drift
+             * latency might be negative. Headers use wall time, not
+             * CLOCK_MONOTONIC.
+             */
+            long latency = currentTime - hdr.getTime();
+            if (latency > 0) {
+                metric.add(key, latency);
+            }
+        }
+    }
+
+    public void logLatency(SummarySet metric, String key) {
+        logLatency(metric, key, Time.currentWallTime());
+    }
 }

+ 12 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/ServerMetrics.java

@@ -186,6 +186,12 @@ public final class ServerMetrics {
         READ_FINAL_PROC_TIME = metricsContext.getSummary("read_final_proc_time_ms", DetailLevel.ADVANCED);
         WRITE_FINAL_PROC_TIME = metricsContext.getSummary("write_final_proc_time_ms", DetailLevel.ADVANCED);
 
+        PROPOSAL_LATENCY = metricsContext.getSummary("proposal_latency", DetailLevel.ADVANCED);
+        PROPOSAL_ACK_CREATION_LATENCY = metricsContext.getSummary("proposal_ack_creation_latency", DetailLevel.ADVANCED);
+        COMMIT_PROPAGATION_LATENCY = metricsContext.getSummary("commit_propagation_latency", DetailLevel.ADVANCED);
+        LEARNER_PROPOSAL_RECEIVED_COUNT = metricsContext.getCounter("learner_proposal_received_count");
+        LEARNER_COMMIT_RECEIVED_COUNT = metricsContext.getCounter("learner_commit_received_count");
+
     }
 
     /**
@@ -257,6 +263,12 @@ public final class ServerMetrics {
     public final Summary PREP_PROCESS_TIME;
     public final Summary CLOSE_SESSION_PREP_TIME;
 
+    public final Summary PROPOSAL_LATENCY;
+    public final Summary PROPOSAL_ACK_CREATION_LATENCY;
+    public final Summary COMMIT_PROPAGATION_LATENCY;
+    public final Counter LEARNER_PROPOSAL_RECEIVED_COUNT;
+    public final Counter LEARNER_COMMIT_RECEIVED_COUNT;
+
     /**
      * Fired watcher stats.
      */

+ 15 - 2
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Follower.java

@@ -141,7 +141,8 @@ public class Follower extends Learner{
         case Leader.PING:            
             ping(qp);            
             break;
-        case Leader.PROPOSAL:           
+        case Leader.PROPOSAL:
+            ServerMetrics.getMetrics().LEARNER_PROPOSAL_RECEIVED_COUNT.add(1);
             TxnHeader hdr = new TxnHeader();
             Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
             if (hdr.getZxid() != lastQueued + 1) {
@@ -159,12 +160,24 @@ public class Follower extends Learner{
             }
             
             fzk.logRequest(hdr, txn);
-
+            if (hdr != null) {
+                /*
+                 * Request header is created only by the leader, so this is only set
+                 * for quorum packets. If there is a clock drift, the latency may be
+                 * negative. Headers use wall time, not CLOCK_MONOTONIC.
+                 */
+                long now = Time.currentWallTime();
+                long latency = now - hdr.getTime();
+                if (latency > 0) {
+                    ServerMetrics.getMetrics().PROPOSAL_LATENCY.add(latency);
+                }
+            }
             if (om != null) {
                 om.proposalReceived(qp);
             }
             break;
         case Leader.COMMIT:
+            ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
             fzk.commit(qp.getZxid());
             if (om != null) {
                 om.proposalCommitted(qp.getZxid());

+ 2 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/FollowerZooKeeperServer.java

@@ -33,6 +33,7 @@ import org.apache.zookeeper.server.RequestProcessor;
 import org.apache.zookeeper.server.SyncRequestProcessor;
 import org.apache.zookeeper.server.ZKDatabase;
 import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.txn.TxnHeader;
 
 import javax.management.JMException;
@@ -113,6 +114,7 @@ public class FollowerZooKeeperServer extends LearnerZooKeeperServer {
             System.exit(ExitCode.UNMATCHED_TXN_COMMIT.getValue());
         }
         Request request = pendingTxns.remove();
+        request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY);
         commitProcessor.commit(request);
     }
 

+ 3 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/Observer.java

@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.jute.Record;
 import org.apache.zookeeper.server.ObserverBean;
 import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ServerMetrics;
 import org.apache.zookeeper.server.quorum.QuorumPeer.QuorumServer;
 import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
 import org.apache.zookeeper.server.util.SerializeUtils;
@@ -166,9 +167,11 @@ public class Observer extends Learner{
             ((ObserverZooKeeperServer)zk).sync();
             break;
         case Leader.INFORM:
+            ServerMetrics.getMetrics().LEARNER_COMMIT_RECEIVED_COUNT.add(1);
             TxnHeader hdr = new TxnHeader();
             Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
             Request request = new Request (hdr.getClientId(),  hdr.getCxid(), hdr.getType(), hdr, txn, 0);
+            request.logLatency(ServerMetrics.getMetrics().COMMIT_PROPAGATION_LATENCY);
             ObserverZooKeeperServer obs = (ObserverZooKeeperServer)zk;
             obs.commitRequest(request);
             break;

+ 3 - 0
zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/SendAckRequestProcessor.java

@@ -21,6 +21,7 @@ package org.apache.zookeeper.server.quorum;
 import java.io.Flushable;
 import java.io.IOException;
 
+import org.apache.zookeeper.server.ServerMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,6 +43,8 @@ public class SendAckRequestProcessor implements RequestProcessor, Flushable {
             QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null,
                 null);
             try {
+                si.logLatency(ServerMetrics.getMetrics().PROPOSAL_ACK_CREATION_LATENCY);
+
                 learner.writePacket(qp, false);
             } catch (IOException e) {
                 LOG.warn("Closing connection to leader, exception during packet send", e);

+ 92 - 0
zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/LearnerMetricsTest.java

@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.quorum;
+
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.PortAssignment;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.metrics.MetricsUtils;
+import org.apache.zookeeper.server.ServerMetrics;
+import org.apache.zookeeper.test.ClientBase;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.hamcrest.number.OrderingComparison.greaterThan;
+
+public class LearnerMetricsTest extends QuorumPeerTestBase {
+
+    @Test
+    public void testLearnerMetricsTest() throws Exception {
+        ServerMetrics.getMetrics().resetAll();
+        ClientBase.setupTestEnv();
+
+        final int SERVER_COUNT = 6; // 5 participants, 1 observer
+        final String path = "/zk-testLeanerMetrics";
+        final byte[] data = new byte[512];
+        final int clientPorts[] = new int[SERVER_COUNT];
+        StringBuilder sb = new StringBuilder();
+        int observer = 0 ;
+        clientPorts[observer] = PortAssignment.unique();
+        sb.append("server."+observer+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+":observer\n");
+        for(int i = 1; i < SERVER_COUNT; i++) {
+            clientPorts[i] = PortAssignment.unique();
+            sb.append("server."+i+"=127.0.0.1:"+PortAssignment.unique()+":"+PortAssignment.unique()+"\n");
+        }
+
+        // start the participants
+        String quorumCfgSection = sb.toString();
+        QuorumPeerTestBase.MainThread mt[] = new QuorumPeerTestBase.MainThread[SERVER_COUNT];
+        for(int i = 1; i < SERVER_COUNT; i++) {
+            mt[i] = new QuorumPeerTestBase.MainThread(i, clientPorts[i], quorumCfgSection);
+            mt[i].start();
+        }
+
+        // start the observer
+        Map<String, String> observerConfig = new HashMap<>();
+        observerConfig.put("peerType", "observer");
+        mt[observer] = new QuorumPeerTestBase.MainThread(observer, clientPorts[observer], quorumCfgSection, observerConfig);
+        mt[observer].start();
+
+        ZooKeeper zk = new ZooKeeper("127.0.0.1:" + clientPorts[1], ClientBase.CONNECTION_TIMEOUT, this);
+
+        waitForOne(zk, ZooKeeper.States.CONNECTED);
+
+        // send one create request
+        zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
+
+        Thread.sleep(200);
+
+        Map<String, Object> values = MetricsUtils.currentServerMetrics();
+        // there are 4 followers, each received two proposals, one for leader election, one for the create request
+        Assert.assertEquals(8L, values.get("learner_proposal_received_count"));
+        Assert.assertEquals(8L, values.get("cnt_proposal_latency"));
+        Assert.assertThat((long)values.get("min_proposal_latency"), greaterThan(0L));
+        Assert.assertEquals(8L, values.get("cnt_proposal_ack_creation_latency"));
+        Assert.assertThat((long)values.get("min_proposal_ack_creation_latency"), greaterThan(0L));
+
+        // there are five learners, each received two commits, one for leader election, one for the create request
+        Assert.assertEquals(10L, values.get("learner_commit_received_count"));
+        Assert.assertEquals(10L, values.get("cnt_commit_propagation_latency"));
+        Assert.assertThat((long)values.get("min_commit_propagation_latency"), greaterThan(0L));
+    }
+}