|
@@ -27,6 +27,7 @@ import org.slf4j.LoggerFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
+import org.apache.hadoop.fs.impl.WeakRefMetricsSource;
|
|
|
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
|
|
|
import org.apache.hadoop.fs.s3a.statistics.ChangeTrackerStatistics;
|
|
|
import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics;
|
|
@@ -160,7 +161,10 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
|
|
|
|
|
|
private final DurationTrackerFactory durationTrackerFactory;
|
|
|
|
|
|
- private String metricsSourceName;
|
|
|
+ /**
|
|
|
+ * Weak reference so there's no back reference to the instrumentation.
|
|
|
+ */
|
|
|
+ private WeakRefMetricsSource metricsSourceReference;
|
|
|
|
|
|
private final MetricsRegistry registry =
|
|
|
new MetricsRegistry("s3aFileSystem").setContext(CONTEXT);
|
|
@@ -233,19 +237,33 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
|
|
|
new MetricDurationTrackerFactory());
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Get the current metrics system; demand creating.
|
|
|
+ * @return a metric system, creating if need be.
|
|
|
+ */
|
|
|
@VisibleForTesting
|
|
|
- public MetricsSystem getMetricsSystem() {
|
|
|
+ static MetricsSystem getMetricsSystem() {
|
|
|
synchronized (METRICS_SYSTEM_LOCK) {
|
|
|
if (metricsSystem == null) {
|
|
|
metricsSystem = new MetricsSystemImpl();
|
|
|
metricsSystem.init(METRICS_SYSTEM_NAME);
|
|
|
+ LOG.debug("Metrics system inited {}", metricsSystem);
|
|
|
}
|
|
|
}
|
|
|
return metricsSystem;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * Register this instance as a metrics source.
|
|
|
+ * Does the instrumentation have a metrics system?
|
|
|
+ * @return true if the metrics system is present.
|
|
|
+ */
|
|
|
+ @VisibleForTesting
|
|
|
+ static boolean hasMetricSystem() {
|
|
|
+ return metricsSystem != null;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Register this instance as a metrics source via a weak reference.
|
|
|
* @param name s3a:// URI for the associated FileSystem instance
|
|
|
*/
|
|
|
private void registerAsMetricsSource(URI name) {
|
|
@@ -257,8 +275,9 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
|
|
|
number = ++metricsSourceNameCounter;
|
|
|
}
|
|
|
String msName = METRICS_SOURCE_BASENAME + number;
|
|
|
- metricsSourceName = msName + "-" + name.getHost();
|
|
|
- metricsSystem.register(metricsSourceName, "", this);
|
|
|
+ String metricsSourceName = msName + "-" + name.getHost();
|
|
|
+ metricsSourceReference = new WeakRefMetricsSource(metricsSourceName, this);
|
|
|
+ metricsSystem.register(metricsSourceName, "", metricsSourceReference);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -680,19 +699,42 @@ public class S3AInstrumentation implements Closeable, MetricsSource,
|
|
|
registry.snapshot(collector.addRecord(registry.info().name()), true);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * if registered with the metrics, return the
|
|
|
+ * name of the source.
|
|
|
+ * @return the name of the metrics, or null if this instance is not bonded.
|
|
|
+ */
|
|
|
+ public String getMetricSourceName() {
|
|
|
+ return metricsSourceReference != null
|
|
|
+ ? metricsSourceReference.getName()
|
|
|
+ : null;
|
|
|
+ }
|
|
|
+
|
|
|
public void close() {
|
|
|
- synchronized (METRICS_SYSTEM_LOCK) {
|
|
|
- // it is critical to close each quantile, as they start a scheduled
|
|
|
- // task in a shared thread pool.
|
|
|
- throttleRateQuantile.stop();
|
|
|
- metricsSystem.unregisterSource(metricsSourceName);
|
|
|
- metricsSourceActiveCounter--;
|
|
|
- int activeSources = metricsSourceActiveCounter;
|
|
|
- if (activeSources == 0) {
|
|
|
- LOG.debug("Shutting down metrics publisher");
|
|
|
- metricsSystem.publishMetricsNow();
|
|
|
- metricsSystem.shutdown();
|
|
|
- metricsSystem = null;
|
|
|
+ if (metricsSourceReference != null) {
|
|
|
+ // get the name
|
|
|
+ String name = metricsSourceReference.getName();
|
|
|
+ LOG.debug("Unregistering metrics for {}", name);
|
|
|
+ // then set to null so a second close() is a noop here.
|
|
|
+ metricsSourceReference = null;
|
|
|
+ synchronized (METRICS_SYSTEM_LOCK) {
|
|
|
+ // it is critical to close each quantile, as they start a scheduled
|
|
|
+ // task in a shared thread pool.
|
|
|
+ if (metricsSystem == null) {
|
|
|
+ LOG.debug("there is no metric system to unregister {} from", name);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ throttleRateQuantile.stop();
|
|
|
+
|
|
|
+ metricsSystem.unregisterSource(name);
|
|
|
+ metricsSourceActiveCounter--;
|
|
|
+ int activeSources = metricsSourceActiveCounter;
|
|
|
+ if (activeSources == 0) {
|
|
|
+ LOG.debug("Shutting down metrics publisher");
|
|
|
+ metricsSystem.publishMetricsNow();
|
|
|
+ metricsSystem.shutdown();
|
|
|
+ metricsSystem = null;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|