瀏覽代碼

HDFS-17128. Updating SQLDelegationTokenSecretManager to use LoadingCache so tokens are updated frequently. (#5897) Contributed by Hector Sandoval Chaverri.

Reviewed-by: Simbarashe Dzinamarira <sdzinamarira@linkedin.com>
Reviewed-by: Inigo Goiri <inigoiri@apache.org>
Reviewed-by: Shilun Fan <slfan1989@apache.org>
Signed-off-by: Shilun Fan <slfan1989@apache.org>
hchaverri 1 年之前
父節點
當前提交
bc48e5cbe8

+ 7 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java

@@ -88,8 +88,7 @@ extends AbstractDelegationTokenIdentifier>
    * Cache of currently valid tokens, mapping from DelegationTokenIdentifier 
    * to DelegationTokenInformation. Protected by this object lock.
    */
-  protected final Map<TokenIdent, DelegationTokenInformation> currentTokens 
-      = new ConcurrentHashMap<>();
+  protected Map<TokenIdent, DelegationTokenInformation> currentTokens;
 
   /**
    * Map of token real owners to its token count. This is used to generate
@@ -155,6 +154,7 @@ extends AbstractDelegationTokenIdentifier>
     this.tokenRenewInterval = delegationTokenRenewInterval;
     this.tokenRemoverScanInterval = delegationTokenRemoverScanInterval;
     this.storeTokenTrackingId = false;
+    this.currentTokens = new ConcurrentHashMap<>();
   }
 
   /**
@@ -771,10 +771,14 @@ extends AbstractDelegationTokenIdentifier>
     for (TokenIdent ident : expiredTokens) {
       logExpireToken(ident);
       LOG.info("Removing expired token " + formatTokenId(ident));
-      removeStoredToken(ident);
+      removeExpiredStoredToken(ident);
     }
   }
 
+  protected void removeExpiredStoredToken(TokenIdent ident) throws IOException {
+    removeStoredToken(ident);
+  }
+
   public void stopThreads() {
     if (LOG.isDebugEnabled())
       LOG.debug("Stopping expired delegation token remover thread");

+ 118 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/DelegationTokenLoadingCache.java

@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.security.token.delegation;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader;
+import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
+
+
+/**
+ * Cache for delegation tokens that can handle high volume of tokens. A
+ * loading cache will prevent all active tokens from being in memory at the
+ * same time. It will also trigger more requests from the persistent token storage.
+ */
+public class DelegationTokenLoadingCache<K, V> implements Map<K, V> {
+  private LoadingCache<K, V> internalLoadingCache;
+
+  public DelegationTokenLoadingCache(long cacheExpirationMs, long maximumCacheSize,
+      Function<K, V> singleEntryFunction) {
+    this.internalLoadingCache = CacheBuilder.newBuilder()
+        .expireAfterWrite(cacheExpirationMs, TimeUnit.MILLISECONDS)
+        .maximumSize(maximumCacheSize)
+        .build(new CacheLoader<K, V>() {
+          @Override
+          public V load(K k) throws Exception {
+            return singleEntryFunction.apply(k);
+          }
+        });
+  }
+
+  @Override
+  public int size() {
+    return (int) this.internalLoadingCache.size();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return size() == 0;
+  }
+
+  @Override
+  public boolean containsKey(Object key) {
+    return this.internalLoadingCache.getIfPresent(key) != null;
+  }
+
+  @Override
+  public boolean containsValue(Object value) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public V get(Object key) {
+    try {
+      return this.internalLoadingCache.get((K) key);
+    } catch (Exception e) {
+      return null;
+    }
+  }
+
+  @Override
+  public V put(K key, V value) {
+    this.internalLoadingCache.put(key, value);
+    return this.internalLoadingCache.getIfPresent(key);
+  }
+
+  @Override
+  public V remove(Object key) {
+    V value = this.internalLoadingCache.getIfPresent(key);
+    this.internalLoadingCache.invalidate(key);
+    return value;
+  }
+
+  @Override
+  public void putAll(Map<? extends K, ? extends V> m) {
+    this.internalLoadingCache.putAll(m);
+  }
+
+  @Override
+  public void clear() {
+    this.internalLoadingCache.invalidateAll();
+  }
+
+  @Override
+  public Set<K> keySet() {
+    return this.internalLoadingCache.asMap().keySet();
+  }
+
+  @Override
+  public Collection<V> values() {
+    return this.internalLoadingCache.asMap().values();
+  }
+
+  @Override
+  public Set<Entry<K, V>> entrySet() {
+    return this.internalLoadingCache.asMap().entrySet();
+  }
+}

+ 67 - 29
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/SQLDelegationTokenSecretManager.java

@@ -24,9 +24,13 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.sql.SQLException;
+import java.util.NoSuchElementException;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -46,6 +50,13 @@ public abstract class SQLDelegationTokenSecretManager<TokenIdent
   private static final String SQL_DTSM_TOKEN_SEQNUM_BATCH_SIZE = SQL_DTSM_CONF_PREFIX
       + "token.seqnum.batch.size";
   public static final int DEFAULT_SEQ_NUM_BATCH_SIZE = 10;
+  public static final String SQL_DTSM_TOKEN_LOADING_CACHE_EXPIRATION = SQL_DTSM_CONF_PREFIX
+      + "token.loading.cache.expiration";
+  public static final long SQL_DTSM_TOKEN_LOADING_CACHE_EXPIRATION_DEFAULT =
+      TimeUnit.SECONDS.toMillis(10);
+  public static final String SQL_DTSM_TOKEN_LOADING_CACHE_MAX_SIZE = SQL_DTSM_CONF_PREFIX
+      + "token.loading.cache.max.size";
+  public static final long SQL_DTSM_TOKEN_LOADING_CACHE_MAX_SIZE_DEFAULT = 100000;
 
   // Batch of sequence numbers that will be requested by the sequenceNumCounter.
   // A new batch is requested once the sequenceNums available to a secret manager are
@@ -71,6 +82,13 @@ public abstract class SQLDelegationTokenSecretManager<TokenIdent
 
     this.seqNumBatchSize = conf.getInt(SQL_DTSM_TOKEN_SEQNUM_BATCH_SIZE,
         DEFAULT_SEQ_NUM_BATCH_SIZE);
+
+    long cacheExpirationMs = conf.getTimeDuration(SQL_DTSM_TOKEN_LOADING_CACHE_EXPIRATION,
+        SQL_DTSM_TOKEN_LOADING_CACHE_EXPIRATION_DEFAULT, TimeUnit.MILLISECONDS);
+    long maximumCacheSize = conf.getLong(SQL_DTSM_TOKEN_LOADING_CACHE_MAX_SIZE,
+        SQL_DTSM_TOKEN_LOADING_CACHE_MAX_SIZE_DEFAULT);
+    this.currentTokens = new DelegationTokenLoadingCache<>(cacheExpirationMs, maximumCacheSize,
+        this::getTokenInfoFromSQL);
   }
 
   /**
@@ -126,15 +144,11 @@ public abstract class SQLDelegationTokenSecretManager<TokenIdent
   @Override
   public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
       String canceller) throws IOException {
-    try (ByteArrayInputStream bis = new ByteArrayInputStream(token.getIdentifier());
-        DataInputStream din = new DataInputStream(bis)) {
-      TokenIdent id = createIdentifier();
-      id.readFields(din);
+    TokenIdent id = createTokenIdent(token.getIdentifier());
 
-      // Calling getTokenInfo to load token into local cache if not present.
-      // super.cancelToken() requires token to be present in local cache.
-      getTokenInfo(id);
-    }
+    // Calling getTokenInfo to load token into local cache if not present.
+    // super.cancelToken() requires token to be present in local cache.
+    getTokenInfo(id);
 
     return super.cancelToken(token, canceller);
   }
@@ -153,6 +167,24 @@ public abstract class SQLDelegationTokenSecretManager<TokenIdent
     }
   }
 
+  @Override
+  protected void removeExpiredStoredToken(TokenIdent ident) {
+    try {
+      // Ensure that the token has not been renewed in SQL by
+      // another secret manager
+      DelegationTokenInformation tokenInfo = getTokenInfoFromSQL(ident);
+      if (tokenInfo.getRenewDate() >= Time.now()) {
+        LOG.info("Token was renewed by a different router and has not been deleted: {}", ident);
+        return;
+      }
+      removeStoredToken(ident);
+    } catch (NoSuchElementException e) {
+      LOG.info("Token has already been deleted by a different router: {}", ident);
+    } catch (Exception e) {
+      LOG.warn("Could not remove token {}", ident, e);
+    }
+  }
+
   /**
    * Obtains the DelegationTokenInformation associated with the given
    * TokenIdentifier in the SQL database.
@@ -160,29 +192,35 @@ public abstract class SQLDelegationTokenSecretManager<TokenIdent
    * @return DelegationTokenInformation that matches the given TokenIdentifier or
    *         null if it doesn't exist in the database.
    */
