|
@@ -18,6 +18,8 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.globalpolicygenerator;
|
|
|
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
@@ -25,9 +27,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import org.apache.commons.lang3.time.DurationFormatUtils;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
|
|
+import org.apache.hadoop.metrics2.source.JvmMetrics;
|
|
|
import org.apache.hadoop.registry.client.api.RegistryOperations;
|
|
|
+import org.apache.hadoop.security.AuthenticationFilterInitializer;
|
|
|
+import org.apache.hadoop.security.HttpCrossOriginFilterInitializer;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.service.CompositeService;
|
|
|
+import org.apache.hadoop.util.JvmPauseMonitor;
|
|
|
import org.apache.hadoop.util.ShutdownHookManager;
|
|
|
import org.apache.hadoop.util.StringUtils;
|
|
|
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
|
|
@@ -37,9 +43,15 @@ import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade
|
|
|
import org.apache.hadoop.yarn.server.globalpolicygenerator.applicationcleaner.ApplicationCleaner;
|
|
|
import org.apache.hadoop.yarn.server.globalpolicygenerator.policygenerator.PolicyGenerator;
|
|
|
import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner;
|
|
|
+import org.apache.hadoop.yarn.server.globalpolicygenerator.webapp.GPGWebApp;
|
|
|
+import org.apache.hadoop.yarn.webapp.WebApp;
|
|
|
+import org.apache.hadoop.yarn.webapp.WebApps;
|
|
|
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
+
|
|
|
/**
|
|
|
* Global Policy Generator (GPG) is a Yarn Federation component. By tuning the
|
|
|
* Federation policies in Federation State Store, GPG overlooks the entire
|
|
@@ -60,6 +72,7 @@ public class GlobalPolicyGenerator extends CompositeService {
|
|
|
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
|
|
|
private AtomicBoolean isStopping = new AtomicBoolean(false);
|
|
|
private static final String METRICS_NAME = "Global Policy Generator";
|
|
|
+ private static long gpgStartupTime = System.currentTimeMillis();
|
|
|
|
|
|
// Federation Variables
|
|
|
private GPGContext gpgContext;
|
|
@@ -70,6 +83,9 @@ public class GlobalPolicyGenerator extends CompositeService {
|
|
|
private SubClusterCleaner subClusterCleaner;
|
|
|
private ApplicationCleaner applicationCleaner;
|
|
|
private PolicyGenerator policyGenerator;
|
|
|
+ private String webAppAddress;
|
|
|
+ private JvmPauseMonitor pauseMonitor;
|
|
|
+ private WebApp webApp;
|
|
|
|
|
|
public GlobalPolicyGenerator() {
|
|
|
super(GlobalPolicyGenerator.class.getName());
|
|
@@ -110,7 +126,13 @@ public class GlobalPolicyGenerator extends CompositeService {
|
|
|
|
|
|
this.policyGenerator = new PolicyGenerator(conf, this.gpgContext);
|
|
|
|
|
|
+ this.webAppAddress = WebAppUtils.getGPGWebAppURLWithoutScheme(conf);
|
|
|
+
|
|
|
DefaultMetricsSystem.initialize(METRICS_NAME);
|
|
|
+ JvmMetrics jm = JvmMetrics.initSingleton("GPG", null);
|
|
|
+ pauseMonitor = new JvmPauseMonitor();
|
|
|
+ addService(pauseMonitor);
|
|
|
+ jm.setPauseMonitor(pauseMonitor);
|
|
|
|
|
|
// super.serviceInit after all services are added
|
|
|
super.serviceInit(conf);
|
|
@@ -154,6 +176,7 @@ public class GlobalPolicyGenerator extends CompositeService {
|
|
|
LOG.info("Scheduled policygenerator with interval: {}",
|
|
|
DurationFormatUtils.formatDurationISO(policyGeneratorIntervalMillis));
|
|
|
}
|
|
|
+ startWepApp();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -176,6 +199,9 @@ public class GlobalPolicyGenerator extends CompositeService {
|
|
|
if (this.isStopping.getAndSet(true)) {
|
|
|
return;
|
|
|
}
|
|
|
+ if (webApp != null) {
|
|
|
+ webApp.stop();
|
|
|
+ }
|
|
|
DefaultMetricsSystem.shutdown();
|
|
|
super.serviceStop();
|
|
|
}
|
|
@@ -202,6 +228,42 @@ public class GlobalPolicyGenerator extends CompositeService {
|
|
|
this.start();
|
|
|
}
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
+ public void startWepApp() {
|
|
|
+ boolean enableCors = getConfig().getBoolean(
|
|
|
+ YarnConfiguration.GPG_WEBAPP_ENABLE_CORS_FILTER,
|
|
|
+ YarnConfiguration.DEFAULT_GPG_WEBAPP_ENABLE_CORS_FILTER);
|
|
|
+ if (enableCors) {
|
|
|
+ getConfig().setBoolean(HttpCrossOriginFilterInitializer.PREFIX
|
|
|
+ + HttpCrossOriginFilterInitializer.ENABLED_SUFFIX, true);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Always load pseudo authentication filter to parse "user.name" in an URL
|
|
|
+ // to identify a HTTP request's user.
|
|
|
+ boolean hasHadoopAuthFilterInitializer = false;
|
|
|
+ String filterInitializerConfKey = "hadoop.http.filter.initializers";
|
|
|
+ Class<?>[] initializersClasses = getConfig()
|
|
|
+ .getClasses(filterInitializerConfKey);
|
|
|
+ List<String> targets = new ArrayList<String>();
|
|
|
+ if (initializersClasses != null) {
|
|
|
+ for (Class<?> initializer : initializersClasses) {
|
|
|
+ if (initializer.getName()
|
|
|
+ .equals(AuthenticationFilterInitializer.class.getName())) {
|
|
|
+ hasHadoopAuthFilterInitializer = true;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ targets.add(initializer.getName());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (!hasHadoopAuthFilterInitializer) {
|
|
|
+ targets.add(AuthenticationFilterInitializer.class.getName());
|
|
|
+ getConfig().set(filterInitializerConfKey, StringUtils.join(",", targets));
|
|
|
+ }
|
|
|
+ LOG.info("Instantiating GPGWebApp at " + webAppAddress);
|
|
|
+ GPGWebApp gpgWebApp = new GPGWebApp(this);
|
|
|
+ webApp = WebApps.$for("gpg").at(webAppAddress).start(gpgWebApp);
|
|
|
+ }
|
|
|
+
|
|
|
@SuppressWarnings("resource")
|
|
|
public static void startGPG(String[] argv, Configuration conf) {
|
|
|
boolean federationEnabled =
|
|
@@ -225,6 +287,10 @@ public class GlobalPolicyGenerator extends CompositeService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ public static long getGPGStartupTime() {
|
|
|
+ return gpgStartupTime;
|
|
|
+ }
|
|
|
+
|
|
|
public static void main(String[] argv) {
|
|
|
startGPG(argv, new YarnConfiguration());
|
|
|
}
|