|
@@ -0,0 +1,241 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+package org.apache.hadoop.mapred;
|
|
|
+
|
|
|
+import java.lang.management.ManagementFactory;
|
|
|
+import java.lang.management.ThreadInfo;
|
|
|
+import java.lang.management.ThreadMXBean;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
+
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.mortbay.jetty.nio.SelectChannelConnector;
|
|
|
+
|
|
|
+/**
|
|
|
+ * Class that monitors for a certain class of Jetty bug known to
|
|
|
+ * affect TaskTrackers. In this type of bug, the Jetty selector
|
|
|
+ * thread starts spinning and using ~100% CPU while no actual
|
|
|
+ * HTTP content is being served. Given that this bug has been
|
|
|
+ * active in Jetty/JDK for a long time with no resolution in sight,
|
|
|
+ * this class provides a temporary workaround.
|
|
|
+ *
|
|
|
+ * Upon detecting the selector thread spinning, it simply exits the
|
|
|
+ * JVM with a Fatal message.
|
|
|
+ */
|
|
|
+class JettyBugMonitor extends Thread {
|
|
|
+ private final static Log LOG = LogFactory.getLog(
|
|
|
+ JettyBugMonitor.class);
|
|
|
+
|
|
|
+ private static final ThreadMXBean threadBean =
|
|
|
+ ManagementFactory.getThreadMXBean();
|
|
|
+
|
|
|
+ private static final String CHECK_ENABLED_KEY =
|
|
|
+ "mapred.tasktracker.jetty.cpu.check.enabled";
|
|
|
+ private static final boolean CHECK_ENABLED_DEFAULT = true;
|
|
|
+
|
|
|
+ static final String CHECK_INTERVAL_KEY =
|
|
|
+ "mapred.tasktracker.jetty.cpu.check.interval";
|
|
|
+ private static final long CHECK_INTERVAL_DEFAULT = 15*1000;
|
|
|
+ private long checkInterval;
|
|
|
+
|
|
|
+ private static final String WARN_THRESHOLD_KEY =
|
|
|
+ "mapred.tasktracker.jetty.cpu.threshold.warn";
|
|
|
+ private static final float WARN_THRESHOLD_DEFAULT = 0.50f;
|
|
|
+ private float warnThreshold;
|
|
|
+
|
|
|
+ private static final String FATAL_THRESHOLD_KEY =
|
|
|
+ "mapred.tasktracker.jetty.cpu.threshold.fatal";
|
|
|
+ private static final float FATAL_THRESHOLD_DEFAULT = 0.90f;
|
|
|
+ private float fatalThreshold;
|
|
|
+
|
|
|
+ private boolean stopping = false;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Create the monitoring thread.
|
|
|
+ * @return null if thread CPU monitoring is not supported
|
|
|
+ */
|
|
|
+ public static JettyBugMonitor create(Configuration conf) {
|
|
|
+ if (!conf.getBoolean(CHECK_ENABLED_KEY, CHECK_ENABLED_DEFAULT)) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!threadBean.isThreadCpuTimeSupported()) {
|
|
|
+ LOG.info("Not starting monitor for Jetty bug since thread CPU time " +
|
|
|
+ "measurement is not supported by this JVM");
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ return new JettyBugMonitor(conf);
|
|
|
+ }
|
|
|
+
|
|
|
+ JettyBugMonitor(Configuration conf) {
|
|
|
+ setName("Monitor for Jetty bugs");
|
|
|
+ setDaemon(true);
|
|
|
+
|
|
|
+ this.warnThreshold = conf.getFloat(
|
|
|
+ WARN_THRESHOLD_KEY, WARN_THRESHOLD_DEFAULT);
|
|
|
+ this.fatalThreshold = conf.getFloat(
|
|
|
+ FATAL_THRESHOLD_KEY, FATAL_THRESHOLD_DEFAULT);
|
|
|
+ this.checkInterval = conf.getLong(
|
|
|
+ CHECK_INTERVAL_KEY, CHECK_INTERVAL_DEFAULT);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ try {
|
|
|
+ doRun();
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ if (!stopping) {
|
|
|
+ LOG.warn("Jetty monitor unexpectedly interrupted", ie);
|
|
|
+ }
|
|
|
+ } catch (Throwable t) {
|
|
|
+ LOG.error("Jetty bug monitor failed", t);
|
|
|
+ }
|
|
|
+ LOG.debug("JettyBugMonitor shutting down");
|
|
|
+ }
|
|
|
+
|
|
|
+ private void doRun() throws InterruptedException {
|
|
|
+ List<Long> tids = waitForJettyThreads();
|
|
|
+ if (tids.isEmpty()) {
|
|
|
+ LOG.warn("Could not locate Jetty selector threads");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ while (true) {
|
|
|
+ try {
|
|
|
+ monitorThreads(tids);
|
|
|
+ } catch (ThreadNotRunningException tnre) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Monitor the given list of threads, summing their CPU usage.
|
|
|
+ * If the usage exceeds the configured threshold, aborts the JVM.
|
|
|
+ * @param tids thread ids to monitor
|
|
|
+ * @throws InterruptedException if interrupted
|
|
|
+ * @throws ThreadNotRunningException if one of the threads is no longer
|
|
|
+ * running
|
|
|
+ */
|
|
|
+ private void monitorThreads(List<Long> tids)
|
|
|
+ throws InterruptedException, ThreadNotRunningException {
|
|
|
+
|
|
|
+ long timeBefore = System.nanoTime();
|
|
|
+ long usageBefore = getCpuUsageNanos(tids);
|
|
|
+ while (true) {
|
|
|
+ Thread.sleep(checkInterval);
|
|
|
+ long usageAfter = getCpuUsageNanos(tids);
|
|
|
+ long timeAfter = System.nanoTime();
|
|
|
+
|
|
|
+ long delta = usageAfter - usageBefore;
|
|
|
+ double percentCpu = (double)delta / (timeAfter - timeBefore);
|
|
|
+
|
|
|
+ String msg = String.format("Jetty CPU usage: %.1f%%", percentCpu * 100);
|
|
|
+ if (percentCpu > fatalThreshold) {
|
|
|
+ LOG.fatal(
|
|
|
+ "************************************************************\n" +
|
|
|
+ msg + ". This is greater than the fatal threshold " +
|
|
|
+ FATAL_THRESHOLD_KEY + ". Aborting JVM.\n" +
|
|
|
+ "************************************************************");
|
|
|
+ doAbort();
|
|
|
+ } else if (percentCpu > warnThreshold) {
|
|
|
+ LOG.warn(msg);
|
|
|
+ } else if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug(msg);
|
|
|
+ }
|
|
|
+
|
|
|
+ usageBefore = usageAfter;
|
|
|
+ timeBefore = timeAfter;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ protected void doAbort() {
|
|
|
+ Runtime.getRuntime().exit(1);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Wait for jetty selector threads to start.
|
|
|
+ * @return the list of thread IDs
|
|
|
+ * @throws InterruptedException if interrupted
|
|
|
+ */
|
|
|
+ protected List<Long> waitForJettyThreads() throws InterruptedException {
|
|
|
+ List<Long> tids = new ArrayList<Long>();
|
|
|
+ int i = 0;
|
|
|
+ while (tids.isEmpty() & i++ < 30) {
|
|
|
+ Thread.sleep(1000);
|
|
|
+ tids = getJettyThreadIds();
|
|
|
+ }
|
|
|
+ return tids;
|
|
|
+ }
|
|
|
+
|
|
|
+ private static long getCpuUsageNanos(List<Long> tids)
|
|
|
+ throws ThreadNotRunningException {
|
|
|
+ long total = 0;
|
|
|
+ for (long tid : tids) {
|
|
|
+ long time = threadBean.getThreadCpuTime(tid);
|
|
|
+ if (time == -1) {
|
|
|
+ LOG.warn("Unable to monitor CPU usage for thread: " + tid);
|
|
|
+ throw new ThreadNotRunningException();
|
|
|
+ }
|
|
|
+ total += time;
|
|
|
+ }
|
|
|
+ return total;
|
|
|
+ }
|
|
|
+
|
|
|
+ static List<Long> getJettyThreadIds() {
|
|
|
+ List<Long> tids = new ArrayList<Long>();
|
|
|
+ long[] threadIds = threadBean.getAllThreadIds();
|
|
|
+ for (long tid : threadIds) {
|
|
|
+ if (isJettySelectorThread(tid)) {
|
|
|
+ tids.add(tid);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return tids;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @return true if the given thread ID appears to be a Jetty selector thread
|
|
|
+ * based on its stack trace
|
|
|
+ */
|
|
|
+ private static boolean isJettySelectorThread(long tid) {
|
|
|
+ ThreadInfo info = threadBean.getThreadInfo(tid, 20);
|
|
|
+ for (StackTraceElement stack : info.getStackTrace()) {
|
|
|
+ // compare class names instead of classses, since
|
|
|
+ // jetty uses a different classloader
|
|
|
+ if (SelectChannelConnector.class.getName().equals(
|
|
|
+ stack.getClassName())) {
|
|
|
+ LOG.debug("Thread #" + tid + " (" + info.getThreadName() + ") " +
|
|
|
+ "is a Jetty selector thread.");
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ LOG.debug("Thread #" + tid + " (" + info.getThreadName() + ") " +
|
|
|
+ "is not a jetty thread");
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ private static class ThreadNotRunningException extends Exception {
|
|
|
+ private static final long serialVersionUID = 1L;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void shutdown() {
|
|
|
+ this.stopping = true;
|
|
|
+ this.interrupt();
|
|
|
+ }
|
|
|
+}
|