Browse Source

YARN-5978. ContainerScheduler and ContainerManager changes to support ExecType update. (Kartheek Muthyala via asuresh)

(cherry picked from commit 4d7be1d8575e9254c59d41460960708e3718503a)
Arun Suresh 7 years ago
parent
commit
7b22df3da6
17 changed files with 964 additions and 237 deletions
  1. 366 29
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java
  2. 3 4
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
  3. 82 50
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
  4. 2 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java
  5. 17 20
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
  6. 0 24
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java
  7. 73 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java
  8. 1 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java
  9. 85 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UpdateContainerSchedulerEvent.java
  10. 10 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
  11. 31 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
  12. 167 100
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
  13. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
  14. 96 0
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java
  15. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java
  16. 1 1
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
  17. 28 2
      hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java

+ 366 - 29
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestAMRMClient.java

@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.client.api.impl;
 package org.apache.hadoop.yarn.client.api.impl;
 
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.any;
@@ -36,6 +37,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.TreeSet;
 
 
@@ -142,6 +144,10 @@ public class TestAMRMClient {
     // set the minimum allocation so that resource decrease can go under 1024
     // set the minimum allocation so that resource decrease can go under 1024
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
     conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
     conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
+    conf.setBoolean(
+        YarnConfiguration.OPPORTUNISTIC_CONTAINER_ALLOCATION_ENABLED, true);
+    conf.setInt(
+        YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10);
     yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
     yarnCluster = new MiniYARNCluster(TestAMRMClient.class.getName(), nodeCount, 1, 1);
     yarnCluster.init(conf);
     yarnCluster.init(conf);
     yarnCluster.start();
     yarnCluster.start();
@@ -908,8 +914,8 @@ public class TestAMRMClient {
     // add exp=x to ANY
     // add exp=x to ANY
     client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024,
     client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024,
         1), null, null, Priority.UNDEFINED, true, "x"));
         1), null, null, Priority.UNDEFINED, true, "x"));
-    Assert.assertEquals(1, client.ask.size());
-    Assert.assertEquals("x", client.ask.iterator().next()
+    assertEquals(1, client.ask.size());
+    assertEquals("x", client.ask.iterator().next()
         .getNodeLabelExpression());
         .getNodeLabelExpression());
 
 
     // add exp=x then add exp=a to ANY in same priority, only exp=a should kept
     // add exp=x then add exp=a to ANY in same priority, only exp=a should kept
@@ -917,8 +923,8 @@ public class TestAMRMClient {
         1), null, null, Priority.UNDEFINED, true, "x"));
         1), null, null, Priority.UNDEFINED, true, "x"));
     client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024,
     client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024,
         1), null, null, Priority.UNDEFINED, true, "a"));
         1), null, null, Priority.UNDEFINED, true, "a"));
-    Assert.assertEquals(1, client.ask.size());
-    Assert.assertEquals("a", client.ask.iterator().next()
+    assertEquals(1, client.ask.size());
+    assertEquals("a", client.ask.iterator().next()
         .getNodeLabelExpression());
         .getNodeLabelExpression());
     
     
     // add exp=x to ANY, rack and node, only resource request has ANY resource
     // add exp=x to ANY, rack and node, only resource request has ANY resource
@@ -927,10 +933,10 @@ public class TestAMRMClient {
     client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024,
     client.addContainerRequest(new ContainerRequest(Resource.newInstance(1024,
         1), null, null, Priority.UNDEFINED, true,
         1), null, null, Priority.UNDEFINED, true,
         "y"));
         "y"));
-    Assert.assertEquals(1, client.ask.size());
+    assertEquals(1, client.ask.size());
     for (ResourceRequest req : client.ask) {
     for (ResourceRequest req : client.ask) {
       if (ResourceRequest.ANY.equals(req.getResourceName())) {
       if (ResourceRequest.ANY.equals(req.getResourceName())) {
-        Assert.assertEquals("y", req.getNodeLabelExpression());
+        assertEquals("y", req.getNodeLabelExpression());
       } else {
       } else {
         Assert.assertNull(req.getNodeLabelExpression());
         Assert.assertNull(req.getNodeLabelExpression());
       }
       }
@@ -941,7 +947,7 @@ public class TestAMRMClient {
         new String[] { "node1", "node2" }, Priority.UNDEFINED, true, "y"));
         new String[] { "node1", "node2" }, Priority.UNDEFINED, true, "y"));
     for (ResourceRequest req : client.ask) {
     for (ResourceRequest req : client.ask) {
       if (ResourceRequest.ANY.equals(req.getResourceName())) {
       if (ResourceRequest.ANY.equals(req.getResourceName())) {
-        Assert.assertEquals("y", req.getNodeLabelExpression());
+        assertEquals("y", req.getNodeLabelExpression());
       } else {
       } else {
         Assert.assertNull(req.getNodeLabelExpression());
         Assert.assertNull(req.getNodeLabelExpression());
       }
       }
@@ -955,7 +961,7 @@ public class TestAMRMClient {
     } catch (InvalidContainerRequestException e) {
     } catch (InvalidContainerRequestException e) {
       return;
       return;
     }
     }
-    Assert.fail();
+    fail();
   }
   }
   
   
   @Test(timeout=30000)
   @Test(timeout=30000)
