Browse Source

Reduce iteration through all map & reduce tasks to improve jobtracker performance.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@383690 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 years ago
parent
commit
9c70ec2b42
1 changed file with 15 additions and 26 deletions
  1. 15 26
      src/java/org/apache/hadoop/mapred/JobInProgress.java

+ 15 - 26
src/java/org/apache/hadoop/mapred/JobInProgress.java

@@ -245,30 +245,27 @@ class JobInProgress {
     // Status update methods
     ////////////////////////////////////////////////////
     public void updateTaskStatus(TaskInProgress tip, TaskStatus status) {
-        tip.updateStatus(status);
+        double oldProgress = tip.getProgress();   // save old progress
+        tip.updateStatus(status);                 // update tip
 
         //
         // Update JobInProgress status
         //
-        if (maps.length == 0) {
+        double progressDelta = tip.getProgress() - oldProgress;
+        if (tip.isMapTask()) {
+          if (maps.length == 0) {
             this.status.setMapProgress(1.0f);
+          } else {
+            this.status.mapProgress += (progressDelta / maps.length);
+          }
         } else {
-            double reportedProgress = 0;
-            for (int i = 0; i < maps.length; i++) {
-                reportedProgress += maps[i].getProgress();
-            }
-            this.status.setMapProgress((float) (reportedProgress / maps.length));
-        }
-        if (reduces.length == 0) {
+          if (reduces.length == 0) {
             this.status.setReduceProgress(1.0f);
-        } else {
-            double reportedProgress = 0;
-            for (int i = 0; i < reduces.length; i++) {
-                reportedProgress += reduces[i].getProgress();
-            }
-            this.status.setReduceProgress((float) (reportedProgress / reduces.length));
+          } else {
+            this.status.reduceProgress += (progressDelta / reduces.length);
+          }
         }
-    }
+    }   
 
     /////////////////////////////////////////////////////
     // Create/manage tasks
@@ -286,7 +283,6 @@ class JobInProgress {
         int cacheTarget = -1;
         int stdTarget = -1;
         int specTarget = -1;
-        double totalProgress = 0;
 
         //
         // We end up creating two tasks for the same bucket, because
@@ -297,10 +293,7 @@ class JobInProgress {
         //
         // Compute avg progress through the map tasks
         //
-        for (int i = 0; i < maps.length; i++) {        
-            totalProgress += maps[i].getProgress();
-        }
-        double avgProgress = totalProgress / maps.length;
+        double avgProgress = status.mapProgress() / maps.length;
 
         //
         // See if there is a split over a block that is stored on
@@ -373,11 +366,7 @@ class JobInProgress {
         Task t = null;
         int stdTarget = -1;
         int specTarget = -1;
-        int totalProgress = 0;
-        for (int i = 0; i < reduces.length; i++) {
-            totalProgress += reduces[i].getProgress();
-        }
-        double avgProgress = (1.0 * totalProgress) / reduces.length;
+        double avgProgress = status.reduceProgress() / reduces.length;
 
         for (int i = 0; i < reduces.length; i++) {
             if (reduces[i].hasTask()) {