Selaa lähdekoodia

YARN-3390. Reuse TimelineCollectorManager for RM (Zhijie Shen via sjlee)

(cherry picked from commit 58221188811e0f61d842dac89e1f4ad4fd8aa182)
Sangjin Lee 10 vuotta sitten
vanhempi
commit
18abaab67e
18 muutettua tiedostoa jossa 433 lisäystä ja 407 poistoa
  1. 2 0
      hadoop-yarn-project/CHANGES.txt
  2. 7 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
  3. 9 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
  4. 4 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
  5. 6 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
  6. 7 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
  7. 15 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
  8. 0 111
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java
  9. 75 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java
  10. 6 6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
  11. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java
  12. 223 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java
  13. 7 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java
  14. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
  15. 43 216
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
  16. 9 14
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java
  17. 8 8
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java
  18. 10 14
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java

+ 2 - 0
hadoop-yarn-project/CHANGES.txt

@@ -53,6 +53,8 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3391. Clearly define flow ID/ flow run / flow version in API and storage.
     YARN-3391. Clearly define flow ID/ flow run / flow version in API and storage.
     (Zhijie Shen via junping_du)
     (Zhijie Shen via junping_du)
 
 
+    YARN-3390. Reuse TimelineCollectorManager for RM (Zhijie Shen via sjlee)
+
   IMPROVEMENTS
   IMPROVEMENTS
 
 
   OPTIMIZATIONS
   OPTIMIZATIONS

+ 7 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java

@@ -49,7 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
-import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector;
+import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.SystemClock;
 
 
@@ -97,7 +97,7 @@ public class RMActiveServiceContext {
   private ApplicationMasterService applicationMasterService;
   private ApplicationMasterService applicationMasterService;
   private RMApplicationHistoryWriter rmApplicationHistoryWriter;
   private RMApplicationHistoryWriter rmApplicationHistoryWriter;
   private SystemMetricsPublisher systemMetricsPublisher;
   private SystemMetricsPublisher systemMetricsPublisher;
-  private RMTimelineCollector timelineCollector;
+  private RMTimelineCollectorManager timelineCollectorManager;
 
 
   private RMNodeLabelsManager nodeLabelManager;
   private RMNodeLabelsManager nodeLabelManager;
   private RMDelegatedNodeLabelsUpdater rmDelegatedNodeLabelsUpdater;
   private RMDelegatedNodeLabelsUpdater rmDelegatedNodeLabelsUpdater;
@@ -381,14 +381,15 @@ public class RMActiveServiceContext {
 
 
   @Private
   @Private
   @Unstable
   @Unstable
-  public RMTimelineCollector getRMTimelineCollector() {
-    return timelineCollector;
+  public RMTimelineCollectorManager getRMTimelineCollectorManager() {
+    return timelineCollectorManager;
   }
   }
 
 
   @Private
   @Private
   @Unstable
   @Unstable
-  public void setRMTimelineCollector(RMTimelineCollector timelineCollector) {
-    this.timelineCollector = timelineCollector;
+  public void setRMTimelineCollectorManager(
+      RMTimelineCollectorManager timelineCollectorManager) {
+    this.timelineCollectorManager = timelineCollectorManager;
   }
   }
 
 
   @Private
   @Private

+ 9 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java

@@ -379,12 +379,13 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
     }
     }
 
 
     // Create RMApp
     // Create RMApp
-    RMAppImpl application = new RMAppImpl(applicationId, rmContext, this.conf,
-        submissionContext.getApplicationName(), user,
-        submissionContext.getQueue(), submissionContext, this.scheduler,
-        this.masterService, submitTime, submissionContext.getApplicationType(),
-        submissionContext.getApplicationTags(), amReq);
-
+    RMAppImpl application =
+        new RMAppImpl(applicationId, rmContext, this.conf,
+            submissionContext.getApplicationName(), user,
+            submissionContext.getQueue(),
+            submissionContext, this.scheduler, this.masterService,
+            submitTime, submissionContext.getApplicationType(),
+            submissionContext.getApplicationTags(), amReq);
     // Concurrent app submissions with same applicationId will fail here
     // Concurrent app submissions with same applicationId will fail here
     // Concurrent app submissions with different applicationIds will not
     // Concurrent app submissions with different applicationIds will not
     // influence each other
     // influence each other
@@ -395,6 +396,8 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
       LOG.warn(message);
       LOG.warn(message);
       throw new YarnException(message);
       throw new YarnException(message);
     }
     }
+    // Start timeline collector for the submitted app
+    application.startTimelineCollector();
     // Inform the ACLs Manager
     // Inform the ACLs Manager
     this.applicationACLsManager.addApplication(applicationId,
     this.applicationACLsManager.addApplication(applicationId,
         submissionContext.getAMContainerSpec().getApplicationACLs());
         submissionContext.getAMContainerSpec().getApplicationACLs());

+ 4 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java

