Browse Source

YARN-3045. Implement NM writing container lifecycle events to Timeline Service v2. Contributed by Naganarasimha G R.

Junping Du 9 years ago
parent
commit
477a30f536
19 changed files with 614 additions and 117 deletions
  1. 10 0
      hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
  2. 25 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
  3. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
  4. 13 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
  5. 39 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
  6. 11 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java
  7. 3 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
  8. 19 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
  9. 6 101
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
  10. 31 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java
  11. 24 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java
  12. 376 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java
  13. 2 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
  14. 9 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
  15. 8 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
  16. 6 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java
  17. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
  18. 13 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
  19. 9 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml

@@ -115,6 +115,16 @@
     <Bug pattern="BC_UNCONFIRMED_CAST" />
   </Match>
 
+  <!-- Object cast is based on the event type -->
+  <Match>
+    <Class name="org.apache.hadoop.yarn.server.resourcemanager.metrics.AbstractTimelineServicePublisher" />
+     <Bug pattern="BC_UNCONFIRMED_CAST" />
+  </Match>
+
+  <Match>
+    <Class name="org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher$ApplicationEventHandler" />
+     <Bug pattern="BC_UNCONFIRMED_CAST" />
+  </Match>
 
   <!-- Ignore intentional switch fallthroughs -->
   <Match>

+ 25 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java

@@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
+import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.timeline.PluginStoreTestUtils;
 import org.apache.hadoop.yarn.server.timeline.NameValuePair;
@@ -524,9 +525,32 @@ public class TestDistributedShell {
           "container_" + appId.getClusterTimestamp() + "_000" + appId.getId()
               + "_01_000001"
               + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
-      verifyEntityTypeFileExists(basePath,
+      File containerEntityFile = verifyEntityTypeFileExists(basePath,
           TimelineEntityType.YARN_CONTAINER.toString(),
           containerMetricsTimestampFileName);
+      Assert.assertEquals(
+          "Container created event needs to be published atleast once",
+          1,
+          getNumOfStringOccurences(containerEntityFile,
+              ContainerMetricsConstants.CREATED_EVENT_TYPE));
+
+      // to avoid race condition of testcase, atleast check 4 times with sleep
+      // of 500ms
+      long numOfContainerFinishedOccurences = 0;
+      for (int i = 0; i < 4; i++) {
+        numOfContainerFinishedOccurences =
+            getNumOfStringOccurences(containerEntityFile,
+                ContainerMetricsConstants.FINISHED_EVENT_TYPE);
+        if (numOfContainerFinishedOccurences > 0) {
+          break;
+        } else {
+          Thread.sleep(500l);
+        }
+      }
+      Assert.assertEquals(
+          "Container finished event needs to be published atleast once",
+          1,
+          numOfContainerFinishedOccurences);
 
       // Verify RM posting Application life cycle Events are getting published
       String appMetricsTimestampFileName =

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java

@@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 
 /**
@@ -120,4 +121,8 @@ public interface Context {
   boolean isDistributedSchedulingEnabled();
 
   OpportunisticContainerAllocator getContainerAllocator();
+
+  void setNMTimelinePublisher(NMTimelinePublisher nmMetricsPublisher);
+
+  NMTimelinePublisher getNMTimelinePublisher();
 }

+ 13 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java

@@ -75,6 +75,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 
@@ -506,6 +507,8 @@ public class NodeManager extends CompositeService
 
     private final QueuingContext queuingContext;
 
+    private NMTimelinePublisher nmTimelinePublisher;
+
     public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
         NMTokenSecretManagerInNM nmTokenSecretManager,
         LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
@@ -678,6 +681,16 @@ public class NodeManager extends CompositeService
         Map<ApplicationId, String> newRegisteredCollectors) {
       this.registeredCollectors.putAll(newRegisteredCollectors);
     }
+
+    /**
+     * Stores the given publisher in this context's {@code nmTimelinePublisher}
+     * field, replacing any previously stored reference.
+     *
+     * @param nmMetricsPublisher the publisher to keep in this context
+     */
+    @Override
+    public void setNMTimelinePublisher(NMTimelinePublisher nmMetricsPublisher) {
+      this.nmTimelinePublisher = nmMetricsPublisher;
+    }
+
+    /**
+     * Returns the publisher held in this context's {@code nmTimelinePublisher}
+     * field.
+     *
+     * @return the stored publisher; the field has no initializer, so this is
+     *         {@code null} until {@code setNMTimelinePublisher} has been called
+     */
+    @Override
+    public NMTimelinePublisher getNMTimelinePublisher() {
+      return nmTimelinePublisher;
+    }
   }
 
   /**

+ 39 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java

@@ -127,6 +127,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainersLauncherEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.SignalContainersLauncherEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadService;
@@ -144,6 +145,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.security.authorize.NMPolicyProvider;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -190,6 +192,8 @@ public class ContainerManagerImpl extends CompositeService implements
 
   private long waitForContainersOnShutdownMillis;
 
+  private final NMTimelinePublisher nmMetricsPublisher;
+
   public ContainerManagerImpl(Context context, ContainerExecutor exec,
       DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
       NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) {
@@ -216,6 +220,8 @@ public class ContainerManagerImpl extends CompositeService implements
     auxiliaryServices.registerServiceListener(this);
     addService(auxiliaryServices);
 
+    nmMetricsPublisher = createNMTimelinePublisher(context);
+    context.setNMTimelinePublisher(nmMetricsPublisher);
     this.containersMonitor = createContainersMonitor(exec);
     addService(this.containersMonitor);
 
@@ -223,13 +229,16 @@ public class ContainerManagerImpl extends CompositeService implements
         new ContainerEventDispatcher());
     dispatcher.register(ApplicationEventType.class,
         createApplicationEventDispatcher());
-    dispatcher.register(LocalizationEventType.class, rsrcLocalizationSrvc);
+    dispatcher.register(LocalizationEventType.class,
+        new LocalizationEventHandlerWrapper(rsrcLocalizationSrvc,
+            nmMetricsPublisher));
     dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
     dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
     dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
     
     addService(dispatcher);
 
+
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     this.readLock = lock.readLock();
     this.writeLock = lock.writeLock();
@@ -430,6 +439,13 @@ public class ContainerManagerImpl extends CompositeService implements
     return new SharedCacheUploadService();
   }
 
+  /**
+   * Creates the {@link NMTimelinePublisher} used by this container manager.
+   * The new instance is handed to {@code addIfService} before it is returned.
+   *
+   * @param context the NodeManager context passed to the publisher's
+   *          constructor
+   * @return the newly created publisher
+   */
+  @VisibleForTesting
+  protected NMTimelinePublisher createNMTimelinePublisher(Context context) {
+    NMTimelinePublisher nmTimelinePublisherLocal = new NMTimelinePublisher(context);
+    addIfService(nmTimelinePublisherLocal);
+    return nmTimelinePublisherLocal;
+  }
+
   protected ContainersLauncher createContainersLauncher(Context context,
       ContainerExecutor exec) {
     return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, this);
@@ -982,9 +998,9 @@ public class ContainerManagerImpl extends CompositeService implements
               logAggregationContext));
         }
 
