瀏覽代碼

Fixed bad commit for MAPREDUCE-901.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1157454 13f79535-47bb-0310-9956-ffa450edef68
Arun Murthy 13 年之前
父節點
當前提交
b2b976a03b

+ 0 - 5
mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -1251,11 +1251,6 @@ public class JobInProgress {
    * @return the job-level counters.
    */
   public synchronized Counters getJobCounters() {
-    try {
-      throw new IOException("");
-    } catch (IOException ioe) {
-      LOG.info("getJC", ioe);
-    }
     return jobCounters;
   }
   

+ 0 - 3729
mapreduce/src/java/org/apache/hadoop/mapred/JobInProgress.java.orig

@@ -1,3729 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.net.UnknownHostException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
-import java.util.EnumMap;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.IdentityHashMap;
-import java.util.Iterator;
-import java.util.LinkedHashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.Vector;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobCounter;
-import org.apache.hadoop.mapreduce.JobSubmissionFiles;
-import org.apache.hadoop.mapreduce.MRJobConfig;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.jobhistory.JobFinishedEvent;
-import org.apache.hadoop.mapreduce.jobhistory.JobHistory;
-import org.apache.hadoop.mapreduce.jobhistory.JobInfoChangeEvent;
-import org.apache.hadoop.mapreduce.jobhistory.JobInitedEvent;
-import org.apache.hadoop.mapreduce.jobhistory.JobPriorityChangeEvent;
-import org.apache.hadoop.mapreduce.jobhistory.JobStatusChangedEvent;
-import org.apache.hadoop.mapreduce.jobhistory.JobSubmittedEvent;
-import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
-import org.apache.hadoop.mapreduce.jobhistory.MapAttemptFinishedEvent;
-import org.apache.hadoop.mapreduce.jobhistory.ReduceAttemptFinishedEvent;
-import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptStartedEvent;
-import org.apache.hadoop.mapreduce.jobhistory.TaskAttemptUnsuccessfulCompletionEvent;
-import org.apache.hadoop.mapreduce.jobhistory.TaskFailedEvent;
-import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;
-import org.apache.hadoop.mapreduce.jobhistory.TaskStartedEvent;
-import org.apache.hadoop.mapreduce.security.TokenCache;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
-import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
-import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
-import org.apache.hadoop.mapreduce.split.JobSplit;
-import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader;
-import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
-import org.apache.hadoop.mapreduce.task.JobContextImpl;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.NetworkTopology;
-import org.apache.hadoop.net.Node;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.StringUtils;
-
-/**
- * JobInProgress maintains all the info for keeping a Job on the straight and
- * narrow. It keeps its JobProfile and its latest JobStatus, plus a set of
- * tables for doing bookkeeping of its Tasks.
- */
-@InterfaceAudience.LimitedPrivate({"MapReduce"})
-@InterfaceStability.Unstable
-public class JobInProgress {
-  /**
-   * Used when the a kill is issued to a job which is initializing.
-   */
-  static class KillInterruptedException extends InterruptedException {
-   private static final long serialVersionUID = 1L;
-    public KillInterruptedException(String msg) {
-      super(msg);
-    }
-  }
-  
-  static final Log LOG = LogFactory.getLog(JobInProgress.class);
-    
-  JobProfile profile;
-  JobStatus status;
-  Path jobFile = null;
-  Path localJobFile = null;
-
-  TaskInProgress maps[] = new TaskInProgress[0];
-  TaskInProgress reduces[] = new TaskInProgress[0];
-  TaskInProgress cleanup[] = new TaskInProgress[0];
-  TaskInProgress setup[] = new TaskInProgress[0];
-  int numMapTasks = 0;
-  int numReduceTasks = 0;
-  final long memoryPerMap;
-  final long memoryPerReduce;
-  volatile int numSlotsPerMap = 1;
-  volatile int numSlotsPerReduce = 1;
-  final int maxTaskFailuresPerTracker;
-  
-  // Counters to track currently running/finished/failed Map/Reduce task-attempts
-  int runningMapTasks = 0;
-  int runningReduceTasks = 0;
-  int finishedMapTasks = 0;
-  int finishedReduceTasks = 0;
-  int failedMapTasks = 0; 
-  int failedReduceTasks = 0;
-  
-  static final float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
-  int completedMapsForReduceSlowstart = 0;
-  
-  // runningMapTasks include speculative tasks, so we need to capture 
-  // speculative tasks separately 
-  int speculativeMapTasks = 0;
-  int speculativeReduceTasks = 0;
-  
-  int mapFailuresPercent = 0;
-  int reduceFailuresPercent = 0;
-  int failedMapTIPs = 0;
-  int failedReduceTIPs = 0;
-  private volatile boolean launchedCleanup = false;
-  private volatile boolean launchedSetup = false;
-  private volatile boolean jobKilled = false;
-  private volatile boolean jobFailed = false;
-  private final boolean jobSetupCleanupNeeded;
-  private final boolean taskCleanupNeeded;
-
-  JobPriority priority = JobPriority.NORMAL;
-  protected JobTracker jobtracker;
-  
-  protected Credentials tokenStorage;
-  
-  JobHistory jobHistory;
-
-  // NetworkTopology Node to the set of TIPs
-  Map<Node, List<TaskInProgress>> nonRunningMapCache;
-  
-  // Map of NetworkTopology Node to set of running TIPs
-  Map<Node, Set<TaskInProgress>> runningMapCache;
-
-  // A list of non-local non-running maps
-  List<TaskInProgress> nonLocalMaps;
-
-  // A set of non-local running maps
-  Set<TaskInProgress> nonLocalRunningMaps;
-
-  // A list of non-running reduce TIPs
-  List<TaskInProgress> nonRunningReduces;
-
-  // A set of running reduce TIPs
-  Set<TaskInProgress> runningReduces;
-  
-  // A list of cleanup tasks for the map task attempts, to be launched
-  List<TaskAttemptID> mapCleanupTasks = new LinkedList<TaskAttemptID>();
-  
-  // A list of cleanup tasks for the reduce task attempts, to be launched
-  List<TaskAttemptID> reduceCleanupTasks = new LinkedList<TaskAttemptID>();
-
-  int maxLevel;
-
-  /**
-   * A special value indicating that 
-   * {@link #findNewMapTask(TaskTrackerStatus, int, int, int, double)} should
-   * schedule any available map tasks for this job, including speculative tasks.
-   */
-  int anyCacheLevel;
-  
-  /**
-   * A special value indicating that 
-   * {@link #findNewMapTask(TaskTrackerStatus, int, int, int, double)} should
-   * schedule any only off-switch and speculative map tasks for this job.
-   */
-  private static final int NON_LOCAL_CACHE_LEVEL = -1;
-
-  private int taskCompletionEventTracker = 0; 
-  List<TaskCompletionEvent> taskCompletionEvents;
-    
-  // The maximum percentage of trackers in cluster added to the 'blacklist'.
-  private static final double CLUSTER_BLACKLIST_PERCENT = 0.25;
-  
-  // The maximum percentage of fetch failures allowed for a map 
-  private static final double MAX_ALLOWED_FETCH_FAILURES_PERCENT = 0.5;
-  
-  // No. of tasktrackers in the cluster
-  private volatile int clusterSize = 0;
-  
-  // The no. of tasktrackers where >= conf.getMaxTaskFailuresPerTracker()
-  // tasks have failed
-  private volatile int flakyTaskTrackers = 0;
-  // Map of trackerHostName -> no. of task failures
-  private Map<String, Integer> trackerToFailuresMap = 
-    new TreeMap<String, Integer>();
-    
-  //Confine estimation algorithms to an "oracle" class that JIP queries.
-  ResourceEstimator resourceEstimator; 
-  
-  long startTime;
-  long launchTime;
-  long finishTime;
-
-  // First *task launch times
-  final Map<TaskType, Long> firstTaskLaunchTimes =
-      new EnumMap<TaskType, Long>(TaskType.class);
-  
-  // Indicates how many times the job got restarted
-  private final int restartCount;
-
-  JobConf conf;
-  protected AtomicBoolean tasksInited = new AtomicBoolean(false);
-  private JobInitKillStatus jobInitKillStatus = new JobInitKillStatus();
-
-  LocalFileSystem localFs;
-  FileSystem fs;
-  String user;
-  JobID jobId;
-  volatile private boolean hasSpeculativeMaps;
-  volatile private boolean hasSpeculativeReduces;
-  long inputLength = 0;
-  
-  Counters jobCounters = new Counters();
-  
-  // Maximum no. of fetch-failure notifications after which map task is killed
-  private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;
-
-  // Don't lower speculativeCap below one TT's worth (for small clusters)
-  private static final int MIN_SPEC_CAP = 10;
-  
-  private static final float MIN_SLOTS_CAP = 0.01f;
-  
-  // Map of mapTaskId -> no. of fetch failures
-  private Map<TaskAttemptID, Integer> mapTaskIdToFetchFailuresMap =
-    new TreeMap<TaskAttemptID, Integer>();
-
-  private Object schedulingInfo;
-  private String submitHostName;
-  private String submitHostAddress;
-
-  //thresholds for speculative execution
-  float slowTaskThreshold;
-  float speculativeCap;
-  float slowNodeThreshold; //standard deviations
-
-  //Statistics are maintained for a couple of things
-  //mapTaskStats is used for maintaining statistics about
-  //the completion time of map tasks on the trackers. On a per
-  //tracker basis, the mean time for task completion is maintained
-  private DataStatistics mapTaskStats = new DataStatistics();
-  //reduceTaskStats is used for maintaining statistics about
-  //the completion time of reduce tasks on the trackers. On a per
-  //tracker basis, the mean time for task completion is maintained
-  private DataStatistics reduceTaskStats = new DataStatistics();
-  //trackerMapStats used to maintain a mapping from the tracker to the
-  //the statistics about completion time of map tasks
-  private Map<String,DataStatistics> trackerMapStats = 
-    new HashMap<String,DataStatistics>();
-  //trackerReduceStats used to maintain a mapping from the tracker to the
-  //the statistics about completion time of reduce tasks
-  private Map<String,DataStatistics> trackerReduceStats = 
-    new HashMap<String,DataStatistics>();
-  //runningMapStats used to maintain the RUNNING map tasks' statistics 
-  private DataStatistics runningMapTaskStats = new DataStatistics();
-  //runningReduceStats used to maintain the RUNNING reduce tasks' statistics
-  private DataStatistics runningReduceTaskStats = new DataStatistics();
- 
-  private static class FallowSlotInfo {
-    long timestamp;
-    int numSlots;
-    
-    public FallowSlotInfo(long timestamp, int numSlots) {
-      this.timestamp = timestamp;
-      this.numSlots = numSlots;
-    }
-
-    public long getTimestamp() {
-      return timestamp;
-    }
-
-    public void setTimestamp(long timestamp) {
-      this.timestamp = timestamp;
-    }
-
-    public int getNumSlots() {
-      return numSlots;
-    }
-
-    public void setNumSlots(int numSlots) {
-      this.numSlots = numSlots;
-    }
-  }
-  
-  private Map<TaskTracker, FallowSlotInfo> trackersReservedForMaps = 
-    new HashMap<TaskTracker, FallowSlotInfo>();
-  private Map<TaskTracker, FallowSlotInfo> trackersReservedForReduces = 
-    new HashMap<TaskTracker, FallowSlotInfo>();
-  private Path jobSubmitDir = null;
-  
-  /**
-   * Create an almost empty JobInProgress, which can be used only for tests
-   */
-  protected JobInProgress(JobID jobid, JobConf conf, JobTracker tracker) {
-    this.conf = conf;
-    this.jobId = jobid;
-    this.numMapTasks = conf.getNumMapTasks();
-    this.numReduceTasks = conf.getNumReduceTasks();
-    this.maxLevel = NetworkTopology.DEFAULT_HOST_LEVEL;
-    this.anyCacheLevel = this.maxLevel+1;
-    this.jobtracker = tracker;
-    this.restartCount = 0;
-    this.profile = new JobProfile(conf.getUser(), jobid, "", "", 
-                                  conf.getJobName(),conf.getQueueName());
-
-    this.memoryPerMap = conf.getMemoryForMapTask();
-    this.memoryPerReduce = conf.getMemoryForReduceTask();
-
-    this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
-
-    
-    hasSpeculativeMaps = conf.getMapSpeculativeExecution();
-    hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
-    this.nonLocalMaps = new LinkedList<TaskInProgress>();
-    this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
-    this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
-    this.nonRunningReduces = new LinkedList<TaskInProgress>();    
-    this.runningReduces = new LinkedHashSet<TaskInProgress>();
-    this.resourceEstimator = new ResourceEstimator(this);
-    this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP, 
-        this.profile.getUser(), this.profile.getJobName(), 
-        this.profile.getJobFile(), "");
-    this.jobtracker.getInstrumentation().addPrepJob(conf, jobid);
-    this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
-    (numMapTasks + numReduceTasks + 10);
-    
-    this.slowTaskThreshold = Math.max(0.0f,
-        conf.getFloat(MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD,1.0f));
-    this.speculativeCap = conf.getFloat(
-        MRJobConfig.SPECULATIVECAP,0.1f);
-    this.slowNodeThreshold = conf.getFloat(
-        MRJobConfig.SPECULATIVE_SLOWNODE_THRESHOLD,1.0f);
-    this.jobSetupCleanupNeeded = conf.getBoolean(
-        MRJobConfig.SETUP_CLEANUP_NEEDED, true);
-    this.taskCleanupNeeded = conf.getBoolean(
-        MRJobConfig.TASK_CLEANUP_NEEDED, true);
-    if (tracker != null) { // Some mock tests have null tracker
-      this.jobHistory = tracker.getJobHistory();
-    }
-    this.tokenStorage = null;
-  }
-  
-  JobInProgress(JobConf conf) {
-    restartCount = 0;
-    jobSetupCleanupNeeded = false;
-    taskCleanupNeeded = true;
-
-    this.memoryPerMap = conf.getMemoryForMapTask();
-    this.memoryPerReduce = conf.getMemoryForReduceTask();
-
-    this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
-  }
-  
-  /**
-   * Create a JobInProgress with the given job file, plus a handle
-   * to the tracker.
-   */
-  public JobInProgress(JobTracker jobtracker, 
-                       final JobConf default_conf, int rCount,
-                       JobInfo jobInfo,
-                       Credentials ts
-                      ) throws IOException, InterruptedException {
-    try {
-      this.restartCount = rCount;
-      this.jobId = JobID.downgrade(jobInfo.getJobID());
-      String url = "http://" + jobtracker.getJobTrackerMachine() + ":"
-          + jobtracker.getInfoPort() + "/jobdetails.jsp?jobid=" + this.jobId;
-      this.jobtracker = jobtracker;
-      this.jobHistory = jobtracker.getJobHistory();
-      this.startTime = System.currentTimeMillis();
-
-      this.localFs = jobtracker.getLocalFileSystem();
-      this.tokenStorage = ts;
-      // use the user supplied token to add user credentials to the conf
-      jobSubmitDir = jobInfo.getJobSubmitDir();
-      user = jobInfo.getUser().toString();
-
-      UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
-      if (ts != null) {
-        for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
-          ugi.addToken(token);
-        }
-      }
-
-      fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() {
-        public FileSystem run() throws IOException {
-          return jobSubmitDir.getFileSystem(default_conf);
-        }
-      });
-      this.localJobFile = default_conf.getLocalPath(JobTracker.SUBDIR + "/"
-          + this.jobId + ".xml");
-
-      jobFile = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
-      fs.copyToLocalFile(jobFile, localJobFile);
-      conf = new JobConf(localJobFile);
-      if (conf.getUser() == null) {
-        this.conf.setUser(user);
-      }
-      if (!conf.getUser().equals(user)) {
-        String desc = "The username " + conf.getUser() + " obtained from the "
-            + "conf doesn't match the username " + user + " the user "
-            + "authenticated as";
-        AuditLogger.logFailure(user, Operation.SUBMIT_JOB.name(),
-            conf.getUser(), jobId.toString(), desc);
-        throw new IOException(desc);
-      }
-
-      String userGroups[] = ugi.getGroupNames();
-      String primaryGroup = (userGroups.length > 0) ? userGroups[0] : null;
-      if (primaryGroup != null) {
-        conf.set("group.name", primaryGroup);
-      }
-
-      this.priority = conf.getJobPriority();
-      this.profile = new JobProfile(conf.getUser(), this.jobId, jobFile
-          .toString(), url, conf.getJobName(), conf.getQueueName());
-      this.status = new JobStatus(this.jobId, 0.0f, 0.0f, JobStatus.PREP,
-          profile.getUser(), profile.getJobName(), profile.getJobFile(),
-          profile.getURL().toString());
-      this.jobtracker.getInstrumentation().addPrepJob(conf, this.jobId);
-      status.setStartTime(startTime);
-      this.status.setJobPriority(this.priority);
-
-      this.numMapTasks = conf.getNumMapTasks();
-      this.numReduceTasks = conf.getNumReduceTasks();
-
-      this.memoryPerMap = conf.getMemoryForMapTask();
-      this.memoryPerReduce = conf.getMemoryForReduceTask();
-
-      this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>(
-          numMapTasks + numReduceTasks + 10);
-      JobContext jobContext = new JobContextImpl(conf, jobId);
-      this.jobSetupCleanupNeeded = jobContext.getJobSetupCleanupNeeded();
-      this.taskCleanupNeeded = jobContext.getTaskCleanupNeeded();
-
-      // Construct the jobACLs
-      status.setJobACLs(jobtracker.getJobACLsManager().constructJobACLs(conf));
-
-      this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
-      this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
-
-      this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
-
-      hasSpeculativeMaps = conf.getMapSpeculativeExecution();
-      hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
-      this.maxLevel = jobtracker.getNumTaskCacheLevels();
-      this.anyCacheLevel = this.maxLevel + 1;
-      this.nonLocalMaps = new LinkedList<TaskInProgress>();
-      this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
-      this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
-      this.nonRunningReduces = new LinkedList<TaskInProgress>();
-      this.runningReduces = new LinkedHashSet<TaskInProgress>();
-      this.resourceEstimator = new ResourceEstimator(this);
-      this.submitHostName = conf.getJobSubmitHostName();
-      this.submitHostAddress = conf.getJobSubmitHostAddress();
-
-      this.slowTaskThreshold = Math.max(0.0f, conf.getFloat(
-          MRJobConfig.SPECULATIVE_SLOWTASK_THRESHOLD, 1.0f));
-      this.speculativeCap = conf.getFloat(MRJobConfig.SPECULATIVECAP, 0.1f);
-      this.slowNodeThreshold = conf.getFloat(
-          MRJobConfig.SPECULATIVE_SLOWNODE_THRESHOLD, 1.0f);
-      // register job's tokens for renewal
-      DelegationTokenRenewal.registerDelegationTokensForRenewal(jobInfo
-          .getJobID(), ts, jobtracker.getConf());
-    } finally {
-      // close all FileSystems that was created above for the current user
-      // At this point, this constructor is called in the context of an RPC, and
-      // hence the "current user" is actually referring to the kerberos
-      // authenticated user (if security is ON).
-      FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
-    }
-  }
-    
-  private void printCache (Map<Node, List<TaskInProgress>> cache) {
-    LOG.info("The taskcache info:");
-    for (Map.Entry<Node, List<TaskInProgress>> n : cache.entrySet()) {
-      List <TaskInProgress> tips = n.getValue();
-      LOG.info("Cached TIPs on node: " + n.getKey());
-      for (TaskInProgress tip : tips) {
-        LOG.info("tip : " + tip.getTIPId());
-      }
-    }
-  }
-  
-  Map<Node, List<TaskInProgress>> createCache(
-                         TaskSplitMetaInfo[] splits, int maxLevel) {
-    Map<Node, List<TaskInProgress>> cache = 
-      new IdentityHashMap<Node, List<TaskInProgress>>(maxLevel);
-    
-    for (int i = 0; i < splits.length; i++) {
-      String[] splitLocations = splits[i].getLocations();
-      if (splitLocations.length == 0) {
-        nonLocalMaps.add(maps[i]);
-        continue;
-      }
-
-      for(String host: splitLocations) {
-        Node node = jobtracker.resolveAndAddToTopology(host);
-        LOG.info("tip:" + maps[i].getTIPId() + " has split on node:" + node);
-        for (int j = 0; j < maxLevel; j++) {
-          List<TaskInProgress> hostMaps = cache.get(node);
-          if (hostMaps == null) {
-            hostMaps = new ArrayList<TaskInProgress>();
-            cache.put(node, hostMaps);
-            hostMaps.add(maps[i]);
-          }
-          //check whether the hostMaps already contains an entry for a TIP
-          //This will be true for nodes that are racks and multiple nodes in
-          //the rack contain the input for a tip. Note that if it already
-          //exists in the hostMaps, it must be the last element there since
-          //we process one TIP at a time sequentially in the split-size order
-          if (hostMaps.get(hostMaps.size() - 1) != maps[i]) {
-            hostMaps.add(maps[i]);
-          }
-          node = node.getParent();
-        }
-      }
-    }
-    return cache;
-  }
-  
-  /**
-   * Check if the job has been initialized.
-   * @return <code>true</code> if the job has been initialized, 
-   *         <code>false</code> otherwise
-   */
-  public boolean inited() {
-    return tasksInited.get();
-  }
-  
-  /**
-   * Get the user for the job
-   */
-  public String getUser() {
-    return user;
-  }
-
-  boolean getMapSpeculativeExecution() {
-    return hasSpeculativeMaps;
-  }
-  
-  boolean getReduceSpeculativeExecution() {
-    return hasSpeculativeReduces;
-  }
-  
-  long getMemoryForMapTask() {
-    return memoryPerMap;
-  }
-  
-  long getMemoryForReduceTask() {
-    return memoryPerReduce;
-  }
-  
-  /**
-   * Get the number of slots required to run a single map task-attempt.
-   * @return the number of slots required to run a single map task-attempt
-   */
-  int getNumSlotsPerMap() {
-    return numSlotsPerMap;
-  }
-
-  /**
-   * Set the number of slots required to run a single map task-attempt.
-   * This is typically set by schedulers which support high-ram jobs.
-   * @param slots the number of slots required to run a single map task-attempt
-   */
-  void setNumSlotsPerMap(int numSlotsPerMap) {
-    this.numSlotsPerMap = numSlotsPerMap;
-  }
-
-  /**
-   * Get the number of slots required to run a single reduce task-attempt.
-   * @return the number of slots required to run a single reduce task-attempt
-   */
-  int getNumSlotsPerReduce() {
-    return numSlotsPerReduce;
-  }
-
-  /**
-   * Set the number of slots required to run a single reduce task-attempt.
-   * This is typically set by schedulers which support high-ram jobs.
-   * @param slots the number of slots required to run a single reduce 
-   *              task-attempt
-   */
-  void setNumSlotsPerReduce(int numSlotsPerReduce) {
-    this.numSlotsPerReduce = numSlotsPerReduce;
-  }
-
-  /**
-   * Construct the splits, etc.  This is invoked from an async
-   * thread so that split-computation doesn't block anyone. Only the 
-   * {@link JobTracker} should invoke this api. Look 
-   * at {@link JobTracker#initJob(JobInProgress)} for more details.
-   */
-  public synchronized void initTasks() 
-  throws IOException, KillInterruptedException, UnknownHostException {
-    if (tasksInited.get() || isComplete()) {
-      return;
-    }
-    synchronized(jobInitKillStatus){
-      if(jobInitKillStatus.killed || jobInitKillStatus.initStarted) {
-        return;
-      }
-      jobInitKillStatus.initStarted = true;
-    }
-
-    LOG.info("Initializing " + jobId);
-
-    logSubmissionToJobHistory();
-    
-    // log the job priority
-    setPriority(this.priority);
-    
-    //
-    // generate security keys needed by Tasks
-    //
-    generateAndStoreTokens();
-    
-    //
-    // read input splits and create a map per a split
-    //
-    TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(jobId);
-    numMapTasks = taskSplitMetaInfo.length;
-
-    checkTaskLimits();
-
-    // Sanity check the locations so we don't create/initialize unnecessary tasks
-    for (TaskSplitMetaInfo split : taskSplitMetaInfo) {
-      NetUtils.verifyHostnames(split.getLocations());
-    }
-
-    jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
-    jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
-
-    createMapTasks(jobFile.toString(), taskSplitMetaInfo);
-    
-    if (numMapTasks > 0) { 
-      nonRunningMapCache = createCache(taskSplitMetaInfo,
-          maxLevel);
-    }
-        
-    // set the launch time
-    this.launchTime = JobTracker.getClock().getTime();
-
-    createReduceTasks(jobFile.toString());
-    
-    // Calculate the minimum number of maps to be complete before 
-    // we should start scheduling reduces
-    completedMapsForReduceSlowstart = 
-      (int)Math.ceil(
-          (conf.getFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 
-                         DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART) * 
-           numMapTasks));
-    
-    initSetupCleanupTasks(jobFile.toString());
-    
-    synchronized(jobInitKillStatus){
-      jobInitKillStatus.initDone = true;
-      if(jobInitKillStatus.killed) {
-        //setup not launched so directly terminate
-        throw new KillInterruptedException("Job " + jobId + " killed in init");
-      }
-    }
-    
-    tasksInited.set(true);
-    JobInitedEvent jie = new JobInitedEvent(
-        profile.getJobID(),  this.launchTime,
-        numMapTasks, numReduceTasks,
-        JobStatus.getJobRunState(JobStatus.PREP));
-    
-    jobHistory.logEvent(jie, jobId);
-   
-    // Log the number of map and reduce tasks
-    LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks 
-             + " map tasks and " + numReduceTasks + " reduce tasks.");
-  }
-
-  // Returns true if the job is empty (0 maps, 0 reduces and no setup-cleanup)
-  // else return false.
-  synchronized boolean isJobEmpty() {
-    return maps.length == 0 && reduces.length == 0 && !jobSetupCleanupNeeded;
-  }
-  
-  synchronized boolean isSetupCleanupRequired() {
-   return jobSetupCleanupNeeded;
-  }
-
-  // Should be called once the init is done. This will complete the job 
-  // because the job is empty (0 maps, 0 reduces and no setup-cleanup).
-  synchronized void completeEmptyJob() {
-    jobComplete();
-  }
-
-  synchronized void completeSetup() {
-    setupComplete();
-  }
-
-  void logSubmissionToJobHistory() throws IOException {
-    // log job info
-    String username = conf.getUser();
-    if (username == null) { username = ""; }
-    String jobname = conf.getJobName();
-    String jobQueueName = conf.getQueueName();
-
-    setUpLocalizedJobConf(conf, jobId);
-    jobHistory.setupEventWriter(jobId, conf);
-    JobSubmittedEvent jse =
-        new JobSubmittedEvent(jobId, jobname, username, this.startTime,
-            jobFile.toString(), status.getJobACLs(), jobQueueName);
-    jobHistory.logEvent(jse, jobId);
-    
-  }
-
-  TaskSplitMetaInfo[] createSplits(org.apache.hadoop.mapreduce.JobID jobId) 
-  throws IOException {
-    TaskSplitMetaInfo[] allTaskSplitMetaInfo = 
-      SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, conf, jobSubmitDir);
-    return allTaskSplitMetaInfo;
-  }
-
-  /**
-   * If the number of taks is greater than the configured value
-   * throw an exception that will fail job initialization
-   */
-  void checkTaskLimits() throws IOException {
-    int maxTasks = jobtracker.getMaxTasksPerJob();
-    if (maxTasks > 0 && numMapTasks + numReduceTasks > maxTasks) {
-      throw new IOException(
-                "The number of tasks for this job " + 
-                (numMapTasks + numReduceTasks) +
-                " exceeds the configured limit " + maxTasks);
-    }
-  }
-
-  synchronized void createMapTasks(String jobFile, 
-		  TaskSplitMetaInfo[] splits) {
-    maps = new TaskInProgress[numMapTasks];
-    for(int i=0; i < numMapTasks; ++i) {
-      inputLength += splits[i].getInputDataLength();
-      maps[i] = new TaskInProgress(jobId, jobFile, 
-                                   splits[i], 
-                                   jobtracker, conf, this, 
-                                   i, numSlotsPerMap);
-    }
-    LOG.info("Input size for job " + jobId + " = " + inputLength
-        + ". Number of splits = " + splits.length);
-
-  }
-
-  synchronized void createReduceTasks(String jobFile) {
-    this.reduces = new TaskInProgress[numReduceTasks];
-    for (int i = 0; i < numReduceTasks; i++) {
-      reduces[i] = new TaskInProgress(jobId, jobFile, 
-                                      numMapTasks, i, 
-                                      jobtracker, conf, 
-                                      this, numSlotsPerReduce);
-      nonRunningReduces.add(reduces[i]);
-    }
-  }
-
-  
-  synchronized void initSetupCleanupTasks(String jobFile) {
-    if (!jobSetupCleanupNeeded) {
-      LOG.info("Setup/Cleanup not needed for job " + jobId);
-      // nothing to initialize
-      return;
-    }
-    // create cleanup two cleanup tips, one map and one reduce.
-    cleanup = new TaskInProgress[2];
-
-    // cleanup map tip. This map doesn't use any splits. Just assign an empty
-    // split.
-    TaskSplitMetaInfo emptySplit = JobSplit.EMPTY_TASK_SPLIT;
-    cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, 
-            jobtracker, conf, this, numMapTasks, 1);
-    cleanup[0].setJobCleanupTask();
-
-    // cleanup reduce tip.
-    cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
-                       numReduceTasks, jobtracker, conf, this, 1);
-    cleanup[1].setJobCleanupTask();
-
-    // create two setup tips, one map and one reduce.
-    setup = new TaskInProgress[2];
-
-    // setup map tip. This map doesn't use any split. Just assign an empty
-    // split.
-    setup[0] = new TaskInProgress(jobId, jobFile, emptySplit, 
-            jobtracker, conf, this, numMapTasks + 1, 1);
-    setup[0].setJobSetupTask();
-
-    // setup reduce tip.
-    setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
-                       numReduceTasks + 1, jobtracker, conf, this, 1);
-    setup[1].setJobSetupTask();
-  }
-  
-  void setupComplete() {
-    status.setSetupProgress(1.0f);
-    if (this.status.getRunState() == JobStatus.PREP) {
-      changeStateTo(JobStatus.RUNNING);
-      JobStatusChangedEvent jse = 
-        new JobStatusChangedEvent(profile.getJobID(),
-         JobStatus.getJobRunState(JobStatus.RUNNING));
-      jobHistory.logEvent(jse, profile.getJobID());
-    }
-  }
-
-  /////////////////////////////////////////////////////
-  // Accessors for the JobInProgress
-  /////////////////////////////////////////////////////
-  public JobProfile getProfile() {
-    return profile;
-  }
-  public JobStatus getStatus() {
-    return status;
-  }
-  public synchronized long getLaunchTime() {
-    return launchTime;
-  }
-  Map<TaskType, Long> getFirstTaskLaunchTimes() {
-    return firstTaskLaunchTimes;
-  }
-  public long getStartTime() {
-    return startTime;
-  }
-  public long getFinishTime() {
-    return finishTime;
-  }
-  public int desiredMaps() {
-    return numMapTasks;
-  }
-  public synchronized int finishedMaps() {
-    return finishedMapTasks;
-  }
-  public int desiredReduces() {
-    return numReduceTasks;
-  }
-  public synchronized int runningMaps() {
-    return runningMapTasks;
-  }
-  public synchronized int runningReduces() {
-    return runningReduceTasks;
-  }
-  public synchronized int finishedReduces() {
-    return finishedReduceTasks;
-  }
-  public synchronized int pendingMaps() {
-    return numMapTasks - runningMapTasks - failedMapTIPs - 
-    finishedMapTasks + speculativeMapTasks;
-  }
-  public synchronized int pendingReduces() {
-    return numReduceTasks - runningReduceTasks - failedReduceTIPs - 
-    finishedReduceTasks + speculativeReduceTasks;
-  }
- 
-  public int getNumSlotsPerTask(TaskType taskType) {
-    if (taskType == TaskType.MAP) {
-      return numSlotsPerMap;
-    } else if (taskType == TaskType.REDUCE) {
-      return numSlotsPerReduce;
-    } else {
-      return 1;
-    }
-  }
-  public JobPriority getPriority() {
-    return this.priority;
-  }
-  public void setPriority(JobPriority priority) {
-    if(priority == null) {
-      priority = JobPriority.NORMAL;
-    }
-    synchronized (this) {
-      this.priority = priority;
-      status.setJobPriority(priority);
-      // log and change to the job's priority
-      JobPriorityChangeEvent prEvent = 
-        new JobPriorityChangeEvent(jobId, priority);
-       
-      jobHistory.logEvent(prEvent, jobId);
-      
-    }
-  }
-
-  // Update the job start/launch time (upon restart) and log to history
-  synchronized void updateJobInfo(long startTime, long launchTime) {
-    // log and change to the job's start/launch time
-    this.startTime = startTime;
-    this.launchTime = launchTime;
-    JobInfoChangeEvent event = 
-      new JobInfoChangeEvent(jobId, startTime, launchTime);
-     
-    jobHistory.logEvent(event, jobId);
-    
-  }
-
-  /**
-   * Get the number of times the job has restarted
-   */
-  int getNumRestarts() {
-    return restartCount;
-  }
-  
-  long getInputLength() {
-    return inputLength;
-  }
- 
-  boolean isCleanupLaunched() {
-    return launchedCleanup;
-  }
-
-  boolean isSetupLaunched() {
-    return launchedSetup;
-  }
-
-  /** 
-   * Get all the tasks of the desired type in this job.
-   * @param type {@link TaskType} of the tasks required
-   * @return An array of {@link TaskInProgress} matching the given type. 
-   *         Returns an empty array if no tasks are found for the given type.  
-   */
-  TaskInProgress[] getTasks(TaskType type) {
-    TaskInProgress[] tasks = null;
-    switch (type) {
-      case MAP:
-      {
-        tasks = maps;
-      }
-      break;
-      case REDUCE:
-      {
-        tasks = reduces;
-      }
-      break;
-      case JOB_SETUP: 
-      {
-        tasks = setup;
-      }
-      break;
-      case JOB_CLEANUP:
-      {
-        tasks = cleanup;
-      }
-      break;
-      default:
-      {
-          tasks = new TaskInProgress[0];
-      }
-      break;
-    }
-    return tasks;
-  }
-
-  /**
-   * Return the nonLocalRunningMaps
-   * @return
-   */
-  Set<TaskInProgress> getNonLocalRunningMaps()
-  {
-    return nonLocalRunningMaps;
-  }
-  
-  /**
-   * Return the runningMapCache
-   * @return
-   */
-  Map<Node, Set<TaskInProgress>> getRunningMapCache()
-  {
-    return runningMapCache;
-  }
-  
-  /**
-   * Return runningReduces
-   * @return
-   */
-  Set<TaskInProgress> getRunningReduces()
-  {
-    return runningReduces;
-  }
-  
-  /**
-   * Get the job configuration
-   * @return the job's configuration
-   */
-  JobConf getJobConf() {
-    return conf;
-  }
-    
-  /**
-   * Return a vector of completed TaskInProgress objects
-   */
-  public synchronized Vector<TaskInProgress> reportTasksInProgress(boolean shouldBeMap,
-                                                      boolean shouldBeComplete) {
-    
-    Vector<TaskInProgress> results = new Vector<TaskInProgress>();
-    TaskInProgress tips[] = null;
-    if (shouldBeMap) {
-      tips = maps;
-    } else {
-      tips = reduces;
-    }
-    for (int i = 0; i < tips.length; i++) {
-      if (tips[i].isComplete() == shouldBeComplete) {
-        results.add(tips[i]);
-      }
-    }
-    return results;
-  }
-  
-  /**
-   * Return a vector of cleanup TaskInProgress objects
-   */
-  public synchronized Vector<TaskInProgress> reportCleanupTIPs(
-                                               boolean shouldBeComplete) {
-    
-    Vector<TaskInProgress> results = new Vector<TaskInProgress>();
-    for (int i = 0; i < cleanup.length; i++) {
-      if (cleanup[i].isComplete() == shouldBeComplete) {
-        results.add(cleanup[i]);
-      }
-    }
-    return results;
-  }
-
-  /**
-   * Return a vector of setup TaskInProgress objects
-   */
-  public synchronized Vector<TaskInProgress> reportSetupTIPs(
-                                               boolean shouldBeComplete) {
-    
-    Vector<TaskInProgress> results = new Vector<TaskInProgress>();
-    for (int i = 0; i < setup.length; i++) {
-      if (setup[i].isComplete() == shouldBeComplete) {
-        results.add(setup[i]);
-      }
-    }
-    return results;
-  }
-
-  ////////////////////////////////////////////////////
-  // Status update methods
-  ////////////////////////////////////////////////////
-
-  /**
-   * Assuming {@link JobTracker} is locked on entry.
-   */
-  public synchronized void updateTaskStatus(TaskInProgress tip, 
-                                            TaskStatus status) {
-
-    double oldProgress = tip.getProgress();   // save old progress
-    boolean wasRunning = tip.isRunning();
-    boolean wasComplete = tip.isComplete();
-    boolean wasPending = tip.isOnlyCommitPending();
-    TaskAttemptID taskid = status.getTaskID();
-    boolean wasAttemptRunning = tip.isAttemptRunning(taskid);
-
-    
-    // If the TIP is already completed and the task reports as SUCCEEDED then 
-    // mark the task as KILLED.
-    // In case of task with no promotion the task tracker will mark the task 
-    // as SUCCEEDED.
-    // User has requested to kill the task, but TT reported SUCCEEDED, 
-    // mark the task KILLED.
-    if ((wasComplete || tip.wasKilled(taskid)) && 
-        (status.getRunState() == TaskStatus.State.SUCCEEDED)) {
-      status.setRunState(TaskStatus.State.KILLED);
-    }
-    
-    // If the job is complete or task-cleanup is switched off
-    // and a task has just reported its state as FAILED_UNCLEAN/KILLED_UNCLEAN, 
-    // make the task's state FAILED/KILLED without launching cleanup attempt.
-    // Note that if task is already a cleanup attempt, 
-    // we don't change the state to make sure the task gets a killTaskAction
-    if ((this.isComplete() || jobFailed || jobKilled || !taskCleanupNeeded) && 
-        !tip.isCleanupAttempt(taskid)) {
-      if (status.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
-        status.setRunState(TaskStatus.State.FAILED);
-      } else if (status.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
-        status.setRunState(TaskStatus.State.KILLED);
-      }
-    }
-    
-    boolean change = tip.updateStatus(status);
-    if (change) {
-      TaskStatus.State state = status.getRunState();
-      // get the TaskTrackerStatus where the task ran 
-      TaskTracker taskTracker = 
-        this.jobtracker.getTaskTracker(tip.machineWhereTaskRan(taskid));
-      TaskTrackerStatus ttStatus = 
-        (taskTracker == null) ? null : taskTracker.getStatus();
-      String taskTrackerHttpLocation = null; 
-
-      if (null != ttStatus){
-        String host;
-        if (NetUtils.getStaticResolution(ttStatus.getHost()) != null) {
-          host = NetUtils.getStaticResolution(ttStatus.getHost());
-        } else {
-          host = ttStatus.getHost();
-        }
-        taskTrackerHttpLocation = "http://" + host + ":"
-            + ttStatus.getHttpPort(); 
-      }
-
-      TaskCompletionEvent taskEvent = null;
-      if (state == TaskStatus.State.SUCCEEDED) {
-        taskEvent = new TaskCompletionEvent(
-                                            taskCompletionEventTracker, 
-                                            taskid,
-                                            tip.idWithinJob(),
-                                            status.getIsMap() &&
-                                            !tip.isJobCleanupTask() &&
-                                            !tip.isJobSetupTask(),
-                                            TaskCompletionEvent.Status.SUCCEEDED,
-                                            taskTrackerHttpLocation 
-                                           );
-        taskEvent.setTaskRunTime((int)(status.getFinishTime() 
-                                       - status.getStartTime()));
-        tip.setSuccessEventNumber(taskCompletionEventTracker); 
-      } else if (state == TaskStatus.State.COMMIT_PENDING) {
-        // If it is the first attempt reporting COMMIT_PENDING
-        // ask the task to commit.
-        if (!wasComplete && !wasPending) {
-          tip.doCommit(taskid);
-        }
-        return;
-      } else if (state == TaskStatus.State.FAILED_UNCLEAN ||
-                 state == TaskStatus.State.KILLED_UNCLEAN) {
-        tip.incompleteSubTask(taskid, this.status);
-        // add this task, to be rescheduled as cleanup attempt
-        if (tip.isMapTask()) {
-          mapCleanupTasks.add(taskid);
-        } else {
-          reduceCleanupTasks.add(taskid);
-        }
-        // Remove the task entry from jobtracker
-        jobtracker.removeTaskEntry(taskid);
-      }
-      //For a failed task update the JT datastructures. 
-      else if (state == TaskStatus.State.FAILED ||
-               state == TaskStatus.State.KILLED) {
-        // Get the event number for the (possibly) previously successful
-        // task. If there exists one, then set that status to OBSOLETE 
-        int eventNumber;
-        if ((eventNumber = tip.getSuccessEventNumber()) != -1) {
-          TaskCompletionEvent t = 
-            this.taskCompletionEvents.get(eventNumber);
-          if (t.getTaskAttemptId().equals(taskid))
-            t.setTaskStatus(TaskCompletionEvent.Status.OBSOLETE);
-        }
-        
-        // Tell the job to fail the relevant task
-        failedTask(tip, taskid, status, taskTracker,
-                   wasRunning, wasComplete, wasAttemptRunning);
-
-        // Did the task failure lead to tip failure?
-        TaskCompletionEvent.Status taskCompletionStatus = 
-          (state == TaskStatus.State.FAILED ) ?
-              TaskCompletionEvent.Status.FAILED :
-              TaskCompletionEvent.Status.KILLED;
-        if (tip.isFailed()) {
-          taskCompletionStatus = TaskCompletionEvent.Status.TIPFAILED;
-        }
-        taskEvent = new TaskCompletionEvent(taskCompletionEventTracker, 
-                                            taskid,
-                                            tip.idWithinJob(),
-                                            status.getIsMap() &&
-                                            !tip.isJobCleanupTask() &&
-                                            !tip.isJobSetupTask(),
-                                            taskCompletionStatus, 
-                                            taskTrackerHttpLocation
-                                           );
-      }          
-
-      // Add the 'complete' task i.e. successful/failed
-      // It _is_ safe to add the TaskCompletionEvent.Status.SUCCEEDED
-      // *before* calling TIP.completedTask since:
-      // a. One and only one task of a TIP is declared as a SUCCESS, the
-      //    other (speculative tasks) are marked KILLED
-      // b. TIP.completedTask *does not* throw _any_ exception at all.
-      if (taskEvent != null) {
-        this.taskCompletionEvents.add(taskEvent);
-        taskCompletionEventTracker++;
-        JobTrackerStatistics.TaskTrackerStat ttStat = jobtracker.
-           getStatistics().getTaskTrackerStat(tip.machineWhereTaskRan(taskid));
-        if(ttStat != null) { // ttStat can be null in case of lost tracker
-          ttStat.incrTotalTasks();
-        }
-        if (state == TaskStatus.State.SUCCEEDED) {
-          completedTask(tip, status);
-          if(ttStat != null) {
-            ttStat.incrSucceededTasks();
-          }
-        }
-      }
-    }
-        
-    //
-    // Update JobInProgress status
-    //
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Taking progress for " + tip.getTIPId() + " from " + 
-                 oldProgress + " to " + tip.getProgress());
-    }
-    
-    if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
-      double progressDelta = tip.getProgress() - oldProgress;
-      if (tip.isMapTask()) {
-          this.status.setMapProgress((float) (this.status.mapProgress() +
-                                              progressDelta / maps.length));
-      } else {
-        this.status.setReduceProgress((float) (this.status.reduceProgress() + 
-                                           (progressDelta / reduces.length)));
-      }
-    }
-  }
-
-  /**
-   * Returns the job-level counters.
-   * 
-   * @return the job-level counters.
-   */
-  public synchronized Counters getJobCounters() {
-    return jobCounters;
-  }
-  
-  /**
-   *  Returns map phase counters by summing over all map tasks in progress.
-   */
-  public synchronized Counters getMapCounters() {
-    return incrementTaskCounters(new Counters(), maps);
-  }
-    
-  /**
-   *  Returns map phase counters by summing over all map tasks in progress.
-   */
-  public synchronized Counters getReduceCounters() {
-    return incrementTaskCounters(new Counters(), reduces);
-  }
-    
-  /**
-   *  Returns the total job counters, by adding together the job, 
-   *  the map and the reduce counters.
-   */
-  public Counters getCounters() {
-    Counters result = new Counters();
-    synchronized (this) {
-      result.incrAllCounters(getJobCounters());
-    }
-
-    // the counters of TIPs are not updated in place.
-    // hence read-only access is ok without any locks
-    incrementTaskCounters(result, maps);
-    return incrementTaskCounters(result, reduces);
-  }
-    
-  /**
-   * Increments the counters with the counters from each task.
-   * @param counters the counters to increment
-   * @param tips the tasks to add in to counters
-   * @return counters the same object passed in as counters
-   */
-  private Counters incrementTaskCounters(Counters counters,
-                                         TaskInProgress[] tips) {
-    for (TaskInProgress tip : tips) {
-      counters.incrAllCounters(tip.getCounters());
-    }
-    return counters;
-  }
-
-  /////////////////////////////////////////////////////
-  // Create/manage tasks
-  /////////////////////////////////////////////////////
-  /**
-   * Return a MapTask, if appropriate, to run on the given tasktracker
-   */
-  public synchronized Task obtainNewMapTask(TaskTrackerStatus tts, 
-                                            int clusterSize, 
-                                            int numUniqueHosts,
-                                            int maxCacheLevel
-                                           ) throws IOException {
-    if (status.getRunState() != JobStatus.RUNNING) {
-      LOG.info("Cannot create task split for " + profile.getJobID());
-      return null;
-    }
-       
-    int target = findNewMapTask(tts, clusterSize, numUniqueHosts,
-        maxCacheLevel);
-    if (target == -1) {
-      return null;
-    }
-    
-    Task result = maps[target].getTaskToRun(tts.getTrackerName());
-    if (result != null) {
-      addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
-    }
-
-    return result;
-  } 
-  
-  /**
-   * Return a MapTask, if appropriate, to run on the given tasktracker
-   */
-  public synchronized Task obtainNewMapTask(TaskTrackerStatus tts, 
-                                            int clusterSize, 
-                                            int numUniqueHosts
-                                           ) throws IOException {
-    return obtainNewMapTask(tts, clusterSize, numUniqueHosts, anyCacheLevel);
-  }    
-
-  /*
-   * Return task cleanup attempt if any, to run on a given tracker
-   */
-  public Task obtainTaskCleanupTask(TaskTrackerStatus tts, 
-                                                 boolean isMapSlot)
-  throws IOException {
-    if (!tasksInited.get()) {
-      return null;
-    }
-    synchronized (this) {
-      if (this.status.getRunState() != JobStatus.RUNNING || 
-          jobFailed || jobKilled) {
-        return null;
-      }
-      String taskTracker = tts.getTrackerName();
-      if (!shouldRunOnTaskTracker(taskTracker)) {
-        return null;
-      }
-      TaskAttemptID taskid = null;
-      TaskInProgress tip = null;
-      if (isMapSlot) {
-        if (!mapCleanupTasks.isEmpty()) {
-          taskid = mapCleanupTasks.remove(0);
-          tip = maps[taskid.getTaskID().getId()];
-        }
-      } else {
-        if (!reduceCleanupTasks.isEmpty()) {
-          taskid = reduceCleanupTasks.remove(0);
-          tip = reduces[taskid.getTaskID().getId()];
-        }
-      }
-      if (tip != null) {
-        return tip.addRunningTask(taskid, taskTracker, true);
-      }
-      return null;
-    }
-  }
-  
-  public synchronized Task obtainNewLocalMapTask(TaskTrackerStatus tts,
-                                                     int clusterSize, 
-                                                     int numUniqueHosts)
-  throws IOException {
-    if (!tasksInited.get()) {
-      LOG.info("Cannot create task split for " + profile.getJobID());
-      return null;
-    }
-  
-    return obtainNewMapTask(tts, clusterSize, numUniqueHosts, maxLevel);
-  }
-  
-  public synchronized Task obtainNewNonLocalMapTask(TaskTrackerStatus tts,
-                                                    int clusterSize, 
-                                                    int numUniqueHosts)
-  throws IOException {
-    if (!tasksInited.get()) {
-      LOG.info("Cannot create task split for " + profile.getJobID());
-      return null;
-    }
-  
-    return obtainNewMapTask(tts, clusterSize, numUniqueHosts,
-        NON_LOCAL_CACHE_LEVEL);
-  }
-  
-  /**
-   * Return a CleanupTask, if appropriate, to run on the given tasktracker
-   * 
-   */
-  public Task obtainJobCleanupTask(TaskTrackerStatus tts, 
-                                             int clusterSize, 
-                                             int numUniqueHosts,
-                                             boolean isMapSlot
-                                            ) throws IOException {
-    if(!tasksInited.get() || !jobSetupCleanupNeeded) {
-      return null;
-    }
-    
-    synchronized(this) {
-      if (!canLaunchJobCleanupTask()) {
-        return null;
-      }
-      
-      String taskTracker = tts.getTrackerName();
-      // Update the last-known clusterSize
-      this.clusterSize = clusterSize;
-      if (!shouldRunOnTaskTracker(taskTracker)) {
-        return null;
-      }
-      
-      List<TaskInProgress> cleanupTaskList = new ArrayList<TaskInProgress>();
-      if (isMapSlot) {
-        cleanupTaskList.add(cleanup[0]);
-      } else {
-        cleanupTaskList.add(cleanup[1]);
-      }
-      TaskInProgress tip = findTaskFromList(cleanupTaskList,
-                             tts, numUniqueHosts, false);
-      if (tip == null) {
-        return null;
-      }
-      
-      // Now launch the cleanupTask
-      Task result = tip.getTaskToRun(tts.getTrackerName());
-      if (result != null) {
-        addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
-        if (jobFailed) {
-          result.setJobCleanupTaskState(org.apache.hadoop.mapreduce.JobStatus
-                .State.FAILED);
-        } else if (jobKilled) {
-          result.setJobCleanupTaskState(org.apache.hadoop.mapreduce.JobStatus
-                .State.KILLED);
-        } else {
-          result.setJobCleanupTaskState(org.apache.hadoop.mapreduce
-                .JobStatus.State.SUCCEEDED);
-        }
-      }
-      return result;
-    }
-    
-  }
-  
-  /**
-   * Check whether cleanup task can be launched for the job.
-   * 
-   * Cleanup task can be launched if it is not already launched
-   * or job is Killed
-   * or all maps and reduces are complete
-   * @return true/false
-   */
-  private synchronized boolean canLaunchJobCleanupTask() {
-    // check if the job is running
-    if (status.getRunState() != JobStatus.RUNNING &&
-        status.getRunState() != JobStatus.PREP) {
-      return false;
-    }
-    // check if cleanup task has been launched already or if setup isn't
-    // launched already. The later check is useful when number of maps is
-    // zero.
-    if (launchedCleanup || !isSetupFinished()) {
-      return false;
-    }
-    // check if job has failed or killed
-    if (jobKilled || jobFailed) {
-      return true;
-    }
-    // Check if all maps and reducers have finished.
-    boolean launchCleanupTask = 
-        ((finishedMapTasks + failedMapTIPs) == (numMapTasks));
-    if (launchCleanupTask) {
-      launchCleanupTask = 
-        ((finishedReduceTasks + failedReduceTIPs) == numReduceTasks);
-    }
-    return launchCleanupTask;
-  }
-
-  /**
-   * Return a SetupTask, if appropriate, to run on the given tasktracker
-   * 
-   */
-  public Task obtainJobSetupTask(TaskTrackerStatus tts, 
-                                             int clusterSize, 
-                                             int numUniqueHosts,
-                                             boolean isMapSlot
-                                            ) throws IOException {
-    if(!tasksInited.get() || !jobSetupCleanupNeeded) {
-      return null;
-    }
-    
-    synchronized(this) {
-      if (!canLaunchSetupTask()) {
-        return null;
-      }
-      String taskTracker = tts.getTrackerName();
-      // Update the last-known clusterSize
-      this.clusterSize = clusterSize;
-      if (!shouldRunOnTaskTracker(taskTracker)) {
-        return null;
-      }
-      
-      List<TaskInProgress> setupTaskList = new ArrayList<TaskInProgress>();
-      if (isMapSlot) {
-        setupTaskList.add(setup[0]);
-      } else {
-        setupTaskList.add(setup[1]);
-      }
-      TaskInProgress tip = findTaskFromList(setupTaskList,
-                             tts, numUniqueHosts, false);
-      if (tip == null) {
-        return null;
-      }
-      
-      // Now launch the setupTask
-      Task result = tip.getTaskToRun(tts.getTrackerName());
-      if (result != null) {
-        addRunningTaskToTIP(tip, result.getTaskID(), tts, true);
-      }
-      return result;
-    }
-  }
-  
-  public synchronized boolean scheduleReduces() {
-    return finishedMapTasks >= completedMapsForReduceSlowstart;
-  }
-  
-  /**
-   * Check whether setup task can be launched for the job.
-   * 
-   * Setup task can be launched after the tasks are inited
-   * and Job is in PREP state
-   * and if it is not already launched
-   * or job is not Killed/Failed
-   * @return true/false
-   */
-  private synchronized boolean canLaunchSetupTask() {
-    return (tasksInited.get() && status.getRunState() == JobStatus.PREP && 
-           !launchedSetup && !jobKilled && !jobFailed);
-  }
-  
-
-  /**
-   * Return a ReduceTask, if appropriate, to run on the given tasktracker.
-   * We don't have cache-sensitivity for reduce tasks, as they
-   *  work on temporary MapRed files.  
-   */
-  public synchronized Task obtainNewReduceTask(TaskTrackerStatus tts,
-                                               int clusterSize,
-                                               int numUniqueHosts
-                                              ) throws IOException {
-    if (status.getRunState() != JobStatus.RUNNING) {
-      LOG.info("Cannot create task split for " + profile.getJobID());
-      return null;
-    }
-    
-    // Ensure we have sufficient map outputs ready to shuffle before 
-    // scheduling reduces
-    if (!scheduleReduces()) {
-      return null;
-    }
-
-    int  target = findNewReduceTask(tts, clusterSize, numUniqueHosts);
-    if (target == -1) {
-      return null;
-    }
-    
-    Task result = reduces[target].getTaskToRun(tts.getTrackerName());
-    if (result != null) {
-      addRunningTaskToTIP(reduces[target], result.getTaskID(), tts, true);
-    }
-
-    return result;
-  }
-  
-  // returns the (cache)level at which the nodes matches
-  private int getMatchingLevelForNodes(Node n1, Node n2) {
-    int count = 0;
-    do {
-      if (n1.equals(n2)) {
-        return count;
-      }
-      ++count;
-      n1 = n1.getParent();
-      n2 = n2.getParent();
-    } while (n1 != null);
-    return this.maxLevel;
-  }
-
-  /**
-   * Populate the data structures as a task is scheduled.
-   * 
-   * Assuming {@link JobTracker} is locked on entry.
-   * 
-   * @param tip The tip for which the task is added
-   * @param id The attempt-id for the task
-   * @param tts task-tracker status
-   * @param isScheduled Whether this task is scheduled from the JT or has 
-   *        joined back upon restart
-   */
-  synchronized void addRunningTaskToTIP(TaskInProgress tip, TaskAttemptID id, 
-                                        TaskTrackerStatus tts, 
-                                        boolean isScheduled) {
-    // Make an entry in the tip if the attempt is not scheduled i.e externally
-    // added
-    if (!isScheduled) {
-      tip.addRunningTask(id, tts.getTrackerName());
-    }
-    final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
-
-    // keeping the earlier ordering intact
-    TaskType name;
-    String splits = "";
-    Enum counter = null;
-    if (tip.isJobSetupTask()) {
-      launchedSetup = true;
-      name = TaskType.JOB_SETUP;
-    } else if (tip.isJobCleanupTask()) {
-      launchedCleanup = true;
-      name = TaskType.JOB_CLEANUP;
-    } else if (tip.isMapTask()) {
-      ++runningMapTasks;
-      name = TaskType.MAP;
-      counter = JobCounter.TOTAL_LAUNCHED_MAPS;
-      splits = tip.getSplitNodes();
-      if (tip.isSpeculating()) {
-        speculativeMapTasks++;
-        metrics.speculateMap(id);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Chosen speculative task, current speculativeMap task count: "
-                    + speculativeMapTasks);
-        }
-      }
-      metrics.launchMap(id);
-    } else {
-      ++runningReduceTasks;
-      name = TaskType.REDUCE;
-      counter = JobCounter.TOTAL_LAUNCHED_REDUCES;
-      if (tip.isSpeculating()) {
-        speculativeReduceTasks++;
-        metrics.speculateReduce(id);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Chosen speculative task, current speculativeReduce task count: "
-                    + speculativeReduceTasks);
-        }
-      }
-      metrics.launchReduce(id);
-    }
-    // Note that the logs are for the scheduled tasks only. Tasks that join on 
-    // restart has already their logs in place.
-    if (tip.isFirstAttempt(id)) {
-      TaskStartedEvent tse = new TaskStartedEvent(tip.getTIPId(), 
-          tip.getExecStartTime(),
-          name, splits);
-      
-      jobHistory.logEvent(tse, tip.getJob().jobId);
-      setFirstTaskLaunchTime(tip);
-    }
-    if (!tip.isJobSetupTask() && !tip.isJobCleanupTask()) {
-      jobCounters.incrCounter(counter, 1);
-    }
-    
-    //TODO The only problem with these counters would be on restart.
-    // The jobtracker updates the counter only when the task that is scheduled
-    // if from a non-running tip and is local (data, rack ...). But upon restart
-    // as the reports come from the task tracker, there is no good way to infer
-    // when exactly to increment the locality counters. The only solution is to 
-    // increment the counters for all the tasks irrespective of 
-    //    - whether the tip is running or not
-    //    - whether its a speculative task or not
-    //
-    // So to simplify, increment the data locality counter whenever there is 
-    // data locality.
-    if (tip.isMapTask() && !tip.isJobSetupTask() && !tip.isJobCleanupTask()) {
-      // increment the data locality counter for maps
-      int level = getLocalityLevel(tip, tts);
-      switch (level) {
-      case 0 :
-        LOG.info("Choosing data-local task " + tip.getTIPId());
-        jobCounters.incrCounter(JobCounter.DATA_LOCAL_MAPS, 1);
-        metrics.launchDataLocalMap(id);
-        break;
-      case 1:
-        LOG.info("Choosing rack-local task " + tip.getTIPId());
-        jobCounters.incrCounter(JobCounter.RACK_LOCAL_MAPS, 1);
-        metrics.launchRackLocalMap(id);
-        break;
-      default :
-        // check if there is any locality
-        if (level != this.maxLevel) {
-          LOG.info("Choosing cached task at level " + level + tip.getTIPId());
-          jobCounters.incrCounter(JobCounter.OTHER_LOCAL_MAPS, 1);
-        }
-        break;
-      }
-    }
-  }
-
-  void setFirstTaskLaunchTime(TaskInProgress tip) {
-    TaskType key = getTaskType(tip);
-
-    synchronized(firstTaskLaunchTimes) {
-      // Could be optimized to do only one lookup with a little more code
-      if (!firstTaskLaunchTimes.containsKey(key)) {
-        firstTaskLaunchTimes.put(key, tip.getExecStartTime());
-      }
-    }
-  }
-    
-  public static String convertTrackerNameToHostName(String trackerName) {
-    // Ugly!
-    // Convert the trackerName to it's host name
-    int indexOfColon = trackerName.indexOf(":");
-    String trackerHostName = (indexOfColon == -1) ? 
-      trackerName : 
-      trackerName.substring(0, indexOfColon);
-    return trackerHostName.substring("tracker_".length());
-  }
-    
-  /**
-   * Note that a task has failed on a given tracker and add the tracker  
-   * to the blacklist iff too many trackers in the cluster i.e. 
-   * (clusterSize * CLUSTER_BLACKLIST_PERCENT) haven't turned 'flaky' already.
-   * 
-   * @param taskTracker task-tracker on which a task failed
-   */
-  synchronized void addTrackerTaskFailure(String trackerName, 
-                                          TaskTracker taskTracker) {
-    if (flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) { 
-      String trackerHostName = convertTrackerNameToHostName(trackerName);
-
-      Integer trackerFailures = trackerToFailuresMap.get(trackerHostName);
-      if (trackerFailures == null) {
-        trackerFailures = 0;
-      }
-      trackerToFailuresMap.put(trackerHostName, ++trackerFailures);
-
-      // Check if this tasktracker has turned 'flaky'
-      if (trackerFailures.intValue() == maxTaskFailuresPerTracker) {
-        ++flakyTaskTrackers;
-        
-        // Cancel reservations if appropriate
-        if (taskTracker != null) {
-          if (trackersReservedForMaps.containsKey(taskTracker)) {
-            taskTracker.unreserveSlots(TaskType.MAP, this);
-          }
-          if (trackersReservedForReduces.containsKey(taskTracker)) {
-            taskTracker.unreserveSlots(TaskType.REDUCE, this);
-          }
-        }
-        LOG.info("TaskTracker at '" + trackerHostName + "' turned 'flaky'");
-      }
-    }
-  }
-  
-  public synchronized void reserveTaskTracker(TaskTracker taskTracker,
-                                              TaskType type, int numSlots) {
-    Map<TaskTracker, FallowSlotInfo> map =
-      (type == TaskType.MAP) ? trackersReservedForMaps : trackersReservedForReduces;
-    
-    long now = System.currentTimeMillis();
-    
-    FallowSlotInfo info = map.get(taskTracker);
-    int reservedSlots = 0;
-    if (info == null) {
-      info = new FallowSlotInfo(now, numSlots);
-      reservedSlots = numSlots;
-    } else {
-      // Increment metering info if the reservation is changing
-      if (info.getNumSlots() != numSlots) {
-        Enum<JobCounter> counter = 
-          (type == TaskType.MAP) ? 
-              JobCounter.FALLOW_SLOTS_MILLIS_MAPS : 
-              JobCounter.FALLOW_SLOTS_MILLIS_REDUCES;
-        long fallowSlotMillis = (now - info.getTimestamp()) * info.getNumSlots();
-        jobCounters.incrCounter(counter, fallowSlotMillis);
-        
-        // Update 
-        reservedSlots = numSlots - info.getNumSlots();
-        info.setTimestamp(now);
-        info.setNumSlots(numSlots);
-      }
-    }
-    map.put(taskTracker, info);
-    if (type == TaskType.MAP) {
-      jobtracker.getInstrumentation().addReservedMapSlots(reservedSlots);
-    }
-    else {
-      jobtracker.getInstrumentation().addReservedReduceSlots(reservedSlots);
-    }
-    jobtracker.incrementReservations(type, reservedSlots);
-  }
-  
-  public synchronized void unreserveTaskTracker(TaskTracker taskTracker,
-                                                TaskType type) {
-    Map<TaskTracker, FallowSlotInfo> map =
-      (type == TaskType.MAP) ? trackersReservedForMaps : 
-                               trackersReservedForReduces;
-
-    FallowSlotInfo info = map.get(taskTracker);
-    if (info == null) {
-      LOG.warn("Cannot find information about fallow slots for " + 
-               taskTracker.getTrackerName());
-      return;
-    }
-    
-    long now = System.currentTimeMillis();
-
-    Enum<JobCounter> counter = 
-      (type == TaskType.MAP) ? 
-          JobCounter.FALLOW_SLOTS_MILLIS_MAPS : 
-          JobCounter.FALLOW_SLOTS_MILLIS_REDUCES;
-    long fallowSlotMillis = (now - info.getTimestamp()) * info.getNumSlots();
-    jobCounters.incrCounter(counter, fallowSlotMillis);
-
-    map.remove(taskTracker);
-    if (type == TaskType.MAP) {
-      jobtracker.getInstrumentation().decReservedMapSlots(info.getNumSlots());
-    }
-    else {
-      jobtracker.getInstrumentation().decReservedReduceSlots(
-        info.getNumSlots());
-    }
-    jobtracker.decrementReservations(type, info.getNumSlots());
-  }
-  
-  public int getNumReservedTaskTrackersForMaps() {
-    return trackersReservedForMaps.size();
-  }
-  
-  public int getNumReservedTaskTrackersForReduces() {
-    return trackersReservedForReduces.size();
-  }
-  
-  private int getTrackerTaskFailures(String trackerName) {
-    String trackerHostName = convertTrackerNameToHostName(trackerName);
-    Integer failedTasks = trackerToFailuresMap.get(trackerHostName);
-    return (failedTasks != null) ? failedTasks.intValue() : 0; 
-  }
-    
-  /**
-   * Get the black listed trackers for the job
-   * 
-   * @return List of blacklisted tracker names
-   */
-  List<String> getBlackListedTrackers() {
-    List<String> blackListedTrackers = new ArrayList<String>();
-    for (Map.Entry<String,Integer> e : trackerToFailuresMap.entrySet()) {
-       if (e.getValue().intValue() >= maxTaskFailuresPerTracker) {
-         blackListedTrackers.add(e.getKey());
-       }
-    }
-    return blackListedTrackers;
-  }
-  
-  /**
-   * Get the no. of 'flaky' tasktrackers for a given job.
-   * 
-   * @return the no. of 'flaky' tasktrackers for a given job.
-   */
-  int getNoOfBlackListedTrackers() {
-    return flakyTaskTrackers;
-  }
-    
-  /**
-   * Get the information on tasktrackers and no. of errors which occurred
-   * on them for a given job. 
-   * 
-   * @return the map of tasktrackers and no. of errors which occurred
-   *         on them for a given job. 
-   */
-  synchronized Map<String, Integer> getTaskTrackerErrors() {
-    // Clone the 'trackerToFailuresMap' and return the copy
-    Map<String, Integer> trackerErrors = 
-      new TreeMap<String, Integer>(trackerToFailuresMap);
-    return trackerErrors;
-  }
-
-  /**
-   * Remove a map TIP from the lists for running maps.
-   * Called when a map fails/completes (note if a map is killed,
-   * it won't be present in the list since it was completed earlier)
-   * @param tip the tip that needs to be retired
-   */
-  private synchronized void retireMap(TaskInProgress tip) {
-    if (runningMapCache == null) {
-      LOG.warn("Running cache for maps missing!! "
-               + "Job details are missing.");
-      return;
-    }
-    
-    String[] splitLocations = tip.getSplitLocations();
-
-    // Remove the TIP from the list for running non-local maps
-    if (splitLocations == null || splitLocations.length == 0) {
-      nonLocalRunningMaps.remove(tip);
-      return;
-    }
-
-    // Remove from the running map caches
-    for(String host: splitLocations) {
-      Node node = jobtracker.getNode(host);
-
-      for (int j = 0; j < maxLevel; ++j) {
-        Set<TaskInProgress> hostMaps = runningMapCache.get(node);
-        if (hostMaps != null) {
-          hostMaps.remove(tip);
-          if (hostMaps.size() == 0) {
-            runningMapCache.remove(node);
-          }
-        }
-        node = node.getParent();
-      }
-    }
-  }
-
-  /**
-   * Remove a reduce TIP from the list for running-reduces
-   * Called when a reduce fails/completes
-   * @param tip the tip that needs to be retired
-   */
-  private synchronized void retireReduce(TaskInProgress tip) {
-    if (runningReduces == null) {
-      LOG.warn("Running list for reducers missing!! "
-               + "Job details are missing.");
-      return;
-    }
-    runningReduces.remove(tip);
-  }
-
-  /**
-   * Adds a map tip to the list of running maps.
-   * @param tip the tip that needs to be scheduled as running
-   */
-  protected synchronized void scheduleMap(TaskInProgress tip) {
-    
-    runningMapTaskStats.add(0.0f);
-    if (runningMapCache == null) {
-      LOG.warn("Running cache for maps is missing!! " 
-               + "Job details are missing.");
-      return;
-    }
-    String[] splitLocations = tip.getSplitLocations();
-
-    // Add the TIP to the list of non-local running TIPs
-    if (splitLocations == null || splitLocations.length == 0) {
-      nonLocalRunningMaps.add(tip);
-      return;
-    }
-
-    for(String host: splitLocations) {
-      Node node = jobtracker.getNode(host);
-
-      for (int j = 0; j < maxLevel; ++j) {
-        Set<TaskInProgress> hostMaps = runningMapCache.get(node);
-        if (hostMaps == null) {
-          // create a cache if needed
-          hostMaps = new LinkedHashSet<TaskInProgress>();
-          runningMapCache.put(node, hostMaps);
-        }
-        hostMaps.add(tip);
-        node = node.getParent();
-      }
-    }
-  }
-  
-  /**
-   * Adds a reduce tip to the list of running reduces
-   * @param tip the tip that needs to be scheduled as running
-   */
-  protected synchronized void scheduleReduce(TaskInProgress tip) {
-    runningReduceTaskStats.add(0.0f);
-    if (runningReduces == null) {
-      LOG.warn("Running cache for reducers missing!! "
-               + "Job details are missing.");
-      return;
-    }
-    runningReduces.add(tip);
-  }
-  
-  /**
-   * Adds the failed TIP in the front of the list for non-running maps
-   * @param tip the tip that needs to be failed
-   */
-  private synchronized void failMap(TaskInProgress tip) {
-    if (nonRunningMapCache == null) {
-      LOG.warn("Non-running cache for maps missing!! "
-               + "Job details are missing.");
-      return;
-    }
-
-    // 1. Its added everywhere since other nodes (having this split local)
-    //    might have removed this tip from their local cache
-    // 2. Give high priority to failed tip - fail early
-
-    String[] splitLocations = tip.getSplitLocations();
-
-    // Add the TIP in the front of the list for non-local non-running maps
-    if (splitLocations.length == 0) {
-      nonLocalMaps.add(0, tip);
-      return;
-    }
-
-    for(String host: splitLocations) {
-      Node node = jobtracker.getNode(host);
-      
-      for (int j = 0; j < maxLevel; ++j) {
-        List<TaskInProgress> hostMaps = nonRunningMapCache.get(node);
-        if (hostMaps == null) {
-          hostMaps = new LinkedList<TaskInProgress>();
-          nonRunningMapCache.put(node, hostMaps);
-        }
-        hostMaps.add(0, tip);
-        node = node.getParent();
-      }
-    }
-  }
-  
-  /**
-   * Adds a failed TIP in the front of the list for non-running reduces
-   * @param tip the tip that needs to be failed
-   */
-  private synchronized void failReduce(TaskInProgress tip) {
-    if (nonRunningReduces == null) {
-      LOG.warn("Failed cache for reducers missing!! "
-               + "Job details are missing.");
-      return;
-    }
-    nonRunningReduces.add(0, tip);
-  }
-  
-  /**
-   * Find a non-running task in the passed list of TIPs
-   * @param tips a collection of TIPs
-   * @param ttStatus the status of tracker that has requested a task to run
-   * @param numUniqueHosts number of unique hosts that run trask trackers
-   * @param removeFailedTip whether to remove the failed tips
-   */
-  private synchronized TaskInProgress findTaskFromList(
-      Collection<TaskInProgress> tips, TaskTrackerStatus ttStatus,
-      int numUniqueHosts,
-      boolean removeFailedTip) {
-    Iterator<TaskInProgress> iter = tips.iterator();
-    while (iter.hasNext()) {
-      TaskInProgress tip = iter.next();
-
-      // Select a tip if
-      //   1. runnable   : still needs to be run and is not completed
-      //   2. ~running   : no other node is running it
-      //   3. earlier attempt failed : has not failed on this host
-      //                               and has failed on all the other hosts
-      // A TIP is removed from the list if 
-      // (1) this tip is scheduled
-      // (2) if the passed list is a level 0 (host) cache
-      // (3) when the TIP is non-schedulable (running, killed, complete)
-      if (tip.isRunnable() && !tip.isRunning()) {
-        // check if the tip has failed on this host
-        if (!tip.hasFailedOnMachine(ttStatus.getHost()) || 
-             tip.getNumberOfFailedMachines() >= numUniqueHosts) {
-          // check if the tip has failed on all the nodes
-          iter.remove();
-          return tip;
-        } else if (removeFailedTip) { 
-          // the case where we want to remove a failed tip from the host cache
-          // point#3 in the TIP removal logic above
-          iter.remove();
-        }
-      } else {
-        // see point#3 in the comment above for TIP removal logic
-        iter.remove();
-      }
-    }
-    return null;
-  }
-  
-  public boolean hasSpeculativeMaps() {
-    return hasSpeculativeMaps;
-  }
-
-  public boolean hasSpeculativeReduces() {
-    return hasSpeculativeReduces;
-  }
-
-  /**
-   * Retrieve a task for speculation.
-   * If a task slot becomes available and there are less than SpeculativeCap
-   * speculative tasks running: 
-   *  1)Ignore the request if the TT's progressRate is < SlowNodeThreshold
-   *  2)Choose candidate tasks - those tasks whose progress rate is below
-   *    slowTaskThreshold * mean(progress-rates)
-   *  3)Speculate task that's expected to complete last
-   * @param list pool of tasks to choose from
-   * @param taskTrackerName the name of the TaskTracker asking for a task
-   * @param taskTrackerHost the hostname of the TaskTracker asking for a task
-   * @param taskType the type of task (MAP/REDUCE) that we are considering
-   * @return the TIP to speculatively re-execute
-   */
-  protected synchronized TaskInProgress findSpeculativeTask(
-      Collection<TaskInProgress> list, String taskTrackerName, 
-      String taskTrackerHost, TaskType taskType) {
-    if (list.isEmpty()) {
-      return null;
-    }
-    long now = JobTracker.getClock().getTime();
-
-    // Don't return anything if either the TaskTracker is slow or we have
-    // already launched enough speculative tasks in the cluster.
-    if (isSlowTracker(taskTrackerName) || atSpeculativeCap(list, taskType)) {
-      return null;
-    }
-
-    TaskInProgress slowestTIP = null;
-    Comparator<TaskInProgress> LateComparator =
-      new EstimatedTimeLeftComparator(now);
-
-    Iterator<TaskInProgress> iter = list.iterator();
-    while (iter.hasNext()) {
-      TaskInProgress tip = iter.next();
-
-      // If this tip has already run on this machine once or it doesn't need any
-      // more speculative attempts, skip it.
-      if (tip.hasRunOnMachine(taskTrackerHost, taskTrackerName) ||
-          !tip.canBeSpeculated(now)) {
-          continue;
-      }
-
-      if (slowestTIP == null) {
-        slowestTIP = tip;
-      } else {
-        slowestTIP =
-            LateComparator.compare(tip, slowestTIP) < 0 ? tip : slowestTIP;
-      }
-    }
-
-    if (slowestTIP != null) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Chose task " + slowestTIP.getTIPId() + ". Statistics: Task's : " +
-            slowestTIP.getCurrentProgressRate(now) + " Job's : " + 
-            (slowestTIP.isMapTask() ? runningMapTaskStats : runningReduceTaskStats));
-      }
-    }
-
-  return slowestTIP;
-  }
-
-  /**
-   * Find new map task
-   * @param tts The task tracker that is asking for a task
-   * @param clusterSize The number of task trackers in the cluster
-   * @param numUniqueHosts The number of hosts that run task trackers
-   * @param maxCacheLevel The maximum topology level until which to schedule
-   *                      maps. 
-   *                      A value of {@link #anyCacheLevel} implies any 
-   *                      available task (node-local, rack-local, off-switch and 
-   *                      speculative tasks).
-   *                      A value of {@link #NON_LOCAL_CACHE_LEVEL} implies only
-   *                      off-switch/speculative tasks should be scheduled.
-   * @return the index in tasks of the selected task (or -1 for no task)
-   */
-  private synchronized int findNewMapTask(final TaskTrackerStatus tts, 
-                                          final int clusterSize,
-                                          final int numUniqueHosts,
-                                          final int maxCacheLevel) {
-    String taskTrackerName = tts.getTrackerName();
-    String taskTrackerHost = tts.getHost();
-    if (numMapTasks == 0) {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("No maps to schedule for " + profile.getJobID());
-      }
-      return -1;
-    }
-
-    TaskInProgress tip = null;
-    
-    //
-    // Update the last-known clusterSize
-    //
-    this.clusterSize = clusterSize;
-
-    if (!shouldRunOnTaskTracker(taskTrackerName)) {
-      return -1;
-    }
-
-    // Check to ensure this TaskTracker has enough resources to 
-    // run tasks from this job
-    long outSize = resourceEstimator.getEstimatedMapOutputSize();
-    long availSpace = tts.getResourceStatus().getAvailableSpace();
-    if(availSpace < outSize) {
-      LOG.warn("No room for map task. Node " + tts.getHost() + 
-               " has " + availSpace + 
-               " bytes free; but we expect map to take " + outSize);
-
-      return -1; //see if a different TIP might work better. 
-    }
-    
-    
-    // For scheduling a map task, we have two caches and a list (optional)
-    //  I)   one for non-running task
-    //  II)  one for running task (this is for handling speculation)
-    //  III) a list of TIPs that have empty locations (e.g., dummy splits),
-    //       the list is empty if all TIPs have associated locations
-
-    // First a look up is done on the non-running cache and on a miss, a look 
-    // up is done on the running cache. The order for lookup within the cache:
-    //   1. from local node to root [bottom up]
-    //   2. breadth wise for all the parent nodes at max level
-
-    // We fall to linear scan of the list (III above) if we have misses in the 
-    // above caches
-
-    Node node = jobtracker.getNode(tts.getHost());
-    
-    //
-    // I) Non-running TIP :
-    // 
-
-    // 1. check from local node to the root [bottom up cache lookup]
-    //    i.e if the cache is available and the host has been resolved
-    //    (node!=null)
-    if (node != null) {
-      Node key = node;
-      int level = 0;
-      // maxCacheLevel might be greater than this.maxLevel if findNewMapTask is
-      // called to schedule any task (local, rack-local, off-switch or speculative)
-      // tasks or it might be NON_LOCAL_CACHE_LEVEL (i.e. -1) if findNewMapTask is
-      //  (i.e. -1) if findNewMapTask is to only schedule off-switch/speculative
-      // tasks
-      int maxLevelToSchedule = Math.min(maxCacheLevel, maxLevel);
-      for (level = 0;level < maxLevelToSchedule; ++level) {
-        List <TaskInProgress> cacheForLevel = nonRunningMapCache.get(key);
-        if (cacheForLevel != null) {
-          tip = findTaskFromList(cacheForLevel, tts, 
-              numUniqueHosts,level == 0);
-          if (tip != null) {
-            // Add to running cache
-            scheduleMap(tip);
-
-            // remove the cache if its empty
-            if (cacheForLevel.size() == 0) {
-              nonRunningMapCache.remove(key);
-            }
-
-            return tip.getIdWithinJob();
-          }
-        }
-        key = key.getParent();
-      }
-      
-      // Check if we need to only schedule a local task (node-local/rack-local)
-      if (level == maxCacheLevel) {
-        return -1;
-      }
-    }
-
-    //2. Search breadth-wise across parents at max level for non-running 
-    //   TIP if
-    //     - cache exists and there is a cache miss 
-    //     - node information for the tracker is missing (tracker's topology
-    //       info not obtained yet)
-
-    // collection of node at max level in the cache structure
-    Collection<Node> nodesAtMaxLevel = jobtracker.getNodesAtMaxLevel();
-
-    // get the node parent at max level
-    Node nodeParentAtMaxLevel = 
-      (node == null) ? null : JobTracker.getParentNode(node, maxLevel - 1);
-    
-    for (Node parent : nodesAtMaxLevel) {
-
-      // skip the parent that has already been scanned
-      if (parent == nodeParentAtMaxLevel) {
-        continue;
-      }
-
-      List<TaskInProgress> cache = nonRunningMapCache.get(parent);
-      if (cache != null) {
-        tip = findTaskFromList(cache, tts, numUniqueHosts, false);
-        if (tip != null) {
-          // Add to the running cache
-          scheduleMap(tip);
-
-          // remove the cache if empty
-          if (cache.size() == 0) {
-            nonRunningMapCache.remove(parent);
-          }
-          LOG.info("Choosing a non-local task " + tip.getTIPId());
-          return tip.getIdWithinJob();
-        }
-      }
-    }
-
-    // 3. Search non-local tips for a new task
-    tip = findTaskFromList(nonLocalMaps, tts, numUniqueHosts, false);
-    if (tip != null) {
-      // Add to the running list
-      scheduleMap(tip);
-
-      LOG.info("Choosing a non-local task " + tip.getTIPId());
-      return tip.getIdWithinJob();
-    }
-
-    //
-    // II) Running TIP :
-    // 
- 
-    if (hasSpeculativeMaps) {
-      tip = getSpeculativeMap(taskTrackerName, taskTrackerHost);
-      if (tip != null) {
-        return tip.getIdWithinJob();
-      }
-    }
-   return -1;
-  }
-
-  private synchronized TaskInProgress getSpeculativeMap(String taskTrackerName, 
-      String taskTrackerHost) {
-
-    //////// Populate allTips with all TaskInProgress
-    Set<TaskInProgress> allTips = new HashSet<TaskInProgress>();
-    
-    // collection of node at max level in the cache structure
-    Collection<Node> nodesAtMaxLevel = jobtracker.getNodesAtMaxLevel();
-    // Add all tasks from max-level nodes breadth-wise
-    for (Node parent : nodesAtMaxLevel) {
-      Set<TaskInProgress> cache = runningMapCache.get(parent);
-      if (cache != null) {
-        allTips.addAll(cache);
-      }
-    }
-    // Add all non-local TIPs
-    allTips.addAll(nonLocalRunningMaps);
-    
-    ///////// Select a TIP to run on
-    TaskInProgress tip = findSpeculativeTask(allTips, taskTrackerName, 
-        taskTrackerHost, TaskType.MAP);
-    
-    if (tip != null) {
-      LOG.info("Choosing map task " + tip.getTIPId() + 
-          " for speculative execution");
-    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("No speculative map task found for tracker " + taskTrackerName);
-      }
-    }
-    return tip;
-  }
-  
-  /**
-   * Find new reduce task
-   * @param tts The task tracker that is asking for a task
-   * @param clusterSize The number of task trackers in the cluster
-   * @param numUniqueHosts The number of hosts that run task trackers
-   * @return the index in tasks of the selected task (or -1 for no task)
-   */
-  private synchronized int findNewReduceTask(TaskTrackerStatus tts, 
-                                             int clusterSize,
-                                             int numUniqueHosts) {
-    String taskTrackerName = tts.getTrackerName();
-    String taskTrackerHost = tts.getHost();
-    if (numReduceTasks == 0) {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("No reduces to schedule for " + profile.getJobID());
-      }
-      return -1;
-    }
-    TaskInProgress tip = null;
-    
-    // Update the last-known clusterSize
-    this.clusterSize = clusterSize;
-
-    if (!shouldRunOnTaskTracker(taskTrackerName)) {
-      return -1;
-    }
-
-    long outSize = resourceEstimator.getEstimatedReduceInputSize();
-    long availSpace = tts.getResourceStatus().getAvailableSpace();
-    if(availSpace < outSize) {
-      LOG.warn("No room for reduce task. Node " + taskTrackerName + " has " +
-                availSpace + 
-               " bytes free; but we expect reduce input to take " + outSize);
-
-      return -1; //see if a different TIP might work better. 
-    }
-    
-    // 1. check for a never-executed reduce tip
-    // reducers don't have a cache and so pass -1 to explicitly call that out
-    tip = findTaskFromList(nonRunningReduces, tts, numUniqueHosts, false);
-    if (tip != null) {
-      scheduleReduce(tip);
-      return tip.getIdWithinJob();
-    }
-
-    // 2. check for a reduce tip to be speculated
-    if (hasSpeculativeReduces) {
-      tip = getSpeculativeReduce(taskTrackerName, taskTrackerHost);
-      if (tip != null) {
-        return tip.getIdWithinJob();
-      }
-    }
-
-    return -1;
-  }
-
-  private synchronized TaskInProgress getSpeculativeReduce(
-      String taskTrackerName, String taskTrackerHost) {
-    TaskInProgress tip = findSpeculativeTask(
-        runningReduces, taskTrackerName, taskTrackerHost, TaskType.REDUCE);
-    if (tip != null) {
-      LOG.info("Choosing reduce task " + tip.getTIPId() + 
-          " for speculative execution");
-    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("No speculative map task found for tracker "
-                  + taskTrackerHost);
-      }
-    }
-    return tip;
-  }
-
-    /**
-     * Check to see if the maximum number of speculative tasks are
-     * already being executed currently.
-     * @param tasks the set of tasks to test
-     * @param type the type of task (MAP/REDUCE) that we are considering
-     * @return has the cap been reached?
-     */
-   private boolean atSpeculativeCap(Collection<TaskInProgress> tasks, 
-       TaskType type) {
-     float numTasks = tasks.size();
-     if (numTasks == 0){
-       return true; // avoid divide by zero
-     }
-     int speculativeTaskCount = type == TaskType.MAP ? speculativeMapTasks 
-         : speculativeReduceTasks;
-     //return true if totalSpecTask < max(10, 0.01 * total-slots, 
-     //                                   0.1 * total-running-tasks)
-
-     if (speculativeTaskCount < MIN_SPEC_CAP) {
-       return false; // at least one slow tracker's worth of slots(default=10)
-     }
-     ClusterStatus c = jobtracker.getClusterStatus(false); 
-     int numSlots = (type == TaskType.MAP ? c.getMaxMapTasks() : c.getMaxReduceTasks());
-     if ((float)speculativeTaskCount < numSlots * MIN_SLOTS_CAP) {
-       return false;
-     }
-     boolean atCap = (((float)(speculativeTaskCount)/numTasks) >= speculativeCap);
-     if (LOG.isDebugEnabled()) {
-       LOG.debug("SpeculativeCap is "+speculativeCap+", specTasks/numTasks is " +
-           ((float)(speculativeTaskCount)/numTasks)+
-           ", so atSpecCap() is returning "+atCap);
-     }
-     return atCap;
-   }
-  
-  /**
-   * A class for comparing the estimated time to completion of two tasks
-   */
-  private static class EstimatedTimeLeftComparator 
-  implements Comparator<TaskInProgress> {
-    private long time;
-    public EstimatedTimeLeftComparator(long now) {
-      this.time = now;
-    }
-    /**
-     * Estimated time to completion is measured as:
-     *   % of task left to complete (1 - progress) / progress rate of the task.
-     * 
-     * This assumes that tasks are linear in their progress, which is 
-     * often wrong, especially since progress for reducers is currently
-     * calculated by evenly weighting their three stages (shuffle, sort, map)
-     * which rarely account for 1/3 each. This should be fixed in the future
-     * by calculating progressRate more intelligently or splitting these
-     * multi-phase tasks into individual tasks.
-     * 
-     * The ordering this comparator defines is: task1 < task2 if task1 is
-     * estimated to finish farther in the future => compare(t1,t2) returns -1
-     */
-    public int compare(TaskInProgress tip1, TaskInProgress tip2) {
-      //we have to use the Math.max in the denominator to avoid divide by zero
-      //error because prog and progRate can both be zero (if one is zero,
-      //the other one will be 0 too).
-      //We use inverse of time_reminaing=[(1- prog) / progRate]
-      //so that (1-prog) is in denom. because tasks can have arbitrarily 
-      //low progRates in practice (e.g. a task that is half done after 1000
-      //seconds will have progRate of 0.0000005) so we would rather 
-      //use Math.maxnon (1-prog) by putting it in the denominator 
-      //which will cause tasks with prog=1 look 99.99% done instead of 100%
-      //which is okay
-      double t1 = tip1.getCurrentProgressRate(time) / Math.max(0.0001, 
-          1.0 - tip1.getProgress());
-      double t2 = tip2.getCurrentProgressRate(time) / Math.max(0.0001, 
-          1.0 - tip2.getProgress());
-      if (t1 < t2) return -1;
-      else if (t2 < t1) return 1;
-      else return 0;
-    }
-  }
-  
-  /**
-   * Compares the ave progressRate of tasks that have finished on this 
-   * taskTracker to the ave of all succesfull tasks thus far to see if this 
-   * TT one is too slow for speculating.
-   * slowNodeThreshold is used to determine the number of standard deviations
-   * @param taskTracker the name of the TaskTracker we are checking
-   * @return is this TaskTracker slow
-   */
-  protected boolean isSlowTracker(String taskTracker) {
-    if (trackerMapStats.get(taskTracker) != null &&
-        trackerMapStats.get(taskTracker).mean() -
-        mapTaskStats.mean() > mapTaskStats.std()*slowNodeThreshold) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Tracker " + taskTracker + 
-            " declared slow. trackerMapStats.get(taskTracker).mean() :" + trackerMapStats.get(taskTracker).mean() +
-            " mapTaskStats :" + mapTaskStats);
-      }
-      return true;
-    }
-    if (trackerReduceStats.get(taskTracker) != null && 
-        trackerReduceStats.get(taskTracker).mean() -
-        reduceTaskStats.mean() > reduceTaskStats.std()*slowNodeThreshold) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Tracker " + taskTracker + 
-            " declared slow. trackerReduceStats.get(taskTracker).mean() :" + trackerReduceStats.get(taskTracker).mean() +
-            " reduceTaskStats :" + reduceTaskStats);
-      }
-      return true;
-    }
-    return false;
-  }
-  
-  static class DataStatistics{
-    private int count = 0;
-    private double sum = 0;
-    private double sumSquares = 0;
-    
-    public DataStatistics() {
-    }
-    
-    public DataStatistics(double initNum) {
-      this.count = 1;
-      this.sum = initNum;
-      this.sumSquares = initNum * initNum;
-    }
-    
-    public void add(double newNum) {
-      this.count++;
-      this.sum += newNum;
-      this.sumSquares += newNum * newNum;
-    }
-
-    public void updateStatistics(double old, double update) {
-      sub(old);
-      add(update);
-    }
-    private void sub(double oldNum) {
-      this.count--;
-      this.sum = Math.max(this.sum -= oldNum, 0.0d);
-      this.sumSquares = Math.max(this.sumSquares -= oldNum * oldNum, 0.0d);
-    }
-    
-    public double mean() {
-      return sum/count;      
-    }
-  
-    public double var() {
-      // E(X^2) - E(X)^2
-      return Math.max((sumSquares/count) - mean() * mean(), 0.0d);
-    }
-    
-    public double std() {
-      return Math.sqrt(this.var());
-    }
-    
-    public String toString() {
-      return "DataStatistics: count is " + count + ", sum is " + sum + 
-      ", sumSquares is " + sumSquares + " mean is " + mean() + " std() is " + std();
-    }
-    
-  }
-  
-  private boolean shouldRunOnTaskTracker(String taskTracker) {
-    //
-    // Check if too many tasks of this job have failed on this
-    // tasktracker prior to assigning it a new one.
-    //
-    int taskTrackerFailedTasks = getTrackerTaskFailures(taskTracker);
-    if ((flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) && 
-        taskTrackerFailedTasks >= maxTaskFailuresPerTracker) {
-      if (LOG.isDebugEnabled()) {
-        String flakyTracker = convertTrackerNameToHostName(taskTracker);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Ignoring the black-listed tasktracker: '" + flakyTracker 
-                    + "' for assigning a new task");
-        }
-      }
-      return false;
-    }
-    return true;
-  }
-  
-
-  /**
-   * Metering: Occupied Slots * (Finish - Start)
-   * @param tip {@link TaskInProgress} to be metered which just completed, 
-   *            cannot be <code>null</code> 
-   * @param status {@link TaskStatus} of the completed task, cannot be 
-   *               <code>null</code>
-   */
-  private void meterTaskAttempt(TaskInProgress tip, TaskStatus status) {
-    JobCounter slotCounter = 
-      (tip.isMapTask()) ? JobCounter.SLOTS_MILLIS_MAPS : 
-                          JobCounter.SLOTS_MILLIS_REDUCES;
-    jobCounters.incrCounter(slotCounter, 
-                            tip.getNumSlotsRequired() * 
-                            (status.getFinishTime() - status.getStartTime()));
-  }
-  
-  /**
-   * A taskid assigned to this JobInProgress has reported in successfully.
-   */
-  public synchronized boolean completedTask(TaskInProgress tip, 
-                                            TaskStatus status)
-  {
-    TaskAttemptID taskid = status.getTaskID();
-    final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
-        
-    // Metering
-    meterTaskAttempt(tip, status);
-    
-    // Sanity check: is the TIP already complete? 
-    // This would not happen, 
-    // because no two tasks are SUCCEEDED at the same time. 
-    if (tip.isComplete()) {
-      // Mark this task as KILLED
-      tip.alreadyCompletedTask(taskid);
-
-      // Let the JobTracker cleanup this taskid if the job isn't running
-      if (this.status.getRunState() != JobStatus.RUNNING) {
-        jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
-      }
-      return false;
-    } 
-    boolean wasSpeculating = tip.isSpeculating(); //store this fact
-    LOG.info("Task '" + taskid + "' has completed " + tip.getTIPId() + 
-             " successfully.");          
-    // Mark the TIP as complete
-    tip.completed(taskid);
-    resourceEstimator.updateWithCompletedTask(status, tip);
-
-    // Update jobhistory 
-    TaskTrackerStatus ttStatus = 
-      this.jobtracker.getTaskTrackerStatus(status.getTaskTracker());
-    String trackerHostname = jobtracker.getNode(ttStatus.getHost()).toString();
-    TaskType taskType = getTaskType(tip);
-
-    TaskAttemptStartedEvent tse = new TaskAttemptStartedEvent(
-        status.getTaskID(), taskType, status.getStartTime(), 
-        status.getTaskTracker(),  ttStatus.getHttpPort());
-    
-    jobHistory.logEvent(tse, status.getTaskID().getJobID());
-    TaskAttemptID statusAttemptID = status.getTaskID();
-
-    if (status.getIsMap()){
-      MapAttemptFinishedEvent mfe = new MapAttemptFinishedEvent(
-          statusAttemptID, taskType, TaskStatus.State.SUCCEEDED.toString(),
-          status.getMapFinishTime(),
-          status.getFinishTime(),  trackerHostname,
-          status.getStateString(), 
-          new org.apache.hadoop.mapreduce.Counters(status.getCounters()),
-          tip.getSplits(statusAttemptID).burst()
-          );
-      
-      jobHistory.logEvent(mfe,  status.getTaskID().getJobID());
-      
-    }else{
-      ReduceAttemptFinishedEvent rfe = new ReduceAttemptFinishedEvent(
-          statusAttemptID, taskType, TaskStatus.State.SUCCEEDED.toString(), 
-          status.getShuffleFinishTime(),
-          status.getSortFinishTime(), status.getFinishTime(),
-          trackerHostname, status.getStateString(),
-          new org.apache.hadoop.mapreduce.Counters(status.getCounters()),
-          tip.getSplits(statusAttemptID).burst()
-          );
-      
-      jobHistory.logEvent(rfe,  status.getTaskID().getJobID());
-      
-    }
-
-    TaskFinishedEvent tfe = new TaskFinishedEvent(tip.getTIPId(),
-        tip.getExecFinishTime(), taskType, 
-        TaskStatus.State.SUCCEEDED.toString(),
-        new org.apache.hadoop.mapreduce.Counters(status.getCounters()));
-    
-    jobHistory.logEvent(tfe, tip.getJob().getJobID());
-    
-   
-    if (tip.isJobSetupTask()) {
-      // setup task has finished. kill the extra setup tip
-      killSetupTip(!tip.isMapTask());
-      setupComplete();
-    } else if (tip.isJobCleanupTask()) {
-      // cleanup task has finished. Kill the extra cleanup tip
-      if (tip.isMapTask()) {
-        // kill the reduce tip
-        cleanup[1].kill();
-      } else {
-        cleanup[0].kill();
-      }
-      //
-      // The Job is done
-      // if the job is failed, then mark the job failed.
-      if (jobFailed) {
-        terminateJob(JobStatus.FAILED);
-      }
-      // if the job is killed, then mark the job killed.
-      if (jobKilled) {
-        terminateJob(JobStatus.KILLED);
-      }
-      else {
-        jobComplete();
-      }
-      // The job has been killed/failed/successful
-      // JobTracker should cleanup this task
-      jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
-    } else if (tip.isMapTask()) {
-      runningMapTasks -= 1;
-      finishedMapTasks += 1;
-      metrics.completeMap(taskid);
-      if (!tip.isJobSetupTask() && hasSpeculativeMaps) {
-        updateTaskTrackerStats(tip,ttStatus,trackerMapStats,mapTaskStats);
-      }
-      // remove the completed map from the resp running caches
-      retireMap(tip);
-      if ((finishedMapTasks + failedMapTIPs) == (numMapTasks)) {
-        this.status.setMapProgress(1.0f);
-      }
-    } else {
-      runningReduceTasks -= 1;
-      finishedReduceTasks += 1;
-      metrics.completeReduce(taskid);
-      if (!tip.isJobSetupTask() && hasSpeculativeReduces) {
-        updateTaskTrackerStats(tip,ttStatus,trackerReduceStats,reduceTaskStats);
-      }
-      // remove the completed reduces from the running reducers set
-      retireReduce(tip);
-      if ((finishedReduceTasks + failedReduceTIPs) == (numReduceTasks)) {
-        this.status.setReduceProgress(1.0f);
-      }
-    }
-    decrementSpeculativeCount(wasSpeculating, tip);
-    // is job complete?
-    if (!jobSetupCleanupNeeded && canLaunchJobCleanupTask()) {
-      jobComplete();
-    }
-    return true;
-  }
-  
-  private void updateTaskTrackerStats(TaskInProgress tip, TaskTrackerStatus ttStatus, 
-      Map<String,DataStatistics> trackerStats, DataStatistics overallStats) {
-    float tipDuration = tip.getExecFinishTime()-tip.getDispatchTime(tip.getSuccessfulTaskid());
-    DataStatistics ttStats = 
-      trackerStats.get(ttStatus.getTrackerName());
-    double oldMean = 0.0d;
-    //We maintain the mean of TaskTrackers' means. That way, we get a single
-    //data-point for every tracker (used in the evaluation in isSlowTracker)
-    if (ttStats != null) {
-      oldMean = ttStats.mean();
-      ttStats.add(tipDuration);
-      overallStats.updateStatistics(oldMean, ttStats.mean());
-    } else {
-      trackerStats.put(ttStatus.getTrackerName(),
-          (ttStats = new DataStatistics(tipDuration)));
-      overallStats.add(tipDuration);
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Added mean of " +ttStats.mean() + " to trackerStats of type "+
-          (tip.isMapTask() ? "Map" : "Reduce") +
-          " on "+ttStatus.getTrackerName()+". DataStatistics is now: " +
-          trackerStats.get(ttStatus.getTrackerName()));
-    }
-  }
-  
-  public void updateStatistics(double oldProg, double newProg, boolean isMap) {
-    if (isMap) {   
-      runningMapTaskStats.updateStatistics(oldProg, newProg);
-    } else {
-      runningReduceTaskStats.updateStatistics(oldProg, newProg);
-    }
-  }
-  
-  public DataStatistics getRunningTaskStatistics(boolean isMap) {
-    if (isMap) {
-      return runningMapTaskStats;
-    } else {
-      return runningReduceTaskStats;
-    }
-  }
-  
-  public float getSlowTaskThreshold() {
-    return slowTaskThreshold;
-  }
-
-  /**
-   * Job state change must happen thru this call
-   */
-  private void changeStateTo(int newState) {
-    int oldState = this.status.getRunState();
-    if (oldState == newState) {
-      return; //old and new states are same
-    }
-    this.status.setRunState(newState);
-    
-    //update the metrics
-    if (oldState == JobStatus.PREP) {
-      this.jobtracker.getInstrumentation().decPrepJob(conf, jobId);
-    } else if (oldState == JobStatus.RUNNING) {
-      this.jobtracker.getInstrumentation().decRunningJob(conf, jobId);
-    }
-    
-    if (newState == JobStatus.PREP) {
-      this.jobtracker.getInstrumentation().addPrepJob(conf, jobId);
-    } else if (newState == JobStatus.RUNNING) {
-      this.jobtracker.getInstrumentation().addRunningJob(conf, jobId);
-    }
-    
-  }
-
-  /**
-   * The job is done since all it's component tasks are either
-   * successful or have failed.
-   */
-  private void jobComplete() {
-    final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
-    //
-    // All tasks are complete, then the job is done!
-    //
-    if (this.status.getRunState() == JobStatus.RUNNING ||
-        this.status.getRunState() == JobStatus.PREP) {
-      changeStateTo(JobStatus.SUCCEEDED);
-      this.status.setCleanupProgress(1.0f);
-      if (maps.length == 0) {
-        this.status.setMapProgress(1.0f);
-      }
-      if (reduces.length == 0) {
-        this.status.setReduceProgress(1.0f);
-      }
-      this.finishTime = JobTracker.getClock().getTime();
-      this.status.setFinishTime(this.finishTime);
-      LOG.info("Job " + this.status.getJobID() + 
-               " has completed successfully.");
-      
-      // Log the job summary (this should be done prior to logging to 
-      // job-history to ensure job-counters are in-sync 
-      JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
-
-      // Log job-history
-      JobFinishedEvent jfe = 
-        new JobFinishedEvent(this.status.getJobID(),
-          this.finishTime,
-          this.finishedMapTasks,this.finishedReduceTasks, failedMapTasks, 
-          failedReduceTasks, 
-          new org.apache.hadoop.mapreduce.Counters(getMapCounters()),
-          new org.apache.hadoop.mapreduce.Counters(getReduceCounters()),
-          new org.apache.hadoop.mapreduce.Counters(getCounters()));
-      
-      jobHistory.logEvent(jfe, this.status.getJobID());
-      jobHistory.closeWriter(this.status.getJobID());
-
-      // Note that finalize will close the job history handles which garbage collect
-      // might try to finalize
-      garbageCollect();
-      
-      metrics.completeJob(this.conf, this.status.getJobID());
-    }
-  }
-  
-  private synchronized void terminateJob(int jobTerminationState) {
-    if ((status.getRunState() == JobStatus.RUNNING) ||
-        (status.getRunState() == JobStatus.PREP)) {
-
-      this.finishTime = JobTracker.getClock().getTime();
-      this.status.setMapProgress(1.0f);
-      this.status.setReduceProgress(1.0f);
-      this.status.setCleanupProgress(1.0f);
-      this.status.setFinishTime(this.finishTime);
-
-      if (jobTerminationState == JobStatus.FAILED) {
-        changeStateTo(JobStatus.FAILED);
-      } else {
-        changeStateTo(JobStatus.KILLED);
-      }
-      // Log the job summary
-      JobSummary.logJobSummary(this, jobtracker.getClusterStatus(false));
-
-      JobUnsuccessfulCompletionEvent failedEvent = 
-        new JobUnsuccessfulCompletionEvent(this.status.getJobID(),
-            finishTime,
-            this.finishedMapTasks, 
-            this.finishedReduceTasks,
-            JobStatus.getJobRunState(jobTerminationState));
-      
-      jobHistory.logEvent(failedEvent, this.status.getJobID());
-      jobHistory.closeWriter(this.status.getJobID());
-
-      garbageCollect();
-
-      jobtracker.getInstrumentation().terminateJob(
-          this.conf, this.status.getJobID());
-      if (jobTerminationState == JobStatus.FAILED) {
-        jobtracker.getInstrumentation().failedJob(
-            this.conf, this.status.getJobID());
-      } else {
-        jobtracker.getInstrumentation().killedJob(
-            this.conf, this.status.getJobID());
-      }
-    }
-  }
-
-  /**
-   * Terminate the job and all its component tasks.
-   * Calling this will lead to marking the job as failed/killed. Cleanup 
-   * tip will be launched. If the job has not inited, it will directly call 
-   * terminateJob as there is no need to launch cleanup tip.
-   * This method is reentrant.
-   * @param jobTerminationState job termination state
-   */
-  private synchronized void terminate(int jobTerminationState) {
-    if(!tasksInited.get()) {
-    	//init could not be done, we just terminate directly.
-      terminateJob(jobTerminationState);
-      return;
-    }
-
-    if ((status.getRunState() == JobStatus.RUNNING) ||
-         (status.getRunState() == JobStatus.PREP)) {
-      LOG.info("Killing job '" + this.status.getJobID() + "'");
-      if (jobTerminationState == JobStatus.FAILED) {
-        if(jobFailed) {//reentrant
-          return;
-        }
-        jobFailed = true;
-      } else if (jobTerminationState == JobStatus.KILLED) {
-        if(jobKilled) {//reentrant
-          return;
-        }
-        jobKilled = true;
-      }
-      // clear all unclean tasks
-      clearUncleanTasks();
-      //
-      // kill all TIPs.
-      //
-      for (int i = 0; i < setup.length; i++) {
-        setup[i].kill();
-      }
-      for (int i = 0; i < maps.length; i++) {
-        maps[i].kill();
-      }
-      for (int i = 0; i < reduces.length; i++) {
-        reduces[i].kill();
-      }
-      
-      if (!jobSetupCleanupNeeded) {
-        terminateJob(jobTerminationState);
-      }
-    }
-  }
-
-  /**
-   * Cancel all reservations since the job is done
-   */
-  private void cancelReservedSlots() {
-    // Make a copy of the set of TaskTrackers to prevent a 
-    // ConcurrentModificationException ...
-    Set<TaskTracker> tm = 
-      new HashSet<TaskTracker>(trackersReservedForMaps.keySet());
-    for (TaskTracker tt : tm) {
-      tt.unreserveSlots(TaskType.MAP, this);
-    }
-
-    Set<TaskTracker> tr = 
-      new HashSet<TaskTracker>(trackersReservedForReduces.keySet());
-    for (TaskTracker tt : tr) {
-      tt.unreserveSlots(TaskType.REDUCE, this);
-    }
-  }
-  
-  private void clearUncleanTasks() {
-    TaskAttemptID taskid = null;
-    TaskInProgress tip = null;
-    while (!mapCleanupTasks.isEmpty()) {
-      taskid = mapCleanupTasks.remove(0);
-      tip = maps[taskid.getTaskID().getId()];
-      updateTaskStatus(tip, tip.getTaskStatus(taskid));
-    }
-    while (!reduceCleanupTasks.isEmpty()) {
-      taskid = reduceCleanupTasks.remove(0);
-      tip = reduces[taskid.getTaskID().getId()];
-      updateTaskStatus(tip, tip.getTaskStatus(taskid));
-    }
-  }
-
-  /**
-   * Kill the job and all its component tasks. This method should be called from 
-   * jobtracker and should return fast as it locks the jobtracker.
-   */
-  public void kill() {
-    boolean killNow = false;
-    synchronized(jobInitKillStatus) {
-      jobInitKillStatus.killed = true;
-      //if not in middle of init, terminate it now
-      if(!jobInitKillStatus.initStarted || jobInitKillStatus.initDone) {
-        //avoiding nested locking by setting flag
-        killNow = true;
-      }
-    }
-    if(killNow) {
-      terminate(JobStatus.KILLED);
-    }
-  }
-  
-  /**
-   * Fails the job and all its component tasks. This should be called only from
-   * {@link JobInProgress} or {@link JobTracker}. Look at 
-   * {@link JobTracker#failJob(JobInProgress)} for more details.
-   * Note that the job doesnt expect itself to be failed before its inited. 
-   * Only when the init is done (successfully or otherwise), the job can be 
-   * failed. 
-   */
-  synchronized void fail() {
-    terminate(JobStatus.FAILED);
-  }
-  
-  private void decrementSpeculativeCount(boolean wasSpeculating, 
-      TaskInProgress tip) {
-    if (wasSpeculating) {
-      if (tip.isMapTask()) {
-        speculativeMapTasks--;
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Decremented count for " + 
-                    tip.getTIPId()+"/"+tip.getJob().getJobID() + 
-                    ". Current speculativeMap task count: "
-                    + speculativeMapTasks);
-        }
-      } else {
-        speculativeReduceTasks--;
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Decremented count for " +
-                    tip.getTIPId()+"/"+tip.getJob().getJobID() +
-                    ". Current speculativeReduce task count: "
-                    + speculativeReduceTasks);
-        }
-      }
-    }
-  }
-  
-  /**
-   * A task assigned to this JobInProgress has reported in as failed.
-   * Most of the time, we'll just reschedule execution.  However, after
-   * many repeated failures we may instead decide to allow the entire 
-   * job to fail or succeed if the user doesn't care about a few tasks failing.
-   *
-   * Even if a task has reported as completed in the past, it might later
-   * be reported as failed.  That's because the TaskTracker that hosts a map
-   * task might die before the entire job can complete.  If that happens,
-   * we need to schedule reexecution so that downstream reduce tasks can 
-   * obtain the map task's output.
-   */
-  private void failedTask(TaskInProgress tip, TaskAttemptID taskid,
-                          TaskStatus status,
-                          TaskTracker taskTracker, boolean wasRunning,
-                          boolean wasComplete, boolean wasAttemptRunning) {
-    // check if the TIP is already failed
-    boolean wasFailed = tip.isFailed();
-    boolean wasSpeculating = tip.isSpeculating();
-
-    // Mark the taskid as FAILED or KILLED
-    tip.incompleteSubTask(taskid, this.status);
-    decrementSpeculativeCount(wasSpeculating, tip);
-   
-    boolean isRunning = tip.isRunning();
-    boolean isComplete = tip.isComplete();
-
-    if(wasAttemptRunning) {
-      // We are decrementing counters without looking for isRunning ,
-      // because we increment the counters when we obtain
-      // new map task attempt or reduce task attempt.We do not really check
-      // for tip being running.
-      // Whenever we obtain new task attempt runningMapTasks incremented.
-      // hence we are decrementing the same.      
-      if(!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
-        if(tip.isMapTask()) {
-          runningMapTasks -= 1;
-        } else {
-          runningReduceTasks -= 1;
-        }
-      }
-      
-      // Metering
-      meterTaskAttempt(tip, status);
-    }
-        
-    //update running  count on task failure.
-    if (wasRunning && !isRunning) {
-      if (tip.isJobCleanupTask()) {
-        launchedCleanup = false;
-      } else if (tip.isJobSetupTask()) {
-        launchedSetup = false;
-      } else if (tip.isMapTask()) {
-        // remove from the running queue and put it in the non-running cache
-        // if the tip is not complete i.e if the tip still needs to be run
-        if (!isComplete) {
-          retireMap(tip);
-          failMap(tip);
-        }
-      } else {
-        // remove from the running queue and put in the failed queue if the tip
-        // is not complete
-        if (!isComplete) {
-          retireReduce(tip);
-          failReduce(tip);
-        }
-      }
-    }
-        
-    // The case when the map was complete but the task tracker went down.
-    // However, we don't need to do any metering here...
-    if (wasComplete && !isComplete) {
-      if (tip.isMapTask()) {
-        // Put the task back in the cache. This will help locality for cases
-        // where we have a different TaskTracker from the same rack/switch
-        // asking for a task. 
-        // We bother about only those TIPs that were successful
-        // earlier (wasComplete and !isComplete) 
-        // (since they might have been removed from the cache of other 
-        // racks/switches, if the input split blocks were present there too)
-        failMap(tip);
-        finishedMapTasks -= 1;
-      }
-    }
-        
-    // update job history
-    // get taskStatus from tip
-    TaskStatus taskStatus = tip.getTaskStatus(taskid);
-    String taskTrackerName = taskStatus.getTaskTracker();
-    String taskTrackerHostName = convertTrackerNameToHostName(taskTrackerName);
-    int taskTrackerPort = -1;
-    TaskTrackerStatus taskTrackerStatus = 
-      (taskTracker == null) ? null : taskTracker.getStatus();
-    if (taskTrackerStatus != null) {
-      taskTrackerPort = taskTrackerStatus.getHttpPort();
-    }
-    long startTime = taskStatus.getStartTime();
-    long finishTime = taskStatus.getFinishTime();
-    List<String> taskDiagnosticInfo = tip.getDiagnosticInfo(taskid);
-    String diagInfo = taskDiagnosticInfo == null ? "" :
-      StringUtils.arrayToString(taskDiagnosticInfo.toArray(new String[0]));
-    TaskType taskType = getTaskType(tip);
-    TaskAttemptStartedEvent tse = new TaskAttemptStartedEvent(
-        taskid, taskType, startTime, taskTrackerName, taskTrackerPort);
-    
-    jobHistory.logEvent(tse, taskid.getJobID());
-
-    ProgressSplitsBlock splits = tip.getSplits(taskStatus.getTaskID());
-   
-    TaskAttemptUnsuccessfulCompletionEvent tue =
-      new TaskAttemptUnsuccessfulCompletionEvent
-            (taskid, 
-             taskType, taskStatus.getRunState().toString(),
-             finishTime, 
-             taskTrackerHostName, diagInfo,
-             splits.burst());
-    jobHistory.logEvent(tue, taskid.getJobID());
-        
-    // After this, try to assign tasks with the one after this, so that
-    // the failed task goes to the end of the list.
-    if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
-      if (tip.isMapTask()) {
-        failedMapTasks++;
-      } else {
-        failedReduceTasks++; 
-      }
-    }
-            
-    //
-    // Note down that a task has failed on this tasktracker 
-    //
-    if (status.getRunState() == TaskStatus.State.FAILED) { 
-      addTrackerTaskFailure(taskTrackerName, taskTracker);
-    }
-        
-    //
-    // Let the JobTracker know that this task has failed
-    //
-    jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
-
-    //
-    // Check if we need to kill the job because of too many failures or 
-    // if the job is complete since all component tasks have completed
-
-    // We do it once per TIP and that too for the task that fails the TIP
-    if (!wasFailed && tip.isFailed()) {
-      //
-      // Allow upto 'mapFailuresPercent' of map tasks to fail or
-      // 'reduceFailuresPercent' of reduce tasks to fail
-      //
-      boolean killJob = tip.isJobCleanupTask() || tip.isJobSetupTask() ? true :
-                        tip.isMapTask() ? 
-            ((++failedMapTIPs*100) > (mapFailuresPercent*numMapTasks)) :
-            ((++failedReduceTIPs*100) > (reduceFailuresPercent*numReduceTasks));
-      
-      if (killJob) {
-        LOG.info("Aborting job " + profile.getJobID());
-        TaskFailedEvent tfe = 
-          new TaskFailedEvent(tip.getTIPId(), finishTime, taskType, diagInfo,
-              TaskStatus.State.FAILED.toString(),
-              null);
-        
-        jobHistory.logEvent(tfe, tip.getJob().getJobID());
-        
-        if (tip.isJobCleanupTask()) {
-          // kill the other tip
-          if (tip.isMapTask()) {
-            cleanup[1].kill();
-          } else {
-            cleanup[0].kill();
-          }
-          terminateJob(JobStatus.FAILED);
-        } else {
-          if (tip.isJobSetupTask()) {
-            // kill the other tip
-            killSetupTip(!tip.isMapTask());
-          }
-          fail();
-        }
-      }
-      
-      //
-      // Update the counters
-      //
-      if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
-        if (tip.isMapTask()) {
-          jobCounters.incrCounter(JobCounter.NUM_FAILED_MAPS, 1);
-        } else {
-          jobCounters.incrCounter(JobCounter.NUM_FAILED_REDUCES, 1);
-        }
-      }
-    }
-  }
-
-  void killSetupTip(boolean isMap) {
-    if (isMap) {
-      setup[0].kill();
-    } else {
-      setup[1].kill();
-    }
-  }
-
-  boolean isSetupFinished() {
-    // if there is no setup to be launched, consider setup is finished.  
-    if ((tasksInited.get() && setup.length == 0) || 
-        setup[0].isComplete() || setup[0].isFailed() || setup[1].isComplete()
-        || setup[1].isFailed()) {
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * Fail a task with a given reason, but without a status object.
-   * 
-   * Assuming {@link JobTracker} is locked on entry.
-   * 
-   * @param tip The task's tip
-   * @param taskid The task id
-   * @param reason The reason that the task failed
-   * @param trackerName The task tracker the task failed on
-   */
-  public synchronized void failedTask(TaskInProgress tip, TaskAttemptID taskid,
-      String reason, TaskStatus.Phase phase, TaskStatus.State state, 
-                         String trackerName) {
-    TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), 
-                                                    taskid,
-                                                    0.0f,
-                                                    tip.isMapTask() ? 
-                                                        numSlotsPerMap : 
-                                                        numSlotsPerReduce,
-                                                    state,
-                                                    reason,
-                                                    reason,
-                                                    trackerName, phase,
-                                                    new Counters());
-    // update the actual start-time of the attempt
-    TaskStatus oldStatus = tip.getTaskStatus(taskid); 
-    long startTime = oldStatus == null
-                     ? JobTracker.getClock().getTime()
-                     : oldStatus.getStartTime();
-    status.setStartTime(startTime);
-    status.setFinishTime(JobTracker.getClock().getTime());
-    boolean wasComplete = tip.isComplete();
-    updateTaskStatus(tip, status);
-    boolean isComplete = tip.isComplete();
-    if (wasComplete && !isComplete) { // mark a successful tip as failed
-      TaskType taskType = getTaskType(tip);
-      TaskFailedEvent tfe = 
-        new TaskFailedEvent(tip.getTIPId(), tip.getExecFinishTime(), taskType,
-            reason, TaskStatus.State.FAILED.toString(),
-            taskid);
-      
-        jobHistory.logEvent(tfe, tip.getJob().getJobID());
-      
-    }
-  }
-       
-                           
-  /**
-   * The job is dead.  We're now GC'ing it, getting rid of the job
-   * from all tables.  Be sure to remove all of this job's tasks
-   * from the various tables.
-   */
-   void garbageCollect() {
-     synchronized(this) {
-       // Cancel task tracker reservation
-       cancelReservedSlots();
-
-
-       // Let the JobTracker know that a job is complete
-       jobtracker.getInstrumentation().decWaitingMaps(getJobID(), pendingMaps());
-       jobtracker.getInstrumentation().decWaitingReduces(getJobID(), pendingReduces());
-       jobtracker.storeCompletedJob(this);
-       jobtracker.finalizeJob(this);
-
-       try {
-         // Definitely remove the local-disk copy of the job file
-         if (localJobFile != null) {
-           localFs.delete(localJobFile, true);
-           localJobFile = null;
-         }
-
-         Path tempDir = jobtracker.getSystemDirectoryForJob(getJobID());
-         new CleanupQueue().addToQueue(new PathDeletionContext(
-             jobtracker.getFileSystem(), tempDir.toUri().getPath())); 
-       } catch (IOException e) {
-         LOG.warn("Error cleaning up "+profile.getJobID()+": "+e);
-       }
-
-       // free up the memory used by the data structures
-       this.nonRunningMapCache = null;
-       this.runningMapCache = null;
-       this.nonRunningReduces = null;
-       this.runningReduces = null;
-
-     }
-     // remove jobs delegation tokens
-     if(conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true)) {
-       DelegationTokenRenewal.removeDelegationTokenRenewalForJob(jobId);
-     } // else don't remove it.May be used by spawned tasks
-   }
-
-  /**
-   * Return the TaskInProgress that matches the tipid.
-   */
-  public synchronized TaskInProgress getTaskInProgress(TaskID tipid) {
-    if (tipid.getTaskType() == TaskType.MAP) {
-      // cleanup map tip
-      if (cleanup.length > 0 && tipid.equals(cleanup[0].getTIPId())) {
-        return cleanup[0]; 
-      }
-      // setup map tip
-      if (setup.length > 0 && tipid.equals(setup[0].getTIPId())) { 
-        return setup[0];
-      }
-      for (int i = 0; i < maps.length; i++) {
-        if (tipid.equals(maps[i].getTIPId())){
-          return maps[i];
-        }
-      }
-    } else {
-      // cleanup reduce tip
-      if (cleanup.length > 0 && tipid.equals(cleanup[1].getTIPId())) { 
-        return cleanup[1]; 
-      }
-      // setup reduce tip
-      if (setup.length > 0 && tipid.equals(setup[1].getTIPId())) { 
-        return setup[1];
-      }
-      for (int i = 0; i < reduces.length; i++) {
-        if (tipid.equals(reduces[i].getTIPId())){
-          return reduces[i];
-        }
-      }
-    }
-    return null;
-  }
-    
-  /**
-   * Find the details of someplace where a map has finished
-   * @param mapId the id of the map
-   * @return the task status of the completed task
-   */
-  public synchronized TaskStatus findFinishedMap(int mapId) {
-    TaskInProgress tip = maps[mapId];
-    if (tip.isComplete()) {
-      TaskStatus[] statuses = tip.getTaskStatuses();
-      for(int i=0; i < statuses.length; i++) {
-        if (statuses[i].getRunState() == TaskStatus.State.SUCCEEDED) {
-          return statuses[i];
-        }
-      }
-    }
-    return null;
-  }
-  
-  synchronized int getNumTaskCompletionEvents() {
-    return taskCompletionEvents.size();
-  }
-    
-  synchronized public TaskCompletionEvent[] getTaskCompletionEvents(
-                                                                    int fromEventId, int maxEvents) {
-    TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
-    if (taskCompletionEvents.size() > fromEventId) {
-      int actualMax = Math.min(maxEvents, 
-                               (taskCompletionEvents.size() - fromEventId));
-      events = taskCompletionEvents.subList(fromEventId, actualMax + fromEventId).toArray(events);        
-    }
-    return events; 
-  }
-  
-  synchronized void fetchFailureNotification(TaskInProgress tip, 
-                                             TaskAttemptID mapTaskId, 
-                                             String mapTrackerName,
-                                             TaskAttemptID reduceTaskId,
-                                             String reduceTrackerName) {
-    Integer fetchFailures = mapTaskIdToFetchFailuresMap.get(mapTaskId);
-    fetchFailures = (fetchFailures == null) ? 1 : (fetchFailures+1);
-    mapTaskIdToFetchFailuresMap.put(mapTaskId, fetchFailures);
-    LOG.info("Failed fetch notification #" + fetchFailures + " for map task: "
-             + mapTaskId + " running on tracker: " + mapTrackerName
-             + " and reduce task: " + reduceTaskId + " running on tracker: "
-             + reduceTrackerName);
-
-    float failureRate = (float)fetchFailures / runningReduceTasks;
-    // declare faulty if fetch-failures >= max-allowed-failures
-    boolean isMapFaulty = (failureRate >= MAX_ALLOWED_FETCH_FAILURES_PERCENT) 
-                          ? true
-                          : false;
-    if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS
-        && isMapFaulty) {
-      LOG.info("Too many fetch-failures for output of task: " + mapTaskId 
-               + " ... killing it");
-      
-      failedTask(tip, mapTaskId, "Too many fetch-failures",                            
-                 (tip.isMapTask() ? TaskStatus.Phase.MAP : 
-                                    TaskStatus.Phase.REDUCE), 
-                 TaskStatus.State.FAILED, mapTrackerName);
-      
-      mapTaskIdToFetchFailuresMap.remove(mapTaskId);
-    }
-  }
-  
-  /**
-   * @return The JobID of this JobInProgress.
-   */
-  public JobID getJobID() {
-    return jobId;
-  }
-  
-  public synchronized Object getSchedulingInfo() {
-    return this.schedulingInfo;
-  }
-  
-  public synchronized void setSchedulingInfo(Object schedulingInfo) {
-    this.schedulingInfo = schedulingInfo;
-    this.status.setSchedulingInfo(schedulingInfo.toString());
-  }
-  
-  /**
-   * To keep track of kill and initTasks status of this job. initTasks() take 
-   * a lock on JobInProgress object. kill should avoid waiting on 
-   * JobInProgress lock since it may take a while to do initTasks().
-   */
-  private static class JobInitKillStatus {
-    //flag to be set if kill is called
-    boolean killed;
-    
-    boolean initStarted;
-    boolean initDone;
-  }
-
-  boolean isComplete() {
-    return status.isJobComplete();
-  }
-  
-  /**
-   * Get the task type for logging it to {@link JobHistory}.
-   */
-  private TaskType getTaskType(TaskInProgress tip) {
-    if (tip.isJobCleanupTask()) {
-      return TaskType.JOB_CLEANUP;
-    } else if (tip.isJobSetupTask()) {
-      return TaskType.JOB_SETUP;
-    } else if (tip.isMapTask()) {
-      return TaskType.MAP;
-    } else {
-      return TaskType.REDUCE;
-    }
-  }
-  
-  /**
-   * Get the level of locality that a given task would have if launched on
-   * a particular TaskTracker. Returns 0 if the task has data on that machine,
-   * 1 if it has data on the same rack, etc (depending on number of levels in
-   * the network hierarchy).
-   */
-  int getLocalityLevel(TaskInProgress tip, TaskTrackerStatus tts) {
-    Node tracker = jobtracker.getNode(tts.getHost());
-    int level = this.maxLevel;
-    // find the right level across split locations
-    for (String local : maps[tip.getIdWithinJob()].getSplitLocations()) {
-      Node datanode = jobtracker.getNode(local);
-      int newLevel = this.maxLevel;
-      if (tracker != null && datanode != null) {
-        newLevel = getMatchingLevelForNodes(tracker, datanode);
-      }
-      if (newLevel < level) {
-        level = newLevel;
-        // an optimization
-        if (level == 0) {
-          break;
-        }
-      }
-    }
-    return level;
-  }
-  
-  /**
-   * Test method to set the cluster sizes
-   */
-  void setClusterSize(int clusterSize) {
-    this.clusterSize = clusterSize;
-  }
-
-  static class JobSummary {
-    static final Log LOG = LogFactory.getLog(JobSummary.class);
-    
-    // Escape sequences 
-    static final char EQUALS = '=';
-    static final char[] charsToEscape = 
-      {StringUtils.COMMA, EQUALS, StringUtils.ESCAPE_CHAR};
-
-    static class SummaryBuilder {
-      final StringBuilder buffer = new StringBuilder();
-
-      // A little optimization for a very common case
-      SummaryBuilder add(String key, long value) {
-        return _add(key, Long.toString(value));
-      }
-
-      <T> SummaryBuilder add(String key, T value) {
-        return _add(key, StringUtils.escapeString(String.valueOf(value),
-                    StringUtils.ESCAPE_CHAR, charsToEscape));
-      }
-
-      SummaryBuilder add(SummaryBuilder summary) {
-        if (buffer.length() > 0) buffer.append(StringUtils.COMMA);
-        buffer.append(summary.buffer);
-        return this;
-      }
-
-      SummaryBuilder _add(String key, String value) {
-        if (buffer.length() > 0) buffer.append(StringUtils.COMMA);
-        buffer.append(key).append(EQUALS).append(value);
-        return this;
-      }
-
-      @Override public String toString() {
-        return buffer.toString();
-      }
-    }
-
-    static SummaryBuilder getTaskLaunchTimesSummary(JobInProgress job) {
-      SummaryBuilder summary = new SummaryBuilder();
-      Map<TaskType, Long> timeMap = job.getFirstTaskLaunchTimes();
-
-      synchronized(timeMap) {
-        for (Map.Entry<TaskType, Long> e : timeMap.entrySet()) {
-          summary.add("first"+ StringUtils.camelize(e.getKey().name()) +
-                      "TaskLaunchTime", e.getValue().longValue());
-        }
-      }
-      return summary;
-    }
-
-    /**
-     * Log a summary of the job's runtime.
-     * 
-     * @param job {@link JobInProgress} whose summary is to be logged, cannot
-     *            be <code>null</code>.
-     * @param cluster {@link ClusterStatus} of the cluster on which the job was
-     *                run, cannot be <code>null</code>
-     */
-    public static void logJobSummary(JobInProgress job, ClusterStatus cluster) {
-      JobStatus status = job.getStatus();
-      JobProfile profile = job.getProfile();
-      Counters jobCounters = job.getJobCounters();
-      long mapSlotSeconds = 
-        (jobCounters.getCounter(JobCounter.SLOTS_MILLIS_MAPS) +
-         jobCounters.getCounter(JobCounter.FALLOW_SLOTS_MILLIS_MAPS)) / 1000;
-      long reduceSlotSeconds = 
-        (jobCounters.getCounter(JobCounter.SLOTS_MILLIS_REDUCES) +
-         jobCounters.getCounter(JobCounter.FALLOW_SLOTS_MILLIS_REDUCES)) / 1000;
-
-      SummaryBuilder summary = new SummaryBuilder()
-          .add("jobId", job.getJobID())
-          .add("submitTime", job.getStartTime())
-          .add("launchTime", job.getLaunchTime())
-          .add(getTaskLaunchTimesSummary(job))
-          .add("finishTime", job.getFinishTime())
-          .add("numMaps", job.getTasks(TaskType.MAP).length)
-          .add("numSlotsPerMap", job.getNumSlotsPerMap())
-          .add("numReduces", job.getTasks(TaskType.REDUCE).length)
-          .add("numSlotsPerReduce", job.getNumSlotsPerReduce())
-          .add("user", profile.getUser())
-          .add("queue", profile.getQueueName())
-          .add("status", JobStatus.getJobRunState(status.getRunState()))
-          .add("mapSlotSeconds", mapSlotSeconds)
-          .add("reduceSlotsSeconds", reduceSlotSeconds)
-          .add("clusterMapCapacity", cluster.getMaxMapTasks())
-          .add("clusterReduceCapacity", cluster.getMaxReduceTasks());
-
-      LOG.info(summary);
-    }
-  }
-  
-  /**
-   * Creates the localized copy of job conf
-   * @param jobConf
-   * @param id
-   */
-  void setUpLocalizedJobConf(JobConf jobConf, 
-      org.apache.hadoop.mapreduce.JobID id) {
-    String localJobFilePath = jobtracker.getLocalJobFilePath(id); 
-    File localJobFile = new File(localJobFilePath);
-    FileOutputStream jobOut = null;
-    try {
-      jobOut = new FileOutputStream(localJobFile);
-      jobConf.writeXml(jobOut);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Job conf for " + id + " stored at " 
-            + localJobFile.getAbsolutePath());
-      }
-    } catch (IOException ioe) {
-      LOG.error("Failed to store job conf on the local filesystem ", ioe);
-    } finally {
-      if (jobOut != null) {
-        try {
-          jobOut.close();
-        } catch (IOException ie) {
-          LOG.info("Failed to close the job configuration file " 
-              + StringUtils.stringifyException(ie));
-        }
-      }
-    }
-  }
-
-  /**
-   * Deletes localized copy of job conf
-   */
-  void cleanupLocalizedJobConf(org.apache.hadoop.mapreduce.JobID id) {
-    String localJobFilePath = jobtracker.getLocalJobFilePath(id);
-    File f = new File (localJobFilePath);
-    LOG.info("Deleting localized job conf at " + f);
-    if (!f.delete()) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Failed to delete file " + f);
-      }
-    }
-  }
-  
-  /**
-   * generate job token and save it into the file
-   * @throws IOException
-   */
-  private void generateAndStoreTokens() throws IOException{
-    Path jobDir = jobtracker.getSystemDirectoryForJob(jobId);
-    Path keysFile = new Path(jobDir, TokenCache.JOB_TOKEN_HDFS_FILE);
-
-    if (tokenStorage == null) {
-      tokenStorage = new Credentials();
-    }
-    
-    //create JobToken file and write token to it
-    JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(jobId
-        .toString()));
-    Token<JobTokenIdentifier> token = new Token<JobTokenIdentifier>(identifier,
-        jobtracker.getJobTokenSecretManager());
-    token.setService(identifier.getJobId());
-    
-    TokenCache.setJobToken(token, tokenStorage);
-    
-    // write TokenStorage out
-    tokenStorage.writeTokenStorageFile(keysFile, jobtracker.getConf());
-    LOG.info("jobToken generated and stored with users keys in "
-        + keysFile.toUri().getPath());
-  }
-
-  public String getJobSubmitHostAddress() {
-    return submitHostAddress;
-  }
-
-  public String getJobSubmitHostName() {
-    return submitHostName;
-  }
-}