Browse Source

HDFS-14084. Need for more stats in DFSClient. Contributed by Pranay Singh.

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
(cherry picked from commit ecdeaa7e6ad43555031aed032e6ba7a14a17d7bc)
(cherry picked from commit 1f39eae7e6f59206b86f96063ffb2ebe15a9cbe1)
Pranay Singh 6 years ago
parent
commit
e8e55839a0

+ 25 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
+import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
 import org.apache.hadoop.ipc.RPC.RpcKind;
 import org.apache.hadoop.ipc.Server.AuthProtocol;
 import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
@@ -86,6 +87,7 @@ import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID;
 public class Client implements AutoCloseable {
   
   public static final Logger LOG = LoggerFactory.getLogger(Client.class);
+  private final RpcDetailedMetrics rpcDetailedMetrics;
 
   /** A counter for generating call IDs. */
   private static final AtomicInteger callIdCounter = new AtomicInteger();
@@ -208,6 +210,24 @@ public class Client implements AutoCloseable {
     }
   };
   
+  /**
+   * Update a particular metric by recording the processing
+   * time of the metric.
+   *
+   * @param name Metric name
+   * @param processingTime time spent in processing the metric.
+   */
+  public void updateMetrics(String name, long processingTime) {
+    rpcDetailedMetrics.addProcessingTime(name, processingTime);
+  }
+
+  /**
+   * Get the RpcDetailedMetrics associated with the Client.
+   */
+  public RpcDetailedMetrics getRpcDetailedMetrics() {
+    return rpcDetailedMetrics;
+  }
+
   /**
    * set the ping interval value in configuration
    * 
@@ -1301,6 +1321,11 @@ public class Client implements AutoCloseable {
     this.maxAsyncCalls = conf.getInt(
         CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
         CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT);
+    /**
+     * Create with port of -1, dummy port since the function
+     * takes default argument.
+     */
+    this.rpcDetailedMetrics = RpcDetailedMetrics.create(-1);
   }
 
   /**

+ 13 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java

@@ -49,6 +49,8 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.metrics2.MetricStringBuilder;
+import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
 
 /**
  * RPC Engine for for protobuf based RPCs.
@@ -190,7 +192,7 @@ public class ProtobufRpcEngine implements RpcEngine {
         throws ServiceException {
       long startTime = 0;
       if (LOG.isDebugEnabled()) {
-        startTime = Time.now();
+        startTime = System.currentTimeMillis();
       }
       
       if (args.length != 2) { // RpcController + Message
@@ -245,8 +247,16 @@ public class ProtobufRpcEngine implements RpcEngine {
       }
 
       if (LOG.isDebugEnabled()) {
-        long callTime = Time.now() - startTime;
-        LOG.debug("Call: " + method.getName() + " took " + callTime + "ms");
+        long callTime = System.currentTimeMillis() - startTime;
+        if (callTime > 0) {
+          MetricStringBuilder rb =
+              new MetricStringBuilder(null, "", " = ", "\n");
+          client.updateMetrics(method.getName(), callTime);
+          MutableRatesWithAggregation rates =
+              client.getRpcDetailedMetrics().getMutableRates();
+          rates.snapshot(rb, true);
+          LOG.debug("RPC Client stats: {}", rb);
+        }
       }
       
       if (Client.isAsynchronousMode()) {

+ 8 - 4
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java

@@ -70,12 +70,16 @@ public class RpcDetailedMetrics {
    * @param processingTime  the processing time
    */
   //@Override // some instrumentation interface
-  public void addProcessingTime(String name, int processingTime) {
-    rates.add(name, processingTime);
+  public void addProcessingTime(String metName, long processingTime) {
+    rates.add(metName, processingTime);
   }
 
-  public void addDeferredProcessingTime(String name, long processingTime) {
-    deferredRpcRates.add(name, processingTime);
+  public void addDeferredProcessingTime(String metName, long processingTime) {
+    deferredRpcRates.add(metName, processingTime);
+  }
+
+  public MutableRatesWithAggregation getMutableRates() {
+    return rates;
   }
 
   /**