-        this.context.getNMStateStore().storeContainer(containerId, request);
         dispatcher.getEventHandler().handle(
           new ApplicationContainerInitEvent(container));
+        this.context.getNMStateStore().storeContainer(containerId, request);
 
         this.context.getContainerTokenSecretManager().startContainerSuccessful(
           containerTokenIdentifier);
@@ -1317,6 +1333,7 @@ public class ContainerManagerImpl extends CompositeService implements
       Container c = containers.get(event.getContainerID());
       if (c != null) {
         c.handle(event);
+        nmMetricsPublisher.publishContainerEvent(event);
       } else {
         LOG.warn("Event " + event + " sent to absent container " +
             event.getContainerID());
@@ -1325,7 +1342,6 @@ public class ContainerManagerImpl extends CompositeService implements
   }
 
   class ApplicationEventDispatcher implements EventHandler<ApplicationEvent> {
-
     @Override
     public void handle(ApplicationEvent event) {
       Application app =
@@ -1333,6 +1349,7 @@ public class ContainerManagerImpl extends CompositeService implements
               event.getApplicationID());
       if (app != null) {
         app.handle(event);
+        nmMetricsPublisher.publishApplicationEvent(event);
       } else {
         LOG.warn("Event " + event + " sent to absent application "
             + event.getApplicationID());
@@ -1340,6 +1357,25 @@ public class ContainerManagerImpl extends CompositeService implements
     }
   }
 
