
YARN-569. Add support for requesting and enforcing preemption requests via
a capacity monitor. Contributed by Carlo Curino, Chris Douglas


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.1-beta@1502085 13f79535-47bb-0310-9956-ffa450edef68

Christopher Douglas 12 years ago
parent
commit
bf1d19e9d2
18 changed files with 1824 additions and 102 deletions
  1. +3 -0    hadoop-yarn-project/CHANGES.txt
  2. +2 -0    hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Priority.java
  3. +12 -0   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  4. +16 -0   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  5. +14 -12  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
  6. +73 -3   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
  7. +42 -0   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java
  8. +92 -0   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
  9. +669 -0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
  10. +9 -0    hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
  11. +57 -0   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java
  12. +26 -0   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java
  13. +50 -0   hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java
  14. +47 -10  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
  15. +45 -26  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
  16. +105 -38 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
  17. +21 -13  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
  18. +541 -0  hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -432,6 +432,9 @@ Release 2.1.0-beta - 2013-07-02
 
     YARN-883. Expose Fair Scheduler-specific queue metrics. (sandyr via tucu)
 
+    YARN-569. Add support for requesting and enforcing preemption requests via
+    a capacity monitor. (Carlo Curino, cdouglas)
+
   OPTIMIZATIONS
 
     YARN-512. Log aggregation root directory check is more expensive than it

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Priority.java

@@ -31,6 +31,8 @@ import org.apache.hadoop.yarn.util.Records;
 @Stable
 public abstract class Priority implements Comparable<Priority> {
 
+  public static final Priority UNDEFINED = newInstance(-1);
+
   @Public
   @Stable
   public static Priority newInstance(int p) {

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -132,6 +132,18 @@ public class YarnConfiguration extends Configuration {
     RM_PREFIX + "scheduler.client.thread-count";
   public static final int DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT = 50;
 
+  /**
+   * Enable periodic monitor threads.
+   * @see #RM_SCHEDULER_MONITOR_POLICIES
+   */
+  public static final String RM_SCHEDULER_ENABLE_MONITORS =
+    RM_PREFIX + "scheduler.monitor.enable";
+  public static final boolean DEFAULT_RM_SCHEDULER_ENABLE_MONITORS = false;
+
+  /** List of SchedulingEditPolicy classes affecting the scheduler. */
+  public static final String RM_SCHEDULER_MONITOR_POLICIES =
+    RM_PREFIX + "scheduler.monitor.policies";
+
   /** The address of the RM web application.*/
   public static final String RM_WEBAPP_ADDRESS = 
     RM_PREFIX + "webapp.address";

+ 16 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -291,6 +291,22 @@
     <value>1000</value>
   </property>
 
+  <property>
+    <description>Enable a set of periodic monitors (specified in
+        yarn.resourcemanager.scheduler.monitor.policies) that affect the
+        scheduler.</description>
+    <name>yarn.resourcemanager.scheduler.monitor.enable</name>
+    <value>false</value>
+  </property>
+
+  <property>
+    <description>The list of SchedulingEditPolicy classes that interact with
+        the scheduler. A particular module may be incompatible with the
+        scheduler, other policies, or a configuration of either.</description>
+    <name>yarn.resourcemanager.scheduler.monitor.policies</name>
+    <value>org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy</value>
+  </property>
+
   <!-- Node Manager Configs -->
   <property>
     <description>The hostname of the NM.</description>
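Note: the two properties above can also be set programmatically through the constants this patch adds to YarnConfiguration. A minimal sketch, illustrative only and not part of the patch (the class and method names below are hypothetical):

    // Sketch: enable the scheduling monitor and select the
    // ProportionalCapacityPreemptionPolicy introduced by this change.
    import org.apache.hadoop.yarn.conf.YarnConfiguration;

    public class EnablePreemptionMonitorExample {
      public static YarnConfiguration createConf() {
        YarnConfiguration conf = new YarnConfiguration();
        // turn on the periodic scheduling monitors
        conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
        // register the capacity preemption policy added by this patch
        conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
            "org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity"
            + ".ProportionalCapacityPreemptionPolicy");
        return conf;
      }
    }

In practice these keys would normally be placed in yarn-site.xml, mirroring the yarn-default.xml entries shown above.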

+ 14 - 12
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java

@@ -411,28 +411,30 @@ public class ApplicationMasterService extends AbstractService implements
       allocateResponse.setResponseId(lastResponse.getResponseId() + 1);
       allocateResponse.setAvailableResources(allocation.getResourceLimit());
       
-      AllocateResponse oldResponse =
-          responseMap.put(appAttemptId, allocateResponse);
-      if (oldResponse == null) {
-        // appAttempt got unregistered, remove it back out
-        responseMap.remove(appAttemptId);
-        String message = "App Attempt removed from the cache during allocate"
-            + appAttemptId;
-        LOG.error(message);
-        return resync;
-      }
-      
       allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
    
       // add preemption to the allocateResponse message (if any)
       allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation));
-      
+
       // Adding NMTokens for allocated containers.
       if (!allocation.getContainers().isEmpty()) {
         allocateResponse.setNMTokens(rmContext.getNMTokenSecretManager()
             .createAndGetNMTokens(app.getUser(), appAttemptId,
                 allocation.getContainers()));
       }
+
+      // before returning response, verify in sync
+      AllocateResponse oldResponse =
+          responseMap.put(appAttemptId, allocateResponse);
+      if (oldResponse == null) {
+        // appAttempt got unregistered, remove it back out
+        responseMap.remove(appAttemptId);
+        String message = "App Attempt removed from the cache during allocate"
+            + appAttemptId;
+        LOG.error(message);
+        return resync;
+      }
+
       return allocateResponse;
     }
   }
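Note: the reordered block above attaches a PreemptionMessage to every AllocateResponse. As an illustrative sketch only (not part of the patch, and assuming the pre-existing PreemptionMessage / StrictPreemptionContract records), an ApplicationMaster could react to it roughly as follows; the class and helper method are hypothetical:

    // Hypothetical AM-side sketch: inspect the PreemptionMessage carried by
    // the AllocateResponse populated above. Heartbeat plumbing and error
    // handling are omitted.
    import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
    import org.apache.hadoop.yarn.api.records.ContainerId;
    import org.apache.hadoop.yarn.api.records.PreemptionContainer;
    import org.apache.hadoop.yarn.api.records.PreemptionMessage;

    public class PreemptionMessageHandler {
      public void onAllocate(AllocateResponse response) {
        PreemptionMessage msg = response.getPreemptionMessage();
        if (msg == null) {
          return; // nothing requested this heartbeat
        }
        if (msg.getStrictContract() != null) {
          // containers the RM intends to reclaim regardless of AM action
          for (PreemptionContainer c : msg.getStrictContract().getContainers()) {
            checkpointAndRelease(c.getId());
          }
        }
        // msg.getContract(), if present, describes negotiable resources the
        // AM may free in any way it prefers before the kill timeout expires
      }

      private void checkpointAndRelease(ContainerId id) {
        // application-specific: save work and release the container
      }
    }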

+ 73 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -18,8 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
-
 import java.io.IOException;
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -48,6 +48,8 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
@@ -61,9 +63,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
@@ -237,6 +243,9 @@ public class ResourceManager extends CompositeService implements Recoverable {
       throw new RuntimeException("Failed to initialize scheduler", ioe);
     }
 
+    // creating monitors that handle preemption
+    createPolicyMonitors();
+
     masterService = createApplicationMasterService();
     addService(masterService) ;
 
@@ -315,7 +324,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
     } catch (ClassNotFoundException e) {
       throw new YarnRuntimeException("Could not instantiate Scheduler: "
           + schedulerClassName, e);
-    }  }
+    }
+  }
 
   protected ApplicationMasterLauncher createAMLauncher() {
     return new ApplicationMasterLauncher(this.rmContext);
@@ -476,6 +486,36 @@ public class ResourceManager extends CompositeService implements Recoverable {
     }
   }
 
+  @Private
+  public static final class
+    RMContainerPreemptEventDispatcher
+      implements EventHandler<ContainerPreemptEvent> {
+
+    private final PreemptableResourceScheduler scheduler;
+
+    public RMContainerPreemptEventDispatcher(
+        PreemptableResourceScheduler scheduler) {
+      this.scheduler = scheduler;
+    }
+
+    @Override
+    public void handle(ContainerPreemptEvent event) {
+      ApplicationAttemptId aid = event.getAppId();
+      RMContainer container = event.getContainer();
+      switch (event.getType()) {
+      case DROP_RESERVATION:
+        scheduler.dropContainerReservation(container);
+        break;
+      case PREEMPT_CONTAINER:
+        scheduler.preemptContainer(aid, container);
+        break;
+      case KILL_CONTAINER:
+        scheduler.killContainer(container);
+        break;
+      }
+    }
+  }
+
   @Private
   public static final class ApplicationAttemptEventDispatcher implements
       EventHandler<RMAppAttemptEvent> {
@@ -676,7 +716,37 @@ public class ResourceManager extends CompositeService implements Recoverable {
   protected ApplicationMasterService createApplicationMasterService() {
     return new ApplicationMasterService(this.rmContext, scheduler);
   }
-  
+
+  protected void createPolicyMonitors() {
+    if (scheduler instanceof PreemptableResourceScheduler
+        && conf.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
+          YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS)) {
+      LOG.info("Loading policy monitors");
+      List<SchedulingEditPolicy> policies = conf.getInstances(
+              YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+              SchedulingEditPolicy.class);
+      if (policies.size() > 0) {
+        this.rmDispatcher.register(ContainerPreemptEventType.class,
+          new RMContainerPreemptEventDispatcher(
+            (PreemptableResourceScheduler) scheduler));
+        for (SchedulingEditPolicy policy : policies) {
+          LOG.info("LOADING SchedulingEditPolicy:" + policy.getPolicyName());
+          policy.init(conf, this.rmContext.getDispatcher().getEventHandler(),
+              (PreemptableResourceScheduler) scheduler);
+          // periodically check whether we need to take action to guarantee
+          // constraints
+          SchedulingMonitor mon = new SchedulingMonitor(policy);
+          addService(mon);
+
+        }
+      } else {
+        LOG.warn("Policy monitors configured (" +
+            YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS +
+            ") but none specified (" +
+            YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES + ")");
+      }
+    }
+  }
 
   protected AdminService createAdminService(
       ClientRMService clientRMService, 

+ 42 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java

@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.monitor;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
+
+public interface SchedulingEditPolicy {
+
+  public void init(Configuration config,
+      EventHandler<ContainerPreemptEvent> dispatcher,
+      PreemptableResourceScheduler scheduler);
+
+  /**
+   * This method is invoked at regular intervals. Internally the policy is
+   * allowed to track containers and affect the scheduler. The "actions"
+   * performed are passed back through an EventHandler.
+   */
+  public void editSchedule();
+
+  public long getMonitoringInterval();
+
+  public String getPolicyName();
+
+}
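Note: to make the contract above concrete, here is a minimal, hypothetical SchedulingEditPolicy implementation (not part of the patch). It takes no preemption action; the SchedulingMonitor added below drives editSchedule() at the interval the policy reports. The configuration key used here is illustrative only:

    // Hypothetical no-op implementation of the SchedulingEditPolicy contract.
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.event.EventHandler;
    import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;

    public class NoOpSchedulingEditPolicy implements SchedulingEditPolicy {

      private static final Log LOG =
          LogFactory.getLog(NoOpSchedulingEditPolicy.class);

      private EventHandler<ContainerPreemptEvent> dispatcher;
      private PreemptableResourceScheduler scheduler;
      private long interval;

      @Override
      public void init(Configuration config,
          EventHandler<ContainerPreemptEvent> dispatcher,
          PreemptableResourceScheduler scheduler) {
        this.dispatcher = dispatcher; // used to emit ContainerPreemptEvents
        this.scheduler = scheduler;   // inspected to decide what to preempt
        this.interval = config.getLong("example.monitoring_interval", 3000);
      }

      @Override
      public void editSchedule() {
        // a real policy would inspect scheduler state here and hand
        // DROP_RESERVATION / PREEMPT_CONTAINER / KILL_CONTAINER events
        // to the dispatcher
        LOG.info("editSchedule invoked; taking no action");
      }

      @Override
      public long getMonitoringInterval() {
        return interval;
      }

      @Override
      public String getPolicyName() {
        return "NoOpSchedulingEditPolicy";
      }
    }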

+ 92 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java

@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.monitor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+
+import com.google.common.annotations.VisibleForTesting;
+
+public class SchedulingMonitor extends AbstractService {
+
+  private final SchedulingEditPolicy scheduleEditPolicy;
+  private static final Log LOG = LogFactory.getLog(SchedulingMonitor.class);
+
+  //thread which runs periodically to invoke the configured scheduling
+  //edit policy
+  private Thread checkerThread;
+  private volatile boolean stopped;
+  private long monitorInterval;
+
+  public SchedulingMonitor(SchedulingEditPolicy scheduleEditPolicy) {
+    super("SchedulingMonitor (" + scheduleEditPolicy.getPolicyName() + ")");
+    this.scheduleEditPolicy = scheduleEditPolicy;
+    this.monitorInterval = scheduleEditPolicy.getMonitoringInterval();
+  }
+
+  public long getMonitorInterval() {
+    return monitorInterval;
+  }
+
+  public void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+  }
+
+  @Override
+  public void serviceStart() throws Exception {
+    assert !stopped : "starting when already stopped";
+    checkerThread = new Thread(new PreemptionChecker());
+    checkerThread.setName(getName());
+    checkerThread.start();
+    super.serviceStart();
+  }
+
+  @Override
+  public void serviceStop() throws Exception {
+    stopped = true;
+    if (checkerThread != null) {
+      checkerThread.interrupt();
+    }
+    super.serviceStop();
+  }
+
+  @VisibleForTesting
+  public void invokePolicy(){
+    scheduleEditPolicy.editSchedule();
+  }
+
+  private class PreemptionChecker implements Runnable {
+    @Override
+    public void run() {
+      while (!stopped && !Thread.currentThread().isInterrupted()) {
+        //invoke the preemption policy at a regular pace
+        //the policy will generate preemption or kill events
+        //managed by the dispatcher
+        invokePolicy();
+        try {
+          Thread.sleep(monitorInterval);
+        } catch (InterruptedException e) {
+          LOG.info(getName() + " thread interrupted");
+          break;
+        }
+      }
+    }
+  }
+}

+ 669 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java

@@ -0,0 +1,669 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class implements a {@link SchedulingEditPolicy} that is designed to be
+ * paired with the {@code CapacityScheduler}. At every invocation of {@code
+ * editSchedule()} it computes the ideal amount of resources assigned to each
+ * queue (for each queue in the hierarchy), and determines whether preemption
+ * is needed. Overcapacity is distributed among queues in a weighted fair manner,
+ * where the weight is the amount of guaranteed capacity for the queue.
+ * Based on this ideal assignment it determines whether preemption is required
+ * and selects a set of containers from each application that would be killed if
+ * the corresponding amount of resources is not freed up by the application.
+ *
+ * If not in {@code observeOnly} mode, it triggers preemption requests via
+ * {@link ContainerPreemptEvent} that the {@code ResourceManager} will
+ * deliver to the application (or enforce directly).
+ *
+ * If the deficit of resources is persistent over a long enough period of time
+ * this policy will trigger forced termination of containers (again by generating
+ * {@link ContainerPreemptEvent}).
+ */
+public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolicy {
+
+  private static final Log LOG =
+    LogFactory.getLog(ProportionalCapacityPreemptionPolicy.class);
+
+  /** If true, run the policy but do not affect the cluster with preemption and
+   * kill events. */
+  public static final String OBSERVE_ONLY =
+      "yarn.resourcemanager.monitor.capacity.preemption.observe_only";
+  /** Time in milliseconds between invocations of this policy */
+  public static final String MONITORING_INTERVAL =
+      "yarn.resourcemanager.monitor.capacity.preemption.monitoring_interval";
+  /** Time in milliseconds between requesting a preemption from an application
+   * and killing the container. */
+  public static final String WAIT_TIME_BEFORE_KILL =
+      "yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill";
+  /** Maximum percentage of resources preempted in a single round. By
+   * controlling this value one can throttle the pace at which containers are
+   * reclaimed from the cluster. After computing the total desired preemption,
+   * the policy scales it back within this limit. */
+  public static final String TOTAL_PREEMPTION_PER_ROUND =
+      "yarn.resourcemanager.monitor.capacity.preemption.total_preemption_per_round";
+  /** Maximum amount of resources above the target capacity ignored for
+   * preemption. This defines a deadzone around the target capacity that helps
+   * prevent thrashing and oscillations around the computed target balance.
+   * High values would slow the time to capacity and (absent natural
+   * completions) it might prevent convergence to guaranteed capacity. */
+  public static final String MAX_IGNORED_OVER_CAPACITY =
+    "yarn.resourcemanager.monitor.capacity.preemption.max_ignored_over_capacity";
+  /**
+   * Given a computed preemption target, account for containers naturally
+   * expiring and preempt only this percentage of the delta. This determines
+   * the rate of geometric convergence into the deadzone ({@link
+   * #MAX_IGNORED_OVER_CAPACITY}). For example, a termination factor of 0.5
+   * will reclaim almost 95% of resources within 5 * {@link
+   * #WAIT_TIME_BEFORE_KILL}, even absent natural termination. */
+  public static final String NATURAL_TERMINATION_FACTOR =
+      "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
+
+  //the dispatcher to send preempt and kill events
+  public EventHandler<ContainerPreemptEvent> dispatcher;
+
+  private final Clock clock;
+  private double maxIgnoredOverCapacity;
+  private long maxWaitTime;
+  private CapacityScheduler scheduler;
+  private long monitoringInterval;
+  private final Map<RMContainer,Long> preempted =
+    new HashMap<RMContainer,Long>();
+  private ResourceCalculator rc;
+  private float percentageClusterPreemptionAllowed;
+  private double naturalTerminationFactor;
+  private boolean observeOnly;
+
+  public ProportionalCapacityPreemptionPolicy() {
+    clock = new SystemClock();
+  }
+
+  public ProportionalCapacityPreemptionPolicy(Configuration config,
+      EventHandler<ContainerPreemptEvent> dispatcher,
+      CapacityScheduler scheduler) {
+    this(config, dispatcher, scheduler, new SystemClock());
+  }
+
+  public ProportionalCapacityPreemptionPolicy(Configuration config,
+      EventHandler<ContainerPreemptEvent> dispatcher,
+      CapacityScheduler scheduler, Clock clock) {
+    init(config, dispatcher, scheduler);
+    this.clock = clock;
+  }
+
+  public void init(Configuration config,
+      EventHandler<ContainerPreemptEvent> disp,
+      PreemptableResourceScheduler sched) {
+    LOG.info("Preemption monitor:" + this.getClass().getCanonicalName());
+    assert null == scheduler : "Unexpected duplicate call to init";
+    if (!(sched instanceof CapacityScheduler)) {
+      throw new YarnRuntimeException("Class " +
+          sched.getClass().getCanonicalName() + " not instance of " +
+          CapacityScheduler.class.getCanonicalName());
+    }
+    dispatcher = disp;
+    scheduler = (CapacityScheduler) sched;
+    maxIgnoredOverCapacity = config.getDouble(MAX_IGNORED_OVER_CAPACITY, 0.1);
+    naturalTerminationFactor =
+      config.getDouble(NATURAL_TERMINATION_FACTOR, 0.2);
+    maxWaitTime = config.getLong(WAIT_TIME_BEFORE_KILL, 15000);
+    monitoringInterval = config.getLong(MONITORING_INTERVAL, 3000);
+    percentageClusterPreemptionAllowed =
+      config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1);
+    observeOnly = config.getBoolean(OBSERVE_ONLY, false);
+    rc = scheduler.getResourceCalculator();
+  }
+
+  @Override
+  public void editSchedule(){
+    CSQueue root = scheduler.getRootQueue();
+    Resource clusterResources =
+      Resources.clone(scheduler.getClusterResources());
+    containerBasedPreemptOrKill(root, clusterResources);
+  }
+
+  /**
+   * This method selects and tracks containers to be preempted. If a container
+   * is in the target list for more than maxWaitTime it is killed.
+   *
+   * @param root the root of the CapacityScheduler queue hierarchy
+   * @param clusterResources the total amount of resources in the cluster
+   */
+  private void containerBasedPreemptOrKill(CSQueue root,
+      Resource clusterResources) {
+
+    // extract a summary of the queues from scheduler
+    TempQueue tRoot;
+    synchronized (scheduler) {
+      tRoot = cloneQueues(root, clusterResources);
+    }
+
+    // compute the ideal distribution of resources among queues
+    // updates cloned queues state accordingly
+    tRoot.idealAssigned = tRoot.guaranteed;
+    Resource totalPreemptionAllowed = Resources.multiply(clusterResources,
+        percentageClusterPreemptionAllowed);
+    List<TempQueue> queues =
+      recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
+
+    // based on ideal allocation select containers to be preempted from each
+    // queue and each application
+    Map<ApplicationAttemptId,Set<RMContainer>> toPreempt =
+        getContainersToPreempt(queues, clusterResources);
+
+    logToCSV(queues);
+
+    // if we are in observeOnly mode return before any action is taken
+    if (observeOnly) {
+      return;
+    }
+
+    // preempt (or kill) the selected containers
+    for (Map.Entry<ApplicationAttemptId,Set<RMContainer>> e
+         : toPreempt.entrySet()) {
+      for (RMContainer container : e.getValue()) {
+        // if we tried to preempt this for more than maxWaitTime
+        if (preempted.get(container) != null &&
+            preempted.get(container) + maxWaitTime < clock.getTime()) {
+          // kill it
+          dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
+                ContainerPreemptEventType.KILL_CONTAINER));
+          preempted.remove(container);
+        } else {
+          //otherwise just send preemption events
+          dispatcher.handle(new ContainerPreemptEvent(e.getKey(), container,
+                ContainerPreemptEventType.PREEMPT_CONTAINER));
+          if (preempted.get(container) == null) {
+            preempted.put(container, clock.getTime());
+          }
+        }
+      }
+    }
+
+    // Keep the preempted list clean
+    for (Iterator<RMContainer> i = preempted.keySet().iterator(); i.hasNext();){
+      RMContainer id = i.next();
+      // garbage collect containers that are irrelevant for preemption
+      if (preempted.get(id) + 2 * maxWaitTime < clock.getTime()) {
+        i.remove();
+      }
+    }
+  }
+
+  /**
+   * This method recursively computes the ideal assignment of resources to each
+   * level of the hierarchy. This ensures that leaves that are over capacity but
+   * with parents within capacity will not be preempted. Preemptions are allowed
+   * within each subtree according to local over/under capacity.
+   *
+   * @param root the root of the cloned queue hierarchy
+   * @param totalPreemptionAllowed maximum amount of preemption allowed
+   * @return a list of leaf queues updated with preemption targets
+   */
+  private List<TempQueue> recursivelyComputeIdealAssignment(
+      TempQueue root, Resource totalPreemptionAllowed) {
+    List<TempQueue> leafs = new ArrayList<TempQueue>();
+    if (root.getChildren() != null &&
+        root.getChildren().size() > 0) {
+      // compute ideal distribution at this level
+      computeIdealResourceDistribution(rc, root.getChildren(),
+          totalPreemptionAllowed, root.idealAssigned);
+      // compute recursively for lower levels and build list of leafs
+      for(TempQueue t : root.getChildren()) {
+        leafs.addAll(recursivelyComputeIdealAssignment(t, totalPreemptionAllowed));
+      }
+    } else {
+      // we are in a leaf nothing to do, just return yourself
+      return Collections.singletonList(root);
+    }
+    return leafs;
+  }
+
+  /**
+   * This method computes (for a single level in the tree, passed as a {@code
+   * List<TempQueue>}) the ideal assignment of resources. This is done
+   * recursively to allocate capacity fairly across all queues with pending
+   * demands. It terminates when no resources are left to assign, or when all
+   * demand is satisfied.
+   *
+   * @param rc resource calculator
+   * @param queues a list of cloned queues to be assigned capacity to (this is
+   * an out param)
+   * @param totalPreemptionAllowed total amount of preemption we allow
+   * @param tot_guarant the amount of capacity assigned to this pool of queues
+   */
+  private void computeIdealResourceDistribution(ResourceCalculator rc,
+      List<TempQueue> queues, Resource totalPreemptionAllowed, Resource tot_guarant) {
+
+    // qAlloc tracks currently active queues (will decrease progressively as
+    // demand is met)
+    List<TempQueue> qAlloc = new ArrayList<TempQueue>(queues);
+    // unassigned tracks how much resources are still to assign, initialized
+    // with the total capacity for this set of queues
+    Resource unassigned = Resources.clone(tot_guarant);
+
+    //assign all cluster resources until no more demand, or no resources are left
+    while (!qAlloc.isEmpty() && Resources.greaterThan(rc, tot_guarant,
+          unassigned, Resources.none())) {
+      Resource wQassigned = Resource.newInstance(0, 0);
+
+      // we compute normalizedGuarantees capacity based on currently active
+      // queues
+      resetCapacity(rc, unassigned, qAlloc);
+
+      // offer for each queue their capacity first and in following invocations
+      // their share of over-capacity
+      for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
+        TempQueue sub = i.next();
+        Resource wQavail =
+          Resources.multiply(unassigned, sub.normalizedGuarantee);
+        Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
+        Resource wQdone = Resources.subtract(wQavail, wQidle);
+        // if the queue returned a value > 0 it means it is fully satisfied
+        // and it is removed from the list of active queues qAlloc
+        if (!Resources.greaterThan(rc, tot_guarant,
+              wQdone, Resources.none())) {
+          i.remove();
+        }
+        Resources.addTo(wQassigned, wQdone);
+      }
+      Resources.subtractFrom(unassigned, wQassigned);
+    }
+
+    // based on ideal assignment computed above and current assignment we derive
+    // how much preemption is required overall
+    Resource totPreemptionNeeded = Resource.newInstance(0, 0);
+    for (TempQueue t:queues) {
+      if (Resources.greaterThan(rc, tot_guarant, t.current, t.idealAssigned)) {
+        Resources.addTo(totPreemptionNeeded,
+            Resources.subtract(t.current, t.idealAssigned));
+      }
+    }
+
+    // if we need to preempt more than is allowed, compute a factor (0<f<1)
+    // that is used to scale down how much we ask back from each queue
+    float scalingFactor = 1.0F;
+    if (Resources.greaterThan(rc, tot_guarant,
+          totPreemptionNeeded, totalPreemptionAllowed)) {
+       scalingFactor = Resources.divide(rc, tot_guarant,
+           totalPreemptionAllowed, totPreemptionNeeded);
+    }
+
+    // assign to each queue the amount of actual preemption based on local
+    // information of ideal preemption and scaling factor
+    for (TempQueue t : queues) {
+      t.assignPreemption(scalingFactor, rc, tot_guarant);
+    }
+    if (LOG.isDebugEnabled()) {
+      long time = clock.getTime();
+      for (TempQueue t : queues) {
+        LOG.debug(time + ": " + t);
+      }
+    }
+
+  }
+
+  /**
+   * Computes a normalizedGuaranteed capacity based on active queues
+   * @param rc resource calculator
+   * @param clusterResource the total amount of resources in the cluster
+   * @param queues the list of queues to consider
+   */
+  private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
+      List<TempQueue> queues) {
+    Resource activeCap = Resource.newInstance(0, 0);
+    for (TempQueue q : queues) {
+      Resources.addTo(activeCap, q.guaranteed);
+    }
+    for (TempQueue q : queues) {
+      q.normalizedGuarantee = Resources.divide(rc, clusterResource,
+          q.guaranteed, activeCap);
+    }
+  }
+
+  /**
+   * Based on a resource preemption target, drop reservations of containers and,
+   * if necessary, select containers for preemption from applications in each
+   * over-capacity queue. It uses {@link #NATURAL_TERMINATION_FACTOR} to
+   * account for containers that will naturally complete.
+   *
+   * @param queues set of leaf queues to preempt from
+   * @param clusterResource total amount of cluster resources
+   * @return a map of application ID to set of containers to preempt
+   */
+  private Map<ApplicationAttemptId,Set<RMContainer>> getContainersToPreempt(
+      List<TempQueue> queues, Resource clusterResource) {
+
+    Map<ApplicationAttemptId,Set<RMContainer>> list =
+        new HashMap<ApplicationAttemptId,Set<RMContainer>>();
+
+    for (TempQueue qT : queues) {
+      // we act only if we are violating balance by more than
+      // maxIgnoredOverCapacity
+      if (Resources.greaterThan(rc, clusterResource, qT.current,
+          Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))) {
+        // we introduce a dampening factor naturalTerminationFactor that
+        // accounts for natural termination of containers
+        Resource resToObtain =
+          Resources.multiply(qT.toBePreempted, naturalTerminationFactor);
+
+        // lock the leafqueue while we scan applications and unreserve
+        synchronized(qT.leafQueue) {
+          NavigableSet<FiCaSchedulerApp> ns =
+            (NavigableSet<FiCaSchedulerApp>) qT.leafQueue.getApplications();
+          Iterator<FiCaSchedulerApp> desc = ns.descendingIterator();
+          qT.actuallyPreempted = Resources.clone(resToObtain);
+          while (desc.hasNext()) {
+            FiCaSchedulerApp fc = desc.next();
+            if (Resources.lessThanOrEqual(rc, clusterResource,
+                resToObtain, Resources.none())) {
+              break;
+            }
+            list.put(fc.getApplicationAttemptId(),
+            preemptFrom(fc, clusterResource, resToObtain));
+          }
+        }
+      }
+    }
+    return list;
+  }
+
+  /**
+   * Given a target preemption for a specific application, select containers
+   * to preempt (after unreserving all reservations for that app).
+   *
+   * @param app
+   * @param clusterResource
+   * @param rsrcPreempt
+   * @return
+   */
+  private Set<RMContainer> preemptFrom(
+      FiCaSchedulerApp app, Resource clusterResource, Resource rsrcPreempt) {
+    Set<RMContainer> ret = new HashSet<RMContainer>();
+    ApplicationAttemptId appId = app.getApplicationAttemptId();
+
+    // first drop reserved containers towards rsrcPreempt
+    List<RMContainer> reservations =
+        new ArrayList<RMContainer>(app.getReservedContainers());
+    for (RMContainer c : reservations) {
+      if (Resources.lessThanOrEqual(rc, clusterResource,
+          rsrcPreempt, Resources.none())) {
+        return ret;
+      }
+      if (!observeOnly) {
+        dispatcher.handle(new ContainerPreemptEvent(appId, c,
+            ContainerPreemptEventType.DROP_RESERVATION));
+      }
+      Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
+    }
+
+    // if more resources are to be freed go through all live containers in
+    // reverse priority and reverse allocation order and mark them for
+    // preemption
+    List<RMContainer> containers =
+      new ArrayList<RMContainer>(app.getLiveContainers());
+
+    sortContainers(containers);
+
+    for (RMContainer c : containers) {
+      if (Resources.lessThanOrEqual(rc, clusterResource,
+            rsrcPreempt, Resources.none())) {
+        return ret;
+      }
+      ret.add(c);
+      Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
+    }
+
+    return ret;
+  }
+
+  /**
+   * Compare by reversed priority order first, and then reversed containerId
+   * order
+   * @param containers
+   */
+  @VisibleForTesting
+  static void sortContainers(List<RMContainer> containers){
+    Collections.sort(containers, new Comparator<RMContainer>() {
+      @Override
+      public int compare(RMContainer a, RMContainer b) {
+        Comparator<Priority> c = new org.apache.hadoop.yarn.server
+            .resourcemanager.resource.Priority.Comparator();
+        int priorityComp = c.compare(b.getContainer().getPriority(),
+                                     a.getContainer().getPriority());
+        if (priorityComp != 0) {
+          return priorityComp;
+        }
+        return b.getContainerId().getId() -
+               a.getContainerId().getId();
+      }
+    });
+  }
+
+  @Override
+  public long getMonitoringInterval() {
+    return monitoringInterval;
+  }
+
+  @Override
+  public String getPolicyName() {
+    return "ProportionalCapacityPreemptionPolicy";
+  }
+
+
+  /**
+   * This method walks a tree of CSQueue and clones the portion of the state
+   * relevant for preemption in TempQueue(s). It also maintains a pointer to
+ * the leaves. Finally it aggregates pending resources in each queue and rolls
+ * them up to higher levels.
+   *
+   * @param root the root of the CapacityScheduler queue hierarchy
+   * @param clusterResources the total amount of resources in the cluster
+   * @return the root of the cloned queue hierarchy
+   */
+  private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
+    TempQueue ret;
+    synchronized (root) {
+      float absUsed = root.getAbsoluteUsedCapacity();
+      Resource current = Resources.multiply(clusterResources, absUsed);
+      Resource guaranteed =
+        Resources.multiply(clusterResources, root.getAbsoluteCapacity());
+      if (root instanceof LeafQueue) {
+        LeafQueue l = (LeafQueue) root;
+        Resource pending = l.getTotalResourcePending();
+        ret = new TempQueue(root.getQueueName(), current, pending, guaranteed);
+        ret.setLeafQueue(l);
+      } else {
+        Resource pending = Resource.newInstance(0, 0);
+        ret = new TempQueue(root.getQueueName(), current, pending, guaranteed);
+        for (CSQueue c : root.getChildQueues()) {
+          ret.addChild(cloneQueues(c, clusterResources));
+        }
+      }
+    }
+    return ret;
+  }
+
+  // simple printout function that reports internal queue state (useful for
+  // plotting)
+  private void logToCSV(List<TempQueue> unorderedqueues){
+    List<TempQueue> queues = new ArrayList<TempQueue>(unorderedqueues);
+    Collections.sort(queues, new Comparator<TempQueue>(){
+      @Override
+      public int compare(TempQueue o1, TempQueue o2) {
+        return o1.queueName.compareTo(o2.queueName);
+      }});
+    String queueState = " QUEUESTATE: " + clock.getTime();
+    StringBuilder sb = new StringBuilder();
+    sb.append(queueState);
+    for (TempQueue tq : queues) {
+      sb.append(", ");
+      tq.appendLogString(sb);
+    }
+    LOG.info(sb.toString());
+  }
+
+  /**
+   * Temporary data structure tracking resource availability, pending resource
+   * need, and current utilization. Used to clone {@link CSQueue}.
+   */
+  static class TempQueue {
+    final String queueName;
+    final Resource current;
+    final Resource pending;
+    final Resource guaranteed;
+    Resource idealAssigned;
+    Resource toBePreempted;
+    Resource actuallyPreempted;
+
+    double normalizedGuarantee;
+
+    final ArrayList<TempQueue> children;
+    LeafQueue leafQueue;
+
+    TempQueue(String queueName, Resource current, Resource pending,
+        Resource guaranteed) {
+      this.queueName = queueName;
+      this.current = current;
+      this.pending = pending;
+      this.guaranteed = guaranteed;
+      this.idealAssigned = Resource.newInstance(0, 0);
+      this.actuallyPreempted = Resource.newInstance(0, 0);
+      this.toBePreempted = Resource.newInstance(0, 0);
+      this.normalizedGuarantee = Float.NaN;
+      this.children = new ArrayList<TempQueue>();
+    }
+
+    public void setLeafQueue(LeafQueue l){
+      assert children.size() == 0;
+      this.leafQueue = l;
+    }
+
+    /**
+     * When adding a child we also aggregate its pending resource needs.
+     * @param q the child queue to add to this queue
+     */
+    public void addChild(TempQueue q) {
+      assert leafQueue == null;
+      children.add(q);
+      Resources.addTo(pending, q.pending);
+    }
+
+    public void addChildren(ArrayList<TempQueue> queues) {
+      assert leafQueue == null;
+      children.addAll(queues);
+    }
+
+
+    public ArrayList<TempQueue> getChildren(){
+      return children;
+    }
+
+    // This function "accepts" all the resources it can (pending) and returns
+    // the unused ones
+    Resource offer(Resource avail, ResourceCalculator rc,
+        Resource clusterResource) {
+      // remain = avail - min(avail, current + pending - assigned)
+      Resource accepted = Resources.min(rc, clusterResource,
+          avail,
+          Resources.subtract(
+              Resources.add(current, pending),
+              idealAssigned));
+      Resource remain = Resources.subtract(avail, accepted);
+      Resources.addTo(idealAssigned, accepted);
+      return remain;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("CUR: ").append(current)
+        .append(" PEN: ").append(pending)
+        .append(" GAR: ").append(guaranteed)
+        .append(" NORM: ").append(normalizedGuarantee)
+        .append(" IDEAL_ASSIGNED: ").append(idealAssigned)
+        .append(" IDEAL_PREEMPT: ").append(toBePreempted)
+        .append(" ACTUAL_PREEMPT: ").append(actuallyPreempted);
+
+      return sb.toString();
+    }
+    public void assignPreemption(float scalingFactor,
+        ResourceCalculator rc, Resource clusterResource) {
+      if (Resources.greaterThan(rc, clusterResource, current, idealAssigned)) {
+          toBePreempted = Resources.multiply(
+              Resources.subtract(current, idealAssigned), scalingFactor);
+      } else {
+        toBePreempted = Resource.newInstance(0, 0);
+      }
+    }
+
+    void appendLogString(StringBuilder sb) {
+      sb.append(queueName).append(", ")
+        .append(current.getMemory()).append(", ")
+        .append(current.getVirtualCores()).append(", ")
+        .append(pending.getMemory()).append(", ")
+        .append(pending.getVirtualCores()).append(", ")
+        .append(guaranteed.getMemory()).append(", ")
+        .append(guaranteed.getVirtualCores()).append(", ")
+        .append(idealAssigned.getMemory()).append(", ")
+        .append(idealAssigned.getVirtualCores()).append(", ")
+        .append(toBePreempted.getMemory()).append(", ")
+        .append(toBePreempted.getVirtualCores() ).append(", ")
+        .append(actuallyPreempted.getMemory()).append(", ")
+        .append(actuallyPreempted.getVirtualCores());
+    }
+
+  }
+
+}
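Note: a rough, standalone sketch (not part of the patch) of the geometric convergence described for NATURAL_TERMINATION_FACTOR above. Each monitoring round only a fraction of the remaining over-capacity is requested back, so the residual shrinks geometrically even without natural container completions; the factor value below is purely illustrative:

    // Simulate the per-round fraction of over-capacity asked back by the
    // policy when natural_termination_factor = 0.5 and no containers finish
    // on their own.
    public class NaturalTerminationFactorSketch {
      public static void main(String[] args) {
        double factor = 0.5;     // hypothetical natural_termination_factor
        double remaining = 1.0;  // normalized over-capacity to reclaim
        for (int round = 1; round <= 5; round++) {
          double requested = remaining * factor; // preemption asked this round
          remaining -= requested;                // no natural completions
          System.out.printf("round %d: reclaimed so far %.1f%%%n",
              round, (1.0 - remaining) * 100);
        }
      }
    }

Running the sketch shows the residual halving every round (50%, 75%, 87.5%, ...), which is the behaviour the javadoc describes for the deadzone convergence.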

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -205,6 +206,14 @@ public class AppSchedulingInfo {
     return requests.get(priority);
   }
 
+  synchronized public List<ResourceRequest> getAllResourceRequests() {
+    List<ResourceRequest> ret = new ArrayList<ResourceRequest>();
+    for (Map<String, ResourceRequest> r : requests.values()) {
+      ret.addAll(r.values());
+    }
+    return ret;
+  }
+
   synchronized public ResourceRequest getResourceRequest(Priority priority,
       String resourceName) {
     Map<String, ResourceRequest> nodeRequests = requests.get(priority);

+ 57 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEvent.java

@@ -0,0 +1,57 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+
+/**
+ * Simple event class used to communicate container unreservation, preemption, and kill events
+ */
+public class ContainerPreemptEvent
+    extends AbstractEvent<ContainerPreemptEventType> {
+
+  private final ApplicationAttemptId aid;
+  private final RMContainer container;
+
+  public ContainerPreemptEvent(ApplicationAttemptId aid, RMContainer container,
+      ContainerPreemptEventType type) {
+    super(type);
+    this.aid = aid;
+    this.container = container;
+  }
+
+  public RMContainer getContainer(){
+    return this.container;
+  }
+
+  public ApplicationAttemptId getAppId() {
+    return aid;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder(super.toString());
+    sb.append(" ").append(getAppId());
+    sb.append(" ").append(getContainer().getContainerId());
+    return sb.toString();
+  }
+
+}

+ 26 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ContainerPreemptEventType.java

@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+public enum ContainerPreemptEventType {
+
+  DROP_RESERVATION,
+  PREEMPT_CONTAINER,
+  KILL_CONTAINER
+
+}

+ 50 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PreemptableResourceScheduler.java

@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+
+/**
+ * Interface for a scheduler that supports preemption/killing
+ *
+ */
+public interface PreemptableResourceScheduler extends ResourceScheduler {
+
+  /**
+   * If the scheduler supports container reservations, this method is used to
+   * ask the scheduler to drop the reservation for the given container.
+   * @param container Reference to reserved container allocation.
+   */
+  void dropContainerReservation(RMContainer container);
+
+  /**
+   * Ask the scheduler to obtain back the container from a specific application
+   * by issuing a preemption request
+   * @param aid the application from which we want to get a container back
+   * @param container the container we want back
+   */
+  void preemptContainer(ApplicationAttemptId aid, RMContainer container);
+
+  /**
+   * Ask the scheduler to forcibly interrupt the container given as input
+   * @param container
+   */
+  void killContainer(RMContainer container);
+
+}

+ 47 - 10
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -61,8 +61,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
@@ -83,8 +83,9 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 @LimitedPrivate("yarn")
 @Evolving
 @SuppressWarnings("unchecked")
-public class CapacityScheduler 
-implements ResourceScheduler, CapacitySchedulerContext, Configurable {
+public class CapacityScheduler
+  implements PreemptableResourceScheduler, CapacitySchedulerContext,
+             Configurable {
 
   private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
 
@@ -525,8 +526,8 @@ implements ResourceScheduler, CapacitySchedulerContext, Configurable {
     
     // Sanity check
     SchedulerUtils.normalizeRequests(
-        ask, calculator, getClusterResources(), minimumAllocation,
-        maximumAllocation);
+        ask, getResourceCalculator(), getClusterResources(),
+        getMinimumResourceCapability(), maximumAllocation);
 
     // Release containers
     for (ContainerId releasedContainerId : release) {
@@ -578,9 +579,8 @@ implements ResourceScheduler, CapacitySchedulerContext, Configurable {
           " #ask=" + ask.size());
       }
 
-      return new Allocation(
-          application.pullNewlyAllocatedContainers(), 
-          application.getHeadroom());
+      return application.getAllocation(getResourceCalculator(),
+                   clusterResource, getMinimumResourceCapability());
     }
   }
 
@@ -812,7 +812,8 @@ implements ResourceScheduler, CapacitySchedulerContext, Configurable {
     Container container = rmContainer.getContainer();
     
     // Get the application for the finished container
-    ApplicationAttemptId applicationAttemptId = container.getId().getApplicationAttemptId();
+    ApplicationAttemptId applicationAttemptId =
+      container.getId().getApplicationAttemptId();
     FiCaSchedulerApp application = getApplication(applicationAttemptId);
     if (application == null) {
       LOG.info("Container " + container + " of" +
@@ -869,5 +870,41 @@ implements ResourceScheduler, CapacitySchedulerContext, Configurable {
     FiCaSchedulerNode node = getNode(nodeId);
     return node == null ? null : new SchedulerNodeReport(node);
   }
-  
+
+  @Override
+  public void dropContainerReservation(RMContainer container) {
+    if(LOG.isDebugEnabled()){
+      LOG.debug("DROP_RESERVATION:" + container.toString());
+    }
+    completedContainer(container,
+        SchedulerUtils.createAbnormalContainerStatus(
+            container.getContainerId(),
+            SchedulerUtils.UNRESERVED_CONTAINER),
+        RMContainerEventType.KILL);
+  }
+
+  @Override
+  public void preemptContainer(ApplicationAttemptId aid, RMContainer cont) {
+    if(LOG.isDebugEnabled()){
+      LOG.debug("PREEMPT_CONTAINER: application:" + aid.toString() +
+          " container: " + cont.toString());
+    }
+    FiCaSchedulerApp app = applications.get(aid);
+    if (app != null) {
+      app.addPreemptContainer(cont.getContainerId());
+    }
+  }
+
+  @Override
+  public void killContainer(RMContainer cont) {
+    if(LOG.isDebugEnabled()){
+      LOG.debug("KILL_CONTAINER: container" + cont.toString());
+    }
+    completedContainer(cont,
+        SchedulerUtils.createAbnormalContainerStatus(
+            cont.getContainerId(),"Container being forcibly preempted:"
+        + cont.getContainerId()),
+        RMContainerEventType.KILL);
+  }
+
 }
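
To show how these three hooks are likely driven, a hedged sketch of a handler that forwards ContainerPreemptEvents to the PreemptableResourceScheduler. The getAppId()/getType() accessors appear in the test further below; the getContainer() accessor and the DROP_RESERVATION constant are assumptions inferred from the event payload and the debug tags, since ContainerPreemptEvent and ContainerPreemptEventType are not shown in this excerpt.

    // Illustrative sketch only: routing preemption events to the new scheduler hooks.
    import org.apache.hadoop.yarn.event.EventHandler;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
    import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;

    public class PreemptEventForwarderSketch
        implements EventHandler<ContainerPreemptEvent> {
      private final PreemptableResourceScheduler scheduler;

      public PreemptEventForwarderSketch(PreemptableResourceScheduler scheduler) {
        this.scheduler = scheduler;
      }

      @Override
      public void handle(ContainerPreemptEvent event) {
        switch (event.getType()) {
        case DROP_RESERVATION:
          scheduler.dropContainerReservation(event.getContainer());
          break;
        case PREEMPT_CONTAINER:
          scheduler.preemptContainer(event.getAppId(), event.getContainer());
          break;
        case KILL_CONTAINER:
          scheduler.killContainer(event.getContainer());
          break;
        default:
          break;
        }
      }
    }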

+ 45 - 26
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -1390,18 +1390,20 @@ public class LeafQueue implements CSQueue {
     node.reserveResource(application, priority, rmContainer);
   }
 
-  private void unreserve(FiCaSchedulerApp application, Priority priority, 
+  private boolean unreserve(FiCaSchedulerApp application, Priority priority,
       FiCaSchedulerNode node, RMContainer rmContainer) {
     // Done with the reservation?
-    application.unreserve(node, priority);
-    node.unreserveResource(application);
-      
+    if (application.unreserve(node, priority)) {
+      node.unreserveResource(application);
+
       // Update reserved metrics
-    getMetrics().unreserveResource(
-        application.getUser(), rmContainer.getContainer().getResource());
+      getMetrics().unreserveResource(
+          application.getUser(), rmContainer.getContainer().getResource());
+      return true;
+    }
+    return false;
   }
 
-
   @Override
   public void completedContainer(Resource clusterResource, 
       FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer, 
@@ -1411,37 +1413,40 @@ public class LeafQueue implements CSQueue {
       synchronized (this) {
 
         Container container = rmContainer.getContainer();
-        
+
+        boolean removed = false;
         // Inform the application & the node
         // Note: It's safe to assume that all state changes to RMContainer
         // happen under scheduler's lock... 
         // So, this is, in effect, a transaction across application & node
         if (rmContainer.getState() == RMContainerState.RESERVED) {
-          unreserve(application, rmContainer.getReservedPriority(), 
+          removed = unreserve(application, rmContainer.getReservedPriority(),
               node, rmContainer);
         } else {
-          application.containerCompleted(rmContainer, containerStatus, event);
+          removed =
+            application.containerCompleted(rmContainer, containerStatus, event);
           node.releaseContainer(container);
         }
 
-
         // Book-keeping
-        releaseResource(clusterResource, 
-            application, container.getResource());
-
-        LOG.info("completedContainer" +
-            " container=" + container +
-            " resource=" + container.getResource() +
-        		" queue=" + this + 
-            " usedCapacity=" + getUsedCapacity() +
-            " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
-            " used=" + usedResources + 
-            " cluster=" + clusterResource);
+        if (removed) {
+          releaseResource(clusterResource,
+              application, container.getResource());
+          LOG.info("completedContainer" +
+              " container=" + container +
+              " resource=" + container.getResource() +
+              " queue=" + this +
+              " usedCapacity=" + getUsedCapacity() +
+              " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
+              " used=" + usedResources +
+              " cluster=" + clusterResource);
+          // Inform the parent queue
+          getParent().completedContainer(clusterResource, application,
+              node, rmContainer, null, event);
+        }
       }
 
-      // Inform the parent queue
-      getParent().completedContainer(clusterResource, application, 
-          node, rmContainer, null, event);
+
     }
   }
 
@@ -1588,5 +1593,19 @@ public class LeafQueue implements CSQueue {
     getParent().recoverContainer(clusterResource, application, container);
 
   }
-  
+
+  // exposed so the preemption monitor can access the list of active apps
+  public Set<FiCaSchedulerApp> getApplications() {
+    return Collections.unmodifiableSet(activeApplications);
+  }
+
+  // return a single Resource capturing the overall amount of pending resources
+  public Resource getTotalResourcePending() {
+    Resource ret = BuilderUtils.newResource(0, 0);
+    for (FiCaSchedulerApp f : activeApplications) {
+      Resources.addTo(ret, f.getTotalPendingRequests());
+    }
+    return ret;
+  }
+
 }
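
As a quick illustration of the aggregation in getTotalResourcePending() above, a minimal sketch with made-up per-application pending amounts (the numbers are assumptions, not values from this patch):

    // Hypothetical numbers: two apps pending <4096 MB, 2 vcores> and <2048 MB, 1 vcore>.
    Resource total = BuilderUtils.newResource(0, 0);
    Resources.addTo(total, BuilderUtils.newResource(4096, 2));
    Resources.addTo(total, BuilderUtils.newResource(2048, 1));
    // total is now <memory:6144, vCores:3>; this queue-level pending demand is what
    // the capacity monitor weighs against the queue's guaranteed capacity.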

+ 105 - 38
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java

@@ -20,9 +20,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,7 +41,6 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
@@ -53,11 +55,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppSchedulingInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
 import org.apache.hadoop.yarn.util.resource.Resources;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 
 import com.google.common.collect.HashMultiset;
 import com.google.common.collect.Multiset;
@@ -85,17 +89,19 @@ public class FiCaSchedulerApp extends SchedulerApplication {
   private Resource resourceLimit = recordFactory
       .newRecordInstance(Resource.class);
 
-  private Map<ContainerId, RMContainer> liveContainers
-  = new HashMap<ContainerId, RMContainer>();
-  private List<RMContainer> newlyAllocatedContainers = 
-      new ArrayList<RMContainer>();
+  private Map<ContainerId, RMContainer> liveContainers =
+    new HashMap<ContainerId, RMContainer>();
+  private List<RMContainer> newlyAllocatedContainers =
+    new ArrayList<RMContainer>();
 
   final Map<Priority, Map<NodeId, RMContainer>> reservedContainers = 
       new HashMap<Priority, Map<NodeId, RMContainer>>();
 
   private boolean isStopped = false;
 
-  
+  private final Set<ContainerId> containersToPreempt =
+    new HashSet<ContainerId>();
+
   /**
    * Count how many times the application has been given an opportunity
    * to schedule a task at each priority. Each time the scheduler
@@ -219,12 +225,17 @@ public class FiCaSchedulerApp extends SchedulerApplication {
       RMContainerEventType.LAUNCHED));
   }
 
-  synchronized public void containerCompleted(RMContainer rmContainer,
+  synchronized public boolean containerCompleted(RMContainer rmContainer,
       ContainerStatus containerStatus, RMContainerEventType event) {
-    
+
+    // Remove from the list of containers
+    if (null == liveContainers.remove(rmContainer.getContainerId())) {
+      return false;
+    }
+
     Container container = rmContainer.getContainer();
     ContainerId containerId = container.getId();
-    
+
     // Inform the container
     rmContainer.handle(
         new RMContainerFinishedEvent(
@@ -234,9 +245,8 @@ public class FiCaSchedulerApp extends SchedulerApplication {
         );
     LOG.info("Completed container: " + rmContainer.getContainerId() + 
         " in state: " + rmContainer.getState() + " event:" + event);
-    
-    // Remove from the list of containers
-    liveContainers.remove(rmContainer.getContainerId());
+
+    containersToPreempt.remove(rmContainer.getContainerId());
 
     RMAuditLogger.logSuccess(getUser(), 
         AuditConstants.RELEASE_CONTAINER, "SchedulerApp", 
@@ -246,6 +256,8 @@ public class FiCaSchedulerApp extends SchedulerApplication {
     Resource containerResource = rmContainer.getContainer().getResource();
     queue.getMetrics().releaseResources(getUser(), 1, containerResource);
     Resources.subtractFrom(currentConsumption, containerResource);
+
+    return true;
   }
 
   synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
@@ -345,7 +357,8 @@ public class FiCaSchedulerApp extends SchedulerApplication {
   }
   
   /**
-   * Return the number of times the application has been given an opportunity
+   * @param priority Target priority
+   * @return the number of times the application has been given an opportunity
    * to schedule a task at the given priority since the last time it
    * successfully did so.
    */
@@ -419,33 +432,36 @@ public class FiCaSchedulerApp extends SchedulerApplication {
     return rmContainer;
   }
 
-  public synchronized void unreserve(FiCaSchedulerNode node, Priority priority) {
-    Map<NodeId, RMContainer> reservedContainers = 
-        this.reservedContainers.get(priority);
-    RMContainer reservedContainer = reservedContainers.remove(node.getNodeID());
-    if (reservedContainers.isEmpty()) {
-      this.reservedContainers.remove(priority);
-    }
-    
-    // reservedContainer should not be null here
-    if (reservedContainer == null) {
-      String errorMesssage =
-          "Application " + getApplicationId() + " is trying to unreserve "
-              + " on node " + node + ", currently has "
-              + reservedContainers.size() + " at priority " + priority
-              + "; currentReservation " + currentReservation;
-      LOG.warn(errorMesssage);
-      throw new YarnRuntimeException(errorMesssage);
-    }
-    // Reset the re-reservation count
-    resetReReservations(priority);
+  public synchronized boolean unreserve(FiCaSchedulerNode node, Priority priority) {
+    Map<NodeId, RMContainer> reservedContainers =
+      this.reservedContainers.get(priority);
+
+    if (reservedContainers != null) {
+      RMContainer reservedContainer = reservedContainers.remove(node.getNodeID());
 
-    Resource resource = reservedContainer.getContainer().getResource();
-    Resources.subtractFrom(currentReservation, resource);
+      // unreserve is now triggered in new scenarios (preemption)
+      // as a consequence reservedContainer might be null, hence the null checks
+      if (reservedContainer != null
+          && reservedContainer.getContainer() != null
+          && reservedContainer.getContainer().getResource() != null) {
 
-    LOG.info("Application " + getApplicationId() + " unreserved " + " on node "
-        + node + ", currently has " + reservedContainers.size() + " at priority "
-        + priority + "; currentReservation " + currentReservation);
+        if (reservedContainers.isEmpty()) {
+          this.reservedContainers.remove(priority);
+        }
+        // Reset the re-reservation count
+        resetReReservations(priority);
+
+        Resource resource = reservedContainer.getContainer().getResource();
+        Resources.subtractFrom(currentReservation, resource);
+
+        LOG.info("Application " + getApplicationId() + " unreserved "
+            + " on node " + node + ", currently has " + reservedContainers.size()
+            + " at priority " + priority + "; currentReservation "
+            + currentReservation);
+        return true;
+      }
+    }
+    return false;
   }
 
   /**
@@ -509,4 +525,55 @@ public class FiCaSchedulerApp extends SchedulerApplication {
   public Queue getQueue() {
     return queue;
   }
+
+  public Resource getTotalPendingRequests() {
+    Resource ret = Resource.newInstance(0, 0);
+    for (ResourceRequest rr : appSchedulingInfo.getAllResourceRequests()) {
+      // to avoid double counting we count only "ANY" resource requests
+      if (ResourceRequest.isAnyLocation(rr.getResourceName())){
+        Resources.addTo(ret,
+            Resources.multiply(rr.getCapability(), rr.getNumContainers()));
+      }
+    }
+    return ret;
+  }
+
+  public synchronized void addPreemptContainer(ContainerId cont){
+    // ignore already completed containers
+    if (liveContainers.containsKey(cont)) {
+      containersToPreempt.add(cont);
+    }
+  }
+
+  /**
+   * This method produces an Allocation that includes the current view
+   * of the resources that will be allocated to and preempted from this
+   * application.
+   *
+   * @param rc the resource calculator used to size the preemption request
+   * @param clusterResource the total resources of the cluster
+   * @param minimumAllocation the minimum container allocation
+   * @return an allocation including newly allocated containers, headroom, and
+   *         the containers currently marked for preemption
+   */
+  public synchronized Allocation getAllocation(ResourceCalculator rc,
+      Resource clusterResource, Resource minimumAllocation) {
+
+    Set<ContainerId> currentContPreemption = Collections.unmodifiableSet(
+        new HashSet<ContainerId>(containersToPreempt));
+    containersToPreempt.clear();
+    Resource tot = Resource.newInstance(0, 0);
+    for(ContainerId c : currentContPreemption){
+      Resources.addTo(tot,
+          liveContainers.get(c).getContainer().getResource());
+    }
+    int numCont = (int) Math.ceil(
+        Resources.divide(rc, clusterResource, tot, minimumAllocation));
+    ResourceRequest rr = ResourceRequest.newInstance(
+        Priority.UNDEFINED, ResourceRequest.ANY,
+        minimumAllocation, numCont);
+    return new Allocation(pullNewlyAllocatedContainers(), getHeadroom(),
+                          null, currentContPreemption,
+                          Collections.singletonList(rr));
+  }
+
 }

+ 21 - 13
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java

@@ -142,8 +142,9 @@ public class FiCaSchedulerNode extends SchedulerNode {
     }
 
     /* remove the containers from the nodemanger */
-    launchedContainers.remove(container.getId());
-    updateResource(container);
+    if (null != launchedContainers.remove(container.getId())) {
+      updateResource(container);
+    }
 
     LOG.info("Released container " + container.getId() + 
         " of capacity " + container.getResource() + " on host " + rmNode.getNodeAddress() + 
@@ -226,18 +227,25 @@ public class FiCaSchedulerNode extends SchedulerNode {
 
   public synchronized void unreserveResource(
       SchedulerApplication application) {
-    // Cannot unreserve for wrong application...
-    ApplicationAttemptId reservedApplication = 
-        reservedContainer.getContainer().getId().getApplicationAttemptId(); 
-    if (!reservedApplication.equals(
-        application.getApplicationAttemptId())) {
-      throw new IllegalStateException("Trying to unreserve " +  
-          " for application " + application.getApplicationAttemptId() + 
-          " when currently reserved " + 
-          " for application " + reservedApplication.getApplicationId() + 
-          " on node " + this);
-    }
     
+    // adding null checks as this can now be called during preemption
+    if (reservedContainer != null
+        && reservedContainer.getContainer() != null
+        && reservedContainer.getContainer().getId() != null
+        && reservedContainer.getContainer().getId().getApplicationAttemptId() != null) {
+
+      // Cannot unreserve for wrong application...
+      ApplicationAttemptId reservedApplication =
+          reservedContainer.getContainer().getId().getApplicationAttemptId();
+      if (!reservedApplication.equals(
+          application.getApplicationAttemptId())) {
+        throw new IllegalStateException("Trying to unreserve " +
+            " for application " + application.getApplicationAttemptId() +
+            " when currently reserved " +
+            " for application " + reservedApplication.getApplicationId() +
+            " on node " + this);
+      }
+    }
     reservedContainer = null;
   }
 

+ 541 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java

@@ -0,0 +1,541 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Random;
+import java.util.TreeSet;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.mockito.ArgumentCaptor;
+import org.mockito.ArgumentMatcher;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+public class TestProportionalCapacityPreemptionPolicy {
+
+  static final long TS = 3141592653L;
+
+  int appAlloc = 0;
+  Random rand = null;
+  Clock mClock = null;
+  Configuration conf = null;
+  CapacityScheduler mCS = null;
+  EventHandler<ContainerPreemptEvent> mDisp = null;
+  ResourceCalculator rc = new DefaultResourceCalculator();
+  final ApplicationAttemptId appA = ApplicationAttemptId.newInstance(
+      ApplicationId.newInstance(TS, 0), 0);
+  final ApplicationAttemptId appB = ApplicationAttemptId.newInstance(
+      ApplicationId.newInstance(TS, 1), 0);
+  final ApplicationAttemptId appC = ApplicationAttemptId.newInstance(
+      ApplicationId.newInstance(TS, 2), 0);
+  final ApplicationAttemptId appD = ApplicationAttemptId.newInstance(
+      ApplicationId.newInstance(TS, 3), 0);
+  final ApplicationAttemptId appE = ApplicationAttemptId.newInstance(
+      ApplicationId.newInstance(TS, 4), 0);
+  final ArgumentCaptor<ContainerPreemptEvent> evtCaptor =
+    ArgumentCaptor.forClass(ContainerPreemptEvent.class);
+
+  @Rule public TestName name = new TestName();
+
+  @Before
+  @SuppressWarnings("unchecked")
+  public void setup() {
+    conf = new Configuration(false);
+    conf.setLong(WAIT_TIME_BEFORE_KILL, 10000);
+    conf.setLong(MONITORING_INTERVAL, 3000);
+    // report "ideal" preempt
+    conf.setFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 1.0);
+    conf.setFloat(NATURAL_TERMINATION_FACTOR, (float) 1.0);
+
+    mClock = mock(Clock.class);
+    mCS = mock(CapacityScheduler.class);
+    when(mCS.getResourceCalculator()).thenReturn(rc);
+    mDisp = mock(EventHandler.class);
+    rand = new Random();
+    long seed = rand.nextLong();
+    System.out.println(name.getMethodName() + " SEED: " + seed);
+    rand.setSeed(seed);
+    appAlloc = 0;
+  }
+
+  @Test
+  public void testIgnore() {
+    int[][] qData = new int[][]{
+      //  /   A   B   C
+      { 100, 40, 40, 20 },  // abs
+      { 100,  0, 60, 40 },  // used
+      {   0,  0,  0,  0 },  // pending
+      {   0,  0,  0,  0 },  // reserved
+      {   3,  1,  1,  1 },  // apps
+      {  -1,  1,  1,  1 },  // req granularity
+      {   3,  0,  0,  0 },  // subqueues
+    };
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // don't correct imbalances without demand
+    verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class));
+  }
+
+  @Test
+  public void testProportionalPreemption() {
+    int[][] qData = new int[][]{
+      //  /   A   B   C  D
+      { 100, 10, 40, 20, 30 },  // abs
+      { 100, 30, 60, 10,  0 },  // used
+      {  45, 20,  5, 20,  0 },  // pending
+      {   0,  0,  0,  0,  0 },  // reserved
+      {   3,  1,  1,  1,  0 },  // apps
+      {  -1,  1,  1,  1,  1 },  // req granularity
+      {   4,  0,  0,  0,  0 },  // subqueues
+    };
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    verify(mDisp, times(16)).handle(argThat(new IsPreemptionRequestFor(appA)));
+  }
+
+  @Test
+  public void testPreemptCycle() {
+    int[][] qData = new int[][]{
+      //  /   A   B   C
+      { 100, 40, 40, 20 },  // abs
+      { 100,  0, 60, 40 },  // used
+      {  10, 10,  0,  0 },  // pending
+      {   0,  0,  0,  0 },  // reserved
+      {   3,  1,  1,  1 },  // apps
+      {  -1,  1,  1,  1 },  // req granularity
+      {   3,  0,  0,  0 },  // subqueues
+    };
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // ensure all pending rsrc from A get preempted from other queues
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
+  }
+
+  @Test
+  public void testExpireKill() {
+    final long killTime = 10000L;
+    int[][] qData = new int[][]{
+      //  /   A   B   C
+      { 100, 40, 40, 20 },  // abs
+      { 100,  0, 60, 40 },  // used
+      {  10, 10,  0,  0 },  // pending
+      {   0,  0,  0,  0 },  // reserved
+      {   3,  1,  1,  1 },  // apps
+      {  -1,  1,  1,  1 },  // req granularity
+      {   3,  0,  0,  0 },  // subqueues
+    };
+    conf.setLong(WAIT_TIME_BEFORE_KILL, killTime);
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+
+    // ensure all pending rsrc from A get preempted from other queues
+    when(mClock.getTime()).thenReturn(0L);
+    policy.editSchedule();
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+    // requests reiterated
+    when(mClock.getTime()).thenReturn(killTime / 2);
+    policy.editSchedule();
+    verify(mDisp, times(20)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+    // kill req sent
+    when(mClock.getTime()).thenReturn(killTime + 1);
+    policy.editSchedule();
+    verify(mDisp, times(30)).handle(evtCaptor.capture());
+    List<ContainerPreemptEvent> events = evtCaptor.getAllValues();
+    for (ContainerPreemptEvent e : events.subList(20, 30)) {
+      assertEquals(appC, e.getAppId());
+      assertEquals(KILL_CONTAINER, e.getType());
+    }
+  }
+
+  @Test
+  public void testDeadzone() {
+    int[][] qData = new int[][]{
+      //  /   A   B   C
+      { 100, 40, 40, 20 },  // abs
+      { 100, 39, 43, 21 },  // used
+      {  10, 10,  0,  0 },  // pending
+      {   0,  0,  0,  0 },  // reserved
+      {   3,  1,  1,  1 },  // apps
+      {  -1,  1,  1,  1 },  // req granularity
+      {   3,  0,  0,  0 },  // subqueues
+    };
+    conf.setFloat(MAX_IGNORED_OVER_CAPACITY, (float) 0.1);
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // ignore 10% overcapacity to avoid jitter
+    verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class));
+  }
+
+  @Test
+  public void testOverCapacityImbalance() {
+    int[][] qData = new int[][]{
+      //  /   A   B   C
+      { 100, 40, 40, 20 },  // abs
+      { 100, 55, 45,  0 },  // used
+      {  20, 10, 10,  0 },  // pending
+      {   0,  0,  0,  0 },  // reserved
+      {   2,  1,  1,  0 },  // apps
+      {  -1,  1,  1,  0 },  // req granularity
+      {   3,  0,  0,  0 },  // subqueues
+    };
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // correct imbalance between over-capacity queues
+    verify(mDisp, times(5)).handle(argThat(new IsPreemptionRequestFor(appA)));
+  }
+
+  @Test
+  public void testNaturalTermination() {
+    int[][] qData = new int[][]{
+      //  /   A   B   C
+      { 100, 40, 40, 20 },  // abs
+      { 100, 55, 45,  0 },  // used
+      {  20, 10, 10,  0 },  // pending
+      {   0,  0,  0,  0 },  // reserved
+      {   2,  1,  1,  0 },  // apps
+      {  -1,  1,  1,  0 },  // req granularity
+      {   3,  0,  0,  0 },  // subqueues
+    };
+    conf.setFloat(NATURAL_TERMINATION_FACTOR, (float) 0.1);
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // ignore 10% imbalance between over-capacity queues
+    verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class));
+  }
+
+  @Test
+  public void testObserveOnly() {
+    int[][] qData = new int[][]{
+      //  /   A   B   C
+      { 100, 40, 40, 20 },  // abs
+      { 100, 90, 10,  0 },  // used
+      {  80, 10, 20, 50 },  // pending
+      {   0,  0,  0,  0 },  // reserved
+      {   2,  1,  1,  0 },  // apps
+      {  -1,  1,  1,  0 },  // req granularity
+      {   3,  0,  0,  0 },  // subqueues
+    };
+    conf.setBoolean(OBSERVE_ONLY, true);
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // verify even a severe imbalance is not acted upon in observe-only mode
+    verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class));
+  }
+
+  @Test
+  public void testHierarchical() {
+    int[][] qData = new int[][] {
+      //  /    A   B   C    D   E   F
+      { 200, 100, 50, 50, 100, 10, 90 },  // abs
+      { 200, 110, 60, 50,  90, 90,  0 },  // used
+      {  10,   0,  0,  0,  10,  0, 10 },  // pending
+      {   0,   0,  0,  0,   0,  0,  0 },  // reserved
+      {   4,   2,  1,  1,   2,  1,  1 },  // apps
+      {  -1,  -1,  1,  1,  -1,  1,  1 },  // req granularity
+      {   2,   2,  0,  0,   2,  0,  0 },  // subqueues
+    };
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // verify capacity taken from A1, not B1 despite B1 being far over
+    // its absolute guaranteed capacity
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appA)));
+  }
+
+  @Test
+  public void testHierarchicalLarge() {
+    int[][] qData = new int[][] {
+      //  /    A   B   C    D   E   F    G   H   I
+      { 400, 200, 60,140, 100, 70, 30, 100, 10, 90  },  // abs
+      { 400, 210, 70,140, 100, 50, 50,  90, 90,  0  },  // used
+      {  10,   0,  0,  0,   0,  0,  0,   0,  0, 15  },  // pending
+      {   0,   0,  0,  0,   0,  0,  0,   0,  0,  0  },  // reserved
+      {   6,   2,  1,  1,   2,  1,  1,   2,  1,  1  },  // apps
+      {  -1,  -1,  1,  1,  -1,  1,  1,  -1,  1,  1  },  // req granularity
+      {   3,   2,  0,  0,   2,  0,  0,   2,  0,  0  },  // subqueues
+    };
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+    // verify capacity taken from A1, not H1 despite H1 being far over
+    // its absolute guaranteed capacity
+
+    // XXX note: compensating for rounding error in Resources.multiplyTo
+    // which is likely triggered since we use small numbers for readability
+    verify(mDisp, times(9)).handle(argThat(new IsPreemptionRequestFor(appA)));
+    verify(mDisp, times(4)).handle(argThat(new IsPreemptionRequestFor(appE)));
+  }
+
+  @Test
+  public void testContainerOrdering(){
+
+    List<RMContainer> containers = new ArrayList<RMContainer>();
+
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(TS, 10), 0);
+
+    // create a set of containers
+    RMContainer rm1 = mockContainer(appAttId, 5, mock(Resource.class), 3);
+    RMContainer rm2 = mockContainer(appAttId, 3, mock(Resource.class), 3);
+    RMContainer rm3 = mockContainer(appAttId, 2, mock(Resource.class), 2);
+    RMContainer rm4 = mockContainer(appAttId, 1, mock(Resource.class), 2);
+    RMContainer rm5 = mockContainer(appAttId, 4, mock(Resource.class), 1);
+
+    // insert them in non-sorted order
+    containers.add(rm3);
+    containers.add(rm2);
+    containers.add(rm1);
+    containers.add(rm5);
+    containers.add(rm4);
+
+    // sort them
+    ProportionalCapacityPreemptionPolicy.sortContainers(containers);
+
+    // verify the "priority"-first, "reverse container-id"-second
+    // ordering is enforced correctly
+    assert containers.get(0).equals(rm1);
+    assert containers.get(1).equals(rm2);
+    assert containers.get(2).equals(rm3);
+    assert containers.get(3).equals(rm4);
+    assert containers.get(4).equals(rm5);
+
+  }
+
+  static class IsPreemptionRequestFor
+      extends ArgumentMatcher<ContainerPreemptEvent> {
+    private final ApplicationAttemptId appAttId;
+    private final ContainerPreemptEventType type;
+    IsPreemptionRequestFor(ApplicationAttemptId appAttId) {
+      this(appAttId, PREEMPT_CONTAINER);
+    }
+    IsPreemptionRequestFor(ApplicationAttemptId appAttId,
+        ContainerPreemptEventType type) {
+      this.appAttId = appAttId;
+      this.type = type;
+    }
+    @Override
+    public boolean matches(Object o) {
+      return appAttId.equals(((ContainerPreemptEvent)o).getAppId())
+          && type.equals(((ContainerPreemptEvent)o).getType());
+    }
+    @Override
+    public String toString() {
+      return appAttId.toString();
+    }
+  }
+
+  ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) {
+    ProportionalCapacityPreemptionPolicy policy =
+      new ProportionalCapacityPreemptionPolicy(conf, mDisp, mCS, mClock);
+    ParentQueue mRoot = buildMockRootQueue(rand, qData);
+    when(mCS.getRootQueue()).thenReturn(mRoot);
+
+    Resource clusterResources =
+      Resource.newInstance(leafAbsCapacities(qData[0], qData[6]), 0);
+    when(mCS.getClusterResources()).thenReturn(clusterResources);
+    return policy;
+  }
+
+  ParentQueue buildMockRootQueue(Random r, int[]... queueData) {
+    int[] abs      = queueData[0];
+    int[] used     = queueData[1];
+    int[] pending  = queueData[2];
+    int[] reserved = queueData[3];
+    int[] apps     = queueData[4];
+    int[] gran     = queueData[5];
+    int[] queues   = queueData[6];
+
+    return mockNested(abs, used, pending, reserved, apps, gran, queues);
+  }
+
+  ParentQueue mockNested(int[] abs, int[] used,
+      int[] pending, int[] reserved, int[] apps, int[] gran, int[] queues) {
+    float tot = leafAbsCapacities(abs, queues);
+    Deque<ParentQueue> pqs = new LinkedList<ParentQueue>();
+    ParentQueue root = mockParentQueue(null, queues[0], pqs);
+    when(root.getQueueName()).thenReturn("/");
+    when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot);
+    when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot);
+    for (int i = 1; i < queues.length; ++i) {
+      final CSQueue q;
+      final ParentQueue p = pqs.removeLast();
+      final String queueName = "queue" + ((char)('A' + i - 1));
+      if (queues[i] > 0) {
+        q = mockParentQueue(p, queues[i], pqs);
+      } else {
+        q = mockLeafQueue(p, tot, i, abs, used, pending, reserved, apps, gran);
+      }
+      when(q.getParent()).thenReturn(p);
+      when(q.getQueueName()).thenReturn(queueName);
+      when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
+      when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
+    }
+    assert 0 == pqs.size();
+    return root;
+  }
+
+  ParentQueue mockParentQueue(ParentQueue p, int subqueues,
+      Deque<ParentQueue> pqs) {
+    ParentQueue pq = mock(ParentQueue.class);
+    List<CSQueue> cqs = new ArrayList<CSQueue>();
+    when(pq.getChildQueues()).thenReturn(cqs);
+    for (int i = 0; i < subqueues; ++i) {
+      pqs.add(pq);
+    }
+    if (p != null) {
+      p.getChildQueues().add(pq);
+    }
+    return pq;
+  }
+
+  LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs,
+      int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) {
+    LeafQueue lq = mock(LeafQueue.class);
+    when(lq.getTotalResourcePending()).thenReturn(
+        Resource.newInstance(pending[i], 0));
+    // consider moving this to where CapacityScheduler::comparator is accessible
+    NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
+      new Comparator<FiCaSchedulerApp>() {
+        @Override
+        public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
+          return a1.getApplicationAttemptId()
+              .compareTo(a2.getApplicationAttemptId());
+        }
+      });
+    // applications are added in global L->R order in queues
+    if (apps[i] != 0) {
+      int aUsed    = used[i] / apps[i];
+      int aPending = pending[i] / apps[i];
+      int aReserve = reserved[i] / apps[i];
+      for (int a = 0; a < apps[i]; ++a) {
+        qApps.add(mockApp(i, appAlloc, aUsed, aPending, aReserve, gran[i]));
+        ++appAlloc;
+      }
+    }
+    when(lq.getApplications()).thenReturn(qApps);
+    p.getChildQueues().add(lq);
+    return lq;
+  }
+
+  FiCaSchedulerApp mockApp(int qid, int id, int used, int pending, int reserved,
+      int gran) {
+    FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
+
+    ApplicationId appId = ApplicationId.newInstance(TS, id);
+    ApplicationAttemptId appAttId = ApplicationAttemptId.newInstance(appId, 0);
+    when(app.getApplicationId()).thenReturn(appId);
+    when(app.getApplicationAttemptId()).thenReturn(appAttId);
+
+    int cAlloc = 0;
+    Resource unit = Resource.newInstance(gran, 0);
+    List<RMContainer> cReserved = new ArrayList<RMContainer>();
+    for (int i = 0; i < reserved; i += gran) {
+      cReserved.add(mockContainer(appAttId, cAlloc, unit, 1));
+      ++cAlloc;
+    }
+    when(app.getReservedContainers()).thenReturn(cReserved);
+
+    List<RMContainer> cLive = new ArrayList<RMContainer>();
+    for (int i = 0; i < used; i += gran) {
+      cLive.add(mockContainer(appAttId, cAlloc, unit, 1));
+      ++cAlloc;
+    }
+    when(app.getLiveContainers()).thenReturn(cLive);
+    return app;
+  }
+
+  RMContainer mockContainer(ApplicationAttemptId appAttId, int id,
+      Resource r, int priority) {
+    ContainerId cId = ContainerId.newInstance(appAttId, id);
+    Container c = mock(Container.class);
+    when(c.getResource()).thenReturn(r);
+    when(c.getPriority()).thenReturn(Priority.create(priority));
+    RMContainer mC = mock(RMContainer.class);
+    when(mC.getContainerId()).thenReturn(cId);
+    when(mC.getContainer()).thenReturn(c);
+    return mC;
+  }
+
+  static int leafAbsCapacities(int[] abs, int[] subqueues) {
+    int ret = 0;
+    for (int i = 0; i < abs.length; ++i) {
+      if (0 == subqueues[i]) {
+        ret += abs[i];
+      }
+    }
+    return ret;
+  }
+
+  void printString(CSQueue nq, String indent) {
+    if (nq instanceof ParentQueue) {
+      System.out.println(indent + nq.getQueueName()
+          + " cur:" + nq.getAbsoluteUsedCapacity()
+          + " guar:" + nq.getAbsoluteCapacity()
+          );
+      for (CSQueue q : ((ParentQueue)nq).getChildQueues()) {
+        printString(q, indent + "  ");
+      }
+    } else {
+      System.out.println(indent + nq.getQueueName()
+          + " pen:" + ((LeafQueue) nq).getTotalResourcePending()
+          + " cur:" + nq.getAbsoluteUsedCapacity()
+          + " guar:" + nq.getAbsoluteCapacity()
+          );
+      for (FiCaSchedulerApp a : ((LeafQueue)nq).getApplications()) {
+        System.out.println(indent + "  " + a.getApplicationId());
+      }
+    }
+  }
+
+}
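
The qData convention above (one row each for absolute capacity, used, pending, reserved, apps, request granularity, and subqueue count, with one column per queue and the root first) makes additional scenarios cheap to write. A hedged sketch of one more case, reusing the "no pending demand, no preemption" behaviour that testIgnore already asserts; the four-queue layout below is hypothetical:

    // Hypothetical extra case, not part of this patch. With zero pending demand the
    // policy should again issue no preemption events, mirroring testIgnore above.
    @Test
    public void testIgnoreWithFourQueues() {
      int[][] qData = new int[][]{
        //  /   A   B   C   D
        { 100, 25, 25, 25, 25 },  // abs
        { 100, 70, 10, 10, 10 },  // used
        {   0,  0,  0,  0,  0 },  // pending
        {   0,  0,  0,  0,  0 },  // reserved
        {   4,  1,  1,  1,  1 },  // apps
        {  -1,  1,  1,  1,  1 },  // req granularity
        {   4,  0,  0,  0,  0 },  // subqueues
      };
      ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
      policy.editSchedule();
      // no demand anywhere, so nothing should be preempted
      verify(mDisp, never()).handle(isA(ContainerPreemptEvent.class));
    }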