@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.sls.appmaster;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.sls.SLSRunner;
+import org.apache.hadoop.yarn.sls.scheduler.ContainerSimulator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An AMSimulator that simulates a DAG application: containers are requested
+ * based on the request delay specified for each task, and the application
+ * finishes once all of its tasks have completed.
+ *
+ * Vocabulary used:
+ * pending -> requests which have NOT yet been sent to the RM.
+ * scheduled -> requests which have been sent to the RM but not yet assigned.
+ * assigned -> requests which have been assigned a container.
+ * completed -> requests whose container has completed.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class DAGAMSimulator extends AMSimulator {
+
+  private static final int PRIORITY = 20;
+
+  private List<ContainerSimulator> pendingContainers =
+      new LinkedList<>();
+
+  private List<ContainerSimulator> scheduledContainers =
+      new LinkedList<>();
+
+  private Map<ContainerId, ContainerSimulator> assignedContainers =
+      new HashMap<>();
+
+  private List<ContainerSimulator> completedContainers =
+      new LinkedList<>();
+
+  private List<ContainerSimulator> allContainers =
+      new LinkedList<>();
+
+  private boolean isFinished = false;
+
+  private long amStartTime;
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DAGAMSimulator.class);
+
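+  /**
+   * Registers the job's containers: all of them start out pending, and the
+   * total count is used to track progress and completion.
+   */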
+  @SuppressWarnings("checkstyle:parameternumber")
+  public void init(int heartbeatInterval,
+      List<ContainerSimulator> containerList, ResourceManager resourceManager,
+      SLSRunner slsRunner, long startTime, long finishTime, String simUser,
+      String simQueue, boolean tracked, String oldApp, long baseTimeMS,
+      Resource amResource, String nodeLabelExpr, Map<String, String> params,
+      Map<ApplicationId, AMSimulator> appIdAMSim) {
+    super.init(heartbeatInterval, containerList, resourceManager, slsRunner,
+        startTime, finishTime, simUser, simQueue, tracked, oldApp, baseTimeMS,
+        amResource, nodeLabelExpr, params, appIdAMSim);
+    super.amtype = "dag";
+
+    allContainers.addAll(containerList);
+    pendingContainers.addAll(containerList);
+    totalContainers = allContainers.size();
+
+    LOG.info("Added new job with {} containers", allContainers.size());
+  }
+
+  @Override
+  public void firstStep() throws Exception {
+    super.firstStep();
+    amStartTime = System.currentTimeMillis();
+  }
+
+  @Override
+  public void initReservation(ReservationId reservationId,
+      long deadline, long now) {
+    // DAG AM doesn't support reservation
+    setReservationRequest(null);
+  }
+
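+  /**
+   * On every (re)launch of the AM container, all containers that have not
+   * completed are marked pending again, so a killed-and-rescheduled AM
+   * resumes requesting the remaining work.
+   */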
+  @Override
+  public synchronized void notifyAMContainerLaunched(Container masterContainer)
+      throws Exception {
+    if (null != masterContainer) {
+      restart();
+      super.notifyAMContainerLaunched(masterContainer);
+    }
+  }
+
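+  /**
+   * Drains the allocate responses: successfully finished containers move from
+   * assigned to completed, killed containers are re-queued as pending, the AM
+   * container is cleaned up once every container has finished, and newly
+   * allocated containers are launched on their assigned NodeManagers.
+   */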
+  protected void processResponseQueue() throws Exception {
+    while (!responseQueue.isEmpty()) {
+      AllocateResponse response = responseQueue.take();
+
+      // check completed containers
+      if (!response.getCompletedContainersStatuses().isEmpty()) {
+        for (ContainerStatus cs : response.getCompletedContainersStatuses()) {
+          ContainerId containerId = cs.getContainerId();
+          if (cs.getExitStatus() == ContainerExitStatus.SUCCESS) {
+            if (assignedContainers.containsKey(containerId)) {
+              LOG.debug("Application {} has one container finished ({}).",
+                  appId, containerId);
+              ContainerSimulator containerSimulator =
+                  assignedContainers.remove(containerId);
+              finishedContainers++;
+              completedContainers.add(containerSimulator);
+            } else if (amContainer.getId().equals(containerId)) {
+              // am container released event
+              isFinished = true;
+              LOG.info("Application {} goes to finish.", appId);
+            }
+            if (finishedContainers >= totalContainers) {
+              lastStep();
+            }
+          } else {
+            // container to be killed
+            if (assignedContainers.containsKey(containerId)) {
+              LOG.error("Application {} has one container killed ({}).", appId,
+                  containerId);
+              pendingContainers.add(assignedContainers.remove(containerId));
+            } else if (amContainer.getId().equals(containerId)) {
+              LOG.error("Application {}'s AM is "
+                  + "going to be killed. Waiting for rescheduling...", appId);
+            }
+          }
+        }
+      }
+
+      // check finished
+      if (isAMContainerRunning &&
+          (finishedContainers >= totalContainers)) {
+        isAMContainerRunning = false;
+        LOG.info("Application {} sends out event to clean up"
+            + " its AM container.", appId);
+        isFinished = true;
+        break;
+      }
+
+      // check allocated containers
+      for (Container container : response.getAllocatedContainers()) {
+        if (!scheduledContainers.isEmpty()) {
+          ContainerSimulator cs = scheduledContainers.remove(0);
+          LOG.debug("Application {} starts to launch a container ({}).",
+              appId, container.getId());
+          assignedContainers.put(container.getId(), cs);
+          se.getNmMap().get(container.getNodeId())
+              .addNewContainer(container, cs.getLifeTime());
+        }
+      }
+    }
+  }
+
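+  /**
+   * Requests every pending container whose request delay has elapsed and
+   * reports the current progress to the ResourceManager through an
+   * allocate call.
+   */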
+  @Override
+  protected void sendContainerRequest() throws Exception {
+    if (isFinished) {
+      return;
+    }
+    // send out request
+    List<ResourceRequest> ask = null;
+    if (finishedContainers != totalContainers) {
+      if (!pendingContainers.isEmpty()) {
+        List<ContainerSimulator> toBeScheduled =
+            getToBeScheduledContainers(pendingContainers, amStartTime);
+        if (toBeScheduled.size() > 0) {
+          ask = packageRequests(toBeScheduled, PRIORITY);
+          LOG.info("Application {} sends out request for {} containers.",
+              appId, toBeScheduled.size());
+          scheduledContainers.addAll(toBeScheduled);
+          pendingContainers.removeAll(toBeScheduled);
+          toBeScheduled.clear();
+        }
+      }
+    }
+
+    if (ask == null) {
+      ask = new ArrayList<>();
+    }
+
+    final AllocateRequest request = createAllocateRequest(ask);
+    if (totalContainers == 0) {
+      request.setProgress(1.0f);
+    } else {
+      request.setProgress((float) finishedContainers / totalContainers);
+    }
+
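+    // the allocate call is made as the application attempt's user, carrying
+    // its AMRM token, mirroring how a real AM talks to the RM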
+    UserGroupInformation ugi =
+        UserGroupInformation.createRemoteUser(appAttemptId.toString());
+    Token<AMRMTokenIdentifier> token = rm.getRMContext().getRMApps()
+        .get(appAttemptId.getApplicationId())
+        .getRMAppAttempt(appAttemptId).getAMRMToken();
+    ugi.addTokenIdentifier(token.decodeIdentifier());
+    AllocateResponse response = ugi.doAs(
+        (PrivilegedExceptionAction<AllocateResponse>) () -> rm
+            .getApplicationMasterService().allocate(request));
+    if (response != null) {
+      responseQueue.put(response);
+    }
+  }
+
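+  /**
+   * Returns the containers whose request delay, measured from the given
+   * start time, has already elapsed.
+   */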
+  @VisibleForTesting
+  public List<ContainerSimulator> getToBeScheduledContainers(
+      List<ContainerSimulator> containers, long startTime) {
+    List<ContainerSimulator> toBeScheduled = new LinkedList<>();
+    for (ContainerSimulator cs : containers) {
+      // only request the container once its request delay has elapsed
+      if (cs.getRequestDelay() + startTime <=
+          System.currentTimeMillis()) {
+        toBeScheduled.add(cs);
+      }
+    }
+    return toBeScheduled;
+  }
+
+  @Override
+  protected void checkStop() {
+    if (isFinished) {
+      super.setEndTime(System.currentTimeMillis());
+    }
+  }
+
+  @Override
+  public void lastStep() throws Exception {
+    super.lastStep();
+
+    // clear data structures
+    allContainers.clear();
+    pendingContainers.clear();
+    scheduledContainers.clear();
+    assignedContainers.clear();
+    completedContainers.clear();
+  }
+
+  /**
+   * Restarts the run after the AM container has been killed: all containers
+   * that have not yet completed are marked pending again.
+   */
+  private void restart() {
+    isFinished = false;
+    pendingContainers.clear();
+    pendingContainers.addAll(allContainers);
+    pendingContainers.removeAll(completedContainers);
+    amContainer = null;
+  }
+}