浏览代码

HADOOP-4100. Removes the cleanupTask scheduling from the Scheduler implementations and moves it to the JobTracker. Contributed by Amareshwari Sriramadasu.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@693360 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 17 年之前
父节点
当前提交
380500514c

+ 4 - 0
CHANGES.txt

@@ -484,6 +484,10 @@ Trunk (unreleased changes)
     HADOOP-3963. libhdfs does not exit on its own, instead it returns error 
     to the caller and behaves as a true library. (Pete Wyckoff via dhruba)
 
+    HADOOP-4100. Removes the cleanupTask scheduling from the Scheduler 
+    implementations and moves it to the JobTracker. 
+    (Amareshwari Sriramadasu via ddas)
+
 Release 0.18.1 - Unreleased
 
   BUG FIXES

+ 5 - 0
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -804,6 +804,11 @@ class JobInProgress {
    * @return true/false
    */
   private synchronized boolean canLaunchCleanupTask() {
+    // check if the job is running
+    if (status.getRunState() != JobStatus.RUNNING) {
+      return false;
+    }
+    // check if cleanup task has been launched already. 
     if (launchedCleanup) {
       return false;
     }

+ 0 - 39
src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java

@@ -73,34 +73,6 @@ class JobQueueTaskScheduler extends TaskScheduler {
                                  0.01f);
   }
 
-  protected Task getCleanupTask(int numMaps, int numReduces,
-                                int maxMapTasks, int maxReduceTasks,
-                                TaskTrackerStatus taskTracker,
-                                int numTaskTrackers,
-                                Collection<JobInProgress> jobQueue) 
-  throws IOException {
-    Task t = null;
-    if (numMaps < maxMapTasks) {
-      for (JobInProgress job : jobQueue) {
-        t = job.obtainCleanupTask(taskTracker, numTaskTrackers,
-                       taskTrackerManager.getNumberOfUniqueHosts(), true);
-        if (t != null) {
-          return t;
-        }
-      }
-    }
-    if (numReduces < maxReduceTasks) {
-      for (JobInProgress job : jobQueue) {
-        t = job.obtainCleanupTask(taskTracker, numTaskTrackers,
-                       taskTrackerManager.getNumberOfUniqueHosts(), false);
-        if (t != null) {
-          return t;
-        }
-      }
-    }
-    return t;
-  }
-  
   @Override
   public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
       throws IOException {
@@ -119,17 +91,6 @@ class JobQueueTaskScheduler extends TaskScheduler {
     int numMaps = taskTracker.countMapTasks();
     int numReduces = taskTracker.countReduceTasks();
 
-
-    // cleanup task has the highest priority, it should be 
-    // launched as soon as the job is done.
-    synchronized (jobQueue) {
-      Task t = getCleanupTask(numMaps, numReduces, maxCurrentMapTasks,
-                 maxCurrentReduceTasks, taskTracker, numTaskTrackers, jobQueue);
-      if (t != null) {
-        return Collections.singletonList(t);
-      }
-    }
-
     //
     // Compute average map and reduce task numbers across pool
     //

+ 41 - 1
src/mapred/org/apache/hadoop/mapred/JobTracker.java

@@ -1221,7 +1221,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
       if (taskTrackerStatus == null) {
         LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
       } else {
-        List<Task> tasks = taskScheduler.assignTasks(taskTrackerStatus);
+        List<Task> tasks = getCleanupTask(taskTrackerStatus);
+        if (tasks == null ) {
+          tasks = taskScheduler.assignTasks(taskTrackerStatus);
+        }
         if (tasks != null) {
           for (Task task : tasks) {
             expireLaunchingTasks.addNewTask(task.getTaskID());
@@ -1457,6 +1460,43 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
     return null;
   }
   
+  private synchronized List<Task> getCleanupTask(TaskTrackerStatus taskTracker)
+  throws IOException {
+    int maxMapTasks = taskTracker.getMaxMapTasks();
+    int maxReduceTasks = taskTracker.getMaxReduceTasks();
+    int numMaps = taskTracker.countMapTasks();
+    int numReduces = taskTracker.countReduceTasks();
+    int numTaskTrackers = getClusterStatus().getTaskTrackers();
+    int numUniqueHosts = getNumberOfUniqueHosts();
+
+    Task t = null;
+    synchronized (jobs) {
+      if (numMaps < maxMapTasks) {
+        for (Iterator<JobInProgress> it = jobs.values().iterator();
+             it.hasNext();) {
+          JobInProgress job = it.next();
+          t = job.obtainCleanupTask(taskTracker, numTaskTrackers,
+                                    numUniqueHosts, true);
+          if (t != null) {
+            return Collections.singletonList(t);
+          }
+        }
+      }
+      if (numReduces < maxReduceTasks) {
+        for (Iterator<JobInProgress> it = jobs.values().iterator();
+             it.hasNext();) {
+          JobInProgress job = it.next();
+          t = job.obtainCleanupTask(taskTracker, numTaskTrackers,
+                                    numUniqueHosts, false);
+          if (t != null) {
+            return Collections.singletonList(t);
+          }
+        }
+      }
+    }
+    return null;
+  }
+
   /**
    * Grab the local fs name
    */

+ 0 - 10
src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java

@@ -74,16 +74,6 @@ class LimitTasksPerJobTaskScheduler extends JobQueueTaskScheduler {
     final int maximumMapTasksNumber = taskTracker.getMaxMapTasks();
     final int maximumReduceTasksNumber = taskTracker.getMaxReduceTasks();
 
-    // check if cleanup task can be launched
-    synchronized (jobQueue) {
-      task = getCleanupTask(mapTasksNumber, reduceTasksNumber,
-               maximumMapTasksNumber, maximumReduceTasksNumber,
-               taskTracker, numTaskTrackers, jobQueue);
-      if (task != null) {
-        return Collections.singletonList(task);
-      }
-    }
-
     /*
      * Statistics about the whole cluster. Most are approximate because of
      * concurrency