|
@@ -302,38 +302,13 @@ public class CapacityScheduler extends
|
|
|
IOException, YarnException {
|
|
|
writeLock.lock();
|
|
|
try {
|
|
|
- String confProviderStr = configuration.get(
|
|
|
- YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
|
|
|
- YarnConfiguration.DEFAULT_CONFIGURATION_STORE);
|
|
|
- switch (confProviderStr) {
|
|
|
- case YarnConfiguration.FILE_CONFIGURATION_STORE:
|
|
|
- this.csConfProvider =
|
|
|
- new FileBasedCSConfigurationProvider(rmContext);
|
|
|
- break;
|
|
|
- case YarnConfiguration.MEMORY_CONFIGURATION_STORE:
|
|
|
- case YarnConfiguration.LEVELDB_CONFIGURATION_STORE:
|
|
|
- case YarnConfiguration.ZK_CONFIGURATION_STORE:
|
|
|
- case YarnConfiguration.FS_CONFIGURATION_STORE:
|
|
|
- this.csConfProvider = new MutableCSConfigurationProvider(rmContext);
|
|
|
- break;
|
|
|
- default:
|
|
|
- throw new IOException("Invalid configuration store class: " +
|
|
|
- confProviderStr);
|
|
|
- }
|
|
|
+ this.csConfProvider = getCsConfProvider(configuration);
|
|
|
this.csConfProvider.init(configuration);
|
|
|
this.conf = this.csConfProvider.loadConfiguration(configuration);
|
|
|
validateConf(this.conf);
|
|
|
this.minimumAllocation = super.getMinimumAllocation();
|
|
|
initMaximumResourceCapability(super.getMaximumAllocation());
|
|
|
- this.calculator = this.conf.getResourceCalculator();
|
|
|
- if (this.calculator instanceof DefaultResourceCalculator
|
|
|
- && ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
|
|
|
- throw new YarnRuntimeException("RM uses DefaultResourceCalculator which"
|
|
|
- + " used only memory as resource-type but invalid resource-types"
|
|
|
- + " specified " + ResourceUtils.getResourceTypes() + ". Use"
|
|
|
- + " DominantResourceCalculator instead to make effective use of"
|
|
|
- + " these resource-types");
|
|
|
- }
|
|
|
+ this.calculator = initResourceCalculator();
|
|
|
this.usePortForNodeName = this.conf.getUsePortForNodeName();
|
|
|
this.applications = new ConcurrentHashMap<>();
|
|
|
this.labelManager = rmContext.getNodeLabelManager();
|
|
@@ -341,71 +316,109 @@ public class CapacityScheduler extends
|
|
|
this.queueManager = new CapacitySchedulerQueueManager(yarnConf,
|
|
|
this.labelManager, this.appPriorityACLManager);
|
|
|
this.queueManager.setCapacitySchedulerContext(this);
|
|
|
-
|
|
|
this.workflowPriorityMappingsMgr = new WorkflowPriorityMappingsManager();
|
|
|
-
|
|
|
this.activitiesManager = new ActivitiesManager(rmContext);
|
|
|
activitiesManager.init(conf);
|
|
|
initializeQueues(this.conf);
|
|
|
this.isLazyPreemptionEnabled = conf.getLazyPreemptionEnabled();
|
|
|
-
|
|
|
- scheduleAsynchronously = this.conf.getScheduleAynschronously();
|
|
|
- asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
|
|
|
- DEFAULT_ASYNC_SCHEDULER_INTERVAL);
|
|
|
-
|
|
|
this.assignMultipleEnabled = this.conf.getAssignMultipleEnabled();
|
|
|
this.maxAssignPerHeartbeat = this.conf.getMaxAssignPerHeartbeat();
|
|
|
-
|
|
|
- this.appShouldFailFast = CapacitySchedulerConfiguration.shouldAppFailFast(
|
|
|
- getConfig());
|
|
|
-
|
|
|
- // number of threads for async scheduling
|
|
|
- int maxAsyncSchedulingThreads = this.conf.getInt(
|
|
|
- CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
|
|
|
- 1);
|
|
|
- maxAsyncSchedulingThreads = Math.max(maxAsyncSchedulingThreads, 1);
|
|
|
-
|
|
|
- if (scheduleAsynchronously) {
|
|
|
- asyncSchedulerThreads = new ArrayList<>();
|
|
|
- for (int i = 0; i < maxAsyncSchedulingThreads; i++) {
|
|
|
- asyncSchedulerThreads.add(new AsyncScheduleThread(this));
|
|
|
- }
|
|
|
- resourceCommitterService = new ResourceCommitterService(this);
|
|
|
- asyncMaxPendingBacklogs = this.conf.getInt(
|
|
|
- CapacitySchedulerConfiguration.
|
|
|
- SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS,
|
|
|
- CapacitySchedulerConfiguration.
|
|
|
- DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS);
|
|
|
- }
|
|
|
+ this.appShouldFailFast = CapacitySchedulerConfiguration.shouldAppFailFast(getConfig());
|
|
|
+ initAsyncSchedulingProperties();
|
|
|
|
|
|
// Setup how many containers we can allocate for each round
|
|
|
offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit();
|
|
|
|
|
|
- // Register CS specific multi-node policies to common MultiNodeManager
|
|
|
- // which will add to a MultiNodeSorter which gives a pre-sorted list of
|
|
|
- // nodes to scheduler's allocation.
|
|
|
- multiNodePlacementEnabled = this.conf.getMultiNodePlacementEnabled();
|
|
|
- if(rmContext.getMultiNodeSortingManager() != null) {
|
|
|
- rmContext.getMultiNodeSortingManager().registerMultiNodePolicyNames(
|
|
|
- multiNodePlacementEnabled,
|
|
|
- this.conf.getMultiNodePlacementPolicies());
|
|
|
- }
|
|
|
-
|
|
|
- LOG.info("Initialized CapacityScheduler with " + "calculator="
|
|
|
- + getResourceCalculator().getClass() + ", " + "minimumAllocation="
|
|
|
- + getMinimumResourceCapability() + ", " + "maximumAllocation="
|
|
|
- + getMaximumResourceCapability() + ", " + "asynchronousScheduling="
|
|
|
- + scheduleAsynchronously + ", " + "asyncScheduleInterval="
|
|
|
- + asyncScheduleInterval + "ms" + ",multiNodePlacementEnabled="
|
|
|
- + multiNodePlacementEnabled + ", " + "assignMultipleEnabled="
|
|
|
- + assignMultipleEnabled + ", " + "maxAssignPerHeartbeat="
|
|
|
- + maxAssignPerHeartbeat + ", " + "offswitchPerHeartbeatLimit="
|
|
|
- + offswitchPerHeartbeatLimit);
|
|
|
+ initMultiNodePlacement();
|
|
|
+ printSchedulerInitialized();
|
|
|
} finally {
|
|
|
writeLock.unlock();
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private CSConfigurationProvider getCsConfProvider(Configuration configuration)
|
|
|
+ throws IOException {
|
|
|
+ String confProviderStr = configuration.get(
|
|
|
+ YarnConfiguration.SCHEDULER_CONFIGURATION_STORE_CLASS,
|
|
|
+ YarnConfiguration.DEFAULT_CONFIGURATION_STORE);
|
|
|
+ switch (confProviderStr) {
|
|
|
+ case YarnConfiguration.FILE_CONFIGURATION_STORE:
|
|
|
+ return new FileBasedCSConfigurationProvider(rmContext);
|
|
|
+ case YarnConfiguration.MEMORY_CONFIGURATION_STORE:
|
|
|
+ case YarnConfiguration.LEVELDB_CONFIGURATION_STORE:
|
|
|
+ case YarnConfiguration.ZK_CONFIGURATION_STORE:
|
|
|
+ case YarnConfiguration.FS_CONFIGURATION_STORE:
|
|
|
+ return new MutableCSConfigurationProvider(rmContext);
|
|
|
+ default:
|
|
|
+ throw new IOException("Invalid configuration store class: " + confProviderStr);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private ResourceCalculator initResourceCalculator() {
|
|
|
+ ResourceCalculator resourceCalculator = this.conf.getResourceCalculator();
|
|
|
+ if (resourceCalculator instanceof DefaultResourceCalculator
|
|
|
+ && ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
|
|
|
+ throw new YarnRuntimeException("RM uses DefaultResourceCalculator which"
|
|
|
+ + " used only memory as resource-type but invalid resource-types"
|
|
|
+ + " specified " + ResourceUtils.getResourceTypes() + ". Use"
|
|
|
+ + " DominantResourceCalculator instead to make effective use of"
|
|
|
+ + " these resource-types");
|
|
|
+ }
|
|
|
+ return resourceCalculator;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void initAsyncSchedulingProperties() {
|
|
|
+ scheduleAsynchronously = this.conf.getScheduleAynschronously();
|
|
|
+ asyncScheduleInterval = this.conf.getLong(ASYNC_SCHEDULER_INTERVAL,
|
|
|
+ DEFAULT_ASYNC_SCHEDULER_INTERVAL);
|
|
|
+
|
|
|
+ // number of threads for async scheduling
|
|
|
+ int maxAsyncSchedulingThreads = this.conf.getInt(
|
|
|
+ CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, 1);
|
|
|
+ maxAsyncSchedulingThreads = Math.max(maxAsyncSchedulingThreads, 1);
|
|
|
+
|
|
|
+ if (scheduleAsynchronously) {
|
|
|
+ asyncSchedulerThreads = new ArrayList<>();
|
|
|
+ for (int i = 0; i < maxAsyncSchedulingThreads; i++) {
|
|
|
+ asyncSchedulerThreads.add(new AsyncScheduleThread(this));
|
|
|
+ }
|
|
|
+ resourceCommitterService = new ResourceCommitterService(this);
|
|
|
+ asyncMaxPendingBacklogs = this.conf.getInt(
|
|
|
+ CapacitySchedulerConfiguration.
|
|
|
+ SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS,
|
|
|
+ CapacitySchedulerConfiguration.
|
|
|
+ DEFAULT_SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_PENDING_BACKLOGS);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void initMultiNodePlacement() {
|
|
|
+ // Register CS specific multi-node policies to common MultiNodeManager
|
|
|
+ // which will add to a MultiNodeSorter which gives a pre-sorted list of
|
|
|
+ // nodes to scheduler's allocation.
|
|
|
+ multiNodePlacementEnabled = this.conf.getMultiNodePlacementEnabled();
|
|
|
+ if (rmContext.getMultiNodeSortingManager() != null) {
|
|
|
+ rmContext.getMultiNodeSortingManager().registerMultiNodePolicyNames(
|
|
|
+ multiNodePlacementEnabled,
|
|
|
+ this.conf.getMultiNodePlacementPolicies());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void printSchedulerInitialized() {
|
|
|
+ LOG.info("Initialized CapacityScheduler with calculator={}, minimumAllocation={}, " +
|
|
|
+ "maximumAllocation={}, asynchronousScheduling={}, asyncScheduleInterval={} ms, " +
|
|
|
+ "multiNodePlacementEnabled={}, assignMultipleEnabled={}, maxAssignPerHeartbeat={}, " +
|
|
|
+ "offswitchPerHeartbeatLimit={}",
|
|
|
+ getResourceCalculator().getClass(),
|
|
|
+ getMinimumResourceCapability(),
|
|
|
+ getMaximumResourceCapability(),
|
|
|
+ scheduleAsynchronously,
|
|
|
+ asyncScheduleInterval,
|
|
|
+ multiNodePlacementEnabled,
|
|
|
+ assignMultipleEnabled,
|
|
|
+ maxAssignPerHeartbeat,
|
|
|
+ offswitchPerHeartbeatLimit);
|
|
|
+ }
|
|
|
+
|
|
|
private void startSchedulerThreads() {
|
|
|
writeLock.lock();
|
|
|
try {
|