Jelajahi Sumber

HADOOP-11105. MetricsSystemImpl could leak memory in registered callbacks. Contributed by Chuan Liu.

(cherry picked from commit 1942364ef14396e9bd94a87c0d901ff9abe1d42a)
cnauroth 10 tahun lalu
induk
melakukan
bf26b9be39

+ 28 - 7
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java

@@ -83,7 +83,12 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
   private final Map<String, MetricsSource> allSources;
   private final Map<String, MetricsSinkAdapter> sinks;
   private final Map<String, MetricsSink> allSinks;
+
+  // The callback list is used by register(Callback callback), while
+  // the callback map is used by register(String name, String desc, T sink)
   private final List<Callback> callbacks;
+  private final Map<String, Callback> namedCallbacks;
+
   private final MetricsCollectorImpl collector;
   private final MetricsRegistry registry = new MetricsRegistry(MS_NAME);
   @Metric({"Snapshot", "Snapshot stats"}) MutableStat snapshotStat;
@@ -119,6 +124,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
     sourceConfigs = Maps.newHashMap();
     sinkConfigs = Maps.newHashMap();
     callbacks = Lists.newArrayList();
+    namedCallbacks = Maps.newHashMap();
     injectedTags = Lists.newArrayList();
     collector = new MetricsCollectorImpl();
     if (prefix != null) {
@@ -178,11 +184,13 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
       return;
     }
     for (Callback cb : callbacks) cb.preStart();
+    for (Callback cb : namedCallbacks.values()) cb.preStart();
     configure(prefix);
     startTimer();
     monitoring = true;
     LOG.info(prefix +" metrics system started");
     for (Callback cb : callbacks) cb.postStart();
+    for (Callback cb : namedCallbacks.values()) cb.postStart();
   }
 
   @Override
@@ -198,6 +206,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
       return;
     }
     for (Callback cb : callbacks) cb.preStop();
+    for (Callback cb : namedCallbacks.values()) cb.preStop();
     LOG.info("Stopping "+ prefix +" metrics system...");
     stopTimer();
     stopSources();
@@ -206,6 +215,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
     monitoring = false;
     LOG.info(prefix +" metrics system stopped.");
     for (Callback cb : callbacks) cb.postStop();
+    for (Callback cb : namedCallbacks.values()) cb.postStop();
   }
 
   @Override public synchronized <T>
@@ -224,7 +234,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
     }
     // We want to re-register the source to pick up new config when the
     // metrics system restarts.
-    register(new AbstractCallback() {
+    register(name, new AbstractCallback() {
       @Override public void postStart() {
         registerSource(finalName, finalDesc, s);
       }
@@ -241,6 +251,9 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
     if (allSources.containsKey(name)) {
       allSources.remove(name);
     }
+    if (namedCallbacks.containsKey(name)) {
+      namedCallbacks.remove(name);
+    }
   }
 
   synchronized
@@ -270,7 +283,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
     }
     // We want to re-register the sink to pick up new config
     // when the metrics system restarts.
-    register(new AbstractCallback() {
+    register(name, new AbstractCallback() {
       @Override public void postStart() {
         register(name, description, sink);
       }
@@ -291,9 +304,16 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
 
   @Override
   public synchronized void register(final Callback callback) {
-    callbacks.add((Callback) Proxy.newProxyInstance(
-        callback.getClass().getClassLoader(), new Class<?>[] { Callback.class },
-        new InvocationHandler() {
+    callbacks.add((Callback) getProxyForCallback(callback));
+  }
+
+  private synchronized void register(String name, final Callback callback) {
+    namedCallbacks.put(name, (Callback) getProxyForCallback(callback));
+  }
+
+  private Object getProxyForCallback(final Callback callback) {
+    return Proxy.newProxyInstance(callback.getClass().getClassLoader(),
+        new Class<?>[] { Callback.class }, new InvocationHandler() {
           @Override
           public Object invoke(Object proxy, Method method, Object[] args)
               throws Throwable {
@@ -302,11 +322,11 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
             }
             catch (Exception e) {
               // These are not considered fatal.
-              LOG.warn("Caught exception in callback "+ method.getName(), e);
+              LOG.warn("Caught exception in callback " + method.getName(), e);
             }
             return null;
           }
-        }));
+        });
   }
 
   @Override
@@ -577,6 +597,7 @@ public class MetricsSystemImpl extends MetricsSystem implements MetricsSource {
     allSources.clear();
     allSinks.clear();
     callbacks.clear();
+    namedCallbacks.clear();
     if (mbeanName != null) {
       MBeans.unregister(mbeanName);
       mbeanName = null;