Browse Source

HADOOP-13263. Reload cached groups in background after expiry. (Contributed bt Stephen O'Donnell)

(cherry picked from commit 9683eab0e1ee42c159cf678254c464d97bc3ff57)
Arpit Agarwal 9 years ago
parent
commit
9ff8597622

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -15,6 +15,9 @@ Release 2.7.6 - UNRELEASED
     HADOOP-15177. Update the release year to 2018.
     (Bharat Viswanadham via aajisaka)
 
+    HADOOP-13263. Reload cached groups in background after expiry.
+    (Stephen O'Donnell via Arpit Agarwal)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 13 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java

@@ -271,6 +271,19 @@ public class CommonConfigurationKeysPublic {
   public static final long HADOOP_SECURITY_GROUPS_CACHE_WARN_AFTER_MS_DEFAULT =
     5000;
   /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */
+  public static final String HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD =
+      "hadoop.security.groups.cache.background.reload";
+  /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a>. */
+  public static final boolean
+      HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD_DEFAULT = false;
+  /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a>. */
+  public static final String
+      HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD_THREADS =
+          "hadoop.security.groups.cache.background.reload.threads";
+  /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a>. */
+  public static final int
+      HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD_THREADS_DEFAULT = 3;
+  /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a>.*/
   public static final String  HADOOP_SECURITY_AUTHENTICATION =
     "hadoop.security.authentication";
   /** See <a href="{@docRoot}/../core-default.html">core-default.xml</a> */

+ 117 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/Groups.java

@@ -26,9 +26,14 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Ticker;
@@ -36,6 +41,10 @@ import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -72,6 +81,17 @@ public class Groups {
   private final long warningDeltaMs;
   private final Timer timer;
   private Set<String> negativeCache;
+  private final boolean reloadGroupsInBackground;
+  private final int reloadGroupsThreadCount;
+
+  private final AtomicLong backgroundRefreshSuccess =
+      new AtomicLong(0);
+  private final AtomicLong backgroundRefreshException =
+      new AtomicLong(0);
+  private final AtomicLong backgroundRefreshQueued =
+      new AtomicLong(0);
+  private final AtomicLong backgroundRefreshRunning =
+      new AtomicLong(0);
 
   public Groups(Configuration conf) {
     this(conf, new Timer());
@@ -94,6 +114,18 @@ public class Groups {
     warningDeltaMs =
       conf.getLong(CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_WARN_AFTER_MS,
         CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_WARN_AFTER_MS_DEFAULT);
+    reloadGroupsInBackground =
+      conf.getBoolean(
+          CommonConfigurationKeys.
+              HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD,
+          CommonConfigurationKeys.
+              HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD_DEFAULT);
+    reloadGroupsThreadCount  =
+      conf.getInt(
+          CommonConfigurationKeys.
+              HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD_THREADS,
+          CommonConfigurationKeys.
+              HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD_THREADS_DEFAULT);
     parseStaticMapping(conf);
 
     this.timer = timer;
@@ -195,6 +227,22 @@ public class Groups {
     }
   }
 
+  public long getBackgroundRefreshSuccess() {
+    return backgroundRefreshSuccess.get();
+  }
+
+  public long getBackgroundRefreshException() {
+    return backgroundRefreshException.get();
+  }
+
+  public long getBackgroundRefreshQueued() {
+    return backgroundRefreshQueued.get();
+  }
+
+  public long getBackgroundRefreshRunning() {
+    return backgroundRefreshRunning.get();
+  }
+
   /**
    * Convert millisecond times from hadoop's timer to guava's nanosecond ticker.
    */
@@ -216,11 +264,41 @@ public class Groups {
    * Deals with loading data into the cache.
    */
   private class GroupCacheLoader extends CacheLoader<String, List<String>> {
+
+    private ListeningExecutorService executorService;
+
+    GroupCacheLoader() {
+      if (reloadGroupsInBackground) {
+        ThreadFactory threadFactory = new ThreadFactoryBuilder()
+            .setNameFormat("Group-Cache-Reload")
+            .setDaemon(true)
+            .build();
+        // With coreThreadCount == maxThreadCount we effectively
+        // create a fixed size thread pool. As allowCoreThreadTimeOut
+        // has been set, all threads will die after 60 seconds of non use
+        ThreadPoolExecutor parentExecutor =  new ThreadPoolExecutor(
+            reloadGroupsThreadCount,
+            reloadGroupsThreadCount,
+            60,
+            TimeUnit.SECONDS,
+            new LinkedBlockingQueue<Runnable>(),
+            threadFactory);
+        parentExecutor.allowCoreThreadTimeOut(true);
+        executorService = MoreExecutors.listeningDecorator(parentExecutor);
+      }
+    }
+
     /**
      * This method will block if a cache entry doesn't exist, and
      * any subsequent requests for the same user will wait on this
      * request to return. If a user already exists in the cache,
-     * this will be run in the background.
+     * and when the key expires, the first call to reload the key
+     * will block, but subsequent requests will return the old
+     * value until the blocking thread returns.
+     * If reloadGroupsInBackground is true, then the thread that
+     * needs to refresh an expired key will not block either. Instead
+     * it will return the old cache value and schedule a background
+     * refresh
      * @param user key of cache
      * @return List of groups belonging to user
      * @throws IOException to prevent caching negative entries
@@ -243,6 +321,44 @@ public class Groups {
           new ArrayList<>(new LinkedHashSet<>(groups)));
     }
 
+    /**
+     * Override the reload method to provide an asynchronous implementation. If
+     * reloadGroupsInBackground is false, then this method defers to the super
+     * implementation, otherwise is arranges for the cache to be updated later
+     */
+    @Override
+    public ListenableFuture<List<String>> reload(final String key,
+                                                 List<String> oldValue)
+        throws Exception {
+      if (!reloadGroupsInBackground) {
+        return super.reload(key, oldValue);
+      }
+
+      backgroundRefreshQueued.incrementAndGet();
+      ListenableFuture<List<String>> listenableFuture =
+          executorService.submit(new Callable<List<String>>() {
+            @Override
+            public List<String> call() throws Exception {
+              boolean success = false;
+              try {
+                backgroundRefreshQueued.decrementAndGet();
+                backgroundRefreshRunning.incrementAndGet();
+                List<String> results = load(key);
+                success = true;
+                return results;
+              } finally {
+                backgroundRefreshRunning.decrementAndGet();
+                if (success) {
+                  backgroundRefreshSuccess.incrementAndGet();
+                } else {
+                  backgroundRefreshException.incrementAndGet();
+                }
+              }
+            }
+          });
+      return listenableFuture;
+    }
+
     /**
      * Queries impl for groups belonging to the user. This could involve I/O and take awhile.
      */

+ 22 - 0
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -223,6 +223,28 @@ for ldap providers in the same way as above does.
   </description>
 </property>
 
+<property>
+  <name>hadoop.security.groups.cache.background.reload</name>
+  <value>false</value>
+  <description>
+    Whether to reload expired user->group mappings using a background thread
+    pool. If set to true, a pool of
+    hadoop.security.groups.cache.background.reload.threads is created to
+    update the cache in the background.
+  </description>
+</property>
+
+<property>
+  <name>hadoop.security.groups.cache.background.reload.threads</name>
+  <value>3</value>
+  <description>
+    Only relevant if hadoop.security.groups.cache.background.reload is true.
+    Controls the number of concurrent background user->group cache entry
+    refreshes. Pending refresh requests beyond this value are queued and
+    processed when a thread is free.
+  </description>
+</property>
+
 <property>
   <name>hadoop.security.group.mapping.ldap.url</name>
   <value></value>

+ 266 - 3
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/TestGroupsCaching.java

@@ -50,8 +50,8 @@ public class TestGroupsCaching {
   private Configuration conf;
 
   @Before
-  public void setup() {
-    FakeGroupMapping.resetRequestCount();
+  public void setup() throws IOException {
+    FakeGroupMapping.clearAll();
     ExceptionalGroupMapping.resetRequestCount();
 
     conf = new Configuration();
@@ -66,13 +66,18 @@ public class TestGroupsCaching {
     private static Set<String> blackList = new HashSet<String>();
     private static int requestCount = 0;
     private static long getGroupsDelayMs = 0;
+    private static boolean throwException;
 
     @Override
     public List<String> getGroups(String user) throws IOException {
       LOG.info("Getting groups for " + user);
+      delayIfNecessary();
+
       requestCount++;
 
-      delayIfNecessary();
+      if (throwException) {
+        throw new IOException("For test");
+      }
 
       if (blackList.contains(user)) {
         return new LinkedList<String>();
@@ -102,6 +107,15 @@ public class TestGroupsCaching {
       blackList.clear();
     }
 
+    public static void clearAll() throws IOException {
+      LOG.info("Resetting FakeGroupMapping");
+      blackList.clear();
+      allGroups.clear();
+      requestCount = 0;
+      getGroupsDelayMs = 0;
+      throwException = false;
+    }
+
     @Override
     public void cacheGroupsAdd(List<String> groups) throws IOException {
       LOG.info("Adding " + groups + " to groups.");
@@ -124,6 +138,10 @@ public class TestGroupsCaching {
     public static void setGetGroupsDelayMs(long delayMs) {
       getGroupsDelayMs = delayMs;
     }
+
+    public static void setThrowException(boolean throwIfTrue) {
+      throwException = throwIfTrue;
+    }
   }
 
   public static class ExceptionalGroupMapping extends ShellBasedUnixGroupsMapping {
@@ -403,6 +421,251 @@ public class TestGroupsCaching {
     assertEquals(startingRequestCount + 1, FakeGroupMapping.getRequestCount());
   }
 
+  @Test
+  public void testThreadNotBlockedWhenExpiredEntryExistsWithBackgroundRefresh()
+      throws Exception {
+    conf.setLong(
+        CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS, 1);
+    conf.setBoolean(
+        CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD,
+        true);
+    FakeTimer timer = new FakeTimer();
+    final Groups groups = new Groups(conf, timer);
+    groups.cacheGroupsAdd(Arrays.asList(myGroups));
+    groups.refresh();
+    FakeGroupMapping.clearBlackList();
+
+    // We make an initial request to populate the cache
+    groups.getGroups("me");
+    // Further lookups will have a delay
+    FakeGroupMapping.setGetGroupsDelayMs(100);
+    // add another groups
+    groups.cacheGroupsAdd(Arrays.asList("grp3"));
+    int startingRequestCount = FakeGroupMapping.getRequestCount();
+
+    // Then expire that entry
+    timer.advance(4 * 1000);
+
+    // Now get the cache entry - it should return immediately
+    // with the old value and the cache will not have completed
+    // a request to getGroups yet.
+    assertEquals(groups.getGroups("me").size(), 2);
+    assertEquals(startingRequestCount, FakeGroupMapping.getRequestCount());
+
+    // Now sleep for over the delay time and the request count should
+    // have completed
+    Thread.sleep(110);
+    assertEquals(startingRequestCount + 1, FakeGroupMapping.getRequestCount());
+    // Another call to get groups should give 3 groups instead of 2
+    assertEquals(groups.getGroups("me").size(), 3);
+  }
+
+  @Test
+  public void testThreadBlockedWhenExpiredEntryExistsWithoutBackgroundRefresh()
+      throws Exception {
+    conf.setLong(
+        CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS, 1);
+    conf.setBoolean(
+        CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD,
+        false);
+    FakeTimer timer = new FakeTimer();
+    final Groups groups = new Groups(conf, timer);
+    groups.cacheGroupsAdd(Arrays.asList(myGroups));
+    groups.refresh();
+    FakeGroupMapping.clearBlackList();
+
+    // We make an initial request to populate the cache
+    groups.getGroups("me");
+    // Further lookups will have a delay
+    FakeGroupMapping.setGetGroupsDelayMs(100);
+    // add another group
+    groups.cacheGroupsAdd(Arrays.asList("grp3"));
+    int startingRequestCount = FakeGroupMapping.getRequestCount();
+
+    // Then expire that entry
+    timer.advance(4 * 1000);
+
+    // Now get the cache entry - it should block and return the new
+    // 3 group value
+    assertEquals(groups.getGroups("me").size(), 3);
+    assertEquals(startingRequestCount + 1, FakeGroupMapping.getRequestCount());
+  }
+
+  @Test
+  public void testExceptionOnBackgroundRefreshHandled() throws Exception {
+    conf.setLong(
+        CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS, 1);
+    conf.setBoolean(
+        CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD,
+        true);
+    FakeTimer timer = new FakeTimer();
+    final Groups groups = new Groups(conf, timer);
+    groups.cacheGroupsAdd(Arrays.asList(myGroups));
+    groups.refresh();
+    FakeGroupMapping.clearBlackList();
+
+    // We make an initial request to populate the cache
+    groups.getGroups("me");
+
+    // add another group
+    groups.cacheGroupsAdd(Arrays.asList("grp3"));
+    int startingRequestCount = FakeGroupMapping.getRequestCount();
+    // Arrange for an exception to occur only on the
+    // second call
+    FakeGroupMapping.setThrowException(true);
+
+    // Then expire that entry
+    timer.advance(4 * 1000);
+
+    // Now get the cache entry - it should return immediately
+    // with the old value and the cache will not have completed
+    // a request to getGroups yet.
+    assertEquals(groups.getGroups("me").size(), 2);
+    assertEquals(startingRequestCount, FakeGroupMapping.getRequestCount());
+
+    // Now sleep for a short time and re-check the request count. It should have
+    // increased, but the exception means the cache will not have updated
+    Thread.sleep(50);
+    FakeGroupMapping.setThrowException(false);
+    assertEquals(startingRequestCount + 1, FakeGroupMapping.getRequestCount());
+    assertEquals(groups.getGroups("me").size(), 2);
+
+    // Now sleep another short time - the 3rd call to getGroups above
+    // will have kicked off another refresh that updates the cache
+    Thread.sleep(50);
+    assertEquals(startingRequestCount + 2, FakeGroupMapping.getRequestCount());
+    assertEquals(groups.getGroups("me").size(), 3);
+  }
+
+
+  @Test
+  public void testEntriesExpireIfBackgroundRefreshFails() throws Exception {
+    conf.setLong(
+        CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS, 1);
+    conf.setBoolean(
+        CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD,
+        true);
+    FakeTimer timer = new FakeTimer();
+    final Groups groups = new Groups(conf, timer);
+    groups.cacheGroupsAdd(Arrays.asList(myGroups));
+    groups.refresh();
+    FakeGroupMapping.clearBlackList();
+
+    // We make an initial request to populate the cache
+    groups.getGroups("me");
+
+    // Now make all calls to the FakeGroupMapper throw exceptions
+    FakeGroupMapping.setThrowException(true);
+
+    // The cache entry expires for refresh after 1 second
+    // It expires for eviction after 1 * 10 seconds after it was last written
+    // So if we call getGroups repeatedly over 9 seconds, 9 refreshes should
+    // be triggered which will fail to update the key, but the keys old value
+    // will be retrievable until it is evicted after about 10 seconds.
+    for(int i=0; i<9; i++) {
+      assertEquals(groups.getGroups("me").size(), 2);
+      timer.advance(1 * 1000);
+    }
+    // Wait until the 11th second. The call to getGroups should throw
+    // an exception as the key will have been evicted and FakeGroupMapping
+    // will throw IO Exception when it is asked for new groups. In this case
+    // load must be called synchronously as there is no key present
+    timer.advance(2 * 1000);
+    try {
+      groups.getGroups("me");
+      fail("Should have thrown an exception here");
+    } catch (Exception e) {
+      // pass
+    }
+
+    // Finally check groups are retrieve again after FakeGroupMapping
+    // stops throw exceptions
+    FakeGroupMapping.setThrowException(false);
+    assertEquals(groups.getGroups("me").size(), 2);
+  }
+
+  @Test
+  public void testBackgroundRefreshCounters()
+      throws IOException, InterruptedException {
+    conf.setLong(
+        CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS, 1);
+    conf.setBoolean(
+        CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD,
+        true);
+    conf.setInt(
+        CommonConfigurationKeys.
+            HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD_THREADS,
+        2);
+    FakeTimer timer = new FakeTimer();
+    final Groups groups = new Groups(conf, timer);
+    groups.cacheGroupsAdd(Arrays.asList(myGroups));
+    groups.refresh();
+    FakeGroupMapping.clearBlackList();
+
+    // populate the cache
+    String[] grps = {"one", "two", "three", "four", "five"};
+    for (String g: grps) {
+      groups.getGroups(g);
+    }
+
+    // expire the cache
+    timer.advance(2*1000);
+    FakeGroupMapping.setGetGroupsDelayMs(40);
+
+    // Request all groups again, as there are 2 threads to process them
+    // 3 should get queued and 2 should be running
+    for (String g: grps) {
+      groups.getGroups(g);
+    }
+    Thread.sleep(20);
+    assertEquals(groups.getBackgroundRefreshQueued(), 3);
+    assertEquals(groups.getBackgroundRefreshRunning(), 2);
+
+    // After 120ms all should have completed running
+    Thread.sleep(120);
+    assertEquals(groups.getBackgroundRefreshQueued(), 0);
+    assertEquals(groups.getBackgroundRefreshRunning(), 0);
+    assertEquals(groups.getBackgroundRefreshSuccess(), 5);
+
+    // Now run again, this time throwing exceptions but no delay
+    timer.advance(2*1000);
+    FakeGroupMapping.setGetGroupsDelayMs(0);
+    FakeGroupMapping.setThrowException(true);
+    for (String g: grps) {
+      groups.getGroups(g);
+    }
+    Thread.sleep(20);
+    assertEquals(groups.getBackgroundRefreshQueued(), 0);
+    assertEquals(groups.getBackgroundRefreshRunning(), 0);
+    assertEquals(groups.getBackgroundRefreshSuccess(), 5);
+    assertEquals(groups.getBackgroundRefreshException(), 5);
+  }
+
+  @Test
+  public void testExceptionCallingLoadWithoutBackgroundRefreshReturnsOldValue()
+      throws Exception {
+    conf.setLong(
+        CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_SECS, 1);
+    conf.setBoolean(
+        CommonConfigurationKeys.HADOOP_SECURITY_GROUPS_CACHE_BACKGROUND_RELOAD,
+        false);
+    FakeTimer timer = new FakeTimer();
+    final Groups groups = new Groups(conf, timer);
+    groups.cacheGroupsAdd(Arrays.asList(myGroups));
+    groups.refresh();
+    FakeGroupMapping.clearBlackList();
+
+    // First populate the cash
+    assertEquals(groups.getGroups("me").size(), 2);
+
+    // Advance the timer so a refresh is required
+    timer.advance(2 * 1000);
+
+    // This call should throw an exception
+    FakeGroupMapping.setThrowException(true);
+    assertEquals(groups.getGroups("me").size(), 2);
+  }
+
   @Test
   public void testCacheEntriesExpire() throws Exception {
     conf.setLong(