|
@@ -72,7 +72,10 @@ public abstract class RMContainerRequestor extends RMCommunicator {
|
|
remoteRequestsTable =
|
|
remoteRequestsTable =
|
|
new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
|
|
new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
|
|
|
|
|
|
- private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>();
|
|
|
|
|
|
+ // use custom comparator to make sure ResourceRequest objects differing only in
|
|
|
|
+ // numContainers dont end up as duplicates
|
|
|
|
+ private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
|
|
|
|
+ new org.apache.hadoop.yarn.util.BuilderUtils.ResourceRequestComparator());
|
|
private final Set<ContainerId> release = new TreeSet<ContainerId>();
|
|
private final Set<ContainerId> release = new TreeSet<ContainerId>();
|
|
|
|
|
|
private boolean nodeBlacklistingEnabled;
|
|
private boolean nodeBlacklistingEnabled;
|
|
@@ -235,7 +238,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
|
|
ResourceRequest zeroedRequest = BuilderUtils.newResourceRequest(req);
|
|
ResourceRequest zeroedRequest = BuilderUtils.newResourceRequest(req);
|
|
zeroedRequest.setNumContainers(0);
|
|
zeroedRequest.setNumContainers(0);
|
|
// to be sent to RM on next heartbeat
|
|
// to be sent to RM on next heartbeat
|
|
- ask.add(zeroedRequest);
|
|
|
|
|
|
+ addResourceRequestToAsk(zeroedRequest);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
// if all requests were still in ask queue
|
|
// if all requests were still in ask queue
|
|
@@ -320,7 +323,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
|
|
remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);
|
|
remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);
|
|
|
|
|
|
// Note this down for next interaction with ResourceManager
|
|
// Note this down for next interaction with ResourceManager
|
|
- ask.add(remoteRequest);
|
|
|
|
|
|
+ addResourceRequestToAsk(remoteRequest);
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debug("addResourceRequest:" + " applicationId="
|
|
LOG.debug("addResourceRequest:" + " applicationId="
|
|
+ applicationId.getId() + " priority=" + priority.getPriority()
|
|
+ applicationId.getId() + " priority=" + priority.getPriority()
|
|
@@ -353,7 +356,12 @@ public abstract class RMContainerRequestor extends RMCommunicator {
|
|
+ remoteRequest.getNumContainers() + " #asks=" + ask.size());
|
|
+ remoteRequest.getNumContainers() + " #asks=" + ask.size());
|
|
}
|
|
}
|
|
|
|
|
|
- remoteRequest.setNumContainers(remoteRequest.getNumContainers() -1);
|
|
|
|
|
|
+ if(remoteRequest.getNumContainers() > 0) {
|
|
|
|
+ // based on blacklisting comments above we can end up decrementing more
|
|
|
|
+ // than requested. so guard for that.
|
|
|
|
+ remoteRequest.setNumContainers(remoteRequest.getNumContainers() -1);
|
|
|
|
+ }
|
|
|
|
+
|
|
if (remoteRequest.getNumContainers() == 0) {
|
|
if (remoteRequest.getNumContainers() == 0) {
|
|
reqMap.remove(capability);
|
|
reqMap.remove(capability);
|
|
if (reqMap.size() == 0) {
|
|
if (reqMap.size() == 0) {
|
|
@@ -362,13 +370,12 @@ public abstract class RMContainerRequestor extends RMCommunicator {
|
|
if (remoteRequests.size() == 0) {
|
|
if (remoteRequests.size() == 0) {
|
|
remoteRequestsTable.remove(priority);
|
|
remoteRequestsTable.remove(priority);
|
|
}
|
|
}
|
|
- //remove from ask if it may have
|
|
|
|
- ask.remove(remoteRequest);
|
|
|
|
- } else {
|
|
|
|
- ask.add(remoteRequest);//this will override the request if ask doesn't
|
|
|
|
- //already have it.
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // send the updated resource request to RM
|
|
|
|
+ // send 0 container count requests also to cancel previous requests
|
|
|
|
+ addResourceRequestToAsk(remoteRequest);
|
|
|
|
+
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.info("AFTER decResourceRequest:" + " applicationId="
|
|
LOG.info("AFTER decResourceRequest:" + " applicationId="
|
|
+ applicationId.getId() + " priority=" + priority.getPriority()
|
|
+ applicationId.getId() + " priority=" + priority.getPriority()
|
|
@@ -376,6 +383,16 @@ public abstract class RMContainerRequestor extends RMCommunicator {
|
|
+ remoteRequest.getNumContainers() + " #asks=" + ask.size());
|
|
+ remoteRequest.getNumContainers() + " #asks=" + ask.size());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
|
|
|
|
+ // because objects inside the resource map can be deleted ask can end up
|
|
|
|
+ // containing an object that matches new resource object but with different
|
|
|
|
+ // numContainers. So exisintg values must be replaced explicitly
|
|
|
|
+ if(ask.contains(remoteRequest)) {
|
|
|
|
+ ask.remove(remoteRequest);
|
|
|
|
+ }
|
|
|
|
+ ask.add(remoteRequest);
|
|
|
|
+ }
|
|
|
|
|
|
protected void release(ContainerId containerId) {
|
|
protected void release(ContainerId containerId) {
|
|
release.add(containerId);
|
|
release.add(containerId);
|