|
@@ -18,6 +18,7 @@
|
|
|
|
|
|
package org.apache.hadoop.fs.s3a;
|
|
|
|
|
|
+import com.google.common.annotations.VisibleForTesting;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
@@ -29,8 +30,10 @@ import org.apache.hadoop.metrics2.MetricStringBuilder;
|
|
|
import org.apache.hadoop.metrics2.MetricsCollector;
|
|
|
import org.apache.hadoop.metrics2.MetricsInfo;
|
|
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
|
|
+import org.apache.hadoop.metrics2.MetricsSource;
|
|
|
+import org.apache.hadoop.metrics2.MetricsSystem;
|
|
|
import org.apache.hadoop.metrics2.MetricsTag;
|
|
|
-import org.apache.hadoop.metrics2.annotation.Metrics;
|
|
|
+import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
|
|
|
import org.apache.hadoop.metrics2.lib.Interns;
|
|
|
import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
|
|
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
|
@@ -58,16 +61,49 @@ import static org.apache.hadoop.fs.s3a.Statistic.*;
|
|
|
* the operations to increment/query metric values are designed to handle
|
|
|
* lookup failures.
|
|
|
*/
|
|
|
-@Metrics(about = "Metrics for S3a", context = "S3AFileSystem")
|
|
|
@InterfaceAudience.Private
|
|
|
@InterfaceStability.Evolving
|
|
|
-public class S3AInstrumentation {
|
|
|
+public class S3AInstrumentation implements Closeable, MetricsSource {
|
|
|
private static final Logger LOG = LoggerFactory.getLogger(
|
|
|
S3AInstrumentation.class);
|
|
|
|
|
|
- public static final String CONTEXT = "S3AFileSystem";
|
|
|
+ private static final String METRICS_SOURCE_BASENAME = "S3AMetrics";
|
|
|
+
|
|
|
+ /**
|
|
|
+ * {@value #METRICS_SYSTEM_NAME} The name of the s3a-specific metrics
|
|
|
+ * system instance used for s3a metrics.
|
|
|
+ */
|
|
|
+ public static final String METRICS_SYSTEM_NAME = "s3a-file-system";
|
|
|
+
|
|
|
+ /**
|
|
|
+ * {@value #CONTEXT} Currently all s3a metrics are placed in a single
|
|
|
+ * "context". Distinct contexts may be used in the future.
|
|
|
+ */
|
|
|
+ public static final String CONTEXT = "s3aFileSystem";
|
|
|
+
|
|
|
+ /**
|
|
|
+ * {@value #METRIC_TAG_FILESYSTEM_ID} The name of a field added to metrics
|
|
|
+ * records that uniquely identifies a specific FileSystem instance.
|
|
|
+ */
|
|
|
+ public static final String METRIC_TAG_FILESYSTEM_ID = "s3aFileSystemId";
|
|
|
+
|
|
|
+ /**
|
|
|
+ * {@value #METRIC_TAG_BUCKET} The name of a field added to metrics records
|
|
|
+ * that indicates the hostname portion of the FS URL.
|
|
|
+ */
|
|
|
+ public static final String METRIC_TAG_BUCKET = "bucket";
|
|
|
+
|
|
|
+ // metricsSystemLock must be used to synchronize modifications to
|
|
|
+ // metricsSystem and the following counters.
|
|
|
+ private static Object metricsSystemLock = new Object();
|
|
|
+ private static MetricsSystem metricsSystem = null;
|
|
|
+ private static int metricsSourceNameCounter = 0;
|
|
|
+ private static int metricsSourceActiveCounter = 0;
|
|
|
+
|
|
|
+ private String metricsSourceName;
|
|
|
+
|
|
|
private final MetricsRegistry registry =
|
|
|
- new MetricsRegistry("S3AFileSystem").setContext(CONTEXT);
|
|
|
+ new MetricsRegistry("s3aFileSystem").setContext(CONTEXT);
|
|
|
private final MutableCounterLong streamOpenOperations;
|
|
|
private final MutableCounterLong streamCloseOperations;
|
|
|
private final MutableCounterLong streamClosed;
|
|
@@ -146,7 +182,6 @@ public class S3AInstrumentation {
|
|
|
STORE_IO_THROTTLED
|
|
|
};
|
|
|
|
|
|
-
|
|
|
private static final Statistic[] GAUGES_TO_CREATE = {
|
|
|
OBJECT_PUT_REQUESTS_ACTIVE,
|
|
|
OBJECT_PUT_BYTES_PENDING,
|
|
@@ -157,12 +192,10 @@ public class S3AInstrumentation {
|
|
|
|
|
|
public S3AInstrumentation(URI name) {
|
|
|
UUID fileSystemInstanceId = UUID.randomUUID();
|
|
|
- registry.tag("FileSystemId",
|
|
|
- "A unique identifier for the FS ",
|
|
|
- fileSystemInstanceId.toString() + "-" + name.getHost());
|
|
|
- registry.tag("fsURI",
|
|
|
- "URI of this filesystem",
|
|
|
- name.toString());
|
|
|
+ registry.tag(METRIC_TAG_FILESYSTEM_ID,
|
|
|
+ "A unique identifier for the instance",
|
|
|
+ fileSystemInstanceId.toString());
|
|
|
+ registry.tag(METRIC_TAG_BUCKET, "Hostname from the FS URL", name.getHost());
|
|
|
streamOpenOperations = streamCounter(STREAM_OPENED);
|
|
|
streamCloseOperations = streamCounter(STREAM_CLOSE_OPERATIONS);
|
|
|
streamClosed = streamCounter(STREAM_CLOSED);
|
|
@@ -204,6 +237,39 @@ public class S3AInstrumentation {
|
|
|
"ops", "latency", interval);
|
|
|
quantiles(S3GUARD_METADATASTORE_THROTTLE_RATE,
|
|
|
"events", "frequency (Hz)", interval);
|
|
|
+
|
|
|
+ registerAsMetricsSource(name);
|
|
|
+ }
|
|
|
+
|
|
|
+ @VisibleForTesting
|
|
|
+ public MetricsSystem getMetricsSystem() {
|
|
|
+ synchronized (metricsSystemLock) {
|
|
|
+ if (metricsSystem == null) {
|
|
|
+ metricsSystem = new MetricsSystemImpl();
|
|
|
+ metricsSystem.init(METRICS_SYSTEM_NAME);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return metricsSystem;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Register this instance as a metrics source.
|
|
|
+ * @param name s3a:// URI for the associated FileSystem instance
|
|
|
+ */
|
|
|
+ private void registerAsMetricsSource(URI name) {
|
|
|
+ int number;
|
|
|
+ synchronized(metricsSystemLock) {
|
|
|
+ getMetricsSystem();
|
|
|
+
|
|
|
+ metricsSourceActiveCounter++;
|
|
|
+ number = ++metricsSourceNameCounter;
|
|
|
+ }
|
|
|
+ String msName = METRICS_SOURCE_BASENAME + number;
|
|
|
+ if (number > 1) {
|
|
|
+ msName = msName + number;
|
|
|
+ }
|
|
|
+ metricsSourceName = msName + "-" + name.getHost();
|
|
|
+ metricsSystem.register(metricsSourceName, "", this);
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -560,6 +626,23 @@ public class S3AInstrumentation {
|
|
|
streamBytesDiscardedInAbort.incr(statistics.bytesDiscardedInAbort);
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
+ public void getMetrics(MetricsCollector collector, boolean all) {
|
|
|
+ registry.snapshot(collector.addRecord(registry.info().name()), true);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void close() {
|
|
|
+ synchronized (metricsSystemLock) {
|
|
|
+ metricsSystem.unregisterSource(metricsSourceName);
|
|
|
+ int activeSources = --metricsSourceActiveCounter;
|
|
|
+ if (activeSources == 0) {
|
|
|
+ metricsSystem.publishMetricsNow();
|
|
|
+ metricsSystem.shutdown();
|
|
|
+ metricsSystem = null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Statistics updated by an input stream during its actual operation.
|
|
|
* These counters not thread-safe and are for use in a single instance
|