|
@@ -379,6 +379,10 @@ public class TestCapacityScheduler extends TestCase {
|
|
job.kill();
|
|
job.kill();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ public void removeJob(JobID jobid) {
|
|
|
|
+ jobs.remove(jobid);
|
|
|
|
+ }
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public JobInProgress getJob(JobID jobid) {
|
|
public JobInProgress getJob(JobID jobid) {
|
|
return jobs.get(jobid);
|
|
return jobs.get(jobid);
|
|
@@ -1785,6 +1789,84 @@ public class TestCapacityScheduler extends TestCase {
|
|
assertNull(scheduler.assignTasks(tracker("tt1")));
|
|
assertNull(scheduler.assignTasks(tracker("tt1")));
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * Testcase to verify fix for a NPE (HADOOP-5641), when memory based
|
|
|
|
+ * scheduling is enabled and jobs are retired from memory when tasks
|
|
|
|
+ * are still active on some Tasktrackers.
|
|
|
|
+ *
|
|
|
|
+ * @throws IOException
|
|
|
|
+ */
|
|
|
|
+ public void testMemoryMatchingWithRetiredJobs() throws IOException {
|
|
|
|
+ // create a cluster with a single node.
|
|
|
|
+ LOG.debug("Starting cluster with 1 tasktracker, 2 map and 2 reduce slots");
|
|
|
|
+ taskTrackerManager = new FakeTaskTrackerManager(1, 2, 2);
|
|
|
|
+ TaskTrackerStatus.ResourceStatus ttStatus =
|
|
|
|
+ taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
|
|
|
|
+ LOG.debug("Assume TT has 4 GB virtual mem and 2 GB RAM");
|
|
|
|
+ ttStatus.setTotalVirtualMemory(4 * 1024 * 1024 * 1024L);
|
|
|
|
+ ttStatus.setReservedVirtualMemory(0);
|
|
|
|
+ ttStatus.setTotalPhysicalMemory(2 * 1024 * 1024 * 1024L);
|
|
|
|
+ ttStatus.setReservedPhysicalMemory(0);
|
|
|
|
+
|
|
|
|
+ // create scheduler
|
|
|
|
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
|
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
|
|
|
|
+ taskTrackerManager.addQueues(new String[] { "default" });
|
|
|
|
+ resConf.setFakeQueues(queues);
|
|
|
|
+ scheduler.setTaskTrackerManager(taskTrackerManager);
|
|
|
|
+ // enabled memory-based scheduling
|
|
|
|
+ LOG.debug("By default, jobs get 0.5 GB per task vmem" +
|
|
|
|
+ " and 2 GB max vmem, with 50% of it for RAM");
|
|
|
|
+ scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
|
|
|
|
+ 512 * 1024 * 1024L);
|
|
|
|
+ scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
|
|
|
|
+ 2 * 1024 * 1024 * 1024L);
|
|
|
|
+ resConf.setDefaultPercentOfPmemInVmem(50.0f);
|
|
|
|
+ resConf.setLimitMaxPmemForTasks(1 * 1024 * 1024 * 1024L);
|
|
|
|
+ scheduler.setResourceManagerConf(resConf);
|
|
|
|
+ scheduler.start();
|
|
|
|
+
|
|
|
|
+ // submit a normal job
|
|
|
|
+ LOG.debug("Submitting a normal job with 2 maps and 2 reduces");
|
|
|
|
+ JobConf jConf = new JobConf();
|
|
|
|
+ jConf.setNumMapTasks(2);
|
|
|
|
+ jConf.setNumReduceTasks(2);
|
|
|
|
+ jConf.setQueueName("default");
|
|
|
|
+ jConf.setUser("u1");
|
|
|
|
+ FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
|
|
|
|
+
|
|
|
|
+ // 1st cycle - 1 map gets assigned.
|
|
|
|
+ Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
|
|
+
|
|
|
|
+ // kill this job !
|
|
|
|
+ taskTrackerManager.killJob(job1.getJobID());
|
|
|
|
+
|
|
|
|
+ // retire the job
|
|
|
|
+ taskTrackerManager.removeJob(job1.getJobID());
|
|
|
|
+
|
|
|
|
+ // submit another job.
|
|
|
|
+ LOG.debug("Submitting another normal job with 1 map and 1 reduce");
|
|
|
|
+ jConf = new JobConf();
|
|
|
|
+ jConf.setNumMapTasks(1);
|
|
|
|
+ jConf.setNumReduceTasks(1);
|
|
|
|
+ jConf.setQueueName("default");
|
|
|
|
+ jConf.setUser("u1");
|
|
|
|
+ FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
|
|
|
|
+
|
|
|
|
+ // 2nd cycle - nothing should get assigned. Memory matching code
|
|
|
|
+ // will see the job is missing and fail memory requirements.
|
|
|
|
+ assertNull(scheduler.assignTasks(tracker("tt1")));
|
|
|
|
+ // calling again should not make a difference, as the task is still running
|
|
|
|
+ assertNull(scheduler.assignTasks(tracker("tt1")));
|
|
|
|
+
|
|
|
|
+ // finish the task on the tracker.
|
|
|
|
+ taskTrackerManager.finishTask("tt1", t.getTaskID().toString(), job1);
|
|
|
|
+ // now a new task can be assigned.
|
|
|
|
+ t = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
|
|
|
|
+ // reduce can be assigned.
|
|
|
|
+ t = checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
|
|
|
|
+ }
|
|
|
|
+
|
|
protected TaskTrackerStatus tracker(String taskTrackerName) {
|
|
protected TaskTrackerStatus tracker(String taskTrackerName) {
|
|
return taskTrackerManager.getTaskTracker(taskTrackerName);
|
|
return taskTrackerManager.getTaskTracker(taskTrackerName);
|
|
}
|
|
}
|