|
@@ -296,7 +296,7 @@ public class Job extends JobContextImpl implements JobContext {
|
|
* it, if necessary
|
|
* it, if necessary
|
|
*/
|
|
*/
|
|
synchronized void ensureFreshStatus()
|
|
synchronized void ensureFreshStatus()
|
|
- throws IOException, InterruptedException {
|
|
|
|
|
|
+ throws IOException {
|
|
if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
|
|
if (System.currentTimeMillis() - statustime > MAX_JOBSTATUS_AGE) {
|
|
updateStatus();
|
|
updateStatus();
|
|
}
|
|
}
|
|
@@ -306,13 +306,18 @@ public class Job extends JobContextImpl implements JobContext {
|
|
* immediately
|
|
* immediately
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
- synchronized void updateStatus() throws IOException, InterruptedException {
|
|
|
|
- this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
|
|
|
|
- @Override
|
|
|
|
- public JobStatus run() throws IOException, InterruptedException {
|
|
|
|
- return cluster.getClient().getJobStatus(status.getJobID());
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
|
|
+ synchronized void updateStatus() throws IOException {
|
|
|
|
+ try {
|
|
|
|
+ this.status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
|
|
|
|
+ @Override
|
|
|
|
+ public JobStatus run() throws IOException, InterruptedException {
|
|
|
|
+ return cluster.getClient().getJobStatus(status.getJobID());
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ }
|
|
|
|
+ catch (InterruptedException ie) {
|
|
|
|
+ throw new IOException(ie);
|
|
|
|
+ }
|
|
if (this.status == null) {
|
|
if (this.status == null) {
|
|
throw new IOException("Job status not available ");
|
|
throw new IOException("Job status not available ");
|
|
}
|
|
}
|
|
@@ -537,7 +542,7 @@ public class Job extends JobContextImpl implements JobContext {
|
|
* @return the progress of the job's map-tasks.
|
|
* @return the progress of the job's map-tasks.
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
- public float mapProgress() throws IOException, InterruptedException {
|
|
|
|
|
|
+ public float mapProgress() throws IOException {
|
|
ensureState(JobState.RUNNING);
|
|
ensureState(JobState.RUNNING);
|
|
ensureFreshStatus();
|
|
ensureFreshStatus();
|
|
return status.getMapProgress();
|
|
return status.getMapProgress();
|
|
@@ -550,7 +555,7 @@ public class Job extends JobContextImpl implements JobContext {
|
|
* @return the progress of the job's reduce-tasks.
|
|
* @return the progress of the job's reduce-tasks.
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
- public float reduceProgress() throws IOException, InterruptedException {
|
|
|
|
|
|
+ public float reduceProgress() throws IOException {
|
|
ensureState(JobState.RUNNING);
|
|
ensureState(JobState.RUNNING);
|
|
ensureFreshStatus();
|
|
ensureFreshStatus();
|
|
return status.getReduceProgress();
|
|
return status.getReduceProgress();
|
|
@@ -576,7 +581,7 @@ public class Job extends JobContextImpl implements JobContext {
|
|
* @return the progress of the job's setup-tasks.
|
|
* @return the progress of the job's setup-tasks.
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
- public float setupProgress() throws IOException, InterruptedException {
|
|
|
|
|
|
+ public float setupProgress() throws IOException {
|
|
ensureState(JobState.RUNNING);
|
|
ensureState(JobState.RUNNING);
|
|
ensureFreshStatus();
|
|
ensureFreshStatus();
|
|
return status.getSetupProgress();
|
|
return status.getSetupProgress();
|
|
@@ -589,7 +594,7 @@ public class Job extends JobContextImpl implements JobContext {
|
|
* @return <code>true</code> if the job is complete, else <code>false</code>.
|
|
* @return <code>true</code> if the job is complete, else <code>false</code>.
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
- public boolean isComplete() throws IOException, InterruptedException {
|
|
|
|
|
|
+ public boolean isComplete() throws IOException {
|
|
ensureState(JobState.RUNNING);
|
|
ensureState(JobState.RUNNING);
|
|
updateStatus();
|
|
updateStatus();
|
|
return status.isJobComplete();
|
|
return status.isJobComplete();
|
|
@@ -601,7 +606,7 @@ public class Job extends JobContextImpl implements JobContext {
|
|
* @return <code>true</code> if the job succeeded, else <code>false</code>.
|
|
* @return <code>true</code> if the job succeeded, else <code>false</code>.
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
- public boolean isSuccessful() throws IOException, InterruptedException {
|
|
|
|
|
|
+ public boolean isSuccessful() throws IOException {
|
|
ensureState(JobState.RUNNING);
|
|
ensureState(JobState.RUNNING);
|
|
updateStatus();
|
|
updateStatus();
|
|
return status.getState() == JobStatus.State.SUCCEEDED;
|
|
return status.getState() == JobStatus.State.SUCCEEDED;
|
|
@@ -613,9 +618,14 @@ public class Job extends JobContextImpl implements JobContext {
|
|
*
|
|
*
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
- public void killJob() throws IOException, InterruptedException {
|
|
|
|
|
|
+ public void killJob() throws IOException {
|
|
ensureState(JobState.RUNNING);
|
|
ensureState(JobState.RUNNING);
|
|
- cluster.getClient().killJob(getJobID());
|
|
|
|
|
|
+ try {
|
|
|
|
+ cluster.getClient().killJob(getJobID());
|
|
|
|
+ }
|
|
|
|
+ catch (InterruptedException ie) {
|
|
|
|
+ throw new IOException(ie);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -673,7 +683,7 @@ public class Job extends JobContextImpl implements JobContext {
|
|
try {
|
|
try {
|
|
return getTaskCompletionEvents(startFrom, 10);
|
|
return getTaskCompletionEvents(startFrom, 10);
|
|
} catch (InterruptedException ie) {
|
|
} catch (InterruptedException ie) {
|
|
- throw new RuntimeException(ie);
|
|
|
|
|
|
+ throw new IOException(ie);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -684,13 +694,18 @@ public class Job extends JobContextImpl implements JobContext {
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
public boolean killTask(final TaskAttemptID taskId)
|
|
public boolean killTask(final TaskAttemptID taskId)
|
|
- throws IOException, InterruptedException {
|
|
|
|
|
|
+ throws IOException {
|
|
ensureState(JobState.RUNNING);
|
|
ensureState(JobState.RUNNING);
|
|
- return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
|
|
|
|
- public Boolean run() throws IOException, InterruptedException {
|
|
|
|
- return cluster.getClient().killTask(taskId, false);
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
|
|
+ try {
|
|
|
|
+ return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
|
|
|
|
+ public Boolean run() throws IOException, InterruptedException {
|
|
|
|
+ return cluster.getClient().killTask(taskId, false);
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ }
|
|
|
|
+ catch (InterruptedException ie) {
|
|
|
|
+ throw new IOException(ie);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -700,14 +715,19 @@ public class Job extends JobContextImpl implements JobContext {
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
public boolean failTask(final TaskAttemptID taskId)
|
|
public boolean failTask(final TaskAttemptID taskId)
|
|
- throws IOException, InterruptedException {
|
|
|
|
|
|
+ throws IOException {
|
|
ensureState(JobState.RUNNING);
|
|
ensureState(JobState.RUNNING);
|
|
- return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
|
|
|
|
- @Override
|
|
|
|
- public Boolean run() throws IOException, InterruptedException {
|
|
|
|
- return cluster.getClient().killTask(taskId, true);
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
|
|
+ try {
|
|
|
|
+ return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
|
|
|
|
+ @Override
|
|
|
|
+ public Boolean run() throws IOException, InterruptedException {
|
|
|
|
+ return cluster.getClient().killTask(taskId, true);
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ }
|
|
|
|
+ catch (InterruptedException ie) {
|
|
|
|
+ throw new IOException(ie);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -718,14 +738,19 @@ public class Job extends JobContextImpl implements JobContext {
|
|
* @throws IOException
|
|
* @throws IOException
|
|
*/
|
|
*/
|
|
public Counters getCounters()
|
|
public Counters getCounters()
|
|
- throws IOException, InterruptedException {
|
|
|
|
|
|
+ throws IOException {
|
|
ensureState(JobState.RUNNING);
|
|
ensureState(JobState.RUNNING);
|
|
- return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
|
|
|
|
- @Override
|
|
|
|
- public Counters run() throws IOException, InterruptedException {
|
|
|
|
- return cluster.getClient().getJobCounters(getJobID());
|
|
|
|
- }
|
|
|
|
- });
|
|
|
|
|
|
+ try {
|
|
|
|
+ return ugi.doAs(new PrivilegedExceptionAction<Counters>() {
|
|
|
|
+ @Override
|
|
|
|
+ public Counters run() throws IOException, InterruptedException {
|
|
|
|
+ return cluster.getClient().getJobCounters(getJobID());
|
|
|
|
+ }
|
|
|
|
+ });
|
|
|
|
+ }
|
|
|
|
+ catch (InterruptedException ie) {
|
|
|
|
+ throw new IOException(ie);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|