Browse Source

HADOOP-1472. Fix so that timed-out tasks are counted as failures rather than as killed. Contributed by Arun.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@547790 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 years ago
parent
commit
d53342a787
2 changed files with 31 additions and 45 deletions
  1. 3 0
      CHANGES.txt
  2. 28 45
      src/java/org/apache/hadoop/mapred/TaskTracker.java

+ 3 - 0
CHANGES.txt

@@ -130,6 +130,9 @@ Trunk (unreleased changes)
  41. HADOOP-1457.  Add counters for monitoring task assignments.
  41. HADOOP-1457.  Add counters for monitoring task assignments.
      (Arun C Murthy via tomwhite)
      (Arun C Murthy via tomwhite)
 
 
+ 42. HADOOP-1472.  Fix so that timed-out tasks are counted as failures
+     rather than as killed.  (Arun C Murthy via cutting)
+
 
 
 Release 0.13.0 - 2007-06-08
 Release 0.13.0 - 2007-06-08
 
 

+ 28 - 45
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -215,7 +215,7 @@ public class TaskTracker
                 }
                 }
                 LOG.info("Received KillTaskAction for task: " + 
                 LOG.info("Received KillTaskAction for task: " + 
                          killAction.getTaskId());
                          killAction.getTaskId());
-                purgeTask(tip);
+                purgeTask(tip, false);
               } else {
               } else {
                 LOG.error("Non-delete action given to cleanup thread: "
                 LOG.error("Non-delete action given to cleanup thread: "
                           + action);
                           + action);
@@ -623,7 +623,7 @@ public class TaskTracker
       new TreeMap<String, TaskInProgress>();
       new TreeMap<String, TaskInProgress>();
     tasksToClose.putAll(tasks);
     tasksToClose.putAll(tasks);
     for (TaskInProgress tip : tasksToClose.values()) {
     for (TaskInProgress tip : tasksToClose.values()) {
-      tip.jobHasFinished();
+      tip.jobHasFinished(false);
     }
     }
 
 
     // Shutdown local RPC servers.  Do them
     // Shutdown local RPC servers.  Do them
@@ -920,13 +920,13 @@ public class TaskTracker
         // time-period greater than the configured time-out
         // time-period greater than the configured time-out
         long timeSinceLastReport = now - tip.getLastProgressReport();
         long timeSinceLastReport = now - tip.getLastProgressReport();
         if (timeSinceLastReport > jobTaskTimeout && !tip.wasKilled) {
         if (timeSinceLastReport > jobTaskTimeout && !tip.wasKilled) {
-          String msg = "Task failed to report status for " +
-            (timeSinceLastReport / 1000) + 
-            " seconds. Killing.";
+          String msg = 
+            "Task " + tip.getTask().getTaskId() + " failed to report status for " 
+            + (timeSinceLastReport / 1000) + " seconds. Killing!";
           LOG.info(tip.getTask().getTaskId() + ": " + msg);
           LOG.info(tip.getTask().getTaskId() + ": " + msg);
           ReflectionUtils.logThreadInfo(LOG, "lost task", 30);
           ReflectionUtils.logThreadInfo(LOG, "lost task", 30);
           tip.reportDiagnosticInfo(msg);
           tip.reportDiagnosticInfo(msg);
-          purgeTask(tip);
+          purgeTask(tip, true);
         }
         }
       }
       }
     }
     }
@@ -951,7 +951,7 @@ public class TaskTracker
       synchronized (rjob) {            
       synchronized (rjob) {            
         // Add this tips of this job to queue of tasks to be purged 
         // Add this tips of this job to queue of tasks to be purged 
         for (TaskInProgress tip : rjob.tasks) {
         for (TaskInProgress tip : rjob.tasks) {
-          tip.jobHasFinished();
+          tip.jobHasFinished(false);
         }
         }
         // Delete the job directory for this  
         // Delete the job directory for this  
         // task if the job is done/failed
         // task if the job is done/failed
@@ -974,17 +974,17 @@ public class TaskTracker
    * Remove the tip and update all relevant state.
    * Remove the tip and update all relevant state.
    * 
    * 
    * @param tip {@link TaskInProgress} to be removed.
    * @param tip {@link TaskInProgress} to be removed.
-   * @param purgeJobFiles <code>true</code> if the job files are to be
-   *                      purged, <code>false</code> otherwise.
+   * @param wasFailure did the task fail or was it killed?
    */
    */
-  private void purgeTask(TaskInProgress tip) throws IOException {
+  private void purgeTask(TaskInProgress tip, boolean wasFailure) 
+  throws IOException {
     if (tip != null) {
     if (tip != null) {
       LOG.info("About to purge task: " + tip.getTask().getTaskId());
       LOG.info("About to purge task: " + tip.getTask().getTaskId());
         
         
       // Remove the task from running jobs, 
       // Remove the task from running jobs, 
       // removing the job if it's the last task
       // removing the job if it's the last task
       removeTaskFromJob(tip.getTask().getJobId(), tip);
       removeTaskFromJob(tip.getTask().getJobId(), tip);
-      tip.jobHasFinished();
+      tip.jobHasFinished(wasFailure);
     }
     }
   }
   }
 
 
@@ -1008,7 +1008,7 @@ public class TaskTracker
             " Killing task.";
             " Killing task.";
           LOG.info(killMe.getTask().getTaskId() + ": " + msg);
           LOG.info(killMe.getTask().getTaskId() + ": " + msg);
           killMe.reportDiagnosticInfo(msg);
           killMe.reportDiagnosticInfo(msg);
-          purgeTask(killMe);
+          purgeTask(killMe, false);
         }
         }
       }
       }
     }
     }
