Browse Source

YARN-3632. Ordering policy should be allowed to reorder an application when demand changes. Contributed by Craig Welch
(cherry picked from commit 10732d515f62258309f98e4d7d23249f80b1847d)

Jian He 10 năm trước cách đây
mục cha
commit
5e7be094ec
9 tập tin đã thay đổi với 187 bổ sung9 xóa
  1. 3 0
      hadoop-yarn-project/CHANGES.txt
  2. 6 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
  3. 3 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
  4. 16 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
  5. 25 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java
  6. 9 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java
  7. 5 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java
  8. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java
  9. 115 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

+ 3 - 0
hadoop-yarn-project/CHANGES.txt

@@ -398,6 +398,9 @@ Release 2.8.0 - UNRELEASED
 
     YARN-3707. RM Web UI queue filter doesn't work. (Wangda Tan via jianhe)
 
+    YARN-3632. Ordering policy should be allowed to reorder an application when
+    demand changes. (Craig Welch via jianhe)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

+ 6 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java

@@ -128,11 +128,14 @@ public class AppSchedulingInfo {
    *
    * @param requests resources to be acquired
    * @param recoverPreemptedRequest recover Resource Request on preemption
+   * @return true if any resource was updated, false else
    */
-  synchronized public void updateResourceRequests(
+  synchronized public boolean updateResourceRequests(
       List<ResourceRequest> requests, boolean recoverPreemptedRequest) {
     QueueMetrics metrics = queue.getMetrics();
     
+    boolean anyResourcesUpdated = false;
+
     // Update resource requests
     for (ResourceRequest request : requests) {
       Priority priority = request.getPriority();
@@ -146,6 +149,7 @@ public class AppSchedulingInfo {
               + request);
         }
         updatePendingResources = true;
+        anyResourcesUpdated = true;
         
         // Premature optimization?
         // Assumes that we won't see more than one priority request updated
@@ -209,6 +213,7 @@ public class AppSchedulingInfo {
         }
       }
     }
+    return anyResourcesUpdated;
   }
 
   /**

+ 3 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java

@@ -284,11 +284,12 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
     return queue;
   }
   
-  public synchronized void updateResourceRequests(
+  public synchronized boolean updateResourceRequests(
       List<ResourceRequest> requests) {
     if (!isStopped) {
-      appSchedulingInfo.updateResourceRequests(requests, false);
+      return appSchedulingInfo.updateResourceRequests(requests, false);
     }
+    return false;
   }
   
   public synchronized void recoverResourceRequests(

+ 16 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

@@ -895,6 +895,10 @@ public class CapacityScheduler extends
     // Release containers
     releaseContainers(release, application);
 
+    Allocation allocation;
+
+    LeafQueue updateDemandForQueue = null;
+
     synchronized (application) {
 
       // make sure we aren't stopping/removing the application
@@ -915,8 +919,10 @@ public class CapacityScheduler extends
         application.showRequests();
   
         // Update application requests
-        application.updateResourceRequests(ask);
-  
+        if (application.updateResourceRequests(ask)) {
+          updateDemandForQueue = (LeafQueue) application.getQueue();
+        }
+
         LOG.debug("allocate: post-update");
         application.showRequests();
       }
@@ -929,9 +935,16 @@ public class CapacityScheduler extends
 
       application.updateBlacklist(blacklistAdditions, blacklistRemovals);
 
-      return application.getAllocation(getResourceCalculator(),
+      allocation = application.getAllocation(getResourceCalculator(),
                    clusterResource, getMinimumResourceCapability());
     }
+
+    if (updateDemandForQueue != null) {
+      updateDemandForQueue.getOrderingPolicy().demandUpdated(application);
+    }
+
+    return allocation;
+
   }
 
   @Override

+ 25 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/AbstractComparatorOrderingPolicy.java

@@ -37,6 +37,7 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti
                                             
   protected TreeSet<S> schedulableEntities;
   protected Comparator<SchedulableEntity> comparator;
+  protected Map<String, S> entitiesToReorder = new HashMap<String, S>();
   
   public AbstractComparatorOrderingPolicy() { }
   
@@ -47,11 +48,13 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti
   
   @Override
   public Iterator<S> getAssignmentIterator() {
+    reorderScheduleEntities();
     return schedulableEntities.iterator();
   }
   
   @Override
   public Iterator<S> getPreemptionIterator() {
+    reorderScheduleEntities();
     return schedulableEntities.descendingIterator();
   }
   
@@ -68,6 +71,22 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti
     schedulableEntities.add(schedulableEntity);
   }
   
+  protected void reorderScheduleEntities() {
+    synchronized (entitiesToReorder) {
+      for (Map.Entry<String, S> entry :
+          entitiesToReorder.entrySet()) {
+        reorderSchedulableEntity(entry.getValue());
+      }
+      entitiesToReorder.clear();
+    }
+  }
+
+  protected void entityRequiresReordering(S schedulableEntity) {
+    synchronized (entitiesToReorder) {
+      entitiesToReorder.put(schedulableEntity.getId(), schedulableEntity);
+    }
+  }
+
   @VisibleForTesting
   public Comparator<SchedulableEntity> getComparator() {
     return comparator; 
@@ -80,6 +99,9 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti
   
   @Override
   public boolean removeSchedulableEntity(S s) {
+    synchronized (entitiesToReorder) {
+      entitiesToReorder.remove(s.getId());
+    }
     return schedulableEntities.remove(s); 
   }
   
@@ -104,6 +126,9 @@ public abstract class AbstractComparatorOrderingPolicy<S extends SchedulableEnti
   public abstract void containerReleased(S schedulableEntity, 
     RMContainer r);
   
+  @Override
+  public abstract void demandUpdated(S schedulableEntity);
+
   @Override
   public abstract String getInfo();
   

+ 9 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FairOrderingPolicy.java

@@ -96,15 +96,22 @@ public class FairOrderingPolicy<S extends SchedulableEntity> extends AbstractCom
   @Override
   public void containerAllocated(S schedulableEntity,
     RMContainer r) {
-      reorderSchedulableEntity(schedulableEntity);
+      entityRequiresReordering(schedulableEntity);
     }
 
   @Override
   public void containerReleased(S schedulableEntity,
     RMContainer r) {
-      reorderSchedulableEntity(schedulableEntity);
+      entityRequiresReordering(schedulableEntity);
     }
 
+  @Override
+  public void demandUpdated(S schedulableEntity) {
+    if (sizeBasedWeight) {
+      entityRequiresReordering(schedulableEntity);
+    }
+  }
+
   @Override
   public String getInfo() {
     String sbw = sizeBasedWeight ? " with sizeBasedWeight" : "";

+ 5 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/FifoOrderingPolicy.java

@@ -46,7 +46,11 @@ public class FifoOrderingPolicy<S extends SchedulableEntity> extends AbstractCom
   public void containerReleased(S schedulableEntity, 
     RMContainer r) {
     }
-  
+
+  @Override
+  public void demandUpdated(S schedulableEntity) {
+  }
+
   @Override
   public String getInfo() {
     return "FifoOrderingPolicy";

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/policy/OrderingPolicy.java

@@ -101,6 +101,11 @@ public interface OrderingPolicy<S extends SchedulableEntity> {
   public void containerReleased(S schedulableEntity, 
     RMContainer r);
   
+  /**
+   * Demand Updated for the passed schedulableEntity, reorder if needed.
+   */
+  void demandUpdated(S schedulableEntity);
+
   /**
    * Display information regarding configuration & status
    */

