浏览代码

AMBARI-16628. Improve LogSearch Solr Metric loader stability (Miklos Gergely via oleewrere)

Miklos Gergely 9 年之前
父节点
当前提交
114819b0c8

+ 2 - 2
ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/LogFeederAMSClient.java

@@ -67,8 +67,8 @@ public class LogFeederAMSClient extends AbstractTimelineMetricsSink {
   }
   }
 
 
   @Override
   @Override
-  protected void emitMetrics(TimelineMetrics metrics) {
-    super.emitMetrics(metrics);
+  protected boolean emitMetrics(TimelineMetrics metrics) {
+    return super.emitMetrics(metrics);
   }
   }
 
 
 }
 }

+ 2 - 2
ambari-logsearch/ambari-logsearch-portal/src/main/java/org/apache/ambari/logsearch/solr/metrics/SolrAmsClient.java

@@ -40,7 +40,7 @@ public class SolrAmsClient extends AbstractTimelineMetricsSink {
   }
   }
 
 
   @Override
   @Override
-  protected void emitMetrics(TimelineMetrics metrics) {
-    super.emitMetrics(metrics);
+  protected boolean emitMetrics(TimelineMetrics metrics) {
+    return super.emitMetrics(metrics);
   }
   }
 }
 }

+ 52 - 22
ambari-logsearch/ambari-logsearch-portal/src/main/java/org/apache/ambari/logsearch/solr/metrics/SolrMetricsLoader.java

@@ -23,6 +23,7 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Map;
 import java.util.Timer;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.TimerTask;
+import java.util.TreeMap;
 
 
 import javax.management.MalformedObjectNameException;
 import javax.management.MalformedObjectNameException;
 
 
