Bläddra i källkod

YARN-1241: Include missing files

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1546625 13f79535-47bb-0310-9956-ffa450edef68
Sanford Ryza 11 år sedan
förälder
incheckning
61b6ed73f8

+ 302 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java

@@ -0,0 +1,302 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+
+/**
+ * Handles tracking and enforcement for user and queue maxRunningApps
+ * constraints
+ */
+public class MaxRunningAppsEnforcer {
+  private final QueueManager queueMgr;
+
+  // Tracks the number of running applications by user.
+  private final Map<String, Integer> usersNumRunnableApps;
+  @VisibleForTesting
+  final ListMultimap<String, AppSchedulable> usersNonRunnableApps;
+
+  public MaxRunningAppsEnforcer(QueueManager queueMgr) {
+    this.queueMgr = queueMgr;
+    this.usersNumRunnableApps = new HashMap<String, Integer>();
+    this.usersNonRunnableApps = ArrayListMultimap.create();
+  }
+
+  /**
+   * Checks whether making the application runnable would exceed any
+   * maxRunningApps limits.
+   */
+  public boolean canAppBeRunnable(FSQueue queue, String user) {
+    Integer userNumRunnable = usersNumRunnableApps.get(user);
+    if (userNumRunnable == null) {
+      userNumRunnable = 0;
+    }
+    if (userNumRunnable >= queueMgr.getUserMaxApps(user)) {
+      return false;
+    }
+    // Check queue and all parent queues
+    while (queue != null) {
+      int queueMaxApps = queueMgr.getQueueMaxApps(queue.getName());
+      if (queue.getNumRunnableApps() >= queueMaxApps) {
+        return false;
+      }
+      queue = queue.getParent();
+    }
+
+    return true;
+  }
+
+  /**
+   * Tracks the given new runnable app for purposes of maintaining max running
+   * app limits.
+   */
+  public void trackRunnableApp(FSSchedulerApp app) {
+    String user = app.getUser();
+    FSLeafQueue queue = app.getQueue();
+    // Increment running counts for all parent queues
+    FSParentQueue parent = queue.getParent();
+    while (parent != null) {
+      parent.incrementRunnableApps();
+      parent = parent.getParent();
+    }
+
+    Integer userNumRunnable = usersNumRunnableApps.get(user);
+    usersNumRunnableApps.put(user, (userNumRunnable == null ? 0
+        : userNumRunnable) + 1);
+  }
+
+  /**
+   * Tracks the given new non runnable app so that it can be made runnable when
+   * it would not violate max running app limits.
+   */
+  public void trackNonRunnableApp(FSSchedulerApp app) {
+    String user = app.getUser();
+    usersNonRunnableApps.put(user, app.getAppSchedulable());
+  }
+
+  /**
+   * Updates the relevant tracking variables after a runnable app with the given
+   * queue and user has been removed. Checks to see whether any other applications
+   * are now runnable and makes them so.
+   * 
+   * Runs in O(n log(n)) where n is the number of queues that are under the
+   * highest queue that went from having no slack to having slack.
+   */
+  public void updateRunnabilityOnAppRemoval(FSSchedulerApp app) {
+    // Update usersRunnableApps
+    String user = app.getUser();
+    int newUserNumRunning = usersNumRunnableApps.get(user) - 1;
+    if (newUserNumRunning == 0) {
+      usersNumRunnableApps.remove(user);
+    } else {
+      usersNumRunnableApps.put(user, newUserNumRunning);
+    }
+
+    // Update runnable app bookkeeping for queues:
+    // childqueueX might have no pending apps itself, but if a queue higher up
+    // in the hierarchy parentqueueY has a maxRunningApps set, an app completion
+    // in childqueueX could allow an app in some other distant child of
+    // parentqueueY to become runnable.
+    // An app removal will only possibly allow another app to become runnable if
+    // the queue was already at its max before the removal.
+    // Thus we find the ancestor queue highest in the tree for which the app
+    // that was at its maxRunningApps before the removal.
+    FSLeafQueue queue = app.getQueue();
+    FSQueue highestQueueWithAppsNowRunnable = (queue.getNumRunnableApps() ==
+        queueMgr.getQueueMaxApps(queue.getName()) - 1) ? queue : null;
+    FSParentQueue parent = queue.getParent();
+    while (parent != null) {
+      if (parent.getNumRunnableApps() == queueMgr.getQueueMaxApps(parent
+          .getName())) {
+        highestQueueWithAppsNowRunnable = parent;
+      }
+      parent.decrementRunnableApps();
+      parent = parent.getParent();
+    }
+
+    List<List<AppSchedulable>> appsNowMaybeRunnable =
+        new ArrayList<List<AppSchedulable>>();
+
+    // Compile lists of apps which may now be runnable
+    // We gather lists instead of building a set of all non-runnable apps so
+    // that this whole operation can be O(number of queues) instead of
+    // O(number of apps)
+    if (highestQueueWithAppsNowRunnable != null) {
+      gatherPossiblyRunnableAppLists(highestQueueWithAppsNowRunnable,
+          appsNowMaybeRunnable);
+    }
+    if (newUserNumRunning == queueMgr.getUserMaxApps(user) - 1) {
+      List<AppSchedulable> userWaitingApps = usersNonRunnableApps.get(user);
+      if (userWaitingApps != null) {
+        appsNowMaybeRunnable.add(userWaitingApps);
+      }
+    }
+
+    // Scan through and check whether this means that any apps are now runnable
+    Iterator<FSSchedulerApp> iter = new MultiListStartTimeIterator(
+        appsNowMaybeRunnable);
+    FSSchedulerApp prev = null;
+    int numNowRunnable = 0;
+    while (iter.hasNext()) {
+      FSSchedulerApp next = iter.next();
+      if (next == prev) {
+        continue;
+      }
+
+      if (canAppBeRunnable(next.getQueue(), next.getUser())) {
+        trackRunnableApp(next);
+        AppSchedulable appSched = next.getAppSchedulable();
+        next.getQueue().makeAppRunnable(appSched);
+        if (!usersNonRunnableApps.remove(next.getUser(), appSched)) {
+          throw new IllegalStateException("Waiting app " + next
+              + " expected to be in usersNonRunnableApps");
+        }
+
+        // No more than one app per list will be able to be made runnable, so
+        // we can stop looking after we've found that many
+        if (numNowRunnable >= appsNowMaybeRunnable.size()) {
+          break;
+        }
+      }
+
+      prev = next;
+    }
+  }
+  
+  /**
+   * Stops tracking the given non-runnable app
+   */
+  public void untrackNonRunnableApp(FSSchedulerApp app) {
+    usersNonRunnableApps.remove(app.getUser(), app.getAppSchedulable());
+  }
+
+  /**
+   * Traverses the queue hierarchy under the given queue to gather all lists
+   * of non-runnable applications.
+   */
+  private void gatherPossiblyRunnableAppLists(FSQueue queue,
+      List<List<AppSchedulable>> appLists) {
+    if (queue.getNumRunnableApps() < queueMgr.getQueueMaxApps(queue.getName())) {
+      if (queue instanceof FSLeafQueue) {
+        appLists.add(((FSLeafQueue)queue).getNonRunnableAppSchedulables());
+      } else {
+        for (FSQueue child : queue.getChildQueues()) {
+          gatherPossiblyRunnableAppLists(child, appLists);
+        }
+      }
+    }
+  }
+
+  /**
+   * Takes a list of lists, each of which is ordered by start time, and returns
+   * their elements in order of start time.
+   * 
+   * We maintain positions in each of the lists.  Each next() call advances
+   * the position in one of the lists.  We maintain a heap that orders lists
+   * by the start time of the app in the current position in that list.
+   * This allows us to pick which list to advance in O(log(num lists)) instead
+   * of O(num lists) time.
+   */
+  private static class MultiListStartTimeIterator implements
+      Iterator<FSSchedulerApp> {
+
+    private List<AppSchedulable>[] appLists;
+    private int[] curPositionsInAppLists;
+    private PriorityQueue<IndexAndTime> appListsByCurStartTime;
+
+    @SuppressWarnings("unchecked")
+    public MultiListStartTimeIterator(List<List<AppSchedulable>> appListList) {
+      appLists = appListList.toArray(new List[appListList.size()]);
+      curPositionsInAppLists = new int[appLists.length];
+      appListsByCurStartTime = new PriorityQueue<IndexAndTime>();
+      for (int i = 0; i < appLists.length; i++) {
+        long time = appLists[i].isEmpty() ? Long.MAX_VALUE : appLists[i].get(0)
+            .getStartTime();
+        appListsByCurStartTime.add(new IndexAndTime(i, time));
+      }
+    }
+
+    @Override
+    public boolean hasNext() {
+      return !appListsByCurStartTime.isEmpty()
+          && appListsByCurStartTime.peek().time != Long.MAX_VALUE;
+    }
+
+    @Override
+    public FSSchedulerApp next() {
+      IndexAndTime indexAndTime = appListsByCurStartTime.remove();
+      int nextListIndex = indexAndTime.index;
+      AppSchedulable next = appLists[nextListIndex]
+          .get(curPositionsInAppLists[nextListIndex]);
+      curPositionsInAppLists[nextListIndex]++;
+
+      if (curPositionsInAppLists[nextListIndex] < appLists[nextListIndex].size()) {
+        indexAndTime.time = appLists[nextListIndex]
+            .get(curPositionsInAppLists[nextListIndex]).getStartTime();
+      } else {
+        indexAndTime.time = Long.MAX_VALUE;
+      }
+      appListsByCurStartTime.add(indexAndTime);
+
+      return next.getApp();
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("Remove not supported");
+    }
+
+    private static class IndexAndTime implements Comparable<IndexAndTime> {
+      public int index;
+      public long time;
+
+      public IndexAndTime(int index, long time) {
+        this.index = index;
+        this.time = time;
+      }
+
+      @Override
+      public int compareTo(IndexAndTime o) {
+        return time < o.time ? -1 : (time > o.time ? 1 : 0);
+      }
+
+      @Override
+      public boolean equals(Object o) {
+        if (!(o instanceof IndexAndTime)) {
+          return false;
+        }
+        IndexAndTime other = (IndexAndTime)o;
+        return other.time == time;
+      }
+
+      @Override
+      public int hashCode() {
+        return (int)time;
+      }
+    }
+  }
+}

