Explorar o código

YARN-4512 [YARN-1011]. Provide a knob to turn on over-allocation. (kasha)

Karthik Kambatla %!s(int64=9) %!d(string=hai) anos
pai
achega
6389e0a27b
Modificáronse 14 ficheiros con 457 adicións e 12 borrados
  1. 12 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  2. 21 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
  3. 10 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
  4. 45 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
  5. 45 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OverAllocationInfo.java
  6. 45 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ResourceThresholds.java
  7. 106 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OverAllocationInfoPBImpl.java
  8. 93 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ResourceThresholdsPBImpl.java
  9. 10 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
  10. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
  11. 17 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
  12. 3 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
  13. 34 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
  14. 11 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java

+ 12 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -2108,7 +2108,6 @@ public class YarnConfiguration extends Configuration {
   public static final boolean DEFAULT_NM_LINUX_CONTAINER_CGROUPS_STRICT_RESOURCE_USAGE =
       false;
 
-
   // Configurations for applicaiton life time monitor feature
   public static final String RM_APPLICATION_MONITOR_INTERVAL_MS =
       RM_PREFIX + "application-timeouts.monitor.interval-ms";
@@ -2116,6 +2115,18 @@ public class YarnConfiguration extends Configuration {
   public static final long DEFAULT_RM_APPLICATION_MONITOR_INTERVAL_MS =
       3000;
 
+  /** Overallocation (= allocation based on utilization) configs. */
+  public static final String NM_OVERALLOCATION_ALLOCATION_THRESHOLD =
+      NM_PREFIX + "overallocation.allocation-threshold";
+  public static final float DEFAULT_NM_OVERALLOCATION_ALLOCATION_THRESHOLD
+      = 0f;
+  @Private
+  public static final float MAX_NM_OVERALLOCATION_ALLOCATION_THRESHOLD = 0.95f;
+  public static final String NM_OVERALLOCATION_PREEMPTION_THRESHOLD =
+      NM_PREFIX + "overallocation.preemption-threshold";
+  public static final float DEFAULT_NM_OVERALLOCATION_PREEMPTION_THRESHOLD
+      = 0f;
+
   /**
    * Interval of time the linux container executor should try cleaning up
    * cgroups entry when cleaning up a container. This is required due to what 

+ 21 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml

@@ -1725,6 +1725,27 @@
     <value>default</value>
   </property>
 
+  <property>
+    <description>The extent of over-allocation (container-allocation based on
+      current utilization instead of prior allocation) allowed on this node,
+      expressed as a float between 0 and 0.95. By default, over-allocation is
+      turned off (value = 0). When turned on, the node allows running
+      OPPORTUNISTIC containers when the aggregate utilization is under the
+      value specified here multiplied by the node's advertised capacity.
+    </description>
+    <name>yarn.nodemanager.overallocation.allocation-threshold</name>
+    <value>0f</value>
+  </property>
+
+  <property>
+    <description>When a node is over-allocated to improve utilization by
+      running OPPORTUNISTIC containers, this config captures the utilization
+      beyond which OPPORTUNISTIC containers should start getting preempted.
+    </description>
+    <name>yarn.nodemanager.overallocation.preemption-threshold</name>
+    <value>1</value>
+  </property>
+
   <property>
     <description>This configuration setting determines the capabilities
       assigned to docker containers when they are launched. While these may not

+ 10 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java

@@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
 import org.apache.hadoop.yarn.util.Records;
 
 public abstract class RegisterNodeManagerRequest {
@@ -42,14 +43,14 @@ public abstract class RegisterNodeManagerRequest {
       List<NMContainerStatus> containerStatuses,
       List<ApplicationId> runningApplications, Set<NodeLabel> nodeLabels) {
     return newInstance(nodeId, httpPort, resource, nodeManagerVersionId,
-        containerStatuses, runningApplications, nodeLabels, null);
+        containerStatuses, runningApplications, nodeLabels, null, null);
   }
 
   public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
       int httpPort, Resource resource, String nodeManagerVersionId,
       List<NMContainerStatus> containerStatuses,
       List<ApplicationId> runningApplications, Set<NodeLabel> nodeLabels,
-      Resource physicalResource) {
+      Resource physicalResource, OverAllocationInfo overAllocationInfo) {
     RegisterNodeManagerRequest request =
         Records.newRecord(RegisterNodeManagerRequest.class);
     request.setHttpPort(httpPort);
@@ -60,9 +61,10 @@ public abstract class RegisterNodeManagerRequest {
     request.setRunningApplications(runningApplications);
     request.setNodeLabels(nodeLabels);
     request.setPhysicalResource(physicalResource);
+    request.setOverAllocationInfo(overAllocationInfo);
     return request;
   }
-  
+
   public abstract NodeId getNodeId();
   public abstract int getHttpPort();
   public abstract Resource getResource();
@@ -70,7 +72,11 @@ public abstract class RegisterNodeManagerRequest {
   public abstract List<NMContainerStatus> getNMContainerStatuses();
   public abstract Set<NodeLabel> getNodeLabels();
   public abstract void setNodeLabels(Set<NodeLabel> nodeLabels);
-  
+
+  public abstract OverAllocationInfo getOverAllocationInfo();
+  public abstract void setOverAllocationInfo(
+      OverAllocationInfo overAllocationInfo);
+
   /**
    * We introduce this here because currently YARN RM doesn't persist nodes info
    * for application running. When RM restart happened, we cannot determinate if

+ 45 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java

@@ -42,12 +42,15 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregation
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.OverAllocationInfoProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
-    
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.OverAllocationInfoPBImpl;
+
 public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest {
   RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();
   RegisterNodeManagerRequestProto.Builder builder = null;
@@ -58,6 +61,7 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
   private List<NMContainerStatus> containerStatuses = null;
   private List<ApplicationId> runningApplications = null;
   private Set<NodeLabel> labels = null;
+  private OverAllocationInfo overAllocationInfo = null;
 
   private List<LogAggregationReport> logAggregationReportsForApps = null;
 
@@ -107,6 +111,11 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
     if (this.logAggregationReportsForApps != null) {
       addLogAggregationStatusForAppsToProto();
     }
+
+    if (this.overAllocationInfo != null) {
+      builder.setOverAllocationInfo(
+          convertToProtoFormat(this.overAllocationInfo));
+    }
   }
 
   private void addLogAggregationStatusForAppsToProto() {
@@ -387,7 +396,30 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
     builder.clearNodeLabels();
     this.labels = nodeLabels;
   }
-  
+
+  @Override
+  public synchronized OverAllocationInfo getOverAllocationInfo() {
+    RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.overAllocationInfo != null) {
+      return this.overAllocationInfo;
+    }
+    if (!p.hasOverAllocationInfo()) {
+      return null;
+    }
+    this.overAllocationInfo = convertFromProtoFormat(p.getOverAllocationInfo());
+    return this.overAllocationInfo;
+  }
+
+  @Override
+  public synchronized void setOverAllocationInfo(
+      OverAllocationInfo overAllocationInfo) {
+    maybeInitBuilder();
+    if (this.overAllocationInfo == null) {
+      builder.clearOverAllocationInfo();
+    }
+    this.overAllocationInfo = overAllocationInfo;
+  }
+
   private synchronized void initNodeLabels() {
     if (this.labels != null) {
       return;
@@ -475,9 +507,19 @@ public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest
   @Override
   public synchronized void setLogAggregationReportsForApps(
       List<LogAggregationReport> logAggregationStatusForApps) {
-    if(logAggregationStatusForApps == null) {
+    if (logAggregationStatusForApps == null) {
       builder.clearLogAggregationReportsForApps();
     }
     this.logAggregationReportsForApps = logAggregationStatusForApps;
   }
+
+  private static OverAllocationInfoProto convertToProtoFormat(
+      OverAllocationInfo overAllocationInfo) {
+    return ((OverAllocationInfoPBImpl)overAllocationInfo).getProto();
+  }
+
+  private static OverAllocationInfo convertFromProtoFormat(
+      OverAllocationInfoProto proto) {
+    return new OverAllocationInfoPBImpl(proto);
+  }
 }

+ 45 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/OverAllocationInfo.java

@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.server.api.records.impl.pb
+    .OverAllocationInfoPBImpl;
+
+/**
+ * Captures information on how aggressively the scheduler can over-allocate
+ * OPPORTUNISTIC containers on a node. This is node-specific, and is sent on
+ * the wire on each heartbeat.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class OverAllocationInfo {
+  public static OverAllocationInfo newInstance(
+      ResourceThresholds overAllocationThresholds) {
+    OverAllocationInfo info = new OverAllocationInfoPBImpl();
+    info.setOverAllocationThreshold(overAllocationThresholds);
+    return info;
+  }
+
+  public abstract ResourceThresholds getOverAllocationThresholds();
+
+  public abstract void setOverAllocationThreshold(
+      ResourceThresholds resourceThresholds);
+}

+ 45 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ResourceThresholds.java

@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.records;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.ResourceThresholdsPBImpl;
+
+/**
+ * Captures resource thresholds to be used for allocation and preemption
+ * when over-allocation is turned on.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public abstract class ResourceThresholds {
+  public static ResourceThresholds newInstance(float threshold) {
+    ResourceThresholds thresholds = new ResourceThresholdsPBImpl();
+    thresholds.setMemoryThreshold(threshold);
+    thresholds.setCpuThreshold(threshold);
+    return thresholds;
+  }
+
+  public abstract float getMemoryThreshold();
+
+  public abstract float getCpuThreshold();
+
+  public abstract void setMemoryThreshold(float memoryThreshold);
+
+  public abstract void setCpuThreshold(float cpuThreshold);
+}

+ 106 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/OverAllocationInfoPBImpl.java

@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.api.records.impl.pb;
+
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.OverAllocationInfoProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.OverAllocationInfoProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ResourceThresholdsProto;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
+import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
+
+public class OverAllocationInfoPBImpl extends OverAllocationInfo {
+  private OverAllocationInfoProto proto =
+      OverAllocationInfoProto.getDefaultInstance();
+  private OverAllocationInfoProto.Builder builder = null;
+  private boolean viaProto = false;
+
+  private ResourceThresholds overAllocationThresholds = null;
+
+  public OverAllocationInfoPBImpl() {
+    builder = OverAllocationInfoProto.newBuilder();
+  }
+
+  public OverAllocationInfoPBImpl(OverAllocationInfoProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public synchronized OverAllocationInfoProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  private synchronized void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private synchronized void mergeLocalToBuilder() {
+    if (overAllocationThresholds != null) {
+      builder.setOverAllocationThresholds(
+          convertToProtoFormat(overAllocationThresholds));
+    }
+  }
+
+  private synchronized void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = OverAllocationInfoProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  @Override
+  public synchronized ResourceThresholds getOverAllocationThresholds() {
+    OverAllocationInfoProtoOrBuilder p = viaProto ? proto : builder;
+    if (overAllocationThresholds != null) {
+      return overAllocationThresholds;
+    }
+    if (!p.hasOverAllocationThresholds()) {
+      return null;
+    }
+    overAllocationThresholds =
+        convertFromProtoFormat(p.getOverAllocationThresholds());
+    return overAllocationThresholds;
+  }
+
+  @Override
+  public synchronized void setOverAllocationThreshold(
+      ResourceThresholds resourceThresholds) {
+    maybeInitBuilder();
+    if (this.overAllocationThresholds != null) {
+      builder.clearOverAllocationThresholds();
+    }
+    this.overAllocationThresholds = resourceThresholds;
+  }
+
+  private static ResourceThresholdsProto convertToProtoFormat(
+      ResourceThresholds overAllocationThresholds) {
+    return ((ResourceThresholdsPBImpl) overAllocationThresholds).getProto();
+  }
+
+  private static ResourceThresholds convertFromProtoFormat(
+      ResourceThresholdsProto overAllocationThresholdsProto) {
+    return new ResourceThresholdsPBImpl(overAllocationThresholdsProto);
+  }
+}

+ 93 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ResourceThresholdsPBImpl.java

@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records.impl.pb;
+
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ResourceThresholdsProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ResourceThresholdsProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
+
+public class ResourceThresholdsPBImpl extends ResourceThresholds{
+  private ResourceThresholdsProto proto =
+      ResourceThresholdsProto.getDefaultInstance();
+  private ResourceThresholdsProto.Builder builder = null;
+  private boolean viaProto = false;
+
+  public ResourceThresholdsPBImpl() {
+    builder = ResourceThresholdsProto.newBuilder();
+  }
+
+  public ResourceThresholdsPBImpl(ResourceThresholdsProto proto) {
+    this.proto = proto;
+    viaProto = true;
+  }
+
+  public synchronized ResourceThresholdsProto getProto() {
+    mergeLocalToProto();
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return proto;
+  }
+
+  private synchronized void mergeLocalToProto() {
+    if (viaProto) {
+      maybeInitBuilder();
+    }
+    mergeLocalToBuilder();
+    proto = builder.build();
+    viaProto = true;
+  }
+
+  private synchronized void mergeLocalToBuilder() {
+    /*
+     * Right now, we have only memory and cpu thresholds that are floats.
+     * This is a no-op until we add other non-static fields to the proto.
+     */
+  }
+
+  private synchronized void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = ResourceThresholdsProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  @Override
+  public synchronized float getMemoryThreshold() {
+    ResourceThresholdsProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getMemory();
+  }
+
+  @Override
+  public synchronized float getCpuThreshold() {
+    ResourceThresholdsProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getCpu();
+  }
+
+  @Override
+  public synchronized void setMemoryThreshold(float memoryThreshold) {
+    maybeInitBuilder();
+    builder.setMemory(memoryThreshold);
+  }
+
+  @Override
+  public synchronized void setCpuThreshold(float cpuThreshold) {
+    maybeInitBuilder();
+    builder.setCpu(cpuThreshold);
+  }
+}

