Просмотр исходного кода

Merge -r 732606:732607 from trunk to branch 0.20 to fix HADOOP-4980.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/branches/branch-0.20@732608 13f79535-47bb-0310-9956-ffa450edef68
Hemanth Yamijala 16 лет назад
Родитель
Сommit
b16fb56a0f

+ 3 - 0
CHANGES.txt

@@ -292,6 +292,9 @@ Release 0.20.0 - Unreleased
     HADOOP-4830. Add end-to-end test cases for testing queue capacities.
     (Vinod Kumar Vavilapalli via yhemanth)
 
+    HADOOP-4980. Improve code layout of capacity scheduler to make it 
+    easier to fix some blocker bugs. (Vivek Ratan via yhemanth)
+
   OPTIMIZATIONS
 
     HADOOP-3293. Fixes FileInputFormat to do provide locations for splits

Разница между файлами не показана из-за своего большого размера
+ 357 - 376
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java


+ 4 - 0
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java

@@ -278,4 +278,8 @@ class JobQueuesManager extends JobInProgressListener {
     QueueInfo qi = jobQueues.get(queue);
     return qi.getWaitingJobCount();
   }
+
+  boolean doesQueueSupportPriorities(String queueName) {
+    return jobQueues.get(queueName).supportsPriorities;
+  }
 }

+ 65 - 74
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

@@ -985,8 +985,8 @@ public class TestCapacityScheduler extends TestCase {
                                           String queue) throws IOException {
     String schedInfo = taskTrackerManager.getQueueManager().
                           getSchedulerInfo(queue).toString();
-    assertTrue(schedInfo.contains("Guaranteed Capacity Maps : " 
-                                    + expectedCapacity));
+    assertTrue(schedInfo.contains("Map tasks\nGuaranteed Capacity: " 
+        + expectedCapacity));
   }
   
   // test capacity transfer
@@ -1301,7 +1301,7 @@ public class TestCapacityScheduler extends TestCase {
     
     scheduler.reclaimCapacity();
     
-    clock.advance(scheduler.rmConf.getReclaimTimeLimit("default") * 1000);
+    clock.advance(scheduler.schedConf.getReclaimTimeLimit("default") * 1000);
     
     scheduler.reclaimCapacity();
     
@@ -1475,19 +1475,18 @@ public class TestCapacityScheduler extends TestCase {
     String schedulingInfo2 = queueManager.getJobQueueInfo("q2").getSchedulingInfo();
     String[] infoStrings = schedulingInfo.split("\n");
     
-    assertEquals(infoStrings.length, 10);
-    assertEquals(infoStrings[0] , "Guaranteed Capacity : 50.0 %");
-    assertEquals(infoStrings[1] , "Guaranteed Capacity Maps : " + totalMaps * 50/100 + " ");
-    assertEquals(infoStrings[2] , "Guaranteed Capacity Reduces : " + totalReduces * 50/100 + " ");
-    assertEquals(infoStrings[3] , "User Limit : 25 %");
-    assertEquals(infoStrings[4] , "Reclaim Time limit : " + 
-        StringUtils.formatTime(1000000) + " ");
-    assertEquals(infoStrings[5] , "Priority Supported : YES ");
-    assertEquals(infoStrings[7] , 
-        "Running Maps : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[8] , 
-        "Running Reduces : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 0 ");
+    assertEquals(infoStrings.length, 17);
+    assertEquals(infoStrings[1] , "Guaranteed Capacity Percentage: 50.0%");
+    assertEquals(infoStrings[7] , "Guaranteed Capacity: " + totalMaps * 50/100);
+    assertEquals(infoStrings[11] , "Guaranteed Capacity: " + totalReduces * 50/100);
+    assertEquals(infoStrings[2] , "User Limit: 25%");
+    assertEquals(infoStrings[3] , "Reclaim Time limit: " + 
+        StringUtils.formatTime(1000000));
+    assertEquals(infoStrings[4] , "Priority Supported: YES");
+    assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[15] , "Number of Waiting Jobs: 0");
+    assertEquals(infoStrings[16] , "Number of users who have submitted jobs: 0");
     assertEquals(schedulingInfo, schedulingInfo2);
     
     //Testing with actual job submission.
@@ -1498,12 +1497,10 @@ public class TestCapacityScheduler extends TestCase {
     infoStrings = schedulingInfo.split("\n");
     
     //waiting job should be equal to number of jobs submitted.
-    assertEquals(infoStrings.length, 10);
-    assertEquals(infoStrings[7] ,
-        "Running Maps : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[8] , 
-        "Running Reduces : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 5 ");
+    assertEquals(infoStrings.length, 17);
+    assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[15] , "Number of Waiting Jobs: 5");
     
     //Initalize the jobs but don't raise events
     p.selectJobsToInitialize();
@@ -1511,14 +1508,12 @@ public class TestCapacityScheduler extends TestCase {
     schedulingInfo = 
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 10);
+    assertEquals(infoStrings.length, 17);
     //should be previous value as nothing is scheduled because no events
     //has been raised after initialization.
-    assertEquals(infoStrings[7] ,
-        "Running Maps : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[8] , 
-        "Running Reduces : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 5 ");
+    assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[15] , "Number of Waiting Jobs: 5");
     
     //Raise status change event so that jobs can move to running queue.
     raiseStatusChangeEvents(scheduler.jobQueuesManager);
