|
@@ -18,6 +18,13 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
|
|
package org.apache.hadoop.yarn.server.resourcemanager.applicationsmanager;
|
|
|
|
|
|
|
|
+import static org.junit.jupiter.api.Assertions.assertEquals;
|
|
|
|
+import static org.junit.jupiter.api.Assertions.assertFalse;
|
|
|
|
+import static org.junit.jupiter.api.Assertions.assertNotNull;
|
|
|
|
+import static org.junit.jupiter.api.Assertions.assertNull;
|
|
|
|
+import static org.junit.jupiter.api.Assertions.assertTrue;
|
|
|
|
+import static org.junit.jupiter.api.Assertions.fail;
|
|
|
|
+
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.Collection;
|
|
import java.util.Collection;
|
|
@@ -72,20 +79,25 @@ import org.apache.hadoop.yarn.util.Records;
|
|
|
|
|
|
import org.apache.hadoop.thirdparty.com.google.common.base.Throwables;
|
|
import org.apache.hadoop.thirdparty.com.google.common.base.Throwables;
|
|
|
|
|
|
-import org.junit.Assert;
|
|
|
|
-import org.junit.Test;
|
|
|
|
|
|
+import org.junit.jupiter.api.Timeout;
|
|
|
|
+import org.junit.jupiter.params.ParameterizedTest;
|
|
|
|
+import org.junit.jupiter.params.provider.MethodSource;
|
|
|
|
|
|
/**
|
|
/**
|
|
* Test AM restart functions.
|
|
* Test AM restart functions.
|
|
*/
|
|
*/
|
|
public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
|
|
|
|
- public TestAMRestart(SchedulerType type) throws IOException {
|
|
|
|
- super(type);
|
|
|
|
|
|
+ public void initTestAMRestart(SchedulerType type) throws IOException {
|
|
|
|
+ initParameterizedSchedulerTestBase(type);
|
|
}
|
|
}
|
|
|
|
|
|
- @Test(timeout = 30000)
|
|
|
|
- public void testAMRestartWithExistingContainers() throws Exception {
|
|
|
|
|
|
+ @ParameterizedTest(name = "{0}")
|
|
|
|
+ @MethodSource("getParameters")
|
|
|
|
+ @Timeout(value = 30)
|
|
|
|
+ @SuppressWarnings("checkstyle:methodlength")
|
|
|
|
+ public void testAMRestartWithExistingContainers(SchedulerType type) throws Exception {
|
|
|
|
+ initTestAMRestart(type);
|
|
getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
|
getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
|
|
|
|
|
MockRM rm1 = new MockRM(getConf());
|
|
MockRM rm1 = new MockRM(getConf());
|
|
@@ -158,7 +170,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
Thread.sleep(200);
|
|
Thread.sleep(200);
|
|
}
|
|
}
|
|
// assert containerId6 is reserved.
|
|
// assert containerId6 is reserved.
|
|
- Assert.assertEquals(containerId6, schedulerAttempt.getReservedContainers()
|
|
|
|
|
|
+ assertEquals(containerId6, schedulerAttempt.getReservedContainers()
|
|
.get(0).getContainerId());
|
|
.get(0).getContainerId());
|
|
|
|
|
|
// fail the AM by sending CONTAINER_FINISHED event without registering.
|
|
// fail the AM by sending CONTAINER_FINISHED event without registering.
|
|
@@ -170,15 +182,15 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
Thread.sleep(3000);
|
|
Thread.sleep(3000);
|
|
rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
|
|
rm1.waitForState(nm1, containerId2, RMContainerState.RUNNING);
|
|
// acquired/allocated containers are cleaned up.
|
|
// acquired/allocated containers are cleaned up.
|
|
- Assert.assertNull(rm1.getResourceScheduler().getRMContainer(containerId4));
|
|
|
|
- Assert.assertNull(rm1.getResourceScheduler().getRMContainer(containerId5));
|
|
|
|
|
|
+ assertNull(rm1.getResourceScheduler().getRMContainer(containerId4));
|
|
|
|
+ assertNull(rm1.getResourceScheduler().getRMContainer(containerId5));
|
|
|
|
|
|
// wait for app to start a new attempt.
|
|
// wait for app to start a new attempt.
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
|
// assert this is a new AM.
|
|
// assert this is a new AM.
|
|
ApplicationAttemptId newAttemptId =
|
|
ApplicationAttemptId newAttemptId =
|
|
app1.getCurrentAppAttempt().getAppAttemptId();
|
|
app1.getCurrentAppAttempt().getAppAttemptId();
|
|
- Assert.assertFalse(newAttemptId.equals(am1.getApplicationAttemptId()));
|
|
|
|
|
|
+ assertFalse(newAttemptId.equals(am1.getApplicationAttemptId()));
|
|
|
|
|
|
// launch the new AM
|
|
// launch the new AM
|
|
MockAM am2 = rm1.launchAM(app1, rm1, nm1);
|
|
MockAM am2 = rm1.launchAM(app1, rm1, nm1);
|
|
@@ -187,7 +199,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
|
|
|
|
|
|
|
|
// Assert two containers are running: container2 and container3;
|
|
// Assert two containers are running: container2 and container3;
|
|
- Assert.assertEquals(2, registerResponse.getContainersFromPreviousAttempts()
|
|
|
|
|
|
+ assertEquals(2, registerResponse.getContainersFromPreviousAttempts()
|
|
.size());
|
|
.size());
|
|
boolean containerId2Exists = false, containerId3Exists = false;
|
|
boolean containerId2Exists = false, containerId3Exists = false;
|
|
for (Container container : registerResponse
|
|
for (Container container : registerResponse
|
|
@@ -199,7 +211,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
containerId3Exists = true;
|
|
containerId3Exists = true;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- Assert.assertTrue(containerId2Exists && containerId3Exists);
|
|
|
|
|
|
+ assertTrue(containerId2Exists && containerId3Exists);
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
|
|
|
|
|
|
// complete container by sending the container complete event which has earlier
|
|
// complete container by sending the container complete event which has earlier
|
|
@@ -236,7 +248,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
container6Exists = true;
|
|
container6Exists = true;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- Assert.assertTrue(container3Exists && container4Exists && container5Exists
|
|
|
|
|
|
+ assertTrue(container3Exists && container4Exists && container5Exists
|
|
&& container6Exists);
|
|
&& container6Exists);
|
|
|
|
|
|
// New SchedulerApplicationAttempt also has the containers info.
|
|
// New SchedulerApplicationAttempt also has the containers info.
|
|
@@ -251,7 +263,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
|
|
|
|
// the 2nd attempt released the 1st attempt's running container, when the
|
|
// the 2nd attempt released the 1st attempt's running container, when the
|
|
// 2nd attempt finishes.
|
|
// 2nd attempt finishes.
|
|
- Assert.assertFalse(schedulerNewAttempt.getLiveContainers().contains(
|
|
|
|
|
|
+ assertFalse(schedulerNewAttempt.getLiveContainers().contains(
|
|
containerId2));
|
|
containerId2));
|
|
// all 4 normal containers finished.
|
|
// all 4 normal containers finished.
|
|
System.out.println("New attempt's just finished containers: "
|
|
System.out.println("New attempt's just finished containers: "
|
|
@@ -278,8 +290,8 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
Thread.sleep(200);
|
|
Thread.sleep(200);
|
|
}
|
|
}
|
|
|
|
|
|
- Assert.assertEquals("Did not get all containers allocated",
|
|
|
|
- NUM_CONTAINERS, containers.size());
|
|
|
|
|
|
+ assertEquals(NUM_CONTAINERS, containers.size(),
|
|
|
|
+ "Did not get all containers allocated");
|
|
return containers;
|
|
return containers;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -293,8 +305,11 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- @Test(timeout = 30000)
|
|
|
|
- public void testNMTokensRebindOnAMRestart() throws Exception {
|
|
|
|
|
|
+ @ParameterizedTest(name = "{0}")
|
|
|
|
+ @MethodSource("getParameters")
|
|
|
|
+ @Timeout(value = 30)
|
|
|
|
+ public void testNMTokensRebindOnAMRestart(SchedulerType type) throws Exception {
|
|
|
|
+ initTestAMRestart(type);
|
|
getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3);
|
|
getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 3);
|
|
// To prevent test from blacklisting nm1 for AM, we sit threshold to half
|
|
// To prevent test from blacklisting nm1 for AM, we sit threshold to half
|
|
// of 2 nodes which is 1
|
|
// of 2 nodes which is 1
|
|
@@ -369,10 +384,10 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
|
|
rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
|
|
|
|
|
|
// check am2 get the nm token from am1.
|
|
// check am2 get the nm token from am1.
|
|
- Assert.assertEquals(expectedNMTokens.size(),
|
|
|
|
|
|
+ assertEquals(expectedNMTokens.size(),
|
|
registerResponse.getNMTokensFromPreviousAttempts().size());
|
|
registerResponse.getNMTokensFromPreviousAttempts().size());
|
|
for (int i = 0; i < expectedNMTokens.size(); i++) {
|
|
for (int i = 0; i < expectedNMTokens.size(); i++) {
|
|
- Assert.assertTrue(expectedNMTokens.get(i)
|
|
|
|
|
|
+ assertTrue(expectedNMTokens.get(i)
|
|
.equals(registerResponse.getNMTokensFromPreviousAttempts().get(i)));
|
|
.equals(registerResponse.getNMTokensFromPreviousAttempts().get(i)));
|
|
}
|
|
}
|
|
|
|
|
|
@@ -408,8 +423,8 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
|
|
|
|
// check am3 get the NM token from both am1 and am2;
|
|
// check am3 get the NM token from both am1 and am2;
|
|
List<NMToken> transferredTokens = registerResponse.getNMTokensFromPreviousAttempts();
|
|
List<NMToken> transferredTokens = registerResponse.getNMTokensFromPreviousAttempts();
|
|
- Assert.assertEquals(2, transferredTokens.size());
|
|
|
|
- Assert.assertTrue(transferredTokens.containsAll(expectedNMTokens));
|
|
|
|
|
|
+ assertEquals(2, transferredTokens.size());
|
|
|
|
+ assertTrue(transferredTokens.containsAll(expectedNMTokens));
|
|
rm1.stop();
|
|
rm1.stop();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -417,8 +432,11 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
* AM container preempted, nm disk failure
|
|
* AM container preempted, nm disk failure
|
|
* should not be counted towards AM max retry count.
|
|
* should not be counted towards AM max retry count.
|
|
*/
|
|
*/
|
|
- @Test(timeout = 100000)
|
|
|
|
- public void testShouldNotCountFailureToMaxAttemptRetry() throws Exception {
|
|
|
|
|
|
+ @ParameterizedTest(name = "{0}")
|
|
|
|
+ @MethodSource("getParameters")
|
|
|
|
+ @Timeout(value = 100)
|
|
|
|
+ public void testShouldNotCountFailureToMaxAttemptRetry(SchedulerType type) throws Exception {
|
|
|
|
+ initTestAMRestart(type);
|
|
getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
|
getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
|
getConf().setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
|
getConf().setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
|
getConf().set(
|
|
getConf().set(
|
|
@@ -442,7 +460,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
|
am1.getApplicationAttemptId());
|
|
am1.getApplicationAttemptId());
|
|
|
|
|
|
- Assert.assertFalse(attempt1.shouldCountTowardsMaxAttemptRetry());
|
|
|
|
|
|
+ assertFalse(attempt1.shouldCountTowardsMaxAttemptRetry());
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
|
ApplicationStateData appState =
|
|
ApplicationStateData appState =
|
|
((MemoryRMStateStore) rm1.getRMStateStore()).getState()
|
|
((MemoryRMStateStore) rm1.getRMStateStore()).getState()
|
|
@@ -462,7 +480,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
|
am2.getApplicationAttemptId());
|
|
am2.getApplicationAttemptId());
|
|
|
|
|
|
- Assert.assertFalse(attempt2.shouldCountTowardsMaxAttemptRetry());
|
|
|
|
|
|
+ assertFalse(attempt2.shouldCountTowardsMaxAttemptRetry());
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
|
MockAM am3 =
|
|
MockAM am3 =
|
|
rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm1);
|
|
rm1.waitForNewAMToLaunchAndRegister(app1.getApplicationId(), 3, nm1);
|
|
@@ -484,8 +502,8 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
|
am3.getApplicationAttemptId());
|
|
am3.getApplicationAttemptId());
|
|
|
|
|
|
- Assert.assertFalse(attempt3.shouldCountTowardsMaxAttemptRetry());
|
|
|
|
- Assert.assertEquals(ContainerExitStatus.DISKS_FAILED,
|
|
|
|
|
|
+ assertFalse(attempt3.shouldCountTowardsMaxAttemptRetry());
|
|
|
|
+ assertEquals(ContainerExitStatus.DISKS_FAILED,
|
|
appState.getAttempt(am3.getApplicationAttemptId())
|
|
appState.getAttempt(am3.getApplicationAttemptId())
|
|
.getAMContainerExitStatus());
|
|
.getAMContainerExitStatus());
|
|
|
|
|
|
@@ -505,8 +523,8 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
|
am4.getApplicationAttemptId());
|
|
am4.getApplicationAttemptId());
|
|
|
|
|
|
- Assert.assertFalse(attempt4.shouldCountTowardsMaxAttemptRetry());
|
|
|
|
- Assert.assertEquals(ContainerExitStatus.ABORTED,
|
|
|
|
|
|
+ assertFalse(attempt4.shouldCountTowardsMaxAttemptRetry());
|
|
|
|
+ assertEquals(ContainerExitStatus.ABORTED,
|
|
appState.getAttempt(am4.getApplicationAttemptId())
|
|
appState.getAttempt(am4.getApplicationAttemptId())
|
|
.getAMContainerExitStatus());
|
|
.getAMContainerExitStatus());
|
|
// launch next AM in nm2
|
|
// launch next AM in nm2
|
|
@@ -520,7 +538,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
|
am5.getApplicationAttemptId());
|
|
am5.getApplicationAttemptId());
|
|
|
|
|
|
- Assert.assertTrue(attempt5.shouldCountTowardsMaxAttemptRetry());
|
|
|
|
|
|
+ assertTrue(attempt5.shouldCountTowardsMaxAttemptRetry());
|
|
|
|
|
|
// launch next AM in nm2
|
|
// launch next AM in nm2
|
|
MockAM am6 =
|
|
MockAM am6 =
|
|
@@ -534,16 +552,19 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
|
am6.getApplicationAttemptId());
|
|
am6.getApplicationAttemptId());
|
|
|
|
|
|
- Assert.assertTrue(attempt6.shouldCountTowardsMaxAttemptRetry());
|
|
|
|
|
|
+ assertTrue(attempt6.shouldCountTowardsMaxAttemptRetry());
|
|
|
|
|
|
// AM should not be restarted.
|
|
// AM should not be restarted.
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
|
|
- Assert.assertEquals(6, app1.getAppAttempts().size());
|
|
|
|
|
|
+ assertEquals(6, app1.getAppAttempts().size());
|
|
rm1.stop();
|
|
rm1.stop();
|
|
}
|
|
}
|
|
|
|
|
|
- @Test(timeout = 100000)
|
|
|
|
- public void testMaxAttemptOneMeansOne() throws Exception {
|
|
|
|
|
|
+ @ParameterizedTest(name = "{0}")
|
|
|
|
+ @MethodSource("getParameters")
|
|
|
|
+ @Timeout(value = 100)
|
|
|
|
+ public void testMaxAttemptOneMeansOne(SchedulerType type) throws Exception {
|
|
|
|
+ initTestAMRestart(type);
|
|
getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
|
getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
|
getConf().setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
|
getConf().setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
|
getConf().set(
|
|
getConf().set(
|
|
@@ -569,7 +590,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
|
|
|
|
// AM should not be restarted.
|
|
// AM should not be restarted.
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
|
|
- Assert.assertEquals(1, app1.getAppAttempts().size());
|
|
|
|
|
|
+ assertEquals(1, app1.getAppAttempts().size());
|
|
rm1.stop();
|
|
rm1.stop();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -578,8 +599,11 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
* AM preemption failure towards the max-retry-account and should be able to
|
|
* AM preemption failure towards the max-retry-account and should be able to
|
|
* re-launch the AM.
|
|
* re-launch the AM.
|
|
*/
|
|
*/
|
|
- @Test(timeout = 60000)
|
|
|
|
- public void testPreemptedAMRestartOnRMRestart() throws Exception {
|
|
|
|
|
|
+ @ParameterizedTest(name = "{0}")
|
|
|
|
+ @MethodSource("getParameters")
|
|
|
|
+ @Timeout(value = 60)
|
|
|
|
+ public void testPreemptedAMRestartOnRMRestart(SchedulerType type) throws Exception {
|
|
|
|
+ initTestAMRestart(type);
|
|
getConf().setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
|
getConf().setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
|
getConf().setBoolean(
|
|
getConf().setBoolean(
|
|
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
|
|
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
|
|
@@ -608,7 +632,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
|
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
|
am1.getApplicationAttemptId());
|
|
am1.getApplicationAttemptId());
|
|
- Assert.assertTrue(attempt1.shouldCountTowardsMaxAttemptRetry());
|
|
|
|
|
|
+ assertTrue(attempt1.shouldCountTowardsMaxAttemptRetry());
|
|
|
|
|
|
// wait for the next AM to start
|
|
// wait for the next AM to start
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
|
@@ -621,21 +645,21 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
scheduler.killContainer(scheduler.getRMContainer(amContainer));
|
|
scheduler.killContainer(scheduler.getRMContainer(amContainer));
|
|
|
|
|
|
rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
|
rm1.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
|
- Assert.assertFalse(attempt2.shouldCountTowardsMaxAttemptRetry());
|
|
|
|
|
|
+ assertFalse(attempt2.shouldCountTowardsMaxAttemptRetry());
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
|
|
|
|
|
// state store has 2 attempts stored.
|
|
// state store has 2 attempts stored.
|
|
ApplicationStateData appState =
|
|
ApplicationStateData appState =
|
|
memStore.getState().getApplicationState().get(app1.getApplicationId());
|
|
memStore.getState().getApplicationState().get(app1.getApplicationId());
|
|
- Assert.assertEquals(2, appState.getAttemptCount());
|
|
|
|
|
|
+ assertEquals(2, appState.getAttemptCount());
|
|
if (getSchedulerType().equals(SchedulerType.FAIR)) {
|
|
if (getSchedulerType().equals(SchedulerType.FAIR)) {
|
|
// attempt stored has the preempted container exit status.
|
|
// attempt stored has the preempted container exit status.
|
|
- Assert.assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
|
|
|
|
|
|
+ assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
|
|
appState.getAttempt(am2.getApplicationAttemptId())
|
|
appState.getAttempt(am2.getApplicationAttemptId())
|
|
.getAMContainerExitStatus());
|
|
.getAMContainerExitStatus());
|
|
} else {
|
|
} else {
|
|
// attempt stored has the preempted container exit status.
|
|
// attempt stored has the preempted container exit status.
|
|
- Assert.assertEquals(ContainerExitStatus.PREEMPTED,
|
|
|
|
|
|
+ assertEquals(ContainerExitStatus.PREEMPTED,
|
|
appState.getAttempt(am2.getApplicationAttemptId())
|
|
appState.getAttempt(am2.getApplicationAttemptId())
|
|
.getAMContainerExitStatus());
|
|
.getAMContainerExitStatus());
|
|
}
|
|
}
|
|
@@ -652,8 +676,8 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
RMAppAttempt attempt3 =
|
|
RMAppAttempt attempt3 =
|
|
rm2.getRMContext().getRMApps().get(app1.getApplicationId())
|
|
rm2.getRMContext().getRMApps().get(app1.getApplicationId())
|
|
.getCurrentAppAttempt();
|
|
.getCurrentAppAttempt();
|
|
- Assert.assertTrue(attempt3.shouldCountTowardsMaxAttemptRetry());
|
|
|
|
- Assert.assertEquals(ContainerExitStatus.INVALID,
|
|
|
|
|
|
+ assertTrue(attempt3.shouldCountTowardsMaxAttemptRetry());
|
|
|
|
+ assertEquals(ContainerExitStatus.INVALID,
|
|
appState.getAttempt(am3.getApplicationAttemptId())
|
|
appState.getAttempt(am3.getApplicationAttemptId())
|
|
.getAMContainerExitStatus());
|
|
.getAMContainerExitStatus());
|
|
rm1.stop();
|
|
rm1.stop();
|
|
@@ -665,9 +689,12 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
* AM failure towards the max-retry-account and should be able to
|
|
* AM failure towards the max-retry-account and should be able to
|
|
* re-launch the AM.
|
|
* re-launch the AM.
|
|
*/
|
|
*/
|
|
- @Test(timeout = 50000)
|
|
|
|
- public void testRMRestartOrFailoverNotCountedForAMFailures()
|
|
|
|
|
|
+ @ParameterizedTest(name = "{0}")
|
|
|
|
+ @MethodSource("getParameters")
|
|
|
|
+ @Timeout(value = 50)
|
|
|
|
+ public void testRMRestartOrFailoverNotCountedForAMFailures(SchedulerType type)
|
|
throws Exception {
|
|
throws Exception {
|
|
|
|
+ initTestAMRestart(type);
|
|
getConf().setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
|
getConf().setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
|
getConf().setBoolean(
|
|
getConf().setBoolean(
|
|
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
|
|
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
|
|
@@ -696,7 +723,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
|
rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
|
am1.getApplicationAttemptId());
|
|
am1.getApplicationAttemptId());
|
|
- Assert.assertTrue(attempt1.shouldCountTowardsMaxAttemptRetry());
|
|
|
|
|
|
+ assertTrue(attempt1.shouldCountTowardsMaxAttemptRetry());
|
|
|
|
|
|
// wait for the next AM to start
|
|
// wait for the next AM to start
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
|
@@ -720,7 +747,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
nm1.registerNode(Collections.singletonList(status), null);
|
|
nm1.registerNode(Collections.singletonList(status), null);
|
|
|
|
|
|
rm2.waitForState(attempt2.getAppAttemptId(), RMAppAttemptState.FAILED);
|
|
rm2.waitForState(attempt2.getAppAttemptId(), RMAppAttemptState.FAILED);
|
|
- Assert.assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
|
|
|
|
|
|
+ assertEquals(ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
|
|
appState.getAttempt(am2.getApplicationAttemptId())
|
|
appState.getAttempt(am2.getApplicationAttemptId())
|
|
.getAMContainerExitStatus());
|
|
.getAMContainerExitStatus());
|
|
// Will automatically start a new AppAttempt in rm2
|
|
// Will automatically start a new AppAttempt in rm2
|
|
@@ -731,8 +758,8 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
RMAppAttempt attempt3 =
|
|
RMAppAttempt attempt3 =
|
|
rm2.getRMContext().getRMApps().get(app1.getApplicationId())
|
|
rm2.getRMContext().getRMApps().get(app1.getApplicationId())
|
|
.getCurrentAppAttempt();
|
|
.getCurrentAppAttempt();
|
|
- Assert.assertTrue(attempt3.shouldCountTowardsMaxAttemptRetry());
|
|
|
|
- Assert.assertEquals(ContainerExitStatus.INVALID,
|
|
|
|
|
|
+ assertTrue(attempt3.shouldCountTowardsMaxAttemptRetry());
|
|
|
|
+ assertEquals(ContainerExitStatus.INVALID,
|
|
appState.getAttempt(am3.getApplicationAttemptId())
|
|
appState.getAttempt(am3.getApplicationAttemptId())
|
|
.getAMContainerExitStatus());
|
|
.getAMContainerExitStatus());
|
|
|
|
|
|
@@ -740,8 +767,11 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
rm2.stop();
|
|
rm2.stop();
|
|
}
|
|
}
|
|
|
|
|
|
- @Test (timeout = 120000)
|
|
|
|
- public void testRMAppAttemptFailuresValidityInterval() throws Exception {
|
|
|
|
|
|
+ @ParameterizedTest(name = "{0}")
|
|
|
|
+ @MethodSource("getParameters")
|
|
|
|
+ @Timeout(value = 120)
|
|
|
|
+ public void testRMAppAttemptFailuresValidityInterval(SchedulerType type) throws Exception {
|
|
|
|
+ initTestAMRestart(type);
|
|
getConf().setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
|
getConf().setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
|
getConf().setBoolean(
|
|
getConf().setBoolean(
|
|
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
|
|
YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, false);
|
|
@@ -778,7 +808,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
rm1.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
|
rm1.waitForState(am.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
|
// launch the second attempt
|
|
// launch the second attempt
|
|
rm1.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
|
|
rm1.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
|
|
- Assert.assertEquals(2, app.getAppAttempts().size());
|
|
|
|
|
|
+ assertEquals(2, app.getAppAttempts().size());
|
|
|
|
|
|
MockAM am_2 = MockRM.launchAndRegisterAM(app, rm1, nm1);
|
|
MockAM am_2 = MockRM.launchAndRegisterAM(app, rm1, nm1);
|
|
rm1.waitForState(am_2.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
|
|
rm1.waitForState(am_2.getApplicationAttemptId(), RMAppAttemptState.RUNNING);
|
|
@@ -809,7 +839,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
|
|
|
|
// launch the second attempt
|
|
// launch the second attempt
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
|
- Assert.assertEquals(2, app1.getAppAttempts().size());
|
|
|
|
|
|
+ assertEquals(2, app1.getAppAttempts().size());
|
|
|
|
|
|
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
|
|
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
|
|
MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
|
MockAM am2 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
|
@@ -824,7 +854,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
|
|
|
|
// can launch the third attempt successfully
|
|
// can launch the third attempt successfully
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
|
- Assert.assertEquals(3, app1.getAppAttempts().size());
|
|
|
|
|
|
+ assertEquals(3, app1.getAppAttempts().size());
|
|
RMAppAttempt attempt3 = app1.getCurrentAppAttempt();
|
|
RMAppAttempt attempt3 = app1.getCurrentAppAttempt();
|
|
clock.reset();
|
|
clock.reset();
|
|
MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
|
MockAM am3 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
|
|
@@ -840,7 +870,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
ApplicationStateData app1State =
|
|
ApplicationStateData app1State =
|
|
memStore1.getState().getApplicationState().
|
|
memStore1.getState().getApplicationState().
|
|
get(app1.getApplicationId());
|
|
get(app1.getApplicationId());
|
|
- Assert.assertEquals(1, app1State.getFirstAttemptId());
|
|
|
|
|
|
+ assertEquals(1, app1State.getFirstAttemptId());
|
|
|
|
|
|
// re-register the NM
|
|
// re-register the NM
|
|
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
|
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
|
|
@@ -856,7 +886,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
//Wait to make sure attempt3 be removed in State Store
|
|
//Wait to make sure attempt3 be removed in State Store
|
|
//TODO explore a better way than sleeping for a while (YARN-4929)
|
|
//TODO explore a better way than sleeping for a while (YARN-4929)
|
|
Thread.sleep(15 * 1000);
|
|
Thread.sleep(15 * 1000);
|
|
- Assert.assertEquals(2, app1State.getAttemptCount());
|
|
|
|
|
|
+ assertEquals(2, app1State.getAttemptCount());
|
|
|
|
|
|
rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
|
rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
|
|
|
|
|
@@ -870,7 +900,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
nm1
|
|
nm1
|
|
.nodeHeartbeat(am4.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
|
.nodeHeartbeat(am4.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
|
rm2.waitForState(am4.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
|
rm2.waitForState(am4.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
|
- Assert.assertEquals(2, app1State.getAttemptCount());
|
|
|
|
|
|
+ assertEquals(2, app1State.getAttemptCount());
|
|
|
|
|
|
// can launch the 5th attempt successfully
|
|
// can launch the 5th attempt successfully
|
|
rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
|
rm2.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
|
@@ -884,7 +914,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
nm1
|
|
nm1
|
|
.nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
|
.nodeHeartbeat(am5.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
|
|
rm2.waitForState(am5.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
|
rm2.waitForState(am5.getApplicationAttemptId(), RMAppAttemptState.FAILED);
|
|
- Assert.assertEquals(2, app1State.getAttemptCount());
|
|
|
|
|
|
+ assertEquals(2, app1State.getAttemptCount());
|
|
|
|
|
|
rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED);
|
|
rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED);
|
|
rm1.stop();
|
|
rm1.stop();
|
|
@@ -901,8 +931,11 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
return false;
|
|
return false;
|
|
}
|
|
}
|
|
|
|
|
|
- @Test(timeout = 40000)
|
|
|
|
- public void testAMRestartNotLostContainerCompleteMsg() throws Exception {
|
|
|
|
|
|
+ @ParameterizedTest(name = "{0}")
|
|
|
|
+ @MethodSource("getParameters")
|
|
|
|
+ @Timeout(value = 40)
|
|
|
|
+ public void testAMRestartNotLostContainerCompleteMsg(SchedulerType type) throws Exception {
|
|
|
|
+ initTestAMRestart(type);
|
|
getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
|
getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
|
|
|
|
|
MockRM rm1 = new MockRM(getConf());
|
|
MockRM rm1 = new MockRM(getConf());
|
|
@@ -958,7 +991,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
app1.getCurrentAppAttempt().getJustFinishedContainers();
|
|
app1.getCurrentAppAttempt().getJustFinishedContainers();
|
|
if (isContainerIdInContainerStatus(containerStatuses,
|
|
if (isContainerIdInContainerStatus(containerStatuses,
|
|
containerId2)) {
|
|
containerId2)) {
|
|
- Assert.fail();
|
|
|
|
|
|
+ fail();
|
|
}
|
|
}
|
|
break;
|
|
break;
|
|
}
|
|
}
|
|
@@ -973,7 +1006,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
// assert this is a new AM.
|
|
// assert this is a new AM.
|
|
ApplicationAttemptId newAttemptId =
|
|
ApplicationAttemptId newAttemptId =
|
|
app1.getCurrentAppAttempt().getAppAttemptId();
|
|
app1.getCurrentAppAttempt().getAppAttemptId();
|
|
- Assert.assertFalse(newAttemptId.equals(am1.getApplicationAttemptId()));
|
|
|
|
|
|
+ assertFalse(newAttemptId.equals(am1.getApplicationAttemptId()));
|
|
|
|
|
|
// launch the new AM
|
|
// launch the new AM
|
|
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
|
|
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
|
|
@@ -986,11 +1019,11 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
allocateResponse.getCompletedContainersStatuses();
|
|
allocateResponse.getCompletedContainersStatuses();
|
|
if (isContainerIdInContainerStatus(containerStatuses,
|
|
if (isContainerIdInContainerStatus(containerStatuses,
|
|
containerId2) == false) {
|
|
containerId2) == false) {
|
|
- Assert.fail();
|
|
|
|
|
|
+ fail();
|
|
}
|
|
}
|
|
containerStatuses = attempt2.getJustFinishedContainers();
|
|
containerStatuses = attempt2.getJustFinishedContainers();
|
|
if (isContainerIdInContainerStatus(containerStatuses, containerId2)) {
|
|
if (isContainerIdInContainerStatus(containerStatuses, containerId2)) {
|
|
- Assert.fail();
|
|
|
|
|
|
+ fail();
|
|
}
|
|
}
|
|
|
|
|
|
// the second allocate should not get container complete msg
|
|
// the second allocate should not get container complete msg
|
|
@@ -999,7 +1032,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
containerStatuses =
|
|
containerStatuses =
|
|
allocateResponse.getCompletedContainersStatuses();
|
|
allocateResponse.getCompletedContainersStatuses();
|
|
if (isContainerIdInContainerStatus(containerStatuses, containerId2)) {
|
|
if (isContainerIdInContainerStatus(containerStatuses, containerId2)) {
|
|
- Assert.fail();
|
|
|
|
|
|
+ fail();
|
|
}
|
|
}
|
|
|
|
|
|
rm1.stop();
|
|
rm1.stop();
|
|
@@ -1010,9 +1043,12 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
* after AM reset window, even if AM who was the last is failed,
|
|
* after AM reset window, even if AM who was the last is failed,
|
|
* all containers are launched by previous AM should be kept.
|
|
* all containers are launched by previous AM should be kept.
|
|
*/
|
|
*/
|
|
- @Test (timeout = 20000)
|
|
|
|
- public void testAMRestartNotLostContainerAfterAttemptFailuresValidityInterval()
|
|
|
|
|
|
+ @ParameterizedTest(name = "{0}")
|
|
|
|
+ @MethodSource("getParameters")
|
|
|
|
+ @Timeout(value = 20)
|
|
|
|
+ public void testAMRestartNotLostContainerAfterAttemptFailuresValidityInterval(SchedulerType type)
|
|
throws Exception {
|
|
throws Exception {
|
|
|
|
+ initTestAMRestart(type);
|
|
// explicitly set max-am-retry count as 2.
|
|
// explicitly set max-am-retry count as 2.
|
|
getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
|
getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
|
|
|
|
|
@@ -1047,7 +1083,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
|
|
|
|
// launch the second attempt
|
|
// launch the second attempt
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
|
- Assert.assertEquals(2, app1.getAppAttempts().size());
|
|
|
|
|
|
+ assertEquals(2, app1.getAppAttempts().size());
|
|
|
|
|
|
// It will be the last attempt.
|
|
// It will be the last attempt.
|
|
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
|
|
RMAppAttempt attempt2 = app1.getCurrentAppAttempt();
|
|
@@ -1064,20 +1100,20 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
|
|
|
|
// can launch the third attempt successfully
|
|
// can launch the third attempt successfully
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
|
|
- Assert.assertEquals(3, app1.getAppAttempts().size());
|
|
|
|
|
|
+ assertEquals(3, app1.getAppAttempts().size());
|
|
MockAM am3 = rm1.launchAM(app1, rm1, nm1);
|
|
MockAM am3 = rm1.launchAM(app1, rm1, nm1);
|
|
RegisterApplicationMasterResponse registerResponse =
|
|
RegisterApplicationMasterResponse registerResponse =
|
|
am3.registerAppAttempt();
|
|
am3.registerAppAttempt();
|
|
|
|
|
|
// keepContainers is applied, even if attempt2 was the last attempt.
|
|
// keepContainers is applied, even if attempt2 was the last attempt.
|
|
- Assert.assertEquals(1, registerResponse.getContainersFromPreviousAttempts()
|
|
|
|
|
|
+ assertEquals(1, registerResponse.getContainersFromPreviousAttempts()
|
|
.size());
|
|
.size());
|
|
boolean containerId2Exists = false;
|
|
boolean containerId2Exists = false;
|
|
Container container = registerResponse.getContainersFromPreviousAttempts().get(0);
|
|
Container container = registerResponse.getContainersFromPreviousAttempts().get(0);
|
|
if (container.getId().equals(containerId2)) {
|
|
if (container.getId().equals(containerId2)) {
|
|
containerId2Exists = true;
|
|
containerId2Exists = true;
|
|
}
|
|
}
|
|
- Assert.assertTrue(containerId2Exists);
|
|
|
|
|
|
+ assertTrue(containerId2Exists);
|
|
|
|
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
|
|
rm1.stop();
|
|
rm1.stop();
|
|
@@ -1102,9 +1138,12 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
* 8. Verify that the app master receives container 3 in the RM response to
|
|
* 8. Verify that the app master receives container 3 in the RM response to
|
|
* its heartbeat.
|
|
* its heartbeat.
|
|
*/
|
|
*/
|
|
- @Test(timeout = 200000)
|
|
|
|
- public void testContainersFromPreviousAttemptsWithRMRestart()
|
|
|
|
|
|
+ @ParameterizedTest(name = "{0}")
|
|
|
|
+ @MethodSource("getParameters")
|
|
|
|
+ @Timeout(value = 200)
|
|
|
|
+ public void testContainersFromPreviousAttemptsWithRMRestart(SchedulerType type)
|
|
throws Exception {
|
|
throws Exception {
|
|
|
|
+ initTestAMRestart(type);
|
|
getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
|
getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 2);
|
|
getConf().setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
|
getConf().setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
|
getConf().setBoolean(
|
|
getConf().setBoolean(
|
|
@@ -1182,7 +1221,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2,
|
|
nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 2,
|
|
ContainerState.RUNNING);
|
|
ContainerState.RUNNING);
|
|
rm2.waitForState(nm1, containerId2, RMContainerState.RUNNING);
|
|
rm2.waitForState(nm1, containerId2, RMContainerState.RUNNING);
|
|
- Assert.assertNotNull(rm2.getResourceScheduler()
|
|
|
|
|
|
+ assertNotNull(rm2.getResourceScheduler()
|
|
.getRMContainer(containerId2));
|
|
.getRMContainer(containerId2));
|
|
|
|
|
|
// wait for app to start a new attempt.
|
|
// wait for app to start a new attempt.
|
|
@@ -1190,7 +1229,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
// assert this is a new AM.
|
|
// assert this is a new AM.
|
|
ApplicationAttemptId newAttemptId =
|
|
ApplicationAttemptId newAttemptId =
|
|
app1.getCurrentAppAttempt().getAppAttemptId();
|
|
app1.getCurrentAppAttempt().getAppAttemptId();
|
|
- Assert.assertFalse(newAttemptId.equals(am1.getApplicationAttemptId()));
|
|
|
|
|
|
+ assertFalse(newAttemptId.equals(am1.getApplicationAttemptId()));
|
|
|
|
|
|
// launch the new AM
|
|
// launch the new AM
|
|
MockAM am2 = MockRM.launchAMWhenAsyncSchedulingEnabled(app1, rm2);
|
|
MockAM am2 = MockRM.launchAMWhenAsyncSchedulingEnabled(app1, rm2);
|
|
@@ -1198,15 +1237,15 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
am2.registerAppAttempt();
|
|
am2.registerAppAttempt();
|
|
|
|
|
|
// container2 is recovered from previous attempt
|
|
// container2 is recovered from previous attempt
|
|
- Assert.assertEquals(1,
|
|
|
|
|
|
+ assertEquals(1,
|
|
registerResponse.getContainersFromPreviousAttempts().size());
|
|
registerResponse.getContainersFromPreviousAttempts().size());
|
|
- Assert.assertEquals("container 2", containerId2,
|
|
|
|
- registerResponse.getContainersFromPreviousAttempts().get(0).getId());
|
|
|
|
|
|
+ assertEquals(containerId2, registerResponse.getContainersFromPreviousAttempts().get(0).getId(),
|
|
|
|
+ "container 2");
|
|
List<NMToken> prevNMTokens = registerResponse
|
|
List<NMToken> prevNMTokens = registerResponse
|
|
.getNMTokensFromPreviousAttempts();
|
|
.getNMTokensFromPreviousAttempts();
|
|
- Assert.assertEquals(1, prevNMTokens.size());
|
|
|
|
|
|
+ assertEquals(1, prevNMTokens.size());
|
|
// container 2 is running on node 1
|
|
// container 2 is running on node 1
|
|
- Assert.assertEquals(nm1Address, prevNMTokens.get(0).getNodeId().toString());
|
|
|
|
|
|
+ assertEquals(nm1Address, prevNMTokens.get(0).getNodeId().toString());
|
|
|
|
|
|
rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
|
|
rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
|
|
|
|
|
|
@@ -1220,7 +1259,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
nm2.nodeHeartbeat(am1.getApplicationAttemptId(), 3,
|
|
nm2.nodeHeartbeat(am1.getApplicationAttemptId(), 3,
|
|
ContainerState.RUNNING);
|
|
ContainerState.RUNNING);
|
|
rm2.waitForState(nm2, containerId3, RMContainerState.RUNNING);
|
|
rm2.waitForState(nm2, containerId3, RMContainerState.RUNNING);
|
|
- Assert.assertNotNull(rm2.getResourceScheduler()
|
|
|
|
|
|
+ assertNotNull(rm2.getResourceScheduler()
|
|
.getRMContainer(containerId3));
|
|
.getRMContainer(containerId3));
|
|
|
|
|
|
List<Container> containersFromPreviousAttempts = new ArrayList<>();
|
|
List<Container> containersFromPreviousAttempts = new ArrayList<>();
|
|
@@ -1230,12 +1269,12 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
if (allocateResponse.getContainersFromPreviousAttempts().size() > 0){
|
|
if (allocateResponse.getContainersFromPreviousAttempts().size() > 0){
|
|
containersFromPreviousAttempts.addAll(
|
|
containersFromPreviousAttempts.addAll(
|
|
allocateResponse.getContainersFromPreviousAttempts());
|
|
allocateResponse.getContainersFromPreviousAttempts());
|
|
- Assert.assertEquals("new containers should not be allocated",
|
|
|
|
- 0, allocateResponse.getAllocatedContainers().size());
|
|
|
|
|
|
+ assertEquals(0, allocateResponse.getAllocatedContainers().size(),
|
|
|
|
+ "new containers should not be allocated");
|
|
List<NMToken> nmTokens = allocateResponse.getNMTokens();
|
|
List<NMToken> nmTokens = allocateResponse.getNMTokens();
|
|
- Assert.assertEquals(1, nmTokens.size());
|
|
|
|
|
|
+ assertEquals(1, nmTokens.size());
|
|
// container 3 is running on node 2
|
|
// container 3 is running on node 2
|
|
- Assert.assertEquals(nm2Address,
|
|
|
|
|
|
+ assertEquals(nm2Address,
|
|
nmTokens.get(0).getNodeId().toString());
|
|
nmTokens.get(0).getNodeId().toString());
|
|
return true;
|
|
return true;
|
|
}
|
|
}
|
|
@@ -1245,8 +1284,8 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
}
|
|
}
|
|
return false;
|
|
return false;
|
|
}, 2000, 200000);
|
|
}, 2000, 200000);
|
|
- Assert.assertEquals("container 3", containerId3,
|
|
|
|
- containersFromPreviousAttempts.get(0).getId());
|
|
|
|
|
|
+ assertEquals(containerId3,
|
|
|
|
+ containersFromPreviousAttempts.get(0).getId(), "container 3");
|
|
rm2.stop();
|
|
rm2.stop();
|
|
rm1.stop();
|
|
rm1.stop();
|
|
}
|
|
}
|
|
@@ -1262,8 +1301,11 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
* 4. Verify that the used resource of queue should be cleaned up normally
|
|
* 4. Verify that the used resource of queue should be cleaned up normally
|
|
* after app fail.
|
|
* after app fail.
|
|
*/
|
|
*/
|
|
- @Test(timeout = 30000)
|
|
|
|
- public void testQueueResourceDoesNotLeak() throws Exception {
|
|
|
|
|
|
+ @ParameterizedTest(name = "{0}")
|
|
|
|
+ @MethodSource("getParameters")
|
|
|
|
+ @Timeout(value = 30)
|
|
|
|
+ public void testQueueResourceDoesNotLeak(SchedulerType type) throws Exception {
|
|
|
|
+ initTestAMRestart(type);
|
|
getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
|
getConf().setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
|
|
getConf().setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
|
getConf().setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
|
getConf()
|
|
getConf()
|
|
@@ -1301,7 +1343,7 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
|
TestSchedulerUtils.waitSchedulerApplicationAttemptStopped(scheduler,
|
|
am1.getApplicationAttemptId());
|
|
am1.getApplicationAttemptId());
|
|
|
|
|
|
- Assert.assertFalse(attempt1.shouldCountTowardsMaxAttemptRetry());
|
|
|
|
|
|
+ assertFalse(attempt1.shouldCountTowardsMaxAttemptRetry());
|
|
|
|
|
|
// AM should not be restarted.
|
|
// AM should not be restarted.
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
|
|
rm1.waitForState(app1.getApplicationId(), RMAppState.FAILED);
|
|
@@ -1311,20 +1353,20 @@ public class TestAMRestart extends ParameterizedSchedulerTestBase {
|
|
if (getSchedulerType() == SchedulerType.CAPACITY) {
|
|
if (getSchedulerType() == SchedulerType.CAPACITY) {
|
|
LeafQueue queue =
|
|
LeafQueue queue =
|
|
(LeafQueue) ((CapacityScheduler) scheduler).getQueue("default");
|
|
(LeafQueue) ((CapacityScheduler) scheduler).getQueue("default");
|
|
- Assert.assertEquals(0,
|
|
|
|
|
|
+ assertEquals(0,
|
|
queue.getQueueResourceUsage().getUsed().getMemorySize());
|
|
queue.getQueueResourceUsage().getUsed().getMemorySize());
|
|
- Assert.assertEquals(0,
|
|
|
|
|
|
+ assertEquals(0,
|
|
queue.getQueueResourceUsage().getUsed().getVirtualCores());
|
|
queue.getQueueResourceUsage().getUsed().getVirtualCores());
|
|
} else if (getSchedulerType() == SchedulerType.FAIR) {
|
|
} else if (getSchedulerType() == SchedulerType.FAIR) {
|
|
// The default queue is not auto created after YARN-7769 so
|
|
// The default queue is not auto created after YARN-7769 so
|
|
// user-as-default-queue option is used
|
|
// user-as-default-queue option is used
|
|
Collection<FSLeafQueue> queues = ((FairScheduler) scheduler)
|
|
Collection<FSLeafQueue> queues = ((FairScheduler) scheduler)
|
|
.getQueueManager().getLeafQueues();
|
|
.getQueueManager().getLeafQueues();
|
|
- Assert.assertEquals(1, queues.size());
|
|
|
|
|
|
+ assertEquals(1, queues.size());
|
|
|
|
|
|
FSLeafQueue queue = queues.iterator().next();
|
|
FSLeafQueue queue = queues.iterator().next();
|
|
- Assert.assertEquals(0, queue.getResourceUsage().getMemorySize());
|
|
|
|
- Assert.assertEquals(0, queue.getResourceUsage().getVirtualCores());
|
|
|
|
|
|
+ assertEquals(0, queue.getResourceUsage().getMemorySize());
|
|
|
|
+ assertEquals(0, queue.getResourceUsage().getVirtualCores());
|
|
}
|
|
}
|
|
|
|
|
|
rm1.stop();
|
|
rm1.stop();
|