Browse Source

HADOOP-5247. Introduces a broadcast of KillJobAction to all trackers when a job finishes. This fixes a bunch of problems to do with NPE when a completed job is not in memory and a tasktracker comes to the jobtracker with a status report of a task belonging to that job. Contributed by Amar Kamat.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@746233 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 16 years ago
parent
commit
d1bd7b37b9

+ 5 - 0
CHANGES.txt

@@ -813,6 +813,11 @@ Release 0.20.0 - Unreleased
     (HADOOP-5234) and NPE in handling KillTaskAction of a cleanup task (HADOOP-5235).
     (Amareshwari Sriramadasu via ddas)
 
+    HADOOP-5247. Introduces a broadcast of KillJobAction to all trackers when
+    a job finishes. This fixes a bunch of problems to do with NPE when a completed
+    job is not in memory and a tasktracker comes to the jobtracker with a status
+    report of a task belonging to that job. (Amar Kamat via ddas)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

+ 1 - 4
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -2609,9 +2609,6 @@ class JobInProgress {
   }
 
   boolean isComplete() {
-    int runState = this.status.getRunState();
-    return runState == JobStatus.SUCCEEDED 
-           || runState == JobStatus.FAILED 
-           || runState == JobStatus.KILLED;
+    return status.isJobComplete();
   }
 }

+ 8 - 0
src/mapred/org/apache/hadoop/mapred/JobStatus.java

@@ -270,6 +270,14 @@ public class JobStatus implements Writable, Cloneable {
      priority = jp;
    }
   
+   /**
+    * Returns true if the status is for a completed job.
+    */
+   public synchronized boolean isJobComplete() {
+     return (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED 
+             || runState == JobStatus.KILLED);
+   }
+
   ///////////////////////////////////////
   // Writable
   ///////////////////////////////////////

+ 69 - 9
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -1219,6 +1219,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
   TreeMap<String, ArrayList<JobInProgress>> userToJobsMap =
     new TreeMap<String, ArrayList<JobInProgress>>();
     
+  // (trackerID --> list of jobs to cleanup)
+  Map<String, Set<JobID>> trackerToJobsToCleanup = 
+    new HashMap<String, Set<JobID>>();
+  
   // All the known TaskInProgress items, mapped to by taskids (taskid->TIP)
   Map<TaskAttemptID, TaskInProgress> taskidToTIPMap =
     new TreeMap<TaskAttemptID, TaskInProgress>();
@@ -1835,6 +1839,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
 
     long now = System.currentTimeMillis();
     
