|
@@ -228,12 +228,12 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
|
|
|
boolean removed = scheduledRequests.remove(aId);
|
|
|
if (!removed) {
|
|
|
- Container container = assignedRequests.get(aId);
|
|
|
- if (container != null) {
|
|
|
+ ContainerId containerId = assignedRequests.get(aId);
|
|
|
+ if (containerId != null) {
|
|
|
removed = true;
|
|
|
assignedRequests.remove(aId);
|
|
|
containersReleased++;
|
|
|
- release(container);
|
|
|
+ release(containerId);
|
|
|
}
|
|
|
}
|
|
|
if (!removed) {
|
|
@@ -565,16 +565,18 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
eventHandler.handle(new TaskAttemptContainerAssignedEvent(
|
|
|
assigned.attemptID, allocated));
|
|
|
|
|
|
- assignedRequests.add(allocated, assigned.attemptID);
|
|
|
+ assignedRequests.add(allocated.getId(), assigned.attemptID);
|
|
|
|
|
|
LOG.info("Assigned container (" + allocated + ") " +
|
|
|
" to task " + assigned.attemptID +
|
|
|
" on node " + allocated.getNodeId().toString());
|
|
|
} else {
|
|
|
//not assigned to any request, release the container
|
|
|
- LOG.info("Releasing unassigned container " + allocated);
|
|
|
+ LOG.info("Releasing unassigned and invalid container " + allocated
|
|
|
+ + ". RM has gone crazy, someone go look!"
|
|
|
+ + " Hey RM, if you are so rich, go donate to non-profits!");
|
|
|
containersReleased++;
|
|
|
- release(allocated);
|
|
|
+ release(allocated.getId());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -705,21 +707,21 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
private class AssignedRequests {
|
|
|
private final Map<ContainerId, TaskAttemptId> containerToAttemptMap =
|
|
|
new HashMap<ContainerId, TaskAttemptId>();
|
|
|
- private final LinkedHashMap<TaskAttemptId, Container> maps =
|
|
|
- new LinkedHashMap<TaskAttemptId, Container>();
|
|
|
- private final LinkedHashMap<TaskAttemptId, Container> reduces =
|
|
|
- new LinkedHashMap<TaskAttemptId, Container>();
|
|
|
+ private final LinkedHashMap<TaskAttemptId, ContainerId> maps =
|
|
|
+ new LinkedHashMap<TaskAttemptId, ContainerId>();
|
|
|
+ private final LinkedHashMap<TaskAttemptId, ContainerId> reduces =
|
|
|
+ new LinkedHashMap<TaskAttemptId, ContainerId>();
|
|
|
private final Set<TaskAttemptId> preemptionWaitingReduces =
|
|
|
new HashSet<TaskAttemptId>();
|
|
|
|
|
|
- void add(Container container, TaskAttemptId tId) {
|
|
|
- LOG.info("Assigned container " + container.getNodeId().toString()
|
|
|
+ void add(ContainerId containerId, TaskAttemptId tId) {
|
|
|
+ LOG.info("Assigned container " + containerId.toString()
|
|
|
+ " to " + tId);
|
|
|
- containerToAttemptMap.put(container.getId(), tId);
|
|
|
+ containerToAttemptMap.put(containerId, tId);
|
|
|
if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
|
|
|
- maps.put(tId, container);
|
|
|
+ maps.put(tId, containerId);
|
|
|
} else {
|
|
|
- reduces.put(tId, container);
|
|
|
+ reduces.put(tId, containerId);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -745,12 +747,12 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
}
|
|
|
|
|
|
boolean remove(TaskAttemptId tId) {
|
|
|
- Container container = null;
|
|
|
+ ContainerId containerId = null;
|
|
|
if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
|
|
|
- container = maps.remove(tId);
|
|
|
+ containerId = maps.remove(tId);
|
|
|
} else {
|
|
|
- container = reduces.remove(tId);
|
|
|
- if (container != null) {
|
|
|
+ containerId = reduces.remove(tId);
|
|
|
+ if (containerId != null) {
|
|
|
boolean preempted = preemptionWaitingReduces.remove(tId);
|
|
|
if (preempted) {
|
|
|
LOG.info("Reduce preemption successful " + tId);
|
|
@@ -758,8 +760,8 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- if (container != null) {
|
|
|
- containerToAttemptMap.remove(container.getId());
|
|
|
+ if (containerId != null) {
|
|
|
+ containerToAttemptMap.remove(containerId);
|
|
|
return true;
|
|
|
}
|
|
|
return false;
|
|
@@ -769,7 +771,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
return containerToAttemptMap.get(cId);
|
|
|
}
|
|
|
|
|
|
- Container get(TaskAttemptId tId) {
|
|
|
+ ContainerId get(TaskAttemptId tId) {
|
|
|
if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
|
|
|
return maps.get(tId);
|
|
|
} else {
|