|
@@ -80,14 +80,14 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|
|
|
|
|
private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
|
|
|
|
|
|
- private Queue root;
|
|
|
+ private CSQueue root;
|
|
|
|
|
|
private final static List<Container> EMPTY_CONTAINER_LIST =
|
|
|
new ArrayList<Container>();
|
|
|
|
|
|
- static final Comparator<Queue> queueComparator = new Comparator<Queue>() {
|
|
|
+ static final Comparator<CSQueue> queueComparator = new Comparator<CSQueue>() {
|
|
|
@Override
|
|
|
- public int compare(Queue q1, Queue q2) {
|
|
|
+ public int compare(CSQueue q1, CSQueue q2) {
|
|
|
if (q1.getUtilization() < q2.getUtilization()) {
|
|
|
return -1;
|
|
|
} else if (q1.getUtilization() > q2.getUtilization()) {
|
|
@@ -110,7 +110,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|
|
private ContainerTokenSecretManager containerTokenSecretManager;
|
|
|
private RMContext rmContext;
|
|
|
|
|
|
- private Map<String, Queue> queues = new ConcurrentHashMap<String, Queue>();
|
|
|
+ private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();
|
|
|
|
|
|
private Map<NodeId, SchedulerNode> nodes =
|
|
|
new ConcurrentHashMap<NodeId, SchedulerNode>();
|
|
@@ -127,7 +127,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|
|
|
|
|
private boolean initialized = false;
|
|
|
|
|
|
- public Queue getRootQueue() {
|
|
|
+ public CSQueue getRootQueue() {
|
|
|
return root;
|
|
|
}
|
|
|
|
|
@@ -207,7 +207,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|
|
CapacitySchedulerConfiguration.PREFIX + ROOT;
|
|
|
|
|
|
static class QueueHook {
|
|
|
- public Queue hook(Queue queue) {
|
|
|
+ public CSQueue hook(CSQueue queue) {
|
|
|
return queue;
|
|
|
}
|
|
|
}
|
|
@@ -225,8 +225,8 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|
|
private void reinitializeQueues(CapacitySchedulerConfiguration conf)
|
|
|
throws IOException {
|
|
|
// Parse new queues
|
|
|
- Map<String, Queue> newQueues = new HashMap<String, Queue>();
|
|
|
- Queue newRoot =
|
|
|
+ Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
|
|
|
+ CSQueue newRoot =
|
|
|
parseQueue(this, conf, null, ROOT, newQueues, queues,
|
|
|
queueComparator, applicationComparator, noop);
|
|
|
|
|
@@ -247,7 +247,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|
|
*/
|
|
|
@Lock(CapacityScheduler.class)
|
|
|
private void validateExistingQueues(
|
|
|
- Map<String, Queue> queues, Map<String, Queue> newQueues)
|
|
|
+ Map<String, CSQueue> queues, Map<String, CSQueue> newQueues)
|
|
|
throws IOException {
|
|
|
for (String queue : queues.keySet()) {
|
|
|
if (!newQueues.containsKey(queue)) {
|
|
@@ -264,11 +264,11 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|
|
*/
|
|
|
@Lock(CapacityScheduler.class)
|
|
|
private void addNewQueues(
|
|
|
- Map<String, Queue> queues, Map<String, Queue> newQueues)
|
|
|
+ Map<String, CSQueue> queues, Map<String, CSQueue> newQueues)
|
|
|
{
|
|
|
- for (Map.Entry<String, Queue> e : newQueues.entrySet()) {
|
|
|
+ for (Map.Entry<String, CSQueue> e : newQueues.entrySet()) {
|
|
|
String queueName = e.getKey();
|
|
|
- Queue queue = e.getValue();
|
|
|
+ CSQueue queue = e.getValue();
|
|
|
if (!queues.containsKey(queueName)) {
|
|
|
queues.put(queueName, queue);
|
|
|
}
|
|
@@ -276,15 +276,15 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|
|
}
|
|
|
|
|
|
@Lock(CapacityScheduler.class)
|
|
|
- static Queue parseQueue(
|
|
|
+ static CSQueue parseQueue(
|
|
|
CapacitySchedulerContext csContext,
|
|
|
CapacitySchedulerConfiguration conf,
|
|
|
- Queue parent, String queueName, Map<String, Queue> queues,
|
|
|
- Map<String, Queue> oldQueues,
|
|
|
- Comparator<Queue> queueComparator,
|
|
|
+ CSQueue parent, String queueName, Map<String, CSQueue> queues,
|
|
|
+ Map<String, CSQueue> oldQueues,
|
|
|
+ Comparator<CSQueue> queueComparator,
|
|
|
Comparator<SchedulerApp> applicationComparator,
|
|
|
QueueHook hook) {
|
|
|
- Queue queue;
|
|
|
+ CSQueue queue;
|
|
|
String[] childQueueNames =
|
|
|
conf.getQueues((parent == null) ?
|
|
|
queueName : (parent.getQueuePath()+"."+queueName));
|
|
@@ -306,9 +306,9 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|
|
// Used only for unit tests
|
|
|
queue = hook.hook(parentQueue);
|
|
|
|
|
|
- List<Queue> childQueues = new ArrayList<Queue>();
|
|
|
+ List<CSQueue> childQueues = new ArrayList<CSQueue>();
|
|
|
for (String childQueueName : childQueueNames) {
|
|
|
- Queue childQueue =
|
|
|
+ CSQueue childQueue =
|
|
|
parseQueue(csContext, conf, queue, childQueueName,
|
|
|
queues, oldQueues, queueComparator, applicationComparator, hook);
|
|
|
childQueues.add(childQueue);
|
|
@@ -322,7 +322,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|
|
return queue;
|
|
|
}
|
|
|
|
|
|
- synchronized Queue getQueue(String queueName) {
|
|
|
+ synchronized CSQueue getQueue(String queueName) {
|
|
|
return queues.get(queueName);
|
|
|
}
|
|
|
|
|
@@ -331,7 +331,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|
|
String queueName, String user) {
|
|
|
|
|
|
// Sanity checks
|
|
|
- Queue queue = getQueue(queueName);
|
|
|
+ CSQueue queue = getQueue(queueName);
|
|
|
if (queue == null) {
|
|
|
String message = "Application " + applicationAttemptId +
|
|
|
" submitted by user " + user + " to unknown queue: " + queueName;
|
|
@@ -405,7 +405,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|
|
|
|
|
// Inform the queue
|
|
|
String queueName = application.getQueue().getQueueName();
|
|
|
- Queue queue = queues.get(queueName);
|
|
|
+ CSQueue queue = queues.get(queueName);
|
|
|
if (!(queue instanceof LeafQueue)) {
|
|
|
LOG.error("Cannot finish application " + "from non-leaf queue: "
|
|
|
+ queueName);
|
|
@@ -479,7 +479,7 @@ implements ResourceScheduler, CapacitySchedulerContext {
|
|
|
public QueueInfo getQueueInfo(String queueName,
|
|
|
boolean includeChildQueues, boolean recursive)
|
|
|
throws IOException {
|
|
|
- Queue queue = null;
|
|
|
+ CSQueue queue = null;
|
|
|
|
|
|
synchronized (this) {
|
|
|
queue = this.queues.get(queueName);
|