|
@@ -23,6 +23,10 @@ import java.util.List;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.Map;
|
|
|
import java.io.IOException;
|
|
|
+
|
|
|
+import org.apache.hadoop.mapred.CapacityTestUtils.FakeJobInProgress;
|
|
|
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
|
|
|
+
|
|
|
import static org.apache.hadoop.mapred.CapacityTestUtils.*;
|
|
|
|
|
|
public class TestContainerQueue extends TestCase {
|
|
@@ -236,10 +240,10 @@ public class TestContainerQueue extends TestCase {
|
|
|
// Create 2 levels of hierarchy.
|
|
|
|
|
|
//Firt level
|
|
|
- QueueSchedulingContext sch =
|
|
|
- new QueueSchedulingContext("rt.sch", a, -1, -1);
|
|
|
- QueueSchedulingContext gta =
|
|
|
- new QueueSchedulingContext("rt.gta", b, -1, -1);
|
|
|
+ QueueSchedulingContext sch = new QueueSchedulingContext("rt.sch", a, -1,
|
|
|
+ -1, rt.qsc);
|
|
|
+ QueueSchedulingContext gta = new QueueSchedulingContext("rt.gta", b, -1,
|
|
|
+ -1, rt.qsc);
|
|
|
|
|
|
AbstractQueue schq = new ContainerQueue(rt, sch);
|
|
|
|
|
@@ -249,11 +253,11 @@ public class TestContainerQueue extends TestCase {
|
|
|
map.put(gtaq.getName(), gtaq);
|
|
|
scheduler.jobQueuesManager.addQueue((JobQueue) gtaq);
|
|
|
|
|
|
- //Create further children.
|
|
|
- QueueSchedulingContext prod =
|
|
|
- new QueueSchedulingContext("rt.sch.prod", c, -1, -1);
|
|
|
- QueueSchedulingContext misc =
|
|
|
- new QueueSchedulingContext("rt.sch.misc", d, -1, -1);
|
|
|
+ // Create further children.
|
|
|
+ QueueSchedulingContext prod = new QueueSchedulingContext("rt.sch.prod", c,
|
|
|
+ -1, -1, sch);
|
|
|
+ QueueSchedulingContext misc = new QueueSchedulingContext("rt.sch.misc", d,
|
|
|
+ -1, -1, sch);
|
|
|
|
|
|
AbstractQueue prodq = new JobQueue(schq, prod);
|
|
|
AbstractQueue miscq = new JobQueue(schq, misc);
|
|
@@ -265,6 +269,101 @@ public class TestContainerQueue extends TestCase {
|
|
|
return map;
|
|
|
}
|
|
|
|
|
|
+ public void testMaxCapacityContainerQueuehonuredInchildqueue()
|
|
|
+ throws IOException {
|
|
|
+ this.setUp(8, 1, 1);
|
|
|
+ taskTrackerManager.addJobInProgressListener(scheduler.jobQueuesManager);
|
|
|
+ // set up some queues
|
|
|
+ Map<String, AbstractQueue> map = setUpHierarchy(50, 50, 50, 50);
|
|
|
+ scheduler.updateContextInfoForTests();
|
|
|
+ // verify initial capacity distribution
|
|
|
+ TaskSchedulingContext mapTsc = map.get("rt.gta")
|
|
|
+ .getQueueSchedulingContext().getMapTSC();
|
|
|
+ assertEquals(mapTsc.getCapacity(), 4);
|
|
|
+
|
|
|
+ mapTsc = map.get("rt.sch").getQueueSchedulingContext().getMapTSC();
|
|
|
+ assertEquals(mapTsc.getCapacity(), 4);
|
|
|
+
|
|
|
+ mapTsc = map.get("rt.sch.prod").getQueueSchedulingContext().getMapTSC();
|
|
|
+ assertEquals(mapTsc.getCapacity(), 2);
|
|
|
+
|
|
|
+ mapTsc = map.get("rt.sch.misc").getQueueSchedulingContext().getMapTSC();
|
|
|
+ assertEquals(mapTsc.getCapacity(), 2);
|
|
|
+
|
|
|
+ assertUsedCapacity(map, new String[] { "rt.gta", "rt.sch", "rt.sch.prod",
|
|
|
+ "rt.sch.misc" }, new int[] { 0, 0, 0, 0 });
|
|
|
+
|
|
|
+ map.get("rt.sch").getQueueSchedulingContext().setMaxCapacityPercent(50.0f);
|
|
|
+ map.get("rt.sch").getQueueSchedulingContext().getMapTSC().setMaxCapacity(4);
|
|
|
+ map.get("rt.sch").getQueueSchedulingContext().getReduceTSC()
|
|
|
+ .setMaxCapacity(4);
|
|
|
+
|
|
|
+ // Only Allow job submission to leaf queue
|
|
|
+ FakeJobInProgress fjob1 = taskTrackerManager.submitJob(JobStatus.PREP, 5,
|
|
|
+ 5, "rt.sch.prod", "u1");
|
|
|
+ taskTrackerManager.initJob(fjob1);
|
|
|
+
|
|
|
+ Map<String, String> expectedStrings = new HashMap<String, String>();
|
|
|
+ expectedStrings.clear();
|
|
|
+ expectedStrings.put(CapacityTestUtils.MAP,
|
|
|
+ "attempt_test_0001_m_000001_0 on tt1");
|
|
|
+ expectedStrings.put(CapacityTestUtils.REDUCE,
|
|
|
+ "attempt_test_0001_r_000001_0 on tt1");
|
|
|
+ List<Task> task1 = checkMultipleTaskAssignment(taskTrackerManager,
|
|
|
+ scheduler, "tt1", expectedStrings);
|
|
|
+ assertUsedCapacity(map, new String[] { "rt.gta", "rt.sch", "rt.sch.prod",
|
|
|
+ "rt.sch.misc" }, new int[] { 0, 1, 1, 0 });
|
|
|
+
|
|
|
+ expectedStrings.clear();
|
|
|
+ expectedStrings.put(CapacityTestUtils.MAP,
|
|
|
+ "attempt_test_0001_m_000002_0 on tt2");
|
|
|
+ expectedStrings.put(CapacityTestUtils.REDUCE,
|
|
|
+ "attempt_test_0001_r_000002_0 on tt2");
|
|
|
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt2",
|
|
|
+ expectedStrings);
|
|
|
+ assertUsedCapacity(map, new String[] { "rt.gta", "rt.sch", "rt.sch.prod",
|
|
|
+ "rt.sch.misc" }, new int[] { 0, 2, 2, 0 });
|
|
|
+
|
|
|
+ expectedStrings.clear();
|
|
|
+ expectedStrings.put(CapacityTestUtils.MAP,
|
|
|
+ "attempt_test_0001_m_000003_0 on tt3");
|
|
|
+ expectedStrings.put(CapacityTestUtils.REDUCE,
|
|
|
+ "attempt_test_0001_r_000003_0 on tt3");
|
|
|
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt3",
|
|
|
+ expectedStrings);
|
|
|
+ assertUsedCapacity(map, new String[] { "rt.gta", "rt.sch", "rt.sch.prod",
|
|
|
+ "rt.sch.misc" }, new int[] { 0, 3, 3, 0 });
|
|
|
+
|
|
|
+ expectedStrings.clear();
|
|
|
+ expectedStrings.put(CapacityTestUtils.MAP,
|
|
|
+ "attempt_test_0001_m_000004_0 on tt4");
|
|
|
+ expectedStrings.put(CapacityTestUtils.REDUCE,
|
|
|
+ "attempt_test_0001_r_000004_0 on tt4");
|
|
|
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt4",
|
|
|
+ expectedStrings);
|
|
|
+ assertUsedCapacity(map, new String[] { "rt.gta", "rt.sch", "rt.sch.prod",
|
|
|
+ "rt.sch.misc" }, new int[] { 0, 4, 4, 0 });
|
|
|
+
|
|
|
+ // we have already reached the limit
|
|
|
+ // this call would return null
|
|
|
+ List<Task> task5 = scheduler.assignTasks(tracker("tt5"));
|
|
|
+ assertNull(task5);
|
|
|
+
|
|
|
+ // Now complete the task 1 i.e map task.
|
|
|
+ for (Task task : task1) {
|
|
|
+ taskTrackerManager.finishTask(task.getTaskID().toString(), fjob1);
|
|
|
+ }
|
|
|
+ expectedStrings.clear();
|
|
|
+ expectedStrings.put(MAP, "attempt_test_0001_m_000005_0 on tt1");
|
|
|
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000005_0 on tt1");
|
|
|
+ task5 = checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt1",
|
|
|
+ expectedStrings);
|
|
|
+ }
|
|
|
+
|
|
|
+ protected TaskTracker tracker(String taskTrackerName) {
|
|
|
+ return taskTrackerManager.getTaskTracker(taskTrackerName);
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Verifies that capacities are allocated properly in hierarchical queues.
|
|
|
*
|