|
@@ -32,6 +32,7 @@ import java.util.Set;
|
|
|
import java.util.TreeSet;
|
|
|
|
|
|
import org.apache.commons.lang.StringUtils;
|
|
|
+import org.apache.commons.lang.mutable.MutableObject;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
@@ -1242,15 +1243,25 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
RMContainer reservedContainer, boolean needToUnreserve) {
|
|
|
Resource assigned = Resources.none();
|
|
|
|
|
|
+ NodeType requestType = null;
|
|
|
+ MutableObject allocatedContainer = new MutableObject();
|
|
|
// Data-local
|
|
|
ResourceRequest nodeLocalResourceRequest =
|
|
|
application.getResourceRequest(priority, node.getNodeName());
|
|
|
if (nodeLocalResourceRequest != null) {
|
|
|
- assigned =
|
|
|
- assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
|
|
|
- node, application, priority, reservedContainer, needToUnreserve);
|
|
|
- if (Resources.greaterThan(resourceCalculator, clusterResource,
|
|
|
+ requestType = NodeType.NODE_LOCAL;
|
|
|
+ assigned =
|
|
|
+ assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
|
|
|
+ node, application, priority, reservedContainer, needToUnreserve,
|
|
|
+ allocatedContainer);
|
|
|
+ if (Resources.greaterThan(resourceCalculator, clusterResource,
|
|
|
assigned, Resources.none())) {
|
|
|
+
|
|
|
+ //update locality statistics
|
|
|
+ if (allocatedContainer.getValue() != null) {
|
|
|
+ application.incNumAllocatedContainers(NodeType.NODE_LOCAL,
|
|
|
+ requestType);
|
|
|
+ }
|
|
|
return new CSAssignment(assigned, NodeType.NODE_LOCAL);
|
|
|
}
|
|
|
}
|
|
@@ -1262,12 +1273,23 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
if (!rackLocalResourceRequest.getRelaxLocality()) {
|
|
|
return SKIP_ASSIGNMENT;
|
|
|
}
|
|
|
-
|
|
|
- assigned =
|
|
|
- assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
|
|
|
- node, application, priority, reservedContainer, needToUnreserve);
|
|
|
- if (Resources.greaterThan(resourceCalculator, clusterResource,
|
|
|
+
|
|
|
+ if (requestType != NodeType.NODE_LOCAL) {
|
|
|
+ requestType = NodeType.RACK_LOCAL;
|
|
|
+ }
|
|
|
+
|
|
|
+ assigned =
|
|
|
+ assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
|
|
|
+ node, application, priority, reservedContainer, needToUnreserve,
|
|
|
+ allocatedContainer);
|
|
|
+ if (Resources.greaterThan(resourceCalculator, clusterResource,
|
|
|
assigned, Resources.none())) {
|
|
|
+
|
|
|
+ //update locality statistics
|
|
|
+ if (allocatedContainer.getValue() != null) {
|
|
|
+ application.incNumAllocatedContainers(NodeType.RACK_LOCAL,
|
|
|
+ requestType);
|
|
|
+ }
|
|
|
return new CSAssignment(assigned, NodeType.RACK_LOCAL);
|
|
|
}
|
|
|
}
|
|
@@ -1279,11 +1301,21 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
if (!offSwitchResourceRequest.getRelaxLocality()) {
|
|
|
return SKIP_ASSIGNMENT;
|
|
|
}
|
|
|
+ if (requestType != NodeType.NODE_LOCAL
|
|
|
+ && requestType != NodeType.RACK_LOCAL) {
|
|
|
+ requestType = NodeType.OFF_SWITCH;
|
|
|
+ }
|
|
|
+
|
|
|
+ assigned =
|
|
|
+ assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
|
|
|
+ node, application, priority, reservedContainer, needToUnreserve,
|
|
|
+ allocatedContainer);
|
|
|
|
|
|
- return new CSAssignment(assignOffSwitchContainers(clusterResource,
|
|
|
- offSwitchResourceRequest, node, application, priority,
|
|
|
- reservedContainer, needToUnreserve),
|
|
|
- NodeType.OFF_SWITCH);
|
|
|
+ // update locality statistics
|
|
|
+ if (allocatedContainer.getValue() != null) {
|
|
|
+ application.incNumAllocatedContainers(NodeType.OFF_SWITCH, requestType);
|
|
|
+ }
|
|
|
+ return new CSAssignment(assigned, NodeType.OFF_SWITCH);
|
|
|
}
|
|
|
|
|
|
return SKIP_ASSIGNMENT;
|
|
@@ -1370,40 +1402,43 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
private Resource assignNodeLocalContainers(Resource clusterResource,
|
|
|
ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
|
|
|
FiCaSchedulerApp application, Priority priority,
|
|
|
- RMContainer reservedContainer, boolean needToUnreserve) {
|
|
|
- if (canAssign(application, priority, node, NodeType.NODE_LOCAL,
|
|
|
+ RMContainer reservedContainer, boolean needToUnreserve,
|
|
|
+ MutableObject allocatedContainer) {
|
|
|
+ if (canAssign(application, priority, node, NodeType.NODE_LOCAL,
|
|
|
reservedContainer)) {
|
|
|
return assignContainer(clusterResource, node, application, priority,
|
|
|
nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
|
|
|
- needToUnreserve);
|
|
|
+ needToUnreserve, allocatedContainer);
|
|
|
}
|
|
|
|
|
|
return Resources.none();
|
|
|
}
|
|
|
|
|
|
- private Resource assignRackLocalContainers(Resource clusterResource,
|
|
|
- ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
|
|
|
- FiCaSchedulerApp application, Priority priority,
|
|
|
- RMContainer reservedContainer, boolean needToUnreserve) {
|
|
|
- if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
|
|
|
+ private Resource assignRackLocalContainers(
|
|
|
+ Resource clusterResource, ResourceRequest rackLocalResourceRequest,
|
|
|
+ FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
|
|
|
+ RMContainer reservedContainer, boolean needToUnreserve,
|
|
|
+ MutableObject allocatedContainer) {
|
|
|
+ if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
|
|
|
reservedContainer)) {
|
|
|
return assignContainer(clusterResource, node, application, priority,
|
|
|
rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
|
|
|
- needToUnreserve);
|
|
|
+ needToUnreserve, allocatedContainer);
|
|
|
}
|
|
|
|
|
|
return Resources.none();
|
|
|
}
|
|
|
|
|
|
- private Resource assignOffSwitchContainers(Resource clusterResource,
|
|
|
- ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
|
|
|
- FiCaSchedulerApp application, Priority priority,
|
|
|
- RMContainer reservedContainer, boolean needToUnreserve) {
|
|
|
- if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
|
|
|
+ private Resource assignOffSwitchContainers(
|
|
|
+ Resource clusterResource, ResourceRequest offSwitchResourceRequest,
|
|
|
+ FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
|
|
|
+ RMContainer reservedContainer, boolean needToUnreserve,
|
|
|
+ MutableObject allocatedContainer) {
|
|
|
+ if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
|
|
|
reservedContainer)) {
|
|
|
return assignContainer(clusterResource, node, application, priority,
|
|
|
offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
|
|
|
- needToUnreserve);
|
|
|
+ needToUnreserve, allocatedContainer);
|
|
|
}
|
|
|
|
|
|
return Resources.none();
|
|
@@ -1487,7 +1522,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node,
|
|
|
FiCaSchedulerApp application, Priority priority,
|
|
|
ResourceRequest request, NodeType type, RMContainer rmContainer,
|
|
|
- boolean needToUnreserve) {
|
|
|
+ boolean needToUnreserve, MutableObject createdContainer) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("assignContainers: node=" + node.getNodeName()
|
|
|
+ " application=" + application.getApplicationId()
|
|
@@ -1592,7 +1627,7 @@ public class LeafQueue extends AbstractCSQueue {
|
|
|
" container=" + container +
|
|
|
" queue=" + this +
|
|
|
" clusterResource=" + clusterResource);
|
|
|
-
|
|
|
+ createdContainer.setValue(allocatedContainer);
|
|
|
return container.getResource();
|
|
|
} else {
|
|
|
// if we are allowed to allocate but this node doesn't have space, reserve it or
|