+ 10 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto

@@ -72,6 +72,7 @@ message RegisterNodeManagerRequestProto {
   optional NodeLabelsProto nodeLabels = 8;
   optional ResourceProto physicalResource = 9;
   repeated LogAggregationReportProto log_aggregation_reports_for_apps = 10;
+  optional OverAllocationInfoProto overAllocationInfo = 11;
 }
 
 message RegisterNodeManagerResponseProto {
@@ -203,3 +204,12 @@ message SCMUploaderCanUploadRequestProto {
 message SCMUploaderCanUploadResponseProto {
   optional bool uploadable = 1;
 }
+
+message OverAllocationInfoProto {
+  optional ResourceThresholdsProto over_allocation_thresholds = 1;
+}
+
+message ResourceThresholdsProto {
+  optional float memory = 1 [default = 0];
+  optional float cpu = 2 [default = 0];
+}

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
@@ -137,4 +138,8 @@ public interface Context {
    * @return the NM {@code DeletionService}.
    */
   DeletionService getDeletionService();
+
+  boolean isOverAllocationEnabled();
+
+  OverAllocationInfo getOverAllocationInfo();
 }

+ 17 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java

@@ -53,6 +53,7 @@ import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.nodemanager.collectormanager.NMCollectorService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
@@ -684,6 +685,8 @@ public class NodeManager extends CompositeService
 
     private NMLogAggregationStatusTracker nmLogAggregationStatusTracker;
 
+    private OverAllocationInfo overAllocationInfo;
+
     public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
         NMTokenSecretManagerInNM nmTokenSecretManager,
         LocalDirsHandlerService dirsHandler, ApplicationACLsManager aclsManager,
@@ -831,6 +834,20 @@ public class NodeManager extends CompositeService
       this.nodeStatusUpdater = nodeStatusUpdater;
     }
 
+    @Override
+    public boolean isOverAllocationEnabled() {
+      return getOverAllocationInfo() != null;
+    }
+
+    @Override
+    public OverAllocationInfo getOverAllocationInfo() {
+      return this.overAllocationInfo;
+    }
+
+    public void setOverAllocationInfo(OverAllocationInfo overAllocationInfo) {
+      this.overAllocationInfo = overAllocationInfo;
+    }
+
     public boolean isDistributedSchedulingEnabled() {
       return isDistSchedulingEnabled;
     }

+ 3 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

@@ -387,8 +387,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
       RegisterNodeManagerRequest request =
           RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
               nodeManagerVersionId, containerReports, getRunningApplications(),
-              nodeLabels, physicalResource);
-
+              nodeLabels, physicalResource, context.getOverAllocationInfo());
       if (containerReports != null) {
         LOG.info("Registering with RM using containers :" + containerReports);
       }
