فهرست منبع

YARN-3034. Implement RM starting its timeline collector. Contributed by Naganarasimha G R

Junping Du 10 سال پیش
والد
کامیت
acc9e5483f
10فایلهای تغییر یافته به همراه242 افزوده شده و 31 حذف شده
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 11 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  3. 10 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  4. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml
  5. 43 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
  6. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
  7. 13 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
  8. 33 11
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
  9. 16 15
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java
  10. 104 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -38,6 +38,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3377. Fixed test failure in TestTimelineServiceClientIntegration.
     (Sangjin Lee via zjshen)
 
+    YARN-3034. Implement RM starting its timeline collector. (Naganarasimha G R
+    via junping_du)
+
   IMPROVEMENTS
 
   OPTIMIZATIONS

+ 11 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -391,12 +391,20 @@ public class YarnConfiguration extends Configuration {
 
   /**
    *  The setting that controls whether yarn system metrics is published on the
-   *  timeline server or not by RM.
+   *  timeline server or not by RM. This configuration setting is for ATS V1
    */
-  public static final String RM_SYSTEM_METRICS_PUBLISHER_ENABLED =
-      RM_PREFIX + "system-metrics-publisher.enabled";
+  public static final String RM_SYSTEM_METRICS_PUBLISHER_ENABLED = RM_PREFIX
+      + "system-metrics-publisher.enabled";
   public static final boolean DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED = false;
 
+  /**
+   *  The setting that controls whether yarn system metrics is published on the
+   *  timeline server or not by RM and NM. This configuration setting is for ATS V2
+   */
+  public static final String SYSTEM_METRICS_PUBLISHER_ENABLED = YARN_PREFIX
+      + "system-metrics-publisher.enabled";
+  public static final boolean DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED = false;
+
   public static final String RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE =
       RM_PREFIX + "system-metrics-publisher.dispatcher.pool-size";
   public static final int DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE =

+ 10 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -770,11 +770,20 @@
 
   <property>
     <description>The setting that controls whether yarn system metrics is
-    published on the timeline server or not by RM.</description>
+    published to the Timeline server (version one) or not, by RM. 
+    This configuration is deprecated.</description>
     <name>yarn.resourcemanager.system-metrics-publisher.enabled</name>
     <value>false</value>
   </property>
 
+  <property>
+    <description>The setting that controls whether yarn system metrics is
+    published on the Timeline server (version two) or not by RM And NM.</description>
+    <name>yarn.system-metrics-publisher.enabled</name>
+    <value>false</value>
+  </property>
+ 
+
   <property>
     <description>Number of worker threads that send the yarn system metrics
     data.</description>

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml

@@ -170,6 +170,10 @@
       <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>	
+      <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-server-web-proxy</artifactId>

+ 43 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java

@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
 
@@ -94,6 +95,10 @@ public class RMActiveServiceContext {
   private NodesListManager nodesListManager;
   private ResourceTrackerService resourceTrackerService;
   private ApplicationMasterService applicationMasterService;
+  private RMApplicationHistoryWriter rmApplicationHistoryWriter;
+  private SystemMetricsPublisher systemMetricsPublisher;
+  private RMTimelineCollector timelineCollector;
+
   private RMNodeLabelsManager nodeLabelManager;
   private RMDelegatedNodeLabelsUpdater rmDelegatedNodeLabelsUpdater;
   private long epoch;
@@ -368,6 +373,44 @@ public class RMActiveServiceContext {
     return this.isWorkPreservingRecoveryEnabled;
   }
 
+  @Private
+  @Unstable
+  public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
+    return rmApplicationHistoryWriter;
+  }
+
+  @Private
+  @Unstable
+  public RMTimelineCollector getRMTimelineCollector() {
+    return timelineCollector;
+  }
+
+  @Private
+  @Unstable
+  public void setRMTimelineCollector(RMTimelineCollector timelineCollector) {
+    this.timelineCollector = timelineCollector;
+  }
+
+  @Private
+  @Unstable
+  public void setSystemMetricsPublisher(
+      SystemMetricsPublisher systemMetricsPublisher) {
+    this.systemMetricsPublisher = systemMetricsPublisher;
+  }
+
+  @Private
+  @Unstable
+  public SystemMetricsPublisher getSystemMetricsPublisher() {
+    return systemMetricsPublisher;
+  }
+
+  @Private
+  @Unstable
+  public void setRMApplicationHistoryWriter(
+      RMApplicationHistoryWriter rmApplicationHistoryWriter) {
+    this.rmApplicationHistoryWriter = rmApplicationHistoryWriter;
+  }
+
   @Private
   @Unstable
   public long getEpoch() {

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java

@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector;
 
 /**
  * Context of the ResourceManager.
@@ -110,6 +111,10 @@ public interface RMContext {
   void setSystemMetricsPublisher(SystemMetricsPublisher systemMetricsPublisher);
 
   SystemMetricsPublisher getSystemMetricsPublisher();
+  
+  void setRMTimelineCollector(RMTimelineCollector timelineCollector);
+  
+  RMTimelineCollector getRMTimelineCollector();
 
   ConfigurationProvider getConfigurationProvider();
 

+ 13 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java

@@ -22,8 +22,8 @@ import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceProtocol;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.yarn.LocalConfigurationProvider;
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRen
 import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMDelegationTokenSecretManager;
+import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector;
 import org.apache.hadoop.yarn.util.Clock;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -365,6 +366,17 @@ public class RMContextImpl implements RMContext {
     return this.rmApplicationHistoryWriter;
   }
 
+  @Override
+  public void setRMTimelineCollector(
+      RMTimelineCollector timelineCollector) {
+    activeServiceContext.setRMTimelineCollector(timelineCollector);
+  }
+
+  @Override
+  public RMTimelineCollector getRMTimelineCollector() {
+    return activeServiceContext.getRMTimelineCollector();
+  }
+  
   @Override
   public void setSystemMetricsPublisher(
       SystemMetricsPublisher systemMetricsPublisher) {

+ 33 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -18,7 +18,16 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
-import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.PrintStream;
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -78,11 +87,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAlloca
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.server.resourcemanager.security.QueueACLsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollector;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilter;
@@ -97,15 +109,7 @@ import org.apache.hadoop.yarn.webapp.WebApps;
 import org.apache.hadoop.yarn.webapp.WebApps.Builder;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.PrintStream;
-import java.net.InetSocketAddress;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * The ResourceManager is the main class that is a set of components.
@@ -370,6 +374,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
     return new RMApplicationHistoryWriter();
   }
 
+  private RMTimelineCollector createRMTimelineCollector() {
+    return new RMTimelineCollector();
+  }
+
   protected SystemMetricsPublisher createSystemMetricsPublisher() {
     return new SystemMetricsPublisher(); 
   }
@@ -489,6 +497,20 @@ public class ResourceManager extends CompositeService implements Recoverable {
         rmContext.setDelegationTokenRenewer(delegationTokenRenewer);
       }
 
+      RMApplicationHistoryWriter rmApplicationHistoryWriter =
+          createRMApplicationHistoryWriter();
+      addService(rmApplicationHistoryWriter);
+      rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
+
+      SystemMetricsPublisher systemMetricsPublisher = createSystemMetricsPublisher();
+      addService(systemMetricsPublisher);
+      rmContext.setSystemMetricsPublisher(systemMetricsPublisher);
+
+      RMTimelineCollector timelineCollector =
+          createRMTimelineCollector();
+      addService(timelineCollector);
+      rmContext.setRMTimelineCollector(timelineCollector);
+
       // Register event handler for NodesListManager
       nodesListManager = new NodesListManager(rmContext);
       rmDispatcher.register(NodesListManagerEventType.class, nodesListManager);

+ 16 - 15
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/metrics/SystemMetricsPublisher.java

@@ -55,7 +55,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
 /**
- * The class that helps RM publish metrics to the timeline server. RM will
+ * The class that helps RM publish metrics to the timeline server V1. RM will
  * always invoke the methods of this class regardless the service is enabled or
  * not. If it is disabled, publishing requests will be ignored silently.
  */
@@ -68,7 +68,7 @@ public class SystemMetricsPublisher extends CompositeService {
 
   private Dispatcher dispatcher;
   private TimelineClient client;
-  private boolean publishSystemMetrics;
+  private boolean publishSystemMetricsToATSv1;
 
   public SystemMetricsPublisher() {
     super(SystemMetricsPublisher.class.getName());
@@ -76,13 +76,14 @@ public class SystemMetricsPublisher extends CompositeService {
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-    publishSystemMetrics =
+    publishSystemMetricsToATSv1 =
         conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
-            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED) &&
-        conf.getBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED,
-            YarnConfiguration.DEFAULT_RM_SYSTEM_METRICS_PUBLISHER_ENABLED);
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
+            && conf.getBoolean(
+                YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
+                YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);
 
-    if (publishSystemMetrics) {
+    if (publishSystemMetricsToATSv1) {
       client = TimelineClient.createTimelineClient();
       addIfService(client);
 
@@ -99,7 +100,7 @@ public class SystemMetricsPublisher extends CompositeService {
 
   @SuppressWarnings("unchecked")
   public void appCreated(RMApp app, long createdTime) {
-    if (publishSystemMetrics) {
+    if (publishSystemMetricsToATSv1) {
       ApplicationSubmissionContext appSubmissionContext =
           app.getApplicationSubmissionContext();
       dispatcher.getEventHandler().handle(
@@ -121,7 +122,7 @@ public class SystemMetricsPublisher extends CompositeService {
 
   @SuppressWarnings("unchecked")
   public void appUpdated(RMApp app, long updatedTime) {
-    if (publishSystemMetrics) {
+    if (publishSystemMetricsToATSv1) {
       dispatcher.getEventHandler()
           .handle(new ApplicationUpdatedEvent(app.getApplicationId(),
               app.getQueue(), updatedTime,
@@ -131,7 +132,7 @@ public class SystemMetricsPublisher extends CompositeService {
 
   @SuppressWarnings("unchecked")
   public void appFinished(RMApp app, RMAppState state, long finishedTime) {
-    if (publishSystemMetrics) {
+    if (publishSystemMetricsToATSv1) {
       dispatcher.getEventHandler().handle(
           new ApplicationFinishedEvent(
               app.getApplicationId(),
@@ -148,7 +149,7 @@ public class SystemMetricsPublisher extends CompositeService {
   @SuppressWarnings("unchecked")
   public void appACLsUpdated(RMApp app, String appViewACLs,
       long updatedTime) {
-    if (publishSystemMetrics) {
+    if (publishSystemMetricsToATSv1) {
       dispatcher.getEventHandler().handle(
           new ApplicationACLsUpdatedEvent(
               app.getApplicationId(),
@@ -160,7 +161,7 @@ public class SystemMetricsPublisher extends CompositeService {
   @SuppressWarnings("unchecked")
   public void appAttemptRegistered(RMAppAttempt appAttempt,
       long registeredTime) {
-    if (publishSystemMetrics) {
+    if (publishSystemMetricsToATSv1) {
       ContainerId container = (appAttempt.getMasterContainer() == null) ? null
           : appAttempt.getMasterContainer().getId();
       dispatcher.getEventHandler().handle(
@@ -178,7 +179,7 @@ public class SystemMetricsPublisher extends CompositeService {
   @SuppressWarnings("unchecked")
   public void appAttemptFinished(RMAppAttempt appAttempt,
       RMAppAttemptState appAttemtpState, RMApp app, long finishedTime) {
-    if (publishSystemMetrics) {
+    if (publishSystemMetricsToATSv1) {
       ContainerId container = (appAttempt.getMasterContainer() == null) ? null
           : appAttempt.getMasterContainer().getId();
       dispatcher.getEventHandler().handle(
@@ -198,7 +199,7 @@ public class SystemMetricsPublisher extends CompositeService {
 
   @SuppressWarnings("unchecked")
   public void containerCreated(RMContainer container, long createdTime) {
-    if (publishSystemMetrics) {
+    if (publishSystemMetricsToATSv1) {
       dispatcher.getEventHandler().handle(
           new ContainerCreatedEvent(
               container.getContainerId(),
@@ -211,7 +212,7 @@ public class SystemMetricsPublisher extends CompositeService {
 
   @SuppressWarnings("unchecked")
   public void containerFinished(RMContainer container, long finishedTime) {
-    if (publishSystemMetrics) {
+    if (publishSystemMetricsToATSv1) {
       dispatcher.getEventHandler().handle(
           new ContainerFinishedEvent(
               container.getContainerId(),

+ 104 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/timelineservice/RMTimelineCollector.java

@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.timelineservice;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsEventType;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollector;
+
+/**
+ * This class is responsible for posting application and appattempt lifecycle
+ * related events to timeline service V2
+ */
+@Private
+@Unstable
+public class RMTimelineCollector extends TimelineCollector {
+  private static final Log LOG = LogFactory.getLog(RMTimelineCollector.class);
+
+  public RMTimelineCollector() {
+    super("Resource Manager TimelineCollector");
+  }
+
+  private Dispatcher dispatcher;
+
+  private boolean publishSystemMetricsForV2;
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    publishSystemMetricsForV2 =
+        conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
+            YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
+            && conf.getBoolean(
+                YarnConfiguration.SYSTEM_METRICS_PUBLISHER_ENABLED,
+                YarnConfiguration.DEFAULT_SYSTEM_METRICS_PUBLISHER_ENABLED);
+
+    if (publishSystemMetricsForV2) {
+      // having separate dispatcher to avoid load on RMDispatcher
+      LOG.info("RMTimelineCollector has been configured to publish"
+          + " System Metrics in ATS V2");
+      dispatcher = new AsyncDispatcher();
+      dispatcher.register(SystemMetricsEventType.class,
+          new ForwardingEventHandler());
+    } else {
+      LOG.warn("RMTimelineCollector has not been configured to publish"
+          + " System Metrics in ATS V2");
+    }
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
+  protected void handleSystemMetricsEvent(SystemMetricsEvent event) {
+    switch (event.getType()) {
+    default:
+      LOG.error("Unknown SystemMetricsEvent type: " + event.getType());
+    }
+  }
+
+  /**
+   * EventHandler implementation which forward events to SystemMetricsPublisher.
+   * Making use of it, SystemMetricsPublisher can avoid to have a public handle
+   * method.
+   */
+  private final class ForwardingEventHandler implements
+      EventHandler<SystemMetricsEvent> {
+
+    @Override
+    public void handle(SystemMetricsEvent event) {
+      handleSystemMetricsEvent(event);
+    }
+  }
+}