|
@@ -0,0 +1,403 @@
|
|
|
+/**
|
|
|
+* Licensed to the Apache Software Foundation (ASF) under one
|
|
|
+* or more contributor license agreements. See the NOTICE file
|
|
|
+* distributed with this work for additional information
|
|
|
+* regarding copyright ownership. The ASF licenses this file
|
|
|
+* to you under the Apache License, Version 2.0 (the
|
|
|
+* "License"); you may not use this file except in compliance
|
|
|
+* with the License. You may obtain a copy of the License at
|
|
|
+*
|
|
|
+* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+*
|
|
|
+* Unless required by applicable law or agreed to in writing, software
|
|
|
+* distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
+* See the License for the specific language governing permissions and
|
|
|
+* limitations under the License.
|
|
|
+*/
|
|
|
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
|
|
|
+
|
|
|
+import static org.junit.Assert.assertEquals;
|
|
|
+import static org.junit.Assert.assertNull;
|
|
|
+import static org.mockito.Matchers.*;
|
|
|
+import static org.mockito.Mockito.*;
|
|
|
+
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.List;
|
|
|
+
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
+import org.apache.hadoop.conf.Configuration;
|
|
|
+import org.apache.hadoop.yarn.MockApps;
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
|
+import org.apache.hadoop.yarn.api.records.Container;
|
|
|
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
|
+import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
|
|
+import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.AMLauncherEventType;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptRejectedEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
|
|
+import org.junit.After;
|
|
|
+import org.junit.Before;
|
|
|
+import org.junit.Test;
|
|
|
+
|
|
|
+public class TestRMAppAttemptTransitions {
|
|
|
+
|
|
|
+ private static final Log LOG =
|
|
|
+ LogFactory.getLog(TestRMAppAttemptTransitions.class);
|
|
|
+
|
|
|
+ private static final String EMPTY_DIAGNOSTICS = "";
|
|
|
+
|
|
|
+ private RMContext rmContext;
|
|
|
+ private YarnScheduler scheduler;
|
|
|
+ private ApplicationMasterService masterService;
|
|
|
+ private ApplicationMasterLauncher applicationMasterLauncher;
|
|
|
+
|
|
|
+ private RMApp application;
|
|
|
+ private RMAppAttempt applicationAttempt;
|
|
|
+
|
|
|
+ private final class TestApplicationAttemptEventDispatcher implements
|
|
|
+ EventHandler<RMAppAttemptEvent> {
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void handle(RMAppAttemptEvent event) {
|
|
|
+ ApplicationAttemptId appID = event.getApplicationAttemptId();
|
|
|
+ assertEquals(applicationAttempt.getAppAttemptId(), appID);
|
|
|
+ try {
|
|
|
+ applicationAttempt.handle(event);
|
|
|
+ } catch (Throwable t) {
|
|
|
+ LOG.error("Error in handling event type " + event.getType()
|
|
|
+ + " for application " + appID, t);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // handle all the RM application events - same as in ResourceManager.java
|
|
|
+ private final class TestApplicationEventDispatcher implements
|
|
|
+ EventHandler<RMAppEvent> {
|
|
|
+ @Override
|
|
|
+ public void handle(RMAppEvent event) {
|
|
|
+ assertEquals(application.getApplicationId(), event.getApplicationId());
|
|
|
+ try {
|
|
|
+ application.handle(event);
|
|
|
+ } catch (Throwable t) {
|
|
|
+ LOG.error("Error in handling event type " + event.getType()
|
|
|
+ + " for application " + application.getApplicationId(), t);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private final class TestSchedulerEventDispatcher implements
|
|
|
+ EventHandler<SchedulerEvent> {
|
|
|
+ @Override
|
|
|
+ public void handle(SchedulerEvent event) {
|
|
|
+ scheduler.handle(event);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private final class TestAMLauncherEventDispatcher implements
|
|
|
+ EventHandler<AMLauncherEvent> {
|
|
|
+ @Override
|
|
|
+ public void handle(AMLauncherEvent event) {
|
|
|
+ applicationMasterLauncher.handle(event);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private static int appId = 1;
|
|
|
+
|
|
|
+ @Before
|
|
|
+ public void setUp() throws Exception {
|
|
|
+ InlineDispatcher rmDispatcher = new InlineDispatcher();
|
|
|
+
|
|
|
+ ContainerAllocationExpirer containerAllocationExpirer =
|
|
|
+ mock(ContainerAllocationExpirer.class);
|
|
|
+ AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
|
|
|
+ rmContext = new RMContextImpl(new MemStore(), rmDispatcher,
|
|
|
+ containerAllocationExpirer, amLivelinessMonitor);
|
|
|
+
|
|
|
+ scheduler = mock(YarnScheduler.class);
|
|
|
+ masterService = mock(ApplicationMasterService.class);
|
|
|
+ applicationMasterLauncher = mock(ApplicationMasterLauncher.class);
|
|
|
+
|
|
|
+ rmDispatcher.register(RMAppAttemptEventType.class,
|
|
|
+ new TestApplicationAttemptEventDispatcher());
|
|
|
+
|
|
|
+ rmDispatcher.register(RMAppEventType.class,
|
|
|
+ new TestApplicationEventDispatcher());
|
|
|
+
|
|
|
+ rmDispatcher.register(SchedulerEventType.class,
|
|
|
+ new TestSchedulerEventDispatcher());
|
|
|
+
|
|
|
+ rmDispatcher.register(AMLauncherEventType.class,
|
|
|
+ new TestAMLauncherEventDispatcher());
|
|
|
+
|
|
|
+ rmDispatcher.init(new Configuration());
|
|
|
+ rmDispatcher.start();
|
|
|
+
|
|
|
+
|
|
|
+ ApplicationId applicationId = MockApps.newAppID(appId++);
|
|
|
+ ApplicationAttemptId applicationAttemptId =
|
|
|
+ MockApps.newAppAttemptID(applicationId, 0);
|
|
|
+
|
|
|
+ final String user = MockApps.newUserName();
|
|
|
+ final String queue = MockApps.newQueue();
|
|
|
+ ApplicationSubmissionContext submissionContext =
|
|
|
+ mock(ApplicationSubmissionContext.class);
|
|
|
+ when(submissionContext.getUser()).thenReturn(user);
|
|
|
+ when(submissionContext.getQueue()).thenReturn(queue);
|
|
|
+ ContainerLaunchContext amContainerSpec = mock(ContainerLaunchContext.class);
|
|
|
+ Resource resource = mock(Resource.class);
|
|
|
+ when(amContainerSpec.getResource()).thenReturn(resource);
|
|
|
+ when(submissionContext.getAMContainerSpec()).thenReturn(amContainerSpec);
|
|
|
+
|
|
|
+ application = mock(RMApp.class);
|
|
|
+ applicationAttempt =
|
|
|
+ new RMAppAttemptImpl(applicationAttemptId, null, rmContext, scheduler,
|
|
|
+ masterService, submissionContext);
|
|
|
+ when(application.getCurrentAppAttempt()).thenReturn(applicationAttempt);
|
|
|
+ when(application.getApplicationId()).thenReturn(applicationId);
|
|
|
+
|
|
|
+ testAppAttemptNewState();
|
|
|
+ }
|
|
|
+
|
|
|
+ @After
|
|
|
+ public void tearDown() throws Exception {
|
|
|
+ ((AsyncDispatcher)this.rmContext.getDispatcher()).stop();
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * {@link RMAppAttemptState#NEW}
|
|
|
+ */
|
|
|
+ private void testAppAttemptNewState() {
|
|
|
+ assertEquals(RMAppAttemptState.NEW,
|
|
|
+ applicationAttempt.getAppAttemptState());
|
|
|
+ assertEquals(0, applicationAttempt.getDiagnostics().length());
|
|
|
+ assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
|
|
|
+ assertNull(applicationAttempt.getMasterContainer());
|
|
|
+ assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
|
|
|
+ assertEquals(0, applicationAttempt.getRanNodes().size());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * {@link RMAppAttemptState#SUBMITTED}
|
|
|
+ */
|
|
|
+ private void testAppAttemptSubmittedState() {
|
|
|
+ assertEquals(RMAppAttemptState.SUBMITTED,
|
|
|
+ applicationAttempt.getAppAttemptState());
|
|
|
+ assertEquals(0, applicationAttempt.getDiagnostics().length());
|
|
|
+ assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
|
|
|
+ assertNull(applicationAttempt.getMasterContainer());
|
|
|
+ assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
|
|
|
+ assertEquals(0, applicationAttempt.getRanNodes().size());
|
|
|
+
|
|
|
+ // Check events
|
|
|
+ verify(masterService).
|
|
|
+ registerAppAttempt(applicationAttempt.getAppAttemptId());
|
|
|
+ verify(scheduler).handle(any(AppAddedSchedulerEvent.class));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * {@link RMAppAttemptState#SUBMITTED} -> {@link RMAppAttemptState#FAILED}
|
|
|
+ */
|
|
|
+ private void testAppAttemptSubmittedToFailedState(String diagnostics) {
|
|
|
+ assertEquals(RMAppAttemptState.FAILED,
|
|
|
+ applicationAttempt.getAppAttemptState());
|
|
|
+ assertEquals(diagnostics, applicationAttempt.getDiagnostics());
|
|
|
+ assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
|
|
|
+ assertNull(applicationAttempt.getMasterContainer());
|
|
|
+ assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
|
|
|
+ assertEquals(0, applicationAttempt.getRanNodes().size());
|
|
|
+
|
|
|
+ // Check events
|
|
|
+ verify(application).handle(any(RMAppRejectedEvent.class));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * {@link RMAppAttemptState#KILLED}
|
|
|
+ */
|
|
|
+ private void testAppAttemptKilledState(Container amContainer,
|
|
|
+ String diagnostics) {
|
|
|
+ assertEquals(RMAppAttemptState.KILLED,
|
|
|
+ applicationAttempt.getAppAttemptState());
|
|
|
+ assertEquals(diagnostics, applicationAttempt.getDiagnostics());
|
|
|
+ assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
|
|
|
+ assertEquals(amContainer, applicationAttempt.getMasterContainer());
|
|
|
+ assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
|
|
|
+ assertEquals(0, applicationAttempt.getRanNodes().size());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * {@link RMAppAttemptState#SCHEDULED}
|
|
|
+ */
|
|
|
+ private void testAppAttemptScheduledState() {
|
|
|
+ assertEquals(RMAppAttemptState.SCHEDULED,
|
|
|
+ applicationAttempt.getAppAttemptState());
|
|
|
+ assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
|
|
|
+ assertNull(applicationAttempt.getMasterContainer());
|
|
|
+ assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
|
|
|
+ assertEquals(0, applicationAttempt.getRanNodes().size());
|
|
|
+
|
|
|
+ // Check events
|
|
|
+ verify(application).handle(any(RMAppEvent.class));
|
|
|
+ verify(scheduler).
|
|
|
+ allocate(any(ApplicationAttemptId.class),
|
|
|
+ any(List.class), any(List.class));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * {@link RMAppAttemptState#ALLOCATED}
|
|
|
+ */
|
|
|
+ private void testAppAttemptAllocatedState(Container amContainer) {
|
|
|
+ assertEquals(RMAppAttemptState.ALLOCATED,
|
|
|
+ applicationAttempt.getAppAttemptState());
|
|
|
+ assertEquals(amContainer, applicationAttempt.getMasterContainer());
|
|
|
+
|
|
|
+ // Check events
|
|
|
+ verify(applicationMasterLauncher).handle(any(AMLauncherEvent.class));
|
|
|
+ verify(scheduler, times(2)).
|
|
|
+ allocate(
|
|
|
+ any(ApplicationAttemptId.class), any(List.class), any(List.class));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * {@link RMAppAttemptState#FAILED}
|
|
|
+ */
|
|
|
+ private void testAppAttemptFailedState(Container container,
|
|
|
+ String diagnostics) {
|
|
|
+ assertEquals(RMAppAttemptState.FAILED,
|
|
|
+ applicationAttempt.getAppAttemptState());
|
|
|
+ assertEquals(diagnostics, applicationAttempt.getDiagnostics());
|
|
|
+ assertEquals(0,applicationAttempt.getJustFinishedContainers().size());
|
|
|
+ assertEquals(container, applicationAttempt.getMasterContainer());
|
|
|
+ assertEquals(0.0, (double)applicationAttempt.getProgress(), 0.0001);
|
|
|
+ assertEquals(0, applicationAttempt.getRanNodes().size());
|
|
|
+
|
|
|
+ // Check events
|
|
|
+ verify(application, times(2)).handle(any(RMAppFailedAttemptEvent.class));
|
|
|
+ }
|
|
|
+
|
|
|
+ private void submitApplicationAttempt() {
|
|
|
+ ApplicationAttemptId appAttemptId = applicationAttempt.getAppAttemptId();
|
|
|
+ applicationAttempt.handle(
|
|
|
+ new RMAppAttemptEvent(appAttemptId, RMAppAttemptEventType.START));
|
|
|
+ testAppAttemptSubmittedState();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void scheduleApplicationAttempt() {
|
|
|
+ submitApplicationAttempt();
|
|
|
+ applicationAttempt.handle(
|
|
|
+ new RMAppAttemptEvent(
|
|
|
+ applicationAttempt.getAppAttemptId(),
|
|
|
+ RMAppAttemptEventType.APP_ACCEPTED));
|
|
|
+ testAppAttemptScheduledState();
|
|
|
+ }
|
|
|
+
|
|
|
+ private Container allocateApplicationAttempt() {
|
|
|
+ scheduleApplicationAttempt();
|
|
|
+
|
|
|
+ // Mock the allocation of AM container
|
|
|
+ Container container = mock(Container.class);
|
|
|
+ Allocation allocation = mock(Allocation.class);
|
|
|
+ when(allocation.getContainers()).
|
|
|
+ thenReturn(Collections.singletonList(container));
|
|
|
+ when(
|
|
|
+ scheduler.allocate(
|
|
|
+ any(ApplicationAttemptId.class),
|
|
|
+ any(List.class),
|
|
|
+ any(List.class))).
|
|
|
+ thenReturn(allocation);
|
|
|
+
|
|
|
+ applicationAttempt.handle(
|
|
|
+ new RMAppAttemptContainerAllocatedEvent(
|
|
|
+ applicationAttempt.getAppAttemptId(),
|
|
|
+ container));
|
|
|
+
|
|
|
+ testAppAttemptAllocatedState(container);
|
|
|
+
|
|
|
+ return container;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testNewToKilled() {
|
|
|
+ applicationAttempt.handle(
|
|
|
+ new RMAppAttemptEvent(
|
|
|
+ applicationAttempt.getAppAttemptId(),
|
|
|
+ RMAppAttemptEventType.KILL));
|
|
|
+ testAppAttemptKilledState(null, EMPTY_DIAGNOSTICS);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testSubmittedToFailed() {
|
|
|
+ submitApplicationAttempt();
|
|
|
+ String message = "Rejected";
|
|
|
+ applicationAttempt.handle(
|
|
|
+ new RMAppAttemptRejectedEvent(
|
|
|
+ applicationAttempt.getAppAttemptId(), message));
|
|
|
+ testAppAttemptSubmittedToFailedState(message);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testSubmittedToKilled() {
|
|
|
+ submitApplicationAttempt();
|
|
|
+ applicationAttempt.handle(
|
|
|
+ new RMAppAttemptEvent(
|
|
|
+ applicationAttempt.getAppAttemptId(),
|
|
|
+ RMAppAttemptEventType.KILL));
|
|
|
+ testAppAttemptKilledState(null, EMPTY_DIAGNOSTICS);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testScheduledToKilled() {
|
|
|
+ scheduleApplicationAttempt();
|
|
|
+ applicationAttempt.handle(
|
|
|
+ new RMAppAttemptEvent(
|
|
|
+ applicationAttempt.getAppAttemptId(),
|
|
|
+ RMAppAttemptEventType.KILL));
|
|
|
+ testAppAttemptKilledState(null, EMPTY_DIAGNOSTICS);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testAllocatedToKilled() {
|
|
|
+ Container amContainer = allocateApplicationAttempt();
|
|
|
+ applicationAttempt.handle(
|
|
|
+ new RMAppAttemptEvent(
|
|
|
+ applicationAttempt.getAppAttemptId(),
|
|
|
+ RMAppAttemptEventType.KILL));
|
|
|
+ testAppAttemptKilledState(amContainer, EMPTY_DIAGNOSTICS);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Test
|
|
|
+ public void testAllocatedToFailed() {
|
|
|
+ Container amContainer = allocateApplicationAttempt();
|
|
|
+ String diagnostics = "Launch Failed";
|
|
|
+ applicationAttempt.handle(
|
|
|
+ new RMAppAttemptLaunchFailedEvent(
|
|
|
+ applicationAttempt.getAppAttemptId(),
|
|
|
+ diagnostics));
|
|
|
+ testAppAttemptFailedState(amContainer, diagnostics);
|
|
|
+ }
|
|
|
+
|
|
|
+}
|