Browse Source

YARN-2933. Capacity Scheduler preemption policy should only consider capacity without labels temporarily. Contributed by Mayank Bansal

(cherry picked from commit 0a2d3e717d9c42090a32ff177991a222a1e34132)
Wangda Tan 10 năm trước cách đây
mục cha
commit
ef6fc24dfb

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -357,6 +357,9 @@ Release 2.7.0 - UNRELEASED
     YARN-3015. yarn classpath command should support same options as hadoop
     classpath. (Contributed by Varun Saxena)
 
+    YARN-2933. Capacity Scheduler preemption policy should only consider capacity 
+    without labels temporarily. (Mayank Bansal via wangda)
+
 Release 2.6.0 - 2014-11-18
 
   INCOMPATIBLE CHANGES

+ 49 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java

@@ -30,15 +30,19 @@ import java.util.NavigableSet;
 import java.util.PriorityQueue;
 import java.util.Set;
 
+import org.apache.commons.collections.map.HashedMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
@@ -129,6 +133,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   private float percentageClusterPreemptionAllowed;
   private double naturalTerminationFactor;
   private boolean observeOnly;
+  private Map<NodeId, Set<String>> labels;
 
   public ProportionalCapacityPreemptionPolicy() {
     clock = new SystemClock();
@@ -168,6 +173,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
       config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1);
     observeOnly = config.getBoolean(OBSERVE_ONLY, false);
     rc = scheduler.getResourceCalculator();
+    labels = null;
   }
   
   @VisibleForTesting
@@ -176,13 +182,38 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
   }
 
   @Override
-  public void editSchedule(){
+  public void editSchedule() {
     CSQueue root = scheduler.getRootQueue();
-    Resource clusterResources =
-      Resources.clone(scheduler.getClusterResource());
+    Resource clusterResources = Resources.clone(scheduler.getClusterResource());
+    clusterResources = getNonLabeledResources(clusterResources);
+    setNodeLabels(scheduler.getRMContext().getNodeLabelManager()
+        .getNodeLabels());
     containerBasedPreemptOrKill(root, clusterResources);
   }
 
+  /**
+   * Setting Node Labels
+   * 
+   * @param nodelabels
+   */
+  public void setNodeLabels(Map<NodeId, Set<String>> nodelabels) {
+    labels = nodelabels;
+  }
+
+  /**
+   * This method returns all non labeled resources.
+   * 
+   * @param clusterResources
+   * @return Resources
+   */
+  private Resource getNonLabeledResources(Resource clusterResources) {
+    RMContext rmcontext = scheduler.getRMContext();
+    RMNodeLabelsManager lm = rmcontext.getNodeLabelManager();
+    Resource res = lm.getResourceByLabel(RMNodeLabelsManager.NO_LABEL,
+        clusterResources);
+    return res == null ? clusterResources : res;
+  }
+  
   /**
    * This method selects and tracks containers to be preempted. If a container
    * is in the target list for more than maxWaitTime it is killed.
@@ -593,7 +624,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
    * @param app
    * @param clusterResource
    * @param rsrcPreempt
-   * @return
+   * @return Set<RMContainer> Set of RMContainers
    */
   private Set<RMContainer> preemptFrom(FiCaSchedulerApp app,
       Resource clusterResource, Resource rsrcPreempt,
@@ -635,12 +666,26 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
         Resources.addTo(skippedAMSize, c.getContainer().getResource());
         continue;
       }
+      // skip Labeled resource
+      if(isLabeledContainer(c)){
+        continue;
+      }
       ret.add(c);
       Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
     }
 
     return ret;
   }
