|
@@ -57,7 +57,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
|
@@ -84,6 +86,7 @@ import com.codahale.metrics.Timer;
|
|
public class SLSCapacityScheduler extends CapacityScheduler implements
|
|
public class SLSCapacityScheduler extends CapacityScheduler implements
|
|
SchedulerWrapper,Configurable {
|
|
SchedulerWrapper,Configurable {
|
|
private static final String EOL = System.getProperty("line.separator");
|
|
private static final String EOL = System.getProperty("line.separator");
|
|
|
|
+ private static final String QUEUE_COUNTER_PREFIX = "counter.queue.";
|
|
private static final int SAMPLING_SIZE = 60;
|
|
private static final int SAMPLING_SIZE = 60;
|
|
private ScheduledExecutorService pool;
|
|
private ScheduledExecutorService pool;
|
|
// counters for scheduler allocate/handle operations
|
|
// counters for scheduler allocate/handle operations
|
|
@@ -747,6 +750,47 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private void initQueueMetrics(CSQueue queue) {
|
|
|
|
+ if (queue instanceof LeafQueue) {
|
|
|
|
+ SortedMap<String, Counter> counterMap = metrics.getCounters();
|
|
|
|
+ String queueName = queue.getQueueName();
|
|
|
|
+ String[] names = new String[]{
|
|
|
|
+ QUEUE_COUNTER_PREFIX + queueName + ".pending.memory",
|
|
|
|
+ QUEUE_COUNTER_PREFIX + queueName + ".pending.cores",
|
|
|
|
+ QUEUE_COUNTER_PREFIX + queueName + ".allocated.memory",
|
|
|
|
+ QUEUE_COUNTER_PREFIX + queueName + ".allocated.cores" };
|
|
|
|
+
|
|
|
|
+ for (int i = names.length - 1; i >= 0; i--) {
|
|
|
|
+ if (!counterMap.containsKey(names[i])) {
|
|
|
|
+ metrics.counter(names[i]);
|
|
|
|
+ counterMap = metrics.getCounters();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ queueLock.lock();
|
|
|
|
+ try {
|
|
|
|
+ if (!schedulerMetrics.isTracked(queueName)) {
|
|
|
|
+ schedulerMetrics.trackQueue(queueName);
|
|
|
|
+ }
|
|
|
|
+ } finally {
|
|
|
|
+ queueLock.unlock();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ for (CSQueue child : queue.getChildQueues()) {
|
|
|
|
+ initQueueMetrics(child);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void serviceInit(Configuration configuration) throws Exception {
|
|
|
|
+ super.serviceInit(configuration);
|
|
|
|
+
|
|
|
|
+ initQueueMetrics(getRootQueue());
|
|
|
|
+ }
|
|
|
|
+
|
|
public void setQueueSet(Set<String> queues) {
|
|
public void setQueueSet(Set<String> queues) {
|
|
this.queueSet = queues;
|
|
this.queueSet = queues;
|
|
}
|
|
}
|