Przeglądaj źródła

YARN-999. In case of long running tasks, reduce node resource should balloon out resource quickly by calling preemption API and suspending running task. Contributed by Inigo Goiri.

Giovanni Matteo Fumarola 6 lat temu
rodzic
commit
cfec455c45
15 zmienionych plików z 1295 dodań i 95 usunięć
  1. 6 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java
  2. 5 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
  3. 11 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
  4. 17 5
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
  5. 75 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
  6. 66 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
  7. 26 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
  8. 9 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
  9. 93 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java
  10. 723 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerOvercommit.java
  11. 120 85
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
  12. 52 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerOvercommit.java
  13. 46 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerOvercommit.java
  14. 23 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/hadoop-metrics2-resourcemanager.properties
  15. 23 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/hadoop-metrics2.properties

+ 6 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/ResourceOption.java

@@ -55,12 +55,16 @@ public abstract class ResourceOption {
    * Get timeout for tolerant of resource over-commitment
    * Note: negative value means no timeout so that allocated containers will
    * keep running until the end even under resource over-commitment cases.
-   * @return <em>overCommitTimeout</em> of the ResourceOption
+   * @return <em>overCommitTimeout</em> of the ResourceOption in milliseconds.
    */
   @Private
   @Evolving
   public abstract int getOverCommitTimeout();
-  
+
+  /**
+   * Set the overcommit timeout.
+   * @param overCommitTimeout Timeout in ms. Negative means no timeout.
+   */
   @Private
   @Evolving
   protected abstract void setOverCommitTimeout(int overCommitTimeout);

+ 5 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java

@@ -675,6 +675,11 @@ public class ResourceTrackerService extends AbstractService implements
     if (capability != null) {
       nodeHeartBeatResponse.setResource(capability);
     }
+    // Check if we got an event (AdminService) that updated the resources
+    if (rmNode.isUpdatedCapability()) {
+      nodeHeartBeatResponse.setResource(rmNode.getTotalCapability());
+      rmNode.resetUpdatedCapability();
+    }
 
     // 7. Send Container Queuing Limits back to the Node. This will be used by
     // the node to truncate the number of Containers queued for execution.

+ 11 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java

@@ -104,6 +104,17 @@ public interface RMNode {
    */
   public Resource getTotalCapability();
 
+  /**
+   * If the total available resources has been updated.
+   * @return If the capability has been updated.
+   */
+  boolean isUpdatedCapability();
+
+  /**
+   * Mark that the updated event has been processed.
+   */
+  void resetUpdatedCapability();
+
   /**
    * the aggregated resource utilization of the containers.
    * @return the aggregated resource utilization of the containers.

+ 17 - 5
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java

@@ -126,6 +126,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   /* Snapshot of total resources before receiving decommissioning command */
   private volatile Resource originalTotalCapability;
   private volatile Resource totalCapability;
+  private volatile boolean updatedCapability = false;
   private final Node node;
 
   private String healthReport;
@@ -456,6 +457,16 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
     return this.totalCapability;
   }
 
+  @Override
+  public boolean isUpdatedCapability() {
+    return this.updatedCapability;
+  }
+
+  @Override
+  public void resetUpdatedCapability() {
+    this.updatedCapability = false;
+  }
+
   @Override
   public String getRackName() {
     return node.getNetworkLocation();
@@ -814,11 +825,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
         .handle(new RMAppRunningOnNodeEvent(appId, nodeId));
   }
   
-  private static void updateNodeResourceFromEvent(RMNodeImpl rmNode, 
-     RMNodeResourceUpdateEvent event){
-      ResourceOption resourceOption = event.getResourceOption();
-      // Set resource on RMNode
-      rmNode.totalCapability = resourceOption.getResource();
+  private static void updateNodeResourceFromEvent(RMNodeImpl rmNode,
+      RMNodeResourceUpdateEvent event){
+    ResourceOption resourceOption = event.getResourceOption();
+    // Set resource on RMNode
+    rmNode.totalCapability = resourceOption.getResource();
+    rmNode.updatedCapability = true;
   }
 
   private static NodeHealthStatus updateRMNodeFromStatusEvents(

+ 75 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java

@@ -92,13 +92,16 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
 import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -116,6 +119,8 @@ public abstract class AbstractYarnScheduler
   private static final Logger LOG =
       LoggerFactory.getLogger(AbstractYarnScheduler.class);
 
+  private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
+
   protected final ClusterNodeTracker<N> nodeTracker =
       new ClusterNodeTracker<>();
 
@@ -809,6 +814,7 @@ public abstract class AbstractYarnScheduler
     try {
       SchedulerNode node = getSchedulerNode(nm.getNodeID());
       Resource newResource = resourceOption.getResource();
+      final int timeout = resourceOption.getOverCommitTimeout();
       Resource oldResource = node.getTotalResource();
       if (!oldResource.equals(newResource)) {
         // Notify NodeLabelsManager about this change
@@ -816,13 +822,15 @@ public abstract class AbstractYarnScheduler
             newResource);
 
         // Log resource change
-        LOG.info("Update resource on node: " + node.getNodeName() + " from: "
-            + oldResource + ", to: " + newResource);
+        LOG.info("Update resource on node: {} from: {}, to: {} in {} ms",
+            node.getNodeName(), oldResource, newResource, timeout);
 
         nodeTracker.removeNode(nm.getNodeID());
 
         // update resource to node
         node.updateTotalResource(newResource);
+        node.setOvercommitTimeOut(timeout);
+        signalContainersIfOvercommitted(node, timeout == 0);
 
         nodeTracker.addNode((N) node);
       } else{
@@ -1165,6 +1173,10 @@ public abstract class AbstractYarnScheduler
       updateNodeResourceUtilization(nm, schedulerNode);
     }
 
+    if (schedulerNode != null) {
+      signalContainersIfOvercommitted(schedulerNode, true);
+    }
+
     // Now node data structures are up-to-date and ready for scheduling.
     if(LOG.isDebugEnabled()) {
       LOG.debug(
@@ -1174,6 +1186,67 @@ public abstract class AbstractYarnScheduler
     }
   }
 
+  /**
+   * Check if the node is overcommitted and needs to remove containers. If
+   * it is overcommitted, it will kill or preempt (notify the AM to stop them)
+   * containers. It also takes into account the overcommit timeout. It only
+   * notifies the application to preempt a container if the timeout hasn't
+   * passed. If the timeout has passed, it tries to kill the containers. If
+   * there is no timeout, it doesn't do anything and just prevents new
+   * allocations.
+   *
+   * This action is taken when the change of resources happens (to preempt
+   * containers or killing them if specified) or when the node heart beats
+   * (for killing only).
+   *
+   * @param schedulerNode The node to check whether is overcommitted.
+   * @param kill If the container should be killed or just notify the AM.
+   */
+  private void signalContainersIfOvercommitted(
+      SchedulerNode schedulerNode, boolean kill) {
+
+    // If there is no time out, we don't do anything
+    if (!schedulerNode.isOvercommitTimeOutSet()) {
+      return;
+    }
+
+    SchedulerEventType eventType =
+        SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION;
+    if (kill) {
+      eventType = SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE;
+
+      // If it hasn't timed out yet, don't kill
+      if (!schedulerNode.isOvercommitTimedOut()) {
+        return;
+      }
+    }
+
+    // Check if the node is overcommitted (negative resources)
+    ResourceCalculator rc = getResourceCalculator();
+    Resource unallocated = Resource.newInstance(
+        schedulerNode.getUnallocatedResource());
+    if (Resources.fitsIn(rc, ZERO_RESOURCE, unallocated)) {
+      return;
+    }
+
+    LOG.info("{} is overcommitted ({}), preempt/kill containers",
+        schedulerNode.getNodeID(), unallocated);
+    for (RMContainer container : schedulerNode.getContainersToKill()) {
+      LOG.info("Send {} to {} to free up {}", eventType,
+          container.getContainerId(), container.getAllocatedResource());
+      ApplicationAttemptId appId = container.getApplicationAttemptId();
+      ContainerPreemptEvent event =
+          new ContainerPreemptEvent(appId, container, eventType);
+      this.rmContext.getDispatcher().getEventHandler().handle(event);
+      Resources.addTo(unallocated, container.getAllocatedResource());
+
+      if (Resources.fitsIn(rc, ZERO_RESOURCE, unallocated)) {
+        LOG.debug("Enough unallocated resources {}", unallocated);
+        break;
+      }
+    }
+  }
+
   @Override
   public Resource getNormalizedResource(Resource requestedResource,
                                         Resource maxResourceCapability) {

+ 66 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.HashMap;
 import java.util.List;
@@ -26,6 +27,7 @@ import java.util.Map;
 import java.util.Set;
 
 import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang3.builder.CompareToBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -70,6 +72,8 @@ public abstract class SchedulerNode {
       ResourceUtilization.newInstance(0, 0, 0f);
   private volatile ResourceUtilization nodeUtilization =
       ResourceUtilization.newInstance(0, 0, 0f);
+  /** Time stamp for overcommitted resources to time out. */
+  private long overcommitTimeout = -1;
 
   /* set of containers that are allocated containers */
   private final Map<ContainerId, ContainerInfo> launchedContainers =
@@ -119,6 +123,38 @@ public abstract class SchedulerNode {
         this.allocatedResource);
   }
 
+  /**
+   * Set the timeout for the node to stop overcommitting the resources. After
+   * this time the scheduler will start killing containers until the resources
+   * are not overcommitted anymore. This may reset a previous timeout.
+   * @param timeOut Time out in milliseconds.
+   */
+  public synchronized void setOvercommitTimeOut(long timeOut) {
+    if (timeOut >= 0) {
+      if (this.overcommitTimeout != -1) {
+        LOG.debug("The overcommit timeout for {} was already set to {}",
+            getNodeID(), this.overcommitTimeout);
+      }
+      this.overcommitTimeout = Time.now() + timeOut;
+    }
+  }
+
+  /**
+   * Check if the time out has passed.
+   * @return If the node is overcommitted.
+   */
+  public synchronized boolean isOvercommitTimedOut() {
+    return this.overcommitTimeout >= 0 && Time.now() >= this.overcommitTimeout;
+  }
+
+  /**
+   * Check if the node has a time out for overcommit resources.
+   * @return If the node has a time out for overcommit resources.
+   */
+  public synchronized boolean isOvercommitTimeOutSet() {
+    return this.overcommitTimeout >= 0;
+  }
+
   /**
    * Get the ID of the node which contains both its hostname and port.
    * @return The ID of the node.
@@ -372,6 +408,36 @@ public abstract class SchedulerNode {
     return result;
   }
 
+  /**
+   * Get the containers running on the node ordered by which to kill first. It
+   * tries to kill AMs last, then GUARANTEED containers, and it kills
+   * OPPORTUNISTIC first. If the same time, it uses the creation time.
+   * @return A copy of the running containers ordered by which to kill first.
+   */
+  public List<RMContainer> getContainersToKill() {
+    List<RMContainer> result = getLaunchedContainers();
+    Collections.sort(result, (c1, c2) -> {
+      return new CompareToBuilder()
+          .append(c1.isAMContainer(), c2.isAMContainer())
+          .append(c2.getExecutionType(), c1.getExecutionType()) // reversed
+          .append(c2.getCreationTime(), c1.getCreationTime()) // reversed
+          .toComparison();
+    });
+    return result;
+  }
+
+  /**
+   * Get the launched containers in the node.
+   * @return List of launched containers.
+   */
+  protected synchronized List<RMContainer> getLaunchedContainers() {
+    List<RMContainer> result = new ArrayList<>();
+    for (ContainerInfo info : launchedContainers.values()) {
+      result.add(info.container);
+    }
+    return result;
+  }
+
   /**
    * Get the container for the specified container ID.
    * @param containerId The container ID

+ 26 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java

@@ -87,6 +87,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptA
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
@@ -1288,8 +1289,32 @@ public class FairScheduler extends
               SchedulerUtils.EXPIRED_CONTAINER),
           RMContainerEventType.EXPIRE);
       break;
+    case MARK_CONTAINER_FOR_PREEMPTION:
+      if (!(event instanceof ContainerPreemptEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      ContainerPreemptEvent preemptContainerEvent =
+          (ContainerPreemptEvent)event;
+      ApplicationAttemptId appId = preemptContainerEvent.getAppId();
+      RMContainer preemptedContainer = preemptContainerEvent.getContainer();
+      FSAppAttempt app = getApplicationAttempt(appId);
+      app.trackContainerForPreemption(preemptedContainer);
+      break;
+    case MARK_CONTAINER_FOR_KILLABLE:
+      if (!(event instanceof ContainerPreemptEvent)) {
+        throw new RuntimeException("Unexpected event type: " + event);
+      }
+      ContainerPreemptEvent containerKillableEvent =
+          (ContainerPreemptEvent)event;
+      RMContainer killableContainer = containerKillableEvent.getContainer();
+      completedContainer(killableContainer,
+          SchedulerUtils.createPreemptedContainerStatus(
+              killableContainer.getContainerId(),
+              SchedulerUtils.PREEMPTED_CONTAINER),
+          RMContainerEventType.KILL);
+      break;
     default:
-      LOG.error("Unknown event arrived at FairScheduler: " + event.toString());
+      LOG.error("Unknown event arrived at FairScheduler: {}", event);
     }
   }
 

+ 9 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java

@@ -190,6 +190,15 @@ public class MockNodes {
       return this.perNode;
     }
 
+    @Override
+    public boolean isUpdatedCapability() {
+      return false;
+    }
+
+    @Override
+    public void resetUpdatedCapability() {
+    }
+
     @Override
     public String getRackName() {
       return this.rackName;

+ 93 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestAbstractYarnScheduler.java

@@ -18,12 +18,14 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -35,6 +37,7 @@ import com.google.common.collect.Sets;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.*;
@@ -1018,4 +1021,94 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
       System.out.println("Stopping testContainerRecoveredByNode");
     }
   }
+
+  /**
+   * Test the order we get the containers to kill. It should respect the order
+   * described in {@link SchedulerNode#getContainersToKill()}.
+   */
+  @Test
+  public void testGetRunningContainersToKill() {
+    final SchedulerNode node = new MockSchedulerNode();
+    assertEquals(Collections.emptyList(), node.getContainersToKill());
+
+    // AM0
+    RMContainer am0 = newMockRMContainer(
+        true, ExecutionType.GUARANTEED, "AM0");
+    node.allocateContainer(am0);
+    assertEquals(Arrays.asList(am0), node.getContainersToKill());
+
+    // OPPORTUNISTIC0, AM0
+    RMContainer opp0 = newMockRMContainer(
+        false, ExecutionType.OPPORTUNISTIC, "OPPORTUNISTIC0");
+    node.allocateContainer(opp0);
+    assertEquals(Arrays.asList(opp0, am0), node.getContainersToKill());
+
+    // OPPORTUNISTIC0, GUARANTEED0, AM0
+    RMContainer regular0 = newMockRMContainer(
+        false, ExecutionType.GUARANTEED, "GUARANTEED0");
+    node.allocateContainer(regular0);
+    assertEquals(Arrays.asList(opp0, regular0, am0),
+        node.getContainersToKill());
+
+    // OPPORTUNISTIC1, OPPORTUNISTIC0, GUARANTEED0, AM0
+    RMContainer opp1 = newMockRMContainer(
+        false, ExecutionType.OPPORTUNISTIC, "OPPORTUNISTIC1");
+    node.allocateContainer(opp1);
+    assertEquals(Arrays.asList(opp1, opp0, regular0, am0),
+        node.getContainersToKill());
+
+    // OPPORTUNISTIC1, OPPORTUNISTIC0, GUARANTEED0, AM1, AM0
+    RMContainer am1 = newMockRMContainer(
+        true, ExecutionType.GUARANTEED, "AM1");
+    node.allocateContainer(am1);
+    assertEquals(Arrays.asList(opp1, opp0, regular0, am1, am0),
+        node.getContainersToKill());
+
+    // OPPORTUNISTIC1, OPPORTUNISTIC0, GUARANTEED1, GUARANTEED0, AM1, AM0
+    RMContainer regular1 = newMockRMContainer(
+        false, ExecutionType.GUARANTEED, "GUARANTEED1");
+    node.allocateContainer(regular1);
+    assertEquals(Arrays.asList(opp1, opp0, regular1, regular0, am1, am0),
+        node.getContainersToKill());
+  }
+
+  private static RMContainer newMockRMContainer(boolean isAMContainer,
+      ExecutionType executionType, String name) {
+    RMContainer container = mock(RMContainer.class);
+    when(container.isAMContainer()).thenReturn(isAMContainer);
+    when(container.getExecutionType()).thenReturn(executionType);
+    when(container.getCreationTime()).thenReturn(Time.now());
+    when(container.toString()).thenReturn(name);
+    return container;
+  }
+
+  /**
+   * SchedulerNode mock to test launching containers.
+   */
+  class MockSchedulerNode extends SchedulerNode {
+    private final List<RMContainer> containers = new ArrayList<>();
+
+    MockSchedulerNode() {
+      super(MockNodes.newNodeInfo(0, Resource.newInstance(1, 1)), false);
+    }
+
+    @Override
+    protected List<RMContainer> getLaunchedContainers() {
+      return containers;
+    }
+
+    @Override
+    public void allocateContainer(RMContainer rmContainer) {
+      containers.add(rmContainer);
+      // Shuffle for testing
+      Collections.shuffle(containers);
+    }
+
+    @Override
+    public void reserveResource(SchedulerApplicationAttempt attempt,
+        SchedulerRequestKey schedulerKey, RMContainer container) {}
+
+    @Override
+    public void unreserveResource(SchedulerApplicationAttempt attempt) {}
+  }
 }

+ 723 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerOvercommit.java

@@ -0,0 +1,723 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.PreemptionContainer;
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceOption;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.nodelabels.NodeAttributeStore;
+import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.TestResourceTrackerService.NullNodeAttributeStore;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Generic tests for overcommitting resources. This needs to be instantiated
+ * with a scheduler ({@link YarnConfiguration.RM_SCHEDULER}).
+ *
+ * If reducing the amount of resources leads to overcommitting (negative
+ * available resources), the scheduler will select containers to make room.
+ * <ul>
+ * <li>If there is no timeout (&lt;0), it doesn't kill or preempt surplus
+ * containers.</li>
+ * <li>If the timeout is 0, it kills the surplus containers immediately.</li>
+ * <li>If the timeout is larger than 0, it first asks the application to
+ * preempt those containers and after the timeout passes, it kills the surplus
+ * containers.</li>
+ * </ul>
+ */
+public abstract class TestSchedulerOvercommit {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestSchedulerOvercommit.class);
+
+  /** 1 GB in MB. */
+  protected final static int GB = 1024;
+
+  /** We do scheduling and heart beat every 200ms. */
+  protected static final int INTERVAL = 200;
+
+
+  /** Mock Resource Manager. */
+  private MockRM rm;
+  /** Scheduler for the Mock Resource Manager.*/
+  private ResourceScheduler scheduler;
+
+  /** Node Manager running containers. */
+  private MockNM nm;
+  private NodeId nmId;
+
+  /** Application to allocate containers. */
+  private RMAppAttempt attempt;
+  private MockAM am;
+
+  /**
+   * Setup the cluster with: an RM, a NM and an application for test.
+   * @throws Exception If it cannot set up the cluster.
+   */
+  @Before
+  public void setup() throws Exception {
+    LOG.info("Setting up the test cluster...");
+
+    // Start the Resource Manager
+    Configuration conf = getConfiguration();
+    rm = new MockRM(conf);
+    rm.start();
+    scheduler = rm.getResourceScheduler();
+
+    // Add a Node Manager with 4GB
+    nm = rm.registerNode("127.0.0.1:1234", 4 * GB);
+    nmId = nm.getNodeId();
+
+    // Start an AM with 2GB
+    RMApp app = rm.submitApp(2 * GB);
+    nm.nodeHeartbeat(true);
+    attempt = app.getCurrentAppAttempt();
+    am = rm.sendAMLaunched(attempt.getAppAttemptId());
+    am.registerAppAttempt();
+
+    // After allocation, used 2GB and remaining 2GB on the NM
+    assertMemory(scheduler, nmId, 2 * GB, 2 * GB);
+    nm.nodeHeartbeat(true);
+  }
+
+  /**
+   * Get the configuration for the scheduler. This is used when setting up the
+   * Resource Manager and should setup the scheduler (e.g., Capacity Scheduler
+   * or Fair Scheduler). It needs to set the configuration with
+   * {@link YarnConfiguration.RM_SCHEDULER}.
+   * @return Configuration for the scheduler.
+   */
+  protected Configuration getConfiguration() {
+    Configuration conf = new YarnConfiguration();
+
+    // Prevent loading node attributes
+    conf.setClass(YarnConfiguration.FS_NODE_ATTRIBUTE_STORE_IMPL_CLASS,
+        NullNodeAttributeStore.class, NodeAttributeStore.class);
+
+    return conf;
+  }
+
+  /**
+   * Stops the default application and the RM (with the scheduler).
+   * @throws Exception If it cannot stop the cluster.
+   */
+  @After
+  public void cleanup() throws Exception {
+    LOG.info("Cleaning up the test cluster...");
+
+    if (am != null) {
+      am.unregisterAppAttempt();
+      am = null;
+    }
+    if (rm != null) {
+      rm.drainEvents();
+      rm.stop();
+      rm = null;
+    }
+  }
+
+
+  /**
+   * Reducing the resources with no timeout should prevent new containers
+   * but wait for the current ones without killing.
+   */
+  @Test
+  public void testReduceNoTimeout() throws Exception {
+
+    // New 2GB container should give 4 GB used (2+2) and 0 GB available
+    Container c1 = createContainer(am, 2 * GB);
+    assertMemory(scheduler, nmId, 4 * GB, 0);
+
+    // Update node resource to 2 GB, so resource is over-consumed
+    updateNodeResource(rm, nmId, 2 * GB, 2, -1);
+    // The used resource should still be 4 GB and negative available resource
+    waitMemory(scheduler, nmId, 4 * GB, -2 * GB, INTERVAL, 2 * 1000);
+    // Check that the NM got the updated resources
+    nm.nodeHeartbeat(true);
+    assertEquals(2 * GB, nm.getCapability().getMemorySize());
+
+    // Check that we did not get a preemption request
+    assertNoPreemption(am.schedule().getPreemptionMessage());
+
+    // Check container can complete successfully with resource over-commitment
+    ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
+        c1.getId(), ContainerState.COMPLETE, "", 0, c1.getResource());
+    nm.containerStatus(containerStatus);
+
+    LOG.info("Waiting for container to be finished for app...");
+    GenericTestUtils.waitFor(
+        () -> attempt.getJustFinishedContainers().size() == 1,
+        INTERVAL, 2 * 1000);
+    assertEquals(1, am.schedule().getCompletedContainersStatuses().size());
+    assertMemory(scheduler, nmId, 2 * GB, 0);
+
+    // Verify no NPE is trigger in schedule after resource is updated
+    am.addRequests(new String[] {"127.0.0.1", "127.0.0.2"}, 3 * GB, 1, 1);
+    AllocateResponse allocResponse2 = am.schedule();
+    assertTrue("Shouldn't have enough resource to allocate containers",
+        allocResponse2.getAllocatedContainers().isEmpty());
+    // Try 10 times as scheduling is an async process
+    for (int i = 0; i < 10; i++) {
+      Thread.sleep(INTERVAL);
+      allocResponse2 = am.schedule();
+      assertTrue("Shouldn't have enough resource to allocate containers",
+          allocResponse2.getAllocatedContainers().isEmpty());
+    }
+  }
+
+  /**
+   * Changing resources multiples times without waiting for the
+   * timeout.
+   */
+  @Test
+  public void testChangeResourcesNoTimeout() throws Exception {
+    waitMemory(scheduler, nmId, 2 * GB, 2 * GB, 100, 2 * 1000);
+
+    updateNodeResource(rm, nmId, 5 * GB, 2, -1);
+    waitMemory(scheduler, nmId, 2 * GB, 3 * GB, 100, 2 * 1000);
+
+    updateNodeResource(rm, nmId, 0 * GB, 2, -1);
+    waitMemory(scheduler, nmId, 2 * GB, -2 * GB, 100, 2 * 1000);
+
+    updateNodeResource(rm, nmId, 4 * GB, 2, -1);
+    waitMemory(scheduler, nmId, 2 * GB, 2 * GB, 100, 2 * 1000);
+
+    // The application should still be running without issues.
+    assertEquals(RMAppAttemptState.RUNNING, attempt.getState());
+  }
+
+  /**
+   * Reducing the resources with 0 time out kills the container right away.
+   */
+  @Test
+  public void testReduceKill() throws Exception {
+
+    Container container = createContainer(am, 2 * GB);
+    assertMemory(scheduler, nmId, 4 * GB, 0);
+
+    // Reducing to 2GB should kill the container
+    long t0 = Time.now();
+    updateNodeResource(rm, nmId, 2 * GB, 2, 0);
+    waitMemory(scheduler, nm, 2 * GB, 0 * GB, INTERVAL, 2 * INTERVAL);
+
+    // Check that the new container was killed
+    List<ContainerStatus> completedContainers =
+        am.schedule().getCompletedContainersStatuses();
+    assertEquals(1, completedContainers.size());
+    ContainerStatus containerStatus = completedContainers.get(0);
+    assertContainerKilled(container.getId(), containerStatus);
+
+    // It should kill the containers right away
+    assertTime(0, Time.now() - t0);
+  }
+
+  /**
+   * Reducing the resources with a time out should first preempt and then kill.
+   */
+  @Test
+  public void testReducePreemptAndKill() throws Exception {
+
+    Container container = createContainer(am, 2 * GB);
+    assertMemory(scheduler, nmId, 4 * GB, 0);
+
+    // We give an overcommit time out of 2 seconds
+    final int timeout = (int)TimeUnit.SECONDS.toMillis(2);
+
+    // Reducing to 2GB should first preempt the container
+    long t0 = Time.now();
+    updateNodeResource(rm, nmId, 2 * GB, 2, timeout);
+    waitMemory(scheduler, nm, 4 * GB, -2 * GB, INTERVAL, timeout);
+
+    // We should receive a notification to preempt the container
+    PreemptionMessage preemptMsg = am.schedule().getPreemptionMessage();
+    assertPreemption(container.getId(), preemptMsg);
+
+    // Wait until the container is killed
+    waitMemory(scheduler, nm, 2 * GB, 0, INTERVAL, timeout + 2 * INTERVAL);
+
+    // Check that the container was killed
+    List<ContainerStatus> completedContainers =
+        am.schedule().getCompletedContainersStatuses();
+    assertEquals(1, completedContainers.size());
+    ContainerStatus containerStatus = completedContainers.get(0);
+    assertContainerKilled(container.getId(), containerStatus);
+
+    // Check how long it took to kill the container
+    assertTime(timeout, Time.now() - t0);
+  }
+
+  /**
+   * Reducing the resources (with a time out) triggers a preemption message to
+   * the AM right away. Then, increasing them again should prevent the killing
+   * when the time out would have happened.
+   */
+  @Test
+  public void testReducePreemptAndCancel() throws Exception {
+
+    Container container = createContainer(am, 2 * GB);
+    assertMemory(scheduler, nmId, 4 * GB, 0);
+
+    // We give an overcommit time out of 2 seconds
+    final int timeout = (int)TimeUnit.SECONDS.toMillis(1);
+
+    // Reducing to 2GB should first preempt the container
+    updateNodeResource(rm, nmId, 2 * GB, 2, timeout);
+    waitMemory(scheduler, nm, 4 * GB, -2 * GB, INTERVAL, timeout);
+
+    // We should receive a notification to preempt the container
+    PreemptionMessage preemptMsg = am.schedule().getPreemptionMessage();
+    assertPreemption(container.getId(), preemptMsg);
+
+    // Increase the resources again
+    updateNodeResource(rm, nmId, 4 * GB, 2, timeout);
+    waitMemory(scheduler, nm, 4 * GB, 0, INTERVAL, timeout);
+
+    long t0 = Time.now();
+    while (Time.now() - t0 < TimeUnit.SECONDS.toMillis(2)) {
+      nm.nodeHeartbeat(true);
+      AllocateResponse allocation = am.schedule();
+      assertNoPreemption(allocation.getPreemptionMessage());
+      assertTrue(allocation.getCompletedContainersStatuses().isEmpty());
+      Thread.sleep(INTERVAL);
+    }
+
+    // Check that the containers are still running
+    assertMemory(scheduler, nmId, 4 * GB, 0);
+    assertEquals(2, scheduler.getNodeReport(nmId).getNumContainers());
+  }
+
+  /**
+   * Test the order we kill multiple containers.
+   * It initially has: AM(2GB), C1(1GB), C2(1GB), AM2(2GB), and C3(2GB).
+   * It should kill in this order: C3, C2, C1, AM2, and AM1.
+   */
+  @Test
+  public void testKillMultipleContainers() throws Exception {
+
+    updateNodeResource(rm, nmId, 8 * GB, 6, -1);
+    waitMemory(scheduler, nmId, 2 * GB, 6 * GB, 200, 5 * 1000);
+
+    // Start 2 containers with 1 GB each
+    Container c1 = createContainer(am, 1 * GB);
+    Container c2 = createContainer(am, 1 * GB);
+    waitMemory(scheduler, nmId, 4 * GB, 4 * GB, 200, 5 * 1000);
+
+    // Start an AM with 2GB
+    RMApp app2 = rm.submitApp(2 * GB, "app2", "user2");
+    nm.nodeHeartbeat(true);
+    RMAppAttempt attempt2 = app2.getCurrentAppAttempt();
+    MockAM am2 = rm.sendAMLaunched(attempt2.getAppAttemptId());
+    am2.registerAppAttempt();
+    waitMemory(scheduler, nm, 6 * GB, 2 * GB, 200, 5 * 1000);
+    assertEquals(RMAppAttemptState.RUNNING, attempt2.getState());
+
+    Container c3 = createContainer(am2, 2 * GB);
+    waitMemory(scheduler, nm, 8 * GB, 0 * GB, 200, 5 * 1000);
+    assertEquals(5, scheduler.getNodeReport(nmId).getNumContainers());
+
+    // Reduce the resources to kill C3 and C2 (not AM2)
+    updateNodeResource(rm, nmId, 5 * GB, 6, 0);
+    waitMemory(scheduler, nm, 5 * GB, 0 * GB, 200, 5 * 1000);
+    assertEquals(3, scheduler.getNodeReport(nmId).getNumContainers());
+
+    List<ContainerStatus> completedContainers =
+        am2.schedule().getCompletedContainersStatuses();
+    assertEquals(1, completedContainers.size());
+    ContainerStatus container3Status = completedContainers.get(0);
+    assertContainerKilled(c3.getId(), container3Status);
+
+    completedContainers = am.schedule().getCompletedContainersStatuses();
+    assertEquals(1, completedContainers.size());
+    ContainerStatus container2Status = completedContainers.get(0);
+    assertContainerKilled(c2.getId(), container2Status);
+    assertEquals(RMAppAttemptState.RUNNING, attempt.getState());
+    assertEquals(RMAppAttemptState.RUNNING, attempt2.getState());
+
+    // Reduce the resources to kill C1 (not AM2)
+    updateNodeResource(rm, nmId, 4 * GB, 6, 0);
+    waitMemory(scheduler, nm, 4 * GB, 0 * GB, 200, 5 * 1000);
+    assertEquals(2, scheduler.getNodeReport(nmId).getNumContainers());
+    completedContainers = am.schedule().getCompletedContainersStatuses();
+    assertEquals(1, completedContainers.size());
+    ContainerStatus container1Status = completedContainers.get(0);
+    assertContainerKilled(c1.getId(), container1Status);
+    assertEquals(RMAppAttemptState.RUNNING, attempt.getState());
+    assertEquals(RMAppAttemptState.RUNNING, attempt2.getState());
+
+    // Reduce the resources to kill AM2
+    updateNodeResource(rm, nmId, 2 * GB, 6, 0);
+    waitMemory(scheduler, nm, 2 * GB, 0 * GB, 200, 5 * 1000);
+    assertEquals(1, scheduler.getNodeReport(nmId).getNumContainers());
+    assertEquals(RMAppAttemptState.FAILED, attempt2.getState());
+
+    // The first application should be fine and still running
+    assertEquals(RMAppAttemptState.RUNNING, attempt.getState());
+  }
+
+  @Test
+  public void testEndToEnd() throws Exception {
+
+    Container c1 = createContainer(am, 2 * GB);
+    assertMemory(scheduler, nmId, 4 * GB, 0);
+
+    // check node report, 4 GB used and 0 GB available
+    assertMemory(scheduler, nmId, 4 * GB, 0);
+    nm.nodeHeartbeat(true);
+    assertEquals(4 * GB, nm.getCapability().getMemorySize());
+
+    // update node resource to 2 GB, so resource is over-consumed
+    updateNodeResource(rm, nmId, 2 * GB, 2, -1);
+    // the used resource should still 4 GB and negative available resource
+    waitMemory(scheduler, nmId, 4 * GB, -2 * GB, 200, 5 * 1000);
+    // check that we did not get a preemption requests
+    assertNoPreemption(am.schedule().getPreemptionMessage());
+
+    // check that the NM got the updated resources
+    nm.nodeHeartbeat(true);
+    assertEquals(2 * GB, nm.getCapability().getMemorySize());
+
+    // check container can complete successfully with resource over-commitment
+    ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
+        c1.getId(), ContainerState.COMPLETE, "", 0, c1.getResource());
+    nm.containerStatus(containerStatus);
+
+    LOG.info("Waiting for containers to be finished for app 1...");
+    GenericTestUtils.waitFor(
+        () -> attempt.getJustFinishedContainers().size() == 1, 100, 2000);
+    assertEquals(1, am.schedule().getCompletedContainersStatuses().size());
+    assertMemory(scheduler, nmId, 2 * GB, 0);
+
+    // verify no NPE is trigger in schedule after resource is updated
+    am.addRequests(new String[] {"127.0.0.1", "127.0.0.2"}, 3 * GB, 1, 1);
+    AllocateResponse allocResponse2 = am.schedule();
+    assertTrue("Shouldn't have enough resource to allocate containers",
+        allocResponse2.getAllocatedContainers().isEmpty());
+    // try 10 times as scheduling is an async process
+    for (int i = 0; i < 10; i++) {
+      Thread.sleep(100);
+      allocResponse2 = am.schedule();
+      assertTrue("Shouldn't have enough resource to allocate containers",
+          allocResponse2.getAllocatedContainers().isEmpty());
+    }
+
+    // increase the resources again to 5 GB to schedule the 3GB container
+    updateNodeResource(rm, nmId, 5 * GB, 2, -1);
+    waitMemory(scheduler, nmId, 2 * GB, 3 * GB, 100, 5 * 1000);
+
+    // kick the scheduling and check it took effect
+    nm.nodeHeartbeat(true);
+    while (allocResponse2.getAllocatedContainers().isEmpty()) {
+      LOG.info("Waiting for containers to be created for app 1...");
+      Thread.sleep(100);
+      allocResponse2 = am.schedule();
+    }
+    assertEquals(1, allocResponse2.getAllocatedContainers().size());
+    Container c2 = allocResponse2.getAllocatedContainers().get(0);
+    assertEquals(3 * GB, c2.getResource().getMemorySize());
+    assertEquals(nmId, c2.getNodeId());
+    assertMemory(scheduler, nmId, 5 * GB, 0);
+
+    // reduce the resources and trigger a preempt request to the AM for c2
+    updateNodeResource(rm, nmId, 3 * GB, 2, 2 * 1000);
+    waitMemory(scheduler, nmId, 5 * GB, -2 * GB, 200, 5 * 1000);
+
+    PreemptionMessage preemptMsg = am.schedule().getPreemptionMessage();
+    assertPreemption(c2.getId(), preemptMsg);
+
+    // increasing the resources again, should stop killing the containers
+    updateNodeResource(rm, nmId, 5 * GB, 2, -1);
+    waitMemory(scheduler, nmId, 5 * GB, 0, 200, 5 * 1000);
+    Thread.sleep(3 * 1000);
+    assertMemory(scheduler, nmId, 5 * GB, 0);
+
+    // reduce the resources again to trigger a preempt request to the AM for c2
+    long t0 = Time.now();
+    updateNodeResource(rm, nmId, 3 * GB, 2, 2 * 1000);
+    waitMemory(scheduler, nmId, 5 * GB, -2 * GB, 200, 5 * 1000);
+
+    preemptMsg = am.schedule().getPreemptionMessage();
+    assertPreemption(c2.getId(), preemptMsg);
+
+    // wait until the scheduler kills the container
+    GenericTestUtils.waitFor(() -> {
+      try {
+        nm.nodeHeartbeat(true); // trigger preemption in the NM
+      } catch (Exception e) {
+        LOG.error("Cannot heartbeat", e);
+      }
+      SchedulerNodeReport report = scheduler.getNodeReport(nmId);
+      return report.getAvailableResource().getMemorySize() > 0;
+    }, 200, 5 * 1000);
+    assertMemory(scheduler, nmId, 2 * GB, 1 * GB);
+
+    List<ContainerStatus> completedContainers =
+        am.schedule().getCompletedContainersStatuses();
+    assertEquals(1, completedContainers.size());
+    ContainerStatus c2status = completedContainers.get(0);
+    assertContainerKilled(c2.getId(), c2status);
+
+    assertTime(2000, Time.now() - t0);
+  }
+
+  /**
+   * Create a container with a particular size and make sure it succeeds.
+   * @param am Application Master to add the container to.
+   * @param memory Memory of the container.
+   * @return Newly created container.
+   * @throws Exception If there are issues creating the container.
+   */
+  protected Container createContainer(
+      final MockAM app, final int memory) throws Exception {
+
+    ResourceRequest req = ResourceRequest.newBuilder()
+        .capability(Resource.newInstance(memory, 1))
+        .numContainers(1)
+        .build();
+    AllocateResponse response = app.allocate(singletonList(req), emptyList());
+    List<Container> allocated = response.getAllocatedContainers();
+    nm.nodeHeartbeat(true);
+    for (int i = 0; allocated.isEmpty() && i < 10; i++) {
+      LOG.info("Waiting for containers to be created for app...");
+      Thread.sleep(INTERVAL);
+      response = app.schedule();
+      allocated = response.getAllocatedContainers();
+      nm.nodeHeartbeat(true);
+    }
+    assertFalse("Cannot create the container", allocated.isEmpty());
+
+    assertEquals(1, allocated.size());
+    final Container c = allocated.get(0);
+    assertEquals(memory, c.getResource().getMemorySize());
+    assertEquals(nmId, c.getNodeId());
+    return c;
+  }
+
+  /**
+   * Update the resources on a Node Manager.
+   * @param rm Resource Manager to contact.
+   * @param nmId Identifier of the Node Manager.
+   * @param memory Memory in MB.
+   * @param vCores Number of virtual cores.
+   * @param overcommitTimeout Timeout for overcommit.
+   * @throws Exception If the update cannot be completed.
+   */
+  public static void updateNodeResource(MockRM rm, NodeId nmId,
+      int memory, int vCores, int overcommitTimeout) throws Exception {
+    AdminService admin = rm.getAdminService();
+    ResourceOption resourceOption = ResourceOption.newInstance(
+        Resource.newInstance(memory, vCores), overcommitTimeout);
+    UpdateNodeResourceRequest req = UpdateNodeResourceRequest.newInstance(
+        singletonMap(nmId, resourceOption));
+    admin.updateNodeResource(req);
+  }
+
+  /**
+   * Make sure that the container was killed.
+   * @param containerId Expected container identifier.
+   * @param status Container status to check.
+   */
+  public static void assertContainerKilled(
+      final ContainerId containerId, final ContainerStatus status) {
+    assertEquals(containerId, status.getContainerId());
+    assertEquals(ContainerState.COMPLETE, status.getState());
+    assertEquals(ContainerExitStatus.PREEMPTED, status.getExitStatus());
+    assertEquals(SchedulerUtils.PREEMPTED_CONTAINER, status.getDiagnostics());
+  }
+
+  /**
+   * Check that an elapsed time is at least the expected time and no more than
+   * two heart beats/scheduling rounds.
+   * @param expectedTime Time expected in milliseconds.
+   * @param time Actual time to check.
+   */
+  public static void assertTime(final long expectedTime, final long time) {
+    assertTrue("Too short: " + time + "ms", time > expectedTime);
+    assertTrue("Too long: " + time + "ms",
+        time < (expectedTime + 2 * INTERVAL));
+  }
+
+  /**
+   * Check that the scheduler didn't ask to preempt anything.
+   * @param msg Preemption message from the scheduler.
+   */
+  public static void assertNoPreemption(final PreemptionMessage msg) {
+    if (msg != null &&
+        msg.getContract() != null &&
+        !msg.getContract().getContainers().isEmpty()) {
+      fail("We shouldn't preempt containers: " + msg);
+    }
+  }
+
+  /**
+   * Check that the scheduler ask to preempt a particular container.
+   * @param containerId Expected container to preempt.
+   * @param msg Preemption message from the scheduler.
+   */
+  public static void assertPreemption(
+      final ContainerId containerId, final PreemptionMessage msg) {
+    assertNotNull("Expected a preemption message", msg);
+    Set<ContainerId> preemptContainers = new HashSet<>();
+    if (msg.getContract() != null) {
+      for (PreemptionContainer c : msg.getContract().getContainers()) {
+        preemptContainers.add(c.getId());
+      }
+    }
+    if (msg.getStrictContract() != null) {
+      for (PreemptionContainer c : msg.getStrictContract().getContainers()) {
+        preemptContainers.add(c.getId());
+      }
+    }
+    assertEquals(Collections.singleton(containerId), preemptContainers);
+  }
+
+  /**
+   * Check if a node report has the expected memory values.
+   * @param scheduler Scheduler with the data.
+   * @param nmId Identifier of the node to check.
+   * @param expectedUsed The expected used memory in MB.
+   * @param expectedAvailable The expected available memory in MB.
+   */
+  public static void assertMemory(ResourceScheduler scheduler, NodeId nmId,
+      long expectedUsed, long expectedAvailable) {
+    SchedulerNodeReport nmReport = scheduler.getNodeReport(nmId);
+    assertNotNull(nmReport);
+    Resource used = nmReport.getUsedResource();
+    assertEquals("Used memory", expectedUsed, used.getMemorySize());
+    Resource available = nmReport.getAvailableResource();
+    assertEquals("Available memory",
+        expectedAvailable, available.getMemorySize());
+  }
+
+  /**
+   * Wait until the memory of a NM is at a given point.
+   * It does not trigger NM heart beat.
+   * @param scheduler Scheduler with the data.
+   * @param nmId Identifier of the node to check.
+   * @param expectedUsed The expected used memory in MB.
+   * @param expectedAvailable The expected available memory in MB.
+   * @param checkEveryMillis How often to perform the test in ms.
+   * @param waitForMillis The maximum time to wait in ms.
+   * @throws Exception If we don't get to the expected memory.
+   */
+  public static void waitMemory(ResourceScheduler scheduler,
+      NodeId nmId, int expectedUsed, int expectedAvailable,
+      int checkEveryMillis, int waitForMillis) throws Exception {
+    waitMemory(scheduler, nmId, null, expectedUsed, expectedAvailable,
+        checkEveryMillis, waitForMillis);
+  }
+
+  /**
+   * Wait until the memory of a NM is at a given point.
+   * It triggers NM heart beat.
+   * @param scheduler Scheduler with the data.
+   * @param nm Node Manager to check.
+   * @param expectedUsed The expected used memory in MB.
+   * @param expectedAvailable The expected available memory in MB.
+   * @param checkEveryMillis How often to perform the test in ms.
+   * @param waitForMillis The maximum time to wait in ms.
+   * @throws Exception If we don't get to the expected memory.
+   */
+  public static void waitMemory(ResourceScheduler scheduler, MockNM nm,
+      int expectedUsed, int expectedAvailable,
+      int checkEveryMillis, int waitForMillis) throws Exception {
+    waitMemory(scheduler, nm.getNodeId(), nm, expectedUsed, expectedAvailable,
+        checkEveryMillis, waitForMillis);
+  }
+
+  /**
+   * Wait until the memory of a NM is at a given point.
+   * If the NM is specified, it does heart beat.
+   * @param scheduler Scheduler with the data.
+   * @param nmId Identifier of the node to check.
+   * @param nm Node Manager to check.
+   * @param expectedUsed The expected used memory in MB.
+   * @param expectedAvailable The expected available memory in MB.
+   * @param checkEveryMillis How often to perform the test in ms.
+   * @param waitForMillis The maximum time to wait in ms.
+   * @throws Exception If we don't get to the expected memory.
+   */
+  public static void waitMemory(ResourceScheduler scheduler,
+      NodeId nmId, MockNM nm,
+      int expectedUsed, int expectedAvailable,
+      int checkEveryMillis, int waitForMillis) throws Exception {
+
+    long start = Time.monotonicNow();
+    while (Time.monotonicNow() - start < waitForMillis) {
+      try {
+        if (nm != null) {
+          nm.nodeHeartbeat(true);
+        }
+        assertMemory(scheduler, nmId, expectedUsed, expectedAvailable);
+        return;
+      } catch (AssertionError e) {
+        Thread.sleep(checkEveryMillis);
+      }
+    }
+
+    // No success, notify time out
+    SchedulerNodeReport nmReport = scheduler.getNodeReport(nmId);
+    Resource used = nmReport.getUsedResource();
+    Resource available = nmReport.getAvailableResource();
+    throw new TimeoutException("Took longer than " + waitForMillis +
+        "ms to get to " + expectedUsed + "," + expectedAvailable +
+        " actual=" + used + "," + available);
+  }
+}

