|
@@ -198,8 +198,11 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
private final float MIN_HEARTBEATS_SCALING_FACTOR = 0.01f;
|
|
|
private final float DEFAULT_HEARTBEATS_SCALING_FACTOR = 1.0f;
|
|
|
|
|
|
+ final static String JT_INIT_CONFIG_KEY_FOR_TESTS =
|
|
|
+ "mapreduce.jobtracker.init.for.tests";
|
|
|
+
|
|
|
public static enum State { INITIALIZING, RUNNING }
|
|
|
- State state = State.INITIALIZING;
|
|
|
+ volatile State state = State.INITIALIZING;
|
|
|
private static final int FS_ACCESS_RETRY_PERIOD = 1000;
|
|
|
static final String JOB_INFO_FILE = "job-info";
|
|
|
static final String JOB_TOKEN_FILE = "jobToken";
|
|
@@ -2029,6 +2032,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
+ " could not be started", t);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ this.initDone.set(conf.getBoolean(JT_INIT_CONFIG_KEY_FOR_TESTS, true));
|
|
|
}
|
|
|
|
|
|
private static SimpleDateFormat getDateFormat() {
|
|
@@ -2169,6 +2174,8 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
completedJobsStoreThread.start();
|
|
|
}
|
|
|
|
|
|
+ // Just for unit-tests
|
|
|
+ waitForInit();
|
|
|
synchronized (this) {
|
|
|
state = State.RUNNING;
|
|
|
}
|
|
@@ -2179,6 +2186,29 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
LOG.info("Stopped interTrackerServer");
|
|
|
}
|
|
|
|
|
|
+ AtomicBoolean initDone = new AtomicBoolean(true);
|
|
|
+ Object initDoneLock = new Object();
|
|
|
+
|
|
|
+ private void waitForInit() {
|
|
|
+ synchronized (initDoneLock) {
|
|
|
+ while (!initDone.get()) {
|
|
|
+ try {
|
|
|
+ LOG.debug("About to wait since initDone = false");
|
|
|
+ initDoneLock.wait();
|
|
|
+ } catch (InterruptedException ie) {
|
|
|
+ LOG.debug("Ignoring ", ie);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ void setInitDone(boolean done) {
|
|
|
+ synchronized (initDoneLock) {
|
|
|
+ initDone.set(done);
|
|
|
+ initDoneLock.notify();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
void close() throws IOException {
|
|
|
if (plugins != null) {
|
|
|
for (ServicePlugin p : plugins) {
|
|
@@ -3498,6 +3528,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
* Allocates a new JobId string.
|
|
|
*/
|
|
|
public synchronized JobID getNewJobId() throws IOException {
|
|
|
+ // Check for JobTracker operational state
|
|
|
+ checkJobTrackerState();
|
|
|
+
|
|
|
return new JobID(getTrackerIdentifier(), nextJobId++);
|
|
|
}
|
|
|
|
|
@@ -3511,6 +3544,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
*/
|
|
|
public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
|
|
|
throws IOException {
|
|
|
+ // Check for JobTracker operational state
|
|
|
+ checkJobTrackerState();
|
|
|
+
|
|
|
return submitJob(jobId, jobSubmitDir, null, ts, false);
|
|
|
}
|
|
|
|
|
@@ -3737,6 +3773,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
+ // Check for JobTracker operational state
|
|
|
+ checkJobTrackerState();
|
|
|
+
|
|
|
// No 'killJob' in safe-mode
|
|
|
checkSafeMode();
|
|
|
|
|
@@ -3883,6 +3922,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
public synchronized void setJobPriority(JobID jobid,
|
|
|
String priority)
|
|
|
throws IOException {
|
|
|
+ // Check for JobTracker operational state
|
|
|
+ checkJobTrackerState();
|
|
|
+
|
|
|
JobInProgress job = jobs.get(jobid);
|
|
|
if (null == job) {
|
|
|
LOG.info("setJobPriority(): JobId " + jobid.toString()
|
|
@@ -3910,7 +3952,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
return job.inited();
|
|
|
}
|
|
|
|
|
|
- public JobProfile getJobProfile(JobID jobid) {
|
|
|
+ public JobProfile getJobProfile(JobID jobid) throws IOException {
|
|
|
+ // Check for JobTracker operational state
|
|
|
+ checkJobTrackerState();
|
|
|
+
|
|
|
synchronized (this) {
|
|
|
JobInProgress job = jobs.get(jobid);
|
|
|
if (job != null) {
|
|
@@ -3927,7 +3972,10 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
return completedJobStatusStore.readJobProfile(jobid);
|
|
|
}
|
|
|
|
|
|
- public JobStatus getJobStatus(JobID jobid) {
|
|
|
+ public JobStatus getJobStatus(JobID jobid) throws IOException {
|
|
|
+ // Check for JobTracker operational state
|
|
|
+ checkJobTrackerState();
|
|
|
+
|
|
|
if (null == jobid) {
|
|
|
LOG.warn("JobTracker.getJobStatus() cannot get status for null jobid");
|
|
|
return null;
|
|
@@ -3950,6 +3998,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
|
|
|
private static final Counters EMPTY_COUNTERS = new Counters();
|
|
|
public Counters getJobCounters(JobID jobid) throws IOException {
|
|
|
+ // Check for JobTracker operational state
|
|
|
+ checkJobTrackerState();
|
|
|
+
|
|
|
UserGroupInformation callerUGI = UserGroupInformation.getCurrentUser();
|
|
|
synchronized (this) {
|
|
|
JobInProgress job = jobs.get(jobid);
|
|
@@ -3984,6 +4035,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
|
|
|
public synchronized TaskReport[] getMapTaskReports(JobID jobid)
|
|
|
throws IOException {
|
|
|
+ // Check for JobTracker operational state
|
|
|
+ checkJobTrackerState();
|
|
|
+
|
|
|
JobInProgress job = jobs.get(jobid);
|
|
|
if (job != null) {
|
|
|
// Check authorization
|
|
@@ -4012,6 +4066,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
|
|
|
public synchronized TaskReport[] getReduceTaskReports(JobID jobid)
|
|
|
throws IOException {
|
|
|
+ // Check for JobTracker operational state
|
|
|
+ checkJobTrackerState();
|
|
|
+
|
|
|
JobInProgress job = jobs.get(jobid);
|
|
|
if (job != null) {
|
|
|
// Check authorization
|
|
@@ -4038,6 +4095,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
|
|
|
public synchronized TaskReport[] getCleanupTaskReports(JobID jobid)
|
|
|
throws IOException {
|
|
|
+ // Check for JobTracker operational state
|
|
|
+ checkJobTrackerState();
|
|
|
+
|
|
|
JobInProgress job = jobs.get(jobid);
|
|
|
if (job != null) {
|
|
|
// Check authorization
|
|
@@ -4067,6 +4127,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
|
|
|
public synchronized TaskReport[] getSetupTaskReports(JobID jobid)
|
|
|
throws IOException {
|
|
|
+ // Check for JobTracker operational state
|
|
|
+ checkJobTrackerState();
|
|
|
+
|
|
|
JobInProgress job = jobs.get(jobid);
|
|
|
if (job != null) {
|
|
|
// Check authorization
|
|
@@ -4110,6 +4173,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
*/
|
|
|
public TaskCompletionEvent[] getTaskCompletionEvents(
|
|
|
JobID jobid, int fromEventId, int maxEvents) throws IOException{
|
|
|
+ // Check for JobTracker operational state
|
|
|
+ checkJobTrackerState();
|
|
|
+
|
|
|
JobInProgress job = this.jobs.get(jobid);
|
|
|
|
|
|
if (null != job) {
|
|
@@ -4131,6 +4197,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
*/
|
|
|
public synchronized String[] getTaskDiagnostics(TaskAttemptID taskId)
|
|
|
throws IOException {
|
|
|
+ // Check for JobTracker operational state
|
|
|
+ checkJobTrackerState();
|
|
|
+
|
|
|
List<String> taskDiagnosticInfo = null;
|
|
|
JobID jobId = taskId.getJobID();
|
|
|
TaskID tipId = taskId.getTaskID();
|
|
@@ -4195,6 +4264,9 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
*/
|
|
|
public synchronized boolean killTask(TaskAttemptID taskid, boolean shouldFail)
|
|
|
throws IOException {
|
|
|
+ // Check for JobTracker operational state
|
|
|
+ checkJobTrackerState();
|
|
|
+
|
|
|
// No 'killTask' in safe-mode
|
|
|
checkSafeMode();
|
|
|
|
|
@@ -5093,13 +5165,23 @@ public class JobTracker implements MRConstants, InterTrackerProtocol,
|
|
|
|
|
|
private void checkSafeMode() throws SafeModeException {
|
|
|
if (isInSafeMode()) {
|
|
|
- try {
|
|
|
- throw new SafeModeException((
|
|
|
- isInAdminSafeMode()) ? adminSafeModeUser : null);
|
|
|
- } catch (SafeModeException sfe) {
|
|
|
- LOG.info("JobTracker in safe-mode, aborting operation", sfe);
|
|
|
- throw sfe;
|
|
|
- }
|
|
|
+ SafeModeException sme =
|
|
|
+ new SafeModeException(
|
|
|
+ (isInAdminSafeMode()) ? adminSafeModeUser : null);
|
|
|
+ LOG.info("JobTracker in safe-mode, aborting operation: ", sme);
|
|
|
+ throw sme;
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ private void checkJobTrackerState()
|
|
|
+ throws JobTrackerNotYetInitializedException {
|
|
|
+ if (state != State.RUNNING) {
|
|
|
+ JobTrackerNotYetInitializedException jtnyie =
|
|
|
+ new JobTrackerNotYetInitializedException();
|
|
|
+ LOG.info("JobTracker not yet in RUNNING state, aborting operation: ",
|
|
|
+ jtnyie);
|
|
|
+ throw jtnyie;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
}
|