浏览代码

YARN-5556. CapacityScheduler: Support deleting queues without requiring a RM restart. (Naganarasimha G R via wangda)

Wangda Tan 8 年之前
父节点
当前提交
72054a817d

+ 1 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -90,8 +90,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContai
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
-
-
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
@@ -418,7 +416,7 @@ public class CapacityScheduler extends
       } catch (Throwable t) {
         this.conf = oldConf;
         refreshMaximumAllocation(this.conf.getMaximumAllocation());
-        throw new IOException("Failed to re-init queues", t);
+        throw new IOException("Failed to re-init queues : "+ t.getMessage(), t);
       }
 
       // update lazy preemption

+ 39 - 18
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java

@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
-import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -27,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -34,6 +34,7 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.Permission;
@@ -45,6 +46,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  *
  * Context of the Queues in Capacity Scheduler.
@@ -164,11 +167,11 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
     CSQueue newRoot =  parseQueue(this.csContext, newConf, null,
         CapacitySchedulerConfiguration.ROOT, newQueues, queues, NOOP);
 
-    // Ensure all existing queues are still present
-    validateExistingQueues(queues, newQueues);
+    // Ensure queue hierarchy in the new XML file is proper.
+    validateQueueHierarchy(queues, newQueues);
 
-    // Add new queues
-    addNewQueues(queues, newQueues);
+    // Add new queues and delete old queues only after validation.
+    updateQueues(queues, newQueues);
 
     // Re-configure queues
     root.reinitialize(newRoot, this.csContext.getClusterResource());
@@ -261,13 +264,14 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
   }
 
   /**
-   * Ensure all existing queues are present. Queues cannot be deleted
+   * Ensure all existing queues are present. A queue cannot be deleted unless it
+   * is in the STOPPED state, nor can it be moved from one hierarchy to another.
+   *
    * @param queues existing queues
    * @param newQueues new queues
    */
