Forráskód Böngészése

svn merge -c 1302058 branch-1 to branch-1.0 FIXES MAPREDUCE-3851. Allow more aggressive action on detection of the jetty issue (tgraves via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1.0@1302059 13f79535-47bb-0310-9956-ffa450edef68
Robert Joseph Evans 13 éve
szülő
commit
7a3afb8f0c

+ 3 - 0
CHANGES.txt

@@ -48,6 +48,9 @@ Release 1.0.2 - unreleased
 
     HDFS-3101. Cannot read empty file using WebHDFS.  (szetszwo)
 
+    MAPREDUCE-3851.  Allow more aggressive action on detection of the jetty
+    issue (tgraves via bobby)
+
 Release 1.0.1 - 2012.02.14
 
   NEW FEATURES

+ 168 - 0
src/mapred/org/apache/hadoop/mapred/ShuffleExceptionTracker.java

@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.BitSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Tracks shuffle exceptions. It contains routines to check an exception
+ * that occurred while fetching the map output to see if it matches what
+ * was configured. It also records, for the last X requests, which ones hit
+ * a matching exception and, if a limit is set, exits the JVM once the
+ * fraction of matching exceptions in that window reaches the limit. The
+ * window starts out all-success and the fraction is always taken over the
+ * full window size.
+ */
+public class ShuffleExceptionTracker {
+  public static final Log LOG = LogFactory
+      .getLog(ShuffleExceptionTracker.class);
+
+  // circular window: a clear bit is success, set bit is exception occurred
+  private final BitSet requests;
+  private final int size;
+  private int index;
+  private final String exceptionStackRegex;
+  private final String exceptionMsgRegex;
+  private final float shuffleExceptionLimit;
+
+  /**
+   * Constructor
+   *
+   * @param numberRequests
+   *          the trailing number of requests to track
+   * @param exceptionStackRegex
+   *          the exception stack regular expression to look for, or null
+   * @param exceptionMsgRegex
+   *          the exception message regular expression to look for, or null
+   * @param shuffleExceptionLimit
+   *          the exception limit (0-1.0) as a fraction of the tracked
+   *          requests. 0 disables both the tracking and the abort check.
+   */
+  ShuffleExceptionTracker(int numberRequests, String exceptionStackRegex,
+      String exceptionMsgRegex, float shuffleExceptionLimit) {
+    this.exceptionStackRegex = exceptionStackRegex;
+    this.exceptionMsgRegex = exceptionMsgRegex;
+    this.shuffleExceptionLimit = shuffleExceptionLimit;
+    this.size = numberRequests;
+    this.index = 0;
+    this.requests = new BitSet(size);
+  }
+
+  /**
+   * Gets the number of requests we are tracking
+   *
+   * @return number of requests
+   */
+  public int getNumRequests() {
+    return this.size;
+  }
+
+  /**
+   * Gets the fraction of the tracked requests that had exceptions occur.
+   *
+   * @return failures divided by the window size, as a float
+   */
+  public synchronized float getPercentExceptions() {
+    return (float) requests.cardinality() / (float) size;
+  }
+
+  /**
+   * Mark the request as success. Does nothing unless the limit is above 0.
+   */
+  public synchronized void success() {
+    if (shuffleExceptionLimit > 0) {
+      requests.clear(index);
+      incrIndex();
+    }
+  }
+
+  /**
+   * Mark the request as an exception occurred. No-op unless the limit is > 0.
+   */
+  public synchronized void exception() {
+    if (shuffleExceptionLimit > 0) {
+      requests.set(index);
+      incrIndex();
+    }
+  }
+
+  /**
+   * Parse the exception to see if it matches the regular expression you
+   * configured. If both msgRegex and StackRegex are set then make sure both
+   * match, otherwise only the one that is set has to match; with neither set
+   * every exception matches. A match is recorded as an exception, anything
+   * else as a success. Abort if the limit is hit.
+   * @param ie - the shuffle exception that occurred
+   * @return true if the exception matches, false otherwise
+   */
+  public boolean checkException(IOException ie) {
+    if (exceptionMsgRegex != null) {
+      String msg = ie.getMessage();
+      if (msg == null || !msg.matches(exceptionMsgRegex)) {
+        // for exception tracking purposes, an exception that isn't the
+        // one we are looking for counts as a successful request
+        this.success();
+        return false;
+      }
+    }
+    if (exceptionStackRegex != null && !checkStackException(ie)) {
+      this.success();
+      return false;
+    }
+    this.exception();
+    if (shuffleExceptionLimit > 0
+        && this.getPercentExceptions() >= shuffleExceptionLimit) {
+      LOG.fatal("************************************************************\n"
+          + "Shuffle exception count is greater than the fatal limit: "
+          + shuffleExceptionLimit
+          + ". Aborting JVM.\n"
+          + "************************************************************");
+      doAbort();
+    }
+
+    return true;
+  }
+
+  /** Returns true if any frame of the stack trace matches the stack regex. */
+  private boolean checkStackException(IOException ie) {
+    for (StackTraceElement elem : ie.getStackTrace()) {
+      if (elem.toString().matches(exceptionStackRegex)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Exits the JVM with status 1. Kept as a separate protected method so
+   * that subclasses (e.g. tests) can override the abort action.
+   */
+  protected void doAbort() {
+    System.exit(1);
+  }
+
+  // advance the write position, wrapping around at the end of the window
+  private void incrIndex() {
+    index = (index == (size - 1)) ? 0 : index + 1;
+  }
+
+}

+ 35 - 39
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -411,6 +411,8 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
 
   private ShuffleServerInstrumentation shuffleServerMetrics;
 
+  private ShuffleExceptionTracker shuffleExceptionTracking;
+
   private TaskTrackerInstrumentation myInstrumentation = null;
 
   public TaskTrackerInstrumentation getTaskTrackerInstrumentation() {
@@ -1467,9 +1469,33 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
       conf.get("mapreduce.reduce.shuffle.catch.exception.stack.regex");
     String exceptionMsgRegex =
       conf.get("mapreduce.reduce.shuffle.catch.exception.message.regex");
-
-    server.setAttribute("exceptionStackRegex", exceptionStackRegex);
-    server.setAttribute("exceptionMsgRegex", exceptionMsgRegex);
+    // Percent of shuffle exceptions (out of sample size) seen before it's
+    // fatal - acceptable values are from 0 to 1.0, 0 disables the check.
+    // ie. 0.3 = 30% of the last X number of requests matched the exception,
+    // so abort.
+    float shuffleExceptionLimit =
+      conf.getFloat(
+          "mapreduce.reduce.shuffle.catch.exception.percent.limit.fatal", 0);
+    if ((shuffleExceptionLimit > 1) || (shuffleExceptionLimit < 0)) {
+      throw new IllegalArgumentException(
+          "mapreduce.reduce.shuffle.catch.exception.percent.limit.fatal "
+              + " must be between 0 and 1.0");
+    }
+
+    // The number of trailing requests we track, used for the fatal
+    // limit calculation
+    int shuffleExceptionSampleSize =
+      conf.getInt("mapreduce.reduce.shuffle.catch.exception.sample.size", 1000);
+    if (shuffleExceptionSampleSize <= 0) {
+      throw new IllegalArgumentException(
+          "mapreduce.reduce.shuffle.catch.exception.sample.size "
+              + " must be greater than 0");
+    }
+    shuffleExceptionTracking =
+      new ShuffleExceptionTracker(shuffleExceptionSampleSize, exceptionStackRegex,
+          exceptionMsgRegex, shuffleExceptionLimit );
+
+    server.setAttribute("shuffleExceptionTracking", shuffleExceptionTracking);
 
     server.addInternalServlet("mapOutput", "/mapOutput", MapOutputServlet.class);
     server.addServlet("taskLog", "/tasklog", TaskLogServlet.class);
@@ -3796,10 +3822,8 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
         (ShuffleServerInstrumentation) context.getAttribute("shuffleServerMetrics");
       TaskTracker tracker = 
         (TaskTracker) context.getAttribute("task.tracker");
-      String exceptionStackRegex =
-        (String) context.getAttribute("exceptionStackRegex");
-      String exceptionMsgRegex =
-        (String) context.getAttribute("exceptionMsgRegex");
+      ShuffleExceptionTracker shuffleExceptionTracking =
+        (ShuffleExceptionTracker) context.getAttribute("shuffleExceptionTracking");
 
       verifyRequest(request, response, tracker, jobId);
 
@@ -3912,7 +3936,9 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
                            ") failed :\n"+
                            StringUtils.stringifyException(ie));
         log.warn(errorMsg);
-        checkException(ie, exceptionMsgRegex, exceptionStackRegex, shuffleMetrics);
+        if (shuffleExceptionTracking.checkException(ie)) {
+          shuffleMetrics.exceptionsCaught();
+        }
         if (isInputException) {
           tracker.mapOutputLost(TaskAttemptID.forName(mapId), errorMsg);
         }
@@ -3933,40 +3959,10 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
         }
       }
       outStream.close();
+      shuffleExceptionTracking.success();
       shuffleMetrics.successOutput();
     }
     
-    protected void checkException(IOException ie, String exceptionMsgRegex,
-        String exceptionStackRegex, ShuffleServerInstrumentation shuffleMetrics) {
-      // parse exception to see if it looks like a regular expression you
-      // configure. If both msgRegex and StackRegex set then make sure both
-      // match, otherwise only the one set has to match.
-      if (exceptionMsgRegex != null) {
-        String msg = ie.getMessage();
-        if (msg == null || !msg.matches(exceptionMsgRegex)) {
-          return;
-        }
-      }
-      if (exceptionStackRegex != null
-          && !checkStackException(ie, exceptionStackRegex)) {
-        return;
-      }
-      shuffleMetrics.exceptionsCaught();
-    }
-
-    private boolean checkStackException(IOException ie,
-        String exceptionStackRegex) {
-      StackTraceElement[] stack = ie.getStackTrace();
-
-      for (StackTraceElement elem : stack) {
-        String stacktrace = elem.toString();
-        if (stacktrace.matches(exceptionStackRegex)) {
-          return true;
-        }
-      }
-      return false;
-    }
-
 
     /**
      * verify that request has correct HASH for the url

+ 5 - 0
src/test/findbugsExcludeFile.xml

@@ -132,4 +132,9 @@
        <Method name="run" />
        <Bug pattern="DM_EXIT" />
     </Match>
+     <Match>
+       <Class name="org.apache.hadoop.mapred.ShuffleExceptionTracker" />
+       <Method name="doAbort" />
+       <Bug pattern="DM_EXIT" />
+    </Match>
 </FindBugsFilter>

+ 156 - 49
src/test/org/apache/hadoop/mapred/TestShuffleExceptionCount.java

@@ -17,104 +17,197 @@
  */
 package org.apache.hadoop.mapred;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
-import org.apache.hadoop.metrics2.MetricsRecordBuilder;
-import static org.apache.hadoop.test.MetricsAsserts.*;
 
 import org.junit.Test;
 
 public class TestShuffleExceptionCount {
 
-  public static class TestMapOutputServlet extends TaskTracker.MapOutputServlet {
+  static boolean abortCalled = false;
+  private final float epsilon = 1e-5f;
+
+  public static class TestShuffleExceptionTracker extends ShuffleExceptionTracker {
+    private static final long serialVersionUID = 1L;
 
-    public void checkException(IOException ie, String exceptionMsgRegex,
-        String exceptionStackRegex, ShuffleServerInstrumentation shuffleMetrics) {
-      super.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
-          shuffleMetrics);
+    TestShuffleExceptionTracker(int size, String exceptionStackRegex,
+        String exceptionMsgRegex, float shuffleExceptionLimit) {
+      super(size, exceptionStackRegex,
+          exceptionMsgRegex, shuffleExceptionLimit);
     }
 
+    protected void doAbort() {
+      abortCalled = true;
+  }
   }
 
   @Test
   public void testCheckException() throws IOException, InterruptedException {
-    TestMapOutputServlet testServlet = new TestMapOutputServlet();
-    JobConf conf = new JobConf();
-    conf.setUser("testuser");
-    conf.setJobName("testJob");
-    conf.setSessionId("testSession");
-
-    TaskTracker tt = new TaskTracker();
-    tt.setConf(conf);
-    ShuffleServerInstrumentation shuffleMetrics =
-      ShuffleServerInstrumentation.create(tt);
 
     // first test with only MsgRegex set but doesn't match
     String exceptionMsgRegex = "Broken pipe";
     String exceptionStackRegex = null;
+    TestShuffleExceptionTracker shuffleExceptionTracker = new TestShuffleExceptionTracker(
+        10, exceptionStackRegex, exceptionMsgRegex, 0f);
     IOException ie = new IOException("EOFException");
-    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
-        shuffleMetrics);
-    MetricsRecordBuilder rb = getMetrics(shuffleMetrics);
-    assertCounter("shuffle_exceptions_caught", 0, rb);
+    shuffleExceptionTracker.checkException(ie);
+    assertFalse("abort called when set to off", abortCalled);
 
     // test with only MsgRegex set that does match
     ie = new IOException("Broken pipe");
     exceptionStackRegex = null;
-    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
-        shuffleMetrics);
-    rb = getMetrics(shuffleMetrics);
-    assertCounter("shuffle_exceptions_caught", 1, rb);
+    shuffleExceptionTracker = new TestShuffleExceptionTracker(
+        10, exceptionStackRegex, exceptionMsgRegex, 0f);
+    shuffleExceptionTracker.checkException(ie);
+    assertFalse("abort called when set to off", abortCalled);
 
     // test with neither set, make sure incremented
     exceptionMsgRegex = null;
     exceptionStackRegex = null;
-    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
-        shuffleMetrics);
-    rb = getMetrics(shuffleMetrics);
-    assertCounter("shuffle_exceptions_caught", 2, rb);
+    shuffleExceptionTracker = new TestShuffleExceptionTracker(
+        10, exceptionStackRegex, exceptionMsgRegex, 0f);
+    shuffleExceptionTracker.checkException(ie);
+    assertFalse("abort called when set to off", abortCalled);
 
     // test with only StackRegex set doesn't match
     exceptionMsgRegex = null;
     exceptionStackRegex = ".*\\.doesnt\\$SelectSet\\.wakeup.*";
     ie.setStackTrace(constructStackTrace());
-    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
-        shuffleMetrics);
-    rb = getMetrics(shuffleMetrics);
-    assertCounter("shuffle_exceptions_caught", 2, rb);
+    shuffleExceptionTracker = new TestShuffleExceptionTracker(
+        10, exceptionStackRegex, exceptionMsgRegex, 0f);
+    shuffleExceptionTracker.checkException(ie);
+    assertFalse("abort called when set to off", abortCalled);
 
     // test with only StackRegex set does match
     exceptionMsgRegex = null;
     exceptionStackRegex = ".*\\.SelectorManager\\$SelectSet\\.wakeup.*";
