浏览代码

HDFS-16394.RPCMetrics increases the number of handlers in processing. (#3822)

jianghuazhu 3 年之前
父节点
当前提交
43afd1753a

+ 7 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -498,6 +498,7 @@ public abstract class Server {
   private Map<Integer, Listener> auxiliaryListenerMap;
   private Responder responder = null;
   private Handler[] handlers = null;
+  private final AtomicInteger numInProcessHandler = new AtomicInteger();
 
   private boolean logSlowRPC = false;
 
@@ -509,6 +510,10 @@ public abstract class Server {
     return logSlowRPC;
   }
 
+  public int getNumInProcessHandler() {
+    return numInProcessHandler.get();
+  }
+
   /**
    * Sets slow RPC flag.
    * @param logSlowRPCFlag
@@ -3080,6 +3085,7 @@ public abstract class Server {
 
         try {
           call = callQueue.take(); // pop the queue; maybe blocked here
+          numInProcessHandler.incrementAndGet();
           startTimeNanos = Time.monotonicNowNanos();
           if (alignmentContext != null && call.isCallCoordinated() &&
               call.getClientStateId() > alignmentContext.getLastSeenStateId()) {
@@ -3133,6 +3139,7 @@ public abstract class Server {
           }
         } finally {
           CurCall.set(null);
+          numInProcessHandler.decrementAndGet();
           IOUtils.cleanupWithLogger(LOG, traceScope);
           if (call != null) {
             updateMetrics(call, startTimeNanos, connDropped);

+ 6 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java

@@ -133,6 +133,11 @@ public class RpcMetrics {
     return server.getNumOpenConnections();
   }
 
+  @Metric("Number of in process handlers")
+  public int getNumInProcessHandler() {
+    return server.getNumInProcessHandler();
+  }
+
   @Metric("Number of open connections per user")
   public String numOpenConnectionsPerUser() {
     return server.getNumOpenConnectionsPerUser();
@@ -288,6 +293,7 @@ public class RpcMetrics {
   public  void incrSlowRpc() {
     rpcSlowCalls.incr();
   }
+
   /**
    * Returns a MutableRate Counter.
    * @return Mutable Rate

+ 1 - 0
hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md

@@ -83,6 +83,7 @@ The default timeunit used for RPC metrics is milliseconds (as per the below desc
 | `RpcAuthorizationFailures` | Total number of authorization failures |
 | `RpcAuthorizationSuccesses` | Total number of authorization successes |
 | `NumOpenConnections` | Current number of open connections |
+| `NumInProcessHandler` | Current number of handlers on working |
 | `CallQueueLength` | Current length of the call queue |
 | `numDroppedConnections` | Total number of dropped connections |
 | `rpcQueueTime`*num*`sNumOps` | Shows total number of RPC calls (*num* seconds granularity) if `rpc.metrics.quantile.enable` is set to true. *num* is specified by `rpc.metrics.percentiles.intervals`. |

+ 32 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ipc;
 
+import org.apache.hadoop.ipc.metrics.RpcMetrics;
 import org.apache.hadoop.thirdparty.protobuf.ServiceException;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
@@ -1107,6 +1108,37 @@ public class TestRPC extends TestRpcBase {
     }
   }
 
+  @Test
+  public void testNumInProcessHandlerMetrics() throws Exception {
+    UserGroupInformation ugi = UserGroupInformation.
+        createUserForTesting("user123", new String[0]);
+    // use 1 handler so the callq can be plugged
+    final Server server = setupTestServer(conf, 1);
+    try {
+      RpcMetrics rpcMetrics = server.getRpcMetrics();
+      assertEquals(0, rpcMetrics.getNumInProcessHandler());
+
+      ExternalCall<String> call1 = newExtCall(ugi, () -> {
+        assertEquals(1, rpcMetrics.getNumInProcessHandler());
+        return UserGroupInformation.getCurrentUser().getUserName();
+      });
+      ExternalCall<Void> call2 = newExtCall(ugi, () -> {
+        assertEquals(1, rpcMetrics.getNumInProcessHandler());
+        return null;
+      });
+
+      server.queueCall(call1);
+      server.queueCall(call2);
+
+      // Wait for call1 and call2 to enter the handler.
+      call1.get();
+      call2.get();
+      assertEquals(0, rpcMetrics.getNumInProcessHandler());
+    } finally {
+      server.stop();
+    }
+  }
+
   /**
    *  Test RPC backoff by queue full.
    */