Jelajahi Sumber

YARN-3334. NM uses timeline client to publish container metrics to new timeline service. Contributed by Junping Du.

Zhijie Shen 10 tahun lalu
induk
melakukan
cf0eeaf476
29 mengubah file dengan 336 tambahan dan 114 penghapusan
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 3 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/HierarchicalTimelineEntity.java
  3. 8 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  4. 19 13
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
  5. 35 12
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
  6. 4 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
  7. 3 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
  8. 13 20
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
  9. 45 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
  10. 10 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java
  11. 3 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java
  12. 25 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java
  13. 116 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
  14. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
  15. 4 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
  16. 5 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
  17. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
  18. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
  19. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
  20. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
  21. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
  22. 7 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java
  23. 6 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java
  24. 6 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java
  25. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
  26. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java
  27. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java
  28. 9 11
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
  29. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -47,6 +47,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3374. Collector's web server should randomly bind an available port. (
     Zhijie Shen via junping_du)
 
+    YARN-3334. NM uses timeline client to publish container metrics to new
+    timeline service. (Junping Du via zjshen)
+
   IMPROVEMENTS
 
   OPTIMIZATIONS

+ 3 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/HierarchicalTimelineEntity.java

@@ -58,7 +58,9 @@ public abstract class HierarchicalTimelineEntity extends TimelineEntity {
 
   // required by JAXB
   @InterfaceAudience.Private
-  @XmlElement(name = "children")
+  // comment out XmlElement here because it cause UnrecognizedPropertyException
+  // TODO we need a better fix
+  //@XmlElement(name = "children")
   public HashMap<String, Set<String>> getChildrenJAXB() {
     return children;
   }

+ 8 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -2476,6 +2476,14 @@ public class YarnConfiguration extends Configuration {
     }
     return clusterId;
   }
+  
+  public static boolean systemMetricsPublisherEnabled(Configuration conf) {
+    return conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
+        && conf.getBoolean(
+            YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
+            YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);  
+  }
 
   /* For debugging. mp configurations to system output as XML format. */
   public static void main(String[] args) throws Exception {

+ 19 - 13
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java

@@ -314,19 +314,8 @@ public class ApplicationMaster {
       }
       appMaster.run();
       result = appMaster.finish();
-
-      threadPool.shutdown();
-
-      while (!threadPool.isTerminated()) { // wait for all posting thread to finish
-        try {
-          if (!threadPool.awaitTermination(30, TimeUnit.SECONDS)) {
-            threadPool.shutdownNow(); // send interrupt to hurry them along
-          }
-        } catch (InterruptedException e) {
-          LOG.warn("Timeline client service stop interrupted!");
-          break;
-        }
-      }
+      
+      shutdownAndAwaitTermination();
     } catch (Throwable t) {
       LOG.fatal("Error running ApplicationMaster", t);
       LogManager.shutdown();
@@ -340,6 +329,23 @@ public class ApplicationMaster {
       System.exit(2);
     }
   }
