
YARN-4934. Reserved Resource for QueueMetrics needs to be handled correctly in few cases. (Sunil G via wangda)
(cherry picked from commit fdc46bfb37776d8c41b68f6c33a2379d0f329994)

Wangda Tan, 9 years ago · commit a87abf9b69

+ 0 - 7
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java

@@ -1441,13 +1441,6 @@ public class LeafQueue extends AbstractCSQueue {
         // Book-keeping
         if (removed) {
 
-          // track reserved resource for metrics, for normal container
-          // getReservedResource will be null.
-          Resource reservedRes = rmContainer.getReservedResource();
-          if (reservedRes != null && !reservedRes.equals(Resources.none())) {
-            decReservedResource(node.getPartition(), reservedRes);
-          }
-
           // Inform the ordering policy
           orderingPolicy.containerReleased(application, rmContainer);
           

+ 2 - 0
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java

@@ -248,6 +248,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       // Update reserved metrics
       queue.getMetrics().unreserveResource(getUser(),
           rmContainer.getReservedResource());
+      queue.decReservedResource(node.getPartition(),
+          rmContainer.getReservedResource());
       return true;
     }
     return false;

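Taken together, the two hunks above move the reserved-resource decrement out of LeafQueue#completedContainer and into FiCaSchedulerApp#unreserve, so the queue-level reserved usage is released on every unreservation path (a reservation turned into an allocation, a cancelled ask, or a removed node) instead of only when a completed-container event happens to arrive. Below is a minimal sketch of the pairing this enforces; SketchQueue and SketchApp are simplified stand-ins invented for illustration, not the actual scheduler classes:

class SketchQueue {
  // Reserved memory per node partition; a stand-in for the queue's
  // ResourceUsage bookkeeping.
  private final java.util.Map<String, Long> reservedMB =
      new java.util.HashMap<>();

  void incReservedResource(String partition, long mb) {
    reservedMB.merge(partition, mb, Long::sum);
  }

  void decReservedResource(String partition, long mb) {
    reservedMB.merge(partition, -mb, Long::sum);
  }

  long getReservedMB(String partition) {
    return reservedMB.getOrDefault(partition, 0L);
  }
}

class SketchApp {
  private final SketchQueue queue;
  private Long reservedMB = null; // null when no reservation is held

  SketchApp(SketchQueue queue) {
    this.queue = queue;
  }

  void reserve(String partition, long mb) {
    queue.incReservedResource(partition, mb);
    reservedMB = mb;
  }

  // Every unreservation path funnels through here, so the decrement runs
  // exactly once per reservation. This mirrors adding
  // queue.decReservedResource(...) to FiCaSchedulerApp#unreserve while
  // removing it from LeafQueue#completedContainer.
  boolean unreserve(String partition) {
    if (reservedMB == null) {
      return false; // nothing reserved, nothing to release
    }
    queue.decReservedResource(partition, reservedMB);
    reservedMB = null;
    return true;
  }
}

With the decrement tied to unreserve, the decommission test added below can assert that the queue's reserved metric drains back to zero when the node holding the reservation is removed.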
+ 187 - 1
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java

@@ -28,6 +28,7 @@ import org.apache.hadoop.security.SecurityUtilTestHelper;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
@@ -37,6 +38,8 @@ import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.ContainerType;
+import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@@ -50,8 +53,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@@ -418,5 +426,183 @@ public class TestContainerAllocation {
 
     rm1.close();
   }
-  
+
+  @Test(timeout = 60000)
+  public void testAllocationForReservedContainer() throws Exception {
+    /**
+     * Test case: submit two applications (app1/app2) to the same queue, with
+     * one 8G node in the cluster. App1 allocates a 4G container, then app2
+     * asks for a 4G container. App2's request will be reserved on the node.
+     *
+     * Before the next node heartbeat, app1's container is completed/killed,
+     * so app2's previously reserved container will be allocated.
+     */
+    // inject node label manager
+    MockRM rm1 = new MockRM();
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
+
+    // launch an app to queue, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // launch another app to queue, AM container should be launched in nm1
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+    am1.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
+    am2.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
+
+    // Do node heartbeats 2 times
+    // First time will allocate container for app1, second time will reserve
+    // container for app2
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    // App1 gets its 4G container on node1; app2's 4G request does not fit
+    // and remains as a reservation on node1.
+    FiCaSchedulerApp schedulerApp1 =
+        cs.getApplicationAttempt(am1.getApplicationAttemptId());
+    FiCaSchedulerApp schedulerApp2 =
+        cs.getApplicationAttempt(am2.getApplicationAttemptId());
+
+    // Check that a 4G container was allocated for app1 and nothing new for app2
+    Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+    Assert.assertTrue(schedulerApp2.getReservedContainers().size() > 0);
+
+    // NM1 has available resource = 2G (8G - 2 * 1G - 4G)
+    Assert.assertEquals(2 * GB, cs.getNode(nm1.getNodeId())
+        .getAvailableResource().getMemory());
+    Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    // Usage of queue = 4G + 2 * 1G + 4G (reserved)
+    Assert.assertEquals(10 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed().getMemory());
+    Assert.assertEquals(4 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved().getMemory());
+    Assert.assertEquals(4 * GB, leafQueue.getQueueResourceUsage().getReserved()
+        .getMemory());
+
+    // Mark one app1 container as killed/completed and re-kick RM
+    for (RMContainer container : schedulerApp1.getLiveContainers()) {
+      if (container.isAMContainer()) {
+        continue;
+      }
+      cs.markContainerForKillable(container);
+    }
+    // Cancel asks of app1 and re-kick RM
+    am1.allocate("*", 4 * GB, 0, new ArrayList<ContainerId>());
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    // Check that app1's 4G container was cancelled and one container was
+    // allocated for app2
+    Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+    Assert.assertFalse(schedulerApp2.getReservedContainers().size() > 0);
+
+    // NM1 has available resource = 2G (8G - 2 * 1G - 4G)
+    Assert.assertEquals(2 * GB, cs.getNode(nm1.getNodeId())
+        .getAvailableResource().getMemory());
+    Assert.assertNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    // Usage of queue = 4G + 2 * 1G
+    Assert.assertEquals(6 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed().getMemory());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved().getMemory());
+    Assert.assertEquals(0 * GB, leafQueue.getQueueResourceUsage().getReserved()
+        .getMemory());
+
+    rm1.close();
+  }
+
+  @Test(timeout = 60000)
+  public void testReservedContainerMetricsOnDecommisionedNode() throws Exception {
+    /**
+     * Test case: same setup as testAllocationForReservedContainer: two
+     * applications in one queue and one 8G node. App1 holds a 4G container
+     * and app2's 4G request is reserved on the node.
+     *
+     * The node is then removed. All containers, including the reserved one,
+     * must be released, and the queue's used and reserved metrics must both
+     * drop back to zero.
+     */
+    // inject node label manager
+    MockRM rm1 = new MockRM();
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB);
+
+    // launch an app to queue, AM container should be launched in nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // launch another app to queue, AM container should be launched in nm1
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "default");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+    am1.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
+    am2.allocate("*", 4 * GB, 1, new ArrayList<ContainerId>());
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    LeafQueue leafQueue = (LeafQueue) cs.getQueue("default");
+
+    // Do node heartbeats 2 times
+    // First time will allocate container for app1, second time will reserve
+    // container for app2
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    // App1 gets its 4G container on node1; app2's 4G request does not fit
+    // and remains as a reservation on node1.
+    FiCaSchedulerApp schedulerApp1 =
+        cs.getApplicationAttempt(am1.getApplicationAttemptId());
+    FiCaSchedulerApp schedulerApp2 =
+        cs.getApplicationAttempt(am2.getApplicationAttemptId());
+
+    // Check that a 4G container was allocated for app1 and nothing new for app2
+    Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+    Assert.assertTrue(schedulerApp2.getReservedContainers().size() > 0);
+
+    // NM1 has available resource = 2G (8G - 2 * 1G - 4G)
+    Assert.assertEquals(2 * GB, cs.getNode(nm1.getNodeId())
+        .getAvailableResource().getMemory());
+    Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+    // Usage of queue = 4G + 2 * 1G + 4G (reserved)
+    Assert.assertEquals(10 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed().getMemory());
+    Assert.assertEquals(4 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved().getMemory());
+    Assert.assertEquals(4 * GB, leafQueue.getQueueResourceUsage().getReserved()
+        .getMemory());
+
+    // Remove the node
+    cs.handle(new NodeRemovedSchedulerEvent(rmNode1));
+
+    // Check that all containers for app1 and app2 were released
+    Assert.assertEquals(0, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(0, schedulerApp2.getLiveContainers().size());
+    Assert.assertFalse(schedulerApp2.getReservedContainers().size() > 0);
+
+    // Used and reserved capacity of the queue are both 0
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed().getMemory());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved().getMemory());
+    Assert.assertEquals(0 * GB, leafQueue.getQueueResourceUsage().getReserved()
+        .getMemory());
+
+    rm1.close();
+  }
 }
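To exercise just these tests, assuming a standard Maven checkout, running the usual Surefire test selection from the hadoop-yarn-server-resourcemanager module (the path shown in the file headers above) should work:

mvn test -Dtest=TestContainerAllocation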