|
@@ -21,12 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager;
|
|
|
import java.io.IOException;
|
|
|
import java.io.InputStream;
|
|
|
import java.net.InetSocketAddress;
|
|
|
-import java.net.UnknownHostException;
|
|
|
import java.util.ArrayList;
|
|
|
-import java.util.Collections;
|
|
|
-import java.util.HashSet;
|
|
|
import java.util.List;
|
|
|
-import java.util.Set;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
|
|
@@ -37,10 +33,10 @@ import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
|
|
import org.apache.hadoop.ipc.Server;
|
|
|
import org.apache.hadoop.security.SaslRpcServer;
|
|
|
-import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.security.authorize.PolicyProvider;
|
|
|
import org.apache.hadoop.security.token.Token;
|
|
|
import org.apache.hadoop.service.AbstractService;
|
|
|
+import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
|
|
|
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
|
@@ -52,30 +48,11 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
|
|
|
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
|
import org.apache.hadoop.yarn.api.records.Container;
|
|
|
-import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
-import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
|
|
|
-import org.apache.hadoop.yarn.api.records.NMToken;
|
|
|
-import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
-import org.apache.hadoop.yarn.api.records.NodeReport;
|
|
|
-import org.apache.hadoop.yarn.api.records.PreemptionContainer;
|
|
|
-import org.apache.hadoop.yarn.api.records.PreemptionContract;
|
|
|
-import org.apache.hadoop.yarn.api.records.PreemptionMessage;
|
|
|
-import org.apache.hadoop.yarn.api.records.PreemptionResourceRequest;
|
|
|
-import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
-import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
|
|
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
|
-import org.apache.hadoop.yarn.api.records.StrictPreemptionContract;
|
|
|
-import org.apache.hadoop.yarn.api.records.UpdateContainerError;
|
|
|
-import org.apache.hadoop.yarn.api.records.UpdatedContainer;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
|
|
|
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
|
|
|
import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException;
|
|
|
-import org.apache.hadoop.yarn.exceptions.InvalidContainerReleaseException;
|
|
|
-import org.apache.hadoop.yarn.exceptions.InvalidResourceBlacklistRequestException;
|
|
|
-import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
@@ -86,25 +63,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRegistrationEvent;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptStatusupdateEvent;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
|
|
|
-
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
|
|
- .AbstractYarnScheduler;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
|
|
- .SchedulerApplicationAttempt;
|
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.security
|
|
|
.AMRMTokenSecretManager;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
|
|
|
import org.apache.hadoop.yarn.server.security.MasterKeyData;
|
|
|
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
|
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
|
|
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
|
|
|
|
@@ -124,6 +88,12 @@ public class ApplicationMasterService extends AbstractService implements
|
|
|
private final ConcurrentMap<ApplicationAttemptId, AllocateResponseLock> responseMap =
|
|
|
new ConcurrentHashMap<ApplicationAttemptId, AllocateResponseLock>();
|
|
|
protected final RMContext rmContext;
|
|
|
+ private final ApplicationMasterServiceProcessor amsProcessor;
|
|
|
+
|
|
|
+ public ApplicationMasterService(RMContext rmContext,
|
|
|
+ YarnScheduler scheduler) {
|
|
|
+ this(ApplicationMasterService.class.getName(), rmContext, scheduler);
|
|
|
+ }
|
|
|
|
|
|
public ApplicationMasterService(String name, RMContext rmContext,
|
|
|
YarnScheduler scheduler) {
|
|
@@ -131,11 +101,11 @@ public class ApplicationMasterService extends AbstractService implements
|
|
|
this.amLivelinessMonitor = rmContext.getAMLivelinessMonitor();
|
|
|
this.rScheduler = scheduler;
|
|
|
this.rmContext = rmContext;
|
|
|
+ this.amsProcessor = createProcessor();
|
|
|
}
|
|
|
|
|
|
- public ApplicationMasterService(RMContext rmContext,
|
|
|
- YarnScheduler scheduler) {
|
|
|
- this(ApplicationMasterService.class.getName(), rmContext, scheduler);
|
|
|
+ protected ApplicationMasterServiceProcessor createProcessor() {
|
|
|
+ return new DefaultAMSProcessor(rmContext, rScheduler);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -230,82 +200,22 @@ public class ApplicationMasterService extends AbstractService implements
|
|
|
+ appID;
|
|
|
LOG.warn(message);
|
|
|
RMAuditLogger.logFailure(
|
|
|
- this.rmContext.getRMApps()
|
|
|
- .get(appID).getUser(),
|
|
|
- AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message,
|
|
|
- appID, applicationAttemptId);
|
|
|
+ this.rmContext.getRMApps()
|
|
|
+ .get(appID).getUser(),
|
|
|
+ AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message,
|
|
|
+ appID, applicationAttemptId);
|
|
|
throw new InvalidApplicationMasterRequestException(message);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
|
|
|
- RMApp app = this.rmContext.getRMApps().get(appID);
|
|
|
-
|
|
|
+
|
|
|
// Setting the response id to 0 to identify if the
|
|
|
// application master is register for the respective attemptid
|
|
|
lastResponse.setResponseId(0);
|
|
|
lock.setAllocateResponse(lastResponse);
|
|
|
- LOG.info("AM registration " + applicationAttemptId);
|
|
|
- this.rmContext
|
|
|
- .getDispatcher()
|
|
|
- .getEventHandler()
|
|
|
- .handle(
|
|
|
- new RMAppAttemptRegistrationEvent(applicationAttemptId, request
|
|
|
- .getHost(), request.getRpcPort(), request.getTrackingUrl()));
|
|
|
- RMAuditLogger.logSuccess(app.getUser(), AuditConstants.REGISTER_AM,
|
|
|
- "ApplicationMasterService", appID, applicationAttemptId);
|
|
|
-
|
|
|
- // Pick up min/max resource from scheduler...
|
|
|
- RegisterApplicationMasterResponse response = recordFactory
|
|
|
- .newRecordInstance(RegisterApplicationMasterResponse.class);
|
|
|
- response.setMaximumResourceCapability(rScheduler
|
|
|
- .getMaximumResourceCapability(app.getQueue()));
|
|
|
- response.setApplicationACLs(app.getRMAppAttempt(applicationAttemptId)
|
|
|
- .getSubmissionContext().getAMContainerSpec().getApplicationACLs());
|
|
|
- response.setQueue(app.getQueue());
|
|
|
- if (UserGroupInformation.isSecurityEnabled()) {
|
|
|
- LOG.info("Setting client token master key");
|
|
|
- response.setClientToAMTokenMasterKey(java.nio.ByteBuffer.wrap(rmContext
|
|
|
- .getClientToAMTokenSecretManager()
|
|
|
- .getMasterKey(applicationAttemptId).getEncoded()));
|
|
|
- }
|
|
|
-
|
|
|
- // For work-preserving AM restart, retrieve previous attempts' containers
|
|
|
- // and corresponding NM tokens.
|
|
|
- if (app.getApplicationSubmissionContext()
|
|
|
- .getKeepContainersAcrossApplicationAttempts()) {
|
|
|
- List<Container> transferredContainers = rScheduler
|
|
|
- .getTransferredContainers(applicationAttemptId);
|
|
|
- if (!transferredContainers.isEmpty()) {
|
|
|
- response.setContainersFromPreviousAttempts(transferredContainers);
|
|
|
- List<NMToken> nmTokens = new ArrayList<NMToken>();
|
|
|
- for (Container container : transferredContainers) {
|
|
|
- try {
|
|
|
- NMToken token = rmContext.getNMTokenSecretManager()
|
|
|
- .createAndGetNMToken(app.getUser(), applicationAttemptId,
|
|
|
- container);
|
|
|
- if (null != token) {
|
|
|
- nmTokens.add(token);
|
|
|
- }
|
|
|
- } catch (IllegalArgumentException e) {
|
|
|
- // if it's a DNS issue, throw UnknowHostException directly and
|
|
|
- // that
|
|
|
- // will be automatically retried by RMProxy in RPC layer.
|
|
|
- if (e.getCause() instanceof UnknownHostException) {
|
|
|
- throw (UnknownHostException) e.getCause();
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- response.setNMTokensFromPreviousAttempts(nmTokens);
|
|
|
- LOG.info("Application " + appID + " retrieved "
|
|
|
- + transferredContainers.size() + " containers from previous"
|
|
|
- + " attempts and " + nmTokens.size() + " NM tokens.");
|
|
|
- }
|
|
|
- }
|
|
|
|
|
|
- response.setSchedulerResourceTypes(rScheduler
|
|
|
- .getSchedulingResourceTypes());
|
|
|
-
|
|
|
- return response;
|
|
|
+ return this.amsProcessor.registerApplicationMaster(
|
|
|
+ amrmTokenIdentifier.getApplicationAttemptId(), request);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -350,15 +260,8 @@ public class ApplicationMasterService extends AbstractService implements
|
|
|
}
|
|
|
|
|
|
this.amLivelinessMonitor.receivedPing(applicationAttemptId);
|
|
|
-
|
|
|
- rmContext.getDispatcher().getEventHandler().handle(
|
|
|
- new RMAppAttemptUnregistrationEvent(applicationAttemptId, request
|
|
|
- .getTrackingUrl(), request.getFinalApplicationStatus(), request
|
|
|
- .getDiagnostics()));
|
|
|
-
|
|
|
- // For UnmanagedAMs, return true so they don't retry
|
|
|
- return FinishApplicationMasterResponse.newInstance(
|
|
|
- rmApp.getApplicationSubmissionContext().getUnmanagedAM());
|
|
|
+ return this.amsProcessor.finishApplicationMaster(
|
|
|
+ applicationAttemptId, request);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -438,10 +341,8 @@ public class ApplicationMasterService extends AbstractService implements
|
|
|
throw new InvalidApplicationMasterRequestException(message);
|
|
|
}
|
|
|
|
|
|
- AllocateResponse response =
|
|
|
- recordFactory.newRecordInstance(AllocateResponse.class);
|
|
|
- allocateInternal(amrmTokenIdentifier.getApplicationAttemptId(),
|
|
|
- request, response);
|
|
|
+ AllocateResponse response = this.amsProcessor.allocate(
|
|
|
+ amrmTokenIdentifier.getApplicationAttemptId(), request);
|
|
|
|
|
|
// update AMRMToken if the token is rolled-up
|
|
|
MasterKeyData nextMasterKey =
|
|
@@ -477,288 +378,7 @@ public class ApplicationMasterService extends AbstractService implements
|
|
|
response.setResponseId(lastResponse.getResponseId() + 1);
|
|
|
lock.setAllocateResponse(response);
|
|
|
return response;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- protected void allocateInternal(ApplicationAttemptId appAttemptId,
|
|
|
- AllocateRequest request, AllocateResponse allocateResponse)
|
|
|
- throws YarnException {
|
|
|
-
|
|
|
- //filter illegal progress values
|
|
|
- float filteredProgress = request.getProgress();
|
|
|
- if (Float.isNaN(filteredProgress) ||
|
|
|
- filteredProgress == Float.NEGATIVE_INFINITY ||
|
|
|
- filteredProgress < 0) {
|
|
|
- request.setProgress(0);
|
|
|
- } else if (filteredProgress > 1 ||
|
|
|
- filteredProgress == Float.POSITIVE_INFINITY) {
|
|
|
- request.setProgress(1);
|
|
|
- }
|
|
|
-
|
|
|
- // Send the status update to the appAttempt.
|
|
|
- this.rmContext.getDispatcher().getEventHandler().handle(
|
|
|
- new RMAppAttemptStatusupdateEvent(appAttemptId, request
|
|
|
- .getProgress()));
|
|
|
-
|
|
|
- List<ResourceRequest> ask = request.getAskList();
|
|
|
- List<ContainerId> release = request.getReleaseList();
|
|
|
-
|
|
|
- ResourceBlacklistRequest blacklistRequest =
|
|
|
- request.getResourceBlacklistRequest();
|
|
|
- List<String> blacklistAdditions =
|
|
|
- (blacklistRequest != null) ?
|
|
|
- blacklistRequest.getBlacklistAdditions() : Collections.EMPTY_LIST;
|
|
|
- List<String> blacklistRemovals =
|
|
|
- (blacklistRequest != null) ?
|
|
|
- blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST;
|
|
|
- RMApp app =
|
|
|
- this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
|
|
|
-
|
|
|
- // set label expression for Resource Requests if resourceName=ANY
|
|
|
- ApplicationSubmissionContext asc = app.getApplicationSubmissionContext();
|
|
|
- for (ResourceRequest req : ask) {
|
|
|
- if (null == req.getNodeLabelExpression()
|
|
|
- && ResourceRequest.ANY.equals(req.getResourceName())) {
|
|
|
- req.setNodeLabelExpression(asc.getNodeLabelExpression());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- Resource maximumCapacity = rScheduler.getMaximumResourceCapability();
|
|
|
-
|
|
|
- // sanity check
|
|
|
- try {
|
|
|
- RMServerUtils.normalizeAndValidateRequests(ask,
|
|
|
- maximumCapacity, app.getQueue(),
|
|
|
- rScheduler, rmContext);
|
|
|
- } catch (InvalidResourceRequestException e) {
|
|
|
- LOG.warn("Invalid resource ask by application " + appAttemptId, e);
|
|
|
- throw e;
|
|
|
- }
|
|
|
-
|
|
|
- try {
|
|
|
- RMServerUtils.validateBlacklistRequest(blacklistRequest);
|
|
|
- } catch (InvalidResourceBlacklistRequestException e) {
|
|
|
- LOG.warn("Invalid blacklist request by application " + appAttemptId, e);
|
|
|
- throw e;
|
|
|
}
|
|
|
-
|
|
|
- // In the case of work-preserving AM restart, it's possible for the
|
|
|
- // AM to release containers from the earlier attempt.
|
|
|
- if (!app.getApplicationSubmissionContext()
|
|
|
- .getKeepContainersAcrossApplicationAttempts()) {
|
|
|
- try {
|
|
|
- RMServerUtils.validateContainerReleaseRequest(release, appAttemptId);
|
|
|
- } catch (InvalidContainerReleaseException e) {
|
|
|
- LOG.warn("Invalid container release by application " + appAttemptId,
|
|
|
- e);
|
|
|
- throw e;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // Split Update Resource Requests into increase and decrease.
|
|
|
- // No Exceptions are thrown here. All update errors are aggregated
|
|
|
- // and returned to the AM.
|
|
|
- List<UpdateContainerError> updateErrors = new ArrayList<>();
|
|
|
- ContainerUpdates containerUpdateRequests =
|
|
|
- RMServerUtils.validateAndSplitUpdateResourceRequests(
|
|
|
- rmContext, request, maximumCapacity, updateErrors);
|
|
|
-
|
|
|
- // Send new requests to appAttempt.
|
|
|
- Allocation allocation;
|
|
|
- RMAppAttemptState state =
|
|
|
- app.getRMAppAttempt(appAttemptId).getAppAttemptState();
|
|
|
- if (state.equals(RMAppAttemptState.FINAL_SAVING) ||
|
|
|
- state.equals(RMAppAttemptState.FINISHING) ||
|
|
|
- app.isAppFinalStateStored()) {
|
|
|
- LOG.warn(appAttemptId + " is in " + state +
|
|
|
- " state, ignore container allocate request.");
|
|
|
- allocation = EMPTY_ALLOCATION;
|
|
|
- } else {
|
|
|
- allocation =
|
|
|
- this.rScheduler.allocate(appAttemptId, ask, release,
|
|
|
- blacklistAdditions, blacklistRemovals,
|
|
|
- containerUpdateRequests);
|
|
|
- }
|
|
|
-
|
|
|
- if (!blacklistAdditions.isEmpty() || !blacklistRemovals.isEmpty()) {
|
|
|
- LOG.info("blacklist are updated in Scheduler." +
|
|
|
- "blacklistAdditions: " + blacklistAdditions + ", " +
|
|
|
- "blacklistRemovals: " + blacklistRemovals);
|
|
|
- }
|
|
|
- RMAppAttempt appAttempt = app.getRMAppAttempt(appAttemptId);
|
|
|
-
|
|
|
- if (allocation.getNMTokens() != null &&
|
|
|
- !allocation.getNMTokens().isEmpty()) {
|
|
|
- allocateResponse.setNMTokens(allocation.getNMTokens());
|
|
|
- }
|
|
|
-
|
|
|
- // Notify the AM of container update errors
|
|
|
- addToUpdateContainerErrors(allocateResponse, updateErrors);
|
|
|
-
|
|
|
- // update the response with the deltas of node status changes
|
|
|
- List<RMNode> updatedNodes = new ArrayList<RMNode>();
|
|
|
- if(app.pullRMNodeUpdates(updatedNodes) > 0) {
|
|
|
- List<NodeReport> updatedNodeReports = new ArrayList<NodeReport>();
|
|
|
- for(RMNode rmNode: updatedNodes) {
|
|
|
- SchedulerNodeReport schedulerNodeReport =
|
|
|
- rScheduler.getNodeReport(rmNode.getNodeID());
|
|
|
- Resource used = BuilderUtils.newResource(0, 0);
|
|
|
- int numContainers = 0;
|
|
|
- if (schedulerNodeReport != null) {
|
|
|
- used = schedulerNodeReport.getUsedResource();
|
|
|
- numContainers = schedulerNodeReport.getNumContainers();
|
|
|
- }
|
|
|
- NodeId nodeId = rmNode.getNodeID();
|
|
|
- NodeReport report =
|
|
|
- BuilderUtils.newNodeReport(nodeId, rmNode.getState(),
|
|
|
- rmNode.getHttpAddress(), rmNode.getRackName(), used,
|
|
|
- rmNode.getTotalCapability(), numContainers,
|
|
|
- rmNode.getHealthReport(), rmNode.getLastHealthReportTime(),
|
|
|
- rmNode.getNodeLabels());
|
|
|
-
|
|
|
- updatedNodeReports.add(report);
|
|
|
- }
|
|
|
- allocateResponse.setUpdatedNodes(updatedNodeReports);
|
|
|
- }
|
|
|
-
|
|
|
- addToAllocatedContainers(allocateResponse, allocation.getContainers());
|
|
|
-
|
|
|
- allocateResponse.setCompletedContainersStatuses(appAttempt
|
|
|
- .pullJustFinishedContainers());
|
|
|
- allocateResponse.setAvailableResources(allocation.getResourceLimit());
|
|
|
-
|
|
|
- addToContainerUpdates(appAttemptId, allocateResponse, allocation);
|
|
|
-
|
|
|
- allocateResponse.setNumClusterNodes(this.rScheduler.getNumClusterNodes());
|
|
|
-
|
|
|
- // add preemption to the allocateResponse message (if any)
|
|
|
- allocateResponse
|
|
|
- .setPreemptionMessage(generatePreemptionMessage(allocation));
|
|
|
-
|
|
|
- // Set application priority
|
|
|
- allocateResponse.setApplicationPriority(app
|
|
|
- .getApplicationPriority());
|
|
|
- }
|
|
|
-
|
|
|
- private void addToContainerUpdates(ApplicationAttemptId appAttemptId,
|
|
|
- AllocateResponse allocateResponse, Allocation allocation) {
|
|
|
- // Handling increased containers
|
|
|
- addToUpdatedContainers(
|
|
|
- allocateResponse, ContainerUpdateType.INCREASE_RESOURCE,
|
|
|
- allocation.getIncreasedContainers());
|
|
|
-
|
|
|
- // Handling decreased containers
|
|
|
- addToUpdatedContainers(
|
|
|
- allocateResponse, ContainerUpdateType.DECREASE_RESOURCE,
|
|
|
- allocation.getDecreasedContainers());
|
|
|
-
|
|
|
- // Handling promoted containers
|
|
|
- addToUpdatedContainers(
|
|
|
- allocateResponse, ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
|
|
|
- allocation.getPromotedContainers());
|
|
|
-
|
|
|
- // Handling demoted containers
|
|
|
- addToUpdatedContainers(
|
|
|
- allocateResponse, ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
|
|
|
- allocation.getDemotedContainers());
|
|
|
-
|
|
|
- SchedulerApplicationAttempt applicationAttempt = ((AbstractYarnScheduler)
|
|
|
- rScheduler).getApplicationAttempt(appAttemptId);
|
|
|
- if (applicationAttempt != null) {
|
|
|
- addToUpdateContainerErrors(allocateResponse,
|
|
|
- ((AbstractYarnScheduler)rScheduler)
|
|
|
- .getApplicationAttempt(appAttemptId).pullUpdateContainerErrors());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- protected void addToUpdateContainerErrors(AllocateResponse allocateResponse,
|
|
|
- List<UpdateContainerError> updateContainerErrors) {
|
|
|
- if (!updateContainerErrors.isEmpty()) {
|
|
|
- if (allocateResponse.getUpdateErrors() != null
|
|
|
- && !allocateResponse.getUpdateErrors().isEmpty()) {
|
|
|
- updateContainerErrors = new ArrayList<>(updateContainerErrors);
|
|
|
- updateContainerErrors.addAll(allocateResponse.getUpdateErrors());
|
|
|
- }
|
|
|
- allocateResponse.setUpdateErrors(updateContainerErrors);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- protected void addToUpdatedContainers(AllocateResponse allocateResponse,
|
|
|
- ContainerUpdateType updateType, List<Container> updatedContainers) {
|
|
|
- if (updatedContainers != null && updatedContainers.size() > 0) {
|
|
|
- ArrayList<UpdatedContainer> containersToSet = new ArrayList<>();
|
|
|
- if (allocateResponse.getUpdatedContainers() != null &&
|
|
|
- !allocateResponse.getUpdatedContainers().isEmpty()) {
|
|
|
- containersToSet.addAll(allocateResponse.getUpdatedContainers());
|
|
|
- }
|
|
|
- for (Container updatedContainer : updatedContainers) {
|
|
|
- containersToSet.add(
|
|
|
- UpdatedContainer.newInstance(updateType, updatedContainer));
|
|
|
- }
|
|
|
- allocateResponse.setUpdatedContainers(containersToSet);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- protected void addToAllocatedContainers(AllocateResponse allocateResponse,
|
|
|
- List<Container> allocatedContainers) {
|
|
|
- if (allocateResponse.getAllocatedContainers() != null
|
|
|
- && !allocateResponse.getAllocatedContainers().isEmpty()) {
|
|
|
- allocatedContainers = new ArrayList<>(allocatedContainers);
|
|
|
- allocatedContainers.addAll(allocateResponse.getAllocatedContainers());
|
|
|
- }
|
|
|
- allocateResponse.setAllocatedContainers(allocatedContainers);
|
|
|
- }
|
|
|
-
|
|
|
- private PreemptionMessage generatePreemptionMessage(Allocation allocation){
|
|
|
- PreemptionMessage pMsg = null;
|
|
|
- // assemble strict preemption request
|
|
|
- if (allocation.getStrictContainerPreemptions() != null) {
|
|
|
- pMsg =
|
|
|
- recordFactory.newRecordInstance(PreemptionMessage.class);
|
|
|
- StrictPreemptionContract pStrict =
|
|
|
- recordFactory.newRecordInstance(StrictPreemptionContract.class);
|
|
|
- Set<PreemptionContainer> pCont = new HashSet<PreemptionContainer>();
|
|
|
- for (ContainerId cId : allocation.getStrictContainerPreemptions()) {
|
|
|
- PreemptionContainer pc =
|
|
|
- recordFactory.newRecordInstance(PreemptionContainer.class);
|
|
|
- pc.setId(cId);
|
|
|
- pCont.add(pc);
|
|
|
- }
|
|
|
- pStrict.setContainers(pCont);
|
|
|
- pMsg.setStrictContract(pStrict);
|
|
|
- }
|
|
|
-
|
|
|
- // assemble negotiable preemption request
|
|
|
- if (allocation.getResourcePreemptions() != null &&
|
|
|
- allocation.getResourcePreemptions().size() > 0 &&
|
|
|
- allocation.getContainerPreemptions() != null &&
|
|
|
- allocation.getContainerPreemptions().size() > 0) {
|
|
|
- if (pMsg == null) {
|
|
|
- pMsg =
|
|
|
- recordFactory.newRecordInstance(PreemptionMessage.class);
|
|
|
- }
|
|
|
- PreemptionContract contract =
|
|
|
- recordFactory.newRecordInstance(PreemptionContract.class);
|
|
|
- Set<PreemptionContainer> pCont = new HashSet<PreemptionContainer>();
|
|
|
- for (ContainerId cId : allocation.getContainerPreemptions()) {
|
|
|
- PreemptionContainer pc =
|
|
|
- recordFactory.newRecordInstance(PreemptionContainer.class);
|
|
|
- pc.setId(cId);
|
|
|
- pCont.add(pc);
|
|
|
- }
|
|
|
- List<PreemptionResourceRequest> pRes = new ArrayList<PreemptionResourceRequest>();
|
|
|
- for (ResourceRequest crr : allocation.getResourcePreemptions()) {
|
|
|
- PreemptionResourceRequest prr =
|
|
|
- recordFactory.newRecordInstance(PreemptionResourceRequest.class);
|
|
|
- prr.setResourceRequest(crr);
|
|
|
- pRes.add(prr);
|
|
|
- }
|
|
|
- contract.setContainers(pCont);
|
|
|
- contract.setResourceRequest(pRes);
|
|
|
- pMsg.setContract(contract);
|
|
|
- }
|
|
|
-
|
|
|
- return pMsg;
|
|
|
}
|
|
|
|
|
|
public void registerAppAttempt(ApplicationAttemptId attemptId) {
|