|
@@ -509,18 +509,6 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP);
|
|
|
} else {
|
|
|
for (String host : event.getHosts()) {
|
|
|
- //host comes from data splitLocations which are hostnames. Containers
|
|
|
- // use IP addresses.
|
|
|
- //TODO Temporary fix for locality. Use resolvers from h-common.
|
|
|
- // Cache to make this more efficient ?
|
|
|
- InetAddress addr = null;
|
|
|
- try {
|
|
|
- addr = InetAddress.getByName(host);
|
|
|
- } catch (UnknownHostException e) {
|
|
|
- LOG.warn("Unable to resolve host to IP for host [: " + host + "]");
|
|
|
- }
|
|
|
- if (addr != null) //Fallback to host if resolve fails.
|
|
|
- host = addr.getHostAddress();
|
|
|
LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
|
|
|
if (list == null) {
|
|
|
list = new LinkedList<TaskAttemptId>();
|
|
@@ -557,26 +545,101 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
while (it.hasNext()) {
|
|
|
Container allocated = it.next();
|
|
|
LOG.info("Assigning container " + allocated);
|
|
|
- ContainerRequest assigned = assign(allocated);
|
|
|
-
|
|
|
- if (assigned != null) {
|
|
|
- // Update resource requests
|
|
|
- decContainerReq(assigned);
|
|
|
+
|
|
|
+ // check if allocated container meets memory requirements
|
|
|
+ // and whether we have any scheduled tasks that need
|
|
|
+ // a container to be assigned
|
|
|
+ boolean isAssignable = true;
|
|
|
+ Priority priority = allocated.getPriority();
|
|
|
+ if (PRIORITY_FAST_FAIL_MAP.equals(priority)
|
|
|
+ || PRIORITY_MAP.equals(priority)) {
|
|
|
+ if (allocated.getResource().getMemory() < mapResourceReqt
|
|
|
+ || maps.isEmpty()) {
|
|
|
+ LOG.info("Cannot assign container " + allocated
|
|
|
+ + " for a map as either "
|
|
|
+ + " container memory less than required " + mapResourceReqt
|
|
|
+ + " or no pending map tasks - maps.isEmpty="
|
|
|
+ + maps.isEmpty());
|
|
|
+ isAssignable = false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else if (PRIORITY_REDUCE.equals(priority)) {
|
|
|
+ if (allocated.getResource().getMemory() < reduceResourceReqt
|
|
|
+ || reduces.isEmpty()) {
|
|
|
+ LOG.info("Cannot assign container " + allocated
|
|
|
+ + " for a reduce as either "
|
|
|
+ + " container memory less than required " + reduceResourceReqt
|
|
|
+ + " or no pending reduce tasks - reduces.isEmpty="
|
|
|
+ + reduces.isEmpty());
|
|
|
+ isAssignable = false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ boolean blackListed = false;
|
|
|
+ ContainerRequest assigned = null;
|
|
|
+
|
|
|
+ if (isAssignable) {
|
|
|
+ // do not assign if allocated container is on a
|
|
|
+ // blacklisted host
|
|
|
+ blackListed = isNodeBlacklisted(allocated.getNodeId().getHost());
|
|
|
+ if (blackListed) {
|
|
|
+ // we need to request for a new container
|
|
|
+ // and release the current one
|
|
|
+ LOG.info("Got allocated container on a blacklisted "
|
|
|
+ + " host. Releasing container " + allocated);
|
|
|
+
|
|
|
+ // find the request matching this allocated container
|
|
|
+ // and replace it with a new one
|
|
|
+ ContainerRequest toBeReplacedReq =
|
|
|
+ getContainerReqToReplace(allocated);
|
|
|
+ if (toBeReplacedReq != null) {
|
|
|
+ LOG.info("Placing a new container request for task attempt "
|
|
|
+ + toBeReplacedReq.attemptID);
|
|
|
+ ContainerRequest newReq =
|
|
|
+ getFilteredContainerRequest(toBeReplacedReq);
|
|
|
+ decContainerReq(toBeReplacedReq);
|
|
|
+ if (toBeReplacedReq.attemptID.getTaskId().getTaskType() ==
|
|
|
+ TaskType.MAP) {
|
|
|
+ maps.put(newReq.attemptID, newReq);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ reduces.put(newReq.attemptID, newReq);
|
|
|
+ }
|
|
|
+ addContainerReq(newReq);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ LOG.info("Could not map allocated container to a valid request."
|
|
|
+ + " Releasing allocated container " + allocated);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ assigned = assign(allocated);
|
|
|
+ if (assigned != null) {
|
|
|
+ // Update resource requests
|
|
|
+ decContainerReq(assigned);
|
|
|
|
|
|
- // send the container-assigned event to task attempt
|
|
|
- eventHandler.handle(new TaskAttemptContainerAssignedEvent(
|
|
|
- assigned.attemptID, allocated));
|
|
|
+ // send the container-assigned event to task attempt
|
|
|
+ eventHandler.handle(new TaskAttemptContainerAssignedEvent(
|
|
|
+ assigned.attemptID, allocated));
|
|
|
|
|
|
- assignedRequests.add(allocated.getId(), assigned.attemptID);
|
|
|
-
|
|
|
- LOG.info("Assigned container (" + allocated + ") " +
|
|
|
- " to task " + assigned.attemptID +
|
|
|
- " on node " + allocated.getNodeId().toString());
|
|
|
- } else {
|
|
|
- //not assigned to any request, release the container
|
|
|
- LOG.info("Releasing unassigned and invalid container " + allocated
|
|
|
- + ". RM has gone crazy, someone go look!"
|
|
|
- + " Hey RM, if you are so rich, go donate to non-profits!");
|
|
|
+ assignedRequests.add(allocated.getId(), assigned.attemptID);
|
|
|
+
|
|
|
+ LOG.info("Assigned container (" + allocated + ") " +
|
|
|
+ " to task " + assigned.attemptID +
|
|
|
+ " on node " + allocated.getNodeId().toString());
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ //not assigned to any request, release the container
|
|
|
+ LOG.info("Releasing unassigned and invalid container "
|
|
|
+ + allocated + ". RM has gone crazy, someone go look!"
|
|
|
+ + " Hey RM, if you are so rich, go donate to non-profits!");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // release container if it was blacklisted
|
|
|
+ // or if we could not assign it
|
|
|
+ if (blackListed || assigned == null) {
|
|
|
containersReleased++;
|
|
|
release(allocated.getId());
|
|
|
}
|
|
@@ -604,12 +667,37 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
return assigned;
|
|
|
}
|
|
|
|
|
|
+    /**
+     * Selects the pending request that should be re-issued because the given
+     * allocated container is not going to be used for it.
+     *
+     * The chosen request is removed from {@code maps} or {@code reduces}
+     * before it is returned, so the caller is responsible for re-registering
+     * a replacement request.
+     *
+     * @param allocated the container handed back by the RM; only its priority
+     *                  and node host are read here
+     * @return the request removed from the pending set, or {@code null} when
+     *         no matching pending request could be found (unknown priority,
+     *         stale host-mapping entry, or no pending request of that type)
+     */
+    private ContainerRequest getContainerReqToReplace(Container allocated) {
+      Priority priority = allocated.getPriority();
+      ContainerRequest toBeReplaced = null;
+      if (PRIORITY_FAST_FAIL_MAP.equals(priority)
+          || PRIORITY_MAP.equals(priority)) {
+        // allocated container was for a map
+        // NOTE(review): fast-fail and regular map priorities share this
+        // lookup; confirm that fast-fail requests are meant to be replaced
+        // through the host mapping as well.
+        String host = allocated.getNodeId().getHost();
+        LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
+        if (list != null && list.size() > 0) {
+          // Prefer a request that asked for this host. The entry is consumed
+          // even when it is stale (no longer present in maps); in that case
+          // null is returned.
+          TaskAttemptId tId = list.removeLast();
+          if (maps.containsKey(tId)) {
+            toBeReplaced = maps.remove(tId);
+          }
+        }
+        else if (!maps.isEmpty()) {
+          // No request is tied to this host: fall back to any pending map.
+          // The emptiness guard keeps iterator().next() from throwing
+          // NoSuchElementException when nothing is pending.
+          TaskAttemptId tId = maps.keySet().iterator().next();
+          toBeReplaced = maps.remove(tId);
+        }
+      }
+      else if (PRIORITY_REDUCE.equals(priority) && !reduces.isEmpty()) {
+        // Reduces are replaced by taking the first pending reduce; the
+        // emptiness guard avoids NoSuchElementException from
+        // iterator().next().
+        TaskAttemptId tId = reduces.keySet().iterator().next();
+        toBeReplaced = reduces.remove(tId);
+      }
+      return toBeReplaced;
+    }
|
|
|
+
|
|
|
|
|
|
private ContainerRequest assignToFailedMap(Container allocated) {
|
|
|
//try to assign to earlierFailedMaps if present
|
|
|
ContainerRequest assigned = null;
|
|
|
- while (assigned == null && earlierFailedMaps.size() > 0 &&
|
|
|
- allocated.getResource().getMemory() >= mapResourceReqt) {
|
|
|
+ while (assigned == null && earlierFailedMaps.size() > 0) {
|
|
|
TaskAttemptId tId = earlierFailedMaps.removeFirst();
|
|
|
if (maps.containsKey(tId)) {
|
|
|
assigned = maps.remove(tId);
|
|
@@ -627,8 +715,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
private ContainerRequest assignToReduce(Container allocated) {
|
|
|
ContainerRequest assigned = null;
|
|
|
//try to assign to reduces if present
|
|
|
- if (assigned == null && reduces.size() > 0
|
|
|
- && allocated.getResource().getMemory() >= reduceResourceReqt) {
|
|
|
+ if (assigned == null && reduces.size() > 0) {
|
|
|
TaskAttemptId tId = reduces.keySet().iterator().next();
|
|
|
assigned = reduces.remove(tId);
|
|
|
LOG.info("Assigned to reduce");
|
|
@@ -640,9 +727,8 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
//try to assign to maps if present
|
|
|
//first by host, then by rack, followed by *
|
|
|
ContainerRequest assigned = null;
|
|
|
- while (assigned == null && maps.size() > 0
|
|
|
- && allocated.getResource().getMemory() >= mapResourceReqt) {
|
|
|
- String host = getHost(allocated.getNodeId().toString());
|
|
|
+ while (assigned == null && maps.size() > 0) {
|
|
|
+ String host = allocated.getNodeId().getHost();
|
|
|
LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
|
|
|
while (list != null && list.size() > 0) {
|
|
|
LOG.info("Host matched to the request list " + host);
|