Browse Source

commit ecc38f53c5b63427cd07a731433372785428755f
Author: Arun C Murthy <acmurthy@apache.org>
Date: Wed Jul 28 15:48:12 2010 -0700

MAPREDUCE-1872. Fix CapacityTaskScheduler to ensure task limits are checked before queue limits on job submission. Also, fixed the scheduler to allow for jobs in queues with minuscule capacities to make progress.


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20-security-patches@1077614 13f79535-47bb-0310-9956-ffa450edef68

Owen O'Malley 14 years ago
parent
commit
7ad5e7ed76

+ 44 - 14
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java

@@ -324,7 +324,7 @@ class CapacitySchedulerQueue {
     this.maxJobsToAccept = maxJobsToAccept;
     this.maxJobsToAccept = maxJobsToAccept;
     this.maxJobsPerUserToAccept = maxJobsPerUserToAccept;
     this.maxJobsPerUserToAccept = maxJobsPerUserToAccept;
     
     
-    LOG.info("Initialized '" + queueName + "' queue with " +
+    LOG.info("Initializing '" + queueName + "' queue with " +
         "cap=" + capacityPercent + ", " +
         "cap=" + capacityPercent + ", " +
         "maxCap=" + maxCapacityPercent + ", " +
         "maxCap=" + maxCapacityPercent + ", " +
         "ulMin=" + ulMin + ", " +
         "ulMin=" + ulMin + ", " +
@@ -337,6 +337,14 @@ class CapacitySchedulerQueue {
         "maxJobsPerUserToAccept=" + maxJobsPerUserToAccept + ", " +
         "maxJobsPerUserToAccept=" + maxJobsPerUserToAccept + ", " +
         "maxActiveTasksPerUser=" + maxActiveTasksPerUser
         "maxActiveTasksPerUser=" + maxActiveTasksPerUser
     );
     );
+    
+    // Sanity checks
+    if (maxActiveTasks < maxActiveTasksPerUser ||
+        maxJobsToInit < maxJobsPerUserToInit || 
+        maxJobsToAccept < maxJobsPerUserToAccept) {
+      throw new IllegalArgumentException("Illegal queue configuration for " +
+      		"queue '" + queueName + "'");
+    }
   }
   }
   
   
   synchronized void initializeQueue(CapacitySchedulerQueue other) {
   synchronized void initializeQueue(CapacitySchedulerQueue other) {
@@ -871,23 +879,31 @@ class CapacitySchedulerQueue {
    * the requested number of slots, <code>false</code> otherwise
    * the requested number of slots, <code>false</code> otherwise
    */
    */
   boolean assignSlotsToJob(TaskType taskType, JobInProgress job, String user) {
   boolean assignSlotsToJob(TaskType taskType, JobInProgress job, String user) {
+    int numSlotsRequested = job.getNumSlotsPerTask(taskType);
+    
     // Check to ensure we will not go over the queue's max-capacity
     // Check to ensure we will not go over the queue's max-capacity
-    if (!assignSlotsToQueue(taskType, job.getNumSlotsPerTask(taskType))) {
+    if (!assignSlotsToQueue(taskType, numSlotsRequested)) {
       return false;
       return false;
     }
     }
     
     
-    // what is our current capacity? It is equal to the queue-capacity if
-    // we're running below capacity. If we're running over capacity, then its
-    // #running plus slotPerTask of the job (which is the number of extra
-    // slots we're getting).
+    // What is our current capacity? 
+    // * It is equal to the max(numSlotsRequested, queue-capacity) if
+    //   we're running below capacity. The 'max' ensures that jobs in queues
+    //   with minuscule capacity (< 1 slot) make progress.
+    // * If we're running over capacity, then it's
+    //   #running plus slotPerTask of the job (which is the number of extra
+    //   slots we're getting).
+    
+    // Allow progress for queues with minuscule capacity
+    int queueCapacity = Math.max(getCapacity(taskType), numSlotsRequested);
+    
+    int queueSlotsOccupied = getNumSlotsOccupied(taskType);
     int currentCapacity;
     int currentCapacity;
-    int queueCapacity = getCapacity(taskType);
-    if (getNumSlotsOccupied(taskType) < queueCapacity) {
+    if (queueSlotsOccupied < queueCapacity) {
       currentCapacity = queueCapacity;
       currentCapacity = queueCapacity;
     }
     }
     else {
     else {
-      currentCapacity = 
-        getNumSlotsOccupied(taskType) + job.getNumSlotsPerTask(taskType);
+      currentCapacity = queueSlotsOccupied + numSlotsRequested;
     }
     }
     
     
     // Never allow a single user to take more than the 
     // Never allow a single user to take more than the 
@@ -900,11 +916,14 @@ class CapacitySchedulerQueue {
                    divideAndCeil(ulMin*currentCapacity, 100)),
                    divideAndCeil(ulMin*currentCapacity, 100)),
           (int)(queueCapacity * ulMinFactor)
           (int)(queueCapacity * ulMinFactor)
           );
           );
-    if (getNumSlotsOccupiedByUser(user, taskType) >= limit) {
+
+    if ((getNumSlotsOccupiedByUser(user, taskType) + numSlotsRequested) > 
+        limit) {
       if (LOG.isDebugEnabled()) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("User " + user + " is over limit, num slots occupied=" + 
-            getNumSlotsOccupiedByUser(user, taskType) + 
-            ", limit=" + limit);
+        LOG.debug("User " + user + " is over limit for queue=" + queueName + 
+            " num slots occupied=" + getNumSlotsOccupiedByUser(user, taskType) + 
+            " limit=" + limit +" numSlotsRequested=" + numSlotsRequested + 
+            " currentCapacity=" + currentCapacity);
       }
       }
       return false;
       return false;
     }
     }
