|
@@ -111,70 +111,66 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|
|
|
|
|
private void loadRMAppState(RMState rmState) throws Exception {
|
|
|
try {
|
|
|
- FileStatus[] childNodes = fs.listStatus(rmAppRoot);
|
|
|
List<ApplicationAttemptState> attempts =
|
|
|
- new ArrayList<ApplicationAttemptState>();
|
|
|
- for(FileStatus childNodeStatus : childNodes) {
|
|
|
- assert childNodeStatus.isFile();
|
|
|
- String childNodeName = childNodeStatus.getPath().getName();
|
|
|
- Path childNodePath = getNodePath(rmAppRoot, childNodeName);
|
|
|
- byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
|
|
|
- if(childNodeName.startsWith(ApplicationId.appIdStrPrefix)){
|
|
|
- // application
|
|
|
- LOG.info("Loading application from node: " + childNodeName);
|
|
|
- ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
|
|
|
- ApplicationStateDataPBImpl appStateData =
|
|
|
- new ApplicationStateDataPBImpl(
|
|
|
- ApplicationStateDataProto.parseFrom(childData));
|
|
|
- ApplicationState appState = new ApplicationState(
|
|
|
- appStateData.getSubmitTime(),
|
|
|
- appStateData.getApplicationSubmissionContext(),
|
|
|
- appStateData.getUser());
|
|
|
- // assert child node name is same as actual applicationId
|
|
|
- assert appId.equals(appState.context.getApplicationId());
|
|
|
- rmState.appState.put(appId, appState);
|
|
|
- } else if(childNodeName.startsWith(
|
|
|
- ApplicationAttemptId.appAttemptIdStrPrefix)) {
|
|
|
- // attempt
|
|
|
- LOG.info("Loading application attempt from node: " + childNodeName);
|
|
|
- ApplicationAttemptId attemptId =
|
|
|
- ConverterUtils.toApplicationAttemptId(childNodeName);
|
|
|
- ApplicationAttemptStateDataPBImpl attemptStateData =
|
|
|
- new ApplicationAttemptStateDataPBImpl(
|
|
|
+ new ArrayList<ApplicationAttemptState>();
|
|
|
+
|
|
|
+ for (FileStatus appDir : fs.listStatus(rmAppRoot)) {
|
|
|
+ for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) {
|
|
|
+ assert childNodeStatus.isFile();
|
|
|
+ String childNodeName = childNodeStatus.getPath().getName();
|
|
|
+ byte[] childData =
|
|
|
+ readFile(childNodeStatus.getPath(), childNodeStatus.getLen());
|
|
|
+ if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
|
|
|
+ // application
|
|
|
+ LOG.info("Loading application from node: " + childNodeName);
|
|
|
+ ApplicationId appId = ConverterUtils.toApplicationId(childNodeName);
|
|
|
+ ApplicationStateDataPBImpl appStateData =
|
|
|
+ new ApplicationStateDataPBImpl(
|
|
|
+ ApplicationStateDataProto.parseFrom(childData));
|
|
|
+ ApplicationState appState =
|
|
|
+ new ApplicationState(appStateData.getSubmitTime(),
|
|
|
+ appStateData.getApplicationSubmissionContext(),
|
|
|
+ appStateData.getUser());
|
|
|
+ // assert child node name is same as actual applicationId
|
|
|
+ assert appId.equals(appState.context.getApplicationId());
|
|
|
+ rmState.appState.put(appId, appState);
|
|
|
+ } else if (childNodeName
|
|
|
+ .startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
|
|
|
+ // attempt
|
|
|
+ LOG.info("Loading application attempt from node: " + childNodeName);
|
|
|
+ ApplicationAttemptId attemptId =
|
|
|
+ ConverterUtils.toApplicationAttemptId(childNodeName);
|
|
|
+ ApplicationAttemptStateDataPBImpl attemptStateData =
|
|
|
+ new ApplicationAttemptStateDataPBImpl(
|
|
|
ApplicationAttemptStateDataProto.parseFrom(childData));
|
|
|
- Credentials credentials = null;
|
|
|
- if(attemptStateData.getAppAttemptTokens() != null){
|
|
|
- credentials = new Credentials();
|
|
|
- DataInputByteBuffer dibb = new DataInputByteBuffer();
|
|
|
- dibb.reset(attemptStateData.getAppAttemptTokens());
|
|
|
- credentials.readTokenStorageStream(dibb);
|
|
|
+ Credentials credentials = null;
|
|
|
+ if (attemptStateData.getAppAttemptTokens() != null) {
|
|
|
+ credentials = new Credentials();
|
|
|
+ DataInputByteBuffer dibb = new DataInputByteBuffer();
|
|
|
+ dibb.reset(attemptStateData.getAppAttemptTokens());
|
|
|
+ credentials.readTokenStorageStream(dibb);
|
|
|
+ }
|
|
|
+ ApplicationAttemptState attemptState =
|
|
|
+ new ApplicationAttemptState(attemptId,
|
|
|
+ attemptStateData.getMasterContainer(), credentials);
|
|
|
+
|
|
|
+ // assert child node name is same as application attempt id
|
|
|
+ assert attemptId.equals(attemptState.getAttemptId());
|
|
|
+ attempts.add(attemptState);
|
|
|
+ } else {
|
|
|
+ LOG.info("Unknown child node with name: " + childNodeName);
|
|
|
}
|
|
|
- ApplicationAttemptState attemptState =
|
|
|
- new ApplicationAttemptState(attemptId,
|
|
|
- attemptStateData.getMasterContainer(), credentials);
|
|
|
-
|
|
|
- // assert child node name is same as application attempt id
|
|
|
- assert attemptId.equals(attemptState.getAttemptId());
|
|
|
- attempts.add(attemptState);
|
|
|
- } else {
|
|
|
- LOG.info("Unknown child node with name: " + childNodeName);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // go through all attempts and add them to their apps
|
|
|
- for(ApplicationAttemptState attemptState : attempts) {
|
|
|
+ // go through all attempts and add them to their apps, Ideally, each
|
|
|
+ // attempt node must have a corresponding app node, because remove
|
|
|
+ // directory operation remove both at the same time
|
|
|
+ for (ApplicationAttemptState attemptState : attempts) {
|
|
|
ApplicationId appId = attemptState.getAttemptId().getApplicationId();
|
|
|
ApplicationState appState = rmState.appState.get(appId);
|
|
|
- if(appState != null) {
|
|
|
- appState.attempts.put(attemptState.getAttemptId(), attemptState);
|
|
|
- } else {
|
|
|
- // the application node may have been removed when the application
|
|
|
- // completed but the RM might have stopped before it could remove the
|
|
|
- // application attempt nodes
|
|
|
- LOG.info("Application node not found for attempt: "
|
|
|
- + attemptState.getAttemptId());
|
|
|
- deleteFile(getNodePath(rmAppRoot, attemptState.getAttemptId().toString()));
|
|
|
- }
|
|
|
+ assert appState != null;
|
|
|
+ appState.attempts.put(attemptState.getAttemptId(), attemptState);
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
LOG.error("Failed to load state.", e);
|
|
@@ -188,6 +184,12 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|
|
for(FileStatus childNodeStatus : childNodes) {
|
|
|
assert childNodeStatus.isFile();
|
|
|
String childNodeName = childNodeStatus.getPath().getName();
|
|
|
+ if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
|
|
|
+ rmState.rmSecretManagerState.dtSequenceNumber =
|
|
|
+ Integer.parseInt(childNodeName.split("_")[1]);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
Path childNodePath = getNodePath(rmDTSecretManagerRoot, childNodeName);
|
|
|
byte[] childData = readFile(childNodePath, childNodeStatus.getLen());
|
|
|
ByteArrayInputStream is = new ByteArrayInputStream(childData);
|
|
@@ -202,10 +204,7 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|
|
long renewDate = fsIn.readLong();
|
|
|
rmState.rmSecretManagerState.delegationTokenState.put(identifier,
|
|
|
renewDate);
|
|
|
- } else if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
|
|
|
- rmState.rmSecretManagerState.dtSequenceNumber =
|
|
|
- Integer.parseInt(childNodeName.split("_")[1]);
|
|
|
- }else {
|
|
|
+ } else {
|
|
|
LOG.warn("Unknown file for recovering RMDelegationTokenSecretManager");
|
|
|
}
|
|
|
fsIn.close();
|
|
@@ -215,7 +214,9 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|
|
@Override
|
|
|
public synchronized void storeApplicationState(String appId,
|
|
|
ApplicationStateDataPBImpl appStateDataPB) throws Exception {
|
|
|
- Path nodeCreatePath = getNodePath(rmAppRoot, appId);
|
|
|
+ Path appDirPath = getAppDir(rmAppRoot, appId);
|
|
|
+ fs.mkdirs(appDirPath);
|
|
|
+ Path nodeCreatePath = getNodePath(appDirPath, appId);
|
|
|
|
|
|
LOG.info("Storing info for app: " + appId + " at: " + nodeCreatePath);
|
|
|
byte[] appStateData = appStateDataPB.getProto().toByteArray();
|
|
@@ -232,7 +233,11 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|
|
@Override
|
|
|
public synchronized void storeApplicationAttemptState(String attemptId,
|
|
|
ApplicationAttemptStateDataPBImpl attemptStateDataPB) throws Exception {
|
|
|
- Path nodeCreatePath = getNodePath(rmAppRoot, attemptId);
|
|
|
+ ApplicationAttemptId appAttemptId =
|
|
|
+ ConverterUtils.toApplicationAttemptId(attemptId);
|
|
|
+ Path appDirPath =
|
|
|
+ getAppDir(rmAppRoot, appAttemptId.getApplicationId().toString());
|
|
|
+ Path nodeCreatePath = getNodePath(appDirPath, attemptId);
|
|
|
LOG.info("Storing info for attempt: " + attemptId
|
|
|
+ " at: " + nodeCreatePath);
|
|
|
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
|
|
@@ -250,20 +255,9 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|
|
public synchronized void removeApplicationState(ApplicationState appState)
|
|
|
throws Exception {
|
|
|
String appId = appState.getAppId().toString();
|
|
|
- Path nodeRemovePath = getNodePath(rmAppRoot, appId);
|
|
|
+ Path nodeRemovePath = getAppDir(rmAppRoot, appId);
|
|
|
LOG.info("Removing info for app: " + appId + " at: " + nodeRemovePath);
|
|
|
deleteFile(nodeRemovePath);
|
|
|
- for(ApplicationAttemptId attemptId : appState.attempts.keySet()) {
|
|
|
- removeApplicationAttemptState(attemptId.toString());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- public synchronized void removeApplicationAttemptState(String attemptId)
|
|
|
- throws Exception {
|
|
|
- Path nodeRemovePath = getNodePath(rmAppRoot, attemptId);
|
|
|
- LOG.info("Removing info for attempt: " + attemptId
|
|
|
- + " at: " + nodeRemovePath);
|
|
|
- deleteFile(nodeRemovePath);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -329,6 +323,10 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|
|
deleteFile(nodeCreatePath);
|
|
|
}
|
|
|
|
|
|
+ private Path getAppDir(Path root, String appId) {
|
|
|
+ return getNodePath(root, appId);
|
|
|
+ }
|
|
|
+
|
|
|
// FileSystem related code
|
|
|
|
|
|
private void deleteFile(Path deletePath) throws Exception {
|