@@ -1026,7 +1032,8 @@ public class TestAMRMClient {
     // get allocations
     // get allocations
     AllocateResponse allocResponse = amClient.allocate(0.1f);
     AllocateResponse allocResponse = amClient.allocate(0.1f);
     List<Container> containers = allocResponse.getAllocatedContainers();
     List<Container> containers = allocResponse.getAllocatedContainers();
-    Assert.assertEquals(num, containers.size());
+    assertEquals(num, containers.size());
+
     // build container launch context
     // build container launch context
     Credentials ts = new Credentials();
     Credentials ts = new Credentials();
     DataOutputBuffer dob = new DataOutputBuffer();
     DataOutputBuffer dob = new DataOutputBuffer();
@@ -1067,14 +1074,14 @@ public class TestAMRMClient {
   private void doContainerResourceChange(
   private void doContainerResourceChange(
       final AMRMClient<ContainerRequest> amClient, List<Container> containers)
       final AMRMClient<ContainerRequest> amClient, List<Container> containers)
       throws YarnException, IOException {
       throws YarnException, IOException {
-    Assert.assertEquals(3, containers.size());
+    assertEquals(3, containers.size());
     // remember the container IDs
     // remember the container IDs
     Container container1 = containers.get(0);
     Container container1 = containers.get(0);
     Container container2 = containers.get(1);
     Container container2 = containers.get(1);
     Container container3 = containers.get(2);
     Container container3 = containers.get(2);
     AMRMClientImpl<ContainerRequest> amClientImpl =
     AMRMClientImpl<ContainerRequest> amClientImpl =
         (AMRMClientImpl<ContainerRequest>) amClient;
         (AMRMClientImpl<ContainerRequest>) amClient;
-    Assert.assertEquals(0, amClientImpl.change.size());
+    assertEquals(0, amClientImpl.change.size());
     // verify newer request overwrites older request for the container1
     // verify newer request overwrites older request for the container1
     amClientImpl.requestContainerUpdate(container1,
     amClientImpl.requestContainerUpdate(container1,
         UpdateContainerRequest.newInstance(container1.getVersion(),
         UpdateContainerRequest.newInstance(container1.getVersion(),
@@ -1084,21 +1091,21 @@ public class TestAMRMClient {
         UpdateContainerRequest.newInstance(container1.getVersion(),
         UpdateContainerRequest.newInstance(container1.getVersion(),
             container1.getId(), ContainerUpdateType.INCREASE_RESOURCE,
             container1.getId(), ContainerUpdateType.INCREASE_RESOURCE,
             Resource.newInstance(4096, 1), null));
             Resource.newInstance(4096, 1), null));
-    Assert.assertEquals(Resource.newInstance(4096, 1),
+    assertEquals(Resource.newInstance(4096, 1),
         amClientImpl.change.get(container1.getId()).getValue().getCapability());
         amClientImpl.change.get(container1.getId()).getValue().getCapability());
     // verify new decrease request cancels old increase request for container1
     // verify new decrease request cancels old increase request for container1
     amClientImpl.requestContainerUpdate(container1,
     amClientImpl.requestContainerUpdate(container1,
         UpdateContainerRequest.newInstance(container1.getVersion(),
         UpdateContainerRequest.newInstance(container1.getVersion(),
             container1.getId(), ContainerUpdateType.DECREASE_RESOURCE,
             container1.getId(), ContainerUpdateType.DECREASE_RESOURCE,
             Resource.newInstance(512, 1), null));
             Resource.newInstance(512, 1), null));
-    Assert.assertEquals(Resource.newInstance(512, 1),
+    assertEquals(Resource.newInstance(512, 1),
         amClientImpl.change.get(container1.getId()).getValue().getCapability());
         amClientImpl.change.get(container1.getId()).getValue().getCapability());
     // request resource increase for container2
     // request resource increase for container2
     amClientImpl.requestContainerUpdate(container2,
     amClientImpl.requestContainerUpdate(container2,
         UpdateContainerRequest.newInstance(container2.getVersion(),
         UpdateContainerRequest.newInstance(container2.getVersion(),
             container2.getId(), ContainerUpdateType.INCREASE_RESOURCE,
             container2.getId(), ContainerUpdateType.INCREASE_RESOURCE,
             Resource.newInstance(2048, 1), null));
             Resource.newInstance(2048, 1), null));
-    Assert.assertEquals(Resource.newInstance(2048, 1),
+    assertEquals(Resource.newInstance(2048, 1),
         amClientImpl.change.get(container2.getId()).getValue().getCapability());
         amClientImpl.change.get(container2.getId()).getValue().getCapability());
     // verify release request will cancel pending change requests for the same
     // verify release request will cancel pending change requests for the same
     // container
     // container
@@ -1106,27 +1113,357 @@ public class TestAMRMClient {
         UpdateContainerRequest.newInstance(container3.getVersion(),
         UpdateContainerRequest.newInstance(container3.getVersion(),
             container3.getId(), ContainerUpdateType.INCREASE_RESOURCE,
             container3.getId(), ContainerUpdateType.INCREASE_RESOURCE,
             Resource.newInstance(2048, 1), null));
             Resource.newInstance(2048, 1), null));
-    Assert.assertEquals(3, amClientImpl.pendingChange.size());
+    assertEquals(3, amClientImpl.pendingChange.size());
     amClientImpl.releaseAssignedContainer(container3.getId());
     amClientImpl.releaseAssignedContainer(container3.getId());
-    Assert.assertEquals(2, amClientImpl.pendingChange.size());
+    assertEquals(2, amClientImpl.pendingChange.size());
     // as of now: container1 asks to decrease to (512, 1)
     // as of now: container1 asks to decrease to (512, 1)
     //            container2 asks to increase to (2048, 1)
     //            container2 asks to increase to (2048, 1)
     // send allocation requests
     // send allocation requests
     AllocateResponse allocResponse = amClient.allocate(0.1f);
     AllocateResponse allocResponse = amClient.allocate(0.1f);
-    Assert.assertEquals(0, amClientImpl.change.size());
+    assertEquals(0, amClientImpl.change.size());
     // we should get decrease confirmation right away
     // we should get decrease confirmation right away
     List<UpdatedContainer> updatedContainers =
     List<UpdatedContainer> updatedContainers =
         allocResponse.getUpdatedContainers();
         allocResponse.getUpdatedContainers();
-    Assert.assertEquals(1, updatedContainers.size());
+    assertEquals(1, updatedContainers.size());
     // we should get increase allocation after the next NM's heartbeat to RM
     // we should get increase allocation after the next NM's heartbeat to RM
     triggerSchedulingWithNMHeartBeat();
     triggerSchedulingWithNMHeartBeat();
     // get allocations
     // get allocations
     allocResponse = amClient.allocate(0.1f);
     allocResponse = amClient.allocate(0.1f);
     updatedContainers =
     updatedContainers =
         allocResponse.getUpdatedContainers();
         allocResponse.getUpdatedContainers();
-    Assert.assertEquals(1, updatedContainers.size());
+    assertEquals(1, updatedContainers.size());
   }
   }
 
 
+  @Test(timeout=60000)
+  public void testAMRMClientWithContainerPromotion()
+      throws YarnException, IOException {
+    AMRMClientImpl<AMRMClient.ContainerRequest> amClient =
+        (AMRMClientImpl<AMRMClient.ContainerRequest>) AMRMClient
+            .createAMRMClient();
+    //asserting we are not using the singleton instance cache
+    Assert.assertSame(NMTokenCache.getSingleton(),
+        amClient.getNMTokenCache());
+    amClient.init(conf);
+    amClient.start();
+
+    // start am nm client
+    NMClientImpl nmClient = (NMClientImpl) NMClient.createNMClient();
+    Assert.assertNotNull(nmClient);
+    // asserting we are using the singleton instance cache
+    Assert.assertSame(
+        NMTokenCache.getSingleton(), nmClient.getNMTokenCache());
+    nmClient.init(conf);
+    nmClient.start();
+    assertEquals(STATE.STARTED, nmClient.getServiceState());
+
+    amClient.registerApplicationMaster("Host", 10000, "");
+    // setup container request
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    // START OPPORTUNISTIC Container, Send allocation request to RM
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
+            true, null, ExecutionTypeRequest
+            .newInstance(ExecutionType.OPPORTUNISTIC, true)));
+
+    int oppContainersRequestedAny =
+        amClient.getTable(0).get(priority2, ResourceRequest.ANY,
+            ExecutionType.OPPORTUNISTIC, capability).remoteRequest
+            .getNumContainers();
+
+    assertEquals(1, oppContainersRequestedAny);
+    assertEquals(1, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    // RM should allocate container within 2 calls to allocate()
+    int allocatedContainerCount = 0;
+    Map<ContainerId, Container> allocatedOpportContainers = new HashMap<>();
+    int iterationsLeft = 50;
+
+    amClient.getNMTokenCache().clearCache();
+    assertEquals(0,
+        amClient.getNMTokenCache().numberOfTokensInCache());
+
+    AllocateResponse allocResponse = null;
+    while (allocatedContainerCount < oppContainersRequestedAny
+        && iterationsLeft-- > 0) {
+      allocResponse = amClient.allocate(0.1f);
+      // let NM heartbeat to RM and trigger allocations
+      //triggerSchedulingWithNMHeartBeat();
+      assertEquals(0, amClient.ask.size());
+      assertEquals(0, amClient.release.size());
+
+      allocatedContainerCount +=
+          allocResponse.getAllocatedContainers().size();
+      for (Container container : allocResponse.getAllocatedContainers()) {
+        if (container.getExecutionType() == ExecutionType.OPPORTUNISTIC) {
+          allocatedOpportContainers.put(container.getId(), container);
+        }
+      }
+      if (allocatedContainerCount < oppContainersRequestedAny) {
+        // sleep to let NM's heartbeat to RM and trigger allocations
+        sleep(100);
+      }
+    }
+
+    assertEquals(oppContainersRequestedAny, allocatedContainerCount);
+    assertEquals(oppContainersRequestedAny, allocatedOpportContainers.size());
+
+    startContainer(allocResponse, nmClient);
+
+    // SEND PROMOTION REQUEST TO RM
+    try {
+      Container c = allocatedOpportContainers.values().iterator().next();
+      amClient.requestContainerUpdate(
+          c, UpdateContainerRequest.newInstance(c.getVersion(),
+              c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
+              null, ExecutionType.OPPORTUNISTIC));
+      fail("Should throw Exception..");
+    } catch (IllegalArgumentException e) {
+      System.out.println("## " + e.getMessage());
+      assertTrue(e.getMessage().contains(
+          "target should be GUARANTEED and original should be OPPORTUNISTIC"));
+    }
+
+    Container c = allocatedOpportContainers.values().iterator().next();
+    amClient.requestContainerUpdate(
+        c, UpdateContainerRequest.newInstance(c.getVersion(),
+            c.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE,
+            null, ExecutionType.GUARANTEED));
+    iterationsLeft = 120;
+    Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>();
+    // do a few iterations to ensure RM is not going to send new containers
+    while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
+      // inform RM of rejection
+      allocResponse = amClient.allocate(0.1f);
+      // RM did not send new containers because AM does not need any
+      if (allocResponse.getUpdatedContainers() != null) {
+        for (UpdatedContainer updatedContainer : allocResponse
+            .getUpdatedContainers()) {
+          System.out.println("Got update..");
+          updatedContainers.put(updatedContainer.getContainer().getId(),
+              updatedContainer);
+        }
+      }
+      if (iterationsLeft > 0) {
+        // sleep to make sure NM's heartbeat
+        sleep(100);
+      }
+    }
+    assertEquals(1, updatedContainers.size());
+
+    for (ContainerId cId : allocatedOpportContainers.keySet()) {
+      Container orig = allocatedOpportContainers.get(cId);
+      UpdatedContainer updatedContainer = updatedContainers.get(cId);
+      assertNotNull(updatedContainer);
+      assertEquals(ExecutionType.GUARANTEED,
+          updatedContainer.getContainer().getExecutionType());
+      assertEquals(orig.getResource(),
+          updatedContainer.getContainer().getResource());
+      assertEquals(orig.getNodeId(),
+          updatedContainer.getContainer().getNodeId());
+      assertEquals(orig.getVersion() + 1,
+          updatedContainer.getContainer().getVersion());
+    }
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    // SEND UPDATE EXECTYPE UPDATE TO NM
+    updateContainerExecType(allocResponse, ExecutionType.GUARANTEED, nmClient);
+
+    amClient.ask.clear();
+  }
+
+  @Test(timeout=60000)
+  public void testAMRMClientWithContainerDemotion()
+      throws YarnException, IOException {
+    AMRMClientImpl<AMRMClient.ContainerRequest> amClient =
+        (AMRMClientImpl<AMRMClient.ContainerRequest>) AMRMClient
+            .createAMRMClient();
+    //asserting we are not using the singleton instance cache
+    Assert.assertSame(NMTokenCache.getSingleton(),
+        amClient.getNMTokenCache());
+    amClient.init(conf);
+    amClient.start();
+
+    NMClientImpl nmClient = (NMClientImpl) NMClient.createNMClient();
+    Assert.assertNotNull(nmClient);
+    // asserting we are using the singleton instance cache
+    Assert.assertSame(
+        NMTokenCache.getSingleton(), nmClient.getNMTokenCache());
+    nmClient.init(conf);
+    nmClient.start();
+    assertEquals(STATE.STARTED, nmClient.getServiceState());
+
+    amClient.registerApplicationMaster("Host", 10000, "");
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    // START OPPORTUNISTIC Container, Send allocation request to RM
+    amClient.addContainerRequest(
+        new AMRMClient.ContainerRequest(capability, null, null, priority2, 0,
+            true, null, ExecutionTypeRequest
+            .newInstance(ExecutionType.GUARANTEED, true)));
+
+    int oppContainersRequestedAny =
+        amClient.getTable(0).get(priority2, ResourceRequest.ANY,
+            ExecutionType.GUARANTEED, capability).remoteRequest
+            .getNumContainers();
+
+    assertEquals(1, oppContainersRequestedAny);
+    assertEquals(1, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    // RM should allocate container within 2 calls to allocate()
+    int allocatedContainerCount = 0;
+    Map<ContainerId, Container> allocatedGuaranteedContainers = new HashMap<>();
+    int iterationsLeft = 50;
+
+    amClient.getNMTokenCache().clearCache();
+    assertEquals(0,
+        amClient.getNMTokenCache().numberOfTokensInCache());
+
+    AllocateResponse allocResponse = null;
+    while (allocatedContainerCount < oppContainersRequestedAny
+        && iterationsLeft-- > 0) {
+      allocResponse = amClient.allocate(0.1f);
+      // let NM heartbeat to RM and trigger allocations
+      //triggerSchedulingWithNMHeartBeat();
+      assertEquals(0, amClient.ask.size());
+      assertEquals(0, amClient.release.size());
+
+      allocatedContainerCount +=
+          allocResponse.getAllocatedContainers().size();
+      for (Container container : allocResponse.getAllocatedContainers()) {
+        if (container.getExecutionType() == ExecutionType.GUARANTEED) {
+          allocatedGuaranteedContainers.put(container.getId(), container);
+        }
+      }
+      if (allocatedContainerCount < oppContainersRequestedAny) {
+        // sleep to let NM's heartbeat to RM and trigger allocations
+        sleep(100);
+      }
+    }
+    assertEquals(oppContainersRequestedAny, allocatedContainerCount);
+    assertEquals(oppContainersRequestedAny,
+        allocatedGuaranteedContainers.size());
+    startContainer(allocResponse, nmClient);
+
+    // SEND DEMOTION REQUEST TO RM
+    try {
+      Container c = allocatedGuaranteedContainers.values().iterator().next();
+      amClient.requestContainerUpdate(
+          c, UpdateContainerRequest.newInstance(c.getVersion(),
+              c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
+              null, ExecutionType.GUARANTEED));
+      fail("Should throw Exception..");
+    } catch (IllegalArgumentException e) {
+      System.out.println("## " + e.getMessage());
+      assertTrue(e.getMessage().contains(
+          "target should be OPPORTUNISTIC and original should be GUARANTEED"));
+    }
+
+    Container c = allocatedGuaranteedContainers.values().iterator().next();
+    amClient.requestContainerUpdate(
+        c, UpdateContainerRequest.newInstance(c.getVersion(),
+            c.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE,
+            null, ExecutionType.OPPORTUNISTIC));
+    iterationsLeft = 120;
+    Map<ContainerId, UpdatedContainer> updatedContainers = new HashMap<>();
+    // do a few iterations to ensure RM is not going to send new containers
+    while (iterationsLeft-- > 0 && updatedContainers.isEmpty()) {
+      // inform RM of rejection
+      allocResponse = amClient.allocate(0.1f);
+      // RM did not send new containers because AM does not need any
+      if (allocResponse.getUpdatedContainers() != null) {
+        for (UpdatedContainer updatedContainer : allocResponse
+            .getUpdatedContainers()) {
+          System.out.println("Got update..");
+          updatedContainers.put(updatedContainer.getContainer().getId(),
+              updatedContainer);
+        }
+      }
+      if (iterationsLeft > 0) {
+        // sleep to make sure NM's heartbeat
+        sleep(100);
+      }
+    }
+    assertEquals(1, updatedContainers.size());
+
+    for (ContainerId cId : allocatedGuaranteedContainers.keySet()) {
+      Container orig = allocatedGuaranteedContainers.get(cId);
+      UpdatedContainer updatedContainer = updatedContainers.get(cId);
+      assertNotNull(updatedContainer);
+      assertEquals(ExecutionType.OPPORTUNISTIC,
+          updatedContainer.getContainer().getExecutionType());
+      assertEquals(orig.getResource(),
+          updatedContainer.getContainer().getResource());
+      assertEquals(orig.getNodeId(),
+          updatedContainer.getContainer().getNodeId());
+      assertEquals(orig.getVersion() + 1,
+          updatedContainer.getContainer().getVersion());
+    }
+    assertEquals(0, amClient.ask.size());
+    assertEquals(0, amClient.release.size());
+
+    updateContainerExecType(allocResponse, ExecutionType.OPPORTUNISTIC,
+        nmClient);
+    amClient.ask.clear();
+  }
+
+  private void updateContainerExecType(AllocateResponse allocResponse,
+      ExecutionType expectedExecType, NMClientImpl nmClient)
+      throws IOException, YarnException {
+    for (UpdatedContainer updatedContainer : allocResponse
+        .getUpdatedContainers()) {
+      Container container = updatedContainer.getContainer();
+      nmClient.increaseContainerResource(container);
+      // NodeManager may still need some time to get the stable
+      // container status
+      while (true) {
+        ContainerStatus status = nmClient
+            .getContainerStatus(container.getId(), container.getNodeId());
+        if (status.getExecutionType() == expectedExecType) {
+          break;
+        }
+        sleep(10);
+      }
+    }
+  }
+
+  private void startContainer(AllocateResponse allocResponse,
+      NMClientImpl nmClient) throws IOException, YarnException {
+    // START THE CONTAINER IN NM
+    // build container launch context
+    Credentials ts = new Credentials();
+    DataOutputBuffer dob = new DataOutputBuffer();
+    ts.writeTokenStorageToStream(dob);
+    ByteBuffer securityTokens =
+        ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+    // start a process long enough for increase/decrease action to take effect
+    ContainerLaunchContext clc = BuilderUtils.newContainerLaunchContext(
+        Collections.<String, LocalResource>emptyMap(),
+        new HashMap<String, String>(), Arrays.asList("sleep", "100"),
+        new HashMap<String, ByteBuffer>(), securityTokens,
+        new HashMap<ApplicationAccessType, String>());
+    // start the containers and make sure they are in RUNNING state
+    for (Container container : allocResponse.getAllocatedContainers()) {
+      nmClient.startContainer(container, clc);
+      // NodeManager may still need some time to get the stable
+      // container status
+      while (true) {
+        ContainerStatus status = nmClient
+            .getContainerStatus(container.getId(), container.getNodeId());
+        if (status.getState() == ContainerState.RUNNING) {
+          break;
+        }
+        sleep(10);
+      }
+    }
+  }
+
+
   private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
   private void testAllocation(final AMRMClientImpl<ContainerRequest> amClient)
       throws YarnException, IOException {
       throws YarnException, IOException {
     // setup container request
     // setup container request
@@ -1156,7 +1493,7 @@ public class TestAMRMClient {
     Set<ContainerId> releases = new TreeSet<ContainerId>();
     Set<ContainerId> releases = new TreeSet<ContainerId>();
     
     
     amClient.getNMTokenCache().clearCache();
     amClient.getNMTokenCache().clearCache();
-    Assert.assertEquals(0, amClient.getNMTokenCache().numberOfTokensInCache());
+    assertEquals(0, amClient.getNMTokenCache().numberOfTokensInCache());
     HashMap<String, Token> receivedNMTokens = new HashMap<String, Token>();
     HashMap<String, Token> receivedNMTokens = new HashMap<String, Token>();
     
     
     while (allocatedContainerCount < containersRequestedAny
     while (allocatedContainerCount < containersRequestedAny
@@ -1176,7 +1513,7 @@ public class TestAMRMClient {
       for (NMToken token : allocResponse.getNMTokens()) {
       for (NMToken token : allocResponse.getNMTokens()) {
         String nodeID = token.getNodeId().toString();
         String nodeID = token.getNodeId().toString();
         if (receivedNMTokens.containsKey(nodeID)) {
         if (receivedNMTokens.containsKey(nodeID)) {
-          Assert.fail("Received token again for : " + nodeID);          
+          fail("Received token again for : " + nodeID);
         }
         }
         receivedNMTokens.put(nodeID, token.getToken());
         receivedNMTokens.put(nodeID, token.getToken());
       }
       }
@@ -1188,7 +1525,7 @@ public class TestAMRMClient {
     }
     }
     
     
     // Should receive atleast 1 token
     // Should receive atleast 1 token
-    Assert.assertTrue(receivedNMTokens.size() > 0
+    assertTrue(receivedNMTokens.size() > 0
         && receivedNMTokens.size() <= nodeCount);
         && receivedNMTokens.size() <= nodeCount);
     
     
     assertEquals(allocatedContainerCount, containersRequestedAny);
     assertEquals(allocatedContainerCount, containersRequestedAny);
@@ -1428,7 +1765,7 @@ public class TestAMRMClient {
       org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken_1 =
       org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken_1 =
           getAMRMToken();
           getAMRMToken();
       Assert.assertNotNull(amrmToken_1);
       Assert.assertNotNull(amrmToken_1);
-      Assert.assertEquals(amrmToken_1.decodeIdentifier().getKeyId(),
+      assertEquals(amrmToken_1.decodeIdentifier().getKeyId(),
         amrmTokenSecretManager.getMasterKey().getMasterKey().getKeyId());
         amrmTokenSecretManager.getMasterKey().getMasterKey().getKeyId());
 
 
       // Wait for enough time and make sure the roll_over happens
       // Wait for enough time and make sure the roll_over happens
@@ -1443,7 +1780,7 @@ public class TestAMRMClient {
       org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken_2 =
       org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken_2 =
           getAMRMToken();
           getAMRMToken();
       Assert.assertNotNull(amrmToken_2);
       Assert.assertNotNull(amrmToken_2);
-      Assert.assertEquals(amrmToken_2.decodeIdentifier().getKeyId(),
+      assertEquals(amrmToken_2.decodeIdentifier().getKeyId(),
         amrmTokenSecretManager.getMasterKey().getMasterKey().getKeyId());
         amrmTokenSecretManager.getMasterKey().getMasterKey().getKeyId());
 
 
       Assert.assertNotEquals(amrmToken_1, amrmToken_2);
       Assert.assertNotEquals(amrmToken_1, amrmToken_2);
@@ -1458,7 +1795,7 @@ public class TestAMRMClient {
       AMRMTokenIdentifierForTest newVersionTokenIdentifier = 
       AMRMTokenIdentifierForTest newVersionTokenIdentifier = 
           new AMRMTokenIdentifierForTest(amrmToken_2.decodeIdentifier(), "message");
           new AMRMTokenIdentifierForTest(amrmToken_2.decodeIdentifier(), "message");
       
       
-      Assert.assertEquals("Message is changed after set to newVersionTokenIdentifier",
+      assertEquals("Message is changed after set to newVersionTokenIdentifier",
           "message", newVersionTokenIdentifier.getMessage());
           "message", newVersionTokenIdentifier.getMessage());
       org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newVersionToken = 
       org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> newVersionToken = 
           new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> (
           new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> (
@@ -1514,10 +1851,10 @@ public class TestAMRMClient {
                 .getBindAddress(), conf);
                 .getBindAddress(), conf);
           }
           }
         }).allocate(Records.newRecord(AllocateRequest.class));
         }).allocate(Records.newRecord(AllocateRequest.class));
-        Assert.fail("The old Token should not work");
+        fail("The old Token should not work");
       } catch (Exception ex) {
       } catch (Exception ex) {
-        Assert.assertTrue(ex instanceof InvalidToken);
-        Assert.assertTrue(ex.getMessage().contains(
+        assertTrue(ex instanceof InvalidToken);
+        assertTrue(ex.getMessage().contains(
           "Invalid AMRMToken from "
           "Invalid AMRMToken from "
               + amrmToken_2.decodeIdentifier().getApplicationAttemptId()));
               + amrmToken_2.decodeIdentifier().getApplicationAttemptId()));
       }
       }
@@ -1544,7 +1881,7 @@ public class TestAMRMClient {
       org.apache.hadoop.security.token.Token<?> token = iter.next();
       org.apache.hadoop.security.token.Token<?> token = iter.next();
       if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
       if (token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
         if (result != null) {
         if (result != null) {
-          Assert.fail("credentials has more than one AMRM token."
+          fail("credentials has more than one AMRM token."
               + " token1: " + result + " token2: " + token);
               + " token1: " + result + " token2: " + token);
         }
         }
         result = (org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>)
         result = (org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>)

+ 3 - 4
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java

@@ -301,7 +301,6 @@ public class TestNMClient {
         assertTrue("The thrown exception is not expected",
         assertTrue("The thrown exception is not expected",
             e.getMessage().contains("is not handled by this NodeManager"));
             e.getMessage().contains("is not handled by this NodeManager"));
       }
       }
-
       // increaseContainerResource shouldn't be called before startContainer,
       // increaseContainerResource shouldn't be called before startContainer,
       // otherwise, NodeManager cannot find the container
       // otherwise, NodeManager cannot find the container
       try {
       try {
@@ -475,10 +474,10 @@ public class TestNMClient {
     try {
     try {
       nmClient.increaseContainerResource(container);
       nmClient.increaseContainerResource(container);
     } catch (YarnException e) {
     } catch (YarnException e) {
-      // NM container will only be in SCHEDULED state, so expect the increase
-      // action to fail.
+      // NM container increase container resource should fail without a version
+      // increase action to fail.
       if (!e.getMessage().contains(
       if (!e.getMessage().contains(
-          "can only be changed when a container is in RUNNING state")) {
+          container.getId() + " has update version ")) {
         throw (AssertionError)
         throw (AssertionError)
             (new AssertionError("Exception is not expected: " + e)
             (new AssertionError("Exception is not expected: " + e)
                 .initCause(e));
                 .initCause(e));

+ 82 - 50
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java

@@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
@@ -134,13 +135,14 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ChangeMonitoringContainerResourceEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitorImpl;
 
 
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerScheduler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.ContainerSchedulerEventType;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler.UpdateContainerSchedulerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredApplicationsState;
@@ -382,8 +384,24 @@ public class ContainerManagerImpl extends CompositeService implements
       throws IOException {
       throws IOException {
     StartContainerRequest req = rcs.getStartRequest();
     StartContainerRequest req = rcs.getStartRequest();
     ContainerLaunchContext launchContext = req.getContainerLaunchContext();
     ContainerLaunchContext launchContext = req.getContainerLaunchContext();
-    ContainerTokenIdentifier token =
-        BuilderUtils.newContainerTokenIdentifier(req.getContainerToken());
+    ContainerTokenIdentifier token = null;
+    if(rcs.getCapability() != null) {
+      ContainerTokenIdentifier originalToken =
+          BuilderUtils.newContainerTokenIdentifier(req.getContainerToken());
+      token = new ContainerTokenIdentifier(originalToken.getContainerID(),
+          originalToken.getVersion(), originalToken.getNmHostAddress(),
+          originalToken.getApplicationSubmitter(), rcs.getCapability(),
+          originalToken.getExpiryTimeStamp(), originalToken.getMasterKeyId(),
+          originalToken.getRMIdentifier(), originalToken.getPriority(),
+          originalToken.getCreationTime(),
+          originalToken.getLogAggregationContext(),
+          originalToken.getNodeLabelExpression(),
+          originalToken.getContainerType(), originalToken.getExecutionType());
+
+    } else {
+      token = BuilderUtils.newContainerTokenIdentifier(req.getContainerToken());
+    }
+
     ContainerId containerId = token.getContainerID();
     ContainerId containerId = token.getContainerID();
     ApplicationId appId =
     ApplicationId appId =
         containerId.getApplicationAttemptId().getApplicationId();
         containerId.getApplicationAttemptId().getApplicationId();
@@ -1122,9 +1140,7 @@ public class ContainerManagerImpl extends CompositeService implements
           // as container resource increase request will have come with
           // as container resource increase request will have come with
           // an updated NMToken.
           // an updated NMToken.
           updateNMTokenIdentifier(nmTokenIdentifier);
           updateNMTokenIdentifier(nmTokenIdentifier);
-          Resource resource = containerTokenIdentifier.getResource();
-          changeContainerResourceInternal(containerId,
-              containerTokenIdentifier.getVersion(), resource, true);
+          updateContainerInternal(containerId, containerTokenIdentifier);
           successfullyUpdatedContainers.add(containerId);
           successfullyUpdatedContainers.add(containerId);
         } catch (YarnException | InvalidToken e) {
         } catch (YarnException | InvalidToken e) {
           failedContainers.put(containerId, SerializedException.newInstance(e));
           failedContainers.put(containerId, SerializedException.newInstance(e));
@@ -1138,9 +1154,9 @@ public class ContainerManagerImpl extends CompositeService implements
   }
   }
 
 
   @SuppressWarnings("unchecked")
   @SuppressWarnings("unchecked")
-  private void changeContainerResourceInternal(ContainerId containerId,
-      int containerVersion, Resource targetResource, boolean increase)
-          throws YarnException, IOException {
+  private void updateContainerInternal(ContainerId containerId,
+      ContainerTokenIdentifier containerTokenIdentifier)
+      throws YarnException, IOException {
     Container container = context.getContainers().get(containerId);
     Container container = context.getContainers().get(containerId);
     // Check container existence
     // Check container existence
     if (container == null) {
     if (container == null) {
@@ -1152,64 +1168,77 @@ public class ContainerManagerImpl extends CompositeService implements
             + " is not handled by this NodeManager");
             + " is not handled by this NodeManager");
       }
       }
     }
     }
+    // Check container version.
+    int currentVersion = container.getContainerTokenIdentifier().getVersion();
+    if (containerTokenIdentifier.getVersion() <= currentVersion) {
+      throw RPCUtil.getRemoteException("Container " + containerId.toString()
+          + " has update version [" + currentVersion + "] >= requested version"
+          + " [" + containerTokenIdentifier.getVersion() + "]");
+    }
+
     // Check container state
     // Check container state
     org.apache.hadoop.yarn.server.nodemanager.
     org.apache.hadoop.yarn.server.nodemanager.
         containermanager.container.ContainerState currentState =
         containermanager.container.ContainerState currentState =
         container.getContainerState();
         container.getContainerState();
     if (currentState != org.apache.hadoop.yarn.server.
     if (currentState != org.apache.hadoop.yarn.server.
-        nodemanager.containermanager.container.ContainerState.RUNNING) {
+            nodemanager.containermanager.container.ContainerState.RUNNING &&
+        currentState != org.apache.hadoop.yarn.server.
+            nodemanager.containermanager.container.ContainerState.SCHEDULED) {
       throw RPCUtil.getRemoteException("Container " + containerId.toString()
       throw RPCUtil.getRemoteException("Container " + containerId.toString()
           + " is in " + currentState.name() + " state."
           + " is in " + currentState.name() + " state."
           + " Resource can only be changed when a container is in"
           + " Resource can only be changed when a container is in"
-          + " RUNNING state");
+          + " RUNNING or SCHEDULED state");
     }
     }
+
     // Check validity of the target resource.
     // Check validity of the target resource.
     Resource currentResource = container.getResource();
     Resource currentResource = container.getResource();
-    if (currentResource.equals(targetResource)) {
-      LOG.warn("Unable to change resource for container "
-          + containerId.toString()
-          + ". The target resource "
-          + targetResource.toString()
-          + " is the same as the current resource");
-      return;
-    }
-    if (increase && !Resources.fitsIn(currentResource, targetResource)) {
-      throw RPCUtil.getRemoteException("Unable to increase resource for "
-          + "container " + containerId.toString()
-          + ". The target resource "
-          + targetResource.toString()
-          + " is smaller than the current resource "
-          + currentResource.toString());
-    }
-    if (!increase &&
-        (!Resources.fitsIn(Resources.none(), targetResource)
-            || !Resources.fitsIn(targetResource, currentResource))) {
-      throw RPCUtil.getRemoteException("Unable to decrease resource for "
-          + "container " + containerId.toString()
-          + ". The target resource "
-          + targetResource.toString()
-          + " is not smaller than the current resource "
-          + currentResource.toString());
-    }
-    if (increase) {
-      org.apache.hadoop.yarn.api.records.Container increasedContainer =
-          org.apache.hadoop.yarn.api.records.Container.newInstance(
-              containerId, null, null, targetResource, null, null);
+    ExecutionType currentExecType =
+        container.getContainerTokenIdentifier().getExecutionType();
+    boolean isResourceChange = false;
+    boolean isExecTypeUpdate = false;
+    Resource targetResource = containerTokenIdentifier.getResource();
+    ExecutionType targetExecType = containerTokenIdentifier.getExecutionType();
+
+    // Is true if either the resources has increased or execution type
+    // updated from opportunistic to guaranteed
+    boolean isIncrease = false;
+    if (!currentResource.equals(targetResource)) {
+      isResourceChange = true;
+      isIncrease = Resources.fitsIn(currentResource, targetResource)
+          && !Resources.fitsIn(targetResource, currentResource);
+    } else if (!currentExecType.equals(targetExecType)) {
+      isExecTypeUpdate = true;
+      isIncrease = currentExecType == ExecutionType.OPPORTUNISTIC &&
+          targetExecType == ExecutionType.GUARANTEED;
+    }
+    if (isIncrease) {
+      org.apache.hadoop.yarn.api.records.Container increasedContainer = null;
+      if (isResourceChange) {
+        increasedContainer =
+            org.apache.hadoop.yarn.api.records.Container.newInstance(
+                containerId, null, null, targetResource, null, null,
+                currentExecType);
+      } else {
+        increasedContainer =
+            org.apache.hadoop.yarn.api.records.Container.newInstance(
+                containerId, null, null, currentResource, null, null,
+                targetExecType);
+      }
       if (context.getIncreasedContainers().putIfAbsent(containerId,
       if (context.getIncreasedContainers().putIfAbsent(containerId,
           increasedContainer) != null){
           increasedContainer) != null){
         throw RPCUtil.getRemoteException("Container " + containerId.toString()
         throw RPCUtil.getRemoteException("Container " + containerId.toString()
-            + " resource is being increased.");
+            + " resource is being increased -or- " +
+            "is undergoing ExecutionType promoted.");
       }
       }
     }
     }
     this.readLock.lock();
     this.readLock.lock();
     try {
     try {
       if (!serviceStopped) {
       if (!serviceStopped) {
-        // Persist container resource change for recovery
-        this.context.getNMStateStore().storeContainerResourceChanged(
-            containerId, containerVersion, targetResource);
-        getContainersMonitor().handle(
-            new ChangeMonitoringContainerResourceEvent(
-                containerId, targetResource));
+        // Dispatch message to ContainerScheduler to actually
+        // make the change.
+        dispatcher.getEventHandler().handle(new UpdateContainerSchedulerEvent(
+            container, containerTokenIdentifier, isResourceChange,
+            isExecTypeUpdate, isIncrease));
       } else {
       } else {
         throw new YarnException(
         throw new YarnException(
             "Unable to change container resource as the NodeManager is "
             "Unable to change container resource as the NodeManager is "
@@ -1484,8 +1513,11 @@ public class ContainerManagerImpl extends CompositeService implements
       for (org.apache.hadoop.yarn.api.records.Container container
       for (org.apache.hadoop.yarn.api.records.Container container
           : containersDecreasedEvent.getContainersToDecrease()) {
           : containersDecreasedEvent.getContainersToDecrease()) {
         try {
         try {
-          changeContainerResourceInternal(container.getId(),
-              container.getVersion(), container.getResource(), false);
+          ContainerTokenIdentifier containerTokenIdentifier =
+              BuilderUtils.newContainerTokenIdentifier(
+                  container.getContainerToken());
+          updateContainerInternal(container.getId(),
+              containerTokenIdentifier);
         } catch (YarnException e) {
         } catch (YarnException e) {
           LOG.error("Unable to decrease container resource", e);
           LOG.error("Unable to decrease container resource", e);
         } catch (IOException e) {
         } catch (IOException e) {

+ 2 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/Container.java

@@ -38,10 +38,10 @@ public interface Container extends EventHandler<ContainerEvent> {
 
 
   Resource getResource();
   Resource getResource();
 
 
-  void setResource(Resource targetResource);
-
   ContainerTokenIdentifier getContainerTokenIdentifier();
   ContainerTokenIdentifier getContainerTokenIdentifier();
 
 
+  void setContainerTokenIdentifier(ContainerTokenIdentifier token);
+
   String getUser();
   String getUser();
   
   
   ContainerState getContainerState();
   ContainerState getContainerState();

+ 17 - 20
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java

@@ -147,9 +147,8 @@ public class ContainerImpl implements Container {
   private final Credentials credentials;
   private final Credentials credentials;
   private final NodeManagerMetrics metrics;
   private final NodeManagerMetrics metrics;
   private volatile ContainerLaunchContext launchContext;
   private volatile ContainerLaunchContext launchContext;
-  private final ContainerTokenIdentifier containerTokenIdentifier;
+  private volatile ContainerTokenIdentifier containerTokenIdentifier;
   private final ContainerId containerId;
   private final ContainerId containerId;
-  private volatile Resource resource;
   private final String user;
   private final String user;
   private int version;
   private int version;
   private int exitCode = ContainerExitStatus.INVALID;
   private int exitCode = ContainerExitStatus.INVALID;
@@ -200,7 +199,6 @@ public class ContainerImpl implements Container {
         YarnConfiguration.DEFAULT_NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE);
         YarnConfiguration.DEFAULT_NM_CONTAINER_DIAGNOSTICS_MAXIMUM_SIZE);
     this.containerTokenIdentifier = containerTokenIdentifier;
     this.containerTokenIdentifier = containerTokenIdentifier;
     this.containerId = containerTokenIdentifier.getContainerID();
     this.containerId = containerTokenIdentifier.getContainerID();
-    this.resource = containerTokenIdentifier.getResource();
     this.diagnostics = new StringBuilder();
     this.diagnostics = new StringBuilder();
     this.credentials = creds;
     this.credentials = creds;
     this.metrics = metrics;
     this.metrics = metrics;
@@ -267,13 +265,6 @@ public class ContainerImpl implements Container {
     this.exitCode = rcs.getExitCode();
     this.exitCode = rcs.getExitCode();
     this.recoveredAsKilled = rcs.getKilled();
     this.recoveredAsKilled = rcs.getKilled();
     this.diagnostics.append(rcs.getDiagnostics());
     this.diagnostics.append(rcs.getDiagnostics());
-    Resource recoveredCapability = rcs.getCapability();
-    if (recoveredCapability != null
-        && !this.resource.equals(recoveredCapability)) {
-      // resource capability had been updated before NM was down
-      this.resource = Resource.newInstance(recoveredCapability.getMemorySize(),
-          recoveredCapability.getVirtualCores());
-    }
     this.version = rcs.getVersion();
     this.version = rcs.getVersion();
     this.remainingRetryAttempts = rcs.getRemainingRetryAttempts();
     this.remainingRetryAttempts = rcs.getRemainingRetryAttempts();
     this.workDir = rcs.getWorkDir();
     this.workDir = rcs.getWorkDir();
@@ -634,14 +625,8 @@ public class ContainerImpl implements Container {
 
 
   @Override
   @Override
   public Resource getResource() {
   public Resource getResource() {
-    return Resources.clone(this.resource);
-  }
-
-  @Override
-  public void setResource(Resource targetResource) {
-    Resource currentResource = getResource();
-    this.resource = Resources.clone(targetResource);
-    this.metrics.changeContainer(currentResource, targetResource);
+    return Resources.clone(
+        this.containerTokenIdentifier.getResource());
   }
   }
 
 
   @Override
   @Override
@@ -654,6 +639,16 @@ public class ContainerImpl implements Container {
     }
     }
   }
   }
 
 
+  @Override
+  public void setContainerTokenIdentifier(ContainerTokenIdentifier token) {
+    this.writeLock.lock();
+    try {
+      this.containerTokenIdentifier = token;
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
   @Override
   @Override
   public String getWorkDir() {
   public String getWorkDir() {
     return workDir;
     return workDir;
@@ -829,7 +824,8 @@ public class ContainerImpl implements Container {
             AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl",
             AuditConstants.FINISH_KILLED_CONTAINER, "ContainerImpl",
             container.containerId.getApplicationAttemptId().getApplicationId(),
             container.containerId.getApplicationAttemptId().getApplicationId(),
             container.containerId);
             container.containerId);
-        container.metrics.releaseContainer(container.resource);
+        container.metrics.releaseContainer(
+            container.containerTokenIdentifier.getResource());
         container.sendFinishedEvents();
         container.sendFinishedEvents();
         return ContainerState.DONE;
         return ContainerState.DONE;
       }
       }
@@ -1513,7 +1509,8 @@ public class ContainerImpl implements Container {
     @Override
     @Override
     @SuppressWarnings("unchecked")
     @SuppressWarnings("unchecked")
     public void transition(ContainerImpl container, ContainerEvent event) {
     public void transition(ContainerImpl container, ContainerEvent event) {
-      container.metrics.releaseContainer(container.resource);
+      container.metrics.releaseContainer(
+          container.containerTokenIdentifier.getResource());
       if (container.containerMetrics != null) {
       if (container.containerMetrics != null) {
         container.containerMetrics
         container.containerMetrics
             .recordFinishTimeAndExitCode(clock.getTime(), container.exitCode);
             .recordFinishTimeAndExitCode(clock.getTime(), container.exitCode);

+ 0 - 24
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/ContainersMonitorImpl.java

@@ -618,19 +618,6 @@ public class ContainersMonitorImpl extends AbstractService implements
     }
     }
   }
   }
 
 
-  private void changeContainerResource(
-      ContainerId containerId, Resource resource) {
-    Container container = context.getContainers().get(containerId);
-    // Check container existence
-    if (container == null) {
-      LOG.warn("Container " + containerId.toString() + "does not exist");
-      return;
-    }
-    // YARN-5860: Route this through the ContainerScheduler to
-    //       fix containerAllocation
-    container.setResource(resource);
-  }
-
   private void updateContainerMetrics(ContainersMonitorEvent monitoringEvent) {
   private void updateContainerMetrics(ContainersMonitorEvent monitoringEvent) {
     if (!containerMetricsEnabled || monitoringEvent == null) {
     if (!containerMetricsEnabled || monitoringEvent == null) {
       return;
       return;
@@ -743,16 +730,6 @@ public class ContainersMonitorImpl extends AbstractService implements
   @SuppressWarnings("unchecked")
   @SuppressWarnings("unchecked")
   public void handle(ContainersMonitorEvent monitoringEvent) {
   public void handle(ContainersMonitorEvent monitoringEvent) {
     ContainerId containerId = monitoringEvent.getContainerId();
     ContainerId containerId = monitoringEvent.getContainerId();
-    if (!containersMonitorEnabled) {
-      if (monitoringEvent.getType() == ContainersMonitorEventType
-          .CHANGE_MONITORING_CONTAINER_RESOURCE) {
-        // Nothing to enforce. Update container resource immediately.
-        ChangeMonitoringContainerResourceEvent changeEvent =
-            (ChangeMonitoringContainerResourceEvent) monitoringEvent;
-        changeContainerResource(containerId, changeEvent.getResource());
-      }
-      return;
-    }
 
 
     switch (monitoringEvent.getType()) {
     switch (monitoringEvent.getType()) {
     case START_MONITORING_CONTAINER:
     case START_MONITORING_CONTAINER:
@@ -786,7 +763,6 @@ public class ContainersMonitorImpl extends AbstractService implements
     long vmemLimit = (long) (pmemLimit * vmemRatio);
     long vmemLimit = (long) (pmemLimit * vmemRatio);
     int cpuVcores = changeEvent.getResource().getVirtualCores();
     int cpuVcores = changeEvent.getResource().getVirtualCores();
     processTreeInfo.setResourceLimit(pmemLimit, vmemLimit, cpuVcores);
     processTreeInfo.setResourceLimit(pmemLimit, vmemLimit, cpuVcores);
-    changeContainerResource(containerId, changeEvent.getResource());
   }
   }
 
 
   protected void onStopMonitoringContainer(
   protected void onStopMonitoringContainer(

+ 73 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerScheduler.java

@@ -31,6 +31,9 @@ import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
 import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
 import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor
+    .ChangeMonitoringContainerResourceEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.monitor.ContainersMonitor;
 
 
 
 
@@ -136,6 +139,13 @@ public class ContainerScheduler extends AbstractService implements
     case CONTAINER_COMPLETED:
     case CONTAINER_COMPLETED:
       onContainerCompleted(event.getContainer());
       onContainerCompleted(event.getContainer());
       break;
       break;
+    case UPDATE_CONTAINER:
+      if (event instanceof UpdateContainerSchedulerEvent) {
+        onUpdateContainer((UpdateContainerSchedulerEvent) event);
+      } else {
+        LOG.error("Unknown event type on UpdateCOntainer: " + event.getType());
+      }
+      break;
     case SHED_QUEUED_CONTAINERS:
     case SHED_QUEUED_CONTAINERS:
       shedQueuedOpportunisticContainers();
       shedQueuedOpportunisticContainers();
       break;
       break;
@@ -145,6 +155,69 @@ public class ContainerScheduler extends AbstractService implements
     }
     }
   }
   }
 
 
+  /**
+   * We assume that the ContainerManager has already figured out what kind
+   * of update this is.
+   */
+  private void onUpdateContainer(UpdateContainerSchedulerEvent updateEvent) {
+    ContainerId containerId = updateEvent.getContainer().getContainerId();
+    if (updateEvent.isResourceChange()) {
+      if (runningContainers.containsKey(containerId)) {
+        this.utilizationTracker.subtractContainerResource(
+            updateEvent.getContainer());
+        updateEvent.getContainer().setContainerTokenIdentifier(
+            updateEvent.getUpdatedToken());
+        this.utilizationTracker.addContainerResources(
+            updateEvent.getContainer());
+        getContainersMonitor().handle(
+            new ChangeMonitoringContainerResourceEvent(containerId,
+                updateEvent.getUpdatedToken().getResource()));
+      } else {
+        updateEvent.getContainer().setContainerTokenIdentifier(
+            updateEvent.getUpdatedToken());
+      }
+      try {
+        // Persist change in the state store.
+        this.context.getNMStateStore().storeContainerResourceChanged(
+            containerId,
+            updateEvent.getUpdatedToken().getVersion(),
+            updateEvent.getUpdatedToken().getResource());
+      } catch (IOException e) {
+        LOG.warn("Could not store container [" + containerId + "] resource " +
+            "change..", e);
+      }
+    }
+
+    if (updateEvent.isExecTypeUpdate()) {
+      updateEvent.getContainer().setContainerTokenIdentifier(
+          updateEvent.getUpdatedToken());
+      // If this is a running container.. just change the execution type
+      // and be done with it.
+      if (!runningContainers.containsKey(containerId)) {
+        // Promotion or not (Increase signifies either a promotion
+        // or container size increase)
+        if (updateEvent.isIncrease()) {
+          // Promotion of queued container..
+          if (queuedOpportunisticContainers.remove(containerId) != null) {
+            queuedGuaranteedContainers.put(containerId,
+                updateEvent.getContainer());
+          }
+          //Kill opportunistic containers if any to make room for
+          // promotion request
+          killOpportunisticContainers(updateEvent.getContainer());
+        } else {
+          // Demotion of queued container.. Should not happen too often
+          // since you should not find too many queued guaranteed
+          // containers
+          if (queuedGuaranteedContainers.remove(containerId) != null) {
+            queuedOpportunisticContainers.put(containerId,
+                updateEvent.getContainer());
+          }
+        }
+      }
+    }
+  }
+
   /**
   /**
    * Return number of queued containers.
    * Return number of queued containers.
    * @return Number of queued containers.
    * @return Number of queued containers.

+ 1 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/ContainerSchedulerEventType.java

@@ -24,6 +24,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
 public enum ContainerSchedulerEventType {
 public enum ContainerSchedulerEventType {
   SCHEDULE_CONTAINER,
   SCHEDULE_CONTAINER,
   CONTAINER_COMPLETED,
   CONTAINER_COMPLETED,
+  UPDATE_CONTAINER,
   // Producer: Node HB response - RM has asked to shed the queue
   // Producer: Node HB response - RM has asked to shed the queue
   SHED_QUEUED_CONTAINERS,
   SHED_QUEUED_CONTAINERS,
 }
 }

+ 85 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/UpdateContainerSchedulerEvent.java

@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.nodemanager.containermanager.scheduler;
+
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container
+    .Container;
+/**
+ * Update Event consumed by the {@link ContainerScheduler}.
+ */
+public class UpdateContainerSchedulerEvent extends ContainerSchedulerEvent {
+
+  private ContainerTokenIdentifier updatedToken;
+  private boolean isResourceChange;
+  private boolean isExecTypeUpdate;
+  private boolean isIncrease;
+
+  /**
+   * Create instance of Event.
+   *
+   * @param originalContainer Original Container.
+   * @param updatedToken Updated Container Token.
+   * @param isResourceChange is this a Resource Change.
+   * @param isExecTypeUpdate is this an ExecTypeUpdate.
+   * @param isIncrease is this a Container Increase.
+   */
+  public UpdateContainerSchedulerEvent(Container originalContainer,
+      ContainerTokenIdentifier updatedToken, boolean isResourceChange,
+      boolean isExecTypeUpdate, boolean isIncrease) {
+    super(originalContainer, ContainerSchedulerEventType.UPDATE_CONTAINER);
+    this.updatedToken = updatedToken;
+    this.isResourceChange = isResourceChange;
+    this.isExecTypeUpdate = isExecTypeUpdate;
+    this.isIncrease = isIncrease;
+  }
+
+  /**
+   * Update Container Token.
+   *
+   * @return Container Token.
+   */
+  public ContainerTokenIdentifier getUpdatedToken() {
+    return updatedToken;
+  }
+
+  /**
+   * isResourceChange.
+   * @return isResourceChange.
+   */
+  public boolean isResourceChange() {
+    return isResourceChange;
+  }
+
+  /**
+   * isExecTypeUpdate.
+   * @return isExecTypeUpdate.
+   */
+  public boolean isExecTypeUpdate() {
+    return isExecTypeUpdate;
+  }
+
+  /**
+   * isIncrease.
+   * @return isIncrease.
+   */
+  public boolean isIncrease() {
+    return isIncrease;
+  }
+}

+ 10 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java

@@ -682,7 +682,7 @@ public class TestNodeManagerResync {
         try{
         try{
           try {
           try {
             updateBarrier.await();
             updateBarrier.await();
-            increaseTokens.add(getContainerToken(targetResource));
+            increaseTokens.add(getContainerToken(targetResource, 1));
             ContainerUpdateRequest updateRequest =
             ContainerUpdateRequest updateRequest =
                 ContainerUpdateRequest.newInstance(increaseTokens);
                 ContainerUpdateRequest.newInstance(increaseTokens);
             ContainerUpdateResponse updateResponse =
             ContainerUpdateResponse updateResponse =
@@ -710,6 +710,15 @@ public class TestNodeManagerResync {
           getNMContext().getNodeId(), user, resource,
           getNMContext().getNodeId(), user, resource,
           getNMContext().getContainerTokenSecretManager(), null);
           getNMContext().getContainerTokenSecretManager(), null);
     }
     }
+
+    private Token getContainerToken(Resource resource, int version)
+        throws IOException {
+      ContainerId cId = TestContainerManager.createContainerId(0);
+      return TestContainerManager.createContainerToken(
+          cId, version, DUMMY_RM_IDENTIFIER,
+          getNMContext().getNodeId(), user, resource,
+          getNMContext().getContainerTokenSecretManager(), null);
+    }
   }
   }
 
 
   public static NMContainerStatus createNMContainerStatus(int id,
   public static NMContainerStatus createNMContainerStatus(int id,

+ 31 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java

@@ -421,6 +421,20 @@ public abstract class BaseContainerManagerTest {
             containerTokenIdentifier);
             containerTokenIdentifier);
   }
   }
 
 
+  public static Token createContainerToken(ContainerId cId, int version,
+      long rmIdentifier, NodeId nodeId, String user, Resource resource,
+      NMContainerTokenSecretManager containerTokenSecretManager,
+      LogAggregationContext logAggregationContext) throws IOException {
+    ContainerTokenIdentifier containerTokenIdentifier =
+        new ContainerTokenIdentifier(cId, version, nodeId.toString(), user,
+            resource, System.currentTimeMillis() + 100000L, 123, rmIdentifier,
+            Priority.newInstance(0), 0, logAggregationContext, null,
+            ContainerType.TASK, ExecutionType.GUARANTEED);
+    return BuilderUtils.newContainerToken(nodeId,
+        containerTokenSecretManager.retrievePassword(containerTokenIdentifier),
+        containerTokenIdentifier);
+  }
+
   public static Token createContainerToken(ContainerId cId, long rmIdentifier,
   public static Token createContainerToken(ContainerId cId, long rmIdentifier,
       NodeId nodeId, String user, Resource resource,
       NodeId nodeId, String user, Resource resource,
       NMContainerTokenSecretManager containerTokenSecretManager,
       NMContainerTokenSecretManager containerTokenSecretManager,
@@ -431,8 +445,23 @@ public abstract class BaseContainerManagerTest {
             System.currentTimeMillis() + 100000L, 123, rmIdentifier,
             System.currentTimeMillis() + 100000L, 123, rmIdentifier,
             Priority.newInstance(0), 0, logAggregationContext, null,
             Priority.newInstance(0), 0, logAggregationContext, null,
             ContainerType.TASK, executionType);
             ContainerType.TASK, executionType);
-    return BuilderUtils.newContainerToken(nodeId, containerTokenSecretManager
-            .retrievePassword(containerTokenIdentifier),
+    return BuilderUtils.newContainerToken(nodeId,
+        containerTokenSecretManager.retrievePassword(containerTokenIdentifier),
+        containerTokenIdentifier);
+  }
+
+  public static Token createContainerToken(ContainerId cId, int version,
+      long rmIdentifier, NodeId nodeId, String user, Resource resource,
+      NMContainerTokenSecretManager containerTokenSecretManager,
+      LogAggregationContext logAggregationContext, ExecutionType executionType)
+      throws IOException {
+    ContainerTokenIdentifier containerTokenIdentifier =
+        new ContainerTokenIdentifier(cId, version, nodeId.toString(), user,
+            resource, System.currentTimeMillis() + 100000L, 123, rmIdentifier,
+            Priority.newInstance(0), 0, logAggregationContext, null,
+            ContainerType.TASK, executionType);
+    return BuilderUtils.newContainerToken(nodeId,
+        containerTokenSecretManager.retrievePassword(containerTokenIdentifier),
         containerTokenIdentifier);
         containerTokenIdentifier);
   }
   }
 
 

+ 167 - 100
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager.containermanager;
 
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verify;
 
 
@@ -70,6 +71,7 @@ import org.apache.hadoop.yarn.api.records.ContainerRetryContext;
 import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
 import org.apache.hadoop.yarn.api.records.ContainerRetryPolicy;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
@@ -80,14 +82,15 @@ import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ConfigurationException;
 import org.apache.hadoop.yarn.exceptions.InvalidContainerException;
 import org.apache.hadoop.yarn.exceptions.InvalidContainerException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
 import org.apache.hadoop.yarn.security.NMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
 import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
-import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent;
 import org.apache.hadoop.yarn.server.nodemanager.CMgrSignalContainersEvent;
+import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.Signal;
 import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
@@ -100,6 +103,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.Conta
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
 import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
 import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerSignalContext;
+import org.apache.hadoop.yarn.server.nodemanager.executor.ContainerStartContext;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.Assert;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Before;
@@ -116,10 +120,34 @@ public class TestContainerManager extends BaseContainerManagerTest {
   static {
   static {
     LOG = LogFactory.getLog(TestContainerManager.class);
     LOG = LogFactory.getLog(TestContainerManager.class);
   }
   }
-  
+
+  private boolean delayContainers = false;
+
+  @Override
+  protected ContainerExecutor createContainerExecutor() {
+    DefaultContainerExecutor exec = new DefaultContainerExecutor() {
+      @Override
+      public int launchContainer(ContainerStartContext ctx)
+          throws IOException, ConfigurationException {
+        if (delayContainers) {
+          try {
+            Thread.sleep(10000);
+          } catch (InterruptedException e) {
+            // Nothing..
+          }
+        }
+        return super.launchContainer(ctx);
+      }
+    };
+    exec.setConf(conf);
+    return spy(exec);
+  }
+
   @Override
   @Override
   @Before
   @Before
   public void setup() throws IOException {
   public void setup() throws IOException {
+    conf.setInt(
+        YarnConfiguration.NM_OPPORTUNISTIC_CONTAINERS_MAX_QUEUE_LENGTH, 10);
     super.setup();
     super.setup();
   }
   }
   
   
@@ -1468,7 +1496,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
     Assert.assertEquals(strExceptionMsg,
     Assert.assertEquals(strExceptionMsg,
         ContainerManagerImpl.INVALID_NMTOKEN_MSG);
         ContainerManagerImpl.INVALID_NMTOKEN_MSG);
 
 
-    ContainerManagerImpl spyContainerMgr = Mockito.spy(cMgrImpl);
+    ContainerManagerImpl spyContainerMgr = spy(cMgrImpl);
     UserGroupInformation ugInfo = UserGroupInformation.createRemoteUser("a");
     UserGroupInformation ugInfo = UserGroupInformation.createRemoteUser("a");
     Mockito.when(spyContainerMgr.getRemoteUgi()).thenReturn(ugInfo);
     Mockito.when(spyContainerMgr.getRemoteUgi()).thenReturn(ugInfo);
     Mockito.when(spyContainerMgr.
     Mockito.when(spyContainerMgr.
@@ -1543,7 +1571,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
     // container will have exited, and won't be in RUNNING state
     // container will have exited, and won't be in RUNNING state
     ContainerId cId0 = createContainerId(0);
     ContainerId cId0 = createContainerId(0);
     Token containerToken =
     Token containerToken =
-        createContainerToken(cId0, DUMMY_RM_IDENTIFIER,
+        createContainerToken(cId0, 1, DUMMY_RM_IDENTIFIER,
             context.getNodeId(), user,
             context.getNodeId(), user,
                 Resource.newInstance(1234, 3),
                 Resource.newInstance(1234, 3),
                     context.getContainerTokenSecretManager(), null);
                     context.getContainerTokenSecretManager(), null);
@@ -1572,7 +1600,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
       if (cId0.equals(entry.getKey())) {
       if (cId0.equals(entry.getKey())) {
         Assert.assertTrue(entry.getValue().getMessage()
         Assert.assertTrue(entry.getValue().getMessage()
           .contains("Resource can only be changed when a "
           .contains("Resource can only be changed when a "
-              + "container is in RUNNING state"));
+              + "container is in RUNNING or SCHEDULED state"));
       } else if (cId7.equals(entry.getKey())) {
       } else if (cId7.equals(entry.getKey())) {
         Assert.assertTrue(entry.getValue().getMessage()
         Assert.assertTrue(entry.getValue().getMessage()
             .contains("Container " + cId7.toString()
             .contains("Container " + cId7.toString()
@@ -1584,89 +1612,6 @@ public class TestContainerManager extends BaseContainerManagerTest {
     }
     }
   }
   }
 
 
-  @Test
-  public void testIncreaseContainerResourceWithInvalidResource() throws Exception {
-    containerManager.start();
-    File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
-    PrintWriter fileWriter = new PrintWriter(scriptFile);
-    // Construct the Container-id
-    ContainerId cId = createContainerId(0);
-    if (Shell.WINDOWS) {
-      fileWriter.println("@ping -n 100 127.0.0.1 >nul");
-    } else {
-      fileWriter.write("\numask 0");
-      fileWriter.write("\nexec sleep 100");
-    }
-    fileWriter.close();
-    ContainerLaunchContext containerLaunchContext =
-        recordFactory.newRecordInstance(ContainerLaunchContext.class);
-    URL resource_alpha =
-        URL.fromPath(localFS
-            .makeQualified(new Path(scriptFile.getAbsolutePath())));
-    LocalResource rsrc_alpha =
-        recordFactory.newRecordInstance(LocalResource.class);
-    rsrc_alpha.setResource(resource_alpha);
-    rsrc_alpha.setSize(-1);
-    rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
-    rsrc_alpha.setType(LocalResourceType.FILE);
-    rsrc_alpha.setTimestamp(scriptFile.lastModified());
-    String destinationFile = "dest_file";
-    Map<String, LocalResource> localResources =
-        new HashMap<String, LocalResource>();
-    localResources.put(destinationFile, rsrc_alpha);
-    containerLaunchContext.setLocalResources(localResources);
-    List<String> commands =
-        Arrays.asList(Shell.getRunScriptCommand(scriptFile));
-    containerLaunchContext.setCommands(commands);
-
-    StartContainerRequest scRequest =
-        StartContainerRequest.newInstance(
-            containerLaunchContext,
-            createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
-            user, context.getContainerTokenSecretManager()));
-    List<StartContainerRequest> list = new ArrayList<>();
-    list.add(scRequest);
-    StartContainersRequest allRequests =
-        StartContainersRequest.newInstance(list);
-    containerManager.startContainers(allRequests);
-    // Make sure the container reaches RUNNING state
-    BaseContainerManagerTest.waitForNMContainerState(containerManager, cId,
-        org.apache.hadoop.yarn.server.nodemanager.
-            containermanager.container.ContainerState.RUNNING);
-    // Construct container resource increase request,
-    List<Token> increaseTokens = new ArrayList<>();
-    // Add increase request. The increase request should fail
-    // as the current resource does not fit in the target resource
-    Token containerToken =
-        createContainerToken(cId, DUMMY_RM_IDENTIFIER,
-            context.getNodeId(), user,
-            Resource.newInstance(512, 1),
-            context.getContainerTokenSecretManager(), null);
-    increaseTokens.add(containerToken);
-    ContainerUpdateRequest updateRequest =
-        ContainerUpdateRequest.newInstance(increaseTokens);
-    ContainerUpdateResponse updateResponse =
-        containerManager.updateContainer(updateRequest);
-    // Check response
-    Assert.assertEquals(
-        0, updateResponse.getSuccessfullyUpdatedContainers().size());
-    Assert.assertEquals(1, updateResponse.getFailedRequests().size());
-    for (Map.Entry<ContainerId, SerializedException> entry : updateResponse
-        .getFailedRequests().entrySet()) {
-      if (cId.equals(entry.getKey())) {
-        Assert.assertNotNull("Failed message", entry.getValue().getMessage());
-        Assert.assertTrue(entry.getValue().getMessage()
-            .contains("The target resource "
-                + Resource.newInstance(512, 1).toString()
-                + " is smaller than the current resource "
-                + Resource.newInstance(1024, 1)));
-      } else {
-        throw new YarnException("Received failed request from wrong"
-            + " container: " + entry.getKey().toString());
-      }
-    }
-  }
-
   @Test
   @Test
   public void testChangeContainerResource() throws Exception {
   public void testChangeContainerResource() throws Exception {
     containerManager.start();
     containerManager.start();
@@ -1720,7 +1665,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
     List<Token> increaseTokens = new ArrayList<>();
     List<Token> increaseTokens = new ArrayList<>();
     // Add increase request.
     // Add increase request.
     Resource targetResource = Resource.newInstance(4096, 2);
     Resource targetResource = Resource.newInstance(4096, 2);
-    Token containerToken = createContainerToken(cId, DUMMY_RM_IDENTIFIER,
+    Token containerToken = createContainerToken(cId, 1, DUMMY_RM_IDENTIFIER,
         context.getNodeId(), user, targetResource,
         context.getNodeId(), user, targetResource,
             context.getContainerTokenSecretManager(), null);
             context.getContainerTokenSecretManager(), null);
     increaseTokens.add(containerToken);
     increaseTokens.add(containerToken);
@@ -1741,15 +1686,19 @@ public class TestContainerManager extends BaseContainerManagerTest {
     // Check status immediately as resource increase is blocking
     // Check status immediately as resource increase is blocking
     assertEquals(targetResource, containerStatus.getCapability());
     assertEquals(targetResource, containerStatus.getCapability());
     // Simulate a decrease request
     // Simulate a decrease request
-    List<org.apache.hadoop.yarn.api.records.Container> containersToDecrease
-        = new ArrayList<>();
+    List<Token> decreaseTokens = new ArrayList<>();
     targetResource = Resource.newInstance(2048, 2);
     targetResource = Resource.newInstance(2048, 2);
-    org.apache.hadoop.yarn.api.records.Container decreasedContainer =
-        org.apache.hadoop.yarn.api.records.Container
-            .newInstance(cId, null, null, targetResource, null, null);
-    containersToDecrease.add(decreasedContainer);
-    containerManager.handle(
-        new CMgrDecreaseContainersResourceEvent(containersToDecrease));
+    Token token = createContainerToken(cId, 2, DUMMY_RM_IDENTIFIER,
+        context.getNodeId(), user, targetResource,
+        context.getContainerTokenSecretManager(), null);
+    decreaseTokens.add(token);
+    updateRequest = ContainerUpdateRequest.newInstance(decreaseTokens);
+    updateResponse = containerManager.updateContainer(updateRequest);
+
+    Assert.assertEquals(
+        1, updateResponse.getSuccessfullyUpdatedContainers().size());
+    Assert.assertTrue(updateResponse.getFailedRequests().isEmpty());
+
     // Check status with retry
     // Check status with retry
     containerStatus = containerManager
     containerStatus = containerManager
         .getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
         .getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
@@ -1879,7 +1828,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
     ContainerLaunchContext containerLaunchContext =
     ContainerLaunchContext containerLaunchContext =
         recordFactory.newRecordInstance(ContainerLaunchContext.class);
         recordFactory.newRecordInstance(ContainerLaunchContext.class);
     ContainerLaunchContext spyContainerLaunchContext =
     ContainerLaunchContext spyContainerLaunchContext =
-        Mockito.spy(containerLaunchContext);
+        spy(containerLaunchContext);
     Mockito.when(spyContainerLaunchContext.getLocalResources())
     Mockito.when(spyContainerLaunchContext.getLocalResources())
         .thenReturn(localResources);
         .thenReturn(localResources);
 
 
@@ -1924,7 +1873,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
     ContainerLaunchContext containerLaunchContext =
     ContainerLaunchContext containerLaunchContext =
         recordFactory.newRecordInstance(ContainerLaunchContext.class);
         recordFactory.newRecordInstance(ContainerLaunchContext.class);
     ContainerLaunchContext spyContainerLaunchContext =
     ContainerLaunchContext spyContainerLaunchContext =
-        Mockito.spy(containerLaunchContext);
+        spy(containerLaunchContext);
     Mockito.when(spyContainerLaunchContext.getLocalResources())
     Mockito.when(spyContainerLaunchContext.getLocalResources())
         .thenReturn(localResources);
         .thenReturn(localResources);
 
 
@@ -1969,7 +1918,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
     ContainerLaunchContext containerLaunchContext =
     ContainerLaunchContext containerLaunchContext =
         recordFactory.newRecordInstance(ContainerLaunchContext.class);
         recordFactory.newRecordInstance(ContainerLaunchContext.class);
     ContainerLaunchContext spyContainerLaunchContext =
     ContainerLaunchContext spyContainerLaunchContext =
-        Mockito.spy(containerLaunchContext);
+        spy(containerLaunchContext);
     Mockito.when(spyContainerLaunchContext.getLocalResources())
     Mockito.when(spyContainerLaunchContext.getLocalResources())
         .thenReturn(localResources);
         .thenReturn(localResources);
 
 
@@ -1996,4 +1945,122 @@ public class TestContainerManager extends BaseContainerManagerTest {
     Assert.assertTrue(response.getFailedRequests().get(cId).getMessage()
     Assert.assertTrue(response.getFailedRequests().get(cId).getMessage()
         .contains("Null resource visibility for local resource"));
         .contains("Null resource visibility for local resource"));
   }
   }
+
+  @Test
+  public void testContainerUpdateExecTypeOpportunisticToGuaranteed()
+      throws IOException, YarnException, InterruptedException {
+    delayContainers = true;
+    containerManager.start();
+    // Construct the Container-id
+    ContainerId cId = createContainerId(0);
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+    StartContainerRequest scRequest =
+        StartContainerRequest.newInstance(
+            containerLaunchContext,
+            createContainerToken(cId, DUMMY_RM_IDENTIFIER,
+                context.getNodeId(), user, BuilderUtils.newResource(512, 1),
+                context.getContainerTokenSecretManager(), null,
+                ExecutionType.OPPORTUNISTIC));
+    List<StartContainerRequest> list = new ArrayList<>();
+    list.add(scRequest);
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
+    // Make sure the container reaches RUNNING state
+    BaseContainerManagerTest.waitForNMContainerState(containerManager, cId,
+        org.apache.hadoop.yarn.server.nodemanager.
+            containermanager.container.ContainerState.RUNNING);
+    // Construct container resource increase request,
+    List<Token> updateTokens = new ArrayList<>();
+    Token containerToken =
+        createContainerToken(cId, 1, DUMMY_RM_IDENTIFIER, context.getNodeId(),
+            user, BuilderUtils.newResource(512, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.GUARANTEED);
+    updateTokens.add(containerToken);
+    ContainerUpdateRequest updateRequest =
+        ContainerUpdateRequest.newInstance(updateTokens);
+    ContainerUpdateResponse updateResponse =
+        containerManager.updateContainer(updateRequest);
+
+    Assert.assertEquals(
+        1, updateResponse.getSuccessfullyUpdatedContainers().size());
+    Assert.assertTrue(updateResponse.getFailedRequests().isEmpty());
+
+    //Make sure the container is running
+    List<ContainerId> statList = new ArrayList<ContainerId>();
+    statList.add(cId);
+    GetContainerStatusesRequest statRequest =
+        GetContainerStatusesRequest.newInstance(statList);
+    List<ContainerStatus> containerStatuses = containerManager
+        .getContainerStatuses(statRequest).getContainerStatuses();
+    Assert.assertEquals(1, containerStatuses.size());
+    for (ContainerStatus status : containerStatuses) {
+      Assert.assertEquals(
+          org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
+          status.getState());
+      Assert.assertEquals(ExecutionType.GUARANTEED, status.getExecutionType());
+    }
+  }
+
+  @Test
+  public void testContainerUpdateExecTypeGuaranteedToOpportunistic()
+      throws IOException, YarnException, InterruptedException {
+    delayContainers = true;
+    containerManager.start();
+    // Construct the Container-id
+    ContainerId cId = createContainerId(0);
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+    StartContainerRequest scRequest =
+        StartContainerRequest.newInstance(
+            containerLaunchContext,
+            createContainerToken(cId, DUMMY_RM_IDENTIFIER,
+                context.getNodeId(), user, BuilderUtils.newResource(512, 1),
+                context.getContainerTokenSecretManager(), null));
+    List<StartContainerRequest> list = new ArrayList<>();
+    list.add(scRequest);
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
+    // Make sure the container reaches RUNNING state
+    BaseContainerManagerTest.waitForNMContainerState(containerManager, cId,
+        org.apache.hadoop.yarn.server.nodemanager.
+            containermanager.container.ContainerState.RUNNING);
+    // Construct container resource increase request,
+    List<Token> updateTokens = new ArrayList<>();
+    Token containerToken =
+        createContainerToken(cId, 1, DUMMY_RM_IDENTIFIER, context.getNodeId(),
+            user, BuilderUtils.newResource(512, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC);
+    updateTokens.add(containerToken);
+    ContainerUpdateRequest updateRequest =
+        ContainerUpdateRequest.newInstance(updateTokens);
+    ContainerUpdateResponse updateResponse =
+        containerManager.updateContainer(updateRequest);
+
+    Assert.assertEquals(
+        1, updateResponse.getSuccessfullyUpdatedContainers().size());
+    Assert.assertTrue(updateResponse.getFailedRequests().isEmpty());
+
+    //Make sure the container is running
+    List<ContainerId> statList = new ArrayList<ContainerId>();
+    statList.add(cId);
+    GetContainerStatusesRequest statRequest =
+        GetContainerStatusesRequest.newInstance(statList);
+    List<ContainerStatus> containerStatuses = containerManager
+        .getContainerStatuses(statRequest).getContainerStatuses();
+    Assert.assertEquals(1, containerStatuses.size());
+    for (ContainerStatus status : containerStatuses) {
+      Assert.assertEquals(
+          org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
+          status.getState());
+      Assert
+          .assertEquals(ExecutionType.OPPORTUNISTIC, status.getExecutionType());
+    }
+  }
 }
 }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java

@@ -638,7 +638,7 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
     final List<Token> increaseTokens = new ArrayList<Token>();
     final List<Token> increaseTokens = new ArrayList<Token>();
     // add increase request
     // add increase request
     Token containerToken = TestContainerManager.createContainerToken(
     Token containerToken = TestContainerManager.createContainerToken(
-        cid, 0, context.getNodeId(), user.getShortUserName(),
+        cid, 1, 0, context.getNodeId(), user.getShortUserName(),
         capability, context.getContainerTokenSecretManager(), null);
         capability, context.getContainerTokenSecretManager(), null);
     increaseTokens.add(containerToken);
     increaseTokens.add(containerToken);
     final ContainerUpdateRequest updateRequest =
     final ContainerUpdateRequest updateRequest =

+ 96 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/scheduler/TestContainerSchedulerQueuing.java

@@ -27,6 +27,8 @@ import java.util.List;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.ContainerUpdateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
@@ -37,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
 import org.apache.hadoop.yarn.api.records.ExecutionType;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.ConfigurationException;
 import org.apache.hadoop.yarn.exceptions.ConfigurationException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -866,4 +869,97 @@ public class TestContainerSchedulerQueuing extends BaseContainerManagerTest {
         map.get(org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED)
         map.get(org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED)
             .getContainerId());
             .getContainerId());
   }
   }
+
+  /**
+   * Starts one OPPORTUNISTIC container that takes up the whole node's
+   * resources, and submit one more that will be queued. Now promote the
+   * queued OPPORTUNISTIC container, which should kill the current running
+   * OPPORTUNISTIC container to make room for the promoted request.
+   * @throws Exception
+   */
+  @Test
+  public void testPromotionOfOpportunisticContainers() throws Exception {
+    containerManager.start();
+
+    ContainerLaunchContext containerLaunchContext =
+        recordFactory.newRecordInstance(ContainerLaunchContext.class);
+
+    List<StartContainerRequest> list = new ArrayList<>();
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(0), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(2048, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+    list.add(StartContainerRequest.newInstance(
+        containerLaunchContext,
+        createContainerToken(createContainerId(1), DUMMY_RM_IDENTIFIER,
+            context.getNodeId(),
+            user, BuilderUtils.newResource(1024, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.OPPORTUNISTIC)));
+
+    StartContainersRequest allRequests =
+        StartContainersRequest.newInstance(list);
+    containerManager.startContainers(allRequests);
+
+    Thread.sleep(5000);
+
+    // Ensure first container is running and others are queued.
+    List<ContainerId> statList = new ArrayList<ContainerId>();
+    for (int i = 0; i < 3; i++) {
+      statList.add(createContainerId(i));
+    }
+    GetContainerStatusesRequest statRequest = GetContainerStatusesRequest
+        .newInstance(Arrays.asList(createContainerId(0)));
+    List<ContainerStatus> containerStatuses = containerManager
+        .getContainerStatuses(statRequest).getContainerStatuses();
+    for (ContainerStatus status : containerStatuses) {
+      if (status.getContainerId().equals(createContainerId(0))) {
+        Assert.assertEquals(
+            org.apache.hadoop.yarn.api.records.ContainerState.RUNNING,
+            status.getState());
+      } else {
+        Assert.assertEquals(
+            org.apache.hadoop.yarn.api.records.ContainerState.SCHEDULED,
+            status.getState());
+      }
+    }
+
+    ContainerScheduler containerScheduler =
+        containerManager.getContainerScheduler();
+    // Ensure two containers are properly queued.
+    Assert.assertEquals(1, containerScheduler.getNumQueuedContainers());
+    Assert.assertEquals(0,
+        containerScheduler.getNumQueuedGuaranteedContainers());
+    Assert.assertEquals(1,
+        containerScheduler.getNumQueuedOpportunisticContainers());
+
+    // Promote Queued Opportunistic Container
+    Token updateToken =
+        createContainerToken(createContainerId(1), 1, DUMMY_RM_IDENTIFIER,
+            context.getNodeId(), user, BuilderUtils.newResource(1024, 1),
+            context.getContainerTokenSecretManager(), null,
+            ExecutionType.GUARANTEED);
+    List<Token> updateTokens = new ArrayList<Token>();
+    updateTokens.add(updateToken);
+    ContainerUpdateRequest updateRequest =
+        ContainerUpdateRequest.newInstance(updateTokens);
+    ContainerUpdateResponse updateResponse =
+        containerManager.updateContainer(updateRequest);
+
+    Assert.assertEquals(1,
+        updateResponse.getSuccessfullyUpdatedContainers().size());
+    Assert.assertEquals(0, updateResponse.getFailedRequests().size());
+
+    waitForContainerState(containerManager, createContainerId(0),
+        org.apache.hadoop.yarn.api.records.ContainerState.COMPLETE);
+
+    waitForContainerState(containerManager, createContainerId(1),
+        org.apache.hadoop.yarn.api.records.ContainerState.RUNNING);
+
+    // Ensure no containers are queued.
+    Assert.assertEquals(0, containerScheduler.getNumQueuedContainers());
+  }
 }
 }

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/MockContainer.java

@@ -139,7 +139,7 @@ public class MockContainer implements Container {
   }
   }
 
 
   @Override
   @Override
-  public void setResource(Resource targetResource) {
+  public void setContainerTokenIdentifier(ContainerTokenIdentifier token) {
   }
   }
 
 
   @Override
   @Override

+ 1 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java

@@ -655,7 +655,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
               container.getNodeId(), getUser(), container.getResource(),
               container.getNodeId(), getUser(), container.getResource(),
               container.getPriority(), rmContainer.getCreationTime(),
               container.getPriority(), rmContainer.getCreationTime(),
               this.logAggregationContext, rmContainer.getNodeLabelExpression(),
               this.logAggregationContext, rmContainer.getNodeLabelExpression(),
-              containerType));
+              containerType, container.getExecutionType()));
       updateNMToken(container);
       updateNMToken(container);
     } catch (IllegalArgumentException e) {
     } catch (IllegalArgumentException e) {
       // DNS might be down, skip returning this container.
       // DNS might be down, skip returning this container.

+ 28 - 2
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/RMContainerTokenSecretManager.java

@@ -186,6 +186,31 @@ public class RMContainerTokenSecretManager extends
         null, null, ContainerType.TASK);
         null, null, ContainerType.TASK);
   }
   }
 
 
+  /**
+   * Helper function for creating ContainerTokens.
+   *
+   * @param containerId containerId.
+   * @param containerVersion containerVersion.
+   * @param nodeId nodeId.
+   * @param appSubmitter appSubmitter.
+   * @param capability capability.
+   * @param priority priority.
+   * @param createTime createTime.
+   * @param logAggregationContext logAggregationContext.
+   * @param nodeLabelExpression nodeLabelExpression.
+   * @param containerType containerType.
+   * @return the container-token.
+   */
+  public Token createContainerToken(ContainerId containerId,
+      int containerVersion, NodeId nodeId, String appSubmitter,
+      Resource capability, Priority priority, long createTime,
+      LogAggregationContext logAggregationContext, String nodeLabelExpression,
+      ContainerType containerType) {
+    return createContainerToken(containerId, containerVersion, nodeId,
+        appSubmitter, capability, priority, createTime, null, null,
+        ContainerType.TASK, ExecutionType.GUARANTEED);
+  }
+
   /**
   /**
    * Helper function for creating ContainerTokens
    * Helper function for creating ContainerTokens
    *
    *
@@ -199,13 +224,14 @@ public class RMContainerTokenSecretManager extends
    * @param logAggregationContext Log Aggregation Context
    * @param logAggregationContext Log Aggregation Context
    * @param nodeLabelExpression Node Label Expression
    * @param nodeLabelExpression Node Label Expression
    * @param containerType Container Type
    * @param containerType Container Type
+   * @param execType Execution Type
    * @return the container-token
    * @return the container-token
    */
    */
   public Token createContainerToken(ContainerId containerId,
   public Token createContainerToken(ContainerId containerId,
       int containerVersion, NodeId nodeId, String appSubmitter,
       int containerVersion, NodeId nodeId, String appSubmitter,
       Resource capability, Priority priority, long createTime,
       Resource capability, Priority priority, long createTime,
       LogAggregationContext logAggregationContext, String nodeLabelExpression,
       LogAggregationContext logAggregationContext, String nodeLabelExpression,
-      ContainerType containerType) {
+      ContainerType containerType, ExecutionType execType) {
     byte[] password;
     byte[] password;
     ContainerTokenIdentifier tokenIdentifier;
     ContainerTokenIdentifier tokenIdentifier;
     long expiryTimeStamp =
     long expiryTimeStamp =
@@ -220,7 +246,7 @@ public class RMContainerTokenSecretManager extends
               this.currentMasterKey.getMasterKey().getKeyId(),
               this.currentMasterKey.getMasterKey().getKeyId(),
               ResourceManager.getClusterTimeStamp(), priority, createTime,
               ResourceManager.getClusterTimeStamp(), priority, createTime,
               logAggregationContext, nodeLabelExpression, containerType,
               logAggregationContext, nodeLabelExpression, containerType,
-              ExecutionType.GUARANTEED);
+              execType);
       password = this.createPassword(tokenIdentifier);
       password = this.createPassword(tokenIdentifier);
 
 
     } finally {
     } finally {