
YARN-4270. Limit application resource reservation on nodes for non-node/rack specific requests (asuresh)

Arun Suresh 9 years ago
parent
commit
7e2837f830

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -967,6 +967,9 @@ Release 2.8.0 - UNRELEASED
     YARN-4155. TestLogAggregationService.testLogAggregationServiceWithInterval failing
     (Bibin A Chundatt via stevel)
 
+    YARN-4270. Limit application resource reservation on nodes for non-node/rack
+    specific requests (asuresh)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES
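
The rule this patch enforces: for rack-local and off-switch requests, an application may hold reservations on at most ceil(totalAvailNodes * reservableNodesRatio) nodes, where totalAvailNodes is the cluster size for off-switch requests and the rack size for rack-local ones. Node-local requests are exempt, since such a request can only ever be satisfied on the one node it names. A minimal sketch of the cap arithmetic, mirroring FSAppAttempt#reservationExceedsThreshold below (the class and method names here are illustrative, not part of the patch):

```java
public final class ReservationCapSketch {

  /** True if the app already holds as many reservations as the cap allows. */
  static boolean exceedsCap(int existingReservations, int totalAvailNodes,
      float reservableNodesRatio) {
    // ceil() guarantees at least one reservable node even for tiny ratios.
    int numAllowedReservations =
        (int) Math.ceil(totalAvailNodes * reservableNodesRatio);
    return existingReservations >= numAllowedReservations;
  }

  public static void main(String[] args) {
    // With the default ratio of 0.05, a 100-node cluster caps an app's
    // off-switch reservations at ceil(100 * 0.05) = 5 nodes.
    System.out.println(exceedsCap(5, 100, 0.05f)); // true
    // The tests below use 3 nodes and a 0.50 ratio: cap = ceil(1.5) = 2.
    System.out.println(exceedsCap(1, 3, 0.50f));   // false, a 2nd is allowed
  }
}
```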

+ 89 - 14
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java

@@ -19,10 +19,12 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 
 import java.io.Serializable;
+import java.text.DecimalFormat;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -52,6 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -78,6 +81,10 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   private RMContainerComparator comparator = new RMContainerComparator();
   private final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();
 
+  // Used to record node reservations made by an app.
+  // Key = rack name, value = set of node names the app has reserved on that rack
+  private Map<String, Set<String>> reservations = new HashMap<>();
+
   /**
    * Delay scheduling: We often want to prioritize scheduling of node-local
    * containers over rack-local or off-switch containers. To achieve this
@@ -446,22 +453,53 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
   * in {@link FSSchedulerNode}.
    */
   private void reserve(Priority priority, FSSchedulerNode node,
-      Container container, boolean alreadyReserved) {
-    LOG.info("Making reservation: node=" + node.getNodeName() +
-        " app_id=" + getApplicationId());
-
-    if (!alreadyReserved) {
-      getMetrics().reserveResource(getUser(), container.getResource());
-      RMContainer rmContainer =
-          super.reserve(node, priority, null, container);
-      node.reserveResource(this, priority, rmContainer);
-    } else {
-      RMContainer rmContainer = node.getReservedContainer();
-      super.reserve(node, priority, rmContainer, container);
-      node.reserveResource(this, priority, rmContainer);
+      Container container, NodeType type, boolean alreadyReserved) {
+
+    if (!reservationExceedsThreshold(node, type)) {
+      LOG.info("Making reservation: node=" + node.getNodeName() +
+              " app_id=" + getApplicationId());
+      if (!alreadyReserved) {
+        getMetrics().reserveResource(getUser(), container.getResource());
+        RMContainer rmContainer =
+                super.reserve(node, priority, null, container);
+        node.reserveResource(this, priority, rmContainer);
+        setReservation(node);
+      } else {
+        RMContainer rmContainer = node.getReservedContainer();
+        super.reserve(node, priority, rmContainer, container);
+        node.reserveResource(this, priority, rmContainer);
+        setReservation(node);
+      }
     }
   }
 
+  private boolean reservationExceedsThreshold(FSSchedulerNode node,
+                                                 NodeType type) {
+    // Only if not node-local
+    if (type != NodeType.NODE_LOCAL) {
+      int existingReservations = getNumReservations(node.getRackName(),
+              type == NodeType.OFF_SWITCH);
+      int totalAvailNodes =
+              (type == NodeType.OFF_SWITCH) ? scheduler.getNumClusterNodes() :
+                      scheduler.getNumNodesInRack(node.getRackName());
+      int numAllowedReservations =
+              (int)Math.ceil(
+                      totalAvailNodes * scheduler.getReservableNodesRatio());
+      if (existingReservations >= numAllowedReservations) {
+        DecimalFormat df = new DecimalFormat();
+        df.setMaximumFractionDigits(2);
+        LOG.info("Reservation Exceeds Allowed number of nodes:" +
+                " app_id=" + getApplicationId() +
+                " existingReservations=" + existingReservations +
+                " totalAvailableNodes=" + totalAvailNodes +
+                " reservableNodesRatio=" + df.format(
+                                        scheduler.getReservableNodesRatio()) +
+                " numAllowedReservations=" + numAllowedReservations);
+        return true;
+      }
+    }
+    return false;
+  }
+
   /**
    * Remove the reservation on {@code node} at the given {@link Priority}.
    * This dispatches SchedulerNode handlers as well.
@@ -470,10 +508,47 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     RMContainer rmContainer = node.getReservedContainer();
     unreserveInternal(priority, node);
     node.unreserveResource(this);
+    clearReservation(node);
     getMetrics().unreserveResource(
         getUser(), rmContainer.getContainer().getResource());
   }
 
+  private synchronized void setReservation(SchedulerNode node) {
+    String rackName = node.getRackName() == null ? "NULL" : node.getRackName();
+    Set<String> rackReservations = reservations.get(rackName);
+    if (rackReservations == null) {
+      rackReservations = new HashSet<>();
+      reservations.put(rackName, rackReservations);
+    }
+    rackReservations.add(node.getNodeName());
+  }
+
+  private synchronized void clearReservation(SchedulerNode node) {
+    String rackName = node.getRackName() == null ? "NULL" : node.getRackName();
+    Set<String> rackReservations = reservations.get(rackName);
+    if (rackReservations != null) {
+      rackReservations.remove(node.getNodeName());
+    }
+  }
+
+  int getNumReservations(String rackName, boolean isAny) {
+    int counter = 0;
+    if (isAny) {
+      for (Set<String> nodes : reservations.values()) {
+        if (nodes != null) {
+          counter += nodes.size();
+        }
+      }
+    } else {
+      Set<String> nodes = reservations.get(
+              rackName == null ? "NULL" : rackName);
+      if (nodes != null) {
+        counter += nodes.size();
+      }
+    }
+    return counter;
+  }
+
   /**
    * Assign a container to this node to facilitate {@code request}. If node does
    * not have enough memory, create a reservation. This is called once we are
@@ -545,7 +620,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
 
     if (isReservable(container)) {
       // The desired container won't fit here, so reserve
-      reserve(request.getPriority(), node, container, reserved);
+      reserve(request.getPriority(), node, container, type, reserved);
 
       return FairScheduler.CONTAINER_RESERVED;
     } else {
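
The bookkeeping added above is a per-app map from rack name to the set of node names the app has reserved on that rack; an off-switch ("ANY") request is checked against the total across all racks, a rack-local request only against its own rack. A self-contained sketch of that ledger under the same null-rack convention (class and method names are illustrative):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class ReservationLedgerSketch {
  // Key = rack name ("NULL" when unknown), value = reserved node names.
  private final Map<String, Set<String>> reservations = new HashMap<>();

  synchronized void reserve(String rackName, String nodeName) {
    reservations.computeIfAbsent(rackName == null ? "NULL" : rackName,
        r -> new HashSet<>()).add(nodeName);
  }

  synchronized void unreserve(String rackName, String nodeName) {
    Set<String> nodes = reservations.get(rackName == null ? "NULL" : rackName);
    if (nodes != null) {
      nodes.remove(nodeName);
    }
  }

  synchronized int count(String rackName, boolean isAny) {
    if (isAny) { // off-switch: reservations on every rack count
      int total = 0;
      for (Set<String> nodes : reservations.values()) {
        total += nodes.size();
      }
      return total;
    }
    Set<String> nodes = reservations.get(rackName == null ? "NULL" : rackName);
    return nodes == null ? 0 : nodes.size();
  }
}
```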

+ 35 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -26,6 +26,7 @@ import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -180,7 +181,13 @@ public class FairScheduler extends
   
   // Containers whose AMs have been warned that they will be preempted soon.
   private List<RMContainer> warnedContainers = new ArrayList<RMContainer>();
-  
+
+  private float reservableNodesRatio; // fraction of available nodes
+                                      // an app may hold reservations on
+
+  // Number of nodes per rack
+  private Map<String, Integer> nodesPerRack = new ConcurrentHashMap<>();
+
   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
   protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
   protected boolean continuousSchedulingEnabled; // Continuous Scheduling enabled or not
@@ -263,6 +270,14 @@ public class FairScheduler extends
     return conf;
   }
 
+  public int getNumNodesInRack(String rackName) {
+    String rName = rackName == null ? "NULL" : rackName;
+    if (nodesPerRack.containsKey(rName)) {
+      return nodesPerRack.get(rName);
+    }
+    return 0;
+  }
+
   public QueueManager getQueueManager() {
     return queueMgr;
   }
@@ -870,6 +885,12 @@ public class FairScheduler extends
       RMNode node) {
     FSSchedulerNode schedulerNode = new FSSchedulerNode(node, usePortForNodeName);
     nodes.put(node.getNodeID(), schedulerNode);
+    String rackName = node.getRackName() == null ? "NULL" : node.getRackName();
+    if (nodesPerRack.containsKey(rackName)) {
+      nodesPerRack.put(rackName, nodesPerRack.get(rackName) + 1);
+    } else {
+      nodesPerRack.put(rackName, 1);
+    }
     Resources.addTo(clusterResource, node.getTotalCapability());
     updateMaximumAllocation(schedulerNode, true);
 
@@ -916,6 +937,14 @@ public class FairScheduler extends
     }
 
     nodes.remove(rmNode.getNodeID());
+    String rackName = node.getRackName() == null ? "NULL" : node.getRackName();
+    if (nodesPerRack.containsKey(rackName)
+            && (nodesPerRack.get(rackName) > 0)) {
+      nodesPerRack.put(rackName, nodesPerRack.get(rackName) - 1);
+    } else {
+      LOG.error("Node [" + rmNode.getNodeAddress() + "] being removed from" +
+              " unknown rack [" + rackName + "] !!");
+    }
     queueMgr.getRootQueue().setSteadyFairShare(clusterResource);
     queueMgr.getRootQueue().recomputeSteadyShares();
     updateMaximumAllocation(node, false);
@@ -1367,6 +1396,7 @@ public class FairScheduler extends
       preemptionInterval = this.conf.getPreemptionInterval();
       waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
       usePortForNodeName = this.conf.getUsePortForNodeName();
+      reservableNodesRatio = this.conf.getReservableNodes();
 
       updateInterval = this.conf.getUpdateInterval();
       if (updateInterval < 0) {
@@ -1747,4 +1777,8 @@ public class FairScheduler extends
       SchedulerApplicationAttempt attempt) {
     // TODO Auto-generated method stub    
   }
+
+  public float getReservableNodesRatio() {
+    return reservableNodesRatio;
+  }
 }
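
FairScheduler now also tracks how many nodes each rack contains, so the app-side check can size its denominator. The containsKey/get/put pairs above are not atomic on their own; they rely on node-added and node-removed events being handled serially, as they are by the RM's single-threaded scheduler event dispatcher. A sketch of the same counting written with atomic map operations instead, under the assumption of concurrent callers (names are illustrative):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RackCountSketch {
  private final Map<String, Integer> nodesPerRack = new ConcurrentHashMap<>();

  void nodeAdded(String rackName) {
    // merge() increments atomically, inserting 1 for a new rack.
    nodesPerRack.merge(rackName == null ? "NULL" : rackName, 1, Integer::sum);
  }

  void nodeRemoved(String rackName) {
    // Decrement atomically; drop the entry once the count reaches zero.
    nodesPerRack.computeIfPresent(rackName == null ? "NULL" : rackName,
        (rack, n) -> n > 1 ? n - 1 : null);
  }

  int numNodesInRack(String rackName) {
    return nodesPerRack.getOrDefault(rackName == null ? "NULL" : rackName, 0);
  }
}
```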

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java

@@ -137,6 +137,11 @@ public class FairSchedulerConfiguration extends Configuration {
       CONF_PREFIX + "update-interval-ms";
   public static final int DEFAULT_UPDATE_INTERVAL_MS = 500;
 
+  /** Ratio of nodes available for an app to make a reservation on. */
+  public static final String RESERVABLE_NODES =
+          CONF_PREFIX + "reservable-nodes";
+  public static final float RESERVABLE_NODES_DEFAULT = 0.05f;
+
   public FairSchedulerConfiguration() {
     super();
   }
@@ -247,6 +252,10 @@ public class FairSchedulerConfiguration extends Configuration {
         YarnConfiguration.DEFAULT_RM_SCHEDULER_USE_PORT_FOR_NODE_NAME);
   }
 
+  public float getReservableNodes() {
+    return getFloat(RESERVABLE_NODES, RESERVABLE_NODES_DEFAULT);
+  }
+
   /**
    * Parses a resource config value of a form like "1024", "1024 mb",
    * or "1024 mb, 3 vcores". If no units are given, megabytes are assumed.

+ 192 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java

@@ -760,6 +760,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
     // Now queue 2 requests likewise
     ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user1", 1);
+
     scheduler.update();
     scheduler.handle(updateEvent);
 
@@ -789,6 +790,197 @@ public class TestFairScheduler extends FairSchedulerTestBase {
 
   }
 
+  @Test (timeout = 5000)
+  public void testOffSwitchAppReservationThreshold() throws Exception {
+    conf.setFloat(FairSchedulerConfiguration.RESERVABLE_NODES, 0.50f);
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Add three nodes
+    RMNode node1 =
+            MockNodes
+                    .newNodeInfo(1, Resources.createResource(3072), 1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    RMNode node2 =
+            MockNodes
+                    .newNodeInfo(1, Resources.createResource(3072), 1, "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+
+    RMNode node3 =
+            MockNodes
+                    .newNodeInfo(1, Resources.createResource(3072), 1, "127.0.0.3");
+    NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
+    scheduler.handle(nodeEvent3);
+
+
+    // Ensure capacity on all nodes is allocated
+    createSchedulingRequest(2048, "queue1", "user1", 1);
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node1));
+    createSchedulingRequest(2048, "queue1", "user1", 1);
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node2));
+    createSchedulingRequest(2048, "queue1", "user1", 1);
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node3));
+
+    // Verify capacity allocation
+    assertEquals(6144, scheduler.getQueueManager().getQueue("queue1").
+            getResourceUsage().getMemory());
+
+    // Create new app with a resource request that can be satisfied by any
+    // node, but must be reserved since all nodes are already fully allocated
+    ApplicationAttemptId attId = createSchedulingRequest(2048, "queue1", "user1", 1);
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node1));
+
+    assertEquals(1,
+            scheduler.getSchedulerApp(attId).getNumReservations(null, true));
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node2));
+    assertEquals(2,
+            scheduler.getSchedulerApp(attId).getNumReservations(null, true));
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node3));
+
+    // No new reservations should be made since the threshold has been reached
+    assertEquals(2,
+            scheduler.getSchedulerApp(attId).getNumReservations(null, true));
+
+    // Add 1 more node
+    RMNode node4 =
+            MockNodes
+                    .newNodeInfo(1, Resources.createResource(3072), 1, "127.0.0.4");
+    NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4);
+    scheduler.handle(nodeEvent4);
+
+    // New node satisfies resource request
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node4));
+    assertEquals(8192, scheduler.getQueueManager().getQueue("queue1").
+            getResourceUsage().getMemory());
+
+    scheduler.handle(new NodeUpdateSchedulerEvent(node1));
+    scheduler.handle(new NodeUpdateSchedulerEvent(node2));
+    scheduler.handle(new NodeUpdateSchedulerEvent(node3));
+    scheduler.update();
+
+    // Verify that the app's reservations have been released
+    assertEquals(0,
+            scheduler.getSchedulerApp(attId).getNumReservations(null, true));
+  }
+
+  @Test (timeout = 5000)
+  public void testRackLocalAppReservationThreshold() throws Exception {
+    conf.setFloat(FairSchedulerConfiguration.RESERVABLE_NODES, 0.50f);
+    scheduler.init(conf);
+    scheduler.start();
+    scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+    // Add four nodes
+    RMNode node1 =
+            MockNodes
+                    .newNodeInfo(1, Resources.createResource(3072), 1, "127.0.0.1");
+    NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+    scheduler.handle(nodeEvent1);
+
+    // These three are on a different rack
+    RMNode node2 =
+            MockNodes
+                    .newNodeInfo(2, Resources.createResource(3072), 1, "127.0.0.2");
+    NodeAddedSchedulerEvent nodeEvent2 = new NodeAddedSchedulerEvent(node2);
+    scheduler.handle(nodeEvent2);
+
+    RMNode node3 =
+            MockNodes
+                    .newNodeInfo(2, Resources.createResource(3072), 1, "127.0.0.3");
+    NodeAddedSchedulerEvent nodeEvent3 = new NodeAddedSchedulerEvent(node3);
+    scheduler.handle(nodeEvent3);
+
+    RMNode node4 =
+            MockNodes
+                    .newNodeInfo(2, Resources.createResource(3072), 1, "127.0.0.4");
+    NodeAddedSchedulerEvent nodeEvent4 = new NodeAddedSchedulerEvent(node4);
+    scheduler.handle(nodeEvent4);
+
+    // Ensure capacity on all nodes is allocated
+    createSchedulingRequest(2048, "queue1", "user1", 1);
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node1));
+    createSchedulingRequest(2048, "queue1", "user1", 1);
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node2));
+    createSchedulingRequest(2048, "queue1", "user1", 1);
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node3));
+    createSchedulingRequest(2048, "queue1", "user1", 1);
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node4));
+
+    // Verify capacity allocation
+    assertEquals(8192, scheduler.getQueueManager().getQueue("queue1").
+            getResourceUsage().getMemory());
+
+    // Create new app with a rack-level resource request that cannot be
+    // satisfied immediately, since all nodes are already fully allocated
+    ApplicationAttemptId attemptId =
+            createAppAttemptId(this.APP_ID++, this.ATTEMPT_ID++);
+    createMockRMApp(attemptId);
+
+    scheduler.addApplication(attemptId.getApplicationId(), "queue1", "user1",
+            false);
+    scheduler.addApplicationAttempt(attemptId, false, false);
+    List<ResourceRequest> asks = new ArrayList<ResourceRequest>();
+    asks.add(createResourceRequest(2048, node2.getRackName(), 1, 1, false));
+
+    scheduler.allocate(attemptId, asks, new ArrayList<ContainerId>(), null,
+            null, null, null);
+
+    ApplicationAttemptId attId = createSchedulingRequest(2048, "queue1", "user1", 1);
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node1));
+
+    assertEquals(1,
+            scheduler.getSchedulerApp(attId).getNumReservations(null, true));
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node2));
+    assertEquals(2,
+            scheduler.getSchedulerApp(attId).getNumReservations(null, true));
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node3));
+
+    // No new reservations should be made since the threshold has been reached
+    assertEquals(2,
+            scheduler.getSchedulerApp(attId).getNumReservations(null, true));
+
+    // Add 1 more node
+    RMNode node5 =
+            MockNodes
+                    .newNodeInfo(2, Resources.createResource(3072), 1, "127.0.0.5");
+    NodeAddedSchedulerEvent nodeEvent5 = new NodeAddedSchedulerEvent(node5);
+    scheduler.handle(nodeEvent5);
+
+    // New node satisfies resource request
+    scheduler.update();
+    scheduler.handle(new NodeUpdateSchedulerEvent(node5));
+    assertEquals(10240, scheduler.getQueueManager().getQueue("queue1").
+            getResourceUsage().getMemory());
+
+    scheduler.handle(new NodeUpdateSchedulerEvent(node1));
+    scheduler.handle(new NodeUpdateSchedulerEvent(node2));
+    scheduler.handle(new NodeUpdateSchedulerEvent(node3));
+    scheduler.handle(new NodeUpdateSchedulerEvent(node4));
+    scheduler.update();
+
+    // Verify that the app's reservations have been released
+    assertEquals(0,
+            scheduler.getSchedulerApp(attId).getNumReservations(null, true));
+  }
+
   @Test (timeout = 500000)
   public void testContainerReservationAttemptExceedingQueueMax()
       throws Exception {