+ 120 - 85
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java

@@ -21,6 +21,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB;
 import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertContainerKilled;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertMemory;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertNoPreemption;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertPreemption;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.assertTime;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.updateNodeResource;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerOvercommit.waitMemory;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -57,6 +64,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.LocalConfigurationProvider;
 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -76,12 +84,12 @@ import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
+import org.apache.hadoop.yarn.api.records.PreemptionMessage;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceOption;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -94,8 +102,6 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
 import org.apache.hadoop.yarn.server.resourcemanager.Application;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
@@ -1310,110 +1316,139 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase {
 
   @Test
   public void testResourceOverCommit() throws Exception {
-    int waitCount;
     Configuration conf = new Configuration();
     conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
         ResourceScheduler.class);
     MockRM rm = new MockRM(conf);
     rm.start();
+    ResourceScheduler scheduler = rm.getResourceScheduler();
 
-    MockNM nm1 = rm.registerNode("127.0.0.1:1234", 4 * GB);
-    RMApp app1 = rm.submitApp(2048);
-    // kick the scheduling, 2 GB given to AM1, remaining 2GB on nm1
-    nm1.nodeHeartbeat(true);
-    RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
-    MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
-    am1.registerAppAttempt();
-    SchedulerNodeReport report_nm1 = rm.getResourceScheduler().getNodeReport(
-        nm1.getNodeId());
-    // check node report, 2 GB used and 2 GB available
-    Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemorySize());
-    Assert.assertEquals(2 * GB, report_nm1.getAvailableResource().getMemorySize());
+    MockNM nm = rm.registerNode("127.0.0.1:1234", 4 * GB);
+    NodeId nmId = nm.getNodeId();
+    RMApp app = rm.submitApp(2048);
+    // kick the scheduling, 2 GB given to AM1, remaining 2GB on nm
+    nm.nodeHeartbeat(true);
+    RMAppAttempt attempt1 = app.getCurrentAppAttempt();
+    MockAM am = rm.sendAMLaunched(attempt1.getAppAttemptId());
+    am.registerAppAttempt();
+    assertMemory(scheduler, nmId, 2 * GB, 2 * GB);
 
