Browse Source

YARN-2888. Corrective mechanisms for rebalancing NM container queues. (asuresh)

Arun Suresh 9 years ago
parent
commit
f0ac18d001
23 changed files with 851 additions and 243 deletions
  1. 39 9
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
  2. 12 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java
  3. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
  4. 42 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
  5. 44 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueuingLimit.java
  6. 80 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueuingLimitPBImpl.java
  7. 6 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
  8. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
  9. 4 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
  10. 92 79
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
  11. 42 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java
  12. 9 3
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
  13. 38 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java
  14. 2 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
  15. 5 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java
  16. 117 77
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java
  17. 4 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
  18. 14 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
  19. 2 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
  20. 7 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
  21. 97 49
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java
  22. 125 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java
  23. 64 16
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java

+ 39 - 9
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java

@@ -344,17 +344,47 @@ public class YarnConfiguration extends Configuration {
       YARN_PREFIX + "distributed-scheduling.top-k";
       YARN_PREFIX + "distributed-scheduling.top-k";
   public static final int DIST_SCHEDULING_TOP_K_DEFAULT = 10;
   public static final int DIST_SCHEDULING_TOP_K_DEFAULT = 10;
 
 
-  /** Frequency for computing Top K Best Nodes */
-  public static final String DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS =
-      YARN_PREFIX + "distributed-scheduling.top-k-compute-interval-ms";
-  public static final long DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT = 1000;
-
-  /** Comparator for determining Node Load for Distributed Scheduling */
-  public static final String DIST_SCHEDULING_TOP_K_COMPARATOR =
-      YARN_PREFIX + "distributed-scheduling.top-k-comparator";
-  public static final String DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT =
+  /** Frequency for computing least loaded NMs. */
+  public static final String NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS =
+      YARN_PREFIX + "nm-container-queuing.sorting-nodes-interval-ms";
+  public static final long
+      NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT = 1000;
+
+  /** Comparator for determining Node Load for Distributed Scheduling. */
+  public static final String NM_CONTAINER_QUEUING_LOAD_COMPARATOR =
+      YARN_PREFIX + "nm-container-queuing.load-comparator";
+  public static final String NM_CONTAINER_QUEUING_LOAD_COMPARATOR_DEFAULT =
       "QUEUE_LENGTH";
       "QUEUE_LENGTH";
 
 
+  /** Value of standard deviation used for calculation of queue limit
+   * thresholds. */
+  public static final String NM_CONTAINER_QUEUING_LIMIT_STDEV =
+      YARN_PREFIX + "nm-container-queuing.queue-limit-stdev";
+  public static final float NM_CONTAINER_QUEUING_LIMIT_STDEV_DEFAULT =
+      1.0f;
+
+  /** Min length of container queue at NodeManager. */
+  public static final String NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH =
+      YARN_PREFIX + "nm-container-queuing.min-queue-length";
+  public static final int NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH_DEFAULT = 1;
+
+  /** Max length of container queue at NodeManager. */
+  public static final String NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH =
+      YARN_PREFIX + "nm-container-queuing.max-queue-length";
+  public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT = 10;
+
+  /** Min wait time of container queue at NodeManager. */
+  public static final String NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS =
+      YARN_PREFIX + "nm-container-queuing.min-queue-wait-time-ms";
+  public static final int NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS_DEFAULT =
+      1;
+
+  /** Max wait time of container queue at NodeManager. */
+  public static final String NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS =
+      YARN_PREFIX + "nm-container-queuing.max-queue-wait-time-ms";
+  public static final int NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS_DEFAULT =
+      10;
+
   /**
   /**
    * Enable/disable intermediate-data encryption at YARN level. For now, this
    * Enable/disable intermediate-data encryption at YARN level. For now, this
    * only is used by the FileSystemRMStateStore to setup right file-system
    * only is used by the FileSystemRMStateStore to setup right file-system

+ 12 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/conf/TestYarnConfigurationFields.java

@@ -135,9 +135,19 @@ public class TestYarnConfigurationFields extends TestConfigurationFieldsBase {
     configurationPrefixToSkipCompare
     configurationPrefixToSkipCompare
         .add(YarnConfiguration.DIST_SCHEDULING_TOP_K);
         .add(YarnConfiguration.DIST_SCHEDULING_TOP_K);
     configurationPrefixToSkipCompare
     configurationPrefixToSkipCompare
-        .add(YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS);
+        .add(YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS);
     configurationPrefixToSkipCompare
     configurationPrefixToSkipCompare
-        .add(YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR);
+        .add(YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR);
+    configurationPrefixToSkipCompare
+        .add(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH);
+    configurationPrefixToSkipCompare
+        .add(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH);
+    configurationPrefixToSkipCompare
+        .add(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS);
+    configurationPrefixToSkipCompare
+        .add(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS);
+    configurationPrefixToSkipCompare
+        .add(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV);
 
 
     // Set by container-executor.cfg
     // Set by container-executor.cfg
     configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR);
     configurationPrefixToSkipCompare.add(YarnConfiguration.NM_USER_HOME_DIR);

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java

@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 
 
@@ -78,4 +79,7 @@ public interface NodeHeartbeatResponse {
 
 
   List<Container> getContainersToDecrease();
   List<Container> getContainersToDecrease();
   void addAllContainersToDecrease(Collection<Container> containersToDecrease);
   void addAllContainersToDecrease(Collection<Container> containersToDecrease);
+
+  ContainerQueuingLimit getContainerQueuingLimit();
+  void setContainerQueuingLimit(ContainerQueuingLimit containerQueuingLimit);
 }
 }

+ 42 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java

@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
@@ -46,8 +47,10 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatR
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
+import org.apache.hadoop.yarn.server.api.records.impl.pb.ContainerQueuingLimitPBImpl;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
 
 
 
 
@@ -65,6 +68,7 @@ public class NodeHeartbeatResponsePBImpl extends
 
 
   private MasterKey containerTokenMasterKey = null;
   private MasterKey containerTokenMasterKey = null;
   private MasterKey nmTokenMasterKey = null;
   private MasterKey nmTokenMasterKey = null;
+  private ContainerQueuingLimit containerQueuingLimit = null;
   private List<Container> containersToDecrease = null;
   private List<Container> containersToDecrease = null;
   private List<SignalContainerRequest> containersToSignal = null;
   private List<SignalContainerRequest> containersToSignal = null;
 
 
@@ -102,6 +106,10 @@ public class NodeHeartbeatResponsePBImpl extends
       builder.setNmTokenMasterKey(
       builder.setNmTokenMasterKey(
           convertToProtoFormat(this.nmTokenMasterKey));
           convertToProtoFormat(this.nmTokenMasterKey));
     }
     }
+    if (this.containerQueuingLimit != null) {
+      builder.setContainerQueuingLimit(
+          convertToProtoFormat(this.containerQueuingLimit));
+    }
     if (this.systemCredentials != null) {
     if (this.systemCredentials != null) {
       addSystemCredentialsToProto();
       addSystemCredentialsToProto();
     }
     }
@@ -196,6 +204,30 @@ public class NodeHeartbeatResponsePBImpl extends
     this.nmTokenMasterKey = masterKey;
     this.nmTokenMasterKey = masterKey;
   }
   }
 
 
+  @Override
+  public ContainerQueuingLimit getContainerQueuingLimit() {
+    NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
+    if (this.containerQueuingLimit != null) {
+      return this.containerQueuingLimit;
+    }
+    if (!p.hasContainerQueuingLimit()) {
+      return null;
+    }
+    this.containerQueuingLimit =
+        convertFromProtoFormat(p.getContainerQueuingLimit());
+    return this.containerQueuingLimit;
+  }
+
+  @Override
+  public void setContainerQueuingLimit(ContainerQueuingLimit
+      containerQueuingLimit) {
+    maybeInitBuilder();
+    if (containerQueuingLimit == null) {
+      builder.clearContainerQueuingLimit();
+    }
+    this.containerQueuingLimit = containerQueuingLimit;
+  }
+
   @Override
   @Override
   public NodeAction getNodeAction() {
   public NodeAction getNodeAction() {
     NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
     NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
@@ -638,6 +670,16 @@ public class NodeHeartbeatResponsePBImpl extends
     builder.addAllContainersToSignal(iterable);
     builder.addAllContainersToSignal(iterable);
   }
   }
 
 
+  private ContainerQueuingLimit convertFromProtoFormat(
+      ContainerQueuingLimitProto p) {
+    return new ContainerQueuingLimitPBImpl(p);
+  }
+
+  private ContainerQueuingLimitProto convertToProtoFormat(
+      ContainerQueuingLimit c) {
+    return ((ContainerQueuingLimitPBImpl)c).getProto();
+  }
+
   private SignalContainerRequestPBImpl convertFromProtoFormat(
   private SignalContainerRequestPBImpl convertFromProtoFormat(
       SignalContainerRequestProto p) {
       SignalContainerRequestProto p) {
     return new SignalContainerRequestPBImpl(p);
     return new SignalContainerRequestPBImpl(p);

+ 44 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/ContainerQueuingLimit.java

@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records;
+
+import org.apache.hadoop.yarn.util.Records;
+
+/**
+ * Used to hold max wait time / queue length information to be
+ * passed back to the NodeManager.
+ */
+public abstract class ContainerQueuingLimit {
+
+  public static ContainerQueuingLimit newInstance() {
+    ContainerQueuingLimit containerQueuingLimit =
+        Records.newRecord(ContainerQueuingLimit.class);
+    containerQueuingLimit.setMaxQueueLength(-1);
+    containerQueuingLimit.setMaxQueueWaitTimeInMs(-1);
+    return containerQueuingLimit;
+  }
+
+  public abstract int getMaxQueueLength();
+
+  public abstract void setMaxQueueLength(int queueLength);
+
+  public abstract int getMaxQueueWaitTimeInMs();
+
+  public abstract void setMaxQueueWaitTimeInMs(int waitTime);
+}

+ 80 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/ContainerQueuingLimitPBImpl.java

@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.api.records.impl.pb;
+
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
+
+/**
+ * Implementation of ContainerQueuingLimit interface.
+ */
+public class ContainerQueuingLimitPBImpl extends ContainerQueuingLimit {
+
+  private ContainerQueuingLimitProto proto =
+      ContainerQueuingLimitProto.getDefaultInstance();
+  private ContainerQueuingLimitProto.Builder builder = null;
+  private boolean viaProto = false;
+
+  public ContainerQueuingLimitPBImpl() {
+    builder = ContainerQueuingLimitProto.newBuilder();
+  }
+
+  public ContainerQueuingLimitPBImpl(ContainerQueuingLimitProto proto) {
+    this.proto = proto;
+    this.viaProto = true;
+  }
+
+  public ContainerQueuingLimitProto getProto() {
+    proto = viaProto ? proto : builder.build();
+    viaProto = true;
+    return  proto;
+  }
+
+  private void maybeInitBuilder() {
+    if (viaProto || builder == null) {
+      builder = ContainerQueuingLimitProto.newBuilder(proto);
+    }
+    viaProto = false;
+  }
+
+  @Override
+  public int getMaxQueueWaitTimeInMs() {
+    ContainerQueuingLimitProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getMaxQueueWaitTimeInMs();
+  }
+
+  @Override
+  public void setMaxQueueWaitTimeInMs(int waitTime) {
+    maybeInitBuilder();
+    builder.setMaxQueueWaitTimeInMs(waitTime);
+  }
+
+  @Override
+  public int getMaxQueueLength() {
+    ContainerQueuingLimitProtoOrBuilder p = viaProto ? proto : builder;
+    return p.getMaxQueueLength();
+  }
+
+  @Override
+  public void setMaxQueueLength(int queueLength) {
+    maybeInitBuilder();
+    builder.setMaxQueueLength(queueLength);
+  }
+}

+ 6 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto

@@ -100,6 +100,12 @@ message NodeHeartbeatResponseProto {
   optional bool areNodeLabelsAcceptedByRM = 11 [default = false];
   optional bool areNodeLabelsAcceptedByRM = 11 [default = false];
   repeated ContainerProto containers_to_decrease = 12;
   repeated ContainerProto containers_to_decrease = 12;
   repeated SignalContainerRequestProto containers_to_signal = 13;
   repeated SignalContainerRequestProto containers_to_signal = 13;
+  optional ContainerQueuingLimitProto container_queuing_limit = 14;
+}
+
+message ContainerQueuingLimitProto {
+  optional int32 max_queue_length = 1;
+  optional int32 max_queue_wait_time_in_ms = 2;
 }
 }
 
 
 message SystemCredentialsForAppsProto {
 message SystemCredentialsForAppsProto {

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java

@@ -23,13 +23,13 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentMap;
 
 
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@@ -82,7 +82,7 @@ public interface Context {
 
 
   NodeHealthStatus getNodeHealthStatus();
   NodeHealthStatus getNodeHealthStatus();
 
 
-  ContainerManagementProtocol getContainerManager();
+  ContainerManager getContainerManager();
 
 
   NodeResourceMonitor getNodeResourceMonitor();
   NodeResourceMonitor getNodeResourceMonitor();
 
 

+ 4 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java

@@ -47,7 +47,6 @@ import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
 import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
-import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -60,6 +59,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -465,7 +465,7 @@ public class NodeManager extends CompositeService
 
 
     private final NMContainerTokenSecretManager containerTokenSecretManager;
     private final NMContainerTokenSecretManager containerTokenSecretManager;
     private final NMTokenSecretManagerInNM nmTokenSecretManager;
     private final NMTokenSecretManagerInNM nmTokenSecretManager;
-    private ContainerManagementProtocol containerManager;
+    private ContainerManager containerManager;
     private NodeResourceMonitor nodeResourceMonitor;
     private NodeResourceMonitor nodeResourceMonitor;
     private final LocalDirsHandlerService dirsHandler;
     private final LocalDirsHandlerService dirsHandler;
     private final ApplicationACLsManager aclsManager;
     private final ApplicationACLsManager aclsManager;
@@ -555,11 +555,11 @@ public class NodeManager extends CompositeService
     }
     }
 
 
     @Override
     @Override
-    public ContainerManagementProtocol getContainerManager() {
+    public ContainerManager getContainerManager() {
       return this.containerManager;
       return this.containerManager;
     }
     }
 
 
-    public void setContainerManager(ContainerManagementProtocol containerManager) {
+    public void setContainerManager(ContainerManager containerManager) {
       this.containerManager = containerManager;
       this.containerManager = containerManager;
     }
     }
 
 

+ 92 - 79
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java

@@ -71,13 +71,14 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
+
+import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
 import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
@@ -401,8 +402,7 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
 
 
     LOG.info(successfullRegistrationMsg);
     LOG.info(successfullRegistrationMsg);
     LOG.info("Notifying ContainerManager to unblock new container-requests");
     LOG.info("Notifying ContainerManager to unblock new container-requests");
-    ((ContainerManagerImpl) this.context.getContainerManager())
-      .setBlockNewContainerRequests(false);
+    this.context.getContainerManager().setBlockNewContainerRequests(false);
   }
   }
 
 
   private List<ApplicationId> createKeepAliveApplicationList() {
   private List<ApplicationId> createKeepAliveApplicationList() {
@@ -465,10 +465,8 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
    * @return Resource utilization of all the containers.
    * @return Resource utilization of all the containers.
    */
    */
   private ResourceUtilization getContainersUtilization() {
   private ResourceUtilization getContainersUtilization() {
-    ContainerManagerImpl containerManager =
-        (ContainerManagerImpl) this.context.getContainerManager();
     ContainersMonitor containersMonitor =
     ContainersMonitor containersMonitor =
-        containerManager.getContainersMonitor();
+        this.context.getContainerManager().getContainersMonitor();
     return containersMonitor.getContainersUtilization();
     return containersMonitor.getContainersUtilization();
   }
   }
 
 
@@ -735,7 +733,6 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
             Set<NodeLabel> nodeLabelsForHeartbeat =
             Set<NodeLabel> nodeLabelsForHeartbeat =
                 nodeLabelsHandler.getNodeLabelsForHeartbeat();
                 nodeLabelsHandler.getNodeLabelsForHeartbeat();
             NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
             NodeStatus nodeStatus = getNodeStatus(lastHeartbeatID);
-
             NodeHeartbeatRequest request =
             NodeHeartbeatRequest request =
                 NodeHeartbeatRequest.newInstance(nodeStatus,
                 NodeHeartbeatRequest.newInstance(nodeStatus,
                     NodeStatusUpdaterImpl.this.context
                     NodeStatusUpdaterImpl.this.context
@@ -760,82 +757,70 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
             nextHeartBeatInterval = response.getNextHeartBeatInterval();
             nextHeartBeatInterval = response.getNextHeartBeatInterval();
             updateMasterKeys(response);
             updateMasterKeys(response);
 
 
-            if (response.getNodeAction() == NodeAction.SHUTDOWN) {
-              LOG.warn("Recieved SHUTDOWN signal from Resourcemanager as part of"
-                  + " heartbeat, hence shutting down.");
-              LOG.warn("Message from ResourceManager: "
-                  + response.getDiagnosticsMessage());
-              context.setDecommissioned(true);
-              dispatcher.getEventHandler().handle(
-                  new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
-              break;
-            }
-            if (response.getNodeAction() == NodeAction.RESYNC) {
-              LOG.warn("Node is out of sync with ResourceManager,"
-                  + " hence resyncing.");
-              LOG.warn("Message from ResourceManager: "
-                  + response.getDiagnosticsMessage());
-              // Invalidate the RMIdentifier while resync
-              NodeStatusUpdaterImpl.this.rmIdentifier =
-                  ResourceManagerConstants.RM_INVALID_IDENTIFIER;
-              dispatcher.getEventHandler().handle(
-                  new NodeManagerEvent(NodeManagerEventType.RESYNC));
-              pendingCompletedContainers.clear();
-              break;
-            }
-
-            nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels(response);
-
-            // Explicitly put this method after checking the resync response. We
-            // don't want to remove the completed containers before resync
-            // because these completed containers will be reported back to RM
-            // when NM re-registers with RM.
-            // Only remove the cleanedup containers that are acked
-            removeOrTrackCompletedContainersFromContext(response
+            if (!handleShutdownOrResyncCommand(response)) {
+              nodeLabelsHandler.verifyRMHeartbeatResponseForNodeLabels(
+                  response);
+
+              // Explicitly put this method after checking the resync
+              // response. We
+              // don't want to remove the completed containers before resync
+              // because these completed containers will be reported back to RM
+              // when NM re-registers with RM.
+              // Only remove the cleanedup containers that are acked
+              removeOrTrackCompletedContainersFromContext(response
                   .getContainersToBeRemovedFromNM());
                   .getContainersToBeRemovedFromNM());
 
 
-            logAggregationReportForAppsTempList.clear();
-            lastHeartbeatID = response.getResponseId();
-            List<ContainerId> containersToCleanup = response
-                .getContainersToCleanup();
-            if (!containersToCleanup.isEmpty()) {
-              dispatcher.getEventHandler().handle(
-                  new CMgrCompletedContainersEvent(containersToCleanup,
-                    CMgrCompletedContainersEvent.Reason.BY_RESOURCEMANAGER));
-            }
-            List<ApplicationId> appsToCleanup =
-                response.getApplicationsToCleanup();
-            //Only start tracking for keepAlive on FINISH_APP
-            trackAppsForKeepAlive(appsToCleanup);
-            if (!appsToCleanup.isEmpty()) {
-              dispatcher.getEventHandler().handle(
-                  new CMgrCompletedAppsEvent(appsToCleanup,
-                      CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
-            }
-
-            Map<ApplicationId, ByteBuffer> systemCredentials =
-                response.getSystemCredentialsForApps();
-            if (systemCredentials != null && !systemCredentials.isEmpty()) {
-              ((NMContext) context)
-                .setSystemCrendentialsForApps(parseCredentials(systemCredentials));
-            }
+              logAggregationReportForAppsTempList.clear();
+              lastHeartbeatID = response.getResponseId();
+              List<ContainerId> containersToCleanup = response
+                  .getContainersToCleanup();
+              if (!containersToCleanup.isEmpty()) {
+                dispatcher.getEventHandler().handle(
+                    new CMgrCompletedContainersEvent(containersToCleanup,
+                        CMgrCompletedContainersEvent.Reason
+                            .BY_RESOURCEMANAGER));
+              }
+              List<ApplicationId> appsToCleanup =
+                  response.getApplicationsToCleanup();
+              //Only start tracking for keepAlive on FINISH_APP
+              trackAppsForKeepAlive(appsToCleanup);
+              if (!appsToCleanup.isEmpty()) {
+                dispatcher.getEventHandler().handle(
+                    new CMgrCompletedAppsEvent(appsToCleanup,
+                        CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
+              }
+              Map<ApplicationId, ByteBuffer> systemCredentials =
+                  response.getSystemCredentialsForApps();
+              if (systemCredentials != null && !systemCredentials.isEmpty()) {
+                ((NMContext) context).setSystemCrendentialsForApps(
+                    parseCredentials(systemCredentials));
+              }
+              List<org.apache.hadoop.yarn.api.records.Container>
+                  containersToDecrease = response.getContainersToDecrease();
+              if (!containersToDecrease.isEmpty()) {
+                dispatcher.getEventHandler().handle(
+                    new CMgrDecreaseContainersResourceEvent(
+                        containersToDecrease)
+                );
+              }
 
 
-            List<org.apache.hadoop.yarn.api.records.Container>
-                containersToDecrease = response.getContainersToDecrease();
-            if (!containersToDecrease.isEmpty()) {
-              dispatcher.getEventHandler().handle(
-                  new CMgrDecreaseContainersResourceEvent(containersToDecrease)
-              );
-            }
+              // SignalContainer request originally comes from end users via
+              // ClientRMProtocol's SignalContainer. Forward the request to
+              // ContainerManager which will dispatch the event to
+              // ContainerLauncher.
+              List<SignalContainerRequest> containersToSignal = response
+                  .getContainersToSignalList();
+              if (containersToSignal.size() != 0) {
+                dispatcher.getEventHandler().handle(
+                    new CMgrSignalContainersEvent(containersToSignal));
+              }
 
 
-            // SignalContainer request originally comes from end users via
-            // ClientRMProtocol's SignalContainer. Forward the request to
-            // ContainerManager which will dispatch the event to ContainerLauncher.
-            List<SignalContainerRequest> containersToSignal = response
-                .getContainersToSignalList();
-            if (containersToSignal.size() != 0) {
-              dispatcher.getEventHandler().handle(
-                  new CMgrSignalContainersEvent(containersToSignal));
+              // Update QueuingLimits if ContainerManager supports queuing
+              ContainerQueuingLimit queuingLimit =
+                  response.getContainerQueuingLimit();
+              if (queuingLimit != null) {
+                context.getContainerManager().updateQueuingLimit(queuingLimit);
+              }
             }
             }
           } catch (ConnectException e) {
           } catch (ConnectException e) {
             //catch and throw the exception if tried MAX wait time to connect RM
             //catch and throw the exception if tried MAX wait time to connect RM
@@ -883,6 +868,34 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     statusUpdater.start();
     statusUpdater.start();
   }
   }
 
 
+  private boolean handleShutdownOrResyncCommand(
+      NodeHeartbeatResponse response) {
+    if (response.getNodeAction() == NodeAction.SHUTDOWN) {
+      LOG.warn("Recieved SHUTDOWN signal from Resourcemanager as part of"
+          + " heartbeat, hence shutting down.");
+      LOG.warn("Message from ResourceManager: "
+          + response.getDiagnosticsMessage());
+      context.setDecommissioned(true);
+      dispatcher.getEventHandler().handle(
+          new NodeManagerEvent(NodeManagerEventType.SHUTDOWN));
+      return true;
+    }
+    if (response.getNodeAction() == NodeAction.RESYNC) {
+      LOG.warn("Node is out of sync with ResourceManager,"
+          + " hence resyncing.");
+      LOG.warn("Message from ResourceManager: "
+          + response.getDiagnosticsMessage());
+      // Invalidate the RMIdentifier while resync
+      NodeStatusUpdaterImpl.this.rmIdentifier =
+          ResourceManagerConstants.RM_INVALID_IDENTIFIER;
+      dispatcher.getEventHandler().handle(
+          new NodeManagerEvent(NodeManagerEventType.RESYNC));
+      pendingCompletedContainers.clear();
+      return true;
+    }
+    return false;
+  }
+
   private List<LogAggregationReport> getLogAggregationReportsForApps(
   private List<LogAggregationReport> getLogAggregationReportsForApps(
       ConcurrentLinkedQueue<LogAggregationReport> lastestLogAggregationStatus) {
       ConcurrentLinkedQueue<LogAggregationReport> lastestLogAggregationStatus) {
     LogAggregationReport status;
     LogAggregationReport status;

+ 42 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManager.java

@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager;
+
+import org.apache.hadoop.service.ServiceStateChangeListener;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
+    .ContainersMonitor;
+
+/**
+ * The ContainerManager is an entity that manages the life cycle of Containers.
+ */
+public interface ContainerManager extends ServiceStateChangeListener,
+    ContainerManagementProtocol,
+    EventHandler<ContainerManagerEvent> {
+
+  ContainersMonitor getContainersMonitor();
+
+  void updateQueuingLimit(ContainerQueuingLimit queuingLimit);
+
+  void setBlockNewContainerRequests(boolean blockNewContainerRequests);
+
+}

+ 9 - 3
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java

@@ -53,7 +53,6 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.service.Service;
-import org.apache.hadoop.service.ServiceStateChangeListener;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
@@ -95,6 +94,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Containe
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.ContainerType;
 import org.apache.hadoop.yarn.server.api.ContainerType;
+import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
@@ -150,8 +150,7 @@ import com.google.protobuf.ByteString;
 import org.apache.hadoop.yarn.util.resource.Resources;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 
 public class ContainerManagerImpl extends CompositeService implements
 public class ContainerManagerImpl extends CompositeService implements
-    ServiceStateChangeListener, ContainerManagementProtocol,
-    EventHandler<ContainerManagerEvent> {
+    ContainerManager {
 
 
   /**
   /**
    * Extra duration to wait for applications to be killed on shutdown.
    * Extra duration to wait for applications to be killed on shutdown.
@@ -410,6 +409,7 @@ public class ContainerManagerImpl extends CompositeService implements
     }
     }
   }
   }
 
 
+  @Override
   public ContainersMonitor getContainersMonitor() {
   public ContainersMonitor getContainersMonitor() {
     return this.containersMonitor;
     return this.containersMonitor;
   }
   }
@@ -1398,6 +1398,7 @@ public class ContainerManagerImpl extends CompositeService implements
     }
     }
   }
   }
 
 
+  @Override
   public void setBlockNewContainerRequests(boolean blockNewContainerRequests) {
   public void setBlockNewContainerRequests(boolean blockNewContainerRequests) {
     this.blockNewContainerRequests.set(blockNewContainerRequests);
     this.blockNewContainerRequests.set(blockNewContainerRequests);
   }
   }
@@ -1434,4 +1435,9 @@ public class ContainerManagerImpl extends CompositeService implements
   protected boolean isServiceStopped() {
   protected boolean isServiceStopped() {
     return serviceStopped;
     return serviceStopped;
   }
   }
+
+  @Override
+  public void updateQueuingLimit(ContainerQueuingLimit queuingLimit) {
+    LOG.trace("Implementation does not support queuing of Containers !!");
+  }
 }
 }

+ 38 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/queuing/QueuingContainerManagerImpl.java

@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
@@ -83,6 +84,7 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
   private Queue<AllocatedContainerInfo> queuedOpportunisticContainers;
   private Queue<AllocatedContainerInfo> queuedOpportunisticContainers;
 
 
   private Set<ContainerId> opportunisticContainersToKill;
   private Set<ContainerId> opportunisticContainersToKill;
+  private final ContainerQueuingLimit queuingLimit;
 
 
   public QueuingContainerManagerImpl(Context context, ContainerExecutor exec,
   public QueuingContainerManagerImpl(Context context, ContainerExecutor exec,
       DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
       DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
@@ -95,6 +97,7 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
     this.queuedOpportunisticContainers = new ConcurrentLinkedQueue<>();
     this.queuedOpportunisticContainers = new ConcurrentLinkedQueue<>();
     this.opportunisticContainersToKill = Collections.synchronizedSet(
     this.opportunisticContainersToKill = Collections.synchronizedSet(
         new HashSet<ContainerId>());
         new HashSet<ContainerId>());
+    this.queuingLimit = ContainerQueuingLimit.newInstance();
   }
   }
 
 
   @Override
   @Override
@@ -526,6 +529,41 @@ public class QueuingContainerManagerImpl extends ContainerManagerImpl {
     }
     }
   }
   }
 
 
+  @Override
+  public void updateQueuingLimit(ContainerQueuingLimit limit) {
+    this.queuingLimit.setMaxQueueLength(limit.getMaxQueueLength());
+    // TODO: Include wait time as well once it is implemented
+    if (this.queuingLimit.getMaxQueueLength() > -1) {
+      shedQueuedOpportunisticContainers();
+    }
+  }
+
+  private void shedQueuedOpportunisticContainers() {
+    int numAllowed = this.queuingLimit.getMaxQueueLength();
+    Iterator<AllocatedContainerInfo> containerIter =
+        queuedOpportunisticContainers.iterator();
+    while (containerIter.hasNext()) {
+      AllocatedContainerInfo cInfo = containerIter.next();
+      if (numAllowed <= 0) {
+        containerIter.remove();
+        ContainerTokenIdentifier containerTokenIdentifier = this.context
+            .getQueuingContext().getQueuedContainers().remove(
+                cInfo.getContainerTokenIdentifier().getContainerID());
+        // The Container might have already started while we were
+        // iterating..
+        if (containerTokenIdentifier != null) {
+          this.context.getQueuingContext().getKilledQueuedContainers()
+              .putIfAbsent(cInfo.getContainerTokenIdentifier(),
+                  "Container De-queued to meet global queuing limits. "
+                      + "Max Queue length["
+                      + this.queuingLimit.getMaxQueueLength() + "]");
+        }
+      }
+      numAllowed--;
+    }
+  }
+
+
   static class AllocatedContainerInfo {
   static class AllocatedContainerInfo {
     private final ContainerTokenIdentifier containerTokenIdentifier;
     private final ContainerTokenIdentifier containerTokenIdentifier;
     private final StartContainerRequest startRequest;
     private final StartContainerRequest startRequest;

+ 2 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java

@@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor;
 import org.apache.hadoop.yarn.server.nodemanager.NodeResourceMonitor;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@@ -642,7 +643,7 @@ public abstract class BaseAMRMProxyTest {
     }
     }
 
 
     @Override
     @Override
-    public ContainerManagementProtocol getContainerManager() {
+    public ContainerManager getContainerManager() {
       return null;
       return null;
     }
     }
 
 

+ 5 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClusterMonitor.java

@@ -24,13 +24,17 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 
 
 import java.util.List;
 import java.util.List;
 
 
+/**
+ * Implementations of this class are notified of changes to the cluster's state,
+ * such as node addition, removal and updates.
+ */
 public interface ClusterMonitor {
 public interface ClusterMonitor {
 
 
   void addNode(List<NMContainerStatus> containerStatuses, RMNode rmNode);
   void addNode(List<NMContainerStatus> containerStatuses, RMNode rmNode);
 
 
   void removeNode(RMNode removedRMNode);
   void removeNode(RMNode removedRMNode);
 
 
-  void nodeUpdate(RMNode rmNode);
+  void updateNode(RMNode rmNode);
 
 
   void updateNodeResource(RMNode rmNode, ResourceOption resourceOption);
   void updateNodeResource(RMNode rmNode, ResourceOption resourceOption);
 }
 }

+ 117 - 77
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/DistributedSchedulingService.java

@@ -46,8 +46,10 @@ import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
 import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
 
 
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed
-    .TopKNodeSelector;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor;
+
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
@@ -57,7 +59,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretMan
 
 
 import java.io.IOException;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.InetSocketAddress;
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentHashMap;
@@ -76,30 +77,64 @@ public class DistributedSchedulingService extends ApplicationMasterService
   private static final Log LOG =
   private static final Log LOG =
       LogFactory.getLog(DistributedSchedulingService.class);
       LogFactory.getLog(DistributedSchedulingService.class);
 
 
-  private final TopKNodeSelector clusterMonitor;
+  private final NodeQueueLoadMonitor nodeMonitor;
 
 
   private final ConcurrentHashMap<String, Set<NodeId>> rackToNode =
   private final ConcurrentHashMap<String, Set<NodeId>> rackToNode =
       new ConcurrentHashMap<>();
       new ConcurrentHashMap<>();
   private final ConcurrentHashMap<String, Set<NodeId>> hostToNode =
   private final ConcurrentHashMap<String, Set<NodeId>> hostToNode =
       new ConcurrentHashMap<>();
       new ConcurrentHashMap<>();
+  private final int k;
 
 
   public DistributedSchedulingService(RMContext rmContext,
   public DistributedSchedulingService(RMContext rmContext,
       YarnScheduler scheduler) {
       YarnScheduler scheduler) {
     super(DistributedSchedulingService.class.getName(), rmContext, scheduler);
     super(DistributedSchedulingService.class.getName(), rmContext, scheduler);
-    int k = rmContext.getYarnConfiguration().getInt(
+    this.k = rmContext.getYarnConfiguration().getInt(
         YarnConfiguration.DIST_SCHEDULING_TOP_K,
         YarnConfiguration.DIST_SCHEDULING_TOP_K,
         YarnConfiguration.DIST_SCHEDULING_TOP_K_DEFAULT);
         YarnConfiguration.DIST_SCHEDULING_TOP_K_DEFAULT);
-    long topKComputationInterval = rmContext.getYarnConfiguration().getLong(
-        YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS,
-        YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPUTE_INT_MS_DEFAULT);
-    TopKNodeSelector.TopKComparator comparator =
-        TopKNodeSelector.TopKComparator.valueOf(
+    long nodeSortInterval = rmContext.getYarnConfiguration().getLong(
+        YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS,
+        YarnConfiguration.
+            NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS_DEFAULT);
+    NodeQueueLoadMonitor.LoadComparator comparator =
+        NodeQueueLoadMonitor.LoadComparator.valueOf(
             rmContext.getYarnConfiguration().get(
             rmContext.getYarnConfiguration().get(
-                YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR,
-                YarnConfiguration.DIST_SCHEDULING_TOP_K_COMPARATOR_DEFAULT));
-    TopKNodeSelector topKSelector =
-        new TopKNodeSelector(k, topKComputationInterval, comparator);
-    this.clusterMonitor = topKSelector;
+                YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR,
+                YarnConfiguration.
+                    NM_CONTAINER_QUEUING_LOAD_COMPARATOR_DEFAULT));
+
+    NodeQueueLoadMonitor topKSelector =
+        new NodeQueueLoadMonitor(nodeSortInterval, comparator);
+
+    float sigma = rmContext.getYarnConfiguration()
+        .getFloat(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV,
+            YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV_DEFAULT);
+
+    int limitMin, limitMax;
+
+    if (comparator == NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH) {
+      limitMin = rmContext.getYarnConfiguration()
+          .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH,
+              YarnConfiguration.
+                  NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH_DEFAULT);
+      limitMax = rmContext.getYarnConfiguration()
+          .getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH,
+              YarnConfiguration.
+                  NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH_DEFAULT);
+    } else {
+      limitMin = rmContext.getYarnConfiguration()
+          .getInt(
+              YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS,
+              YarnConfiguration.
+                  NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS_DEFAULT);
+      limitMax = rmContext.getYarnConfiguration()
+          .getInt(
+              YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS,
+              YarnConfiguration.
+                  NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS_DEFAULT);
+    }
+
+    topKSelector.initThresholdCalculator(sigma, limitMin, limitMax);
+    this.nodeMonitor = topKSelector;
   }
   }
 
 
   @Override
   @Override
@@ -189,7 +224,7 @@ public class DistributedSchedulingService extends ApplicationMasterService
 
 
     // Set nodes to be used for scheduling
     // Set nodes to be used for scheduling
     dsResp.setNodesForScheduling(
     dsResp.setNodesForScheduling(
-        new ArrayList<>(this.clusterMonitor.selectNodes()));
+        this.nodeMonitor.selectLeastLoadedNodes(this.k));
     return dsResp;
     return dsResp;
   }
   }
 
 
@@ -201,7 +236,7 @@ public class DistributedSchedulingService extends ApplicationMasterService
         (DistSchedAllocateResponse.class);
         (DistSchedAllocateResponse.class);
     dsResp.setAllocateResponse(response);
     dsResp.setAllocateResponse(response);
     dsResp.setNodesForScheduling(
     dsResp.setNodesForScheduling(
-        new ArrayList<>(this.clusterMonitor.selectNodes()));
+        this.nodeMonitor.selectLeastLoadedNodes(this.k));
     return dsResp;
     return dsResp;
   }
   }
 
 
@@ -229,67 +264,72 @@ public class DistributedSchedulingService extends ApplicationMasterService
   @Override
   @Override
   public void handle(SchedulerEvent event) {
   public void handle(SchedulerEvent event) {
     switch (event.getType()) {
     switch (event.getType()) {
-      case NODE_ADDED:
-        if (!(event instanceof NodeAddedSchedulerEvent)) {
-          throw new RuntimeException("Unexpected event type: " + event);
-        }
-        NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
-        clusterMonitor.addNode(nodeAddedEvent.getContainerReports(),
-            nodeAddedEvent.getAddedRMNode());
-        addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(),
-            nodeAddedEvent.getAddedRMNode().getNodeID());
-        addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(),
-            nodeAddedEvent.getAddedRMNode().getNodeID());
-        break;
-      case NODE_REMOVED:
-        if (!(event instanceof NodeRemovedSchedulerEvent)) {
-          throw new RuntimeException("Unexpected event type: " + event);
-        }
-        NodeRemovedSchedulerEvent nodeRemovedEvent =
-            (NodeRemovedSchedulerEvent)event;
-        clusterMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode());
-        removeFromMapping(rackToNode,
-            nodeRemovedEvent.getRemovedRMNode().getRackName(),
-            nodeRemovedEvent.getRemovedRMNode().getNodeID());
-        removeFromMapping(hostToNode,
-            nodeRemovedEvent.getRemovedRMNode().getHostName(),
-            nodeRemovedEvent.getRemovedRMNode().getNodeID());
-        break;
-      case NODE_UPDATE:
-        if (!(event instanceof NodeUpdateSchedulerEvent)) {
-          throw new RuntimeException("Unexpected event type: " + event);
-        }
-        NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
-        clusterMonitor.nodeUpdate(nodeUpdatedEvent.getRMNode());
-        break;
-      case NODE_RESOURCE_UPDATE:
-        if (!(event instanceof NodeResourceUpdateSchedulerEvent)) {
-          throw new RuntimeException("Unexpected event type: " + event);
-        }
-        NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent =
-            (NodeResourceUpdateSchedulerEvent)event;
-        clusterMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(),
-            nodeResourceUpdatedEvent.getResourceOption());
-        break;
-
-      // <-- IGNORED EVENTS : START -->
-      case APP_ADDED:
-        break;
-      case APP_REMOVED:
-        break;
-      case APP_ATTEMPT_ADDED:
-        break;
-      case APP_ATTEMPT_REMOVED:
-        break;
-      case CONTAINER_EXPIRED:
-        break;
-      case NODE_LABELS_UPDATE:
-        break;
-      // <-- IGNORED EVENTS : END -->
-      default:
-        LOG.error("Unknown event arrived at DistributedSchedulingService: "
-            + event.toString());
+    case NODE_ADDED:
+      if (!(event instanceof NodeAddedSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event;
+      nodeMonitor.addNode(nodeAddedEvent.getContainerReports(),
+          nodeAddedEvent.getAddedRMNode());
+      addToMapping(rackToNode, nodeAddedEvent.getAddedRMNode().getRackName(),
+          nodeAddedEvent.getAddedRMNode().getNodeID());
+      addToMapping(hostToNode, nodeAddedEvent.getAddedRMNode().getHostName(),
+          nodeAddedEvent.getAddedRMNode().getNodeID());
+      break;
+    case NODE_REMOVED:
+      if (!(event instanceof NodeRemovedSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      NodeRemovedSchedulerEvent nodeRemovedEvent =
+          (NodeRemovedSchedulerEvent) event;
+      nodeMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode());
+      removeFromMapping(rackToNode,
+          nodeRemovedEvent.getRemovedRMNode().getRackName(),
+          nodeRemovedEvent.getRemovedRMNode().getNodeID());
+      removeFromMapping(hostToNode,
+          nodeRemovedEvent.getRemovedRMNode().getHostName(),
+          nodeRemovedEvent.getRemovedRMNode().getNodeID());
+      break;
+    case NODE_UPDATE:
+      if (!(event instanceof NodeUpdateSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)
+          event;
+      nodeMonitor.updateNode(nodeUpdatedEvent.getRMNode());
+      break;
+    case NODE_RESOURCE_UPDATE:
+      if (!(event instanceof NodeResourceUpdateSchedulerEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent =
+          (NodeResourceUpdateSchedulerEvent) event;
+      nodeMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(),
+          nodeResourceUpdatedEvent.getResourceOption());
+      break;
+
+    // <-- IGNORED EVENTS : START -->
+    case APP_ADDED:
+      break;
+    case APP_REMOVED:
+      break;
+    case APP_ATTEMPT_ADDED:
+      break;
+    case APP_ATTEMPT_REMOVED:
+      break;
+    case CONTAINER_EXPIRED:
+      break;
+    case NODE_LABELS_UPDATE:
+      break;
+    // <-- IGNORED EVENTS : END -->
+    default:
+      LOG.error("Unknown event arrived at DistributedSchedulingService: "
+          + event.toString());
     }
     }
+
   }
   }
 
 
+  public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
+    return nodeMonitor.getThresholdCalculator();
+  }
 }
 }

+ 4 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java

@@ -39,6 +39,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessM
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@@ -139,4 +141,6 @@ public interface RMContext {
   void setLeaderElectorService(LeaderElectorService elector);
   void setLeaderElectorService(LeaderElectorService elector);
 
 
   LeaderElectorService getLeaderElectorService();
   LeaderElectorService getLeaderElectorService();
+
+  QueueLimitCalculator getNodeManagerQueueLimitCalculator();
 }
 }

+ 14 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java

@@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessM
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
 import org.apache.hadoop.yarn.server.resourcemanager.security.DelegationTokenRenewer;
@@ -74,6 +76,8 @@ public class RMContextImpl implements RMContext {
   private SystemMetricsPublisher systemMetricsPublisher;
   private SystemMetricsPublisher systemMetricsPublisher;
   private LeaderElectorService elector;
   private LeaderElectorService elector;
 
 
+  private QueueLimitCalculator queueLimitCalculator;
+
   /**
   /**
    * Default constructor. To be used in conjunction with setter methods for
    * Default constructor. To be used in conjunction with setter methods for
    * individual fields.
    * individual fields.
@@ -472,4 +476,14 @@ public class RMContextImpl implements RMContext {
   public void setQueuePlacementManager(PlacementManager placementMgr) {
   public void setQueuePlacementManager(PlacementManager placementMgr) {
     this.activeServiceContext.setQueuePlacementManager(placementMgr);
     this.activeServiceContext.setQueuePlacementManager(placementMgr);
   }
   }
+
+  @Override
+  public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
+    return this.queueLimitCalculator;
+  }
+
+  public void setContainerQueueLimitCalculator(
+      QueueLimitCalculator limitCalculator) {
+    this.queueLimitCalculator = limitCalculator;
+  }
 }
 }

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java

@@ -1154,6 +1154,8 @@ public class ResourceManager extends CompositeService implements Recoverable {
       addService(distSchedulerEventDispatcher);
       addService(distSchedulerEventDispatcher);
       rmDispatcher.register(SchedulerEventType.class,
       rmDispatcher.register(SchedulerEventType.class,
           distSchedulerEventDispatcher);
           distSchedulerEventDispatcher);
+      this.rmContext.setContainerQueueLimitCalculator(
+          distributedSchedulingService.getNodeManagerQueueLimitCalculator());
       return distributedSchedulingService;
       return distributedSchedulingService;
     }
     }
     return new ApplicationMasterService(this.rmContext, scheduler);
     return new ApplicationMasterService(this.rmContext, scheduler);

+ 7 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java

@@ -536,6 +536,13 @@ public class ResourceTrackerService extends AbstractService implements
       }
       }
     }
     }
 
 
+    // 6. Send Container Queuing Limits back to the Node. This will be used by
+    //    the node to truncate the number of Containers queued for execution.
+    if (this.rmContext.getNodeManagerQueueLimitCalculator() != null) {
+      nodeHeartBeatResponse.setContainerQueuingLimit(
+          this.rmContext.getNodeManagerQueueLimitCalculator()
+              .createContainerQueuingLimit());
+    }
     return nodeHeartBeatResponse;
     return nodeHeartBeatResponse;
   }
   }
 
 

+ 97 - 49
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TopKNodeSelector.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/NodeQueueLoadMonitor.java

@@ -31,36 +31,47 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.Comparator;
-import java.util.HashMap;
 import java.util.List;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
 
 
-public class TopKNodeSelector implements ClusterMonitor {
+/**
+ * The NodeQueueLoadMonitor keeps track of load metrics (such as queue length
+ * and total wait time) associated with Container Queues on the Node Manager.
+ * It uses this information to periodically sort the Nodes from least to most
+ * loaded.
+ */
+public class NodeQueueLoadMonitor implements ClusterMonitor {
 
 
-  final static Log LOG = LogFactory.getLog(TopKNodeSelector.class);
+  final static Log LOG = LogFactory.getLog(NodeQueueLoadMonitor.class);
 
 
-  public enum TopKComparator implements Comparator<ClusterNode> {
-    WAIT_TIME,
-    QUEUE_LENGTH;
+  /**
+   * The comparator used to specify the metric against which the load
+   * of two Nodes are compared.
+   */
+  public enum LoadComparator implements Comparator<ClusterNode> {
+    QUEUE_LENGTH,
+    QUEUE_WAIT_TIME;
 
 
     @Override
     @Override
     public int compare(ClusterNode o1, ClusterNode o2) {
     public int compare(ClusterNode o1, ClusterNode o2) {
-      if (getQuant(o1) == getQuant(o2)) {
+      if (getMetric(o1) == getMetric(o2)) {
         return o1.timestamp < o2.timestamp ? +1 : -1;
         return o1.timestamp < o2.timestamp ? +1 : -1;
       }
       }
-      return getQuant(o1) > getQuant(o2) ? +1 : -1;
+      return getMetric(o1) > getMetric(o2) ? +1 : -1;
     }
     }
 
 
-    private int getQuant(ClusterNode c) {
-      return (this == WAIT_TIME) ? c.queueTime : c.waitQueueLength;
+    public int getMetric(ClusterNode c) {
+      return (this == QUEUE_LENGTH) ? c.queueLength : c.queueWaitTime;
     }
     }
   }
   }
 
 
   static class ClusterNode {
   static class ClusterNode {
-    int queueTime = -1;
-    int waitQueueLength = 0;
+    int queueLength = 0;
+    int queueWaitTime = -1;
     double timestamp;
     double timestamp;
     final NodeId nodeId;
     final NodeId nodeId;
 
 
@@ -69,13 +80,13 @@ public class TopKNodeSelector implements ClusterMonitor {
       updateTimestamp();
       updateTimestamp();
     }
     }
 
 
-    public ClusterNode setQueueTime(int queueTime) {
-      this.queueTime = queueTime;
+    public ClusterNode setQueueLength(int qLength) {
+      this.queueLength = qLength;
       return this;
       return this;
     }
     }
 
 
-    public ClusterNode setWaitQueueLength(int queueLength) {
-      this.waitQueueLength = queueLength;
+    public ClusterNode setQueueWaitTime(int wTime) {
+      this.queueWaitTime = wTime;
       return this;
       return this;
     }
     }
 
 
@@ -85,34 +96,37 @@ public class TopKNodeSelector implements ClusterMonitor {
     }
     }
   }
   }
 
 
-  private final int k;
-  private final List<NodeId> topKNodes;
   private final ScheduledExecutorService scheduledExecutor;
   private final ScheduledExecutorService scheduledExecutor;
-  private final HashMap<NodeId, ClusterNode> clusterNodes = new HashMap<>();
-  private final Comparator<ClusterNode> comparator;
+
+  private final List<NodeId> sortedNodes;
+  private final Map<NodeId, ClusterNode> clusterNodes =
+      new ConcurrentHashMap<>();
+  private final LoadComparator comparator;
+  private QueueLimitCalculator thresholdCalculator;
 
 
   Runnable computeTask = new Runnable() {
   Runnable computeTask = new Runnable() {
     @Override
     @Override
     public void run() {
     public void run() {
-      synchronized (topKNodes) {
-        topKNodes.clear();
-        topKNodes.addAll(computeTopKNodes());
+      synchronized (sortedNodes) {
+        sortedNodes.clear();
+        sortedNodes.addAll(sortNodes());
+        if (thresholdCalculator != null) {
+          thresholdCalculator.update();
+        }
       }
       }
     }
     }
   };
   };
 
 
   @VisibleForTesting
   @VisibleForTesting
-  TopKNodeSelector(int k, TopKComparator comparator) {
-    this.k = k;
-    this.topKNodes = new ArrayList<>();
+  NodeQueueLoadMonitor(LoadComparator comparator) {
+    this.sortedNodes = new ArrayList<>();
     this.comparator = comparator;
     this.comparator = comparator;
     this.scheduledExecutor = null;
     this.scheduledExecutor = null;
   }
   }
 
 
-  public TopKNodeSelector(int k, long nodeComputationInterval,
-      TopKComparator comparator) {
-    this.k = k;
-    this.topKNodes = new ArrayList<>();
+  public NodeQueueLoadMonitor(long nodeComputationInterval,
+      LoadComparator comparator) {
+    this.sortedNodes = new ArrayList<>();
     this.scheduledExecutor = Executors.newScheduledThreadPool(1);
     this.scheduledExecutor = Executors.newScheduledThreadPool(1);
     this.comparator = comparator;
     this.comparator = comparator;
     this.scheduledExecutor.scheduleAtFixedRate(computeTask,
     this.scheduledExecutor.scheduleAtFixedRate(computeTask,
@@ -120,12 +134,32 @@ public class TopKNodeSelector implements ClusterMonitor {
         TimeUnit.MILLISECONDS);
         TimeUnit.MILLISECONDS);
   }
   }
 
 
+  List<NodeId> getSortedNodes() {
+    return sortedNodes;
+  }
+
+  public QueueLimitCalculator getThresholdCalculator() {
+    return thresholdCalculator;
+  }
+
+  Map<NodeId, ClusterNode> getClusterNodes() {
+    return clusterNodes;
+  }
+
+  Comparator<ClusterNode> getComparator() {
+    return comparator;
+  }
+
+  public void initThresholdCalculator(float sigma, int limitMin, int limitMax) {
+    this.thresholdCalculator =
+        new QueueLimitCalculator(this, sigma, limitMin, limitMax);
+  }
 
 
   @Override
   @Override
   public void addNode(List<NMContainerStatus> containerStatuses, RMNode
   public void addNode(List<NMContainerStatus> containerStatuses, RMNode
       rmNode) {
       rmNode) {
     LOG.debug("Node added event from: " + rmNode.getNode().getName());
     LOG.debug("Node added event from: " + rmNode.getNode().getName());
-    // Ignoring this currently : atleast one NODE_UPDATE heartbeat is
+    // Ignoring this currently : at least one NODE_UPDATE heartbeat is
     // required to ensure node eligibility.
     // required to ensure node eligibility.
   }
   }
 
 
@@ -143,24 +177,24 @@ public class TopKNodeSelector implements ClusterMonitor {
   }
   }
 
 
   @Override
   @Override
-  public void nodeUpdate(RMNode rmNode) {
+  public void updateNode(RMNode rmNode) {
     LOG.debug("Node update event from: " + rmNode.getNodeID());
     LOG.debug("Node update event from: " + rmNode.getNodeID());
     QueuedContainersStatus queuedContainersStatus =
     QueuedContainersStatus queuedContainersStatus =
         rmNode.getQueuedContainersStatus();
         rmNode.getQueuedContainersStatus();
     int estimatedQueueWaitTime =
     int estimatedQueueWaitTime =
         queuedContainersStatus.getEstimatedQueueWaitTime();
         queuedContainersStatus.getEstimatedQueueWaitTime();
     int waitQueueLength = queuedContainersStatus.getWaitQueueLength();
     int waitQueueLength = queuedContainersStatus.getWaitQueueLength();
-    // Add nodes to clusterNodes.. if estimatedQueueTime is -1, Ignore node
-    // UNLESS comparator is based on queue length, in which case, we should add
+    // Add nodes to clusterNodes. If estimatedQueueTime is -1, ignore node
+    // UNLESS comparator is based on queue length.
     synchronized (this.clusterNodes) {
     synchronized (this.clusterNodes) {
       ClusterNode currentNode = this.clusterNodes.get(rmNode.getNodeID());
       ClusterNode currentNode = this.clusterNodes.get(rmNode.getNodeID());
       if (currentNode == null) {
       if (currentNode == null) {
         if (estimatedQueueWaitTime != -1
         if (estimatedQueueWaitTime != -1
-            || comparator == TopKComparator.QUEUE_LENGTH) {
+            || comparator == LoadComparator.QUEUE_LENGTH) {
           this.clusterNodes.put(rmNode.getNodeID(),
           this.clusterNodes.put(rmNode.getNodeID(),
               new ClusterNode(rmNode.getNodeID())
               new ClusterNode(rmNode.getNodeID())
-                  .setQueueTime(estimatedQueueWaitTime)
-                  .setWaitQueueLength(waitQueueLength));
+                  .setQueueWaitTime(estimatedQueueWaitTime)
+                  .setQueueLength(waitQueueLength));
           LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "]" +
           LOG.info("Inserting ClusterNode [" + rmNode.getNodeID() + "]" +
               "with queue wait time [" + estimatedQueueWaitTime + "] and " +
               "with queue wait time [" + estimatedQueueWaitTime + "] and " +
               "wait queue length [" + waitQueueLength + "]");
               "wait queue length [" + waitQueueLength + "]");
@@ -171,10 +205,10 @@ public class TopKNodeSelector implements ClusterMonitor {
         }
         }
       } else {
       } else {
         if (estimatedQueueWaitTime != -1
         if (estimatedQueueWaitTime != -1
-            || comparator == TopKComparator.QUEUE_LENGTH) {
+            || comparator == LoadComparator.QUEUE_LENGTH) {
           currentNode
           currentNode
-              .setQueueTime(estimatedQueueWaitTime)
-              .setWaitQueueLength(waitQueueLength)
+              .setQueueWaitTime(estimatedQueueWaitTime)
+              .setQueueLength(waitQueueLength)
               .updateTimestamp();
               .updateTimestamp();
           LOG.info("Updating ClusterNode [" + rmNode.getNodeID() + "]" +
           LOG.info("Updating ClusterNode [" + rmNode.getNodeID() + "]" +
               "with queue wait time [" + estimatedQueueWaitTime + "] and " +
               "with queue wait time [" + estimatedQueueWaitTime + "] and " +
@@ -182,8 +216,8 @@ public class TopKNodeSelector implements ClusterMonitor {
         } else {
         } else {
           this.clusterNodes.remove(rmNode.getNodeID());
           this.clusterNodes.remove(rmNode.getNodeID());
           LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "]" +
           LOG.info("Deleting ClusterNode [" + rmNode.getNodeID() + "]" +
-              "with queue wait time [" + currentNode.queueTime + "] and " +
-              "wait queue length [" + currentNode.waitQueueLength + "]");
+              "with queue wait time [" + currentNode.queueWaitTime + "] and " +
+              "wait queue length [" + currentNode.queueLength + "]");
         }
         }
       }
       }
     }
     }
@@ -192,25 +226,38 @@ public class TopKNodeSelector implements ClusterMonitor {
   @Override
   @Override
   public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) {
   public void updateNodeResource(RMNode rmNode, ResourceOption resourceOption) {
     LOG.debug("Node resource update event from: " + rmNode.getNodeID());
     LOG.debug("Node resource update event from: " + rmNode.getNodeID());
-    // Ignoring this currently...
+    // Ignoring this currently.
   }
   }
 
 
+  /**
+   * Returns all Node Ids as ordered list from Least to Most Loaded.
+   * @return ordered list of nodes
+   */
   public List<NodeId> selectNodes() {
   public List<NodeId> selectNodes() {
-    synchronized (this.topKNodes) {
-      return this.k < this.topKNodes.size() ?
-          new ArrayList<>(this.topKNodes).subList(0, this.k) :
-          new ArrayList<>(this.topKNodes);
+    return selectLeastLoadedNodes(-1);
+  }
+
+  /**
+   * Returns 'K' of the least Loaded Node Ids as ordered list.
+   * @param k max number of nodes to return
+   * @return ordered list of nodes
+   */
+  public List<NodeId> selectLeastLoadedNodes(int k) {
+    synchronized (this.sortedNodes) {
+      return ((k < this.sortedNodes.size()) && (k >= 0)) ?
+          new ArrayList<>(this.sortedNodes).subList(0, k) :
+          new ArrayList<>(this.sortedNodes);
     }
     }
   }
   }
 
 
-  private List<NodeId> computeTopKNodes() {
+  private List<NodeId> sortNodes() {
     synchronized (this.clusterNodes) {
     synchronized (this.clusterNodes) {
       ArrayList aList = new ArrayList<>(this.clusterNodes.values());
       ArrayList aList = new ArrayList<>(this.clusterNodes.values());
       List<NodeId> retList = new ArrayList<>();
       List<NodeId> retList = new ArrayList<>();
       Object[] nodes = aList.toArray();
       Object[] nodes = aList.toArray();
       // Collections.sort would do something similar by calling Arrays.sort
       // Collections.sort would do something similar by calling Arrays.sort
       // internally but would finally iterate through the input list (aList)
       // internally but would finally iterate through the input list (aList)
-      // to reset the value of each element.. Since we don't really care about
+      // to reset the value of each element. Since we don't really care about
       // 'aList', we can use the iteration to create the list of nodeIds which
       // 'aList', we can use the iteration to create the list of nodeIds which
       // is what we ultimately care about.
       // is what we ultimately care about.
       Arrays.sort(nodes, (Comparator)comparator);
       Arrays.sort(nodes, (Comparator)comparator);
@@ -220,4 +267,5 @@ public class TopKNodeSelector implements ClusterMonitor {
       return retList;
       return retList;
     }
     }
   }
   }
+
 }
 }

+ 125 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/QueueLimitCalculator.java

@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor.ClusterNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor.LoadComparator;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * This class interacts with the NodeQueueLoadMonitor to keep track of the
+ * mean and standard deviation of the configured metrics (queue length or queue
+ * wait time) used to characterize the queue load of a specific node.
+ * The NodeQueueLoadMonitor triggers an update (by calling the
+ * <code>update()</code> method) every time it performs a re-ordering of
+ * all nodes.
+ */
+public class QueueLimitCalculator {
+
+  class Stats {
+    private final AtomicInteger mean = new AtomicInteger(0);
+    private final AtomicInteger stdev = new AtomicInteger(0);
+
+    /**
+     * Not thread safe. Caller should synchronize on sorted nodes list.
+     */
+    void update() {
+      List<NodeId> sortedNodes = nodeSelector.getSortedNodes();
+      if (sortedNodes.size() > 0) {
+        // Calculate mean
+        int sum = 0;
+        for (NodeId n : sortedNodes) {
+          sum += getMetric(getNode(n));
+        }
+        mean.set(sum / sortedNodes.size());
+
+        // Calculate stdev
+        int sqrSumMean = 0;
+        for (NodeId n : sortedNodes) {
+          int val = getMetric(getNode(n));
+          sqrSumMean += Math.pow(val - mean.get(), 2);
+        }
+        stdev.set(
+            (int) Math.round(Math.sqrt(
+                sqrSumMean / (float) sortedNodes.size())));
+      }
+    }
+
+    private ClusterNode getNode(NodeId nId) {
+      return nodeSelector.getClusterNodes().get(nId);
+    }
+
+    private int getMetric(ClusterNode cn) {
+      return (cn != null) ? ((LoadComparator)nodeSelector.getComparator())
+              .getMetric(cn) : 0;
+    }
+
+    public int getMean() {
+      return mean.get();
+    }
+
+    public int getStdev() {
+      return stdev.get();
+    }
+  }
+
+  private final NodeQueueLoadMonitor nodeSelector;
+  private final float sigma;
+  private final int rangeMin;
+  private final int rangeMax;
+  private final Stats stats = new Stats();
+
+  QueueLimitCalculator(NodeQueueLoadMonitor selector, float sigma,
+      int rangeMin, int rangeMax) {
+    this.nodeSelector = selector;
+    this.sigma = sigma;
+    this.rangeMax = rangeMax;
+    this.rangeMin = rangeMin;
+  }
+
+  private int determineThreshold() {
+    return (int) (stats.getMean() + sigma * stats.getStdev());
+  }
+
+  void update() {
+    this.stats.update();
+  }
+
+  private int getThreshold() {
+    int thres = determineThreshold();
+    return Math.min(rangeMax, Math.max(rangeMin, thres));
+  }
+
+  public ContainerQueuingLimit createContainerQueuingLimit() {
+    ContainerQueuingLimit containerQueuingLimit =
+        ContainerQueuingLimit.newInstance();
+    if (nodeSelector.getComparator() == LoadComparator.QUEUE_WAIT_TIME) {
+      containerQueuingLimit.setMaxQueueWaitTimeInMs(getThreshold());
+      containerQueuingLimit.setMaxQueueLength(-1);
+    } else {
+      containerQueuingLimit.setMaxQueueWaitTimeInMs(-1);
+      containerQueuingLimit.setMaxQueueLength(getThreshold());
+    }
+    return containerQueuingLimit;
+  }
+}

+ 64 - 16
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestTopKNodeSelector.java → hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/distributed/TestNodeQueueLoadMonitor.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed;
 
 
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
 import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.server.api.records.QueuedContainersStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.junit.Assert;
 import org.junit.Assert;
@@ -27,7 +28,10 @@ import org.mockito.Mockito;
 
 
 import java.util.List;
 import java.util.List;
 
 
-public class TestTopKNodeSelector {
+/**
+ * Unit tests for NodeQueueLoadMonitor.
+ */
+public class TestNodeQueueLoadMonitor {
 
 
   static class FakeNodeId extends NodeId {
   static class FakeNodeId extends NodeId {
     final String host;
     final String host;
@@ -62,12 +66,12 @@ public class TestTopKNodeSelector {
   }
   }
 
 
   @Test
   @Test
-  public void testQueueTimeSort() {
-    TopKNodeSelector selector = new TopKNodeSelector(5,
-        TopKNodeSelector.TopKComparator.WAIT_TIME);
-    selector.nodeUpdate(createRMNode("h1", 1, 15, 10));
-    selector.nodeUpdate(createRMNode("h2", 2, 5, 10));
-    selector.nodeUpdate(createRMNode("h3", 3, 10, 10));
+  public void testWaitTimeSort() {
+    NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+        NodeQueueLoadMonitor.LoadComparator.QUEUE_WAIT_TIME);
+    selector.updateNode(createRMNode("h1", 1, 15, 10));
+    selector.updateNode(createRMNode("h2", 2, 5, 10));
+    selector.updateNode(createRMNode("h3", 3, 10, 10));
     selector.computeTask.run();
     selector.computeTask.run();
     List<NodeId> nodeIds = selector.selectNodes();
     List<NodeId> nodeIds = selector.selectNodes();
     System.out.println("1-> " + nodeIds);
     System.out.println("1-> " + nodeIds);
@@ -76,7 +80,7 @@ public class TestTopKNodeSelector {
     Assert.assertEquals("h1:1", nodeIds.get(2).toString());
     Assert.assertEquals("h1:1", nodeIds.get(2).toString());
 
 
     // Now update node3
     // Now update node3
-    selector.nodeUpdate(createRMNode("h3", 3, 2, 10));
+    selector.updateNode(createRMNode("h3", 3, 2, 10));
     selector.computeTask.run();
     selector.computeTask.run();
     nodeIds = selector.selectNodes();
     nodeIds = selector.selectNodes();
     System.out.println("2-> "+ nodeIds);
     System.out.println("2-> "+ nodeIds);
@@ -85,7 +89,7 @@ public class TestTopKNodeSelector {
     Assert.assertEquals("h1:1", nodeIds.get(2).toString());
     Assert.assertEquals("h1:1", nodeIds.get(2).toString());
 
 
     // Now send update with -1 wait time
     // Now send update with -1 wait time
-    selector.nodeUpdate(createRMNode("h4", 4, -1, 10));
+    selector.updateNode(createRMNode("h4", 4, -1, 10));
     selector.computeTask.run();
     selector.computeTask.run();
     nodeIds = selector.selectNodes();
     nodeIds = selector.selectNodes();
     System.out.println("3-> "+ nodeIds);
     System.out.println("3-> "+ nodeIds);
@@ -97,11 +101,11 @@ public class TestTopKNodeSelector {
 
 
   @Test
   @Test
   public void testQueueLengthSort() {
   public void testQueueLengthSort() {
-    TopKNodeSelector selector = new TopKNodeSelector(5,
-        TopKNodeSelector.TopKComparator.QUEUE_LENGTH);
-    selector.nodeUpdate(createRMNode("h1", 1, -1, 15));
-    selector.nodeUpdate(createRMNode("h2", 2, -1, 5));
-    selector.nodeUpdate(createRMNode("h3", 3, -1, 10));
+    NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+        NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+    selector.updateNode(createRMNode("h1", 1, -1, 15));
+    selector.updateNode(createRMNode("h2", 2, -1, 5));
+    selector.updateNode(createRMNode("h3", 3, -1, 10));
     selector.computeTask.run();
     selector.computeTask.run();
     List<NodeId> nodeIds = selector.selectNodes();
     List<NodeId> nodeIds = selector.selectNodes();
     System.out.println("1-> " + nodeIds);
     System.out.println("1-> " + nodeIds);
@@ -110,7 +114,7 @@ public class TestTopKNodeSelector {
     Assert.assertEquals("h1:1", nodeIds.get(2).toString());
     Assert.assertEquals("h1:1", nodeIds.get(2).toString());
 
 
     // Now update node3
     // Now update node3
-    selector.nodeUpdate(createRMNode("h3", 3, -1, 2));
+    selector.updateNode(createRMNode("h3", 3, -1, 2));
     selector.computeTask.run();
     selector.computeTask.run();
     nodeIds = selector.selectNodes();
     nodeIds = selector.selectNodes();
     System.out.println("2-> "+ nodeIds);
     System.out.println("2-> "+ nodeIds);
@@ -119,7 +123,7 @@ public class TestTopKNodeSelector {
     Assert.assertEquals("h1:1", nodeIds.get(2).toString());
     Assert.assertEquals("h1:1", nodeIds.get(2).toString());
 
 
     // Now send update with -1 wait time but valid length
     // Now send update with -1 wait time but valid length
-    selector.nodeUpdate(createRMNode("h4", 4, -1, 20));
+    selector.updateNode(createRMNode("h4", 4, -1, 20));
     selector.computeTask.run();
     selector.computeTask.run();
     nodeIds = selector.selectNodes();
     nodeIds = selector.selectNodes();
     System.out.println("3-> "+ nodeIds);
     System.out.println("3-> "+ nodeIds);
@@ -130,6 +134,50 @@ public class TestTopKNodeSelector {
     Assert.assertEquals("h4:4", nodeIds.get(3).toString());
     Assert.assertEquals("h4:4", nodeIds.get(3).toString());
   }
   }
 
 
+  @Test
+  public void testContainerQueuingLimit() {
+    NodeQueueLoadMonitor selector = new NodeQueueLoadMonitor(
+        NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH);
+    selector.updateNode(createRMNode("h1", 1, -1, 15));
+    selector.updateNode(createRMNode("h2", 2, -1, 5));
+    selector.updateNode(createRMNode("h3", 3, -1, 10));
+
+    // Test Mean Calculation
+    selector.initThresholdCalculator(0, 6, 100);
+    QueueLimitCalculator calculator = selector.getThresholdCalculator();
+    ContainerQueuingLimit containerQueuingLimit = calculator
+        .createContainerQueuingLimit();
+    Assert.assertEquals(6, containerQueuingLimit.getMaxQueueLength());
+    Assert.assertEquals(-1, containerQueuingLimit.getMaxQueueWaitTimeInMs());
+    selector.computeTask.run();
+    containerQueuingLimit = calculator.createContainerQueuingLimit();
+    Assert.assertEquals(10, containerQueuingLimit.getMaxQueueLength());
+    Assert.assertEquals(-1, containerQueuingLimit.getMaxQueueWaitTimeInMs());
+
+    // Test Limits do not exceed specified max
+    selector.updateNode(createRMNode("h1", 1, -1, 110));
+    selector.updateNode(createRMNode("h2", 2, -1, 120));
+    selector.updateNode(createRMNode("h3", 3, -1, 130));
+    selector.updateNode(createRMNode("h4", 4, -1, 140));
+    selector.updateNode(createRMNode("h5", 5, -1, 150));
+    selector.updateNode(createRMNode("h6", 6, -1, 160));
+    selector.computeTask.run();
+    containerQueuingLimit = calculator.createContainerQueuingLimit();
+    Assert.assertEquals(100, containerQueuingLimit.getMaxQueueLength());
+
+    // Test Limits do not go below specified min
+    selector.updateNode(createRMNode("h1", 1, -1, 1));
+    selector.updateNode(createRMNode("h2", 2, -1, 2));
+    selector.updateNode(createRMNode("h3", 3, -1, 3));
+    selector.updateNode(createRMNode("h4", 4, -1, 4));
+    selector.updateNode(createRMNode("h5", 5, -1, 5));
+    selector.updateNode(createRMNode("h6", 6, -1, 6));
+    selector.computeTask.run();
+    containerQueuingLimit = calculator.createContainerQueuingLimit();
+    Assert.assertEquals(6, containerQueuingLimit.getMaxQueueLength());
+
+  }
+
   private RMNode createRMNode(String host, int port,
   private RMNode createRMNode(String host, int port,
       int waitTime, int queueLength) {
       int waitTime, int queueLength) {
     RMNode node1 = Mockito.mock(RMNode.class);
     RMNode node1 = Mockito.mock(RMNode.class);