|
@@ -22,20 +22,23 @@ import com.sun.jersey.api.core.PackagesResourceConfig;
|
|
|
import com.sun.jersey.api.core.ResourceConfig;
|
|
import com.sun.jersey.api.core.ResourceConfig;
|
|
|
import com.sun.net.httpserver.HttpServer;
|
|
import com.sun.net.httpserver.HttpServer;
|
|
|
|
|
|
|
|
|
|
+import javax.net.ssl.SSLContext;
|
|
|
import javax.ws.rs.core.UriBuilder;
|
|
import javax.ws.rs.core.UriBuilder;
|
|
|
-import java.io.IOException;
|
|
|
|
|
import java.net.InetAddress;
|
|
import java.net.InetAddress;
|
|
|
import java.net.URI;
|
|
import java.net.URI;
|
|
|
import java.net.URL;
|
|
import java.net.URL;
|
|
|
import java.net.UnknownHostException;
|
|
import java.net.UnknownHostException;
|
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
|
|
|
|
|
|
|
|
+import com.sun.net.httpserver.HttpsConfigurator;
|
|
|
|
|
+import com.sun.net.httpserver.HttpsServer;
|
|
|
import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.metrics2.sink.timeline.AbstractMetricPublisher;
|
|
import org.apache.hadoop.metrics2.sink.timeline.AbstractMetricPublisher;
|
|
|
import org.apache.hadoop.metrics2.sink.timeline.AggregatedMetricsPublisher;
|
|
import org.apache.hadoop.metrics2.sink.timeline.AggregatedMetricsPublisher;
|
|
|
import org.apache.hadoop.metrics2.sink.timeline.RawMetricsPublisher;
|
|
import org.apache.hadoop.metrics2.sink.timeline.RawMetricsPublisher;
|
|
|
|
|
+import org.eclipse.jetty.util.ssl.SslContextFactory;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
|
* WEB application with 2 publisher threads that processes received metrics and submits results to the collector
|
|
* WEB application with 2 publisher threads that processes received metrics and submits results to the collector
|
|
@@ -45,10 +48,12 @@ public class AggregatorApplication
|
|
|
private static final int STOP_SECONDS_DELAY = 0;
|
|
private static final int STOP_SECONDS_DELAY = 0;
|
|
|
private static final int JOIN_SECONDS_TIMEOUT = 5;
|
|
private static final int JOIN_SECONDS_TIMEOUT = 5;
|
|
|
private static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml";
|
|
private static final String METRICS_SITE_CONFIGURATION_FILE = "ams-site.xml";
|
|
|
|
|
+ private static final String METRICS_SSL_SERVER_CONFIGURATION_FILE = "ssl-server.xml";
|
|
|
private Log LOG;
|
|
private Log LOG;
|
|
|
private final int webApplicationPort;
|
|
private final int webApplicationPort;
|
|
|
private final int rawPublishingInterval;
|
|
private final int rawPublishingInterval;
|
|
|
private final int aggregationInterval;
|
|
private final int aggregationInterval;
|
|
|
|
|
+ private final String webServerProtocol;
|
|
|
private Configuration configuration;
|
|
private Configuration configuration;
|
|
|
private Thread aggregatePublisherThread;
|
|
private Thread aggregatePublisherThread;
|
|
|
private Thread rawPublisherThread;
|
|
private Thread rawPublisherThread;
|
|
@@ -65,10 +70,11 @@ public class AggregatorApplication
|
|
|
this.aggregationInterval = configuration.getInt("timeline.metrics.host.aggregator.minute.interval", 300);
|
|
this.aggregationInterval = configuration.getInt("timeline.metrics.host.aggregator.minute.interval", 300);
|
|
|
this.rawPublishingInterval = configuration.getInt("timeline.metrics.sink.report.interval", 60);
|
|
this.rawPublishingInterval = configuration.getInt("timeline.metrics.sink.report.interval", 60);
|
|
|
this.webApplicationPort = configuration.getInt("timeline.metrics.host.inmemory.aggregation.port", 61888);
|
|
this.webApplicationPort = configuration.getInt("timeline.metrics.host.inmemory.aggregation.port", 61888);
|
|
|
|
|
+ this.webServerProtocol = configuration.get("timeline.metrics.host.inmemory.aggregation.http.policy", "HTTP_ONLY").equalsIgnoreCase("HTTP_ONLY") ? "http" : "https";
|
|
|
this.timelineMetricsHolder = TimelineMetricsHolder.getInstance(rawPublishingInterval, aggregationInterval);
|
|
this.timelineMetricsHolder = TimelineMetricsHolder.getInstance(rawPublishingInterval, aggregationInterval);
|
|
|
try {
|
|
try {
|
|
|
this.httpServer = createHttpServer();
|
|
this.httpServer = createHttpServer();
|
|
|
- } catch (IOException e) {
|
|
|
|
|
|
|
+ } catch (Exception e) {
|
|
|
LOG.error("Exception while starting HTTP server. Exiting", e);
|
|
LOG.error("Exception while starting HTTP server. Exiting", e);
|
|
|
System.exit(1);
|
|
System.exit(1);
|
|
|
}
|
|
}
|
|
@@ -88,13 +94,20 @@ public class AggregatorApplication
|
|
|
|
|
|
|
|
URL amsResUrl = classLoader.getResource(METRICS_SITE_CONFIGURATION_FILE);
|
|
URL amsResUrl = classLoader.getResource(METRICS_SITE_CONFIGURATION_FILE);
|
|
|
LOG.info("Found metric service configuration: " + amsResUrl);
|
|
LOG.info("Found metric service configuration: " + amsResUrl);
|
|
|
|
|
+ URL sslConfUrl = classLoader.getResource(METRICS_SSL_SERVER_CONFIGURATION_FILE);
|
|
|
|
|
+ LOG.info("Found metric service configuration: " + sslConfUrl);
|
|
|
if (amsResUrl == null) {
|
|
if (amsResUrl == null) {
|
|
|
- throw new IllegalStateException("Unable to initialize the metrics " +
|
|
|
|
|
- "subsystem. No ams-site present in the classpath.");
|
|
|
|
|
|
|
+ throw new IllegalStateException(String.format("Unable to initialize the metrics " +
|
|
|
|
|
+ "subsystem. No %s present in the classpath.", METRICS_SITE_CONFIGURATION_FILE));
|
|
|
|
|
+ }
|
|
|
|
|
+ if (sslConfUrl == null) {
|
|
|
|
|
+ throw new IllegalStateException(String.format("Unable to initialize the metrics " +
|
|
|
|
|
+ "subsystem. No %s present in the classpath.", METRICS_SSL_SERVER_CONFIGURATION_FILE));
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
try {
|
|
try {
|
|
|
configuration.addResource(amsResUrl.toURI().toURL());
|
|
configuration.addResource(amsResUrl.toURI().toURL());
|
|
|
|
|
+ configuration.addResource(sslConfUrl.toURI().toURL());
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
LOG.error("Couldn't init configuration. ", e);
|
|
LOG.error("Couldn't init configuration. ", e);
|
|
|
System.exit(1);
|
|
System.exit(1);
|
|
@@ -112,17 +125,41 @@ public class AggregatorApplication
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
protected URI getURI() {
|
|
protected URI getURI() {
|
|
|
- URI uri = UriBuilder.fromUri("http://" + getHostName() + "/").port(this.webApplicationPort).build();
|
|
|
|
|
|
|
+ URI uri = UriBuilder.fromUri("/").scheme(this.webServerProtocol).host(getHostName()).port(this.webApplicationPort).build();
|
|
|
LOG.info(String.format("Web server at %s", uri));
|
|
LOG.info(String.format("Web server at %s", uri));
|
|
|
return uri;
|
|
return uri;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- protected HttpServer createHttpServer() throws IOException {
|
|
|
|
|
|
|
+ protected HttpServer createHttpServer() throws Exception {
|
|
|
ResourceConfig resourceConfig = new PackagesResourceConfig("org.apache.hadoop.metrics2.host.aggregator");
|
|
ResourceConfig resourceConfig = new PackagesResourceConfig("org.apache.hadoop.metrics2.host.aggregator");
|
|
|
HashMap<String, Object> params = new HashMap();
|
|
HashMap<String, Object> params = new HashMap();
|
|
|
params.put("com.sun.jersey.api.json.POJOMappingFeature", "true");
|
|
params.put("com.sun.jersey.api.json.POJOMappingFeature", "true");
|
|
|
resourceConfig.setPropertiesAndFeatures(params);
|
|
resourceConfig.setPropertiesAndFeatures(params);
|
|
|
- return HttpServerFactory.create(getURI(), resourceConfig);
|
|
|
|
|
|
|
+ HttpServer server = HttpServerFactory.create(getURI(), resourceConfig);
|
|
|
|
|
+
|
|
|
|
|
+ if (webServerProtocol.equalsIgnoreCase("https")) {
|
|
|
|
|
+ HttpsServer httpsServer = (HttpsServer) server;
|
|
|
|
|
+ SslContextFactory sslContextFactory = new SslContextFactory();
|
|
|
|
|
+ String keyStorePath = configuration.get("ssl.server.keystore.location");
|
|
|
|
|
+ String keyStorePassword = configuration.get("ssl.server.keystore.password");
|
|
|
|
|
+ String keyManagerPassword = configuration.get("ssl.server.keystore.keypassword");
|
|
|
|
|
+ String trustStorePath = configuration.get("ssl.server.truststore.location");
|
|
|
|
|
+ String trustStorePassword = configuration.get("ssl.server.truststore.password");
|
|
|
|
|
+
|
|
|
|
|
+ sslContextFactory.setKeyStorePath(keyStorePath);
|
|
|
|
|
+ sslContextFactory.setKeyStorePassword(keyStorePassword);
|
|
|
|
|
+ sslContextFactory.setKeyManagerPassword(keyManagerPassword);
|
|
|
|
|
+ sslContextFactory.setTrustStorePath(trustStorePath);
|
|
|
|
|
+ sslContextFactory.setTrustStorePassword(trustStorePassword);
|
|
|
|
|
+
|
|
|
|
|
+ sslContextFactory.start();
|
|
|
|
|
+ SSLContext sslContext = sslContextFactory.getSslContext();
|
|
|
|
|
+ sslContextFactory.stop();
|
|
|
|
|
+ HttpsConfigurator httpsConfigurator = new HttpsConfigurator(sslContext);
|
|
|
|
|
+ httpsServer.setHttpsConfigurator(httpsConfigurator);
|
|
|
|
|
+ server = httpsServer;
|
|
|
|
|
+ }
|
|
|
|
|
+ return server;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
private void startWebServer() {
|
|
private void startWebServer() {
|