|
@@ -18,24 +18,10 @@
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager;
|
|
|
|
|
|
-import java.io.IOException;
|
|
|
-import java.nio.ByteBuffer;
|
|
|
-import java.security.PrivilegedExceptionAction;
|
|
|
-import java.util.Arrays;
|
|
|
-import java.util.Collection;
|
|
|
-import java.util.Collections;
|
|
|
-import java.util.EnumSet;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
-import java.util.Set;
|
|
|
-
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
-import org.apache.hadoop.io.DataOutputBuffer;
|
|
|
-import org.apache.hadoop.test.GenericTestUtils;
|
|
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
|
|
-import org.apache.hadoop.security.Credentials;
|
|
|
-import org.apache.hadoop.security.UserGroupInformation;
|
|
|
import org.apache.hadoop.security.token.Token;
|
|
|
+import org.apache.hadoop.test.GenericTestUtils;
|
|
|
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptRequest;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.FailApplicationAttemptResponse;
|
|
@@ -48,25 +34,16 @@ import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
|
|
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
|
|
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
|
|
|
-import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationResponse;
|
|
|
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
|
|
-import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
|
|
-import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
-import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerState;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
|
|
-import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeState;
|
|
|
-import org.apache.hadoop.yarn.api.records.Priority;
|
|
|
import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
|
|
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
|
|
|
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
@@ -112,10 +89,18 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSe
|
|
|
import org.apache.hadoop.yarn.util.Records;
|
|
|
import org.apache.hadoop.yarn.util.YarnVersionInfo;
|
|
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
|
|
-import org.slf4j.event.Level;
|
|
|
+import org.junit.Assert;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
-import org.junit.Assert;
|
|
|
+import org.slf4j.event.Level;
|
|
|
+
|
|
|
+import java.io.IOException;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.Collection;
|
|
|
+import java.util.EnumSet;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
|
|
|
|
|
|
@SuppressWarnings("unchecked")
|
|
@@ -514,408 +499,6 @@ public class MockRM extends ResourceManager {
|
|
|
.newRecord(GetNewApplicationRequest.class));
|
|
|
}
|
|
|
|
|
|
- public RMApp submitApp(int masterMemory) throws Exception {
|
|
|
- return submitApp(masterMemory, false);
|
|
|
- }
|
|
|
-
|
|
|
- public RMApp submitApp(int masterMemory, String queue) throws Exception {
|
|
|
- return submitApp(masterMemory, "",
|
|
|
- UserGroupInformation.getCurrentUser().getShortUserName(), null, false,
|
|
|
- queue, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
|
|
- YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS),
|
|
|
- null);
|
|
|
- }
|
|
|
-
|
|
|
- public RMApp submitApp(int masterMemory, Set<String> appTags)
|
|
|
- throws Exception {
|
|
|
- return submitApp(masterMemory, null, false, null, Priority.newInstance(0),
|
|
|
- appTags);
|
|
|
- }
|
|
|
-
|
|
|
- public RMApp submitApp(int masterMemory, String queue,
|
|
|
- boolean isAppIdProvided, ApplicationId appId, Priority priority,
|
|
|
- Set<String> appTags) throws Exception {
|
|
|
- Resource resource = Resource.newInstance(masterMemory, 0);
|
|
|
- ResourceRequest amResourceRequest = ResourceRequest.newInstance(
|
|
|
- Priority.newInstance(0), ResourceRequest.ANY, resource, 1);
|
|
|
- return submitApp(Collections.singletonList(amResourceRequest), "",
|
|
|
- UserGroupInformation.getCurrentUser().getShortUserName(), null, false,
|
|
|
- queue, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
|
|
- YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true,
|
|
|
- false, isAppIdProvided, appId, 0, null, true, priority, null,
|
|
|
- null, null, appTags);
|
|
|
- }
|
|
|
-
|
|
|
- public RMApp submitApp(int masterMemory, Priority priority) throws Exception {
|
|
|
- Resource resource = Resource.newInstance(masterMemory, 0);
|
|
|
- return submitApp(resource, "", UserGroupInformation.getCurrentUser()
|
|
|
- .getShortUserName(), null, false, null,
|
|
|
- super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
|
|
- YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true,
|
|
|
- false, false, null, 0, null, true, priority);
|
|
|
- }
|
|
|
-
|
|
|
- public RMApp submitApp(int masterMemory, Priority priority,
|
|
|
- Credentials cred, ByteBuffer tokensConf) throws Exception {
|
|
|
- Resource resource = Resource.newInstance(masterMemory, 0);
|
|
|
- return submitApp(resource, "", UserGroupInformation.getCurrentUser()
|
|
|
- .getShortUserName(), null, false, null,
|
|
|
- super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
|
|
- YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), cred, null, true,
|
|
|
- false, false, null, 0, null, true, priority, null, null,
|
|
|
- tokensConf);
|
|
|
- }
|
|
|
-
|
|
|
- public RMApp submitApp(int masterMemory, boolean unmanaged)
|
|
|
- throws Exception {
|
|
|
- return submitApp(masterMemory, "", UserGroupInformation.getCurrentUser()
|
|
|
- .getShortUserName(), unmanaged);
|
|
|
- }
|
|
|
-
|
|
|
- // client
|
|
|
- public RMApp submitApp(int masterMemory, String name, String user) throws Exception {
|
|
|
- return submitApp(masterMemory, name, user, false);
|
|
|
- }
|
|
|
-
|
|
|
- public RMApp submitApp(int masterMemory, String name, String user,
|
|
|
- boolean unmanaged)
|
|
|
- throws Exception {
|
|
|
- return submitApp(masterMemory, name, user, null, unmanaged, null,
|
|
|
- super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
|
|
- YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null);
|
|
|
- }
|
|
|
-
|
|
|
- public RMApp submitApp(int masterMemory, String name, String user,
|
|
|
- Map<ApplicationAccessType, String> acls) throws Exception {
|
|
|
- return submitApp(masterMemory, name, user, acls, false, null,
|
|
|
- super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
|
|
- YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null);
|
|
|
- }
|
|
|
-
|
|
|
- public RMApp submitApp(int masterMemory, String name, String user,
|
|
|
- Map<ApplicationAccessType, String> acls, String queue) throws Exception {
|
|
|
- return submitApp(masterMemory, name, user, acls, false, queue,
|
|
|
- super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
|
|
- YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null);
|
|
|
- }
|
|
|
-
|
|
|
- public RMApp submitApp(int masterMemory, String name, String user,
|
|
|
- Map<ApplicationAccessType, String> acls, String queue, String amLabel)
|
|
|
- throws Exception {
|
|
|
- Resource resource = Records.newRecord(Resource.class);
|
|
|
- resource.setMemorySize(masterMemory);
|
|
|
- Priority priority = Priority.newInstance(0);
|
|
|
- return submitApp(resource, name, user, acls, false, queue,
|
|
|
- super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
|
|
- YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
|
|
|
- false, null, 0, null, true, priority, amLabel, null, null);
|
|
|
- }
|
|
|
-
|
|
|
- public RMApp submitApp(Resource resource, String name, String user,
|
|
|
- Map<ApplicationAccessType, String> acls, String queue) throws Exception {
|
|
|
- return submitApp(resource, name, user, acls, false, queue,
|
|
|
- super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
|
|
- YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
|
|
|
- true, false, false, null, 0, null, true, null);
|
|
|
- }
|
|
|
-
|
|
|
- public RMApp submitApp(Resource resource, String name, String user,
|
|
|
- Map<ApplicationAccessType, String> acls, boolean unManaged, String queue)
|
|
|
- throws Exception {
|
|
|
- return submitApp(resource, name, user, acls, unManaged, queue,
|
|
|
- super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
|
|
- YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true,
|
|
|
- false, false, null, 0, null, true, null);
|
|
|
- }
|
|
|
-
|
|
|
- public RMApp submitApp(int masterMemory, String name, String user,
|
|
|
- Map<ApplicationAccessType, String> acls, String queue,
|
|
|
- boolean waitForAccepted) throws Exception {
|
|
|
- return submitApp(masterMemory, name, user, acls, false, queue,
|
|
|
- super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
|
|
- YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null,
|
|
|
- waitForAccepted);
|
|
|
- }
|
|
|
-
|
|
|
- public RMApp submitApp(int masterMemory, String name, String user,
|
|
|
- Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
|
|
|
- int maxAppAttempts, Credentials ts) throws Exception {
|
|
|
- return submitApp(masterMemory, name, user, acls, unmanaged, queue,
|
|
|
- maxAppAttempts, ts, null);
|
|
|
- }
|
|
|
-
|
|
|
- public RMApp submitApp(int masterMemory, String name, String user,
|
|
|
- Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
|
|
|
- int maxAppAttempts, Credentials ts, String appType) throws Exception {
|
|
|
- return submitApp(masterMemory, name, user, acls, unmanaged, queue,
|
|
|
- maxAppAttempts, ts, appType, true);
|
|
|
- }
|
|
|
-
|
|
|
- public RMApp submitApp(int masterMemory, String name, String user,
|
|
|
- Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
|
|
|
- int maxAppAttempts, Credentials ts, String appType,
|
|
|
- boolean waitForAccepted)
|
|
|
- throws Exception {
|
|
|
- return submitApp(masterMemory, name, user, acls, unmanaged, queue,
|
|
|
- maxAppAttempts, ts, appType, waitForAccepted, false);
|
|
|
- }
|
|
|
-
|
|
|
- public RMApp submitApp(int masterMemory, String name, String user,
|
|
|
- Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
|
|
|
- int maxAppAttempts, Credentials ts, String appType,
|
|
|
- boolean waitForAccepted, boolean keepContainers) throws Exception {
|
|
|
- Resource resource = Records.newRecord(Resource.class);
|
|
|
- resource.setMemorySize(masterMemory);
|
|
|
- return submitApp(resource, name, user, acls, unmanaged, queue,
|
|
|
- maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
|
|
|
- false, null, 0, null, true, Priority.newInstance(0));
|
|
|
- }
|
|
|
-
|
|
|
- public RMApp submitApp(int masterMemory, long attemptFailuresValidityInterval,
|
|
|
- boolean keepContainers) throws Exception {
|
|
|
- Resource resource = Records.newRecord(Resource.class);
|
|
|
- resource.setMemorySize(masterMemory);
|
|
|
- Priority priority = Priority.newInstance(0);
|
|
|
- return submitApp(resource, "", UserGroupInformation.getCurrentUser()
|
|
|
- .getShortUserName(), null, false, null,
|
|
|
- super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
|
|
- YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, keepContainers,
|
|
|
- false, null, attemptFailuresValidityInterval, null, true, priority);
|
|
|
- }
|
|
|
-
|
|
|
- public RMApp submitApp(int masterMemory, String name, String user,
|
|
|
- Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
|
|
|
- int maxAppAttempts, Credentials ts, String appType,
|
|
|
- boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
|
|
|
- ApplicationId applicationId) throws Exception {
|
|
|
- Resource resource = Records.newRecord(Resource.class);
|
|
|
- resource.setMemorySize(masterMemory);
|
|
|
- Priority priority = Priority.newInstance(0);
|
|
|
- return submitApp(resource, name, user, acls, unmanaged, queue,
|
|
|
- maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
|
|
|
- isAppIdProvided, applicationId, 0, null, true, priority);
|
|
|
- }
|
|
|
-
|
|
|
- public RMApp submitApp(int masterMemory,
|
|
|
- LogAggregationContext logAggregationContext) throws Exception {
|
|
|
- Resource resource = Records.newRecord(Resource.class);
|
|
|
- resource.setMemorySize(masterMemory);
|
|
|
- Priority priority = Priority.newInstance(0);
|
|
|
- return submitApp(resource, "", UserGroupInformation.getCurrentUser()
|
|
|
- .getShortUserName(), null, false, null,
|
|
|
- super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
|
|
- YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, false,
|
|
|
- false, null, 0, logAggregationContext, true, priority);
|
|
|
- }
|
|
|
-
|
|
|
- public RMApp submitApp(Resource capability, String name, String user,
|
|
|
- Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
|
|
|
- int maxAppAttempts, Credentials ts, String appType,
|
|
|
- boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
|
|
|
- ApplicationId applicationId, long attemptFailuresValidityInterval,
|
|
|
- LogAggregationContext logAggregationContext,
|
|
|
- boolean cancelTokensWhenComplete, Priority priority) throws Exception {
|
|
|
- return submitApp(capability, name, user, acls, unmanaged, queue,
|
|
|
- maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
|
|
|
- isAppIdProvided, applicationId, attemptFailuresValidityInterval,
|
|
|
- logAggregationContext, cancelTokensWhenComplete, priority, "", null,
|
|
|
- null);
|
|
|
- }
|
|
|
-
|
|
|
- public RMApp submitApp(Credentials cred, ByteBuffer tokensConf)
|
|
|
- throws Exception {
|
|
|
- return submitApp(Resource.newInstance(200, 1), "app1", "user", null, false,
|
|
|
- null, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
|
|
- YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), cred, null, true,
|
|
|
- false, false, null, 0, null, true, Priority.newInstance(0), null, null,
|
|
|
- tokensConf);
|
|
|
- }
|
|
|
-
|
|
|
- public RMApp submitApp(List<ResourceRequest> amResourceRequests)
|
|
|
- throws Exception {
|
|
|
- return submitApp(amResourceRequests, "app1",
|
|
|
- "user", null, false, null,
|
|
|
- super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
|
|
- YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true,
|
|
|
- false, false, null, 0, null, true,
|
|
|
- amResourceRequests.get(0).getPriority(),
|
|
|
- amResourceRequests.get(0).getNodeLabelExpression(), null, null);
|
|
|
- }
|
|
|
-
|
|
|
- public RMApp submitApp(List<ResourceRequest> amResourceRequests,
|
|
|
- String appNodeLabel) throws Exception {
|
|
|
- return submitApp(amResourceRequests, "app1", "user", null, false, null,
|
|
|
- super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
|
|
- YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true,
|
|
|
- false, false, null, 0, null, true,
|
|
|
- amResourceRequests.get(0).getPriority(),
|
|
|
- amResourceRequests.get(0).getNodeLabelExpression(), null, null, null,
|
|
|
- appNodeLabel);
|
|
|
- }
|
|
|
-
|
|
|
- public RMApp submitApp(Resource capability, String name, String user,
|
|
|
- Map<ApplicationAccessType, String> acls, boolean unmanaged, String queue,
|
|
|
- int maxAppAttempts, Credentials ts, String appType,
|
|
|
- boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
|
|
|
- ApplicationId applicationId, long attemptFailuresValidityInterval,
|
|
|
- LogAggregationContext logAggregationContext,
|
|
|
- boolean cancelTokensWhenComplete, Priority priority, String amLabel,
|
|
|
- Map<ApplicationTimeoutType, Long> applicationTimeouts,
|
|
|
- ByteBuffer tokensConf)
|
|
|
- throws Exception {
|
|
|
- priority = (priority == null) ? Priority.newInstance(0) : priority;
|
|
|
- ResourceRequest amResourceRequest = ResourceRequest.newInstance(
|
|
|
- priority, ResourceRequest.ANY, capability, 1);
|
|
|
- if (amLabel != null && !amLabel.isEmpty()) {
|
|
|
- amResourceRequest.setNodeLabelExpression(amLabel.trim());
|
|
|
- }
|
|
|
- return submitApp(Collections.singletonList(amResourceRequest), name, user,
|
|
|
- acls, unmanaged, queue, maxAppAttempts, ts, appType, waitForAccepted,
|
|
|
- keepContainers, isAppIdProvided, applicationId,
|
|
|
- attemptFailuresValidityInterval, logAggregationContext,
|
|
|
- cancelTokensWhenComplete, priority, amLabel, applicationTimeouts,
|
|
|
- tokensConf);
|
|
|
- }
|
|
|
-
|
|
|
- public RMApp submitApp(List<ResourceRequest> amResourceRequests, String name,
|
|
|
- String user, Map<ApplicationAccessType, String> acls, boolean unmanaged,
|
|
|
- String queue, int maxAppAttempts, Credentials ts, String appType,
|
|
|
- boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
|
|
|
- ApplicationId applicationId, long attemptFailuresValidityInterval,
|
|
|
- LogAggregationContext logAggregationContext,
|
|
|
- boolean cancelTokensWhenComplete, Priority priority, String amLabel,
|
|
|
- Map<ApplicationTimeoutType, Long> applicationTimeouts,
|
|
|
- ByteBuffer tokensConf) throws Exception {
|
|
|
- return submitApp(amResourceRequests, name, user, acls, unmanaged, queue,
|
|
|
- maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
|
|
|
- isAppIdProvided, applicationId, attemptFailuresValidityInterval,
|
|
|
- logAggregationContext, cancelTokensWhenComplete, priority, amLabel,
|
|
|
- applicationTimeouts, tokensConf, null);
|
|
|
- }
|
|
|
-
|
|
|
- public RMApp submitApp(List<ResourceRequest> amResourceRequests, String name,
|
|
|
- String user, Map<ApplicationAccessType, String> acls, boolean unmanaged,
|
|
|
- String queue, int maxAppAttempts, Credentials ts, String appType,
|
|
|
- boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
|
|
|
- ApplicationId applicationId, long attemptFailuresValidityInterval,
|
|
|
- LogAggregationContext logAggregationContext,
|
|
|
- boolean cancelTokensWhenComplete, Priority priority, String amLabel,
|
|
|
- Map<ApplicationTimeoutType, Long> applicationTimeouts,
|
|
|
- ByteBuffer tokensConf, Set<String> applicationTags) throws Exception {
|
|
|
- return submitApp(amResourceRequests, name, user, acls, unmanaged, queue,
|
|
|
- maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
|
|
|
- isAppIdProvided, applicationId, attemptFailuresValidityInterval,
|
|
|
- logAggregationContext, cancelTokensWhenComplete, priority, amLabel,
|
|
|
- applicationTimeouts, tokensConf, applicationTags, null);
|
|
|
- }
|
|
|
-
|
|
|
- public RMApp submitApp(List<ResourceRequest> amResourceRequests, String name,
|
|
|
- String user, Map<ApplicationAccessType, String> acls, boolean unmanaged,
|
|
|
- String queue, int maxAppAttempts, Credentials ts, String appType,
|
|
|
- boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
|
|
|
- ApplicationId applicationId, long attemptFailuresValidityInterval,
|
|
|
- LogAggregationContext logAggregationContext,
|
|
|
- boolean cancelTokensWhenComplete, Priority priority, String amLabel,
|
|
|
- Map<ApplicationTimeoutType, Long> applicationTimeouts,
|
|
|
- ByteBuffer tokensConf, Set<String> applicationTags, String appNodeLabel)
|
|
|
- throws Exception {
|
|
|
- ApplicationId appId = isAppIdProvided ? applicationId : null;
|
|
|
- ApplicationClientProtocol client = getClientRMService();
|
|
|
- if (! isAppIdProvided) {
|
|
|
- GetNewApplicationResponse resp = client.getNewApplication(Records
|
|
|
- .newRecord(GetNewApplicationRequest.class));
|
|
|
- appId = resp.getApplicationId();
|
|
|
- }
|
|
|
- SubmitApplicationRequest req = Records
|
|
|
- .newRecord(SubmitApplicationRequest.class);
|
|
|
- ApplicationSubmissionContext sub = Records
|
|
|
- .newRecord(ApplicationSubmissionContext.class);
|
|
|
- sub.setKeepContainersAcrossApplicationAttempts(keepContainers);
|
|
|
- sub.setApplicationId(appId);
|
|
|
- sub.setApplicationName(name);
|
|
|
- sub.setMaxAppAttempts(maxAppAttempts);
|
|
|
- if (applicationTags != null) {
|
|
|
- sub.setApplicationTags(applicationTags);
|
|
|
- }
|
|
|
- if (applicationTimeouts != null && applicationTimeouts.size() > 0) {
|
|
|
- sub.setApplicationTimeouts(applicationTimeouts);
|
|
|
- }
|
|
|
- if (unmanaged) {
|
|
|
- sub.setUnmanagedAM(true);
|
|
|
- }
|
|
|
- if (queue != null) {
|
|
|
- sub.setQueue(queue);
|
|
|
- }
|
|
|
- if (priority != null) {
|
|
|
- sub.setPriority(priority);
|
|
|
- }
|
|
|
- if (appNodeLabel != null) {
|
|
|
- sub.setNodeLabelExpression(appNodeLabel);
|
|
|
- }
|
|
|
- sub.setApplicationType(appType);
|
|
|
- ContainerLaunchContext clc = Records
|
|
|
- .newRecord(ContainerLaunchContext.class);
|
|
|
- clc.setApplicationACLs(acls);
|
|
|
- if (ts != null && UserGroupInformation.isSecurityEnabled()) {
|
|
|
- DataOutputBuffer dob = new DataOutputBuffer();
|
|
|
- ts.writeTokenStorageToStream(dob);
|
|
|
- ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
|
|
|
- clc.setTokens(securityTokens);
|
|
|
- clc.setTokensConf(tokensConf);
|
|
|
- }
|
|
|
- sub.setAMContainerSpec(clc);
|
|
|
- sub.setAttemptFailuresValidityInterval(attemptFailuresValidityInterval);
|
|
|
- if (logAggregationContext != null) {
|
|
|
- sub.setLogAggregationContext(logAggregationContext);
|
|
|
- }
|
|
|
- sub.setCancelTokensWhenComplete(cancelTokensWhenComplete);
|
|
|
- if (amLabel != null && !amLabel.isEmpty()) {
|
|
|
- for (ResourceRequest amResourceRequest : amResourceRequests) {
|
|
|
- amResourceRequest.setNodeLabelExpression(amLabel.trim());
|
|
|
- }
|
|
|
- }
|
|
|
- sub.setAMContainerResourceRequests(amResourceRequests);
|
|
|
- req.setApplicationSubmissionContext(sub);
|
|
|
- UserGroupInformation fakeUser =
|
|
|
- UserGroupInformation.createUserForTesting(user, new String[] {"someGroup"});
|
|
|
- PrivilegedExceptionAction<SubmitApplicationResponse> action =
|
|
|
- new PrivilegedExceptionAction<SubmitApplicationResponse>() {
|
|
|
- ApplicationClientProtocol client;
|
|
|
- SubmitApplicationRequest req;
|
|
|
- @Override
|
|
|
- public SubmitApplicationResponse run() throws IOException, YarnException {
|
|
|
- try {
|
|
|
- return client.submitApplication(req);
|
|
|
- } catch (YarnException | IOException e) {
|
|
|
- e.printStackTrace();
|
|
|
- throw e;
|
|
|
- }
|
|
|
- }
|
|
|
- PrivilegedExceptionAction<SubmitApplicationResponse> setClientReq(
|
|
|
- ApplicationClientProtocol client, SubmitApplicationRequest req) {
|
|
|
- this.client = client;
|
|
|
- this.req = req;
|
|
|
- return this;
|
|
|
- }
|
|
|
- }.setClientReq(client, req);
|
|
|
- fakeUser.doAs(action);
|
|
|
- // make sure app is immediately available after submit
|
|
|
- if (waitForAccepted) {
|
|
|
- waitForState(appId, RMAppState.ACCEPTED);
|
|
|
- }
|
|
|
- RMApp rmApp = getRMContext().getRMApps().get(appId);
|
|
|
-
|
|
|
- // unmanaged AM won't go to RMAppAttemptState.SCHEDULED.
|
|
|
- if (waitForAccepted && !unmanaged) {
|
|
|
- waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
|
|
|
- RMAppAttemptState.SCHEDULED);
|
|
|
- }
|
|
|
-
|
|
|
- ((AbstractYarnScheduler)getResourceScheduler()).update();
|
|
|
-
|
|
|
- return rmApp;
|
|
|
- }
|
|
|
-
|
|
|
public MockNM unRegisterNode(MockNM nm) throws Exception {
|
|
|
nm.unRegisterNode();
|
|
|
drainEventsImplicitly();
|
|
@@ -1424,17 +1007,6 @@ public class MockRM extends ResourceManager {
|
|
|
disableDrainEventsImplicitly = false;
|
|
|
}
|
|
|
|
|
|
- public RMApp submitApp(int masterMemory, Priority priority,
|
|
|
- Map<ApplicationTimeoutType, Long> applicationTimeouts) throws Exception {
|
|
|
- Resource resource = Resource.newInstance(masterMemory, 0);
|
|
|
- return submitApp(
|
|
|
- resource, "", UserGroupInformation.getCurrentUser().getShortUserName(),
|
|
|
- null, false, null,
|
|
|
- super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
|
|
|
- YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true,
|
|
|
- false, false, null, 0, null, true, priority, null, applicationTimeouts,
|
|
|
- null);
|
|
|
- }
|
|
|
|
|
|
@Override
|
|
|
protected void serviceInit(Configuration conf) throws Exception {
|