Browse Source

HADOOP-19362. RPC metrics should be updated correctly when call is deferred. (#7224). Contributed by hfutatzhanghb.

Reviewed-by: Jian Zhang <keepromise@apache.org>
Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
hfutatzhanghb 4 tháng trước cách đây
mục cha
commit
f32a937511

+ 18 - 8
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java

@@ -26,6 +26,7 @@ import com.google.protobuf.TextFormat;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.retry.RetryPolicy;
@@ -34,7 +35,6 @@ import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestHeaderProto
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.tracing.TraceScope;
 import org.apache.hadoop.tracing.Tracer;
 import org.apache.hadoop.util.Time;
@@ -393,28 +393,38 @@ public class ProtobufRpcEngine implements RpcEngine {
       private final RPC.Server server;
       private final Call call;
       private final String methodName;
-      private final long setupTime;
 
       public ProtobufRpcEngineCallbackImpl() {
         this.server = CURRENT_CALL_INFO.get().getServer();
         this.call = Server.getCurCall().get();
         this.methodName = CURRENT_CALL_INFO.get().getMethodName();
-        this.setupTime = Time.now();
+      }
+
+      private void updateProcessingDetails(Call rpcCall, long deltaNanos) {
+        ProcessingDetails details = rpcCall.getProcessingDetails();
+        rpcCall.getProcessingDetails().set(ProcessingDetails.Timing.PROCESSING, deltaNanos,
+            TimeUnit.NANOSECONDS);
+        deltaNanos -= details.get(ProcessingDetails.Timing.LOCKWAIT, TimeUnit.NANOSECONDS);
+        deltaNanos -= details.get(ProcessingDetails.Timing.LOCKSHARED, TimeUnit.NANOSECONDS);
+        deltaNanos -= details.get(ProcessingDetails.Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS);
+        details.set(ProcessingDetails.Timing.LOCKFREE, deltaNanos, TimeUnit.NANOSECONDS);
       }
 
       @Override
       public void setResponse(Message message) {
-        long processingTime = Time.now() - setupTime;
+        long deltaNanos = Time.monotonicNowNanos() - call.getStartHandleTimestampNanos();
+        updateProcessingDetails(call, deltaNanos);
         call.setDeferredResponse(RpcWritable.wrap(message));
-        server.updateDeferredMetrics(methodName, processingTime);
+        server.updateDeferredMetrics(call, methodName, deltaNanos);
       }
 
       @Override
       public void error(Throwable t) {
-        long processingTime = Time.now() - setupTime;
-        String detailedMetricsName = t.getClass().getSimpleName();
-        server.updateDeferredMetrics(detailedMetricsName, processingTime);
+        long deltaNanos = Time.monotonicNowNanos() - call.getStartHandleTimestampNanos();
+        updateProcessingDetails(call, deltaNanos);
         call.setDeferredError(t);
+        String detailedMetricsName = t.getClass().getSimpleName();
+        server.updateDeferredMetrics(call, detailedMetricsName, deltaNanos);
       }
     }
 

+ 17 - 7
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ipc.Client.ConnectionId;
+import org.apache.hadoop.ipc.ProcessingDetails.Timing;
 import org.apache.hadoop.ipc.RPC.RpcInvoker;
 import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngine2Protos.RequestHeaderProto;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -425,28 +426,37 @@ public class ProtobufRpcEngine2 implements RpcEngine {
       private final RPC.Server server;
       private final Call call;
       private final String methodName;
-      private final long setupTime;
 
       ProtobufRpcEngineCallbackImpl() {
         this.server = CURRENT_CALL_INFO.get().getServer();
         this.call = Server.getCurCall().get();
         this.methodName = CURRENT_CALL_INFO.get().getMethodName();
-        this.setupTime = Time.now();
+      }
+
+      private void updateProcessingDetails(Call rpcCall, long deltaNanos) {
+        ProcessingDetails details = rpcCall.getProcessingDetails();
+        rpcCall.getProcessingDetails().set(Timing.PROCESSING, deltaNanos, TimeUnit.NANOSECONDS);
+        deltaNanos -= details.get(Timing.LOCKWAIT, TimeUnit.NANOSECONDS);
+        deltaNanos -= details.get(Timing.LOCKSHARED, TimeUnit.NANOSECONDS);
+        deltaNanos -= details.get(Timing.LOCKEXCLUSIVE, TimeUnit.NANOSECONDS);
+        details.set(Timing.LOCKFREE, deltaNanos, TimeUnit.NANOSECONDS);
       }
 
       @Override
       public void setResponse(Message message) {
-        long processingTime = Time.now() - setupTime;
+        long deltaNanos = Time.monotonicNowNanos() - call.getStartHandleTimestampNanos();
+        updateProcessingDetails(call, deltaNanos);
         call.setDeferredResponse(RpcWritable.wrap(message));
-        server.updateDeferredMetrics(methodName, processingTime);
+        server.updateDeferredMetrics(call, methodName, deltaNanos);
       }
 
       @Override
       public void error(Throwable t) {
-        long processingTime = Time.now() - setupTime;
-        String detailedMetricsName = t.getClass().getSimpleName();
-        server.updateDeferredMetrics(detailedMetricsName, processingTime);
+        long deltaNanos = Time.monotonicNowNanos() - call.getStartHandleTimestampNanos();
+        updateProcessingDetails(call, deltaNanos);
         call.setDeferredError(t);
+        String detailedMetricsName = t.getClass().getSimpleName();
+        server.updateDeferredMetrics(call, detailedMetricsName, deltaNanos);
       }
     }
 

+ 51 - 4
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -351,13 +351,13 @@ public abstract class Server {
    * after the call returns.
    */
   private static final ThreadLocal<Call> CurCall = new ThreadLocal<Call>();
-  
+
   /** @return Get the current call. */
   @VisibleForTesting
   public static ThreadLocal<Call> getCurCall() {
     return CurCall;
   }
-  
+
   /**
    * Returns the currently active RPC call's sequential ID number.  A negative
    * call ID indicates an invalid value, such as if there is no currently active
@@ -638,7 +638,8 @@ public abstract class Server {
     rpcMetrics.addRpcQueueTime(queueTime);
 
     if (call.isResponseDeferred() || connDropped) {
-      // call was skipped; don't include it in processing metrics
+      // The call was skipped; don't include it in processing metrics.
+      // Will update metrics in method updateDeferredMetrics.
       return;
     }
 
@@ -668,9 +669,41 @@ public abstract class Server {
     }
   }
 
-  void updateDeferredMetrics(String name, long processingTime) {
+  /**
+   * Update rpc metrics for deferred calls.
+   * @param call The Rpc Call
+   * @param name Rpc method name
+   * @param processingTime call processing time. NOTE(review): callers pass nanoseconds, not ms.
+   */
+  void updateDeferredMetrics(Call call, String name, long processingTime) {
+    long completionTimeNanos = Time.monotonicNowNanos();
+    long arrivalTimeNanos = call.timestampNanos;
+
+    ProcessingDetails details = call.getProcessingDetails();
+    long waitTime =
+        details.get(Timing.LOCKWAIT, rpcMetrics.getMetricsTimeUnit());
+    long responseTime =
+        details.get(Timing.RESPONSE, rpcMetrics.getMetricsTimeUnit());
+    rpcMetrics.addRpcLockWaitTime(waitTime);
+    rpcMetrics.addRpcProcessingTime(processingTime);
+    rpcMetrics.addRpcResponseTime(responseTime);
     rpcMetrics.addDeferredRpcProcessingTime(processingTime);
     rpcDetailedMetrics.addDeferredProcessingTime(name, processingTime);
+    // don't include lock wait for detailed metrics.
+    processingTime -= waitTime;
+    rpcDetailedMetrics.addProcessingTime(name, processingTime);
+
+    // Overall processing time is from arrival to completion.
+    long overallProcessingTime = rpcMetrics.getMetricsTimeUnit()
+        .convert(completionTimeNanos - arrivalTimeNanos, TimeUnit.NANOSECONDS);
+    rpcDetailedMetrics.addOverallProcessingTime(name, overallProcessingTime);
+    callQueue.addResponseTime(name, call, details);
+    if (isLogSlowRPC()) {
+      logSlowRpcCalls(name, call, details);
+    }
+    if (details.getReturnStatus() == RpcStatusProto.SUCCESS) {
+      rpcMetrics.incrRpcCallSuccesses();
+    }
   }
 
   /**
@@ -963,6 +996,7 @@ public abstract class Server {
     final int callId;            // the client's call id
     final int retryCount;        // the retry count of the call
     private final long timestampNanos; // time the call was received
+    protected long startHandleTimestampNanos; // time the call was run
     long responseTimestampNanos; // time the call was served
     private AtomicInteger responseWaitCount = new AtomicInteger(1);
     final RPC.RpcKind rpcKind;
@@ -1167,6 +1201,15 @@ public abstract class Server {
     public long getTimestampNanos() {
       return timestampNanos;
     }
+
+
+    public long getStartHandleTimestampNanos() {
+      return startHandleTimestampNanos;
+    }
+
+    public void setStartHandleTimestampNanos(long startHandleTimestampNanos) {
+      this.startHandleTimestampNanos = startHandleTimestampNanos;
+    }
   }
 
   /** A RPC extended call queued for handling. */
@@ -1243,6 +1286,7 @@ public abstract class Server {
       }
 
       long startNanos = Time.monotonicNowNanos();
+      this.setStartHandleTimestampNanos(startNanos);
       Writable value = null;
       ResponseParams responseParams = new ResponseParams();
 
@@ -1331,6 +1375,7 @@ public abstract class Server {
      * Send a deferred response, ignoring errors.
      */
     private void sendDeferedResponse() {
+      long startNanos = Time.monotonicNowNanos();
       try {
         connection.sendResponse(this);
       } catch (Exception e) {
@@ -1342,6 +1387,8 @@ public abstract class Server {
             .currentThread().getName() + ", CallId="
             + callId + ", hostname=" + getHostAddress());
       }
+      getProcessingDetails().set(Timing.RESPONSE,
+          Time.monotonicNowNanos() - startNanos, TimeUnit.NANOSECONDS);
     }
 
     @Override

+ 53 - 5
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpcServerHandoff.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ipc;
 
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
@@ -26,6 +27,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.thirdparty.protobuf.BlockingService;
 import org.apache.hadoop.thirdparty.protobuf.RpcController;
 import org.apache.hadoop.thirdparty.protobuf.ServiceException;
@@ -33,18 +35,27 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.protobuf.TestProtos;
 import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos.TestProtobufRpcHandoffProto;
 import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+
 public class TestProtoBufRpcServerHandoff {
 
   public static final Logger LOG =
       LoggerFactory.getLogger(TestProtoBufRpcServerHandoff.class);
 
-  @Test(timeout = 20000)
-  public void test() throws Exception {
-    Configuration conf = new Configuration();
+  private static Configuration conf = null;
+  private static RPC.Server server = null;
+  private static InetSocketAddress address = null;
+
+  @Before
+  public void setUp() throws IOException {
+    conf = new Configuration();
 
     TestProtoBufRpcServerHandoffServer serverImpl =
         new TestProtoBufRpcServerHandoffServer();
@@ -53,7 +64,7 @@ public class TestProtoBufRpcServerHandoff {
 
     RPC.setProtocolEngine(conf, TestProtoBufRpcServerHandoffProtocol.class,
         ProtobufRpcEngine2.class);
-    RPC.Server server = new RPC.Builder(conf)
+    server = new RPC.Builder(conf)
         .setProtocol(TestProtoBufRpcServerHandoffProtocol.class)
         .setInstance(blockingService)
         .setVerbose(true)
@@ -61,10 +72,13 @@ public class TestProtoBufRpcServerHandoff {
         .build();
     server.start();
 
-    InetSocketAddress address = server.getListenerAddress();
+    address = server.getListenerAddress();
     long serverStartTime = System.currentTimeMillis();
     LOG.info("Server started at: " + address + " at time: " + serverStartTime);
+  }
 
+  @Test(timeout = 20000)
+  public void test() throws Exception {
     final TestProtoBufRpcServerHandoffProtocol client = RPC.getProxy(
         TestProtoBufRpcServerHandoffProtocol.class, 1, address, conf);
 
@@ -93,6 +107,40 @@ public class TestProtoBufRpcServerHandoff {
 
   }
 
+  @Test(timeout = 20000)
+  public void testHandoffMetrics() throws Exception {
+    final TestProtoBufRpcServerHandoffProtocol client = RPC.getProxy(
+        TestProtoBufRpcServerHandoffProtocol.class, 1, address, conf);
+
+    ExecutorService executorService = Executors.newFixedThreadPool(2);
+    CompletionService<ClientInvocationCallable> completionService =
+        new ExecutorCompletionService<ClientInvocationCallable>(
+            executorService);
+
+    completionService.submit(new ClientInvocationCallable(client, 5000L));
+    completionService.submit(new ClientInvocationCallable(client, 5000L));
+
+    long submitTime = System.currentTimeMillis();
+    Future<ClientInvocationCallable> future1 = completionService.take();
+    Future<ClientInvocationCallable> future2 = completionService.take();
+
+    ClientInvocationCallable callable1 = future1.get();
+    ClientInvocationCallable callable2 = future2.get();
+
+    LOG.info(callable1.toString());
+    LOG.info(callable2.toString());
+
+    // Ensure the 5 second sleep responses are within a reasonable time of each
+    // other.
+    Assert.assertTrue(Math.abs(callable1.endTime - callable2.endTime) < 2000L);
+    Assert.assertTrue(System.currentTimeMillis() - submitTime < 7000L);
+
+    // Check rpcMetrics
+    MetricsRecordBuilder rb = getMetrics(server.rpcMetrics.name());
+    assertCounterGt("DeferredRpcProcessingTimeNumOps", 1L, rb);
+    assertCounter("RpcProcessingTimeNumOps", 2L, rb);
+  }
+
   private static class ClientInvocationCallable
       implements Callable<ClientInvocationCallable> {
     final TestProtoBufRpcServerHandoffProtocol client;