@@ -1529,28 +1524,27 @@ public class TestCapacityScheduler extends TestCase {
     
     //Get scheduling information, now the number of waiting job should have
     //changed to 4 as one is scheduled and has become running.
+    // make sure we update our stats
+    scheduler.updateQSIInfo();
     schedulingInfo = 
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 10);
-    assertEquals(infoStrings[7] , 
-        "Running Maps : 100.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[8] , 
-        "Running Reduces : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 4 ");
+    assertEquals(infoStrings.length, 19);
+    assertEquals(infoStrings[8], "Running tasks: 100.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[14],"Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[17] , "Number of Waiting Jobs: 4");
     
     //assign a reduce task
-    
     Task t2 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    // make sure we update our stats
+    scheduler.updateQSIInfo();
     schedulingInfo = 
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 10);
-    assertEquals(infoStrings[7] , 
-        "Running Maps : 100.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[8] , 
-        "Running Reduces : 100.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 4 ");
+    assertEquals(infoStrings.length, 21);
+    assertEquals(infoStrings[8], "Running tasks: 100.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[14],"Running tasks: 100.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[19] , "Number of Waiting Jobs: 4");
     
     //Complete the job and check the running tasks count
     FakeJobInProgress u1j1 = userJobs.get(0);
@@ -1558,16 +1552,15 @@ public class TestCapacityScheduler extends TestCase {
     taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1j1);
     taskTrackerManager.finalizeJob(u1j1);
     
+    // make sure we update our stats
+    scheduler.updateQSIInfo();
     schedulingInfo = 
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 10);
-    assertEquals(infoStrings[7] , 
-        "Running Maps : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[8] ,
-        "Running Reduces : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 4 ");
-    
+    assertEquals(infoStrings.length, 17);
+    assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[15] , "Number of Waiting Jobs: 4");
     
     //Fail a job which is initialized but not scheduled and check the count.
     FakeJobInProgress u1j2 = userJobs.get(1);
@@ -1576,15 +1569,15 @@ public class TestCapacityScheduler extends TestCase {
     taskTrackerManager.finalizeJob(u1j2, JobStatus.FAILED);
     //Run initializer to clean up failed jobs
     p.selectJobsToInitialize();
+    // make sure we update our stats
+    scheduler.updateQSIInfo();
     schedulingInfo = 
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 10);
-    assertEquals(infoStrings[7] , 
-        "Running Maps : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[8] ,
-        "Running Reduces : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 3 ");
+    assertEquals(infoStrings.length, 17);
+    assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[15] , "Number of Waiting Jobs: 3");
     
     //Fail a job which is not initialized but is in the waiting queue.
     FakeJobInProgress u1j5 = userJobs.get(4);
@@ -1594,25 +1587,23 @@ public class TestCapacityScheduler extends TestCase {
     taskTrackerManager.finalizeJob(u1j5, JobStatus.FAILED);
     //run initializer to clean up failed job
     p.selectJobsToInitialize();
+    // make sure we update our stats
+    scheduler.updateQSIInfo();
     schedulingInfo = 
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 10);
-    assertEquals(infoStrings[7] , 
-        "Running Maps : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[8] , 
-        "Running Reduces : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 2 ");
+    assertEquals(infoStrings.length, 17);
+    assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[15] , "Number of Waiting Jobs: 2");
     
     //Raise status change events as none of the intialized jobs would be
     //in running queue as we just failed the second job which was initialized
     //and completed the first one.
-    
     raiseStatusChangeEvents(scheduler.jobQueuesManager);
     
     //Now schedule a map should be job3 of the user as job1 succeeded job2
     //failed and now job3 is running
-    
     t1 = checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
     FakeJobInProgress u1j3 = userJobs.get(2);
     assertTrue("User Job 3 not running ", 
@@ -1621,28 +1612,28 @@ public class TestCapacityScheduler extends TestCase {
     //now the running count of map should be one and waiting jobs should be
     //one. run the poller as it is responsible for waiting count
     p.selectJobsToInitialize();
+    // make sure we update our stats
+    scheduler.updateQSIInfo();
     schedulingInfo = 
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 10);
-    assertEquals(infoStrings[7] , 
-        "Running Maps : 100.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[8] , 
-        "Running Reduces : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 1 ");
+    assertEquals(infoStrings.length, 19);
+    assertEquals(infoStrings[8], "Running tasks: 100.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[14],"Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[17] , "Number of Waiting Jobs: 1");
     
     //Fail the executing job
     taskTrackerManager.finalizeJob(u1j3, JobStatus.FAILED);
+    // make sure we update our stats
+    scheduler.updateQSIInfo();
     //Now running counts should become zero
     schedulingInfo = 
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 10);
-    assertEquals(infoStrings[7] , 
-        "Running Maps : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[8] , 
-        "Running Reduces : 0.0 % of Guaranteed Capacity");
-    assertEquals(infoStrings[9] , "Number of Waiting Jobs : 1 ");
+    assertEquals(infoStrings.length, 17);
+    assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
+    assertEquals(infoStrings[15] , "Number of Waiting Jobs: 1");
     
   }
 

Некоторые файлы не были показаны из-за большого количества измененных файлов