
MAPREDUCE-6489. Fail fast rogue tasks that write too much to local disk. Contributed by Maysam Yabandeh

Jason Lowe 9 years ago
commit
cb26cd4bee

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -399,6 +399,9 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-6479. Add missing mapred job command options in mapreduce
     document. (nijel via aajisaka)
 
+    MAPREDUCE-6489. Fail fast rogue tasks that write too much to local disk
+    (Maysam Yabandeh via jlowe)
+
   OPTIMIZATIONS
 
     MAPREDUCE-6376. Add avro binary support for jhist files (Ray Chiang via

+ 61 - 9
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/Task.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem.Statistics;
 import org.apache.hadoop.io.BytesWritable;
@@ -64,6 +65,7 @@ import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
 import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
 import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -730,11 +732,49 @@ abstract public class Task implements Writable, Configurable {
       } else {
         return split;
       }
-    }  
-    /** 
-     * The communication thread handles communication with the parent (Task Tracker). 
-     * It sends progress updates if progress has been made or if the task needs to 
-     * let the parent know that it's alive. It also pings the parent to see if it's alive. 
+    }
+
+    /**
+     * exception thrown when the task exceeds some configured limits.
+     */
+    public class TaskLimitException extends IOException {
+      public TaskLimitException(String str) {
+        super(str);
+      }
+    }
+
+    /**
+     * check the counters to see whether the task has exceeded any configured
+     * limits.
+     * @throws TaskLimitException
+     */
+    protected void checkTaskLimits() throws TaskLimitException {
+      // check the limit for writing to local file system
+      long limit = conf.getLong(MRJobConfig.TASK_LOCAL_WRITE_LIMIT_BYTES,
+              MRJobConfig.DEFAULT_TASK_LOCAL_WRITE_LIMIT_BYTES);
+      if (limit >= 0) {
+        Counters.Counter localWritesCounter = null;
+        try {
+          LocalFileSystem localFS = FileSystem.getLocal(conf);
+          localWritesCounter = counters.findCounter(localFS.getScheme(),
+                  FileSystemCounter.BYTES_WRITTEN);
+        } catch (IOException e) {
+          LOG.warn("Could not get LocalFileSystem BYTES_WRITTEN counter");
+        }
+        if (localWritesCounter != null
+                && localWritesCounter.getCounter() > limit) {
+          throw new TaskLimitException("too much write to local file system." +
+                  " current value is " + localWritesCounter.getCounter() +
+                  " the limit is " + limit);
+        }
+      }
+    }
+
+    /**
+     * The communication thread handles communication with the parent (Task
+     * Tracker). It sends progress updates if progress has been made or if
+     * the task needs to let the parent know that it's alive. It also pings
+     * the parent to see if it's alive.
      */
     public void run() {
       final int MAX_RETRIES = 3;
@@ -765,8 +805,9 @@ abstract public class Task implements Writable, Configurable {
           if (sendProgress) {
             // we need to send progress update
             updateCounters();
+            checkTaskLimits();
             taskStatus.statusUpdate(taskProgress.get(),
-                                    taskProgress.toString(), 
+                                    taskProgress.toString(),
                                     counters);
             amFeedback = umbilical.statusUpdate(taskId, taskStatus);
             taskFound = amFeedback.getTaskFound();
@@ -797,10 +838,21 @@ abstract public class Task implements Writable, Configurable {
                 mustPreempt.get() + " given " + amFeedback.getPreemption() +
                 " for "+ taskId + " task status: " +taskStatus.getPhase());
           }
-          sendProgress = resetProgressFlag(); 
+          sendProgress = resetProgressFlag();
           remainingRetries = MAX_RETRIES;
-        } 
-        catch (Throwable t) {
+        } catch (TaskLimitException e) {
+          String errMsg = "Task exceeded the limits: " +
+                  StringUtils.stringifyException(e);
+          LOG.fatal(errMsg);
+          try {
+            umbilical.fatalError(taskId, errMsg);
+          } catch (IOException ioe) {
+            LOG.fatal("Failed to update failure diagnosis", ioe);
+          }
+          LOG.fatal("Killing " + taskId);
+          resetDoneFlag();
+          ExitUtil.terminate(69);
+        } catch (Throwable t) {
           LOG.info("Communication exception: " + StringUtils.stringifyException(t));
           remainingRetries -=1;
           if (remainingRetries == 0) {
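
The check added above compares the task's local-filesystem BYTES_WRITTEN counter against the configured limit and, on violation, reports a fatal error through the umbilical and terminates the JVM via ExitUtil.terminate(69). As a minimal sketch (not part of this patch), a client can read the same counter from a finished job to gauge how much headroom a given limit would leave; the class name and the omitted job setup are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.FileSystemCounter;
import org.apache.hadoop.mapreduce.Job;

public class LocalWriteInspector {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "inspect-local-writes");
    // ... mapper/reducer/input/output setup omitted ...
    job.waitForCompletion(true);

    // Bytes written to the local ("file") filesystem, aggregated over all tasks;
    // the limit introduced by this patch is enforced per task against its own counter.
    Counter localWrites =
        job.getCounters().findCounter("file", FileSystemCounter.BYTES_WRITTEN);
    System.out.println("local-fs bytes written: " + localWrites.getValue());
  }
}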

+ 5 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java

@@ -52,6 +52,11 @@ public interface MRJobConfig {
 
   public static final String TASK_CLEANUP_NEEDED = "mapreduce.job.committer.task.cleanup.needed";
 
+  public static final String TASK_LOCAL_WRITE_LIMIT_BYTES =
+          "mapreduce.task.local-fs.write-limit.bytes";
+  // negative values disable the limit
+  public static final long DEFAULT_TASK_LOCAL_WRITE_LIMIT_BYTES = -1;
+
   public static final String TASK_PROGRESS_REPORT_INTERVAL =
       "mapreduce.task.progress-report.interval";
   /** The number of milliseconds between progress reports. */
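
The new MRJobConfig keys expose the limit to jobs. A minimal sketch (not part of this commit) of enabling it at submission time; the job name and the 10 GiB value are arbitrary examples:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.MRJobConfig;

public class SubmitWithLocalWriteLimit {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Fail a task fast once it writes more than ~10 GiB to the local filesystem;
    // negative values (the default) leave the limit disabled.
    conf.setLong(MRJobConfig.TASK_LOCAL_WRITE_LIMIT_BYTES, 10L * 1024 * 1024 * 1024);
    Job job = Job.getInstance(conf, "job-with-local-write-limit");
    // ... mapper/reducer/input/output setup omitted ...
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}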

+ 12 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml

@@ -1791,4 +1791,16 @@
   default is 128</description>
 </property>
 
+<property>
+  <name>mapreduce.task.local-fs.write-limit.bytes</name>
+  <value>-1</value>
+  <description>Limit on the bytes written to the local file system by each task.
+  This limit only applies to writes that go through the Hadoop filesystem APIs
+  within the task process (i.e.: writes that will update the local filesystem's
+  BYTES_WRITTEN counter). It does not cover other writes such as logging,
+  sideband writes from subprocesses (e.g.: streaming jobs), etc.
+  Negative values disable the limit.
+  default is -1</description>
+</property>
+
 </configuration>
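
As with any job configuration property, the limit can also be set cluster-wide in mapred-site.xml or per job on the command line (for example via -Dmapreduce.task.local-fs.write-limit.bytes=... when the job uses ToolRunner/GenericOptionsParser); the mapred-default.xml entry above only documents the shipped default of -1 (disabled).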

+ 80 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestTaskProgressReporter.java

@@ -19,16 +19,28 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
+import java.util.Random;
 
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.ipc.ProtocolSignature;
 import org.apache.hadoop.mapred.SortedRanges.Range;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.checkpoint.TaskCheckpointID;
+import org.apache.hadoop.util.ExitUtil;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class TestTaskProgressReporter {
   private static int statusUpdateTimes = 0;
+
+  // set to true if the thread exited via ExitUtil.terminate
+  volatile boolean threadExited = false;
+
+  final static int LOCAL_BYTES_WRITTEN = 1024;
+
   private FakeUmbilical fakeUmbilical = new FakeUmbilical();
 
   private static class DummyTask extends Task {
@@ -133,13 +145,22 @@ public class TestTaskProgressReporter {
   }
 
   private class DummyTaskReporter extends Task.TaskReporter {
+    volatile boolean taskLimitIsChecked = false;
+
     public DummyTaskReporter(Task task) {
       task.super(task.getProgress(), fakeUmbilical);
     }
+
     @Override
     public void setProgress(float progress) {
       super.setProgress(progress);
     }
+
+    @Override
+    protected void checkTaskLimits() throws TaskLimitException {
+      taskLimitIsChecked = true;
+      super.checkTaskLimits();
+    }
   }
 
   @Test (timeout=10000)
@@ -157,4 +178,63 @@ public class TestTaskProgressReporter {
     t.join();
     Assert.assertEquals(statusUpdateTimes, 2);
   }
+
+  @Test(timeout=10000)
+  public void testBytesWrittenRespectingLimit() throws Exception {
+    // add 1024 to the limit to account for writes not controlled by the test
+    testBytesWrittenLimit(LOCAL_BYTES_WRITTEN + 1024, false);
+  }
+
+  @Test(timeout=10000)
+  public void testBytesWrittenExceedingLimit() throws Exception {
+    testBytesWrittenLimit(LOCAL_BYTES_WRITTEN - 1, true);
+  }
+
+  /**
+   * This is to test the limit on BYTES_WRITTEN. The test is limited in that
+   * the check is done only once at the first loop of TaskReporter#run.
+   * @param limit the limit on BYTES_WRITTEN in local file system
+   * @param failFast should the task fail fast with such limit?
+   * @throws Exception
+   */
+  public void testBytesWrittenLimit(long limit, boolean failFast)
+          throws Exception {
+    ExitUtil.disableSystemExit();
+    threadExited = false;
+    Thread.UncaughtExceptionHandler h = new Thread.UncaughtExceptionHandler() {
+      public void uncaughtException(Thread th, Throwable ex) {
+        System.out.println("Uncaught exception: " + ex);
+        if (ex instanceof ExitUtil.ExitException) {
+          threadExited = true;
+        }
+      }
+    };
+    JobConf conf = new JobConf();
+    // To disable task reporter sleeping
+    conf.setLong(MRJobConfig.TASK_PROGRESS_REPORT_INTERVAL, 0);
+    conf.setLong(MRJobConfig.TASK_LOCAL_WRITE_LIMIT_BYTES, limit);
+    LocalFileSystem localFS = FileSystem.getLocal(conf);
+    Path tmpPath = new Path("/tmp/testBytesWrittenLimit-tmpFile-"
+            + new Random(System.currentTimeMillis()).nextInt());
+    FSDataOutputStream out = localFS.create(tmpPath, true);
+    out.write(new byte[LOCAL_BYTES_WRITTEN]);
+    out.close();
+
+    Task task = new DummyTask();
+    task.setConf(conf);
+    DummyTaskReporter reporter = new DummyTaskReporter(task);
+    Thread t = new Thread(reporter);
+    t.setUncaughtExceptionHandler(h);
+    reporter.setProgressFlag();
+
+    t.start();
+    while (!reporter.taskLimitIsChecked) {
+      Thread.yield();
+    }
+
+    task.setTaskDone();
+    reporter.resetDoneFlag();
+    t.join();
+    Assert.assertEquals(failFast, threadExited);
+  }
 }