Browse Source

YARN-3240. Implement client API to put generic entities. Contributed by Zhijie Shen

(cherry picked from commit 4487da249f448d5c67b712cd0aa723e764eed77d)
Junping Du 10 years ago
parent
commit
1937a14d2e
13 changed files with 344 additions and 46 deletions
  1. 7 0
      hadoop-project/pom.xml
  2. 3 0
      hadoop-yarn-project/CHANGES.txt
  3. 58 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java
  4. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
  5. 59 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java
  6. 93 14
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java
  7. 7 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java
  8. 12 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml
  9. 54 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
  10. 11 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
  11. 2 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java
  12. 5 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java
  13. 32 22
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java

+ 7 - 0
hadoop-project/pom.xml

@@ -298,6 +298,13 @@
         <version>${project.version}</version>
       </dependency>
 
+      <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+          <version>${project.version}</version>
+          <type>test-jar</type>
+      </dependency>
+
      <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-yarn-applications-distributedshell</artifactId>

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -14,6 +14,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3041. Added the overall data model of timeline service next gen.
     (zjshen)
 
+    YARN-3240. Implement client API to put generic entities. (Zhijie Shen via
+    junping_du)
+
   IMPROVEMENTS
 
   OPTIMIZATIONS

+ 58 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/timelineservice/TimelineEntities.java

@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.api.records.timelineservice;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import java.util.HashSet;
+import java.util.Set;
+
+@XmlRootElement(name = "entities")
+@XmlAccessorType(XmlAccessType.NONE)
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public class TimelineEntities {
+
+  private Set<TimelineEntity> entities = new HashSet<>();
+
+  public TimelineEntities() {
+
+  }
+
+  @XmlElement(name = "entities")
+  public Set<TimelineEntity> getEntities() {
+    return entities;
+  }
+
+  public void setEntities(Set<TimelineEntity> entities) {
+    this.entities = entities;
+  }
+
+  public void addEntities(Set<TimelineEntity> entities) {
+    this.entities.addAll(entities);
+  }
+
+  public void addEntity(TimelineEntity entity) {
+    entities.add(entity);
+  }
+}

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml

@@ -207,6 +207,7 @@
             <exclude>src/main/resources/webapps/jobhistory/.keep</exclude>
             <exclude>src/main/resources/webapps/yarn/.keep</exclude>
             <exclude>src/main/resources/webapps/applicationhistory/.keep</exclude>
+            <exclude>src/main/resources/webapps/timeline/.keep</exclude>
             <exclude>src/main/resources/webapps/cluster/.keep</exclude>
             <exclude>src/main/resources/webapps/test/.keep</exclude>
             <exclude>src/main/resources/webapps/proxy/.keep</exclude>

+ 59 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/TimelineClient.java

