Bläddra i källkod

YARN-7087. NM failed to perform log aggregation due to absent container. Contributed by Jason Lowe.

(cherry picked from commit e864f81471407a384395fefe1ceb3b66fc7f87f2)
Eric Payne 7 år sedan
förälder
incheckning
8eb39f775c

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java

@@ -495,7 +495,7 @@ public class ContainerImpl implements Container {
     eventHandler.handle(new ContainerStopMonitoringEvent(containerId));
     // Tell the logService too
     eventHandler.handle(new LogHandlerContainerFinishedEvent(
-      containerId, exitCode));
+        containerId, containerTokenIdentifier.getContainerType(), exitCode));
   }
 
   @SuppressWarnings("unchecked") // dispatcher not typed

+ 3 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java

@@ -56,7 +56,6 @@ import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
@@ -426,7 +425,8 @@ public class LogAggregationService extends AbstractService implements
     return this.appLogAggregators.size();
   }
 
-  private void stopContainer(ContainerId containerId, int exitCode) {
+  private void stopContainer(ContainerId containerId,
+      ContainerType containerType, int exitCode) {
 
     // A container is complete. Put this containers' logs up for aggregation if
     // this containers' logs are needed.
@@ -437,14 +437,6 @@ public class LogAggregationService extends AbstractService implements
           + ", did it fail to start?");
       return;
     }
-    Container container = context.getContainers().get(containerId);
-    if (null == container) {
-      LOG.warn("Log aggregation cannot be started for " + containerId
-          + ", as its an absent container");
-      return;
-    }
-    ContainerType containerType =
-        container.getContainerTokenIdentifier().getContainerType();
     aggregator.startContainerLogAggregation(
         new ContainerLogContext(containerId, containerType, exitCode));
   }
@@ -482,6 +474,7 @@ public class LogAggregationService extends AbstractService implements
         LogHandlerContainerFinishedEvent containerFinishEvent =
             (LogHandlerContainerFinishedEvent) event;
         stopContainer(containerFinishEvent.getContainerId(),
+            containerFinishEvent.getContainerType(),
             containerFinishEvent.getExitCode());
         break;
       case APPLICATION_FINISHED:

+ 8 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/event/LogHandlerContainerFinishedEvent.java

@@ -19,16 +19,19 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event;
 
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.api.ContainerType;
 
 public class LogHandlerContainerFinishedEvent extends LogHandlerEvent {
 
   private final ContainerId containerId;
+  private final ContainerType containerType;
   private final int exitCode;
 
   public LogHandlerContainerFinishedEvent(ContainerId containerId,
-      int exitCode) {
+      ContainerType containerType, int exitCode) {
     super(LogHandlerEventType.CONTAINER_FINISHED);
     this.containerId = containerId;
+    this.containerType = containerType;
     this.exitCode = exitCode;
   }
 
@@ -36,6 +39,10 @@ public class LogHandlerContainerFinishedEvent extends LogHandlerEvent {
     return this.containerId;
   }
 
+  public ContainerType getContainerType() {
+    return containerType;
+  }
+
   public int getExitCode() {
     return this.exitCode;
   }

+ 50 - 55
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java

@@ -133,7 +133,6 @@ import org.mortbay.util.MultiException;
 
 import com.google.common.base.Supplier;
 
-//@Ignore
 public class TestLogAggregationService extends BaseContainerManagerTest {
 
   private Map<ApplicationAccessType, String> acls = createAppAcls();
@@ -199,13 +198,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
 
     ApplicationAttemptId appAttemptId =
         BuilderUtils.newApplicationAttemptId(application1, 1);
-    ContainerId container11 = createContainer(appAttemptId, 1,
-        ContainerType.APPLICATION_MASTER);
+    ContainerId container11 = ContainerId.newContainerId(appAttemptId, 1);
     // Simulate log-file creation
     writeContainerLogs(app1LogDir, container11, new String[] { "stdout",
         "stderr", "syslog" });
     logAggregationService.handle(
-        new LogHandlerContainerFinishedEvent(container11, 0));
+        new LogHandlerContainerFinishedEvent(container11,
+            ContainerType.APPLICATION_MASTER, 0));
 
     logAggregationService.handle(new LogHandlerAppFinishedEvent(
         application1));
@@ -321,11 +320,11 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
 
     ApplicationAttemptId appAttemptId =
         BuilderUtils.newApplicationAttemptId(app, 1);
-    ContainerId cont = createContainer(appAttemptId, 1,
-        ContainerType.APPLICATION_MASTER);
+    ContainerId cont = ContainerId.newContainerId(appAttemptId, 1);
     writeContainerLogs(appLogDir, cont, new String[] { "stdout",
         "stderr", "syslog" });
-    logAggregationService.handle(new LogHandlerContainerFinishedEvent(cont, 0));
+    logAggregationService.handle(new LogHandlerContainerFinishedEvent(cont,
+        ContainerType.APPLICATION_MASTER, 0));
     logAggregationService.handle(new LogHandlerAppFinishedEvent(app));
     logAggregationService.stop();
     delSrvc.stop();
@@ -407,13 +406,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
 
     ApplicationAttemptId appAttemptId1 =
         BuilderUtils.newApplicationAttemptId(application1, 1);
-    ContainerId container11 = createContainer(appAttemptId1, 1,
-        ContainerType.APPLICATION_MASTER);
+    ContainerId container11 = ContainerId.newContainerId(appAttemptId1, 1);
 
     // Simulate log-file creation
     writeContainerLogs(app1LogDir, container11, fileNames);
     logAggregationService.handle(
-        new LogHandlerContainerFinishedEvent(container11, 0));
+        new LogHandlerContainerFinishedEvent(container11,
+            ContainerType.APPLICATION_MASTER, 0));
 
     ApplicationId application2 = BuilderUtils.newApplicationId(1234, 2);
     ApplicationAttemptId appAttemptId2 =
