Browse Source

MAPREDUCE-3634. Fixed all daemons to crash instead of hanging around when their EventHandlers get exceptions. (vinodkv)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1291598 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli 13 years ago
parent
commit
5ee495e6f3
22 changed files with 173 additions and 117 deletions
  1. + 4 - 0    hadoop-mapreduce-project/CHANGES.txt
  2. + 2 - 0    hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
  3. + 2 - 1    hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java
  4. + 0 - 1    hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
  5. + 2 - 10   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java
  6. + 1 - 0    hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
  7. + 11 - 1   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
  8. + 0 - 27   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
  9. + 67 - 36  hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java
  10. + 4 - 0   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java
  11. + 0 - 2   hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java
  12. + 6 - 0   hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml
  13. + 16 - 14 hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
  14. + 10 - 0  hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/Dispatcher.java
  15. + 1 - 3   hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java
  16. + 2 - 0   hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
  17. + 1 - 0   hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
  18. + 1 - 1   hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
  19. + 1 - 1   hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java
  20. + 11 - 7  hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java
  21. + 20 - 10 hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
  22. + 11 - 3  hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

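The recurring change across the daemon processes (MRAppMaster, JobHistoryServer, NodeManager, ResourceManager) is a new dispatcher configuration key, yarn.dispatcher.exit-on-error: each daemon sets it to true during init() so that an exception escaping an EventHandler crashes the process instead of leaving a half-dead daemon behind. A minimal sketch of that opt-in pattern, using only classes touched by this commit (the DaemonBootstrap name is invented for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;

// Hypothetical bootstrap showing the opt-in pattern each daemon now follows.
public class DaemonBootstrap {
  public static void main(String[] args) {
    Configuration conf = new Configuration();

    // Daemons opt in explicitly; the default stays false so that a failing
    // handler in a unit test does not kill the test JVM.
    conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);

    AsyncDispatcher dispatcher = new AsyncDispatcher();
    dispatcher.init(conf);   // the flag is read during init (see the AsyncDispatcher diff below)
    dispatcher.start();
  }
}
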
+ 4 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -108,6 +108,7 @@ Release 0.23.2 - UNRELEASED
   OPTIMIZATIONS
 
   BUG FIXES
+
     MAPREDUCE-3862.  Nodemanager can appear to hang on shutdown due to lingering
     DeletionService threads (Jason Lowe via bobby)
 
@@ -125,6 +126,9 @@ Release 0.23.2 - UNRELEASED
 
     MAPREDUCE-3583. Change pid to String and stime to BigInteger in order to
     avoid NumberFormatException caused by overflow.  (Zhihong Yu via szetszwo)
+
+    MAPREDUCE-3634. Fixed all daemons to crash instead of hanging around when
+    their EventHandlers get exceptions. (vinodkv)
  
 Release 0.23.1 - 2012-02-17 
 

+ 2 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java

@@ -188,6 +188,8 @@ public class MRAppMaster extends CompositeService {
   @Override
   public void init(final Configuration conf) {
 
+    conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
+
     downloadTokensAndSetupUGI(conf);
 
     context = new RunningAppContext(conf);

+ 2 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherEvent.java

@@ -61,7 +61,8 @@ public class ContainerLauncherEvent
 
   @Override
   public String toString() {
-    return super.toString() + " for taskAttempt " + taskAttemptID;
+    return super.toString() + " for container " + containerID + " taskAttempt "
+        + taskAttemptID;
   }
 
   @Override

+ 0 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java

@@ -334,7 +334,6 @@ public class ContainerLauncherImpl extends AbstractService implements
             LOG.error("Returning, interrupted : " + e);
             return;
           }
-
           int poolSize = launcherPool.getCorePoolSize();
 
           // See if we need up the pool size only if haven't reached the

+ 2 - 10
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java

@@ -216,26 +216,18 @@ public class RecoveryService extends CompositeService implements Recovery {
   protected Dispatcher createRecoveryDispatcher() {
     return new RecoveryDispatcher();
   }
-  
-  protected Dispatcher createRecoveryDispatcher(boolean exitOnException) {
-    return new RecoveryDispatcher(exitOnException);
-  }
 
   @SuppressWarnings("rawtypes")
   class RecoveryDispatcher extends AsyncDispatcher {
     private final EventHandler actualHandler;
     private final EventHandler handler;
 
-    RecoveryDispatcher(boolean exitOnException) {
-      super(exitOnException);
+    RecoveryDispatcher() {
+      super();
       actualHandler = super.getEventHandler();
       handler = new InterceptingEventHandler(actualHandler);
     }
 
-    RecoveryDispatcher() {
-      this(false);
-    }
-
     @Override
     @SuppressWarnings("unchecked")
     public void dispatch(Event event) {

+ 1 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java

@@ -78,6 +78,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;

+ 11 - 1
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.mapreduce.v2.app;
 
 import java.io.IOException;
+import java.net.InetSocketAddress;
 import java.util.Iterator;
 import java.util.Map;
 
@@ -40,6 +41,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.yarn.api.ContainerManager;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerToken;
@@ -49,6 +51,7 @@ import org.junit.Test;
  * Tests the state machine with respect to Job/Task/TaskAttempt failure 
  * scenarios.
  */
+@SuppressWarnings("unchecked")
 public class TestFail {
 
   @Test
@@ -247,10 +250,17 @@ public class TestFail {
       //when attempt times out, heartbeat handler will send the lost event
       //leading to Attempt failure
       return new TaskAttemptListenerImpl(getContext(), null) {
+        @Override
         public void startRpcServer(){};
+        @Override
         public void stopRpcServer(){};
+        @Override
+        public InetSocketAddress getAddress() {
+          return NetUtils.createSocketAddr("localhost", 1234);
+        }
         public void init(Configuration conf) {
-          conf.setInt("mapreduce.task.timeout", 1*1000);//reduce timeout
+          conf.setInt(MRJobConfig.TASK_TIMEOUT, 1*1000);//reduce timeout
+          conf.setInt(MRJobConfig.TASK_TIMEOUT_CHECK_INTERVAL_MS, 1*1000);
           super.init(conf);
         }
       };

+ 0 - 27
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java

@@ -54,12 +54,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
-import org.apache.hadoop.mapreduce.v2.app.recover.Recovery;
-import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.yarn.Clock;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.junit.Test;
 
@@ -724,13 +719,6 @@ public class TestRecovery {
       super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
     }
 
-    @Override
-    protected Recovery createRecoveryService(AppContext appContext) {
-      return new RecoveryServiceWithCustomDispatcher(
-          appContext.getApplicationAttemptId(), appContext.getClock(),
-          getCommitter());
-    }
-
     @Override
     protected ContainerLauncher createContainerLauncher(AppContext context) {
       MockContainerLauncher launcher = new MockContainerLauncher() {
@@ -757,21 +745,6 @@ public class TestRecovery {
     }
   }
 
-  static class RecoveryServiceWithCustomDispatcher extends RecoveryService {
-
-    public RecoveryServiceWithCustomDispatcher(
-        ApplicationAttemptId applicationAttemptId, Clock clock,
-        OutputCommitter committer) {
-      super(applicationAttemptId, clock, committer);
-    }
-
-    @Override
-    public Dispatcher createRecoveryDispatcher() {
-      return super.createRecoveryDispatcher(false);
-    }
-
-  }
-
   public static void main(String[] arg) throws Exception {
     TestRecovery test = new TestRecovery();
     test.testCrashed();

+ 67 - 36
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/launcher/TestContainerLauncher.java

@@ -23,6 +23,7 @@ import static org.mockito.Mockito.mock;
 import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import junit.framework.Assert;
 
@@ -64,8 +65,6 @@ public class TestContainerLauncher {
       appId, 3);
     JobId jobId = MRBuilderUtils.newJobId(appId, 8);
     TaskId taskId = MRBuilderUtils.newTaskId(jobId, 9, TaskType.MAP);
-    TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 0);
-    ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 10);
 
     AppContext context = mock(AppContext.class);
     CustomContainerLauncher containerLauncher = new CustomContainerLauncher(
@@ -83,6 +82,8 @@ public class TestContainerLauncher {
 
     containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE;
     for (int i = 0; i < 10; i++) {
+      ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, i);
+      TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, i);
       containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId,
         containerId, "host" + i + ":1234", null,
         ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
@@ -92,9 +93,21 @@ public class TestContainerLauncher {
     Assert.assertNull(containerLauncher.foundErrors);
 
     // Same set of hosts, so no change
-    containerLauncher.expectedCorePoolSize = ContainerLauncherImpl.INITIAL_POOL_SIZE;
     containerLauncher.finishEventHandling = true;
+    int timeOut = 0;
+    while (containerLauncher.numEventsProcessed.get() < 10 && timeOut++ < 200) {
+      LOG.info("Waiting for number of events processed to become " + 10
+          + ". It is now " + containerLauncher.numEventsProcessed.get()
+          + ". Timeout is " + timeOut);
+      Thread.sleep(1000);
+    }
+    Assert.assertEquals(10, containerLauncher.numEventsProcessed.get());
+    containerLauncher.finishEventHandling = false;
     for (int i = 0; i < 10; i++) {
+      ContainerId containerId =
+          BuilderUtils.newContainerId(appAttemptId, i + 10);
+      TaskAttemptId taskAttemptId =
+          MRBuilderUtils.newTaskAttemptId(taskId, i + 10);
       containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId,
         containerId, "host" + i + ":1234", null,
         ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
@@ -106,14 +119,16 @@ public class TestContainerLauncher {
     // Different hosts, there should be an increase in core-thread-pool size to
     // 21(11hosts+10buffer)
     // Core pool size should be 21 but the live pool size should be only 11.
-    containerLauncher.expectedCorePoolSize = 12 + ContainerLauncherImpl.INITIAL_POOL_SIZE;
-    for (int i = 1; i <= 2; i++) {
-      containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId,
-        containerId, "host1" + i + ":1234", null,
-        ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
-    }
-    waitForEvents(containerLauncher, 22);
-    Assert.assertEquals(12, threadPool.getPoolSize());
+    containerLauncher.expectedCorePoolSize =
+        11 + ContainerLauncherImpl.INITIAL_POOL_SIZE;
+    containerLauncher.finishEventHandling = false;
+    ContainerId containerId = BuilderUtils.newContainerId(appAttemptId, 21);
+    TaskAttemptId taskAttemptId = MRBuilderUtils.newTaskAttemptId(taskId, 21);
+    containerLauncher.handle(new ContainerLauncherEvent(taskAttemptId,
+      containerId, "host11:1234", null,
+      ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH));
+    waitForEvents(containerLauncher, 21);
+    Assert.assertEquals(11, threadPool.getPoolSize());
     Assert.assertNull(containerLauncher.foundErrors);
 
     containerLauncher.stop();
@@ -172,15 +187,15 @@ public class TestContainerLauncher {
 
   private void waitForEvents(CustomContainerLauncher containerLauncher,
       int expectedNumEvents) throws InterruptedException {
-    int timeOut = 20;
-    while (expectedNumEvents != containerLauncher.numEventsProcessed
-        || timeOut++ < 20) {
+    int timeOut = 0;
+    while (containerLauncher.numEventsProcessing.get() < expectedNumEvents
+        && timeOut++ < 20) {
       LOG.info("Waiting for number of events to become " + expectedNumEvents
-          + ". It is now " + containerLauncher.numEventsProcessed);
+          + ". It is now " + containerLauncher.numEventsProcessing.get());
       Thread.sleep(1000);
     }
-    Assert
-      .assertEquals(expectedNumEvents, containerLauncher.numEventsProcessed);
+    Assert.assertEquals(expectedNumEvents,
+      containerLauncher.numEventsProcessing.get());
   }
 
   @Test
@@ -244,9 +259,11 @@ public class TestContainerLauncher {
   private final class CustomContainerLauncher extends ContainerLauncherImpl {
 
     private volatile int expectedCorePoolSize = 0;
-    private volatile int numEventsProcessed = 0;
+    private AtomicInteger numEventsProcessing = new AtomicInteger(0);
+    private AtomicInteger numEventsProcessed = new AtomicInteger(0);
     private volatile String foundErrors = null;
     private volatile boolean finishEventHandling;
+
     private CustomContainerLauncher(AppContext context) {
       super(context);
     }
@@ -255,8 +272,38 @@ public class TestContainerLauncher {
       return super.launcherPool;
     }
 
+    private final class CustomEventProcessor extends
+        ContainerLauncherImpl.EventProcessor {
+      private final ContainerLauncherEvent event;
+
+      private CustomEventProcessor(ContainerLauncherEvent event) {
+        super(event);
+        this.event = event;
+      }
+
+      @Override
+      public void run() {
+        // do nothing substantial
+
+        LOG.info("Processing the event " + event.toString());
+
+        numEventsProcessing.incrementAndGet();
+        // Stall
+        while (!finishEventHandling) {
+          synchronized (this) {
+            try {
+              wait(1000);
+            } catch (InterruptedException e) {
+              ;
+            }
+          }
+        }
+        numEventsProcessed.incrementAndGet();
+      }
+    }
+
     protected ContainerLauncherImpl.EventProcessor createEventProcessor(
-        ContainerLauncherEvent event) {
+        final ContainerLauncherEvent event) {
       // At this point of time, the EventProcessor is being created and so no
       // additional threads would have been created.
 
@@ -266,23 +313,7 @@ public class TestContainerLauncher {
             + launcherPool.getCorePoolSize();
       }
 
-      return new ContainerLauncherImpl.EventProcessor(event) {
-        @Override
-        public void run() {
-          // do nothing substantial
-          numEventsProcessed++;
-          // Stall
-          synchronized(this) {
-            try {
-              while(!finishEventHandling) {
-                wait(1000);
-              }
-            } catch (InterruptedException e) {
-              ;
-            }
-          }
-        }
-      };
+      return new CustomEventProcessor(event);
     }
   }
 

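The rewrite also fixes the waitForEvents() helper: in the old version timeOut started at 20, so the "timeOut++ < 20" clause was always false and the wait was effectively unbounded, and the counter was a plain field incremented from multiple pool threads. The new version polls AtomicInteger counters with a bounded number of one-second sleeps. The same idea as a stand-alone sketch (class and method names invented for illustration):

import java.util.concurrent.atomic.AtomicInteger;

import junit.framework.Assert;

// Bounded-polling wait, mirroring the corrected waitForEvents() helper:
// poll a counter bumped by worker threads, give up after a fixed number of
// one-second sleeps, then assert the expected value.
final class EventWaiter {

  private EventWaiter() {
  }

  static void waitForCount(AtomicInteger counter, int expected)
      throws InterruptedException {
    int timeOut = 0;
    while (counter.get() < expected && timeOut++ < 20) {
      System.out.println("Waiting for count to become " + expected
          + ". It is now " + counter.get());
      Thread.sleep(1000);
    }
    Assert.assertEquals(expected, counter.get());
  }
}
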
+ 4 - 0
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.service.CompositeService;
 
 /******************************************************************
@@ -51,6 +52,9 @@ public class JobHistoryServer extends CompositeService {
   @Override
   public synchronized void init(Configuration conf) {
     Configuration config = new YarnConfiguration(conf);
+
+    config.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
+
     try {
       doSecureLogin(conf);
     } catch(IOException ie) {

+ 0 - 2
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestJobHistoryEvents.java

@@ -53,7 +53,6 @@ public class TestJobHistoryEvents {
   @Test
   public void testHistoryEvents() throws Exception {
     Configuration conf = new Configuration();
-    conf.set(MRJobConfig.USER_NAME, "test");
     MRApp app = new MRAppWithHistory(2, 1, true, this.getClass().getName(), true);
     app.submit(conf);
     Job job = app.getContext().getAllJobs().values().iterator().next();
@@ -102,7 +101,6 @@ public class TestJobHistoryEvents {
   public void testEventsFlushOnStop() throws Exception {
 
     Configuration conf = new Configuration();
-    conf.set(MRJobConfig.USER_NAME, "test");
     MRApp app = new MRAppWithSpecialHistoryHandler(1, 0, true, this
         .getClass().getName(), true);
     app.submit(conf);

+ 6 - 0
hadoop-mapreduce-project/hadoop-yarn/dev-support/findbugs-exclude.xml

@@ -193,6 +193,12 @@
     <Method name="dispatch" />
     <Bug pattern="DM_EXIT" />
   </Match>
+  <Match>
+    <Class name="org.apache.hadoop.yarn.server.resourcemanager.ResourceManager$SchedulerEventDispatcher$EventProcessor" />
+    <Method name="run" />
+    <Bug pattern="DM_EXIT" />
+  </Match>
+ 
 
   <!-- Ignore heartbeat exception when killing localizer -->
   <Match>

+ 16 - 14
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java

@@ -27,6 +27,7 @@ import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.service.AbstractService;
 
@@ -48,22 +49,13 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
   private boolean exitOnDispatchException;
 
   public AsyncDispatcher() {
-    this(new HashMap<Class<? extends Enum>, EventHandler>(),
-         new LinkedBlockingQueue<Event>(), true);
-  }
-  
-  public AsyncDispatcher(boolean exitOnException) {
-    this(new HashMap<Class<? extends Enum>, EventHandler>(),
-         new LinkedBlockingQueue<Event>(), exitOnException);
+    this(new LinkedBlockingQueue<Event>());
   }
 
-  AsyncDispatcher(
-      Map<Class<? extends Enum>, EventHandler> eventDispatchers,
-      BlockingQueue<Event> eventQueue, boolean exitOnException) {
+  public AsyncDispatcher(BlockingQueue<Event> eventQueue) {
     super("Dispatcher");
     this.eventQueue = eventQueue;
-    this.eventDispatchers = eventDispatchers;
-    this.exitOnDispatchException = exitOnException;
+    this.eventDispatchers = new HashMap<Class<? extends Enum>, EventHandler>();
   }
 
   Runnable createThread() {
@@ -86,6 +78,14 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
     };
   }
 
+  @Override
+  public synchronized void init(Configuration conf) {
+    this.exitOnDispatchException =
+        conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
+          Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
+    super.init(conf);
+  }
+
   @Override
   public void start() {
     //start all the components
@@ -103,7 +103,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
       try {
         eventHandlingThread.join();
       } catch (InterruptedException ie) {
-        LOG.debug("Interrupted Exception while stopping", ie);
+        LOG.warn("Interrupted Exception while stopping", ie);
       }
     }
 
@@ -126,8 +126,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
     }
     catch (Throwable t) {
       //TODO Maybe log the state of the queue
-      LOG.fatal("Error in dispatcher thread. Exiting..", t);
+      LOG.fatal("Error in dispatcher thread", t);
       if (exitOnDispatchException) {
+        LOG.info("Exiting, bbye..");
         System.exit(-1);
       }
     }
@@ -177,6 +178,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
       try {
         eventQueue.put(event);
       } catch (InterruptedException e) {
+        LOG.warn("AsyncDispatcher thread interrupted", e);
         throw new YarnException(e);
       }
     };

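AsyncDispatcher now reads the flag once in init() and, when a Throwable escapes a handler in dispatch(), logs a fatal and calls System.exit(-1) if the flag is set; the default remains false so tests are unaffected. A self-contained sketch of that behaviour; DemoType and DemoEvent are invented for illustration, the dispatcher calls are the ones shown in the diff above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;

// Illustrative only: a handler that throws, with exit-on-error enabled.
public class DispatcherCrashDemo {

  enum DemoType { FAIL }

  static class DemoEvent extends AbstractEvent<DemoType> {
    DemoEvent() {
      super(DemoType.FAIL);
    }
  }

  @SuppressWarnings("unchecked")
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Daemons set this to true; with the default (false) the dispatcher only
    // logs a fatal and the JVM keeps running.
    conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);

    AsyncDispatcher dispatcher = new AsyncDispatcher();
    dispatcher.init(conf);   // exitOnDispatchException is read here
    dispatcher.register(DemoType.class, new EventHandler<DemoEvent>() {
      @Override
      public void handle(DemoEvent event) {
        throw new RuntimeException("simulated handler failure");
      }
    });
    dispatcher.start();

    // The dispatcher thread logs "Error in dispatcher thread" and, because
    // the key is set, exits the JVM with -1 instead of hanging around.
    dispatcher.getEventHandler().handle(new DemoEvent());
    Thread.sleep(2000);
  }
}
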
+ 10 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/Dispatcher.java

@@ -23,8 +23,18 @@ package org.apache.hadoop.yarn.event;
  * event handlers based on event types.
  * 
  */
+@SuppressWarnings("rawtypes")
 public interface Dispatcher {
 
+  // Configuration to make sure dispatcher crashes but doesn't do system-exit in
+  // case of errors. By default, it should be false, so that tests are not
+  // affected. For all daemons it should be explicitly set to true so that
+  // daemons can crash instead of hanging around.
+  public static final String DISPATCHER_EXIT_ON_ERROR_KEY =
+      "yarn.dispatcher.exit-on-error";
+
+  public static final boolean DEFAULT_DISPATCHER_EXIT_ON_ERROR = false;
+
   EventHandler getEventHandler();
 
   void register(Class<? extends Enum> eventType, EventHandler handler);

+ 1 - 3
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java

@@ -17,10 +17,8 @@
 */
 package org.apache.hadoop.yarn.event;
 
-import java.util.HashMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
 
 @SuppressWarnings("rawtypes")
 public class DrainDispatcher extends AsyncDispatcher {
@@ -36,7 +34,7 @@ public class DrainDispatcher extends AsyncDispatcher {
   }
 
   private DrainDispatcher(BlockingQueue<Event> eventQueue) {
-    super(new HashMap<Class<? extends Enum>, EventHandler>(), eventQueue, true);
+    super(eventQueue);
     this.queue = eventQueue;
   }
 

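With the package-private three-argument constructor removed, test dispatchers build on the new public AsyncDispatcher(BlockingQueue<Event>) constructor and keep a reference to the queue themselves. A minimal subclass along the lines of DrainDispatcher (the class name and the simplistic queue check are invented for illustration):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Event;

// Hypothetical dispatcher that owns its event queue, mirroring how
// DrainDispatcher now builds on the public AsyncDispatcher constructor.
@SuppressWarnings("rawtypes")
class QueueBackedDispatcher extends AsyncDispatcher {

  private final BlockingQueue<Event> queue;

  QueueBackedDispatcher() {
    this(new LinkedBlockingQueue<Event>());
  }

  private QueueBackedDispatcher(BlockingQueue<Event> eventQueue) {
    super(eventQueue);
    this.queue = eventQueue;
  }

  // Simplistic check for illustration only.
  boolean isQueueEmpty() {
    return queue.isEmpty();
  }
}
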
+ 2 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java

@@ -99,6 +99,8 @@ public class NodeManager extends CompositeService implements
   @Override
   public void init(Configuration conf) {
 
+    conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
+
     Context context = new NMContext();
 
     // Create the secretManager if need be.

+ 1 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java

@@ -137,6 +137,7 @@ public class ContainerManagerImpl extends CompositeService implements
     this.context = context;
     this.dirsHandler = dirsHandler;
 
+    // ContainerManager level dispatcher.
     dispatcher = new AsyncDispatcher();
     this.deletionService = deletionContext;
     this.metrics = metrics;

+ 1 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java

@@ -376,7 +376,7 @@ public class TestApplication {
 
     WrappedApplication(int id, long timestamp, String user, int numContainers) {
       dispatcher = new DrainDispatcher();
-      dispatcher.init(null);
+      dispatcher.init(new Configuration());
 
       localizerBus = mock(EventHandler.class);
       launcherBus = mock(EventHandler.class);

+ 1 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/TestContainer.java

@@ -517,7 +517,7 @@ public class TestContainer {
     WrappedContainer(int appId, long timestamp, int id, String user,
         boolean withLocalRes, boolean withServiceData) {
       dispatcher = new DrainDispatcher();
-      dispatcher.init(null);
+      dispatcher.init(new Configuration());
 
       localizerBus = mock(EventHandler.class);
       launcherBus = mock(EventHandler.class);

+ 11 - 7
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalizedResource.java

@@ -17,6 +17,15 @@
 */
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -29,19 +38,14 @@ import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizedResource;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
-
 import org.junit.Test;
-import static org.junit.Assert.*;
-
 import org.mockito.ArgumentMatcher;
-import static org.mockito.Mockito.*;
 
 public class TestLocalizedResource {
 
@@ -62,7 +66,7 @@ public class TestLocalizedResource {
   @SuppressWarnings("unchecked") // mocked generic
   public void testNotification() throws Exception {
     DrainDispatcher dispatcher = new DrainDispatcher();
-    dispatcher.init(null);
+    dispatcher.init(new Configuration());
     try {
       dispatcher.start();
       EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
@@ -175,7 +179,7 @@ public class TestLocalizedResource {
   @Test
   public void testDirectLocalization() throws Exception {
     DrainDispatcher dispatcher = new DrainDispatcher();
-    dispatcher.init(null);
+    dispatcher.init(new Configuration());
     try {
       dispatcher.start();
       LocalResource apiRsrc = createMockResource();

+ 20 - 10
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java

@@ -18,8 +18,23 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
 
-import java.net.InetSocketAddress;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyShort;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Matchers.isNull;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -33,7 +48,6 @@ import java.util.Set;
 
 import junit.framework.Assert;
 
-import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.AbstractFileSystem;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -42,6 +56,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -60,7 +75,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
-import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerHeartbeatResponse;
@@ -81,13 +95,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-
 import org.junit.Test;
-import static org.junit.Assert.*;
-
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
-import static org.mockito.Mockito.*;
 
 public class TestResourceLocalizationService {
 
@@ -98,11 +108,11 @@ public class TestResourceLocalizationService {
   public void testLocalizationInit() throws Exception {
     final Configuration conf = new Configuration();
     AsyncDispatcher dispatcher = new AsyncDispatcher();
-    dispatcher.init(null);
+    dispatcher.init(new Configuration());
 
     ContainerExecutor exec = mock(ContainerExecutor.class);
     DeletionService delService = spy(new DeletionService(exec));
-    delService.init(null);
+    delService.init(new Configuration());
     delService.start();
 
     AbstractFileSystem spylfs =
@@ -371,7 +381,7 @@ public class TestResourceLocalizationService {
 
     DeletionService delServiceReal = new DeletionService(exec);
     DeletionService delService = spy(delServiceReal);
-    delService.init(null);
+    delService.init(new Configuration());
     delService.start();
 
     ResourceLocalizationService rawService =

+ 11 - 3
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -131,6 +131,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
 
     this.conf = conf;
 
+    this.conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);
+
     this.rmDispatcher = createDispatcher();
     addIfService(this.rmDispatcher);
 
@@ -265,6 +267,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
     private final BlockingQueue<SchedulerEvent> eventQueue =
       new LinkedBlockingQueue<SchedulerEvent>();
     private final Thread eventProcessor;
+    private volatile boolean stopped = false;
 
     public SchedulerEventDispatcher(ResourceScheduler scheduler) {
       super(SchedulerEventDispatcher.class.getName());
@@ -285,7 +288,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
 
         SchedulerEvent event;
 
-        while (!Thread.currentThread().isInterrupted()) {
+        while (!stopped && !Thread.currentThread().isInterrupted()) {
           try {
             event = eventQueue.take();
           } catch (InterruptedException e) {
@@ -296,9 +299,13 @@ public class ResourceManager extends CompositeService implements Recoverable {
           try {
             scheduler.handle(event);
           } catch (Throwable t) {
-            LOG.error("Error in handling event type " + event.getType()
+            LOG.fatal("Error in handling event type " + event.getType()
                 + " to the scheduler", t);
-            return; // TODO: Kill RM.
+            if (getConfig().getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
+              Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR)) {
+              LOG.info("Exiting, bbye..");
+              System.exit(-1);
+            }
           }
         }
       }
@@ -306,6 +313,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
 
     @Override
     public synchronized void stop() {
+      this.stopped = true;
       this.eventProcessor.interrupt();
       try {
         this.eventProcessor.join();
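
Beyond the config-driven exit, the SchedulerEventDispatcher change also fixes shutdown: a volatile stopped flag is checked next to the interrupt status, and stop() sets it before interrupting and joining the event-processor thread, so the loop ends even if an interrupt is swallowed further down. The shutdown pattern in isolation, with invented names:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Shutdown pattern added to SchedulerEventDispatcher: a volatile "stopped"
// flag checked next to the interrupt status, so stop() ends the loop even if
// an interrupt gets swallowed somewhere below.
final class StoppableEventProcessor {

  private final BlockingQueue<Runnable> eventQueue =
      new LinkedBlockingQueue<Runnable>();
  private volatile boolean stopped = false;
  private final Thread eventProcessor = new Thread(new Runnable() {
    @Override
    public void run() {
      while (!stopped && !Thread.currentThread().isInterrupted()) {
        Runnable event;
        try {
          event = eventQueue.take();   // blocks until an event arrives
        } catch (InterruptedException e) {
          return;                      // interrupted by stop()
        }
        event.run();                   // stand-in for scheduler.handle(event)
      }
    }
  });

  void start() {
    eventProcessor.start();
  }

  synchronized void stop() throws InterruptedException {
    stopped = true;              // makes the loop condition false
    eventProcessor.interrupt();  // unblocks a take() on an empty queue
    eventProcessor.join();
  }
}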