Kaynağa Gözat

YARN-11034. Add enhanced headroom in AllocateResponse (#3766)

minni31 3 yıl önce
ebeveyn
işleme
4b26635a34

+ 38 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java

@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.CollectorInfo;
 import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.EnhancedHeadroom;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.PreemptionMessage;
@@ -147,6 +148,23 @@ public abstract class AllocateResponse {
         .collectorInfo(collectorInfo).build();
   }
 
+  @Private
+  @Unstable
+  public static AllocateResponse newInstance(int responseId,
+      List<ContainerStatus> completedContainers,
+      List<Container> allocatedContainers, List<NodeReport> updatedNodes,
+      Resource availResources, AMCommand command, int numClusterNodes,
+      PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken,
+      List<UpdatedContainer> updatedContainers, CollectorInfo collectorInfo,
+      EnhancedHeadroom enhancedHeadroom) {
+    AllocateResponse response =
+        newInstance(responseId, completedContainers, allocatedContainers,
+            updatedNodes, availResources, command, numClusterNodes, preempt,
+            nmTokens, amRMToken, updatedContainers, collectorInfo);
+    response.setEnhancedHeadroom(enhancedHeadroom);
+    return response;
+  }
+
   /**
    * If the <code>ResourceManager</code> needs the
    * <code>ApplicationMaster</code> to take some action then it will send an
@@ -439,6 +457,14 @@ public abstract class AllocateResponse {
     return new AllocateResponseBuilder();
   }
 
+  @Public
+  @Unstable
+  public abstract EnhancedHeadroom getEnhancedHeadroom();
+
+  @Private
+  @Unstable
+  public abstract void setEnhancedHeadroom(EnhancedHeadroom enhancedHeadroom);
+
   /**
    * Class to construct instances of {@link AllocateResponse} with specific
    * options.
@@ -666,6 +692,18 @@ public abstract class AllocateResponse {
       return this;
     }
 
+    @Public
+    @Unstable
+    public EnhancedHeadroom getEnhancedHeadroom() {
+      return allocateResponse.getEnhancedHeadroom();
+    }
+
+    @Private
+    @Unstable
+    public void setEnhancedHeadroom(EnhancedHeadroom enhancedHeadroom){
+      allocateResponse.setEnhancedHeadroom(enhancedHeadroom);
+    }
+
     /**
      * Return generated {@link AllocateResponse} object.
      * @return {@link AllocateResponse}

+ 72 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/EnhancedHeadroom.java

@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records;
+
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * Enhanced head room in AllocateResponse.
+ * This provides a channel for RMs to return load information for AMRMProxy
+ * decision making when rerouting resource requests.
+ *
+ * Contains total pending container count and active cores for a cluster.
+ */
+public abstract class EnhancedHeadroom {
+  public static EnhancedHeadroom newInstance(int totalPendingCount,
+      int totalActiveCores) {
+    EnhancedHeadroom enhancedHeadroom =
+        Records.newRecord(EnhancedHeadroom.class);
+    enhancedHeadroom.setTotalPendingCount(totalPendingCount);
+    enhancedHeadroom.setTotalActiveCores(totalActiveCores);
+    return enhancedHeadroom;
+  }
+
+  /**
+   * Set total pending container count.
+   * @param totalPendingCount the pending container count
+   */
+  public abstract void setTotalPendingCount(int totalPendingCount);
+
+  /**
+   * Get total pending container count.
+   * @return the pending container count
+   */
+  public abstract int getTotalPendingCount();
+
+  /**
+   * Set total active cores for the cluster.
+   * @param totalActiveCores the total active cores for the cluster
+   */
+  public abstract void setTotalActiveCores(int totalActiveCores);
+
+  /**
+   * Get total active cores for the cluster.
+   * @return totalActiveCores the total active cores for the cluster
+   */
+  public abstract int getTotalActiveCores();
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("<pendingCount:").append(this.getTotalPendingCount());
+    sb.append(", activeCores:").append(this.getTotalActiveCores());
+    sb.append(">");
+    return sb.toString();
+  }
+}

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto

@@ -106,6 +106,11 @@ message UpdatedContainerProto {
   required ContainerProto container = 2;
 }
 
