|
@@ -52,6 +52,7 @@ import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
|
import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl;
|
|
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
|
|
+import org.apache.hadoop.net.NetworkTopology;
|
|
|
import org.apache.hadoop.security.GroupMappingServiceProvider;
|
|
|
import org.apache.hadoop.yarn.MockApps;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
@@ -61,6 +62,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
+import org.apache.hadoop.yarn.api.records.NodeState;
|
|
|
import org.apache.hadoop.yarn.api.records.Priority;
|
|
|
import org.apache.hadoop.yarn.api.records.QueueInfo;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
@@ -68,11 +70,17 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
|
|
+import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
+import org.apache.hadoop.yarn.event.Event;
|
|
|
+import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.Application;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.Task;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
@@ -84,6 +92,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptI
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
|
@@ -99,6 +108,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateS
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.Default;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
|
|
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
|
import org.apache.hadoop.yarn.util.ControlledClock;
|
|
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
|
@@ -106,12 +116,14 @@ import org.junit.After;
|
|
|
import org.junit.Assert;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
+import org.mockito.Mockito;
|
|
|
import org.xml.sax.SAXException;
|
|
|
|
|
|
import com.google.common.collect.Sets;
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
|
public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
+ private final int GB = 1024;
|
|
|
private final static String ALLOC_FILE =
|
|
|
new File(TEST_DIR, "test-queues").getAbsolutePath();
|
|
|
|
|
@@ -4372,4 +4384,83 @@ public class TestFairScheduler extends FairSchedulerTestBase {
|
|
|
long initSchedulerTime = lastScheduledContainer.get(priority);
|
|
|
assertEquals(DELAY_THRESHOLD_TIME_MS, initSchedulerTime);
|
|
|
}
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testResourceUpdateDecommissioningNode() throws Exception {
|
|
|
+ // Mock the RMNodeResourceUpdate event handler to update SchedulerNode
|
|
|
+ // to have 0 available resource
|
|
|
+ RMContext spyContext = Mockito.spy(resourceManager.getRMContext());
|
|
|
+ Dispatcher mockDispatcher = mock(AsyncDispatcher.class);
|
|
|
+ when(mockDispatcher.getEventHandler()).thenReturn(new EventHandler() {
|
|
|
+ @Override
|
|
|
+ public void handle(Event event) {
|
|
|
+ if (event instanceof RMNodeResourceUpdateEvent) {
|
|
|
+ RMNodeResourceUpdateEvent resourceEvent =
|
|
|
+ (RMNodeResourceUpdateEvent) event;
|
|
|
+ resourceManager
|
|
|
+ .getResourceScheduler()
|
|
|
+ .getSchedulerNode(resourceEvent.getNodeId())
|
|
|
+ .setTotalResource(resourceEvent.getResourceOption().getResource());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ });
|
|
|
+ Mockito.doReturn(mockDispatcher).when(spyContext).getDispatcher();
|
|
|
+ ((FairScheduler) resourceManager.getResourceScheduler())
|
|
|
+ .setRMContext(spyContext);
|
|
|
+ ((AsyncDispatcher) mockDispatcher).start();
|
|
|
+ // Register node
|
|
|
+ String host_0 = "host_0";
|
|
|
+ org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm_0 =
|
|
|
+ registerNode(host_0, 1234, 2345, NetworkTopology.DEFAULT_RACK,
|
|
|
+ Resources.createResource(8 * GB, 4));
|
|
|
+
|
|
|
+ RMNode node =
|
|
|
+ resourceManager.getRMContext().getRMNodes().get(nm_0.getNodeId());
|
|
|
+ // Send a heartbeat to kick the tires on the Scheduler
|
|
|
+ NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
|
|
|
+ resourceManager.getResourceScheduler().handle(nodeUpdate);
|
|
|
+
|
|
|
+ // Kick off another heartbeat with the node state mocked to decommissioning
|
|
|
+ // This should update the schedulernodes to have 0 available resource
|
|
|
+ RMNode spyNode =
|
|
|
+ Mockito.spy(resourceManager.getRMContext().getRMNodes()
|
|
|
+ .get(nm_0.getNodeId()));
|
|
|
+ when(spyNode.getState()).thenReturn(NodeState.DECOMMISSIONING);
|
|
|
+ resourceManager.getResourceScheduler().handle(
|
|
|
+ new NodeUpdateSchedulerEvent(spyNode));
|
|
|
+
|
|
|
+ // Check the used resource is 0 GB 0 core
|
|
|
+ // Assert.assertEquals(1 * GB, nm_0.getUsed().getMemory());
|
|
|
+ Resource usedResource =
|
|
|
+ resourceManager.getResourceScheduler()
|
|
|
+ .getSchedulerNode(nm_0.getNodeId()).getUsedResource();
|
|
|
+    Assert.assertEquals(0, usedResource.getMemory());
|
|
|
+    Assert.assertEquals(0, usedResource.getVirtualCores());
|
|
|
+ // Check total resource of scheduler node is also changed to 0 GB 0 core
|
|
|
+ Resource totalResource =
|
|
|
+ resourceManager.getResourceScheduler()
|
|
|
+ .getSchedulerNode(nm_0.getNodeId()).getTotalResource();
|
|
|
+    Assert.assertEquals(0 * GB, totalResource.getMemory());
|
|
|
+    Assert.assertEquals(0, totalResource.getVirtualCores());
|
|
|
+ // Check the available resource is 0/0
|
|
|
+ Resource availableResource =
|
|
|
+ resourceManager.getResourceScheduler()
|
|
|
+ .getSchedulerNode(nm_0.getNodeId()).getAvailableResource();
|
|
|
+    Assert.assertEquals(0, availableResource.getMemory());
|
|
|
+    Assert.assertEquals(0, availableResource.getVirtualCores());
|
|
|
+ }
|
|
|
+
|
|
|
+ private org.apache.hadoop.yarn.server.resourcemanager.NodeManager registerNode(
|
|
|
+ String hostName, int containerManagerPort, int httpPort, String rackName,
|
|
|
+ Resource capability) throws IOException, YarnException {
|
|
|
+ org.apache.hadoop.yarn.server.resourcemanager.NodeManager nm =
|
|
|
+ new org.apache.hadoop.yarn.server.resourcemanager.NodeManager(hostName,
|
|
|
+ containerManagerPort, httpPort, rackName, capability,
|
|
|
+ resourceManager);
|
|
|
+ NodeAddedSchedulerEvent nodeAddEvent1 =
|
|
|
+ new NodeAddedSchedulerEvent(resourceManager.getRMContext().getRMNodes()
|
|
|
+ .get(nm.getNodeId()));
|
|
|
+ resourceManager.getResourceScheduler().handle(nodeAddEvent1);
|
|
|
+ return nm;
|
|
|
+ }
|
|
|
}
|