|
@@ -21,11 +21,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.reservation;
|
|
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
|
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
import org.apache.hadoop.yarn.api.records.ReservationId;
|
|
import org.apache.hadoop.yarn.api.records.ReservationId;
|
|
-import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.MismatchedUserException;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.PlanningException;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.reservation.exceptions.ResourceOverCommitException;
|
|
-import org.apache.hadoop.yarn.util.resource.Resources;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
* This policy enforce a simple physical cluster capacity constraints, by
|
|
* This policy enforce a simple physical cluster capacity constraints, by
|
|
@@ -52,29 +50,21 @@ public class NoOverCommitPolicy implements SharingPolicy {
|
|
+ oldReservation.getUser() + " != " + reservation.getUser());
|
|
+ oldReservation.getUser() + " != " + reservation.getUser());
|
|
}
|
|
}
|
|
|
|
|
|
- long startTime = reservation.getStartTime();
|
|
|
|
- long endTime = reservation.getEndTime();
|
|
|
|
- long step = plan.getStep();
|
|
|
|
|
|
+ RLESparseResourceAllocation available = plan.getAvailableResourceOverTime(
|
|
|
|
+ reservation.getUser(), reservation.getReservationId(),
|
|
|
|
+ reservation.getStartTime(), reservation.getEndTime());
|
|
|
|
|
|
- // for every instant in time, check we are respecting cluster capacity
|
|
|
|
- for (long t = startTime; t < endTime; t += step) {
|
|
|
|
- Resource currExistingAllocTot = plan.getTotalCommittedResources(t);
|
|
|
|
- Resource currNewAlloc = reservation.getResourcesAtTime(t);
|
|
|
|
- Resource currOldAlloc = Resource.newInstance(0, 0);
|
|
|
|
- if (oldReservation != null) {
|
|
|
|
- oldReservation.getResourcesAtTime(t);
|
|
|
|
- }
|
|
|
|
- // check the cluster is never over committed
|
|
|
|
- // currExistingAllocTot + currNewAlloc - currOldAlloc >
|
|
|
|
- // capPlan.getTotalCapacity()
|
|
|
|
- if (Resources.greaterThan(plan.getResourceCalculator(), plan
|
|
|
|
- .getTotalCapacity(), Resources.subtract(
|
|
|
|
- Resources.add(currExistingAllocTot, currNewAlloc), currOldAlloc),
|
|
|
|
- plan.getTotalCapacity())) {
|
|
|
|
- throw new ResourceOverCommitException("Resources at time " + t
|
|
|
|
- + " would be overcommitted by " + "accepting reservation: "
|
|
|
|
- + reservation.getReservationId());
|
|
|
|
- }
|
|
|
|
|
|
+ // test the reservation does not exceed what is available
|
|
|
|
+ try {
|
|
|
|
+ RLESparseResourceAllocation
|
|
|
|
+ .merge(plan.getResourceCalculator(), plan.getTotalCapacity(),
|
|
|
|
+ available, reservation.getResourcesOverTime(),
|
|
|
|
+ RLESparseResourceAllocation.RLEOperator.subtractTestNonNegative,
|
|
|
|
+ reservation.getStartTime(), reservation.getEndTime());
|
|
|
|
+ } catch (PlanningException p) {
|
|
|
|
+ throw new ResourceOverCommitException(
|
|
|
|
+ "Resources at time " + " would be overcommitted by "
|
|
|
|
+ + "accepting reservation: " + reservation.getReservationId());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|