@@ -519,8 +518,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
         = getIncreasedContainers();
     NodeStatus nodeStatus =
         NodeStatus.newInstance(nodeId, responseId, containersStatuses,
-          createKeepAliveApplicationList(), nodeHealthStatus,
-          containersUtilization, nodeUtilization, increasedContainers);
+            createKeepAliveApplicationList(), nodeHealthStatus,
+            containersUtilization, nodeUtilization, increasedContainers);
 
     nodeStatus.setOpportunisticContainersStatus(
         getOpportunisticContainersStatus());

+ 34 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java

@@ -38,8 +38,11 @@ import org.apache.hadoop.yarn.api.records.ResourceUtilization;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
+import org.apache.hadoop.yarn.server.api.records.ResourceThresholds;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerKillEvent;
@@ -110,6 +113,8 @@ public class ContainersMonitorImpl extends AbstractService implements
 
   private ResourceUtilization containersUtilization;
 
+  private ResourceThresholds overAllocationPreemptionThresholds;
+
   private volatile boolean stopped = false;
 
   public ContainersMonitorImpl(ContainerExecutor exec,
@@ -213,6 +218,13 @@ public class ContainersMonitorImpl extends AbstractService implements
       }
     }
 
+    initializeOverAllocation(conf);
+    if (context.isOverAllocationEnabled()) {
+      pmemCheckEnabled = true;
+      LOG.info("Force enabling physical memory checks because " +
+          "overallocation is enabled");
+    }
+
     containersMonitorEnabled =
         isContainerMonitorEnabled() && monitoringInterval > 0;
     LOG.info("ContainersMonitor enabled: " + containersMonitorEnabled);
