|
@@ -18,12 +18,10 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
|
|
|
|
|
|
-import java.io.Serializable;
|
|
|
import java.text.DecimalFormat;
|
|
|
import java.util.ArrayList;
|
|
|
-import java.util.Arrays;
|
|
|
import java.util.Collection;
|
|
|
-import java.util.Comparator;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.List;
|
|
@@ -81,7 +79,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|
|
private FairScheduler scheduler;
|
|
|
private FSQueue fsQueue;
|
|
|
private Resource fairShare = Resources.createResource(0, 0);
|
|
|
- private RMContainerComparator comparator = new RMContainerComparator();
|
|
|
|
|
|
// Preemption related variables
|
|
|
private Resource fairshareStarvation = Resources.none();
|
|
@@ -121,7 +118,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|
|
this.resourceWeights = new ResourceWeights();
|
|
|
}
|
|
|
|
|
|
- public ResourceWeights getResourceWeights() {
|
|
|
+ ResourceWeights getResourceWeights() {
|
|
|
return resourceWeights;
|
|
|
}
|
|
|
|
|
@@ -132,7 +129,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|
|
return queue.getMetrics();
|
|
|
}
|
|
|
|
|
|
- public void containerCompleted(RMContainer rmContainer,
|
|
|
+ void containerCompleted(RMContainer rmContainer,
|
|
|
ContainerStatus containerStatus, RMContainerEventType event) {
|
|
|
try {
|
|
|
writeLock.lock();
|
|
@@ -491,7 +488,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|
|
* @param schedulerKey Scheduler Key
|
|
|
* @param level NodeType
|
|
|
*/
|
|
|
- public void resetAllowedLocalityLevel(
|
|
|
+ void resetAllowedLocalityLevel(
|
|
|
SchedulerRequestKey schedulerKey, NodeType level) {
|
|
|
NodeType old;
|
|
|
try {
|
|
@@ -513,45 +510,33 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|
|
}
|
|
|
|
|
|
// Preemption related methods
|
|
|
- public Resource getStarvation() {
|
|
|
+ Resource getStarvation() {
|
|
|
return Resources.add(fairshareStarvation, minshareStarvation);
|
|
|
}
|
|
|
|
|
|
- public void setMinshareStarvation(Resource starvation) {
|
|
|
+ void setMinshareStarvation(Resource starvation) {
|
|
|
this.minshareStarvation = starvation;
|
|
|
}
|
|
|
|
|
|
- public void resetMinshareStarvation() {
|
|
|
+ void resetMinshareStarvation() {
|
|
|
this.minshareStarvation = Resources.none();
|
|
|
}
|
|
|
|
|
|
- public void addPreemption(RMContainer container) {
|
|
|
+ void addPreemption(RMContainer container) {
|
|
|
containersToPreempt.add(container);
|
|
|
Resources.addTo(preemptedResources, container.getAllocatedResource());
|
|
|
}
|
|
|
|
|
|
- public Set<RMContainer> getPreemptionContainers() {
|
|
|
+ Set<RMContainer> getPreemptionContainers() {
|
|
|
return containersToPreempt;
|
|
|
}
|
|
|
|
|
|
|
|
|
- public Resource getPreemptedResources() {
|
|
|
+ private Resource getPreemptedResources() {
|
|
|
return preemptedResources;
|
|
|
}
|
|
|
|
|
|
- public void resetPreemptedResources() {
|
|
|
- preemptedResources = Resources.createResource(0);
|
|
|
- for (RMContainer container : getPreemptionContainers()) {
|
|
|
- Resources.addTo(preemptedResources, container.getAllocatedResource());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public void clearPreemptedResources() {
|
|
|
- preemptedResources.setMemorySize(0);
|
|
|
- preemptedResources.setVirtualCores(0);
|
|
|
- }
|
|
|
-
|
|
|
- public boolean canContainerBePreempted(RMContainer container) {
|
|
|
+ boolean canContainerBePreempted(RMContainer container) {
|
|
|
// Sanity check that the app owns this container
|
|
|
if (!getLiveContainersMap().containsKey(container.getContainerId()) &&
|
|
|
!newlyAllocatedContainers.contains(container)) {
|
|
@@ -585,7 +570,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|
|
* @param schedulerKey Scheduler Key
|
|
|
* @return Container
|
|
|
*/
|
|
|
- public Container createContainer(FSSchedulerNode node, Resource capability,
|
|
|
+ private Container createContainer(FSSchedulerNode node, Resource capability,
|
|
|
SchedulerRequestKey schedulerKey) {
|
|
|
|
|
|
NodeId nodeId = node.getRMNode().getNodeID();
|
|
@@ -593,12 +578,10 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|
|
getApplicationAttemptId(), getNewContainerId());
|
|
|
|
|
|
// Create the container
|
|
|
- Container container = BuilderUtils.newContainer(containerId, nodeId,
|
|
|
+ return BuilderUtils.newContainer(containerId, nodeId,
|
|
|
node.getRMNode().getHttpAddress(), capability,
|
|
|
schedulerKey.getPriority(), null,
|
|
|
schedulerKey.getAllocationRequestId());
|
|
|
-
|
|
|
- return container;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -853,7 +836,8 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|
|
}
|
|
|
|
|
|
Collection<SchedulerRequestKey> keysToTry = (reserved) ?
|
|
|
- Arrays.asList(node.getReservedContainer().getReservedSchedulerKey()) :
|
|
|
+ Collections.singletonList(
|
|
|
+ node.getReservedContainer().getReservedSchedulerKey()) :
|
|
|
getSchedulerKeys();
|
|
|
|
|
|
// For each priority, see if we can schedule a node local, rack local
|
|
@@ -1011,7 +995,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|
|
* Node that the application has an existing reservation on
|
|
|
* @return whether the reservation on the given node is valid.
|
|
|
*/
|
|
|
- public boolean assignReservedContainer(FSSchedulerNode node) {
|
|
|
+ boolean assignReservedContainer(FSSchedulerNode node) {
|
|
|
RMContainer rmContainer = node.getReservedContainer();
|
|
|
SchedulerRequestKey reservedSchedulerKey =
|
|
|
rmContainer.getReservedSchedulerKey();
|
|
@@ -1040,19 +1024,6 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
- static class RMContainerComparator implements Comparator<RMContainer>,
|
|
|
- Serializable {
|
|
|
- @Override
|
|
|
- public int compare(RMContainer c1, RMContainer c2) {
|
|
|
- int ret = c1.getContainer().getPriority().compareTo(
|
|
|
- c2.getContainer().getPriority());
|
|
|
- if (ret == 0) {
|
|
|
- return c2.getContainerId().compareTo(c1.getContainerId());
|
|
|
- }
|
|
|
- return ret;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Helper method that computes the extent of fairshare fairshareStarvation.
|
|
|
*/
|
|
@@ -1079,7 +1050,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|
|
return this.fairshareStarvation;
|
|
|
}
|
|
|
|
|
|
- public ResourceRequest getNextResourceRequest() {
|
|
|
+ ResourceRequest getNextResourceRequest() {
|
|
|
return appSchedulingInfo.getNextResourceRequest();
|
|
|
}
|
|
|
|
|
@@ -1197,25 +1168,4 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
|
|
|
updateAMContainerDiagnostics(AMState.INACTIVATED,
|
|
|
diagnosticMessageBldr.toString());
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * Preempt a running container according to the priority
|
|
|
- */
|
|
|
- @Override
|
|
|
- public RMContainer preemptContainer() {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("App " + getName() + " is going to preempt a running " +
|
|
|
- "container");
|
|
|
- }
|
|
|
-
|
|
|
- RMContainer toBePreempted = null;
|
|
|
- for (RMContainer container : getLiveContainers()) {
|
|
|
- if (!getPreemptionContainers().contains(container) &&
|
|
|
- (toBePreempted == null ||
|
|
|
- comparator.compare(toBePreempted, container) > 0)) {
|
|
|
- toBePreempted = container;
|
|
|
- }
|
|
|
- }
|
|
|
- return toBePreempted;
|
|
|
- }
|
|
|
}
|