浏览代码

MAPREDUCE-4431. mapred command should print the reason on killing already completed jobs. Contributed by Devaraj K.

Tsuyoshi Ozawa 10 年之前
父节点
当前提交
ac8d52bf50

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -283,6 +283,9 @@ Release 2.7.0 - UNRELEASED
    MAPREDUCE-5335. Rename Job Tracker terminology in ShuffleSchedulerImpl.
    (devaraj via ozawa)
 
 
+    MAPREDUCE-4431. mapred command should print the reason on killing already 
+    completed jobs. (devaraj via ozawa)
+
  OPTIMIZATIONS
 
 
    MAPREDUCE-6169. MergeQueue should release reference to the current item

+ 18 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java

@@ -296,9 +296,24 @@ public class CLI extends Configured implements Tool {
         if (job == null) {
         if (job == null) {
           System.out.println("Could not find job " + jobid);
           System.out.println("Could not find job " + jobid);
         } else {
         } else {
-          job.killJob();
-          System.out.println("Killed job " + jobid);
-          exitCode = 0;
+          JobStatus jobStatus = job.getStatus();
+          if (jobStatus.getState() == JobStatus.State.FAILED) {
+            System.out.println("Could not mark the job " + jobid
+                + " as killed, as it has already failed.");
+            exitCode = -1;
+          } else if (jobStatus.getState() == JobStatus.State.KILLED) {
+            System.out
+                .println("The job " + jobid + " has already been killed.");
+            exitCode = -1;
+          } else if (jobStatus.getState() == JobStatus.State.SUCCEEDED) {
+            System.out.println("Could not kill the job " + jobid
+                + ", as it has already succeeded.");
+            exitCode = -1;
+          } else {
+            job.killJob();
+            System.out.println("Killed job " + jobid);
+            exitCode = 0;
+          }
         }
         }
       } else if (setJobPriority) {
       } else if (setJobPriority) {
         Job job = cluster.getJob(JobID.forName(jobid));
         Job job = cluster.getJob(JobID.forName(jobid));

+ 45 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/tools/TestCLI.java

@@ -19,11 +19,15 @@ package org.apache.hadoop.mapreduce.tools;
 
 
 import static org.junit.Assert.*;
 import static org.junit.Assert.*;
 
 
+import java.io.IOException;
 import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TaskReport;
 import org.apache.hadoop.mapreduce.TaskReport;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.junit.Test;
 import org.junit.Test;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.when;
@@ -104,4 +108,45 @@ public class TestCLI {
   private TaskReport[] getTaskReports(JobID jobId, TaskType type) {
   private TaskReport[] getTaskReports(JobID jobId, TaskType type) {
     return new TaskReport[] { new TaskReport(), new TaskReport() };
     return new TaskReport[] { new TaskReport(), new TaskReport() };
   }
   }
+
+  @Test
+  public void testJobKIll() throws Exception {
+    Cluster mockCluster = mock(Cluster.class);
+    CLI cli = spy(new CLI());
+    doReturn(mockCluster).when(cli).createCluster();
+    String jobId1 = "job_1234654654_001";
+    String jobId2 = "job_1234654654_002";
+    String jobId3 = "job_1234654654_003";
+    String jobId4 = "job_1234654654_004";
+    Job mockJob1 = mockJob(mockCluster, jobId1, State.RUNNING);
+    Job mockJob2 = mockJob(mockCluster, jobId2, State.KILLED);
+    Job mockJob3 = mockJob(mockCluster, jobId3, State.FAILED);
+    Job mockJob4 = mockJob(mockCluster, jobId4, State.PREP);
+
+    int exitCode1 = cli.run(new String[] { "-kill", jobId1 });
+    assertEquals(0, exitCode1);
+    verify(mockJob1, times(1)).killJob();
+
+    int exitCode2 = cli.run(new String[] { "-kill", jobId2 });
+    assertEquals(-1, exitCode2);
+    verify(mockJob2, times(0)).killJob();
+
+    int exitCode3 = cli.run(new String[] { "-kill", jobId3 });
+    assertEquals(-1, exitCode3);
+    verify(mockJob3, times(0)).killJob();
+
+    int exitCode4 = cli.run(new String[] { "-kill", jobId4 });
+    assertEquals(0, exitCode4);
+    verify(mockJob4, times(1)).killJob();
+  }
+
+  private Job mockJob(Cluster mockCluster, String jobId, State jobState)
+      throws IOException, InterruptedException {
+    Job mockJob = mock(Job.class);
+    when(mockCluster.getJob(JobID.forName(jobId))).thenReturn(mockJob);
+    JobStatus status = new JobStatus(null, 0, 0, 0, 0, jobState,
+        JobPriority.HIGH, null, null, null, null);
+    when(mockJob.getStatus()).thenReturn(status);
+    return mockJob;
+  }
 }
 }