فهرست منبع

HADOOP-4759. Removes temporary output directory for failed and killed tasks in the JobTracker's task commit thread. Contributed by Amareshwari Sriramadasu.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@741192 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 16 سال پیش
والد
کامیت
fd8876f327
26فایلهای تغییر یافته به همراه897 افزوده شده و 408 حذف شده
  1. 4 0
      CHANGES.txt
  2. 1 1
      src/mapred/mapred-default.xml
  3. 15 6
      src/mapred/org/apache/hadoop/mapred/Child.java
  4. 5 3
      src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java
  5. 2 1
      src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
  6. 3 3
      src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
  7. 4 2
      src/mapred/org/apache/hadoop/mapred/JobHistory.java
  8. 149 61
      src/mapred/org/apache/hadoop/mapred/JobInProgress.java
  9. 33 7
      src/mapred/org/apache/hadoop/mapred/JobTracker.java
  10. 15 28
      src/mapred/org/apache/hadoop/mapred/JvmManager.java
  11. 43 43
      src/mapred/org/apache/hadoop/mapred/MapOutputFile.java
  12. 26 16
      src/mapred/org/apache/hadoop/mapred/MapTask.java
  13. 15 9
      src/mapred/org/apache/hadoop/mapred/ReduceTask.java
  14. 112 54
      src/mapred/org/apache/hadoop/mapred/Task.java
  15. 85 40
      src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
  16. 41 9
      src/mapred/org/apache/hadoop/mapred/TaskLog.java
  17. 19 10
      src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java
  18. 14 11
      src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
  19. 10 7
      src/mapred/org/apache/hadoop/mapred/TaskRunner.java
  20. 35 3
      src/mapred/org/apache/hadoop/mapred/TaskStatus.java
  21. 191 79
      src/mapred/org/apache/hadoop/mapred/TaskTracker.java
  22. 4 2
      src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
  23. 2 1
      src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
  24. 4 2
      src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
  25. 2 2
      src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java
  26. 63 8
      src/webapps/job/taskdetails.jsp

+ 4 - 0
CHANGES.txt

@@ -759,6 +759,10 @@ Release 0.19.1 - Unreleased
     HADOOP-5034. NameNode should send both replication and deletion requests
     to DataNode in one reply to a heartbeat. (hairong)
 
+    HADOOP-4759. Removes temporary output directory for failed and killed
+    tasks in the JobTracker's task commit thread.
+    (Amareshwari Sriramadasu via ddas)
+
 Release 0.19.0 - 2008-11-18
 
   INCOMPATIBLE CHANGES

+ 1 - 1
src/mapred/mapred-default.xml

@@ -312,7 +312,7 @@
 </property>
 
 <property>
-  <name>mapred.tasktracker.sigkillthread.sleeptime-before-sigkill</name>
+  <name>mapred.tasktracker.tasks.sleeptime-before-sigkill</name>
   <value>5000</value>
   <description>The time, in milliseconds, the tasktracker waits for sending a
   SIGKILL to a process, after it has been sent a SIGTERM.

+ 15 - 6
src/mapred/org/apache/hadoop/mapred/Child.java

@@ -47,6 +47,7 @@ class Child {
     LogFactory.getLog(TaskTracker.class);
 
   static volatile TaskAttemptID taskid;
+  static volatile boolean isCleanup;
 
   public static void main(String[] args) throws Throwable {
     LOG.debug("Child starting");
@@ -75,7 +76,7 @@ class Child {
           try {
             Thread.sleep(5000);
             if (taskid != null) {
-              TaskLog.syncLogs(firstTaskid, taskid);
+              TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
             }
           } catch (InterruptedException ie) {
           } catch (IOException iee) {
@@ -94,6 +95,7 @@ class Child {
     Path srcPidPath = null;
     Path dstPidPath = null;
     int idleLoopCount = 0;
+    Task task = null;
     try {
       while (true) {
         JvmTask myTask = umbilical.getTask(jvmId);
@@ -113,20 +115,22 @@ class Child {
           }
         }
         idleLoopCount = 0;
-        Task task = myTask.getTask();
+        task = myTask.getTask();
         taskid = task.getTaskID();
+        isCleanup = task.isTaskCleanupTask();
         
         //create the index file so that the log files 
         //are viewable immediately
-        TaskLog.syncLogs(firstTaskid, taskid);
+        TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
         JobConf job = new JobConf(task.getJobFile());
         if (srcPidPath == null) {
-          srcPidPath = TaskTracker.getPidFilePath(firstTaskid, job);
+          // get the first task's path for the first time
+          srcPidPath = new Path(task.getPidFile());
         }
         //since the JVM is running multiple tasks potentially, we need
         //to do symlink stuff only for the subsequent tasks
         if (!taskid.equals(firstTaskid)) {
-          dstPidPath = new Path(srcPidPath.getParent(), taskid.toString());
+          dstPidPath = new Path(task.getPidFile());
           FileUtil.symLink(srcPidPath.toUri().getPath(), 
               dstPidPath.toUri().getPath());
         }
@@ -150,8 +154,9 @@ class Child {
         try {
           task.run(job, umbilical);             // run the task
         } finally {
-          TaskLog.syncLogs(firstTaskid, taskid);
+          TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
           if (!taskid.equals(firstTaskid)) {
+            // delete the pid-file's symlink
             new File(dstPidPath.toUri().getPath()).delete();
           }
         }
@@ -164,6 +169,10 @@ class Child {
       umbilical.fsError(taskid, e.getMessage());
     } catch (Throwable throwable) {
       LOG.warn("Error running child", throwable);
+      if (task != null) {
+        // do cleanup for the task
+        task.taskCleanup(umbilical);
+      }
       // Report back any failures, for diagnostic purposes
       ByteArrayOutputStream baos = new ByteArrayOutputStream();
       throwable.printStackTrace(new PrintStream(baos));

+ 5 - 3
src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java

@@ -132,9 +132,11 @@ public class FileOutputCommitter extends OutputCommitter {
   public void abortTask(TaskAttemptContext context) {
     Path taskOutputPath =  getTempTaskOutputPath(context);
     try {
-      FileSystem fs = taskOutputPath.getFileSystem(context.getJobConf());
-      context.getProgressible().progress();
-      fs.delete(taskOutputPath, true);
+      if (taskOutputPath != null) {
+        FileSystem fs = taskOutputPath.getFileSystem(context.getJobConf());
+        context.getProgressible().progress();
+        fs.delete(taskOutputPath, true);
+      }
     } catch (IOException ie) {
       LOG.warn("Error discarding output" + StringUtils.stringifyException(ie));
     }

+ 2 - 1
src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java

@@ -59,8 +59,9 @@ interface InterTrackerProtocol extends VersionedProtocol {
    *             in heartbeat method (HADOOP-4305) 
    * Version 23: Added parameter 'initialContact' again in heartbeat method
    *            (HADOOP-4869) 
+   * Version 24: Changed format of Task and TaskStatus for HADOOP-4759 
    */
-  public static final long versionID = 23L;
+  public static final long versionID = 24L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;

+ 3 - 3
src/mapred/org/apache/hadoop/mapred/IsolationRunner.java

@@ -180,9 +180,9 @@ public class IsolationRunner {
     FileSystem local = FileSystem.getLocal(conf);
     LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
     File workDirName = new File(lDirAlloc.getLocalPathToRead(
-                                  TaskTracker.getJobCacheSubdir() 
-                                  + Path.SEPARATOR + taskId.getJobID() 
-                                  + Path.SEPARATOR + taskId
+                                  TaskTracker.getLocalTaskDir(
+                                    taskId.getJobID().toString(), 
+                                    taskId.toString())
                                   + Path.SEPARATOR + "work",
                                   conf). toString());
     local.setWorkingDirectory(new Path(workDirName.toString()));

+ 4 - 2
src/mapred/org/apache/hadoop/mapred/JobHistory.java

@@ -1272,7 +1272,8 @@ public class JobHistory {
                                       taskAttemptId.getTaskID().toString(), 
                                       taskAttemptId.toString(), 
                                       String.valueOf(startTime), trackerName,
-                                      String.valueOf(httpPort)}); 
+                                      httpPort == -1 ? "" : 
+                                        String.valueOf(httpPort)}); 
         }
       }
     }
@@ -1468,7 +1469,8 @@ public class JobHistory {
                                       taskAttemptId.getTaskID().toString(), 
                                       taskAttemptId.toString(), 
                                       String.valueOf(startTime), trackerName,
-                                      String.valueOf(httpPort)}); 
+                                      httpPort == -1 ? "" : 
+                                        String.valueOf(httpPort)}); 
         }
       }
     }

+ 149 - 61
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -44,6 +44,7 @@ import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
+import org.apache.hadoop.util.StringUtils;
 
 /*************************************************************
  * JobInProgress maintains all the info for keeping
@@ -113,6 +114,12 @@ class JobInProgress {
 
   // A set of running reduce TIPs
   Set<TaskInProgress> runningReduces;
+  
+  // A list of cleanup tasks for the map task attempts, to be launched
+  List<TaskAttemptID> mapCleanupTasks = new LinkedList<TaskAttemptID>();
+  
+  // A list of cleanup tasks for the reduce task attempts, to be launched
+  List<TaskAttemptID> reduceCleanupTasks = new LinkedList<TaskAttemptID>();
 
   private final int maxLevel;
 
@@ -473,12 +480,12 @@ class JobInProgress {
     // Just assign splits[0]
     cleanup[0] = new TaskInProgress(jobId, jobFile, splits[0], 
             jobtracker, conf, this, numMapTasks);
-    cleanup[0].setCleanupTask();
+    cleanup[0].setJobCleanupTask();
 
     // cleanup reduce tip.
     cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                        numReduceTasks, jobtracker, conf, this);
-    cleanup[1].setCleanupTask();
+    cleanup[1].setJobCleanupTask();
 
     // create two setup tips, one map and one reduce.
     setup = new TaskInProgress[2];
@@ -486,12 +493,12 @@ class JobInProgress {
     // Just assign splits[0]
     setup[0] = new TaskInProgress(jobId, jobFile, splits[0], 
             jobtracker, conf, this, numMapTasks + 1 );
-    setup[0].setSetupTask();
+    setup[0].setJobSetupTask();
 
     // setup reduce tip.
     setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
                        numReduceTasks + 1, jobtracker, conf, this);
-    setup[1].setSetupTask();
+    setup[1].setJobSetupTask();
     
     synchronized(jobInitKillStatus){
       jobInitKillStatus.initDone = true;
@@ -743,11 +750,27 @@ class JobInProgress {
     if (wasComplete && (status.getRunState() == TaskStatus.State.SUCCEEDED)) {
       status.setRunState(TaskStatus.State.KILLED);
     }
+    
+    // If the job is complete and a task has just reported its 
+    // state as FAILED_UNCLEAN/KILLED_UNCLEAN, 
+    // make the task's state FAILED/KILLED without launching cleanup attempt.
+    // Note that if task is already a cleanup attempt, 
+    // we don't change the state to make sure the task gets a killTaskAction
+    if ((this.isComplete() || jobFailed || jobKilled) && 
+        !tip.isCleanupAttempt(taskid)) {
+      if (status.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
+        status.setRunState(TaskStatus.State.FAILED);
+      } else if (status.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
+        status.setRunState(TaskStatus.State.KILLED);
+      }
+    }
+    
     boolean change = tip.updateStatus(status);
     if (change) {
       TaskStatus.State state = status.getRunState();
+      // get the TaskTrackerStatus where the task ran 
       TaskTrackerStatus ttStatus = 
-        this.jobtracker.getTaskTracker(status.getTaskTracker());
+        this.jobtracker.getTaskTracker(tip.machineWhereTaskRan(taskid));
       String httpTaskLogLocation = null; 
 
       if (null != ttStatus){
@@ -768,8 +791,8 @@ class JobInProgress {
                                             taskid,
                                             tip.idWithinJob(),
                                             status.getIsMap() &&
-                                            !tip.isCleanupTask() &&
-                                            !tip.isSetupTask(),
+                                            !tip.isJobCleanupTask() &&
+                                            !tip.isJobSetupTask(),
                                             TaskCompletionEvent.Status.SUCCEEDED,
                                             httpTaskLogLocation 
                                            );
@@ -783,6 +806,15 @@ class JobInProgress {
           tip.doCommit(taskid);
         }
         return;
+      } else if (state == TaskStatus.State.FAILED_UNCLEAN ||
+                 state == TaskStatus.State.KILLED_UNCLEAN) {
+        tip.incompleteSubTask(taskid, this.status);
+        // add this task, to be rescheduled as cleanup attempt
+        if (tip.isMapTask()) {
+          mapCleanupTasks.add(taskid);
+        } else {
+          reduceCleanupTasks.add(taskid);
+        }
       }
       //For a failed task update the JT datastructures. 
       else if (state == TaskStatus.State.FAILED ||
@@ -813,8 +845,8 @@ class JobInProgress {
                                             taskid,
                                             tip.idWithinJob(),
                                             status.getIsMap() &&
-                                            !tip.isCleanupTask() &&
-                                            !tip.isSetupTask(),
+                                            !tip.isJobCleanupTask() &&
+                                            !tip.isJobSetupTask(),
                                             taskCompletionStatus, 
                                             httpTaskLogLocation
                                            );
@@ -843,7 +875,7 @@ class JobInProgress {
                  oldProgress + " to " + tip.getProgress());
     }
     
-    if (!tip.isCleanupTask() && !tip.isSetupTask()) {
+    if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
       double progressDelta = tip.getProgress() - oldProgress;
       if (tip.isMapTask()) {
         if (maps.length == 0) {
@@ -941,6 +973,40 @@ class JobInProgress {
     return result;
   }    
 
+  /*
+   * Return task cleanup attempt if any, to run on a given tracker
+   */
+  public synchronized Task obtainTaskCleanupTask(TaskTrackerStatus tts, 
+                                                 boolean isMapSlot)
+  throws IOException {
+    if (this.status.getRunState() != JobStatus.RUNNING || 
+        jobFailed || jobKilled) {
+      return null;
+    }
+    
+    String taskTracker = tts.getTrackerName();
+    if (!shouldRunOnTaskTracker(taskTracker)) {
+      return null;
+    }
+    TaskAttemptID taskid = null;
+    TaskInProgress tip = null;
+    if (isMapSlot) {
+      if (!mapCleanupTasks.isEmpty()) {
+        taskid = mapCleanupTasks.remove(0);
+        tip = maps[taskid.getTaskID().getId()];
+      }
+    } else {
+      if (!reduceCleanupTasks.isEmpty()) {
+        taskid = reduceCleanupTasks.remove(0);
+        tip = reduces[taskid.getTaskID().getId()];
+      }
+    }
+    if (tip != null) {
+      return tip.addRunningTask(taskid, taskTracker, true);
+    }
+    return null;
+  }
+  
   public synchronized Task obtainNewLocalMapTask(TaskTrackerStatus tts,
                                                      int clusterSize, 
                                                      int numUniqueHosts)