@@ -38,9 +39,14 @@ import org.slf4j.LoggerFactory;
 public class SolrMetricsLoader extends TimerTask {
 public class SolrMetricsLoader extends TimerTask {
   private static final Logger LOG = LoggerFactory.getLogger(SolrMetricsLoader.class);
   private static final Logger LOG = LoggerFactory.getLogger(SolrMetricsLoader.class);
 
 
+  private static final int RETRY = 3;
+  private static final int MAX_METRIC_SIZE= 1000;
+
   private final String solrHost;
   private final String solrHost;
   private final SolrJmxAdapter solrJmxAdapter;
   private final SolrJmxAdapter solrJmxAdapter;
   private final SolrAmsClient solrAmsClient;
   private final SolrAmsClient solrAmsClient;
+  
+  private final TimelineMetrics metrics = new TimelineMetrics();
 
 
   public SolrMetricsLoader(String solrHost, int solrJmxPort, String collectorHost) throws IOException {
   public SolrMetricsLoader(String solrHost, int solrJmxPort, String collectorHost) throws IOException {
     this.solrHost = solrHost;
     this.solrHost = solrHost;
@@ -54,21 +60,20 @@ public class SolrMetricsLoader extends TimerTask {
   public void run() {
   public void run() {
     LOG.info("Loading Solr Metrics for the host " + solrHost);
     LOG.info("Loading Solr Metrics for the host " + solrHost);
 
 
-    TimelineMetrics metrics = new TimelineMetrics();
-
-    addCpuUsageMetric(metrics);
-    addHeapMemoryUsageMetric(metrics);
-    addIndexSizeMetric(metrics);
+    addCpuUsageMetric();
+    addHeapMemoryUsageMetric();
+    addIndexSizeMetric();
 
 
-    solrAmsClient.emitMetrics(metrics);
+    emitMetrics();
+    removeOverTheLimitMetrics();
   }
   }
 
 
-  private void addCpuUsageMetric(TimelineMetrics metrics) {
+  private void addCpuUsageMetric() {
     Exception lastException = null;
     Exception lastException = null;
-    for (int retries = 0; retries < 3; retries++) {
+    for (int retries = 0; retries < RETRY; retries++) {
       try {
       try {
         double processCpuLoad = solrJmxAdapter.getProcessCpuLoad();
         double processCpuLoad = solrJmxAdapter.getProcessCpuLoad();
-        addMetric("logsearch.solr.cpu.usage", "Float", processCpuLoad, metrics);
+        addMetric("logsearch.solr.cpu.usage", "Float", processCpuLoad);
         return;
         return;
       } catch (MalformedObjectNameException e) {
       } catch (MalformedObjectNameException e) {
         lastException = e;
         lastException = e;
@@ -82,17 +87,17 @@ public class SolrMetricsLoader extends TimerTask {
     LOG.info("Could not load solr cpu usage metric, last exception:", lastException);
     LOG.info("Could not load solr cpu usage metric, last exception:", lastException);
   }
   }
 
 
-  private void addHeapMemoryUsageMetric(TimelineMetrics metrics) {
+  private void addHeapMemoryUsageMetric() {
     Exception lastException = null;
     Exception lastException = null;
-    for (int retries = 0; retries < 3; retries++) {
+    for (int retries = 0; retries < RETRY; retries++) {
       try {
       try {
         Map<String, Long> memoryData = solrJmxAdapter.getMemoryData();
         Map<String, Long> memoryData = solrJmxAdapter.getMemoryData();
-        addMetric("jvm.JvmMetrics.MemHeapUsedM", "Long", memoryData.get("heapMemoryUsed").doubleValue() / 1024 / 1024, metrics);
-        addMetric("jvm.JvmMetrics.MemHeapCommittedM", "Long", memoryData.get("heapMemoryCommitted").doubleValue() / 1024 / 1024, metrics);
-        addMetric("jvm.JvmMetrics.MemHeapMaxM", "Long", memoryData.get("heapMemoryMax").doubleValue() / 1024 / 1024, metrics);
-        addMetric("jvm.JvmMetrics.MemNonHeapUsedM", "Long", memoryData.get("nonHeapMemoryUsed").doubleValue() / 1024 / 1024, metrics);
-        addMetric("jvm.JvmMetrics.MemNonHeapCommittedM", "Long", memoryData.get("nonHeapMemoryCommitted").doubleValue() / 1024 / 1024, metrics);
-        addMetric("jvm.JvmMetrics.MemNonHeapMaxM", "Long", memoryData.get("nonHeapMemoryMax").doubleValue() / 1024 / 1024, metrics);
+        addMetric("jvm.JvmMetrics.MemHeapUsedM", "Long", memoryData.get("heapMemoryUsed").doubleValue() / 1024 / 1024);
+        addMetric("jvm.JvmMetrics.MemHeapCommittedM", "Long", memoryData.get("heapMemoryCommitted").doubleValue() / 1024 / 1024);
+        addMetric("jvm.JvmMetrics.MemHeapMaxM", "Long", memoryData.get("heapMemoryMax").doubleValue() / 1024 / 1024);
+        addMetric("jvm.JvmMetrics.MemNonHeapUsedM", "Long", memoryData.get("nonHeapMemoryUsed").doubleValue() / 1024 / 1024);
+        addMetric("jvm.JvmMetrics.MemNonHeapCommittedM", "Long", memoryData.get("nonHeapMemoryCommitted").doubleValue() / 1024 / 1024);
+        addMetric("jvm.JvmMetrics.MemNonHeapMaxM", "Long", memoryData.get("nonHeapMemoryMax").doubleValue() / 1024 / 1024);
         return;
         return;
       } catch (MalformedObjectNameException e) {
       } catch (MalformedObjectNameException e) {
         lastException = e;
         lastException = e;
@@ -106,12 +111,12 @@ public class SolrMetricsLoader extends TimerTask {
     LOG.info("Could not load solr heap memory usage metric, last exception:", lastException);
     LOG.info("Could not load solr heap memory usage metric, last exception:", lastException);
   }
   }
 
 
-  private void addIndexSizeMetric(TimelineMetrics metrics) {
+  private void addIndexSizeMetric() {
     Exception lastException = null;
     Exception lastException = null;
-    for (int retries = 0; retries < 3; retries++) {
+    for (int retries = 0; retries < RETRY; retries++) {
       try {
       try {
         double indexSize = solrJmxAdapter.getIndexSize();
         double indexSize = solrJmxAdapter.getIndexSize();
-        addMetric("logsearch.solr.index.size", "Long", indexSize / 1024 / 1024 / 1024, metrics);
+        addMetric("logsearch.solr.index.size", "Long", indexSize / 1024 / 1024 / 1024);
         return;
         return;
       } catch (Exception e) {
       } catch (Exception e) {
         lastException = e;
         lastException = e;
@@ -125,7 +130,7 @@ public class SolrMetricsLoader extends TimerTask {
     LOG.info("Could not load solr index size metric, last exception:", lastException);
     LOG.info("Could not load solr index size metric, last exception:", lastException);
   }
   }
 
 
-  private void addMetric(String metricName, String type, Double value, TimelineMetrics metrics) {
+  private void addMetric(String metricName, String type, Double value) {
     Long currMS = System.currentTimeMillis();
     Long currMS = System.currentTimeMillis();
 
 
     TimelineMetric metric = new TimelineMetric();
     TimelineMetric metric = new TimelineMetric();
@@ -140,6 +145,31 @@ public class SolrMetricsLoader extends TimerTask {
     metrics.addOrMergeTimelineMetric(metric);
     metrics.addOrMergeTimelineMetric(metric);
   }
   }
 
 
+  private void emitMetrics() {
+    Exception lastException = null;
+    for (int retries = 0; retries < RETRY; retries++) {
+      try {
+        if (solrAmsClient.emitMetrics(metrics)) {
+          metrics.getMetrics().clear();
+          return;
+        }
+      } catch (Exception e) {
+        lastException = e;
+      }
+    }
+
+    LOG.info("Could not emit metrics, last exception:", lastException);
+  }
+
+  private void removeOverTheLimitMetrics() {
+    for (TimelineMetric metric : metrics.getMetrics()) {
+      TreeMap<Long, Double> metricValues = metric.getMetricValues();
+      while (metricValues.size() > MAX_METRIC_SIZE) {
+        metricValues.remove(metricValues.firstKey());
+      }
+    }
+  }
+
   public static void startSolrMetricsLoaderTasks() {
   public static void startSolrMetricsLoaderTasks() {
     try {
     try {
       String collectorHosts = PropertiesUtil.getProperty("metrics.collector.hosts");
       String collectorHosts = PropertiesUtil.getProperty("metrics.collector.hosts");
@@ -158,7 +188,7 @@ public class SolrMetricsLoader extends TimerTask {
       Collection<String> solrHosts = ambariSolrCloudClient.getSolrHosts();
       Collection<String> solrHosts = ambariSolrCloudClient.getSolrHosts();
       for (String solrHost : solrHosts) {
       for (String solrHost : solrHosts) {
         SolrMetricsLoader sml = new SolrMetricsLoader(solrHost, solrJmxPort, collectorHosts);
         SolrMetricsLoader sml = new SolrMetricsLoader(solrHost, solrJmxPort, collectorHosts);
-        Timer timer = new Timer("Solr Metrics Loader - " + solrHost, false);
+        Timer timer = new Timer("Solr Metrics Loader - " + solrHost, true);
         timer.scheduleAtFixedRate(sml, 0, 10000);
         timer.scheduleAtFixedRate(sml, 0, 10000);
       }
       }
     } catch (Exception e) {
     } catch (Exception e) {

+ 6 - 3
ambari-metrics/ambari-metrics-common/src/main/java/org/apache/hadoop/metrics2/sink/timeline/AbstractTimelineMetricsSink.java

@@ -79,7 +79,7 @@ public abstract class AbstractTimelineMetricsSink {
     LOG = LogFactory.getLog(this.getClass());
     LOG = LogFactory.getLog(this.getClass());
   }
   }
 
 
-  protected void emitMetricsJson(String connectUrl, String jsonData) {
+  protected boolean emitMetricsJson(String connectUrl, String jsonData) {
     int timeout = getTimeoutSeconds() * 1000;
     int timeout = getTimeoutSeconds() * 1000;
     HttpURLConnection connection = null;
     HttpURLConnection connection = null;
     try {
     try {
@@ -115,6 +115,7 @@ public abstract class AbstractTimelineMetricsSink {
       cleanupInputStream(connection.getInputStream());
       cleanupInputStream(connection.getInputStream());
       //reset failedCollectorConnectionsCounter to "0"
       //reset failedCollectorConnectionsCounter to "0"
       failedCollectorConnectionsCounter.set(0);
       failedCollectorConnectionsCounter.set(0);
+      return true;
     } catch (IOException ioe) {
     } catch (IOException ioe) {
       StringBuilder errorMessage =
       StringBuilder errorMessage =
           new StringBuilder("Unable to connect to collector, " + connectUrl + "\n"
           new StringBuilder("Unable to connect to collector, " + connectUrl + "\n"
@@ -139,11 +140,12 @@ public abstract class AbstractTimelineMetricsSink {
         if (LOG.isDebugEnabled()) {
         if (LOG.isDebugEnabled()) {
           LOG.debug(String.format("Ignoring %s AMS connection exceptions", NUMBER_OF_SKIPPED_COLLECTOR_EXCEPTIONS));
           LOG.debug(String.format("Ignoring %s AMS connection exceptions", NUMBER_OF_SKIPPED_COLLECTOR_EXCEPTIONS));
         }
         }
+        return false;
       }
       }
     }
     }
   }
   }
 
 
-  protected void emitMetrics(TimelineMetrics metrics) {
+  protected boolean emitMetrics(TimelineMetrics metrics) {
     String connectUrl = getCollectorUri();
     String connectUrl = getCollectorUri();
     String jsonData = null;
     String jsonData = null;
     try {
     try {
@@ -152,8 +154,9 @@ public abstract class AbstractTimelineMetricsSink {
       LOG.error("Unable to parse metrics", e);
       LOG.error("Unable to parse metrics", e);
     }
     }
     if (jsonData != null) {
     if (jsonData != null) {
-      emitMetricsJson(connectUrl, jsonData);
+      return emitMetricsJson(connectUrl, jsonData);
     }
     }
+    return false;
   }
   }
 
 
   /**
   /**

+ 2 - 2
ambari-metrics/ambari-metrics-common/src/test/java/org/apache/hadoop/metrics2/sink/timeline/cache/HandleConnectExceptionTest.java

@@ -98,8 +98,8 @@ public class HandleConnectExceptionTest {
     }
     }
 
 
     @Override
     @Override
-    public void emitMetrics(TimelineMetrics metrics) {
-      super.emitMetrics(metrics);
+    public boolean emitMetrics(TimelineMetrics metrics) {
+      return super.emitMetrics(metrics);
     }
     }
   }
   }
 }
 }