|
@@ -178,11 +178,22 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
// This is to make sure non-partitioned-resource-request will prefer
|
|
|
// to be allocated to non-partitioned nodes
|
|
|
int missedNonPartitionedRequestSchedulingOpportunity = 0;
|
|
|
+ SchedulingPlacementSet<FiCaSchedulerNode> schedulingPS =
|
|
|
+ appInfo.getSchedulingPlacementSet(schedulerKey);
|
|
|
+ if (null == schedulingPS){
|
|
|
+ // This is possible when #pending resource is decreased by a different
|
|
|
+ // thread.
|
|
|
+ ActivitiesLogger.APP.recordSkippedAppActivityWithoutAllocation(
|
|
|
+ activitiesManager, node, application, priority,
|
|
|
+ ActivityDiagnosticConstant.PRIORITY_SKIPPED_BECAUSE_NULL_ANY_REQUEST);
|
|
|
+ return ContainerAllocation.PRIORITY_SKIPPED;
|
|
|
+ }
|
|
|
+ String requestPartition =
|
|
|
+ schedulingPS.getPrimaryRequestedNodePartition();
|
|
|
+
|
|
|
// Only do this when request associated with given scheduler key accepts
|
|
|
// NO_LABEL under RESPECT_EXCLUSIVITY mode
|
|
|
- if (StringUtils.equals(RMNodeLabelsManager.NO_LABEL,
|
|
|
- appInfo.getSchedulingPlacementSet(schedulerKey)
|
|
|
- .getPrimaryRequestedNodePartition())) {
|
|
|
+ if (StringUtils.equals(RMNodeLabelsManager.NO_LABEL, requestPartition)) {
|
|
|
missedNonPartitionedRequestSchedulingOpportunity =
|
|
|
application.addMissedNonPartitionedRequestSchedulingOpportunity(
|
|
|
schedulerKey);
|
|
@@ -260,12 +271,9 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
return result;
|
|
|
}
|
|
|
|
|
|
- public float getLocalityWaitFactor(
|
|
|
- SchedulerRequestKey schedulerKey, int clusterNodes) {
|
|
|
+ public float getLocalityWaitFactor(int uniqAsks, int clusterNodes) {
|
|
|
// Estimate: Required unique resources (i.e. hosts + racks)
|
|
|
- int requiredResources = Math.max(
|
|
|
- application.getSchedulingPlacementSet(schedulerKey)
|
|
|
- .getUniqueLocationAsks() - 1, 0);
|
|
|
+ int requiredResources = Math.max(uniqAsks - 1, 0);
|
|
|
|
|
|
// waitFactor can't be more than '1'
|
|
|
// i.e. no point skipping more than clustersize opportunities
|
|
@@ -295,10 +303,16 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
if (rmContext.getScheduler().getNumClusterNodes() == 0) {
|
|
|
return false;
|
|
|
}
|
|
|
+
|
|
|
+ int uniqLocationAsks = 0;
|
|
|
+ SchedulingPlacementSet<FiCaSchedulerNode> schedulingPS =
|
|
|
+ appInfo.getSchedulingPlacementSet(schedulerKey);
|
|
|
+ if (schedulingPS != null) {
|
|
|
+ uniqLocationAsks = schedulingPS.getUniqueLocationAsks();
|
|
|
+ }
|
|
|
// If we have only ANY requests for this schedulerKey, we should not
|
|
|
// delay its scheduling.
|
|
|
- if (application.getSchedulingPlacementSet(schedulerKey)
|
|
|
- .getUniqueLocationAsks() == 1) {
|
|
|
+ if (uniqLocationAsks == 1) {
|
|
|
return true;
|
|
|
}
|
|
|
|
|
@@ -312,7 +326,7 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
} else {
|
|
|
long requiredContainers = application.getOutstandingAsksCount(
|
|
|
schedulerKey);
|
|
|
- float localityWaitFactor = getLocalityWaitFactor(schedulerKey,
|
|
|
+ float localityWaitFactor = getLocalityWaitFactor(uniqLocationAsks,
|
|
|
rmContext.getScheduler().getNumClusterNodes());
|
|
|
// Cap the delay by the number of nodes in the cluster.
|
|
|
return (Math.min(rmContext.getScheduler().getNumClusterNodes(),
|
|
@@ -825,6 +839,12 @@ public class RegularContainerAllocator extends AbstractContainerAllocator {
|
|
|
application.getAppSchedulingInfo().getSchedulingPlacementSet(
|
|
|
schedulerKey);
|
|
|
|
|
|
+ // This could be null when #pending request is decreased by another thread.
|
|
|
+ if (schedulingPS == null) {
|
|
|
+ return new ContainerAllocation(reservedContainer, null,
|
|
|
+ AllocationState.QUEUE_SKIPPED);
|
|
|
+ }
|
|
|
+
|
|
|
result = ContainerAllocation.PRIORITY_SKIPPED;
|
|
|
|
|
|
Iterator<FiCaSchedulerNode> iter = schedulingPS.getPreferredNodeIterator(
|