@@ -149,6 +149,7 @@ public class FairScheduler extends
 
   // Aggregate metrics
   FSQueueMetrics rootMetrics;
+  FSOpDurations fsOpDurations;
 
   // Time when we last updated preemption vars
   protected long lastPreemptionUpdateTime;
@@ -256,8 +257,11 @@ public class FairScheduler extends
       while (!Thread.currentThread().isInterrupted()) {
         try {
           Thread.sleep(updateInterval);
+          long start = getClock().getTime();
           update();
           preemptTasksIfNecessary();
+          long duration = getClock().getTime() - start;
+          fsOpDurations.addUpdateThreadRunDuration(duration);
         } catch (InterruptedException ie) {
           LOG.warn("Update thread interrupted. Exiting.");
           return;
@@ -294,6 +298,7 @@ public class FairScheduler extends
    * required resources per job.
    */
   protected synchronized void update() {
+    long start = getClock().getTime();
     updatePreemptionVariables(); // Determine if any queues merit preemption
 
     FSQueue rootQueue = queueMgr.getRootQueue();
@@ -317,6 +322,9 @@ public class FairScheduler extends
           " Demand: " + rootQueue.getDemand());
       }
     }
+
+    long duration = getClock().getTime() - start;
+    fsOpDurations.addUpdateCallDuration(duration);
   }
 
   /**
@@ -325,7 +333,7 @@ public class FairScheduler extends
    * for each type of task.
    */
   private void updatePreemptionVariables() {
-    long now = clock.getTime();
+    long now = getClock().getTime();
     lastPreemptionUpdateTime = now;
     for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
       if (!isStarvedForMinShare(sched)) {
@@ -352,7 +360,8 @@ public class FairScheduler extends
    * defined as being below half its fair share.
    */
   boolean isStarvedForFairShare(FSLeafQueue sched) {
-    Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR, clusterResource,
+    Resource desiredFairShare = Resources.min(RESOURCE_CALCULATOR,
+        clusterResource,
         Resources.multiply(sched.getFairShare(), .5), sched.getDemand());
     return Resources.lessThan(RESOURCE_CALCULATOR, clusterResource,
         sched.getResourceUsage(), desiredFairShare);
@@ -370,7 +379,7 @@ public class FairScheduler extends
       return;
     }
 
-    long curTime = clock.getTime();
+    long curTime = getClock().getTime();
     if (curTime - lastPreemptCheckTime < preemptionInterval) {
       return;
     }
@@ -398,6 +407,7 @@ public class FairScheduler extends
    * We make sure that no queue is placed below its fair share in the process.
    */
   protected void preemptResources(Resource toPreempt) {
+    long start = getClock().getTime();
     if (Resources.equals(toPreempt, Resources.none())) {
       return;
     }
@@ -448,6 +458,9 @@ public class FairScheduler extends
         }
       }
     }
+
+    long duration = getClock().getTime() - start;
+    fsOpDurations.addPreemptCallDuration(duration);
   }
 
   protected void warnOrKillContainer(RMContainer container) {
@@ -463,7 +476,7 @@ public class FairScheduler extends
     if (time != null) {
       // if we asked for preemption more than maxWaitTimeBeforeKill ms ago,
       // proceed with kill
-      if (time + waitTimeBeforeKill < clock.getTime()) {
+      if (time + waitTimeBeforeKill < getClock().getTime()) {
         ContainerStatus status =
           SchedulerUtils.createPreemptedContainerStatus(
             container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
@@ -474,11 +487,11 @@ public class FairScheduler extends
         completedContainer(container, status, RMContainerEventType.KILL);
         LOG.info("Killing container" + container +
             " (after waiting for premption for " +
-            (clock.getTime() - time) + "ms)");
+            (getClock().getTime() - time) + "ms)");
       }
     } else {
       // track the request in the FSSchedulerApp itself
-      app.addPreemption(container, clock.getTime());
+      app.addPreemption(container, getClock().getTime());
     }
   }
 
@@ -659,7 +672,7 @@ public class FairScheduler extends
             rmContext);
     if (transferStateFromPreviousAttempt) {
       attempt.transferStateFromPreviousAttempt(application
-          .getCurrentAppAttempt());
+          .getCurrentAppAttempt());
     }
     application.setCurrentAppAttempt(attempt);
 
@@ -960,6 +973,7 @@ public class FairScheduler extends
    * Process a heartbeat update from a node.
    */
   private synchronized void nodeUpdate(RMNode nm) {
+    long start = getClock().getTime();
     if (LOG.isDebugEnabled()) {
       LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource);
     }
@@ -996,9 +1010,13 @@ public class FairScheduler extends
     } else {
       attemptScheduling(node);
     }
+
+    long duration = getClock().getTime() - start;
+    fsOpDurations.addNodeUpdateDuration(duration);
   }
 
   void continuousSchedulingAttempt() throws InterruptedException {
+    long start = getClock().getTime();
     List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
     // Sort the nodes by space available on them, so that we offer
     // containers on emptier nodes first, facilitating an even spread. This
@@ -1021,6 +1039,9 @@ public class FairScheduler extends
             ": " + ex.toString(), ex);
       }
     }
+
+    long duration = getClock().getTime() - start;
+    fsOpDurations.addContinuousSchedulingRunDuration(duration);
  }
 
   /** Sort nodes by available resource */
@@ -1244,6 +1265,8 @@ public class FairScheduler extends
     }
 
     rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
+    fsOpDurations = FSOpDurations.getInstance(true);
+
     // This stores per-application scheduling information
     this.applications =
         new ConcurrentHashMap<ApplicationId,SchedulerApplication<FSSchedulerApp>>();
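
The FSOpDurations metrics source that the patch registers and calls is not included in these hunks. Below is a minimal, hypothetical sketch of such a class, assuming it is built on Hadoop's metrics2 MutableRate with annotation-based registration. Only the class name, the getInstance(boolean) factory, and the five add*Duration methods come from the diff above; the package, metric descriptions, record name, registration details, and the meaning of the boolean flag are assumptions, not the actual implementation.

// Hypothetical sketch of an FSOpDurations metrics source; see caveats above.
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;

import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableRate;

@Metrics(context = "fairscheduler-op-durations")
public class FSOpDurations {

  // One rate (count + average) per instrumented scheduler operation.
  @Metric("Duration of an update thread run (ms)")
  MutableRate updateThreadRun;

  @Metric("Duration of an update() call (ms)")
  MutableRate updateCall;

  @Metric("Duration of a preemptResources() call (ms)")
  MutableRate preemptCall;

  @Metric("Duration of a nodeUpdate() call (ms)")
  MutableRate nodeUpdateCall;

  @Metric("Duration of a continuous scheduling run (ms)")
  MutableRate continuousSchedulingRun;

  private static final FSOpDurations INSTANCE = new FSOpDurations();

  static {
    // Register with the default metrics system; the framework instantiates
    // the @Metric-annotated MutableRate fields during registration.
    MetricsSystem ms = DefaultMetricsSystem.instance();
    if (ms != null) {
      ms.register("FSOpDurations", "Durations of FairScheduler operations",
          INSTANCE);
    }
  }

  private FSOpDurations() {
  }

  // The flag is assumed to toggle extended statistics; ignored in this sketch.
  public static FSOpDurations getInstance(boolean isExtended) {
    return INSTANCE;
  }

  public void addUpdateThreadRunDuration(long value) {
    updateThreadRun.add(value);
  }

  public void addUpdateCallDuration(long value) {
    updateCall.add(value);
  }

  public void addPreemptCallDuration(long value) {
    preemptCall.add(value);
  }

  public void addNodeUpdateDuration(long value) {
    nodeUpdateCall.add(value);
  }

  public void addContinuousSchedulingRunDuration(long value) {
    continuousSchedulingRun.add(value);
  }
}

Under these assumptions, each scheduler call site simply measures elapsed wall-clock time via getClock().getTime() and feeds the delta into the matching MutableRate, which then surfaces per-operation counts and average durations through JMX and any configured metrics sinks.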