Parcourir la source

HADOOP-16850. Support getting thread info from thread group for JvmMetrics to improve the performance. Contributed by Tao Yang.

Akira Ajisaka il y a 5 ans
Parent
commit
954930e9d9

+ 9 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java

@@ -426,4 +426,13 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
       "dfs.client.ignore.namenode.default.kms.uri";
   public static final boolean
       DFS_CLIENT_IGNORE_NAMENODE_DEFAULT_KMS_URI_DEFAULT = false;
+
+  /**
+   * Whether or not ThreadMXBean is used for getting thread info in JvmMetrics,
+   * ThreadGroup approach is preferred for better performance.
+   */
+  public static final String HADOOP_METRICS_JVM_USE_THREAD_MXBEAN =
+      "hadoop.metrics.jvm.use-thread-mxbean";
+  public static final boolean HADOOP_METRICS_JVM_USE_THREAD_MXBEAN_DEFAULT =
+      false;
 }

+ 53 - 4
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/source/JvmMetrics.java

@@ -31,6 +31,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.log.metrics.EventCounter;
 import org.apache.hadoop.metrics2.MetricsCollector;
 import org.apache.hadoop.metrics2.MetricsInfo;
@@ -84,7 +86,7 @@ public class JvmMetrics implements MetricsSource {
   final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
   final List<GarbageCollectorMXBean> gcBeans =
       ManagementFactory.getGarbageCollectorMXBeans();
-  final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
+  private ThreadMXBean threadMXBean;
   final String processName, sessionId;
   private JvmPauseMonitor pauseMonitor = null;
   final ConcurrentHashMap<String, MetricsInfo[]> gcInfoCache =
@@ -92,9 +94,12 @@ public class JvmMetrics implements MetricsSource {
   private GcTimeMonitor gcTimeMonitor = null;
 
   @VisibleForTesting
-  JvmMetrics(String processName, String sessionId) {
+  JvmMetrics(String processName, String sessionId, boolean useThreadMXBean) {
     this.processName = processName;
     this.sessionId = sessionId;
+    if (useThreadMXBean) {
+      this.threadMXBean = ManagementFactory.getThreadMXBean();
+    }
   }
 
   public void setPauseMonitor(final JvmPauseMonitor pauseMonitor) {
@@ -108,8 +113,15 @@ public class JvmMetrics implements MetricsSource {
 
   public static JvmMetrics create(String processName, String sessionId,
                                   MetricsSystem ms) {
+    // Reloading conf instead of getting from outside since it's redundant in
+    // code level to update all the callers across lots of modules,
+    // this method is called at most once for components (NN/DN/RM/NM/...)
+    // so that the overall cost is not expensive.
+    boolean useThreadMXBean = new Configuration().getBoolean(
+        CommonConfigurationKeys.HADOOP_METRICS_JVM_USE_THREAD_MXBEAN,
+        CommonConfigurationKeys.HADOOP_METRICS_JVM_USE_THREAD_MXBEAN_DEFAULT);
     return ms.register(JvmMetrics.name(), JvmMetrics.description(),
-                       new JvmMetrics(processName, sessionId));
+                       new JvmMetrics(processName, sessionId, useThreadMXBean));
   }
 
   public static void reattach(MetricsSystem ms, JvmMetrics jvmMetrics) {
@@ -137,7 +149,11 @@ public class JvmMetrics implements MetricsSource {
         .tag(SessionId, sessionId);
     getMemoryUsage(rb);
     getGcUsage(rb);
-    getThreadUsage(rb);
+    if (threadMXBean != null) {
+      getThreadUsage(rb);
+    } else {
+      getThreadUsageFromGroup(rb);
+    }
     getEventCounters(rb);
   }
 
@@ -235,6 +251,39 @@ public class JvmMetrics implements MetricsSource {
       .addGauge(ThreadsTerminated, threadsTerminated);
   }
 
+  private void getThreadUsageFromGroup(MetricsRecordBuilder rb) {
+    int threadsNew = 0;
+    int threadsRunnable = 0;
+    int threadsBlocked = 0;
+    int threadsWaiting = 0;
+    int threadsTimedWaiting = 0;
+    int threadsTerminated = 0;
+    ThreadGroup threadGroup = Thread.currentThread().getThreadGroup();
+    Thread[] threads = new Thread[threadGroup.activeCount()];
+    threadGroup.enumerate(threads);
+    for (Thread thread : threads) {
+      if (thread == null) {
+        // race protection
+        continue;
+      }
+      switch (thread.getState()) {
+      case NEW:           threadsNew++;           break;
+      case RUNNABLE:      threadsRunnable++;      break;
+      case BLOCKED:       threadsBlocked++;       break;
+      case WAITING:       threadsWaiting++;       break;
+      case TIMED_WAITING: threadsTimedWaiting++;  break;
+      case TERMINATED:    threadsTerminated++;    break;
+      default:
+      }
+    }
+    rb.addGauge(ThreadsNew, threadsNew)
+        .addGauge(ThreadsRunnable, threadsRunnable)
+        .addGauge(ThreadsBlocked, threadsBlocked)
+        .addGauge(ThreadsWaiting, threadsWaiting)
+        .addGauge(ThreadsTimedWaiting, threadsTimedWaiting)
+        .addGauge(ThreadsTerminated, threadsTerminated);
+  }
+
   private void getEventCounters(MetricsRecordBuilder rb) {
     rb.addCounter(LogFatal, EventCounter.getFatal())
       .addCounter(LogError, EventCounter.getError())

+ 9 - 0
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -3843,4 +3843,13 @@
       Enable Server Name Indication (SNI) host check for HTTPS enabled server.
     </description>
   </property>
+
+  <property>
+    <name>hadoop.metrics.jvm.use-thread-mxbean</name>
+    <value>false</value>
+    <description>
+      Whether or not ThreadMXBean is used for getting thread info in JvmMetrics,
+      ThreadGroup approach is preferred for better performance.
+    </description>
+  </property>
 </configuration>

+ 89 - 2
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/source/TestJvmMetrics.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.metrics2.source;
 
+import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
 import org.apache.hadoop.util.GcTimeMonitor;
 import org.junit.After;
 import org.junit.Assert;
@@ -37,6 +38,7 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.JvmPauseMonitor;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 import static org.apache.hadoop.metrics2.source.JvmMetricsInfo.*;
@@ -65,7 +67,7 @@ public class TestJvmMetrics {
     pauseMonitor = new JvmPauseMonitor();
     pauseMonitor.init(new Configuration());
     pauseMonitor.start();
-    JvmMetrics jvmMetrics = new JvmMetrics("test", "test");
+    JvmMetrics jvmMetrics = new JvmMetrics("test", "test", false);
     jvmMetrics.setPauseMonitor(pauseMonitor);
     MetricsRecordBuilder rb = getMetrics(jvmMetrics);
     MetricsCollector mc = rb.parent();
@@ -91,7 +93,7 @@ public class TestJvmMetrics {
   public void testGcTimeMonitorPresence() {
     gcTimeMonitor = new GcTimeMonitor(60000, 1000, 70, null);
     gcTimeMonitor.start();
-    JvmMetrics jvmMetrics = new JvmMetrics("test", "test");
+    JvmMetrics jvmMetrics = new JvmMetrics("test", "test", false);
     jvmMetrics.setGcTimeMonitor(gcTimeMonitor);
     MetricsRecordBuilder rb = getMetrics(jvmMetrics);
     MetricsCollector mc = rb.parent();
@@ -226,4 +228,89 @@ public class TestJvmMetrics {
     Assert.assertEquals("unexpected process name of the singleton instance",
         process1Name, jvmMetrics2.processName);
   }
+
+  /**
+   * Performance test for JvmMetrics#getMetrics, comparing performance of
+   * getting thread usage from ThreadMXBean with that from ThreadGroup.
+   */
+  @Test
+  public void testGetMetricsPerf() {
+    JvmMetrics jvmMetricsUseMXBean = new JvmMetrics("test", "test", true);
+    JvmMetrics jvmMetrics = new JvmMetrics("test", "test", false);
+    MetricsCollectorImpl collector = new MetricsCollectorImpl();
+    // warm up
+    jvmMetrics.getMetrics(collector, true);
+    jvmMetricsUseMXBean.getMetrics(collector, true);
+    // test cases with different numbers of threads
+    int[] numThreadsCases = {100, 200, 500, 1000, 2000, 3000};
+    List<TestThread> threads = new ArrayList();
+    for (int numThreads : numThreadsCases) {
+      updateThreadsAndWait(threads, numThreads);
+      long startNs = System.nanoTime();
+      jvmMetricsUseMXBean.getMetrics(collector, true);
+      long processingNsFromMXBean = System.nanoTime() - startNs;
+      startNs = System.nanoTime();
+      jvmMetrics.getMetrics(collector, true);
+      long processingNsFromGroup = System.nanoTime() - startNs;
+      System.out.println(
+          "#Threads=" + numThreads + ", ThreadMXBean=" + processingNsFromMXBean
+              + " ns, ThreadGroup=" + processingNsFromGroup + " ns, ratio: " + (
+              processingNsFromMXBean / processingNsFromGroup));
+    }
+    // cleanup
+    updateThreadsAndWait(threads, 0);
+  }
+
+  private static void updateThreadsAndWait(List<TestThread> threads,
+      int expectedNumThreads) {
+    // add/remove threads according to expected number
+    int addNum = expectedNumThreads - threads.size();
+    if (addNum > 0) {
+      for (int i = 0; i < addNum; i++) {
+        TestThread testThread = new TestThread();
+        testThread.start();
+        threads.add(testThread);
+      }
+    } else if (addNum < 0) {
+      for (int i = 0; i < Math.abs(addNum); i++) {
+        threads.get(i).exit = true;
+      }
+    } else {
+      return;
+    }
+    // wait for threads to reach the expected number
+    while (true) {
+      Iterator<TestThread> it = threads.iterator();
+      while (it.hasNext()) {
+        if (it.next().exited) {
+          it.remove();
+        }
+      }
+      if (threads.size() == expectedNumThreads) {
+        break;
+      } else {
+        try {
+          Thread.sleep(500);
+        } catch (InterruptedException e) {
+          //ignore
+        }
+      }
+    }
+  }
+
+  static class TestThread extends Thread {
+    private volatile boolean exit = false;
+    private boolean exited = false;
+    @Override
+    public void run() {
+      while (!exit) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+      exited = true;
+    }
+  }
 }