+  
+  //TODO remove threadPool after adding non-blocking call in TimelineClient
+  private static void shutdownAndAwaitTermination() {
+    threadPool.shutdown();
+    try {
+      // Wait a while for existing tasks to terminate
+      if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
+        threadPool.shutdownNow();
+        if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
+            LOG.error("ThreadPool did not terminate");
+      }
+    } catch (InterruptedException ie) {
+      threadPool.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
+  }
 
   /**
    * Dump out contents of $CWD and the environment to stdout for debugging

+ 35 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java

@@ -49,13 +49,17 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
 import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.util.LinuxResourceCalculatorPlugin;
+import org.apache.hadoop.yarn.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -108,6 +112,16 @@ public class TestDistributedShell {
     conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
     conf.set("mapreduce.jobhistory.address",
         "0.0.0.0:" + ServerSocketUtil.getPort(10021, 10));
+    // Enable ContainersMonitorImpl
+    conf.set(YarnConfiguration.NM_CONTAINER_MON_RESOURCE_CALCULATOR,
+        LinuxResourceCalculatorPlugin.class.getName());
+    conf.set(YarnConfiguration.NM_CONTAINER_MON_PROCESS_TREE, 
+        ProcfsBasedProcessTree.class.getName());
+    conf.setBoolean(YarnConfiguration.NM_PMEM_CHECK_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.NM_VMEM_CHECK_ENABLED, true);
+    conf.setBoolean(YarnConfiguration.YARN_MINICLUSTER_CONTROL_RESOURCE_MONITORING,
+        true);
+    conf.setBoolean(YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED, true);
 
     if (yarnCluster == null) {
       yarnCluster =
@@ -357,15 +371,14 @@ public class TestDistributedShell {
     File tmpRootFolder = new File(tmpRoot);
     try {
       Assert.assertTrue(tmpRootFolder.isDirectory());
-
-      // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs
-      String outputDirApp = tmpRoot +
+      String basePath = tmpRoot +
           YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" +
           UserGroupInformation.getCurrentUser().getShortUserName() +
           (defaultFlow ? "/" +
               TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId) +
-              "/0/" : "/test_flow_id/12345678/") +
-          appId.toString() + "/DS_APP_ATTEMPT/";
+              "/0/" : "/test_flow_id/12345678/") + appId.toString();
+      // for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs
+      String outputDirApp = basePath + "/DS_APP_ATTEMPT/";
 
       File entityFolder = new File(outputDirApp);
       Assert.assertTrue(entityFolder.isDirectory());
@@ -378,13 +391,7 @@ public class TestDistributedShell {
       File appAttemptFile = new File(appAttemptFileName);
       Assert.assertTrue(appAttemptFile.exists());
 
-      String outputDirContainer = tmpRoot +
-          YarnConfiguration.DEFAULT_RM_CLUSTER_ID + "/" +
-          UserGroupInformation.getCurrentUser().getShortUserName() +
-          (defaultFlow ? "/" +
-              TimelineUtils.generateDefaultFlowIdBasedOnAppId(appId) +
-              "/0/" : "/test_flow_id/12345678/") +
-          appId.toString() + "/DS_CONTAINER/";
+      String outputDirContainer = basePath + "/DS_CONTAINER/";
       File containerFolder = new File(outputDirContainer);
       Assert.assertTrue(containerFolder.isDirectory());
 
@@ -396,6 +403,22 @@ public class TestDistributedShell {
       Assert.assertTrue(containerFile.exists());
       String appTimeStamp = appId.getClusterTimestamp() + "_" + appId.getId()
           + "_";
+
+      // Verify NM posting container metrics info.
+      String outputDirContainerMetrics = basePath + "/" + 
+          TimelineEntityType.YARN_CONTAINER + "/";
+      File containerMetricsFolder = new File(outputDirContainerMetrics);
+      Assert.assertTrue(containerMetricsFolder.isDirectory());
+
+      String containerMetricsTimestampFileName = "container_"
+        + appId.getClusterTimestamp() + "_000" + appId.getId()
+        + "_01_000001.thist";
+      String containerMetricsFileName = outputDirContainerMetrics + 
+        containerMetricsTimestampFileName;
+
+      File containerMetricsFile = new File(containerMetricsFileName);
+      Assert.assertTrue(containerMetricsFile.exists());
+
     } finally {
       FileUtils.deleteDirectory(tmpRootFolder.getParentFile());
     }

+ 4 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java

@@ -472,14 +472,11 @@ public class TimelineClientImpl extends TimelineClient {
     }
     if (resp == null ||
         resp.getClientResponseStatus() != ClientResponse.Status.OK) {
-      String msg =
-          "Failed to get the response from the timeline server.";
+      String msg = "Response from the timeline server is " + 
+          ((resp == null) ? "null": 
+          "not successful," + " HTTP error code: " + resp.getStatus()
+          + ", Server response:\n" + resp.getEntity(String.class));
       LOG.error(msg);
-      if (LOG.isDebugEnabled() && resp != null) {
-        String output = resp.getEntity(String.class);
-        LOG.debug("HTTP error code: " + resp.getStatus()
-            + " Server response:\n" + output);
-      }
       throw new YarnException(msg);
     }
   }

+ 3 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java

@@ -22,6 +22,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -66,13 +67,6 @@ public interface Context {
    */
   Map<ApplicationId, String> getRegisteredCollectors();
 
-  /**
-   * Return the known collectors which get from RM for all active applications
-   * running on this NM.
-   * @return known collectors.
-   */
-  Map<ApplicationId, String> getKnownCollectors();
-
   ConcurrentMap<ContainerId, Container> getContainers();
 
   ConcurrentMap<ContainerId, org.apache.hadoop.yarn.api.records.Container>
@@ -95,6 +89,8 @@ public interface Context {
   NMStateStoreService getNMStateStore();
 
   boolean getDecommissioned();
+  
+  Configuration getConf();
 
   void setDecommissioned(boolean isDecommissioned);
 

+ 13 - 20
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java

@@ -193,9 +193,9 @@ public class NodeManager extends CompositeService
   protected NMContext createNMContext(
       NMContainerTokenSecretManager containerTokenSecretManager,
       NMTokenSecretManagerInNM nmTokenSecretManager,
-      NMStateStoreService stateStore) {
+      NMStateStoreService stateStore, Configuration conf) {
     return new NMContext(containerTokenSecretManager, nmTokenSecretManager,
-        dirsHandler, aclsManager, stateStore);
+        dirsHandler, aclsManager, stateStore, conf);
   }
 
   protected void doSecureLogin() throws IOException {
@@ -317,7 +317,7 @@ public class NodeManager extends CompositeService
     addService(nodeHealthChecker);
 
     this.context = createNMContext(containerTokenSecretManager,
-        nmTokenSecretManager, nmStore);
+        nmTokenSecretManager, nmStore, conf);
 
     nodeLabelsProvider = createNodeLabelsProvider(conf);
 
