|
@@ -22,8 +22,6 @@ import com.google.common.annotations.VisibleForTesting;
|
|
|
import org.apache.commons.logging.Log;
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.curator.framework.CuratorFramework;
|
|
|
-import org.apache.curator.framework.api.transaction.CuratorTransaction;
|
|
|
-import org.apache.curator.framework.api.transaction.CuratorTransactionFinal;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
@@ -31,6 +29,7 @@ import org.apache.hadoop.io.IOUtils;
|
|
|
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
|
|
import org.apache.hadoop.util.ZKUtil;
|
|
|
import org.apache.hadoop.util.curator.ZKCuratorManager;
|
|
|
+import org.apache.hadoop.util.curator.ZKCuratorManager.SafeTransaction;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
import org.apache.hadoop.yarn.api.records.ReservationId;
|
|
@@ -416,9 +415,10 @@ public class ZKRMStateStore extends RMStateStore {
|
|
|
((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
|
|
|
|
|
|
if (exists(versionNodePath)) {
|
|
|
- safeSetData(versionNodePath, data, -1);
|
|
|
+ zkManager.safeSetData(versionNodePath, data, -1, zkAcl, fencingNodePath);
|
|
|
} else {
|
|
|
- safeCreate(versionNodePath, data, zkAcl, CreateMode.PERSISTENT);
|
|
|
+ zkManager.safeCreate(versionNodePath, data, zkAcl, CreateMode.PERSISTENT,
|
|
|
+ zkAcl, fencingNodePath);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -447,12 +447,14 @@ public class ZKRMStateStore extends RMStateStore {
|
|
|
// increment epoch and store it
|
|
|
byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
|
|
|
.toByteArray();
|
|
|
- safeSetData(epochNodePath, storeData, -1);
|
|
|
+ zkManager.safeSetData(epochNodePath, storeData, -1, zkAcl,
|
|
|
+ fencingNodePath);
|
|
|
} else {
|
|
|
// initialize epoch node with 1 for the next time.
|
|
|
byte[] storeData = Epoch.newInstance(currentEpoch + 1).getProto()
|
|
|
.toByteArray();
|
|
|
- safeCreate(epochNodePath, storeData, zkAcl, CreateMode.PERSISTENT);
|
|
|
+ zkManager.safeCreate(epochNodePath, storeData, zkAcl,
|
|
|
+ CreateMode.PERSISTENT, zkAcl, fencingNodePath);
|
|
|
}
|
|
|
|
|
|
return currentEpoch;
|
|
@@ -721,7 +723,7 @@ public class ZKRMStateStore extends RMStateStore {
|
|
|
// No apps stored under parent path.
|
|
|
if (children != null && children.isEmpty()) {
|
|
|
try {
|
|
|
- safeDelete(parentAppNode);
|
|
|
+ zkManager.safeDelete(parentAppNode, zkAcl, fencingNodePath);
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("No leaf app node exists. Removing parent node " +
|
|
|
parentAppNode);
|
|
@@ -749,7 +751,8 @@ public class ZKRMStateStore extends RMStateStore {
|
|
|
|
|
|
byte[] appStateData = appStateDataPB.getProto().toByteArray();
|
|
|
if (appStateData.length <= zknodeLimit) {
|
|
|
- safeCreate(nodeCreatePath, appStateData, zkAcl, CreateMode.PERSISTENT);
|
|
|
+ zkManager.safeCreate(nodeCreatePath, appStateData, zkAcl,
|
|
|
+ CreateMode.PERSISTENT, zkAcl, fencingNodePath);
|
|
|
} else {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Application state data size for " + appId + " is "
|
|
@@ -780,7 +783,8 @@ public class ZKRMStateStore extends RMStateStore {
|
|
|
String rootNode =
|
|
|
getSplitAppNodeParent(nodeUpdatePath, appIdNodeSplitIndex);
|
|
|
if (!exists(rootNode)) {
|
|
|
- safeCreate(rootNode, null, zkAcl, CreateMode.PERSISTENT);
|
|
|
+ zkManager.safeCreate(rootNode, null, zkAcl, CreateMode.PERSISTENT,
|
|
|
+ zkAcl, fencingNodePath);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -794,9 +798,11 @@ public class ZKRMStateStore extends RMStateStore {
|
|
|
byte[] appStateData = appStateDataPB.getProto().toByteArray();
|
|
|
|
|
|
if (pathExists) {
|
|
|
- safeSetData(nodeUpdatePath, appStateData, -1);
|
|
|
+ zkManager.safeSetData(nodeUpdatePath, appStateData, -1, zkAcl,
|
|
|
+ fencingNodePath);
|
|
|
} else {
|
|
|
- safeCreate(nodeUpdatePath, appStateData, zkAcl, CreateMode.PERSISTENT);
|
|
|
+ zkManager.safeCreate(nodeUpdatePath, appStateData, zkAcl,
|
|
|
+ CreateMode.PERSISTENT, zkAcl, fencingNodePath);
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Path " + nodeUpdatePath + " for " + appId + " didn't " +
|
|
|
"exist. Creating a new znode to update the application state.");
|
|
@@ -839,9 +845,11 @@ public class ZKRMStateStore extends RMStateStore {
|
|
|
switch (operation) {
|
|
|
case UPDATE:
|
|
|
if (exists(path)) {
|
|
|
- safeSetData(path, attemptStateData, -1);
|
|
|
+ zkManager.safeSetData(path, attemptStateData, -1, zkAcl,
|
|
|
+ fencingNodePath);
|
|
|
} else {
|
|
|
- safeCreate(path, attemptStateData, zkAcl, CreateMode.PERSISTENT);
|
|
|
+ zkManager.safeCreate(path, attemptStateData, zkAcl,
|
|
|
+ CreateMode.PERSISTENT, zkAcl, fencingNodePath);
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Path " + path + " for " + appAttemptId + " didn't exist." +
|
|
|
" Created a new znode to update the application attempt state.");
|
|
@@ -849,10 +857,11 @@ public class ZKRMStateStore extends RMStateStore {
|
|
|
}
|
|
|
break;
|
|
|
case STORE:
|
|
|
- safeCreate(path, attemptStateData, zkAcl, CreateMode.PERSISTENT);
|
|
|
+ zkManager.safeCreate(path, attemptStateData, zkAcl, CreateMode.PERSISTENT,
|
|
|
+ zkAcl, fencingNodePath);
|
|
|
break;
|
|
|
case REMOVE:
|
|
|
- safeDelete(path);
|
|
|
+ zkManager.safeDelete(path, zkAcl, fencingNodePath);
|
|
|
break;
|
|
|
default:
|
|
|
break;
|
|
@@ -930,10 +939,10 @@ public class ZKRMStateStore extends RMStateStore {
|
|
|
for (ApplicationAttemptId attemptId : attempts) {
|
|
|
String attemptRemovePath =
|
|
|
getNodePath(appIdRemovePath, attemptId.toString());
|
|
|
- safeDelete(attemptRemovePath);
|
|
|
+ zkManager.safeDelete(attemptRemovePath, zkAcl, fencingNodePath);
|
|
|
}
|
|
|
}
|
|
|
- safeDelete(appIdRemovePath);
|
|
|
+ zkManager.safeDelete(appIdRemovePath, zkAcl, fencingNodePath);
|
|
|
} else {
|
|
|
CuratorFramework curatorFramework = zkManager.getCurator();
|
|
|
curatorFramework.delete().deletingChildrenIfNeeded().
|
|
@@ -947,7 +956,7 @@ public class ZKRMStateStore extends RMStateStore {
|
|
|
protected synchronized void storeRMDelegationTokenState(
|
|
|
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
|
|
|
throws Exception {
|
|
|
- SafeTransaction trx = new SafeTransaction();
|
|
|
+ SafeTransaction trx = zkManager.createTransaction(zkAcl, fencingNodePath);
|
|
|
addStoreOrUpdateOps(trx, rmDTIdentifier, renewDate, false);
|
|
|
trx.commit();
|
|
|
}
|
|
@@ -964,14 +973,14 @@ public class ZKRMStateStore extends RMStateStore {
|
|
|
+ rmDTIdentifier.getSequenceNumber());
|
|
|
}
|
|
|
|
|
|
- safeDelete(nodeRemovePath);
|
|
|
+ zkManager.safeDelete(nodeRemovePath, zkAcl, fencingNodePath);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
protected synchronized void updateRMDelegationTokenState(
|
|
|
RMDelegationTokenIdentifier rmDTIdentifier, Long renewDate)
|
|
|
throws Exception {
|
|
|
- SafeTransaction trx = new SafeTransaction();
|
|
|
+ SafeTransaction trx = zkManager.createTransaction(zkAcl, fencingNodePath);
|
|
|
String nodeRemovePath =
|
|
|
getNodePath(delegationTokensRootPath, DELEGATION_TOKEN_PREFIX
|
|
|
+ rmDTIdentifier.getSequenceNumber());
|
|
@@ -1035,8 +1044,8 @@ public class ZKRMStateStore extends RMStateStore {
|
|
|
ByteArrayOutputStream os = new ByteArrayOutputStream();
|
|
|
try(DataOutputStream fsOut = new DataOutputStream(os)) {
|
|
|
delegationKey.write(fsOut);
|
|
|
- safeCreate(nodeCreatePath, os.toByteArray(), zkAcl,
|
|
|
- CreateMode.PERSISTENT);
|
|
|
+ zkManager.safeCreate(nodeCreatePath, os.toByteArray(), zkAcl,
|
|
|
+ CreateMode.PERSISTENT, zkAcl, fencingNodePath);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1051,7 +1060,7 @@ public class ZKRMStateStore extends RMStateStore {
|
|
|
LOG.debug("Removing RMDelegationKey_" + delegationKey.getKeyId());
|
|
|
}
|
|
|
|
|
|
- safeDelete(nodeRemovePath);
|
|
|
+ zkManager.safeDelete(nodeRemovePath, zkAcl, fencingNodePath);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -1078,7 +1087,8 @@ public class ZKRMStateStore extends RMStateStore {
|
|
|
AMRMTokenSecretManagerState.newInstance(amrmTokenSecretManagerState);
|
|
|
byte[] stateData = data.getProto().toByteArray();
|
|
|
|
|
|
- safeSetData(amrmTokenSecretManagerRoot, stateData, -1);
|
|
|
+ zkManager.safeSetData(amrmTokenSecretManagerRoot, stateData, -1, zkAcl,
|
|
|
+ fencingNodePath);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -1092,12 +1102,12 @@ public class ZKRMStateStore extends RMStateStore {
|
|
|
+ " for" + " plan " + planName);
|
|
|
}
|
|
|
|
|
|
- safeDelete(reservationPath);
|
|
|
+ zkManager.safeDelete(reservationPath, zkAcl, fencingNodePath);
|
|
|
|
|
|
List<String> reservationNodes = getChildren(planNodePath);
|
|
|
|
|
|
if (reservationNodes.isEmpty()) {
|
|
|
- safeDelete(planNodePath);
|
|
|
+ zkManager.safeDelete(planNodePath, zkAcl, fencingNodePath);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -1105,7 +1115,7 @@ public class ZKRMStateStore extends RMStateStore {
|
|
|
protected synchronized void storeReservationState(
|
|
|
ReservationAllocationStateProto reservationAllocation, String planName,
|
|
|
String reservationIdName) throws Exception {
|
|
|
- SafeTransaction trx = new SafeTransaction();
|
|
|
+ SafeTransaction trx = zkManager.createTransaction(zkAcl, fencingNodePath);
|
|
|
addOrUpdateReservationState(reservationAllocation, planName,
|
|
|
reservationIdName, trx, false);
|
|
|
trx.commit();
|
|
@@ -1191,7 +1201,8 @@ public class ZKRMStateStore extends RMStateStore {
|
|
|
getNodePath(rootNode, nodeName.substring(0, splitIdx));
|
|
|
if (createParentIfNotExists && !exists(rootNodePath)) {
|
|
|
try {
|
|
|
- safeCreate(rootNodePath, null, zkAcl, CreateMode.PERSISTENT);
|
|
|
+ zkManager.safeCreate(rootNodePath, null, zkAcl, CreateMode.PERSISTENT,
|
|
|
+ zkAcl, fencingNodePath);
|
|
|
} catch (KeeperException.NodeExistsException e) {
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
LOG.debug("Unable to create app parent node " + rootNodePath +
|
|
@@ -1248,76 +1259,6 @@ public class ZKRMStateStore extends RMStateStore {
|
|
|
zkManager.delete(path);
|
|
|
}
|
|
|
|
|
|
- private void safeCreate(String path, byte[] data, List<ACL> acl,
|
|
|
- CreateMode mode) throws Exception {
|
|
|
- if (!exists(path)) {
|
|
|
- SafeTransaction transaction = new SafeTransaction();
|
|
|
- transaction.create(path, data, acl, mode);
|
|
|
- transaction.commit();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Deletes the path. Checks for existence of path as well.
|
|
|
- * @param path Path to be deleted.
|
|
|
- * @throws Exception if any problem occurs while performing deletion.
|
|
|
- */
|
|
|
- private void safeDelete(final String path) throws Exception {
|
|
|
- if (exists(path)) {
|
|
|
- SafeTransaction transaction = new SafeTransaction();
|
|
|
- transaction.delete(path);
|
|
|
- transaction.commit();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void safeSetData(String path, byte[] data, int version)
|
|
|
- throws Exception {
|
|
|
- SafeTransaction transaction = new SafeTransaction();
|
|
|
- transaction.setData(path, data, version);
|
|
|
- transaction.commit();
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * Use curator transactions to ensure zk-operations are performed in an all
|
|
|
- * or nothing fashion. This is equivalent to using ZooKeeper#multi.
|
|
|
- *
|
|
|
- * TODO (YARN-3774): Curator 3.0 introduces CuratorOp similar to Op. We ll
|
|
|
- * have to rewrite this inner class when we adopt that.
|
|
|
- */
|
|
|
- private class SafeTransaction {
|
|
|
- private CuratorTransactionFinal transactionFinal;
|
|
|
-
|
|
|
- SafeTransaction() throws Exception {
|
|
|
- CuratorFramework curatorFramework = zkManager.getCurator();
|
|
|
- CuratorTransaction transaction = curatorFramework.inTransaction();
|
|
|
- transactionFinal = transaction.create()
|
|
|
- .withMode(CreateMode.PERSISTENT).withACL(zkAcl)
|
|
|
- .forPath(fencingNodePath, new byte[0]).and();
|
|
|
- }
|
|
|
-
|
|
|
- public void commit() throws Exception {
|
|
|
- transactionFinal = transactionFinal.delete()
|
|
|
- .forPath(fencingNodePath).and();
|
|
|
- transactionFinal.commit();
|
|
|
- }
|
|
|
-
|
|
|
- public void create(String path, byte[] data, List<ACL> acl, CreateMode mode)
|
|
|
- throws Exception {
|
|
|
- transactionFinal = transactionFinal.create()
|
|
|
- .withMode(mode).withACL(acl).forPath(path, data).and();
|
|
|
- }
|
|
|
-
|
|
|
- public void delete(String path) throws Exception {
|
|
|
- transactionFinal = transactionFinal.delete().forPath(path).and();
|
|
|
- }
|
|
|
-
|
|
|
- public void setData(String path, byte[] data, int version)
|
|
|
- throws Exception {
|
|
|
- transactionFinal = transactionFinal.setData()
|
|
|
- .withVersion(version).forPath(path, data).and();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
/**
|
|
|
* Helper class that periodically attempts creating a znode to ensure that
|
|
|
* this RM continues to be the Active.
|
|
@@ -1332,7 +1273,7 @@ public class ZKRMStateStore extends RMStateStore {
|
|
|
try {
|
|
|
while (!isFencedState()) {
|
|
|
// Create and delete fencing node
|
|
|
- new SafeTransaction().commit();
|
|
|
+ zkManager.createTransaction(zkAcl, fencingNodePath).commit();
|
|
|
Thread.sleep(zkSessionTimeout);
|
|
|
}
|
|
|
} catch (InterruptedException ie) {
|