Selaa lähdekoodia

YARN-8992. Fair scheduler can delete a dynamic queue while an application attempt is being added to the queue. (Contributed by Wilfred Spiegelenburg)

Haibo Chen 6 vuotta sitten
vanhempi
commit
a41b648e98

+ 14 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java

@@ -457,6 +457,20 @@ public class FSLeafQueue extends FSQueue {
     }
   }
 
+  @Override
+  public boolean isEmpty() {
+    readLock.lock();
+    try {
+      if (runnableApps.size() > 0 || nonRunnableApps.size() > 0 ||
+          assignedApps.size() > 0) {
+        return false;
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return true;
+  }
+
   /**
    * TODO: Based on how frequently this is called, we might want to club
    * counting pending and active apps in the same method.

+ 15 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java

@@ -263,6 +263,21 @@ public class FSParentQueue extends FSQueue {
     }
   }
 
+  @Override
+  public boolean isEmpty() {
+    readLock.lock();
+    try {
+      for (FSQueue queue: childQueues) {
+        if (!queue.isEmpty()) {
+          return false;
+        }
+      }
+    } finally {
+      readLock.unlock();
+    }
+    return true;
+  }
+
   @Override
   public void collectSchedulerApplications(
       Collection<ApplicationAttemptId> apps) {

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java

@@ -601,4 +601,6 @@ public abstract class FSQueue implements Queue, Schedulable {
   public void setDynamic(boolean dynamic) {
     this.isDynamic = dynamic;
   }
+
+  public abstract boolean isEmpty();
 }

+ 2 - 22
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java

@@ -498,7 +498,7 @@ public class QueueManager {
       }
       while (!parentQueuesToCheck.isEmpty()) {
         FSParentQueue queue = parentQueuesToCheck.iterator().next();
-        if (queue.getChildQueues().isEmpty()) {
+        if (queue.isEmpty()) {
           removeQueue(queue);
           if (queue.getParent().isDynamic()) {
             parentQueuesToCheck.add(queue.getParent());
@@ -528,7 +528,7 @@ public class QueueManager {
    * @return true if removed, false otherwise
    */
   private boolean removeQueueIfEmpty(FSQueue queue) {
-    if (isEmpty(queue)) {
+    if (queue.isEmpty()) {
       removeQueue(queue);
       return true;
     }
@@ -553,26 +553,6 @@ public class QueueManager {
     }
   }
   
-  /**
-   * Returns true if there are no applications, running or not, in the given
-   * queue or any of its descendents.
-   */
-  protected boolean isEmpty(FSQueue queue) {
-    if (queue instanceof FSLeafQueue) {
-      FSLeafQueue leafQueue = (FSLeafQueue)queue;
-      return queue.getNumRunnableApps() == 0 &&
-          leafQueue.getNumNonRunnableApps() == 0 &&
-          leafQueue.getNumAssignedApps() == 0;
-    } else {
-      for (FSQueue child : queue.getChildQueues()) {
-        if (!isEmpty(child)) {
-          return false;
-        }
-      }
-      return true;
-    }
-  }
-
   /**
    * Gets a queue by name.
    */

+ 1 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSParentQueue.java

@@ -23,9 +23,6 @@ import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.HashSet;
-import java.util.Set;
-
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -34,7 +31,6 @@ public class TestFSParentQueue {
 
   private FairSchedulerConfiguration conf;
   private QueueManager queueManager;
-  private Set<FSQueue> notEmptyQueues;
 
   @Before
   public void setUp() throws Exception {
@@ -47,13 +43,7 @@ public class TestFSParentQueue {
         new DefaultResourceCalculator());
     SystemClock clock = SystemClock.getInstance();
     when(scheduler.getClock()).thenReturn(clock);
-    notEmptyQueues = new HashSet<FSQueue>();
-    queueManager = new QueueManager(scheduler) {
-      @Override
-      public boolean isEmpty(FSQueue queue) {
-        return !notEmptyQueues.contains(queue);
-      }
-    };
+    queueManager = new QueueManager(scheduler);
     FSQueueMetrics.forQueue("root", null, true, conf);
     queueManager.initialize(conf);
   }

+ 156 - 90
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java

@@ -21,7 +21,6 @@ import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
 
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -41,7 +40,6 @@ import com.google.common.collect.Sets;
 public class TestQueueManager {
   private FairSchedulerConfiguration conf;
   private QueueManager queueManager;
-  private Set<FSQueue> notEmptyQueues;
   private FairScheduler scheduler;
   
   @Before
@@ -64,21 +62,18 @@ public class TestQueueManager {
     SystemClock clock = SystemClock.getInstance();
 
     when(scheduler.getClock()).thenReturn(clock);
-    notEmptyQueues = new HashSet<>();
-    queueManager = new QueueManager(scheduler) {
-      @Override
-      public boolean isEmpty(FSQueue queue) {
-        return !notEmptyQueues.contains(queue);
-      }
-    };
+    queueManager = new QueueManager(scheduler);
 
     FSQueueMetrics.forQueue("root", null, true, conf);
-
     queueManager.initialize(conf);
+    queueManager.updateAllocationConfiguration(allocConf);
   }
 
+  /**
+   * Test the leaf to parent queue conversion, excluding the default queue.
+   */
   @Test
-  public void testReloadTurnsLeafQueueIntoParent() throws Exception {
+  public void testReloadTurnsLeafQueueIntoParent() {
     updateConfiguredLeafQueues(queueManager, "queue1");
     
     // When no apps are running in the leaf queue, should be fine turning it
@@ -95,16 +90,19 @@ public class TestQueueManager {
     
     // When apps exist in leaf queue, we shouldn't be able to create
     // children under it, but things should work otherwise.
-    notEmptyQueues.add(queueManager.getLeafQueue("queue1", false));
+    FSLeafQueue q1 = queueManager.getLeafQueue("queue1", false);
+    ApplicationId appId = ApplicationId.newInstance(0, 0);
+    q1.addAssignedApp(appId);
     updateConfiguredLeafQueues(queueManager, "queue1.queue2");
     assertNull(queueManager.getLeafQueue("queue1.queue2", false));
     assertNotNull(queueManager.getLeafQueue("queue1", false));
     
     // When apps exist in leaf queues under a parent queue, shouldn't be
     // able to turn it into a leaf queue, but things should work otherwise.
-    notEmptyQueues.clear();
+    q1.removeAssignedApp(appId);
     updateConfiguredLeafQueues(queueManager, "queue1.queue2");
-    notEmptyQueues.add(queueManager.getQueue("root.queue1"));
+    FSLeafQueue q2 = queueManager.getLeafQueue("queue1.queue2", false);
+    q2.addAssignedApp(appId);
     updateConfiguredLeafQueues(queueManager, "queue1");
     assertNotNull(queueManager.getLeafQueue("queue1.queue2", false));
     assertNull(queueManager.getLeafQueue("queue1", false));
@@ -114,7 +112,10 @@ public class TestQueueManager {
     assertNull(queueManager.getLeafQueue("default.queue3", false));
     assertNotNull(queueManager.getLeafQueue("default", false));
   }
-  
+
+  /**
+   * Test the postponed leaf to parent queue conversion (app running).
+   */
   @Test
   public void testReloadTurnsLeafToParentWithNoLeaf() {
     AllocationConfiguration allocConf = new AllocationConfiguration(conf);
@@ -126,7 +127,9 @@ public class TestQueueManager {
     // Lets say later on admin makes queue1 a parent queue by
     // specifying "type=parent" in the alloc xml and lets say apps running in
     // queue1
-    notEmptyQueues.add(queueManager.getLeafQueue("root.queue1", false));
+    FSLeafQueue q1 = queueManager.getLeafQueue("queue1", false);
+    ApplicationId appId = ApplicationId.newInstance(0, 0);
+    q1.addAssignedApp(appId);
     allocConf = new AllocationConfiguration(conf);
     allocConf.configuredQueues.get(FSQueueType.PARENT)
         .add("root.queue1");
@@ -137,7 +140,7 @@ public class TestQueueManager {
     assertNull(queueManager.getParentQueue("root.queue1", false));
 
     // Now lets assume apps completed and there are no apps in queue1
-    notEmptyQueues.clear();
+    q1.removeAssignedApp(appId);
     // We should see queue1 transform from leaf queue to parent queue.
     queueManager.updateAllocationConfiguration(allocConf);
     assertNull(queueManager.getLeafQueue("root.queue1", false));
@@ -147,6 +150,9 @@ public class TestQueueManager {
         .getChildQueues().isEmpty());
   }
 
+  /**
+   * Check the queue name parsing (blank space in all forms).
+   */
   @Test
   public void testCheckQueueNodeName() {
     assertFalse(queueManager.isQueueNameValid(""));
@@ -161,9 +167,11 @@ public class TestQueueManager {
     assertTrue(queueManager.isQueueNameValid("a"));
   }
 
-  private void updateConfiguredLeafQueues(QueueManager queueMgr, String... confLeafQueues) {
+  private void updateConfiguredLeafQueues(QueueManager queueMgr,
+                                          String... confLeafQueues) {
     AllocationConfiguration allocConf = new AllocationConfiguration(conf);
-    allocConf.configuredQueues.get(FSQueueType.LEAF).addAll(Sets.newHashSet(confLeafQueues));
+    allocConf.configuredQueues.get(FSQueueType.LEAF)
+        .addAll(Sets.newHashSet(confLeafQueues));
     queueMgr.updateAllocationConfiguration(allocConf);
   }
 
@@ -172,10 +180,6 @@ public class TestQueueManager {
    */
   @Test
   public void testCreateLeafQueue() {
-    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
-
-    queueManager.updateAllocationConfiguration(allocConf);
-
     FSQueue q1 = queueManager.createQueue("root.queue1", FSQueueType.LEAF);
 
     assertNotNull("Leaf queue root.queue1 was not created",
@@ -189,10 +193,6 @@ public class TestQueueManager {
    */
   @Test
   public void testCreateLeafQueueAndParent() {
-    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
-
-    queueManager.updateAllocationConfiguration(allocConf);
-
     FSQueue q2 = queueManager.createQueue("root.queue1.queue2",
         FSQueueType.LEAF);
 
@@ -212,9 +212,6 @@ public class TestQueueManager {
    */
   @Test
   public void testCreateQueueWithChildDefaults() {
-    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
-
-    queueManager.updateAllocationConfiguration(allocConf);
     queueManager.getQueue("root.test").setMaxChildQueueResource(
         new ConfigurableResource(Resources.createResource(8192, 256)));
 
@@ -258,7 +255,6 @@ public class TestQueueManager {
    */
   @Test
   public void testCreateLeafQueueWithDefaults() {
-    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
     FSQueue q1 = queueManager.createQueue("root.queue1", FSQueueType.LEAF);
 
     assertNotNull("Leaf queue root.queue1 was not created",
@@ -281,10 +277,6 @@ public class TestQueueManager {
    */
   @Test
   public void testCreateParentQueue() {
-    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
-
-    queueManager.updateAllocationConfiguration(allocConf);
-
     FSQueue q1 = queueManager.createQueue("root.queue1", FSQueueType.PARENT);
 
     assertNotNull("Parent queue root.queue1 was not created",
@@ -298,10 +290,6 @@ public class TestQueueManager {
    */
   @Test
   public void testCreateParentQueueAndParent() {
-    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
-
-    queueManager.updateAllocationConfiguration(allocConf);
-
     FSQueue q2 = queueManager.createQueue("root.queue1.queue2",
         FSQueueType.PARENT);
 
@@ -313,13 +301,13 @@ public class TestQueueManager {
         "root.queue1.queue2", q2.getName());
   }
 
+  /**
+   * Test the removal of a dynamic leaf under a hierarchy of static parents.
+   */
   @Test
   public void testRemovalOfDynamicLeafQueue() {
-    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
-
-    queueManager.updateAllocationConfiguration(allocConf);
-
-    FSQueue q1 = queueManager.getLeafQueue("root.test.childB.dynamic1", true);
+    FSLeafQueue q1 = queueManager.getLeafQueue("root.test.childB.dynamic1",
+        true);
 
     assertNotNull("Queue root.test.childB.dynamic1 was not created", q1);
     assertEquals("createQueue() returned wrong queue",
@@ -328,7 +316,8 @@ public class TestQueueManager {
         q1.isDynamic());
 
     // an application is submitted to root.test.childB.dynamic1
-    notEmptyQueues.add(q1);
+    ApplicationId appId = ApplicationId.newInstance(0, 0);
+    q1.addAssignedApp(appId);
 
     // root.test.childB.dynamic1 is not empty and should not be removed
     queueManager.removePendingIncompatibleQueues();
@@ -338,7 +327,7 @@ public class TestQueueManager {
 
     // the application finishes, the next removeEmptyDynamicQueues() should
     // clean root.test.childB.dynamic1 up, but keep its static parent
-    notEmptyQueues.remove(q1);
+    q1.removeAssignedApp(appId);
 
     queueManager.removePendingIncompatibleQueues();
     queueManager.removeEmptyDynamicQueues();
@@ -348,12 +337,11 @@ public class TestQueueManager {
         queueManager.getParentQueue("root.test.childB", false));
   }
 
+  /**
+   * Test the removal of a dynamic parent and its child in one cleanup action.
+   */
   @Test
   public void testRemovalOfDynamicParentQueue() {
-    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
-
-    queueManager.updateAllocationConfiguration(allocConf);
-
     FSQueue q1 = queueManager.getLeafQueue("root.parent1.dynamic1", true);
 
     assertNotNull("Queue root.parent1.dynamic1 was not created", q1);
@@ -374,12 +362,11 @@ public class TestQueueManager {
     assertNull("Queue root.parent1 was not deleted", p1);
   }
 
+  /**
+   * Test the change from dynamic to static for a leaf queue.
+   */
   @Test
   public void testNonEmptyDynamicQueueBecomingStaticQueue() {
-    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
-
-    queueManager.updateAllocationConfiguration(allocConf);
-
     FSLeafQueue q1 = queueManager.getLeafQueue("root.leaf1", true);
 
     assertNotNull("Queue root.leaf1 was not created", q1);
@@ -388,7 +375,8 @@ public class TestQueueManager {
     assertTrue("root.leaf1 is not a dynamic queue", q1.isDynamic());
 
     // pretend that we submitted an app to the queue
-    notEmptyQueues.add(q1);
+    ApplicationId appId = ApplicationId.newInstance(0, 0);
+    q1.addAssignedApp(appId);
 
     // non-empty queues should not be deleted
     queueManager.removePendingIncompatibleQueues();
@@ -397,6 +385,7 @@ public class TestQueueManager {
     assertNotNull("Queue root.leaf1 was deleted", q1);
 
     // next we add leaf1 under root in the allocation config
+    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
     allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.leaf1");
     queueManager.updateAllocationConfiguration(allocConf);
 
@@ -406,7 +395,7 @@ public class TestQueueManager {
     // application finished now and the queue is empty, but since leaf1 is a
     // static queue at this point, hence not affected by
     // removeEmptyDynamicQueues()
-    notEmptyQueues.clear();
+    q1.removeAssignedApp(appId);
     queueManager.removePendingIncompatibleQueues();
     queueManager.removeEmptyDynamicQueues();
     q1 = queueManager.getLeafQueue("root.leaf1", false);
@@ -414,11 +403,11 @@ public class TestQueueManager {
     assertFalse("root.leaf1 is not a static queue", q1.isDynamic());
   }
 
+  /**
+   * Test the change from static to dynamic for a leaf queue.
+   */
   @Test
   public void testNonEmptyStaticQueueBecomingDynamicQueue() {
-    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
-    queueManager.updateAllocationConfiguration(allocConf);
-
     FSLeafQueue q1 = queueManager.getLeafQueue("root.test.childA", false);
 
     assertNotNull("Queue root.test.childA does not exist", q1);
@@ -427,7 +416,8 @@ public class TestQueueManager {
     assertFalse("root.test.childA is not a static queue", q1.isDynamic());
 
     // we submitted an app to the queue
-    notEmptyQueues.add(q1);
+    ApplicationId appId = ApplicationId.newInstance(0, 0);
+    q1.addAssignedApp(appId);
 
     // the next removeEmptyDynamicQueues() call should not modify
     // root.test.childA
@@ -439,6 +429,7 @@ public class TestQueueManager {
 
     // next we remove all queues from the allocation config,
     // this causes all queues to change to dynamic
+    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
     for (Set<String> queueNames : allocConf.configuredQueues.values()) {
       queueManager.setQueuesToDynamic(queueNames);
       queueNames.clear();
@@ -451,7 +442,7 @@ public class TestQueueManager {
 
     // application finished - the queue does not have runnable app
     // the next removeEmptyDynamicQueues() call should remove the queues
-    notEmptyQueues.remove(q1);
+    q1.removeAssignedApp(appId);
 
     queueManager.removePendingIncompatibleQueues();
     queueManager.removeEmptyDynamicQueues();
@@ -463,11 +454,11 @@ public class TestQueueManager {
     assertNull("Queue root.test was not deleted", p1);
   }
 
+  /**
+   * Testing the removal of a dynamic parent queue without a child.
+   */
   @Test
   public void testRemovalOfChildlessParentQueue() {
-    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
-    queueManager.updateAllocationConfiguration(allocConf);
-
     FSParentQueue q1 = queueManager.getParentQueue("root.test.childB", false);
 
     assertNotNull("Queue root.test.childB was not created", q1);
@@ -482,6 +473,7 @@ public class TestQueueManager {
     assertNotNull("Queue root.test.childB was deleted", q1);
 
     // next we remove root.test.childB from the allocation config
+    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
     allocConf.configuredQueues.get(FSQueueType.PARENT)
         .remove("root.test.childB");
     queueManager.updateAllocationConfiguration(allocConf);
@@ -495,11 +487,12 @@ public class TestQueueManager {
     assertNull("Queue root.leaf1 was not deleted", q1);
   }
 
+  /**
+   * Test if a queue is correctly changed from dynamic to static and vice
+   * versa.
+   */
   @Test
   public void testQueueTypeChange() {
-    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
-    queueManager.updateAllocationConfiguration(allocConf);
-
     FSQueue q1 = queueManager.getLeafQueue("root.parent1.leaf1", true);
     assertNotNull("Queue root.parent1.leaf1 was not created", q1);
     assertEquals("createQueue() returned wrong queue",
@@ -511,6 +504,7 @@ public class TestQueueManager {
     assertTrue("root.parent1 is not a dynamic queue", p1.isDynamic());
 
     // adding root.parent1.leaf1 and root.parent1 to the allocation config
+    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
     allocConf.configuredQueues.get(FSQueueType.PARENT).add("root.parent1");
     allocConf.configuredQueues.get(FSQueueType.LEAF)
         .add("root.parent1.leaf1");
@@ -539,30 +533,27 @@ public class TestQueueManager {
     assertTrue("root.parent1 is not a dynamic queue", p1.isDynamic());
   }
 
+  /**
+   * Test that an assigned app flags a queue as being not empty.
+   */
   @Test
-  public void testApplicationAssignmentPreventsRemovalOfDynamicQueue()
-      throws Exception {
-    AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
-    queueManager = new QueueManager(scheduler);
-    queueManager.initialize(conf);
-    queueManager.updateAllocationConfiguration(allocConf);
-
+  public void testApplicationAssignmentPreventsRemovalOfDynamicQueue() {
     FSLeafQueue q = queueManager.getLeafQueue("root.leaf1", true);
     assertNotNull("root.leaf1 does not exist", q);
-    assertTrue("root.leaf1 is not empty", queueManager.isEmpty(q));
+    assertTrue("root.leaf1 is not empty", q.isEmpty());
 
     // assigning an application (without an appAttempt so far) to the queue
     // removeEmptyDynamicQueues() should not remove the queue
     ApplicationId applicationId = ApplicationId.newInstance(1L, 0);
     q.addAssignedApp(applicationId);
     q = queueManager.getLeafQueue("root.leaf1", false);
-    assertFalse("root.leaf1 is empty", queueManager.isEmpty(q));
+    assertFalse("root.leaf1 is empty", q.isEmpty());
 
     queueManager.removePendingIncompatibleQueues();
     queueManager.removeEmptyDynamicQueues();
     q = queueManager.getLeafQueue("root.leaf1", false);
     assertNotNull("root.leaf1 has been removed", q);
-    assertFalse("root.leaf1 is empty", queueManager.isEmpty(q));
+    assertFalse("root.leaf1 is empty", q.isEmpty());
 
     ApplicationAttemptId applicationAttemptId =
         ApplicationAttemptId.newInstance(applicationId, 0);
@@ -578,12 +569,12 @@ public class TestQueueManager {
     queueManager.removeEmptyDynamicQueues();
     q = queueManager.getLeafQueue("root.leaf1", false);
     assertNotNull("root.leaf1 has been removed", q);
-    assertFalse("root.leaf1 is empty", queueManager.isEmpty(q));
+    assertFalse("root.leaf1 is empty", q.isEmpty());
 
     // the appAttempt finished, the queue should be empty
     q.removeApp(appAttempt);
     q = queueManager.getLeafQueue("root.leaf1", false);
-    assertTrue("root.leaf1 is not empty", queueManager.isEmpty(q));
+    assertTrue("root.leaf1 is not empty", q.isEmpty());
 
     // removeEmptyDynamicQueues() should remove the queue
     queueManager.removePendingIncompatibleQueues();
@@ -592,9 +583,12 @@ public class TestQueueManager {
     assertNull("root.leaf1 has not been removed", q);
   }
 
+  /**
+   * Test changing a leaf into a parent queue and auto create of the leaf queue
+   * under the newly created parent.
+   */
   @Test
-  public void testRemovalOfIncompatibleNonEmptyQueue()
-      throws Exception {
+  public void testRemovalOfIncompatibleNonEmptyQueue() {
     AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
     allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.a");
     scheduler.allocConf = allocConf;
@@ -602,13 +596,12 @@ public class TestQueueManager {
 
     FSLeafQueue q = queueManager.getLeafQueue("root.a", true);
     assertNotNull("root.a does not exist", q);
-    assertTrue("root.a is not empty", queueManager.isEmpty(q));
+    assertTrue("root.a is not empty", q.isEmpty());
 
     // we start to run an application on root.a
-    notEmptyQueues.add(q);
-    q = queueManager.getLeafQueue("root.a", false);
-    assertNotNull("root.a does not exist", q);
-    assertFalse("root.a is empty", queueManager.isEmpty(q));
+    ApplicationId appId = ApplicationId.newInstance(0, 0);
+    q.addAssignedApp(appId);
+    assertFalse("root.a is empty", q.isEmpty());
 
     // root.a should not be removed by removeEmptyDynamicQueues or by
     // removePendingIncompatibleQueues
@@ -626,20 +619,93 @@ public class TestQueueManager {
     // since root.a has running applications, it should be still a leaf queue
     q = queueManager.getLeafQueue("root.a", false);
     assertNotNull("root.a has been removed", q);
-    assertFalse("root.a is empty", queueManager.isEmpty(q));
+    assertFalse("root.a is empty", q.isEmpty());
 
     // removePendingIncompatibleQueues should still keep root.a as a leaf queue
     queueManager.removePendingIncompatibleQueues();
     q = queueManager.getLeafQueue("root.a", false);
     assertNotNull("root.a has been removed", q);
-    assertFalse("root.a is empty", queueManager.isEmpty(q));
+    assertFalse("root.a is empty", q.isEmpty());
 
-    // when the application finishes, root.a should be a parent queue
-    notEmptyQueues.clear();
+    // when the application finishes, root.a will become a parent queue on next
+    // config cleanup. The leaf queue will be created below it on reload of the
+    // config.
+    q.removeAssignedApp(appId);
     queueManager.removePendingIncompatibleQueues();
     queueManager.removeEmptyDynamicQueues();
     FSParentQueue p = queueManager.getParentQueue("root.a", false);
     assertNotNull("root.a does not exist", p);
+    queueManager.updateAllocationConfiguration(allocConf);
+    q = queueManager.getLeafQueue("root.a.b", false);
+    assertNotNull("root.a.b was not created", q);
+  }
+
+  /**
+   * Test to check multiple levels of parent queue removal.
+   */
+  @Test
+  public void testRemoveDeepHierarchy() {
+    // create a deeper queue hierarchy
+    FSLeafQueue q = queueManager.getLeafQueue("root.p1.p2.p3.leaf", true);
+    assertNotNull("root.p1.p2.p3.leaf does not exist", q);
+    assertTrue("root.p1.p2.p3.leaf is not empty", q.isEmpty());
+
+    // Add an application to make the queue not empty
+    ApplicationId appId = ApplicationId.newInstance(0, 0);
+    q.addAssignedApp(appId);
+
+    // remove should not remove the queues
+    queueManager.removePendingIncompatibleQueues();
+    queueManager.removeEmptyDynamicQueues();
+    q = queueManager.getLeafQueue("root.p1.p2.p3.leaf", false);
+    assertNotNull("root.p1.p2.p3.leaf does not exist", q);
+
+    // Remove the application
+    q.removeAssignedApp(appId);
+    // Cleanup should remove the whole tree
+    queueManager.removeEmptyDynamicQueues();
+    q = queueManager.getLeafQueue("root.p1.p2.p3.leaf", false);
+    assertNull("root.p1.p2.p3.leaf does exist", q);
+    FSParentQueue p = queueManager.getParentQueue("root.p1", false);
+    assertNull("root.p1 does exist", p);
   }
 
+  /**
+   * Test the removal of queues when a parent is shared in the tree. First
+   * remove one branch then the second branch of the tree.
+   */
+  @Test
+  public void testRemoveSplitHierarchy()  {
+    // create a deeper queue hierarchy
+    FSLeafQueue leaf1 = queueManager.getLeafQueue("root.p1.p2-1.leaf-1", true);
+    assertNotNull("root.p1.p2-1.leaf-1 does not exist", leaf1);
+    assertTrue("root.p1.p2-1.leaf1 is not empty", leaf1.isEmpty());
+    // Create a split below the first level
+    FSLeafQueue leaf2 = queueManager.getLeafQueue("root.p1.p2-2.leaf-2", true);
+    assertNotNull("root.p1.p2-2.leaf2 does not exist", leaf2);
+    assertTrue("root.p1.p2-2.leaf2 is not empty", leaf2.isEmpty());
+
+    // Add an application to make one of the queues not empty
+    ApplicationId appId = ApplicationId.newInstance(0, 0);
+    leaf1.addAssignedApp(appId);
+
+    // remove should not remove the non empty split
+    queueManager.removePendingIncompatibleQueues();
+    queueManager.removeEmptyDynamicQueues();
+    leaf1 = queueManager.getLeafQueue("root.p1.p2-1.leaf-1", false);
+    assertNotNull("root.p1.p2-1.leaf-1 does not exist", leaf1);
+    leaf2 = queueManager.getLeafQueue("root.p1.p2-2.leaf-2", false);
+    assertNull("root.p1.p2-2.leaf2 does exist", leaf2);
+    FSParentQueue p = queueManager.getParentQueue("root.p1.p2-2", false);
+    assertNull("root.p1.p2-2 does exist", p);
+
+    // Remove the application
+    leaf1.removeAssignedApp(appId);
+    // Cleanup should remove the whole tree
+    queueManager.removeEmptyDynamicQueues();
+    leaf1 = queueManager.getLeafQueue("root.p1.p2-1.leaf-1", false);
+    assertNull("root.p1.p2-1.leaf-1 does exist", leaf1);
+    p = queueManager.getParentQueue("root.p1", false);
+    assertNull("root.p1 does exist", p);
+  }
 }