Przeglądaj źródła

YARN-11489. Fix memory leak of DelegationTokenRenewer futures in DelegationTokenRenewerPoolTracker. (#5629). Contributed by Chun Chen.

Signed-off-by: He Xiaoqiao <hexiaoqiao@apache.org>
Chun Chen 2 lat temu
rodzic
commit
11af08d67a

+ 56 - 28
hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java

@@ -123,8 +123,8 @@ public class DelegationTokenRenewer extends AbstractService {
   private long tokenRenewerThreadTimeout;
   private long tokenRenewerThreadTimeout;
   private long tokenRenewerThreadRetryInterval;
   private long tokenRenewerThreadRetryInterval;
   private int tokenRenewerThreadRetryMaxAttempts;
   private int tokenRenewerThreadRetryMaxAttempts;
-  private final Map<DelegationTokenRenewerEvent, Future<?>> futures =
-      new ConcurrentHashMap<>();
+  private final LinkedBlockingQueue<DelegationTokenRenewerFuture> futures =
+      new LinkedBlockingQueue<>();
   private boolean delegationTokenRenewerPoolTrackerFlag = true;
   private boolean delegationTokenRenewerPoolTrackerFlag = true;
 
 
   // this config is supposedly not used by end-users.
   // this config is supposedly not used by end-users.
@@ -227,7 +227,7 @@ public class DelegationTokenRenewer extends AbstractService {
       if (isServiceStarted) {
       if (isServiceStarted) {
         Future<?> future =
         Future<?> future =
             renewerService.submit(new DelegationTokenRenewerRunnable(evt));
             renewerService.submit(new DelegationTokenRenewerRunnable(evt));
-        futures.put(evt, future);
+        futures.add(new DelegationTokenRenewerFuture(evt, future));
       } else {
       } else {
         pendingEventQueue.add(evt);
         pendingEventQueue.add(evt);
         int qSize = pendingEventQueue.size();
         int qSize = pendingEventQueue.size();
@@ -998,33 +998,35 @@ public class DelegationTokenRenewer extends AbstractService {
     @Override
     @Override
     public void run() {
     public void run() {
       while (true) {
       while (true) {
-        for (Map.Entry<DelegationTokenRenewerEvent, Future<?>> entry : futures
-            .entrySet()) {
-          DelegationTokenRenewerEvent evt = entry.getKey();
-          Future<?> future = entry.getValue();
-          try {
-            future.get(tokenRenewerThreadTimeout, TimeUnit.MILLISECONDS);
-          } catch (TimeoutException e) {
-
-            // Cancel thread and retry the same event in case of timeout
-            if (future != null && !future.isDone() && !future.isCancelled()) {
-              future.cancel(true);
-              futures.remove(evt);
-              if (evt.getAttempt() < tokenRenewerThreadRetryMaxAttempts) {
-                renewalTimer.schedule(
-                    getTimerTask((AbstractDelegationTokenRenewerAppEvent) evt),
-                    tokenRenewerThreadRetryInterval);
-              } else {
-                LOG.info(
-                    "Exhausted max retry attempts {} in token renewer "
-                        + "thread for {}",
-                    tokenRenewerThreadRetryMaxAttempts, evt.getApplicationId());
-              }
+        DelegationTokenRenewerFuture dtrf;
+        try {
+          dtrf = futures.take();
+        } catch (InterruptedException e) {
+          LOG.debug("DelegationTokenRenewer pool tracker interrupted");
+          return;
+        }
+        DelegationTokenRenewerEvent evt = dtrf.getEvt();
+        Future<?> future = dtrf.getFuture();
+        try {
+          future.get(tokenRenewerThreadTimeout, TimeUnit.MILLISECONDS);
+        } catch (TimeoutException e) {
+          // Cancel thread and retry the same event in case of timeout.
+          if (!future.isDone() && !future.isCancelled()) {
+            future.cancel(true);
+            if (evt.getAttempt() < tokenRenewerThreadRetryMaxAttempts) {
+              renewalTimer.schedule(
+                  getTimerTask((AbstractDelegationTokenRenewerAppEvent) evt),
+                  tokenRenewerThreadRetryInterval);
+            } else {
+              LOG.info(
+                  "Exhausted max retry attempts {} in token renewer "
+                      + "thread for {}",
+                  tokenRenewerThreadRetryMaxAttempts, evt.getApplicationId());
             }
             }
-          } catch (Exception e) {
-            LOG.info("Problem in submitting renew tasks in token renewer "
-                + "thread.", e);
           }
           }
+        } catch (Exception e) {
+          LOG.info("Problem in submitting renew tasks in token renewer "
+              + "thread.", e);
         }
         }
       }
       }
     }
     }
@@ -1192,6 +1194,32 @@ public class DelegationTokenRenewer extends AbstractService {
     }
     }
   }
   }
 
 
+  private static class DelegationTokenRenewerFuture {
+    private DelegationTokenRenewerEvent evt;
+    private Future<?> future;
+    DelegationTokenRenewerFuture(DelegationTokenRenewerEvent evt,
+        Future<?> future) {
+      this.future = future;
+      this.evt = evt;
+    }
+
+    public DelegationTokenRenewerEvent getEvt() {
+      return evt;
+    }
+
+    public void setEvt(DelegationTokenRenewerEvent evt) {
+      this.evt = evt;
+    }
+
+    public Future<?> getFuture() {
+      return future;
+    }
+
+    public void setFuture(Future<?> future) {
+      this.future = future;
+    }
+  }
+
   // only for testing
   // only for testing
   protected ConcurrentMap<Token<?>, DelegationTokenToRenew> getAllTokens() {
   protected ConcurrentMap<Token<?>, DelegationTokenToRenew> getAllTokens() {
     return allTokens;
     return allTokens;