
HADOOP-2141. Improves the speculative execution heuristic. The heuristic is currently based on the progress-rates of tasks and the expected time to complete. Also, statistics about trackers are collected, and speculative tasks are not given to the ones deduced to be slow. Contributed by Andy Konwinski and Devaraj Das.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@785065 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das authored 16 years ago
commit 311c480240
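To make the selection flow described in the commit message concrete, here is a small standalone sketch (not the JobInProgress code; RunningTask, chooseSpeculativeTask and the sample task names/numbers are illustrative): a running task becomes a candidate when its progress rate falls more than slowTaskThreshold standard deviations below the mean rate of the job's running tasks, and among candidates the one expected to finish last is speculated. The per-tracker statistics and the speculativeCap check added by the patch are omitted here for brevity.

import java.util.ArrayList;
import java.util.List;

/** Illustrative model of the speculation heuristic; not JobInProgress code. */
public class SpeculationSketch {

  /** A running task attempt: fraction complete and progress per millisecond. */
  static class RunningTask {
    final String id;
    final double progress;      // 0.0 - 1.0
    final double progressRate;  // progress units per millisecond
    RunningTask(String id, double progress, double progressRate) {
      this.id = id;
      this.progress = progress;
      this.progressRate = progressRate;
    }
  }

  /**
   * Candidates are running tasks whose progress rate is more than
   * slowTaskThreshold standard deviations below the mean rate; among those,
   * pick the one expected to finish last ((1 - progress) / progressRate).
   */
  static RunningTask chooseSpeculativeTask(List<RunningTask> running,
                                           double slowTaskThreshold) {
    if (running.isEmpty()) {
      return null;
    }
    double sum = 0, sumSquares = 0;
    for (RunningTask t : running) {
      sum += t.progressRate;
      sumSquares += t.progressRate * t.progressRate;
    }
    double mean = sum / running.size();
    double std = Math.sqrt(sumSquares / running.size() - mean * mean);

    RunningTask pick = null;
    double worstTimeLeft = -1;
    for (RunningTask t : running) {
      if (mean - t.progressRate <= std * slowTaskThreshold) {
        continue;  // not slow enough to be a candidate
      }
      double timeLeft = (1.0 - t.progress) / Math.max(t.progressRate, 1e-9);
      if (timeLeft > worstTimeLeft) {
        worstTimeLeft = timeLeft;
        pick = t;
      }
    }
    return pick;
  }

  public static void main(String[] args) {
    List<RunningTask> tasks = new ArrayList<RunningTask>();
    tasks.add(new RunningTask("m_000001", 0.90, 5e-6));
    tasks.add(new RunningTask("m_000002", 0.85, 5e-6));
    tasks.add(new RunningTask("m_000003", 0.20, 5e-7));  // straggler
    RunningTask pick = chooseSpeculativeTask(tasks, 1.0);
    System.out.println(pick == null ? "nothing to speculate" : "speculate " + pick.id);
  }
}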

+ 6 - 0
CHANGES.txt

@@ -447,6 +447,12 @@ Trunk (unreleased changes)
     HADOOP-5938. Change org.apache.hadoop.mapred.jobcontrol to use new
     api. (Amareshwari Sriramadasu via sharad)
 
+    HADOOP-2141. Improves the speculative execution heuristic. The heuristic
+    is currently based on the progress-rates of tasks and the expected time
+    to complete. Also, statistics about trackers are collected, and speculative
+    tasks are not given to the ones deduced to be slow. 
+    (Andy Konwinski and ddas)
+
   OPTIMIZATIONS
 
     HADOOP-5595. NameNode does not need to run a replicator to choose a

+ 1 - 1
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java

@@ -585,7 +585,7 @@ class CapacityTaskScheduler extends TaskScheduler {
       for(TaskInProgress tip : tips)  {
         if(tip.isRunning() 
             && !(tip.hasRunOnMachine(tts.getHost(), tts.getTrackerName())) 
-            && tip.hasSpeculativeTask(currentTime, progress)) {
+            && tip.canBeSpeculated(currentTime)) {
           return true;
         }
       }

+ 2 - 2
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

@@ -164,7 +164,7 @@ public class TestCapacityScheduler extends TestCase {
     private int speculativeReduceTaskCounter = 0;
     public FakeJobInProgress(JobID jId, JobConf jobConf,
         FakeTaskTrackerManager taskTrackerManager, String user) {
-      super(jId, jobConf);
+      super(jId, jobConf, null);
       this.taskTrackerManager = taskTrackerManager;
       this.startTime = System.currentTimeMillis();
       this.status = new JobStatus(jId, 0f, 0f, JobStatus.PREP);
@@ -381,7 +381,7 @@ public class TestCapacityScheduler extends TestCase {
      *hasSpeculativeMap and hasSpeculativeReduce is reset by FakeJobInProgress
      *after the speculative tip has been scheduled.
      */
-    boolean hasSpeculativeTask(long currentTime, double averageProgress) {
+    boolean canBeSpeculated(long currentTime) {
       if(isMap && hasSpeculativeMap) {
         return fakeJob.getJobConf().getMapSpeculativeExecution();
       } 

+ 2 - 3
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java

@@ -33,9 +33,8 @@ public class DefaultTaskSelector extends TaskSelector {
   public int neededSpeculativeMaps(JobInProgress job) {
     int count = 0;
     long time = System.currentTimeMillis();
-    double avgProgress = job.getStatus().mapProgress();
     for (TaskInProgress tip: job.maps) {
-      if (tip.isRunning() && tip.hasSpeculativeTask(time, avgProgress)) {
+      if (tip.isRunning() && tip.canBeSpeculated(time)) {
         count++;
       }
     }
@@ -48,7 +47,7 @@ public class DefaultTaskSelector extends TaskSelector {
     long time = System.currentTimeMillis();
     double avgProgress = job.getStatus().reduceProgress();
     for (TaskInProgress tip: job.reduces) {
-      if (tip.isRunning() && tip.hasSpeculativeTask(time, avgProgress)) {
+      if (tip.isRunning() && tip.canBeSpeculated(time)) {
         count++;
       }
     }

+ 0 - 9
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java

@@ -87,15 +87,6 @@ public class FairScheduler extends TaskScheduler {
     double reduceFairShare = 0; // Fair share of reduce slots at last update
   }
   
-  /**
-   * A clock class - can be mocked out for testing.
-   */
-  static class Clock {
-    long getTime() {
-      return System.currentTimeMillis();
-    }
-  }
-  
   public FairScheduler() {
     this(new Clock(), true);
   }

+ 2 - 14
src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java

@@ -34,6 +34,7 @@ import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.FairScheduler.JobInfo;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
 
 public class TestFairScheduler extends TestCase {
   final static String TEST_DIR = new File(System.getProperty("test.build.data",
@@ -52,7 +53,7 @@ public class TestFairScheduler extends TestCase {
     
     public FakeJobInProgress(JobConf jobConf,
         FakeTaskTrackerManager taskTrackerManager) throws IOException {
-      super(new JobID("test", ++jobCounter), jobConf);
+      super(new JobID("test", ++jobCounter), jobConf, null);
       this.taskTrackerManager = taskTrackerManager;
       this.startTime = System.currentTimeMillis();
       this.status = new JobStatus();
@@ -220,19 +221,6 @@ public class TestFairScheduler extends TestCase {
     }
   }
   
-  protected class FakeClock extends FairScheduler.Clock {
-    private long time = 0;
-    
-    public void advance(long millis) {
-      time += millis;
-    }
-
-    @Override
-    long getTime() {
-      return time;
-    }
-  }
-  
   protected JobConf conf;
   protected FairScheduler scheduler;
   private FakeTaskTrackerManager taskTrackerManager;

+ 25 - 0
src/mapred/mapred-default.xml

@@ -501,6 +501,31 @@
   <description>If true, then multiple instances of some reduce tasks 
                may be executed in parallel.</description>
 </property>
+<property>
+  <name>mapred.speculative.execution.speculativeCap</name>
+  <value>0.1</value>
+  <description>The maximum fraction (between 0 and 1) of running tasks that
+  can be speculatively re-executed at any time.</description>
+</property>
+ 
+<property>
+  <name>mapred.speculative.execution.slowTaskThreshold</name>
+  <value>1.0</value>
+  <description>The number of standard deviations by which a task's 
+  progress rate must be lower than the average progress rate of all
+  running tasks for the task to be considered too slow.
+  </description>
+</property>
+
+<property>
+  <name>mapred.speculative.execution.slowNodeThreshold</name>
+  <value>1.0</value>
+  <description>The number of standard deviations by which a TaskTracker's
+  average map or reduce task duration (finishTime - dispatchTime) must
+  exceed the average across all successful map/reduce tasks for the
+  TaskTracker to be considered too slow to be given a speculative task.
+  </description>
+</property>
 
 <property>
   <name>mapred.job.reuse.jvm.num.tasks</name>
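For reference, a minimal sketch of how a job might tune the three new knobs through the old-API JobConf (the keys are the ones added above; the class name and the values are placeholders, not recommendations):

import org.apache.hadoop.mapred.JobConf;

public class SpeculationConfExample {
  public static JobConf configure() {
    JobConf conf = new JobConf();
    conf.setMapSpeculativeExecution(true);
    conf.setReduceSpeculativeExecution(true);
    // At most 20% of a job's running tasks may be speculated at once (default 0.1).
    conf.set("mapred.speculative.execution.speculativeCap", "0.2");
    // Std deviations below the mean progress rate before a task counts as slow.
    conf.set("mapred.speculative.execution.slowTaskThreshold", "1.0");
    // Std deviations worse than average before a tracker is skipped for speculation.
    conf.set("mapred.speculative.execution.slowNodeThreshold", "1.0");
    return conf;
  }
}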

+ 28 - 0
src/mapred/org/apache/hadoop/mapred/Clock.java

@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/**
+ * A clock class - can be mocked out for testing.
+ */
+class Clock {
+  long getTime() {
+    return System.currentTimeMillis();
+  }
+}
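The tests exercise time-dependent code through a controllable subclass of this class (the patch moves the fair-scheduler's FakeClock into UtilsForTests, per the import added in TestFairScheduler above). A minimal sketch of such a mock, assuming it lives in the same package so it can override the package-private getTime():

package org.apache.hadoop.mapred;

// Illustrative only; the patch's own mock is UtilsForTests.FakeClock.
class ManualClock extends Clock {
  private long time = 0;

  // Move the clock forward without sleeping.
  void advance(long millis) {
    time += millis;
  }

  @Override
  long getTime() {
    return time;
  }
}

Passing a Clock into JobTracker and reading it via jobtracker.getClock() in JobInProgress and TaskInProgress is what lets the rest of the patch replace System.currentTimeMillis() with injectable time.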

+ 20 - 0
src/mapred/org/apache/hadoop/mapred/ClusterStatus.java

@@ -93,6 +93,16 @@ public class ClusterStatus implements Writable {
   }
 
   /**
+   * Construct a new cluster status.
+   * 
+   * @param trackers no. of tasktrackers in the cluster
+   * @param blacklists no of blacklisted task trackers in the cluster
+   * @param ttExpiryInterval the tasktracker expiry interval
+   * @param maps no. of currently running map-tasks in the cluster
+   * @param reduces no. of currently running reduce-tasks in the cluster
+   * @param maxMaps the maximum no. of map tasks in the cluster
+   * @param maxReduces the maximum no. of reduce tasks in the cluster
+   * @param state the {@link JobTracker.State} of the <code>JobTracker</code>
    * @param numDecommissionedNodes number of decommission trackers
    */
   ClusterStatus(int trackers, int blacklists, long ttExpiryInterval, 
@@ -133,6 +143,16 @@ public class ClusterStatus implements Writable {
   }
 
   /**
+   * Construct a new cluster status.
+   * 
+   * @param activeTrackers active tasktrackers in the cluster
+   * @param blacklistedTrackers blacklisted tasktrackers in the cluster
+   * @param ttExpiryInterval the tasktracker expiry interval
+   * @param maps no. of currently running map-tasks in the cluster
+   * @param reduces no. of currently running reduce-tasks in the cluster
+   * @param maxMaps the maximum no. of map tasks in the cluster
+   * @param maxReduces the maximum no. of reduce tasks in the cluster
+   * @param state the {@link JobTracker.State} of the <code>JobTracker</code>
    * @param numDecommissionNodes number of decommission trackers
    */
   ClusterStatus(Collection<String> activeTrackers, 

+ 440 - 145
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -21,6 +21,10 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.HashMap;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
@@ -103,7 +107,7 @@ class JobInProgress {
   private volatile boolean jobFailed = false;
 
   JobPriority priority = JobPriority.NORMAL;
-  final JobTracker jobtracker;
+  protected JobTracker jobtracker;
 
   // NetworkTopology Node to the set of TIPs
   Map<Node, List<TaskInProgress>> nonRunningMapCache;
@@ -129,14 +133,14 @@ class JobInProgress {
   // A list of cleanup tasks for the reduce task attempts, to be launched
   List<TaskAttemptID> reduceCleanupTasks = new LinkedList<TaskAttemptID>();
 
-  private final int maxLevel;
+  private int maxLevel;
 
   /**
    * A special value indicating that 
    * {@link #findNewMapTask(TaskTrackerStatus, int, int, int, double)} should
    * schedule any available map tasks for this job, including speculative tasks.
    */
-  private final int anyCacheLevel;
+  private int anyCacheLevel;
   
   /**
    * A special value indicating that 
@@ -189,9 +193,13 @@ class JobInProgress {
   
   private MetricsRecord jobMetrics;
   
-  // Maximum no. of fetch-failure notifications after which
-  // the map task is killed
+  // Maximum no. of fetch-failure notifications after which map task is killed
   private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
+
+  // Don't lower speculativeCap below one TT's worth (for small clusters)
+  private static final int MIN_SPEC_CAP = 10;
+  
+  private static final float MIN_SLOTS_CAP = 0.01f;
   
   // Map of mapTaskId -> no. of fetch failures
   private Map<TaskAttemptID, Integer> mapTaskIdToFetchFailuresMap =
@@ -199,19 +207,65 @@ class JobInProgress {
 
   private Object schedulingInfo;
 
-  
+  //thresholds for speculative execution
+  private float slowTaskThreshold;
+  private float speculativeCap;
+  private float slowNodeThreshold; //standard deviations
+
+  //Statistics are maintained for a couple of things
+  //mapTaskStats is used for maintaining statistics about
+  //the completion time of map tasks on the trackers. On a per
+  //tracker basis, the mean time for task completion is maintained
+  private DataStatistics mapTaskStats = new DataStatistics();
+  //reduceTaskStats is used for maintaining statistics about
+  //the completion time of reduce tasks on the trackers. On a per
+  //tracker basis, the mean time for task completion is maintained
+  private DataStatistics reduceTaskStats = new DataStatistics();
+  //trackerMapStats is used to maintain a mapping from the tracker to
+  //the statistics about completion time of map tasks
+  private Map<String,DataStatistics> trackerMapStats = 
+    new HashMap<String,DataStatistics>();
+  //trackerReduceStats is used to maintain a mapping from the tracker to
+  //the statistics about completion time of reduce tasks
+  private Map<String,DataStatistics> trackerReduceStats = 
+    new HashMap<String,DataStatistics>();
+  //runningMapStats used to maintain the RUNNING map tasks' statistics 
+  private DataStatistics runningMapTaskStats = new DataStatistics();
+  //runningReduceStats used to maintain the RUNNING reduce tasks' statistics
+  private DataStatistics runningReduceTaskStats = new DataStatistics();
+ 
   /**
    * Create an almost empty JobInProgress, which can be used only for tests
    */
-  protected JobInProgress(JobID jobid, JobConf conf) {
+  protected JobInProgress(JobID jobid, JobConf conf, JobTracker tracker) {
     this.conf = conf;
     this.jobId = jobid;
     this.numMapTasks = conf.getNumMapTasks();
     this.numReduceTasks = conf.getNumReduceTasks();
     this.maxLevel = NetworkTopology.DEFAULT_HOST_LEVEL;
     this.anyCacheLevel = this.maxLevel+1;
-    this.jobtracker = null;
+    this.jobtracker = tracker;
     this.restartCount = 0;
+    
+    hasSpeculativeMaps = conf.getMapSpeculativeExecution();
+    hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
+    this.nonLocalMaps = new LinkedList<TaskInProgress>();
+    this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
+    this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
+    this.nonRunningReduces = new LinkedList<TaskInProgress>();    
+    this.runningReduces = new LinkedHashSet<TaskInProgress>();
+    this.resourceEstimator = new ResourceEstimator(this);
+    this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
+    this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
+    (numMapTasks + numReduceTasks + 10);
+    
+    this.slowTaskThreshold = Math.max(0.0f,
+        conf.getFloat("mapred.speculative.execution.slowTaskThreshold",1.0f));
+    this.speculativeCap = conf.getFloat(
+        "mapred.speculative.execution.speculativeCap",0.1f);
+    this.slowNodeThreshold = conf.getFloat(
+        "mapred.speculative.execution.slowNodeThreshold",1.0f);
+
   }
   
   /**
@@ -285,6 +339,19 @@ class JobInProgress {
     this.nonRunningReduces = new LinkedList<TaskInProgress>();    
     this.runningReduces = new LinkedHashSet<TaskInProgress>();
     this.resourceEstimator = new ResourceEstimator(this);
+    
+    this.nonLocalMaps = new LinkedList<TaskInProgress>();
+    this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
+    this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
+    this.nonRunningReduces = new LinkedList<TaskInProgress>();    
+    this.runningReduces = new LinkedHashSet<TaskInProgress>();
+    this.slowTaskThreshold = Math.max(0.0f,
+        conf.getFloat("mapred.speculative.execution.slowTaskThreshold",1.0f));
+    this.speculativeCap = conf.getFloat(
+        "mapred.speculative.execution.speculativeCap",0.1f);
+    this.slowNodeThreshold = conf.getFloat(
+        "mapred.speculative.execution.slowNodeThreshold",1.0f);
+
   }
 
   /**
@@ -443,7 +510,7 @@ class JobInProgress {
     }
         
     // set the launch time
-    this.launchTime = System.currentTimeMillis();
+    this.launchTime = jobtracker.getClock().getTime();
 
     //
     // Create reduce tasks
@@ -939,9 +1006,9 @@ class JobInProgress {
       LOG.info("Cannot create task split for " + profile.getJobID());
       return null;
     }
-        
-    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, anyCacheLevel,
-                                status.mapProgress());
+       
+    int target = findNewMapTask(tts, clusterSize, numUniqueHosts,
+        anyCacheLevel);
     if (target == -1) {
       return null;
     }
@@ -1001,8 +1068,7 @@ class JobInProgress {
       return null;
     }
 
-    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxLevel, 
-                                status.mapProgress());
+    int target = findNewMapTask(tts, clusterSize, numUniqueHosts, maxLevel);
     if (target == -1) {
       return null;
     }
@@ -1025,7 +1091,7 @@ class JobInProgress {
     }
 
     int target = findNewMapTask(tts, clusterSize, numUniqueHosts, 
-                                NON_LOCAL_CACHE_LEVEL, status.mapProgress());
+                                NON_LOCAL_CACHE_LEVEL);
     if (target == -1) {
       return null;
     }
@@ -1203,8 +1269,7 @@ class JobInProgress {
       return null;
     }
 
-    int  target = findNewReduceTask(tts, clusterSize, numUniqueHosts, 
-                                    status.reduceProgress());
+    int  target = findNewReduceTask(tts, clusterSize, numUniqueHosts);
     if (target == -1) {
       return null;
     }
@@ -1267,15 +1332,21 @@ class JobInProgress {
       name = Values.MAP.name();
       counter = JobCounter.TOTAL_LAUNCHED_MAPS;
       splits = tip.getSplitNodes();
-      if (tip.getActiveTasks().size() > 1)
+      if (tip.isSpeculating()) {
         speculativeMapTasks++;
+        LOG.debug("Chosen speculative task, current speculativeMap task count: "
+            + speculativeMapTasks);
+      }
       metrics.launchMap(id);
     } else {
       ++runningReduceTasks;
       name = Values.REDUCE.name();
       counter = JobCounter.TOTAL_LAUNCHED_REDUCES;
-      if (tip.getActiveTasks().size() > 1)
+      if (tip.isSpeculating()) {
         speculativeReduceTasks++;
+        LOG.debug("Chosen speculative task, current speculativeReduce task count: "
+          + speculativeReduceTasks);
+      }
       metrics.launchReduce(id);
     }
     // Note that the logs are for the scheduled tasks only. Tasks that join on 
@@ -1433,7 +1504,7 @@ class JobInProgress {
     String[] splitLocations = tip.getSplitLocations();
 
     // Remove the TIP from the list for running non-local maps
-    if (splitLocations.length == 0) {
+    if (splitLocations == null || splitLocations.length == 0) {
       nonLocalRunningMaps.remove(tip);
       return;
     }
@@ -1473,8 +1544,9 @@ class JobInProgress {
    * Adds a map tip to the list of running maps.
    * @param tip the tip that needs to be scheduled as running
    */
-  private synchronized void scheduleMap(TaskInProgress tip) {
+  protected synchronized void scheduleMap(TaskInProgress tip) {
     
+    runningMapTaskStats.add(0.0f);
     if (runningMapCache == null) {
       LOG.warn("Running cache for maps is missing!! " 
                + "Job details are missing.");
@@ -1483,7 +1555,7 @@ class JobInProgress {
     String[] splitLocations = tip.getSplitLocations();
 
     // Add the TIP to the list of non-local running TIPs
-    if (splitLocations.length == 0) {
+    if (splitLocations == null || splitLocations.length == 0) {
       nonLocalRunningMaps.add(tip);
       return;
     }
@@ -1508,7 +1580,8 @@ class JobInProgress {
    * Adds a reduce tip to the list of running reduces
    * @param tip the tip that needs to be scheduled as running
    */
-  private synchronized void scheduleReduce(TaskInProgress tip) {
+  protected synchronized void scheduleReduce(TaskInProgress tip) {
+    runningReduceTaskStats.add(0.0f);
     if (runningReduces == null) {
       LOG.warn("Running cache for reducers missing!! "
                + "Job details are missing.");
@@ -1612,57 +1685,71 @@ class JobInProgress {
     return null;
   }
   
+  public boolean hasSpeculativeMaps() {
+    return hasSpeculativeMaps;
+  }
+
+  public boolean hasSpeculativeReduces() {
+    return hasSpeculativeReduces;
+  }
+
   /**
-   * Find a speculative task
-   * @param list a list of tips
-   * @param taskTracker the tracker that has requested a tip
-   * @param avgProgress the average progress for speculation
-   * @param currentTime current time in milliseconds
-   * @param shouldRemove whether to remove the tips
-   * @return a tip that can be speculated on the tracker
+   * Retrieve a task for speculation.
+   * If a task slot becomes available and there are fewer than speculativeCap
+   * speculative tasks running: 
+   *  1) Ignore the request if the TaskTracker is deemed slow (isSlowTracker)
+   *  2) Choose candidate tasks - those whose progress rate is more than
+   *     slowTaskThreshold standard deviations below the mean progress rate
+   *  3) Speculate the task that's expected to complete last
+   * @param list pool of tasks to choose from
+   * @param taskTrackerName the name of the TaskTracker asking for a task
+   * @param taskTrackerHost the hostname of the TaskTracker asking for a task
+   * @return the TIP to speculatively re-execute
    */
-  private synchronized TaskInProgress findSpeculativeTask(
-      Collection<TaskInProgress> list, TaskTrackerStatus ttStatus,
-      double avgProgress, long currentTime, boolean shouldRemove) {
+  protected synchronized TaskInProgress findSpeculativeTask(
+      Collection<TaskInProgress> list, String taskTrackerName, 
+      String taskTrackerHost) {
+    if (list.isEmpty()) {
+      return null;
+    }
+    long now = jobtracker.getClock().getTime();
+    if (isSlowTracker(taskTrackerName) || atSpeculativeCap(list)) {
+      return null;
+    }
+    // List of speculatable candidates, start with all, and chop it down
+    ArrayList<TaskInProgress> candidates = new ArrayList<TaskInProgress>(list);
     
-    Iterator<TaskInProgress> iter = list.iterator();
-
+    Iterator<TaskInProgress> iter = candidates.iterator();
     while (iter.hasNext()) {
       TaskInProgress tip = iter.next();
-      // should never be true! (since we delete completed/failed tasks)
-      if (!tip.isRunning()) {
-        iter.remove();
-        continue;
-      }
-
-      if (!tip.hasRunOnMachine(ttStatus.getHost(), 
-                               ttStatus.getTrackerName())) {
-        if (tip.hasSpeculativeTask(currentTime, avgProgress)) {
-          // In case of shared list we don't remove it. Since the TIP failed 
-          // on this tracker can be scheduled on some other tracker.
-          if (shouldRemove) {
-            iter.remove(); //this tracker is never going to run it again
-          }
-          return tip;
-        } 
-      } else {
-        // Check if this tip can be removed from the list.
-        // If the list is shared then we should not remove.
-        if (shouldRemove) {
-          // This tracker will never speculate this tip
+      if (tip.hasRunOnMachine(taskTrackerHost, taskTrackerName) ||
+          !tip.canBeSpeculated(now)) {
+          //remove it from candidates
           iter.remove();
-        }
       }
     }
-    return null;
+    //sort according to expected time till completion
+    Comparator<TaskInProgress> LateComparator = 
+      new EstimatedTimeLeftComparator(now);
+    Collections.sort(candidates, LateComparator);
+    if (candidates.size() > 0) {
+      TaskInProgress tip = candidates.get(0);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Chose task " + tip.getTIPId() + ". Statistics: Task's : " +
+            tip.getCurrentProgressRate(now) + " Job's : " + 
+            (tip.isMapTask() ? runningMapTaskStats : runningReduceTaskStats));
+      }
+      return tip;
+    } else {
+      return null;
+    }
   }
-  
+
   /**
    * Find new map task
    * @param tts The task tracker that is asking for a task
    * @param clusterSize The number of task trackers in the cluster
    * @param numUniqueHosts The number of hosts that run task trackers
-   * @param avgProgress The average progress of this kind of task in this job
    * @param maxCacheLevel The maximum topology level until which to schedule
    *                      maps. 
    *                      A value of {@link #anyCacheLevel} implies any 
@@ -1675,14 +1762,14 @@ class JobInProgress {
   private synchronized int findNewMapTask(final TaskTrackerStatus tts, 
                                           final int clusterSize,
                                           final int numUniqueHosts,
-                                          final int maxCacheLevel,
-                                          final double avgProgress) {
+                                          final int maxCacheLevel) {
+    String taskTrackerName = tts.getTrackerName();
+    String taskTrackerHost = tts.getHost();
     if (numMapTasks == 0) {
       LOG.info("No maps to schedule for " + profile.getJobID());
       return -1;
     }
 
-    String taskTracker = tts.getTrackerName();
     TaskInProgress tip = null;
     
     //
@@ -1694,7 +1781,7 @@ class JobInProgress {
       return -1;
     }
     
-    if (!shouldRunOnTaskTracker(taskTracker)) {
+    if (!shouldRunOnTaskTracker(taskTrackerName)) {
       return -1;
     }
 
@@ -1821,82 +1908,61 @@ class JobInProgress {
     // 
  
     if (hasSpeculativeMaps) {
-      long currentTime = System.currentTimeMillis();
-
-      // 1. Check bottom up for speculative tasks from the running cache
-      if (node != null) {
-        Node key = node;
-        for (int level = 0; level < maxLevel; ++level) {
-          Set<TaskInProgress> cacheForLevel = runningMapCache.get(key);
-          if (cacheForLevel != null) {
-            tip = findSpeculativeTask(cacheForLevel, tts, 
-                                      avgProgress, currentTime, level == 0);
-            if (tip != null) {
-              if (cacheForLevel.size() == 0) {
-                runningMapCache.remove(key);
-              }
-              return tip.getIdWithinJob();
-            }
-          }
-          key = key.getParent();
-        }
+      tip = getSpeculativeMap(taskTrackerName, taskTrackerHost);
+      if (tip != null) {
+        return tip.getIdWithinJob();
       }
+    }
+   return -1;
+  }
 
-      // 2. Check breadth-wise for speculative tasks
-      
-      for (Node parent : nodesAtMaxLevel) {
-        // ignore the parent which is already scanned
-        if (parent == nodeParentAtMaxLevel) {
-          continue;
-        }
+  private synchronized TaskInProgress getSpeculativeMap(String taskTrackerName, 
+      String taskTrackerHost) {
 
-        Set<TaskInProgress> cache = runningMapCache.get(parent);
-        if (cache != null) {
-          tip = findSpeculativeTask(cache, tts, avgProgress, 
-                                    currentTime, false);
-          if (tip != null) {
-            // remove empty cache entries
-            if (cache.size() == 0) {
-              runningMapCache.remove(parent);
-            }
-            LOG.info("Choosing a non-local task " + tip.getTIPId() 
-                     + " for speculation");
-            return tip.getIdWithinJob();
-          }
-        }
-      }
-
-      // 3. Check non-local tips for speculation
-      tip = findSpeculativeTask(nonLocalRunningMaps, tts, avgProgress, 
-                                currentTime, false);
-      if (tip != null) {
-        LOG.info("Choosing a non-local task " + tip.getTIPId() 
-                 + " for speculation");
-        return tip.getIdWithinJob();
+    //////// Populate allTips with all TaskInProgress
+    Set<TaskInProgress> allTips = new HashSet<TaskInProgress>();
+    
+    // collection of node at max level in the cache structure
+    Collection<Node> nodesAtMaxLevel = jobtracker.getNodesAtMaxLevel();
+    // Add all tasks from max-level nodes breadth-wise
+    for (Node parent : nodesAtMaxLevel) {
+      Set<TaskInProgress> cache = runningMapCache.get(parent);
+      if (cache != null) {
+        allTips.addAll(cache);
       }
     }
+    // Add all non-local TIPs
+    allTips.addAll(nonLocalRunningMaps);
     
-    return -1;
+    ///////// Select a TIP to run on
+    TaskInProgress tip = findSpeculativeTask(allTips, taskTrackerName, 
+        taskTrackerHost);
+    
+    if (tip != null) {
+      LOG.info("Choosing map task " + tip.getTIPId() + 
+          " for speculative execution");
+    } else {
+      LOG.debug("No speculative map task found for tracker " + taskTrackerName);
+    }
+    return tip;
   }
-
+  
   /**
    * Find new reduce task
    * @param tts The task tracker that is asking for a task
    * @param clusterSize The number of task trackers in the cluster
    * @param numUniqueHosts The number of hosts that run task trackers
-   * @param avgProgress The average progress of this kind of task in this job
    * @return the index in tasks of the selected task (or -1 for no task)
    */
   private synchronized int findNewReduceTask(TaskTrackerStatus tts, 
                                              int clusterSize,
-                                             int numUniqueHosts,
-                                             double avgProgress) {
+                                             int numUniqueHosts) {
+    String taskTrackerName = tts.getTrackerName();
+    String taskTrackerHost = tts.getHost();
     if (numReduceTasks == 0) {
       LOG.info("No reduces to schedule for " + profile.getJobID());
       return -1;
     }
-
-    String taskTracker = tts.getTrackerName();
     TaskInProgress tip = null;
 
     // Update the last-known clusterSize
@@ -1906,14 +1972,14 @@ class JobInProgress {
       return -1;
     }
 
-    if (!shouldRunOnTaskTracker(taskTracker)) {
+    if (!shouldRunOnTaskTracker(taskTrackerName)) {
       return -1;
     }
 
     long outSize = resourceEstimator.getEstimatedReduceInputSize();
     long availSpace = tts.getResourceStatus().getAvailableSpace();
     if(availSpace < outSize) {
-      LOG.warn("No room for reduce task. Node " + taskTracker + " has " +
+      LOG.warn("No room for reduce task. Node " + taskTrackerName + " has " +
                 availSpace + 
                " bytes free; but we expect reduce input to take " + outSize);
 
@@ -1930,16 +1996,187 @@ class JobInProgress {
 
     // 2. check for a reduce tip to be speculated
     if (hasSpeculativeReduces) {
-      tip = findSpeculativeTask(runningReduces, tts, avgProgress, 
-                                System.currentTimeMillis(), false);
+      tip = getSpeculativeReduce(taskTrackerName, taskTrackerHost);
       if (tip != null) {
-        scheduleReduce(tip);
         return tip.getIdWithinJob();
       }
     }
 
     return -1;
   }
+
+  private synchronized TaskInProgress getSpeculativeReduce(
+      String taskTrackerName, String taskTrackerHost) {
+    TaskInProgress tip = findSpeculativeTask(
+        runningReduces, taskTrackerName, taskTrackerHost);
+    if (tip != null) {
+      LOG.info("Choosing reduce task " + tip.getTIPId() + 
+          " for speculative execution");
+    } else {
+      LOG.debug("No speculative reduce task found for tracker " + taskTrackerName);
+    }
+    return tip;
+  }
+
+    /**
+     * Check to see if the maximum number of speculative tasks are
+     * already being executed currently.
+     * @param tasks the set of tasks to test
+     * @return has the cap been reached?
+     */
+   private boolean atSpeculativeCap(Collection<TaskInProgress> tasks) {
+     float numTasks = tasks.size();
+     if (numTasks == 0){
+       return true; // avoid divide by zero
+     }
+
+     //not at the speculative cap (i.e. return false) while totalSpecTasks <
+     //    max(10, 0.01 * total-slots, 0.1 * total-running-tasks)
+
+     if (speculativeMapTasks + speculativeReduceTasks < MIN_SPEC_CAP) {
+       return false; // at least one slow tracker's worth of slots(default=10)
+     }
+     ClusterStatus c = jobtracker.getClusterStatus(false); 
+     int numSlots = c.getMaxMapTasks() + c.getMaxReduceTasks();
+     if ((float)(speculativeMapTasks + speculativeReduceTasks) < 
+       numSlots * MIN_SLOTS_CAP) {
+       return false;
+     }
+     boolean atCap = (((float)(speculativeMapTasks+
+         speculativeReduceTasks)/numTasks) >= speculativeCap);
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("SpeculativeCap is "+speculativeCap+", specTasks/numTasks is " +
+           ((float)(speculativeMapTasks+speculativeReduceTasks)/numTasks)+
+           ", so atSpecCap() is returning "+atCap);
+     }
+     return atCap;
+   }
+  
+  /**
+   * A class for comparing the estimated time to completion of two tasks
+   */
+  private static class EstimatedTimeLeftComparator 
+  implements Comparator<TaskInProgress> {
+    private long time;
+    public EstimatedTimeLeftComparator(long now) {
+      this.time = now;
+    }
+    /**
+     * Estimated time to completion is measured as:
+     *   % of task left to complete (1 - progress) / progress rate of the task.
+     * 
+     * This assumes that tasks are linear in their progress, which is 
+     * often wrong, especially since progress for reducers is currently
+     * calculated by evenly weighting their three stages (shuffle, sort, reduce),
+     * which rarely account for 1/3 each. This should be fixed in the future
+     * by calculating progressRate more intelligently or splitting these
+     * multi-phase tasks into individual tasks.
+     * 
+     * The ordering this comparator defines is: task1 < task2 if task1 is
+     * estimated to finish farther in the future => compare(t1,t2) returns -1
+     */
+    public int compare(TaskInProgress tip1, TaskInProgress tip2) {
+      //we have to use the Math.max in the denominator to avoid divide by zero
+      //error because prog and progRate can both be zero (if one is zero,
+      //the other one will be 0 too).
+      //We use the inverse of time_remaining=[(1 - prog) / progRate]
+      //so that (1 - prog) is in the denominator, because tasks can have
+      //arbitrarily low progRates in practice (e.g. a task that is half done
+      //after 1000 seconds will have a progRate of 0.0000005) so we would
+      //rather use Math.max on (1 - prog) by putting it in the denominator,
+      //which will make tasks with prog=1 look 99.99% done instead of 100%,
+      //which is okay
+      double t1 = tip1.getCurrentProgressRate(time) / Math.max(0.0001, 
+          1.0 - tip1.getProgress());
+      double t2 = tip2.getCurrentProgressRate(time) / Math.max(0.0001, 
+          1.0 - tip2.getProgress());
+      if (t1 < t2) return -1;
+      else if (t2 < t1) return 1;
+      else return 0;
+    }
+  }
+  
+  /**
+   * Compares the average duration of tasks that have finished on this 
+   * TaskTracker to the average across all successful tasks thus far, to see
+   * if this TaskTracker is too slow to be given speculative tasks.
+   * slowNodeThreshold gives the number of standard deviations that count as slow.
+   * @param taskTracker the name of the TaskTracker we are checking
+   * @return is this TaskTracker slow
+   */
+  protected boolean isSlowTracker(String taskTracker) {
+    if (trackerMapStats.get(taskTracker) != null &&
+        trackerMapStats.get(taskTracker).mean() -
+        mapTaskStats.mean() > mapTaskStats.std()*slowNodeThreshold) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Tracker " + taskTracker + 
+            " declared slow. trackerMapStats.get(taskTracker).mean() :" + trackerMapStats.get(taskTracker).mean() +
+            " mapTaskStats :" + mapTaskStats);
+      }
+      return true;
+    }
+    if (trackerReduceStats.get(taskTracker) != null && 
+        trackerReduceStats.get(taskTracker).mean() -
+        reduceTaskStats.mean() > reduceTaskStats.std()*slowNodeThreshold) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Tracker " + taskTracker + 
+            " declared slow. trackerReduceStats.get(taskTracker).mean() :" + trackerReduceStats.get(taskTracker).mean() +
+            " reduceTaskStats :" + reduceTaskStats);
+      }
+      return true;
+    }
+    return false;
+  }
+  
+  static class DataStatistics{
+    private int count = 0;
+    private double sum = 0;
+    private double sumSquares = 0;
+    
+    public DataStatistics() {
+    }
+    
+    public DataStatistics(double initNum) {
+      this.count = 1;
+      this.sum = initNum;
+      this.sumSquares = initNum * initNum;
+    }
+    
+    public void add(double newNum) {
+      this.count++;
+      this.sum += newNum;
+      this.sumSquares += newNum * newNum;
+    }
+
+    public void updateStatistics(double old, double update) {
+      sub(old);
+      add(update);
+    }
+    private void sub(double oldNum) {
+      this.count--;
+      this.sum -= oldNum;
+      this.sumSquares -= oldNum * oldNum;
+    }
+    
+    public double mean() {
+      return sum/count;      
+    }
+  
+    public double var() {
+      // E(X^2) - E(X)^2
+      return (sumSquares/count) - mean() * mean();
+    }
+    
+    public double std() {
+      return Math.sqrt(this.var());
+    }
+    
+    public String toString() {
+      return "DataStatistics: count is " + count + ", sum is " + sum + 
+      ", sumSquares is " + sumSquares + " mean is " + mean() + " std() is " + std();
+    }
+    
+  }
   
   private boolean shouldRunOnTaskTracker(String taskTracker) {
     //
@@ -2002,7 +2239,6 @@ class JobInProgress {
                                             TaskStatus status)
   {
     TaskAttemptID taskid = status.getTaskID();
-    int oldNumAttempts = tip.getActiveTasks().size();
     final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
         
     // Sanity check: is the TIP already complete? 
@@ -2018,10 +2254,9 @@ class JobInProgress {
       }
       return false;
     } 
-
+    boolean wasSpeculating = tip.isSpeculating(); //store this fact
     LOG.info("Task '" + taskid + "' has completed " + tip.getTIPId() + 
              " successfully.");          
-
     // Mark the TIP as complete
     tip.completed(taskid);
     resourceEstimator.updateWithCompletedTask(status, tip);
@@ -2059,7 +2294,6 @@ class JobInProgress {
                                 tip.getExecFinishTime(),
                                 status.getCounters()); 
         
-    int newNumAttempts = tip.getActiveTasks().size();
     if (tip.isJobSetupTask()) {
       // setup task has finished. kill the extra setup tip
       killSetupTip(!tip.isMapTask());
@@ -2096,12 +2330,11 @@ class JobInProgress {
       jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
     } else if (tip.isMapTask()) {
       runningMapTasks -= 1;
-      // check if this was a sepculative task
-      if (oldNumAttempts > 1) {
-        speculativeMapTasks -= (oldNumAttempts - newNumAttempts);
-      }
       finishedMapTasks += 1;
       metrics.completeMap(taskid);
+      if (hasSpeculativeMaps) {
+        updateTaskTrackerStats(tip,ttStatus,trackerMapStats,mapTaskStats);
+      }
       // remove the completed map from the resp running caches
       retireMap(tip);
       if ((finishedMapTasks + failedMapTIPs) == (numMapTasks)) {
@@ -2109,21 +2342,66 @@ class JobInProgress {
       }
     } else {
       runningReduceTasks -= 1;
-      if (oldNumAttempts > 1) {
-        speculativeReduceTasks -= (oldNumAttempts - newNumAttempts);
-      }
       finishedReduceTasks += 1;
       metrics.completeReduce(taskid);
+      if (hasSpeculativeReduces) {
+        updateTaskTrackerStats(tip,ttStatus,trackerReduceStats,reduceTaskStats);
+      }
       // remove the completed reduces from the running reducers set
       retireReduce(tip);
       if ((finishedReduceTasks + failedReduceTIPs) == (numReduceTasks)) {
         this.status.setReduceProgress(1.0f);
       }
     }
-    
+    decrementSpeculativeCount(wasSpeculating, tip);
     return true;
   }
-
+  
+  private void updateTaskTrackerStats(TaskInProgress tip, TaskTrackerStatus ttStatus, 
+      Map<String,DataStatistics> trackerStats, DataStatistics overallStats) {
+    float tipDuration = tip.getExecFinishTime()-tip.getDispatchTime();
+    DataStatistics ttStats = 
+      trackerStats.get(ttStatus.getTrackerName());
+    double oldMean = 0.0d;
+    //We maintain the mean of TaskTrackers' means. That way, we get a single
+    //data-point for every tracker (used in the evaluation in isSlowTracker)
+    if (ttStats != null) {
+      oldMean = ttStats.mean();
+      ttStats.add(tipDuration);
+      overallStats.updateStatistics(oldMean, ttStats.mean());
+    } else {
+      trackerStats.put(ttStatus.getTrackerName(),
+          (ttStats = new DataStatistics(tipDuration)));
+      overallStats.add(tipDuration);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Added mean of " +ttStats.mean() + " to trackerStats of type "+
+          (tip.isMapTask() ? "Map" : "Reduce") +
+          " on "+ttStatus.getTrackerName()+". DataStatistics is now: " +
+          trackerStats.get(ttStatus.getTrackerName()));
+    }
+  }
+  
+  public void updateStatistics(double oldProg, double newProg, boolean isMap) {
+    if (isMap) {   
+      runningMapTaskStats.updateStatistics(oldProg, newProg);
+    } else {
+      runningReduceTaskStats.updateStatistics(oldProg, newProg);
+    }
+  }
+  
+  public DataStatistics getRunningTaskStatistics(boolean isMap) {
+    if (isMap) {
+      return runningMapTaskStats;
+    } else {
+      return runningReduceTaskStats;
+    }
+  }
+  
+  public float getSlowTaskThreshold() {
+    return slowTaskThreshold;
+  }
+  
   /**
    * The job is done since all it's component tasks are either
    * successful or have failed.
@@ -2142,7 +2420,7 @@ class JobInProgress {
       if (reduces.length == 0) {
         this.status.setReduceProgress(1.0f);
       }
-      this.finishTime = System.currentTimeMillis();
+      this.finishTime = jobtracker.getClock().getTime();
       LOG.info("Job " + this.status.getJobID() + 
                " has completed successfully.");
       JobHistory.JobInfo.logFinished(this.status.getJobID(), finishTime, 
@@ -2164,7 +2442,7 @@ class JobInProgress {
         this.status = new JobStatus(status.getJobID(),
                                     1.0f, 1.0f, 1.0f, JobStatus.FAILED,
                                     status.getJobPriority());
-        this.finishTime = System.currentTimeMillis();
+        this.finishTime = jobtracker.getClock().getTime();
         JobHistory.JobInfo.logFailed(this.status.getJobID(), finishTime, 
                                      this.finishedMapTasks, 
                                      this.finishedReduceTasks);
@@ -2172,7 +2450,7 @@ class JobInProgress {
         this.status = new JobStatus(status.getJobID(),
                                     1.0f, 1.0f, 1.0f, JobStatus.KILLED,
                                     status.getJobPriority());
-        this.finishTime = System.currentTimeMillis();
+        this.finishTime = jobtracker.getClock().getTime();
         JobHistory.JobInfo.logKilled(this.status.getJobID(), finishTime, 
                                      this.finishedMapTasks, 
                                      this.finishedReduceTasks);
@@ -2273,6 +2551,21 @@ class JobInProgress {
     terminate(JobStatus.FAILED);
   }
   
+  private void decrementSpeculativeCount(boolean wasSpeculating, 
+      TaskInProgress tip) {
+    if (wasSpeculating) {
+      if (tip.isMapTask()) {
+        speculativeMapTasks--;
+        LOG.debug("Decrement count. Current speculativeMap task count: " +
+            speculativeMapTasks);
+      } else {
+        speculativeReduceTasks--;
+        LOG.debug("Decremented count. Current speculativeReduce task count: " + 
+            speculativeReduceTasks);
+      }
+    }
+  }
+  
   /**
    * A task assigned to this JobInProgress has reported in as failed.
    * Most of the time, we'll just reschedule execution.  However, after
@@ -2292,9 +2585,11 @@ class JobInProgress {
     final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
     // check if the TIP is already failed
     boolean wasFailed = tip.isFailed();
+    boolean wasSpeculating = tip.isSpeculating();
 
     // Mark the taskid as FAILED or KILLED
     tip.incompleteSubTask(taskid, this.status);
+    decrementSpeculativeCount(wasSpeculating, tip);
    
     boolean isRunning = tip.isRunning();
     boolean isComplete = tip.isComplete();
@@ -2477,8 +2772,8 @@ class JobInProgress {
    * @param reason The reason that the task failed
    * @param trackerName The task tracker the task failed on
    */
-  public void failedTask(TaskInProgress tip, TaskAttemptID taskid, String reason, 
-                         TaskStatus.Phase phase, TaskStatus.State state, 
+  public synchronized void failedTask(TaskInProgress tip, TaskAttemptID taskid,
+      String reason, TaskStatus.Phase phase, TaskStatus.State state, 
                          String trackerName) {
     TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), 
                                                     taskid,
@@ -2491,10 +2786,10 @@ class JobInProgress {
     // update the actual start-time of the attempt
     TaskStatus oldStatus = tip.getTaskStatus(taskid); 
     long startTime = oldStatus == null
-                     ? System.currentTimeMillis()
+                     ? jobtracker.getClock().getTime()
                      : oldStatus.getStartTime();
     status.setStartTime(startTime);
-    status.setFinishTime(System.currentTimeMillis());
+    status.setFinishTime(jobtracker.getClock().getTime());
     boolean wasComplete = tip.isComplete();
     updateTaskStatus(tip, status);
     boolean isComplete = tip.isComplete();
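To make the new statistics concrete, here is a small standalone sketch of the math behind DataStatistics and isSlowTracker (mean = sum/count, var = E(X^2) - E(X)^2, std = sqrt(var)), using made-up per-tracker mean durations; the class and variable names are illustrative and not part of the patch. A tracker is declared slow when its mean task duration exceeds the overall mean by more than slowNodeThreshold standard deviations, mirroring how updateTaskTrackerStats feeds one data point per tracker into mapTaskStats/reduceTaskStats.

public class SlowTrackerSketch {

  /** Same running-stats math as JobInProgress.DataStatistics. */
  static class Stats {
    private int count;
    private double sum, sumSquares;
    void add(double x) { count++; sum += x; sumSquares += x * x; }
    double mean() { return count == 0 ? 0 : sum / count; }
    double std()  { return count == 0 ? 0 : Math.sqrt(sumSquares / count - mean() * mean()); }
  }

  public static void main(String[] args) {
    // Mean map-task duration (ms) observed per tracker, one data point each.
    double[] perTrackerMeans = {42000, 45000, 44000, 43000, 95000};
    Stats mapTaskStats = new Stats();
    for (double m : perTrackerMeans) {
      mapTaskStats.add(m);
    }

    double slowNodeThreshold = 1.0;  // mapred.speculative.execution.slowNodeThreshold
    for (double trackerMean : perTrackerMeans) {
      boolean slow = trackerMean - mapTaskStats.mean()
                   > mapTaskStats.std() * slowNodeThreshold;
      System.out.printf("tracker mean %.0f ms -> slow=%b%n", trackerMean, slow);
    }
  }
}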

+ 30 - 20
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -138,6 +138,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   // system files should have 700 permission
   final static FsPermission SYSTEM_FILE_PERMISSION =
     FsPermission.createImmutable((short) 0700); // rwx------
+  
+  private Clock clock;
 
   /**
    * A client tried to submit a job before the Job Tracker was ready.
@@ -165,6 +167,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
 
   public static final Log LOG = LogFactory.getLog(JobTracker.class);
     
+  public Clock getClock() {
+    return clock;
+  }
+  
   /**
    * Start the JobTracker with given configuration.
    * 
@@ -181,7 +187,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     JobTracker result = null;
     while (true) {
       try {
-        result = new JobTracker(conf);
+        result = new JobTracker(conf, new Clock());
         result.taskScheduler.setTaskTrackerManager(result);
         break;
       } catch (VersionMismatch e) {
@@ -242,7 +248,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         try {
           // Every 3 minutes check for any tasks that are overdue
           Thread.sleep(tasktrackerExpiryInterval/3);
-          long now = System.currentTimeMillis();
+          long now = clock.getTime();
           LOG.debug("Starting launching task sweep");
           synchronized (JobTracker.this) {
             synchronized (launchingTasks) {
@@ -295,7 +301,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     public void addNewTask(TaskAttemptID taskName) {
       synchronized (launchingTasks) {
         launchingTasks.put(taskName, 
-                           System.currentTimeMillis());
+                           clock.getTime());
       }
     }
       
@@ -339,7 +345,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
           synchronized (JobTracker.this) {
             synchronized (taskTrackers) {
               synchronized (trackerExpiryQueue) {
-                long now = System.currentTimeMillis();
+                long now = clock.getTime();
                 TaskTrackerStatus leastRecent = null;
                 while ((trackerExpiryQueue.size() > 0) &&
                        ((leastRecent = trackerExpiryQueue.first()) != null) &&
@@ -405,7 +411,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         try {
           Thread.sleep(retireJobCheckInterval);
           List<JobInProgress> retiredJobs = new ArrayList<JobInProgress>();
-          long now = System.currentTimeMillis();
+          long now = clock.getTime();
           long retireBefore = now - retireJobInterval;
 
           synchronized (jobs) {
@@ -465,9 +471,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     long lastUpdated;
     boolean blacklisted; 
 
-    FaultInfo() {
+    FaultInfo(long time) {
       numFaults = 0;
-      lastUpdated = System.currentTimeMillis();
+      lastUpdated = time;
       blacklisted = false;
     }
 
@@ -517,14 +523,15 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     void incrementFaults(String hostName) {
       synchronized (potentiallyFaultyTrackers) {
         FaultInfo fi = potentiallyFaultyTrackers.get(hostName);
+        long now = clock.getTime();
         if (fi == null) {
-          fi = new FaultInfo();
+          fi = new FaultInfo(now);
           potentiallyFaultyTrackers.put(hostName, fi);
         }
         int numFaults = fi.getFaultCount();
         ++numFaults;
         fi.setFaultCount(numFaults);
-        fi.setLastUpdated(System.currentTimeMillis());
+        fi.setLastUpdated(now);
         if (!fi.isBlacklisted()) {
           if (shouldBlacklist(hostName, numFaults)) {
             LOG.info("Adding " + hostName + " to the blacklist" +
@@ -1028,7 +1035,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       TaskTrackerStatus ttStatus = 
         new TaskTrackerStatus(trackerName, trackerHostName, port, ttStatusList, 
                               0 , 0, 0);
-      ttStatus.setLastSeen(System.currentTimeMillis());
+      ttStatus.setLastSeen(clock.getTime());
 
       synchronized (JobTracker.this) {
         synchronized (taskTrackers) {
@@ -1244,7 +1251,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     }
 
     public void recover() {
-      long recoveryProcessStartTime = System.currentTimeMillis();
+      long recoveryProcessStartTime = clock.getTime();
       if (!shouldRecover()) {
         // clean up jobs structure
         jobsToRecover.clear();
@@ -1312,13 +1319,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
           continue;
         }
       }
-
+      long now = clock.getTime();
       LOG.info("Took a total of " 
-               + StringUtils.formatTime(System.currentTimeMillis() 
+               + StringUtils.formatTime(now 
                                         - recoveryProcessStartTime) 
                + " for recovering filenames of all the jobs from history.");
 
-      long recoveryStartTime = System.currentTimeMillis();
 
       // II. Recover each job
       idIter = jobsToRecover.iterator();
@@ -1375,10 +1381,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         }
       }
 
-      long recoveryProcessEndTime = System.currentTimeMillis();
+      long recoveryProcessEndTime = clock.getTime();
       LOG.info("Took a total of " 
                + StringUtils.formatTime(recoveryProcessEndTime
-                                        - recoveryStartTime) 
+                                        - now) 
                + " for parsing and recovering all the jobs from history.");
 
       recoveryDuration = recoveryProcessEndTime - recoveryProcessStartTime;
@@ -1565,11 +1571,15 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
 
   private QueueManager queueManager;
 
+  JobTracker(JobConf conf) throws IOException,InterruptedException{
+    this(conf, new Clock());
+  }
   /**
    * Start the JobTracker process, listen on the indicated port
    */
-  JobTracker(JobConf conf) throws IOException, InterruptedException {
+  JobTracker(JobConf conf, Clock clock) throws IOException, InterruptedException {
     // find the owner of the process
+    this.clock = clock;
     try {
       mrOwner = UnixUserGroupInformation.login(conf);
     } catch (LoginException e) {
@@ -1649,7 +1659,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         conf.get("mapred.job.tracker.http.address", "0.0.0.0:50030"));
     String infoBindAddress = infoSocAddr.getHostName();
     int tmpInfoPort = infoSocAddr.getPort();
-    this.startTime = System.currentTimeMillis();
+    this.startTime = clock.getTime();
     infoServer = new HttpServer("job", infoBindAddress, tmpInfoPort, 
         tmpInfoPort == 0, conf);
     infoServer.setAttribute("job.tracker", this);
@@ -2152,7 +2162,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     final JobTrackerInstrumentation metrics = getInstrumentation();
     metrics.finalizeJob(conf, id);
     
-    long now = System.currentTimeMillis();
+    long now = clock.getTime();
     
     // mark the job for cleanup at all the trackers
     addJobForCleanup(id);
@@ -2568,7 +2578,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
 
     // First check if the last heartbeat response got through
     String trackerName = status.getTrackerName();
-    long now = System.currentTimeMillis();
+    long now = clock.getTime();
     boolean isBlacklisted = false;
     if (restarted) {
       faultyTrackers.markTrackerHealthy(status.getHost());

+ 101 - 35
src/mapred/org/apache/hadoop/mapred/TaskInProgress.java

@@ -32,6 +32,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobClient.RawSplit;
+import org.apache.hadoop.mapred.JobInProgress.DataStatistics;
 import org.apache.hadoop.mapred.SortedRanges.Range;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.net.Node;
@@ -52,9 +53,8 @@ import org.apache.hadoop.net.Node;
  * **************************************************************
  */
 class TaskInProgress {
-  static final int MAX_TASK_EXECS = 1;
+  static final int MAX_TASK_EXECS = 1; //max # nonspec tasks to run concurrently  
   int maxTaskAttempts = 4;    
-  static final double SPECULATIVE_GAP = 0.2;
   static final long SPECULATIVE_LAG = 60 * 1000;
   private static final int NUM_ATTEMPTS_PER_RESTART = 1000;
 
@@ -74,9 +74,10 @@ class TaskInProgress {
   private int numTaskFailures = 0;
   private int numKilledTasks = 0;
   private double progress = 0;
+  private double oldProgressRate;
   private String state = "";
-  private long startTime = 0;
-  private long execStartTime = 0;
+  private long dispatchTime = 0;   // most recent time task attempt given to TT
+  private long execStartTime = 0;  // when we started first task-attempt
   private long execFinishTime = 0;
   private int completes = 0;
   private boolean failed = false;
@@ -220,7 +221,6 @@ class TaskInProgress {
    * Initialization common to Map and Reduce
    */
   void init(JobID jobId) {
-    this.startTime = System.currentTimeMillis();
     this.id = new TaskID(jobId, isMapTask() ? TaskType.MAP : TaskType.REDUCE,
         partition);
     this.skipping = startSkipping();
@@ -229,12 +229,19 @@ class TaskInProgress {
   ////////////////////////////////////
   // Accessors, info, profiles, etc.
   ////////////////////////////////////
-
+  
   /**
-   * Return the start time
+   * Return the dispatch time
    */
-  public long getStartTime() {
-    return startTime;
+  public long getDispatchTime(){
+    return this.dispatchTime;
+  }
+  
+  /**
+   * Set the dispatch time
+   */
+  public void setDispatchTime(long disTime){
+    this.dispatchTime = disTime;
   }
   
   /**
@@ -399,9 +406,15 @@ class TaskInProgress {
                !tasksReportedClosed.contains(taskid)) {
       tasksReportedClosed.add(taskid);
       close = true; 
+      if (isComplete() && !isComplete(taskid)) {
+        addDiagnosticInfo(taskid, "Another (possibly speculative) attempt" +
+            " already SUCCEEDED");
+      }      
     } else if (isCommitPending(taskid) && !shouldCommit(taskid) &&
                !tasksReportedClosed.contains(taskid)) {
       tasksReportedClosed.add(taskid);
+      addDiagnosticInfo(taskid, "Another (possibly speculative) attempt" +
+           " went to COMMIT_PENDING state earlier");
       close = true; 
     } else {
       close = tasksToKill.keySet().contains(taskid);
@@ -562,6 +575,17 @@ class TaskInProgress {
     // but finishTime has to be updated.
     if (!isCleanupAttempt(taskid)) {
       taskStatuses.put(taskid, status);
+      if ((isMapTask() && job.hasSpeculativeMaps()) || 
+          (!isMapTask() && job.hasSpeculativeReduces())) {
+        long now = jobtracker.getClock().getTime();
+        double oldProgRate = getOldProgressRate();
+        double currProgRate = getCurrentProgressRate(now);
+        job.updateStatistics(oldProgRate, currProgRate, isMapTask());
+        //we need to store the current progress rate, so that we can
+        //update statistics accurately the next time we invoke
+        //updateStatistics
+        setProgressRate(currProgRate);
+      }
     } else {
       taskStatuses.get(taskid).statusUpdate(status.getRunState(),
         status.getProgress(), status.getStateString(), status.getPhase(),
@@ -619,7 +643,7 @@ class TaskInProgress {
 
       // tasktracker went down and failed time was not reported. 
       if (0 == status.getFinishTime()){
-        status.setFinishTime(System.currentTimeMillis());
+        status.setFinishTime(jobtracker.getClock().getTime());
       }
     }
 
@@ -723,7 +747,7 @@ class TaskInProgress {
     //
 
     this.completes++;
-    this.execFinishTime = System.currentTimeMillis();
+    this.execFinishTime = jobtracker.getClock().getTime();
     recomputeProgress();
     
   }
@@ -762,7 +786,7 @@ class TaskInProgress {
     }
     this.failed = true;
     killed = true;
-    this.execFinishTime = System.currentTimeMillis();
+    this.execFinishTime = jobtracker.getClock().getTime();
     recomputeProgress();
   }
 
@@ -860,35 +884,39 @@ class TaskInProgress {
   }
     
   /**
-   * Return whether the TIP has a speculative task to run.  We
-   * only launch a speculative task if the current TIP is really
-   * far behind, and has been behind for a non-trivial amount of 
-   * time.
+   * Can this task be speculated? This requires that the task isn't finished
+   * or about to commit, isn't already being speculatively executed, has been
+   * running for at least SPECULATIVE_LAG since it was dispatched, and is
+   * progressing significantly more slowly than its peers.
+   * 
+   * Added for use by queue scheduling algorithms.
+   * @param currentTime the current time, in milliseconds
    */
-  boolean hasSpeculativeTask(long currentTime, double averageProgress) {
-    //
-    // REMIND - mjc - these constants should be examined
-    // in more depth eventually...
-    //
-      
-    if (!skipping && activeTasks.size() <= MAX_TASK_EXECS &&
-        (averageProgress - progress >= SPECULATIVE_GAP) &&
-        (currentTime - startTime >= SPECULATIVE_LAG) 
-        && completes == 0 && !isOnlyCommitPending()) {
-      return true;
+  boolean canBeSpeculated(long currentTime) {
+    DataStatistics taskStats = job.getRunningTaskStatistics(isMapTask());
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("activeTasks.size(): " + activeTasks.size() + " "
+          + activeTasks.firstKey() + " task's progressrate: " + 
+          getCurrentProgressRate(currentTime) + 
+          " taskStats : " + taskStats);
     }
-    return false;
+    return (!skipping && isRunnable() && isRunning() &&
+        activeTasks.size() <= MAX_TASK_EXECS &&
+        currentTime - dispatchTime >= SPECULATIVE_LAG &&
+        completes == 0 && !isOnlyCommitPending() &&
+        (taskStats.mean() - getCurrentProgressRate(currentTime) >
+              taskStats.std() * job.getSlowTaskThreshold()));
   }
-    
+  
+  /**
+   * Is the task currently speculating?
+   */
+  boolean isSpeculating() {
+   return (activeTasks.size() > MAX_TASK_EXECS);
+  }
+  
   /**
    * Return a Task that can be sent to a TaskTracker for execution.
    */
-  public Task getTaskToRun(String taskTracker) throws IOException {
-    if (0 == execStartTime){
-      // assume task starts running now
-      execStartTime = System.currentTimeMillis();
-    }
-
+  public Task getTaskToRun(String taskTracker) throws IOException {   
     // Create the 'taskid'; do not count the 'killed' tasks against the job!
     TaskAttemptID taskid = null;
     if (nextTaskId < (MAX_TASK_EXECS + maxTaskAttempts + numKilledTasks)) {
@@ -903,6 +931,16 @@ class TaskInProgress {
       return null;
     }
 
+    //keep track of the last time we started an attempt at this TIP
+    //used to calculate the progress rate of this TIP
+    setDispatchTime(jobtracker.getClock().getTime());
+ 
+    //set this the first time we run a task attempt in this TIP;
+    //each task attempt has its own TaskStatus, which tracks that
+    //attempt's execStartTime, so this start time is TIP-wide.
+    if (0 == execStartTime){
+      setExecStartTime(dispatchTime);
+    }
     return addRunningTask(taskid, taskTracker);
   }
   
@@ -1083,6 +1121,34 @@ class TaskInProgress {
     rawSplit.clearBytes();
   }
   
+  /**
+   * Compare the most recent task attempt's dispatch time to the current time so
+   * that the task's progress rate slows down as time proceeds even if no progress
+   * is reported for the task. This allows speculative tasks to be launched for
+   * tasks on slow/dead TTs before we realize the TT is dead/slow. Clock skew isn't
+   * an issue since both times are taken from the JobTracker's perspective.
+   * @return the progress rate of the active task attempt that is doing best
+   */
+  public double getCurrentProgressRate(long currentTime) {
+    double bestProgressRate = 0;
+    for (TaskStatus ts : taskStatuses.values()){
+      double progressRate = ts.getProgress()/Math.max(1,
+          currentTime - dispatchTime);
+      if ((ts.getRunState() == TaskStatus.State.RUNNING  || 
+          ts.getRunState() == TaskStatus.State.SUCCEEDED) &&
+          progressRate > bestProgressRate){
+        bestProgressRate = progressRate;
+      }
+    }
+    return bestProgressRate;
+  }
+  
+  private void setProgressRate(double rate) {
+    oldProgressRate = rate;
+  }
+  private double getOldProgressRate() {
+    return oldProgressRate;
+  }
   /**
    * This class keeps the records to be skipped during further executions 
    * based on failed records from all the previous attempts.
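The heuristic in canBeSpeculated() above reduces to two quantities: the TIP's current progress rate (best attempt's progress divided by time since dispatch, per getCurrentProgressRate()) and the job-wide mean and standard deviation of running-task rates kept in JobInProgress.DataStatistics. A minimal sketch of that comparison, assuming a simple running mean/std container (DataStats below is a hypothetical stand-in for DataStatistics, which is not shown in this diff):

// Illustrative sketch only; mirrors the condition in canBeSpeculated().
class DataStats {
  private double count, sum, sumSquares;
  void add(double x) { count++; sum += x; sumSquares += x * x; }
  double mean() { return count == 0 ? 0 : sum / count; }
  double std() {
    if (count == 0) return 0;
    double m = mean();
    return Math.sqrt(Math.max(0, sumSquares / count - m * m));
  }
}

class SpeculationCheck {
  // progressRate ~ progress / max(1, now - dispatchTime), as in getCurrentProgressRate()
  static boolean looksSlow(DataStats peerStats, double progressRate,
                           double slowTaskThreshold) {
    // speculate when this task's rate falls more than `slowTaskThreshold`
    // standard deviations below the mean rate of its running peers
    return peerStats.mean() - progressRate
        > peerStats.std() * slowTaskThreshold;
  }

  public static void main(String[] args) {
    DataStats stats = new DataStats();
    // peer tasks progressing at roughly 1% per second
    stats.add(0.010); stats.add(0.011); stats.add(0.009); stats.add(0.010);
    System.out.println(looksSlow(stats, 0.002, 1.0));   // true: a straggler
    System.out.println(looksSlow(stats, 0.0095, 1.0));  // false: within one std of the mean
  }
}

The other guards in canBeSpeculated() (SPECULATIVE_LAG since dispatch, no completed or commit-pending attempt, and at most MAX_TASK_EXECS running attempts) are omitted here for brevity.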

+ 7 - 5
src/mapred/org/apache/hadoop/mapred/TaskStatus.java

@@ -50,7 +50,7 @@ abstract class TaskStatus implements Writable, Cloneable {
   private String stateString;
   private String taskTracker;
     
-  private long startTime; 
+  private long startTime; //in ms
   private long finishTime; 
   private long outputSize;
     
@@ -81,7 +81,9 @@ abstract class TaskStatus implements Writable, Cloneable {
   public TaskAttemptID getTaskID() { return taskid; }
   public abstract boolean getIsMap();
   public float getProgress() { return progress; }
-  public void setProgress(float progress) { this.progress = progress; } 
+  public void setProgress(float progress) {
+    this.progress = progress;
+  } 
   public State getRunState() { return runState; }
   public String getTaskTracker() {return taskTracker;}
   public void setTaskTracker(String tracker) { this.taskTracker = tracker;}
@@ -279,7 +281,7 @@ abstract class TaskStatus implements Writable, Cloneable {
   public List<TaskAttemptID> getFetchFailedMaps() {
     return null;
   }
-  
+
   /**
    * Add to the list of maps from which output-fetches failed.
    *  
@@ -310,7 +312,7 @@ abstract class TaskStatus implements Writable, Cloneable {
    * @param status updated status
    */
   synchronized void statusUpdate(TaskStatus status) {
-    this.progress = status.getProgress();
+    setProgress(status.getProgress());
     this.runState = status.getRunState();
     this.stateString = status.getStateString();
     this.nextRecordRange = status.getNextRecordRange();
@@ -397,7 +399,7 @@ abstract class TaskStatus implements Writable, Cloneable {
 
   public void readFields(DataInput in) throws IOException {
     this.taskid.readFields(in);
-    this.progress = in.readFloat();
+    setProgress(in.readFloat());
     this.runState = WritableUtils.readEnum(in, State.class);
     this.diagnosticInfo = Text.readString(in);
     this.stateString = Text.readString(in);

+ 6 - 0
src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java

@@ -192,6 +192,12 @@ class TaskTrackerStatus implements Writable {
     taskReports = new ArrayList<TaskStatus>();
     resStatus = new ResourceStatus();
   }
+  
+  public TaskTrackerStatus(String trackerName, String host) {
+    this();
+    this.trackerName = trackerName;
+    this.host = host;
+  }
 
   /**
    */

+ 1 - 1
src/test/mapred/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java

@@ -45,7 +45,7 @@ public class TestJobQueueTaskScheduler extends TestCase {
     
     public FakeJobInProgress(JobConf jobConf,
         FakeTaskTrackerManager taskTrackerManager) throws IOException {
-      super(new JobID("test", ++jobCounter), jobConf);
+      super(new JobID("test", ++jobCounter), jobConf, null);
       this.taskTrackerManager = taskTrackerManager;
       this.startTime = System.currentTimeMillis();
       this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP);

+ 1 - 1
src/test/mapred/org/apache/hadoop/mapred/TestParallelInitialization.java

@@ -42,7 +42,7 @@ public class TestParallelInitialization extends TestCase {
    
     public FakeJobInProgress(JobConf jobConf,
         FakeTaskTrackerManager taskTrackerManager) throws IOException {
-      super(new JobID("test", ++jobCounter), jobConf);
+      super(new JobID("test", ++jobCounter), jobConf, null);
       this.startTime = System.currentTimeMillis();
       this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP);
       this.status.setJobPriority(JobPriority.NORMAL);

+ 2 - 2
src/test/mapred/org/apache/hadoop/mapred/TestResourceEstimation.java

@@ -32,7 +32,7 @@ public class TestResourceEstimation extends TestCase {
     jc.setNumMapTasks(maps);
     jc.setNumReduceTasks(reduces);
     
-    JobInProgress jip = new JobInProgress(jid, jc);
+    JobInProgress jip = new JobInProgress(jid, jc, null);
     //unfortunately, we can't set job input size from here.
     ResourceEstimator re = new ResourceEstimator(jip);
     
@@ -64,7 +64,7 @@ public class TestResourceEstimation extends TestCase {
     jc.setNumMapTasks(maps);
     jc.setNumReduceTasks(reduces);
     
-    JobInProgress jip = new JobInProgress(jid, jc) {
+    JobInProgress jip = new JobInProgress(jid, jc, null) {
       long getInputLength() {
         return singleMapInputSize*desiredMaps();
       }

+ 348 - 0
src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java

@@ -0,0 +1,348 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.mapred.TaskStatus.Phase;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.hadoop.mapred.UtilsForTests.FakeClock;
+
+import junit.extensions.TestSetup;
+import junit.framework.Test;
+import junit.framework.TestCase;
+import junit.framework.TestSuite;
+
+public class TestSpeculativeExecution extends TestCase {
+
+  FakeJobInProgress job;
+  static FakeJobTracker jobTracker;
+  static String jtIdentifier = "test";
+  private static int jobCounter;
+  static class SpecFakeClock extends FakeClock {
+    long SPECULATIVE_LAG = TaskInProgress.SPECULATIVE_LAG;
+    @Override
+    public void advance(long millis) {
+      time += millis + SPECULATIVE_LAG;
+    }
+  };
+  static SpecFakeClock clock;
+  
+  static String trackers[] = new String[] {"tracker_tracker1:1000", 
+      "tracker_tracker2:1000", "tracker_tracker3:1000",
+      "tracker_tracker4:1000", "tracker_tracker5:1000"};
+
+  public static Test suite() {
+    TestSetup setup = 
+      new TestSetup(new TestSuite(TestSpeculativeExecution.class)) {
+      protected void setUp() throws Exception {
+        JobConf conf = new JobConf();
+        conf.set("mapred.job.tracker", "localhost:0");
+        conf.set("mapred.job.tracker.http.address", "0.0.0.0:0");
+        jobTracker = new FakeJobTracker(conf, (clock = new SpecFakeClock()));
+        for (String tracker : trackers) {
+          jobTracker.heartbeat(new TaskTrackerStatus(tracker,
+              JobInProgress.convertTrackerNameToHostName(tracker)), false, 
+              true, false, (short)0);
+        }
+      }
+      protected void tearDown() throws Exception {
+        //delete the build/test/logs/ dir
+      }
+    };
+    return setup;
+  }
+  
+  /*
+   * This class is required mainly to check the speculative cap
+   * based on cluster size
+   */
+  static class FakeJobTracker extends JobTracker {
+    //initialize max{Map/Reduce} task capacities to twice the cluster size
+    int totalSlots = trackers.length * 4;
+    FakeJobTracker(JobConf conf, Clock clock) throws IOException, 
+    InterruptedException {
+      super(conf, clock);
+    }
+    @Override
+    public ClusterStatus getClusterStatus(boolean detailed) {
+      return new ClusterStatus(trackers.length,
+          0, 0, 0, 0, totalSlots/2, totalSlots/2, JobTracker.State.RUNNING, 0);
+    }
+    public void setNumSlots(int totalSlots) {
+      this.totalSlots = totalSlots;
+    }
+  }
+
+  static class FakeJobInProgress extends JobInProgress {
+    JobClient.RawSplit[] rawSplits;
+    FakeJobInProgress(JobConf jobConf, JobTracker tracker) throws IOException {
+      super(new JobID(jtIdentifier, ++jobCounter), jobConf, tracker);
+      //initObjects(tracker, numMaps, numReduces);
+    }
+    @Override
+    public synchronized void initTasks() throws IOException {
+      maps = new TaskInProgress[numMapTasks];
+      for (int i = 0; i < numMapTasks; i++) {
+        JobClient.RawSplit split = new JobClient.RawSplit();
+        split.setLocations(new String[0]);
+        maps[i] = new TaskInProgress(getJobID(), "test", 
+            split, jobtracker, getJobConf(), this, i);
+        nonLocalMaps.add(maps[i]);
+      }
+      reduces = new TaskInProgress[numReduceTasks];
+      for (int i = 0; i < numReduceTasks; i++) {
+        reduces[i] = new TaskInProgress(getJobID(), "test", 
+                                        numMapTasks, i, 
+                                        jobtracker, getJobConf(), this);
+        nonRunningReduces.add(reduces[i]);
+      }
+    }
+    private TaskAttemptID findTask(String trackerName, String trackerHost,
+        Collection<TaskInProgress> nonRunningTasks, 
+        Collection<TaskInProgress> runningTasks)
+    throws IOException {
+      TaskInProgress tip = null;
+      Iterator<TaskInProgress> iter = nonRunningTasks.iterator();
+      //look for a non-running task first
+      while (iter.hasNext()) {
+        TaskInProgress t = iter.next();
+        if (t.isRunnable() && !t.isRunning()) {
+          runningTasks.add(t);
+          iter.remove();
+          tip = t;
+          break;
+        }
+      }
+      if (tip == null) {
+        if (getJobConf().getSpeculativeExecution()) {
+          tip = findSpeculativeTask(runningTasks, trackerName, trackerHost);
+        }
+      }
+      if (tip != null) {
+        TaskAttemptID tId = tip.getTaskToRun(trackerName).getTaskID();
+        if (tip.isMapTask()) {
+          scheduleMap(tip);
+        } else {
+          scheduleReduce(tip);
+        }
+        //Set it to RUNNING
+        makeRunning(tId, tip, trackerName);
+        return tId;
+      }
+      return null;
+    }
+    public TaskAttemptID findMapTask(String trackerName)
+    throws IOException {
+      return findTask(trackerName, 
+          JobInProgress.convertTrackerNameToHostName(trackerName),
+          nonLocalMaps, nonLocalRunningMaps);
+    }
+    public TaskAttemptID findReduceTask(String trackerName) 
+    throws IOException {
+      return findTask(trackerName, 
+          JobInProgress.convertTrackerNameToHostName(trackerName),
+          nonRunningReduces, runningReduces);
+    }
+    public void finishTask(TaskAttemptID taskId) {
+      TaskInProgress tip = jobtracker.taskidToTIPMap.get(taskId);
+      TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), taskId, 
+          1.0f, TaskStatus.State.SUCCEEDED, "", "", tip.machineWhereTaskRan(taskId), 
+          tip.isMapTask() ? Phase.MAP : Phase.REDUCE, new Counters());
+      updateTaskStatus(tip, status);
+    }
+    private void makeRunning(TaskAttemptID taskId, TaskInProgress tip, 
+        String taskTracker) {
+      addRunningTaskToTIP(tip, taskId, new TaskTrackerStatus(taskTracker,
+          JobInProgress.convertTrackerNameToHostName(taskTracker)), true);
+      TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), taskId, 
+          0.0f, TaskStatus.State.RUNNING, "", "", taskTracker,
+          tip.isMapTask() ? Phase.MAP : Phase.REDUCE, new Counters());
+      updateTaskStatus(tip, status);
+    }
+    public void progressMade(TaskAttemptID taskId, float progress) {
+      TaskInProgress tip = jobtracker.taskidToTIPMap.get(taskId);
+      TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), taskId, 
+          progress, TaskStatus.State.RUNNING, "", "", tip.machineWhereTaskRan(taskId), 
+          tip.isMapTask() ? Phase.MAP : Phase.REDUCE, new Counters());
+      updateTaskStatus(tip, status);
+    }
+  }
+
+  public void testIsSlowTracker() throws IOException {
+    TaskAttemptID[] taskAttemptID = new TaskAttemptID[20];
+    JobConf conf = new JobConf();
+    conf.setSpeculativeExecution(true);
+    conf.setNumMapTasks(10);
+    conf.setNumReduceTasks(0);
+    FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);    
+    job.initTasks();
+    //schedule some tasks
+    taskAttemptID[0] = job.findMapTask(trackers[0]);
+    taskAttemptID[1] = job.findMapTask(trackers[0]);
+    taskAttemptID[2] = job.findMapTask(trackers[0]);
+    taskAttemptID[3] = job.findMapTask(trackers[1]);
+    taskAttemptID[4] = job.findMapTask(trackers[1]);
+    taskAttemptID[5] = job.findMapTask(trackers[1]);
+    taskAttemptID[6] = job.findMapTask(trackers[2]);
+    taskAttemptID[7] = job.findMapTask(trackers[2]);
+    taskAttemptID[8] = job.findMapTask(trackers[2]);
+    clock.advance(1000);
+    //Some tasks finish in 1 second (on trackers[0])
+    job.finishTask(taskAttemptID[0]);
+    job.finishTask(taskAttemptID[1]);
+    job.finishTask(taskAttemptID[2]);
+    clock.advance(1000);
+    //Some tasks finish in 2 second (on trackers[1])
+    job.finishTask(taskAttemptID[3]);
+    job.finishTask(taskAttemptID[4]);
+    job.finishTask(taskAttemptID[5]);
+    assertEquals("Tracker "+ trackers[0] + " expected to be not slow ",
+        job.isSlowTracker(trackers[0]), false);
+    clock.advance(100000);
+    //After a long time, some tasks finished on trackers[2]
+    job.finishTask(taskAttemptID[6]);
+    job.finishTask(taskAttemptID[7]);
+    job.finishTask(taskAttemptID[8]);
+    assertEquals("Tracker "+ trackers[2] + " expected to be slow ",
+        job.isSlowTracker(trackers[2]), true);
+  }
+  
+  public void testTaskToSpeculate() throws IOException {
+    TaskAttemptID[] taskAttemptID = new TaskAttemptID[6];
+    JobConf conf = new JobConf();
+    conf.setSpeculativeExecution(true);
+    conf.setNumMapTasks(5);
+    conf.setNumReduceTasks(5);
+    conf.setFloat("mapred.speculative.execution.slowTaskThreshold", 0.5f);
+    FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);    
+    job.initTasks();
+    //schedule reduces
+    taskAttemptID[0] = job.findReduceTask(trackers[0]);
+    taskAttemptID[1] = job.findReduceTask(trackers[1]);
+    taskAttemptID[2] = job.findReduceTask(trackers[2]);
+    taskAttemptID[3] = job.findReduceTask(trackers[3]);
+    taskAttemptID[4] = job.findReduceTask(trackers[3]);
+    clock.advance(5000);
+    job.finishTask(taskAttemptID[0]);
+    clock.advance(1000);
+    job.finishTask(taskAttemptID[1]);
+    clock.advance(20000);
+    //we should get a speculative task now
+    taskAttemptID[5] = job.findReduceTask(trackers[4]);
+    assertEquals(taskAttemptID[5].getTaskID().getId(),2);
+    clock.advance(5000);
+    job.finishTask(taskAttemptID[5]);
+    
+    taskAttemptID[5] = job.findReduceTask(trackers[4]);
+    assertEquals(taskAttemptID[5].getTaskID().getId(),3);
+    
+  }
+  
+  /*
+   * Tests the fact that we choose tasks with lesser progress
+   * among the possible candidates for speculation
+   */
+  public void testTaskLATEScheduling() throws IOException {
+    TaskAttemptID[] taskAttemptID = new TaskAttemptID[20];
+    JobConf conf = new JobConf();
+    conf.setSpeculativeExecution(true);
+    conf.setNumMapTasks(5);
+    conf.setNumReduceTasks(0);
+    conf.setFloat("mapred.speculative.execution.slowTaskThreshold", 0.5f);
+    FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
+    job.initTasks();
+
+    taskAttemptID[0] = job.findMapTask(trackers[0]);
+    taskAttemptID[1] = job.findMapTask(trackers[1]);
+    taskAttemptID[2] = job.findMapTask(trackers[2]);
+    taskAttemptID[3] = job.findMapTask(trackers[3]);
+    clock.advance(2000);
+    job.finishTask(taskAttemptID[0]);
+    job.finishTask(taskAttemptID[1]);
+    job.finishTask(taskAttemptID[2]);
+    clock.advance(28000);
+    taskAttemptID[4] = job.findMapTask(trackers[3]);
+    clock.advance(5000);
+    //by doing the above clock adjustments, we bring the progress rate of
+    //taskID 3 below that of taskID 4. For taskID 3 the rate is 0.85/35000
+    //and for taskID 4 it is 0.20/5000. But when we ask for a spec task
+    //now, we should get back taskID 4 (since that is the one expected to
+    //complete later than taskID 3).
+    job.progressMade(taskAttemptID[3], 0.85f);
+    job.progressMade(taskAttemptID[4], 0.20f);
+    taskAttemptID[5] = job.findMapTask(trackers[4]);
+    assertEquals(taskAttemptID[5].getTaskID().getId(),4);
+  }
+  
+  /*
+   * Tests the fact that we only launch a limited number of speculative tasks,
+   * even though we have a lot of tasks in RUNNING state
+   */
+  public void testAtSpeculativeCap() throws IOException {
+    //The expression evaluated to determine whether atSpeculativeCap should
+    //return true or false is
+    //(#speculative-tasks < max(10, 0.01*#slots, 0.1*#running-tasks))
+    
+    //Tests the fact that the max tasks launched is 0.1 * #running-tasks
+    assertEquals(speculativeCap(1200,800,20), 40);
+    //Tests the fact that the max tasks launched is 10
+    assertEquals(speculativeCap(1200,1150,20), 10);
+    //Tests the fact that the max tasks launched is 0.01 * #slots
+    assertEquals(speculativeCap(1200,1150,2000), 20);
+  }
+  
+  private int speculativeCap(int totalTasks, int numEarlyComplete, int slots)
+  throws IOException {
+    TaskAttemptID[] taskAttemptID = new TaskAttemptID[1500];
+    JobConf conf = new JobConf();
+    conf.setSpeculativeExecution(true);
+    conf.setNumMapTasks(totalTasks);
+    conf.setNumReduceTasks(0);
+    jobTracker.setNumSlots(slots);
+    FakeJobInProgress job = new FakeJobInProgress(conf, jobTracker);
+    job.initTasks();
+    int i;
+    for (i = 0; i < totalTasks; i++) {
+      taskAttemptID[i] = job.findMapTask(trackers[0]);
+    }
+    clock.advance(5000);
+    for (i = 0; i < numEarlyComplete; i++) {
+      job.finishTask(taskAttemptID[i]);
+    }
+    for (i = numEarlyComplete; i < totalTasks; i++) {
+      job.progressMade(taskAttemptID[i], 0.85f);
+    }
+    clock.advance(50000);
+    for (i = 0; i < (totalTasks - numEarlyComplete); i++) {
+      taskAttemptID[i] = job.findMapTask(trackers[1]);
+      clock.advance(2000);
+      if (taskAttemptID[i] != null) {
+        //add some good progress constantly for the different task-attempts so that
+        //the tasktracker doesn't get into the slow trackers category
+        job.progressMade(taskAttemptID[i], 0.99f);
+      } else {
+        break;
+      }
+    }
+    return i;
+  }
+}
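
For reference, the cap exercised by testAtSpeculativeCap() can be checked by hand. A small sketch, assuming the cap really is max(10, 0.01*#slots, 0.1*#running-tasks) as the comment in that test states (the JobInProgress-side atSpeculativeCap logic is not part of this file):

// Illustrative only; reproduces the values asserted in testAtSpeculativeCap().
public class SpeculativeCapCheck {
  static int cap(int runningTasks, int slots) {
    return (int) Math.max(10, Math.max(0.01 * slots, 0.1 * runningTasks));
  }
  public static void main(String[] args) {
    System.out.println(cap(1200 - 800, 20));     // 40: 0.1 * #running-tasks dominates
    System.out.println(cap(1200 - 1150, 20));    // 10: the floor of 10 dominates
    System.out.println(cap(1200 - 1150, 2000));  // 20: 0.01 * #slots dominates
  }
}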

+ 11 - 4
src/test/mapred/org/apache/hadoop/mapred/UtilsForTests.java

@@ -24,9 +24,6 @@ import java.io.DataOutputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.io.FileNotFoundException;
 import java.text.DecimalFormat;
 import java.util.Arrays;
 import java.util.Enumeration;
@@ -35,7 +32,6 @@ import java.util.Properties;
 
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.examples.RandomWriter;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSTestUtil;
@@ -650,7 +646,18 @@ public class UtilsForTests {
 
     return job;
   }
+  static class FakeClock extends Clock {
+    long time = 0;
+    
+    public void advance(long millis) {
+      time += millis;
+    }
 
+    @Override
+    long getTime() {
+      return time;
+    }
+  }
   // Mapper that fails
   static class FailMapper extends MapReduceBase implements
       Mapper<WritableComparable, Writable, WritableComparable, Writable> {