|
@@ -20,6 +20,9 @@ package org.apache.hadoop.yarn.server.nodemanager.recovery;
|
|
|
|
|
|
import static org.fusesource.leveldbjni.JniDBFactory.asString;
|
|
|
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
|
|
|
+
|
|
|
+import org.apache.hadoop.yarn.api.records.Token;
|
|
|
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
import java.io.File;
|
|
@@ -45,11 +48,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainerRequestP
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
|
|
-import org.apache.hadoop.yarn.api.records.Resource;
|
|
|
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
|
|
|
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
-import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
|
|
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
|
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
|
|
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
|
|
@@ -58,6 +58,7 @@ import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.Deletion
|
|
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
|
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
|
|
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.StartContainerRequestProto;
|
|
|
+import org.apache.hadoop.yarn.proto.YarnSecurityTokenProtos.ContainerTokenIdentifierProto;
|
|
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
|
|
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
|
|
import org.apache.hadoop.yarn.server.records.Version;
|
|
@@ -117,8 +118,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
|
private static final String CONTAINER_LAUNCHED_KEY_SUFFIX = "/launched";
|
|
|
private static final String CONTAINER_QUEUED_KEY_SUFFIX = "/queued";
|
|
|
private static final String CONTAINER_PAUSED_KEY_SUFFIX = "/paused";
|
|
|
- private static final String CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX =
|
|
|
- "/resourceChanged";
|
|
|
+ private static final String CONTAINER_UPDATE_TOKEN_SUFFIX =
|
|
|
+ "/updateToken";
|
|
|
private static final String CONTAINER_KILLED_KEY_SUFFIX = "/killed";
|
|
|
private static final String CONTAINER_EXIT_CODE_KEY_SUFFIX = "/exitcode";
|
|
|
private static final String CONTAINER_REMAIN_RETRIES_KEY_SUFFIX =
|
|
@@ -284,9 +285,17 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
|
} else if (suffix.equals(CONTAINER_EXIT_CODE_KEY_SUFFIX)) {
|
|
|
rcs.status = RecoveredContainerStatus.COMPLETED;
|
|
|
rcs.exitCode = Integer.parseInt(asString(entry.getValue()));
|
|
|
- } else if (suffix.equals(CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX)) {
|
|
|
- rcs.capability = new ResourcePBImpl(
|
|
|
- ResourceProto.parseFrom(entry.getValue()));
|
|
|
+ } else if (suffix.equals(CONTAINER_UPDATE_TOKEN_SUFFIX)) {
|
|
|
+ ContainerTokenIdentifierProto tokenIdentifierProto =
|
|
|
+ ContainerTokenIdentifierProto.parseFrom(entry.getValue());
|
|
|
+ Token currentToken = rcs.getStartRequest().getContainerToken();
|
|
|
+ Token updatedToken = Token
|
|
|
+ .newInstance(tokenIdentifierProto.toByteArray(),
|
|
|
+ ContainerTokenIdentifier.KIND.toString(),
|
|
|
+ currentToken.getPassword().array(), currentToken.getService());
|
|
|
+ rcs.startRequest.setContainerToken(updatedToken);
|
|
|
+ rcs.capability = new ResourcePBImpl(tokenIdentifierProto.getResource());
|
|
|
+ rcs.version = tokenIdentifierProto.getVersion();
|
|
|
} else if (suffix.equals(CONTAINER_REMAIN_RETRIES_KEY_SUFFIX)) {
|
|
|
rcs.setRemainingRetryAttempts(
|
|
|
Integer.parseInt(asString(entry.getValue())));
|
|
@@ -361,6 +370,21 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void removeContainerQueued(ContainerId containerId)
|
|
|
+ throws IOException {
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
+ LOG.debug("removeContainerQueued: containerId=" + containerId);
|
|
|
+ }
|
|
|
+
|
|
|
+ String key = CONTAINERS_KEY_PREFIX + containerId.toString()
|
|
|
+ + CONTAINER_QUEUED_KEY_SUFFIX;
|
|
|
+ try {
|
|
|
+ db.delete(bytes(key));
|
|
|
+ } catch (DBException e) {
|
|
|
+ throw new IOException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
public void storeContainerPaused(ContainerId containerId) throws IOException {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
@@ -416,6 +440,8 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
|
LOG.debug("storeContainerLaunched: containerId=" + containerId);
|
|
|
}
|
|
|
|
|
|
+ // Removing the container if queued for backward compatibility reasons
|
|
|
+ removeContainerQueued(containerId);
|
|
|
String key = CONTAINERS_KEY_PREFIX + containerId.toString()
|
|
|
+ CONTAINER_LAUNCHED_KEY_SUFFIX;
|
|
|
try {
|
|
@@ -426,24 +452,25 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void storeContainerResourceChanged(ContainerId containerId,
|
|
|
- int containerVersion, Resource capability) throws IOException {
|
|
|
+ public void storeContainerUpdateToken(ContainerId containerId,
|
|
|
+ ContainerTokenIdentifier containerTokenIdentifier) throws IOException {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
- LOG.debug("storeContainerResourceChanged: containerId=" + containerId
|
|
|
- + ", capability=" + capability);
|
|
|
+ LOG.debug("storeContainerUpdateToken: containerId=" + containerId);
|
|
|
}
|
|
|
|
|
|
- String keyResChng = CONTAINERS_KEY_PREFIX + containerId.toString()
|
|
|
- + CONTAINER_RESOURCE_CHANGED_KEY_SUFFIX;
|
|
|
+ String keyUpdateToken = CONTAINERS_KEY_PREFIX + containerId.toString()
|
|
|
+ + CONTAINER_UPDATE_TOKEN_SUFFIX;
|
|
|
String keyVersion = CONTAINERS_KEY_PREFIX + containerId.toString()
|
|
|
+ CONTAINER_VERSION_KEY_SUFFIX;
|
|
|
+
|
|
|
try {
|
|
|
WriteBatch batch = db.createWriteBatch();
|
|
|
try {
|
|
|
// New value will overwrite old values for the same key
|
|
|
- batch.put(bytes(keyResChng),
|
|
|
- ProtoUtils.convertToProtoFormat(capability).toByteArray());
|
|
|
- batch.put(bytes(keyVersion), bytes(Integer.toString(containerVersion)));
|
|
|
+ batch.put(bytes(keyUpdateToken),
|
|
|
+ containerTokenIdentifier.getProto().toByteArray());
|
|
|
+ batch.put(bytes(keyVersion),
|
|
|
+ bytes(Integer.toString(containerTokenIdentifier.getVersion())));
|
|
|
db.write(batch);
|
|
|
} finally {
|
|
|
batch.close();
|
|
@@ -539,6 +566,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
|
|
batch.delete(bytes(keyPrefix + CONTAINER_PAUSED_KEY_SUFFIX));
|
|
|
batch.delete(bytes(keyPrefix + CONTAINER_KILLED_KEY_SUFFIX));
|
|
|
batch.delete(bytes(keyPrefix + CONTAINER_EXIT_CODE_KEY_SUFFIX));
|
|
|
+ batch.delete(bytes(keyPrefix + CONTAINER_UPDATE_TOKEN_SUFFIX));
|
|
|
List<String> unknownKeysForContainer = containerUnknownKeySuffixes
|
|
|
.removeAll(containerId);
|
|
|
for (String unknownKeySuffix : unknownKeysForContainer) {
|