|
@@ -58,6 +58,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdate
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerRequestor.ContainerRequest;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.util.StringInterner;
|
|
@@ -94,7 +95,7 @@ import org.slf4j.LoggerFactory;
|
|
|
/**
|
|
|
* Allocates the container from the ResourceManager scheduler.
|
|
|
*/
|
|
|
-public class RMContainerAllocator extends RMContainerRequestor
|
|
|
+public class RMContainerAllocator extends RMCommunicator
|
|
|
implements ContainerAllocator {
|
|
|
|
|
|
static final Logger LOG = LoggerFactory.getLogger(RMContainerAllocator.class);
|
|
@@ -114,6 +115,9 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
private Thread eventHandlingThread;
|
|
|
private final AtomicBoolean stopped;
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
+ protected RMContainerRequestor containerRequestor;
|
|
|
+
|
|
|
static {
|
|
|
PRIORITY_FAST_FAIL_MAP = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Priority.class);
|
|
|
PRIORITY_FAST_FAIL_MAP.setPriority(5);
|
|
@@ -207,6 +211,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
return new AssignedRequests();
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
@Override
|
|
|
protected void serviceInit(Configuration conf) throws Exception {
|
|
|
super.serviceInit(conf);
|
|
@@ -242,6 +247,8 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
MRJobConfig.DEFAULT_MR_NUM_OPPORTUNISTIC_MAPS_PERCENT));
|
|
|
LOG.info(this.scheduledRequests.getNumOpportunisticMapsPercent() +
|
|
|
"% of the mappers will be scheduled using OPPORTUNISTIC containers");
|
|
|
+ containerRequestor = new RMContainerRequestor(this);
|
|
|
+ containerRequestor.init(conf);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -398,8 +405,8 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
removed = true;
|
|
|
assignedRequests.remove(aId);
|
|
|
containersReleased++;
|
|
|
- pendingRelease.add(containerId);
|
|
|
- release(containerId);
|
|
|
+ containerRequestor.pendingRelease.add(containerId);
|
|
|
+ containerRequestor.release(containerId);
|
|
|
}
|
|
|
}
|
|
|
if (!removed) {
|
|
@@ -411,7 +418,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
event.getType() == ContainerAllocator.EventType.CONTAINER_FAILED) {
|
|
|
ContainerFailedEvent fEv = (ContainerFailedEvent) event;
|
|
|
String host = getHost(fEv.getContMgrAddress());
|
|
|
- containerFailedOnHost(host);
|
|
|
+ containerRequestor.containerFailedOnHost(host);
|
|
|
// propagate failures to preemption policy to discard checkpoints for
|
|
|
// failed tasks
|
|
|
preemptionPolicy.handleFailedContainer(event.getAttemptID());
|
|
@@ -569,13 +576,14 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
// there are enough resources for a mapper to run. This is calculated by
|
|
|
// excluding scheduled reducers from headroom and comparing it against
|
|
|
// resources required to run one mapper.
|
|
|
- Resource scheduledReducesResource = Resources.multiply(
|
|
|
- reduceResourceRequest, scheduledRequests.reduces.size());
|
|
|
- Resource availableResourceForMap =
|
|
|
- Resources.subtract(getAvailableResources(), scheduledReducesResource);
|
|
|
- if (ResourceCalculatorUtils.computeAvailableContainers(availableResourceForMap,
|
|
|
- mapResourceRequest, getSchedulerResourceTypes()) > 0) {
|
|
|
- // Enough room to run a mapper
|
|
|
+ Resource scheduledReducesResource = Resources
|
|
|
+ .multiply(reduceResourceRequest, scheduledRequests.reduces.size());
|
|
|
+ Resource availableResourceForMap = Resources.subtract(
|
|
|
+ containerRequestor.getAvailableResources(), scheduledReducesResource);
|
|
|
+ if (ResourceCalculatorUtils.computeAvailableContainers(
|
|
|
+ availableResourceForMap, mapResourceRequest,
|
|
|
+ getSchedulerResourceTypes()) > 0) {
|
|
|
+ // Enough room to run a mapper
|
|
|
return false;
|
|
|
}
|
|
|
|
|
@@ -651,7 +659,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
}
|
|
|
|
|
|
// get available resources for this job
|
|
|
- Resource headRoom = getAvailableResources();
|
|
|
+ Resource headRoom = containerRequestor.getAvailableResources();
|
|
|
|
|
|
LOG.info("Recalculating schedule, headroom=" + headRoom);
|
|
|
|
|
@@ -782,7 +790,8 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
applyConcurrentTaskLimits();
|
|
|
|
|
|
// will be null the first time
|
|
|
- Resource headRoom = Resources.clone(getAvailableResources());
|
|
|
+ Resource headRoom = Resources
|
|
|
+ .clone(containerRequestor.getAvailableResources());
|
|
|
AllocateResponse response;
|
|
|
/*
|
|
|
* If contact with RM is lost, the AM will wait MR_AM_TO_RM_WAIT_INTERVAL_MS
|
|
@@ -790,7 +799,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
* to contact the RM.
|
|
|
*/
|
|
|
try {
|
|
|
- response = makeRemoteRequest();
|
|
|
+ response = containerRequestor.makeRemoteRequest();
|
|
|
// Reset retry count if no exception occurred.
|
|
|
retrystartTime = System.currentTimeMillis();
|
|
|
} catch (ApplicationAttemptNotFoundException e ) {
|
|
@@ -805,9 +814,9 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
LOG.info("ApplicationMaster is out of sync with ResourceManager,"
|
|
|
+ " hence resync and send outstanding requests.");
|
|
|
// RM may have restarted, re-register with RM.
|
|
|
- lastResponseID = 0;
|
|
|
+ containerRequestor.lastResponseID = 0;
|
|
|
register();
|
|
|
- addOutstandingRequestOnResync();
|
|
|
+ containerRequestor.addOutstandingRequestOnResync();
|
|
|
return null;
|
|
|
} catch (InvalidLabelResourceRequestException e) {
|
|
|
// If Invalid label exception is received means the requested label doesnt
|
|
@@ -833,7 +842,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
// continue to attempt to contact the RM.
|
|
|
throw e;
|
|
|
}
|
|
|
- Resource newHeadRoom = getAvailableResources();
|
|
|
+ Resource newHeadRoom = containerRequestor.getAvailableResources();
|
|
|
List<Container> newContainers = response.getAllocatedContainers();
|
|
|
// Setting NMTokens
|
|
|
if (response.getNMTokens() != null) {
|
|
@@ -874,7 +883,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
}
|
|
|
|
|
|
//Called on each allocation. Will know about newly blacklisted/added hosts.
|
|
|
- computeIgnoreBlacklisting();
|
|
|
+ containerRequestor.computeIgnoreBlacklisting();
|
|
|
|
|
|
handleUpdatedNodes(response);
|
|
|
handleJobPriorityChange(response);
|
|
@@ -900,7 +909,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
LOG.error("Container complete event for unknown container "
|
|
|
+ container.getContainerId());
|
|
|
} else {
|
|
|
- pendingRelease.remove(container.getContainerId());
|
|
|
+ containerRequestor.pendingRelease.remove(container.getContainerId());
|
|
|
assignedRequests.remove(attemptID);
|
|
|
|
|
|
// Send the diagnostics
|
|
@@ -927,11 +936,12 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
int normalMapRequestLimit = Math.min(
|
|
|
maxRequestedMaps - failedMapRequestLimit,
|
|
|
numScheduledMaps - numScheduledFailMaps);
|
|
|
- setRequestLimit(PRIORITY_FAST_FAIL_MAP, mapResourceRequest,
|
|
|
- failedMapRequestLimit);
|
|
|
- setRequestLimit(PRIORITY_MAP, mapResourceRequest, normalMapRequestLimit);
|
|
|
- setRequestLimit(PRIORITY_OPPORTUNISTIC_MAP, mapResourceRequest,
|
|
|
+ containerRequestor.setRequestLimit(PRIORITY_FAST_FAIL_MAP,
|
|
|
+ mapResourceRequest, failedMapRequestLimit);
|
|
|
+ containerRequestor.setRequestLimit(PRIORITY_MAP, mapResourceRequest,
|
|
|
normalMapRequestLimit);
|
|
|
+ containerRequestor.setRequestLimit(PRIORITY_OPPORTUNISTIC_MAP,
|
|
|
+ mapResourceRequest, normalMapRequestLimit);
|
|
|
}
|
|
|
|
|
|
int numScheduledReduces = scheduledRequests.reduces.size();
|
|
@@ -941,7 +951,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
maxRunningReduces - assignedRequests.reduces.size());
|
|
|
int reduceRequestLimit = Math.min(maxRequestedReduces,
|
|
|
numScheduledReduces);
|
|
|
- setRequestLimit(PRIORITY_REDUCE, reduceResourceRequest,
|
|
|
+ containerRequestor.setRequestLimit(PRIORITY_REDUCE, reduceResourceRequest,
|
|
|
reduceRequestLimit);
|
|
|
}
|
|
|
}
|
|
@@ -1036,7 +1046,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
|
|
|
@Private
|
|
|
public Resource getResourceLimit() {
|
|
|
- Resource headRoom = getAvailableResources();
|
|
|
+ Resource headRoom = containerRequestor.getAvailableResources();
|
|
|
Resource assignedMapResource =
|
|
|
Resources.multiply(mapResourceRequest, assignedRequests.maps.size());
|
|
|
Resource assignedReduceResource =
|
|
@@ -1087,7 +1097,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
if (req == null) {
|
|
|
return false;
|
|
|
} else {
|
|
|
- decContainerReq(req);
|
|
|
+ containerRequestor.decContainerReq(req);
|
|
|
return true;
|
|
|
}
|
|
|
}
|
|
@@ -1097,7 +1107,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
if (it.hasNext()) {
|
|
|
Entry<TaskAttemptId, ContainerRequest> entry = it.next();
|
|
|
it.remove();
|
|
|
- decContainerReq(entry.getValue());
|
|
|
+ containerRequestor.decContainerReq(entry.getValue());
|
|
|
return entry.getValue();
|
|
|
}
|
|
|
return null;
|
|
@@ -1114,14 +1124,15 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
LOG.info("Added "+event.getAttemptID()+" to list of failed maps");
|
|
|
// If its an earlier Failed attempt, do not retry as OPPORTUNISTIC
|
|
|
maps.put(event.getAttemptID(), request);
|
|
|
- addContainerReq(request);
|
|
|
+ containerRequestor.addContainerReq(request);
|
|
|
} else {
|
|
|
if (mapsMod100 < numOpportunisticMapsPercent) {
|
|
|
request =
|
|
|
new ContainerRequest(event, PRIORITY_OPPORTUNISTIC_MAP,
|
|
|
mapNodeLabelExpression);
|
|
|
maps.put(event.getAttemptID(), request);
|
|
|
- addOpportunisticResourceRequest(request.priority, request.capability);
|
|
|
+ containerRequestor.addOpportunisticResourceRequest(request.priority,
|
|
|
+ request.capability);
|
|
|
} else {
|
|
|
request =
|
|
|
new ContainerRequest(event, PRIORITY_MAP, mapNodeLabelExpression);
|
|
@@ -1148,7 +1159,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
}
|
|
|
}
|
|
|
maps.put(event.getAttemptID(), request);
|
|
|
- addContainerReq(request);
|
|
|
+ containerRequestor.addContainerReq(request);
|
|
|
}
|
|
|
mapsMod100++;
|
|
|
mapsMod100 %= 100;
|
|
@@ -1158,7 +1169,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
|
|
|
void addReduce(ContainerRequest req) {
|
|
|
reduces.put(req.attemptID, req);
|
|
|
- addContainerReq(req);
|
|
|
+ containerRequestor.addContainerReq(req); // after recording the request in reduces, hand it to the delegate requestor instead of the former superclass method
|
|
|
}
|
|
|
|
|
|
// this method will change the list of allocatedContainers.
|
|
@@ -1223,7 +1234,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
// do not assign if allocated container is on a
|
|
|
// blacklisted host
|
|
|
String allocatedHost = allocated.getNodeId().getHost();
|
|
|
- if (isNodeBlacklisted(allocatedHost)) {
|
|
|
+ if (containerRequestor.isNodeBlacklisted(allocatedHost)) {
|
|
|
// we need to request for a new container
|
|
|
// and release the current one
|
|
|
LOG.info("Got allocated container on a blacklisted "
|
|
@@ -1237,9 +1248,9 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
if (toBeReplacedReq != null) {
|
|
|
LOG.info("Placing a new container request for task attempt "
|
|
|
+ toBeReplacedReq.attemptID);
|
|
|
- ContainerRequest newReq =
|
|
|
- getFilteredContainerRequest(toBeReplacedReq);
|
|
|
- decContainerReq(toBeReplacedReq);
|
|
|
+ ContainerRequest newReq = containerRequestor
|
|
|
+ .filterRequest(toBeReplacedReq);
|
|
|
+ containerRequestor.decContainerReq(toBeReplacedReq);
|
|
|
if (toBeReplacedReq.attemptID.getTaskId().getTaskType() ==
|
|
|
TaskType.MAP) {
|
|
|
maps.put(newReq.attemptID, newReq);
|
|
@@ -1247,7 +1258,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
else {
|
|
|
reduces.put(newReq.attemptID, newReq);
|
|
|
}
|
|
|
- addContainerReq(newReq);
|
|
|
+ containerRequestor.addContainerReq(newReq);
|
|
|
}
|
|
|
else {
|
|
|
LOG.info("Could not map allocated container to a valid request."
|
|
@@ -1276,7 +1287,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
private void containerAssigned(Container allocated,
|
|
|
ContainerRequest assigned) {
|
|
|
// Update resource requests
|
|
|
- decContainerReq(assigned);
|
|
|
+ containerRequestor.decContainerReq(assigned);
|
|
|
|
|
|
// send the container-assigned event to task attempt
|
|
|
eventHandler.handle(new TaskAttemptContainerAssignedEvent(
|
|
@@ -1293,8 +1304,8 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
|
|
|
private void containerNotAssigned(Container allocated) {
|
|
|
containersReleased++;
|
|
|
- pendingRelease.add(allocated.getId());
|
|
|
- release(allocated.getId());
|
|
|
+ containerRequestor.pendingRelease.add(allocated.getId()); // pendingRelease now lives on the delegate requestor; add the unassigned container's id to it
|
|
|
+ containerRequestor.release(allocated.getId()); // then pass the same id to the requestor's release()
|
|
|
}
|
|
|
|
|
|
private ContainerRequest assignWithoutLocality(Container allocated) {
|
|
@@ -1659,4 +1670,18 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
|
|
|
}
|
|
|
|
|
|
+ public Set<String> getBlacklistedNodes() { // pass-through accessor; this class keeps no blacklist state of its own here
|
|
|
+ return containerRequestor.getBlacklistedNodes(); // NOTE(review): containerRequestor is assigned in serviceInit; presumably null before init - confirm callers
|
|
|
+ }
|
|
|
+
|
|
|
+ public RMContainerRequestor getContainerRequestor() { // exposes the requestor this allocator delegates to
|
|
|
+ return containerRequestor; // returns the field itself (no copy), so callers share the same instance
|
|
|
+ }
|
|
|
+
|
|
|
+ public void resetContainerForReuse(ContainerId containerId) { // forgets the task attempt recorded against containerId, if there is one
|
|
|
+ TaskAttemptId attemptId = assignedRequests.get(containerId); // look up the attempt currently recorded for this container
|
|
|
+ if (attemptId != null) { // no recorded attempt: nothing to do
|
|
|
+ assignedRequests.remove(attemptId); // NOTE(review): only the bookkeeping entry is removed; nothing is added to pendingRelease and release() is not called - presumably so the container can be reused, confirm
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|