|
@@ -201,6 +201,22 @@ public class AppSchedulable extends Schedulable {
|
|
|
* Assign a container to this node to facilitate {@code request}. If node does
|
|
|
* not have enough memory, create a reservation. This is called once we are
|
|
|
* sure the particular request should be facilitated by this node.
|
|
|
+ *
|
|
|
+ * @param node
|
|
|
+ * The node to try placing the container on.
|
|
|
+ * @param priority
|
|
|
+ * The requested priority for the container.
|
|
|
+ * @param request
|
|
|
+ * The ResourceRequest we're trying to satisfy.
|
|
|
+ * @param type
|
|
|
+ * The locality of the assignment.
|
|
|
+ * @param reserved
|
|
|
+ * Whether there's already a container reserved for this app on the node.
|
|
|
+ * @return
|
|
|
+ * If an assignment was made, returns the resources allocated to the
|
|
|
+ * container. If a reservation was made, returns
|
|
|
+ * FairScheduler.CONTAINER_RESERVED. If no assignment or reservation was
|
|
|
+ * made, returns an empty resource.
|
|
|
*/
|
|
|
private Resource assignContainer(FSSchedulerNode node,
|
|
|
ResourceRequest request, NodeType type,
|
|
@@ -255,17 +271,6 @@ public class AppSchedulable extends Schedulable {
|
|
|
LOG.debug("Node offered to app: " + getName() + " reserved: " + reserved);
|
|
|
}
|
|
|
|
|
|
- if (reserved) {
|
|
|
- RMContainer rmContainer = node.getReservedContainer();
|
|
|
- Priority priority = rmContainer.getReservedPriority();
|
|
|
-
|
|
|
- // Make sure the application still needs requests at this priority
|
|
|
- if (app.getTotalRequiredResources(priority) == 0) {
|
|
|
- unreserve(priority, node);
|
|
|
- return Resources.none();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
Collection<Priority> prioritiesToTry = (reserved) ?
|
|
|
Arrays.asList(node.getReservedContainer().getReservedPriority()) :
|
|
|
app.getPriorities();
|
|
@@ -338,7 +343,33 @@ public class AppSchedulable extends Schedulable {
|
|
|
return Resources.none();
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Called when this application already has an existing reservation on the
|
|
|
+ * given node. Sees whether we can turn the reservation into an allocation.
|
|
|
+ * Also checks whether the application needs the reservation anymore, and
|
|
|
+ * releases it if not.
|
|
|
+ *
|
|
|
+ * @param node
|
|
|
+ * Node that the application has an existing reservation on
|
|
|
+ */
|
|
|
public Resource assignReservedContainer(FSSchedulerNode node) {
|
|
|
+ RMContainer rmContainer = node.getReservedContainer();
|
|
|
+ Priority priority = rmContainer.getReservedPriority();
|
|
|
+
|
|
|
+ // Make sure the application still needs requests at this priority
|
|
|
+ if (app.getTotalRequiredResources(priority) == 0) {
|
|
|
+ unreserve(priority, node);
|
|
|
+ return Resources.none();
|
|
|
+ }
|
|
|
+
|
|
|
+ // Fail early if the reserved container won't fit.
|
|
|
+ // Note that we have an assumption here that there's only one container size
|
|
|
+ // per priority.
|
|
|
+ if (!Resources.fitsIn(node.getReservedContainer().getReservedResource(),
|
|
|
+ node.getAvailableResource())) {
|
|
|
+ return Resources.none();
|
|
|
+ }
|
|
|
+
|
|
|
return assignContainer(node, true);
|
|
|
}
|
|
|
|