-    // add request for containers
-    am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 2 * GB, 1, 1);
-    AllocateResponse alloc1Response = am1.schedule(); // send the request
+    // add request for 1 container of 2 GB
+    am.addRequests(new String[] {"127.0.0.1", "127.0.0.2"}, 2 * GB, 1, 1);
+    AllocateResponse alloc1Response = am.schedule(); // send the request
 
     // kick the scheduler, 2 GB given to AM1, resource remaining 0
-    nm1.nodeHeartbeat(true);
-    while (alloc1Response.getAllocatedContainers().size() < 1) {
+    nm.nodeHeartbeat(true);
+    while (alloc1Response.getAllocatedContainers().isEmpty()) {
       LOG.info("Waiting for containers to be created for app 1...");
       Thread.sleep(100);
-      alloc1Response = am1.schedule();
+      alloc1Response = am.schedule();
     }
 
     List<Container> allocated1 = alloc1Response.getAllocatedContainers();
-    Assert.assertEquals(1, allocated1.size());
-    Assert.assertEquals(2 * GB, allocated1.get(0).getResource().getMemorySize());
-    Assert.assertEquals(nm1.getNodeId(), allocated1.get(0).getNodeId());
-
-    report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
-    // check node report, 4 GB used and 0 GB available
-    Assert.assertEquals(0, report_nm1.getAvailableResource().getMemorySize());
-    Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemorySize());
-
-    // check container is assigned with 2 GB.
+    assertEquals(1, allocated1.size());
     Container c1 = allocated1.get(0);
