@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -31,6 +32,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.Vector;
@@ -79,8 +81,8 @@ public class JobInProgress {
   /**
    * Used when a kill is issued to a job which is initializing.
    */
+  @SuppressWarnings("serial")
   static class KillInterruptedException extends InterruptedException {
-    private static final long serialVersionUID = 1L;
     public KillInterruptedException(String msg) {
       super(msg);
     }
@@ -122,8 +124,8 @@ public class JobInProgress {
   int speculativeMapTasks = 0;
   int speculativeReduceTasks = 0;

-  int mapFailuresPercent = 0;
-  int reduceFailuresPercent = 0;
+  final int mapFailuresPercent;
+  final int reduceFailuresPercent;
   int failedMapTIPs = 0;
   int failedReduceTIPs = 0;
   private volatile boolean launchedCleanup = false;
@@ -142,14 +144,17 @@ public class JobInProgress {
   // Map of NetworkTopology Node to set of running TIPs
   Map<Node, Set<TaskInProgress>> runningMapCache;

-  // A list of non-local non-running maps
-  List<TaskInProgress> nonLocalMaps;
+  // A list of non-local, non-running maps
+  final List<TaskInProgress> nonLocalMaps;
+
+  // Set of failed, non-running maps sorted by #failures
+  final SortedSet<TaskInProgress> failedMaps;

   // A set of non-local running maps
   Set<TaskInProgress> nonLocalRunningMaps;

-  // A list of non-running reduce TIPs
-  List<TaskInProgress> nonRunningReduces;
+  // A set of non-running reduce TIPs
+  Set<TaskInProgress> nonRunningReduces;

   // A set of running reduce TIPs
   Set<TaskInProgress> runningReduces;
@@ -160,6 +165,21 @@ public class JobInProgress {
   // A list of cleanup tasks for the reduce task attempts, to be launched
   List<TaskAttemptID> reduceCleanupTasks = new LinkedList<TaskAttemptID>();

+  // keep failedMaps, nonRunningReduces ordered by failure count to bias
+  // scheduling toward failing tasks
+  private static final Comparator<TaskInProgress> failComparator =
+    new Comparator<TaskInProgress>() {
+      @Override
+      public int compare(TaskInProgress t1, TaskInProgress t2) {
+        int failures = t2.numTaskFailures() - t1.numTaskFailures();
+        // break ties on TIP id: a TreeSet drops "equal" elements, so two
+        // distinct TIPs must never compare as 0
+        return (failures == 0)
+            ? (t1.getTIPId().getId() - t2.getTIPId().getId())
+            : failures;
+      }
+    };
+
   private final int maxLevel;

   /**
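Note on failComparator: TreeSet treats compare() == 0 as equality, so the
comparator must never return 0 for two distinct TIPs. Without the id
tie-break above, every TIP with the same failure count would collapse into
one entry (a freshly built nonRunningReduces, where all reduces start at
zero failures, would keep a single element). A self-contained sketch of the
ordering, using a hypothetical Tip stand-in for TaskInProgress:

    import java.util.Comparator;
    import java.util.TreeSet;

    class Tip {
      final int id;        // unique within the job, like TIP ids
      final int failures;  // stands in for numTaskFailures()
      Tip(int id, int failures) { this.id = id; this.failures = failures; }
    }

    public class FailOrderDemo {
      static final Comparator<Tip> FAIL_FIRST = new Comparator<Tip>() {
        public int compare(Tip t1, Tip t2) {
          int failures = t2.failures - t1.failures;  // most failures first
          return (failures == 0) ? (t1.id - t2.id) : failures;
        }
      };

      public static void main(String[] args) {
        TreeSet<Tip> tips = new TreeSet<Tip>(FAIL_FIRST);
        tips.add(new Tip(0, 0));
        tips.add(new Tip(1, 0));  // survives only thanks to the tie-break
        tips.add(new Tip(2, 3));
        for (Tip t : tips) {      // prints 2, then 0, then 1
          System.out.println(t.id + ": " + t.failures + " failures");
        }
      }
    }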
@@ -223,7 +238,7 @@ public class JobInProgress {
   private final int restartCount;

   private JobConf conf;
-  AtomicBoolean tasksInited = new AtomicBoolean(false);
+  volatile boolean tasksInited = false;
   private JobInitKillStatus jobInitKillStatus = new JobInitKillStatus();

   private LocalFileSystem localFs;
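Note: the AtomicBoolean can become a plain volatile boolean because the flag
only ever transitions inside synchronized methods (initTasks() holds the
JobInProgress lock), so no compare-and-set is needed; unsynchronized readers
only need visibility. A minimal sketch of the pattern, with hypothetical
names:

    class InitFlag {
      private volatile boolean inited = false;  // was AtomicBoolean

      synchronized void init() {
        if (inited) {
          return;          // writers are already serialized by the lock
        }
        // ... expensive one-time setup ...
        inited = true;     // volatile write publishes the setup above
      }

      boolean isInited() {
        return inited;     // lock-free read, sees the latest write
      }
    }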
@@ -315,9 +330,10 @@ public class JobInProgress {
     hasSpeculativeMaps = conf.getMapSpeculativeExecution();
     hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
     this.nonLocalMaps = new LinkedList<TaskInProgress>();
+    this.failedMaps = new TreeSet<TaskInProgress>(failComparator);
     this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
     this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
-    this.nonRunningReduces = new LinkedList<TaskInProgress>();
+    this.nonRunningReduces = new TreeSet<TaskInProgress>(failComparator);
     this.runningReduces = new LinkedHashSet<TaskInProgress>();
     this.resourceEstimator = new ResourceEstimator(this);
     this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
@@ -327,6 +343,8 @@ public class JobInProgress {
     this.memoryPerMap = conf.getMemoryForMapTask();
     this.memoryPerReduce = conf.getMemoryForReduceTask();
     this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
+    this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
+    this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();

     this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
                                 (numMapTasks + numReduceTasks + 10);
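Note: making the failure-percent fields final fixes the thresholds at
job-construction time. They bound how many failed TIPs a job tolerates
before being killed; a sketch of that check (variable names mirror the
patch, the integer-arithmetic formula is an illustrative assumption):

    // Can this job absorb the observed map TIP failures? Comparing
    // failed * 100 against percent * total avoids float rounding.
    static boolean withinFailureBudget(int failedMapTIPs, int numMapTasks,
                                       int mapFailuresPercent) {
      return failedMapTIPs * 100 <= mapFailuresPercent * numMapTasks;
    }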
@@ -427,9 +445,10 @@ public class JobInProgress {
     this.maxLevel = jobtracker.getNumTaskCacheLevels();
     this.anyCacheLevel = this.maxLevel+1;
     this.nonLocalMaps = new LinkedList<TaskInProgress>();
+    this.failedMaps = new TreeSet<TaskInProgress>(failComparator);
     this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
     this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
-    this.nonRunningReduces = new LinkedList<TaskInProgress>();
+    this.nonRunningReduces = new TreeSet<TaskInProgress>(failComparator);
     this.runningReduces = new LinkedHashSet<TaskInProgress>();
     this.resourceEstimator = new ResourceEstimator(this);
     this.reduce_input_limit = conf.getLong("mapreduce.reduce.input.limit",
@@ -541,7 +560,7 @@ public class JobInProgress {
    * <code>false</code> otherwise
    */
   public boolean inited() {
-    return tasksInited.get();
+    return tasksInited;
   }

   /**
@@ -612,7 +631,7 @@ public class JobInProgress {
    */
   public synchronized void initTasks()
   throws IOException, KillInterruptedException {
-    if (tasksInited.get() || isComplete()) {
+    if (tasksInited || isComplete()) {
       return;
     }
     synchronized(jobInitKillStatus){
@@ -740,7 +759,7 @@ public class JobInProgress {
       }
     }

-    tasksInited.set(true);
+    tasksInited = true;
     JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime,
                                  numMapTasks, numReduceTasks);

@@ -748,7 +767,7 @@ public class JobInProgress {
     LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks
              + " map tasks and " + numReduceTasks + " reduce tasks.");
   }
-
+
   TaskSplitMetaInfo[] createSplits(org.apache.hadoop.mapreduce.JobID jobId)
   throws IOException {
     TaskSplitMetaInfo[] allTaskSplitMetaInfo =
@@ -1267,7 +1286,7 @@ public class JobInProgress {
   public Task obtainTaskCleanupTask(TaskTrackerStatus tts,
                                     boolean isMapSlot)
   throws IOException {
-    if (!tasksInited.get()) {
+    if (!tasksInited) {
       return null;
     }
     synchronized (this) {
@@ -1303,7 +1322,7 @@ public class JobInProgress {
                                           int clusterSize,
                                           int numUniqueHosts)
   throws IOException {
-    if (!tasksInited.get()) {
+    if (!tasksInited) {
       LOG.info("Cannot create task split for " + profile.getJobID());
       try { throw new IOException("state = " + status.getRunState()); }
       catch (IOException ioe) {ioe.printStackTrace();}
@@ -1329,7 +1348,7 @@ public class JobInProgress {
                                              int clusterSize,
                                              int numUniqueHosts)
   throws IOException {
-    if (!tasksInited.get()) {
+    if (!tasksInited) {
       LOG.info("Cannot create task split for " + profile.getJobID());
       try { throw new IOException("state = " + status.getRunState()); }
       catch (IOException ioe) {ioe.printStackTrace();}
@@ -1370,13 +1389,12 @@ public class JobInProgress {

   /**
    * Check if we can schedule an off-switch task for this job.
+   * We check the number of missed opportunities for the job.
+   * If it has 'waited' long enough we go ahead and schedule.
+   *
    * @param numTaskTrackers number of tasktrackers
-   *
-   * We check the number of missed opportunities for the job.
-   * If it has 'waited' long enough we go ahead and schedule.
-   *
    * @return <code>true</code> if we can schedule off-switch,
    *         <code>false</code> otherwise
    */
   public boolean scheduleOffSwitch(int numTaskTrackers) {
     long missedTaskTrackers = getNumSchedulingOpportunities();
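Note: the 'waited long enough' test weighs the scheduling opportunities the
job has passed up against the TaskTrackers available; an illustrative
sketch (this proportionality rule is an assumption, not the actual
JobInProgress arithmetic):

    // Allow an off-switch (non-local) assignment only after the job has
    // been offered, and declined, roughly one slot per live TaskTracker.
    static boolean offSwitchAllowed(long missedTaskTrackers,
                                    int numTaskTrackers) {
      return missedTaskTrackers >= numTaskTrackers;
    }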
@@ -1395,7 +1413,7 @@ public class JobInProgress {
                                            int numUniqueHosts,
                                            boolean isMapSlot
                                           ) throws IOException {
-    if(!tasksInited.get()) {
+    if(!tasksInited) {
       return null;
     }

@@ -1487,7 +1505,7 @@ public class JobInProgress {
                                               int numUniqueHosts,
                                               boolean isMapSlot
                                              ) throws IOException {
-    if(!tasksInited.get()) {
+    if(!tasksInited) {
       return null;
     }
@@ -1537,7 +1555,7 @@ public class JobInProgress {
    * @return true/false
    */
   private synchronized boolean canLaunchSetupTask() {
-    return (tasksInited.get() && status.getRunState() == JobStatus.PREP &&
+    return (tasksInited && status.getRunState() == JobStatus.PREP &&
             !launchedSetup && !jobKilled && !jobFailed);
   }
|
@@ -2002,37 +2020,14 @@ public class JobInProgress {
|
|
|
* @param tip the tip that needs to be failed
|
|
|
*/
|
|
|
private synchronized void failMap(TaskInProgress tip) {
|
|
|
- if (nonRunningMapCache == null) {
|
|
|
- LOG.warn("Non-running cache for maps missing!! "
|
|
|
- + "Job details are missing.");
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- // 1. Its added everywhere since other nodes (having this split local)
|
|
|
- // might have removed this tip from their local cache
|
|
|
- // 2. Give high priority to failed tip - fail early
|
|
|
-
|
|
|
- String[] splitLocations = tip.getSplitLocations();
|
|
|
-
|
|
|
- // Add the TIP in the front of the list for non-local non-running maps
|
|
|
- if (splitLocations == null || splitLocations.length == 0) {
|
|
|
- nonLocalMaps.add(0, tip);
|
|
|
+ if (failedMaps == null) {
|
|
|
+ LOG.warn("Failed cache for maps is missing! Job details are missing.");
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- for(String host: splitLocations) {
|
|
|
- Node node = jobtracker.getNode(host);
|
|
|
-
|
|
|
- for (int j = 0; j < maxLevel; ++j) {
|
|
|
- List<TaskInProgress> hostMaps = nonRunningMapCache.get(node);
|
|
|
- if (hostMaps == null) {
|
|
|
- hostMaps = new LinkedList<TaskInProgress>();
|
|
|
- nonRunningMapCache.put(node, hostMaps);
|
|
|
- }
|
|
|
- hostMaps.add(0, tip);
|
|
|
- node = node.getParent();
|
|
|
- }
|
|
|
- }
|
|
|
+ // Ignore locality for subsequent scheduling on this TIP. Always schedule
|
|
|
+ // it ahead of other tasks.
|
|
|
+ failedMaps.add(tip);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -2045,7 +2040,7 @@ public class JobInProgress {
                + "Job details are missing.");
       return;
     }
-    nonRunningReduces.add(0, tip);
+    nonRunningReduces.add(tip);
   }

   /**
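Note on both failMap() and failReduce(): a TreeSet does not re-sort itself
when an element's sort key changes, so a TIP's failure count must not change
while the TIP sits inside failedMaps or nonRunningReduces. The scheduler
removes a TIP from these sets when it is dispatched and re-adds it here only
after the failed attempt has bumped the count. A sketch of the safe pattern,
reusing Tip and FAIL_FIRST from the earlier example:

    TreeSet<Tip> failed = new TreeSet<Tip>(FAIL_FIRST);
    Tip t = new Tip(7, 1);
    failed.add(t);
    // WRONG: mutating the sort key in place would leave the set
    // mis-ordered, and contains()/remove() could miss the element.
    // RIGHT: remove, update, re-insert with the new failure count.
    failed.remove(t);
    failed.add(new Tip(7, 2));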
@@ -2189,24 +2184,33 @@ public class JobInProgress {
     }

-    // For scheduling a map task, we have two caches and a list (optional)
-    //  I)   one for non-running task
-    //  II)  one for running task (this is for handling speculation)
-    //  III) a list of TIPs that have empty locations (e.g., dummy splits),
-    //       the list is empty if all TIPs have associated locations
+    // When scheduling a map task:
+    //  0) Schedule a failed task without considering locality
+    //  1) Schedule non-running tasks
+    //  2) Schedule speculative tasks
+    //  3) Schedule tasks with no location information

     // First a look up is done on the non-running cache and on a miss, a look
     // up is done on the running cache. The order for lookup within the cache:
     //   1. from local node to root [bottom up]
     //   2. breadth wise for all the parent nodes at max level
-
-    // We fall to linear scan of the list (III above) if we have misses in the
+    // We fall to linear scan of the list ((3) above) if we have misses in the
     // above caches

+    // 0) Schedule the task with the most failures, unless failure was on this
+    //    machine
+    tip = findTaskFromList(failedMaps, tts, numUniqueHosts, false);
+    if (tip != null) {
+      // Add to the running list
+      scheduleMap(tip);
+      LOG.info("Choosing a failed task " + tip.getTIPId());
+      return tip.getIdWithinJob();
+    }
+
     Node node = jobtracker.getNode(tts.getHost());

     //
-    // I) Non-running TIP :
+    // 1) Non-running TIP :
     //

     // 1. check from local node to the root [bottom up cache lookup]
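Note: step 0 relies on findTaskFromList() to skip candidates that already
failed on this tracker and to remove the chosen TIP from the set. A
self-contained approximation of that filter (structure and names are
assumptions, not the Hadoop source):

    import java.util.Iterator;
    import java.util.Map;
    import java.util.Set;
    import java.util.TreeSet;

    public class PickTaskSketch {
      // Pick the first candidate in iteration order (failure-biased when
      // the set uses failComparator) that has not yet failed on this host.
      static Integer pick(TreeSet<Integer> candidates,
                          Map<Integer, Set<String>> failedHosts,
                          String trackerHost) {
        for (Iterator<Integer> it = candidates.iterator(); it.hasNext();) {
          Integer tipId = it.next();
          Set<String> bad = failedHosts.get(tipId);
          if (bad == null || !bad.contains(trackerHost)) {
            it.remove();   // dispatched: no longer non-running
            return tipId;
          }
        }
        return null;       // nothing schedulable on this tracker
      }
    }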
@@ -2216,10 +2220,9 @@ public class JobInProgress {
     Node key = node;
     int level = 0;
     // maxCacheLevel might be greater than this.maxLevel if findNewMapTask is
-    // called to schedule any task (local, rack-local, off-switch or speculative)
-    // tasks or it might be NON_LOCAL_CACHE_LEVEL (i.e. -1) if findNewMapTask is
-    // (i.e. -1) if findNewMapTask is to only schedule off-switch/speculative
-    // tasks
+    // called to schedule any task (local, rack-local, off-switch or
+    // speculative), or it might be NON_LOCAL_CACHE_LEVEL (i.e. -1) if
+    // findNewMapTask is to only schedule off-switch/speculative tasks
     int maxLevelToSchedule = Math.min(maxCacheLevel, maxLevel);
     for (level = 0;level < maxLevelToSchedule; ++level) {
       List <TaskInProgress> cacheForLevel = nonRunningMapCache.get(key);
@@ -2295,7 +2299,7 @@ public class JobInProgress {
     }

     //
-    // II) Running TIP :
+    // 2) Running TIP :
     //

     if (hasSpeculativeMaps) {
@@ -2702,7 +2706,7 @@ public class JobInProgress {
    * @param jobTerminationState job termination state
    */
   private synchronized void terminate(int jobTerminationState) {
-    if(!tasksInited.get()) {
+    if(!tasksInited) {
       //init could not be done, we just terminate directly.
       terminateJob(jobTerminationState);
       return;
@@ -3088,6 +3092,7 @@ public class JobInProgress {

     cleanUpMetrics();
     // free up the memory used by the data structures
+    this.failedMaps.clear();
     this.nonRunningMapCache = null;
     this.runningMapCache = null;
     this.nonRunningReduces = null;
@@ -3184,9 +3189,7 @@ public class JobInProgress {

     float failureRate = (float)fetchFailures / runningReduceTasks;
     // declare faulty if fetch-failures >= max-allowed-failures
-    boolean isMapFaulty = (failureRate >= MAX_ALLOWED_FETCH_FAILURES_PERCENT)
-                          ? true
-                          : false;
+    boolean isMapFaulty = failureRate >= MAX_ALLOWED_FETCH_FAILURES_PERCENT;
     if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS
         && isMapFaulty) {
       LOG.info("Too many fetch-failures for output of task: " + mapTaskId
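Note: a map is declared faulty only when both an absolute and a relative
threshold are crossed. A compact restatement of the combined condition (the
constant values are illustrative assumptions, and runningReduceTasks is
taken to be positive, as in the surrounding code):

    static boolean isMapFaulty(int fetchFailures, int runningReduceTasks) {
      final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3;        // absolute floor
      final float MAX_ALLOWED_FETCH_FAILURES_PERCENT = 0.5f; // relative cap
      float failureRate = (float) fetchFailures / runningReduceTasks;
      return fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS
          && failureRate >= MAX_ALLOWED_FETCH_FAILURES_PERCENT;
    }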