Browse Source

MAPREDUCE-4431. mapred command should print the reason on killing already completed jobs. Contributed by Devaraj K.

(cherry picked from commit ac8d52bf50bba1a29489ee75fd90717d8a2b0cc9)
Tsuyoshi Ozawa 10 năm trước cách đây
mục cha
commit
941a923514

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -49,6 +49,9 @@ Release 2.7.0 - UNRELEASED
     MAPREDUCE-5335. Rename Job Tracker terminology in ShuffleSchedulerImpl.
     (devaraj via ozawa)
 
+    MAPREDUCE-4431. mapred command should print the reason on killing already 
+    completed jobs. (devaraj via ozawa)
+
   OPTIMIZATIONS
 
     MAPREDUCE-6169. MergeQueue should release reference to the current item 

+ 18 - 3
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/tools/CLI.java

@@ -296,9 +296,24 @@ public class CLI extends Configured implements Tool {
         if (job == null) {
           System.out.println("Could not find job " + jobid);
         } else {
-          job.killJob();
-          System.out.println("Killed job " + jobid);
-          exitCode = 0;
+          JobStatus jobStatus = job.getStatus();
+          if (jobStatus.getState() == JobStatus.State.FAILED) {
+            System.out.println("Could not mark the job " + jobid
+                + " as killed, as it has already failed.");
+            exitCode = -1;
+          } else if (jobStatus.getState() == JobStatus.State.KILLED) {
+            System.out
+                .println("The job " + jobid + " has already been killed.");
+            exitCode = -1;
+          } else if (jobStatus.getState() == JobStatus.State.SUCCEEDED) {
+            System.out.println("Could not kill the job " + jobid
+                + ", as it has already succeeded.");
+            exitCode = -1;
+          } else {
+            job.killJob();
+            System.out.println("Killed job " + jobid);
+            exitCode = 0;
+          }
         }
       } else if (setJobPriority) {
         Job job = cluster.getJob(JobID.forName(jobid));

+ 45 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/tools/TestCLI.java

@@ -19,11 +19,15 @@ package org.apache.hadoop.mapreduce.tools;
 
 import static org.junit.Assert.*;
 
+import java.io.IOException;
 import org.apache.hadoop.mapreduce.Cluster;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.TaskReport;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.JobPriority;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.JobStatus.State;
 import org.junit.Test;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -104,4 +108,45 @@ public class TestCLI {
   private TaskReport[] getTaskReports(JobID jobId, TaskType type) {
     return new TaskReport[] { new TaskReport(), new TaskReport() };
   }
+
+  @Test
+  public void testJobKIll() throws Exception {
+    Cluster mockCluster = mock(Cluster.class);
+    CLI cli = spy(new CLI());
+    doReturn(mockCluster).when(cli).createCluster();
+    String jobId1 = "job_1234654654_001";
+    String jobId2 = "job_1234654654_002";
+    String jobId3 = "job_1234654654_003";
+    String jobId4 = "job_1234654654_004";
+    Job mockJob1 = mockJob(mockCluster, jobId1, State.RUNNING);
+    Job mockJob2 = mockJob(mockCluster, jobId2, State.KILLED);
+    Job mockJob3 = mockJob(mockCluster, jobId3, State.FAILED);
+    Job mockJob4 = mockJob(mockCluster, jobId4, State.PREP);
+
+    int exitCode1 = cli.run(new String[] { "-kill", jobId1 });
+    assertEquals(0, exitCode1);
+    verify(mockJob1, times(1)).killJob();
+
+    int exitCode2 = cli.run(new String[] { "-kill", jobId2 });
+    assertEquals(-1, exitCode2);
+    verify(mockJob2, times(0)).killJob();
+
+    int exitCode3 = cli.run(new String[] { "-kill", jobId3 });
+    assertEquals(-1, exitCode3);
+    verify(mockJob3, times(0)).killJob();
+
+    int exitCode4 = cli.run(new String[] { "-kill", jobId4 });
+    assertEquals(0, exitCode4);
+    verify(mockJob4, times(1)).killJob();
+  }
+
+  private Job mockJob(Cluster mockCluster, String jobId, State jobState)
+      throws IOException, InterruptedException {
+    Job mockJob = mock(Job.class);
+    when(mockCluster.getJob(JobID.forName(jobId))).thenReturn(mockJob);
+    JobStatus status = new JobStatus(null, 0, 0, 0, 0, jobState,
+        JobPriority.HIGH, null, null, null, null);
+    when(mockJob.getStatus()).thenReturn(status);
+    return mockJob;
+  }
 }