|
@@ -238,6 +238,11 @@ public class ContainerScheduler extends AbstractService implements
|
|
|
return this.queuedOpportunisticContainers.size();
|
|
|
}
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
+ public int getNumRunningContainers() {
|
|
|
+ return this.runningContainers.size();
|
|
|
+ }
|
|
|
+
|
|
|
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
|
|
|
this.opportunisticContainersStatus.setQueuedOpportContainers(
|
|
|
getNumQueuedOpportunisticContainers());
|
|
@@ -274,27 +279,32 @@ public class ContainerScheduler extends AbstractService implements
|
|
|
ExecutionType.OPPORTUNISTIC) {
|
|
|
this.metrics.completeOpportunisticContainer(container.getResource());
|
|
|
}
|
|
|
- startPendingContainers();
|
|
|
+ startPendingContainers(false);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void startPendingContainers() {
|
|
|
+ /**
|
|
|
+ * Start pending containers in the queue.
|
|
|
+ * @param forceStartGuaranteedContaieners When this is true, start guaranteed
|
|
|
+ * container without looking at available resource
|
|
|
+ */
|
|
|
+ private void startPendingContainers(boolean forceStartGuaranteedContaieners) {
|
|
|
// Start pending guaranteed containers, if resources available.
|
|
|
- boolean resourcesAvailable =
|
|
|
- startContainersFromQueue(queuedGuaranteedContainers.values());
|
|
|
+ boolean resourcesAvailable = startContainers(
|
|
|
+ queuedGuaranteedContainers.values(), forceStartGuaranteedContaieners);
|
|
|
// Start opportunistic containers, if resources available.
|
|
|
if (resourcesAvailable) {
|
|
|
- startContainersFromQueue(queuedOpportunisticContainers.values());
|
|
|
+ startContainers(queuedOpportunisticContainers.values(), false);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private boolean startContainersFromQueue(
|
|
|
- Collection<Container> queuedContainers) {
|
|
|
- Iterator<Container> cIter = queuedContainers.iterator();
|
|
|
+ private boolean startContainers(
|
|
|
+ Collection<Container> containersToBeStarted, boolean force) {
|
|
|
+ Iterator<Container> cIter = containersToBeStarted.iterator();
|
|
|
boolean resourcesAvailable = true;
|
|
|
while (cIter.hasNext() && resourcesAvailable) {
|
|
|
Container container = cIter.next();
|
|
|
- if (tryStartContainer(container)) {
|
|
|
+ if (tryStartContainer(container, force)) {
|
|
|
cIter.remove();
|
|
|
} else {
|
|
|
resourcesAvailable = false;
|
|
@@ -303,9 +313,11 @@ public class ContainerScheduler extends AbstractService implements
|
|
|
return resourcesAvailable;
|
|
|
}
|
|
|
|
|
|
- private boolean tryStartContainer(Container container) {
|
|
|
+ private boolean tryStartContainer(Container container, boolean force) {
|
|
|
boolean containerStarted = false;
|
|
|
- if (resourceAvailableToStartContainer(container)) {
|
|
|
+ // call startContainer without checking available resource when force==true
|
|
|
+ if (force || resourceAvailableToStartContainer(
|
|
|
+ container)) {
|
|
|
startContainer(container);
|
|
|
containerStarted = true;
|
|
|
}
|
|
@@ -373,7 +385,12 @@ public class ContainerScheduler extends AbstractService implements
|
|
|
// enough number of opportunistic containers.
|
|
|
if (isGuaranteedContainer) {
|
|
|
enqueueContainer(container);
|
|
|
- startPendingContainers();
|
|
|
+
|
|
|
+ // When opportunistic container not allowed (which is determined by
|
|
|
+ // max-queue length of pending opportunistic containers <= 0), start
|
|
|
+ // guaranteed containers without looking at available resources.
|
|
|
+ boolean forceStartGuaranteedContainers = (maxOppQueueLength <= 0);
|
|
|
+ startPendingContainers(forceStartGuaranteedContainers);
|
|
|
|
|
|
// if the guaranteed container is queued, we need to preempt opportunistic
|
|
|
// containers for make room for it
|
|
@@ -386,12 +403,12 @@ public class ContainerScheduler extends AbstractService implements
|
|
|
// containers based on remaining resource available, then enqueue the
|
|
|
// opportunistic container. If the container is enqueued, we do another
|
|
|
// pass to try to start the newly enqueued opportunistic container.
|
|
|
- startPendingContainers();
|
|
|
+ startPendingContainers(false);
|
|
|
boolean containerQueued = enqueueContainer(container);
|
|
|
// container may not get queued because the max opportunistic container
|
|
|
// queue length is reached. If so, there is no point doing another pass
|
|
|
if (containerQueued) {
|
|
|
- startPendingContainers();
|
|
|
+ startPendingContainers(false);
|
|
|
}
|
|
|
}
|
|
|
}
|