+  /**
+   * Event handler that wraps another {@link LocalizationEvent} handler so
+   * that each event is also passed to an {@link NMTimelinePublisher}.
+   */
+  private static final class LocalizationEventHandlerWrapper implements
+      EventHandler<LocalizationEvent> {
+
+    // Handler that receives every event first.
+    private EventHandler<LocalizationEvent> origLocalizationEventHandler;
+    // Publisher that receives the event after the original handler returns.
+    private NMTimelinePublisher timelinePublisher;
+
+    /**
+     * @param handler the handler to delegate each event to
+     * @param publisher the publisher that is given each event afterwards
+     */
+    LocalizationEventHandlerWrapper(EventHandler<LocalizationEvent> handler,
+        NMTimelinePublisher publisher) {
+      this.origLocalizationEventHandler = handler;
+      this.timelinePublisher = publisher;
+    }
+
+    /**
+     * Delegates the event to the wrapped handler and then passes the same
+     * event to {@code publishLocalizationEvent} on the publisher. The
+     * publisher call is not reached if the wrapped handler throws.
+     *
+     * @param event the localization event to handle
+     */
+    @Override
+    public void handle(LocalizationEvent event) {
+      origLocalizationEventHandler.handle(event);
+      timelinePublisher.publishLocalizationEvent(event);
+    }
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   public void handle(ContainerManagerEvent event) {

+ 11 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationContainerFinishedEvent.java

@@ -19,18 +19,23 @@
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.application;
 
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 
 public class ApplicationContainerFinishedEvent extends ApplicationEvent {
-  private ContainerId containerID;
+  private ContainerStatus containerStatus;
 
-  public ApplicationContainerFinishedEvent(
-      ContainerId containerID) {
-    super(containerID.getApplicationAttemptId().getApplicationId(),
+  public ApplicationContainerFinishedEvent(ContainerStatus containerStatus) {
+    super(containerStatus.getContainerId().getApplicationAttemptId().getApplicationId(),
         ApplicationEventType.APPLICATION_CONTAINER_FINISHED);
-    this.containerID = containerID;
+    this.containerStatus = containerStatus;
   }
 
   public ContainerId getContainerID() {
-    return this.containerID;
+    return containerStatus.getContainerId();
   }
+
+  public ContainerStatus getContainerStatus() {
+    return containerStatus;
+  }
+
 }

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java

@@ -26,6 +26,7 @@ import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
@@ -69,4 +70,6 @@ public interface Container extends EventHandler<ContainerEvent> {
 
   String toString();
 
+  Priority getPriority();
+
 }

+ 19 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java

@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -54,6 +55,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
 import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
@@ -77,6 +79,7 @@ import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredContainerStatus;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -197,6 +200,7 @@ public class ContainerImpl implements Container {
     }
 
     stateMachine = stateMachineFactory.make(this);
+    this.context = context;
   }
 
   // constructor for a recovered container
@@ -442,6 +446,10 @@ public class ContainerImpl implements Container {
     }
   }
 
+  /**
+   * Returns the timeline publisher obtained from this container's
+   * {@code context}.
+   *
+   * @return whatever {@code context.getNMTimelinePublisher()} returns
+   */
+  public NMTimelinePublisher getNMTimelinePublisher() {
+    return context.getNMTimelinePublisher();
+  }
+
   @Override
   public String getUser() {
     this.readLock.lock();
@@ -575,7 +583,10 @@ public class ContainerImpl implements Container {
     // Inform the application
     @SuppressWarnings("rawtypes")
     EventHandler eventHandler = dispatcher.getEventHandler();
-    eventHandler.handle(new ApplicationContainerFinishedEvent(containerId));
+
+    ContainerStatus containerStatus = cloneAndGetContainerStatus();
+    eventHandler.handle(new ApplicationContainerFinishedEvent(containerStatus));
+
     // Remove the container from the resource-monitor
     eventHandler.handle(new ContainerStopMonitoringEvent(containerId));
     // Tell the logService too
@@ -1187,7 +1198,8 @@ public class ContainerImpl implements Container {
         container.containerMetrics.finished();
       }
       container.sendFinishedEvents();
-      //if the current state is NEW it means the CONTAINER_INIT was never 
+
+      // if the current state is NEW it means the CONTAINER_INIT was never
       // sent for the event, thus no need to send the CONTAINER_STOP
       if (container.getCurrentState()
           != org.apache.hadoop.yarn.api.records.ContainerState.NEW) {
@@ -1384,4 +1396,9 @@ public class ContainerImpl implements Container {
   ContainerRetryContext getContainerRetryContext() {
     return containerRetryContext;
   }
+
+  /**
+   * Returns the priority carried by this container's
+   * {@code containerTokenIdentifier}.
+   *
+   * @return the priority read from the container token identifier
+   */
+  @Override
+  public Priority getPriority() {
+    return containerTokenIdentifier.getPriority();
+  }
 }

+ 6 - 101
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java

@@ -18,13 +18,9 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
 
-import java.io.IOException;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
@@ -36,10 +32,6 @@ import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
-import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -48,13 +40,13 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
 import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
 import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
 import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
 
 import com.google.common.base.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class ContainersMonitorImpl extends AbstractService implements
     ContainersMonitor {
@@ -86,17 +78,11 @@ public class ContainersMonitorImpl extends AbstractService implements
   private boolean pmemCheckEnabled;
   private boolean vmemCheckEnabled;
   private boolean containersMonitorEnabled;
-  
-  private boolean publishContainerMetricsToTimelineService;
 
   private long maxVCoresAllottedForContainers;
 
   private static final long UNKNOWN_MEMORY_LIMIT = -1L;
   private int nodeCpuPercentageForYARN;
-  
-  // For posting entities in new timeline service in a non-blocking way
-  // TODO replace with event loop in TimelineClient.
-  private static ExecutorService threadPool;
 
   @Private
   public static enum ContainerMetric {
@@ -215,22 +201,6 @@ public class ContainersMonitorImpl extends AbstractService implements
                 1) + "). Thrashing might happen.");
       }
     }
-    
-    publishContainerMetricsToTimelineService =
-        YarnConfiguration.systemMetricsPublisherEnabled(conf);
-
-    if (publishContainerMetricsToTimelineService) {
-      LOG.info("NodeManager has been configured to publish container " +
-          "metrics to Timeline Service V2.");
-      threadPool =
-          Executors.newCachedThreadPool(
-              new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
-              .build());
-    } else {
-      LOG.warn("NodeManager has not been configured to publish container " +
-          "metrics to Timeline Service V2.");
-    }
-    
     super.serviceInit(conf);
   }
 
@@ -274,29 +244,8 @@ public class ContainersMonitorImpl extends AbstractService implements
       }
     }
     
-    shutdownAndAwaitTermination();
-    
     super.serviceStop();
   }