@@ -441,6 +441,9 @@ public class NodeManager extends CompositeService
   public static class NMContext implements Context {
 
     private NodeId nodeId = null;
+    
+    private Configuration conf = null;
+    
     protected final ConcurrentMap<ApplicationId, Application> applications =
         new ConcurrentHashMap<ApplicationId, Application>();
 
@@ -453,9 +456,6 @@ public class NodeManager extends CompositeService
     protected Map<ApplicationId, String> registeredCollectors =
         new ConcurrentHashMap<ApplicationId, String>();
 
-    protected Map<ApplicationId, String> knownCollectors =
-        new ConcurrentHashMap<ApplicationId, String>();
-
     protected final ConcurrentMap<ContainerId,
         org.apache.hadoop.yarn.api.records.Container> increasedContainers =
             new ConcurrentHashMap<>();
@@ -477,7 +477,7 @@ public class NodeManager extends CompositeService
     public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
         NMTokenSecretManagerInNM nmTokenSecretManager,
         LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
-        NMStateStoreService stateStore) {
+        NMStateStoreService stateStore, Configuration conf) {
       this.containerTokenSecretManager = containerTokenSecretManager;
       this.nmTokenSecretManager = nmTokenSecretManager;
       this.dirsHandler = dirsHandler;
@@ -488,6 +488,7 @@ public class NodeManager extends CompositeService
       this.stateStore = stateStore;
       this.logAggregationReportForApps = new ConcurrentLinkedQueue<
           LogAggregationReport>();
+      this.conf = conf;
     }
 
     /**
@@ -507,6 +508,11 @@ public class NodeManager extends CompositeService
     public ConcurrentMap<ApplicationId, Application> getApplications() {
       return this.applications;
     }
+    
+    @Override
+    public Configuration getConf() {
+      return this.conf;
+    }
 
     @Override
     public ConcurrentMap<ContainerId, Container> getContainers() {
@@ -609,19 +615,6 @@ public class NodeManager extends CompositeService
     public void addRegisteredCollectors(
         Map<ApplicationId, String> newRegisteredCollectors) {
       this.registeredCollectors.putAll(newRegisteredCollectors);
-      // Update to knownCollectors as well so it can immediately be consumed by
-      // this NM's TimelineClient.
-      this.knownCollectors.putAll(newRegisteredCollectors);
-    }
-
-    @Override
-    public Map<ApplicationId, String> getKnownCollectors() {
-      return this.knownCollectors;
-    }
-
-    public void addKnownCollectors(
-        Map<ApplicationId, String> knownCollectors) {
-      this.knownCollectors.putAll(knownCollectors);
     }
   }
 

+ 45 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -77,6 +78,7 @@ import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
@@ -821,10 +823,9 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
               dispatcher.getEventHandler().handle(
                   new CMgrSignalContainersEvent(containersToSignal));
             }
-
-            Map<ApplicationId, String> knownCollectors =
-                response.getAppCollectorsMap();
-            ((NodeManager.NMContext)context).addKnownCollectors(knownCollectors);
+            if (YarnConfiguration.systemMetricsPublisherEnabled(context.getConf())) {
+              updateTimelineClientsAddress(response);
+            }
 
           } catch (ConnectException e) {
             //catch and throw the exception if tried MAX wait time to connect RM
@@ -853,6 +854,46 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
         }
       }
 
+      /**
+       * Caller should take care of sending non null nodelabels for both
+       * arguments
+       * 
+       * @param nodeLabelsNew
+       * @param nodeLabelsOld
+       * @return if the New node labels are diff from the older one.
+       */
+      private boolean areNodeLabelsUpdated(Set<NodeLabel> nodeLabelsNew,
+          Set<NodeLabel> nodeLabelsOld) {
+        if (nodeLabelsNew.size() != nodeLabelsOld.size()
+            || !nodeLabelsOld.containsAll(nodeLabelsNew)) {
+          return true;
+        }
+        return false;
+      }
+
+      private void updateTimelineClientsAddress(
+          NodeHeartbeatResponse response) {
+        Set<Map.Entry<ApplicationId, String>> rmKnownCollectors = 
+            response.getAppCollectorsMap().entrySet();
+        for (Map.Entry<ApplicationId, String> entry : rmKnownCollectors) {
+          ApplicationId appId = entry.getKey();
+          String collectorAddr = entry.getValue();
+
+          // Only handle applications running on local node.
+          // Not include apps with timeline collectors running in local
+          Application application = context.getApplications().get(appId);
+          if (application != null &&
+              !context.getRegisteredCollectors().containsKey(appId)) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Sync a new collector address: " + collectorAddr + 
+                  " for application: " + appId + " from RM.");
+            }
+            TimelineClient client = application.getTimelineClient();
+            client.setTimelineServiceAddress(collectorAddr);
+          }
+        }
+      }
+      
       private void updateMasterKeys(NodeHeartbeatResponse response) {
         // See if the master-key has rolled over
         MasterKey updatedMasterKey = response.getContainerTokenMasterKey();

+ 10 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/collectormanager/NMCollectorService.java

@@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -103,7 +104,15 @@ public class NMCollectorService extends CompositeService implements
       Map<ApplicationId, String> newCollectorsMap =
           new HashMap<ApplicationId, String>();
       for (AppCollectorsMap collector : newCollectorsList) {
-        newCollectorsMap.put(collector.getApplicationId(), collector.getCollectorAddr());
+        ApplicationId appId = collector.getApplicationId();
+        String collectorAddr = collector.getCollectorAddr();
+        newCollectorsMap.put(appId, collectorAddr);
+        // set registered collector address to TimelineClient.
+        if (YarnConfiguration.systemMetricsPublisherEnabled(context.getConf())) {
+          TimelineClient client = 
+              context.getApplications().get(appId).getTimelineClient();
+          client.setTimelineServiceAddress(collectorAddr);
+        }
       }
       ((NodeManager.NMContext)context).addRegisteredCollectors(newCollectorsMap);
     }

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/Application.java

