浏览代码

HADOOP-815. Fix memory leaks in JobTracker. Contributed by Arun.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@494158 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 18 年之前
父节点
当前提交
51e475f1ae

+ 2 - 0
CHANGES.txt

@@ -9,6 +9,8 @@ Trunk (unreleased changes)
  2. HADOOP-863.  Reduce logging verbosity introduced by HADOOP-813.
  2. HADOOP-863.  Reduce logging verbosity introduced by HADOOP-813.
     (Devaraj Das via cutting)
     (Devaraj Das via cutting)
 
 
+ 3. HADOOP-815.  Fix memory leaks in JobTracker. (Arun C Murthy via cutting)
+
 
 
 Release 0.10.0 - 2007-01-05
 Release 0.10.0 - 2007-01-05
 
 

+ 55 - 30
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -454,40 +454,52 @@ class JobInProgress {
                                            TaskStatus status,
                                            TaskStatus status,
                                            JobTrackerMetrics metrics) {
                                            JobTrackerMetrics metrics) {
         String taskid = status.getTaskId();
         String taskid = status.getTaskId();
+        
+        // Sanity check: is the TIP already complete?
         if (tip.isComplete()) {
         if (tip.isComplete()) {
           LOG.info("Already complete TIP " + tip.getTIPId() + 
           LOG.info("Already complete TIP " + tip.getTIPId() + 
-                   " has completed task " + taskid);
-          return;
-        } else {
-          LOG.info("Task '" + taskid + "' has completed " + tip.getTIPId() + 
-                   " successfully.");          
-
-          String taskTrackerName = status.getTaskTracker();
+               " has completed task " + taskid);
+          
+          // Just mark this 'task' as complete
+          tip.completedTask(taskid);
           
           
-          if(status.getIsMap()){
-            JobHistory.MapAttempt.logStarted(profile.getJobId(), 
-                tip.getTIPId(), status.getTaskId(), status.getStartTime(), 
-                taskTrackerName); 
-            JobHistory.MapAttempt.logFinished(profile.getJobId(), 
-                tip.getTIPId(), status.getTaskId(), status.getFinishTime(), 
-                taskTrackerName); 
-            JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(), 
-                Values.MAP.name(), status.getFinishTime()); 
-          }else{
-              JobHistory.ReduceAttempt.logStarted(profile.getJobId(), 
-                  tip.getTIPId(), status.getTaskId(), status.getStartTime(), 
-                  taskTrackerName); 
-              JobHistory.ReduceAttempt.logFinished(profile.getJobId(), 
-                  tip.getTIPId(), status.getTaskId(), status.getShuffleFinishTime(),
-                  status.getSortFinishTime(), status.getFinishTime(), 
-                  taskTrackerName); 
-              JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(), 
-                  Values.REDUCE.name(), status.getFinishTime()); 
+          // Let the JobTracker cleanup this taskid if the job isn't running
+          if (this.status.getRunState() != JobStatus.RUNNING) {
+            jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
           }
           }
+          return;
+        } 
+
+        LOG.info("Task '" + taskid + "' has completed " + tip.getTIPId() + 
+          " successfully.");          
+
+        // Update jobhistory 
+        String taskTrackerName = status.getTaskTracker();
+        if(status.getIsMap()){
+          JobHistory.MapAttempt.logStarted(profile.getJobId(), 
+               tip.getTIPId(), status.getTaskId(), status.getStartTime(), 
+               taskTrackerName); 
+          JobHistory.MapAttempt.logFinished(profile.getJobId(), 
+               tip.getTIPId(), status.getTaskId(), status.getFinishTime(), 
+               taskTrackerName); 
+          JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(), 
+               Values.MAP.name(), status.getFinishTime()); 
+        }else{
+          JobHistory.ReduceAttempt.logStarted(profile.getJobId(), 
+               tip.getTIPId(), status.getTaskId(), status.getStartTime(), 
+               taskTrackerName); 
+          JobHistory.ReduceAttempt.logFinished(profile.getJobId(), 
+               tip.getTIPId(), status.getTaskId(), status.getShuffleFinishTime(),
+               status.getSortFinishTime(), status.getFinishTime(), 
+               taskTrackerName); 
+          JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(), 
+               Values.REDUCE.name(), status.getFinishTime()); 
         }
         }
         
         
+        // Mark the TIP as complete
         tip.completed(taskid);
         tip.completed(taskid);
-        // updating the running/finished map/reduce counts
+        
+        // Update the running/finished map/reduce counts
         if (tip.isMapTask()){
         if (tip.isMapTask()){
           runningMapTasks -= 1;
           runningMapTasks -= 1;
           finishedMapTasks += 1;
           finishedMapTasks += 1;
@@ -533,6 +545,10 @@ class JobInProgress {
             JobHistory.JobInfo.logFinished(this.status.getJobId(), finishTime, 
             JobHistory.JobInfo.logFinished(this.status.getJobId(), finishTime, 
                 this.finishedMapTasks, this.finishedReduceTasks, failedMapTasks, failedReduceTasks);
                 this.finishedMapTasks, this.finishedReduceTasks, failedMapTasks, failedReduceTasks);
             metrics.completeJob();
             metrics.completeJob();
+        } else if (this.status.getRunState() != JobStatus.RUNNING) {
+            // The job has been killed/failed, 
+            // JobTracker should cleanup this task
+            jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
         }
         }
     }
     }
 
 
