|
@@ -125,9 +125,13 @@ class FSPreemptionThread extends Thread {
|
|
for (RMContainer container : containers) {
|
|
for (RMContainer container : containers) {
|
|
FSAppAttempt app = scheduler.getSchedulerApp(
|
|
FSAppAttempt app = scheduler.getSchedulerApp(
|
|
container.getApplicationAttemptId());
|
|
container.getApplicationAttemptId());
|
|
- LOG.info("Preempting container " + container +
|
|
|
|
- " from queue " + app.getQueueName());
|
|
|
|
- app.trackContainerForPreemption(container);
|
|
|
|
|
|
+ LOG.info("Preempting container " + container + " from queue: "
|
|
|
|
+ + (app != null ? app.getQueueName() : "unknown"));
|
|
|
|
+ // If the app has unregistered while building the container list
|
|
|
|
+ // the app might be null, skip notifying the app
|
|
|
|
+ if (app != null) {
|
|
|
|
+ app.trackContainerForPreemption(container);
|
|
|
|
+ }
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
@@ -189,6 +193,13 @@ class FSPreemptionThread extends Thread {
|
|
for (RMContainer container : containersToCheck) {
|
|
for (RMContainer container : containersToCheck) {
|
|
FSAppAttempt app =
|
|
FSAppAttempt app =
|
|
scheduler.getSchedulerApp(container.getApplicationAttemptId());
|
|
scheduler.getSchedulerApp(container.getApplicationAttemptId());
|
|
|
|
+ // If the app has unregistered while building the container list the app
|
|
|
|
+ // might be null, just skip this container: it should be cleaned up soon
|
|
|
|
+ if (app == null) {
|
|
|
|
+ LOG.info("Found container " + container + " on node "
|
|
|
|
+ + node.getNodeName() + "without app, skipping preemption");
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
ApplicationId appId = app.getApplicationId();
|
|
ApplicationId appId = app.getApplicationId();
|
|
|
|
|
|
if (app.canContainerBePreempted(container,
|
|
if (app.canContainerBePreempted(container,
|