|
@@ -38,7 +38,7 @@ import java.util.*;
|
|
|
///////////////////////////////////////////////////////
|
|
|
class JobInProgress {
|
|
|
private static final Log LOG = LogFactory.getLog("org.apache.hadoop.mapred.JobInProgress");
|
|
|
-
|
|
|
+
|
|
|
JobProfile profile;
|
|
|
JobStatus status;
|
|
|
Path localJobFile = null;
|
|
@@ -57,8 +57,15 @@ class JobInProgress {
|
|
|
JobTracker jobtracker = null;
|
|
|
Map<String,List<TaskInProgress>> hostToMaps = new HashMap();
|
|
|
private int taskCompletionEventTracker = 0 ;
|
|
|
- List<TaskCompletionEvent> taskCompletionEvents ;
|
|
|
-
|
|
|
+ List<TaskCompletionEvent> taskCompletionEvents ;
|
|
|
+
|
|
|
+ // The no. of tasktrackers where >= conf.getMaxTaskFailuresPerTracker()
|
|
|
+ // tasks have failed
|
|
|
+ private volatile int flakyTaskTrackers = 0;
|
|
|
+ // Map of trackerHostName -> no. of task failures
|
|
|
+ private Map<String, Integer> trackerToFailuresMap =
|
|
|
+ new TreeMap<String, Integer>();
|
|
|
+
|
|
|
long startTime;
|
|
|
long finishTime;
|
|
|
|
|
@@ -102,6 +109,7 @@ class JobInProgress {
|
|
|
this.numReduceTasks = conf.getNumReduceTasks();
|
|
|
this.taskCompletionEvents = new ArrayList(
|
|
|
numMapTasks + numReduceTasks + 10);
|
|
|
+
|
|
|
JobHistory.JobInfo.logSubmitted(jobid, conf.getJobName(), conf.getUser(),
|
|
|
System.currentTimeMillis(), jobFile);
|
|
|
|
|
@@ -373,6 +381,61 @@ class JobInProgress {
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
+ private String convertTrackerNameToHostName(String trackerName) {
|
|
|
+ // Ugly!
|
|
|
+ // Convert the trackerName to it's host name
|
|
|
+ int indexOfColon = trackerName.indexOf(":");
|
|
|
+ String trackerHostName = (indexOfColon == -1) ?
|
|
|
+ trackerName :
|
|
|
+ trackerName.substring(0, indexOfColon);
|
|
|
+ return trackerHostName;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void addTrackerTaskFailure(String trackerName) {
|
|
|
+ String trackerHostName = convertTrackerNameToHostName(trackerName);
|
|
|
+
|
|
|
+ Integer trackerFailures = trackerToFailuresMap.get(trackerHostName);
|
|
|
+ if (trackerFailures == null) {
|
|
|
+ trackerFailures = new Integer(0);
|
|
|
+ }
|
|
|
+ trackerToFailuresMap.put(trackerHostName, ++trackerFailures);
|
|
|
+
|
|
|
+ // Check if this tasktracker has turned 'flaky'
|
|
|
+ if (trackerFailures.intValue() == conf.getMaxTaskFailuresPerTracker()) {
|
|
|
+ ++flakyTaskTrackers;
|
|
|
+ LOG.info("TaskTracker at '" + trackerHostName + "' turned 'flaky'");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private int getTrackerTaskFailures(String trackerName) {
|
|
|
+ String trackerHostName = convertTrackerNameToHostName(trackerName);
|
|
|
+ Integer failedTasks = trackerToFailuresMap.get(trackerHostName);
|
|
|
+ return (failedTasks != null) ? failedTasks.intValue() : 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the no. of 'flaky' tasktrackers for a given job.
|
|
|
+ *
|
|
|
+ * @return the no. of 'flaky' tasktrackers for a given job.
|
|
|
+ */
|
|
|
+ int getNoOfBlackListedTrackers() {
|
|
|
+ return flakyTaskTrackers;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Get the information on tasktrackers and no. of errors which occurred
|
|
|
+ * on them for a given job.
|
|
|
+ *
|
|
|
+ * @return the map of tasktrackers and no. of errors which occurred
|
|
|
+ * on them for a given job.
|
|
|
+ */
|
|
|
+ synchronized Map<String, Integer> getTaskTrackerErrors() {
|
|
|
+ // Clone the 'trackerToFailuresMap' and return the copy
|
|
|
+ Map<String, Integer> trackerErrors =
|
|
|
+ new TreeMap<String, Integer>(trackerToFailuresMap);
|
|
|
+ return trackerErrors;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Find a new task to run.
|
|
|
* @param tts The task tracker that is asking for a task
|
|
@@ -389,6 +452,25 @@ class JobInProgress {
|
|
|
TaskInProgress[] tasks,
|
|
|
List cachedTasks) {
|
|
|
String taskTracker = tts.getTrackerName();
|
|
|
+
|
|
|
+ //
|
|
|
+ // Check if too many tasks of this job have failed on this
|
|
|
+ // tasktracker prior to assigning it a new one.
|
|
|
+ //
|
|
|
+ int taskTrackerFailedTasks = getTrackerTaskFailures(taskTracker);
|
|
|
+ if (taskTrackerFailedTasks >= conf.getMaxTaskFailuresPerTracker()) {
|
|
|
+ String flakyTracker = convertTrackerNameToHostName(taskTracker);
|
|
|
+ if (flakyTaskTrackers < clusterSize) {
|
|
|
+ LOG.debug("Ignoring the black-listed tasktracker: '" + flakyTracker
|
|
|
+ + "' for assigning a new task");
|
|
|
+ return -1;
|
|
|
+ } else {
|
|
|
+ LOG.warn("Trying to assign a new task for black-listed tracker " +
|
|
|
+ flakyTracker + " since all task-trackers in the cluster are " +
|
|
|
+ "'flaky' !");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
//
|
|
|
// See if there is a split over a block that is stored on
|
|
|
// the TaskTracker checking in. That means the block
|
|
@@ -647,6 +729,11 @@ class JobInProgress {
|
|
|
failedReduceTasks++;
|
|
|
}
|
|
|
|
|
|
+ //
|
|
|
+ // Note down that a task has failed on this tasktracker
|
|
|
+ //
|
|
|
+ addTrackerTaskFailure(trackerName);
|
|
|
+
|
|
|
//
|
|
|
// Let the JobTracker know that this task has failed
|
|
|
//
|