|
@@ -108,8 +108,6 @@ class JobInProgress {
|
|
|
|
|
|
private LocalFileSystem localFs;
|
|
|
private String jobId;
|
|
|
- private boolean hasSpeculativeMaps;
|
|
|
- private boolean hasSpeculativeReduces;
|
|
|
|
|
|
// Per-job counters
|
|
|
public static enum Counter {
|
|
@@ -181,8 +179,6 @@ class JobInProgress {
|
|
|
this.jobMetrics.setTag("sessionId", conf.getSessionId());
|
|
|
this.jobMetrics.setTag("jobName", conf.getJobName());
|
|
|
this.jobMetrics.setTag("jobId", jobid);
|
|
|
- hasSpeculativeMaps = conf.getMapSpeculativeExecution();
|
|
|
- hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -636,7 +632,7 @@ class JobInProgress {
|
|
|
|
|
|
|
|
|
int target = findNewTask(tts, clusterSize, status.mapProgress(),
|
|
|
- maps, nodesToMaps, hasSpeculativeMaps);
|
|
|
+ maps, nodesToMaps);
|
|
|
if (target == -1) {
|
|
|
return null;
|
|
|
}
|
|
@@ -672,7 +668,7 @@ class JobInProgress {
|
|
|
}
|
|
|
|
|
|
int target = findNewTask(tts, clusterSize, status.reduceProgress() ,
|
|
|
- reduces, null, hasSpeculativeReduces);
|
|
|
+ reduces, null);
|
|
|
if (target == -1) {
|
|
|
return null;
|
|
|
}
|
|
@@ -758,11 +754,10 @@ class JobInProgress {
|
|
|
return trackerErrors;
|
|
|
}
|
|
|
|
|
|
- private boolean shouldRunSpeculativeTask(long currentTime,
|
|
|
- TaskInProgress task,
|
|
|
- double avgProgress,
|
|
|
- String taskTracker) {
|
|
|
- return task.hasSpeculativeTask(currentTime, avgProgress) &&
|
|
|
+ private boolean shouldRunSpeculativeTask(TaskInProgress task,
|
|
|
+ double avgProgress,
|
|
|
+ String taskTracker) {
|
|
|
+ return task.hasSpeculativeTask(avgProgress) &&
|
|
|
!task.hasRunOnMachine(taskTracker);
|
|
|
}
|
|
|
|
|
@@ -774,15 +769,13 @@ class JobInProgress {
|
|
|
* @param tasks The list of potential tasks to try
|
|
|
* @param firstTaskToTry The first index in tasks to check
|
|
|
* @param cachedTasks A list of tasks that would like to run on this node
|
|
|
- * @param hasSpeculative Should it try to find speculative tasks
|
|
|
* @return the index in tasks of the selected task (or -1 for no task)
|
|
|
*/
|
|
|
private int findNewTask(TaskTrackerStatus tts,
|
|
|
int clusterSize,
|
|
|
double avgProgress,
|
|
|
TaskInProgress[] tasks,
|
|
|
- Map<Node,List<TaskInProgress>> cachedTasks,
|
|
|
- boolean hasSpeculative) {
|
|
|
+ Map<Node,List<TaskInProgress>> cachedTasks) {
|
|
|
String taskTracker = tts.getTrackerName();
|
|
|
int specTarget = -1;
|
|
|
|
|
@@ -806,7 +799,6 @@ class JobInProgress {
|
|
|
}
|
|
|
return -1;
|
|
|
}
|
|
|
- long currentTime = System.currentTimeMillis();
|
|
|
|
|
|
//
|
|
|
// See if there is a split over a block that is stored on
|
|
@@ -853,9 +845,8 @@ class JobInProgress {
|
|
|
}
|
|
|
return cacheTarget;
|
|
|
}
|
|
|
- if (hasSpeculative && specTarget == -1 &&
|
|
|
- shouldRunSpeculativeTask(currentTime, tip, avgProgress,
|
|
|
- taskTracker)) {
|
|
|
+ if (specTarget == -1 &&
|
|
|
+ shouldRunSpeculativeTask(tip, avgProgress, taskTracker)) {
|
|
|
specTarget = tip.getIdWithinJob();
|
|
|
}
|
|
|
}
|
|
@@ -890,9 +881,8 @@ class JobInProgress {
|
|
|
if (!isRunning) {
|
|
|
LOG.info("Choosing normal task " + tasks[i].getTIPId());
|
|
|
return i;
|
|
|
- } else if (hasSpeculative && specTarget == -1 &&
|
|
|
- shouldRunSpeculativeTask(currentTime, task, avgProgress,
|
|
|
- taskTracker)) {
|
|
|
+ } else if (specTarget == -1 &&
|
|
|
+ shouldRunSpeculativeTask(task, avgProgress, taskTracker)) {
|
|
|
specTarget = i;
|
|
|
}
|
|
|
}
|