|
@@ -19,6 +19,7 @@
|
|
|
package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.service.AbstractService;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
@@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
|
|
|
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
|
|
|
.ChangeMonitoringContainerResourceEvent;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
|
|
|
|
|
|
|
|
@@ -74,7 +76,7 @@ public class ContainerScheduler extends AbstractService implements
|
|
|
queuedOpportunisticContainers = new LinkedHashMap<>();
|
|
|
|
|
|
// Used to keep track of containers that have been marked to be killed
|
|
|
- // to make room for a guaranteed container.
|
|
|
+ // or paused to make room for a guaranteed container.
|
|
|
private final Map<ContainerId, Container> oppContainersToKill =
|
|
|
new HashMap<>();
|
|
|
|
|
@@ -98,6 +100,8 @@ public class ContainerScheduler extends AbstractService implements
|
|
|
private final AsyncDispatcher dispatcher;
|
|
|
private final NodeManagerMetrics metrics;
|
|
|
|
|
|
+ private Boolean usePauseEventForPreemption = false;
|
|
|
+
|
|
|
/**
|
|
|
* Instantiate a Container Scheduler.
|
|
|
* @param context NodeManager Context.
|
|
@@ -112,6 +116,17 @@ public class ContainerScheduler extends AbstractService implements
|
|
|
DEFAULT_NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH));
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void serviceInit(Configuration conf) throws Exception {
|
|
|
+ super.serviceInit(conf);
|
|
|
+ this.usePauseEventForPreemption =
|
|
|
+ conf.getBoolean(
|
|
|
+ YarnConfiguration.NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION,
|
|
|
+ YarnConfiguration.
|
|
|
+ DEFAULT_NM_CONTAINER_QUEUING_USE_PAUSE_FOR_PREEMPTION);
|
|
|
+ }
|
|
|
+
|
|
|
@VisibleForTesting
|
|
|
public ContainerScheduler(Context context, AsyncDispatcher dispatcher,
|
|
|
NodeManagerMetrics metrics, int qLength) {
|
|
@@ -136,8 +151,9 @@ public class ContainerScheduler extends AbstractService implements
|
|
|
case SCHEDULE_CONTAINER:
|
|
|
scheduleContainer(event.getContainer());
|
|
|
break;
|
|
|
+ case CONTAINER_PAUSED:
|
|
|
case CONTAINER_COMPLETED:
|
|
|
- onContainerCompleted(event.getContainer());
|
|
|
+ onResourcesReclaimed(event.getContainer());
|
|
|
break;
|
|
|
case UPDATE_CONTAINER:
|
|
|
if (event instanceof UpdateContainerSchedulerEvent) {
|
|
@@ -203,9 +219,9 @@ public class ContainerScheduler extends AbstractService implements
|
|
|
queuedGuaranteedContainers.put(containerId,
|
|
|
updateEvent.getContainer());
|
|
|
}
|
|
|
- //Kill opportunistic containers if any to make room for
|
|
|
+ //Kill/pause opportunistic containers if any to make room for
|
|
|
// promotion request
|
|
|
- killOpportunisticContainers(updateEvent.getContainer());
|
|
|
+ reclaimOpportunisticContainerResources(updateEvent.getContainer());
|
|
|
} else {
|
|
|
// Demotion of queued container.. Should not happen too often
|
|
|
// since you should not find too many queued guaranteed
|
|
@@ -238,6 +254,12 @@ public class ContainerScheduler extends AbstractService implements
|
|
|
return this.queuedOpportunisticContainers.size();
|
|
|
}
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
+ public void setUsePauseEventForPreemption(
|
|
|
+ boolean usePauseEventForPreemption) {
|
|
|
+ this.usePauseEventForPreemption = usePauseEventForPreemption;
|
|
|
+ }
|
|
|
+
|
|
|
public OpportunisticContainersStatus getOpportunisticContainersStatus() {
|
|
|
this.opportunisticContainersStatus.setQueuedOpportContainers(
|
|
|
getNumQueuedOpportunisticContainers());
|
|
@@ -252,7 +274,7 @@ public class ContainerScheduler extends AbstractService implements
|
|
|
return this.opportunisticContainersStatus;
|
|
|
}
|
|
|
|
|
|
- private void onContainerCompleted(Container container) {
|
|
|
+ private void onResourcesReclaimed(Container container) {
|
|
|
oppContainersToKill.remove(container.getContainerId());
|
|
|
|
|
|
// This could be killed externally for eg. by the ContainerManager,
|
|
@@ -282,6 +304,24 @@ public class ContainerScheduler extends AbstractService implements
|
|
|
// Start pending guaranteed containers, if resources available.
|
|
|
boolean resourcesAvailable =
|
|
|
startContainersFromQueue(queuedGuaranteedContainers.values());
|
|
|
+ // Resume opportunistic containers, if resource available.
|
|
|
+ if (resourcesAvailable) {
|
|
|
+ List<Container> pausedContainers = new ArrayList<Container>();
|
|
|
+ Map<ContainerId, Container> containers =
|
|
|
+ context.getContainers();
|
|
|
+ for (Map.Entry<ContainerId, Container>entry : containers.entrySet()) {
|
|
|
+ ContainerId contId = entry.getKey();
|
|
|
+ // Find containers that were not already started and are in paused state
|
|
|
+ if(false == runningContainers.containsKey(contId)) {
|
|
|
+ if(containers.get(contId).getContainerState()
|
|
|
+ == ContainerState.PAUSED) {
|
|
|
+ pausedContainers.add(containers.get(contId));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ resourcesAvailable =
|
|
|
+ startContainersFromQueue(pausedContainers);
|
|
|
+ }
|
|
|
// Start opportunistic containers, if resources available.
|
|
|
if (resourcesAvailable) {
|
|
|
startContainersFromQueue(queuedOpportunisticContainers.values());
|
|
@@ -378,7 +418,7 @@ public class ContainerScheduler extends AbstractService implements
|
|
|
// if the guaranteed container is queued, we need to preempt opportunistic
|
|
|
// containers for make room for it
|
|
|
if (queuedGuaranteedContainers.containsKey(container.getContainerId())) {
|
|
|
- killOpportunisticContainers(container);
|
|
|
+ reclaimOpportunisticContainerResources(container);
|
|
|
}
|
|
|
} else {
|
|
|
// Given an opportunistic container, we first try to start as many queuing
|
|
@@ -396,19 +436,30 @@ public class ContainerScheduler extends AbstractService implements
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void killOpportunisticContainers(Container container) {
|
|
|
- List<Container> extraOpportContainersToKill =
|
|
|
- pickOpportunisticContainersToKill(container.getContainerId());
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ private void reclaimOpportunisticContainerResources(Container container) {
|
|
|
+ List<Container> extraOppContainersToReclaim =
|
|
|
+ pickOpportunisticContainersToReclaimResources(
|
|
|
+ container.getContainerId());
|
|
|
// Kill the opportunistic containers that were chosen.
|
|
|
- for (Container contToKill : extraOpportContainersToKill) {
|
|
|
- contToKill.sendKillEvent(
|
|
|
- ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
|
|
|
- "Container Killed to make room for Guaranteed Container.");
|
|
|
- oppContainersToKill.put(contToKill.getContainerId(), contToKill);
|
|
|
+ for (Container contToReclaim : extraOppContainersToReclaim) {
|
|
|
+ String preemptionAction = usePauseEventForPreemption == true ? "paused" :
|
|
|
+ "resumed";
|
|
|
LOG.info(
|
|
|
- "Opportunistic container {} will be killed in order to start the "
|
|
|
+ "Container {} will be {} to start the "
|
|
|
+ "execution of guaranteed container {}.",
|
|
|
- contToKill.getContainerId(), container.getContainerId());
|
|
|
+ contToReclaim.getContainerId(), preemptionAction,
|
|
|
+ container.getContainerId());
|
|
|
+
|
|
|
+ if (usePauseEventForPreemption) {
|
|
|
+ contToReclaim.sendPauseEvent(
|
|
|
+ "Container Paused to make room for Guaranteed Container");
|
|
|
+ } else {
|
|
|
+ contToReclaim.sendKillEvent(
|
|
|
+ ContainerExitStatus.KILLED_BY_CONTAINER_SCHEDULER,
|
|
|
+ "Container Killed to make room for Guaranteed Container.");
|
|
|
+ }
|
|
|
+ oppContainersToKill.put(contToReclaim.getContainerId(), contToReclaim);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -423,7 +474,7 @@ public class ContainerScheduler extends AbstractService implements
|
|
|
container.sendLaunchEvent();
|
|
|
}
|
|
|
|
|
|
- private List<Container> pickOpportunisticContainersToKill(
|
|
|
+ private List<Container> pickOpportunisticContainersToReclaimResources(
|
|
|
ContainerId containerToStartId) {
|
|
|
// The opportunistic containers that need to be killed for the
|
|
|
// given container to start.
|