Explorar o código

YARN-8330. Improved publishing ALLOCATED events to ATS.
Contributed by Suma Shivaprasad

Eric Yang %!s(int64=6) %!d(string=hai) anos
pai
achega
f93ecf5c1e

+ 35 - 29
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java

@@ -244,23 +244,13 @@ public class RMContainerImpl implements RMContainer {
     this.readLock = lock.readLock();
     this.writeLock = lock.writeLock();
 
-    saveNonAMContainerMetaInfo = rmContext.getYarnConfiguration().getBoolean(
-       YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO,
-       YarnConfiguration
-                 .DEFAULT_APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO);
+    saveNonAMContainerMetaInfo =
+        shouldPublishNonAMContainerEventstoATS(rmContext);
 
     if (container.getId() != null) {
       rmContext.getRMApplicationHistoryWriter().containerStarted(this);
     }
 
-    // If saveNonAMContainerMetaInfo is true, store system metrics for all
-    // containers. If false, and if this container is marked as the AM, metrics
-    // will still be published for this container, but that calculation happens
-    // later.
-    if (saveNonAMContainerMetaInfo && null != container.getId()) {
-      rmContext.getSystemMetricsPublisher().containerCreated(
-          this, this.creationTime);
-    }
     if (this.container != null) {
       this.allocationTags = this.container.getAllocationTags();
     }
@@ -590,8 +580,12 @@ public class RMContainerImpl implements RMContainer {
           container.getNodeId(), container.getContainerId(),
           container.getAllocationTags());
 
-      container.eventHandler.handle(new RMAppAttemptEvent(
-          container.appAttemptId, RMAppAttemptEventType.CONTAINER_ALLOCATED));
+      container.eventHandler.handle(
+          new RMAppAttemptEvent(container.appAttemptId,
+              RMAppAttemptEventType.CONTAINER_ALLOCATED));
+
+      publishNonAMContainerEventstoATS(container);
+
     }
   }
 
@@ -610,9 +604,11 @@ public class RMContainerImpl implements RMContainer {
       // Tell the app
       container.eventHandler.handle(new RMAppRunningOnNodeEvent(container
           .getApplicationAttemptId().getApplicationId(), container.nodeId));
+
+      publishNonAMContainerEventstoATS(container);
     }
   }
-  
+
   private static final class ContainerAcquiredWhileRunningTransition extends
       BaseTransition {
 
@@ -718,17 +714,12 @@ public class RMContainerImpl implements RMContainer {
         container);
 
       boolean saveNonAMContainerMetaInfo =
-          container.rmContext.getYarnConfiguration().getBoolean(
-              YarnConfiguration
-                .APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO,
-              YarnConfiguration
-                .DEFAULT_APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO);
+          shouldPublishNonAMContainerEventstoATS(container.rmContext);
 
       if (saveNonAMContainerMetaInfo || container.isAMContainer()) {
         container.rmContext.getSystemMetricsPublisher().containerFinished(
             container, container.finishTime);
       }
-
     }
 
     private static void updateAttemptMetrics(RMContainerImpl container) {
@@ -754,6 +745,29 @@ public class RMContainerImpl implements RMContainer {
     }
   }
 
+  private static boolean shouldPublishNonAMContainerEventstoATS(
+      RMContext rmContext) {
+    return rmContext.getYarnConfiguration().getBoolean(
+        YarnConfiguration.APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO,
+        YarnConfiguration
+            .DEFAULT_APPLICATION_HISTORY_SAVE_NON_AM_CONTAINER_META_INFO);
+  }
+
+  private static void publishNonAMContainerEventstoATS(
+      RMContainerImpl rmContainer) {
+    boolean saveNonAMContainerMetaInfo = shouldPublishNonAMContainerEventstoATS(
+        rmContainer.rmContext);
+
+    // If saveNonAMContainerMetaInfo is true, store system metrics for all
+    // containers. If false, and if this container is marked as the AM, metrics
+    // will still be published for this container, but that calculation happens
+    // later.
+    if (saveNonAMContainerMetaInfo && null != rmContainer.container.getId()) {
+      rmContainer.rmContext.getSystemMetricsPublisher().containerCreated(
+          rmContainer, rmContainer.creationTime);
+    }
+  }
+
   private static final class KillTransition extends FinishedTransition {
 
     @Override
@@ -884,13 +898,5 @@ public class RMContainerImpl implements RMContainer {
     if (containerId != null) {
       rmContext.getRMApplicationHistoryWriter().containerStarted(this);
     }
-    // If saveNonAMContainerMetaInfo is true, store system metrics for all
-    // containers. If false, and if this container is marked as the AM, metrics
-    // will still be published for this container, but that calculation happens
-    // later.
-    if (saveNonAMContainerMetaInfo && null != container.getId()) {
-      rmContext.getSystemMetricsPublisher().containerCreated(
-          this, this.creationTime);
-    }
   }
 }

+ 8 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/TestRMContainerImpl.java

@@ -135,7 +135,6 @@ public class TestRMContainerImpl {
     assertEquals(priority,
         rmContainer.getAllocatedSchedulerKey().getPriority());
     verify(writer).containerStarted(any(RMContainer.class));
-    verify(publisher).containerCreated(any(RMContainer.class), anyLong());
 
     rmContainer.handle(new RMContainerEvent(containerId,
         RMContainerEventType.START));
@@ -150,6 +149,8 @@ public class TestRMContainerImpl {
         RMContainerEventType.LAUNCHED));
     drainDispatcher.await();
     assertEquals(RMContainerState.RUNNING, rmContainer.getState());
+    verify(publisher, times(2)).containerCreated(any(RMContainer.class),
+        anyLong());
     assertEquals("http://host:3465/node/containerlogs/container_1_0001_01_000001/user",
         rmContainer.getLogURL());
 
@@ -240,22 +241,25 @@ public class TestRMContainerImpl {
     assertEquals(priority,
         rmContainer.getAllocatedSchedulerKey().getPriority());
     verify(writer).containerStarted(any(RMContainer.class));
-    verify(publisher).containerCreated(any(RMContainer.class), anyLong());
 
     rmContainer.handle(new RMContainerEvent(containerId,
         RMContainerEventType.START));
     drainDispatcher.await();
     assertEquals(RMContainerState.ALLOCATED, rmContainer.getState());
+    verify(publisher).containerCreated(any(RMContainer.class), anyLong());
 
     rmContainer.handle(new RMContainerEvent(containerId,
         RMContainerEventType.ACQUIRED));
     drainDispatcher.await();
     assertEquals(RMContainerState.ACQUIRED, rmContainer.getState());
+    verify(publisher, times(2)).containerCreated(any(RMContainer.class),
+        anyLong());
 
     rmContainer.handle(new RMContainerEvent(containerId,
         RMContainerEventType.LAUNCHED));
     drainDispatcher.await();
     assertEquals(RMContainerState.RUNNING, rmContainer.getState());
+
     assertEquals("http://host:3465/node/containerlogs/container_1_0001_01_000001/user",
         rmContainer.getLogURL());
 
@@ -340,7 +344,8 @@ public class TestRMContainerImpl {
     // RMContainer should be publishing system metrics for all containers.
     // Since there is 1 AM container and 1 non-AM container, there should be 2
     // container created events and 2 container finished events.
-    verify(publisher, times(2)).containerCreated(any(RMContainer.class), anyLong());
+    verify(publisher, times(4)).containerCreated(any(RMContainer.class),
+        anyLong());
     verify(publisher, times(2)).containerFinished(any(RMContainer.class), anyLong());
   }