Przeglądaj źródła

svn merge -c 1345362. FIXES: MAPREDUCE-4302. NM goes down if error encountered during log aggregation (Daryn Sharp via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1345363 13f79535-47bb-0310-9956-ffa450edef68
Robert Joseph Evans 13 lat temu
rodzic
commit
5e8a982209
8 zmienionych plików z 279 dodań i 54 usunięć
  1. 3 0
      hadoop-mapreduce-project/CHANGES.txt
  2. 3 2
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
  3. 2 1
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java
  4. 46 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationFinishEvent.java
  5. 25 9
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
  6. 23 2
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
  7. 4 0
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
  8. 173 40
      hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java

+ 3 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -434,6 +434,9 @@ Release 0.23.3 - UNRELEASED
     MAPREDUCE-4297. Usersmap file in gridmix should not fail on empty lines
     (Ravi Prakash via bobby)
 
+    MAPREDUCE-4302. NM goes down if error encountered during log aggregation 
+    (Daryn Sharp via bobby)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 3 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java

@@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerInitEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -524,8 +525,8 @@ public class ContainerManagerImpl extends CompositeService implements
           (CMgrCompletedAppsEvent) event;
       for (ApplicationId appID : appsFinishedEvent.getAppsToCleanup()) {
         this.dispatcher.getEventHandler().handle(
-            new ApplicationEvent(appID,
-                ApplicationEventType.FINISH_APPLICATION));
+            new ApplicationFinishEvent(appID,
+                "Application Killed by ResourceManager"));
       }
       break;
     case FINISH_CONTAINERS:

+ 2 - 1
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationEventType.java

@@ -23,7 +23,7 @@ public enum ApplicationEventType {
   // Source: ContainerManager
   INIT_APPLICATION,
   INIT_CONTAINER,
-  FINISH_APPLICATION,
+  FINISH_APPLICATION, // Source: LogAggregationService if init fails
 
   // Source: ResourceLocalizationService
   APPLICATION_INITED,
@@ -33,5 +33,6 @@ public enum ApplicationEventType {
   APPLICATION_CONTAINER_FINISHED,
 
   // Source: Log Handler
+  APPLICATION_LOG_HANDLING_INITED,
   APPLICATION_LOG_HANDLING_FINISHED
 }

+ 46 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationFinishEvent.java

@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * Finish/abort event
+ */
+public class ApplicationFinishEvent extends ApplicationEvent {
+  private final String diagnostic;
+
+  /**
+   * Application event to abort all containers associated with the app
+   * @param appId to abort containers
+   * @param diagnostic reason for the abort
+   */
+  public ApplicationFinishEvent(ApplicationId appId, String diagnostic) {
+    super(appId, ApplicationEventType.FINISH_APPLICATION);
+    this.diagnostic = diagnostic;
+  }
+
+  /**
+   * Why the app was aborted
+   * @return diagnostic message
+   */
+  public String getDiagnostic() {
+    return diagnostic;
+  }
+}

+ 25 - 9
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java

@@ -141,6 +141,9 @@ public class ApplicationImpl implements Application {
                    ApplicationState.APPLICATION_RESOURCES_CLEANINGUP),
                ApplicationEventType.FINISH_APPLICATION,
                new AppFinishTriggeredTransition())
+           .addTransition(ApplicationState.INITING, ApplicationState.INITING,
+               ApplicationEventType.APPLICATION_LOG_HANDLING_INITED,
+               new AppLogInitDoneTransition())
            .addTransition(ApplicationState.INITING, ApplicationState.RUNNING,
                ApplicationEventType.APPLICATION_INITED,
                new AppInitDoneTransition())
