|
@@ -29,7 +29,6 @@ import java.io.File;
|
|
|
import java.io.FileOutputStream;
|
|
|
import java.io.IOException;
|
|
|
import java.net.InetSocketAddress;
|
|
|
-import java.net.UnknownHostException;
|
|
|
import java.nio.ByteBuffer;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Arrays;
|
|
@@ -108,7 +107,7 @@ import org.junit.Assert;
|
|
|
import org.junit.Before;
|
|
|
import org.junit.Test;
|
|
|
|
|
|
-public class TestRMRestart {
|
|
|
+public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
private final static File TEMP_DIR = new File(System.getProperty(
|
|
|
"test.build.data", "/tmp"), "decommision");
|
|
|
private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt");
|
|
@@ -116,12 +115,17 @@ public class TestRMRestart {
|
|
|
|
|
|
// Fake rmAddr for token-renewal
|
|
|
private static InetSocketAddress rmAddr;
|
|
|
+ private List<MockRM> rms = new ArrayList<MockRM>();
|
|
|
+
|
|
|
+ public TestRMRestart(SchedulerType type) {
|
|
|
+ super(type); // forward the scheduler type to the ParameterizedSchedulerTestBase constructor
|
|
|
+ }
|
|
|
|
|
|
@Before
|
|
|
- public void setup() throws UnknownHostException {
|
|
|
+ public void setup() throws IOException {
|
|
|
+ conf = getConf();
|
|
|
Logger rootLogger = LogManager.getRootLogger();
|
|
|
rootLogger.setLevel(Level.DEBUG);
|
|
|
- conf = new YarnConfiguration();
|
|
|
UserGroupInformation.setConfiguration(conf);
|
|
|
conf.set(YarnConfiguration.RECOVERY_ENABLED, "true");
|
|
|
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
|
@@ -131,9 +135,24 @@ public class TestRMRestart {
|
|
|
|
|
|
@After
|
|
|
public void tearDown() {
|
|
|
+ for (MockRM rm : rms) { // stop every MockRM that was added to the rms list
|
|
|
+ rm.stop();
|
|
|
+ }
|
|
|
+ rms.clear(); // forget the instances that were just stopped
|
|
|
+ // TEMP_DIR is the parent directory of hostFile; the result of delete() is ignored
|
|
|
TEMP_DIR.delete();
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Creates a MockRM from the given configuration and state store and adds it to {@code rms}.
|
|
|
+ * @return a new MockRM that will be stopped at the end of the test.
|
|
|
+ */
|
|
|
+ private MockRM createMockRM(YarnConfiguration conf, RMStateStore store) {
|
|
|
+ MockRM rm = new MockRM(conf, store);
|
|
|
+ rms.add(rm); // tearDown() iterates rms and calls stop() on each entry
|
|
|
+ return rm;
|
|
|
+ }
|
|
|
+
|
|
|
@SuppressWarnings("rawtypes")
|
|
|
@Test (timeout=180000)
|
|
|
public void testRMRestart() throws Exception {
|
|
@@ -150,7 +169,7 @@ public class TestRMRestart {
|
|
|
// PHASE 1: create state in an RM
|
|
|
|
|
|
// start RM
|
|
|
- MockRM rm1 = new MockRM(conf, memStore);
|
|
|
+ MockRM rm1 = createMockRM(conf, memStore);
|
|
|
|
|
|
// start like normal because state is empty
|
|
|
rm1.start();
|
|
@@ -246,7 +265,7 @@ public class TestRMRestart {
|
|
|
// PHASE 2: create new RM and start from old state
|
|
|
|
|
|
// create new RM to represent restart and recover state
|
|
|
- MockRM rm2 = new MockRM(conf, memStore);
|
|
|
+ MockRM rm2 = createMockRM(conf, memStore);
|
|
|
|
|
|
// start new RM
|
|
|
rm2.start();
|
|
@@ -317,7 +336,7 @@ public class TestRMRestart {
|
|
|
NMContainerStatus status =
|
|
|
TestRMRestart
|
|
|
.createNMContainerStatus(loadedApp1.getCurrentAppAttempt()
|
|
|
- .getAppAttemptId(), 1, ContainerState.COMPLETE);
|
|
|
+ .getAppAttemptId(), 1, ContainerState.COMPLETE);
|
|
|
nm1.registerNode(Arrays.asList(status), null);
|
|
|
nm2.registerNode();
|
|
|
|
|
@@ -414,7 +433,7 @@ public class TestRMRestart {
|
|
|
rmState.getApplicationState();
|
|
|
|
|
|
// start RM
|
|
|
- MockRM rm1 = new MockRM(conf, memStore);
|
|
|
+ MockRM rm1 = createMockRM(conf, memStore);
|
|
|
rm1.start();
|
|
|
MockNM nm1 =
|
|
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
|
@@ -443,13 +462,11 @@ public class TestRMRestart {
|
|
|
rm1.waitForState(app0.getApplicationId(), RMAppState.ACCEPTED);
|
|
|
|
|
|
// start new RM
|
|
|
- MockRM rm2 = new MockRM(conf, memStore);
|
|
|
+ MockRM rm2 = createMockRM(conf, memStore);
|
|
|
rm2.start();
|
|
|
// assert the previous AM state is loaded back on RM recovery.
|
|
|
|
|
|
rm2.waitForState(am0.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
|
|
- rm1.stop();
|
|
|
- rm2.stop();
|
|
|
}
|
|
|
|
|
|
@Test (timeout = 60000)
|
|
@@ -473,7 +490,7 @@ public class TestRMRestart {
|
|
|
rmState.getApplicationState();
|
|
|
|
|
|
// start RM
|
|
|
- final MockRM rm1 = new MockRM(conf, memStore);
|
|
|
+ final MockRM rm1 = createMockRM(conf, memStore);
|
|
|
rm1.start();
|
|
|
MockNM nm1 =
|
|
|
new MockNM("127.0.0.1:1234" , 16382, rm1.getResourceTrackerService());
|
|
@@ -497,8 +514,7 @@ public class TestRMRestart {
|
|
|
.getAppAttemptState(), RMAppAttemptState.RUNNING);
|
|
|
|
|
|
// start new RM.
|
|
|
- MockRM rm2 = null;
|
|
|
- rm2 = new MockRM(conf, memStore);
|
|
|
+ MockRM rm2 = createMockRM(conf, memStore);
|
|
|
rm2.start();
|
|
|
|
|
|
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
|
@@ -525,7 +541,7 @@ public class TestRMRestart {
|
|
|
|
|
|
NMContainerStatus status =
|
|
|
TestRMRestart.createNMContainerStatus(
|
|
|
- am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
|
|
+ am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
|
|
nm1.registerNode(Arrays.asList(status), null);
|
|
|
rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
|
|
launchAM(rmApp, rm2, nm1);
|
|
@@ -535,8 +551,7 @@ public class TestRMRestart {
|
|
|
// Now restart RM ...
|
|
|
// Setting AMLivelinessMonitor interval to be 10 Secs.
|
|
|
conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000);
|
|
|
- MockRM rm3 = null;
|
|
|
- rm3 = new MockRM(conf, memStore);
|
|
|
+ MockRM rm3 = createMockRM(conf, memStore);
|
|
|
rm3.start();
|
|
|
|
|
|
// Wait for RM to process all the events as a part of rm recovery.
|
|
@@ -583,8 +598,7 @@ public class TestRMRestart {
|
|
|
memStore.getState().getApplicationState().get(app2.getApplicationId())
|
|
|
.getAttemptCount());
|
|
|
|
|
|
- MockRM rm4 = null;
|
|
|
- rm4 = new MockRM(conf, memStore);
|
|
|
+ MockRM rm4 = createMockRM(conf, memStore);
|
|
|
rm4.start();
|
|
|
|
|
|
rmApp = rm4.getRMContext().getRMApps().get(app1.getApplicationId());
|
|
@@ -640,7 +654,7 @@ public class TestRMRestart {
|
|
|
rmState.getApplicationState();
|
|
|
|
|
|
// start RM
|
|
|
- MockRM rm1 = new MockRM(conf, memStore);
|
|
|
+ MockRM rm1 = createMockRM(conf, memStore);
|
|
|
rm1.start();
|
|
|
MockNM nm1 = rm1.registerNode("127.0.0.1:1234", 15120);
|
|
|
RMApp app0 = rm1.submitApp(200);
|
|
@@ -657,7 +671,7 @@ public class TestRMRestart {
|
|
|
Assert.assertNull(rmAppState.get(app0.getApplicationId()).getState());
|
|
|
|
|
|
// start RM
|
|
|
- MockRM rm2 = new MockRM(conf, memStore);
|
|
|
+ MockRM rm2 = createMockRM(conf, memStore);
|
|
|
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
|
|
rm2.start();
|
|
|
|
|
@@ -666,7 +680,7 @@ public class TestRMRestart {
|
|
|
rm2.waitForState(app0.getApplicationId(), RMAppState.FINISHED);
|
|
|
// app final state is saved via the finish event from attempt.
|
|
|
Assert.assertEquals(RMAppState.FINISHED,
|
|
|
- rmAppState.get(app0.getApplicationId()).getState());
|
|
|
+ rmAppState.get(app0.getApplicationId()).getState());
|
|
|
}
|
|
|
|
|
|
@Test (timeout = 60000)
|
|
@@ -679,7 +693,7 @@ public class TestRMRestart {
|
|
|
rmState.getApplicationState();
|
|
|
|
|
|
// start RM
|
|
|
- MockRM rm1 = new MockRM(conf, memStore);
|
|
|
+ MockRM rm1 = createMockRM(conf, memStore);
|
|
|
rm1.start();
|
|
|
MockNM nm1 =
|
|
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
|
@@ -701,7 +715,7 @@ public class TestRMRestart {
|
|
|
appState.getAttempt(am0.getApplicationAttemptId()).getState());
|
|
|
|
|
|
// start new RM
|
|
|
- MockRM rm2 = new MockRM(conf, memStore);
|
|
|
+ MockRM rm2 = createMockRM(conf, memStore);
|
|
|
rm2.start();
|
|
|
RMApp loadedApp0 = rm2.getRMContext().getRMApps().get(app0.getApplicationId());
|
|
|
rm2.waitForState(app0.getApplicationId(), RMAppState.FAILED);
|
|
@@ -714,8 +728,6 @@ public class TestRMRestart {
|
|
|
.contains("Failing the application."));
|
|
|
// failed diagnostics from attempt is lost because the diagnostics from
|
|
|
// attempt is not yet available by the time app is saving the app state.
|
|
|
- rm1.stop();
|
|
|
- rm2.stop();
|
|
|
}
|
|
|
|
|
|
@Test (timeout = 60000)
|
|
@@ -729,7 +741,7 @@ public class TestRMRestart {
|
|
|
rmState.getApplicationState();
|
|
|
|
|
|
// start RM
|
|
|
- MockRM rm1 = new MockRM(conf, memStore);
|
|
|
+ MockRM rm1 = createMockRM(conf, memStore);
|
|
|
rm1.start();
|
|
|
MockNM nm1 =
|
|
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
|
@@ -751,7 +763,7 @@ public class TestRMRestart {
|
|
|
appState.getAttempt(am0.getApplicationAttemptId()).getState());
|
|
|
|
|
|
// restart rm
|
|
|
- MockRM rm2 = new MockRM(conf, memStore);
|
|
|
+ MockRM rm2 = createMockRM(conf, memStore);
|
|
|
rm2.start();
|
|
|
RMApp loadedApp0 = rm2.getRMContext().getRMApps().get(app0.getApplicationId());
|
|
|
rm2.waitForState(app0.getApplicationId(), RMAppState.KILLED);
|
|
@@ -761,9 +773,7 @@ public class TestRMRestart {
|
|
|
|
|
|
ApplicationReport appReport = verifyAppReportAfterRMRestart(app0, rm2);
|
|
|
Assert.assertEquals(app0.getDiagnostics().toString(),
|
|
|
- appReport.getDiagnostics());
|
|
|
- rm1.stop();
|
|
|
- rm2.stop();
|
|
|
+ appReport.getDiagnostics());
|
|
|
}
|
|
|
|
|
|
@Test (timeout = 60000)
|
|
@@ -786,7 +796,7 @@ public class TestRMRestart {
|
|
|
memStore.init(conf);
|
|
|
|
|
|
// start RM
|
|
|
- MockRM rm1 = new MockRM(conf, memStore);
|
|
|
+ MockRM rm1 = createMockRM(conf, memStore);
|
|
|
rm1.start();
|
|
|
// create app
|
|
|
RMApp app0 =
|
|
@@ -798,7 +808,7 @@ public class TestRMRestart {
|
|
|
rm1.waitForState(app0.getApplicationId(), RMAppState.KILLED);
|
|
|
|
|
|
// restart rm
|
|
|
- MockRM rm2 = new MockRM(conf, memStore);
|
|
|
+ MockRM rm2 = createMockRM(conf, memStore);
|
|
|
rm2.start();
|
|
|
RMApp loadedApp0 =
|
|
|
rm2.getRMContext().getRMApps().get(app0.getApplicationId());
|
|
@@ -817,7 +827,7 @@ public class TestRMRestart {
|
|
|
rmState.getApplicationState();
|
|
|
|
|
|
// start RM
|
|
|
- MockRM rm1 = new MockRM(conf, memStore);
|
|
|
+ MockRM rm1 = createMockRM(conf, memStore);
|
|
|
rm1.start();
|
|
|
MockNM nm1 =
|
|
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
|
@@ -844,7 +854,7 @@ public class TestRMRestart {
|
|
|
Assert.assertEquals(app0.getFinishTime(), appState.getFinishTime());
|
|
|
|
|
|
// restart rm
|
|
|
- MockRM rm2 = new MockRM(conf, memStore);
|
|
|
+ MockRM rm2 = createMockRM(conf, memStore);
|
|
|
rm2.start();
|
|
|
|
|
|
// verify application report returns the same app info as the app info
|
|
@@ -853,9 +863,6 @@ public class TestRMRestart {
|
|
|
Assert.assertEquals(FinalApplicationStatus.SUCCEEDED,
|
|
|
appReport.getFinalApplicationStatus());
|
|
|
Assert.assertEquals("trackingUrl", appReport.getOriginalTrackingUrl());
|
|
|
-
|
|
|
- rm1.stop();
|
|
|
- rm2.stop();
|
|
|
}
|
|
|
|
|
|
@Test (timeout = 60000)
|
|
@@ -865,7 +872,7 @@ public class TestRMRestart {
|
|
|
memStore.init(conf);
|
|
|
|
|
|
// start RM
|
|
|
- MockRM rm1 = new MockRM(conf, memStore);
|
|
|
+ MockRM rm1 = createMockRM(conf, memStore);
|
|
|
rm1.start();
|
|
|
MockNM nm1 =
|
|
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
|
@@ -902,7 +909,7 @@ public class TestRMRestart {
|
|
|
return spy(super.createRMAppManager());
|
|
|
}
|
|
|
};
|
|
|
-
|
|
|
+ rms.add(rm2);
|
|
|
rm2.start();
|
|
|
|
|
|
GetApplicationsRequest request1 =
|
|
@@ -949,9 +956,6 @@ public class TestRMRestart {
|
|
|
// check application summary is logged for the completed apps after RM restart.
|
|
|
verify(rm2.getRMAppManager(), times(3)).logApplicationSummary(
|
|
|
isA(ApplicationId.class));
|
|
|
-
|
|
|
- rm1.stop();
|
|
|
- rm2.stop();
|
|
|
}
|
|
|
|
|
|
private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
|
|
@@ -1017,7 +1021,7 @@ public class TestRMRestart {
|
|
|
|
|
|
Map<ApplicationId, ApplicationState> rmAppState =
|
|
|
rmState.getApplicationState();
|
|
|
- MockRM rm1 = new MockRM(conf, memStore);
|
|
|
+ MockRM rm1 = createMockRM(conf, memStore);
|
|
|
rm1.start();
|
|
|
MockNM nm1 =
|
|
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
|
@@ -1055,7 +1059,7 @@ public class TestRMRestart {
|
|
|
// Setting AMLivelinessMonitor interval to be 3 Secs.
|
|
|
conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 3000);
|
|
|
// start new RM
|
|
|
- MockRM rm2 = new MockRM(conf, memStore);
|
|
|
+ MockRM rm2 = createMockRM(conf, memStore);
|
|
|
rm2.start();
|
|
|
|
|
|
// verify that maxAppAttempts is set to global value
|
|
@@ -1074,10 +1078,6 @@ public class TestRMRestart {
|
|
|
Assert.assertEquals(RMAppState.FAILED,
|
|
|
rmAppState.get(app1.getApplicationId()).getState());
|
|
|
Assert.assertNull(rmAppState.get(app2.getApplicationId()).getState());
|
|
|
-
|
|
|
- // stop the RM
|
|
|
- rm1.stop();
|
|
|
- rm2.stop();
|
|
|
}
|
|
|
|
|
|
@Test (timeout = 60000)
|
|
@@ -1159,10 +1159,6 @@ public class TestRMRestart {
|
|
|
// verify tokens are properly populated back to rm2 DelegationTokenRenewer
|
|
|
Assert.assertEquals(tokenSet, rm2.getRMContext()
|
|
|
.getDelegationTokenRenewer().getDelegationTokens());
|
|
|
-
|
|
|
- // stop the RM
|
|
|
- rm1.stop();
|
|
|
- rm2.stop();
|
|
|
}
|
|
|
|
|
|
private void waitForTokensToBeRenewed(MockRM rm2) throws Exception {
|
|
@@ -1258,8 +1254,6 @@ public class TestRMRestart {
|
|
|
Assert.assertArrayEquals(amrmToken.getPassword(),
|
|
|
rm2.getRMContext().getAMRMTokenSecretManager().retrievePassword(
|
|
|
amrmToken.decodeIdentifier()));
|
|
|
- rm1.stop();
|
|
|
- rm2.stop();
|
|
|
}
|
|
|
|
|
|
@Test (timeout = 60000)
|
|
@@ -1407,10 +1401,6 @@ public class TestRMRestart {
|
|
|
.getAllTokens();
|
|
|
Assert.assertFalse(allTokensRM2.containsKey(dtId1));
|
|
|
Assert.assertFalse(rmDTState.containsKey(dtId1));
|
|
|
-
|
|
|
- // stop the RM
|
|
|
- rm1.stop();
|
|
|
- rm2.stop();
|
|
|
}
|
|
|
|
|
|
// This is to test submit an application to the new RM with the old delegation
|
|
@@ -1471,7 +1461,7 @@ public class TestRMRestart {
|
|
|
memStore.init(conf);
|
|
|
|
|
|
// start RM
|
|
|
- final MockRM rm1 = new MockRM(conf, memStore);
|
|
|
+ final MockRM rm1 = createMockRM(conf, memStore);
|
|
|
rm1.start();
|
|
|
|
|
|
// create apps.
|
|
@@ -1517,7 +1507,7 @@ public class TestRMRestart {
|
|
|
RMState rmState = memStore.getState();
|
|
|
|
|
|
// start RM
|
|
|
- MockRM rm1 = new MockRM(conf, memStore);
|
|
|
+ MockRM rm1 = createMockRM(conf, memStore);
|
|
|
rm1.start();
|
|
|
MockNM nm1 =
|
|
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
|
@@ -1528,7 +1518,7 @@ public class TestRMRestart {
|
|
|
MockAM am0 = launchAM(app0, rm1, nm1);
|
|
|
finishApplicationMaster(app0, rm1, nm1, am0);
|
|
|
|
|
|
- MockRM rm2 = new MockRM(conf, memStore);
|
|
|
+ MockRM rm2 = createMockRM(conf, memStore);
|
|
|
rm2.start();
|
|
|
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
|
|
nm1 = rm2.registerNode("127.0.0.1:1234", 15120);
|
|
@@ -1550,9 +1540,6 @@ public class TestRMRestart {
|
|
|
Assert.assertNull(rm2.getRMContext().getRMApps()
|
|
|
.get(app0.getApplicationId()));
|
|
|
Assert.assertNull(rmAppState.get(app0.getApplicationId()));
|
|
|
-
|
|
|
- rm1.stop();
|
|
|
- rm2.stop();
|
|
|
}
|
|
|
|
|
|
// This is to test RM does not get hang on shutdown.
|
|
@@ -1569,7 +1556,7 @@ public class TestRMRestart {
|
|
|
memStore.init(conf);
|
|
|
MockRM rm1 = null;
|
|
|
try {
|
|
|
- rm1 = new MockRM(conf, memStore);
|
|
|
+ rm1 = createMockRM(conf, memStore);
|
|
|
rm1.start();
|
|
|
Assert.fail();
|
|
|
} catch (Exception e) {
|
|
@@ -1587,7 +1574,7 @@ public class TestRMRestart {
|
|
|
memStore.init(conf);
|
|
|
|
|
|
// start RM
|
|
|
- MockRM rm1 = new MockRM(conf, memStore);
|
|
|
+ MockRM rm1 = createMockRM(conf, memStore);
|
|
|
rm1.start();
|
|
|
MockNM nm1 =
|
|
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
|
@@ -1703,7 +1690,11 @@ public class TestRMRestart {
|
|
|
}
|
|
|
}
|
|
|
};
|
|
|
- rm1.start();
|
|
|
+ try {
|
|
|
+ rm1.start();
|
|
|
+ } finally {
|
|
|
+ rm1.stop();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@SuppressWarnings("resource")
|
|
@@ -1716,7 +1707,7 @@ public class TestRMRestart {
|
|
|
|
|
|
// PHASE 1: create state in an RM
|
|
|
// start RM
|
|
|
- MockRM rm1 = new MockRM(conf, memStore);
|
|
|
+ MockRM rm1 = createMockRM(conf, memStore);
|
|
|
rm1.start();
|
|
|
MockNM nm1 =
|
|
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
|
@@ -1754,7 +1745,7 @@ public class TestRMRestart {
|
|
|
|
|
|
// PHASE 2: create new RM and start from old state
|
|
|
// create new RM to represent restart and recover state
|
|
|
- MockRM rm2 = new MockRM(conf, memStore);
|
|
|
+ MockRM rm2 = createMockRM(conf, memStore);
|
|
|
QueueMetrics qm2 = rm2.getResourceScheduler().getRootQueueMetrics();
|
|
|
resetQueueMetrics(qm2);
|
|
|
assertQueueMetrics(qm2, 0, 0, 0, 0);
|
|
@@ -1770,7 +1761,7 @@ public class TestRMRestart {
|
|
|
NMContainerStatus status =
|
|
|
TestRMRestart
|
|
|
.createNMContainerStatus(loadedApp1.getCurrentAppAttempt()
|
|
|
- .getAppAttemptId(), 1, ContainerState.COMPLETE);
|
|
|
+ .getAppAttemptId(), 1, ContainerState.COMPLETE);
|
|
|
nm1.registerNode(Arrays.asList(status), null);
|
|
|
|
|
|
while (loadedApp1.getAppAttempts().size() != 2) {
|
|
@@ -1799,10 +1790,6 @@ public class TestRMRestart {
|
|
|
// finish the AMs
|
|
|
finishApplicationMaster(loadedApp1, rm2, nm1, am1);
|
|
|
assertQueueMetrics(qm2, 1, 0, 0, 1);
|
|
|
-
|
|
|
- // stop RM's
|
|
|
- rm2.stop();
|
|
|
- rm1.stop();
|
|
|
}
|
|
|
|
|
|
|
|
@@ -1840,43 +1827,58 @@ public class TestRMRestart {
|
|
|
hostFile.getAbsolutePath());
|
|
|
writeToHostsFile("");
|
|
|
final DrainDispatcher dispatcher = new DrainDispatcher();
|
|
|
- MockRM rm1 = new MockRM(conf) {
|
|
|
- @Override
|
|
|
- protected Dispatcher createDispatcher() {
|
|
|
- return dispatcher;
|
|
|
+ MockRM rm1 = null, rm2 = null;
|
|
|
+ try {
|
|
|
+ rm1 = new MockRM(conf) {
|
|
|
+ @Override
|
|
|
+ protected Dispatcher createDispatcher() {
|
|
|
+ return dispatcher;
|
|
|
+ }
|
|
|
+ };
|
|
|
+ rm1.start();
|
|
|
+ MockNM nm1 = rm1.registerNode("localhost:1234", 8000);
|
|
|
+ MockNM nm2 = rm1.registerNode("host2:1234", 8000);
|
|
|
+ Assert
|
|
|
+ .assertEquals(0,
|
|
|
+ ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
|
|
+ String ip = NetUtils.normalizeHostName("localhost");
|
|
|
+ // Add 2 hosts to exclude list.
|
|
|
+ writeToHostsFile("host2", ip);
|
|
|
+
|
|
|
+ // refresh nodes
|
|
|
+ rm1.getNodesListManager().refreshNodes(conf);
|
|
|
+ NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
|
|
+ Assert
|
|
|
+ .assertTrue(
|
|
|
+ NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
|
|
|
+ nodeHeartbeat = nm2.nodeHeartbeat(true);
|
|
|
+ Assert.assertTrue("The decommisioned metrics are not updated",
|
|
|
+ NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
|
|
|
+
|
|
|
+ dispatcher.await();
|
|
|
+ Assert
|
|
|
+ .assertEquals(2,
|
|
|
+ ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
|
|
+ rm1.stop();
|
|
|
+ rm1 = null;
|
|
|
+ Assert
|
|
|
+ .assertEquals(0,
|
|
|
+ ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
|
|
+
|
|
|
+ // restart RM.
|
|
|
+ rm2 = new MockRM(conf);
|
|
|
+ rm2.start();
|
|
|
+ Assert
|
|
|
+ .assertEquals(2,
|
|
|
+ ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
|
|
+ } finally {
|
|
|
+ if (rm1 != null) {
|
|
|
+ rm1.stop();
|
|
|
}
|
|
|
- };
|
|
|
- rm1.start();
|
|
|
- MockNM nm1 = rm1.registerNode("localhost:1234", 8000);
|
|
|
- MockNM nm2 = rm1.registerNode("host2:1234", 8000);
|
|
|
- Assert
|
|
|
- .assertEquals(0, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
|
|
- String ip = NetUtils.normalizeHostName("localhost");
|
|
|
- // Add 2 hosts to exclude list.
|
|
|
- writeToHostsFile("host2", ip);
|
|
|
-
|
|
|
- // refresh nodes
|
|
|
- rm1.getNodesListManager().refreshNodes(conf);
|
|
|
- NodeHeartbeatResponse nodeHeartbeat = nm1.nodeHeartbeat(true);
|
|
|
- Assert
|
|
|
- .assertTrue(NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
|
|
|
- nodeHeartbeat = nm2.nodeHeartbeat(true);
|
|
|
- Assert.assertTrue("The decommisioned metrics are not updated",
|
|
|
- NodeAction.SHUTDOWN.equals(nodeHeartbeat.getNodeAction()));
|
|
|
-
|
|
|
- dispatcher.await();
|
|
|
- Assert
|
|
|
- .assertEquals(2, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
|
|
- rm1.stop();
|
|
|
- Assert
|
|
|
- .assertEquals(0, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
|
|
-
|
|
|
- // restart RM.
|
|
|
- MockRM rm2 = new MockRM(conf);
|
|
|
- rm2.start();
|
|
|
- Assert
|
|
|
- .assertEquals(2, ClusterMetrics.getMetrics().getNumDecommisionedNMs());
|
|
|
- rm2.stop();
|
|
|
+ if (rm2 != null) {
|
|
|
+ rm2.stop();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// Test Delegation token is renewed synchronously so that recover events
|
|
@@ -1891,7 +1893,7 @@ public class TestRMRestart {
|
|
|
memStore.init(conf);
|
|
|
|
|
|
// start RM
|
|
|
- MockRM rm1 = new MockRM(conf, memStore);
|
|
|
+ MockRM rm1 = createMockRM(conf, memStore);
|
|
|
rm1.start();
|
|
|
final MockNM nm1 =
|
|
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
|
@@ -1914,24 +1916,29 @@ public class TestRMRestart {
|
|
|
nm1.setResourceTrackerService(getResourceTrackerService());
|
|
|
NMContainerStatus status =
|
|
|
TestRMRestart.createNMContainerStatus(
|
|
|
- am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
|
|
+ am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
|
|
nm1.registerNode(Arrays.asList(status), null);
|
|
|
}
|
|
|
};
|
|
|
}
|
|
|
};
|
|
|
- // Re-start RM
|
|
|
- rm2.start();
|
|
|
|
|
|
- // wait for the 2nd attempt to be started.
|
|
|
- RMApp loadedApp0 =
|
|
|
- rm2.getRMContext().getRMApps().get(app0.getApplicationId());
|
|
|
- int timeoutSecs = 0;
|
|
|
- while (loadedApp0.getAppAttempts().size() != 2 && timeoutSecs++ < 40) {
|
|
|
- Thread.sleep(200);
|
|
|
+ try {
|
|
|
+ // Re-start RM
|
|
|
+ rm2.start();
|
|
|
+
|
|
|
+ // wait for the 2nd attempt to be started.
|
|
|
+ RMApp loadedApp0 =
|
|
|
+ rm2.getRMContext().getRMApps().get(app0.getApplicationId());
|
|
|
+ int timeoutSecs = 0;
|
|
|
+ while (loadedApp0.getAppAttempts().size() != 2 && timeoutSecs++ < 40) {
|
|
|
+ Thread.sleep(200);
|
|
|
+ }
|
|
|
+ MockAM am1 = MockRM.launchAndRegisterAM(loadedApp0, rm2, nm1);
|
|
|
+ MockRM.finishAMAndVerifyAppState(loadedApp0, rm2, nm1, am1);
|
|
|
+ } finally {
|
|
|
+ rm2.stop();
|
|
|
}
|
|
|
- MockAM am1 = MockRM.launchAndRegisterAM(loadedApp0, rm2, nm1);
|
|
|
- MockRM.finishAMAndVerifyAppState(loadedApp0, rm2, nm1, am1);
|
|
|
}
|
|
|
|
|
|
private void writeToHostsFile(String... hosts) throws IOException {
|