|
@@ -160,7 +160,6 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
* where JobTrackers are cleaned up.
|
|
|
*/
|
|
|
private class ExpireLaunchingTasks implements Runnable {
|
|
|
- private volatile boolean shouldRun = true;
|
|
|
/**
|
|
|
* This is a map of the tasks that have been assigned to task trackers,
|
|
|
* but that have not yet been seen in a status report.
|
|
@@ -170,7 +169,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
new LinkedHashMap<String, Long>();
|
|
|
|
|
|
public void run() {
|
|
|
- while (shouldRun) {
|
|
|
+ while (true) {
|
|
|
try {
|
|
|
// Every 3 minutes check for any tasks that are overdue
|
|
|
Thread.sleep(TASKTRACKER_EXPIRY_INTERVAL/3);
|
|
@@ -216,7 +215,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
}
|
|
|
} catch (InterruptedException ie) {
|
|
|
// all done
|
|
|
- return;
|
|
|
+ break;
|
|
|
} catch (Exception e) {
|
|
|
LOG.error("Expire Launching Task Thread got exception: " +
|
|
|
StringUtils.stringifyException(e));
|
|
@@ -236,17 +235,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
launchingTasks.remove(taskName);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- public void stop() {
|
|
|
- shouldRun = false;
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
///////////////////////////////////////////////////////
|
|
|
// Used to expire TaskTrackers that have gone down
|
|
|
///////////////////////////////////////////////////////
|
|
|
class ExpireTrackers implements Runnable {
|
|
|
- boolean shouldRun = true;
|
|
|
public ExpireTrackers() {
|
|
|
}
|
|
|
/**
|
|
@@ -254,7 +248,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
* that have not checked in for some time.
|
|
|
*/
|
|
|
public void run() {
|
|
|
- while (shouldRun) {
|
|
|
+ while (true) {
|
|
|
try {
|
|
|
//
|
|
|
// Thread runs periodically to check whether trackers should be expired.
|
|
@@ -305,6 +299,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ } catch (InterruptedException iex) {
|
|
|
+ break;
|
|
|
} catch (Exception t) {
|
|
|
LOG.error("Tracker Expiry Thread got exception: " +
|
|
|
StringUtils.stringifyException(t));
|
|
@@ -312,19 +308,12 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- /**
|
|
|
- * Stop the tracker on next iteration
|
|
|
- */
|
|
|
- public void stopTracker() {
|
|
|
- shouldRun = false;
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
///////////////////////////////////////////////////////
|
|
|
// Used to remove old finished Jobs that have been around for too long
|
|
|
///////////////////////////////////////////////////////
|
|
|
class RetireJobs implements Runnable {
|
|
|
- boolean shouldRun = true;
|
|
|
public RetireJobs() {
|
|
|
}
|
|
|
|
|
@@ -334,7 +323,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
* finished a long time ago.
|
|
|
*/
|
|
|
public void run() {
|
|
|
- while (shouldRun) {
|
|
|
+ while (true) {
|
|
|
try {
|
|
|
Thread.sleep(RETIRE_JOB_CHECK_INTERVAL);
|
|
|
List<JobInProgress> retiredJobs = new ArrayList<JobInProgress>();
|
|
@@ -380,7 +369,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
}
|
|
|
}
|
|
|
} catch (InterruptedException t) {
|
|
|
- shouldRun = false;
|
|
|
+ break;
|
|
|
} catch (Throwable t) {
|
|
|
LOG.error("Error in retiring job:\n" +
|
|
|
StringUtils.stringifyException(t));
|
|
@@ -393,12 +382,11 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
// Used to init new jobs that have just been created
|
|
|
/////////////////////////////////////////////////////////////////
|
|
|
class JobInitThread implements Runnable {
|
|
|
- boolean shouldRun = true;
|
|
|
public JobInitThread() {
|
|
|
}
|
|
|
public void run() {
|
|
|
JobInProgress job;
|
|
|
- while (shouldRun) {
|
|
|
+ while (true) {
|
|
|
job = null;
|
|
|
try {
|
|
|
synchronized (jobInitQueue) {
|
|
@@ -409,7 +397,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
}
|
|
|
job.initTasks();
|
|
|
} catch (InterruptedException t) {
|
|
|
- shouldRun = false;
|
|
|
+ break;
|
|
|
} catch (Throwable t) {
|
|
|
LOG.error("Job initialization failed:\n" +
|
|
|
StringUtils.stringifyException(t));
|
|
@@ -746,19 +734,16 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
LOG.info("Stopping interTrackerServer");
|
|
|
this.interTrackerServer.stop();
|
|
|
}
|
|
|
- if (this.expireTrackers != null) {
|
|
|
+ if (this.expireTrackersThread != null && this.expireTrackersThread.isAlive()) {
|
|
|
LOG.info("Stopping expireTrackers");
|
|
|
- this.expireTrackers.stopTracker();
|
|
|
+ this.expireTrackersThread.interrupt();
|
|
|
try {
|
|
|
- if(expireTrackersThread != null) {
|
|
|
- this.expireTrackersThread.interrupt();
|
|
|
- this.expireTrackersThread.join();
|
|
|
- }
|
|
|
+ this.expireTrackersThread.join();
|
|
|
} catch (InterruptedException ex) {
|
|
|
ex.printStackTrace();
|
|
|
}
|
|
|
}
|
|
|
- if (this.retireJobsThread != null) {
|
|
|
+ if (this.retireJobsThread != null && this.retireJobsThread.isAlive()) {
|
|
|
LOG.info("Stopping retirer");
|
|
|
this.retireJobsThread.interrupt();
|
|
|
try {
|
|
@@ -767,7 +752,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
ex.printStackTrace();
|
|
|
}
|
|
|
}
|
|
|
- if (this.initJobsThread != null) {
|
|
|
+ if (this.initJobsThread != null && this.initJobsThread.isAlive()) {
|
|
|
LOG.info("Stopping initer");
|
|
|
this.initJobsThread.interrupt();
|
|
|
try {
|
|
@@ -776,11 +761,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
ex.printStackTrace();
|
|
|
}
|
|
|
}
|
|
|
- if (this.expireLaunchingTaskThread != null) {
|
|
|
+ if (this.expireLaunchingTaskThread != null && this.expireLaunchingTaskThread.isAlive()) {
|
|
|
LOG.info("Stopping expireLaunchingTasks");
|
|
|
- this.expireLaunchingTasks.stop();
|
|
|
+ this.expireLaunchingTaskThread.interrupt();
|
|
|
try {
|
|
|
- this.expireLaunchingTaskThread.interrupt();
|
|
|
this.expireLaunchingTaskThread.join();
|
|
|
} catch (InterruptedException ex) {
|
|
|
ex.printStackTrace();
|
|
@@ -2032,9 +2016,7 @@ public class JobTracker implements MRConstants, InterTrackerProtocol, JobSubmiss
|
|
|
}
|
|
|
}
|
|
|
} catch (InterruptedException ie) {
|
|
|
- LOG.warn(getName() + " exiting, got interrupted: " +
|
|
|
- StringUtils.stringifyException(ie));
|
|
|
- return;
|
|
|
+ break;
|
|
|
}
|
|
|
catch (Throwable t) {
|
|
|
LOG.error(getName() + " got an exception: " +
|