Parcourir la source

HADOOP-362. Fix a problem where jobs hung when status messages were received out of order. Contributed by Owen.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@427239 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting il y a 19 ans
Parent
commit
cb14d7b53c

+ 3 - 0
CHANGES.txt

@@ -105,6 +105,9 @@ Trunk (unreleased changes)
     Also fix some build dependencies.
     (Mahadev & Konstantin via cutting)
 
+30. HADOOP-362.  Fix a problem where jobs hang when status messages
+    are received out-of-order.  (omalley via cutting)
+
 
 Release 0.4.0 - 2006-06-28
 

+ 63 - 46
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -19,6 +19,7 @@ import org.apache.commons.logging.*;
 
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.*;
+import org.apache.hadoop.mapred.JobTracker.JobTrackerMetrics;
 
 import java.io.*;
 import java.net.*;
@@ -266,15 +267,28 @@ class JobInProgress {
     // Status update methods
     ////////////////////////////////////////////////////
     public synchronized void updateTaskStatus(TaskInProgress tip, 
-                                              TaskStatus status) {
+                                              TaskStatus status,
+                                              JobTrackerMetrics metrics) {
+
         double oldProgress = tip.getProgress();   // save old progress
-        tip.updateStatus(status);                 // update tip
-        LOG.debug("Taking progress for " + tip.getTIPId() + " from " + 
-                 oldProgress + " to " + tip.getProgress());
+        boolean wasRunning = tip.isRunning();
+        boolean wasComplete = tip.isComplete();
+        boolean change = tip.updateStatus(status);
+        if (change) {
+          if (status.getRunState() == TaskStatus.SUCCEEDED) {
+            completedTask(tip, status, metrics);
+          } else if (status.getRunState() == TaskStatus.FAILED) {
+            // Tell the job to fail the relevant task
+            failedTask(tip, status.getTaskId(), status, status.getTaskTracker(),
+                       wasRunning, wasComplete);
+          }          
+        }
 
         //
         // Update JobInProgress status
         //
+        LOG.debug("Taking progress for " + tip.getTIPId() + " from " + 
+                  oldProgress + " to " + tip.getProgress());
         double progressDelta = tip.getProgress() - oldProgress;
         if (tip.isMapTask()) {
           if (maps.length == 0) {
@@ -464,25 +478,28 @@ class JobInProgress {
      * A taskid assigned to this JobInProgress has reported in successfully.
      */
     public synchronized void completedTask(TaskInProgress tip, 
-                                           TaskStatus status) {
+                                           TaskStatus status,
+                                           JobTrackerMetrics metrics) {
         String taskid = status.getTaskId();
-        boolean oldDone = tip.isComplete();
-        updateTaskStatus(tip, status);
-        LOG.info("Taskid '" + taskid + "' has finished successfully.");
+        if (tip.isComplete()) {
+          LOG.info("Already complete TIP " + tip.getTIPId() + 
+                   " has completed task " + taskid);
+          return;
+        } else {
+          LOG.info("Task '" + taskid + "' has completed " + tip.getTIPId() + 
+                   " successfully.");          
+        }
+        
         tip.completed(taskid);
-        boolean newDone = tip.isComplete();
         // updating the running/finished map/reduce counts
-        if (oldDone != newDone) {
-            if (newDone) {  
-                if (tip.isMapTask()){
-                    runningMapTasks -= 1;
-                    finishedMapTasks += 1;
-                }
-                else{
-                    runningReduceTasks -= 1;
-                    finishedReduceTasks += 1;
-                }    
-            }
+        if (tip.isMapTask()){
+          runningMapTasks -= 1;
+          finishedMapTasks += 1;
+          metrics.completeMap();
+        } else{
+          runningReduceTasks -= 1;
+          finishedReduceTasks += 1;
+          metrics.completeReduce();
         }
         
         //
@@ -508,10 +525,12 @@ class JobInProgress {
         // If all tasks are complete, then the job is done!
         //
         if (status.getRunState() == JobStatus.RUNNING && allDone) {
-            this.status = new JobStatus(this.status.getJobId(), 1.0f, 1.0f, 
-                                        JobStatus.SUCCEEDED);
+            this.status.runState = JobStatus.SUCCEEDED;
             this.finishTime = System.currentTimeMillis();
             garbageCollect();
+            LOG.info("Job " + this.status.getJobId() + 
+                     " has completed successfully.");
+            metrics.completeJob();
         }
     }
 
@@ -550,33 +569,29 @@ class JobInProgress {
      * we need to schedule reexecution so that downstream reduce tasks can 
      * obtain the map task's output.
      */
-    public synchronized void failedTask(TaskInProgress tip, String taskid, 
-                                        TaskStatus status, String trackerName) {
-        boolean oldStatus = tip.isRunning();
-        boolean oldRun = tip.isComplete();
+    private void failedTask(TaskInProgress tip, String taskid, 
+                            TaskStatus status, String trackerName,
+                            boolean wasRunning, boolean wasComplete) {
         tip.failedSubTask(taskid, trackerName);
-        updateTaskStatus(tip, status);
-        boolean newStatus = tip.isRunning();
-        boolean newRun = tip.isComplete();
+        boolean isRunning = tip.isRunning();
+        boolean isComplete = tip.isComplete();
+        
         //update running  count on task failure.
-        if (oldStatus != newStatus) {
-           if (!newStatus) {
-              if (tip.isMapTask()){
-                  runningMapTasks -= 1;
-              }
-              else{
-                  runningReduceTasks -= 1;
-              }
-           }
+        if (wasRunning && !isRunning) {
+          if (tip.isMapTask()){
+            runningMapTasks -= 1;
+          } else {
+            runningReduceTasks -= 1;
+          }
         }
+        
         // the case when the map was complete but the task tracker went down.
-        if (oldRun != newRun) {
-            if (oldRun){
-                if (tip.isMapTask()){
-                    finishedMapTasks -= 1;
-                }
-            }
+        if (wasComplete && !isComplete) {
+          if (tip.isMapTask()){
+            finishedMapTasks -= 1;
+          }
         }
+        
         // After this, try to assign tasks with the one after this, so that
         // the failed task goes to the end of the list.
         if (tip.isMapTask()) {
@@ -605,7 +620,9 @@ class JobInProgress {
      * @param trackerName The task tracker the task failed on
      */
     public void failedTask(TaskInProgress tip, String taskid, 
-                           String reason, String hostname, String trackerName) {
+                           String reason, String hostname, 
+                           String trackerName,
+                           JobTrackerMetrics metrics) {
        TaskStatus status = new TaskStatus(taskid,
                                           tip.isMapTask(),
                                           0.0f,
@@ -613,7 +630,7 @@ class JobInProgress {
                                           reason,
                                           reason,
                                           trackerName);
-       failedTask(tip, taskid, status, trackerName);
+       updateTaskStatus(tip, status, metrics);
     }
        
                            

+ 4 - 21
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -138,7 +138,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
                       TaskTrackerStatus trackerStatus = 
                         getTaskTracker(trackerName);
                       job.failedTask(tip, taskId, "Error launching task", 
-                                     trackerStatus.getHost(), trackerName);
+                                     trackerStatus.getHost(), trackerName,
+                                     myMetrics);
                     }
                     itr.remove();
                   } else {
@@ -1145,25 +1146,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
                 LOG.info("Serious problem.  While updating status, cannot find taskid " + report.getTaskId());
             } else {
                 expireLaunchingTasks.removeTask(taskId);
-                JobInProgress job = tip.getJob();
-
-                if (report.getRunState() == TaskStatus.SUCCEEDED) {
-                    job.completedTask(tip, report);
-                    if (tip.isMapTask()) {
-                        myMetrics.completeMap();
-                    } else {
-                        myMetrics.completeReduce();
-                    }
-                    if (job.getStatus().getRunState() == JobStatus.SUCCEEDED) {
-                        myMetrics.completeJob();
-                    }
-                } else if (report.getRunState() == TaskStatus.FAILED) {
-                    // Tell the job to fail the relevant task
-                    job.failedTask(tip, report.getTaskId(), report, 
-                                   status.getTrackerName());
-                } else {
-                    job.updateTaskStatus(tip, report);
-                }
+                tip.getJob().updateTaskStatus(tip, report, myMetrics);
             }
         }
     }
@@ -1190,7 +1173,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
                   // if the job is done, we don't want to change anything
                   if (job.getStatus().getRunState() == JobStatus.RUNNING) {
                     job.failedTask(tip, taskId, "Lost task tracker", 
-                                   hostname, trackerName);
+                                   hostname, trackerName, myMetrics);
                   }
                 }
             }

+ 34 - 10
src/java/org/apache/hadoop/mapred/TaskInProgress.java

@@ -70,6 +70,9 @@ class TaskInProgress {
     private JobConf conf;
     private boolean runSpeculative;
     private TreeMap taskDiagnosticData = new TreeMap();
+    /**
+     * Map from taskId -> TaskStatus
+     */
     private TreeMap taskStatuses = new TreeMap();
 
     private TreeSet machinesWhereFailed = new TreeSet();
@@ -227,10 +230,10 @@ class TaskInProgress {
      * task ID and overall status, plus reports for all the
      * component task-threads that have ever been started.
      */
-    TaskReport generateSingleReport() {
+    synchronized TaskReport generateSingleReport() {
       ArrayList diagnostics = new ArrayList();
       for (Iterator i = taskDiagnosticData.values().iterator(); i.hasNext();) {
-        diagnostics.addAll((Vector)i.next());
+        diagnostics.addAll((List)i.next());
       }
       return new TaskReport
         (getTIPId(), (float)progress, state,
@@ -245,23 +248,44 @@ class TaskInProgress {
      * A status message from a client has arrived.
      * It updates the status of a single component-thread-task,
      * which might result in an overall TaskInProgress status update.
+     * @return has the task changed its state noticeably?
      */
-    public void updateStatus(TaskStatus status) {
+    synchronized boolean updateStatus(TaskStatus status) {
         String taskid = status.getTaskId();
         String diagInfo = status.getDiagnosticInfo();
+        TaskStatus oldStatus = (TaskStatus) taskStatuses.get(taskid);
+        boolean changed = true;
         if (diagInfo != null && diagInfo.length() > 0) {
-            LOG.info("Error from "+taskid+": "+diagInfo);
-            Vector diagHistory = (Vector) taskDiagnosticData.get(taskid);
-            if (diagHistory == null) {
-                diagHistory = new Vector();
-                taskDiagnosticData.put(taskid, diagHistory);
-            }
-            diagHistory.add(diagInfo);
+          LOG.info("Error from "+taskid+": "+diagInfo);
+          List diagHistory = (List) taskDiagnosticData.get(taskid);
+          if (diagHistory == null) {
+              diagHistory = new ArrayList();
+              taskDiagnosticData.put(taskid, diagHistory);
+          }
+          diagHistory.add(diagInfo);
         }
+        if (oldStatus != null) {
+          int oldState = oldStatus.getRunState();
+          int newState = status.getRunState();
+          
+          // The task is not allowed to move from completed back to running.
+          // We have seen out of order status messages moving tasks from complete
+          // to running. This is a spot fix, but it should be addressed more
+          // globally.
+          if (newState == TaskStatus.RUNNING &&
+              (oldState == TaskStatus.FAILED || 
+               oldState == TaskStatus.SUCCEEDED)) {
+            return false;
+          }
+          
+          changed = oldState != newState;
+        }
+        
         taskStatuses.put(taskid, status);
 
         // Recompute progress
         recomputeProgress();
+        return changed;
     }
 
     /**

+ 15 - 9
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -357,15 +357,6 @@ public class TaskTracker
                     TaskInProgress tip = (TaskInProgress) it.next();
                     TaskStatus status = tip.createStatus();
                     taskReports.add(status);
-                    if (status.getRunState() != TaskStatus.RUNNING) {
-                        if (tip.getTask().isMapTask()) {
-                            mapTotal--;
-                        } else {
-                            reduceTotal--;
-                        }
-                        myMetrics.completeTask();
-                        it.remove();
-                    }
                 }
             }
 
@@ -378,6 +369,21 @@ public class TaskTracker
                                     httpPort, taskReports, 
                                     failures); 
             int resultCode = jobClient.emitHeartbeat(status, justStarted);
+            synchronized (this) {
+              for (Iterator it = taskReports.iterator();
+                   it.hasNext(); ) {
+                  TaskStatus taskStatus = (TaskStatus) it.next();
+                  if (taskStatus.getRunState() != TaskStatus.RUNNING) {
+                      if (taskStatus.getIsMap()) {
+                          mapTotal--;
+                      } else {
+                          reduceTotal--;
+                      }
+                      myMetrics.completeTask();
+                      runningTasks.remove(taskStatus.getTaskId());
+                  }
+              }
+            }
             justStarted = false;
               
             if (resultCode == InterTrackerProtocol.UNKNOWN_TASKTRACKER) {