|
@@ -0,0 +1,251 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+
|
|
|
+package org.apache.hadoop.yarn.server.resourcemanager;
|
|
|
+
|
|
|
+import java.util.List;
|
|
|
+
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
+import org.apache.hadoop.yarn.api.records.Container;
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerState;
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|
|
+import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
+import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
+import org.apache.hadoop.yarn.event.DrainDispatcher;
|
|
|
+import org.apache.hadoop.yarn.event.EventDispatcher;
|
|
|
+import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager.TestAMRestart;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
|
|
+import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
|
|
+import org.apache.hadoop.yarn.util.resource.Resources;
|
|
|
+import org.junit.Assert;
|
|
|
+import org.junit.Test;
|
|
|
+
|
|
|
+/**
|
|
|
+ * Validate system behavior when the am-scheduling logic 'blacklists' a node for
|
|
|
+ * an application because of AM failures.
|
|
|
+ */
|
|
|
+public class TestNodeBlacklistingOnAMFailures {
|
|
|
+
|
|
|
+ @Test(timeout = 100000)
|
|
|
+ public void testNodeBlacklistingOnAMFailure() throws Exception {
|
|
|
+
|
|
|
+ YarnConfiguration conf = new YarnConfiguration();
|
|
|
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
|
|
+ ResourceScheduler.class);
|
|
|
+ conf.setBoolean(YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED,
|
|
|
+ true);
|
|
|
+
|
|
|
+ DrainDispatcher dispatcher = new DrainDispatcher();
|
|
|
+ MockRM rm = startRM(conf, dispatcher);
|
|
|
+ CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
|
|
|
+
|
|
|
+ MockNM nm1 =
|
|
|
+ new MockNM("127.0.0.1:1234", 8000, rm.getResourceTrackerService());
|
|
|
+ nm1.registerNode();
|
|
|
+
|
|
|
+ MockNM nm2 =
|
|
|
+ new MockNM("127.0.0.2:2345", 8000, rm.getResourceTrackerService());
|
|
|
+ nm2.registerNode();
|
|
|
+
|
|
|
+ RMApp app = rm.submitApp(200);
|
|
|
+
|
|
|
+ MockAM am1 = MockRM.launchAndRegisterAM(app, rm, nm1);
|
|
|
+ ContainerId amContainerId =
|
|
|
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
|
|
|
+ RMContainer rmContainer = scheduler.getRMContainer(amContainerId);
|
|
|
+ NodeId nodeWhereAMRan = rmContainer.getAllocatedNode();
|
|
|
+
|
|
|
+ MockNM currentNode, otherNode;
|
|
|
+ if (nodeWhereAMRan.equals(nm1.getNodeId())) {
|
|
|
+ currentNode = nm1;
|
|
|
+ otherNode = nm2;
|
|
|
+ } else {
|
|
|
+ currentNode = nm2;
|
|
|
+ otherNode = nm1;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Set the exist status to INVALID so that we can verify that the system
|
|
|
+ // automatically blacklisting the node
|
|
|
+ makeAMContainerExit(rm, amContainerId, currentNode,
|
|
|
+ ContainerExitStatus.INVALID);
|
|
|
+
|
|
|
+ // restart the am
|
|
|
+ RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app, rm);
|
|
|
+ System.out.println("New AppAttempt launched " + attempt.getAppAttemptId());
|
|
|
+
|
|
|
+ // Try the current node a few times
|
|
|
+ for (int i = 0; i <= 2; i++) {
|
|
|
+ currentNode.nodeHeartbeat(true);
|
|
|
+ dispatcher.await();
|
|
|
+
|
|
|
+ Assert.assertEquals(
|
|
|
+ "AppAttemptState should still be SCHEDULED if currentNode is "
|
|
|
+ + "blacklisted correctly", RMAppAttemptState.SCHEDULED,
|
|
|
+ attempt.getAppAttemptState());
|
|
|
+ }
|
|
|
+
|
|
|
+ // Now try the other node
|
|
|
+ otherNode.nodeHeartbeat(true);
|
|
|
+ dispatcher.await();
|
|
|
+
|
|
|
+ // Now the AM container should be allocated
|
|
|
+ MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000);
|
|
|
+
|
|
|
+ MockAM am2 = rm.sendAMLaunched(attempt.getAppAttemptId());
|
|
|
+ rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
|
|
|
+ amContainerId =
|
|
|
+ ContainerId.newContainerId(am2.getApplicationAttemptId(), 1);
|
|
|
+ rmContainer = scheduler.getRMContainer(amContainerId);
|
|
|
+ nodeWhereAMRan = rmContainer.getAllocatedNode();
|
|
|
+
|
|
|
+ // The other node should now receive the assignment
|
|
|
+ Assert.assertEquals(
|
|
|
+ "After blacklisting, AM should have run on the other node",
|
|
|
+ otherNode.getNodeId(), nodeWhereAMRan);
|
|
|
+
|
|
|
+ am2.registerAppAttempt();
|
|
|
+ rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
|
|
|
+
|
|
|
+ List<Container> allocatedContainers =
|
|
|
+ TestAMRestart.allocateContainers(currentNode, am2, 1);
|
|
|
+ Assert.assertEquals(
|
|
|
+ "Even though AM is blacklisted from the node, application can "
|
|
|
+ + "still allocate non-AM containers there",
|
|
|
+ currentNode.getNodeId(), allocatedContainers.get(0).getNodeId());
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test(timeout = 100000)
|
|
|
+ public void testNoBlacklistingForNonSystemErrors() throws Exception {
|
|
|
+
|
|
|
+ YarnConfiguration conf = new YarnConfiguration();
|
|
|
+ conf.setBoolean(YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_ENABLED,
|
|
|
+ true);
|
|
|
+ // disable the float so it is possible to blacklist the entire cluster
|
|
|
+ conf.setFloat(
|
|
|
+ YarnConfiguration.AM_SCHEDULING_NODE_BLACKLISTING_DISABLE_THRESHOLD,
|
|
|
+ 1.5f);
|
|
|
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 100);
|
|
|
+
|
|
|
+ DrainDispatcher dispatcher = new DrainDispatcher();
|
|
|
+ MockRM rm = startRM(conf, dispatcher);
|
|
|
+
|
|
|
+ MockNM node =
|
|
|
+ new MockNM("127.0.0.1:1234", 8000, rm.getResourceTrackerService());
|
|
|
+ node.registerNode();
|
|
|
+
|
|
|
+ RMApp app = rm.submitApp(200);
|
|
|
+ ApplicationId appId = app.getApplicationId();
|
|
|
+
|
|
|
+ int numAppAttempts = 1;
|
|
|
+
|
|
|
+ // Now the AM container should be allocated
|
|
|
+ RMAppAttempt attempt = MockRM.waitForAttemptScheduled(app, rm);
|
|
|
+ node.nodeHeartbeat(true);
|
|
|
+ dispatcher.await();
|
|
|
+ MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000);
|
|
|
+ rm.sendAMLaunched(attempt.getAppAttemptId());
|
|
|
+ rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
|
|
|
+ ApplicationAttemptId appAttemptId =
|
|
|
+ ApplicationAttemptId.newInstance(appId, numAppAttempts);
|
|
|
+ ContainerId amContainerId = ContainerId.newContainerId(appAttemptId, 1);
|
|
|
+
|
|
|
+ for (int containerExitStatus : new int[] {
|
|
|
+ ContainerExitStatus.PREEMPTED,
|
|
|
+ ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
|
|
|
+ // ContainerExitStatus.KILLED_BY_APPMASTER,
|
|
|
+ ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
|
|
|
+ ContainerExitStatus.ABORTED, ContainerExitStatus.DISKS_FAILED,
|
|
|
+ ContainerExitStatus.KILLED_EXCEEDED_VMEM,
|
|
|
+ ContainerExitStatus.KILLED_EXCEEDED_PMEM }) {
|
|
|
+
|
|
|
+ // Set the exist status to be containerExitStatus so that we can verify
|
|
|
+ // that the system automatically blacklisting the node
|
|
|
+ makeAMContainerExit(rm, amContainerId, node, containerExitStatus);
|
|
|
+
|
|
|
+ // restart the am
|
|
|
+ attempt = MockRM.waitForAttemptScheduled(app, rm);
|
|
|
+ System.out
|
|
|
+ .println("New AppAttempt launched " + attempt.getAppAttemptId());
|
|
|
+
|
|
|
+ node.nodeHeartbeat(true);
|
|
|
+ dispatcher.await();
|
|
|
+
|
|
|
+ MockRM.waitForState(attempt, RMAppAttemptState.ALLOCATED, 20000);
|
|
|
+ rm.sendAMLaunched(attempt.getAppAttemptId());
|
|
|
+ rm.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.LAUNCHED);
|
|
|
+
|
|
|
+ numAppAttempts++;
|
|
|
+ appAttemptId = ApplicationAttemptId.newInstance(appId, numAppAttempts);
|
|
|
+ amContainerId = ContainerId.newContainerId(appAttemptId, 1);
|
|
|
+ rm.waitForState(node, amContainerId, RMContainerState.ACQUIRED);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void makeAMContainerExit(MockRM rm, ContainerId amContainer,
|
|
|
+ MockNM node, int exitStatus) throws Exception, InterruptedException {
|
|
|
+ ContainerStatus containerStatus =
|
|
|
+ BuilderUtils.newContainerStatus(amContainer, ContainerState.COMPLETE,
|
|
|
+ "", exitStatus, Resources.createResource(200));
|
|
|
+ node.containerStatus(containerStatus);
|
|
|
+ ApplicationAttemptId amAttemptID = amContainer.getApplicationAttemptId();
|
|
|
+ rm.waitForState(amAttemptID, RMAppAttemptState.FAILED);
|
|
|
+ rm.waitForState(amAttemptID.getApplicationId(), RMAppState.ACCEPTED);
|
|
|
+ }
|
|
|
+
|
|
|
+ private MockRM startRM(YarnConfiguration conf,
|
|
|
+ final DrainDispatcher dispatcher) {
|
|
|
+
|
|
|
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
+ memStore.init(conf);
|
|
|
+
|
|
|
+ MockRM rm1 = new MockRM(conf, memStore) {
|
|
|
+ @Override
|
|
|
+ protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
|
|
|
+ return new EventDispatcher<SchedulerEvent>(this.scheduler,
|
|
|
+ this.scheduler.getClass().getName()) {
|
|
|
+ @Override
|
|
|
+ public void handle(SchedulerEvent event) {
|
|
|
+ super.handle(event);
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected Dispatcher createDispatcher() {
|
|
|
+ return dispatcher;
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ rm1.start();
|
|
|
+ return rm1;
|
|
|
+ }
|
|
|
+}
|