-  
-  // TODO remove threadPool after adding non-blocking call in TimelineClient
-  private static void shutdownAndAwaitTermination() {
-    if (threadPool == null) {
-      return;
-    }
-    threadPool.shutdown();
-    try {
-      if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
-        threadPool.shutdownNow(); 
-        if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
-            LOG.error("ThreadPool did not terminate");
-      }
-    } catch (InterruptedException ie) {
-      threadPool.shutdownNow();
-      // Preserve interrupt status
-      Thread.currentThread().interrupt();
-    }
-  }
 
   public static class ProcessTreeInfo {
     private ContainerId containerId;
@@ -474,9 +423,6 @@ public class ContainersMonitorImpl extends AbstractService implements
           ContainerId containerId = entry.getKey();
           ProcessTreeInfo ptInfo = entry.getValue();
           
-          ContainerEntity entity = new ContainerEntity();
-          entity.setId(containerId.toString());
-          
           try {
             String pId = ptInfo.getPID();
 
@@ -569,26 +515,6 @@ public class ContainersMonitorImpl extends AbstractService implements
                   containerMetricsUnregisterDelayMs).recordCpuUsage
                   ((int)cpuUsagePercentPerCore, milliVcoresUsed);
             }
-
-            if (publishContainerMetricsToTimelineService) {
-              // if currentPmemUsage data is available
-              if (currentPmemUsage != 
-                  ResourceCalculatorProcessTree.UNAVAILABLE) {
-                TimelineMetric memoryMetric = new TimelineMetric();
-                memoryMetric.setId(ContainerMetric.MEMORY.toString() + pId);
-                memoryMetric.addValue(currentTime, currentPmemUsage);
-                entity.addMetric(memoryMetric);
-              }
-              // if cpuUsageTotalCoresPercentage data is available
-              if (cpuUsageTotalCoresPercentage != 
-                ResourceCalculatorProcessTree.UNAVAILABLE) {
-                TimelineMetric cpuMetric = new TimelineMetric();
-                cpuMetric.setId(ContainerMetric.CPU.toString() + pId);
-                cpuMetric.addValue(currentTime,
-                    cpuUsageTotalCoresPercentage);
-                entity.addMetric(cpuMetric);
-              }
-            }
             
             boolean isMemoryOverLimit = false;
             String msg = "";
@@ -645,23 +571,16 @@ public class ContainersMonitorImpl extends AbstractService implements
               LOG.info("Removed ProcessTree with root " + pId);
             }
 
+            ContainerImpl container =
+                (ContainerImpl) context.getContainers().get(containerId);
+            container.getNMTimelinePublisher().reportContainerResourceUsage(
+                container, currentTime, pId, currentPmemUsage,
+                cpuUsageTotalCoresPercentage);
           } catch (Exception e) {
             // Log the exception and proceed to the next container.
             LOG.warn("Uncaught exception in ContainersMonitorImpl "
                 + "while monitoring resource of " + containerId, e);
           }
-          
-          if (publishContainerMetricsToTimelineService) {
-            try {
-              TimelineClient timelineClient = context.getApplications().get(
-                  containerId.getApplicationAttemptId().getApplicationId()).
-                      getTimelineClient();
-              putEntityWithoutBlocking(timelineClient, entity);
-            } catch (Exception e) {
-              LOG.error("Exception in ContainersMonitorImpl in putting " +
-                  "resource usage metrics to timeline service.", e);
-            }
-          }
         }
         if (LOG.isDebugEnabled()) {
           LOG.debug("Total Resource Usage stats in NM by all containers : "
@@ -684,20 +603,6 @@ public class ContainersMonitorImpl extends AbstractService implements
         }
       }
     }
