|
@@ -20,12 +20,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
|
|
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
+import java.util.Set;
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.security.SecurityUtilTestHelper;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.Container;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
|
@@ -41,19 +43,28 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMSecretManagerService;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.TestFifoScheduler;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.DummyRMNodeLabelsManager;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
|
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
|
import org.junit.Assert;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
+import com.google.common.collect.ImmutableMap;
|
|
|
+import com.google.common.collect.ImmutableSet;
|
|
|
+import com.google.common.collect.Sets;
|
|
|
+
|
|
|
|
|
|
public class TestContainerAllocation {
|
|
|
|
|
@@ -62,12 +73,16 @@ public class TestContainerAllocation {
|
|
|
private final int GB = 1024;
|
|
|
|
|
|
private YarnConfiguration conf;
|
|
|
+
|
|
|
+ RMNodeLabelsManager mgr;
|
|
|
|
|
|
@Before
|
|
|
public void setUp() throws Exception {
|
|
|
conf = new YarnConfiguration();
|
|
|
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
|
|
ResourceScheduler.class);
|
|
|
+ mgr = new DummyRMNodeLabelsManager();
|
|
|
+ mgr.init(conf);
|
|
|
}
|
|
|
|
|
|
@Test(timeout = 3000000)
|
|
@@ -305,4 +320,449 @@ public class TestContainerAllocation {
|
|
|
rm1.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.ALLOCATED);
|
|
|
MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
|
|
}
|
|
|
+
|
|
|
+ private Configuration getConfigurationWithDefaultQueueLabels(
|
|
|
+ Configuration config) {
|
|
|
+ final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
|
|
+ final String B = CapacitySchedulerConfiguration.ROOT + ".b";
|
|
|
+
|
|
|
+ CapacitySchedulerConfiguration conf =
|
|
|
+ (CapacitySchedulerConfiguration) getConfigurationWithQueueLabels(config);
|
|
|
+ new CapacitySchedulerConfiguration(config);
|
|
|
+ conf.setDefaultNodeLabelExpression(A, "x");
|
|
|
+ conf.setDefaultNodeLabelExpression(B, "y");
|
|
|
+ return conf;
|
|
|
+ }
|
|
|
+
|
|
|
+ private Configuration getConfigurationWithQueueLabels(Configuration config) {
|
|
|
+ CapacitySchedulerConfiguration conf =
|
|
|
+ new CapacitySchedulerConfiguration(config);
|
|
|
+
|
|
|
+ // Define top-level queues
|
|
|
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
|
|
|
+
|
|
|
+ final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
|
|
+ conf.setCapacity(A, 10);
|
|
|
+ conf.setMaximumCapacity(A, 15);
|
|
|
+ conf.setAccessibleNodeLabels(A, toSet("x"));
|
|
|
+ conf.setCapacityByLabel(A, "x", 100);
|
|
|
+
|
|
|
+ final String B = CapacitySchedulerConfiguration.ROOT + ".b";
|
|
|
+ conf.setCapacity(B, 20);
|
|
|
+ conf.setAccessibleNodeLabels(B, toSet("y"));
|
|
|
+ conf.setCapacityByLabel(B, "y", 100);
|
|
|
+
|
|
|
+ final String C = CapacitySchedulerConfiguration.ROOT + ".c";
|
|
|
+ conf.setCapacity(C, 70);
|
|
|
+ conf.setMaximumCapacity(C, 70);
|
|
|
+ conf.setAccessibleNodeLabels(C, RMNodeLabelsManager.EMPTY_STRING_SET);
|
|
|
+
|
|
|
+ // Define 2nd-level queues
|
|
|
+ final String A1 = A + ".a1";
|
|
|
+ conf.setQueues(A, new String[] {"a1"});
|
|
|
+ conf.setCapacity(A1, 100);
|
|
|
+ conf.setMaximumCapacity(A1, 100);
|
|
|
+ conf.setCapacityByLabel(A1, "x", 100);
|
|
|
+
|
|
|
+ final String B1 = B + ".b1";
|
|
|
+ conf.setQueues(B, new String[] {"b1"});
|
|
|
+ conf.setCapacity(B1, 100);
|
|
|
+ conf.setMaximumCapacity(B1, 100);
|
|
|
+ conf.setCapacityByLabel(B1, "y", 100);
|
|
|
+
|
|
|
+ final String C1 = C + ".c1";
|
|
|
+ conf.setQueues(C, new String[] {"c1"});
|
|
|
+ conf.setCapacity(C1, 100);
|
|
|
+ conf.setMaximumCapacity(C1, 100);
|
|
|
+
|
|
|
+ return conf;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void checkTaskContainersHost(ApplicationAttemptId attemptId,
|
|
|
+ ContainerId containerId, ResourceManager rm, String host) {
|
|
|
+ YarnScheduler scheduler = rm.getRMContext().getScheduler();
|
|
|
+ SchedulerAppReport appReport = scheduler.getSchedulerAppInfo(attemptId);
|
|
|
+
|
|
|
+ Assert.assertTrue(appReport.getLiveContainers().size() > 0);
|
|
|
+ for (RMContainer c : appReport.getLiveContainers()) {
|
|
|
+ if (c.getContainerId().equals(containerId)) {
|
|
|
+ Assert.assertEquals(host, c.getAllocatedNode().getHost());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private <E> Set<E> toSet(E... elements) {
|
|
|
+ Set<E> set = Sets.newHashSet(elements);
|
|
|
+ return set;
|
|
|
+ }
|
|
|
+
|
|
|
+ private Configuration getComplexConfigurationWithQueueLabels(
|
|
|
+ Configuration config) {
|
|
|
+ CapacitySchedulerConfiguration conf =
|
|
|
+ new CapacitySchedulerConfiguration(config);
|
|
|
+
|
|
|
+ // Define top-level queues
|
|
|
+ conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
|
|
|
+
|
|
|
+ final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
|
|
+ conf.setCapacity(A, 10);
|
|
|
+ conf.setMaximumCapacity(A, 10);
|
|
|
+ conf.setAccessibleNodeLabels(A, toSet("x", "y"));
|
|
|
+ conf.setCapacityByLabel(A, "x", 100);
|
|
|
+ conf.setCapacityByLabel(A, "y", 50);
|
|
|
+
|
|
|
+ final String B = CapacitySchedulerConfiguration.ROOT + ".b";
|
|
|
+ conf.setCapacity(B, 90);
|
|
|
+ conf.setMaximumCapacity(B, 100);
|
|
|
+ conf.setAccessibleNodeLabels(B, toSet("y", "z"));
|
|
|
+ conf.setCapacityByLabel(B, "y", 50);
|
|
|
+ conf.setCapacityByLabel(B, "z", 100);
|
|
|
+
|
|
|
+ // Define 2nd-level queues
|
|
|
+ final String A1 = A + ".a1";
|
|
|
+ conf.setQueues(A, new String[] {"a1"});
|
|
|
+ conf.setCapacity(A1, 100);
|
|
|
+ conf.setMaximumCapacity(A1, 100);
|
|
|
+ conf.setAccessibleNodeLabels(A1, toSet("x", "y"));
|
|
|
+ conf.setDefaultNodeLabelExpression(A1, "x");
|
|
|
+ conf.setCapacityByLabel(A1, "x", 100);
|
|
|
+ conf.setCapacityByLabel(A1, "y", 100);
|
|
|
+
|
|
|
+ conf.setQueues(B, new String[] {"b1", "b2"});
|
|
|
+ final String B1 = B + ".b1";
|
|
|
+ conf.setCapacity(B1, 50);
|
|
|
+ conf.setMaximumCapacity(B1, 50);
|
|
|
+ conf.setAccessibleNodeLabels(B1, RMNodeLabelsManager.EMPTY_STRING_SET);
|
|
|
+
|
|
|
+ final String B2 = B + ".b2";
|
|
|
+ conf.setCapacity(B2, 50);
|
|
|
+ conf.setMaximumCapacity(B2, 50);
|
|
|
+ conf.setAccessibleNodeLabels(B2, toSet("y", "z"));
|
|
|
+ conf.setCapacityByLabel(B2, "y", 100);
|
|
|
+ conf.setCapacityByLabel(B2, "z", 100);
|
|
|
+
|
|
|
+ return conf;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test(timeout = 300000)
|
|
|
+ public void testContainerAllocationWithSingleUserLimits() throws Exception {
|
|
|
+ final RMNodeLabelsManager mgr = new DummyRMNodeLabelsManager();
|
|
|
+ mgr.init(conf);
|
|
|
+
|
|
|
+ // set node -> label
|
|
|
+ mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
|
|
|
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
|
|
|
+ NodeId.newInstance("h2", 0), toSet("y")));
|
|
|
+
|
|
|
+ // inject node label manager
|
|
|
+ MockRM rm1 = new MockRM(getConfigurationWithDefaultQueueLabels(conf)) {
|
|
|
+ @Override
|
|
|
+ public RMNodeLabelsManager createNodeLabelManager() {
|
|
|
+ return mgr;
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ rm1.getRMContext().setNodeLabelManager(mgr);
|
|
|
+ rm1.start();
|
|
|
+ MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
|
|
|
+ MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
|
|
|
+ MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
|
|
|
+
|
|
|
+ // launch an app to queue a1 (label = x), and check all container will
|
|
|
+ // be allocated in h1
|
|
|
+ RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
|
|
|
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
|
|
+
|
|
|
+ // A has only 10% of x, so it can only allocate one container in label=empty
|
|
|
+ ContainerId containerId =
|
|
|
+ ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
|
|
|
+ am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "");
|
|
|
+ Assert.assertTrue(rm1.waitForState(nm3, containerId,
|
|
|
+ RMContainerState.ALLOCATED, 10 * 1000));
|
|
|
+ // Cannot allocate 2nd label=empty container
|
|
|
+ containerId =
|
|
|
+ ContainerId.newInstance(am1.getApplicationAttemptId(), 3);
|
|
|
+ am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "");
|
|
|
+ Assert.assertFalse(rm1.waitForState(nm3, containerId,
|
|
|
+ RMContainerState.ALLOCATED, 10 * 1000));
|
|
|
+
|
|
|
+ // A has default user limit = 100, so it can use all resource in label = x
|
|
|
+ // We can allocate floor(8000 / 1024) = 7 containers
|
|
|
+ for (int id = 3; id <= 8; id++) {
|
|
|
+ containerId =
|
|
|
+ ContainerId.newInstance(am1.getApplicationAttemptId(), id);
|
|
|
+ am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x");
|
|
|
+ Assert.assertTrue(rm1.waitForState(nm1, containerId,
|
|
|
+ RMContainerState.ALLOCATED, 10 * 1000));
|
|
|
+ }
|
|
|
+ rm1.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test(timeout = 300000)
|
|
|
+ public void testContainerAllocateWithComplexLabels() throws Exception {
|
|
|
+ /*
|
|
|
+ * Queue structure:
|
|
|
+ * root (*)
|
|
|
+ * ________________
|
|
|
+ * / \
|
|
|
+ * a x(100%), y(50%) b y(50%), z(100%)
|
|
|
+ * ________________ ______________
|
|
|
+ * / / \
|
|
|
+ * a1 (x,y) b1(no) b2(y,z)
|
|
|
+ * 100% y = 100%, z = 100%
|
|
|
+ *
|
|
|
+ * Node structure:
|
|
|
+ * h1 : x
|
|
|
+ * h2 : x, y
|
|
|
+ * h3 : y
|
|
|
+ * h4 : y, z
|
|
|
+ * h5 : NO
|
|
|
+ *
|
|
|
+ * Total resource:
|
|
|
+ * x: 4G
|
|
|
+ * y: 6G
|
|
|
+ * z: 2G
|
|
|
+ * *: 2G
|
|
|
+ *
|
|
|
+ * Resource of
|
|
|
+ * a1: x=4G, y=3G, NO=0.2G
|
|
|
+ * b1: NO=0.9G (max=1G)
|
|
|
+ * b2: y=3, z=2G, NO=0.9G (max=1G)
|
|
|
+ *
|
|
|
+ * Each node can only allocate two containers
|
|
|
+ */
|
|
|
+
|
|
|
+ // set node -> label
|
|
|
+ mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y", "z"));
|
|
|
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
|
|
|
+ toSet("x"), NodeId.newInstance("h2", 0), toSet("x", "y"),
|
|
|
+ NodeId.newInstance("h3", 0), toSet("y"), NodeId.newInstance("h4", 0),
|
|
|
+ toSet("y", "z"), NodeId.newInstance("h5", 0),
|
|
|
+ RMNodeLabelsManager.EMPTY_STRING_SET));
|
|
|
+
|
|
|
+ // inject node label manager
|
|
|
+ MockRM rm1 = new MockRM(getComplexConfigurationWithQueueLabels(conf)) {
|
|
|
+ @Override
|
|
|
+ public RMNodeLabelsManager createNodeLabelManager() {
|
|
|
+ return mgr;
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ rm1.getRMContext().setNodeLabelManager(mgr);
|
|
|
+ rm1.start();
|
|
|
+ MockNM nm1 = rm1.registerNode("h1:1234", 2048);
|
|
|
+ MockNM nm2 = rm1.registerNode("h2:1234", 2048);
|
|
|
+ MockNM nm3 = rm1.registerNode("h3:1234", 2048);
|
|
|
+ MockNM nm4 = rm1.registerNode("h4:1234", 2048);
|
|
|
+ MockNM nm5 = rm1.registerNode("h5:1234", 2048);
|
|
|
+
|
|
|
+ ContainerId containerId;
|
|
|
+
|
|
|
+ // launch an app to queue a1 (label = x), and check all container will
|
|
|
+ // be allocated in h1
|
|
|
+ RMApp app1 = rm1.submitApp(1024, "app", "user", null, "a1");
|
|
|
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
|
|
+
|
|
|
+ // request a container (label = x && y). can only allocate on nm2
|
|
|
+ am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x && y");
|
|
|
+ containerId =
|
|
|
+ ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
|
|
|
+ Assert.assertFalse(rm1.waitForState(nm1, containerId,
|
|
|
+ RMContainerState.ALLOCATED, 10 * 1000));
|
|
|
+ Assert.assertTrue(rm1.waitForState(nm2, containerId,
|
|
|
+ RMContainerState.ALLOCATED, 10 * 1000));
|
|
|
+ checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
|
|
|
+ "h2");
|
|
|
+
|
|
|
+ // launch an app to queue b1 (label = y), and check all container will
|
|
|
+ // be allocated in h5
|
|
|
+ RMApp app2 = rm1.submitApp(1024, "app", "user", null, "b1");
|
|
|
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm5);
|
|
|
+
|
|
|
+ // request a container for AM, will succeed
|
|
|
+ // and now b1's queue capacity will be used, cannot allocate more containers
|
|
|
+ // (Maximum capacity reached)
|
|
|
+ am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
|
|
|
+ containerId = ContainerId.newInstance(am2.getApplicationAttemptId(), 2);
|
|
|
+ Assert.assertFalse(rm1.waitForState(nm4, containerId,
|
|
|
+ RMContainerState.ALLOCATED, 10 * 1000));
|
|
|
+ Assert.assertFalse(rm1.waitForState(nm5, containerId,
|
|
|
+ RMContainerState.ALLOCATED, 10 * 1000));
|
|
|
+
|
|
|
+ // launch an app to queue b2
|
|
|
+ RMApp app3 = rm1.submitApp(1024, "app", "user", null, "b2");
|
|
|
+ MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm5);
|
|
|
+
|
|
|
+ // request a container. try to allocate on nm1 (label = x) and nm3 (label =
|
|
|
+ // y,z). Will successfully allocate on nm3
|
|
|
+ am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
|
|
|
+ containerId = ContainerId.newInstance(am3.getApplicationAttemptId(), 2);
|
|
|
+ Assert.assertFalse(rm1.waitForState(nm1, containerId,
|
|
|
+ RMContainerState.ALLOCATED, 10 * 1000));
|
|
|
+ Assert.assertTrue(rm1.waitForState(nm3, containerId,
|
|
|
+ RMContainerState.ALLOCATED, 10 * 1000));
|
|
|
+ checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
|
|
|
+ "h3");
|
|
|
+
|
|
|
+ // try to allocate container (request label = y && z) on nm3 (label = y) and
|
|
|
+ // nm4 (label = y,z). Will sucessfully allocate on nm4 only.
|
|
|
+ am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y && z");
|
|
|
+ containerId = ContainerId.newInstance(am3.getApplicationAttemptId(), 3);
|
|
|
+ Assert.assertFalse(rm1.waitForState(nm3, containerId,
|
|
|
+ RMContainerState.ALLOCATED, 10 * 1000));
|
|
|
+ Assert.assertTrue(rm1.waitForState(nm4, containerId,
|
|
|
+ RMContainerState.ALLOCATED, 10 * 1000));
|
|
|
+ checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
|
|
|
+ "h4");
|
|
|
+
|
|
|
+ rm1.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test (timeout = 120000)
|
|
|
+ public void testContainerAllocateWithLabels() throws Exception {
|
|
|
+ // set node -> label
|
|
|
+ mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
|
|
|
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
|
|
|
+ NodeId.newInstance("h2", 0), toSet("y")));
|
|
|
+
|
|
|
+ // inject node label manager
|
|
|
+ MockRM rm1 = new MockRM(getConfigurationWithQueueLabels(conf)) {
|
|
|
+ @Override
|
|
|
+ public RMNodeLabelsManager createNodeLabelManager() {
|
|
|
+ return mgr;
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ rm1.getRMContext().setNodeLabelManager(mgr);
|
|
|
+ rm1.start();
|
|
|
+ MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
|
|
|
+ MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
|
|
|
+ MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
|
|
|
+
|
|
|
+ ContainerId containerId;
|
|
|
+
|
|
|
+ // launch an app to queue a1 (label = x), and check all container will
|
|
|
+ // be allocated in h1
|
|
|
+ RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
|
|
|
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm3);
|
|
|
+
|
|
|
+ // request a container.
|
|
|
+ am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x");
|
|
|
+ containerId =
|
|
|
+ ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
|
|
|
+ Assert.assertFalse(rm1.waitForState(nm2, containerId,
|
|
|
+ RMContainerState.ALLOCATED, 10 * 1000));
|
|
|
+ Assert.assertTrue(rm1.waitForState(nm1, containerId,
|
|
|
+ RMContainerState.ALLOCATED, 10 * 1000));
|
|
|
+ checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
|
|
|
+ "h1");
|
|
|
+
|
|
|
+ // launch an app to queue b1 (label = y), and check all container will
|
|
|
+ // be allocated in h2
|
|
|
+ RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
|
|
|
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm3);
|
|
|
+
|
|
|
+ // request a container.
|
|
|
+ am2.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
|
|
|
+ containerId = ContainerId.newInstance(am2.getApplicationAttemptId(), 2);
|
|
|
+ Assert.assertFalse(rm1.waitForState(nm1, containerId,
|
|
|
+ RMContainerState.ALLOCATED, 10 * 1000));
|
|
|
+ Assert.assertTrue(rm1.waitForState(nm2, containerId,
|
|
|
+ RMContainerState.ALLOCATED, 10 * 1000));
|
|
|
+ checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
|
|
|
+ "h2");
|
|
|
+
|
|
|
+ // launch an app to queue c1 (label = ""), and check all container will
|
|
|
+ // be allocated in h3
|
|
|
+ RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
|
|
|
+ MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);
|
|
|
+
|
|
|
+ // request a container.
|
|
|
+ am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
|
|
|
+ containerId = ContainerId.newInstance(am3.getApplicationAttemptId(), 2);
|
|
|
+ Assert.assertFalse(rm1.waitForState(nm2, containerId,
|
|
|
+ RMContainerState.ALLOCATED, 10 * 1000));
|
|
|
+ Assert.assertTrue(rm1.waitForState(nm3, containerId,
|
|
|
+ RMContainerState.ALLOCATED, 10 * 1000));
|
|
|
+ checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
|
|
|
+ "h3");
|
|
|
+
|
|
|
+ rm1.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test (timeout = 120000)
|
|
|
+ public void testContainerAllocateWithDefaultQueueLabels() throws Exception {
|
|
|
+ // This test is pretty much similar to testContainerAllocateWithLabel.
|
|
|
+ // Difference is, this test doesn't specify label expression in ResourceRequest,
|
|
|
+ // instead, it uses default queue label expression
|
|
|
+
|
|
|
+ // set node -> label
|
|
|
+ mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
|
|
|
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
|
|
|
+ NodeId.newInstance("h2", 0), toSet("y")));
|
|
|
+
|
|
|
+ // inject node label manager
|
|
|
+ MockRM rm1 = new MockRM(getConfigurationWithDefaultQueueLabels(conf)) {
|
|
|
+ @Override
|
|
|
+ public RMNodeLabelsManager createNodeLabelManager() {
|
|
|
+ return mgr;
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ rm1.getRMContext().setNodeLabelManager(mgr);
|
|
|
+ rm1.start();
|
|
|
+ MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
|
|
|
+ MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
|
|
|
+ MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
|
|
|
+
|
|
|
+ ContainerId containerId;
|
|
|
+
|
|
|
+ // launch an app to queue a1 (label = x), and check all container will
|
|
|
+ // be allocated in h1
|
|
|
+ RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
|
|
|
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
|
|
+
|
|
|
+ // request a container.
|
|
|
+ am1.allocate("*", 1024, 1, new ArrayList<ContainerId>());
|
|
|
+ containerId =
|
|
|
+ ContainerId.newInstance(am1.getApplicationAttemptId(), 2);
|
|
|
+ Assert.assertFalse(rm1.waitForState(nm3, containerId,
|
|
|
+ RMContainerState.ALLOCATED, 10 * 1000));
|
|
|
+ Assert.assertTrue(rm1.waitForState(nm1, containerId,
|
|
|
+ RMContainerState.ALLOCATED, 10 * 1000));
|
|
|
+ checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
|
|
|
+ "h1");
|
|
|
+
|
|
|
+ // launch an app to queue b1 (label = y), and check all container will
|
|
|
+ // be allocated in h2
|
|
|
+ RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
|
|
|
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
|
|
|
+
|
|
|
+ // request a container.
|
|
|
+ am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
|
|
|
+ containerId = ContainerId.newInstance(am2.getApplicationAttemptId(), 2);
|
|
|
+ Assert.assertFalse(rm1.waitForState(nm3, containerId,
|
|
|
+ RMContainerState.ALLOCATED, 10 * 1000));
|
|
|
+ Assert.assertTrue(rm1.waitForState(nm2, containerId,
|
|
|
+ RMContainerState.ALLOCATED, 10 * 1000));
|
|
|
+ checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
|
|
|
+ "h2");
|
|
|
+
|
|
|
+ // launch an app to queue c1 (label = ""), and check all container will
|
|
|
+ // be allocated in h3
|
|
|
+ RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
|
|
|
+ MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);
|
|
|
+
|
|
|
+ // request a container.
|
|
|
+ am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
|
|
|
+ containerId = ContainerId.newInstance(am3.getApplicationAttemptId(), 2);
|
|
|
+ Assert.assertFalse(rm1.waitForState(nm2, containerId,
|
|
|
+ RMContainerState.ALLOCATED, 10 * 1000));
|
|
|
+ Assert.assertTrue(rm1.waitForState(nm3, containerId,
|
|
|
+ RMContainerState.ALLOCATED, 10 * 1000));
|
|
|
+ checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
|
|
|
+ "h3");
|
|
|
+
|
|
|
+ rm1.close();
|
|
|
+ }
|
|
|
}
|