|
@@ -46,6 +46,7 @@ import org.apache.hadoop.security.Credentials;
|
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.security.token.Token;
|
|
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
|
|
+import org.apache.hadoop.service.Service;
|
|
|
import org.apache.hadoop.yarn.LocalConfigurationProvider;
|
|
|
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
|
@@ -71,11 +72,15 @@ import org.apache.hadoop.yarn.api.records.ResourceOption;
|
|
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
|
|
+import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
+import org.apache.hadoop.yarn.event.DrainDispatcher;
|
|
|
+import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
|
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
|
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
|
|
+import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
|
|
import org.apache.hadoop.yarn.server.api.protocolrecords.UpdateNodeResourceRequest;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.AdminService;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.Application;
|
|
@@ -83,10 +88,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.NMLivelinessMonitor;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.NodeManager;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.NodesListManager;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceTrackerService;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.Task;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MockRMWithAMS;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
|
|
@@ -106,6 +114,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEven
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
|
@@ -124,6 +133,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSc
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
|
@@ -144,7 +154,7 @@ import org.junit.Test;
|
|
|
import com.google.common.collect.ImmutableMap;
|
|
|
import com.google.common.collect.ImmutableSet;
|
|
|
import com.google.common.collect.Sets;
|
|
|
-
|
|
|
+import org.mockito.Mockito;
|
|
|
|
|
|
public class TestCapacityScheduler {
|
|
|
private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
|
|
@@ -3248,4 +3258,106 @@ public class TestCapacityScheduler {
|
|
|
Assert.fail("Cannot find RMContainer");
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ private class SleepHandler implements EventHandler<SchedulerEvent> {
|
|
|
+ boolean sleepFlag = false;
|
|
|
+ int sleepTime = 20;
|
|
|
+ @Override
|
|
|
+ public void handle(SchedulerEvent event) {
|
|
|
+ try {
|
|
|
+ if(sleepFlag) {
|
|
|
+ Thread.sleep(sleepTime);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch(InterruptedException ie) {
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private ResourceTrackerService getPrivateResourceTrackerService(
|
|
|
+ Dispatcher privateDispatcher, SleepHandler sleepHandler) {
|
|
|
+
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ ResourceTrackerService privateResourceTrackerService;
|
|
|
+
|
|
|
+ RMContext privateContext =
|
|
|
+ new RMContextImpl(privateDispatcher, null, null, null, null, null, null,
|
|
|
+ null, null, null);
|
|
|
+ privateContext.setNodeLabelManager(Mockito.mock(RMNodeLabelsManager.class));
|
|
|
+
|
|
|
+ privateDispatcher.register(SchedulerEventType.class, sleepHandler);
|
|
|
+ privateDispatcher.register(SchedulerEventType.class,
|
|
|
+ resourceManager.getResourceScheduler());
|
|
|
+ privateDispatcher.register(RMNodeEventType.class,
|
|
|
+ new ResourceManager.NodeEventDispatcher(privateContext));
|
|
|
+ ((Service) privateDispatcher).init(conf);
|
|
|
+ ((Service) privateDispatcher).start();
|
|
|
+ NMLivelinessMonitor nmLivelinessMonitor =
|
|
|
+ new NMLivelinessMonitor(privateDispatcher);
|
|
|
+ nmLivelinessMonitor.init(conf);
|
|
|
+ nmLivelinessMonitor.start();
|
|
|
+ NodesListManager nodesListManager = new NodesListManager(privateContext);
|
|
|
+ nodesListManager.init(conf);
|
|
|
+ RMContainerTokenSecretManager containerTokenSecretManager =
|
|
|
+ new RMContainerTokenSecretManager(conf);
|
|
|
+ containerTokenSecretManager.start();
|
|
|
+ NMTokenSecretManagerInRM nmTokenSecretManager =
|
|
|
+ new NMTokenSecretManagerInRM(conf);
|
|
|
+ nmTokenSecretManager.start();
|
|
|
+ privateResourceTrackerService =
|
|
|
+ new ResourceTrackerService(privateContext, nodesListManager,
|
|
|
+ nmLivelinessMonitor, containerTokenSecretManager,
|
|
|
+ nmTokenSecretManager);
|
|
|
+ privateResourceTrackerService.init(conf);
|
|
|
+ privateResourceTrackerService.start();
|
|
|
+ resourceManager.getResourceScheduler().setRMContext(privateContext);
|
|
|
+ return privateResourceTrackerService;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Test the behaviour of the capacity scheduler when a node reconnects
|
|
|
+ * with changed capabilities. This test is to catch any race conditions
|
|
|
+ * that might occur due to the use of the RMNode object.
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ @Test
|
|
|
+ public void testNodemanagerReconnect() throws Exception {
|
|
|
+
|
|
|
+ DrainDispatcher privateDispatcher = new DrainDispatcher();
|
|
|
+ SleepHandler sleepHandler = new SleepHandler();
|
|
|
+ ResourceTrackerService privateResourceTrackerService =
|
|
|
+ getPrivateResourceTrackerService(privateDispatcher,
|
|
|
+ sleepHandler);
|
|
|
+
|
|
|
+ // Register node1
|
|
|
+ String hostname1 = "localhost1";
|
|
|
+ Resource capability = BuilderUtils.newResource(4096, 4);
|
|
|
+ RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
|
|
|
+
|
|
|
+ RegisterNodeManagerRequest request1 =
|
|
|
+ recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
|
|
|
+ NodeId nodeId1 = NodeId.newInstance(hostname1, 0);
|
|
|
+ request1.setNodeId(nodeId1);
|
|
|
+ request1.setHttpPort(0);
|
|
|
+ request1.setResource(capability);
|
|
|
+ privateResourceTrackerService.registerNodeManager(request1);
|
|
|
+ privateDispatcher.await();
|
|
|
+ Resource clusterResource = resourceManager.getResourceScheduler().getClusterResource();
|
|
|
+ Assert.assertEquals("Initial cluster resources don't match", capability,
|
|
|
+ clusterResource);
|
|
|
+
|
|
|
+ Resource newCapability = BuilderUtils.newResource(1024, 1);
|
|
|
+ RegisterNodeManagerRequest request2 =
|
|
|
+ recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
|
|
|
+ request2.setNodeId(nodeId1);
|
|
|
+ request2.setHttpPort(0);
|
|
|
+ request2.setResource(newCapability);
|
|
|
+ // hold up the disaptcher and register the same node with lower capability
|
|
|
+ sleepHandler.sleepFlag = true;
|
|
|
+ privateResourceTrackerService.registerNodeManager(request2);
|
|
|
+ privateDispatcher.await();
|
|
|
+ Assert.assertEquals("Cluster resources don't match", newCapability,
|
|
|
+ resourceManager.getResourceScheduler().getClusterResource());
|
|
|
+ privateResourceTrackerService.stop();
|
|
|
+ }
|
|
|
}
|