|
@@ -536,7 +536,7 @@ class JobInProgress {
|
|
}
|
|
}
|
|
|
|
|
|
// Tell the job to fail the relevant task
|
|
// Tell the job to fail the relevant task
|
|
- failedTask(tip, status.getTaskID(), status, status.getTaskTracker(),
|
|
|
|
|
|
+ failedTask(tip, status.getTaskID(), status, ttStatus,
|
|
wasRunning, wasComplete, metrics);
|
|
wasRunning, wasComplete, metrics);
|
|
|
|
|
|
// Did the task failure lead to tip failure?
|
|
// Did the task failure lead to tip failure?
|
|
@@ -652,14 +652,16 @@ class JobInProgress {
|
|
* Return a MapTask, if appropriate, to run on the given tasktracker
|
|
* Return a MapTask, if appropriate, to run on the given tasktracker
|
|
*/
|
|
*/
|
|
public synchronized Task obtainNewMapTask(TaskTrackerStatus tts,
|
|
public synchronized Task obtainNewMapTask(TaskTrackerStatus tts,
|
|
- int clusterSize
|
|
|
|
|
|
+ int clusterSize,
|
|
|
|
+ int numUniqueHosts
|
|
) throws IOException {
|
|
) throws IOException {
|
|
if (!tasksInited) {
|
|
if (!tasksInited) {
|
|
LOG.info("Cannot create task split for " + profile.getJobID());
|
|
LOG.info("Cannot create task split for " + profile.getJobID());
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
|
|
|
|
- int target = findNewMapTask(tts, clusterSize, status.mapProgress());
|
|
|
|
|
|
+ int target = findNewMapTask(tts, clusterSize, numUniqueHosts,
|
|
|
|
+ status.mapProgress());
|
|
if (target == -1) {
|
|
if (target == -1) {
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
@@ -685,14 +687,16 @@ class JobInProgress {
|
|
* work on temporary MapRed files.
|
|
* work on temporary MapRed files.
|
|
*/
|
|
*/
|
|
public synchronized Task obtainNewReduceTask(TaskTrackerStatus tts,
|
|
public synchronized Task obtainNewReduceTask(TaskTrackerStatus tts,
|
|
- int clusterSize
|
|
|
|
|
|
+ int clusterSize,
|
|
|
|
+ int numUniqueHosts
|
|
) throws IOException {
|
|
) throws IOException {
|
|
if (!tasksInited) {
|
|
if (!tasksInited) {
|
|
LOG.info("Cannot create task split for " + profile.getJobID());
|
|
LOG.info("Cannot create task split for " + profile.getJobID());
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
|
|
|
|
- int target = findNewReduceTask(tts, clusterSize, status.reduceProgress());
|
|
|
|
|
|
+ int target = findNewReduceTask(tts, clusterSize, numUniqueHosts,
|
|
|
|
+ status.reduceProgress());
|
|
if (target == -1) {
|
|
if (target == -1) {
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
@@ -940,11 +944,14 @@ class JobInProgress {
|
|
/**
|
|
/**
|
|
* Find a non-running task in the passed list of TIPs
|
|
* Find a non-running task in the passed list of TIPs
|
|
* @param tips a collection of TIPs
|
|
* @param tips a collection of TIPs
|
|
- * @param taskTracker the tracker that has requested a task to run
|
|
|
|
|
|
+ * @param ttStatus the status of tracker that has requested a task to run
|
|
|
|
+ * @param numUniqueHosts number of unique hosts that run trask trackers
|
|
* @param removeFailedTip whether to remove the failed tips
|
|
* @param removeFailedTip whether to remove the failed tips
|
|
*/
|
|
*/
|
|
private synchronized TaskInProgress findTaskFromList(
|
|
private synchronized TaskInProgress findTaskFromList(
|
|
- Collection<TaskInProgress> tips, String taskTracker, boolean removeFailedTip) {
|
|
|
|
|
|
+ Collection<TaskInProgress> tips, TaskTrackerStatus ttStatus,
|
|
|
|
+ int numUniqueHosts,
|
|
|
|
+ boolean removeFailedTip) {
|
|
Iterator<TaskInProgress> iter = tips.iterator();
|
|
Iterator<TaskInProgress> iter = tips.iterator();
|
|
while (iter.hasNext()) {
|
|
while (iter.hasNext()) {
|
|
TaskInProgress tip = iter.next();
|
|
TaskInProgress tip = iter.next();
|
|
@@ -960,8 +967,8 @@ class JobInProgress {
|
|
// (3) when the TIP is non-schedulable (running, killed, complete)
|
|
// (3) when the TIP is non-schedulable (running, killed, complete)
|
|
if (tip.isRunnable() && !tip.isRunning()) {
|
|
if (tip.isRunnable() && !tip.isRunning()) {
|
|
// check if the tip has failed on this host
|
|
// check if the tip has failed on this host
|
|
- if (!tip.hasFailedOnMachine(taskTracker) ||
|
|
|
|
- tip.getNumberOfFailedMachines() >= clusterSize) {
|
|
|
|
|
|
+ if (!tip.hasFailedOnMachine(ttStatus.getHost()) ||
|
|
|
|
+ tip.getNumberOfFailedMachines() >= numUniqueHosts) {
|
|
// check if the tip has failed on all the nodes
|
|
// check if the tip has failed on all the nodes
|
|
iter.remove();
|
|
iter.remove();
|
|
return tip;
|
|
return tip;
|
|
@@ -988,8 +995,8 @@ class JobInProgress {
|
|
* @return a tip that can be speculated on the tracker
|
|
* @return a tip that can be speculated on the tracker
|
|
*/
|
|
*/
|
|
private synchronized TaskInProgress findSpeculativeTask(
|
|
private synchronized TaskInProgress findSpeculativeTask(
|
|
- Collection<TaskInProgress> list, String taskTracker, double avgProgress,
|
|
|
|
- long currentTime, boolean shouldRemove) {
|
|
|
|
|
|
+ Collection<TaskInProgress> list, TaskTrackerStatus ttStatus,
|
|
|
|
+ double avgProgress, long currentTime, boolean shouldRemove) {
|
|
|
|
|
|
Iterator<TaskInProgress> iter = list.iterator();
|
|
Iterator<TaskInProgress> iter = list.iterator();
|
|
|
|
|
|
@@ -1001,7 +1008,8 @@ class JobInProgress {
|
|
continue;
|
|
continue;
|
|
}
|
|
}
|
|
|
|
|
|
- if (!tip.hasRunOnMachine(taskTracker)) {
|
|
|
|
|
|
+ if (!tip.hasRunOnMachine(ttStatus.getHost(),
|
|
|
|
+ ttStatus.getTrackerName())) {
|
|
if (tip.hasSpeculativeTask(currentTime, avgProgress)) {
|
|
if (tip.hasSpeculativeTask(currentTime, avgProgress)) {
|
|
// In case of shared list we don't remove it. Since the TIP failed
|
|
// In case of shared list we don't remove it. Since the TIP failed
|
|
// on this tracker can be scheduled on some other tracker.
|
|
// on this tracker can be scheduled on some other tracker.
|
|
@@ -1026,11 +1034,13 @@ class JobInProgress {
|
|
* Find new map task
|
|
* Find new map task
|
|
* @param tts The task tracker that is asking for a task
|
|
* @param tts The task tracker that is asking for a task
|
|
* @param clusterSize The number of task trackers in the cluster
|
|
* @param clusterSize The number of task trackers in the cluster
|
|
|
|
+ * @param numUniqueHosts The number of hosts that run task trackers
|
|
* @param avgProgress The average progress of this kind of task in this job
|
|
* @param avgProgress The average progress of this kind of task in this job
|
|
* @return the index in tasks of the selected task (or -1 for no task)
|
|
* @return the index in tasks of the selected task (or -1 for no task)
|
|
*/
|
|
*/
|
|
private synchronized int findNewMapTask(TaskTrackerStatus tts,
|
|
private synchronized int findNewMapTask(TaskTrackerStatus tts,
|
|
int clusterSize,
|
|
int clusterSize,
|
|
|
|
+ int numUniqueHosts,
|
|
double avgProgress) {
|
|
double avgProgress) {
|
|
String taskTracker = tts.getTrackerName();
|
|
String taskTracker = tts.getTrackerName();
|
|
TaskInProgress tip = null;
|
|
TaskInProgress tip = null;
|
|
@@ -1073,7 +1083,8 @@ class JobInProgress {
|
|
for (int level = 0; level < maxLevel; ++level) {
|
|
for (int level = 0; level < maxLevel; ++level) {
|
|
List <TaskInProgress> cacheForLevel = nonRunningMapCache.get(key);
|
|
List <TaskInProgress> cacheForLevel = nonRunningMapCache.get(key);
|
|
if (cacheForLevel != null) {
|
|
if (cacheForLevel != null) {
|
|
- tip = findTaskFromList(cacheForLevel, taskTracker, level == 0);
|
|
|
|
|
|
+ tip = findTaskFromList(cacheForLevel, tts,
|
|
|
|
+ numUniqueHosts,level == 0);
|
|
if (tip != null) {
|
|
if (tip != null) {
|
|
// Add to running cache
|
|
// Add to running cache
|
|
scheduleMap(tip);
|
|
scheduleMap(tip);
|
|
@@ -1122,7 +1133,7 @@ class JobInProgress {
|
|
|
|
|
|
List<TaskInProgress> cache = nonRunningMapCache.get(parent);
|
|
List<TaskInProgress> cache = nonRunningMapCache.get(parent);
|
|
if (cache != null) {
|
|
if (cache != null) {
|
|
- tip = findTaskFromList(cache, taskTracker, false);
|
|
|
|
|
|
+ tip = findTaskFromList(cache, tts, numUniqueHosts, false);
|
|
if (tip != null) {
|
|
if (tip != null) {
|
|
// Add to the running cache
|
|
// Add to the running cache
|
|
scheduleMap(tip);
|
|
scheduleMap(tip);
|
|
@@ -1138,7 +1149,7 @@ class JobInProgress {
|
|
}
|
|
}
|
|
|
|
|
|
// 3. Search non-local tips for a new task
|
|
// 3. Search non-local tips for a new task
|
|
- tip = findTaskFromList(nonLocalMaps, taskTracker, false);
|
|
|
|
|
|
+ tip = findTaskFromList(nonLocalMaps, tts, numUniqueHosts, false);
|
|
if (tip != null) {
|
|
if (tip != null) {
|
|
// Add to the running list
|
|
// Add to the running list
|
|
scheduleMap(tip);
|
|
scheduleMap(tip);
|
|
@@ -1160,7 +1171,7 @@ class JobInProgress {
|
|
for (int level = 0; level < maxLevel; ++level) {
|
|
for (int level = 0; level < maxLevel; ++level) {
|
|
Set<TaskInProgress> cacheForLevel = runningMapCache.get(key);
|
|
Set<TaskInProgress> cacheForLevel = runningMapCache.get(key);
|
|
if (cacheForLevel != null) {
|
|
if (cacheForLevel != null) {
|
|
- tip = findSpeculativeTask(cacheForLevel, taskTracker,
|
|
|
|
|
|
+ tip = findSpeculativeTask(cacheForLevel, tts,
|
|
avgProgress, currentTime, level == 0);
|
|
avgProgress, currentTime, level == 0);
|
|
if (tip != null) {
|
|
if (tip != null) {
|
|
if (cacheForLevel.size() == 0) {
|
|
if (cacheForLevel.size() == 0) {
|
|
@@ -1193,7 +1204,7 @@ class JobInProgress {
|
|
|
|
|
|
Set<TaskInProgress> cache = runningMapCache.get(parent);
|
|
Set<TaskInProgress> cache = runningMapCache.get(parent);
|
|
if (cache != null) {
|
|
if (cache != null) {
|
|
- tip = findSpeculativeTask(cache, taskTracker, avgProgress,
|
|
|
|
|
|
+ tip = findSpeculativeTask(cache, tts, avgProgress,
|
|
currentTime, false);
|
|
currentTime, false);
|
|
if (tip != null) {
|
|
if (tip != null) {
|
|
// remove empty cache entries
|
|
// remove empty cache entries
|
|
@@ -1208,7 +1219,7 @@ class JobInProgress {
|
|
}
|
|
}
|
|
|
|
|
|
// 3. Check non-local tips for speculation
|
|
// 3. Check non-local tips for speculation
|
|
- tip = findSpeculativeTask(nonLocalRunningMaps, taskTracker, avgProgress,
|
|
|
|
|
|
+ tip = findSpeculativeTask(nonLocalRunningMaps, tts, avgProgress,
|
|
currentTime, false);
|
|
currentTime, false);
|
|
if (tip != null) {
|
|
if (tip != null) {
|
|
LOG.info("Choosing a non-local task " + tip.getTIPId()
|
|
LOG.info("Choosing a non-local task " + tip.getTIPId()
|
|
@@ -1223,11 +1234,13 @@ class JobInProgress {
|
|
* Find new reduce task
|
|
* Find new reduce task
|
|
* @param tts The task tracker that is asking for a task
|
|
* @param tts The task tracker that is asking for a task
|
|
* @param clusterSize The number of task trackers in the cluster
|
|
* @param clusterSize The number of task trackers in the cluster
|
|
|
|
+ * @param numUniqueHosts The number of hosts that run task trackers
|
|
* @param avgProgress The average progress of this kind of task in this job
|
|
* @param avgProgress The average progress of this kind of task in this job
|
|
* @return the index in tasks of the selected task (or -1 for no task)
|
|
* @return the index in tasks of the selected task (or -1 for no task)
|
|
*/
|
|
*/
|
|
private synchronized int findNewReduceTask(TaskTrackerStatus tts,
|
|
private synchronized int findNewReduceTask(TaskTrackerStatus tts,
|
|
int clusterSize,
|
|
int clusterSize,
|
|
|
|
+ int numUniqueHosts,
|
|
double avgProgress) {
|
|
double avgProgress) {
|
|
String taskTracker = tts.getTrackerName();
|
|
String taskTracker = tts.getTrackerName();
|
|
TaskInProgress tip = null;
|
|
TaskInProgress tip = null;
|
|
@@ -1241,7 +1254,7 @@ class JobInProgress {
|
|
|
|
|
|
// 1. check for a never-executed reduce tip
|
|
// 1. check for a never-executed reduce tip
|
|
// reducers don't have a cache and so pass -1 to explicitly call that out
|
|
// reducers don't have a cache and so pass -1 to explicitly call that out
|
|
- tip = findTaskFromList(nonRunningReduces, taskTracker, false);
|
|
|
|
|
|
+ tip = findTaskFromList(nonRunningReduces, tts, numUniqueHosts, false);
|
|
if (tip != null) {
|
|
if (tip != null) {
|
|
scheduleReduce(tip);
|
|
scheduleReduce(tip);
|
|
return tip.getIdWithinJob();
|
|
return tip.getIdWithinJob();
|
|
@@ -1249,7 +1262,7 @@ class JobInProgress {
|
|
|
|
|
|
// 2. check for a reduce tip to be speculated
|
|
// 2. check for a reduce tip to be speculated
|
|
if (hasSpeculativeReduces) {
|
|
if (hasSpeculativeReduces) {
|
|
- tip = findSpeculativeTask(runningReduces, taskTracker, avgProgress,
|
|
|
|
|
|
+ tip = findSpeculativeTask(runningReduces, tts, avgProgress,
|
|
System.currentTimeMillis(), false);
|
|
System.currentTimeMillis(), false);
|
|
if (tip != null) {
|
|
if (tip != null) {
|
|
scheduleReduce(tip);
|
|
scheduleReduce(tip);
|
|
@@ -1441,14 +1454,15 @@ class JobInProgress {
|
|
* obtain the map task's output.
|
|
* obtain the map task's output.
|
|
*/
|
|
*/
|
|
private void failedTask(TaskInProgress tip, TaskAttemptID taskid,
|
|
private void failedTask(TaskInProgress tip, TaskAttemptID taskid,
|
|
- TaskStatus status, String trackerName,
|
|
|
|
|
|
+ TaskStatus status,
|
|
|
|
+ TaskTrackerStatus taskTrackerStatus,
|
|
boolean wasRunning, boolean wasComplete,
|
|
boolean wasRunning, boolean wasComplete,
|
|
JobTrackerMetrics metrics) {
|
|
JobTrackerMetrics metrics) {
|
|
// check if the TIP is already failed
|
|
// check if the TIP is already failed
|
|
boolean wasFailed = tip.isFailed();
|
|
boolean wasFailed = tip.isFailed();
|
|
|
|
|
|
// Mark the taskid as FAILED or KILLED
|
|
// Mark the taskid as FAILED or KILLED
|
|
- tip.incompleteSubTask(taskid, trackerName, this.status);
|
|
|
|
|
|
+ tip.incompleteSubTask(taskid, taskTrackerStatus, this.status);
|
|
|
|
|
|
boolean isRunning = tip.isRunning();
|
|
boolean isRunning = tip.isRunning();
|
|
boolean isComplete = tip.isComplete();
|
|
boolean isComplete = tip.isComplete();
|
|
@@ -1490,8 +1504,8 @@ class JobInProgress {
|
|
}
|
|
}
|
|
|
|
|
|
// update job history
|
|
// update job history
|
|
- String taskTrackerName = jobtracker.getNode(jobtracker.getTaskTracker(
|
|
|
|
- status.getTaskTracker()).getHost()).toString();
|
|
|
|
|
|
+ String taskTrackerName = jobtracker.getNode(
|
|
|
|
+ taskTrackerStatus.getHost()).toString();
|
|
if (status.getIsMap()) {
|
|
if (status.getIsMap()) {
|
|
JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(),
|
|
JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(),
|
|
taskTrackerName);
|
|
taskTrackerName);
|
|
@@ -1526,7 +1540,7 @@ class JobInProgress {
|
|
// Note down that a task has failed on this tasktracker
|
|
// Note down that a task has failed on this tasktracker
|
|
//
|
|
//
|
|
if (status.getRunState() == TaskStatus.State.FAILED) {
|
|
if (status.getRunState() == TaskStatus.State.FAILED) {
|
|
- addTrackerTaskFailure(trackerName);
|
|
|
|
|
|
+ addTrackerTaskFailure(taskTrackerStatus.getTrackerName());
|
|
}
|
|
}
|
|
|
|
|
|
//
|
|
//
|