@@ -20,13 +20,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -186,14 +184,11 @@ public class FairScheduler extends
   private float reservableNodesRatio; // percentage of available nodes
                                       // an app can be reserved on
 
-  // Count of number of nodes per rack
-  private Map<String, Integer> nodesPerRack = new ConcurrentHashMap<>();
-
   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
   protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
   protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
   protected int continuousSchedulingSleepMs; // Sleep time for each pass in continuous scheduling
-  private Comparator<NodeId> nodeAvailableResourceComparator =
+  private Comparator<FSSchedulerNode> nodeAvailableResourceComparator =
       new NodeAvailableResourceComparator(); // Node available resource comparator
   protected double nodeLocalityThreshold; // Cluster threshold for node locality
   protected double rackLocalityThreshold; // Cluster threshold for rack locality
@@ -225,8 +220,8 @@ public class FairScheduler extends
 
   public boolean isAtLeastReservationThreshold(
       ResourceCalculator resourceCalculator, Resource resource) {
-    return Resources.greaterThanOrEqual(
-        resourceCalculator, clusterResource, resource, reservationThreshold);
+    return Resources.greaterThanOrEqual(resourceCalculator,
+        getClusterResource(), resource, reservationThreshold);
   }
 
   private void validateConf(Configuration conf) {
@@ -272,11 +267,7 @@ public class FairScheduler extends
   }
 
   public int getNumNodesInRack(String rackName) {
-    String rName = rackName == null ? "NULL" : rackName;
-    if (nodesPerRack.containsKey(rName)) {
-      return nodesPerRack.get(rName);
-    }
-    return 0;
+    return nodeTracker.nodeCount(rackName);
   }
 
   public QueueManager getQueueManager() {
@@ -352,6 +343,7 @@ public class FairScheduler extends
     // Recursively update demands for all queues
     rootQueue.updateDemand();
 
+    Resource clusterResource = getClusterResource();
     rootQueue.setFairShare(clusterResource);
     // Recursively compute fair shares for all queues
     // and update metrics
@@ -526,6 +518,7 @@ public class FairScheduler extends
     Resource resDueToMinShare = Resources.none();
     Resource resDueToFairShare = Resources.none();
     ResourceCalculator calc = sched.getPolicy().getResourceCalculator();
+    Resource clusterResource = getClusterResource();
     if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
       Resource target = Resources.componentwiseMin(
           sched.getMinShare(), sched.getDemand());
@@ -577,7 +570,7 @@ public class FairScheduler extends
   }
 
   private FSSchedulerNode getFSSchedulerNode(NodeId nodeId) {
-    return nodes.get(nodeId);
+    return nodeTracker.getNode(nodeId);
   }
 
   public double getNodeLocalityThreshold() {
@@ -882,18 +875,11 @@ public class FairScheduler extends
   private synchronized void addNode(List<NMContainerStatus> containerReports,
       RMNode node) {
     FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName);
-    nodes.put(node.getNodeID(), schedulerNode);
-    String rackName = node.getRackName() == null ? "NULL" : node.getRackName();
-    if (nodesPerRack.containsKey(rackName)) {
-      nodesPerRack.put(rackName, nodesPerRack.get(rackName) + 1);
-    } else {
-      nodesPerRack.put(rackName, 1);
-    }
-    Resources.addTo(clusterResource, schedulerNode.getTotalResource());
-    updateMaximumAllocation(schedulerNode, true);
+    nodeTracker.addNode(schedulerNode);
 
     triggerUpdate();
 
+    Resource clusterResource = getClusterResource();
     queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
     queueMgr.getRootQueue().recomputeSteadyShares();
     LOG.info("Added node " + node.getNodeAddress() +
@@ -904,15 +890,12 @@ public class FairScheduler extends
   }
 
   private synchronized void removeNode(RMNode rmNode) {
-    FSSchedulerNode node = getFSSchedulerNode(rmNode.getNodeID());
-    // This can occur when an UNHEALTHY node reconnects
+    NodeId nodeId = rmNode.getNodeID();
+    FSSchedulerNode node = nodeTracker.getNode(nodeId);
     if (node == null) {
+      LOG.error("Attempting to remove non-existent node " + nodeId);
       return;
     }
-    Resources.subtractFrom(clusterResource, node.getTotalResource());
-    updateRootQueueMetrics();
-
-    triggerUpdate();
 
     // Remove running containers
     List<RMContainer> runningContainers = node.getRunningContainers();
@@ -934,18 +917,13 @@ public class FairScheduler extends
           RMContainerEventType.KILL);
     }
 
-    nodes.remove(rmNode.getNodeID());
-    String rackName = node.getRackName() == null ? "NULL" : node.getRackName();
-    if (nodesPerRack.containsKey(rackName)
-        && (nodesPerRack.get(rackName) > 0)) {
-      nodesPerRack.put(rackName, nodesPerRack.get(rackName) - 1);
-    } else {
-      LOG.error("Node [" + rmNode.getNodeAddress() + "] being removed from" +
-          " unknown rack [" + rackName + "] !!");
-    }
+    nodeTracker.removeNode(nodeId);
+    Resource clusterResource = getClusterResource();
     queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
     queueMgr.getRootQueue().recomputeSteadyShares();
-    updateMaximumAllocation(node, false);
+    updateRootQueueMetrics();
+    triggerUpdate();
+
     LOG.info("Removed node " + rmNode.getNodeAddress() +
         " cluster capacity: " + clusterResource);
   }
@@ -967,7 +945,7 @@ public class FairScheduler extends
 
     // Sanity check
     SchedulerUtils.normalizeRequests(ask, DOMINANT_RESOURCE_CALCULATOR,
-        clusterResource, minimumAllocation, getMaximumResourceCapability(),
+        getClusterResource(), minimumAllocation, getMaximumResourceCapability(),
         incrAllocation);
 
     // Record container allocation start time
@@ -1034,7 +1012,8 @@ public class FairScheduler extends
   private synchronized void nodeUpdate(RMNode nm) {
     long start = getClock().getTime();
     if (LOG.isDebugEnabled()) {
-      LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterResource);
+      LOG.debug("nodeUpdate: " + nm +
+          " cluster capacity: " + getClusterResource());
     }
     eventLog.log("HEARTBEAT", nm.getHostName());
     FSSchedulerNode node = getFSSchedulerNode(nm.getNodeID());
@@ -1091,20 +1070,13 @@ public class FairScheduler extends
 
   void continuousSchedulingAttempt() throws InterruptedException {
     long start = getClock().getTime();
-    List<NodeId> nodeIdList = new ArrayList<NodeId>(nodes.keySet());
-    // Sort the nodes by space available on them, so that we offer
-    // containers on emptier nodes first, facilitating an even spread. This
-    // requires holding the scheduler lock, so that the space available on a
-    // node doesn't change during the sort.
-    synchronized (this) {
-      Collections.sort(nodeIdList, nodeAvailableResourceComparator);
-    }
+    List<FSSchedulerNode> nodeIdList =
+        nodeTracker.sortedNodeList(nodeAvailableResourceComparator);
 
     // iterate all nodes
-    for (NodeId nodeId : nodeIdList) {
-      FSSchedulerNode node = getFSSchedulerNode(nodeId);
+    for (FSSchedulerNode node : nodeIdList) {
       try {
-        if (node != null && Resources.fitsIn(minimumAllocation,
+        if (Resources.fitsIn(minimumAllocation,
             node.getUnallocatedResource())) {
           attemptScheduling(node);
         }
@@ -1126,19 +1098,14 @@ public class FairScheduler extends
   }
 
   /** Sort nodes by available resource */
-  private class NodeAvailableResourceComparator implements Comparator<NodeId> {
+  private class NodeAvailableResourceComparator
+      implements Comparator<FSSchedulerNode> {
 
     @Override
-    public int compare(NodeId n1, NodeId n2) {
-      if (!nodes.containsKey(n1)) {
-        return 1;
-      }
-      if (!nodes.containsKey(n2)) {
-        return -1;
-      }
-      return RESOURCE_CALCULATOR.compare(clusterResource,
-          nodes.get(n2).getUnallocatedResource(),
-          nodes.get(n1).getUnallocatedResource());
+    public int compare(FSSchedulerNode n1, FSSchedulerNode n2) {
+      return RESOURCE_CALCULATOR.compare(getClusterResource(),
+          n2.getUnallocatedResource(),
+          n1.getUnallocatedResource());
     }
   }
 
@@ -1150,7 +1117,7 @@ public class FairScheduler extends
     }
 
     final NodeId nodeID = node.getNodeID();
-    if (!nodes.containsKey(nodeID)) {
+    if (!nodeTracker.exists(nodeID)) {
       // The node might have just been removed while this thread was waiting
       // on the synchronized lock before it entered this synchronized method
       LOG.info("Skipping scheduling as the node " + nodeID +
@@ -1203,7 +1170,7 @@ public class FairScheduler extends
   private void updateRootQueueMetrics() {
     rootMetrics.setAvailableResourcesToQueue(
         Resources.subtract(
-            clusterResource, rootMetrics.getAllocatedResources()));
+            getClusterResource(), rootMetrics.getAllocatedResources()));
   }
 
   /**
@@ -1214,6 +1181,7 @@ public class FairScheduler extends
    */
   private boolean shouldAttemptPreemption() {
     if (preemptionEnabled) {
+      Resource clusterResource = getClusterResource();
       return (preemptionUtilizationThreshold < Math.max(
           (float) rootMetrics.getAllocatedMB() / clusterResource.getMemory(),
           (float) rootMetrics.getAllocatedVirtualCores() /
@@ -1547,7 +1515,7 @@ public class FairScheduler extends
 
   @Override
   public int getNumClusterNodes() {
-    return nodes.size();
+    return nodeTracker.nodeCount();
   }
 
   @Override
|
@@ -1577,7 +1545,7 @@ public class FairScheduler extends
|
|
|
// if it does not already exist, so it can be displayed on the web UI.
|
|
|
synchronized (FairScheduler.this) {
|
|
|
allocConf = queueInfo;
|
|
|
- allocConf.getDefaultSchedulingPolicy().initialize(clusterResource);
|
|
|
+ allocConf.getDefaultSchedulingPolicy().initialize(getClusterResource());
|
|
|
queueMgr.updateAllocationConfiguration(allocConf);
|
|
|
maxRunningEnforcer.updateRunnabilityOnReload();
|
|
|
}
|
|
@@ -1721,7 +1689,7 @@ public class FairScheduler extends
       ResourceOption resourceOption) {
     super.updateNodeResource(nm, resourceOption);
     updateRootQueueMetrics();
-    queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
+    queueMgr.getRootQueue().setSteadyFairShare(getClusterResource());
     queueMgr.getRootQueue().recomputeSteadyShares();
   }
 
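For context, everything this patch deletes from FairScheduler (the nodes map, the per-rack counts, and the hand-maintained clusterResource total) is bookkeeping that the new nodeTracker calls centralize. Below is a minimal, illustrative sketch of that centralized bookkeeping. It assumes only the method names visible in the hunks above (addNode, removeNode, getNode, exists, nodeCount, sortedNodeList); the NodeTrackerSketch and TrackedNode names, the String node keys, and all method bodies are hypothetical, not Hadoop's actual ClusterNodeTracker.

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the tracker's role; not Hadoop's ClusterNodeTracker.
class NodeTrackerSketch {

  // The minimum a node must expose for this bookkeeping to work.
  interface TrackedNode {
    String getNodeId();
    String getRackName(); // may be null, like RMNode.getRackName()
  }

  private final Map<String, TrackedNode> nodes = new ConcurrentHashMap<>();
  private final Map<String, Integer> nodesPerRack = new ConcurrentHashMap<>();

  // Stands in for the nodes.put(...) and per-rack increment deleted from addNode().
  synchronized void addNode(TrackedNode node) {
    nodes.put(node.getNodeId(), node);
    nodesPerRack.merge(rackKey(node.getRackName()), 1, Integer::sum);
  }

  // Stands in for the nodes.remove(...) and per-rack decrement deleted from removeNode().
  synchronized void removeNode(String nodeId) {
    TrackedNode node = nodes.remove(nodeId);
    if (node != null) {
      nodesPerRack.merge(rackKey(node.getRackName()), -1, Integer::sum);
    }
  }

  TrackedNode getNode(String nodeId) {
    return nodes.get(nodeId);
  }

  boolean exists(String nodeId) {
    return nodes.containsKey(nodeId);
  }

  int nodeCount() {
    return nodes.size();
  }

  // Backs getNumNodesInRack(); the deleted scheduler code mapped null racks
  // to the sentinel "NULL" in the same way.
  int nodeCount(String rackName) {
    return nodesPerRack.getOrDefault(rackKey(rackName), 0);
  }

  // Snapshot and sort under one lock, as the deleted synchronized block in
  // continuousSchedulingAttempt() did, so availability cannot change mid-sort.
  synchronized List<TrackedNode> sortedNodeList(Comparator<TrackedNode> comparator) {
    List<TrackedNode> sorted = new ArrayList<>(nodes.values());
    Collections.sort(sorted, comparator);
    return sorted;
  }

  private static String rackKey(String rackName) {
    return rackName == null ? "NULL" : rackName;
  }
}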