|
@@ -21,15 +21,22 @@ package org.apache.hadoop.yarn.server.globalpolicygenerator;
|
|
|
import java.io.IOException;
|
|
|
import java.net.InetAddress;
|
|
|
import java.net.UnknownHostException;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
|
import org.apache.commons.lang.time.DurationFormatUtils;
|
|
|
+import org.apache.hadoop.classification.VisibleForTesting;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
|
|
+import org.apache.hadoop.metrics2.source.JvmMetrics;
|
|
|
+import org.apache.hadoop.security.AuthenticationFilterInitializer;
|
|
|
+import org.apache.hadoop.security.HttpCrossOriginFilterInitializer;
|
|
|
import org.apache.hadoop.security.SecurityUtil;
|
|
|
import org.apache.hadoop.service.CompositeService;
|
|
|
+import org.apache.hadoop.util.JvmPauseMonitor;
|
|
|
import org.apache.hadoop.util.ShutdownHookManager;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
|
|
@@ -38,6 +45,10 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
|
|
import org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.PolicyGenerator;
|
|
|
import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner;
|
|
|
+import org.apache.hadoop.yarn.server.globalpolicygenerator.webapp.GPGWebApp;
|
|
|
+import org.apache.hadoop.yarn.webapp.WebApp;
|
|
|
+import org.apache.hadoop.yarn.webapp.WebApps;
|
|
|
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
@@ -61,6 +72,7 @@ public class GlobalPolicyGenerator extends CompositeService {
|
|
|
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
|
|
|
private AtomicBoolean isStopping = new AtomicBoolean(false);
|
|
|
private static final String METRICS_NAME = "Global Policy Generator";
|
|
|
+ private static long gpgStartupTime = System.currentTimeMillis();
|
|
|
|
|
|
// Federation Variables
|
|
|
private GPGContext gpgContext;
|
|
@@ -69,6 +81,9 @@ public class GlobalPolicyGenerator extends CompositeService {
|
|
|
private ScheduledThreadPoolExecutor scheduledExecutorService;
|
|
|
private SubClusterCleaner subClusterCleaner;
|
|
|
private PolicyGenerator policyGenerator;
|
|
|
+ private String webAppAddress;
|
|
|
+ private JvmPauseMonitor pauseMonitor;
|
|
|
+ private WebApp webApp;
|
|
|
|
|
|
public GlobalPolicyGenerator() {
|
|
|
super(GlobalPolicyGenerator.class.getName());
|
|
@@ -107,7 +122,12 @@ public class GlobalPolicyGenerator extends CompositeService {
|
|
|
this.subClusterCleaner = new SubClusterCleaner(conf, this.gpgContext);
|
|
|
this.policyGenerator = new PolicyGenerator(conf, this.gpgContext);
|
|
|
|
|
|
+ this.webAppAddress = WebAppUtils.getGPGWebAppURLWithoutScheme(conf);
|
|
|
DefaultMetricsSystem.initialize(METRICS_NAME);
|
|
|
+ JvmMetrics jm = JvmMetrics.initSingleton("GPG", null);
|
|
|
+ pauseMonitor = new JvmPauseMonitor();
|
|
|
+ addService(pauseMonitor);
|
|
|
+ jm.setPauseMonitor(pauseMonitor);
|
|
|
|
|
|
// super.serviceInit after all services are added
|
|
|
super.serviceInit(conf);
|
|
@@ -163,6 +183,7 @@ public class GlobalPolicyGenerator extends CompositeService {
|
|
|
LOG.info("Scheduled policy-generator with interval: {}",
|
|
|
DurationFormatUtils.formatDurationISO(policyGeneratorIntervalMillis));
|
|
|
}
|
|
|
+ startWepApp();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -181,6 +202,9 @@ public class GlobalPolicyGenerator extends CompositeService {
|
|
|
if (this.isStopping.getAndSet(true)) {
|
|
|
return;
|
|
|
}
|
|
|
+ if (webApp != null) {
|
|
|
+ webApp.stop();
|
|
|
+ }
|
|
|
DefaultMetricsSystem.shutdown();
|
|
|
super.serviceStop();
|
|
|
}
|
|
@@ -193,6 +217,43 @@ public class GlobalPolicyGenerator extends CompositeService {
|
|
|
return this.gpgContext;
|
|
|
}
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
+ public void startWepApp() {
|
|
|
+ Configuration configuration = getConfig();
|
|
|
+
|
|
|
+ boolean enableCors = configuration.getBoolean(YarnConfiguration.GPG_WEBAPP_ENABLE_CORS_FILTER,
|
|
|
+ YarnConfiguration.DEFAULT_GPG_WEBAPP_ENABLE_CORS_FILTER);
|
|
|
+
|
|
|
+ if (enableCors) {
|
|
|
+ configuration.setBoolean(HttpCrossOriginFilterInitializer.PREFIX
|
|
|
+ + HttpCrossOriginFilterInitializer.ENABLED_SUFFIX, true);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Always load pseudo authentication filter to parse "user.name" in an URL
|
|
|
+ // to identify a HTTP request's user.
|
|
|
+ boolean hasHadoopAuthFilterInitializer = false;
|
|
|
+ String filterInitializerConfKey = "hadoop.http.filter.initializers";
|
|
|
+ Class<?>[] initializersClasses = configuration.getClasses(filterInitializerConfKey);
|
|
|
+
|
|
|
+ List<String> targets = new ArrayList<>();
|
|
|
+ if (initializersClasses != null) {
|
|
|
+ for (Class<?> initializer : initializersClasses) {
|
|
|
+ if (initializer.getName().equals(AuthenticationFilterInitializer.class.getName())) {
|
|
|
+ hasHadoopAuthFilterInitializer = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ targets.add(initializer.getName());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (!hasHadoopAuthFilterInitializer) {
|
|
|
+ targets.add(AuthenticationFilterInitializer.class.getName());
|
|
|
+ configuration.set(filterInitializerConfKey, StringUtils.join(",", targets));
|
|
|
+ }
|
|
|
+ LOG.info("Instantiating GPGWebApp at {}.", webAppAddress);
|
|
|
+ GPGWebApp gpgWebApp = new GPGWebApp(this);
|
|
|
+ webApp = WebApps.$for("gpg").at(webAppAddress).start(gpgWebApp);
|
|
|
+ }
|
|
|
+
|
|
|
@SuppressWarnings("resource")
|
|
|
public static void startGPG(String[] argv, Configuration conf) {
|
|
|
boolean federationEnabled = conf.getBoolean(YarnConfiguration.FEDERATION_ENABLED,
|
|
@@ -232,4 +293,13 @@ public class GlobalPolicyGenerator extends CompositeService {
|
|
|
System.exit(-1);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ public static long getGPGStartupTime() {
|
|
|
+ return gpgStartupTime;
|
|
|
+ }
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ public WebApp getWebApp() {
|
|
|
+ return webApp;
|
|
|
+ }
|
|
|
}
|