Browse Source

HADOOP-4759. Removes temporary output directory for failed and killed tasks by launching special CLEANUP tasks for the same. Contributed by Amareshwari Sriramadasu.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.19@741197 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 16 years ago
parent
commit
81ead4c6cf
24 changed files with 1006 additions and 440 deletions
  1. 4 0
      CHANGES.txt
  2. 15 7
      src/mapred/org/apache/hadoop/mapred/Child.java
  3. 5 3
      src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java
  4. 2 1
      src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
  5. 3 3
      src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
  6. 4 2
      src/mapred/org/apache/hadoop/mapred/JobHistory.java
  7. 148 60
      src/mapred/org/apache/hadoop/mapred/JobInProgress.java
  8. 33 7
      src/mapred/org/apache/hadoop/mapred/JobTracker.java
  9. 9 24
      src/mapred/org/apache/hadoop/mapred/JvmManager.java
  10. 43 43
      src/mapred/org/apache/hadoop/mapred/MapOutputFile.java
  11. 26 16
      src/mapred/org/apache/hadoop/mapred/MapTask.java
  12. 15 9
      src/mapred/org/apache/hadoop/mapred/ReduceTask.java
  13. 108 53
      src/mapred/org/apache/hadoop/mapred/Task.java
  14. 85 40
      src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
  15. 41 9
      src/mapred/org/apache/hadoop/mapred/TaskLog.java
  16. 19 10
      src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java
  17. 15 47
      src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
  18. 11 9
      src/mapred/org/apache/hadoop/mapred/TaskRunner.java
  19. 35 3
      src/mapred/org/apache/hadoop/mapred/TaskStatus.java
  20. 179 83
      src/mapred/org/apache/hadoop/mapred/TaskTracker.java
  21. 4 2
      src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
  22. 2 1
      src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
  23. 137 0
      src/test/org/apache/hadoop/mapred/TestTaskFail.java
  24. 63 8
      src/webapps/job/taskdetails.jsp

+ 4 - 0
CHANGES.txt

@@ -85,6 +85,10 @@ Release 0.19.1 - Unreleased
     HADOOP-5156. TestHeartbeatHandling uses MiiDFSCluster.getNamesystem()
     which does not exit in branch 0.19 and 0.20. (hairong)
 
+    HADOOP-4759. HADOOP-4759. Removes temporary output directory for failed and
+    killed tasks by launching special CLEANUP tasks for the same. 
+    (Amareshwari Sriramadasu via ddas)
+
 Release 0.19.0 - 2008-11-18
 
   INCOMPATIBLE CHANGES

+ 15 - 7
src/mapred/org/apache/hadoop/mapred/Child.java

