|
@@ -26,8 +26,8 @@ import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.TreeMap;
|
|
|
-import java.util.TreeSet;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.ConcurrentSkipListMap;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
@@ -59,7 +59,6 @@ import org.apache.hadoop.yarn.util.resource.Resources;
|
|
|
public class AppSchedulingInfo {
|
|
|
|
|
|
private static final Log LOG = LogFactory.getLog(AppSchedulingInfo.class);
|
|
|
- private static final int EPOCH_BIT_SHIFT = 40;
|
|
|
|
|
|
private final ApplicationId applicationId;
|
|
|
private final ApplicationAttemptId applicationAttemptId;
|
|
@@ -79,7 +78,8 @@ public class AppSchedulingInfo {
|
|
|
|
|
|
private Set<String> requestedPartitions = new HashSet<>();
|
|
|
|
|
|
- final Set<SchedulerRequestKey> schedulerKeys = new TreeSet<>();
|
|
|
+ private final ConcurrentSkipListMap<SchedulerRequestKey, Integer>
|
|
|
+ schedulerKeys = new ConcurrentSkipListMap<>();
|
|
|
final Map<SchedulerRequestKey, Map<String, ResourceRequest>>
|
|
|
resourceRequestMap = new ConcurrentHashMap<>();
|
|
|
final Map<NodeId, Map<SchedulerRequestKey, Map<ContainerId,
|
|
@@ -236,6 +236,7 @@ public class AppSchedulingInfo {
|
|
|
if (null == requestsOnNodeWithPriority) {
|
|
|
requestsOnNodeWithPriority = new TreeMap<>();
|
|
|
requestsOnNode.put(schedulerKey, requestsOnNodeWithPriority);
|
|
|
+ incrementSchedulerKeyReference(schedulerKey);
|
|
|
}
|
|
|
|
|
|
requestsOnNodeWithPriority.put(containerId, request);
|
|
@@ -250,11 +251,30 @@ public class AppSchedulingInfo {
|
|
|
LOG.debug("Added increase request:" + request.getContainerId()
|
|
|
+ " delta=" + delta);
|
|
|
}
|
|
|
-
|
|
|
- // update Scheduler Keys
|
|
|
- schedulerKeys.add(schedulerKey);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+ private void incrementSchedulerKeyReference(
|
|
|
+ SchedulerRequestKey schedulerKey) {
|
|
|
+ Integer schedulerKeyCount = schedulerKeys.get(schedulerKey);
|
|
|
+ if (schedulerKeyCount == null) {
|
|
|
+ schedulerKeys.put(schedulerKey, 1);
|
|
|
+ } else {
|
|
|
+ schedulerKeys.put(schedulerKey, schedulerKeyCount + 1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void decrementSchedulerKeyReference(
|
|
|
+ SchedulerRequestKey schedulerKey) {
|
|
|
+ Integer schedulerKeyCount = schedulerKeys.get(schedulerKey);
|
|
|
+ if (schedulerKeyCount != null) {
|
|
|
+ if (schedulerKeyCount > 1) {
|
|
|
+ schedulerKeys.put(schedulerKey, schedulerKeyCount - 1);
|
|
|
+ } else {
|
|
|
+ schedulerKeys.remove(schedulerKey);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public synchronized boolean removeIncreaseRequest(NodeId nodeId,
|
|
|
SchedulerRequestKey schedulerKey, ContainerId containerId) {
|
|
|
Map<SchedulerRequestKey, Map<ContainerId, SchedContainerChangeRequest>>
|
|
@@ -275,6 +295,7 @@ public class AppSchedulingInfo {
|
|
|
// remove hierarchies if it becomes empty
|
|
|
if (requestsOnNodeWithPriority.isEmpty()) {
|
|
|
requestsOnNode.remove(schedulerKey);
|
|
|
+ decrementSchedulerKeyReference(schedulerKey);
|
|
|
}
|
|
|
if (requestsOnNode.isEmpty()) {
|
|
|
containerIncreaseRequestMap.remove(nodeId);
|
|
@@ -341,7 +362,6 @@ public class AppSchedulingInfo {
|
|
|
if (asks == null) {
|
|
|
asks = new ConcurrentHashMap<>();
|
|
|
this.resourceRequestMap.put(schedulerKey, asks);
|
|
|
- this.schedulerKeys.add(schedulerKey);
|
|
|
}
|
|
|
|
|
|
// Increment number of containers if recovering preempted resources
|
|
@@ -360,29 +380,34 @@ public class AppSchedulingInfo {
|
|
|
|
|
|
anyResourcesUpdated = true;
|
|
|
|
|
|
- // Activate application. Metrics activation is done here.
|
|
|
- // TODO: Shouldn't we activate even if numContainers = 0?
|
|
|
- if (request.getNumContainers() > 0) {
|
|
|
- activeUsersManager.activateApplication(user, applicationId);
|
|
|
- }
|
|
|
-
|
|
|
// Update pendingResources
|
|
|
- updatePendingResources(lastRequest, request, queue.getMetrics());
|
|
|
+ updatePendingResources(lastRequest, request, schedulerKey,
|
|
|
+ queue.getMetrics());
|
|
|
}
|
|
|
}
|
|
|
return anyResourcesUpdated;
|
|
|
}
|
|
|
|
|
|
private void updatePendingResources(ResourceRequest lastRequest,
|
|
|
- ResourceRequest request, QueueMetrics metrics) {
|
|
|
+ ResourceRequest request, SchedulerRequestKey schedulerKey,
|
|
|
+ QueueMetrics metrics) {
|
|
|
+ int lastRequestContainers =
|
|
|
+ (lastRequest != null) ? lastRequest.getNumContainers() : 0;
|
|
|
if (request.getNumContainers() <= 0) {
|
|
|
+ if (lastRequestContainers >= 0) {
|
|
|
+ decrementSchedulerKeyReference(schedulerKey);
|
|
|
+ }
|
|
|
LOG.info("checking for deactivate of application :"
|
|
|
+ this.applicationId);
|
|
|
checkForDeactivation();
|
|
|
+ } else {
|
|
|
+ // Activate application. Metrics activation is done here.
|
|
|
+ if (lastRequestContainers <= 0) {
|
|
|
+ incrementSchedulerKeyReference(schedulerKey);
|
|
|
+ activeUsersManager.activateApplication(user, applicationId);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- int lastRequestContainers =
|
|
|
- (lastRequest != null) ? lastRequest.getNumContainers() : 0;
|
|
|
Resource lastRequestCapability =
|
|
|
lastRequest != null ? lastRequest.getCapability() : Resources.none();
|
|
|
metrics.incrPendingResources(user,
|
|
@@ -505,7 +530,7 @@ public class AppSchedulingInfo {
|
|
|
}
|
|
|
|
|
|
public synchronized Collection<SchedulerRequestKey> getSchedulerKeys() {
|
|
|
- return schedulerKeys;
|
|
|
+ return schedulerKeys.keySet();
|
|
|
}
|
|
|
|
|
|
public synchronized Map<String, ResourceRequest> getResourceRequests(
|
|
@@ -617,7 +642,7 @@ public class AppSchedulingInfo {
|
|
|
} else if (type == NodeType.RACK_LOCAL) {
|
|
|
allocateRackLocal(node, schedulerKey, request, resourceRequests);
|
|
|
} else {
|
|
|
- allocateOffSwitch(request, resourceRequests);
|
|
|
+ allocateOffSwitch(request, resourceRequests, schedulerKey);
|
|
|
}
|
|
|
QueueMetrics metrics = queue.getMetrics();
|
|
|
if (pending) {
|
|
@@ -656,7 +681,7 @@ public class AppSchedulingInfo {
|
|
|
|
|
|
ResourceRequest offRackRequest = resourceRequestMap.get(schedulerKey).get(
|
|
|
ResourceRequest.ANY);
|
|
|
- decrementOutstanding(offRackRequest);
|
|
|
+ decrementOutstanding(offRackRequest, schedulerKey);
|
|
|
|
|
|
// Update cloned NodeLocal, RackLocal and OffRack requests for recovery
|
|
|
resourceRequests.add(cloneResourceRequest(nodeLocalRequest));
|
|
@@ -684,7 +709,7 @@ public class AppSchedulingInfo {
|
|
|
|
|
|
ResourceRequest offRackRequest = resourceRequestMap.get(schedulerKey).get(
|
|
|
ResourceRequest.ANY);
|
|
|
- decrementOutstanding(offRackRequest);
|
|
|
+ decrementOutstanding(offRackRequest, schedulerKey);
|
|
|
|
|
|
// Update cloned RackLocal and OffRack requests for recovery
|
|
|
resourceRequests.add(cloneResourceRequest(rackLocalRequest));
|
|
@@ -696,15 +721,16 @@ public class AppSchedulingInfo {
|
|
|
* application.
|
|
|
*/
|
|
|
private synchronized void allocateOffSwitch(
|
|
|
- ResourceRequest offSwitchRequest, List<ResourceRequest> resourceRequests) {
|
|
|
+ ResourceRequest offSwitchRequest, List<ResourceRequest> resourceRequests,
|
|
|
+ SchedulerRequestKey schedulerKey) {
|
|
|
// Update future requirements
|
|
|
- decrementOutstanding(offSwitchRequest);
|
|
|
+ decrementOutstanding(offSwitchRequest, schedulerKey);
|
|
|
// Update cloned OffRack requests for recovery
|
|
|
resourceRequests.add(cloneResourceRequest(offSwitchRequest));
|
|
|
}
|
|
|
|
|
|
private synchronized void decrementOutstanding(
|
|
|
- ResourceRequest offSwitchRequest) {
|
|
|
+ ResourceRequest offSwitchRequest, SchedulerRequestKey schedulerKey) {
|
|
|
int numOffSwitchContainers = offSwitchRequest.getNumContainers() - 1;
|
|
|
|
|
|
// Do not remove ANY
|
|
@@ -713,6 +739,7 @@ public class AppSchedulingInfo {
|
|
|
// Do we have any outstanding requests?
|
|
|
// If there is nothing, we need to deactivate this application
|
|
|
if (numOffSwitchContainers == 0) {
|
|
|
+ decrementSchedulerKeyReference(schedulerKey);
|
|
|
checkForDeactivation();
|
|
|
}
|
|
|
|
|
@@ -723,24 +750,7 @@ public class AppSchedulingInfo {
|
|
|
}
|
|
|
|
|
|
private synchronized void checkForDeactivation() {
|
|
|
- boolean deactivate = true;
|
|
|
- for (SchedulerRequestKey schedulerKey : getSchedulerKeys()) {
|
|
|
- ResourceRequest request =
|
|
|
- getResourceRequest(schedulerKey, ResourceRequest.ANY);
|
|
|
- if (request != null) {
|
|
|
- if (request.getNumContainers() > 0) {
|
|
|
- deactivate = false;
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // also we need to check increase request
|
|
|
- if (!deactivate) {
|
|
|
- deactivate = containerIncreaseRequestMap.isEmpty();
|
|
|
- }
|
|
|
-
|
|
|
- if (deactivate) {
|
|
|
+ if (schedulerKeys.isEmpty()) {
|
|
|
activeUsersManager.deactivateApplication(user, applicationId);
|
|
|
}
|
|
|
}
|