|
@@ -18,13 +18,18 @@
|
|
|
|
|
|
package org.apache.hadoop.metrics2.sink;
|
|
|
|
|
|
+import java.io.IOException;
|
|
|
import java.io.OutputStreamWriter;
|
|
|
import java.io.Writer;
|
|
|
+import java.io.Closeable;
|
|
|
import java.net.Socket;
|
|
|
|
|
|
import org.apache.commons.configuration.SubsetConfiguration;
|
|
|
+import org.apache.commons.logging.Log;
|
|
|
+import org.apache.commons.logging.LogFactory;
|
|
|
import org.apache.hadoop.classification.InterfaceAudience;
|
|
|
import org.apache.hadoop.classification.InterfaceStability;
|
|
|
+import org.apache.hadoop.io.IOUtils;
|
|
|
import org.apache.hadoop.metrics2.AbstractMetric;
|
|
|
import org.apache.hadoop.metrics2.MetricsException;
|
|
|
import org.apache.hadoop.metrics2.MetricsRecord;
|
|
@@ -36,12 +41,14 @@ import org.apache.hadoop.metrics2.MetricsTag;
|
|
|
*/
|
|
|
@InterfaceAudience.Public
|
|
|
@InterfaceStability.Evolving
|
|
|
-public class GraphiteSink implements MetricsSink {
|
|
|
+public class GraphiteSink implements MetricsSink, Closeable {
|
|
|
+ private static final Log LOG = LogFactory.getLog(GraphiteSink.class);
|
|
|
private static final String SERVER_HOST_KEY = "server_host";
|
|
|
private static final String SERVER_PORT_KEY = "server_port";
|
|
|
private static final String METRICS_PREFIX = "metrics_prefix";
|
|
|
private Writer writer = null;
|
|
|
private String metricsPrefix = null;
|
|
|
+ private Socket socket = null;
|
|
|
|
|
|
public void setWriter(Writer writer) {
|
|
|
this.writer = writer;
|
|
@@ -60,7 +67,7 @@ public class GraphiteSink implements MetricsSink {
|
|
|
|
|
|
try {
|
|
|
// Open an connection to Graphite server.
|
|
|
- Socket socket = new Socket(serverHost, serverPort);
|
|
|
+ socket = new Socket(serverHost, serverPort);
|
|
|
setWriter(new OutputStreamWriter(socket.getOutputStream()));
|
|
|
} catch (Exception e) {
|
|
|
throw new MetricsException("Error creating connection, "
|
|
@@ -99,7 +106,11 @@ public class GraphiteSink implements MetricsSink {
|
|
|
}
|
|
|
|
|
|
try {
|
|
|
- writer.write(lines.toString());
|
|
|
+ if(writer != null){
|
|
|
+ writer.write(lines.toString());
|
|
|
+ } else {
|
|
|
+ throw new MetricsException("Writer in GraphiteSink is null!");
|
|
|
+ }
|
|
|
} catch (Exception e) {
|
|
|
throw new MetricsException("Error sending metrics", e);
|
|
|
}
|
|
@@ -113,4 +124,21 @@ public class GraphiteSink implements MetricsSink {
|
|
|
throw new MetricsException("Error flushing metrics", e);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void close() throws IOException {
|
|
|
+ try {
|
|
|
+ IOUtils.closeStream(writer);
|
|
|
+ writer = null;
|
|
|
+ LOG.info("writer in GraphiteSink is closed!");
|
|
|
+ } catch (Throwable e){
|
|
|
+ throw new MetricsException("Error closing writer", e);
|
|
|
+ } finally {
|
|
|
+ if (socket != null && !socket.isClosed()) {
|
|
|
+ socket.close();
|
|
|
+ socket = null;
|
|
|
+ LOG.info("socket in GraphiteSink is closed!");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|