-    Assert.assertEquals(2 * GB, c1.getResource().getMemorySize());
-
-    // update node resource to 2 GB, so resource is over-consumed.
-    Map<NodeId, ResourceOption> nodeResourceMap =
-        new HashMap<NodeId, ResourceOption>();
-    nodeResourceMap.put(nm1.getNodeId(),
-        ResourceOption.newInstance(Resource.newInstance(2 * GB, 1), -1));
-    UpdateNodeResourceRequest request =
-        UpdateNodeResourceRequest.newInstance(nodeResourceMap);
-    AdminService as = ((MockRM)rm).getAdminService();
-    as.updateNodeResource(request);
-
-    waitCount = 0;
-    while (waitCount++ != 20) {
-      report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
-      if (report_nm1.getAvailableResource().getMemorySize() != 0) {
-        break;
-      }
-      LOG.info("Waiting for RMNodeResourceUpdateEvent to be handled... Tried "
-          + waitCount + " times already..");
-      Thread.sleep(1000);
-    }
-    // Now, the used resource is still 4 GB, and available resource is minus value.
-    report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
-    Assert.assertEquals(4 * GB, report_nm1.getUsedResource().getMemorySize());
-    Assert.assertEquals(-2 * GB, report_nm1.getAvailableResource().getMemorySize());
+    assertEquals(2 * GB, c1.getResource().getMemorySize());
+    assertEquals(nmId, c1.getNodeId());
 
