瀏覽代碼

HADOOP-18920. RPC Metrics : Optimize logic for log slow RPCs (#6146)

huhaiyang 1 年之前
父節點
當前提交
f85ac5b60d

+ 4 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java

@@ -504,6 +504,10 @@ public class CommonConfigurationKeysPublic {
                                                 "ipc.server.log.slow.rpc";
   public static final boolean IPC_SERVER_LOG_SLOW_RPC_DEFAULT = false;
 
+  public static final String IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY =
+      "ipc.server.log.slow.rpc.threshold.ms";
+  public static final long IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_DEFAULT = 0;
+
   public static final String IPC_SERVER_PURGE_INTERVAL_MINUTES_KEY =
     "ipc.server.purge.interval";
   public static final int IPC_SERVER_PURGE_INTERVAL_MINUTES_DEFAULT = 15;

+ 35 - 17
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -516,16 +516,22 @@ public abstract class Server {
   private final long metricsUpdaterInterval;
   private final ScheduledExecutorService scheduledExecutorService;
 
-  private boolean logSlowRPC = false;
+  private volatile boolean logSlowRPC = false;
+  /** Threshold time for log slow rpc. */
+  private volatile long logSlowRPCThresholdTime;
 
   /**
    * Checks if LogSlowRPC is set true.
    * @return true, if LogSlowRPC is set true, false, otherwise.
    */
-  protected boolean isLogSlowRPC() {
+  public boolean isLogSlowRPC() {
     return logSlowRPC;
   }
 
+  public long getLogSlowRPCThresholdTime() {
+    return logSlowRPCThresholdTime;
+  }
+
   public int getNumInProcessHandler() {
     return numInProcessHandler.get();
   }
@@ -543,10 +549,16 @@ public abstract class Server {
    * @param logSlowRPCFlag input logSlowRPCFlag.
    */
   @VisibleForTesting
-  protected void setLogSlowRPC(boolean logSlowRPCFlag) {
+  public void setLogSlowRPC(boolean logSlowRPCFlag) {
     this.logSlowRPC = logSlowRPCFlag;
   }
 
+  @VisibleForTesting
+  public void setLogSlowRPCThresholdTime(long logSlowRPCThresholdMs) {
+    this.logSlowRPCThresholdTime = rpcMetrics.getMetricsTimeUnit().
+        convert(logSlowRPCThresholdMs, TimeUnit.MILLISECONDS);
+  }
+
   private void setPurgeIntervalNanos(int purgeInterval) {
     int tmpPurgeInterval = CommonConfigurationKeysPublic.
         IPC_SERVER_PURGE_INTERVAL_MINUTES_DEFAULT;
@@ -568,12 +580,15 @@ public abstract class Server {
    * @param methodName - RPC Request method name
    * @param details - Processing Detail.
    *
-   * if this request took too much time relative to other requests
-   * we consider that as a slow RPC. 3 is a magic number that comes
-   * from 3 sigma deviation. A very simple explanation can be found
-   * by searching for 68-95-99.7 rule. We flag an RPC as slow RPC
-   * if and only if it falls above 99.7% of requests. We start this logic
-   * only once we have enough sample size.
+   * If a request took significant more time than other requests,
+   * and its processing time is at least `logSlowRPCThresholdMs` we consider that as a slow RPC.
+   *
+   * The definition rules for calculating whether the current request took too much time
+   * compared to other requests are as follows:
+   * 3 is a magic number that comes from 3 sigma deviation.
+   * A very simple explanation can be found by searching for 68-95-99.7 rule.
+   * We flag an RPC as slow RPC if and only if it falls above 99.7% of requests.
+   * We start this logic only once we have enough sample size.
    */
   void logSlowRpcCalls(String methodName, Call call,
       ProcessingDetails details) {
@@ -587,15 +602,14 @@ public abstract class Server {
     final double threeSigma = rpcMetrics.getProcessingMean() +
         (rpcMetrics.getProcessingStdDev() * deviation);
 
-    long processingTime =
-            details.get(Timing.PROCESSING, rpcMetrics.getMetricsTimeUnit());
+    final TimeUnit metricsTimeUnit = rpcMetrics.getMetricsTimeUnit();
+    long processingTime = details.get(Timing.PROCESSING, metricsTimeUnit);
     if ((rpcMetrics.getProcessingSampleCount() > minSampleSize) &&
-        (processingTime > threeSigma)) {
-      LOG.warn(
-          "Slow RPC : {} took {} {} to process from client {},"
-              + " the processing detail is {}",
-          methodName, processingTime, rpcMetrics.getMetricsTimeUnit(), call,
-          details.toString());
+        (processingTime > threeSigma) &&
+        (processingTime > getLogSlowRPCThresholdTime())) {
+      LOG.warn("Slow RPC : {} took {} {} to process from client {}, the processing detail is {}," +
+              " and the threshold time is {} {}.", methodName, processingTime, metricsTimeUnit,
+          call, details.toString(), getLogSlowRPCThresholdTime(), metricsTimeUnit);
       rpcMetrics.incrSlowRpc();
     }
   }
@@ -3359,6 +3373,10 @@ public abstract class Server {
         CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC,
         CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_DEFAULT));
 
+    this.setLogSlowRPCThresholdTime(conf.getLong(
+        CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY,
+        CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_DEFAULT));
+
     this.setPurgeIntervalNanos(conf.getInt(
         CommonConfigurationKeysPublic.IPC_SERVER_PURGE_INTERVAL_MINUTES_KEY,
         CommonConfigurationKeysPublic.IPC_SERVER_PURGE_INTERVAL_MINUTES_DEFAULT));

+ 9 - 0
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -2526,6 +2526,15 @@ The switch to turn S3A auditing on or off.
     </description>
 </property>
 
+<property>
+  <name>ipc.server.log.slow.rpc.threshold.ms</name>
+  <value>0</value>
+  <description>The threshold in milliseconds for logging slow rpc when ipc.server.log.slow.rpc is enabled.
+    Besides of being much slower than other RPC requests, an RPC request has to take at least the threshold value
+    defined by this property before it can be considered as slow. By default, this threshold is set to 0 (disabled).
+  </description>
+</property>
+
 <property>
   <name>ipc.server.purge.interval</name>
   <value>15</value>

+ 8 - 1
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java

@@ -355,6 +355,7 @@ public class TestProtoBufRpc extends TestRpcBase {
       TimeoutException, InterruptedException {
     //No test with legacy
     assumeFalse(testWithLegacy);
+    server.setLogSlowRPCThresholdTime(SLEEP_DURATION);
     TestRpcService2 client = getClient2();
     // make 10 K fast calls
     for (int x = 0; x < 10000; x++) {
@@ -370,7 +371,13 @@ public class TestProtoBufRpc extends TestRpcBase {
     assertThat(rpcMetrics.getProcessingSampleCount()).isGreaterThan(999L);
     long before = rpcMetrics.getRpcSlowCalls();
 
-    // make a really slow call. Sleep sleeps for 1000ms
+    // Sleep sleeps for 500ms(less than `logSlowRPCThresholdTime`),
+    // make sure we never called into Log slow RPC routine.
+    client.sleep(null, newSleepRequest(SLEEP_DURATION / 2));
+    long after = rpcMetrics.getRpcSlowCalls();
+    assertThat(before).isEqualTo(after);
+
+    // Make a really slow call. Sleep sleeps for 3000ms.
     client.sleep(null, newSleepRequest(SLEEP_DURATION * 3));
 
     // Ensure slow call is logged.

+ 47 - 1
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java

@@ -126,6 +126,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_KEY;
@@ -365,7 +369,9 @@ public class NameNode extends ReconfigurableBase implements
           DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_KEY,
           DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_LIMIT,
           DFS_NAMENODE_DECOMMISSION_BACKOFF_MONITOR_PENDING_BLOCKS_PER_LOCK,
-          DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_KEY));
+          DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_KEY,
+          IPC_SERVER_LOG_SLOW_RPC,
+          IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY));
 
   private static final String USAGE = "Usage: hdfs namenode ["
       + StartupOption.BACKUP.getName() + "] | \n\t["
@@ -2369,6 +2375,9 @@ public class NameNode extends ReconfigurableBase implements
           newVal);
     } else if (property.equals(DFS_NAMENODE_BLOCKPLACEMENTPOLICY_MIN_BLOCKS_FOR_WRITE_KEY)) {
       return reconfigureMinBlocksForWrite(property, newVal);
+    } else if (property.equals(IPC_SERVER_LOG_SLOW_RPC) ||
+        (property.equals(IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY))) {
+      return reconfigureLogSlowRPC(property, newVal);
     } else {
       throw new ReconfigurationException(property, newVal, getConf().get(
           property));
@@ -2511,6 +2520,43 @@ public class NameNode extends ReconfigurableBase implements
     return Boolean.toString(clientBackoffEnabled);
   }
 
+  String reconfigureLogSlowRPC(String property, String newVal) throws ReconfigurationException {
+    String result = null;
+    try {
+      if (property.equals(IPC_SERVER_LOG_SLOW_RPC)) {
+        if (newVal != null && !newVal.equalsIgnoreCase("true") &&
+            !newVal.equalsIgnoreCase("false")) {
+          throw new IllegalArgumentException(newVal + " is not boolean value");
+        }
+        boolean logSlowRPC = (newVal == null ? IPC_SERVER_LOG_SLOW_RPC_DEFAULT :
+            Boolean.parseBoolean(newVal));
+        rpcServer.getClientRpcServer().setLogSlowRPC(logSlowRPC);
+        if (rpcServer.getServiceRpcServer() != null) {
+          rpcServer.getServiceRpcServer().setLogSlowRPC(logSlowRPC);
+        }
+        if (rpcServer.getLifelineRpcServer() != null) {
+          rpcServer.getLifelineRpcServer().setLogSlowRPC(logSlowRPC);
+        }
+        result = Boolean.toString(logSlowRPC);
+      } else if (property.equals(IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY)) {
+        long logSlowRPCThresholdTime = (newVal == null ?
+            IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_DEFAULT : Long.parseLong(newVal));
+        rpcServer.getClientRpcServer().setLogSlowRPCThresholdTime(logSlowRPCThresholdTime);
+        if (rpcServer.getServiceRpcServer() != null) {
+          rpcServer.getServiceRpcServer().setLogSlowRPCThresholdTime(logSlowRPCThresholdTime);
+        }
+        if (rpcServer.getLifelineRpcServer() != null) {
+          rpcServer.getLifelineRpcServer().setLogSlowRPCThresholdTime(logSlowRPCThresholdTime);
+        }
+        result = Long.toString(logSlowRPCThresholdTime);
+      }
+      LOG.info("RECONFIGURE* changed reconfigureLogSlowRPC {} to {}", property, result);
+      return result;
+    } catch (IllegalArgumentException e) {
+      throw new ReconfigurationException(property, newVal, getConf().get(property), e);
+    }
+  }
+
   String reconfigureSPSModeEvent(String newVal, String property)
       throws ReconfigurationException {
     if (newVal == null

+ 46 - 0
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNameNodeReconfigure.java

@@ -29,6 +29,9 @@ import org.junit.Test;
 import org.junit.Before;
 import org.junit.After;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_DEFAULT;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_NODES_TO_REPORT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_IMAGE_PARALLEL_LOAD_KEY;
 import static org.junit.Assert.*;
@@ -701,6 +704,49 @@ public class TestNameNodeReconfigure {
     assertEquals(3, bm.getMinBlocksForWrite(BlockType.STRIPED));
   }
 
+  @Test
+  public void testReconfigureLogSlowRPC() throws ReconfigurationException {
+    final NameNode nameNode = cluster.getNameNode();
+    final NameNodeRpcServer nnrs = (NameNodeRpcServer) nameNode.getRpcServer();
+    // verify default value.
+    assertFalse(nnrs.getClientRpcServer().isLogSlowRPC());
+    assertEquals(IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_DEFAULT,
+        nnrs.getClientRpcServer().getLogSlowRPCThresholdTime());
+
+    // try invalid logSlowRPC.
+    try {
+      nameNode.reconfigurePropertyImpl(IPC_SERVER_LOG_SLOW_RPC, "non-boolean");
+      fail("should not reach here");
+    } catch (ReconfigurationException e) {
+      assertEquals(
+          "Could not change property ipc.server.log.slow.rpc from 'false' to 'non-boolean'",
+          e.getMessage());
+    }
+
+    // try correct logSlowRPC.
+    nameNode.reconfigurePropertyImpl(IPC_SERVER_LOG_SLOW_RPC, "True");
+    assertTrue(nnrs.getClientRpcServer().isLogSlowRPC());
+
+    // revert to defaults.
+    nameNode.reconfigurePropertyImpl(IPC_SERVER_LOG_SLOW_RPC, null);
+    assertFalse(nnrs.getClientRpcServer().isLogSlowRPC());
+
+    // try invalid logSlowRPCThresholdTime.
+    try {
+      nameNode.reconfigureProperty(IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY,
+          "non-numeric");
+      fail("Should not reach here");
+    } catch (ReconfigurationException e) {
+      assertEquals("Could not change property " +
+          "ipc.server.log.slow.rpc.threshold.ms from '0' to 'non-numeric'", e.getMessage());
+    }
+
+    // try correct logSlowRPCThresholdTime.
+    nameNode.reconfigureProperty(IPC_SERVER_LOG_SLOW_RPC_THRESHOLD_MS_KEY,
+        "20000");
+    assertEquals(nnrs.getClientRpcServer().getLogSlowRPCThresholdTime(), 20000);
+  }
+
   @After
   public void shutDown() throws IOException {
     if (cluster != null) {

+ 1 - 1
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java

@@ -442,7 +442,7 @@ public class TestDFSAdmin {
     final List<String> outs = Lists.newArrayList();
     final List<String> errs = Lists.newArrayList();
     getReconfigurableProperties("namenode", address, outs, errs);
-    assertEquals(23, outs.size());
+    assertEquals(25, outs.size());
     assertTrue(outs.get(0).contains("Reconfigurable properties:"));
     assertEquals(DFS_BLOCK_INVALIDATE_LIMIT_KEY, outs.get(1));
     assertEquals(DFS_BLOCK_PLACEMENT_EC_CLASSNAME_KEY, outs.get(2));