@@ -22,6 +22,7 @@ import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 
@@ -38,5 +39,7 @@ public interface Application extends EventHandler<ApplicationEvent> {
   String getFlowId();
 
   String getFlowRunId();
+  
+  TimelineClient getTimelineClient();
 
 }

+ 25 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/application/ApplicationImpl.java

@@ -28,12 +28,15 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
@@ -73,6 +76,7 @@ public class ApplicationImpl implements Application {
   private final ReadLock readLock;
   private final WriteLock writeLock;
   private final Context context;
+  private TimelineClient timelineClient;
 
   private static final Log LOG = LogFactory.getLog(ApplicationImpl.class);
 
@@ -96,6 +100,17 @@ public class ApplicationImpl implements Application {
     readLock = lock.readLock();
     writeLock = lock.writeLock();
     stateMachine = stateMachineFactory.make(this);
+    Configuration conf = context.getConf();
+    if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
+      createAndStartTimelienClient(conf);
+    }
+  }
+  
+  private void createAndStartTimelienClient(Configuration conf) {
+    // create and start timeline client
+    this.timelineClient = TimelineClient.createTimelineClient(appId);
+    timelineClient.init(conf);
+    timelineClient.start();
   }
 
   @Override
@@ -107,6 +122,11 @@ public class ApplicationImpl implements Application {
   public ApplicationId getAppId() {
     return appId;
   }
+  
+  @Override
+  public TimelineClient getTimelineClient() {
+    return timelineClient;
+  }
 
   @Override
   public ApplicationState getApplicationState() {
@@ -433,7 +453,11 @@ public class ApplicationImpl implements Application {
       // TODO check we remove related collectors info in failure cases
       // (YARN-3038)
       app.context.getRegisteredCollectors().remove(app.getAppId());
-      app.context.getKnownCollectors().remove(app.getAppId());
+      // stop timelineClient when application get finished.
+      TimelineClient timelineClient = app.getTimelineClient();
+      if (timelineClient != null) {
+        timelineClient.stop();
+      }
     }
   }
 

+ 116 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java

@@ -18,32 +18,43 @@
 
 package org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor;
 
+import java.io.IOException;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.timelineservice.ContainerEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.api.records.ResourceUtilization;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
 import org.apache.hadoop.yarn.server.nodemanager.util.NodeManagerHardwareUtils;
-import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
 import org.apache.hadoop.yarn.util.ResourceCalculatorPlugin;
+import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
 
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class ContainersMonitorImpl extends AbstractService implements
     ContainersMonitor {
@@ -75,11 +86,25 @@ public class ContainersMonitorImpl extends AbstractService implements
   private boolean pmemCheckEnabled;
   private boolean vmemCheckEnabled;
   private boolean containersMonitorEnabled;
+  
+  private boolean publishContainerMetricsToTimelineService;
 
   private long maxVCoresAllottedForContainers;
 
   private static final long UNKNOWN_MEMORY_LIMIT = -1L;
   private int nodeCpuPercentageForYARN;
+  
+  // For posting entities in new timeline service in a non-blocking way
+  // TODO replace with event loop in TimelineClient.
+  private static ExecutorService threadPool =
+      Executors.newCachedThreadPool(
+          new ThreadFactoryBuilder().setNameFormat("TimelineService #%d")
+          .build());
+  
+  @Private
+  public static enum ContainerMetric {
+    CPU, MEMORY
+  }
 
   private ResourceUtilization containersUtilization;
 
@@ -188,6 +213,18 @@ public class ContainersMonitorImpl extends AbstractService implements
                 1) + "). Thrashing might happen.");
       }
     }
