|
@@ -26,6 +26,7 @@ import java.util.Set;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.Arrays;
|
|
|
|
|
|
+import org.apache.commons.lang.StringUtils;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
@@ -67,7 +68,9 @@ public class CLI extends Configured implements Tool {
|
|
|
protected Cluster cluster;
|
|
|
private final Set<String> taskStates = new HashSet<String>(
|
|
|
Arrays.asList("pending", "running", "completed", "failed", "killed"));
|
|
|
-
|
|
|
+ private static final Set<String> taskTypes = new HashSet<String>(
|
|
|
+ Arrays.asList("MAP", "REDUCE"));
|
|
|
+
|
|
|
public CLI() {
|
|
|
}
|
|
|
|
|
@@ -219,6 +222,11 @@ public class CLI extends Configured implements Tool {
|
|
|
taskType = argv[2];
|
|
|
taskState = argv[3];
|
|
|
displayTasks = true;
|
|
|
+ if (!taskTypes.contains(taskType.toUpperCase())) {
|
|
|
+ System.out.println("Error: Invalid task-type: "+taskType);
|
|
|
+ displayUsage(cmd);
|
|
|
+ return exitCode;
|
|
|
+ }
|
|
|
} else if ("-logs".equals(cmd)) {
|
|
|
if (argv.length == 2 || argv.length ==3) {
|
|
|
logs = true;
|
|
@@ -238,7 +246,7 @@ public class CLI extends Configured implements Tool {
|
|
|
}
|
|
|
|
|
|
// initialize cluster
|
|
|
- cluster = new Cluster(getConf());
|
|
|
+ cluster = createCluster();
|
|
|
|
|
|
// Submit the request
|
|
|
try {
|
|
@@ -371,6 +379,10 @@ public class CLI extends Configured implements Tool {
|
|
|
return exitCode;
|
|
|
}
|
|
|
|
|
|
+ Cluster createCluster() throws IOException {
|
|
|
+ return new Cluster(getConf());
|
|
|
+ }
|
|
|
+
|
|
|
private String getJobPriorityNames() {
|
|
|
StringBuffer sb = new StringBuffer();
|
|
|
for (JobPriority p : JobPriority.values()) {
|
|
@@ -379,22 +391,18 @@ public class CLI extends Configured implements Tool {
|
|
|
return sb.substring(0, sb.length()-1);
|
|
|
}
|
|
|
|
|
|
- private String getTaskTypess() {
|
|
|
- StringBuffer sb = new StringBuffer();
|
|
|
- for (TaskType t : TaskType.values()) {
|
|
|
- sb.append(t.name()).append(" ");
|
|
|
- }
|
|
|
- return sb.substring(0, sb.length()-1);
|
|
|
+ private String getTaskTypes() {
|
|
|
+ return StringUtils.join(taskTypes, " ");
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* Display usage of the command-line tool and terminate execution.
|
|
|
*/
|
|
|
private void displayUsage(String cmd) {
|
|
|
String prefix = "Usage: CLI ";
|
|
|
String jobPriorityValues = getJobPriorityNames();
|
|
|
- String taskTypes = getTaskTypess();
|
|
|
String taskStates = "running, completed";
|
|
|
+
|
|
|
if ("-submit".equals(cmd)) {
|
|
|
System.err.println(prefix + "[" + cmd + " <job-file>]");
|
|
|
} else if ("-status".equals(cmd) || "-kill".equals(cmd)) {
|
|
@@ -422,7 +430,7 @@ public class CLI extends Configured implements Tool {
|
|
|
} else if ("-list-attempt-ids".equals(cmd)) {
|
|
|
System.err.println(prefix + "[" + cmd +
|
|
|
" <job-id> <task-type> <task-state>]. " +
|
|
|
- "Valid values for <task-type> are " + taskTypes + ". " +
|
|
|
+ "Valid values for <task-type> are " + getTaskTypes() + ". " +
|
|
|
"Valid values for <task-state> are " + taskStates);
|
|
|
} else if ("-logs".equals(cmd)) {
|
|
|
System.err.println(prefix + "[" + cmd +
|
|
@@ -443,7 +451,7 @@ public class CLI extends Configured implements Tool {
|
|
|
System.err.printf("\t[-list-blacklisted-trackers]%n");
|
|
|
System.err.println("\t[-list-attempt-ids <job-id> <task-type> " +
|
|
|
"<task-state>]. " +
|
|
|
- "Valid values for <task-type> are " + taskTypes + ". " +
|
|
|
+ "Valid values for <task-type> are " + getTaskTypes() + ". " +
|
|
|
"Valid values for <task-state> are " + taskStates);
|
|
|
System.err.printf("\t[-kill-task <task-attempt-id>]%n");
|
|
|
System.err.printf("\t[-fail-task <task-attempt-id>]%n");
|
|
@@ -563,18 +571,13 @@ public class CLI extends Configured implements Tool {
|
|
|
*/
|
|
|
protected void displayTasks(Job job, String type, String state)
|
|
|
throws IOException, InterruptedException {
|
|
|
-
|
|
|
+
|
|
|
if (!taskStates.contains(state)) {
|
|
|
throw new java.lang.IllegalArgumentException("Invalid state: " + state +
|
|
|
". Valid states for task are: pending, running, completed, failed, killed.");
|
|
|
}
|
|
|
TaskReport[] reports=null;
|
|
|
- try{
|
|
|
- reports = job.getTaskReports(TaskType.valueOf(type));
|
|
|
- }catch(IllegalArgumentException e){
|
|
|
- throw new IllegalArgumentException("Invalid type: " + type +
|
|
|
- ". Valid types for task are: MAP, REDUCE, JOB_SETUP, JOB_CLEANUP, TASK_CLEANUP.");
|
|
|
- }
|
|
|
+ reports = job.getTaskReports(TaskType.valueOf(type.toUpperCase()));
|
|
|
for (TaskReport report : reports) {
|
|
|
TIPStatus status = report.getCurrentStatus();
|
|
|
if ((state.equals("pending") && status ==TIPStatus.PENDING) ||
|