ソースを参照

YARN-139. Interrupted Exception within AsyncDispatcher leads to user confusion. Contributed by Vinod Kumar Vavilapalli

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1401726 13f79535-47bb-0310-9956-ffa450edef68
Jason Darrell Lowe 12 年 前
コミット
614a743fea

+ 56 - 41
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -107,6 +107,8 @@ import org.apache.hadoop.yarn.service.CompositeService;
 import org.apache.hadoop.yarn.service.Service;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * The Map-Reduce Application Master.
  * The state machine is encapsulated in the implementation of Job interface.
@@ -398,52 +400,65 @@ public class MRAppMaster extends CompositeService {
   protected void sysexit() {
     System.exit(0);
   }
-  
-  private class JobFinishEventHandler implements EventHandler<JobFinishEvent> {
-    @Override
-    public void handle(JobFinishEvent event) {
-      // job has finished
-      // this is the only job, so shut down the Appmaster
-      // note in a workflow scenario, this may lead to creation of a new
-      // job (FIXME?)
-      // Send job-end notification
-      if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
-        try {
-          LOG.info("Job end notification started for jobID : "
-              + job.getReport().getJobId());
-          JobEndNotifier notifier = new JobEndNotifier();
-          notifier.setConf(getConfig());
-          notifier.notify(job.getReport());
-        } catch (InterruptedException ie) {
-          LOG.warn("Job end notification interrupted for jobID : "
-              + job.getReport().getJobId(), ie);
-        }
-      }
 
-      // TODO:currently just wait for some time so clients can know the
-      // final states. Will be removed once RM come on.
+  @VisibleForTesting
+  public void shutDownJob() {
+    // job has finished
+    // this is the only job, so shut down the Appmaster
+    // note in a workflow scenario, this may lead to creation of a new
+    // job (FIXME?)
+    // Send job-end notification
+    if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
       try {
-        Thread.sleep(5000);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
+        LOG.info("Job end notification started for jobID : "
+            + job.getReport().getJobId());
+        JobEndNotifier notifier = new JobEndNotifier();
+        notifier.setConf(getConfig());
+        notifier.notify(job.getReport());
+      } catch (InterruptedException ie) {
+        LOG.warn("Job end notification interrupted for jobID : "
+            + job.getReport().getJobId(), ie);
       }
+    }
 
-      try {
-        //We are finishing cleanly so this is the last retry
-        isLastAMRetry = true;
-        // Stop all services
-        // This will also send the final report to the ResourceManager
-        LOG.info("Calling stop for all the services");
-        stop();
-
-      } catch (Throwable t) {
-        LOG.warn("Graceful stop failed ", t);
-      }
+    // TODO:currently just wait for some time so clients can know the
+    // final states. Will be removed once RM come on.
+    try {
+      Thread.sleep(5000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+
+    try {
+      //We are finishing cleanly so this is the last retry
+      isLastAMRetry = true;
+      // Stop all services
+      // This will also send the final report to the ResourceManager
+      LOG.info("Calling stop for all the services");
+      MRAppMaster.this.stop();
+
+    } catch (Throwable t) {
+      LOG.warn("Graceful stop failed ", t);
+    }
 
-      //Bring the process down by force.
-      //Not needed after HADOOP-7140
-      LOG.info("Exiting MR AppMaster..GoodBye!");
-      sysexit();
+    //Bring the process down by force.
+    //Not needed after HADOOP-7140
+    LOG.info("Exiting MR AppMaster..GoodBye!");
+    sysexit();   
+  }
+ 
+  private class JobFinishEventHandler implements EventHandler<JobFinishEvent> {
+    @Override
+    public void handle(JobFinishEvent event) {
+      // Create a new thread to shutdown the AM. We should not do it in-line
+      // to avoid blocking the dispatcher itself.
+      new Thread() {
+        
+        @Override
+        public void run() {
+          shutDownJob();
+        }
+      }.start();
     }
   }
   

+ 2 - 8
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java

@@ -21,17 +21,15 @@ package org.apache.hadoop.mapreduce.v2.app;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.times;
 
 import java.io.IOException;
 
 import junit.framework.Assert;
 import junit.framework.TestCase;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -49,7 +47,6 @@ import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.service.AbstractService;
@@ -68,7 +65,6 @@ import org.junit.Test;
    private Path stagingJobPath = new Path(stagingJobDir);
    private final static RecordFactory recordFactory = RecordFactoryProvider.
        getRecordFactory(null);
-   private static final Log LOG = LogFactory.getLog(TestStagingCleanup.class);
    
    @Test
    public void testDeletionofStaging() throws IOException {
@@ -86,9 +82,7 @@ import org.junit.Test;
      jobid.setAppId(appId);
      MRAppMaster appMaster = new TestMRApp(attemptId);
      appMaster.init(conf);
-     EventHandler<JobFinishEvent> handler = 
-         appMaster.createJobFinishEventHandler();
-     handler.handle(new JobFinishEvent(jobid));
+     appMaster.shutDownJob();
      verify(fs).delete(stagingJobPath, true);
    }
    

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -182,6 +182,9 @@ Release 0.23.5 - UNRELEASED
     YARN-180. Capacity scheduler - containers that get reserved create 
     container token too early (acmurthy and bobby)
 
+    YARN-139. Interrupted Exception within AsyncDispatcher leads to user
+    confusion. (Vinod Kumar Vavilapalli via jlowe)
+
 Release 0.23.4 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 6 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java

@@ -68,7 +68,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
           try {
             event = eventQueue.take();
           } catch(InterruptedException ie) {
-            LOG.warn("AsyncDispatcher thread interrupted", ie);
+            if (!stopped) {
+              LOG.warn("AsyncDispatcher thread interrupted", ie);
+            }
             return;
           }
           if (event != null) {
@@ -180,7 +182,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
       try {
         eventQueue.put(event);
       } catch (InterruptedException e) {
-        LOG.warn("AsyncDispatcher thread interrupted", e);
+        if (!stopped) {
+          LOG.warn("AsyncDispatcher thread interrupted", e);
+        }
         throw new YarnException(e);
       }
     };