|
@@ -27,8 +27,8 @@ import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.TreeMap;
|
|
|
-import java.util.TreeSet;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.ConcurrentSkipListMap;
|
|
|
import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
@@ -82,7 +82,8 @@ public class AppSchedulingInfo {
|
|
|
|
|
|
private Set<String> requestedPartitions = new HashSet<>();
|
|
|
|
|
|
- final Set<Priority> priorities = new TreeSet<>(COMPARATOR);
|
|
|
+ private final ConcurrentSkipListMap<Priority, Integer> priorities =
|
|
|
+ new ConcurrentSkipListMap<>(COMPARATOR);
|
|
|
final Map<Priority, Map<String, ResourceRequest>> resourceRequestMap =
|
|
|
new ConcurrentHashMap<>();
|
|
|
final Map<NodeId, Map<Priority, Map<ContainerId,
|
|
@@ -233,6 +234,7 @@ public class AppSchedulingInfo {
|
|
|
if (null == requestsOnNodeWithPriority) {
|
|
|
requestsOnNodeWithPriority = new TreeMap<>();
|
|
|
requestsOnNode.put(priority, requestsOnNodeWithPriority);
|
|
|
+ incrementPriorityReference(priority);
|
|
|
}
|
|
|
|
|
|
requestsOnNodeWithPriority.put(containerId, request);
|
|
@@ -247,11 +249,28 @@ public class AppSchedulingInfo {
|
|
|
LOG.debug("Added increase request:" + request.getContainerId()
|
|
|
+ " delta=" + delta);
|
|
|
}
|
|
|
-
|
|
|
- // update priorities
|
|
|
- priorities.add(priority);
|
|
|
}
|
|
|
|
|
|
+ private void incrementPriorityReference(Priority priority) {
|
|
|
+ Integer priorityCount = priorities.get(priority);
|
|
|
+ if (priorityCount == null) {
|
|
|
+ priorities.put(priority, 1);
|
|
|
+ } else {
|
|
|
+ priorities.put(priority, priorityCount + 1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void decrementPriorityReference(Priority priority) {
|
|
|
+ Integer priorityCount = priorities.get(priority);
|
|
|
+ if (priorityCount != null) {
|
|
|
+ if (priorityCount > 1) {
|
|
|
+ priorities.put(priority, priorityCount - 1);
|
|
|
+ } else {
|
|
|
+ priorities.remove(priority);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
public synchronized boolean removeIncreaseRequest(NodeId nodeId, Priority priority,
|
|
|
ContainerId containerId) {
|
|
|
Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
|
|
@@ -272,6 +291,7 @@ public class AppSchedulingInfo {
|
|
|
// remove hierarchies if it becomes empty
|
|
|
if (requestsOnNodeWithPriority.isEmpty()) {
|
|
|
requestsOnNode.remove(priority);
|
|
|
+ decrementPriorityReference(priority);
|
|
|
}
|
|
|
if (requestsOnNode.isEmpty()) {
|
|
|
containerIncreaseRequestMap.remove(nodeId);
|
|
@@ -337,7 +357,6 @@ public class AppSchedulingInfo {
|
|
|
if (asks == null) {
|
|
|
asks = new ConcurrentHashMap<>();
|
|
|
this.resourceRequestMap.put(priority, asks);
|
|
|
- this.priorities.add(priority);
|
|
|
}
|
|
|
|
|
|
// Increment number of containers if recovering preempted resources
|
|
@@ -356,12 +375,6 @@ public class AppSchedulingInfo {
|
|
|
|
|
|
anyResourcesUpdated = true;
|
|
|
|
|
|
- // Activate application. Metrics activation is done here.
|
|
|
- // TODO: Shouldn't we activate even if numContainers = 0?
|
|
|
- if (request.getNumContainers() > 0) {
|
|
|
- activeUsersManager.activateApplication(user, applicationId);
|
|
|
- }
|
|
|
-
|
|
|
// Update pendingResources
|
|
|
updatePendingResources(lastRequest, request, queue.getMetrics());
|
|
|
}
|
|
@@ -371,14 +384,23 @@ public class AppSchedulingInfo {
|
|
|
|
|
|
private void updatePendingResources(ResourceRequest lastRequest,
|
|
|
ResourceRequest request, QueueMetrics metrics) {
|
|
|
+ int lastRequestContainers =
|
|
|
+ (lastRequest != null) ? lastRequest.getNumContainers() : 0;
|
|
|
if (request.getNumContainers() <= 0) {
|
|
|
+ if (lastRequestContainers >= 0) {
|
|
|
+ decrementPriorityReference(request.getPriority());
|
|
|
+ }
|
|
|
LOG.info("checking for deactivate of application :"
|
|
|
+ this.applicationId);
|
|
|
checkForDeactivation();
|
|
|
+ } else {
|
|
|
+ // Activate application. Metrics activation is done here.
|
|
|
+ if (lastRequestContainers <= 0) {
|
|
|
+ incrementPriorityReference(request.getPriority());
|
|
|
+ activeUsersManager.activateApplication(user, applicationId);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
- int lastRequestContainers =
|
|
|
- (lastRequest != null) ? lastRequest.getNumContainers() : 0;
|
|
|
Resource lastRequestCapability =
|
|
|
lastRequest != null ? lastRequest.getCapability() : Resources.none();
|
|
|
metrics.incrPendingResources(user,
|
|
@@ -501,7 +523,7 @@ public class AppSchedulingInfo {
|
|
|
}
|
|
|
|
|
|
public synchronized Collection<Priority> getPriorities() {
|
|
|
- return priorities;
|
|
|
+ return priorities.keySet();
|
|
|
}
|
|
|
|
|
|
public synchronized Map<String, ResourceRequest> getResourceRequests(
|
|
@@ -700,6 +722,7 @@ public class AppSchedulingInfo {
|
|
|
// Do we have any outstanding requests?
|
|
|
// If there is nothing, we need to deactivate this application
|
|
|
if (numOffSwitchContainers == 0) {
|
|
|
+ decrementPriorityReference(offSwitchRequest.getPriority());
|
|
|
checkForDeactivation();
|
|
|
}
|
|
|
|
|
@@ -710,23 +733,7 @@ public class AppSchedulingInfo {
|
|
|
}
|
|
|
|
|
|
private synchronized void checkForDeactivation() {
|
|
|
- boolean deactivate = true;
|
|
|
- for (Priority priority : getPriorities()) {
|
|
|
- ResourceRequest request = getResourceRequest(priority, ResourceRequest.ANY);
|
|
|
- if (request != null) {
|
|
|
- if (request.getNumContainers() > 0) {
|
|
|
- deactivate = false;
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // also we need to check increase request
|
|
|
- if (!deactivate) {
|
|
|
- deactivate = containerIncreaseRequestMap.isEmpty();
|
|
|
- }
|
|
|
-
|
|
|
- if (deactivate) {
|
|
|
+ if (priorities.isEmpty()) {
|
|
|
activeUsersManager.deactivateApplication(user, applicationId);
|
|
|
}
|
|
|
}
|