|
@@ -391,7 +391,7 @@ public class ZKRMStateStore extends RMStateStore {
|
|
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
|
|
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
|
|
byte[] data =
|
|
byte[] data =
|
|
((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
|
|
((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
|
|
- if (existsWithRetries(versionNodePath, true) != null) {
|
|
|
|
|
|
+ if (existsWithRetries(versionNodePath, false) != null) {
|
|
setDataWithRetries(versionNodePath, data, -1);
|
|
setDataWithRetries(versionNodePath, data, -1);
|
|
} else {
|
|
} else {
|
|
createWithRetries(versionNodePath, data, zkAcl, CreateMode.PERSISTENT);
|
|
createWithRetries(versionNodePath, data, zkAcl, CreateMode.PERSISTENT);
|
|
@@ -402,8 +402,8 @@ public class ZKRMStateStore extends RMStateStore {
|
|
protected synchronized Version loadVersion() throws Exception {
|
|
protected synchronized Version loadVersion() throws Exception {
|
|
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
|
|
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
|
|
|
|
|
|
- if (existsWithRetries(versionNodePath, true) != null) {
|
|
|
|
- byte[] data = getDataWithRetries(versionNodePath, true);
|
|
|
|
|
|
+ if (existsWithRetries(versionNodePath, false) != null) {
|
|
|
|
+ byte[] data = getDataWithRetries(versionNodePath, false);
|
|
Version version =
|
|
Version version =
|
|
new VersionPBImpl(VersionProto.parseFrom(data));
|
|
new VersionPBImpl(VersionProto.parseFrom(data));
|
|
return version;
|
|
return version;
|
|
@@ -415,9 +415,9 @@ public class ZKRMStateStore extends RMStateStore {
|
|
public synchronized long getAndIncrementEpoch() throws Exception {
|
|
public synchronized long getAndIncrementEpoch() throws Exception {
|
|
String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE);
|
|
String epochNodePath = getNodePath(zkRootNodePath, EPOCH_NODE);
|
|
long currentEpoch = 0;
|
|
long currentEpoch = 0;
|
|
- if (existsWithRetries(epochNodePath, true) != null) {
|
|
|
|
|
|
+ if (existsWithRetries(epochNodePath, false) != null) {
|
|
// load current epoch
|
|
// load current epoch
|
|
- byte[] data = getDataWithRetries(epochNodePath, true);
|
|
|
|
|
|
+ byte[] data = getDataWithRetries(epochNodePath, false);
|
|
Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
|
|
Epoch epoch = new EpochPBImpl(EpochProto.parseFrom(data));
|
|
currentEpoch = epoch.getEpoch();
|
|
currentEpoch = epoch.getEpoch();
|
|
// increment epoch and store it
|
|
// increment epoch and store it
|
|
@@ -447,7 +447,7 @@ public class ZKRMStateStore extends RMStateStore {
|
|
|
|
|
|
private void loadAMRMTokenSecretManagerState(RMState rmState)
|
|
private void loadAMRMTokenSecretManagerState(RMState rmState)
|
|
throws Exception {
|
|
throws Exception {
|
|
- byte[] data = getDataWithRetries(amrmTokenSecretManagerRoot, true);
|
|
|
|
|
|
+ byte[] data = getDataWithRetries(amrmTokenSecretManagerRoot, false);
|
|
if (data == null) {
|
|
if (data == null) {
|
|
LOG.warn("There is no data saved");
|
|
LOG.warn("There is no data saved");
|
|
return;
|
|
return;
|
|
@@ -470,10 +470,10 @@ public class ZKRMStateStore extends RMStateStore {
|
|
|
|
|
|
private void loadRMDelegationKeyState(RMState rmState) throws Exception {
|
|
private void loadRMDelegationKeyState(RMState rmState) throws Exception {
|
|
List<String> childNodes =
|
|
List<String> childNodes =
|
|
- getChildrenWithRetries(dtMasterKeysRootPath, true);
|
|
|
|
|
|
+ getChildrenWithRetries(dtMasterKeysRootPath, false);
|
|
for (String childNodeName : childNodes) {
|
|
for (String childNodeName : childNodes) {
|
|
String childNodePath = getNodePath(dtMasterKeysRootPath, childNodeName);
|
|
String childNodePath = getNodePath(dtMasterKeysRootPath, childNodeName);
|
|
- byte[] childData = getDataWithRetries(childNodePath, true);
|
|
|
|
|
|
+ byte[] childData = getDataWithRetries(childNodePath, false);
|
|
|
|
|
|
if (childData == null) {
|
|
if (childData == null) {
|
|
LOG.warn("Content of " + childNodePath + " is broken.");
|
|
LOG.warn("Content of " + childNodePath + " is broken.");
|
|
@@ -515,11 +515,11 @@ public class ZKRMStateStore extends RMStateStore {
|
|
|
|
|
|
private void loadRMDelegationTokenState(RMState rmState) throws Exception {
|
|
private void loadRMDelegationTokenState(RMState rmState) throws Exception {
|
|
List<String> childNodes =
|
|
List<String> childNodes =
|
|
- getChildrenWithRetries(delegationTokensRootPath, true);
|
|
|
|
|
|
+ getChildrenWithRetries(delegationTokensRootPath, false);
|
|
for (String childNodeName : childNodes) {
|
|
for (String childNodeName : childNodes) {
|
|
String childNodePath =
|
|
String childNodePath =
|
|
getNodePath(delegationTokensRootPath, childNodeName);
|
|
getNodePath(delegationTokensRootPath, childNodeName);
|
|
- byte[] childData = getDataWithRetries(childNodePath, true);
|
|
|
|
|
|
+ byte[] childData = getDataWithRetries(childNodePath, false);
|
|
|
|
|
|
if (childData == null) {
|
|
if (childData == null) {
|
|
LOG.warn("Content of " + childNodePath + " is broken.");
|
|
LOG.warn("Content of " + childNodePath + " is broken.");
|
|
@@ -551,10 +551,10 @@ public class ZKRMStateStore extends RMStateStore {
|
|
}
|
|
}
|
|
|
|
|
|
private synchronized void loadRMAppState(RMState rmState) throws Exception {
|
|
private synchronized void loadRMAppState(RMState rmState) throws Exception {
|
|
- List<String> childNodes = getChildrenWithRetries(rmAppRoot, true);
|
|
|
|
|
|
+ List<String> childNodes = getChildrenWithRetries(rmAppRoot, false);
|
|
for (String childNodeName : childNodes) {
|
|
for (String childNodeName : childNodes) {
|
|
String childNodePath = getNodePath(rmAppRoot, childNodeName);
|
|
String childNodePath = getNodePath(rmAppRoot, childNodeName);
|
|
- byte[] childData = getDataWithRetries(childNodePath, true);
|
|
|
|
|
|
+ byte[] childData = getDataWithRetries(childNodePath, false);
|
|
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
|
|
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
|
|
// application
|
|
// application
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
@@ -585,7 +585,7 @@ public class ZKRMStateStore extends RMStateStore {
|
|
for (String attemptIDStr : attempts) {
|
|
for (String attemptIDStr : attempts) {
|
|
if (attemptIDStr.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
|
|
if (attemptIDStr.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
|
|
String attemptPath = getNodePath(appPath, attemptIDStr);
|
|
String attemptPath = getNodePath(appPath, attemptIDStr);
|
|
- byte[] attemptData = getDataWithRetries(attemptPath, true);
|
|
|
|
|
|
+ byte[] attemptData = getDataWithRetries(attemptPath, false);
|
|
|
|
|
|
ApplicationAttemptStateDataPBImpl attemptState =
|
|
ApplicationAttemptStateDataPBImpl attemptState =
|
|
new ApplicationAttemptStateDataPBImpl(
|
|
new ApplicationAttemptStateDataPBImpl(
|
|
@@ -622,7 +622,7 @@ public class ZKRMStateStore extends RMStateStore {
|
|
}
|
|
}
|
|
byte[] appStateData = appStateDataPB.getProto().toByteArray();
|
|
byte[] appStateData = appStateDataPB.getProto().toByteArray();
|
|
|
|
|
|
- if (existsWithRetries(nodeUpdatePath, true) != null) {
|
|
|
|
|
|
+ if (existsWithRetries(nodeUpdatePath, false) != null) {
|
|
setDataWithRetries(nodeUpdatePath, appStateData, -1);
|
|
setDataWithRetries(nodeUpdatePath, appStateData, -1);
|
|
} else {
|
|
} else {
|
|
createWithRetries(nodeUpdatePath, appStateData, zkAcl,
|
|
createWithRetries(nodeUpdatePath, appStateData, zkAcl,
|
|
@@ -665,7 +665,7 @@ public class ZKRMStateStore extends RMStateStore {
|
|
}
|
|
}
|
|
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
|
|
byte[] attemptStateData = attemptStateDataPB.getProto().toByteArray();
|
|
|
|
|
|
- if (existsWithRetries(nodeUpdatePath, true) != null) {
|
|
|
|
|
|
+ if (existsWithRetries(nodeUpdatePath, false) != null) {
|
|
setDataWithRetries(nodeUpdatePath, attemptStateData, -1);
|
|
setDataWithRetries(nodeUpdatePath, attemptStateData, -1);
|
|
} else {
|
|
} else {
|
|
createWithRetries(nodeUpdatePath, attemptStateData, zkAcl,
|
|
createWithRetries(nodeUpdatePath, attemptStateData, zkAcl,
|
|
@@ -717,7 +717,7 @@ public class ZKRMStateStore extends RMStateStore {
|
|
LOG.debug("Removing RMDelegationToken_"
|
|
LOG.debug("Removing RMDelegationToken_"
|
|
+ rmDTIdentifier.getSequenceNumber());
|
|
+ rmDTIdentifier.getSequenceNumber());
|
|
}
|
|
}
|
|
- if (existsWithRetries(nodeRemovePath, true) != null) {
|
|
|
|
|
|
+ if (existsWithRetries(nodeRemovePath, false) != null) {
|
|
opList.add(Op.delete(nodeRemovePath, -1));
|
|
opList.add(Op.delete(nodeRemovePath, -1));
|
|
} else {
|
|
} else {
|
|
LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
|
|
LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
|
|
@@ -733,7 +733,7 @@ public class ZKRMStateStore extends RMStateStore {
|
|
String nodeRemovePath =
|
|
String nodeRemovePath =
|
|
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
|
|
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
|
|
+ rmDTIdentifier.getSequenceNumber());
|
|
+ rmDTIdentifier.getSequenceNumber());
|
|
- if (existsWithRetries(nodeRemovePath, true) == null) {
|
|
|
|
|
|
+ if (existsWithRetries(nodeRemovePath, false) == null) {
|
|
// in case znode doesn't exist
|
|
// in case znode doesn't exist
|
|
addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, false);
|
|
addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, false);
|
|
LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
|
|
LOG.debug("Attempted to update a non-existing znode " + nodeRemovePath);
|
|
@@ -809,7 +809,7 @@ public class ZKRMStateStore extends RMStateStore {
|
|
if (LOG.isDebugEnabled()) {
|
|
if (LOG.isDebugEnabled()) {
|
|
LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
|
|
LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
|
|
}
|
|
}
|
|
- if (existsWithRetries(nodeRemovePath, true) != null) {
|
|
|
|
|
|
+ if (existsWithRetries(nodeRemovePath, false) != null) {
|
|
doMultiWithRetries(Op.delete(nodeRemovePath, -1));
|
|
doMultiWithRetries(Op.delete(nodeRemovePath, -1));
|
|
} else {
|
|
} else {
|
|
LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
|
|
LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
|
|
@@ -818,8 +818,8 @@ public class ZKRMStateStore extends RMStateStore {
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public synchronized void deleteStore() throws Exception {
|
|
public synchronized void deleteStore() throws Exception {
|
|
- if (existsWithRetries(zkRootNodePath, true) != null) {
|
|
|
|
- deleteWithRetries(zkRootNodePath, true);
|
|
|
|
|
|
+ if (existsWithRetries(zkRootNodePath, false) != null) {
|
|
|
|
+ deleteWithRetries(zkRootNodePath, false);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|