Browse Source

HADOOP-18840. Add enQueue time to RpcMetrics (#5926). Contributed by Liangjun He.

Reviewed-by: Shilun Fan <slfan1989@apache.org>
Reviewed-by: Xing Lin <linxingnku@gmail.com>
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
Liangjun He 1 year ago
parent
commit
b6edcb9a84

+ 3 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -616,6 +616,9 @@ public abstract class Server {
     deltaNanos -= details.get(Timing.RESPONSE);
     details.set(Timing.HANDLER, deltaNanos);
 
+    long enQueueTime = details.get(Timing.ENQUEUE, rpcMetrics.getMetricsTimeUnit());
+    rpcMetrics.addRpcEnQueueTime(enQueueTime);
+
     long queueTime = details.get(Timing.QUEUE, rpcMetrics.getMetricsTimeUnit());
     rpcMetrics.addRpcQueueTime(queueTime);
 

+ 24 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java

@@ -69,6 +69,8 @@ public class RpcMetrics {
         CommonConfigurationKeys.RPC_METRICS_QUANTILE_ENABLE_DEFAULT);
     metricsTimeUnit = getMetricsTimeUnit(conf);
     if (rpcQuantileEnable) {
+      rpcEnQueueTimeQuantiles =
+          new MutableQuantiles[intervals.length];
       rpcQueueTimeQuantiles =
           new MutableQuantiles[intervals.length];
       rpcLockWaitTimeQuantiles =
@@ -81,6 +83,9 @@ public class RpcMetrics {
           new MutableQuantiles[intervals.length];
       for (int i = 0; i < intervals.length; i++) {
         int interval = intervals[i];
+        rpcEnQueueTimeQuantiles[i] = registry.newQuantiles("rpcEnQueueTime"
+            + interval + "s", "rpc enqueue time in " + metricsTimeUnit, "ops",
+            "latency", interval);
         rpcQueueTimeQuantiles[i] = registry.newQuantiles("rpcQueueTime"
             + interval + "s", "rpc queue time in " + metricsTimeUnit, "ops",
             "latency", interval);
@@ -114,6 +119,8 @@ public class RpcMetrics {
 
   @Metric("Number of received bytes") MutableCounterLong receivedBytes;
   @Metric("Number of sent bytes") MutableCounterLong sentBytes;
+  @Metric("EQueue time") MutableRate rpcEnQueueTime;
+  MutableQuantiles[] rpcEnQueueTimeQuantiles;
   @Metric("Queue time") MutableRate rpcQueueTime;
   MutableQuantiles[] rpcQueueTimeQuantiles;
   @Metric("Lock wait time") MutableRate rpcLockWaitTime;
@@ -257,6 +264,23 @@ public class RpcMetrics {
     receivedBytes.incr(count);
   }
 
+  /**
+   * Add an RPC enqueue time sample to the rate metric and, when quantiles
+   * are enabled, to every enqueue-time quantile. The request time seen by
+   * the client can be much longer than queue + process time on the RPC
+   * server, possibly because the request spent too long 'waiting enQueue'
+   * there, so that time is tracked too. See HADOOP-18840 for details.
+   * @param enQTime the enqueue time
+   */
+  public void addRpcEnQueueTime(long enQTime) {
+    rpcEnQueueTime.add(enQTime);
+    if (rpcQuantileEnable) {
+      for (MutableQuantiles q : rpcEnQueueTimeQuantiles) {
+        q.add(enQTime);
+      }
+    }
+  }
+
   /**
    * Add an RPC queue time sample
    * @param qTime the queue time

+ 10 - 1
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

@@ -1334,6 +1334,8 @@ public class TestRPC extends TestRpcBase {
       }
       MetricsRecordBuilder rpcMetrics =
           getMetrics(server.getRpcMetrics().name());
+      assertEquals("Expected correct rpc en queue count",
+          3000, getLongCounter("RpcEnQueueTimeNumOps", rpcMetrics));
       assertEquals("Expected correct rpc queue count",
           3000, getLongCounter("RpcQueueTimeNumOps", rpcMetrics));
       assertEquals("Expected correct rpc processing count",
@@ -1344,6 +1346,8 @@ public class TestRPC extends TestRpcBase {
           3000, getLongCounter("RpcResponseTimeNumOps", rpcMetrics));
       assertEquals("Expected zero rpc lock wait time",
           0, getDoubleGauge("RpcLockWaitTimeAvgTime", rpcMetrics), 0.001);
+      MetricsAsserts.assertQuantileGauges("RpcEnQueueTime" + interval + "s",
+          rpcMetrics);
       MetricsAsserts.assertQuantileGauges("RpcQueueTime" + interval + "s",
           rpcMetrics);
       MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s",
@@ -2007,6 +2011,8 @@ public class TestRPC extends TestRpcBase {
           getMetrics(server.getRpcMetrics().name());
       assertEquals("Expected zero rpc lock wait time",
           0, getDoubleGauge("RpcLockWaitTimeAvgTime", rpcMetrics), 0.001);
+      MetricsAsserts.assertQuantileGauges("RpcEnQueueTime" + interval + "s",
+          rpcMetrics);
       MetricsAsserts.assertQuantileGauges("RpcQueueTime" + interval + "s",
           rpcMetrics);
       MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s",
@@ -2017,12 +2023,15 @@ public class TestRPC extends TestRpcBase {
       assertGauge("RpcLockWaitTimeAvgTime",
           (double)(server.getRpcMetrics().getMetricsTimeUnit().convert(10L,
               TimeUnit.SECONDS)), rpcMetrics);
-      LOG.info("RpcProcessingTimeAvgTime: {} , RpcQueueTimeAvgTime: {}",
+      LOG.info("RpcProcessingTimeAvgTime: {} , RpcEnQueueTimeAvgTime: {} , RpcQueueTimeAvgTime: {}",
           getDoubleGauge("RpcProcessingTimeAvgTime", rpcMetrics),
+          getDoubleGauge("RpcEnQueueTimeAvgTime", rpcMetrics),
           getDoubleGauge("RpcQueueTimeAvgTime", rpcMetrics));
 
       assertTrue(getDoubleGauge("RpcProcessingTimeAvgTime", rpcMetrics)
           > 4000000D);
+      assertTrue(getDoubleGauge("RpcEnQueueTimeAvgTime", rpcMetrics)
+          > 4000D);
       assertTrue(getDoubleGauge("RpcQueueTimeAvgTime", rpcMetrics)
           > 4000D);
     } finally {