Просмотр исходного кода

YARN-3030. Set up TS aggregator with basic request serving structure and lifecycle. Contributed by Sangjin Lee.

(cherry picked from commit f26941b39028ac30c77547e2be2d657bb5bf044a)
Zhijie Shen 10 лет назад
Родитель
Сommit
7c8abec0a8
13 измененных файлов с 1072 добавлено и 7 удалено
  1. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml
  2. 9 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java
  3. 52 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
  4. 57 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java
  5. 136 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManager.java
  6. 33 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManagerProvider.java
  7. 104 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java
  8. 252 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java
  9. 168 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java
  10. 3 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java
  11. 102 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelServiceManager.java
  12. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java
  13. 149 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregatorServer.java

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/pom.xml

@@ -52,6 +52,11 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-api</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+      <version>${project.version}</version>
+    </dependency>
     <dependency>
       <groupId>javax.xml.bind</groupId>
       <artifactId>jaxb-api</artifactId>

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/WebServer.java

@@ -30,6 +30,9 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
+import org.apache.hadoop.yarn.server.timelineservice.aggregator.AppLevelServiceManager;
+import org.apache.hadoop.yarn.server.timelineservice.aggregator.AppLevelServiceManagerProvider;
+import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeAggregatorWebService;
 import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
 import org.apache.hadoop.yarn.webapp.WebApp;
 import org.apache.hadoop.yarn.webapp.WebApps;