@@ -991,7 +1057,7 @@ class JobInProgress {
    * Return a CleanupTask, if appropriate, to run on the given tasktracker
    * 
    */
-  public Task obtainCleanupTask(TaskTrackerStatus tts, 
+  public Task obtainJobCleanupTask(TaskTrackerStatus tts, 
                                              int clusterSize, 
                                              int numUniqueHosts,
                                              boolean isMapSlot
@@ -1001,7 +1067,7 @@ class JobInProgress {
     }
     
     synchronized(this) {
-      if (!canLaunchCleanupTask()) {
+      if (!canLaunchJobCleanupTask()) {
         return null;
       }
       
@@ -1042,7 +1108,7 @@ class JobInProgress {
    * or all maps and reduces are complete
    * @return true/false
    */
-  private synchronized boolean canLaunchCleanupTask() {
+  private synchronized boolean canLaunchJobCleanupTask() {
     if (!tasksInited.get()) {
       return false;
     }
@@ -1073,7 +1139,7 @@ class JobInProgress {
    * Return a SetupTask, if appropriate, to run on the given tasktracker
    * 
    */
-  public Task obtainSetupTask(TaskTrackerStatus tts, 
+  public Task obtainJobSetupTask(TaskTrackerStatus tts, 
                                              int clusterSize, 
                                              int numUniqueHosts,
                                              boolean isMapSlot
@@ -1197,10 +1263,10 @@ class JobInProgress {
     String name;
     String splits = "";
     Enum counter = null;
-    if (tip.isSetupTask()) {
+    if (tip.isJobSetupTask()) {
       launchedSetup = true;
       name = Values.SETUP.name();
-    } else if (tip.isCleanupTask()) {
+    } else if (tip.isJobCleanupTask()) {
       launchedCleanup = true;
       name = Values.CLEANUP.name();
     } else if (tip.isMapTask()) {
@@ -1223,7 +1289,7 @@ class JobInProgress {
       JobHistory.Task.logStarted(tip.getTIPId(), name,
                                  tip.getExecStartTime(), splits);
     }
-    if (!tip.isSetupTask() && !tip.isCleanupTask()) {
+    if (!tip.isJobSetupTask() && !tip.isJobCleanupTask()) {
       jobCounters.incrCounter(counter, 1);
     }
     
@@ -1244,7 +1310,7 @@ class JobInProgress {
     //
     // So to simplify, increment the data locality counter whenever there is 
     // data locality.
-    if (tip.isMapTask() && !tip.isSetupTask() && !tip.isCleanupTask()) {
+    if (tip.isMapTask() && !tip.isJobSetupTask() && !tip.isJobCleanupTask()) {
       // increment the data locality counter for maps
       Node tracker = jobtracker.getNode(tts.getHost());
       int level = this.maxLevel;
@@ -1924,8 +1990,8 @@ class JobInProgress {
     TaskTrackerStatus ttStatus = 
       this.jobtracker.getTaskTracker(status.getTaskTracker());
     String trackerHostname = jobtracker.getNode(ttStatus.getHost()).toString();
-    String taskType = tip.isCleanupTask() ? Values.CLEANUP.name() :
-                      tip.isSetupTask() ? Values.SETUP.name() :
+    String taskType = tip.isJobCleanupTask() ? Values.CLEANUP.name() :
+                      tip.isJobSetupTask() ? Values.SETUP.name() :
                       tip.isMapTask() ? Values.MAP.name() : 
                       Values.REDUCE.name();
     if (status.getIsMap()){
@@ -1955,14 +2021,14 @@ class JobInProgress {
                                 status.getCounters()); 
         
     int newNumAttempts = tip.getActiveTasks().size();
-    if (tip.isSetupTask()) {
+    if (tip.isJobSetupTask()) {
       // setup task has finished. kill the extra setup tip
       killSetupTip(!tip.isMapTask());
       // Job can start running now.
       this.status.setSetupProgress(1.0f);
       this.status.setRunState(JobStatus.RUNNING);
       JobHistory.JobInfo.logStarted(profile.getJobID());
-    } else if (tip.isCleanupTask()) {
+    } else if (tip.isJobCleanupTask()) {
       // cleanup task has finished. Kill the extra cleanup tip
       if (tip.isMapTask()) {
         // kill the reduce tip
@@ -2097,6 +2163,8 @@ class JobInProgress {
         }
         jobKilled = true;
       }
+      // clear all unclean tasks
+      clearUncleanTasks();
       //
       // kill all TIPs.
       //
@@ -2111,7 +2179,22 @@ class JobInProgress {
       }
     }
   }
-  
+
+  private void clearUncleanTasks() {
+    TaskAttemptID taskid = null;
+    TaskInProgress tip = null;
+    while (!mapCleanupTasks.isEmpty()) {
+      taskid = mapCleanupTasks.remove(0);
+      tip = maps[taskid.getTaskID().getId()];
+      updateTaskStatus(tip, tip.getTaskStatus(taskid), null);
+    }
+    while (!reduceCleanupTasks.isEmpty()) {
+      taskid = reduceCleanupTasks.remove(0);
+      tip = reduces[taskid.getTaskID().getId()];
+      updateTaskStatus(tip, tip.getTaskStatus(taskid), null);
+    }
+  }
+
   /**
    * Kill the job and all its component tasks. This method is called from 
    * jobtracker and should return fast as it locks the jobtracker.
@@ -2162,16 +2245,16 @@ class JobInProgress {
     boolean wasFailed = tip.isFailed();
 
     // Mark the taskid as FAILED or KILLED
-    tip.incompleteSubTask(taskid, taskTrackerStatus, this.status);
+    tip.incompleteSubTask(taskid, this.status);
    
     boolean isRunning = tip.isRunning();
     boolean isComplete = tip.isComplete();
         
     //update running  count on task failure.
     if (wasRunning && !isRunning) {
-      if (tip.isCleanupTask()) {
+      if (tip.isJobCleanupTask()) {
         launchedCleanup = false;
-      } else if (tip.isSetupTask()) {
+      } else if (tip.isJobSetupTask()) {
         launchedSetup = false;
       } else if (tip.isMapTask()) {
         runningMapTasks -= 1;
@@ -2208,43 +2291,48 @@ class JobInProgress {
     }
         
     // update job history
-    String taskTrackerName = taskTrackerStatus.getHost();
-    long finishTime = status.getFinishTime();
-    String taskType = tip.isCleanupTask() ? Values.CLEANUP.name() :
-                      tip.isSetupTask() ? Values.SETUP.name() :
+    // get taskStatus from tip
+    TaskStatus taskStatus = tip.getTaskStatus(taskid);
+    String taskTrackerName = taskStatus.getTaskTracker();
+    String taskTrackerHostName = convertTrackerNameToHostName(taskTrackerName);
+    int taskTrackerPort = -1;
+    if (taskTrackerStatus != null) {
+      taskTrackerPort = taskTrackerStatus.getHttpPort();
+    }
+    long startTime = taskStatus.getStartTime();
+    long finishTime = taskStatus.getFinishTime();
+    List<String> taskDiagnosticInfo = tip.getDiagnosticInfo(taskid);
+    String diagInfo = taskDiagnosticInfo == null ? "" :
+      StringUtils.arrayToString(taskDiagnosticInfo.toArray(new String[0]));
+    String taskType = tip.isJobCleanupTask() ? Values.CLEANUP.name() :
+                      tip.isJobSetupTask() ? Values.SETUP.name() :
                       tip.isMapTask() ? Values.MAP.name() : 
                       Values.REDUCE.name();
-    if (status.getIsMap()) {
-      JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
-          status.getTaskTracker(), taskTrackerStatus.getHttpPort(), 
-          taskType);
-      if (status.getRunState() == TaskStatus.State.FAILED) {
-        JobHistory.MapAttempt.logFailed(status.getTaskID(), finishTime,
-                taskTrackerName, status.getDiagnosticInfo(), 
-                taskType);
+    if (taskStatus.getIsMap()) {
+      JobHistory.MapAttempt.logStarted(taskid, startTime, 
+        taskTrackerName, taskTrackerPort, taskType);
+      if (taskStatus.getRunState() == TaskStatus.State.FAILED) {
+        JobHistory.MapAttempt.logFailed(taskid, finishTime,
+          taskTrackerHostName, diagInfo, taskType);
       } else {
-        JobHistory.MapAttempt.logKilled(status.getTaskID(), finishTime,
-                taskTrackerName, status.getDiagnosticInfo(),
-                taskType);
+        JobHistory.MapAttempt.logKilled(taskid, finishTime,
+          taskTrackerHostName, diagInfo, taskType);
       }
     } else {
-      JobHistory.ReduceAttempt.logStarted(status.getTaskID(), status.getStartTime(), 
-          status.getTaskTracker(), taskTrackerStatus.getHttpPort(), 
-          taskType);
-      if (status.getRunState() == TaskStatus.State.FAILED) {
-        JobHistory.ReduceAttempt.logFailed(status.getTaskID(), finishTime,
-                taskTrackerName, status.getDiagnosticInfo(), 
-                taskType);
+      JobHistory.ReduceAttempt.logStarted(taskid, startTime, 
+        taskTrackerName, taskTrackerPort, taskType);
+      if (taskStatus.getRunState() == TaskStatus.State.FAILED) {
+        JobHistory.ReduceAttempt.logFailed(taskid, finishTime,
+          taskTrackerHostName, diagInfo, taskType);
       } else {
-        JobHistory.ReduceAttempt.logKilled(status.getTaskID(), finishTime,
-                taskTrackerName, status.getDiagnosticInfo(), 
-                taskType);
+        JobHistory.ReduceAttempt.logKilled(taskid, finishTime,
+          taskTrackerHostName, diagInfo, taskType);
       }
     }
         
     // After this, try to assign tasks with the one after this, so that
     // the failed task goes to the end of the list.
-    if (!tip.isCleanupTask() && !tip.isSetupTask()) {
+    if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
       if (tip.isMapTask()) {
         failedMapTasks++;
       } else {
@@ -2256,7 +2344,7 @@ class JobInProgress {
     // Note down that a task has failed on this tasktracker 
     //
     if (status.getRunState() == TaskStatus.State.FAILED) { 
-      addTrackerTaskFailure(taskTrackerStatus.getTrackerName());
+      addTrackerTaskFailure(taskTrackerName);
     }
         
     //
@@ -2274,7 +2362,7 @@ class JobInProgress {
       // Allow upto 'mapFailuresPercent' of map tasks to fail or
       // 'reduceFailuresPercent' of reduce tasks to fail
       //
-      boolean killJob = tip.isCleanupTask() || tip.isSetupTask() ? true :
+      boolean killJob = tip.isJobCleanupTask() || tip.isJobSetupTask() ? true :
                         tip.isMapTask() ? 
             ((++failedMapTIPs*100) > (mapFailuresPercent*numMapTasks)) :
             ((++failedReduceTIPs*100) > (reduceFailuresPercent*numReduceTasks));
@@ -2283,9 +2371,9 @@ class JobInProgress {
         LOG.info("Aborting job " + profile.getJobID());
         JobHistory.Task.logFailed(tip.getTIPId(), 
                                   taskType,  
-                                  status.getFinishTime(), 
-                                  status.getDiagnosticInfo());
-        if (tip.isCleanupTask()) {
+                                  finishTime, 
+                                  diagInfo);
+        if (tip.isJobCleanupTask()) {
           // kill the other tip
           if (tip.isMapTask()) {
             cleanup[1].kill();
@@ -2294,7 +2382,7 @@ class JobInProgress {
           }
           terminateJob(JobStatus.FAILED);
         } else {
-          if (tip.isSetupTask()) {
+          if (tip.isJobSetupTask()) {
             // kill the other tip
             killSetupTip(!tip.isMapTask());
           }
@@ -2305,7 +2393,7 @@ class JobInProgress {
       //
       // Update the counters
       //
-      if (!tip.isCleanupTask() && !tip.isSetupTask()) {
+      if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
         if (tip.isMapTask()) {
           jobCounters.incrCounter(Counter.NUM_FAILED_MAPS, 1);
         } else {
@@ -2344,8 +2432,8 @@ class JobInProgress {
     status.setFinishTime(System.currentTimeMillis());
     updateTaskStatus(tip, status, metrics);
     JobHistory.Task.logFailed(tip.getTIPId(), 
-                              tip.isCleanupTask() ? Values.CLEANUP.name() : 
-                              tip.isSetupTask() ? Values.SETUP.name() : 
+                              tip.isJobCleanupTask() ? Values.CLEANUP.name() : 
+                              tip.isJobSetupTask() ? Values.SETUP.name() : 
                               tip.isMapTask() ? Values.MAP.name() : Values.REDUCE.name(), 
                               tip.getExecFinishTime(), reason, taskid); 
   }

+ 33 - 7
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -1654,7 +1654,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   // and TaskInProgress
   ///////////////////////////////////////////////////////
   void createTaskEntry(TaskAttemptID taskid, String taskTracker, TaskInProgress tip) {
-    LOG.info("Adding task '" + taskid + "' to tip " + tip.getTIPId() + ", for tracker '" + taskTracker + "'");
+    LOG.info("Adding task " + 
+      (tip.isCleanupAttempt(taskid) ? "(cleanup)" : "") + 
+      "'"  + taskid + "' to tip " + 
+      tip.getTIPId() + ", for tracker '" + taskTracker + "'");
 
     // taskid --> tracker
     taskidToTrackerMap.put(taskid, taskTracker);
@@ -1736,6 +1739,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       for (TaskStatus taskStatus : tip.getTaskStatuses()) {
         if (taskStatus.getRunState() != TaskStatus.State.RUNNING && 
             taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+            taskStatus.getRunState() != TaskStatus.State.FAILED_UNCLEAN &&
+            taskStatus.getRunState() != TaskStatus.State.KILLED_UNCLEAN &&
             taskStatus.getRunState() != TaskStatus.State.UNASSIGNED) {
           markCompletedTaskAttempt(taskStatus.getTaskTracker(), 
                                    taskStatus.getTaskID());
@@ -1746,6 +1751,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       for (TaskStatus taskStatus : tip.getTaskStatuses()) {
         if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
             taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+            taskStatus.getRunState() != TaskStatus.State.FAILED_UNCLEAN &&
+            taskStatus.getRunState() != TaskStatus.State.KILLED_UNCLEAN &&
             taskStatus.getRunState() != TaskStatus.State.UNASSIGNED) {
           markCompletedTaskAttempt(taskStatus.getTaskTracker(), 
                                    taskStatus.getTaskID());
@@ -2559,7 +2566,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         for (Iterator<JobInProgress> it = jobs.values().iterator();
              it.hasNext();) {
           JobInProgress job = it.next();
-          t = job.obtainCleanupTask(taskTracker, numTaskTrackers,
+          t = job.obtainJobCleanupTask(taskTracker, numTaskTrackers,
                                     numUniqueHosts, true);
           if (t != null) {
             return Collections.singletonList(t);
@@ -2568,7 +2575,15 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         for (Iterator<JobInProgress> it = jobs.values().iterator();
              it.hasNext();) {
           JobInProgress job = it.next();
-          t = job.obtainSetupTask(taskTracker, numTaskTrackers,
+          t = job.obtainTaskCleanupTask(taskTracker, true);
+          if (t != null) {
+            return Collections.singletonList(t);
+          }
+        }
+        for (Iterator<JobInProgress> it = jobs.values().iterator();
+             it.hasNext();) {
+          JobInProgress job = it.next();
+          t = job.obtainJobSetupTask(taskTracker, numTaskTrackers,
                                   numUniqueHosts, true);
           if (t != null) {
             return Collections.singletonList(t);
@@ -2579,7 +2594,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         for (Iterator<JobInProgress> it = jobs.values().iterator();
              it.hasNext();) {
           JobInProgress job = it.next();
-          t = job.obtainCleanupTask(taskTracker, numTaskTrackers,
+          t = job.obtainJobCleanupTask(taskTracker, numTaskTrackers,
                                     numUniqueHosts, false);
           if (t != null) {
             return Collections.singletonList(t);
@@ -2588,7 +2603,15 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         for (Iterator<JobInProgress> it = jobs.values().iterator();
              it.hasNext();) {
           JobInProgress job = it.next();
-          t = job.obtainSetupTask(taskTracker, numTaskTrackers,
+          t = job.obtainTaskCleanupTask(taskTracker, false);
+          if (t != null) {
+            return Collections.singletonList(t);
+          }
+        }
+        for (Iterator<JobInProgress> it = jobs.values().iterator();
+             it.hasNext();) {
+          JobInProgress job = it.next();
+          t = job.obtainJobSetupTask(taskTracker, numTaskTrackers,
                                     numUniqueHosts, false);
           if (t != null) {
             return Collections.singletonList(t);
@@ -3137,7 +3160,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         // And completed maps with zero reducers of the job 
         // never need to be failed. 
         if (!tip.isComplete() || 
-            (tip.isMapTask() && !tip.isSetupTask() && 
+            (tip.isMapTask() && !tip.isJobSetupTask() && 
              job.desiredReduces() != 0)) {
           // if the job is done, we don't want to change anything
           if (job.getStatus().getRunState() == JobStatus.RUNNING ||
@@ -3146,7 +3169,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
                            (tip.isMapTask() ? 
                                TaskStatus.Phase.MAP : 
                                TaskStatus.Phase.REDUCE), 
-                           TaskStatus.State.KILLED, trackerName, myInstrumentation);
+                            tip.isRunningTask(taskId) ? 
+                              TaskStatus.State.KILLED_UNCLEAN : 
+                              TaskStatus.State.KILLED,
+                            trackerName, myInstrumentation);
             jobsWithFailures.add(job);
           }
         } else {

+ 15 - 28
src/mapred/org/apache/hadoop/mapred/JvmManager.java

@@ -34,9 +34,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.util.Shell.ShellCommandExecutor;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ProcessTree;
-import org.apache.hadoop.util.ProcfsBasedProcessTree;
 
 class JvmManager {
 
@@ -47,8 +45,6 @@ class JvmManager {
 
   JvmManagerForType reduceJvmManager;
   
-  TaskTracker tracker;
-
   public JvmEnv constructJvmEnv(List<String> setup, Vector<String>vargs,
       File stdout,File stderr,long logSize, File workDir, 
       Map<String,String> env, String pidFile, JobConf conf) {
@@ -57,10 +53,9 @@ class JvmManager {
   
   public JvmManager(TaskTracker tracker) {
     mapJvmManager = new JvmManagerForType(tracker.getMaxCurrentMapTasks(), 
-        true, tracker);
+        true);
     reduceJvmManager = new JvmManagerForType(tracker.getMaxCurrentReduceTasks(),
-        false, tracker);
-    this.tracker = tracker;
+        false);
   }
   
   public void stop() {
@@ -78,9 +73,9 @@ class JvmManager {
 
   public void launchJvm(TaskRunner t, JvmEnv env) {
     if (t.getTask().isMapTask()) {
-      mapJvmManager.reapJvm(t, tracker, env);
+      mapJvmManager.reapJvm(t, env);
     } else {
-      reduceJvmManager.reapJvm(t, tracker, env);
+      reduceJvmManager.reapJvm(t, env);
     }
   }
 
@@ -129,12 +124,10 @@ class JvmManager {
     boolean isMap;
     
     Random rand = new Random(System.currentTimeMillis());
-    TaskTracker tracker;
 
-    public JvmManagerForType(int maxJvms, boolean isMap, TaskTracker tracker) {
+    public JvmManagerForType(int maxJvms, boolean isMap) {
       this.maxJvms = maxJvms;
       this.isMap = isMap;
-      this.tracker = tracker;
     }
 
     synchronized public void setRunningTaskForJvm(JVMId jvmId, 
@@ -198,7 +191,7 @@ class JvmManager {
       jvmIdToRunner.remove(jvmId);
     }
     private synchronized void reapJvm( 
-        TaskRunner t, TaskTracker tracker, JvmEnv env) {
+        TaskRunner t, JvmEnv env) {
       if (t.getTaskInProgress().wasKilled()) {
         //the task was killed in-flight
         //no need to do the rest of the operations
@@ -255,7 +248,7 @@ class JvmManager {
           LOG.info("Killing JVM: " + runnerToKill.jvmId);
           runnerToKill.kill();
         }
-        spawnNewJvm(jobId, env, tracker, t);
+        spawnNewJvm(jobId, env, t);
         return;
       }
       //*MUST* never reach this
@@ -285,7 +278,7 @@ class JvmManager {
       return details.toString();
     }
 
-    private void spawnNewJvm(JobID jobId, JvmEnv env, TaskTracker tracker, 
+    private void spawnNewJvm(JobID jobId, JvmEnv env,  
         TaskRunner t) {
       JvmRunner jvmRunner = new JvmRunner(env,jobId);
       jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);
@@ -297,11 +290,6 @@ class JvmManager {
       //tasks. Doing it this way also keeps code simple.
       jvmRunner.setDaemon(true);
       jvmRunner.setName("JVM Runner " + jvmRunner.jvmId + " spawned.");
-      if (tracker.isTaskMemoryManagerEnabled()) {
-        tracker.getTaskMemoryManager().addTask(
-            TaskAttemptID.forName(env.conf.get("mapred.task.id")),
-            tracker.getVirtualMemoryForTask(env.conf));
-      }
       setRunningTaskForJvm(jvmRunner.jvmId, t);
       LOG.info(jvmRunner.getName());
       jvmRunner.start();
@@ -359,6 +347,7 @@ class JvmManager {
           LOG.info("JVM : " + jvmId +" exited. Number of tasks it ran: " + 
               numTasksRan);
           try {
+            // In case of jvm-reuse,
             //the task jvm cleans up the common workdir for every 
             //task at the beginning of each task in the task JVM.
             //For the last task, we do it here.
@@ -366,9 +355,6 @@ class JvmManager {
               FileUtil.fullyDelete(env.workDir);
             }
           } catch (IOException ie){}
-          // Remove the associated pid-file, if any
-          tracker.removePidFile(TaskAttemptID.forName(
-                   env.conf.get("mapred.task.id")));
         }
       }
 
@@ -380,18 +366,19 @@ class JvmManager {
         if (shexec != null) {
           Process process = shexec.getProcess();
           if (process != null) {
-            Path pidFilePath = TaskTracker.getPidFilePath(
-                        TaskAttemptID.forName(env.conf.get("mapred.task.id")),
-                        env.conf);
+            Path pidFilePath = new Path(env.pidFile);
             String pid = ProcessTree.getPidFromPidFile(
                                                     pidFilePath.toString());
 
             long sleeptimeBeforeSigkill = env.conf.getLong(
-                 "mapred.tasktracker.sigkillthread.sleeptime-before-sigkill",
+                 "mapred.tasktracker.tasks.sleeptime-before-sigkill",
                  ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
 
             ProcessTree.destroy(pid, sleeptimeBeforeSigkill,
-                     ProcessTree.isSetsidAvailable, true/*in the background*/);
+                     ProcessTree.isSetsidAvailable, false);
+            try {
+              LOG.info("Process exited with exit code:" + process.waitFor());
+            } catch (InterruptedException ie) {}
           }
         }
         removeJvm(jvmId);

+ 43 - 43
src/mapred/org/apache/hadoop/mapred/MapOutputFile.java

@@ -30,13 +30,13 @@ import org.apache.hadoop.fs.Path;
 class MapOutputFile {
 
   private JobConf conf;
-  private String jobDir;
+  private JobID jobId;
   
   MapOutputFile() {
   }
 
   MapOutputFile(JobID jobId) {
-    this.jobDir = TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + jobId;
+    this.jobId = jobId;
   }
 
   private LocalDirAllocator lDirAlloc = 
@@ -47,9 +47,9 @@ class MapOutputFile {
    */
   public Path getOutputFile(TaskAttemptID mapTaskId)
     throws IOException {
-    return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
-                                        mapTaskId + Path.SEPARATOR +
-                                        "output" + "/file.out", conf);
+    return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), mapTaskId.toString())
+                       + "/file.out", conf);
   }
 
   /** Create a local map output file name.
@@ -58,9 +58,9 @@ class MapOutputFile {
    */
   public Path getOutputFileForWrite(TaskAttemptID mapTaskId, long size)
     throws IOException {
-    return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
-                                          mapTaskId + Path.SEPARATOR +
-                                          "output" + "/file.out", size, conf);
+    return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), mapTaskId.toString())
+                       + "/file.out", size, conf);
   }
 
   /** Return the path to a local map output index file created earlier
@@ -68,9 +68,9 @@ class MapOutputFile {
    */
   public Path getOutputIndexFile(TaskAttemptID mapTaskId)
     throws IOException {
-    return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
-                                        mapTaskId + Path.SEPARATOR +
-                                        "output" + "/file.out.index", conf);
+    return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), mapTaskId.toString())
+                       + "/file.out.index", conf);
   }
 
   /** Create a local map output index file name.
@@ -79,10 +79,10 @@ class MapOutputFile {
    */
   public Path getOutputIndexFileForWrite(TaskAttemptID mapTaskId, long size)
     throws IOException {
-    return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
-                                          mapTaskId + Path.SEPARATOR +
-                                          "output" + "/file.out.index", 
-                                          size, conf);
+    return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), mapTaskId.toString())
+                       + "/file.out.index", 
+                       size, conf);
   }
 
   /** Return a local map spill file created earlier.
@@ -91,10 +91,10 @@ class MapOutputFile {
    */
   public Path getSpillFile(TaskAttemptID mapTaskId, int spillNumber)
     throws IOException {
-    return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
-                                        mapTaskId + Path.SEPARATOR +
-                                        "output" + "/spill" 
-                                        + spillNumber + ".out", conf);
+    return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), mapTaskId.toString())
+                       + "/spill" 
+                       + spillNumber + ".out", conf);
   }
 
   /** Create a local map spill file name.
@@ -104,10 +104,10 @@ class MapOutputFile {
    */
   public Path getSpillFileForWrite(TaskAttemptID mapTaskId, int spillNumber, 
          long size) throws IOException {
-    return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
-                                          mapTaskId + Path.SEPARATOR +
-                                          "output" + "/spill" + 
-                                          spillNumber + ".out", size, conf);
+    return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), mapTaskId.toString())
+                       + "/spill" + 
+                       spillNumber + ".out", size, conf);
   }
 
   /** Return a local map spill index file created earlier
@@ -116,10 +116,10 @@ class MapOutputFile {
    */
   public Path getSpillIndexFile(TaskAttemptID mapTaskId, int spillNumber)
     throws IOException {
-    return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
-                                        mapTaskId + Path.SEPARATOR +
-                                        "output" + "/spill" + 
-                                        spillNumber + ".out.index", conf);
+    return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), mapTaskId.toString())
+                       + "/spill" + 
+                       spillNumber + ".out.index", conf);
   }
 
   /** Create a local map spill index file name.
@@ -129,10 +129,10 @@ class MapOutputFile {
    */
   public Path getSpillIndexFileForWrite(TaskAttemptID mapTaskId, int spillNumber,
          long size) throws IOException {
-    return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
-                                          mapTaskId + Path.SEPARATOR +
-                                          "output" + "/spill" + spillNumber + 
-                                          ".out.index", size, conf);
+    return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), mapTaskId.toString())
+                       + "/spill" + spillNumber + 
+                       ".out.index", size, conf);
   }
 
   /** Return a local reduce input file created earlier
@@ -142,10 +142,10 @@ class MapOutputFile {
   public Path getInputFile(int mapId, TaskAttemptID reduceTaskId)
     throws IOException {
     // TODO *oom* should use a format here
-    return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
-                                        reduceTaskId + Path.SEPARATOR + 
-                                        "output" + "/map_" + mapId + ".out",
-                                        conf);
+    return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), reduceTaskId.toString())
+                       + "/map_" + mapId + ".out",
+                       conf);
   }
 
   /** Create a local reduce input file name.
@@ -157,17 +157,17 @@ class MapOutputFile {
                                    long size)
     throws IOException {
     // TODO *oom* should use a format here
-    return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
-                                          reduceTaskId + Path.SEPARATOR +
-                                          ("output" + "/map_" + mapId.getId() + 
-                                           ".out"), 
-                                          size, conf);
+    return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
+                       jobId.toString(), reduceTaskId.toString())
+                       + "/map_" + mapId.getId() + ".out", 
+                       size, conf);
   }
 
   /** Removes all of the files related to a task. */
   public void removeAll(TaskAttemptID taskId) throws IOException {
-    conf.deleteLocalFiles(jobDir + Path.SEPARATOR +
-                          taskId + Path.SEPARATOR + "output");
+    conf.deleteLocalFiles(TaskTracker.getIntermediateOutputDir(
+                          jobId.toString(), taskId.toString())
+);
   }
 
   public void setConf(Configuration conf) {
@@ -179,7 +179,7 @@ class MapOutputFile {
   }
   
   public void setJobId(JobID jobId) {
-    this.jobDir = TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + jobId;
+    this.jobId = jobId;
   }
 
 }

