|
@@ -55,6 +55,7 @@ import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.security.SecurityUtil;
|
|
import org.apache.hadoop.security.SecurityUtil;
|
|
import org.apache.hadoop.security.token.Token;
|
|
import org.apache.hadoop.security.token.Token;
|
|
import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
|
|
import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
|
|
|
|
+import static org.apache.hadoop.util.Time.now;
|
|
import org.apache.zookeeper.CreateMode;
|
|
import org.apache.zookeeper.CreateMode;
|
|
import org.apache.zookeeper.KeeperException;
|
|
import org.apache.zookeeper.KeeperException;
|
|
import org.apache.zookeeper.KeeperException.NoNodeException;
|
|
import org.apache.zookeeper.KeeperException.NoNodeException;
|
|
@@ -79,7 +80,7 @@ import com.google.common.base.Preconditions;
|
|
public abstract class ZKDelegationTokenSecretManager<TokenIdent extends AbstractDelegationTokenIdentifier>
|
|
public abstract class ZKDelegationTokenSecretManager<TokenIdent extends AbstractDelegationTokenIdentifier>
|
|
extends AbstractDelegationTokenSecretManager<TokenIdent> {
|
|
extends AbstractDelegationTokenSecretManager<TokenIdent> {
|
|
|
|
|
|
- private static final String ZK_CONF_PREFIX = "zk-dt-secret-manager.";
|
|
|
|
|
|
+ public static final String ZK_CONF_PREFIX = "zk-dt-secret-manager.";
|
|
public static final String ZK_DTSM_ZK_NUM_RETRIES = ZK_CONF_PREFIX
|
|
public static final String ZK_DTSM_ZK_NUM_RETRIES = ZK_CONF_PREFIX
|
|
+ "zkNumRetries";
|
|
+ "zkNumRetries";
|
|
public static final String ZK_DTSM_ZK_SESSION_TIMEOUT = ZK_CONF_PREFIX
|
|
public static final String ZK_DTSM_ZK_SESSION_TIMEOUT = ZK_CONF_PREFIX
|
|
@@ -100,6 +101,9 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
|
|
+ "kerberos.principal";
|
|
+ "kerberos.principal";
|
|
public static final String ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE = ZK_CONF_PREFIX
|
|
public static final String ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE = ZK_CONF_PREFIX
|
|
+ "token.seqnum.batch.size";
|
|
+ "token.seqnum.batch.size";
|
|
|
|
+ public static final String ZK_DTSM_TOKEN_WATCHER_ENABLED = ZK_CONF_PREFIX
|
|
|
|
+ + "token.watcher.enabled";
|
|
|
|
+ public static final boolean ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT = true;
|
|
|
|
|
|
public static final int ZK_DTSM_ZK_NUM_RETRIES_DEFAULT = 3;
|
|
public static final int ZK_DTSM_ZK_NUM_RETRIES_DEFAULT = 3;
|
|
public static final int ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT = 10000;
|
|
public static final int ZK_DTSM_ZK_SESSION_TIMEOUT_DEFAULT = 10000;
|
|
@@ -118,7 +122,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
|
|
private static final String ZK_DTSM_NAMESPACE = "ZKDTSMRoot";
|
|
private static final String ZK_DTSM_NAMESPACE = "ZKDTSMRoot";
|
|
private static final String ZK_DTSM_SEQNUM_ROOT = "/ZKDTSMSeqNumRoot";
|
|
private static final String ZK_DTSM_SEQNUM_ROOT = "/ZKDTSMSeqNumRoot";
|
|
private static final String ZK_DTSM_KEYID_ROOT = "/ZKDTSMKeyIdRoot";
|
|
private static final String ZK_DTSM_KEYID_ROOT = "/ZKDTSMKeyIdRoot";
|
|
- private static final String ZK_DTSM_TOKENS_ROOT = "/ZKDTSMTokensRoot";
|
|
|
|
|
|
+ protected static final String ZK_DTSM_TOKENS_ROOT = "/ZKDTSMTokensRoot";
|
|
private static final String ZK_DTSM_MASTER_KEY_ROOT = "/ZKDTSMMasterKeyRoot";
|
|
private static final String ZK_DTSM_MASTER_KEY_ROOT = "/ZKDTSMMasterKeyRoot";
|
|
|
|
|
|
private static final String DELEGATION_KEY_PREFIX = "DK_";
|
|
private static final String DELEGATION_KEY_PREFIX = "DK_";
|
|
@@ -132,7 +136,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
|
|
}
|
|
}
|
|
|
|
|
|
private final boolean isExternalClient;
|
|
private final boolean isExternalClient;
|
|
- private final CuratorFramework zkClient;
|
|
|
|
|
|
+ protected final CuratorFramework zkClient;
|
|
private SharedCount delTokSeqCounter;
|
|
private SharedCount delTokSeqCounter;
|
|
private SharedCount keyIdSeqCounter;
|
|
private SharedCount keyIdSeqCounter;
|
|
private PathChildrenCache keyCache;
|
|
private PathChildrenCache keyCache;
|
|
@@ -143,6 +147,8 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
|
|
private int currentSeqNum;
|
|
private int currentSeqNum;
|
|
private int currentMaxSeqNum;
|
|
private int currentMaxSeqNum;
|
|
|
|
|
|
|
|
+ private final boolean isTokenWatcherEnabled;
|
|
|
|
+
|
|
public ZKDelegationTokenSecretManager(Configuration conf) {
|
|
public ZKDelegationTokenSecretManager(Configuration conf) {
|
|
super(conf.getLong(DelegationTokenManager.UPDATE_INTERVAL,
|
|
super(conf.getLong(DelegationTokenManager.UPDATE_INTERVAL,
|
|
DelegationTokenManager.UPDATE_INTERVAL_DEFAULT) * 1000,
|
|
DelegationTokenManager.UPDATE_INTERVAL_DEFAULT) * 1000,
|
|
@@ -156,6 +162,8 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
|
|
ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT);
|
|
ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT);
|
|
seqNumBatchSize = conf.getInt(ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE,
|
|
seqNumBatchSize = conf.getInt(ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE,
|
|
ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT);
|
|
ZK_DTSM_TOKEN_SEQNUM_BATCH_SIZE_DEFAULT);
|
|
|
|
+ isTokenWatcherEnabled = conf.getBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED,
|
|
|
|
+ ZK_DTSM_TOKEN_WATCHER_ENABLED_DEFAULT);
|
|
if (CURATOR_TL.get() != null) {
|
|
if (CURATOR_TL.get() != null) {
|
|
zkClient =
|
|
zkClient =
|
|
CURATOR_TL.get().usingNamespace(
|
|
CURATOR_TL.get().usingNamespace(
|
|
@@ -383,34 +391,37 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
throw new IOException("Could not start PathChildrenCache for keys", e);
|
|
throw new IOException("Could not start PathChildrenCache for keys", e);
|
|
}
|
|
}
|
|
- try {
|
|
|
|
- tokenCache = new PathChildrenCache(zkClient, ZK_DTSM_TOKENS_ROOT, true);
|
|
|
|
- if (tokenCache != null) {
|
|
|
|
- tokenCache.start(StartMode.BUILD_INITIAL_CACHE);
|
|
|
|
- tokenCache.getListenable().addListener(new PathChildrenCacheListener() {
|
|
|
|
-
|
|
|
|
- @Override
|
|
|
|
- public void childEvent(CuratorFramework client,
|
|
|
|
- PathChildrenCacheEvent event) throws Exception {
|
|
|
|
- switch (event.getType()) {
|
|
|
|
- case CHILD_ADDED:
|
|
|
|
- processTokenAddOrUpdate(event.getData());
|
|
|
|
- break;
|
|
|
|
- case CHILD_UPDATED:
|
|
|
|
- processTokenAddOrUpdate(event.getData());
|
|
|
|
- break;
|
|
|
|
- case CHILD_REMOVED:
|
|
|
|
- processTokenRemoved(event.getData());
|
|
|
|
- break;
|
|
|
|
- default:
|
|
|
|
- break;
|
|
|
|
|
|
+ if (isTokenWatcherEnabled) {
|
|
|
|
+ LOG.info("TokenCache is enabled");
|
|
|
|
+ try {
|
|
|
|
+ tokenCache = new PathChildrenCache(zkClient, ZK_DTSM_TOKENS_ROOT, true);
|
|
|
|
+ if (tokenCache != null) {
|
|
|
|
+ tokenCache.start(StartMode.BUILD_INITIAL_CACHE);
|
|
|
|
+ tokenCache.getListenable().addListener(new PathChildrenCacheListener() {
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void childEvent(CuratorFramework client,
|
|
|
|
+ PathChildrenCacheEvent event) throws Exception {
|
|
|
|
+ switch (event.getType()) {
|
|
|
|
+ case CHILD_ADDED:
|
|
|
|
+ processTokenAddOrUpdate(event.getData().getData());
|
|
|
|
+ break;
|
|
|
|
+ case CHILD_UPDATED:
|
|
|
|
+ processTokenAddOrUpdate(event.getData().getData());
|
|
|
|
+ break;
|
|
|
|
+ case CHILD_REMOVED:
|
|
|
|
+ processTokenRemoved(event.getData());
|
|
|
|
+ break;
|
|
|
|
+ default:
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- }
|
|
|
|
- }, listenerThreadPool);
|
|
|
|
- loadFromZKCache(true);
|
|
|
|
|
|
+ }, listenerThreadPool);
|
|
|
|
+ loadFromZKCache(true);
|
|
|
|
+ }
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ throw new IOException("Could not start PathChildrenCache for tokens", e);
|
|
}
|
|
}
|
|
- } catch (Exception e) {
|
|
|
|
- throw new IOException("Could not start PathChildrenCache for tokens", e);
|
|
|
|
}
|
|
}
|
|
super.startThreads();
|
|
super.startThreads();
|
|
}
|
|
}
|
|
@@ -435,7 +446,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
|
|
for (ChildData child : children) {
|
|
for (ChildData child : children) {
|
|
try {
|
|
try {
|
|
if (isTokenCache) {
|
|
if (isTokenCache) {
|
|
- processTokenAddOrUpdate(child);
|
|
|
|
|
|
+ processTokenAddOrUpdate(child.getData());
|
|
} else {
|
|
} else {
|
|
processKeyAddOrUpdate(child.getData());
|
|
processKeyAddOrUpdate(child.getData());
|
|
}
|
|
}
|
|
@@ -457,9 +468,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
|
|
DataInputStream din = new DataInputStream(bin);
|
|
DataInputStream din = new DataInputStream(bin);
|
|
DelegationKey key = new DelegationKey();
|
|
DelegationKey key = new DelegationKey();
|
|
key.readFields(din);
|
|
key.readFields(din);
|
|
- synchronized (this) {
|
|
|
|
- allKeys.put(key.getKeyId(), key);
|
|
|
|
- }
|
|
|
|
|
|
+ allKeys.put(key.getKeyId(), key);
|
|
}
|
|
}
|
|
|
|
|
|
private void processKeyRemoved(String path) {
|
|
private void processKeyRemoved(String path) {
|
|
@@ -469,15 +478,13 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
|
|
int j = tokSeg.indexOf('_');
|
|
int j = tokSeg.indexOf('_');
|
|
if (j > 0) {
|
|
if (j > 0) {
|
|
int keyId = Integer.parseInt(tokSeg.substring(j + 1));
|
|
int keyId = Integer.parseInt(tokSeg.substring(j + 1));
|
|
- synchronized (this) {
|
|
|
|
- allKeys.remove(keyId);
|
|
|
|
- }
|
|
|
|
|
|
+ allKeys.remove(keyId);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private void processTokenAddOrUpdate(ChildData data) throws IOException {
|
|
|
|
- ByteArrayInputStream bin = new ByteArrayInputStream(data.getData());
|
|
|
|
|
|
+ protected TokenIdent processTokenAddOrUpdate(byte[] data) throws IOException {
|
|
|
|
+ ByteArrayInputStream bin = new ByteArrayInputStream(data);
|
|
DataInputStream din = new DataInputStream(bin);
|
|
DataInputStream din = new DataInputStream(bin);
|
|
TokenIdent ident = createIdentifier();
|
|
TokenIdent ident = createIdentifier();
|
|
ident.readFields(din);
|
|
ident.readFields(din);
|
|
@@ -488,12 +495,10 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
|
|
if (numRead > -1) {
|
|
if (numRead > -1) {
|
|
DelegationTokenInformation tokenInfo =
|
|
DelegationTokenInformation tokenInfo =
|
|
new DelegationTokenInformation(renewDate, password);
|
|
new DelegationTokenInformation(renewDate, password);
|
|
- synchronized (this) {
|
|
|
|
- currentTokens.put(ident, tokenInfo);
|
|
|
|
- // The cancel task might be waiting
|
|
|
|
- notifyAll();
|
|
|
|
- }
|
|
|
|
|
|
+ currentTokens.put(ident, tokenInfo);
|
|
|
|
+ return ident;
|
|
}
|
|
}
|
|
|
|
+ return null;
|
|
}
|
|
}
|
|
|
|
|
|
private void processTokenRemoved(ChildData data) throws IOException {
|
|
private void processTokenRemoved(ChildData data) throws IOException {
|
|
@@ -501,11 +506,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
|
|
DataInputStream din = new DataInputStream(bin);
|
|
DataInputStream din = new DataInputStream(bin);
|
|
TokenIdent ident = createIdentifier();
|
|
TokenIdent ident = createIdentifier();
|
|
ident.readFields(din);
|
|
ident.readFields(din);
|
|
- synchronized (this) {
|
|
|
|
- currentTokens.remove(ident);
|
|
|
|
- // The cancel task might be waiting
|
|
|
|
- notifyAll();
|
|
|
|
- }
|
|
|
|
|
|
+ currentTokens.remove(ident);
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
@@ -706,7 +707,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
|
|
*
|
|
*
|
|
* @param ident Identifier of the token
|
|
* @param ident Identifier of the token
|
|
*/
|
|
*/
|
|
- private synchronized void syncLocalCacheWithZk(TokenIdent ident) {
|
|
|
|
|
|
+ protected void syncLocalCacheWithZk(TokenIdent ident) {
|
|
try {
|
|
try {
|
|
DelegationTokenInformation tokenInfo = getTokenInfoFromZK(ident);
|
|
DelegationTokenInformation tokenInfo = getTokenInfoFromZK(ident);
|
|
if (tokenInfo != null && !currentTokens.containsKey(ident)) {
|
|
if (tokenInfo != null && !currentTokens.containsKey(ident)) {
|
|
@@ -720,16 +721,21 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident)
|
|
|
|
|
|
+ protected DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident)
|
|
throws IOException {
|
|
throws IOException {
|
|
return getTokenInfoFromZK(ident, false);
|
|
return getTokenInfoFromZK(ident, false);
|
|
}
|
|
}
|
|
|
|
|
|
- private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident,
|
|
|
|
|
|
+ protected DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident,
|
|
boolean quiet) throws IOException {
|
|
boolean quiet) throws IOException {
|
|
String nodePath =
|
|
String nodePath =
|
|
getNodePath(ZK_DTSM_TOKENS_ROOT,
|
|
getNodePath(ZK_DTSM_TOKENS_ROOT,
|
|
DELEGATION_TOKEN_PREFIX + ident.getSequenceNumber());
|
|
DELEGATION_TOKEN_PREFIX + ident.getSequenceNumber());
|
|
|
|
+ return getTokenInfoFromZK(nodePath, quiet);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ protected DelegationTokenInformation getTokenInfoFromZK(String nodePath,
|
|
|
|
+ boolean quiet) throws IOException {
|
|
try {
|
|
try {
|
|
byte[] data = zkClient.getData().forPath(nodePath);
|
|
byte[] data = zkClient.getData().forPath(nodePath);
|
|
if ((data == null) || (data.length == 0)) {
|
|
if ((data == null) || (data.length == 0)) {
|
|
@@ -864,15 +870,30 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
|
|
@Override
|
|
@Override
|
|
protected void removeStoredToken(TokenIdent ident)
|
|
protected void removeStoredToken(TokenIdent ident)
|
|
throws IOException {
|
|
throws IOException {
|
|
|
|
+ removeStoredToken(ident, false);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ protected void removeStoredToken(TokenIdent ident,
|
|
|
|
+ boolean checkAgainstZkBeforeDeletion) throws IOException {
|
|
String nodeRemovePath =
|
|
String nodeRemovePath =
|
|
getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX
|
|
getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX
|
|
+ ident.getSequenceNumber());
|
|
+ ident.getSequenceNumber());
|
|
- if (LOG.isDebugEnabled()) {
|
|
|
|
- LOG.debug("Removing ZKDTSMDelegationToken_"
|
|
|
|
- + ident.getSequenceNumber());
|
|
|
|
- }
|
|
|
|
try {
|
|
try {
|
|
- if (zkClient.checkExists().forPath(nodeRemovePath) != null) {
|
|
|
|
|
|
+ DelegationTokenInformation dtInfo = getTokenInfoFromZK(ident, true);
|
|
|
|
+ if (dtInfo != null) {
|
|
|
|
+ // For the case there is no sync or watch miss, it is possible that the
|
|
|
|
+ // local storage has expired tokens which have been renewed by peer
|
|
|
|
+ // so double check again to avoid accidental delete
|
|
|
|
+ if (checkAgainstZkBeforeDeletion
|
|
|
|
+ && dtInfo.getRenewDate() > now()) {
|
|
|
|
+ LOG.info("Node already renewed by peer " + nodeRemovePath +
|
|
|
|
+ " so this token should not be deleted");
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debug("Removing ZKDTSMDelegationToken_"
|
|
|
|
+ + ident.getSequenceNumber());
|
|
|
|
+ }
|
|
while(zkClient.checkExists().forPath(nodeRemovePath) != null){
|
|
while(zkClient.checkExists().forPath(nodeRemovePath) != null){
|
|
try {
|
|
try {
|
|
zkClient.delete().guaranteed().forPath(nodeRemovePath);
|
|
zkClient.delete().guaranteed().forPath(nodeRemovePath);
|
|
@@ -895,7 +916,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|
|
- public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
|
|
|
|
|
|
+ public TokenIdent cancelToken(Token<TokenIdent> token,
|
|
String canceller) throws IOException {
|
|
String canceller) throws IOException {
|
|
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
|
|
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
|
|
DataInputStream in = new DataInputStream(buf);
|
|
DataInputStream in = new DataInputStream(buf);
|
|
@@ -906,7 +927,7 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
|
|
return super.cancelToken(token, canceller);
|
|
return super.cancelToken(token, canceller);
|
|
}
|
|
}
|
|
|
|
|
|
- private void addOrUpdateToken(TokenIdent ident,
|
|
|
|
|
|
+ protected void addOrUpdateToken(TokenIdent ident,
|
|
DelegationTokenInformation info, boolean isUpdate) throws Exception {
|
|
DelegationTokenInformation info, boolean isUpdate) throws Exception {
|
|
String nodeCreatePath =
|
|
String nodeCreatePath =
|
|
getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX
|
|
getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX
|
|
@@ -933,6 +954,10 @@ public abstract class ZKDelegationTokenSecretManager<TokenIdent extends Abstract
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ public boolean isTokenWatcherEnabled() {
|
|
|
|
+ return isTokenWatcherEnabled;
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Simple implementation of an {@link ACLProvider} that simply returns an ACL
|
|
* Simple implementation of an {@link ACLProvider} that simply returns an ACL
|
|
* that gives all permissions only to a single principal.
|
|
* that gives all permissions only to a single principal.
|