@@ -20,10 +20,14 @@ package org.apache.hadoop.mapred;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.Vector;
 
@@ -77,8 +81,23 @@ class JobInProgress {
   JobTracker jobtracker = null;
 
   // NetworkTopology Node to the set of TIPs
-  Map<Node, List<TaskInProgress>> nodesToMaps;
+  Map<Node, List<TaskInProgress>> nonRunningMapCache;
+
+  // Map of NetworkTopology Node to set of running TIPs
+  Map<Node, Set<TaskInProgress>> runningMapCache;
+
+  // A list of non-local non-running maps
+  List<TaskInProgress> nonLocalMaps;
+
+  // A set of non-local running maps
+  Set<TaskInProgress> nonLocalRunningMaps;
+
+  // A list of non-running reduce TIPs
+  List<TaskInProgress> nonRunningReduces;
+
+  // A set of running reduce TIPs
+  Set<TaskInProgress> runningReduces;
+
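+  // The non-running structures hold TIPs that still need a first or repeat
+  // attempt; the running structures exist only to locate speculation
+  // candidates, and are consulted when speculative execution is enabled
+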
   private int maxLevel;
 
   private int taskCompletionEventTracker = 0;
@@ -183,6 +202,12 @@ class JobInProgress {
     this.jobMetrics.setTag("jobId", jobid);
     hasSpeculativeMaps = conf.getMapSpeculativeExecution();
     hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
+    this.maxLevel = jobtracker.getNumTaskCacheLevels();
+    this.nonLocalMaps = new LinkedList<TaskInProgress>();
+    this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
+    this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
+    this.nonRunningReduces = new LinkedList<TaskInProgress>();
+    this.runningReduces = new LinkedHashSet<TaskInProgress>();
   }
 
   /**
@@ -215,12 +240,6 @@ class JobInProgress {
     jobMetrics.remove();
   }
 
-  private Node getParentNode(Node node, int level) {
-    for (int i = 0; node != null && i < level; i++) {
-      node = node.getParent();
-    }
-    return node;
-  }
   private void printCache (Map<Node, List<TaskInProgress>> cache) {
     LOG.info("The taskcache info:");
     for (Map.Entry<Node, List<TaskInProgress>> n : cache.entrySet()) {
@@ -238,18 +257,16 @@ class JobInProgress {
       new IdentityHashMap<Node, List<TaskInProgress>>(maxLevel);
 
     for (int i = 0; i < splits.length; i++) {
-      for(String host: splits[i].getLocations()) {
+      String[] splitLocations = splits[i].getLocations();
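+      // a split with no locations can never be node- or rack-local, so it
+      // is parked in the shared non-local list instead of the topology cache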
+      if (splitLocations.length == 0) {
+        nonLocalMaps.add(maps[i]);
+        continue;
+      }
+
+      for(String host: splitLocations) {
         Node node = jobtracker.resolveAndAddToTopology(host);
-        if (node == null) {
-          continue;
-        }
-        if (node.getLevel() < maxLevel) {
-          LOG.warn("Got a host whose level is: " + node.getLevel() +
-                   ". Should get at least a level of value: " + maxLevel);
-          return null;
-        }
         for (int j = 0; j < maxLevel; j++) {
-          node = getParentNode(node, j);
+          node = JobTracker.getParentNode(node, j);
           List<TaskInProgress> hostMaps = cache.get(node);
           if (hostMaps == null) {
             hostMaps = new ArrayList<TaskInProgress>();
@@ -300,7 +317,7 @@ class JobInProgress {
                         jobtracker, conf, this, i);
     }
     if (numMapTasks > 0) {
-      nodesToMaps = createCache(splits, (maxLevel = jobtracker.getNumTaskCacheLevels()));
+      nonRunningMapCache = createCache(splits, maxLevel);
     }
 
     // if no split is returned, job is considered completed and successful
@@ -331,6 +348,7 @@ class JobInProgress {
       reduces[i] = new TaskInProgress(jobId, jobFile,
                                       numMapTasks, i,
                                       jobtracker, conf, this);
+      nonRunningReduces.add(reduces[i]);
     }
 
     // create job specific temporary directory in output path
@@ -633,8 +651,7 @@ class JobInProgress {
     }
 
-    int target = findNewTask(tts, clusterSize, status.mapProgress(),
-                             maps, nodesToMaps, hasSpeculativeMaps);
+    int target = findNewMapTask(tts, clusterSize, status.mapProgress());
     if (target == -1) {
       return null;
     }
@@ -669,8 +686,7 @@ class JobInProgress {
       return null;
     }
 
-    int target = findNewTask(tts, clusterSize, status.reduceProgress() ,
-                             reduces, null, hasSpeculativeReduces);
+    int target = findNewReduceTask(tts, clusterSize, status.reduceProgress());
     if (target == -1) {
       return null;
     }
@@ -755,156 +771,507 @@ class JobInProgress {
       new TreeMap<String, Integer>(trackerToFailuresMap);
     return trackerErrors;
   }
+
+  /**
+   * Remove a map TIP from the lists for running maps.
+   * Called when a map fails/completes (note that if a map is killed,
+   * it won't be present in the list since it was completed earlier)
+   * @param tip the tip that needs to be retired
+   */
+  private synchronized void retireMap(TaskInProgress tip) {
+    // A list of running maps is maintained only if speculation is 'ON'
+    if (hasSpeculativeMaps) {
+      if (runningMapCache == null) {
+        LOG.warn("Running cache for maps missing!! "
+                 + "Job details are missing.");
+        return;
+      }
+      String[] splitLocations = tip.getSplitLocations();
+
+      // Remove the TIP from the list for running non-local maps
+      if (splitLocations.length == 0) {
+        nonLocalRunningMaps.remove(tip);
+        return;
+      }
+
+      // Remove the TIP from the running map caches at every level
+      for(String host: splitLocations) {
+        Node node = jobtracker.getNode(host);
+
+        for (int j = 0; j < maxLevel; ++j) {
+          Set<TaskInProgress> hostMaps = runningMapCache.get(node);
+          if (hostMaps != null) {
+            hostMaps.remove(tip);
+            if (hostMaps.size() == 0) {
+              runningMapCache.remove(node);
+            }
+          }
+          node = node.getParent();
+        }
+      }
+    }
+  }
+
+  /**
+   * Remove a reduce TIP from the list of running reduces.
+   * Called when a reduce fails/completes
+   * @param tip the tip that needs to be retired
+   */
+  private synchronized void retireReduce(TaskInProgress tip) {
+    // A list of running reduces is maintained only if speculation is 'ON'
+    if (hasSpeculativeReduces) {
+      if (runningReduces == null) {
+        LOG.warn("Running list for reducers missing!! "
+                 + "Job details are missing.");
+        return;
+      }
+      runningReduces.remove(tip);
+    }
+  }
+
+  /**
+   * Adds a map tip to the list of running maps.
+   * @param tip the tip that needs to be scheduled as running
+   */
+  private synchronized void scheduleMap(TaskInProgress tip) {
-  private boolean shouldRunSpeculativeTask(long currentTime,
-                                           TaskInProgress task,
-                                           double avgProgress,
-                                           String taskTracker) {
-    return task.hasSpeculativeTask(currentTime, avgProgress) &&
-           !task.hasRunOnMachine(taskTracker);
+    // A running list is maintained only if speculation is 'ON'
+    if (hasSpeculativeMaps) {
+      if (runningMapCache == null) {
+        LOG.warn("Running cache for maps is missing!! "
+                 + "Job details are missing.");
+        return;
+      }
+      String[] splitLocations = tip.getSplitLocations();
+
+      // Add the TIP to the list of non-local running TIPs
+      if (splitLocations.length == 0) {
+        nonLocalRunningMaps.add(tip);
+        return;
+      }
+
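+      // register the tip at every level of each host's ancestry so that a
+      // lookup from any tracker on the same rack/switch can find it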
+      for(String host: splitLocations) {
+        Node node = jobtracker.getNode(host);
+
+        for (int j = 0; j < maxLevel; ++j) {
+          Set<TaskInProgress> hostMaps = runningMapCache.get(node);
+          if (hostMaps == null) {
+            // create a cache if needed
+            hostMaps = new LinkedHashSet<TaskInProgress>();
+            runningMapCache.put(node, hostMaps);
+          }
+          hostMaps.add(tip);
+          node = node.getParent();
+        }
+      }
+    }
+  }
+
+  /**
+   * Adds a reduce tip to the list of running reduces
+   * @param tip the tip that needs to be scheduled as running
+   */
+  private synchronized void scheduleReduce(TaskInProgress tip) {
+    // A list of running reduces is maintained only if speculation is 'ON'
+    if (hasSpeculativeReduces) {
+      if (runningReduces == null) {
+        LOG.warn("Running cache for reducers missing!! "
+                 + "Job details are missing.");
+        return;
+      }
+      runningReduces.add(tip);
+    }
   }
 
   /**
-   * Find a new task to run.
+   * Adds the failed TIP in the front of the list for non-running maps
+   * @param tip the tip that needs to be failed
+   */
+  private synchronized void failMap(TaskInProgress tip) {
+    if (nonRunningMapCache == null) {
+      LOG.warn("Non-running cache for maps missing!! "
+               + "Job details are missing.");
+      return;
+    }
+
+    // 1. It's added everywhere since other nodes (having this split local)
+    //    might have removed this tip from their local cache
+    // 2. Give high priority to the failed tip - fail early
+
+    String[] splitLocations = tip.getSplitLocations();
+
+    // Add the TIP at the front of the list for non-local non-running maps
+    if (splitLocations.length == 0) {
+      nonLocalMaps.add(0, tip);
+      return;
+    }
+
+    for(String host: splitLocations) {
+      Node node = jobtracker.getNode(host);
+
+      for (int j = 0; j < maxLevel; ++j) {
+        List<TaskInProgress> hostMaps = nonRunningMapCache.get(node);
+        if (hostMaps == null) {
+          hostMaps = new LinkedList<TaskInProgress>();
+          nonRunningMapCache.put(node, hostMaps);
+        }
+        hostMaps.add(0, tip);
+        node = node.getParent();
+      }
+    }
+  }
+
+  /**
+   * Adds a failed TIP in the front of the list for non-running reduces
+   * @param tip the tip that needs to be failed
+   */
+  private synchronized void failReduce(TaskInProgress tip) {
+    if (nonRunningReduces == null) {
+      LOG.warn("Failed cache for reducers missing!! "
+               + "Job details are missing.");
+      return;
+    }
+    nonRunningReduces.add(0, tip);
+  }
+
+  /**
+   * Find a non-running task in the passed list of TIPs
+   * @param tips a collection of TIPs
+   * @param taskTracker the tracker that has requested a task to run
+   * @param removeFailedTip whether to remove the failed tips
+   */
+  private synchronized TaskInProgress findTaskFromList(
+      Collection<TaskInProgress> tips, String taskTracker, boolean removeFailedTip) {
+    Iterator<TaskInProgress> iter = tips.iterator();
+    while (iter.hasNext()) {
+      TaskInProgress tip = iter.next();
+
+      // Select a tip if
+      //   1. runnable : still needs to be run and is not completed
+      //   2. ~running : no other node is running it
+      //   3. earlier attempt failed : has not failed on this host
+      //      and has failed on all the other hosts
+      // A TIP is removed from the list if
+      //   (1) this tip is scheduled
+      //   (2) the passed list is a level 0 (host) cache
+      //   (3) the TIP is non-schedulable (running, killed, complete)
+      if (tip.isRunnable() && !tip.isRunning()) {
+        // check if the tip has failed on this host, or has failed on
+        // all the nodes
+        if (!tip.hasFailedOnMachine(taskTracker) ||
+            tip.getNumberOfFailedMachines() >= clusterSize) {
+          iter.remove();
+          return tip;
+        } else if (removeFailedTip) {
+          // the case where we want to remove a failed tip from the host
+          // cache; see point (2) in the TIP removal logic above
+          iter.remove();
+        }
+      } else {
+        // see point (3) in the comment above for TIP removal logic
+        iter.remove();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Find a speculative task
+   * @param list a list of tips
+   * @param taskTracker the tracker that has requested a tip
+   * @param avgProgress the average progress for speculation
+   * @param currentTime current time in milliseconds
+   * @param shouldRemove whether to remove the tips
+   * @return a tip that can be speculated on the tracker
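+   * Note: shouldRemove is passed as false for lists that are shared
+   * across trackers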
+   */
+  private synchronized TaskInProgress findSpeculativeTask(
+      Collection<TaskInProgress> list, String taskTracker, double avgProgress,
+      long currentTime, boolean shouldRemove) {
+
+    Iterator<TaskInProgress> iter = list.iterator();
+
+    while (iter.hasNext()) {
+      TaskInProgress tip = iter.next();
+      // should never be true! (since we delete completed/failed tasks)
+      if (!tip.isRunning()) {
+        iter.remove();
+        continue;
+      }
+
+      if (!tip.hasRunOnMachine(taskTracker)) {
+        if (tip.hasSpeculativeTask(currentTime, avgProgress)) {
+          // In case of a shared list we don't remove it: a TIP that failed
+          // on this tracker can still be scheduled on some other tracker.
+          if (shouldRemove) {
+            iter.remove(); // this tracker is never going to run it again
+          }
+          return tip;
+        }
+      } else {
+        // Check if this tip can be removed from the list.
+        // If the list is shared then we should not remove.
+        if (shouldRemove) {
+          // This tracker will never speculate this tip
+          iter.remove();
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Find new map task
   * @param tts The task tracker that is asking for a task
   * @param clusterSize The number of task trackers in the cluster
   * @param avgProgress The average progress of this kind of task in this job
-   * @param tasks The list of potential tasks to try
-   * @param firstTaskToTry The first index in tasks to check
-   * @param cachedTasks A list of tasks that would like to run on this node
-   * @param hasSpeculative Should it try to find speculative tasks
   * @return the index in tasks of the selected task (or -1 for no task)
   */
-  private int findNewTask(TaskTrackerStatus tts,
-                          int clusterSize,
-                          double avgProgress,
-                          TaskInProgress[] tasks,
-                          Map<Node,List<TaskInProgress>> cachedTasks,
-                          boolean hasSpeculative) {
+  private synchronized int findNewMapTask(TaskTrackerStatus tts,
+                                          int clusterSize,
+                                          double avgProgress) {
     String taskTracker = tts.getTrackerName();
-    int specTarget = -1;
-
+    TaskInProgress tip = null;
+
     //
     // Update the last-known clusterSize
     //
     this.clusterSize = clusterSize;
 
-    Node node = jobtracker.getNode(tts.getHost());
-    //
-    // Check if too many tasks of this job have failed on this
-    // tasktracker prior to assigning it a new one.
-    //
-    int taskTrackerFailedTasks = getTrackerTaskFailures(taskTracker);
-    if ((flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) &&
-        taskTrackerFailedTasks >= conf.getMaxTaskFailuresPerTracker()) {
-      if (LOG.isDebugEnabled()) {
-        String flakyTracker = convertTrackerNameToHostName(taskTracker);
-        LOG.debug("Ignoring the black-listed tasktracker: '" + flakyTracker
-                  + "' for assigning a new task");
-      }
+    if (!shouldRunOnTaskTracker(taskTracker)) {
      return -1;
    }
-    long currentTime = System.currentTimeMillis();
-
-    //
-    // See if there is a split over a block that is stored on
-    // the TaskTracker checking in or the rack it belongs to and so on till
-    // maxLevel. That means the block
-    // doesn't have to be transmitted from another node/rack/and so on.
-    // The way the cache is updated is such that in every lookup, the TIPs
-    // which are complete is removed. Running/Failed TIPs are not removed
-    // since we want to have locality optimizations even for FAILED/SPECULATIVE
-    // tasks.
+
+    Node node = jobtracker.getNode(tts.getHost());
+
+    // For scheduling a map task, we have two caches and a list (optional):
+    //   I)   one for non-running tasks
+    //   II)  one for running tasks (this is for handling speculation)
+    //   III) a list of TIPs that have empty locations (e.g., dummy splits);
+    //        the list is empty if all TIPs have associated locations
+
+    // First a lookup is done on the non-running cache and on a miss, a
+    // lookup is done on the running cache. The order for lookup within a
+    // cache is:
+    //   1. from local node to root [bottom up]
+    //   2. breadth wise for all the parent nodes at max level
+
+    // We fall back to a linear scan of the list (III above) if we have
+    // misses in the above caches
+
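+    // (a TIP picked from the non-running cache is moved into the running
+    // cache via scheduleMap(), so later heartbeats can speculate on it)
+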
     //
-    if (cachedTasks != null && node != null) {
+    // I) Non-running TIP :
+    //
+
+    // 1. check from local node to the root [bottom up cache lookup]
+    //    i.e. if the cache is available and the host has been resolved
+    //    (node != null)
+
+    if (node != null) {
       Node key = node;
-      for (int level = 0; level < maxLevel && key != null; level++) {
-        List <TaskInProgress> cacheForLevel = cachedTasks.get(key);
+      for (int level = 0; level < maxLevel; ++level) {
+        List <TaskInProgress> cacheForLevel = nonRunningMapCache.get(key);
         if (cacheForLevel != null) {
-          Iterator<TaskInProgress> i = cacheForLevel.iterator();
-          while (i.hasNext()) {
-            TaskInProgress tip = i.next();
-            // we remove only those TIPs that are data-local (the host having
-            // the data is running the task). We don't remove TIPs that are
-            // rack-local for example since that would negatively impact
-            // the performance of speculative and failed tasks (imagine a case
-            // where we schedule one TIP rack-local and after sometime another
-            // tasktracker from the same rack is asking for a task, and the TIP
-            // in question has either failed or could be a speculative task
-            // candidate)
-            if (tip.isComplete() || level == 0) {
-              i.remove();
-            }
-            if (tip.isRunnable() &&
-                !tip.isRunning() &&
-                !tip.hasFailedOnMachine(taskTracker)) {
-              int cacheTarget = tip.getIdWithinJob();
-              if (level == 0) {
-                LOG.info("Choosing data-local task " + tip.getTIPId());
-                jobCounters.incrCounter(Counter.DATA_LOCAL_MAPS, 1);
-              } else if (level == 1){
-                LOG.info("Choosing rack-local task " + tip.getTIPId());
-                jobCounters.incrCounter(Counter.RACK_LOCAL_MAPS, 1);
-              } else {
-                LOG.info("Choosing cached task at level " + level + " " +
-                         tip.getTIPId());
-              }
-              return cacheTarget;
+          tip = findTaskFromList(cacheForLevel, taskTracker, level == 0);
+          if (tip != null) {
+            // Add to the running cache
+            scheduleMap(tip);
+
+            // remove the cache entry if it is empty
+            if (cacheForLevel.size() == 0) {
+              nonRunningMapCache.remove(key);
            }
-            if (hasSpeculative && specTarget == -1 &&
-                shouldRunSpeculativeTask(currentTime, tip, avgProgress,
-                                         taskTracker)) {
-              specTarget = tip.getIdWithinJob();
+
+            if (level == 0) {
+              LOG.info("Choosing data-local task " + tip.getTIPId());
+              jobCounters.incrCounter(Counter.DATA_LOCAL_MAPS, 1);
+            } else if (level == 1) {
+              LOG.info("Choosing rack-local task " + tip.getTIPId());
+              jobCounters.incrCounter(Counter.RACK_LOCAL_MAPS, 1);
+            } else {
+              LOG.info("Choosing cached task at level " + level + " "
+                       + tip.getTIPId());
            }
+
+            return tip.getIdWithinJob();
          }
        }
        key = key.getParent();
      }
    }
 
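+    // A miss at every level above means no node-local or rack-local work is
+    // left for this tracker; widen the search across the other racks below
+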
+    // 2. Search breadth-wise across the parents at max level for a
+    //    non-running TIP if
+    //     - the cache exists and there is a cache miss, or
+    //     - node information for the tracker is missing (the tracker's
+    //       topology info has not been obtained yet)
+
+    // get the node's parent at max level
+    Node nodeParentAtMaxLevel = JobTracker.getParentNode(node, maxLevel - 1);
+    // collection of nodes at max level in the cache structure
+    Collection<Node> nodesAtMaxLevel = jobtracker.getNodesAtMaxLevel();
+
+    for (Node parent : nodesAtMaxLevel) {
+
+      // skip the parent that has already been scanned
+      if (parent == nodeParentAtMaxLevel) {
+        continue;
+      }
+
+      List<TaskInProgress> cache = nonRunningMapCache.get(parent);
+      if (cache != null) {
+        tip = findTaskFromList(cache, taskTracker, false);
+        if (tip != null) {
+          // Add to the running cache
+          scheduleMap(tip);
+
+          // remove the cache entry if it is empty
+          if (cache.size() == 0) {
+            nonRunningMapCache.remove(parent);
+          }
+          LOG.info("Choosing a non-local task " + tip.getTIPId());
+          return tip.getIdWithinJob();
+        }
+      }
+    }
+
+    // 3. Search the non-local tips for a new task
+    tip = findTaskFromList(nonLocalMaps, taskTracker, false);
+    if (tip != null) {
+      // Add to the running list
+      scheduleMap(tip);
+
+      LOG.info("Choosing a non-local task " + tip.getTIPId());
+      return tip.getIdWithinJob();
+    }
+
     //
-    // If there's no cached target, see if there's
-    // a std. task to run.
-    //
-    int failedTarget = -1;
-    for (int i = 0; i < tasks.length; i++) {
-      TaskInProgress task = tasks[i];
-      if (task.isRunnable()) {
-        // if it failed here and we haven't tried every machine, we
-        // don't schedule it here.
-        boolean hasFailed = task.hasFailedOnMachine(taskTracker);
-        if (hasFailed && (task.getNumberOfFailedMachines() < clusterSize)) {
+    // II) Running TIP :
+    //
+
+    if (hasSpeculativeMaps) {
+      long currentTime = System.currentTimeMillis();
+
+      // 1. Check bottom-up for speculative tasks from the running cache
+      if (node != null) {
+        Node key = node;
+        for (int level = 0; level < maxLevel; ++level) {
+          Set<TaskInProgress> cacheForLevel = runningMapCache.get(key);
+          if (cacheForLevel != null) {
+            tip = findSpeculativeTask(cacheForLevel, taskTracker,
+                                      avgProgress, currentTime, level == 0);
+            if (tip != null) {
+              if (cacheForLevel.size() == 0) {
+                runningMapCache.remove(key);
+              }
+              if (level == 0) {
+                LOG.info("Choosing a data-local task " + tip.getTIPId()
+                         + " for speculation");
+              } else if (level == 1) {
+                LOG.info("Choosing a rack-local task " + tip.getTIPId()
+                         + " for speculation");
+              } else {
+                LOG.info("Choosing a cached task at level " + level + " "
+                         + tip.getTIPId() + " for speculation");
+              }
+              return tip.getIdWithinJob();
+            }
+          }
+          key = key.getParent();
+        }
+      }
+
+      // 2. Check breadth-wise for speculative tasks
+
+      for (Node parent : nodesAtMaxLevel) {
+        // ignore the parent that has already been scanned
+        if (parent == nodeParentAtMaxLevel) {
           continue;
         }
-        boolean isRunning = task.isRunning();
-        if (hasFailed) {
-          // failed tasks that aren't running can be scheduled as a last
-          // resort
-          if (!isRunning && failedTarget == -1) {
-            failedTarget = i;
-          }
-        } else {
-          if (!isRunning) {
-            LOG.info("Choosing normal task " + tasks[i].getTIPId());
-            return i;
-          } else if (hasSpeculative && specTarget == -1 &&
-                     shouldRunSpeculativeTask(currentTime, task, avgProgress,
-                                              taskTracker)) {
-            specTarget = i;
+
+        Set<TaskInProgress> cache = runningMapCache.get(parent);
+        if (cache != null) {
+          tip = findSpeculativeTask(cache, taskTracker, avgProgress,
+                                    currentTime, false);
+          if (tip != null) {
+            // remove empty cache entries
+            if (cache.size() == 0) {
+              runningMapCache.remove(parent);
+            }
+            LOG.info("Choosing a non-local task " + tip.getTIPId()
+                     + " for speculation");
+            return tip.getIdWithinJob();
           }
         }
       }
+
+      // 3. Check the non-local tips for speculation
+      tip = findSpeculativeTask(nonLocalRunningMaps, taskTracker, avgProgress,
+                                currentTime, false);
+      if (tip != null) {
+        LOG.info("Choosing a non-local task " + tip.getTIPId()
+                 + " for speculation");
+        return tip.getIdWithinJob();
+      }
     }
-    if (specTarget != -1) {
-      LOG.info("Choosing speculative task " +
-               tasks[specTarget].getTIPId());
-    } else if (failedTarget != -1) {
-      LOG.info("Choosing failed task " +
-               tasks[failedTarget].getTIPId());
-    }
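+    // neither a runnable map nor a speculation candidate matched this
+    // tracker on this heartbeat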
+    return -1;
+  }
+
+  /**
+   * Find new reduce task
+   * @param tts The task tracker that is asking for a task
+   * @param clusterSize The number of task trackers in the cluster
+   * @param avgProgress The average progress of this kind of task in this job
+   * @return the index in tasks of the selected task (or -1 for no task)
+   */
+  private synchronized int findNewReduceTask(TaskTrackerStatus tts,
+                                             int clusterSize,
+                                             double avgProgress) {
+    String taskTracker = tts.getTrackerName();
+    TaskInProgress tip = null;
-    return specTarget != -1 ? specTarget : failedTarget;
+
+    // Update the last-known clusterSize
+    this.clusterSize = clusterSize;
+
+    if (!shouldRunOnTaskTracker(taskTracker)) {
+      return -1;
+    }
+
+    // 1. check for a never-executed reduce tip
+    // reducers don't have a cache, so pass 'false' to explicitly call
+    // that out
+    tip = findTaskFromList(nonRunningReduces, taskTracker, false);
+    if (tip != null) {
+      scheduleReduce(tip);
+      return tip.getIdWithinJob();
+    }
+
+    // 2. check for a reduce tip to be speculated
+    if (hasSpeculativeReduces) {
+      tip = findSpeculativeTask(runningReduces, taskTracker, avgProgress,
+                                System.currentTimeMillis(), false);
+      if (tip != null) {
+        scheduleReduce(tip);
+        return tip.getIdWithinJob();
+      }
+    }
+
+    return -1;
+  }
+
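+  /**
+   * Check whether this job may schedule a task on the given tracker, i.e.
+   * the tracker has not been black-listed for this job because of earlier
+   * task failures.
+   */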
+  private boolean shouldRunOnTaskTracker(String taskTracker) {
+    //
+    // Check if too many tasks of this job have failed on this
+    // tasktracker prior to assigning it a new one.
+    //
+    int taskTrackerFailedTasks = getTrackerTaskFailures(taskTracker);
+    if ((flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) &&
+        taskTrackerFailedTasks >= conf.getMaxTaskFailuresPerTracker()) {
+      if (LOG.isDebugEnabled()) {
+        String flakyTracker = convertTrackerNameToHostName(taskTracker);
+        LOG.debug("Ignoring the black-listed tasktracker: '" + flakyTracker
+                  + "' for assigning a new task");
+      }
+      return false;
+    }
+    return true;
   }
 
   /**
@@ -969,10 +1336,14 @@ class JobInProgress {
        runningMapTasks -= 1;
        finishedMapTasks += 1;
        metrics.completeMap();
+        // remove the completed map from the respective running caches
+        retireMap(tip);
      } else{
        runningReduceTasks -= 1;
        finishedReduceTasks += 1;
        metrics.completeReduce();
+        // remove the completed reduce from the running-reduces set
+        retireReduce(tip);
      }
 
      //
@@ -999,23 +1370,15 @@ class JobInProgress {
    * @return
    */
   private boolean isJobComplete(TaskInProgress tip, JobTrackerMetrics metrics) {
-    boolean allDone = true;
-    for (int i = 0; i < maps.length; i++) {
-      if (!(maps[i].isComplete() || maps[i].isFailed())) {
-        allDone = false;
-        break;
-      }
-    }
+    // The job is complete if total-tips = finished-tips + failed-tips
+    boolean allDone =
+      ((finishedMapTasks + failedMapTIPs) == numMapTasks);
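+    // (both counters are maintained incrementally as TIPs complete or
+    // fail, so no scan over maps[] is needed here)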
     if (allDone) {
       if (tip.isMapTask()) {
         this.status.setMapProgress(1.0f);
       }
-      for (int i = 0; i < reduces.length; i++) {
-        if (!(reduces[i].isComplete() || reduces[i].isFailed())) {
-          allDone = false;
-          break;
-        }
-      }
+      allDone =
+        ((finishedReduceTasks + failedReduceTIPs) == numReduceTasks);
     }
 
     //
@@ -1079,7 +1442,9 @@ class JobInProgress {
                           TaskStatus status, String trackerName,
                           boolean wasRunning, boolean wasComplete,
                           JobTrackerMetrics metrics) {
-
+    // check if the TIP is already failed
+    boolean wasFailed = tip.isFailed();
+
     // Mark the taskid as FAILED or KILLED
     tip.incompleteSubTask(taskid, trackerName, this.status);
@@ -1090,8 +1455,20 @@ class JobInProgress {
    if (wasRunning && !isRunning) {
      if (tip.isMapTask()){
        runningMapTasks -= 1;
+        // remove from the running queue and put it in the non-running cache
+        // if the tip is not complete, i.e. if the tip still needs to be run
+        if (!isComplete) {
+          retireMap(tip);
+          failMap(tip);
+        }
      } else {
        runningReduceTasks -= 1;
+        // remove from the running queue and put in the failed queue if the
+        // tip is not complete
+        if (!isComplete) {
+          retireReduce(tip);
+          failReduce(tip);
+        }
      }
    }
@@ -1100,25 +1477,12 @@ class JobInProgress {
      if (tip.isMapTask()) {
        // Put the task back in the cache. This will help locality for cases
        // where we have a different TaskTracker from the same rack/switch
-        // asking for a task. Note that we don't add the TIP to the host's cache
-        // again since we don't execute a failed TIP on the same TT again,
-        // and also we bother about only those TIPs that were successful
+        // asking for a task.
+        // We bother about only those TIPs that were successful
        // earlier (wasComplete and !isComplete)
        // (since they might have been removed from the cache of other
        // racks/switches, if the input split blocks were present there too)
-        for (String host : tip.getSplitLocations()) {
-          Node node = jobtracker.getNode(host);
-          for (int level = 1; (node != null && level < maxLevel); level++) {
-            node = getParentNode(node, level);
-            if (node == null) {
-              break;
-            }
-            List<TaskInProgress> list = nodesToMaps.get(node);
-            if (list != null) {
-              list.add(tip);
-            }
-          }
-        }
+        failMap(tip);
        finishedMapTasks -= 1;
      }
    }
@@ -1176,8 +1540,9 @@ class JobInProgress {
    //
    // Check if we need to kill the job because of too many failures or
    // if the job is complete since all component tasks have completed
-    //
-    if (tip.isFailed()) {
+
+    // We do it once per TIP and only for the task that fails the TIP
+    if (!wasFailed && tip.isFailed()) {
      //
      // Allow upto 'mapFailuresPercent' of map tasks to fail or
      // 'reduceFailuresPercent' of reduce tasks to fail
@@ -1285,20 +1650,28 @@ class JobInProgress {
    }
 
    cleanUpMetrics();
+    // free up the memory used by the data structures
+    this.nonRunningMapCache = null;
+    this.runningMapCache = null;
+    this.nonRunningReduces = null;
+    this.runningReduces = null;
  }
 
  /**
   * Return the TaskInProgress that matches the tipid.
   */
  public TaskInProgress getTaskInProgress(String tipid){
-    for (int i = 0; i < maps.length; i++) {
-      if (tipid.equals(maps[i].getTIPId())){
-        return maps[i];
-      }
-    }
-    for (int i = 0; i < reduces.length; i++) {
-      if (tipid.equals(reduces[i].getTIPId())){
-        return reduces[i];
+    if (TaskInProgress.isMapId(tipid)) {
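+      // scan only the task array that can contain a TIP with this id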
+      for (int i = 0; i < maps.length; i++) {
+        if (tipid.equals(maps[i].getTIPId())){
+          return maps[i];
+        }
+      }
+    } else {
+      for (int i = 0; i < reduces.length; i++) {
+        if (tipid.equals(reduces[i].getTIPId())){
+          return reduces[i];
+        }
      }
    }
    return null;