|
@@ -738,6 +738,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
numReduceTasksPerTracker);
|
|
|
clock = new FakeClock();
|
|
|
scheduler = new CapacityTaskScheduler(clock);
|
|
|
+ scheduler.setAssignMultipleTasks(false);
|
|
|
scheduler.setTaskTrackerManager(taskTrackerManager);
|
|
|
|
|
|
conf = new JobConf();
|
|
@@ -1055,7 +1056,171 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
// complete task
|
|
|
taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000003_0", j2);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Tests whether a map and reduce task are assigned when there's
|
|
|
+ * a single queue and multiple task assignment is enabled.
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ public void testMultiTaskAssignmentInSingleQueue() throws Exception {
|
|
|
+ try {
|
|
|
+ setUp(1, 6, 2);
|
|
|
+ // set up some queues
|
|
|
+ String[] qs = {"default"};
|
|
|
+ taskTrackerManager.addQueues(qs);
|
|
|
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
|
|
|
+ resConf.setFakeQueues(queues);
|
|
|
+ scheduler.setResourceManagerConf(resConf);
|
|
|
+ scheduler.start();
|
|
|
+ scheduler.setAssignMultipleTasks(true);
|
|
|
+
|
|
|
+ //Submit the job with 6 maps and 2 reduces
|
|
|
+ FakeJobInProgress j1 = submitJobAndInit(
|
|
|
+ JobStatus.PREP, 6, 2, "default", "u1");
|
|
|
+
|
|
|
+ List<Task> tasks = scheduler.assignTasks(tracker("tt1"));
|
|
|
+ assertEquals(tasks.size(), 2);
|
|
|
+ for (Task task : tasks) {
|
|
|
+ if (task.toString().contains("_m_")) {
|
|
|
+ LOG.info(" map task assigned " + task.toString());
|
|
|
+ assertEquals(task.toString(), "attempt_test_0001_m_000001_0 on tt1");
|
|
|
+ } else if (task.toString().contains("_r_")) {
|
|
|
+ LOG.info(" reduce task assigned " + task.toString());
|
|
|
+ assertEquals(task.toString(), "attempt_test_0001_r_000001_0 on tt1");
|
|
|
+ } else {
|
|
|
+ fail(" should not have come here " + task.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ for (Task task : tasks) {
|
|
|
+ if (task.toString().equals("attempt_test_0001_m_000001_0 on tt1")) {
|
|
|
+ //Now finish the task
|
|
|
+ taskTrackerManager.finishTask(
|
|
|
+ "tt1", task.getTaskID().toString(),
|
|
|
+ j1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ tasks = scheduler.assignTasks(tracker("tt1"));
|
|
|
+ assertEquals(tasks.size(), 2);
|
|
|
+ for (Task task : tasks) {
|
|
|
+ if (task.toString().contains("_m_")) {
|
|
|
+ LOG.info(" map task assigned " + task.toString());
|
|
|
+ assertEquals(task.toString(), "attempt_test_0001_m_000002_0 on tt1");
|
|
|
+ } else if (task.toString().contains("_r_")) {
|
|
|
+ LOG.info(" reduce task assigned " + task.toString());
|
|
|
+ assertEquals(task.toString(), "attempt_test_0001_r_000002_0 on tt1");
|
|
|
+ } else {
|
|
|
+ fail(" should not have come here " + task.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //now both the reduce slots are being used , hence we should not
|
|
|
+ // get only 1 map task in this assignTasks call.
|
|
|
+ tasks = scheduler.assignTasks(tracker("tt1"));
|
|
|
+ assertEquals(tasks.size(), 1);
|
|
|
+ for (Task task : tasks) {
|
|
|
+ if (task.toString().contains("_m_")) {
|
|
|
+ LOG.info(" map task assigned " + task.toString());
|
|
|
+ assertEquals(task.toString(), "attempt_test_0001_m_000003_0 on tt1");
|
|
|
+ } else if (task.toString().contains("_r_")) {
|
|
|
+ LOG.info(" reduce task assigned " + task.toString());
|
|
|
+ fail("should not give reduce task " + task.toString());
|
|
|
+ } else {
|
|
|
+ fail(" should not have come here " + task.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ scheduler.setAssignMultipleTasks(false);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void testMultiTaskAssignmentInMultipleQueues() throws Exception {
|
|
|
+ try {
|
|
|
+ setUp(1, 6, 2);
|
|
|
+ // set up some queues
|
|
|
+ String[] qs = {"default","q1"};
|
|
|
+ taskTrackerManager.addQueues(qs);
|
|
|
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
+ queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
|
|
|
+ queues.add(new FakeQueueInfo("q1", 50.0f, true, 25));
|
|
|
+ resConf.setFakeQueues(queues);
|
|
|
+ scheduler.setResourceManagerConf(resConf);
|
|
|
+ scheduler.start();
|
|
|
+ scheduler.setAssignMultipleTasks(true);
|
|
|
+
|
|
|
+ //Submit the job with 6 maps and 2 reduces
|
|
|
+ submitJobAndInit(
|
|
|
+ JobStatus.PREP, 6, 1, "default", "u1");
|
|
|
+
|
|
|
+ FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP,2,1,"q1","u2");
|
|
|
+
|
|
|
+ List<Task> tasks = scheduler.assignTasks(tracker("tt1"));
|
|
|
+ assertEquals(tasks.size(), 2);
|
|
|
+ for (Task task : tasks) {
|
|
|
+ if (task.toString().contains("_m_")) {
|
|
|
+ LOG.info(" map task assigned " + task.toString());
|
|
|
+ assertEquals(task.toString(), "attempt_test_0001_m_000001_0 on tt1");
|
|
|
+ } else if (task.toString().contains("_r_")) {
|
|
|
+ LOG.info(" reduce task assigned " + task.toString());
|
|
|
+ assertEquals(task.toString(), "attempt_test_0001_r_000001_0 on tt1");
|
|
|
+ } else {
|
|
|
+ fail(" should not have come here " + task.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // next assignment will be for job in second queue.
|
|
|
+ tasks = scheduler.assignTasks(tracker("tt1"));
|
|
|
+ assertEquals(tasks.size(), 2);
|
|
|
+ for (Task task : tasks) {
|
|
|
+ if (task.toString().contains("_m_")) {
|
|
|
+ LOG.info(" map task assigned " + task.toString());
|
|
|
+ assertEquals(task.toString(), "attempt_test_0002_m_000001_0 on tt1");
|
|
|
+ } else if (task.toString().contains("_r_")) {
|
|
|
+ LOG.info(" reduce task assigned " + task.toString());
|
|
|
+ assertEquals(task.toString(), "attempt_test_0002_r_000001_0 on tt1");
|
|
|
+ } else {
|
|
|
+ fail(" should not have come here " + task.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //now both the reduce slots are being used , hence we sholdnot get only 1
|
|
|
+ //map task in this assignTasks call.
|
|
|
+ tasks = scheduler.assignTasks(tracker("tt1"));
|
|
|
+ assertEquals(tasks.size(), 1);
|
|
|
+ for (Task task : tasks) {
|
|
|
+ if (task.toString().contains("_m_")) {
|
|
|
+ LOG.info(" map task assigned " + task.toString());
|
|
|
+ // we get from job 2 because the queues are equal in capacity usage
|
|
|
+ // and sorting leaves order unchanged.
|
|
|
+ assertEquals(task.toString(), "attempt_test_0002_m_000002_0 on tt1");
|
|
|
+ } else if (task.toString().contains("_r_")) {
|
|
|
+ LOG.info(" reduce task assigned " + task.toString());
|
|
|
+ fail("should not give reduce task " + task.toString());
|
|
|
+ } else {
|
|
|
+ fail(" should not have come here " + task.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ tasks = scheduler.assignTasks(tracker("tt1"));
|
|
|
+ assertEquals(tasks.size(), 1);
|
|
|
+ for (Task task : tasks) {
|
|
|
+ if (task.toString().contains("_m_")) {
|
|
|
+ LOG.info(" map task assigned " + task.toString());
|
|
|
+ assertEquals(task.toString(), "attempt_test_0001_m_000002_0 on tt1");
|
|
|
+ } else if (task.toString().contains("_r_")) {
|
|
|
+ LOG.info(" reduce task assigned " + task.toString());
|
|
|
+ fail("should not give reduce task " + task.toString());
|
|
|
+ } else {
|
|
|
+ fail(" should not have come here " + task.toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ scheduler.setAssignMultipleTasks(false);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
// basic tests, should be able to submit to queues
|
|
|
public void testSubmitToQueues() throws Exception {
|
|
|
// set up some queues
|
|
@@ -1947,14 +2112,24 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Test blocking of cluster for lack of memory.
|
|
|
+ * Tests that scheduler schedules normal jobs once high RAM jobs
|
|
|
+ * have been reserved to the limit.
|
|
|
+ *
|
|
|
+ * The test causes the scheduler to schedule a normal job on two
|
|
|
+ * trackers, and one task of the high RAM job on a third. Then it
|
|
|
+ * asserts that one of the first two trackers gets a reservation
|
|
|
+ * for the remaining task of the high RAM job. After this, it
|
|
|
+ * asserts that a normal job submitted later is allowed to run
|
|
|
+ * on a free slot, as all tasks of the high RAM job are either
|
|
|
+ * scheduled or reserved.
|
|
|
+ *
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public void testClusterBlockingForLackOfMemory()
|
|
|
throws IOException {
|
|
|
|
|
|
LOG.debug("Starting the scheduler.");
|
|
|
- taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2);
|
|
|
+ taskTrackerManager = new FakeTaskTrackerManager(3, 2, 2);
|
|
|
|
|
|
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
|
|
@@ -1977,34 +2152,38 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
scheduler.start();
|
|
|
|
|
|
LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
|
|
|
- + "1 map, 1 reduce tasks.");
|
|
|
+ + "2 map, 2 reduce tasks.");
|
|
|
JobConf jConf = new JobConf(conf);
|
|
|
jConf.setMemoryForMapTask(1 * 1024);
|
|
|
jConf.setMemoryForReduceTask(1 * 1024);
|
|
|
- jConf.setNumMapTasks(1);
|
|
|
- jConf.setNumReduceTasks(1);
|
|
|
+ jConf.setNumMapTasks(2);
|
|
|
+ jConf.setNumReduceTasks(2);
|
|
|
jConf.setQueueName("default");
|
|
|
jConf.setUser("u1");
|
|
|
FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
|
|
|
|
|
|
- // Fill the second tt with this job.
|
|
|
- checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
|
|
|
+ // Fill a tt with this job's tasks.
|
|
|
+ checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
|
// Total 1 map slot should be accounted for.
|
|
|
- checkOccupiedSlots("default", TaskType.MAP, 1, 1, 25.0f);
|
|
|
+ checkOccupiedSlots("default", TaskType.MAP, 1, 1, 16.7f);
|
|
|
assertEquals(String.format(
|
|
|
CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
|
|
|
1, 1, 0, 0, 0, 0),
|
|
|
(String) job1.getSchedulingInfo());
|
|
|
- checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L);
|
|
|
- checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
|
|
|
- // Total 1 map slot should be accounted for.
|
|
|
- checkOccupiedSlots("default", TaskType.REDUCE, 1, 1,
|
|
|
- 25.0f);
|
|
|
+ checkMemReservedForTasksOnTT("tt1", 1 * 1024L, 0L);
|
|
|
+
|
|
|
+ // same for reduces.
|
|
|
+ checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
|
|
|
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 16.7f);
|
|
|
assertEquals(String.format(
|
|
|
CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
|
|
|
1, 1, 0, 1, 1, 0),
|
|
|
(String) job1.getSchedulingInfo());
|
|
|
- checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
|
|
|
+ checkMemReservedForTasksOnTT("tt1", 1 * 1024L, 1 * 1024L);
|
|
|
+
|
|
|
+ // fill another TT with the rest of the tasks of the job
|
|
|
+ checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
|
|
|
+ checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
|
|
|
|
|
|
LOG.debug("Submit one high memory(2GB maps/reduces) job of "
|
|
|
+ "2 map, 2 reduce tasks.");
|
|
@@ -2017,27 +2196,26 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
jConf.setUser("u1");
|
|
|
FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
|
|
|
|
|
|
- checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
|
|
|
- // Total 3 map slots should be accounted for.
|
|
|
- checkOccupiedSlots("default", TaskType.MAP, 1, 3, 75.0f);
|
|
|
+ // Have another TT run one task of each type of the high RAM
|
|
|
+ // job. This will fill up the TT.
|
|
|
+ checkAssignment("tt3", "attempt_test_0002_m_000001_0 on tt3");
|
|
|
+ checkOccupiedSlots("default", TaskType.MAP, 1, 4, 66.7f);
|
|
|
assertEquals(String.format(
|
|
|
CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
|
|
|
1, 2, 0, 0, 0, 0),
|
|
|
(String) job2.getSchedulingInfo());
|
|
|
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
|
|
|
+ checkMemReservedForTasksOnTT("tt3", 2 * 1024L, 0L);
|
|
|
|
|
|
- checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
|
|
|
- // Total 3 reduce slots should be accounted for.
|
|
|
- checkOccupiedSlots("default", TaskType.REDUCE, 1, 3,
|
|
|
- 75.0f);
|
|
|
+ checkAssignment("tt3", "attempt_test_0002_r_000001_0 on tt3");
|
|
|
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 4, 66.7f);
|
|
|
assertEquals(String.format(
|
|
|
CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
|
|
|
1, 2, 0, 1, 2, 0),
|
|
|
(String) job2.getSchedulingInfo());
|
|
|
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
|
|
|
+ checkMemReservedForTasksOnTT("tt3", 2 * 1024L, 2 * 1024L);
|
|
|
|
|
|
LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
|
|
|
- + "1 map, 0 reduce tasks.");
|
|
|
+ + "1 map, 1 reduce tasks.");
|
|
|
jConf = new JobConf(conf);
|
|
|
jConf.setMemoryForMapTask(1 * 1024);
|
|
|
jConf.setMemoryForReduceTask(1 * 1024);
|
|
@@ -2047,29 +2225,27 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
jConf.setUser("u1");
|
|
|
FakeJobInProgress job3 = submitJobAndInit(JobStatus.PREP, jConf);
|
|
|
|
|
|
- // Job2 cannot fit on tt1. So tt1 is reserved for a map slot of job2
|
|
|
- assertNull(scheduler.assignTasks(tracker("tt1")));
|
|
|
+ // Send a TT with insufficient space for task assignment,
|
|
|
+ // This will cause a reservation for the high RAM job.
|
|
|
assertNull(scheduler.assignTasks(tracker("tt1")));
|
|
|
|
|
|
- // reserved tasktrackers contribute to occupied slots for maps.
|
|
|
- checkOccupiedSlots("default", TaskType.MAP, 1, 5, 125.0f);
|
|
|
- // occupied slots for reduces remain unchanged as tt1 is not reserved for
|
|
|
- // reduces.
|
|
|
- checkOccupiedSlots("default", TaskType.REDUCE, 1, 3, 75.0f);
|
|
|
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
|
|
|
- checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
|
|
|
+ // reserved tasktrackers contribute to occupied slots for maps and reduces
|
|
|
+ checkOccupiedSlots("default", TaskType.MAP, 1, 6, 100.0f);
|
|
|
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 6, 100.0f);
|
|
|
+ checkMemReservedForTasksOnTT("tt1", 1 * 1024L, 1 * 1024L);
|
|
|
LOG.info(job2.getSchedulingInfo());
|
|
|
assertEquals(String.format(
|
|
|
CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
|
|
|
- 1, 2, 2, 1, 2, 0),
|
|
|
+ 1, 2, 2, 1, 2, 2),
|
|
|
(String) job2.getSchedulingInfo());
|
|
|
assertEquals(String.format(
|
|
|
CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
|
|
|
0, 0, 0, 0, 0, 0),
|
|
|
(String) job3.getSchedulingInfo());
|
|
|
-
|
|
|
- // One reservation is already done for job2. So job3 should go ahead.
|
|
|
+
|
|
|
+ // Reservations are already done for job2. So job3 should go ahead.
|
|
|
checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
|
|
|
+ checkAssignment("tt2", "attempt_test_0003_r_000001_0 on tt2");
|
|
|
}
|
|
|
|
|
|
/**
|