|
@@ -24,6 +24,7 @@ import java.io.FileReader;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.io.PrintWriter;
|
|
import java.io.PrintWriter;
|
|
import java.net.InetAddress;
|
|
import java.net.InetAddress;
|
|
|
|
+import java.util.ArrayList;
|
|
import java.util.Arrays;
|
|
import java.util.Arrays;
|
|
import java.util.HashMap;
|
|
import java.util.HashMap;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
@@ -37,9 +38,13 @@ import org.apache.hadoop.fs.Path;
|
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
|
import org.apache.hadoop.fs.UnsupportedFileSystemException;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.security.UserGroupInformation;
|
|
import org.apache.hadoop.util.Shell;
|
|
import org.apache.hadoop.util.Shell;
|
|
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
|
|
|
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
|
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
|
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
|
|
-import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
|
|
|
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
|
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
|
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
|
|
|
|
+import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
@@ -51,6 +56,7 @@ import org.apache.hadoop.yarn.api.records.LocalResourceType;
|
|
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
|
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
|
+import org.apache.hadoop.yarn.api.records.SerializedException;
|
|
import org.apache.hadoop.yarn.api.records.Token;
|
|
import org.apache.hadoop.yarn.api.records.Token;
|
|
import org.apache.hadoop.yarn.api.records.URL;
|
|
import org.apache.hadoop.yarn.api.records.URL;
|
|
import org.apache.hadoop.yarn.exceptions.InvalidContainerException;
|
|
import org.apache.hadoop.yarn.exceptions.InvalidContainerException;
|
|
@@ -63,6 +69,7 @@ import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor.ExitCode;
|
|
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
|
|
import org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor;
|
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
|
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
|
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService;
|
|
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
|
import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
|
|
@@ -88,11 +95,11 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|
super.setup();
|
|
super.setup();
|
|
}
|
|
}
|
|
|
|
|
|
- private ContainerId createContainerId() {
|
|
|
|
|
|
+ private ContainerId createContainerId(int id) {
|
|
ApplicationId appId = ApplicationId.newInstance(0, 0);
|
|
ApplicationId appId = ApplicationId.newInstance(0, 0);
|
|
ApplicationAttemptId appAttemptId =
|
|
ApplicationAttemptId appAttemptId =
|
|
ApplicationAttemptId.newInstance(appId, 1);
|
|
ApplicationAttemptId.newInstance(appId, 1);
|
|
- ContainerId containerId = ContainerId.newInstance(appAttemptId, 0);
|
|
|
|
|
|
+ ContainerId containerId = ContainerId.newInstance(appAttemptId, id);
|
|
return containerId;
|
|
return containerId;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -119,6 +126,14 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|
.getKeyId()));
|
|
.getKeyId()));
|
|
return ugi;
|
|
return ugi;
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ protected void authorizeGetAndStopContainerRequest(ContainerId containerId,
|
|
|
|
+ Container container, boolean stopRequest, NMTokenIdentifier identifier) throws YarnException {
|
|
|
|
+ if(container == null || container.getUser().equals("Fail")){
|
|
|
|
+ throw new YarnException("Reject this container");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
};
|
|
};
|
|
}
|
|
}
|
|
|
|
|
|
@@ -138,12 +153,17 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|
// Just do a query for a non-existing container.
|
|
// Just do a query for a non-existing container.
|
|
boolean throwsException = false;
|
|
boolean throwsException = false;
|
|
try {
|
|
try {
|
|
- GetContainerStatusRequest request =
|
|
|
|
- recordFactory.newRecordInstance(GetContainerStatusRequest.class);
|
|
|
|
- ContainerId cId = createContainerId();
|
|
|
|
- request.setContainerId(cId);
|
|
|
|
- containerManager.getContainerStatus(request);
|
|
|
|
- } catch (YarnException e) {
|
|
|
|
|
|
+ List<ContainerId> containerIds = new ArrayList<ContainerId>();
|
|
|
|
+ ContainerId id =createContainerId(0);
|
|
|
|
+ containerIds.add(id);
|
|
|
|
+ GetContainerStatusesRequest request =
|
|
|
|
+ GetContainerStatusesRequest.newInstance(containerIds);
|
|
|
|
+ GetContainerStatusesResponse response =
|
|
|
|
+ containerManager.getContainerStatuses(request);
|
|
|
|
+ if(response.getFailedRequests().containsKey(id)){
|
|
|
|
+ throw response.getFailedRequests().get(id).deSerialize();
|
|
|
|
+ }
|
|
|
|
+ } catch (Throwable e) {
|
|
throwsException = true;
|
|
throwsException = true;
|
|
}
|
|
}
|
|
Assert.assertTrue(throwsException);
|
|
Assert.assertTrue(throwsException);
|
|
@@ -163,7 +183,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|
fileWriter.close();
|
|
fileWriter.close();
|
|
|
|
|
|
// ////// Construct the Container-id
|
|
// ////// Construct the Container-id
|
|
- ContainerId cId = createContainerId();
|
|
|
|
|
|
+ ContainerId cId = createContainerId(0);
|
|
|
|
|
|
// ////// Construct the container-spec.
|
|
// ////// Construct the container-spec.
|
|
ContainerLaunchContext containerLaunchContext =
|
|
ContainerLaunchContext containerLaunchContext =
|
|
@@ -182,14 +202,17 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|
new HashMap<String, LocalResource>();
|
|
new HashMap<String, LocalResource>();
|
|
localResources.put(destinationFile, rsrc_alpha);
|
|
localResources.put(destinationFile, rsrc_alpha);
|
|
containerLaunchContext.setLocalResources(localResources);
|
|
containerLaunchContext.setLocalResources(localResources);
|
|
- StartContainerRequest startRequest =
|
|
|
|
- recordFactory.newRecordInstance(StartContainerRequest.class);
|
|
|
|
- startRequest.setContainerLaunchContext(containerLaunchContext);
|
|
|
|
- startRequest.setContainerToken(createContainerToken(cId,
|
|
|
|
- DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
|
|
|
|
- context.getContainerTokenSecretManager()));
|
|
|
|
|
|
|
|
- containerManager.startContainer(startRequest);
|
|
|
|
|
|
+ StartContainerRequest scRequest =
|
|
|
|
+ StartContainerRequest.newInstance(
|
|
|
|
+ containerLaunchContext,
|
|
|
|
+ createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
|
|
|
|
+ user, context.getContainerTokenSecretManager()));
|
|
|
|
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
|
|
|
|
+ list.add(scRequest);
|
|
|
|
+ StartContainersRequest allRequests =
|
|
|
|
+ StartContainersRequest.newInstance(list);
|
|
|
|
+ containerManager.startContainers(allRequests);
|
|
|
|
|
|
BaseContainerManagerTest.waitForContainerState(containerManager, cId,
|
|
BaseContainerManagerTest.waitForContainerState(containerManager, cId,
|
|
ContainerState.COMPLETE);
|
|
ContainerState.COMPLETE);
|
|
@@ -237,7 +260,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|
new File(tmpDir, "start_file.txt").getAbsoluteFile();
|
|
new File(tmpDir, "start_file.txt").getAbsoluteFile();
|
|
|
|
|
|
// ////// Construct the Container-id
|
|
// ////// Construct the Container-id
|
|
- ContainerId cId = createContainerId();
|
|
|
|
|
|
+ ContainerId cId = createContainerId(0);
|
|
|
|
|
|
if (Shell.WINDOWS) {
|
|
if (Shell.WINDOWS) {
|
|
fileWriter.println("@echo Hello World!> " + processStartFile);
|
|
fileWriter.println("@echo Hello World!> " + processStartFile);
|
|
@@ -272,13 +295,17 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|
List<String> commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
|
|
List<String> commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
|
|
containerLaunchContext.setCommands(commands);
|
|
containerLaunchContext.setCommands(commands);
|
|
|
|
|
|
- StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
|
|
|
|
- startRequest.setContainerLaunchContext(containerLaunchContext);
|
|
|
|
- startRequest.setContainerToken(createContainerToken(cId,
|
|
|
|
- DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
|
|
|
|
- context.getContainerTokenSecretManager()));
|
|
|
|
- containerManager.startContainer(startRequest);
|
|
|
|
-
|
|
|
|
|
|
+ StartContainerRequest scRequest =
|
|
|
|
+ StartContainerRequest.newInstance(containerLaunchContext,
|
|
|
|
+ createContainerToken(cId,
|
|
|
|
+ DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
|
|
|
|
+ context.getContainerTokenSecretManager()));
|
|
|
|
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
|
|
|
|
+ list.add(scRequest);
|
|
|
|
+ StartContainersRequest allRequests =
|
|
|
|
+ StartContainersRequest.newInstance(list);
|
|
|
|
+ containerManager.startContainers(allRequests);
|
|
|
|
+
|
|
int timeoutSecs = 0;
|
|
int timeoutSecs = 0;
|
|
while (!processStartFile.exists() && timeoutSecs++ < 20) {
|
|
while (!processStartFile.exists() && timeoutSecs++ < 20) {
|
|
Thread.sleep(1000);
|
|
Thread.sleep(1000);
|
|
@@ -305,18 +332,18 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|
Assert.assertTrue("Process is not alive!",
|
|
Assert.assertTrue("Process is not alive!",
|
|
DefaultContainerExecutor.containerIsAlive(pid));
|
|
DefaultContainerExecutor.containerIsAlive(pid));
|
|
|
|
|
|
- StopContainerRequest stopRequest = recordFactory.newRecordInstance(StopContainerRequest.class);
|
|
|
|
- stopRequest.setContainerId(cId);
|
|
|
|
- containerManager.stopContainer(stopRequest);
|
|
|
|
-
|
|
|
|
|
|
+ List<ContainerId> containerIds = new ArrayList<ContainerId>();
|
|
|
|
+ containerIds.add(cId);
|
|
|
|
+ StopContainersRequest stopRequest =
|
|
|
|
+ StopContainersRequest.newInstance(containerIds);
|
|
|
|
+ containerManager.stopContainers(stopRequest);
|
|
BaseContainerManagerTest.waitForContainerState(containerManager, cId,
|
|
BaseContainerManagerTest.waitForContainerState(containerManager, cId,
|
|
ContainerState.COMPLETE);
|
|
ContainerState.COMPLETE);
|
|
|
|
|
|
- GetContainerStatusRequest gcsRequest =
|
|
|
|
- recordFactory.newRecordInstance(GetContainerStatusRequest.class);
|
|
|
|
- gcsRequest.setContainerId(cId);
|
|
|
|
|
|
+ GetContainerStatusesRequest gcsRequest =
|
|
|
|
+ GetContainerStatusesRequest.newInstance(containerIds);
|
|
ContainerStatus containerStatus =
|
|
ContainerStatus containerStatus =
|
|
- containerManager.getContainerStatus(gcsRequest).getStatus();
|
|
|
|
|
|
+ containerManager.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
|
|
int expectedExitCode = Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() :
|
|
int expectedExitCode = Shell.WINDOWS ? ExitCode.FORCE_KILLED.getExitCode() :
|
|
ExitCode.TERMINATED.getExitCode();
|
|
ExitCode.TERMINATED.getExitCode();
|
|
Assert.assertEquals(expectedExitCode, containerStatus.getExitStatus());
|
|
Assert.assertEquals(expectedExitCode, containerStatus.getExitStatus());
|
|
@@ -325,7 +352,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|
Assert.assertFalse("Process is still alive!",
|
|
Assert.assertFalse("Process is still alive!",
|
|
DefaultContainerExecutor.containerIsAlive(pid));
|
|
DefaultContainerExecutor.containerIsAlive(pid));
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
private void testContainerLaunchAndExit(int exitCode) throws IOException,
|
|
private void testContainerLaunchAndExit(int exitCode) throws IOException,
|
|
InterruptedException, YarnException {
|
|
InterruptedException, YarnException {
|
|
|
|
|
|
@@ -335,7 +362,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|
new File(tmpDir, "start_file.txt").getAbsoluteFile();
|
|
new File(tmpDir, "start_file.txt").getAbsoluteFile();
|
|
|
|
|
|
// ////// Construct the Container-id
|
|
// ////// Construct the Container-id
|
|
- ContainerId cId = createContainerId();
|
|
|
|
|
|
+ ContainerId cId = createContainerId(0);
|
|
|
|
|
|
if (Shell.WINDOWS) {
|
|
if (Shell.WINDOWS) {
|
|
fileWriter.println("@echo Hello World!> " + processStartFile);
|
|
fileWriter.println("@echo Hello World!> " + processStartFile);
|
|
@@ -376,21 +403,26 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|
List<String> commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
|
|
List<String> commands = Arrays.asList(Shell.getRunScriptCommand(scriptFile));
|
|
containerLaunchContext.setCommands(commands);
|
|
containerLaunchContext.setCommands(commands);
|
|
|
|
|
|
- StartContainerRequest startRequest = recordFactory.newRecordInstance(StartContainerRequest.class);
|
|
|
|
- startRequest.setContainerLaunchContext(containerLaunchContext);
|
|
|
|
- startRequest.setContainerToken(createContainerToken(cId,
|
|
|
|
- DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
|
|
|
|
- context.getContainerTokenSecretManager()));
|
|
|
|
- containerManager.startContainer(startRequest);
|
|
|
|
|
|
+ StartContainerRequest scRequest =
|
|
|
|
+ StartContainerRequest.newInstance(
|
|
|
|
+ containerLaunchContext,
|
|
|
|
+ createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
|
|
|
|
+ user, context.getContainerTokenSecretManager()));
|
|
|
|
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
|
|
|
|
+ list.add(scRequest);
|
|
|
|
+ StartContainersRequest allRequests =
|
|
|
|
+ StartContainersRequest.newInstance(list);
|
|
|
|
+ containerManager.startContainers(allRequests);
|
|
|
|
|
|
BaseContainerManagerTest.waitForContainerState(containerManager, cId,
|
|
BaseContainerManagerTest.waitForContainerState(containerManager, cId,
|
|
ContainerState.COMPLETE);
|
|
ContainerState.COMPLETE);
|
|
|
|
|
|
- GetContainerStatusRequest gcsRequest =
|
|
|
|
- recordFactory.newRecordInstance(GetContainerStatusRequest.class);
|
|
|
|
- gcsRequest.setContainerId(cId);
|
|
|
|
|
|
+ List<ContainerId> containerIds = new ArrayList<ContainerId>();
|
|
|
|
+ containerIds.add(cId);
|
|
|
|
+ GetContainerStatusesRequest gcsRequest =
|
|
|
|
+ GetContainerStatusesRequest.newInstance(containerIds);
|
|
ContainerStatus containerStatus =
|
|
ContainerStatus containerStatus =
|
|
- containerManager.getContainerStatus(gcsRequest).getStatus();
|
|
|
|
|
|
+ containerManager.getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
|
|
|
|
|
|
// Verify exit status matches exit state of script
|
|
// Verify exit status matches exit state of script
|
|
Assert.assertEquals(exitCode,
|
|
Assert.assertEquals(exitCode,
|
|
@@ -439,7 +471,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|
fileWriter.close();
|
|
fileWriter.close();
|
|
|
|
|
|
// ////// Construct the Container-id
|
|
// ////// Construct the Container-id
|
|
- ContainerId cId = createContainerId();
|
|
|
|
|
|
+ ContainerId cId = createContainerId(0);
|
|
ApplicationId appId = cId.getApplicationAttemptId().getApplicationId();
|
|
ApplicationId appId = cId.getApplicationAttemptId().getApplicationId();
|
|
|
|
|
|
// ////// Construct the container-spec.
|
|
// ////// Construct the container-spec.
|
|
@@ -460,11 +492,17 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|
new HashMap<String, LocalResource>();
|
|
new HashMap<String, LocalResource>();
|
|
localResources.put(destinationFile, rsrc_alpha);
|
|
localResources.put(destinationFile, rsrc_alpha);
|
|
containerLaunchContext.setLocalResources(localResources);
|
|
containerLaunchContext.setLocalResources(localResources);
|
|
- StartContainerRequest request = recordFactory.newRecordInstance(StartContainerRequest.class);
|
|
|
|
- request.setContainerLaunchContext(containerLaunchContext);
|
|
|
|
- request.setContainerToken(createContainerToken(cId, DUMMY_RM_IDENTIFIER,
|
|
|
|
- context.getNodeId(), user, context.getContainerTokenSecretManager()));
|
|
|
|
- containerManager.startContainer(request);
|
|
|
|
|
|
+
|
|
|
|
+ StartContainerRequest scRequest =
|
|
|
|
+ StartContainerRequest.newInstance(
|
|
|
|
+ containerLaunchContext,
|
|
|
|
+ createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
|
|
|
|
+ user, context.getContainerTokenSecretManager()));
|
|
|
|
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
|
|
|
|
+ list.add(scRequest);
|
|
|
|
+ StartContainersRequest allRequests =
|
|
|
|
+ StartContainersRequest.newInstance(list);
|
|
|
|
+ containerManager.startContainers(allRequests);
|
|
|
|
|
|
BaseContainerManagerTest.waitForContainerState(containerManager, cId,
|
|
BaseContainerManagerTest.waitForContainerState(containerManager, cId,
|
|
ContainerState.COMPLETE);
|
|
ContainerState.COMPLETE);
|
|
@@ -528,29 +566,37 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|
|
|
|
|
@Test
|
|
@Test
|
|
public void testContainerLaunchFromPreviousRM() throws IOException,
|
|
public void testContainerLaunchFromPreviousRM() throws IOException,
|
|
- InterruptedException {
|
|
|
|
|
|
+ InterruptedException, YarnException {
|
|
containerManager.start();
|
|
containerManager.start();
|
|
|
|
|
|
ContainerLaunchContext containerLaunchContext =
|
|
ContainerLaunchContext containerLaunchContext =
|
|
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
|
recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
|
|
|
|
|
- ContainerId cId1 = createContainerId();
|
|
|
|
- ContainerId cId2 = createContainerId();
|
|
|
|
|
|
+ ContainerId cId1 = createContainerId(0);
|
|
|
|
+ ContainerId cId2 = createContainerId(0);
|
|
containerLaunchContext
|
|
containerLaunchContext
|
|
.setLocalResources(new HashMap<String, LocalResource>());
|
|
.setLocalResources(new HashMap<String, LocalResource>());
|
|
|
|
|
|
// Construct the Container with Invalid RMIdentifier
|
|
// Construct the Container with Invalid RMIdentifier
|
|
StartContainerRequest startRequest1 =
|
|
StartContainerRequest startRequest1 =
|
|
- recordFactory.newRecordInstance(StartContainerRequest.class);
|
|
|
|
- startRequest1.setContainerLaunchContext(containerLaunchContext);
|
|
|
|
|
|
+ StartContainerRequest.newInstance(containerLaunchContext,
|
|
|
|
+ createContainerToken(cId1,
|
|
|
|
+ ResourceManagerConstants.RM_INVALID_IDENTIFIER, context.getNodeId(),
|
|
|
|
+ user, context.getContainerTokenSecretManager()));
|
|
|
|
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
|
|
|
|
+ list.add(startRequest1);
|
|
|
|
+ StartContainersRequest allRequests =
|
|
|
|
+ StartContainersRequest.newInstance(list);
|
|
|
|
+ containerManager.startContainers(allRequests);
|
|
|
|
|
|
- startRequest1.setContainerToken(createContainerToken(cId1,
|
|
|
|
- ResourceManagerConstants.RM_INVALID_IDENTIFIER, context.getNodeId(),
|
|
|
|
- user, context.getContainerTokenSecretManager()));
|
|
|
|
boolean catchException = false;
|
|
boolean catchException = false;
|
|
try {
|
|
try {
|
|
- containerManager.startContainer(startRequest1);
|
|
|
|
- } catch (YarnException e) {
|
|
|
|
|
|
+ StartContainersResponse response = containerManager.startContainers(allRequests);
|
|
|
|
+ if(response.getFailedRequests().containsKey(cId1)) {
|
|
|
|
+ throw response.getFailedRequests().get(cId1).deSerialize();
|
|
|
|
+ }
|
|
|
|
+ } catch (Throwable e) {
|
|
|
|
+ e.printStackTrace();
|
|
catchException = true;
|
|
catchException = true;
|
|
Assert.assertTrue(e.getMessage().contains(
|
|
Assert.assertTrue(e.getMessage().contains(
|
|
"Container " + cId1 + " rejected as it is allocated by a previous RM"));
|
|
"Container " + cId1 + " rejected as it is allocated by a previous RM"));
|
|
@@ -563,21 +609,143 @@ public class TestContainerManager extends BaseContainerManagerTest {
|
|
|
|
|
|
// Construct the Container with a RMIdentifier within current RM
|
|
// Construct the Container with a RMIdentifier within current RM
|
|
StartContainerRequest startRequest2 =
|
|
StartContainerRequest startRequest2 =
|
|
- recordFactory.newRecordInstance(StartContainerRequest.class);
|
|
|
|
- startRequest2.setContainerLaunchContext(containerLaunchContext);
|
|
|
|
- startRequest2.setContainerToken(createContainerToken(cId2,
|
|
|
|
- DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
|
|
|
|
- context.getContainerTokenSecretManager()));
|
|
|
|
|
|
+ StartContainerRequest.newInstance(containerLaunchContext,
|
|
|
|
+ createContainerToken(cId2,
|
|
|
|
+ DUMMY_RM_IDENTIFIER, context.getNodeId(), user,
|
|
|
|
+ context.getContainerTokenSecretManager()));
|
|
|
|
+ List<StartContainerRequest> list2 = new ArrayList<StartContainerRequest>();
|
|
|
|
+ list.add(startRequest2);
|
|
|
|
+ StartContainersRequest allRequests2 =
|
|
|
|
+ StartContainersRequest.newInstance(list2);
|
|
|
|
+ containerManager.startContainers(allRequests2);
|
|
|
|
+
|
|
boolean noException = true;
|
|
boolean noException = true;
|
|
try {
|
|
try {
|
|
- containerManager.startContainer(startRequest2);
|
|
|
|
|
|
+ containerManager.startContainers(allRequests2);
|
|
} catch (YarnException e) {
|
|
} catch (YarnException e) {
|
|
noException = false;
|
|
noException = false;
|
|
}
|
|
}
|
|
// Verify that startContainer get no YarnException
|
|
// Verify that startContainer get no YarnException
|
|
Assert.assertTrue(noException);
|
|
Assert.assertTrue(noException);
|
|
}
|
|
}
|
|
-
|
|
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testMultipleContainersLaunch() throws Exception {
|
|
|
|
+ containerManager.start();
|
|
|
|
+
|
|
|
|
+ List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
|
|
|
|
+ ContainerLaunchContext containerLaunchContext =
|
|
|
|
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
|
|
|
+ for (int i = 0; i < 10; i++) {
|
|
|
|
+ ContainerId cId = createContainerId(i);
|
|
|
|
+ long identifier = 0;
|
|
|
|
+ if ((i & 1) == 0)
|
|
|
|
+ // container with even id fail
|
|
|
|
+ identifier = ResourceManagerConstants.RM_INVALID_IDENTIFIER;
|
|
|
|
+ else
|
|
|
|
+ identifier = DUMMY_RM_IDENTIFIER;
|
|
|
|
+ Token containerToken =
|
|
|
|
+ createContainerToken(cId, identifier, context.getNodeId(), user,
|
|
|
|
+ context.getContainerTokenSecretManager());
|
|
|
|
+ StartContainerRequest request =
|
|
|
|
+ StartContainerRequest.newInstance(containerLaunchContext,
|
|
|
|
+ containerToken);
|
|
|
|
+ list.add(request);
|
|
|
|
+ }
|
|
|
|
+ StartContainersRequest requestList =
|
|
|
|
+ StartContainersRequest.newInstance(list);
|
|
|
|
+
|
|
|
|
+ StartContainersResponse response =
|
|
|
|
+ containerManager.startContainers(requestList);
|
|
|
|
+
|
|
|
|
+ Assert.assertEquals(5, response.getSuccessfullyStartedContainers().size());
|
|
|
|
+ for (ContainerId id : response.getSuccessfullyStartedContainers()) {
|
|
|
|
+ // Containers with odd id should succeed.
|
|
|
|
+ Assert.assertEquals(1, id.getId() & 1);
|
|
|
|
+ }
|
|
|
|
+ Assert.assertEquals(5, response.getFailedRequests().size());
|
|
|
|
+ for (Map.Entry<ContainerId, SerializedException> entry : response
|
|
|
|
+ .getFailedRequests().entrySet()) {
|
|
|
|
+ // Containers with even id should fail.
|
|
|
|
+ Assert.assertEquals(0, entry.getKey().getId() & 1);
|
|
|
|
+ Assert.assertTrue(entry.getValue().getMessage()
|
|
|
|
+ .contains(
|
|
|
|
+ "Container " + entry.getKey() + " rejected as it is allocated by a previous RM"));
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Test
|
|
|
|
+ public void testMultipleContainersStopAndGetStatus() throws Exception {
|
|
|
|
+ containerManager.start();
|
|
|
|
+ List<StartContainerRequest> startRequest =
|
|
|
|
+ new ArrayList<StartContainerRequest>();
|
|
|
|
+ ContainerLaunchContext containerLaunchContext =
|
|
|
|
+ recordFactory.newRecordInstance(ContainerLaunchContext.class);
|
|
|
|
+
|
|
|
|
+ List<ContainerId> containerIds = new ArrayList<ContainerId>();
|
|
|
|
+ for (int i = 0; i < 10; i++) {
|
|
|
|
+ ContainerId cId = createContainerId(i);
|
|
|
|
+ String user = null;
|
|
|
|
+ if ((i & 1) == 0) {
|
|
|
|
+ // container with even id fail
|
|
|
|
+ user = "Fail";
|
|
|
|
+ } else {
|
|
|
|
+ user = "Pass";
|
|
|
|
+ }
|
|
|
|
+ Token containerToken =
|
|
|
|
+ createContainerToken(cId, DUMMY_RM_IDENTIFIER, context.getNodeId(),
|
|
|
|
+ user, context.getContainerTokenSecretManager());
|
|
|
|
+ StartContainerRequest request =
|
|
|
|
+ StartContainerRequest.newInstance(containerLaunchContext,
|
|
|
|
+ containerToken);
|
|
|
|
+ startRequest.add(request);
|
|
|
|
+ containerIds.add(cId);
|
|
|
|
+ }
|
|
|
|
+ // start containers
|
|
|
|
+ StartContainersRequest requestList =
|
|
|
|
+ StartContainersRequest.newInstance(startRequest);
|
|
|
|
+ containerManager.startContainers(requestList);
|
|
|
|
+
|
|
|
|
+ // Get container statuses
|
|
|
|
+ GetContainerStatusesRequest statusRequest =
|
|
|
|
+ GetContainerStatusesRequest.newInstance(containerIds);
|
|
|
|
+ GetContainerStatusesResponse statusResponse =
|
|
|
|
+ containerManager.getContainerStatuses(statusRequest);
|
|
|
|
+ Assert.assertEquals(5, statusResponse.getContainerStatuses().size());
|
|
|
|
+ for (ContainerStatus status : statusResponse.getContainerStatuses()) {
|
|
|
|
+ // Containers with odd id should succeed
|
|
|
|
+ Assert.assertEquals(1, status.getContainerId().getId() & 1);
|
|
|
|
+ }
|
|
|
|
+ Assert.assertEquals(5, statusResponse.getFailedRequests().size());
|
|
|
|
+ for (Map.Entry<ContainerId, SerializedException> entry : statusResponse
|
|
|
|
+ .getFailedRequests().entrySet()) {
|
|
|
|
+ // Containers with even id should fail.
|
|
|
|
+ Assert.assertEquals(0, entry.getKey().getId() & 1);
|
|
|
|
+ Assert.assertTrue(entry.getValue().getMessage()
|
|
|
|
+ .contains("Reject this container"));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // stop containers
|
|
|
|
+ StopContainersRequest stopRequest =
|
|
|
|
+ StopContainersRequest.newInstance(containerIds);
|
|
|
|
+ StopContainersResponse stopResponse =
|
|
|
|
+ containerManager.stopContainers(stopRequest);
|
|
|
|
+ Assert.assertEquals(5, stopResponse.getSuccessfullyStoppedContainers()
|
|
|
|
+ .size());
|
|
|
|
+ for (ContainerId id : stopResponse.getSuccessfullyStoppedContainers()) {
|
|
|
|
+ // Containers with odd id should succeed.
|
|
|
|
+ Assert.assertEquals(1, id.getId() & 1);
|
|
|
|
+ }
|
|
|
|
+ Assert.assertEquals(5, stopResponse.getFailedRequests().size());
|
|
|
|
+ for (Map.Entry<ContainerId, SerializedException> entry : stopResponse
|
|
|
|
+ .getFailedRequests().entrySet()) {
|
|
|
|
+ // Containers with even id should fail.
|
|
|
|
+ Assert.assertEquals(0, entry.getKey().getId() & 1);
|
|
|
|
+ Assert.assertTrue(entry.getValue().getMessage()
|
|
|
|
+ .contains("Reject this container"));
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
|
|
public static Token createContainerToken(ContainerId cId, long rmIdentifier,
|
|
NodeId nodeId, String user,
|
|
NodeId nodeId, String user,
|
|
NMContainerTokenSecretManager containerTokenSecretManager)
|
|
NMContainerTokenSecretManager containerTokenSecretManager)
|