Parcourir la source

MAPREDUCE-3641. Making CapacityScheduler more conservative so as to assign only one off-switch container in a single scheduling iteration. Contributed by Arun C Murthy.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1232182 13f79535-47bb-0310-9956-ffa450edef68
Vinod Kumar Vavilapalli il y a 13 ans
Parent
commit
4a343c9d4a

+ 4 - 0
hadoop-mapreduce-project/CHANGES.txt

@@ -183,6 +183,10 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3553. Add support for data returned when exceptions thrown from web 
     MAPREDUCE-3553. Add support for data returned when exceptions thrown from web 
     service apis to be in either xml or in JSON. (Thomas Graves via mahadev)
     service apis to be in either xml or in JSON. (Thomas Graves via mahadev)
 
 
+    MAPREDUCE-3641. Making CapacityScheduler more conservative so as to 
+    assign only one off-switch container in a single scheduling
+    iteration. (Arun C Murthy via vinodkv)
+
   OPTIMIZATIONS
   OPTIMIZATIONS
 
 
     MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar
     MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar

+ 19 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java

@@ -26,8 +26,26 @@ import org.apache.hadoop.yarn.util.Records;
 @Private
 @Private
 @Evolving
 @Evolving
 public class Resources {
 public class Resources {
+  
   // Java doesn't have const :(
   // Java doesn't have const :(
-  private static final Resource NONE = createResource(0);
+  private static final Resource NONE = new Resource() {
+
+    @Override
+    public int getMemory() {
+      return 0;
+    }
+
+    @Override
+    public void setMemory(int memory) {
+      throw new RuntimeException("NONE cannot be modified!");
+    }
+
+    @Override
+    public int compareTo(Resource o) {
+      return (0 - o.getMemory());
+    }
+    
+  };
 
 
   public static Resource createResource(int memory) {
   public static Resource createResource(int memory) {
     Resource resource = Records.newRecord(Resource.class);
     Resource resource = Records.newRecord(Resource.class);
@@ -36,7 +54,6 @@ public class Resources {
   }
   }
 
 
   public static Resource none() {
   public static Resource none() {
-    assert NONE.getMemory() == 0 : "NONE should be empty";
     return NONE;
     return NONE;
   }
   }
 
 

+ 48 - 0
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java

@@ -0,0 +1,48 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
+
+@Private
+@Unstable
+public class CSAssignment {
+  final private Resource resource;
+  final private NodeType type;
+  
+  public CSAssignment(Resource resource, NodeType type) {
+    this.resource = resource;
+    this.type = type;
+  }
+
+  public Resource getResource() {
+    return resource;
+  }
+
+  public NodeType getType() {
+    return type;
+  }
+  
+  @Override
+  public String toString() {
+    return resource.getMemory() + ":" + type;
+  }
+}

+ 3 - 2
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java

@@ -155,9 +155,10 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    * Assign containers to applications in the queue or it's children (if any).
    * Assign containers to applications in the queue or it's children (if any).
    * @param clusterResource the resource of the cluster.
    * @param clusterResource the resource of the cluster.
    * @param node node on which resources are available
    * @param node node on which resources are available
-   * @return the resource that is being assigned.
+   * @return the assignment
    */
    */
-  public Resource assignContainers(Resource clusterResource, SchedulerNode node);
+  public CSAssignment assignContainers(
+      Resource clusterResource, SchedulerNode node);
   
   
   /**
   /**
    * A container assigned to the queue has completed.
    * A container assigned to the queue has completed.

+ 29 - 23
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -19,7 +19,6 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 
 import java.io.IOException;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Collections;
@@ -35,7 +34,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.authorize.AccessControlList;
@@ -703,8 +701,11 @@ public class LeafQueue implements CSQueue {
     return applicationsMap.get(applicationAttemptId);
     return applicationsMap.get(applicationAttemptId);
   }
   }
 
 
+  private static final CSAssignment NULL_ASSIGNMENT =
+      new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL);
+  
   @Override
   @Override
-  public synchronized Resource 
+  public synchronized CSAssignment 
   assignContainers(Resource clusterResource, SchedulerNode node) {
   assignContainers(Resource clusterResource, SchedulerNode node) {
 
 
     if(LOG.isDebugEnabled()) {
     if(LOG.isDebugEnabled()) {
@@ -717,8 +718,11 @@ public class LeafQueue implements CSQueue {
     if (reservedContainer != null) {
     if (reservedContainer != null) {
       SchedulerApp application = 
       SchedulerApp application = 
           getApplication(reservedContainer.getApplicationAttemptId());
           getApplication(reservedContainer.getApplicationAttemptId());
-      return assignReservedContainer(application, node, reservedContainer, 
-          clusterResource);
+      return new CSAssignment(
+          assignReservedContainer(application, node, reservedContainer, 
+              clusterResource),
+          NodeType.NODE_LOCAL); // Don't care about locality constraints 
+                                // for reserved containers
     }
     }
     
     
     // Try to assign containers to applications in order
     // Try to assign containers to applications in order
@@ -746,7 +750,7 @@ public class LeafQueue implements CSQueue {
           // Are we going over limits by allocating to this application?
           // Are we going over limits by allocating to this application?
           // Maximum Capacity of the queue
           // Maximum Capacity of the queue
           if (!assignToQueue(clusterResource, required)) {
           if (!assignToQueue(clusterResource, required)) {
-            return Resources.none();
+            return NULL_ASSIGNMENT;
           }
           }
 
 
           // User limits
           // User limits
@@ -760,24 +764,23 @@ public class LeafQueue implements CSQueue {
           application.addSchedulingOpportunity(priority);
           application.addSchedulingOpportunity(priority);
           
           
           // Try to schedule
           // Try to schedule
-          Resource assigned = 
+          CSAssignment assignment =  
             assignContainersOnNode(clusterResource, node, application, priority, 
             assignContainersOnNode(clusterResource, node, application, priority, 
                 null);
                 null);
-  
+          
+          Resource assigned = assignment.getResource();
+            
           // Did we schedule or reserve a container?
           // Did we schedule or reserve a container?
           if (Resources.greaterThan(assigned, Resources.none())) {
           if (Resources.greaterThan(assigned, Resources.none())) {
-            Resource assignedResource = 
-              application.getResourceRequest(priority, RMNode.ANY).getCapability();
 
 
             // Book-keeping
             // Book-keeping
-            allocateResource(clusterResource, 
-                application, assignedResource);
+            allocateResource(clusterResource, application, assigned);
             
             
             // Reset scheduling opportunities
             // Reset scheduling opportunities
             application.resetSchedulingOpportunities(priority);
             application.resetSchedulingOpportunities(priority);
             
             
             // Done
             // Done
-            return assignedResource; 
+            return assignment;
           } else {
           } else {
             // Do not assign out of order w.r.t priorities
             // Do not assign out of order w.r.t priorities
             break;
             break;
@@ -792,7 +795,7 @@ public class LeafQueue implements CSQueue {
       application.showRequests();
       application.showRequests();
     }
     }
   
   
-    return Resources.none();
+    return NULL_ASSIGNMENT;
 
 
   }
   }
 
 
@@ -809,11 +812,12 @@ public class LeafQueue implements CSQueue {
               container.getId(), 
               container.getId(), 
               SchedulerUtils.UNRESERVED_CONTAINER), 
               SchedulerUtils.UNRESERVED_CONTAINER), 
           RMContainerEventType.RELEASED);
           RMContainerEventType.RELEASED);
-      return container.getResource();
+      return container.getResource(); // Ugh, return resource to force re-sort
     }
     }
 
 
     // Try to assign if we have sufficient resources
     // Try to assign if we have sufficient resources
-    assignContainersOnNode(clusterResource, node, application, priority, rmContainer);
+    assignContainersOnNode(clusterResource, node, application, priority, 
+        rmContainer);
     
     
     // Doesn't matter... since it's already charged for at time of reservation
     // Doesn't matter... since it's already charged for at time of reservation
     // "re-reservation" is *free*
     // "re-reservation" is *free*
@@ -966,7 +970,7 @@ public class LeafQueue implements CSQueue {
     return (((starvation + requiredContainers) - reservedContainers) > 0);
     return (((starvation + requiredContainers) - reservedContainers) > 0);
   }
   }
 
 
-  private Resource assignContainersOnNode(Resource clusterResource, 
+  private CSAssignment assignContainersOnNode(Resource clusterResource, 
       SchedulerNode node, SchedulerApp application, 
       SchedulerNode node, SchedulerApp application, 
       Priority priority, RMContainer reservedContainer) {
       Priority priority, RMContainer reservedContainer) {
 
 
@@ -977,7 +981,7 @@ public class LeafQueue implements CSQueue {
         assignNodeLocalContainers(clusterResource, node, application, priority,
         assignNodeLocalContainers(clusterResource, node, application, priority,
             reservedContainer); 
             reservedContainer); 
     if (Resources.greaterThan(assigned, Resources.none())) {
     if (Resources.greaterThan(assigned, Resources.none())) {
-      return assigned;
+      return new CSAssignment(assigned, NodeType.NODE_LOCAL);
     }
     }
 
 
     // Rack-local
     // Rack-local
@@ -985,12 +989,14 @@ public class LeafQueue implements CSQueue {
         assignRackLocalContainers(clusterResource, node, application, priority, 
         assignRackLocalContainers(clusterResource, node, application, priority, 
             reservedContainer);
             reservedContainer);
     if (Resources.greaterThan(assigned, Resources.none())) {
     if (Resources.greaterThan(assigned, Resources.none())) {
-      return assigned;
+      return new CSAssignment(assigned, NodeType.RACK_LOCAL);
     }
     }
     
     
     // Off-switch
     // Off-switch
-    return assignOffSwitchContainers(clusterResource, node, application, 
-        priority, reservedContainer);
+    return new CSAssignment(
+        assignOffSwitchContainers(clusterResource, node, application, 
+            priority, reservedContainer), 
+        NodeType.OFF_SWITCH);
   }
   }
 
 
   private Resource assignNodeLocalContainers(Resource clusterResource, 
   private Resource assignNodeLocalContainers(Resource clusterResource, 
@@ -1272,7 +1278,7 @@ public class LeafQueue implements CSQueue {
     metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
     metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
     LOG.info(getQueueName() + 
     LOG.info(getQueueName() + 
         " used=" + usedResources + " numContainers=" + numContainers + 
         " used=" + usedResources + " numContainers=" + numContainers + 
-        " user=" + userName + " resources=" + user.getConsumedResources());
+        " user=" + userName + " user-resources=" + user.getConsumedResources());
   }
   }
 
 
   synchronized void releaseResource(Resource clusterResource, 
   synchronized void releaseResource(Resource clusterResource, 
@@ -1290,7 +1296,7 @@ public class LeafQueue implements CSQueue {
       
       
     LOG.info(getQueueName() + 
     LOG.info(getQueueName() + 
         " used=" + usedResources + " numContainers=" + numContainers + 
         " used=" + usedResources + " numContainers=" + numContainers + 
-        " user=" + userName + " resources=" + user.getConsumedResources());
+        " user=" + userName + " user-resources=" + user.getConsumedResources());
   }
   }
 
 
   @Override
   @Override

+ 36 - 18
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java

@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
@@ -500,10 +501,12 @@ public class ParentQueue implements CSQueue {
   }
   }
 
 
   @Override
   @Override
-  public synchronized Resource assignContainers(
+  public synchronized CSAssignment assignContainers(
       Resource clusterResource, SchedulerNode node) {
       Resource clusterResource, SchedulerNode node) {
-    Resource assigned = Resources.createResource(0);
-
+    CSAssignment assignment = 
+        new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL);
+    boolean assignedOffSwitch = false;
+    
     while (canAssign(node)) {
     while (canAssign(node)) {
       if (LOG.isDebugEnabled()) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Trying to assign containers to child-queue of "
         LOG.debug("Trying to assign containers to child-queue of "
@@ -516,16 +519,18 @@ public class ParentQueue implements CSQueue {
       }
       }
       
       
       // Schedule
       // Schedule
-      Resource assignedToChild = 
+      CSAssignment assignedToChild = 
           assignContainersToChildQueues(clusterResource, node);
           assignContainersToChildQueues(clusterResource, node);
+      assignedOffSwitch = (assignedToChild.getType() == NodeType.OFF_SWITCH);
       
       
       // Done if no child-queue assigned anything
       // Done if no child-queue assigned anything
-      if (Resources.greaterThan(assignedToChild, Resources.none())) {
+      if (Resources.greaterThan(assignedToChild.getResource(), 
+              Resources.none())) {
         // Track resource utilization for the parent-queue
         // Track resource utilization for the parent-queue
-        allocateResource(clusterResource, assignedToChild);
+        allocateResource(clusterResource, assignedToChild.getResource());
         
         
         // Track resource utilization in this pass of the scheduler
         // Track resource utilization in this pass of the scheduler
-        Resources.addTo(assigned, assignedToChild);
+        Resources.addTo(assignment.getResource(), assignedToChild.getResource());
         
         
         LOG.info("assignedContainer" +
         LOG.info("assignedContainer" +
             " queue=" + getQueueName() + 
             " queue=" + getQueueName() + 
@@ -539,17 +544,26 @@ public class ParentQueue implements CSQueue {
 
 
       if (LOG.isDebugEnabled()) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("ParentQ=" + getQueueName()
         LOG.debug("ParentQ=" + getQueueName()
-          + " assignedSoFarInThisIteration=" + assigned
+          + " assignedSoFarInThisIteration=" + assignment.getResource()
           + " utilization=" + getUtilization());
           + " utilization=" + getUtilization());
       }
       }
 
 
       // Do not assign more than one container if this isn't the root queue
       // Do not assign more than one container if this isn't the root queue
-      if (!rootQueue) {
+      // or if we've already assigned an off-switch container
+      if (rootQueue) {
+        if (assignedOffSwitch) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Not assigning more than one off-switch container," +
+            		" assignments so far: " + assignment);
+          }
+          break;
+        }
+      } else {
         break;
         break;
       }
       }
     } 
     } 
     
     
-    return assigned;
+    return assignment;
   }
   }
 
 
   private synchronized boolean assignToQueue(Resource clusterResource) {
   private synchronized boolean assignToQueue(Resource clusterResource) {
@@ -573,9 +587,10 @@ public class ParentQueue implements CSQueue {
                                      minimumAllocation);
                                      minimumAllocation);
   }
   }
   
   
-  synchronized Resource assignContainersToChildQueues(Resource cluster, 
+  synchronized CSAssignment assignContainersToChildQueues(Resource cluster, 
       SchedulerNode node) {
       SchedulerNode node) {
-    Resource assigned = Resources.createResource(0);
+    CSAssignment assignment = 
+        new CSAssignment(Resources.createResource(0), NodeType.NODE_LOCAL);
     
     
     printChildQueues();
     printChildQueues();
 
 
@@ -586,25 +601,28 @@ public class ParentQueue implements CSQueue {
         LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath()
         LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath()
           + " stats: " + childQueue);
           + " stats: " + childQueue);
       }
       }
-      assigned = childQueue.assignContainers(cluster, node);
+      assignment = childQueue.assignContainers(cluster, node);
       if(LOG.isDebugEnabled()) {
       if(LOG.isDebugEnabled()) {
-        LOG.debug("Assignedto queue: " + childQueue.getQueuePath()
-          + " stats: " + childQueue + " --> " + assigned.getMemory());
+        LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
+          " stats: " + childQueue + " --> " + 
+          assignment.getResource().getMemory() + ", " + assignment.getType());
       }
       }
 
 
       // If we do assign, remove the queue and re-insert in-order to re-sort
       // If we do assign, remove the queue and re-insert in-order to re-sort
-      if (Resources.greaterThan(assigned, Resources.none())) {
+      if (Resources.greaterThan(assignment.getResource(), Resources.none())) {
         // Remove and re-insert to sort
         // Remove and re-insert to sort
         iter.remove();
         iter.remove();
         LOG.info("Re-sorting queues since queue: " + childQueue.getQueuePath() + 
         LOG.info("Re-sorting queues since queue: " + childQueue.getQueuePath() + 
             " stats: " + childQueue);
             " stats: " + childQueue);
         childQueues.add(childQueue);
         childQueues.add(childQueue);
-        printChildQueues();
+        if (LOG.isDebugEnabled()) {
+          printChildQueues();
+        }
         break;
         break;
       }
       }
     }
     }
     
     
-    return assigned;
+    return assignment;
   }
   }
 
 
   String getChildQueuesToPrint() {
   String getChildQueuesToPrint() {

+ 15 - 7
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java

@@ -811,49 +811,56 @@ public class TestLeafQueue {
     app_0.updateResourceRequests(app_0_requests_0);
     app_0.updateResourceRequests(app_0_requests_0);
 
 
     // Start testing...
     // Start testing...
+    CSAssignment assignment = null;
     
     
     // Start with off switch, shouldn't allocate due to delay scheduling
     // Start with off switch, shouldn't allocate due to delay scheduling
-    a.assignContainers(clusterResource, node_2);
+    assignment = a.assignContainers(clusterResource, node_2);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(1, app_0.getSchedulingOpportunities(priority));
     assertEquals(1, app_0.getSchedulingOpportunities(priority));
     assertEquals(3, app_0.getTotalRequiredResources(priority));
     assertEquals(3, app_0.getTotalRequiredResources(priority));
+    assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
 
 
     // Another off switch, shouldn't allocate due to delay scheduling
     // Another off switch, shouldn't allocate due to delay scheduling
-    a.assignContainers(clusterResource, node_2);
+    assignment = a.assignContainers(clusterResource, node_2);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(2, app_0.getSchedulingOpportunities(priority));
     assertEquals(2, app_0.getSchedulingOpportunities(priority));
     assertEquals(3, app_0.getTotalRequiredResources(priority));
     assertEquals(3, app_0.getTotalRequiredResources(priority));
+    assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
     
     
     // Another off switch, shouldn't allocate due to delay scheduling
     // Another off switch, shouldn't allocate due to delay scheduling
-    a.assignContainers(clusterResource, node_2);
+    assignment = a.assignContainers(clusterResource, node_2);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(3, app_0.getSchedulingOpportunities(priority));
     assertEquals(3, app_0.getSchedulingOpportunities(priority));
     assertEquals(3, app_0.getTotalRequiredResources(priority));
     assertEquals(3, app_0.getTotalRequiredResources(priority));
+    assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
     
     
     // Another off switch, now we should allocate 
     // Another off switch, now we should allocate 
     // since missedOpportunities=3 and reqdContainers=3
     // since missedOpportunities=3 and reqdContainers=3
-    a.assignContainers(clusterResource, node_2);
+    assignment = a.assignContainers(clusterResource, node_2);
     verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), 
     verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
     assertEquals(2, app_0.getTotalRequiredResources(priority));
     assertEquals(2, app_0.getTotalRequiredResources(priority));
+    assertEquals(NodeType.OFF_SWITCH, assignment.getType());
     
     
     // NODE_LOCAL - node_0
     // NODE_LOCAL - node_0
-    a.assignContainers(clusterResource, node_0);
+    assignment = a.assignContainers(clusterResource, node_0);
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), 
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
     assertEquals(1, app_0.getTotalRequiredResources(priority));
     assertEquals(1, app_0.getTotalRequiredResources(priority));
+    assertEquals(NodeType.NODE_LOCAL, assignment.getType());
     
     
     // NODE_LOCAL - node_1
     // NODE_LOCAL - node_1
-    a.assignContainers(clusterResource, node_1);
+    assignment = a.assignContainers(clusterResource, node_1);
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1), 
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
     assertEquals(0, app_0.getTotalRequiredResources(priority));
     assertEquals(0, app_0.getTotalRequiredResources(priority));
+    assertEquals(NodeType.NODE_LOCAL, assignment.getType());
     
     
     // Add 1 more request to check for RACK_LOCAL
     // Add 1 more request to check for RACK_LOCAL
     app_0_requests_0.clear();
     app_0_requests_0.clear();
@@ -872,11 +879,12 @@ public class TestLeafQueue {
     String host_3 = "host_3"; // on rack_1
     String host_3 = "host_3"; // on rack_1
     SchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB);
     SchedulerNode node_3 = TestUtils.getMockNode(host_3, rack_1, 0, 8*GB);
     
     
-    a.assignContainers(clusterResource, node_3);
+    assignment = a.assignContainers(clusterResource, node_3);
     verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3), 
     verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
     assertEquals(0, app_0.getTotalRequiredResources(priority));
     assertEquals(0, app_0.getTotalRequiredResources(priority));
+    assertEquals(NodeType.RACK_LOCAL, assignment.getType());
   }
   }
   
   
   @Test
   @Test

+ 85 - 5
hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java

@@ -29,6 +29,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 
 
@@ -92,11 +93,18 @@ public class TestParentQueue {
   private void stubQueueAllocation(final CSQueue queue, 
   private void stubQueueAllocation(final CSQueue queue, 
       final Resource clusterResource, final SchedulerNode node, 
       final Resource clusterResource, final SchedulerNode node, 
       final int allocation) {
       final int allocation) {
+    stubQueueAllocation(queue, clusterResource, node, allocation, 
+        NodeType.NODE_LOCAL);
+  }
+  
+  private void stubQueueAllocation(final CSQueue queue, 
+      final Resource clusterResource, final SchedulerNode node, 
+      final int allocation, final NodeType type) {
     
     
     // Simulate the queue allocation
     // Simulate the queue allocation
-    doAnswer(new Answer<Resource>() {
+    doAnswer(new Answer<CSAssignment>() {
       @Override
       @Override
-      public Resource answer(InvocationOnMock invocation) throws Throwable {
+      public CSAssignment answer(InvocationOnMock invocation) throws Throwable {
         try {
         try {
           throw new Exception();
           throw new Exception();
         } catch (Exception e) {
         } catch (Exception e) {
@@ -115,8 +123,8 @@ public class TestParentQueue {
         
         
         // Next call - nothing
         // Next call - nothing
         if (allocation > 0) {
         if (allocation > 0) {
-          doReturn(Resources.none()).when(queue).assignContainers(
-              eq(clusterResource), eq(node));
+          doReturn(new CSAssignment(Resources.none(), type)).
+            when(queue).assignContainers(eq(clusterResource), eq(node));
 
 
           // Mock the node's resource availability
           // Mock the node's resource availability
           Resource available = node.getAvailableResource();
           Resource available = node.getAvailableResource();
@@ -124,7 +132,7 @@ public class TestParentQueue {
           when(node).getAvailableResource();
           when(node).getAvailableResource();
         }
         }
 
 
-        return allocatedResource;
+        return new CSAssignment(allocatedResource, type);
       }
       }
     }).
     }).
     when(queue).assignContainers(eq(clusterResource), eq(node));
     when(queue).assignContainers(eq(clusterResource), eq(node));
@@ -401,6 +409,78 @@ public class TestParentQueue {
     
     
   }
   }
   
   
+  @Test
+  public void testOffSwitchScheduling() throws Exception {
+    // Setup queue configs
+    setupSingleLevelQueues(csConf);
+    
+    Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
+    CSQueue root = 
+        CapacityScheduler.parseQueue(csContext, csConf, null, 
+            CapacitySchedulerConfiguration.ROOT, queues, queues, 
+            CapacityScheduler.queueComparator, 
+            CapacityScheduler.applicationComparator,
+            TestUtils.spyHook);
+
+    // Setup some nodes
+    final int memoryPerNode = 10;
+    final int numNodes = 2;
+    
+    SchedulerNode node_0 = 
+        TestUtils.getMockNode("host_0", DEFAULT_RACK, 0, memoryPerNode*GB);
+    SchedulerNode node_1 = 
+        TestUtils.getMockNode("host_1", DEFAULT_RACK, 0, memoryPerNode*GB);
+    
+    final Resource clusterResource = 
+        Resources.createResource(numNodes * (memoryPerNode*GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Start testing
+    LeafQueue a = (LeafQueue)queues.get(A);
+    LeafQueue b = (LeafQueue)queues.get(B);
+    final float delta = 0.0001f;
+    
+    // Simulate B returning a container on node_0
+    stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
+    stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
+    root.assignContainers(clusterResource, node_0);
+    assertEquals(0.0f, a.getUtilization(), delta);
+    assertEquals(computeQueueUtilization(b, 1*GB, clusterResource), 
+        b.getUtilization(), delta);
+    
+    // Now, A should get the scheduling opportunity since A=0G/6G, B=1G/14G
+    // also, B gets a scheduling opportunity since A allocates RACK_LOCAL
+    stubQueueAllocation(a, clusterResource, node_1, 2*GB, NodeType.RACK_LOCAL);
+    stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
+    root.assignContainers(clusterResource, node_1);
+    InOrder allocationOrder = inOrder(a, b);
+    allocationOrder.verify(a).assignContainers(eq(clusterResource), 
+        any(SchedulerNode.class));
+    allocationOrder.verify(b).assignContainers(eq(clusterResource), 
+        any(SchedulerNode.class));
+    assertEquals(computeQueueUtilization(a, 2*GB, clusterResource), 
+        a.getUtilization(), delta);
+    assertEquals(computeQueueUtilization(b, 2*GB, clusterResource), 
+        b.getUtilization(), delta);
+    
+    // Now, B should get the scheduling opportunity 
+    // since A has 2/6G while B has 2/14G, 
+    // However, since B returns off-switch, A won't get an opportunity
+    stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
+    stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH);
+    root.assignContainers(clusterResource, node_0);
+    allocationOrder = inOrder(b, a);
+    allocationOrder.verify(b).assignContainers(eq(clusterResource), 
+        any(SchedulerNode.class));
+    allocationOrder.verify(a).assignContainers(eq(clusterResource), 
+        any(SchedulerNode.class));
+    assertEquals(computeQueueUtilization(a, 2*GB, clusterResource), 
+        a.getUtilization(), delta);
+    assertEquals(computeQueueUtilization(b, 4*GB, clusterResource), 
+        b.getUtilization(), delta);
+
+  }
+  
   @After
   @After
   public void tearDown() throws Exception {
   public void tearDown() throws Exception {
   }
   }