|
@@ -1384,7 +1384,7 @@ public class TestRMContainerAllocator {
|
|
|
static final List<JobUpdatedNodesEvent> jobUpdatedNodeEvents
|
|
|
= new ArrayList<JobUpdatedNodesEvent>();
|
|
|
private MyResourceManager rm;
|
|
|
-
|
|
|
+ private boolean isUnregistered = false;
|
|
|
private static AppContext createAppContext(
|
|
|
ApplicationAttemptId appAttemptId, Job job) {
|
|
|
AppContext context = mock(AppContext.class);
|
|
@@ -1474,6 +1474,7 @@ public class TestRMContainerAllocator {
|
|
|
|
|
|
@Override
|
|
|
protected void unregister() {
|
|
|
+ isUnregistered=true;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -1523,6 +1524,15 @@ public class TestRMContainerAllocator {
|
|
|
protected void startAllocatorThread() {
|
|
|
// override to NOT start thread
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ protected boolean isApplicationMasterRegistered() {
|
|
|
+ return super.isApplicationMasterRegistered();
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean isUnregistered() {
|
|
|
+ return isUnregistered;
|
|
|
+ }
|
|
|
|
|
|
}
|
|
|
|
|
@@ -1770,6 +1780,51 @@ public class TestRMContainerAllocator {
|
|
|
Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType());
|
|
|
}
|
|
|
|
|
|
+ @Test
|
|
|
+ public void testUnregistrationOnlyIfRegistered() throws Exception {
|
|
|
+ Configuration conf = new Configuration();
|
|
|
+ final MyResourceManager rm = new MyResourceManager(conf);
|
|
|
+ rm.start();
|
|
|
+ DrainDispatcher rmDispatcher =
|
|
|
+ (DrainDispatcher) rm.getRMContext().getDispatcher();
|
|
|
+
|
|
|
+ // Submit the application
|
|
|
+ RMApp rmApp = rm.submitApp(1024);
|
|
|
+ rmDispatcher.await();
|
|
|
+
|
|
|
+ MockNM amNodeManager = rm.registerNode("127.0.0.1:1234", 11264);
|
|
|
+ amNodeManager.nodeHeartbeat(true);
|
|
|
+ rmDispatcher.await();
|
|
|
+
|
|
|
+ final ApplicationAttemptId appAttemptId =
|
|
|
+ rmApp.getCurrentAppAttempt().getAppAttemptId();
|
|
|
+ rm.sendAMLaunched(appAttemptId);
|
|
|
+ rmDispatcher.await();
|
|
|
+
|
|
|
+ MRApp mrApp =
|
|
|
+ new MRApp(appAttemptId, ContainerId.newInstance(appAttemptId, 0), 10,
|
|
|
+ 0, false, this.getClass().getName(), true, 1) {
|
|
|
+ @Override
|
|
|
+ protected Dispatcher createDispatcher() {
|
|
|
+ return new DrainDispatcher();
|
|
|
+ }
|
|
|
+
|
|
|
+ protected ContainerAllocator createContainerAllocator(
|
|
|
+ ClientService clientService, AppContext context) {
|
|
|
+ return new MyContainerAllocator(rm, appAttemptId, context);
|
|
|
+ };
|
|
|
+ };
|
|
|
+
|
|
|
+ mrApp.submit(conf);
|
|
|
+ DrainDispatcher amDispatcher = (DrainDispatcher) mrApp.getDispatcher();
|
|
|
+ MyContainerAllocator allocator =
|
|
|
+ (MyContainerAllocator) mrApp.getContainerAllocator();
|
|
|
+ amDispatcher.await();
|
|
|
+ Assert.assertTrue(allocator.isApplicationMasterRegistered());
|
|
|
+ mrApp.stop();
|
|
|
+ Assert.assertTrue(allocator.isUnregistered());
|
|
|
+ }
|
|
|
+
|
|
|
public static void main(String[] args) throws Exception {
|
|
|
TestRMContainerAllocator t = new TestRMContainerAllocator();
|
|
|
t.testSimple();
|