Procházet zdrojové kódy

HADOOP-9420. Add percentile or max metric for rpcQueueTime, processing time. Contributed by Liang Xie.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1556984 13f79535-47bb-0310-9956-ffa450edef68
Andrew Wang před 11 roky
rodič
revize
a2af7a32f1

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -118,6 +118,9 @@ Release 2.4.0 - UNRELEASED
     HADOOP-10208. Remove duplicate initialization in StringUtils.getStringCollection.
     (Benoy Antony via jing9)
 
+    HADOOP-9420. Add percentile or max metric for rpcQueueTime, processing time.
+    (Liang Xie via wang)
+
   OPTIMIZATIONS
 
     HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)

+ 5 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java

@@ -242,4 +242,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
 
   public static final String HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS =
     "hadoop.user.group.metrics.percentiles.intervals";
+
+  public static final String RPC_METRICS_QUANTILE_ENABLE =
+      "rpc.metrics.quantile.enable";
+  public static final String  RPC_METRICS_PERCENTILES_INTERVALS_KEY =
+      "rpc.metrics.percentiles.intervals";
 }

+ 1 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -2107,7 +2107,7 @@ public abstract class Server {
     listener = new Listener();
     this.port = listener.getAddress().getPort();    
     connectionManager = new ConnectionManager();
-    this.rpcMetrics = RpcMetrics.create(this);
+    this.rpcMetrics = RpcMetrics.create(this, conf);
     this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port);
     this.tcpNoDelay = conf.getBoolean(
         CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,

+ 40 - 5
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java

@@ -19,14 +19,17 @@ package org.apache.hadoop.ipc.metrics;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.lib.MetricsRegistry;
 import org.apache.hadoop.metrics2.lib.MutableCounterInt;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
 import org.apache.hadoop.metrics2.lib.MutableRate;
 
 /**
@@ -41,26 +44,48 @@ public class RpcMetrics {
   final Server server;
   final MetricsRegistry registry;
   final String name;
+  final boolean rpcQuantileEnable;
   
-  RpcMetrics(Server server) {
+  RpcMetrics(Server server, Configuration conf) {
     String port = String.valueOf(server.getListenerAddress().getPort());
-    name = "RpcActivityForPort"+ port;
+    name = "RpcActivityForPort" + port;
     this.server = server;
     registry = new MetricsRegistry("rpc").tag("port", "RPC port", port);
-    LOG.debug("Initialized "+ registry);
+    int[] intervals = conf.getInts(
+        CommonConfigurationKeys.RPC_METRICS_PERCENTILES_INTERVALS_KEY);
+    rpcQuantileEnable = (intervals.length > 0) && conf.getBoolean(
+        CommonConfigurationKeys.RPC_METRICS_QUANTILE_ENABLE, false);
+    if (rpcQuantileEnable) {
+      rpcQueueTimeMillisQuantiles =
+          new MutableQuantiles[intervals.length];
+      rpcProcessingTimeMillisQuantiles =
+          new MutableQuantiles[intervals.length];
+      for (int i = 0; i < intervals.length; i++) {
+        int interval = intervals[i];
+        rpcQueueTimeMillisQuantiles[i] = registry.newQuantiles("rpcQueueTime"
+            + interval + "s", "rpc queue time in milli second", "ops",
+            "latency", interval);
+        rpcProcessingTimeMillisQuantiles[i] = registry.newQuantiles(
+            "rpcProcessingTime" + interval + "s",
+            "rpc processing time in milli second", "ops", "latency", interval);
+      }
+    }
+    LOG.debug("Initialized " + registry);
   }
 
   public String name() { return name; }
 
-  public static RpcMetrics create(Server server) {
-    RpcMetrics m = new RpcMetrics(server);
+  public static RpcMetrics create(Server server, Configuration conf) {
+    RpcMetrics m = new RpcMetrics(server, conf);
     return DefaultMetricsSystem.instance().register(m.name, null, m);
   }
 
   @Metric("Number of received bytes") MutableCounterLong receivedBytes;
   @Metric("Number of sent bytes") MutableCounterLong sentBytes;
   @Metric("Queue time") MutableRate rpcQueueTime;
+  MutableQuantiles[] rpcQueueTimeMillisQuantiles;
   @Metric("Processsing time") MutableRate rpcProcessingTime;
+  MutableQuantiles[] rpcProcessingTimeMillisQuantiles;
   @Metric("Number of authentication failures")
   MutableCounterInt rpcAuthenticationFailures;
   @Metric("Number of authentication successes")
@@ -146,6 +171,11 @@ public class RpcMetrics {
   //@Override
   public void addRpcQueueTime(int qTime) {
     rpcQueueTime.add(qTime);
+    if (rpcQuantileEnable) {
+      for (MutableQuantiles q : rpcQueueTimeMillisQuantiles) {
+        q.add(qTime);
+      }
+    }
   }
 
   /**
@@ -155,5 +185,10 @@ public class RpcMetrics {
   //@Override
   public void addRpcProcessingTime(int processingTime) {
     rpcProcessingTime.add(processingTime);
+    if (rpcQuantileEnable) {
+      for (MutableQuantiles q : rpcProcessingTimeMillisQuantiles) {
+        q.add(processingTime);
+      }
+    }
   }
 }

+ 40 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.ipc;
 
 import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotSame;
@@ -67,6 +68,7 @@ import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.Service;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.test.MetricsAsserts;
 import org.apache.hadoop.test.MockitoUtil;
 import org.junit.Before;
 import org.junit.Test;
@@ -957,6 +959,44 @@ public class TestRPC {
     }
   }
 
+  @Test
+  public void testRpcMetrics() throws Exception {
+    Configuration configuration = new Configuration();
+    final int interval = 1;
+    configuration.setBoolean(CommonConfigurationKeys.
+        RPC_METRICS_QUANTILE_ENABLE, true);
+    configuration.set(CommonConfigurationKeys.
+        RPC_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval);
+    final Server server = new RPC.Builder(configuration)
+        .setProtocol(TestProtocol.class).setInstance(new TestImpl())
+        .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
+        .build();
+    server.start();
+    final TestProtocol proxy = RPC.getProxy(TestProtocol.class,
+        TestProtocol.versionID, server.getListenerAddress(), configuration);
+    try {
+      for (int i=0; i<1000; i++) {
+        proxy.ping();
+        proxy.echo("" + i);
+      }
+      MetricsRecordBuilder rpcMetrics =
+          getMetrics(server.getRpcMetrics().name());
+      assertTrue("Expected non-zero rpc queue time",
+          getLongCounter("RpcQueueTimeNumOps", rpcMetrics) > 0);
+      assertTrue("Expected non-zero rpc processing time",
+          getLongCounter("RpcProcessingTimeNumOps", rpcMetrics) > 0);
+      MetricsAsserts.assertQuantileGauges("RpcQueueTime" + interval + "s",
+          rpcMetrics);
+      MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s",
+          rpcMetrics);
+    } finally {
+      if (proxy != null) {
+        RPC.stopProxy(proxy);
+      }
+      server.stop();
+    }
+  }
+
   public static void main(String[] args) throws IOException {
     new TestRPC().testCallsInternal(conf);