@@ -47,6 +47,7 @@ class Child {
     LogFactory.getLog(TaskTracker.class);
 
   static volatile TaskAttemptID taskid;
+  static volatile boolean isCleanup;
 
   public static void main(String[] args) throws Throwable {
     LOG.debug("Child starting");
@@ -75,7 +76,7 @@ class Child {
           try {
             Thread.sleep(5000);
             if (taskid != null) {
-              TaskLog.syncLogs(firstTaskid, taskid);
+              TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
             }
           } catch (InterruptedException ie) {
           } catch (IOException iee) {
@@ -95,6 +96,7 @@ class Child {
     Path srcPidPath = null;
     Path dstPidPath = null;
     int idleLoopCount = 0;
+    Task task = null;
     try {
       while (true) {
         JvmTask myTask = umbilical.getTask(jvmId);
@@ -114,22 +116,23 @@ class Child {
           }
         }
         idleLoopCount = 0;
-        Task task = myTask.getTask();
+        task = myTask.getTask();
         taskid = task.getTaskID();
+        isCleanup = task.isTaskCleanupTask();
         
         //create the index file so that the log files 
         //are viewable immediately
-        TaskLog.syncLogs(firstTaskid, taskid);
+        TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
         JobConf job = new JobConf(task.getJobFile());
         if (job.getBoolean("task.memory.mgmt.enabled", false)) {
           if (srcPidPath == null) {
-            srcPidPath = TaskMemoryManagerThread.getPidFilePath(firstTaskid,
-                                                              job);
+            // get the first task's path for the first time  
+            srcPidPath = new Path(task.getPidFile());
           }
           //since the JVM is running multiple tasks potentially, we need
           //to do symlink stuff only for the subsequent tasks
           if (!taskid.equals(firstTaskid)) {
-            dstPidPath = new Path(srcPidPath.getParent(), taskid.toString());
+            dstPidPath = new Path(task.getPidFile());
             FileUtil.symLink(srcPidPath.toUri().getPath(), 
                 dstPidPath.toUri().getPath());
           }
@@ -154,9 +157,10 @@ class Child {
         try {
           task.run(job, umbilical);             // run the task
         } finally {
-          TaskLog.syncLogs(firstTaskid, taskid);
+          TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
           if (!taskid.equals(firstTaskid) && 
               job.getBoolean("task.memory.mgmt.enabled", false)) {
+            // delete the pid-file's symlink
             new File(dstPidPath.toUri().getPath()).delete();
           }
         }
@@ -169,6 +173,10 @@ class Child {
       umbilical.fsError(taskid, e.getMessage());
     } catch (Throwable throwable) {
       LOG.warn("Error running child", throwable);
+      if (task != null) {
+        // do cleanup for the task
+        task.taskCleanup(umbilical);
+      }
       // Report back any failures, for diagnostic purposes
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
       throwable.printStackTrace(new PrintStream(baos));

+ 5 - 3
src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java

@@ -132,9 +132,11 @@ public class FileOutputCommitter extends OutputCommitter {
   public void abortTask(TaskAttemptContext context) {
     Path taskOutputPath =  getTempTaskOutputPath(context);
     try {
-      FileSystem fs = taskOutputPath.getFileSystem(context.getJobConf());
-      context.getProgressible().progress();
-      fs.delete(taskOutputPath, true);
+      if (taskOutputPath != null) {
+        FileSystem fs = taskOutputPath.getFileSystem(context.getJobConf());
+        context.getProgressible().progress();
+        fs.delete(taskOutputPath, true);
+      }
     } catch (IOException ie) {
       LOG.warn("Error discarding output" + StringUtils.stringifyException(ie));
     }

+ 2 - 1
src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java

@@ -52,8 +52,9 @@ interface InterTrackerProtocol extends VersionedProtocol {
                  so that the TaskTracker can synchronize itself.
    * Version 20: Changed status message due to changes in TaskStatus
    *             (HADOOP-4232)
+   * Version 21: Changed format of Task and TaskStatus for HADOOP-4759 
    */
-  public static final long versionID = 20L;
+  public static final long versionID = 21L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;

+ 3 - 3
src/mapred/org/apache/hadoop/mapred/IsolationRunner.java

@@ -178,9 +178,9 @@ public class IsolationRunner {
     FileSystem local = FileSystem.getLocal(conf);
     LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
     File workDirName = new File(lDirAlloc.getLocalPathToRead(
-                                  TaskTracker.getJobCacheSubdir() 
-                                  + Path.SEPARATOR + taskId.getJobID() 
-                                  + Path.SEPARATOR + taskId
+                                  TaskTracker.getLocalTaskDir(
+                                    taskId.getJobID().toString(), 
+                                    taskId.toString())
                                   + Path.SEPARATOR + "work",
                                   conf). toString());
     local.setWorkingDirectory(new Path(workDirName.toString()));

+ 4 - 2
src/mapred/org/apache/hadoop/mapred/JobHistory.java

@@ -1272,7 +1272,8 @@ public class JobHistory {
                                       taskAttemptId.getTaskID().toString(), 
                                       taskAttemptId.toString(), 
                                       String.valueOf(startTime), trackerName,
-                                      String.valueOf(httpPort)}); 
+                                      httpPort == -1 ? "" : 
+                                        String.valueOf(httpPort)}); 
         }
       }
     }
@@ -1468,7 +1469,8 @@ public class JobHistory {
                                       taskAttemptId.getTaskID().toString(), 
                                       taskAttemptId.toString(), 
                                       String.valueOf(startTime), trackerName,
-                                      String.valueOf(httpPort)}); 
+                                      httpPort == -1 ? "" : 
+                                        String.valueOf(httpPort)}); 
         }
       }
     }

+ 148 - 60
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.util.StringUtils;
 
 /*************************************************************
  * JobInProgress maintains all the info for keeping
@@ -108,6 +109,12 @@ class JobInProgress {
 
   // A set of running reduce TIPs
   Set<TaskInProgress> runningReduces;
+  
+  // A list of cleanup tasks for the map task attempts, to be launched
+  List<TaskAttemptID> mapCleanupTasks = new LinkedList<TaskAttemptID>();
+  
+  // A list of cleanup tasks for the reduce task attempts, to be launched
+  List<TaskAttemptID> reduceCleanupTasks = new LinkedList<TaskAttemptID>();
 
   private int maxLevel;
   
@@ -438,12 +445,12 @@ class JobInProgress {
     // Just assign splits[0]
     cleanup[0] = new TaskInProgress(jobId, jobFile, splits[0], 
             jobtracker, conf, this, numMapTasks);
-    cleanup[0].setCleanupTask();
+    cleanup[0].setJobCleanupTask();
 
     // cleanup reduce tip.
     cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                        numReduceTasks, jobtracker, conf, this);
-    cleanup[1].setCleanupTask();
+    cleanup[1].setJobCleanupTask();
 
     // create two setup tips, one map and one reduce.
     setup = new TaskInProgress[2];
@@ -451,12 +458,12 @@ class JobInProgress {
     // Just assign splits[0]
     setup[0] = new TaskInProgress(jobId, jobFile, splits[0], 
             jobtracker, conf, this, numMapTasks + 1 );
-    setup[0].setSetupTask();
+    setup[0].setJobSetupTask();
 
     // setup reduce tip.
     setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                        numReduceTasks + 1, jobtracker, conf, this);
-    setup[1].setSetupTask();
+    setup[1].setJobSetupTask();
     
     synchronized(jobInitKillStatus){
       jobInitKillStatus.initDone = true;
@@ -696,11 +703,27 @@ class JobInProgress {
     if (wasComplete && (status.getRunState() == TaskStatus.State.SUCCEEDED)) {
       status.setRunState(TaskStatus.State.KILLED);
     }
+    
+    // If the job is complete and a task has just reported its 
+    // state as FAILED_UNCLEAN/KILLED_UNCLEAN, 
+    // make the task's state FAILED/KILLED without launching cleanup attempt.
+    // Note that if task is already a cleanup attempt, 
+    // we don't change the state to make sure the task gets a killTaskAction
+    if ((this.isComplete() || jobFailed || jobKilled) && 
+        !tip.isCleanupAttempt(taskid)) {
+      if (status.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
+        status.setRunState(TaskStatus.State.FAILED);
+      } else if (status.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
+        status.setRunState(TaskStatus.State.KILLED);
+      }
+    }
+    
     boolean change = tip.updateStatus(status);
     if (change) {
       TaskStatus.State state = status.getRunState();
+      // get the TaskTrackerStatus where the task ran 
       TaskTrackerStatus ttStatus = 
-        this.jobtracker.getTaskTracker(status.getTaskTracker());
+        this.jobtracker.getTaskTracker(tip.machineWhereTaskRan(taskid));
       String httpTaskLogLocation = null; 
 
       if (null != ttStatus){
@@ -721,8 +744,8 @@ class JobInProgress {
                                             taskid,
                                             tip.idWithinJob(),
                                             status.getIsMap() &&
-                                            !tip.isCleanupTask() &&
-                                            !tip.isSetupTask(),
+                                            !tip.isJobCleanupTask() &&
+                                            !tip.isJobSetupTask(),
                                             TaskCompletionEvent.Status.SUCCEEDED,
                                             httpTaskLogLocation 
                                            );
@@ -736,6 +759,15 @@ class JobInProgress {
           tip.doCommit(taskid);
         }
         return;
+      } else if (state == TaskStatus.State.FAILED_UNCLEAN ||
+                 state == TaskStatus.State.KILLED_UNCLEAN) {
+        tip.incompleteSubTask(taskid, this.status);
+        // add this task, to be rescheduled as cleanup attempt
+        if (tip.isMapTask()) {
+          mapCleanupTasks.add(taskid);
+        } else {
+          reduceCleanupTasks.add(taskid);
+        }
       }
       //For a failed task update the JT datastructures. 
       else if (state == TaskStatus.State.FAILED ||
@@ -766,8 +798,8 @@ class JobInProgress {
                                             taskid,
                                             tip.idWithinJob(),
                                             status.getIsMap() &&
-                                            !tip.isCleanupTask() &&
-                                            !tip.isSetupTask(),
+                                            !tip.isJobCleanupTask() &&
+                                            !tip.isJobSetupTask(),
                                             taskCompletionStatus, 
                                             httpTaskLogLocation
                                            );
@@ -796,7 +828,7 @@ class JobInProgress {
                  oldProgress + " to " + tip.getProgress());
     }
     
-    if (!tip.isCleanupTask() && !tip.isSetupTask()) {
+    if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
       double progressDelta = tip.getProgress() - oldProgress;
       if (tip.isMapTask()) {
         if (maps.length == 0) {
@@ -894,11 +926,45 @@ class JobInProgress {
     return result;
   }    
 
+  /*
+   * Return task cleanup attempt if any, to run on a given tracker
+   */
+  public synchronized Task obtainTaskCleanupTask(TaskTrackerStatus tts, 
+                                                 boolean isMapSlot)
+  throws IOException {
+    if (this.status.getRunState() != JobStatus.RUNNING || 
+        jobFailed || jobKilled) {
+      return null;
+    }
+    
+    String taskTracker = tts.getTrackerName();
+    if (!shouldRunOnTaskTracker(taskTracker)) {
+      return null;
+    }
+    TaskAttemptID taskid = null;
+    TaskInProgress tip = null;
+    if (isMapSlot) {
+      if (!mapCleanupTasks.isEmpty()) {
+        taskid = mapCleanupTasks.remove(0);
+        tip = maps[taskid.getTaskID().getId()];
+      }
+    } else {
+      if (!reduceCleanupTasks.isEmpty()) {
+        taskid = reduceCleanupTasks.remove(0);
+        tip = reduces[taskid.getTaskID().getId()];
+      }
+    }
+    if (tip != null) {
+      return tip.addRunningTask(taskid, taskTracker, true);
+    }
+    return null;
+  }
+  
   /**
    * Return a CleanupTask, if appropriate, to run on the given tasktracker
    * 
    */
-  public Task obtainCleanupTask(TaskTrackerStatus tts, 
+  public Task obtainJobCleanupTask(TaskTrackerStatus tts, 
                                              int clusterSize, 
                                              int numUniqueHosts,
                                              boolean isMapSlot
@@ -908,7 +974,7 @@ class JobInProgress {
     }
     
     synchronized(this) {
-      if (!canLaunchCleanupTask()) {
+      if (!canLaunchJobCleanupTask()) {
         return null;
       }
       
@@ -949,7 +1015,7 @@ class JobInProgress {
    * or all maps and reduces are complete
    * @return true/false
    */
-  private synchronized boolean canLaunchCleanupTask() {
+  private synchronized boolean canLaunchJobCleanupTask() {
     if (!tasksInited.get()) {
       return false;
     }
@@ -980,7 +1046,7 @@ class JobInProgress {
    * Return a SetupTask, if appropriate, to run on the given tasktracker
    * 
    */
-  public Task obtainSetupTask(TaskTrackerStatus tts, 
+  public Task obtainJobSetupTask(TaskTrackerStatus tts, 
                                              int clusterSize, 
                                              int numUniqueHosts,
                                              boolean isMapSlot
@@ -1094,10 +1160,10 @@ class JobInProgress {
     String name;
     String splits = "";
     Enum counter = null;
-    if (tip.isSetupTask()) {
+    if (tip.isJobSetupTask()) {
       launchedSetup = true;
       name = Values.SETUP.name();
-    } else if (tip.isCleanupTask()) {
+    } else if (tip.isJobCleanupTask()) {
       launchedCleanup = true;
       name = Values.CLEANUP.name();
     } else if (tip.isMapTask()) {
@@ -1120,7 +1186,7 @@ class JobInProgress {
       JobHistory.Task.logStarted(tip.getTIPId(), name,
                                  tip.getExecStartTime(), splits);
     }
-    if (!tip.isSetupTask() && !tip.isCleanupTask()) {
+    if (!tip.isJobSetupTask() && !tip.isJobCleanupTask()) {
       jobCounters.incrCounter(counter, 1);
     }
     
@@ -1141,7 +1207,7 @@ class JobInProgress {
     //
     // So to simplify, increment the data locality counter whenever there is 
     // data locality.
-    if (tip.isMapTask() && !tip.isSetupTask() && !tip.isCleanupTask()) {
+    if (tip.isMapTask() && !tip.isJobSetupTask() && !tip.isJobCleanupTask()) {
       // increment the data locality counter for maps
       Node tracker = jobtracker.getNode(tts.getHost());
       int level = this.maxLevel;
@@ -1795,8 +1861,8 @@ class JobInProgress {
     TaskTrackerStatus ttStatus = 
       this.jobtracker.getTaskTracker(status.getTaskTracker());
     String trackerHostname = jobtracker.getNode(ttStatus.getHost()).toString();
-    String taskType = tip.isCleanupTask() ? Values.CLEANUP.name() :
-                      tip.isSetupTask() ? Values.SETUP.name() :
+    String taskType = tip.isJobCleanupTask() ? Values.CLEANUP.name() :
+                      tip.isJobSetupTask() ? Values.SETUP.name() :
                       tip.isMapTask() ? Values.MAP.name() : 
                       Values.REDUCE.name();
     if (status.getIsMap()){
@@ -1826,14 +1892,14 @@ class JobInProgress {
                                 status.getCounters()); 
         
     int newNumAttempts = tip.getActiveTasks().size();
-    if (tip.isSetupTask()) {
+    if (tip.isJobSetupTask()) {
       // setup task has finished. kill the extra setup tip
       killSetupTip(!tip.isMapTask());
       // Job can start running now.
       this.status.setSetupProgress(1.0f);
       this.status.setRunState(JobStatus.RUNNING);
       JobHistory.JobInfo.logStarted(profile.getJobID());
-    } else if (tip.isCleanupTask()) {
+    } else if (tip.isJobCleanupTask()) {
       // cleanup task has finished. Kill the extra cleanup tip
       if (tip.isMapTask()) {
         // kill the reduce tip
@@ -1968,6 +2034,8 @@ class JobInProgress {
         }
         jobKilled = true;
       }
+      // clear all unclean tasks
+      clearUncleanTasks();
       //
       // kill all TIPs.
       //
@@ -1983,6 +2051,21 @@ class JobInProgress {
     }
   }
   
+  private void clearUncleanTasks() {
+    TaskAttemptID taskid = null;
+    TaskInProgress tip = null;
+    while (!mapCleanupTasks.isEmpty()) {
+      taskid = mapCleanupTasks.remove(0);
+      tip = maps[taskid.getTaskID().getId()];
+      updateTaskStatus(tip, tip.getTaskStatus(taskid), null);
+    }
+    while (!reduceCleanupTasks.isEmpty()) {
+      taskid = reduceCleanupTasks.remove(0);
+      tip = reduces[taskid.getTaskID().getId()];
+      updateTaskStatus(tip, tip.getTaskStatus(taskid), null);
+    }
+  }
+  
   /**
    * Kill the job and all its component tasks. This method is called from 
    * jobtracker and should return fast as it locks the jobtracker.
@@ -2033,16 +2116,16 @@ class JobInProgress {
     boolean wasFailed = tip.isFailed();
 
     // Mark the taskid as FAILED or KILLED
-    tip.incompleteSubTask(taskid, taskTrackerStatus, this.status);
+    tip.incompleteSubTask(taskid, this.status);
    
     boolean isRunning = tip.isRunning();
     boolean isComplete = tip.isComplete();
         
     //update running  count on task failure.
     if (wasRunning && !isRunning) {
-      if (tip.isCleanupTask()) {
+      if (tip.isJobCleanupTask()) {
         launchedCleanup = false;
-      } else if (tip.isSetupTask()) {
+      } else if (tip.isJobSetupTask()) {
         launchedSetup = false;
       } else if (tip.isMapTask()) {
         runningMapTasks -= 1;
@@ -2079,43 +2162,48 @@ class JobInProgress {
     }
         
     // update job history
-    String taskTrackerName = taskTrackerStatus.getHost();
-    long finishTime = status.getFinishTime();
-    String taskType = tip.isCleanupTask() ? Values.CLEANUP.name() :
-                      tip.isSetupTask() ? Values.SETUP.name() :
+    // get taskStatus from tip
+    TaskStatus taskStatus = tip.getTaskStatus(taskid);
+    String taskTrackerName = taskStatus.getTaskTracker();
+    String taskTrackerHostName = convertTrackerNameToHostName(taskTrackerName);
+    int taskTrackerPort = -1;
+    if (taskTrackerStatus != null) {
+      taskTrackerPort = taskTrackerStatus.getHttpPort();
+    }
+    long startTime = taskStatus.getStartTime();
+    long finishTime = taskStatus.getFinishTime();
+    List<String> taskDiagnosticInfo = tip.getDiagnosticInfo(taskid);
+    String diagInfo = taskDiagnosticInfo == null ? "" :
+      StringUtils.arrayToString(taskDiagnosticInfo.toArray(new String[0]));
+    String taskType = tip.isJobCleanupTask() ? Values.CLEANUP.name() :
+                      tip.isJobSetupTask() ? Values.SETUP.name() :
                       tip.isMapTask() ? Values.MAP.name() : 
                       Values.REDUCE.name();
-    if (status.getIsMap()) {
-      JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
-          status.getTaskTracker(), taskTrackerStatus.getHttpPort(), 
-          taskType);
-      if (status.getRunState() == TaskStatus.State.FAILED) {
-        JobHistory.MapAttempt.logFailed(status.getTaskID(), finishTime,
-                taskTrackerName, status.getDiagnosticInfo(), 
-                taskType);
+    if (taskStatus.getIsMap()) {
+      JobHistory.MapAttempt.logStarted(taskid, startTime, 
+        taskTrackerName, taskTrackerPort, taskType);
+      if (taskStatus.getRunState() == TaskStatus.State.FAILED) {
+        JobHistory.MapAttempt.logFailed(taskid, finishTime,
+          taskTrackerHostName, diagInfo, taskType);
       } else {
-        JobHistory.MapAttempt.logKilled(status.getTaskID(), finishTime,
-                taskTrackerName, status.getDiagnosticInfo(),
-                taskType);
+        JobHistory.MapAttempt.logKilled(taskid, finishTime,
+          taskTrackerHostName, diagInfo, taskType);
       }
     } else {
-      JobHistory.ReduceAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
-          status.getTaskTracker(), taskTrackerStatus.getHttpPort(), 
-          taskType);
-      if (status.getRunState() == TaskStatus.State.FAILED) {
-        JobHistory.ReduceAttempt.logFailed(status.getTaskID(), finishTime,
-                taskTrackerName, status.getDiagnosticInfo(), 
-                taskType);
+      JobHistory.ReduceAttempt.logStarted(taskid, startTime, 
+        taskTrackerName, taskTrackerPort, taskType);
+      if (taskStatus.getRunState() == TaskStatus.State.FAILED) {
+        JobHistory.ReduceAttempt.logFailed(taskid, finishTime,
+          taskTrackerHostName, diagInfo, taskType);
       } else {
-        JobHistory.ReduceAttempt.logKilled(status.getTaskID(), finishTime,
-                taskTrackerName, status.getDiagnosticInfo(), 
-                taskType);
+        JobHistory.ReduceAttempt.logKilled(taskid, finishTime,
+          taskTrackerHostName, diagInfo, taskType);
       }
     }
         
     // After this, try to assign tasks with the one after this, so that
     // the failed task goes to the end of the list.
-    if (!tip.isCleanupTask() && !tip.isSetupTask()) {
+    if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
       if (tip.isMapTask()) {
         failedMapTasks++;
       } else {
@@ -2127,7 +2215,7 @@ class JobInProgress {
     // Note down that a task has failed on this tasktracker 
     //
     if (status.getRunState() == TaskStatus.State.FAILED) { 
-      addTrackerTaskFailure(taskTrackerStatus.getTrackerName());
+      addTrackerTaskFailure(taskTrackerName);
     }
         
     //
@@ -2145,7 +2233,7 @@ class JobInProgress {
       // Allow upto 'mapFailuresPercent' of map tasks to fail or
       // 'reduceFailuresPercent' of reduce tasks to fail
       //
-      boolean killJob = tip.isCleanupTask() || tip.isSetupTask() ? true :
+      boolean killJob = tip.isJobCleanupTask() || tip.isJobSetupTask() ? true :
                         tip.isMapTask() ? 
             ((++failedMapTIPs*100) > (mapFailuresPercent*numMapTasks)) :
             ((++failedReduceTIPs*100) > (reduceFailuresPercent*numReduceTasks));
@@ -2154,9 +2242,9 @@ class JobInProgress {
         LOG.info("Aborting job " + profile.getJobID());
         JobHistory.Task.logFailed(tip.getTIPId(), 
                                   taskType,  
-                                  status.getFinishTime(), 
-                                  status.getDiagnosticInfo());
-        if (tip.isCleanupTask()) {
+                                  finishTime, 
+                                  diagInfo);
+        if (tip.isJobCleanupTask()) {
           // kill the other tip
           if (tip.isMapTask()) {
             cleanup[1].kill();
@@ -2165,7 +2253,7 @@ class JobInProgress {
           }
           terminateJob(JobStatus.FAILED);
         } else {
-          if (tip.isSetupTask()) {
+          if (tip.isJobSetupTask()) {
             // kill the other tip
             killSetupTip(!tip.isMapTask());
           }
@@ -2176,7 +2264,7 @@ class JobInProgress {
       //
       // Update the counters
       //
-      if (!tip.isCleanupTask() && !tip.isSetupTask()) {
+      if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
         if (tip.isMapTask()) {
           jobCounters.incrCounter(Counter.NUM_FAILED_MAPS, 1);
         } else {
@@ -2215,8 +2303,8 @@ class JobInProgress {
     status.setFinishTime(System.currentTimeMillis());
     updateTaskStatus(tip, status, metrics);
     JobHistory.Task.logFailed(tip.getTIPId(), 
-                              tip.isCleanupTask() ? Values.CLEANUP.name() : 
-                              tip.isSetupTask() ? Values.SETUP.name() : 
+                              tip.isJobCleanupTask() ? Values.CLEANUP.name() : 
+                              tip.isJobSetupTask() ? Values.SETUP.name() : 
                               tip.isMapTask() ? Values.MAP.name() : Values.REDUCE.name(), 
                               tip.getExecFinishTime(), reason, taskid); 
   }

+ 33 - 7
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -1372,7 +1372,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   // and TaskInProgress
   ///////////////////////////////////////////////////////
   void createTaskEntry(TaskAttemptID taskid, String taskTracker, TaskInProgress tip) {
-    LOG.info("Adding task '" + taskid + "' to tip " + tip.getTIPId() + ", for tracker '" + taskTracker + "'");
+    LOG.info("Adding task " + 
+      (tip.isCleanupAttempt(taskid) ? "(cleanup)" : "") + 
+      "'"  + taskid + "' to tip " + 
+      tip.getTIPId() + ", for tracker '" + taskTracker + "'");
 
     // taskid --> tracker
     taskidToTrackerMap.put(taskid, taskTracker);
@@ -1454,6 +1457,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       for (TaskStatus taskStatus : tip.getTaskStatuses()) {
         if (taskStatus.getRunState() != TaskStatus.State.RUNNING && 
             taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+            taskStatus.getRunState() != TaskStatus.State.FAILED_UNCLEAN &&
+            taskStatus.getRunState() != TaskStatus.State.KILLED_UNCLEAN &&
             taskStatus.getRunState() != TaskStatus.State.UNASSIGNED) {
           markCompletedTaskAttempt(taskStatus.getTaskTracker(), 
                                    taskStatus.getTaskID());
@@ -1464,6 +1469,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       for (TaskStatus taskStatus : tip.getTaskStatuses()) {
         if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
             taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+            taskStatus.getRunState() != TaskStatus.State.FAILED_UNCLEAN &&
+            taskStatus.getRunState() != TaskStatus.State.KILLED_UNCLEAN &&
             taskStatus.getRunState() != TaskStatus.State.UNASSIGNED) {
           markCompletedTaskAttempt(taskStatus.getTaskTracker(), 
                                    taskStatus.getTaskID());
@@ -2148,7 +2155,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         for (Iterator<JobInProgress> it = jobs.values().iterator();
              it.hasNext();) {
           JobInProgress job = it.next();
-          t = job.obtainCleanupTask(taskTracker, numTaskTrackers,
+          t = job.obtainJobCleanupTask(taskTracker, numTaskTrackers,
                                     numUniqueHosts, true);
           if (t != null) {
             return Collections.singletonList(t);
@@ -2157,7 +2164,15 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         for (Iterator<JobInProgress> it = jobs.values().iterator();
              it.hasNext();) {
           JobInProgress job = it.next();
-          t = job.obtainSetupTask(taskTracker, numTaskTrackers,
+          t = job.obtainTaskCleanupTask(taskTracker, true);
+          if (t != null) {
+            return Collections.singletonList(t);
+          }
+        }
+        for (Iterator<JobInProgress> it = jobs.values().iterator();
+             it.hasNext();) {
+          JobInProgress job = it.next();
+          t = job.obtainJobSetupTask(taskTracker, numTaskTrackers,
                                   numUniqueHosts, true);
           if (t != null) {
             return Collections.singletonList(t);
@@ -2168,7 +2183,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         for (Iterator<JobInProgress> it = jobs.values().iterator();
              it.hasNext();) {
           JobInProgress job = it.next();
-          t = job.obtainCleanupTask(taskTracker, numTaskTrackers,
+          t = job.obtainJobCleanupTask(taskTracker, numTaskTrackers,
                                     numUniqueHosts, false);
           if (t != null) {
             return Collections.singletonList(t);
@@ -2177,7 +2192,15 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         for (Iterator<JobInProgress> it = jobs.values().iterator();
              it.hasNext();) {
           JobInProgress job = it.next();
-          t = job.obtainSetupTask(taskTracker, numTaskTrackers,
+          t = job.obtainTaskCleanupTask(taskTracker, false);
+          if (t != null) {
+            return Collections.singletonList(t);
+          }
+        }
+        for (Iterator<JobInProgress> it = jobs.values().iterator();
+             it.hasNext();) {
+          JobInProgress job = it.next();
+          t = job.obtainJobSetupTask(taskTracker, numTaskTrackers,
                                     numUniqueHosts, false);
           if (t != null) {
             return Collections.singletonList(t);
@@ -2700,7 +2723,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         // And completed maps with zero reducers of the job 
         // never need to be failed. 
         if (!tip.isComplete() || 
-            (tip.isMapTask() && !tip.isSetupTask() && 
+            (tip.isMapTask() && !tip.isJobSetupTask() && 
              job.desiredReduces() != 0)) {
           // if the job is done, we don't want to change anything
           if (job.getStatus().getRunState() == JobStatus.RUNNING ||
@@ -2709,7 +2732,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
                            (tip.isMapTask() ? 
                                TaskStatus.Phase.MAP : 
                                TaskStatus.Phase.REDUCE), 
-                           TaskStatus.State.KILLED, trackerName, myInstrumentation);
+                            tip.isRunningTask(taskId) ? 
+                              TaskStatus.State.KILLED_UNCLEAN : 
+                              TaskStatus.State.KILLED,
+                            trackerName, myInstrumentation);
             jobsWithFailures.add(job);
           }
         } else {

+ 9 - 24
src/mapred/org/apache/hadoop/mapred/JvmManager.java

@@ -43,8 +43,6 @@ class JvmManager {
 
   JvmManagerForType reduceJvmManager;
   
-  TaskTracker tracker;
-
   public JvmEnv constructJvmEnv(List<String> setup, Vector<String>vargs,
       File stdout,File stderr,long logSize, File workDir, 
       Map<String,String> env, String pidFile, JobConf conf) {
@@ -53,10 +51,9 @@ class JvmManager {
   
   public JvmManager(TaskTracker tracker) {
     mapJvmManager = new JvmManagerForType(tracker.getMaxCurrentMapTasks(), 
-        true, tracker);
+        true);
     reduceJvmManager = new JvmManagerForType(tracker.getMaxCurrentReduceTasks(),
-        false, tracker);
-    this.tracker = tracker;
+        false);
   }
   
   public void stop() {
@@ -74,9 +71,9 @@ class JvmManager {
 
   public void launchJvm(TaskRunner t, JvmEnv env) {
     if (t.getTask().isMapTask()) {
-      mapJvmManager.reapJvm(t, tracker, env);
+      mapJvmManager.reapJvm(t, env);
     } else {
-      reduceJvmManager.reapJvm(t, tracker, env);
+      reduceJvmManager.reapJvm(t, env);
     }
   }
 
@@ -125,12 +122,10 @@ class JvmManager {
     boolean isMap;
     
     Random rand = new Random(System.currentTimeMillis());
-    TaskTracker tracker;
 
-    public JvmManagerForType(int maxJvms, boolean isMap, TaskTracker tracker) {
+    public JvmManagerForType(int maxJvms, boolean isMap) {
       this.maxJvms = maxJvms;
       this.isMap = isMap;
-      this.tracker = tracker;
     }
 
     synchronized public void setRunningTaskForJvm(JVMId jvmId, 
@@ -194,7 +189,7 @@ class JvmManager {
       jvmIdToRunner.remove(jvmId);
     }
     private synchronized void reapJvm( 
-        TaskRunner t, TaskTracker tracker, JvmEnv env) {
+        TaskRunner t, JvmEnv env) {
       if (t.getTaskInProgress().wasKilled()) {
         //the task was killed in-flight
         //no need to do the rest of the operations
@@ -251,7 +246,7 @@ class JvmManager {
           LOG.info("Killing JVM: " + runnerToKill.jvmId);
           runnerToKill.kill();
         }
-        spawnNewJvm(jobId, env, tracker, t);
+        spawnNewJvm(jobId, env, t);
         return;
       }
       //*MUST* never reach this
@@ -281,7 +276,7 @@ class JvmManager {
       return details.toString();
     }
 
-    private void spawnNewJvm(JobID jobId, JvmEnv env, TaskTracker tracker, 
+    private void spawnNewJvm(JobID jobId, JvmEnv env,  
         TaskRunner t) {
       JvmRunner jvmRunner = new JvmRunner(env,jobId);
       jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);
@@ -293,11 +288,6 @@ class JvmManager {
       //tasks. Doing it this way also keeps code simple.
       jvmRunner.setDaemon(true);
       jvmRunner.setName("JVM Runner " + jvmRunner.jvmId + " spawned.");
-      if (tracker.isTaskMemoryManagerEnabled()) {
-        tracker.getTaskMemoryManager().addTask(
-            TaskAttemptID.forName(env.conf.get("mapred.task.id")),
-            tracker.getMemoryForTask(env.conf));
-      }
       setRunningTaskForJvm(jvmRunner.jvmId, t);
       LOG.info(jvmRunner.getName());
       jvmRunner.start();
@@ -355,6 +345,7 @@ class JvmManager {
           LOG.info("JVM : " + jvmId +" exited. Number of tasks it ran: " + 
               numTasksRan);
           try {
+            // In case of jvm-reuse,
             //the task jvm cleans up the common workdir for every 
             //task at the beginning of each task in the task JVM.
             //For the last task, we do it here.
@@ -362,12 +353,6 @@ class JvmManager {
               FileUtil.fullyDelete(env.workDir);
             }
           } catch (IOException ie){}
-          if (tracker.isTaskMemoryManagerEnabled()) {
-          // Remove the associated pid-file, if any
-            tracker.getTaskMemoryManager().
-               removePidFile(TaskAttemptID.forName(
-                   env.conf.get("mapred.task.id")));
-          }
         }
       }
 

+ 43 - 43
src/mapred/org/apache/hadoop/mapred/MapOutputFile.java

@@ -30,13 +30,13 @@ import org.apache.hadoop.fs.Path;
 class MapOutputFile {
 
   private JobConf conf;
-  private String jobDir;
+  private JobID jobId;
   
   MapOutputFile() {
   }
 
   MapOutputFile(JobID jobId) {
-    this.jobDir = TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + jobId;
+    this.jobId = jobId;
   }
 
   private LocalDirAllocator lDirAlloc = 
@@ -47,9 +47,9 @@ class MapOutputFile {
    */
   public Path getOutputFile(TaskAttemptID mapTaskId)
     throws IOException {
-    return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
-                                        mapTaskId + Path.SEPARATOR +
-                                        "output" + "/file.out", conf);
+    return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), mapTaskId.toString())
+                       + "/file.out", conf);
   }
 
   /** Create a local map output file name.
@@ -58,9 +58,9 @@ class MapOutputFile {
    */
   public Path getOutputFileForWrite(TaskAttemptID mapTaskId, long size)
     throws IOException {
-    return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
-                                          mapTaskId + Path.SEPARATOR +
-                                          "output" + "/file.out", size, conf);
+    return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), mapTaskId.toString())
+                       + "/file.out", size, conf);
   }
 
   /** Return the path to a local map output index file created earlier
@@ -68,9 +68,9 @@ class MapOutputFile {
    */
   public Path getOutputIndexFile(TaskAttemptID mapTaskId)
     throws IOException {
-    return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
-                                        mapTaskId + Path.SEPARATOR +
-                                        "output" + "/file.out.index", conf);
+    return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), mapTaskId.toString())
+                       + "/file.out.index", conf);
   }
 
   /** Create a local map output index file name.
@@ -79,10 +79,10 @@ class MapOutputFile {
    */
   public Path getOutputIndexFileForWrite(TaskAttemptID mapTaskId, long size)
     throws IOException {
-    return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
-                                          mapTaskId + Path.SEPARATOR +
-                                          "output" + "/file.out.index", 
-                                          size, conf);
+    return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), mapTaskId.toString())
+                       + "/file.out.index", 
+                       size, conf);
   }
 
   /** Return a local map spill file created earlier.
@@ -91,10 +91,10 @@ class MapOutputFile {
    */
   public Path getSpillFile(TaskAttemptID mapTaskId, int spillNumber)
     throws IOException {
-    return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
-                                        mapTaskId + Path.SEPARATOR +
-                                        "output" + "/spill" 
-                                        + spillNumber + ".out", conf);
+    return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), mapTaskId.toString())
+                       + "/spill" 
+                       + spillNumber + ".out", conf);
   }
 
   /** Create a local map spill file name.
@@ -104,10 +104,10 @@ class MapOutputFile {
    */
   public Path getSpillFileForWrite(TaskAttemptID mapTaskId, int spillNumber, 
          long size) throws IOException {
-    return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
-                                          mapTaskId + Path.SEPARATOR +
-                                          "output" + "/spill" + 
-                                          spillNumber + ".out", size, conf);
+    return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), mapTaskId.toString())
+                       + "/spill" + 
+                       spillNumber + ".out", size, conf);
   }
 
   /** Return a local map spill index file created earlier
@@ -116,10 +116,10 @@ class MapOutputFile {
    */
   public Path getSpillIndexFile(TaskAttemptID mapTaskId, int spillNumber)
     throws IOException {
-    return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
-                                        mapTaskId + Path.SEPARATOR +
-                                        "output" + "/spill" + 
-                                        spillNumber + ".out.index", conf);
+    return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), mapTaskId.toString())
+                       + "/spill" + 
+                       spillNumber + ".out.index", conf);
   }
 
   /** Create a local map spill index file name.
@@ -129,10 +129,10 @@ class MapOutputFile {
    */
   public Path getSpillIndexFileForWrite(TaskAttemptID mapTaskId, int spillNumber,
          long size) throws IOException {
-    return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
-                                          mapTaskId + Path.SEPARATOR +
-                                          "output" + "/spill" + spillNumber + 
-                                          ".out.index", size, conf);
+    return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), mapTaskId.toString())
+                       + "/spill" + spillNumber + 
+                       ".out.index", size, conf);
   }
 
   /** Return a local reduce input file created earlier
@@ -142,10 +142,10 @@ class MapOutputFile {
   public Path getInputFile(int mapId, TaskAttemptID reduceTaskId)
     throws IOException {
     // TODO *oom* should use a format here
-    return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
-                                        reduceTaskId + Path.SEPARATOR + 
-                                        "output" + "/map_" + mapId + ".out",
-                                        conf);
+    return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), reduceTaskId.toString())
+                       + "/map_" + mapId + ".out",
+                       conf);
   }
 
   /** Create a local reduce input file name.
@@ -157,17 +157,17 @@ class MapOutputFile {
                                    long size)
     throws IOException {
     // TODO *oom* should use a format here
-    return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
-                                          reduceTaskId + Path.SEPARATOR +
-                                          ("output" + "/map_" + mapId.getId() + 
-                                           ".out"), 
-                                          size, conf);
+    return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), reduceTaskId.toString())
+                       + "/map_" + mapId.getId() + ".out", 
+                       size, conf);
   }
 
   /** Removes all of the files related to a task. */
   public void removeAll(TaskAttemptID taskId) throws IOException {
-    conf.deleteLocalFiles(jobDir + Path.SEPARATOR +
-                          taskId + Path.SEPARATOR + "output");
+    conf.deleteLocalFiles(TaskTracker.getIntermediateOutputDir(
+                          jobId.toString(), taskId.toString())
+);
   }
 
   public void setConf(Configuration conf) {
@@ -179,7 +179,7 @@ class MapOutputFile {
   }
   
   public void setJobId(JobID jobId) {
-    this.jobDir = TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + jobId;
+    this.jobId = jobId;
   }
 
 }