@@ -430,19 +429,19 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     logAggregationService.handle(new LogHandlerAppStartedEvent(
         application2, this.user, null, this.acls, contextWithAMOnly));
 
-    ContainerId container21 = createContainer(appAttemptId2, 1,
-        ContainerType.APPLICATION_MASTER);
+    ContainerId container21 = ContainerId.newContainerId(appAttemptId2, 1);
 
     writeContainerLogs(app2LogDir, container21, fileNames);
     logAggregationService.handle(
-        new LogHandlerContainerFinishedEvent(container21, 0));
+        new LogHandlerContainerFinishedEvent(container21,
+            ContainerType.APPLICATION_MASTER, 0));
 
-    ContainerId container12 = createContainer(appAttemptId1, 2,
-        ContainerType.TASK);
+    ContainerId container12 = ContainerId.newContainerId(appAttemptId1, 2);
 
     writeContainerLogs(app1LogDir, container12, fileNames);
     logAggregationService.handle(
-        new LogHandlerContainerFinishedEvent(container12, 0));
+        new LogHandlerContainerFinishedEvent(container12,
+            ContainerType.TASK, 0));
 
     ApplicationId application3 = BuilderUtils.newApplicationId(1234, 3);
     ApplicationAttemptId appAttemptId3 =
@@ -474,29 +473,29 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     checkEvents(appEventHandler, expectedInitEvents, false, "getType", "getApplicationID");
     reset(appEventHandler);
     
-    ContainerId container31 = createContainer(appAttemptId3, 1,
-        ContainerType.APPLICATION_MASTER);
+    ContainerId container31 = ContainerId.newContainerId(appAttemptId3, 1);
     writeContainerLogs(app3LogDir, container31, fileNames);
     logAggregationService.handle(
-        new LogHandlerContainerFinishedEvent(container31, 0));
+        new LogHandlerContainerFinishedEvent(container31,
+            ContainerType.APPLICATION_MASTER, 0));
 
-    ContainerId container32 = createContainer(appAttemptId3, 2,
-        ContainerType.TASK);
+    ContainerId container32 = ContainerId.newContainerId(appAttemptId3, 2);
     writeContainerLogs(app3LogDir, container32, fileNames);
     logAggregationService.handle(
-        new LogHandlerContainerFinishedEvent(container32, 1)); // Failed 
+        new LogHandlerContainerFinishedEvent(container32,
+            ContainerType.TASK, 1)); // Failed
 
-    ContainerId container22 = createContainer(appAttemptId2, 2,
-        ContainerType.TASK);
+    ContainerId container22 = ContainerId.newContainerId(appAttemptId2, 2);
     writeContainerLogs(app2LogDir, container22, fileNames);
     logAggregationService.handle(
-        new LogHandlerContainerFinishedEvent(container22, 0));
+        new LogHandlerContainerFinishedEvent(container22,
+            ContainerType.TASK, 0));
 
