|
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.resource.DefaultResourceCalculator;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
|
@@ -45,6 +46,9 @@ import org.apache.hadoop.yarn.util.BuilderUtils;
|
|
@Private
|
|
@Private
|
|
@Unstable
|
|
@Unstable
|
|
public class AppSchedulable extends Schedulable {
|
|
public class AppSchedulable extends Schedulable {
|
|
|
|
+ private static final DefaultResourceCalculator RESOURCE_CALCULATOR
|
|
|
|
+ = new DefaultResourceCalculator();
|
|
|
|
+
|
|
private FairScheduler scheduler;
|
|
private FairScheduler scheduler;
|
|
private FSSchedulerApp app;
|
|
private FSSchedulerApp app;
|
|
private Resource demand = Resources.createResource(0);
|
|
private Resource demand = Resources.createResource(0);
|
|
@@ -180,15 +184,15 @@ public class AppSchedulable extends Schedulable {
|
|
* update relevant bookeeping. This dispatches ro relevant handlers
|
|
* update relevant bookeeping. This dispatches ro relevant handlers
|
|
* in the {@link FSSchedulerNode} and {@link SchedulerApp} classes.
|
|
* in the {@link FSSchedulerNode} and {@link SchedulerApp} classes.
|
|
*/
|
|
*/
|
|
- private void reserve(FSSchedulerApp application, Priority priority,
|
|
|
|
- FSSchedulerNode node, Container container, boolean alreadyReserved) {
|
|
|
|
|
|
+ private void reserve(Priority priority, FSSchedulerNode node,
|
|
|
|
+ Container container, boolean alreadyReserved) {
|
|
LOG.info("Making reservation: node=" + node.getHostName() +
|
|
LOG.info("Making reservation: node=" + node.getHostName() +
|
|
" app_id=" + app.getApplicationId());
|
|
" app_id=" + app.getApplicationId());
|
|
if (!alreadyReserved) {
|
|
if (!alreadyReserved) {
|
|
- getMetrics().reserveResource(application.getUser(), container.getResource());
|
|
|
|
- RMContainer rmContainer = application.reserve(node, priority, null,
|
|
|
|
|
|
+ getMetrics().reserveResource(app.getUser(), container.getResource());
|
|
|
|
+ RMContainer rmContainer = app.reserve(node, priority, null,
|
|
container);
|
|
container);
|
|
- node.reserveResource(application, priority, rmContainer);
|
|
|
|
|
|
+ node.reserveResource(app, priority, rmContainer);
|
|
getMetrics().reserveResource(app.getUser(),
|
|
getMetrics().reserveResource(app.getUser(),
|
|
container.getResource());
|
|
container.getResource());
|
|
scheduler.getRootQueueMetrics().reserveResource(app.getUser(),
|
|
scheduler.getRootQueueMetrics().reserveResource(app.getUser(),
|
|
@@ -197,25 +201,24 @@ public class AppSchedulable extends Schedulable {
|
|
|
|
|
|
else {
|
|
else {
|
|
RMContainer rmContainer = node.getReservedContainer();
|
|
RMContainer rmContainer = node.getReservedContainer();
|
|
- application.reserve(node, priority, rmContainer, container);
|
|
|
|
- node.reserveResource(application, priority, rmContainer);
|
|
|
|
|
|
+ app.reserve(node, priority, rmContainer, container);
|
|
|
|
+ node.reserveResource(app, priority, rmContainer);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Remove the reservation on {@code node} for {@ application} at the given
|
|
|
|
|
|
+ * Remove the reservation on {@code node} at the given
|
|
* {@link Priority}. This dispatches to the SchedulerApp and SchedulerNode
|
|
* {@link Priority}. This dispatches to the SchedulerApp and SchedulerNode
|
|
* handlers for an unreservation.
|
|
* handlers for an unreservation.
|
|
*/
|
|
*/
|
|
- private void unreserve(FSSchedulerApp application, Priority priority,
|
|
|
|
- FSSchedulerNode node) {
|
|
|
|
|
|
+ public void unreserve(Priority priority, FSSchedulerNode node) {
|
|
RMContainer rmContainer = node.getReservedContainer();
|
|
RMContainer rmContainer = node.getReservedContainer();
|
|
- application.unreserve(node, priority);
|
|
|
|
- node.unreserveResource(application);
|
|
|
|
|
|
+ app.unreserve(node, priority);
|
|
|
|
+ node.unreserveResource(app);
|
|
getMetrics().unreserveResource(
|
|
getMetrics().unreserveResource(
|
|
- application.getUser(), rmContainer.getContainer().getResource());
|
|
|
|
|
|
+ app.getUser(), rmContainer.getContainer().getResource());
|
|
scheduler.getRootQueueMetrics().unreserveResource(
|
|
scheduler.getRootQueueMetrics().unreserveResource(
|
|
- application.getUser(), rmContainer.getContainer().getResource());
|
|
|
|
|
|
+ app.getUser(), rmContainer.getContainer().getResource());
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -224,8 +227,8 @@ public class AppSchedulable extends Schedulable {
|
|
* sure the particular request should be facilitated by this node.
|
|
* sure the particular request should be facilitated by this node.
|
|
*/
|
|
*/
|
|
private Resource assignContainer(FSSchedulerNode node,
|
|
private Resource assignContainer(FSSchedulerNode node,
|
|
- FSSchedulerApp application, Priority priority,
|
|
|
|
- ResourceRequest request, NodeType type, boolean reserved) {
|
|
|
|
|
|
+ Priority priority, ResourceRequest request, NodeType type,
|
|
|
|
+ boolean reserved) {
|
|
|
|
|
|
// How much does this request need?
|
|
// How much does this request need?
|
|
Resource capability = request.getCapability();
|
|
Resource capability = request.getCapability();
|
|
@@ -237,7 +240,7 @@ public class AppSchedulable extends Schedulable {
|
|
if (reserved) {
|
|
if (reserved) {
|
|
container = node.getReservedContainer().getContainer();
|
|
container = node.getReservedContainer().getContainer();
|
|
} else {
|
|
} else {
|
|
- container = createContainer(application, node, capability, priority);
|
|
|
|
|
|
+ container = createContainer(app, node, capability, priority);
|
|
}
|
|
}
|
|
|
|
|
|
// Can we allocate a container on this node?
|
|
// Can we allocate a container on this node?
|
|
@@ -247,9 +250,12 @@ public class AppSchedulable extends Schedulable {
|
|
if (availableContainers > 0) {
|
|
if (availableContainers > 0) {
|
|
// Inform the application of the new container for this request
|
|
// Inform the application of the new container for this request
|
|
RMContainer allocatedContainer =
|
|
RMContainer allocatedContainer =
|
|
- application.allocate(type, node, priority, request, container);
|
|
|
|
|
|
+ app.allocate(type, node, priority, request, container);
|
|
if (allocatedContainer == null) {
|
|
if (allocatedContainer == null) {
|
|
// Did the application need this resource?
|
|
// Did the application need this resource?
|
|
|
|
+ if (reserved) {
|
|
|
|
+ unreserve(priority, node);
|
|
|
|
+ }
|
|
return Resources.none();
|
|
return Resources.none();
|
|
}
|
|
}
|
|
else {
|
|
else {
|
|
@@ -262,17 +268,17 @@ public class AppSchedulable extends Schedulable {
|
|
|
|
|
|
// If we had previously made a reservation, delete it
|
|
// If we had previously made a reservation, delete it
|
|
if (reserved) {
|
|
if (reserved) {
|
|
- unreserve(application, priority, node);
|
|
|
|
|
|
+ unreserve(priority, node);
|
|
}
|
|
}
|
|
|
|
|
|
// Inform the node
|
|
// Inform the node
|
|
- node.allocateContainer(application.getApplicationId(),
|
|
|
|
|
|
+ node.allocateContainer(app.getApplicationId(),
|
|
allocatedContainer);
|
|
allocatedContainer);
|
|
|
|
|
|
return container.getResource();
|
|
return container.getResource();
|
|
} else {
|
|
} else {
|
|
// The desired container won't fit here, so reserve
|
|
// The desired container won't fit here, so reserve
|
|
- reserve(application, priority, node, container, reserved);
|
|
|
|
|
|
+ reserve(priority, node, container, reserved);
|
|
|
|
|
|
return FairScheduler.CONTAINER_RESERVED;
|
|
return FairScheduler.CONTAINER_RESERVED;
|
|
}
|
|
}
|
|
@@ -287,7 +293,7 @@ public class AppSchedulable extends Schedulable {
|
|
|
|
|
|
// Make sure the application still needs requests at this priority
|
|
// Make sure the application still needs requests at this priority
|
|
if (app.getTotalRequiredResources(priority) == 0) {
|
|
if (app.getTotalRequiredResources(priority) == 0) {
|
|
- unreserve(app, priority, node);
|
|
|
|
|
|
+ unreserve(priority, node);
|
|
return Resources.none();
|
|
return Resources.none();
|
|
}
|
|
}
|
|
} else {
|
|
} else {
|
|
@@ -304,7 +310,8 @@ public class AppSchedulable extends Schedulable {
|
|
// (not scheduled) in order to promote better locality.
|
|
// (not scheduled) in order to promote better locality.
|
|
synchronized (app) {
|
|
synchronized (app) {
|
|
for (Priority priority : prioritiesToTry) {
|
|
for (Priority priority : prioritiesToTry) {
|
|
- if (app.getTotalRequiredResources(priority) <= 0) {
|
|
|
|
|
|
+ if (app.getTotalRequiredResources(priority) <= 0 ||
|
|
|
|
+ !hasContainerForNode(priority, node)) {
|
|
continue;
|
|
continue;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -321,14 +328,14 @@ public class AppSchedulable extends Schedulable {
|
|
|
|
|
|
if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
|
|
if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
|
|
&& localRequest != null && localRequest.getNumContainers() != 0) {
|
|
&& localRequest != null && localRequest.getNumContainers() != 0) {
|
|
- return assignContainer(node, app, priority,
|
|
|
|
|
|
+ return assignContainer(node, priority,
|
|
localRequest, NodeType.NODE_LOCAL, reserved);
|
|
localRequest, NodeType.NODE_LOCAL, reserved);
|
|
}
|
|
}
|
|
|
|
|
|
if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
|
|
if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
|
|
&& (allowedLocality.equals(NodeType.RACK_LOCAL) ||
|
|
&& (allowedLocality.equals(NodeType.RACK_LOCAL) ||
|
|
allowedLocality.equals(NodeType.OFF_SWITCH))) {
|
|
allowedLocality.equals(NodeType.OFF_SWITCH))) {
|
|
- return assignContainer(node, app, priority, rackLocalRequest,
|
|
|
|
|
|
+ return assignContainer(node, priority, rackLocalRequest,
|
|
NodeType.RACK_LOCAL, reserved);
|
|
NodeType.RACK_LOCAL, reserved);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -336,7 +343,7 @@ public class AppSchedulable extends Schedulable {
|
|
ResourceRequest.ANY);
|
|
ResourceRequest.ANY);
|
|
if (offSwitchRequest != null && offSwitchRequest.getNumContainers() != 0
|
|
if (offSwitchRequest != null && offSwitchRequest.getNumContainers() != 0
|
|
&& allowedLocality.equals(NodeType.OFF_SWITCH)) {
|
|
&& allowedLocality.equals(NodeType.OFF_SWITCH)) {
|
|
- return assignContainer(node, app, priority, offSwitchRequest,
|
|
|
|
|
|
+ return assignContainer(node, priority, offSwitchRequest,
|
|
NodeType.OFF_SWITCH, reserved);
|
|
NodeType.OFF_SWITCH, reserved);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -352,4 +359,16 @@ public class AppSchedulable extends Schedulable {
|
|
public Resource assignContainer(FSSchedulerNode node) {
|
|
public Resource assignContainer(FSSchedulerNode node) {
|
|
return assignContainer(node, false);
|
|
return assignContainer(node, false);
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Whether this app has containers requests that could be satisfied on the
|
|
|
|
+ * given node, if the node had full space.
|
|
|
|
+ */
|
|
|
|
+ public boolean hasContainerForNode(Priority prio, FSSchedulerNode node) {
|
|
|
|
+ // TODO: add checks stuff about node specific scheduling here
|
|
|
|
+ ResourceRequest request = app.getResourceRequest(prio, ResourceRequest.ANY);
|
|
|
|
+ return request.getNumContainers() > 0 &&
|
|
|
|
+ Resources.lessThanOrEqual(RESOURCE_CALCULATOR, null,
|
|
|
|
+ request.getCapability(), node.getRMNode().getTotalCapability());
|
|
|
|
+ }
|
|
}
|
|
}
|