HADOOP-4737. Adds the KILLED notification when jobs get killed. Contributed by Amareshwari Sriramadasu.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/core/trunk@725729 13f79535-47bb-0310-9956-ffa450edef68
Devaraj Das 16 years ago
parent
commit
8eff61009f

+ 3 - 0
CHANGES.txt

@@ -218,6 +218,9 @@ Trunk (unreleased changes)
     HADOOP-4688. Modify the MiniMRDFSSort unit test to spill multiple times,
     exercising the map-side merge code. (cdouglas)
 
+    HADOOP-4737. Adds the KILLED notification when jobs get killed.
+    (Amareshwari Sriramadasu via ddas)
+
   OPTIMIZATIONS
 
     HADOOP-3293. Fixes FileInputFormat to do provide locations for splits

+ 2 - 1
src/mapred/org/apache/hadoop/mapred/JobEndNotifier.java

@@ -105,7 +105,8 @@ public class JobEndNotifier {
       }
       if (uri.contains("$jobStatus")) {
         String statusStr =
-          (status.getRunState() == JobStatus.SUCCEEDED) ? "SUCCEEDED" : "FAILED";
+          (status.getRunState() == JobStatus.SUCCEEDED) ? "SUCCEEDED" : 
+            (status.getRunState() == JobStatus.FAILED) ? "FAILED" : "KILLED";
         uri = uri.replace("$jobStatus", statusStr);
       }
       notification = new JobEndStatusInfo(uri, retryAttempts, retryInterval);
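The JobEndNotifier change is the core of the fix: the $jobStatus placeholder in the notification URL previously collapsed every non-successful end state to FAILED. The nested ternary now distinguishes FAILED from KILLED. A minimal sketch of the resulting mapping, assuming only the JobStatus run-state constants visible in the hunk (the retry and HTTP machinery of the real notifier is omitted):

    // Sketch: how $jobStatus resolves after this change. Any terminal state
    // that is neither SUCCEEDED nor FAILED falls through to "KILLED", which
    // is safe here because notifications are only sent for finished jobs.
    static String statusName(int runState) {
      return (runState == JobStatus.SUCCEEDED) ? "SUCCEEDED"
           : (runState == JobStatus.FAILED)    ? "FAILED"
           : "KILLED";
    }

A URL such as http://host/notify?status=$jobStatus is then rewritten via uri.replace("$jobStatus", statusName(status.getRunState())), as in the hunk above.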

+ 62 - 36
src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java

@@ -62,6 +62,7 @@ class LocalJobRunner implements JobSubmissionProtocol {
     private JobProfile profile;
     private Path localFile;
     private FileSystem localFs;
+    boolean killed = false;
     
     // Counters summed over all the map/reduce tasks which
     // have successfully completed
@@ -100,6 +101,8 @@ class LocalJobRunner implements JobSubmissionProtocol {
     
     @Override
     public void run() {
+      JobContext jContext = new JobContext(conf);
+      OutputCommitter outputCommitter = job.getOutputCommitter();
       try {
         // split input into minimum number of splits
         InputSplit[] splits;
@@ -112,33 +115,35 @@ class LocalJobRunner implements JobSubmissionProtocol {
           numReduceTasks = 1;
           job.setNumReduceTasks(1);
         }
-        JobContext jContext = new JobContext(conf);
-        OutputCommitter outputCommitter = job.getOutputCommitter();
         outputCommitter.setupJob(jContext);
         status.setSetupProgress(1.0f);
         
         DataOutputBuffer buffer = new DataOutputBuffer();
         for (int i = 0; i < splits.length; i++) {
-          TaskAttemptID mapId = new TaskAttemptID(new TaskID(jobId, true, i),0);  
-          mapIds.add(mapId);
-          buffer.reset();
-          splits[i].write(buffer);
-          BytesWritable split = new BytesWritable();
-          split.set(buffer.getData(), 0, buffer.getLength());
-          MapTask map = new MapTask(file.toString(),  
-                                    mapId, i,
-                                    splits[i].getClass().getName(),
-                                    split);
-          JobConf localConf = new JobConf(job);
-          map.setJobFile(localFile.toString());
-          map.localizeConfiguration(localConf);
-          map.setConf(localConf);
-          map_tasks += 1;
-          myMetrics.launchMap(mapId);
-          map.run(localConf, this);
-          myMetrics.completeMap(mapId);
-          map_tasks -= 1;
-          updateCounters(map);
+          if (!this.isInterrupted()) {
+            TaskAttemptID mapId = new TaskAttemptID(new TaskID(jobId, true, i),0);  
+            mapIds.add(mapId);
+            buffer.reset();
+            splits[i].write(buffer);
+            BytesWritable split = new BytesWritable();
+            split.set(buffer.getData(), 0, buffer.getLength());
+            MapTask map = new MapTask(file.toString(),  
+                                      mapId, i,
+                                      splits[i].getClass().getName(),
+                                      split);
+            JobConf localConf = new JobConf(job);
+            map.setJobFile(localFile.toString());
+            map.localizeConfiguration(localConf);
+            map.setConf(localConf);
+            map_tasks += 1;
+            myMetrics.launchMap(mapId);
+            map.run(localConf, this);
+            myMetrics.completeMap(mapId);
+            map_tasks -= 1;
+            updateCounters(map);
+          } else {
+            throw new InterruptedException();
+          }
         }
         TaskAttemptID reduceId = 
           new TaskAttemptID(new TaskID(jobId, false, 0), 0);
@@ -146,19 +151,23 @@ class LocalJobRunner implements JobSubmissionProtocol {
           if (numReduceTasks > 0) {
             // move map output to reduce input  
             for (int i = 0; i < mapIds.size(); i++) {
-              TaskAttemptID mapId = mapIds.get(i);
-              Path mapOut = this.mapoutputFile.getOutputFile(mapId);
-              Path reduceIn = this.mapoutputFile.getInputFileForWrite(mapId.getTaskID(),reduceId,
-                  localFs.getLength(mapOut));
-              if (!localFs.mkdirs(reduceIn.getParent())) {
-                throw new IOException("Mkdirs failed to create "
-                    + reduceIn.getParent().toString());
+              if (!this.isInterrupted()) {
+                TaskAttemptID mapId = mapIds.get(i);
+                Path mapOut = this.mapoutputFile.getOutputFile(mapId);
+                Path reduceIn = this.mapoutputFile.getInputFileForWrite(
+                                  mapId.getTaskID(),reduceId,
+                                  localFs.getLength(mapOut));
+                if (!localFs.mkdirs(reduceIn.getParent())) {
+                  throw new IOException("Mkdirs failed to create "
+                      + reduceIn.getParent().toString());
+                }
+                if (!localFs.rename(mapOut, reduceIn))
+                  throw new IOException("Couldn't rename " + mapOut);
+              } else {
+                throw new InterruptedException();
               }
-              if (!localFs.rename(mapOut, reduceIn))
-                throw new IOException("Couldn't rename " + mapOut);
             }
-
-            {
+            if (!this.isInterrupted()) {
               ReduceTask reduce = new ReduceTask(file.toString(), 
                                                  reduceId, 0, mapIds.size());
               JobConf localConf = new JobConf(job);
@@ -171,6 +180,8 @@ class LocalJobRunner implements JobSubmissionProtocol {
               myMetrics.completeReduce(reduce.getTaskID());
               reduce_tasks -= 1;
               updateCounters(reduce);
+            } else {
+              throw new InterruptedException();
             }
           }
         } finally {
@@ -185,12 +196,26 @@ class LocalJobRunner implements JobSubmissionProtocol {
         outputCommitter.cleanupJob(jContext);
         status.setCleanupProgress(1.0f);
 
-        this.status.setRunState(JobStatus.SUCCEEDED);
+        if (killed) {
+          this.status.setRunState(JobStatus.KILLED);
+        } else {
+          this.status.setRunState(JobStatus.SUCCEEDED);
+        }
 
         JobEndNotifier.localRunnerNotification(job, status);
 
       } catch (Throwable t) {
-        this.status.setRunState(JobStatus.FAILED);
+        try {
+          outputCommitter.cleanupJob(jContext);
+        } catch (IOException ioe) {
+          LOG.info("Error cleaning up job:" + id);
+        }
+        status.setCleanupProgress(1.0f);
+        if (killed) {
+          this.status.setRunState(JobStatus.KILLED);
+        } else {
+          this.status.setRunState(JobStatus.FAILED);
+        }
         LOG.warn(id, t);
 
         JobEndNotifier.localRunnerNotification(job, status);
@@ -307,7 +332,8 @@ class LocalJobRunner implements JobSubmissionProtocol {
   }
 
   public void killJob(JobID id) {
-    jobs.get(id).stop();
+    jobs.get(id).killed = true;
+    jobs.get(id).interrupt();
   }
 
   public void setJobPriority(JobID id, String jp) throws IOException {
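The LocalJobRunner change replaces the old stop() call with a cooperative kill: killJob() raises a killed flag and interrupts the job thread, run() checks isInterrupted() between tasks and bails out with an InterruptedException, and both the normal and Throwable paths now run cleanupJob and report KILLED or FAILED based on the flag. A stripped-down sketch of that pattern, using a hypothetical class rather than the real Job inner class:

    // Hypothetical reduction of the kill protocol adopted above.
    class KillableJob extends Thread {
      boolean killed = false;   // mirrors the (non-volatile) field in the diff
      int numTasks = 4;         // hypothetical; stands in for splits/reduces

      void kill() {             // what killJob(JobID) now does per job
        killed = true;
        interrupt();            // wakes the worker instead of Thread.stop()
      }

      @Override
      public void run() {
        try {
          for (int i = 0; i < numTasks; i++) {
            if (isInterrupted()) {
              throw new InterruptedException();  // observed the kill request
            }
            // ... run task i ...
          }
          // success path: state becomes KILLED if killed is set, else SUCCEEDED
        } catch (Throwable t) {
          // failure path: cleanup runs here too; state becomes KILLED if
          // killed is set, else FAILED, exactly as in the catch block above.
        }
      }
    }

Note that the killed field in the diff is not volatile; the interrupt itself is the wake-up signal, and the flag is only consulted once the thread has unwound into the status-setting code.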

+ 34 - 1
src/test/org/apache/hadoop/mapred/NotificationTestCase.java

@@ -100,7 +100,24 @@ public abstract class NotificationTestCase extends HadoopTestCase {
 
     protected void doGet(HttpServletRequest req, HttpServletResponse res)
       throws ServletException, IOException {
-      if (counter == 0) {
+      switch (counter) {
+        case 0:
+        {
+          assertTrue(req.getQueryString().contains("SUCCEEDED"));
+        }
+        break;
+        case 2:
+        {
+          assertTrue(req.getQueryString().contains("KILLED"));
+        }
+        break;
+        case 4:
+        {
+          assertTrue(req.getQueryString().contains("FAILED"));
+        }
+        break;
+      }
+      if (counter % 2 == 0) {
         stdPrintln((new Date()).toString() +
                    "Receiving First notification for [" + req.getQueryString() +
                    "], returning error");
@@ -148,6 +165,22 @@ public abstract class NotificationTestCase extends HadoopTestCase {
       Thread.currentThread().sleep(2000);
     }
     assertEquals(2, NotificationServlet.counter);
+    
+    // run a job with KILLED status
+    System.out.println(TestJobKillAndFail.runJobKill(this.createJobConf()));
+    synchronized(Thread.currentThread()) {
+      stdPrintln("Sleeping for 2 seconds to give time for retry");
+      Thread.currentThread().sleep(2000);
+    }
+    assertEquals(4, NotificationServlet.counter);
+    
+    // run a job with FAILED status
+    System.out.println(TestJobKillAndFail.runJobFail(this.createJobConf()));
+    synchronized(Thread.currentThread()) {
+      stdPrintln("Sleeping for 2 seconds to give time for retry");
+      Thread.currentThread().sleep(2000);
+    }
+    assertEquals(6, NotificationServlet.counter);
   }
 
   private String launchWordCount(JobConf conf,
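On the test flow: the servlet rejects every even-numbered notification (counter % 2 == 0) so that JobEndNotifier's retry path is exercised. Each job therefore advances counter by exactly two, which is why the status assertions fire at counter 0, 2, and 4 for SUCCEEDED, KILLED, and FAILED, and why the test expects 2, 4, and 6 after each job completes. A rough sketch of the reject-then-accept behavior implied by the hunk (the servlet body outside the hunk is not shown, so the response calls here are assumptions):

    // Assumed shape of the rest of doGet(): reject first attempt, accept retry.
    if (counter % 2 == 0) {
      // first notification for this job: force JobEndNotifier to retry
      res.sendError(HttpServletResponse.SC_BAD_REQUEST, "forcing retry");
    } else {
      res.setStatus(HttpServletResponse.SC_OK);  // retry accepted
    }
    counter++;  // two requests per job: 2 after SUCCEEDED, 4 after KILLED, 6 after FAILED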