|
@@ -93,6 +93,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContai
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
|
|
@@ -114,14 +115,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptA
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerRescheduledEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
|
|
import org.apache.hadoop.yarn.server.utils.Lock;
|
|
|
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
|
|
@@ -865,7 +865,7 @@ public class CapacityScheduler extends
|
|
|
LOG.info("Skip killing " + rmContainer.getContainerId());
|
|
|
continue;
|
|
|
}
|
|
|
- super.completedContainer(
|
|
|
+ completedContainer(
|
|
|
rmContainer,
|
|
|
SchedulerUtils.createAbnormalContainerStatus(
|
|
|
rmContainer.getContainerId(), SchedulerUtils.COMPLETED_APPLICATION),
|
|
@@ -874,7 +874,7 @@ public class CapacityScheduler extends
|
|
|
|
|
|
// Release all reserved containers
|
|
|
for (RMContainer rmContainer : attempt.getReservedContainers()) {
|
|
|
- super.completedContainer(
|
|
|
+ completedContainer(
|
|
|
rmContainer,
|
|
|
SchedulerUtils.createAbnormalContainerStatus(
|
|
|
rmContainer.getContainerId(), "Application Complete"),
|
|
@@ -1047,7 +1047,7 @@ public class CapacityScheduler extends
|
|
|
for (ContainerStatus completedContainer : completedContainers) {
|
|
|
ContainerId containerId = completedContainer.getContainerId();
|
|
|
RMContainer container = getRMContainer(containerId);
|
|
|
- super.completedContainer(container, completedContainer,
|
|
|
+ completedContainer(container, completedContainer,
|
|
|
RMContainerEventType.FINISHED);
|
|
|
if (container != null) {
|
|
|
releasedContainers++;
|
|
@@ -1128,7 +1128,7 @@ public class CapacityScheduler extends
|
|
|
// Unreserve container on this node
|
|
|
RMContainer reservedContainer = node.getReservedContainer();
|
|
|
if (null != reservedContainer) {
|
|
|
- killReservedContainer(reservedContainer);
|
|
|
+ dropContainerReservation(reservedContainer);
|
|
|
}
|
|
|
|
|
|
// Update node labels after we've done this
|
|
@@ -1372,19 +1372,18 @@ public class CapacityScheduler extends
|
|
|
ContainerExpiredSchedulerEvent containerExpiredEvent =
|
|
|
(ContainerExpiredSchedulerEvent) event;
|
|
|
ContainerId containerId = containerExpiredEvent.getContainerId();
|
|
|
- super.completedContainer(getRMContainer(containerId),
|
|
|
+ completedContainer(getRMContainer(containerId),
|
|
|
SchedulerUtils.createAbnormalContainerStatus(
|
|
|
containerId,
|
|
|
SchedulerUtils.EXPIRED_CONTAINER),
|
|
|
RMContainerEventType.EXPIRE);
|
|
|
}
|
|
|
break;
|
|
|
- case KILL_RESERVED_CONTAINER:
|
|
|
+ case DROP_RESERVATION:
|
|
|
{
|
|
|
- ContainerPreemptEvent killReservedContainerEvent =
|
|
|
- (ContainerPreemptEvent) event;
|
|
|
- RMContainer container = killReservedContainerEvent.getContainer();
|
|
|
- killReservedContainer(container);
|
|
|
+ ContainerPreemptEvent dropReservationEvent = (ContainerPreemptEvent)event;
|
|
|
+ RMContainer container = dropReservationEvent.getContainer();
|
|
|
+ dropContainerReservation(container);
|
|
|
}
|
|
|
break;
|
|
|
case PREEMPT_CONTAINER:
|
|
@@ -1396,11 +1395,19 @@ public class CapacityScheduler extends
|
|
|
preemptContainer(aid, containerToBePreempted);
|
|
|
}
|
|
|
break;
|
|
|
- case KILL_PREEMPTED_CONTAINER:
|
|
|
+ case KILL_CONTAINER:
|
|
|
{
|
|
|
ContainerPreemptEvent killContainerEvent = (ContainerPreemptEvent)event;
|
|
|
RMContainer containerToBeKilled = killContainerEvent.getContainer();
|
|
|
- killPreemptedContainer(containerToBeKilled);
|
|
|
+ killContainer(containerToBeKilled);
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ case CONTAINER_RESCHEDULED:
|
|
|
+ {
|
|
|
+ ContainerRescheduledEvent containerRescheduledEvent =
|
|
|
+ (ContainerRescheduledEvent) event;
|
|
|
+ RMContainer container = containerRescheduledEvent.getContainer();
|
|
|
+ recoverResourceRequestForContainer(container);
|
|
|
}
|
|
|
break;
|
|
|
default:
|
|
@@ -1455,7 +1462,7 @@ public class CapacityScheduler extends
|
|
|
// Remove running containers
|
|
|
List<RMContainer> runningContainers = node.getRunningContainers();
|
|
|
for (RMContainer container : runningContainers) {
|
|
|
- super.completedContainer(container,
|
|
|
+ completedContainer(container,
|
|
|
SchedulerUtils.createAbnormalContainerStatus(
|
|
|
container.getContainerId(),
|
|
|
SchedulerUtils.LOST_CONTAINER),
|
|
@@ -1465,7 +1472,7 @@ public class CapacityScheduler extends
|
|
|
// Remove reservations, if any
|
|
|
RMContainer reservedContainer = node.getReservedContainer();
|
|
|
if (reservedContainer != null) {
|
|
|
- super.completedContainer(reservedContainer,
|
|
|
+ completedContainer(reservedContainer,
|
|
|
SchedulerUtils.createAbnormalContainerStatus(
|
|
|
reservedContainer.getContainerId(),
|
|
|
SchedulerUtils.LOST_CONTAINER),
|
|
@@ -1481,8 +1488,13 @@ public class CapacityScheduler extends
|
|
|
|
|
|
@Lock(CapacityScheduler.class)
|
|
|
@Override
|
|
|
- protected synchronized void completedContainerInternal(RMContainer rmContainer,
|
|
|
+ protected synchronized void completedContainer(RMContainer rmContainer,
|
|
|
ContainerStatus containerStatus, RMContainerEventType event) {
|
|
|
+ if (rmContainer == null) {
|
|
|
+ LOG.info("Container " + containerStatus.getContainerId() +
|
|
|
+ " completed with event " + event);
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
Container container = rmContainer.getContainer();
|
|
|
|
|
@@ -1584,14 +1596,11 @@ public class CapacityScheduler extends
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void killReservedContainer(RMContainer container) {
|
|
|
+ public void dropContainerReservation(RMContainer container) {
|
|
|
if(LOG.isDebugEnabled()){
|
|
|
- LOG.debug(SchedulerEventType.KILL_RESERVED_CONTAINER + ":"
|
|
|
- + container.toString());
|
|
|
+ LOG.debug("DROP_RESERVATION:" + container.toString());
|
|
|
}
|
|
|
- // TODO: What happens if this is no longer a reserved container, for e.g if
|
|
|
- // the reservation became an allocation.
|
|
|
- super.completedContainer(container,
|
|
|
+ completedContainer(container,
|
|
|
SchedulerUtils.createAbnormalContainerStatus(
|
|
|
container.getContainerId(),
|
|
|
SchedulerUtils.UNRESERVED_CONTAINER),
|
|
@@ -1601,24 +1610,23 @@ public class CapacityScheduler extends
|
|
|
/**
 * Flags a container of the given application attempt for preemption.
 *
 * In this diff the debug message switches from the SchedulerEventType
 * constant to a literal "PREEMPT_CONTAINER" prefix, and the call on the
 * application changes from preemptContainer(...) to addPreemptContainer(...).
 *
 * @param aid  attempt whose scheduler application is looked up
 * @param cont container whose id is handed to that application
 */
@Override
|
|
|
public void preemptContainer(ApplicationAttemptId aid, RMContainer cont) {
|
|
|
// Guarded so the string concatenation only happens when debug is enabled.
if(LOG.isDebugEnabled()){
|
|
|
- LOG.debug(SchedulerEventType.PREEMPT_CONTAINER + ": appAttempt:"
|
|
|
- + aid.toString() + " container: " + cont.toString());
|
|
|
+ LOG.debug("PREEMPT_CONTAINER: application:" + aid.toString() +
|
|
|
+ " container: " + cont.toString());
|
|
|
}
|
|
|
// Resolve the attempt to its scheduler-side application object.
FiCaSchedulerApp app = getApplicationAttempt(aid);
|
|
|
// A null lookup result is ignored silently: nothing is recorded in that case.
if (app != null) {
|
|
|
- app.preemptContainer(cont.getContainerId());
|
|
|
+ app.addPreemptContainer(cont.getContainerId());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
/**
 * Completes the given container with a "preempted" status and a KILL event.
 *
 * In this diff the method is renamed from killPreemptedContainer to
 * killContainer, the debug prefix becomes the literal "KILL_CONTAINER", and
 * the completion call goes through completedContainer(...) instead of
 * super.completedContainer(...).
 *
 * @param cont container to complete; its id is used to build the status
 */
@Override
|
|
|
- public void killPreemptedContainer(RMContainer cont) {
|
|
|
+ public void killContainer(RMContainer cont) {
|
|
|
// NOTE(review): the message has no separator between "container" and the
// container's string form, so the two run together in the log output.
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug(SchedulerEventType.KILL_PREEMPTED_CONTAINER + ": container"
|
|
|
- + cont.toString());
|
|
|
+ LOG.debug("KILL_CONTAINER: container" + cont.toString());
|
|
|
}
|
|
|
// The status is built by SchedulerUtils.createPreemptedContainerStatus with
// the PREEMPTED_CONTAINER diagnostic; the event type passed along is KILL.
- super.completedContainer(cont, SchedulerUtils
|
|
|
- .createPreemptedContainerStatus(cont.getContainerId(),
|
|
|
- SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL);
|
|
|
+ completedContainer(cont, SchedulerUtils.createPreemptedContainerStatus(
|
|
|
+ cont.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER),
|
|
|
+ RMContainerEventType.KILL);
|
|
|
}
|
|
|
|
|
|
@Override
|