|
@@ -843,7 +843,8 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
|
TaskLookUpStatus.NO_TASK_IN_JOB, msg);
|
|
|
}
|
|
|
|
|
|
- private List<Task> assignTasks(TaskTrackerStatus taskTracker) throws IOException {
|
|
|
+ // don't return null
|
|
|
+ private TaskLookupResult assignTasks(TaskTrackerStatus taskTracker) throws IOException {
|
|
|
Task t = null;
|
|
|
|
|
|
/*
|
|
@@ -867,7 +868,8 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
|
// Queues are sorted so that ones without capacities
|
|
|
// come towards the end. Hence, we can simply return
|
|
|
// from here without considering any further queues.
|
|
|
- return null;
|
|
|
+ return new TaskLookupResult(null, TaskLookUpStatus.NO_TASK_IN_QUEUE,
|
|
|
+ null);
|
|
|
}
|
|
|
|
|
|
TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsi);
|
|
@@ -878,10 +880,9 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
|
}
|
|
|
|
|
|
if (lookUpStatus == TaskLookUpStatus.TASK_FOUND) {
|
|
|
- t = tlr.getTask();
|
|
|
// we have a task. Update reclaimed resource info
|
|
|
updateReclaimedResources(qsi);
|
|
|
- return Collections.singletonList(t);
|
|
|
+ return tlr;
|
|
|
}
|
|
|
|
|
|
if (lookUpStatus == TaskLookUpStatus.NO_TASK_MATCHING_MEMORY_REQUIREMENTS) {
|
|
@@ -891,13 +892,14 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
|
LOG.warn(msg);
|
|
|
LOG.warn("Returning nothing to the Tasktracker "
|
|
|
+ taskTracker.trackerName);
|
|
|
- return null;
|
|
|
+ return tlr;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// nothing to give
|
|
|
- return null;
|
|
|
+ return new TaskLookupResult(null, TaskLookUpStatus.NO_TASK_IN_QUEUE,
|
|
|
+ null);
|
|
|
}
|
|
|
|
|
|
private void printQSIs() {
|
|
@@ -1307,7 +1309,7 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
|
public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
|
|
|
throws IOException {
|
|
|
|
|
|
- List<Task> tasks = null;
|
|
|
+ TaskLookupResult tlr;
|
|
|
/*
|
|
|
* If TT has Map and Reduce slot free, we need to figure out whether to
|
|
|
* give it a Map or Reduce task.
|
|
@@ -1324,22 +1326,54 @@ class CapacityTaskScheduler extends TaskScheduler {
|
|
|
int currentMapTasks = taskTracker.countMapTasks();
|
|
|
int maxReduceTasks = taskTracker.getMaxReduceTasks();
|
|
|
int currentReduceTasks = taskTracker.countReduceTasks();
|
|
|
+
|
|
|
if ((maxReduceTasks - currentReduceTasks) >
|
|
|
(maxMapTasks - currentMapTasks)) {
|
|
|
- tasks = reduceScheduler.assignTasks(taskTracker);
|
|
|
+ // get a reduce task first
|
|
|
+ tlr = reduceScheduler.assignTasks(taskTracker);
|
|
|
+ if (TaskLookUpStatus.TASK_FOUND ==
|
|
|
+ tlr.getLookUpStatus()) {
|
|
|
+ // found a task; return
|
|
|
+ return Collections.singletonList(tlr.getTask());
|
|
|
+ }
|
|
|
+ else if (TaskLookUpStatus.NO_TASK_MATCHING_MEMORY_REQUIREMENTS ==
|
|
|
+ tlr.getLookUpStatus()) {
|
|
|
+ // return no task
|
|
|
+ return null;
|
|
|
+ }
|
|
|
// if we didn't get any, look at map tasks, if TT has space
|
|
|
- if ((null == tasks) && (maxMapTasks > currentMapTasks)) {
|
|
|
- tasks = mapScheduler.assignTasks(taskTracker);
|
|
|
+ else if ((TaskLookUpStatus.NO_TASK_IN_QUEUE ==
|
|
|
+ tlr.getLookUpStatus()) && (maxMapTasks > currentMapTasks)) {
|
|
|
+ tlr = mapScheduler.assignTasks(taskTracker);
|
|
|
+ if (TaskLookUpStatus.TASK_FOUND == tlr.getLookUpStatus()) {
|
|
|
+ return Collections.singletonList(tlr.getTask());
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
else {
|
|
|
- tasks = mapScheduler.assignTasks(taskTracker);
|
|
|
- // if we didn't get any, look at red tasks, if TT has space
|
|
|
- if ((null == tasks) && (maxReduceTasks > currentReduceTasks)) {
|
|
|
- tasks = reduceScheduler.assignTasks(taskTracker);
|
|
|
+ // get a map task first
|
|
|
+ tlr = mapScheduler.assignTasks(taskTracker);
|
|
|
+ if (TaskLookUpStatus.TASK_FOUND ==
|
|
|
+ tlr.getLookUpStatus()) {
|
|
|
+ // found a task; return
|
|
|
+ return Collections.singletonList(tlr.getTask());
|
|
|
+ }
|
|
|
+ else if (TaskLookUpStatus.NO_TASK_MATCHING_MEMORY_REQUIREMENTS ==
|
|
|
+ tlr.getLookUpStatus()) {
|
|
|
+ // return no task
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ // if we didn't get any, look at reduce tasks, if TT has space
|
|
|
+ else if ((TaskLookUpStatus.NO_TASK_IN_QUEUE ==
|
|
|
+ tlr.getLookUpStatus()) && (maxReduceTasks > currentReduceTasks)) {
|
|
|
+ tlr = reduceScheduler.assignTasks(taskTracker);
|
|
|
+ if (TaskLookUpStatus.TASK_FOUND == tlr.getLookUpStatus()) {
|
|
|
+ return Collections.singletonList(tlr.getTask());
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
- return tasks;
|
|
|
+
|
|
|
+ return null;
|
|
|
}
|
|
|
|
|
|
/**
|