Browse Source

HADOOP 9704. Write metrics sink plugin for Hadoop/Graphite (Chu Tong, Alex Newman and Babak Behzad via raviprak)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1599413 13f79535-47bb-0310-9956-ffa450edef68
Ravi Prakash 11 years ago
parent
commit
ad5d0d7167

+ 2 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -371,6 +371,8 @@ Release 2.5.0 - UNRELEASED
 
     HADOOP-10498. Add support for proxy server. (daryn)
 
+    HADOOP-9704. Write metrics sink plugin for Hadoop/Graphite (Chu Tong, Alex Newman and Babak Behzad via raviprak)
+
   IMPROVEMENTS
 
     HADOOP-10451. Remove unused field and imports from SaslRpcServer.

+ 116 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/GraphiteSink.java

@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.sink;
+
+import java.io.OutputStreamWriter;
+import java.io.Writer;
+import java.net.Socket;
+
+import org.apache.commons.configuration.SubsetConfiguration;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsException;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsSink;
+import org.apache.hadoop.metrics2.MetricsTag;
+
+/**
+ * A metrics sink that writes to a Graphite server
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class GraphiteSink implements MetricsSink {
+    private static final String SERVER_HOST_KEY = "server_host";
+    private static final String SERVER_PORT_KEY = "server_port";
+    private static final String METRICS_PREFIX = "metrics_prefix";
+    private Writer writer = null;
+    private String metricsPrefix = null;
+
+    public void setWriter(Writer writer) {
+        this.writer = writer;
+    }
+
+    @Override
+    public void init(SubsetConfiguration conf) {
+        // Get Graphite host configurations.
+        String serverHost = conf.getString(SERVER_HOST_KEY);
+        Integer serverPort = Integer.parseInt(conf.getString(SERVER_PORT_KEY));
+
+        // Get Graphite metrics graph prefix.
+        metricsPrefix = conf.getString(METRICS_PREFIX);
+        if (metricsPrefix == null)
+            metricsPrefix = "";
+
+        try {
+            // Open an connection to Graphite server.
+            Socket socket = new Socket(serverHost, serverPort);
+            setWriter(new OutputStreamWriter(socket.getOutputStream()));
+        } catch (Exception e) {
+            throw new MetricsException("Error creating connection, "
+                    + serverHost + ":" + serverPort, e);
+        }
+    }
+
+    @Override
+    public void putMetrics(MetricsRecord record) {
+        StringBuilder lines = new StringBuilder();
+        StringBuilder metricsPathPrefix = new StringBuilder();
+
+        // Configure the hierarchical place to display the graph.
+        metricsPathPrefix.append(metricsPrefix).append(".")
+                .append(record.context()).append(".").append(record.name());
+
+        for (MetricsTag tag : record.tags()) {
+            if (tag.value() != null) {
+                metricsPathPrefix.append(".");
+                metricsPathPrefix.append(tag.name());
+                metricsPathPrefix.append("=");
+                metricsPathPrefix.append(tag.value());
+            }
+        }
+
+        // Round the timestamp to second as Graphite accepts it in such format.
+        int timestamp = Math.round(record.timestamp() / 1000.0f);
+
+        // Collect datapoints.
+        for (AbstractMetric metric : record.metrics()) {
+            lines.append(
+                    metricsPathPrefix.toString() + "."
+                            + metric.name().replace(' ', '.')).append(" ")
+                    .append(metric.value()).append(" ").append(timestamp)
+                    .append("\n");
+        }
+
+        try {
+            writer.write(lines.toString());
+        } catch (Exception e) {
+            throw new MetricsException("Error sending metrics", e);
+        }
+    }
+
+    @Override
+    public void flush() {
+        try {
+            writer.flush();
+        } catch (Exception e) {
+            throw new MetricsException("Error flushing metrics", e);
+        }
+    }
+}

+ 110 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGraphiteMetrics.java

@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.metrics2.impl;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import static org.mockito.Mockito.*;
+
+import org.apache.hadoop.metrics2.AbstractMetric;
+import org.apache.hadoop.metrics2.MetricsRecord;
+import org.apache.hadoop.metrics2.MetricsTag;
+import org.apache.hadoop.metrics2.sink.GraphiteSink;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+public class TestGraphiteMetrics {
+    private AbstractMetric makeMetric(String name, Number value) {
+        AbstractMetric metric = mock(AbstractMetric.class);
+        when(metric.name()).thenReturn(name);
+        when(metric.value()).thenReturn(value);
+        return metric;
+    }
+
+    @Test
+    public void testPutMetrics() {
+        GraphiteSink sink = new GraphiteSink();
+        List<MetricsTag> tags = new ArrayList<MetricsTag>();
+        tags.add(new MetricsTag(MsInfo.Context, "all"));
+        tags.add(new MetricsTag(MsInfo.Hostname, "host"));
+        Set<AbstractMetric> metrics = new HashSet<AbstractMetric>();
+        metrics.add(makeMetric("foo1", 1.25));
+        metrics.add(makeMetric("foo2", 2.25));
+        MetricsRecord record = new MetricsRecordImpl(MsInfo.Context, (long) 10000, tags, metrics);
+
+        OutputStreamWriter writer = mock(OutputStreamWriter.class);
+        ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
+
+        sink.setWriter(writer);
+        sink.putMetrics(record);
+
+        try {
+            verify(writer).write(argument.capture());
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        String result = argument.getValue().toString();
+
+        assertEquals(true,
+            result.equals("null.all.Context.Context=all.Hostname=host.foo1 1.25 10\n" +
+            "null.all.Context.Context=all.Hostname=host.foo2 2.25 10\n") ||
+            result.equals("null.all.Context.Context=all.Hostname=host.foo2 2.25 10\n" + 
+            "null.all.Context.Context=all.Hostname=host.foo1 1.25 10\n"));
+    }
+
+    @Test
+    public void testPutMetrics2() {
+        GraphiteSink sink = new GraphiteSink();
+        List<MetricsTag> tags = new ArrayList<MetricsTag>();
+        tags.add(new MetricsTag(MsInfo.Context, "all"));
+        tags.add(new MetricsTag(MsInfo.Hostname, null));
+        Set<AbstractMetric> metrics = new HashSet<AbstractMetric>();
+        metrics.add(makeMetric("foo1", 1));
+        metrics.add(makeMetric("foo2", 2));
+        MetricsRecord record = new MetricsRecordImpl(MsInfo.Context, (long) 10000, tags, metrics);
+
+        OutputStreamWriter writer = mock(OutputStreamWriter.class);
+        ArgumentCaptor<String> argument = ArgumentCaptor.forClass(String.class);
+
+        sink.setWriter(writer);
+        sink.putMetrics(record);
+
+        try {
+            verify(writer).write(argument.capture());
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        String result = argument.getValue().toString();
+
+        assertEquals(true,
+            result.equals("null.all.Context.Context=all.foo1 1 10\n" + 
+            "null.all.Context.Context=all.foo2 2 10\n") ||
+            result.equals("null.all.Context.Context=all.foo2 2 10\n" + 
+            "null.all.Context.Context=all.foo1 1 10\n"));
+    }
+}