|
@@ -344,6 +344,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
while (true) {
|
|
|
try {
|
|
|
TaskTrackerAction action = tasksToCleanup.take();
|
|
|
+ checkJobStatusAndWait(action);
|
|
|
if (action instanceof KillJobAction) {
|
|
|
purgeJob((KillJobAction) action);
|
|
|
} else if (action instanceof KillTaskAction) {
|
|
@@ -367,6 +368,29 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
LOG.info("Received KillTaskAction for task: " + killAction.getTaskID());
|
|
|
purgeTask(tip, false);
|
|
|
}
|
|
|
+
|
|
|
+ private void checkJobStatusAndWait(TaskTrackerAction action)
|
|
|
+ throws InterruptedException {
|
|
|
+ JobID jobId = null;
|
|
|
+ if (action instanceof KillJobAction) {
|
|
|
+ jobId = ((KillJobAction)action).getJobID();
|
|
|
+ } else if (action instanceof KillTaskAction) {
|
|
|
+ jobId = ((KillTaskAction)action).getTaskID().getJobID();
|
|
|
+ } else {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ RunningJob rjob = null;
|
|
|
+ synchronized (runningJobs) {
|
|
|
+ rjob = runningJobs.get(jobId);
|
|
|
+ }
|
|
|
+ if (rjob != null) {
|
|
|
+ synchronized (rjob) {
|
|
|
+ while (rjob.localizing) {
|
|
|
+ rjob.wait();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
public TaskController getTaskController() {
|
|
|
return taskController;
|
|
@@ -951,8 +975,18 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
JobID jobId = t.getJobID();
|
|
|
RunningJob rjob = addTaskToJob(jobId, tip);
|
|
|
InetSocketAddress ttAddr = getTaskTrackerReportAddress();
|
|
|
-
|
|
|
- synchronized (rjob) {
|
|
|
+ try {
|
|
|
+ synchronized (rjob) {
|
|
|
+ if (!rjob.localized) {
|
|
|
+ while (rjob.localizing) {
|
|
|
+ rjob.wait();
|
|
|
+ }
|
|
|
+ if (!rjob.localized) {
|
|
|
+ //this thread is localizing the job
|
|
|
+ rjob.localizing = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
if (!rjob.localized) {
|
|
|
Path localJobConfPath = initializeJob(t, rjob, ttAddr);
|
|
|
JobConf localJobConf = new JobConf(localJobConfPath);
|
|
@@ -963,12 +997,21 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
resetNumTasksPerJvm(localJobConf);
|
|
|
//set the base jobconf path in rjob; all tasks will use
|
|
|
//this as the base path when they run
|
|
|
- rjob.localizedJobConf = localJobConfPath;
|
|
|
- rjob.jobConf = localJobConf;
|
|
|
- rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
|
|
|
- localJobConf.getKeepFailedTaskFiles());
|
|
|
-
|
|
|
- rjob.localized = true;
|
|
|
+ synchronized (rjob) {
|
|
|
+ rjob.localizedJobConf = localJobConfPath;
|
|
|
+ rjob.jobConf = localJobConf;
|
|
|
+ rjob.keepJobFiles = ((localJobConf.getKeepTaskFilesPattern() != null) ||
|
|
|
+ localJobConf.getKeepFailedTaskFiles());
|
|
|
+
|
|
|
+ rjob.localized = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ synchronized (rjob) {
|
|
|
+ if (rjob.localizing) {
|
|
|
+ rjob.localizing = false;
|
|
|
+ rjob.notifyAll();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
synchronized (runningJobs) {
|
|
@@ -1005,15 +1048,17 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
|
|
|
// save local copy of JobToken file
|
|
|
final String localJobTokenFile = localizeJobTokenFile(t.getUser(), jobId);
|
|
|
- rjob.ugi = UserGroupInformation.createRemoteUser(t.getUser());
|
|
|
+ synchronized (rjob) {
|
|
|
+ rjob.ugi = UserGroupInformation.createRemoteUser(t.getUser());
|
|
|
|
|
|
- Credentials ts = TokenCache.loadTokens(localJobTokenFile, conf);
|
|
|
- Token<JobTokenIdentifier> jt = TokenCache.getJobToken(ts);
|
|
|
- if (jt != null) { //could be null in the case of some unit tests
|
|
|
- getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
|
|
|
- }
|
|
|
- for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
|
|
|
- rjob.ugi.addToken(token);
|
|
|
+ Credentials ts = TokenCache.loadTokens(localJobTokenFile, conf);
|
|
|
+ Token<JobTokenIdentifier> jt = TokenCache.getJobToken(ts);
|
|
|
+ if (jt != null) { //could be null in the case of some unit tests
|
|
|
+ getJobTokenSecretManager().addTokenForJob(jobId.toString(), jt);
|
|
|
+ }
|
|
|
+ for (Token<? extends TokenIdentifier> token : ts.getAllTokens()) {
|
|
|
+ rjob.ugi.addToken(token);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
FileSystem userFs = getFS(jobFile, jobId, conf);
|
|
@@ -2336,7 +2381,9 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
}
|
|
|
setTaskRunner(task.createRunner(TaskTracker.this, this, rjob));
|
|
|
this.runner.start();
|
|
|
- this.taskStatus.setStartTime(System.currentTimeMillis());
|
|
|
+ long now = System.currentTimeMillis();
|
|
|
+ this.taskStatus.setStartTime(now);
|
|
|
+ this.lastProgressReport = now;
|
|
|
} else {
|
|
|
LOG.info("Not launching task: " + task.getTaskID() +
|
|
|
" since it's state is " + this.taskStatus.getRunState());
|
|
@@ -3197,7 +3244,11 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
private Path localizedJobConf;
|
|
|
// keep this for later use
|
|
|
volatile Set<TaskInProgress> tasks;
|
|
|
+ //the 'localizing' and 'localized' fields have the following
|
|
|
+ //state transitions (first entry is for 'localizing')
|
|
|
+ //{false,false} -> {true,false} -> {false,true}
|
|
|
volatile boolean localized;
|
|
|
+ boolean localizing;
|
|
|
boolean keepJobFiles;
|
|
|
UserGroupInformation ugi;
|
|
|
FetchStatus f;
|
|
@@ -3206,6 +3257,7 @@ public class TaskTracker implements MRConstants, TaskUmbilicalProtocol,
|
|
|
RunningJob(JobID jobid) {
|
|
|
this.jobid = jobid;
|
|
|
localized = false;
|
|
|
+ localizing = false;
|
|
|
tasks = new HashSet<TaskInProgress>();
|
|
|
keepJobFiles = false;
|
|
|
}
|