|
@@ -0,0 +1,364 @@
|
|
|
+/**
|
|
|
+* Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+* or more contributor license agreements. See the NOTICE file
|
|
|
+* distributed with this work for additional information
|
|
|
+* regarding copyright ownership. The ASF licenses this file
|
|
|
+* to you under the Apache License, Version 2.0 (the
|
|
|
+* "License"); you may not use this file except in compliance
|
|
|
+* with the License. You may obtain a copy of the License at
|
|
|
+*
|
|
|
+* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+*
|
|
|
+* Unless required by applicable law or agreed to in writing, software
|
|
|
+* distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+* See the License for the specific language governing permissions and
|
|
|
+* limitations under the License.
|
|
|
+*/
|
|
|
+
|
|
|
+package org.apache.hadoop.yarn.server;
|
|
|
+
|
|
|
+import java.io.IOException;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.HashSet;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
+import java.util.TreeSet;
|
|
|
+
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.ipc.RPC;
|
|
|
+import org.apache.hadoop.service.AbstractService;
|
|
|
+import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|
|
+import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
|
|
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
|
+import org.apache.hadoop.yarn.api.records.SchedulingRequest;
|
|
|
+import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
|
|
+import org.apache.hadoop.yarn.api.records.UpdatedContainer;
|
|
|
+import org.apache.hadoop.yarn.client.AMRMClientUtils;
|
|
|
+import org.apache.hadoop.yarn.client.ClientRMProxy;
|
|
|
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
+import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
|
|
|
+import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
+import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSet;
|
|
|
+import org.apache.hadoop.yarn.server.scheduler.ResourceRequestSetKey;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
+
|
|
|
+/**
|
|
|
+ * A component that sits in between AMRMClient(Impl) and Yarn RM. It remembers
|
|
|
+ * pending requests similar to AMRMClient, and handles RM re-sync automatically
|
|
|
+ * without propagate the re-sync exception back to AMRMClient.
|
|
|
+ */
|
|
|
+public class AMRMClientRelayer extends AbstractService
|
|
|
+ implements ApplicationMasterProtocol {
|
|
|
+ private static final Logger LOG =
|
|
|
+ LoggerFactory.getLogger(AMRMClientRelayer.class);
|
|
|
+
|
|
|
+ private ApplicationMasterProtocol rmClient;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * The original registration request that was sent by the AM. This instance is
|
|
|
+ * reused to register/re-register with all the sub-cluster RMs.
|
|
|
+ */
|
|
|
+ private RegisterApplicationMasterRequest amRegistrationRequest;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Similar to AMRMClientImpl, all data structures below have two versions:
|
|
|
+ *
|
|
|
+ * The remote ones are all the pending requests that RM has not fulfill yet.
|
|
|
+ * Whenever RM fails over, we re-register and then full re-send all these
|
|
|
+ * pending requests.
|
|
|
+ *
|
|
|
+ * The non-remote ones are the requests that RM has not received yet. When RM
|
|
|
+ * throws non-fail-over exception back, the request is considered not received
|
|
|
+ * by RM. We will merge with new requests and re-send in the next heart beat.
|
|
|
+ */
|
|
|
+ private Map<ResourceRequestSetKey, ResourceRequestSet> remotePendingAsks =
|
|
|
+ new HashMap<>();
|
|
|
+ /**
|
|
|
+ * Same as AMRMClientImpl, we need to use a custom comparator that does not
|
|
|
+ * look at ResourceRequest.getNumContainers() here. TreeSet allows a custom
|
|
|
+ * comparator.
|
|
|
+ */
|
|
|
+ private Set<ResourceRequest> ask =
|
|
|
+ new TreeSet<>(new ResourceRequest.ResourceRequestComparator());
|
|
|
+
|
|
|
+ private Set<ContainerId> remotePendingRelease = new HashSet<>();
|
|
|
+ private Set<ContainerId> release = new HashSet<>();
|
|
|
+
|
|
|
+ private Set<String> remoteBlacklistedNodes = new HashSet<>();
|
|
|
+ private Set<String> blacklistAdditions = new HashSet<>();
|
|
|
+ private Set<String> blacklistRemovals = new HashSet<>();
|
|
|
+
|
|
|
+ private Map<ContainerId, UpdateContainerRequest> remotePendingChange =
|
|
|
+ new HashMap<>();
|
|
|
+ private Map<ContainerId, UpdateContainerRequest> change = new HashMap<>();
|
|
|
+
|
|
|
+ private Map<Set<String>, List<SchedulingRequest>> remotePendingSchedRequest =
|
|
|
+ new HashMap<>();
|
|
|
+ private List<SchedulingRequest> schedulingRequest = new ArrayList<>();
|
|
|
+
|
|
|
+ public AMRMClientRelayer() {
|
|
|
+ super(AMRMClientRelayer.class.getName());
|
|
|
+ }
|
|
|
+
|
|
|
+ public AMRMClientRelayer(ApplicationMasterProtocol rmClient) {
|
|
|
+ this();
|
|
|
+ this.rmClient = rmClient;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void serviceInit(Configuration conf) throws Exception {
|
|
|
+ super.serviceInit(conf);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void serviceStart() throws Exception {
|
|
|
+ final YarnConfiguration conf = new YarnConfiguration(getConfig());
|
|
|
+ try {
|
|
|
+ if (this.rmClient == null) {
|
|
|
+ this.rmClient =
|
|
|
+ ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw new YarnRuntimeException(e);
|
|
|
+ }
|
|
|
+ super.serviceStart();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected void serviceStop() throws Exception {
|
|
|
+ if (this.rmClient != null) {
|
|
|
+ RPC.stopProxy(this.rmClient);
|
|
|
+ }
|
|
|
+ super.serviceStop();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public RegisterApplicationMasterResponse registerApplicationMaster(
|
|
|
+ RegisterApplicationMasterRequest request)
|
|
|
+ throws YarnException, IOException {
|
|
|
+ this.amRegistrationRequest = request;
|
|
|
+ return this.rmClient.registerApplicationMaster(request);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public FinishApplicationMasterResponse finishApplicationMaster(
|
|
|
+ FinishApplicationMasterRequest request)
|
|
|
+ throws YarnException, IOException {
|
|
|
+ try {
|
|
|
+ return this.rmClient.finishApplicationMaster(request);
|
|
|
+ } catch (ApplicationMasterNotRegisteredException e) {
|
|
|
+ LOG.warn("Out of sync with ResourceManager, hence resyncing.");
|
|
|
+ // re register with RM
|
|
|
+ registerApplicationMaster(this.amRegistrationRequest);
|
|
|
+ return finishApplicationMaster(request);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public AllocateResponse allocate(AllocateRequest allocateRequest)
|
|
|
+ throws YarnException, IOException {
|
|
|
+ AllocateResponse allocateResponse = null;
|
|
|
+ try {
|
|
|
+ synchronized (this) {
|
|
|
+ // update the data structures first
|
|
|
+ addNewAsks(allocateRequest.getAskList());
|
|
|
+
|
|
|
+ if (allocateRequest.getReleaseList() != null) {
|
|
|
+ this.remotePendingRelease.addAll(allocateRequest.getReleaseList());
|
|
|
+ this.release.addAll(allocateRequest.getReleaseList());
|
|
|
+ }
|
|
|
+
|
|
|
+ if (allocateRequest.getResourceBlacklistRequest() != null) {
|
|
|
+ if (allocateRequest.getResourceBlacklistRequest()
|
|
|
+ .getBlacklistAdditions() != null) {
|
|
|
+ this.remoteBlacklistedNodes.addAll(allocateRequest
|
|
|
+ .getResourceBlacklistRequest().getBlacklistAdditions());
|
|
|
+ this.blacklistAdditions.addAll(allocateRequest
|
|
|
+ .getResourceBlacklistRequest().getBlacklistAdditions());
|
|
|
+ }
|
|
|
+ if (allocateRequest.getResourceBlacklistRequest()
|
|
|
+ .getBlacklistRemovals() != null) {
|
|
|
+ this.remoteBlacklistedNodes.removeAll(allocateRequest
|
|
|
+ .getResourceBlacklistRequest().getBlacklistRemovals());
|
|
|
+ this.blacklistRemovals.addAll(allocateRequest
|
|
|
+ .getResourceBlacklistRequest().getBlacklistRemovals());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (allocateRequest.getUpdateRequests() != null) {
|
|
|
+ for (UpdateContainerRequest update : allocateRequest
|
|
|
+ .getUpdateRequests()) {
|
|
|
+ this.remotePendingChange.put(update.getContainerId(), update);
|
|
|
+ this.change.put(update.getContainerId(), update);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (allocateRequest.getSchedulingRequests() != null) {
|
|
|
+ AMRMClientUtils.addToOutstandingSchedulingRequests(
|
|
|
+ allocateRequest.getSchedulingRequests(),
|
|
|
+ this.remotePendingSchedRequest);
|
|
|
+ this.schedulingRequest
|
|
|
+ .addAll(allocateRequest.getSchedulingRequests());
|
|
|
+ }
|
|
|
+
|
|
|
+ ArrayList<ResourceRequest> askList = new ArrayList<>(ask.size());
|
|
|
+ for (ResourceRequest r : ask) {
|
|
|
+ // create a copy of ResourceRequest as we might change it while the
|
|
|
+ // RPC layer is using it to send info across
|
|
|
+ askList.add(ResourceRequest.newBuilder().priority(r.getPriority())
|
|
|
+ .resourceName(r.getResourceName()).capability(r.getCapability())
|
|
|
+ .numContainers(r.getNumContainers())
|
|
|
+ .relaxLocality(r.getRelaxLocality())
|
|
|
+ .nodeLabelExpression(r.getNodeLabelExpression())
|
|
|
+ .executionTypeRequest(r.getExecutionTypeRequest())
|
|
|
+ .allocationRequestId(r.getAllocationRequestId()).build());
|
|
|
+ }
|
|
|
+
|
|
|
+ allocateRequest = AllocateRequest.newBuilder()
|
|
|
+ .responseId(allocateRequest.getResponseId())
|
|
|
+ .progress(allocateRequest.getProgress()).askList(askList)
|
|
|
+ .releaseList(new ArrayList<>(this.release))
|
|
|
+ .resourceBlacklistRequest(ResourceBlacklistRequest.newInstance(
|
|
|
+ new ArrayList<>(this.blacklistAdditions),
|
|
|
+ new ArrayList<>(this.blacklistRemovals)))
|
|
|
+ .updateRequests(new ArrayList<>(this.change.values()))
|
|
|
+ .schedulingRequests(new ArrayList<>(this.schedulingRequest))
|
|
|
+ .build();
|
|
|
+ }
|
|
|
+
|
|
|
+ // Do the actual allocate call
|
|
|
+ try {
|
|
|
+ allocateResponse = this.rmClient.allocate(allocateRequest);
|
|
|
+ } catch (ApplicationMasterNotRegisteredException e) {
|
|
|
+ LOG.warn("ApplicationMaster is out of sync with ResourceManager,"
|
|
|
+ + " hence resyncing.");
|
|
|
+
|
|
|
+ synchronized (this) {
|
|
|
+ // Add all remotePending data into to-send data structures
|
|
|
+ for (ResourceRequestSet requestSet : this.remotePendingAsks
|
|
|
+ .values()) {
|
|
|
+ for (ResourceRequest rr : requestSet.getRRs()) {
|
|
|
+ addResourceRequestToAsk(rr);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ this.release.addAll(this.remotePendingRelease);
|
|
|
+ this.blacklistAdditions.addAll(this.remoteBlacklistedNodes);
|
|
|
+ this.change.putAll(this.remotePendingChange);
|
|
|
+ for (List<SchedulingRequest> reqs : this.remotePendingSchedRequest
|
|
|
+ .values()) {
|
|
|
+ this.schedulingRequest.addAll(reqs);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // re register with RM, then retry allocate recursively
|
|
|
+ registerApplicationMaster(this.amRegistrationRequest);
|
|
|
+ return allocate(allocateRequest);
|
|
|
+ }
|
|
|
+
|
|
|
+ synchronized (this) {
|
|
|
+ // Process the allocate response from RM
|
|
|
+ if (allocateResponse.getCompletedContainersStatuses() != null) {
|
|
|
+ for (ContainerStatus container : allocateResponse
|
|
|
+ .getCompletedContainersStatuses()) {
|
|
|
+ this.remotePendingRelease.remove(container.getContainerId());
|
|
|
+ this.remotePendingChange.remove(container.getContainerId());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (allocateResponse.getUpdatedContainers() != null) {
|
|
|
+ for (UpdatedContainer updatedContainer : allocateResponse
|
|
|
+ .getUpdatedContainers()) {
|
|
|
+ this.remotePendingChange
|
|
|
+ .remove(updatedContainer.getContainer().getId());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ AMRMClientUtils.removeFromOutstandingSchedulingRequests(
|
|
|
+ allocateResponse.getAllocatedContainers(),
|
|
|
+ this.remotePendingSchedRequest);
|
|
|
+ AMRMClientUtils.removeFromOutstandingSchedulingRequests(
|
|
|
+ allocateResponse.getContainersFromPreviousAttempts(),
|
|
|
+ this.remotePendingSchedRequest);
|
|
|
+ }
|
|
|
+
|
|
|
+ } finally {
|
|
|
+ synchronized (this) {
|
|
|
+ /*
|
|
|
+ * If allocateResponse is null, it means exception happened and RM did
|
|
|
+ * not accept the request. Don't clear any data structures so that they
|
|
|
+ * will be re-sent next time.
|
|
|
+ *
|
|
|
+ * Otherwise request was accepted by RM, we are safe to clear these.
|
|
|
+ */
|
|
|
+ if (allocateResponse != null) {
|
|
|
+ this.ask.clear();
|
|
|
+ this.release.clear();
|
|
|
+
|
|
|
+ this.blacklistAdditions.clear();
|
|
|
+ this.blacklistRemovals.clear();
|
|
|
+
|
|
|
+ this.change.clear();
|
|
|
+ this.schedulingRequest.clear();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return allocateResponse;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void addNewAsks(List<ResourceRequest> asks) throws YarnException {
|
|
|
+ Set<ResourceRequestSetKey> touchedKeys = new HashSet<>();
|
|
|
+ for (ResourceRequest rr : asks) {
|
|
|
+ addResourceRequestToAsk(rr);
|
|
|
+
|
|
|
+ ResourceRequestSetKey key = new ResourceRequestSetKey(rr);
|
|
|
+ touchedKeys.add(key);
|
|
|
+
|
|
|
+ ResourceRequestSet askSet = this.remotePendingAsks.get(key);
|
|
|
+ if (askSet == null) {
|
|
|
+ askSet = new ResourceRequestSet(key);
|
|
|
+ this.remotePendingAsks.put(key, askSet);
|
|
|
+ }
|
|
|
+ askSet.addAndOverrideRR(rr);
|
|
|
+ }
|
|
|
+
|
|
|
+ // Cleanup properly if needed
|
|
|
+ for (ResourceRequestSetKey key : touchedKeys) {
|
|
|
+ ResourceRequestSet askSet = this.remotePendingAsks.get(key);
|
|
|
+ if (askSet.getNumContainers() == 0) {
|
|
|
+ this.remotePendingAsks.remove(key);
|
|
|
+ } else {
|
|
|
+ // Remove non-any zero RRs
|
|
|
+ askSet.cleanupZeroNonAnyRR();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
|
|
|
+ // The ResourceRequestComparator doesn't look at container count when
|
|
|
+ // comparing. So we need to make sure the new RR override the old if any
|
|
|
+ this.ask.remove(remoteRequest);
|
|
|
+ this.ask.add(remoteRequest);
|
|
|
+ }
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ protected Map<ResourceRequestSetKey, ResourceRequestSet>
|
|
|
+ getRemotePendingAsks() {
|
|
|
+ return this.remotePendingAsks;
|
|
|
+ }
|
|
|
+
|
|
|
+}
|