-    // Check container can complete successfully in case of resource over-commitment.
+    // check node report, 4 GB used and 0 GB available
+    assertMemory(scheduler, nmId, 4 * GB, 0);
+    nm.nodeHeartbeat(true);
+    assertEquals(4 * GB, nm.getCapability().getMemorySize());
+
+    // update node resource to 2 GB, so resource is over-consumed
+    updateNodeResource(rm, nmId, 2 * GB, 2, -1);
+    // the used resource should still 4 GB and negative available resource
+    waitMemory(scheduler, nmId, 4 * GB, -2 * GB, 200, 5 * 1000);
+    // check that we did not get a preemption requests
+    assertNoPreemption(am.schedule().getPreemptionMessage());
+
+    // check that the NM got the updated resources
+    nm.nodeHeartbeat(true);
+    assertEquals(2 * GB, nm.getCapability().getMemorySize());
+
+    // check container can complete successfully with resource over-commitment
     ContainerStatus containerStatus = BuilderUtils.newContainerStatus(
         c1.getId(), ContainerState.COMPLETE, "", 0, c1.getResource());
-    nm1.containerStatus(containerStatus);
-    waitCount = 0;
-    while (attempt1.getJustFinishedContainers().size() < 1
-        && waitCount++ != 20) {
-      LOG.info("Waiting for containers to be finished for app 1... Tried "
-          + waitCount + " times already..");
+    nm.containerStatus(containerStatus);
+
+    LOG.info("Waiting for containers to be finished for app 1...");
+    GenericTestUtils.waitFor(
+        () -> attempt1.getJustFinishedContainers().size() == 1, 100, 2000);
+    assertEquals(1, am.schedule().getCompletedContainersStatuses().size());
+    assertMemory(scheduler, nmId, 2 * GB, 0);
+
+    // verify no NPE is trigger in schedule after resource is updated
+    am.addRequests(new String[] {"127.0.0.1", "127.0.0.2"}, 3 * GB, 1, 1);
+    AllocateResponse allocResponse2 = am.schedule();
+    assertTrue("Shouldn't have enough resource to allocate containers",
+        allocResponse2.getAllocatedContainers().isEmpty());
+    // try 10 times as scheduling is an async process
+    for (int i = 0; i < 10; i++) {
       Thread.sleep(100);
+      allocResponse2 = am.schedule();
+      assertTrue("Shouldn't have enough resource to allocate containers",
+          allocResponse2.getAllocatedContainers().isEmpty());
     }
-    Assert.assertEquals(1, attempt1.getJustFinishedContainers().size());
-    Assert.assertEquals(1, am1.schedule().getCompletedContainersStatuses().size());
-    report_nm1 = rm.getResourceScheduler().getNodeReport(nm1.getNodeId());
-    Assert.assertEquals(2 * GB, report_nm1.getUsedResource().getMemorySize());
-    // As container return 2 GB back, the available resource becomes 0 again.
-    Assert.assertEquals(0 * GB, report_nm1.getAvailableResource().getMemorySize());
-
-    // Verify no NPE is trigger in schedule after resource is updated.
-    am1.addRequests(new String[] { "127.0.0.1", "127.0.0.2" }, 3 * GB, 1, 1);
-    alloc1Response = am1.schedule();
-    Assert.assertEquals("Shouldn't have enough resource to allocate containers",
-        0, alloc1Response.getAllocatedContainers().size());
-    int times = 0;
-    // try 10 times as scheduling is async process.
-    while (alloc1Response.getAllocatedContainers().size() < 1
-        && times++ < 10) {
-      LOG.info("Waiting for containers to be allocated for app 1... Tried "
-          + times + " times already..");
+
+    // increase the resources again to 5 GB to schedule the 3GB container
+    updateNodeResource(rm, nmId, 5 * GB, 2, -1);
+    waitMemory(scheduler, nmId, 2 * GB, 3 * GB, 100, 5 * 1000);
+
+    // kick the scheduling and check it took effect
+    nm.nodeHeartbeat(true);
+    while (allocResponse2.getAllocatedContainers().isEmpty()) {
+      LOG.info("Waiting for containers to be created for app 1...");
       Thread.sleep(100);
+      allocResponse2 = am.schedule();
     }
-    Assert.assertEquals("Shouldn't have enough resource to allocate containers",
-        0, alloc1Response.getAllocatedContainers().size());
+    assertEquals(1, allocResponse2.getAllocatedContainers().size());
+    Container c2 = allocResponse2.getAllocatedContainers().get(0);
+    assertEquals(3 * GB, c2.getResource().getMemorySize());
+    assertEquals(nmId, c2.getNodeId());
+    assertMemory(scheduler, nmId, 5 * GB, 0);
+
+    // reduce the resources and trigger a preempt request to the AM for c2
+    updateNodeResource(rm, nmId, 3 * GB, 2, 2 * 1000);
+    waitMemory(scheduler, nmId, 5 * GB, -2 * GB, 200, 5 * 1000);
+
+    PreemptionMessage preemptMsg = am.schedule().getPreemptionMessage();
+    assertPreemption(c2.getId(), preemptMsg);
+
+    // increasing the resources again, should stop killing the containers
+    updateNodeResource(rm, nmId, 5 * GB, 2, -1);
+    waitMemory(scheduler, nmId, 5 * GB, 0, 200, 5 * 1000);
+    Thread.sleep(3 * 1000);
+    assertMemory(scheduler, nmId, 5 * GB, 0);
+
+    // reduce the resources again to trigger a preempt request to the AM for c2
+    long t0 = Time.now();
+    updateNodeResource(rm, nmId, 3 * GB, 2, 2 * 1000);
+    waitMemory(scheduler, nmId, 5 * GB, -2 * GB, 200, 5 * 1000);
+
+    preemptMsg = am.schedule().getPreemptionMessage();
+    assertPreemption(c2.getId(), preemptMsg);
+
+    // wait until the scheduler kills the container
+    GenericTestUtils.waitFor(() -> {
+      try {
+        nm.nodeHeartbeat(true); // trigger preemption in the NM
+      } catch (Exception e) {
+        LOG.error("Cannot heartbeat", e);
+      }
+      SchedulerNodeReport report = scheduler.getNodeReport(nmId);
+      return report.getAvailableResource().getMemorySize() > 0;
+    }, 200, 5 * 1000);
+    assertMemory(scheduler, nmId, 2 * GB, 1 * GB);
+
+    List<ContainerStatus> completedContainers =
+        am.schedule().getCompletedContainersStatuses();
+    assertEquals(1, completedContainers.size());
+    ContainerStatus c2status = completedContainers.get(0);
+    assertContainerKilled(c2.getId(), c2status);
+
+    assertTime(2000, Time.now() - t0);
+
     rm.stop();
   }
 

+ 52 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerOvercommit.java

@@ -0,0 +1,52 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerOvercommit;
+
+/**
+ * Test changing resources and overcommit in the Capacity Scheduler
+ * {@link CapacityScheduler}.
+ */
+public class TestCapacitySchedulerOvercommit extends TestSchedulerOvercommit {
+
+  @Override
+  protected Configuration getConfiguration() {
+    Configuration conf = super.getConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER,
+        CapacityScheduler.class, ResourceScheduler.class);
+
+    // Remove limits on AMs to allow multiple applications running
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(conf);
+    csConf.setMaximumApplicationMasterResourcePerQueuePercent(
+        CapacitySchedulerConfiguration.ROOT, 100.0f);
+    csConf.setMaximumAMResourcePercentPerPartition(
+        CapacitySchedulerConfiguration.ROOT, "", 100.0f);
+    csConf.setMaximumApplicationMasterResourcePerQueuePercent(
+        CapacitySchedulerConfiguration.ROOT + ".default", 100.0f);
+    csConf.setMaximumAMResourcePercentPerPartition(
+        CapacitySchedulerConfiguration.ROOT + ".default", "", 100.0f);
+
+    return csConf;
+  }
+}

