Browse Source

YARN-6741. Deleting all children of a Parent Queue on refresh throws exception. Contributed by Naganarasimha G R.

bibinchundatt 7 years ago
parent
commit
d8f74c3964

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java

@@ -327,6 +327,10 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
                 + "it is not yet in stopped state. Current State : "
                 + "it is not yet in stopped state. Current State : "
                 + oldQueue.getState());
                 + oldQueue.getState());
           }
           }
+        } else if (oldQueue instanceof ParentQueue
+            && newQueue instanceof LeafQueue) {
+          LOG.info("Converting the parent queue: " + oldQueue.getQueuePath()
+              + " to leaf queue.");
         }
         }
       }
       }
     }
     }

+ 20 - 19
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java

@@ -18,6 +18,14 @@
 
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
@@ -34,7 +42,6 @@ import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.AccessType;
 import org.apache.hadoop.yarn.security.AccessType;
@@ -45,7 +52,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerStat
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
@@ -62,14 +68,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.Placeme
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
 @Private
 @Private
 @Evolving
 @Evolving
 public class ParentQueue extends AbstractCSQueue {
 public class ParentQueue extends AbstractCSQueue {
@@ -315,18 +313,21 @@ public class ParentQueue extends AbstractCSQueue {
 
 
         // Check if the child-queue already exists
         // Check if the child-queue already exists
         if (childQueue != null) {
         if (childQueue != null) {
-          // Check if the child-queue has been converted into parent queue.
-          // The CS has already checked to ensure that this child-queue is in
-          // STOPPED state.
-          if (childQueue instanceof LeafQueue
-              && newChildQueue instanceof ParentQueue) {
-            // We would convert this LeafQueue to ParentQueue, consider this
-            // as the combination of DELETE then ADD.
+          // Check if the child-queue has been converted into parent queue or
+          // parent Queue has been converted to child queue. The CS has already
+          // checked to ensure that this child-queue is in STOPPED state if
+          // Child queue has been converted to ParentQueue.
+          if ((childQueue instanceof LeafQueue
+              && newChildQueue instanceof ParentQueue)
+              || (childQueue instanceof ParentQueue
+                  && newChildQueue instanceof LeafQueue)) {
+            // We would convert this LeafQueue to ParentQueue, or vice versa.
+            // consider this as the combination of DELETE then ADD.
             newChildQueue.setParent(this);
             newChildQueue.setParent(this);
             currentChildQueues.put(newChildQueueName, newChildQueue);
             currentChildQueues.put(newChildQueueName, newChildQueue);
             // inform CapacitySchedulerQueueManager
             // inform CapacitySchedulerQueueManager
-            CapacitySchedulerQueueManager queueManager = this.csContext
-                .getCapacitySchedulerQueueManager();
+            CapacitySchedulerQueueManager queueManager =
+                this.csContext.getCapacitySchedulerQueueManager();
             queueManager.addQueue(newChildQueueName, newChildQueue);
             queueManager.addQueue(newChildQueueName, newChildQueue);
             continue;
             continue;
           }
           }

+ 113 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assert.fail;
@@ -42,7 +43,6 @@ import java.util.Set;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.CyclicBarrier;
 
 
-import com.google.common.base.Supplier;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configuration;
@@ -167,6 +167,7 @@ import org.junit.Before;
 import org.junit.Test;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.Mockito;
 
 
+import com.google.common.base.Supplier;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import com.google.common.collect.Sets;
@@ -665,6 +666,36 @@ public class TestCapacityScheduler {
     return conf;
     return conf;
   }
   }
 
 
+  /**
+   * @param conf, to be modified
+   * @return, CS configuration which has deleted all childred of queue(b)
+   *           root
+   *          /     \
+   *        a        b
+   *       / \
+   *      a1  a2
+   */
+  private CapacitySchedulerConfiguration setupQueueConfWithOutChildrenOfB(
+      CapacitySchedulerConfiguration conf) {
+
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] {"a","b"});
+
+    conf.setCapacity(A, A_CAPACITY);
+    conf.setCapacity(B, B_CAPACITY);
+
+    // Define 2nd-level queues
+    conf.setQueues(A, new String[] {"a1","a2"});
+    conf.setCapacity(A1, A1_CAPACITY);
+    conf.setUserLimitFactor(A1, 100.0f);
+    conf.setCapacity(A2, A2_CAPACITY);
+    conf.setUserLimitFactor(A2, 100.0f);
+
+    LOG.info("Setup top-level queues a and b (without children)");
+    return conf;
+  }
+
   /**
   /**
    * @param conf, to be modified
    * @param conf, to be modified
    * @return, CS configuration which has deleted a queue(b1)
    * @return, CS configuration which has deleted a queue(b1)
@@ -4643,6 +4674,10 @@ public class TestCapacityScheduler {
     try {
     try {
       cs.reinitialize(conf, mockContext);
       cs.reinitialize(conf, mockContext);
     } catch (IOException e) {
     } catch (IOException e) {
+      LOG.error(
+          "Expected to NOT throw exception when refresh queue tries to delete"
+              + " a queue WITHOUT running apps",
+          e);
       fail("Expected to NOT throw exception when refresh queue tries to delete"
       fail("Expected to NOT throw exception when refresh queue tries to delete"
           + " a queue WITHOUT running apps");
           + " a queue WITHOUT running apps");
     }
     }
@@ -4712,6 +4747,83 @@ public class TestCapacityScheduler {
     cs.stop();
     cs.stop();
   }
   }
 
 
+  /**
+   * Test for all child queue deletion and thus making parent queue a child.
+   * @throws Exception
+   */
+  @Test
+  public void testRefreshQueuesWithAllChildQueuesDeleted() throws Exception {
+    CapacityScheduler cs = new CapacityScheduler();
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
+        null, new RMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInRM(conf),
+        new ClientToAMTokenSecretManagerInRM(), null);
+    setupQueueConfiguration(conf);
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    cs.init(conf);
+    cs.start();
+    cs.reinitialize(conf, rmContext);
+    checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
+
+    // test delete all leaf queues when there is no application running.
+    Map<String, CSQueue> queues =
+        cs.getCapacitySchedulerQueueManager().getQueues();
+
+    CSQueue bQueue = Mockito.spy((LeafQueue) queues.get("b1"));
+    when(bQueue.getState()).thenReturn(QueueState.RUNNING)
+        .thenReturn(QueueState.STOPPED);
+    queues.put("b1", bQueue);
+
+    bQueue = Mockito.spy((LeafQueue) queues.get("b2"));
+    when(bQueue.getState()).thenReturn(QueueState.STOPPED);
+    queues.put("b2", bQueue);
+
+    bQueue = Mockito.spy((LeafQueue) queues.get("b3"));
+    when(bQueue.getState()).thenReturn(QueueState.STOPPED);
+    queues.put("b3", bQueue);
+
+    conf = new CapacitySchedulerConfiguration();
+    setupQueueConfWithOutChildrenOfB(conf);
+
+    // test convert parent queue to leaf queue(root.b) when there is no
+    // application running.
+    try {
+      cs.reinitialize(conf, mockContext);
+      fail("Expected to throw exception when refresh queue tries to make parent"
+          + " queue a child queue when one of its children is still running.");
+    } catch (IOException e) {
+      //do not do anything, expected exception
+    }
+
+    // test delete leaf queues(root.b.b1,b2,b3) when there is no application
+    // running.
+    try {
+      cs.reinitialize(conf, mockContext);
+    } catch (IOException e) {
+      e.printStackTrace();
+      fail("Expected to NOT throw exception when refresh queue tries to delete"
+          + " all children of a parent queue(without running apps).");
+    }
+    CSQueue rootQueue = cs.getRootQueue();
+    CSQueue queueB = findQueue(rootQueue, B);
+    assertNotNull("Parent Queue B should not be deleted", queueB);
+    Assert.assertTrue("As Queue'B children are not deleted",
+        queueB instanceof LeafQueue);
+
+    String message =
+        "Refresh needs to support delete of all children of Parent queue.";
+    assertNull(message,
+        cs.getCapacitySchedulerQueueManager().getQueues().get("b3"));
+    assertNull(message,
+        cs.getCapacitySchedulerQueueManager().getQueues().get("b1"));
+    assertNull(message,
+        cs.getCapacitySchedulerQueueManager().getQueues().get("b2"));
+
+    cs.stop();
+  }
+
   /**
   /**
    * Test if we can convert a leaf queue to a parent queue
    * Test if we can convert a leaf queue to a parent queue
    * @throws Exception
    * @throws Exception