@@ -30,6 +30,9 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
+import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
 import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -52,15 +55,25 @@ public abstract class TimelineClient extends AbstractService {
    *
    * @return a timeline client
    */
+  protected ApplicationId contextAppId;
+  protected String timelineServiceAddress;
+
   @Public
   public static TimelineClient createTimelineClient() {
     TimelineClient client = new TimelineClientImpl();
     return client;
   }
 
+  @Public
+  public static TimelineClient createTimelineClient(ApplicationId appId) {
+    TimelineClient client = new TimelineClientImpl(appId);
+    return client;
+  }
+
   @Private
-  protected TimelineClient(String name) {
+  protected TimelineClient(String name, ApplicationId appId) {
     super(name);
+    contextAppId = appId;
   }
 
   /**
@@ -185,4 +198,49 @@ public abstract class TimelineClient extends AbstractService {
   public abstract void cancelDelegationToken(
       Token<TimelineDelegationTokenIdentifier> timelineDT)
           throws IOException, YarnException;
+
+  /**
+   * <p>
+   * Send the information of a number of conceptual entities to the timeline
+   * aggregator. It is a blocking API. The method will not return until all the
+   * put entities have been persisted.
+   * </p>
+   *
+   * @param entities
+   *          the collection of {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
+   * @throws IOException
+   * @throws YarnException
+   */
+  @Public
+  public abstract void putEntities(
+      org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
+      throws IOException, YarnException;
+
+  /**
+   * <p>
+   * Send the information of a number of conceptual entities to the timeline
+   * aggregator. It is an asynchronous API. The method will return once all the
+   * entities are received.
+   * </p>
+   *
+   * @param entities
+   *          the collection of {@link org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
+   * @throws IOException
+   * @throws YarnException
+   */
+  @Public
+  public abstract void putEntitiesAsync(
+      org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
+      throws IOException, YarnException;
+
+  /**
+   * <p>
+   * Update the timeline service address where the request will be sent to
+   * </p>
+   * @param address
+   *          the timeline service address
+   */
+  public void setTimelineServiceAddress(String address) {
+    timelineServiceAddress = address;
+  }
 }

+ 93 - 14
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/api/impl/TimelineClientImpl.java

@@ -33,6 +33,9 @@ import java.security.PrivilegedExceptionAction;
 import javax.net.ssl.HostnameVerifier;
 import javax.net.ssl.HttpsURLConnection;
 import javax.net.ssl.SSLSocketFactory;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.MultivaluedMap;
+
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -42,8 +45,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
 import org.apache.hadoop.security.ssl.SSLFactory;
@@ -53,6 +56,7 @@ import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthentica
 import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
 import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
@@ -77,13 +81,15 @@ import com.sun.jersey.api.client.config.DefaultClientConfig;
 import com.sun.jersey.api.client.filter.ClientFilter;
 import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
 import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
+import com.sun.jersey.core.util.MultivaluedMapImpl;
 
 @Private
 @Evolving
 public class TimelineClientImpl extends TimelineClient {
 
   private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class);
-  private static final String RESOURCE_URI_STR = "/ws/v1/timeline/";
+  private static final String RESOURCE_URI_STR_V1 = "/ws/v1/timeline/";
+  private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
   private static final Joiner JOINER = Joiner.on("");
   public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
 
@@ -250,7 +256,11 @@ public class TimelineClientImpl extends TimelineClient {
   }
 
   public TimelineClientImpl() {
-    super(TimelineClientImpl.class.getName());
+    super(TimelineClientImpl.class.getName(), null);
+  }
+
+  public TimelineClientImpl(ApplicationId applicationId) {
+    super(TimelineClientImpl.class.getName(), applicationId);
   }
 
   protected void serviceInit(Configuration conf) throws Exception {
@@ -282,21 +292,19 @@ public class TimelineClientImpl extends TimelineClient {
     client.addFilter(retryFilter);
 
     if (YarnConfiguration.useHttps(conf)) {
-      resURI = URI
-          .create(JOINER.join("https://", conf.get(
-              YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
-              YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS),
-              RESOURCE_URI_STR));
+      timelineServiceAddress = conf.get(
+          YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
+          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS);
     } else {
-      resURI = URI.create(JOINER.join("http://", conf.get(
+      timelineServiceAddress = conf.get(
           YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
-          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS),
-          RESOURCE_URI_STR));
+          YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS);
     }
     LOG.info("Timeline service address: " + resURI);
     timelineServiceVersion =
         conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
           YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
+    LOG.info("Timeline service address: " + timelineServiceAddress);
     super.serviceInit(conf);
   }
 
@@ -331,6 +339,39 @@ public class TimelineClientImpl extends TimelineClient {
     return timelineWriter.putEntities(entities);
   }
 
+  @Override
+  public void putEntities(
+      org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
+      throws IOException, YarnException {
+    putEntities(false, entities);
+  }
+
+  @Override
+  public void putEntitiesAsync(
+      org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
+      throws IOException, YarnException {
+    putEntities(true, entities);
+  }
+
+  private void putEntities(boolean async,
+      org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity... entities)
+      throws IOException, YarnException {
+    org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
+        entitiesContainer =
+        new org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities();
+    for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity entity : entities) {
+      entitiesContainer.addEntity(entity);
+    }
+    MultivaluedMap<String, String> params = new MultivaluedMapImpl();
+    if (contextAppId != null) {
+      params.add("appid", contextAppId.toString());
+    }
+    if (async) {
+      params.add("async", Boolean.TRUE.toString());
+    }
+    putObjects(constructResURI(getConfig(), timelineServiceAddress, true),
+        "entities", params, entitiesContainer);
+  }
 
   @Override
   public void putDomain(TimelineDomain domain) throws IOException,
@@ -338,6 +379,36 @@ public class TimelineClientImpl extends TimelineClient {
     timelineWriter.putDomain(domain);
   }
 
+  private void putObjects(
+      URI base, String path, MultivaluedMap<String, String> params, Object obj)
+          throws IOException, YarnException {
+    ClientResponse resp;
+    try {
+      resp = client.resource(base).path(path).queryParams(params)
+          .accept(MediaType.APPLICATION_JSON)
+          .type(MediaType.APPLICATION_JSON)
+          .put(ClientResponse.class, obj);
+    } catch (RuntimeException re) {
+      // runtime exception is expected if the client cannot connect the server
+      String msg =
+          "Failed to get the response from the timeline server.";
+      LOG.error(msg, re);
+      throw new IOException(re);
+    }
+    if (resp == null ||
+        resp.getClientResponseStatus() != ClientResponse.Status.OK) {
+      String msg =
+          "Failed to get the response from the timeline server.";
+      LOG.error(msg);
+      if (LOG.isDebugEnabled() && resp != null) {
+        String output = resp.getEntity(String.class);
+        LOG.debug("HTTP error code: " + resp.getStatus()
+            + " Server response:\n" + output);
+      }
+      throw new YarnException(msg);
+    }
+  }
+
   @SuppressWarnings("unchecked")
   @Override
   public Token<TimelineDelegationTokenIdentifier> getDelegationToken(
@@ -352,7 +423,8 @@ public class TimelineClientImpl extends TimelineClient {
                 new DelegationTokenAuthenticatedURL(authenticator,
                     connConfigurator);
             return (Token) authUrl.getDelegationToken(
-                resURI.toURL(), token, renewer, doAsUser);
+                constructResURI(getConfig(), timelineServiceAddress, false).toURL(),
+                token, renewer, doAsUser);
           }
         };
     return (Token<TimelineDelegationTokenIdentifier>) operateDelegationToken(getDTAction);
@@ -387,7 +459,7 @@ public class TimelineClientImpl extends TimelineClient {
             // the configured service address.
             final URI serviceURI = isTokenServiceAddrEmpty ? resURI
                 : new URI(scheme, null, address.getHostName(),
-                address.getPort(), RESOURCE_URI_STR, null, null);
+                address.getPort(), RESOURCE_URI_STR_V1, null, null);
             return authUrl
                 .renewDelegationToken(serviceURI.toURL(), token, doAsUser);
           }
@@ -424,7 +496,7 @@ public class TimelineClientImpl extends TimelineClient {
             // the configured service address.
             final URI serviceURI = isTokenServiceAddrEmpty ? resURI
                 : new URI(scheme, null, address.getHostName(),
-                address.getPort(), RESOURCE_URI_STR, null, null);
+                address.getPort(), RESOURCE_URI_STR_V1, null, null);
             authUrl.cancelDelegationToken(serviceURI.toURL(), token, doAsUser);
             return null;
           }
@@ -531,6 +603,13 @@ public class TimelineClientImpl extends TimelineClient {
     connection.setReadTimeout(socketTimeout);
   }
 
+  private static URI constructResURI(
+      Configuration conf, String address, boolean v2) {
+    return URI.create(
+        JOINER.join(YarnConfiguration.useHttps(conf) ? "https://" : "http://",
+            address, v2 ? RESOURCE_URI_STR_V2 : RESOURCE_URI_STR_V1));
+  }
+
   public static void main(String[] argv) throws Exception {
     CommandLine cliParser = new GnuParser().parse(opts, argv);
     if (cliParser.hasOption("put")) {

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/records/timelineservice/TestTimelineServiceRecords.java

@@ -76,6 +76,13 @@ public class TestTimelineServiceRecords {
     entity.addIsRelatedToEntity("test type 4", "test id 4");
     entity.addIsRelatedToEntity("test type 5", "test id 5");
     LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(entity, true));
+
+    TimelineEntities entities = new TimelineEntities();
+    TimelineEntity entity1 = new TimelineEntity();
+    entities.addEntity(entity1);
+    TimelineEntity entity2 = new TimelineEntity();
+    entities.addEntity(entity2);
+    LOG.info(TimelineUtils.dumpTimelineRecordtoJSON(entities, true));
   }
 
   @Test

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/pom.xml

@@ -86,6 +86,18 @@
       <scope>test</scope>
     </dependency>
     <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-server-timelineservice</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-minikdc</artifactId>

+ 54 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java

@@ -0,0 +1,54 @@
+package org.apache.hadoop.yarn.server.timelineservice;
+
+
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.client.api.TimelineClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeAggregatorServer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+
+public class TestTimelineServiceClientIntegration {
+  private static PerNodeAggregatorServer server;
+
+  @BeforeClass
+  public static void setupClass() throws Exception {
+    try {
+      server = PerNodeAggregatorServer.launchServer(new String[0]);
+      server.addApplication(ApplicationId.newInstance(0, 1));
+    } catch (ExitUtil.ExitException e) {
+      fail();
+    }
+  }
+
+  @AfterClass
+  public static void tearDownClass() throws Exception {
+    if (server != null) {
+      server.stop();
+    }
+  }
+
+  @Test
+  public void testPutEntities() throws Exception {
+    TimelineClient client =
+        TimelineClient.createTimelineClient(ApplicationId.newInstance(0, 1));
+    try {
+      client.init(new YarnConfiguration());
+      client.start();
+      TimelineEntity entity = new TimelineEntity();
+      entity.setType("test entity type");
+      entity.setId("test entity id");
+      client.putEntities(entity);
+      client.putEntitiesAsync(entity);
+    } catch(Exception e) {
+      fail();
+    } finally {
+      client.stop();
+    }
+  }
+}

+ 11 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml

@@ -114,6 +114,17 @@
 
   <build>
     <plugins>
+      <plugin>
+        <artifactId>maven-jar-plugin</artifactId>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+            <phase>test-compile</phase>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 </project>

+ 2 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/BaseAggregatorService.java

@@ -25,8 +25,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 
 /**
  * Service that handles writes to the timeline service and writes them to the
@@ -70,16 +69,14 @@ public class BaseAggregatorService extends CompositeService {
    *
    * @param entities entities to post
    * @param callerUgi the caller UGI
-   * @return the response that contains the result of the post.
    */
-  public TimelinePutResponse postEntities(TimelineEntities entities,
+  public void postEntities(TimelineEntities entities,
       UserGroupInformation callerUgi) {
     // TODO implement
     if (LOG.isDebugEnabled()) {
       LOG.debug("postEntities(entities=" + entities + ", callerUgi=" +
           callerUgi + ")");
     }
-    return null;
   }
 
   /**

+ 5 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorServer.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.timelineservice.aggregator;
 
 import java.nio.ByteBuffer;
 
+import com.google.inject.Inject;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -39,9 +40,7 @@ import org.apache.hadoop.yarn.server.api.AuxiliaryService;
 import org.apache.hadoop.yarn.server.api.ContainerContext;
 import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
 import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
-import org.apache.hadoop.yarn.webapp.WebApp;
-import org.apache.hadoop.yarn.webapp.WebApps;
-import org.apache.hadoop.yarn.webapp.YarnWebParams;
+import org.apache.hadoop.yarn.webapp.*;
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -120,6 +119,8 @@ public class PerNodeAggregatorServer extends AuxiliaryService {
       extends WebApp implements YarnWebParams {
     @Override
     public void setup() {
+      bind(YarnJacksonJaxbJsonProvider.class);
+      bind(GenericExceptionHandler.class);
       bind(PerNodeAggregatorWebService.class);
       // bind to the global singleton
       bind(AppLevelServiceManager.class).
@@ -214,7 +215,7 @@ public class PerNodeAggregatorServer extends AuxiliaryService {
   }
 
   @VisibleForTesting
-  static PerNodeAggregatorServer launchServer(String[] args) {
+  public static PerNodeAggregatorServer launchServer(String[] args) {
     Thread
       .setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
     StringUtils.startupShutdownMessage(PerNodeAggregatorServer.class, args,

+ 32 - 22
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/aggregator/PerNodeAggregatorWebService.java

@@ -20,12 +20,7 @@ package org.apache.hadoop.yarn.server.timelineservice.aggregator;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.Produces;
-import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.*;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
@@ -40,14 +35,17 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
-import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.hadoop.yarn.webapp.ForbiddenException;
 import org.apache.hadoop.yarn.webapp.NotFoundException;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
+import java.net.URI;
+
 /**
  * The main per-node REST end point for timeline service writes. It is
  * essentially a container service that routes requests to the appropriate
@@ -112,11 +110,14 @@ public class PerNodeAggregatorWebService {
    * the request to the app level aggregator. It expects an application as a
    * context.
    */
-  @POST
+  @PUT
+  @Path("/entities")
   @Consumes({ MediaType.APPLICATION_JSON /* , MediaType.APPLICATION_XML */})
-  public TimelinePutResponse postEntities(
+  public Response putEntities(
       @Context HttpServletRequest req,
       @Context HttpServletResponse res,
+      @QueryParam("async") String async,
+      @QueryParam("appid") String appId,
       TimelineEntities entities) {
     init(res);
     UserGroupInformation callerUgi = getUser(req);
@@ -127,13 +128,20 @@ public class PerNodeAggregatorWebService {
     }
 
     // TODO how to express async posts and handle them
+    boolean isAsync = async != null && async.trim().equalsIgnoreCase("true");
+
     try {
-      AppLevelAggregatorService service = getAggregatorService(req);
+      appId = parseApplicationId(appId);
+      if (appId == null) {
+        return Response.status(Response.Status.BAD_REQUEST).build();
+      }
+      AppLevelAggregatorService service = serviceManager.getService(appId);
       if (service == null) {
         LOG.error("Application not found");
         throw new NotFoundException(); // different exception?
       }
-      return service.postEntities(entities, callerUgi);
+      service.postEntities(entities, callerUgi);
+      return Response.ok().build();
     } catch (Exception e) {
       LOG.error("Error putting entities", e);
       throw new WebApplicationException(e,
@@ -141,16 +149,18 @@ public class PerNodeAggregatorWebService {
     }
   }
 
-  private AppLevelAggregatorService
-      getAggregatorService(HttpServletRequest req) {
-    String appIdString = getApplicationId(req);
-    return serviceManager.getService(appIdString);
-  }
-
-  private String getApplicationId(HttpServletRequest req) {
-    // TODO the application id from the request
-    // (most likely from the URI)
-    return null;
+  private String parseApplicationId(String appId) {
+    // Make sure the appId is not null and is valid
+    ApplicationId appID;
+    try {
+      if (appId != null) {
+        return ConverterUtils.toApplicationId(appId.trim()).toString();
+      } else {
+        return null;
+      }
+    } catch (Exception e) {
+      return null;
+    }
   }
 
   private void init(HttpServletResponse response) {