@@ -45,7 +45,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
-import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector;
+import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
 
 
 /**
 /**
  * Context of the ResourceManager.
  * Context of the ResourceManager.
@@ -112,9 +112,10 @@ public interface RMContext {
 
 
   SystemMetricsPublisher getSystemMetricsPublisher();
   SystemMetricsPublisher getSystemMetricsPublisher();
   
   
-  void setRMTimelineCollector(RMTimelineCollector timelineCollector);
+  void setRMTimelineCollectorManager(
+      RMTimelineCollectorManager timelineCollectorManager);
   
   
-  RMTimelineCollector getRMTimelineCollector();
+  RMTimelineCollectorManager getRMTimelineCollectorManager();
 
 
   ConfigurationProvider getConfigurationProvider();
   ConfigurationProvider getConfigurationProvider();
 
 

+ 6 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java

@@ -49,7 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
-import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector;
+import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.Clock;
 
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.annotations.VisibleForTesting;
@@ -367,14 +367,14 @@ public class RMContextImpl implements RMContext {
   }
   }
 
 
   @Override
   @Override
-  public void setRMTimelineCollector(
-      RMTimelineCollector timelineCollector) {
-    activeServiceContext.setRMTimelineCollector(timelineCollector);
+  public void setRMTimelineCollectorManager(
+      RMTimelineCollectorManager timelineCollectorManager) {
+    activeServiceContext.setRMTimelineCollectorManager(timelineCollectorManager);
   }
   }
 
 
   @Override
   @Override
-  public RMTimelineCollector getRMTimelineCollector() {
-    return activeServiceContext.getRMTimelineCollector();
+  public RMTimelineCollectorManager getRMTimelineCollectorManager() {
+    return activeServiceContext.getRMTimelineCollectorManager();
   }
   }
   
   
   @Override
   @Override

+ 7 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -94,11 +94,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter;
 import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter;
 import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilterInitializer;
 import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilterInitializer;
+import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager;
 import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
 import org.apache.hadoop.yarn.server.webproxy.AppReportFetcher;
 import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
 import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
 import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
 import org.apache.hadoop.yarn.server.webproxy.WebAppProxy;
@@ -374,8 +374,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
     return new RMApplicationHistoryWriter();
     return new RMApplicationHistoryWriter();
   }
   }
 
 
-  private RMTimelineCollector createRMTimelineCollector() {
-    return new RMTimelineCollector();
+  private RMTimelineCollectorManager createRMTimelineCollectorManager() {
+    return new RMTimelineCollectorManager(rmContext);
   }
   }
 
 
   protected SystemMetricsPublisher createSystemMetricsPublisher() {
   protected SystemMetricsPublisher createSystemMetricsPublisher() {
@@ -506,10 +506,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
       addService(systemMetricsPublisher);
       addService(systemMetricsPublisher);
       rmContext.setSystemMetricsPublisher(systemMetricsPublisher);
       rmContext.setSystemMetricsPublisher(systemMetricsPublisher);
 
 
-      RMTimelineCollector timelineCollector =
-          createRMTimelineCollector();
-      addService(timelineCollector);
-      rmContext.setRMTimelineCollector(timelineCollector);
+      RMTimelineCollectorManager timelineCollectorManager =
+          createRMTimelineCollectorManager();
+      addService(timelineCollectorManager);
+      rmContext.setRMTimelineCollectorManager(timelineCollectorManager);
 
 
       // Register event handler for NodesListManager
       // Register event handler for NodesListManager
       nodesListManager = new NodesListManager(rmContext);
       nodesListManager = new NodesListManager(rmContext);

+ 15 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java

@@ -96,6 +96,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
 import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
@@ -526,6 +528,17 @@ public class RMAppImpl implements RMApp, Recoverable {
     }
     }
   }
   }
 
 
+  public void startTimelineCollector() {
+    AppLevelTimelineCollector collector =
+        new AppLevelTimelineCollector(applicationId);
+    rmContext.getRMTimelineCollectorManager().putIfAbsent(
+        applicationId, collector);
+  }
+
+  public void stopTimelineCollector() {
+    rmContext.getRMTimelineCollectorManager().remove(applicationId);
+  }
+
   @Override
   @Override
   public ApplicationId getApplicationId() {
   public ApplicationId getApplicationId() {
     return this.applicationId;
     return this.applicationId;
@@ -1352,6 +1365,8 @@ public class RMAppImpl implements RMApp, Recoverable {
           .applicationFinished(app, finalState);
           .applicationFinished(app, finalState);
       app.rmContext.getSystemMetricsPublisher()
       app.rmContext.getSystemMetricsPublisher()
           .appFinished(app, finalState, app.finishTime);
           .appFinished(app, finalState, app.finishTime);
+
+      app.stopTimelineCollector();
     };
     };
   }
   }
 
 

+ 0 - 111
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java

@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.timelineservice;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEventType;
-import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
-import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
-
-/**
- * This class is responsible for posting application and appattempt lifecycle
- * related events to timeline service V2
- */
-@Private
-@Unstable
-public class RMTimelineCollector extends TimelineCollector {
-  private static final Log LOG = LogFactory.getLog(RMTimelineCollector.class);
-
-  public RMTimelineCollector() {
-    super("Resource Manager TimelineCollector");
-  }
-
-  private Dispatcher dispatcher;
-
-  private boolean publishSystemMetricsForV2;
-
-  @Override
-  protected void serviceInit(Configuration conf) throws Exception {
-    publishSystemMetricsForV2 =
-        conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
-            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
-            && conf.getBoolean(
-                YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
-                YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);
-
-    if (publishSystemMetricsForV2) {
-      // having separate dispatcher to avoid load on RMDispatcher
-      LOG.info("RMTimelineCollector has been configured to publish"
-          + " System Metrics in ATS V2");
-      dispatcher = new AsyncDispatcher();
-      dispatcher.register(SystemMetricsEventType.class,
-          new ForwardingEventHandler());
-    } else {
-      LOG.warn("RMTimelineCollector has not been configured to publish"
-          + " System Metrics in ATS V2");
-    }
-    super.serviceInit(conf);
-  }
-
-  @Override
-  protected void serviceStart() throws Exception {
-    super.serviceStart();
-  }
-
-  @Override
-  protected void serviceStop() throws Exception {
-    super.serviceStop();
-  }
-
-  protected void handleSystemMetricsEvent(SystemMetricsEvent event) {
-    switch (event.getType()) {
-    default:
-      LOG.error("Unknown SystemMetricsEvent type: " + event.getType());
-    }
-  }
-  
-  @Override
-  protected TimelineCollectorContext getTimelineEntityContext() {
-    // TODO address in YARN-3390.
-    return null;
-  }
-
-  /**
-   * EventHandler implementation which forward events to SystemMetricsPublisher.
-   * Making use of it, SystemMetricsPublisher can avoid to have a public handle
-   * method.
-   */
-  private final class ForwardingEventHandler implements
-      EventHandler<SystemMetricsEvent> {
-
-    @Override
-    public void handle(SystemMetricsEvent event) {
-      handleSystemMetricsEvent(event);
-    }
-  }
-}

