|
@@ -18,14 +18,16 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.timelineservice.aggregator;
|
|
|
|
|
|
+import java.net.URI;
|
|
|
import java.nio.ByteBuffer;
|
|
|
+import java.util.HashMap;
|
|
|
|
|
|
-import com.google.inject.Inject;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.http.lib.StaticUserWebFilter;
|
|
|
import org.apache.hadoop.util.ExitUtil;
|
|
|
import org.apache.hadoop.util.ShutdownHookManager;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
@@ -40,11 +42,16 @@ import org.apache.hadoop.yarn.server.api.AuxiliaryService;
|
|
|
import org.apache.hadoop.yarn.server.api.ContainerContext;
|
|
|
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
|
|
|
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
|
|
|
-import org.apache.hadoop.yarn.webapp.*;
|
|
|
+import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
|
|
|
+import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
|
|
|
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
|
|
+import org.apache.hadoop.http.HttpServer2;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
|
|
|
+import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
|
|
|
+import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
|
|
|
+
|
|
|
/**
|
|
|
* The top-level server for the per-node timeline aggregator service. Currently
|
|
|
* it is defined as an auxiliary service to accommodate running within another
|
|
@@ -56,9 +63,10 @@ public class PerNodeAggregatorServer extends AuxiliaryService {
|
|
|
private static final Log LOG =
|
|
|
LogFactory.getLog(PerNodeAggregatorServer.class);
|
|
|
private static final int SHUTDOWN_HOOK_PRIORITY = 30;
|
|
|
+ static final String AGGREGATOR_COLLECTION_ATTR_KEY = "aggregator.collection";
|
|
|
|
|
|
private final AppLevelServiceManager serviceManager;
|
|
|
- private WebApp webApp;
|
|
|
+ private HttpServer2 timelineRestServer;
|
|
|
|
|
|
public PerNodeAggregatorServer() {
|
|
|
// use the same singleton
|
|
@@ -86,8 +94,8 @@ public class PerNodeAggregatorServer extends AuxiliaryService {
|
|
|
|
|
|
@Override
|
|
|
protected void serviceStop() throws Exception {
|
|
|
- if (webApp != null) {
|
|
|
- webApp.stop();
|
|
|
+ if (timelineRestServer != null) {
|
|
|
+ timelineRestServer.stop();
|
|
|
}
|
|
|
// stop the service manager
|
|
|
serviceManager.stop();
|
|
@@ -103,11 +111,31 @@ public class PerNodeAggregatorServer extends AuxiliaryService {
|
|
|
WebAppUtils.getAHSWebAppURLWithoutScheme(conf));
|
|
|
LOG.info("Instantiating the per-node aggregator webapp at " + bindAddress);
|
|
|
try {
|
|
|
- webApp =
|
|
|
- WebApps
|
|
|
- .$for("timeline", null, null, "ws")
|
|
|
- .with(conf).at(bindAddress).start(
|
|
|
- new TimelineServiceWebApp());
|
|
|
+ Configuration confForInfoServer = new Configuration(conf);
|
|
|
+ confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
|
|
|
+ HttpServer2.Builder builder = new HttpServer2.Builder()
|
|
|
+ .setName("timeline")
|
|
|
+ .setConf(conf)
|
|
|
+ .addEndpoint(URI.create("http://" + bindAddress));
|
|
|
+ timelineRestServer = builder.build();
|
|
|
+ // TODO: replace this by an authentification filter in future.
|
|
|
+ HashMap<String, String> options = new HashMap<String, String>();
|
|
|
+ String username = conf.get(HADOOP_HTTP_STATIC_USER,
|
|
|
+ DEFAULT_HADOOP_HTTP_STATIC_USER);
|
|
|
+ options.put(HADOOP_HTTP_STATIC_USER, username);
|
|
|
+ HttpServer2.defineFilter(timelineRestServer.getWebAppContext(),
|
|
|
+ "static_user_filter_timeline",
|
|
|
+ StaticUserWebFilter.StaticUserFilter.class.getName(),
|
|
|
+ options, new String[] {"/*"});
|
|
|
+
|
|
|
+ timelineRestServer.addJerseyResourcePackage(
|
|
|
+ PerNodeAggregatorWebService.class.getPackage().getName() + ";"
|
|
|
+ + GenericExceptionHandler.class.getPackage().getName() + ";"
|
|
|
+ + YarnJacksonJaxbJsonProvider.class.getPackage().getName(),
|
|
|
+ "/*");
|
|
|
+ timelineRestServer.setAttribute(AGGREGATOR_COLLECTION_ATTR_KEY,
|
|
|
+ AppLevelServiceManager.getInstance());
|
|
|
+ timelineRestServer.start();
|
|
|
} catch (Exception e) {
|
|
|
String msg = "The per-node aggregator webapp failed to start.";
|
|
|
LOG.error(msg, e);
|
|
@@ -115,19 +143,6 @@ public class PerNodeAggregatorServer extends AuxiliaryService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private static class TimelineServiceWebApp
|
|
|
- extends WebApp implements YarnWebParams {
|
|
|
- @Override
|
|
|
- public void setup() {
|
|
|
- bind(YarnJacksonJaxbJsonProvider.class);
|
|
|
- bind(GenericExceptionHandler.class);
|
|
|
- bind(PerNodeAggregatorWebService.class);
|
|
|
- // bind to the global singleton
|
|
|
- bind(AppLevelServiceManager.class).
|
|
|
- toProvider(AppLevelServiceManagerProvider.class);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
// these methods can be used as the basis for future service methods if the
|
|
|
// per-node aggregator runs separate from the node manager
|
|
|
/**
|