|
@@ -40,6 +40,9 @@ import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
|
|
|
+import org.apache.hadoop.mapreduce.TaskType;
|
|
|
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
|
|
|
+
|
|
|
public class TestCapacityScheduler extends TestCase {
|
|
|
|
|
|
static final Log LOG =
|
|
@@ -200,7 +203,8 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
}
|
|
|
}
|
|
|
TaskAttemptID attemptId = getTaskAttemptID(true, areAllMapsRunning);
|
|
|
- Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), getJobConf().getUser()) {
|
|
|
+ Task task = new MapTask("", attemptId, 0, "", new BytesWritable(),
|
|
|
+ super.numSlotsPerMap, getJobConf().getUser()) {
|
|
|
@Override
|
|
|
public String toString() {
|
|
|
return String.format("%s on %s", getTaskID(), tts.getTrackerName());
|
|
@@ -242,7 +246,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
}
|
|
|
}
|
|
|
TaskAttemptID attemptId = getTaskAttemptID(false, areAllReducesRunning);
|
|
|
- Task task = new ReduceTask("", attemptId, 0, 10, getJobConf().getUser()) {
|
|
|
+ Task task = new ReduceTask("", attemptId, 0, 10, super.numSlotsPerReduce, getJobConf().getUser()) {
|
|
|
@Override
|
|
|
public String toString() {
|
|
|
return String.format("%s on %s", getTaskID(), tts.getTrackerName());
|
|
@@ -333,7 +337,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
|
|
|
FakeTaskInProgress(JobID jId, JobConf jobConf, Task t,
|
|
|
boolean isMap, FakeJobInProgress job) {
|
|
|
- super(jId, "", new JobClient.RawSplit(), null, jobConf, job, 0);
|
|
|
+ super(jId, "", new JobClient.RawSplit(), null, jobConf, job, 0, 1);
|
|
|
this.isMap = isMap;
|
|
|
this.fakeJob = job;
|
|
|
activeTasks = new TreeMap<TaskAttemptID, String>();
|
|
@@ -418,8 +422,8 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
new ArrayList<JobInProgressListener>();
|
|
|
FakeQueueManager qm = new FakeQueueManager();
|
|
|
|
|
|
- private Map<String, TaskTrackerStatus> trackers =
|
|
|
- new HashMap<String, TaskTrackerStatus>();
|
|
|
+ private Map<String, TaskTracker> trackers =
|
|
|
+ new HashMap<String, TaskTracker>();
|
|
|
private Map<String, TaskStatus> taskStatuses =
|
|
|
new HashMap<String, TaskStatus>();
|
|
|
private Map<JobID, JobInProgress> jobs =
|
|
@@ -435,16 +439,22 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
this.maxReduceTasksPerTracker = maxReduceTasksPerTracker;
|
|
|
for (int i = 1; i < numTaskTrackers + 1; i++) {
|
|
|
String ttName = "tt" + i;
|
|
|
- trackers.put(ttName, new TaskTrackerStatus(ttName, ttName + ".host", i,
|
|
|
- new ArrayList<TaskStatus>(), 0, maxMapTasksPerTracker,
|
|
|
- maxReduceTasksPerTracker));
|
|
|
+ TaskTracker tt = new TaskTracker(ttName);
|
|
|
+ tt.setStatus(new TaskTrackerStatus(ttName, ttName + ".host", i,
|
|
|
+ new ArrayList<TaskStatus>(), 0,
|
|
|
+ maxMapTasksPerTracker,
|
|
|
+ maxReduceTasksPerTracker));
|
|
|
+ trackers.put(ttName, tt);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
public void addTaskTracker(String ttName) {
|
|
|
- trackers.put(ttName, new TaskTrackerStatus(ttName, ttName + ".host", 1,
|
|
|
- new ArrayList<TaskStatus>(), 0,
|
|
|
- maxMapTasksPerTracker, maxReduceTasksPerTracker));
|
|
|
+ TaskTracker tt = new TaskTracker(ttName);
|
|
|
+ tt.setStatus(new TaskTrackerStatus(ttName, ttName + ".host", 1,
|
|
|
+ new ArrayList<TaskStatus>(), 0,
|
|
|
+ maxMapTasksPerTracker,
|
|
|
+ maxReduceTasksPerTracker));
|
|
|
+ trackers.put(ttName, tt);
|
|
|
}
|
|
|
|
|
|
public ClusterStatus getClusterStatus() {
|
|
@@ -505,7 +515,11 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
}
|
|
|
|
|
|
public Collection<TaskTrackerStatus> taskTrackers() {
|
|
|
- return trackers.values();
|
|
|
+ List<TaskTrackerStatus> statuses = new ArrayList<TaskTrackerStatus>();
|
|
|
+ for (TaskTracker tt : trackers.values()) {
|
|
|
+ statuses.add(tt.getStatus());
|
|
|
+ }
|
|
|
+ return statuses;
|
|
|
}
|
|
|
|
|
|
|
|
@@ -524,7 +538,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public TaskTrackerStatus getTaskTracker(String trackerID) {
|
|
|
+ public TaskTracker getTaskTracker(String trackerID) {
|
|
|
return trackers.get(trackerID);
|
|
|
}
|
|
|
|
|
@@ -544,10 +558,15 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
public boolean getIsMap() {
|
|
|
return t.isMapTask();
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public int getNumSlots() {
|
|
|
+ return t.getNumSlotsRequired();
|
|
|
+ }
|
|
|
};
|
|
|
taskStatuses.put(t.getTaskID().toString(), status);
|
|
|
status.setRunState(TaskStatus.State.RUNNING);
|
|
|
- trackers.get(taskTrackerName).getTaskReports().add(status);
|
|
|
+ trackers.get(taskTrackerName).getStatus().getTaskReports().add(status);
|
|
|
}
|
|
|
|
|
|
public void finishTask(String taskTrackerName, String tipId,
|
|
@@ -1698,14 +1717,14 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
// first, a map from j1 will run
|
|
|
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
|
// Total 2 map slots should be accounted for.
|
|
|
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 100.0f);
|
|
|
+ checkOccupiedSlots("default", TaskType.MAP, 1, 2, 100.0f);
|
|
|
checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
|
|
|
|
|
|
// at this point, the scheduler tries to schedule another map from j1.
|
|
|
// there isn't enough space. The second job's reduce should be scheduled.
|
|
|
checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
|
|
|
// Total 1 reduce slot should be accounted for.
|
|
|
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
|
|
|
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 1,
|
|
|
100.0f);
|
|
|
checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
|
|
|
}
|
|
@@ -1754,17 +1773,19 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
// Fill the second tt with this job.
|
|
|
checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
|
|
|
// Total 1 map slot should be accounted for.
|
|
|
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 25.0f);
|
|
|
+ checkOccupiedSlots("default", TaskType.MAP, 1, 1, 25.0f);
|
|
|
assertEquals(String.format(
|
|
|
- CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 1, 0, 0),
|
|
|
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
|
|
|
+ 1, 1, 0, 0, 0, 0),
|
|
|
(String) job1.getSchedulingInfo());
|
|
|
checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L);
|
|
|
checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
|
|
|
// Total 1 map slot should be accounted for.
|
|
|
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
|
|
|
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 1,
|
|
|
25.0f);
|
|
|
assertEquals(String.format(
|
|
|
- CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 1, 1, 1),
|
|
|
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
|
|
|
+ 1, 1, 0, 1, 1, 0),
|
|
|
(String) job1.getSchedulingInfo());
|
|
|
checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
|
|
|
|
|
@@ -1781,18 +1802,20 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
|
|
|
checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
|
|
|
// Total 3 map slots should be accounted for.
|
|
|
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 75.0f);
|
|
|
+ checkOccupiedSlots("default", TaskType.MAP, 1, 3, 75.0f);
|
|
|
assertEquals(String.format(
|
|
|
- CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 0, 0),
|
|
|
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
|
|
|
+ 1, 2, 0, 0, 0, 0),
|
|
|
(String) job2.getSchedulingInfo());
|
|
|
checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
|
|
|
|
|
|
checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
|
|
|
// Total 3 reduce slots should be accounted for.
|
|
|
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 3,
|
|
|
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 3,
|
|
|
75.0f);
|
|
|
assertEquals(String.format(
|
|
|
- CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 1, 2),
|
|
|
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
|
|
|
+ 1, 2, 0, 1, 2, 0),
|
|
|
(String) job2.getSchedulingInfo());
|
|
|
checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
|
|
|
|
|
@@ -1812,16 +1835,24 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
assertNull(scheduler.assignTasks(tracker("tt2")));
|
|
|
assertNull(scheduler.assignTasks(tracker("tt1")));
|
|
|
assertNull(scheduler.assignTasks(tracker("tt2")));
|
|
|
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 75.0f);
|
|
|
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 3,
|
|
|
- 75.0f);
|
|
|
+ // reserved tasktrackers contribute to occupied slots
|
|
|
+ // for maps, both tasktrackers are reserved.
|
|
|
+ checkOccupiedSlots("default", TaskType.MAP, 1, 7, 175.0f);
|
|
|
+ // for reduces, only one tasktracker is reserved, because
|
|
|
+ // the reduce scheduler is not visited for tt1 (as it has
|
|
|
+ // 0 slots free).
|
|
|
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 5,
|
|
|
+ 125.0f);
|
|
|
checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
|
|
|
checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
|
|
|
+ LOG.info(job2.getSchedulingInfo());
|
|
|
assertEquals(String.format(
|
|
|
- CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 1, 2),
|
|
|
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
|
|
|
+ 1, 2, 4, 1, 2, 2),
|
|
|
(String) job2.getSchedulingInfo());
|
|
|
assertEquals(String.format(
|
|
|
- CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 0, 0, 0, 0),
|
|
|
+ CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING,
|
|
|
+ 0, 0, 0, 0, 0, 0),
|
|
|
(String) job3.getSchedulingInfo());
|
|
|
}
|
|
|
|
|
@@ -1872,62 +1903,65 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
// 1st cycle - 1 map gets assigned.
|
|
|
Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
|
// Total 1 map slot should be accounted for.
|
|
|
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 50.0f);
|
|
|
+ checkOccupiedSlots("default", TaskType.MAP, 1, 1, 50.0f);
|
|
|
checkMemReservedForTasksOnTT("tt1", 512L, 0L);
|
|
|
|
|
|
// 1st cycle of reduces - 1 reduce gets assigned.
|
|
|
Task t1 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
|
|
|
// Total 1 reduce slot should be accounted for.
|
|
|
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
|
|
|
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 1,
|
|
|
50.0f);
|
|
|
checkMemReservedForTasksOnTT("tt1", 512L, 512L);
|
|
|
|
|
|
// kill this job !
|
|
|
taskTrackerManager.killJob(job1.getJobID());
|
|
|
// No more map/reduce slots should be accounted for.
|
|
|
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 0, 0, 0.0f);
|
|
|
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 0, 0,
|
|
|
+ checkOccupiedSlots("default", TaskType.MAP, 0, 0, 0.0f);
|
|
|
+ checkOccupiedSlots("default", TaskType.REDUCE, 0, 0,
|
|
|
0.0f);
|
|
|
|
|
|
// retire the job
|
|
|
taskTrackerManager.removeJob(job1.getJobID());
|
|
|
|
|
|
// submit another job.
|
|
|
- LOG.debug("Submitting another normal job with 1 map and 1 reduce");
|
|
|
+ LOG.debug("Submitting another normal job with 2 maps and 2 reduces");
|
|
|
jConf = new JobConf();
|
|
|
- jConf.setNumMapTasks(1);
|
|
|
- jConf.setNumReduceTasks(1);
|
|
|
+ jConf.setNumMapTasks(2);
|
|
|
+ jConf.setNumReduceTasks(2);
|
|
|
jConf.setMemoryForMapTask(512);
|
|
|
jConf.setMemoryForReduceTask(512);
|
|
|
jConf.setQueueName("default");
|
|
|
jConf.setUser("u1");
|
|
|
FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
|
|
|
|
|
|
- // 2nd cycle - nothing should get assigned. Memory matching code
|
|
|
- // will see the job is missing and fail memory requirements.
|
|
|
- assertNull(scheduler.assignTasks(tracker("tt1")));
|
|
|
- checkMemReservedForTasksOnTT("tt1", null, null);
|
|
|
+ // With HADOOP-5964 we no longer rely on a job conf to get
|
|
|
+ // the memory occupied, so scheduling should work correctly.
|
|
|
+ t1 = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
|
|
|
+ checkOccupiedSlots("default", TaskType.MAP, 1, 1, 50);
|
|
|
+ checkMemReservedForTasksOnTT("tt1", 1024L, 512L);
|
|
|
|
|
|
- // calling again should not make a difference, as the task is still running
|
|
|
- assertNull(scheduler.assignTasks(tracker("tt1")));
|
|
|
- checkMemReservedForTasksOnTT("tt1", null, null);
|
|
|
+ // assign a reduce now.
|
|
|
+ t1 = checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
|
|
|
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 50);
|
|
|
+ checkMemReservedForTasksOnTT("tt1", 1024L, 1024L);
|
|
|
|
|
|
+ // now, no more can be assigned because all the slots are blocked.
|
|
|
+ assertNull(scheduler.assignTasks(tracker("tt1")));
|
|
|
+
|
|
|
// finish the tasks on the tracker.
|
|
|
taskTrackerManager.finishTask("tt1", t.getTaskID().toString(), job1);
|
|
|
taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), job1);
|
|
|
-
|
|
|
+
|
|
|
// now a new task can be assigned.
|
|
|
- t = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
|
|
|
- // Total 1 map slots should be accounted for.
|
|
|
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 50.0f);
|
|
|
- checkMemReservedForTasksOnTT("tt1", 512L, 0L);
|
|
|
-
|
|
|
+ t = checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
|
|
|
+ checkOccupiedSlots("default", TaskType.MAP, 1, 2, 100.0f);
|
|
|
+ // memory used will change because of the finished task above.
|
|
|
+ checkMemReservedForTasksOnTT("tt1", 1024L, 512L);
|
|
|
+
|
|
|
// reduce can be assigned.
|
|
|
- t = checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
|
|
|
- // Total 1 reduce slots should be accounted for.
|
|
|
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
|
|
|
- 50.0f);
|
|
|
- checkMemReservedForTasksOnTT("tt1", 512L, 512L);
|
|
|
+ t = checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
|
|
|
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 2, 100.0f);
|
|
|
+ checkMemReservedForTasksOnTT("tt1", 1024L, 1024L);
|
|
|
}
|
|
|
|
|
|
/*
|
|
@@ -2322,25 +2356,13 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Test case to test scheduling of
|
|
|
- * <ol>
|
|
|
- * <li>High ram job with speculative map execution.
|
|
|
- * <ul>
|
|
|
- * <li>Submit one high ram job which has speculative map.</li>
|
|
|
- * <li>Submit a normal job which has no speculative map.</li>
|
|
|
- * <li>Scheduler should schedule first all map tasks from first job and block
|
|
|
- * the cluster till both maps from first job get completed.
|
|
|
- * </ul>
|
|
|
- * </li>
|
|
|
- * <li>High ram job with speculative reduce execution.
|
|
|
- * <ul>
|
|
|
- * <li>Submit one high ram job which has speculative reduce.</li>
|
|
|
- * <li>Submit a normal job which has no speculative reduce.</li>
|
|
|
- * <li>Scheduler should schedule first all reduce tasks from first job and
|
|
|
- * block the cluster till both reduces are completed.</li>
|
|
|
- * </ul>
|
|
|
- * </li>
|
|
|
- * </ol>
|
|
|
+ * Test case to test scheduling of jobs with speculative execution
|
|
|
+ * in the face of high RAM jobs.
|
|
|
+ *
|
|
|
+ * Essentially, the test verifies that if a high RAM job has speculative
|
|
|
+ * tasks that cannot run because of memory requirements, we block
|
|
|
+ * that node and do not return any tasks to it.
|
|
|
+ *
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
public void testHighRamJobWithSpeculativeExecution() throws IOException {
|
|
@@ -2367,8 +2389,18 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
scheduler.setResourceManagerConf(resConf);
|
|
|
scheduler.start();
|
|
|
|
|
|
+ // Submit a normal job that should occupy a node
|
|
|
+ JobConf jConf = new JobConf(conf);
|
|
|
+ jConf.setMemoryForMapTask(1 * 1024);
|
|
|
+ jConf.setMemoryForReduceTask(0);
|
|
|
+ jConf.setNumMapTasks(2);
|
|
|
+ jConf.setNumReduceTasks(0);
|
|
|
+ jConf.setQueueName("default");
|
|
|
+ jConf.setUser("u1");
|
|
|
+ FakeJobInProgress job1 = submitJob(JobStatus.PREP, jConf);
|
|
|
+
|
|
|
//Submit a high memory job with speculative tasks.
|
|
|
- JobConf jConf = new JobConf();
|
|
|
+ jConf = new JobConf();
|
|
|
jConf.setMemoryForMapTask(2 * 1024);
|
|
|
jConf.setMemoryForReduceTask(0);
|
|
|
jConf.setNumMapTasks(1);
|
|
@@ -2377,13 +2409,13 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
jConf.setUser("u1");
|
|
|
jConf.setMapSpeculativeExecution(true);
|
|
|
jConf.setReduceSpeculativeExecution(false);
|
|
|
- FakeJobInProgress job1 =
|
|
|
+ FakeJobInProgress job2 =
|
|
|
new FakeJobInProgress(new JobID("test", ++jobCounter), jConf,
|
|
|
taskTrackerManager, "u1");
|
|
|
- taskTrackerManager.submitJob(job1);
|
|
|
+ taskTrackerManager.submitJob(job2);
|
|
|
|
|
|
//Submit normal job
|
|
|
- jConf = new JobConf();
|
|
|
+ jConf = new JobConf(conf);
|
|
|
jConf.setMemoryForMapTask(1 * 1024);
|
|
|
jConf.setMemoryForReduceTask(0);
|
|
|
jConf.setNumMapTasks(1);
|
|
@@ -2392,127 +2424,46 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
jConf.setUser("u1");
|
|
|
jConf.setMapSpeculativeExecution(false);
|
|
|
jConf.setReduceSpeculativeExecution(false);
|
|
|
- FakeJobInProgress job2 = submitJob(JobStatus.PREP, jConf);
|
|
|
+ FakeJobInProgress job3 = submitJob(JobStatus.PREP, jConf);
|
|
|
|
|
|
controlledInitializationPoller.selectJobsToInitialize();
|
|
|
raiseStatusChangeEvents(scheduler.jobQueuesManager);
|
|
|
|
|
|
- // first, a map from j1 will run
|
|
|
- // at this point, there is a speculative task for the same job to be
|
|
|
- //scheduled. This task would be scheduled. Till the tasks from job1 gets
|
|
|
- //complete none of the tasks from other jobs would be scheduled.
|
|
|
+ // Have one node on which all tasks of job1 are scheduled.
|
|
|
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
|
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
|
|
|
- assertEquals("pending maps greater than zero " , job1.pendingMaps(), 0);
|
|
|
- // Total 2 map slots should be accounted for.
|
|
|
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f);
|
|
|
+ checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
|
|
|
|
|
|
- //make same tracker get back, check if you are blocking. Your job
|
|
|
- //has speculative map task so tracker should be blocked even tho' it
|
|
|
- //can run job2's map.
|
|
|
- assertNull(scheduler.assignTasks(tracker("tt1")));
|
|
|
- // Total 2 map slots should be accounted for.
|
|
|
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f);
|
|
|
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
|
|
|
+ // raise events to initialize the 3rd job
|
|
|
+ controlledInitializationPoller.selectJobsToInitialize();
|
|
|
+ raiseStatusChangeEvents(scheduler.jobQueuesManager);
|
|
|
|
|
|
- //TT2 now gets speculative map of the job1
|
|
|
- checkAssignment("tt2", "attempt_test_0001_m_000001_1 on tt2");
|
|
|
- // Total 4 map slots should be accounted for.
|
|
|
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 4, 66.7f);
|
|
|
+ // On the second node, one task of the high RAM job can be scheduled.
|
|
|
+ checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
|
|
|
checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 0L);
|
|
|
-
|
|
|
- // Now since the first job has no more speculative maps, it can schedule
|
|
|
- // the second job.
|
|
|
- checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
|
|
|
- // Total 5 map slots should be accounted for.
|
|
|
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 5, 83.3f);
|
|
|
- checkMemReservedForTasksOnTT("tt1", 3 * 1024L, 0L);
|
|
|
-
|
|
|
- //finish everything
|
|
|
+ assertEquals("pending maps greater than zero " , job2.pendingMaps(), 0);
|
|
|
+ // Total 4 map slots should be accounted for.
|
|
|
+ checkOccupiedSlots("default", TaskType.MAP, 1, 4, 66.7f);
|
|
|
+
|
|
|
+ // Now when the first node gets back, it cannot be given any task
|
|
|
+ // because job2 has a speculative task waiting for room on this node.
|
|
|
+ // The node stays blocked even though job3's tasks could run on it.
|
|
|
+ assertNull(scheduler.assignTasks(tracker("tt1")));
|
|
|
+ // Reservation will count for 2 more slots.
|
|
|
+ checkOccupiedSlots("default", TaskType.MAP, 1, 6, 100.0f);
|
|
|
+
|
|
|
+ // finish one task from tt1.
|
|
|
taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0",
|
|
|
job1);
|
|
|
- taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000001_1",
|
|
|
- job1);
|
|
|
- taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0",
|
|
|
- job2);
|
|
|
- taskTrackerManager.finalizeJob(job1);
|
|
|
- taskTrackerManager.finalizeJob(job2);
|
|
|
|
|
|
- //Now submit high ram job with speculative reduce and check.
|
|
|
- jConf = new JobConf();
|
|
|
- jConf.setMemoryForMapTask(2 * 1024);
|
|
|
- jConf.setMemoryForReduceTask(2 * 1024L);
|
|
|
- jConf.setNumMapTasks(1);
|
|
|
- jConf.setNumReduceTasks(1);
|
|
|
- jConf.setQueueName("default");
|
|
|
- jConf.setUser("u1");
|
|
|
- jConf.setMapSpeculativeExecution(false);
|
|
|
- jConf.setReduceSpeculativeExecution(true);
|
|
|
- FakeJobInProgress job3 =
|
|
|
- new FakeJobInProgress(new JobID("test", ++jobCounter), jConf,
|
|
|
- taskTrackerManager, "u1");
|
|
|
- taskTrackerManager.submitJob(job3);
|
|
|
-
|
|
|
- //Submit normal job w.r.t reduces
|
|
|
- jConf = new JobConf();
|
|
|
- jConf.setMemoryForMapTask(1 * 1024L);
|
|
|
- jConf.setMemoryForReduceTask(1 * 1024L);
|
|
|
- jConf.setNumMapTasks(1);
|
|
|
- jConf.setNumReduceTasks(1);
|
|
|
- jConf.setQueueName("default");
|
|
|
- jConf.setUser("u1");
|
|
|
- jConf.setMapSpeculativeExecution(false);
|
|
|
- jConf.setReduceSpeculativeExecution(false);
|
|
|
- FakeJobInProgress job4 = submitJob(JobStatus.PREP, jConf);
|
|
|
+ // now, we can schedule the speculative task on tt1
|
|
|
+ checkAssignment("tt1", "attempt_test_0002_m_000001_1 on tt1");
|
|
|
|
|
|
- controlledInitializationPoller.selectJobsToInitialize();
|
|
|
- raiseStatusChangeEvents(scheduler.jobQueuesManager);
|
|
|
-
|
|
|
- // Finish up the map scheduler
|
|
|
+ // finish one more task from tt1.
|
|
|
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0",
|
|
|
+ job1);
|
|
|
+
|
|
|
+ // now the new job's tasks can be scheduled.
|
|
|
checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
|
|
|
- // Total 2 map slots should be accounted for.
|
|
|
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f);
|
|
|
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
|
|
|
-
|
|
|
- checkAssignment("tt2", "attempt_test_0004_m_000001_0 on tt2");
|
|
|
- // Total 3 map slots should be accounted for.
|
|
|
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 50.0f);
|
|
|
- checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L);
|
|
|
-
|
|
|
- // first, a reduce from j3 will run
|
|
|
- // at this point, there is a speculative task for the same job to be
|
|
|
- //scheduled. This task would be scheduled. Till the tasks from job3 gets
|
|
|
- //complete none of the tasks from other jobs would be scheduled.
|
|
|
- checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1");
|
|
|
- assertEquals("pending reduces greater than zero ", job3.pendingReduces(),
|
|
|
- 0);
|
|
|
- // Total 2 reduce slots should be accounted for.
|
|
|
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 2,
|
|
|
- 33.3f);
|
|
|
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2*1024L);
|
|
|
-
|
|
|
- //make same tracker get back, check if you are blocking. Your job
|
|
|
- //has speculative reduce task so tracker should be blocked even tho' it
|
|
|
- //can run job4's reduce.
|
|
|
- assertNull(scheduler.assignTasks(tracker("tt1")));
|
|
|
- // Total 2 reduce slots should be accounted for.
|
|
|
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 2,
|
|
|
- 33.3f);
|
|
|
-
|
|
|
- //TT2 now gets speculative reduce of the job3
|
|
|
- checkAssignment("tt2", "attempt_test_0003_r_000001_1 on tt2");
|
|
|
- // Total 4 reduce slots should be accounted for.
|
|
|
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 4,
|
|
|
- 66.7f);
|
|
|
- checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 2 * 1024L);
|
|
|
-
|
|
|
- // Now since j3 has no more speculative reduces, it can schedule
|
|
|
- // the j4.
|
|
|
- checkAssignment("tt1", "attempt_test_0004_r_000001_0 on tt1");
|
|
|
- // Total 5 reduce slots should be accounted for.
|
|
|
- checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 5,
|
|
|
- 83.3f);
|
|
|
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 3 * 1024L);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2570,32 +2521,32 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
// Map 1 of high memory job
|
|
|
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
|
checkQueuesOrder(qs, scheduler
|
|
|
- .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
|
|
|
+ .getOrderedQueues(TaskType.MAP));
|
|
|
|
|
|
// Reduce 1 of high memory job
|
|
|
checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
|
|
|
checkQueuesOrder(qs, scheduler
|
|
|
- .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
|
|
|
+ .getOrderedQueues(TaskType.REDUCE));
|
|
|
|
|
|
// Map 1 of normal job
|
|
|
checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
|
|
|
checkQueuesOrder(reversedQs, scheduler
|
|
|
- .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
|
|
|
+ .getOrderedQueues(TaskType.MAP));
|
|
|
|
|
|
// Reduce 1 of normal job
|
|
|
checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
|
|
|
checkQueuesOrder(reversedQs, scheduler
|
|
|
- .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
|
|
|
+ .getOrderedQueues(TaskType.REDUCE));
|
|
|
|
|
|
// Map 2 of normal job
|
|
|
checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
|
|
|
checkQueuesOrder(reversedQs, scheduler
|
|
|
- .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
|
|
|
+ .getOrderedQueues(TaskType.MAP));
|
|
|
|
|
|
// Reduce 2 of normal job
|
|
|
checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
|
|
|
checkQueuesOrder(reversedQs, scheduler
|
|
|
- .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
|
|
|
+ .getOrderedQueues(TaskType.REDUCE));
|
|
|
|
|
|
// Now both the queues are equally served. But the comparator doesn't change
|
|
|
// the order if queues are equally served.
|
|
@@ -2603,32 +2554,32 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
// Map 3 of normal job
|
|
|
checkAssignment("tt2", "attempt_test_0002_m_000003_0 on tt2");
|
|
|
checkQueuesOrder(reversedQs, scheduler
|
|
|
- .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
|
|
|
+ .getOrderedQueues(TaskType.MAP));
|
|
|
|
|
|
// Reduce 3 of normal job
|
|
|
checkAssignment("tt2", "attempt_test_0002_r_000003_0 on tt2");
|
|
|
checkQueuesOrder(reversedQs, scheduler
|
|
|
- .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
|
|
|
+ .getOrderedQueues(TaskType.REDUCE));
|
|
|
|
|
|
// Map 2 of high memory job
|
|
|
checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
|
|
|
checkQueuesOrder(qs, scheduler
|
|
|
- .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
|
|
|
+ .getOrderedQueues(TaskType.MAP));
|
|
|
|
|
|
// Reduce 2 of high memory job
|
|
|
checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
|
|
|
checkQueuesOrder(qs, scheduler
|
|
|
- .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
|
|
|
+ .getOrderedQueues(TaskType.REDUCE));
|
|
|
|
|
|
// Map 4 of normal job
|
|
|
checkAssignment("tt2", "attempt_test_0002_m_000004_0 on tt2");
|
|
|
checkQueuesOrder(reversedQs, scheduler
|
|
|
- .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
|
|
|
+ .getOrderedQueues(TaskType.MAP));
|
|
|
|
|
|
// Reduce 4 of normal job
|
|
|
checkAssignment("tt2", "attempt_test_0002_r_000004_0 on tt2");
|
|
|
checkQueuesOrder(reversedQs, scheduler
|
|
|
- .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
|
|
|
+ .getOrderedQueues(TaskType.REDUCE));
|
|
|
}
|
|
|
|
|
|
private void checkFailedInitializedJobMovement() throws IOException {
|
|
@@ -2713,7 +2664,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
}
|
|
|
|
|
|
|
|
|
- protected TaskTrackerStatus tracker(String taskTrackerName) {
|
|
|
+ protected TaskTracker tracker(String taskTrackerName) {
|
|
|
return taskTrackerManager.getTaskTracker(taskTrackerName);
|
|
|
}
|
|
|
|
|
@@ -2737,11 +2688,11 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
private void checkMemReservedForTasksOnTT(String taskTracker,
|
|
|
Long expectedMemForMapsOnTT, Long expectedMemForReducesOnTT) {
|
|
|
Long observedMemForMapsOnTT =
|
|
|
- scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker),
|
|
|
- CapacityTaskScheduler.TYPE.MAP);
|
|
|
+ scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker).getStatus(),
|
|
|
+ TaskType.MAP);
|
|
|
Long observedMemForReducesOnTT =
|
|
|
- scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker),
|
|
|
- CapacityTaskScheduler.TYPE.REDUCE);
|
|
|
+ scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker).getStatus(),
|
|
|
+ TaskType.REDUCE);
|
|
|
if (expectedMemForMapsOnTT == null) {
|
|
|
assertTrue(observedMemForMapsOnTT == null);
|
|
|
} else {
|
|
@@ -2765,7 +2716,7 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
* @return
|
|
|
*/
|
|
|
private void checkOccupiedSlots(String queue,
|
|
|
- CapacityTaskScheduler.TYPE type, int numActiveUsers,
|
|
|
+ TaskType type, int numActiveUsers,
|
|
|
int expectedOccupiedSlots, float expectedOccupiedSlotsPercent) {
|
|
|
scheduler.updateQSIInfoForTests();
|
|
|
QueueManager queueManager = scheduler.taskTrackerManager.getQueueManager();
|
|
@@ -2773,9 +2724,9 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
queueManager.getJobQueueInfo(queue).getSchedulingInfo();
|
|
|
String[] infoStrings = schedulingInfo.split("\n");
|
|
|
int index = -1;
|
|
|
- if (type.equals(CapacityTaskScheduler.TYPE.MAP)) {
|
|
|
+ if (type.equals(TaskType.MAP)) {
|
|
|
index = 7;
|
|
|
- } else if (type.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
|
|
|
+ } else if (type.equals(TaskType.REDUCE)) {
|
|
|
index = (numActiveUsers == 0 ? 12 : 13 + numActiveUsers);
|
|
|
}
|
|
|
LOG.info(infoStrings[index]);
|