|
@@ -18,8 +18,11 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.globalpolicygenerator;
|
|
|
|
|
|
+import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
|
|
|
+import org.apache.commons.lang.time.DurationFormatUtils;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
|
|
import org.apache.hadoop.service.CompositeService;
|
|
@@ -28,6 +31,7 @@ import org.apache.hadoop.util.StringUtils;
|
|
|
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
|
|
|
+import org.apache.hadoop.yarn.server.globalpolicygenerator.subclustercleaner.SubClusterCleaner;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
@@ -55,36 +59,26 @@ public class GlobalPolicyGenerator extends CompositeService {
|
|
|
// Federation Variables
|
|
|
private GPGContext gpgContext;
|
|
|
|
|
|
+ // Scheduler service that runs tasks periodically
|
|
|
+ private ScheduledThreadPoolExecutor scheduledExecutorService;
|
|
|
+ private SubClusterCleaner subClusterCleaner;
|
|
|
+
|
|
|
public GlobalPolicyGenerator() {
|
|
|
super(GlobalPolicyGenerator.class.getName());
|
|
|
this.gpgContext = new GPGContextImpl();
|
|
|
}
|
|
|
|
|
|
- protected void initAndStart(Configuration conf, boolean hasToReboot) {
|
|
|
- try {
|
|
|
- // Remove the old hook if we are rebooting.
|
|
|
- if (hasToReboot && null != gpgShutdownHook) {
|
|
|
- ShutdownHookManager.get().removeShutdownHook(gpgShutdownHook);
|
|
|
- }
|
|
|
-
|
|
|
- gpgShutdownHook = new CompositeServiceShutdownHook(this);
|
|
|
- ShutdownHookManager.get().addShutdownHook(gpgShutdownHook,
|
|
|
- SHUTDOWN_HOOK_PRIORITY);
|
|
|
-
|
|
|
- this.init(conf);
|
|
|
- this.start();
|
|
|
- } catch (Throwable t) {
|
|
|
- LOG.error("Error starting globalpolicygenerator", t);
|
|
|
- System.exit(-1);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
@Override
|
|
|
protected void serviceInit(Configuration conf) throws Exception {
|
|
|
// Set up the context
|
|
|
this.gpgContext
|
|
|
.setStateStoreFacade(FederationStateStoreFacade.getInstance());
|
|
|
|
|
|
+ this.scheduledExecutorService = new ScheduledThreadPoolExecutor(
|
|
|
+ conf.getInt(YarnConfiguration.GPG_SCHEDULED_EXECUTOR_THREADS,
|
|
|
+ YarnConfiguration.DEFAULT_GPG_SCHEDULED_EXECUTOR_THREADS));
|
|
|
+ this.subClusterCleaner = new SubClusterCleaner(conf, this.gpgContext);
|
|
|
+
|
|
|
DefaultMetricsSystem.initialize(METRICS_NAME);
|
|
|
|
|
|
// super.serviceInit after all services are added
|
|
@@ -94,10 +88,32 @@ public class GlobalPolicyGenerator extends CompositeService {
|
|
|
@Override
|
|
|
protected void serviceStart() throws Exception {
|
|
|
super.serviceStart();
|
|
|
+
|
|
|
+ // Scheduler SubClusterCleaner service
|
|
|
+ long scCleanerIntervalMs = getConfig().getLong(
|
|
|
+ YarnConfiguration.GPG_SUBCLUSTER_CLEANER_INTERVAL_MS,
|
|
|
+ YarnConfiguration.DEFAULT_GPG_SUBCLUSTER_CLEANER_INTERVAL_MS);
|
|
|
+ if (scCleanerIntervalMs > 0) {
|
|
|
+ this.scheduledExecutorService.scheduleAtFixedRate(this.subClusterCleaner,
|
|
|
+ 0, scCleanerIntervalMs, TimeUnit.MILLISECONDS);
|
|
|
+ LOG.info("Scheduled sub-cluster cleaner with interval: {}",
|
|
|
+ DurationFormatUtils.formatDurationISO(scCleanerIntervalMs));
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
protected void serviceStop() throws Exception {
|
|
|
+ try {
|
|
|
+ if (this.scheduledExecutorService != null
|
|
|
+ && !this.scheduledExecutorService.isShutdown()) {
|
|
|
+ this.scheduledExecutorService.shutdown();
|
|
|
+ LOG.info("Stopped ScheduledExecutorService");
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.error("Failed to shutdown ScheduledExecutorService", e);
|
|
|
+ throw e;
|
|
|
+ }
|
|
|
+
|
|
|
if (this.isStopping.getAndSet(true)) {
|
|
|
return;
|
|
|
}
|
|
@@ -113,20 +129,40 @@ public class GlobalPolicyGenerator extends CompositeService {
|
|
|
return this.gpgContext;
|
|
|
}
|
|
|
|
|
|
+ private void initAndStart(Configuration conf, boolean hasToReboot) {
|
|
|
+ // Remove the old hook if we are rebooting.
|
|
|
+ if (hasToReboot && null != gpgShutdownHook) {
|
|
|
+ ShutdownHookManager.get().removeShutdownHook(gpgShutdownHook);
|
|
|
+ }
|
|
|
+
|
|
|
+ gpgShutdownHook = new CompositeServiceShutdownHook(this);
|
|
|
+ ShutdownHookManager.get().addShutdownHook(gpgShutdownHook,
|
|
|
+ SHUTDOWN_HOOK_PRIORITY);
|
|
|
+
|
|
|
+ this.init(conf);
|
|
|
+ this.start();
|
|
|
+ }
|
|
|
+
|
|
|
@SuppressWarnings("resource")
|
|
|
public static void startGPG(String[] argv, Configuration conf) {
|
|
|
boolean federationEnabled =
|
|
|
conf.getBoolean(YarnConfiguration.FEDERATION_ENABLED,
|
|
|
YarnConfiguration.DEFAULT_FEDERATION_ENABLED);
|
|
|
- if (federationEnabled) {
|
|
|
- Thread.setDefaultUncaughtExceptionHandler(
|
|
|
- new YarnUncaughtExceptionHandler());
|
|
|
- StringUtils.startupShutdownMessage(GlobalPolicyGenerator.class, argv,
|
|
|
- LOG);
|
|
|
- GlobalPolicyGenerator globalPolicyGenerator = new GlobalPolicyGenerator();
|
|
|
- globalPolicyGenerator.initAndStart(conf, false);
|
|
|
- } else {
|
|
|
- LOG.warn("Federation is not enabled. The gpg cannot start.");
|
|
|
+ try {
|
|
|
+ if (federationEnabled) {
|
|
|
+ Thread.setDefaultUncaughtExceptionHandler(
|
|
|
+ new YarnUncaughtExceptionHandler());
|
|
|
+ StringUtils.startupShutdownMessage(GlobalPolicyGenerator.class, argv,
|
|
|
+ LOG);
|
|
|
+ GlobalPolicyGenerator globalPolicyGenerator =
|
|
|
+ new GlobalPolicyGenerator();
|
|
|
+ globalPolicyGenerator.initAndStart(conf, false);
|
|
|
+ } else {
|
|
|
+ LOG.warn("Federation is not enabled. The gpg cannot start.");
|
|
|
+ }
|
|
|
+ } catch (Throwable t) {
|
|
|
+ LOG.error("Error starting globalpolicygenerator", t);
|
|
|
+ System.exit(-1);
|
|
|
}
|
|
|
}
|
|
|
|