|
@@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream;
|
|
import java.io.ByteArrayOutputStream;
|
|
import java.io.ByteArrayOutputStream;
|
|
import java.io.DataInputStream;
|
|
import java.io.DataInputStream;
|
|
import java.io.DataOutputStream;
|
|
import java.io.DataOutputStream;
|
|
|
|
+import java.io.FileNotFoundException;
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
import java.util.ArrayList;
|
|
import java.util.ArrayList;
|
|
import java.util.List;
|
|
import java.util.List;
|
|
@@ -43,16 +44,18 @@ import org.apache.hadoop.security.token.delegation.DelegationKey;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
|
+import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
|
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto;
|
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto;
|
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
|
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
|
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
|
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
|
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
|
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
|
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
|
-
|
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
|
|
|
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
|
|
@@ -76,6 +79,8 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|
protected static final String ROOT_DIR_NAME = "FSRMStateRoot";
|
|
protected static final String ROOT_DIR_NAME = "FSRMStateRoot";
|
|
protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion
|
|
protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion
|
|
.newInstance(1, 1);
|
|
.newInstance(1, 1);
|
|
|
|
+ protected static final String AMRMTOKEN_SECRET_MANAGER_NODE =
|
|
|
|
+ "AMRMTokenSecretManagerNode";
|
|
|
|
|
|
protected FileSystem fs;
|
|
protected FileSystem fs;
|
|
|
|
|
|
@@ -89,6 +94,7 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
Path fsWorkingPath;
|
|
Path fsWorkingPath;
|
|
|
|
|
|
|
|
+ Path amrmTokenSecretManagerRoot;
|
|
@Override
|
|
@Override
|
|
public synchronized void initInternal(Configuration conf)
|
|
public synchronized void initInternal(Configuration conf)
|
|
throws Exception{
|
|
throws Exception{
|
|
@@ -96,6 +102,8 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|
rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
|
|
rootDirPath = new Path(fsWorkingPath, ROOT_DIR_NAME);
|
|
rmDTSecretManagerRoot = new Path(rootDirPath, RM_DT_SECRET_MANAGER_ROOT);
|
|
rmDTSecretManagerRoot = new Path(rootDirPath, RM_DT_SECRET_MANAGER_ROOT);
|
|
rmAppRoot = new Path(rootDirPath, RM_APP_ROOT);
|
|
rmAppRoot = new Path(rootDirPath, RM_APP_ROOT);
|
|
|
|
+ amrmTokenSecretManagerRoot =
|
|
|
|
+ new Path(rootDirPath, AMRMTOKEN_SECRET_MANAGER_ROOT);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -113,6 +121,7 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|
fs = fsWorkingPath.getFileSystem(conf);
|
|
fs = fsWorkingPath.getFileSystem(conf);
|
|
fs.mkdirs(rmDTSecretManagerRoot);
|
|
fs.mkdirs(rmDTSecretManagerRoot);
|
|
fs.mkdirs(rmAppRoot);
|
|
fs.mkdirs(rmAppRoot);
|
|
|
|
+ fs.mkdirs(amrmTokenSecretManagerRoot);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -180,9 +189,32 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|
loadRMDTSecretManagerState(rmState);
|
|
loadRMDTSecretManagerState(rmState);
|
|
// recover RM applications
|
|
// recover RM applications
|
|
loadRMAppState(rmState);
|
|
loadRMAppState(rmState);
|
|
|
|
+ // recover AMRMTokenSecretManager
|
|
|
|
+ loadAMRMTokenSecretManagerState(rmState);
|
|
return rmState;
|
|
return rmState;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private void loadAMRMTokenSecretManagerState(RMState rmState)
|
|
|
|
+ throws Exception {
|
|
|
|
+ checkAndResumeUpdateOperation(amrmTokenSecretManagerRoot);
|
|
|
|
+ Path amrmTokenSecretManagerStateDataDir =
|
|
|
|
+ new Path(amrmTokenSecretManagerRoot, AMRMTOKEN_SECRET_MANAGER_NODE);
|
|
|
|
+ FileStatus status;
|
|
|
|
+ try {
|
|
|
|
+ status = fs.getFileStatus(amrmTokenSecretManagerStateDataDir);
|
|
|
|
+ assert status.isFile();
|
|
|
|
+ } catch (FileNotFoundException ex) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ byte[] data = readFile(amrmTokenSecretManagerStateDataDir, status.getLen());
|
|
|
|
+ AMRMTokenSecretManagerStatePBImpl stateData =
|
|
|
|
+ new AMRMTokenSecretManagerStatePBImpl(
|
|
|
|
+ AMRMTokenSecretManagerStateProto.parseFrom(data));
|
|
|
|
+ rmState.amrmTokenSecretManagerState =
|
|
|
|
+ AMRMTokenSecretManagerState.newInstance(
|
|
|
|
+ stateData.getCurrentMasterKey(), stateData.getNextMasterKey());
|
|
|
|
+ }
|
|
|
|
+
|
|
private void loadRMAppState(RMState rmState) throws Exception {
|
|
private void loadRMAppState(RMState rmState) throws Exception {
|
|
try {
|
|
try {
|
|
List<ApplicationAttemptState> attempts =
|
|
List<ApplicationAttemptState> attempts =
|
|
@@ -597,4 +629,25 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|
return new Path(root, nodeName);
|
|
return new Path(root, nodeName);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Override
|
|
|
|
+ public synchronized void storeOrUpdateAMRMTokenSecretManagerState(
|
|
|
|
+ AMRMTokenSecretManagerState amrmTokenSecretManagerState,
|
|
|
|
+ boolean isUpdate){
|
|
|
|
+ Path nodeCreatePath =
|
|
|
|
+ getNodePath(amrmTokenSecretManagerRoot, AMRMTOKEN_SECRET_MANAGER_NODE);
|
|
|
|
+ AMRMTokenSecretManagerState data =
|
|
|
|
+ AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
|
|
|
|
+ byte[] stateData = data.getProto().toByteArray();
|
|
|
|
+ try {
|
|
|
|
+ if (isUpdate) {
|
|
|
|
+ updateFile(nodeCreatePath, stateData);
|
|
|
|
+ } else {
|
|
|
|
+ writeFile(nodeCreatePath, stateData);
|
|
|
|
+ }
|
|
|
|
+ } catch (Exception ex) {
|
|
|
|
+ LOG.info("Error storing info for AMRMTokenSecretManager", ex);
|
|
|
|
+ notifyStoreOperationFailed(ex);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
}
|
|
}
|