-  private void validateExistingQueues(
-      Map<String, CSQueue> queues, Map<String, CSQueue> newQueues)
-      throws IOException {
+  private void validateQueueHierarchy(Map<String, CSQueue> queues,
+      Map<String, CSQueue> newQueues) throws IOException {
     // check that all static queues are included in the newQueues list
     for (Map.Entry<String, CSQueue> e : queues.entrySet()) {
       if (!(e.getValue() instanceof ReservationQueue)) {
@@ -275,8 +279,18 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
         CSQueue oldQueue = e.getValue();
         CSQueue newQueue = newQueues.get(queueName);
         if (null == newQueue) {
-          throw new IOException(queueName + " cannot be found during refresh!");
+          // old queue doesn't exist in the new XML
+          if (oldQueue.getState() == QueueState.STOPPED) {
+            LOG.info("Deleting Queue " + queueName + ", as it is not"
+                + " present in the modified capacity configuration xml");
+          } else {
+            throw new IOException(oldQueue.getQueuePath() + " is deleted from"
+                + " the new capacity scheduler configuration, but the"
+                + " queue is not yet in stopped state. "
+                + "Current State : " + oldQueue.getState());
+          }
         } else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) {
+          // Queues cannot be moved from one hierarchy to another
           throw new IOException(queueName + " is moved from:"
               + oldQueue.getQueuePath() + " to:" + newQueue.getQueuePath()
               + " after refresh, which is not allowed.");
@@ -286,18 +300,25 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
   }
 
   /**
-   * Add the new queues (only) to our list of queues...
-   * ... be careful, do not overwrite existing queues.
-   * @param queues the existing queues
-   * @param newQueues the new queues
+   * Updates to our list of queues: Adds the new queues and deletes the removed
+   * ones... be careful, do not overwrite existing queues.
+   *
+   * @param existingQueues the existing queues
+   * @param newQueues the new queues based on new XML
    */
-  private void addNewQueues(
-      Map<String, CSQueue> queues, Map<String, CSQueue> newQueues) {
+  private void updateQueues(Map<String, CSQueue> existingQueues,
+      Map<String, CSQueue> newQueues) {
     for (Map.Entry<String, CSQueue> e : newQueues.entrySet()) {
       String queueName = e.getKey();
       CSQueue queue = e.getValue();
-      if (!queues.containsKey(queueName)) {
-        queues.put(queueName, queue);
+      if (!existingQueues.containsKey(queueName)) {
+        existingQueues.put(queueName, queue);
+      }
+    }
+    for (Map.Entry<String, CSQueue> e : existingQueues.entrySet()) {
+      String queueName = e.getKey();
+      if (!newQueues.containsKey(queueName)) {
+        existingQueues.remove(queueName);
       }
     }
   }

+ 27 - 14
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java

@@ -18,6 +18,18 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,9 +54,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
@@ -56,18 +73,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.Placeme
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-
 @Private
 @Evolving
 public class ParentQueue extends AbstractCSQueue {
@@ -317,6 +322,14 @@ public class ParentQueue extends AbstractCSQueue {
         }
       }
 
+      // remove the deleted queue in the refreshed xml.
+      for (Map.Entry<String, CSQueue> e : currentChildQueues.entrySet()) {
+        String queueName = e.getKey();
+        if (!newChildQueues.containsKey(queueName)) {
+          currentChildQueues.remove(queueName);
+        }
+      }
+
       // Re-sort all queues
       childQueues.clear();
       childQueues.addAll(currentChildQueues.values());

+ 188 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
@@ -115,13 +116,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdate
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.
-    ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-
-import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
@@ -131,6 +128,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.
+    ContainerExpiredSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -144,6 +143,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedule
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerLeafQueueInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList;
+import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
@@ -395,6 +395,16 @@ public class TestCapacityScheduler {
     resourceManager.getResourceScheduler().handle(nodeUpdate);
   }
 
+  /**
+   * @param conf
+   * @return
+   *           root
+   *          /      \
+   *        a         b
+   *       / \     /  |  \
+   *      a1  a2  b1  b2 b3
+   *
+   */
   private CapacitySchedulerConfiguration setupQueueConfiguration(
       CapacitySchedulerConfiguration conf) {
 
@@ -423,6 +433,67 @@ public class TestCapacityScheduler {
     return conf;
   }
 
+  /**
+   * @param conf, to be modified
+   * @return, CS configuration which has deleted a queue(b1)
+   *           root
+   *          /     \
+   *        a        b
+   *       / \       | \
+   *      a1  a2    b2  b3
+   */
+  private CapacitySchedulerConfiguration setupQueueConfigurationWithOutB1(
+      CapacitySchedulerConfiguration conf) {
+
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT,
+        new String[] { "a", "b" });
+
+    conf.setCapacity(A, A_CAPACITY);
+    conf.setCapacity(B, B_CAPACITY);
+
+    // Define 2nd-level queues
+    conf.setQueues(A, new String[] { "a1", "a2" });
+    conf.setCapacity(A1, A1_CAPACITY);
+    conf.setUserLimitFactor(A1, 100.0f);
+    conf.setCapacity(A2, A2_CAPACITY);
+    conf.setUserLimitFactor(A2, 100.0f);
+
+    conf.setQueues(B, new String[] { "b2", "b3" });
+    conf.setCapacity(B2, B2_CAPACITY + B1_CAPACITY); //as B1 is deleted
+    conf.setUserLimitFactor(B2, 100.0f);
+    conf.setCapacity(B3, B3_CAPACITY);
+    conf.setUserLimitFactor(B3, 100.0f);
+
+    LOG.info("Setup top-level queues a and b (without b3)");
+    return conf;
+  }
+
+  /**
+   * @param conf, to be modified
+   * @return, CS configuration which has deleted a
+   *          Parent queue(b)
+   */
+  private CapacitySchedulerConfiguration setupQueueConfigurationWithOutB(
+      CapacitySchedulerConfiguration conf) {
+
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a" });
+
+    conf.setCapacity(A, A_CAPACITY + B_CAPACITY);
+
+    // Define 2nd-level queues
+    conf.setQueues(A, new String[] { "a1", "a2" });
+    conf.setCapacity(A1, A1_CAPACITY);
+    conf.setUserLimitFactor(A1, 100.0f);
+    conf.setCapacity(A2, A2_CAPACITY);
+    conf.setUserLimitFactor(A2, 100.0f);
+
+    LOG.info("Setup top-level queues a");
+    return conf;
+  }
+
+
   private CapacitySchedulerConfiguration setupBlockedQueueConfiguration(
       CapacitySchedulerConfiguration conf) {
 
@@ -3758,4 +3829,117 @@ public class TestCapacityScheduler {
     Assert.assertArrayEquals(new int[][] { { 1, 0, 0 }, { 0, 1, 0 }, { 0, 0, 1 } },
         attemptMetrics.getLocalityStatistics());
   }
+
+  /**
+   * Test for queue deletion.
+   * @throws Exception
+   */
+  @Test
+  public void testRefreshQueuesWithQueueDelete() throws Exception {
+    CapacityScheduler cs = new CapacityScheduler();
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+    RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
+        null, new RMContainerTokenSecretManager(conf),
+        new NMTokenSecretManagerInRM(conf),
+        new ClientToAMTokenSecretManagerInRM(), null);
+    setupQueueConfiguration(conf);
+    cs.setConf(new YarnConfiguration());
+    cs.setRMContext(resourceManager.getRMContext());
+    cs.init(conf);
+    cs.start();
+    cs.reinitialize(conf, rmContext);
+    checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
+
+    // test delete leaf queue when there is application running.
+    Map<String, CSQueue> queues =
+        cs.getCapacitySchedulerQueueManager().getQueues();
+    String b1QTobeDeleted = "b1";
+    LeafQueue csB1Queue = Mockito.spy((LeafQueue) queues.get(b1QTobeDeleted));
+    when(csB1Queue.getState()).thenReturn(QueueState.DRAINING)
+        .thenReturn(QueueState.STOPPED);
+    queues.put(b1QTobeDeleted, csB1Queue);
+    conf = new CapacitySchedulerConfiguration();
+    setupQueueConfigurationWithOutB1(conf);
+    try {
+      cs.reinitialize(conf, mockContext);
+      fail("Expected to throw exception when refresh queue tries to delete a"
+          + " queue with running apps");
+    } catch (IOException e) {
+      // ignore
+    }
+
+    // test delete leaf queue(root.b.b1) when there is no application running.
+    conf = new CapacitySchedulerConfiguration();
+    setupQueueConfigurationWithOutB1(conf);
+    try {
+      cs.reinitialize(conf, mockContext);
+    } catch (IOException e) {
+      fail("Expected to NOT throw exception when refresh queue tries to delete"
+          + " a queue WITHOUT running apps");
+    }
+    CSQueue rootQueue = cs.getRootQueue();
+    CSQueue queueB = findQueue(rootQueue, B);
+    CSQueue queueB3 = findQueue(queueB, B1);
+    assertNull("Refresh needs to support delete of leaf queue ", queueB3);
+
+    // reset back to default configuration for testing parent queue delete
+    conf = new CapacitySchedulerConfiguration();
+    setupQueueConfiguration(conf);
+    cs.reinitialize(conf, rmContext);
+    checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
+
+    // set the configurations such that it fails once but should be successful
+    // next time
+    queues = cs.getCapacitySchedulerQueueManager().getQueues();
+    CSQueue bQueue = Mockito.spy((ParentQueue) queues.get("b"));
+    when(bQueue.getState()).thenReturn(QueueState.DRAINING)
+        .thenReturn(QueueState.STOPPED);
+    queues.put("b", bQueue);
+
+    bQueue = Mockito.spy((LeafQueue) queues.get("b1"));
+    when(bQueue.getState()).thenReturn(QueueState.STOPPED);
+    queues.put("b1", bQueue);
+
+    bQueue = Mockito.spy((LeafQueue) queues.get("b2"));
+    when(bQueue.getState()).thenReturn(QueueState.STOPPED);
+    queues.put("b2", bQueue);
+
+    bQueue = Mockito.spy((LeafQueue) queues.get("b3"));
+    when(bQueue.getState()).thenReturn(QueueState.STOPPED);
+    queues.put("b3", bQueue);
+
+    // test delete Parent queue when there is application running.
+    conf = new CapacitySchedulerConfiguration();
+    setupQueueConfigurationWithOutB(conf);
+    try {
+      cs.reinitialize(conf, mockContext);
+      fail("Expected to throw exception when refresh queue tries to delete a"
+          + " parent queue with running apps in children queue");
+    } catch (IOException e) {
+      // ignore
+    }
+
+    // test delete Parent queue when there is no application running.
+    conf = new CapacitySchedulerConfiguration();
+    setupQueueConfigurationWithOutB(conf);
+    try {
+      cs.reinitialize(conf, mockContext);
+    } catch (IOException e) {
+      fail("Expected to not throw exception when refresh queue tries to delete"
+          + " a queue without running apps");
+    }
+    rootQueue = cs.getRootQueue();
+    queueB = findQueue(rootQueue, B);
+    String message =
+        "Refresh needs to support delete of Parent queue and its children.";
+    assertNull(message, queueB);
+    assertNull(message,
+        cs.getCapacitySchedulerQueueManager().getQueues().get("b"));
+    assertNull(message,
+        cs.getCapacitySchedulerQueueManager().getQueues().get("b1"));
+    assertNull(message,
+        cs.getCapacitySchedulerQueueManager().getQueues().get("b2"));
+
+    cs.stop();
+  }
 }