|
@@ -67,7 +67,6 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
import org.apache.hadoop.yarn.event.Dispatcher;
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
-import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
|
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
|
|
import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier;
|
|
@@ -83,8 +82,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager;
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
|
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
|
|
|
|
|
|
+
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.placement
|
|
|
|
+ .ApplicationPlacementContext;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
|
@@ -204,6 +204,8 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
|
|
|
|
private CallerContext callerContext;
|
|
private CallerContext callerContext;
|
|
|
|
|
|
|
|
+ private ApplicationPlacementContext placementContext;
|
|
|
|
+
|
|
Object transitionTodo;
|
|
Object transitionTodo;
|
|
|
|
|
|
private Priority applicationPriority;
|
|
private Priority applicationPriority;
|
|
@@ -417,7 +419,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
List<ResourceRequest> amReqs) {
|
|
List<ResourceRequest> amReqs) {
|
|
this(applicationId, rmContext, config, name, user, queue, submissionContext,
|
|
this(applicationId, rmContext, config, name, user, queue, submissionContext,
|
|
scheduler, masterService, submitTime, applicationType, applicationTags,
|
|
scheduler, masterService, submitTime, applicationType, applicationTags,
|
|
- amReqs, -1);
|
|
|
|
|
|
+ amReqs, null, -1);
|
|
}
|
|
}
|
|
|
|
|
|
public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
|
|
public RMAppImpl(ApplicationId applicationId, RMContext rmContext,
|
|
@@ -425,7 +427,8 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
ApplicationSubmissionContext submissionContext, YarnScheduler scheduler,
|
|
ApplicationSubmissionContext submissionContext, YarnScheduler scheduler,
|
|
ApplicationMasterService masterService, long submitTime,
|
|
ApplicationMasterService masterService, long submitTime,
|
|
String applicationType, Set<String> applicationTags,
|
|
String applicationType, Set<String> applicationTags,
|
|
- List<ResourceRequest> amReqs, long startTime) {
|
|
|
|
|
|
+ List<ResourceRequest> amReqs, ApplicationPlacementContext
|
|
|
|
+ placementContext, long startTime) {
|
|
|
|
|
|
this.systemClock = SystemClock.getInstance();
|
|
this.systemClock = SystemClock.getInstance();
|
|
|
|
|
|
@@ -484,6 +487,8 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
|
|
|
|
this.callerContext = CallerContext.getCurrent();
|
|
this.callerContext = CallerContext.getCurrent();
|
|
|
|
|
|
|
|
+ this.placementContext = placementContext;
|
|
|
|
+
|
|
long localLogAggregationStatusTimeout =
|
|
long localLogAggregationStatusTimeout =
|
|
conf.getLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
|
|
conf.getLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
|
|
YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);
|
|
YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);
|
|
@@ -1098,22 +1103,12 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- ApplicationPlacementContext placementContext = null;
|
|
|
|
- try {
|
|
|
|
- placementContext = placeApplication(app.rmContext,
|
|
|
|
- app.submissionContext, app.user);
|
|
|
|
- } catch (Exception e) {
|
|
|
|
- String msg = "Failed to place application to queue :" + e.getMessage();
|
|
|
|
- app.diagnostics.append(msg);
|
|
|
|
- LOG.error(msg, e);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
// No existent attempts means the attempt associated with this app was not
|
|
// No existent attempts means the attempt associated with this app was not
|
|
// started or started but not yet saved.
|
|
// started or started but not yet saved.
|
|
if (app.attempts.isEmpty()) {
|
|
if (app.attempts.isEmpty()) {
|
|
app.scheduler.handle(
|
|
app.scheduler.handle(
|
|
new AppAddedSchedulerEvent(app.user, app.submissionContext, false,
|
|
new AppAddedSchedulerEvent(app.user, app.submissionContext, false,
|
|
- app.applicationPriority, placementContext));
|
|
|
|
|
|
+ app.applicationPriority, app.placementContext));
|
|
return RMAppState.SUBMITTED;
|
|
return RMAppState.SUBMITTED;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -1121,7 +1116,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
// knows applications before AM or NM re-registers.
|
|
// knows applications before AM or NM re-registers.
|
|
app.scheduler.handle(
|
|
app.scheduler.handle(
|
|
new AppAddedSchedulerEvent(app.user, app.submissionContext, true,
|
|
new AppAddedSchedulerEvent(app.user, app.submissionContext, true,
|
|
- app.applicationPriority, placementContext));
|
|
|
|
|
|
+ app.applicationPriority, app.placementContext));
|
|
|
|
|
|
// recover attempts
|
|
// recover attempts
|
|
app.recoverAppAttempts();
|
|
app.recoverAppAttempts();
|
|
@@ -1137,20 +1132,9 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
RMAppTransition {
|
|
RMAppTransition {
|
|
@Override
|
|
@Override
|
|
public void transition(RMAppImpl app, RMAppEvent event) {
|
|
public void transition(RMAppImpl app, RMAppEvent event) {
|
|
- ApplicationPlacementContext placementContext = null;
|
|
|
|
- try {
|
|
|
|
- placementContext = placeApplication(app.rmContext,
|
|
|
|
- app.submissionContext, app.user);
|
|
|
|
- replaceQueueFromPlacementContext(placementContext,
|
|
|
|
- app.submissionContext);
|
|
|
|
- } catch (YarnException e) {
|
|
|
|
- String msg = "Failed to place application to queue :" + e.getMessage();
|
|
|
|
- app.diagnostics.append(msg);
|
|
|
|
- LOG.error(msg, e);
|
|
|
|
- }
|
|
|
|
app.handler.handle(
|
|
app.handler.handle(
|
|
new AppAddedSchedulerEvent(app.user, app.submissionContext, false,
|
|
new AppAddedSchedulerEvent(app.user, app.submissionContext, false,
|
|
- app.applicationPriority, placementContext));
|
|
|
|
|
|
+ app.applicationPriority, app.placementContext));
|
|
// send the ATS create Event
|
|
// send the ATS create Event
|
|
app.sendATSCreateEvent();
|
|
app.sendATSCreateEvent();
|
|
}
|
|
}
|
|
@@ -1624,6 +1608,11 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
|| appState == RMAppState.KILLING;
|
|
|| appState == RMAppState.KILLING;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Override
|
|
|
|
+ public ApplicationPlacementContext getApplicationPlacementContext() {
|
|
|
|
+ return placementContext;
|
|
|
|
+ }
|
|
|
|
+
|
|
public RMAppState getRecoveredFinalState() {
|
|
public RMAppState getRecoveredFinalState() {
|
|
return this.recoveredFinalState;
|
|
return this.recoveredFinalState;
|
|
}
|
|
}
|
|
@@ -2046,37 +2035,4 @@ public class RMAppImpl implements RMApp, Recoverable {
|
|
this.submissionContext.setAMContainerSpec(null);
|
|
this.submissionContext.setAMContainerSpec(null);
|
|
this.submissionContext.setLogAggregationContext(null);
|
|
this.submissionContext.setLogAggregationContext(null);
|
|
}
|
|
}
|
|
-
|
|
|
|
- @VisibleForTesting
|
|
|
|
- static ApplicationPlacementContext placeApplication(RMContext rmContext,
|
|
|
|
- ApplicationSubmissionContext context, String user) throws YarnException {
|
|
|
|
-
|
|
|
|
- ApplicationPlacementContext placementContext = null;
|
|
|
|
- PlacementManager placementManager = rmContext.getQueuePlacementManager();
|
|
|
|
-
|
|
|
|
- if (placementManager != null) {
|
|
|
|
- placementContext = placementManager.placeApplication(context, user);
|
|
|
|
- } else{
|
|
|
|
- LOG.error(
|
|
|
|
- "Queue Placement Manager is null. Cannot place application :" + " "
|
|
|
|
- + context.getApplicationId() + " to queue ");
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- return placementContext;
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- static void replaceQueueFromPlacementContext(
|
|
|
|
- ApplicationPlacementContext placementContext,
|
|
|
|
- ApplicationSubmissionContext context) {
|
|
|
|
- // Set it to ApplicationSubmissionContext
|
|
|
|
- //apply queue mapping only to new application submissions
|
|
|
|
- if (placementContext != null && !StringUtils.equals(context.getQueue(),
|
|
|
|
- placementContext.getQueue())) {
|
|
|
|
- LOG.info("Placed application=" + context.getApplicationId() + " to queue="
|
|
|
|
- + placementContext.getQueue() + ", original queue=" + context
|
|
|
|
- .getQueue());
|
|
|
|
- context.setQueue(placementContext.getQueue());
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
}
|
|
}
|