|
@@ -20,12 +20,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
+import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
import java.util.TreeSet;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
@@ -64,7 +66,7 @@ public class AppSchedulingInfo {
|
|
|
final Set<Priority> priorities = new TreeSet<Priority>(
|
|
|
new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
|
|
|
final Map<Priority, Map<String, ResourceRequest>> requests =
|
|
|
- new HashMap<Priority, Map<String, ResourceRequest>>();
|
|
|
+ new ConcurrentHashMap<Priority, Map<String, ResourceRequest>>();
|
|
|
private Set<String> blacklist = new HashSet<String>();
|
|
|
|
|
|
//private final ApplicationStore store;
|
|
@@ -159,7 +161,7 @@ public class AppSchedulingInfo {
|
|
|
Map<String, ResourceRequest> asks = this.requests.get(priority);
|
|
|
|
|
|
if (asks == null) {
|
|
|
- asks = new HashMap<String, ResourceRequest>();
|
|
|
+ asks = new ConcurrentHashMap<String, ResourceRequest>();
|
|
|
this.requests.put(priority, asks);
|
|
|
this.priorities.add(priority);
|
|
|
}
|
|
@@ -221,7 +223,7 @@ public class AppSchedulingInfo {
|
|
|
return requests.get(priority);
|
|
|
}
|
|
|
|
|
|
- synchronized public List<ResourceRequest> getAllResourceRequests() {
|
|
|
+ public List<ResourceRequest> getAllResourceRequests() {
|
|
|
List<ResourceRequest> ret = new ArrayList<ResourceRequest>();
|
|
|
for (Map<String, ResourceRequest> r : requests.values()) {
|
|
|
ret.addAll(r.values());
|
|
@@ -300,17 +302,11 @@ public class AppSchedulingInfo {
|
|
|
Priority priority, ResourceRequest nodeLocalRequest, Container container,
|
|
|
List<ResourceRequest> resourceRequests) {
|
|
|
// Update future requirements
|
|
|
- nodeLocalRequest.setNumContainers(nodeLocalRequest.getNumContainers() - 1);
|
|
|
- if (nodeLocalRequest.getNumContainers() == 0) {
|
|
|
- this.requests.get(priority).remove(node.getNodeName());
|
|
|
- }
|
|
|
+ decResourceRequest(node.getNodeName(), priority, nodeLocalRequest);
|
|
|
|
|
|
ResourceRequest rackLocalRequest = requests.get(priority).get(
|
|
|
node.getRackName());
|
|
|
- rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - 1);
|
|
|
- if (rackLocalRequest.getNumContainers() == 0) {
|
|
|
- this.requests.get(priority).remove(node.getRackName());
|
|
|
- }
|
|
|
+ decResourceRequest(node.getRackName(), priority, rackLocalRequest);
|
|
|
|
|
|
ResourceRequest offRackRequest = requests.get(priority).get(
|
|
|
ResourceRequest.ANY);
|
|
@@ -322,6 +318,14 @@ public class AppSchedulingInfo {
|
|
|
resourceRequests.add(cloneResourceRequest(offRackRequest));
|
|
|
}
|
|
|
|
|
|
+ private void decResourceRequest(String resourceName, Priority priority,
|
|
|
+ ResourceRequest request) {
|
|
|
+ request.setNumContainers(request.getNumContainers() - 1);
|
|
|
+ if (request.getNumContainers() == 0) {
|
|
|
+ requests.get(priority).remove(resourceName);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* The {@link ResourceScheduler} is allocating data-local resources to the
|
|
|
* application.
|
|
@@ -333,11 +337,8 @@ public class AppSchedulingInfo {
|
|
|
Priority priority, ResourceRequest rackLocalRequest, Container container,
|
|
|
List<ResourceRequest> resourceRequests) {
|
|
|
// Update future requirements
|
|
|
- rackLocalRequest.setNumContainers(rackLocalRequest.getNumContainers() - 1);
|
|
|
- if (rackLocalRequest.getNumContainers() == 0) {
|
|
|
- this.requests.get(priority).remove(node.getRackName());
|
|
|
- }
|
|
|
-
|
|
|
+ decResourceRequest(node.getRackName(), priority, rackLocalRequest);
|
|
|
+
|
|
|
ResourceRequest offRackRequest = requests.get(priority).get(
|
|
|
ResourceRequest.ANY);
|
|
|
decrementOutstanding(offRackRequest);
|