|
@@ -45,6 +45,7 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
|
|
|
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
|
|
|
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
|
|
|
import org.apache.curator.framework.recipes.shared.SharedCount;
|
|
|
+import org.apache.curator.framework.recipes.shared.VersionedValue;
|
|
|
import org.apache.curator.retry.RetryNTimes;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
@@ -58,7 +59,6 @@ import org.apache.zookeeper.ZooDefs.Perms;
|
|
|
import org.apache.zookeeper.client.ZooKeeperSaslClient;
|
|
|
import org.apache.zookeeper.data.ACL;
|
|
|
import org.apache.zookeeper.data.Id;
|
|
|
-import org.apache.zookeeper.data.Stat;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
@@ -109,10 +109,10 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
|
|
|
"ZKDelegationTokenSecretManagerClient";
|
|
|
|
|
|
private static final String ZK_DTSM_NAMESPACE = "ZKDTSMRoot";
|
|
|
- private static final String ZK_DTSM_SEQNUM_ROOT = "ZKDTSMSeqNumRoot";
|
|
|
- private static final String ZK_DTSM_KEYID_ROOT = "ZKDTSMKeyIdRoot";
|
|
|
- private static final String ZK_DTSM_TOKENS_ROOT = "ZKDTSMTokensRoot";
|
|
|
- private static final String ZK_DTSM_MASTER_KEY_ROOT = "ZKDTSMMasterKeyRoot";
|
|
|
+ private static final String ZK_DTSM_SEQNUM_ROOT = "/ZKDTSMSeqNumRoot";
|
|
|
+ private static final String ZK_DTSM_KEYID_ROOT = "/ZKDTSMKeyIdRoot";
|
|
|
+ private static final String ZK_DTSM_TOKENS_ROOT = "/ZKDTSMTokensRoot";
|
|
|
+ private static final String ZK_DTSM_MASTER_KEY_ROOT = "/ZKDTSMMasterKeyRoot";
|
|
|
|
|
|
private static final String DELEGATION_KEY_PREFIX = "DK_";
|
|
|
private static final String DELEGATION_TOKEN_PREFIX = "DT_";
|
|
@@ -505,11 +505,20 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
|
|
|
return delTokSeqCounter.getCount();
|
|
|
}
|
|
|
|
|
|
+ private void incrSharedCount(SharedCount sharedCount) throws Exception {
|
|
|
+ while (true) {
|
|
|
+ // Loop until we successfully increment the counter
|
|
|
+ VersionedValue<Integer> versionedValue = sharedCount.getVersionedValue();
|
|
|
+ if (sharedCount.trySetCount(versionedValue, versionedValue.getValue() + 1)) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
protected int incrementDelegationTokenSeqNum() {
|
|
|
try {
|
|
|
- while (!delTokSeqCounter.trySetCount(delTokSeqCounter.getCount() + 1)) {
|
|
|
- }
|
|
|
+ incrSharedCount(delTokSeqCounter);
|
|
|
} catch (InterruptedException e) {
|
|
|
// The ExpirationThread is just finishing.. so dont do anything..
|
|
|
LOG.debug("Thread interrupted while performing token counter increment", e);
|
|
@@ -537,8 +546,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
|
|
|
@Override
|
|
|
protected int incrementCurrentKeyId() {
|
|
|
try {
|
|
|
- while (!keyIdSeqCounter.trySetCount(keyIdSeqCounter.getCount() + 1)) {
|
|
|
- }
|
|
|
+ incrSharedCount(keyIdSeqCounter);
|
|
|
} catch (InterruptedException e) {
|
|
|
// The ExpirationThread is just finishing.. so dont do anything..
|
|
|
LOG.debug("Thread interrupted while performing keyId increment", e);
|