+message EnhancedHeadroomProto {
+  optional int32 total_pending_count = 1;
+  optional int32 total_active_cores = 2;
+}
+
 message AllocateResponseProto {
   optional AMCommandProto a_m_command = 1;
   optional int32 response_id = 2;
@@ -123,6 +128,7 @@ message AllocateResponseProto {
   repeated UpdatedContainerProto updated_containers = 16;
   repeated ContainerProto containers_from_previous_attempts = 17;
   repeated RejectedSchedulingRequestProto rejected_scheduling_requests = 18;
+  optional EnhancedHeadroomProto enhanced_headroom = 19;
 }
 
 enum SchedulerResourceTypes {

+ 38 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.CollectorInfo;
 import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.EnhancedHeadroom;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.PreemptionMessage;
@@ -43,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.UpdatedContainer;
 import org.apache.hadoop.yarn.api.records.impl.pb.CollectorInfoPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.EnhancedHeadroomPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionMessagePBImpl;
@@ -89,6 +91,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
   private Token amrmToken = null;
   private Priority appPriority = null;
   private CollectorInfo collectorInfo = null;
+  private EnhancedHeadroom enhancedHeadroom = null;
 
   public AllocateResponsePBImpl() {
     builder = AllocateResponseProto.newBuilder();
@@ -190,6 +193,9 @@ public class AllocateResponsePBImpl extends AllocateResponse {
           getContainerProtoIterable(this.containersFromPreviousAttempts);
       builder.addAllContainersFromPreviousAttempts(iterable);
     }
+    if (this.enhancedHeadroom != null) {
+      builder.setEnhancedHeadroom(convertToProtoFormat(this.enhancedHeadroom));
+    }
   }
 
   private synchronized void mergeLocalToProto() {
@@ -422,6 +428,28 @@ public class AllocateResponsePBImpl extends AllocateResponse {
     this.amrmToken = amRMToken;
   }
 
+  @Override
+  public synchronized EnhancedHeadroom getEnhancedHeadroom() {
+    AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
+    if (enhancedHeadroom != null) {
+      return enhancedHeadroom;
+    }
+    if (!p.hasEnhancedHeadroom()) {
+      return null;
+    }
+    this.enhancedHeadroom = convertFromProtoFormat(p.getEnhancedHeadroom());
+    return enhancedHeadroom;
+  }
+
+  @Override
+  public synchronized void setEnhancedHeadroom(
+      EnhancedHeadroom enhancedHeadroom) {
+    maybeInitBuilder();
+    if (enhancedHeadroom == null) {
+      builder.clearEnhancedHeadroom();
+    }
+    this.enhancedHeadroom = enhancedHeadroom;
+  }
 
   @Override
   public synchronized CollectorInfo getCollectorInfo() {
@@ -933,4 +961,14 @@ public class AllocateResponsePBImpl extends AllocateResponse {
   private PriorityProto convertToProtoFormat(Priority t) {
     return ((PriorityPBImpl)t).getProto();
   }
+
+  private EnhancedHeadroomPBImpl convertFromProtoFormat(
+      YarnServiceProtos.EnhancedHeadroomProto p) {
+    return new EnhancedHeadroomPBImpl(p);
+  }
+
+  private YarnServiceProtos.EnhancedHeadroomProto convertToProtoFormat(
+      EnhancedHeadroom t) {
+    return ((EnhancedHeadroomPBImpl) t).getProto();
+  }
 }

+ 123 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/EnhancedHeadroomPBImpl.java

@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.api.records.impl.pb;
+
+import org.apache.hadoop.yarn.api.records.EnhancedHeadroom;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.EnhancedHeadroomProto;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.EnhancedHeadroomProtoOrBuilder;
+
+import org.apache.hadoop.thirdparty.protobuf.TextFormat;
+
+public class EnhancedHeadroomPBImpl extends EnhancedHeadroom {
+
+  private EnhancedHeadroomProto proto =
+      EnhancedHeadroomProto.getDefaultInstance();
+  private EnhancedHeadroomProto.Builder builder = null;
+  private boolean viaProto = false;
+
+  public EnhancedHeadroomPBImpl() {
+    builder = EnhancedHeadroomProto.newBuilder();
+  }
+
+  public EnhancedHeadroomPBImpl(EnhancedHeadroomProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public EnhancedHeadroomProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  @Override
+  public int hashCode() {
+    return getProto().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null) {
+      return false;
+    }
+    if (other.getClass().isAssignableFrom(this.getClass())) {
+      return this.getProto().equals(this.getClass().cast(other).getProto());
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return TextFormat.shortDebugString(getProto());
+  }
+
+  private void mergeLocalToBuilder() {
+    // No local content yet
+  }
+
+  private void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = EnhancedHeadroomProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  @Override
+  public void setTotalPendingCount(int totalPendingCount) {
+    maybeInitBuilder();
+    if (totalPendingCount == 0) {
+      builder.clearTotalPendingCount();
+      return;
+    }
+    builder.setTotalPendingCount(totalPendingCount);
+  }
+
+  @Override
+  public int getTotalPendingCount() {
+    EnhancedHeadroomProtoOrBuilder p = viaProto ? proto : builder;
+    return (p.hasTotalPendingCount()) ? p.getTotalPendingCount() : 0;
+  }
+
+  @Override
+  public void setTotalActiveCores(int totalActiveCores) {
+    maybeInitBuilder();
+    if (totalActiveCores == 0) {
+      builder.clearTotalActiveCores();
+      return;
+    }
+    builder.setTotalActiveCores(totalActiveCores);
+  }
+
+  @Override
+  public int getTotalActiveCores() {
+    EnhancedHeadroomProtoOrBuilder p = viaProto ? proto : builder;
+    return (p.hasTotalActiveCores()) ? p.getTotalActiveCores() : 0;
+  }
+
+}

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/TestPBImplRecords.java

@@ -127,6 +127,7 @@ import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerReport;
 import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.EnhancedHeadroom;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
@@ -184,6 +185,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerReportPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerRetryContextPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.EnhancedHeadroomPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ExecutionTypeRequestPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
@@ -430,6 +432,7 @@ public class TestPBImplRecords extends BasePBImplRecordsTest {
     generateByNewInstance(UpdatedContainer.class);
     generateByNewInstance(ContainerUpdateRequest.class);
     generateByNewInstance(ContainerUpdateResponse.class);
+    generateByNewInstance(EnhancedHeadroom.class);
     // genByNewInstance does not apply to QueueInfo, cause
     // it is recursive(has sub queues)
     typeValueCache.put(QueueInfo.class, QueueInfo.
@@ -1331,4 +1334,10 @@ public class TestPBImplRecords extends BasePBImplRecordsTest {
     validatePBImplRecord(GetNodesToAttributesResponsePBImpl.class,
         YarnServiceProtos.GetNodesToAttributesResponseProto.class);
   }
+
+  @Test
+  public void testGetEnhancedHeadroomPBImpl() throws Exception {
+    validatePBImplRecord(EnhancedHeadroomPBImpl.class,
+        YarnServiceProtos.EnhancedHeadroomProto.class);
+  }
 }

+ 10 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/policies/amrmproxy/LocalityMulticastAMRMProxyPolicy.java

@@ -32,6 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.EnhancedHeadroom;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -131,6 +132,7 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
   private SubClusterResolver resolver;
 
   private Map<SubClusterId, Resource> headroom;
+  private Map<SubClusterId, EnhancedHeadroom> enhancedHeadroom;
   private float hrAlpha;
   private FederationStateStoreFacade federationFacade;
   private SubClusterId homeSubcluster;
@@ -182,6 +184,7 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
 
     if (headroom == null) {
       headroom = new ConcurrentHashMap<>();
+      enhancedHeadroom = new ConcurrentHashMap<>();
     }
     hrAlpha = policy.getHeadroomAlpha();
 
@@ -195,9 +198,14 @@ public class LocalityMulticastAMRMProxyPolicy extends AbstractAMRMProxyPolicy {
       AllocateResponse response) throws YarnException {
     if (response.getAvailableResources() != null) {
       headroom.put(subClusterId, response.getAvailableResources());
-      LOG.info("Subcluster {} updated with {} memory headroom", subClusterId,
-          response.getAvailableResources().getMemorySize());
     }
+    if (response.getEnhancedHeadroom() != null) {
+      this.enhancedHeadroom.put(subClusterId, response.getEnhancedHeadroom());
+    }
+    LOG.info(
+        "Subcluster {} updated with AvailableResource {}, EnhancedHeadRoom {}",
+        subClusterId, response.getAvailableResources(),
+        response.getEnhancedHeadroom());
   }
 
   @Override

+ 13 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DefaultAMSProcessor.java

@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -36,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.CollectorInfo;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
+import org.apache.hadoop.yarn.api.records.EnhancedHeadroom;
 import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
@@ -334,6 +336,17 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
         .pullJustFinishedContainers());
     response.setAvailableResources(allocation.getResourceLimit());
 
+    QueueMetrics queueMetrics =
+        this.rmContext.getScheduler().getRootQueueMetrics();
+    if (queueMetrics != null) {
+      int totalVirtualCores =
+          queueMetrics.getAllocatedVirtualCores() + queueMetrics
+              .getAvailableVirtualCores();
+      int pendingContainers = queueMetrics.getPendingContainers();
+      response.setEnhancedHeadroom(
+          EnhancedHeadroom.newInstance(pendingContainers, totalVirtualCores));
+    }
+
     addToContainerUpdates(response, allocation,
         ((AbstractYarnScheduler)getScheduler())
             .getApplicationAttempt(appAttemptId).pullUpdateContainerErrors());