|
@@ -342,28 +342,32 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
|
private static class TaskLookupResult {
|
|
|
|
|
|
static enum LookUpStatus {
|
|
|
- TASK_FOUND,
|
|
|
+ LOCAL_TASK_FOUND,
|
|
|
NO_TASK_FOUND,
|
|
|
TASK_FAILING_MEMORY_REQUIREMENT,
|
|
|
+ OFF_SWITCH_TASK_FOUND
|
|
|
}
|
|
|
// constant TaskLookupResult objects. Should not be accessed directly.
|
|
|
private static final TaskLookupResult NoTaskLookupResult =
|
|
|
- new TaskLookupResult(null, TaskLookupResult.LookUpStatus.NO_TASK_FOUND);
|
|
|
+ new TaskLookupResult(null, null,
|
|
|
+ TaskLookupResult.LookUpStatus.NO_TASK_FOUND);
|
|
|
private static final TaskLookupResult MemFailedLookupResult =
|
|
|
- new TaskLookupResult(null,
|
|
|
+ new TaskLookupResult(null, null,
|
|
|
TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT);
|
|
|
|
|
|
private LookUpStatus lookUpStatus;
|
|
|
private Task task;
|
|
|
-
|
|
|
+ private JobInProgress job;
|
|
|
+
|
|
|
// Should not call this constructor directly; use the static factory methods.
|
|
|
- private TaskLookupResult(Task t, LookUpStatus lUStatus) {
|
|
|
+ private TaskLookupResult(Task t, JobInProgress job, LookUpStatus lUStatus) {
|
|
|
this.task = t;
|
|
|
+ this.job = job;
|
|
|
this.lookUpStatus = lUStatus;
|
|
|
}
|
|
|
|
|
|
- static TaskLookupResult getTaskFoundResult(Task t) {
|
|
|
- return new TaskLookupResult(t, LookUpStatus.TASK_FOUND);
|
|
|
+ static TaskLookupResult getTaskFoundResult(Task t, JobInProgress job) {
|
|
|
+ return new TaskLookupResult(t, job, LookUpStatus.LOCAL_TASK_FOUND);
|
|
|
}
|
|
|
static TaskLookupResult getNoTaskFoundResult() {
|
|
|
return NoTaskLookupResult;
|
|
@@ -371,12 +375,19 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
|
static TaskLookupResult getMemFailedResult() {
|
|
|
return MemFailedLookupResult;
|
|
|
}
|
|
|
-
|
|
|
+ static TaskLookupResult getOffSwitchTaskFoundResult(Task t,
|
|
|
+ JobInProgress job) {
|
|
|
+ return new TaskLookupResult(t, job, LookUpStatus.OFF_SWITCH_TASK_FOUND);
|
|
|
+ }
|
|
|
|
|
|
Task getTask() {
|
|
|
return task;
|
|
|
}
|
|
|
|
|
|
+ JobInProgress getJob() {
|
|
|
+ return job;
|
|
|
+ }
|
|
|
+
|
|
|
LookUpStatus getLookUpStatus() {
|
|
|
return lookUpStatus;
|
|
|
}
|
|
@@ -394,8 +405,11 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
|
protected CapacityTaskScheduler scheduler;
|
|
|
protected TaskType type = null;
|
|
|
|
|
|
- abstract Task obtainNewTask(TaskTrackerStatus taskTracker,
|
|
|
- JobInProgress job) throws IOException;
|
|
|
+ abstract void updateTSI(QueueSchedulingInfo qsi, String user,
|
|
|
+ int numRunningTasks, int numSlotsOccupied);
|
|
|
+
|
|
|
+ abstract TaskLookupResult obtainNewTask(TaskTrackerStatus taskTracker,
|
|
|
+ JobInProgress job, boolean assignOffSwitch) throws IOException;
|
|
|
|
|
|
int getSlotsOccupied(JobInProgress job) {
|
|
|
return (getNumReservedTaskTrackers(job) + getRunningTasks(job)) *
|
|
@@ -557,7 +571,9 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
|
* Always return a TaskLookupResult object. Don't return null.
|
|
|
*/
|
|
|
private TaskLookupResult getTaskFromQueue(TaskTracker taskTracker,
|
|
|
- QueueSchedulingInfo qsi)
|
|
|
+ int availableSlots,
|
|
|
+ QueueSchedulingInfo qsi,
|
|
|
+ boolean assignOffSwitch)
|
|
|
throws IOException {
|
|
|
TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
|
|
|
// we only look at jobs in the running queues, as these are the ones
|
|
@@ -583,13 +599,18 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
|
//a task to be scheduled on the task tracker.
|
|
|
//if we find a job then we pass it on.
|
|
|
if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
|
|
|
- taskTrackerStatus)) {
|
|
|
+ taskTrackerStatus,
|
|
|
+ availableSlots)) {
|
|
|
// We found a suitable job. Get task from it.
|
|
|
- Task t = obtainNewTask(taskTrackerStatus, j);
|
|
|
+ TaskLookupResult tlr =
|
|
|
+ obtainNewTask(taskTrackerStatus, j, assignOffSwitch);
|
|
|
//if there is a task return it immediately.
|
|
|
- if (t != null) {
|
|
|
+ if (tlr.getLookUpStatus() ==
|
|
|
+ TaskLookupResult.LookUpStatus.LOCAL_TASK_FOUND ||
|
|
|
+ tlr.getLookUpStatus() ==
|
|
|
+ TaskLookupResult.LookUpStatus.OFF_SWITCH_TASK_FOUND) {
|
|
|
// we're successful in getting a task
|
|
|
- return TaskLookupResult.getTaskFoundResult(t);
|
|
|
+ return tlr;
|
|
|
} else {
|
|
|
//skip to the next job in the queue.
|
|
|
if (LOG.isDebugEnabled()) {
|
|
@@ -648,13 +669,17 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
|
}
|
|
|
|
|
|
if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
|
|
|
- taskTrackerStatus)) {
|
|
|
+ taskTrackerStatus, availableSlots)) {
|
|
|
// We found a suitable job. Get task from it.
|
|
|
- Task t = obtainNewTask(taskTrackerStatus, j);
|
|
|
- //if there is a task return it immediately.
|
|
|
- if (t != null) {
|
|
|
+ TaskLookupResult tlr =
|
|
|
+ obtainNewTask(taskTrackerStatus, j, assignOffSwitch);
|
|
|
+
|
|
|
+ if (tlr.getLookUpStatus() ==
|
|
|
+ TaskLookupResult.LookUpStatus.LOCAL_TASK_FOUND ||
|
|
|
+ tlr.getLookUpStatus() ==
|
|
|
+ TaskLookupResult.LookUpStatus.OFF_SWITCH_TASK_FOUND) {
|
|
|
// we're successful in getting a task
|
|
|
- return TaskLookupResult.getTaskFoundResult(t);
|
|
|
+ return tlr;
|
|
|
} else {
|
|
|
//skip to the next job in the queue.
|
|
|
continue;
|
|
@@ -682,7 +707,9 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
|
// Always return a TaskLookupResult object. Don't return null.
|
|
|
// The caller is responsible for ensuring that the QSI objects and the
|
|
|
// collections are up-to-date.
|
|
|
- private TaskLookupResult assignTasks(TaskTracker taskTracker)
|
|
|
+ private TaskLookupResult assignTasks(TaskTracker taskTracker,
|
|
|
+ int availableSlots,
|
|
|
+ boolean assignOffSwitch)
|
|
|
throws IOException {
|
|
|
TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
|
|
|
|
|
@@ -691,7 +718,6 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
|
// Check if this tasktracker has been reserved for a job...
|
|
|
JobInProgress job = taskTracker.getJobForFallowSlot(type);
|
|
|
if (job != null) {
|
|
|
- int availableSlots = taskTracker.getAvailableSlots(type);
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug(job.getJobID() + ": Checking 'reserved' tasktracker " +
|
|
|
taskTracker.getTrackerName() + " with " + availableSlots +
|
|
@@ -703,17 +729,11 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
|
taskTracker.unreserveSlots(type, job);
|
|
|
|
|
|
// We found a suitable job. Get task from it.
|
|
|
- Task t = obtainNewTask(taskTrackerStatus, job);
|
|
|
- //if there is a task return it immediately.
|
|
|
- if (t != null) {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.info(job.getJobID() + ": Got " + t.getTaskID() +
|
|
|
- " for reserved tasktracker " +
|
|
|
- taskTracker.getTrackerName());
|
|
|
- }
|
|
|
- // we're successful in getting a task
|
|
|
- return TaskLookupResult.getTaskFoundResult(t);
|
|
|
- }
|
|
|
+ if (type == TaskType.MAP) {
|
|
|
+ // Don't care about locality!
|
|
|
+ job.overrideSchedulingOpportunities();
|
|
|
+ }
|
|
|
+ return obtainNewTask(taskTrackerStatus, job, true);
|
|
|
} else {
|
|
|
// Re-reserve the current tasktracker
|
|
|
taskTracker.reserveSlots(type, job, availableSlots);
|
|
@@ -740,7 +760,8 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
|
if(this.areTasksInQueueOverMaxCapacity(qsi,1)) {
|
|
|
continue;
|
|
|
}
|
|
|
- TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsi);
|
|
|
+ TaskLookupResult tlr =
|
|
|
+ getTaskFromQueue(taskTracker, availableSlots, qsi, assignOffSwitch);
|
|
|
TaskLookupResult.LookUpStatus lookUpStatus = tlr.getLookUpStatus();
|
|
|
|
|
|
if (lookUpStatus == TaskLookupResult.LookUpStatus.NO_TASK_FOUND) {
|
|
@@ -748,7 +769,8 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
|
}
|
|
|
|
|
|
// if we find a task, return
|
|
|
- if (lookUpStatus == TaskLookupResult.LookUpStatus.TASK_FOUND) {
|
|
|
+ if (lookUpStatus == TaskLookupResult.LookUpStatus.LOCAL_TASK_FOUND ||
|
|
|
+ lookUpStatus == TaskLookupResult.LookUpStatus.OFF_SWITCH_TASK_FOUND) {
|
|
|
return tlr;
|
|
|
}
|
|
|
// if there was a memory mismatch, return
|
|
@@ -817,6 +839,22 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
|
}
|
|
|
LOG.debug(s);
|
|
|
}
|
|
|
+
|
|
|
+ StringBuffer s = new StringBuffer();
|
|
|
+ for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
|
|
|
+ TaskSchedulingInfo tsi = getTSI(qsi);
|
|
|
+ Collection<JobInProgress> runJobs =
|
|
|
+ scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName);
|
|
|
+ s.append(
|
|
|
+ String.format(
|
|
|
+ " Queue '%s'(%s): runningTasks=%d, "
|
|
|
+ + "occupiedSlots=%d, capacity=%d, runJobs=%d maxCapacity=%d ",
|
|
|
+ qsi.queueName,
|
|
|
+ this.type, Integer.valueOf(tsi.numRunningTasks), Integer
|
|
|
+ .valueOf(tsi.numSlotsOccupied), Integer
|
|
|
+ .valueOf(tsi.getCapacity()), Integer.valueOf(runJobs.size()),
|
|
|
+ Integer.valueOf(tsi.getMaxCapacity())));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -854,13 +892,47 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job)
|
|
|
+ void updateTSI(QueueSchedulingInfo qsi, String user,
|
|
|
+ int numRunningTasks, int numSlotsOccupied) {
|
|
|
+ qsi.mapTSI.numRunningTasks += numRunningTasks;
|
|
|
+ qsi.mapTSI.numSlotsOccupied += numSlotsOccupied;
|
|
|
+ Integer i = qsi.mapTSI.numSlotsOccupiedByUser.get(user);
|
|
|
+ int slots = numSlotsOccupied + ((i == null) ? 0 : i.intValue());
|
|
|
+ qsi.mapTSI.numSlotsOccupiedByUser.put(user, slots);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ TaskLookupResult obtainNewTask(TaskTrackerStatus taskTracker,
|
|
|
+ JobInProgress job, boolean assignOffSwitch)
|
|
|
throws IOException {
|
|
|
ClusterStatus clusterStatus =
|
|
|
scheduler.taskTrackerManager.getClusterStatus();
|
|
|
int numTaskTrackers = clusterStatus.getTaskTrackers();
|
|
|
- return job.obtainNewMapTask(taskTracker, numTaskTrackers,
|
|
|
- scheduler.taskTrackerManager.getNumberOfUniqueHosts());
|
|
|
+ int numUniqueHosts = scheduler.taskTrackerManager.getNumberOfUniqueHosts();
|
|
|
+
|
|
|
+ // Inform the job it is about to get a scheduling opportunity
|
|
|
+ job.schedulingOpportunity();
|
|
|
+
|
|
|
+ // First, try to get a 'local' task
|
|
|
+ Task t =
|
|
|
+ job.obtainNewLocalMapTask(taskTracker, numTaskTrackers, numUniqueHosts);
|
|
|
+
|
|
|
+ if (t != null) {
|
|
|
+ return TaskLookupResult.getTaskFoundResult(t, job);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Next, try to get an 'off-switch' task if appropriate
|
|
|
+ // Do not bother as much about locality for High-RAM jobs
|
|
|
+ if (job.getNumSlotsPerMap() > 1 ||
|
|
|
+ (assignOffSwitch &&
|
|
|
+ job.scheduleOffSwitch(numTaskTrackers))) {
|
|
|
+ t =
|
|
|
+ job.obtainNewNonLocalMapTask(taskTracker, numTaskTrackers, numUniqueHosts);
|
|
|
+ }
|
|
|
+
|
|
|
+ return (t != null) ?
|
|
|
+ TaskLookupResult.getOffSwitchTaskFoundResult(t, job) :
|
|
|
+ TaskLookupResult.getNoTaskFoundResult();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -915,13 +987,27 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job)
|
|
|
+ void updateTSI(QueueSchedulingInfo qsi, String user,
|
|
|
+ int numRunningTasks, int numSlotsOccupied) {
|
|
|
+ qsi.reduceTSI.numRunningTasks += numRunningTasks;
|
|
|
+ qsi.reduceTSI.numSlotsOccupied += numSlotsOccupied;
|
|
|
+ Integer i = qsi.reduceTSI.numSlotsOccupiedByUser.get(user);
|
|
|
+ qsi.reduceTSI.numSlotsOccupiedByUser.put(user,
|
|
|
+ Integer.valueOf(i.intValue() + numSlotsOccupied));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ TaskLookupResult obtainNewTask(TaskTrackerStatus taskTracker,
|
|
|
+ JobInProgress job, boolean unused)
|
|
|
throws IOException {
|
|
|
ClusterStatus clusterStatus =
|
|
|
scheduler.taskTrackerManager.getClusterStatus();
|
|
|
int numTaskTrackers = clusterStatus.getTaskTrackers();
|
|
|
- return job.obtainNewReduceTask(taskTracker, numTaskTrackers,
|
|
|
+ Task t = job.obtainNewReduceTask(taskTracker, numTaskTrackers,
|
|
|
scheduler.taskTrackerManager.getNumberOfUniqueHosts());
|
|
|
+
|
|
|
+ return (t != null) ? TaskLookupResult.getTaskFoundResult(t, job) :
|
|
|
+ TaskLookupResult.getNoTaskFoundResult();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -998,7 +1084,6 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
|
private long memSizeForReduceSlotOnJT;
|
|
|
private long limitMaxMemForMapTasks;
|
|
|
private long limitMaxMemForReduceTasks;
|
|
|
- private boolean assignMultipleTasks = true;
|
|
|
|
|
|
public CapacityTaskScheduler() {
|
|
|
this(new Clock());
|
|
@@ -1244,6 +1329,8 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
|
updateQSIObjects(mapClusterCapacity, reduceClusterCapacity);
|
|
|
mapScheduler.updateCollectionOfQSIs();
|
|
|
reduceScheduler.updateCollectionOfQSIs();
|
|
|
+ mapScheduler.printQSIs();
|
|
|
+ reduceScheduler.printQSIs();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1307,23 +1394,20 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
|
int numReservedReduceSlotsForThisJob =
|
|
|
(reduceScheduler.getNumReservedTaskTrackers(j) *
|
|
|
reduceScheduler.getSlotsPerTask(j));
|
|
|
+
|
|
|
j.setSchedulingInfo(getJobQueueSchedInfo(numMapsRunningForThisJob,
|
|
|
numRunningMapSlots,
|
|
|
numReservedMapSlotsForThisJob,
|
|
|
numReducesRunningForThisJob,
|
|
|
numRunningReduceSlots,
|
|
|
numReservedReduceSlotsForThisJob));
|
|
|
- qsi.mapTSI.numRunningTasks += numMapsRunningForThisJob;
|
|
|
- qsi.reduceTSI.numRunningTasks += numReducesRunningForThisJob;
|
|
|
- qsi.mapTSI.numSlotsOccupied += numMapSlotsForThisJob;
|
|
|
- qsi.reduceTSI.numSlotsOccupied += numReduceSlotsForThisJob;
|
|
|
- Integer i =
|
|
|
- qsi.mapTSI.numSlotsOccupiedByUser.get(j.getProfile().getUser());
|
|
|
- qsi.mapTSI.numSlotsOccupiedByUser.put(j.getProfile().getUser(),
|
|
|
- Integer.valueOf(i.intValue() + numMapSlotsForThisJob));
|
|
|
- i = qsi.reduceTSI.numSlotsOccupiedByUser.get(j.getProfile().getUser());
|
|
|
- qsi.reduceTSI.numSlotsOccupiedByUser.put(j.getProfile().getUser(),
|
|
|
- Integer.valueOf(i.intValue() + numReduceSlotsForThisJob));
|
|
|
+
|
|
|
+ mapScheduler.updateTSI(qsi, j.getProfile().getUser(),
|
|
|
+ numMapsRunningForThisJob, numMapSlotsForThisJob);
|
|
|
+ reduceScheduler.updateTSI(qsi, j.getProfile().getUser(),
|
|
|
+ numReducesRunningForThisJob,
|
|
|
+ numReduceSlotsForThisJob);
|
|
|
+
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug(String.format("updateQSI: job %s: run(m)=%d, "
|
|
|
+ "occupied(m)=%d, run(r)=%d, occupied(r)=%d, finished(m)=%d,"
|
|
@@ -1372,18 +1456,6 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
|
return sb.toString();
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Sets whether the scheduler can assign multiple tasks in a heartbeat
|
|
|
- * or not.
|
|
|
- *
|
|
|
- * This method is used only for testing purposes.
|
|
|
- *
|
|
|
- * @param assignMultipleTasks true, to assign multiple tasks per heartbeat
|
|
|
- */
|
|
|
- void setAssignMultipleTasks(boolean assignMultipleTasks) {
|
|
|
- this.assignMultipleTasks = assignMultipleTasks;
|
|
|
- }
|
|
|
-
|
|
|
/*
|
|
|
* The grand plan for assigning a task.
|
|
|
*
|
|
@@ -1427,33 +1499,12 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
|
* becomes expensive, do it once every few heartbeats only.
|
|
|
*/
|
|
|
updateQSIObjects(mapClusterCapacity, reduceClusterCapacity);
|
|
|
+
|
|
|
+ // schedule tasks
|
|
|
List<Task> result = new ArrayList<Task>();
|
|
|
- if (assignMultipleTasks) {
|
|
|
- addReduceTask(taskTracker, result, maxReduceSlots, currentReduceSlots);
|
|
|
- addMapTask(taskTracker, result, maxMapSlots, currentMapSlots);
|
|
|
- } else {
|
|
|
- /*
|
|
|
- * If TT has Map and Reduce slot free, we need to figure out whether to
|
|
|
- * give it a Map or Reduce task.
|
|
|
- * Number of ways to do this. For now, base decision on how much is needed
|
|
|
- * versus how much is used (default to Map, if equal).
|
|
|
- */
|
|
|
- if ((maxReduceSlots - currentReduceSlots)
|
|
|
- > (maxMapSlots - currentMapSlots)) {
|
|
|
- addReduceTask(taskTracker, result, maxReduceSlots, currentReduceSlots);
|
|
|
- if (result.size() == 0) {
|
|
|
- addMapTask(taskTracker, result, maxMapSlots, currentMapSlots);
|
|
|
- }
|
|
|
- } else {
|
|
|
- addMapTask(taskTracker, result, maxMapSlots, currentMapSlots);
|
|
|
- if (result.size() == 0) {
|
|
|
- addReduceTask(taskTracker, result, maxReduceSlots, currentReduceSlots);
|
|
|
- }
|
|
|
- }
|
|
|
- if (result.size() == 0) {
|
|
|
- return null;
|
|
|
- }
|
|
|
- }
|
|
|
+ addMapTasks(taskTracker, result, maxMapSlots, currentMapSlots);
|
|
|
+ int numMapsAssigned = result.size();
|
|
|
+ addReduceTask(taskTracker, result, maxReduceSlots, currentReduceSlots);
|
|
|
return result;
|
|
|
}
|
|
|
|
|
@@ -1462,10 +1513,12 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
|
private void addReduceTask(TaskTracker taskTracker, List<Task> tasks,
|
|
|
int maxReduceSlots, int currentReduceSlots)
|
|
|
throws IOException {
|
|
|
- if (maxReduceSlots > currentReduceSlots) {
|
|
|
+ int availableSlots = maxReduceSlots - currentReduceSlots;
|
|
|
+ if (availableSlots > 0) {
|
|
|
reduceScheduler.updateCollectionOfQSIs();
|
|
|
- TaskLookupResult tlr = reduceScheduler.assignTasks(taskTracker);
|
|
|
- if (TaskLookupResult.LookUpStatus.TASK_FOUND == tlr.getLookUpStatus()) {
|
|
|
+ TaskLookupResult tlr =
|
|
|
+ reduceScheduler.assignTasks(taskTracker, availableSlots, true);
|
|
|
+ if (TaskLookupResult.LookUpStatus.LOCAL_TASK_FOUND == tlr.getLookUpStatus()) {
|
|
|
tasks.add(tlr.getTask());
|
|
|
}
|
|
|
}
|
|
@@ -1473,15 +1526,37 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
|
|
|
|
// Pick a map task and add to the list of tasks, if there's space
|
|
|
// on the TT to run one.
|
|
|
- private void addMapTask(TaskTracker taskTracker, List<Task> tasks,
|
|
|
+ private void addMapTasks(TaskTracker taskTracker, List<Task> tasks,
|
|
|
int maxMapSlots, int currentMapSlots)
|
|
|
throws IOException {
|
|
|
- if (maxMapSlots > currentMapSlots) {
|
|
|
+ int availableSlots = maxMapSlots - currentMapSlots;
|
|
|
+ boolean assignOffSwitch = true;
|
|
|
+ while (availableSlots > 0) {
|
|
|
mapScheduler.updateCollectionOfQSIs();
|
|
|
- TaskLookupResult tlr = mapScheduler.assignTasks(taskTracker);
|
|
|
- if (TaskLookupResult.LookUpStatus.TASK_FOUND == tlr.getLookUpStatus()) {
|
|
|
- tasks.add(tlr.getTask());
|
|
|
+ TaskLookupResult tlr = mapScheduler.assignTasks(taskTracker,
|
|
|
+ availableSlots,
|
|
|
+ assignOffSwitch);
|
|
|
+ if (TaskLookupResult.LookUpStatus.NO_TASK_FOUND ==
|
|
|
+ tlr.getLookUpStatus() ||
|
|
|
+ TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT ==
|
|
|
+ tlr.getLookUpStatus()) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
+ Task t = tlr.getTask();
|
|
|
+ JobInProgress job = tlr.getJob();
|
|
|
+
|
|
|
+ tasks.add(t);
|
|
|
+
|
|
|
+ if (TaskLookupResult.LookUpStatus.OFF_SWITCH_TASK_FOUND ==
|
|
|
+ tlr.getLookUpStatus()) {
|
|
|
+ // Atmost 1 off-switch task per-heartbeat
|
|
|
+ assignOffSwitch = false;
|
|
|
}
|
|
|
+ availableSlots -= t.getNumSlotsRequired();
|
|
|
+ mapScheduler.updateTSI(queueInfoMap.get(job.getProfile().getQueueName()),
|
|
|
+ job.getProfile().getUser(), 1,
|
|
|
+ t.getNumSlotsRequired());
|
|
|
}
|
|
|
}
|
|
|
|