浏览代码

Merge -r 736800:736801 from trunk onto 0.20 branch. Fixes HADOOP-4939.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.20@736802 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 16 年之前
父节点
当前提交
0d38812066

+ 3 - 0
CHANGES.txt

@@ -311,6 +311,9 @@ Release 0.20.0 - Unreleased
     HADOOP-4828. Updates documents to do with configuration (HADOOP-4631).
     (Sharad Agarwal via ddas)
 
+    HADOOP-4939. Adds a test that would inject random failures for tasks in 
+    large jobs and would also inject TaskTracker failures. (ddas)
+
   OPTIMIZATIONS
 
     HADOOP-3293. Fixes FileInputFormat to do provide locations for splits

+ 16 - 0
src/core/org/apache/hadoop/util/Shell.java

@@ -334,7 +334,23 @@ abstract public class Shell {
    * @return the output of the executed command.
    */
   public static String execCommand(String ... cmd) throws IOException {
+    return execCommand(null, cmd);
+  }
+  
+  /** 
+   * Static method to execute a shell command. 
+   * Covers most of the simple cases without requiring the user to implement  
+   * the <code>Shell</code> interface.
+   * @param env the map of environment key=value
+   * @param cmd shell command to execute.
+   * @return the output of the executed command.
+   */
+  public static String execCommand(Map<String,String> env, String ... cmd) 
+  throws IOException {
     ShellCommandExecutor exec = new ShellCommandExecutor(cmd);
+    if (env != null) {
+      exec.setEnvironment(env);
+    }
     exec.execute();
     return exec.getOutput();
   }

+ 21 - 4
src/mapred/org/apache/hadoop/mapred/ClusterStatus.java

@@ -61,6 +61,7 @@ public class ClusterStatus implements Writable {
   private Collection<String> activeTrackers = new ArrayList<String>();
   private Collection<String> blacklistedTrackers = new ArrayList<String>();
   private int numBlacklistedTrackers;
+  private long ttExpiryInterval;
   private int map_tasks;
   private int reduce_tasks;
   private int max_map_tasks;
@@ -85,7 +86,8 @@ public class ClusterStatus implements Writable {
   @Deprecated
   ClusterStatus(int trackers, int maps, int reduces, int maxMaps,
                 int maxReduces, JobTracker.State state) {
-    this(trackers, 0, maps, reduces, maxMaps, maxReduces, state);
+    this(trackers, 0, JobTracker.TASKTRACKER_EXPIRY_INTERVAL, maps, reduces,
+        maxMaps, maxReduces, state);
   }
   
   /**
@@ -93,16 +95,19 @@ public class ClusterStatus implements Writable {
    * 
    * @param trackers no. of tasktrackers in the cluster
    * @param blacklists no of blacklisted task trackers in the cluster
+   * @param ttExpiryInterval the tasktracker expiry interval
    * @param maps no. of currently running map-tasks in the cluster
    * @param reduces no. of currently running reduce-tasks in the cluster
    * @param maxMaps the maximum no. of map tasks in the cluster
    * @param maxReduces the maximum no. of reduce tasks in the cluster
    * @param state the {@link JobTracker.State} of the <code>JobTracker</code>
    */
-  ClusterStatus(int trackers, int blacklists, int maps, int reduces,
+  ClusterStatus(int trackers, int blacklists, long ttExpiryInterval, 
+                int maps, int reduces,
                 int maxMaps, int maxReduces, JobTracker.State state) {
     numActiveTrackers = trackers;
     numBlacklistedTrackers = blacklists;
+    this.ttExpiryInterval = ttExpiryInterval;
     map_tasks = maps;
     reduce_tasks = reduces;
     max_map_tasks = maxMaps;
@@ -117,6 +122,7 @@ public class ClusterStatus implements Writable {
    * 
    * @param activeTrackers active tasktrackers in the cluster
    * @param blacklistedTrackers blacklisted tasktrackers in the cluster
+   * @param ttExpiryInterval the tasktracker expiry interval
    * @param maps no. of currently running map-tasks in the cluster
    * @param reduces no. of currently running reduce-tasks in the cluster
    * @param maxMaps the maximum no. of map tasks in the cluster
@@ -125,10 +131,11 @@ public class ClusterStatus implements Writable {
    */
   ClusterStatus(Collection<String> activeTrackers, 
       Collection<String> blacklistedTrackers,
+      long ttExpiryInterval,
       int maps, int reduces, int maxMaps, int maxReduces, 
       JobTracker.State state) {
-    this(activeTrackers.size(), blacklistedTrackers.size(), maps, reduces,
-        maxMaps, maxReduces, state);
+    this(activeTrackers.size(), blacklistedTrackers.size(), ttExpiryInterval, 
+        maps, reduces, maxMaps, maxReduces, state);
     this.activeTrackers = activeTrackers;
     this.blacklistedTrackers = blacklistedTrackers;
   }
@@ -170,6 +177,14 @@ public class ClusterStatus implements Writable {
     return numBlacklistedTrackers;
   }
   
+  /**
+   * Get the tasktracker expiry interval for the cluster
+   * @return the expiry interval in msec
+   */
+  public long getTTExpiryInterval() {
+    return ttExpiryInterval;
+  }
+  
   /**
    * Get the number of currently running map tasks in the cluster.
    * 
@@ -255,6 +270,7 @@ public class ClusterStatus implements Writable {
         Text.writeString(out, tracker);
       }
     }
+    out.writeLong(ttExpiryInterval);
     out.writeInt(map_tasks);
     out.writeInt(reduce_tasks);
     out.writeInt(max_map_tasks);
@@ -281,6 +297,7 @@ public class ClusterStatus implements Writable {
         blacklistedTrackers.add(name);
       }
     }
+    ttExpiryInterval = in.readLong();
     map_tasks = in.readInt();
     reduce_tasks = in.readInt();
     max_map_tasks = in.readInt();

+ 14 - 1
src/mapred/org/apache/hadoop/mapred/JobClient.java

@@ -1193,7 +1193,20 @@ public class JobClient extends Configured implements MRConstants, Tool  {
    * @throws IOException
    */
   public ClusterStatus getClusterStatus() throws IOException {
-    return jobSubmitClient.getClusterStatus(false);
+    return getClusterStatus(false);
+  }
+
+  /**
+   * Get status information about the Map-Reduce cluster.
+   *  
+   * @param  detailed if true then get a detailed status including the
+   *         tracker names
+   * @return the status information about the Map-Reduce cluster as an object
+   *         of {@link ClusterStatus}.
+   * @throws IOException
+   */
+  public ClusterStatus getClusterStatus(boolean detailed) throws IOException {
+    return jobSubmitClient.getClusterStatus(detailed);
   }
     
 

+ 4 - 2
src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java

@@ -55,9 +55,11 @@ interface JobSubmissionProtocol extends VersionedProtocol {
    *             for HADOOP-4305
    * Version 19: Modified TaskReport to have TIP status and modified the
    *             method getClusterStatus() to take a boolean argument
-   *             for HADOOP-4807                     
+   *             for HADOOP-4807
+   * Version 20: Modified ClusterStatus to have the tasktracker expiry
+   *             interval for HADOOP-4939                     
    */
-  public static final long versionID = 19L;
+  public static final long versionID = 20L;
 
   /**
    * Allocate a name for the job.

+ 2 - 0
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -2711,6 +2711,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         List<List<String>> trackerNames = taskTrackerNames();
         return new ClusterStatus(trackerNames.get(0),
             trackerNames.get(1),
+            TASKTRACKER_EXPIRY_INTERVAL,
             totalMaps,
             totalReduces,
             totalMapTaskCapacity,
@@ -2720,6 +2721,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         return new ClusterStatus(taskTrackers.size() - 
             getBlacklistedTrackerCount(),
             getBlacklistedTrackerCount(),
+            TASKTRACKER_EXPIRY_INTERVAL,
             totalMaps,
             totalReduces,
             totalMapTaskCapacity,

+ 1 - 1
src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -386,7 +386,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
   }
   
   public ClusterStatus getClusterStatus(boolean detailed) {
-    return new ClusterStatus(1, 0, map_tasks, reduce_tasks, 1, 1, 
+    return new ClusterStatus(1, 0, 0, map_tasks, reduce_tasks, 1, 1, 
                              JobTracker.State.RUNNING);
   }
 

+ 495 - 0
src/test/org/apache/hadoop/mapred/ReliabilityTest.java

@@ -0,0 +1,495 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * This class tests reliability of the framework in the face of failures of 
+ * both tasks and tasktrackers. Steps:
+ * 1) Get the cluster status
+ * 2) Get the number of slots in the cluster
+ * 3) Spawn a sleepjob that occupies the entire cluster (with two waves of maps)
+ * 4) Get the list of running attempts for the job
+ * 5) Fail a few of them
+ * 6) Now fail a few trackers (ssh)
+ * 7) Job should run to completion
+ * 8) The above is repeated for the Sort suite of job (randomwriter, sort,
+ *    validator). All jobs must complete, and finally, the sort validation
+ *    should succeed.
+ * To run the test:
+ * ./bin/hadoop --config <config> jar
+ *   build/hadoop-<version>-test.jar MRReliabilityTest -libjars
+ *   build/hadoop-<version>-examples.jar [-scratchdir <dir>]"
+ *   
+ *   The scratchdir is optional and by default the current directory on the client
+ *   will be used as the scratch space. Note that password-less SSH must be set up 
+ *   between the client machine from where the test is submitted, and the cluster 
+ *   nodes where the test runs.
+ */
+
+public class ReliabilityTest extends Configured implements Tool {
+
+  private String dir;
+  private static final Log LOG = LogFactory.getLog(ReliabilityTest.class); 
+
+  private void displayUsage() {
+    LOG.info("This must be run in only the distributed mode " +
+    		"(LocalJobRunner not supported).\n\tUsage: MRReliabilityTest " +
+    		"-libjars <path to hadoop-examples.jar> [-scratchdir <dir>]" +
+    		"\n[-scratchdir] points to a scratch space on this host where temp" +
+    		" files for this test will be created. Defaults to current working" +
+    		" dir. \nPasswordless SSH must be set up between this host and the" +
+    		" nodes which the test is going to use");
+    System.exit(-1);
+  }
+  
+  public int run(String[] args) throws Exception {
+    Configuration conf = getConf();
+    if ("local".equals(conf.get("mapred.job.tracker", "local"))) {
+      displayUsage();
+    }
+    String[] otherArgs = 
+      new GenericOptionsParser(conf, args).getRemainingArgs();
+    if (otherArgs.length == 2) {
+      if (otherArgs[0].equals("-scratchdir")) {
+        dir = otherArgs[1];
+      } else {
+        displayUsage();
+      }
+    }
+    else if (otherArgs.length == 0) {
+      dir = System.getProperty("user.dir");
+    } else {
+      displayUsage();
+    }
+    
+    //to protect against the case of jobs failing even when multiple attempts
+    //fail, set some high values for the max attempts
+    conf.setInt("mapred.map.max.attempts", 10);
+    conf.setInt("mapred.reduce.max.attempts", 10);
+    runSleepJobTest(new JobClient(new JobConf(conf)), conf);
+    runSortJobTests(new JobClient(new JobConf(conf)), conf);
+    return 0;
+  }
+  
+  private void runSleepJobTest(final JobClient jc, final Configuration conf) 
+  throws Exception {
+    ClusterStatus c = jc.getClusterStatus();
+    int maxMaps = c.getMaxMapTasks() * 2;
+    int maxReduces = maxMaps;
+    int mapSleepTime = (int)c.getTTExpiryInterval();
+    int reduceSleepTime = mapSleepTime;
+    String[] sleepJobArgs = new String[] {     
+        "-m", Integer.toString(maxMaps), 
+        "-r", Integer.toString(maxReduces),
+        "-mt", Integer.toString(mapSleepTime),
+        "-rt", Integer.toString(reduceSleepTime)};
+    runTest(jc, conf, "org.apache.hadoop.examples.SleepJob", sleepJobArgs, 
+        new KillTaskThread(jc, 2, 0.2f, false, 2),
+        new KillTrackerThread(jc, 2, 0.4f, false, 1));
+    LOG.info("SleepJob done");
+  }
+  
+  private void runSortJobTests(final JobClient jc, final Configuration conf) 
+  throws Exception {
+    String inputPath = "my_reliability_test_input";
+    String outputPath = "my_reliability_test_output";
+    FileSystem fs = jc.getFs();
+    fs.delete(new Path(inputPath), true);
+    fs.delete(new Path(outputPath), true);
+    runRandomWriterTest(jc, conf, inputPath);
+    runSortTest(jc, conf, inputPath, outputPath);
+    runSortValidatorTest(jc, conf, inputPath, outputPath);
+  }
+  
+  private void runRandomWriterTest(final JobClient jc, 
+      final Configuration conf, final String inputPath) 
+  throws Exception {
+    runTest(jc, conf, "org.apache.hadoop.examples.RandomWriter", 
+        new String[]{inputPath}, 
+        null, new KillTrackerThread(jc, 0, 0.4f, false, 1));
+    LOG.info("RandomWriter job done");
+  }
+  
+  private void runSortTest(final JobClient jc, final Configuration conf,
+      final String inputPath, final String outputPath) 
+  throws Exception {
+    runTest(jc, conf, "org.apache.hadoop.examples.Sort", 
+        new String[]{inputPath, outputPath},
+        new KillTaskThread(jc, 2, 0.2f, false, 2),
+        new KillTrackerThread(jc, 2, 0.8f, false, 1));
+    LOG.info("Sort job done");
+  }
+  
+  private void runSortValidatorTest(final JobClient jc, 
+      final Configuration conf, final String inputPath, final String outputPath)
+  throws Exception {
+    runTest(jc, conf, "org.apache.hadoop.mapred.SortValidator", new String[] {
+        "-sortInput", inputPath, "-sortOutput", outputPath},
+        new KillTaskThread(jc, 2, 0.2f, false, 1),
+        new KillTrackerThread(jc, 2, 0.8f, false, 1));  
+    LOG.info("SortValidator job done");    
+  }
+  
+  private String normalizeCommandPath(String command) {
+    final String hadoopHome;
+    if ((hadoopHome = System.getenv("HADOOP_HOME")) != null) {
+      command = hadoopHome + "/" + command;
+    }
+    return command;
+  }
+  
+  private void checkJobExitStatus(int status, String jobName) {
+    if (status != 0) {
+      LOG.info(jobName + " job failed with status: " + status);
+      System.exit(status);
+    } else {
+      LOG.info(jobName + " done.");
+    }
+  }
+
+  //Starts the job in a thread. It also starts the taskKill/tasktrackerKill
+  //threads.
+  private void runTest(final JobClient jc, final Configuration conf,
+      final String jobClass, final String[] args, KillTaskThread killTaskThread,
+      KillTrackerThread killTrackerThread) throws Exception {
+    int prevJobsNum = jc.getAllJobs().length;
+    Thread t = new Thread("Job Test") {
+      public void run() {
+        try {
+          Class<?> jobClassObj = conf.getClassByName(jobClass);
+          int status = ToolRunner.run(conf, (Tool)(jobClassObj.newInstance()), 
+              args);
+          checkJobExitStatus(status, jobClass);
+        } catch (Exception e) {
+          LOG.fatal("JOB " + jobClass + " failed to run");
+          System.exit(-1);
+        }
+      }
+    };
+    t.setDaemon(true);
+    t.start();
+    JobStatus[] jobs;
+    //get the job ID. This is the job that we just submitted
+    while ((jobs = jc.getAllJobs()).length - prevJobsNum == 0) {
+      LOG.info("Waiting for the job " + jobClass +" to start");
+      Thread.sleep(1000);
+    }
+    JobID jobId = jobs[jobs.length - 1].getJobID();
+    RunningJob rJob = jc.getJob(jobId);
+    while (rJob.getJobState() == JobStatus.PREP) {
+      LOG.info("JobID : " + jobId + " not started RUNNING yet");
+      Thread.sleep(1000);
+      rJob = jc.getJob(jobId);
+    }
+    if (killTaskThread != null) {
+      killTaskThread.setRunningJob(rJob);
+      killTaskThread.start();
+      killTaskThread.join();
+      LOG.info("DONE WITH THE TASK KILL/FAIL TESTS");
+    }
+    if (killTrackerThread != null) {
+      killTrackerThread.setRunningJob(rJob);
+      killTrackerThread.start();
+      killTrackerThread.join();
+      LOG.info("DONE WITH THE TESTS TO DO WITH LOST TASKTRACKERS");
+    }
+    t.join();
+  }
+  
+  private class KillTrackerThread extends Thread {
+    private volatile boolean killed = false;
+    private JobClient jc;
+    private RunningJob rJob;
+    final private int thresholdMultiplier;
+    private float threshold = 0.2f;
+    private boolean onlyMapsProgress;
+    private int numIterations;
+    final private String slavesFile = dir + "/_reliability_test_slaves_file_";
+    final String shellCommand = normalizeCommandPath("bin/slaves.sh");
+    final private String STOP_COMMAND = "ps uwwx | grep java | grep " + 
+    "org.apache.hadoop.mapred.TaskTracker"+ " |" + 
+    " grep -v grep | tr -s ' ' | cut -d ' ' -f2 | xargs kill -s STOP";
+    final private String RESUME_COMMAND = "ps uwwx | grep java | grep " + 
+    "org.apache.hadoop.mapred.TaskTracker"+ " |" + 
+    " grep -v grep | tr -s ' ' | cut -d ' ' -f2 | xargs kill -s CONT";
+    //Only one instance must be active at any point
+    public KillTrackerThread(JobClient jc, int threshaldMultiplier,
+        float threshold, boolean onlyMapsProgress, int numIterations) {
+      this.jc = jc;
+      this.thresholdMultiplier = threshaldMultiplier;
+      this.threshold = threshold;
+      this.onlyMapsProgress = onlyMapsProgress;
+      this.numIterations = numIterations;
+      setDaemon(true);
+    }
+    public void setRunningJob(RunningJob rJob) {
+      this.rJob = rJob;
+    }
+    public void kill() {
+      killed = true;
+    }
+    public void run() {
+      stopStartTrackers(true);
+      if (!onlyMapsProgress) {
+        stopStartTrackers(false);
+      }
+    }
+    private void stopStartTrackers(boolean considerMaps) {
+      if (considerMaps) {
+        LOG.info("Will STOP/RESUME tasktrackers based on Maps'" +
+                " progress");
+      } else {
+        LOG.info("Will STOP/RESUME tasktrackers based on " +
+                "Reduces' progress");
+      }
+      LOG.info("Initial progress threshold: " + threshold + 
+          ". Threshold Multiplier: " + thresholdMultiplier + 
+          ". Number of iterations: " + numIterations);
+      float thresholdVal = threshold;
+      int numIterationsDone = 0;
+      while (!killed) {
+        try {
+          float progress;
+          if (jc.getJob(rJob.getID()).isComplete() ||
+              numIterationsDone == numIterations) {
+            break;
+          }
+
+          if (considerMaps) {
+            progress = jc.getJob(rJob.getID()).mapProgress();
+          } else {
+            progress = jc.getJob(rJob.getID()).reduceProgress();
+          }
+          if (progress >= thresholdVal) {
+            numIterationsDone++;
+            ClusterStatus c;
+            stopTaskTrackers((c = jc.getClusterStatus(true)));
+            Thread.sleep((int)Math.ceil(1.5 * c.getTTExpiryInterval()));
+            startTaskTrackers();
+            thresholdVal = thresholdVal * thresholdMultiplier;
+          }
+          Thread.sleep(5000);
+        } catch (InterruptedException ie) {
+          killed = true;
+          return;
+        } catch (Exception e) {
+          LOG.fatal(StringUtils.stringifyException(e));
+        }
+      }
+    }
+    private void stopTaskTrackers(ClusterStatus c) throws Exception {
+
+      Collection <String> trackerNames = c.getActiveTrackerNames();
+      ArrayList<String> trackerNamesList = new ArrayList<String>(trackerNames);
+      Collections.shuffle(trackerNamesList);
+
+      int count = 0;
+
+      FileOutputStream fos = new FileOutputStream(new File(slavesFile));
+      LOG.info(new Date() + " Stopping a few trackers");
+
+      for (String tracker : trackerNamesList) {
+        String host = convertTrackerNameToHostName(tracker);
+        LOG.info(new Date() + " Marking tracker on host: " + host);
+        fos.write((host + "\n").getBytes());
+        if (count++ >= trackerNamesList.size()/2) {
+          break;
+        }
+      }
+      fos.close();
+
+      runOperationOnTT("suspend");
+    }
+
+    private void startTaskTrackers() throws Exception {
+      LOG.info(new Date() + " Resuming the stopped trackers");
+      runOperationOnTT("resume");
+      new File(slavesFile).delete();
+    }
+    
+    private void runOperationOnTT(String operation) throws IOException {
+      Map<String,String> hMap = new HashMap<String,String>();
+      hMap.put("HADOOP_SLAVES", slavesFile);
+      StringTokenizer strToken;
+      if (operation.equals("suspend")) {
+        strToken = new StringTokenizer(STOP_COMMAND, " ");
+      } else {
+        strToken = new StringTokenizer(RESUME_COMMAND, " ");
+      }
+      String commandArgs[] = new String[strToken.countTokens() + 1];
+      int i = 0;
+      commandArgs[i++] = shellCommand;
+      while (strToken.hasMoreTokens()) {
+        commandArgs[i++] = strToken.nextToken();
+      }
+      String output = Shell.execCommand(hMap, commandArgs);
+      if (output != null && !output.equals("")) {
+        LOG.info(output);
+      }
+    }
+
+    private String convertTrackerNameToHostName(String trackerName) {
+      // Convert the trackerName to it's host name
+      int indexOfColon = trackerName.indexOf(":");
+      String trackerHostName = (indexOfColon == -1) ? 
+          trackerName : 
+            trackerName.substring(0, indexOfColon);
+      return trackerHostName.substring("tracker_".length());
+    }
+
+  }
+  
+  private class KillTaskThread extends Thread {
+
+    private volatile boolean killed = false;
+    private RunningJob rJob;
+    private JobClient jc;
+    final private int thresholdMultiplier;
+    private float threshold = 0.2f;
+    private boolean onlyMapsProgress;
+    private int numIterations;
+    public KillTaskThread(JobClient jc, int thresholdMultiplier, 
+        float threshold, boolean onlyMapsProgress, int numIterations) {
+      this.jc = jc;
+      this.thresholdMultiplier = thresholdMultiplier;
+      this.threshold = threshold;
+      this.onlyMapsProgress = onlyMapsProgress;
+      this.numIterations = numIterations;
+      setDaemon(true);
+    }
+    public void setRunningJob(RunningJob rJob) {
+      this.rJob = rJob;
+    }
+    public void kill() {
+      killed = true;
+    }
+    public void run() {
+      killBasedOnProgress(true);
+      if (!onlyMapsProgress) {
+        killBasedOnProgress(false);
+      }
+    }
+    private void killBasedOnProgress(boolean considerMaps) {
+      boolean fail = false;
+      if (considerMaps) {
+        LOG.info("Will kill tasks based on Maps' progress");
+      } else {
+        LOG.info("Will kill tasks based on Reduces' progress");
+      }
+      LOG.info("Initial progress threshold: " + threshold + 
+          ". Threshold Multiplier: " + thresholdMultiplier + 
+          ". Number of iterations: " + numIterations);
+      float thresholdVal = threshold;
+      int numIterationsDone = 0;
+      while (!killed) {
+        try {
+          float progress;
+          if (jc.getJob(rJob.getID()).isComplete() || 
+              numIterationsDone == numIterations) {
+            break;
+          }
+          if (considerMaps) {
+            progress = jc.getJob(rJob.getID()).mapProgress();
+          } else {
+            progress = jc.getJob(rJob.getID()).reduceProgress();
+          }
+          if (progress >= thresholdVal) {
+            numIterationsDone++;
+            if (numIterationsDone > 0 && numIterationsDone % 2 == 0) {
+              fail = true; //fail tasks instead of kill
+            }
+            ClusterStatus c = jc.getClusterStatus();
+
+            LOG.info(new Date() + " Killing a few tasks");
+
+            Collection<TaskAttemptID> runningTasks =
+              new ArrayList<TaskAttemptID>();
+            TaskReport mapReports[] = jc.getMapTaskReports(rJob.getID());
+            for (TaskReport mapReport : mapReports) {
+              if (mapReport.getCurrentStatus() == TIPStatus.RUNNING) {
+                runningTasks.addAll(mapReport.getRunningTaskAttempts());
+              }
+            }
+            if (runningTasks.size() > c.getTaskTrackers()/2) {
+              int count = 0;
+              for (TaskAttemptID t : runningTasks) {
+                LOG.info(new Date() + " Killed task : " + t);
+                rJob.killTask(t, fail);
+                if (count++ > runningTasks.size()/2) { //kill 50%
+                  break;
+                }
+              }
+            }
+            runningTasks.clear();
+            TaskReport reduceReports[] = jc.getReduceTaskReports(rJob.getID());
+            for (TaskReport reduceReport : reduceReports) {
+              if (reduceReport.getCurrentStatus() == TIPStatus.RUNNING) {
+                runningTasks.addAll(reduceReport.getRunningTaskAttempts());
+              }
+            }
+            if (runningTasks.size() > c.getTaskTrackers()/2) {
+              int count = 0;
+              for (TaskAttemptID t : runningTasks) {
+                LOG.info(new Date() + " Killed task : " + t);
+                rJob.killTask(t, fail);
+                if (count++ > runningTasks.size()/2) { //kill 50%
+                  break;
+                }
+              }
+            }
+            thresholdVal = thresholdVal * thresholdMultiplier;
+          }
+          Thread.sleep(5000);
+        } catch (InterruptedException ie) {
+          killed = true;
+        } catch (Exception e) {
+          LOG.fatal(StringUtils.stringifyException(e));
+        }
+      }
+    }
+  }
+  
+  public static void main(String args[]) throws Exception {
+    int res = ToolRunner.run(new Configuration(), new ReliabilityTest(), args);
+    System.exit(res);
+  }
+}

+ 3 - 1
src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java

@@ -135,7 +135,9 @@ public class TestJobQueueTaskScheduler extends TestCase {
     @Override
     public ClusterStatus getClusterStatus() {
       int numTrackers = trackers.size();
-      return new ClusterStatus(numTrackers, 0, maps, reduces,
+      return new ClusterStatus(numTrackers, 0, 
+                               JobTracker.TASKTRACKER_EXPIRY_INTERVAL,
+                               maps, reduces,
                                numTrackers * maxMapTasksPerTracker,
                                numTrackers * maxReduceTasksPerTracker,
                                JobTracker.State.RUNNING);

+ 4 - 0
src/test/org/apache/hadoop/test/AllTestDriver.java

@@ -22,6 +22,7 @@ import org.apache.hadoop.util.ProgramDriver;
 import org.apache.hadoop.mapred.BigMapOutput;
 import org.apache.hadoop.mapred.GenericMRLoadGenerator;
 import org.apache.hadoop.mapred.MRBench;
+import org.apache.hadoop.mapred.ReliabilityTest;
 import org.apache.hadoop.mapred.SortValidator;
 import org.apache.hadoop.mapred.TestMapRed;
 import org.apache.hadoop.mapred.TestSequenceFileInputFormat;
@@ -74,6 +75,9 @@ public class AllTestDriver {
       pgd.addClass("filebench", FileBench.class, "Benchmark SequenceFile(Input|Output)Format (block,record compressed and uncompressed), Text(Input|Output)Format (compressed and uncompressed)");
       pgd.addClass("dfsthroughput", BenchmarkThroughput.class, 
                    "measure hdfs throughput");
+      pgd.addClass("MRReliabilityTest", ReliabilityTest.class,
+          "A program that tests the reliability of the MR framework by " +
+          "injecting faults/failures");
       pgd.driver(argv);
     } catch(Throwable e) {
       e.printStackTrace();