@@ -952,6 +971,17 @@ class CapacitySchedulerQueue {
           "");
           "");
     }
     }
     
     
+    // Task limits - No point accepting the job if it can never be initialized
+    if (job.desiredTasks() > maxActiveTasksPerUser) {
+      throw new IOException(
+          "Job '" + job.getJobID() + "' from user '" + user  +
+          "' rejected since it has " + job.desiredTasks() + " tasks which" +
+          " exceeds the limit of " + maxActiveTasksPerUser + 
+          " tasks per-user which can be initialized for queue '" + 
+          queueName + "'"
+          );
+    }
+    
     // Across all jobs in queue
     // Across all jobs in queue
     if ((getNumWaitingJobs() + getNumRunningJobs()) >= maxJobsToAccept) {
     if ((getNumWaitingJobs() + getNumRunningJobs()) >= maxJobsToAccept) {
       throw new IOException(
       throw new IOException(

+ 0 - 6
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java

@@ -410,12 +410,6 @@ class CapacityTaskScheduler extends TaskScheduler {
       
       
       
       
       for (CapacitySchedulerQueue queue : queuesForAssigningTasks) {
       for (CapacitySchedulerQueue queue : queuesForAssigningTasks) {
-        // we may have queues with capacity=0. We shouldn't look at jobs from 
-        // these queues
-        if (0 == queue.getCapacity(TaskType.MAP)) {
-          continue;
-        }
-
         //This call is for optimization if we are already over the
         //This call is for optimization if we are already over the
         //maximum-capacity we avoid traversing the queues.
         //maximum-capacity we avoid traversing the queues.
         if (!queue.assignSlotsToQueue(type, 1)) {
         if (!queue.assignSlotsToQueue(type, 1)) {

+ 9 - 8
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

@@ -168,7 +168,7 @@ public class TestCapacityScheduler extends TestCase {
     private int speculativeReduceTaskCounter = 0;
     private int speculativeReduceTaskCounter = 0;
     public FakeJobInProgress(JobID jId, JobConf jobConf,
     public FakeJobInProgress(JobID jId, JobConf jobConf,
         FakeTaskTrackerManager taskTrackerManager, String user, 
         FakeTaskTrackerManager taskTrackerManager, String user, 
-        JobTracker jt) {
+        JobTracker jt) throws IOException {
       super(jId, jobConf, jt);
       super(jId, jobConf, jt);
       if (user == null) {
       if (user == null) {
         user = "drwho";
         user = "drwho";
@@ -327,7 +327,7 @@ public class TestCapacityScheduler extends TestCase {
 
 
     public FakeFailingJobInProgress(JobID id, JobConf jobConf,
     public FakeFailingJobInProgress(JobID id, JobConf jobConf,
         FakeTaskTrackerManager taskTrackerManager, String user, 
         FakeTaskTrackerManager taskTrackerManager, String user, 
-        JobTracker jt) {
+        JobTracker jt) throws IOException {
       super(id, jobConf, taskTrackerManager, user, jt);
       super(id, jobConf, taskTrackerManager, user, jt);
     }
     }
     
     
@@ -1364,9 +1364,10 @@ public class TestCapacityScheduler extends TestCase {
 
 
 
 
     //high ram map from job 1 and normal reduce task from job 1
     //high ram map from job 1 and normal reduce task from job 1
-    List<Task> tasks = checkMultipleAssignment(
-      "tt1", "attempt_test_0001_m_000001_0 on tt1",
-      "attempt_test_0001_r_000001_0 on tt1");
+    List<Task> tasks = checkAssignments("tt1", 
+        new String[] {
+        "attempt_test_0001_m_000001_0 on tt1",
+        "attempt_test_0001_r_000001_0 on tt1"});
 
 
     checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1, 2, 200.0f,1,0);
     checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1, 2, 200.0f,1,0);
     checkOccupiedSlots("defaultXYZ", TaskType.REDUCE, 1, 1, 100.0f,0,2);
     checkOccupiedSlots("defaultXYZ", TaskType.REDUCE, 1, 1, 100.0f,0,2);
@@ -2958,8 +2959,8 @@ public class TestCapacityScheduler extends TestCase {
     jConf.setNumMapTasks(2);
     jConf.setNumMapTasks(2);
     jConf.setNumReduceTasks(0);
     jConf.setNumReduceTasks(0);
     jConf.setQueueName("default");
     jConf.setQueueName("default");
-    jConf.setUser("u1");
-    FakeJobInProgress job4= submitJobAndInit(JobStatus.PREP, jConf);
+    jConf.setUser("u2");
+    FakeJobInProgress job4 = submitJobAndInit(JobStatus.PREP, jConf);
 
 
     LOG.debug("Submit another regular memory(1GB vmem maps/reduces) job of "
     LOG.debug("Submit another regular memory(1GB vmem maps/reduces) job of "
         + "2 map/red tasks");
         + "2 map/red tasks");
@@ -2969,7 +2970,7 @@ public class TestCapacityScheduler extends TestCase {
     jConf.setNumMapTasks(2);
     jConf.setNumMapTasks(2);
     jConf.setNumReduceTasks(2);
     jConf.setNumReduceTasks(2);
     jConf.setQueueName("default");
     jConf.setQueueName("default");
-    jConf.setUser("u2");
+    jConf.setUser("u3");
     FakeJobInProgress job5 = submitJobAndInit(JobStatus.PREP, jConf);
     FakeJobInProgress job5 = submitJobAndInit(JobStatus.PREP, jConf);
 
 
     // Job4, a high memory job cannot be accommodated on any TT. But with each
     // Job4, a high memory job cannot be accommodated on any TT. But with each

+ 28 - 11
src/mapred/org/apache/hadoop/mapred/JobInProgress.java

@@ -319,7 +319,8 @@ public class JobInProgress {
   /**
   /**
    * Create an almost empty JobInProgress, which can be used only for tests
    * Create an almost empty JobInProgress, which can be used only for tests
    */
    */
-  protected JobInProgress(JobID jobid, JobConf conf, JobTracker tracker) {
+  protected JobInProgress(JobID jobid, JobConf conf, JobTracker tracker) 
+  throws IOException {
     this.conf = conf;
     this.conf = conf;
     this.jobId = jobid;
     this.jobId = jobid;
     this.numMapTasks = conf.getNumMapTasks();
     this.numMapTasks = conf.getNumMapTasks();
@@ -347,6 +348,9 @@ public class JobInProgress {
     this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
     this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
     this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
     this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
 
 
+    // Check task limits
+    checkTaskLimits();
+
     this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
     this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
       (numMapTasks + numReduceTasks + 10);
       (numMapTasks + numReduceTasks + 10);
     try {
     try {
@@ -387,6 +391,7 @@ public class JobInProgress {
         public FileSystem run() throws IOException {
         public FileSystem run() throws IOException {
           return jobSubmitDir.getFileSystem(default_conf);
           return jobSubmitDir.getFileSystem(default_conf);
         }});
         }});
+      
       /** check for the size of jobconf **/
       /** check for the size of jobconf **/
       Path submitJobFile = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
       Path submitJobFile = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
       FileStatus fstatus = fs.getFileStatus(submitJobFile);
       FileStatus fstatus = fs.getFileStatus(submitJobFile);
@@ -411,6 +416,7 @@ public class JobInProgress {
             jobId.toString(), desc);
             jobId.toString(), desc);
         throw new IOException(desc);
         throw new IOException(desc);
       }
       }
+      
       this.priority = conf.getJobPriority();
       this.priority = conf.getJobPriority();
       this.status.setJobPriority(this.priority);
       this.status.setJobPriority(this.priority);
       this.profile = new JobProfile(user, jobId, 
       this.profile = new JobProfile(user, jobId, 
@@ -458,6 +464,9 @@ public class JobInProgress {
       // register job's tokens for renewal
       // register job's tokens for renewal
       DelegationTokenRenewal.registerDelegationTokensForRenewal(
       DelegationTokenRenewal.registerDelegationTokensForRenewal(
           jobInfo.getJobID(), ts, jobtracker.getConf());
           jobInfo.getJobID(), ts, jobtracker.getConf());
+      
+      // Check task limits
+      checkTaskLimits();
     } finally {
     } finally {
       //close all FileSystems that were created above for the current user
       //close all FileSystems that were created above for the current user
       //At this point, this constructor is called in the context of an RPC, and
       //At this point, this constructor is called in the context of an RPC, and
@@ -467,6 +476,19 @@ public class JobInProgress {
     }
     }
   }
   }
     
     
+  private void checkTaskLimits() throws IOException {
+    // if the number of tasks is larger than a configured value
+    // then fail the job.
+    int maxTasks = jobtracker.getMaxTasksPerJob();
+    LOG.info(jobId + ": nMaps=" + numMapTasks + " nReduces=" + numReduceTasks + " max=" + maxTasks);
+    if (maxTasks > 0 && (numMapTasks + numReduceTasks) > maxTasks) {
+      throw new IOException(
+                "The number of tasks for this job " + 
+                (numMapTasks + numReduceTasks) +
+                " exceeds the configured limit " + maxTasks);
+    }
+  }
+  
   /**
   /**
    * Called when the job is complete
    * Called when the job is complete
    */
    */
@@ -649,18 +671,13 @@ public class JobInProgress {
     // read input splits and create a map per a split
     // read input splits and create a map per a split
     //
     //
     TaskSplitMetaInfo[] splits = createSplits(jobId);
     TaskSplitMetaInfo[] splits = createSplits(jobId);
+    if (numMapTasks != splits.length) {
+      throw new IOException("Number of maps in JobConf doesn't match number of " +
+      		"recieved splits for job " + jobId + "! " +
+      		"numMapTasks=" + numMapTasks + ", #splits=" + splits.length);
+    }
     numMapTasks = splits.length;
     numMapTasks = splits.length;
 
 
-
-    // if the number of splits is larger than a configured value
-    // then fail the job.
-    int maxTasks = jobtracker.getMaxTasksPerJob();
-    if (maxTasks > 0 && numMapTasks + numReduceTasks > maxTasks) {
-      throw new IOException(
-                "The number of tasks for this job " + 
-                (numMapTasks + numReduceTasks) +
-                " exceeds the configured limit " + maxTasks);
-    }
     jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
     jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
     jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
     jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
 
 

+ 1 - 1
src/test/org/apache/hadoop/mapred/TestClusterStatus.java

@@ -128,7 +128,7 @@ public class TestClusterStatus extends TestCase {
    */
    */
   static class FakeJobInProgress extends JobInProgress {
   static class FakeJobInProgress extends JobInProgress {
     public FakeJobInProgress(JobID jId, JobConf jobConf,
     public FakeJobInProgress(JobID jId, JobConf jobConf,
-                JobTracker jt) {
+                JobTracker jt) throws IOException {
       super(jId, jobConf, jt);
       super(jId, jobConf, jt);
     }
     }
   }
   }

+ 3 - 2
src/test/org/apache/hadoop/mapred/TestJobRetire.java

@@ -268,7 +268,8 @@ public class TestJobRetire extends TestCase {
   }
   }
  
  
   // create a new job and add it to the jobtracker
   // create a new job and add it to the jobtracker
-  private JobInProgress createAndAddJob(JobTracker jobtracker, JobConf conf) {
+  private JobInProgress createAndAddJob(JobTracker jobtracker, JobConf conf) 
+  throws IOException {
     // submit a job in a fake manner
     // submit a job in a fake manner
     // get the new job-id
     // get the new job-id
     JobID id = 
     JobID id = 
@@ -333,7 +334,7 @@ public class TestJobRetire extends TestCase {
   //   - remove the job from the jobtracker
   //   - remove the job from the jobtracker
   //   - check if the fake attempt is removed from the jobtracker
   //   - check if the fake attempt is removed from the jobtracker
   private void testRemoveJobTasks(JobTracker jobtracker, JobConf conf, 
   private void testRemoveJobTasks(JobTracker jobtracker, JobConf conf, 
-                                  TaskType type) {
+                                  TaskType type) throws IOException {
     // create and submit a job
     // create and submit a job
     JobInProgress jip = createAndAddJob(jobtracker, conf);
     JobInProgress jip = createAndAddJob(jobtracker, conf);
     // create and add a tip
     // create and add a tip