-    ContainerId container33 = createContainer(appAttemptId3, 3,
-        ContainerType.TASK);
+    ContainerId container33 = ContainerId.newContainerId(appAttemptId3, 3);
     writeContainerLogs(app3LogDir, container33, fileNames);
     logAggregationService.handle(
-        new LogHandlerContainerFinishedEvent(container33, 0));
+        new LogHandlerContainerFinishedEvent(container33,
+            ContainerType.TASK, 0));
 
     logAggregationService.handle(new LogHandlerAppFinishedEvent(
         application2));
@@ -750,7 +749,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     // verify trying to collect logs for containers/apps we don't know about
     // doesn't blow up and tear down the NM
     logAggregationService.handle(new LogHandlerContainerFinishedEvent(
-        BuilderUtils.newContainerId(4, 1, 1, 1), 0));
+        BuilderUtils.newContainerId(4, 1, 1, 1),
+        ContainerType.APPLICATION_MASTER, 0));
     dispatcher.await();
     logAggregationService.handle(new LogHandlerAppFinishedEvent(
         BuilderUtils.newApplicationId(1, 5)));
@@ -802,7 +802,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     // verify trying to collect logs for containers/apps we don't know about
     // doesn't blow up and tear down the NM
     logAggregationService.handle(new LogHandlerContainerFinishedEvent(
-        BuilderUtils.newContainerId(4, 1, 1, 1), 0));
+        BuilderUtils.newContainerId(4, 1, 1, 1),
+        ContainerType.APPLICATION_MASTER, 0));
     dispatcher.await();
     logAggregationService.handle(new LogHandlerAppFinishedEvent(
         BuilderUtils.newApplicationId(1, 5)));
@@ -1325,14 +1326,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
 
     ApplicationAttemptId appAttemptId1 =
         BuilderUtils.newApplicationAttemptId(application1, 1);
-    ContainerId container1 = createContainer(appAttemptId1, 1,
-        ContainerType.APPLICATION_MASTER);
+    ContainerId container1 = ContainerId.newContainerId(appAttemptId1, 1);
 
     // Simulate log-file creation
     writeContainerLogs(appLogDir1, container1, new String[] { "stdout",
         "stderr", "syslog" });
     logAggregationService.handle(new LogHandlerContainerFinishedEvent(
-      container1, 0));
+        container1, ContainerType.APPLICATION_MASTER, 0));
 
     // LogContext for application2 has excludePatten which includes
     // stdout and syslog.
@@ -1348,13 +1348,13 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
         AMOnlyLogAggregationPolicy.class.getName());
     logAggregationService.handle(new LogHandlerAppStartedEvent(application2,
       this.user, null, this.acls, LogAggregationContextWithExcludePatterns));
-    ContainerId container2 = createContainer(appAttemptId2, 1,
-        ContainerType.APPLICATION_MASTER);
+    ContainerId container2 = ContainerId.newContainerId(appAttemptId2, 1);
 
     writeContainerLogs(app2LogDir, container2, new String[] { "stdout",
         "stderr", "syslog" });
     logAggregationService.handle(
-        new LogHandlerContainerFinishedEvent(container2, 0));
+        new LogHandlerContainerFinishedEvent(container2,
+            ContainerType.APPLICATION_MASTER, 0));
 
     // LogContext for application3 has includePattern which is *.log and
     // excludePatten which includes std.log and sys.log.
@@ -1373,12 +1373,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
         AMOnlyLogAggregationPolicy.class.getName());
     logAggregationService.handle(new LogHandlerAppStartedEvent(application3,
       this.user, null, this.acls, context1));
-    ContainerId container3 = createContainer(appAttemptId3, 1,
-        ContainerType.APPLICATION_MASTER);
+    ContainerId container3 = ContainerId.newContainerId(appAttemptId3, 1);
     writeContainerLogs(app3LogDir, container3, new String[] { "stdout",
         "sys.log", "std.log", "out.log", "err.log", "log" });
     logAggregationService.handle(
-        new LogHandlerContainerFinishedEvent(container3, 0));
+        new LogHandlerContainerFinishedEvent(container3,
+            ContainerType.APPLICATION_MASTER, 0));
 
     // LogContext for application4 has includePattern
     // which includes std.log and sys.log and
@@ -1398,12 +1398,12 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
         AMOnlyLogAggregationPolicy.class.getName());
     logAggregationService.handle(new LogHandlerAppStartedEvent(application4,
       this.user, null, this.acls, context2));
