Browse Source

YARN-10043. FairOrderingPolicy Improvements. Contributed by Manikandan R

Szilard Nemeth 5 years ago
parent
commit
3d5ade1839

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java

@@ -86,6 +86,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.Schedulabl
 import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
+import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -209,6 +210,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
 
   private String nodeLabelExpression;
 
+  private final long startTime;
+
   public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, 
       String user, Queue queue, AbstractUsersManager abstractUsersManager,
       RMContext rmContext) {
@@ -242,6 +245,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     readLock = lock.readLock();
     writeLock = lock.writeLock();
+    startTime = SystemClock.getInstance().getTime();
   }
 
   public void setOpportunisticContainerContext(
@@ -1487,4 +1491,10 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   public String getPartition() {
     return nodeLabelExpression == null ? "" : nodeLabelExpression;
   }
+
+
+  @Override
+  public long getStartTime() {
+    return startTime;
+  }
 }

+ 39 - 8
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java

@@ -28,14 +28,21 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 
 /**
- * An OrderingPolicy which orders SchedulableEntities for fairness (see
- * FairScheduler
- * FairSharePolicy), generally, processes with lesser usage are lesser. If
- * sizedBasedWeight is set to true then an application with high demand
- * may be prioritized ahead of an application with less usage.  This
- * is to offset the tendency to favor small apps, which could result in
- * starvation for large apps if many small ones enter and leave the queue
- * continuously (optional, default false)
+ *
+ * FairOrderingPolicy comparison goes through following steps.
+ *
+ * 1.Fairness based comparison. SchedulableEntities with lesser usage would be
+ * preferred when compared to another. If sizedBasedWeight is set to true then
+ * an application with high demand may be prioritized ahead of an application
+ * with less usage. This is to offset the tendency to favor small apps, which
+ * could result in starvation for large apps if many small ones enter and leave
+ * the queue continuously (optional, default false)
+ *
+ * 2. Compare using job submit time. SchedulableEntities submitted earlier would
+ * be preferred than later.
+ *
+ * 3. Compare demands. SchedulableEntities without resource demand get lower
+ * priority than ones which have demands.
  */
 public class FairOrderingPolicy<S extends SchedulableEntity> extends AbstractComparatorOrderingPolicy<S> {
 
@@ -46,6 +53,30 @@ public class FairOrderingPolicy<S extends SchedulableEntity> extends AbstractCom
     @Override
     public int compare(final SchedulableEntity r1, final SchedulableEntity r2) {
       int res = (int) Math.signum( getMagnitude(r1) - getMagnitude(r2) );
+
+      if (res == 0) {
+        res = (int) Math.signum(r1.getStartTime() - r2.getStartTime());
+      }
+
+      if (res == 0) {
+        res = compareDemand(r1, r2);
+      }
+      return res;
+    }
+
+    private int compareDemand(SchedulableEntity s1, SchedulableEntity s2) {
+      int res = 0;
+      long demand1 = s1.getSchedulingResourceUsage()
+          .getCachedDemand(CommonNodeLabelsManager.ANY).getMemorySize();
+      long demand2 = s2.getSchedulingResourceUsage()
+          .getCachedDemand(CommonNodeLabelsManager.ANY).getMemorySize();
+
+      if ((demand1 == 0) && (demand2 > 0)) {
+        res = 1;
+      } else if ((demand2 == 0) && (demand1 > 0)) {
+        res = -1;
+      }
+
       return res;
     }
   }

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/SchedulableEntity.java

@@ -60,4 +60,10 @@ public interface SchedulableEntity {
    * @return partition
    */
   String getPartition();
+
+  /**
+   * Start time of the job.
+   * @return start time
+   */
+  long getStartTime();
 }

+ 12 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/MockSchedulableEntity.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.util.SystemClock;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 
 
