|
@@ -1735,13 +1735,15 @@ public class FairScheduler extends
|
|
|
String queueName) throws YarnException {
|
|
|
writeLock.lock();
|
|
|
try {
|
|
|
+ // app could have finished between pre check and now
|
|
|
SchedulerApplication<FSAppAttempt> app = applications.get(appId);
|
|
|
if (app == null) {
|
|
|
throw new YarnException("App to be moved " + appId + " not found.");
|
|
|
}
|
|
|
- FSAppAttempt attempt = (FSAppAttempt) app.getCurrentAppAttempt();
|
|
|
- // To serialize with FairScheduler#allocate, synchronize on app attempt
|
|
|
+ FSLeafQueue targetQueue = null;
|
|
|
|
|
|
+ // To serialize with FairScheduler#allocate, synchronize on app attempt
|
|
|
+ FSAppAttempt attempt = app.getCurrentAppAttempt();
|
|
|
attempt.getWriteLock().lock();
|
|
|
try {
|
|
|
FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue();
|
|
@@ -1753,7 +1755,9 @@ public class FairScheduler extends
|
|
|
+ " is stopped and can't be moved!");
|
|
|
}
|
|
|
String destQueueName = handleMoveToPlanQueue(queueName);
|
|
|
- FSLeafQueue targetQueue = queueMgr.getLeafQueue(destQueueName, false);
|
|
|
+ // Prevent removal of the queue while the move is in progress by
|
|
|
+ // registering the app as submitted to the queue.
|
|
|
+ targetQueue = queueMgr.getLeafQueue(destQueueName, false, appId);
|
|
|
if (targetQueue == null) {
|
|
|
throw new YarnException("Target queue " + queueName
|
|
|
+ " not found or is not a leaf queue.");
|
|
@@ -1766,9 +1770,14 @@ public class FairScheduler extends
|
|
|
verifyMoveDoesNotViolateConstraints(attempt, oldQueue, targetQueue);
|
|
|
}
|
|
|
|
|
|
+ // The move itself will clean up the app submit registration.
|
|
|
executeMove(app, attempt, oldQueue, targetQueue);
|
|
|
return targetQueue.getQueueName();
|
|
|
} finally {
|
|
|
+ // Cleanup the submit registration in case of move failure.
|
|
|
+ if (targetQueue != null) {
|
|
|
+ targetQueue.removeAssignedApp(appId);
|
|
|
+ }
|
|
|
attempt.getWriteLock().unlock();
|
|
|
}
|
|
|
} finally {
|
|
@@ -1776,6 +1785,17 @@ public class FairScheduler extends
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Perform pre-checks while moving the application. This should not check any
|
|
|
+ * application values that can change since the check is not part of an
|
|
|
+ * atomic action. During a move the scheduler can still assign containers and
|
|
|
+ * the app can still be updated until the real move is performed under
|
|
|
+ * proper locking.
|
|
|
+ *
|
|
|
+ * @param appId The ID of the app to be moved
|
|
|
+ * @param newQueue The name of the queue the app should move to
|
|
|
+ * @throws YarnException if the validate fails
|
|
|
+ */
|
|
|
@Override
|
|
|
public void preValidateMoveApplication(ApplicationId appId, String newQueue)
|
|
|
throws YarnException {
|
|
@@ -1786,24 +1806,14 @@ public class FairScheduler extends
|
|
|
throw new YarnException("App to be moved " + appId + " not found.");
|
|
|
}
|
|
|
|
|
|
- FSAppAttempt attempt = app.getCurrentAppAttempt();
|
|
|
- // To serialize with FairScheduler#allocate, synchronize on app attempt
|
|
|
-
|
|
|
- attempt.getWriteLock().lock();
|
|
|
- try {
|
|
|
- FSLeafQueue oldQueue = (FSLeafQueue) app.getQueue();
|
|
|
- String destQueueName = handleMoveToPlanQueue(newQueue);
|
|
|
- FSLeafQueue targetQueue = queueMgr.getLeafQueue(destQueueName, false);
|
|
|
- if (targetQueue == null) {
|
|
|
- throw new YarnException("Target queue " + newQueue
|
|
|
- + " not found or is not a leaf queue.");
|
|
|
- }
|
|
|
-
|
|
|
- if (oldQueue.isRunnableApp(attempt)) {
|
|
|
- verifyMoveDoesNotViolateConstraints(attempt, oldQueue, targetQueue);
|
|
|
- }
|
|
|
- } finally {
|
|
|
- attempt.getWriteLock().unlock();
|
|
|
+ // Do not register the app on the new queue: lots of things can still
|
|
|
+ // change between this check and the real move and unregistering the move
|
|
|
+ // becomes a problem.
|
|
|
+ String destQueueName = handleMoveToPlanQueue(newQueue);
|
|
|
+ FSLeafQueue targetQueue = queueMgr.getLeafQueue(destQueueName, false);
|
|
|
+ if (targetQueue == null) {
|
|
|
+ throw new YarnException("Target queue " + newQueue
|
|
|
+ + " not found or is not a leaf queue.");
|
|
|
}
|
|
|
} finally {
|
|
|
writeLock.unlock();
|