Browse source

AMBARI-9970. Metrics are absent for Storm. (mpapyrkovskyy via swagle)

Siddharth Wagle 10 years ago
parent
commit
61839b3dcf

+ 8 - 18
ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java

@@ -19,10 +19,6 @@ package org.apache.hadoop.metrics2.sink.timeline;
 
 import java.io.IOException;
 import java.net.ConnectException;
-import java.net.SocketAddress;
-
-import java.io.IOException;
-import java.net.SocketAddress;
 
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.methods.PostMethod;
@@ -63,19 +59,15 @@ public abstract class AbstractTimelineMetricsSink {
     try {
       String jsonData = mapper.writeValueAsString(metrics);
 
-      SocketAddress socketAddress = getServerSocketAddress();
+      StringRequestEntity requestEntity = new StringRequestEntity(jsonData, "application/json", "UTF-8");
 
-      if (socketAddress != null) {
-        StringRequestEntity requestEntity = new StringRequestEntity(jsonData, "application/json", "UTF-8");
-
-        PostMethod postMethod = new PostMethod(connectUrl);
-        postMethod.setRequestEntity(requestEntity);
-        int statusCode = httpClient.executeMethod(postMethod);
-        if (statusCode != 200) {
-          LOG.info("Unable to POST metrics to collector, " + connectUrl);
-        } else {
-          LOG.debug("Metrics posted to Collector " + connectUrl);
-        }
+      PostMethod postMethod = new PostMethod(connectUrl);
+      postMethod.setRequestEntity(requestEntity);
+      int statusCode = httpClient.executeMethod(postMethod);
+      if (statusCode != 200) {
+        LOG.info("Unable to POST metrics to collector, " + connectUrl);
+      } else {
+        LOG.debug("Metrics posted to Collector " + connectUrl);
       }
     } catch (ConnectException e) {
       throw new UnableToConnectException(e).setConnectUrl(connectUrl);
@@ -86,7 +78,5 @@ public abstract class AbstractTimelineMetricsSink {
     this.httpClient = httpClient;
   }
 
-  abstract protected SocketAddress getServerSocketAddress();
-
   abstract protected String getCollectorUri();
 }
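Taken together, the three hunks above collapse the sink to a single abstract hook: the POST is attempted unconditionally against the collector URI, and a refused connection surfaces as UnableToConnectException instead of being skipped whenever no socket address was resolved. Below is a condensed, hypothetical reconstruction of the class for orientation only; the emitMetrics signature, the logger field, and the stand-in exception at the end are assumptions not visible in the hunks.

import java.io.IOException;
import java.net.ConnectException;

import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.httpclient.methods.StringRequestEntity;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

// Condensed reconstruction for orientation, not the real file. The real emitMetrics()
// first serializes a TimelineMetrics object with a Jackson ObjectMapper (see the
// mapper.writeValueAsString(metrics) context line above); here a JSON string is taken
// directly so the sketch stays self-contained.
public abstract class TimelineMetricsSinkSketch {
  private static final Log LOG = LogFactory.getLog(TimelineMetricsSinkSketch.class);
  private HttpClient httpClient = new HttpClient();

  // The only hook subclasses still implement; getServerSocketAddress() is gone.
  protected abstract String getCollectorUri();

  protected void emitMetrics(String jsonData) throws IOException {
    String connectUrl = getCollectorUri();
    try {
      StringRequestEntity requestEntity =
          new StringRequestEntity(jsonData, "application/json", "UTF-8");

      // The POST is attempted unconditionally; there is no socket-address guard anymore.
      PostMethod postMethod = new PostMethod(connectUrl);
      postMethod.setRequestEntity(requestEntity);
      int statusCode = httpClient.executeMethod(postMethod);
      if (statusCode != 200) {
        LOG.info("Unable to POST metrics to collector, " + connectUrl);
      } else {
        LOG.debug("Metrics posted to Collector " + connectUrl);
      }
    } catch (ConnectException e) {
      // The real class throws its own UnableToConnectException here; a stand-in is used
      // so this sketch compiles on its own.
      throw new IllegalStateException("Unable to connect to collector at " + connectUrl, e);
    }
  }

  public void setHttpClient(HttpClient httpClient) {
    this.httpClient = httpClient;
  }
}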

+ 0 - 4
ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java

@@ -64,10 +64,6 @@ public class HandleConnectExceptionTest {
     }
   }
   class TestTimelineMetricsSink extends AbstractTimelineMetricsSink{
-    @Override
-    protected SocketAddress getServerSocketAddress() {
-      return new InetSocketAddress("host", 13);
-    }
     @Override
     protected String getCollectorUri() {
       return COLLECTOR_URL;

+ 0 - 13
ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java

@@ -30,11 +30,9 @@ import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
 import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
-import org.apache.hadoop.metrics2.sink.util.Servers;
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
@@ -50,7 +48,6 @@ import java.util.concurrent.TimeUnit;
 
 
 public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implements MonitorService {
-  private SocketAddress socketAddress;
   private String collectorUri;
   private TimelineMetricsCache metricsCache;
   private ScheduledExecutorService scheduledExecutorService;
@@ -94,22 +91,12 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     String collectorHostname = configuration.getProperty(COLLECTOR_HOST_PROPERTY);
     String port = configuration.getProperty(COLLECTOR_PORT_PROPERTY);
     collectorUri = "http://" + collectorHostname + ":" + port + "/ws/v1/timeline/metrics";
-    List<InetSocketAddress> socketAddresses =
-      Servers.parse(collectorHostname, Integer.valueOf(port));
-    if (socketAddresses != null && !socketAddresses.isEmpty()) {
-      socketAddress = socketAddresses.get(0);
-    }
     pollFrequency = Long.parseLong(configuration.getProperty("collectionFrequency"));
 
     String[] metrics = configuration.getProperty(COUNTER_METRICS_PROPERTY).trim().split(",");
     Collections.addAll(counterMetrics, metrics);
   }
 
-  @Override
-  public SocketAddress getServerSocketAddress() {
-    return socketAddress;
-  }
-
   @Override
   public String getCollectorUri() {
     return collectorUri;
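The Flume change above is the template repeated in the Hadoop, Kafka, and Storm sinks that follow: the socketAddress field, the Servers.parse(...) resolution, and the getServerSocketAddress() override are all dropped, and configuration only has to produce the collector URI. A minimal hypothetical subclass showing the reduced contract, assuming getCollectorUri() is the only abstract hook as the hunks above indicate; the class name and configure method are invented for illustration.

import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;

// Hypothetical example of the subclass contract after this patch: only getCollectorUri()
// remains to be implemented.
public class ExampleTimelineMetricsSink extends AbstractTimelineMetricsSink {
  private String collectorUri;

  // Host and port would come from the sink-specific configuration, as in the sinks above.
  public void configure(String collectorHostname, String port) {
    collectorUri = "http://" + collectorHostname + ":" + port + "/ws/v1/timeline/metrics";
  }

  @Override
  protected String getCollectorUri() {
    return collectorUri;
  }
}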

+ 0 - 7
ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java

@@ -131,13 +131,6 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
     return conf.getPrefix();
   }
 
-  protected SocketAddress getServerSocketAddress() {
-    if (metricsServers != null && !metricsServers.isEmpty()) {
-      return metricsServers.get(0);
-    }
-    return null;
-  }
-
   @Override
   protected String getCollectorUri() {
     return collectorUri;

+ 0 - 15
ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java

@@ -20,8 +20,6 @@ package org.apache.hadoop.metrics2.sink.kafka;
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
@@ -40,7 +38,6 @@ import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
-import org.apache.hadoop.metrics2.sink.util.Servers;
 
 import com.yammer.metrics.Metrics;
 import com.yammer.metrics.core.Counter;
@@ -73,15 +70,9 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im
   private final Object lock = new Object();
   private String collectorUri;
   private String hostname;
-  private SocketAddress socketAddress;
   private TimelineScheduledReporter reporter;
   private TimelineMetricsCache metricsCache;
 
-  @Override
-  protected SocketAddress getServerSocketAddress() {
-    return socketAddress;
-  }
-
   @Override
   protected String getCollectorUri() {
     return collectorUri;
@@ -110,18 +101,12 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im
         String metricCollectorPort = props.getString(TIMELINE_PORT_PROPERTY, TIMELINE_DEFAULT_PORT);
         setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval));
         collectorUri = "http://" + metricCollectorHost + ":" + metricCollectorPort + "/ws/v1/timeline/metrics";
-        List<InetSocketAddress> socketAddresses = Servers.parse(metricCollectorHost,
-            Integer.parseInt(metricCollectorPort));
-        if (socketAddresses != null && !socketAddresses.isEmpty()) {
-          socketAddress = socketAddresses.get(0);
-        }
         initializeReporter();
         if (props.getBoolean(TIMELINE_REPORTER_ENABLED_PROPERTY, false)) {
           startReporter(metricsConfig.pollingIntervalSecs());
         }
         if (LOG.isTraceEnabled()) {
           LOG.trace("CollectorUri = " + collectorUri);
-          LOG.trace("SocketAddress = " + socketAddress);
           LOG.trace("MetricsSendInterval = " + metricsSendInterval);
           LOG.trace("MaxRowCacheSize = " + maxRowCacheSize);
         }

+ 6 - 15
ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java

@@ -29,11 +29,9 @@ import org.apache.commons.lang.Validate;
 import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.sink.util.Servers;
+import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
 
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.List;
@@ -48,7 +46,6 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
   public static final String APP_ID = "appId";
 
   private String hostname;
-  private SocketAddress socketAddress;
   private String collectorUri;
   private NimbusClient nimbusClient;
   private String applicationId;
@@ -57,11 +54,6 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
 
   }
 
-  @Override
-  protected SocketAddress getServerSocketAddress() {
-    return this.socketAddress;
-  }
-
   @Override
   protected String getCollectorUri() {
     return this.collectorUri;
@@ -85,11 +77,6 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
       String port = cf.get(COLLECTOR_PORT).toString();
       applicationId = cf.get(APP_ID).toString();
       collectorUri = "http://" + collectorHostname + ":" + port + "/ws/v1/timeline/metrics";
-      List<InetSocketAddress> socketAddresses =
-        Servers.parse(collectorHostname, Integer.valueOf(port));
-      if (socketAddresses != null && !socketAddresses.isEmpty()) {
-        socketAddress = socketAddresses.get(0);
-      }
     } catch (Exception e) {
       LOG.warn("Could not initialize metrics collector, please specify host, " +
         "port under $STORM_HOME/conf/config.yaml ", e);
@@ -139,7 +126,11 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
     TimelineMetrics timelineMetrics = new TimelineMetrics();
     timelineMetrics.setMetrics(totalMetrics);
 
-    emitMetrics(timelineMetrics);
+    try {
+      emitMetrics(timelineMetrics);
+    } catch (UnableToConnectException e) {
+      LOG.warn("Unable to connect to Metrics Collector " + e.getConnectUrl() + ". " + e.getMessage());
+    }
 
   }
 
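The Storm reporter is the one sink that also changes behavior: the emitMetrics() call is now wrapped so that an unreachable collector produces a warning and the reporter keeps running, rather than letting the exception escape. A small self-contained sketch of that guard follows; the emit(...) method below is a hypothetical stand-in for the real protected emitMetrics(...).

import java.net.ConnectException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;

// Hypothetical stand-in: emit(...) plays the role of AbstractTimelineMetricsSink.emitMetrics(...),
// which signals an unreachable collector with UnableToConnectException.
public class GuardedEmitExample {
  private static final Log LOG = LogFactory.getLog(GuardedEmitExample.class);

  void emit(TimelineMetrics metrics) throws UnableToConnectException {
    // Placeholder for the real HTTP POST; pretend the collector is down.
    UnableToConnectException e = new UnableToConnectException(new ConnectException("refused"));
    e.setConnectUrl("http://collector.example:6188/ws/v1/timeline/metrics"); // hypothetical URL
    throw e;
  }

  void report(TimelineMetrics timelineMetrics) {
    try {
      emit(timelineMetrics);
    } catch (UnableToConnectException e) {
      // Log and continue; the next reporting cycle retries instead of failing the topology.
      LOG.warn("Unable to connect to Metrics Collector " + e.getConnectUrl() + ". " + e.getMessage());
    }
  }
}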

+ 0 - 17
ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java

@@ -30,12 +30,9 @@ import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
 import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
-import org.apache.hadoop.metrics2.sink.util.Servers;
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -43,16 +40,10 @@ import java.util.List;
 import java.util.Map;
 
 public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implements IMetricsConsumer {
-  private SocketAddress socketAddress;
   private String collectorUri;
   private TimelineMetricsCache metricsCache;
   private String hostname;
 
-  @Override
-  protected SocketAddress getServerSocketAddress() {
-    return socketAddress;
-  }
-
   @Override
   protected String getCollectorUri() {
     return collectorUri;
@@ -74,11 +65,6 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
         String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS)));
     metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval);
     collectorUri = "http://" + configuration.getProperty(COLLECTOR_HOST_PROPERTY) + ":" + configuration.getProperty(COLLECTOR_PORT_PROPERTY) + "/ws/v1/timeline/metrics";
-    List<InetSocketAddress> socketAddresses =
-        Servers.parse(configuration.getProperty(configuration.getProperty(COLLECTOR_HOST_PROPERTY)), Integer.valueOf(configuration.getProperty(COLLECTOR_PORT_PROPERTY)));
-    if (socketAddresses != null && !socketAddresses.isEmpty()) {
-      socketAddress = socketAddresses.get(0);
-    }
   }
 
   @Override
@@ -134,7 +120,4 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     this.metricsCache = metricsCache;
   }
 
-  public void setServerSocketAddress(SocketAddress socketAddress) {
-    this.socketAddress = socketAddress;
-  }
 }

+ 0 - 1
ambari-metrics/ambari-metrics-storm-sink/src/test/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSinkTest.java

@@ -64,7 +64,6 @@ public class StormTimelineMetricsSinkTest {
     HttpClient httpClient = createNiceMock(HttpClient.class);
     HttpClient httpClient = createNiceMock(HttpClient.class);
     stormTimelineMetricsSink.setHttpClient(httpClient);
     stormTimelineMetricsSink.setHttpClient(httpClient);
     expect(httpClient.executeMethod(anyObject(PostMethod.class))).andReturn(200).once();
     expect(httpClient.executeMethod(anyObject(PostMethod.class))).andReturn(200).once();
-    stormTimelineMetricsSink.setServerSocketAddress(createNiceMock(SocketAddress.class));
     replay(timelineMetricsCache, httpClient);
     replay(timelineMetricsCache, httpClient);
     stormTimelineMetricsSink.handleDataPoints(
     stormTimelineMetricsSink.handleDataPoints(
         new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
         new IMetricsConsumer.TaskInfo("localhost", 1234, "testComponent", 42, 20000L, 60),
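With setServerSocketAddress() gone, the unit test only has to mock the HTTP layer. A hypothetical condensed version of such a test is sketched below, assuming EasyMock, a no-arg StormTimelineMetricsSink constructor, and a public setHttpClient(...) as used in the test above; the class and method names here are invented.

import static org.easymock.EasyMock.anyObject;
import static org.easymock.EasyMock.createNiceMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.replay;

import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.hadoop.metrics2.sink.storm.StormTimelineMetricsSink;
import org.junit.Test;

// Hypothetical condensed test: the socket-address mock is no longer part of the setup.
public class StormSinkPostSketchTest {

  @Test
  public void postsMetricsWithOnlyAnHttpClientMock() throws Exception {
    StormTimelineMetricsSink sink = new StormTimelineMetricsSink();
    HttpClient httpClient = createNiceMock(HttpClient.class);
    sink.setHttpClient(httpClient);
    expect(httpClient.executeMethod(anyObject(PostMethod.class))).andReturn(200).once();
    replay(httpClient);
    // sink.handleDataPoints(...) would be driven here exactly as in the real test.
  }
}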