-    
-    private void putEntityWithoutBlocking(final TimelineClient timelineClient, 
-        final TimelineEntity entity) {
-      Runnable publishWrapper = new Runnable() {
-        public void run() {
-          try {
-            timelineClient.putEntities(entity);
-          } catch (IOException|YarnException e) {
-            LOG.error("putEntityNonBlocking get failed: " + e);
-          }
-        }
-      };
-      threadPool.execute(publishWrapper);
-    }
 
     private String formatErrorMessage(String memTypeExceeded,
         long currentVmemUsage, long vmemLimit,

+ 31 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEvent.java

@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.timelineservice;
+
+import org.apache.hadoop.yarn.event.AbstractEvent;
+
+/**
+ * Event class typed by {@link NMTimelineEventType}. It adds no state of its
+ * own; both constructors forward their arguments to {@link AbstractEvent}.
+ */
+public class NMTimelineEvent extends AbstractEvent<NMTimelineEventType> {
+  /**
+   * Creates an event of the given type.
+   *
+   * @param type the event type, passed to the superclass constructor
+   */
+  public NMTimelineEvent(NMTimelineEventType type) {
+    super(type);
+  }
+
+  /**
+   * Creates an event of the given type with an explicit timestamp.
+   *
+   * @param type the event type, passed to the superclass constructor
+   * @param timestamp the timestamp, passed to the superclass constructor
+   */
+  public NMTimelineEvent(NMTimelineEventType type, long timestamp) {
+    super(type, timestamp);
+  }
+}

+ 24 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelineEventType.java

@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.timelineservice;
+
+/**
+ * Types of {@link NMTimelineEvent}.
+ */
+public enum NMTimelineEventType {
+  /** Publish the NM Timeline entity. */
+  TIMELINE_ENTITY_PUBLISH
+}

+ 376 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/timelineservice/NMTimelinePublisher.java

@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.timelineservice;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity.Identifier;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationContainerFinishedEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizationEventType;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl.ContainerMetric;
+import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+
+public class NMTimelinePublisher extends CompositeService {
+
+  private static final Log LOG = LogFactory.getLog(NMTimelinePublisher.class);
+
+  private Dispatcher dispatcher;
+  private boolean publishSystemMetrics;
+
+  private Context context;
+
+  private NodeId nodeId;
+
+  private String httpAddress;
+
+  /**
+   * Creates the publisher.
+   *
+   * @param context NodeManager context from which node, container and
+   *          application state is read
+   */
+  public NMTimelinePublisher(Context context) {
+    super(NMTimelinePublisher.class.getName());
+    this.context = context;
+  }
+
+  /**
+   * Reads the system-metrics-publisher switch from the configuration and,
+   * when it is on, creates the async dispatcher and registers one handler per
+   * event family this publisher consumes.
+   */
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    publishSystemMetrics =
+        YarnConfiguration.systemMetricsPublisherEnabled(conf);
+
+    if (!publishSystemMetrics) {
+      LOG.info("YARN system metrics publishing service is not enabled");
+    } else {
+      dispatcher = new AsyncDispatcher();
+      // Internal publish requests.
+      dispatcher.register(NMTimelineEventType.class,
+          new ForwardingEventHandler());
+      // NodeManager lifecycle events forwarded by the publish*Event methods.
+      dispatcher.register(ContainerEventType.class,
+          new ContainerEventHandler());
+      dispatcher.register(ApplicationEventType.class,
+          new ApplicationEventHandler());
+      dispatcher.register(LocalizationEventType.class,
+          new LocalizationEventDispatcher());
+      addIfService(dispatcher);
+      LOG.info("YARN system metrics publishing service is enabled");
+    }
+    super.serviceInit(conf);
+  }
+
+  /**
+   * Starts the sub-services, then caches the node id and the derived
+   * {@code host:httpPort} address taken from the context.
+   */
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+    // Per the original author's note: the context is only updated once
+    // ContainerManagerImpl has started, which is why this publisher is added
+    // as a sub-service of ContainerManagerImpl and reads the node id here.
+    NodeId currentNodeId = context.getNodeId();
+    this.nodeId = currentNodeId;
+    this.httpAddress = currentNodeId.getHost() + ":" + context.getHttpPort();
+  }
+
+  /**
+   * Handles an internal timeline event; a publish request is unpacked and
+   * its entity is written through {@code putEntity}.
+   *
+   * @param event event taken from the dispatcher
+   */
+  protected void handleNMTimelineEvent(NMTimelineEvent event) {
+    switch (event.getType()) {
+    case TIMELINE_ENTITY_PUBLISH:
+      TimelinePublishEvent publishRequest = (TimelinePublishEvent) event;
+      putEntity(publishRequest.getTimelineEntityToPublish(),
+          publishRequest.getApplicationId());
+      break;
+    default:
+      LOG.error("Unknown NMTimelineEvent type: " + event.getType());
+      break;
+    }
+  }
+
+  /**
+   * Queues a container entity carrying the current memory and/or CPU usage
+   * metric for asynchronous publishing. Nothing is queued when publishing is
+   * disabled or when both readings equal
+   * {@link ResourceCalculatorProcessTree#UNAVAILABLE}.
+   *
+   * @param container container whose usage is reported
+   * @param createdTime not used by this method
+   * @param pId suffix appended to the metric ids
+   * @param pmemUsage memory usage reading
+   * @param cpuUsageTotalCoresPercentage CPU usage reading
+   */
+  @SuppressWarnings("unchecked")
+  public void reportContainerResourceUsage(Container container,
+      long createdTime, String pId, Long pmemUsage,
+      Float cpuUsageTotalCoresPercentage) {
+    if (!publishSystemMetrics) {
+      return;
+    }
+    boolean memoryKnown =
+        pmemUsage != ResourceCalculatorProcessTree.UNAVAILABLE;
+    boolean cpuKnown = cpuUsageTotalCoresPercentage
+        != ResourceCalculatorProcessTree.UNAVAILABLE;
+    if (!memoryKnown && !cpuKnown) {
+      return;
+    }
+
+    ContainerId containerId = container.getContainerId();
+    ContainerEntity entity = createContainerEntity(containerId);
+    // Both metrics of one report share the same sample time.
+    long sampleTime = System.currentTimeMillis();
+    if (memoryKnown) {
+      TimelineMetric memoryMetric = new TimelineMetric();
+      memoryMetric.setId(ContainerMetric.MEMORY.toString() + pId);
+      memoryMetric.addValue(sampleTime, pmemUsage);
+      entity.addMetric(memoryMetric);
+    }
+    if (cpuKnown) {
+      TimelineMetric cpuMetric = new TimelineMetric();
+      cpuMetric.setId(ContainerMetric.CPU.toString() + pId);
+      cpuMetric.addValue(sampleTime, cpuUsageTotalCoresPercentage);
+      entity.addMetric(cpuMetric);
+    }
+
+    ApplicationId appId =
+        containerId.getApplicationAttemptId().getApplicationId();
+    dispatcher.getEventHandler().handle(
+        new TimelinePublishEvent(entity, appId));
+  }
+
+  /**
+   * Fills the entity with the container's allocation details, attaches a
+   * "created" event and writes the entity.
+   *
+   * @param entity container entity to populate and publish
+   * @param containerId id of the created container
+   * @param resource resource allocated to the container
+   * @param priority priority of the container
+   * @param timestamp time recorded on the created event
+   */
+  private void publishContainerCreatedEvent(ContainerEntity entity,
+      ContainerId containerId, Resource resource, Priority priority,
+      long timestamp) {
+    Map<String, Object> allocationInfo = new HashMap<String, Object>();
+    allocationInfo.put(
+        ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO,
+        resource.getMemory());
+    allocationInfo.put(
+        ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO,
+        resource.getVirtualCores());
+    allocationInfo.put(
+        ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO,
+        nodeId.getHost());
+    allocationInfo.put(
+        ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO,
+        nodeId.getPort());
+    allocationInfo.put(
+        ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO,
+        priority.toString());
+    allocationInfo.put(
+        ContainerMetricsConstants.ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO,
+        httpAddress);
+    entity.setInfo(allocationInfo);
+
+    TimelineEvent createdEvent = new TimelineEvent();
+    createdEvent.setId(ContainerMetricsConstants.CREATED_EVENT_TYPE);
+    createdEvent.setTimestamp(timestamp);
+    entity.addEvent(createdEvent);
+
+    ApplicationId appId =
+        containerId.getApplicationAttemptId().getApplicationId();
+    putEntity(entity, appId);
+  }
+
+  /**
+   * Builds a container entity with a "finished" event carrying the
+   * diagnostics, exit status and state from the given status, and writes it.
+   *
+   * @param containerStatus final status of the container
+   * @param timeStamp time recorded on the finished event
+   */
+  private void publishContainerFinishedEvent(ContainerStatus containerStatus,
+      long timeStamp) {
+    ContainerId containerId = containerStatus.getContainerId();
+    TimelineEntity entity = createContainerEntity(containerId);
+
+    Map<String, Object> finishInfo = new HashMap<String, Object>();
+    finishInfo.put(ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO,
+        containerStatus.getDiagnostics());
+    finishInfo.put(ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO,
+        containerStatus.getExitStatus());
+    finishInfo.put(ContainerMetricsConstants.STATE_EVENT_INFO,
+        containerStatus.getState().toString());
+
+    TimelineEvent finishedEvent = new TimelineEvent();
+    finishedEvent.setId(ContainerMetricsConstants.FINISHED_EVENT_TYPE);
+    finishedEvent.setTimestamp(timeStamp);
+    finishedEvent.setInfo(finishInfo);
+    entity.addEvent(finishedEvent);
+
+    ApplicationId appId =
+        containerId.getApplicationAttemptId().getApplicationId();
+    putEntity(entity, appId);
+  }
+
+  /**
+   * Creates a container entity whose id is the container id and whose parent
+   * is the owning application attempt.
+   *
+   * @param containerId id of the container
+   * @return the new entity
+   */
+  private static ContainerEntity createContainerEntity(ContainerId containerId) {
+    ContainerEntity containerEntity = new ContainerEntity();
+    containerEntity.setId(containerId.toString());
+
+    Identifier attemptIdentifier = new Identifier();
+    attemptIdentifier.setType(
+        TimelineEntityType.YARN_APPLICATION_ATTEMPT.name());
+    attemptIdentifier.setId(
+        containerId.getApplicationAttemptId().toString());
+
+    containerEntity.setParent(attemptIdentifier);
+    return containerEntity;
+  }
+
+  /**
+   * Writes the entity through the timeline client of the given application.
+   * Publishing is best-effort: failures are logged and never propagated.
+   *
+   * @param entity entity to publish
+   * @param appId application whose timeline client is used
+   */
+  private void putEntity(TimelineEntity entity, ApplicationId appId) {
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Publishing the entity " + entity + ", JSON-style content: "
+            + TimelineUtils.dumpTimelineRecordtoJSON(entity));
+      }
+      // The lookup returns null when the application is not present in the
+      // context; check explicitly instead of relying on the catch block to
+      // absorb a NullPointerException.
+      Application app = context.getApplications().get(appId);
+      if (app == null) {
+        LOG.error("Cannot publish entity " + entity + ": application "
+            + appId + " is not present in the NodeManager context");
+        return;
+      }
+      TimelineClient timelineClient = app.getTimelineClient();
+      if (timelineClient == null) {
+        LOG.error("Cannot publish entity " + entity + ": application "
+            + appId + " has no timeline client");
+        return;
+      }
+      timelineClient.putEntities(entity);
+    } catch (Exception e) {
+      LOG.error("Error when publishing entity " + entity, e);
+    }
+  }
+
+  /**
+   * Queues the application events this publisher can actually publish.
+   * Does nothing when publishing is disabled.
+   *
+   * @param event application event raised in the NodeManager
+   */
+  public void publishApplicationEvent(ApplicationEvent event) {
+    if (!publishSystemMetrics) {
+      return;
+    }
+    // publish only when the desired event is received
+    switch (event.getType()) {
+    case APPLICATION_CONTAINER_FINISHED:
+      // this is actually used to publish the container finished event
+      dispatcher.getEventHandler().handle(event);
+      break;
+
+    case INIT_APPLICATION:
+    case FINISH_APPLICATION:
+    case APPLICATION_LOG_HANDLING_FAILED:
+      // TODO: nothing is published for these types yet. They are not
+      // forwarded because ApplicationEventHandler has no case for them and
+      // would only log a spurious error for every such event.
+      break;
+
+    default:
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(event.getType()
+            + " is not a desired ApplicationEvent which needs to be published by"
+            + " NMTimelinePublisher");
+      }
+      break;
+    }
+  }
+
+  /**
+   * Queues a container event for publishing when it is of a type this
+   * publisher handles. Does nothing when publishing is disabled.
+   *
+   * @param event container event raised in the NodeManager
+   */
+  public void publishContainerEvent(ContainerEvent event) {
+    if (!publishSystemMetrics) {
+      return;
+    }
+    ContainerEventType type = event.getType();
+    // only INIT_CONTAINER is forwarded to the dispatcher
+    switch (type) {
+    case INIT_CONTAINER:
+      dispatcher.getEventHandler().handle(event);
+      break;
+
+    default:
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(type
+            + " is not a desired ContainerEvent which needs to be published by"
+            + " NMTimelinePublisher");
+      }
+      break;
+    }
+  }
+
+  /**
+   * Queues a localization event when it is of a type this publisher
+   * forwards. Does nothing when publishing is disabled.
+   *
+   * @param event localization event raised in the NodeManager
+   */
+  public void publishLocalizationEvent(LocalizationEvent event) {
+    if (!publishSystemMetrics) {
+      return;
+    }
+    LocalizationEventType type = event.getType();
+    // only the two container-resource events are forwarded
+    switch (type) {
+    case INIT_CONTAINER_RESOURCES:
+    case CONTAINER_RESOURCES_LOCALIZED:
+      dispatcher.getEventHandler().handle(event);
+      break;
+
+    default:
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(type
+            + " is not a desired LocalizationEvent which needs to be published"
+            + " by NMTimelinePublisher");
+      }
+      break;
+    }
+  }
+
+  /**
+   * Dispatcher-side handler for application events; turns a
+   * container-finished application event into a published container
+   * "finished" event.
+   */
+  private class ApplicationEventHandler implements
+      EventHandler<ApplicationEvent> {
+    @Override
+    public void handle(ApplicationEvent event) {
+      switch (event.getType()) {
+      case APPLICATION_CONTAINER_FINISHED:
+        // The application event carries the status of the finished container.
+        ContainerStatus finishedStatus =
+            ((ApplicationContainerFinishedEvent) event).getContainerStatus();
+        publishContainerFinishedEvent(finishedStatus, event.getTimestamp());
+        break;
+      default:
+        LOG.error("Seems like event type is captured only in "
+            + "publishApplicationEvent method and not handled here");
+        break;
+      }
+    }
+  }
+
+  /**
+   * Dispatcher-side handler for container events; publishes a container
+   * "created" event for INIT_CONTAINER.
+   */
+  private class ContainerEventHandler implements EventHandler<ContainerEvent> {
+    @Override
+    public void handle(ContainerEvent event) {
+      ContainerId containerId = event.getContainerID();
+
+      switch (event.getType()) {
+      case INIT_CONTAINER:
+        // The event is processed asynchronously, so the container may no
+        // longer be present in the context; skip instead of failing the
+        // dispatcher thread with a NullPointerException.
+        Container container = context.getContainers().get(containerId);
+        if (container == null) {
+          LOG.warn("Container " + containerId + " is not present in the "
+              + "NodeManager context, skipping its created event");
+          break;
+        }
+        publishContainerCreatedEvent(createContainerEntity(containerId),
+            containerId, container.getResource(), container.getPriority(),
+            event.getTimestamp());
+        break;
+      default:
+        LOG.error("Seems like event type is captured only in "
+            + "publishContainerEvent method and not handled here");
+        break;
+      }
+    }
+  }
+
+  /**
+   * Dispatcher-side handler for localization events. The forwarded types are
+   * accepted but nothing is published for them yet.
+   */
+  private static final class LocalizationEventDispatcher implements
+      EventHandler<LocalizationEvent> {
+    @Override
+    public void handle(LocalizationEvent event) {
+      LocalizationEventType type = event.getType();
+      switch (type) {
+      case CONTAINER_RESOURCES_LOCALIZED:
+      case INIT_CONTAINER_RESOURCES:
+        // TODO after priority based flush jira is finished
+        break;
+      default:
+        LOG.error("Seems like event type is captured only in "
+            + "publishLocalizationEvent method and not handled here");
+        break;
+      }
+    }
+  }
+
+  /**
+   * EventHandler implementation which forwards events to
+   * {@link NMTimelinePublisher#handleNMTimelineEvent(NMTimelineEvent)}, so
+   * that NMTimelinePublisher does not need a public handle method of its own.
+   */
+  private final class ForwardingEventHandler implements
+      EventHandler<NMTimelineEvent> {
+
+    @Override
+    public void handle(NMTimelineEvent timelineEvent) {
+      handleNMTimelineEvent(timelineEvent);
+    }
+  }
+
+  /**
+   * Internal event asking the dispatcher thread to publish one entity on
+   * behalf of an application; stamped with the creation time.
+   */
+  private static class TimelinePublishEvent extends NMTimelineEvent {
+    private final TimelineEntity entityToPublish;
+    private final ApplicationId appId;
+
+    public TimelinePublishEvent(TimelineEntity entity, ApplicationId appId) {
+      super(NMTimelineEventType.TIMELINE_ENTITY_PUBLISH,
+          System.currentTimeMillis());
+      this.entityToPublish = entity;
+      this.appId = appId;
+    }
+
+    /** @return the entity to be published */
+    public TimelineEntity getTimelineEntityToPublish() {
+      return entityToPublish;
+    }
+
+    /** @return the application the entity is published for */
+    public ApplicationId getApplicationId() {
+      return appId;
+    }
+  }
+}

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java

