|
@@ -23,7 +23,6 @@ import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
-import java.util.Iterator;
|
|
|
import java.util.LinkedHashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
@@ -34,13 +33,109 @@ import junit.framework.TestCase;
|
|
|
|
|
|
import org.apache.hadoop.io.BytesWritable;
|
|
|
import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
|
|
|
-//import org.apache.hadoop.mapred.CapacityTaskScheduler;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
|
|
|
+
|
|
|
public class TestCapacityScheduler extends TestCase {
|
|
|
|
|
|
private static int jobCounter;
|
|
|
|
|
|
+ /**
|
|
|
+ * Test class that removes the asynchronous nature of job initialization.
|
|
|
+ *
|
|
|
+ * The run method is a dummy which just waits for completion. It is
|
|
|
+ * expected that test code calls the main method, initializeJobs, directly
|
|
|
+ * to trigger initialization.
|
|
|
+ */
|
|
|
+ class ControlledJobInitializer extends
|
|
|
+ JobInitializationPoller.JobInitializationThread {
|
|
|
+
|
|
|
+ boolean stopRunning;
|
|
|
+
|
|
|
+ public ControlledJobInitializer(JobInitializationPoller p) {
|
|
|
+ p.super();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ while (!stopRunning) {
|
|
|
+ try {
|
|
|
+ synchronized(this) {
|
|
|
+ this.wait();
|
|
|
+ }
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ void stopRunning() {
|
|
|
+ stopRunning = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test class that removes the asynchronous nature of job initialization.
|
|
|
+ *
|
|
|
+ * The run method is a dummy which just waits for completion. It is
|
|
|
+ * expected that test code calls the main method, selectJobsToInitialize,
|
|
|
+ * directly to trigger initialization.
|
|
|
+ *
|
|
|
+ * The class also creates the test worker thread objects of type
|
|
|
+ * ControlledJobInitializer instead of the objects of the actual class
|
|
|
+ */
|
|
|
+ class ControlledInitializationPoller extends JobInitializationPoller {
|
|
|
+
|
|
|
+ private boolean stopRunning;
|
|
|
+ private ArrayList<ControlledJobInitializer> workers;
|
|
|
+
|
|
|
+ public ControlledInitializationPoller(JobQueuesManager mgr,
|
|
|
+ CapacitySchedulerConf rmConf,
|
|
|
+ Set<String> queues) {
|
|
|
+ super(mgr, rmConf, queues);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ // don't do anything here.
|
|
|
+ while (!stopRunning) {
|
|
|
+ try {
|
|
|
+ synchronized (this) {
|
|
|
+ this.wait();
|
|
|
+ }
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ JobInitializationThread createJobInitializationThread() {
|
|
|
+ ControlledJobInitializer t = new ControlledJobInitializer(this);
|
|
|
+ if (workers == null) {
|
|
|
+ workers = new ArrayList<ControlledJobInitializer>();
|
|
|
+ }
|
|
|
+ workers.add(t);
|
|
|
+ return t;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ void selectJobsToInitialize() {
|
|
|
+ super.selectJobsToInitialize();
|
|
|
+ for (ControlledJobInitializer t : workers) {
|
|
|
+ t.initializeJobs();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ void stopRunning() {
|
|
|
+ stopRunning = true;
|
|
|
+ for (ControlledJobInitializer t : workers) {
|
|
|
+ t.stopRunning();
|
|
|
+ t.interrupt();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
static class FakeJobInProgress extends JobInProgress {
|
|
|
|
|
|
private FakeTaskTrackerManager taskTrackerManager;
|
|
@@ -208,12 +303,18 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
new HashMap<String, TaskStatus>();
|
|
|
|
|
|
public FakeTaskTrackerManager() {
|
|
|
+ this(2, 1);
|
|
|
+ }
|
|
|
+
|
|
|
+ public FakeTaskTrackerManager(int maxMapSlots, int maxReduceSlots) {
|
|
|
trackers.put("tt1", new TaskTrackerStatus("tt1", "tt1.host", 1,
|
|
|
new ArrayList<TaskStatus>(), 0,
|
|
|
- maxMapTasksPerTracker, maxReduceTasksPerTracker));
|
|
|
+ maxMapSlots, maxReduceSlots));
|
|
|
+ maxMapTasksPerTracker = maxMapSlots;
|
|
|
trackers.put("tt2", new TaskTrackerStatus("tt2", "tt2.host", 2,
|
|
|
new ArrayList<TaskStatus>(), 0,
|
|
|
- maxMapTasksPerTracker, maxReduceTasksPerTracker));
|
|
|
+ maxMapSlots, maxReduceSlots));
|
|
|
+ maxReduceTasksPerTracker = maxReduceSlots;
|
|
|
}
|
|
|
|
|
|
public void addTaskTracker(String ttName) {
|
|
@@ -405,6 +506,16 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
public boolean isPrioritySupported(String queue) {
|
|
|
return queueMap.get(queue).supportsPrio;
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public long getSleepInterval() {
|
|
|
+ return 1;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public int getMaxWorkerThreads() {
|
|
|
+ return 1;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
protected class FakeClock extends CapacityTaskScheduler.Clock {
|
|
@@ -522,8 +633,9 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
scheduler.jobQueuesManager.jobUpdated(event);
|
|
|
|
|
|
// check if the jobs are missing from the waiting queue
|
|
|
- assertEquals("Waiting queue is garbled on job init", 0,
|
|
|
- scheduler.jobQueuesManager.getWaitingJobQueue("default")
|
|
|
+ // The jobs are not removed from waiting queue until they are scheduled
|
|
|
+ assertEquals("Waiting queue is garbled on job init", 2,
|
|
|
+ scheduler.jobQueuesManager.getJobs("default")
|
|
|
.size());
|
|
|
|
|
|
// test if changing the job priority/start-time works as expected in the
|
|
@@ -550,7 +662,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
// check if the running queue size is correct
|
|
|
assertEquals("Job finish garbles the queue",
|
|
|
1, rqueue.size());
|
|
|
-
|
|
|
+
|
|
|
}
|
|
|
|
|
|
// test if the queue reflects the changes
|
|
@@ -607,7 +719,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
private JobInProgress[] getJobsInQueue(boolean waiting) {
|
|
|
Collection<JobInProgress> queue =
|
|
|
waiting
|
|
|
- ? scheduler.jobQueuesManager.getWaitingJobQueue("default")
|
|
|
+ ? scheduler.jobQueuesManager.getJobs("default")
|
|
|
: scheduler.jobQueuesManager.getRunningJobQueue("default");
|
|
|
return queue.toArray(new JobInProgress[0]);
|
|
|
}
|
|
@@ -711,24 +823,23 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
resConf.setFakeQueues(queues);
|
|
|
scheduler.setResourceManagerConf(resConf);
|
|
|
scheduler.start();
|
|
|
+ HashMap<String, ArrayList<FakeJobInProgress>> subJobsList =
|
|
|
+ submitJobs(1, 4, "default");
|
|
|
+
|
|
|
+ JobQueuesManager mgr = scheduler.jobQueuesManager;
|
|
|
|
|
|
- // submit a job
|
|
|
- JobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u1");
|
|
|
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
|
+ while(mgr.getJobs("default").size() < 4){
|
|
|
+ Thread.sleep(1);
|
|
|
+ }
|
|
|
+ //Raise status change events for jobs submitted.
|
|
|
+ raiseStatusChangeEvents(mgr);
|
|
|
+ Collection<JobInProgress> jobs = scheduler.getJobs("default");
|
|
|
|
|
|
- // submit another job
|
|
|
- JobInProgress j2 = submitJob(JobStatus.PREP, 10, 10, "default", "u1");
|
|
|
+ assertTrue("Number of jobs returned by scheduler is wrong"
|
|
|
+ ,jobs.size() == 4);
|
|
|
|
|
|
- Collection<JobInProgress> jobs = scheduler.getJobs("default");
|
|
|
- assertEquals(2, jobs.size());
|
|
|
- Iterator<JobInProgress> iter = jobs.iterator();
|
|
|
- assertEquals(j1, iter.next());
|
|
|
- assertEquals(j2, iter.next());
|
|
|
-
|
|
|
- assertEquals(1, scheduler.jobQueuesManager.
|
|
|
- getRunningJobQueue("default").size());
|
|
|
- assertEquals(1, scheduler.jobQueuesManager.
|
|
|
- getWaitingJobQueue("default").size());
|
|
|
+ assertTrue("Submitted jobs and Returned jobs are not same",
|
|
|
+ subJobsList.get("u1").containsAll(jobs));
|
|
|
}
|
|
|
|
|
|
//Basic test to test GC allocation across the queues which have no
|
|
@@ -970,13 +1081,16 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
// u1 finishes a task
|
|
|
taskTrackerManager.finishTask("tt5", "attempt_test_0001_m_000006_0", j1);
|
|
|
// u1 submits a few more jobs
|
|
|
- submitJob(JobStatus.PREP, 10, 10, null, "u1");
|
|
|
- submitJob(JobStatus.PREP, 10, 10, null, "u1");
|
|
|
- submitJob(JobStatus.PREP, 10, 10, null, "u1");
|
|
|
+ // All the jobs are inited when submitted
|
|
|
+ // because of addition of Eager Job Initializer all jobs in this
|
|
|
+ //case would e initialised.
|
|
|
+ submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
|
|
|
+ submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
|
|
|
+ submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
|
|
|
// u2 also submits a job
|
|
|
- submitJob(JobStatus.PREP, 10, 10, null, "u2");
|
|
|
+ submitJobAndInit(JobStatus.PREP, 10, 10, null, "u2");
|
|
|
// now u3 submits a job
|
|
|
- submitJob(JobStatus.PREP, 2, 2, null, "u3");
|
|
|
+ submitJobAndInit(JobStatus.PREP, 2, 2, null, "u3");
|
|
|
// next slot should go to u3, even though u2 has an earlier job, since
|
|
|
// user limits have changed and u1/u2 are over limits
|
|
|
checkAssignment("tt5", "attempt_test_0007_m_000001_0 on tt5");
|
|
@@ -1013,7 +1127,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
|
|
|
checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
|
|
|
// now submit a job to q2
|
|
|
- FakeJobInProgress j3 = submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
|
|
|
+ FakeJobInProgress j3 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
|
|
|
// update our structures
|
|
|
scheduler.updateQSIInfo();
|
|
|
// get scheduler to notice that q2 needs to reclaim
|
|
@@ -1070,7 +1184,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
// at this point, q3 is running 5 tasks (with a cap of 2), q4 is
|
|
|
// running 3 tasks (with a cap of 1).
|
|
|
// If we submit a job to 'default', we need to get 3 slots back.
|
|
|
- FakeJobInProgress j4 = submitJob(JobStatus.PREP, 10, 10, null, "u1");
|
|
|
+ FakeJobInProgress j4 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
|
|
|
// update our structures
|
|
|
scheduler.updateQSIInfo();
|
|
|
// get scheduler to notice that q2 needs to reclaim
|
|
@@ -1184,4 +1298,287 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
return tasks.get(0);
|
|
|
}
|
|
|
|
|
|
+ /*
|
|
|
+ * Test cases for Job Initialization poller.
|
|
|
+ */
|
|
|
+
|
|
|
+ /*
|
|
|
+ * This test verifies that the correct number of jobs for
|
|
|
+ * correct number of users is initialized.
|
|
|
+ * It also verifies that as jobs of users complete, new jobs
|
|
|
+ * from the correct users are initialized.
|
|
|
+ */
|
|
|
+ public void testJobInitialization() throws Exception {
|
|
|
+ // set up the scheduler
|
|
|
+ String[] qs = { "default" };
|
|
|
+ taskTrackerManager = new FakeTaskTrackerManager(1, 1);
|
|
|
+ scheduler.setTaskTrackerManager(taskTrackerManager);
|
|
|
+ taskTrackerManager.addQueues(qs);
|
|
|
+ resConf = new FakeResourceManagerConf();
|
|
|
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
+ queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100));
|
|
|
+ resConf.setFakeQueues(queues);
|
|
|
+ scheduler.setResourceManagerConf(resConf);
|
|
|
+ ControlledInitializationPoller p = new ControlledInitializationPoller(
|
|
|
+ scheduler.jobQueuesManager,
|
|
|
+ resConf,
|
|
|
+ resConf.getQueues());
|
|
|
+ scheduler.setInitializationPoller(p);
|
|
|
+ scheduler.start();
|
|
|
+
|
|
|
+ JobQueuesManager mgr = scheduler.jobQueuesManager;
|
|
|
+ JobInitializationPoller initPoller = scheduler.getInitializationPoller();
|
|
|
+
|
|
|
+ // submit 4 jobs each for 3 users.
|
|
|
+ HashMap<String, ArrayList<FakeJobInProgress>> userJobs = submitJobs(3,
|
|
|
+ 4, "default");
|
|
|
+
|
|
|
+ // get the jobs submitted.
|
|
|
+ ArrayList<FakeJobInProgress> u1Jobs = userJobs.get("u1");
|
|
|
+ ArrayList<FakeJobInProgress> u2Jobs = userJobs.get("u2");
|
|
|
+ ArrayList<FakeJobInProgress> u3Jobs = userJobs.get("u3");
|
|
|
+
|
|
|
+ // reference to the initializedJobs data structure
|
|
|
+ // changes are reflected in the set as they are made by the poller
|
|
|
+ HashSet<JobID> initializedJobs = initPoller.getInitializedJobList();
|
|
|
+
|
|
|
+ // we should have 12 (3 x 4) jobs in the job queue
|
|
|
+ assertEquals(mgr.getJobs("default").size(), 12);
|
|
|
+
|
|
|
+ // run one poller iteration.
|
|
|
+ p.selectJobsToInitialize();
|
|
|
+
|
|
|
+ // the poller should initialize 6 jobs
|
|
|
+ // 3 users and 2 jobs from each
|
|
|
+ assertEquals(initializedJobs.size(), 6);
|
|
|
+
|
|
|
+ assertTrue("Initialized jobs didnt contain the user1 job 1",
|
|
|
+ initializedJobs.contains(u1Jobs.get(0).getJobID()));
|
|
|
+ assertTrue("Initialized jobs didnt contain the user1 job 2",
|
|
|
+ initializedJobs.contains(u1Jobs.get(1).getJobID()));
|
|
|
+ assertTrue("Initialized jobs didnt contain the user2 job 1",
|
|
|
+ initializedJobs.contains(u2Jobs.get(0).getJobID()));
|
|
|
+ assertTrue("Initialized jobs didnt contain the user2 job 2",
|
|
|
+ initializedJobs.contains(u2Jobs.get(1).getJobID()));
|
|
|
+ assertTrue("Initialized jobs didnt contain the user3 job 1",
|
|
|
+ initializedJobs.contains(u3Jobs.get(0).getJobID()));
|
|
|
+ assertTrue("Initialized jobs didnt contain the user3 job 2",
|
|
|
+ initializedJobs.contains(u3Jobs.get(1).getJobID()));
|
|
|
+
|
|
|
+ // now submit one more job from another user.
|
|
|
+ FakeJobInProgress u4j1 =
|
|
|
+ submitJob(JobStatus.PREP, 1, 1, "default", "u4");
|
|
|
+
|
|
|
+ // run the poller again.
|
|
|
+ p.selectJobsToInitialize();
|
|
|
+
|
|
|
+ // since no jobs have started running, there should be no
|
|
|
+ // change to the initialized jobs.
|
|
|
+ assertEquals(initializedJobs.size(), 6);
|
|
|
+ assertFalse("Initialized jobs contains user 4 jobs",
|
|
|
+ initializedJobs.contains(u4j1.getJobID()));
|
|
|
+
|
|
|
+ // This event simulates raising the event on completion of setup task
|
|
|
+ // and moves the job to the running list for the scheduler to pick up.
|
|
|
+ raiseStatusChangeEvents(mgr);
|
|
|
+
|
|
|
+ // get some tasks assigned.
|
|
|
+ Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
|
+ Task t2 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
|
|
|
+ Task t3 = checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
|
|
|
+ Task t4 = checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
|
|
|
+ taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), u1Jobs.get(0));
|
|
|
+ taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1Jobs.get(0));
|
|
|
+ taskTrackerManager.finishTask("tt2", t3.getTaskID().toString(), u1Jobs.get(1));
|
|
|
+ taskTrackerManager.finishTask("tt2", t4.getTaskID().toString(), u1Jobs.get(1));
|
|
|
+
|
|
|
+ // as some jobs have running tasks, the poller will now
|
|
|
+ // pick up new jobs to initialize.
|
|
|
+ p.selectJobsToInitialize();
|
|
|
+
|
|
|
+ // count should still be the same
|
|
|
+ assertEquals(initializedJobs.size(), 6);
|
|
|
+
|
|
|
+ // new jobs that have got into the list
|
|
|
+ assertTrue(initializedJobs.contains(u1Jobs.get(2).getJobID()));
|
|
|
+ assertTrue(initializedJobs.contains(u1Jobs.get(3).getJobID()));
|
|
|
+ raiseStatusChangeEvents(mgr);
|
|
|
+
|
|
|
+ // the first two jobs are done, no longer in the initialized list.
|
|
|
+ assertFalse("Initialized jobs contains the user1 job 1",
|
|
|
+ initializedJobs.contains(u1Jobs.get(0).getJobID()));
|
|
|
+ assertFalse("Initialized jobs contains the user1 job 2",
|
|
|
+ initializedJobs.contains(u1Jobs.get(1).getJobID()));
|
|
|
+
|
|
|
+ // finish one more job
|
|
|
+ t1 = checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
|
|
|
+ t2 = checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1");
|
|
|
+ taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), u1Jobs.get(2));
|
|
|
+ taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1Jobs.get(2));
|
|
|
+
|
|
|
+ // no new jobs should be picked up, because max user limit
|
|
|
+ // is still 3.
|
|
|
+ p.selectJobsToInitialize();
|
|
|
+
|
|
|
+ assertEquals(initializedJobs.size(), 5);
|
|
|
+
|
|
|
+ // run 1 more jobs..
|
|
|
+ t1 = checkAssignment("tt1", "attempt_test_0004_m_000001_0 on tt1");
|
|
|
+ t1 = checkAssignment("tt1", "attempt_test_0004_r_000001_0 on tt1");
|
|
|
+ taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), u1Jobs.get(3));
|
|
|
+ taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1Jobs.get(3));
|
|
|
+
|
|
|
+ // Now initialised jobs should contain user 4's job, as
|
|
|
+ // user 1's jobs are all done and the number of users is
|
|
|
+ // below the limit
|
|
|
+ p.selectJobsToInitialize();
|
|
|
+ assertEquals(initializedJobs.size(), 5);
|
|
|
+ assertTrue(initializedJobs.contains(u4j1.getJobID()));
|
|
|
+
|
|
|
+ p.stopRunning();
|
|
|
+ }
|
|
|
+
|
|
|
+ /*
|
|
|
+ * testHighPriorityJobInitialization() shows behaviour when high priority job
|
|
|
+ * is submitted into a queue and how initialisation happens for the same.
|
|
|
+ */
|
|
|
+ public void testHighPriorityJobInitialization() throws Exception {
|
|
|
+ String[] qs = { "default"};
|
|
|
+ taskTrackerManager.addQueues(qs);
|
|
|
+ resConf = new FakeResourceManagerConf();
|
|
|
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
+ queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100));
|
|
|
+ resConf.setFakeQueues(queues);
|
|
|
+ scheduler.setResourceManagerConf(resConf);
|
|
|
+ ControlledInitializationPoller p = new ControlledInitializationPoller(
|
|
|
+ scheduler.jobQueuesManager,
|
|
|
+ resConf,
|
|
|
+ resConf.getQueues());
|
|
|
+ scheduler.setInitializationPoller(p);
|
|
|
+ scheduler.start();
|
|
|
+ JobInitializationPoller initPoller = scheduler.getInitializationPoller();
|
|
|
+ HashSet<JobID> initializedJobsList = initPoller.getInitializedJobList();
|
|
|
+
|
|
|
+ // submit 3 jobs for 3 users
|
|
|
+ submitJobs(3,3,"default");
|
|
|
+ p.selectJobsToInitialize();
|
|
|
+ assertEquals(initializedJobsList.size(), 6);
|
|
|
+
|
|
|
+ // submit 2 job for a different user. one of them will be made high priority
|
|
|
+ FakeJobInProgress u4j1 = submitJob(JobStatus.PREP, 1, 1, "default", "u4");
|
|
|
+ FakeJobInProgress u4j2 = submitJob(JobStatus.PREP, 1, 1, "default", "u4");
|
|
|
+
|
|
|
+ p.selectJobsToInitialize();
|
|
|
+
|
|
|
+ // shouldn't change
|
|
|
+ assertEquals(initializedJobsList.size(), 6);
|
|
|
+
|
|
|
+ assertFalse("Contains U4J1 high priority job " ,
|
|
|
+ initializedJobsList.contains(u4j1.getJobID()));
|
|
|
+ assertFalse("Contains U4J2 Normal priority job " ,
|
|
|
+ initializedJobsList.contains(u4j2.getJobID()));
|
|
|
+
|
|
|
+ // change priority of one job
|
|
|
+ taskTrackerManager.setPriority(u4j1, JobPriority.VERY_HIGH);
|
|
|
+
|
|
|
+ p.selectJobsToInitialize();
|
|
|
+
|
|
|
+ // the high priority job should get initialized, but not the
|
|
|
+ // low priority job from u4, as we have already exceeded the
|
|
|
+ // limit.
|
|
|
+ assertEquals(initializedJobsList.size(), 7);
|
|
|
+ assertTrue("Does not contain U4J1 high priority job " ,
|
|
|
+ initializedJobsList.contains(u4j1.getJobID()));
|
|
|
+ assertFalse("Contains U4J2 Normal priority job " ,
|
|
|
+ initializedJobsList.contains(u4j2.getJobID()));
|
|
|
+ p.stopRunning();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testJobMovement() throws Exception {
|
|
|
+ String[] qs = { "default"};
|
|
|
+ taskTrackerManager.addQueues(qs);
|
|
|
+ resConf = new FakeResourceManagerConf();
|
|
|
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
+ queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100));
|
|
|
+ resConf.setFakeQueues(queues);
|
|
|
+ scheduler.setResourceManagerConf(resConf);
|
|
|
+ ControlledInitializationPoller p = new ControlledInitializationPoller(
|
|
|
+ scheduler.jobQueuesManager,
|
|
|
+ resConf,
|
|
|
+ resConf.getQueues());
|
|
|
+ scheduler.setInitializationPoller(p);
|
|
|
+ scheduler.start();
|
|
|
+
|
|
|
+ JobQueuesManager mgr = scheduler.jobQueuesManager;
|
|
|
+ JobInitializationPoller initPoller = scheduler.getInitializationPoller();
|
|
|
+ HashSet<JobID> initializedJobsList = initPoller.getInitializedJobList();
|
|
|
+
|
|
|
+ // submit a job
|
|
|
+ FakeJobInProgress job =
|
|
|
+ submitJob(JobStatus.PREP, 1, 1, "default", "u1");
|
|
|
+ p.selectJobsToInitialize();
|
|
|
+
|
|
|
+ assertEquals(initializedJobsList.size(), 1);
|
|
|
+
|
|
|
+ // make it running.
|
|
|
+ raiseStatusChangeEvents(mgr);
|
|
|
+
|
|
|
+ // it should be there in both the queues.
|
|
|
+ assertTrue("Job not present in Job Queue",
|
|
|
+ mgr.getJobs("default").contains(job));
|
|
|
+ assertTrue("Job not present in Running Queue",
|
|
|
+ mgr.getRunningJobQueue("default").contains(job));
|
|
|
+
|
|
|
+ // assign a task
|
|
|
+ Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
|
+ t = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
|
|
|
+
|
|
|
+ p.selectJobsToInitialize();
|
|
|
+
|
|
|
+ // now this task should be removed from the initialized list.
|
|
|
+ assertTrue(initializedJobsList.isEmpty());
|
|
|
+
|
|
|
+ // the job should also be removed from the job queue as tasks
|
|
|
+ // are scheduled
|
|
|
+ assertFalse("Job present in Job Queue",
|
|
|
+ mgr.getJobs("default").contains(job));
|
|
|
+
|
|
|
+ // complete tasks and job
|
|
|
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", job);
|
|
|
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", job);
|
|
|
+ taskTrackerManager.finalizeJob(job);
|
|
|
+
|
|
|
+ // make sure it is removed from the run queue
|
|
|
+ assertFalse("Job present in running queue",
|
|
|
+ mgr.getRunningJobQueue("default").contains(job));
|
|
|
+ }
|
|
|
+
|
|
|
+ private void raiseStatusChangeEvents(JobQueuesManager mgr) {
|
|
|
+ Collection<JobInProgress> jips = mgr.getJobs("default");
|
|
|
+ for(JobInProgress jip : jips) {
|
|
|
+ if(jip.getStatus().getRunState() == JobStatus.RUNNING) {
|
|
|
+ JobStatusChangeEvent evt = new JobStatusChangeEvent(jip,
|
|
|
+ EventType.RUN_STATE_CHANGED,jip.getStatus());
|
|
|
+ mgr.jobUpdated(evt);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private HashMap<String, ArrayList<FakeJobInProgress>> submitJobs(
|
|
|
+ int numberOfUsers, int numberOfJobsPerUser, String queue)
|
|
|
+ throws Exception{
|
|
|
+ HashMap<String, ArrayList<FakeJobInProgress>> userJobs =
|
|
|
+ new HashMap<String, ArrayList<FakeJobInProgress>>();
|
|
|
+ for (int i = 1; i <= numberOfUsers; i++) {
|
|
|
+ String user = String.valueOf("u" + i);
|
|
|
+ ArrayList<FakeJobInProgress> jips = new ArrayList<FakeJobInProgress>();
|
|
|
+ for (int j = 1; j <= numberOfJobsPerUser; j++) {
|
|
|
+ jips.add(submitJob(JobStatus.PREP, 1, 1, queue, user));
|
|
|
+ }
|
|
|
+ userJobs.put(user, jips);
|
|
|
+ }
|
|
|
+ return userJobs;
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
}
|