|
@@ -40,7 +40,6 @@ import org.apache.hadoop.metrics.Metrics;
|
|
|
* @author Mike Cafarella
|
|
|
*******************************************************/
|
|
|
public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmissionProtocol {
|
|
|
- static long JOBINIT_SLEEP_INTERVAL = 2000;
|
|
|
static long RETIRE_JOB_INTERVAL;
|
|
|
static long RETIRE_JOB_CHECK_INTERVAL;
|
|
|
static float TASK_ALLOC_EPSILON;
|
|
@@ -269,11 +268,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
*/
|
|
|
public void run() {
|
|
|
while (shouldRun) {
|
|
|
- try {
|
|
|
- Thread.sleep(RETIRE_JOB_CHECK_INTERVAL);
|
|
|
- } catch (InterruptedException ie) {
|
|
|
- }
|
|
|
-
|
|
|
+ try {
|
|
|
+ Thread.sleep(RETIRE_JOB_CHECK_INTERVAL);
|
|
|
+
|
|
|
synchronized (jobs) {
|
|
|
synchronized (jobsByArrival) {
|
|
|
synchronized (jobInitQueue) {
|
|
@@ -293,11 +290,14 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ } catch (InterruptedException t) {
|
|
|
+ shouldRun = false;
|
|
|
+ } catch (Throwable t) {
|
|
|
+ LOG.error("Error in retiring job:\n" +
|
|
|
+ StringUtils.stringifyException(t));
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
- public void stopRetirer() {
|
|
|
- shouldRun = false;
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
/////////////////////////////////////////////////////////////////
|
|
@@ -308,31 +308,27 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
public JobInitThread() {
|
|
|
}
|
|
|
public void run() {
|
|
|
- while (shouldRun) {
|
|
|
- JobInProgress job = null;
|
|
|
- synchronized (jobInitQueue) {
|
|
|
- if (jobInitQueue.size() > 0) {
|
|
|
- job = (JobInProgress) jobInitQueue.elementAt(0);
|
|
|
- jobInitQueue.remove(job);
|
|
|
- } else {
|
|
|
- try {
|
|
|
- jobInitQueue.wait(JOBINIT_SLEEP_INTERVAL);
|
|
|
- } catch (InterruptedException iex) {
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- try {
|
|
|
- if (job != null) {
|
|
|
- job.initTasks();
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.warn("job init failed", e);
|
|
|
- job.kill();
|
|
|
+ JobInProgress job;
|
|
|
+ while (shouldRun) {
|
|
|
+ job = null;
|
|
|
+ try {
|
|
|
+ synchronized (jobInitQueue) {
|
|
|
+ while (jobInitQueue.isEmpty()) {
|
|
|
+ jobInitQueue.wait();
|
|
|
}
|
|
|
+ job = jobInitQueue.remove(0);
|
|
|
+ }
|
|
|
+ job.initTasks();
|
|
|
+ } catch (InterruptedException t) {
|
|
|
+ shouldRun = false;
|
|
|
+ } catch (Throwable t) {
|
|
|
+ LOG.error("Job initialization failed:\n" +
|
|
|
+ StringUtils.stringifyException(t));
|
|
|
+ if (job != null) {
|
|
|
+ job.kill();
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
- public void stopIniter() {
|
|
|
- shouldRun = false;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -430,7 +426,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
int totalMaps = 0;
|
|
|
int totalReduces = 0;
|
|
|
private TreeMap taskTrackers = new TreeMap();
|
|
|
- Vector jobInitQueue = new Vector();
|
|
|
+ List<JobInProgress> jobInitQueue = new ArrayList();
|
|
|
ExpireTrackers expireTrackers = new ExpireTrackers();
|
|
|
Thread expireTrackersThread = null;
|
|
|
RetireJobs retireJobs = new RetireJobs();
|
|
@@ -438,7 +434,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
JobInitThread initJobs = new JobInitThread();
|
|
|
Thread initJobsThread = null;
|
|
|
ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks();
|
|
|
- Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks);
|
|
|
+ Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks,
|
|
|
+ "expireLaunchingTasks");
|
|
|
|
|
|
/**
|
|
|
* It might seem like a bug to maintain a TreeSet of status objects,
|
|
@@ -524,11 +521,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
this.startTime = System.currentTimeMillis();
|
|
|
|
|
|
myMetrics = new JobTrackerMetrics();
|
|
|
- this.expireTrackersThread = new Thread(this.expireTrackers);
|
|
|
+ this.expireTrackersThread = new Thread(this.expireTrackers,
|
|
|
+ "expireTrackers");
|
|
|
this.expireTrackersThread.start();
|
|
|
- this.retireJobsThread = new Thread(this.retireJobs);
|
|
|
+ this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
|
|
|
this.retireJobsThread.start();
|
|
|
- this.initJobsThread = new Thread(this.initJobs);
|
|
|
+ this.initJobsThread = new Thread(this.initJobs, "initJobs");
|
|
|
this.initJobsThread.start();
|
|
|
expireLaunchingTaskThread.start();
|
|
|
}
|
|
@@ -582,9 +580,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
}
|
|
|
if (this.retireJobs != null) {
|
|
|
LOG.info("Stopping retirer");
|
|
|
- this.retireJobs.stopRetirer();
|
|
|
+ this.retireJobsThread.interrupt();
|
|
|
try {
|
|
|
- this.retireJobsThread.interrupt();
|
|
|
this.retireJobsThread.join();
|
|
|
} catch (InterruptedException ex) {
|
|
|
ex.printStackTrace();
|
|
@@ -592,9 +589,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
}
|
|
|
if (this.initJobs != null) {
|
|
|
LOG.info("Stopping initer");
|
|
|
- this.initJobs.stopIniter();
|
|
|
+ this.initJobsThread.interrupt();
|
|
|
try {
|
|
|
- this.initJobsThread.interrupt();
|
|
|
this.initJobsThread.join();
|
|
|
} catch (InterruptedException ex) {
|
|
|
ex.printStackTrace();
|