Browse Source

HADOOP-182. Fix problems related to lost task trackers.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@399833 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 years ago
parent
commit
af5832a247

+ 3 - 0
CHANGES.txt

@@ -184,6 +184,9 @@ Trunk (unreleased)
     every block in every file in the filesystem.  (Konstantin Shvachko
     via cutting)
 
+48. HADOOP-182.  Fix so that lost task trackers do not change the
+    status of reduce tasks or completed jobs.  Also fixes the progress
+    meter so that failed tasks are subtracted. (omalley via cutting)
 
 Release 0.1.1 - 2006-04-08
 

+ 0 - 6
src/examples/org/apache/hadoop/examples/ExampleDriver.java

@@ -16,12 +16,6 @@
 
 package org.apache.hadoop.examples;
 import org.apache.hadoop.util.ProgramDriver;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.TreeMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Map.Entry;
 
 public class ExampleDriver {
   

+ 0 - 1
src/examples/org/apache/hadoop/examples/WordCount.java

@@ -19,7 +19,6 @@ package org.apache.hadoop.examples;
 import java.io.*;
 import java.util.*;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.UTF8;

+ 3 - 3
src/java/org/apache/hadoop/dfs/DFSck.java

@@ -333,9 +333,9 @@ public class DFSck {
       e.printStackTrace();
       success = false;
     } finally {
-      try {in.close(); } catch (Exception e1) {};
-      try {out.close(); } catch (Exception e1) {};
-      try {s.close(); } catch (Exception e1) {};
+      try {in.close(); } catch (Exception e1) {}
+      try {out.close(); } catch (Exception e1) {}
+      try {s.close(); } catch (Exception e1) {}
     }
     if (!success)
       throw new Exception("Could not copy block data for " + lblock.getBlock().getBlockName());

+ 0 - 1
src/java/org/apache/hadoop/dfs/DistributedFileSystem.java

@@ -18,7 +18,6 @@ package org.apache.hadoop.dfs;
 
 import java.io.*;
 import java.net.*;
-import java.util.*;
 
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;

+ 38 - 6
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -95,7 +95,7 @@ class JobInProgress {
      * Construct the splits, etc.  This is invoked from an async
      * thread so that split-computation doesn't block anyone.
      */
-    public void initTasks() throws IOException {
+    public synchronized void initTasks() throws IOException {
         if (tasksInited) {
             return;
         }
@@ -243,9 +243,12 @@ class JobInProgress {
     ////////////////////////////////////////////////////
     // Status update methods
     ////////////////////////////////////////////////////
-    public void updateTaskStatus(TaskInProgress tip, TaskStatus status) {
+    public synchronized void updateTaskStatus(TaskInProgress tip, 
+                                              TaskStatus status) {
         double oldProgress = tip.getProgress();   // save old progress
         tip.updateStatus(status);                 // update tip
+        LOG.fine("Taking progress for " + tip.getTIPId() + " from " + 
+                 oldProgress + " to " + tip.getProgress());
 
         //
         // Update JobInProgress status
@@ -416,7 +419,10 @@ class JobInProgress {
     /**
      * A taskid assigned to this JobInProgress has reported in successfully.
      */
-    public synchronized void completedTask(TaskInProgress tip, String taskid) {
+    public synchronized void completedTask(TaskInProgress tip, 
+                                           TaskStatus status) {
+        String taskid = status.getTaskId();
+        updateTaskStatus(tip, status);
         LOG.info("Taskid '" + taskid + "' has finished successfully.");
         tip.completed(taskid);
 
@@ -443,7 +449,8 @@ class JobInProgress {
         // If all tasks are complete, then the job is done!
         //
         if (status.getRunState() == JobStatus.RUNNING && allDone) {
-            this.status = new JobStatus(status.getJobId(), 1.0f, 1.0f, JobStatus.SUCCEEDED);
+            this.status = new JobStatus(this.status.getJobId(), 1.0f, 1.0f, 
+                                        JobStatus.SUCCEEDED);
             this.finishTime = System.currentTimeMillis();
             garbageCollect();
         }
@@ -483,8 +490,10 @@ class JobInProgress {
      * we need to schedule reexecution so that downstream reduce tasks can 
      * obtain the map task's output.
      */
-    public void failedTask(TaskInProgress tip, String taskid, String trackerName) {
+    public synchronized void failedTask(TaskInProgress tip, String taskid, 
+                                        TaskStatus status, String trackerName) {
         tip.failedSubTask(taskid, trackerName);
+        updateTaskStatus(tip, status);
         
         // After this, try to assign tasks with the one after this, so that
         // the failed task goes to the end of the list.
@@ -501,8 +510,31 @@ class JobInProgress {
             LOG.info("Aborting job " + profile.getJobId());
             kill();
         }
-    }
 
+        jobtracker.removeTaskEntry(taskid);
+ }
+
+    /**
+     * Fail a task with a given reason, but without a status object.
+     * A FAILED TaskStatus with 0.0 progress is built from the arguments
+     * and handed to the failedTask overload that takes a status.
+     * @author Owen O'Malley
+     * @param tip The task's tip
+     * @param taskid The task id
+     * @param reason The reason that the task failed
+     * @param hostname The host name passed to the generated TaskStatus
+     * @param trackerName The task tracker the task failed on
+     */
+    public void failedTask(TaskInProgress tip, String taskid, 
+                           String reason, String hostname, String trackerName) {
+       TaskStatus status = new TaskStatus(taskid,
+                                          tip.isMapTask(),
+                                          0.0f,
+                                          TaskStatus.FAILED,
+                                          reason,
+                                          reason,
+                                          hostname);
+       failedTask(tip, taskid, status, trackerName);
+    }
+       
+                           
     /**
      * The job is dead.  We're now GC'ing it, getting rid of the job
      * from all tables.  Be sure to remove all of this job's tasks

+ 20 - 34
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -95,8 +95,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
        * map: task-id (String) -> time-assigned (Long)
        */
       private Map launchingTasks = new LinkedHashMap();
-      private static final String errorMsg = "Error launching task";
-      private static final String errorHost = "n/a";
       
       public void run() {
         try {
@@ -119,21 +117,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
                     tip = (TaskInProgress) taskidToTIPMap.get(taskId);
                   }
                   if (tip != null) {
-                    synchronized (tip) {
-                      JobInProgress job = tip.getJob();
-                      // record why the job failed, so that the user can
-                      // see the problem
-                      TaskStatus status = 
-                        new TaskStatus(taskId,
-                                       tip.isMapTask(),
-                                       0.0f,
-                                       TaskStatus.FAILED,
-                                       errorMsg,
-                                       errorMsg,
-                                       errorHost);
-                      tip.updateStatus(status);
-                      job.failedTask(tip, taskId, errorHost);
-                    }
+                     JobInProgress job = tip.getJob();
+                     job.failedTask(tip, taskId, "Error launching task", 
+                                    "n/a", "n/a");
                   }
                   itr.remove();
                 } else {
@@ -214,7 +200,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
                                 if (now - newProfile.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL) {
                                     // Remove completely
                                     updateTaskTrackerStatus(trackerName, null);
-                                    lostTaskTracker(leastRecent.getTrackerName());
+                                    lostTaskTracker(leastRecent.getTrackerName(),
+                                                    leastRecent.getHost());
                                 } else {
                                     // Update time by inserting latest profile
                                     trackerExpiryQueue.add(newProfile);
@@ -582,14 +569,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     ////////////////////////////////////////////////////
     // InterTrackerProtocol
     ////////////////////////////////////////////////////
-    public void initialize(String taskTrackerName) {
-      synchronized (taskTrackers) {
-        boolean seenBefore = updateTaskTrackerStatus(taskTrackerName, null);
-        if (seenBefore) {
-          lostTaskTracker(taskTrackerName);
-        }
-      }
-    }
 
     /**
      * Update the last recorded status for the given task tracker.
@@ -632,7 +611,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
                 if (initialContact) {
                     // If it's first contact, then clear out any state hanging around
                     if (seenBefore) {
-                        lostTaskTracker(trackerName);
+                        lostTaskTracker(trackerName, trackerStatus.getHost());
                     }
                 } else {
                     // If not first contact, there should be some record of the tracker
@@ -981,13 +960,13 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
             } else {
                 expireLaunchingTasks.removeTask(taskId);
                 JobInProgress job = tip.getJob();
-                job.updateTaskStatus(tip, report);
 
                 if (report.getRunState() == TaskStatus.SUCCEEDED) {
-                    job.completedTask(tip, report.getTaskId());
+                    job.completedTask(tip, report);
                 } else if (report.getRunState() == TaskStatus.FAILED) {
                     // Tell the job to fail the relevant task
-                    job.failedTask(tip, report.getTaskId(), status.getTrackerName());
+                    job.failedTask(tip, report.getTaskId(), report, 
+                                   status.getTrackerName());
                 }
             }
         }
@@ -998,7 +977,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
      * already been updated.  Just process the contained tasks and any
      * jobs that might be affected.
      */
-    void lostTaskTracker(String trackerName) {
+    void lostTaskTracker(String trackerName, String hostname) {
         LOG.info("Lost tracker '" + trackerName + "'");
         TreeSet lostTasks = (TreeSet) trackerToTaskMap.get(trackerName);
         trackerToTaskMap.remove(trackerName);
@@ -1008,9 +987,16 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
                 String taskId = (String) it.next();
                 TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId);
 
-                // Tell the job to fail the relevant task
-                JobInProgress job = tip.getJob();
-                job.failedTask(tip, taskId, trackerName);
+                // Completed reduce tasks never need to be failed, because 
+                // their outputs go to dfs
+                if (tip.isMapTask() || !tip.isComplete()) {
+                  JobInProgress job = tip.getJob();
+                  // if the job is done, we don't want to change anything
+                  if (job.getStatus().getRunState() == JobStatus.RUNNING) {
+                    job.failedTask(tip, taskId, "Lost task tracker", 
+                                   hostname, trackerName);
+                  }
+                }
             }
         }
     }

+ 0 - 8
src/java/org/apache/hadoop/mapred/TaskInProgress.java

@@ -15,11 +15,8 @@
  */
 package org.apache.hadoop.mapred;
 
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.LogFormatter;
 
-import java.io.*;
 import java.text.NumberFormat;
 import java.util.*;
 import java.util.logging.*;
@@ -304,11 +301,6 @@ class TaskInProgress {
             kill();
         }
         machinesWhereFailed.add(trackerName);
-
-        // Ask JobTracker to forget about this task
-        jobtracker.removeTaskEntry(taskid);
-
-        recomputeProgress();
     }
 
     /**

+ 2 - 0
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -575,6 +575,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
                     failures += 1;
                   }
                   runstate = TaskStatus.FAILED;
+                  progress = 0.0f;
               }
               
               needCleanup = runstate == TaskStatus.FAILED;
@@ -627,6 +628,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutpu
             if (runstate == TaskStatus.SUCCEEDED) {
               LOG.info("Reporting output lost:"+task.getTaskId());
               runstate = TaskStatus.FAILED;       // change status to failure
+              progress = 0.0f;
               runningTasks.put(task.getTaskId(), this);
               mapTotal++;
             } else {