+ 152 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java

@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestMaxRunningAppsEnforcer {
+  private QueueManager queueManager;
+  private Map<String, Integer> queueMaxApps;
+  private Map<String, Integer> userMaxApps;
+  private MaxRunningAppsEnforcer maxAppsEnforcer;
+  private int appNum;
+  private TestFairScheduler.MockClock clock;
+  
+  @Before
+  public void setup() throws Exception {
+    clock = new TestFairScheduler.MockClock();
+    FairScheduler scheduler = mock(FairScheduler.class);
+    when(scheduler.getConf()).thenReturn(
+        new FairSchedulerConfiguration(new Configuration()));
+    when(scheduler.getClock()).thenReturn(clock);
+    
+    queueManager = new QueueManager(scheduler);
+    queueManager.initialize();
+    
+    queueMaxApps = queueManager.info.queueMaxApps;
+    userMaxApps = queueManager.info.userMaxApps;
+    maxAppsEnforcer = new MaxRunningAppsEnforcer(queueManager);
+    appNum = 0;
+  }
+  
+  private FSSchedulerApp addApp(FSLeafQueue queue, String user) {
+    ApplicationId appId = ApplicationId.newInstance(0l, appNum++);
+    ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 0);
+    boolean runnable = maxAppsEnforcer.canAppBeRunnable(queue, user);
+    FSSchedulerApp app = new FSSchedulerApp(attId, user, queue, null, null);
+    queue.addApp(app, runnable);
+    if (runnable) {
+      maxAppsEnforcer.trackRunnableApp(app);
+    } else {
+      maxAppsEnforcer.trackNonRunnableApp(app);
+    }
+    return app;
+  }
+  
+  private void removeApp(FSSchedulerApp app) {
+    app.getQueue().removeApp(app);
+    maxAppsEnforcer.updateRunnabilityOnAppRemoval(app);
+  }
+  
+  @Test
+  public void testRemoveDoesNotEnableAnyApp() {
+    FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1", true);
+    FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue2", true);
+    queueMaxApps.put("root", 2);
+    queueMaxApps.put("root.queue1", 1);
+    queueMaxApps.put("root.queue2", 1);
+    FSSchedulerApp app1 = addApp(leaf1, "user");
+    addApp(leaf2, "user");
+    addApp(leaf2, "user");
+    assertEquals(1, leaf1.getRunnableAppSchedulables().size());
+    assertEquals(1, leaf2.getRunnableAppSchedulables().size());
+    assertEquals(1, leaf2.getNonRunnableAppSchedulables().size());
+    removeApp(app1);
+    assertEquals(0, leaf1.getRunnableAppSchedulables().size());
+    assertEquals(1, leaf2.getRunnableAppSchedulables().size());
+    assertEquals(1, leaf2.getNonRunnableAppSchedulables().size());
+  }
+  
+  @Test
+  public void testRemoveEnablesAppOnCousinQueue() {
+    FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true);
+    FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true);
+    queueMaxApps.put("root.queue1", 2);
+    FSSchedulerApp app1 = addApp(leaf1, "user");
+    addApp(leaf2, "user");
+    addApp(leaf2, "user");
+    assertEquals(1, leaf1.getRunnableAppSchedulables().size());
+    assertEquals(1, leaf2.getRunnableAppSchedulables().size());
+    assertEquals(1, leaf2.getNonRunnableAppSchedulables().size());
+    removeApp(app1);
+    assertEquals(0, leaf1.getRunnableAppSchedulables().size());
+    assertEquals(2, leaf2.getRunnableAppSchedulables().size());
+    assertEquals(0, leaf2.getNonRunnableAppSchedulables().size());
+  }
+  
+  @Test
+  public void testRemoveEnablesOneByQueueOneByUser() {
+    FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.leaf1", true);
+    FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.leaf2", true);
+    queueMaxApps.put("root.queue1.leaf1", 2);
+    userMaxApps.put("user1", 1);
+    FSSchedulerApp app1 = addApp(leaf1, "user1");
+    addApp(leaf1, "user2");
+    addApp(leaf1, "user3");
+    addApp(leaf2, "user1");
+    assertEquals(2, leaf1.getRunnableAppSchedulables().size());
+    assertEquals(1, leaf1.getNonRunnableAppSchedulables().size());
+    assertEquals(1, leaf2.getNonRunnableAppSchedulables().size());
+    removeApp(app1);
+    assertEquals(2, leaf1.getRunnableAppSchedulables().size());
+    assertEquals(1, leaf2.getRunnableAppSchedulables().size());
+    assertEquals(0, leaf1.getNonRunnableAppSchedulables().size());
+    assertEquals(0, leaf2.getNonRunnableAppSchedulables().size());
+  }
+  
+  @Test
+  public void testRemoveEnablingOrderedByStartTime() {
+    FSLeafQueue leaf1 = queueManager.getLeafQueue("root.queue1.subqueue1.leaf1", true);
+    FSLeafQueue leaf2 = queueManager.getLeafQueue("root.queue1.subqueue2.leaf2", true);
+    queueMaxApps.put("root.queue1", 2);
+    FSSchedulerApp app1 = addApp(leaf1, "user");
+    addApp(leaf2, "user");
+    addApp(leaf2, "user");
+    clock.tick(20);
+    addApp(leaf1, "user");
+    assertEquals(1, leaf1.getRunnableAppSchedulables().size());
+    assertEquals(1, leaf2.getRunnableAppSchedulables().size());
+    assertEquals(1, leaf1.getNonRunnableAppSchedulables().size());
+    assertEquals(1, leaf2.getNonRunnableAppSchedulables().size());
+    removeApp(app1);
+    assertEquals(0, leaf1.getRunnableAppSchedulables().size());
+    assertEquals(2, leaf2.getRunnableAppSchedulables().size());
+    assertEquals(0, leaf2.getNonRunnableAppSchedulables().size());
+  }
+  
+}