-  @Override
-  protected DelegationTokenInformation getTokenInfo(TokenIdent ident) {
-    // Look for token in local cache
-    DelegationTokenInformation tokenInfo = super.getTokenInfo(ident);
-
-    if (tokenInfo == null) {
-      try {
-        // Look for token in SQL database
-        byte[] tokenInfoBytes = selectTokenInfo(ident.getSequenceNumber(), ident.getBytes());
+  @VisibleForTesting
+  protected DelegationTokenInformation getTokenInfoFromSQL(TokenIdent ident) {
+    try {
+      byte[] tokenInfoBytes = selectTokenInfo(ident.getSequenceNumber(), ident.getBytes());
+      if (tokenInfoBytes == null) {
+        // Throw exception so value is not added to cache
+        throw new NoSuchElementException("Token not found in SQL secret manager: " + ident);
+      }
+      return createTokenInfo(tokenInfoBytes);
+    } catch (SQLException | IOException e) {
+      LOG.error("Failed to get token in SQL secret manager", e);
+      throw new RuntimeException(e);
+    }
+  }
 
-        if (tokenInfoBytes != null) {
-          tokenInfo = new DelegationTokenInformation();
-          try (ByteArrayInputStream bis = new ByteArrayInputStream(tokenInfoBytes)) {
-            try (DataInputStream dis = new DataInputStream(bis)) {
-              tokenInfo.readFields(dis);
-            }
-          }
+  private TokenIdent createTokenIdent(byte[] tokenIdentBytes) throws IOException {
+    try (ByteArrayInputStream bis = new ByteArrayInputStream(tokenIdentBytes);
+        DataInputStream din = new DataInputStream(bis)) {
+      TokenIdent id = createIdentifier();
+      id.readFields(din);
+      return id;
+    }
+  }
 
-          // Update token in local cache
-          currentTokens.put(ident, tokenInfo);
-        }
-      } catch (IOException | SQLException e) {
-        LOG.error("Failed to get token in SQL secret manager", e);
+  private DelegationTokenInformation createTokenInfo(byte[] tokenInfoBytes) throws IOException {
+    DelegationTokenInformation tokenInfo = new DelegationTokenInformation();
+    try (ByteArrayInputStream bis = new ByteArrayInputStream(tokenInfoBytes)) {
+      try (DataInputStream dis = new DataInputStream(bis)) {
+        tokenInfo.readFields(dis);
       }
     }
 

+ 116 - 2
hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/security/token/TestSQLDelegationTokenSecretManagerImpl.java

@@ -25,13 +25,21 @@ import java.sql.SQLException;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.SQLDelegationTokenSecretManager;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.Time;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -43,6 +51,7 @@ import org.junit.Test;
 public class TestSQLDelegationTokenSecretManagerImpl {
   private static final String CONNECTION_URL = "jdbc:derby:memory:TokenStore";
   private static final int TEST_MAX_RETRIES = 3;
+  private static final int TOKEN_EXPIRATION_SECONDS = 1;
   private static Configuration conf;
 
   @Before
@@ -111,6 +120,96 @@ public class TestSQLDelegationTokenSecretManagerImpl {
     }
   }
 
+  @Test
+  public void testCancelToken() throws Exception {
+    DelegationTokenManager tokenManager1 = createTokenManager(getShortLivedTokenConf());
+    DelegationTokenManager tokenManager2 = createTokenManager(getShortLivedTokenConf());
+
+    TestDelegationTokenSecretManager secretManager2 =
+        (TestDelegationTokenSecretManager) tokenManager2.getDelegationTokenSecretManager();
+
+    try {
+      // Create token on token manager 1
+      Token token1 = tokenManager1.createToken(UserGroupInformation.getCurrentUser(), "foo");
+
+      // Load token on token manager 2 to test it doesn't get stale
+      tokenManager2.verifyToken(token1);
+
+      // Cancel token on token manager 1
+      tokenManager1.cancelToken(token1, "foo");
+
+      // Validate that token cancellation is propagated to token manager 2
+      secretManager2.waitForTokenEviction(token1.decodeIdentifier());
+      LambdaTestUtils.intercept(SecretManager.InvalidToken.class,
+          () -> tokenManager2.verifyToken(token1));
+    } finally {
+      stopTokenManager(tokenManager1);
+      stopTokenManager(tokenManager2);
+    }
+  }
+
+  @Test
+  public void testRenewToken() throws Exception {
+    DelegationTokenManager tokenManager1 = createTokenManager(getShortLivedTokenConf());
+    DelegationTokenManager tokenManager2 = createTokenManager(getShortLivedTokenConf());
+
+    TestDelegationTokenSecretManager secretManager2 =
+        (TestDelegationTokenSecretManager) tokenManager2.getDelegationTokenSecretManager();
+
+    try {
+      // Create token on token manager 1
+      Token token1 = tokenManager1.createToken(UserGroupInformation.getCurrentUser(), "foo");
+      long expirationTime = Time.monotonicNow() +
+          TimeUnit.SECONDS.toMillis(TOKEN_EXPIRATION_SECONDS) * 2;
+
+      // Load token on token manager 2 to test it doesn't get stale
+      tokenManager2.verifyToken(token1);
+
+      // Renew token on token manager 1 and verify token is updated on token manager 2
+      // Do this for long enough that the token should be expired if not renewed
+      AbstractDelegationTokenIdentifier token1Id =
+          (AbstractDelegationTokenIdentifier) token1.decodeIdentifier();
+      while (Time.monotonicNow() < expirationTime) {
+        tokenManager1.renewToken(token1, "foo");
+        callRemoveExpiredTokensAndValidateSQL(secretManager2, token1Id, true);
+        secretManager2.waitForTokenEviction(token1Id);
+        tokenManager2.verifyToken(token1);
+      }
+
+      // Stop renewing token and validate it's no longer valid and removed
+      // from SQL
+      Thread.sleep(TimeUnit.SECONDS.toMillis(TOKEN_EXPIRATION_SECONDS) * 2);
+      LambdaTestUtils.intercept(SecretManager.InvalidToken.class,
+          () -> tokenManager2.verifyToken(token1));
+      callRemoveExpiredTokensAndValidateSQL(secretManager2, token1Id, false);
+    } finally {
+      stopTokenManager(tokenManager1);
+      stopTokenManager(tokenManager2);
+    }
+  }
+
+  private Configuration getShortLivedTokenConf() {
+    Configuration shortLivedConf = new Configuration(conf);
+    shortLivedConf.setTimeDuration(
+        SQLDelegationTokenSecretManager.SQL_DTSM_TOKEN_LOADING_CACHE_EXPIRATION,
+        200, TimeUnit.MILLISECONDS);
+    shortLivedConf.setInt(DelegationTokenManager.RENEW_INTERVAL, TOKEN_EXPIRATION_SECONDS);
+    return shortLivedConf;
+  }
+
+  private void callRemoveExpiredTokensAndValidateSQL(
+      TestDelegationTokenSecretManager secretManager, AbstractDelegationTokenIdentifier tokenId,
+      boolean expectedInSQL) throws SQLException {
+    secretManager.removeExpiredStoredToken(tokenId);
+    byte[] tokenInfo = secretManager.selectTokenInfo(tokenId.getSequenceNumber(),
+        tokenId.getBytes());
+    if (expectedInSQL) {
+      Assert.assertNotNull("Verify token exists in database", tokenInfo);
+    } else {
+      Assert.assertNull("Verify token was removed from database", tokenInfo);
+    }
+  }
+
   @Test
   public void testSequenceNumAllocation() throws Exception {
     int tokensPerManager = SQLDelegationTokenSecretManagerImpl.DEFAULT_SEQ_NUM_BATCH_SIZE * 5;
@@ -292,8 +391,13 @@ public class TestSQLDelegationTokenSecretManagerImpl {
   }
 
   private DelegationTokenManager createTokenManager() {
+    return createTokenManager(conf);
+  }
+
+  private DelegationTokenManager createTokenManager(Configuration config) {
     DelegationTokenManager tokenManager = new DelegationTokenManager(new Configuration(), null);
-    tokenManager.setExternalDelegationTokenSecretManager(new TestDelegationTokenSecretManager());
+    tokenManager.setExternalDelegationTokenSecretManager(
+        new TestDelegationTokenSecretManager(config));
     return tokenManager;
   }
 
@@ -401,7 +505,7 @@ public class TestSQLDelegationTokenSecretManagerImpl {
       return keyRollLock;
     }
 
-    TestDelegationTokenSecretManager() {
+    TestDelegationTokenSecretManager(Configuration conf) {
       super(conf, new TestConnectionFactory(conf),
           SQLSecretManagerRetriableHandlerImpl.getInstance(conf, new TestRetryHandler()));
     }
@@ -428,6 +532,16 @@ public class TestSQLDelegationTokenSecretManagerImpl {
       }
     }
 
+    public void waitForTokenEviction(TokenIdentifier tokenId)
+        throws InterruptedException, TimeoutException {
+      // Wait until token is not found on cache
+      GenericTestUtils.waitFor(() -> !this.currentTokens.containsKey(tokenId), 100, 5000);
+    }
+
+    public void removeExpiredStoredToken(TokenIdentifier tokenId) {
+      super.removeExpiredStoredToken((AbstractDelegationTokenIdentifier) tokenId);
+    }
+
     public void setReadOnly(boolean readOnly) {
       ((TestConnectionFactory) getConnectionFactory()).readOnly = readOnly;
     }