|
@@ -143,7 +143,9 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ private ControlledInitializationPoller controlledInitializationPoller;
|
|
|
+
|
|
|
static class FakeJobInProgress extends JobInProgress {
|
|
|
|
|
|
private FakeTaskTrackerManager taskTrackerManager;
|
|
@@ -605,8 +607,14 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
scheduler.setTaskTrackerManager(taskTrackerManager);
|
|
|
|
|
|
conf = new JobConf();
|
|
|
+ // Don't let the JobInitializationPoller come in our way.
|
|
|
+ resConf = new FakeResourceManagerConf();
|
|
|
+ controlledInitializationPoller = new ControlledInitializationPoller(
|
|
|
+ scheduler.jobQueuesManager,
|
|
|
+ resConf,
|
|
|
+ resConf.getQueues());
|
|
|
+ scheduler.setInitializationPoller(controlledInitializationPoller);
|
|
|
scheduler.setConf(conf);
|
|
|
-
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -668,7 +676,6 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
public void testJobRunStateChange() throws IOException {
|
|
|
// start the scheduler
|
|
|
taskTrackerManager.addQueues(new String[] {"default"});
|
|
|
- resConf = new FakeResourceManagerConf();
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
queues.add(new FakeQueueInfo("default", 100.0f, 1, true, 1));
|
|
|
resConf.setFakeQueues(queues);
|
|
@@ -806,7 +813,6 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
public void testJobFinished() throws Exception {
|
|
|
taskTrackerManager.addQueues(new String[] {"default"});
|
|
|
|
|
|
- resConf = new FakeResourceManagerConf();
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
queues.add(new FakeQueueInfo("default", 50.0f, 10, true, 25));
|
|
|
resConf.setFakeQueues(queues);
|
|
@@ -862,7 +868,6 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
// set up some queues
|
|
|
String[] qs = {"default", "q2"};
|
|
|
taskTrackerManager.addQueues(qs);
|
|
|
- resConf = new FakeResourceManagerConf();
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
|
|
|
queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
|
|
@@ -887,7 +892,6 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
// need only one queue
|
|
|
String[] qs = { "default" };
|
|
|
taskTrackerManager.addQueues(qs);
|
|
|
- resConf = new FakeResourceManagerConf();
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
queues.add(new FakeQueueInfo("default", 100.0f, 300, true, 100));
|
|
|
resConf.setFakeQueues(queues);
|
|
@@ -918,7 +922,6 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
public void testGCAllocationToQueues() throws Exception {
|
|
|
String[] qs = {"default","q1","q2","q3","q4"};
|
|
|
taskTrackerManager.addQueues(qs);
|
|
|
- resConf = new FakeResourceManagerConf();
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
queues.add(new FakeQueueInfo("default",25.0f,5000,true,25));
|
|
|
queues.add(new FakeQueueInfo("q1",-1.0f,5000,true,25));
|
|
@@ -940,7 +943,6 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
// set up some queues
|
|
|
String[] qs = {"default", "q2"};
|
|
|
taskTrackerManager.addQueues(qs);
|
|
|
- resConf = new FakeResourceManagerConf();
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
// set the gc % as 10%, so that gc will be zero initially as
|
|
|
// the cluster capacity increase slowly.
|
|
@@ -995,7 +997,6 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
// set up some queues
|
|
|
String[] qs = {"default", "q2"};
|
|
|
taskTrackerManager.addQueues(qs);
|
|
|
- resConf = new FakeResourceManagerConf();
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
|
|
|
queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
|
|
@@ -1022,7 +1023,6 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
// set up some queues
|
|
|
String[] qs = {"default", "q2"};
|
|
|
taskTrackerManager.addQueues(qs);
|
|
|
- resConf = new FakeResourceManagerConf();
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
|
|
|
queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
|
|
@@ -1051,7 +1051,6 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
// set up some queues
|
|
|
String[] qs = {"default", "q2"};
|
|
|
taskTrackerManager.addQueues(qs);
|
|
|
- resConf = new FakeResourceManagerConf();
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
|
|
|
queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
|
|
@@ -1080,7 +1079,6 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
// set up some queues
|
|
|
String[] qs = {"default", "q2"};
|
|
|
taskTrackerManager.addQueues(qs);
|
|
|
- resConf = new FakeResourceManagerConf();
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
|
|
|
queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
|
|
@@ -1120,7 +1118,6 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
// set up one queue, with 10 slots
|
|
|
String[] qs = {"default"};
|
|
|
taskTrackerManager.addQueues(qs);
|
|
|
- resConf = new FakeResourceManagerConf();
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
queues.add(new FakeQueueInfo("default", 100.0f, 10000, true, 25));
|
|
|
resConf.setFakeQueues(queues);
|
|
@@ -1179,7 +1176,6 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
// set up some queues
|
|
|
String[] qs = {"default", "q2", "q3"};
|
|
|
taskTrackerManager.addQueues(qs);
|
|
|
- resConf = new FakeResourceManagerConf();
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
|
|
|
queues.add(new FakeQueueInfo("q2", 25.0f, 1000, true, 25));
|
|
@@ -1221,7 +1217,6 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
// set up some queues
|
|
|
String[] qs = {"default", "q2", "q3", "q4"};
|
|
|
taskTrackerManager.addQueues(qs);
|
|
|
- resConf = new FakeResourceManagerConf();
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
|
|
|
queues.add(new FakeQueueInfo("q2", 20.0f, 1000, true, 25));
|
|
@@ -1280,7 +1275,6 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
taskTrackerManager.addQueues(qs);
|
|
|
int maxSlots = taskTrackerManager.maxMapTasksPerTracker
|
|
|
* taskTrackerManager.taskTrackers().size();
|
|
|
- resConf = new FakeResourceManagerConf();
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
|
|
|
queues.add(new FakeQueueInfo("queue", 50.0f, 1000, true, 25));
|
|
@@ -1315,7 +1309,6 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
// set up some queues
|
|
|
String[] qs = {"default", "q2"};
|
|
|
taskTrackerManager.addQueues(qs);
|
|
|
- resConf = new FakeResourceManagerConf();
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
|
|
|
queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
|
|
@@ -1378,20 +1371,13 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
public void testReclaimCapacityWithUninitializedJobs() throws IOException {
|
|
|
String[] qs = {"default", "q2"};
|
|
|
taskTrackerManager.addQueues(qs);
|
|
|
- resConf = new FakeResourceManagerConf();
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
|
|
|
queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
|
|
|
resConf.setFakeQueues(queues);
|
|
|
-
|
|
|
- ControlledInitializationPoller p = new ControlledInitializationPoller(
|
|
|
- scheduler.jobQueuesManager,
|
|
|
- resConf,
|
|
|
- resConf.getQueues());
|
|
|
- scheduler.setInitializationPoller(p);
|
|
|
-
|
|
|
scheduler.setResourceManagerConf(resConf);
|
|
|
scheduler.start();
|
|
|
+
|
|
|
//Submit one job to the default queue and get the capacity over the
|
|
|
//gc of the particular queue.
|
|
|
FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
|
|
@@ -1424,7 +1410,6 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
// set up some queues
|
|
|
String[] qs = {"default", "q2", "q3"};
|
|
|
taskTrackerManager.addQueues(qs);
|
|
|
- resConf = new FakeResourceManagerConf();
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
// we want q3 to have 0 GC. Map slots = 4.
|
|
|
queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
|
|
@@ -1501,18 +1486,13 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1);
|
|
|
scheduler.setTaskTrackerManager(taskTrackerManager);
|
|
|
taskTrackerManager.addQueues(qs);
|
|
|
- resConf = new FakeResourceManagerConf();
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
|
|
|
queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
|
|
|
resConf.setFakeQueues(queues);
|
|
|
scheduler.setResourceManagerConf(resConf);
|
|
|
- ControlledInitializationPoller p = new ControlledInitializationPoller(
|
|
|
- scheduler.jobQueuesManager,
|
|
|
- resConf,
|
|
|
- resConf.getQueues());
|
|
|
- scheduler.setInitializationPoller(p);
|
|
|
scheduler.start();
|
|
|
+
|
|
|
scheduler.assignTasks(tracker("tt1")); // heartbeat
|
|
|
scheduler.assignTasks(tracker("tt2")); // heartbeat
|
|
|
int totalMaps = taskTrackerManager.getClusterStatus().getMaxMapTasks();
|
|
@@ -1550,7 +1530,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
assertEquals(infoStrings[15] , "Number of Waiting Jobs: 5");
|
|
|
|
|
|
//Initalize the jobs but don't raise events
|
|
|
- p.selectJobsToInitialize();
|
|
|
+ controlledInitializationPoller.selectJobsToInitialize();
|
|
|
|
|
|
schedulingInfo =
|
|
|
queueManager.getJobQueueInfo("default").getSchedulingInfo();
|
|
@@ -1567,7 +1547,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
//assign one job
|
|
|
Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
|
//Initalize extra job.
|
|
|
- p.selectJobsToInitialize();
|
|
|
+ controlledInitializationPoller.selectJobsToInitialize();
|
|
|
|
|
|
//Get scheduling information, now the number of waiting job should have
|
|
|
//changed to 4 as one is scheduled and has become running.
|
|
@@ -1615,7 +1595,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
u1j2.getStatus().getRunState() == JobStatus.RUNNING);
|
|
|
taskTrackerManager.finalizeJob(u1j2, JobStatus.FAILED);
|
|
|
//Run initializer to clean up failed jobs
|
|
|
- p.selectJobsToInitialize();
|
|
|
+ controlledInitializationPoller.selectJobsToInitialize();
|
|
|
// make sure we update our stats
|
|
|
scheduler.updateQSIInfoForTests();
|
|
|
schedulingInfo =
|
|
@@ -1633,7 +1613,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
|
|
|
taskTrackerManager.finalizeJob(u1j5, JobStatus.FAILED);
|
|
|
//run initializer to clean up failed job
|
|
|
- p.selectJobsToInitialize();
|
|
|
+ controlledInitializationPoller.selectJobsToInitialize();
|
|
|
// make sure we update our stats
|
|
|
scheduler.updateQSIInfoForTests();
|
|
|
schedulingInfo =
|
|
@@ -1658,7 +1638,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
|
|
|
//now the running count of map should be one and waiting jobs should be
|
|
|
//one. run the poller as it is responsible for waiting count
|
|
|
- p.selectJobsToInitialize();
|
|
|
+ controlledInitializationPoller.selectJobsToInitialize();
|
|
|
// make sure we update our stats
|
|
|
scheduler.updateQSIInfoForTests();
|
|
|
schedulingInfo =
|
|
@@ -1702,7 +1682,6 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
.setTotalPhysicalMemory(512 * 1024 * 1024L);
|
|
|
|
|
|
taskTrackerManager.addQueues(new String[] { "default" });
|
|
|
- resConf = new FakeResourceManagerConf();
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
|
|
|
resConf.setFakeQueues(queues);
|
|
@@ -1749,7 +1728,6 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
ttStatus.setReservedPhysicalMemory(0);
|
|
|
|
|
|
taskTrackerManager.addQueues(new String[] { "default" });
|
|
|
- resConf = new FakeResourceManagerConf();
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
|
|
|
resConf.setFakeQueues(queues);
|
|
@@ -1799,7 +1777,6 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
// Normal job on this TT would be 1.5GB vmem, 0.5GB pmem
|
|
|
|
|
|
taskTrackerManager.addQueues(new String[] { "default" });
|
|
|
- resConf = new FakeResourceManagerConf();
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
|
|
|
resConf.setFakeQueues(queues);
|
|
@@ -1885,7 +1862,6 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
// Normal job on this TT would be 1GB vmem, 0.5GB pmem
|
|
|
|
|
|
taskTrackerManager.addQueues(new String[] { "default" });
|
|
|
- resConf = new FakeResourceManagerConf();
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
|
|
|
resConf.setFakeQueues(queues);
|
|
@@ -1949,7 +1925,6 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024);
|
|
|
ttStatus.setReservedPhysicalMemory(0);
|
|
|
|
|
|
- resConf = new FakeResourceManagerConf();
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
|
|
|
taskTrackerManager.addQueues(new String[] { "default" });
|
|
@@ -1966,11 +1941,6 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
resConf.setDefaultPercentOfPmemInVmem(33.3f);
|
|
|
resConf.setLimitMaxPmemForTasks(pmemUpperLimit);
|
|
|
scheduler.setResourceManagerConf(resConf);
|
|
|
- ControlledInitializationPoller p = new ControlledInitializationPoller(
|
|
|
- scheduler.jobQueuesManager,
|
|
|
- resConf,
|
|
|
- resConf.getQueues());
|
|
|
- scheduler.setInitializationPoller(p);
|
|
|
scheduler.start();
|
|
|
|
|
|
LOG.debug("Submit one invalid high ram(5GB vmem, 3GB pmem) job of "
|
|
@@ -2022,7 +1992,6 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024L);
|
|
|
ttStatus.setReservedPhysicalMemory(0);
|
|
|
|
|
|
- resConf = new FakeResourceManagerConf();
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
|
|
|
taskTrackerManager.addQueues(new String[] { "default" });
|
|
@@ -2122,16 +2091,10 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1);
|
|
|
scheduler.setTaskTrackerManager(taskTrackerManager);
|
|
|
taskTrackerManager.addQueues(qs);
|
|
|
- resConf = new FakeResourceManagerConf();
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100));
|
|
|
resConf.setFakeQueues(queues);
|
|
|
scheduler.setResourceManagerConf(resConf);
|
|
|
- ControlledInitializationPoller p = new ControlledInitializationPoller(
|
|
|
- scheduler.jobQueuesManager,
|
|
|
- resConf,
|
|
|
- resConf.getQueues());
|
|
|
- scheduler.setInitializationPoller(p);
|
|
|
scheduler.start();
|
|
|
|
|
|
JobQueuesManager mgr = scheduler.jobQueuesManager;
|
|
@@ -2154,7 +2117,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
assertEquals(mgr.getWaitingJobs("default").size(), 12);
|
|
|
|
|
|
// run one poller iteration.
|
|
|
- p.selectJobsToInitialize();
|
|
|
+ controlledInitializationPoller.selectJobsToInitialize();
|
|
|
|
|
|
// the poller should initialize 6 jobs
|
|
|
// 3 users and 2 jobs from each
|
|
@@ -2178,7 +2141,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
submitJob(JobStatus.PREP, 1, 1, "default", "u4");
|
|
|
|
|
|
// run the poller again.
|
|
|
- p.selectJobsToInitialize();
|
|
|
+ controlledInitializationPoller.selectJobsToInitialize();
|
|
|
|
|
|
// since no jobs have started running, there should be no
|
|
|
// change to the initialized jobs.
|
|
@@ -2202,7 +2165,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
|
|
|
// as some jobs have running tasks, the poller will now
|
|
|
// pick up new jobs to initialize.
|
|
|
- p.selectJobsToInitialize();
|
|
|
+ controlledInitializationPoller.selectJobsToInitialize();
|
|
|
|
|
|
// count should still be the same
|
|
|
assertEquals(initializedJobs.size(), 6);
|
|
@@ -2226,7 +2189,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
|
|
|
// no new jobs should be picked up, because max user limit
|
|
|
// is still 3.
|
|
|
- p.selectJobsToInitialize();
|
|
|
+ controlledInitializationPoller.selectJobsToInitialize();
|
|
|
|
|
|
assertEquals(initializedJobs.size(), 5);
|
|
|
|
|
@@ -2239,11 +2202,11 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
// Now initialised jobs should contain user 4's job, as
|
|
|
// user 1's jobs are all done and the number of users is
|
|
|
// below the limit
|
|
|
- p.selectJobsToInitialize();
|
|
|
+ controlledInitializationPoller.selectJobsToInitialize();
|
|
|
assertEquals(initializedJobs.size(), 5);
|
|
|
assertTrue(initializedJobs.contains(u4j1.getJobID()));
|
|
|
|
|
|
- p.stopRunning();
|
|
|
+ controlledInitializationPoller.stopRunning();
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -2253,30 +2216,25 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
public void testHighPriorityJobInitialization() throws Exception {
|
|
|
String[] qs = { "default"};
|
|
|
taskTrackerManager.addQueues(qs);
|
|
|
- resConf = new FakeResourceManagerConf();
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100));
|
|
|
resConf.setFakeQueues(queues);
|
|
|
scheduler.setResourceManagerConf(resConf);
|
|
|
- ControlledInitializationPoller p = new ControlledInitializationPoller(
|
|
|
- scheduler.jobQueuesManager,
|
|
|
- resConf,
|
|
|
- resConf.getQueues());
|
|
|
- scheduler.setInitializationPoller(p);
|
|
|
scheduler.start();
|
|
|
+
|
|
|
JobInitializationPoller initPoller = scheduler.getInitializationPoller();
|
|
|
Set<JobID> initializedJobsList = initPoller.getInitializedJobList();
|
|
|
|
|
|
// submit 3 jobs for 3 users
|
|
|
submitJobs(3,3,"default");
|
|
|
- p.selectJobsToInitialize();
|
|
|
+ controlledInitializationPoller.selectJobsToInitialize();
|
|
|
assertEquals(initializedJobsList.size(), 6);
|
|
|
|
|
|
// submit 2 job for a different user. one of them will be made high priority
|
|
|
FakeJobInProgress u4j1 = submitJob(JobStatus.PREP, 1, 1, "default", "u4");
|
|
|
FakeJobInProgress u4j2 = submitJob(JobStatus.PREP, 1, 1, "default", "u4");
|
|
|
|
|
|
- p.selectJobsToInitialize();
|
|
|
+ controlledInitializationPoller.selectJobsToInitialize();
|
|
|
|
|
|
// shouldn't change
|
|
|
assertEquals(initializedJobsList.size(), 6);
|
|
@@ -2289,7 +2247,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
// change priority of one job
|
|
|
taskTrackerManager.setPriority(u4j1, JobPriority.VERY_HIGH);
|
|
|
|
|
|
- p.selectJobsToInitialize();
|
|
|
+ controlledInitializationPoller.selectJobsToInitialize();
|
|
|
|
|
|
// the high priority job should get initialized, but not the
|
|
|
// low priority job from u4, as we have already exceeded the
|
|
@@ -2299,22 +2257,16 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
initializedJobsList.contains(u4j1.getJobID()));
|
|
|
assertFalse("Contains U4J2 Normal priority job " ,
|
|
|
initializedJobsList.contains(u4j2.getJobID()));
|
|
|
- p.stopRunning();
|
|
|
+ controlledInitializationPoller.stopRunning();
|
|
|
}
|
|
|
|
|
|
public void testJobMovement() throws Exception {
|
|
|
String[] qs = { "default"};
|
|
|
taskTrackerManager.addQueues(qs);
|
|
|
- resConf = new FakeResourceManagerConf();
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100));
|
|
|
resConf.setFakeQueues(queues);
|
|
|
scheduler.setResourceManagerConf(resConf);
|
|
|
- ControlledInitializationPoller p = new ControlledInitializationPoller(
|
|
|
- scheduler.jobQueuesManager,
|
|
|
- resConf,
|
|
|
- resConf.getQueues());
|
|
|
- scheduler.setInitializationPoller(p);
|
|
|
scheduler.start();
|
|
|
|
|
|
JobQueuesManager mgr = scheduler.jobQueuesManager;
|
|
@@ -2340,7 +2292,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
// submit a job
|
|
|
FakeJobInProgress job =
|
|
|
submitJob(JobStatus.PREP, 1, 1, "default", "u1");
|
|
|
- p.selectJobsToInitialize();
|
|
|
+ controlledInitializationPoller.selectJobsToInitialize();
|
|
|
|
|
|
assertEquals(p.getInitializedJobList().size(), 1);
|
|
|
|
|
@@ -2357,7 +2309,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
|
t = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
|
|
|
|
|
|
- p.selectJobsToInitialize();
|
|
|
+ controlledInitializationPoller.selectJobsToInitialize();
|
|
|
|
|
|
// now this task should be removed from the initialized list.
|
|
|
assertTrue(p.getInitializedJobList().isEmpty());
|