|
@@ -66,11 +66,11 @@ class FSPreemptionThread extends Thread {
|
|
|
schedulerReadLock = scheduler.getSchedulerReadLock();
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
public void run() {
|
|
|
while (!Thread.interrupted()) {
|
|
|
- FSAppAttempt starvedApp;
|
|
|
- try{
|
|
|
- starvedApp = context.getStarvedApps().take();
|
|
|
+ try {
|
|
|
+ FSAppAttempt starvedApp = context.getStarvedApps().take();
|
|
|
// Hold the scheduler readlock so this is not concurrent with the
|
|
|
// update thread.
|
|
|
schedulerReadLock.lock();
|
|
@@ -82,7 +82,7 @@ class FSPreemptionThread extends Thread {
|
|
|
starvedApp.preemptionTriggered(delayBeforeNextStarvationCheck);
|
|
|
} catch (InterruptedException e) {
|
|
|
LOG.info("Preemption thread interrupted! Exiting.");
|
|
|
- return;
|
|
|
+ Thread.currentThread().interrupt();
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -112,16 +112,19 @@ class FSPreemptionThread extends Thread {
|
|
|
PreemptableContainers bestContainers = null;
|
|
|
List<FSSchedulerNode> potentialNodes = scheduler.getNodeTracker()
|
|
|
.getNodesByResourceName(rr.getResourceName());
|
|
|
+ int maxAMContainers = Integer.MAX_VALUE;
|
|
|
+
|
|
|
for (FSSchedulerNode node : potentialNodes) {
|
|
|
- int maxAMContainers = bestContainers == null ?
|
|
|
- Integer.MAX_VALUE : bestContainers.numAMContainers;
|
|
|
PreemptableContainers preemptableContainers =
|
|
|
identifyContainersToPreemptOnNode(
|
|
|
rr.getCapability(), node, maxAMContainers);
|
|
|
+
|
|
|
if (preemptableContainers != null) {
|
|
|
// This set is better than any previously identified set.
|
|
|
bestContainers = preemptableContainers;
|
|
|
- if (preemptableContainers.numAMContainers == 0) {
|
|
|
+ maxAMContainers = bestContainers.numAMContainers;
|
|
|
+
|
|
|
+ if (maxAMContainers == 0) {
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
@@ -182,13 +185,10 @@ class FSPreemptionThread extends Thread {
|
|
|
return preemptableContainers;
|
|
|
}
|
|
|
}
|
|
|
- return null;
|
|
|
- }
|
|
|
|
|
|
- private boolean isNodeAlreadyReserved(
|
|
|
- FSSchedulerNode node, FSAppAttempt app) {
|
|
|
- FSAppAttempt nodeReservedApp = node.getReservedAppSchedulable();
|
|
|
- return nodeReservedApp != null && !nodeReservedApp.equals(app);
|
|
|
+ // Return null if the sum of all preemptable containers' resources
|
|
|
+ // isn't enough to satisfy the starved request.
|
|
|
+ return null;
|
|
|
}
|
|
|
|
|
|
private void trackPreemptionsAgainstNode(List<RMContainer> containers,
|
|
@@ -214,7 +214,7 @@ class FSPreemptionThread extends Thread {
|
|
|
}
|
|
|
|
|
|
private class PreemptContainersTask extends TimerTask {
|
|
|
- private List<RMContainer> containers;
|
|
|
+ private final List<RMContainer> containers;
|
|
|
|
|
|
PreemptContainersTask(List<RMContainer> containers) {
|
|
|
this.containers = containers;
|