
HADOOP-205. Incorporate pending tasks into tasktracker load calculations. Contributed by Mahadev.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@409014 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting, 19 years ago
parent commit 432fe8789e

+ 4 - 0
CHANGES.txt

@@ -52,6 +52,10 @@ Trunk (unreleased)
     tasks and logs.  Also add log access to job tracker web interface.
     (omalley via cutting)
 
+14. HADOOP-205.  Incorporate pending tasks into tasktracker load
+    calculations.  (Mahadev Konar via cutting)
+
+
 Release 0.2.1 - 2006-05-12
 
  1. HADOOP-199.  Fix reduce progress (broken by HADOOP-182).

+ 78 - 21
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -42,7 +42,10 @@ class JobInProgress {
     TaskInProgress reduces[] = new TaskInProgress[0];
     int numMapTasks = 0;
     int numReduceTasks = 0;
-
+    int runningMapTasks = 0;
+    int runningReduceTasks = 0;
+    int finishedMapTasks = 0;
+    int finishedReduceTasks = 0;
     JobTracker jobtracker = null;
     HashMap hostToMaps = new HashMap();
 
@@ -201,27 +204,21 @@ class JobInProgress {
         return numMapTasks;
     }
     public int finishedMaps() {
-        int finishedCount = 0;
-        for (int i = 0; i < maps.length; i++) {
-            if (maps[i].isComplete()) {
-                finishedCount++;
-            }
-        }
-        return finishedCount;
+        return finishedMapTasks;
     }
     public int desiredReduces() {
         return numReduceTasks;
     }
+    public synchronized int runningMaps() {
+        return runningMapTasks;
+    }
+    public synchronized int runningReduces() {
+        return runningReduceTasks;
+    }
     public int finishedReduces() {
-        int finishedCount = 0;
-        for (int i = 0; i < reduces.length; i++) {
-            if (reduces[i].isComplete()) {
-                finishedCount++;
-            }
-        }
-        return finishedCount;
+        return finishedReduceTasks;
     }
-
+
     /**
      * Get the list of map tasks
      * @return the raw array of maps for this job
@@ -377,12 +374,24 @@ class JobInProgress {
         //
         if (cacheTarget >= 0) {
             t = maps[cacheTarget].getTaskToRun(taskTracker, tts, avgProgress);
+            runningMapTasks += 1;
         } else if (stdTarget >= 0) {
             t = maps[stdTarget].getTaskToRun(taskTracker, tts, avgProgress);
-        } else if (specTarget >= 0) {
+            runningMapTasks += 1;
+        } else if (specTarget >= 0) {
+            // isRunning should always be true here, but being paranoid
+            boolean isRunning = maps[specTarget].isRunning();
+            t = maps[specTarget].getTaskToRun(taskTracker, tts, avgProgress);
+            if (!isRunning) {
+                runningMapTasks += 1;
+            }
         } else if (failedTarget >= 0) {
+            // isRunning should always be false here, but being paranoid again
+            boolean isRunning = maps[failedTarget].isRunning();
+            t = maps[failedTarget].getTaskToRun(taskTracker, tts, avgProgress);
+            if (!isRunning) {
+                runningMapTasks += 1;
+            }
         }
         return t;
     }
@@ -424,11 +433,21 @@ class JobInProgress {
         
         if (stdTarget >= 0) {
             t = reduces[stdTarget].getTaskToRun(taskTracker, tts, avgProgress);
-        } else if (specTarget >= 0) {
+            runningReduceTasks += 1;
+        } else if (specTarget >= 0) {
+            // isRunning should always be true here, but being paranoid
+            boolean isRunning = reduces[specTarget].isRunning();
+            t = reduces[specTarget].getTaskToRun(taskTracker, tts, avgProgress);
+            if (!isRunning) {
+                runningReduceTasks += 1;
+            }
         } else if (failedTarget >= 0) {
+            // isRunning should always be false here, but being paranoid again
+            boolean isRunning = reduces[failedTarget].isRunning();
             t = reduces[failedTarget].getTaskToRun(taskTracker, tts, 
                                                    avgProgress);
+            if (!isRunning) {
+                runningReduceTasks += 1;
+            }
         }
         return t;
     }
@@ -439,10 +458,25 @@ class JobInProgress {
     public synchronized void completedTask(TaskInProgress tip, 
                                            TaskStatus status) {
         String taskid = status.getTaskId();
+        boolean oldDone = tip.isComplete();
         updateTaskStatus(tip, status);
         LOG.info("Taskid '" + taskid + "' has finished successfully.");
         tip.completed(taskid);
-
+        boolean newDone = tip.isComplete();
+        // updating the running/finished map/reduce counts
+        if (oldDone != newDone) {
+            if (newDone) {
+                if (tip.isMapTask()) {
+                    runningMapTasks -= 1;
+                    finishedMapTasks += 1;
+                } else {
+                    runningReduceTasks -= 1;
+                    finishedReduceTasks += 1;
+                }
+            }
+        }
+
         //
         // Figure out whether the Job is done
         //
@@ -480,7 +514,8 @@ class JobInProgress {
         if (status.getRunState() != JobStatus.FAILED) {
             this.status = new JobStatus(status.getJobId(), 1.0f, 1.0f, JobStatus.FAILED);
             this.finishTime = System.currentTimeMillis();
-
+            this.runningMapTasks = 0;
+            this.runningReduceTasks = 0;
             //
             // kill all TIPs.
             //
@@ -509,9 +544,31 @@ class JobInProgress {
      */
     public synchronized void failedTask(TaskInProgress tip, String taskid, 
                                         TaskStatus status, String trackerName) {
+        boolean oldRunning = tip.isRunning();
+        boolean oldComplete = tip.isComplete();
         tip.failedSubTask(taskid, trackerName);
         updateTaskStatus(tip, status);
-
+        boolean newRunning = tip.isRunning();
+        boolean newComplete = tip.isComplete();
+        // update the running count on task failure.
+        if (oldRunning != newRunning) {
+            if (!newRunning) {
+                if (tip.isMapTask()) {
+                    runningMapTasks -= 1;
+                } else {
+                    runningReduceTasks -= 1;
+                }
+            }
+        }
+        // the case when the map was complete but the task tracker went down.
+        if (oldComplete != newComplete) {
+            if (oldComplete) {
+                if (tip.isMapTask()) {
+                    finishedMapTasks -= 1;
+                }
+            }
+        }
         // After this, try to assign tasks with the one after this, so that
         // the failed task goes to the end of the list.
         if (tip.isMapTask()) {
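
The heart of the JobInProgress change is that finishedMaps() and finishedReduces() no longer scan every TaskInProgress on each call; instead, running and finished counters are adjusted incrementally at each state transition (schedule, completion, failure). The following is a minimal, self-contained sketch of that bookkeeping pattern, with hypothetical class and method names rather than Hadoop's actual API:

// Sketch of incremental task bookkeeping (hypothetical names; maps only).
public class TaskCounters {
    private int runningMaps = 0;
    private int finishedMaps = 0;

    // A map attempt is handed out. Speculative or retried attempts of a
    // task that is already running must not be counted twice, hence the guard.
    public synchronized void mapScheduled(boolean wasAlreadyRunning) {
        if (!wasAlreadyRunning) {
            runningMaps += 1;
        }
    }

    // A map task completes: it leaves the running set and joins the finished set.
    public synchronized void mapCompleted() {
        runningMaps -= 1;
        finishedMaps += 1;
    }

    // A map attempt fails. If the task as a whole stopped running, drop it
    // from the running count; if a previously complete map was invalidated
    // (its tracker went down), roll back the finished count.
    public synchronized void mapFailed(boolean taskStoppedRunning,
                                       boolean completedMapLost) {
        if (taskStoppedRunning) {
            runningMaps -= 1;
        }
        if (completedMapLost) {
            finishedMaps -= 1;
        }
    }

    public synchronized int runningMaps()  { return runningMaps; }
    public synchronized int finishedMaps() { return finishedMaps; }
}

Constant-time getters matter here because, with this patch, the JobTracker calls finishedMaps() and finishedReduces() for every running job on every heartbeat while computing the pending load.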

+ 25 - 3
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -645,15 +645,34 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         //
         int avgMaps = 0;
         int avgReduces = 0;
+        int remainingReduceLoad = 0;
+        int remainingMapLoad = 0;
         int numTaskTrackers;
         TaskTrackerStatus tts;
+        int avgMapLoad = 0;
+        int avgReduceLoad = 0;
+
         synchronized (taskTrackers) {
           numTaskTrackers = taskTrackers.size();
           tts = (TaskTrackerStatus) taskTrackers.get(taskTracker);
         }
+        synchronized (jobsByArrival) {
+            for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {
+                JobInProgress job = (JobInProgress) it.next();
+                if (job.getStatus().getRunState() == JobStatus.RUNNING) {
+                    int totalMapTasks = job.desiredMaps();
+                    int totalReduceTasks = job.desiredReduces();
+                    remainingMapLoad += (totalMapTasks - job.finishedMaps());
+                    remainingReduceLoad += (totalReduceTasks - job.finishedReduces());
+                }
+            }
+        }
+
         if (numTaskTrackers > 0) {
           avgMaps = totalMaps / numTaskTrackers;
           avgReduces = totalReduces / numTaskTrackers;
+          avgMapLoad = remainingMapLoad / numTaskTrackers;
+          avgReduceLoad = remainingReduceLoad / numTaskTrackers;
         }
         int totalCapacity = numTaskTrackers * maxCurrentTasks;
         //
@@ -676,15 +695,18 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
 
         //
         // We hand a task to the current taskTracker if the given machine 
-        // has a workload that's equal to or less than the averageMaps 
+        // has a workload that's at or below the average pending map load,
+        // so a map is launched only when the TaskTracker is running fewer
+        // tasks than that average
         // +/- TASK_ALLOC_EPSILON.  (That epsilon is in place in case
         // there is an odd machine that is failing for some reason but 
         // has not yet been removed from the pool, making capacity seem
         // larger than it really is.)
         //
+
         synchronized (jobsByArrival) {
             if ((numMaps < maxCurrentTasks) &&
-                (numMaps <= (avgMaps + TASK_ALLOC_EPSILON))) {
+                (numMaps <= avgMapLoad + 1 + TASK_ALLOC_EPSILON)) {
 
                 int totalNeededMaps = 0;
                 for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {
@@ -719,7 +741,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
             // Same thing, but for reduce tasks
             //
             if ((numReduces < maxCurrentTasks) &&
-                (numReduces <= (avgReduces + TASK_ALLOC_EPSILON))) {
+                (numReduces <= avgReduceLoad + 1 + TASK_ALLOC_EPSILON)) {
 
                 int totalNeededReduces = 0;
                 for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {
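
Taken together, the JobTracker now bases its hand-out threshold on pending work (desired tasks minus finished tasks, summed over running jobs and averaged per tracker) instead of on the total task count. A rough, runnable sketch of the new test follows; the names and the epsilon value are illustrative, not taken from the patch:

// Illustrative sketch of the new allocation check (hypothetical names;
// the epsilon value here is made up for the example).
public class LoadCheck {
    static final double TASK_ALLOC_EPSILON = 0.5;

    // remainingMapLoad = sum over running jobs of (desiredMaps - finishedMaps).
    static boolean shouldAssignMap(int mapsOnTracker, int maxCurrentTasks,
                                   int remainingMapLoad, int numTaskTrackers) {
        int avgMapLoad = (numTaskTrackers > 0)
                ? remainingMapLoad / numTaskTrackers
                : 0;
        // The "+ 1" keeps trackers eligible when integer division rounds
        // the average down to zero.
        return mapsOnTracker < maxCurrentTasks
            && mapsOnTracker <= avgMapLoad + 1 + TASK_ALLOC_EPSILON;
    }

    public static void main(String[] args) {
        // 100 pending maps over 10 trackers -> average load of 10, so a
        // tracker already running 12 maps is refused while one running 9 is not.
        System.out.println(shouldAssignMap(12, 20, 100, 10));  // false
        System.out.println(shouldAssignMap(9, 20, 100, 10));   // true
    }
}

The practical effect is that a cluster finishing a large job no longer keeps trackers idle just because the historical total of launched tasks makes their share look full; only work still outstanding counts against them.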