-    ContainerId container4 = createContainer(appAttemptId4, 1,
-        ContainerType.APPLICATION_MASTER);
+    ContainerId container4 = ContainerId.newContainerId(appAttemptId4, 1);
     writeContainerLogs(app4LogDir, container4, new String[] { "stdout",
         "sys.log", "std.log", "out.log", "err.log", "log" });
     logAggregationService.handle(
-        new LogHandlerContainerFinishedEvent(container4, 0));
+        new LogHandlerContainerFinishedEvent(container4,
+            ContainerType.APPLICATION_MASTER, 0));
 
     dispatcher.await();
     ApplicationEvent expectedInitEvents[] =
@@ -1530,7 +1530,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
         new ContainerId[] {container}, logFiles, 1, true);
 
     logAggregationService.handle(
-        new LogHandlerContainerFinishedEvent(container, 0));
+        new LogHandlerContainerFinishedEvent(container,
+            ContainerType.APPLICATION_MASTER, 0));
 
     dispatcher.await();
 
@@ -1654,14 +1655,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     ApplicationAttemptId appAttemptId1 =
         BuilderUtils.newApplicationAttemptId(appId, 1);
     ContainerId containerId = BuilderUtils.newContainerId(appAttemptId1, 2l);
-    try {
-      logAggregationService.handle(new LogHandlerContainerFinishedEvent(
-          containerId, 100));
-      assertTrue("Should skip when null containerID", true);
-    } catch (Exception e) {
-      Assert.assertFalse("Exception not expected should skip null containerid",
-          true);
-    }
+    logAggregationService.handle(new LogHandlerContainerFinishedEvent(
+        containerId, ContainerType.APPLICATION_MASTER, 100));
   }
 
   @Test (timeout = 50000)
@@ -1986,8 +1981,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
       long cId, int exitCode, String[] logFiles) throws IOException {
     ApplicationAttemptId appAttemptId1 =
         BuilderUtils.newApplicationAttemptId(application1, 1);
-    ContainerId containerId = createContainer(appAttemptId1, cId,
-        containerType);
+    ContainerId containerId = ContainerId.newContainerId(appAttemptId1, cId);
     // Simulate log-file creation
     File appLogDir1 =
         new File(localLogDir, application1.toString());
@@ -1995,7 +1989,7 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     writeContainerLogs(appLogDir1, containerId, logFiles);
 
     logAggregationService.handle(new LogHandlerContainerFinishedEvent(
-        containerId, exitCode));
+        containerId, containerType, exitCode));
     return containerId;
 
   }
@@ -2185,7 +2179,8 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     writeContainerLogs(appLogDir, container, logFiles3);
 
     logAggregationService.handle(
-      new LogHandlerContainerFinishedEvent(container, 0));
+        new LogHandlerContainerFinishedEvent(container,
+            ContainerType.APPLICATION_MASTER, 0));
 
     dispatcher.await();
     logAggregationService.handle(new LogHandlerAppFinishedEvent(application));

+ 7 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java

@@ -62,6 +62,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.event.InlineDispatcher;
+import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
@@ -152,7 +153,8 @@ public class TestNonAggregatingLogHandler {
 
     logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, null));
 
-    logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0));
+    logHandler.handle(new LogHandlerContainerFinishedEvent(container11,
+        ContainerType.APPLICATION_MASTER, 0));
 
     logHandler.handle(new LogHandlerAppFinishedEvent(appId));
 
@@ -192,7 +194,8 @@ public class TestNonAggregatingLogHandler {
 
     logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, null));
 
-    logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0));
+    logHandler.handle(new LogHandlerContainerFinishedEvent(container11,
+        ContainerType.APPLICATION_MASTER, 0));
 
     logHandler.handle(new LogHandlerAppFinishedEvent(appId));
 
@@ -361,7 +364,8 @@ public class TestNonAggregatingLogHandler {
     logHandler.start();
 
     logHandler.handle(new LogHandlerAppStartedEvent(appId, user, null, null));
-    logHandler.handle(new LogHandlerContainerFinishedEvent(container11, 0));
+    logHandler.handle(new LogHandlerContainerFinishedEvent(container11,
+        ContainerType.APPLICATION_MASTER, 0));
     logHandler.handle(new LogHandlerAppFinishedEvent(appId));
 
     // simulate a restart and verify deletion is rescheduled