|
@@ -33,14 +33,15 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
-import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
|
|
|
|
|
+import org.apache.hadoop.yarn.event.DrainDispatcher;
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
import org.apache.hadoop.yarn.event.EventHandler;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEvent;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.RMAppManagerEventType;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.ApplicationsStore.ApplicationStore;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemStore;
|
|
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
|
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
|
|
@@ -48,6 +49,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptE
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.security.ApplicationTokenSecretManager;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
|
|
import org.junit.Before;
|
|
import org.junit.Before;
|
|
@@ -60,7 +63,7 @@ public class TestRMAppTransitions {
|
|
private RMContext rmContext;
|
|
private RMContext rmContext;
|
|
private static int maxRetries = 4;
|
|
private static int maxRetries = 4;
|
|
private static int appId = 1;
|
|
private static int appId = 1;
|
|
-// private AsyncDispatcher rmDispatcher;
|
|
|
|
|
|
+ private DrainDispatcher rmDispatcher;
|
|
|
|
|
|
// ignore all the RM application attempt events
|
|
// ignore all the RM application attempt events
|
|
private static final class TestApplicationAttemptEventDispatcher implements
|
|
private static final class TestApplicationAttemptEventDispatcher implements
|
|
@@ -110,12 +113,27 @@ public class TestRMAppTransitions {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ // handle all the RM application manager events - same as in
|
|
|
|
+ // ResourceManager.java
|
|
|
|
+ private static final class TestApplicationManagerEventDispatcher implements
|
|
|
|
+ EventHandler<RMAppManagerEvent> {
|
|
|
|
+ @Override
|
|
|
|
+ public void handle(RMAppManagerEvent event) {
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // handle all the scheduler events - same as in ResourceManager.java
|
|
|
|
+ private static final class TestSchedulerEventDispatcher implements
|
|
|
|
+ EventHandler<SchedulerEvent> {
|
|
|
|
+ @Override
|
|
|
|
+ public void handle(SchedulerEvent event) {
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
@Before
|
|
@Before
|
|
public void setUp() throws Exception {
|
|
public void setUp() throws Exception {
|
|
- AsyncDispatcher rmDispatcher = new AsyncDispatcher();
|
|
|
|
Configuration conf = new Configuration();
|
|
Configuration conf = new Configuration();
|
|
- rmDispatcher = new InlineDispatcher();
|
|
|
|
-
|
|
|
|
|
|
+ rmDispatcher = new DrainDispatcher();
|
|
ContainerAllocationExpirer containerAllocationExpirer =
|
|
ContainerAllocationExpirer containerAllocationExpirer =
|
|
mock(ContainerAllocationExpirer.class);
|
|
mock(ContainerAllocationExpirer.class);
|
|
AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
|
|
AMLivelinessMonitor amLivelinessMonitor = mock(AMLivelinessMonitor.class);
|
|
@@ -131,6 +149,13 @@ public class TestRMAppTransitions {
|
|
|
|
|
|
rmDispatcher.register(RMAppEventType.class,
|
|
rmDispatcher.register(RMAppEventType.class,
|
|
new TestApplicationEventDispatcher(rmContext));
|
|
new TestApplicationEventDispatcher(rmContext));
|
|
|
|
+
|
|
|
|
+ rmDispatcher.register(RMAppManagerEventType.class,
|
|
|
|
+ new TestApplicationManagerEventDispatcher());
|
|
|
|
+
|
|
|
|
+ rmDispatcher.register(SchedulerEventType.class,
|
|
|
|
+ new TestSchedulerEventDispatcher());
|
|
|
|
+
|
|
rmDispatcher.init(conf);
|
|
rmDispatcher.init(conf);
|
|
rmDispatcher.start();
|
|
rmDispatcher.start();
|
|
}
|
|
}
|
|
@@ -225,9 +250,8 @@ public class TestRMAppTransitions {
|
|
"Application killed by user.", diag.toString());
|
|
"Application killed by user.", diag.toString());
|
|
}
|
|
}
|
|
|
|
|
|
- private static void assertAppAndAttemptKilled(RMApp application) {
|
|
|
|
|
|
+ private static void assertAppAndAttemptKilled(RMApp application) throws InterruptedException {
|
|
assertKilled(application);
|
|
assertKilled(application);
|
|
- /* also check if the attempt is killed */
|
|
|
|
Assert.assertEquals( RMAppAttemptState.KILLED,
|
|
Assert.assertEquals( RMAppAttemptState.KILLED,
|
|
application.getCurrentAppAttempt().getAppAttemptState()
|
|
application.getCurrentAppAttempt().getAppAttemptState()
|
|
);
|
|
);
|
|
@@ -332,6 +356,7 @@ public class TestRMAppTransitions {
|
|
RMAppEvent event = new RMAppFailedAttemptEvent(
|
|
RMAppEvent event = new RMAppFailedAttemptEvent(
|
|
application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED, "");
|
|
application.getApplicationId(), RMAppEventType.ATTEMPT_FAILED, "");
|
|
application.handle(event);
|
|
application.handle(event);
|
|
|
|
+ rmDispatcher.await();
|
|
RMAppAttempt appAttempt = application.getCurrentAppAttempt();
|
|
RMAppAttempt appAttempt = application.getCurrentAppAttempt();
|
|
Assert.assertEquals(1, appAttempt.getAppAttemptId().getAttemptId());
|
|
Assert.assertEquals(1, appAttempt.getAppAttemptId().getAttemptId());
|
|
assertFailed(application,
|
|
assertFailed(application,
|
|
@@ -353,6 +378,7 @@ public class TestRMAppTransitions {
|
|
RMAppEvent event =
|
|
RMAppEvent event =
|
|
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
|
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
|
application.handle(event);
|
|
application.handle(event);
|
|
|
|
+ rmDispatcher.await();
|
|
assertKilled(application);
|
|
assertKilled(application);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -366,6 +392,7 @@ public class TestRMAppTransitions {
|
|
RMAppEvent event =
|
|
RMAppEvent event =
|
|
new RMAppRejectedEvent(application.getApplicationId(), rejectedText);
|
|
new RMAppRejectedEvent(application.getApplicationId(), rejectedText);
|
|
application.handle(event);
|
|
application.handle(event);
|
|
|
|
+ rmDispatcher.await();
|
|
assertFailed(application, rejectedText);
|
|
assertFailed(application, rejectedText);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -379,18 +406,22 @@ public class TestRMAppTransitions {
|
|
RMAppEvent event =
|
|
RMAppEvent event =
|
|
new RMAppRejectedEvent(application.getApplicationId(), rejectedText);
|
|
new RMAppRejectedEvent(application.getApplicationId(), rejectedText);
|
|
application.handle(event);
|
|
application.handle(event);
|
|
|
|
+ rmDispatcher.await();
|
|
assertFailed(application, rejectedText);
|
|
assertFailed(application, rejectedText);
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
- public void testAppSubmittedKill() throws IOException {
|
|
|
|
|
|
+ public void testAppSubmittedKill() throws IOException, InterruptedException {
|
|
LOG.info("--- START: testAppSubmittedKill---");
|
|
LOG.info("--- START: testAppSubmittedKill---");
|
|
-
|
|
|
|
- RMApp application = testCreateAppAccepted(null);
|
|
|
|
- // SUBMITTED => KILLED event RMAppEventType.KILL
|
|
|
|
- RMAppEvent event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
|
|
|
- this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(), application);
|
|
|
|
|
|
+ RMApp application = testCreateAppSubmitted(null);
|
|
|
|
+ // SUBMITTED => KILLED event RMAppEventType.KILL
|
|
|
|
+ RMAppEvent event = new RMAppEvent(application.getApplicationId(),
|
|
|
|
+ RMAppEventType.KILL);
|
|
|
|
+ this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(),
|
|
|
|
+ application);
|
|
application.handle(event);
|
|
application.handle(event);
|
|
|
|
+ rmDispatcher.await();
|
|
|
|
+ assertKilled(application);
|
|
assertAppAndAttemptKilled(application);
|
|
assertAppAndAttemptKilled(application);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -410,6 +441,7 @@ public class TestRMAppTransitions {
|
|
new RMAppEvent(application.getApplicationId(),
|
|
new RMAppEvent(application.getApplicationId(),
|
|
RMAppEventType.APP_ACCEPTED);
|
|
RMAppEventType.APP_ACCEPTED);
|
|
application.handle(event);
|
|
application.handle(event);
|
|
|
|
+ rmDispatcher.await();
|
|
assertAppState(RMAppState.ACCEPTED, application);
|
|
assertAppState(RMAppState.ACCEPTED, application);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -420,19 +452,23 @@ public class TestRMAppTransitions {
|
|
new RMAppFailedAttemptEvent(application.getApplicationId(),
|
|
new RMAppFailedAttemptEvent(application.getApplicationId(),
|
|
RMAppEventType.ATTEMPT_FAILED, message);
|
|
RMAppEventType.ATTEMPT_FAILED, message);
|
|
application.handle(event);
|
|
application.handle(event);
|
|
|
|
+ rmDispatcher.await();
|
|
assertFailed(application, ".*" + message + ".*Failing the application.*");
|
|
assertFailed(application, ".*" + message + ".*Failing the application.*");
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
- public void testAppAcceptedKill() throws IOException {
|
|
|
|
|
|
+ public void testAppAcceptedKill() throws IOException, InterruptedException {
|
|
LOG.info("--- START: testAppAcceptedKill ---");
|
|
LOG.info("--- START: testAppAcceptedKill ---");
|
|
-
|
|
|
|
RMApp application = testCreateAppAccepted(null);
|
|
RMApp application = testCreateAppAccepted(null);
|
|
// ACCEPTED => KILLED event RMAppEventType.KILL
|
|
// ACCEPTED => KILLED event RMAppEventType.KILL
|
|
- RMAppEvent event =
|
|
|
|
- new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
|
|
|
|
|
+ RMAppEvent event = new RMAppEvent(application.getApplicationId(),
|
|
|
|
+ RMAppEventType.KILL);
|
|
|
|
+ this.rmContext.getRMApps().putIfAbsent(application.getApplicationId(),
|
|
|
|
+ application);
|
|
application.handle(event);
|
|
application.handle(event);
|
|
|
|
+ rmDispatcher.await();
|
|
assertKilled(application);
|
|
assertKilled(application);
|
|
|
|
+ assertAppAndAttemptKilled(application);
|
|
}
|
|
}
|
|
|
|
|
|
@Test
|
|
@Test
|
|
@@ -444,6 +480,7 @@ public class TestRMAppTransitions {
|
|
RMAppEvent event =
|
|
RMAppEvent event =
|
|
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
|
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
|
application.handle(event);
|
|
application.handle(event);
|
|
|
|
+ rmDispatcher.await();
|
|
assertKilled(application);
|
|
assertKilled(application);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -462,6 +499,7 @@ public class TestRMAppTransitions {
|
|
new RMAppFailedAttemptEvent(application.getApplicationId(),
|
|
new RMAppFailedAttemptEvent(application.getApplicationId(),
|
|
RMAppEventType.ATTEMPT_FAILED, "");
|
|
RMAppEventType.ATTEMPT_FAILED, "");
|
|
application.handle(event);
|
|
application.handle(event);
|
|
|
|
+ rmDispatcher.await();
|
|
assertAppState(RMAppState.SUBMITTED, application);
|
|
assertAppState(RMAppState.SUBMITTED, application);
|
|
appAttempt = application.getCurrentAppAttempt();
|
|
appAttempt = application.getCurrentAppAttempt();
|
|
Assert.assertEquals(++expectedAttemptId,
|
|
Assert.assertEquals(++expectedAttemptId,
|
|
@@ -470,11 +508,13 @@ public class TestRMAppTransitions {
|
|
new RMAppEvent(application.getApplicationId(),
|
|
new RMAppEvent(application.getApplicationId(),
|
|
RMAppEventType.APP_ACCEPTED);
|
|
RMAppEventType.APP_ACCEPTED);
|
|
application.handle(event);
|
|
application.handle(event);
|
|
|
|
+ rmDispatcher.await();
|
|
assertAppState(RMAppState.ACCEPTED, application);
|
|
assertAppState(RMAppState.ACCEPTED, application);
|
|
event =
|
|
event =
|
|
new RMAppEvent(application.getApplicationId(),
|
|
new RMAppEvent(application.getApplicationId(),
|
|
RMAppEventType.ATTEMPT_REGISTERED);
|
|
RMAppEventType.ATTEMPT_REGISTERED);
|
|
application.handle(event);
|
|
application.handle(event);
|
|
|
|
+ rmDispatcher.await();
|
|
assertAppState(RMAppState.RUNNING, application);
|
|
assertAppState(RMAppState.RUNNING, application);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -484,11 +524,13 @@ public class TestRMAppTransitions {
|
|
new RMAppFailedAttemptEvent(application.getApplicationId(),
|
|
new RMAppFailedAttemptEvent(application.getApplicationId(),
|
|
RMAppEventType.ATTEMPT_FAILED, "");
|
|
RMAppEventType.ATTEMPT_FAILED, "");
|
|
application.handle(event);
|
|
application.handle(event);
|
|
|
|
+ rmDispatcher.await();
|
|
assertFailed(application, ".*Failing the application.*");
|
|
assertFailed(application, ".*Failing the application.*");
|
|
|
|
|
|
// FAILED => FAILED event RMAppEventType.KILL
|
|
// FAILED => FAILED event RMAppEventType.KILL
|
|
event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
|
event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
|
application.handle(event);
|
|
application.handle(event);
|
|
|
|
+ rmDispatcher.await();
|
|
assertFailed(application, ".*Failing the application.*");
|
|
assertFailed(application, ".*Failing the application.*");
|
|
}
|
|
}
|
|
|
|
|
|
@@ -501,6 +543,7 @@ public class TestRMAppTransitions {
|
|
RMAppEvent event =
|
|
RMAppEvent event =
|
|
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
|
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
|
application.handle(event);
|
|
application.handle(event);
|
|
|
|
+ rmDispatcher.await();
|
|
assertAppState(RMAppState.FINISHED, application);
|
|
assertAppState(RMAppState.FINISHED, application);
|
|
}
|
|
}
|
|
|
|
|
|
@@ -513,6 +556,7 @@ public class TestRMAppTransitions {
|
|
RMAppEvent event =
|
|
RMAppEvent event =
|
|
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
|
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
|
application.handle(event);
|
|
application.handle(event);
|
|
|
|
+ rmDispatcher.await();
|
|
assertTimesAtFinish(application);
|
|
assertTimesAtFinish(application);
|
|
assertAppState(RMAppState.FINISHED, application);
|
|
assertAppState(RMAppState.FINISHED, application);
|
|
StringBuilder diag = application.getDiagnostics();
|
|
StringBuilder diag = application.getDiagnostics();
|
|
@@ -530,6 +574,7 @@ public class TestRMAppTransitions {
|
|
RMAppEvent event =
|
|
RMAppEvent event =
|
|
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
|
new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
|
application.handle(event);
|
|
application.handle(event);
|
|
|
|
+ rmDispatcher.await();
|
|
assertTimesAtFinish(application);
|
|
assertTimesAtFinish(application);
|
|
assertAppState(RMAppState.KILLED, application);
|
|
assertAppState(RMAppState.KILLED, application);
|
|
|
|
|
|
@@ -538,6 +583,7 @@ public class TestRMAppTransitions {
|
|
new RMAppEvent(application.getApplicationId(),
|
|
new RMAppEvent(application.getApplicationId(),
|
|
RMAppEventType.ATTEMPT_FINISHED);
|
|
RMAppEventType.ATTEMPT_FINISHED);
|
|
application.handle(event);
|
|
application.handle(event);
|
|
|
|
+ rmDispatcher.await();
|
|
assertTimesAtFinish(application);
|
|
assertTimesAtFinish(application);
|
|
assertAppState(RMAppState.KILLED, application);
|
|
assertAppState(RMAppState.KILLED, application);
|
|
|
|
|
|
@@ -546,6 +592,7 @@ public class TestRMAppTransitions {
|
|
new RMAppFailedAttemptEvent(application.getApplicationId(),
|
|
new RMAppFailedAttemptEvent(application.getApplicationId(),
|
|
RMAppEventType.ATTEMPT_FAILED, "");
|
|
RMAppEventType.ATTEMPT_FAILED, "");
|
|
application.handle(event);
|
|
application.handle(event);
|
|
|
|
+ rmDispatcher.await();
|
|
assertTimesAtFinish(application);
|
|
assertTimesAtFinish(application);
|
|
assertAppState(RMAppState.KILLED, application);
|
|
assertAppState(RMAppState.KILLED, application);
|
|
|
|
|
|
@@ -554,12 +601,14 @@ public class TestRMAppTransitions {
|
|
new RMAppEvent(application.getApplicationId(),
|
|
new RMAppEvent(application.getApplicationId(),
|
|
RMAppEventType.ATTEMPT_KILLED);
|
|
RMAppEventType.ATTEMPT_KILLED);
|
|
application.handle(event);
|
|
application.handle(event);
|
|
|
|
+ rmDispatcher.await();
|
|
assertTimesAtFinish(application);
|
|
assertTimesAtFinish(application);
|
|
assertAppState(RMAppState.KILLED, application);
|
|
assertAppState(RMAppState.KILLED, application);
|
|
|
|
|
|
// KILLED => KILLED event RMAppEventType.KILL
|
|
// KILLED => KILLED event RMAppEventType.KILL
|
|
event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
|
event = new RMAppEvent(application.getApplicationId(), RMAppEventType.KILL);
|
|
application.handle(event);
|
|
application.handle(event);
|
|
|
|
+ rmDispatcher.await();
|
|
assertTimesAtFinish(application);
|
|
assertTimesAtFinish(application);
|
|
assertAppState(RMAppState.KILLED, application);
|
|
assertAppState(RMAppState.KILLED, application);
|
|
}
|
|
}
|