|
@@ -26,7 +26,6 @@ import java.util.Set;
|
|
import java.util.HashSet;
|
|
import java.util.HashSet;
|
|
import java.util.Arrays;
|
|
import java.util.Arrays;
|
|
|
|
|
|
-import com.google.common.annotations.VisibleForTesting;
|
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
import org.apache.commons.lang.StringUtils;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
@@ -44,7 +43,6 @@ import org.apache.hadoop.mapreduce.Job;
|
|
import org.apache.hadoop.mapreduce.JobID;
|
|
import org.apache.hadoop.mapreduce.JobID;
|
|
import org.apache.hadoop.mapreduce.JobPriority;
|
|
import org.apache.hadoop.mapreduce.JobPriority;
|
|
import org.apache.hadoop.mapreduce.JobStatus;
|
|
import org.apache.hadoop.mapreduce.JobStatus;
|
|
-import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
|
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
|
import org.apache.hadoop.mapreduce.TaskAttemptID;
|
|
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
|
|
import org.apache.hadoop.mapreduce.TaskCompletionEvent;
|
|
import org.apache.hadoop.mapreduce.TaskReport;
|
|
import org.apache.hadoop.mapreduce.TaskReport;
|
|
@@ -270,7 +268,7 @@ public class CLI extends Configured implements Tool {
|
|
System.out.println("Created job " + job.getJobID());
|
|
System.out.println("Created job " + job.getJobID());
|
|
exitCode = 0;
|
|
exitCode = 0;
|
|
} else if (getStatus) {
|
|
} else if (getStatus) {
|
|
- Job job = getJob(JobID.forName(jobid));
|
|
|
|
|
|
+ Job job = cluster.getJob(JobID.forName(jobid));
|
|
if (job == null) {
|
|
if (job == null) {
|
|
System.out.println("Could not find job " + jobid);
|
|
System.out.println("Could not find job " + jobid);
|
|
} else {
|
|
} else {
|
|
@@ -285,7 +283,7 @@ public class CLI extends Configured implements Tool {
|
|
exitCode = 0;
|
|
exitCode = 0;
|
|
}
|
|
}
|
|
} else if (getCounter) {
|
|
} else if (getCounter) {
|
|
- Job job = getJob(JobID.forName(jobid));
|
|
|
|
|
|
+ Job job = cluster.getJob(JobID.forName(jobid));
|
|
if (job == null) {
|
|
if (job == null) {
|
|
System.out.println("Could not find job " + jobid);
|
|
System.out.println("Could not find job " + jobid);
|
|
} else {
|
|
} else {
|
|
@@ -301,7 +299,7 @@ public class CLI extends Configured implements Tool {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} else if (killJob) {
|
|
} else if (killJob) {
|
|
- Job job = getJob(JobID.forName(jobid));
|
|
|
|
|
|
+ Job job = cluster.getJob(JobID.forName(jobid));
|
|
if (job == null) {
|
|
if (job == null) {
|
|
System.out.println("Could not find job " + jobid);
|
|
System.out.println("Could not find job " + jobid);
|
|
} else {
|
|
} else {
|
|
@@ -325,7 +323,7 @@ public class CLI extends Configured implements Tool {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} else if (setJobPriority) {
|
|
} else if (setJobPriority) {
|
|
- Job job = getJob(JobID.forName(jobid));
|
|
|
|
|
|
+ Job job = cluster.getJob(JobID.forName(jobid));
|
|
if (job == null) {
|
|
if (job == null) {
|
|
System.out.println("Could not find job " + jobid);
|
|
System.out.println("Could not find job " + jobid);
|
|
} else {
|
|
} else {
|
|
@@ -341,7 +339,7 @@ public class CLI extends Configured implements Tool {
|
|
viewHistory(historyFile, viewAllHistory);
|
|
viewHistory(historyFile, viewAllHistory);
|
|
exitCode = 0;
|
|
exitCode = 0;
|
|
} else if (listEvents) {
|
|
} else if (listEvents) {
|
|
- listEvents(getJob(JobID.forName(jobid)), fromEvent, nEvents);
|
|
|
|
|
|
+ listEvents(cluster.getJob(JobID.forName(jobid)), fromEvent, nEvents);
|
|
exitCode = 0;
|
|
exitCode = 0;
|
|
} else if (listJobs) {
|
|
} else if (listJobs) {
|
|
listJobs(cluster);
|
|
listJobs(cluster);
|
|
@@ -356,11 +354,11 @@ public class CLI extends Configured implements Tool {
|
|
listBlacklistedTrackers(cluster);
|
|
listBlacklistedTrackers(cluster);
|
|
exitCode = 0;
|
|
exitCode = 0;
|
|
} else if (displayTasks) {
|
|
} else if (displayTasks) {
|
|
- displayTasks(getJob(JobID.forName(jobid)), taskType, taskState);
|
|
|
|
|
|
+ displayTasks(cluster.getJob(JobID.forName(jobid)), taskType, taskState);
|
|
exitCode = 0;
|
|
exitCode = 0;
|
|
} else if(killTask) {
|
|
} else if(killTask) {
|
|
TaskAttemptID taskID = TaskAttemptID.forName(taskid);
|
|
TaskAttemptID taskID = TaskAttemptID.forName(taskid);
|
|
- Job job = getJob(taskID.getJobID());
|
|
|
|
|
|
+ Job job = cluster.getJob(taskID.getJobID());
|
|
if (job == null) {
|
|
if (job == null) {
|
|
System.out.println("Could not find job " + jobid);
|
|
System.out.println("Could not find job " + jobid);
|
|
} else if (job.killTask(taskID, false)) {
|
|
} else if (job.killTask(taskID, false)) {
|
|
@@ -372,7 +370,7 @@ public class CLI extends Configured implements Tool {
|
|
}
|
|
}
|
|
} else if(failTask) {
|
|
} else if(failTask) {
|
|
TaskAttemptID taskID = TaskAttemptID.forName(taskid);
|
|
TaskAttemptID taskID = TaskAttemptID.forName(taskid);
|
|
- Job job = getJob(taskID.getJobID());
|
|
|
|
|
|
+ Job job = cluster.getJob(taskID.getJobID());
|
|
if (job == null) {
|
|
if (job == null) {
|
|
System.out.println("Could not find job " + jobid);
|
|
System.out.println("Could not find job " + jobid);
|
|
} else if(job.killTask(taskID, true)) {
|
|
} else if(job.killTask(taskID, true)) {
|
|
@@ -533,29 +531,6 @@ public class CLI extends Configured implements Tool {
|
|
protected static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
|
|
protected static String getTaskLogURL(TaskAttemptID taskId, String baseUrl) {
|
|
return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
|
|
return (baseUrl + "/tasklog?plaintext=true&attemptid=" + taskId);
|
|
}
|
|
}
|
|
-
|
|
|
|
- @VisibleForTesting
|
|
|
|
- Job getJob(JobID jobid) throws IOException, InterruptedException {
|
|
|
|
-
|
|
|
|
- int maxRetry = getConf().getInt(MRJobConfig.MR_CLIENT_JOB_MAX_RETRIES,
|
|
|
|
- MRJobConfig.DEFAULT_MR_CLIENT_JOB_MAX_RETRIES);
|
|
|
|
- long retryInterval = getConf()
|
|
|
|
- .getLong(MRJobConfig.MR_CLIENT_JOB_RETRY_INTERVAL,
|
|
|
|
- MRJobConfig.DEFAULT_MR_CLIENT_JOB_RETRY_INTERVAL);
|
|
|
|
- Job job = cluster.getJob(jobid);
|
|
|
|
-
|
|
|
|
- for (int i = 0; i < maxRetry; ++i) {
|
|
|
|
- if (job != null) {
|
|
|
|
- return job;
|
|
|
|
- }
|
|
|
|
- LOG.info("Could not obtain job info after " + String.valueOf(i + 1)
|
|
|
|
- + " attempt(s). Sleeping for " + String.valueOf(retryInterval / 1000)
|
|
|
|
- + " seconds and retrying.");
|
|
|
|
- Thread.sleep(retryInterval);
|
|
|
|
- job = cluster.getJob(jobid);
|
|
|
|
- }
|
|
|
|
- return job;
|
|
|
|
- }
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
/**
|