+    
+    publishContainerMetricsToTimelineService =
+        YarnConfiguration.systemMetricsPublisherEnabled(conf);
+
+    if (publishContainerMetricsToTimelineService) {
+      LOG.info("NodeManager has been configured to publish container " +
+          "metrics to Timeline Service V2.");
+    } else {
+      LOG.warn("NodeManager has not been configured to publish container " +
+          "metrics to Timeline Service V2.");
+    }
+    
     super.serviceInit(conf);
   }
 
@@ -230,8 +267,27 @@ public class ContainersMonitorImpl extends AbstractService implements
         ;
       }
     }
+    
+    shutdownAndAwaitTermination();
+    
     super.serviceStop();
   }
+  
+  // TODO remove threadPool after adding non-blocking call in TimelineClient
+  private static void shutdownAndAwaitTermination() {
+    threadPool.shutdown();
+    try {
+      if (!threadPool.awaitTermination(60, TimeUnit.SECONDS)) {
+        threadPool.shutdownNow(); 
+        if (!threadPool.awaitTermination(60, TimeUnit.SECONDS))
+            LOG.error("ThreadPool did not terminate");
+      }
+    } catch (InterruptedException ie) {
+      threadPool.shutdownNow();
+      // Preserve interrupt status
+      Thread.currentThread().interrupt();
+    }
+  }
 
   @VisibleForTesting
   static class ProcessTreeInfo {
@@ -409,6 +465,10 @@ public class ContainersMonitorImpl extends AbstractService implements
             .entrySet()) {
           ContainerId containerId = entry.getKey();
           ProcessTreeInfo ptInfo = entry.getValue();
+          
+          ContainerEntity entity = new ContainerEntity();
+          entity.setId(containerId.toString());
+          
           try {
             String pId = ptInfo.getPID();
 
@@ -423,7 +483,8 @@ public class ContainersMonitorImpl extends AbstractService implements
                     + " for the first time");
 
                 ResourceCalculatorProcessTree pt =
-                    ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(pId, processTreeClass, conf);
+                    ResourceCalculatorProcessTree.getResourceCalculatorProcessTree(
+                        pId, processTreeClass, conf);
                 ptInfo.setPid(pId);
                 ptInfo.setProcessTree(pt);
 
@@ -447,13 +508,15 @@ public class ContainersMonitorImpl extends AbstractService implements
             pTree.updateProcessTree();    // update process-tree
             long currentVmemUsage = pTree.getVirtualMemorySize();
             long currentPmemUsage = pTree.getRssMemorySize();
+            long currentTime = System.currentTimeMillis();
+
             // if machine has 6 cores and 3 are used,
             // cpuUsagePercentPerCore should be 300% and
             // cpuUsageTotalCoresPercentage should be 50%
             float cpuUsagePercentPerCore = pTree.getCpuUsagePercent();
             float cpuUsageTotalCoresPercentage = cpuUsagePercentPerCore /
                 resourceCalculatorPlugin.getNumProcessors();
-
+            
             // Multiply by 1000 to avoid losing data when converting to int
             int milliVcoresUsed = (int) (cpuUsageTotalCoresPercentage * 1000
                 * maxVCoresAllottedForContainers /nodeCpuPercentageForYARN);
@@ -490,6 +553,26 @@ public class ContainersMonitorImpl extends AbstractService implements
                   ((int)cpuUsagePercentPerCore, milliVcoresUsed);
             }
 
+            if (publishContainerMetricsToTimelineService) {
+              // if currentPmemUsage data is available
+              if (currentPmemUsage != 
+                  ResourceCalculatorProcessTree.UNAVAILABLE) {
+                TimelineMetric memoryMetric = new TimelineMetric();
+                memoryMetric.setId(ContainerMetric.MEMORY.toString() + pId);
+                memoryMetric.addTimeSeriesData(currentTime, currentPmemUsage);
+                entity.addMetric(memoryMetric);
+              }
+              // if cpuUsageTotalCoresPercentage data is available
+              if (cpuUsageTotalCoresPercentage != 
+                ResourceCalculatorProcessTree.UNAVAILABLE) {
+                TimelineMetric cpuMetric = new TimelineMetric();
+                cpuMetric.setId(ContainerMetric.CPU.toString() + pId);
+                cpuMetric.addTimeSeriesData(currentTime, 
+                    cpuUsageTotalCoresPercentage);
+                entity.addMetric(cpuMetric);
+              }
+            }
+            
             boolean isMemoryOverLimit = false;
             String msg = "";
             int containerExitStatus = ContainerExitStatus.INVALID;
