|
@@ -20,8 +20,12 @@ package org.apache.hadoop.mapreduce.tools;
|
|
|
import java.io.IOException;
|
|
|
import java.io.PrintWriter;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Arrays;
|
|
|
import java.util.List;
|
|
|
+import java.util.Set;
|
|
|
+import java.util.HashSet;
|
|
|
|
|
|
+import org.apache.commons.lang.StringUtils;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
@@ -59,6 +63,8 @@ import org.apache.hadoop.yarn.logaggregation.LogDumper;
|
|
|
public class CLI extends Configured implements Tool {
|
|
|
private static final Log LOG = LogFactory.getLog(CLI.class);
|
|
|
protected Cluster cluster;
|
|
|
+ private static final Set<String> taskTypes = new HashSet<String>(
|
|
|
+ Arrays.asList("MAP", "REDUCE"));
|
|
|
|
|
|
public CLI() {
|
|
|
}
|
|
@@ -211,6 +217,11 @@ public class CLI extends Configured implements Tool {
|
|
|
taskType = argv[2];
|
|
|
taskState = argv[3];
|
|
|
displayTasks = true;
|
|
|
+ if (!taskTypes.contains(taskType.toUpperCase())) {
|
|
|
+ System.out.println("Error: Invalid task-type: "+taskType);
|
|
|
+ displayUsage(cmd);
|
|
|
+ return exitCode;
|
|
|
+ }
|
|
|
} else if ("-logs".equals(cmd)) {
|
|
|
if (argv.length == 2 || argv.length ==3) {
|
|
|
logs = true;
|
|
@@ -230,7 +241,7 @@ public class CLI extends Configured implements Tool {
|
|
|
}
|
|
|
|
|
|
// initialize cluster
|
|
|
- cluster = new Cluster(getConf());
|
|
|
+ cluster = createCluster();
|
|
|
|
|
|
// Submit the request
|
|
|
try {
|
|
@@ -363,6 +374,10 @@ public class CLI extends Configured implements Tool {
|
|
|
return exitCode;
|
|
|
}
|
|
|
|
|
|
+ Cluster createCluster() throws IOException {
|
|
|
+ return new Cluster(getConf());
|
|
|
+ }
|
|
|
+
|
|
|
private String getJobPriorityNames() {
|
|
|
StringBuffer sb = new StringBuffer();
|
|
|
for (JobPriority p : JobPriority.values()) {
|
|
@@ -371,12 +386,8 @@ public class CLI extends Configured implements Tool {
|
|
|
return sb.substring(0, sb.length()-1);
|
|
|
}
|
|
|
|
|
|
- private String getTaskTypess() {
|
|
|
- StringBuffer sb = new StringBuffer();
|
|
|
- for (TaskType t : TaskType.values()) {
|
|
|
- sb.append(t.name()).append(" ");
|
|
|
- }
|
|
|
- return sb.substring(0, sb.length()-1);
|
|
|
+ private String getTaskTypes() {
|
|
|
+ return StringUtils.join(taskTypes, " ");
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -385,8 +396,8 @@ public class CLI extends Configured implements Tool {
|
|
|
private void displayUsage(String cmd) {
|
|
|
String prefix = "Usage: CLI ";
|
|
|
String jobPriorityValues = getJobPriorityNames();
|
|
|
- String taskTypes = getTaskTypess();
|
|
|
String taskStates = "running, completed";
|
|
|
+
|
|
|
if ("-submit".equals(cmd)) {
|
|
|
System.err.println(prefix + "[" + cmd + " <job-file>]");
|
|
|
} else if ("-status".equals(cmd) || "-kill".equals(cmd)) {
|
|
@@ -414,7 +425,7 @@ public class CLI extends Configured implements Tool {
|
|
|
} else if ("-list-attempt-ids".equals(cmd)) {
|
|
|
System.err.println(prefix + "[" + cmd +
|
|
|
" <job-id> <task-type> <task-state>]. " +
|
|
|
- "Valid values for <task-type> are " + taskTypes + ". " +
|
|
|
+ "Valid values for <task-type> are " + getTaskTypes() + ". " +
|
|
|
"Valid values for <task-state> are " + taskStates);
|
|
|
} else if ("-logs".equals(cmd)) {
|
|
|
System.err.println(prefix + "[" + cmd +
|
|
@@ -435,7 +446,7 @@ public class CLI extends Configured implements Tool {
|
|
|
System.err.printf("\t[-list-blacklisted-trackers]\n");
|
|
|
System.err.println("\t[-list-attempt-ids <job-id> <task-type> " +
|
|
|
"<task-state>]. " +
|
|
|
- "Valid values for <task-type> are " + taskTypes + ". " +
|
|
|
+ "Valid values for <task-type> are " + getTaskTypes() + ". " +
|
|
|
"Valid values for <task-state> are " + taskStates);
|
|
|
System.err.printf("\t[-kill-task <task-attempt-id>]\n");
|
|
|
System.err.printf("\t[-fail-task <task-attempt-id>]\n");
|
|
@@ -552,7 +563,7 @@ public class CLI extends Configured implements Tool {
|
|
|
*/
|
|
|
protected void displayTasks(Job job, String type, String state)
|
|
|
throws IOException, InterruptedException {
|
|
|
- TaskReport[] reports = job.getTaskReports(TaskType.valueOf(type));
|
|
|
+ TaskReport[] reports = job.getTaskReports(TaskType.valueOf(type.toUpperCase()));
|
|
|
for (TaskReport report : reports) {
|
|
|
TIPStatus status = report.getCurrentStatus();
|
|
|
if ((state.equals("pending") && status ==TIPStatus.PENDING) ||
|