-    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
-        shuffleMetrics);
-    rb = getMetrics(shuffleMetrics);
-    assertCounter("shuffle_exceptions_caught", 3, rb);
+    shuffleExceptionTracker = new TestShuffleExceptionTracker(
+        10, exceptionStackRegex, exceptionMsgRegex, 0.3f);
+    shuffleExceptionTracker.checkException(ie);
+    assertFalse("abort called when set to off", abortCalled);
 
     // test with both regex set and matches
     exceptionMsgRegex = "Broken pipe";
     ie.setStackTrace(constructStackTraceTwo());
-    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
-        shuffleMetrics);
-    rb = getMetrics(shuffleMetrics);
-    assertCounter("shuffle_exceptions_caught", 4, rb);
+    shuffleExceptionTracker = new TestShuffleExceptionTracker(
+        10, exceptionStackRegex, exceptionMsgRegex, 0.3f);
+    shuffleExceptionTracker.checkException(ie);
+    assertFalse("abort called when set to off", abortCalled);
 
     // test with both regex set and only msg matches
     exceptionStackRegex = ".*[1-9]+BOGUSREGEX";
-    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
-        shuffleMetrics);
-    rb = getMetrics(shuffleMetrics);
-    assertCounter("shuffle_exceptions_caught", 4, rb);
+    shuffleExceptionTracker = new TestShuffleExceptionTracker(
+        10, exceptionStackRegex, exceptionMsgRegex, 0.3f);
+    shuffleExceptionTracker.checkException(ie);
+    assertFalse("abort called when set to off", abortCalled);
 
     // test with both regex set and only stack matches
     exceptionStackRegex = ".*\\.SelectorManager\\$SelectSet\\.wakeup.*";
     exceptionMsgRegex = "EOFException";