@@ -85,6 +85,7 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.ContainerContext;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -1215,6 +1216,7 @@ public class TestNodeStatusUpdater {
         BuilderUtils.newContainerToken(containerId, "host", 1234, "user",
             BuilderUtils.newResource(1024, 1), 0, 123,
             "password".getBytes(), 0);
+
     Container completedContainer = new ContainerImpl(conf, null,
         null, null, null,
         BuilderUtils.newContainerTokenIdentifier(containerToken),

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java

@@ -71,6 +71,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.scheduler.OpportunisticContainerAllocator;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.After;
@@ -709,5 +710,13 @@ public abstract class BaseAMRMProxyTest {
     public OpportunisticContainerAllocator getContainerAllocator() {
       return null;
     }
+
+    public void setNMTimelinePublisher(NMTimelinePublisher nmMetricsPublisher) {
+      // Intentionally empty: this test context does not keep the publisher.
+    }
+
+    /** Always returns {@code null} in this test context. */
+    @Override
+    public NMTimelinePublisher getNMTimelinePublisher() {
+      return null;
+    }
   }
 }

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java

@@ -101,10 +101,12 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreServic
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.nodemanager.timelineservice.NMTimelinePublisher;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 public class TestContainerManagerRecovery extends BaseContainerManagerTest {
 
@@ -722,6 +724,12 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
               boolean blockNewContainerRequests) {
             // do nothing
           }
+
+          /** Supplies a Mockito mock in place of a real publisher. */
+          @Override
+          public NMTimelinePublisher createNMTimelinePublisher(Context context) {
+            return mock(NMTimelinePublisher.class);
+          }
     };
   }
 }

