Browse Source

YARN-11745: Fix TimSort contract violation in PriorityQueueComparator Class (#7278)

Hean Chhinling 3 months ago
parent
commit
9bf5e38c48

+ 3 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/PriorityUtilizationQueueOrderingPolicy.java

@@ -91,12 +91,12 @@ public class PriorityUtilizationQueueOrderingPolicy
   /**
    * Comparator that both looks at priority and utilization
    */
-  final private class PriorityQueueComparator
+  final public class PriorityQueueComparator
       implements Comparator<PriorityQueueResourcesForSorting> {
 
     final private String partition;
 
-    private PriorityQueueComparator(String partition) {
+    public PriorityQueueComparator(String partition) {
       this.partition = partition;
     }
 
@@ -164,7 +164,7 @@ public class PriorityUtilizationQueueOrderingPolicy
             q1Sort.configuredMinResource;
         Resource minEffRes2 =
             q2Sort.configuredMinResource;
-        if (!minEffRes1.equals(Resources.none()) && !minEffRes2.equals(
+        if (!minEffRes1.equals(Resources.none()) || !minEffRes2.equals(
             Resources.none())) {
           return minEffRes2.compareTo(minEffRes1);
         }

+ 54 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/policy/TestPriorityUtilizationQueueOrderingPolicy.java

@@ -33,6 +33,8 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.Collections;
+
 import java.util.concurrent.ThreadLocalRandom;
 
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
@@ -309,6 +311,58 @@ public class TestPriorityUtilizationQueueOrderingPolicy {
     assertDoesNotThrow(() -> policy.getAssignmentIterator(partition));
   }
 
+  @Test
+  public void testComparatorClassDoesNotViolateTimSortContract() {
+    String partition = "testPartition";
+
+    List<PriorityUtilizationQueueOrderingPolicy.
+            PriorityQueueResourcesForSorting> queues = new ArrayList<>();
+    for (int i = 0; i < 1000; i++) { // 1000 queues to have enough queues so the exception occur
+      queues.add(createMockPriorityQueueResourcesForSorting(partition));
+    }
+
+    Collections.shuffle(queues);
+    // java.lang.IllegalArgumentException: Comparison method violates its general contract!
+    assertDoesNotThrow(() -> queues.sort(new PriorityUtilizationQueueOrderingPolicy(true)
+            .new PriorityQueueComparator(partition)));
+
+  }
+
+  private PriorityUtilizationQueueOrderingPolicy.
+          PriorityQueueResourcesForSorting createMockPriorityQueueResourcesForSorting(
+          String partition) {
+    QueueResourceQuotas resourceQuotas = randomResourceQuotas(partition);
+
+    boolean isZeroResource = ThreadLocalRandom.current().nextBoolean();
+    if (isZeroResource) {
+      resourceQuotas.setConfiguredMinResource(partition,  Resource.newInstance(0, 0));
+    }
+
+    QueueCapacities mockQueueCapacities = mock(QueueCapacities.class);
+    when(mockQueueCapacities.getAbsoluteUsedCapacity(partition))
+            .thenReturn(4.2f); // could be any specific number, so that there are equal values
+    when(mockQueueCapacities.getUsedCapacity(partition))
+            .thenReturn(1.0f); // could be any specific number, so that there are equal values
+    when(mockQueueCapacities.getAbsoluteCapacity(partition))
+            .thenReturn(6.2f); // could be any specific number, so that there are equal values
+
+    CSQueue mockQueue = mock(CSQueue.class);
+    when(mockQueue.getQueueCapacities())
+            .thenReturn(mockQueueCapacities);
+    when(mockQueue.getPriority())
+            .thenReturn(Priority.newInstance(7)); // could be any specific number,
+    // so that there are equal values
+    when(mockQueue.getAccessibleNodeLabels())
+            .thenReturn(Collections.singleton(partition));
+    when(mockQueue.getQueueResourceQuotas())
+            .thenReturn(resourceQuotas);
+
+    return new PriorityUtilizationQueueOrderingPolicy.PriorityQueueResourcesForSorting(
+            mockQueue, partition
+    );
+
+  }
+
   private QueueCapacities randomQueueCapacities(String partition) {
     QueueCapacities qc = new QueueCapacities(false);
     qc.setAbsoluteCapacity(partition, (float) randFloat(0.0d, 100.0d));