|
@@ -92,7 +92,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContai
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
|
@@ -114,13 +113,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptA
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
|
|
import org.apache.hadoop.yarn.server.utils.Lock;
|
|
|
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
|
@@ -862,7 +862,7 @@ public class CapacityScheduler extends
|
|
|
LOG.info("Skip killing " + rmContainer.getContainerId());
|
|
|
continue;
|
|
|
}
|
|
|
- completedContainer(
|
|
|
+ super.completedContainer(
|
|
|
rmContainer,
|
|
|
SchedulerUtils.createAbnormalContainerStatus(
|
|
|
rmContainer.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION),
|
|
@@ -871,7 +871,7 @@ public class CapacityScheduler extends
|
|
|
|
|
|
// Release all reserved containers
|
|
|
for (RMContainer rmContainer : attempt.getReservedContainers()) {
|
|
|
- completedContainer(
|
|
|
+ super.completedContainer(
|
|
|
rmContainer,
|
|
|
SchedulerUtils.createAbnormalContainerStatus(
|
|
|
rmContainer.getContainerId(), "Application Complete"),
|
|
@@ -1044,7 +1044,7 @@ public class CapacityScheduler extends
|
|
|
for (ContainerStatus completedContainer : completedContainers) {
|
|
|
ContainerId containerId = completedContainer.getContainerId();
|
|
|
RMContainer container = getRMContainer(containerId);
|
|
|
- completedContainer(container, completedContainer,
|
|
|
+ super.completedContainer(container, completedContainer,
|
|
|
RMContainerEventType.FINISHED);
|
|
|
if (container != null) {
|
|
|
releasedContainers++;
|
|
@@ -1125,7 +1125,7 @@ public class CapacityScheduler extends
|
|
|
// Unreserve container on this node
|
|
|
RMContainer reservedContainer = node.getReservedContainer();
|
|
|
if (null != reservedContainer) {
|
|
|
- dropContainerReservation(reservedContainer);
|
|
|
+ killReservedContainer(reservedContainer);
|
|
|
}
|
|
|
|
|
|
// Update node labels after we've done this
|
|
@@ -1369,18 +1369,19 @@ public class CapacityScheduler extends
|
|
|
ContainerExpiredSchedulerEvent containerExpiredEvent =
|
|
|
(ContainerExpiredSchedulerEvent) event;
|
|
|
ContainerId containerId = containerExpiredEvent.getContainerId();
|
|
|
- completedContainer(getRMContainer(containerId),
|
|
|
+ super.completedContainer(getRMContainer(containerId),
|
|
|
SchedulerUtils.createAbnormalContainerStatus(
|
|
|
containerId,
|
|
|
SchedulerUtils.EXPIRED_CONTAINER),
|
|
|
RMContainerEventType.EXPIRE);
|
|
|
}
|
|
|
break;
|
|
|
- case DROP_RESERVATION:
|
|
|
+ case KILL_RESERVED_CONTAINER:
|
|
|
{
|
|
|
- ContainerPreemptEvent dropReservationEvent = (ContainerPreemptEvent)event;
|
|
|
- RMContainer container = dropReservationEvent.getContainer();
|
|
|
- dropContainerReservation(container);
|
|
|
+ ContainerPreemptEvent killReservedContainerEvent =
|
|
|
+ (ContainerPreemptEvent) event;
|
|
|
+ RMContainer container = killReservedContainerEvent.getContainer();
|
|
|
+ killReservedContainer(container);
|
|
|
}
|
|
|
break;
|
|
|
case PREEMPT_CONTAINER:
|
|
@@ -1392,19 +1393,11 @@ public class CapacityScheduler extends
|
|
|
preemptContainer(aid, containerToBePreempted);
|
|
|
}
|
|
|
break;
|
|
|
- case KILL_CONTAINER:
|
|
|
+ case KILL_PREEMPTED_CONTAINER:
|
|
|
{
|
|
|
ContainerPreemptEvent killContainerEvent = (ContainerPreemptEvent)event;
|
|
|
RMContainer containerToBeKilled = killContainerEvent.getContainer();
|
|
|
- killContainer(containerToBeKilled);
|
|
|
- }
|
|
|
- break;
|
|
|
- case CONTAINER_RESCHEDULED:
|
|
|
- {
|
|
|
- ContainerRescheduledEvent containerRescheduledEvent =
|
|
|
- (ContainerRescheduledEvent) event;
|
|
|
- RMContainer container = containerRescheduledEvent.getContainer();
|
|
|
- recoverResourceRequestForContainer(container);
|
|
|
+ killPreemptedContainer(containerToBeKilled);
|
|
|
}
|
|
|
break;
|
|
|
default:
|
|
@@ -1459,7 +1452,7 @@ public class CapacityScheduler extends
|
|
|
// Remove running containers
|
|
|
List<RMContainer> runningContainers = node.getRunningContainers();
|
|
|
for (RMContainer container : runningContainers) {
|
|
|
- completedContainer(container,
|
|
|
+ super.completedContainer(container,
|
|
|
SchedulerUtils.createAbnormalContainerStatus(
|
|
|
container.getContainerId(),
|
|
|
SchedulerUtils.LOST_CONTAINER),
|
|
@@ -1469,7 +1462,7 @@ public class CapacityScheduler extends
|
|
|
// Remove reservations, if any
|
|
|
RMContainer reservedContainer = node.getReservedContainer();
|
|
|
if (reservedContainer != null) {
|
|
|
- completedContainer(reservedContainer,
|
|
|
+ super.completedContainer(reservedContainer,
|
|
|
SchedulerUtils.createAbnormalContainerStatus(
|
|
|
reservedContainer.getContainerId(),
|
|
|
SchedulerUtils.LOST_CONTAINER),
|
|
@@ -1485,13 +1478,8 @@ public class CapacityScheduler extends
|
|
|
|
|
|
@Lock(CapacityScheduler.class)
|
|
|
@Override
|
|
|
- protected synchronized void completedContainer(RMContainer rmContainer,
|
|
|
+ protected synchronized void completedContainerInternal(RMContainer rmContainer,
|
|
|
ContainerStatus containerStatus, RMContainerEventType event) {
|
|
|
- if (rmContainer == null) {
|
|
|
- LOG.info("Container " + containerStatus.getContainerId() +
|
|
|
- " completed with event " + event);
|
|
|
- return;
|
|
|
- }
|
|
|
|
|
|
Container container = rmContainer.getContainer();
|
|
|
|
|
@@ -1593,11 +1581,14 @@ public class CapacityScheduler extends
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void dropContainerReservation(RMContainer container) {
|
|
|
+ public void killReservedContainer(RMContainer container) {
|
|
|
if(LOG.isDebugEnabled()){
|
|
|
- LOG.debug("DROP_RESERVATION:" + container.toString());
|
|
|
+ LOG.debug(SchedulerEventType.KILL_RESERVED_CONTAINER + ":"
|
|
|
+ + container.toString());
|
|
|
}
|
|
|
- completedContainer(container,
|
|
|
+ // TODO: What happens if this is no longer a reserved container, for e.g if
|
|
|
+ // the reservation became an allocation.
|
|
|
+ super.completedContainer(container,
|
|
|
SchedulerUtils.createAbnormalContainerStatus(
|
|
|
container.getContainerId(),
|
|
|
SchedulerUtils.UNRESERVED_CONTAINER),
|
|
@@ -1607,23 +1598,24 @@ public class CapacityScheduler extends
|
|
|
@Override
|
|
|
public void preemptContainer(ApplicationAttemptId aid, RMContainer cont) {
|
|
|
if(LOG.isDebugEnabled()){
|
|
|
- LOG.debug("PREEMPT_CONTAINER: application:" + aid.toString() +
|
|
|
- " container: " + cont.toString());
|
|
|
+ LOG.debug(SchedulerEventType.PREEMPT_CONTAINER + ": appAttempt:"
|
|
|
+ + aid.toString() + " container: " + cont.toString());
|
|
|
}
|
|
|
FiCaSchedulerApp app = getApplicationAttempt(aid);
|
|
|
if (app != null) {
|
|
|
- app.addPreemptContainer(cont.getContainerId());
|
|
|
+ app.preemptContainer(cont.getContainerId());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void killContainer(RMContainer cont) {
|
|
|
+ public void killPreemptedContainer(RMContainer cont) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("KILL_CONTAINER: container" + cont.toString());
|
|
|
+ LOG.debug(SchedulerEventType.KILL_PREEMPTED_CONTAINER + ": container"
|
|
|
+ + cont.toString());
|
|
|
}
|
|
|
- completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus(
|
|
|
- cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER),
|
|
|
- RMContainerEventType.KILL);
|
|
|
+ super.completedContainer(cont, SchedulerUtils
|
|
|
+ .createPreemptedContainerStatus(cont.getContainerId(),
|
|
|
+ SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL);
|
|
|
}
|
|
|
|
|
|
@Override
|