|
@@ -360,7 +360,8 @@ public class FifoScheduler extends AbstractYarnScheduler implements
|
|
return nodes.get(nodeId);
|
|
return nodes.get(nodeId);
|
|
}
|
|
}
|
|
|
|
|
|
- private synchronized void addApplication(ApplicationId applicationId,
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ public synchronized void addApplication(ApplicationId applicationId,
|
|
String queue, String user) {
|
|
String queue, String user) {
|
|
SchedulerApplication application =
|
|
SchedulerApplication application =
|
|
new SchedulerApplication(DEFAULT_QUEUE, user);
|
|
new SchedulerApplication(DEFAULT_QUEUE, user);
|
|
@@ -372,7 +373,8 @@ public class FifoScheduler extends AbstractYarnScheduler implements
|
|
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
|
|
.handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
|
|
}
|
|
}
|
|
|
|
|
|
- private synchronized void
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ public synchronized void
|
|
addApplicationAttempt(ApplicationAttemptId appAttemptId,
|
|
addApplicationAttempt(ApplicationAttemptId appAttemptId,
|
|
boolean transferStateFromPreviousAttempt) {
|
|
boolean transferStateFromPreviousAttempt) {
|
|
SchedulerApplication application =
|
|
SchedulerApplication application =
|
|
@@ -458,6 +460,9 @@ public class FifoScheduler extends AbstractYarnScheduler implements
|
|
.entrySet()) {
|
|
.entrySet()) {
|
|
FiCaSchedulerApp application =
|
|
FiCaSchedulerApp application =
|
|
(FiCaSchedulerApp) e.getValue().getCurrentAppAttempt();
|
|
(FiCaSchedulerApp) e.getValue().getCurrentAppAttempt();
|
|
|
|
+ if (application == null) {
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
LOG.debug("pre-assignContainers");
|
|
LOG.debug("pre-assignContainers");
|
|
application.showRequests();
|
|
application.showRequests();
|
|
synchronized (application) {
|
|
synchronized (application) {
|
|
@@ -497,6 +502,9 @@ public class FifoScheduler extends AbstractYarnScheduler implements
|
|
for (SchedulerApplication application : applications.values()) {
|
|
for (SchedulerApplication application : applications.values()) {
|
|
FiCaSchedulerApp attempt =
|
|
FiCaSchedulerApp attempt =
|
|
(FiCaSchedulerApp) application.getCurrentAppAttempt();
|
|
(FiCaSchedulerApp) application.getCurrentAppAttempt();
|
|
|
|
+ if (attempt == null) {
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
attempt.setHeadroom(Resources.subtract(clusterResource, usedResource));
|
|
attempt.setHeadroom(Resources.subtract(clusterResource, usedResource));
|
|
}
|
|
}
|
|
}
|
|
}
|