
YARN-4822. Refactor existing Preemption Policy of CS for easier adding new approach to select preemption candidates. Contributed by Wangda Tan

Jian He 9 years ago
parent commit 60e4116bf1
15 changed files with 1393 additions and 844 deletions
  1. +4 -4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java
  2. +0 -4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
  3. +52 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
  4. +65 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
  5. +364 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
  6. +370 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
  7. +52 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
  8. +169 -716
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
  9. +159 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
  10. +45 -0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
  11. +0 -6
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java
  12. +1 -1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java
  13. +67 -66
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
  14. +35 -43
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
  15. +10 -4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java

+ 4 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingEditPolicy.java

@@ -23,7 +23,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResour
 
 public interface SchedulingEditPolicy {
 
-  public void init(Configuration config, RMContext context,
+  void init(Configuration config, RMContext context,
       PreemptableResourceScheduler scheduler);
 
   /**
@@ -31,10 +31,10 @@ public interface SchedulingEditPolicy {
    * allowed to track containers and affect the scheduler. The "actions"
    * performed are passed back through an EventHandler.
    */
-  public void editSchedule();
+  void editSchedule();
 
-  public long getMonitoringInterval();
+  long getMonitoringInterval();
 
-  public String getPolicyName();
+  String getPolicyName();
 
 }
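
The hunk above only removes modifiers that the language already implies: members of a Java interface are implicitly public, so the two declarations below are equivalent (a minimal illustration, not part of the patch).

    interface Example {
      public void run();   // "public" is redundant on an interface method
      void stop();         // equivalent, and the style the patch moves to
    }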

+ 0 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java

@@ -45,10 +45,6 @@ public class SchedulingMonitor extends AbstractService {
     this.rmContext = rmContext;
   }
 
-  public long getMonitorInterval() {
-    return monitorInterval;
-  }
-  
   @VisibleForTesting
   public synchronized SchedulingEditPolicy getSchedulingEditPolicy() {
     return scheduleEditPolicy;

+ 52 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java

@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+
+import java.util.Collection;
+import java.util.Set;
+
+interface CapacitySchedulerPreemptionContext {
+  CapacityScheduler getScheduler();
+
+  TempQueuePerPartition getQueueByPartition(String queueName,
+      String partition);
+
+  Collection<TempQueuePerPartition> getQueuePartitions(String queueName);
+
+  ResourceCalculator getResourceCalculator();
+
+  RMContext getRMContext();
+
+  boolean isObserveOnly();
+
+  Set<ContainerId> getKillableContainers();
+
+  double getMaxIgnoreOverCapacity();
+
+  double getNaturalTerminationFactor();
+
+  Set<String> getLeafQueueNames();
+
+  Set<String> getAllPartitions();
+}
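
This new read-only context is what the candidate selectors introduced below consume, instead of reaching into the preemption policy's internals. A rough sketch of how a selector can walk it; the helper name is hypothetical, while the getters and the TempQueuePerPartition fields come from this patch.

    // Illustrative only: count the leaf-queue partitions a selector may touch.
    static int countPreemptableQueuePartitions(
        CapacitySchedulerPreemptionContext context) {
      int count = 0;
      for (String queueName : context.getLeafQueueNames()) {
        for (TempQueuePerPartition qT : context.getQueuePartitions(queueName)) {
          if (!qT.preemptionDisabled) {
            count++;
          }
        }
      }
      return count;
    }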

+ 65 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java

@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+public class CapacitySchedulerPreemptionUtils {
+  public static Map<String, Resource> getResToObtainByPartitionForLeafQueue(
+      CapacitySchedulerPreemptionContext context, String queueName,
+      Resource clusterResource) {
+    Map<String, Resource> resToObtainByPartition = new HashMap<>();
+    // compute resToObtainByPartition considered inter-queue preemption
+    for (TempQueuePerPartition qT : context.getQueuePartitions(queueName)) {
+      if (qT.preemptionDisabled) {
+        continue;
+      }
+
+      //  Only add resToObtainByPartition when actuallyToBePreempted resource >= 0
+      if (Resources.greaterThan(context.getResourceCalculator(),
+          clusterResource, qT.actuallyToBePreempted, Resources.none())) {
+        resToObtainByPartition.put(qT.partition,
+            Resources.clone(qT.actuallyToBePreempted));
+      }
+    }
+
+    return resToObtainByPartition;
+  }
+
+  public static boolean isContainerAlreadySelected(RMContainer container,
+      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates) {
+    if (null == selectedCandidates) {
+      return false;
+    }
+
+    Set<RMContainer> containers = selectedCandidates.get(
+        container.getApplicationAttemptId());
+    if (containers == null) {
+      return false;
+    }
+    return containers.contains(container);
+  }
+}

+ 364 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java

@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+public class FifoCandidatesSelector
+    extends PreemptionCandidatesSelector {
+  private static final Log LOG =
+      LogFactory.getLog(FifoCandidatesSelector.class);
+  private PreemptableResourceCalculator preemptableAmountCalculator;
+
+  FifoCandidatesSelector(
+      CapacitySchedulerPreemptionContext preemptionContext) {
+    super(preemptionContext);
+
+    preemptableAmountCalculator = new PreemptableResourceCalculator(
+        preemptionContext);
+  }
+
+  @Override
+  public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
+      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+      Resource clusterResource, Resource totalPreemptionAllowed) {
+    // Calculate how much resources we need to preempt
+    preemptableAmountCalculator.computeIdealAllocation(clusterResource,
+        totalPreemptionAllowed);
+
+    Map<ApplicationAttemptId, Set<RMContainer>> preemptMap =
+        new HashMap<>();
+    List<RMContainer> skippedAMContainerlist = new ArrayList<>();
+
+    // Loop all leaf queues
+    for (String queueName : preemptionContext.getLeafQueueNames()) {
+      // check if preemption disabled for the queue
+      if (preemptionContext.getQueueByPartition(queueName,
+          RMNodeLabelsManager.NO_LABEL).preemptionDisabled) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("skipping from queue=" + queueName
+              + " because it's a non-preemptable queue");
+        }
+        continue;
+      }
+
+      // compute resToObtainByPartition considered inter-queue preemption
+      LeafQueue leafQueue = preemptionContext.getQueueByPartition(queueName,
+          RMNodeLabelsManager.NO_LABEL).leafQueue;
+
+      Map<String, Resource> resToObtainByPartition =
+          CapacitySchedulerPreemptionUtils
+              .getResToObtainByPartitionForLeafQueue(preemptionContext,
+                  queueName, clusterResource);
+
+      synchronized (leafQueue) {
+        // go through all ignore-partition-exclusivity containers first to make
+        // sure such containers will be preemptionCandidates first
+        Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityContainers =
+            leafQueue.getIgnoreExclusivityRMContainers();
+        for (String partition : resToObtainByPartition.keySet()) {
+          if (ignorePartitionExclusivityContainers.containsKey(partition)) {
+            TreeSet<RMContainer> rmContainers =
+                ignorePartitionExclusivityContainers.get(partition);
+            // We will check container from reverse order, so latter submitted
+            // application's containers will be preemptionCandidates first.
+            for (RMContainer c : rmContainers.descendingSet()) {
+              if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c,
+                  selectedCandidates)) {
+                // Skip already selected containers
+                continue;
+              }
+              boolean preempted = tryPreemptContainerAndDeductResToObtain(
+                  resToObtainByPartition, c, clusterResource, preemptMap,
+                  totalPreemptionAllowed);
+              if (!preempted) {
+                continue;
+              }
+            }
+          }
+        }
+
+        // preempt other containers
+        Resource skippedAMSize = Resource.newInstance(0, 0);
+        Iterator<FiCaSchedulerApp> desc =
+            leafQueue.getOrderingPolicy().getPreemptionIterator();
+        while (desc.hasNext()) {
+          FiCaSchedulerApp fc = desc.next();
+          // When we complete preempt from one partition, we will remove from
+          // resToObtainByPartition, so when it becomes empty, we can get no
+          // more preemption is needed
+          if (resToObtainByPartition.isEmpty()) {
+            break;
+          }
+
+          preemptFrom(fc, clusterResource, resToObtainByPartition,
+              skippedAMContainerlist, skippedAMSize, preemptMap,
+              totalPreemptionAllowed);
+        }
+
+        // Can try preempting AMContainers (still saving atmost
+        // maxAMCapacityForThisQueue AMResource's) if more resources are
+        // required to be preemptionCandidates from this Queue.
+        Resource maxAMCapacityForThisQueue = Resources.multiply(
+            Resources.multiply(clusterResource,
+                leafQueue.getAbsoluteCapacity()),
+            leafQueue.getMaxAMResourcePerQueuePercent());
+
+        preemptAMContainers(clusterResource, preemptMap, skippedAMContainerlist,
+            resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue,
+            totalPreemptionAllowed);
+      }
+    }
+
+    return preemptMap;
+  }
+
+  /**
+   * As more resources are needed for preemption, saved AMContainers has to be
+   * rescanned. Such AMContainers can be preemptionCandidates based on resToObtain, but
+   * maxAMCapacityForThisQueue resources will be still retained.
+   *
+   * @param clusterResource
+   * @param preemptMap
+   * @param skippedAMContainerlist
+   * @param skippedAMSize
+   * @param maxAMCapacityForThisQueue
+   */
+  private void preemptAMContainers(Resource clusterResource,
+      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+      List<RMContainer> skippedAMContainerlist,
+      Map<String, Resource> resToObtainByPartition, Resource skippedAMSize,
+      Resource maxAMCapacityForThisQueue, Resource totalPreemptionAllowed) {
+    for (RMContainer c : skippedAMContainerlist) {
+      // Got required amount of resources for preemption, can stop now
+      if (resToObtainByPartition.isEmpty()) {
+        break;
+      }
+      // Once skippedAMSize reaches down to maxAMCapacityForThisQueue,
+      // container selection iteration for preemption will be stopped.
+      if (Resources.lessThanOrEqual(rc, clusterResource, skippedAMSize,
+          maxAMCapacityForThisQueue)) {
+        break;
+      }
+
+      boolean preempted =
+          tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
+              clusterResource, preemptMap, totalPreemptionAllowed);
+      if (preempted) {
+        Resources.subtractFrom(skippedAMSize, c.getAllocatedResource());
+      }
+    }
+    skippedAMContainerlist.clear();
+  }
+
+  private boolean preemptMapContains(
+      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+      ApplicationAttemptId attemptId, RMContainer rmContainer) {
+    Set<RMContainer> rmContainers;
+    if (null == (rmContainers = preemptMap.get(attemptId))) {
+      return false;
+    }
+    return rmContainers.contains(rmContainer);
+  }
+
+  /**
+   * Return should we preempt rmContainer. If we should, deduct from
+   * <code>resourceToObtainByPartition</code>
+   */
+  private boolean tryPreemptContainerAndDeductResToObtain(
+      Map<String, Resource> resourceToObtainByPartitions,
+      RMContainer rmContainer, Resource clusterResource,
+      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+      Resource totalPreemptionAllowed) {
+    ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId();
+
+    // We will not account resource of a container twice or more
+    if (preemptMapContains(preemptMap, attemptId, rmContainer)) {
+      return false;
+    }
+
+    String nodePartition = getPartitionByNodeId(rmContainer.getAllocatedNode());
+    Resource toObtainByPartition =
+        resourceToObtainByPartitions.get(nodePartition);
+
+    if (null != toObtainByPartition && Resources.greaterThan(rc,
+        clusterResource, toObtainByPartition, Resources.none()) && Resources
+        .fitsIn(rc, clusterResource, rmContainer.getAllocatedResource(),
+            totalPreemptionAllowed)) {
+      Resources.subtractFrom(toObtainByPartition,
+          rmContainer.getAllocatedResource());
+      Resources.subtractFrom(totalPreemptionAllowed,
+          rmContainer.getAllocatedResource());
+
+      // When we have no more resource need to obtain, remove from map.
+      if (Resources.lessThanOrEqual(rc, clusterResource, toObtainByPartition,
+          Resources.none())) {
+        resourceToObtainByPartitions.remove(nodePartition);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Marked container=" + rmContainer.getContainerId()
+            + " in partition=" + nodePartition
+            + " to be preemption candidates");
+      }
+      // Add to preemptMap
+      addToPreemptMap(preemptMap, attemptId, rmContainer);
+      return true;
+    }
+
+    return false;
+  }
+
+  private String getPartitionByNodeId(NodeId nodeId) {
+    return preemptionContext.getScheduler().getSchedulerNode(nodeId)
+        .getPartition();
+  }
+
+  /**
+   * Given a target preemption for a specific application, select containers
+   * to preempt (after unreserving all reservation for that app).
+   */
+  @SuppressWarnings("unchecked")
+  private void preemptFrom(FiCaSchedulerApp app,
+      Resource clusterResource, Map<String, Resource> resToObtainByPartition,
+      List<RMContainer> skippedAMContainerlist, Resource skippedAMSize,
+      Map<ApplicationAttemptId, Set<RMContainer>> selectedContainers,
+      Resource totalPreemptionAllowed) {
+    ApplicationAttemptId appId = app.getApplicationAttemptId();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Looking at application=" + app.getApplicationAttemptId()
+          + " resourceToObtain=" + resToObtainByPartition);
+    }
+
+    // first drop reserved containers towards rsrcPreempt
+    List<RMContainer> reservedContainers =
+        new ArrayList<>(app.getReservedContainers());
+    for (RMContainer c : reservedContainers) {
+      if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c,
+          selectedContainers)) {
+        continue;
+      }
+      if (resToObtainByPartition.isEmpty()) {
+        return;
+      }
+
+      // Try to preempt this container
+      tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
+          clusterResource, selectedContainers, totalPreemptionAllowed);
+
+      if (!preemptionContext.isObserveOnly()) {
+        preemptionContext.getRMContext().getDispatcher().getEventHandler()
+            .handle(new ContainerPreemptEvent(appId, c,
+                SchedulerEventType.KILL_RESERVED_CONTAINER));
+      }
+    }
+
+    // if more resources are to be freed go through all live containers in
+    // reverse priority and reverse allocation order and mark them for
+    // preemption
+    List<RMContainer> liveContainers =
+        new ArrayList<>(app.getLiveContainers());
+
+    sortContainers(liveContainers);
+
+    for (RMContainer c : liveContainers) {
+      if (resToObtainByPartition.isEmpty()) {
+        return;
+      }
+
+      if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c,
+          selectedContainers)) {
+        continue;
+      }
+
+      // Skip already marked to killable containers
+      if (null != preemptionContext.getKillableContainers() && preemptionContext
+          .getKillableContainers().contains(c.getContainerId())) {
+        continue;
+      }
+
+      // Skip AM Container from preemption for now.
+      if (c.isAMContainer()) {
+        skippedAMContainerlist.add(c);
+        Resources.addTo(skippedAMSize, c.getAllocatedResource());
+        continue;
+      }
+
+      // Try to preempt this container
+      tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
+          clusterResource, selectedContainers, totalPreemptionAllowed);
+    }
+  }
+
+  /**
+   * Compare by reversed priority order first, and then reversed containerId
+   * order
+   * @param containers
+   */
+  @VisibleForTesting
+  static void sortContainers(List<RMContainer> containers){
+    Collections.sort(containers, new Comparator<RMContainer>() {
+      @Override
+      public int compare(RMContainer a, RMContainer b) {
+        Comparator<Priority> c = new org.apache.hadoop.yarn.server
+            .resourcemanager.resource.Priority.Comparator();
+        int priorityComp = c.compare(b.getContainer().getPriority(),
+            a.getContainer().getPriority());
+        if (priorityComp != 0) {
+          return priorityComp;
+        }
+        return b.getContainerId().compareTo(a.getContainerId());
+      }
+    });
+  }
+
+  private void addToPreemptMap(
+      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+      ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) {
+    Set<RMContainer> set;
+    if (null == (set = preemptMap.get(appAttemptId))) {
+      set = new HashSet<>();
+      preemptMap.put(appAttemptId, set);
+    }
+    set.add(containerToPreempt);
+  }
+}
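
In selectCandidates above, AM containers are skipped during the normal pass and only revisited by preemptAMContainers while the skipped AM resource still exceeds maxAMCapacityForThisQueue, so roughly that much AM capacity is retained per queue. A small worked example with assumed figures (not from the patch):

    // Assumed: a 100 GB / 100 vcore cluster, a queue guaranteed 50% of it,
    // and 10% of the queue usable by AM containers.
    Resource clusterResource = Resource.newInstance(100 * 1024, 100);
    Resource maxAMCapacityForThisQueue = Resources.multiply(
        Resources.multiply(clusterResource, 0.5f),   // absolute capacity
        0.1f);                                       // max AM resource percent
    // => about 5 GB / 5 vcores of AM containers stay protected from preemption.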

+ 370 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java

@@ -0,0 +1,370 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+/**
+ * Calculate how much resources need to be preempted for each queue,
+ * will be used by {@link FifoCandidatesSelector}
+ */
+public class PreemptableResourceCalculator {
+  private static final Log LOG =
+      LogFactory.getLog(PreemptableResourceCalculator.class);
+
+  private final CapacitySchedulerPreemptionContext context;
+  private final ResourceCalculator rc;
+
+  static class TQComparator implements Comparator<TempQueuePerPartition> {
+    private ResourceCalculator rc;
+    private Resource clusterRes;
+
+    TQComparator(ResourceCalculator rc, Resource clusterRes) {
+      this.rc = rc;
+      this.clusterRes = clusterRes;
+    }
+
+    @Override
+    public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) {
+      if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) {
+        return -1;
+      }
+      if (getIdealPctOfGuaranteed(tq1) > getIdealPctOfGuaranteed(tq2)) {
+        return 1;
+      }
+      return 0;
+    }
+
+    // Calculates idealAssigned / guaranteed
+    // TempQueues with 0 guarantees are always considered the most over
+    // capacity and therefore considered last for resources.
+    private double getIdealPctOfGuaranteed(TempQueuePerPartition q) {
+      double pctOver = Integer.MAX_VALUE;
+      if (q != null && Resources.greaterThan(
+          rc, clusterRes, q.guaranteed, Resources.none())) {
+        pctOver =
+            Resources.divide(rc, clusterRes, q.idealAssigned, q.guaranteed);
+      }
+      return (pctOver);
+    }
+  }
+
+  public PreemptableResourceCalculator(CapacitySchedulerPreemptionContext preemptionContext) {
+    context = preemptionContext;
+    rc = preemptionContext.getResourceCalculator();
+  }
+
+  /**
+   * Computes a normalizedGuaranteed capacity based on active queues
+   * @param rc resource calculator
+   * @param clusterResource the total amount of resources in the cluster
+   * @param queues the list of queues to consider
+   */
+  private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
+      Collection<TempQueuePerPartition> queues, boolean ignoreGuar) {
+    Resource activeCap = Resource.newInstance(0, 0);
+
+    if (ignoreGuar) {
+      for (TempQueuePerPartition q : queues) {
+        q.normalizedGuarantee = 1.0f / queues.size();
+      }
+    } else {
+      for (TempQueuePerPartition q : queues) {
+        Resources.addTo(activeCap, q.guaranteed);
+      }
+      for (TempQueuePerPartition q : queues) {
+        q.normalizedGuarantee = Resources.divide(rc, clusterResource,
+            q.guaranteed, activeCap);
+      }
+    }
+  }
+
+  // Take the most underserved TempQueue (the one on the head). Collect and
+  // return the list of all queues that have the same idealAssigned
+  // percentage of guaranteed.
+  protected Collection<TempQueuePerPartition> getMostUnderservedQueues(
+      PriorityQueue<TempQueuePerPartition> orderedByNeed, TQComparator tqComparator) {
+    ArrayList<TempQueuePerPartition> underserved = new ArrayList<>();
+    while (!orderedByNeed.isEmpty()) {
+      TempQueuePerPartition q1 = orderedByNeed.remove();
+      underserved.add(q1);
+      TempQueuePerPartition q2 = orderedByNeed.peek();
+      // q1's pct of guaranteed won't be larger than q2's. If it's less, then
+      // return what has already been collected. Otherwise, q1's pct of
+      // guaranteed == that of q2, so add q2 to underserved list during the
+      // next pass.
+      if (q2 == null || tqComparator.compare(q1,q2) < 0) {
+        return underserved;
+      }
+    }
+    return underserved;
+  }
+
+
+  /**
+   * Given a set of queues compute the fix-point distribution of unassigned
+   * resources among them. As pending request of a queue are exhausted, the
+   * queue is removed from the set and remaining capacity redistributed among
+   * remaining queues. The distribution is weighted based on guaranteed
+   * capacity, unless asked to ignoreGuarantee, in which case resources are
+   * distributed uniformly.
+   */
+  private void computeFixpointAllocation(ResourceCalculator rc,
+      Resource tot_guarant, Collection<TempQueuePerPartition> qAlloc,
+      Resource unassigned, boolean ignoreGuarantee) {
+    // Prior to assigning the unused resources, process each queue as follows:
+    // If current > guaranteed, idealAssigned = guaranteed + untouchable extra
+    // Else idealAssigned = current;
+    // Subtract idealAssigned resources from unassigned.
+    // If the queue has all of its needs met (that is, if
+    // idealAssigned >= current + pending), remove it from consideration.
+    // Sort queues from most under-guaranteed to most over-guaranteed.
+    TQComparator tqComparator = new TQComparator(rc, tot_guarant);
+    PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(10,
+        tqComparator);
+    for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) {
+      TempQueuePerPartition q = i.next();
+      if (Resources.greaterThan(rc, tot_guarant, q.current, q.guaranteed)) {
+        q.idealAssigned = Resources.add(q.guaranteed, q.untouchableExtra);
+      } else {
+        q.idealAssigned = Resources.clone(q.current);
+      }
+      Resources.subtractFrom(unassigned, q.idealAssigned);
+      // If idealAssigned < (current + pending), q needs more resources, so
+      // add it to the list of underserved queues, ordered by need.
+      Resource curPlusPend = Resources.add(q.current, q.pending);
+      if (Resources.lessThan(rc, tot_guarant, q.idealAssigned, curPlusPend)) {
+        orderedByNeed.add(q);
+      }
+    }
+
+    //assign all cluster resources until no more demand, or no resources are left
+    while (!orderedByNeed.isEmpty()
+        && Resources.greaterThan(rc,tot_guarant, unassigned,Resources.none())) {
+      Resource wQassigned = Resource.newInstance(0, 0);
+      // we compute normalizedGuarantees capacity based on currently active
+      // queues
+      resetCapacity(rc, unassigned, orderedByNeed, ignoreGuarantee);
+
+      // For each underserved queue (or set of queues if multiple are equally
+      // underserved), offer its share of the unassigned resources based on its
+      // normalized guarantee. After the offer, if the queue is not satisfied,
+      // place it back in the ordered list of queues, recalculating its place
+      // in the order of most under-guaranteed to most over-guaranteed. In this
+      // way, the most underserved queue(s) are always given resources first.
+      Collection<TempQueuePerPartition> underserved =
+          getMostUnderservedQueues(orderedByNeed, tqComparator);
+      for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
+          .hasNext();) {
+        TempQueuePerPartition sub = i.next();
+        Resource wQavail = Resources.multiplyAndNormalizeUp(rc,
+            unassigned, sub.normalizedGuarantee, Resource.newInstance(1, 1));
+        Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
+        Resource wQdone = Resources.subtract(wQavail, wQidle);
+
+        if (Resources.greaterThan(rc, tot_guarant,
+            wQdone, Resources.none())) {
+          // The queue is still asking for more. Put it back in the priority
+          // queue, recalculating its order based on need.
+          orderedByNeed.add(sub);
+        }
+        Resources.addTo(wQassigned, wQdone);
+      }
+      Resources.subtractFrom(unassigned, wQassigned);
+    }
+  }
+
+  /**
+   * This method computes (for a single level in the tree, passed as a {@code
+   * List<TempQueue>}) the ideal assignment of resources. This is done
+   * recursively to allocate capacity fairly across all queues with pending
+   * demands. It terminates when no resources are left to assign, or when all
+   * demand is satisfied.
+   *
+   * @param rc resource calculator
+   * @param queues a list of cloned queues to be assigned capacity to (this is
+   * an out param)
+   * @param totalPreemptionAllowed total amount of preemption we allow
+   * @param tot_guarant the amount of capacity assigned to this pool of queues
+   */
+  private void computeIdealResourceDistribution(ResourceCalculator rc,
+      List<TempQueuePerPartition> queues, Resource totalPreemptionAllowed,
+      Resource tot_guarant) {
+
+    // qAlloc tracks currently active queues (will decrease progressively as
+    // demand is met)
+    List<TempQueuePerPartition> qAlloc = new ArrayList<>(queues);
+    // unassigned tracks how much resources are still to assign, initialized
+    // with the total capacity for this set of queues
+    Resource unassigned = Resources.clone(tot_guarant);
+
+    // group queues based on whether they have non-zero guaranteed capacity
+    Set<TempQueuePerPartition> nonZeroGuarQueues = new HashSet<>();
+    Set<TempQueuePerPartition> zeroGuarQueues = new HashSet<>();
+
+    for (TempQueuePerPartition q : qAlloc) {
+      if (Resources
+          .greaterThan(rc, tot_guarant, q.guaranteed, Resources.none())) {
+        nonZeroGuarQueues.add(q);
+      } else {
+        zeroGuarQueues.add(q);
+      }
+    }
+
+    // first compute the allocation as a fixpoint based on guaranteed capacity
+    computeFixpointAllocation(rc, tot_guarant, nonZeroGuarQueues, unassigned,
+        false);
+
+    // if any capacity is left unassigned, distributed among zero-guarantee
+    // queues uniformly (i.e., not based on guaranteed capacity, as this is zero)
+    if (!zeroGuarQueues.isEmpty()
+        && Resources.greaterThan(rc, tot_guarant, unassigned, Resources.none())) {
+      computeFixpointAllocation(rc, tot_guarant, zeroGuarQueues, unassigned,
+          true);
+    }
+
+    // based on ideal assignment computed above and current assignment we derive
+    // how much preemption is required overall
+    Resource totPreemptionNeeded = Resource.newInstance(0, 0);
+    for (TempQueuePerPartition t:queues) {
+      if (Resources.greaterThan(rc, tot_guarant, t.current, t.idealAssigned)) {
+        Resources.addTo(totPreemptionNeeded,
+            Resources.subtract(t.current, t.idealAssigned));
+      }
+    }
+
+    // if we need to preempt more than is allowed, compute a factor (0<f<1)
+    // that is used to scale down how much we ask back from each queue
+    float scalingFactor = 1.0F;
+    if (Resources.greaterThan(rc, tot_guarant,
+        totPreemptionNeeded, totalPreemptionAllowed)) {
+      scalingFactor = Resources.divide(rc, tot_guarant,
+          totalPreemptionAllowed, totPreemptionNeeded);
+    }
+
+    // assign to each queue the amount of actual preemption based on local
+    // information of ideal preemption and scaling factor
+    for (TempQueuePerPartition t : queues) {
+      t.assignPreemption(scalingFactor, rc, tot_guarant);
+    }
+    if (LOG.isDebugEnabled()) {
+      for (TempQueuePerPartition t : queues) {
+        LOG.debug(t);
+      }
+    }
+
+  }
+
+  /**
+   * This method recursively computes the ideal assignment of resources to each
+   * level of the hierarchy. This ensures that leafs that are over-capacity but
+   * with parents within capacity will not be preemptionCandidates. Preemptions are allowed
+   * within each subtree according to local over/under capacity.
+   *
+   * @param root the root of the cloned queue hierachy
+   * @param totalPreemptionAllowed maximum amount of preemption allowed
+   * @return a list of leaf queues updated with preemption targets
+   */
+  private void recursivelyComputeIdealAssignment(
+      TempQueuePerPartition root, Resource totalPreemptionAllowed) {
+    if (root.getChildren() != null &&
+        root.getChildren().size() > 0) {
+      // compute ideal distribution at this level
+      computeIdealResourceDistribution(rc, root.getChildren(),
+          totalPreemptionAllowed, root.idealAssigned);
+      // compute recursively for lower levels and build list of leafs
+      for(TempQueuePerPartition t : root.getChildren()) {
+        recursivelyComputeIdealAssignment(t, totalPreemptionAllowed);
+      }
+    }
+  }
+
+
+  private void calculateResToObtainByPartitionForLeafQueues(
+      Set<String> leafQueueNames, Resource clusterResource) {
+    // Loop all leaf queues
+    for (String queueName : leafQueueNames) {
+      // check if preemption disabled for the queue
+      if (context.getQueueByPartition(queueName,
+          RMNodeLabelsManager.NO_LABEL).preemptionDisabled) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("skipping from queue=" + queueName
+              + " because it's a non-preemptable queue");
+        }
+        continue;
+      }
+
+      // compute resToObtainByPartition considered inter-queue preemption
+      for (TempQueuePerPartition qT : context.getQueuePartitions(queueName)) {
+        // we act only if we are violating balance by more than
+        // maxIgnoredOverCapacity
+        if (Resources.greaterThan(rc, clusterResource, qT.current,
+            Resources.multiply(qT.guaranteed, 1.0 + context.getMaxIgnoreOverCapacity()))) {
+          // we introduce a dampening factor naturalTerminationFactor that
+          // accounts for natural termination of containers
+          Resource resToObtain = Resources.multiply(qT.toBePreempted,
+              context.getNaturalTerminationFactor());
+          // Only add resToObtain when it >= 0
+          if (Resources.greaterThan(rc, clusterResource, resToObtain,
+              Resources.none())) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Queue=" + queueName + " partition=" + qT.partition
+                  + " resource-to-obtain=" + resToObtain);
+            }
+          }
+          qT.actuallyToBePreempted = Resources.clone(resToObtain);
+        } else {
+          qT.actuallyToBePreempted = Resources.none();
+        }
+      }
+    }
+  }
+
+  public void computeIdealAllocation(Resource clusterResource,
+      Resource totalPreemptionAllowed) {
+    for (String partition : context.getAllPartitions()) {
+      TempQueuePerPartition tRoot =
+          context.getQueueByPartition(CapacitySchedulerConfiguration.ROOT, partition);
+      // compute the ideal distribution of resources among queues
+      // updates cloned queues state accordingly
+      tRoot.idealAssigned = tRoot.guaranteed;
+      recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
+    }
+
+    // based on ideal allocation select containers to be preempted from each
+    // calculate resource-to-obtain by partition for each leaf queues
+    calculateResToObtainByPartitionForLeafQueues(context.getLeafQueueNames(),
+        clusterResource);
+  }
+}
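
As a rough numeric illustration of resetCapacity and computeFixpointAllocation above (figures assumed, not from the patch): with two active queues whose guarantees are 60 GB and 20 GB, activeCap is 80 GB, so their normalizedGuarantee values become 0.75 and 0.25; each fix-point round then offers the still-unassigned resource in that 3:1 ratio, and a queue drops out of the ordering once its idealAssigned reaches current + pending or its maximum capacity.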

+ 52 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java

@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+
+import java.util.Map;
+import java.util.Set;
+
+public abstract class PreemptionCandidatesSelector {
+  protected CapacitySchedulerPreemptionContext preemptionContext;
+  protected ResourceCalculator rc;
+
+  PreemptionCandidatesSelector(
+      CapacitySchedulerPreemptionContext preemptionContext) {
+    this.preemptionContext = preemptionContext;
+    this.rc = preemptionContext.getResourceCalculator();
+  }
+
+  /**
+   * Get preemption candidates from computed resource sharing and already
+   * selected candidates.
+   *
+   * @param selectedCandidates already selected candidates from previous policies
+   * @param clusterResource
+   * @param totalPreemptedResourceAllowed how many resources allowed to be
+   *                                      preempted in this round
+   * @return merged selected candidates.
+   */
+  public abstract Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
+      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+      Resource clusterResource, Resource totalPreemptedResourceAllowed);
+}
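
This abstract class is the extension point the commit title refers to: a new way of selecting preemption candidates plugs in by subclassing it, with each selector in a chain receiving the candidates chosen so far. A hypothetical skeleton (class name and body are illustrative, not part of the patch):

    public class MyCandidatesSelector extends PreemptionCandidatesSelector {
      MyCandidatesSelector(CapacitySchedulerPreemptionContext context) {
        super(context);
      }

      @Override
      public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
          Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
          Resource clusterResource, Resource totalPreemptedResourceAllowed) {
        // consult preemptionContext, skip containers already present in
        // selectedCandidates, and add any newly chosen containers to it
        return selectedCandidates;
      }
    }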

File diff suppressed because it is too large
+ 169 - 716
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java


+ 159 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java

@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.util.ArrayList;
+
+/**
+ * Temporary data-structure tracking resource availability, pending resource
+ * need, current utilization. This is per-queue-per-partition data structure
+ */
+public class TempQueuePerPartition {
+  // Following fields are copied from scheduler
+  final String queueName;
+  final Resource current;
+  final Resource pending;
+  final Resource guaranteed;
+  final Resource maxCapacity;
+  final Resource killable;
+  final String partition;
+
+  // Following fields are setted and used by candidate selection policies
+  Resource idealAssigned;
+  Resource toBePreempted;
+  Resource untouchableExtra;
+  Resource preemptableExtra;
+  // For logging purpose
+  Resource actuallyToBePreempted;
+
+  double normalizedGuarantee;
+
+  final ArrayList<TempQueuePerPartition> children;
+  LeafQueue leafQueue;
+  boolean preemptionDisabled;
+
+  TempQueuePerPartition(String queueName, Resource current, Resource pending,
+      Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled,
+      String partition, Resource killable) {
+    this.queueName = queueName;
+    this.current = current;
+    this.pending = pending;
+    this.guaranteed = guaranteed;
+    this.maxCapacity = maxCapacity;
+    this.idealAssigned = Resource.newInstance(0, 0);
+    this.actuallyToBePreempted = Resource.newInstance(0, 0);
+    this.toBePreempted = Resource.newInstance(0, 0);
+    this.normalizedGuarantee = Float.NaN;
+    this.children = new ArrayList<>();
+    this.untouchableExtra = Resource.newInstance(0, 0);
+    this.preemptableExtra = Resource.newInstance(0, 0);
+    this.preemptionDisabled = preemptionDisabled;
+    this.partition = partition;
+    this.killable = killable;
+  }
+
+  public void setLeafQueue(LeafQueue l) {
+    assert children.size() == 0;
+    this.leafQueue = l;
+  }
+
+  /**
+   * When adding a child we also aggregate its pending resource needs.
+   * @param q the child queue to add to this queue
+   */
+  public void addChild(TempQueuePerPartition q) {
+    assert leafQueue == null;
+    children.add(q);
+    Resources.addTo(pending, q.pending);
+  }
+
+  public ArrayList<TempQueuePerPartition> getChildren(){
+    return children;
+  }
+
+  // This function "accepts" all the resources it can (pending) and return
+  // the unused ones
+  Resource offer(Resource avail, ResourceCalculator rc,
+      Resource clusterResource) {
+    Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax(
+        Resources.subtract(maxCapacity, idealAssigned),
+        Resource.newInstance(0, 0));
+    // remain = avail - min(avail, (max - assigned), (current + pending - assigned))
+    Resource accepted =
+        Resources.min(rc, clusterResource,
+            absMaxCapIdealAssignedDelta,
+            Resources.min(rc, clusterResource, avail, Resources.subtract(
+                Resources.add(current, pending), idealAssigned)));
+    Resource remain = Resources.subtract(avail, accepted);
+    Resources.addTo(idealAssigned, accepted);
+    return remain;
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(" NAME: " + queueName)
+        .append(" CUR: ").append(current)
+        .append(" PEN: ").append(pending)
+        .append(" GAR: ").append(guaranteed)
+        .append(" NORM: ").append(normalizedGuarantee)
+        .append(" IDEAL_ASSIGNED: ").append(idealAssigned)
+        .append(" IDEAL_PREEMPT: ").append(toBePreempted)
+        .append(" ACTUAL_PREEMPT: ").append(actuallyToBePreempted)
+        .append(" UNTOUCHABLE: ").append(untouchableExtra)
+        .append(" PREEMPTABLE: ").append(preemptableExtra)
+        .append("\n");
+
+    return sb.toString();
+  }
+
+  public void assignPreemption(float scalingFactor, ResourceCalculator rc,
+      Resource clusterResource) {
+    if (Resources.greaterThan(rc, clusterResource,
+        Resources.subtract(current, killable), idealAssigned)) {
+      toBePreempted = Resources.multiply(Resources
+              .subtract(Resources.subtract(current, killable), idealAssigned),
+          scalingFactor);
+    } else {
+      toBePreempted = Resource.newInstance(0, 0);
+    }
+  }
+
+  void appendLogString(StringBuilder sb) {
+    sb.append(queueName).append(", ")
+        .append(current.getMemory()).append(", ")
+        .append(current.getVirtualCores()).append(", ")
+        .append(pending.getMemory()).append(", ")
+        .append(pending.getVirtualCores()).append(", ")
+        .append(guaranteed.getMemory()).append(", ")
+        .append(guaranteed.getVirtualCores()).append(", ")
+        .append(idealAssigned.getMemory()).append(", ")
+        .append(idealAssigned.getVirtualCores()).append(", ")
+        .append(toBePreempted.getMemory()).append(", ")
+        .append(toBePreempted.getVirtualCores() ).append(", ")
+        .append(actuallyToBePreempted.getMemory()).append(", ")
+        .append(actuallyToBePreempted.getVirtualCores());
+  }
+
+}
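
The offer() method above accepts at most min(avail, maxCapacity - idealAssigned, current + pending - idealAssigned) and hands the rest back. For example (assumed numbers, not from the patch): with avail = 30 GB, maxCapacity = 100 GB, idealAssigned = 80 GB and current + pending = 95 GB, it accepts min(30, 20, 15) = 15 GB, raises idealAssigned to 95 GB, and returns the remaining 15 GB.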

+ 45 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java

@@ -1020,4 +1020,49 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   public boolean getLazyPreemptionEnabled() {
     return getBoolean(LAZY_PREEMPTION_ENALBED, DEFAULT_LAZY_PREEMPTION_ENABLED);
   }
+
+  /** If true, run the policy but do not affect the cluster with preemption and
+   * kill events. */
+  public static final String PREEMPTION_OBSERVE_ONLY =
+      "yarn.resourcemanager.monitor.capacity.preemption.observe_only";
+  public static final boolean DEFAULT_PREEMPTION_OBSERVE_ONLY = false;
+
+  /** Time in milliseconds between invocations of this policy */
+  public static final String PREEMPTION_MONITORING_INTERVAL =
+      "yarn.resourcemanager.monitor.capacity.preemption.monitoring_interval";
+  public static final long DEFAULT_PREEMPTION_MONITORING_INTERVAL = 3000L;
+
+  /** Time in milliseconds between requesting a preemption from an application
+   * and killing the container. */
+  public static final String PREEMPTION_WAIT_TIME_BEFORE_KILL =
+      "yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill";
+  public static final long DEFAULT_PREEMPTION_WAIT_TIME_BEFORE_KILL = 15000L;
+
+  /** Maximum percentage of resources preemptionCandidates in a single round. By
+   * controlling this value one can throttle the pace at which containers are
+   * reclaimed from the cluster. After computing the total desired preemption,
+   * the policy scales it back within this limit. */
+  public static final String TOTAL_PREEMPTION_PER_ROUND =
+      "yarn.resourcemanager.monitor.capacity.preemption.total_preemption_per_round";
+  public static final float DEFAULT_TOTAL_PREEMPTION_PER_ROUND = 0.1f;
+
+  /** Maximum amount of resources above the target capacity ignored for
+   * preemption. This defines a deadzone around the target capacity that helps
+   * prevent thrashing and oscillations around the computed target balance.
+   * High values would slow the time to capacity and (absent natural
+   * completions) it might prevent convergence to guaranteed capacity. */
+  public static final String PREEMPTION_MAX_IGNORED_OVER_CAPACITY =
+      "yarn.resourcemanager.monitor.capacity.preemption.max_ignored_over_capacity";
+  public static final float DEFAULT_PREEMPTION_MAX_IGNORED_OVER_CAPACITY = 0.1f;
+  /**
+   * Given a computed preemption target, account for containers naturally
+   * expiring and preempt only this percentage of the delta. This determines
+   * the rate of geometric convergence into the deadzone ({@link
+   * #PREEMPTION_MAX_IGNORED_OVER_CAPACITY}). For example, a termination factor of 0.5
+   * will reclaim almost 95% of resources within 5 * {@link
+   * #PREEMPTION_WAIT_TIME_BEFORE_KILL}, even absent natural termination. */
+  public static final String PREEMPTION_NATURAL_TERMINATION_FACTOR =
+      "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
+  public static final float DEFAULT_PREEMPTION_NATURAL_TERMINATION_FACTOR =
+      0.2f;
 }
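
These preemption knobs were previously constants on ProportionalCapacityPreemptionPolicy (see the static imports removed from the test below); they now live on CapacitySchedulerConfiguration. A short example of setting them programmatically, mirroring the updated test setup, with the documented default values:

    CapacitySchedulerConfiguration conf =
        new CapacitySchedulerConfiguration(new Configuration(false));
    conf.setBoolean(
        CapacitySchedulerConfiguration.PREEMPTION_OBSERVE_ONLY, false);
    conf.setLong(
        CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL, 3000L);
    conf.setLong(
        CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, 15000L);
    conf.setFloat(
        CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND, 0.1f);
    conf.setFloat(
        CapacitySchedulerConfiguration.PREEMPTION_MAX_IGNORED_OVER_CAPACITY, 0.1f);
    conf.setFloat(
        CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR, 0.2f);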

+ 0 - 6
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptableQueue.java

@@ -86,12 +86,6 @@ public class PreemptableQueue {
     return res == null ? Resources.none() : res;
   }
 
-  @SuppressWarnings("unchecked")
-  public Map<ContainerId, RMContainer> getKillableContainers(String partition) {
-    Map<ContainerId, RMContainer> map = killableContainers.get(partition);
-    return map == null ? Collections.EMPTY_MAP : map;
-  }
-
   public Map<String, Map<ContainerId, RMContainer>> getKillableContainers() {
     return killableContainers;
   }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/preemption/PreemptionManager.java

@@ -146,7 +146,7 @@ public class PreemptionManager {
     }
   }
 
-  public Map<String, PreemptableQueue> getShallowCopyOfPreemptableEntities() {
+  public Map<String, PreemptableQueue> getShallowCopyOfPreemptableQueues() {
     try {
       readLock.lock();
       Map<String, PreemptableQueue> map = new HashMap<>();

+ 67 - 66
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java

@@ -17,38 +17,6 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MAX_IGNORED_OVER_CAPACITY;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.OBSERVE_ONLY;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.isA;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.Deque;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.NavigableSet;
-import java.util.Random;
-import java.util.StringTokenizer;
-import java.util.TreeSet;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -63,7 +31,6 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TempQueuePerPartition;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -95,6 +62,32 @@ import org.mockito.ArgumentMatcher;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Random;
+import java.util.StringTokenizer;
+import java.util.TreeSet;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 public class TestProportionalCapacityPreemptionPolicy {
 
   static final long TS = 3141592653L;
@@ -105,11 +98,10 @@ public class TestProportionalCapacityPreemptionPolicy {
   float setAMResourcePercent = 0.0f;
   Random rand = null;
   Clock mClock = null;
-  Configuration conf = null;
+  CapacitySchedulerConfiguration conf = null;
   CapacityScheduler mCS = null;
   RMContext rmContext = null;
   RMNodeLabelsManager lm = null;
-  CapacitySchedulerConfiguration schedConf = null;
   EventHandler<SchedulerEvent> mDisp = null;
   ResourceCalculator rc = new DefaultResourceCalculator();
   Resource clusterResources = null;
@@ -132,7 +124,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     AMCONTAINER(0), CONTAINER(1), LABELEDCONTAINER(2);
     int value;
 
-    private priority(int value) {
+    priority(int value) {
       this.value = value;
     }
 
@@ -146,12 +138,17 @@ public class TestProportionalCapacityPreemptionPolicy {
   @Before
   @SuppressWarnings("unchecked")
   public void setup() {
-    conf = new Configuration(false);
-    conf.setLong(WAIT_TIME_BEFORE_KILL, 10000);
-    conf.setLong(MONITORING_INTERVAL, 3000);
+    conf = new CapacitySchedulerConfiguration(new Configuration(false));
+    conf.setLong(
+        CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, 10000);
+    conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
+        3000);
     // report "ideal" preempt
-    conf.setFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 1.0);
-    conf.setFloat(NATURAL_TERMINATION_FACTOR, (float) 1.0);
+    conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
+        1.0f);
+    conf.setFloat(
+        CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
+        1.0f);
     conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
         ProportionalCapacityPreemptionPolicy.class.getCanonicalName());
     conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
@@ -164,8 +161,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     mCS = mock(CapacityScheduler.class);
     when(mCS.getResourceCalculator()).thenReturn(rc);
     lm = mock(RMNodeLabelsManager.class);
-    schedConf = new CapacitySchedulerConfiguration();
-    when(mCS.getConfiguration()).thenReturn(schedConf);
+    when(mCS.getConfiguration()).thenReturn(conf);
     rmContext = mock(RMContext.class);
     when(mCS.getRMContext()).thenReturn(rmContext);
     when(mCS.getPreemptionManager()).thenReturn(new PreemptionManager());
@@ -271,7 +267,9 @@ public class TestProportionalCapacityPreemptionPolicy {
       {  -1,  1,  1,  1 },  // req granularity
       {   3,  0,  0,  0 },  // subqueues
     };
-    conf.setLong(WAIT_TIME_BEFORE_KILL, killTime);
+    conf.setLong(
+        CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL,
+        killTime);
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
 
     // ensure all pending rsrc from A get preempted from other queues
@@ -308,7 +306,9 @@ public class TestProportionalCapacityPreemptionPolicy {
       {  -1,  1,  1,  1 },  // req granularity
       {   3,  0,  0,  0 },  // subqueues
     };
-    conf.setFloat(MAX_IGNORED_OVER_CAPACITY, (float) 0.1);
+    conf.setFloat(
+        CapacitySchedulerConfiguration.PREEMPTION_MAX_IGNORED_OVER_CAPACITY,
+        (float) 0.1);
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
     // ignore 10% overcapacity to avoid jitter
@@ -330,7 +330,7 @@ public class TestProportionalCapacityPreemptionPolicy {
         {   3,   0,   0,  0 },  // subqueues
       };
 
-    schedConf.setPreemptionDisabled("root.queueB", true);
+    conf.setPreemptionDisabled("root.queueB", true);
 
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
@@ -343,7 +343,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     // event handler will count only events from the following test and not the
     // previous one.
     setup();
-    schedConf.setPreemptionDisabled("root.queueB", false);
+    conf.setPreemptionDisabled("root.queueB", false);
     ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
 
     policy2.editSchedule();
@@ -382,7 +382,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     // Need to call setup() again to reset mDisp
     setup();
     // Turn off preemption for queueB and it's children
-    schedConf.setPreemptionDisabled("root.queueA.queueB", true);
+    conf.setPreemptionDisabled("root.queueA.queueB", true);
     ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
     policy2.editSchedule();
     ApplicationAttemptId expectedAttemptOnQueueC = 
@@ -429,7 +429,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     // Need to call setup() again to reset mDisp
     setup();
     // Turn off preemption for queueB(appA)
-    schedConf.setPreemptionDisabled("root.queueA.queueB", true);
+    conf.setPreemptionDisabled("root.queueA.queueB", true);
     ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
     policy2.editSchedule();
     // Now that queueB(appA) is not preemptable, verify that resources come
@@ -439,8 +439,8 @@ public class TestProportionalCapacityPreemptionPolicy {
 
     setup();
     // Turn off preemption for two of the 3 queues with over-capacity.
-    schedConf.setPreemptionDisabled("root.queueD.queueE", true);
-    schedConf.setPreemptionDisabled("root.queueA.queueB", true);
+    conf.setPreemptionDisabled("root.queueD.queueE", true);
+    conf.setPreemptionDisabled("root.queueA.queueB", true);
     ProportionalCapacityPreemptionPolicy policy3 = buildPolicy(qData);
     policy3.editSchedule();
 
@@ -481,7 +481,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     // Turn off preemption for queueA and it's children. queueF(appC)'s request
     // should starve.
     setup(); // Call setup() to reset mDisp
-    schedConf.setPreemptionDisabled("root.queueA", true);
+    conf.setPreemptionDisabled("root.queueA", true);
     ProportionalCapacityPreemptionPolicy policy2 = buildPolicy(qData);
     policy2.editSchedule();
     verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(appA))); // queueC
@@ -505,7 +505,7 @@ public class TestProportionalCapacityPreemptionPolicy {
       {   -1,   -1,    1,    1,    1,   -1,    1,    1,    1 },  // req granularity
       {    2,    3,    0,    0,    0,    3,    0,    0,    0 },  // subqueues
     };
-    schedConf.setPreemptionDisabled("root.queueA.queueC", true);
+    conf.setPreemptionDisabled("root.queueA.queueC", true);
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
     // Although queueC(appB) is way over capacity and is untouchable,
@@ -529,7 +529,7 @@ public class TestProportionalCapacityPreemptionPolicy {
         {   3,   2,   0,   0,   2,   0,   0,   2,   0,   0 },  // subqueues
    };
 
-    schedConf.setPreemptionDisabled("root", true);
+    conf.setPreemptionDisabled("root", true);
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
     // All queues should be non-preemptable, so request should starve.
@@ -556,7 +556,7 @@ public class TestProportionalCapacityPreemptionPolicy {
         {   2,   2,   0,   0,   2,   0,   0 },  // subqueues
     };
     // QueueE inherits non-preemption from QueueD
-    schedConf.setPreemptionDisabled("root.queueD", true);
+    conf.setPreemptionDisabled("root.queueD", true);
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
     // appC is running on QueueE. QueueE is over absMaxCap, but is not
@@ -596,7 +596,10 @@ public class TestProportionalCapacityPreemptionPolicy {
       {  -1,  1,  1,  0 },  // req granularity
       {   3,  0,  0,  0 },  // subqueues
     };
-    conf.setFloat(NATURAL_TERMINATION_FACTOR, (float) 0.1);
+    conf.setFloat(
+        CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
+        (float) 0.1);
+
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
     // ignore 10% imbalance between over-capacity queues
@@ -616,7 +619,10 @@ public class TestProportionalCapacityPreemptionPolicy {
       {  -1,  1,  1,  0 },  // req granularity
       {   3,  0,  0,  0 },  // subqueues
     };
-    conf.setBoolean(OBSERVE_ONLY, true);
+    conf.setBoolean(CapacitySchedulerConfiguration.PREEMPTION_OBSERVE_ONLY,
+        true);
+    when(mCS.getConfiguration()).thenReturn(
+        new CapacitySchedulerConfiguration(conf));
     ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
     policy.editSchedule();
     // verify even severe imbalance not affected
@@ -735,7 +741,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     containers.add(rm4);
 
     // sort them
-    ProportionalCapacityPreemptionPolicy.sortContainers(containers);
+    FifoCandidatesSelector.sortContainers(containers);
 
     // verify the "priority"-first, "reverse container-id"-second
     // ordering is enforced correctly
@@ -957,7 +963,7 @@ public class TestProportionalCapacityPreemptionPolicy {
 
   ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData) {
     ProportionalCapacityPreemptionPolicy policy = new ProportionalCapacityPreemptionPolicy(
-        conf, rmContext, mCS, mClock);
+        rmContext, mCS, mClock);
     clusterResources = Resource.newInstance(
         leafAbsCapacities(qData[0], qData[7]), 0);
     ParentQueue mRoot = buildMockRootQueue(rand, qData);
@@ -967,11 +973,6 @@ public class TestProportionalCapacityPreemptionPolicy {
     return policy;
   }
 
-  ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData,
-      String[][] resData) {
-    return buildPolicy(qData, resData, false);
-  }
-
   ProportionalCapacityPreemptionPolicy buildPolicy(int[][] qData,
       String[][] resData, boolean useDominantResourceCalculator) {
     if (useDominantResourceCalculator) {
@@ -979,7 +980,7 @@ public class TestProportionalCapacityPreemptionPolicy {
           new DominantResourceCalculator());
     }
     ProportionalCapacityPreemptionPolicy policy =
-        new ProportionalCapacityPreemptionPolicy(conf, rmContext, mCS, mClock);
+        new ProportionalCapacityPreemptionPolicy(rmContext, mCS, mClock);
     clusterResources = leafAbsCapacities(parseResourceDetails(resData[0]),
         qData[2]);
     ParentQueue mRoot = buildMockRootQueue(rand, resData, qData);
@@ -1124,7 +1125,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     String qName = "";
     while(tokenizer.hasMoreTokens()) {
       qName += tokenizer.nextToken();
-      preemptionDisabled = schedConf.getPreemptionDisabled(qName, preemptionDisabled);
+      preemptionDisabled = conf.getPreemptionDisabled(qName, preemptionDisabled);
       qName += ".";
     }
     return preemptionDisabled;
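
The tests now hand the policy only the RM context, the scheduler, and the clock, and the policy reads its settings from the scheduler's CapacitySchedulerConfiguration. A condensed sketch of the wiring the mocks above imply:

    // The mocked scheduler supplies the configuration, so the policy
    // constructor no longer takes a Configuration argument.
    when(mCS.getConfiguration()).thenReturn(conf);
    ProportionalCapacityPreemptionPolicy policy =
        new ProportionalCapacityPreemptionPolicy(rmContext, mCS, mClock);
    policy.editSchedule();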

+ 35 - 43
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java

@@ -18,29 +18,6 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
-import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isA;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -81,6 +58,25 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
   private static final Log LOG =
       LogFactory.getLog(TestProportionalCapacityPreemptionPolicyForNodePartitions.class);
@@ -94,8 +90,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
 
   private ResourceCalculator rc = new DefaultResourceCalculator();
   private Clock mClock = null;
-  private Configuration conf = null;
-  private CapacitySchedulerConfiguration csConf = null;
+  private CapacitySchedulerConfiguration conf = null;
   private CapacityScheduler cs = null;
   private EventHandler<SchedulerEvent> mDisp = null;
   private ProportionalCapacityPreemptionPolicy policy = null;
@@ -107,24 +102,23 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
     org.apache.log4j.Logger.getRootLogger().setLevel(
         org.apache.log4j.Level.DEBUG);
 
-    conf = new Configuration(false);
-    conf.setLong(WAIT_TIME_BEFORE_KILL, 10000);
-    conf.setLong(MONITORING_INTERVAL, 3000);
+    conf = new CapacitySchedulerConfiguration(new Configuration(false));
+    conf.setLong(
+        CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, 10000);
+    conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
+        3000);
     // report "ideal" preempt
-    conf.setFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 1.0);
-    conf.setFloat(NATURAL_TERMINATION_FACTOR, (float) 1.0);
-    conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
-        ProportionalCapacityPreemptionPolicy.class.getCanonicalName());
-    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
-    // FairScheduler doesn't support this test,
-    // Set CapacityScheduler as the scheduler for this test.
-    conf.set("yarn.resourcemanager.scheduler.class",
-        CapacityScheduler.class.getName());
+    conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
+        (float) 1.0);
+    conf.setFloat(
+        CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
+        (float) 1.0);
 
     mClock = mock(Clock.class);
     cs = mock(CapacityScheduler.class);
     when(cs.getResourceCalculator()).thenReturn(rc);
     when(cs.getPreemptionManager()).thenReturn(new PreemptionManager());
+    when(cs.getConfiguration()).thenReturn(conf);
 
     nlm = mock(RMNodeLabelsManager.class);
     mDisp = mock(EventHandler.class);
@@ -134,11 +128,9 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
     Dispatcher disp = mock(Dispatcher.class);
     when(rmContext.getDispatcher()).thenReturn(disp);
     when(disp.getEventHandler()).thenReturn(mDisp);
-    csConf = new CapacitySchedulerConfiguration();
-    when(cs.getConfiguration()).thenReturn(csConf);
     when(cs.getRMContext()).thenReturn(rmContext);
 
-    policy = new ProportionalCapacityPreemptionPolicy(conf, rmContext, cs, mClock);
+    policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
     partitionToResource = new HashMap<>();
     nodeIdToSchedulerNodes = new HashMap<>();
     nameToCSQueues = new HashMap<>();
@@ -576,7 +568,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
         "c\t" // app3 in c
         + "(1,1,n1,x,20,false)"; // 20x in n1
 
-    csConf.setPreemptionDisabled("root.b", true);
+    conf.setPreemptionDisabled("root.b", true);
     buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
     policy.editSchedule();
 
@@ -901,7 +893,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
     when(cs.getClusterResource()).thenReturn(clusterResource);
     mockApplications(appsConfig);
 
-    policy = new ProportionalCapacityPreemptionPolicy(conf, rmContext, cs,
+    policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs,
         mClock);
   }
 
@@ -1235,7 +1227,7 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
 
     // Setup preemption disabled
     when(queue.getPreemptionDisabled()).thenReturn(
-        csConf.getPreemptionDisabled(queuePath, false));
+        conf.getPreemptionDisabled(queuePath, false));
 
     nameToCSQueues.put(queueName, queue);
     when(cs.getQueue(eq(queueName))).thenReturn(queue);

+ 10 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java

@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -81,14 +82,15 @@ public class TestCapacitySchedulerPreemption {
     conf = TestUtils.getConfigurationWithMultipleQueues(this.conf);
 
     // Set preemption related configurations
-    conf.setInt(ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL,
+    conf.setInt(CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL,
         0);
     conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED,
         true);
+    conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
+        1.0f);
     conf.setFloat(
-        ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND, 1.0f);
-    conf.setFloat(
-        ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR, 1.0f);
+        CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
+        1.0f);
     mgr = new NullRMNodeLabelsManager();
     mgr.init(this.conf);
     clock = mock(Clock.class);
@@ -484,6 +486,10 @@ public class TestCapacitySchedulerPreemption {
             .isEmpty());
   }
 
+  /*
+   * Ignore this test now because it could be a premature optimization
+   */
+  @Ignore
   @Test (timeout = 60000)
   public void testPreemptionPolicyCleanupKillableContainersWhenNoPreemptionNeeded()
       throws Exception {

Not all files can be shown because too many files changed in this diff