|
@@ -85,9 +85,7 @@ public class QueueManager {
|
|
|
* could be referred to as just "parent1.queue2".
|
|
|
*/
|
|
|
public FSLeafQueue getLeafQueue(String name, boolean create) {
|
|
|
- if (!name.startsWith(ROOT_QUEUE + ".")) {
|
|
|
- name = ROOT_QUEUE + "." + name;
|
|
|
- }
|
|
|
+ name = ensureRootPrefix(name);
|
|
|
synchronized (queues) {
|
|
|
FSQueue queue = queues.get(name);
|
|
|
if (queue == null && create) {
|
|
@@ -174,13 +172,107 @@ public class QueueManager {
|
|
|
return leafQueue;
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Make way for the given leaf queue if possible, by removing incompatible
|
|
|
+ * queues with no apps in them. Incompatibility could be due to
|
|
|
+ * (1) leafToCreate being currently being a parent, or (2) an existing leaf queue in
|
|
|
+ * the ancestry of leafToCreate.
|
|
|
+ *
|
|
|
+ * We will never remove the root queue or the default queue in this way.
|
|
|
+ *
|
|
|
+ * @return true if we can create leafToCreate or it already exists.
|
|
|
+ */
|
|
|
+ private boolean removeEmptyIncompatibleQueues(String leafToCreate) {
|
|
|
+ leafToCreate = ensureRootPrefix(leafToCreate);
|
|
|
+
|
|
|
+ // Ensure leafToCreate is not root and doesn't have the default queue in its
|
|
|
+ // ancestry.
|
|
|
+ if (leafToCreate.equals(ROOT_QUEUE) ||
|
|
|
+ leafToCreate.startsWith(
|
|
|
+ ROOT_QUEUE + "." + YarnConfiguration.DEFAULT_QUEUE_NAME + ".")) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ FSQueue queue = queues.get(leafToCreate);
|
|
|
+ // Queue exists already.
|
|
|
+ if (queue != null) {
|
|
|
+ if (queue instanceof FSLeafQueue) {
|
|
|
+ // If it's an already existing leaf, we're ok.
|
|
|
+ return true;
|
|
|
+ } else {
|
|
|
+ // If it's an existing parent queue, remove it if it's empty.
|
|
|
+ return removeQueueIfEmpty(queue);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Queue doesn't exist already. Check if the new queue would be created
|
|
|
+ // under an existing leaf queue. If so, try removing that leaf queue.
|
|
|
+ int sepIndex = leafToCreate.length();
|
|
|
+ sepIndex = leafToCreate.lastIndexOf('.', sepIndex-1);
|
|
|
+ while (sepIndex != -1) {
|
|
|
+ String prefixString = leafToCreate.substring(0, sepIndex);
|
|
|
+ FSQueue prefixQueue = queues.get(prefixString);
|
|
|
+ if (prefixQueue != null && prefixQueue instanceof FSLeafQueue) {
|
|
|
+ return removeQueueIfEmpty(prefixQueue);
|
|
|
+ }
|
|
|
+ sepIndex = leafToCreate.lastIndexOf('.', sepIndex-1);
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Remove the queue if it and its descendents are all empty.
|
|
|
+ * @param queue
|
|
|
+ * @return true if removed, false otherwise
|
|
|
+ */
|
|
|
+ private boolean removeQueueIfEmpty(FSQueue queue) {
|
|
|
+ if (isEmpty(queue)) {
|
|
|
+ removeQueue(queue);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Remove a queue and all its descendents.
|
|
|
+ */
|
|
|
+ private void removeQueue(FSQueue queue) {
|
|
|
+ if (queue instanceof FSLeafQueue) {
|
|
|
+ leafQueues.remove(queue);
|
|
|
+ } else {
|
|
|
+ List<FSQueue> childQueues = queue.getChildQueues();
|
|
|
+ while (!childQueues.isEmpty()) {
|
|
|
+ removeQueue(childQueues.get(0));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ queues.remove(queue.getName());
|
|
|
+ queue.getParent().getChildQueues().remove(queue);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Returns true if there are no applications, running or not, in the given
|
|
|
+ * queue or any of its descendents.
|
|
|
+ */
|
|
|
+ protected boolean isEmpty(FSQueue queue) {
|
|
|
+ if (queue instanceof FSLeafQueue) {
|
|
|
+ FSLeafQueue leafQueue = (FSLeafQueue)queue;
|
|
|
+ return queue.getNumRunnableApps() == 0 &&
|
|
|
+ leafQueue.getNonRunnableAppSchedulables().isEmpty();
|
|
|
+ } else {
|
|
|
+ for (FSQueue child : queue.getChildQueues()) {
|
|
|
+ if (!isEmpty(child)) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Gets a queue by name.
|
|
|
*/
|
|
|
public FSQueue getQueue(String name) {
|
|
|
- if (!name.startsWith(ROOT_QUEUE + ".") && !name.equals(ROOT_QUEUE)) {
|
|
|
- name = ROOT_QUEUE + "." + name;
|
|
|
- }
|
|
|
+ name = ensureRootPrefix(name);
|
|
|
synchronized (queues) {
|
|
|
return queues.get(name);
|
|
|
}
|
|
@@ -190,9 +282,7 @@ public class QueueManager {
|
|
|
* Return whether a queue exists already.
|
|
|
*/
|
|
|
public boolean exists(String name) {
|
|
|
- if (!name.startsWith(ROOT_QUEUE + ".") && !name.equals(ROOT_QUEUE)) {
|
|
|
- name = ROOT_QUEUE + "." + name;
|
|
|
- }
|
|
|
+ name = ensureRootPrefix(name);
|
|
|
synchronized (queues) {
|
|
|
return queues.containsKey(name);
|
|
|
}
|
|
@@ -214,10 +304,19 @@ public class QueueManager {
|
|
|
return queues.values();
|
|
|
}
|
|
|
|
|
|
+ private String ensureRootPrefix(String name) {
|
|
|
+ if (!name.startsWith(ROOT_QUEUE + ".") && !name.equals(ROOT_QUEUE)) {
|
|
|
+ name = ROOT_QUEUE + "." + name;
|
|
|
+ }
|
|
|
+ return name;
|
|
|
+ }
|
|
|
+
|
|
|
public void updateAllocationConfiguration(AllocationConfiguration queueConf) {
|
|
|
// Make sure all queues exist
|
|
|
for (String name : queueConf.getQueueNames()) {
|
|
|
- getLeafQueue(name, true);
|
|
|
+ if (removeEmptyIncompatibleQueues(name)) {
|
|
|
+ getLeafQueue(name, true);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
for (FSQueue queue : queues.values()) {
|