|
@@ -173,24 +173,23 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
return rm;
|
|
|
}
|
|
|
|
|
|
- @SuppressWarnings("rawtypes")
|
|
|
+ private MockRM createMockRM(YarnConfiguration config) {
|
|
|
+ MockRM rm = new MockRM(config);
|
|
|
+ rms.add(rm);
|
|
|
+ return rm;
|
|
|
+ }
|
|
|
+
|
|
|
@Test (timeout=180000)
|
|
|
public void testRMRestart() throws Exception {
|
|
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
|
|
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
|
|
|
|
|
- MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
- memStore.init(conf);
|
|
|
- RMState rmState = memStore.getState();
|
|
|
+ // PHASE 1: create RM and get state
|
|
|
+ MockRM rm1 = createMockRM(conf);
|
|
|
+ MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
|
|
Map<ApplicationId, ApplicationStateData> rmAppState =
|
|
|
- rmState.getApplicationState();
|
|
|
-
|
|
|
-
|
|
|
- // PHASE 1: create state in an RM
|
|
|
-
|
|
|
- // start RM
|
|
|
- MockRM rm1 = createMockRM(conf, memStore);
|
|
|
-
|
|
|
+ memStore.getState().getApplicationState();
|
|
|
+
|
|
|
// start like normal because state is empty
|
|
|
rm1.start();
|
|
|
|
|
@@ -446,14 +445,12 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
public void testRMRestartAppRunningAMFailed() throws Exception {
|
|
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
|
|
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
|
|
- MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
- memStore.init(conf);
|
|
|
- RMState rmState = memStore.getState();
|
|
|
- Map<ApplicationId, ApplicationStateData> rmAppState =
|
|
|
- rmState.getApplicationState();
|
|
|
|
|
|
- // start RM
|
|
|
- MockRM rm1 = createMockRM(conf, memStore);
|
|
|
+ // Create RM
|
|
|
+ MockRM rm1 = createMockRM(conf);
|
|
|
+ MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
|
|
+ Map<ApplicationId, ApplicationStateData> rmAppState =
|
|
|
+ memStore.getState().getApplicationState();
|
|
|
rm1.start();
|
|
|
MockNM nm1 =
|
|
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
|
@@ -503,14 +500,13 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
// be started immediately.
|
|
|
YarnConfiguration conf = new YarnConfiguration(this.conf);
|
|
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 40);
|
|
|
- MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
- memStore.init(conf);
|
|
|
- RMState rmState = memStore.getState();
|
|
|
+
|
|
|
+ // create RM
|
|
|
+ MockRM rm1 = createMockRM(conf);
|
|
|
+ MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
|
|
Map<ApplicationId, ApplicationStateData> rmAppState =
|
|
|
- rmState.getApplicationState();
|
|
|
-
|
|
|
+ memStore.getState().getApplicationState();
|
|
|
// start RM
|
|
|
- final MockRM rm1 = createMockRM(conf, memStore);
|
|
|
rm1.start();
|
|
|
AbstractYarnScheduler ys =
|
|
|
(AbstractYarnScheduler)rm1.getResourceScheduler();
|
|
@@ -669,7 +665,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
@Test (timeout = 60000)
|
|
|
public void testRMRestartWaitForPreviousSucceededAttempt() throws Exception {
|
|
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
|
|
- MemoryRMStateStore memStore = new MemoryRMStateStore() {
|
|
|
+ MemoryRMStateStore memStore = new MockRMMemoryStateStore() {
|
|
|
int count = 0;
|
|
|
|
|
|
@Override
|
|
@@ -722,14 +718,12 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
@Test (timeout = 60000)
|
|
|
public void testRMRestartFailedApp() throws Exception {
|
|
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
|
|
- MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
- memStore.init(conf);
|
|
|
- RMState rmState = memStore.getState();
|
|
|
+ // create RM
|
|
|
+ MockRM rm1 = createMockRM(conf);
|
|
|
+ MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
|
|
Map<ApplicationId, ApplicationStateData> rmAppState =
|
|
|
- rmState.getApplicationState();
|
|
|
-
|
|
|
+ memStore.getState().getApplicationState();
|
|
|
// start RM
|
|
|
- MockRM rm1 = createMockRM(conf, memStore);
|
|
|
rm1.start();
|
|
|
MockNM nm1 =
|
|
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
|
@@ -770,14 +764,12 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
public void testRMRestartKilledApp() throws Exception{
|
|
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
|
|
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
|
|
- MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
- memStore.init(conf);
|
|
|
- RMState rmState = memStore.getState();
|
|
|
+ // create RM
|
|
|
+ MockRM rm1 = createMockRM(conf);
|
|
|
+ MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
|
|
Map<ApplicationId, ApplicationStateData> rmAppState =
|
|
|
- rmState.getApplicationState();
|
|
|
-
|
|
|
+ memStore.getState().getApplicationState();
|
|
|
// start RM
|
|
|
- MockRM rm1 = createMockRM(conf, memStore);
|
|
|
rm1.start();
|
|
|
MockNM nm1 =
|
|
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
|
@@ -818,7 +810,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
|
|
|
@Test (timeout = 60000)
|
|
|
public void testRMRestartKilledAppWithNoAttempts() throws Exception {
|
|
|
- MemoryRMStateStore memStore = new MemoryRMStateStore() {
|
|
|
+ MemoryRMStateStore memStore = new MockRMMemoryStateStore() {
|
|
|
@Override
|
|
|
public synchronized void storeApplicationAttemptStateInternal(
|
|
|
ApplicationAttemptId attemptId,
|
|
@@ -860,14 +852,13 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
public void testRMRestartSucceededApp() throws Exception {
|
|
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
|
|
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
|
|
- MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
- memStore.init(conf);
|
|
|
- RMState rmState = memStore.getState();
|
|
|
+ // PHASE 1: create RM and get state
|
|
|
+ MockRM rm1 = createMockRM(conf);
|
|
|
+ MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
|
|
Map<ApplicationId, ApplicationStateData> rmAppState =
|
|
|
- rmState.getApplicationState();
|
|
|
+ memStore.getState().getApplicationState();
|
|
|
|
|
|
- // start RM
|
|
|
- MockRM rm1 = createMockRM(conf, memStore);
|
|
|
+ // start like normal because state is empty
|
|
|
rm1.start();
|
|
|
MockNM nm1 =
|
|
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
|
@@ -908,11 +899,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
@Test (timeout = 60000)
|
|
|
public void testRMRestartGetApplicationList() throws Exception {
|
|
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
|
|
- MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
- memStore.init(conf);
|
|
|
-
|
|
|
// start RM
|
|
|
- MockRM rm1 = new MockRM(conf, memStore) {
|
|
|
+ MockRM rm1 = new MockRM(conf) {
|
|
|
@Override
|
|
|
protected SystemMetricsPublisher createSystemMetricsPublisher() {
|
|
|
return spy(super.createSystemMetricsPublisher());
|
|
@@ -951,7 +939,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
.appCreated(any(RMApp.class), anyLong());
|
|
|
// restart rm
|
|
|
|
|
|
- MockRM rm2 = new MockRM(conf, memStore) {
|
|
|
+ MockRM rm2 = new MockRM(conf, rm1.getRMStateStore()) {
|
|
|
@Override
|
|
|
protected RMAppManager createRMAppManager() {
|
|
|
return spy(super.createRMAppManager());
|
|
@@ -1072,13 +1060,12 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
|
|
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
|
|
|
|
|
- MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
- memStore.init(conf);
|
|
|
- RMState rmState = memStore.getState();
|
|
|
-
|
|
|
+ // create RM
|
|
|
+ MockRM rm1 = createMockRM(conf);
|
|
|
+ MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
|
|
Map<ApplicationId, ApplicationStateData> rmAppState =
|
|
|
- rmState.getApplicationState();
|
|
|
- MockRM rm1 = createMockRM(conf, memStore);
|
|
|
+ memStore.getState().getApplicationState();
|
|
|
+ // start RM
|
|
|
rm1.start();
|
|
|
MockNM nm1 =
|
|
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
|
@@ -1145,13 +1132,12 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
"kerberos");
|
|
|
UserGroupInformation.setConfiguration(conf);
|
|
|
|
|
|
- MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
- memStore.init(conf);
|
|
|
- RMState rmState = memStore.getState();
|
|
|
-
|
|
|
+ // create RM
|
|
|
+ MockRM rm1 = new TestSecurityMockRM(conf);
|
|
|
+ MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
|
|
Map<ApplicationId, ApplicationStateData> rmAppState =
|
|
|
- rmState.getApplicationState();
|
|
|
- MockRM rm1 = new TestSecurityMockRM(conf, memStore);
|
|
|
+ memStore.getState().getApplicationState();
|
|
|
+ // start RM
|
|
|
rm1.start();
|
|
|
|
|
|
HashSet<Token<RMDelegationTokenIdentifier>> tokenSet =
|
|
@@ -1240,13 +1226,12 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
"kerberos");
|
|
|
UserGroupInformation.setConfiguration(conf);
|
|
|
|
|
|
- MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
- memStore.init(conf);
|
|
|
- RMState rmState = memStore.getState();
|
|
|
-
|
|
|
+ // create RM
|
|
|
+ MockRM rm1 = new TestSecurityMockRM(conf);
|
|
|
+ MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
|
|
Map<ApplicationId, ApplicationStateData> rmAppState =
|
|
|
- rmState.getApplicationState();
|
|
|
- MockRM rm1 = new TestSecurityMockRM(conf, memStore);
|
|
|
+ memStore.getState().getApplicationState();
|
|
|
+ // start RM
|
|
|
rm1.start();
|
|
|
MockNM nm1 =
|
|
|
new MockNM("0.0.0.0:4321", 15120, rm1.getResourceTrackerService());
|
|
@@ -1321,8 +1306,10 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
"kerberos");
|
|
|
conf.set(YarnConfiguration.RM_ADDRESS, "localhost:8032");
|
|
|
UserGroupInformation.setConfiguration(conf);
|
|
|
- MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
- memStore.init(conf);
|
|
|
+
|
|
|
+ MockRM rm1 = new TestSecurityMockRM(conf);
|
|
|
+ rm1.start();
|
|
|
+ MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
|
|
RMState rmState = memStore.getState();
|
|
|
|
|
|
Map<ApplicationId, ApplicationStateData> rmAppState =
|
|
@@ -1332,10 +1319,6 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
Set<DelegationKey> rmDTMasterKeyState =
|
|
|
rmState.getRMDTSecretManagerState().getMasterKeyState();
|
|
|
|
|
|
- MockRM rm1 = new TestSecurityMockRM(conf, memStore);
|
|
|
-
|
|
|
- rm1.start();
|
|
|
-
|
|
|
// create an empty credential
|
|
|
Credentials ts = new Credentials();
|
|
|
|
|
@@ -1470,10 +1453,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
"kerberos");
|
|
|
conf.set(YarnConfiguration.RM_ADDRESS, "localhost:8032");
|
|
|
UserGroupInformation.setConfiguration(conf);
|
|
|
- MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
- memStore.init(conf);
|
|
|
|
|
|
- MockRM rm1 = new TestSecurityMockRM(conf, memStore);
|
|
|
+ MockRM rm1 = new TestSecurityMockRM(conf);
|
|
|
rm1.start();
|
|
|
|
|
|
GetDelegationTokenRequest request1 =
|
|
@@ -1486,7 +1467,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
ConverterUtils.convertFromYarn(response1.getRMDelegationToken(), rmAddr);
|
|
|
|
|
|
// start new RM
|
|
|
- MockRM rm2 = new TestSecurityMockRM(conf, memStore);
|
|
|
+ MockRM rm2 = new TestSecurityMockRM(conf, rm1.getRMStateStore());
|
|
|
rm2.start();
|
|
|
|
|
|
// submit an app with the old delegation token got from previous RM.
|
|
@@ -1564,14 +1545,13 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
|
|
|
@Test (timeout = 60000)
|
|
|
public void testFinishedAppRemovalAfterRMRestart() throws Exception {
|
|
|
- MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS, 1);
|
|
|
- memStore.init(conf);
|
|
|
- RMState rmState = memStore.getState();
|
|
|
|
|
|
// start RM
|
|
|
- MockRM rm1 = createMockRM(conf, memStore);
|
|
|
+ MockRM rm1 = createMockRM(conf);
|
|
|
rm1.start();
|
|
|
+ MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
|
|
+ RMState rmState = memStore.getState();
|
|
|
MockNM nm1 =
|
|
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
|
|
nm1.registerNode();
|
|
@@ -1609,7 +1589,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
// This is to test RM does not get hang on shutdown.
|
|
|
@Test (timeout = 10000)
|
|
|
public void testRMShutdown() throws Exception {
|
|
|
- MemoryRMStateStore memStore = new MemoryRMStateStore() {
|
|
|
+ MemoryRMStateStore memStore = new MockRMMemoryStateStore() {
|
|
|
@Override
|
|
|
public synchronized void checkVersion()
|
|
|
throws Exception {
|
|
@@ -1675,10 +1655,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
|
|
"kerberos");
|
|
|
UserGroupInformation.setConfiguration(conf);
|
|
|
- MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
- memStore.init(conf);
|
|
|
-
|
|
|
- MockRM rm1 = new TestSecurityMockRM(conf, memStore) {
|
|
|
+ MockRM rm1 = new TestSecurityMockRM(conf) {
|
|
|
class TestDelegationTokenRenewer extends DelegationTokenRenewer {
|
|
|
public void addApplicationAsync(ApplicationId applicationId, Credentials ts,
|
|
|
boolean shouldCancelAtEnd, String user, Configuration appConf) {
|
|
@@ -1691,6 +1668,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
}
|
|
|
};
|
|
|
rm1.start();
|
|
|
+ MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
|
|
RMApp app1 = null;
|
|
|
try {
|
|
|
app1 = rm1.submitApp(200, "name", "user",
|
|
@@ -1714,7 +1692,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
|
|
|
@Test (timeout = 20000)
|
|
|
public void testAppRecoveredInOrderOnRMRestart() throws Exception {
|
|
|
- MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
+ MemoryRMStateStore memStore = new MockRMMemoryStateStore();
|
|
|
memStore.init(conf);
|
|
|
|
|
|
for (int i = 10; i > 0; i--) {
|
|
@@ -1769,12 +1747,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
public void testQueueMetricsOnRMRestart() throws Exception {
|
|
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
|
|
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
|
|
- MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
- memStore.init(conf);
|
|
|
-
|
|
|
- // PHASE 1: create state in an RM
|
|
|
// start RM
|
|
|
- MockRM rm1 = createMockRM(conf, memStore);
|
|
|
+ MockRM rm1 = createMockRM(conf);
|
|
|
rm1.start();
|
|
|
MockNM nm1 =
|
|
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
|
@@ -1812,7 +1786,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
|
|
|
// PHASE 2: create new RM and start from old state
|
|
|
// create new RM to represent restart and recover state
|
|
|
- MockRM rm2 = createMockRM(conf, memStore);
|
|
|
+ MockRM rm2 = createMockRM(conf, rm1.getRMStateStore());
|
|
|
QueueMetrics qm2 = rm2.getResourceScheduler().getRootQueueMetrics();
|
|
|
resetQueueMetrics(qm2);
|
|
|
assertQueueMetrics(qm2, 0, 0, 0, 0);
|
|
@@ -1893,7 +1867,6 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
|
|
|
@Test (timeout = 60000)
|
|
|
public void testDecomissionedNMsMetricsOnRMRestart() throws Exception {
|
|
|
- YarnConfiguration conf = new YarnConfiguration();
|
|
|
conf.set(YarnConfiguration.RM_NODES_EXCLUDE_FILE_PATH,
|
|
|
hostFile.getAbsolutePath());
|
|
|
writeToHostsFile("");
|
|
@@ -1972,11 +1945,9 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
|
|
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
|
|
|
"kerberos");
|
|
|
- MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
- memStore.init(conf);
|
|
|
|
|
|
// start RM
|
|
|
- MockRM rm1 = createMockRM(conf, memStore);
|
|
|
+ MockRM rm1 = createMockRM(conf);
|
|
|
rm1.start();
|
|
|
final MockNM nm1 =
|
|
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
|
@@ -1984,7 +1955,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
RMApp app0 = rm1.submitApp(200);
|
|
|
final MockAM am0 = MockRM.launchAndRegisterAM(app0, rm1, nm1);
|
|
|
|
|
|
- MockRM rm2 = new MockRM(conf, memStore) {
|
|
|
+ MockRM rm2 = new MockRM(conf, rm1.getRMStateStore()) {
|
|
|
@Override
|
|
|
protected ResourceTrackerService createResourceTrackerService() {
|
|
|
return new ResourceTrackerService(this.rmContext,
|
|
@@ -2091,6 +2062,10 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
super(conf, store);
|
|
|
}
|
|
|
|
|
|
+ public TestSecurityMockRM(Configuration conf) {
|
|
|
+ super(conf);
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public void init(Configuration conf) {
|
|
|
// reset localServiceAddress.
|
|
@@ -2141,10 +2116,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
conf.set(YarnConfiguration.FS_NODE_LABELS_STORE_ROOT_DIR,
|
|
|
nodeLabelFsStoreDirURI);
|
|
|
|
|
|
- MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
- memStore.init(conf);
|
|
|
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
|
|
|
- MockRM rm1 = new MockRM(conf, memStore) {
|
|
|
+ MockRM rm1 = new MockRM(conf) {
|
|
|
@Override
|
|
|
protected RMNodeLabelsManager createNodeLabelManager() {
|
|
|
RMNodeLabelsManager mgr = new RMNodeLabelsManager();
|
|
@@ -2194,7 +2167,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
Assert.assertEquals(1, nodeLabelManager.getNodeLabels().size());
|
|
|
Assert.assertTrue(nodeLabels.get(n1).equals(toSet("y")));
|
|
|
|
|
|
- MockRM rm2 = new MockRM(conf, memStore) {
|
|
|
+ MockRM rm2 = new MockRM(conf, rm1.getRMStateStore()) {
|
|
|
@Override
|
|
|
protected RMNodeLabelsManager createNodeLabelManager() {
|
|
|
RMNodeLabelsManager mgr = new RMNodeLabelsManager();
|
|
@@ -2223,14 +2196,12 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
int maxAttempt =
|
|
|
conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
|
|
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
|
|
|
- MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
- memStore.init(conf);
|
|
|
- RMState rmState = memStore.getState();
|
|
|
+ // create RM
|
|
|
+ MockRM rm1 = createMockRM(conf);
|
|
|
+ MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
|
|
Map<ApplicationId, ApplicationStateData> rmAppState =
|
|
|
- rmState.getApplicationState();
|
|
|
-
|
|
|
+ memStore.getState().getApplicationState();
|
|
|
// start RM
|
|
|
- MockRM rm1 = createMockRM(conf, memStore);
|
|
|
rm1.start();
|
|
|
MockNM nm1 =
|
|
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
|
@@ -2298,10 +2269,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
conf.set(YarnConfiguration.FS_NODE_LABELS_STORE_ROOT_DIR,
|
|
|
nodeLabelFsStoreDirURI);
|
|
|
|
|
|
- MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
- memStore.init(conf);
|
|
|
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
|
|
|
- MockRM rm1 = new MockRM(conf, memStore) {
|
|
|
+ MockRM rm1 = new MockRM(conf) {
|
|
|
@Override
|
|
|
protected RMNodeLabelsManager createNodeLabelManager() {
|
|
|
RMNodeLabelsManager mgr = new RMNodeLabelsManager();
|
|
@@ -2329,7 +2298,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
nodeLabelManager.replaceLabelsOnNode(ImmutableMap.of(n1, toSet("x")));
|
|
|
MockRM rm2 = null;
|
|
|
for (int i = 0; i < 2; i++) {
|
|
|
- rm2 = new MockRM(conf, memStore) {
|
|
|
+ rm2 = new MockRM(conf, rm1.getRMStateStore()) {
|
|
|
@Override
|
|
|
protected RMNodeLabelsManager createNodeLabelManager() {
|
|
|
RMNodeLabelsManager mgr = new RMNodeLabelsManager();
|
|
@@ -2352,15 +2321,12 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
|
|
|
@Test(timeout = 120000)
|
|
|
public void testRMRestartAfterPreemption() throws Exception {
|
|
|
- Configuration conf = new Configuration();
|
|
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
|
|
if (!getSchedulerType().equals(SchedulerType.CAPACITY)) {
|
|
|
return;
|
|
|
}
|
|
|
- MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
- memStore.init(conf);
|
|
|
// start RM
|
|
|
- MockRM rm1 = new MockRM(conf, memStore);
|
|
|
+ MockRM rm1 = new MockRM(conf);
|
|
|
rm1.start();
|
|
|
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
|
|
|
|
|
@@ -2399,7 +2365,7 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
MockRM rm2 = null;
|
|
|
// start RM2
|
|
|
try {
|
|
|
- rm2 = new MockRM(conf, memStore);
|
|
|
+ rm2 = new MockRM(conf, rm1.getRMStateStore());
|
|
|
rm2.start();
|
|
|
Assert.assertTrue("RM start successfully", true);
|
|
|
} catch (Exception e) {
|
|
@@ -2413,11 +2379,10 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
@Test(timeout = 60000)
|
|
|
public void testRMRestartOnMissingAttempts() throws Exception {
|
|
|
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 5);
|
|
|
- MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
- memStore.init(conf);
|
|
|
-
|
|
|
+ // create RM
|
|
|
+ MockRM rm1 = createMockRM(conf);
|
|
|
+ MemoryRMStateStore memStore = (MemoryRMStateStore) rm1.getRMStateStore();
|
|
|
// start RM
|
|
|
- MockRM rm1 = createMockRM(conf, memStore);
|
|
|
rm1.start();
|
|
|
MockNM nm1 =
|
|
|
new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
|
|
@@ -2473,13 +2438,10 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
|
|
|
@Test(timeout = 60000)
|
|
|
public void testRMRestartAfterNodeLabelDisabled() throws Exception {
|
|
|
- MemoryRMStateStore memStore = new MemoryRMStateStore();
|
|
|
- memStore.init(conf);
|
|
|
-
|
|
|
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
|
|
|
|
|
|
MockRM rm1 = new MockRM(
|
|
|
- TestUtils.getConfigurationWithDefaultQueueLabels(conf), memStore) {
|
|
|
+ TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
|
|
|
@Override
|
|
|
protected RMNodeLabelsManager createNodeLabelManager() {
|
|
|
RMNodeLabelsManager mgr = new RMNodeLabelsManager();
|
|
@@ -2513,7 +2475,8 @@ public class TestRMRestart extends ParameterizedSchedulerTestBase {
|
|
|
// restart rm with node label disabled
|
|
|
conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, false);
|
|
|
MockRM rm2 = new MockRM(
|
|
|
- TestUtils.getConfigurationWithDefaultQueueLabels(conf), memStore) {
|
|
|
+ TestUtils.getConfigurationWithDefaultQueueLabels(conf),
|
|
|
+ rm1.getRMStateStore()) {
|
|
|
@Override
|
|
|
protected RMNodeLabelsManager createNodeLabelManager() {
|
|
|
RMNodeLabelsManager mgr = new RMNodeLabelsManager();
|