|
@@ -57,6 +57,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdate
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
|
|
|
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
|
|
|
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.AMPreemptionPolicy;
|
|
|
import org.apache.hadoop.util.StringInterner;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
|
|
import org.apache.hadoop.yarn.api.records.Container;
|
|
@@ -67,6 +68,7 @@ import org.apache.hadoop.yarn.api.records.NMToken;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeReport;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeState;
|
|
|
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
|
|
|
import org.apache.hadoop.yarn.api.records.Priority;
|
|
|
import org.apache.hadoop.yarn.client.api.NMTokenCache;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
@@ -147,13 +149,17 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
private long retryInterval;
|
|
|
private long retrystartTime;
|
|
|
|
|
|
+ private final AMPreemptionPolicy preemptionPolicy; // told about completed/failed containers and handed PreemptionMessages from allocate responses
|
|
|
+
|
|
|
BlockingQueue<ContainerAllocatorEvent> eventQueue
|
|
|
= new LinkedBlockingQueue<ContainerAllocatorEvent>();
|
|
|
|
|
|
private ScheduleStats scheduleStats = new ScheduleStats();
|
|
|
|
|
|
- public RMContainerAllocator(ClientService clientService, AppContext context) {
|
|
|
+ public RMContainerAllocator(ClientService clientService, AppContext context,
|
|
|
+ AMPreemptionPolicy preemptionPolicy) { // new required argument: this patch removes the two-argument constructor, so callers must supply a policy
|
|
|
super(clientService, context);
|
|
|
+ this.preemptionPolicy = preemptionPolicy; // NOTE(review): stored without a null check, yet dereferenced unconditionally when events/responses are handled
|
|
|
this.stopped = new AtomicBoolean(false);
|
|
|
}
|
|
|
|
|
@@ -361,11 +367,15 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
LOG.error("Could not deallocate container for task attemptId " +
|
|
|
aId);
|
|
|
}
|
|
|
+ preemptionPolicy.handleCompletedContainer(event.getAttemptID());
|
|
|
} else if (
|
|
|
event.getType() == ContainerAllocator.EventType.CONTAINER_FAILED) {
|
|
|
ContainerFailedEvent fEv = (ContainerFailedEvent) event;
|
|
|
String host = getHost(fEv.getContMgrAddress());
|
|
|
containerFailedOnHost(host);
|
|
|
+ // propagate failures to preemption policy to discard checkpoints for
|
|
|
+ // failed tasks
|
|
|
+ preemptionPolicy.handleFailedContainer(event.getAttemptID());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -399,7 +409,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
}
|
|
|
scheduledRequests.reduces.clear();
|
|
|
|
|
|
- //preempt for making space for atleast one map
|
|
|
+ //preempt for making space for at least one map
|
|
|
int premeptionLimit = Math.max(mapResourceReqt,
|
|
|
(int) (maxReducePreemptionLimit * memLimit));
|
|
|
|
|
@@ -409,7 +419,7 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
int toPreempt = (int) Math.ceil((float) preemptMem/reduceResourceReqt);
|
|
|
toPreempt = Math.min(toPreempt, assignedRequests.reduces.size());
|
|
|
|
|
|
- LOG.info("Going to preempt " + toPreempt);
|
|
|
+ LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps");
|
|
|
assignedRequests.preemptReduce(toPreempt);
|
|
|
}
|
|
|
}
|
|
@@ -595,6 +605,14 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
}
|
|
|
|
|
|
List<ContainerStatus> finishedContainers = response.getCompletedContainersStatuses();
|
|
|
+
|
|
|
+ // propagate preemption requests
|
|
|
+ final PreemptionMessage preemptReq = response.getPreemptionMessage();
|
|
|
+ if (preemptReq != null) {
|
|
|
+ preemptionPolicy.preempt(
|
|
|
+ new PreemptionContext(assignedRequests), preemptReq);
|
|
|
+ }
|
|
|
+
|
|
|
if (newContainers.size() + finishedContainers.size() > 0 || headRoom != newHeadRoom) {
|
|
|
//something changed
|
|
|
recalculateReduceSchedule = true;
|
|
@@ -630,7 +648,9 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
String diagnostics = StringInterner.weakIntern(cont.getDiagnostics());
|
|
|
eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID,
|
|
|
diagnostics));
|
|
|
- }
|
|
|
+
|
|
|
+ preemptionPolicy.handleCompletedContainer(attemptID);
|
|
|
+ }
|
|
|
}
|
|
|
return newContainers;
|
|
|
}
|
|
@@ -1232,4 +1252,27 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|
|
" RackLocal:" + rackLocalAssigned);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ static class PreemptionContext extends AMPreemptionPolicy.Context { // context object passed to preemptionPolicy.preempt(); answers its queries from AssignedRequests
|
|
|
+ final AssignedRequests reqs; // held by reference (not copied), so lookups see its state at call time
|
|
|
+
|
|
|
+ PreemptionContext(AssignedRequests reqs) {
|
|
|
+ this.reqs = reqs;
|
|
|
+ }
|
|
|
+ @Override
|
|
|
+ public TaskAttemptId getTaskAttempt(ContainerId container) {
|
|
|
+ return reqs.get(container); // pure delegation; NOTE(review): result for an unknown container is whatever AssignedRequests.get returns - confirm
|
|
|
+ }
|
|
|
+ // Lists the containers currently recorded in reqs for the requested task type.
|
|
|
+ @Override
|
|
|
+ public List<Container> getContainers(TaskType t){
|
|
|
+ if(TaskType.REDUCE.equals(t))
|
|
|
+ return new ArrayList<Container>(reqs.reduces.values()); // new list, so the caller does not get the live values() view
|
|
|
+ if(TaskType.MAP.equals(t))
|
|
|
+ return new ArrayList<Container>(reqs.maps.values()); // same copy for map containers
|
|
|
+ return null; // NOTE(review): any other type, or a null t, yields null rather than an empty list - callers must null-check
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
}
|