|
@@ -243,10 +243,6 @@ public class TestCapacityScheduler extends TestCase {
|
|
return (Set<TaskInProgress>)reduceTips;
|
|
return (Set<TaskInProgress>)reduceTips;
|
|
}
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
|
- synchronized void fail() {
|
|
|
|
- getStatus().setRunState(JobStatus.FAILED);
|
|
|
|
- }
|
|
|
|
}
|
|
}
|
|
|
|
|
|
static class FakeTaskInProgress extends TaskInProgress {
|
|
static class FakeTaskInProgress extends TaskInProgress {
|
|
@@ -1343,6 +1339,39 @@ public class TestCapacityScheduler extends TestCase {
|
|
assertEquals(j1.runningMapTasks, 2);
|
|
assertEquals(j1.runningMapTasks, 2);
|
|
|
|
|
|
}
|
|
}
|
|
|
|
+ /*
|
|
|
|
+ * Following is the testing strategy for testing scheduling information.
|
|
|
|
+ * - start capacity scheduler with two queues.
|
|
|
|
+ * - check the scheduling information with respect to the configuration
|
|
|
|
+ * which was used to configure the queues.
|
|
|
|
+ * - Submit 5 jobs to a queue.
|
|
|
|
+ * - Check the waiting jobs count, it should be 5.
|
|
|
|
+ * - Then run initializationPoller()
|
|
|
|
+ * - Check once again the waiting queue, it should be 5 jobs again.
|
|
|
|
+ * - Then raise status change events.
|
|
|
|
+ * - Assign one task to a task tracker. (Map)
|
|
|
|
+ * - Check waiting job count, it should be 4 now and used map (%) = 100
|
|
|
|
+ * - Assign another one task (Reduce)
|
|
|
|
+ * - Check waiting job count, it should be 4 now and used map (%) = 100
|
|
|
|
+ * and used reduce (%) = 100
|
|
|
|
+ * - finish the job and then check the used percentage it should go
|
|
|
|
+ * back to zero
|
|
|
|
+ * - Then pick an initialized job but not scheduled job and fail it.
|
|
|
|
+ * - Run the poller
|
|
|
|
+ * - Check the waiting job count should now be 3.
|
|
|
|
+ * - Now fail a job which has not been initialized at all.
|
|
|
|
+ * - Run the poller, so that it can clean up the job queue.
|
|
|
|
+ * - Check the count, the waiting job count should be 2.
|
|
|
|
+ * - Now raise status change events to move the initialized jobs which
|
|
|
|
+ * should be two in count to running queue.
|
|
|
|
+ * - Then schedule a map of the job in running queue.
|
|
|
|
+ * - Run the poller because the poller is responsible for waiting
|
|
|
|
+ * jobs count. Check the count, it should be using 100% map and one
|
|
|
|
+ * waiting job
|
|
|
|
+ * - fail the running job.
|
|
|
|
+ * - Check the count, it should be now one waiting job and zero running
|
|
|
|
+ * tasks
|
|
|
|
+ */
|
|
|
|
|
|
public void testSchedulingInformation() throws Exception {
|
|
public void testSchedulingInformation() throws Exception {
|
|
String[] qs = {"default", "q2"};
|
|
String[] qs = {"default", "q2"};
|
|
@@ -1371,35 +1400,20 @@ public class TestCapacityScheduler extends TestCase {
|
|
String[] infoStrings = schedulingInfo.split("\n");
|
|
String[] infoStrings = schedulingInfo.split("\n");
|
|
|
|
|
|
assertEquals(infoStrings.length, 10);
|
|
assertEquals(infoStrings.length, 10);
|
|
- assertEquals(infoStrings[0] , "Guaranteed Capacity (%) : 50.0 ");
|
|
|
|
|
|
+ assertEquals(infoStrings[0] , "Guaranteed Capacity : 50.0 %");
|
|
assertEquals(infoStrings[1] , "Guaranteed Capacity Maps : " + totalMaps * 50/100 + " ");
|
|
assertEquals(infoStrings[1] , "Guaranteed Capacity Maps : " + totalMaps * 50/100 + " ");
|
|
assertEquals(infoStrings[2] , "Guaranteed Capacity Reduces : " + totalReduces * 50/100 + " ");
|
|
assertEquals(infoStrings[2] , "Guaranteed Capacity Reduces : " + totalReduces * 50/100 + " ");
|
|
- assertEquals(infoStrings[3] , "User Limit : 25 ");
|
|
|
|
- assertEquals(infoStrings[4] , "Reclaim Time limit : 1000000 " );
|
|
|
|
- assertEquals(infoStrings[5] , "Number of Running Maps : 0 ");
|
|
|
|
- assertEquals(infoStrings[6] , "Number of Running Reduces : 0 ");
|
|
|
|
- assertEquals(infoStrings[7] , "Number of Waiting Jobs : 0 ");
|
|
|
|
- assertEquals(infoStrings[8] , "Priority Supported : YES ");
|
|
|
|
- assertEquals(infoStrings[9] , "* Scheduling information can be off by " +
|
|
|
|
- "maximum of "+ StringUtils.formatTime(resConf.getSleepInterval()));
|
|
|
|
|
|
+ assertEquals(infoStrings[3] , "User Limit : 25 %");
|
|
|
|
+ assertEquals(infoStrings[4] , "Reclaim Time limit : " +
|
|
|
|
+ StringUtils.formatTime(1000000) + " ");
|
|
|
|
+ assertEquals(infoStrings[5] , "Priority Supported : YES ");
|
|
|
|
+ assertEquals(infoStrings[7] ,
|
|
|
|
+ "Running Maps : 0.0 % of Guaranteed Capacity");
|
|
|
|
+ assertEquals(infoStrings[8] ,
|
|
|
|
+ "Running Reduces : 0.0 % of Guaranteed Capacity");
|
|
|
|
+ assertEquals(infoStrings[9] , "Number of Waiting Jobs : 0 ");
|
|
assertEquals(schedulingInfo, schedulingInfo2);
|
|
assertEquals(schedulingInfo, schedulingInfo2);
|
|
|
|
|
|
- /*
|
|
|
|
- * Following is the testing strategy for testing scheduling information.
|
|
|
|
- * - Submit 5 jobs to a queue.
|
|
|
|
- * - Check the waiting jobs count, it should be 5.
|
|
|
|
- * - Then run initializationPoller()
|
|
|
|
- * - Check once again the waiting queue, it should be 5 jobs again.
|
|
|
|
- * - Then raise status change events.
|
|
|
|
- * - Assign one task to a task tracker.
|
|
|
|
- * - Check waiting job count, it should be 4 now.
|
|
|
|
- * - Then pick an initialized job but not scheduled job and fail it.
|
|
|
|
- * - Run the poller
|
|
|
|
- * - Check the waiting job count should now be 3.
|
|
|
|
- * - Now fail a job which has not been initialized at all.
|
|
|
|
- * - Run the poller, so that it can clean up the job queue.
|
|
|
|
- * - Check the count, the waiting job count should be 2.
|
|
|
|
- */
|
|
|
|
//Testing with actual job submission.
|
|
//Testing with actual job submission.
|
|
ArrayList<FakeJobInProgress> userJobs =
|
|
ArrayList<FakeJobInProgress> userJobs =
|
|
submitJobs(1, 5, "default").get("u1");
|
|
submitJobs(1, 5, "default").get("u1");
|
|
@@ -1409,9 +1423,11 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
|
|
|
//waiting job should be equal to number of jobs submitted.
|
|
//waiting job should be equal to number of jobs submitted.
|
|
assertEquals(infoStrings.length, 10);
|
|
assertEquals(infoStrings.length, 10);
|
|
- assertEquals(infoStrings[5] , "Number of Running Maps : 0 ");
|
|
|
|
- assertEquals(infoStrings[6] , "Number of Running Reduces : 0 ");
|
|
|
|
- assertEquals(infoStrings[7] , "Number of Waiting Jobs : 5 ");
|
|
|
|
|
|
+ assertEquals(infoStrings[7] ,
|
|
|
|
+ "Running Maps : 0.0 % of Guaranteed Capacity");
|
|
|
|
+ assertEquals(infoStrings[8] ,
|
|
|
|
+ "Running Reduces : 0.0 % of Guaranteed Capacity");
|
|
|
|
+ assertEquals(infoStrings[9] , "Number of Waiting Jobs : 5 ");
|
|
|
|
|
|
//Initalize the jobs but don't raise events
|
|
//Initalize the jobs but don't raise events
|
|
p.selectJobsToInitialize();
|
|
p.selectJobsToInitialize();
|
|
@@ -1422,14 +1438,16 @@ public class TestCapacityScheduler extends TestCase {
|
|
assertEquals(infoStrings.length, 10);
|
|
assertEquals(infoStrings.length, 10);
|
|
//should be previous value as nothing is scheduled because no events
|
|
//should be previous value as nothing is scheduled because no events
|
|
//has been raised after initialization.
|
|
//has been raised after initialization.
|
|
- assertEquals(infoStrings[5] , "Number of Running Maps : 0 ");
|
|
|
|
- assertEquals(infoStrings[6] , "Number of Running Reduces : 0 ");
|
|
|
|
- assertEquals(infoStrings[7] , "Number of Waiting Jobs : 5 ");
|
|
|
|
|
|
+ assertEquals(infoStrings[7] ,
|
|
|
|
+ "Running Maps : 0.0 % of Guaranteed Capacity");
|
|
|
|
+ assertEquals(infoStrings[8] ,
|
|
|
|
+ "Running Reduces : 0.0 % of Guaranteed Capacity");
|
|
|
|
+ assertEquals(infoStrings[9] , "Number of Waiting Jobs : 5 ");
|
|
|
|
|
|
//Raise status change event so that jobs can move to running queue.
|
|
//Raise status change event so that jobs can move to running queue.
|
|
raiseStatusChangeEvents(scheduler.jobQueuesManager);
|
|
raiseStatusChangeEvents(scheduler.jobQueuesManager);
|
|
//assign one job
|
|
//assign one job
|
|
- scheduler.assignTasks(tracker("tt1")); // heartbeat
|
|
|
|
|
|
+ Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
//Initalize extra job.
|
|
//Initalize extra job.
|
|
p.selectJobsToInitialize();
|
|
p.selectJobsToInitialize();
|
|
|
|
|
|
@@ -1439,35 +1457,116 @@ public class TestCapacityScheduler extends TestCase {
|
|
queueManager.getJobQueueInfo("default").getSchedulingInfo();
|
|
queueManager.getJobQueueInfo("default").getSchedulingInfo();
|
|
infoStrings = schedulingInfo.split("\n");
|
|
infoStrings = schedulingInfo.split("\n");
|
|
assertEquals(infoStrings.length, 10);
|
|
assertEquals(infoStrings.length, 10);
|
|
- //TODO check running task count also fix in HADOOP-4445
|
|
|
|
- assertEquals(infoStrings[7] , "Number of Waiting Jobs : 4 ");
|
|
|
|
|
|
+ assertEquals(infoStrings[7] ,
|
|
|
|
+ "Running Maps : 100.0 % of Guaranteed Capacity");
|
|
|
|
+ assertEquals(infoStrings[8] ,
|
|
|
|
+ "Running Reduces : 0.0 % of Guaranteed Capacity");
|
|
|
|
+ assertEquals(infoStrings[9] , "Number of Waiting Jobs : 4 ");
|
|
|
|
+
|
|
|
|
+ //assign a reduce task
|
|
|
|
+
|
|
|
|
+ Task t2 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
|
|
|
|
+ schedulingInfo =
|
|
|
|
+ queueManager.getJobQueueInfo("default").getSchedulingInfo();
|
|
|
|
+ infoStrings = schedulingInfo.split("\n");
|
|
|
|
+ assertEquals(infoStrings.length, 10);
|
|
|
|
+ assertEquals(infoStrings[7] ,
|
|
|
|
+ "Running Maps : 100.0 % of Guaranteed Capacity");
|
|
|
|
+ assertEquals(infoStrings[8] ,
|
|
|
|
+ "Running Reduces : 100.0 % of Guaranteed Capacity");
|
|
|
|
+ assertEquals(infoStrings[9] , "Number of Waiting Jobs : 4 ");
|
|
|
|
+
|
|
|
|
+ //Complete the job and check the running tasks count
|
|
|
|
+ FakeJobInProgress u1j1 = userJobs.get(0);
|
|
|
|
+ taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), u1j1);
|
|
|
|
+ taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1j1);
|
|
|
|
+ taskTrackerManager.finalizeJob(u1j1);
|
|
|
|
+
|
|
|
|
+ schedulingInfo =
|
|
|
|
+ queueManager.getJobQueueInfo("default").getSchedulingInfo();
|
|
|
|
+ infoStrings = schedulingInfo.split("\n");
|
|
|
|
+ assertEquals(infoStrings.length, 10);
|
|
|
|
+ assertEquals(infoStrings[7] ,
|
|
|
|
+ "Running Maps : 0.0 % of Guaranteed Capacity");
|
|
|
|
+ assertEquals(infoStrings[8] ,
|
|
|
|
+ "Running Reduces : 0.0 % of Guaranteed Capacity");
|
|
|
|
+ assertEquals(infoStrings[9] , "Number of Waiting Jobs : 4 ");
|
|
|
|
|
|
|
|
|
|
//Fail a job which is initialized but not scheduled and check the count.
|
|
//Fail a job which is initialized but not scheduled and check the count.
|
|
FakeJobInProgress u1j2 = userJobs.get(1);
|
|
FakeJobInProgress u1j2 = userJobs.get(1);
|
|
assertTrue("User1 job 2 not initalized ",
|
|
assertTrue("User1 job 2 not initalized ",
|
|
u1j2.getStatus().getRunState() == JobStatus.RUNNING);
|
|
u1j2.getStatus().getRunState() == JobStatus.RUNNING);
|
|
- u1j2.fail();
|
|
|
|
|
|
+ taskTrackerManager.finalizeJob(u1j2, JobStatus.FAILED);
|
|
//Run initializer to clean up failed jobs
|
|
//Run initializer to clean up failed jobs
|
|
p.selectJobsToInitialize();
|
|
p.selectJobsToInitialize();
|
|
schedulingInfo =
|
|
schedulingInfo =
|
|
queueManager.getJobQueueInfo("default").getSchedulingInfo();
|
|
queueManager.getJobQueueInfo("default").getSchedulingInfo();
|
|
infoStrings = schedulingInfo.split("\n");
|
|
infoStrings = schedulingInfo.split("\n");
|
|
assertEquals(infoStrings.length, 10);
|
|
assertEquals(infoStrings.length, 10);
|
|
- assertEquals(infoStrings[7] , "Number of Waiting Jobs : 3 ");
|
|
|
|
|
|
+ assertEquals(infoStrings[7] ,
|
|
|
|
+ "Running Maps : 0.0 % of Guaranteed Capacity");
|
|
|
|
+ assertEquals(infoStrings[8] ,
|
|
|
|
+ "Running Reduces : 0.0 % of Guaranteed Capacity");
|
|
|
|
+ assertEquals(infoStrings[9] , "Number of Waiting Jobs : 3 ");
|
|
|
|
|
|
//Fail a job which is not initialized but is in the waiting queue.
|
|
//Fail a job which is not initialized but is in the waiting queue.
|
|
FakeJobInProgress u1j5 = userJobs.get(4);
|
|
FakeJobInProgress u1j5 = userJobs.get(4);
|
|
assertFalse("User1 job 5 initalized ",
|
|
assertFalse("User1 job 5 initalized ",
|
|
u1j5.getStatus().getRunState() == JobStatus.RUNNING);
|
|
u1j5.getStatus().getRunState() == JobStatus.RUNNING);
|
|
- u1j5.fail();
|
|
|
|
|
|
+
|
|
|
|
+ taskTrackerManager.finalizeJob(u1j5, JobStatus.FAILED);
|
|
//run initializer to clean up failed job
|
|
//run initializer to clean up failed job
|
|
p.selectJobsToInitialize();
|
|
p.selectJobsToInitialize();
|
|
schedulingInfo =
|
|
schedulingInfo =
|
|
queueManager.getJobQueueInfo("default").getSchedulingInfo();
|
|
queueManager.getJobQueueInfo("default").getSchedulingInfo();
|
|
infoStrings = schedulingInfo.split("\n");
|
|
infoStrings = schedulingInfo.split("\n");
|
|
assertEquals(infoStrings.length, 10);
|
|
assertEquals(infoStrings.length, 10);
|
|
- assertEquals(infoStrings[7] , "Number of Waiting Jobs : 2 ");
|
|
|
|
|
|
+ assertEquals(infoStrings[7] ,
|
|
|
|
+ "Running Maps : 0.0 % of Guaranteed Capacity");
|
|
|
|
+ assertEquals(infoStrings[8] ,
|
|
|
|
+ "Running Reduces : 0.0 % of Guaranteed Capacity");
|
|
|
|
+ assertEquals(infoStrings[9] , "Number of Waiting Jobs : 2 ");
|
|
|
|
+
|
|
|
|
+ //Raise status change events as none of the intialized jobs would be
|
|
|
|
+ //in running queue as we just failed the second job which was initialized
|
|
|
|
+ //and completed the first one.
|
|
|
|
+
|
|
|
|
+ raiseStatusChangeEvents(scheduler.jobQueuesManager);
|
|
|
|
+
|
|
|
|
+ //Now schedule a map should be job3 of the user as job1 succeeded job2
|
|
|
|
+ //failed and now job3 is running
|
|
|
|
+
|
|
|
|
+ t1 = checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
|
|
|
|
+ FakeJobInProgress u1j3 = userJobs.get(2);
|
|
|
|
+ assertTrue("User Job 3 not running ",
|
|
|
|
+ u1j3.getStatus().getRunState() == JobStatus.RUNNING);
|
|
|
|
+
|
|
|
|
+ //now the running count of map should be one and waiting jobs should be
|
|
|
|
+ //one. run the poller as it is responsible for waiting count
|
|
|
|
+ p.selectJobsToInitialize();
|
|
|
|
+ schedulingInfo =
|
|
|
|
+ queueManager.getJobQueueInfo("default").getSchedulingInfo();
|
|
|
|
+ infoStrings = schedulingInfo.split("\n");
|
|
|
|
+ assertEquals(infoStrings.length, 10);
|
|
|
|
+ assertEquals(infoStrings[7] ,
|
|
|
|
+ "Running Maps : 100.0 % of Guaranteed Capacity");
|
|
|
|
+ assertEquals(infoStrings[8] ,
|
|
|
|
+ "Running Reduces : 0.0 % of Guaranteed Capacity");
|
|
|
|
+ assertEquals(infoStrings[9] , "Number of Waiting Jobs : 1 ");
|
|
|
|
+
|
|
|
|
+ //Fail the executing job
|
|
|
|
+ taskTrackerManager.finalizeJob(u1j3, JobStatus.FAILED);
|
|
|
|
+ //Now running counts should become zero
|
|
|
|
+ schedulingInfo =
|
|
|
|
+ queueManager.getJobQueueInfo("default").getSchedulingInfo();
|
|
|
|
+ infoStrings = schedulingInfo.split("\n");
|
|
|
|
+ assertEquals(infoStrings.length, 10);
|
|
|
|
+ assertEquals(infoStrings[7] ,
|
|
|
|
+ "Running Maps : 0.0 % of Guaranteed Capacity");
|
|
|
|
+ assertEquals(infoStrings[8] ,
|
|
|
|
+ "Running Reduces : 0.0 % of Guaranteed Capacity");
|
|
|
|
+ assertEquals(infoStrings[9] , "Number of Waiting Jobs : 1 ");
|
|
|
|
|
|
}
|
|
}
|
|
|
|
|