Browse Source

YARN-8709: CS preemption monitor always fails since one under-served queue was deleted. Contributed by Tao Yang.

Eric E Payne 6 years ago
parent
commit
987d8191ad

+ 3 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java

@@ -467,6 +467,9 @@ public class ProportionalCapacityPreemptionPolicy
     Resource totalPreemptionAllowed = Resources.multiply(clusterResources,
     Resource totalPreemptionAllowed = Resources.multiply(clusterResources,
         percentageClusterPreemptionAllowed);
         percentageClusterPreemptionAllowed);
 
 
+    //clear under served queues for every run
+    partitionToUnderServedQueues.clear();
+
     // based on ideal allocation select containers to be preemptionCandidates from each
     // based on ideal allocation select containers to be preemptionCandidates from each
     // queue and each application
     // queue and each application
     Map<ApplicationAttemptId, Set<RMContainer>> toPreempt =
     Map<ApplicationAttemptId, Set<RMContainer>> toPreempt =

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java

@@ -206,6 +206,11 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
         mClock);
         mClock);
   }
   }
 
 
+  public void updateQueueConfig(String queuesConfig) { // replace the queue hierarchy seen by the scheduler mock
+    ParentQueue root = mockQueueHierarchy(queuesConfig); // root queue produced from the given config string
+    when(cs.getRootQueue()).thenReturn(root); // later cs.getRootQueue() calls return this new root
+  }
+
   private void mockContainers(String containersConfig, FiCaSchedulerApp app,
   private void mockContainers(String containersConfig, FiCaSchedulerApp app,
       ApplicationAttemptId attemptId, String queueName,
       ApplicationAttemptId attemptId, String queueName,
       List<RMContainer> reservedContainers, List<RMContainer> liveContainers) {
       List<RMContainer> reservedContainers, List<RMContainer> liveContainers) {

+ 86 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java

@@ -71,9 +71,9 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
         "n1= res=100";
         "n1= res=100";
     String queuesConfig =
     String queuesConfig =
         // guaranteed,max,used,pending,reserved
         // guaranteed,max,used,pending,reserved
-        "root(=[100 100 79 120 0]);" + // root
+        "root(=[100 100 79 110 0]);" + // root
             "-a(=[11 100 11 50 0]);" + // a
             "-a(=[11 100 11 50 0]);" + // a
-            "-b(=[40 100 38 60 0]);" + // b
+            "-b(=[40 100 38 50 0]);" + // b
             "-c(=[20 100 10 10 0]);" + // c
             "-c(=[20 100 10 10 0]);" + // c
             "-d(=[29 100 20 0 0])"; // d
             "-d(=[29 100 20 0 0])"; // d
 
 
@@ -128,9 +128,9 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
         "n1= res=100";
         "n1= res=100";
     String queuesConfig =
     String queuesConfig =
         // guaranteed,max,used,pending,reserved
         // guaranteed,max,used,pending,reserved
-        "root(=[100 100 80 120 0]);" + // root
+        "root(=[100 100 80 110 0]);" + // root
             "-a(=[11 100 11 50 0]);" + // a
             "-a(=[11 100 11 50 0]);" + // a
-            "-b(=[40 100 38 60 0]);" + // b
+            "-b(=[40 100 38 50 0]);" + // b
             "-c(=[20 100 10 10 0]);" + // c
             "-c(=[20 100 10 10 0]);" + // c
             "-d(=[29 100 20 0 0])"; // d
             "-d(=[29 100 20 0 0])"; // d
 
 
@@ -942,4 +942,86 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
     verify(mDisp, times(22))
     verify(mDisp, times(22))
         .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(6))));
         .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(6))));
   }
   }
+
+  @Test
+  public void testIntraQueuePreemptionAfterQueueDropped()
+      throws IOException {
+    /**
+     * Test intra-queue preemption after an under-served queue is dropped.
+     * At first, the queue structure is:
+     *
+     * <pre>
+     *       root
+     *     /  | | \
+     *    a  b  c  d
+     * </pre>
+     *
+     * After dropping under-served queue "c", the queue structure is:
+     *
+     * <pre>
+     *       root
+     *     /  |  \
+     *    a   b  d
+     * </pre>
+     *
+     * Verify that no exception is thrown and the preemption results are
+     * correct.
+     */
+    conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
+        "priority_first");
+
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 79 110 0]);" + // root
+            "-a(=[11 100 11 50 0]);" + // a
+            "-b(=[40 100 38 50 0]);" + // b
+            "-c(=[20 100 10 10 0]);" + // c
+            "-d(=[29 100 20 0 0])"; // d
+
+    String appsConfig =
+        // queueName\t(priority,resource,host,expression,#repeat,reserved,
+        // pending)
+        "a\t" // app1 in a
+            + "(1,1,n1,,6,false,25);" + // app1 a
+            "a\t" // app2 in a
+            + "(1,1,n1,,5,false,25);" + // app2 a
+            "b\t" // app3 in b
+            + "(4,1,n1,,34,false,20);" + // app3 b
+            "b\t" // app4 in b
+            + "(4,1,n1,,2,false,10);" + // app4 b
+            "b\t" // app5 in b
+            + "(5,1,n1,,1,false,10);" + // app5 b
+            "b\t" // app6 in b
+            + "(6,1,n1,,1,false,10);" + // app6 in b
+            "c\t" // app7 in c
+            + "(1,1,n1,,10,false,10);" + "d\t" // app7 c, then app8 in d
+            + "(1,1,n1,,20,false,0)"; // app8 d
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    queuesConfig =
+        // guaranteed,max,used,pending,reserved (queue "c" is no longer listed)
+        "root(=[100 100 69 100 0]);" + // root
+            "-a(=[11 100 11 50 0]);" + // a
+            "-b(=[40 100 38 50 0]);" + // b
+            "-d(=[49 100 20 0 0])"; // d (guaranteed raised from 29 to 49)
+
+    updateQueueConfig(queuesConfig);
+
+    // will throw YarnRuntimeException(This shouldn't happen, cannot find
+    // TempQueuePerPartition for queueName=c) without patch in YARN-8709
+    policy.editSchedule();
+
+    // For queue B, app3 and app4 were of lower priority. Hence take 8
+    // containers from them (1 from app4, 7 from app3) by hitting the
+    // intraQueuePreemptionDemand of 20%.
+    verify(mDisp, times(1)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(4))));
+    verify(mDisp, times(7)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+  }
 }
 }