|
@@ -71,7 +71,6 @@ import org.apache.hadoop.yarn.api.records.Priority;
|
|
|
import org.apache.hadoop.yarn.client.api.NMTokenCache;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
|
-import org.apache.hadoop.yarn.util.Clock;
|
|
|
import org.apache.hadoop.yarn.util.RackResolver;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
@@ -142,21 +141,15 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
private int lastCompletedTasks = 0;
|
|
|
|
|
|
private boolean recalculateReduceSchedule = false;
|
|
|
- private int mapResourceRequest;//memory
|
|
|
- private int reduceResourceRequest;//memory
|
|
|
+ private int mapResourceReqt;//memory
|
|
|
+ private int reduceResourceReqt;//memory
|
|
|
|
|
|
private boolean reduceStarted = false;
|
|
|
private float maxReduceRampupLimit = 0;
|
|
|
private float maxReducePreemptionLimit = 0;
|
|
|
- /**
|
|
|
- * after this threshold, if the container request is not allocated, it is
|
|
|
- * considered delayed.
|
|
|
- */
|
|
|
- private long allocationDelayThresholdMs = 0;
|
|
|
private float reduceSlowStart = 0;
|
|
|
private long retryInterval;
|
|
|
private long retrystartTime;
|
|
|
- private Clock clock;
|
|
|
|
|
|
@VisibleForTesting
|
|
|
protected BlockingQueue<ContainerAllocatorEvent> eventQueue
|
|
@@ -167,7 +160,6 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
public RMContainerAllocator(ClientService clientService, AppContext context) {
|
|
|
super(clientService, context);
|
|
|
this.stopped = new AtomicBoolean(false);
|
|
|
- this.clock = context.getClock();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -182,9 +174,6 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
maxReducePreemptionLimit = conf.getFloat(
|
|
|
MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT,
|
|
|
MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT);
|
|
|
- allocationDelayThresholdMs = conf.getInt(
|
|
|
- MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
|
|
|
- MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000;//sec -> ms
|
|
|
RackResolver.init(conf);
|
|
|
retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
|
|
|
MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
|
|
@@ -251,7 +240,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
getJob().getTotalMaps(), completedMaps,
|
|
|
scheduledRequests.maps.size(), scheduledRequests.reduces.size(),
|
|
|
assignedRequests.maps.size(), assignedRequests.reduces.size(),
|
|
|
- mapResourceRequest, reduceResourceRequest,
|
|
|
+ mapResourceReqt, reduceResourceReqt,
|
|
|
pendingReduces.size(),
|
|
|
maxReduceRampupLimit, reduceSlowStart);
|
|
|
recalculateReduceSchedule = false;
|
|
@@ -273,18 +262,6 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
scheduleStats.log("Final Stats: ");
|
|
|
}
|
|
|
|
|
|
- @Private
|
|
|
- @VisibleForTesting
|
|
|
- AssignedRequests getAssignedRequests() {
|
|
|
- return assignedRequests;
|
|
|
- }
|
|
|
-
|
|
|
- @Private
|
|
|
- @VisibleForTesting
|
|
|
- ScheduledRequests getScheduledRequests() {
|
|
|
- return scheduledRequests;
|
|
|
- }
|
|
|
-
|
|
|
public boolean getIsReduceStarted() {
|
|
|
return reduceStarted;
|
|
|
}
|
|
@@ -320,16 +297,16 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
int supportedMaxContainerCapability =
|
|
|
getMaxContainerCapability().getMemory();
|
|
|
if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) {
|
|
|
- if (mapResourceRequest == 0) {
|
|
|
- mapResourceRequest = reqEvent.getCapability().getMemory();
|
|
|
+ if (mapResourceReqt == 0) {
|
|
|
+ mapResourceReqt = reqEvent.getCapability().getMemory();
|
|
|
eventHandler.handle(new JobHistoryEvent(jobId,
|
|
|
new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP,
|
|
|
- mapResourceRequest)));
|
|
|
- LOG.info("mapResourceRequest:"+ mapResourceRequest);
|
|
|
- if (mapResourceRequest > supportedMaxContainerCapability) {
|
|
|
+ mapResourceReqt)));
|
|
|
+ LOG.info("mapResourceReqt:"+mapResourceReqt);
|
|
|
+ if (mapResourceReqt > supportedMaxContainerCapability) {
|
|
|
String diagMsg = "MAP capability required is more than the supported " +
|
|
|
- "max container capability in the cluster. Killing the Job. mapResourceRequest: " +
|
|
|
- mapResourceRequest + " maxContainerCapability:" + supportedMaxContainerCapability;
|
|
|
+ "max container capability in the cluster. Killing the Job. mapResourceReqt: " +
|
|
|
+ mapResourceReqt + " maxContainerCapability:" + supportedMaxContainerCapability;
|
|
|
LOG.info(diagMsg);
|
|
|
eventHandler.handle(new JobDiagnosticsUpdateEvent(
|
|
|
jobId, diagMsg));
|
|
@@ -337,20 +314,20 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
}
|
|
|
}
|
|
|
//set the rounded off memory
|
|
|
- reqEvent.getCapability().setMemory(mapResourceRequest);
|
|
|
+ reqEvent.getCapability().setMemory(mapResourceReqt);
|
|
|
scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
|
|
|
} else {
|
|
|
- if (reduceResourceRequest == 0) {
|
|
|
- reduceResourceRequest = reqEvent.getCapability().getMemory();
|
|
|
+ if (reduceResourceReqt == 0) {
|
|
|
+ reduceResourceReqt = reqEvent.getCapability().getMemory();
|
|
|
eventHandler.handle(new JobHistoryEvent(jobId,
|
|
|
new NormalizedResourceEvent(
|
|
|
org.apache.hadoop.mapreduce.TaskType.REDUCE,
|
|
|
- reduceResourceRequest)));
|
|
|
- LOG.info("reduceResourceRequest:"+ reduceResourceRequest);
|
|
|
- if (reduceResourceRequest > supportedMaxContainerCapability) {
|
|
|
+ reduceResourceReqt)));
|
|
|
+ LOG.info("reduceResourceReqt:"+reduceResourceReqt);
|
|
|
+ if (reduceResourceReqt > supportedMaxContainerCapability) {
|
|
|
String diagMsg = "REDUCE capability required is more than the " +
|
|
|
"supported max container capability in the cluster. Killing the " +
|
|
|
- "Job. reduceResourceRequest: " + reduceResourceRequest +
|
|
|
+ "Job. reduceResourceReqt: " + reduceResourceReqt +
|
|
|
" maxContainerCapability:" + supportedMaxContainerCapability;
|
|
|
LOG.info(diagMsg);
|
|
|
eventHandler.handle(new JobDiagnosticsUpdateEvent(
|
|
@@ -359,7 +336,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
}
|
|
|
}
|
|
|
//set the rounded off memory
|
|
|
- reqEvent.getCapability().setMemory(reduceResourceRequest);
|
|
|
+ reqEvent.getCapability().setMemory(reduceResourceReqt);
|
|
|
if (reqEvent.getEarlierAttemptFailed()) {
|
|
|
//add to the front of queue for fail fast
|
|
|
pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE));
|
|
@@ -407,22 +384,8 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
return host;
|
|
|
}
|
|
|
|
|
|
- @Private
|
|
|
- @VisibleForTesting
|
|
|
- synchronized void setReduceResourceRequest(int mem) {
|
|
|
- this.reduceResourceRequest = mem;
|
|
|
- }
|
|
|
-
|
|
|
- @Private
|
|
|
- @VisibleForTesting
|
|
|
- synchronized void setMapResourceRequest(int mem) {
|
|
|
- this.mapResourceRequest = mem;
|
|
|
- }
|
|
|
-
|
|
|
- @Private
|
|
|
- @VisibleForTesting
|
|
|
- void preemptReducesIfNeeded() {
|
|
|
- if (reduceResourceRequest == 0) {
|
|
|
+ private void preemptReducesIfNeeded() {
|
|
|
+ if (reduceResourceReqt == 0) {
|
|
|
return; //no reduces
|
|
|
}
|
|
|
//check if reduces have taken over the whole cluster and there are
|
|
@@ -430,9 +393,9 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
if (scheduledRequests.maps.size() > 0) {
|
|
|
int memLimit = getMemLimit();
|
|
|
int availableMemForMap = memLimit - ((assignedRequests.reduces.size() -
|
|
|
- assignedRequests.preemptionWaitingReduces.size()) * reduceResourceRequest);
|
|
|
+ assignedRequests.preemptionWaitingReduces.size()) * reduceResourceReqt);
|
|
|
//availableMemForMap must be sufficient to run atleast 1 map
|
|
|
- if (availableMemForMap < mapResourceRequest) {
|
|
|
+ if (availableMemForMap < mapResourceReqt) {
|
|
|
//to make sure new containers are given to maps and not reduces
|
|
|
//ramp down all scheduled reduces if any
|
|
|
//(since reduces are scheduled at higher priority than maps)
|
|
@@ -442,39 +405,21 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
}
|
|
|
scheduledRequests.reduces.clear();
|
|
|
|
|
|
- //do further checking to find the number of map requests that were
|
|
|
- //hanging around for a while
|
|
|
- int hangingMapRequests = getNumOfHangingRequests(scheduledRequests.maps);
|
|
|
- if (hangingMapRequests > 0) {
|
|
|
- //preempt for making space for at least one map
|
|
|
- int premeptionLimit = Math.max(mapResourceRequest,
|
|
|
- (int) (maxReducePreemptionLimit * memLimit));
|
|
|
-
|
|
|
- int preemptMem = Math.min(hangingMapRequests * mapResourceRequest,
|
|
|
- premeptionLimit);
|
|
|
-
|
|
|
- int toPreempt = (int) Math.ceil((float) preemptMem / reduceResourceRequest);
|
|
|
- toPreempt = Math.min(toPreempt, assignedRequests.reduces.size());
|
|
|
-
|
|
|
- LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps");
|
|
|
- assignedRequests.preemptReduce(toPreempt);
|
|
|
- }
|
|
|
+ //preempt for making space for atleast one map
|
|
|
+ int premeptionLimit = Math.max(mapResourceReqt,
|
|
|
+ (int) (maxReducePreemptionLimit * memLimit));
|
|
|
+
|
|
|
+ int preemptMem = Math.min(scheduledRequests.maps.size() * mapResourceReqt,
|
|
|
+ premeptionLimit);
|
|
|
+
|
|
|
+ int toPreempt = (int) Math.ceil((float) preemptMem/reduceResourceReqt);
|
|
|
+ toPreempt = Math.min(toPreempt, assignedRequests.reduces.size());
|
|
|
+
|
|
|
+ LOG.info("Going to preempt " + toPreempt);
|
|
|
+ assignedRequests.preemptReduce(toPreempt);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- private int getNumOfHangingRequests(Map<TaskAttemptId, ContainerRequest> requestMap) {
|
|
|
- if (allocationDelayThresholdMs <= 0)
|
|
|
- return requestMap.size();
|
|
|
- int hangingRequests = 0;
|
|
|
- long currTime = clock.getTime();
|
|
|
- for (ContainerRequest request: requestMap.values()) {
|
|
|
- long delay = currTime - request.requestTimeMs;
|
|
|
- if (delay > allocationDelayThresholdMs)
|
|
|
- hangingRequests++;
|
|
|
- }
|
|
|
- return hangingRequests;
|
|
|
- }
|
|
|
|
|
|
@Private
|
|
|
public void scheduleReduces(
|
|
@@ -751,13 +696,11 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
@Private
|
|
|
public int getMemLimit() {
|
|
|
int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
|
|
|
- return headRoom + assignedRequests.maps.size() * mapResourceRequest +
|
|
|
- assignedRequests.reduces.size() * reduceResourceRequest;
|
|
|
+ return headRoom + assignedRequests.maps.size() * mapResourceReqt +
|
|
|
+ assignedRequests.reduces.size() * reduceResourceReqt;
|
|
|
}
|
|
|
-
|
|
|
- @Private
|
|
|
- @VisibleForTesting
|
|
|
- class ScheduledRequests {
|
|
|
+
|
|
|
+ private class ScheduledRequests {
|
|
|
|
|
|
private final LinkedList<TaskAttemptId> earlierFailedMaps =
|
|
|
new LinkedList<TaskAttemptId>();
|
|
@@ -767,8 +710,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
new HashMap<String, LinkedList<TaskAttemptId>>();
|
|
|
private final Map<String, LinkedList<TaskAttemptId>> mapsRackMapping =
|
|
|
new HashMap<String, LinkedList<TaskAttemptId>>();
|
|
|
- @VisibleForTesting
|
|
|
- final Map<TaskAttemptId, ContainerRequest> maps =
|
|
|
+ private final Map<TaskAttemptId, ContainerRequest> maps =
|
|
|
new LinkedHashMap<TaskAttemptId, ContainerRequest>();
|
|
|
|
|
|
private final LinkedHashMap<TaskAttemptId, ContainerRequest> reduces =
|
|
@@ -864,22 +806,22 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
int allocatedMemory = allocated.getResource().getMemory();
|
|
|
if (PRIORITY_FAST_FAIL_MAP.equals(priority)
|
|
|
|| PRIORITY_MAP.equals(priority)) {
|
|
|
- if (allocatedMemory < mapResourceRequest
|
|
|
+ if (allocatedMemory < mapResourceReqt
|
|
|
|| maps.isEmpty()) {
|
|
|
LOG.info("Cannot assign container " + allocated
|
|
|
+ " for a map as either "
|
|
|
- + " container memory less than required " + mapResourceRequest
|
|
|
+ + " container memory less than required " + mapResourceReqt
|
|
|
+ " or no pending map tasks - maps.isEmpty="
|
|
|
+ maps.isEmpty());
|
|
|
isAssignable = false;
|
|
|
}
|
|
|
}
|
|
|
else if (PRIORITY_REDUCE.equals(priority)) {
|
|
|
- if (allocatedMemory < reduceResourceRequest
|
|
|
+ if (allocatedMemory < reduceResourceReqt
|
|
|
|| reduces.isEmpty()) {
|
|
|
LOG.info("Cannot assign container " + allocated
|
|
|
+ " for a reduce as either "
|
|
|
- + " container memory less than required " + reduceResourceRequest
|
|
|
+ + " container memory less than required " + reduceResourceReqt
|
|
|
+ " or no pending reduce tasks - reduces.isEmpty="
|
|
|
+ reduces.isEmpty());
|
|
|
isAssignable = false;
|
|
@@ -1158,18 +1100,14 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- @Private
|
|
|
- @VisibleForTesting
|
|
|
- class AssignedRequests {
|
|
|
+ private class AssignedRequests {
|
|
|
private final Map<ContainerId, TaskAttemptId> containerToAttemptMap =
|
|
|
new HashMap<ContainerId, TaskAttemptId>();
|
|
|
private final LinkedHashMap<TaskAttemptId, Container> maps =
|
|
|
new LinkedHashMap<TaskAttemptId, Container>();
|
|
|
- @VisibleForTesting
|
|
|
- final LinkedHashMap<TaskAttemptId, Container> reduces =
|
|
|
+ private final LinkedHashMap<TaskAttemptId, Container> reduces =
|
|
|
new LinkedHashMap<TaskAttemptId, Container>();
|
|
|
- @VisibleForTesting
|
|
|
- final Set<TaskAttemptId> preemptionWaitingReduces =
|
|
|
+ private final Set<TaskAttemptId> preemptionWaitingReduces =
|
|
|
new HashSet<TaskAttemptId>();
|
|
|
|
|
|
void add(Container container, TaskAttemptId tId) {
|