瀏覽代碼

HADOOP-1576. Fix errors in count of completed tasks when speculative execution is enabled. Contributed by Arun.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@555742 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 年之前
父節點
當前提交
ce6e9c74bf

+ 4 - 0
CHANGES.txt

@@ -334,6 +334,10 @@ Trunk (unreleased changes)
 103. HADOOP-1585.  Modify GenericWritable to declare the classes as subtypes
      of Writable (Espen Amble Kolstad via omalley)
 
+104. HADOOP-1576.  Fix errors in count of completed tasks when
+     speculative execution is enabled.  (Arun C Murthy via cutting)
+
+
 Release 0.13.0 - 2007-06-08
 
  1. HADOOP-1047.  Fix TestReplication to succeed more reliably.

+ 5 - 13
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -745,19 +745,11 @@ class JobInProgress {
   throws IOException {
     String taskid = status.getTaskId();
         
-    // Sanity check: is the TIP already complete?
+    // Sanity check: is the TIP already complete? 
     if (tip.isComplete()) {
-      LOG.info("Already complete TIP " + tip.getTIPId() + 
-               " has completed task " + taskid);
-          
-      // Just mark this 'task' as complete
-      try {
-        tip.alreadyCompletedTask(taskid);
-      } catch (IOException ioe) {
-        LOG.info("Failed to discard output of " + taskid + " : " + 
-                StringUtils.stringifyException(ioe));
-      }
-          
+      // Mark this task as KILLED
+      tip.alreadyCompletedTask(taskid);
+
       // Let the JobTracker cleanup this taskid if the job isn't running
       if (this.status.getRunState() != JobStatus.RUNNING) {
         jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
@@ -905,7 +897,7 @@ class JobInProgress {
                           boolean wasRunning, boolean wasComplete,
                           JobTrackerMetrics metrics) {
     // Mark the taskid as a 'failure'
-    tip.incompleteSubTask(taskid, trackerName);
+    tip.incompleteSubTask(taskid, trackerName, this.status);
         
     boolean isRunning = tip.isRunning();
     boolean isComplete = tip.isComplete();

+ 56 - 23
src/java/org/apache/hadoop/mapred/TaskInProgress.java

@@ -328,6 +328,7 @@ class TaskInProgress {
 
   /**
    * Get the diagnostic messages for a given task within this tip.
+   * 
    * @param taskId the id of the required task
    * @return the list of diagnostics for that task
    */
@@ -339,6 +340,22 @@ class TaskInProgress {
   // Update methods, usually invoked by the owning
   // job.
   ////////////////////////////////////////////////
+  
+  /**
+   * Save diagnostic information for a given task.
+   * 
+   * @param taskId id of the task 
+   * @param diagInfo diagnostic information for the task
+   */
+  private void addDiagnosticInfo(String taskId, String diagInfo) {
+    List<String> diagHistory = taskDiagnosticData.get(taskId);
+    if (diagHistory == null) {
+      diagHistory = new ArrayList<String>();
+      taskDiagnosticData.put(taskId, diagHistory);
+    }
+    diagHistory.add(diagInfo);
+  }
+  
   /**
    * A status message from a client has arrived.
    * It updates the status of a single component-thread-task,
@@ -352,12 +369,7 @@ class TaskInProgress {
     boolean changed = true;
     if (diagInfo != null && diagInfo.length() > 0) {
       LOG.info("Error from "+taskid+": "+diagInfo);
-      List<String> diagHistory = taskDiagnosticData.get(taskid);
-      if (diagHistory == null) {
-        diagHistory = new ArrayList<String>();
-        taskDiagnosticData.put(taskid, diagHistory);
-      }
-      diagHistory.add(diagInfo);
+      addDiagnosticInfo(taskid, diagInfo);
     }
     if (oldStatus != null) {
       TaskStatus.State oldState = oldStatus.getRunState();
@@ -399,7 +411,8 @@ class TaskInProgress {
    * Indicate that one of the taskids in this TaskInProgress
    * has failed.
    */
-  public void incompleteSubTask(String taskid, String trackerName) {
+  public void incompleteSubTask(String taskid, String trackerName, 
+                                JobStatus jobStatus) {
     //
     // Note the failure and its location
     //
@@ -423,7 +436,12 @@ class TaskInProgress {
     }
 
     this.activeTasks.remove(taskid);
-    if (this.completes > 0 && this.isMapTask()) {
+    
+    // Since we do not fail completed reduces (whose outputs go to hdfs), we 
+    // should note this failure only for completed maps; however if the job
+    // is done, there is no need to manipulate completed maps
+    if (this.completes > 0 && this.isMapTask() && 
+        jobStatus.getRunState() != JobStatus.SUCCEEDED) {
       this.completes--;
     }
 
@@ -450,14 +468,25 @@ class TaskInProgress {
   }
 
   /**
-   * Indicate that one of the taskids in this TaskInProgress
-   * has successfully completed. 
+   * Finalize the <b>completed</b> task; note that this might not be the first 
+   * task-attempt of the {@link TaskInProgress} and hence might be declared 
+   * {@link TaskStatus.State.SUCCEEDED} or {@link TaskStatus.State.KILLED}
    * 
-   * However this may not be the first subtask in this 
-   * TaskInProgress to be completed and hence we might not want to 
-   * manipulate the TaskInProgress to note that it is 'complete' just-as-yet.
+   * @param taskId id of the completed task-attempt
+   * @param finalTaskState final {@link TaskStatus.State} of the task-attempt
    */
-  void alreadyCompletedTask(String taskid) throws IOException {
+  private void completedTask(String taskId, TaskStatus.State finalTaskState) {
+    TaskStatus status = taskStatuses.get(taskId);
+    status.setRunState(finalTaskState);
+    activeTasks.remove(taskId);
+  }
+  
+  /**
+   * Indicate that one of the taskids in this already-completed
+   * TaskInProgress has successfully completed; hence we mark this
+   * taskid as {@link TaskStatus.State.KILLED}. 
+   */
+  void alreadyCompletedTask(String taskid) {
     Task t = tasks.get(taskid);
     try {
       t.discardTaskOutput();
@@ -465,16 +494,17 @@ class TaskInProgress {
       LOG.info("Failed to discard output of task '" + taskid + "' with " + 
               StringUtils.stringifyException(ioe));
     }
-    completedTask(taskid);
-  }
 
-  void completedTask(String taskid) {
-    TaskStatus status = taskStatuses.get(taskid);
-    status.setRunState(TaskStatus.State.SUCCEEDED);
-    activeTasks.remove(taskid);
-    LOG.info("Task '" + taskid + "' has completed.");
-  }
+    // 'KILL' the task 
+    completedTask(taskid, TaskStatus.State.KILLED);
+    
+    // Note the reason for the task being 'KILLED'
+    addDiagnosticInfo(taskid, "Already completed TIP");
     
+    LOG.info("Already complete TIP " + getTIPId() + 
+             " has completed task " + taskid);
+  }
+
   /**
    * Indicate that one of the taskids in this TaskInProgress
    * has successfully completed!
@@ -500,7 +530,8 @@ class TaskInProgress {
     //
     // Record that this taskid is complete
     //
-    completedTask(taskid);
+    completedTask(taskid, TaskStatus.State.SUCCEEDED);
+    
         
     //
     // Now that the TIP is complete, the other speculative 
@@ -510,6 +541,8 @@ class TaskInProgress {
 
     this.completes++;
     recomputeProgress();
+    
+    LOG.info("Task '" + taskid + "' has completed succesfully");
   }
 
   /**