|
@@ -78,7 +78,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
@Override
|
|
|
public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
|
|
|
int ignored) throws IOException {
|
|
|
- if (runningMapTasks == numMapTasks) return null;
|
|
|
+ if (mapTaskCtr == numMapTasks) return null;
|
|
|
TaskAttemptID attemptId = getTaskAttemptID(true);
|
|
|
Task task = new MapTask("", attemptId, 0, "", new BytesWritable()) {
|
|
|
@Override
|
|
@@ -97,7 +97,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
@Override
|
|
|
public Task obtainNewReduceTask(final TaskTrackerStatus tts,
|
|
|
int clusterSize, int ignored) throws IOException {
|
|
|
- if (runningReduceTasks == numReduceTasks) return null;
|
|
|
+ if (redTaskCtr == numReduceTasks) return null;
|
|
|
TaskAttemptID attemptId = getTaskAttemptID(false);
|
|
|
Task task = new ReduceTask("", attemptId, 0, 10) {
|
|
|
@Override
|
|
@@ -422,6 +422,69 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
}
|
|
|
}*/
|
|
|
|
|
|
+ // tests if tasks can be assinged when there are multiple jobs from a same
|
|
|
+ // user
|
|
|
+ public void testJobFinished() throws Exception {
|
|
|
+ taskTrackerManager.addQueues(new String[] {"default"});
|
|
|
+
|
|
|
+ resConf = new FakeResourceManagerConf();
|
|
|
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
+ queues.add(new FakeQueueInfo("default", 50.0f, 10, true, 25));
|
|
|
+ resConf.setFakeQueues(queues);
|
|
|
+ scheduler.setResourceManagerConf(resConf);
|
|
|
+ scheduler.start();
|
|
|
+
|
|
|
+ // submit 2 jobs
|
|
|
+ FakeJobInProgress j1 = submitJob(JobStatus.PREP, 3, 0, "default", "u1");
|
|
|
+ FakeJobInProgress j2 = submitJob(JobStatus.PREP, 3, 0, "default", "u1");
|
|
|
+
|
|
|
+ // init them and inform the scheduler
|
|
|
+ j1.initTasks();
|
|
|
+ scheduler.jobQueuesManager.jobUpdated(j1);
|
|
|
+ j2.initTasks();
|
|
|
+ scheduler.jobQueuesManager.jobUpdated(j2);
|
|
|
+
|
|
|
+ // I. Check multiple assignments with running tasks within job
|
|
|
+ // ask for a task from first job
|
|
|
+ Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
|
+ // ask for another task from the first job
|
|
|
+ t = checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
|
|
|
+
|
|
|
+ // complete tasks
|
|
|
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
|
|
|
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", j1);
|
|
|
+
|
|
|
+ // II. Check multiple assignments with running tasks across jobs
|
|
|
+ // ask for a task from first job
|
|
|
+ t = checkAssignment("tt1", "attempt_test_0001_m_000003_0 on tt1");
|
|
|
+
|
|
|
+ // ask for a task from the second job
|
|
|
+ t = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
|
|
|
+
|
|
|
+ // complete tasks
|
|
|
+ taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", j2);
|
|
|
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000003_0", j1);
|
|
|
+
|
|
|
+ // III. Check multiple assignments with completed tasks across jobs
|
|
|
+ // ask for a task from the second job
|
|
|
+ t = checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
|
|
|
+
|
|
|
+ // complete task
|
|
|
+ taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000002_0", j2);
|
|
|
+
|
|
|
+ // IV. Check assignment with completed job
|
|
|
+ // finish first job
|
|
|
+ j1.getStatus().setRunState(JobStatus.SUCCEEDED);
|
|
|
+ scheduler.jobRemoved(j1);
|
|
|
+
|
|
|
+ // ask for another task from the second job
|
|
|
+ // if tasks can be assigned then the structures are properly updated
|
|
|
+ t = checkAssignment("tt1", "attempt_test_0002_m_000003_0 on tt1");
|
|
|
+
|
|
|
+ // complete task
|
|
|
+ taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000003_0", j2);
|
|
|
+ }
|
|
|
+
|
|
|
// basic tests, should be able to submit to queues
|
|
|
public void testSubmitToQueues() throws Exception {
|
|
|
// set up some queues
|