|
@@ -0,0 +1,148 @@
|
|
|
+/**
|
|
|
+ * Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+ * or more contributor license agreements. See the NOTICE file
|
|
|
+ * distributed with this work for additional information
|
|
|
+ * regarding copyright ownership. The ASF licenses this file
|
|
|
+ * to you under the Apache License, Version 2.0 (the
|
|
|
+ * "License"); you may not use this file except in compliance
|
|
|
+ * with the License. You may obtain a copy of the License at
|
|
|
+ *
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ *
|
|
|
+ * Unless required by applicable law or agreed to in writing, software
|
|
|
+ * distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+ * See the License for the specific language governing permissions and
|
|
|
+ * limitations under the License.
|
|
|
+ */
|
|
|
+package org.apache.hadoop.yarn.server.resourcemanager;
|
|
|
+
|
|
|
+import static org.mockito.Matchers.any;
|
|
|
+import static org.mockito.Mockito.doAnswer;
|
|
|
+import static org.mockito.Mockito.doReturn;
|
|
|
+import static org.mockito.Mockito.mock;
|
|
|
+import static org.mockito.Mockito.verify;
|
|
|
+
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.List;
|
|
|
+
|
|
|
+import junit.framework.Assert;
|
|
|
+
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|
|
+import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
|
|
|
+import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
+import org.apache.hadoop.yarn.server.api.records.HeartbeatResponse;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
|
|
+import org.apache.hadoop.yarn.util.BuilderUtils;
|
|
|
+import org.junit.After;
|
|
|
+import org.junit.Before;
|
|
|
+import org.junit.Test;
|
|
|
+import org.mockito.invocation.InvocationOnMock;
|
|
|
+import org.mockito.stubbing.Answer;
|
|
|
+
|
|
|
+public class TestRMNodeTransitions {
|
|
|
+
|
|
|
+ RMNodeImpl node;
|
|
|
+
|
|
|
+ private RMContext rmContext;
|
|
|
+ private YarnScheduler scheduler;
|
|
|
+
|
|
|
+ private SchedulerEventType eventType;
|
|
|
+ private List<ContainerStatus> completedContainers;
|
|
|
+
|
|
|
+ private final class TestSchedulerEventDispatcher implements
|
|
|
+ EventHandler<SchedulerEvent> {
|
|
|
+ @Override
|
|
|
+ public void handle(SchedulerEvent event) {
|
|
|
+ scheduler.handle(event);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Before
|
|
|
+ public void setUp() throws Exception {
|
|
|
+ InlineDispatcher rmDispatcher = new InlineDispatcher();
|
|
|
+
|
|
|
+ rmContext =
|
|
|
+ new RMContextImpl(new MemStore(), rmDispatcher, null, null, null);
|
|
|
+ scheduler = mock(YarnScheduler.class);
|
|
|
+ doAnswer(
|
|
|
+ new Answer<Void>() {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Void answer(InvocationOnMock invocation) throws Throwable {
|
|
|
+ final SchedulerEvent event = (SchedulerEvent)(invocation.getArguments()[0]);
|
|
|
+ eventType = event.getType();
|
|
|
+ if (eventType == SchedulerEventType.NODE_UPDATE) {
|
|
|
+ completedContainers =
|
|
|
+ ((NodeUpdateSchedulerEvent)event).getCompletedContainers();
|
|
|
+ } else {
|
|
|
+ completedContainers = null;
|
|
|
+ }
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ ).when(scheduler).handle(any(SchedulerEvent.class));
|
|
|
+
|
|
|
+ rmDispatcher.register(SchedulerEventType.class,
|
|
|
+ new TestSchedulerEventDispatcher());
|
|
|
+
|
|
|
+
|
|
|
+ node = new RMNodeImpl(null, rmContext, null, 0, 0, null, null);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ @After
|
|
|
+ public void tearDown() throws Exception {
|
|
|
+ }
|
|
|
+
|
|
|
+ private RMNodeStatusEvent getMockRMNodeStatusEvent() {
|
|
|
+ HeartbeatResponse response = mock(HeartbeatResponse.class);
|
|
|
+
|
|
|
+ NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
|
|
|
+ Boolean yes = new Boolean(true);
|
|
|
+ doReturn(yes).when(healthStatus).getIsNodeHealthy();
|
|
|
+
|
|
|
+ RMNodeStatusEvent event = mock(RMNodeStatusEvent.class);
|
|
|
+ doReturn(healthStatus).when(event).getNodeHealthStatus();
|
|
|
+ doReturn(response).when(event).getLatestResponse();
|
|
|
+ doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
|
|
|
+ return event;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testExpiredContainer() {
|
|
|
+ // Start the node
|
|
|
+ node.handle(new RMNodeEvent(null, RMNodeEventType.STARTED));
|
|
|
+ verify(scheduler).handle(any(NodeAddedSchedulerEvent.class));
|
|
|
+
|
|
|
+ // Expire a container
|
|
|
+ ContainerId completedContainerId = BuilderUtils.newContainerId(
|
|
|
+ BuilderUtils.newApplicationAttemptId(
|
|
|
+ BuilderUtils.newApplicationId(0, 0), 0), 0);
|
|
|
+ node.handle(new RMNodeCleanContainerEvent(null, completedContainerId));
|
|
|
+ Assert.assertEquals(1, node.getContainersToCleanUp().size());
|
|
|
+
|
|
|
+ // Now verify that scheduler isn't notified of an expired container
|
|
|
+ // by checking number of 'completedContainers' it got in the previous event
|
|
|
+ RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent();
|
|
|
+ ContainerStatus containerStatus = mock(ContainerStatus.class);
|
|
|
+ doReturn(completedContainerId).when(containerStatus).getContainerId();
|
|
|
+ doReturn(Collections.singletonList(containerStatus)).
|
|
|
+ when(statusEvent).getContainers();
|
|
|
+ node.handle(statusEvent);
|
|
|
+ Assert.assertEquals(0, completedContainers.size());
|
|
|
+ }
|
|
|
+
|
|
|
+}
|