Bläddra i källkod

MAPREDUCE-2529. Add support for regex-based shuffle metric counting
exceptions. (Thomas Graves via cdouglas)


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-204@1136704 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 14 år sedan
förälder
incheckning
e75abb2e55

+ 3 - 0
CHANGES.txt

@@ -86,6 +86,9 @@ Release 0.20.204.0 - unreleased
 
   IMPROVEMENTS
 
+    MAPREDUCE-2529. Add support for regex-based shuffle metric counting
+    exceptions. (Thomas Graves via cdouglas)
+
     HADOOP-7398. Suppress warnings about use of HADOOP_HOME. (omalley)
 
     MAPREDUCE-2415. Distribute the user task logs on to multiple disks.

+ 8 - 0
src/mapred/org/apache/hadoop/mapred/ShuffleServerInstrumentation.java

@@ -37,6 +37,8 @@ class ShuffleServerInstrumentation implements MetricsSource {
       registry.newCounter("shuffle_failed_outputs", "", 0);
   final MetricMutableCounterInt successOutputs =
       registry.newCounter("shuffle_success_outputs", "", 0);
+  final MetricMutableCounterInt exceptionsCaught =
+    registry.newCounter("shuffle_exceptions_caught", "", 0);
 
   ShuffleServerInstrumentation(TaskTracker tt) {
     ttWorkerThreads = tt.workerThreads;
@@ -69,6 +71,12 @@ class ShuffleServerInstrumentation implements MetricsSource {
     successOutputs.incr();
   }
 
+  /** Increments the shuffle_exceptions_caught counter. */
+  void exceptionsCaught() {
+    exceptionsCaught.incr();
+  }
+
+
   @Override
   public void getMetrics(MetricsBuilder builder, boolean all) {
     MetricsRecordBuilder rb = builder.addRecord(registry.name());

+ 47 - 0
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -1466,6 +1466,15 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
     server.setAttribute("log", LOG);
     server.setAttribute("localDirAllocator", localDirAllocator);
     server.setAttribute("shuffleServerMetrics", shuffleServerMetrics);
+
+    String exceptionStackRegex =
+      conf.get("mapreduce.reduce.shuffle.catch.exception.stack.regex");
+    String exceptionMsgRegex =
+      conf.get("mapreduce.reduce.shuffle.catch.exception.message.regex");
+
+    server.setAttribute("exceptionStackRegex", exceptionStackRegex);
+    server.setAttribute("exceptionMsgRegex", exceptionMsgRegex);
+
     server.addInternalServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
     server.addServlet("taskLog", "/tasklog", TaskLogServlet.class);
     server.start();
@@ -3673,6 +3682,10 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
         (ShuffleServerInstrumentation) context.getAttribute("shuffleServerMetrics");
       TaskTracker tracker = 
         (TaskTracker) context.getAttribute("task.tracker");
+      String exceptionStackRegex =
+        (String) context.getAttribute("exceptionStackRegex");
+      String exceptionMsgRegex =
+        (String) context.getAttribute("exceptionMsgRegex");
 
       verifyRequest(request, response, tracker, jobId);
 
@@ -3778,12 +3791,14 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
                  " from map: " + mapId + " given " + info.partLength + "/" + 
                  info.rawLength);
         }
+
       } catch (IOException ie) {
         Log log = (Log) context.getAttribute("log");
         String errorMsg = ("getMapOutput(" + mapId + "," + reduceId + 
                            ") failed :\n"+
                            StringUtils.stringifyException(ie));
         log.warn(errorMsg);
+        checkException(ie, exceptionMsgRegex, exceptionStackRegex, shuffleMetrics);
         if (isInputException) {
           tracker.mapOutputLost(TaskAttemptID.forName(mapId), errorMsg);
         }
@@ -3807,6 +3822,38 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
       shuffleMetrics.successOutput();
     }
     
