|
@@ -862,6 +862,86 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
1, rqueue.size());
|
|
|
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test the max map limit.
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public void testMaxMapCap() throws IOException {
|
|
|
+ this.setUp(4,1,1);
|
|
|
+ taskTrackerManager.addQueues(new String[] {"default"});
|
|
|
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
+ queues.add(new FakeQueueInfo("default", 100.0f, false, 1));
|
|
|
+ resConf.setFakeQueues(queues);
|
|
|
+ resConf.setMaxMapCap("default",2);
|
|
|
+ resConf.setMaxReduceCap("default",-1);
|
|
|
+ scheduler.setResourceManagerConf(resConf);
|
|
|
+ scheduler.start();
|
|
|
+
|
|
|
+ //submit the Job
|
|
|
+ FakeJobInProgress fjob1 =
|
|
|
+ submitJobAndInit(JobStatus.PREP,3,1,"default","user");
|
|
|
+
|
|
|
+ List<Task> task1 = scheduler.assignTasks(tracker("tt1"));
|
|
|
+ List<Task> task2 = scheduler.assignTasks(tracker("tt2"));
|
|
|
+
|
|
|
+ //Once the 2 tasks are running the third assigment should be reduce.
|
|
|
+ checkAssignment("tt3", "attempt_test_0001_r_000001_0 on tt3");
|
|
|
+ //This should fail.
|
|
|
+ List<Task> task4 = scheduler.assignTasks(tracker("tt4"));
|
|
|
+ assertNull(task4);
|
|
|
+ //Now complete the task 1.
|
|
|
+ // complete the job
|
|
|
+ taskTrackerManager.finishTask("tt1", task1.get(0).getTaskID().toString(),
|
|
|
+ fjob1);
|
|
|
+ //We have completed the tt1 task which was a map task so we expect one map
|
|
|
+ //task to be picked up
|
|
|
+ checkAssignment("tt4","attempt_test_0001_m_000003_0 on tt4");
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test max reduce limit
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public void testMaxReduceCap() throws IOException {
|
|
|
+ this.setUp(4, 1, 1);
|
|
|
+ taskTrackerManager.addQueues(new String[]{"default"});
|
|
|
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
+ queues.add(new FakeQueueInfo("default", 100.0f, false, 1));
|
|
|
+ resConf.setFakeQueues(queues);
|
|
|
+ resConf.setMaxMapCap("default", -1);
|
|
|
+ resConf.setMaxReduceCap("default", 2);
|
|
|
+ scheduler.setResourceManagerConf(resConf);
|
|
|
+ scheduler.start();
|
|
|
+
|
|
|
+ //submit the Job
|
|
|
+ FakeJobInProgress fjob1 =
|
|
|
+ submitJobAndInit(JobStatus.PREP, 1, 3, "default", "user");
|
|
|
+
|
|
|
+ List<Task> task1 = scheduler.assignTasks(tracker("tt1"));
|
|
|
+ List<Task> task2 = scheduler.assignTasks(tracker("tt2"));
|
|
|
+ List<Task> task3 = scheduler.assignTasks(tracker("tt3"));
|
|
|
+
|
|
|
+ //This should fail. 1 map, 2 reduces , we have reached the limit.
|
|
|
+ List<Task> task4 = scheduler.assignTasks(tracker("tt4"));
|
|
|
+ assertNull(task4);
|
|
|
+ //Now complete the task 1 i.e map task.
|
|
|
+ // complete the job
|
|
|
+ taskTrackerManager.finishTask(
|
|
|
+ "tt1", task1.get(0).getTaskID().toString(),
|
|
|
+ fjob1);
|
|
|
+
|
|
|
+ //This should still fail as only map task is done
|
|
|
+ task4 = scheduler.assignTasks(tracker("tt4"));
|
|
|
+ assertNull(task4);
|
|
|
+
|
|
|
+ //Complete the reduce task
|
|
|
+ taskTrackerManager.finishTask(
|
|
|
+ "tt2", task2.get(0).getTaskID().toString(), fjob1);
|
|
|
+
|
|
|
+ //One reduce is done hence assign the new reduce.
|
|
|
+ checkAssignment("tt4","attempt_test_0001_r_000003_0 on tt4");
|
|
|
+ }
|
|
|
|
|
|
// test if the queue reflects the changes
|
|
|
private void testJobOrderChange(FakeJobInProgress fjob1,
|
|
@@ -1132,6 +1212,144 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Creates a queue with max task limit of 2
|
|
|
+ * submit 1 job in the queue which is high ram(2 slots) . As 2 slots are
|
|
|
+ * given to high ram job and are reserved , no other tasks are accepted .
|
|
|
+ *
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public void testHighMemoryBlockingWithMaxLimit()
|
|
|
+ throws IOException {
|
|
|
+
|
|
|
+ // 2 map and 1 reduce slots
|
|
|
+ taskTrackerManager = new FakeTaskTrackerManager(2, 2, 1);
|
|
|
+
|
|
|
+ taskTrackerManager.addQueues(new String[] { "defaultXYZ" });
|
|
|
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
+ queues.add(new FakeQueueInfo("defaultXYZ", 100.0f, true, 25));
|
|
|
+ resConf.setFakeQueues(queues);
|
|
|
+ resConf.setMaxMapCap("defaultXYZ",2);
|
|
|
+ scheduler.setTaskTrackerManager(taskTrackerManager);
|
|
|
+ // enabled memory-based scheduling
|
|
|
+ // Normal job in the cluster would be 1GB maps/reduces
|
|
|
+ scheduler.getConf().setLong(
|
|
|
+ JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
|
|
|
+ 2 * 1024);
|
|
|
+ scheduler.getConf().setLong(
|
|
|
+ JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
|
|
|
+ scheduler.getConf().setLong(
|
|
|
+ JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
|
|
|
+ 1 * 1024);
|
|
|
+ scheduler.getConf().setLong(
|
|
|
+ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
|
|
|
+ scheduler.setResourceManagerConf(resConf);
|
|
|
+ scheduler.start();
|
|
|
+
|
|
|
+ // The situation : Submit 2 jobs with high memory map task
|
|
|
+ //Set the max limit for queue to 2 ,
|
|
|
+ // try submitting more map tasks to the queue , it should not happen
|
|
|
+
|
|
|
+ LOG.debug("Submit one high memory(2GB maps, 0MB reduces) job of "
|
|
|
+ + "2 map tasks");
|
|
|
+ JobConf jConf = new JobConf(conf);
|
|
|
+ jConf.setMemoryForMapTask(2 * 1024);
|
|
|
+ jConf.setMemoryForReduceTask(0);
|
|
|
+ jConf.setNumMapTasks(2);
|
|
|
+ jConf.setNumReduceTasks(0);
|
|
|
+ jConf.setQueueName("defaultXYZ");
|
|
|
+ jConf.setUser("u1");
|
|
|
+ FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
|
|
|
+
|
|
|
+ LOG.debug("Submit another regular memory(1GB vmem maps/reduces) job of "
|
|
|
+ + "2 map/red tasks");
|
|
|
+ jConf = new JobConf(conf);
|
|
|
+ jConf.setMemoryForMapTask(1 * 1024);
|
|
|
+ jConf.setMemoryForReduceTask(1 * 1024);
|
|
|
+ jConf.setNumMapTasks(2);
|
|
|
+ jConf.setNumReduceTasks(2);
|
|
|
+ jConf.setQueueName("defaultXYZ");
|
|
|
+ jConf.setUser("u1");
|
|
|
+ FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
|
|
|
+
|
|
|
+ // first, a map from j1 will run this is a high memory job so it would
|
|
|
+ // occupy the 2 slots
|
|
|
+ checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
|
+
|
|
|
+ checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1,2, 100.0f,3,1);
|
|
|
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
|
|
|
+
|
|
|
+ // at this point, the scheduler tries to schedule another map from j1.
|
|
|
+ // there isn't enough space. The second job's reduce should be scheduled.
|
|
|
+ checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
|
|
|
+
|
|
|
+ checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1,2, 100.0f,3,1);
|
|
|
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
|
|
|
+
|
|
|
+ //at this point , the scheduler tries to schedule another map from j2 for
|
|
|
+ //another task tracker.
|
|
|
+ // This should not happen as all the map slots are taken
|
|
|
+ //by the first task itself.hence reduce task from the second job is given
|
|
|
+
|
|
|
+ checkAssignment("tt2","attempt_test_0002_r_000002_0 on tt2");
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * test if user limits automatically adjust to max map or reduce limit
|
|
|
+ */
|
|
|
+ public void testUserLimitsWithMaxLimits() throws Exception {
|
|
|
+ setUp(4, 4, 4);
|
|
|
+ // set up some queues
|
|
|
+ String[] qs = {"default"};
|
|
|
+ taskTrackerManager.addQueues(qs);
|
|
|
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 50));
|
|
|
+ resConf.setFakeQueues(queues);
|
|
|
+ resConf.setMaxMapCap("default", 2);
|
|
|
+ resConf.setMaxReduceCap("default", 2);
|
|
|
+ scheduler.setResourceManagerConf(resConf);
|
|
|
+ scheduler.start();
|
|
|
+
|
|
|
+ // submit a job
|
|
|
+ FakeJobInProgress fjob1 =
|
|
|
+ submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u1");
|
|
|
+ FakeJobInProgress fjob2 =
|
|
|
+ submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u2");
|
|
|
+
|
|
|
+ // for queue 'default', the capacity for maps is 2.
|
|
|
+ // But the max map limit is 2
|
|
|
+ // hence user should be getting not more than 1 as it is the 50%.
|
|
|
+ Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
|
+
|
|
|
+ //Now we should get the task from the other job. As the
|
|
|
+ //first user has reached his max map limit.
|
|
|
+ checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
|
|
|
+
|
|
|
+ //Now we are done with map limit , now if we ask for task we should
|
|
|
+ // get reduce from 1st job
|
|
|
+ checkAssignment("tt3", "attempt_test_0001_r_000001_0 on tt3");
|
|
|
+ // Now we're at full capacity for maps. 1 done with reduces for job 1 so
|
|
|
+ // now we should get 1 reduces for job 2
|
|
|
+ Task t4 = checkAssignment("tt4", "attempt_test_0002_r_000001_0 on tt4");
|
|
|
+
|
|
|
+ taskTrackerManager.finishTask(
|
|
|
+ "tt1", t1.getTaskID().toString(),
|
|
|
+ fjob1);
|
|
|
+
|
|
|
+ //tt1 completed the task so we have 1 map slot for u1
|
|
|
+ // we are assigning the 2nd map task from fjob1
|
|
|
+ checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
|
|
|
+
|
|
|
+ taskTrackerManager.finishTask(
|
|
|
+ "tt4", t4.getTaskID().toString(),
|
|
|
+ fjob2);
|
|
|
+ //tt4 completed the task , so we have 1 reduce slot for u2
|
|
|
+ //we are assigning the 2nd reduce from fjob2
|
|
|
+ checkAssignment("tt4", "attempt_test_0002_r_000002_0 on tt4");
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
// test user limits
|
|
|
public void testUserLimits() throws Exception {
|
|
|
// set up some queues
|
|
@@ -2707,17 +2925,25 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
|
|
|
/**
|
|
|
* Verify the number of slots of type 'type' from the queue 'queue'.
|
|
|
+ * incrMapIndex and incrReduceIndex are set , when expected output string is
|
|
|
+ * changed.these values can be set if the index of
|
|
|
+ * "Used capacity: %d (%.1f%% of Capacity)"
|
|
|
+ * is changed.
|
|
|
*
|
|
|
* @param queue
|
|
|
* @param type
|
|
|
* @param numActiveUsers in the queue at present.
|
|
|
* @param expectedOccupiedSlots
|
|
|
* @param expectedOccupiedSlotsPercent
|
|
|
- * @return
|
|
|
+ * @param incrMapIndex
|
|
|
+ * @param incrReduceIndex
|
|
|
*/
|
|
|
- private void checkOccupiedSlots(String queue,
|
|
|
- TaskType type, int numActiveUsers,
|
|
|
- int expectedOccupiedSlots, float expectedOccupiedSlotsPercent) {
|
|
|
+ private void checkOccupiedSlots(
|
|
|
+ String queue,
|
|
|
+ TaskType type, int numActiveUsers,
|
|
|
+ int expectedOccupiedSlots, float expectedOccupiedSlotsPercent,int incrMapIndex
|
|
|
+ ,int incrReduceIndex
|
|
|
+ ) {
|
|
|
scheduler.updateQSIInfoForTests();
|
|
|
QueueManager queueManager = scheduler.taskTrackerManager.getQueueManager();
|
|
|
String schedulingInfo =
|
|
@@ -2725,9 +2951,9 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
String[] infoStrings = schedulingInfo.split("\n");
|
|
|
int index = -1;
|
|
|
if (type.equals(TaskType.MAP)) {
|
|
|
- index = 7;
|
|
|
+ index = 7+ incrMapIndex;
|
|
|
} else if (type.equals(TaskType.REDUCE)) {
|
|
|
- index = (numActiveUsers == 0 ? 12 : 13 + numActiveUsers);
|
|
|
+ index = (numActiveUsers == 0 ? 12 : 13 + numActiveUsers)+incrReduceIndex;
|
|
|
}
|
|
|
LOG.info(infoStrings[index]);
|
|
|
assertEquals(String.format("Used capacity: %d (%.1f%% of Capacity)",
|
|
@@ -2735,6 +2961,24 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
infoStrings[index]);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ *
|
|
|
+ * @param queue
|
|
|
+ * @param type
|
|
|
+ * @param numActiveUsers
|
|
|
+ * @param expectedOccupiedSlots
|
|
|
+ * @param expectedOccupiedSlotsPercent
|
|
|
+ */
|
|
|
+ private void checkOccupiedSlots(
|
|
|
+ String queue,
|
|
|
+ TaskType type, int numActiveUsers,
|
|
|
+ int expectedOccupiedSlots, float expectedOccupiedSlotsPercent
|
|
|
+ ) {
|
|
|
+ checkOccupiedSlots(
|
|
|
+ queue, type, numActiveUsers, expectedOccupiedSlots,
|
|
|
+ expectedOccupiedSlotsPercent,0,0);
|
|
|
+ }
|
|
|
+
|
|
|
private void checkQueuesOrder(String[] expectedOrder, String[] observedOrder) {
|
|
|
assertTrue("Observed and expected queues are not of same length.",
|
|
|
expectedOrder.length == observedOrder.length);
|