@@ -541,6 +557,7 @@ class JobInProgress {
      */
      */
     public synchronized void kill() {
     public synchronized void kill() {
         if (status.getRunState() != JobStatus.FAILED) {
         if (status.getRunState() != JobStatus.FAILED) {
+            LOG.info("Killing job '" + this.status.getJobId() + "'");
             this.status = new JobStatus(status.getJobId(), 1.0f, 1.0f, JobStatus.FAILED);
             this.status = new JobStatus(status.getJobId(), 1.0f, 1.0f, JobStatus.FAILED);
             this.finishTime = System.currentTimeMillis();
             this.finishTime = System.currentTimeMillis();
             this.runningMapTasks = 0;
             this.runningMapTasks = 0;
@@ -575,7 +592,9 @@ class JobInProgress {
     private void failedTask(TaskInProgress tip, String taskid, 
     private void failedTask(TaskInProgress tip, String taskid, 
                             TaskStatus status, String trackerName,
                             TaskStatus status, String trackerName,
                             boolean wasRunning, boolean wasComplete) {
                             boolean wasRunning, boolean wasComplete) {
+        // Mark the taskid as a 'failure'
         tip.failedSubTask(taskid, trackerName);
         tip.failedSubTask(taskid, trackerName);
+        
         boolean isRunning = tip.isRunning();
         boolean isRunning = tip.isRunning();
         boolean isComplete = tip.isComplete();
         boolean isComplete = tip.isComplete();
         
         
@@ -621,6 +640,11 @@ class JobInProgress {
           failedReduceTasks++; 
           failedReduceTasks++; 
         }
         }
             
             
+        //
+        // Let the JobTracker know that this task has failed
+        //
+        jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
+
         //
         //
         // Check if we need to kill the job because of too many failures
         // Check if we need to kill the job because of too many failures
         //
         //
@@ -633,9 +657,7 @@ class JobInProgress {
                 System.currentTimeMillis(), this.finishedMapTasks, this.finishedReduceTasks);
                 System.currentTimeMillis(), this.finishedMapTasks, this.finishedReduceTasks);
             kill();
             kill();
         }
         }
-
-        jobtracker.removeTaskEntry(taskid);
- }
+    }
 
 
     /**
     /**
      * Fail a task with a given reason, but without a status object.
      * Fail a task with a given reason, but without a status object.
@@ -669,6 +691,9 @@ class JobInProgress {
      * from the various tables.
      * from the various tables.
      */
      */
     synchronized void garbageCollect() {
     synchronized void garbageCollect() {
+      // Let the JobTracker know that a job is complete
+      jobtracker.finalizeJob(this);
+      
       try {
       try {
         // Definitely remove the local-disk copy of the job file
         // Definitely remove the local-disk copy of the job file
         if (localJobFile != null) {
         if (localJobFile != null) {

+ 278 - 37
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -46,6 +46,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     static float PAD_FRACTION;
     static float PAD_FRACTION;
     static final int MIN_CLUSTER_SIZE_FOR_PADDING = 3;
     static final int MIN_CLUSTER_SIZE_FOR_PADDING = 3;
 
 
+    /**
+     * The maximum no. of 'completed' (successful/failed/killed)
+     * jobs kept in memory per-user. 
+     */
+    static final int MAX_COMPLETE_USER_JOBS_IN_MEMORY = 100;
+    
     /**
     /**
      * Used for formatting the id numbers
      * Used for formatting the id numbers
      */
      */
@@ -289,10 +295,26 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
                                 if (job.getStatus().getRunState() != JobStatus.RUNNING &&
                                 if (job.getStatus().getRunState() != JobStatus.RUNNING &&
                                     job.getStatus().getRunState() != JobStatus.PREP &&
                                     job.getStatus().getRunState() != JobStatus.PREP &&
                                     (job.getFinishTime() + RETIRE_JOB_INTERVAL < System.currentTimeMillis())) {
                                     (job.getFinishTime() + RETIRE_JOB_INTERVAL < System.currentTimeMillis())) {
+                                    // Ok, this call to removeJobTasks
+                                    // is dangerous in some very very obscure
+                                    // cases; e.g. when job completed, exceeded
+                                    // RETIRE_JOB_INTERVAL time-limit and yet
+                                    // some task (taskid) wasn't complete!
+                                    removeJobTasks(job);
+                                    
                                     it.remove();
                                     it.remove();
-                            
+                                    synchronized (userToJobsMap) {
+                                        ArrayList<JobInProgress> userJobs =
+                                            userToJobsMap.get(job.getProfile().getUser());
+                                        synchronized (userJobs) {
+                                            userJobs.remove(job);
+                                        }
+                                    }
                                     jobInitQueue.remove(job);
                                     jobInitQueue.remove(job);
                                     jobsByArrival.remove(job);
                                     jobsByArrival.remove(job);
+                                    
+                                    LOG.info("Retired job with id: '" + 
+                                            job.getProfile().getJobId() + "'");
                                 }
                                 }
                             }
                             }
                         }
                         }
@@ -418,6 +440,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     TreeMap jobs = new TreeMap();
     TreeMap jobs = new TreeMap();
     Vector jobsByArrival = new Vector();
     Vector jobsByArrival = new Vector();
 
 
+    // (user -> list of JobInProgress)
+    TreeMap<String, ArrayList<JobInProgress>> userToJobsMap = new TreeMap();
+    
     // All the known TaskInProgress items, mapped to by taskids (taskid->TIP)
     // All the known TaskInProgress items, mapped to by taskids (taskid->TIP)
     Map<String, TaskInProgress> taskidToTIPMap = new TreeMap();
     Map<String, TaskInProgress> taskidToTIPMap = new TreeMap();
 
 
@@ -427,8 +452,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     // (trackerID->TreeSet of taskids running at that tracker)
     // (trackerID->TreeSet of taskids running at that tracker)
     TreeMap trackerToTaskMap = new TreeMap();
     TreeMap trackerToTaskMap = new TreeMap();
 
 
-    // (trackerID --> last sent HeartBeatResponseID)
-    Map<String, Short> trackerToHeartbeatResponseIDMap = new TreeMap();
+    // (trackerID -> TreeSet of completed taskids at that tracker, marked for removal)
+    TreeMap<String, Set<String>> trackerToMarkedTasksMap = new TreeMap();
+
+    // (trackerID --> last sent HeartBeatResponse)
+    Map<String, HeartbeatResponse> trackerToHeartbeatResponseMap = 
+      new TreeMap();
     
     
     //
     //
     // Watch and expire TaskTracker objects using these structures.
     // Watch and expire TaskTracker objects using these structures.
@@ -644,18 +673,181 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         // taskid --> TIP
         // taskid --> TIP
         taskidToTIPMap.put(taskid, tip);
         taskidToTIPMap.put(taskid, tip);
     }
     }
+    
     void removeTaskEntry(String taskid) {
     void removeTaskEntry(String taskid) {
         // taskid --> tracker
         // taskid --> tracker
         String tracker = (String) taskidToTrackerMap.remove(taskid);
         String tracker = (String) taskidToTrackerMap.remove(taskid);
 
 
         // tracker --> taskid
         // tracker --> taskid
-        TreeSet trackerSet = (TreeSet) trackerToTaskMap.get(tracker);
-        if (trackerSet != null) {
-            trackerSet.remove(taskid);
+        if (tracker != null) {
+            TreeSet trackerSet = (TreeSet) trackerToTaskMap.get(tracker);
+            if (trackerSet != null) {
+                trackerSet.remove(taskid);
+            }
         }
         }
 
 
         // taskid --> TIP
         // taskid --> TIP
         taskidToTIPMap.remove(taskid);
         taskidToTIPMap.remove(taskid);
+        
+        LOG.debug("Removing task '" + taskid + "'");
+    }
+    
+    /**
+     * Mark a 'task' for removal later.
+     * This function assumes that the JobTracker is locked on entry.
+     * 
+     * @param taskTracker the tasktracker at which the 'task' was running
+     * @param taskid completed (success/failure/killed) task
+     */
+    void markCompletedTaskAttempt(String taskTracker, String taskid) {
+      // tracker --> taskid
+      TreeSet taskset = (TreeSet) trackerToMarkedTasksMap.get(taskTracker);
+      if (taskset == null) {
+        taskset = new TreeSet();
+        trackerToMarkedTasksMap.put(taskTracker, taskset);
+      }
+      taskset.add(taskid);
+      
+      LOG.debug("Marked '" + taskid + "' from '" + taskTracker + "'");
+    }
+
+    /**
+     * Mark all 'non-running' tasks of the job for pruning.
+     * This function assumes that the JobTracker is locked on entry.
+     * 
+     * @param job the completed job
+     */
+    void markCompletedJob(JobInProgress job) {
+      for (TaskInProgress tip : job.getMapTasks()) {
+        for (TaskStatus taskStatus : tip.getTaskStatuses()) {
+          if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+            markCompletedTaskAttempt(taskStatus.getTaskTracker(), 
+                    taskStatus.getTaskId());
+          }
+        }
+      }
+      for (TaskInProgress tip : job.getReduceTasks()) {
+        for (TaskStatus taskStatus : tip.getTaskStatuses()) {
+          if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+            markCompletedTaskAttempt(taskStatus.getTaskTracker(), 
+                    taskStatus.getTaskId());
+          }
+        }
+      }
+    }
+    
+    /**
+     * Remove all 'marked' tasks running on a given {@link TaskTracker}
+     * from the {@link JobTracker}'s data-structures.
+     * This function assumes that the JobTracker is locked on entry.
+     * 
+     * @param taskTracker tasktracker whose 'non-running' tasks are to be purged
+     */
+    private void removeMarkedTasks(String taskTracker) {
+      // Purge all the 'marked' tasks which were running at taskTracker
+      TreeSet<String> markedTaskSet = 
+        (TreeSet<String>) trackerToMarkedTasksMap.get(taskTracker);
+      if (markedTaskSet != null) {
+        for (String taskid : markedTaskSet) {
+          removeTaskEntry(taskid);
+          LOG.info("Removed completed task '" + taskid + "' from '" + 
+                  taskTracker + "'");
+        }
+        // Clear 
+        trackerToMarkedTasksMap.remove(taskTracker);
+      }
+    }
+    
+    /**
+     * Call {@link #removeTaskEntry(String)} for each of the
+     * job's tasks.
+     * When the JobTracker is retiring the long-completed
+     * job, either because it has outlived {@link #RETIRE_JOB_INTERVAL}
+     * or the limit of {@link #MAX_COMPLETE_USER_JOBS_IN_MEMORY} jobs 
+     * has been reached, we can afford to nuke all its tasks; a little
+     * unsafe, but practically feasible. 
+     * 
+     * @param job the job about to be 'retired'
+     */
+    synchronized private void removeJobTasks(JobInProgress job) { 
+      for (TaskInProgress tip : job.getMapTasks()) {
+        for (TaskStatus taskStatus : tip.getTaskStatuses()) {
+          removeTaskEntry(taskStatus.getTaskId());
+        }
+      }
+      for (TaskInProgress tip : job.getReduceTasks()) {
+        for (TaskStatus taskStatus : tip.getTaskStatuses()) {
+          removeTaskEntry(taskStatus.getTaskId());
+        }
+      }
+    }
+    
+    /**
+     * Safely clean up all data structures at the end of the 
+     * job (success/failure/killed).
+     * Here we also ensure that for a given user we maintain 
+     * information for only MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs 
+     * on the JobTracker.
+     *  
+     * @param job completed job.
+     */
+    synchronized void finalizeJob(JobInProgress job) {
+      // Mark the 'non-running' tasks for pruning
+      markCompletedJob(job);
+      
+      // Purge oldest jobs and keep at-most MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs of a given user
+      // in memory; information about the purged jobs is available via
+      // JobHistory.
+      synchronized (jobs) {
+        synchronized (jobsByArrival) {
+          synchronized (jobInitQueue) {
+            String jobUser = job.getProfile().getUser();
+            synchronized (userToJobsMap) {
+              ArrayList<JobInProgress> userJobs = 
+                userToJobsMap.get(jobUser);
+              synchronized (userJobs) {
+                while (userJobs.size() > 
+                MAX_COMPLETE_USER_JOBS_IN_MEMORY) {
+                  JobInProgress rjob = userJobs.get(0);
+                  
+                  // Do not delete 'current'
+                  // finished job just yet.
+                  if (rjob == job) {
+                    break;
+                  }
+                  
+                  // Cleanup all datastructures
+                  int rjobRunState = 
+                    rjob.getStatus().getRunState();
+                  if (rjobRunState == JobStatus.SUCCEEDED || 
+                          rjobRunState == JobStatus.FAILED) {
+                    // Ok, this call to removeJobTasks
+                    // is dangerous in some very very obscure
+                    // cases; e.g. when rjob completed, hit
+                    // MAX_COMPLETE_USER_JOBS_IN_MEMORY job
+                    // limit and yet some task (taskid)
+                    // wasn't complete!
+                    removeJobTasks(rjob);
+                    
+                    userJobs.remove(0);
+                    jobs.remove(rjob.getProfile().getJobId());
+                    jobInitQueue.remove(rjob);
+                    jobsByArrival.remove(rjob);
+                    
+                    LOG.info("Retired job with id: '" + 
+                            rjob.getProfile().getJobId() + "'");
+                  } else {
+                    // Do not remove jobs that aren't complete.
+                    // Stop here, and let the next pass take
+                    // care of purging jobs.
+                    break;
+                  }
+                }
+              }
+            }
+          }
+        }
+      }
     }
     }
 
 
     ///////////////////////////////////////////////////////
     ///////////////////////////////////////////////////////
