|
@@ -143,7 +143,6 @@ public class QueueManager {
|
|
|
if (leafQueue == null) {
|
|
|
return null;
|
|
|
}
|
|
|
- leafQueue.setSchedulingMode(info.defaultSchedulingMode);
|
|
|
queue = leafQueue;
|
|
|
} else if (queue instanceof FSParentQueue) {
|
|
|
return null;
|
|
@@ -302,7 +301,7 @@ public class QueueManager {
|
|
|
Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
|
|
|
Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
|
|
|
Map<String, Double> queueWeights = new HashMap<String, Double>();
|
|
|
- Map<String, SchedulingMode> queueModes = new HashMap<String, SchedulingMode>();
|
|
|
+ Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>();
|
|
|
Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
|
|
|
Map<String, Map<QueueACL, AccessControlList>> queueAcls =
|
|
|
new HashMap<String, Map<QueueACL, AccessControlList>>();
|
|
@@ -310,7 +309,7 @@ public class QueueManager {
|
|
|
int queueMaxAppsDefault = Integer.MAX_VALUE;
|
|
|
long fairSharePreemptionTimeout = Long.MAX_VALUE;
|
|
|
long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
|
|
|
- SchedulingMode defaultSchedulingMode = SchedulingMode.getDefault();
|
|
|
+ SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.getDefault();
|
|
|
|
|
|
// Remember all queue names so we can display them on web UI, etc.
|
|
|
List<String> queueNamesInAllocFile = new ArrayList<String>();
|
|
@@ -339,7 +338,7 @@ public class QueueManager {
|
|
|
if ("queue".equals(element.getTagName()) ||
|
|
|
"pool".equals(element.getTagName())) {
|
|
|
loadQueue("root", element, minQueueResources, maxQueueResources, queueMaxApps,
|
|
|
- userMaxApps, queueWeights, queueModes, minSharePreemptionTimeouts,
|
|
|
+ userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts,
|
|
|
queueAcls, queueNamesInAllocFile);
|
|
|
} else if ("user".equals(element.getTagName())) {
|
|
|
String userName = element.getAttribute("name");
|
|
@@ -370,11 +369,12 @@ public class QueueManager {
|
|
|
} else if ("queueMaxAppsDefault".equals(element.getTagName())) {
|
|
|
String text = ((Text)element.getFirstChild()).getData().trim();
|
|
|
int val = Integer.parseInt(text);
|
|
|
- queueMaxAppsDefault = val;}
|
|
|
- else if ("defaultQueueSchedulingMode".equals(element.getTagName())) {
|
|
|
+ queueMaxAppsDefault = val;
|
|
|
+ } else if ("defaultQueueSchedulingPolicy".equals(element.getTagName())
|
|
|
+ || "defaultQueueSchedulingMode".equals(element.getTagName())) {
|
|
|
String text = ((Text)element.getFirstChild()).getData().trim();
|
|
|
- SchedulingMode.setDefault(text);
|
|
|
- defaultSchedulingMode = SchedulingMode.getDefault();
|
|
|
+ SchedulingPolicy.setDefault(text);
|
|
|
+ defaultSchedPolicy = SchedulingPolicy.getDefault();
|
|
|
} else {
|
|
|
LOG.warn("Bad element in allocations file: " + element.getTagName());
|
|
|
}
|
|
@@ -385,7 +385,7 @@ public class QueueManager {
|
|
|
synchronized (this) {
|
|
|
info = new QueueManagerInfo(minQueueResources, maxQueueResources,
|
|
|
queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
|
|
|
- queueMaxAppsDefault, defaultSchedulingMode, minSharePreemptionTimeouts,
|
|
|
+ queueMaxAppsDefault, defaultSchedPolicy, minSharePreemptionTimeouts,
|
|
|
queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
|
|
|
|
|
|
// Root queue should have empty ACLs. As a queue's ACL is the union of
|
|
@@ -396,14 +396,15 @@ public class QueueManager {
|
|
|
rootAcls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(" "));
|
|
|
rootAcls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(" "));
|
|
|
queueAcls.put(ROOT_QUEUE, rootAcls);
|
|
|
-
|
|
|
+
|
|
|
+ // Create all queus
|
|
|
for (String name: queueNamesInAllocFile) {
|
|
|
- FSLeafQueue queue = getLeafQueue(name);
|
|
|
- if (queueModes.containsKey(name)) {
|
|
|
- queue.setSchedulingMode(queueModes.get(name));
|
|
|
- } else {
|
|
|
- queue.setSchedulingMode(defaultSchedulingMode);
|
|
|
- }
|
|
|
+ getLeafQueue(name);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Set custom policies as specified
|
|
|
+ for (Map.Entry<String, SchedulingPolicy> entry : queuePolicies.entrySet()) {
|
|
|
+ queues.get(entry.getKey()).setPolicy(entry.getValue());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -414,7 +415,8 @@ public class QueueManager {
|
|
|
private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
|
|
|
Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
|
|
|
Map<String, Integer> userMaxApps, Map<String, Double> queueWeights,
|
|
|
- Map<String, SchedulingMode> queueModes, Map<String, Long> minSharePreemptionTimeouts,
|
|
|
+ Map<String, SchedulingPolicy> queuePolicies,
|
|
|
+ Map<String, Long> minSharePreemptionTimeouts,
|
|
|
Map<String, Map<QueueACL, AccessControlList>> queueAcls, List<String> queueNamesInAllocFile)
|
|
|
throws AllocationConfigurationException {
|
|
|
String queueName = parentName + "." + element.getAttribute("name");
|
|
@@ -448,9 +450,10 @@ public class QueueManager {
|
|
|
String text = ((Text)field.getFirstChild()).getData().trim();
|
|
|
long val = Long.parseLong(text) * 1000L;
|
|
|
minSharePreemptionTimeouts.put(queueName, val);
|
|
|
- } else if ("schedulingMode".equals(field.getTagName())) {
|
|
|
+ } else if ("schedulingPolicy".equals(field.getTagName())
|
|
|
+ || "schedulingMode".equals(field.getTagName())) {
|
|
|
String text = ((Text)field.getFirstChild()).getData().trim();
|
|
|
- queueModes.put(queueName, SchedulingMode.parse(text));
|
|
|
+ queuePolicies.put(queueName, SchedulingPolicy.parse(text));
|
|
|
} else if ("aclSubmitApps".equals(field.getTagName())) {
|
|
|
String text = ((Text)field.getFirstChild()).getData().trim();
|
|
|
acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
|
|
@@ -459,8 +462,9 @@ public class QueueManager {
|
|
|
acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
|
|
|
} else if ("queue".endsWith(field.getTagName()) ||
|
|
|
"pool".equals(field.getTagName())) {
|
|
|
- loadQueue(queueName, field, minQueueResources, maxQueueResources, queueMaxApps,
|
|
|
- userMaxApps, queueWeights, queueModes, minSharePreemptionTimeouts,
|
|
|
+ loadQueue(queueName, field, minQueueResources, maxQueueResources,
|
|
|
+ queueMaxApps, userMaxApps, queueWeights, queuePolicies,
|
|
|
+ minSharePreemptionTimeouts,
|
|
|
queueAcls, queueNamesInAllocFile);
|
|
|
isLeaf = false;
|
|
|
}
|
|
@@ -615,13 +619,13 @@ public class QueueManager {
|
|
|
// below half its fair share for this long, it is allowed to preempt tasks.
|
|
|
public final long fairSharePreemptionTimeout;
|
|
|
|
|
|
- public final SchedulingMode defaultSchedulingMode;
|
|
|
+ public final SchedulingPolicy defaultSchedulingPolicy;
|
|
|
|
|
|
public QueueManagerInfo(Map<String, Resource> minQueueResources,
|
|
|
Map<String, Resource> maxQueueResources,
|
|
|
Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
|
|
|
Map<String, Double> queueWeights, int userMaxAppsDefault,
|
|
|
- int queueMaxAppsDefault, SchedulingMode defaultSchedulingMode,
|
|
|
+ int queueMaxAppsDefault, SchedulingPolicy defaultSchedulingPolicy,
|
|
|
Map<String, Long> minSharePreemptionTimeouts,
|
|
|
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
|
|
|
long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout) {
|
|
@@ -632,7 +636,7 @@ public class QueueManager {
|
|
|
this.queueWeights = queueWeights;
|
|
|
this.userMaxAppsDefault = userMaxAppsDefault;
|
|
|
this.queueMaxAppsDefault = queueMaxAppsDefault;
|
|
|
- this.defaultSchedulingMode = defaultSchedulingMode;
|
|
|
+ this.defaultSchedulingPolicy = defaultSchedulingPolicy;
|
|
|
this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
|
|
|
this.queueAcls = queueAcls;
|
|
|
this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
|
|
@@ -651,7 +655,7 @@ public class QueueManager {
|
|
|
minSharePreemptionTimeouts = new HashMap<String, Long>();
|
|
|
defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
|
|
|
fairSharePreemptionTimeout = Long.MAX_VALUE;
|
|
|
- defaultSchedulingMode = SchedulingMode.getDefault();
|
|
|
+ defaultSchedulingPolicy = SchedulingPolicy.getDefault();
|
|
|
}
|
|
|
}
|
|
|
}
|