|
@@ -694,7 +694,7 @@ public class ZKRMStateStore extends RMStateStore {
|
|
LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath
|
|
LOG.debug("Removing info for app: " + appId + " at: " + appIdRemovePath
|
|
+ " and its attempts.");
|
|
+ " and its attempts.");
|
|
}
|
|
}
|
|
- doMultiWithRetries(opList);
|
|
|
|
|
|
+ doDeleteMultiWithRetries(opList);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -703,13 +703,12 @@ public class ZKRMStateStore extends RMStateStore {
|
|
throws Exception {
|
|
throws Exception {
|
|
ArrayList<Op> opList = new ArrayList<Op>();
|
|
ArrayList<Op> opList = new ArrayList<Op>();
|
|
addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, false);
|
|
addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, false);
|
|
- doMultiWithRetries(opList);
|
|
|
|
|
|
+ doStoreMultiWithRetries(opList);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
protected synchronized void removeRMDelegationTokenState(
|
|
protected synchronized void removeRMDelegationTokenState(
|
|
RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
|
|
RMDelegationTokenIdentifier rmDTIdentifier) throws Exception {
|
|
- ArrayList<Op> opList = new ArrayList<Op>();
|
|
|
|
String nodeRemovePath =
|
|
String nodeRemovePath =
|
|
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
|
|
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
|
|
+ rmDTIdentifier.getSequenceNumber());
|
|
+ rmDTIdentifier.getSequenceNumber());
|
|
@@ -718,11 +717,12 @@ public class ZKRMStateStore extends RMStateStore {
|
|
+ rmDTIdentifier.getSequenceNumber());
|
|
+ rmDTIdentifier.getSequenceNumber());
|
|
}
|
|
}
|
|
if (existsWithRetries(nodeRemovePath, false) != null) {
|
|
if (existsWithRetries(nodeRemovePath, false) != null) {
|
|
|
|
+ ArrayList<Op> opList = new ArrayList<Op>();
|
|
opList.add(Op.delete(nodeRemovePath, -1));
|
|
opList.add(Op.delete(nodeRemovePath, -1));
|
|
|
|
+ doDeleteMultiWithRetries(opList);
|
|
} else {
|
|
} else {
|
|
LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
|
|
LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
|
|
}
|
|
}
|
|
- doMultiWithRetries(opList);
|
|
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -741,7 +741,7 @@ public class ZKRMStateStore extends RMStateStore {
|
|
// in case znode exists
|
|
// in case znode exists
|
|
addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, true);
|
|
addStoreOrUpdateOps(opList, rmDTIdentifier, renewDate, true);
|
|
}
|
|
}
|
|
- doMultiWithRetries(opList);
|
|
|
|
|
|
+ doStoreMultiWithRetries(opList);
|
|
}
|
|
}
|
|
|
|
|
|
private void addStoreOrUpdateOps(ArrayList<Op> opList,
|
|
private void addStoreOrUpdateOps(ArrayList<Op> opList,
|
|
@@ -810,7 +810,7 @@ public class ZKRMStateStore extends RMStateStore {
|
|
LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
|
|
LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
|
|
}
|
|
}
|
|
if (existsWithRetries(nodeRemovePath, false) != null) {
|
|
if (existsWithRetries(nodeRemovePath, false) != null) {
|
|
- doMultiWithRetries(Op.delete(nodeRemovePath, -1));
|
|
|
|
|
|
+ doDeleteMultiWithRetries(Op.delete(nodeRemovePath, -1));
|
|
} else {
|
|
} else {
|
|
LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
|
|
LOG.debug("Attempted to delete a non-existing znode " + nodeRemovePath);
|
|
}
|
|
}
|
|
@@ -905,7 +905,7 @@ public class ZKRMStateStore extends RMStateStore {
|
|
* Helper method that creates fencing node, executes the passed operations,
|
|
* Helper method that creates fencing node, executes the passed operations,
|
|
* and deletes the fencing node.
|
|
* and deletes the fencing node.
|
|
*/
|
|
*/
|
|
- private synchronized void doMultiWithRetries(
|
|
|
|
|
|
+ private synchronized void doStoreMultiWithRetries(
|
|
final List<Op> opList) throws Exception {
|
|
final List<Op> opList) throws Exception {
|
|
final List<Op> execOpList = new ArrayList<Op>(opList.size() + 2);
|
|
final List<Op> execOpList = new ArrayList<Op>(opList.size() + 2);
|
|
execOpList.add(createFencingNodePathOp);
|
|
execOpList.add(createFencingNodePathOp);
|
|
@@ -924,8 +924,32 @@ public class ZKRMStateStore extends RMStateStore {
|
|
* Helper method that creates fencing node, executes the passed operation,
|
|
* Helper method that creates fencing node, executes the passed operation,
|
|
* and deletes the fencing node.
|
|
* and deletes the fencing node.
|
|
*/
|
|
*/
|
|
- private void doMultiWithRetries(final Op op) throws Exception {
|
|
|
|
- doMultiWithRetries(Collections.singletonList(op));
|
|
|
|
|
|
+ private void doStoreMultiWithRetries(final Op op) throws Exception {
|
|
|
|
+ doStoreMultiWithRetries(Collections.singletonList(op));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * Helper method that creates fencing node, executes the passed
|
|
|
|
+ * delete related operations and deletes the fencing node.
|
|
|
|
+ */
|
|
|
|
+ private synchronized void doDeleteMultiWithRetries(
|
|
|
|
+ final List<Op> opList) throws Exception {
|
|
|
|
+ final List<Op> execOpList = new ArrayList<Op>(opList.size() + 2);
|
|
|
|
+ execOpList.add(createFencingNodePathOp);
|
|
|
|
+ execOpList.addAll(opList);
|
|
|
|
+ execOpList.add(deleteFencingNodePathOp);
|
|
|
|
+ new ZKAction<Void>() {
|
|
|
|
+ @Override
|
|
|
|
+ public Void run() throws KeeperException, InterruptedException {
|
|
|
|
+ setHasDeleteNodeOp(true);
|
|
|
|
+ zkClient.multi(execOpList);
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+ }.runWithRetries();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void doDeleteMultiWithRetries(final Op op) throws Exception {
|
|
|
|
+ doDeleteMultiWithRetries(Collections.singletonList(op));
|
|
}
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
@@ -934,7 +958,7 @@ public class ZKRMStateStore extends RMStateStore {
|
|
public void createWithRetries(
|
|
public void createWithRetries(
|
|
final String path, final byte[] data, final List<ACL> acl,
|
|
final String path, final byte[] data, final List<ACL> acl,
|
|
final CreateMode mode) throws Exception {
|
|
final CreateMode mode) throws Exception {
|
|
- doMultiWithRetries(Op.create(path, data, acl, mode));
|
|
|
|
|
|
+ doStoreMultiWithRetries(Op.create(path, data, acl, mode));
|
|
}
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
@@ -942,7 +966,7 @@ public class ZKRMStateStore extends RMStateStore {
|
|
@Unstable
|
|
@Unstable
|
|
public void setDataWithRetries(final String path, final byte[] data,
|
|
public void setDataWithRetries(final String path, final byte[] data,
|
|
final int version) throws Exception {
|
|
final int version) throws Exception {
|
|
- doMultiWithRetries(Op.setData(path, data, version));
|
|
|
|
|
|
+ doStoreMultiWithRetries(Op.setData(path, data, version));
|
|
}
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
@@ -1008,7 +1032,12 @@ public class ZKRMStateStore extends RMStateStore {
|
|
for (String child : children) {
|
|
for (String child : children) {
|
|
recursiveDeleteWithRetriesHelper(path + "/" + child, false);
|
|
recursiveDeleteWithRetriesHelper(path + "/" + child, false);
|
|
}
|
|
}
|
|
- zkClient.delete(path, -1);
|
|
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ zkClient.delete(path, -1);
|
|
|
|
+ } catch (KeeperException.NoNodeException nne) {
|
|
|
|
+ LOG.info("Node " + path + " doesn't exist to delete");
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -1028,7 +1057,7 @@ public class ZKRMStateStore extends RMStateStore {
|
|
if(isFencedState()) {
|
|
if(isFencedState()) {
|
|
break;
|
|
break;
|
|
}
|
|
}
|
|
- doMultiWithRetries(emptyOpList);
|
|
|
|
|
|
+ doStoreMultiWithRetries(emptyOpList);
|
|
Thread.sleep(zkSessionTimeout);
|
|
Thread.sleep(zkSessionTimeout);
|
|
}
|
|
}
|
|
} catch (InterruptedException ie) {
|
|
} catch (InterruptedException ie) {
|
|
@@ -1041,6 +1070,10 @@ public class ZKRMStateStore extends RMStateStore {
|
|
}
|
|
}
|
|
|
|
|
|
private abstract class ZKAction<T> {
|
|
private abstract class ZKAction<T> {
|
|
|
|
+ private boolean hasDeleteNodeOp = false;
|
|
|
|
+ void setHasDeleteNodeOp(boolean hasDeleteOp) {
|
|
|
|
+ this.hasDeleteNodeOp = hasDeleteOp;
|
|
|
|
+ }
|
|
// run() expects synchronization on ZKRMStateStore.this
|
|
// run() expects synchronization on ZKRMStateStore.this
|
|
abstract T run() throws KeeperException, InterruptedException;
|
|
abstract T run() throws KeeperException, InterruptedException;
|
|
|
|
|
|
@@ -1090,6 +1123,11 @@ public class ZKRMStateStore extends RMStateStore {
|
|
LOG.info("znode already exists!");
|
|
LOG.info("znode already exists!");
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
|
|
+ if (hasDeleteNodeOp && ke.code() == Code.NONODE) {
|
|
|
|
+ LOG.info("znode has already been deleted!");
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+
|
|
LOG.info("Exception while executing a ZK operation.", ke);
|
|
LOG.info("Exception while executing a ZK operation.", ke);
|
|
if (shouldRetry(ke.code()) && ++retry < numRetries) {
|
|
if (shouldRetry(ke.code()) && ++retry < numRetries) {
|
|
LOG.info("Retrying operation on ZK. Retry no. " + retry);
|
|
LOG.info("Retrying operation on ZK. Retry no. " + retry);
|