|
@@ -139,8 +139,8 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|
|
@Override
|
|
|
protected synchronized Version loadVersion() throws Exception {
|
|
|
Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE);
|
|
|
- if (fs.exists(versionNodePath)) {
|
|
|
- FileStatus status = fs.getFileStatus(versionNodePath);
|
|
|
+ FileStatus status = getFileStatus(versionNodePath);
|
|
|
+ if (status != null) {
|
|
|
byte[] data = readFile(versionNodePath, status.getLen());
|
|
|
Version version =
|
|
|
new VersionPBImpl(VersionProto.parseFrom(data));
|
|
@@ -165,9 +165,9 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|
|
public synchronized long getAndIncrementEpoch() throws Exception {
|
|
|
Path epochNodePath = getNodePath(rootDirPath, EPOCH_NODE);
|
|
|
long currentEpoch = 0;
|
|
|
- if (fs.exists(epochNodePath)) {
|
|
|
+ FileStatus status = getFileStatus(epochNodePath);
|
|
|
+ if (status != null) {
|
|
|
// load current epoch
|
|
|
- FileStatus status = fs.getFileStatus(epochNodePath);
|
|
|
byte[] data = readFile(epochNodePath, status.getLen());
|
|
|
Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
|
|
|
currentEpoch = epoch.getEpoch();
|
|
@@ -201,13 +201,11 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|
|
checkAndResumeUpdateOperation(amrmTokenSecretManagerRoot);
|
|
|
Path amrmTokenSecretManagerStateDataDir =
|
|
|
new Path(amrmTokenSecretManagerRoot, AMRMTOKEN_SECRET_MANAGER_NODE);
|
|
|
- FileStatus status;
|
|
|
- try {
|
|
|
- status = fs.getFileStatus(amrmTokenSecretManagerStateDataDir);
|
|
|
- assert status.isFile();
|
|
|
- } catch (FileNotFoundException ex) {
|
|
|
+ FileStatus status = getFileStatus(amrmTokenSecretManagerStateDataDir);
|
|
|
+ if (status == null) {
|
|
|
return;
|
|
|
}
|
|
|
+ assert status.isFile();
|
|
|
byte[] data = readFile(amrmTokenSecretManagerStateDataDir, status.getLen());
|
|
|
AMRMTokenSecretManagerStatePBImpl stateData =
|
|
|
new AMRMTokenSecretManagerStatePBImpl(
|
|
@@ -466,7 +464,7 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- protected void updateRMDelegationTokenState(
|
|
|
+ protected synchronized void updateRMDelegationTokenState(
|
|
|
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
|
|
|
throws Exception {
|
|
|
storeOrUpdateRMDelegationTokenState(rmDTIdentifier, renewDate, true);
|
|
@@ -560,6 +558,14 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private FileStatus getFileStatus(Path path) throws Exception {
|
|
|
+ try {
|
|
|
+ return fs.getFileStatus(path);
|
|
|
+ } catch (FileNotFoundException e) {
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/*
|
|
|
* In order to make this write atomic as a part of write we will first write
|
|
|
* data to .tmp file and then rename it. Here we are assuming that rename is
|