@@ -544,10 +627,23 @@ public class ContainersMonitorImpl extends AbstractService implements
               trackingContainers.remove(containerId);
               LOG.info("Removed ProcessTree with root " + pId);
             }
+
           } catch (Exception e) {
             // Log the exception and proceed to the next container.
-            LOG.warn("Uncaught exception in ContainerMemoryManager "
-                + "while managing memory of " + containerId, e);
+            LOG.warn("Uncaught exception in ContainersMonitorImpl "
+                + "while monitoring resource of " + containerId, e);
+          }
+          
+          if (publishContainerMetricsToTimelineService) {
+            try {
+              TimelineClient timelineClient = context.getApplications().get(
+                  containerId.getApplicationAttemptId().getApplicationId()).
+                      getTimelineClient();
+              putEntityWithoutBlocking(timelineClient, entity);
+            } catch (Exception e) {
+              LOG.error("Exception in ContainersMonitorImpl in putting " +
+                  "resource usage metrics to timeline service.", e);
+            }
           }
         }
         if (LOG.isDebugEnabled()) {
@@ -571,6 +667,21 @@ public class ContainersMonitorImpl extends AbstractService implements
         }
       }
     }
+    
+    private void putEntityWithoutBlocking(final TimelineClient timelineClient, 
+        final TimelineEntity entity) {
+      Runnable publishWrapper = new Runnable() {
+        public void run() {
+          try {
+            timelineClient.putEntities(entity);
+          } catch (IOException|YarnException e) {
+            LOG.error("putEntityNonBlocking get failed: " + e);
+            throw new RuntimeException(e.toString());
+          }
+        }
+      };
+      threadPool.execute(publishWrapper);
+    }
 
     private String formatErrorMessage(String memTypeExceeded,
         long currentVmemUsage, long vmemLimit,

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java

@@ -80,7 +80,7 @@ public class TestEventFlow {
     
     Context context = new NMContext(new NMContainerTokenSecretManager(conf),
         new NMTokenSecretManagerInNM(), null, null,
-        new NMNullStateStoreService()) {
+        new NMNullStateStoreService(), conf) {
       @Override
       public int getHttpPort() {
         return 1234;

+ 4 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java

@@ -1565,9 +1565,9 @@ public class TestNodeStatusUpdater {
       protected NMContext createNMContext(
           NMContainerTokenSecretManager containerTokenSecretManager,
           NMTokenSecretManagerInNM nmTokenSecretManager,
-          NMStateStoreService store) {
+          NMStateStoreService store, Configuration conf) {
         return new MyNMContext(containerTokenSecretManager,
-          nmTokenSecretManager);
+          nmTokenSecretManager, conf);
       }
     };
 
@@ -1798,9 +1798,9 @@ public class TestNodeStatusUpdater {
 
     public MyNMContext(
         NMContainerTokenSecretManager containerTokenSecretManager,
-        NMTokenSecretManagerInNM nmTokenSecretManager) {
+        NMTokenSecretManagerInNM nmTokenSecretManager, Configuration conf) {
       super(containerTokenSecretManager, nmTokenSecretManager, null, null,
-          new NMNullStateStoreService());
+          new NMNullStateStoreService(), conf);
     }
 
     @Override

+ 5 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java

@@ -619,11 +619,6 @@ public abstract class BaseAMRMProxyTest {
       return null;
     }
 
-    @Override
-    public Map<ApplicationId, String> getKnownCollectors() {
-      return null;
-    }
-
     @Override
     public ConcurrentMap<ContainerId, Container> getContainers() {
       return null;
@@ -674,6 +669,11 @@ public abstract class BaseAMRMProxyTest {
       return false;
     }
 
+    @Override
+    public Configuration getConf() {
+      return null;
+    }
+
     @Override
     public void setDecommissioned(boolean isDecommissioned) {
     }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java

@@ -110,7 +110,7 @@ public abstract class BaseContainerManagerTest {
   protected Configuration conf = new YarnConfiguration();
   protected Context context = new NMContext(new NMContainerTokenSecretManager(
     conf), new NMTokenSecretManagerInNM(), null,
-    new ApplicationACLsManager(conf), new NMNullStateStoreService()) {
+    new ApplicationACLsManager(conf), new NMNullStateStoreService(), conf) {
     public int getHttpPort() {
       return HTTP_PORT;
     };

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java

@@ -22,7 +22,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.isA;
+import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -471,7 +471,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
       NMStateStoreService stateStore) {
     NMContext context = new NMContext(new NMContainerTokenSecretManager(
         conf), new NMTokenSecretManagerInNM(), null,
-        new ApplicationACLsManager(conf), stateStore){
+        new ApplicationACLsManager(conf), stateStore, conf){
       public int getHttpPort() {
         return HTTP_PORT;
       }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java

@@ -113,7 +113,7 @@ public class TestContainerLaunch extends BaseContainerManagerTest {
   private static final String INVALID_JAVA_HOME = "/no/jvm/here";
   protected Context distContext = new NMContext(new NMContainerTokenSecretManager(
     conf), new NMTokenSecretManagerInNM(), null,
-    new ApplicationACLsManager(conf), new NMNullStateStoreService()) {
+    new ApplicationACLsManager(conf), new NMNullStateStoreService(), conf) {
     public int getHttpPort() {
       return HTTP_PORT;
     };

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java

@@ -81,7 +81,7 @@ public class TestLocalCacheDirectoryManager {
     NMContext nmContext =
         new NMContext(new NMContainerTokenSecretManager(conf),
           new NMTokenSecretManagerInNM(), null,
-          new ApplicationACLsManager(conf), new NMNullStateStoreService());
+          new ApplicationACLsManager(conf), new NMNullStateStoreService(), conf);
     ResourceLocalizationService service =
         new ResourceLocalizationService(null, null, null, null, nmContext);
     try {

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java

@@ -186,7 +186,7 @@ public class TestResourceLocalizationService {
     conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
     nmContext = new NMContext(new NMContainerTokenSecretManager(
       conf), new NMTokenSecretManagerInNM(), null,
-      new ApplicationACLsManager(conf), new NMNullStateStoreService());
+      new ApplicationACLsManager(conf), new NMNullStateStoreService(), conf);
   }
 
   @After
@@ -2365,7 +2365,7 @@ public class TestResourceLocalizationService {
     NMContext nmContext =
         new NMContext(new NMContainerTokenSecretManager(conf),
           new NMTokenSecretManagerInNM(), null,
-          new ApplicationACLsManager(conf), stateStore);
+          new ApplicationACLsManager(conf), stateStore, conf);
     ResourceLocalizationService rawService =
       new ResourceLocalizationService(dispatcher, exec, delService,
                                       dirsHandler, nmContext);

+ 7 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockApp.java

@@ -24,6 +24,7 @@ import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
@@ -41,7 +42,7 @@ public class MockApp implements Application {
   Application app;
   String flowId;
   String flowRunId;
-
+  TimelineClient timelineClient = null;
 
   public MockApp(int uniqId) {
     this("mockUser", 1234, uniqId);
@@ -87,4 +88,9 @@ public class MockApp implements Application {
   public String getFlowRunId() {
     return flowRunId;
   }
+  
+  @Override
+  public TimelineClient getTimelineClient() {
+    return timelineClient;
+  }
 }

+ 6 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestContainerLogsPage.java

@@ -21,8 +21,8 @@ package org.apache.hadoop.yarn.server.nodemanager.webapp;
 import static org.junit.Assume.assumeTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import java.io.BufferedOutputStream;
 import java.io.File;
@@ -47,10 +47,9 @@ import org.apache.hadoop.util.NodeHealthScriptRunner;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -63,6 +62,7 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
@@ -96,7 +96,7 @@ public class TestContainerLogsPage {
     healthChecker.init(conf);
     LocalDirsHandlerService dirsHandler = healthChecker.getDiskHandler();
     NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler,
-        new ApplicationACLsManager(conf), new NMNullStateStoreService());
+        new ApplicationACLsManager(conf), new NMNullStateStoreService(), conf);
     // Add an application and the corresponding containers
     RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(conf);
     String user = "nobody";
@@ -136,7 +136,7 @@ public class TestContainerLogsPage {
     when(dirsHandlerForFullDisk.getLogDirsForRead()).
         thenReturn(Arrays.asList(new String[] {absLogDir.getAbsolutePath()}));
     nmContext = new NodeManager.NMContext(null, null, dirsHandlerForFullDisk,
-        new ApplicationACLsManager(conf), new NMNullStateStoreService());
+        new ApplicationACLsManager(conf), new NMNullStateStoreService(), conf);
     nmContext.getApplications().put(appId, app);
     container.setState(ContainerState.RUNNING);
     nmContext.getContainers().put(container1, container);
@@ -158,7 +158,7 @@ public class TestContainerLogsPage {
     LocalDirsHandlerService dirsHandler = new LocalDirsHandlerService();
     dirsHandler.init(conf);
     NMContext nmContext = new NodeManager.NMContext(null, null, dirsHandler,
-        new ApplicationACLsManager(conf), new NMNullStateStoreService());
+        new ApplicationACLsManager(conf), new NMNullStateStoreService(), conf);
     // Add an application and the corresponding containers
     String user = "nobody";
     long clusterTimeStamp = 1234;

+ 6 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServer.java

@@ -86,8 +86,9 @@ public class TestNMWebServer {
   }
 
   private int startNMWebAppServer(String webAddr) {
+    Configuration conf = new Configuration();
     Context nmContext = new NodeManager.NMContext(null, null, null, null,
-        null);
+        null, conf);
     ResourceView resourceView = new ResourceView() {
       @Override
       public long getVmemAllocatedForContainers() {
@@ -110,7 +111,7 @@ public class TestNMWebServer {
         return true;
       }
     };
-    Configuration conf = new Configuration();
+    
     conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
     conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
     NodeHealthCheckerService healthChecker = createNodeHealthCheckerService(conf);
@@ -149,8 +150,9 @@ public class TestNMWebServer {
 
   @Test
   public void testNMWebApp() throws IOException, YarnException {
+    Configuration conf = new Configuration();
     Context nmContext = new NodeManager.NMContext(null, null, null, null,
-        null);
+        null, conf);
     ResourceView resourceView = new ResourceView() {
       @Override
       public long getVmemAllocatedForContainers() {
@@ -173,7 +175,7 @@ public class TestNMWebServer {
         return true;
       }
     };
-    Configuration conf = new Configuration();
+    
     conf.set(YarnConfiguration.NM_LOCAL_DIRS, testRootDir.getAbsolutePath());
     conf.set(YarnConfiguration.NM_LOG_DIRS, testLogDir.getAbsolutePath());
     NodeHealthCheckerService healthChecker = createNodeHealthCheckerService(conf);

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java

@@ -111,7 +111,7 @@ public class TestNMWebServices extends JerseyTestBase {
       healthChecker.init(conf);
       aclsManager = new ApplicationACLsManager(conf);
       nmContext = new NodeManager.NMContext(null, null, dirsHandler,
-          aclsManager, null);
+          aclsManager, null, conf);
       NodeId nodeId = NodeId.newInstance("testhost.foo.com", 8042);
       ((NodeManager.NMContext)nmContext).setNodeId(nodeId);
       resourceView = new ResourceView() {

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesApps.java

@@ -104,7 +104,7 @@ public class TestNMWebServicesApps extends JerseyTestBase {
       dirsHandler = healthChecker.getDiskHandler();
       aclsManager = new ApplicationACLsManager(conf);
       nmContext = new NodeManager.NMContext(null, null, dirsHandler,
-          aclsManager, null);
+          aclsManager, null, conf);
       NodeId nodeId = NodeId.newInstance("testhost.foo.com", 9999);
       ((NodeManager.NMContext)nmContext).setNodeId(nodeId);
       resourceView = new ResourceView() {

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServicesContainers.java

@@ -132,7 +132,7 @@ public class TestNMWebServicesContainers extends JerseyTestBase {
       dirsHandler = healthChecker.getDiskHandler();
       aclsManager = new ApplicationACLsManager(conf);
       nmContext = new NodeManager.NMContext(null, null, dirsHandler,
-          aclsManager, null) {
+          aclsManager, null, conf) {
         public NodeId getNodeId() {
           return NodeId.newInstance("testhost.foo.com", 8042);
         };

+ 9 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java

@@ -513,18 +513,16 @@ public class ResourceTrackerService extends AbstractService implements
     Map<ApplicationId, String> liveAppCollectorsMap = new
         ConcurrentHashMap<ApplicationId, String>();
     Map<ApplicationId, RMApp> rmApps = rmContext.getRMApps();
-      for (ApplicationId appId : liveApps) {
-        String appCollectorAddr = rmApps.get(appId).getCollectorAddr();
-        if (appCollectorAddr != null) {
-          liveAppCollectorsMap.put(appId, appCollectorAddr);
-        } else {
-          // Log a debug info if collector address is not found.
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Collector for applicaton: " + appId +
-                " hasn't registered yet!");
-          }
-        }
+    // Set collectors for all apps now.
+    // TODO set collectors for only active apps running on NM (liveApps cannot be
+    // used for this case)
+    for (Map.Entry<ApplicationId, RMApp> rmApp : rmApps.entrySet()) {
+      ApplicationId appId = rmApp.getKey();
+      String appCollectorAddr = rmApp.getValue().getCollectorAddr();
+      if (appCollectorAddr != null) {
+        liveAppCollectorsMap.put(appId, appCollectorAddr);
       }
+    }
     response.setAppCollectorsMap(liveAppCollectorsMap);
   }
 

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java

@@ -135,7 +135,7 @@ public class TimelineCollectorWebService {
       }
       TimelineCollector collector = getCollector(req, appId);
       if (collector == null) {
-        LOG.error("Application not found");
+        LOG.error("Application: "+ appId + " is not found");
         throw new NotFoundException(); // different exception?
       }
       collector.putEntities(entities, callerUgi);