+ 75 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollectorManager.java

@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.timelineservice;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
+import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class RMTimelineCollectorManager extends TimelineCollectorManager {
+  private RMContext rmContext;
+
+  public RMTimelineCollectorManager(RMContext rmContext) {
+    super(RMTimelineCollectorManager.class.getName());
+    this.rmContext = rmContext;
+  }
+
+  @Override
+  public void postPut(ApplicationId appId, TimelineCollector collector) {
+    RMApp app = rmContext.getRMApps().get(appId);
+    if (app == null) {
+      throw new YarnRuntimeException(
+          "Unable to get the timeline collector context info for a non-existing app " +
+              appId);
+    }
+    String userId = app.getUser();
+    if (userId != null && !userId.isEmpty()) {
+      collector.getTimelineEntityContext().setUserId(userId);
+    }
+    for (String tag : app.getApplicationTags()) {
+      String[] parts = tag.split(":", 2);
+      if (parts.length != 2 || parts[1].isEmpty()) {
+        continue;
+      }
+      switch (parts[0]) {
+        case TimelineUtils.FLOW_NAME_TAG_PREFIX:
+          collector.getTimelineEntityContext().setFlowName(parts[1]);
+          break;
+        case TimelineUtils.FLOW_VERSION_TAG_PREFIX:
+          collector.getTimelineEntityContext().setFlowVersion(parts[1]);
+          break;
+        case TimelineUtils.FLOW_RUN_ID_TAG_PREFIX:
+          collector.getTimelineEntityContext().setFlowRunId(
+              Long.valueOf(parts[1]));
+          break;
+        default:
+          break;
+      }
+    }
+  }
+}

+ 6 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java

@@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
 import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
 import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
 import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
-import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorManager;
+import org.apache.hadoop.yarn.server.timelineservice.collector.NodeTimelineCollectorManager;
 import org.junit.AfterClass;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.Test;
