|
@@ -24,8 +24,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
|
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
|
|
|
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
|
|
import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
|
|
@@ -33,43 +31,38 @@ import org.apache.hadoop.yarn.api.records.RejectionReason;
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
import org.apache.hadoop.yarn.api.records.ResourceSizing;
|
|
import org.apache.hadoop.yarn.api.records.ResourceSizing;
|
|
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
|
|
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
|
|
-import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
|
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
|
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingRequestWithPlacementAttempt;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingRequestWithPlacementAttempt;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingResponse;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingResponse;
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
-import java.io.IOException;
|
|
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.Collections;
|
|
import java.util.Collections;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
import java.util.Map;
|
|
import java.util.Map;
|
|
-import java.util.Set;
|
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.ExecutorService;
|
|
import java.util.concurrent.Executors;
|
|
import java.util.concurrent.Executors;
|
|
import java.util.stream.Collectors;
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
/**
|
|
- * An ApplicationMasterService Processor that performs Constrained placement of
|
|
|
|
|
|
+ * An ApplicationMasterServiceProcessor that performs Constrained placement of
|
|
* Scheduling Requests. It does the following:
|
|
* Scheduling Requests. It does the following:
|
|
* 1. All initialization.
|
|
* 1. All initialization.
|
|
* 2. Intercepts placement constraints from the register call and adds it to
|
|
* 2. Intercepts placement constraints from the register call and adds it to
|
|
* the placement constraint manager.
|
|
* the placement constraint manager.
|
|
* 3. Dispatches Scheduling Requests to the Planner.
|
|
* 3. Dispatches Scheduling Requests to the Planner.
|
|
*/
|
|
*/
|
|
-public class PlacementProcessor implements ApplicationMasterServiceProcessor {
|
|
|
|
|
|
+public class PlacementConstraintProcessor extends AbstractPlacementProcessor {
|
|
|
|
|
|
/**
|
|
/**
|
|
* Wrapper over the SchedulingResponse that wires in the placement attempt
|
|
* Wrapper over the SchedulingResponse that wires in the placement attempt
|
|
@@ -90,11 +83,8 @@ public class PlacementProcessor implements ApplicationMasterServiceProcessor {
|
|
}
|
|
}
|
|
|
|
|
|
private static final Logger LOG =
|
|
private static final Logger LOG =
|
|
- LoggerFactory.getLogger(PlacementProcessor.class);
|
|
|
|
- private PlacementConstraintManager constraintManager;
|
|
|
|
- private ApplicationMasterServiceProcessor nextAMSProcessor;
|
|
|
|
|
|
+ LoggerFactory.getLogger(PlacementConstraintProcessor.class);
|
|
|
|
|
|
- private AbstractYarnScheduler scheduler;
|
|
|
|
private ExecutorService schedulingThreadPool;
|
|
private ExecutorService schedulingThreadPool;
|
|
private int retryAttempts;
|
|
private int retryAttempts;
|
|
private Map<ApplicationId, List<BatchedRequests>> requestsToRetry =
|
|
private Map<ApplicationId, List<BatchedRequests>> requestsToRetry =
|
|
@@ -110,12 +100,8 @@ public class PlacementProcessor implements ApplicationMasterServiceProcessor {
|
|
public void init(ApplicationMasterServiceContext amsContext,
|
|
public void init(ApplicationMasterServiceContext amsContext,
|
|
ApplicationMasterServiceProcessor nextProcessor) {
|
|
ApplicationMasterServiceProcessor nextProcessor) {
|
|
LOG.info("Initializing Constraint Placement Processor:");
|
|
LOG.info("Initializing Constraint Placement Processor:");
|
|
- this.nextAMSProcessor = nextProcessor;
|
|
|
|
- this.constraintManager =
|
|
|
|
- ((RMContextImpl)amsContext).getPlacementConstraintManager();
|
|
|
|
|
|
+ super.init(amsContext, nextProcessor);
|
|
|
|
|
|
- this.scheduler =
|
|
|
|
- (AbstractYarnScheduler)((RMContextImpl)amsContext).getScheduler();
|
|
|
|
// Only the first class is considered - even if a comma separated
|
|
// Only the first class is considered - even if a comma separated
|
|
// list is provided. (This is for simplicity, since getInstances does a
|
|
// list is provided. (This is for simplicity, since getInstances does a
|
|
// lot of good things by handling things correctly)
|
|
// lot of good things by handling things correctly)
|
|
@@ -165,28 +151,6 @@ public class PlacementProcessor implements ApplicationMasterServiceProcessor {
|
|
LOG.info("Num retry attempts [{}]", this.retryAttempts);
|
|
LOG.info("Num retry attempts [{}]", this.retryAttempts);
|
|
}
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
|
- public void registerApplicationMaster(ApplicationAttemptId appAttemptId,
|
|
|
|
- RegisterApplicationMasterRequest request,
|
|
|
|
- RegisterApplicationMasterResponse response)
|
|
|
|
- throws IOException, YarnException {
|
|
|
|
- Map<Set<String>, PlacementConstraint> appPlacementConstraints =
|
|
|
|
- request.getPlacementConstraints();
|
|
|
|
- processPlacementConstraints(
|
|
|
|
- appAttemptId.getApplicationId(), appPlacementConstraints);
|
|
|
|
- nextAMSProcessor.registerApplicationMaster(appAttemptId, request, response);
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- private void processPlacementConstraints(ApplicationId applicationId,
|
|
|
|
- Map<Set<String>, PlacementConstraint> appPlacementConstraints) {
|
|
|
|
- if (appPlacementConstraints != null && !appPlacementConstraints.isEmpty()) {
|
|
|
|
- LOG.info("Constraints added for application [{}] against tags [{}]",
|
|
|
|
- applicationId, appPlacementConstraints);
|
|
|
|
- constraintManager.registerApplication(
|
|
|
|
- applicationId, appPlacementConstraints);
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
@Override
|
|
@Override
|
|
public void allocate(ApplicationAttemptId appAttemptId,
|
|
public void allocate(ApplicationAttemptId appAttemptId,
|
|
AllocateRequest request, AllocateResponse response) throws YarnException {
|
|
AllocateRequest request, AllocateResponse response) throws YarnException {
|
|
@@ -311,11 +275,10 @@ public class PlacementProcessor implements ApplicationMasterServiceProcessor {
|
|
public void finishApplicationMaster(ApplicationAttemptId appAttemptId,
|
|
public void finishApplicationMaster(ApplicationAttemptId appAttemptId,
|
|
FinishApplicationMasterRequest request,
|
|
FinishApplicationMasterRequest request,
|
|
FinishApplicationMasterResponse response) {
|
|
FinishApplicationMasterResponse response) {
|
|
- constraintManager.unregisterApplication(appAttemptId.getApplicationId());
|
|
|
|
placementDispatcher.clearApplicationState(appAttemptId.getApplicationId());
|
|
placementDispatcher.clearApplicationState(appAttemptId.getApplicationId());
|
|
requestsToReject.remove(appAttemptId.getApplicationId());
|
|
requestsToReject.remove(appAttemptId.getApplicationId());
|
|
requestsToRetry.remove(appAttemptId.getApplicationId());
|
|
requestsToRetry.remove(appAttemptId.getApplicationId());
|
|
- nextAMSProcessor.finishApplicationMaster(appAttemptId, request, response);
|
|
|
|
|
|
+ super.finishApplicationMaster(appAttemptId, request, response);
|
|
}
|
|
}
|
|
|
|
|
|
private void handleSchedulingResponse(SchedulingResponse schedulerResponse) {
|
|
private void handleSchedulingResponse(SchedulingResponse schedulerResponse) {
|