|
@@ -33,6 +33,7 @@ import java.util.TreeMap;
|
|
import junit.framework.TestCase;
|
|
import junit.framework.TestCase;
|
|
|
|
|
|
import org.apache.hadoop.io.BytesWritable;
|
|
import org.apache.hadoop.io.BytesWritable;
|
|
|
|
+import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
|
|
//import org.apache.hadoop.mapred.CapacityTaskScheduler;
|
|
//import org.apache.hadoop.mapred.CapacityTaskScheduler;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
|
|
|
@@ -289,6 +290,32 @@ public class TestCapacityScheduler extends TestCase {
|
|
status.setRunState(TaskStatus.State.SUCCEEDED);
|
|
status.setRunState(TaskStatus.State.SUCCEEDED);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ void finalizeJob(FakeJobInProgress fjob) {
|
|
|
|
+ // take a snapshot of the status before changing it
|
|
|
|
+ JobStatus oldStatus = (JobStatus)fjob.getStatus().clone();
|
|
|
|
+ fjob.getStatus().setRunState(JobStatus.SUCCEEDED);
|
|
|
|
+ JobStatus newStatus = (JobStatus)fjob.getStatus().clone();
|
|
|
|
+ JobStatusChangeEvent event =
|
|
|
|
+ new JobStatusChangeEvent (fjob, EventType.RUN_STATE_CHANGED, oldStatus,
|
|
|
|
+ newStatus);
|
|
|
|
+ for (JobInProgressListener listener : listeners) {
|
|
|
|
+ listener.jobUpdated(event);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void setPriority(FakeJobInProgress fjob, JobPriority priority) {
|
|
|
|
+ // take a snapshot of the status before changing it
|
|
|
|
+ JobStatus oldStatus = (JobStatus)fjob.getStatus().clone();
|
|
|
|
+ fjob.setPriority(priority);
|
|
|
|
+ JobStatus newStatus = (JobStatus)fjob.getStatus().clone();
|
|
|
|
+ JobStatusChangeEvent event =
|
|
|
|
+ new JobStatusChangeEvent (fjob, EventType.PRIORITY_CHANGED, oldStatus,
|
|
|
|
+ newStatus);
|
|
|
|
+ for (JobInProgressListener listener : listeners) {
|
|
|
|
+ listener.jobUpdated(event);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
void addQueues(String[] arr) {
|
|
void addQueues(String[] arr) {
|
|
Set<String> queues = new HashSet<String>();
|
|
Set<String> queues = new HashSet<String>();
|
|
for (String s: arr) {
|
|
for (String s: arr) {
|
|
@@ -418,6 +445,84 @@ public class TestCapacityScheduler extends TestCase {
|
|
return job;
|
|
return job;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // Submit a job and update the listeners
|
|
|
|
+ private FakeJobInProgress submitJobAndInit(int state, int maps, int reduces,
|
|
|
|
+ String queue, String user)
|
|
|
|
+ throws IOException {
|
|
|
|
+ FakeJobInProgress j = submitJob(state, maps, reduces, queue, user);
|
|
|
|
+ scheduler.jobQueuesManager.jobUpdated(initTasksAndReportEvent(j));
|
|
|
|
+ return j;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Note that there is no concept of setup tasks here. So init itself should
|
|
|
|
+ // report the job-status change
|
|
|
|
+ private JobStatusChangeEvent initTasksAndReportEvent(FakeJobInProgress jip)
|
|
|
|
+ throws IOException {
|
|
|
|
+ JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
|
|
|
|
+ jip.initTasks();
|
|
|
|
+ JobStatus newStatus = (JobStatus)jip.getStatus().clone();
|
|
|
|
+ return new JobStatusChangeEvent(jip, EventType.RUN_STATE_CHANGED,
|
|
|
|
+ oldStatus, newStatus);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // test job run-state change
|
|
|
|
+ public void testJobRunStateChange() throws IOException {
|
|
|
|
+ // start the scheduler
|
|
|
|
+ taskTrackerManager.addQueues(new String[] {"default"});
|
|
|
|
+ resConf = new FakeResourceManagerConf();
|
|
|
|
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
|
+ queues.add(new FakeQueueInfo("default", 1f, 1, true, 1));
|
|
|
|
+ resConf.setFakeQueues(queues);
|
|
|
|
+ scheduler.setResourceManagerConf(resConf);
|
|
|
|
+ scheduler.start();
|
|
|
|
+
|
|
|
|
+ // submit the job
|
|
|
|
+ FakeJobInProgress fjob1 =
|
|
|
|
+ submitJob(JobStatus.PREP, 1, 0, "default", "user");
|
|
|
|
+
|
|
|
|
+ FakeJobInProgress fjob2 =
|
|
|
|
+ submitJob(JobStatus.PREP, 1, 0, "default", "user");
|
|
|
|
+
|
|
|
|
+ // check if the job is in the waiting queue
|
|
|
|
+ assertTrue("Waiting queue doesnt contain queued job",
|
|
|
|
+ scheduler.jobQueuesManager.getWaitingJobQueue("default")
|
|
|
|
+ .contains(fjob1));
|
|
|
|
+
|
|
|
|
+ // change the job priority
|
|
|
|
+ taskTrackerManager.setPriority(fjob2, JobPriority.HIGH);
|
|
|
|
+
|
|
|
|
+ // Check if the priority changes are reflected
|
|
|
|
+ JobInProgress firstJob =
|
|
|
|
+ scheduler.getJobs("default").toArray(new JobInProgress[0])[0];
|
|
|
|
+ assertTrue("Priority change didnt not work as expected",
|
|
|
|
+ firstJob.getJobID().equals(fjob2.getJobID()));
|
|
|
|
+
|
|
|
|
+ // Create an event
|
|
|
|
+ JobChangeEvent event = initTasksAndReportEvent(fjob1);
|
|
|
|
+
|
|
|
|
+ // inform the scheduler
|
|
|
|
+ scheduler.jobQueuesManager.jobUpdated(event);
|
|
|
|
+
|
|
|
|
+ // check if the job is in the running queue
|
|
|
|
+ assertTrue("Running queue doesnt contain running/inited job",
|
|
|
|
+ scheduler.jobQueuesManager.getRunningJobQueue("default")
|
|
|
|
+ .contains(fjob1));
|
|
|
|
+
|
|
|
|
+ // schedule a task
|
|
|
|
+ List<Task> tasks = scheduler.assignTasks(tracker("tt1"));
|
|
|
|
+
|
|
|
|
+ // complete the job
|
|
|
|
+ taskTrackerManager.finishTask("tt1", tasks.get(0).getTaskID().toString(),
|
|
|
|
+ fjob1);
|
|
|
|
+
|
|
|
|
+ // mark the job as complete
|
|
|
|
+ taskTrackerManager.finalizeJob(fjob1);
|
|
|
|
+
|
|
|
|
+ // check if the job is removed from the scheduler
|
|
|
|
+ assertFalse("Scheduler contains completed job",
|
|
|
|
+ scheduler.getJobs("default").contains(fjob1));
|
|
|
|
+ }
|
|
|
|
+
|
|
/*protected void submitJobs(int number, int state, int maps, int reduces)
|
|
/*protected void submitJobs(int number, int state, int maps, int reduces)
|
|
throws IOException {
|
|
throws IOException {
|
|
for (int i = 0; i < number; i++) {
|
|
for (int i = 0; i < number; i++) {
|
|
@@ -438,14 +543,8 @@ public class TestCapacityScheduler extends TestCase {
|
|
scheduler.start();
|
|
scheduler.start();
|
|
|
|
|
|
// submit 2 jobs
|
|
// submit 2 jobs
|
|
- FakeJobInProgress j1 = submitJob(JobStatus.PREP, 3, 0, "default", "u1");
|
|
|
|
- FakeJobInProgress j2 = submitJob(JobStatus.PREP, 3, 0, "default", "u1");
|
|
|
|
-
|
|
|
|
- // init them and inform the scheduler
|
|
|
|
- j1.initTasks();
|
|
|
|
- scheduler.jobQueuesManager.jobUpdated(j1);
|
|
|
|
- j2.initTasks();
|
|
|
|
- scheduler.jobQueuesManager.jobUpdated(j2);
|
|
|
|
|
|
+ FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 3, 0, "default", "u1");
|
|
|
|
+ FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 3, 0, "default", "u1");
|
|
|
|
|
|
// I. Check multiple assignments with running tasks within job
|
|
// I. Check multiple assignments with running tasks within job
|
|
// ask for a task from first job
|
|
// ask for a task from first job
|
|
@@ -477,8 +576,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
|
|
|
// IV. Check assignment with completed job
|
|
// IV. Check assignment with completed job
|
|
// finish first job
|
|
// finish first job
|
|
- j1.getStatus().setRunState(JobStatus.SUCCEEDED);
|
|
|
|
- scheduler.jobRemoved(j1);
|
|
|
|
|
|
+ scheduler.jobCompleted(j1);
|
|
|
|
|
|
// ask for another task from the second job
|
|
// ask for another task from the second job
|
|
// if tasks can be assigned then the structures are properly updated
|
|
// if tasks can be assigned then the structures are properly updated
|
|
@@ -503,12 +601,13 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
|
|
|
// submit a job with no queue specified. It should be accepted
|
|
// submit a job with no queue specified. It should be accepted
|
|
// and given to the default queue.
|
|
// and given to the default queue.
|
|
- JobInProgress j = submitJob(JobStatus.PREP, 10, 10, null, "u1");
|
|
|
|
|
|
+ JobInProgress j = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
|
|
|
|
+
|
|
// when we ask for a task, we should get one, from the job submitted
|
|
// when we ask for a task, we should get one, from the job submitted
|
|
Task t;
|
|
Task t;
|
|
t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
// submit another job, to a different queue
|
|
// submit another job, to a different queue
|
|
- j = submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
|
|
|
|
|
|
+ j = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
|
|
// now when we get a task, it should be from the second job
|
|
// now when we get a task, it should be from the second job
|
|
t = checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
|
|
t = checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
|
|
}
|
|
}
|
|
@@ -525,7 +624,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
scheduler.start();
|
|
scheduler.start();
|
|
|
|
|
|
// submit a job
|
|
// submit a job
|
|
- JobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, "default", "u1");
|
|
|
|
|
|
+ JobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u1");
|
|
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
|
|
|
|
// submit another job
|
|
// submit another job
|
|
@@ -635,7 +734,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
scheduler.start();
|
|
scheduler.start();
|
|
|
|
|
|
// submit a job
|
|
// submit a job
|
|
- submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
|
|
|
|
|
|
+ submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
|
|
// for queue 'q2', the GC for maps is 2. Since we're the only user,
|
|
// for queue 'q2', the GC for maps is 2. Since we're the only user,
|
|
// we should get a task
|
|
// we should get a task
|
|
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
@@ -662,12 +761,12 @@ public class TestCapacityScheduler extends TestCase {
|
|
scheduler.start();
|
|
scheduler.start();
|
|
|
|
|
|
// submit a job
|
|
// submit a job
|
|
- submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
|
|
|
|
|
|
+ submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
|
|
// for queue 'q2', the GC for maps is 2. Since we're the only user,
|
|
// for queue 'q2', the GC for maps is 2. Since we're the only user,
|
|
// we should get a task
|
|
// we should get a task
|
|
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
// Submit another job, from a different user
|
|
// Submit another job, from a different user
|
|
- submitJob(JobStatus.PREP, 10, 10, "q2", "u2");
|
|
|
|
|
|
+ submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
|
|
// Now if I ask for a map task, it should come from the second job
|
|
// Now if I ask for a map task, it should come from the second job
|
|
checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
|
|
checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
|
|
// Now we're at full capacity for maps. If I ask for another map task,
|
|
// Now we're at full capacity for maps. If I ask for another map task,
|
|
@@ -691,14 +790,14 @@ public class TestCapacityScheduler extends TestCase {
|
|
scheduler.start();
|
|
scheduler.start();
|
|
|
|
|
|
// submit a job
|
|
// submit a job
|
|
- submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
|
|
|
|
|
|
+ submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
|
|
// for queue 'q2', the GC for maps is 2. Since we're the only user,
|
|
// for queue 'q2', the GC for maps is 2. Since we're the only user,
|
|
// we should get a task
|
|
// we should get a task
|
|
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
// since we're the only job, we get another map
|
|
// since we're the only job, we get another map
|
|
checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
|
|
checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
|
|
// Submit another job, from a different user
|
|
// Submit another job, from a different user
|
|
- submitJob(JobStatus.PREP, 10, 10, "q2", "u2");
|
|
|
|
|
|
+ submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
|
|
// Now if I ask for a map task, it should come from the second job
|
|
// Now if I ask for a map task, it should come from the second job
|
|
checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
|
|
checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
|
|
// and another
|
|
// and another
|
|
@@ -720,7 +819,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
scheduler.start();
|
|
scheduler.start();
|
|
|
|
|
|
// submit a job
|
|
// submit a job
|
|
- FakeJobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
|
|
|
|
|
|
+ FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
|
|
// for queue 'q2', the GC for maps is 2. Since we're the only user,
|
|
// for queue 'q2', the GC for maps is 2. Since we're the only user,
|
|
// we should get a task
|
|
// we should get a task
|
|
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
@@ -730,7 +829,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
|
|
checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
|
|
checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
|
|
checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
|
|
// Submit another job, from a different user
|
|
// Submit another job, from a different user
|
|
- FakeJobInProgress j2 = submitJob(JobStatus.PREP, 10, 10, "q2", "u2");
|
|
|
|
|
|
+ FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
|
|
// one of the task finishes
|
|
// one of the task finishes
|
|
taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
|
|
taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
|
|
// Now if I ask for a map task, it should come from the second job
|
|
// Now if I ask for a map task, it should come from the second job
|
|
@@ -763,7 +862,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
taskTrackerManager.addTaskTracker("tt5");
|
|
taskTrackerManager.addTaskTracker("tt5");
|
|
|
|
|
|
// u1 submits job
|
|
// u1 submits job
|
|
- FakeJobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, null, "u1");
|
|
|
|
|
|
+ FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
|
|
// it gets the first 5 slots
|
|
// it gets the first 5 slots
|
|
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
|
|
checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
|
|
@@ -771,7 +870,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
|
|
checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
|
|
checkAssignment("tt3", "attempt_test_0001_m_000005_0 on tt3");
|
|
checkAssignment("tt3", "attempt_test_0001_m_000005_0 on tt3");
|
|
// u2 submits job with 4 slots
|
|
// u2 submits job with 4 slots
|
|
- FakeJobInProgress j2 = submitJob(JobStatus.PREP, 4, 4, null, "u2");
|
|
|
|
|
|
+ FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 4, 4, null, "u2");
|
|
// u2 should get next 4 slots
|
|
// u2 should get next 4 slots
|
|
checkAssignment("tt3", "attempt_test_0002_m_000001_0 on tt3");
|
|
checkAssignment("tt3", "attempt_test_0002_m_000001_0 on tt3");
|
|
checkAssignment("tt4", "attempt_test_0002_m_000002_0 on tt4");
|
|
checkAssignment("tt4", "attempt_test_0002_m_000002_0 on tt4");
|
|
@@ -818,8 +917,8 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
|
|
|
// set up a situation where q2 is under capacity, and default & q3
|
|
// set up a situation where q2 is under capacity, and default & q3
|
|
// are at/over capacity
|
|
// are at/over capacity
|
|
- FakeJobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, null, "u1");
|
|
|
|
- FakeJobInProgress j2 = submitJob(JobStatus.PREP, 10, 10, "q3", "u1");
|
|
|
|
|
|
+ FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
|
|
|
|
+ FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q3", "u1");
|
|
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
|
|
checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
|
|
checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
|
|
checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
|
|
@@ -866,11 +965,11 @@ public class TestCapacityScheduler extends TestCase {
|
|
taskTrackerManager.addTaskTracker("tt5");
|
|
taskTrackerManager.addTaskTracker("tt5");
|
|
|
|
|
|
// q2 has nothing running, default is under cap, q3 and q4 are over cap
|
|
// q2 has nothing running, default is under cap, q3 and q4 are over cap
|
|
- FakeJobInProgress j1 = submitJob(JobStatus.PREP, 2, 2, null, "u1");
|
|
|
|
|
|
+ FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 2, 2, null, "u1");
|
|
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
- FakeJobInProgress j2 = submitJob(JobStatus.PREP, 10, 10, "q3", "u1");
|
|
|
|
|
|
+ FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q3", "u1");
|
|
checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
|
|
checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
|
|
- FakeJobInProgress j3 = submitJob(JobStatus.PREP, 10, 10, "q4", "u1");
|
|
|
|
|
|
+ FakeJobInProgress j3 = submitJobAndInit(JobStatus.PREP, 10, 10, "q4", "u1");
|
|
checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
|
|
checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
|
|
checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
|
|
checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
|
|
checkAssignment("tt3", "attempt_test_0002_m_000002_0 on tt3");
|
|
checkAssignment("tt3", "attempt_test_0002_m_000002_0 on tt3");
|
|
@@ -918,13 +1017,13 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
|
|
|
// set up a situation where q2 is under capacity, and default is
|
|
// set up a situation where q2 is under capacity, and default is
|
|
// at/over capacity
|
|
// at/over capacity
|
|
- FakeJobInProgress j1 = submitJob(JobStatus.PREP, 10, 10, null, "u1");
|
|
|
|
|
|
+ FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
|
|
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
|
|
checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
|
|
checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
|
|
checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
|
|
checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
|
|
checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
|
|
// now submit a job to q2
|
|
// now submit a job to q2
|
|
- FakeJobInProgress j2 = submitJob(JobStatus.PREP, 1, 1, "q2", "u1");
|
|
|
|
|
|
+ FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 1, 1, "q2", "u1");
|
|
// update our structures
|
|
// update our structures
|
|
scheduler.updateQSIInfo();
|
|
scheduler.updateQSIInfo();
|
|
// get scheduler to notice that q2 needs to reclaim
|
|
// get scheduler to notice that q2 needs to reclaim
|
|
@@ -933,7 +1032,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
// we start reclaiming when 15 secs are left.
|
|
// we start reclaiming when 15 secs are left.
|
|
clock.advance(400000);
|
|
clock.advance(400000);
|
|
// submit another job to q2 which causes more capacity to be reclaimed
|
|
// submit another job to q2 which causes more capacity to be reclaimed
|
|
- j2 = submitJob(JobStatus.PREP, 10, 10, "q2", "u2");
|
|
|
|
|
|
+ j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
|
|
// update our structures
|
|
// update our structures
|
|
scheduler.updateQSIInfo();
|
|
scheduler.updateQSIInfo();
|
|
clock.advance(200000);
|
|
clock.advance(200000);
|