+    protected void checkException(IOException ie, String exceptionMsgRegex,
+        String exceptionStackRegex, ShuffleServerInstrumentation shuffleMetrics) {
+      // Count the exception only if it satisfies every non-null regex: the
+      // message must fully match exceptionMsgRegex and some stack frame must
+      // fully match exceptionStackRegex. A null regex imposes no condition.
+      if (exceptionMsgRegex != null) {
+        String msg = ie.getMessage();
+        if (msg == null || !msg.matches(exceptionMsgRegex)) {
+          return;
+        }
+      }
+      if (exceptionStackRegex != null
+          && !checkStackException(ie, exceptionStackRegex)) {
+        return;
+      }
+      shuffleMetrics.exceptionsCaught();
+    }
+
+    private boolean checkStackException(IOException ie,
+        String exceptionStackRegex) {
+      StackTraceElement[] stack = ie.getStackTrace();
+      // true once any frame's toString() fully matches the regex, else false
+      for (StackTraceElement elem : stack) {
+        String stacktrace = elem.toString();
+        if (stacktrace.matches(exceptionStackRegex)) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+
     /**
      * verify that request has correct HASH for the url
      * and also add a field to reply header with hash of the HASH

+ 177 - 0
src/test/org/apache/hadoop/mapred/TestShuffleExceptionCount.java

@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import static org.apache.hadoop.test.MetricsAsserts.*;
+
+import org.junit.Test;
+
+public class TestShuffleExceptionCount {
+
+  public static class TestMapOutputServlet extends TaskTracker.MapOutputServlet {
+
+    public void checkException(IOException ie, String exceptionMsgRegex,
+        String exceptionStackRegex, ShuffleServerInstrumentation shuffleMetrics) {
+      super.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+          shuffleMetrics);
+    }
+
+  }
+
+  @Test
+  public void testCheckException() throws IOException, InterruptedException {
+    TestMapOutputServlet testServlet = new TestMapOutputServlet();
+    JobConf conf = new JobConf();
+    conf.setUser("testuser");
+    conf.setJobName("testJob");
+    conf.setSessionId("testSession");
+
+    TaskTracker tt = new TaskTracker();
+    tt.setConf(conf);
+    ShuffleServerInstrumentation shuffleMetrics =
+      ShuffleServerInstrumentation.create(tt);
+
+    // only the message regex is set and it does not match: counter stays 0
+    String exceptionMsgRegex = "Broken pipe";
+    String exceptionStackRegex = null;
+    IOException ie = new IOException("EOFException");
+    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+        shuffleMetrics);
+    MetricsRecordBuilder rb = getMetrics(shuffleMetrics);
+    assertCounter("shuffle_exceptions_caught", 0, rb);
+
+    // only the message regex is set and it matches: counter goes to 1
+    ie = new IOException("Broken pipe");
+    exceptionStackRegex = null;
+    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+        shuffleMetrics);
+    rb = getMetrics(shuffleMetrics);
+    assertCounter("shuffle_exceptions_caught", 1, rb);
+
+    // neither regex is set: the exception is counted unconditionally
+    exceptionMsgRegex = null;
+    exceptionStackRegex = null;
+    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+        shuffleMetrics);
+    rb = getMetrics(shuffleMetrics);
+    assertCounter("shuffle_exceptions_caught", 2, rb);
+
+    // only the stack regex is set and no frame matches: counter unchanged
+    exceptionMsgRegex = null;
+    exceptionStackRegex = ".*\\.doesnt\\$SelectSet\\.wakeup.*";
+    ie.setStackTrace(constructStackTrace());
+    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+        shuffleMetrics);
+    rb = getMetrics(shuffleMetrics);
+    assertCounter("shuffle_exceptions_caught", 2, rb);
+
+    // only the stack regex is set and a frame matches: counter goes to 3
+    exceptionMsgRegex = null;
+    exceptionStackRegex = ".*\\.SelectorManager\\$SelectSet\\.wakeup.*";
+    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+        shuffleMetrics);
+    rb = getMetrics(shuffleMetrics);
+    assertCounter("shuffle_exceptions_caught", 3, rb);
+
+    // both regexes are set and both match: counter goes to 4
+    exceptionMsgRegex = "Broken pipe";
+    ie.setStackTrace(constructStackTraceTwo());
+    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+        shuffleMetrics);
+    rb = getMetrics(shuffleMetrics);
+    assertCounter("shuffle_exceptions_caught", 4, rb);
+
+    // both regexes are set but only the message matches: counter unchanged
+    exceptionStackRegex = ".*[1-9]+BOGUSREGEX";
+    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+        shuffleMetrics);
+    rb = getMetrics(shuffleMetrics);
+    assertCounter("shuffle_exceptions_caught", 4, rb);
+
+    // both regexes are set but only the stack matches: counter unchanged
+    exceptionStackRegex = ".*\\.SelectorManager\\$SelectSet\\.wakeup.*";
+    exceptionMsgRegex = "EOFException";
+    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
+        shuffleMetrics);
+    rb = getMetrics(shuffleMetrics);
+    assertCounter("shuffle_exceptions_caught", 4, rb);
+
+  }
+
+  /*
+   * Builds the stack trace of an exception like:
+   * java.io.IOException: Broken pipe at
+   * sun.nio.ch.EPollArrayWrapper.interrupt(Native Method) at
+   * sun.nio.ch.EPollArrayWrapper.interrupt(EPollArrayWrapper.java:256) at
+   * sun.nio.ch.EPollSelectorImpl.wakeup(EPollSelectorImpl.java:175) at
+   * org.mortbay.io.nio.SelectorManager$SelectSet.wakeup(SelectorManager.java:831) at
+   * org.mortbay.io.nio.SelectorManager$SelectSet.doSelect(SelectorManager.java:709) at
+   * org.mortbay.io.nio.SelectorManager.doSelect(SelectorManager.java:192) at
+   * org.mortbay.jetty.nio.SelectChannelConnector.accept(SelectChannelConnector.java:124) at
+   * org.mortbay.jetty.AbstractConnector$Acceptor.run(AbstractConnector.java:708) at
+   * org.mortbay.thread.QueuedThreadPool$PoolThread.run(QueuedThreadPool.java:582)
+   */
+  private StackTraceElement[] constructStackTrace() {
+    StackTraceElement[] stack = new StackTraceElement[9];
+    stack[0] = new StackTraceElement("sun.nio.ch.EPollArrayWrapper", "interrupt", "", -2);
+    stack[1] = new StackTraceElement("sun.nio.ch.EPollArrayWrapper", "interrupt", "EPollArrayWrapper.java", 256);
+    stack[2] = new StackTraceElement("sun.nio.ch.EPollSelectorImpl", "wakeup", "EPollSelectorImpl.java", 175);
+    stack[3] = new StackTraceElement("org.mortbay.io.nio.SelectorManager$SelectSet", "wakeup", "SelectorManager.java", 831);
+    stack[4] = new StackTraceElement("org.mortbay.io.nio.SelectorManager$SelectSet", "doSelect", "SelectorManager.java", 709);
+    stack[5] = new StackTraceElement("org.mortbay.io.nio.SelectorManager", "doSelect", "SelectorManager.java", 192);
+    stack[6] = new StackTraceElement("org.mortbay.jetty.nio.SelectChannelConnector", "accept", "SelectChannelConnector.java", 124);
+    stack[7] = new StackTraceElement("org.mortbay.jetty.AbstractConnector$Acceptor", "run", "AbstractConnector.java", 708);
+    stack[8] = new StackTraceElement("org.mortbay.thread.QueuedThreadPool$PoolThread", "run", "QueuedThreadPool.java", 582);
+
+    return stack;
+  }
+
+  /*
+   * Builds the stack trace of java.io.IOException: Broken pipe at
+   * sun.nio.ch.EPollArrayWrapper.interrupt(Native Method) at
+   * sun.nio.ch.EPollArrayWrapper.interrupt(EPollArrayWrapper.java:256) at
+   * sun.nio.ch.EPollSelectorImpl.wakeup(EPollSelectorImpl.java:175) at
+   * org.mortbay.io.nio.SelectorManager$SelectSet.wakeup(SelectorManager.java:831) at
+   * org.mortbay.io.nio.SelectChannelEndPoint.updateKey(SelectChannelEndPoint.java:335) at
+   * org.mortbay.io.nio.SelectChannelEndPoint.blockWritable(SelectChannelEndPoint.java:278) at
+   * org.mortbay.jetty.AbstractGenerator$Output.blockForOutput(AbstractGenerator.java:545) at
+   * org.mortbay.jetty.AbstractGenerator$Output.flush(AbstractGenerator.java:572) at
+   * org.mortbay.jetty.HttpConnection$Output.flush(HttpConnection.java:1012) at
+   * org.mortbay.jetty.AbstractGenerator$Output.write(AbstractGenerator.java:651) at
+   * org.mortbay.jetty.AbstractGenerator$Output.write(AbstractGenerator.java:580)
+   */
+  private StackTraceElement[] constructStackTraceTwo() {
+    StackTraceElement[] stack = new StackTraceElement[11];
+    stack[0] = new StackTraceElement("sun.nio.ch.EPollArrayWrapper", "interrupt", "", -2);
+    stack[1] = new StackTraceElement("sun.nio.ch.EPollArrayWrapper", "interrupt", "EPollArrayWrapper.java", 256);
+    stack[2] = new StackTraceElement("sun.nio.ch.EPollSelectorImpl", "wakeup", "EPollSelectorImpl.java", 175);
+    stack[3] = new StackTraceElement("org.mortbay.io.nio.SelectorManager$SelectSet", "wakeup", "SelectorManager.java", 831);
+    stack[4] = new StackTraceElement("org.mortbay.io.nio.SelectChannelEndPoint", "updateKey", "SelectChannelEndPoint.java", 335);
+    stack[5] = new StackTraceElement("org.mortbay.io.nio.SelectChannelEndPoint", "blockWritable", "SelectChannelEndPoint.java", 278);
+    stack[6] = new StackTraceElement("org.mortbay.jetty.AbstractGenerator$Output", "blockForOutput", "AbstractGenerator.java", 545);
+    stack[7] = new StackTraceElement("org.mortbay.jetty.AbstractGenerator$Output", "flush", "AbstractGenerator.java", 572);
+    stack[8] = new StackTraceElement("org.mortbay.jetty.HttpConnection$Output", "flush", "HttpConnection.java", 1012);
+    stack[9] = new StackTraceElement("org.mortbay.jetty.AbstractGenerator$Output", "write", "AbstractGenerator.java", 651);
+    stack[10] = new StackTraceElement("org.mortbay.jetty.AbstractGenerator$Output", "write", "AbstractGenerator.java", 580);
+
+    return stack;
+  }
+
+}