@@ -736,26 +928,46 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, 
     public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, 
             boolean initialContact, boolean acceptNewTasks, short responseId) 
             boolean initialContact, boolean acceptNewTasks, short responseId) 
     throws IOException {
     throws IOException {
-      LOG.debug("Got heartbeat from: " + status.getTrackerName() + 
+        LOG.debug("Got heartbeat from: " + status.getTrackerName() + 
               " (initialContact: " + initialContact + 
               " (initialContact: " + initialContact + 
               " acceptNewTasks: " + acceptNewTasks + ")" +
               " acceptNewTasks: " + acceptNewTasks + ")" +
               " with responseId: " + responseId);
               " with responseId: " + responseId);
       
       
         // First check if the last heartbeat response got through 
         // First check if the last heartbeat response got through 
         String trackerName = status.getTrackerName();
         String trackerName = status.getTrackerName();
-        Short oldResponseId = trackerToHeartbeatResponseIDMap.get(trackerName);
-      
-        short newResponseId = (short)(responseId + 1);
-        if (!initialContact && oldResponseId != null && 
-                oldResponseId.shortValue() != responseId) {
-            newResponseId = oldResponseId.shortValue();
+        HeartbeatResponse prevHeartbeatResponse =
+            trackerToHeartbeatResponseMap.get(trackerName);
+
+        if (initialContact != true) {
+            // If this isn't the 'initial contact' from the tasktracker,
+            // there is something seriously wrong if the JobTracker has
+            // no record of the 'previous heartbeat'; if so, ask the 
+            // tasktracker to re-initialize itself.
+            if (prevHeartbeatResponse == null) {
+                LOG.warn("Serious problem, cannot find record of 'previous' " +
+                    "heartbeat for '" + trackerName + 
+                    "'; reinitializing the tasktracker");
+                return new HeartbeatResponse(responseId, 
+                        new TaskTrackerAction[] {new ReinitTrackerAction()});
+
+            }
+                
+            // It is completely safe to ignore a 'duplicate' from a tracker
+            // since we are guaranteed that the tracker sends the same 
+            // 'heartbeat' when rpcs are lost. 
+            // {@see TaskTracker.transmitHeartbeat()}
+            if (prevHeartbeatResponse.getResponseId() != responseId) {
+                LOG.info("Ignoring 'duplicate' heartbeat from '" + 
+                        trackerName + "'");
+                return prevHeartbeatResponse;
+            }
         }
         }
       
       
         // Process this heartbeat 
         // Process this heartbeat 
-        if (!processHeartbeat(status, initialContact, 
-                (newResponseId != responseId))) {
-            if (oldResponseId != null) {
-                trackerToHeartbeatResponseIDMap.remove(trackerName);
+        short newResponseId = (short)(responseId + 1);
+        if (!processHeartbeat(status, initialContact)) {
+            if (prevHeartbeatResponse != null) {
+                trackerToHeartbeatResponseMap.remove(trackerName);
             }
             }
 
 
             return new HeartbeatResponse(newResponseId, 
             return new HeartbeatResponse(newResponseId, 
@@ -784,12 +996,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         response.setActions(
         response.setActions(
                 actions.toArray(new TaskTrackerAction[actions.size()]));
                 actions.toArray(new TaskTrackerAction[actions.size()]));
         
         
-        // Update the trackerToHeartbeatResponseIDMap
-        if (newResponseId != responseId) {
-            trackerToHeartbeatResponseIDMap.put(trackerName, 
-                    new Short(newResponseId));
-        }
+        // Update the trackerToHeartbeatResponseMap
+        trackerToHeartbeatResponseMap.put(trackerName, response);
 
 
+        // Done processing the heartbeat, now remove 'marked' tasks
+        removeMarkedTasks(trackerName);
+        
         return response;
         return response;
     }
     }
     
     
@@ -824,12 +1036,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
      * Process incoming heartbeat messages from the task trackers.
      * Process incoming heartbeat messages from the task trackers.
      */
      */
     private synchronized boolean processHeartbeat(
     private synchronized boolean processHeartbeat(
-            TaskTrackerStatus trackerStatus, 
-            boolean initialContact, boolean updateStatusTimestamp) {
+            TaskTrackerStatus trackerStatus, boolean initialContact) {
         String trackerName = trackerStatus.getTrackerName();
         String trackerName = trackerStatus.getTrackerName();
-        if (initialContact || updateStatusTimestamp) {
-          trackerStatus.setLastSeen(System.currentTimeMillis());
-        }
+        trackerStatus.setLastSeen(System.currentTimeMillis());
 
 
         synchronized (taskTrackers) {
         synchronized (taskTrackers) {
             synchronized (trackerExpiryQueue) {
             synchronized (trackerExpiryQueue) {
@@ -857,7 +1066,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         }
         }
 
 
         updateTaskStatuses(trackerStatus);
         updateTaskStatuses(trackerStatus);
-        //LOG.info("Got heartbeat from "+trackerName);
+
         return true;
         return true;
     }
     }
 
 
@@ -1028,7 +1237,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
                         killList.add(new KillTaskAction(killTaskId));
                         killList.add(new KillTaskAction(killTaskId));
                         LOG.debug(taskTracker + " -> KillTaskAction: " + killTaskId);
                         LOG.debug(taskTracker + " -> KillTaskAction: " + killTaskId);
                     } else {
                     } else {
-                      //killTasksList.add(new KillJobAction(taskId));
                         String killJobId = tip.getJob().getStatus().getJobId(); 
                         String killJobId = tip.getJob().getStatus().getJobId(); 
                         killJobIds.add(killJobId);
                         killJobIds.add(killJobId);
                     }
                     }
@@ -1051,14 +1259,28 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
      * map task outputs.
      * map task outputs.
      */
      */
     public synchronized MapOutputLocation[] 
     public synchronized MapOutputLocation[] 
-             locateMapOutputs(String jobId, int[] mapTasksNeeded, int reduce) {
-        ArrayList result = new ArrayList(mapTasksNeeded.length);
+             locateMapOutputs(String jobId, int[] mapTasksNeeded, int reduce) 
+    throws IOException {
+        // Check to make sure that the job hasn't 'completed'.
         JobInProgress job = getJob(jobId);
         JobInProgress job = getJob(jobId);
+        if (job.status.getRunState() != JobStatus.RUNNING) {
+          return new MapOutputLocation[0];
+        }
+        
+        ArrayList result = new ArrayList(mapTasksNeeded.length);
         for (int i = 0; i < mapTasksNeeded.length; i++) {
         for (int i = 0; i < mapTasksNeeded.length; i++) {
           TaskStatus status = job.findFinishedMap(mapTasksNeeded[i]);
           TaskStatus status = job.findFinishedMap(mapTasksNeeded[i]);
           if (status != null) {
           if (status != null) {
              String trackerId = 
              String trackerId = 
                (String) taskidToTrackerMap.get(status.getTaskId());
                (String) taskidToTrackerMap.get(status.getTaskId());
+             // Safety check, if we can't find the taskid in 
+             // taskidToTrackerMap and job isn't 'running', then just
+             // return an empty array
+             if (trackerId == null && 
+                     job.status.getRunState() != JobStatus.RUNNING) {
+               return new MapOutputLocation[0];
+             }
+             
              TaskTrackerStatus tracker;
              TaskTrackerStatus tracker;
              synchronized (taskTrackers) {
              synchronized (taskTrackers) {
                tracker = (TaskTrackerStatus) taskTrackers.get(trackerId);
                tracker = (TaskTrackerStatus) taskTrackers.get(trackerId);
@@ -1108,10 +1330,22 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         synchronized (jobs) {
         synchronized (jobs) {
             synchronized (jobsByArrival) {
             synchronized (jobsByArrival) {
                 synchronized (jobInitQueue) {
                 synchronized (jobInitQueue) {
-                    jobs.put(job.getProfile().getJobId(), job);
-                    jobsByArrival.add(job);
-                    jobInitQueue.add(job);
-                    jobInitQueue.notifyAll();
+                    synchronized (userToJobsMap) {
+                        jobs.put(job.getProfile().getJobId(), job);
+                        String jobUser = job.getProfile().getUser();
+                        if (!userToJobsMap.containsKey(jobUser)) {
+                            userToJobsMap.put(jobUser, 
+                                    new ArrayList<JobInProgress>());
+                        }
+                        ArrayList<JobInProgress> userJobs = 
+                            userToJobsMap.get(jobUser);
+                        synchronized (userJobs) {
+                            userJobs.add(job);
+                        }
+                        jobsByArrival.add(job);
+                        jobInitQueue.add(job);
+                        jobInitQueue.notifyAll();
+                    }
                 }
                 }
             }
             }
         }
         }
@@ -1271,8 +1505,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
      * jobs that might be affected.
      * jobs that might be affected.
      */
      */
     void updateTaskStatuses(TaskTrackerStatus status) {
     void updateTaskStatuses(TaskTrackerStatus status) {
-        for (Iterator it = status.taskReports(); it.hasNext(); ) {
-            TaskStatus report = (TaskStatus) it.next();
+        for (TaskStatus report : status.getTaskReports()) {
             report.setTaskTracker(status.getTrackerName());
             report.setTaskTracker(status.getTrackerName());
             String taskId = report.getTaskId();
             String taskId = report.getTaskId();
             TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId);
             TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId);
@@ -1310,8 +1543,16 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
                                    TaskStatus.Phase.MAP, hostname, trackerName, 
                                    TaskStatus.Phase.MAP, hostname, trackerName, 
                                    myMetrics);
                                    myMetrics);
                   }
                   }
+                } else if (!tip.isMapTask() && tip.isComplete()) {
+                  // Completed 'reduce' task, not failed;
+                  // only removed from data-structures.
+                  markCompletedTaskAttempt(trackerName, taskId);
                 }
                 }
             }
             }
+            
+            // Purge 'marked' tasks, needs to be done  
+            // here to prevent hanging references!
+            removeMarkedTasks(trackerName);
         }
         }
     }
     }
 
 

+ 61 - 14
src/java/org/apache/hadoop/mapred/TaskInProgress.java

@@ -57,7 +57,6 @@ class TaskInProgress {
     private int partition;
     private int partition;
     private JobTracker jobtracker;
     private JobTracker jobtracker;
     private String id;
     private String id;
-    private String totalTaskIds[];
     private JobInProgress job;
     private JobInProgress job;
 
 
     // Status of the TIP
     // Status of the TIP
@@ -70,7 +69,13 @@ class TaskInProgress {
     private int completes = 0;
     private int completes = 0;
     private boolean failed = false;
     private boolean failed = false;
     private boolean killed = false;
     private boolean killed = false;
-    private TreeSet usableTaskIds = new TreeSet();
+
+    // The 'unique' prefix for taskids of this tip
+    String taskIdPrefix;
+    
+    // The 'next' usable taskid of this tip
+    int nextTaskId = 0;
+    
     // Map from task Id -> TaskTracker Id, contains tasks that are
     // Map from task Id -> TaskTracker Id, contains tasks that are
     // currently running
     // currently running
     private TreeMap<String, String> activeTasks = new TreeMap();
     private TreeMap<String, String> activeTasks = new TreeMap();
@@ -139,13 +144,8 @@ class TaskInProgress {
     void init(String jobUniqueString) {
     void init(String jobUniqueString) {
         this.startTime = System.currentTimeMillis();
         this.startTime = System.currentTimeMillis();
         this.runSpeculative = conf.getSpeculativeExecution();
         this.runSpeculative = conf.getSpeculativeExecution();
-        String uniqueString = makeUniqueString(jobUniqueString);
-        this.id = "tip_" + uniqueString;
-        this.totalTaskIds = new String[MAX_TASK_EXECS + MAX_TASK_FAILURES];
-        for (int i = 0; i < totalTaskIds.length; i++) {
-          totalTaskIds[i] = "task_" + uniqueString + "_" + i;
-          usableTaskIds.add(totalTaskIds[i]);
-        }
+        this.taskIdPrefix = makeUniqueString(jobUniqueString); // prefix shared by the tip id and the task ids built from it
+        this.id = "tip_" + this.taskIdPrefix; // task ids are no longer pre-generated here
     }
     }
 
 
     ////////////////////////////////////
     ////////////////////////////////////
@@ -180,11 +180,19 @@ class TaskInProgress {
     }
     }
     
     
     /**
     /**
+     * Is this tip complete?
+     * 
+     * The tip is reported as complete when its <code>completes</code>
+     * counter is greater than zero.
+     * 
+     * @return <code>true</code> if the tip is complete, else <code>false</code>
      */
      */
     public boolean isComplete() {
     public boolean isComplete() {
         return (completes > 0);
         return (completes > 0);
     }
     }
+
     /**
     /**
+     * Is the given taskid in this tip complete?
+     * 
+     * @param taskid taskid of attempt to check for completion
+     * @return <code>true</code> if taskid is complete, else <code>false</code>
      */
      */
     public boolean isComplete(String taskid) {
     public boolean isComplete(String taskid) {
         TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
         TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
@@ -194,7 +202,11 @@ class TaskInProgress {
         return ((completes > 0) && 
         return ((completes > 0) && 
                 (status.getRunState() == TaskStatus.State.SUCCEEDED));
                 (status.getRunState() == TaskStatus.State.SUCCEEDED));
     }
     }
+
     /**
     /**
+     * Is the tip a failure?
+     * 
+     * @return <code>true</code> if tip has failed, else <code>false</code>
      */
      */
     public boolean isFailed() {
     public boolean isFailed() {
         return failed;
         return failed;
@@ -293,6 +305,17 @@ class TaskInProgress {
           TaskStatus.State oldState = oldStatus.getRunState();
           TaskStatus.State oldState = oldStatus.getRunState();
           TaskStatus.State newState = status.getRunState();
           TaskStatus.State newState = status.getRunState();
           
           
+          // We should never receive a duplicate success/failure/killed
+          // status update for the same taskid! This is only a safety check;
+          // the TaskTracker is the better place to guarantee this.
+          // @see {@link TaskTracker#transmitHeartBeat()}
+          if ((newState != TaskStatus.State.RUNNING) && 
+                  (oldState == newState)) {
+              LOG.warn("Recieved duplicate status update of '" + newState + 
+                      "' for '" + taskid + "' of TIP '" + getTIPId() + "'");
+              return false;
+          }
+
           // The task is not allowed to move from completed back to running.
           // The task is not allowed to move from completed back to running.
           // We have seen out of order status messages moving tasks from complete
           // We have seen out of order status messages moving tasks from complete
           // to running. This is a spot fix, but it should be addressed more
           // to running. This is a spot fix, but it should be addressed more
@@ -346,14 +369,29 @@ class TaskInProgress {
 
 
     /**
     /**
      * Indicate that one of the taskids in this TaskInProgress
      * Indicate that one of the taskids in this TaskInProgress
-     * has successfully completed!
+     * has successfully completed. 
+     * 
+     * This may not be the first subtask of this TaskInProgress to
+     * complete, so this method only records the attempt itself: it sets
+     * the attempt's run state to SUCCEEDED and removes it from the
+     * active tasks. It does not mark the TaskInProgress as 'complete'.
+     * 
+     * NOTE(review): the status looked up for <code>taskid</code> is used
+     * without a null check; presumably callers only pass known taskids.
+     * 
+     * @param taskid taskid of the attempt that has completed
      */
      */
-    public void completed(String taskid) {
+    void completedTask(String taskid) {
         LOG.info("Task '" + taskid + "' has completed.");
         LOG.info("Task '" + taskid + "' has completed.");
         TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
         TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
         status.setRunState(TaskStatus.State.SUCCEEDED);
         status.setRunState(TaskStatus.State.SUCCEEDED);
         activeTasks.remove(taskid);
         activeTasks.remove(taskid);
-
+    }
+    
+    /**
+     * Indicate that one of the taskids in this TaskInProgress
+     * has successfully completed!
+     */
+    public void completed(String taskid) {
+        //
+        // Record that this taskid is complete
+        //
+        completedTask(taskid);
+        
         //
         //
         // Now that the TIP is complete, the other speculative 
         // Now that the TIP is complete, the other speculative 
         // subtasks will be closed when the owning tasktracker 
         // subtasks will be closed when the owning tasktracker 
@@ -470,8 +508,17 @@ class TaskInProgress {
           execStartTime = System.currentTimeMillis();
           execStartTime = System.currentTimeMillis();
         }
         }
 
 
-        String taskid = (String) usableTaskIds.first();
-        usableTaskIds.remove(taskid);
+        // Create the 'taskid'
+        String taskid = null;
+        if (nextTaskId < (MAX_TASK_EXECS + MAX_TASK_FAILURES)) {
+          taskid = new String("task_" + taskIdPrefix + "_" + nextTaskId);
+          ++nextTaskId;
+        } else {
+          LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + MAX_TASK_FAILURES) + 
+                  " attempts for the tip '" + getTIPId() + "'");
+          return null;
+        }
+        
         String jobId = job.getProfile().getJobId();
         String jobId = job.getProfile().getJobId();
 
 
         if (isMapTask()) {
         if (isMapTask()) {

+ 38 - 12
src/java/org/apache/hadoop/mapred/TaskTracker.java

@@ -74,6 +74,16 @@ public class TaskTracker
     // last heartbeat response received
     // last heartbeat response received
     short heartbeatResponseId = -1;
     short heartbeatResponseId = -1;
 
 
+    /*
+     * This is the last 'status' report sent by this tracker to the JobTracker.
+     * 
+     * If the rpc call succeeds, this 'status' is cleared by this tracker,
+     * indicating that a 'fresh' status report should be generated; in the
+     * event the rpc call fails for whatever reason, the previous status
+     * report is sent again.
+     */
+    TaskTrackerStatus status = null;
+    
     StatusHttpServer server = null;
     StatusHttpServer server = null;
     
     
     boolean shuttingDown = false;
     boolean shuttingDown = false;
@@ -249,6 +259,7 @@ public class TaskTracker
         this.mapTotal = 0;
         this.mapTotal = 0;
         this.reduceTotal = 0;
         this.reduceTotal = 0;
         this.acceptNewTasks = true;
         this.acceptNewTasks = true;
+        this.status = null;
         
         
         this.minSpaceStart = this.fConf.getLong("mapred.local.dir.minspacestart", 0L);
         this.minSpaceStart = this.fConf.getLong("mapred.local.dir.minspacestart", 0L);
         this.minSpaceKill = this.fConf.getLong("mapred.local.dir.minspacekill", 0L);
         this.minSpaceKill = this.fConf.getLong("mapred.local.dir.minspacekill", 0L);
@@ -535,20 +546,27 @@ public class TaskTracker
      * @throws IOException
      * @throws IOException
      */
      */
     private HeartbeatResponse transmitHeartBeat() throws IOException {
     private HeartbeatResponse transmitHeartBeat() throws IOException {
+      // 
+      // Check if the last heartbeat got through... 
+      // if so then build the heartbeat information for the JobTracker;
+      // else resend the previous status information.
       //
       //
-      // Build the heartbeat information for the JobTracker
-      //
-      List<TaskStatus> taskReports = 
-        new ArrayList<TaskStatus>(runningTasks.size());
-      synchronized (this) {
-        for (TaskInProgress tip: runningTasks.values()) {
-          taskReports.add(tip.createStatus());
+      if (status == null) {
+        List<TaskStatus> taskReports = 
+          new ArrayList<TaskStatus>(runningTasks.size());
+        synchronized (this) {
+          for (TaskInProgress tip: runningTasks.values()) {
+            taskReports.add(tip.createStatus());
+          }
         }
         }
+        status = 
+          new TaskTrackerStatus(taskTrackerName, localHostname, 
+                  httpPort, taskReports, 
+                  failures); 
+      } else {
+        LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() +
+                "' with reponseId '" + heartbeatResponseId);
       }
       }
-      TaskTrackerStatus status = 
-        new TaskTrackerStatus(taskTrackerName, localHostname, 
-                httpPort, taskReports, 
-                failures); 
       
       
       //
       //
       // Check if we should ask for a new Task
       // Check if we should ask for a new Task
@@ -569,10 +587,14 @@ public class TaskTracker
       HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, 
       HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, 
               justStarted, askForNewTask, 
               justStarted, askForNewTask, 
               heartbeatResponseId);
               heartbeatResponseId);
+      
+      //
+      // The heartbeat got through successfully!
+      //
       heartbeatResponseId = heartbeatResponse.getResponseId();
       heartbeatResponseId = heartbeatResponse.getResponseId();
       
       
       synchronized (this) {
       synchronized (this) {
-        for (TaskStatus taskStatus : taskReports) {
+        for (TaskStatus taskStatus : status.getTaskReports()) {
           if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
           if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
             if (taskStatus.getIsMap()) {
             if (taskStatus.getIsMap()) {
               mapTotal--;
               mapTotal--;
@@ -584,6 +606,10 @@ public class TaskTracker
           }
           }
         }
         }
       }
       }
+
+      // Force a rebuild of 'status' on the next iteration
+      status = null;                                
+
       return heartbeatResponse;
       return heartbeatResponse;
     }
     }
 
 

+ 13 - 0
src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java

@@ -98,11 +98,24 @@ class TaskTrackerStatus implements Writable {
      * All current tasks at the TaskTracker.  
      * All current tasks at the TaskTracker.  
      *
      *
      * Tasks are tracked by a TaskStatus object.
      * Tasks are tracked by a TaskStatus object.
+     * 
+     * @deprecated use {@link #getTaskReports()} instead
      */
      */
     public Iterator taskReports() {
     public Iterator taskReports() {
         return taskReports.iterator();
         return taskReports.iterator();
     }
     }
 
 
+    /**
+     * Get the current tasks at the TaskTracker.
+     * Tasks are tracked by a {@link TaskStatus} object.
+     * 
+     * The <code>taskReports</code> reference is returned directly;
+     * no defensive copy is made.
+     * 
+     * @return a list of {@link TaskStatus} representing 
+     *         the current tasks at the TaskTracker.
+     */
+    public List<TaskStatus> getTaskReports() {
+      return taskReports;
+    }
+    
     /**
     /**
      * Return the current MapTask count
      * Return the current MapTask count
      */
      */