浏览代码

YARN-6109. Add an ability to convert ChildQueue to ParentQueue. (Xuan Gong via wangda)

(cherry picked from commit 3fdae0a2b66c5fb6853875b66fcf50bc96d6e2e9)
Wangda Tan 8 年之前
父节点
当前提交
70a810adc0

+ 13 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java

@@ -263,6 +263,8 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
   /**
    * Ensure all existing queues are present. Queues cannot be deleted if its not
    * in Stopped state, Queue's cannot be moved from one hierarchy to other also.
+   * Previous child queue could be converted into parent queue if it is in
+   * STOPPED state.
    *
    * @param queues existing queues
    * @param newQueues new queues
@@ -291,6 +293,17 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
           throw new IOException(queueName + " is moved from:"
               + oldQueue.getQueuePath() + " to:" + newQueue.getQueuePath()
               + " after refresh, which is not allowed.");
+        } else  if (oldQueue instanceof LeafQueue
+            && newQueue instanceof ParentQueue) {
+          if (oldQueue.getState() == QueueState.STOPPED) {
+            LOG.info("Converting the leaf queue: " + oldQueue.getQueuePath()
+                + " to parent queue.");
+          } else {
+            throw new IOException("Can not convert the leaf queue: "
+                + oldQueue.getQueuePath() + " to parent queue since "
+                + "it is not yet in stopped state. Current State : "
+                + oldQueue.getState());
+          }
         }
       }
     }

+ 16 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java

@@ -315,7 +315,22 @@ public class ParentQueue extends AbstractCSQueue {
 
         // Check if the child-queue already exists
         if (childQueue != null) {
-          // Re-init existing child queues
+          // Check if the child-queue has been converted into parent queue.
+          // The CS has already checked to ensure that this child-queue is in
+          // STOPPED state.
+          if (childQueue instanceof LeafQueue
+              && newChildQueue instanceof ParentQueue) {
+            // We would convert this LeafQueue to ParentQueue, consider this
+            // as the combination of DELETE then ADD.
+            newChildQueue.setParent(this);
+            currentChildQueues.put(newChildQueueName, newChildQueue);
+            // inform CapacitySchedulerQueueManager
+            CapacitySchedulerQueueManager queueManager = this.csContext
+                .getCapacitySchedulerQueueManager();
+            queueManager.addQueue(newChildQueueName, newChildQueue);
+            continue;
+          }
+          // Re-init existing queues
           childQueue.reinitialize(newChildQueue, clusterResource);
           LOG.info(getQueueName() + ": re-configured queue: " + childQueue);
         } else{

+ 103 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

@@ -470,6 +470,52 @@ public class TestCapacityScheduler {
     return conf;
   }
 
+  /**
+   * Populates {@code conf} with a queue hierarchy in which b1 is configured
+   * as a parent queue owning a single child queue, b11.
+   * <pre>
+   *           root
+   *          /     \
+   *        a        b
+   *       / \    /  |  \
+   *      a1  a2 b1  b2  b3
+   *              |
+   *             b11
+   * </pre>
+   *
+   * @param conf the configuration to be modified in place
+   * @return the same {@code conf} instance, now describing the hierarchy
+   *         shown above
+   */
+  private CapacitySchedulerConfiguration
+      setupQueueConfigurationWithB1AsParentQueue(
+          CapacitySchedulerConfiguration conf) {
+
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] {"a", "b"});
+
+    conf.setCapacity(A, A_CAPACITY);
+    conf.setCapacity(B, B_CAPACITY);
+
+    // Define 2nd-level queues
+    conf.setQueues(A, new String[] {"a1", "a2"});
+    conf.setCapacity(A1, A1_CAPACITY);
+    conf.setUserLimitFactor(A1, 100.0f);
+    conf.setCapacity(A2, A2_CAPACITY);
+    conf.setUserLimitFactor(A2, 100.0f);
+
+    conf.setQueues(B, new String[] {"b1", "b2", "b3"});
+    conf.setCapacity(B1, B1_CAPACITY);
+    conf.setUserLimitFactor(B1, 100.0f);
+    conf.setCapacity(B2, B2_CAPACITY);
+    conf.setUserLimitFactor(B2, 100.0f);
+    conf.setCapacity(B3, B3_CAPACITY);
+    conf.setUserLimitFactor(B3, 100.0f);
+
+    // Give b1 a child queue; b11 is its only child and is assigned a
+    // capacity of 100.
+    conf.setQueues(B1, new String[] {"b11"});
+    String b11Path = B1 + ".b11";
+    conf.setCapacity(b11Path, 100.0f);
+    conf.setUserLimitFactor(b11Path, 100.0f);
+
+    return conf;
+  }
+
   /**
    * @param conf, to be modified
    * @return, CS configuration which has deleted a
@@ -4140,4 +4186,61 @@ public class TestCapacityScheduler {
 
     cs.stop();
   }
+
+  /**
+   * Test if we can convert a leaf queue to a parent queue.
+   *
+   * The refresh is expected to be rejected while the leaf queue b1 is
+   * RUNNING, and to succeed once b1 has been put into STOPPED state.
+   *
+   * @throws Exception if the scheduler cannot be set up or a refresh that
+   *           is expected to succeed fails
+   */
+  @Test (timeout = 10000)
+  public void testConvertLeafQueueToParentQueue() throws Exception {
+    CapacityScheduler cs = new CapacityScheduler();
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
+        null, new RMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInRM(conf),
+        new ClientToAMTokenSecretManagerInRM(), null);
+    setupQueueConfiguration(conf);
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    cs.init(conf);
+    cs.start();
+    try {
+      cs.reinitialize(conf, rmContext);
+      checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
+
+      String targetQueue = "b1";
+      CSQueue b1 = cs.getQueue(targetQueue);
+      Assert.assertEquals(QueueState.RUNNING, b1.getState());
+
+      // test if we can convert a leaf queue which is in RUNNING state
+      conf = new CapacitySchedulerConfiguration();
+      setupQueueConfigurationWithB1AsParentQueue(conf);
+      try {
+        cs.reinitialize(conf, mockContext);
+        fail("Expected to throw exception when refresh queue tries to convert"
+            + " a child queue to a parent queue.");
+      } catch (IOException expected) {
+        // Expected: b1 is still RUNNING, so the conversion must be refused.
+      }
+
+      // now set queue state for b1 to STOPPED
+      conf = new CapacitySchedulerConfiguration();
+      setupQueueConfiguration(conf);
+      conf.set("yarn.scheduler.capacity.root.b.b1.state", "STOPPED");
+      cs.reinitialize(conf, mockContext);
+      // b1 is the reference fetched before this refresh; the assertion relies
+      // on the refresh updating that queue object in place.
+      Assert.assertEquals(QueueState.STOPPED, b1.getState());
+
+      // test if we can convert a leaf queue which is in STOPPED state
+      conf = new CapacitySchedulerConfiguration();
+      setupQueueConfigurationWithB1AsParentQueue(conf);
+      try {
+        cs.reinitialize(conf, mockContext);
+      } catch (IOException e) {
+        // Keep the original failure message but attach the cause so the
+        // reason for the rejected refresh shows up in the test report.
+        throw new AssertionError("Expected to NOT throw exception when refresh"
+            + " queue tries to convert a leaf queue WITHOUT running apps", e);
+      }
+      // Look the queue up again: the conversion replaces the queue object.
+      b1 = cs.getQueue(targetQueue);
+      Assert.assertTrue(b1 instanceof ParentQueue);
+      Assert.assertEquals(QueueState.RUNNING, b1.getState());
+      Assert.assertFalse(b1.getChildQueues().isEmpty());
+    } finally {
+      // Stop the scheduler even when an assertion fails, as the other tests
+      // in this class do at their end.
+      cs.stop();
+    }
+  }
 }