|
@@ -36,6 +36,7 @@ import org.apache.commons.logging.LogFactory;
|
|
|
|
|
|
import org.apache.hadoop.io.BytesWritable;
|
|
|
import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
|
|
|
+import org.apache.hadoop.util.StringUtils;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
|
|
|
|
|
@@ -241,6 +242,11 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
Set<TaskInProgress> getRunningReduces() {
|
|
|
return (Set<TaskInProgress>)reduceTips;
|
|
|
}
|
|
|
+
|
|
|
+ @Override // test double's replacement for the inherited fail()
|
|
|
+ synchronized void fail() { // lets a test force this job into the failed state
|
|
|
+ getStatus().setRunState(JobStatus.FAILED); // only the run state is changed here; no other cleanup is performed in this method
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
static class FakeTaskInProgress extends TaskInProgress {
|
|
@@ -1338,8 +1344,10 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
|
|
|
}
|
|
|
|
|
|
- public void testSchedulingInformation() throws IOException {
|
|
|
+ public void testSchedulingInformation() throws Exception {
|
|
|
String[] qs = {"default", "q2"};
|
|
|
+ taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1);
|
|
|
+ scheduler.setTaskTrackerManager(taskTrackerManager);
|
|
|
taskTrackerManager.addQueues(qs);
|
|
|
resConf = new FakeResourceManagerConf();
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
@@ -1347,6 +1355,11 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
|
|
|
resConf.setFakeQueues(queues);
|
|
|
scheduler.setResourceManagerConf(resConf);
|
|
|
+ ControlledInitializationPoller p = new ControlledInitializationPoller(
|
|
|
+ scheduler.jobQueuesManager,
|
|
|
+ resConf,
|
|
|
+ resConf.getQueues());
|
|
|
+ scheduler.setInitializationPoller(p);
|
|
|
scheduler.start();
|
|
|
scheduler.assignTasks(tracker("tt1")); // heartbeat
|
|
|
scheduler.assignTasks(tracker("tt2")); // heartbeat
|
|
@@ -1356,6 +1369,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
String schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo();
|
|
|
String schedulingInfo2 = queueManager.getJobQueueInfo("q2").getSchedulingInfo();
|
|
|
String[] infoStrings = schedulingInfo.split("\n");
|
|
|
+
|
|
|
assertEquals(infoStrings.length, 10);
|
|
|
assertEquals(infoStrings[0] , "Guaranteed Capacity (%) : 50.0 ");
|
|
|
assertEquals(infoStrings[1] , "Guaranteed Capacity Maps : " + totalMaps * 50/100 + " ");
|
|
@@ -1364,10 +1378,97 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
assertEquals(infoStrings[4] , "Reclaim Time limit : 1000000 " );
|
|
|
assertEquals(infoStrings[5] , "Number of Running Maps : 0 ");
|
|
|
assertEquals(infoStrings[6] , "Number of Running Reduces : 0 ");
|
|
|
- assertEquals(infoStrings[7] , "Number of Waiting Maps : 0 ");
|
|
|
- assertEquals(infoStrings[8] , "Number of Waiting Reduces : 0 ");
|
|
|
- assertEquals(infoStrings[9] , "Priority Supported : YES ");
|
|
|
- assertEquals(schedulingInfo, schedulingInfo2);
|
|
|
+ assertEquals(infoStrings[7] , "Number of Waiting Jobs : 0 ");
|
|
|
+ assertEquals(infoStrings[8] , "Priority Supported : YES ");
|
|
|
+ assertEquals(infoStrings[9] , "* Scheduling information can be off by " +
|
|
|
+ "maximum of "+ StringUtils.formatTime(resConf.getSleepInterval()));
|
|
|
+ assertEquals(schedulingInfo, schedulingInfo2);
|
|
|
+
|
|
|
+ /*
|
|
|
+ * Following is the testing strategy for testing scheduling information.
|
|
|
+ * - Submit 5 jobs to a queue.
|
|
|
+ * - Check the waiting jobs count, it should be 5.
|
|
|
+ * - Then run the initialization poller (selectJobsToInitialize()).
|
|
|
+ * - Check once again the waiting queue, it should be 5 jobs again.
|
|
|
+ * - Then raise status change events.
|
|
|
+ * - Assign one task to a task tracker.
|
|
|
+ * - Check waiting job count, it should be 4 now.
|
|
|
+ * - Then pick a job that is initialized but not yet scheduled and fail it.
|
|
|
+ * - Run the poller
|
|
|
+ * - Check the waiting job count; it should now be 3.
|
|
|
+ * - Now fail a job which has not been initialized at all.
|
|
|
+ * - Run the poller, so that it can clean up the job queue.
|
|
|
+ * - Check the count, the waiting job count should be 2.
|
|
|
+ */
|
|
|
+ //Testing with actual job submission.
|
|
|
+ ArrayList<FakeJobInProgress> userJobs =
|
|
|
+ submitJobs(1, 5, "default").get("u1");
|
|
|
+ schedulingInfo =
|
|
|
+ queueManager.getJobQueueInfo("default").getSchedulingInfo();
|
|
|
+ infoStrings = schedulingInfo.split("\n");
|
|
|
+
|
|
|
+ //number of waiting jobs should be equal to the number of jobs submitted.
|
|
|
+ assertEquals(infoStrings.length, 10);
|
|
|
+ assertEquals(infoStrings[5] , "Number of Running Maps : 0 ");
|
|
|
+ assertEquals(infoStrings[6] , "Number of Running Reduces : 0 ");
|
|
|
+ assertEquals(infoStrings[7] , "Number of Waiting Jobs : 5 ");
|
|
|
+
|
|
|
+ //Initialize the jobs but don't raise events
|
|
|
+ p.selectJobsToInitialize();
|
|
|
+
|
|
|
+ schedulingInfo =
|
|
|
+ queueManager.getJobQueueInfo("default").getSchedulingInfo();
|
|
|
+ infoStrings = schedulingInfo.split("\n");
|
|
|
+ assertEquals(infoStrings.length, 10);
|
|
|
+ //should be the previous value, as nothing is scheduled because no events
|
|
|
+ //have been raised after initialization.
|
|
|
+ assertEquals(infoStrings[5] , "Number of Running Maps : 0 ");
|
|
|
+ assertEquals(infoStrings[6] , "Number of Running Reduces : 0 ");
|
|
|
+ assertEquals(infoStrings[7] , "Number of Waiting Jobs : 5 ");
|
|
|
+
|
|
|
+ //Raise status change event so that jobs can move to running queue.
|
|
|
+ raiseStatusChangeEvents(scheduler.jobQueuesManager);
|
|
|
+ //assign one job
|
|
|
+ scheduler.assignTasks(tracker("tt1")); // heartbeat
|
|
|
+ //Initialize extra job.
|
|
|
+ p.selectJobsToInitialize();
|
|
|
+
|
|
|
+ //Get scheduling information; now the number of waiting jobs should have
|
|
|
+ //changed to 4 as one is scheduled and has become running.
|
|
|
+ schedulingInfo =
|
|
|
+ queueManager.getJobQueueInfo("default").getSchedulingInfo();
|
|
|
+ infoStrings = schedulingInfo.split("\n");
|
|
|
+ assertEquals(infoStrings.length, 10);
|
|
|
+ //TODO check running task count also fix in HADOOP-4445
|
|
|
+ assertEquals(infoStrings[7] , "Number of Waiting Jobs : 4 ");
|
|
|
+
|
|
|
+
|
|
|
+ //Fail a job which is initialized but not scheduled and check the count.
|
|
|
+ FakeJobInProgress u1j2 = userJobs.get(1);
|
|
|
+ assertTrue("User1 job 2 not initalized ",
|
|
|
+ u1j2.getStatus().getRunState() == JobStatus.RUNNING);
|
|
|
+ u1j2.fail();
|
|
|
+ //Run initializer to clean up failed jobs
|
|
|
+ p.selectJobsToInitialize();
|
|
|
+ schedulingInfo =
|
|
|
+ queueManager.getJobQueueInfo("default").getSchedulingInfo();
|
|
|
+ infoStrings = schedulingInfo.split("\n");
|
|
|
+ assertEquals(infoStrings.length, 10);
|
|
|
+ assertEquals(infoStrings[7] , "Number of Waiting Jobs : 3 ");
|
|
|
+
|
|
|
+ //Fail a job which is not initialized but is in the waiting queue.
|
|
|
+ FakeJobInProgress u1j5 = userJobs.get(4);
|
|
|
+ assertFalse("User1 job 5 initalized ",
|
|
|
+ u1j5.getStatus().getRunState() == JobStatus.RUNNING);
|
|
|
+ u1j5.fail();
|
|
|
+ //run initializer to clean up failed job
|
|
|
+ p.selectJobsToInitialize();
|
|
|
+ schedulingInfo =
|
|
|
+ queueManager.getJobQueueInfo("default").getSchedulingInfo();
|
|
|
+ infoStrings = schedulingInfo.split("\n");
|
|
|
+ assertEquals(infoStrings.length, 10);
|
|
|
+ assertEquals(infoStrings[7] , "Number of Waiting Jobs : 2 ");
|
|
|
+
|
|
|
}
|
|
|
|
|
|
/**
|