Browse Source

YARN-3463. Integrate OrderingPolicy Framework with CapacityScheduler. (Craig Welch via wangda)

(cherry picked from commit 44872b76fcc0ddfbc7b0a4e54eef50fe8708e0f5)
Wangda Tan 10 years ago
parent
commit
d0ea982e64
13 changed files with 242 additions and 49 deletions
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 2 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
  3. 23 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
  4. 30 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
  5. 47 16
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
  6. 2 11
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
  7. 4 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java
  8. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java
  9. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
  10. 9 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
  11. 13 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
  12. 8 7
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
  13. 98 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -39,6 +39,9 @@ Release 2.8.0 - UNRELEASED
     YARN-1402. Update related Web UI and CLI with exposing client API to check
     log aggregation status. (Xuan Gong via junping_du)
 
+    YARN-3463. Integrate OrderingPolicy Framework with CapacityScheduler.
+    (Craig Welch via wangda)
+
   IMPROVEMENTS
 
     YARN-1880. Cleanup TestApplicationClientProtocolOnHA

+ 2 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java

@@ -550,9 +550,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
 
         // lock the leafqueue while we scan applications and unreserve
         synchronized (qT.leafQueue) {
-          NavigableSet<FiCaSchedulerApp> ns = 
-              (NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications();
-          Iterator<FiCaSchedulerApp> desc = ns.descendingIterator();
+          Iterator<FiCaSchedulerApp> desc =   
+            qT.leafQueue.getOrderingPolicy().getPreemptionIterator();
           qT.actuallyPreempted = Resources.clone(resToObtain);
           while (desc.hasNext()) {
             FiCaSchedulerApp fc = desc.next();

+ 23 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java

@@ -59,6 +59,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainer
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
+
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -72,7 +74,7 @@ import com.google.common.collect.Multiset;
  */
 @Private
 @Unstable
-public class SchedulerApplicationAttempt {
+public class SchedulerApplicationAttempt implements SchedulableEntity {
   
   private static final Log LOG = LogFactory
     .getLog(SchedulerApplicationAttempt.class);
@@ -710,4 +712,24 @@ public class SchedulerApplicationAttempt {
   public ResourceUsage getAppAttemptResourceUsage() {
     return this.attemptResourceUsage;
   }
+  
+  @Override
+  public String getId() {
+    return getApplicationId().toString();
+  }
+  
+  @Override
+  public int compareInputOrderTo(SchedulableEntity other) {
+    if (other instanceof SchedulerApplicationAttempt) {
+      return getApplicationId().compareTo(
+        ((SchedulerApplicationAttempt)other).getApplicationId());
+    }
+    return 1;//let other types go before this, if any
+  }
+  
+  @Override
+  public synchronized ResourceUsage getSchedulingResourceUsage() {
+    return attemptResourceUsage;
+  }
+  
 }

+ 30 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java

@@ -48,6 +48,9 @@ import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*;
+
+
 import com.google.common.collect.ImmutableSet;
 
 public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration {
@@ -116,7 +119,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   @Private
   public static final String MAXIMUM_ALLOCATION_VCORES =
       "maximum-allocation-vcores";
-
+  
+  public static final String ORDERING_POLICY = "ordering-policy";
+  
+  public static final String DEFAULT_ORDERING_POLICY = "fifo";
+  
   @Private
   public static final int DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS = 10000;
   
@@ -378,6 +385,28 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
         DEFAULT_USER_LIMIT);
     return userLimit;
   }
+  
+  @SuppressWarnings("unchecked")
+  public <S extends SchedulableEntity> OrderingPolicy<S> getOrderingPolicy(
+      String queue) {
+  
+    String policyType = get(getQueuePrefix(queue) + ORDERING_POLICY, 
+      DEFAULT_ORDERING_POLICY);
+    
+    OrderingPolicy<S> orderingPolicy;
+    
+    if (policyType.trim().equals("fifo")) {
+       policyType = FifoOrderingPolicy.class.getName();
+    }
+    try {
+      orderingPolicy = (OrderingPolicy<S>)
+        Class.forName(policyType).newInstance();
+    } catch (Exception e) {
+      String message = "Unable to construct ordering policy for: " + policyType + ", " + e.getMessage();
+      throw new RuntimeException(message, e);
+    }
+    return orderingPolicy;
+  }
 
   public void setUserLimit(String queue, int userLimit) {
     setInt(getQueuePrefix(queue) + USER_LIMIT, userLimit);

+ 47 - 16
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -93,7 +94,6 @@ public class LeafQueue extends AbstractCSQueue {
   
   private int nodeLocalityDelay;
 
-  Set<FiCaSchedulerApp> activeApplications;
   Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap = 
       new HashMap<ApplicationAttemptId, FiCaSchedulerApp>();
   
@@ -121,6 +121,9 @@ public class LeafQueue extends AbstractCSQueue {
   
   private volatile ResourceLimits currentResourceLimits = null;
   
+  private OrderingPolicy<FiCaSchedulerApp> 
+    orderingPolicy = new FifoOrderingPolicy<FiCaSchedulerApp>();
+  
   public LeafQueue(CapacitySchedulerContext cs, 
       String queueName, CSQueue parent, CSQueue old) throws IOException {
     super(cs, queueName, parent, old);
@@ -137,7 +140,6 @@ public class LeafQueue extends AbstractCSQueue {
         cs.getApplicationComparator();
     this.pendingApplications = 
         new TreeSet<FiCaSchedulerApp>(applicationComparator);
-    this.activeApplications = new TreeSet<FiCaSchedulerApp>(applicationComparator);
     
     setupQueueConfigs(cs.getClusterResource());
   }
@@ -159,6 +161,9 @@ public class LeafQueue extends AbstractCSQueue {
     setQueueResourceLimitsInfo(clusterResource);
 
     CapacitySchedulerConfiguration conf = csContext.getConfiguration();
+    
+    setOrderingPolicy(conf.<FiCaSchedulerApp>getOrderingPolicy(getQueuePath()));
+    
     userLimit = conf.getUserLimit(getQueuePath());
     userLimitFactor = conf.getUserLimitFactor(getQueuePath());
 
@@ -322,7 +327,7 @@ public class LeafQueue extends AbstractCSQueue {
   }
 
   public synchronized int getNumActiveApplications() {
-    return activeApplications.size();
+    return orderingPolicy.getNumSchedulableEntities();
   }
 
   @Private
@@ -637,7 +642,7 @@ public class LeafQueue extends AbstractCSQueue {
         }
       }
       user.activateApplication();
-      activeApplications.add(application);
+      orderingPolicy.addSchedulableEntity(application);
       queueUsage.incAMUsed(application.getAMResource());
       user.getResourceUsage().incAMUsed(application.getAMResource());
       i.remove();
@@ -686,7 +691,8 @@ public class LeafQueue extends AbstractCSQueue {
 
   public synchronized void removeApplicationAttempt(
       FiCaSchedulerApp application, User user) {
-    boolean wasActive = activeApplications.remove(application);
+    boolean wasActive =
+      orderingPolicy.removeSchedulableEntity(application);
     if (!wasActive) {
       pendingApplications.remove(application);
     } else {
@@ -727,7 +733,8 @@ public class LeafQueue extends AbstractCSQueue {
     
     if(LOG.isDebugEnabled()) {
       LOG.debug("assignContainers: node=" + node.getNodeName()
-        + " #applications=" + activeApplications.size());
+        + " #applications=" + 
+        orderingPolicy.getNumSchedulableEntities());
     }
     
     // Check for reserved resources
@@ -759,9 +766,10 @@ public class LeafQueue extends AbstractCSQueue {
       return NULL_ASSIGNMENT;
     }
     
-    // Try to assign containers to applications in order
-    for (FiCaSchedulerApp application : activeApplications) {
-      
+    for (Iterator<FiCaSchedulerApp> assignmentIterator =
+        orderingPolicy.getAssignmentIterator();
+        assignmentIterator.hasNext();) {
+      FiCaSchedulerApp application = assignmentIterator.next();
       if(LOG.isDebugEnabled()) {
         LOG.debug("pre-assignContainers for application "
         + application.getApplicationId());
@@ -1606,6 +1614,9 @@ public class LeafQueue extends AbstractCSQueue {
 
       // Inform the node
       node.allocateContainer(allocatedContainer);
+            
+      // Inform the ordering policy
+      orderingPolicy.containerAllocated(application, allocatedContainer);
 
       LOG.info("assignedContainer" +
           " application attempt=" + application.getApplicationAttemptId() +
@@ -1715,11 +1726,16 @@ public class LeafQueue extends AbstractCSQueue {
           removed =
               application.containerCompleted(rmContainer, containerStatus,
                   event, node.getPartition());
+          
           node.releaseContainer(container);
         }
 
         // Book-keeping
         if (removed) {
+          
+          // Inform the ordering policy
+          orderingPolicy.containerReleased(application, rmContainer);
+          
           releaseResource(clusterResource, application,
               container.getResource(), node.getPartition());
           LOG.info("completedContainer" +
@@ -1822,7 +1838,8 @@ public class LeafQueue extends AbstractCSQueue {
     activateApplications();
 
     // Update application properties
-    for (FiCaSchedulerApp application : activeApplications) {
+    for (FiCaSchedulerApp application :
+      orderingPolicy.getSchedulableEntities()) {
       synchronized (application) {
         computeUserLimitAndSetHeadroom(application, clusterResource,
             Resources.none(), RMNodeLabelsManager.NO_LABEL,
@@ -1916,19 +1933,19 @@ public class LeafQueue extends AbstractCSQueue {
     }
     getParent().recoverContainer(clusterResource, attempt, rmContainer);
   }
-
+  
   /**
    * Obtain (read-only) collection of active applications.
    */
-  public Set<FiCaSchedulerApp> getApplications() {
-    // need to access the list of apps from the preemption monitor
-    return activeApplications;
+  public Collection<FiCaSchedulerApp> getApplications() {
+    return orderingPolicy.getSchedulableEntities();
   }
 
   // return a single Resource capturing the overal amount of pending resources
   public synchronized Resource getTotalResourcePending() {
     Resource ret = BuilderUtils.newResource(0, 0);
-    for (FiCaSchedulerApp f : activeApplications) {
+    for (FiCaSchedulerApp f : 
+      orderingPolicy.getSchedulableEntities()) {
       Resources.addTo(ret, f.getTotalPendingRequests());
     }
     return ret;
@@ -1940,7 +1957,8 @@ public class LeafQueue extends AbstractCSQueue {
     for (FiCaSchedulerApp pendingApp : pendingApplications) {
       apps.add(pendingApp.getApplicationAttemptId());
     }
-    for (FiCaSchedulerApp app : activeApplications) {
+    for (FiCaSchedulerApp app : 
+      orderingPolicy.getSchedulableEntities()) {
       apps.add(app.getApplicationAttemptId());
     }
   }
@@ -1993,6 +2011,19 @@ public class LeafQueue extends AbstractCSQueue {
     this.maxApplications = maxApplications;
   }
   
+  public synchronized OrderingPolicy<FiCaSchedulerApp>
+      getOrderingPolicy() {
+    return orderingPolicy;
+  }
+  
+  public synchronized void setOrderingPolicy(
+      OrderingPolicy<FiCaSchedulerApp> orderingPolicy) {
+   orderingPolicy.addAllSchedulableEntities(
+     this.orderingPolicy.getSchedulableEntities()
+     );
+    this.orderingPolicy = orderingPolicy;
+  }
+  
   /*
    * Holds shared values used by all applications in
    * the queue to calculate headroom on demand

+ 2 - 11
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java

@@ -68,15 +68,6 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti
     schedulableEntities.add(schedulableEntity);
   }
   
-  public void setComparator(Comparator<SchedulableEntity> comparator) {
-    this.comparator = comparator;
-    TreeSet<S> schedulableEntities = new TreeSet<S>(comparator);
-    if (this.schedulableEntities != null) {
-      schedulableEntities.addAll(this.schedulableEntities);
-    }
-    this.schedulableEntities = schedulableEntities;
-  }
-  
   @VisibleForTesting
   public Comparator<SchedulableEntity> getComparator() {
     return comparator; 
@@ -103,7 +94,7 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti
   }
   
   @Override
-  public abstract void configure(String conf);
+  public abstract void configure(Map<String, String> conf);
   
   @Override
   public abstract void containerAllocated(S schedulableEntity, 
@@ -114,6 +105,6 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti
     RMContainer r);
   
   @Override
-  public abstract String getStatusMessage();
+  public abstract String getInfo();
   
 }

+ 4 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java

@@ -28,11 +28,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
 public class FifoOrderingPolicy<S extends SchedulableEntity> extends AbstractComparatorOrderingPolicy<S> {
   
   public FifoOrderingPolicy() {
-    setComparator(new FifoComparator());
+    this.comparator = new FifoComparator();
+    this.schedulableEntities = new TreeSet<S>(comparator);
   }
   
   @Override
-  public void configure(String conf) {
+  public void configure(Map<String, String> conf) {
     
   }
   
@@ -47,7 +48,7 @@ public class FifoOrderingPolicy<S extends SchedulableEntity> extends AbstractCom
     }
   
   @Override
-  public String getStatusMessage() {
+  public String getInfo() {
     return "FifoOrderingPolicy";
   }
   

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java

@@ -83,7 +83,7 @@ public interface OrderingPolicy<S extends SchedulableEntity> {
    * Provides configuration information for the policy from the scheduler
    * configuration
    */
-  public void configure(String conf);
+  public void configure(Map<String, String> conf);
   
   /**
    * The passed SchedulableEntity has been allocated the passed Container,
@@ -104,6 +104,6 @@ public interface OrderingPolicy<S extends SchedulableEntity> {
   /**
    * Display information regarding configuration & status
    */
-  public String getStatusMessage();
+  public String getInfo();
   
 }

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java

@@ -94,6 +94,7 @@ class CapacitySchedulerPage extends RmView {
           _("Configured Minimum User Limit Percent:", Integer.toString(lqinfo.getUserLimit()) + "%").
           _("Configured User Limit Factor:", String.format("%.1f", lqinfo.getUserLimitFactor())).
           _("Accessible Node Labels:", StringUtils.join(",", lqinfo.getNodeLabels())).
+          _("Ordering Policy: ", lqinfo.getOrderingPolicyInfo()).
           _("Preemption:", lqinfo.getPreemptionDisabled() ? "disabled" : "enabled");
 
       html._(InfoBlock.class);

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao;
 import javax.xml.bind.annotation.XmlAccessType;
 import javax.xml.bind.annotation.XmlAccessorType;
 import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.bind.annotation.XmlTransient;
 
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 
@@ -39,6 +40,9 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
   protected ResourceInfo usedAMResource;
   protected ResourceInfo userAMResourceLimit;
   protected boolean preemptionDisabled;
+  
+  @XmlTransient
+  protected String orderingPolicyInfo;
 
   CapacitySchedulerLeafQueueInfo() {
   };
@@ -57,6 +61,7 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
     usedAMResource = new ResourceInfo(q.getQueueResourceUsage().getAMUsed());
     userAMResourceLimit = new ResourceInfo(q.getUserAMResourceLimit());
     preemptionDisabled = q.getPreemptionDisabled();
+    orderingPolicyInfo = q.getOrderingPolicy().getInfo();
   }
 
   public int getNumActiveApplications() {
@@ -107,4 +112,8 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
   public boolean getPreemptionDisabled() {
     return preemptionDisabled;
   }
+  
+  public String getOrderingPolicyInfo() {
+    return orderingPolicyInfo;
+  }
 }

+ 13 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java

@@ -25,6 +25,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Pro
 import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -38,6 +39,8 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer; 
 
 import java.util.ArrayList;
 import java.util.Comparator;
@@ -46,6 +49,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Random;
@@ -1032,7 +1036,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     when(lq.getTotalResourcePending()).thenReturn(
         Resource.newInstance(pending[i], 0));
     // consider moving where CapacityScheduler::comparator accessible
-    NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
+    final NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
       new Comparator<FiCaSchedulerApp>() {
         @Override
         public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
@@ -1056,6 +1060,14 @@ public class TestProportionalCapacityPreemptionPolicy {
               .thenReturn(appAttemptIdList);
     }
     when(lq.getApplications()).thenReturn(qApps);
+    @SuppressWarnings("unchecked")
+    OrderingPolicy<FiCaSchedulerApp> so = mock(OrderingPolicy.class);
+    when(so.getPreemptionIterator()).thenAnswer(new Answer() {
+     public Object answer(InvocationOnMock invocation) {
+         return qApps.descendingIterator();
+       }
+     });
+    when(lq.getOrderingPolicy()).thenReturn(so);
     if(setAMResourcePercent != 0.0f){
       when(lq.getMaxAMResourcePerQueuePercent()).thenReturn(setAMResourcePercent);
     }

+ 8 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java

@@ -155,6 +155,7 @@ public class TestApplicationLimits {
     doReturn(applicationAttemptId). when(application).getApplicationAttemptId();
     doReturn(user).when(application).getUser();
     doReturn(amResource).when(application).getAMResource();
+    when(application.compareInputOrderTo(any(FiCaSchedulerApp.class))).thenCallRealMethod(); 
     return application;
   }
   
@@ -469,7 +470,7 @@ public class TestApplicationLimits {
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(1, queue.getNumActiveApplications(user_0));
     assertEquals(0, queue.getNumPendingApplications(user_0));
-    assertTrue(queue.activeApplications.contains(app_0));
+    assertTrue(queue.getApplications().contains(app_0));
 
     // Submit second application
     FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0,
@@ -479,7 +480,7 @@ public class TestApplicationLimits {
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
     assertEquals(0, queue.getNumPendingApplications(user_0));
-    assertTrue(queue.activeApplications.contains(app_1));
+    assertTrue(queue.getApplications().contains(app_1));
 
     // Submit third application, should remain pending
     FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0,
@@ -508,7 +509,7 @@ public class TestApplicationLimits {
     assertEquals(2, queue.getNumActiveApplications(user_0));
     assertEquals(1, queue.getNumPendingApplications(user_0));
     assertFalse(queue.pendingApplications.contains(app_2));
-    assertFalse(queue.activeApplications.contains(app_2));
+    assertFalse(queue.getApplications().contains(app_2));
 
     // Finish 1st application, app_3 should become active
     queue.finishApplicationAttempt(app_0, A);
@@ -516,9 +517,9 @@ public class TestApplicationLimits {
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(2, queue.getNumActiveApplications(user_0));
     assertEquals(0, queue.getNumPendingApplications(user_0));
-    assertTrue(queue.activeApplications.contains(app_3));
+    assertTrue(queue.getApplications().contains(app_3));
     assertFalse(queue.pendingApplications.contains(app_3));
-    assertFalse(queue.activeApplications.contains(app_0));
+    assertFalse(queue.getApplications().contains(app_0));
 
     // Finish 2nd application
     queue.finishApplicationAttempt(app_1, A);
@@ -526,7 +527,7 @@ public class TestApplicationLimits {
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(1, queue.getNumActiveApplications(user_0));
     assertEquals(0, queue.getNumPendingApplications(user_0));
-    assertFalse(queue.activeApplications.contains(app_1));
+    assertFalse(queue.getApplications().contains(app_1));
 
     // Finish 4th application
     queue.finishApplicationAttempt(app_3, A);
@@ -534,7 +535,7 @@ public class TestApplicationLimits {
     assertEquals(0, queue.getNumPendingApplications());
     assertEquals(0, queue.getNumActiveApplications(user_0));
     assertEquals(0, queue.getNumPendingApplications(user_0));
-    assertFalse(queue.activeApplications.contains(app_3));
+    assertFalse(queue.getApplications().contains(app_3));
   }
 
   @Test

+ 98 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java

@@ -73,6 +73,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -381,6 +384,20 @@ public class TestLeafQueue {
     d.submitApplicationAttempt(app_1, user_d); // same user
   }
 
+  @Test
+  public void testPolicyConfiguration() throws Exception {
+    
+    CapacitySchedulerConfiguration testConf = 
+        new CapacitySchedulerConfiguration();
+    
+    String tproot = CapacitySchedulerConfiguration.ROOT + "." + 
+      "testPolicyRoot" + System.currentTimeMillis();
+
+    OrderingPolicy<FiCaSchedulerApp> comPol =    
+      testConf.<FiCaSchedulerApp>getOrderingPolicy(tproot);
+    
+    
+  }
 
   @Test
   public void testAppAttemptMetrics() throws Exception {
@@ -2011,7 +2028,7 @@ public class TestLeafQueue {
     e.submitApplicationAttempt(app_2, user_e);  // same user
 
     // before reinitialization
-    assertEquals(2, e.activeApplications.size());
+    assertEquals(2, e.getNumActiveApplications());
     assertEquals(1, e.pendingApplications.size());
 
     csConf.setDouble(CapacitySchedulerConfiguration
@@ -2028,7 +2045,7 @@ public class TestLeafQueue {
     root.reinitialize(newRoot, csContext.getClusterResource());
 
     // after reinitialization
-    assertEquals(3, e.activeApplications.size());
+    assertEquals(3, e.getNumActiveApplications());
     assertEquals(0, e.pendingApplications.size());
   }
   
@@ -2092,7 +2109,7 @@ public class TestLeafQueue {
     e.submitApplicationAttempt(app_2, user_e);  // same user
 
     // before updating cluster resource
-    assertEquals(2, e.activeApplications.size());
+    assertEquals(2, e.getNumActiveApplications());
     assertEquals(1, e.pendingApplications.size());
 
     Resource clusterResource = Resources.createResource(200 * 16 * GB, 100 * 32); 
@@ -2100,7 +2117,7 @@ public class TestLeafQueue {
         new ResourceLimits(clusterResource));
 
     // after updating cluster resource
-    assertEquals(3, e.activeApplications.size());
+    assertEquals(3, e.getNumActiveApplications());
     assertEquals(0, e.pendingApplications.size());
   }
 
@@ -2450,6 +2467,83 @@ public class TestLeafQueue {
       Assert.fail("NPE when allocating container on node but "
           + "forget to set off-switch request should be handled");
     }
+  }
+  
+    @Test
+  public void testFifoAssignment() throws Exception {
+
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    
+    a.setOrderingPolicy(new FifoOrderingPolicy<FiCaSchedulerApp>()); 
+    
+    String host_0_0 = "127.0.0.1";
+    String rack_0 = "rack_0";
+    FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 16*GB);
+    
+    final int numNodes = 4;
+    Resource clusterResource = Resources.createResource(
+        numNodes * (16*GB), numNodes * 16);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    String user_0 = "user_0";
+    
+    final ApplicationAttemptId appAttemptId_0 = 
+        TestUtils.getMockApplicationAttemptId(0, 0); 
+    FiCaSchedulerApp app_0 = 
+        spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
+            mock(ActiveUsersManager.class), spyRMContext));
+    a.submitApplicationAttempt(app_0, user_0);
+    
+    final ApplicationAttemptId appAttemptId_1 = 
+        TestUtils.getMockApplicationAttemptId(1, 0); 
+    FiCaSchedulerApp app_1 = 
+        spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a, 
+            mock(ActiveUsersManager.class), spyRMContext));
+    a.submitApplicationAttempt(app_1, user_0);
+ 
+    Priority priority = TestUtils.createMockPriority(1);
+    List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
+    List<ResourceRequest> app_1_requests_0 = new ArrayList<ResourceRequest>();
+    
+    app_0_requests_0.clear();
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, 
+            true, priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+    
+    app_1_requests_0.clear();
+    app_1_requests_0.add(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, 
+            true, priority, recordFactory));
+    app_1.updateResourceRequests(app_1_requests_0);
+    
+    a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    Assert.assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
+    
+    app_0_requests_0.clear();
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, 
+            true, priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+    
+    app_1_requests_0.clear();
+    app_1_requests_0.add(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, 
+            true, priority, recordFactory));
+    app_1.updateResourceRequests(app_1_requests_0);
+    
+    //Even thought it already has more resources, app_0 will still get 
+    //assigned first
+    a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
+    Assert.assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
+    
+    //and only then will app_1
+    a.assignContainers(clusterResource, node_0_0, new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    Assert.assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
+
   }
 
   @Test