-    testServlet.checkException(ie, exceptionMsgRegex, exceptionStackRegex,
-        shuffleMetrics);
-    rb = getMetrics(shuffleMetrics);
-    assertCounter("shuffle_exceptions_caught", 4, rb);
+    shuffleExceptionTracker = new TestShuffleExceptionTracker(
+        10, exceptionStackRegex, exceptionMsgRegex, 0.3f);
+    shuffleExceptionTracker.checkException(ie);
+    assertFalse("abort called when set to off", abortCalled);
+
+    exceptionMsgRegex = "Broken pipe";
+    ie.setStackTrace(constructStackTraceTwo());
+    shuffleExceptionTracker = new TestShuffleExceptionTracker(
+        10, exceptionStackRegex, exceptionMsgRegex, 0.3f);
+    shuffleExceptionTracker.checkException(ie);
+    assertFalse("abort called when set to off", abortCalled);
+  }
+
+  @Test
+  public void testExceptionCount() {
+    String exceptionMsgRegex = "Broken pipe";
+    String exceptionStackRegex = ".*\\.SelectorManager\\$SelectSet\\.wakeup.*";
+    IOException ie = new IOException("Broken pipe");
+    ie.setStackTrace(constructStackTraceTwo());
+
+    TestShuffleExceptionTracker shuffleExceptionTracker = new TestShuffleExceptionTracker(
+        10, exceptionStackRegex, exceptionMsgRegex, 0.3f);
+    shuffleExceptionTracker.checkException(ie);
+    assertFalse("abort called when set to off", abortCalled);
+    assertEquals("shuffleExceptionCount wrong", (float) 1 / (float) 10,
+        shuffleExceptionTracker.getPercentExceptions(), epsilon);
+
+    ie.setStackTrace(constructStackTraceThree());
+    shuffleExceptionTracker.checkException(ie);
+    assertFalse("abort called when set to off", abortCalled);
+    assertEquals("shuffleExceptionCount wrong", (float) 1 / (float) 10,
+        shuffleExceptionTracker.getPercentExceptions(), epsilon);
+
+    shuffleExceptionTracker.checkException(ie);
+    assertFalse("abort called when set to off", abortCalled);
+    assertEquals("shuffleExceptionCount wrong", (float) 1 / (float) 10,
+        shuffleExceptionTracker.getPercentExceptions(), epsilon);
+
+    ie.setStackTrace(constructStackTrace());
+    shuffleExceptionTracker.checkException(ie);
+    assertFalse("abort called when set to off", abortCalled);
+    assertEquals("shuffleExceptionCount wrong", (float) 2 / (float) 10,
+        shuffleExceptionTracker.getPercentExceptions(), epsilon);
+
+    shuffleExceptionTracker.checkException(ie);
+    assertTrue("abort not called", abortCalled);
+    assertEquals("shuffleExceptionCount wrong", (float) 3 / (float) 10,
+        shuffleExceptionTracker.getPercentExceptions(), epsilon);
 
   }
 
