|
@@ -453,8 +453,7 @@ public class AbstractLeafQueue extends AbstractCSQueue {
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
- public List<QueueUserACLInfo>
|
|
|
|
- getQueueUserAclInfo(UserGroupInformation user) {
|
|
|
|
|
|
+ public List<QueueUserACLInfo> getQueueUserAclInfo(UserGroupInformation user) {
|
|
readLock.lock();
|
|
readLock.lock();
|
|
try {
|
|
try {
|
|
QueueUserACLInfo userAclInfo = recordFactory.newRecordInstance(
|
|
QueueUserACLInfo userAclInfo = recordFactory.newRecordInstance(
|
|
@@ -527,8 +526,8 @@ public class AbstractLeafQueue extends AbstractCSQueue {
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
- public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws
|
|
|
|
- IOException {
|
|
|
|
|
|
+ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
|
|
|
|
+ throws IOException {
|
|
|
|
|
|
writeLock.lock();
|
|
writeLock.lock();
|
|
try {
|
|
try {
|
|
@@ -634,7 +633,8 @@ public class AbstractLeafQueue extends AbstractCSQueue {
|
|
|
|
|
|
// Check submission limits for queues
|
|
// Check submission limits for queues
|
|
//TODO recalculate max applications because they can depend on capacity
|
|
//TODO recalculate max applications because they can depend on capacity
|
|
- if (getNumApplications() >= getMaxApplications() && !(this instanceof AutoCreatedLeafQueue)) {
|
|
|
|
|
|
+ if (getNumApplications() >= getMaxApplications() &&
|
|
|
|
+ !(this instanceof AutoCreatedLeafQueue)) {
|
|
String msg =
|
|
String msg =
|
|
"Queue " + getQueuePath() + " already has " + getNumApplications()
|
|
"Queue " + getQueuePath() + " already has " + getNumApplications()
|
|
+ " applications,"
|
|
+ " applications,"
|
|
@@ -646,7 +646,8 @@ public class AbstractLeafQueue extends AbstractCSQueue {
|
|
// Check submission limits for the user on this queue
|
|
// Check submission limits for the user on this queue
|
|
User user = usersManager.getUserAndAddIfAbsent(userName);
|
|
User user = usersManager.getUserAndAddIfAbsent(userName);
|
|
//TODO recalculate max applications because they can depend on capacity
|
|
//TODO recalculate max applications because they can depend on capacity
|
|
- if (user.getTotalApplications() >= getMaxApplicationsPerUser() && !(this instanceof AutoCreatedLeafQueue)) {
|
|
|
|
|
|
+ if (user.getTotalApplications() >= getMaxApplicationsPerUser() &&
|
|
|
|
+ !(this instanceof AutoCreatedLeafQueue)) {
|
|
String msg = "Queue " + getQueuePath() + " already has " + user
|
|
String msg = "Queue " + getQueuePath() + " already has " + user
|
|
.getTotalApplications() + " applications from user " + userName
|
|
.getTotalApplications() + " applications from user " + userName
|
|
+ " cannot accept submission of application: " + applicationId;
|
|
+ " cannot accept submission of application: " + applicationId;
|
|
@@ -825,10 +826,9 @@ public class AbstractLeafQueue extends AbstractCSQueue {
|
|
calculateAndGetAMResourceLimitPerPartition(nodePartition);
|
|
calculateAndGetAMResourceLimitPerPartition(nodePartition);
|
|
}
|
|
}
|
|
|
|
|
|
- for (Iterator<FiCaSchedulerApp> fsApp =
|
|
|
|
- getPendingAppsOrderingPolicy()
|
|
|
|
|
|
+ for (Iterator<FiCaSchedulerApp> fsApp = getPendingAppsOrderingPolicy()
|
|
.getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR);
|
|
.getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR);
|
|
- fsApp.hasNext(); ) {
|
|
|
|
|
|
+ fsApp.hasNext();) {
|
|
FiCaSchedulerApp application = fsApp.next();
|
|
FiCaSchedulerApp application = fsApp.next();
|
|
ApplicationId applicationId = application.getApplicationId();
|
|
ApplicationId applicationId = application.getApplicationId();
|
|
|
|
|
|
@@ -864,7 +864,8 @@ public class AbstractLeafQueue extends AbstractCSQueue {
|
|
+ " skipping enforcement to allow at least one application"
|
|
+ " skipping enforcement to allow at least one application"
|
|
+ " to start");
|
|
+ " to start");
|
|
} else{
|
|
} else{
|
|
- application.updateAMContainerDiagnostics(SchedulerApplicationAttempt.AMState.INACTIVATED,
|
|
|
|
|
|
+ application.updateAMContainerDiagnostics(
|
|
|
|
+ SchedulerApplicationAttempt.AMState.INACTIVATED,
|
|
CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED);
|
|
CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED);
|
|
LOG.debug("Not activating application {} as amIfStarted: {}"
|
|
LOG.debug("Not activating application {} as amIfStarted: {}"
|
|
+ " exceeds amLimit: {}", applicationId, amIfStarted, amLimit);
|
|
+ " exceeds amLimit: {}", applicationId, amIfStarted, amLimit);
|
|
@@ -1189,9 +1190,8 @@ public class AbstractLeafQueue extends AbstractCSQueue {
|
|
boolean needAssignToQueueCheck = true;
|
|
boolean needAssignToQueueCheck = true;
|
|
IteratorSelector sel = new IteratorSelector();
|
|
IteratorSelector sel = new IteratorSelector();
|
|
sel.setPartition(candidates.getPartition());
|
|
sel.setPartition(candidates.getPartition());
|
|
- for (Iterator<FiCaSchedulerApp> assignmentIterator =
|
|
|
|
- orderingPolicy.getAssignmentIterator(sel);
|
|
|
|
- assignmentIterator.hasNext(); ) {
|
|
|
|
|
|
+ for (Iterator<FiCaSchedulerApp> assignmentIterator = orderingPolicy.getAssignmentIterator(sel);
|
|
|
|
+ assignmentIterator.hasNext();) {
|
|
FiCaSchedulerApp application = assignmentIterator.next();
|
|
FiCaSchedulerApp application = assignmentIterator.next();
|
|
|
|
|
|
ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
|
|
ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
|
|
@@ -1821,13 +1821,8 @@ public class AbstractLeafQueue extends AbstractCSQueue {
|
|
if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
|
|
if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
|
|
RMNodeLabelsManager.NO_LABEL) && !nodePartition.equals(
|
|
RMNodeLabelsManager.NO_LABEL) && !nodePartition.equals(
|
|
RMNodeLabelsManager.NO_LABEL)) {
|
|
RMNodeLabelsManager.NO_LABEL)) {
|
|
- TreeSet<RMContainer> rmContainers = null;
|
|
|
|
- if (null == (rmContainers = ignorePartitionExclusivityRMContainers.get(
|
|
|
|
- nodePartition))) {
|
|
|
|
- rmContainers = new TreeSet<>();
|
|
|
|
- ignorePartitionExclusivityRMContainers.put(nodePartition,
|
|
|
|
- rmContainers);
|
|
|
|
- }
|
|
|
|
|
|
+ TreeSet<RMContainer> rmContainers = ignorePartitionExclusivityRMContainers.computeIfAbsent(
|
|
|
|
+ nodePartition, k -> new TreeSet<>());
|
|
rmContainers.add(rmContainer);
|
|
rmContainers.add(rmContainer);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -2195,8 +2190,7 @@ public class AbstractLeafQueue extends AbstractCSQueue {
|
|
* @return all ignored partition exclusivity RMContainers in the LeafQueue,
|
|
* @return all ignored partition exclusivity RMContainers in the LeafQueue,
|
|
* this will be used by preemption policy.
|
|
* this will be used by preemption policy.
|
|
*/
|
|
*/
|
|
- public Map<String, TreeSet<RMContainer>>
|
|
|
|
- getIgnoreExclusivityRMContainers() {
|
|
|
|
|
|
+ public Map<String, TreeSet<RMContainer>> getIgnoreExclusivityRMContainers() {
|
|
Map<String, TreeSet<RMContainer>> clonedMap = new HashMap<>();
|
|
Map<String, TreeSet<RMContainer>> clonedMap = new HashMap<>();
|
|
|
|
|
|
readLock.lock();
|
|
readLock.lock();
|