|
@@ -18,6 +18,10 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
|
|
|
|
|
+import static org.mockito.Matchers.argThat;
|
|
|
+import static org.mockito.Mockito.doNothing;
|
|
|
+import static org.mockito.Mockito.spy;
|
|
|
+
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.EnumSet;
|
|
|
import java.util.HashMap;
|
|
@@ -33,6 +37,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
|
|
import org.apache.hadoop.yarn.api.records.Container;
|
|
@@ -44,9 +49,17 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
|
import org.apache.hadoop.yarn.api.records.Token;
|
|
|
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
+import org.apache.hadoop.yarn.event.AbstractEvent;
|
|
|
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
|
|
+import org.apache.hadoop.yarn.event.Dispatcher;
|
|
|
+import org.apache.hadoop.yarn.event.EventHandler;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
|
|
@@ -54,7 +67,9 @@ import org.apache.log4j.Level;
|
|
|
import org.apache.log4j.LogManager;
|
|
|
import org.apache.log4j.Logger;
|
|
|
import org.junit.Test;
|
|
|
+import org.mockito.ArgumentMatcher;
|
|
|
|
|
|
+@SuppressWarnings({"unchecked", "rawtypes"})
|
|
|
public class TestRM {
|
|
|
|
|
|
private static final Log LOG = LogFactory.getLog(TestRM.class);
|
|
@@ -397,19 +412,19 @@ public class TestRM {
|
|
|
MockNM nm1 =
|
|
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
|
|
nm1.registerNode();
|
|
|
- MockAM am1 = MockRM.launchAM(app1, rm1, nm1);
|
|
|
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
|
|
MockRM.finishApplicationMaster(app1, rm1, nm1, am1);
|
|
|
|
|
|
// a failed app
|
|
|
RMApp app2 = rm1.submitApp(200);
|
|
|
- MockAM am2 = MockRM.launchAM(app2, rm1, nm1);
|
|
|
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
|
|
|
nm1.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
|
|
am2.waitForState(RMAppAttemptState.FAILED);
|
|
|
rm1.waitForState(app2.getApplicationId(), RMAppState.FAILED);
|
|
|
|
|
|
// a killed app
|
|
|
RMApp app3 = rm1.submitApp(200);
|
|
|
- MockAM am3 = MockRM.launchAM(app3, rm1, nm1);
|
|
|
+ MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1);
|
|
|
rm1.killApp(app3.getApplicationId());
|
|
|
rm1.waitForState(app3.getApplicationId(), RMAppState.KILLED);
|
|
|
rm1.waitForState(am3.getApplicationAttemptId(), RMAppAttemptState.KILLED);
|
|
@@ -449,7 +464,7 @@ public class TestRM {
|
|
|
|
|
|
// a failed app
|
|
|
RMApp app2 = rm1.submitApp(200);
|
|
|
- MockAM am2 = MockRM.launchAM(app2, rm1, nm1);
|
|
|
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
|
|
|
nm1
|
|
|
.nodeHeartbeat(am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
|
|
am2.waitForState(RMAppAttemptState.FAILED);
|
|
@@ -466,10 +481,88 @@ public class TestRM {
|
|
|
Assert.assertEquals(-1, report1.getRpcPort());
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Validate killing an application when it is at accepted state.
|
|
|
+ * @throws Exception exception
|
|
|
+ */
|
|
|
+ @Test (timeout = 60000)
|
|
|
+ public void testApplicationKillAtAcceptedState() throws Exception {
|
|
|
+
|
|
|
+ YarnConfiguration conf = new YarnConfiguration();
|
|
|
+ final Dispatcher dispatcher = new AsyncDispatcher() {
|
|
|
+ @Override
|
|
|
+ public EventHandler getEventHandler() {
|
|
|
+
|
|
|
+ class EventArgMatcher extends ArgumentMatcher<AbstractEvent> {
|
|
|
+ @Override
|
|
|
+ public boolean matches(Object argument) {
|
|
|
+ if (argument instanceof RMAppAttemptEvent) {
|
|
|
+ if (((RMAppAttemptEvent) argument).getType().equals(
|
|
|
+ RMAppAttemptEventType.KILL)) {
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ EventHandler handler = spy(super.getEventHandler());
|
|
|
+ doNothing().when(handler).handle(argThat(new EventArgMatcher()));
|
|
|
+ return handler;
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ MockRM rm = new MockRM(conf) {
|
|
|
+ @Override
|
|
|
+ protected Dispatcher createDispatcher() {
|
|
|
+ return dispatcher;
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ rm.start();
|
|
|
+ MockNM nm1 =
|
|
|
+ new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
|
|
|
+ nm1.registerNode();
|
|
|
+
|
|
|
+ // a failed app
|
|
|
+ RMApp application = rm.submitApp(200);
|
|
|
+ MockAM am = MockRM.launchAM(application, rm, nm1);
|
|
|
+ am.waitForState(RMAppAttemptState.LAUNCHED);
|
|
|
+ nm1.nodeHeartbeat(am.getApplicationAttemptId(), 1, ContainerState.RUNNING);
|
|
|
+ rm.waitForState(application.getApplicationId(), RMAppState.ACCEPTED);
|
|
|
+
|
|
|
+ // Now kill the application before new attempt is launched, the app report
|
|
|
+ // returns the invalid AM host and port.
|
|
|
+ KillApplicationRequest request =
|
|
|
+ KillApplicationRequest.newInstance(application.getApplicationId());
|
|
|
+ rm.getClientRMService().forceKillApplication(request);
|
|
|
+
|
|
|
+ // Specific test for YARN-1689 follows
|
|
|
+ // Now let's say a race causes AM to register now. This should not crash RM.
|
|
|
+ am.registerAppAttempt(false);
|
|
|
+
|
|
|
+ // We explicitly intercepted the kill-event to RMAppAttempt, so app should
|
|
|
+ // still be in KILLING state.
|
|
|
+ rm.waitForState(application.getApplicationId(), RMAppState.KILLING);
|
|
|
+ // AM should now be in running
|
|
|
+ rm.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
|
|
|
+
|
|
|
+ // Simulate that appAttempt is killed.
|
|
|
+ rm.getRMContext().getDispatcher().getEventHandler().handle(
|
|
|
+ new RMAppEvent(application.getApplicationId(),
|
|
|
+ RMAppEventType.ATTEMPT_KILLED));
|
|
|
+ rm.waitForState(application.getApplicationId(), RMAppState.KILLED);
|
|
|
+ }
|
|
|
+
|
|
|
public static void main(String[] args) throws Exception {
|
|
|
TestRM t = new TestRM();
|
|
|
t.testGetNewAppId();
|
|
|
t.testAppWithNoContainers();
|
|
|
t.testAppOnMultiNode();
|
|
|
+ t.testNMToken();
|
|
|
+ t.testActivatingApplicationAfterAddingNM();
|
|
|
+ t.testInvalidateAMHostPortWhenAMFailedOrKilled();
|
|
|
+ t.testInvalidatedAMHostPortOnAMRestart();
|
|
|
+ t.testApplicationKillAtAcceptedState();
|
|
|
}
|
|
|
}
|