|
@@ -58,6 +58,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemoved
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
|
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
|
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
|
|
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
|
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
|
|
import org.junit.Assert;
|
|
|
import org.junit.Before;
|
|
@@ -982,4 +984,82 @@ public class TestContainerAllocation {
|
|
|
Assert.assertEquals(2, lq.getMetrics().getAppsPending());
|
|
|
rm1.close();
|
|
|
}
|
|
|
+
|
|
|
+ @Test(timeout = 60000)
|
|
|
+ public void testAllocationCannotBeBlockedWhenFormerQueueReachedItsLimit()
|
|
|
+ throws Exception {
|
|
|
+ /**
|
|
|
+ * Queue structure:
|
|
|
+ * <pre>
|
|
|
+ * Root
|
|
|
+ * / | \
|
|
|
+ * a b c
|
|
|
+ * 10 20 70
|
|
|
+ * | \
|
|
|
+ * c1 c2
|
|
|
+ * 10(max=10) 90
|
|
|
+ * </pre>
|
|
|
+ * Test case:
|
|
|
+ * Create a cluster with two nodes whose node resource both are
|
|
|
+ * <10GB, 10core>, create queues as above, among them max-capacity of "c1"
|
|
|
+ * is 10 and others are all 100, so that max-capacity of queue "c1" is
|
|
|
+ * <2GB, 2core>,
|
|
|
+ * submit app1 to queue "c1" and launch am1(resource=<1GB, 1 core>) on nm1,
|
|
|
+ * submit app2 to queue "b" and launch am2(resource=<1GB, 1 core>) on nm1,
|
|
|
+ * app1 and app2 both ask one <2GB, 1core> containers
|
|
|
+ *
|
|
|
+ * Now queue "c" has lower capacity percentage than queue "b", the
|
|
|
+ * allocation sequence will be "a" -> "c" -> "b", queue "c1" has reached
|
|
|
+ * queue limit so that requests of app1 should be pending
|
|
|
+ *
|
|
|
+ * After nm1 do 1 heartbeat, scheduler should allocate one container for
|
|
|
+ * app2 on nm1.
|
|
|
+ */
|
|
|
+ CapacitySchedulerConfiguration newConf =
|
|
|
+ (CapacitySchedulerConfiguration) TestUtils
|
|
|
+ .getConfigurationWithMultipleQueues(conf);
|
|
|
+ newConf.setQueues(CapacitySchedulerConfiguration.ROOT + ".c",
|
|
|
+ new String[] { "c1", "c2" });
|
|
|
+ newConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c.c1", 10);
|
|
|
+ newConf
|
|
|
+ .setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + ".c.c1", 10);
|
|
|
+ newConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c.c2", 90);
|
|
|
+ newConf.setClass(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
|
|
|
+ DominantResourceCalculator.class, ResourceCalculator.class);
|
|
|
+
|
|
|
+ MockRM rm1 = new MockRM(newConf);
|
|
|
+
|
|
|
+ RMNodeLabelsManager nodeLabelsManager = new NullRMNodeLabelsManager();
|
|
|
+ nodeLabelsManager.init(newConf);
|
|
|
+ rm1.getRMContext().setNodeLabelManager(nodeLabelsManager);
|
|
|
+ rm1.start();
|
|
|
+ MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB);
|
|
|
+ MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB);
|
|
|
+
|
|
|
+ // launch an app to queue "c1", AM container should be launched on nm1
|
|
|
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c1");
|
|
|
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
|
|
+
|
|
|
+ // launch another app to queue "b", AM container should be launched on nm1
|
|
|
+ RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "b");
|
|
|
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
|
|
|
+
|
|
|
+ am1.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
|
|
|
+ am2.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
|
|
|
+
|
|
|
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
|
|
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
|
|
|
+ FiCaSchedulerApp schedulerApp1 =
|
|
|
+ cs.getApplicationAttempt(am1.getApplicationAttemptId());
|
|
|
+ FiCaSchedulerApp schedulerApp2 =
|
|
|
+ cs.getApplicationAttempt(am2.getApplicationAttemptId());
|
|
|
+
|
|
|
+ // Do nm1 heartbeats 1 times, will allocate a container on nm1 for app2
|
|
|
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
|
|
|
+ rm1.drainEvents();
|
|
|
+ Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
|
|
|
+ Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
|
|
|
+
|
|
|
+ rm1.close();
|
|
|
+ }
|
|
|
}
|