@@ -31,6 +32,7 @@ public class MockSchedulableEntity implements SchedulableEntity {
   private Priority priority;
   private boolean isRecovering;
   private String partition = "";
+  private long startTime;
 
   public MockSchedulableEntity() { }
   
@@ -39,6 +41,7 @@ public class MockSchedulableEntity implements SchedulableEntity {
     this.serial = serial;
     this.priority = Priority.newInstance(priority);
     this.isRecovering = isRecovering;
+    this.startTime = SystemClock.getInstance().getTime();
   }
 
   public void setId(String id) {
@@ -108,4 +111,13 @@ public class MockSchedulableEntity implements SchedulableEntity {
   public void setPartition(String partition) {
     this.partition = partition;
   }
+
+  @Override
+  public long getStartTime() {
+    return this.startTime;
+  }
+
+  public void setStartTime(long startTime) {
+    this.startTime = startTime;
+  }
 }

+ 114 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/TestFairOrderingPolicy.java

@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy;
 
+import static org.junit.Assert.assertEquals;
+
 import java.util.*;
 
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -47,7 +49,8 @@ public class TestFairOrderingPolicy {
     MockSchedulableEntity r1 = new MockSchedulableEntity();
     MockSchedulableEntity r2 = new MockSchedulableEntity();
 
-    Assert.assertTrue(policy.getComparator().compare(r1, r2) == 0);
+    assertEquals("Comparator Output", 0,
+        policy.getComparator().compare(r1, r2));
 
     //consumption
     r1.setUsed(Resources.createResource(1, 0));
@@ -65,7 +68,8 @@ public class TestFairOrderingPolicy {
     MockSchedulableEntity r2 = new MockSchedulableEntity();
 
     //No changes, equal
-    Assert.assertTrue(policy.getComparator().compare(r1, r2) == 0);
+    assertEquals("Comparator Output", 0,
+        policy.getComparator().compare(r1, r2));
 
     r1.setUsed(Resources.createResource(4 * GB));
     r2.setUsed(Resources.createResource(4 * GB));
@@ -79,7 +83,8 @@ public class TestFairOrderingPolicy {
       r2.getSchedulingResourceUsage());
 
     //Same, equal
-    Assert.assertTrue(policy.getComparator().compare(r1, r2) == 0);
+    assertEquals("Comparator Output", 0,
+        policy.getComparator().compare(r1, r2));
 
     r2.setUsed(Resources.createResource(5 * GB));
     r2.setPending(Resources.createResource(5 * GB));
@@ -235,4 +240,110 @@ public class TestFairOrderingPolicy {
     }
   }
 
+  @Test
+  public void testOrderingUsingUsedAndPendingResources() {
+    FairOrderingPolicy<MockSchedulableEntity> policy =
+        new FairOrderingPolicy<>();
+    policy.setSizeBasedWeight(true);
+    MockSchedulableEntity r1 = new MockSchedulableEntity();
+    MockSchedulableEntity r2 = new MockSchedulableEntity();
+
+    r1.setUsed(Resources.createResource(4 * GB));
+    r2.setUsed(Resources.createResource(4 * GB));
+
+    r1.setPending(Resources.createResource(4 * GB));
+    r2.setPending(Resources.createResource(4 * GB));
+
+    AbstractComparatorOrderingPolicy
+        .updateSchedulingResourceUsage(r1.getSchedulingResourceUsage());
+    AbstractComparatorOrderingPolicy
+        .updateSchedulingResourceUsage(r2.getSchedulingResourceUsage());
+
+    // Same, equal
+    assertEquals("Comparator Output", 0,
+        policy.getComparator().compare(r1, r2));
+
+    r1.setUsed(Resources.createResource(4 * GB));
+    r2.setUsed(Resources.createResource(8 * GB));
+
+    r1.setPending(Resources.createResource(4 * GB));
+    r2.setPending(Resources.createResource(8 * GB));
+
+    AbstractComparatorOrderingPolicy
+        .updateSchedulingResourceUsage(r1.getSchedulingResourceUsage());
+    AbstractComparatorOrderingPolicy
+        .updateSchedulingResourceUsage(r2.getSchedulingResourceUsage());
+
+    Assert.assertTrue(policy.getComparator().compare(r1, r2) < 0);
+  }
+
+  @Test
+  public void testOrderingUsingAppSubmitTime() {
+    FairOrderingPolicy<MockSchedulableEntity> policy =
+        new FairOrderingPolicy<>();
+    policy.setSizeBasedWeight(true);
+    MockSchedulableEntity r1 = new MockSchedulableEntity();
+    MockSchedulableEntity r2 = new MockSchedulableEntity();
+
+    // R1, R2 has been started at same time
+    assertEquals(r1.getStartTime(), r2.getStartTime());
+
+    // No changes, equal
+    assertEquals("Comparator Output", 0,
+        policy.getComparator().compare(r1, r2));
+
+    // R2 has been started after R1
+    r1.setStartTime(5);
+    r2.setStartTime(10);
+
+    Assert.assertTrue(policy.getComparator().compare(r1, r2) < 0);
+
+    // R1 has been started after R2
+    r1.setStartTime(10);
+    r2.setStartTime(5);
+
+    Assert.assertTrue(policy.getComparator().compare(r1, r2) > 0);
+  }
+
+  @Test
+  public void testOrderingUsingAppDemand() {
+    FairOrderingPolicy<MockSchedulableEntity> policy =
+        new FairOrderingPolicy<MockSchedulableEntity>();
+    MockSchedulableEntity r1 = new MockSchedulableEntity();
+    MockSchedulableEntity r2 = new MockSchedulableEntity();
+
+    r1.setUsed(Resources.createResource(0));
+    r2.setUsed(Resources.createResource(0));
+
+    AbstractComparatorOrderingPolicy
+        .updateSchedulingResourceUsage(r1.getSchedulingResourceUsage());
+    AbstractComparatorOrderingPolicy
+        .updateSchedulingResourceUsage(r2.getSchedulingResourceUsage());
+
+    // Same, equal
+    assertEquals("Comparator Output", 0,
+        policy.getComparator().compare(r1, r2));
+
+    // Compare demands ensures entity without resource demands gets lower
+    // priority
+    r1.setPending(Resources.createResource(0));
+    r2.setPending(Resources.createResource(8 * GB));
+    AbstractComparatorOrderingPolicy
+        .updateSchedulingResourceUsage(r1.getSchedulingResourceUsage());
+    AbstractComparatorOrderingPolicy
+        .updateSchedulingResourceUsage(r2.getSchedulingResourceUsage());
+
+    Assert.assertTrue(policy.getComparator().compare(r1, r2) > 0);
+
+    // When both entity has certain demands, then there is no actual comparison
+    r1.setPending(Resources.createResource(4 * GB));
+    r2.setPending(Resources.createResource(12 * GB));
+    AbstractComparatorOrderingPolicy
+        .updateSchedulingResourceUsage(r1.getSchedulingResourceUsage());
+    AbstractComparatorOrderingPolicy
+        .updateSchedulingResourceUsage(r2.getSchedulingResourceUsage());
+
+    assertEquals("Comparator Output", 0,
+        policy.getComparator().compare(r1, r2));
+  }
 }