+ 26 - 16
src/mapred/org/apache/hadoop/mapred/MapTask.java

@@ -101,13 +101,15 @@ class MapTask extends Task {
   @Override
   public void localizeConfiguration(JobConf conf) throws IOException {
     super.localizeConfiguration(conf);
-    Path localSplit = new Path(new Path(getJobFile()).getParent(), 
-                               "split.dta");
-    LOG.debug("Writing local split to " + localSplit);
-    DataOutputStream out = FileSystem.getLocal(conf).create(localSplit);
-    Text.writeString(out, splitClass);
-    split.write(out);
-    out.close();
+    if (isMapOrReduce()) {
+      Path localSplit = new Path(new Path(getJobFile()).getParent(), 
+                                 "split.dta");
+      LOG.debug("Writing local split to " + localSplit);
+      DataOutputStream out = FileSystem.getLocal(conf).create(localSplit);
+      Text.writeString(out, splitClass);
+      split.write(out);
+      out.close();
+    }
   }
   
   @Override
@@ -119,16 +121,20 @@ class MapTask extends Task {
   @Override
   public void write(DataOutput out) throws IOException {
     super.write(out);
-    Text.writeString(out, splitClass);
-    split.write(out);
-    split = null;
+    if (isMapOrReduce()) {
+      Text.writeString(out, splitClass);
+      split.write(out);
+      split = null;
+    }
   }
   
   @Override
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
-    splitClass = Text.readString(in);
-    split.readFields(in);
+    if (isMapOrReduce()) {
+      splitClass = Text.readString(in);
+      split.readFields(in);
+    }
   }
 
   @Override
@@ -279,12 +285,16 @@ class MapTask extends Task {
 
     initialize(job, reporter);
     // check if it is a cleanupJobTask
-    if (cleanupJob) {
-      runCleanup(umbilical);
+    if (jobCleanup) {
+      runJobCleanupTask(umbilical);
+      return;
+    }
+    if (jobSetup) {
+      runJobSetupTask(umbilical);
       return;
     }
-    if (setupJob) {
-      runSetupJob(umbilical);
+    if (taskCleanup) {
+      runTaskCleanupTask(umbilical);
       return;
     }
 

+ 15 - 9
src/mapred/org/apache/hadoop/mapred/ReduceTask.java

@@ -335,7 +335,7 @@ class ReduceTask extends Task {
     throws IOException {
     job.setBoolean("mapred.skip.on", isSkipping());
 
-    if (!cleanupJob && !setupJob) {
+    if (isMapOrReduce()) {
       copyPhase = getProgress().addPhase("copy");
       sortPhase  = getProgress().addPhase("sort");
       reducePhase = getProgress().addPhase("reduce");
@@ -346,12 +346,16 @@ class ReduceTask extends Task {
     initialize(job, reporter);
 
     // check if it is a cleanupJobTask
-    if (cleanupJob) {
-      runCleanup(umbilical);
+    if (jobCleanup) {
+      runJobCleanupTask(umbilical);
       return;
     }
-    if (setupJob) {
-      runSetupJob(umbilical);
+    if (jobSetup) {
+      runJobSetupTask(umbilical);
+      return;
+    }
+    if (taskCleanup) {
+      runTaskCleanupTask(umbilical);
       return;
     }
     
@@ -374,6 +378,7 @@ class ReduceTask extends Task {
     }
     copyPhase.complete();                         // copy is already complete
     setPhase(TaskStatus.Phase.SORT);
+    statusUpdate(umbilical);
 
     final FileSystem rfs = FileSystem.getLocal(job).getRaw();
     RawKeyValueIterator rIter = isLocal
@@ -389,6 +394,7 @@ class ReduceTask extends Task {
     
     sortPhase.complete();                         // sort is complete
     setPhase(TaskStatus.Phase.REDUCE); 
+    statusUpdate(umbilical);
 
     // make output collector
     String finalName = getOutputName(getPartition());
@@ -1125,10 +1131,10 @@ class ReduceTask extends Task {
         // else, we will check the localFS to find a suitable final location
         // for this path
         TaskAttemptID reduceId = reduceTask.getTaskID();
-        Path filename = new Path("/" + TaskTracker.getJobCacheSubdir() +
-                                 Path.SEPARATOR + getTaskID().getJobID() +
-                                 Path.SEPARATOR + reduceId +
-                                 Path.SEPARATOR + "output" + "/map_" +
+        Path filename = new Path("/" + TaskTracker.getIntermediateOutputDir(
+                                 reduceId.getJobID().toString(),
+                                 reduceId.toString()) 
+                                 + "/map_" +
                                  loc.getTaskId().getId() + ".out");
         
         // Copy the map output to a temp file whose name is unique to this attempt 

+ 108 - 53
src/mapred/org/apache/hadoop/mapred/Task.java

@@ -107,8 +107,10 @@ abstract class Task implements Writable, Configurable {
   private TaskAttemptID taskId;                          // unique, includes job id
   private int partition;                          // id within job
   TaskStatus taskStatus;                          // current status of the task
-  protected boolean cleanupJob = false;
-  protected boolean setupJob = false;
+  protected boolean jobCleanup = false;
+  protected boolean jobSetup = false;
+  protected boolean taskCleanup = false;
+  private String pidFile = "";
   private Thread pingProgressThread;
   
   //skip ranges based on failed ranges from previous attempts
@@ -127,7 +129,6 @@ abstract class Task implements Writable, Configurable {
   private final static int MAX_RETRIES = 10;
   protected JobContext jobContext;
   protected TaskAttemptContext taskContext;
-  private volatile boolean commitPending = false;
 
   ////////////////////////////////////////////
   // Constructors
@@ -160,7 +161,12 @@ abstract class Task implements Writable, Configurable {
   public String getJobFile() { return jobFile; }
   public TaskAttemptID getTaskID() { return taskId; }
   public Counters getCounters() { return counters; }
-  
+  public void setPidFile(String pidFile) { 
+    this.pidFile = pidFile; 
+  }
+  public String getPidFile() { 
+    return pidFile; 
+  }  
   /**
    * Get the job name for this task.
    * @return the job name
@@ -236,15 +242,50 @@ abstract class Task implements Writable, Configurable {
   }
 
   /**
-   * Sets whether the task is cleanup task
+   * Return current state of the task. 
+   * needs to be synchronized as communication thread 
+   * sends the state every second
+   * @return
+   */
+  synchronized TaskStatus.State getState(){
+    return this.taskStatus.getRunState(); 
+  }
+  /**
+   * Set current state of the task. 
+   * @param state
    */
-  public void setCleanupTask() {
-    cleanupJob = true;
+  synchronized void setState(TaskStatus.State state){
+    this.taskStatus.setRunState(state); 
+  }
+
+  void setTaskCleanupTask() {
+    taskCleanup = true;
+  }
+	   
+  boolean isTaskCleanupTask() {
+    return taskCleanup;
+  }
+
+  boolean isJobCleanupTask() {
+    return jobCleanup;
   }
 
-  public void setSetupTask() {
-    setupJob = true; 
+  boolean isJobSetupTask() {
+    return jobSetup;
   }
+
+  void setJobSetupTask() {
+    jobSetup = true; 
+  }
+
+  void setJobCleanupTask() {
+    jobCleanup = true; 
+  }
+
+  boolean isMapOrReduce() {
+    return !jobSetup && !jobCleanup && !taskCleanup;
+  }
+  
   ////////////////////////////////////////////
   // Writable methods
   ////////////////////////////////////////////
@@ -256,10 +297,13 @@ abstract class Task implements Writable, Configurable {
     taskStatus.write(out);
     skipRanges.write(out);
     out.writeBoolean(skipping);
-    out.writeBoolean(cleanupJob);
-    out.writeBoolean(setupJob);
+    out.writeBoolean(jobCleanup);
+    out.writeBoolean(jobSetup);
     out.writeBoolean(writeSkipRecs);
+    out.writeBoolean(taskCleanup);  
+    Text.writeString(out, pidFile);
   }
+  
   public void readFields(DataInput in) throws IOException {
     jobFile = Text.readString(in);
     taskId = TaskAttemptID.read(in);
@@ -270,9 +314,14 @@ abstract class Task implements Writable, Configurable {
     currentRecIndexIterator = skipRanges.skipRangeIterator();
     currentRecStartIndex = currentRecIndexIterator.next();
     skipping = in.readBoolean();
-    cleanupJob = in.readBoolean();
-    setupJob = in.readBoolean();
+    jobCleanup = in.readBoolean();
+    jobSetup = in.readBoolean();
     writeSkipRecs = in.readBoolean();
+    taskCleanup = in.readBoolean();
+    if (taskCleanup) {
+      setPhase(TaskStatus.Phase.CLEANUP);
+    }
+    pidFile = Text.readString(in);
   }
 
   @Override
@@ -362,17 +411,10 @@ abstract class Task implements Writable, Configurable {
               if (sendProgress) {
                 // we need to send progress update
                 updateCounters();
-                if (commitPending) {
-                  taskStatus.statusUpdate(TaskStatus.State.COMMIT_PENDING,
-                                          taskProgress.get(),
-                                          taskProgress.toString(), 
-                                          counters);
-                } else {
-                  taskStatus.statusUpdate(TaskStatus.State.RUNNING,
-                                          taskProgress.get(),
-                                          taskProgress.toString(), 
-                                          counters);
-                }
+                taskStatus.statusUpdate(getState(),
+                                        taskProgress.get(),
+                                        taskProgress.toString(), 
+                                        counters);
                 taskFound = umbilical.statusUpdate(taskId, taskStatus);
                 taskStatus.clearStatus();
               }
@@ -412,6 +454,9 @@ abstract class Task implements Writable, Configurable {
   throws IOException {
     jobContext = new JobContext(job, reporter);
     taskContext = new TaskAttemptContext(job, taskId, reporter);
+    if (getState() == TaskStatus.State.UNASSIGNED) {
+      setState(TaskStatus.State.RUNNING);
+    }
     OutputCommitter committer = conf.getOutputCommitter();
     Path outputPath = FileOutputFormat.getOutputPath(conf);
     if (outputPath != null) {
@@ -580,8 +625,7 @@ abstract class Task implements Writable, Configurable {
     boolean commitRequired = outputCommitter.needsTaskCommit(taskContext);
     if (commitRequired) {
       int retries = MAX_RETRIES;
-      taskStatus.setRunState(TaskStatus.State.COMMIT_PENDING);
-      commitPending = true;
+      setState(TaskStatus.State.COMMIT_PENDING);
       // say the task tracker that task is commit pending
       while (true) {
         try {
@@ -610,37 +654,21 @@ abstract class Task implements Writable, Configurable {
     sendDone(umbilical);
   }
 
-  private void sendLastUpdate(TaskUmbilicalProtocol umbilical) 
+  protected void statusUpdate(TaskUmbilicalProtocol umbilical) 
   throws IOException {
-    //first wait for the COMMIT approval from the tasktracker
     int retries = MAX_RETRIES;
     while (true) {
       try {
-        // send a final status report
-        if (commitPending) {
-          taskStatus.statusUpdate(TaskStatus.State.COMMIT_PENDING,
-                                  taskProgress.get(),
-                                  taskProgress.toString(), 
-                                  counters);
-        } else {
-          taskStatus.statusUpdate(TaskStatus.State.RUNNING,
-                                  taskProgress.get(),
-                                  taskProgress.toString(), 
-                                  counters);
-        }
-
-        try {
-          if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
-            LOG.warn("Parent died.  Exiting "+taskId);
-            System.exit(66);
-          }
-          taskStatus.clearStatus();
-          return;
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt(); // interrupt ourself
+        if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
+          LOG.warn("Parent died.  Exiting "+taskId);
+          System.exit(66);
         }
+        taskStatus.clearStatus();
+        return;
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt(); // interrupt ourself
       } catch (IOException ie) {
-        LOG.warn("Failure sending last status update: " + 
+        LOG.warn("Failure sending status update: " + 
                   StringUtils.stringifyException(ie));
         if (--retries == 0) {
           throw ie;
@@ -648,6 +676,16 @@ abstract class Task implements Writable, Configurable {
       }
     }
   }
+  
+  private void sendLastUpdate(TaskUmbilicalProtocol umbilical) 
+  throws IOException {
+    // send a final status report
+    taskStatus.statusUpdate(getState(),
+                            taskProgress.get(),
+                            taskProgress.toString(), 
+                            counters);
+    statusUpdate(umbilical);
+  }
 
   private void sendDone(TaskUmbilicalProtocol umbilical) throws IOException {
     int retries = MAX_RETRIES;
@@ -712,7 +750,24 @@ abstract class Task implements Writable, Configurable {
     }
   }
 
-  protected void runCleanup(TaskUmbilicalProtocol umbilical) 
+  protected void runTaskCleanupTask(TaskUmbilicalProtocol umbilical) 
+  throws IOException {
+    taskCleanup(umbilical);
+    done(umbilical);
+  }
+
+  void taskCleanup(TaskUmbilicalProtocol umbilical) 
+  throws IOException {
+    // set phase for this task
+    setPhase(TaskStatus.Phase.CLEANUP);
+    getProgress().setStatus("cleanup");
+    statusUpdate(umbilical);
+    LOG.info("Runnning cleanup for the task");
+    // do the cleanup
+    discardOutput(taskContext, conf.getOutputCommitter());
+  }
+
+  protected void runJobCleanupTask(TaskUmbilicalProtocol umbilical) 
   throws IOException {
     // set phase for this task
     setPhase(TaskStatus.Phase.CLEANUP);
@@ -722,7 +777,7 @@ abstract class Task implements Writable, Configurable {
     done(umbilical);
   }
 
-  protected void runSetupJob(TaskUmbilicalProtocol umbilical) 
+  protected void runJobSetupTask(TaskUmbilicalProtocol umbilical) 
   throws IOException {
     // do the setup
     getProgress().setStatus("setup");

+ 85 - 40
src/mapred/org/apache/hadoop/mapred/TaskInProgress.java

@@ -82,8 +82,8 @@ class TaskInProgress {
   private long maxSkipRecords = 0;
   private FailedRanges failedRanges = new FailedRanges();
   private volatile boolean skipping = false;
-  private boolean cleanup = false; 
-  private boolean setup = false;
+  private boolean jobCleanup = false; 
+  private boolean jobSetup = false;
    
   // The 'next' usable taskid of this tip
   int nextTaskId = 0;
@@ -103,8 +103,10 @@ class TaskInProgress {
   private TreeMap<TaskAttemptID,TaskStatus> taskStatuses = 
     new TreeMap<TaskAttemptID,TaskStatus>();
 
-  // Map from taskId -> Task
-  private Map<TaskAttemptID, Task> tasks = new TreeMap<TaskAttemptID, Task>();
+  // Map from taskId -> TaskTracker Id, 
+  // contains cleanup attempts and where they ran, if any
+  private TreeMap<TaskAttemptID, String> cleanupTasks =
+    new TreeMap<TaskAttemptID, String>();
 
   private TreeSet<String> machinesWhereFailed = new TreeSet<String>();
   private TreeSet<TaskAttemptID> tasksReportedClosed = new TreeSet<TaskAttemptID>();
@@ -174,20 +176,20 @@ class TaskInProgress {
     return partition;
   }    
 
-  public boolean isCleanupTask() {
-   return cleanup;
+  public boolean isJobCleanupTask() {
+   return jobCleanup;
   }
   
-  public void setCleanupTask() {
-    cleanup = true;
+  public void setJobCleanupTask() {
+    jobCleanup = true;
   }
 
-  public boolean isSetupTask() {
-    return setup;
+  public boolean isJobSetupTask() {
+    return jobSetup;
   }
 	  
-  public void setSetupTask() {
-    setup = true;
+  public void setJobSetupTask() {
+    jobSetup = true;
   }
 
   public boolean isOnlyCommitPending() {
@@ -274,15 +276,6 @@ class TaskInProgress {
     return rawSplit != null;
   }
     
-  /**
-   * Return the Task object associated with a taskId
-   * @param taskId
-   * @return
-   */  
-  public Task getTask(TaskAttemptID taskId) {
-    return tasks.get(taskId);
-  }
-
   /**
    * Is the Task associated with taskid is the first attempt of the tip? 
    * @param taskId
@@ -392,7 +385,8 @@ class TaskInProgress {
       tasksReportedClosed.add(taskid);
       close = true;
     } else if (isComplete() && 
-               !(isMapTask() && !setup && !cleanup && isComplete(taskid)) &&
+               !(isMapTask() && !jobSetup && 
+                   !jobCleanup && isComplete(taskid)) &&
                !tasksReportedClosed.contains(taskid)) {
       tasksReportedClosed.add(taskid);
       close = true; 
@@ -499,6 +493,8 @@ class TaskInProgress {
       // @see {@link TaskTracker.transmitHeartbeat()}
       if ((newState != TaskStatus.State.RUNNING && 
            newState != TaskStatus.State.COMMIT_PENDING && 
+           newState != TaskStatus.State.FAILED_UNCLEAN && 
+           newState != TaskStatus.State.KILLED_UNCLEAN && 
            newState != TaskStatus.State.UNASSIGNED) && 
           (oldState == newState)) {
         LOG.warn("Recieved duplicate status update of '" + newState + 
@@ -514,6 +510,8 @@ class TaskInProgress {
           newState == TaskStatus.State.UNASSIGNED) &&
           (oldState == TaskStatus.State.FAILED || 
            oldState == TaskStatus.State.KILLED || 
+           oldState == TaskStatus.State.FAILED_UNCLEAN || 
+           oldState == TaskStatus.State.KILLED_UNCLEAN || 
            oldState == TaskStatus.State.SUCCEEDED ||
            oldState == TaskStatus.State.COMMIT_PENDING)) {
         return false;
@@ -521,8 +519,17 @@ class TaskInProgress {
           
       changed = oldState != newState;
     }
-        
-    taskStatuses.put(taskid, status);
+    // if task is a cleanup attempt, do not replace the complete status,
+    // update only specific fields.
+    // For example, startTime should not be updated, 
+    // but finishTime has to be updated.
+    if (!isCleanupAttempt(taskid)) {
+      taskStatuses.put(taskid, status);
+    } else {
+      taskStatuses.get(taskid).statusUpdate(status.getRunState(),
+        status.getProgress(), status.getStateString(), status.getPhase(),
+        status.getFinishTime());
+    }
 
     // Recompute progress
     recomputeProgress();
@@ -534,29 +541,38 @@ class TaskInProgress {
    * has failed.
    */
   public void incompleteSubTask(TaskAttemptID taskid, 
-                                TaskTrackerStatus ttStatus,
                                 JobStatus jobStatus) {
     //
     // Note the failure and its location
     //
-    String trackerName = ttStatus.getTrackerName();
-    String trackerHostName = ttStatus.getHost();
-     
     TaskStatus status = taskStatuses.get(taskid);
+    String trackerName;
+    String trackerHostName = null;
     TaskStatus.State taskState = TaskStatus.State.FAILED;
     if (status != null) {
+      trackerName = status.getTaskTracker();
+      trackerHostName = 
+        JobInProgress.convertTrackerNameToHostName(trackerName);
       // Check if the user manually KILLED/FAILED this task-attempt...
       Boolean shouldFail = tasksToKill.remove(taskid);
       if (shouldFail != null) {
-        taskState = (shouldFail) ? TaskStatus.State.FAILED :
-                                   TaskStatus.State.KILLED;
+        if (isCleanupAttempt(taskid)) {
+          taskState = (shouldFail) ? TaskStatus.State.FAILED :
+                                     TaskStatus.State.KILLED;
+        } else {
+          taskState = (shouldFail) ? TaskStatus.State.FAILED_UNCLEAN :
+                                     TaskStatus.State.KILLED_UNCLEAN;
+          
+        }
         status.setRunState(taskState);
         addDiagnosticInfo(taskid, "Task has been " + taskState + " by the user" );
       }
  
       taskState = status.getRunState();
       if (taskState != TaskStatus.State.FAILED && 
-              taskState != TaskStatus.State.KILLED) {
+          taskState != TaskStatus.State.KILLED &&
+          taskState != TaskStatus.State.FAILED_UNCLEAN &&
+          taskState != TaskStatus.State.KILLED_UNCLEAN) {
         LOG.info("Task '" + taskid + "' running on '" + trackerName + 
                 "' in state: '" + taskState + "' being failed!");
         status.setRunState(TaskStatus.State.FAILED);
@@ -577,7 +593,7 @@ class TaskInProgress {
     // should note this failure only for completed maps, only if this taskid;
     // completed this map. however if the job is done, there is no need to 
     // manipulate completed maps
-    if (this.isMapTask() && !setup && !cleanup && isComplete(taskid) && 
+    if (this.isMapTask() && !jobSetup && !jobCleanup && isComplete(taskid) && 
         jobStatus.getRunState() != JobStatus.SUCCEEDED) {
       this.completes--;
       
@@ -597,7 +613,7 @@ class TaskInProgress {
           skipping = startSkipping();
         }
 
-      } else {
+      } else if (taskState == TaskStatus.State.KILLED) {
         numKilledTasks++;
       }
     }
@@ -724,6 +740,7 @@ class TaskInProgress {
     TaskStatus st = taskStatuses.get(taskId);
     if(st != null && (st.getRunState() == TaskStatus.State.RUNNING
         || st.getRunState() == TaskStatus.State.COMMIT_PENDING ||
+        st.inTaskCleanupPhase() ||
         st.getRunState() == TaskStatus.State.UNASSIGNED)
         && tasksToKill.put(taskId, shouldFail) == null ) {
       String logStr = "Request received to " + (shouldFail ? "fail" : "kill") 
@@ -848,11 +865,17 @@ class TaskInProgress {
     return addRunningTask(taskid, taskTracker);
   }
   
+  public Task addRunningTask(TaskAttemptID taskid, String taskTracker) {
+    return addRunningTask(taskid, taskTracker, false);
+  }
+  
   /**
    * Adds a previously running task to this tip. This is used in case of 
    * jobtracker restarts.
    */
-  public Task addRunningTask(TaskAttemptID taskid, String taskTracker) {
+  public Task addRunningTask(TaskAttemptID taskid, 
+                             String taskTracker,
+                             boolean taskCleanup) {
     // create the task
     Task t = null;
     if (isMapTask()) {
@@ -863,11 +886,17 @@ class TaskInProgress {
     } else {
       t = new ReduceTask(jobFile, taskid, partition, numMaps);
     }
-    if (cleanup) {
-      t.setCleanupTask();
+    if (jobCleanup) {
+      t.setJobCleanupTask();
+    }
+    if (jobSetup) {
+      t.setJobSetupTask();
     }
-    if (setup) {
-      t.setSetupTask();
+    if (taskCleanup) {
+      t.setTaskCleanupTask();
+      t.setState(taskStatuses.get(taskid).getRunState());
+      cleanupTasks.put(taskid, taskTracker);
+      jobtracker.removeTaskEntry(taskid);
     }
     t.setConf(conf);
     LOG.debug("Launching task with skipRanges:"+failedRanges.getSkipRanges());
@@ -876,7 +905,6 @@ class TaskInProgress {
     if(failedRanges.isTestAttempt()) {
       t.setWriteSkipRecs(false);
     }
-    tasks.put(taskid, t);
 
     activeTasks.put(taskid, taskTracker);
 
@@ -884,6 +912,23 @@ class TaskInProgress {
     jobtracker.createTaskEntry(taskid, taskTracker, this);
     return t;
   }
+
+  boolean isRunningTask(TaskAttemptID taskid) {
+    TaskStatus status = taskStatuses.get(taskid);
+    return status != null && status.getRunState() == TaskStatus.State.RUNNING;
+  }
+  
+  boolean isCleanupAttempt(TaskAttemptID taskid) {
+    return cleanupTasks.containsKey(taskid);
+  }
+  
+  String machineWhereCleanupRan(TaskAttemptID taskid) {
+    return cleanupTasks.get(taskid);
+  }
+  
+  String machineWhereTaskRan(TaskAttemptID taskid) {
+    return taskStatuses.get(taskid).getTaskTracker();
+  }
     
   /**
    * Has this task already failed on this machine?
@@ -970,7 +1015,7 @@ class TaskInProgress {
   }
 
   public long getMapInputSize() {
-    if(isMapTask() && !setup && !cleanup) {
+    if(isMapTask() && !jobSetup && !jobCleanup) {
       return rawSplit.getDataLength();
     } else {
       return 0;

+ 41 - 9
src/mapred/org/apache/hadoop/mapred/TaskLog.java

@@ -80,7 +80,14 @@ public class TaskLog {
   
   private static LogFileDetail getTaskLogFileDetail(TaskAttemptID taskid,
       LogName filter) throws IOException {
-    File indexFile = new File(getBaseDir(taskid.toString()), "log.index");
+    return getLogFileDetail(taskid, filter, false);
+  }
+  
+  private static LogFileDetail getLogFileDetail(TaskAttemptID taskid, 
+                                                LogName filter,
+                                                boolean isCleanup) 
+  throws IOException {
+    File indexFile = getIndexFile(taskid.toString(), isCleanup);
     BufferedReader fis = new BufferedReader(new java.io.FileReader(indexFile));
     //the format of the index file is
     //LOG_DIR: <the dir where the task logs are really stored>
@@ -120,8 +127,17 @@ public class TaskLog {
   }
   
   public static File getIndexFile(String taskid) {
-    return new File(getBaseDir(taskid), "log.index");
+    return getIndexFile(taskid, false);
+  }
+  
+  public static File getIndexFile(String taskid, boolean isCleanup) {
+    if (isCleanup) {
+      return new File(getBaseDir(taskid), "log.index.cleanup");
+    } else {
+      return new File(getBaseDir(taskid), "log.index");
+    }
   }
+  
   private static File getBaseDir(String taskid) {
     return new File(LOG_DIR, taskid);
   }
@@ -129,9 +145,10 @@ public class TaskLog {
   private static long prevErrLength;
   private static long prevLogLength;
   
-  private static void writeToIndexFile(TaskAttemptID firstTaskid) 
+  private static void writeToIndexFile(TaskAttemptID firstTaskid,
+                                       boolean isCleanup) 
   throws IOException {
-    File indexFile = getIndexFile(currentTaskid.toString());
+    File indexFile = getIndexFile(currentTaskid.toString(), isCleanup);
     BufferedOutputStream bos = 
       new BufferedOutputStream(new FileOutputStream(indexFile,false));
     DataOutputStream dos = new DataOutputStream(bos);
@@ -159,8 +176,17 @@ public class TaskLog {
     prevLogLength = getTaskLogFile(firstTaskid, LogName.SYSLOG).length();
   }
   private volatile static TaskAttemptID currentTaskid = null;
+
+  public synchronized static void syncLogs(TaskAttemptID firstTaskid, 
+                                           TaskAttemptID taskid) 
+  throws IOException {
+    syncLogs(firstTaskid, taskid, false);
+  }
+  
   @SuppressWarnings("unchecked")
-  public synchronized static void syncLogs(TaskAttemptID firstTaskid, TaskAttemptID taskid) 
+  public synchronized static void syncLogs(TaskAttemptID firstTaskid, 
+                                           TaskAttemptID taskid,
+                                           boolean isCleanup) 
   throws IOException {
     System.out.flush();
     System.err.flush();
@@ -179,10 +205,9 @@ public class TaskLog {
       currentTaskid = taskid;
       resetPrevLengths(firstTaskid);
     }
-    writeToIndexFile(firstTaskid);
+    writeToIndexFile(firstTaskid, isCleanup);
   }
   
-  
   /**
    * The filter for userlogs.
    */
@@ -249,6 +274,12 @@ public class TaskLog {
   static class Reader extends InputStream {
     private long bytesRemaining;
     private FileInputStream file;
+
+    public Reader(TaskAttemptID taskid, LogName kind, 
+                  long start, long end) throws IOException {
+      this(taskid, kind, start, end, false);
+    }
+    
     /**
      * Read a log file from start to end positions. The offsets may be negative,
      * in which case they are relative to the end of the file. For example,
@@ -258,12 +289,13 @@ public class TaskLog {
      * @param kind the kind of log to read
      * @param start the offset to read from (negative is relative to tail)
      * @param end the offset to read upto (negative is relative to tail)
+     * @param isCleanup whether the attempt is cleanup attempt or not
      * @throws IOException
      */
     public Reader(TaskAttemptID taskid, LogName kind, 
-                  long start, long end) throws IOException {
+                  long start, long end, boolean isCleanup) throws IOException {
       // find the right log file
-      LogFileDetail fileDetail = getTaskLogFileDetail(taskid, kind);
+      LogFileDetail fileDetail = getLogFileDetail(taskid, kind, isCleanup);
       // calculate the start and stop
       long size = fileDetail.length;
       if (start < 0) {

+ 19 - 10
src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java

@@ -104,7 +104,8 @@ public class TaskLogServlet extends HttpServlet {
   private void printTaskLog(HttpServletResponse response,
                             OutputStream out, TaskAttemptID taskId, 
                             long start, long end, boolean plainText, 
-                            TaskLog.LogName filter) throws IOException {
+                            TaskLog.LogName filter, boolean isCleanup) 
+  throws IOException {
     if (!plainText) {
       out.write(("<br><b><u>" + filter + " logs</u></b><br>\n" +
                  "<pre>\n").getBytes());
@@ -112,7 +113,7 @@ public class TaskLogServlet extends HttpServlet {
 
     try {
       InputStream taskLogReader = 
-        new TaskLog.Reader(taskId, filter, start, end);
+        new TaskLog.Reader(taskId, filter, start, end, isCleanup);
       byte[] b = new byte[65536];
       int result;
       while (true) {
@@ -159,6 +160,7 @@ public class TaskLogServlet extends HttpServlet {
     long end = -1;
     boolean plainText = false;
     TaskLog.LogName filter = null;
+    boolean isCleanup = false;
 
     String taskIdStr = request.getParameter("taskid");
     if (taskIdStr == null) {
@@ -193,7 +195,12 @@ public class TaskLogServlet extends HttpServlet {
     if (sPlainText != null) {
       plainText = Boolean.valueOf(sPlainText);
     }
-
+    
+    String sCleanup = request.getParameter("cleanup");
+    if (sCleanup != null) {
+      isCleanup = Boolean.valueOf(sCleanup);
+    }
+    
     OutputStream out = response.getOutputStream();
     if( !plainText ) {
       out.write(("<html>\n" +
@@ -203,21 +210,22 @@ public class TaskLogServlet extends HttpServlet {
 
       if (filter == null) {
         printTaskLog(response, out, taskId, start, end, plainText, 
-                     TaskLog.LogName.STDOUT);
+                     TaskLog.LogName.STDOUT, isCleanup);
         printTaskLog(response, out, taskId, start, end, plainText, 
-                     TaskLog.LogName.STDERR);
+                     TaskLog.LogName.STDERR, isCleanup);
         printTaskLog(response, out, taskId, start, end, plainText, 
-                     TaskLog.LogName.SYSLOG);
+                     TaskLog.LogName.SYSLOG, isCleanup);
         if (haveTaskLog(taskId, TaskLog.LogName.DEBUGOUT)) {
           printTaskLog(response, out, taskId, start, end, plainText, 
-                       TaskLog.LogName.DEBUGOUT);
+                       TaskLog.LogName.DEBUGOUT, isCleanup);
         }
         if (haveTaskLog(taskId, TaskLog.LogName.PROFILE)) {
           printTaskLog(response, out, taskId, start, end, plainText, 
-                       TaskLog.LogName.PROFILE);
+                       TaskLog.LogName.PROFILE, isCleanup);
         }
       } else {
-        printTaskLog(response, out, taskId, start, end, plainText, filter);
+        printTaskLog(response, out, taskId, start, end, plainText, filter,
+                     isCleanup);
       }
       
       out.write("</body></html>\n".getBytes());
@@ -226,7 +234,8 @@ public class TaskLogServlet extends HttpServlet {
       response.sendError(HttpServletResponse.SC_BAD_REQUEST,
           "You must supply a value for `filter' (STDOUT, STDERR, or SYSLOG) if you set plainText = true");
     } else {
-      printTaskLog(response, out, taskId, start, end, plainText, filter);
+      printTaskLog(response, out, taskId, start, end, plainText, filter, 
+                   isCleanup);
     } 
   }
 }

+ 15 - 47
src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java

@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.IOException;
+import java.io.File;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -27,9 +27,6 @@ import java.util.ArrayList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.TaskTracker;
 import org.apache.hadoop.util.ProcfsBasedProcessTree;
 
@@ -64,14 +61,14 @@ class TaskMemoryManagerThread extends Thread {
         ProcfsBasedProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
   }
 
-  public void addTask(TaskAttemptID tid, long memLimit) {
+  public void addTask(TaskAttemptID tid, long memLimit, String pidFile) {
     synchronized (tasksToBeAdded) {
       LOG.debug("Tracking ProcessTree " + tid + " for the first time");
       // TODO: Negative values must have been checked in JobConf.
       memLimit = (memLimit < 0 ? JobConf.DISABLED_VIRTUAL_MEMORY_LIMIT
           : memLimit);
       ProcessTreeInfo ptInfo = new ProcessTreeInfo(tid, null, null, memLimit,
-          sleepTimeBeforeSigKill);
+          sleepTimeBeforeSigKill, pidFile);
       tasksToBeAdded.put(tid, ptInfo);
     }
   }
@@ -87,9 +84,11 @@ class TaskMemoryManagerThread extends Thread {
     private String pid;
     private ProcfsBasedProcessTree pTree;
     private long memLimit;
+    private String pidFile;
 
     public ProcessTreeInfo(TaskAttemptID tid, String pid,
-        ProcfsBasedProcessTree pTree, long memLimit, long sleepTimeBeforeSigKill) {
+        ProcfsBasedProcessTree pTree, long memLimit, 
+        long sleepTimeBeforeSigKill, String pidFile) {
       this.tid = tid;
       this.pid = pid;
       this.pTree = pTree;
@@ -97,6 +96,7 @@ class TaskMemoryManagerThread extends Thread {
         this.pTree.setSigKillInterval(sleepTimeBeforeSigKill);
       }
       this.memLimit = memLimit;
+      this.pidFile = pidFile;
     }
 
     public TaskAttemptID getTID() {
@@ -166,7 +166,8 @@ class TaskMemoryManagerThread extends Thread {
 
         // Initialize any uninitialized processTrees
         if (pId == null) {
-          pId = getPid(tid); // get pid from pid-file
+          // get pid from pid-file
+          pId = getPid(ptInfo.pidFile); 
           if (pId != null) {
             // PID will be null, either if the pid file is yet to be created
             // or if the tip is finished and we removed pidFile, but the TIP
@@ -232,47 +233,14 @@ class TaskMemoryManagerThread extends Thread {
   /**
    * Load pid of the task from the pidFile.
    * 
-   * @param tipID
+   * @param pidFileName
    * @return the pid of the task process.
    */
-  private String getPid(TaskAttemptID tipID) {
-    Path pidFileName = getPidFilePath(tipID, taskTracker.getJobConf());
-    if (pidFileName == null) {
-      return null;
-    }
-    return ProcfsBasedProcessTree.getPidFromPidFile(pidFileName.toString());
-  }
-
-  private static LocalDirAllocator lDirAlloc = 
-    new LocalDirAllocator("mapred.local.dir");
-
-  /**
-   * Get the pidFile path of a Task
-   * @param tipID
-   * @return pidFile's Path
-   */
-  public static Path getPidFilePath(TaskAttemptID tipID, JobConf conf) {
-    Path pidFileName = null;
-    try {
-      //this actually need not use a localdirAllocator since the PID
-      //files are really small..
-      pidFileName = lDirAlloc.getLocalPathToRead(
-          (TaskTracker.getPidFilesSubdir() + Path.SEPARATOR + tipID),
-          conf);
-    } catch (IOException i) {
-      // PID file is not there
-      LOG.debug("Failed to get pidFile name for " + tipID);
-    }
-    return pidFileName;
-  }
-  public void removePidFile(TaskAttemptID tid) {
-    if (taskTracker.isTaskMemoryManagerEnabled()) {
-      Path pidFilePath = getPidFilePath(tid, taskTracker.getJobConf());
-      if (pidFilePath != null) {
-        try {
-          FileSystem.getLocal(taskTracker.getJobConf()).delete(pidFilePath, false);
-        } catch(IOException ie) {}
-      }
+  private String getPid(String pidFileName) {
+    if ((new File(pidFileName)).exists()) {
+      return ProcfsBasedProcessTree.getPidFromPidFile(pidFileName);
     }
+    return null;
   }
+  
 }

+ 11 - 9
src/mapred/org/apache/hadoop/mapred/TaskRunner.java

@@ -113,9 +113,10 @@ abstract class TaskRunner extends Thread {
                           new Path(conf.getJar()).getParent().toString());
       }
       File workDir = new File(lDirAlloc.getLocalPathToRead(
-                                TaskTracker.getJobCacheSubdir() 
-                                + Path.SEPARATOR + t.getJobID() 
-                                + Path.SEPARATOR + t.getTaskID()
+                                TaskTracker.getLocalTaskDir( 
+                                  t.getJobID().toString(), 
+                                  t.getTaskID().toString(),
+                                  t.isTaskCleanupTask())
                                 + Path.SEPARATOR + MRConstants.WORKDIR,
                                 conf). toString());
 
@@ -374,12 +375,12 @@ abstract class TaskRunner extends Thread {
       vargs.add(Integer.toString(address.getPort())); 
       vargs.add(taskid.toString());                      // pass task identifier
 
-      String pidFile = null;
-      if (tracker.isTaskMemoryManagerEnabled()) {
-        pidFile = lDirAlloc.getLocalPathForWrite(
-            (TaskTracker.getPidFilesSubdir() + Path.SEPARATOR + taskid),
+      String pidFile = lDirAlloc.getLocalPathForWrite(
+          (TaskTracker.getPidFile(t.getJobID().toString(), 
+             taskid.toString(), t.isTaskCleanupTask())),
             this.conf).toString();
-      }
+      t.setPidFile(pidFile);
+      tracker.addToMemoryManager(t.getTaskID(), conf, pidFile);
 
       // set memory limit using ulimit if feasible and necessary ...
       String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
@@ -458,7 +459,8 @@ abstract class TaskRunner extends Thread {
       }catch(IOException ie){
         LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
       }
-      tracker.reportTaskFinished(t.getTaskID(), false);
+      tip.reportTaskFinished();
+      tracker.reportTaskFinished();
     }
   }
   

+ 35 - 3
src/mapred/org/apache/hadoop/mapred/TaskStatus.java

@@ -41,7 +41,7 @@ abstract class TaskStatus implements Writable, Cloneable {
 
   // what state is the task in?
   public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, 
-                            COMMIT_PENDING}
+                            COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN}
     
   private TaskAttemptID taskid;
   private float progress;
@@ -202,6 +202,12 @@ abstract class TaskStatus implements Writable, Cloneable {
     }
     this.phase = phase; 
   }
+
+  boolean inTaskCleanupPhase() {
+    return (this.phase == TaskStatus.Phase.CLEANUP && 
+      (this.runState == TaskStatus.State.FAILED_UNCLEAN || 
+      this.runState == TaskStatus.State.KILLED_UNCLEAN));
+  }
   
   public boolean getIncludeCounters() {
     return includeCounters; 
@@ -259,9 +265,9 @@ abstract class TaskStatus implements Writable, Cloneable {
   /**
    * Update the status of the task.
    * 
+   * @param runstate
    * @param progress
    * @param state
-   * @param phase
    * @param counters
    */
   synchronized void statusUpdate(State runState, 
@@ -298,7 +304,33 @@ abstract class TaskStatus implements Writable, Cloneable {
     this.counters = status.getCounters();
     this.outputSize = status.outputSize;
   }
-  
+
+  /**
+   * Update specific fields of task status
+   * 
+   * This update is done in JobTracker when a cleanup attempt of task
+   * reports its status. Then update only specific fields, not all.
+   * 
+   * @param runState
+   * @param progress
+   * @param state
+   * @param phase
+   * @param finishTime
+   */
+  synchronized void statusUpdate(State runState, 
+                                 float progress,
+                                 String state, 
+                                 Phase phase,
+                                 long finishTime) {
+    setRunState(runState);
+    setProgress(progress);
+    setStateString(state);
+    setPhase(phase);
+    if (finishTime != 0) {
+      this.finishTime = finishTime; 
+    }
+  }
+
   /**
    * Clear out transient information after sending out a status-update
    * from either the {@link Task} to the {@link TaskTracker} or from the

+ 179 - 83
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -181,7 +181,8 @@ public class TaskTracker
   private static final String SUBDIR = "taskTracker";
   private static final String CACHEDIR = "archive";
   private static final String JOBCACHE = "jobcache";
-  private static final String PIDDIR = "pids";
+  private static final String PID = "pid";
+  private static final String OUTPUT = "output";
   private JobConf originalConf;
   private JobConf fConf;
   private int maxCurrentMapTasks;
@@ -358,10 +359,36 @@ public class TaskTracker
     return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE;
   }
 
-  static String getPidFilesSubdir() {
-    return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.PIDDIR;
+  static String getLocalJobDir(String jobid) {
+	return getJobCacheSubdir() + Path.SEPARATOR + jobid; 
   }
-    
+
+  static String getLocalTaskDir(String jobid, String taskid) {
+	return getLocalTaskDir(jobid, taskid, false) ; 
+  }
+
+  static String getIntermediateOutputDir(String jobid, String taskid) {
+	return getLocalTaskDir(jobid, taskid) 
+           + Path.SEPARATOR + TaskTracker.OUTPUT ; 
+  }
+
+  static String getLocalTaskDir(String jobid, 
+                                String taskid, 
+                                boolean isCleanupAttempt) {
+	String taskDir = getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
+	if (isCleanupAttempt) { 
+      taskDir = taskDir + ".cleanup";
+	}
+	return taskDir;
+  }
+
+  static String getPidFile(String jobid, 
+                           String taskid, 
+                           boolean isCleanup) {
+    return  getLocalTaskDir(jobid, taskid, isCleanup)
+            + Path.SEPARATOR + PID;
+  }
+
   public long getProtocolVersion(String protocol, 
                                  long clientVersion) throws IOException {
     if (protocol.equals(TaskUmbilicalProtocol.class.getName())) {
@@ -699,9 +726,9 @@ public class TaskTracker
     } catch(FileNotFoundException fe) {
       jobFileSize = -1;
     }
-    Path localJobFile = lDirAlloc.getLocalPathForWrite((getJobCacheSubdir()
-                                    + Path.SEPARATOR + jobId 
-                                    + Path.SEPARATOR + "job.xml"),
+    Path localJobFile = lDirAlloc.getLocalPathForWrite(
+                                    getLocalJobDir(jobId.toString())
+                                    + Path.SEPARATOR + "job.xml",
                                     jobFileSize, fConf);
     RunningJob rjob = addTaskToJob(jobId, tip);
     synchronized (rjob) {
@@ -725,9 +752,9 @@ public class TaskTracker
         
         // create the 'work' directory
         // job-specific shared directory for use as scratch space 
-        Path workDir = lDirAlloc.getLocalPathForWrite((getJobCacheSubdir()
-                       + Path.SEPARATOR + jobId 
-                       + Path.SEPARATOR + "work"), fConf);
+        Path workDir = lDirAlloc.getLocalPathForWrite(
+                         (getLocalJobDir(jobId.toString())
+                         + Path.SEPARATOR + "work"), fConf);
         if (!localFs.mkdirs(workDir)) {
           throw new IOException("Mkdirs failed to create " 
                       + workDir.toString());
@@ -749,8 +776,7 @@ public class TaskTracker
           // Here we check for and we check five times the size of jarFileSize
           // to accommodate for unjarring the jar file in work directory 
           localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
-                                     getJobCacheSubdir()
-                                     + Path.SEPARATOR + jobId 
+                                     getLocalJobDir(jobId.toString())
                                      + Path.SEPARATOR + "jars",
                                      5 * jarFileSize, fConf), "job.jar");
           if (!localFs.mkdirs(localJarFile.getParent())) {
@@ -1164,7 +1190,8 @@ public class TaskTracker
       for (TaskStatus taskStatus : status.getTaskReports()) {
         if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
             taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
-            taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
+            taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+            !taskStatus.inTaskCleanupPhase()) {
           if (taskStatus.getIsMap()) {
             mapTotal--;
           } else {
@@ -1281,7 +1308,8 @@ public class TaskTracker
     long now = System.currentTimeMillis();
     for (TaskInProgress tip: runningTasks.values()) {
       if (tip.getRunState() == TaskStatus.State.RUNNING ||
-          tip.getRunState() == TaskStatus.State.COMMIT_PENDING) {
+          tip.getRunState() == TaskStatus.State.COMMIT_PENDING ||
+          tip.isCleaningup()) {
         // Check the per-job timeout interval for tasks;
         // an interval of '0' implies it is never timed-out
         long jobTaskTimeout = tip.getTaskTimeout();
@@ -1335,8 +1363,7 @@ public class TaskTracker
         // task if the job is done/failed
         if (!rjob.keepJobFiles){
           directoryCleanupThread.addToQueue(getLocalFiles(fConf, 
-                                   SUBDIR + Path.SEPARATOR + JOBCACHE + 
-                                   Path.SEPARATOR +  rjob.getJobID()));
+            getLocalJobDir(rjob.getJobID().toString())));
         }
         // Remove this job 
         rjob.tasks.clear();
@@ -1581,7 +1608,9 @@ public class TaskTracker
           }
           synchronized (tip) {
             //to make sure that there is no kill task action for this
-            if (tip.getRunState() != TaskStatus.State.UNASSIGNED) {
+            if (tip.getRunState() != TaskStatus.State.UNASSIGNED &&
+                tip.getRunState() != TaskStatus.State.FAILED_UNCLEAN &&
+                tip.getRunState() != TaskStatus.State.KILLED_UNCLEAN) {
               //got killed externally while still in the launcher queue
               addFreeSlot();
               continue;
@@ -1602,7 +1631,8 @@ public class TaskTracker
   private TaskInProgress registerTask(LaunchTaskAction action, 
       TaskLauncher launcher) {
     Task t = action.getTask();
-    LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID());
+    LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID() +
+             " task's state:" + t.getState());
     TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher);
     synchronized (this) {
       tasks.put(t.getTaskID(), tip);
@@ -1624,10 +1654,6 @@ public class TaskTracker
   private void startNewTask(TaskInProgress tip) {
     try {
       localizeJob(tip);
-      if (isTaskMemoryManagerEnabled()) {
-        taskMemoryManager.addTask(tip.getTask().getTaskID(), 
-            getMemoryForTask(tip.getJobConf()));
-      }
     } catch (Throwable e) {
       String msg = ("Error initializing " + tip.getTask().getTaskID() + 
                     ":\n" + StringUtils.stringifyException(e));
@@ -1648,7 +1674,23 @@ public class TaskTracker
       }
     }
   }
-    
+  
+  void addToMemoryManager(TaskAttemptID attemptId, 
+                          JobConf conf, 
+                          String pidFile) {
+    if (isTaskMemoryManagerEnabled()) {
+      taskMemoryManager.addTask(attemptId, 
+        getMemoryForTask(conf), pidFile);
+    }
+  }
+
+  void removeFromMemoryManager(TaskAttemptID attemptId) {
+    // Remove the entry from taskMemoryManagerThread's data structures.
+    if (isTaskMemoryManagerEnabled()) {
+      taskMemoryManager.removeTask(attemptId);
+    }
+  }
+
   /**
    * The server retry loop.  
    * This while-loop attempts to connect to the JobTracker.  It only 
@@ -1735,10 +1777,12 @@ public class TaskTracker
       localJobConf = null;
       taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskID(), 
                                                0.0f, 
-                                               TaskStatus.State.UNASSIGNED, 
+                                               task.getState(),
                                                diagnosticInfo.toString(), 
                                                "initializing",  
                                                getName(), 
+                                               task.isTaskCleanupTask() ? 
+                                                 TaskStatus.Phase.CLEANUP :  
                                                task.isMapTask()? TaskStatus.Phase.MAP:
                                                TaskStatus.Phase.SHUFFLE,
                                                task.getCounters()); 
@@ -1748,9 +1792,10 @@ public class TaskTracker
     private void localizeTask(Task task) throws IOException{
 
       Path localTaskDir = 
-        lDirAlloc.getLocalPathForWrite((TaskTracker.getJobCacheSubdir() + 
-                    Path.SEPARATOR + task.getJobID() + Path.SEPARATOR +
-                    task.getTaskID()), defaultJobConf );
+        lDirAlloc.getLocalPathForWrite(
+          TaskTracker.getLocalTaskDir(task.getJobID().toString(), 
+            task.getTaskID().toString(), task.isTaskCleanupTask()), 
+          defaultJobConf );
       
       FileSystem localFs = FileSystem.getLocal(fConf);
       if (!localFs.mkdirs(localTaskDir)) {
@@ -1760,8 +1805,7 @@ public class TaskTracker
 
       // create symlink for ../work if it already doesnt exist
       String workDir = lDirAlloc.getLocalPathToRead(
-                         TaskTracker.getJobCacheSubdir() 
-                         + Path.SEPARATOR + task.getJobID() 
+                         TaskTracker.getLocalJobDir(task.getJobID().toString())
                          + Path.SEPARATOR  
                          + "work", defaultJobConf).toString();
       String link = localTaskDir.getParent().toString() 
@@ -1772,11 +1816,10 @@ public class TaskTracker
       
       // create the working-directory of the task 
       Path cwd = lDirAlloc.getLocalPathForWrite(
-                         TaskTracker.getJobCacheSubdir() 
-                         + Path.SEPARATOR + task.getJobID() 
-                         + Path.SEPARATOR + task.getTaskID()
-                         + Path.SEPARATOR + MRConstants.WORKDIR,
-                         defaultJobConf);
+                   getLocalTaskDir(task.getJobID().toString(), 
+                      task.getTaskID().toString(), task.isTaskCleanupTask()) 
+                   + Path.SEPARATOR + MRConstants.WORKDIR,
+                   defaultJobConf);
       if (!localFs.mkdirs(cwd)) {
         throw new IOException("Mkdirs failed to create " 
                     + cwd.toString());
@@ -1870,9 +1913,13 @@ public class TaskTracker
      * Kick off the task execution
      */
     public synchronized void launchTask() throws IOException {
-      if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
+      if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
+          this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
+          this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
         localizeTask(task);
-        this.taskStatus.setRunState(TaskStatus.State.RUNNING);
+        if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
+          this.taskStatus.setRunState(TaskStatus.State.RUNNING);
+        }
         this.runner = task.createRunner(TaskTracker.this, this);
         this.runner.start();
         this.taskStatus.setStartTime(System.currentTimeMillis());
@@ -1882,6 +1929,10 @@ public class TaskTracker
       }
     }
 
+    boolean isCleaningup() {
+   	  return this.taskStatus.inTaskCleanupPhase();
+    }
+    
     /**
      * The task is reporting its progress
      */
@@ -1889,10 +1940,14 @@ public class TaskTracker
     {
       LOG.info(task.getTaskID() + " " + taskStatus.getProgress() + 
           "% " + taskStatus.getStateString());
-      
+      // task will report its state as
+      // COMMIT_PENDING when it is waiting for commit response and 
+      // when it is committing.
+      // cleanup attempt will report its state as FAILED_UNCLEAN/KILLED_UNCLEAN
       if (this.done || 
           (this.taskStatus.getRunState() != TaskStatus.State.RUNNING &&
-          this.taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING)) {
+          this.taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+          !isCleaningup())) {
         //make sure we ignore progress messages after a task has 
         //invoked TaskUmbilicalProtocol.done() or if the task has been
         //KILLED/FAILED
@@ -1943,7 +1998,16 @@ public class TaskTracker
      * The task is reporting that it's done running
      */
     public synchronized void reportDone() {
-      this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+      if (isCleaningup()) {
+        if (this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
+          this.taskStatus.setRunState(TaskStatus.State.FAILED);
+        } else if (this.taskStatus.getRunState() == 
+                   TaskStatus.State.KILLED_UNCLEAN) {
+          this.taskStatus.setRunState(TaskStatus.State.KILLED);
+        }
+      } else {
+        this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+      }
       this.taskStatus.setProgress(1.0f);
       this.taskStatus.setFinishTime(System.currentTimeMillis());
       this.done = true;
@@ -1958,6 +2022,11 @@ public class TaskTracker
       return wasKilled;
     }
 
+    void reportTaskFinished() {
+      taskFinished();
+      releaseSlot();
+    }
+
     /**
      * The task has actually finished running.
      */
@@ -1984,7 +2053,23 @@ public class TaskTracker
         if (!done) {
           if (!wasKilled) {
             failures += 1;
-            taskStatus.setRunState(TaskStatus.State.FAILED);
+            /* State changes:
+             * RUNNING/COMMIT_PENDING -> FAILED_UNCLEAN/FAILED
+             * FAILED_UNCLEAN -> FAILED 
+             * KILLED_UNCLEAN -> KILLED 
+             */
+            if (taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
+              taskStatus.setRunState(TaskStatus.State.FAILED);
+            } else if (taskStatus.getRunState() == 
+                       TaskStatus.State.KILLED_UNCLEAN) {
+              taskStatus.setRunState(TaskStatus.State.KILLED);
+            } else if (task.isMapOrReduce() && 
+                       taskStatus.getPhase() != TaskStatus.Phase.CLEANUP) {
+              taskStatus.setRunState(TaskStatus.State.FAILED_UNCLEAN);
+            } else {
+              taskStatus.setRunState(TaskStatus.State.FAILED);
+            }
+            removeFromMemoryManager(task.getTaskID());
             // call the script here for the failed tasks.
             if (debugCommand != null) {
               String taskStdout ="";
@@ -2010,9 +2095,10 @@ public class TaskTracker
               File workDir = null;
               try {
                 workDir = new File(lDirAlloc.getLocalPathToRead(
-                                     TaskTracker.getJobCacheSubdir() 
-                                     + Path.SEPARATOR + task.getJobID() 
-                                     + Path.SEPARATOR + task.getTaskID()
+                                     TaskTracker.getLocalTaskDir( 
+                                       task.getJobID().toString(), 
+                                       task.getTaskID().toString(),
+                                       task.isTaskCleanupTask())
                                      + Path.SEPARATOR + MRConstants.WORKDIR,
                                      localJobConf). toString());
               } catch (IOException e) {
@@ -2065,14 +2151,14 @@ public class TaskTracker
                 LOG.warn("Exception in add diagnostics!");
               }
             }
-          } else {
-            taskStatus.setRunState(TaskStatus.State.KILLED);
           }
           taskStatus.setProgress(0.0f);
         }
         this.taskStatus.setFinishTime(System.currentTimeMillis());
         needCleanup = (taskStatus.getRunState() == TaskStatus.State.FAILED || 
-                       taskStatus.getRunState() == TaskStatus.State.KILLED);
+                taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
+                taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN || 
+                taskStatus.getRunState() == TaskStatus.State.KILLED);
       }
 
       //
@@ -2182,7 +2268,8 @@ public class TaskTracker
       synchronized(this){
         if (getRunState() == TaskStatus.State.RUNNING ||
             getRunState() == TaskStatus.State.UNASSIGNED ||
-            getRunState() == TaskStatus.State.COMMIT_PENDING) {
+            getRunState() == TaskStatus.State.COMMIT_PENDING ||
+            isCleaningup()) {
           kill(wasFailure);
         }
       }
@@ -2196,16 +2283,38 @@ public class TaskTracker
      * @param wasFailure was it a failure (versus a kill request)?
      */
     public synchronized void kill(boolean wasFailure) throws IOException {
+      /* State changes:
+       * RUNNING -> FAILED_UNCLEAN/KILLED_UNCLEAN/FAILED/KILLED
+       * COMMIT_PENDING -> FAILED_UNCLEAN/KILLED_UNCLEAN
+       * FAILED_UNCLEAN -> FAILED 
+       * KILLED_UNCLEAN -> KILLED
+       * UNASSIGNED -> FAILED/KILLED 
+       */
       if (taskStatus.getRunState() == TaskStatus.State.RUNNING ||
-          taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) {
+          taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING ||
+          isCleaningup()) {
         wasKilled = true;
         if (wasFailure) {
           failures += 1;
         }
         runner.kill();
-        taskStatus.setRunState((wasFailure) ? 
-                                  TaskStatus.State.FAILED : 
-                                  TaskStatus.State.KILLED);
+        if (task.isMapOrReduce()) {
+          taskStatus.setRunState((wasFailure) ? 
+                                    TaskStatus.State.FAILED_UNCLEAN : 
+                                    TaskStatus.State.KILLED_UNCLEAN);
+        } else {
+          // go FAILED_UNCLEAN -> FAILED and KILLED_UNCLEAN -> KILLED always
+          if (taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
+            taskStatus.setRunState(TaskStatus.State.FAILED);
+          } else if (taskStatus.getRunState() == 
+                     TaskStatus.State.KILLED_UNCLEAN) {
+            taskStatus.setRunState(TaskStatus.State.KILLED);
+          } else {
+            taskStatus.setRunState((wasFailure) ? 
+                                      TaskStatus.State.FAILED : 
+                                      TaskStatus.State.KILLED);
+          }
+        }
       } else if (taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
         if (wasFailure) {
           failures += 1;
@@ -2214,6 +2323,7 @@ public class TaskTracker
           taskStatus.setRunState(TaskStatus.State.KILLED);
         }
       }
+      removeFromMemoryManager(task.getTaskID());
       releaseSlot();
     }
     
@@ -2265,7 +2375,12 @@ public class TaskTracker
 
       synchronized (TaskTracker.this) {
         if (needCleanup) {
-          tasks.remove(taskId);
+          // see if tasks data structure is holding this tip.
+          // tasks could hold the tip for cleanup attempt, if cleanup attempt 
+          // got launched before this method.
+          if (tasks.get(taskId) == this) {
+            tasks.remove(taskId);
+          }
         }
         synchronized (this){
           if (alwaysKeepTaskFiles ||
@@ -2277,8 +2392,8 @@ public class TaskTracker
       }
       synchronized (this) {
         try {
-          String taskDir = SUBDIR + Path.SEPARATOR + JOBCACHE + Path.SEPARATOR
-                           + task.getJobID() + Path.SEPARATOR + taskId;
+          String taskDir = getLocalTaskDir(task.getJobID().toString(),
+                             taskId.toString(), task.isTaskCleanupTask());
           if (needCleanup) {
             if (runner != null) {
               //cleans up the output directory of the task (where map outputs 
@@ -2415,7 +2530,7 @@ public class TaskTracker
   throws IOException {
     LOG.info("Task " + taskid + " is in COMMIT_PENDING");
     statusUpdate(taskid, taskStatus);
-    reportTaskFinished(taskid, true);
+    reportTaskFinished();
   }
   
   /**
@@ -2490,31 +2605,14 @@ public class TaskTracker
   //  Called by TaskTracker thread after task process ends
   /////////////////////////////////////////////////////
   /**
-   * The task is no longer running.  It may not have completed successfully
+   * when you see report task finished, wake up the heartbeat
    */
-  void reportTaskFinished(TaskAttemptID taskid, boolean commitPending) {
-    TaskInProgress tip;
-    synchronized (this) {
-      tip = tasks.get(taskid);
-    }
-    if (tip != null) {
-      if (!commitPending) {
-        tip.taskFinished();
-        // Remove the entry from taskMemoryManagerThread's data structures.
-        if (isTaskMemoryManagerEnabled()) {
-          taskMemoryManager.removeTask(taskid);
-        }
-        tip.releaseSlot();
-      }
-      synchronized(finishedCount) {
-        finishedCount[0]++;
-        finishedCount.notify();
-      }
-    } else {
-      LOG.warn("Unknown child task finshed: "+taskid+". Ignored.");
+  void reportTaskFinished() {
+    synchronized(finishedCount) {
+      finishedCount[0]++;
+      finishedCount.notify();
     }
   }
-  
 
   /**
    * A completed map task's output has been lost.
@@ -2740,15 +2838,13 @@ public class TaskTracker
 
         // Index file
         Path indexFileName = lDirAlloc.getLocalPathToRead(
-            TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + 
-            jobId + Path.SEPARATOR +
-            mapId + "/output" + "/file.out.index", conf);
+            TaskTracker.getIntermediateOutputDir(jobId, mapId)
+            + "/file.out.index", conf);
         
         // Map-output file
         Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
-            TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + 
-            jobId + Path.SEPARATOR +
-            mapId + "/output" + "/file.out", conf);
+            TaskTracker.getIntermediateOutputDir(jobId, mapId)
+            + "/file.out", conf);
 
         /**
          * Read the index file to get the information about where

+ 4 - 2
src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java

@@ -206,7 +206,8 @@ class TaskTrackerStatus implements Writable {
       TaskStatus.State state = ts.getRunState();
       if (ts.getIsMap() &&
           ((state == TaskStatus.State.RUNNING) ||
-           (state == TaskStatus.State.UNASSIGNED))) {
+           (state == TaskStatus.State.UNASSIGNED) ||
+           ts.inTaskCleanupPhase())) {
         mapCount++;
       }
     }
@@ -223,7 +224,8 @@ class TaskTrackerStatus implements Writable {
       TaskStatus.State state = ts.getRunState();
       if ((!ts.getIsMap()) &&
           ((state == TaskStatus.State.RUNNING) ||  
-           (state == TaskStatus.State.UNASSIGNED))) {
+           (state == TaskStatus.State.UNASSIGNED) ||
+           ts.inTaskCleanupPhase())) {
         reduceCount++;
       }
     }

+ 2 - 1
src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java

@@ -52,9 +52,10 @@ interface TaskUmbilicalProtocol extends VersionedProtocol {
    *            encapsulates the events and whether to reset events index.
    * Version 13 changed the getTask method signature for HADOOP-249
    * Version 14 changed the getTask method signature for HADOOP-4232
+   * Version 15 Adds FAILED_UNCLEAN and KILLED_UNCLEAN states for HADOOP-4759
    * */
 
-  public static final long versionID = 14L;
+  public static final long versionID = 15L;
   
   /**
    * Called when a child task process starts, to get its task.

+ 137 - 0
src/test/org/apache/hadoop/mapred/TestTaskFail.java

@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+public class TestTaskFail extends TestCase {
+  public static class MapperClass extends MapReduceBase
+  implements Mapper<LongWritable, Text, Text, IntWritable> {
+    String taskid;
+    public void configure(JobConf job) {
+      taskid = job.get("mapred.task.id");
+    }
+    public void map (LongWritable key, Text value, 
+                     OutputCollector<Text, IntWritable> output, 
+                     Reporter reporter) throws IOException {
+      if (taskid.endsWith("_0")) {
+        throw new IOException();
+      } else if (taskid.endsWith("_1")) {
+        System.exit(-1);
+      } 
+    }
+  }
+
+  public RunningJob launchJob(JobConf conf,
+                              Path inDir,
+                              Path outDir,
+                              String input) 
+  throws IOException {
+    // set up the input file system and write input text.
+    FileSystem inFs = inDir.getFileSystem(conf);
+    FileSystem outFs = outDir.getFileSystem(conf);
+    outFs.delete(outDir, true);
+    if (!inFs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+    {
+      // write input into input file
+      DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
+      file.writeBytes(input);
+      file.close();
+    }
+
+    // configure the mapred Job
+    conf.setMapperClass(MapperClass.class);        
+    conf.setReducerClass(IdentityReducer.class);
+    FileInputFormat.setInputPaths(conf, inDir);
+    FileOutputFormat.setOutputPath(conf, outDir);
+    String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data",
+                                    "/tmp")).toString().replace(' ', '+');
+    conf.set("test.build.data", TEST_ROOT_DIR);
+    // return the RunningJob handle.
+    return new JobClient(conf).submitJob(conf);
+  }
+		  
+  public void testWithDFS() throws IOException {
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fileSys = null;
+    try {
+      final int taskTrackers = 4;
+
+      Configuration conf = new Configuration();
+      dfs = new MiniDFSCluster(conf, 4, true, null);
+      fileSys = dfs.getFileSystem();
+      mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1);
+      JobConf jobConf = mr.createJobConf();
+      final Path inDir = new Path("./input");
+      final Path outDir = new Path("./output");
+      String input = "The quick brown fox\nhas many silly\nred fox sox\n";
+      RunningJob job = null;
+
+      job = launchJob(jobConf, inDir, outDir, input);
+      // wait for the job to finish.
+      while (!job.isComplete());
+      assertEquals(JobStatus.SUCCEEDED, job.getJobState());
+      
+      JobID jobId = job.getID();
+      // construct the task id of first map task
+      TaskAttemptID attemptId = 
+        new TaskAttemptID(new TaskID(jobId, true, 0), 0);
+      TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
+                              getTip(attemptId.getTaskID());
+      // this should not be cleanup attempt since the first attempt 
+      // fails with an exception
+      assertTrue(!tip.isCleanupAttempt(attemptId));
+      TaskStatus ts = 
+        mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
+      assertTrue(ts != null);
+      assertEquals(TaskStatus.State.FAILED, ts.getRunState());
+      
+      attemptId =  new TaskAttemptID(new TaskID(jobId, true, 0), 1);
+      // this should be cleanup attempt since the second attempt fails
+      // with System.exit
+      assertTrue(tip.isCleanupAttempt(attemptId));
+      ts = mr.getJobTrackerRunner().getJobTracker().getTaskStatus(attemptId);
+      assertTrue(ts != null);
+      assertEquals(TaskStatus.State.FAILED, ts.getRunState());
+
+    } finally {
+      if (dfs != null) { dfs.shutdown(); }
+      if (mr != null) { mr.shutdown(); }
+    }
+  }
+
+  public static void main(String[] argv) throws Exception {
+    TestTaskFail td = new TestTaskFail();
+    td.testWithDFS();
+  }
+}

+ 63 - 8
src/webapps/job/taskdetails.jsp

@@ -67,13 +67,19 @@
         }
       }
     }
-    TaskStatus[] ts = (job != null) ? tracker.getTaskStatuses(tipidObj)
-        : null;
+    TaskInProgress tip = null;
+    if (job != null && tipidObj != null) {
+      tip = job.getTaskInProgress(tipidObj);
+    }
+    TaskStatus[] ts = null;
+    if (tip != null) { 
+      ts = tip.getTaskStatuses();
+    }
     boolean isCleanupOrSetup = false;
-    if (tipidObj != null) { 
-      isCleanupOrSetup = job.getTaskInProgress(tipidObj).isCleanupTask();
+    if ( tip != null) {
+      isCleanupOrSetup = tip.isJobCleanupTask();
       if (!isCleanupOrSetup) {
-        isCleanupOrSetup = job.getTaskInProgress(tipidObj).isSetupTask();
+        isCleanupOrSetup = tip.isJobSetupTask();
       }
     }
 %>
@@ -115,14 +121,41 @@
       TaskTrackerStatus taskTracker = tracker.getTaskTracker(taskTrackerName);
       out.print("<tr><td>" + status.getTaskID() + "</td>");
       String taskAttemptTracker = null;
+      String cleanupTrackerName = null;
+      TaskTrackerStatus cleanupTracker = null;
+      String cleanupAttemptTracker = null;
+      boolean hasCleanupAttempt = false;
+      if (tip != null && tip.isCleanupAttempt(status.getTaskID())) {
+        cleanupTrackerName = tip.machineWhereCleanupRan(status.getTaskID());
+        cleanupTracker = tracker.getTaskTracker(cleanupTrackerName);
+        if (cleanupTracker != null) {
+          cleanupAttemptTracker = "http://" + cleanupTracker.getHost() + ":"
+            + cleanupTracker.getHttpPort();
+        }
+        hasCleanupAttempt = true;
+      }
+      out.print("<td>");
+      if (hasCleanupAttempt) {
+        out.print("Task attempt: ");
+      }
       if (taskTracker == null) {
-        out.print("<td>" + taskTrackerName + "</td>");
+        out.print(taskTrackerName);
       } else {
         taskAttemptTracker = "http://" + taskTracker.getHost() + ":"
           + taskTracker.getHttpPort();
-        out.print("<td><a href=\"" + taskAttemptTracker + "\">"
-          + tracker.getNode(taskTracker.getHost()) + "</a></td>");
+        out.print("<a href=\"" + taskAttemptTracker + "\">"
+          + tracker.getNode(taskTracker.getHost()) + "</a>");
+      }
+      if (hasCleanupAttempt) {
+        out.print("<br/>Cleanup Attempt: ");
+        if (cleanupAttemptTracker == null ) {
+          out.print(cleanupTrackerName);
+        } else {
+          out.print("<a href=\"" + cleanupAttemptTracker + "\">"
+            + tracker.getNode(cleanupTracker.getHost()) + "</a>");
         }
+      }
+      out.print("</td>");
         out.print("<td>" + status.getRunState() + "</td>");
         out.print("<td>" + StringUtils.formatPercent(status.getProgress(), 2)
           + ServletUtil.percentageGraph(status.getProgress() * 100f, 80) + "</td>");
@@ -162,6 +195,9 @@
         						String.valueOf(taskTracker.getHttpPort()),
         						status.getTaskID().toString());
       	}
+        if (hasCleanupAttempt) {
+          out.print("Task attempt: <br/>");
+        }
         if (taskLogUrl == null) {
           out.print("n/a");
         } else {
@@ -172,6 +208,25 @@
           out.print("<a href=\"" + tailEightKBUrl + "\">Last 8KB</a><br/>");
           out.print("<a href=\"" + entireLogUrl + "\">All</a><br/>");
         }
+        if (hasCleanupAttempt) {
+          out.print("Cleanup attempt: <br/>");
+          taskLogUrl = null;
+          if (cleanupTracker != null ) {
+        	taskLogUrl = TaskLogServlet.getTaskLogUrl(cleanupTracker.getHost(),
+                                String.valueOf(cleanupTracker.getHttpPort()),
+                                status.getTaskID().toString());
+      	  }
+          if (taskLogUrl == null) {
+            out.print("n/a");
+          } else {
+            String tailFourKBUrl = taskLogUrl + "&start=-4097&cleanup=true";
+            String tailEightKBUrl = taskLogUrl + "&start=-8193&cleanup=true";
+            String entireLogUrl = taskLogUrl + "&all=true&cleanup=true";
+            out.print("<a href=\"" + tailFourKBUrl + "\">Last 4KB</a><br/>");
+            out.print("<a href=\"" + tailEightKBUrl + "\">Last 8KB</a><br/>");
+            out.print("<a href=\"" + entireLogUrl + "\">All</a><br/>");
+          }
+        }
         out.print("</td><td>" + "<a href=\"/taskstats.jsp?jobid=" + jobid
           + "&tipid=" + tipid + "&taskid=" + status.getTaskID() + "\">"
           + ((status.getCounters() != null) ? status.getCounters().size() : 0) + "</a></td>");