
Merge -r 734596:734597 from trunk to branch 0.20 to fix HADOOP-4977.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.20@734598 13f79535-47bb-0310-9956-ffa450edef68
Hemanth Yamijala, 16 years ago
commit 8d0fb361f7

+ 3 - 0
CHANGES.txt

@@ -536,6 +536,9 @@ Release 0.20.0 - Unreleased
     HADOOP-5026. Make chukwa/bin scripts executable in repository. (Andy
     Konwinski via cdouglas)
 
+    HADOOP-4977. Fix a deadlock between the reclaimCapacity and assignTasks
+    in capacity scheduler. (Vivek Ratan via yhemanth)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS
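
The new CHANGES.txt entry describes a lock-ordering deadlock: reclaimCapacity() ran as a synchronized TaskSchedulingMgr method and then called into the JobTracker through scheduler.taskTrackerManager, while the heartbeat thread presumably already holds the JobTracker lock when it calls the scheduler's assignTasks(), which needs the scheduler lock. Below is a minimal, self-contained Java sketch of that pattern only; JobTrackerLike and SchedulerLike are illustrative stand-ins, not the real Hadoop classes, and any given run of the sketch may or may not actually wedge.

    // Minimal sketch (not Hadoop code) of the lock-ordering deadlock behind HADOOP-4977.
    public class DeadlockSketch {

      static class JobTrackerLike {
        // Heartbeat path: takes the JobTracker lock first, then asks the
        // scheduler for a task, which takes the scheduler lock second.
        synchronized void heartbeat(SchedulerLike scheduler) {
          scheduler.assignTasks();
        }

        // Called by the scheduler; also needs the JobTracker lock.
        synchronized int getNextHeartbeatInterval() {
          return 3000;
        }
      }

      static class SchedulerLike {
        synchronized void assignTasks() {
          // pick a task for the heartbeating tracker
        }

        // Reclaim path: takes the scheduler lock first, then calls back into
        // the JobTracker, which takes the JobTracker lock second -- the
        // opposite order from the heartbeat path, so the two threads can
        // block each other forever.
        synchronized void reclaimCapacity(JobTrackerLike jobTracker) {
          jobTracker.getNextHeartbeatInterval();
        }
      }

      public static void main(String[] args) {
        JobTrackerLike jobTracker = new JobTrackerLike();
        SchedulerLike scheduler = new SchedulerLike();
        new Thread(() -> { while (true) jobTracker.heartbeat(scheduler); }).start();
        new Thread(() -> { while (true) scheduler.reclaimCapacity(jobTracker); }).start();
      }
    }

The patch below removes the cycle by never calling taskTrackerManager while a TaskSchedulingMgr lock is held.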

+ 169 - 121
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java

@@ -160,6 +160,17 @@ class CapacityTaskScheduler extends TaskScheduler {
      * created.  
      */
     int numReclaimedResources = 0;
+    
+    /**
+     * reset the variables associated with tasks
+     */
+    void resetTaskVars() {
+      numRunningTasks = 0;
+      numPendingTasks = 0;
+      for (String s: numRunningTasksByUser.keySet()) {
+        numRunningTasksByUser.put(s, 0);
+      }
+    }
 
     /**
      * return information about the tasks
@@ -350,8 +361,6 @@ class CapacityTaskScheduler extends TaskScheduler {
    */
   private static abstract class TaskSchedulingMgr {
 
-    /** we keep track of the number of map or reduce slots we saw last */
-    private int prevClusterCapacity = 0;
     /** our TaskScheduler object */
     protected CapacityTaskScheduler scheduler;
     // can be replaced with a global type, if we have one
@@ -362,8 +371,6 @@ class CapacityTaskScheduler extends TaskScheduler {
 
     abstract Task obtainNewTask(TaskTrackerStatus taskTracker, 
         JobInProgress job) throws IOException; 
-    abstract int getClusterCapacity();
-    abstract int getRunningTasks(JobInProgress job);
     abstract int getPendingTasks(JobInProgress job);
     abstract int killTasksFromJob(JobInProgress job, int tasksToKill);
     abstract TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi);
@@ -464,14 +471,15 @@ class CapacityTaskScheduler extends TaskScheduler {
      * waiting)
      * b. Check if a queue hasn't received enough of the resources it needed
      * to be reclaimed and thus tasks need to be killed.
+     * The caller is responsible for ensuring that the QSI objects and the 
+     * collections are up-to-date.
+     * 
+     * Make sure that we do not make any calls to scheduler.taskTrackerManager
+     * as this can result in a deadlock (see HADOOP-4977). 
      */
-    private synchronized void reclaimCapacity() {
+    private synchronized void reclaimCapacity(int nextHeartbeatInterval) {
       int tasksToKill = 0;
       
-      // make sure we always get the latest values
-      updateQSIObjects();
-      updateCollectionOfQSIs();
-      
       QueueSchedulingInfo lastQsi = 
         qsiForAssigningTasks.get(qsiForAssigningTasks.size()-1);
       TaskSchedulingInfo lastTsi = getTSI(lastQsi);
@@ -513,11 +521,11 @@ class CapacityTaskScheduler extends TaskScheduler {
             // create a request for resources to be reclaimed
             int amt = Math.min((tsi.guaranteedCapacity-usedCap), 
                 (tsi.numPendingTasks - tsi.numReclaimedResources));
-            // create a rsource object that needs to be reclaimed some time
+            // create a resource object that needs to be reclaimed some time
             // in the future
             long whenToKill = qsi.reclaimTime - 
               (CapacityTaskScheduler.HEARTBEATS_LEFT_BEFORE_KILLING * 
-                  scheduler.taskTrackerManager.getNextHeartbeatInterval());
+                  nextHeartbeatInterval);
             if (whenToKill < 0) whenToKill = 0;
             tsi.reclaimList.add(new ReclaimedResource(amt, 
                 currentTime + qsi.reclaimTime, 
@@ -617,88 +625,6 @@ class CapacityTaskScheduler extends TaskScheduler {
       return tID;
     }
     
-    
-    /**
-     * Update individual QSI objects.
-     * We don't need exact information for all variables, just enough for us
-     * to make scheduling decisions. For example, we don't need an exact count
-     * of numRunningTasks. Once we count upto the grid capacity (gcSum), any
-     * number beyond that will make no difference.
-     * 
-     * The pending task count is only required in reclaim capacity. So 
-     * if the computation becomes expensive, we can add a boolean to 
-     * denote if pending task computation is required or not.
-     * */
-    private synchronized void updateQSIObjects() {
-      // if # of slots have changed since last time, update. 
-      // First, compute whether the total number of TT slots have changed
-      int currentClusterCapacity = getClusterCapacity();
-      for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
-        TaskSchedulingInfo tsi = getTSI(qsi);
-        // compute new GCs and ACs, if TT slots have changed
-        if (currentClusterCapacity != prevClusterCapacity) {
-          tsi.guaranteedCapacity =
-            (int)(qsi.guaranteedCapacityPercent*currentClusterCapacity/100);
-        }
-        tsi.numRunningTasks = 0;
-        tsi.numPendingTasks = 0;
-        for (String s: tsi.numRunningTasksByUser.keySet()) {
-          tsi.numRunningTasksByUser.put(s, 0);
-        }
-        // update stats on running jobs
-        for (JobInProgress j: 
-          scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
-          if (j.getStatus().getRunState() != JobStatus.RUNNING) {
-            continue;
-          }
-          tsi.numRunningTasks += getRunningTasks(j);
-          Integer i = tsi.numRunningTasksByUser.get(j.getProfile().getUser());
-          tsi.numRunningTasksByUser.put(j.getProfile().getUser(), 
-              i+getRunningTasks(j));
-          tsi.numPendingTasks += getPendingTasks(j);
-          LOG.debug("updateQSI: job " + j.getJobID().toString() + ": run(m) = " +
-              j.runningMaps() + ", run(r) = " + j.runningReduces() + 
-              ", finished(m) = " + j.finishedMaps() + ", finished(r)= " + 
-              j.finishedReduces() + ", failed(m) = " + j.failedMapTasks + 
-              ", failed(r) = " + j.failedReduceTasks + ", spec(m) = " + 
-              j.speculativeMapTasks + ", spec(r) = " + j.speculativeReduceTasks 
-              + ", total(m) = " + j.numMapTasks + ", total(r) = " + 
-              j.numReduceTasks);
-          /* 
-           * it's fine walking down the entire list of running jobs - there
-           * probably will not be many, plus, we may need to go through the
-           * list to compute numRunningTasksByUser. If this is expensive, we
-           * can keep a list of running jobs per user. Then we only need to
-           * consider the first few jobs per user.
-           */ 
-        }
-        
-        //update stats on waiting jobs
-        for(JobInProgress j : 
-          scheduler.jobQueuesManager.getJobs(qsi.queueName)) {
-          // pending tasks
-          if(tsi.numPendingTasks > currentClusterCapacity) {
-            // that's plenty. no need for more computation
-            break;
-          }
-          /*
-           * Consider only the waiting jobs in the job queue. Job queue can
-           * contain:
-           * 1. Jobs which are in running state but not scheduled
-           * (these would also be present in running queue), the pending 
-           * task count of these jobs is computed when scheduler walks
-           * through running job queue.
-           * 2. Jobs which are killed by user, but waiting job initialization
-           * poller to walk through the job queue to clean up killed jobs.
-           */
-          if (j.getStatus().getRunState() == JobStatus.PREP) {
-            tsi.numPendingTasks += getPendingTasks(j);
-          }
-        }
-      }
-      prevClusterCapacity = currentClusterCapacity;
-    }
-
     // called when a task is allocated to queue represented by qsi. 
     // update our info about reclaimed resources
     private synchronized void updateReclaimedResources(QueueSchedulingInfo qsi) {
@@ -838,22 +764,9 @@ class CapacityTaskScheduler extends TaskScheduler {
     }
 
     // Always return a TaskLookupResult object. Don't return null. 
+    // The caller is responsible for ensuring that the QSI objects and the 
+    // collections are up-to-date.
     private TaskLookupResult assignTasks(TaskTrackerStatus taskTracker) throws IOException {
-      /* 
-       * update all our QSI objects.
-       * This involves updating each qsi structure. This operation depends
-       * on the number of running jobs in a queue, and some waiting jobs. If it
-       * becomes expensive, do it once every few heartbeats only.
-       */ 
-      updateQSIObjects();
-      LOG.debug("After updating QSI objects in " + this.type + " scheduler :");
-      printQSIs();
-      /*
-       * sort list of queues first, as we want queues that need the most to
-       * get first access. If this is expensive, sort every few heartbeats.
-       * We're only sorting a collection of queues - there shouldn't be many.
-       */
-      updateCollectionOfQSIs();
       for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
         if (getTSI(qsi).guaranteedCapacity <= 0.0f) {
           // No capacity is guaranteed yet for this queue.
@@ -886,6 +799,7 @@ class CapacityTaskScheduler extends TaskScheduler {
       return TaskLookupResult.getNoTaskFoundResult();
     }
     
+    // for debugging.
     private void printQSIs() {
       StringBuffer s = new StringBuffer();
       for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
@@ -1044,6 +958,10 @@ class CapacityTaskScheduler extends TaskScheduler {
 
   MemoryMatcher memoryMatcher = new MemoryMatcher(this);
 
+  /** we keep track of the number of map/reduce slots we saw last */
+  private int prevMapClusterCapacity = 0;
+  private int prevReduceClusterCapacity = 0;
+  
   /** name of the default queue. */ 
   static final String DEFAULT_QUEUE_NAME = "default";
   
@@ -1269,22 +1187,135 @@ class CapacityTaskScheduler extends TaskScheduler {
     super.setConf(conf);
   }
 
+  /**
+   * Reclaim capacity for both map & reduce tasks. 
+   * Do not make this synchronized, since we call taskTrackerManager 
+   * (see HADOOP-4977). 
+   */
   void reclaimCapacity() {
-    mapScheduler.reclaimCapacity();
-    reduceScheduler.reclaimCapacity();
+    // get the cluster capacity
+    ClusterStatus c = taskTrackerManager.getClusterStatus();
+    int mapClusterCapacity = c.getMaxMapTasks();
+    int reduceClusterCapacity = c.getMaxReduceTasks();
+    int nextHeartbeatInterval = taskTrackerManager.getNextHeartbeatInterval();
+    // update the QSI objects
+    updateQSIObjects(mapClusterCapacity, reduceClusterCapacity);
+    // update the qsi collections, since we depend on their ordering 
+    mapScheduler.updateCollectionOfQSIs();
+    reduceScheduler.updateCollectionOfQSIs();
+    // now, reclaim
+    mapScheduler.reclaimCapacity(nextHeartbeatInterval);
+    reduceScheduler.reclaimCapacity(nextHeartbeatInterval);
   }
   
   /**
    * provided for the test classes
-   * lets you update the QSI objects and sorted collection
+   * lets you update the QSI objects and sorted collections
    */ 
-  void updateQSIInfo() {
-    mapScheduler.updateQSIObjects();
+  void updateQSIInfoForTests() {
+    ClusterStatus c = taskTrackerManager.getClusterStatus();
+    int mapClusterCapacity = c.getMaxMapTasks();
+    int reduceClusterCapacity = c.getMaxReduceTasks();
+    // update the QSI objects
+    updateQSIObjects(mapClusterCapacity, reduceClusterCapacity);
     mapScheduler.updateCollectionOfQSIs();
-    reduceScheduler.updateQSIObjects();
     reduceScheduler.updateCollectionOfQSIs();
   }
-  
+
+  /**
+   * Update individual QSI objects.
+   * We don't need exact information for all variables, just enough for us
+   * to make scheduling decisions. For example, we don't need an exact count
+   * of numRunningTasks. Once we count up to the grid capacity, any
+   * number beyond that will make no difference.
+   * 
+   * The pending task count is only required in reclaim capacity. So 
+   * if the computation becomes expensive, we can add a boolean to 
+   * denote if pending task computation is required or not.
+   * 
+   **/
+  private synchronized void updateQSIObjects(int mapClusterCapacity, 
+      int reduceClusterCapacity) {
+    // if # of slots have changed since last time, update. 
+    // First, compute whether the total number of TT slots have changed
+    for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
+      // compute new GCs, if TT slots have changed
+      if (mapClusterCapacity != prevMapClusterCapacity) {
+        qsi.mapTSI.guaranteedCapacity =
+          (int)(qsi.guaranteedCapacityPercent*mapClusterCapacity/100);
+      }
+      if (reduceClusterCapacity != prevReduceClusterCapacity) {
+        qsi.reduceTSI.guaranteedCapacity =
+          (int)(qsi.guaranteedCapacityPercent*reduceClusterCapacity/100);
+      }
+      // reset running/pending tasks, tasks per user
+      qsi.mapTSI.resetTaskVars();
+      qsi.reduceTSI.resetTaskVars();
+      // update stats on running jobs
+      for (JobInProgress j: 
+        jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
+        if (j.getStatus().getRunState() != JobStatus.RUNNING) {
+          continue;
+        }
+        int runningMaps = j.runningMaps();
+        int runningReduces = j.runningReduces();
+        qsi.mapTSI.numRunningTasks += runningMaps;
+        qsi.reduceTSI.numRunningTasks += runningReduces;
+        Integer i = 
+          qsi.mapTSI.numRunningTasksByUser.get(j.getProfile().getUser());
+        qsi.mapTSI.numRunningTasksByUser.put(j.getProfile().getUser(), 
+            i+runningMaps);
+        i = qsi.reduceTSI.numRunningTasksByUser.get(j.getProfile().getUser());
+        qsi.reduceTSI.numRunningTasksByUser.put(j.getProfile().getUser(), 
+            i+runningReduces);
+        qsi.mapTSI.numPendingTasks += j.pendingMaps();
+        qsi.reduceTSI.numPendingTasks += j.pendingReduces();
+        LOG.debug("updateQSI: job " + j.getJobID().toString() + ": run(m) = " +
+            j.runningMaps() + ", run(r) = " + j.runningReduces() + 
+            ", finished(m) = " + j.finishedMaps() + ", finished(r)= " + 
+            j.finishedReduces() + ", failed(m) = " + j.failedMapTasks + 
+            ", failed(r) = " + j.failedReduceTasks + ", spec(m) = " + 
+            j.speculativeMapTasks + ", spec(r) = " + j.speculativeReduceTasks 
+            + ", total(m) = " + j.numMapTasks + ", total(r) = " + 
+            j.numReduceTasks);
+        /* 
+         * it's fine walking down the entire list of running jobs - there
+         * probably will not be many, plus, we may need to go through the
+         * list to compute numRunningTasksByUser. If this is expensive, we
+         * can keep a list of running jobs per user. Then we only need to
+         * consider the first few jobs per user.
+         */ 
+      }
+      
+      //update stats on waiting jobs
+      for(JobInProgress j: jobQueuesManager.getJobs(qsi.queueName)) {
+        // pending tasks
+        if ((qsi.mapTSI.numPendingTasks > mapClusterCapacity) &&
+            (qsi.reduceTSI.numPendingTasks > reduceClusterCapacity)) {
+          // that's plenty. no need for more computation
+          break;
+        }
+        /*
+         * Consider only the waiting jobs in the job queue. Job queue can
+         * contain:
+         * 1. Jobs which are in running state but not scheduled
+         * (these would also be present in running queue), the pending 
+         * task count of these jobs is computed when scheduler walks
+         * through running job queue.
+         * 2. Jobs which are killed by user, but waiting job initialization
+         * poller to walk through the job queue to clean up killed jobs.
+         */
+        if (j.getStatus().getRunState() == JobStatus.PREP) {
+          qsi.mapTSI.numPendingTasks += j.pendingMaps();
+          qsi.reduceTSI.numPendingTasks += j.pendingReduces();
+        }
+      }
+    }
+    
+    prevMapClusterCapacity = mapClusterCapacity;
+    prevReduceClusterCapacity = reduceClusterCapacity;
+  }
+
   /* 
    * The grand plan for assigning a task. 
    * First, decide whether a Map or Reduce task should be given to a TT 
@@ -1309,20 +1340,34 @@ class CapacityTaskScheduler extends TaskScheduler {
      * Number of ways to do this. For now, base decision on how much is needed
      * versus how much is used (default to Map, if equal).
      */
-    LOG.debug("TT asking for task, max maps=" + taskTracker.getMaxMapTasks() + 
-        ", run maps=" + taskTracker.countMapTasks() + ", max reds=" + 
-        taskTracker.getMaxReduceTasks() + ", run reds=" + 
-        taskTracker.countReduceTasks() + ", map cap=" + 
-        mapScheduler.getClusterCapacity() + ", red cap = " + 
-        reduceScheduler.getClusterCapacity());
+    ClusterStatus c = taskTrackerManager.getClusterStatus();
+    int mapClusterCapacity = c.getMaxMapTasks();
+    int reduceClusterCapacity = c.getMaxReduceTasks();
     int maxMapTasks = taskTracker.getMaxMapTasks();
     int currentMapTasks = taskTracker.countMapTasks();
     int maxReduceTasks = taskTracker.getMaxReduceTasks();
     int currentReduceTasks = taskTracker.countReduceTasks();
+    LOG.debug("TT asking for task, max maps=" + taskTracker.getMaxMapTasks() + 
+        ", run maps=" + taskTracker.countMapTasks() + ", max reds=" + 
+        taskTracker.getMaxReduceTasks() + ", run reds=" + 
+        taskTracker.countReduceTasks() + ", map cap=" + 
+        mapClusterCapacity + ", red cap = " + 
+        reduceClusterCapacity);
+
+    /* 
+     * update all our QSI objects.
+     * This involves updating each qsi structure. This operation depends
+     * on the number of running jobs in a queue, and some waiting jobs. If it
+     * becomes expensive, do it once every few heartbeats only.
+     */ 
+    updateQSIObjects(mapClusterCapacity, reduceClusterCapacity);
+    // make sure we get our map or reduce scheduling object to update its 
+    // collection of QSI objects too. 
 
     if ((maxReduceTasks - currentReduceTasks) > 
     (maxMapTasks - currentMapTasks)) {
       // get a reduce task first
+      reduceScheduler.updateCollectionOfQSIs();
       tlr = reduceScheduler.assignTasks(taskTracker);
       if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
         tlr.getLookUpStatus()) {
@@ -1337,6 +1382,7 @@ class CapacityTaskScheduler extends TaskScheduler {
       // if we didn't get any, look at map tasks, if TT has space
       else if ((TaskLookupResult.LookUpStatus.NO_TASK_FOUND == 
         tlr.getLookUpStatus()) && (maxMapTasks > currentMapTasks)) {
+        mapScheduler.updateCollectionOfQSIs();
         tlr = mapScheduler.assignTasks(taskTracker);
         if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
           tlr.getLookUpStatus()) {
@@ -1346,6 +1392,7 @@ class CapacityTaskScheduler extends TaskScheduler {
     }
     else {
       // get a map task first
+      mapScheduler.updateCollectionOfQSIs();
       tlr = mapScheduler.assignTasks(taskTracker);
       if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
         tlr.getLookUpStatus()) {
@@ -1359,6 +1406,7 @@ class CapacityTaskScheduler extends TaskScheduler {
       // if we didn't get any, look at reduce tasks, if TT has space
       else if ((TaskLookupResult.LookUpStatus.NO_TASK_FOUND == 
         tlr.getLookUpStatus()) && (maxReduceTasks > currentReduceTasks)) {
+        reduceScheduler.updateCollectionOfQSIs();
         tlr = reduceScheduler.assignTasks(taskTracker);
         if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
           tlr.getLookUpStatus()) {
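
The refactoring above breaks the cycle by reading everything that needs the JobTracker lock (cluster map/reduce capacities, the next heartbeat interval) in the outer, unsynchronized CapacityTaskScheduler methods, and handing the synchronized code plain values. A hedged sketch of that shape, reusing the illustrative JobTrackerLike class from the earlier example (names are again stand-ins, not the real code):

    // Sketch only; mirrors the structure of the fixed reclaimCapacity(), not its contents.
    static class FixedSchedulerLike {

      // Unsynchronized driver: talks to the JobTracker while holding no
      // scheduler lock, then hands plain ints to the locked worker.
      void reclaimCapacity(JobTrackerLike jobTracker) {
        int nextHeartbeatInterval = jobTracker.getNextHeartbeatInterval(); // JobTracker lock only
        reclaimCapacityLocked(nextHeartbeatInterval);                      // scheduler lock only
      }

      // Synchronized worker: never calls back into the JobTracker, so the
      // scheduler lock is the innermost lock it holds and the opposite-order
      // acquisition from the heartbeat path disappears.
      private synchronized void reclaimCapacityLocked(int nextHeartbeatInterval) {
        // decide, using nextHeartbeatInterval, when queued reclaim requests
        // should start killing tasks
      }
    }

assignTasks() gets the same treatment in the diff: the caller refreshes the QSI objects and the sorted collections before the synchronized per-type schedulers run.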

+ 7 - 7
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

@@ -1525,7 +1525,7 @@ public class TestCapacityScheduler extends TestCase {
     //Get scheduling information, now the number of waiting job should have
     //changed to 4 as one is scheduled and has become running.
     // make sure we update our stats
-    scheduler.updateQSIInfo();
+    scheduler.updateQSIInfoForTests();
     schedulingInfo = 
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
@@ -1537,7 +1537,7 @@ public class TestCapacityScheduler extends TestCase {
     //assign a reduce task
     Task t2 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
     // make sure we update our stats
-    scheduler.updateQSIInfo();
+    scheduler.updateQSIInfoForTests();
     schedulingInfo = 
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
@@ -1553,7 +1553,7 @@ public class TestCapacityScheduler extends TestCase {
     taskTrackerManager.finalizeJob(u1j1);
     
     // make sure we update our stats
-    scheduler.updateQSIInfo();
+    scheduler.updateQSIInfoForTests();
     schedulingInfo = 
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
@@ -1570,7 +1570,7 @@ public class TestCapacityScheduler extends TestCase {
     //Run initializer to clean up failed jobs
     p.selectJobsToInitialize();
     // make sure we update our stats
-    scheduler.updateQSIInfo();
+    scheduler.updateQSIInfoForTests();
     schedulingInfo = 
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
@@ -1588,7 +1588,7 @@ public class TestCapacityScheduler extends TestCase {
     //run initializer to clean up failed job
     p.selectJobsToInitialize();
     // make sure we update our stats
-    scheduler.updateQSIInfo();
+    scheduler.updateQSIInfoForTests();
     schedulingInfo = 
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
@@ -1613,7 +1613,7 @@ public class TestCapacityScheduler extends TestCase {
     //one. run the poller as it is responsible for waiting count
     p.selectJobsToInitialize();
     // make sure we update our stats
-    scheduler.updateQSIInfo();
+    scheduler.updateQSIInfoForTests();
     schedulingInfo = 
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
@@ -1625,7 +1625,7 @@ public class TestCapacityScheduler extends TestCase {
     //Fail the executing job
     taskTrackerManager.finalizeJob(u1j3, JobStatus.FAILED);
     // make sure we update our stats
-    scheduler.updateQSIInfo();
+    scheduler.updateQSIInfoForTests();
     //Now running counts should become zero
     schedulingInfo = 
       queueManager.getJobQueueInfo("default").getSchedulingInfo();