|
@@ -41,6 +41,7 @@ import org.apache.hadoop.mapreduce.JobCounter;
|
|
|
import org.apache.hadoop.mapreduce.MRJobConfig;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
|
|
|
import org.apache.hadoop.mapreduce.jobhistory.NormalizedResourceEvent;
|
|
|
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
|
|
|
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.AppContext;
|
|
@@ -254,28 +255,30 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
|
|
|
@SuppressWarnings({ "unchecked" })
|
|
|
protected synchronized void handleEvent(ContainerAllocatorEvent event) {
|
|
|
- LOG.info("Processing the event " + event.toString());
|
|
|
recalculateReduceSchedule = true;
|
|
|
if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ) {
|
|
|
ContainerRequestEvent reqEvent = (ContainerRequestEvent) event;
|
|
|
+ JobId jobId = getJob().getID();
|
|
|
+ int supportedMaxContainerCapability =
|
|
|
+ getMaxContainerCapability().getMemory();
|
|
|
if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) {
|
|
|
if (mapResourceReqt == 0) {
|
|
|
mapResourceReqt = reqEvent.getCapability().getMemory();
|
|
|
int minSlotMemSize = getMinContainerCapability().getMemory();
|
|
|
mapResourceReqt = (int) Math.ceil((float) mapResourceReqt/minSlotMemSize)
|
|
|
* minSlotMemSize;
|
|
|
- eventHandler.handle(new JobHistoryEvent(getJob().getID(),
|
|
|
+ eventHandler.handle(new JobHistoryEvent(jobId,
|
|
|
new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP,
|
|
|
mapResourceReqt)));
|
|
|
LOG.info("mapResourceReqt:"+mapResourceReqt);
|
|
|
- if (mapResourceReqt > getMaxContainerCapability().getMemory()) {
|
|
|
+ if (mapResourceReqt > supportedMaxContainerCapability) {
|
|
|
String diagMsg = "MAP capability required is more than the supported " +
|
|
|
"max container capability in the cluster. Killing the Job. mapResourceReqt: " +
|
|
|
- mapResourceReqt + " maxContainerCapability:" + getMaxContainerCapability().getMemory();
|
|
|
+ mapResourceReqt + " maxContainerCapability:" + supportedMaxContainerCapability;
|
|
|
LOG.info(diagMsg);
|
|
|
eventHandler.handle(new JobDiagnosticsUpdateEvent(
|
|
|
- getJob().getID(), diagMsg));
|
|
|
- eventHandler.handle(new JobEvent(getJob().getID(), JobEventType.JOB_KILL));
|
|
|
+ jobId, diagMsg));
|
|
|
+ eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
|
|
|
}
|
|
|
}
|
|
|
//set the rounded off memory
|
|
@@ -288,20 +291,20 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
//round off on slotsize
|
|
|
reduceResourceReqt = (int) Math.ceil((float)
|
|
|
reduceResourceReqt/minSlotMemSize) * minSlotMemSize;
|
|
|
- eventHandler.handle(new JobHistoryEvent(getJob().getID(),
|
|
|
+ eventHandler.handle(new JobHistoryEvent(jobId,
|
|
|
new NormalizedResourceEvent(
|
|
|
org.apache.hadoop.mapreduce.TaskType.REDUCE,
|
|
|
reduceResourceReqt)));
|
|
|
LOG.info("reduceResourceReqt:"+reduceResourceReqt);
|
|
|
- if (reduceResourceReqt > getMaxContainerCapability().getMemory()) {
|
|
|
+ if (reduceResourceReqt > supportedMaxContainerCapability) {
|
|
|
String diagMsg = "REDUCE capability required is more than the " +
|
|
|
"supported max container capability in the cluster. Killing the " +
|
|
|
"Job. reduceResourceReqt: " + reduceResourceReqt +
|
|
|
- " maxContainerCapability:" + getMaxContainerCapability().getMemory();
|
|
|
+ " maxContainerCapability:" + supportedMaxContainerCapability;
|
|
|
LOG.info(diagMsg);
|
|
|
eventHandler.handle(new JobDiagnosticsUpdateEvent(
|
|
|
- getJob().getID(), diagMsg));
|
|
|
- eventHandler.handle(new JobEvent(getJob().getID(), JobEventType.JOB_KILL));
|
|
|
+ jobId, diagMsg));
|
|
|
+ eventHandler.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
|
|
|
}
|
|
|
}
|
|
|
//set the rounded off memory
|
|
@@ -317,6 +320,9 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
|
|
|
} else if (
|
|
|
event.getType() == ContainerAllocator.EventType.CONTAINER_DEALLOCATE) {
|
|
|
+
|
|
|
+ LOG.info("Processing the event " + event.toString());
|
|
|
+
|
|
|
TaskAttemptId aId = event.getAttemptID();
|
|
|
|
|
|
boolean removed = scheduledRequests.remove(aId);
|
|
@@ -579,7 +585,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
computeIgnoreBlacklisting();
|
|
|
|
|
|
for (ContainerStatus cont : finishedContainers) {
|
|
|
- LOG.info("Received completed container " + cont);
|
|
|
+ LOG.info("Received completed container " + cont.getContainerId());
|
|
|
TaskAttemptId attemptID = assignedRequests.get(cont.getContainerId());
|
|
|
if (attemptID == null) {
|
|
|
LOG.error("Container complete event for unknown container id "
|
|
@@ -664,7 +670,9 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
mapsHostMapping.put(host, list);
|
|
|
}
|
|
|
list.add(event.getAttemptID());
|
|
|
- LOG.info("Added attempt req to host " + host);
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Added attempt req to host " + host);
|
|
|
+ }
|
|
|
}
|
|
|
for (String rack: event.getRacks()) {
|
|
|
LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
|
|
@@ -673,7 +681,9 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
mapsRackMapping.put(rack, list);
|
|
|
}
|
|
|
list.add(event.getAttemptID());
|
|
|
- LOG.info("Added attempt req to rack " + rack);
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Added attempt req to rack " + rack);
|
|
|
+ }
|
|
|
}
|
|
|
request = new ContainerRequest(event, PRIORITY_MAP);
|
|
|
}
|
|
@@ -694,18 +704,21 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
containersAllocated += allocatedContainers.size();
|
|
|
while (it.hasNext()) {
|
|
|
Container allocated = it.next();
|
|
|
- LOG.info("Assigning container " + allocated.getId() +
|
|
|
- " with priority " + allocated.getPriority() +
|
|
|
- " to NM " + allocated.getNodeId());
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Assigning container " + allocated.getId()
|
|
|
+ + " with priority " + allocated.getPriority() + " to NM "
|
|
|
+ + allocated.getNodeId());
|
|
|
+ }
|
|
|
|
|
|
// check if allocated container meets memory requirements
|
|
|
// and whether we have any scheduled tasks that need
|
|
|
// a container to be assigned
|
|
|
boolean isAssignable = true;
|
|
|
Priority priority = allocated.getPriority();
|
|
|
+ int allocatedMemory = allocated.getResource().getMemory();
|
|
|
if (PRIORITY_FAST_FAIL_MAP.equals(priority)
|
|
|
|| PRIORITY_MAP.equals(priority)) {
|
|
|
- if (allocated.getResource().getMemory() < mapResourceReqt
|
|
|
+ if (allocatedMemory < mapResourceReqt
|
|
|
|| maps.isEmpty()) {
|
|
|
LOG.info("Cannot assign container " + allocated
|
|
|
+ " for a map as either "
|
|
@@ -716,7 +729,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
}
|
|
|
}
|
|
|
else if (PRIORITY_REDUCE.equals(priority)) {
|
|
|
- if (allocated.getResource().getMemory() < reduceResourceReqt
|
|
|
+ if (allocatedMemory < reduceResourceReqt
|
|
|
|| reduces.isEmpty()) {
|
|
|
LOG.info("Cannot assign container " + allocated
|
|
|
+ " for a reduce as either "
|
|
@@ -730,15 +743,17 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
boolean blackListed = false;
|
|
|
ContainerRequest assigned = null;
|
|
|
|
|
|
+ ContainerId allocatedContainerId = allocated.getId();
|
|
|
if (isAssignable) {
|
|
|
// do not assign if allocated container is on a
|
|
|
// blacklisted host
|
|
|
- blackListed = isNodeBlacklisted(allocated.getNodeId().getHost());
|
|
|
+ String allocatedHost = allocated.getNodeId().getHost();
|
|
|
+ blackListed = isNodeBlacklisted(allocatedHost);
|
|
|
if (blackListed) {
|
|
|
// we need to request for a new container
|
|
|
// and release the current one
|
|
|
LOG.info("Got allocated container on a blacklisted "
|
|
|
- + " host "+allocated.getNodeId().getHost()
|
|
|
+ + " host "+allocatedHost
|
|
|
+". Releasing container " + allocated);
|
|
|
|
|
|
// find the request matching this allocated container
|
|
@@ -775,11 +790,13 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
eventHandler.handle(new TaskAttemptContainerAssignedEvent(
|
|
|
assigned.attemptID, allocated, applicationACLs));
|
|
|
|
|
|
- assignedRequests.add(allocated.getId(), assigned.attemptID);
|
|
|
+ assignedRequests.add(allocatedContainerId, assigned.attemptID);
|
|
|
|
|
|
- LOG.info("Assigned container (" + allocated + ") " +
|
|
|
- " to task " + assigned.attemptID +
|
|
|
- " on node " + allocated.getNodeId().toString());
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.info("Assigned container (" + allocated + ") "
|
|
|
+ + " to task " + assigned.attemptID + " on node "
|
|
|
+ + allocated.getNodeId().toString());
|
|
|
+ }
|
|
|
}
|
|
|
else {
|
|
|
//not assigned to any request, release the container
|
|
@@ -794,7 +811,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
// or if we could not assign it
|
|
|
if (blackListed || assigned == null) {
|
|
|
containersReleased++;
|
|
|
- release(allocated.getId());
|
|
|
+ release(allocatedContainerId);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -807,10 +824,14 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
LOG.info("Assigning container " + allocated + " to fast fail map");
|
|
|
assigned = assignToFailedMap(allocated);
|
|
|
} else if (PRIORITY_REDUCE.equals(priority)) {
|
|
|
- LOG.info("Assigning container " + allocated + " to reduce");
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Assigning container " + allocated + " to reduce");
|
|
|
+ }
|
|
|
assigned = assignToReduce(allocated);
|
|
|
} else if (PRIORITY_MAP.equals(priority)) {
|
|
|
- LOG.info("Assigning container " + allocated + " to map");
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Assigning container " + allocated + " to map");
|
|
|
+ }
|
|
|
assigned = assignToMap(allocated);
|
|
|
} else {
|
|
|
LOG.warn("Container allocated at unwanted priority: " + priority +
|
|
@@ -897,7 +918,9 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
String host = allocated.getNodeId().getHost();
|
|
|
LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
|
|
|
while (list != null && list.size() > 0) {
|
|
|
- LOG.info("Host matched to the request list " + host);
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Host matched to the request list " + host);
|
|
|
+ }
|
|
|
TaskAttemptId tId = list.removeFirst();
|
|
|
if (maps.containsKey(tId)) {
|
|
|
assigned = maps.remove(tId);
|
|
@@ -906,7 +929,9 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
jce.addCounterUpdate(JobCounter.DATA_LOCAL_MAPS, 1);
|
|
|
eventHandler.handle(jce);
|
|
|
hostLocalAssigned++;
|
|
|
- LOG.info("Assigned based on host match " + host);
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Assigned based on host match " + host);
|
|
|
+ }
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
@@ -922,7 +947,9 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1);
|
|
|
eventHandler.handle(jce);
|
|
|
rackLocalAssigned++;
|
|
|
- LOG.info("Assigned based on rack match " + rack);
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Assigned based on rack match " + rack);
|
|
|
+ }
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
@@ -933,7 +960,9 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
|
|
|
jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
|
|
|
eventHandler.handle(jce);
|
|
|
- LOG.info("Assigned based on * match");
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("Assigned based on * match");
|
|
|
+ }
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
@@ -953,8 +982,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
new HashSet<TaskAttemptId>();
|
|
|
|
|
|
void add(ContainerId containerId, TaskAttemptId tId) {
|
|
|
- LOG.info("Assigned container " + containerId.toString()
|
|
|
- + " to " + tId);
|
|
|
+ LOG.info("Assigned container " + containerId.toString() + " to " + tId);
|
|
|
containerToAttemptMap.put(containerId, tId);
|
|
|
if (tId.getTaskId().getTaskType().equals(TaskType.MAP)) {
|
|
|
maps.put(tId, containerId);
|
|
@@ -963,6 +991,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
void preemptReduce(int toPreempt) {
|
|
|
List<TaskAttemptId> reduceList = new ArrayList<TaskAttemptId>
|
|
|
(reduces.keySet());
|