+ 26 - 16
src/mapred/org/apache/hadoop/mapred/MapTask.java

@@ -103,13 +103,15 @@ class MapTask extends Task {
   @Override
   public void localizeConfiguration(JobConf conf) throws IOException {
     super.localizeConfiguration(conf);
-    Path localSplit = new Path(new Path(getJobFile()).getParent(), 
-                               "split.dta");
-    LOG.debug("Writing local split to " + localSplit);
-    DataOutputStream out = FileSystem.getLocal(conf).create(localSplit);
-    Text.writeString(out, splitClass);
-    split.write(out);
-    out.close();
+    if (isMapOrReduce()) {
+      Path localSplit = new Path(new Path(getJobFile()).getParent(), 
+                                 "split.dta");
+      LOG.debug("Writing local split to " + localSplit);
+      DataOutputStream out = FileSystem.getLocal(conf).create(localSplit);
+      Text.writeString(out, splitClass);
+      split.write(out);
+      out.close();
+    }
   }
   
   @Override
@@ -121,16 +123,20 @@ class MapTask extends Task {
   @Override
   public void write(DataOutput out) throws IOException {
     super.write(out);
-    Text.writeString(out, splitClass);
-    split.write(out);
-    split = null;
+    if (isMapOrReduce()) {
+      Text.writeString(out, splitClass);
+      split.write(out);
+      split = null;
+    }
   }
   
   @Override
   public void readFields(DataInput in) throws IOException {
     super.readFields(in);
-    splitClass = Text.readString(in);
-    split.readFields(in);
+    if (isMapOrReduce()) {
+      splitClass = Text.readString(in);
+      split.readFields(in);
+    }
   }
 
   /**
@@ -280,12 +286,16 @@ class MapTask extends Task {
     initialize(job, getJobID(), reporter, useNewApi);
 
     // check if it is a cleanupJobTask
-    if (cleanupJob) {
-      runCleanup(umbilical, reporter);
+    if (jobCleanup) {
+      runJobCleanupTask(umbilical, reporter);
+      return;
+    }
+    if (jobSetup) {
+      runJobSetupTask(umbilical, reporter);
       return;
     }
-    if (setupJob) {
-      runSetupJob(umbilical, reporter);
+    if (taskCleanup) {
+      runTaskCleanupTask(umbilical, reporter);
       return;
     }
 

+ 15 - 9
src/mapred/org/apache/hadoop/mapred/ReduceTask.java

@@ -343,7 +343,7 @@ class ReduceTask extends Task {
     throws IOException, InterruptedException, ClassNotFoundException {
     job.setBoolean("mapred.skip.on", isSkipping());
 
-    if (!cleanupJob && !setupJob) {
+    if (isMapOrReduce()) {
       copyPhase = getProgress().addPhase("copy");
       sortPhase  = getProgress().addPhase("sort");
       reducePhase = getProgress().addPhase("reduce");
@@ -355,12 +355,16 @@ class ReduceTask extends Task {
     initialize(job, getJobID(), reporter, useNewApi);
 
     // check if it is a cleanupJobTask
-    if (cleanupJob) {
-      runCleanup(umbilical, reporter);
+    if (jobCleanup) {
+      runJobCleanupTask(umbilical, reporter);
       return;
     }
-    if (setupJob) {
-      runSetupJob(umbilical, reporter);
+    if (jobSetup) {
+      runJobSetupTask(umbilical, reporter);
+      return;
+    }
+    if (taskCleanup) {
+      runTaskCleanupTask(umbilical, reporter);
       return;
     }
     
@@ -383,6 +387,7 @@ class ReduceTask extends Task {
     }
     copyPhase.complete();                         // copy is already complete
     setPhase(TaskStatus.Phase.SORT);
+    statusUpdate(umbilical);
 
     final FileSystem rfs = FileSystem.getLocal(job).getRaw();
     RawKeyValueIterator rIter = isLocal
@@ -398,6 +403,7 @@ class ReduceTask extends Task {
     
     sortPhase.complete();                         // sort is complete
     setPhase(TaskStatus.Phase.REDUCE); 
+    statusUpdate(umbilical);
     Class keyClass = job.getMapOutputKeyClass();
     Class valueClass = job.getMapOutputValueClass();
     RawComparator comparator = job.getOutputValueGroupingComparator();
@@ -1267,10 +1273,10 @@ class ReduceTask extends Task {
         // else, we will check the localFS to find a suitable final location
         // for this path
         TaskAttemptID reduceId = reduceTask.getTaskID();
-        Path filename = new Path("/" + TaskTracker.getJobCacheSubdir() +
-                                 Path.SEPARATOR + getTaskID().getJobID() +
-                                 Path.SEPARATOR + reduceId +
-                                 Path.SEPARATOR + "output" + "/map_" +
+        Path filename = new Path("/" + TaskTracker.getIntermediateOutputDir(
+                                 reduceId.getJobID().toString(),
+                                 reduceId.toString()) 
+                                 + "/map_" +
                                  loc.getTaskId().getId() + ".out");
         
         // Copy the map output to a temp file whose name is unique to this attempt 

+ 112 - 54
src/mapred/org/apache/hadoop/mapred/Task.java

@@ -110,8 +110,9 @@ abstract class Task implements Writable, Configurable {
   private TaskAttemptID taskId;                   // unique, includes job id
   private int partition;                          // id within job
   TaskStatus taskStatus;                          // current status of the task
-  protected boolean cleanupJob = false;
-  protected boolean setupJob = false;
+  protected boolean jobCleanup = false;
+  protected boolean jobSetup = false;
+  protected boolean taskCleanup = false;
   
   //skip ranges based on failed ranges from previous attempts
   private SortedRanges skipRanges = new SortedRanges();
@@ -131,8 +132,8 @@ abstract class Task implements Writable, Configurable {
   protected TaskAttemptContext taskContext;
   protected org.apache.hadoop.mapreduce.OutputFormat<?,?> outputFormat;
   protected org.apache.hadoop.mapreduce.OutputCommitter committer;
-  private volatile boolean commitPending = false;
   protected final Counters.Counter spilledRecordsCounter;
+  private String pidFile = "";
 
   ////////////////////////////////////////////
   // Constructors
@@ -168,6 +169,12 @@ abstract class Task implements Writable, Configurable {
   public String getJobFile() { return jobFile; }
   public TaskAttemptID getTaskID() { return taskId; }
   Counters getCounters() { return counters; }
+  public void setPidFile(String pidFile) { 
+    this.pidFile = pidFile; 
+  }
+  public String getPidFile() { 
+    return pidFile; 
+  }
   
   /**
    * Get the job name for this task.
@@ -244,15 +251,50 @@ abstract class Task implements Writable, Configurable {
   }
 
   /**
-   * Sets whether the task is cleanup task
+   * Return current state of the task. 
+   * needs to be synchronized as communication thread 
+   * sends the state every second
+   * @return
+   */
+  synchronized TaskStatus.State getState(){
+    return this.taskStatus.getRunState(); 
+  }
+  /**
+   * Set current state of the task. 
+   * @param state
    */
-  public void setCleanupTask() {
-    cleanupJob = true;
+  synchronized void setState(TaskStatus.State state){
+    this.taskStatus.setRunState(state); 
   }
 
-  public void setSetupTask() {
-    setupJob = true; 
+  void setTaskCleanupTask() {
+    taskCleanup = true;
   }
+	   
+  boolean isTaskCleanupTask() {
+    return taskCleanup;
+  }
+
+  boolean isJobCleanupTask() {
+    return jobCleanup;
+  }
+
+  boolean isJobSetupTask() {
+    return jobSetup;
+  }
+
+  void setJobSetupTask() {
+    jobSetup = true; 
+  }
+
+  void setJobCleanupTask() {
+    jobCleanup = true; 
+  }
+
+  boolean isMapOrReduce() {
+    return !jobSetup && !jobCleanup && !taskCleanup;
+  }
+  
   ////////////////////////////////////////////
   // Writable methods
   ////////////////////////////////////////////
@@ -264,10 +306,13 @@ abstract class Task implements Writable, Configurable {
     taskStatus.write(out);
     skipRanges.write(out);
     out.writeBoolean(skipping);
-    out.writeBoolean(cleanupJob);
-    out.writeBoolean(setupJob);
+    out.writeBoolean(jobCleanup);
+    out.writeBoolean(jobSetup);
     out.writeBoolean(writeSkipRecs);
+    out.writeBoolean(taskCleanup);  
+    Text.writeString(out, pidFile);
   }
+  
   public void readFields(DataInput in) throws IOException {
     jobFile = Text.readString(in);
     taskId = TaskAttemptID.read(in);
@@ -278,9 +323,14 @@ abstract class Task implements Writable, Configurable {
     currentRecIndexIterator = skipRanges.skipRangeIterator();
     currentRecStartIndex = currentRecIndexIterator.next();
     skipping = in.readBoolean();
-    cleanupJob = in.readBoolean();
-    setupJob = in.readBoolean();
+    jobCleanup = in.readBoolean();
+    jobSetup = in.readBoolean();
     writeSkipRecs = in.readBoolean();
+    taskCleanup = in.readBoolean();
+    if (taskCleanup) {
+      setPhase(TaskStatus.Phase.CLEANUP);
+    }
+    pidFile = Text.readString(in);
   }
 
   @Override
@@ -332,6 +382,9 @@ abstract class Task implements Writable, Configurable {
                                                    InterruptedException {
     jobContext = new JobContext(job, id, reporter);
     taskContext = new TaskAttemptContext(job, taskId, reporter);
+    if (getState() == TaskStatus.State.UNASSIGNED) {
+      setState(TaskStatus.State.RUNNING);
+    }
     if (useNewApi) {
       LOG.debug("using new api for output committer");
       outputFormat =
@@ -461,17 +514,10 @@ abstract class Task implements Writable, Configurable {
           if (sendProgress) {
             // we need to send progress update
             updateCounters();
-            if (commitPending) {
-              taskStatus.statusUpdate(TaskStatus.State.COMMIT_PENDING,
-                                      taskProgress.get(),
-                                      taskProgress.toString(), 
-                                      counters);
-            } else {
-              taskStatus.statusUpdate(TaskStatus.State.RUNNING,
-                                      taskProgress.get(),
-                                      taskProgress.toString(), 
-                                      counters);
-            }
+            taskStatus.statusUpdate(getState(),
+                                    taskProgress.get(),
+                                    taskProgress.toString(), 
+                                    counters);
             taskFound = umbilical.statusUpdate(taskId, taskStatus);
             taskStatus.clearStatus();
           }
@@ -604,8 +650,7 @@ abstract class Task implements Writable, Configurable {
     boolean commitRequired = committer.needsTaskCommit(taskContext);
     if (commitRequired) {
       int retries = MAX_RETRIES;
-      taskStatus.setRunState(TaskStatus.State.COMMIT_PENDING);
-      commitPending = true;
+      setState(TaskStatus.State.COMMIT_PENDING);
       // say the task tracker that task is commit pending
       while (true) {
         try {
@@ -631,37 +676,21 @@ abstract class Task implements Writable, Configurable {
     sendDone(umbilical);
   }
 
-  private void sendLastUpdate(TaskUmbilicalProtocol umbilical) 
+  protected void statusUpdate(TaskUmbilicalProtocol umbilical) 
   throws IOException {
-    //first wait for the COMMIT approval from the tasktracker
     int retries = MAX_RETRIES;
     while (true) {
       try {
-        // send a final status report
-        if (commitPending) {
-          taskStatus.statusUpdate(TaskStatus.State.COMMIT_PENDING,
-                                  taskProgress.get(),
-                                  taskProgress.toString(), 
-                                  counters);
-        } else {
-          taskStatus.statusUpdate(TaskStatus.State.RUNNING,
-                                  taskProgress.get(),
-                                  taskProgress.toString(), 
-                                  counters);
-        }
-
-        try {
-          if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
-            LOG.warn("Parent died.  Exiting "+taskId);
-            System.exit(66);
-          }
-          taskStatus.clearStatus();
-          return;
-        } catch (InterruptedException ie) {
-          Thread.currentThread().interrupt(); // interrupt ourself
+        if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
+          LOG.warn("Parent died.  Exiting "+taskId);
+          System.exit(66);
         }
+        taskStatus.clearStatus();
+        return;
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt(); // interrupt ourself
       } catch (IOException ie) {
-        LOG.warn("Failure sending last status update: " + 
+        LOG.warn("Failure sending status update: " + 
                   StringUtils.stringifyException(ie));
         if (--retries == 0) {
           throw ie;
@@ -669,6 +698,16 @@ abstract class Task implements Writable, Configurable {
       }
     }
   }
+  
+  private void sendLastUpdate(TaskUmbilicalProtocol umbilical) 
+  throws IOException {
+    // send a final status report
+    taskStatus.statusUpdate(getState(),
+                            taskProgress.get(),
+                            taskProgress.toString(), 
+                            counters);
+    statusUpdate(umbilical);
+  }
 
   private void sendDone(TaskUmbilicalProtocol umbilical) throws IOException {
     int retries = MAX_RETRIES;
@@ -735,18 +774,37 @@ abstract class Task implements Writable, Configurable {
     }
   }
 
-  protected void runCleanup(TaskUmbilicalProtocol umbilical,
-                            TaskReporter reporter
-                            ) throws IOException, InterruptedException {
+  protected void runTaskCleanupTask(TaskUmbilicalProtocol umbilical,
+                                TaskReporter reporter) 
+  throws IOException, InterruptedException {
+    taskCleanup(umbilical);
+    done(umbilical, reporter);
+  }
+
+  void taskCleanup(TaskUmbilicalProtocol umbilical) 
+  throws IOException {
+    // set phase for this task
+    setPhase(TaskStatus.Phase.CLEANUP);
+    getProgress().setStatus("cleanup");
+    statusUpdate(umbilical);
+    LOG.info("Runnning cleanup for the task");
+    // do the cleanup
+    discardOutput(taskContext);
+  }
+
+  protected void runJobCleanupTask(TaskUmbilicalProtocol umbilical,
+                               TaskReporter reporter
+                              ) throws IOException, InterruptedException {
     // set phase for this task
     setPhase(TaskStatus.Phase.CLEANUP);
     getProgress().setStatus("cleanup");
+    statusUpdate(umbilical);
     // do the cleanup
     committer.cleanupJob(jobContext);
     done(umbilical, reporter);
   }
 
-  protected void runSetupJob(TaskUmbilicalProtocol umbilical,
+  protected void runJobSetupTask(TaskUmbilicalProtocol umbilical,
                              TaskReporter reporter
                              ) throws IOException, InterruptedException {
     // do the setup

+ 85 - 40
src/mapred/org/apache/hadoop/mapred/TaskInProgress.java

@@ -82,8 +82,8 @@ class TaskInProgress {
   private long maxSkipRecords = 0;
   private FailedRanges failedRanges = new FailedRanges();
   private volatile boolean skipping = false;
-  private boolean cleanup = false; 
-  private boolean setup = false;
+  private boolean jobCleanup = false; 
+  private boolean jobSetup = false;
    
   // The 'next' usable taskid of this tip
   int nextTaskId = 0;
@@ -103,8 +103,10 @@ class TaskInProgress {
   private TreeMap<TaskAttemptID,TaskStatus> taskStatuses = 
     new TreeMap<TaskAttemptID,TaskStatus>();
 
-  // Map from taskId -> Task
-  private Map<TaskAttemptID, Task> tasks = new TreeMap<TaskAttemptID, Task>();
+  // Map from taskId -> TaskTracker Id, 
+  // contains cleanup attempts and where they ran, if any
+  private TreeMap<TaskAttemptID, String> cleanupTasks =
+    new TreeMap<TaskAttemptID, String>();
 
   private TreeSet<String> machinesWhereFailed = new TreeSet<String>();
   private TreeSet<TaskAttemptID> tasksReportedClosed = new TreeSet<TaskAttemptID>();
@@ -174,20 +176,20 @@ class TaskInProgress {
     return partition;
   }    
 
-  public boolean isCleanupTask() {
-   return cleanup;
+  public boolean isJobCleanupTask() {
+   return jobCleanup;
   }
   
-  public void setCleanupTask() {
-    cleanup = true;
+  public void setJobCleanupTask() {
+    jobCleanup = true;
   }
 
-  public boolean isSetupTask() {
-    return setup;
+  public boolean isJobSetupTask() {
+    return jobSetup;
   }
 	  
-  public void setSetupTask() {
-    setup = true;
+  public void setJobSetupTask() {
+    jobSetup = true;
   }
 
   public boolean isOnlyCommitPending() {
@@ -274,15 +276,6 @@ class TaskInProgress {
     return rawSplit != null;
   }
     
-  /**
-   * Return the Task object associated with a taskId
-   * @param taskId
-   * @return
-   */  
-  public Task getTask(TaskAttemptID taskId) {
-    return tasks.get(taskId);
-  }
-
   /**
    * Is the Task associated with taskid is the first attempt of the tip? 
    * @param taskId
@@ -392,7 +385,8 @@ class TaskInProgress {
       tasksReportedClosed.add(taskid);
       close = true;
     } else if (isComplete() && 
-               !(isMapTask() && !setup && !cleanup && isComplete(taskid)) &&
+               !(isMapTask() && !jobSetup && 
+                   !jobCleanup && isComplete(taskid)) &&
                !tasksReportedClosed.contains(taskid)) {
       tasksReportedClosed.add(taskid);
       close = true; 
@@ -516,6 +510,8 @@ class TaskInProgress {
       // @see {@link TaskTracker.transmitHeartbeat()}
       if ((newState != TaskStatus.State.RUNNING && 
            newState != TaskStatus.State.COMMIT_PENDING && 
+           newState != TaskStatus.State.FAILED_UNCLEAN && 
+           newState != TaskStatus.State.KILLED_UNCLEAN && 
            newState != TaskStatus.State.UNASSIGNED) && 
           (oldState == newState)) {
         LOG.warn("Recieved duplicate status update of '" + newState + 
@@ -531,6 +527,8 @@ class TaskInProgress {
           newState == TaskStatus.State.UNASSIGNED) &&
           (oldState == TaskStatus.State.FAILED || 
            oldState == TaskStatus.State.KILLED || 
+           oldState == TaskStatus.State.FAILED_UNCLEAN || 
+           oldState == TaskStatus.State.KILLED_UNCLEAN || 
            oldState == TaskStatus.State.SUCCEEDED ||
            oldState == TaskStatus.State.COMMIT_PENDING)) {
         return false;
@@ -538,8 +536,17 @@ class TaskInProgress {
           
       changed = oldState != newState;
     }
-        
-    taskStatuses.put(taskid, status);
+    // if task is a cleanup attempt, do not replace the complete status,
+    // update only specific fields.
+    // For example, startTime should not be updated, 
+    // but finishTime has to be updated.
+    if (!isCleanupAttempt(taskid)) {
+      taskStatuses.put(taskid, status);
+    } else {
+      taskStatuses.get(taskid).statusUpdate(status.getRunState(),
+        status.getProgress(), status.getStateString(), status.getPhase(),
+        status.getFinishTime());
+    }
 
     // Recompute progress
     recomputeProgress();
@@ -551,29 +558,38 @@ class TaskInProgress {
    * has failed.
    */
   public void incompleteSubTask(TaskAttemptID taskid, 
-                                TaskTrackerStatus ttStatus,
                                 JobStatus jobStatus) {
     //
     // Note the failure and its location
     //
-    String trackerName = ttStatus.getTrackerName();
-    String trackerHostName = ttStatus.getHost();
-     
     TaskStatus status = taskStatuses.get(taskid);
+    String trackerName;
+    String trackerHostName = null;
     TaskStatus.State taskState = TaskStatus.State.FAILED;
     if (status != null) {
+      trackerName = status.getTaskTracker();
+      trackerHostName = 
+        JobInProgress.convertTrackerNameToHostName(trackerName);
       // Check if the user manually KILLED/FAILED this task-attempt...
       Boolean shouldFail = tasksToKill.remove(taskid);
       if (shouldFail != null) {
-        taskState = (shouldFail) ? TaskStatus.State.FAILED :
-                                   TaskStatus.State.KILLED;
+        if (isCleanupAttempt(taskid)) {
+          taskState = (shouldFail) ? TaskStatus.State.FAILED :
+                                     TaskStatus.State.KILLED;
+        } else {
+          taskState = (shouldFail) ? TaskStatus.State.FAILED_UNCLEAN :
+                                     TaskStatus.State.KILLED_UNCLEAN;
+          
+        }
         status.setRunState(taskState);
         addDiagnosticInfo(taskid, "Task has been " + taskState + " by the user" );
       }
  
       taskState = status.getRunState();
       if (taskState != TaskStatus.State.FAILED && 
-              taskState != TaskStatus.State.KILLED) {
+          taskState != TaskStatus.State.KILLED &&
+          taskState != TaskStatus.State.FAILED_UNCLEAN &&
+          taskState != TaskStatus.State.KILLED_UNCLEAN) {
         LOG.info("Task '" + taskid + "' running on '" + trackerName + 
                 "' in state: '" + taskState + "' being failed!");
         status.setRunState(TaskStatus.State.FAILED);
@@ -594,7 +610,7 @@ class TaskInProgress {
     // should note this failure only for completed maps, only if this taskid;
     // completed this map. however if the job is done, there is no need to 
     // manipulate completed maps
-    if (this.isMapTask() && !setup && !cleanup && isComplete(taskid) && 
+    if (this.isMapTask() && !jobSetup && !jobCleanup && isComplete(taskid) && 
         jobStatus.getRunState() != JobStatus.SUCCEEDED) {
       this.completes--;
       
@@ -614,7 +630,7 @@ class TaskInProgress {
           skipping = startSkipping();
         }
 
-      } else {
+      } else if (taskState == TaskStatus.State.KILLED) {
         numKilledTasks++;
       }
     }
@@ -741,6 +757,7 @@ class TaskInProgress {
     TaskStatus st = taskStatuses.get(taskId);
     if(st != null && (st.getRunState() == TaskStatus.State.RUNNING
         || st.getRunState() == TaskStatus.State.COMMIT_PENDING ||
+        st.inTaskCleanupPhase() ||
         st.getRunState() == TaskStatus.State.UNASSIGNED)
         && tasksToKill.put(taskId, shouldFail) == null ) {
       String logStr = "Request received to " + (shouldFail ? "fail" : "kill") 
@@ -865,11 +882,17 @@ class TaskInProgress {
     return addRunningTask(taskid, taskTracker);
   }
   
+  public Task addRunningTask(TaskAttemptID taskid, String taskTracker) {
+    return addRunningTask(taskid, taskTracker, false);
+  }
+  
   /**
    * Adds a previously running task to this tip. This is used in case of 
    * jobtracker restarts.
    */
-  public Task addRunningTask(TaskAttemptID taskid, String taskTracker) {
+  public Task addRunningTask(TaskAttemptID taskid, 
+                             String taskTracker,
+                             boolean taskCleanup) {
     // create the task
     Task t = null;
     if (isMapTask()) {
@@ -880,11 +903,17 @@ class TaskInProgress {
     } else {
       t = new ReduceTask(jobFile, taskid, partition, numMaps);
     }
-    if (cleanup) {
-      t.setCleanupTask();
+    if (jobCleanup) {
+      t.setJobCleanupTask();
+    }
+    if (jobSetup) {
+      t.setJobSetupTask();
     }
-    if (setup) {
-      t.setSetupTask();
+    if (taskCleanup) {
+      t.setTaskCleanupTask();
+      t.setState(taskStatuses.get(taskid).getRunState());
+      cleanupTasks.put(taskid, taskTracker);
+      jobtracker.removeTaskEntry(taskid);
     }
     t.setConf(conf);
     LOG.debug("Launching task with skipRanges:"+failedRanges.getSkipRanges());
@@ -893,7 +922,6 @@ class TaskInProgress {
     if(failedRanges.isTestAttempt()) {
       t.setWriteSkipRecs(false);
     }
-    tasks.put(taskid, t);
 
     activeTasks.put(taskid, taskTracker);
 
@@ -901,6 +929,23 @@ class TaskInProgress {
     jobtracker.createTaskEntry(taskid, taskTracker, this);
     return t;
   }
+
+  boolean isRunningTask(TaskAttemptID taskid) {
+    TaskStatus status = taskStatuses.get(taskid);
+    return status != null && status.getRunState() == TaskStatus.State.RUNNING;
+  }
+  
+  boolean isCleanupAttempt(TaskAttemptID taskid) {
+    return cleanupTasks.containsKey(taskid);
+  }
+  
+  String machineWhereCleanupRan(TaskAttemptID taskid) {
+    return cleanupTasks.get(taskid);
+  }
+  
+  String machineWhereTaskRan(TaskAttemptID taskid) {
+    return taskStatuses.get(taskid).getTaskTracker();
+  }
     
   /**
    * Has this task already failed on this machine?
@@ -987,7 +1032,7 @@ class TaskInProgress {
   }
 
   public long getMapInputSize() {
-    if(isMapTask() && !setup && !cleanup) {
+    if(isMapTask() && !jobSetup && !jobCleanup) {
       return rawSplit.getDataLength();
     } else {
       return 0;

+ 41 - 9
src/mapred/org/apache/hadoop/mapred/TaskLog.java

@@ -81,7 +81,14 @@ public class TaskLog {
   
   private static LogFileDetail getTaskLogFileDetail(TaskAttemptID taskid,
       LogName filter) throws IOException {
-    File indexFile = new File(getBaseDir(taskid.toString()), "log.index");
+    return getLogFileDetail(taskid, filter, false);
+  }
+  
+  private static LogFileDetail getLogFileDetail(TaskAttemptID taskid, 
+                                                LogName filter,
+                                                boolean isCleanup) 
+  throws IOException {
+    File indexFile = getIndexFile(taskid.toString(), isCleanup);
     BufferedReader fis = new BufferedReader(new java.io.FileReader(indexFile));
     //the format of the index file is
     //LOG_DIR: <the dir where the task logs are really stored>
@@ -121,8 +128,17 @@ public class TaskLog {
   }
   
   public static File getIndexFile(String taskid) {
-    return new File(getBaseDir(taskid), "log.index");
+    return getIndexFile(taskid, false);
+  }
+  
+  public static File getIndexFile(String taskid, boolean isCleanup) {
+    if (isCleanup) {
+      return new File(getBaseDir(taskid), "log.index.cleanup");
+    } else {
+      return new File(getBaseDir(taskid), "log.index");
+    }
   }
+  
   private static File getBaseDir(String taskid) {
     return new File(LOG_DIR, taskid);
   }
@@ -130,9 +146,10 @@ public class TaskLog {
   private static long prevErrLength;
   private static long prevLogLength;
   
-  private static void writeToIndexFile(TaskAttemptID firstTaskid) 
+  private static void writeToIndexFile(TaskAttemptID firstTaskid,
+                                       boolean isCleanup) 
   throws IOException {
-    File indexFile = getIndexFile(currentTaskid.toString());
+    File indexFile = getIndexFile(currentTaskid.toString(), isCleanup);
     BufferedOutputStream bos = 
       new BufferedOutputStream(new FileOutputStream(indexFile,false));
     DataOutputStream dos = new DataOutputStream(bos);
@@ -160,8 +177,17 @@ public class TaskLog {
     prevLogLength = getTaskLogFile(firstTaskid, LogName.SYSLOG).length();
   }
   private volatile static TaskAttemptID currentTaskid = null;
+
+  public synchronized static void syncLogs(TaskAttemptID firstTaskid, 
+                                           TaskAttemptID taskid) 
+  throws IOException {
+    syncLogs(firstTaskid, taskid, false);
+  }
+  
   @SuppressWarnings("unchecked")
-  public synchronized static void syncLogs(TaskAttemptID firstTaskid, TaskAttemptID taskid) 
+  public synchronized static void syncLogs(TaskAttemptID firstTaskid, 
+                                           TaskAttemptID taskid,
+                                           boolean isCleanup) 
   throws IOException {
     System.out.flush();
     System.err.flush();
@@ -180,10 +206,9 @@ public class TaskLog {
       currentTaskid = taskid;
       resetPrevLengths(firstTaskid);
     }
-    writeToIndexFile(firstTaskid);
+    writeToIndexFile(firstTaskid, isCleanup);
   }
   
-  
   /**
    * The filter for userlogs.
    */
@@ -250,6 +275,12 @@ public class TaskLog {
   static class Reader extends InputStream {
     private long bytesRemaining;
     private FileInputStream file;
+
+    public Reader(TaskAttemptID taskid, LogName kind, 
+                  long start, long end) throws IOException {
+      this(taskid, kind, start, end, false);
+    }
+    
     /**
      * Read a log file from start to end positions. The offsets may be negative,
      * in which case they are relative to the end of the file. For example,
@@ -259,12 +290,13 @@ public class TaskLog {
      * @param kind the kind of log to read
      * @param start the offset to read from (negative is relative to tail)
      * @param end the offset to read upto (negative is relative to tail)
+     * @param isCleanup whether the attempt is cleanup attempt or not
      * @throws IOException
      */
     public Reader(TaskAttemptID taskid, LogName kind, 
-                  long start, long end) throws IOException {
+                  long start, long end, boolean isCleanup) throws IOException {
       // find the right log file
-      LogFileDetail fileDetail = getTaskLogFileDetail(taskid, kind);
+      LogFileDetail fileDetail = getLogFileDetail(taskid, kind, isCleanup);
       // calculate the start and stop
       long size = fileDetail.length;
       if (start < 0) {

+ 19 - 10
src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java

@@ -104,7 +104,8 @@ public class TaskLogServlet extends HttpServlet {
   private void printTaskLog(HttpServletResponse response,
                             OutputStream out, TaskAttemptID taskId, 
                             long start, long end, boolean plainText, 
-                            TaskLog.LogName filter) throws IOException {
+                            TaskLog.LogName filter, boolean isCleanup) 
+  throws IOException {
     if (!plainText) {
       out.write(("<br><b><u>" + filter + " logs</u></b><br>\n" +
                  "<pre>\n").getBytes());
@@ -112,7 +113,7 @@ public class TaskLogServlet extends HttpServlet {
 
     try {
       InputStream taskLogReader = 
-        new TaskLog.Reader(taskId, filter, start, end);
+        new TaskLog.Reader(taskId, filter, start, end, isCleanup);
       byte[] b = new byte[65536];
       int result;
       while (true) {
@@ -159,6 +160,7 @@ public class TaskLogServlet extends HttpServlet {
     long end = -1;
     boolean plainText = false;
     TaskLog.LogName filter = null;
+    boolean isCleanup = false;
 
     String taskIdStr = request.getParameter("taskid");
     if (taskIdStr == null) {
@@ -193,7 +195,12 @@ public class TaskLogServlet extends HttpServlet {
     if (sPlainText != null) {
       plainText = Boolean.valueOf(sPlainText);
     }
-
+    
+    String sCleanup = request.getParameter("cleanup");
+    if (sCleanup != null) {
+      isCleanup = Boolean.valueOf(sCleanup);
+    }
+    
     OutputStream out = response.getOutputStream();
     if( !plainText ) {
       out.write(("<html>\n" +
@@ -203,21 +210,22 @@ public class TaskLogServlet extends HttpServlet {
 
       if (filter == null) {
         printTaskLog(response, out, taskId, start, end, plainText, 
-                     TaskLog.LogName.STDOUT);
+                     TaskLog.LogName.STDOUT, isCleanup);
         printTaskLog(response, out, taskId, start, end, plainText, 
-                     TaskLog.LogName.STDERR);
+                     TaskLog.LogName.STDERR, isCleanup);
         printTaskLog(response, out, taskId, start, end, plainText, 
-                     TaskLog.LogName.SYSLOG);
+                     TaskLog.LogName.SYSLOG, isCleanup);
         if (haveTaskLog(taskId, TaskLog.LogName.DEBUGOUT)) {
           printTaskLog(response, out, taskId, start, end, plainText, 
-                       TaskLog.LogName.DEBUGOUT);
+                       TaskLog.LogName.DEBUGOUT, isCleanup);
         }
         if (haveTaskLog(taskId, TaskLog.LogName.PROFILE)) {
           printTaskLog(response, out, taskId, start, end, plainText, 
-                       TaskLog.LogName.PROFILE);
+                       TaskLog.LogName.PROFILE, isCleanup);
         }
       } else {
-        printTaskLog(response, out, taskId, start, end, plainText, filter);
+        printTaskLog(response, out, taskId, start, end, plainText, filter,
+                     isCleanup);
       }
       
       out.write("</body></html>\n".getBytes());
@@ -226,7 +234,8 @@ public class TaskLogServlet extends HttpServlet {
       response.sendError(HttpServletResponse.SC_BAD_REQUEST,
           "You must supply a value for `filter' (STDOUT, STDERR, or SYSLOG) if you set plainText = true");
     } else {
-      printTaskLog(response, out, taskId, start, end, plainText, filter);
+      printTaskLog(response, out, taskId, start, end, plainText, filter, 
+                   isCleanup);
     } 
   }
 }

+ 14 - 11
src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.mapred;
 
+import java.io.File;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -27,7 +28,6 @@ import java.util.ArrayList;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.TaskTracker;
 import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
 import org.apache.hadoop.util.ProcfsBasedProcessTree;
@@ -66,10 +66,11 @@ class TaskMemoryManagerThread extends Thread {
         "mapred.tasktracker.taskmemorymanager.monitoring-interval", 5000L);
   }
 
-  public void addTask(TaskAttemptID tid, long memLimit) {
+  public void addTask(TaskAttemptID tid, long memLimit, String pidFile) {
     synchronized (tasksToBeAdded) {
       LOG.debug("Tracking ProcessTree " + tid + " for the first time");
-      ProcessTreeInfo ptInfo = new ProcessTreeInfo(tid, null, null, memLimit);
+      ProcessTreeInfo ptInfo = new ProcessTreeInfo(tid, null, null, 
+                                                   memLimit, pidFile);
       tasksToBeAdded.put(tid, ptInfo);
     }
   }
@@ -85,13 +86,15 @@ class TaskMemoryManagerThread extends Thread {
     private String pid;
     private ProcfsBasedProcessTree pTree;
     private long memLimit;
+    private String pidFile;
 
     public ProcessTreeInfo(TaskAttemptID tid, String pid,
-        ProcfsBasedProcessTree pTree, long memLimit) {
+        ProcfsBasedProcessTree pTree, long memLimit, String pidFile) {
       this.tid = tid;
       this.pid = pid;
       this.pTree = pTree;
       this.memLimit = memLimit;
+      this.pidFile = pidFile;
     }
 
     public TaskAttemptID getTID() {
@@ -162,7 +165,8 @@ class TaskMemoryManagerThread extends Thread {
 
         // Initialize any uninitialized processTrees
         if (pId == null) {
-          pId = getPid(tid); // get pid from pid-file
+          // get pid from pid-file
+          pId = getPid(ptInfo.pidFile); 
           if (pId != null) {
             // PID will be null, either if the pid file is yet to be created
             // or if the tip is finished and we removed pidFile, but the TIP
@@ -305,15 +309,14 @@ class TaskMemoryManagerThread extends Thread {
   /**
    * Load pid of the task from the pidFile.
    * 
-   * @param tipID
+   * @param pidFileName
    * @return the pid of the task process.
    */
-  private String getPid(TaskAttemptID tipID) {
-    Path pidFileName = TaskTracker.getPidFilePath(tipID, taskTracker.getJobConf());
-    if (pidFileName == null) {
-      return null;
+  private String getPid(String pidFileName) {
+    if ((new File(pidFileName)).exists()) {
+      return ProcessTree.getPidFromPidFile(pidFileName);
     }
-    return ProcessTree.getPidFromPidFile(pidFileName.toString());
+    return null;
   }
 
 

+ 10 - 7
src/mapred/org/apache/hadoop/mapred/TaskRunner.java

@@ -113,9 +113,10 @@ abstract class TaskRunner extends Thread {
                           new Path(conf.getJar()).getParent().toString());
       }
       File workDir = new File(lDirAlloc.getLocalPathToRead(
-                                TaskTracker.getJobCacheSubdir() 
-                                + Path.SEPARATOR + t.getJobID() 
-                                + Path.SEPARATOR + t.getTaskID()
+                                TaskTracker.getLocalTaskDir( 
+                                  t.getJobID().toString(), 
+                                  t.getTaskID().toString(),
+                                  t.isTaskCleanupTask())
                                 + Path.SEPARATOR + MRConstants.WORKDIR,
                                 conf). toString());
 
@@ -374,10 +375,12 @@ abstract class TaskRunner extends Thread {
       vargs.add(Integer.toString(address.getPort())); 
       vargs.add(taskid.toString());                      // pass task identifier
 
-      String pidFile = null;
-      pidFile = lDirAlloc.getLocalPathForWrite(
-            (TaskTracker.getPidFilesSubdir() + Path.SEPARATOR + taskid),
+      String pidFile = lDirAlloc.getLocalPathForWrite(
+            (TaskTracker.getPidFile(t.getJobID().toString(),
+               t.getTaskID().toString(), t.isTaskCleanupTask())),
             this.conf).toString();
+      t.setPidFile(pidFile);
+      tracker.addToMemoryManager(t.getTaskID(), conf, pidFile);
 
       // set memory limit using ulimit if feasible and necessary ...
       String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
@@ -456,7 +459,7 @@ abstract class TaskRunner extends Thread {
       }catch(IOException ie){
         LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
       }
-      tracker.reportTaskFinished(t.getTaskID(), false);
+      tip.reportTaskFinished();
     }
   }
   

+ 35 - 3
src/mapred/org/apache/hadoop/mapred/TaskStatus.java

@@ -41,7 +41,7 @@ abstract class TaskStatus implements Writable, Cloneable {
 
   // what state is the task in?
   public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED, 
-                            COMMIT_PENDING}
+                            COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN}
     
   private final TaskAttemptID taskid;
   private float progress;
@@ -204,6 +204,12 @@ abstract class TaskStatus implements Writable, Cloneable {
     }
     this.phase = phase; 
   }
+
+  boolean inTaskCleanupPhase() {
+    return (this.phase == TaskStatus.Phase.CLEANUP && 
+      (this.runState == TaskStatus.State.FAILED_UNCLEAN || 
+      this.runState == TaskStatus.State.KILLED_UNCLEAN));
+  }
   
   public boolean getIncludeCounters() {
     return includeCounters; 
@@ -261,9 +267,9 @@ abstract class TaskStatus implements Writable, Cloneable {
   /**
    * Update the status of the task.
    * 
+   * @param runstate
    * @param progress
    * @param state
-   * @param phase
    * @param counters
    */
   synchronized void statusUpdate(State runState, 
@@ -300,7 +306,33 @@ abstract class TaskStatus implements Writable, Cloneable {
     this.counters = status.getCounters();
     this.outputSize = status.outputSize;
   }
-  
+
+  /**
+   * Update specific fields of task status
+   * 
+   * This update is done in JobTracker when a cleanup attempt of task
+   * reports its status. Then update only specific fields, not all.
+   * 
+   * @param runState
+   * @param progress
+   * @param state
+   * @param phase
+   * @param finishTime
+   */
+  synchronized void statusUpdate(State runState, 
+                                 float progress,
+                                 String state, 
+                                 Phase phase,
+                                 long finishTime) {
+    setRunState(runState);
+    setProgress(progress);
+    setStateString(state);
+    setPhase(phase);
+    if (finishTime != 0) {
+      this.finishTime = finishTime; 
+    }
+  }
+
   /**
    * Clear out transient information after sending out a status-update
    * from either the {@link Task} to the {@link TaskTracker} or from the

+ 191 - 79
src/mapred/org/apache/hadoop/mapred/TaskTracker.java

@@ -183,7 +183,8 @@ public class TaskTracker
   private static final String SUBDIR = "taskTracker";
   private static final String CACHEDIR = "archive";
   private static final String JOBCACHE = "jobcache";
-  private static final String PIDDIR = "pids";
+  private static final String PID = "pid";
+  private static final String OUTPUT = "output";
   private JobConf originalConf;
   private JobConf fConf;
   private int maxCurrentMapTasks;
@@ -412,38 +413,63 @@ public class TaskTracker
   static String getJobCacheSubdir() {
     return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE;
   }
+  
+  static String getLocalJobDir(String jobid) {
+	return getJobCacheSubdir() + Path.SEPARATOR + jobid; 
+  }
 
-  static String getPidFilesSubdir() {
-    return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.PIDDIR;
+  static String getLocalTaskDir(String jobid, String taskid) {
+	return getLocalTaskDir(jobid, taskid, false) ; 
   }
- 
+
+  static String getIntermediateOutputDir(String jobid, String taskid) {
+	return getLocalTaskDir(jobid, taskid) 
+           + Path.SEPARATOR + TaskTracker.OUTPUT ; 
+  }
+
+  static String getLocalTaskDir(String jobid, 
+                                String taskid, 
+                                boolean isCleanupAttempt) {
+	String taskDir = getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
+	if (isCleanupAttempt) { 
+      taskDir = taskDir + ".cleanup";
+	}
+	return taskDir;
+  }
+
+  static String getPidFile(String jobid, 
+                           String taskid, 
+                           boolean isCleanup) {
+    return  getLocalTaskDir(jobid, taskid, isCleanup)
+            + Path.SEPARATOR + PID;
+  }
+
   /**
    * Get the pidFile path of a Task
+   * 
    * @param tid the TaskAttemptID of the task for which pidFile's path is needed
+   * @param conf Configuration for local dir allocator
+   * @param isCleanup true if the task is cleanup attempt
+   *  
    * @return pidFile's Path
    */
-  public static Path getPidFilePath(TaskAttemptID tid, JobConf conf) {
+  static Path getPidFilePath(TaskAttemptID tid, 
+                             JobConf conf, 
+                             boolean isCleanup) {
     Path pidFileName = null;
     try {
       //this actually need not use a localdirAllocator since the PID
       //files are really small..
       pidFileName = lDirAlloc.getLocalPathToRead(
-          (TaskTracker.getPidFilesSubdir() + Path.SEPARATOR + tid),
-          conf);
+        getPidFile(tid.getJobID().toString(), tid.toString(), isCleanup),
+        conf);
     } catch (IOException i) {
       // PID file is not there
-      LOG.warn("Failed to get pidFile name for " + tid + " " + i);
+      LOG.warn("Failed to get pidFile name for " + tid + " " + 
+                StringUtils.stringifyException(i));
     }
     return pidFileName;
   }
-  public void removePidFile(TaskAttemptID tid) {
-    Path pidFilePath = getPidFilePath(tid, getJobConf());
-    if (pidFilePath != null) {
-      try {
-        FileSystem.getLocal(getJobConf()).delete(pidFilePath, false);
-      } catch(IOException ie) {}
-    }
-  }
   
   public long getProtocolVersion(String protocol, 
                                  long clientVersion) throws IOException {
@@ -785,9 +811,9 @@ public class TaskTracker
     } catch(FileNotFoundException fe) {
       jobFileSize = -1;
     }
-    Path localJobFile = lDirAlloc.getLocalPathForWrite((getJobCacheSubdir()
-                                    + Path.SEPARATOR + jobId 
-                                    + Path.SEPARATOR + "job.xml"),
+    Path localJobFile = lDirAlloc.getLocalPathForWrite(
+                                    getLocalJobDir(jobId.toString())
+                                    + Path.SEPARATOR + "job.xml",
                                     jobFileSize, fConf);
     RunningJob rjob = addTaskToJob(jobId, tip);
     synchronized (rjob) {
@@ -811,9 +837,9 @@ public class TaskTracker
         
         // create the 'work' directory
         // job-specific shared directory for use as scratch space 
-        Path workDir = lDirAlloc.getLocalPathForWrite((getJobCacheSubdir()
-                       + Path.SEPARATOR + jobId 
-                       + Path.SEPARATOR + "work"), fConf);
+        Path workDir = lDirAlloc.getLocalPathForWrite(
+                         (getLocalJobDir(jobId.toString())
+                         + Path.SEPARATOR + "work"), fConf);
         if (!localFs.mkdirs(workDir)) {
           throw new IOException("Mkdirs failed to create " 
                       + workDir.toString());
@@ -835,8 +861,7 @@ public class TaskTracker
           // Here we check for and we check five times the size of jarFileSize
           // to accommodate for unjarring the jar file in work directory 
           localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
-                                     getJobCacheSubdir()
-                                     + Path.SEPARATOR + jobId 
+                                     getLocalJobDir(jobId.toString())
                                      + Path.SEPARATOR + "jars",
                                      5 * jarFileSize, fConf), "job.jar");
           if (!localFs.mkdirs(localJarFile.getParent())) {
@@ -1253,7 +1278,8 @@ public class TaskTracker
       for (TaskStatus taskStatus : status.getTaskReports()) {
         if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
             taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
-            taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
+            taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+            !taskStatus.inTaskCleanupPhase()) {
           if (taskStatus.getIsMap()) {
             mapTotal--;
           } else {
@@ -1370,7 +1396,8 @@ public class TaskTracker
     long now = System.currentTimeMillis();
     for (TaskInProgress tip: runningTasks.values()) {
       if (tip.getRunState() == TaskStatus.State.RUNNING ||
-          tip.getRunState() == TaskStatus.State.COMMIT_PENDING) {
+          tip.getRunState() == TaskStatus.State.COMMIT_PENDING ||
+          tip.isCleaningup()) {
         // Check the per-job timeout interval for tasks;
         // an interval of '0' implies it is never timed-out
         long jobTaskTimeout = tip.getTaskTimeout();
@@ -1424,8 +1451,7 @@ public class TaskTracker
         // task if the job is done/failed
         if (!rjob.keepJobFiles){
           directoryCleanupThread.addToQueue(getLocalFiles(fConf, 
-                                   SUBDIR + Path.SEPARATOR + JOBCACHE + 
-                                   Path.SEPARATOR +  rjob.getJobID()));
+            getLocalJobDir(rjob.getJobID().toString())));
         }
         // Remove this job 
         rjob.tasks.clear();
@@ -1684,7 +1710,9 @@ public class TaskTracker
           }
           synchronized (tip) {
             //to make sure that there is no kill task action for this
-            if (tip.getRunState() != TaskStatus.State.UNASSIGNED) {
+            if (tip.getRunState() != TaskStatus.State.UNASSIGNED &&
+                tip.getRunState() != TaskStatus.State.FAILED_UNCLEAN &&
+                tip.getRunState() != TaskStatus.State.KILLED_UNCLEAN) {
               //got killed externally while still in the launcher queue
               addFreeSlot();
               continue;
@@ -1705,7 +1733,8 @@ public class TaskTracker
   private TaskInProgress registerTask(LaunchTaskAction action, 
       TaskLauncher launcher) {
     Task t = action.getTask();
-    LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID());
+    LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID() +
+             " task's state:" + t.getState());
     TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher);
     synchronized (this) {
       tasks.put(t.getTaskID(), tip);
@@ -1727,10 +1756,6 @@ public class TaskTracker
   private void startNewTask(TaskInProgress tip) {
     try {
       localizeJob(tip);
-      if (isTaskMemoryManagerEnabled()) {
-        taskMemoryManager.addTask(tip.getTask().getTaskID(), 
-            getVirtualMemoryForTask(tip.getJobConf()));
-      }
     } catch (Throwable e) {
       String msg = ("Error initializing " + tip.getTask().getTaskID() + 
                     ":\n" + StringUtils.stringifyException(e));
@@ -1751,7 +1776,23 @@ public class TaskTracker
       }
     }
   }
-    
+  
+  void addToMemoryManager(TaskAttemptID attemptId, 
+                          JobConf conf, 
+                          String pidFile) {
+    if (isTaskMemoryManagerEnabled()) {
+      taskMemoryManager.addTask(attemptId, 
+        getVirtualMemoryForTask(conf), pidFile);
+    }
+  }
+
+  void removeFromMemoryManager(TaskAttemptID attemptId) {
+    // Remove the entry from taskMemoryManagerThread's data structures.
+    if (isTaskMemoryManagerEnabled()) {
+      taskMemoryManager.removeTask(attemptId);
+    }
+  }
+
   /**
    * The server retry loop.  
    * This while-loop attempts to connect to the JobTracker.  It only 
@@ -1838,10 +1879,12 @@ public class TaskTracker
       localJobConf = null;
       taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskID(), 
                                                0.0f, 
-                                               TaskStatus.State.UNASSIGNED, 
+                                               task.getState(),
                                                diagnosticInfo.toString(), 
                                                "initializing",  
                                                getName(), 
+                                               task.isTaskCleanupTask() ? 
+                                                 TaskStatus.Phase.CLEANUP :  
                                                task.isMapTask()? TaskStatus.Phase.MAP:
                                                TaskStatus.Phase.SHUFFLE,
                                                task.getCounters()); 
@@ -1851,9 +1894,10 @@ public class TaskTracker
     private void localizeTask(Task task) throws IOException{
 
       Path localTaskDir = 
-        lDirAlloc.getLocalPathForWrite((TaskTracker.getJobCacheSubdir() + 
-                    Path.SEPARATOR + task.getJobID() + Path.SEPARATOR +
-                    task.getTaskID()), defaultJobConf );
+        lDirAlloc.getLocalPathForWrite(
+          TaskTracker.getLocalTaskDir(task.getJobID().toString(), 
+            task.getTaskID().toString(), task.isTaskCleanupTask()), 
+          defaultJobConf );
       
       FileSystem localFs = FileSystem.getLocal(fConf);
       if (!localFs.mkdirs(localTaskDir)) {
@@ -1863,8 +1907,7 @@ public class TaskTracker
 
       // create symlink for ../work if it already doesnt exist
       String workDir = lDirAlloc.getLocalPathToRead(
-                         TaskTracker.getJobCacheSubdir() 
-                         + Path.SEPARATOR + task.getJobID() 
+                         TaskTracker.getLocalJobDir(task.getJobID().toString())
                          + Path.SEPARATOR  
                          + "work", defaultJobConf).toString();
       String link = localTaskDir.getParent().toString() 
@@ -1875,11 +1918,10 @@ public class TaskTracker
       
       // create the working-directory of the task 
       Path cwd = lDirAlloc.getLocalPathForWrite(
-                         TaskTracker.getJobCacheSubdir() 
-                         + Path.SEPARATOR + task.getJobID() 
-                         + Path.SEPARATOR + task.getTaskID()
-                         + Path.SEPARATOR + MRConstants.WORKDIR,
-                         defaultJobConf);
+                   getLocalTaskDir(task.getJobID().toString(), 
+                      task.getTaskID().toString(), task.isTaskCleanupTask()) 
+                   + Path.SEPARATOR + MRConstants.WORKDIR,
+                   defaultJobConf);
       if (!localFs.mkdirs(cwd)) {
         throw new IOException("Mkdirs failed to create " 
                     + cwd.toString());
@@ -1974,9 +2016,13 @@ public class TaskTracker
      * Kick off the task execution
      */
     public synchronized void launchTask() throws IOException {
-      if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
+      if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
+          this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
+          this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
         localizeTask(task);
-        this.taskStatus.setRunState(TaskStatus.State.RUNNING);
+        if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
+          this.taskStatus.setRunState(TaskStatus.State.RUNNING);
+        }
         this.runner = task.createRunner(TaskTracker.this, this);
         this.runner.start();
         this.taskStatus.setStartTime(System.currentTimeMillis());
@@ -1986,6 +2032,10 @@ public class TaskTracker
       }
     }
 
+    boolean isCleaningup() {
+   	  return this.taskStatus.inTaskCleanupPhase();
+    }
+    
     /**
      * The task is reporting its progress
      */
@@ -1993,10 +2043,14 @@ public class TaskTracker
     {
       LOG.info(task.getTaskID() + " " + taskStatus.getProgress() + 
           "% " + taskStatus.getStateString());
-      
+      // task will report its state as
+      // COMMIT_PENDING when it is waiting for commit response and 
+      // when it is committing.
+      // cleanup attempt will report its state as FAILED_UNCLEAN/KILLED_UNCLEAN
       if (this.done || 
           (this.taskStatus.getRunState() != TaskStatus.State.RUNNING &&
-          this.taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING)) {
+          this.taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+          !isCleaningup())) {
         //make sure we ignore progress messages after a task has 
         //invoked TaskUmbilicalProtocol.done() or if the task has been
         //KILLED/FAILED
@@ -2047,7 +2101,16 @@ public class TaskTracker
      * The task is reporting that it's done running
      */
     public synchronized void reportDone() {
-      this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+      if (isCleaningup()) {
+        if (this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
+          this.taskStatus.setRunState(TaskStatus.State.FAILED);
+        } else if (this.taskStatus.getRunState() == 
+                   TaskStatus.State.KILLED_UNCLEAN) {
+          this.taskStatus.setRunState(TaskStatus.State.KILLED);
+        }
+      } else {
+        this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+      }
       this.taskStatus.setProgress(1.0f);
       this.taskStatus.setFinishTime(System.currentTimeMillis());
       this.done = true;
@@ -2062,6 +2125,11 @@ public class TaskTracker
       return wasKilled;
     }
 
+    void reportTaskFinished() {
+      taskFinished();
+      releaseSlot();
+    }
+
     /**
      * The task has actually finished running.
      */
@@ -2088,7 +2156,23 @@ public class TaskTracker
         if (!done) {
           if (!wasKilled) {
             failures += 1;
-            taskStatus.setRunState(TaskStatus.State.FAILED);
+            /* State changes:
+             * RUNNING/COMMIT_PENDING -> FAILED_UNCLEAN/FAILED
+             * FAILED_UNCLEAN -> FAILED 
+             * KILLED_UNCLEAN -> KILLED 
+             */
+            if (taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
+              taskStatus.setRunState(TaskStatus.State.FAILED);
+            } else if (taskStatus.getRunState() == 
+                       TaskStatus.State.KILLED_UNCLEAN) {
+              taskStatus.setRunState(TaskStatus.State.KILLED);
+            } else if (task.isMapOrReduce() && 
+                       taskStatus.getPhase() != TaskStatus.Phase.CLEANUP) {
+              taskStatus.setRunState(TaskStatus.State.FAILED_UNCLEAN);
+            } else {
+              taskStatus.setRunState(TaskStatus.State.FAILED);
+            }
+            removeFromMemoryManager(task.getTaskID());
             // call the script here for the failed tasks.
             if (debugCommand != null) {
               String taskStdout ="";
@@ -2114,9 +2198,10 @@ public class TaskTracker
               File workDir = null;
               try {
                 workDir = new File(lDirAlloc.getLocalPathToRead(
-                                     TaskTracker.getJobCacheSubdir() 
-                                     + Path.SEPARATOR + task.getJobID() 
-                                     + Path.SEPARATOR + task.getTaskID()
+                                     TaskTracker.getLocalTaskDir( 
+                                       task.getJobID().toString(), 
+                                       task.getTaskID().toString(),
+                                       task.isTaskCleanupTask())
                                      + Path.SEPARATOR + MRConstants.WORKDIR,
                                      localJobConf). toString());
               } catch (IOException e) {
@@ -2169,14 +2254,14 @@ public class TaskTracker
                 LOG.warn("Exception in add diagnostics!");
               }
             }
-          } else {
-            taskStatus.setRunState(TaskStatus.State.KILLED);
           }
           taskStatus.setProgress(0.0f);
         }
         this.taskStatus.setFinishTime(System.currentTimeMillis());
         needCleanup = (taskStatus.getRunState() == TaskStatus.State.FAILED || 
-                       taskStatus.getRunState() == TaskStatus.State.KILLED);
+                taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
+                taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN || 
+                taskStatus.getRunState() == TaskStatus.State.KILLED);
       }
 
       //
@@ -2286,7 +2371,8 @@ public class TaskTracker
       synchronized(this){
         if (getRunState() == TaskStatus.State.RUNNING ||
             getRunState() == TaskStatus.State.UNASSIGNED ||
-            getRunState() == TaskStatus.State.COMMIT_PENDING) {
+            getRunState() == TaskStatus.State.COMMIT_PENDING ||
+            isCleaningup()) {
           kill(wasFailure);
         }
       }
@@ -2297,19 +2383,46 @@ public class TaskTracker
 
     /**
      * Something went wrong and the task must be killed.
+     * 
+     * RUNNING/COMMIT_PENDING -> FAILED_UNCLEAN/KILLED_UNCLEAN
+     * FAILED_UNCLEAN -> FAILED 
+     * KILLED_UNCLEAN -> KILLED
+     * UNASSIGNED -> FAILED/KILLED
      * @param wasFailure was it a failure (versus a kill request)?
      */
     public synchronized void kill(boolean wasFailure) throws IOException {
+      /* State changes:
+       * RUNNING -> FAILED_UNCLEAN/KILLED_UNCLEAN/FAILED/KILLED
+       * COMMIT_PENDING -> FAILED_UNCLEAN/KILLED_UNCLEAN
+       * FAILED_UNCLEAN -> FAILED 
+       * KILLED_UNCLEAN -> KILLED
+       * UNASSIGNED -> FAILED/KILLED 
+       */
       if (taskStatus.getRunState() == TaskStatus.State.RUNNING ||
-          taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) {
+          taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING ||
+          isCleaningup()) {
         wasKilled = true;
         if (wasFailure) {
           failures += 1;
         }
         runner.kill();
-        taskStatus.setRunState((wasFailure) ? 
-                                  TaskStatus.State.FAILED : 
-                                  TaskStatus.State.KILLED);
+        if (task.isMapOrReduce()) {
+          taskStatus.setRunState((wasFailure) ? 
+                                    TaskStatus.State.FAILED_UNCLEAN : 
+                                    TaskStatus.State.KILLED_UNCLEAN);
+        } else {
+          // go FAILED_UNCLEAN -> FAILED and KILLED_UNCLEAN -> KILLED always
+          if (taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
+            taskStatus.setRunState(TaskStatus.State.FAILED);
+          } else if (taskStatus.getRunState() == 
+                     TaskStatus.State.KILLED_UNCLEAN) {
+            taskStatus.setRunState(TaskStatus.State.KILLED);
+          } else {
+            taskStatus.setRunState((wasFailure) ? 
+                                      TaskStatus.State.FAILED : 
+                                      TaskStatus.State.KILLED);
+          }
+        }
       } else if (taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
         if (wasFailure) {
           failures += 1;
@@ -2318,6 +2431,7 @@ public class TaskTracker
           taskStatus.setRunState(TaskStatus.State.KILLED);
         }
       }
+      removeFromMemoryManager(task.getTaskID());
       releaseSlot();
     }
     
@@ -2369,7 +2483,12 @@ public class TaskTracker
 
       synchronized (TaskTracker.this) {
         if (needCleanup) {
-          tasks.remove(taskId);
+          // see if tasks data structure is holding this tip.
+          // tasks could hold the tip for cleanup attempt, if cleanup attempt 
+          // got launched before this method.
+          if (tasks.get(taskId) == this) {
+            tasks.remove(taskId);
+          }
         }
         synchronized (this){
           if (alwaysKeepTaskFiles ||
@@ -2381,8 +2500,8 @@ public class TaskTracker
       }
       synchronized (this) {
         try {
-          String taskDir = SUBDIR + Path.SEPARATOR + JOBCACHE + Path.SEPARATOR
-                           + task.getJobID() + Path.SEPARATOR + taskId;
+          String taskDir = getLocalTaskDir(task.getJobID().toString(),
+                             taskId.toString(), task.isTaskCleanupTask());
           if (needCleanup) {
             if (runner != null) {
               //cleans up the output directory of the task (where map outputs 
@@ -2603,15 +2722,10 @@ public class TaskTracker
     }
     if (tip != null) {
       if (!commitPending) {
-        tip.taskFinished();
-        // Remove the entry from taskMemoryManagerThread's data structures.
-        if (isTaskMemoryManagerEnabled()) {
-          taskMemoryManager.removeTask(taskid);
-        }
-        tip.releaseSlot();
+        tip.reportTaskFinished();
       }
     } else {
-      LOG.warn("Unknown child task finshed: "+taskid+". Ignored.");
+      LOG.warn("Unknown child task finished: "+taskid+". Ignored.");
     }
   }
   
@@ -2838,15 +2952,13 @@ public class TaskTracker
 
         // Index file
         Path indexFileName = lDirAlloc.getLocalPathToRead(
-            TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + 
-            jobId + Path.SEPARATOR +
-            mapId + "/output" + "/file.out.index", conf);
+            TaskTracker.getIntermediateOutputDir(jobId, mapId)
+            + "/file.out.index", conf);
         
         // Map-output file
         Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
-            TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + 
-            jobId + Path.SEPARATOR +
-            mapId + "/output" + "/file.out", conf);
+            TaskTracker.getIntermediateOutputDir(jobId, mapId)
+            + "/file.out", conf);
 
         /**
          * Read the index file to get the information about where

+ 4 - 2
src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java

@@ -250,7 +250,8 @@ class TaskTrackerStatus implements Writable {
       TaskStatus.State state = ts.getRunState();
       if (ts.getIsMap() &&
           ((state == TaskStatus.State.RUNNING) ||
-           (state == TaskStatus.State.UNASSIGNED))) {
+           (state == TaskStatus.State.UNASSIGNED) ||
+           ts.inTaskCleanupPhase())) {
         mapCount++;
       }
     }
@@ -267,7 +268,8 @@ class TaskTrackerStatus implements Writable {
       TaskStatus.State state = ts.getRunState();
       if ((!ts.getIsMap()) &&
           ((state == TaskStatus.State.RUNNING) ||  
-           (state == TaskStatus.State.UNASSIGNED))) {
+           (state == TaskStatus.State.UNASSIGNED) ||
+           ts.inTaskCleanupPhase())) {
         reduceCount++;
       }
     }

+ 2 - 1
src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java

@@ -52,9 +52,10 @@ interface TaskUmbilicalProtocol extends VersionedProtocol {
    *            encapsulates the events and whether to reset events index.
    * Version 13 changed the getTask method signature for HADOOP-249
    * Version 14 changed the getTask method signature for HADOOP-4232
+   * Version 15 Adds FAILED_UNCLEAN and KILLED_UNCLEAN states for HADOOP-4759
    * */
 
-  public static final long versionID = 14L;
+  public static final long versionID = 15L;
   
   /**
    * Called when a child task process starts, to get its task.

+ 4 - 2
src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java

@@ -174,8 +174,10 @@ public class FileOutputCommitter extends OutputCommitter {
   @Override
   public void abortTask(TaskAttemptContext context) {
     try {
-      context.progress();
-      outputFileSystem.delete(workPath, true);
+      if (workPath != null) { 
+        context.progress();
+        outputFileSystem.delete(workPath, true);
+      }
     } catch (IOException ie) {
       LOG.warn("Error discarding output" + StringUtils.stringifyException(ie));
     }

+ 2 - 2
src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java

@@ -110,7 +110,7 @@ public class TestKillSubProcesses extends TestCase {
         JobConf confForThisTask = new JobConf(conf);
         confForThisTask.set("mapred.local.dir", localDir);//set the localDir
 
-        Path pidFilePath = TaskTracker.getPidFilePath(id, confForThisTask);
+        Path pidFilePath = TaskTracker.getPidFilePath(id, confForThisTask, false);
         while (pidFilePath == null) {
           //wait till the pid file is created
           try {
@@ -119,7 +119,7 @@ public class TestKillSubProcesses extends TestCase {
             LOG.warn("sleep is interrupted:" + ie);
             break;
           }
-          pidFilePath = TaskTracker.getPidFilePath(id, confForThisTask);
+          pidFilePath = TaskTracker.getPidFilePath(id, confForThisTask, false);
         }
 
         pid = ProcessTree.getPidFromPidFile(pidFilePath.toString());

+ 63 - 8
src/webapps/job/taskdetails.jsp

@@ -67,13 +67,19 @@
         }
       }
     }
-    TaskStatus[] ts = (job != null) ? tracker.getTaskStatuses(tipidObj)
-        : null;
+    TaskInProgress tip = null;
+    if (job != null && tipidObj != null) {
+      tip = job.getTaskInProgress(tipidObj);
+    }
+    TaskStatus[] ts = null;
+    if (tip != null) { 
+      ts = tip.getTaskStatuses();
+    }
     boolean isCleanupOrSetup = false;
-    if (tipidObj != null) { 
-      isCleanupOrSetup = job.getTaskInProgress(tipidObj).isCleanupTask();
+    if ( tip != null) {
+      isCleanupOrSetup = tip.isJobCleanupTask();
       if (!isCleanupOrSetup) {
-        isCleanupOrSetup = job.getTaskInProgress(tipidObj).isSetupTask();
+        isCleanupOrSetup = tip.isJobSetupTask();
       }
     }
 %>
@@ -115,14 +121,41 @@
       TaskTrackerStatus taskTracker = tracker.getTaskTracker(taskTrackerName);
       out.print("<tr><td>" + status.getTaskID() + "</td>");
       String taskAttemptTracker = null;
+      String cleanupTrackerName = null;
+      TaskTrackerStatus cleanupTracker = null;
+      String cleanupAttemptTracker = null;
+      boolean hasCleanupAttempt = false;
+      if (tip != null && tip.isCleanupAttempt(status.getTaskID())) {
+        cleanupTrackerName = tip.machineWhereCleanupRan(status.getTaskID());
+        cleanupTracker = tracker.getTaskTracker(cleanupTrackerName);
+        if (cleanupTracker != null) {
+          cleanupAttemptTracker = "http://" + cleanupTracker.getHost() + ":"
+            + cleanupTracker.getHttpPort();
+        }
+        hasCleanupAttempt = true;
+      }
+      out.print("<td>");
+      if (hasCleanupAttempt) {
+        out.print("Task attempt: ");
+      }
       if (taskTracker == null) {
-        out.print("<td>" + taskTrackerName + "</td>");
+        out.print(taskTrackerName);
       } else {
         taskAttemptTracker = "http://" + taskTracker.getHost() + ":"
           + taskTracker.getHttpPort();
-        out.print("<td><a href=\"" + taskAttemptTracker + "\">"
-          + tracker.getNode(taskTracker.getHost()) + "</a></td>");
+        out.print("<a href=\"" + taskAttemptTracker + "\">"
+          + tracker.getNode(taskTracker.getHost()) + "</a>");
+      }
+      if (hasCleanupAttempt) {
+        out.print("<br/>Cleanup Attempt: ");
+        if (cleanupAttemptTracker == null ) {
+          out.print(cleanupTrackerName);
+        } else {
+          out.print("<a href=\"" + cleanupAttemptTracker + "\">"
+            + tracker.getNode(cleanupTracker.getHost()) + "</a>");
         }
+      }
+      out.print("</td>");
         out.print("<td>" + status.getRunState() + "</td>");
         out.print("<td>" + StringUtils.formatPercent(status.getProgress(), 2)
           + ServletUtil.percentageGraph(status.getProgress() * 100f, 80) + "</td>");
@@ -162,6 +195,9 @@
         						String.valueOf(taskTracker.getHttpPort()),
         						status.getTaskID().toString());
       	}
+        if (hasCleanupAttempt) {
+          out.print("Task attempt: <br/>");
+        }
         if (taskLogUrl == null) {
           out.print("n/a");
         } else {
@@ -172,6 +208,25 @@
           out.print("<a href=\"" + tailEightKBUrl + "\">Last 8KB</a><br/>");
           out.print("<a href=\"" + entireLogUrl + "\">All</a><br/>");
         }
+        if (hasCleanupAttempt) {
+          out.print("Cleanup attempt: <br/>");
+          taskLogUrl = null;
+          if (cleanupTracker != null ) {
+        	taskLogUrl = TaskLogServlet.getTaskLogUrl(cleanupTracker.getHost(),
+                                String.valueOf(cleanupTracker.getHttpPort()),
+                                status.getTaskID().toString());
+      	  }
+          if (taskLogUrl == null) {
+            out.print("n/a");
+          } else {
+            String tailFourKBUrl = taskLogUrl + "&start=-4097&cleanup=true";
+            String tailEightKBUrl = taskLogUrl + "&start=-8193&cleanup=true";
+            String entireLogUrl = taskLogUrl + "&all=true&cleanup=true";
+            out.print("<a href=\"" + tailFourKBUrl + "\">Last 4KB</a><br/>");
+            out.print("<a href=\"" + tailEightKBUrl + "\">Last 8KB</a><br/>");
+            out.print("<a href=\"" + entireLogUrl + "\">All</a><br/>");
+          }
+        }
         out.print("</td><td>" + "<a href=\"/taskstats.jsp?jobid=" + jobid
           + "&tipid=" + tipid + "&taskid=" + status.getTaskID() + "\">"
           + ((status.getCounters() != null) ? status.getCounters().size() : 0) + "</a></td>");