|
@@ -252,6 +252,9 @@ public class TestCapacityScheduler extends TestCase {
|
|
@Override
|
|
@Override
|
|
public Task obtainNewReduceTask(final TaskTrackerStatus tts,
|
|
public Task obtainNewReduceTask(final TaskTrackerStatus tts,
|
|
int clusterSize, int ignored) throws IOException {
|
|
int clusterSize, int ignored) throws IOException {
|
|
|
|
+ if (!scheduleReduces()) {
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
boolean areAllReducesRunning = (redTaskCtr == numReduceTasks);
|
|
boolean areAllReducesRunning = (redTaskCtr == numReduceTasks);
|
|
if (areAllReducesRunning){
|
|
if (areAllReducesRunning){
|
|
if(!getJobConf().getReduceSpeculativeExecution() ||
|
|
if(!getJobConf().getReduceSpeculativeExecution() ||
|
|
@@ -3424,6 +3427,160 @@ public class TestCapacityScheduler extends TestCase {
|
|
assertEquals(scheduler.getLimitMaxMemForReduceSlot(),3);
|
|
assertEquals(scheduler.getLimitMaxMemForReduceSlot(),3);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // test that 1st user gets reduce slots when 2nd user haven't finished
|
|
|
|
+ // enough map tasks yet
|
|
|
|
+ public void testUserLimit() throws Exception {
|
|
|
|
+ // set up some queues
|
|
|
|
+ String[] qs = {"default", "q2"};
|
|
|
|
+ taskTrackerManager.addQueues(qs);
|
|
|
|
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
|
+ queues.add(new FakeQueueInfo("default", 40.0f, true, 50));
|
|
|
|
+ queues.add(new FakeQueueInfo("q2", 60.0f, false, 50));
|
|
|
|
+ resConf.setFakeQueues(queues);
|
|
|
|
+ scheduler.setResourceManagerConf(resConf);
|
|
|
|
+ scheduler.start();
|
|
|
|
+
|
|
|
|
+ // add some more TTs
|
|
|
|
+ taskTrackerManager.addTaskTracker("tt3");
|
|
|
|
+ taskTrackerManager.addTaskTracker("tt4");
|
|
|
|
+ taskTrackerManager.addTaskTracker("tt5");
|
|
|
|
+
|
|
|
|
+ // submit a job
|
|
|
|
+ FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u1");
|
|
|
|
+ // for queue 'default', the capacity for maps is 4. Since we're the only user,
|
|
|
|
+ // we should get 2 map tasks & 1 reduce
|
|
|
|
+ checkAssignments("tt1",
|
|
|
|
+ new String[] {"attempt_test_0001_m_000001_0 on tt1",
|
|
|
|
+ "attempt_test_0001_m_000002_0 on tt1",
|
|
|
|
+ "attempt_test_0001_r_000001_0 on tt1"});
|
|
|
|
+
|
|
|
|
+ // Submit another job, from a different user
|
|
|
|
+ FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 6, 1, "default", "u2");
|
|
|
|
+ // FakeJobInProgress uses override initTasks() to init tasks, which does
|
|
|
|
+ // not initialize completedMapsForReduceSlowstart, and there is no proper
|
|
|
|
+ // API to set completedMapsForReduceSlowstart neither.
|
|
|
|
+ // We manually set completedMapsForReduceSlowstart here to accommadate this
|
|
|
|
+ // test, and to avoid changing mapredcuce code for testing purpose only
|
|
|
|
+ j2.completedMapsForReduceSlowstart = 3;
|
|
|
|
+
|
|
|
|
+ // Now if I ask for a map task, it should come from the second job
|
|
|
|
+ // reduce task will be from j1 since j2 hasn't completed enough map task
|
|
|
|
+ checkAssignments("tt2",
|
|
|
|
+ new String[] {"attempt_test_0002_m_000001_0 on tt2",
|
|
|
|
+ "attempt_test_0002_m_000002_0 on tt2",
|
|
|
|
+ "attempt_test_0001_r_000002_0 on tt2"});
|
|
|
|
+
|
|
|
|
+ // Now if I ask for map tasks again, 1 from j2, 1 from job1
|
|
|
|
+ // no reduce task since j1 used up queue capacity & j2 hasn't completed
|
|
|
|
+ // enough map tasks yet
|
|
|
|
+ checkAssignments("tt3",
|
|
|
|
+ new String[] {"attempt_test_0001_m_000003_0 on tt3",
|
|
|
|
+ "attempt_test_0002_m_000003_0 on tt3"});
|
|
|
|
+
|
|
|
|
+ // Now if I ask for map tasks again, 1 from j2, 1 from j1,
|
|
|
|
+ // no reduce task still
|
|
|
|
+ checkAssignments("tt4",
|
|
|
|
+ new String[] {"attempt_test_0001_m_000004_0 on tt4",
|
|
|
|
+ "attempt_test_0002_m_000004_0 on tt4"});
|
|
|
|
+
|
|
|
|
+ // complete 3 tasks from j2
|
|
|
|
+ taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000001_0", j2);
|
|
|
|
+ taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000002_0", j2);
|
|
|
|
+ taskTrackerManager.finishTask("tt3", "attempt_test_0002_m_000003_0", j2);
|
|
|
|
+
|
|
|
|
+ // Now if I ask for map tasks again, 2 from j2,
|
|
|
|
+ // for reduce tasks, 1 from j2
|
|
|
|
+ checkAssignments("tt5",
|
|
|
|
+ new String[] {"attempt_test_0002_m_000005_0 on tt5",
|
|
|
|
+ "attempt_test_0002_m_000006_0 on tt5",
|
|
|
|
+ "attempt_test_0002_r_000001_0 on tt5"});
|
|
|
|
+
|
|
|
|
+ // complete 2 tasks from j1
|
|
|
|
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
|
|
|
|
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", j1);
|
|
|
|
+
|
|
|
|
+ // Now if I ask for tasks again, 1 map & 1 reduce from j1
|
|
|
|
+ checkAssignments("tt1",
|
|
|
|
+ new String[] {"attempt_test_0001_m_000005_0 on tt1",
|
|
|
|
+ "attempt_test_0001_r_000003_0 on tt1"});
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // test that 1st user gets reduce slots when 2nd user haven't finished
|
|
|
|
+ // enough map tasks yet, witout exceeding MaxCapacity
|
|
|
|
+ public void testUserLimitWithMaxCapacity() throws Exception {
|
|
|
|
+ // set up some queues
|
|
|
|
+ String[] qs = {"default", "q2"};
|
|
|
|
+ taskTrackerManager.addQueues(qs);
|
|
|
|
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
|
+ queues.add(new FakeQueueInfo("default", 40.0f, true, 50));
|
|
|
|
+ queues.add(new FakeQueueInfo("q2", 60.0f, false, 50));
|
|
|
|
+ resConf.setFakeQueues(queues);
|
|
|
|
+ resConf.setMaxCapacity("default", 60.0f);
|
|
|
|
+ scheduler.setResourceManagerConf(resConf);
|
|
|
|
+ scheduler.start();
|
|
|
|
+
|
|
|
|
+ // add some more TTs
|
|
|
|
+ taskTrackerManager.addTaskTracker("tt3");
|
|
|
|
+ taskTrackerManager.addTaskTracker("tt4");
|
|
|
|
+ taskTrackerManager.addTaskTracker("tt5");
|
|
|
|
+
|
|
|
|
+ // submit a job
|
|
|
|
+ FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u1");
|
|
|
|
+ // for queue 'default', the capacity for maps is 4. Since we're the only user,
|
|
|
|
+ // we should get 2 map tasks & 1 reduce
|
|
|
|
+ checkAssignments("tt1",
|
|
|
|
+ new String[] {"attempt_test_0001_m_000001_0 on tt1",
|
|
|
|
+ "attempt_test_0001_m_000002_0 on tt1",
|
|
|
|
+ "attempt_test_0001_r_000001_0 on tt1"});
|
|
|
|
+
|
|
|
|
+ // Submit another job, from a different user
|
|
|
|
+ FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 6, 1, "default", "u2");
|
|
|
|
+ // FakeJobInProgress uses override initTasks() to init tasks, which does
|
|
|
|
+ // not initialize completedMapsForReduceSlowstart, and there is no proper
|
|
|
|
+ // API to set completedMapsForReduceSlowstart neither.
|
|
|
|
+ // We manually set completedMapsForReduceSlowstart here to accommadate this
|
|
|
|
+ // test, and to avoid changing mapredcuce code for testing purpose only
|
|
|
|
+ j2.completedMapsForReduceSlowstart = 3;
|
|
|
|
+
|
|
|
|
+ // Now if I ask for a map task, it should come from the second job
|
|
|
|
+ // reduce task will be from j1 since j2 hasn't completed enough map task
|
|
|
|
+ checkAssignments("tt2",
|
|
|
|
+ new String[] {"attempt_test_0002_m_000001_0 on tt2",
|
|
|
|
+ "attempt_test_0002_m_000002_0 on tt2",
|
|
|
|
+ "attempt_test_0001_r_000002_0 on tt2"});
|
|
|
|
+
|
|
|
|
+ // Now if I ask for map tasks again, 1 from j2, 1 from job1
|
|
|
|
+ // no reduce task since j1 used up queue capacity & j2 hasn't completed
|
|
|
|
+ // enough map tasks yet
|
|
|
|
+ checkAssignments("tt3",
|
|
|
|
+ new String[] {"attempt_test_0001_m_000003_0 on tt3",
|
|
|
|
+ "attempt_test_0002_m_000003_0 on tt3"});
|
|
|
|
+
|
|
|
|
+ // Now if I ask for map tasks again, no tasks since it reached maxcapacity
|
|
|
|
+ checkAssignments("tt4", new String[] {});
|
|
|
|
+
|
|
|
|
+ // complete 3 tasks from j2
|
|
|
|
+ taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000001_0", j2);
|
|
|
|
+ taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000002_0", j2);
|
|
|
|
+ taskTrackerManager.finishTask("tt3", "attempt_test_0002_m_000003_0", j2);
|
|
|
|
+
|
|
|
|
+ // Now if I ask for map tasks again, 2 from j2,
|
|
|
|
+ // for reduce tasks, 1 from j2
|
|
|
|
+ checkAssignments("tt5",
|
|
|
|
+ new String[] {"attempt_test_0002_m_000004_0 on tt5",
|
|
|
|
+ "attempt_test_0002_m_000005_0 on tt5",
|
|
|
|
+ "attempt_test_0002_r_000001_0 on tt5"});
|
|
|
|
+
|
|
|
|
+ // complete 2 tasks from j1
|
|
|
|
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
|
|
|
|
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", j1);
|
|
|
|
+
|
|
|
|
+ // Now if I ask for tasks again, 1 map & 1 reduce from j1
|
|
|
|
+ checkAssignments("tt1",
|
|
|
|
+ new String[] {"attempt_test_0001_m_000004_0 on tt1",
|
|
|
|
+ "attempt_test_0001_r_000003_0 on tt1"});
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Checks for multiple assignment.
|
|
* Checks for multiple assignment.
|
|
*
|
|
*
|