+ 115 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

@@ -72,6 +72,8 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.nodelabels.RMNodeLabel;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
@@ -126,6 +128,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedule
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerLeafQueueInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -676,6 +679,118 @@ public class TestCapacityScheduler {
     rm.stop();
   }
   
+  @Test
+  public void testAllocateReorder() throws Exception {
+
+    //Confirm that allocation (resource request) alone will trigger a change in
+    //application ordering where appropriate
+
+    Configuration conf = new Configuration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    MockRM rm = new MockRM(conf);
+    rm.start();
+    CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
+
+    LeafQueue q = (LeafQueue) cs.getQueue("default");
+    Assert.assertNotNull(q);
+
+    FairOrderingPolicy fop = new FairOrderingPolicy();
+    fop.setSizeBasedWeight(true);
+    q.setOrderingPolicy(fop);
+
+    String host = "127.0.0.1";
+    RMNode node =
+        MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, host);
+    cs.handle(new NodeAddedSchedulerEvent(node));
+
+    //add app begin
+    ApplicationId appId1 = BuilderUtils.newApplicationId(100, 1);
+    ApplicationAttemptId appAttemptId1 = BuilderUtils.newApplicationAttemptId(
+        appId1, 1);
+
+    RMAppAttemptMetrics attemptMetric1 =
+        new RMAppAttemptMetrics(appAttemptId1, rm.getRMContext());
+    RMAppImpl app1 = mock(RMAppImpl.class);
+    when(app1.getApplicationId()).thenReturn(appId1);
+    RMAppAttemptImpl attempt1 = mock(RMAppAttemptImpl.class);
+    when(attempt1.getAppAttemptId()).thenReturn(appAttemptId1);
+    when(attempt1.getRMAppAttemptMetrics()).thenReturn(attemptMetric1);
+    when(app1.getCurrentAppAttempt()).thenReturn(attempt1);
+
+    rm.getRMContext().getRMApps().put(appId1, app1);
+
+    SchedulerEvent addAppEvent1 =
+        new AppAddedSchedulerEvent(appId1, "default", "user");
+    cs.handle(addAppEvent1);
+    SchedulerEvent addAttemptEvent1 =
+        new AppAttemptAddedSchedulerEvent(appAttemptId1, false);
+    cs.handle(addAttemptEvent1);
+    //add app end
+
+    //add app begin
+    ApplicationId appId2 = BuilderUtils.newApplicationId(100, 2);
+    ApplicationAttemptId appAttemptId2 = BuilderUtils.newApplicationAttemptId(
+        appId2, 1);
+
+    RMAppAttemptMetrics attemptMetric2 =
+        new RMAppAttemptMetrics(appAttemptId2, rm.getRMContext());
+    RMAppImpl app2 = mock(RMAppImpl.class);
+    when(app2.getApplicationId()).thenReturn(appId2);
+    RMAppAttemptImpl attempt2 = mock(RMAppAttemptImpl.class);
+    when(attempt2.getAppAttemptId()).thenReturn(appAttemptId2);
+    when(attempt2.getRMAppAttemptMetrics()).thenReturn(attemptMetric2);
+    when(app2.getCurrentAppAttempt()).thenReturn(attempt2);
+
+    rm.getRMContext().getRMApps().put(appId2, app2);
+
+    SchedulerEvent addAppEvent2 =
+        new AppAddedSchedulerEvent(appId2, "default", "user");
+    cs.handle(addAppEvent2);
+    SchedulerEvent addAttemptEvent2 =
+        new AppAttemptAddedSchedulerEvent(appAttemptId2, false);
+    cs.handle(addAttemptEvent2);
+    //add app end
+
+    RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
+
+    Priority priority = TestUtils.createMockPriority(1);
+    ResourceRequest r1 = TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, priority, recordFactory);
+
+    //This will allocate for app1
+    cs.allocate(appAttemptId1,
+        Collections.<ResourceRequest>singletonList(r1),
+        Collections.<ContainerId>emptyList(),
+        null, null);
+
+    //And this will result in container assignment for app1
+    CapacityScheduler.schedule(cs);
+
+    //Verify that app1 is still first in assignment order
+    //This happens because app2 has no demand/a magnitude of NaN, which
+    //results in app1 and app2 being equal in the fairness comparison and
+    //failling back to fifo (start) ordering
+    assertEquals(q.getOrderingPolicy().getAssignmentIterator().next().getId(),
+      appId1.toString());
+
+    //Now, allocate for app2 (this would be the first/AM allocation)
+    ResourceRequest r2 = TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true, priority, recordFactory);
+    cs.allocate(appAttemptId2,
+        Collections.<ResourceRequest>singletonList(r2),
+        Collections.<ContainerId>emptyList(),
+        null, null);
+
+    //In this case we do not perform container assignment because we want to
+    //verify re-ordering based on the allocation alone
+
+    //Now, the first app for assignment is app2
+    assertEquals(q.getOrderingPolicy().getAssignmentIterator().next().getId(),
+      appId2.toString());
+
+    rm.stop();
+  }
+
   @Test
   public void testResourceOverCommit() throws Exception {
     Configuration conf = new Configuration();