|
@@ -131,6 +131,7 @@ public class JobInProgress {
|
|
|
private volatile boolean launchedSetup = false;
|
|
|
private volatile boolean jobKilled = false;
|
|
|
private volatile boolean jobFailed = false;
|
|
|
+ private boolean jobSetupCleanupNeeded = true;
|
|
|
|
|
|
JobPriority priority = JobPriority.NORMAL;
|
|
|
final JobTracker jobtracker;
|
|
@@ -361,6 +362,8 @@ public class JobInProgress {
|
|
|
|
|
|
this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
|
|
|
(numMapTasks + numReduceTasks + 10);
|
|
|
+ JobContext jobContext = new JobContext(conf, jobId);
|
|
|
+ this.jobSetupCleanupNeeded = jobContext.getJobSetupCleanupNeeded();
|
|
|
try {
|
|
|
this.userUGI = UserGroupInformation.getCurrentUser();
|
|
|
} catch (IOException ie){
|
|
@@ -449,6 +452,9 @@ public class JobInProgress {
|
|
|
|
|
|
this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
|
|
|
(numMapTasks + numReduceTasks + 10);
|
|
|
+
|
|
|
+ JobContext jobContext = new JobContext(conf, jobId);
|
|
|
+ this.jobSetupCleanupNeeded = jobContext.getJobSetupCleanupNeeded();
|
|
|
|
|
|
// Construct the jobACLs
|
|
|
status.setJobACLs(jobtracker.getJobACLsManager().constructJobACLs(conf));
|
|
@@ -757,7 +763,35 @@ public class JobInProgress {
|
|
|
|
|
|
// ... use the same for estimating the total output of all maps
|
|
|
resourceEstimator.setThreshhold(completedMapsForReduceSlowstart);
|
|
|
+
|
|
|
+ initSetupCleanupTasks(jobFile);
|
|
|
+
|
|
|
+ synchronized(jobInitKillStatus){
|
|
|
+ jobInitKillStatus.initDone = true;
|
|
|
+
|
|
|
+ // set this before the throw to make sure cleanup works properly
|
|
|
+ tasksInited = true;
|
|
|
+
|
|
|
+ if(jobInitKillStatus.killed) {
|
|
|
+ throw new KillInterruptedException("Job " + jobId + " killed in init");
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime,
|
|
|
+ numMapTasks, numReduceTasks);
|
|
|
|
|
|
+ // if setup is not needed, mark it complete
|
|
|
+ if (!jobSetupCleanupNeeded) {
|
|
|
+ setupComplete();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void initSetupCleanupTasks(String jobFile) {
|
|
|
+ if (!jobSetupCleanupNeeded) {
|
|
|
+ // nothing to initialize
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
// create cleanup two cleanup tips, one map and one reduce.
|
|
|
cleanup = new TaskInProgress[2];
|
|
|
|
|
@@ -787,25 +821,23 @@ public class JobInProgress {
|
|
|
numReduceTasks + 1, jobtracker, conf, this, 1);
|
|
|
setup[1].setJobSetupTask();
|
|
|
|
|
|
- synchronized(jobInitKillStatus){
|
|
|
- jobInitKillStatus.initDone = true;
|
|
|
-
|
|
|
- // set this before the throw to make sure cleanup works properly
|
|
|
- tasksInited = true;
|
|
|
-
|
|
|
- if(jobInitKillStatus.killed) {
|
|
|
- throw new KillInterruptedException("Job " + jobId + " killed in init");
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime,
|
|
|
- numMapTasks, numReduceTasks);
|
|
|
-
|
|
|
// Log the number of map and reduce tasks
|
|
|
LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks
|
|
|
+ " map tasks and " + numReduceTasks + " reduce tasks.");
|
|
|
}
|
|
|
|
|
|
+ private void setupComplete() {
|
|
|
+ status.setSetupProgress(1.0f);
|
|
|
+ if (maps.length == 0 && reduces.length == 0 && !jobSetupCleanupNeeded) {
|
|
|
+ jobComplete();
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (this.status.getRunState() == JobStatus.PREP) {
|
|
|
+ this.status.setRunState(JobStatus.RUNNING);
|
|
|
+ JobHistory.JobInfo.logStarted(profile.getJobID());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
TaskSplitMetaInfo[] createSplits(org.apache.hadoop.mapreduce.JobID jobId)
|
|
|
throws IOException {
|
|
|
TaskSplitMetaInfo[] allTaskSplitMetaInfo =
|
|
@@ -2622,13 +2654,7 @@ public class JobInProgress {
|
|
|
if (tip.isJobSetupTask()) {
|
|
|
// setup task has finished. kill the extra setup tip
|
|
|
killSetupTip(!tip.isMapTask());
|
|
|
- // Job can start running now.
|
|
|
- this.status.setSetupProgress(1.0f);
|
|
|
- // move the job to running state if the job is in prep state
|
|
|
- if (this.status.getRunState() == JobStatus.PREP) {
|
|
|
- changeStateTo(JobStatus.RUNNING);
|
|
|
- JobHistory.JobInfo.logStarted(profile.getJobID());
|
|
|
- }
|
|
|
+ setupComplete();
|
|
|
} else if (tip.isJobCleanupTask()) {
|
|
|
// cleanup task has finished. Kill the extra cleanup tip
|
|
|
if (tip.isMapTask()) {
|
|
@@ -2687,6 +2713,9 @@ public class JobInProgress {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ if (!jobSetupCleanupNeeded && canLaunchJobCleanupTask()) {
|
|
|
+ jobComplete();
|
|
|
+ }
|
|
|
return true;
|
|
|
}
|
|
|
|
|
@@ -2747,7 +2776,8 @@ public class JobInProgress {
|
|
|
//
|
|
|
// All tasks are complete, then the job is done!
|
|
|
//
|
|
|
- if (this.status.getRunState() == JobStatus.RUNNING ) {
|
|
|
+ if (this.status.getRunState() == JobStatus.RUNNING ||
|
|
|
+ this.status.getRunState() == JobStatus.PREP) {
|
|
|
changeStateTo(JobStatus.SUCCEEDED);
|
|
|
this.status.setCleanupProgress(1.0f);
|
|
|
if (maps.length == 0) {
|
|
@@ -2881,6 +2911,9 @@ public class JobInProgress {
|
|
|
for (int i = 0; i < reduces.length; i++) {
|
|
|
reduces[i].kill();
|
|
|
}
|
|
|
+ if (!jobSetupCleanupNeeded) {
|
|
|
+ terminateJob(jobTerminationState);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -3169,7 +3202,9 @@ public class JobInProgress {
|
|
|
}
|
|
|
|
|
|
boolean isSetupFinished() {
|
|
|
- if (setup[0].isComplete() || setup[0].isFailed() || setup[1].isComplete()
|
|
|
+ // if there is no setup to be launched, consider setup is finished.
|
|
|
+ if ((tasksInited && setup.length == 0) ||
|
|
|
+ setup[0].isComplete() || setup[0].isFailed() || setup[1].isComplete()
|
|
|
|| setup[1].isFailed()) {
|
|
|
return true;
|
|
|
}
|
|
@@ -3283,10 +3318,12 @@ public class JobInProgress {
|
|
|
*/
|
|
|
public synchronized TaskInProgress getTaskInProgress(TaskID tipid) {
|
|
|
if (tipid.isMap()) {
|
|
|
- if (tipid.equals(cleanup[0].getTIPId())) { // cleanup map tip
|
|
|
+ // cleanup map tip
|
|
|
+ if (cleanup.length > 0 && tipid.equals(cleanup[0].getTIPId())) {
|
|
|
return cleanup[0];
|
|
|
}
|
|
|
- if (tipid.equals(setup[0].getTIPId())) { //setup map tip
|
|
|
+ // setup map tip
|
|
|
+ if (setup.length > 0 && tipid.equals(setup[0].getTIPId())) {
|
|
|
return setup[0];
|
|
|
}
|
|
|
for (int i = 0; i < maps.length; i++) {
|
|
@@ -3295,10 +3332,12 @@ public class JobInProgress {
|
|
|
}
|
|
|
}
|
|
|
} else {
|
|
|
- if (tipid.equals(cleanup[1].getTIPId())) { // cleanup reduce tip
|
|
|
+ // cleanup reduce tip
|
|
|
+ if (cleanup.length > 0 && tipid.equals(cleanup[1].getTIPId())) {
|
|
|
return cleanup[1];
|
|
|
}
|
|
|
- if (tipid.equals(setup[1].getTIPId())) { //setup reduce tip
|
|
|
+ // setup reduce tip
|
|
|
+ if (setup.length > 0 && tipid.equals(setup[1].getTIPId())) {
|
|
|
return setup[1];
|
|
|
}
|
|
|
for (int i = 0; i < reduces.length; i++) {
|