|
@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
|
|
+import java.util.Collection;
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
import java.util.Comparator;
|
|
import java.util.Comparator;
|
|
import java.util.Iterator;
|
|
import java.util.Iterator;
|
|
@@ -95,24 +96,26 @@ public class PriorityUtilizationQueueOrderingPolicy implements QueueOrderingPoli
|
|
/**
|
|
/**
|
|
* Comparator that both looks at priority and utilization
|
|
* Comparator that both looks at priority and utilization
|
|
*/
|
|
*/
|
|
- private class PriorityQueueComparator implements Comparator<CSQueue> {
|
|
|
|
|
|
+ private class PriorityQueueComparator
|
|
|
|
+ implements Comparator<PriorityQueueResourcesForSorting> {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
- public int compare(CSQueue q1, CSQueue q2) {
|
|
|
|
|
|
+ public int compare(PriorityQueueResourcesForSorting q1Sort,
|
|
|
|
+ PriorityQueueResourcesForSorting q2Sort) {
|
|
String p = partitionToLookAt.get();
|
|
String p = partitionToLookAt.get();
|
|
|
|
|
|
- int rc = compareQueueAccessToPartition(q1, q2, p);
|
|
|
|
|
|
+ int rc = compareQueueAccessToPartition(q1Sort.queue, q2Sort.queue, p);
|
|
if (0 != rc) {
|
|
if (0 != rc) {
|
|
return rc;
|
|
return rc;
|
|
}
|
|
}
|
|
|
|
|
|
- float used1 = q1.getQueueCapacities().getUsedCapacity(p);
|
|
|
|
- float used2 = q2.getQueueCapacities().getUsedCapacity(p);
|
|
|
|
|
|
+ float used1 = q1Sort.usedCapacity;
|
|
|
|
+ float used2 = q2Sort.usedCapacity;
|
|
int p1 = 0;
|
|
int p1 = 0;
|
|
int p2 = 0;
|
|
int p2 = 0;
|
|
if (respectPriority) {
|
|
if (respectPriority) {
|
|
- p1 = q1.getPriority().getPriority();
|
|
|
|
- p2 = q2.getPriority().getPriority();
|
|
|
|
|
|
+ p1 = q1Sort.queue.getPriority().getPriority();
|
|
|
|
+ p2 = q2Sort.queue.getPriority().getPriority();
|
|
}
|
|
}
|
|
|
|
|
|
rc = PriorityUtilizationQueueOrderingPolicy.compare(used1, used2, p1, p2);
|
|
rc = PriorityUtilizationQueueOrderingPolicy.compare(used1, used2, p1, p2);
|
|
@@ -120,8 +123,8 @@ public class PriorityUtilizationQueueOrderingPolicy implements QueueOrderingPoli
|
|
// For queue with same used ratio / priority, queue with higher configured
|
|
// For queue with same used ratio / priority, queue with higher configured
|
|
// capacity goes first
|
|
// capacity goes first
|
|
if (0 == rc) {
|
|
if (0 == rc) {
|
|
- float abs1 = q1.getQueueCapacities().getAbsoluteCapacity(p);
|
|
|
|
- float abs2 = q2.getQueueCapacities().getAbsoluteCapacity(p);
|
|
|
|
|
|
+ float abs1 = q1Sort.absoluteCapacity;
|
|
|
|
+ float abs2 = q2Sort.absoluteCapacity;
|
|
return Float.compare(abs2, abs1);
|
|
return Float.compare(abs2, abs1);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -156,6 +159,29 @@ public class PriorityUtilizationQueueOrderingPolicy implements QueueOrderingPoli
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ /**
|
|
|
|
+ * A simple storage class to represent a snapshot of a queue.
|
|
|
|
+ */
|
|
|
|
+ public static class PriorityQueueResourcesForSorting {
|
|
|
|
+ private final float usedCapacity;
|
|
|
|
+ private final float absoluteCapacity;
|
|
|
|
+ private final CSQueue queue;
|
|
|
|
+
|
|
|
|
+ PriorityQueueResourcesForSorting(CSQueue queue) {
|
|
|
|
+ this.queue = queue;
|
|
|
|
+ this.usedCapacity =
|
|
|
|
+ queue.getQueueCapacities().
|
|
|
|
+ getUsedCapacity(partitionToLookAt.get());
|
|
|
|
+ this.absoluteCapacity =
|
|
|
|
+ queue.getQueueCapacities().
|
|
|
|
+ getAbsoluteCapacity(partitionToLookAt.get());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public CSQueue getQueue() {
|
|
|
|
+ return queue;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
public PriorityUtilizationQueueOrderingPolicy(boolean respectPriority) {
|
|
public PriorityUtilizationQueueOrderingPolicy(boolean respectPriority) {
|
|
this.respectPriority = respectPriority;
|
|
this.respectPriority = respectPriority;
|
|
}
|
|
}
|
|
@@ -167,12 +193,23 @@ public class PriorityUtilizationQueueOrderingPolicy implements QueueOrderingPoli
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public Iterator<CSQueue> getAssignmentIterator(String partition) {
|
|
public Iterator<CSQueue> getAssignmentIterator(String partition) {
|
|
- // Since partitionToLookAt is a thread local variable, and every time we
|
|
|
|
- // copy and sort queues, so it's safe for multi-threading environment.
|
|
|
|
|
|
+ // partitionToLookAt is a thread local variable, therefore it is safe to mutate it.
|
|
PriorityUtilizationQueueOrderingPolicy.partitionToLookAt.set(partition);
|
|
PriorityUtilizationQueueOrderingPolicy.partitionToLookAt.set(partition);
|
|
- List<CSQueue> sortedQueue = new ArrayList<>(queues);
|
|
|
|
- Collections.sort(sortedQueue, new PriorityQueueComparator());
|
|
|
|
- return sortedQueue.iterator();
|
|
|
|
|
|
+
|
|
|
|
+ // Sort the snapshot of the queues in order to avoid breaking the prerequisites of TimSort.
|
|
|
|
+ // See YARN-10178 for details.
|
|
|
|
+ List<PriorityQueueResourcesForSorting> queueSnapshots = new ArrayList<>();
|
|
|
|
+ for (CSQueue queue : queues) {
|
|
|
|
+ queueSnapshots.add(new PriorityQueueResourcesForSorting(queue));
|
|
|
|
+ }
|
|
|
|
+ Collections.sort(queueSnapshots, new PriorityQueueComparator());
|
|
|
|
+
|
|
|
|
+ List<CSQueue> sortedQueues = new ArrayList<>();
|
|
|
|
+ for (PriorityQueueResourcesForSorting queueSnapshot : queueSnapshots) {
|
|
|
|
+ sortedQueues.add(queueSnapshot.queue);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return sortedQueues.iterator();
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|