|
@@ -62,7 +62,6 @@ import org.apache.hadoop.yarn.api.records.Container;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|
|
import org.apache.hadoop.yarn.api.records.Priority;
|
|
|
-import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
|
import org.apache.hadoop.yarn.util.RackResolver;
|
|
|
|
|
@@ -703,7 +702,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
addContainerReq(req);
|
|
|
}
|
|
|
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
+ // this method will change the list of allocatedContainers.
|
|
|
private void assign(List<Container> allocatedContainers) {
|
|
|
Iterator<Container> it = allocatedContainers.iterator();
|
|
|
LOG.info("Got allocated containers " + allocatedContainers.size());
|
|
@@ -744,85 +743,97 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
+ reduces.isEmpty());
|
|
|
isAssignable = false;
|
|
|
}
|
|
|
- }
|
|
|
+ } else {
|
|
|
+ LOG.warn("Container allocated at unwanted priority: " + priority +
|
|
|
+ ". Returning to RM...");
|
|
|
+ isAssignable = false;
|
|
|
+ }
|
|
|
|
|
|
- boolean blackListed = false;
|
|
|
- ContainerRequest assigned = null;
|
|
|
+ if(!isAssignable) {
|
|
|
+ // release container if we could not assign it
|
|
|
+ containerNotAssigned(allocated);
|
|
|
+ it.remove();
|
|
|
+ continue;
|
|
|
+ }
|
|
|
|
|
|
- ContainerId allocatedContainerId = allocated.getId();
|
|
|
- if (isAssignable) {
|
|
|
- // do not assign if allocated container is on a
|
|
|
- // blacklisted host
|
|
|
- String allocatedHost = allocated.getNodeId().getHost();
|
|
|
- blackListed = isNodeBlacklisted(allocatedHost);
|
|
|
- if (blackListed) {
|
|
|
- // we need to request for a new container
|
|
|
- // and release the current one
|
|
|
- LOG.info("Got allocated container on a blacklisted "
|
|
|
- + " host "+allocatedHost
|
|
|
- +". Releasing container " + allocated);
|
|
|
+ // do not assign if allocated container is on a
|
|
|
+ // blacklisted host
|
|
|
+ String allocatedHost = allocated.getNodeId().getHost();
|
|
|
+ if (isNodeBlacklisted(allocatedHost)) {
|
|
|
+ // we need to request for a new container
|
|
|
+ // and release the current one
|
|
|
+ LOG.info("Got allocated container on a blacklisted "
|
|
|
+ + " host "+allocatedHost
|
|
|
+ +". Releasing container " + allocated);
|
|
|
|
|
|
- // find the request matching this allocated container
|
|
|
- // and replace it with a new one
|
|
|
- ContainerRequest toBeReplacedReq =
|
|
|
- getContainerReqToReplace(allocated);
|
|
|
- if (toBeReplacedReq != null) {
|
|
|
- LOG.info("Placing a new container request for task attempt "
|
|
|
- + toBeReplacedReq.attemptID);
|
|
|
- ContainerRequest newReq =
|
|
|
- getFilteredContainerRequest(toBeReplacedReq);
|
|
|
- decContainerReq(toBeReplacedReq);
|
|
|
- if (toBeReplacedReq.attemptID.getTaskId().getTaskType() ==
|
|
|
- TaskType.MAP) {
|
|
|
- maps.put(newReq.attemptID, newReq);
|
|
|
- }
|
|
|
- else {
|
|
|
- reduces.put(newReq.attemptID, newReq);
|
|
|
- }
|
|
|
- addContainerReq(newReq);
|
|
|
+ // find the request matching this allocated container
|
|
|
+ // and replace it with a new one
|
|
|
+ ContainerRequest toBeReplacedReq =
|
|
|
+ getContainerReqToReplace(allocated);
|
|
|
+ if (toBeReplacedReq != null) {
|
|
|
+ LOG.info("Placing a new container request for task attempt "
|
|
|
+ + toBeReplacedReq.attemptID);
|
|
|
+ ContainerRequest newReq =
|
|
|
+ getFilteredContainerRequest(toBeReplacedReq);
|
|
|
+ decContainerReq(toBeReplacedReq);
|
|
|
+ if (toBeReplacedReq.attemptID.getTaskId().getTaskType() ==
|
|
|
+ TaskType.MAP) {
|
|
|
+ maps.put(newReq.attemptID, newReq);
|
|
|
}
|
|
|
else {
|
|
|
- LOG.info("Could not map allocated container to a valid request."
|
|
|
- + " Releasing allocated container " + allocated);
|
|
|
+ reduces.put(newReq.attemptID, newReq);
|
|
|
}
|
|
|
+ addContainerReq(newReq);
|
|
|
}
|
|
|
else {
|
|
|
- assigned = assign(allocated);
|
|
|
- if (assigned != null) {
|
|
|
- // Update resource requests
|
|
|
- decContainerReq(assigned);
|
|
|
+ LOG.info("Could not map allocated container to a valid request."
|
|
|
+ + " Releasing allocated container " + allocated);
|
|
|
+ }
|
|
|
+
|
|
|
+ // release container if we could not assign it
|
|
|
+ containerNotAssigned(allocated);
|
|
|
+ it.remove();
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- // send the container-assigned event to task attempt
|
|
|
- eventHandler.handle(new TaskAttemptContainerAssignedEvent(
|
|
|
- assigned.attemptID, allocated, applicationACLs));
|
|
|
+ assignContainers(allocatedContainers);
|
|
|
+
|
|
|
+ // release container if we could not assign it
|
|
|
+ it = allocatedContainers.iterator();
|
|
|
+ while (it.hasNext()) {
|
|
|
+ Container allocated = it.next();
|
|
|
+ LOG.info("Releasing unassigned and invalid container "
|
|
|
+ + allocated + ". RM may have assignment issues");
|
|
|
+ containerNotAssigned(allocated);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ private void containerAssigned(Container allocated,
|
|
|
+ ContainerRequest assigned) { // bookkeeping for a container matched to a request
|
|
|
+ // Update resource requests
|
|
|
+ decContainerReq(assigned);
|
|
|
|
|
|
- assignedRequests.add(allocatedContainerId, assigned.attemptID);
|
|
|
+ // send the container-assigned event to task attempt
|
|
|
+ eventHandler.handle(new TaskAttemptContainerAssignedEvent(
|
|
|
+ assigned.attemptID, allocated, applicationACLs));
|
|
|
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.info("Assigned container (" + allocated + ") "
|
|
|
- + " to task " + assigned.attemptID + " on node "
|
|
|
- + allocated.getNodeId().toString());
|
|
|
- }
|
|
|
- }
|
|
|
- else {
|
|
|
- //not assigned to any request, release the container
|
|
|
- LOG.info("Releasing unassigned and invalid container "
|
|
|
- + allocated + ". RM has gone crazy, someone go look!"
|
|
|
- + " Hey RM, if you are so rich, go donate to non-profits!");
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // release container if it was blacklisted
|
|
|
- // or if we could not assign it
|
|
|
- if (blackListed || assigned == null) {
|
|
|
- containersReleased++;
|
|
|
- release(allocatedContainerId);
|
|
|
- }
|
|
|
+ assignedRequests.add(allocated.getId(), assigned.attemptID); // record container id -> task attempt
|
|
|
+
|
|
|
+ if (LOG.isDebugEnabled()) { // skip building the message unless debug output is on
|
|
|
+ LOG.debug("Assigned container (" + allocated + ") "
|
|
|
+ + " to task " + assigned.attemptID + " on node "
|
|
|
+ + allocated.getNodeId().toString());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private ContainerRequest assign(Container allocated) {
|
|
|
+ private void containerNotAssigned(Container allocated) { // called for a container no request could use
|
|
|
+ containersReleased++; // bump the running count of released containers
|
|
|
+ release(allocated.getId()); // pass the container's id to release()
|
|
|
+ }
|
|
|
+
|
|
|
+ private ContainerRequest assignWithoutLocality(Container allocated) {
|
|
|
ContainerRequest assigned = null;
|
|
|
|
|
|
Priority priority = allocated.getPriority();
|
|
@@ -834,18 +845,24 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
LOG.debug("Assigning container " + allocated + " to reduce");
|
|
|
}
|
|
|
assigned = assignToReduce(allocated);
|
|
|
- } else if (PRIORITY_MAP.equals(priority)) {
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Assigning container " + allocated + " to map");
|
|
|
- }
|
|
|
- assigned = assignToMap(allocated);
|
|
|
- } else {
|
|
|
- LOG.warn("Container allocated at unwanted priority: " + priority +
|
|
|
- ". Returning to RM...");
|
|
|
}
|
|
|
|
|
|
return assigned;
|
|
|
}
|
|
|
+
|
|
|
+ private void assignContainers(List<Container> allocatedContainers) { // removes every container it assigns from the given list
|
|
|
+ Iterator<Container> it = allocatedContainers.iterator();
|
|
|
+ while (it.hasNext()) { // first pass: try assignWithoutLocality on each container
|
|
|
+ Container allocated = it.next();
|
|
|
+ ContainerRequest assigned = assignWithoutLocality(allocated);
|
|
|
+ if (assigned != null) { // null means no request was matched in this pass
|
|
|
+ containerAssigned(allocated, assigned);
|
|
|
+ it.remove(); // drop it so the later pass only sees unassigned containers
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ assignMapsWithLocality(allocatedContainers); // second pass over whatever is still in the list
|
|
|
+ }
|
|
|
|
|
|
private ContainerRequest getContainerReqToReplace(Container allocated) {
|
|
|
LOG.info("Finding containerReq for allocated container: " + allocated);
|
|
@@ -916,11 +933,15 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
}
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
- private ContainerRequest assignToMap(Container allocated) {
|
|
|
- //try to assign to maps if present
|
|
|
- //first by host, then by rack, followed by *
|
|
|
- ContainerRequest assigned = null;
|
|
|
- while (assigned == null && maps.size() > 0) {
|
|
|
+ private void assignMapsWithLocality(List<Container> allocatedContainers) {
|
|
|
+ // try to assign to all nodes first to match node local
|
|
|
+ Iterator<Container> it = allocatedContainers.iterator();
|
|
|
+ while(it.hasNext() && maps.size() > 0){
|
|
|
+ Container allocated = it.next();
|
|
|
+ Priority priority = allocated.getPriority();
|
|
|
+ assert PRIORITY_MAP.equals(priority);
|
|
|
+ // "if (maps.containsKey(tId))" below should be almost always true.
|
|
|
+ // hence this while loop would almost always have O(1) complexity
|
|
|
String host = allocated.getNodeId().getHost();
|
|
|
LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
|
|
|
while (list != null && list.size() > 0) {
|
|
@@ -929,7 +950,9 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
}
|
|
|
TaskAttemptId tId = list.removeFirst();
|
|
|
if (maps.containsKey(tId)) {
|
|
|
- assigned = maps.remove(tId);
|
|
|
+ ContainerRequest assigned = maps.remove(tId);
|
|
|
+ containerAssigned(allocated, assigned);
|
|
|
+ it.remove();
|
|
|
JobCounterUpdateEvent jce =
|
|
|
new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
|
|
|
jce.addCounterUpdate(JobCounter.DATA_LOCAL_MAPS, 1);
|
|
@@ -941,39 +964,56 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
- if (assigned == null) {
|
|
|
- String rack = RackResolver.resolve(host).getNetworkLocation();
|
|
|
- list = mapsRackMapping.get(rack);
|
|
|
- while (list != null && list.size() > 0) {
|
|
|
- TaskAttemptId tId = list.removeFirst();
|
|
|
- if (maps.containsKey(tId)) {
|
|
|
- assigned = maps.remove(tId);
|
|
|
- JobCounterUpdateEvent jce =
|
|
|
- new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
|
|
|
- jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1);
|
|
|
- eventHandler.handle(jce);
|
|
|
- rackLocalAssigned++;
|
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Assigned based on rack match " + rack);
|
|
|
- }
|
|
|
- break;
|
|
|
- }
|
|
|
- }
|
|
|
- if (assigned == null && maps.size() > 0) {
|
|
|
- TaskAttemptId tId = maps.keySet().iterator().next();
|
|
|
- assigned = maps.remove(tId);
|
|
|
+ }
|
|
|
+
|
|
|
+ // try to match all rack local
|
|
|
+ it = allocatedContainers.iterator();
|
|
|
+ while(it.hasNext() && maps.size() > 0){
|
|
|
+ Container allocated = it.next();
|
|
|
+ Priority priority = allocated.getPriority();
|
|
|
+ assert PRIORITY_MAP.equals(priority);
|
|
|
+ // "if (maps.containsKey(tId))" below should be almost always true.
|
|
|
+ // hence this while loop would almost always have O(1) complexity
|
|
|
+ String host = allocated.getNodeId().getHost();
|
|
|
+ String rack = RackResolver.resolve(host).getNetworkLocation();
|
|
|
+ LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
|
|
|
+ while (list != null && list.size() > 0) {
|
|
|
+ TaskAttemptId tId = list.removeFirst();
|
|
|
+ if (maps.containsKey(tId)) {
|
|
|
+ ContainerRequest assigned = maps.remove(tId);
|
|
|
+ containerAssigned(allocated, assigned);
|
|
|
+ it.remove();
|
|
|
JobCounterUpdateEvent jce =
|
|
|
new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
|
|
|
- jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
|
|
|
+ jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1);
|
|
|
eventHandler.handle(jce);
|
|
|
+ rackLocalAssigned++;
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("Assigned based on * match");
|
|
|
+ LOG.debug("Assigned based on rack match " + rack);
|
|
|
}
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- return assigned;
|
|
|
+
|
|
|
+ // assign remaining
|
|
|
+ it = allocatedContainers.iterator();
|
|
|
+ while(it.hasNext() && maps.size() > 0){
|
|
|
+ Container allocated = it.next();
|
|
|
+ Priority priority = allocated.getPriority();
|
|
|
+ assert PRIORITY_MAP.equals(priority);
|
|
|
+ TaskAttemptId tId = maps.keySet().iterator().next();
|
|
|
+ ContainerRequest assigned = maps.remove(tId);
|
|
|
+ containerAssigned(allocated, assigned);
|
|
|
+ it.remove();
|
|
|
+ JobCounterUpdateEvent jce =
|
|
|
+ new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
|
|
|
+ jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
|
|
|
+ eventHandler.handle(jce);
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Assigned based on * match");
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|