|
@@ -21,6 +21,8 @@ import java.io.IOException;
|
|
|
import java.security.PrivilegedExceptionAction;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.Comparator;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.IdentityHashMap;
|
|
@@ -286,6 +288,17 @@ class JobInProgress {
|
|
|
this.anyCacheLevel = this.maxLevel+1;
|
|
|
this.jobtracker = tracker;
|
|
|
this.restartCount = 0;
|
|
|
+ hasSpeculativeMaps = conf.getMapSpeculativeExecution();
|
|
|
+ hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
|
|
|
+ this.nonLocalMaps = new LinkedList<TaskInProgress>();
|
|
|
+ this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
|
|
|
+ this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
|
|
|
+ this.nonRunningReduces = new LinkedList<TaskInProgress>();
|
|
|
+ this.runningReduces = new LinkedHashSet<TaskInProgress>();
|
|
|
+ this.resourceEstimator = new ResourceEstimator(this);
|
|
|
+ this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
|
|
|
+ this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
|
|
|
+ (numMapTasks + numReduceTasks + 10);
|
|
|
try {
|
|
|
this.userUGI = UserGroupInformation.getCurrentUser();
|
|
|
} catch (IOException ie){
|
|
@@ -319,7 +332,7 @@ class JobInProgress {
|
|
|
this.jobtracker = jobtracker;
|
|
|
this.status = new JobStatus(jobId, 0.0f, 0.0f, JobStatus.PREP);
|
|
|
this.jobtracker.getInstrumentation().addPrepJob(conf, jobId);
|
|
|
- this.startTime = System.currentTimeMillis();
|
|
|
+ this.startTime = jobtracker.getClock().getTime();
|
|
|
status.setStartTime(startTime);
|
|
|
this.localFs = jobtracker.getLocalFileSystem();
|
|
|
|
|
@@ -430,7 +443,7 @@ class JobInProgress {
|
|
|
|
|
|
for (int i = 0; i < splits.length; i++) {
|
|
|
String[] splitLocations = splits[i].getLocations();
|
|
|
- if (splitLocations.length == 0) {
|
|
|
+ if (splitLocations == null || splitLocations.length == 0) {
|
|
|
nonLocalMaps.add(maps[i]);
|
|
|
continue;
|
|
|
}
|
|
@@ -588,7 +601,7 @@ class JobInProgress {
|
|
|
}
|
|
|
|
|
|
// set the launch time
|
|
|
- this.launchTime = System.currentTimeMillis();
|
|
|
+ this.launchTime = jobtracker.getClock().getTime();
|
|
|
|
|
|
//
|
|
|
// Create reduce tasks
|
|
@@ -1586,7 +1599,7 @@ class JobInProgress {
|
|
|
Map<TaskTracker, FallowSlotInfo> map =
|
|
|
(type == TaskType.MAP) ? trackersReservedForMaps : trackersReservedForReduces;
|
|
|
|
|
|
- long now = System.currentTimeMillis();
|
|
|
+ long now = jobtracker.getClock().getTime();
|
|
|
|
|
|
FallowSlotInfo info = map.get(taskTracker);
|
|
|
int reservedSlots = 0;
|
|
@@ -1632,7 +1645,7 @@ class JobInProgress {
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- long now = System.currentTimeMillis();
|
|
|
+ long now = jobtracker.getClock().getTime();
|
|
|
|
|
|
Enum<Counter> counter =
|
|
|
(type == TaskType.MAP) ?
|
|
@@ -1720,7 +1733,7 @@ class JobInProgress {
|
|
|
String[] splitLocations = tip.getSplitLocations();
|
|
|
|
|
|
// Remove the TIP from the list for running non-local maps
|
|
|
- if (splitLocations.length == 0) {
|
|
|
+ if (splitLocations == null || splitLocations.length == 0) {
|
|
|
nonLocalRunningMaps.remove(tip);
|
|
|
return;
|
|
|
}
|
|
@@ -1760,7 +1773,7 @@ class JobInProgress {
|
|
|
* Adds a map tip to the list of running maps.
|
|
|
* @param tip the tip that needs to be scheduled as running
|
|
|
*/
|
|
|
- private synchronized void scheduleMap(TaskInProgress tip) {
|
|
|
+ protected synchronized void scheduleMap(TaskInProgress tip) {
|
|
|
|
|
|
if (runningMapCache == null) {
|
|
|
LOG.warn("Running cache for maps is missing!! "
|
|
@@ -1770,7 +1783,7 @@ class JobInProgress {
|
|
|
String[] splitLocations = tip.getSplitLocations();
|
|
|
|
|
|
// Add the TIP to the list of non-local running TIPs
|
|
|
- if (splitLocations.length == 0) {
|
|
|
+ if (splitLocations == null || splitLocations.length == 0) {
|
|
|
nonLocalRunningMaps.add(tip);
|
|
|
return;
|
|
|
}
|
|
@@ -1795,7 +1808,7 @@ class JobInProgress {
|
|
|
* Adds a reduce tip to the list of running reduces
|
|
|
* @param tip the tip that needs to be scheduled as running
|
|
|
*/
|
|
|
- private synchronized void scheduleReduce(TaskInProgress tip) {
|
|
|
+ protected synchronized void scheduleReduce(TaskInProgress tip) {
|
|
|
if (runningReduces == null) {
|
|
|
LOG.warn("Running cache for reducers missing!! "
|
|
|
+ "Job details are missing.");
|
|
@@ -1822,7 +1835,7 @@ class JobInProgress {
|
|
|
String[] splitLocations = tip.getSplitLocations();
|
|
|
|
|
|
// Add the TIP in the front of the list for non-local non-running maps
|
|
|
- if (splitLocations.length == 0) {
|
|
|
+ if (splitLocations == null || splitLocations.length == 0) {
|
|
|
nonLocalMaps.add(0, tip);
|
|
|
return;
|
|
|
}
|
|
@@ -2106,7 +2119,7 @@ class JobInProgress {
|
|
|
//
|
|
|
|
|
|
if (hasSpeculativeMaps) {
|
|
|
- long currentTime = System.currentTimeMillis();
|
|
|
+ long currentTime = jobtracker.getClock().getTime();
|
|
|
|
|
|
// 1. Check bottom up for speculative tasks from the running cache
|
|
|
if (node != null) {
|
|
@@ -2214,7 +2227,7 @@ class JobInProgress {
|
|
|
// 2. check for a reduce tip to be speculated
|
|
|
if (hasSpeculativeReduces) {
|
|
|
tip = findSpeculativeTask(runningReduces, tts, avgProgress,
|
|
|
- System.currentTimeMillis(), false);
|
|
|
+ jobtracker.getClock().getTime(), false);
|
|
|
if (tip != null) {
|
|
|
scheduleReduce(tip);
|
|
|
return tip.getIdWithinJob();
|
|
@@ -2435,7 +2448,7 @@ class JobInProgress {
|
|
|
if (reduces.length == 0) {
|
|
|
this.status.setReduceProgress(1.0f);
|
|
|
}
|
|
|
- this.finishTime = System.currentTimeMillis();
|
|
|
+ this.finishTime = jobtracker.getClock().getTime();
|
|
|
LOG.info("Job " + this.status.getJobID() +
|
|
|
" has completed successfully.");
|
|
|
|
|
@@ -2460,7 +2473,7 @@ class JobInProgress {
|
|
|
private synchronized void terminateJob(int jobTerminationState) {
|
|
|
if ((status.getRunState() == JobStatus.RUNNING) ||
|
|
|
(status.getRunState() == JobStatus.PREP)) {
|
|
|
- this.finishTime = System.currentTimeMillis();
|
|
|
+ this.finishTime = jobtracker.getClock().getTime();
|
|
|
this.status.setMapProgress(1.0f);
|
|
|
this.status.setReduceProgress(1.0f);
|
|
|
this.status.setCleanupProgress(1.0f);
|
|
@@ -2848,10 +2861,10 @@ class JobInProgress {
|
|
|
// update the actual start-time of the attempt
|
|
|
TaskStatus oldStatus = tip.getTaskStatus(taskid);
|
|
|
long startTime = oldStatus == null
|
|
|
- ? System.currentTimeMillis()
|
|
|
+ ? jobtracker.getClock().getTime()
|
|
|
: oldStatus.getStartTime();
|
|
|
status.setStartTime(startTime);
|
|
|
- status.setFinishTime(System.currentTimeMillis());
|
|
|
+ status.setFinishTime(jobtracker.getClock().getTime());
|
|
|
boolean wasComplete = tip.isComplete();
|
|
|
updateTaskStatus(tip, status);
|
|
|
boolean isComplete = tip.isComplete();
|