|
@@ -139,11 +139,11 @@ public class FairScheduler implements ResourceScheduler {
|
|
|
|
|
|
|
|
|
public FairSchedulerConfiguration getConf() {
|
|
|
- return this.conf;
|
|
|
+ return conf;
|
|
|
}
|
|
|
|
|
|
public QueueManager getQueueManager() {
|
|
|
- return this.queueMgr;
|
|
|
+ return queueMgr;
|
|
|
}
|
|
|
|
|
|
public List<FSQueueSchedulable> getQueueSchedulables() {
|
|
@@ -183,36 +183,34 @@ public class FairScheduler implements ResourceScheduler {
|
|
|
* fair shares, deficits, minimum slot allocations, and amount of used and
|
|
|
* required resources per job.
|
|
|
*/
|
|
|
- protected void update() {
|
|
|
- synchronized (this) {
|
|
|
- queueMgr.reloadAllocsIfNecessary(); // Relaod alloc file
|
|
|
- updateRunnability(); // Set job runnability based on user/queue limits
|
|
|
- updatePreemptionVariables(); // Determine if any queues merit preemption
|
|
|
-
|
|
|
- // Update demands of apps and queues
|
|
|
- for (FSQueue queue: queueMgr.getQueues()) {
|
|
|
- queue.getQueueSchedulable().updateDemand();
|
|
|
- }
|
|
|
+ protected synchronized void update() {
|
|
|
+ queueMgr.reloadAllocsIfNecessary(); // Reload alloc file
|
|
|
+ updateRunnability(); // Set job runnability based on user/queue limits
|
|
|
+ updatePreemptionVariables(); // Determine if any queues merit preemption
|
|
|
|
|
|
- // Compute fair shares based on updated demands
|
|
|
- List<FSQueueSchedulable> queueScheds = this.getQueueSchedulables();
|
|
|
- SchedulingAlgorithms.computeFairShares(
|
|
|
- queueScheds, clusterCapacity);
|
|
|
+ // Update demands of apps and queues
|
|
|
+ for (FSQueue queue: queueMgr.getQueues()) {
|
|
|
+ queue.getQueueSchedulable().updateDemand();
|
|
|
+ }
|
|
|
|
|
|
- // Update queue metrics for this queue
|
|
|
- for (FSQueueSchedulable sched : queueScheds) {
|
|
|
- sched.getMetrics().setAvailableResourcesToQueue(sched.getFairShare());
|
|
|
- }
|
|
|
+ // Compute fair shares based on updated demands
|
|
|
+ List<FSQueueSchedulable> queueScheds = getQueueSchedulables();
|
|
|
+ SchedulingAlgorithms.computeFairShares(
|
|
|
+ queueScheds, clusterCapacity);
|
|
|
|
|
|
- // Use the computed shares to assign shares within each queue
|
|
|
- for (FSQueue queue: queueMgr.getQueues()) {
|
|
|
- queue.getQueueSchedulable().redistributeShare();
|
|
|
- }
|
|
|
+ // Update queue metrics for this queue
|
|
|
+ for (FSQueueSchedulable sched : queueScheds) {
|
|
|
+ sched.getMetrics().setAvailableResourcesToQueue(sched.getFairShare());
|
|
|
+ }
|
|
|
|
|
|
- // Update recorded capacity of root queue (child queues are updated
|
|
|
- // when fair share is calculated).
|
|
|
- rootMetrics.setAvailableResourcesToQueue(clusterCapacity);
|
|
|
+ // Use the computed shares to assign shares within each queue
|
|
|
+ for (FSQueue queue: queueMgr.getQueues()) {
|
|
|
+ queue.getQueueSchedulable().redistributeShare();
|
|
|
}
|
|
|
+
|
|
|
+ // Update recorded capacity of root queue (child queues are updated
|
|
|
+ // when fair share is calculated).
|
|
|
+ rootMetrics.setAvailableResourcesToQueue(clusterCapacity);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -257,17 +255,16 @@ public class FairScheduler implements ResourceScheduler {
|
|
|
* have been below half their fair share for the fairSharePreemptionTimeout.
|
|
|
* If such queues exist, compute how many tasks of each type need to be
|
|
|
* preempted and then select the right ones using preemptTasks.
|
|
|
- *
|
|
|
- * This method computes and logs the number of tasks we want to preempt even
|
|
|
- * if preemption is disabled, for debugging purposes.
|
|
|
*/
|
|
|
protected void preemptTasksIfNecessary() {
|
|
|
- if (!preemptionEnabled)
|
|
|
+ if (!preemptionEnabled) {
|
|
|
return;
|
|
|
+ }
|
|
|
|
|
|
long curTime = clock.getTime();
|
|
|
- if (curTime - lastPreemptCheckTime < preemptionInterval)
|
|
|
+ if (curTime - lastPreemptCheckTime < preemptionInterval) {
|
|
|
return;
|
|
|
+ }
|
|
|
lastPreemptCheckTime = curTime;
|
|
|
|
|
|
Resource resToPreempt = Resources.none();
|
|
@@ -288,8 +285,9 @@ public class FairScheduler implements ResourceScheduler {
|
|
|
* lowest priority to preempt.
|
|
|
*/
|
|
|
protected void preemptResources(List<FSQueueSchedulable> scheds, Resource toPreempt) {
|
|
|
- if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none()))
|
|
|
+ if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) {
|
|
|
return;
|
|
|
+ }
|
|
|
|
|
|
Map<RMContainer, FSSchedulerApp> apps =
|
|
|
new HashMap<RMContainer, FSSchedulerApp>();
|
|
@@ -330,7 +328,7 @@ public class FairScheduler implements ResourceScheduler {
|
|
|
|
|
|
// TODO: Not sure if this ever actually adds this to the list of cleanup
|
|
|
// containers on the RMNode (see SchedulerNode.releaseContainer()).
|
|
|
- this.completedContainer(container, status, RMContainerEventType.KILL);
|
|
|
+ completedContainer(container, status, RMContainerEventType.KILL);
|
|
|
|
|
|
toPreempt = Resources.subtract(toPreempt,
|
|
|
container.getContainer().getResource());
|
|
@@ -413,7 +411,7 @@ public class FairScheduler implements ResourceScheduler {
|
|
|
}
|
|
|
|
|
|
public RMContainerTokenSecretManager getContainerTokenSecretManager() {
|
|
|
- return this.rmContext.getContainerTokenSecretManager();
|
|
|
+ return rmContext.getContainerTokenSecretManager();
|
|
|
}
|
|
|
|
|
|
public double getAppWeight(AppSchedulable app) {
|
|
@@ -437,28 +435,28 @@ public class FairScheduler implements ResourceScheduler {
|
|
|
|
|
|
@Override
|
|
|
public Resource getMinimumResourceCapability() {
|
|
|
- return this.minimumAllocation;
|
|
|
+ return minimumAllocation;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public Resource getMaximumResourceCapability() {
|
|
|
- return this.maximumAllocation;
|
|
|
+ return maximumAllocation;
|
|
|
}
|
|
|
|
|
|
public double getNodeLocalityThreshold() {
|
|
|
- return this.nodeLocalityThreshold;
|
|
|
+ return nodeLocalityThreshold;
|
|
|
}
|
|
|
|
|
|
public double getRackLocalityThreshold() {
|
|
|
- return this.rackLocalityThreshold;
|
|
|
+ return rackLocalityThreshold;
|
|
|
}
|
|
|
|
|
|
public Resource getClusterCapacity() {
|
|
|
- return this.clusterCapacity;
|
|
|
+ return clusterCapacity;
|
|
|
}
|
|
|
|
|
|
public Clock getClock() {
|
|
|
- return this.clock;
|
|
|
+ return clock;
|
|
|
}
|
|
|
|
|
|
protected void setClock(Clock clock) {
|
|
@@ -478,11 +476,11 @@ public class FairScheduler implements ResourceScheduler {
|
|
|
addApplication(ApplicationAttemptId applicationAttemptId,
|
|
|
String queueName, String user) {
|
|
|
|
|
|
- FSQueue queue = this.queueMgr.getQueue(queueName);
|
|
|
+ FSQueue queue = queueMgr.getQueue(queueName);
|
|
|
|
|
|
FSSchedulerApp schedulerApp =
|
|
|
new FSSchedulerApp(applicationAttemptId, user,
|
|
|
- queue.getQueueSchedulable(), new ActiveUsersManager(this.getRootQueueMetrics()),
|
|
|
+ queue.getQueueSchedulable(), new ActiveUsersManager(getRootQueueMetrics()),
|
|
|
rmContext, null);
|
|
|
|
|
|
// Inforce ACLs
|
|
@@ -553,8 +551,8 @@ public class FairScheduler implements ResourceScheduler {
|
|
|
application.stop(rmAppAttemptFinalState);
|
|
|
|
|
|
// Inform the queue
|
|
|
- FSQueue queue = this.queueMgr.getQueue(application.getQueue().getQueueName());
|
|
|
- queue.removeJob(application);
|
|
|
+ FSQueue queue = queueMgr.getQueue(application.getQueue().getQueueName());
|
|
|
+ queue.removeApp(application);
|
|
|
|
|
|
// Remove from our data-structure
|
|
|
applications.remove(applicationAttemptId);
|
|
@@ -600,7 +598,7 @@ public class FairScheduler implements ResourceScheduler {
|
|
|
}
|
|
|
|
|
|
private synchronized void addNode(RMNode node) {
|
|
|
- this.nodes.put(node.getNodeID(), new FSSchedulerNode(node));
|
|
|
+ nodes.put(node.getNodeID(), new FSSchedulerNode(node));
|
|
|
Resources.addTo(clusterCapacity, node.getTotalCapability());
|
|
|
|
|
|
LOG.info("Added node " + node.getNodeAddress() +
|
|
@@ -608,7 +606,7 @@ public class FairScheduler implements ResourceScheduler {
|
|
|
}
|
|
|
|
|
|
private synchronized void removeNode(RMNode rmNode) {
|
|
|
- FSSchedulerNode node = this.nodes.get(rmNode.getNodeID());
|
|
|
+ FSSchedulerNode node = nodes.get(rmNode.getNodeID());
|
|
|
Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability());
|
|
|
|
|
|
// Remove running containers
|
|
@@ -631,7 +629,7 @@ public class FairScheduler implements ResourceScheduler {
|
|
|
RMContainerEventType.KILL);
|
|
|
}
|
|
|
|
|
|
- this.nodes.remove(rmNode.getNodeID());
|
|
|
+ nodes.remove(rmNode.getNodeID());
|
|
|
LOG.info("Removed node " + rmNode.getNodeAddress() +
|
|
|
" cluster capacity: " + clusterCapacity);
|
|
|
}
|
|
@@ -669,10 +667,8 @@ public class FairScheduler implements ResourceScheduler {
|
|
|
}
|
|
|
|
|
|
synchronized (application) {
|
|
|
-
|
|
|
if (!ask.isEmpty()) {
|
|
|
-
|
|
|
- if(LOG.isDebugEnabled()) {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("allocate: pre-update" +
|
|
|
" applicationAttemptId=" + appAttemptId +
|
|
|
" application=" + application.getApplicationId());
|
|
@@ -686,7 +682,7 @@ public class FairScheduler implements ResourceScheduler {
|
|
|
application.showRequests();
|
|
|
}
|
|
|
|
|
|
- if(LOG.isDebugEnabled()) {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("allocate:" +
|
|
|
" applicationAttemptId=" + appAttemptId +
|
|
|
" #ask=" + ask.size());
|
|
@@ -764,7 +760,7 @@ public class FairScheduler implements ResourceScheduler {
|
|
|
int assignedContainers = 0;
|
|
|
while (true) {
|
|
|
// At most one task is scheduled each iteration of this loop
|
|
|
- List<FSQueueSchedulable> scheds = this.getQueueSchedulables();
|
|
|
+ List<FSQueueSchedulable> scheds = getQueueSchedulables();
|
|
|
Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator());
|
|
|
boolean assignedContainer = false;
|
|
|
for (FSQueueSchedulable sched : scheds) {
|
|
@@ -796,11 +792,11 @@ public class FairScheduler implements ResourceScheduler {
|
|
|
@Override
|
|
|
public SchedulerAppReport getSchedulerAppInfo(
|
|
|
ApplicationAttemptId appAttemptId) {
|
|
|
- if (!this.applications.containsKey(appAttemptId)) {
|
|
|
+ if (!applications.containsKey(appAttemptId)) {
|
|
|
LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
|
|
|
return null;
|
|
|
}
|
|
|
- return new SchedulerAppReport(this.applications.get(appAttemptId));
|
|
|
+ return new SchedulerAppReport(applications.get(appAttemptId));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -812,37 +808,30 @@ public class FairScheduler implements ResourceScheduler {
|
|
|
public void handle(SchedulerEvent event) {
|
|
|
switch(event.getType()) {
|
|
|
case NODE_ADDED:
|
|
|
- {
|
|
|
if (!(event instanceof NodeAddedSchedulerEvent)) {
|
|
|
throw new RuntimeException("Unexpected event type: " + event);
|
|
|
}
|
|
|
NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
|
|
|
addNode(nodeAddedEvent.getAddedRMNode());
|
|
|
- }
|
|
|
- break;
|
|
|
+ break;
|
|
|
case NODE_REMOVED:
|
|
|
- {
|
|
|
if (!(event instanceof NodeRemovedSchedulerEvent)) {
|
|
|
throw new RuntimeException("Unexpected event type: " + event);
|
|
|
}
|
|
|
NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event;
|
|
|
removeNode(nodeRemovedEvent.getRemovedRMNode());
|
|
|
- }
|
|
|
- break;
|
|
|
+ break;
|
|
|
case NODE_UPDATE:
|
|
|
- {
|
|
|
if (!(event instanceof NodeUpdateSchedulerEvent)) {
|
|
|
throw new RuntimeException("Unexpected event type: " + event);
|
|
|
}
|
|
|
NodeUpdateSchedulerEvent nodeUpdatedEvent =
|
|
|
(NodeUpdateSchedulerEvent)event;
|
|
|
- this.nodeUpdate(nodeUpdatedEvent.getRMNode(),
|
|
|
+ nodeUpdate(nodeUpdatedEvent.getRMNode(),
|
|
|
nodeUpdatedEvent.getNewlyLaunchedContainers(),
|
|
|
nodeUpdatedEvent.getCompletedContainers());
|
|
|
- }
|
|
|
- break;
|
|
|
+ break;
|
|
|
case APP_ADDED:
|
|
|
- {
|
|
|
if (!(event instanceof AppAddedSchedulerEvent)) {
|
|
|
throw new RuntimeException("Unexpected event type: " + event);
|
|
|
}
|
|
@@ -857,20 +846,16 @@ public class FairScheduler implements ResourceScheduler {
|
|
|
|
|
|
addApplication(appAddedEvent.getApplicationAttemptId(), queue,
|
|
|
appAddedEvent.getUser());
|
|
|
- }
|
|
|
- break;
|
|
|
+ break;
|
|
|
case APP_REMOVED:
|
|
|
- {
|
|
|
if (!(event instanceof AppRemovedSchedulerEvent)) {
|
|
|
throw new RuntimeException("Unexpected event type: " + event);
|
|
|
}
|
|
|
AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
|
|
|
- this.removeApplication(appRemovedEvent.getApplicationAttemptID(),
|
|
|
+ removeApplication(appRemovedEvent.getApplicationAttemptID(),
|
|
|
appRemovedEvent.getFinalAttemptState());
|
|
|
- }
|
|
|
- break;
|
|
|
+ break;
|
|
|
case CONTAINER_EXPIRED:
|
|
|
- {
|
|
|
if (!(event instanceof ContainerExpiredSchedulerEvent)) {
|
|
|
throw new RuntimeException("Unexpected event type: " + event);
|
|
|
}
|
|
@@ -882,8 +867,7 @@ public class FairScheduler implements ResourceScheduler {
|
|
|
containerId,
|
|
|
SchedulerUtils.EXPIRED_CONTAINER),
|
|
|
RMContainerEventType.EXPIRE);
|
|
|
- }
|
|
|
- break;
|
|
|
+ break;
|
|
|
default:
|
|
|
LOG.error("Unknown event arrived at FairScheduler: " + event.toString());
|
|
|
}
|
|
@@ -897,9 +881,9 @@ public class FairScheduler implements ResourceScheduler {
|
|
|
@Override
|
|
|
public synchronized void
|
|
|
reinitialize(Configuration conf, RMContext rmContext) throws IOException {
|
|
|
- if (!this.initialized) {
|
|
|
+ if (!initialized) {
|
|
|
this.conf = new FairSchedulerConfiguration(conf);
|
|
|
- this.rootMetrics = QueueMetrics.forQueue("root", null, true, conf);
|
|
|
+ rootMetrics = QueueMetrics.forQueue("root", null, true, conf);
|
|
|
this.rmContext = rmContext;
|
|
|
this.clock = new SystemClock();
|
|
|
this.eventLog = new FairSchedulerEventLog();
|
|
@@ -973,7 +957,7 @@ public class FairScheduler implements ResourceScheduler {
|
|
|
|
|
|
@Override
|
|
|
public int getNumClusterNodes() {
|
|
|
- return this.nodes.size();
|
|
|
+ return nodes.size();
|
|
|
}
|
|
|
|
|
|
}
|