@@ -18,12 +18,29 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;


+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
 import java.io.IOException;
 import java.util.ArrayList;
-
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@@ -31,11 +48,21 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;

 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
@@ -47,7 +74,8 @@ public class TestApplicationLimitsByPartition {
   RMNodeLabelsManager mgr;
   private YarnConfiguration conf;

-  RMContext rmContext = null;
+  private final ResourceCalculator resourceCalculator =
+      new DefaultResourceCalculator();

   @Before
   public void setUp() throws IOException {
@@ -538,4 +566,174 @@ public class TestApplicationLimitsByPartition {

     rm1.close();
   }
+
+  @Test
+  public void testHeadroom() throws Exception {
+    /*
+     * Test Case: Verify that the headroom calculated is the sum of the
+     * headrooms for each partition requested. Submit an app with requests for
+     * the default partition and the 'y' partition, so the total headroom for
+     * the user should be the sum of the headroom for both labels.
+     */
+
+    simpleNodeLabelMappingToManager();
+    CapacitySchedulerConfiguration csConf =
+        (CapacitySchedulerConfiguration) TestUtils
+            .getComplexConfigurationWithQueueLabels(conf);
+    final String A1 = CapacitySchedulerConfiguration.ROOT + ".a" + ".a1";
+    final String B2 = CapacitySchedulerConfiguration.ROOT + ".b" + ".b2";
+    csConf.setUserLimit(A1, 25);
+    csConf.setUserLimit(B2, 25);
+
+    YarnConfiguration conf = new YarnConfiguration();
+
+    CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
+    when(csContext.getConfiguration()).thenReturn(csConf);
+    when(csContext.getConf()).thenReturn(conf);
+    when(csContext.getMinimumResourceCapability())
+        .thenReturn(Resources.createResource(GB));
+    when(csContext.getMaximumResourceCapability())
+        .thenReturn(Resources.createResource(16 * GB));
+    when(csContext.getNonPartitionedQueueComparator())
+        .thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
+    when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
+    RMContext rmContext = TestUtils.getMockRMContext();
+    RMContext spyRMContext = spy(rmContext);
+    when(spyRMContext.getNodeLabelManager()).thenReturn(mgr);
+    when(csContext.getRMContext()).thenReturn(spyRMContext);
+
+    mgr.activateNode(NodeId.newInstance("h0", 0),
+        Resource.newInstance(160 * GB, 16)); // default Label
+    mgr.activateNode(NodeId.newInstance("h1", 0),
+        Resource.newInstance(160 * GB, 16)); // label x
+    mgr.activateNode(NodeId.newInstance("h2", 0),
+        Resource.newInstance(160 * GB, 16)); // label y
+
+    // Say the cluster has 160 GB of memory in total
+    Resource clusterResource = Resources.createResource(160 * GB);
+    when(csContext.getClusterResource()).thenReturn(clusterResource);
+
+    Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
+    CSQueue rootQueue = CapacityScheduler.parseQueue(csContext, csConf, null,
+        "root", queues, queues, TestUtils.spyHook);
+
+    ResourceUsage queueResUsage = rootQueue.getQueueResourceUsage();
+    when(csContext.getClusterResourceUsage())
+        .thenReturn(queueResUsage);
+
+    // Manipulate queue 'b2'
+ LeafQueue queue = TestLeafQueue.stubLeafQueue((LeafQueue) queues.get("b2"));
|
|
|
+ queue.updateClusterResource(clusterResource,
|
|
|
+ new ResourceLimits(clusterResource));
|
|
|
+
|
|
|
+ String rack_0 = "rack_0";
|
|
|
+ FiCaSchedulerNode node_0 = TestUtils.getMockNode("h0", rack_0, 0, 160 * GB);
|
|
|
+ FiCaSchedulerNode node_1 = TestUtils.getMockNode("h1", rack_0, 0, 160 * GB);
|
|
|
+
|
|
|
+ final String user_0 = "user_0";
|
|
|
+ final String user_1 = "user_1";
|
|
|
+
|
|
|
+ RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
|
|
+
|
|
|
+ ConcurrentMap<ApplicationId, RMApp> spyApps =
|
|
|
+ spy(new ConcurrentHashMap<ApplicationId, RMApp>());
|
|
|
+ RMApp rmApp = mock(RMApp.class);
|
|
|
+ ResourceRequest amResourceRequest = mock(ResourceRequest.class);
|
|
|
+ Resource amResource = Resources.createResource(0, 0);
|
|
|
+ when(amResourceRequest.getCapability()).thenReturn(amResource);
|
|
|
+ when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest);
|
|
|
+ Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any());
|
|
|
+ when(spyRMContext.getRMApps()).thenReturn(spyApps);
|
|
|
+ RMAppAttempt rmAppAttempt = mock(RMAppAttempt.class);
|
|
|
+ when(rmApp.getRMAppAttempt((ApplicationAttemptId) Matchers.any()))
|
|
|
+ .thenReturn(rmAppAttempt);
|
|
|
+ when(rmApp.getCurrentAppAttempt()).thenReturn(rmAppAttempt);
|
|
|
+ Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any());
|
|
|
+ Mockito.doReturn(true).when(spyApps)
|
|
|
+ .containsKey((ApplicationId) Matchers.any());
|
|
|
+
|
|
|
+ Priority priority_1 = TestUtils.createMockPriority(1);
|
|
|
+
|
|
|
+ // Submit first application with some resource-requests from user_0,
|
|
|
+ // and check headroom
|
|
|
+ final ApplicationAttemptId appAttemptId_0_0 =
|
|
|
+ TestUtils.getMockApplicationAttemptId(0, 0);
|
|
|
+ FiCaSchedulerApp app_0_0 = new FiCaSchedulerApp(appAttemptId_0_0, user_0,
|
|
|
+ queue, queue.getActiveUsersManager(), spyRMContext);
|
|
|
+ queue.submitApplicationAttempt(app_0_0, user_0);
|
|
|
+
|
|
|
+ List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
|
|
|
+ app_0_0_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
|
|
|
+ 1 * GB, 2, true, priority_1, recordFactory));
|
|
|
+ app_0_0.updateResourceRequests(app_0_0_requests);
|
|
|
+
|
|
|
+ // Schedule to compute
|
|
|
+ queue.assignContainers(clusterResource, node_0,
|
|
|
+ new ResourceLimits(clusterResource),
|
|
|
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
|
|
|
+    // headroom = queue capacity = 50% * 90% * 160 GB
+    Resource expectedHeadroom =
+ Resources.createResource((int) (0.5 * 0.9 * 160) * GB, 1);
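+    // i.e. 0.5 * 0.9 * 160 GB = 72 GB of headroom in the default partition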
+    assertEquals(expectedHeadroom, app_0_0.getHeadroom());
+
+    // Submit second application from user_0, check headroom
+    final ApplicationAttemptId appAttemptId_0_1 =
+        TestUtils.getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_0_1 = new FiCaSchedulerApp(appAttemptId_0_1, user_0,
+        queue, queue.getActiveUsersManager(), spyRMContext);
+    queue.submitApplicationAttempt(app_0_1, user_0);
+
+    List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
+    app_0_1_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
+        1 * GB, 2, true, priority_1, recordFactory));
+    app_0_1_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
+        1 * GB, 2, true, priority_1, recordFactory, "y"));
+    app_0_1.updateResourceRequests(app_0_1_requests);
+
+    // Schedule to compute
+    queue.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    queue.assignContainers(clusterResource, node_1,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    assertEquals(expectedHeadroom, app_0_0.getHeadroom()); // no change
+    // headroom for default partition + headroom for y partition
+    // headroom for y partition = 100% * 50% (b queue capacity) * 160 GB
+    Resource expectedHeadroomWithReqInY =
+ Resources.add(Resources.createResource((int) (0.5 * 160) * GB, 1), expectedHeadroom);
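+    // i.e. 80 GB (partition y) + 72 GB (default partition) = 152 GB in total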
+    assertEquals(expectedHeadroomWithReqInY, app_0_1.getHeadroom());
+
+    // Submit first application from user_1, check for new headroom
+    final ApplicationAttemptId appAttemptId_1_0 =
+        TestUtils.getMockApplicationAttemptId(2, 0);
+    FiCaSchedulerApp app_1_0 = new FiCaSchedulerApp(appAttemptId_1_0, user_1,
+        queue, queue.getActiveUsersManager(), spyRMContext);
+    queue.submitApplicationAttempt(app_1_0, user_1);
+
+    List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();
+    app_1_0_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
+        1 * GB, 2, true, priority_1, recordFactory));
+    app_1_0_requests.add(TestUtils.createResourceRequest(ResourceRequest.ANY,
+        1 * GB, 2, true, priority_1, recordFactory, "y"));
+    app_1_0.updateResourceRequests(app_1_0_requests);
+
+    // Schedule to compute
+    queue.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    // headroom = queue capacity = (50% * 90% * 160 GB) / 2 (for 2 users)
+    expectedHeadroom =
+ Resources.createResource((int) (0.5 * 0.9 * 160 * 0.5) * GB, 1);
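+    // i.e. 72 GB split across the two active users = 36 GB each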
+    // headroom for default partition + headroom for y partition
+    // headroom for y partition =
+    //   (100% * 50% (b queue capacity) * 160 GB) / 2 (for 2 users)
+    expectedHeadroomWithReqInY =
+        Resources.add(Resources.createResource((int) (0.5 * 0.5 * 160) * GB, 1),
+ expectedHeadroom);
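+    // i.e. 40 GB (partition y share per user) + 36 GB (default partition) = 76 GB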
+    assertEquals(expectedHeadroom, app_0_0.getHeadroom());
+    assertEquals(expectedHeadroomWithReqInY, app_0_1.getHeadroom());
+    assertEquals(expectedHeadroomWithReqInY, app_1_0.getHeadroom());
+  }
 }