@@ -311,167 +311,132 @@ class JobInProgress {
     /**
      * Return a MapTask, if appropriate, to run on the given tasktracker
      */
-    public Task obtainNewMapTask(String taskTracker, TaskTrackerStatus tts) {
+    public Task obtainNewMapTask(TaskTrackerStatus tts, int clusterSize) {
+      if (! tasksInited) {
+        LOG.info("Cannot create task split for " + profile.getJobId());
+        return null;
+      }
+      ArrayList mapCache = (ArrayList)hostToMaps.get(tts.getHost());
+      double avgProgress = status.mapProgress() / maps.length;
+      int target = findNewTask(tts, clusterSize, avgProgress,
+                               maps, firstMapToTry, mapCache);
+      if (target == -1) {
+        return null;
+      }
+      boolean wasRunning = maps[target].isRunning();
+      Task result = maps[target].getTaskToRun(tts.getTrackerName());
+      if (!wasRunning) {
+        runningMapTasks += 1;
+      }
+      return result;
+    }
+
+    /**
+     * Return a ReduceTask, if appropriate, to run on the given tasktracker.
+     * We don't have cache-sensitivity for reduce tasks, as they
+     * work on temporary MapRed files.
+     */
+    public Task obtainNewReduceTask(TaskTrackerStatus tts,
+                                    int clusterSize) {
       if (! tasksInited) {
         LOG.info("Cannot create task split for " + profile.getJobId());
         return null;
       }
 
-      Task t = null;
-      int cacheTarget = -1;
-      int stdTarget = -1;
-      int specTarget = -1;
-      int failedTarget = -1;
-
-      //
-      // We end up creating two tasks for the same bucket, because
-      // we call obtainNewMapTask() really fast, twice in a row.
-      // There's not enough time for the "recentTasks"
-      //
-
-      //
-      // Compute avg progress through the map tasks
-      //
-      double avgProgress = status.mapProgress() / maps.length;
-
+      double avgProgress = status.reduceProgress() / reduces.length;
+      int target = findNewTask(tts, clusterSize, avgProgress,
+                               reduces, firstReduceToTry, null);
+      if (target == -1) {
+        return null;
+      }
+      boolean wasRunning = reduces[target].isRunning();
+      Task result = reduces[target].getTaskToRun(tts.getTrackerName());
+      if (!wasRunning) {
+        runningReduceTasks += 1;
+      }
+      return result;
+    }
+
+    /**
+     * Find a new task to run.
+     * @param tts The task tracker that is asking for a task
+     * @param clusterSize The number of task trackers in the cluster
+     * @param avgProgress The average progress of this kind of task in this job
+     * @param tasks The list of potential tasks to try
+     * @param firstTaskToTry The first index in tasks to check
+     * @param cachedTasks A list of tasks that would like to run on this node
+     * @return the index in tasks of the selected task (or -1 for no task)
+     */
+    private int findNewTask(TaskTrackerStatus tts,
+                            int clusterSize,
+                            double avgProgress,
+                            TaskInProgress[] tasks,
+                            int firstTaskToTry,
+                            List cachedTasks) {
+      String taskTracker = tts.getTrackerName();
       //
       // See if there is a split over a block that is stored on
       // the TaskTracker checking in. That means the block
       // doesn't have to be transmitted from another node.
       //
-      ArrayList hostMaps = (ArrayList)hostToMaps.get(tts.getHost());
-      if (hostMaps != null) {
-        Iterator i = hostMaps.iterator();
+      if (cachedTasks != null) {
+        Iterator i = cachedTasks.iterator();
         while (i.hasNext()) {
           TaskInProgress tip = (TaskInProgress)i.next();
-          if (tip.hasTask() && !tip.hasFailedOnMachine(taskTracker)) {
-            LOG.info("Found task with local split for "+tts.getHost());
-            cacheTarget = tip.getIdWithinJob();
-            i.remove();
-            break;
+          i.remove();
+          if (tip.isRunnable() &&
+              !tip.isRunning() &&
+              !tip.hasFailedOnMachine(taskTracker)) {
+            LOG.info("Choosing cached task " + tip.getTIPId());
+            int cacheTarget = tip.getIdWithinJob();
+            return cacheTarget;
           }
         }
       }
 
+
       //
       // If there's no cached target, see if there's
       // a std. task to run.
       //
-      if (cacheTarget < 0) {
-        for (int i = 0; i < maps.length; i++) {
-          int realIdx = (i + firstMapToTry) % maps.length;
-          if (maps[realIdx].hasTask()) {
-            if (stdTarget < 0) {
-              if (maps[realIdx].hasFailedOnMachine(taskTracker)) {
-                if (failedTarget < 0) {
-                  failedTarget = realIdx;
-                }
-              } else {
-                stdTarget = realIdx;
-                break;
-              }
-            }
-          }
-        }
-      }
-
-      //
-      // If no cached-target and no std target, see if
-      // there's a speculative task to run.
-      //
-      if (cacheTarget < 0 && stdTarget < 0) {
-        for (int i = 0; i < maps.length; i++) {
-          int realIdx = (i + firstMapToTry) % maps.length;
-          if (maps[realIdx].hasSpeculativeTask(avgProgress)) {
-            if (!maps[realIdx].hasFailedOnMachine(taskTracker)) {
-              specTarget = realIdx;
-              break;
-            }
-          }
-        }
-      }
-
-      //
-      // Run whatever we found
-      //
-      if (cacheTarget >= 0) {
-        t = maps[cacheTarget].getTaskToRun(taskTracker, tts, avgProgress);
-        runningMapTasks += 1;
-      } else if (stdTarget >= 0) {
-        t = maps[stdTarget].getTaskToRun(taskTracker, tts, avgProgress);
-        runningMapTasks += 1;
-      } else if (specTarget >= 0) {
-        //should always be true, but being paranoid
-        boolean isRunning = maps[specTarget].isRunning();
-        t = maps[specTarget].getTaskToRun(taskTracker, tts, avgProgress);
-        if (!isRunning){
-          runningMapTasks += 1;
-        }
-      } else if (failedTarget >= 0) {
-        //should always be false, but being paranoid again
-        boolean isRunning = maps[failedTarget].isRunning();
-        t = maps[failedTarget].getTaskToRun(taskTracker, tts, avgProgress);
-        if (!isRunning) {
-          runningMapTasks += 1;
-        }
-      }
-      return t;
-    }
-
-    /**
-     * Return a ReduceTask, if appropriate, to run on the given tasktracker.
-     * We don't have cache-sensitivity for reduce tasks, as they
-     * work on temporary MapRed files.
-     */
-    public Task obtainNewReduceTask(String taskTracker, TaskTrackerStatus tts) {
-      if (! tasksInited) {
-        LOG.info("Cannot create task split for " + profile.getJobId());
-        return null;
-      }
-
-      Task t = null;
-      int stdTarget = -1;
-      int specTarget = -1;
       int failedTarget = -1;
-      double avgProgress = status.reduceProgress() / reduces.length;
-
-      for (int i = 0; i < reduces.length; i++) {
-        int realIdx = (i + firstReduceToTry) % reduces.length;
-        if (reduces[realIdx].hasTask()) {
-          if (reduces[realIdx].hasFailedOnMachine(taskTracker)) {
-            if (failedTarget < 0) {
-              failedTarget = realIdx;
-            }
-          } else if (stdTarget < 0) {
-            stdTarget = realIdx;
-          }
-        } else if (reduces[realIdx].hasSpeculativeTask(avgProgress)) {
-          if (specTarget < 0 &&
-              !reduces[realIdx].hasFailedOnMachine(taskTracker)) {
-            specTarget = realIdx;
-          }
-        }
-      }
-
-      if (stdTarget >= 0) {
-        t = reduces[stdTarget].getTaskToRun(taskTracker, tts, avgProgress);
-        runningReduceTasks += 1;
-      } else if (specTarget >= 0) {
-        //should be false
-        boolean isRunning = reduces[specTarget].isRunning();
-        t = reduces[specTarget].getTaskToRun(taskTracker, tts, avgProgress);
-        if (!isRunning){
-          runningReduceTasks += 1;
+      int specTarget = -1;
+      for (int i = 0; i < tasks.length; i++) {
+        int realIdx = (i + firstTaskToTry) % tasks.length;
+        TaskInProgress task = tasks[realIdx];
+        if (task.isRunnable()) {
+          // if it failed here and we haven't tried every machine, we
+          // don't schedule it here.
+          boolean hasFailed = task.hasFailedOnMachine(taskTracker);
+          if (hasFailed && (task.getNumberOfFailedMachines() < clusterSize)) {
+            continue;
          }
-      } else if (failedTarget >= 0) {
-        boolean isRunning = reduces[failedTarget].isRunning();
-        t = reduces[failedTarget].getTaskToRun(taskTracker, tts,
-                                               avgProgress);
-        if (!isRunning){
-          runningReduceTasks += 1;
+          boolean isRunning = task.isRunning();
+          if (hasFailed) {
+            // failed tasks that aren't running can be scheduled as a last
+            // resort
+            if (!isRunning && failedTarget == -1) {
+              failedTarget = realIdx;
+            }
+          } else {
+            if (!isRunning) {
+              LOG.info("Choosing normal task " + tasks[realIdx].getTIPId());
+              return realIdx;
+            } else if (specTarget == -1 &&
+                       task.hasSpeculativeTask(avgProgress)) {
+              specTarget = realIdx;
+            }
          }
+        }
+      }
+      if (specTarget != -1) {
+        LOG.info("Choosing speculative task " +
+                 tasks[specTarget].getTIPId());
+      } else if (failedTarget != -1) {
+        LOG.info("Choosing failed task " +
+                 tasks[failedTarget].getTIPId());
       }
-      return t;
+      return specTarget != -1 ? specTarget : failedTarget;
     }
 
     /**