@@ -552,6 +552,278 @@ public class TestNodeLabelContainerAllocation {
     rm1.close();
   }
 
+  @Test (timeout = 120000)
+  public void testContainerReservationContinueLookingWithLabels()
+      throws Exception {
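+    // With reservation continue-looking, a reducer reserved on a labeled node
+    // (label = x) is unreserved once the request can be satisfied on another
+    // node with the same label; queue usage for the label is checked at each
+    // step below.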
+    // set node -> label
+    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x"));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
+        toSet("x"), NodeId.newInstance("h2", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(
+        TestUtils.getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB); // label = x
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+    LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
+
+    ContainerId containerId;
+
+    // launch an app to queue a1 (label = x)
+    RMApp app1 = rm1.submitApp(2 * GB, "app1", "user", null, "a1", "x");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1
+        .getApplicationAttemptId());
+
+    // Verify the AM container is live on node1
+    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h1");
+
+    Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
+    Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
+    Assert.assertEquals(2 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(2 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    // request map containers for app1.
+    am1.allocate("*", 5 * GB, 2, 5, new ArrayList<ContainerId>(), "x");
+
+    // Do node heartbeat to allocate first mapper on node1
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    // Verify the first mapper is live on node1
+    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h1");
+
+    Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
+    Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
+    Assert.assertEquals(7 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(7 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    // Do node heartbeat to allocate second mapper on node2
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // Verify the second mapper is live on node2
+    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h2");
+
+    // node1 7 GB used, node2 5 GB used
+    Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
+    Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
+    Assert.assertEquals(12 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(12 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    // request reducer containers for app1.
+    am1.allocate("*", 3 * GB, 2, 10, new ArrayList<ContainerId>(), "x");
+
+    // Do node heartbeat to reserve reducer on node1
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
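+    // The reserved 3 GB is counted in the queue's used resource for label x,
+    // so used grows from 12 GB to 15 GB while reserved shows 3 GB.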
+    // node1 7 GB used and 3 GB reserved, node2 5 GB used
+    Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
+    Assert.assertEquals(15 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(3 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(15 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(3 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    // Do node heartbeat to allocate container for second reducer on node2
+    // This should unreserve the reserved container
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // Verify the reducer is live on node2
+    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 5);
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h2");
+
+    // node1 7 GB used and 0 GB reserved, node2 8 GB used
+    Assert.assertEquals(4, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(0, schedulerApp1.getReservedContainers().size());
+    Assert.assertEquals(15 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(15 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    rm1.close();
+  }
+
+  @Test (timeout = 120000)
+  public void testContainerReservationContinueLookingWithDefaultLabels()
+      throws Exception {
+    // This is the same as testContainerReservationContinueLookingWithLabels,
+    // except that it does not specify a label expression in the
+    // ResourceRequest; instead it relies on the queue's default node label
+    // expression.
+    mgr.addToCluserNodeLabelsWithDefaultExclusivity(ImmutableSet.of("x"));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
+        toSet("x"), NodeId.newInstance("h2", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(
+        TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 8 * GB); // label = x
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+    LeafQueue leafQueue = (LeafQueue) cs.getQueue("a1");
+
+    ContainerId containerId;
+
+    // launch an app to queue a1 (default label = x)
+    RMApp app1 = rm1.submitApp(2 * GB, "app1", "user", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1
+        .getApplicationAttemptId());
+
+    // Verify the AM container is live on node1
+    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h1");
+
+    Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
+    Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
+    Assert.assertEquals(2 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(2 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    // request map containers for app1.
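+    // A null label expression here means the queue's default node label
+    // expression (x) applies to these requests.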
+    am1.allocate("*", 5 * GB, 2, 5, new ArrayList<ContainerId>(), null);
+
+    // Do node heartbeat to allocate first mapper on node1
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    // Verify the first mapper is live on node1
+    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h1");
+
+    Assert.assertEquals(2, schedulerApp1.getLiveContainers().size());
+    Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
+    Assert.assertEquals(7 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(7 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    // Do node heartbeat to allocate second mapper on node2
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // Verify the second mapper is live on node2
+    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h2");
+
+    // node1 7 GB used, node2 5 GB used
+    Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
+    Assert.assertFalse(schedulerApp1.getReservedContainers().size() > 0);
+    Assert.assertEquals(12 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(12 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    // request reducer containers for app1.
+    am1.allocate("*", 3 * GB, 2, 10, new ArrayList<ContainerId>(), null);
+
+    // Do node heartbeat to reserve reducer on node1
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+
+    // node1 7 GB used and 3 GB reserved, node2 5 GB used
+    Assert.assertEquals(3, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(1, schedulerApp1.getReservedContainers().size());
+    Assert.assertEquals(15 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(3 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(15 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(3 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    // Do node heartbeat to allocate container for second reducer on node2
+    // This should unreserve the reserved container
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+    // Verify the reducer is live on node2
+    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 5);
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h2");
+
+    // node1 7 GB used and 0 GB reserved, node2 8 GB used
+    Assert.assertEquals(4, schedulerApp1.getLiveContainers().size());
+    Assert.assertEquals(0, schedulerApp1.getReservedContainers().size());
+    Assert.assertEquals(15 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB, cs.getRootQueue().getQueueResourceUsage()
+        .getReserved("x").getMemorySize());
+    Assert.assertEquals(15 * GB,
+        leafQueue.getQueueResourceUsage().getUsed("x").getMemorySize());
+    Assert.assertEquals(0 * GB,
+        leafQueue.getQueueResourceUsage().getReserved("x").getMemorySize());
+
+    rm1.close();
+  }
+
   @Test (timeout = 120000)
   public void testRMContainerLeakInLeafQueue() throws Exception {
     // set node -> label