Pārlūkot izejas kodu

HADOOP-299. Fix some problems in the TaskTracker, permitting multiple jobs to more easily execute at the same time. Contributed by Owen.

git-svn-id: https://svn.apache.org/repos/asf/lucene/hadoop/trunk@415383 13f79535-47bb-0310-9956-ffa450edef68
Doug Cutting 19 gadi atpakaļ
vecāks
revīzija
7d29589f3a
2 mainīti faili ar 18 papildinājumiem un 15 dzēšanām
  1. 4 0
      CHANGES.txt
  2. 14 15
      src/java/org/apache/hadoop/mapred/JobTracker.java

+ 4 - 0
CHANGES.txt

@@ -6,6 +6,10 @@ Trunk (unreleased changes)
  1. HADOOP-298.  Improved progress reports for CopyFiles utility, the
     distributed file copier.  (omalley via cutting)
 
+ 2. HADOOP-299.  Fix some problems in the TaskTracker, permitting
+    multiple jobs to more easily execute at the same time.
+    (omalley via cutting)
+
 
 Release 0.3.2 - 2006-06-09
 

+ 14 - 15
src/java/org/apache/hadoop/mapred/JobTracker.java

@@ -39,7 +39,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
     static long RETIRE_JOB_CHECK_INTERVAL;
     static float TASK_ALLOC_EPSILON;
     static float PAD_FRACTION;
-    static float MIN_SLOTS_FOR_PADDING;
+    static final int MIN_CLUSTER_SIZE_FOR_PADDING = 3;
 
     /**
      * Used for formatting the id numbers
@@ -405,8 +405,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         RETIRE_JOB_INTERVAL = conf.getLong("mapred.jobtracker.retirejob.interval", 24 * 60 * 60 * 1000);
         RETIRE_JOB_CHECK_INTERVAL = conf.getLong("mapred.jobtracker.retirejob.check", 60 * 1000);
         TASK_ALLOC_EPSILON = conf.getFloat("mapred.jobtracker.taskalloc.loadbalance.epsilon", 0.2f);
-        PAD_FRACTION = conf.getFloat("mapred.jobtracker.taskalloc.capacitypad", 0.1f);
-        MIN_SLOTS_FOR_PADDING = 3 * maxCurrentTasks;
+        PAD_FRACTION = conf.getFloat("mapred.jobtracker.taskalloc.capacitypad", 
+                                     0.01f);
 
         // This is a directory of temporary submission files.  We delete it
         // on startup, and can delete any files that we're done with
@@ -643,8 +643,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         //
         // Compute average map and reduce task numbers across pool
         //
-        int avgMaps = 0;
-        int avgReduces = 0;
         int remainingReduceLoad = 0;
         int remainingMapLoad = 0;
         int numTaskTrackers;
@@ -669,8 +667,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
         }
         
         if (numTaskTrackers > 0) {
-          avgMaps = totalMaps / numTaskTrackers;
-          avgReduces = totalReduces / numTaskTrackers;
           avgMapLoad = remainingMapLoad / numTaskTrackers;
           avgReduceLoad = remainingReduceLoad / numTaskTrackers;
         }
@@ -727,11 +723,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
                     // schedule tasks to the hilt.
                     //
                     totalNeededMaps += job.desiredMaps();
-                    double padding = 0;
-                    if (totalCapacity > MIN_SLOTS_FOR_PADDING) {
-                        padding = Math.min(maxCurrentTasks, totalNeededMaps * PAD_FRACTION);
+                    int padding = 0;
+                    if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
+                      padding = Math.min(maxCurrentTasks,
+                                         (int)(totalNeededMaps * PAD_FRACTION));
                     }
-                    if (totalNeededMaps + padding >= totalCapacity) {
+                    if (totalMaps + padding >= totalCapacity) {
                         break;
                     }
                 }
@@ -762,11 +759,13 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
                     // schedule tasks to the hilt.
                     //
                     totalNeededReduces += job.desiredReduces();
-                    double padding = 0;
-                    if (totalCapacity > MIN_SLOTS_FOR_PADDING) {
-                        padding = Math.min(maxCurrentTasks, totalNeededReduces * PAD_FRACTION);
+                    int padding = 0;
+                    if (numTaskTrackers > MIN_CLUSTER_SIZE_FOR_PADDING) {
+                        padding = 
+                          Math.min(maxCurrentTasks,
+                                   (int) (totalNeededReduces * PAD_FRACTION));
                     }
-                    if (totalNeededReduces + padding >= totalCapacity) {
+                    if (totalReduces + padding >= totalCapacity) {
                         break;
                     }
                 }