Selaa lähdekoodia

AMBARI-16440 : Flush metrics to collector if metric system is stopped gracefully in the Sink daemon. (avijayan)

Aravindan Vijayan 9 vuotta sitten
vanhempi
commit
fd85458c78

+ 25 - 0
ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/cache/TimelineMetricsCache.java

@@ -22,8 +22,12 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
+import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
 
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -124,6 +128,23 @@ public class TimelineMetricsCache {
       return timelineMetric;
     }
 
+    public TimelineMetrics evictAll() {
+      List<TimelineMetric> metricList = new ArrayList<TimelineMetric>();
+
+      for (Iterator<Map.Entry<String, TimelineMetricWrapper>> it = this.entrySet().iterator(); it.hasNext();) {
+        Map.Entry<String, TimelineMetricWrapper> cacheEntry = it.next();
+        TimelineMetricWrapper metricWrapper = cacheEntry.getValue();
+        if (metricWrapper != null) {
+          TimelineMetric timelineMetric = cacheEntry.getValue().getTimelineMetric();
+          metricList.add(timelineMetric);
+        }
+        it.remove();
+      }
+      TimelineMetrics timelineMetrics = new TimelineMetrics();
+      timelineMetrics.setMetrics(metricList);
+      return timelineMetrics;
+    }
+
     public void put(String metricName, TimelineMetric timelineMetric) {
       if (isDuplicate(timelineMetric)) {
         return;
@@ -157,6 +178,10 @@ public class TimelineMetricsCache {
     return null;
   }
 
+  public TimelineMetrics getAllMetrics() {
+    return timelineMetricCache.evictAll();
+  }
+
   /**
    * Getter method to help testing eviction
    * @return @int

+ 20 - 1
ambari-metrics/ambari-metrics-hadoop-sink/src/main/java/org/apache/hadoop/metrics2/sink/timeline/HadoopTimelineMetricsSink.java

@@ -31,6 +31,7 @@ import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
 import org.apache.hadoop.metrics2.util.Servers;
 import org.apache.hadoop.net.DNS;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.HttpURLConnection;
@@ -44,10 +45,12 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
-public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink implements MetricsSink {
+public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink implements MetricsSink, Closeable {
   private Map<String, Set<String>> useTagsMap = new HashMap<String, Set<String>>();
   private TimelineMetricsCache metricsCache;
   private String hostName = "UNKNOWN.example.com";
@@ -63,6 +66,8 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
   private SubsetConfiguration conf;
   // Cache the rpc port used and the suffix to use if the port tag is found
   private Map<String, String> rpcPortSuffixes = new HashMap<>(10);
+  private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+
 
   @Override
   public void init(SubsetConfiguration conf) {
@@ -386,4 +391,18 @@ public class HadoopTimelineMetricsSink extends AbstractTimelineMetricsSink imple
     // TODO: Buffering implementation
   }
 
+  @Override
+  public void close() throws IOException {
+
+    executorService.submit(new Runnable() {
+      @Override
+      public void run() {
+        LOG.info("Closing HadoopTimelineMetricSink. Flushing metrics to collector...");
+        TimelineMetrics metrics = metricsCache.getAllMetrics();
+        if (metrics != null) {
+          emitMetrics(metrics);
+        }
+      }
+    });
+  }
 }