+  
+  /**
+   * Checking if given container is a labeled container
+   * 
+   * @param c
+   * @return true/false
+   */
+  private boolean isLabeledContainer(RMContainer c) {
+    return labels.containsKey(c.getAllocatedNode());
+  }
 
   /**
    * Compare by reversed priority order first, and then reversed containerId

+ 97 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java

@@ -38,27 +38,37 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
 
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.NavigableSet;
 import java.util.Random;
+import java.util.Set;
 import java.util.TreeSet;
 
+import org.apache.commons.collections.map.HashedMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
@@ -72,12 +82,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 import org.mockito.ArgumentCaptor;
 import org.mockito.ArgumentMatcher;
+import org.mortbay.log.Log;
 
 public class TestProportionalCapacityPreemptionPolicy {
 
@@ -85,14 +97,18 @@ public class TestProportionalCapacityPreemptionPolicy {
 
   int appAlloc = 0;
   boolean setAMContainer = false;
+  boolean setLabeledContainer = false;
   float setAMResourcePercent = 0.0f;
   Random rand = null;
   Clock mClock = null;
   Configuration conf = null;
   CapacityScheduler mCS = null;
+  RMContext rmContext = null;
+  RMNodeLabelsManager lm = null;
   CapacitySchedulerConfiguration schedConf = null;
   EventHandler<ContainerPreemptEvent> mDisp = null;
   ResourceCalculator rc = new DefaultResourceCalculator();
+  Resource clusterResources = null;
   final ApplicationAttemptId appA = ApplicationAttemptId.newInstance(
       ApplicationId.newInstance(TS, 0), 0);
   final ApplicationAttemptId appB = ApplicationAttemptId.newInstance(
@@ -108,6 +124,19 @@ public class TestProportionalCapacityPreemptionPolicy {
   final ArgumentCaptor<ContainerPreemptEvent> evtCaptor =
     ArgumentCaptor.forClass(ContainerPreemptEvent.class);
 
+  public enum priority {
+    AMCONTAINER(0), CONTAINER(1), LABELEDCONTAINER(2);
+    int value;
+
+    private priority(int value) {
+      this.value = value;
+    }
+
+    public int getValue() {
+      return this.value;
+    }
+  };  
+
   @Rule public TestName name = new TestName();
 
   @Before
@@ -130,8 +159,12 @@ public class TestProportionalCapacityPreemptionPolicy {
     mClock = mock(Clock.class);
     mCS = mock(CapacityScheduler.class);
     when(mCS.getResourceCalculator()).thenReturn(rc);
+    lm = mock(RMNodeLabelsManager.class);
     schedConf = new CapacitySchedulerConfiguration();
     when(mCS.getConfiguration()).thenReturn(schedConf);
+    rmContext = mock(RMContext.class);
+    when(mCS.getRMContext()).thenReturn(rmContext);
+    when(rmContext.getNodeLabelManager()).thenReturn(lm);
     mDisp = mock(EventHandler.class);
     rand = new Random();
     long seed = rand.nextLong();
@@ -746,7 +779,51 @@ public class TestProportionalCapacityPreemptionPolicy {
     verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB)));
     setAMContainer = false;
   }
-  
+
+  @Test
+  public void testIdealAllocationForLabels() {
+    int[][] qData = new int[][] {
+    // / A B
+        { 80, 40, 40 }, // abs
+        { 80, 80, 80 }, // maxcap
+        { 80, 80, 0 }, // used
+        { 70, 20, 50 }, // pending
+        { 0, 0, 0 }, // reserved
+        { 5, 4, 1 }, // apps
+        { -1, 1, 1 }, // req granularity
+        { 2, 0, 0 }, // subqueues
+    };
+    setAMContainer = true;
+    setLabeledContainer = true;
+    Map<NodeId, Set<String>> labels = new HashMap<NodeId, Set<String>>();
+    NodeId node = NodeId.newInstance("node1", 0);
+    Set<String> labelSet = new HashSet<String>();
+    labelSet.add("x");
+    labels.put(node, labelSet);
+    when(lm.getNodeLabels()).thenReturn(labels);
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    // Subtracting Label X resources from cluster resources
+    when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn(
+        Resources.clone(Resource.newInstance(80, 0)));
+    clusterResources.setMemory(100);
+    policy.editSchedule();
+
+    // By skipping AM Container and Labeled container, all other 18 containers
+    // of appD will be
+    // preempted
+    verify(mDisp, times(19)).handle(argThat(new IsPreemptionRequestFor(appD)));
+
+    // By skipping AM Container and Labeled container, all other 18 containers
+    // of appC will be
+    // preempted
+    verify(mDisp, times(19)).handle(argThat(new IsPreemptionRequestFor(appC)));
+
+    // rest 4 containers from appB will be preempted
+    verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB)));
+    setAMContainer = false;
+    setLabeledContainer = false;
+  }
+
   @Test
   public void testPreemptSkippedAMContainers() {
     int[][] qData = new int[][] {
@@ -846,7 +923,7 @@ public class TestProportionalCapacityPreemptionPolicy {
     ParentQueue mRoot = buildMockRootQueue(rand, qData);
     when(mCS.getRootQueue()).thenReturn(mRoot);
 
-    Resource clusterResources =
+    clusterResources =
       Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
     when(mCS.getClusterResource()).thenReturn(clusterResources);
     return policy;
@@ -965,7 +1042,8 @@ public class TestProportionalCapacityPreemptionPolicy {
     Resource unit = Resource.newInstance(gran, 0);
     List<RMContainer> cReserved = new ArrayList<RMContainer>();
     for (int i = 0; i < reserved; i += gran) {
-      cReserved.add(mockContainer(appAttId, cAlloc, unit, 1));
+      cReserved.add(mockContainer(appAttId, cAlloc, unit, priority.CONTAINER
+          .getValue()));
       ++cAlloc;
     }
     when(app.getReservedContainers()).thenReturn(cReserved);
@@ -973,9 +1051,16 @@ public class TestProportionalCapacityPreemptionPolicy {
     List<RMContainer> cLive = new ArrayList<RMContainer>();
     for (int i = 0; i < used; i += gran) {
       if(setAMContainer && i == 0){
-        cLive.add(mockContainer(appAttId, cAlloc, unit, 0));
-      }else{
-        cLive.add(mockContainer(appAttId, cAlloc, unit, 1));
+        cLive.add(mockContainer(appAttId, cAlloc, unit, priority.AMCONTAINER
+            .getValue()));
+      }else if(setLabeledContainer && i ==1){
+        cLive.add(mockContainer(appAttId, cAlloc, unit,
+            priority.LABELEDCONTAINER.getValue()));
+        ++used;
+      }
+      else{
+        cLive.add(mockContainer(appAttId, cAlloc, unit, priority.CONTAINER
+            .getValue()));
       }
       ++cAlloc;
     }
@@ -984,18 +1069,21 @@ public class TestProportionalCapacityPreemptionPolicy {
   }
 
   RMContainer mockContainer(ApplicationAttemptId appAttId, int id,
-      Resource r, int priority) {
+      Resource r, int cpriority) {
     ContainerId cId = ContainerId.newContainerId(appAttId, id);
     Container c = mock(Container.class);
     when(c.getResource()).thenReturn(r);
-    when(c.getPriority()).thenReturn(Priority.create(priority));
+    when(c.getPriority()).thenReturn(Priority.create(cpriority));
     RMContainer mC = mock(RMContainer.class);
     when(mC.getContainerId()).thenReturn(cId);
     when(mC.getContainer()).thenReturn(c);
     when(mC.getApplicationAttemptId()).thenReturn(appAttId);
-    if(0 == priority){
+    if (priority.AMCONTAINER.getValue() == cpriority) {
       when(mC.isAMContainer()).thenReturn(true);
     }
+    if (priority.LABELEDCONTAINER.getValue() == cpriority) {
+      when(mC.getAllocatedNode()).thenReturn(NodeId.newInstance("node1", 0));
+    }
     return mC;
   }