Преглед изворни кода

MAPREDUCE-1684. ClusterStatus can be cached in CapacityTaskScheduler.assignTasks() (Koji Noguchi via tgraves)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-1@1378357 13f79535-47bb-0310-9956-ffa450edef68
Thomas Graves пре 12 година
родитељ
комит
ae75246fa1

+ 3 - 0
CHANGES.txt

@@ -213,6 +213,9 @@ Release 1.2.0 - unreleased
     MAPREDUCE-4595. TestLostTracker failing - possibly due to a race in 
     JobHistory.JobHistoryFilesManager#run() (kkambatl via tucu)
 
+    MAPREDUCE-1684. ClusterStatus can be cached in 
+    CapacityTaskScheduler.assignTasks() (Koji Noguchi via tgraves)
+
 Release 1.1.0 - unreleased
 
   INCOMPATIBLE CHANGES

+ 19 - 11
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java

@@ -154,7 +154,8 @@ class CapacityTaskScheduler extends TaskScheduler {
     protected TaskType type = null;
 
     abstract TaskLookupResult obtainNewTask(TaskTrackerStatus taskTracker, 
-        JobInProgress job, boolean assignOffSwitch) throws IOException;
+        JobInProgress job, boolean assignOffSwitch,
+        ClusterStatus clusterStatus) throws IOException;
 
     int getSlotsOccupied(JobInProgress job) {
       return (getNumReservedTaskTrackers(job) + getRunningTasks(job)) * 
@@ -293,7 +294,8 @@ class CapacityTaskScheduler extends TaskScheduler {
     private TaskLookupResult getTaskFromQueue(TaskTracker taskTracker,
                                               int availableSlots,
                                               CapacitySchedulerQueue queue,
-                                              boolean assignOffSwitch)
+                                              boolean assignOffSwitch,
+                                              ClusterStatus clusterStatus)
     throws IOException {
       TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
       // we only look at jobs in the running queues, as these are the ones
@@ -320,7 +322,8 @@ class CapacityTaskScheduler extends TaskScheduler {
                                                               availableSlots)) {
           // We found a suitable job. Get task from it.
           TaskLookupResult tlr = 
-            obtainNewTask(taskTrackerStatus, j, assignOffSwitch);
+            obtainNewTask(taskTrackerStatus, j, assignOffSwitch,
+                          clusterStatus);
           //if there is a task return it immediately.
           if (tlr.getLookUpStatus() == 
                   TaskLookupResult.LookUpStatus.LOCAL_TASK_FOUND || 
@@ -379,6 +382,11 @@ class CapacityTaskScheduler extends TaskScheduler {
 
       printQueues();
 
+      //MAPREDUCE-1684: somehow getClusterStatus seems to be expensive. Caching
+      //here to reuse during the scheduling
+      ClusterStatus clusterStatus =
+        scheduler.taskTrackerManager.getClusterStatus();
+
       // Check if this tasktracker has been reserved for a job...
       JobInProgress job = taskTracker.getJobForFallowSlot(type);
       if (job != null) {
@@ -397,7 +405,7 @@ class CapacityTaskScheduler extends TaskScheduler {
             // Don't care about locality!
             job.overrideSchedulingOpportunities();
           }
-          return obtainNewTask(taskTrackerStatus, job, true);
+          return obtainNewTask(taskTrackerStatus, job, true, clusterStatus);
         } else {
           // Re-reserve the current tasktracker
           taskTracker.reserveSlots(type, job, availableSlots);
@@ -420,7 +428,8 @@ class CapacityTaskScheduler extends TaskScheduler {
         }
         
         TaskLookupResult tlr = 
-          getTaskFromQueue(taskTracker, availableSlots, queue, assignOffSwitch);
+          getTaskFromQueue(taskTracker, availableSlots, queue, assignOffSwitch,
+                          clusterStatus);
         TaskLookupResult.LookUpStatus lookUpStatus = tlr.getLookUpStatus();
 
         if (lookUpStatus == TaskLookupResult.LookUpStatus.NO_TASK_FOUND) {
@@ -501,10 +510,10 @@ class CapacityTaskScheduler extends TaskScheduler {
 
     @Override
     TaskLookupResult obtainNewTask(TaskTrackerStatus taskTracker, 
-                                   JobInProgress job, boolean assignOffSwitch) 
+                                   JobInProgress job, boolean assignOffSwitch,
+                                   ClusterStatus clusterStatus)
     throws IOException {
-      ClusterStatus clusterStatus = 
-        scheduler.taskTrackerManager.getClusterStatus();
+
       int numTaskTrackers = clusterStatus.getTaskTrackers();
       int numUniqueHosts = scheduler.taskTrackerManager.getNumberOfUniqueHosts();
       
@@ -581,10 +590,9 @@ class CapacityTaskScheduler extends TaskScheduler {
 
     @Override
     TaskLookupResult obtainNewTask(TaskTrackerStatus taskTracker, 
-                                   JobInProgress job, boolean unused) 
+                                   JobInProgress job, boolean unused,
+                                   ClusterStatus clusterStatus)
     throws IOException {
-      ClusterStatus clusterStatus = 
-        scheduler.taskTrackerManager.getClusterStatus();
       int numTaskTrackers = clusterStatus.getTaskTrackers();
       Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers, 
           scheduler.taskTrackerManager.getNumberOfUniqueHosts());