@@ -1105,7 +1105,7 @@ public class TaskTracker
       LOG.warn(msg);
       LOG.warn(msg);
       tip.reportDiagnosticInfo(msg);
       tip.reportDiagnosticInfo(msg);
       try {
       try {
-        tip.killAndCleanup(true);
+        tip.kill(true);
       } catch (IOException ie2) {
       } catch (IOException ie2) {
         LOG.info("Error cleaning up " + tip.getTask().getTaskId() + ":\n" +
         LOG.info("Error cleaning up " + tip.getTask().getTaskId() + ":\n" +
                  StringUtils.stringifyException(ie2));          
                  StringUtils.stringifyException(ie2));          
@@ -1187,7 +1187,6 @@ public class TaskTracker
     private boolean keepFailedTaskFiles;
     private boolean keepFailedTaskFiles;
     private boolean alwaysKeepTaskFiles;
     private boolean alwaysKeepTaskFiles;
     private TaskStatus taskStatus; 
     private TaskStatus taskStatus; 
-    private boolean keepJobFiles;
     private long taskTimeout;
     private long taskTimeout;
         
         
     /**
     /**
@@ -1207,7 +1206,6 @@ public class TaskTracker
                                   getName(), task.isMapTask()? TaskStatus.Phase.MAP:
                                   getName(), task.isMapTask()? TaskStatus.Phase.MAP:
                                   TaskStatus.Phase.SHUFFLE,
                                   TaskStatus.Phase.SHUFFLE,
                                   task.getCounters()); 
                                   task.getCounters()); 
-      keepJobFiles = false;
       taskTimeout = (10 * 60 * 1000);
       taskTimeout = (10 * 60 * 1000);
     }
     }
         
         
@@ -1236,7 +1234,6 @@ public class TaskTracker
       task.setConf(localJobConf);
       task.setConf(localJobConf);
       String keepPattern = localJobConf.getKeepTaskFilesPattern();
       String keepPattern = localJobConf.getKeepTaskFilesPattern();
       if (keepPattern != null) {
       if (keepPattern != null) {
-        keepJobFiles = true;
         alwaysKeepTaskFiles = 
         alwaysKeepTaskFiles = 
           Pattern.matches(keepPattern, task.getTaskId());
           Pattern.matches(keepPattern, task.getTaskId());
       } else {
       } else {
@@ -1408,50 +1405,36 @@ public class TaskTracker
 
 
     /**
     /**
      * We no longer need anything from this task, as the job has
      * We no longer need anything from this task, as the job has
-     * finished.  If the task is still running, kill it (and clean up
+     * finished.  If the task is still running, kill it and clean up.
+     * 
+     * @param wasFailure did the task fail, as opposed to was it killed by
+     *                   the framework
      */
      */
-    public void jobHasFinished() throws IOException {
-      boolean killTask = false;  
-      synchronized(this){
-        killTask = (getRunState() == TaskStatus.State.RUNNING);
-        if (killTask) {
-          killAndCleanup(false);
-        }
-      }
-      if (!killTask) {
-        cleanup();
-      }
-      if (keepJobFiles)
-        return;
-              
+    public void jobHasFinished(boolean wasFailure) throws IOException {
+      // Kill the task if it is still running
       synchronized(this){
       synchronized(this){
-        // Delete temp directory in case any task used PhasedFileSystem.
-        try{
-          String systemDir = task.getConf().get("mapred.system.dir");
-          Path taskTempDir = new Path(systemDir + "/" + 
-                                      task.getJobId() + "/" + task.getTipId() + "/" + task.getTaskId());
-          if (fs.exists(taskTempDir)){
-            fs.delete(taskTempDir);
-          }
-        }catch(IOException e){
-          LOG.warn("Error in deleting reduce temporary output", e); 
+        if (getRunState() == TaskStatus.State.RUNNING) {
+          kill(wasFailure);
         }
         }
       }
       }
+      
+      // Cleanup on the finished task
+      cleanup();
     }
     }
 
 
     /**
     /**
      * Something went wrong and the task must be killed.
      * Something went wrong and the task must be killed.
      * @param wasFailure was it a failure (versus a kill request)?
      * @param wasFailure was it a failure (versus a kill request)?
      */
      */
-    public synchronized void killAndCleanup(boolean wasFailure
-                                            ) throws IOException {
+    public synchronized void kill(boolean wasFailure) throws IOException {
       if (runstate == TaskStatus.State.RUNNING) {
       if (runstate == TaskStatus.State.RUNNING) {
         wasKilled = true;
         wasKilled = true;
         if (wasFailure) {
         if (wasFailure) {
           failures += 1;
           failures += 1;
         }
         }
         runner.kill();
         runner.kill();
-        runstate = TaskStatus.State.KILLED;
+        runstate = 
+          (wasFailure) ? TaskStatus.State.FAILED : TaskStatus.State.KILLED;
       } else if (runstate == TaskStatus.State.UNASSIGNED) {
       } else if (runstate == TaskStatus.State.UNASSIGNED) {
         if (wasFailure) {
         if (wasFailure) {
           failures += 1;
           failures += 1;
@@ -1596,7 +1579,7 @@ public class TaskTracker
     LOG.fatal("Task: " + taskId + " - Killed due to FSError: " + message);
     LOG.fatal("Task: " + taskId + " - Killed due to FSError: " + message);
     TaskInProgress tip = runningTasks.get(taskId);
     TaskInProgress tip = runningTasks.get(taskId);
     tip.reportDiagnosticInfo("FSError: " + message);
     tip.reportDiagnosticInfo("FSError: " + message);
-    purgeTask(tip);
+    purgeTask(tip, true);
   }
   }
 
 
   public TaskCompletionEvent[] getMapCompletionEvents(
   public TaskCompletionEvent[] getMapCompletionEvents(