|
@@ -29,8 +29,11 @@ import java.io.File;
|
|
|
import java.io.FileOutputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.Collection;
|
|
|
import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
+import java.util.Iterator;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
@@ -44,15 +47,19 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
|
|
import org.apache.hadoop.net.NetUtils;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|
|
+import org.apache.hadoop.yarn.api.records.ExecutionType;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeState;
|
|
|
import org.apache.hadoop.yarn.api.records.Priority;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
+import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
+import org.apache.hadoop.yarn.event.DrainDispatcher;
|
|
|
import org.apache.hadoop.yarn.event.Event;
|
|
|
import org.apache.hadoop.yarn.event.EventDispatcher;
|
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
@@ -71,8 +78,14 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
|
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
|
import org.apache.hadoop.yarn.util.Records;
|
|
@@ -1921,6 +1934,103 @@ public class TestResourceTrackerService extends NodeLabelTestBase {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ @Test
|
|
|
+ public void testHandleOpportunisticContainerStatus() throws Exception{
|
|
|
+ final DrainDispatcher dispatcher = new DrainDispatcher();
|
|
|
+ YarnConfiguration conf = new YarnConfiguration();
|
|
|
+ conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
|
|
+ conf.setBoolean(YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED,
|
|
|
+ true);
|
|
|
+ rm = new MockRM(conf){
|
|
|
+ @Override
|
|
|
+ protected Dispatcher createDispatcher() {
|
|
|
+ return dispatcher;
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ rm.start();
|
|
|
+ RMApp app = rm.submitApp(1024, true);
|
|
|
+ ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
|
|
|
+ .getAppAttemptId();
|
|
|
+
|
|
|
+ ResourceTrackerService resourceTrackerService =
|
|
|
+ rm.getResourceTrackerService();
|
|
|
+ SchedulerApplicationAttempt applicationAttempt = null;
|
|
|
+ while (applicationAttempt == null) {
|
|
|
+ applicationAttempt =
|
|
|
+ ((AbstractYarnScheduler)rm.getRMContext().getScheduler())
|
|
|
+ .getApplicationAttempt(appAttemptId);
|
|
|
+ Thread.sleep(100);
|
|
|
+ }
|
|
|
+
|
|
|
+ Resource currentConsumption = applicationAttempt.getCurrentConsumption();
|
|
|
+ Assert.assertEquals(Resource.newInstance(0, 0), currentConsumption);
|
|
|
+ Resource allocResources =
|
|
|
+ applicationAttempt.getQueue().getMetrics().getAllocatedResources();
|
|
|
+ Assert.assertEquals(Resource.newInstance(0, 0), allocResources);
|
|
|
+
|
|
|
+ RegisterNodeManagerRequest req = Records.newRecord(
|
|
|
+ RegisterNodeManagerRequest.class);
|
|
|
+ NodeId nodeId = NodeId.newInstance("host2", 1234);
|
|
|
+ Resource capability = BuilderUtils.newResource(1024, 1);
|
|
|
+ req.setResource(capability);
|
|
|
+ req.setNodeId(nodeId);
|
|
|
+ req.setHttpPort(1234);
|
|
|
+ req.setNMVersion(YarnVersionInfo.getVersion());
|
|
|
+ ContainerId c1 = ContainerId.newContainerId(appAttemptId, 1);
|
|
|
+ ContainerId c2 = ContainerId.newContainerId(appAttemptId, 2);
|
|
|
+ ContainerId c3 = ContainerId.newContainerId(appAttemptId, 3);
|
|
|
+ NMContainerStatus queuedOpp =
|
|
|
+ NMContainerStatus.newInstance(c1, 1, ContainerState.SCHEDULED,
|
|
|
+ Resource.newInstance(1024, 1), "Dummy Queued OC",
|
|
|
+ ContainerExitStatus.INVALID, Priority.newInstance(5), 1234, "",
|
|
|
+ ExecutionType.OPPORTUNISTIC);
|
|
|
+ NMContainerStatus runningOpp =
|
|
|
+ NMContainerStatus.newInstance(c2, 1, ContainerState.RUNNING,
|
|
|
+ Resource.newInstance(2048, 1), "Dummy Running OC",
|
|
|
+ ContainerExitStatus.INVALID, Priority.newInstance(6), 1234, "",
|
|
|
+ ExecutionType.OPPORTUNISTIC);
|
|
|
+ NMContainerStatus runningGuar =
|
|
|
+ NMContainerStatus.newInstance(c3, 1, ContainerState.RUNNING,
|
|
|
+ Resource.newInstance(2048, 1), "Dummy Running GC",
|
|
|
+ ContainerExitStatus.INVALID, Priority.newInstance(6), 1234, "",
|
|
|
+ ExecutionType.GUARANTEED);
|
|
|
+ req.setContainerStatuses(Arrays.asList(queuedOpp, runningOpp, runningGuar));
|
|
|
+ // trying to register a invalid node.
|
|
|
+ RegisterNodeManagerResponse response =
|
|
|
+ resourceTrackerService.registerNodeManager(req);
|
|
|
+ dispatcher.await();
|
|
|
+ Thread.sleep(2000);
|
|
|
+ dispatcher.await();
|
|
|
+ Assert.assertEquals(NodeAction.NORMAL, response.getNodeAction());
|
|
|
+
|
|
|
+ Collection<RMContainer> liveContainers = applicationAttempt
|
|
|
+ .getLiveContainers();
|
|
|
+ Assert.assertEquals(3, liveContainers.size());
|
|
|
+ Iterator<RMContainer> iter = liveContainers.iterator();
|
|
|
+ while (iter.hasNext()) {
|
|
|
+ RMContainer rc = iter.next();
|
|
|
+ Assert.assertEquals(
|
|
|
+ rc.getContainerId().equals(c3) ?
|
|
|
+ ExecutionType.GUARANTEED : ExecutionType.OPPORTUNISTIC,
|
|
|
+ rc.getExecutionType());
|
|
|
+ }
|
|
|
+
|
|
|
+ // Should only include GUARANTEED resources
|
|
|
+ currentConsumption = applicationAttempt.getCurrentConsumption();
|
|
|
+ Assert.assertEquals(Resource.newInstance(2048, 1), currentConsumption);
|
|
|
+ allocResources =
|
|
|
+ applicationAttempt.getQueue().getMetrics().getAllocatedResources();
|
|
|
+ Assert.assertEquals(Resource.newInstance(2048, 1), allocResources);
|
|
|
+
|
|
|
+ SchedulerNode schedulerNode =
|
|
|
+ rm.getRMContext().getScheduler().getSchedulerNode(nodeId);
|
|
|
+ Assert.assertNotNull(schedulerNode);
|
|
|
+ Resource nodeResources = schedulerNode.getAllocatedResource();
|
|
|
+ Assert.assertEquals(Resource.newInstance(2048, 1), nodeResources);
|
|
|
+ }
|
|
|
+
|
|
|
@Test(timeout = 60000)
|
|
|
public void testNodeHeartBeatResponseForUnknownContainerCleanUp()
|
|
|
throws Exception {
|