@@ -123,6 +126,12 @@ public class WebServer extends AbstractService {
       bind(NMWebServices.class);
       bind(GenericExceptionHandler.class);
       bind(JAXBContextResolver.class);
+      // host the timeline service aggregator web service temporarily
+      // (see YARN-3087)
+      bind(PerNodeAggregatorWebService.class);
+      // bind to the global singleton instance
+      bind(AppLevelServiceManager.class).
+          toProvider(AppLevelServiceManagerProvider.class);
       bind(ResourceView.class).toInstance(this.resourceView);
       bind(ApplicationACLsManager.class).toInstance(this.aclsManager);
       bind(LocalDirsHandlerService.class).toInstance(dirsHandler);

+ 52 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml

@@ -41,6 +41,11 @@
       <artifactId>hadoop-common</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-annotations</artifactId>
+    </dependency>
+
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-yarn-api</artifactId>
@@ -53,12 +58,57 @@
 
     <dependency>
       <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-server-common</artifactId>
+      <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>com.google.inject</groupId>
+      <artifactId>guice</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>javax.servlet</groupId>
+      <artifactId>servlet-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>javax.xml.bind</groupId>
+      <artifactId>jaxb-api</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>com.sun.jersey</groupId>
+      <artifactId>jersey-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>commons-logging</groupId>
+      <artifactId>commons-logging</artifactId>
+    </dependency>
+
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
     <dependency>
       <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
+      <artifactId>hadoop-common</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.mockito</groupId>
+      <artifactId>mockito-all</artifactId>
+      <scope>test</scope>
     </dependency>
   </dependencies>
 

+ 57 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelAggregatorService.java

@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Service that handles writes to the timeline service and writes them to the
+ * backing storage for a given YARN application.
+ *
+ * App-related lifecycle management is handled by this service.
+ */
+@Private
+@Unstable
+public class AppLevelAggregatorService extends BaseAggregatorService {
+  private final String applicationId;
+  // TODO define key metadata such as flow metadata, user, and queue
+
+  public AppLevelAggregatorService(String applicationId) {
+    super(AppLevelAggregatorService.class.getName() + " - " + applicationId);
+    this.applicationId = applicationId;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
+}

+ 136 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManager.java

@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+
+/**
+ * Class that manages adding and removing app level aggregator services and
+ * their lifecycle. It provides thread safety access to the app level services.
+ *
+ * It is a singleton, and instances should be obtained via
+ * {@link #getInstance()}.
+ */
+@Private
+@Unstable
+public class AppLevelServiceManager extends CompositeService {
+  private static final Log LOG =
+      LogFactory.getLog(AppLevelServiceManager.class);
+  private static final AppLevelServiceManager INSTANCE =
+      new AppLevelServiceManager();
+
+  // access to this map is synchronized with the map itself
+  private final Map<String,AppLevelAggregatorService> services =
+      Collections.synchronizedMap(
+          new HashMap<String,AppLevelAggregatorService>());
+
+  static AppLevelServiceManager getInstance() {
+    return INSTANCE;
+  }
+
+  AppLevelServiceManager() {
+    super(AppLevelServiceManager.class.getName());
+  }
+
+  /**
+   * Creates and adds an app level aggregator service for the specified
+   * application id. The service is also initialized and started. If the service
+   * already exists, no new service is created.
+   *
+   * @throws YarnRuntimeException if there was any exception in initializing and
+   * starting the app level service
+   * @return whether it was added successfully
+   */
+  public boolean addService(String appId) {
+    synchronized (services) {
+      AppLevelAggregatorService service = services.get(appId);
+      if (service == null) {
+        try {
+          service = new AppLevelAggregatorService(appId);
+          // initialize, start, and add it to the parent service so it can be
+          // cleaned up when the parent shuts down
+          service.init(getConfig());
+          service.start();
+          services.put(appId, service);
+          LOG.info("the application aggregator service for " + appId +
+              " was added");
+          return true;
+        } catch (Exception e) {
+          throw new YarnRuntimeException(e);
+        }
+      } else {
+        String msg = "the application aggregator service for " + appId +
+            " already exists!";
+        LOG.error(msg);
+        return false;
+      }
+    }
+  }
+
+  /**
+   * Removes the app level aggregator service for the specified application id.
+   * The service is also stopped as a result. If the service does not exist, no
+   * change is made.
+   *
+   * @return whether it was removed successfully
+   */
+  public boolean removeService(String appId) {
+    synchronized (services) {
+      AppLevelAggregatorService service = services.remove(appId);
+      if (service == null) {
+        String msg = "the application aggregator service for " + appId +
+            " does not exist!";
+        LOG.error(msg);
+        return false;
+      } else {
+        // stop the service to do clean up
+        service.stop();
+        LOG.info("the application aggregator service for " + appId +
+            " was removed");
+        return true;
+      }
+    }
+  }
+
+  /**
+   * Returns the app level aggregator service for the specified application id.
+   *
+   * @return the app level aggregator service or null if it does not exist
+   */
+  public AppLevelAggregatorService getService(String appId) {
+    return services.get(appId);
+  }
+
+  /**
+   * Returns whether the app level aggregator service for the specified
+   * application id exists.
+   */
+  public boolean hasService(String appId) {
+    return services.containsKey(appId);
+  }
+}

+ 33 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/AppLevelServiceManagerProvider.java

@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import com.google.inject.Provider;
+
+/**
+ * A guice provider that provides a global singleton instance of
+ * AppLevelServiceManager.
+ */
+public class AppLevelServiceManagerProvider
+    implements Provider<AppLevelServiceManager> {
+  @Override
+  public AppLevelServiceManager get() {
+    return AppLevelServiceManager.getInstance();
+  }
+}

+ 104 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java

@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+
+/**
+ * Service that handles writes to the timeline service and writes them to the
+ * backing storage.
+ *
+ * Classes that extend this can add their own lifecycle management or
+ * customization of request handling.
+ */
+@Private
+@Unstable
+public class BaseAggregatorService extends CompositeService {
+  private static final Log LOG = LogFactory.getLog(BaseAggregatorService.class);
+
+  public BaseAggregatorService(String name) {
+    super(name);
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
+  /**
+   * Handles entity writes. These writes are synchronous and are written to the
+   * backing storage without buffering/batching. If any entity already exists,
+   * it results in an update of the entity.
+   *
+   * This method should be reserved for selected critical entities and events.
+   * For normal voluminous writes one should use the async method
+   * {@link #postEntitiesAsync(TimelineEntities, UserGroupInformation)}.
+   *
+   * @param entities entities to post
+   * @param callerUgi the caller UGI
+   * @return the response that contains the result of the post.
+   */
+  public TimelinePutResponse postEntities(TimelineEntities entities,
+      UserGroupInformation callerUgi) {
+    // TODO implement
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("postEntities(entities=" + entities + ", callerUgi=" +
+          callerUgi + ")");
+    }
+    return null;
+  }
+
+  /**
+   * Handles entity writes in an asynchronous manner. The method returns as soon
+   * as validation is done. No promises are made on how quickly it will be
+   * written to the backing storage or if it will always be written to the
+   * backing storage. Multiple writes to the same entities may be batched and
+   * appropriate values updated and result in fewer writes to the backing
+   * storage.
+   *
+   * @param entities entities to post
+   * @param callerUgi the caller UGI
+   */
+  public void postEntitiesAsync(TimelineEntities entities,
+      UserGroupInformation callerUgi) {
+    // TODO implement
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("postEntitiesAsync(entities=" + entities + ", callerUgi=" +
+          callerUgi + ")");
+    }
+  }
+}

+ 252 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java

@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import java.nio.ByteBuffer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
+import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
+import org.apache.hadoop.yarn.server.api.AuxiliaryService;
+import org.apache.hadoop.yarn.server.api.ContainerContext;
+import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
+import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
+import org.apache.hadoop.yarn.webapp.WebApp;
+import org.apache.hadoop.yarn.webapp.WebApps;
+import org.apache.hadoop.yarn.webapp.YarnWebParams;
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * The top-level server for the per-node timeline aggregator service. Currently
+ * it is defined as an auxiliary service to accommodate running within another
+ * daemon (e.g. node manager).
+ */
+@Private
+@Unstable
+public class PerNodeAggregatorServer extends AuxiliaryService {
+  private static final Log LOG =
+      LogFactory.getLog(PerNodeAggregatorServer.class);
+  private static final int SHUTDOWN_HOOK_PRIORITY = 30;
+
+  private final AppLevelServiceManager serviceManager;
+  private WebApp webApp;
+
+  public PerNodeAggregatorServer() {
+    // use the same singleton
+    this(AppLevelServiceManager.getInstance());
+  }
+
+  @VisibleForTesting
+  PerNodeAggregatorServer(AppLevelServiceManager serviceManager) {
+    super("timeline_aggregator");
+    this.serviceManager = serviceManager;
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    serviceManager.init(conf);
+    super.serviceInit(conf);
+  }
+
+  @Override
+  protected void serviceStart() throws Exception {
+    super.serviceStart();
+    serviceManager.start();
+    startWebApp();
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    if (webApp != null) {
+      webApp.stop();
+    }
+    // stop the service manager
+    serviceManager.stop();
+    super.serviceStop();
+  }
+
+  private void startWebApp() {
+    Configuration conf = getConfig();
+    // use the same ports as the old ATS for now; we could create new properties
+    // for the new timeline service if needed
+    String bindAddress = WebAppUtils.getWebAppBindURL(conf,
+                          YarnConfiguration.TIMELINE_SERVICE_BIND_HOST,
+                          WebAppUtils.getAHSWebAppURLWithoutScheme(conf));
+    LOG.info("Instantiating the per-node aggregator webapp at " + bindAddress);
+    try {
+      webApp =
+          WebApps
+            .$for("timeline", null, null, "ws")
+            .with(conf).at(bindAddress).start(
+                new TimelineServiceWebApp());
+    } catch (Exception e) {
+      String msg = "The per-node aggregator webapp failed to start.";
+      LOG.error(msg, e);
+      throw new YarnRuntimeException(msg, e);
+    }
+  }
+
+  private static class TimelineServiceWebApp
+      extends WebApp implements YarnWebParams {
+    @Override
+    public void setup() {
+      bind(PerNodeAggregatorWebService.class);
+      // bind to the global singleton
+      bind(AppLevelServiceManager.class).
+          toProvider(AppLevelServiceManagerProvider.class);
+    }
+  }
+
+  // these methods can be used as the basis for future service methods if the
+  // per-node aggregator runs separate from the node manager
+  /**
+   * Creates and adds an app level aggregator service for the specified
+   * application id. The service is also initialized and started. If the service
+   * already exists, no new service is created.
+   *
+   * @return whether it was added successfully
+   */
+  public boolean addApplication(ApplicationId appId) {
+    String appIdString = appId.toString();
+    return serviceManager.addService(appIdString);
+  }
+
+  /**
+   * Removes the app level aggregator service for the specified application id.
+   * The service is also stopped as a result. If the service does not exist, no
+   * change is made.
+   *
+   * @return whether it was removed successfully
+   */
+  public boolean removeApplication(ApplicationId appId) {
+    String appIdString = appId.toString();
+    return serviceManager.removeService(appIdString);
+  }
+
+  /**
+   * Creates and adds an app level aggregator service for the specified
+   * application id. The service is also initialized and started. If the service
+   * already exists, no new service is created.
+   */
+  @Override
+  public void initializeContainer(ContainerInitializationContext context) {
+    // intercept the event of the AM container being created and initialize the
+    // app level aggregator service
+    if (isApplicationMaster(context)) {
+      ApplicationId appId = context.getContainerId().
+          getApplicationAttemptId().getApplicationId();
+      addApplication(appId);
+    }
+  }
+
+  /**
+   * Removes the app level aggregator service for the specified application id.
+   * The service is also stopped as a result. If the service does not exist, no
+   * change is made.
+   */
+  @Override
+  public void stopContainer(ContainerTerminationContext context) {
+    // intercept the event of the AM container being stopped and remove the app
+    // level aggregator service
+    if (isApplicationMaster(context)) {
+      ApplicationId appId = context.getContainerId().
+          getApplicationAttemptId().getApplicationId();
+      removeApplication(appId);
+    }
+  }
+
+  private boolean isApplicationMaster(ContainerContext context) {
+    // TODO this is based on a (shaky) assumption that the container id (the
+    // last field of the full container id) for an AM is always 1
+    // we want to make this much more reliable
+    ContainerId containerId = context.getContainerId();
+    return containerId.getContainerId() == 1L;
+  }
+
+  @VisibleForTesting
+  boolean hasApplication(String appId) {
+    return serviceManager.hasService(appId);
+  }
+
+  @Override
+  public void initializeApplication(ApplicationInitializationContext context) {
+  }
+
+  @Override
+  public void stopApplication(ApplicationTerminationContext context) {
+  }
+
+  @Override
+  public ByteBuffer getMetaData() {
+    // TODO currently it is not used; we can return a more meaningful data when
+    // we connect it with an AM
+    return ByteBuffer.allocate(0);
+  }
+
+  @VisibleForTesting
+  static PerNodeAggregatorServer launchServer(String[] args) {
+    Thread
+      .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
+    StringUtils.startupShutdownMessage(PerNodeAggregatorServer.class, args,
+        LOG);
+    PerNodeAggregatorServer server = null;
+    try {
+      server = new PerNodeAggregatorServer();
+      ShutdownHookManager.get().addShutdownHook(new ShutdownHook(server),
+          SHUTDOWN_HOOK_PRIORITY);
+      YarnConfiguration conf = new YarnConfiguration();
+      server.init(conf);
+      server.start();
+    } catch (Throwable t) {
+      LOG.fatal("Error starting PerNodeAggregatorServer", t);
+      ExitUtil.terminate(-1, "Error starting PerNodeAggregatorServer");
+    }
+    return server;
+  }
+
+  private static class ShutdownHook implements Runnable {
+    private final PerNodeAggregatorServer server;
+
+    public ShutdownHook(PerNodeAggregatorServer server) {
+      this.server = server;
+    }
+
+    public void run() {
+      server.stop();
+    }
+  }
+
+  public static void main(String[] args) {
+    launchServer(args);
+  }
+}

+ 168 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java

@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.webapp.ForbiddenException;
+import org.apache.hadoop.yarn.webapp.NotFoundException;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+
+/**
+ * The main per-node REST end point for timeline service writes. It is
+ * essentially a container service that routes requests to the appropriate
+ * per-app services.
+ */
+@Private
+@Unstable
+@Singleton
+@Path("/ws/v2/timeline")
+public class PerNodeAggregatorWebService {
+  private static final Log LOG =
+      LogFactory.getLog(PerNodeAggregatorWebService.class);
+
+  private final AppLevelServiceManager serviceManager;
+
+  @Inject
+  public PerNodeAggregatorWebService(AppLevelServiceManager serviceManager) {
+    this.serviceManager = serviceManager;
+  }
+
+  @XmlRootElement(name = "about")
+  @XmlAccessorType(XmlAccessType.NONE)
+  @Public
+  @Unstable
+  public static class AboutInfo {
+
+    private String about;
+
+    public AboutInfo() {
+
+    }
+
+    public AboutInfo(String about) {
+      this.about = about;
+    }
+
+    @XmlElement(name = "About")
+    public String getAbout() {
+      return about;
+    }
+
+    public void setAbout(String about) {
+      this.about = about;
+    }
+
+  }
+
+  /**
+   * Return the description of the timeline web services.
+   */
+  @GET
+  @Produces({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+  public AboutInfo about(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res) {
+    init(res);
+    return new AboutInfo("Timeline API");
+  }
+
+  /**
+   * Accepts writes to the aggregator, and returns a response. It simply routes
+   * the request to the app level aggregator. It expects an application as a
+   * context.
+   */
+  @POST
+  @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
+  public TimelinePutResponse postEntities(
+      @Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      TimelineEntities entities) {
+    init(res);
+    UserGroupInformation callerUgi = getUser(req);
+    if (callerUgi == null) {
+      String msg = "The owner of the posted timeline entities is not set";
+      LOG.error(msg);
+      throw new ForbiddenException(msg);
+    }
+
+    // TODO how to express async posts and handle them
+    try {
+      AppLevelAggregatorService service = getAggregatorService(req);
+      if (service == null) {
+        LOG.error("Application not found");
+        throw new NotFoundException(); // different exception?
+      }
+      return service.postEntities(entities, callerUgi);
+    } catch (Exception e) {
+      LOG.error("Error putting entities", e);
+      throw new WebApplicationException(e,
+          Response.Status.INTERNAL_SERVER_ERROR);
+    }
+  }
+
+  private AppLevelAggregatorService
+      getAggregatorService(HttpServletRequest req) {
+    String appIdString = getApplicationId(req);
+    return serviceManager.getService(appIdString);
+  }
+
+  private String getApplicationId(HttpServletRequest req) {
+    // TODO the application id from the request
+    // (most likely from the URI)
+    return null;
+  }
+
+  private void init(HttpServletResponse response) {
+    response.setContentType(null);
+  }
+
+  private UserGroupInformation getUser(HttpServletRequest req) {
+    String remoteUser = req.getRemoteUser();
+    UserGroupInformation callerUgi = null;
+    if (remoteUser != null) {
+      callerUgi = UserGroupInformation.createRemoteUser(remoteUser);
+    }
+    return callerUgi;
+  }
+}

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/TimelineAggregator.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelAggregatorService.java

@@ -16,8 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.yarn.server.timelineservice;
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
 
-public class TimelineAggregator {
 
-}
+public class TestAppLevelAggregatorService {
+}

+ 102 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestAppLevelServiceManager.java

@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+public class TestAppLevelServiceManager {
+
+  @Test(timeout=60000)
+  public void testMultithreadedAdd() throws Exception {
+    final AppLevelServiceManager serviceManager =
+        spy(new AppLevelServiceManager());
+    doReturn(new Configuration()).when(serviceManager).getConfig();
+
+    final int NUM_APPS = 5;
+    List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
+    for (int i = 0; i < NUM_APPS; i++) {
+      final String appId = String.valueOf(i);
+      Callable<Boolean> task = new Callable<Boolean>() {
+        public Boolean call() {
+          return serviceManager.addService(appId);
+        }
+      };
+      tasks.add(task);
+    }
+    ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS);
+    try {
+      List<Future<Boolean>> futures = executor.invokeAll(tasks);
+      for (Future<Boolean> future: futures) {
+        assertTrue(future.get());
+      }
+    } finally {
+      executor.shutdownNow();
+    }
+    // check the keys
+    for (int i = 0; i < NUM_APPS; i++) {
+      assertTrue(serviceManager.hasService(String.valueOf(i)));
+    }
+  }
+
+  @Test
+  public void testMultithreadedAddAndRemove() throws Exception {
+    final AppLevelServiceManager serviceManager =
+        spy(new AppLevelServiceManager());
+    doReturn(new Configuration()).when(serviceManager).getConfig();
+
+    final int NUM_APPS = 5;
+    List<Callable<Boolean>> tasks = new ArrayList<Callable<Boolean>>();
+    for (int i = 0; i < NUM_APPS; i++) {
+      final String appId = String.valueOf(i);
+      Callable<Boolean> task = new Callable<Boolean>() {
+        public Boolean call() {
+          return serviceManager.addService(appId) &&
+              serviceManager.removeService(appId);
+        }
+      };
+      tasks.add(task);
+    }
+    ExecutorService executor = Executors.newFixedThreadPool(NUM_APPS);
+    try {
+      List<Future<Boolean>> futures = executor.invokeAll(tasks);
+      for (Future<Boolean> future: futures) {
+        assertTrue(future.get());
+      }
+    } finally {
+      executor.shutdownNow();
+    }
+    // check the keys
+    for (int i = 0; i < NUM_APPS; i++) {
+      assertFalse(serviceManager.hasService(String.valueOf(i)));
+    }
+  }
+}

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineAggregator.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestBaseAggregatorService.java

@@ -16,8 +16,8 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.yarn.server.timelineservice;
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
 
-public class TestTimelineAggregator {
+public class TestBaseAggregatorService {
 
 }

+ 149 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/TestPerNodeAggregatorServer.java

@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.aggregator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
+import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
+import org.junit.Test;
+
+public class TestPerNodeAggregatorServer {
+  private ApplicationAttemptId appAttemptId;
+
+  public TestPerNodeAggregatorServer() {
+    ApplicationId appId =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+  }
+
+  @Test
+  public void testAddApplication() throws Exception {
+    PerNodeAggregatorServer aggregator = createAggregatorAndAddApplication();
+    // aggregator should have a single app
+    assertTrue(aggregator.hasApplication(
+        appAttemptId.getApplicationId().toString()));
+    aggregator.close();
+  }
+
+  @Test
+  public void testAddApplicationNonAMContainer() throws Exception {
+    PerNodeAggregatorServer aggregator = createAggregator();
+
+    ContainerId containerId = getContainerId(2L); // not an AM
+    ContainerInitializationContext context =
+        mock(ContainerInitializationContext.class);
+    when(context.getContainerId()).thenReturn(containerId);
+    aggregator.initializeContainer(context);
+    // aggregator should not have that app
+    assertFalse(aggregator.hasApplication(
+        appAttemptId.getApplicationId().toString()));
+  }
+
+  @Test
+  public void testRemoveApplication() throws Exception {
+    PerNodeAggregatorServer aggregator = createAggregatorAndAddApplication();
+    // aggregator should have a single app
+    String appIdStr = appAttemptId.getApplicationId().toString();
+    assertTrue(aggregator.hasApplication(appIdStr));
+
+    ContainerId containerId = getAMContainerId();
+    ContainerTerminationContext context =
+        mock(ContainerTerminationContext.class);
+    when(context.getContainerId()).thenReturn(containerId);
+    aggregator.stopContainer(context);
+    // aggregator should not have that app
+    assertFalse(aggregator.hasApplication(appIdStr));
+    aggregator.close();
+  }
+
+  @Test
+  public void testRemoveApplicationNonAMContainer() throws Exception {
+    PerNodeAggregatorServer aggregator = createAggregatorAndAddApplication();
+    // aggregator should have a single app
+    String appIdStr = appAttemptId.getApplicationId().toString();
+    assertTrue(aggregator.hasApplication(appIdStr));
+
+    ContainerId containerId = getContainerId(2L); // not an AM
+    ContainerTerminationContext context =
+        mock(ContainerTerminationContext.class);
+    when(context.getContainerId()).thenReturn(containerId);
+    aggregator.stopContainer(context);
+    // aggregator should still have that app
+    assertTrue(aggregator.hasApplication(appIdStr));
+    aggregator.close();
+  }
+
+  @Test(timeout = 60000)
+  public void testLaunch() throws Exception {
+    ExitUtil.disableSystemExit();
+    PerNodeAggregatorServer server = null;
+    try {
+      server =
+          PerNodeAggregatorServer.launchServer(new String[0]);
+    } catch (ExitUtil.ExitException e) {
+      assertEquals(0, e.status);
+      ExitUtil.resetFirstExitException();
+      fail();
+    } finally {
+      if (server != null) {
+        server.stop();
+      }
+    }
+  }
+
+  private PerNodeAggregatorServer createAggregatorAndAddApplication() {
+    PerNodeAggregatorServer aggregator = createAggregator();
+    // create an AM container
+    ContainerId containerId = getAMContainerId();
+    ContainerInitializationContext context =
+        mock(ContainerInitializationContext.class);
+    when(context.getContainerId()).thenReturn(containerId);
+    aggregator.initializeContainer(context);
+    return aggregator;
+  }
+
+  private PerNodeAggregatorServer createAggregator() {
+    AppLevelServiceManager serviceManager = spy(new AppLevelServiceManager());
+    doReturn(new Configuration()).when(serviceManager).getConfig();
+    PerNodeAggregatorServer aggregator =
+        spy(new PerNodeAggregatorServer(serviceManager));
+    return aggregator;
+  }
+
+  private ContainerId getAMContainerId() {
+    return getContainerId(1L);
+  }
+
+  private ContainerId getContainerId(long id) {
+    return ContainerId.newContainerId(appAttemptId, id);
+  }
+}