Przeglądaj źródła

HADOOP-19521. Fix time unit mismatch in method updateDeferredMetrics. (#7571). Contributed by hfutatzhanghb.

Reviewed-by: Jian Zhang <keepromise@apache.org>
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
hfutatzhanghb 3 tygodni temu
rodzic
commit
1cfcdca17e

+ 2 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java

@@ -415,7 +415,7 @@ public class ProtobufRpcEngine implements RpcEngine {
         long deltaNanos = Time.monotonicNowNanos() - call.getStartHandleTimestampNanos();
         updateProcessingDetails(call, deltaNanos);
         call.setDeferredResponse(RpcWritable.wrap(message));
-        server.updateDeferredMetrics(call, methodName, deltaNanos);
+        server.updateDeferredMetrics(call, methodName);
       }
 
       @Override
@@ -424,7 +424,7 @@ public class ProtobufRpcEngine implements RpcEngine {
         updateProcessingDetails(call, deltaNanos);
         call.setDeferredError(t);
         String detailedMetricsName = t.getClass().getSimpleName();
-        server.updateDeferredMetrics(call, detailedMetricsName, deltaNanos);
+        server.updateDeferredMetrics(call, detailedMetricsName);
       }
     }
 

+ 2 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java

@@ -447,7 +447,7 @@ public class ProtobufRpcEngine2 implements RpcEngine {
         long deltaNanos = Time.monotonicNowNanos() - call.getStartHandleTimestampNanos();
         updateProcessingDetails(call, deltaNanos);
         call.setDeferredResponse(RpcWritable.wrap(message));
-        server.updateDeferredMetrics(call, methodName, deltaNanos);
+        server.updateDeferredMetrics(call, methodName);
       }
 
       @Override
@@ -456,7 +456,7 @@ public class ProtobufRpcEngine2 implements RpcEngine {
         updateProcessingDetails(call, deltaNanos);
         call.setDeferredError(t);
         String detailedMetricsName = t.getClass().getSimpleName();
-        server.updateDeferredMetrics(call, detailedMetricsName, deltaNanos);
+        server.updateDeferredMetrics(call, detailedMetricsName);
       }
     }
 

+ 3 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -673,9 +673,8 @@ public abstract class Server {
    * Update rpc metrics for defered calls.
    * @param call The Rpc Call
    * @param name Rpc method name
-   * @param processingTime processing call in ms unit.
    */
-  void updateDeferredMetrics(Call call, String name, long processingTime) {
+  void updateDeferredMetrics(Call call, String name) {
     long completionTimeNanos = Time.monotonicNowNanos();
     long arrivalTimeNanos = call.timestampNanos;
 
@@ -684,6 +683,8 @@ public abstract class Server {
         details.get(Timing.LOCKWAIT, rpcMetrics.getMetricsTimeUnit());
     long responseTime =
         details.get(Timing.RESPONSE, rpcMetrics.getMetricsTimeUnit());
+    long processingTime =
+        details.get(Timing.PROCESSING, rpcMetrics.getMetricsTimeUnit());
     rpcMetrics.addRpcLockWaitTime(waitTime);
     rpcMetrics.addRpcProcessingTime(processingTime);
     rpcMetrics.addRpcResponseTime(responseTime);

+ 21 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpcServerHandoff.java

@@ -43,6 +43,7 @@ import org.slf4j.LoggerFactory;
 import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class TestProtoBufRpcServerHandoff {
@@ -144,6 +145,26 @@ public class TestProtoBufRpcServerHandoff {
     assertCounter("RpcProcessingTimeNumOps", 2L, rb);
   }
 
+  @Test
+  public void testUpdateDeferredMetrics() throws Exception {
+    final TestProtoBufRpcServerHandoffProtocol client = RPC.getProxy(
+        TestProtoBufRpcServerHandoffProtocol.class, 1, address, conf);
+
+    ExecutorService executorService = Executors.newFixedThreadPool(1);
+    CompletionService<ClientInvocationCallable> completionService =
+        new ExecutorCompletionService<ClientInvocationCallable>(
+            executorService);
+
+    completionService.submit(new ClientInvocationCallable(client, 2000L));
+    Future<ClientInvocationCallable> future1 = completionService.take();
+    ClientInvocationCallable callable1 = future1.get();
+
+    double deferredProcessingTime = server.getRpcMetrics().getDeferredRpcProcessingTime()
+        .lastStat().max();
+    double processingTime = server.getRpcMetrics().getRpcProcessingTime().lastStat().max();
+    assertEquals(deferredProcessingTime, processingTime);
+  }
+
   private static class ClientInvocationCallable
       implements Callable<ClientInvocationCallable> {
     final TestProtoBufRpcServerHandoffProtocol client;