@@ -192,8 +195,7 @@ public class ApplicationImpl implements Application {
   /**
    * Notify services of new application.
    * 
-   * In particular, this requests that the {@link ResourceLocalizationService}
-   * localize the application-scoped resources.
+   * In particular, this initializes the {@link LogAggregationService}
    */
   @SuppressWarnings("unchecked")
   static class AppInitTransition implements
@@ -203,6 +205,27 @@ public class ApplicationImpl implements Application {
       ApplicationInitEvent initEvent = (ApplicationInitEvent)event;
       app.applicationACLs = initEvent.getApplicationACLs();
       app.aclsManager.addApplication(app.getAppId(), app.applicationACLs);
+      // Inform the logAggregator
+      app.dispatcher.getEventHandler().handle(
+          new LogHandlerAppStartedEvent(app.appId, app.user,
+              app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
+              app.applicationACLs)); 
+    }
+  }
+
+  /**
+   * Handles the APPLICATION_LOG_HANDLING_INITED event that occurs after
+   * {@link LogAggregationService} has created the directories for the app
+   * and started the aggregation thread for the app.
+   * 
+   * In particular, this requests that the {@link ResourceLocalizationService}
+   * localize the application-scoped resources.
+   */
+  @SuppressWarnings("unchecked")
+  static class AppLogInitDoneTransition implements
+      SingleArcTransition<ApplicationImpl, ApplicationEvent> {
+    @Override
+    public void transition(ApplicationImpl app, ApplicationEvent event) {
       app.dispatcher.getEventHandler().handle(
           new ApplicationLocalizationEvent(
               LocalizationEventType.INIT_APPLICATION_RESOURCES, app));
@@ -248,13 +271,6 @@ public class ApplicationImpl implements Application {
       SingleArcTransition<ApplicationImpl, ApplicationEvent> {
     @Override
     public void transition(ApplicationImpl app, ApplicationEvent event) {
-
-      // Inform the logAggregator
-      app.dispatcher.getEventHandler().handle(
-          new LogHandlerAppStartedEvent(app.appId, app.user,
-              app.credentials, ContainerLogsRetentionPolicy.ALL_CONTAINERS,
-              app.applicationACLs)); 
-
       // Start all the containers waiting for ApplicationInit
       for (Container container : app.containers.values()) {
         app.dispatcher.getEventHandler().handle(new ContainerInitEvent(

+ 23 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java

@@ -49,6 +49,9 @@ import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
@@ -56,6 +59,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
 import org.apache.hadoop.yarn.service.AbstractService;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class LogAggregationService extends AbstractService implements
@@ -146,13 +150,13 @@ public class LogAggregationService extends AbstractService implements
     try {
       remoteFS = FileSystem.get(conf);
     } catch (IOException e) {
-      throw new YarnException("Unable to get Remote FileSystem isntance", e);
+      throw new YarnException("Unable to get Remote FileSystem instance", e);
     }
     boolean remoteExists = false;
     try {
       remoteExists = remoteFS.exists(this.remoteRootLogDir);
     } catch (IOException e) {
-      throw new YarnException("Failed to check for existance of remoteLogDir ["
+      throw new YarnException("Failed to check for existence of remoteLogDir ["
           + this.remoteRootLogDir + "]");
     }
     if (remoteExists) {
@@ -266,9 +270,26 @@ public class LogAggregationService extends AbstractService implements
     }
   }
 
+  @SuppressWarnings("unchecked")
   private void initApp(final ApplicationId appId, String user,
       Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
       Map<ApplicationAccessType, String> appAcls) {
+    ApplicationEvent eventResponse;
+    try {
+      initAppAggregator(appId, user, credentials, logRetentionPolicy, appAcls);
+      eventResponse = new ApplicationEvent(appId,
+          ApplicationEventType.APPLICATION_LOG_HANDLING_INITED);
+    } catch (YarnException e) {
+      eventResponse = new ApplicationFinishEvent(appId,
+          "Application failed to init aggregation: " + e.getMessage());
+    }
+    this.dispatcher.getEventHandler().handle(eventResponse);
+  }
+
+  @VisibleForTesting
+  public void initAppAggregator(final ApplicationId appId, String user,
+      Credentials credentials, ContainerLogsRetentionPolicy logRetentionPolicy,
+      Map<ApplicationAccessType, String> appAcls) {
 
     // Get user's FileSystem credentials
     UserGroupInformation userUgi =

+ 4 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java

@@ -93,6 +93,7 @@ public class NonAggregatingLogHandler extends AbstractService implements
     super.stop();
   }
 
+  @SuppressWarnings("unchecked")
   @Override
   public void handle(LogHandlerEvent event) {
     switch (event.getType()) {
@@ -101,6 +102,9 @@ public class NonAggregatingLogHandler extends AbstractService implements
             (LogHandlerAppStartedEvent) event;
         this.appOwners.put(appStartedEvent.getApplicationId(),
             appStartedEvent.getUser());
+        this.dispatcher.getEventHandler().handle(
+            new ApplicationEvent(appStartedEvent.getApplicationId(),
+                ApplicationEventType.APPLICATION_LOG_HANDLING_INITED));
         break;
       case CONTAINER_FINISHED:
         // Ignore

+ 173 - 40
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java

@@ -18,10 +18,7 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 import static junit.framework.Assert.assertEquals;
 import static junit.framework.Assert.assertTrue;
 
@@ -32,6 +29,7 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.Writer;
+import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -47,6 +45,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.yarn.YarnException;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
+import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
@@ -74,6 +75,7 @@ import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationFinishEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
@@ -81,6 +83,7 @@ import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.mortbay.util.MultiException;
 
 
 //@Ignore
@@ -112,7 +115,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
 
   @Test
   @SuppressWarnings("unchecked")
-  public void testLocalFileDeletionAfterUpload() throws IOException {
+  public void testLocalFileDeletionAfterUpload() throws Exception {
     this.delSrvc = new DeletionService(createContainerExecutor());
     this.delSrvc.init(conf);
     this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
@@ -170,19 +173,23 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
         logFilePath.toUri().getPath()).exists());
     
     dispatcher.await();
-    ArgumentCaptor<ApplicationEvent> eventCaptor =
-        ArgumentCaptor.forClass(ApplicationEvent.class);
-    verify(appEventHandler).handle(eventCaptor.capture());
-    assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
-        eventCaptor.getValue().getType());
-    assertEquals(appAttemptId.getApplicationId(), eventCaptor.getValue()
-        .getApplicationID());
     
+    ApplicationEvent expectedEvents[] = new ApplicationEvent[]{
+        new ApplicationEvent(
+            appAttemptId.getApplicationId(),
+            ApplicationEventType.APPLICATION_LOG_HANDLING_INITED),
+        new ApplicationEvent(
+            appAttemptId.getApplicationId(),
+            ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)
+    };
+
+    checkEvents(appEventHandler, expectedEvents, true, "getType", "getApplicationID");
+    dispatcher.stop();
   }
 
   @Test
   @SuppressWarnings("unchecked")
-  public void testNoContainerOnNode() {
+  public void testNoContainerOnNode() throws Exception {
     this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
         this.remoteRootLogDir.getAbsolutePath());
@@ -218,19 +225,22 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
         .exists());
     
     dispatcher.await();
-    ArgumentCaptor<ApplicationEvent> eventCaptor =
-        ArgumentCaptor.forClass(ApplicationEvent.class);
-    verify(appEventHandler).handle(eventCaptor.capture());
-    assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
-        eventCaptor.getValue().getType());
-    verify(appEventHandler).handle(eventCaptor.capture());
-    assertEquals(application1, eventCaptor.getValue()
-        .getApplicationID());
+    
+    ApplicationEvent expectedEvents[] = new ApplicationEvent[]{
+        new ApplicationEvent(
+            application1,
+            ApplicationEventType.APPLICATION_LOG_HANDLING_INITED),
+        new ApplicationEvent(
+            application1,
+            ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)
+    };
+    checkEvents(appEventHandler, expectedEvents, true, "getType", "getApplicationID");
+    dispatcher.stop();
   }
 
   @Test
   @SuppressWarnings("unchecked")
-  public void testMultipleAppsLogAggregation() throws IOException {
+  public void testMultipleAppsLogAggregation() throws Exception {
 
     this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
@@ -299,9 +309,22 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     app3LogDir.mkdir();
     logAggregationService.handle(new LogHandlerAppStartedEvent(application3,
         this.user, null,
-        ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));
-        
-
+        ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));        
+
+    ApplicationEvent expectedInitEvents[] = new ApplicationEvent[]{
+        new ApplicationEvent(
+            application1,
+            ApplicationEventType.APPLICATION_LOG_HANDLING_INITED),
+        new ApplicationEvent(
+            application2,
+            ApplicationEventType.APPLICATION_LOG_HANDLING_INITED),
+        new ApplicationEvent(
+            application3,
+            ApplicationEventType.APPLICATION_LOG_HANDLING_INITED)
+    };
+    checkEvents(appEventHandler, expectedInitEvents, false, "getType", "getApplicationID");
+    reset(appEventHandler);
+    
     ContainerId container31 = BuilderUtils.newContainerId(appAttemptId3, 1);
     writeContainerLogs(app3LogDir, container31);
     logAggregationService.handle(
@@ -339,22 +362,59 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
         new ContainerId[] { container31, container32 });
     
     dispatcher.await();
-    ArgumentCaptor<ApplicationEvent> eventCaptor =
-        ArgumentCaptor.forClass(ApplicationEvent.class);
-
-    verify(appEventHandler, times(3)).handle(eventCaptor.capture());
-    List<ApplicationEvent> capturedEvents = eventCaptor.getAllValues();
-    Set<ApplicationId> appIds = new HashSet<ApplicationId>();
-    for (ApplicationEvent cap : capturedEvents) {
-      assertEquals(ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED,
-          eventCaptor.getValue().getType());
-      appIds.add(cap.getApplicationID());
-    }
-    assertTrue(appIds.contains(application1));
-    assertTrue(appIds.contains(application2));
-    assertTrue(appIds.contains(application3));
+    
+    ApplicationEvent[] expectedFinishedEvents = new ApplicationEvent[]{
+        new ApplicationEvent(
+            application1,
+            ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED),
+        new ApplicationEvent(
+            application2,
+            ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED),
+        new ApplicationEvent(
+            application3,
+            ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)
+    };
+    checkEvents(appEventHandler, expectedFinishedEvents, false, "getType", "getApplicationID");
+    dispatcher.stop();
   }
-
+  
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testLogAggregationInitFailsWithoutKillingNM() throws Exception {
+    
+    this.conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDir.getAbsolutePath());
+    this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+        this.remoteRootLogDir.getAbsolutePath());
+    
+    DrainDispatcher dispatcher = createDispatcher();
+    EventHandler<ApplicationEvent> appEventHandler = mock(EventHandler.class);
+    dispatcher.register(ApplicationEventType.class, appEventHandler);
+    
+    LogAggregationService logAggregationService = spy(
+        new LogAggregationService(dispatcher, this.context, this.delSrvc,
+                                  super.dirsHandler));
+    logAggregationService.init(this.conf);
+    logAggregationService.start();
+    
+    ApplicationId appId = BuilderUtils.newApplicationId(
+        System.currentTimeMillis(), (int)Math.random());
+    doThrow(new YarnException("KABOOM!"))
+      .when(logAggregationService).initAppAggregator(
+          eq(appId), eq(user), any(Credentials.class),
+          any(ContainerLogsRetentionPolicy.class), anyMap());
+    
+    logAggregationService.handle(new LogHandlerAppStartedEvent(appId,
+        this.user, null,
+        ContainerLogsRetentionPolicy.AM_AND_FAILED_CONTAINERS_ONLY, this.acls));        
+    
+    dispatcher.await();
+    ApplicationEvent expectedEvents[] = new ApplicationEvent[]{
+        new ApplicationFinishEvent(appId, "Application failed to init aggregation: KABOOM!")
+    };
+    checkEvents(appEventHandler, expectedEvents, false,
+        "getType", "getApplicationID", "getDiagnostic");    
+  }
+  
   private void writeContainerLogs(File appLogDir, ContainerId containerId)
       throws IOException {
     // ContainerLogDir should be created
@@ -599,4 +659,77 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     Assert.assertEquals("Log aggregator failed to cleanup!", 0,
         logAggregationService.getNumAggregators());
   }
+  
+  @SuppressWarnings("unchecked")
+  private static <T extends Event<?>>
+  void checkEvents(EventHandler<T> eventHandler,
+                   T expectedEvents[], boolean inOrder,
+                   String... methods) throws Exception {
+    Class<T> genericClass = (Class<T>)expectedEvents.getClass().getComponentType();
+    ArgumentCaptor<T> eventCaptor = ArgumentCaptor.forClass(genericClass);
+    // captor work work unless used via a verify
+    verify(eventHandler, atLeast(0)).handle(eventCaptor.capture());
+    List<T> actualEvents = eventCaptor.getAllValues();
+
+    // batch up exceptions so junit presents them as one
+    MultiException failures = new MultiException();
+    try {
+      assertEquals("expected events", expectedEvents.length, actualEvents.size());
+    } catch (Throwable e) {
+      failures.add(e);
+    }
+    if (inOrder) {
+    	// sequentially verify the events
+      int len = Math.max(expectedEvents.length, actualEvents.size());
+      for (int n=0; n < len; n++) {
+        try {
+          String expect = (n < expectedEvents.length)
+              ? eventToString(expectedEvents[n], methods) : null;
+          String actual = (n < actualEvents.size())
+              ? eventToString(actualEvents.get(n), methods) : null;
+          assertEquals("event#"+n, expect, actual);
+        } catch (Throwable e) {
+          failures.add(e);
+        }
+      }
+    } else {
+    	// verify the actual events were expected
+    	// verify no expected events were not seen
+      Set<String> expectedSet = new HashSet<String>();
+      for (T expectedEvent : expectedEvents) {
+        expectedSet.add(eventToString(expectedEvent, methods));
+      }
+      for (T actualEvent : actualEvents) {
+        try {
+          String actual = eventToString(actualEvent, methods);
+          assertTrue("unexpected event: "+actual, expectedSet.remove(actual));
+        } catch (Throwable e) {
+          failures.add(e);
+        }
+      }
+      for (String expected : expectedSet) {
+        try {
+          Assert.fail("missing event: "+expected);
+        } catch (Throwable e) {
+          failures.add(e);
+        }
+      }
+    }
+    failures.ifExceptionThrow();
+  }
+  
+  private static String eventToString(Event<?> event, String[] methods) throws Exception {
+    StringBuilder sb = new StringBuilder("[ ");
+    for (String m : methods) {
+      try {
+      	Method method = event.getClass().getMethod(m);
+        String value = method.invoke(event).toString();
+        sb.append(method.getName()).append("=").append(value).append(" ");
+      } catch (Exception e) {
+        // ignore, actual event may not implement the method...
+      }
+    }
+    sb.append("]");
+    return sb.toString();
+  }
 }