|
@@ -117,7 +117,7 @@ public class FairScheduler extends
|
|
|
|
|
|
private Resource incrAllocation;
|
|
private Resource incrAllocation;
|
|
private QueueManager queueMgr;
|
|
private QueueManager queueMgr;
|
|
- private Clock clock;
|
|
|
|
|
|
+ private volatile Clock clock;
|
|
private boolean usePortForNodeName;
|
|
private boolean usePortForNodeName;
|
|
|
|
|
|
private static final Log LOG = LogFactory.getLog(FairScheduler.class);
|
|
private static final Log LOG = LogFactory.getLog(FairScheduler.class);
|
|
@@ -555,11 +555,12 @@ public class FairScheduler extends
|
|
return continuousSchedulingSleepMs;
|
|
return continuousSchedulingSleepMs;
|
|
}
|
|
}
|
|
|
|
|
|
- public synchronized Clock getClock() {
|
|
|
|
|
|
+ public Clock getClock() {
|
|
return clock;
|
|
return clock;
|
|
}
|
|
}
|
|
|
|
|
|
- protected synchronized void setClock(Clock clock) {
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ void setClock(Clock clock) {
|
|
this.clock = clock;
|
|
this.clock = clock;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1204,64 +1205,65 @@ public class FairScheduler extends
|
|
this.rmContext = rmContext;
|
|
this.rmContext = rmContext;
|
|
}
|
|
}
|
|
|
|
|
|
- private synchronized void initScheduler(Configuration conf)
|
|
|
|
- throws IOException {
|
|
|
|
- this.conf = new FairSchedulerConfiguration(conf);
|
|
|
|
- validateConf(this.conf);
|
|
|
|
- minimumAllocation = this.conf.getMinimumAllocation();
|
|
|
|
- maximumAllocation = this.conf.getMaximumAllocation();
|
|
|
|
- incrAllocation = this.conf.getIncrementAllocation();
|
|
|
|
- continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
|
|
|
|
- continuousSchedulingSleepMs =
|
|
|
|
- this.conf.getContinuousSchedulingSleepMs();
|
|
|
|
- nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
|
|
|
|
- rackLocalityThreshold = this.conf.getLocalityThresholdRack();
|
|
|
|
- nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
|
|
|
|
- rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
|
|
|
|
- preemptionEnabled = this.conf.getPreemptionEnabled();
|
|
|
|
- preemptionUtilizationThreshold =
|
|
|
|
- this.conf.getPreemptionUtilizationThreshold();
|
|
|
|
- assignMultiple = this.conf.getAssignMultiple();
|
|
|
|
- maxAssign = this.conf.getMaxAssign();
|
|
|
|
- sizeBasedWeight = this.conf.getSizeBasedWeight();
|
|
|
|
- preemptionInterval = this.conf.getPreemptionInterval();
|
|
|
|
- waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
|
|
|
|
- usePortForNodeName = this.conf.getUsePortForNodeName();
|
|
|
|
-
|
|
|
|
- updateInterval = this.conf.getUpdateInterval();
|
|
|
|
- if (updateInterval < 0) {
|
|
|
|
- updateInterval = FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS;
|
|
|
|
- LOG.warn(FairSchedulerConfiguration.UPDATE_INTERVAL_MS
|
|
|
|
- + " is invalid, so using default value " +
|
|
|
|
- + FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS
|
|
|
|
- + " ms instead");
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
|
|
|
|
- fsOpDurations = FSOpDurations.getInstance(true);
|
|
|
|
-
|
|
|
|
- // This stores per-application scheduling information
|
|
|
|
- this.applications = new ConcurrentHashMap<
|
|
|
|
- ApplicationId, SchedulerApplication<FSAppAttempt>>();
|
|
|
|
- this.eventLog = new FairSchedulerEventLog();
|
|
|
|
- eventLog.init(this.conf);
|
|
|
|
-
|
|
|
|
- allocConf = new AllocationConfiguration(conf);
|
|
|
|
- try {
|
|
|
|
- queueMgr.initialize(conf);
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- throw new IOException("Failed to start FairScheduler", e);
|
|
|
|
- }
|
|
|
|
|
|
+ private void initScheduler(Configuration conf) throws IOException {
|
|
|
|
+ synchronized (this) {
|
|
|
|
+ this.conf = new FairSchedulerConfiguration(conf);
|
|
|
|
+ validateConf(this.conf);
|
|
|
|
+ minimumAllocation = this.conf.getMinimumAllocation();
|
|
|
|
+ maximumAllocation = this.conf.getMaximumAllocation();
|
|
|
|
+ incrAllocation = this.conf.getIncrementAllocation();
|
|
|
|
+ continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
|
|
|
|
+ continuousSchedulingSleepMs =
|
|
|
|
+ this.conf.getContinuousSchedulingSleepMs();
|
|
|
|
+ nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
|
|
|
|
+ rackLocalityThreshold = this.conf.getLocalityThresholdRack();
|
|
|
|
+ nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
|
|
|
|
+ rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
|
|
|
|
+ preemptionEnabled = this.conf.getPreemptionEnabled();
|
|
|
|
+ preemptionUtilizationThreshold =
|
|
|
|
+ this.conf.getPreemptionUtilizationThreshold();
|
|
|
|
+ assignMultiple = this.conf.getAssignMultiple();
|
|
|
|
+ maxAssign = this.conf.getMaxAssign();
|
|
|
|
+ sizeBasedWeight = this.conf.getSizeBasedWeight();
|
|
|
|
+ preemptionInterval = this.conf.getPreemptionInterval();
|
|
|
|
+ waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
|
|
|
|
+ usePortForNodeName = this.conf.getUsePortForNodeName();
|
|
|
|
+
|
|
|
|
+ updateInterval = this.conf.getUpdateInterval();
|
|
|
|
+ if (updateInterval < 0) {
|
|
|
|
+ updateInterval = FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS;
|
|
|
|
+ LOG.warn(FairSchedulerConfiguration.UPDATE_INTERVAL_MS
|
|
|
|
+ + " is invalid, so using default value " +
|
|
|
|
+ +FairSchedulerConfiguration.DEFAULT_UPDATE_INTERVAL_MS
|
|
|
|
+ + " ms instead");
|
|
|
|
+ }
|
|
|
|
|
|
- updateThread = new UpdateThread();
|
|
|
|
- updateThread.setName("FairSchedulerUpdateThread");
|
|
|
|
- updateThread.setDaemon(true);
|
|
|
|
|
|
+ rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
|
|
|
|
+ fsOpDurations = FSOpDurations.getInstance(true);
|
|
|
|
|
|
- if (continuousSchedulingEnabled) {
|
|
|
|
- // start continuous scheduling thread
|
|
|
|
- schedulingThread = new ContinuousSchedulingThread();
|
|
|
|
- schedulingThread.setName("FairSchedulerContinuousScheduling");
|
|
|
|
- schedulingThread.setDaemon(true);
|
|
|
|
|
|
+ // This stores per-application scheduling information
|
|
|
|
+ this.applications = new ConcurrentHashMap<
|
|
|
|
+ ApplicationId, SchedulerApplication<FSAppAttempt>>();
|
|
|
|
+ this.eventLog = new FairSchedulerEventLog();
|
|
|
|
+ eventLog.init(this.conf);
|
|
|
|
+
|
|
|
|
+ allocConf = new AllocationConfiguration(conf);
|
|
|
|
+ try {
|
|
|
|
+ queueMgr.initialize(conf);
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ throw new IOException("Failed to start FairScheduler", e);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ updateThread = new UpdateThread();
|
|
|
|
+ updateThread.setName("FairSchedulerUpdateThread");
|
|
|
|
+ updateThread.setDaemon(true);
|
|
|
|
+
|
|
|
|
+ if (continuousSchedulingEnabled) {
|
|
|
|
+ // start continuous scheduling thread
|
|
|
|
+ schedulingThread = new ContinuousSchedulingThread();
|
|
|
|
+ schedulingThread.setName("FairSchedulerContinuousScheduling");
|
|
|
|
+ schedulingThread.setDaemon(true);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
allocsLoader.init(conf);
|
|
allocsLoader.init(conf);
|
|
@@ -1321,7 +1323,7 @@ public class FairScheduler extends
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
- public synchronized void reinitialize(Configuration conf, RMContext rmContext)
|
|
|
|
|
|
+ public void reinitialize(Configuration conf, RMContext rmContext)
|
|
throws IOException {
|
|
throws IOException {
|
|
try {
|
|
try {
|
|
allocsLoader.reloadAllocations();
|
|
allocsLoader.reloadAllocations();
|