Bläddra i källkod

MAPREDUCE-6895. Job end notification not send due to YarnRuntimeException. Contributed by yunjiong zhao.

Junping Du 8 år sedan
förälder
incheckning
6ed54f3439

+ 0 - 5
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/JobEndNotifier.java

@@ -153,11 +153,6 @@ public class JobEndNotifier implements Configurable {
    */
   public void notify(JobReport jobReport)
     throws InterruptedException {
-    // Do we need job-end notification?
-    if (userUrl == null) {
-      Log.getLog().info("Job end notification URL not set, skipping.");
-      return;
-    }
 
     //Do string replacements for jobId and jobStatus
     if (userUrl.contains(JOB_ID)) {

+ 31 - 20
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -646,6 +646,12 @@ public class MRAppMaster extends CompositeService {
     // note in a workflow scenario, this may lead to creation of a new
     // job (FIXME?)
 
+    JobEndNotifier notifier = null;
+    if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
+      notifier = new JobEndNotifier();
+      notifier.setConf(getConfig());
+    }
+
     try {
       //if isLastAMRetry comes as true, should never set it to false
       if ( !isLastAMRetry){
@@ -660,28 +666,11 @@ public class MRAppMaster extends CompositeService {
       LOG.info("Calling stop for all the services");
       MRAppMaster.this.stop();
 
-      if (isLastAMRetry) {
+      if (isLastAMRetry && notifier != null) {
         // Send job-end notification when it is safe to report termination to
         // users and it is the last AM retry
-        if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
-          try {
-            LOG.info("Job end notification started for jobID : "
-                + job.getReport().getJobId());
-            JobEndNotifier notifier = new JobEndNotifier();
-            notifier.setConf(getConfig());
-            JobReport report = job.getReport();
-            // If unregistration fails, the final state is unavailable. However,
-            // at the last AM Retry, the client will finally be notified FAILED
-            // from RM, so we should let users know FAILED via notifier as well
-            if (!context.hasSuccessfullyUnregistered()) {
-              report.setJobState(JobState.FAILED);
-            }
-            notifier.notify(report);
-          } catch (InterruptedException ie) {
-            LOG.warn("Job end notification interrupted for jobID : "
-                + job.getReport().getJobId(), ie);
-          }
-        }
+        sendJobEndNotify(notifier);
+        notifier = null;
       }
 
       try {
@@ -693,10 +682,32 @@ public class MRAppMaster extends CompositeService {
     } catch (Throwable t) {
       LOG.warn("Graceful stop failed. Exiting.. ", t);
       exitMRAppMaster(1, t);
+    } finally {
+      if (isLastAMRetry && notifier != null) {
+        sendJobEndNotify(notifier);
+      }
     }
     exitMRAppMaster(0, null);
   }
 
+  private void sendJobEndNotify(JobEndNotifier notifier) {
+    try {
+      LOG.info("Job end notification started for jobID : "
+          + job.getReport().getJobId());
+      // If unregistration fails, the final state is unavailable. However,
+      // at the last AM Retry, the client will finally be notified FAILED
+      // from RM, so we should let users know FAILED via notifier as well
+      JobReport report = job.getReport();
+      if (!context.hasSuccessfullyUnregistered()) {
+        report.setJobState(JobState.FAILED);
+      }
+      notifier.notify(report);
+    } catch (InterruptedException ie) {
+      LOG.warn("Job end notification interrupted for jobID : "
+          + job.getReport().getJobId(), ie);
+    }
+  }
+
   /** MRAppMaster exit method which has been instrumented for both runtime and
    *  unit testing.
    * If the main thread has not been started, this method was called from a