+ 46 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerOvercommit.java

@@ -0,0 +1,46 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerOvercommit;
+
+/**
+ * Test changing resources and overcommit in the Fair Scheduler
+ * {@link FairScheduler}.
+ */
+public class TestFairSchedulerOvercommit extends TestSchedulerOvercommit {
+
+  @Override
+  protected Configuration getConfiguration() {
+    Configuration conf = super.getConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER,
+        FairScheduler.class, ResourceScheduler.class);
+
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
+    conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10 * GB);
+    conf.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, false);
+    conf.setLong(FairSchedulerConfiguration.UPDATE_INTERVAL_MS, 10);
+    conf.setFloat(FairSchedulerConfiguration.PREEMPTION_THRESHOLD, 0f);
+
+    return conf;
+  }
+}

+ 23 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/hadoop-metrics2-resourcemanager.properties

@@ -0,0 +1,23 @@
+#
+#   Licensed to the Apache Software Foundation (ASF) under one or more
+#   contributor license agreements.  See the NOTICE file distributed with
+#   this work for additional information regarding copyright ownership.
+#   The ASF licenses this file to You under the Apache License, Version 2.0
+#   (the "License"); you may not use this file except in compliance with
+#   the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# syntax: [prefix].[source|sink].[instance].[options]
+# See javadoc of package-info.java for org.apache.hadoop.metrics2 for details
+
+*.sink.file.class=org.apache.hadoop.metrics2.sink.FileSink
+# default sampling period, in seconds
+*.period=10
+*.periodMillis=100

+ 23 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/resources/hadoop-metrics2.properties

@@ -0,0 +1,23 @@
+#
+#   Licensed to the Apache Software Foundation (ASF) under one or more
+#   contributor license agreements.  See the NOTICE file distributed with
+#   this work for additional information regarding copyright ownership.
+#   The ASF licenses this file to You under the Apache License, Version 2.0
+#   (the "License"); you may not use this file except in compliance with
+#   the License.  You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# syntax: [prefix].[source|sink].[instance].[options]
+# See javadoc of package-info.java for org.apache.hadoop.metrics2 for details
+
+*.sink.file.class=org.apache.hadoop.metrics2.sink.FileSink
+# default sampling period, in seconds
+*.period=10
+*.periodMillis=100