|
@@ -24,6 +24,9 @@ import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.PriorityQueue;
|
|
|
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
+
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.common.collect.ArrayListMultimap;
|
|
|
import com.google.common.collect.ListMultimap;
|
|
@@ -33,6 +36,8 @@ import com.google.common.collect.ListMultimap;
|
|
|
* constraints
|
|
|
*/
|
|
|
public class MaxRunningAppsEnforcer {
|
|
|
+ private static final Log LOG = LogFactory.getLog(FairScheduler.class);
|
|
|
+
|
|
|
private final FairScheduler scheduler;
|
|
|
|
|
|
// Tracks the number of running applications by user.
|
|
@@ -163,7 +168,7 @@ public class MaxRunningAppsEnforcer {
|
|
|
Iterator<FSSchedulerApp> iter = new MultiListStartTimeIterator(
|
|
|
appsNowMaybeRunnable);
|
|
|
FSSchedulerApp prev = null;
|
|
|
- int numNowRunnable = 0;
|
|
|
+ List<AppSchedulable> noLongerPendingApps = new ArrayList<AppSchedulable>();
|
|
|
while (iter.hasNext()) {
|
|
|
FSSchedulerApp next = iter.next();
|
|
|
if (next == prev) {
|
|
@@ -173,21 +178,34 @@ public class MaxRunningAppsEnforcer {
|
|
|
if (canAppBeRunnable(next.getQueue(), next.getUser())) {
|
|
|
trackRunnableApp(next);
|
|
|
AppSchedulable appSched = next.getAppSchedulable();
|
|
|
- next.getQueue().makeAppRunnable(appSched);
|
|
|
- if (!usersNonRunnableApps.remove(next.getUser(), appSched)) {
|
|
|
- throw new IllegalStateException("Waiting app " + next
|
|
|
- + " expected to be in usersNonRunnableApps");
|
|
|
- }
|
|
|
+ next.getQueue().getRunnableAppSchedulables().add(appSched);
|
|
|
+ noLongerPendingApps.add(appSched);
|
|
|
|
|
|
// No more than one app per list will be able to be made runnable, so
|
|
|
// we can stop looking after we've found that many
|
|
|
- if (numNowRunnable >= appsNowMaybeRunnable.size()) {
|
|
|
+ if (noLongerPendingApps.size() >= appsNowMaybeRunnable.size()) {
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
|
|
|
prev = next;
|
|
|
}
|
|
|
+
|
|
|
+ // We remove the apps from their pending lists afterwards so that we don't
|
|
|
+ // pull them out from under the iterator. If they are not in these lists
|
|
|
+ // in the first place, there is a bug.
|
|
|
+ for (AppSchedulable appSched : noLongerPendingApps) {
|
|
|
+ if (!appSched.getApp().getQueue().getNonRunnableAppSchedulables()
|
|
|
+ .remove(appSched)) {
|
|
|
+ LOG.error("Can't make app runnable that does not already exist in queue"
|
|
|
+ + " as non-runnable: " + appSched + ". This should never happen.");
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!usersNonRunnableApps.remove(appSched.getApp().getUser(), appSched)) {
|
|
|
+ LOG.error("Waiting app " + appSched + " expected to be in "
|
|
|
+ + "usersNonRunnableApps, but was not. This should never happen.");
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -225,7 +243,7 @@ public class MaxRunningAppsEnforcer {
|
|
|
* This allows us to pick which list to advance in O(log(num lists)) instead
|
|
|
* of O(num lists) time.
|
|
|
*/
|
|
|
- private static class MultiListStartTimeIterator implements
|
|
|
+ static class MultiListStartTimeIterator implements
|
|
|
Iterator<FSSchedulerApp> {
|
|
|
|
|
|
private List<AppSchedulable>[] appLists;
|