+    // mark the job for cleanup at all the trackers
+    addJobForCleanup(id);
+
     // add the blacklisted trackers to potentially faulty list
     if (job.getStatus().getRunState() == JobStatus.SUCCEEDED) {
       if (job.getNoOfBlackListedTrackers() > 0) {
@@ -2320,6 +2327,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       actions.addAll(killTasksList);
     }
      
+    // Check for jobs to be killed/cleanedup
+    List<TaskTrackerAction> killJobsList = getJobsForCleanup(trackerName);
+    if (killJobsList != null) {
+      actions.addAll(killJobsList);
+    }
+
     // Check for tasks whose outputs can be saved
     List<TaskTrackerAction> commitTasksList = getTasksToSave(status);
     if (commitTasksList != null) {
@@ -2496,27 +2509,58 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     Set<TaskAttemptID> taskIds = trackerToTaskMap.get(taskTracker);
     if (taskIds != null) {
       List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>();
-      Set<JobID> killJobIds = new TreeSet<JobID>(); 
       for (TaskAttemptID killTaskId : taskIds) {
         TaskInProgress tip = taskidToTIPMap.get(killTaskId);
+        if (tip == null) {
+          continue;
+        }
         if (tip.shouldClose(killTaskId)) {
           // 
           // This is how the JobTracker ends a task at the TaskTracker.
           // It may be successfully completed, or may be killed in
           // mid-execution.
           //
-          if (tip.getJob().getStatus().getRunState() == JobStatus.RUNNING ||
-              tip.getJob().getStatus().getRunState() == JobStatus.PREP) {
+          if (!tip.getJob().isComplete()) {
             killList.add(new KillTaskAction(killTaskId));
             LOG.debug(taskTracker + " -> KillTaskAction: " + killTaskId);
-          } else {
-            JobID killJobId = tip.getJob().getStatus().getJobID(); 
-            killJobIds.add(killJobId);
           }
         }
       }
             
-      for (JobID killJobId : killJobIds) {
+      return killList;
+    }
+    return null;
+  }
+
+  /**
+   * Add a job to cleanup for the tracker.
+   */
+  private void addJobForCleanup(JobID id) {
+    for (String taskTracker : taskTrackers.keySet()) {
+      LOG.debug("Marking job " + id + " for cleanup by tracker " + taskTracker);
+      synchronized (trackerToJobsToCleanup) {
+        Set<JobID> jobsToKill = trackerToJobsToCleanup.get(taskTracker);
+        if (jobsToKill == null) {
+          jobsToKill = new HashSet<JobID>();
+          trackerToJobsToCleanup.put(taskTracker, jobsToKill);
+        }
+        jobsToKill.add(id);
+      }
+    }
+  }
+  
+  /**
+   * A tracker wants to know if any job needs cleanup because the job completed.
+   */
+  private List<TaskTrackerAction> getJobsForCleanup(String taskTracker) {
+    Set<JobID> jobs = null;
+    synchronized (trackerToJobsToCleanup) {
+      jobs = trackerToJobsToCleanup.remove(taskTracker);
+    }
+    if (jobs != null) {
+      // prepare the actions list
+      List<TaskTrackerAction> killList = new ArrayList<TaskTrackerAction>();
+      for (JobID killJobId : jobs) {
         killList.add(new KillJobAction(killJobId));
         LOG.debug(taskTracker + " -> KillJobAction: " + killJobId);
       }
@@ -2538,6 +2582,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
         if (taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) {
           TaskAttemptID taskId = taskStatus.getTaskID();
           TaskInProgress tip = taskidToTIPMap.get(taskId);
+          if (tip == null) {
+            continue;
+          }
           if (tip.shouldCommit(taskId)) {
             saveList.add(new CommitTaskAction(taskId));
             LOG.debug(tts.getTrackerName() + 
@@ -3091,16 +3138,23 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     for (TaskStatus report : status.getTaskReports()) {
       report.setTaskTracker(trackerName);
       TaskAttemptID taskId = report.getTaskID();
+      
+      // expire it
+      expireLaunchingTasks.removeTask(taskId);
+      
+      JobInProgress job = getJob(taskId.getJobID());
+      if (job == null) {
+        continue;
+      }
+      
       TaskInProgress tip = taskidToTIPMap.get(taskId);
       // Check if the tip is known to the jobtracker. In case of a restarted
       // jt, some tasks might join in later
       if (tip != null || hasRestarted()) {
-        JobInProgress job = getJob(taskId.getJobID());
         if (tip == null) {
           tip = job.getTaskInProgress(taskId.getTaskID());
           job.addRunningTaskToTIP(tip, taskId, status, false);
         }
-        expireLaunchingTasks.removeTask(taskId);
         
         // Update the job and inform the listeners if necessary
         JobStatus prevStatus = (JobStatus)job.getStatus().clone();
@@ -3148,6 +3202,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
    */
   void lostTaskTracker(String trackerName) {
     LOG.info("Lost tracker '" + trackerName + "'");
+    
+    // remove the tracker from the local structures
+    synchronized (trackerToJobsToCleanup) {
+      trackerToJobsToCleanup.remove(trackerName);
+    }
+    
     Set<TaskAttemptID> lostTasks = trackerToTaskMap.get(trackerName);
     trackerToTaskMap.remove(trackerName);