+  @Test
+  public void testShuffleExceptionTrailing() {
+    String exceptionStackRegex = ".*\\.SelectorManager\\$SelectSet\\.wakeup.*";
+    String exceptionMsgRegex = "Broken pipe";
+    int size = 5;
+    ShuffleExceptionTracker tracker = new ShuffleExceptionTracker(
+        size, exceptionStackRegex, exceptionMsgRegex, 0.3f);
+    assertEquals(size, tracker.getNumRequests());
+    assertEquals(0, tracker.getPercentExceptions(), 0);
+    tracker.success();
+    assertEquals(0, tracker.getPercentExceptions(), 0);
+    tracker.exception();
+    assertEquals((float) 1 / (float) size, tracker.getPercentExceptions(), epsilon);
+    tracker.exception();
+    tracker.exception();
+    assertEquals((float) 3 / (float) size, tracker.getPercentExceptions(), epsilon);
+    tracker.exception();
+    tracker.exception();
+    tracker.exception();
+    tracker.exception();
+    assertEquals((float) 5 / (float) size, tracker.getPercentExceptions(), epsilon);
+    // make sure we push out old ones
+    tracker.success();
+    tracker.success();
+    assertEquals((float) 3 / (float) size, tracker.getPercentExceptions(), epsilon);
+    tracker.exception();
+    tracker.exception();
+    tracker.exception();
+    tracker.exception();
+    tracker.exception();
+    assertEquals((float) 5 / (float) size, tracker.getPercentExceptions(), epsilon);
+  }
+
+  @Test
+  public void testShuffleExceptionTrailingSize() {
+    String exceptionStackRegex = ".*\\.SelectorManager\\$SelectSet\\.wakeup.*";
+    String exceptionMsgRegex = "Broken pipe";
+    int size = 1000;
+    ShuffleExceptionTracker tracker = new ShuffleExceptionTracker(
+        size, exceptionStackRegex, exceptionMsgRegex, 0.3f);
+    assertEquals(size, tracker.getNumRequests());
+    tracker.success();
+    tracker.success();
+    tracker.exception();
+    tracker.exception();
+    assertEquals((float) 2 / (float) size, tracker.getPercentExceptions(),
+        epsilon);
+  }
+
+
   /*
    * Construction exception like:
    * java.io.IOException: Broken pipe at
@@ -174,4 +267,18 @@ public class TestShuffleExceptionCount {
     return stack;
   }
 
+  /*
+   * java.io.IOException: Broken pipe at
+   * sun.nio.ch.EPollArrayWrapper.interrupt(Native Method) at
+   * sun.nio.ch.EPollArrayWrapper.interrupt(EPollArrayWrapper.java:256) at
+   * sun.nio.ch.EPollSelectorImpl.wakeup(EPollSelectorImpl.java:175) at
+   */
+  private StackTraceElement[] constructStackTraceThree() {
+    StackTraceElement[] stack = new StackTraceElement[3];
+    stack[0] = new StackTraceElement("sun.nio.ch.EPollArrayWrapper", "interrupt", "", -2);
+    stack[1] = new StackTraceElement("sun.nio.ch.EPollArrayWrapper", "interrupt", "EPollArrayWrapper.java", 256);
+    stack[2] = new StackTraceElement("sun.nio.ch.EPollSelectorImpl", "wakeup", "EPollSelectorImpl.java", 175);
+
+    return stack;
+}
 }