|
@@ -68,6 +68,9 @@ public class TaskTracker
|
|
|
|
|
|
Server taskReportServer = null;
|
|
|
InterTrackerProtocol jobClient;
|
|
|
+
|
|
|
+ // last heartbeat response recieved
|
|
|
+ short heartbeatResponseId = -1;
|
|
|
|
|
|
StatusHttpServer server = null;
|
|
|
|
|
@@ -187,7 +190,7 @@ public class TaskTracker
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
static String getCacheSubdir() {
|
|
|
return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR;
|
|
|
}
|
|
@@ -451,15 +454,23 @@ public class TaskTracker
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if (!transmitHeartBeat()) {
|
|
|
+ // Send the heartbeat and process the jobtracker's directives
|
|
|
+ HeartbeatResponse heartbeatResponse = transmitHeartBeat();
|
|
|
+ TaskTrackerAction[] actions = heartbeatResponse.getActions();
|
|
|
+ LOG.debug("Got heartbeatResponse from JobTracker with responseId: " +
|
|
|
+ heartbeatResponse.getResponseId() + " and " +
|
|
|
+ ((actions != null) ? actions.length : 0) + " actions");
|
|
|
+
|
|
|
+ if (reinitTaskTracker(actions)) {
|
|
|
return State.STALE;
|
|
|
}
|
|
|
+
|
|
|
lastHeartbeat = now;
|
|
|
justStarted = false;
|
|
|
|
|
|
- checkForNewTasks();
|
|
|
+ checkAndStartNewTasks(actions);
|
|
|
markUnresponsiveTasks();
|
|
|
- closeCompletedTasks();
|
|
|
+ closeCompletedTasks(actions);
|
|
|
killOverflowingTasks();
|
|
|
|
|
|
//we've cleaned up, resume normal operation
|
|
@@ -491,56 +502,94 @@ public class TaskTracker
|
|
|
* @return false if the tracker was unknown
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- private boolean transmitHeartBeat() throws IOException {
|
|
|
+ private HeartbeatResponse transmitHeartBeat() throws IOException {
|
|
|
//
|
|
|
// Build the heartbeat information for the JobTracker
|
|
|
//
|
|
|
- List<TaskStatus> taskReports = new ArrayList(runningTasks.size());
|
|
|
+ List<TaskStatus> taskReports =
|
|
|
+ new ArrayList<TaskStatus>(runningTasks.size());
|
|
|
synchronized (this) {
|
|
|
- for (TaskInProgress tip: runningTasks.values()) {
|
|
|
- taskReports.add(tip.createStatus());
|
|
|
- }
|
|
|
+ for (TaskInProgress tip: runningTasks.values()) {
|
|
|
+ taskReports.add(tip.createStatus());
|
|
|
+ }
|
|
|
}
|
|
|
TaskTrackerStatus status =
|
|
|
new TaskTrackerStatus(taskTrackerName, localHostname,
|
|
|
- httpPort, taskReports,
|
|
|
- failures);
|
|
|
-
|
|
|
+ httpPort, taskReports,
|
|
|
+ failures);
|
|
|
+
|
|
|
+ //
|
|
|
+ // Check if we should ask for a new Task
|
|
|
+ //
|
|
|
+ boolean askForNewTask = false;
|
|
|
+ if ((mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) &&
|
|
|
+ acceptNewTasks) {
|
|
|
+ checkLocalDirs(fConf.getLocalDirs());
|
|
|
+
|
|
|
+ if (enoughFreeSpace(minSpaceStart)) {
|
|
|
+ askForNewTask = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
//
|
|
|
// Xmit the heartbeat
|
|
|
//
|
|
|
+ HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
|
|
|
+ justStarted, askForNewTask,
|
|
|
+ heartbeatResponseId);
|
|
|
+ heartbeatResponseId = heartbeatResponse.getResponseId();
|
|
|
|
|
|
- int resultCode = jobClient.emitHeartbeat(status, justStarted);
|
|
|
synchronized (this) {
|
|
|
- for (TaskStatus taskStatus: taskReports) {
|
|
|
- if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
|
|
|
- if (taskStatus.getIsMap()) {
|
|
|
- mapTotal--;
|
|
|
- } else {
|
|
|
- reduceTotal--;
|
|
|
- }
|
|
|
- myMetrics.completeTask();
|
|
|
- runningTasks.remove(taskStatus.getTaskId());
|
|
|
+ for (TaskStatus taskStatus : taskReports) {
|
|
|
+ if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
|
|
|
+ if (taskStatus.getIsMap()) {
|
|
|
+ mapTotal--;
|
|
|
+ } else {
|
|
|
+ reduceTotal--;
|
|
|
}
|
|
|
+ myMetrics.completeTask();
|
|
|
+ runningTasks.remove(taskStatus.getTaskId());
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
- return resultCode != InterTrackerProtocol.UNKNOWN_TASKTRACKER;
|
|
|
+ return heartbeatResponse;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Check if the jobtracker directed a 'reset' of the tasktracker.
|
|
|
+ *
|
|
|
+ * @param actions the directives of the jobtracker for the tasktracker.
|
|
|
+ * @return <code>true</code> if tasktracker is to be reset,
|
|
|
+ * <code>false</code> otherwise.
|
|
|
+ */
|
|
|
+ private boolean reinitTaskTracker(TaskTrackerAction[] actions) {
|
|
|
+ if (actions != null) {
|
|
|
+ for (TaskTrackerAction action : actions) {
|
|
|
+ if (action.getActionId() ==
|
|
|
+ TaskTrackerAction.ActionType.REINIT_TRACKER) {
|
|
|
+ LOG.info("Recieved RenitTrackerAction from JobTracker");
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Check to see if there are any new tasks that we should run.
|
|
|
* @throws IOException
|
|
|
*/
|
|
|
- private void checkForNewTasks() throws IOException {
|
|
|
- //
|
|
|
- // Check if we should ask for a new Task
|
|
|
- //
|
|
|
- if ((mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) &&
|
|
|
- acceptNewTasks) {
|
|
|
- checkLocalDirs(fConf.getLocalDirs());
|
|
|
-
|
|
|
- if (enoughFreeSpace(minSpaceStart)) {
|
|
|
- Task t = jobClient.pollForNewTask(taskTrackerName);
|
|
|
+ private void checkAndStartNewTasks(TaskTrackerAction[] actions)
|
|
|
+ throws IOException {
|
|
|
+ if (actions == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ for (TaskTrackerAction action : actions) {
|
|
|
+ if (action.getActionId() ==
|
|
|
+ TaskTrackerAction.ActionType.LAUNCH_TASK) {
|
|
|
+ Task t = ((LaunchTaskAction)(action)).getTask();
|
|
|
+ LOG.info("LaunchTaskAction: " + t.getTaskId());
|
|
|
if (t != null) {
|
|
|
startNewTask(t);
|
|
|
}
|
|
@@ -573,24 +622,73 @@ public class TaskTracker
|
|
|
* Ask the JobTracker if there are any tasks that we should clean up,
|
|
|
* either because we don't need them any more or because the job is done.
|
|
|
*/
|
|
|
- private void closeCompletedTasks() throws IOException {
|
|
|
- String[] toCloseIds = jobClient.pollForTaskWithClosedJob(taskTrackerName);
|
|
|
- if (toCloseIds != null) {
|
|
|
- synchronized (this) {
|
|
|
- for (int i = 0; i < toCloseIds.length; i++) {
|
|
|
- TaskInProgress tip = tasks.get(toCloseIds[i]);
|
|
|
- if (tip != null) {
|
|
|
- // remove the task from running jobs, removing the job if
|
|
|
- // it is the last task
|
|
|
- removeTaskFromJob(tip.getTask().getJobId(), tip);
|
|
|
- tasksToCleanup.put(tip);
|
|
|
+ private void closeCompletedTasks(TaskTrackerAction[] actions)
|
|
|
+ throws IOException {
|
|
|
+ if (actions == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ for (TaskTrackerAction action : actions) {
|
|
|
+ TaskTrackerAction.ActionType actionType = action.getActionId();
|
|
|
+
|
|
|
+ if (actionType == TaskTrackerAction.ActionType.KILL_JOB) {
|
|
|
+ String jobId = ((KillJobAction)action).getJobId();
|
|
|
+ LOG.info("Received 'KillJobAction' for job: " + jobId);
|
|
|
+ synchronized (runningJobs) {
|
|
|
+ RunningJob rjob = runningJobs.get(jobId);
|
|
|
+ if (rjob == null) {
|
|
|
+ LOG.warn("Unknown job " + jobId + " being deleted.");
|
|
|
} else {
|
|
|
- LOG.info("Attempt to cleanup unknown tip " + toCloseIds[i]);
|
|
|
+ synchronized (rjob) {
|
|
|
+ int noJobTasks = rjob.tasks.size();
|
|
|
+ int taskCtr = 0;
|
|
|
+
|
|
|
+ // Add this tips of this job to queue of tasks to be purged
|
|
|
+ for (TaskInProgress tip : rjob.tasks) {
|
|
|
+ // Purge the job files for the last element in rjob.tasks
|
|
|
+ if (++taskCtr == noJobTasks) {
|
|
|
+ tip.setPurgeJobFiles(true);
|
|
|
+ }
|
|
|
+
|
|
|
+ tasksToCleanup.put(tip);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Remove this job
|
|
|
+ rjob.tasks.clear();
|
|
|
+ runningJobs.remove(jobId);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
+ } else if(actionType == TaskTrackerAction.ActionType.KILL_TASK) {
|
|
|
+ String taskId = ((KillTaskAction)action).getTaskId();
|
|
|
+ LOG.info("Received KillTaskAction for task: " + taskId);
|
|
|
+ purgeTask(tasks.get(taskId), false);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Remove the tip and update all relevant state.
|
|
|
+ *
|
|
|
+ * @param tip {@link TaskInProgress} to be removed.
|
|
|
+ * @param purgeJobFiles <code>true</code> if the job files are to be
|
|
|
+ * purged, <code>false</code> otherwise.
|
|
|
+ */
|
|
|
+ private void purgeTask(TaskInProgress tip, boolean purgeJobFiles) {
|
|
|
+ if (tip != null) {
|
|
|
+ LOG.info("About to purge task: " + tip.getTask().getTaskId());
|
|
|
+
|
|
|
+ // Cleanup the job files?
|
|
|
+ tip.setPurgeJobFiles(purgeJobFiles);
|
|
|
+
|
|
|
+ // Remove the task from running jobs,
|
|
|
+ // removing the job if it's the last task
|
|
|
+ removeTaskFromJob(tip.getTask().getJobId(), tip);
|
|
|
+
|
|
|
+ // Add this tip to queue of tasks to be purged
|
|
|
+ tasksToCleanup.put(tip);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
/** Check if we're dangerously low on disk space
|
|
|
* If so, kill jobs to free up space and make sure
|
|
@@ -822,6 +920,9 @@ public class TaskTracker
|
|
|
private boolean alwaysKeepTaskFiles;
|
|
|
private TaskStatus taskStatus ;
|
|
|
private boolean keepJobFiles;
|
|
|
+
|
|
|
+ /** Cleanup the job files when the job is complete (done/failed) */
|
|
|
+ private boolean purgeJobFiles = false;
|
|
|
|
|
|
/**
|
|
|
*/
|
|
@@ -886,6 +987,10 @@ public class TaskTracker
|
|
|
keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
|
|
|
}
|
|
|
|
|
|
+ public void setPurgeJobFiles(boolean purgeJobFiles) {
|
|
|
+ this.purgeJobFiles = purgeJobFiles;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
*/
|
|
|
public synchronized TaskStatus createStatus() {
|
|
@@ -1017,32 +1122,39 @@ public class TaskTracker
|
|
|
* We no longer need anything from this task, as the job has
|
|
|
* finished. If the task is still running, kill it (and clean up
|
|
|
*/
|
|
|
- public synchronized void jobHasFinished() throws IOException {
|
|
|
-
|
|
|
- if (getRunState() == TaskStatus.State.RUNNING) {
|
|
|
+ public void jobHasFinished() throws IOException {
|
|
|
+ boolean killTask = false;
|
|
|
+ synchronized(this){
|
|
|
+ killTask = (getRunState() == TaskStatus.State.RUNNING);
|
|
|
+ if (killTask) {
|
|
|
killAndCleanup(false);
|
|
|
- } else {
|
|
|
- cleanup();
|
|
|
- }
|
|
|
- if (keepJobFiles)
|
|
|
- return;
|
|
|
-
|
|
|
- // Delete temp directory in case any task used PhasedFileSystem.
|
|
|
- try{
|
|
|
- String systemDir = task.getConf().get("mapred.system.dir");
|
|
|
- Path taskTempDir = new Path(systemDir + "/" +
|
|
|
- task.getJobId() + "/" + task.getTipId());
|
|
|
- if( fs.exists(taskTempDir)){
|
|
|
- fs.delete(taskTempDir) ;
|
|
|
}
|
|
|
- }catch(IOException e){
|
|
|
- LOG.warn("Error in deleting reduce temporary output",e);
|
|
|
+ }
|
|
|
+ if (!killTask) {
|
|
|
+ cleanup();
|
|
|
+ }
|
|
|
+ if (keepJobFiles)
|
|
|
+ return;
|
|
|
+
|
|
|
+ synchronized(this){
|
|
|
+ // Delete temp directory in case any task used PhasedFileSystem.
|
|
|
+ try{
|
|
|
+ String systemDir = task.getConf().get("mapred.system.dir");
|
|
|
+ Path taskTempDir = new Path(systemDir + "/" +
|
|
|
+ task.getJobId() + "/" + task.getTipId() + "/" + task.getTaskId());
|
|
|
+ if( fs.exists(taskTempDir)){
|
|
|
+ fs.delete(taskTempDir) ;
|
|
|
+ }
|
|
|
+ }catch(IOException e){
|
|
|
+ LOG.warn("Error in deleting reduce temporary output",e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // Delete the job directory for this
|
|
|
+ // task if the job is done/failed
|
|
|
+ if (purgeJobFiles) {
|
|
|
+ this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR +
|
|
|
+ JOBCACHE + Path.SEPARATOR + task.getJobId());
|
|
|
}
|
|
|
-
|
|
|
- // delete the job diretory for this task
|
|
|
- // since the job is done/failed
|
|
|
- this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR +
|
|
|
- JOBCACHE + Path.SEPARATOR + task.getJobId());
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -1090,6 +1202,9 @@ public class TaskTracker
|
|
|
* We no longer need anything from this task. Either the
|
|
|
* controlling job is all done and the files have been copied
|
|
|
* away, or the task failed and we don't need the remains.
|
|
|
+ * Any calls to cleanup should not lock the tip first.
|
|
|
+ * cleanup does the right thing- updates tasks in Tasktracker
|
|
|
+ * by locking tasktracker first and then locks the tip.
|
|
|
*/
|
|
|
void cleanup() throws IOException {
|
|
|
String taskId = task.getTaskId();
|