|
@@ -40,6 +40,7 @@ import org.apache.hadoop.util.StringUtils;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
|
|
|
|
|
|
+
|
|
|
public class TestCapacityScheduler extends TestCase {
|
|
|
|
|
|
static final Log LOG =
|
|
@@ -145,17 +146,24 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
}
|
|
|
|
|
|
private ControlledInitializationPoller controlledInitializationPoller;
|
|
|
-
|
|
|
+ /*
|
|
|
+   * Fake job in progress object used for testing the scheduler's scheduling
|
|
|
+   * decisions. The JobInProgress object returns FakeTaskInProgress
|
|
|
+ * objects when assignTasks is called. If speculative maps and reduces
|
|
|
+ * are configured then JobInProgress returns exactly one Speculative
|
|
|
+ * map and reduce task.
|
|
|
+ */
|
|
|
static class FakeJobInProgress extends JobInProgress {
|
|
|
|
|
|
- private FakeTaskTrackerManager taskTrackerManager;
|
|
|
+ protected FakeTaskTrackerManager taskTrackerManager;
|
|
|
private int mapTaskCtr;
|
|
|
private int redTaskCtr;
|
|
|
private Set<TaskInProgress> mapTips =
|
|
|
new HashSet<TaskInProgress>();
|
|
|
private Set<TaskInProgress> reduceTips =
|
|
|
new HashSet<TaskInProgress>();
|
|
|
-
|
|
|
+ private int speculativeMapTaskCounter = 0;
|
|
|
+ private int speculativeReduceTaskCounter = 0;
|
|
|
public FakeJobInProgress(JobID jId, JobConf jobConf,
|
|
|
FakeTaskTrackerManager taskTrackerManager, String user) {
|
|
|
super(jId, jobConf);
|
|
@@ -186,8 +194,14 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
@Override
|
|
|
public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
|
|
|
int ignored) throws IOException {
|
|
|
- if (mapTaskCtr == numMapTasks) return null;
|
|
|
- TaskAttemptID attemptId = getTaskAttemptID(true);
|
|
|
+ boolean areAllMapsRunning = (mapTaskCtr == numMapTasks);
|
|
|
+ if (areAllMapsRunning){
|
|
|
+ if(!getJobConf().getMapSpeculativeExecution() ||
|
|
|
+ speculativeMapTasks > 0) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ TaskAttemptID attemptId = getTaskAttemptID(true, areAllMapsRunning);
|
|
|
Task task = new MapTask("", attemptId, 0, "", new BytesWritable()) {
|
|
|
@Override
|
|
|
public String toString() {
|
|
@@ -197,16 +211,39 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
taskTrackerManager.startTask(tts.getTrackerName(), task);
|
|
|
runningMapTasks++;
|
|
|
// create a fake TIP and keep track of it
|
|
|
- mapTips.add(new FakeTaskInProgress(getJobID(),
|
|
|
- getJobConf(), task, true, this));
|
|
|
+ FakeTaskInProgress mapTip = new FakeTaskInProgress(getJobID(),
|
|
|
+ getJobConf(), task, true, this);
|
|
|
+ mapTip.taskStatus.setRunState(TaskStatus.State.RUNNING);
|
|
|
+ if(areAllMapsRunning) {
|
|
|
+ speculativeMapTasks++;
|
|
|
+ //you have scheduled a speculative map. Now set all tips in the
|
|
|
+ //map tips not to have speculative task.
|
|
|
+ for(TaskInProgress t : mapTips) {
|
|
|
+ if (t instanceof FakeTaskInProgress) {
|
|
|
+ FakeTaskInProgress mt = (FakeTaskInProgress) t;
|
|
|
+ mt.hasSpeculativeMap = false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ //add only non-speculative tips.
|
|
|
+ mapTips.add(mapTip);
|
|
|
+ //add the tips to the JobInProgress TIPS
|
|
|
+ maps = mapTips.toArray(new TaskInProgress[mapTips.size()]);
|
|
|
+ }
|
|
|
return task;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public Task obtainNewReduceTask(final TaskTrackerStatus tts,
|
|
|
int clusterSize, int ignored) throws IOException {
|
|
|
- if (redTaskCtr == numReduceTasks) return null;
|
|
|
- TaskAttemptID attemptId = getTaskAttemptID(false);
|
|
|
+ boolean areAllReducesRunning = (redTaskCtr == numReduceTasks);
|
|
|
+ if (areAllReducesRunning){
|
|
|
+ if(!getJobConf().getReduceSpeculativeExecution() ||
|
|
|
+ speculativeReduceTasks > 0) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ TaskAttemptID attemptId = getTaskAttemptID(false, areAllReducesRunning);
|
|
|
Task task = new ReduceTask("", attemptId, 0, 10) {
|
|
|
@Override
|
|
|
public String toString() {
|
|
@@ -216,8 +253,25 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
taskTrackerManager.startTask(tts.getTrackerName(), task);
|
|
|
runningReduceTasks++;
|
|
|
// create a fake TIP and keep track of it
|
|
|
- reduceTips.add(new FakeTaskInProgress(getJobID(),
|
|
|
- getJobConf(), task, false, this));
|
|
|
+ FakeTaskInProgress reduceTip = new FakeTaskInProgress(getJobID(),
|
|
|
+ getJobConf(), task, false, this);
|
|
|
+ reduceTip.taskStatus.setRunState(TaskStatus.State.RUNNING);
|
|
|
+ if(areAllReducesRunning) {
|
|
|
+ speculativeReduceTasks++;
|
|
|
+        //you have scheduled a speculative reduce. Now set all tips in the
|
|
|
+        //reduce tips not to have speculative task.
|
|
|
+ for(TaskInProgress t : reduceTips) {
|
|
|
+ if (t instanceof FakeTaskInProgress) {
|
|
|
+ FakeTaskInProgress rt = (FakeTaskInProgress) t;
|
|
|
+ rt.hasSpeculativeReduce = false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ //add only non-speculative tips.
|
|
|
+ reduceTips.add(reduceTip);
|
|
|
+ //add the tips to the JobInProgress TIPS
|
|
|
+ reduces = reduceTips.toArray(new TaskInProgress[reduceTips.size()]);
|
|
|
+ }
|
|
|
return task;
|
|
|
}
|
|
|
|
|
@@ -231,10 +285,15 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
finishedReduceTasks++;
|
|
|
}
|
|
|
|
|
|
- private TaskAttemptID getTaskAttemptID(boolean isMap) {
|
|
|
+ private TaskAttemptID getTaskAttemptID(boolean isMap, boolean isSpeculative) {
|
|
|
JobID jobId = getJobID();
|
|
|
- return new TaskAttemptID(jobId.getJtIdentifier(),
|
|
|
- jobId.getId(), isMap, (isMap)?++mapTaskCtr: ++redTaskCtr, 0);
|
|
|
+ if (!isSpeculative) {
|
|
|
+ return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), isMap,
|
|
|
+ (isMap) ? ++mapTaskCtr : ++redTaskCtr, 0);
|
|
|
+ } else {
|
|
|
+ return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), isMap,
|
|
|
+ (isMap) ? mapTaskCtr : redTaskCtr, 1);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -265,12 +324,15 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
this.status.setRunState(JobStatus.FAILED);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
static class FakeTaskInProgress extends TaskInProgress {
|
|
|
private boolean isMap;
|
|
|
private FakeJobInProgress fakeJob;
|
|
|
private TreeMap<TaskAttemptID, String> activeTasks;
|
|
|
private TaskStatus taskStatus;
|
|
|
+ boolean hasSpeculativeMap;
|
|
|
+ boolean hasSpeculativeReduce;
|
|
|
+
|
|
|
FakeTaskInProgress(JobID jId, JobConf jobConf, Task t,
|
|
|
boolean isMap, FakeJobInProgress job) {
|
|
|
super(jId, "", new JobClient.RawSplit(), null, jobConf, job, 0);
|
|
@@ -282,6 +344,16 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
this.taskStatus = TaskStatus.createTaskStatus(isMap);
|
|
|
taskStatus.setProgress(0.5f);
|
|
|
taskStatus.setRunState(TaskStatus.State.RUNNING);
|
|
|
+ if (jobConf.getMapSpeculativeExecution()) {
|
|
|
+ //resetting of the hasSpeculativeMap is done
|
|
|
+ //when speculative map is scheduled by the job.
|
|
|
+ hasSpeculativeMap = true;
|
|
|
+ }
|
|
|
+ if (jobConf.getReduceSpeculativeExecution()) {
|
|
|
+ //resetting of the hasSpeculativeReduce is done
|
|
|
+ //when speculative reduce is scheduled by the job.
|
|
|
+ hasSpeculativeReduce = true;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -303,6 +375,27 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
}
|
|
|
return true;
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ /*
|
|
|
+     *hasSpeculativeMap and hasSpeculativeReduce are reset by FakeJobInProgress
|
|
|
+ *after the speculative tip has been scheduled.
|
|
|
+ */
|
|
|
+ boolean hasSpeculativeTask(long currentTime, double averageProgress) {
|
|
|
+ if(isMap && hasSpeculativeMap) {
|
|
|
+ return fakeJob.getJobConf().getMapSpeculativeExecution();
|
|
|
+ }
|
|
|
+ if (!isMap && hasSpeculativeReduce) {
|
|
|
+ return fakeJob.getJobConf().getReduceSpeculativeExecution();
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public boolean isRunning() {
|
|
|
+ return !activeTasks.isEmpty();
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
|
|
|
static class FakeQueueManager extends QueueManager {
|
|
@@ -619,6 +712,9 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
resConf.getQueues());
|
|
|
scheduler.setInitializationPoller(controlledInitializationPoller);
|
|
|
scheduler.setConf(conf);
|
|
|
+ //by default disable speculative execution.
|
|
|
+ conf.setMapSpeculativeExecution(false);
|
|
|
+ conf.setReduceSpeculativeExecution(false);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -1527,6 +1623,8 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
jConf.setNumReduceTasks(1);
|
|
|
jConf.setQueueName("default");
|
|
|
jConf.setUser("u1");
|
|
|
+ jConf.setMapSpeculativeExecution(false);
|
|
|
+ jConf.setReduceSpeculativeExecution(false);
|
|
|
FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
|
|
|
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
|
|
|
@@ -1545,6 +1643,8 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
jConf.setNumReduceTasks(0);
|
|
|
jConf.setQueueName("default");
|
|
|
jConf.setUser("u1");
|
|
|
+ jConf.setMapSpeculativeExecution(false);
|
|
|
+ jConf.setReduceSpeculativeExecution(false);
|
|
|
submitJobAndInit(JobStatus.PREP, jConf); // job2
|
|
|
|
|
|
// This job shouldn't run the TT now because of lack of pmem
|
|
@@ -2205,6 +2305,269 @@ public class TestCapacityScheduler extends TestCase {
|
|
|
mgr.getRunningJobQueue("default").contains(job));
|
|
|
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test case deals with normal jobs which have speculative maps and reduce.
|
|
|
+ * Following is test executed
|
|
|
+ * <ol>
|
|
|
+ * <li>Submit one job with speculative maps and reduce.</li>
|
|
|
+ * <li>Submit another job with no speculative execution.</li>
|
|
|
+ * <li>Observe that all tasks from first job get scheduled, speculative
|
|
|
+ * and normal tasks</li>
|
|
|
+ * <li>Finish all the first jobs tasks second jobs tasks get scheduled.</li>
|
|
|
+ * </ol>
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public void testSpeculativeTaskScheduling() throws IOException {
|
|
|
+ String[] qs = {"default"};
|
|
|
+ taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1);
|
|
|
+ scheduler.setTaskTrackerManager(taskTrackerManager);
|
|
|
+ taskTrackerManager.addQueues(qs);
|
|
|
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
|
|
|
+ resConf.setFakeQueues(queues);
|
|
|
+ scheduler.setResourceManagerConf(resConf);
|
|
|
+ scheduler.start();
|
|
|
+
|
|
|
+ JobQueuesManager mgr = scheduler.jobQueuesManager;
|
|
|
+ JobConf conf = new JobConf();
|
|
|
+ conf.setNumMapTasks(1);
|
|
|
+ conf.setNumReduceTasks(1);
|
|
|
+ conf.setMapSpeculativeExecution(true);
|
|
|
+ conf.setReduceSpeculativeExecution(true);
|
|
|
+ //Submit a job which would have one speculative map and one speculative
|
|
|
+ //reduce.
|
|
|
+ FakeJobInProgress fjob1 = submitJob(JobStatus.PREP, conf);
|
|
|
+
|
|
|
+ conf = new JobConf();
|
|
|
+ conf.setNumMapTasks(1);
|
|
|
+ conf.setNumReduceTasks(1);
|
|
|
+ //Submit a job which has no speculative map or reduce.
|
|
|
+ FakeJobInProgress fjob2 = submitJob(JobStatus.PREP, conf);
|
|
|
+
|
|
|
+    //Ask the poller to initialize all the submitted jobs and raise status
|
|
|
+ //change event.
|
|
|
+ controlledInitializationPoller.selectJobsToInitialize();
|
|
|
+ raiseStatusChangeEvents(mgr);
|
|
|
+
|
|
|
+ checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
|
+ assertTrue("Pending maps of job1 greater than zero",
|
|
|
+ (fjob1.pendingMaps() == 0));
|
|
|
+ checkAssignment("tt2", "attempt_test_0001_m_000001_1 on tt2");
|
|
|
+ checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
|
|
|
+    assertTrue("Pending reduces of job1 greater than zero",
|
|
|
+ (fjob1.pendingReduces() == 0));
|
|
|
+ checkAssignment("tt2", "attempt_test_0001_r_000001_1 on tt2");
|
|
|
+
|
|
|
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", fjob1);
|
|
|
+ taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000001_1", fjob1);
|
|
|
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", fjob1);
|
|
|
+ taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000001_1", fjob1);
|
|
|
+ taskTrackerManager.finalizeJob(fjob1);
|
|
|
+
|
|
|
+ checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
|
|
|
+ checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
|
|
|
+ taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", fjob2);
|
|
|
+ taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000001_0", fjob2);
|
|
|
+ taskTrackerManager.finalizeJob(fjob2);
|
|
|
+
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * Test case to test scheduling of
|
|
|
+ * <ol>
|
|
|
+ * <li>High ram job with speculative map execution.
|
|
|
+ * <ul>
|
|
|
+ * <li>Submit one high ram job which has speculative map.</li>
|
|
|
+ * <li>Submit a normal job which has no speculative map.</li>
|
|
|
+ * <li>Scheduler should schedule first all map tasks from first job and block
|
|
|
+ * the cluster till both maps from first job get completed.
|
|
|
+ * </ul>
|
|
|
+ * </li>
|
|
|
+ * <li>High ram job with speculative reduce execution.
|
|
|
+ * <ul>
|
|
|
+ * <li>Submit one high ram job which has speculative reduce.</li>
|
|
|
+ * <li>Submit a normal job which has no speculative reduce.</li>
|
|
|
+ * <li>Scheduler should schedule first all reduce tasks from first job and block
|
|
|
+ * the cluster till both reduces are completed.</li>
|
|
|
+ * </ul>
|
|
|
+ * </li>
|
|
|
+ * </ol>
|
|
|
+ * @throws IOException
|
|
|
+ */
|
|
|
+ public void testHighRamJobWithSpeculativeExecution() throws IOException {
|
|
|
+ // 2 map and 2 reduce slots
|
|
|
+ taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2);
|
|
|
+
|
|
|
+ //task tracker memory configurations.
|
|
|
+ TaskTrackerStatus.ResourceStatus ttStatus =
|
|
|
+ taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
|
|
|
+ ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
|
|
|
+ ttStatus.setReservedVirtualMemory(0);
|
|
|
+ ttStatus.setTotalPhysicalMemory(1536 * 1024 * 1024L);
|
|
|
+ ttStatus.setReservedPhysicalMemory(0);
|
|
|
+ ttStatus = taskTrackerManager.getTaskTracker("tt2").getResourceStatus();
|
|
|
+ ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
|
|
|
+ ttStatus.setReservedVirtualMemory(0);
|
|
|
+ ttStatus.setTotalPhysicalMemory(1536 * 1024 * 1024L);
|
|
|
+ ttStatus.setReservedPhysicalMemory(0);
|
|
|
+
|
|
|
+
|
|
|
+ taskTrackerManager.addQueues(new String[] { "default" });
|
|
|
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
|
|
|
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
|
|
|
+ resConf.setFakeQueues(queues);
|
|
|
+ scheduler.setTaskTrackerManager(taskTrackerManager);
|
|
|
+ // enabled memory-based scheduling
|
|
|
+ scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
|
|
|
+ 1 * 1024 * 1024 * 1024L);
|
|
|
+ scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
|
|
|
+ 3 * 1024 * 1024 * 1024L);
|
|
|
+ resConf.setDefaultPercentOfPmemInVmem(33.3f);
|
|
|
+ resConf.setLimitMaxPmemForTasks(1536 * 1024 * 1024L);
|
|
|
+ scheduler.setResourceManagerConf(resConf);
|
|
|
+ scheduler.start();
|
|
|
+
|
|
|
+
|
|
|
+ JobConf jConf = new JobConf();
|
|
|
+ jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L); // 2GB vmem
|
|
|
+ jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem
|
|
|
+ jConf.setNumMapTasks(1);
|
|
|
+ jConf.setNumReduceTasks(0);
|
|
|
+ jConf.setQueueName("default");
|
|
|
+ jConf.setUser("u1");
|
|
|
+ jConf.setMapSpeculativeExecution(true);
|
|
|
+ jConf.setReduceSpeculativeExecution(false);
|
|
|
+ FakeJobInProgress job1 = new FakeJobInProgress(new JobID("test", ++jobCounter),
|
|
|
+ jConf, taskTrackerManager,"u1");
|
|
|
+
|
|
|
+ //Submit a high memory job with speculative tasks.
|
|
|
+ taskTrackerManager.submitJob(job1);
|
|
|
+
|
|
|
+ jConf = new JobConf();
|
|
|
+ jConf.setMaxVirtualMemoryForTask(100 * 1024 * 1024L); // 100MB vmem
|
|
|
+ jConf.setMaxPhysicalMemoryForTask(50 * 1024 * 1024L); // 50MB pmem
|
|
|
+ jConf.setNumMapTasks(1);
|
|
|
+ jConf.setNumReduceTasks(0);
|
|
|
+ jConf.setQueueName("default");
|
|
|
+ jConf.setUser("u1");
|
|
|
+ jConf.setMapSpeculativeExecution(false);
|
|
|
+ jConf.setReduceSpeculativeExecution(false);
|
|
|
+ //Submit normal job
|
|
|
+ FakeJobInProgress job2 = submitJob(JobStatus.PREP, jConf);
|
|
|
+
|
|
|
+ controlledInitializationPoller.selectJobsToInitialize();
|
|
|
+ raiseStatusChangeEvents(scheduler.jobQueuesManager);
|
|
|
+
|
|
|
+ // first, a map from j1 will run
|
|
|
+ // at this point, there is a speculative task for the same job to be
|
|
|
+    //scheduled. This task would be scheduled. Till the tasks from job1 get
|
|
|
+ //complete none of the tasks from other jobs would be scheduled.
|
|
|
+ checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
|
|
|
+ assertEquals("pending maps greater than zero " , job1.pendingMaps(), 0);
|
|
|
+ //make same tracker get back, check if you are blocking. Your job
|
|
|
+ //has speculative map task so tracker should be blocked even tho' it
|
|
|
+ //can run job2's map.
|
|
|
+ assertNull(scheduler.assignTasks(tracker("tt1")));
|
|
|
+ //TT2 now gets speculative map of the job1
|
|
|
+ checkAssignment("tt2", "attempt_test_0001_m_000001_1 on tt2");
|
|
|
+
|
|
|
+ // Now since the first job has no more speculative maps, it can schedule
|
|
|
+ // the second job.
|
|
|
+ checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
|
|
|
+
|
|
|
+ //finish everything
|
|
|
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0",
|
|
|
+ job1);
|
|
|
+ taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000001_1",
|
|
|
+ job1);
|
|
|
+ taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0",
|
|
|
+ job2);
|
|
|
+ taskTrackerManager.finalizeJob(job1);
|
|
|
+ taskTrackerManager.finalizeJob(job2);
|
|
|
+
|
|
|
+ //Now submit high ram job with speculative reduce and check.
|
|
|
+ jConf = new JobConf();
|
|
|
+ jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L); // 2GB vmem
|
|
|
+ jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem
|
|
|
+ jConf.setNumMapTasks(1);
|
|
|
+ jConf.setNumReduceTasks(1);
|
|
|
+ jConf.setQueueName("default");
|
|
|
+ jConf.setUser("u1");
|
|
|
+ jConf.setMapSpeculativeExecution(false);
|
|
|
+ jConf.setReduceSpeculativeExecution(true);
|
|
|
+ FakeJobInProgress job3 = new FakeJobInProgress(new JobID("test", ++jobCounter),
|
|
|
+ jConf, taskTrackerManager,"u1");
|
|
|
+
|
|
|
+ //Submit a high memory job with speculative reduce tasks.
|
|
|
+ taskTrackerManager.submitJob(job3);
|
|
|
+
|
|
|
+ jConf = new JobConf();
|
|
|
+ jConf.setMaxVirtualMemoryForTask(100 * 1024 * 1024L); // 100MB vmem
|
|
|
+ jConf.setMaxPhysicalMemoryForTask(50 * 1024 * 1024L); // 50MB pmem
|
|
|
+ jConf.setNumMapTasks(1);
|
|
|
+ jConf.setNumReduceTasks(1);
|
|
|
+ jConf.setQueueName("default");
|
|
|
+ jConf.setUser("u1");
|
|
|
+ jConf.setMapSpeculativeExecution(false);
|
|
|
+ jConf.setReduceSpeculativeExecution(false);
|
|
|
+ //Submit normal job
|
|
|
+ FakeJobInProgress job4 = submitJob(JobStatus.PREP, jConf);
|
|
|
+
|
|
|
+ controlledInitializationPoller.selectJobsToInitialize();
|
|
|
+ raiseStatusChangeEvents(scheduler.jobQueuesManager);
|
|
|
+ //all maps of jobs get assigned to same task tracker as
|
|
|
+ //job does not have speculative map and same tracker sends two heart
|
|
|
+ //beat back to back.
|
|
|
+ checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
|
|
|
+ //first map slot gets attention on this tracker.
|
|
|
+ checkAssignment("tt2", "attempt_test_0004_m_000001_0 on tt2");
|
|
|
+ //now first reduce of the job3 would be scheduled on tt2 since it has
|
|
|
+ //memory.
|
|
|
+ //assigntasks() would check for free reduce slot is greater than
|
|
|
+ //map slots. Seeing there is more free reduce slot it would try scheduling
|
|
|
+ //reduce of job1 but would block as in it is a high memory task.
|
|
|
+ assertNull(scheduler.assignTasks(tracker("tt1")));
|
|
|
+ //TT2 would get the reduce task from high memory job as the tt is running
|
|
|
+ //normal jobs map. which is low mem.
|
|
|
+ checkAssignment("tt2", "attempt_test_0003_r_000001_0 on tt2");
|
|
|
+ // now if either TT comes back, it will block because all maps
|
|
|
+ // are done, and the first jobs reduce has a speculative task.
|
|
|
+ assertNull(scheduler.assignTasks(tracker("tt1")));
|
|
|
+ assertNull(scheduler.assignTasks(tracker("tt2")));
|
|
|
+ //finish maps.
|
|
|
+ taskTrackerManager.finishTask("tt1", "attempt_test_0003_m_000001_0",
|
|
|
+ job3);
|
|
|
+ taskTrackerManager.finishTask("tt2", "attempt_test_0004_m_000001_0",
|
|
|
+ job4);
|
|
|
+ //check speculative reduce code path is covered.
|
|
|
+ assertEquals("Pending reduces not zero for high " +
|
|
|
+ "ram job with speculative reduce.", 0, job3.pendingReduces());
|
|
|
+ //if tt2 returns back it is not given any task even if it can schedule
|
|
|
+ //job2 reduce.
|
|
|
+ assertNull(scheduler.assignTasks(tracker("tt2")));
|
|
|
+ //speculative reduce of the job3 would be scheduled.
|
|
|
+ checkAssignment("tt1", "attempt_test_0003_r_000001_1 on tt1");
|
|
|
+ //now both speculative and actual task have been scheduled for job3.
|
|
|
+ //Normal task of Job4 would now be scheduled on TT1 as it has free space
|
|
|
+ //to run.
|
|
|
+ checkAssignment("tt1", "attempt_test_0004_r_000001_0 on tt1");
|
|
|
+ //No more tasks.
|
|
|
+ assertNull(scheduler.assignTasks(tracker("tt2")));
|
|
|
+ assertNull(scheduler.assignTasks(tracker("tt1")));
|
|
|
+
|
|
|
+ //finish all the reduces.
|
|
|
+ taskTrackerManager.finishTask("tt1", "attempt_test_0003_r_000001_1",
|
|
|
+ job3);
|
|
|
+ taskTrackerManager.finishTask("tt2", "attempt_test_0003_r_000001_0",
|
|
|
+ job3);
|
|
|
+ //finish the job
|
|
|
+ taskTrackerManager.finalizeJob(job3);
|
|
|
+ //finish the task and the job.
|
|
|
+ taskTrackerManager.finishTask("tt1", "attempt_test_0004_r_000001_0",
|
|
|
+ job4);
|
|
|
+ taskTrackerManager.finalizeJob(job4);
|
|
|
+
|
|
|
+ }
|
|
|
|
|
|
private void checkFailedInitializedJobMovement() throws IOException {
|
|
|
|