|
@@ -183,7 +183,8 @@ public class JobClient extends Configured implements MRConstants, Tool {
|
|
* a JobProfile object to provide some info, and interacts with the
|
|
* a JobProfile object to provide some info, and interacts with the
|
|
* remote service to provide certain functionality.
|
|
* remote service to provide certain functionality.
|
|
*/
|
|
*/
|
|
- class NetworkedJob implements RunningJob {
|
|
|
|
|
|
+ static class NetworkedJob implements RunningJob {
|
|
|
|
+ private JobSubmissionProtocol jobSubmitClient;
|
|
JobProfile profile;
|
|
JobProfile profile;
|
|
JobStatus status;
|
|
JobStatus status;
|
|
long statustime;
|
|
long statustime;
|
|
@@ -191,16 +192,26 @@ public class JobClient extends Configured implements MRConstants, Tool {
|
|
/**
|
|
/**
|
|
* We store a JobProfile and a timestamp for when we last
|
|
* We store a JobProfile and a timestamp for when we last
|
|
* acquired the job profile. If the job is null, then we cannot
|
|
* acquired the job profile. If the job is null, then we cannot
|
|
- * perform any of the tasks. The job might be null if the JobTracker
|
|
|
|
- * has completely forgotten about the job. (eg, 24 hours after the
|
|
|
|
- * job completes.)
|
|
|
|
|
|
+ * perform any of the tasks, so we throw an exception.
|
|
|
|
+ * The job might be null if the JobTracker has completely forgotten
|
|
|
|
+ * about the job. (eg, 24 hours after the job completes.)
|
|
*/
|
|
*/
|
|
- public NetworkedJob(JobStatus job) throws IOException {
|
|
|
|
|
|
+ public NetworkedJob(JobStatus job, JobProfile prof, JobSubmissionProtocol jobSubmitClient) throws IOException {
|
|
this.status = job;
|
|
this.status = job;
|
|
- this.profile = jobSubmitClient.getJobProfile(job.getJobID());
|
|
|
|
|
|
+ this.profile = prof;
|
|
|
|
+ this.jobSubmitClient = jobSubmitClient;
|
|
|
|
+ if(this.status == null) {
|
|
|
|
+ throw new IOException("The Job status cannot be null");
|
|
|
|
+ }
|
|
|
|
+ if(this.profile == null) {
|
|
|
|
+ throw new IOException("The Job profile cannot be null");
|
|
|
|
+ }
|
|
|
|
+ if(this.jobSubmitClient == null) {
|
|
|
|
+ throw new IOException("The Job Submission Protocol cannot be null");
|
|
|
|
+ }
|
|
this.statustime = System.currentTimeMillis();
|
|
this.statustime = System.currentTimeMillis();
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Some methods rely on having a recent job profile object. Refresh
|
|
* Some methods rely on having a recent job profile object. Refresh
|
|
* it, if necessary
|
|
* it, if necessary
|
|
@@ -217,6 +228,9 @@ public class JobClient extends Configured implements MRConstants, Tool {
|
|
*/
|
|
*/
|
|
synchronized void updateStatus() throws IOException {
|
|
synchronized void updateStatus() throws IOException {
|
|
this.status = jobSubmitClient.getJobStatus(profile.getJobID());
|
|
this.status = jobSubmitClient.getJobStatus(profile.getJobID());
|
|
|
|
+ if(this.status == null) {
|
|
|
|
+ throw new IOException("The job appears to have been removed.");
|
|
|
|
+ }
|
|
this.statustime = System.currentTimeMillis();
|
|
this.statustime = System.currentTimeMillis();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -863,8 +877,9 @@ public class JobClient extends Configured implements MRConstants, Tool {
|
|
printTokens(jobId, jobCopy.getCredentials());
|
|
printTokens(jobId, jobCopy.getCredentials());
|
|
status = jobSubmitClient.submitJob(
|
|
status = jobSubmitClient.submitJob(
|
|
jobId, submitJobDir.toString(), jobCopy.getCredentials());
|
|
jobId, submitJobDir.toString(), jobCopy.getCredentials());
|
|
- if (status != null) {
|
|
|
|
- return new NetworkedJob(status);
|
|
|
|
|
|
+ JobProfile prof = jobSubmitClient.getJobProfile(jobId);
|
|
|
|
+ if (status != null && prof != null) {
|
|
|
|
+ return new NetworkedJob(status, prof, jobSubmitClient);
|
|
} else {
|
|
} else {
|
|
throw new IOException("Could not launch job");
|
|
throw new IOException("Could not launch job");
|
|
}
|
|
}
|
|
@@ -1011,8 +1026,9 @@ public class JobClient extends Configured implements MRConstants, Tool {
|
|
*/
|
|
*/
|
|
public RunningJob getJob(JobID jobid) throws IOException {
|
|
public RunningJob getJob(JobID jobid) throws IOException {
|
|
JobStatus status = jobSubmitClient.getJobStatus(jobid);
|
|
JobStatus status = jobSubmitClient.getJobStatus(jobid);
|
|
- if (status != null) {
|
|
|
|
- return new NetworkedJob(status);
|
|
|
|
|
|
+ JobProfile profile = jobSubmitClient.getJobProfile(jobid);
|
|
|
|
+ if (status != null && profile != null) {
|
|
|
|
+ return new NetworkedJob(status, profile, jobSubmitClient);
|
|
} else {
|
|
} else {
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|