|
@@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstant
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
|
@@ -88,6 +89,8 @@ import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
|
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
|
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* A scheduler that schedules resources between a set of queues. The scheduler
|
|
* A scheduler that schedules resources between a set of queues. The scheduler
|
|
* keeps track of the resources used by each queue, and attempts to maintain
|
|
* keeps track of the resources used by each queue, and attempts to maintain
|
|
@@ -601,12 +604,8 @@ public class FairScheduler implements ResourceScheduler {
|
|
*/
|
|
*/
|
|
protected synchronized void addApplication(
|
|
protected synchronized void addApplication(
|
|
ApplicationAttemptId applicationAttemptId, String queueName, String user) {
|
|
ApplicationAttemptId applicationAttemptId, String queueName, String user) {
|
|
-
|
|
|
|
- FSLeafQueue queue = queueMgr.getLeafQueue(queueName);
|
|
|
|
- if (queue == null) {
|
|
|
|
- // queue is not an existing or createable leaf queue
|
|
|
|
- queue = queueMgr.getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
|
|
|
|
- }
|
|
|
|
|
|
+ RMApp rmApp = rmContext.getRMApps().get(applicationAttemptId);
|
|
|
|
+ FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
|
|
|
|
|
|
FSSchedulerApp schedulerApp =
|
|
FSSchedulerApp schedulerApp =
|
|
new FSSchedulerApp(applicationAttemptId, user,
|
|
new FSSchedulerApp(applicationAttemptId, user,
|
|
@@ -637,6 +636,27 @@ public class FairScheduler implements ResourceScheduler {
|
|
new RMAppAttemptEvent(applicationAttemptId,
|
|
new RMAppAttemptEvent(applicationAttemptId,
|
|
RMAppAttemptEventType.APP_ACCEPTED));
|
|
RMAppAttemptEventType.APP_ACCEPTED));
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ @VisibleForTesting
|
|
|
|
+ FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) {
|
|
|
|
+ // Potentially set queue to username if configured to do so
|
|
|
|
+ if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) &&
|
|
|
|
+ userAsDefaultQueue) {
|
|
|
|
+ queueName = user;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ FSLeafQueue queue = queueMgr.getLeafQueue(queueName);
|
|
|
|
+ if (queue == null) {
|
|
|
|
+ // queue is not an existing or createable leaf queue
|
|
|
|
+ queue = queueMgr.getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (rmApp != null) {
|
|
|
|
+ rmApp.setQueue(queue.getName());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return queue;
|
|
|
|
+ }
|
|
|
|
|
|
private synchronized void removeApplication(
|
|
private synchronized void removeApplication(
|
|
ApplicationAttemptId applicationAttemptId,
|
|
ApplicationAttemptId applicationAttemptId,
|
|
@@ -985,13 +1005,6 @@ public class FairScheduler implements ResourceScheduler {
|
|
}
|
|
}
|
|
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent)event;
|
|
AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent)event;
|
|
String queue = appAddedEvent.getQueue();
|
|
String queue = appAddedEvent.getQueue();
|
|
-
|
|
|
|
- // Potentially set queue to username if configured to do so
|
|
|
|
- String def = YarnConfiguration.DEFAULT_QUEUE_NAME;
|
|
|
|
- if (queue.equals(def) && userAsDefaultQueue) {
|
|
|
|
- queue = appAddedEvent.getUser();
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
addApplication(appAddedEvent.getApplicationAttemptId(), queue,
|
|
addApplication(appAddedEvent.getApplicationAttemptId(), queue,
|
|
appAddedEvent.getUser());
|
|
appAddedEvent.getUser());
|
|
break;
|
|
break;
|