瀏覽代碼

AMBARI-10904. Provide a configurable timeout setting on MetricsTimelineSink.emitMetrics. (swagle)

Siddharth Wagle 10 年之前
父節點
當前提交
5226ae1be4

+ 8 - 3
ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java

@@ -17,26 +17,28 @@
  */
 package org.apache.hadoop.metrics2.sink.timeline;
 
-import java.io.IOException;
-import java.net.ConnectException;
-
 import org.apache.commons.httpclient.HttpClient;
 import org.apache.commons.httpclient.methods.PostMethod;
 import org.apache.commons.httpclient.methods.StringRequestEntity;
+import org.apache.commons.httpclient.params.HttpMethodParams;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.codehaus.jackson.map.AnnotationIntrospector;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 import org.codehaus.jackson.xc.JaxbAnnotationIntrospector;
+import java.io.IOException;
+import java.net.ConnectException;
 
 public abstract class AbstractTimelineMetricsSink {
   public static final String TAGS_FOR_PREFIX_PROPERTY_PREFIX = "tagsForPrefix.";
   public static final String MAX_METRIC_ROW_CACHE_SIZE = "maxRowCacheSize";
   public static final String METRICS_SEND_INTERVAL = "sendInterval";
+  public static final String METRICS_POST_TIMEOUT_SECONDS = "timeout";
   public static final String COLLECTOR_HOST_PROPERTY = "collector";
   public static final String COLLECTOR_PORT_PROPERTY = "port";
 
+  protected static final int DEFAULT_POST_TIMEOUT_SECONDS = 10;
   protected final Log LOG;
   private HttpClient httpClient = new HttpClient();
 
@@ -63,6 +65,7 @@ public abstract class AbstractTimelineMetricsSink {
 
       PostMethod postMethod = new PostMethod(connectUrl);
       postMethod.setRequestEntity(requestEntity);
+      postMethod.setParameter(HttpMethodParams.SO_TIMEOUT, String.valueOf(getTimeoutSeconds() * 1000));
       int statusCode = httpClient.executeMethod(postMethod);
       if (statusCode != 200) {
         LOG.info("Unable to POST metrics to collector, " + connectUrl);
@@ -79,4 +82,6 @@ public abstract class AbstractTimelineMetricsSink {
   }
 
   abstract protected String getCollectorUri();
+
+  abstract protected int getTimeoutSeconds();
 }

+ 6 - 0
ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java

@@ -68,6 +68,12 @@ public class HandleConnectExceptionTest {
     protected String getCollectorUri() {
       return COLLECTOR_URL;
     }
+
+    @Override
+    protected int getTimeoutSeconds() {
+      return 10;
+    }
+
     @Override
     public void emitMetrics(TimelineMetrics metrics) throws IOException {
       super.emitMetrics(metrics);

+ 9 - 4
ambari-metrics/ambari-metrics-flume-sink/src/main/java/org/apache/hadoop/metrics2/sink/flume/FlumeTimelineMetricsSink.java

@@ -24,16 +24,15 @@ import org.apache.flume.Context;
 import org.apache.flume.FlumeException;
 import org.apache.flume.instrumentation.MonitorService;
 import org.apache.flume.instrumentation.util.JMXPollUtil;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
 import org.apache.hadoop.metrics2.sink.timeline.UnableToConnectException;
 import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
 import org.apache.hadoop.metrics2.sink.timeline.configuration.Configuration;
 
 import java.io.IOException;
 import java.net.InetAddress;
-import java.net.SocketAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -45,8 +44,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
-
-
 public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implements MonitorService {
   private String collectorUri;
   private TimelineMetricsCache metricsCache;
@@ -55,6 +52,7 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
   private String hostname;
   private final static String COUNTER_METRICS_PROPERTY = "counters";
   private final Set<String> counterMetrics = new HashSet<String>();
+  private int timeoutSeconds = 10;
 
   @Override
   public void start() {
@@ -83,6 +81,8 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
       throw new FlumeException("Could not identify hostname.", e);
     }
     Configuration configuration = new Configuration("/flume-metrics2.properties");
+    timeoutSeconds = Integer.parseInt(configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS,
+        String.valueOf(DEFAULT_POST_TIMEOUT_SECONDS)));
     int maxRowCacheSize = Integer.parseInt(configuration.getProperty(MAX_METRIC_ROW_CACHE_SIZE,
         String.valueOf(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT)));
     int metricsSendInterval = Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL,
@@ -102,6 +102,11 @@ public class FlumeTimelineMetricsSink extends AbstractTimelineMetricsSink implem
     return collectorUri;
   }
 
+  @Override
+  protected int getTimeoutSeconds() {
+    return timeoutSeconds;
+  }
+
   public void setPollFrequency(long pollFrequency) {
     this.pollFrequency = pollFrequency;
   }

+ 8 - 0
ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java

@@ -56,6 +56,7 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
   private String collectorUri;
   private static final String SERVICE_NAME_PREFIX = "serviceName-prefix";
   private static final String SERVICE_NAME = "serviceName";
+  private int timeoutSeconds = 10;
 
   @Override
   public void init(SubsetConfiguration conf) {
@@ -91,6 +92,8 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
 
     LOG.info("Collector Uri: " + collectorUri);
 
+    timeoutSeconds = conf.getInt(METRICS_POST_TIMEOUT_SECONDS, DEFAULT_POST_TIMEOUT_SECONDS);
+
     int maxRowCacheSize = conf.getInt(MAX_METRIC_ROW_CACHE_SIZE,
       TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT);
     int metricsSendInterval = conf.getInt(METRICS_SEND_INTERVAL,
@@ -150,6 +153,11 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
     return collectorUri;
   }
 
+  @Override
+  protected int getTimeoutSeconds() {
+    return timeoutSeconds;
+  }
+
   @Override
   public void putMetrics(MetricsRecord record) {
     try {

+ 33 - 27
ambari-metrics/ambari-metrics-kafka-sink/src/main/java/org/apache/hadoop/metrics2/sink/kafka/KafkaTimelineMetricsReporter.java

@@ -18,27 +18,6 @@
 
 package org.apache.hadoop.metrics2.sink.kafka;
 
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
-import kafka.metrics.KafkaMetricsConfig;
-import kafka.metrics.KafkaMetricsReporter;
-import kafka.utils.VerifiableProperties;
-
-import org.apache.commons.lang.ClassUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
-import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
-import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
-
 import com.yammer.metrics.Metrics;
 import com.yammer.metrics.core.Counter;
 import com.yammer.metrics.core.Gauge;
@@ -51,9 +30,31 @@ import com.yammer.metrics.core.MetricsRegistry;
 import com.yammer.metrics.core.Summarizable;
 import com.yammer.metrics.core.Timer;
 import com.yammer.metrics.stats.Snapshot;
+import kafka.metrics.KafkaMetricsConfig;
+import kafka.metrics.KafkaMetricsReporter;
+import kafka.utils.VerifiableProperties;
+import org.apache.commons.lang.ClassUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
+import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS;
+import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT;
 
-public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink implements KafkaMetricsReporter,
-    KafkaTimelineMetricsReporterMBean {
+public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink
+    implements KafkaMetricsReporter, KafkaTimelineMetricsReporterMBean {
 
   private final static Log LOG = LogFactory.getLog(KafkaTimelineMetricsReporter.class);
 
@@ -72,12 +73,18 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im
   private String hostname;
   private TimelineScheduledReporter reporter;
   private TimelineMetricsCache metricsCache;
+  private int timeoutSeconds = 10;
 
   @Override
   protected String getCollectorUri() {
     return collectorUri;
   }
 
+  @Override
+  protected int getTimeoutSeconds() {
+    return timeoutSeconds;
+  }
+
   public void setMetricsCache(TimelineMetricsCache metricsCache) {
     this.metricsCache = metricsCache;
   }
@@ -93,10 +100,9 @@ public class KafkaTimelineMetricsReporter extends AbstractTimelineMetricsSink im
           throw new RuntimeException("Could not identify hostname.", e);
         }
         KafkaMetricsConfig metricsConfig = new KafkaMetricsConfig(props);
-        int metricsSendInterval = Integer.parseInt(props.getString(TIMELINE_METRICS_SEND_INTERVAL_PROPERTY,
-            String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS)));
-        int maxRowCacheSize = Integer.parseInt(props.getString(TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY,
-            String.valueOf(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT)));
+        timeoutSeconds = props.getInt(METRICS_POST_TIMEOUT_SECONDS, DEFAULT_POST_TIMEOUT_SECONDS);
+        int metricsSendInterval = props.getInt(TIMELINE_METRICS_SEND_INTERVAL_PROPERTY, MAX_EVICTION_TIME_MILLIS);
+        int maxRowCacheSize = props.getInt(TIMELINE_METRICS_MAX_ROW_CACHE_SIZE_PROPERTY, MAX_RECS_PER_NAME_DEFAULT);
         String metricCollectorHost = props.getString(TIMELINE_HOST_PROPERTY, TIMELINE_DEFAULT_HOST);
         String metricCollectorPort = props.getString(TIMELINE_PORT_PROPERTY, TIMELINE_DEFAULT_PORT);
         setMetricsCache(new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval));

+ 9 - 0
ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsReporter.java

@@ -49,6 +49,7 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
   private String collectorUri;
   private NimbusClient nimbusClient;
   private String applicationId;
+  private int timeoutSeconds;
 
   public StormTimelineMetricsReporter() {
 
@@ -59,6 +60,11 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
     return this.collectorUri;
   }
 
+  @Override
+  protected int getTimeoutSeconds() {
+    return timeoutSeconds;
+  }
+
   @Override
   public void prepare(Map conf) {
     LOG.info("Preparing Storm Metrics Reporter");
@@ -75,6 +81,9 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
       this.nimbusClient = NimbusClient.getConfiguredClient(stormConf);
       String collectorHostname = cf.get(COLLECTOR_HOST).toString();
       String port = cf.get(COLLECTOR_PORT).toString();
+      timeoutSeconds = cf.get(METRICS_POST_TIMEOUT_SECONDS) != null ?
+        Integer.parseInt(cf.get(METRICS_POST_TIMEOUT_SECONDS).toString()) :
+        DEFAULT_POST_TIMEOUT_SECONDS;
       applicationId = cf.get(APP_ID).toString();
       collectorUri = "http://" + collectorHostname + ":" + port + "/ws/v1/timeline/metrics";
     } catch (Exception e) {

+ 12 - 2
ambari-metrics/ambari-metrics-storm-sink/src/main/java/org/apache/hadoop/metrics2/sink/storm/StormTimelineMetricsSink.java

@@ -39,16 +39,24 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache.*;
+
 public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implements IMetricsConsumer {
   private String collectorUri;
   private TimelineMetricsCache metricsCache;
   private String hostname;
+  private int timeoutSeconds;
 
   @Override
   protected String getCollectorUri() {
     return collectorUri;
   }
 
+  @Override
+  protected int getTimeoutSeconds() {
+    return timeoutSeconds;
+  }
+
   @Override
   public void prepare(Map map, Object o, TopologyContext topologyContext, IErrorReporter iErrorReporter) {
     LOG.info("Preparing Storm Metrics Sink");
@@ -59,10 +67,12 @@ public class StormTimelineMetricsSink extends AbstractTimelineMetricsSink implem
       throw new RuntimeException("Could not identify hostname.", e);
     }
     Configuration configuration = new Configuration("/storm-metrics2.properties");
+    timeoutSeconds = Integer.parseInt(configuration.getProperty(METRICS_POST_TIMEOUT_SECONDS,
+        String.valueOf(DEFAULT_POST_TIMEOUT_SECONDS)));
     int maxRowCacheSize = Integer.parseInt(configuration.getProperty(MAX_METRIC_ROW_CACHE_SIZE,
-        String.valueOf(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT)));
+        String.valueOf(MAX_RECS_PER_NAME_DEFAULT)));
     int metricsSendInterval = Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL,
-        String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS)));
+        String.valueOf(MAX_EVICTION_TIME_MILLIS)));
     metricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval);
     collectorUri = "http://" + configuration.getProperty(COLLECTOR_HOST_PROPERTY) + ":" + configuration.getProperty(COLLECTOR_PORT_PROPERTY) + "/ws/v1/timeline/metrics";
   }