@@ -264,6 +276,28 @@ public class ContainersMonitorImpl extends AbstractService implements
             pId, processTreeClass, conf);
   }
 
+  private void initializeOverAllocation(Configuration conf) {
+    float overAllocationTreshold = conf.getFloat(
+        YarnConfiguration.NM_OVERALLOCATION_ALLOCATION_THRESHOLD,
+        YarnConfiguration.DEFAULT_NM_OVERALLOCATION_ALLOCATION_THRESHOLD);
+    overAllocationTreshold = Math.min(overAllocationTreshold,
+        YarnConfiguration.MAX_NM_OVERALLOCATION_ALLOCATION_THRESHOLD);
+    overAllocationTreshold = Math.max(0, overAllocationTreshold);
+
+    if (overAllocationTreshold > 0f) {
+      ((NodeManager.NMContext) context).setOverAllocationInfo(
+          OverAllocationInfo.newInstance(
+              ResourceThresholds.newInstance(overAllocationTreshold)));
+
+      float preemptionThreshold = conf.getFloat(
+          YarnConfiguration.NM_OVERALLOCATION_PREEMPTION_THRESHOLD,
+          YarnConfiguration.DEFAULT_NM_OVERALLOCATION_PREEMPTION_THRESHOLD);
+
+      this.overAllocationPreemptionThresholds =
+          ResourceThresholds.newInstance(preemptionThreshold);
+    }
+  }
+
   private boolean isResourceCalculatorAvailable() {
     if (resourceCalculatorPlugin == null) {
       LOG.info("ResourceCalculatorPlugin is unavailable on this system. " + this

+ 11 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java

@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerStateTransitionListener;
+import org.apache.hadoop.yarn.server.api.records.OverAllocationInfo;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
@@ -771,6 +772,16 @@ public abstract class BaseAMRMProxyTest {
       return null;
     }
 
+    @Override
+    public boolean isOverAllocationEnabled() {
+      return false;
+    }
+
+    @Override
+    public OverAllocationInfo getOverAllocationInfo() {
+      return null;
+    }
+
     @Override
     public NodeResourceMonitor getNodeResourceMonitor() {
       return null;