|
@@ -23,7 +23,6 @@ import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.yarn.api.records.Container;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
-import org.apache.hadoop.yarn.api.records.Priority;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
@@ -32,6 +31,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
|
|
|
+
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerRequestKey;
|
|
|
+
|
|
|
+
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
|
|
@@ -80,7 +83,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
|
|
|
private ContainerAllocation preCheckForNewContainer(Resource clusterResource,
|
|
|
FiCaSchedulerNode node, SchedulingMode schedulingMode,
|
|
|
- ResourceLimits resourceLimits, Priority priority) {
|
|
|
+ ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey) {
|
|
|
if (SchedulerAppUtils.isPlaceBlacklisted(application, node, LOG)) {
|
|
|
application.updateAppSkipNodeDiagnostics(
|
|
|
CSAMContainerLaunchDiagnosticsConstants.SKIP_AM_ALLOCATION_IN_BLACK_LISTED_NODE);
|
|
@@ -88,7 +91,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
}
|
|
|
|
|
|
ResourceRequest anyRequest =
|
|
|
- application.getResourceRequest(priority, ResourceRequest.ANY);
|
|
|
+ application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
|
|
|
if (null == anyRequest) {
|
|
|
return ContainerAllocation.PRIORITY_SKIPPED;
|
|
|
}
|
|
@@ -97,7 +100,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
Resource required = anyRequest.getCapability();
|
|
|
|
|
|
// Do we need containers at this 'priority'?
|
|
|
- if (application.getTotalRequiredResources(priority) <= 0) {
|
|
|
+ if (application.getTotalRequiredResources(schedulerKey) <= 0) {
|
|
|
return ContainerAllocation.PRIORITY_SKIPPED;
|
|
|
}
|
|
|
|
|
@@ -126,7 +129,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
}
|
|
|
|
|
|
if (!application.getCSLeafQueue().getReservationContinueLooking()) {
|
|
|
- if (!shouldAllocOrReserveNewContainer(priority, required)) {
|
|
|
+ if (!shouldAllocOrReserveNewContainer(schedulerKey, required)) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("doesn't need containers based on reservation algo!");
|
|
|
}
|
|
@@ -143,7 +146,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
}
|
|
|
|
|
|
// Inform the application it is about to get a scheduling opportunity
|
|
|
- application.addSchedulingOpportunity(priority);
|
|
|
+ application.addSchedulingOpportunity(schedulerKey);
|
|
|
|
|
|
// Increase missed-non-partitioned-resource-request-opportunity.
|
|
|
// This is to make sure non-partitioned-resource-request will prefer
|
|
@@ -152,8 +155,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
if (anyRequest.getNodeLabelExpression()
|
|
|
.equals(RMNodeLabelsManager.NO_LABEL)) {
|
|
|
missedNonPartitionedRequestSchedulingOpportunity =
|
|
|
- application
|
|
|
- .addMissedNonPartitionedRequestSchedulingOpportunity(priority);
|
|
|
+ application.addMissedNonPartitionedRequestSchedulingOpportunity(
|
|
|
+ schedulerKey);
|
|
|
}
|
|
|
|
|
|
if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
|
|
@@ -164,7 +167,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
.getScheduler().getNumClusterNodes()) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId()
|
|
|
- + " priority=" + priority
|
|
|
+ + " priority=" + schedulerKey.getPriority()
|
|
|
+ " because missed-non-partitioned-resource-request"
|
|
|
+ " opportunity under requred:" + " Now="
|
|
|
+ missedNonPartitionedRequestSchedulingOpportunity + " required="
|
|
@@ -180,20 +183,20 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
|
|
|
ContainerAllocation preAllocation(Resource clusterResource,
|
|
|
FiCaSchedulerNode node, SchedulingMode schedulingMode,
|
|
|
- ResourceLimits resourceLimits, Priority priority,
|
|
|
+ ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey,
|
|
|
RMContainer reservedContainer) {
|
|
|
ContainerAllocation result;
|
|
|
if (null == reservedContainer) {
|
|
|
// pre-check when allocating new container
|
|
|
result =
|
|
|
preCheckForNewContainer(clusterResource, node, schedulingMode,
|
|
|
- resourceLimits, priority);
|
|
|
+ resourceLimits, schedulerKey);
|
|
|
if (null != result) {
|
|
|
return result;
|
|
|
}
|
|
|
} else {
|
|
|
// pre-check when allocating reserved container
|
|
|
- if (application.getTotalRequiredResources(priority) == 0) {
|
|
|
+ if (application.getTotalRequiredResources(schedulerKey) == 0) {
|
|
|
// Release
|
|
|
return new ContainerAllocation(reservedContainer, null,
|
|
|
AllocationState.QUEUE_SKIPPED);
|
|
@@ -202,13 +205,13 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
|
|
|
// Try to allocate containers on node
|
|
|
result =
|
|
|
- assignContainersOnNode(clusterResource, node, priority,
|
|
|
+ assignContainersOnNode(clusterResource, node, schedulerKey,
|
|
|
reservedContainer, schedulingMode, resourceLimits);
|
|
|
|
|
|
if (null == reservedContainer) {
|
|
|
if (result.state == AllocationState.PRIORITY_SKIPPED) {
|
|
|
// Don't count 'skipped nodes' as a scheduling opportunity!
|
|
|
- application.subtractSchedulingOpportunity(priority);
|
|
|
+ application.subtractSchedulingOpportunity(schedulerKey);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -216,10 +219,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
}
|
|
|
|
|
|
public synchronized float getLocalityWaitFactor(
|
|
|
- Priority priority, int clusterNodes) {
|
|
|
+ SchedulerRequestKey schedulerKey, int clusterNodes) {
|
|
|
// Estimate: Required unique resources (i.e. hosts + racks)
|
|
|
int requiredResources =
|
|
|
- Math.max(application.getResourceRequests(priority).size() - 1, 0);
|
|
|
+ Math.max(application.getResourceRequests(schedulerKey).size() - 1, 0);
|
|
|
|
|
|
// waitFactor can't be more than '1'
|
|
|
// i.e. no point skipping more than clustersize opportunities
|
|
@@ -231,8 +234,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
.getCSLeafQueue().getNodeLocalityDelay());
|
|
|
}
|
|
|
|
|
|
- private boolean canAssign(Priority priority, FiCaSchedulerNode node,
|
|
|
- NodeType type, RMContainer reservedContainer) {
|
|
|
+ private boolean canAssign(SchedulerRequestKey schedulerKey,
|
|
|
+ FiCaSchedulerNode node, NodeType type, RMContainer reservedContainer) {
|
|
|
|
|
|
// Clearly we need containers for this application...
|
|
|
if (type == NodeType.OFF_SWITCH) {
|
|
@@ -242,15 +245,16 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
|
|
|
// 'Delay' off-switch
|
|
|
ResourceRequest offSwitchRequest =
|
|
|
- application.getResourceRequest(priority, ResourceRequest.ANY);
|
|
|
- long missedOpportunities = application.getSchedulingOpportunities(priority);
|
|
|
+ application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
|
|
|
+ long missedOpportunities =
|
|
|
+ application.getSchedulingOpportunities(schedulerKey);
|
|
|
long requiredContainers = offSwitchRequest.getNumContainers();
|
|
|
|
|
|
float localityWaitFactor =
|
|
|
- getLocalityWaitFactor(priority, rmContext.getScheduler()
|
|
|
+ getLocalityWaitFactor(schedulerKey, rmContext.getScheduler()
|
|
|
.getNumClusterNodes());
|
|
|
- // Cap the delay by the number of nodes in the cluster. Under most conditions
|
|
|
- // this means we will consider each node in the cluster before
|
|
|
+ // Cap the delay by the number of nodes in the cluster. Under most
|
|
|
+ // conditions this means we will consider each node in the cluster before
|
|
|
// accepting an off-switch assignment.
|
|
|
return (Math.min(rmContext.getScheduler().getNumClusterNodes(),
|
|
|
(requiredContainers * localityWaitFactor)) < missedOpportunities);
|
|
@@ -258,7 +262,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
|
|
|
// Check if we need containers on this rack
|
|
|
ResourceRequest rackLocalRequest =
|
|
|
- application.getResourceRequest(priority, node.getRackName());
|
|
|
+ application.getResourceRequest(schedulerKey, node.getRackName());
|
|
|
if (rackLocalRequest == null || rackLocalRequest.getNumContainers() <= 0) {
|
|
|
return false;
|
|
|
}
|
|
@@ -266,7 +270,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
// If we are here, we do need containers on this rack for RACK_LOCAL req
|
|
|
if (type == NodeType.RACK_LOCAL) {
|
|
|
// 'Delay' rack-local just a little bit...
|
|
|
- long missedOpportunities = application.getSchedulingOpportunities(priority);
|
|
|
+ long missedOpportunities =
|
|
|
+ application.getSchedulingOpportunities(schedulerKey);
|
|
|
return getActualNodeLocalityDelay() < missedOpportunities;
|
|
|
}
|
|
|
|
|
@@ -274,7 +279,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
if (type == NodeType.NODE_LOCAL) {
|
|
|
// Now check if we need containers on this host...
|
|
|
ResourceRequest nodeLocalRequest =
|
|
|
- application.getResourceRequest(priority, node.getNodeName());
|
|
|
+ application.getResourceRequest(schedulerKey, node.getNodeName());
|
|
|
if (nodeLocalRequest != null) {
|
|
|
return nodeLocalRequest.getNumContainers() > 0;
|
|
|
}
|
|
@@ -285,10 +290,11 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
|
|
|
private ContainerAllocation assignNodeLocalContainers(
|
|
|
Resource clusterResource, ResourceRequest nodeLocalResourceRequest,
|
|
|
- FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
|
|
|
- SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
|
|
|
- if (canAssign(priority, node, NodeType.NODE_LOCAL, reservedContainer)) {
|
|
|
- return assignContainer(clusterResource, node, priority,
|
|
|
+ FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
|
|
|
+ RMContainer reservedContainer, SchedulingMode schedulingMode,
|
|
|
+ ResourceLimits currentResoureLimits) {
|
|
|
+ if (canAssign(schedulerKey, node, NodeType.NODE_LOCAL, reservedContainer)) {
|
|
|
+ return assignContainer(clusterResource, node, schedulerKey,
|
|
|
nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
|
|
|
schedulingMode, currentResoureLimits);
|
|
|
}
|
|
@@ -299,10 +305,11 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
|
|
|
private ContainerAllocation assignRackLocalContainers(
|
|
|
Resource clusterResource, ResourceRequest rackLocalResourceRequest,
|
|
|
- FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
|
|
|
- SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
|
|
|
- if (canAssign(priority, node, NodeType.RACK_LOCAL, reservedContainer)) {
|
|
|
- return assignContainer(clusterResource, node, priority,
|
|
|
+ FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
|
|
|
+ RMContainer reservedContainer, SchedulingMode schedulingMode,
|
|
|
+ ResourceLimits currentResoureLimits) {
|
|
|
+ if (canAssign(schedulerKey, node, NodeType.RACK_LOCAL, reservedContainer)) {
|
|
|
+ return assignContainer(clusterResource, node, schedulerKey,
|
|
|
rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
|
|
|
schedulingMode, currentResoureLimits);
|
|
|
}
|
|
@@ -313,10 +320,11 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
|
|
|
private ContainerAllocation assignOffSwitchContainers(
|
|
|
Resource clusterResource, ResourceRequest offSwitchResourceRequest,
|
|
|
- FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
|
|
|
- SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
|
|
|
- if (canAssign(priority, node, NodeType.OFF_SWITCH, reservedContainer)) {
|
|
|
- return assignContainer(clusterResource, node, priority,
|
|
|
+ FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
|
|
|
+ RMContainer reservedContainer, SchedulingMode schedulingMode,
|
|
|
+ ResourceLimits currentResoureLimits) {
|
|
|
+ if (canAssign(schedulerKey, node, NodeType.OFF_SWITCH, reservedContainer)) {
|
|
|
+ return assignContainer(clusterResource, node, schedulerKey,
|
|
|
offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
|
|
|
schedulingMode, currentResoureLimits);
|
|
|
}
|
|
@@ -327,20 +335,21 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
}
|
|
|
|
|
|
private ContainerAllocation assignContainersOnNode(Resource clusterResource,
|
|
|
- FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
|
|
|
- SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
|
|
|
+ FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
|
|
|
+ RMContainer reservedContainer, SchedulingMode schedulingMode,
|
|
|
+ ResourceLimits currentResoureLimits) {
|
|
|
|
|
|
ContainerAllocation allocation;
|
|
|
|
|
|
NodeType requestType = null;
|
|
|
// Data-local
|
|
|
ResourceRequest nodeLocalResourceRequest =
|
|
|
- application.getResourceRequest(priority, node.getNodeName());
|
|
|
+ application.getResourceRequest(schedulerKey, node.getNodeName());
|
|
|
if (nodeLocalResourceRequest != null) {
|
|
|
requestType = NodeType.NODE_LOCAL;
|
|
|
allocation =
|
|
|
assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
|
|
|
- node, priority, reservedContainer, schedulingMode,
|
|
|
+ node, schedulerKey, reservedContainer, schedulingMode,
|
|
|
currentResoureLimits);
|
|
|
if (Resources.greaterThan(rc, clusterResource,
|
|
|
allocation.getResourceToBeAllocated(), Resources.none())) {
|
|
@@ -351,7 +360,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
|
|
|
// Rack-local
|
|
|
ResourceRequest rackLocalResourceRequest =
|
|
|
- application.getResourceRequest(priority, node.getRackName());
|
|
|
+ application.getResourceRequest(schedulerKey, node.getRackName());
|
|
|
if (rackLocalResourceRequest != null) {
|
|
|
if (!rackLocalResourceRequest.getRelaxLocality()) {
|
|
|
return ContainerAllocation.PRIORITY_SKIPPED;
|
|
@@ -363,7 +372,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
|
|
|
allocation =
|
|
|
assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
|
|
|
- node, priority, reservedContainer, schedulingMode,
|
|
|
+ node, schedulerKey, reservedContainer, schedulingMode,
|
|
|
currentResoureLimits);
|
|
|
if (Resources.greaterThan(rc, clusterResource,
|
|
|
allocation.getResourceToBeAllocated(), Resources.none())) {
|
|
@@ -374,7 +383,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
|
|
|
// Off-switch
|
|
|
ResourceRequest offSwitchResourceRequest =
|
|
|
- application.getResourceRequest(priority, ResourceRequest.ANY);
|
|
|
+ application.getResourceRequest(schedulerKey, ResourceRequest.ANY);
|
|
|
if (offSwitchResourceRequest != null) {
|
|
|
if (!offSwitchResourceRequest.getRelaxLocality()) {
|
|
|
return ContainerAllocation.PRIORITY_SKIPPED;
|
|
@@ -386,7 +395,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
|
|
|
allocation =
|
|
|
assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
|
|
|
- node, priority, reservedContainer, schedulingMode,
|
|
|
+ node, schedulerKey, reservedContainer, schedulingMode,
|
|
|
currentResoureLimits);
|
|
|
allocation.requestNodeType = requestType;
|
|
|
|
|
@@ -403,21 +412,22 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
}
|
|
|
|
|
|
private ContainerAllocation assignContainer(Resource clusterResource,
|
|
|
- FiCaSchedulerNode node, Priority priority, ResourceRequest request,
|
|
|
- NodeType type, RMContainer rmContainer, SchedulingMode schedulingMode,
|
|
|
- ResourceLimits currentResoureLimits) {
|
|
|
+ FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
|
|
|
+ ResourceRequest request, NodeType type, RMContainer rmContainer,
|
|
|
+ SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
|
|
|
lastResourceRequest = request;
|
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("assignContainers: node=" + node.getNodeName()
|
|
|
- + " application=" + application.getApplicationId()
|
|
|
- + " priority=" + priority.getPriority()
|
|
|
- + " request=" + request + " type=" + type);
|
|
|
+ + " application=" + application.getApplicationId()
|
|
|
+ + " priority=" + schedulerKey.getPriority()
|
|
|
+ + " request=" + request + " type=" + type);
|
|
|
}
|
|
|
|
|
|
// check if the resource request can access the label
|
|
|
if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(
|
|
|
- request.getNodeLabelExpression(), node.getPartition(), schedulingMode)) {
|
|
|
+ request.getNodeLabelExpression(), node.getPartition(),
|
|
|
+ schedulingMode)) {
|
|
|
// this is a reserved container, but we cannot allocate it now according
|
|
|
// to label not match. This can be caused by node label changed
|
|
|
// We should un-reserve this container.
|
|
@@ -439,7 +449,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
}
|
|
|
|
|
|
boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
|
|
|
- priority, capability);
|
|
|
+ schedulerKey, capability);
|
|
|
|
|
|
// Can we allocate a container on this node?
|
|
|
long availableContainers =
|
|
@@ -504,8 +514,8 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
resourceNeedToUnReserve = capability;
|
|
|
}
|
|
|
unreservedContainer =
|
|
|
- application.findNodeToUnreserve(clusterResource, node, priority,
|
|
|
- resourceNeedToUnReserve);
|
|
|
+ application.findNodeToUnreserve(clusterResource, node,
|
|
|
+ schedulerKey, resourceNeedToUnReserve);
|
|
|
// When (minimum-unreserved-resource > 0 OR we cannot allocate
|
|
|
// new/reserved
|
|
|
// container (That means we *have to* unreserve some resource to
|
|
@@ -553,28 +563,28 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- boolean
|
|
|
- shouldAllocOrReserveNewContainer(Priority priority, Resource required) {
|
|
|
- int requiredContainers = application.getTotalRequiredResources(priority);
|
|
|
- int reservedContainers = application.getNumReservedContainers(priority);
|
|
|
+ boolean shouldAllocOrReserveNewContainer(
|
|
|
+ SchedulerRequestKey schedulerKey, Resource required) {
|
|
|
+ int requiredContainers =
|
|
|
+ application.getTotalRequiredResources(schedulerKey);
|
|
|
+ int reservedContainers = application.getNumReservedContainers(schedulerKey);
|
|
|
int starvation = 0;
|
|
|
if (reservedContainers > 0) {
|
|
|
- float nodeFactor =
|
|
|
- Resources
|
|
|
- .ratio(rc, required, application.getCSLeafQueue().getMaximumAllocation());
|
|
|
+ float nodeFactor = Resources.ratio(
|
|
|
+ rc, required, application.getCSLeafQueue().getMaximumAllocation());
|
|
|
|
|
|
// Use percentage of node required to bias against large containers...
|
|
|
// Protect against corner case where you need the whole node with
|
|
|
// Math.min(nodeFactor, minimumAllocationFactor)
|
|
|
starvation =
|
|
|
- (int) ((application.getReReservations(priority) /
|
|
|
+ (int) ((application.getReReservations(schedulerKey) /
|
|
|
(float) reservedContainers) * (1.0f - (Math.min(
|
|
|
nodeFactor, application.getCSLeafQueue()
|
|
|
.getMinimumAllocationFactor()))));
|
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("needsContainers:" + " app.#re-reserve="
|
|
|
- + application.getReReservations(priority) + " reserved="
|
|
|
+ + application.getReReservations(schedulerKey) + " reserved="
|
|
|
+ reservedContainers + " nodeFactor=" + nodeFactor
|
|
|
+ " minAllocFactor="
|
|
|
+ application.getCSLeafQueue().getMinimumAllocationFactor()
|
|
@@ -585,13 +595,14 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
}
|
|
|
|
|
|
private Container getContainer(RMContainer rmContainer,
|
|
|
- FiCaSchedulerNode node, Resource capability, Priority priority) {
|
|
|
+ FiCaSchedulerNode node, Resource capability,
|
|
|
+ SchedulerRequestKey schedulerKey) {
|
|
|
return (rmContainer != null) ? rmContainer.getContainer()
|
|
|
- : createContainer(node, capability, priority);
|
|
|
+ : createContainer(node, capability, schedulerKey);
|
|
|
}
|
|
|
|
|
|
private Container createContainer(FiCaSchedulerNode node, Resource capability,
|
|
|
- Priority priority) {
|
|
|
+ SchedulerRequestKey schedulerKey) {
|
|
|
|
|
|
NodeId nodeId = node.getRMNode().getNodeID();
|
|
|
ContainerId containerId =
|
|
@@ -600,22 +611,23 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
|
|
|
// Create the container
|
|
|
return BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
|
|
|
- .getHttpAddress(), capability, priority, null);
|
|
|
+ .getHttpAddress(), capability, schedulerKey.getPriority(), null);
|
|
|
}
|
|
|
|
|
|
private ContainerAllocation handleNewContainerAllocation(
|
|
|
ContainerAllocation allocationResult, FiCaSchedulerNode node,
|
|
|
- Priority priority, RMContainer reservedContainer, Container container) {
|
|
|
+ SchedulerRequestKey schedulerKey, RMContainer reservedContainer,
|
|
|
+ Container container) {
|
|
|
// Handling container allocation
|
|
|
// Did we previously reserve containers at this 'priority'?
|
|
|
if (reservedContainer != null) {
|
|
|
- application.unreserve(priority, node, reservedContainer);
|
|
|
+ application.unreserve(schedulerKey, node, reservedContainer);
|
|
|
}
|
|
|
|
|
|
// Inform the application
|
|
|
RMContainer allocatedContainer =
|
|
|
application.allocate(allocationResult.containerNodeType, node,
|
|
|
- priority, lastResourceRequest, container);
|
|
|
+ schedulerKey, lastResourceRequest, container);
|
|
|
|
|
|
// Does the application need this resource?
|
|
|
if (allocatedContainer == null) {
|
|
@@ -637,12 +649,12 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
}
|
|
|
|
|
|
ContainerAllocation doAllocation(ContainerAllocation allocationResult,
|
|
|
- FiCaSchedulerNode node, Priority priority,
|
|
|
+ FiCaSchedulerNode node, SchedulerRequestKey schedulerKey,
|
|
|
RMContainer reservedContainer) {
|
|
|
// Create the container if necessary
|
|
|
Container container =
|
|
|
getContainer(reservedContainer, node,
|
|
|
- allocationResult.getResourceToBeAllocated(), priority);
|
|
|
+ allocationResult.getResourceToBeAllocated(), schedulerKey);
|
|
|
|
|
|
// something went wrong getting/creating the container
|
|
|
if (container == null) {
|
|
@@ -655,11 +667,11 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
if (allocationResult.getAllocationState() == AllocationState.ALLOCATED) {
|
|
|
// When allocating container
|
|
|
allocationResult =
|
|
|
- handleNewContainerAllocation(allocationResult, node, priority,
|
|
|
+ handleNewContainerAllocation(allocationResult, node, schedulerKey,
|
|
|
reservedContainer, container);
|
|
|
} else {
|
|
|
// When reserving container
|
|
|
- application.reserve(priority, node, reservedContainer, container);
|
|
|
+ application.reserve(schedulerKey, node, reservedContainer, container);
|
|
|
}
|
|
|
allocationResult.updatedContainer = container;
|
|
|
|
|
@@ -678,14 +690,15 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
// RACK_LOCAL without delay.
|
|
|
if (allocationResult.containerNodeType == NodeType.NODE_LOCAL
|
|
|
|| application.getCSLeafQueue().getRackLocalityFullReset()) {
|
|
|
- application.resetSchedulingOpportunities(priority);
|
|
|
+ application.resetSchedulingOpportunities(schedulerKey);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// Non-exclusive scheduling opportunity is different: we need reset
|
|
|
// it every time to make sure non-labeled resource request will be
|
|
|
// most likely allocated on non-labeled nodes first.
|
|
|
- application.resetMissedNonPartitionedRequestSchedulingOpportunity(priority);
|
|
|
+ application.resetMissedNonPartitionedRequestSchedulingOpportunity(
|
|
|
+ schedulerKey);
|
|
|
}
|
|
|
|
|
|
return allocationResult;
|
|
@@ -693,15 +706,15 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
|
|
|
private ContainerAllocation allocate(Resource clusterResource,
|
|
|
FiCaSchedulerNode node, SchedulingMode schedulingMode,
|
|
|
- ResourceLimits resourceLimits, Priority priority,
|
|
|
+ ResourceLimits resourceLimits, SchedulerRequestKey schedulerKey,
|
|
|
RMContainer reservedContainer) {
|
|
|
ContainerAllocation result =
|
|
|
preAllocation(clusterResource, node, schedulingMode, resourceLimits,
|
|
|
- priority, reservedContainer);
|
|
|
+ schedulerKey, reservedContainer);
|
|
|
|
|
|
if (AllocationState.ALLOCATED == result.state
|
|
|
|| AllocationState.RESERVED == result.state) {
|
|
|
- result = doAllocation(result, node, priority, reservedContainer);
|
|
|
+ result = doAllocation(result, node, schedulerKey, reservedContainer);
|
|
|
}
|
|
|
|
|
|
return result;
|
|
@@ -725,10 +738,10 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
}
|
|
|
|
|
|
// Schedule in priority order
|
|
|
- for (Priority priority : application.getPriorities()) {
|
|
|
+ for (SchedulerRequestKey schedulerKey : application.getSchedulerKeys()) {
|
|
|
ContainerAllocation result =
|
|
|
allocate(clusterResource, node, schedulingMode, resourceLimits,
|
|
|
- priority, null);
|
|
|
+ schedulerKey, null);
|
|
|
|
|
|
AllocationState allocationState = result.getAllocationState();
|
|
|
if (allocationState == AllocationState.PRIORITY_SKIPPED) {
|
|
@@ -744,7 +757,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
} else {
|
|
|
ContainerAllocation result =
|
|
|
allocate(clusterResource, node, schedulingMode, resourceLimits,
|
|
|
- reservedContainer.getReservedPriority(), reservedContainer);
|
|
|
+ reservedContainer.getReservedSchedulerKey(), reservedContainer);
|
|
|
return getCSAssignmentFromAllocateResult(clusterResource, result,
|
|
|
reservedContainer);
|
|
|
}
|