|
@@ -6,9 +6,9 @@
|
|
|
* to you under the Apache License, Version 2.0 (the
|
|
|
* "License"); you may not use this file except in compliance
|
|
|
* with the License. You may obtain a copy of the License at
|
|
|
- *
|
|
|
- * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
- *
|
|
|
+ * <p/>
|
|
|
+ * http://www.apache.org/licenses/LICENSE-2.0
|
|
|
+ * <p/>
|
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
@@ -17,73 +17,177 @@
|
|
|
*/
|
|
|
package org.apache.ambari.server.metrics.system.impl;
|
|
|
|
|
|
-import java.io.IOException;
|
|
|
+
|
|
|
import java.net.InetAddress;
|
|
|
import java.net.UnknownHostException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.Collection;
|
|
|
+import java.util.HashSet;
|
|
|
+import java.util.LinkedHashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
+import java.util.Set;
|
|
|
+import java.util.TreeMap;
|
|
|
|
|
|
-import org.apache.ambari.server.metrics.system.AmbariMetricSink;
|
|
|
-import org.apache.commons.lang.ClassUtils;
|
|
|
+import org.apache.ambari.server.controller.AmbariManagementController;
|
|
|
+import org.apache.ambari.server.controller.internal.AbstractControllerResourceProvider;
|
|
|
+import org.apache.ambari.server.controller.internal.ServiceConfigVersionResourceProvider;
|
|
|
+import org.apache.ambari.server.controller.spi.Predicate;
|
|
|
+import org.apache.ambari.server.controller.spi.Request;
|
|
|
+import org.apache.ambari.server.controller.spi.Resource;
|
|
|
+import org.apache.ambari.server.controller.spi.ResourceProvider;
|
|
|
+import org.apache.ambari.server.controller.utilities.PredicateBuilder;
|
|
|
+import org.apache.ambari.server.controller.utilities.PropertyHelper;
|
|
|
+import org.apache.ambari.server.metrics.system.MetricsSink;
|
|
|
+import org.apache.ambari.server.metrics.system.SingleMetric;
|
|
|
+import org.apache.ambari.server.security.authorization.internal.InternalAuthenticationToken;
|
|
|
+import org.apache.ambari.server.state.Cluster;
|
|
|
+import org.apache.ambari.server.state.Clusters;
|
|
|
+import org.apache.ambari.server.state.Service;
|
|
|
+import org.apache.ambari.server.state.ServiceComponent;
|
|
|
+import org.apache.ambari.server.state.ServiceComponentHost;
|
|
|
+import org.apache.commons.collections.CollectionUtils;
|
|
|
+import org.apache.commons.lang.StringUtils;
|
|
|
import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
|
|
|
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
|
|
|
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetrics;
|
|
|
|
|
|
-import jline.internal.Log;
|
|
|
+import org.apache.hadoop.metrics2.sink.timeline.cache.TimelineMetricsCache;
|
|
|
+import org.springframework.security.core.context.SecurityContextHolder;
|
|
|
+
|
|
|
+public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements MetricsSink {
|
|
|
+ private static final String AMBARI_SERVER_APP_ID = "ambari_server";
|
|
|
+ private Collection<String> collectorHosts;
|
|
|
|
|
|
-public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements AmbariMetricSink {
|
|
|
- private static final String APP_ID = "ambari_server";
|
|
|
- private int timeoutSeconds = 10;
|
|
|
- private String collectorProtocol;
|
|
|
private String collectorUri;
|
|
|
+ private String port;
|
|
|
+ private String protocol;
|
|
|
private String hostName;
|
|
|
- private int counter = 0;
|
|
|
- private int frequency;
|
|
|
- private List<TimelineMetric> buffer = new ArrayList<>();
|
|
|
+ private AmbariManagementController ambariManagementController;
|
|
|
+ private TimelineMetricsCache timelineMetricsCache;
|
|
|
+ private boolean isInitialized = false;
|
|
|
+
|
|
|
+ public AmbariMetricSinkImpl(AmbariManagementController amc) {
|
|
|
+ this.ambariManagementController = amc;
|
|
|
+ }
|
|
|
+
|
|
|
@Override
|
|
|
- public void init(String protocol, String collectorUri, int frequency) {
|
|
|
+ public void init(MetricsConfiguration configuration) {
|
|
|
+
|
|
|
+ if (ambariManagementController == null) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ InternalAuthenticationToken authenticationToken = new InternalAuthenticationToken("admin");
|
|
|
+ authenticationToken.setAuthenticated(true);
|
|
|
+ SecurityContextHolder.getContext().setAuthentication(authenticationToken);
|
|
|
+ Clusters clusters = ambariManagementController.getClusters();
|
|
|
+ String ambariMetricsServiceName = "AMBARI_METRICS";
|
|
|
+ collectorHosts = new HashSet<>();
|
|
|
+
|
|
|
+ for (Map.Entry<String, Cluster> kv : clusters.getClusters().entrySet()) {
|
|
|
+ String clusterName = kv.getKey();
|
|
|
+ Cluster c = kv.getValue();
|
|
|
+ Resource.Type type = Resource.Type.ServiceConfigVersion;
|
|
|
+
|
|
|
+ Set<String> propertyIds = new HashSet<String>();
|
|
|
+ propertyIds.add(ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_CONFIGURATIONS_PROPERTY_ID);
|
|
|
+
|
|
|
+ Predicate predicate = new PredicateBuilder().property(
|
|
|
+ ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_CLUSTER_NAME_PROPERTY_ID).equals(clusterName).and().property(
|
|
|
+ ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_SERVICE_NAME_PROPERTY_ID).equals(ambariMetricsServiceName).and().property(
|
|
|
+ ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_IS_CURRENT_PROPERTY_ID).equals("true").toPredicate();
|
|
|
+
|
|
|
+ Request request = PropertyHelper.getReadRequest(propertyIds);
|
|
|
+
|
|
|
+ ResourceProvider provider = AbstractControllerResourceProvider.getResourceProvider(
|
|
|
+ type,
|
|
|
+ PropertyHelper.getPropertyIds(type),
|
|
|
+ PropertyHelper.getKeyPropertyIds(type),
|
|
|
+ ambariManagementController);
|
|
|
+
|
|
|
+ try {
|
|
|
+ //get collector host(s)
|
|
|
+ Service service = c.getService(ambariMetricsServiceName);
|
|
|
+ if (service != null) {
|
|
|
+ for (String component : service.getServiceComponents().keySet()) {
|
|
|
+ ServiceComponent sc = service.getServiceComponents().get(component);
|
|
|
+ for (ServiceComponentHost serviceComponentHost : sc.getServiceComponentHosts().values()) {
|
|
|
+ if (serviceComponentHost.getServiceComponentName().equals("METRICS_COLLECTOR")) {
|
|
|
+ collectorHosts.add(serviceComponentHost.getHostName());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // get collector port and protocol
|
|
|
+ Set<Resource> resources = provider.getResources(request, predicate);
|
|
|
+
|
|
|
+ for (Resource resource : resources) {
|
|
|
+ if (resource != null) {
|
|
|
+ ArrayList<LinkedHashMap<Object, Object>> configs = (ArrayList<LinkedHashMap<Object, Object>>)
|
|
|
+ resource.getPropertyValue(ServiceConfigVersionResourceProvider.SERVICE_CONFIG_VERSION_CONFIGURATIONS_PROPERTY_ID);
|
|
|
+ for (LinkedHashMap<Object, Object> config : configs) {
|
|
|
+ if (config != null && config.get("type").equals("ams-site")) {
|
|
|
+ TreeMap<Object, Object> properties = (TreeMap<Object, Object>) config.get("properties");
|
|
|
+ String timelineWebappAddress = (String) properties.get("timeline.metrics.service.webapp.address");
|
|
|
+ if (StringUtils.isNotEmpty(timelineWebappAddress) && timelineWebappAddress.contains(":")) {
|
|
|
+ port = timelineWebappAddress.split(":")[1];
|
|
|
+ }
|
|
|
+ String httpPolicy = (String) properties.get("timeline.metrics.service.http.policy");
|
|
|
+ protocol = httpPolicy.equals("HTTP_ONLY") ? "http" : "https";
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ LOG.info("Exception caught when retrieving Collector URI", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ collectorUri = getCollectorUri(findPreferredCollectHost());
|
|
|
+ hostName = configuration.getProperty("ambariserver.hostname.override", getDefaultLocalHostName());
|
|
|
+
|
|
|
+ int maxRowCacheSize = Integer.parseInt(configuration.getProperty(MAX_METRIC_ROW_CACHE_SIZE,
|
|
|
+ String.valueOf(TimelineMetricsCache.MAX_RECS_PER_NAME_DEFAULT)));
|
|
|
+ int metricsSendInterval = Integer.parseInt(configuration.getProperty(METRICS_SEND_INTERVAL,
|
|
|
+ String.valueOf(TimelineMetricsCache.MAX_EVICTION_TIME_MILLIS)));
|
|
|
|
|
|
- /**
|
|
|
- * Protocol is either HTTP or HTTPS, and the collectorURI is the domain name of the collector
|
|
|
- * An example of the complete collector URI might be: http://c6403.ambari.org/ws/v1/timeline/metrics
|
|
|
- */
|
|
|
- this.frequency = frequency;
|
|
|
- this.collectorProtocol = protocol;
|
|
|
- this.collectorUri = getCollectorUri(collectorUri);
|
|
|
+ timelineMetricsCache = new TimelineMetricsCache(maxRowCacheSize, metricsSendInterval);
|
|
|
|
|
|
+ if (CollectionUtils.isNotEmpty(collectorHosts)) {
|
|
|
+ isInitialized = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private String getDefaultLocalHostName() {
|
|
|
try {
|
|
|
- hostName = InetAddress.getLocalHost().getHostName();
|
|
|
+ return InetAddress.getLocalHost().getCanonicalHostName();
|
|
|
} catch (UnknownHostException e) {
|
|
|
- Log.info("Error getting host address");
|
|
|
+ LOG.info("Error getting host address");
|
|
|
}
|
|
|
+ return null;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
- public void publish(Map<String, Number> metricsMap) {
|
|
|
- List<TimelineMetric> metricsList = createMetricsList(metricsMap);
|
|
|
+ public void publish(List<SingleMetric> metrics) {
|
|
|
|
|
|
- if(counter > frequency) {
|
|
|
- TimelineMetrics timelineMetrics = new TimelineMetrics();
|
|
|
- timelineMetrics.setMetrics(buffer);
|
|
|
|
|
|
- String connectUrl = collectorUri;
|
|
|
- String jsonData = null;
|
|
|
- try {
|
|
|
- jsonData = mapper.writeValueAsString(timelineMetrics);
|
|
|
- } catch (IOException e) {
|
|
|
- LOG.error("Unable to parse metrics", e);
|
|
|
- }
|
|
|
- if (jsonData != null) {
|
|
|
- emitMetricsJson(connectUrl, jsonData);
|
|
|
+ //If Sink not yet initialized, drop the metrics on the floor.
|
|
|
+ if (isInitialized) {
|
|
|
+ List<TimelineMetric> metricList = getFilteredMetricList(metrics);
|
|
|
+ if (!metricList.isEmpty()) {
|
|
|
+ TimelineMetrics timelineMetrics = new TimelineMetrics();
|
|
|
+ timelineMetrics.setMetrics(metricList);
|
|
|
+ emitMetrics(timelineMetrics);
|
|
|
}
|
|
|
- counter = 0;
|
|
|
- } else {
|
|
|
- buffer.addAll(metricsList);
|
|
|
- counter++;
|
|
|
}
|
|
|
+ }
|
|
|
|
|
|
+ @Override
|
|
|
+ public boolean isInitialized() {
|
|
|
+ return isInitialized;
|
|
|
}
|
|
|
|
|
|
|
|
@@ -94,22 +198,22 @@ public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements
|
|
|
*/
|
|
|
@Override
|
|
|
protected String getCollectorUri(String host) {
|
|
|
- return getCollectorProtocol() + "://" + host + WS_V1_TIMELINE_METRICS;
|
|
|
+ return constructContainerMetricUri(protocol, host, port);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
protected String getCollectorProtocol() {
|
|
|
- return collectorProtocol;
|
|
|
+ return protocol;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
protected String getCollectorPort() {
|
|
|
- return null;
|
|
|
+ return port;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
protected int getTimeoutSeconds() {
|
|
|
- return timeoutSeconds;
|
|
|
+ return 10;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -119,6 +223,7 @@ public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements
|
|
|
*/
|
|
|
@Override
|
|
|
protected String getZookeeperQuorum() {
|
|
|
+ //Ignoring Zk Fallback.
|
|
|
return null;
|
|
|
}
|
|
|
|
|
@@ -129,7 +234,7 @@ public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements
|
|
|
*/
|
|
|
@Override
|
|
|
protected Collection<String> getConfiguredCollectorHosts() {
|
|
|
- return null;
|
|
|
+ return collectorHosts;
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -142,16 +247,22 @@ public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements
|
|
|
return hostName;
|
|
|
}
|
|
|
|
|
|
- private List<TimelineMetric> createMetricsList(Map<String, Number> metricsMap) {
|
|
|
- final List<TimelineMetric> metricsList = new ArrayList<>();
|
|
|
- for (Map.Entry<String, Number> entry : metricsMap.entrySet()) {
|
|
|
- final long currentTimeMillis = System.currentTimeMillis();
|
|
|
- String metricsName = entry.getKey();
|
|
|
- Number value = entry.getValue();
|
|
|
- TimelineMetric metric = createTimelineMetric(currentTimeMillis, APP_ID, metricsName, value);
|
|
|
- metricsList.add(metric);
|
|
|
+ private List<TimelineMetric> getFilteredMetricList(List<SingleMetric> metrics) {
|
|
|
+ final List<TimelineMetric> metricList = new ArrayList<>();
|
|
|
+ for (SingleMetric metric : metrics) {
|
|
|
+
|
|
|
+ String metricName = metric.getMetricName();
|
|
|
+ Double value = metric.getValue();
|
|
|
+
|
|
|
+ TimelineMetric timelineMetric = createTimelineMetric(metric.getTimestamp(), AMBARI_SERVER_APP_ID, metricName, value);
|
|
|
+ timelineMetricsCache.putTimelineMetric(timelineMetric, false);
|
|
|
+ TimelineMetric cachedMetric = timelineMetricsCache.getTimelineMetric(metricName);
|
|
|
+
|
|
|
+ if (cachedMetric != null) {
|
|
|
+ metricList.add(cachedMetric);
|
|
|
+ }
|
|
|
}
|
|
|
- return metricsList;
|
|
|
+ return metricList;
|
|
|
}
|
|
|
|
|
|
private TimelineMetric createTimelineMetric(long currentTimeMillis, String component, String attributeName,
|
|
@@ -161,7 +272,7 @@ public class AmbariMetricSinkImpl extends AbstractTimelineMetricsSink implements
|
|
|
timelineMetric.setHostName(hostName);
|
|
|
timelineMetric.setAppId(component);
|
|
|
timelineMetric.setStartTime(currentTimeMillis);
|
|
|
- timelineMetric.setType(ClassUtils.getShortCanonicalName(attributeValue, "Number"));
|
|
|
+
|
|
|
timelineMetric.getMetricValues().put(currentTimeMillis, attributeValue.doubleValue());
|
|
|
return timelineMetric;
|
|
|
}
|