@@ -42,13 +42,13 @@ import org.junit.Test;
 import java.io.IOException;
 import java.io.IOException;
 
 
 public class TestTimelineServiceClientIntegration {
 public class TestTimelineServiceClientIntegration {
-  private static TimelineCollectorManager collectorManager;
+  private static NodeTimelineCollectorManager collectorManager;
   private static PerNodeTimelineCollectorsAuxService auxService;
   private static PerNodeTimelineCollectorsAuxService auxService;
 
 
   @BeforeClass
   @BeforeClass
   public static void setupClass() throws Exception {
   public static void setupClass() throws Exception {
     try {
     try {
-      collectorManager = new MyTimelineCollectorManager();
+      collectorManager = new MockNodeTimelineCollectorManager();
       auxService =
       auxService =
           PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
           PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
               collectorManager);
               collectorManager);
@@ -85,9 +85,9 @@ public class TestTimelineServiceClientIntegration {
     }
     }
   }
   }
 
 
-  private static class MyTimelineCollectorManager extends
-      TimelineCollectorManager {
-    public MyTimelineCollectorManager() {
+  private static class MockNodeTimelineCollectorManager extends
+      NodeTimelineCollectorManager {
+    public MockNodeTimelineCollectorManager() {
       super();
       super();
     }
     }
 
 

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/AppLevelTimelineCollector.java

@@ -75,7 +75,7 @@ public class AppLevelTimelineCollector extends TimelineCollector {
   }
   }
 
 
   @Override
   @Override
-  protected TimelineCollectorContext getTimelineEntityContext() {
+  public TimelineCollectorContext getTimelineEntityContext() {
     return context;
     return context;
   }
   }
 
 

+ 223 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/NodeTimelineCollectorManager.java

@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.collector;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.HashMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.http.HttpServer2;
+import org.apache.hadoop.http.lib.StaticUserWebFilter;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
+import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
+import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
+import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+
+/**
+ *
+ * It is a singleton, and instances should be obtained via
+ * {@link #getInstance()}.
+ *
+ */
+@Private
+@Unstable
+public class NodeTimelineCollectorManager extends TimelineCollectorManager {
+  private static final Log LOG =
+      LogFactory.getLog(NodeTimelineCollectorManager.class);
+  private static final NodeTimelineCollectorManager INSTANCE =
+      new NodeTimelineCollectorManager();
+
+
+  // REST server for this collector manager
+  private HttpServer2 timelineRestServer;
+
+  private String timelineRestServerBindAddress;
+
+  private CollectorNodemanagerProtocol nmCollectorService;
+
+  private InetSocketAddress nmCollectorServiceAddress;
+
+  static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager";
+
+  static NodeTimelineCollectorManager getInstance() {
+    return INSTANCE;
+  }
+
+  @VisibleForTesting
+  protected NodeTimelineCollectorManager() {
+    super(NodeTimelineCollectorManager.class.getName());
+  }
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    this.nmCollectorServiceAddress = conf.getSocketAddr(
+        YarnConfiguration.NM_BIND_HOST,
+        YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS,
+        YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    nmCollectorService = getNMCollectorService();
+    startWebApp();
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (timelineRestServer != null) {
+      timelineRestServer.stop();
+    }
+    super.serviceStop();
+  }
+
+  @Override
+  public void postPut(ApplicationId appId, TimelineCollector collector) {
+    try {
+      // Get context info from NM
+      updateTimelineCollectorContext(appId, collector);
+      // Report to NM if a new collector is added.
+      reportNewCollectorToNM(appId);
+    } catch (YarnException | IOException e) {
+      // throw exception here as it cannot be used if failed communicate with NM
+      LOG.error("Failed to communicate with NM Collector Service for " + appId);
+      throw new YarnRuntimeException(e);
+    }
+  }
+
+  /**
+   * Launch the REST web server for this collector manager
+   */
+  private void startWebApp() {
+    Configuration conf = getConfig();
+    String bindAddress = conf.get(YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
+        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_BIND_HOST) + ":0";
+    try {
+      Configuration confForInfoServer = new Configuration(conf);
+      confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
+      HttpServer2.Builder builder = new HttpServer2.Builder()
+          .setName("timeline")
+          .setConf(conf)
+          .addEndpoint(URI.create(
+              (YarnConfiguration.useHttps(conf) ? "https://" : "http://") +
+                  bindAddress));
+      timelineRestServer = builder.build();
+      // TODO: replace this by an authentication filter in future.
+      HashMap<String, String> options = new HashMap<>();
+      String username = conf.get(HADOOP_HTTP_STATIC_USER,
+          DEFAULT_HADOOP_HTTP_STATIC_USER);
+      options.put(HADOOP_HTTP_STATIC_USER, username);
+      HttpServer2.defineFilter(timelineRestServer.getWebAppContext(),
+          "static_user_filter_timeline",
+          StaticUserWebFilter.StaticUserFilter.class.getName(),
+          options, new String[] {"/*"});
+
+      timelineRestServer.addJerseyResourcePackage(
+          TimelineCollectorWebService.class.getPackage().getName() + ";"
+              + GenericExceptionHandler.class.getPackage().getName() + ";"
+              + YarnJacksonJaxbJsonProvider.class.getPackage().getName(),
+          "/*");
+      timelineRestServer.setAttribute(COLLECTOR_MANAGER_ATTR_KEY, this);
+      timelineRestServer.start();
+    } catch (Exception e) {
+      String msg = "The per-node collector webapp failed to start.";
+      LOG.error(msg, e);
+      throw new YarnRuntimeException(msg, e);
+    }
+    //TODO: We need to think of the case of multiple interfaces
+    this.timelineRestServerBindAddress = WebAppUtils.getResolvedAddress(
+        timelineRestServer.getConnectorAddress(0));
+    LOG.info("Instantiated the per-node collector webapp at " +
+        timelineRestServerBindAddress);
+  }
+
+  private void reportNewCollectorToNM(ApplicationId appId)
+      throws YarnException, IOException {
+    ReportNewCollectorInfoRequest request =
+        ReportNewCollectorInfoRequest.newInstance(appId,
+            this.timelineRestServerBindAddress);
+    LOG.info("Report a new collector for application: " + appId +
+        " to the NM Collector Service.");
+    nmCollectorService.reportNewCollectorInfo(request);
+  }
+
+  private void updateTimelineCollectorContext(
+      ApplicationId appId, TimelineCollector collector)
+      throws YarnException, IOException {
+    GetTimelineCollectorContextRequest request =
+        GetTimelineCollectorContextRequest.newInstance(appId);
+    LOG.info("Get timeline collector context for " + appId);
+    GetTimelineCollectorContextResponse response =
+        nmCollectorService.getTimelineCollectorContext(request);
+    String userId = response.getUserId();
+    if (userId != null && !userId.isEmpty()) {
+      collector.getTimelineEntityContext().setUserId(userId);
+    }
+    String flowName = response.getFlowName();
+    if (flowName != null && !flowName.isEmpty()) {
+      collector.getTimelineEntityContext().setFlowName(flowName);
+    }
+    String flowVersion = response.getFlowVersion();
+    if (flowVersion != null && !flowVersion.isEmpty()) {
+      collector.getTimelineEntityContext().setFlowVersion(flowVersion);
+    }
+    long flowRunId = response.getFlowRunId();
+    if (flowRunId != 0L) {
+      collector.getTimelineEntityContext().setFlowRunId(flowRunId);
+    }
+  }
+
+  @VisibleForTesting
+  protected CollectorNodemanagerProtocol getNMCollectorService() {
+    Configuration conf = getConfig();
+    final YarnRPC rpc = YarnRPC.create(conf);
+
+    // TODO Security settings.
+    return (CollectorNodemanagerProtocol) rpc.getProxy(
+        CollectorNodemanagerProtocol.class,
+        nmCollectorServiceAddress, conf);
+  }
+
+  @VisibleForTesting
+  public String getRestServerBindAddress() {
+    return timelineRestServerBindAddress;
+  }
+}

+ 7 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/PerNodeTimelineCollectorsAuxService.java

@@ -53,15 +53,15 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
       LogFactory.getLog(PerNodeTimelineCollectorsAuxService.class);
       LogFactory.getLog(PerNodeTimelineCollectorsAuxService.class);
   private static final int SHUTDOWN_HOOK_PRIORITY = 30;
   private static final int SHUTDOWN_HOOK_PRIORITY = 30;
 
 
-  private final TimelineCollectorManager collectorManager;
+  private final NodeTimelineCollectorManager collectorManager;
 
 
   public PerNodeTimelineCollectorsAuxService() {
   public PerNodeTimelineCollectorsAuxService() {
     // use the same singleton
     // use the same singleton
-    this(TimelineCollectorManager.getInstance());
+    this(NodeTimelineCollectorManager.getInstance());
   }
   }
 
 
   @VisibleForTesting PerNodeTimelineCollectorsAuxService(
   @VisibleForTesting PerNodeTimelineCollectorsAuxService(
-      TimelineCollectorManager collectorsManager) {
+      NodeTimelineCollectorManager collectorsManager) {
     super("timeline_collector");
     super("timeline_collector");
     this.collectorManager = collectorsManager;
     this.collectorManager = collectorsManager;
   }
   }
@@ -108,8 +108,7 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
    * @return whether it was removed successfully
    * @return whether it was removed successfully
    */
    */
   public boolean removeApplication(ApplicationId appId) {
   public boolean removeApplication(ApplicationId appId) {
-    String appIdString = appId.toString();
-    return collectorManager.remove(appIdString);
+    return collectorManager.remove(appId);
   }
   }
 
 
   /**
   /**
@@ -153,8 +152,8 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
   }
   }
 
 
   @VisibleForTesting
   @VisibleForTesting
-  boolean hasApplication(String appId) {
-    return collectorManager.containsKey(appId);
+  boolean hasApplication(ApplicationId appId) {
+    return collectorManager.containsTimelineCollector(appId);
   }
   }
 
 
   @Override
   @Override
@@ -174,7 +173,7 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
 
 
   @VisibleForTesting
   @VisibleForTesting
   public static PerNodeTimelineCollectorsAuxService
   public static PerNodeTimelineCollectorsAuxService
-      launchServer(String[] args, TimelineCollectorManager collectorManager) {
+      launchServer(String[] args, NodeTimelineCollectorManager collectorManager) {
     Thread
     Thread
       .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
       .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
     StringUtils.startupShutdownMessage(
     StringUtils.startupShutdownMessage(

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java

@@ -124,6 +124,6 @@ public abstract class TimelineCollector extends CompositeService {
     }
     }
   }
   }
 
 
-  protected abstract TimelineCollectorContext getTimelineEntityContext();
+  public abstract TimelineCollectorContext getTimelineEntityContext();
 
 
 }
 }

+ 43 - 216
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java

@@ -18,173 +18,97 @@
 
 
 package org.apache.hadoop.yarn.server.timelineservice.collector;
 package org.apache.hadoop.yarn.server.timelineservice.collector;
 
 
-import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
-import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
 
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.http.HttpServer2;
-import org.apache.hadoop.http.lib.StaticUserWebFilter;
-import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
-import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
-import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
-import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
-import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
-import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
 
-import com.google.common.annotations.VisibleForTesting;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 
 /**
 /**
  * Class that manages adding and removing collectors and their lifecycle. It
  * Class that manages adding and removing collectors and their lifecycle. It
  * provides thread safety access to the collectors inside.
  * provides thread safety access to the collectors inside.
  *
  *
- * It is a singleton, and instances should be obtained via
- * {@link #getInstance()}.
  */
  */
-@Private
-@Unstable
-public class TimelineCollectorManager extends CompositeService {
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class TimelineCollectorManager extends AbstractService {
   private static final Log LOG =
   private static final Log LOG =
       LogFactory.getLog(TimelineCollectorManager.class);
       LogFactory.getLog(TimelineCollectorManager.class);
-  private static final TimelineCollectorManager INSTANCE =
-      new TimelineCollectorManager();
 
 
   // access to this map is synchronized with the map itself
   // access to this map is synchronized with the map itself
-  private final Map<String, TimelineCollector> collectors =
+  private final Map<ApplicationId, TimelineCollector> collectors =
       Collections.synchronizedMap(
       Collections.synchronizedMap(
-          new HashMap<String, TimelineCollector>());
-
-  // REST server for this collector manager
-  private HttpServer2 timelineRestServer;
-
-  private String timelineRestServerBindAddress;
-
-  private CollectorNodemanagerProtocol nmCollectorService;
-
-  private InetSocketAddress nmCollectorServiceAddress;
-
-  static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager";
-
-  static TimelineCollectorManager getInstance() {
-    return INSTANCE;
-  }
+          new HashMap<ApplicationId, TimelineCollector>());
 
 
-  @VisibleForTesting
-  protected TimelineCollectorManager() {
-    super(TimelineCollectorManager.class.getName());
-  }
-
-  @Override
-  public void serviceInit(Configuration conf) throws Exception {
-    this.nmCollectorServiceAddress = conf.getSocketAddr(
-        YarnConfiguration.NM_BIND_HOST,
-        YarnConfiguration.NM_COLLECTOR_SERVICE_ADDRESS,
-        YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_ADDRESS,
-        YarnConfiguration.DEFAULT_NM_COLLECTOR_SERVICE_PORT);
-
-  }
-
-  @Override
-  protected void serviceStart() throws Exception {
-    nmCollectorService = getNMCollectorService();
-    startWebApp();
-    super.serviceStart();
-  }
-
-  @Override
-  protected void serviceStop() throws Exception {
-    if (timelineRestServer != null) {
-      timelineRestServer.stop();
-    }
-    super.serviceStop();
+  protected TimelineCollectorManager(String name) {
+    super(name);
   }
   }
 
 
   /**
   /**
    * Put the collector into the collection if an collector mapped by id does
    * Put the collector into the collection if an collector mapped by id does
    * not exist.
    * not exist.
    *
    *
-   * @throws YarnRuntimeException if there was any exception in initializing and
-   * starting the app level service
+   * @throws YarnRuntimeException if there  was any exception in initializing
+   *                              and starting the app level service
    * @return the collector associated with id after the potential put.
    * @return the collector associated with id after the potential put.
    */
    */
   public TimelineCollector putIfAbsent(ApplicationId appId,
   public TimelineCollector putIfAbsent(ApplicationId appId,
       TimelineCollector collector) {
       TimelineCollector collector) {
-    String id = appId.toString();
-    TimelineCollector collectorInTable;
-    boolean collectorIsNew = false;
+    TimelineCollector collectorInTable = null;
     synchronized (collectors) {
     synchronized (collectors) {
-      collectorInTable = collectors.get(id);
+      collectorInTable = collectors.get(appId);
       if (collectorInTable == null) {
       if (collectorInTable == null) {
         try {
         try {
           // initialize, start, and add it to the collection so it can be
           // initialize, start, and add it to the collection so it can be
           // cleaned up when the parent shuts down
           // cleaned up when the parent shuts down
           collector.init(getConfig());
           collector.init(getConfig());
           collector.start();
           collector.start();
-          collectors.put(id, collector);
-          LOG.info("the collector for " + id + " was added");
+          collectors.put(appId, collector);
+          LOG.info("the collector for " + appId + " was added");
           collectorInTable = collector;
           collectorInTable = collector;
-          collectorIsNew = true;
+          postPut(appId, collectorInTable);
         } catch (Exception e) {
         } catch (Exception e) {
           throw new YarnRuntimeException(e);
           throw new YarnRuntimeException(e);
         }
         }
       } else {
       } else {
-        String msg = "the collector for " + id + " already exists!";
-        LOG.error(msg);
-      }
-
-    }
-    // Report to NM if a new collector is added.
-    if (collectorIsNew) {
-      try {
-        updateTimelineCollectorContext(appId, collector);
-        reportNewCollectorToNM(appId);
-      } catch (Exception e) {
-        // throw exception here as it cannot be used if failed communicate with NM
-        LOG.error("Failed to communicate with NM Collector Service for " + appId);
-        throw new YarnRuntimeException(e);
+        LOG.info("the collector for " + appId + " already exists!");
       }
       }
     }
     }
-
     return collectorInTable;
     return collectorInTable;
   }
   }
 
 
+  protected void postPut(ApplicationId appId, TimelineCollector collector) {
+
+  }
+
   /**
   /**
    * Removes the collector for the specified id. The collector is also stopped
    * Removes the collector for the specified id. The collector is also stopped
    * as a result. If the collector does not exist, no change is made.
    * as a result. If the collector does not exist, no change is made.
    *
    *
    * @return whether it was removed successfully
    * @return whether it was removed successfully
    */
    */
-  public boolean remove(String id) {
-    synchronized (collectors) {
-      TimelineCollector collector = collectors.remove(id);
-      if (collector == null) {
-        String msg = "the collector for " + id + " does not exist!";
-        LOG.error(msg);
-        return false;
-      } else {
-        // stop the service to do clean up
-        collector.stop();
-        LOG.info("the collector service for " + id + " was removed");
-        return true;
-      }
+  public boolean remove(ApplicationId appId) {
+    TimelineCollector collector = collectors.remove(appId);
+    if (collector == null) {
+      LOG.error("the collector for " + appId + " does not exist!");
+    } else {
+      postRemove(appId, collector);
+      // stop the service to do clean up
+      collector.stop();
+      LOG.info("the collector service for " + appId + " was removed");
     }
     }
+    return collector != null;
+  }
+
+  protected void postRemove(ApplicationId appId, TimelineCollector collector) {
+
   }
   }
 
 
   /**
   /**
@@ -192,113 +116,16 @@ public class TimelineCollectorManager extends CompositeService {
    *
    *
    * @return the collector or null if it does not exist
    * @return the collector or null if it does not exist
    */
    */
-  public TimelineCollector get(String id) {
-    return collectors.get(id);
+  public TimelineCollector get(ApplicationId appId) {
+    return collectors.get(appId);
   }
   }
 
 
   /**
   /**
    * Returns whether the collector for the specified id exists in this
    * Returns whether the collector for the specified id exists in this
    * collection.
    * collection.
    */
    */
-  public boolean containsKey(String id) {
-    return collectors.containsKey(id);
-  }
-
-  /**
-   * Launch the REST web server for this collector manager
-   */
-  private void startWebApp() {
-    Configuration conf = getConfig();
-    String bindAddress = conf.get(YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
-        YarnConfiguration.DEFAULT_TIMELINE_SERVICE_BIND_HOST) + ":0";
-    try {
-      Configuration confForInfoServer = new Configuration(conf);
-      confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
-      HttpServer2.Builder builder = new HttpServer2.Builder()
-          .setName("timeline")
-          .setConf(conf)
-          .addEndpoint(URI.create(
-              (YarnConfiguration.useHttps(conf) ? "https://" : "http://") +
-                  bindAddress));
-      timelineRestServer = builder.build();
-      // TODO: replace this by an authentication filter in future.
-      HashMap<String, String> options = new HashMap<>();
-      String username = conf.get(HADOOP_HTTP_STATIC_USER,
-          DEFAULT_HADOOP_HTTP_STATIC_USER);
-      options.put(HADOOP_HTTP_STATIC_USER, username);
-      HttpServer2.defineFilter(timelineRestServer.getWebAppContext(),
-          "static_user_filter_timeline",
-          StaticUserWebFilter.StaticUserFilter.class.getName(),
-          options, new String[] {"/*"});
-
-      timelineRestServer.addJerseyResourcePackage(
-          TimelineCollectorWebService.class.getPackage().getName() + ";"
-              + GenericExceptionHandler.class.getPackage().getName() + ";"
-              + YarnJacksonJaxbJsonProvider.class.getPackage().getName(),
-          "/*");
-      timelineRestServer.setAttribute(COLLECTOR_MANAGER_ATTR_KEY, this);
-      timelineRestServer.start();
-    } catch (Exception e) {
-      String msg = "The per-node collector webapp failed to start.";
-      LOG.error(msg, e);
-      throw new YarnRuntimeException(msg, e);
-    }
-    //TODO: We need to think of the case of multiple interfaces
-    this.timelineRestServerBindAddress = WebAppUtils.getResolvedAddress(
-        timelineRestServer.getConnectorAddress(0));
-    LOG.info("Instantiated the per-node collector webapp at " +
-        timelineRestServerBindAddress);
-  }
-
-  private void reportNewCollectorToNM(ApplicationId appId)
-      throws YarnException, IOException {
-    ReportNewCollectorInfoRequest request =
-        ReportNewCollectorInfoRequest.newInstance(appId,
-            this.timelineRestServerBindAddress);
-    LOG.info("Report a new collector for application: " + appId +
-        " to the NM Collector Service.");
-    nmCollectorService.reportNewCollectorInfo(request);
+  public boolean containsTimelineCollector(ApplicationId appId) {
+    return collectors.containsKey(appId);
   }
   }
 
 
-  private void updateTimelineCollectorContext(
-      ApplicationId appId, TimelineCollector collector)
-      throws YarnException, IOException {
-    GetTimelineCollectorContextRequest request =
-        GetTimelineCollectorContextRequest.newInstance(appId);
-    LOG.info("Get timeline collector context for " + appId);
-    GetTimelineCollectorContextResponse response =
-        nmCollectorService.getTimelineCollectorContext(request);
-    String userId = response.getUserId();
-    if (userId != null && !userId.isEmpty()) {
-      collector.getTimelineEntityContext().setUserId(userId);
-    }
-    String flowName = response.getFlowName();
-    if (flowName != null && !flowName.isEmpty()) {
-      collector.getTimelineEntityContext().setFlowName(flowName);
-    }
-    String flowVersion = response.getFlowVersion();
-    if (flowVersion != null && !flowVersion.isEmpty()) {
-      collector.getTimelineEntityContext().setFlowVersion(flowVersion);
-    }
-    long flowRunId = response.getFlowRunId();
-    if (flowRunId != 0L) {
-      collector.getTimelineEntityContext().setFlowRunId(flowRunId);
-    }
-  }
-
-  @VisibleForTesting
-  protected CollectorNodemanagerProtocol getNMCollectorService() {
-    Configuration conf = getConfig();
-    final YarnRPC rpc = YarnRPC.create(conf);
-
-    // TODO Security settings.
-    return (CollectorNodemanagerProtocol) rpc.getProxy(
-        CollectorNodemanagerProtocol.class,
-        nmCollectorServiceAddress, conf);
-  }
-
-  @VisibleForTesting
-  public String getRestServerBindAddress() {
-    return timelineRestServerBindAddress;
-  }
 }
 }

+ 9 - 14
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorWebService.java

@@ -42,6 +42,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.webapp.ForbiddenException;
 import org.apache.hadoop.yarn.webapp.ForbiddenException;
@@ -129,11 +130,14 @@ public class TimelineCollectorWebService {
     boolean isAsync = async != null && async.trim().equalsIgnoreCase("true");
     boolean isAsync = async != null && async.trim().equalsIgnoreCase("true");
 
 
     try {
     try {
-      appId = parseApplicationId(appId);
-      if (appId == null) {
+      ApplicationId appID = parseApplicationId(appId);
+      if (appID == null) {
         return Response.status(Response.Status.BAD_REQUEST).build();
         return Response.status(Response.Status.BAD_REQUEST).build();
       }
       }
-      TimelineCollector collector = getCollector(req, appId);
+      NodeTimelineCollectorManager collectorManager =
+          (NodeTimelineCollectorManager) context.getAttribute(
+              NodeTimelineCollectorManager.COLLECTOR_MANAGER_ATTR_KEY);
+      TimelineCollector collector = collectorManager.get(appID);
       if (collector == null) {
       if (collector == null) {
         LOG.error("Application: "+ appId + " is not found");
         LOG.error("Application: "+ appId + " is not found");
         throw new NotFoundException(); // different exception?
         throw new NotFoundException(); // different exception?
@@ -147,10 +151,10 @@ public class TimelineCollectorWebService {
     }
     }
   }
   }
 
 
-  private String parseApplicationId(String appId) {
+  private ApplicationId parseApplicationId(String appId) {
     try {
     try {
       if (appId != null) {
       if (appId != null) {
-        return ConverterUtils.toApplicationId(appId.trim()).toString();
+        return ConverterUtils.toApplicationId(appId.trim());
       } else {
       } else {
         return null;
         return null;
       }
       }
@@ -159,15 +163,6 @@ public class TimelineCollectorWebService {
     }
     }
   }
   }
 
 
-  private TimelineCollector
-      getCollector(HttpServletRequest req, String appIdToParse) {
-    String appIdString = parseApplicationId(appIdToParse);
-    final TimelineCollectorManager collectorManager =
-        (TimelineCollectorManager) context.getAttribute(
-            TimelineCollectorManager.COLLECTOR_MANAGER_ATTR_KEY);
-    return collectorManager.get(appIdString);
-  }
-
   private void init(HttpServletResponse response) {
   private void init(HttpServletResponse response) {
     response.setContentType(null);
     response.setContentType(null);
   }
   }

+ 8 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestTimelineCollectorManager.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestNMTimelineCollectorManager.java

@@ -49,8 +49,8 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.Test;
 
 
-public class TestTimelineCollectorManager {
-  private TimelineCollectorManager collectorManager;
+public class TestNMTimelineCollectorManager {
+  private NodeTimelineCollectorManager collectorManager;
 
 
   @Before
   @Before
   public void setup() throws Exception {
   public void setup() throws Exception {
@@ -103,7 +103,7 @@ public class TestTimelineCollectorManager {
     // check the keys
     // check the keys
     for (int i = 0; i < NUM_APPS; i++) {
     for (int i = 0; i < NUM_APPS; i++) {
       final ApplicationId appId = ApplicationId.newInstance(0L, i);
       final ApplicationId appId = ApplicationId.newInstance(0L, i);
-      assertTrue(collectorManager.containsKey(appId.toString()));
+      assertTrue(collectorManager.containsTimelineCollector(appId));
     }
     }
   }
   }
 
 
@@ -119,7 +119,7 @@ public class TestTimelineCollectorManager {
               new AppLevelTimelineCollector(appId);
               new AppLevelTimelineCollector(appId);
           boolean successPut =
           boolean successPut =
               (collectorManager.putIfAbsent(appId, collector) == collector);
               (collectorManager.putIfAbsent(appId, collector) == collector);
-          return successPut && collectorManager.remove(appId.toString());
+          return successPut && collectorManager.remove(appId);
         }
         }
       };
       };
       tasks.add(task);
       tasks.add(task);
@@ -136,13 +136,13 @@ public class TestTimelineCollectorManager {
     // check the keys
     // check the keys
     for (int i = 0; i < NUM_APPS; i++) {
     for (int i = 0; i < NUM_APPS; i++) {
       final ApplicationId appId = ApplicationId.newInstance(0L, i);
       final ApplicationId appId = ApplicationId.newInstance(0L, i);
-      assertFalse(collectorManager.containsKey(appId.toString()));
+      assertFalse(collectorManager.containsTimelineCollector(appId));
     }
     }
   }
   }
 
 
-  private TimelineCollectorManager createCollectorManager() {
-    final TimelineCollectorManager collectorManager =
-        spy(new TimelineCollectorManager());
+  private NodeTimelineCollectorManager createCollectorManager() {
+    final NodeTimelineCollectorManager collectorManager =
+        spy(new NodeTimelineCollectorManager());
     doReturn(new Configuration()).when(collectorManager).getConfig();
     doReturn(new Configuration()).when(collectorManager).getConfig();
     CollectorNodemanagerProtocol nmCollectorService =
     CollectorNodemanagerProtocol nmCollectorService =
         mock(CollectorNodemanagerProtocol.class);
         mock(CollectorNodemanagerProtocol.class);

+ 10 - 14
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/collector/TestPerNodeTimelineCollectorsAuxService.java

@@ -67,8 +67,7 @@ public class TestPerNodeTimelineCollectorsAuxService {
   public void testAddApplication() throws Exception {
   public void testAddApplication() throws Exception {
     auxService = createCollectorAndAddApplication();
     auxService = createCollectorAndAddApplication();
     // auxService should have a single app
     // auxService should have a single app
-    assertTrue(auxService.hasApplication(
-        appAttemptId.getApplicationId().toString()));
+    assertTrue(auxService.hasApplication(appAttemptId.getApplicationId()));
     auxService.close();
     auxService.close();
   }
   }
 
 
@@ -82,16 +81,14 @@ public class TestPerNodeTimelineCollectorsAuxService {
     when(context.getContainerId()).thenReturn(containerId);
     when(context.getContainerId()).thenReturn(containerId);
     auxService.initializeContainer(context);
     auxService.initializeContainer(context);
     // auxService should not have that app
     // auxService should not have that app
-    assertFalse(auxService.hasApplication(
-        appAttemptId.getApplicationId().toString()));
+    assertFalse(auxService.hasApplication(appAttemptId.getApplicationId()));
   }
   }
 
 
   @Test
   @Test
   public void testRemoveApplication() throws Exception {
   public void testRemoveApplication() throws Exception {
     auxService = createCollectorAndAddApplication();
     auxService = createCollectorAndAddApplication();
     // auxService should have a single app
     // auxService should have a single app
-    String appIdStr = appAttemptId.getApplicationId().toString();
-    assertTrue(auxService.hasApplication(appIdStr));
+    assertTrue(auxService.hasApplication(appAttemptId.getApplicationId()));
 
 
     ContainerId containerId = getAMContainerId();
     ContainerId containerId = getAMContainerId();
     ContainerTerminationContext context =
     ContainerTerminationContext context =
@@ -99,7 +96,7 @@ public class TestPerNodeTimelineCollectorsAuxService {
     when(context.getContainerId()).thenReturn(containerId);
     when(context.getContainerId()).thenReturn(containerId);
     auxService.stopContainer(context);
     auxService.stopContainer(context);
     // auxService should not have that app
     // auxService should not have that app
-    assertFalse(auxService.hasApplication(appIdStr));
+    assertFalse(auxService.hasApplication(appAttemptId.getApplicationId()));
     auxService.close();
     auxService.close();
   }
   }
 
 
@@ -107,8 +104,7 @@ public class TestPerNodeTimelineCollectorsAuxService {
   public void testRemoveApplicationNonAMContainer() throws Exception {
   public void testRemoveApplicationNonAMContainer() throws Exception {
     auxService = createCollectorAndAddApplication();
     auxService = createCollectorAndAddApplication();
     // auxService should have a single app
     // auxService should have a single app
-    String appIdStr = appAttemptId.getApplicationId().toString();
-    assertTrue(auxService.hasApplication(appIdStr));
+    assertTrue(auxService.hasApplication(appAttemptId.getApplicationId()));
 
 
     ContainerId containerId = getContainerId(2L); // not an AM
     ContainerId containerId = getContainerId(2L); // not an AM
     ContainerTerminationContext context =
     ContainerTerminationContext context =
@@ -116,7 +112,7 @@ public class TestPerNodeTimelineCollectorsAuxService {
     when(context.getContainerId()).thenReturn(containerId);
     when(context.getContainerId()).thenReturn(containerId);
     auxService.stopContainer(context);
     auxService.stopContainer(context);
     // auxService should still have that app
     // auxService should still have that app
-    assertTrue(auxService.hasApplication(appIdStr));
+    assertTrue(auxService.hasApplication(appAttemptId.getApplicationId()));
     auxService.close();
     auxService.close();
   }
   }
 
 
@@ -147,7 +143,7 @@ public class TestPerNodeTimelineCollectorsAuxService {
   }
   }
 
 
   private PerNodeTimelineCollectorsAuxService createCollector() {
   private PerNodeTimelineCollectorsAuxService createCollector() {
-    TimelineCollectorManager collectorManager = createCollectorManager();
+    NodeTimelineCollectorManager collectorManager = createCollectorManager();
     PerNodeTimelineCollectorsAuxService auxService =
     PerNodeTimelineCollectorsAuxService auxService =
         spy(new PerNodeTimelineCollectorsAuxService(collectorManager));
         spy(new PerNodeTimelineCollectorsAuxService(collectorManager));
     auxService.init(new YarnConfiguration());
     auxService.init(new YarnConfiguration());
@@ -155,9 +151,9 @@ public class TestPerNodeTimelineCollectorsAuxService {
     return auxService;
     return auxService;
   }
   }
 
 
-  private TimelineCollectorManager createCollectorManager() {
-    TimelineCollectorManager collectorManager =
-        spy(new TimelineCollectorManager());
+  private NodeTimelineCollectorManager createCollectorManager() {
+    NodeTimelineCollectorManager collectorManager =
+        spy(new NodeTimelineCollectorManager());
     doReturn(new Configuration()).when(collectorManager).getConfig();
     doReturn(new Configuration()).when(collectorManager).getConfig();
     CollectorNodemanagerProtocol nmCollectorService =
     CollectorNodemanagerProtocol nmCollectorService =
         mock(CollectorNodemanagerProtocol.class);
         mock(CollectorNodemanagerProtocol.class);