HADOOP-17127. Use RpcMetrics.TIMEUNIT to initialize rpc queueTime and processingTime. Contributed by Jim Brennan.

Erik Krogen · 4 years ago
commit 317fe4584a
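
The commit routes every queue-time and processing-time lookup through a single time-unit constant on RpcMetrics, so the scheduler-side values stay in whatever unit the RPC metrics layer uses. Below is a minimal sketch of the relevant piece of org.apache.hadoop.ipc.metrics.RpcMetrics; the MILLISECONDS default is an assumption for illustration, since only the constant itself is visible in this diff.

    import java.util.concurrent.TimeUnit;

    public class RpcMetrics {
      // Single source of truth for the unit used by RPC timing metrics.
      // MILLISECONDS is an assumed default for illustration; only the
      // existence of the constant is shown by the diff below.
      public static final TimeUnit TIMEUNIT = TimeUnit.MILLISECONDS;
    }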

+ 3 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java

@@ -43,6 +43,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.ipc.metrics.DecayRpcSchedulerDetailedMetrics;
+import org.apache.hadoop.ipc.metrics.RpcMetrics;
 import org.apache.hadoop.metrics2.MetricsCollector;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.metrics2.MetricsSource;
@@ -632,8 +633,8 @@ public class DecayRpcScheduler implements RpcScheduler,
     addCost(user, processingCost);
 
     int priorityLevel = schedulable.getPriorityLevel();
-    long queueTime = details.get(Timing.QUEUE, TimeUnit.MILLISECONDS);
-    long processingTime = details.get(Timing.PROCESSING, TimeUnit.MILLISECONDS);
+    long queueTime = details.get(Timing.QUEUE, RpcMetrics.TIMEUNIT);
+    long processingTime = details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
 
     this.decayRpcSchedulerDetailedMetrics.addQueueTime(
         priorityLevel, queueTime);
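
For context, ProcessingDetails.get(Timing, TimeUnit) converts an internally recorded timing into whichever unit the caller requests, so the hunk above only changes which unit the scheduler asks for. A rough sketch of that conversion follows; the nanosecond storage unit, the enum subset, and the add() helper are assumptions for illustration only.

    import java.util.concurrent.TimeUnit;

    // Rough sketch of how a ProcessingDetails-style class resolves a timing
    // into the caller's unit. Not the real implementation.
    class ProcessingDetailsSketch {
      enum Timing { QUEUE, PROCESSING }   // subset of the real enum, for the sketch

      // Assumed internal storage unit.
      private static final TimeUnit STORAGE_UNIT = TimeUnit.NANOSECONDS;
      private final long[] timings = new long[Timing.values().length];

      void add(Timing type, long value, TimeUnit unit) {
        // normalize to the storage unit on the way in
        timings[type.ordinal()] += STORAGE_UNIT.convert(value, unit);
      }

      long get(Timing type, TimeUnit unit) {
        // convert to whatever unit the caller asks for, e.g. RpcMetrics.TIMEUNIT
        return unit.convert(timings[type.ordinal()], STORAGE_UNIT);
      }
    }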

+ 6 - 6
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java

@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.ipc;
 
-import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.ipc.metrics.RpcMetrics;
 
 /**
  * Implement this interface to be used for RPC scheduling and backoff.
@@ -62,12 +62,12 @@ public interface RpcScheduler {
     // this interface, a default implementation is supplied which uses the old
     // method. All new implementations MUST override this interface and should
     // NOT use the other addResponseTime method.
-    int queueTimeMs = (int)
-        details.get(ProcessingDetails.Timing.QUEUE, TimeUnit.MILLISECONDS);
-    int processingTimeMs = (int)
-        details.get(ProcessingDetails.Timing.PROCESSING, TimeUnit.MILLISECONDS);
+    int queueTime = (int)
+        details.get(ProcessingDetails.Timing.QUEUE, RpcMetrics.TIMEUNIT);
+    int processingTime = (int)
+        details.get(ProcessingDetails.Timing.PROCESSING, RpcMetrics.TIMEUNIT);
     addResponseTime(callName, schedulable.getPriorityLevel(),
-        queueTimeMs, processingTimeMs);
+        queueTime, processingTime);
   }
 
   void stop();
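
Per the comment in the hunk above, new schedulers should override the ProcessingDetails-based addResponseTime rather than rely on the default bridge to the legacy method. The class below is a hypothetical sketch of such an override (LoggingRpcScheduler is an illustrative name, not part of this commit); the remaining interface methods are stubbed only so the sketch compiles.

    import org.apache.hadoop.ipc.ProcessingDetails;
    import org.apache.hadoop.ipc.RpcScheduler;
    import org.apache.hadoop.ipc.Schedulable;
    import org.apache.hadoop.ipc.metrics.RpcMetrics;

    // Hypothetical scheduler shown only to illustrate the recommended override.
    public class LoggingRpcScheduler implements RpcScheduler {

      @Override
      public int getPriorityLevel(Schedulable obj) {
        return 0;                       // single priority level in this sketch
      }

      @Override
      public boolean shouldBackOff(Schedulable obj) {
        return false;                   // never ask clients to back off
      }

      @Override
      public void addResponseTime(String callName, Schedulable schedulable,
          ProcessingDetails details) {
        // Read both timings in RpcMetrics.TIMEUNIT so they line up with the
        // unit used by the rest of the RPC metrics.
        long queueTime =
            details.get(ProcessingDetails.Timing.QUEUE, RpcMetrics.TIMEUNIT);
        long processingTime =
            details.get(ProcessingDetails.Timing.PROCESSING, RpcMetrics.TIMEUNIT);
        System.out.println(callName + " priority=" + schedulable.getPriorityLevel()
            + " queueTime=" + queueTime + " processingTime=" + processingTime);
      }

      @Override
      public void addResponseTime(String callName, int priorityLevel,
          int queueTime, int processingTime) {
        // Legacy signature implemented only to satisfy the interface; new code
        // should use the ProcessingDetails variant above.
      }

      @Override
      public void stop() {
        // nothing to shut down in this sketch
      }
    }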

+ 5 - 1
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.ipc.Client.ConnectionId;
 import org.apache.hadoop.ipc.Server.Call;
 import org.apache.hadoop.ipc.Server.Connection;
+import org.apache.hadoop.ipc.metrics.RpcMetrics;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
 import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
 import org.apache.hadoop.ipc.protobuf.TestProtos;
@@ -81,6 +82,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -1095,7 +1097,9 @@ public class TestRPC extends TestRpcBase {
 
       proxy.lockAndSleep(null, newSleepRequest(5));
       rpcMetrics = getMetrics(server.getRpcMetrics().name());
-      assertGauge("RpcLockWaitTimeAvgTime", 10000.0, rpcMetrics);
+      assertGauge("RpcLockWaitTimeAvgTime",
+          (double)(RpcMetrics.TIMEUNIT.convert(10L, TimeUnit.SECONDS)),
+          rpcMetrics);
     } finally {
       if (proxy2 != null) {
         RPC.stopProxy(proxy2);
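
The updated assertion expresses the expected 10-second lock wait in RpcMetrics.TIMEUNIT instead of hard-coding 10000.0 milliseconds, so the test keeps passing if the metrics unit ever changes. If the unit happens to be milliseconds (an assumption for this example), the conversion works out as follows:

    import java.util.concurrent.TimeUnit;

    public class TimeUnitConversionExample {
      public static void main(String[] args) {
        // With a millisecond unit, 10 seconds becomes 10000 ms, matching the
        // literal the assertion used before this change.
        double expected = TimeUnit.MILLISECONDS.convert(10L, TimeUnit.SECONDS);
        System.out.println(expected);   // prints 10000.0
      }
    }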