+ 6 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/TestApplication.java

@@ -34,7 +34,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
@@ -599,7 +602,7 @@ public class TestApplication {
 
     public void containerFinished(int containerNum) {
       app.handle(new ApplicationContainerFinishedEvent(containers.get(
-          containerNum).getContainerId()));
+          containerNum).cloneAndGetContainerStatus()));
       drainDispatcherEvents();
     }
 
@@ -643,6 +646,8 @@ public class TestApplication {
     when(c.getLaunchContext()).thenReturn(launchContext);
     when(launchContext.getApplicationACLs()).thenReturn(
         new HashMap<ApplicationAccessType, String>());
+    when(c.cloneAndGetContainerStatus()).thenReturn(BuilderUtils.newContainerStatus(cId,
+        ContainerState.NEW, "", 0, Resource.newInstance(1024, 1)));
     return c;
   }
 }

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -172,4 +173,8 @@ public class MockContainer implements Container {
   @Override
   public void setLogDir(String logDir) {
   }
+
+  /** Returns {@link Priority#UNDEFINED}; this mock has no real priority. */
+  @Override
+  public Priority getPriority() {
+    return Priority.UNDEFINED;
+  }
 }

+ 13 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java

@@ -137,9 +137,19 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
     // intercept the event of the AM container being stopped and remove the app
     // level collector service
     if (context.getContainerType() == ContainerType.APPLICATION_MASTER) {
-      ApplicationId appId = context.getContainerId().
-          getApplicationAttemptId().getApplicationId();
-      removeApplication(appId);
+      final ApplicationId appId =
+          context.getContainerId().getApplicationAttemptId().getApplicationId();
+      new Thread(new Runnable() {
+        @Override
+        public void run() {
+          // TODO Temporary Fix until solution for YARN-3995 is finalized:
+          // wait one second before removing the application.
+          try {
+            Thread.sleep(1000L);
+          } catch (InterruptedException interrupted) {
+            interrupted.printStackTrace();
+          }
+          removeApplication(appId);
+        }
+      }).start();
     }
   }
 

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java

@@ -98,6 +98,15 @@ public class TestPerNodeTimelineCollectorsAuxService {
     when(context.getContainerType()).thenReturn(
         ContainerType.APPLICATION_MASTER);
     auxService.stopContainer(context);
+
+    // TODO Temporary Fix until solution for YARN-3995 is finalized
+    for (int i = 0; i < 4; i++) {
+      Thread.sleep(500l);
+      if (!auxService.hasApplication(appAttemptId.getApplicationId())) {
+        break;
+      }
+    }
+
     // auxService should not have that app
     assertFalse(auxService.hasApplication(appAttemptId.getApplicationId()));
     auxService.close();