|
@@ -74,6 +74,7 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
|
|
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
|
|
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
|
|
+import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
import org.apache.hadoop.yarn.api.records.SerializedException;
|
|
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
|
|
|
import org.apache.hadoop.yarn.api.records.impl.pb.LogAggregationContextPBImpl;
|
|
@@ -95,6 +96,7 @@ import org.apache.hadoop.yarn.security.NMTokenIdentifier;
|
|
|
import org.apache.hadoop.yarn.server.api.ContainerType;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedAppsEvent;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.CMgrCompletedContainersEvent;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.CMgrDecreaseContainersResourceEvent;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.ContainerManagerEvent;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
|
@@ -113,6 +115,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Ap
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationInitEvent;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
|
|
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ChangeContainerResourceEvent;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
|
|
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
|
|
@@ -141,6 +144,7 @@ import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
import com.google.protobuf.ByteString;
|
|
|
+import org.apache.hadoop.yarn.util.resource.Resources;
|
|
|
|
|
|
public class ContainerManagerImpl extends CompositeService implements
|
|
|
ServiceStateChangeListener, ContainerManagementProtocol,
|
|
@@ -681,33 +685,45 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
|
|
|
/**
|
|
|
* @param containerTokenIdentifier
|
|
|
- * of the container to be started
|
|
|
+ * of the container whose resource is to be started or increased
|
|
|
* @throws YarnException
|
|
|
*/
|
|
|
@Private
|
|
|
@VisibleForTesting
|
|
|
- protected void authorizeStartRequest(NMTokenIdentifier nmTokenIdentifier,
|
|
|
- ContainerTokenIdentifier containerTokenIdentifier) throws YarnException {
|
|
|
+ protected void authorizeStartAndResourceIncreaseRequest(
|
|
|
+ NMTokenIdentifier nmTokenIdentifier,
|
|
|
+ ContainerTokenIdentifier containerTokenIdentifier,
|
|
|
+ boolean startRequest)
|
|
|
+ throws YarnException {
|
|
|
if (nmTokenIdentifier == null) {
|
|
|
throw RPCUtil.getRemoteException(INVALID_NMTOKEN_MSG);
|
|
|
}
|
|
|
if (containerTokenIdentifier == null) {
|
|
|
throw RPCUtil.getRemoteException(INVALID_CONTAINERTOKEN_MSG);
|
|
|
}
|
|
|
+ /*
|
|
|
+ * Check the following:
|
|
|
+ * 1. The request comes from the same application attempt
|
|
|
+ * 2. The request possess a container token that has not expired
|
|
|
+ * 3. The request possess a container token that is granted by a known RM
|
|
|
+ */
|
|
|
ContainerId containerId = containerTokenIdentifier.getContainerID();
|
|
|
String containerIDStr = containerId.toString();
|
|
|
boolean unauthorized = false;
|
|
|
StringBuilder messageBuilder =
|
|
|
- new StringBuilder("Unauthorized request to start container. ");
|
|
|
+ new StringBuilder("Unauthorized request to " + (startRequest ?
|
|
|
+ "start container." : "increase container resource."));
|
|
|
if (!nmTokenIdentifier.getApplicationAttemptId().getApplicationId().
|
|
|
equals(containerId.getApplicationAttemptId().getApplicationId())) {
|
|
|
unauthorized = true;
|
|
|
messageBuilder.append("\nNMToken for application attempt : ")
|
|
|
.append(nmTokenIdentifier.getApplicationAttemptId())
|
|
|
- .append(" was used for starting container with container token")
|
|
|
+ .append(" was used for "
|
|
|
+ + (startRequest ? "starting " : "increasing resource of ")
|
|
|
+ + "container with container token")
|
|
|
.append(" issued for application attempt : ")
|
|
|
.append(containerId.getApplicationAttemptId());
|
|
|
- } else if (!this.context.getContainerTokenSecretManager()
|
|
|
+ } else if (startRequest && !this.context.getContainerTokenSecretManager()
|
|
|
.isValidStartContainerRequest(containerTokenIdentifier)) {
|
|
|
// Is the container being relaunched? Or RPC layer let startCall with
|
|
|
// tokens generated off old-secret through?
|
|
@@ -729,6 +745,14 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
LOG.error(msg);
|
|
|
throw RPCUtil.getRemoteException(msg);
|
|
|
}
|
|
|
+ if (containerTokenIdentifier.getRMIdentifier() != nodeStatusUpdater
|
|
|
+ .getRMIdentifier()) {
|
|
|
+ // Is the container coming from unknown RM
|
|
|
+ StringBuilder sb = new StringBuilder("\nContainer ");
|
|
|
+ sb.append(containerTokenIdentifier.getContainerID().toString())
|
|
|
+ .append(" rejected as it is allocated by a previous RM");
|
|
|
+ throw new InvalidContainerException(sb.toString());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -745,7 +769,7 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
}
|
|
|
UserGroupInformation remoteUgi = getRemoteUgi();
|
|
|
NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
|
|
|
- authorizeUser(remoteUgi,nmTokenIdentifier);
|
|
|
+ authorizeUser(remoteUgi, nmTokenIdentifier);
|
|
|
List<ContainerId> succeededContainers = new ArrayList<ContainerId>();
|
|
|
Map<ContainerId, SerializedException> failedContainers =
|
|
|
new HashMap<ContainerId, SerializedException>();
|
|
@@ -844,16 +868,8 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
* belongs to correct Node Manager (part of retrieve password). c) It has
|
|
|
* correct RMIdentifier. d) It is not expired.
|
|
|
*/
|
|
|
- authorizeStartRequest(nmTokenIdentifier, containerTokenIdentifier);
|
|
|
-
|
|
|
- if (containerTokenIdentifier.getRMIdentifier() != nodeStatusUpdater
|
|
|
- .getRMIdentifier()) {
|
|
|
- // Is the container coming from unknown RM
|
|
|
- StringBuilder sb = new StringBuilder("\nContainer ");
|
|
|
- sb.append(containerTokenIdentifier.getContainerID().toString())
|
|
|
- .append(" rejected as it is allocated by a previous RM");
|
|
|
- throw new InvalidContainerException(sb.toString());
|
|
|
- }
|
|
|
+ authorizeStartAndResourceIncreaseRequest(
|
|
|
+ nmTokenIdentifier, containerTokenIdentifier, true);
|
|
|
// update NMToken
|
|
|
updateNMTokenIdentifier(nmTokenIdentifier);
|
|
|
|
|
@@ -960,9 +976,118 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
@Override
|
|
|
public IncreaseContainersResourceResponse increaseContainersResource(
|
|
|
IncreaseContainersResourceRequest requests)
|
|
|
- throws YarnException, IOException {
|
|
|
- // To be implemented in YARN-1645
|
|
|
- return null;
|
|
|
+ throws YarnException, IOException {
|
|
|
+ if (blockNewContainerRequests.get()) {
|
|
|
+ throw new NMNotYetReadyException(
|
|
|
+ "Rejecting container resource increase as NodeManager has not"
|
|
|
+ + " yet connected with ResourceManager");
|
|
|
+ }
|
|
|
+ UserGroupInformation remoteUgi = getRemoteUgi();
|
|
|
+ NMTokenIdentifier nmTokenIdentifier = selectNMTokenIdentifier(remoteUgi);
|
|
|
+ authorizeUser(remoteUgi, nmTokenIdentifier);
|
|
|
+ List<ContainerId> successfullyIncreasedContainers
|
|
|
+ = new ArrayList<ContainerId>();
|
|
|
+ Map<ContainerId, SerializedException> failedContainers =
|
|
|
+ new HashMap<ContainerId, SerializedException>();
|
|
|
+ // Process container resource increase requests
|
|
|
+ for (org.apache.hadoop.yarn.api.records.Token token :
|
|
|
+ requests.getContainersToIncrease()) {
|
|
|
+ ContainerId containerId = null;
|
|
|
+ try {
|
|
|
+ if (token.getIdentifier() == null) {
|
|
|
+ throw new IOException(INVALID_CONTAINERTOKEN_MSG);
|
|
|
+ }
|
|
|
+ ContainerTokenIdentifier containerTokenIdentifier =
|
|
|
+ BuilderUtils.newContainerTokenIdentifier(token);
|
|
|
+ verifyAndGetContainerTokenIdentifier(token,
|
|
|
+ containerTokenIdentifier);
|
|
|
+ authorizeStartAndResourceIncreaseRequest(
|
|
|
+ nmTokenIdentifier, containerTokenIdentifier, false);
|
|
|
+ containerId = containerTokenIdentifier.getContainerID();
|
|
|
+ // Reuse the startContainer logic to update NMToken,
|
|
|
+ // as container resource increase request will have come with
|
|
|
+ // an updated NMToken.
|
|
|
+ updateNMTokenIdentifier(nmTokenIdentifier);
|
|
|
+ Resource resource = containerTokenIdentifier.getResource();
|
|
|
+ changeContainerResourceInternal(containerId, resource, true);
|
|
|
+ successfullyIncreasedContainers.add(containerId);
|
|
|
+ } catch (YarnException | InvalidToken e) {
|
|
|
+ failedContainers.put(containerId, SerializedException.newInstance(e));
|
|
|
+ } catch (IOException e) {
|
|
|
+ throw RPCUtil.getRemoteException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return IncreaseContainersResourceResponse.newInstance(
|
|
|
+ successfullyIncreasedContainers, failedContainers);
|
|
|
+ }
|
|
|
+
|
|
|
+ @SuppressWarnings("unchecked")
|
|
|
+ private void changeContainerResourceInternal(
|
|
|
+ ContainerId containerId, Resource targetResource, boolean increase)
|
|
|
+ throws YarnException, IOException {
|
|
|
+ Container container = context.getContainers().get(containerId);
|
|
|
+ // Check container existence
|
|
|
+ if (container == null) {
|
|
|
+ if (nodeStatusUpdater.isContainerRecentlyStopped(containerId)) {
|
|
|
+ throw RPCUtil.getRemoteException("Container " + containerId.toString()
|
|
|
+ + " was recently stopped on node manager.");
|
|
|
+ } else {
|
|
|
+ throw RPCUtil.getRemoteException("Container " + containerId.toString()
|
|
|
+ + " is not handled by this NodeManager");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // Check container state
|
|
|
+ org.apache.hadoop.yarn.server.nodemanager.
|
|
|
+ containermanager.container.ContainerState currentState =
|
|
|
+ container.getContainerState();
|
|
|
+ if (currentState != org.apache.hadoop.yarn.server.
|
|
|
+ nodemanager.containermanager.container.ContainerState.RUNNING) {
|
|
|
+ throw RPCUtil.getRemoteException("Container " + containerId.toString()
|
|
|
+ + " is in " + currentState.name() + " state."
|
|
|
+ + " Resource can only be changed when a container is in"
|
|
|
+ + " RUNNING state");
|
|
|
+ }
|
|
|
+ // Check validity of the target resource.
|
|
|
+ Resource currentResource = container.getResource();
|
|
|
+ if (currentResource.equals(targetResource)) {
|
|
|
+ LOG.warn("Unable to change resource for container "
|
|
|
+ + containerId.toString()
|
|
|
+ + ". The target resource "
|
|
|
+ + targetResource.toString()
|
|
|
+ + " is the same as the current resource");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (increase && !Resources.fitsIn(currentResource, targetResource)) {
|
|
|
+ throw RPCUtil.getRemoteException("Unable to increase resource for "
|
|
|
+ + "container " + containerId.toString()
|
|
|
+ + ". The target resource "
|
|
|
+ + targetResource.toString()
|
|
|
+ + " is smaller than the current resource "
|
|
|
+ + currentResource.toString());
|
|
|
+ }
|
|
|
+ if (!increase &&
|
|
|
+ (!Resources.fitsIn(Resources.none(), targetResource)
|
|
|
+ || !Resources.fitsIn(targetResource, currentResource))) {
|
|
|
+ throw RPCUtil.getRemoteException("Unable to decrease resource for "
|
|
|
+ + "container " + containerId.toString()
|
|
|
+ + ". The target resource "
|
|
|
+ + targetResource.toString()
|
|
|
+ + " is not smaller than the current resource "
|
|
|
+ + currentResource.toString());
|
|
|
+ }
|
|
|
+ this.readLock.lock();
|
|
|
+ try {
|
|
|
+ if (!serviceStopped) {
|
|
|
+ dispatcher.getEventHandler().handle(new ChangeContainerResourceEvent(
|
|
|
+ containerId, targetResource));
|
|
|
+ } else {
|
|
|
+ throw new YarnException(
|
|
|
+ "Unable to change container resource as the NodeManager is "
|
|
|
+ + "in the process of shutting down");
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ this.readLock.unlock();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Private
|
|
@@ -1182,6 +1307,21 @@ public class ContainerManagerImpl extends CompositeService implements
|
|
|
"Container Killed by ResourceManager"));
|
|
|
}
|
|
|
break;
|
|
|
+ case DECREASE_CONTAINERS_RESOURCE:
|
|
|
+ CMgrDecreaseContainersResourceEvent containersDecreasedEvent =
|
|
|
+ (CMgrDecreaseContainersResourceEvent) event;
|
|
|
+ for (org.apache.hadoop.yarn.api.records.Container container
|
|
|
+ : containersDecreasedEvent.getContainersToDecrease()) {
|
|
|
+ try {
|
|
|
+ changeContainerResourceInternal(container.getId(),
|
|
|
+ container.getResource(), false);
|
|
|
+ } catch (YarnException e) {
|
|
|
+ LOG.error("Unable to decrease container resource", e);
|
|
|
+ } catch (IOException e) {
|
|
|
+ LOG.error("Unable to update container resource in store", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ break;
|
|
|
default:
|
|
|
throw new YarnRuntimeException(
|
|
|
"Got an unknown ContainerManagerEvent type: " + event.getType());
|