|
@@ -27,7 +27,6 @@ import org.apache.commons.logging.Log;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.commons.logging.LogFactory;
|
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
-import org.apache.hadoop.metrics2.MetricsSystem;
|
|
|
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
|
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
|
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
|
import org.apache.hadoop.security.SecurityUtil;
|
|
import org.apache.hadoop.security.SecurityUtil;
|
|
@@ -41,7 +40,6 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
-import org.apache.hadoop.yarn.security.ApplicationTokenSecretManager;
|
|
|
|
import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
|
|
import org.apache.hadoop.yarn.security.client.ClientToAMSecretManager;
|
|
import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
|
|
import org.apache.hadoop.yarn.server.RMDelegationTokenSecretManager;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
|
|
@@ -65,6 +63,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp;
|
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
|
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
|
|
@@ -82,8 +81,10 @@ import org.apache.hadoop.yarn.webapp.WebApps.Builder;
|
|
|
|
|
|
/**
|
|
/**
|
|
* The ResourceManager is the main class that is a set of components.
|
|
* The ResourceManager is the main class that is a set of components.
|
|
|
|
+ * "I am the ResourceManager. All your resources are belong to us..."
|
|
*
|
|
*
|
|
*/
|
|
*/
|
|
|
|
+@SuppressWarnings("unchecked")
|
|
public class ResourceManager extends CompositeService implements Recoverable {
|
|
public class ResourceManager extends CompositeService implements Recoverable {
|
|
private static final Log LOG = LogFactory.getLog(ResourceManager.class);
|
|
private static final Log LOG = LogFactory.getLog(ResourceManager.class);
|
|
public static final long clusterTimeStamp = System.currentTimeMillis();
|
|
public static final long clusterTimeStamp = System.currentTimeMillis();
|
|
@@ -94,8 +95,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|
protected ContainerTokenSecretManager containerTokenSecretManager =
|
|
protected ContainerTokenSecretManager containerTokenSecretManager =
|
|
new ContainerTokenSecretManager();
|
|
new ContainerTokenSecretManager();
|
|
|
|
|
|
- protected ApplicationTokenSecretManager appTokenSecretManager =
|
|
|
|
- new ApplicationTokenSecretManager();
|
|
|
|
|
|
+ protected ApplicationTokenSecretManager appTokenSecretManager;
|
|
|
|
|
|
private Dispatcher rmDispatcher;
|
|
private Dispatcher rmDispatcher;
|
|
|
|
|
|
@@ -137,6 +137,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|
this.rmDispatcher = createDispatcher();
|
|
this.rmDispatcher = createDispatcher();
|
|
addIfService(this.rmDispatcher);
|
|
addIfService(this.rmDispatcher);
|
|
|
|
|
|
|
|
+ this.appTokenSecretManager = createApplicationTokenSecretManager(conf);
|
|
|
|
+
|
|
this.containerAllocationExpirer = new ContainerAllocationExpirer(
|
|
this.containerAllocationExpirer = new ContainerAllocationExpirer(
|
|
this.rmDispatcher);
|
|
this.rmDispatcher);
|
|
addService(this.containerAllocationExpirer);
|
|
addService(this.containerAllocationExpirer);
|
|
@@ -147,8 +149,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|
DelegationTokenRenewer tokenRenewer = createDelegationTokenRenewer();
|
|
DelegationTokenRenewer tokenRenewer = createDelegationTokenRenewer();
|
|
addService(tokenRenewer);
|
|
addService(tokenRenewer);
|
|
|
|
|
|
- this.rmContext = new RMContextImpl(this.store, this.rmDispatcher,
|
|
|
|
- this.containerAllocationExpirer, amLivelinessMonitor, tokenRenewer);
|
|
|
|
|
|
+ this.rmContext =
|
|
|
|
+ new RMContextImpl(this.store, this.rmDispatcher,
|
|
|
|
+ this.containerAllocationExpirer, amLivelinessMonitor, tokenRenewer,
|
|
|
|
+ this.appTokenSecretManager);
|
|
|
|
|
|
// Register event handler for NodesListManager
|
|
// Register event handler for NodesListManager
|
|
this.nodesListManager = new NodesListManager(this.rmContext);
|
|
this.nodesListManager = new NodesListManager(this.rmContext);
|
|
@@ -175,10 +179,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|
this.rmDispatcher.register(RMNodeEventType.class,
|
|
this.rmDispatcher.register(RMNodeEventType.class,
|
|
new NodeEventDispatcher(this.rmContext));
|
|
new NodeEventDispatcher(this.rmContext));
|
|
|
|
|
|
- //TODO change this to be random
|
|
|
|
- this.appTokenSecretManager.setMasterKey(ApplicationTokenSecretManager
|
|
|
|
- .createSecretKey("Dummy".getBytes()));
|
|
|
|
-
|
|
|
|
this.nmLivelinessMonitor = createNMLivelinessMonitor();
|
|
this.nmLivelinessMonitor = createNMLivelinessMonitor();
|
|
addService(this.nmLivelinessMonitor);
|
|
addService(this.nmLivelinessMonitor);
|
|
|
|
|
|
@@ -233,6 +233,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ protected ApplicationTokenSecretManager createApplicationTokenSecretManager(
|
|
|
|
+ Configuration conf) {
|
|
|
|
+ return new ApplicationTokenSecretManager(conf);
|
|
|
|
+ }
|
|
|
|
+
|
|
protected ResourceScheduler createScheduler() {
|
|
protected ResourceScheduler createScheduler() {
|
|
return ReflectionUtils.newInstance(this.conf.getClass(
|
|
return ReflectionUtils.newInstance(this.conf.getClass(
|
|
YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
|
|
YarnConfiguration.RM_SCHEDULER, FifoScheduler.class,
|
|
@@ -240,9 +245,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|
}
|
|
}
|
|
|
|
|
|
protected ApplicationMasterLauncher createAMLauncher() {
|
|
protected ApplicationMasterLauncher createAMLauncher() {
|
|
- return new ApplicationMasterLauncher(
|
|
|
|
- this.appTokenSecretManager, this.clientToAMSecretManager,
|
|
|
|
- this.rmContext);
|
|
|
|
|
|
+ return new ApplicationMasterLauncher(this.clientToAMSecretManager,
|
|
|
|
+ this.rmContext);
|
|
}
|
|
}
|
|
|
|
|
|
private NMLivelinessMonitor createNMLivelinessMonitor() {
|
|
private NMLivelinessMonitor createNMLivelinessMonitor() {
|
|
@@ -273,6 +277,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|
new LinkedBlockingQueue<SchedulerEvent>();
|
|
new LinkedBlockingQueue<SchedulerEvent>();
|
|
private final Thread eventProcessor;
|
|
private final Thread eventProcessor;
|
|
private volatile boolean stopped = false;
|
|
private volatile boolean stopped = false;
|
|
|
|
+ private boolean shouldExitOnError = false;
|
|
|
|
|
|
public SchedulerEventDispatcher(ResourceScheduler scheduler) {
|
|
public SchedulerEventDispatcher(ResourceScheduler scheduler) {
|
|
super(SchedulerEventDispatcher.class.getName());
|
|
super(SchedulerEventDispatcher.class.getName());
|
|
@@ -281,6 +286,14 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|
this.eventProcessor.setName("ResourceManager Event Processor");
|
|
this.eventProcessor.setName("ResourceManager Event Processor");
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Override
|
|
|
|
+ public synchronized void init(Configuration conf) {
|
|
|
|
+ this.shouldExitOnError =
|
|
|
|
+ conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
|
|
|
|
+ Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR);
|
|
|
|
+ super.init(conf);
|
|
|
|
+ }
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public synchronized void start() {
|
|
public synchronized void start() {
|
|
this.eventProcessor.start();
|
|
this.eventProcessor.start();
|
|
@@ -306,8 +319,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|
} catch (Throwable t) {
|
|
} catch (Throwable t) {
|
|
LOG.fatal("Error in handling event type " + event.getType()
|
|
LOG.fatal("Error in handling event type " + event.getType()
|
|
+ " to the scheduler", t);
|
|
+ " to the scheduler", t);
|
|
- if (getConfig().getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY,
|
|
|
|
- Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR)) {
|
|
|
|
|
|
+ if (shouldExitOnError) {
|
|
LOG.info("Exiting, bbye..");
|
|
LOG.info("Exiting, bbye..");
|
|
System.exit(-1);
|
|
System.exit(-1);
|
|
}
|
|
}
|
|
@@ -453,6 +465,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|
throw new YarnException("Failed to login", ie);
|
|
throw new YarnException("Failed to login", ie);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ this.appTokenSecretManager.start();
|
|
|
|
+
|
|
startWepApp();
|
|
startWepApp();
|
|
DefaultMetricsSystem.initialize("ResourceManager");
|
|
DefaultMetricsSystem.initialize("ResourceManager");
|
|
JvmMetrics.initSingleton("ResourceManager", null);
|
|
JvmMetrics.initSingleton("ResourceManager", null);
|
|
@@ -487,6 +501,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|
}
|
|
}
|
|
rmDTSecretManager.stopThreads();
|
|
rmDTSecretManager.stopThreads();
|
|
|
|
|
|
|
|
+ this.appTokenSecretManager.stop();
|
|
|
|
+
|
|
/*synchronized(shutdown) {
|
|
/*synchronized(shutdown) {
|
|
shutdown.set(true);
|
|
shutdown.set(true);
|
|
shutdown.notifyAll();
|
|
shutdown.notifyAll();
|
|
@@ -524,8 +540,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|
}
|
|
}
|
|
|
|
|
|
protected ApplicationMasterService createApplicationMasterService() {
|
|
protected ApplicationMasterService createApplicationMasterService() {
|
|
- return new ApplicationMasterService(this.rmContext,
|
|
|
|
- this.appTokenSecretManager, scheduler);
|
|
|
|
|
|
+ return new ApplicationMasterService(this.rmContext, scheduler);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -571,6 +586,11 @@ public class ResourceManager extends CompositeService implements Recoverable {
|
|
return this.applicationACLsManager;
|
|
return this.applicationACLsManager;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Private
|
|
|
|
+ public ApplicationTokenSecretManager getApplicationTokenSecretManager(){
|
|
|
|
+ return this.appTokenSecretManager;
|
|
|
|
+ }
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public void recover(RMState state) throws Exception {
|
|
public void recover(RMState state) throws Exception {
|
|
resourceTracker.recover(state);
|
|
resourceTracker.recover(state);
|