|
@@ -21,6 +21,7 @@ package org.apache.hadoop.metrics2.sink;
|
|
|
import org.apache.commons.configuration2.SubsetConfiguration;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
|
+import org.apache.hadoop.classification.VisibleForTesting;
|
|
|
import org.apache.hadoop.metrics2.AbstractMetric;
|
|
|
import org.apache.hadoop.metrics2.MetricsException;
|
|
|
import org.apache.hadoop.metrics2.MetricsRecord;
|
|
@@ -37,171 +38,173 @@ import java.net.Socket;
|
|
|
import java.nio.charset.StandardCharsets;
|
|
|
|
|
|
/**
|
|
|
- * A metrics sink that writes to a Graphite server
|
|
|
+ * A metrics sink that writes to a Graphite server.
|
|
|
*/
|
|
|
@InterfaceAudience.Public
|
|
|
@InterfaceStability.Evolving
|
|
|
public class GraphiteSink implements MetricsSink, Closeable {
|
|
|
- private static final Logger LOG =
|
|
|
- LoggerFactory.getLogger(GraphiteSink.class);
|
|
|
- private static final String SERVER_HOST_KEY = "server_host";
|
|
|
- private static final String SERVER_PORT_KEY = "server_port";
|
|
|
- private static final String METRICS_PREFIX = "metrics_prefix";
|
|
|
- private String metricsPrefix = null;
|
|
|
- private Graphite graphite = null;
|
|
|
-
|
|
|
- @Override
|
|
|
- public void init(SubsetConfiguration conf) {
|
|
|
- // Get Graphite host configurations.
|
|
|
- final String serverHost = conf.getString(SERVER_HOST_KEY);
|
|
|
- final int serverPort = Integer.parseInt(conf.getString(SERVER_PORT_KEY));
|
|
|
-
|
|
|
- // Get Graphite metrics graph prefix.
|
|
|
- metricsPrefix = conf.getString(METRICS_PREFIX);
|
|
|
- if (metricsPrefix == null)
|
|
|
- metricsPrefix = "";
|
|
|
-
|
|
|
- graphite = new Graphite(serverHost, serverPort);
|
|
|
- graphite.connect();
|
|
|
+ private static final Logger LOG =
|
|
|
+ LoggerFactory.getLogger(GraphiteSink.class);
|
|
|
+ private static final String SERVER_HOST_KEY = "server_host";
|
|
|
+ private static final String SERVER_PORT_KEY = "server_port";
|
|
|
+ private static final String METRICS_PREFIX = "metrics_prefix";
|
|
|
+ private String metricsPrefix = null;
|
|
|
+ private Graphite graphite = null;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void init(SubsetConfiguration conf) {
|
|
|
+ // Get Graphite host configurations.
|
|
|
+ final String serverHost = conf.getString(SERVER_HOST_KEY);
|
|
|
+ final int serverPort = Integer.parseInt(conf.getString(SERVER_PORT_KEY));
|
|
|
+
|
|
|
+ // Get Graphite metrics graph prefix.
|
|
|
+ metricsPrefix = conf.getString(METRICS_PREFIX);
|
|
|
+ if (metricsPrefix == null) {
|
|
|
+ metricsPrefix = "";
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public void putMetrics(MetricsRecord record) {
|
|
|
- StringBuilder lines = new StringBuilder();
|
|
|
- StringBuilder metricsPathPrefix = new StringBuilder();
|
|
|
-
|
|
|
- // Configure the hierarchical place to display the graph.
|
|
|
- metricsPathPrefix.append(metricsPrefix).append(".")
|
|
|
- .append(record.context()).append(".").append(record.name());
|
|
|
-
|
|
|
- for (MetricsTag tag : record.tags()) {
|
|
|
- if (tag.value() != null) {
|
|
|
- metricsPathPrefix.append(".")
|
|
|
- .append(tag.name())
|
|
|
- .append("=")
|
|
|
- .append(tag.value());
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // The record timestamp is in milliseconds while Graphite expects an epoc time in seconds.
|
|
|
- long timestamp = record.timestamp() / 1000L;
|
|
|
+ graphite = new Graphite(serverHost, serverPort);
|
|
|
+ graphite.connect();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void putMetrics(MetricsRecord record) {
|
|
|
+ StringBuilder lines = new StringBuilder();
|
|
|
+ StringBuilder metricsPathPrefix = new StringBuilder();
|
|
|
+
|
|
|
+ // Configure the hierarchical place to display the graph.
|
|
|
+ metricsPathPrefix.append(metricsPrefix).append(".")
|
|
|
+ .append(record.context()).append(".").append(record.name());
|
|
|
+
|
|
|
+ for (MetricsTag tag : record.tags()) {
|
|
|
+ if (tag.value() != null) {
|
|
|
+ metricsPathPrefix.append(".")
|
|
|
+ .append(tag.name())
|
|
|
+ .append("=")
|
|
|
+ .append(tag.value());
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- // Collect datapoints.
|
|
|
- for (AbstractMetric metric : record.metrics()) {
|
|
|
- lines.append(
|
|
|
- metricsPathPrefix.toString() + "."
|
|
|
- + metric.name().replace(' ', '.')).append(" ")
|
|
|
- .append(metric.value()).append(" ").append(timestamp)
|
|
|
- .append("\n");
|
|
|
- }
|
|
|
+ // The record timestamp is in milliseconds while Graphite expects an epoc time in seconds.
|
|
|
+ long timestamp = record.timestamp() / 1000L;
|
|
|
|
|
|
- try {
|
|
|
- graphite.write(lines.toString());
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.warn("Error sending metrics to Graphite", e);
|
|
|
- try {
|
|
|
- graphite.close();
|
|
|
- } catch (Exception e1) {
|
|
|
- throw new MetricsException("Error closing connection to Graphite", e1);
|
|
|
- }
|
|
|
- }
|
|
|
+ // Collect datapoints.
|
|
|
+ for (AbstractMetric metric : record.metrics()) {
|
|
|
+ lines.append(metricsPathPrefix + "." + metric.name().replace(' ', '.')).append(" ")
|
|
|
+ .append(metric.value()).append(" ").append(timestamp)
|
|
|
+ .append("\n");
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public void flush() {
|
|
|
+ try {
|
|
|
+ graphite.write(lines.toString());
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.warn("Error sending metrics to Graphite.", e);
|
|
|
try {
|
|
|
- graphite.flush();
|
|
|
- } catch (Exception e) {
|
|
|
- LOG.warn("Error flushing metrics to Graphite", e);
|
|
|
- try {
|
|
|
- graphite.close();
|
|
|
- } catch (Exception e1) {
|
|
|
- throw new MetricsException("Error closing connection to Graphite", e1);
|
|
|
- }
|
|
|
+ graphite.close();
|
|
|
+ } catch (Exception e1) {
|
|
|
+ throw new MetricsException("Error closing connection to Graphite", e1);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- @Override
|
|
|
- public void close() throws IOException {
|
|
|
- graphite.close();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void flush() {
|
|
|
+ try {
|
|
|
+ graphite.flush();
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.warn("Error flushing metrics to Graphite.", e);
|
|
|
+ try {
|
|
|
+ graphite.close();
|
|
|
+ } catch (Exception e1) {
|
|
|
+ throw new MetricsException("Error closing connection to Graphite.", e1);
|
|
|
+ }
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- public static class Graphite {
|
|
|
- private final static int MAX_CONNECTION_FAILURES = 5;
|
|
|
+ @Override
|
|
|
+ public void close() throws IOException {
|
|
|
+ graphite.close();
|
|
|
+ }
|
|
|
|
|
|
- private String serverHost;
|
|
|
- private int serverPort;
|
|
|
- private Writer writer = null;
|
|
|
- private Socket socket = null;
|
|
|
- private int connectionFailures = 0;
|
|
|
+ public static class Graphite {
|
|
|
+ private final static int MAX_CONNECTION_FAILURES = 5;
|
|
|
|
|
|
- public Graphite(String serverHost, int serverPort) {
|
|
|
- this.serverHost = serverHost;
|
|
|
- this.serverPort = serverPort;
|
|
|
- }
|
|
|
+ private String serverHost;
|
|
|
+ private int serverPort;
|
|
|
+ private Writer writer = null;
|
|
|
+ private Socket socket = null;
|
|
|
+ private int connectionFailures = 0;
|
|
|
|
|
|
- public void connect() {
|
|
|
- if (isConnected()) {
|
|
|
- throw new MetricsException("Already connected to Graphite");
|
|
|
- }
|
|
|
- if (tooManyConnectionFailures()) {
|
|
|
- // return silently (there was ERROR in logs when we reached limit for the first time)
|
|
|
- return;
|
|
|
- }
|
|
|
- try {
|
|
|
+ public Graphite(String serverHost, int serverPort) {
|
|
|
+ this.serverHost = serverHost;
|
|
|
+ this.serverPort = serverPort;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void connect() {
|
|
|
+ if (isConnected()) {
|
|
|
+ throw new MetricsException("Already connected to Graphite");
|
|
|
+ }
|
|
|
+ if (tooManyConnectionFailures()) {
|
|
|
+ // return silently (there was ERROR in logs when we reached limit for the first time)
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ try {
|
|
|
// Open a connection to Graphite server.
|
|
|
- socket = new Socket(serverHost, serverPort);
|
|
|
+ socket = new Socket(serverHost, serverPort);
|
|
|
writer = new OutputStreamWriter(socket.getOutputStream(),
|
|
|
StandardCharsets.UTF_8);
|
|
|
- } catch (Exception e) {
|
|
|
- connectionFailures++;
|
|
|
- if (tooManyConnectionFailures()) {
|
|
|
- // first time when connection limit reached, report to logs
|
|
|
- LOG.error("Too many connection failures, would not try to connect again.");
|
|
|
- }
|
|
|
- throw new MetricsException("Error creating connection, "
|
|
|
- + serverHost + ":" + serverPort, e);
|
|
|
+ } catch (Exception e) {
|
|
|
+ connectionFailures++;
|
|
|
+ if (tooManyConnectionFailures()) {
|
|
|
+ // first time when connection limit reached, report to logs
|
|
|
+ LOG.error("Too many connection failures, would not try to connect again.");
|
|
|
}
|
|
|
+ throw new MetricsException("Error creating connection, " +
|
|
|
+ serverHost + ":" + serverPort, e);
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- public void write(String msg) throws IOException {
|
|
|
- if (!isConnected()) {
|
|
|
- connect();
|
|
|
- }
|
|
|
- if (isConnected()) {
|
|
|
- writer.write(msg);
|
|
|
- }
|
|
|
+ public void write(String msg) throws IOException {
|
|
|
+ if (!isConnected()) {
|
|
|
+ connect();
|
|
|
}
|
|
|
-
|
|
|
- public void flush() throws IOException {
|
|
|
- if (isConnected()) {
|
|
|
- writer.flush();
|
|
|
- }
|
|
|
+ if (isConnected()) {
|
|
|
+ writer.write(msg);
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- public boolean isConnected() {
|
|
|
- return socket != null && socket.isConnected() && !socket.isClosed();
|
|
|
+ public void flush() throws IOException {
|
|
|
+ if (isConnected()) {
|
|
|
+ writer.flush();
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
- public void close() throws IOException {
|
|
|
- try {
|
|
|
- if (writer != null) {
|
|
|
- writer.close();
|
|
|
- }
|
|
|
- } catch (IOException ex) {
|
|
|
- if (socket != null) {
|
|
|
- socket.close();
|
|
|
- }
|
|
|
- } finally {
|
|
|
- socket = null;
|
|
|
- writer = null;
|
|
|
- }
|
|
|
- }
|
|
|
+ public boolean isConnected() {
|
|
|
+ return socket != null && socket.isConnected() && !socket.isClosed();
|
|
|
+ }
|
|
|
|
|
|
- private boolean tooManyConnectionFailures() {
|
|
|
- return connectionFailures > MAX_CONNECTION_FAILURES;
|
|
|
+ public void close() throws IOException {
|
|
|
+ try {
|
|
|
+ if (writer != null) {
|
|
|
+ writer.close();
|
|
|
+ }
|
|
|
+ } catch (IOException ex) {
|
|
|
+ if (socket != null) {
|
|
|
+ socket.close();
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ socket = null;
|
|
|
+ writer = null;
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
+ private boolean tooManyConnectionFailures() {
|
|
|
+ return connectionFailures > MAX_CONNECTION_FAILURES;
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
+ @VisibleForTesting
|
|
|
+ void setGraphite(Graphite graphite) {
|
|
|
+ this.graphite = graphite;
|
|
|
+ }
|
|
|
}
|