|
@@ -42,6 +42,7 @@ import java.util.Collections;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.concurrent.ConcurrentMap;
|
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
import org.apache.hadoop.fs.FileContext;
|
|
@@ -682,7 +683,49 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
|
|
|
verify(cm, never()).handle(isA(CMgrCompletedAppsEvent.class));
|
|
|
}
|
|
|
|
|
|
- private void commonLaunchContainer(ApplicationId appId, ContainerId cid,
|
|
|
+ @Test
|
|
|
+ public void testKilledContainerInQueuedStateRecovery() throws Exception {
|
|
|
+ conf.setBoolean(YarnConfiguration.NM_RECOVERY_ENABLED, true);
|
|
|
+ conf.setBoolean(YarnConfiguration.NM_RECOVERY_SUPERVISED, true);
|
|
|
+ NMStateStoreService stateStore = new NMMemoryStateStoreService();
|
|
|
+ stateStore.init(conf);
|
|
|
+ stateStore.start();
|
|
|
+ context = createContext(conf, stateStore);
|
|
|
+ ContainerManagerImpl cm = createContainerManager(context, delSrvc);
|
|
|
+ ((NMContext) context).setContainerManager(cm);
|
|
|
+ cm.init(conf);
|
|
|
+ cm.start();
|
|
|
+
|
|
|
+ // add an application by starting a container
|
|
|
+ ApplicationId appId = ApplicationId.newInstance(0, 0);
|
|
|
+ ApplicationAttemptId attemptId =
|
|
|
+ ApplicationAttemptId.newInstance(appId, 1);
|
|
|
+ ContainerId cid = ContainerId.newContainerId(attemptId, 1);
|
|
|
+ createStartContainerRequest(appId, cid, cm);
|
|
|
+
|
|
|
+ Application app = context.getApplications().get(appId);
|
|
|
+ assertEquals(1, context.getApplications().size());
|
|
|
+ assertNotNull(app);
|
|
|
+
|
|
|
+ stateStore.storeContainerKilled(cid);
|
|
|
+ // restart and verify container scheduler has recovered correctly
|
|
|
+ cm.stop();
|
|
|
+ context = createContext(conf, stateStore);
|
|
|
+ cm = createContainerManager(context, delSrvc);
|
|
|
+ ((NMContext) context).setContainerManager(cm);
|
|
|
+ cm.init(conf);
|
|
|
+ cm.start();
|
|
|
+ assertEquals(1, context.getApplications().size());
|
|
|
+
|
|
|
+ ConcurrentMap<ContainerId, Container> containers = context.getContainers();
|
|
|
+ Container c = containers.get(cid);
|
|
|
+ assertEquals(ContainerState.DONE, c.getContainerState());
|
|
|
+ app = context.getApplications().get(appId);
|
|
|
+ assertNotNull(app);
|
|
|
+ cm.stop();
|
|
|
+ }
|
|
|
+
|
|
|
+ private void createStartContainerRequest(ApplicationId appId, ContainerId cid,
|
|
|
ContainerManagerImpl cm) throws Exception {
|
|
|
Map<String, String> containerEnv = new HashMap<>();
|
|
|
setFlowContext(containerEnv, "app_name1", appId);
|
|
@@ -727,6 +770,11 @@ public class TestContainerManagerRecovery extends BaseContainerManagerTest {
|
|
|
context, cm, cid, clc, null, ContainerType.TASK);
|
|
|
assertTrue(startResponse.getFailedRequests().isEmpty());
|
|
|
assertEquals(1, context.getApplications().size());
|
|
|
+ }
|
|
|
+
|
|
|
+ private void commonLaunchContainer(ApplicationId appId, ContainerId cid,
|
|
|
+ ContainerManagerImpl cm) throws Exception {
|
|
|
+ createStartContainerRequest(appId, cid, cm);
|
|
|
// make sure the container reaches RUNNING state
|
|
|
waitForNMContainerState(cm, cid,
|
|
|
org.apache.hadoop.yarn.server.nodemanager
|