|
@@ -778,6 +778,31 @@ public class CapacityScheduler extends
|
|
|
return this.queueManager.getQueue(queueName);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Returns the normalized queue name, which should be used for internal
|
|
|
+ * queue references. Currently this is the fullQueuename which disambiguously
|
|
|
+ * identifies a queue.
|
|
|
+ * @param name Name of the queue to be normalized
|
|
|
+ * @return The normalized (full name) of the queue
|
|
|
+ */
|
|
|
+ public String normalizeQueueName(String name) {
|
|
|
+ if (this.queueManager == null) {
|
|
|
+ return name;
|
|
|
+ }
|
|
|
+ return this.queueManager.normalizeQueueName(name);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Determines if a short queue name reference is ambiguous, if there are at
|
|
|
+ * least two queues with the same name, it is considered ambiguous. Otherwise
|
|
|
+ * it is not.
|
|
|
+ * @param queueName The name of the queue to check for ambiguity
|
|
|
+ * @return true if there are at least 2 queues with the same name
|
|
|
+ */
|
|
|
+ public boolean isAmbiguous(String queueName) {
|
|
|
+ return this.queueManager.isAmbiguous(queueName);
|
|
|
+ }
|
|
|
+
|
|
|
private void addApplicationOnRecovery(ApplicationId applicationId,
|
|
|
String queueName, String user,
|
|
|
Priority priority, ApplicationPlacementContext placementContext) {
|
|
@@ -919,9 +944,17 @@ public class CapacityScheduler extends
|
|
|
queueName, placementContext, false);
|
|
|
|
|
|
if (queue == null) {
|
|
|
- final String message =
|
|
|
- "Application " + applicationId + " submitted by user " + user
|
|
|
- + " to unknown queue: " + queueName;
|
|
|
+ String message;
|
|
|
+ if (isAmbiguous(queueName)) {
|
|
|
+ message = "Application " + applicationId
|
|
|
+ + " submitted by user " + user
|
|
|
+ + " to ambiguous queue: " + queueName
|
|
|
+ + " please use full queue path instead.";
|
|
|
+ } else {
|
|
|
+ message =
|
|
|
+ "Application " + applicationId + " submitted by user " + user
|
|
|
+ + " to unknown queue: " + queueName;
|
|
|
+ }
|
|
|
|
|
|
this.rmContext.getDispatcher().getEventHandler().handle(
|
|
|
new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED,
|
|
@@ -955,12 +988,12 @@ public class CapacityScheduler extends
|
|
|
// For a queue which exists already and
|
|
|
// not auto-created above, then its parent queue should match
|
|
|
// the parent queue specified in queue mapping
|
|
|
- } else if (!queue.getParent().getQueueName().equals(
|
|
|
+ } else if (!queue.getParent().getQueueShortName().equals(
|
|
|
placementContext.getParentQueue())) {
|
|
|
String message =
|
|
|
"Auto created Leaf queue " + placementContext.getQueue() + " "
|
|
|
+ "already exists under queue : " + queue
|
|
|
- .getParent().getQueuePath()
|
|
|
+ .getParent().getQueueShortName()
|
|
|
+ ".But Queue mapping configuration " +
|
|
|
CapacitySchedulerConfiguration.QUEUE_MAPPING + " has been "
|
|
|
+ "updated to a different parent queue : "
|
|
@@ -1048,7 +1081,7 @@ public class CapacityScheduler extends
|
|
|
queue.submitApplicationAttempt(attempt, application.getUser());
|
|
|
LOG.info("Added Application Attempt " + applicationAttemptId
|
|
|
+ " to scheduler from user " + application.getUser() + " in queue "
|
|
|
- + queue.getQueueName());
|
|
|
+ + queue.getQueuePath());
|
|
|
if (isAttemptRecovering) {
|
|
|
LOG.debug("{} is recovering. Skipping notifying ATTEMPT_ADDED",
|
|
|
applicationAttemptId);
|
|
@@ -1077,7 +1110,7 @@ public class CapacityScheduler extends
|
|
|
CSQueue queue = (CSQueue) application.getQueue();
|
|
|
if (!(queue instanceof LeafQueue)) {
|
|
|
LOG.error("Cannot finish application " + "from non-leaf queue: " + queue
|
|
|
- .getQueueName());
|
|
|
+ .getQueuePath());
|
|
|
} else{
|
|
|
queue.finishApplication(applicationId, application.getUser());
|
|
|
}
|
|
@@ -1132,13 +1165,14 @@ public class CapacityScheduler extends
|
|
|
attempt.stop(rmAppAttemptFinalState);
|
|
|
|
|
|
// Inform the queue
|
|
|
- String queueName = attempt.getQueue().getQueueName();
|
|
|
- CSQueue queue = this.getQueue(queueName);
|
|
|
- if (!(queue instanceof LeafQueue)) {
|
|
|
+ Queue queue = attempt.getQueue();
|
|
|
+ CSQueue csQueue = (CSQueue) queue;
|
|
|
+ if (!(csQueue instanceof LeafQueue)) {
|
|
|
LOG.error(
|
|
|
- "Cannot finish application " + "from non-leaf queue: " + queueName);
|
|
|
+ "Cannot finish application " + "from non-leaf queue: "
|
|
|
+ + csQueue.getQueuePath());
|
|
|
} else{
|
|
|
- queue.finishApplicationAttempt(attempt, queue.getQueueName());
|
|
|
+ csQueue.finishApplicationAttempt(attempt, csQueue.getQueuePath());
|
|
|
}
|
|
|
} finally {
|
|
|
writeLock.unlock();
|
|
@@ -1260,7 +1294,13 @@ public class CapacityScheduler extends
|
|
|
CSQueue queue = null;
|
|
|
queue = this.getQueue(queueName);
|
|
|
if (queue == null) {
|
|
|
- throw new IOException("Unknown queue: " + queueName);
|
|
|
+ if (isAmbiguous(queueName)) {
|
|
|
+ throw new IOException("Ambiguous queue reference: " + queueName
|
|
|
+ + " please use full queue path instead.");
|
|
|
+ } else {
|
|
|
+ throw new IOException("Unknown queue: " + queueName);
|
|
|
+ }
|
|
|
+
|
|
|
}
|
|
|
return queue.getQueueInfo(includeChildQueues, recursive);
|
|
|
}
|
|
@@ -1516,7 +1556,7 @@ public class CapacityScheduler extends
|
|
|
LOG.error("Trying to schedule on a removed node, please double check, "
|
|
|
+ "nodeId=" + node.getNodeID());
|
|
|
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
|
|
|
- "", getRootQueue().getQueueName(), ActivityState.REJECTED,
|
|
|
+ "", getRootQueue().getQueuePath(), ActivityState.REJECTED,
|
|
|
ActivityDiagnosticConstant.INIT_CHECK_SINGLE_NODE_REMOVED);
|
|
|
ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
|
|
|
node);
|
|
@@ -1545,7 +1585,7 @@ public class CapacityScheduler extends
|
|
|
LOG.debug("This node " + node.getNodeID() + " doesn't have sufficient "
|
|
|
+ "available or preemptible resource for minimum allocation");
|
|
|
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
|
|
|
- "", getRootQueue().getQueueName(), ActivityState.REJECTED,
|
|
|
+ "", getRootQueue().getQueuePath(), ActivityState.REJECTED,
|
|
|
ActivityDiagnosticConstant.
|
|
|
INIT_CHECK_SINGLE_NODE_RESOURCE_INSUFFICIENT);
|
|
|
ActivitiesLogger.NODE.finishSkippedNodeAllocation(activitiesManager,
|
|
@@ -1594,14 +1634,14 @@ public class CapacityScheduler extends
|
|
|
schedulerHealth.updateSchedulerFulfilledReservationCounts(1);
|
|
|
|
|
|
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
|
|
|
- queue.getParent().getQueueName(), queue.getQueueName(),
|
|
|
+ queue.getParent().getQueuePath(), queue.getQueuePath(),
|
|
|
ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
|
|
|
ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
|
|
|
node, reservedContainer.getContainerId(),
|
|
|
AllocationState.ALLOCATED_FROM_RESERVED);
|
|
|
} else if (assignment.getAssignmentInformation().getNumReservations() > 0) {
|
|
|
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
|
|
|
- queue.getParent().getQueueName(), queue.getQueueName(),
|
|
|
+ queue.getParent().getQueuePath(), queue.getQueuePath(),
|
|
|
ActivityState.RE_RESERVED, ActivityDiagnosticConstant.EMPTY);
|
|
|
ActivitiesLogger.NODE.finishAllocatedNodeAllocation(activitiesManager,
|
|
|
node, reservedContainer.getContainerId(), AllocationState.RESERVED);
|
|
@@ -1693,7 +1733,7 @@ public class CapacityScheduler extends
|
|
|
LOG.debug("This partition '{}' doesn't have available or "
|
|
|
+ "killable resource", candidates.getPartition());
|
|
|
ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, null,
|
|
|
- "", getRootQueue().getQueueName(), ActivityState.REJECTED,
|
|
|
+ "", getRootQueue().getQueuePath(), ActivityState.REJECTED,
|
|
|
ActivityDiagnosticConstant.
|
|
|
INIT_CHECK_PARTITION_RESOURCE_INSUFFICIENT);
|
|
|
ActivitiesLogger.NODE
|
|
@@ -1907,10 +1947,10 @@ public class CapacityScheduler extends
|
|
|
.validateAndApplyQueueManagementChanges(queueManagementChanges);
|
|
|
} catch (SchedulerDynamicEditException sde) {
|
|
|
LOG.error("Queue Management Change event cannot be applied for "
|
|
|
- + "parent queue : " + parentQueue.getQueueName(), sde);
|
|
|
+ + "parent queue : " + parentQueue.getQueuePath(), sde);
|
|
|
} catch (IOException ioe) {
|
|
|
LOG.error("Queue Management Change event cannot be applied for "
|
|
|
- + "parent queue : " + parentQueue.getQueueName(), ioe);
|
|
|
+ + "parent queue : " + parentQueue.getQueuePath(), ioe);
|
|
|
}
|
|
|
}
|
|
|
break;
|
|
@@ -2185,10 +2225,10 @@ public class CapacityScheduler extends
|
|
|
// notify PreemptionManager
|
|
|
// Get the application for the finished container
|
|
|
if (null != application) {
|
|
|
- String leafQueueName = application.getCSLeafQueue().getQueueName();
|
|
|
+ String leafQueuePath = application.getCSLeafQueue().getQueuePath();
|
|
|
getPreemptionManager().addKillableContainer(
|
|
|
new KillableContainer(killableContainer, node.getPartition(),
|
|
|
- leafQueueName));
|
|
|
+ leafQueuePath));
|
|
|
}
|
|
|
}
|
|
|
} finally {
|
|
@@ -2214,10 +2254,10 @@ public class CapacityScheduler extends
|
|
|
// notify PreemptionManager
|
|
|
// Get the application for the finished container
|
|
|
if (null != application) {
|
|
|
- String leafQueueName = application.getCSLeafQueue().getQueueName();
|
|
|
+ String leafQueuePath = application.getCSLeafQueue().getQueuePath();
|
|
|
getPreemptionManager().removeKillableContainer(
|
|
|
new KillableContainer(nonKillableContainer, node.getPartition(),
|
|
|
- leafQueueName));
|
|
|
+ leafQueuePath));
|
|
|
}
|
|
|
} finally {
|
|
|
writeLock.unlock();
|
|
@@ -2287,7 +2327,7 @@ public class CapacityScheduler extends
|
|
|
message));
|
|
|
return null;
|
|
|
}
|
|
|
- if (!queue.getParent().getQueueName().equals(queueName)) {
|
|
|
+ if (!queue.getParent().getQueuePath().equals(queueName)) {
|
|
|
String message =
|
|
|
"Application: " + applicationId + " submitted to a reservation "
|
|
|
+ resQName + " which does not belong to the specified queue: "
|
|
@@ -2372,9 +2412,9 @@ public class CapacityScheduler extends
|
|
|
|
|
|
AbstractManagedParentQueue parent =
|
|
|
(AbstractManagedParentQueue) newQueue.getParent();
|
|
|
- String queuename = newQueue.getQueueName();
|
|
|
+ String queuePath = newQueue.getQueuePath();
|
|
|
parent.addChildQueue(newQueue);
|
|
|
- this.queueManager.addQueue(queuename, newQueue);
|
|
|
+ this.queueManager.addQueue(queuePath, newQueue);
|
|
|
|
|
|
LOG.info("Creation of AutoCreatedLeafQueue " + newQueue + " succeeded");
|
|
|
} finally {
|
|
@@ -2486,7 +2526,9 @@ public class CapacityScheduler extends
|
|
|
if (application == null) {
|
|
|
throw new YarnException("App to be moved " + appId + " not found.");
|
|
|
}
|
|
|
- String sourceQueueName = application.getQueue().getQueueName();
|
|
|
+ Queue queue = application.getQueue();
|
|
|
+ String sourceQueueName = queue instanceof CSQueue ?
|
|
|
+ ((CSQueue) queue).getQueuePath() : queue.getQueueName();
|
|
|
this.queueManager.getAndCheckLeafQueue(sourceQueueName);
|
|
|
String destQueueName = handleMoveToPlanQueue(newQueue);
|
|
|
LeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName);
|
|
@@ -2535,7 +2577,7 @@ public class CapacityScheduler extends
|
|
|
}
|
|
|
if (nonAccessiblelabels.size() > 0) {
|
|
|
throw new YarnException(
|
|
|
- "Specified queue=" + dest.getQueueName() + " can't satisfy following "
|
|
|
+ "Specified queue=" + dest.getQueuePath() + " can't satisfy following "
|
|
|
+ "apps label expressions =" + nonAccessiblelabels
|
|
|
+ " accessible node labels =" + targetqueuelabels);
|
|
|
}
|
|
@@ -2558,7 +2600,12 @@ public class CapacityScheduler extends
|
|
|
}
|
|
|
CSQueue queue = getQueue(queueName);
|
|
|
if (queue == null) {
|
|
|
- LOG.error("Unknown queue: " + queueName);
|
|
|
+ if (isAmbiguous(queueName)) {
|
|
|
+ LOG.error("Ambiguous queue reference: " + queueName
|
|
|
+ + " please use full queue path instead.");
|
|
|
+ } else {
|
|
|
+ LOG.error("Unknown queue: " + queueName);
|
|
|
+ }
|
|
|
return getMaximumResourceCapability();
|
|
|
}
|
|
|
if (!(queue instanceof LeafQueue)) {
|
|
@@ -2600,8 +2647,8 @@ public class CapacityScheduler extends
|
|
|
|
|
|
@Override
|
|
|
public Priority checkAndGetApplicationPriority(
|
|
|
- Priority priorityRequestedByApp, UserGroupInformation user,
|
|
|
- String queueName, ApplicationId applicationId) throws YarnException {
|
|
|
+ Priority priorityRequestedByApp, UserGroupInformation user,
|
|
|
+ String queuePath, ApplicationId applicationId) throws YarnException {
|
|
|
readLock.lock();
|
|
|
try {
|
|
|
Priority appPriority = priorityRequestedByApp;
|
|
@@ -2612,14 +2659,16 @@ public class CapacityScheduler extends
|
|
|
// user's default priority will get precedence over queue default.
|
|
|
// for updateApplicationPriority call flow, this check is done in
|
|
|
// CientRMService itself.
|
|
|
- appPriority = this.appPriorityACLManager.getDefaultPriority(queueName,
|
|
|
+ appPriority = this.appPriorityACLManager.getDefaultPriority(
|
|
|
+ normalizeQueueName(queuePath),
|
|
|
user);
|
|
|
|
|
|
// Get the default priority for the Queue. If Queue is non-existent,
|
|
|
// then
|
|
|
// use default priority. Do it only if user doesn't have any default.
|
|
|
if (null == appPriority) {
|
|
|
- appPriority = this.queueManager.getDefaultPriorityForQueue(queueName);
|
|
|
+ appPriority = this.queueManager.getDefaultPriorityForQueue(
|
|
|
+ normalizeQueueName(queuePath));
|
|
|
}
|
|
|
|
|
|
LOG.info(
|
|
@@ -2637,14 +2686,14 @@ public class CapacityScheduler extends
|
|
|
}
|
|
|
|
|
|
// Lets check for ACLs here.
|
|
|
- if (!appPriorityACLManager.checkAccess(user, queueName, appPriority)) {
|
|
|
+ if (!appPriorityACLManager.checkAccess(user, queuePath, appPriority)) {
|
|
|
throw new YarnException(new AccessControlException(
|
|
|
"User " + user + " does not have permission to submit/update "
|
|
|
+ applicationId + " for " + appPriority));
|
|
|
}
|
|
|
|
|
|
LOG.info("Priority '" + appPriority.getPriority()
|
|
|
- + "' is acceptable in queue : " + queueName + " for application: "
|
|
|
+ + "' is acceptable in queue : " + queuePath + " for application: "
|
|
|
+ applicationId);
|
|
|
|
|
|
return appPriority;
|
|
@@ -3142,7 +3191,12 @@ public class CapacityScheduler extends
|
|
|
public long getMaximumApplicationLifetime(String queueName) {
|
|
|
CSQueue queue = getQueue(queueName);
|
|
|
if (queue == null || !(queue instanceof LeafQueue)) {
|
|
|
- LOG.error("Unknown queue: " + queueName);
|
|
|
+ if (isAmbiguous(queueName)) {
|
|
|
+ LOG.error("Ambiguous queue reference: " + queueName
|
|
|
+ + " please use full queue path instead.");
|
|
|
+ } else {
|
|
|
+ LOG.error("Unknown queue: " + queueName);
|
|
|
+ }
|
|
|
return -1;
|
|
|
}
|
|
|
// In seconds
|