|
@@ -119,6 +119,9 @@ class JobInProgress {
|
|
private Map<String, Integer> trackerToFailuresMap =
|
|
private Map<String, Integer> trackerToFailuresMap =
|
|
new TreeMap<String, Integer>();
|
|
new TreeMap<String, Integer>();
|
|
|
|
|
|
|
|
+ //Confine estimation algorithms to an "oracle" class that JIP queries.
|
|
|
|
+ private ResourceEstimator resourceEstimator;
|
|
|
|
+
|
|
long startTime;
|
|
long startTime;
|
|
long finishTime;
|
|
long finishTime;
|
|
|
|
|
|
@@ -129,6 +132,7 @@ class JobInProgress {
|
|
private JobID jobId;
|
|
private JobID jobId;
|
|
private boolean hasSpeculativeMaps;
|
|
private boolean hasSpeculativeMaps;
|
|
private boolean hasSpeculativeReduces;
|
|
private boolean hasSpeculativeReduces;
|
|
|
|
+ private long inputLength = 0;
|
|
|
|
|
|
// Per-job counters
|
|
// Per-job counters
|
|
public static enum Counter {
|
|
public static enum Counter {
|
|
@@ -220,6 +224,7 @@ class JobInProgress {
|
|
this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
|
|
this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
|
|
this.nonRunningReduces = new LinkedList<TaskInProgress>();
|
|
this.nonRunningReduces = new LinkedList<TaskInProgress>();
|
|
this.runningReduces = new LinkedHashSet<TaskInProgress>();
|
|
this.runningReduces = new LinkedHashSet<TaskInProgress>();
|
|
|
|
+ this.resourceEstimator = new ResourceEstimator(this);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -335,10 +340,12 @@ class JobInProgress {
|
|
numMapTasks = splits.length;
|
|
numMapTasks = splits.length;
|
|
maps = new TaskInProgress[numMapTasks];
|
|
maps = new TaskInProgress[numMapTasks];
|
|
for(int i=0; i < numMapTasks; ++i) {
|
|
for(int i=0; i < numMapTasks; ++i) {
|
|
|
|
+ inputLength += splits[i].getDataLength();
|
|
maps[i] = new TaskInProgress(jobId, jobFile,
|
|
maps[i] = new TaskInProgress(jobId, jobFile,
|
|
splits[i],
|
|
splits[i],
|
|
jobtracker, conf, this, i);
|
|
jobtracker, conf, this, i);
|
|
}
|
|
}
|
|
|
|
+ LOG.info("Input size for job "+ jobId + " = " + inputLength);
|
|
if (numMapTasks > 0) {
|
|
if (numMapTasks > 0) {
|
|
LOG.info("Split info for job:" + jobId);
|
|
LOG.info("Split info for job:" + jobId);
|
|
nonRunningMapCache = createCache(splits, maxLevel);
|
|
nonRunningMapCache = createCache(splits, maxLevel);
|
|
@@ -434,6 +441,10 @@ class JobInProgress {
|
|
this.priority = priority;
|
|
this.priority = priority;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ long getInputLength() {
|
|
|
|
+ return inputLength;
|
|
|
|
+ }
|
|
|
|
|
|
/**
|
|
/**
|
|
* Get the list of map tasks
|
|
* Get the list of map tasks
|
|
@@ -1076,6 +1087,17 @@ class JobInProgress {
|
|
Node node = jobtracker.getNode(tts.getHost());
|
|
Node node = jobtracker.getNode(tts.getHost());
|
|
Node nodeParentAtMaxLevel = null;
|
|
Node nodeParentAtMaxLevel = null;
|
|
|
|
|
|
|
|
+
|
|
|
|
+ long outSize = resourceEstimator.getEstimatedMapOutputSize();
|
|
|
|
+ if(tts.getAvailableSpace() < outSize) {
|
|
|
|
+ LOG.warn("No room for map task. Node " + node +
|
|
|
|
+ " has " + tts.getAvailableSpace() +
|
|
|
|
+ " bytes free; but we expect map to take " + outSize);
|
|
|
|
+
|
|
|
|
+ return -1; //see if a different TIP might work better.
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
// For scheduling a map task, we have two caches and a list (optional)
|
|
// For scheduling a map task, we have two caches and a list (optional)
|
|
// I) one for non-running task
|
|
// I) one for non-running task
|
|
// II) one for running task (this is for handling speculation)
|
|
// II) one for running task (this is for handling speculation)
|
|
@@ -1272,6 +1294,15 @@ class JobInProgress {
|
|
return -1;
|
|
return -1;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ long outSize = resourceEstimator.getEstimatedReduceInputSize();
|
|
|
|
+ if(tts.getAvailableSpace() < outSize) {
|
|
|
|
+ LOG.warn("No room for reduce task. Node " + taskTracker + " has " +
|
|
|
|
+ tts.getAvailableSpace() +
|
|
|
|
+ " bytes free; but we expect reduce input to take " + outSize);
|
|
|
|
+
|
|
|
|
+ return -1; //see if a different TIP might work better.
|
|
|
|
+ }
|
|
|
|
+
|
|
// 1. check for a never-executed reduce tip
|
|
// 1. check for a never-executed reduce tip
|
|
// reducers don't have a cache and so pass -1 to explicitly call that out
|
|
// reducers don't have a cache and so pass -1 to explicitly call that out
|
|
tip = findTaskFromList(nonRunningReduces, tts, numUniqueHosts, false);
|
|
tip = findTaskFromList(nonRunningReduces, tts, numUniqueHosts, false);
|
|
@@ -1342,6 +1373,7 @@ class JobInProgress {
|
|
|
|
|
|
// Mark the TIP as complete
|
|
// Mark the TIP as complete
|
|
tip.completed(taskid);
|
|
tip.completed(taskid);
|
|
|
|
+ resourceEstimator.updateWithCompletedTask(status, tip);
|
|
|
|
|
|
// Update jobhistory
|
|
// Update jobhistory
|
|
String taskTrackerName = jobtracker.getNode(jobtracker.getTaskTracker(
|
|
String taskTrackerName = jobtracker.getNode(jobtracker.getTaskTracker(
|