|
@@ -22,19 +22,14 @@ import org.apache.ambari.server.controller.internal.PropertyInfo;
|
|
|
import org.apache.ambari.server.controller.spi.*;
|
|
|
import org.apache.ambari.server.controller.utilities.PropertyHelper;
|
|
|
import org.apache.ambari.server.controller.utilities.StreamProvider;
|
|
|
-import org.codehaus.jackson.map.ObjectMapper;
|
|
|
-import org.codehaus.jackson.type.TypeReference;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
+import java.io.BufferedReader;
|
|
|
import java.io.IOException;
|
|
|
-import java.util.Collection;
|
|
|
-import java.util.Collections;
|
|
|
-import java.util.HashMap;
|
|
|
-import java.util.HashSet;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
-import java.util.Set;
|
|
|
+import java.io.InputStream;
|
|
|
+import java.io.InputStreamReader;
|
|
|
+import java.util.*;
|
|
|
|
|
|
/**
|
|
|
* Abstract property provider implementation for a Ganglia source.
|
|
@@ -59,6 +54,7 @@ public abstract class GangliaPropertyProvider implements PropertyProvider {
|
|
|
private final String componentNamePropertyId;
|
|
|
|
|
|
|
|
|
+
|
|
|
/**
|
|
|
* Map of Ganglia cluster names keyed by component type.
|
|
|
*/
|
|
@@ -92,7 +88,8 @@ public abstract class GangliaPropertyProvider implements PropertyProvider {
|
|
|
this.hostNamePropertyId = hostNamePropertyId;
|
|
|
this.componentNamePropertyId = componentNamePropertyId;
|
|
|
|
|
|
- propertyIds = new HashSet<String>();
|
|
|
+ propertyIds = new HashSet<String>();
|
|
|
+
|
|
|
for (Map.Entry<String, Map<String, PropertyInfo>> entry : componentPropertyInfoMap.entrySet()) {
|
|
|
propertyIds.addAll(entry.getValue().keySet());
|
|
|
}
|
|
@@ -118,9 +115,11 @@ public abstract class GangliaPropertyProvider implements PropertyProvider {
|
|
|
for (Map.Entry<String, Map<TemporalInfo, RRDRequest>> clusterEntry : requestMap.entrySet()) {
|
|
|
// For each request ...
|
|
|
for (RRDRequest rrdRequest : clusterEntry.getValue().values() ) {
|
|
|
+ //todo: property provider can reduce set of resources
|
|
|
keepers.addAll(rrdRequest.populateResources());
|
|
|
}
|
|
|
}
|
|
|
+ //todo: ignoring keepers returned by the provider
|
|
|
return resources;
|
|
|
}
|
|
|
|
|
@@ -311,6 +310,7 @@ public abstract class GangliaPropertyProvider implements PropertyProvider {
|
|
|
}
|
|
|
else {
|
|
|
sb.append("&e=now");
|
|
|
+ sb.append("&pt=true");
|
|
|
}
|
|
|
|
|
|
return sb.toString();
|
|
@@ -414,31 +414,76 @@ public abstract class GangliaPropertyProvider implements PropertyProvider {
|
|
|
public Collection<Resource> populateResources() throws SystemException {
|
|
|
|
|
|
String spec = getSpec(clusterName, clusterSet, hostSet, metrics.keySet(), temporalInfo);
|
|
|
- Collection<Resource> populatedResources = new HashSet<Resource>();
|
|
|
-
|
|
|
+ BufferedReader reader = null;
|
|
|
try {
|
|
|
- List<GangliaMetric> gangliaMetrics = new ObjectMapper().readValue(getStreamProvider().readFrom(spec),
|
|
|
- new TypeReference<List<GangliaMetric>>() {
|
|
|
- });
|
|
|
-
|
|
|
- if (gangliaMetrics != null) {
|
|
|
- for (GangliaMetric gangliaMetric : gangliaMetrics) {
|
|
|
-
|
|
|
- ResourceKey key = new ResourceKey(gangliaMetric.getHost_name(), gangliaMetric.getCluster_name());
|
|
|
- Set<Resource> resourceSet = resources.get(key);
|
|
|
- if (resourceSet != null) {
|
|
|
- for (Resource resource : resourceSet) {
|
|
|
- populateResource(resource, gangliaMetric);
|
|
|
- }
|
|
|
+ reader = new BufferedReader(new InputStreamReader(
|
|
|
+ getStreamProvider().readFrom(spec)));
|
|
|
+
|
|
|
+ int startTime = convertToNumber(reader.readLine()).intValue();
|
|
|
+
|
|
|
+ String dsName = reader.readLine();
|
|
|
+ while(! dsName.equals("[AMBARI_END]")) {
|
|
|
+ GangliaMetric metric = new GangliaMetric();
|
|
|
+ List<GangliaMetric.TemporalMetric> listTemporalMetrics =
|
|
|
+ new ArrayList<GangliaMetric.TemporalMetric>();
|
|
|
+
|
|
|
+ metric.setDs_name(dsName);
|
|
|
+ metric.setCluster_name(reader.readLine());
|
|
|
+ metric.setHost_name(reader.readLine());
|
|
|
+ metric.setMetric_name(reader.readLine());
|
|
|
+
|
|
|
+ int time = convertToNumber(reader.readLine()).intValue();
|
|
|
+ int step = convertToNumber(reader.readLine()).intValue();
|
|
|
+
|
|
|
+ String val = reader.readLine();
|
|
|
+ while(! val.equals("[AMBARI_DP_END]")) {
|
|
|
+ listTemporalMetrics.add(
|
|
|
+ new GangliaMetric.TemporalMetric(convertToNumber(val), time));
|
|
|
+ time += step;
|
|
|
+ val = reader.readLine();
|
|
|
+ }
|
|
|
+
|
|
|
+ //todo: change setter in GangliaMetric to take collection
|
|
|
+ Number[][] datapointsArray = new Number[listTemporalMetrics.size()][2];
|
|
|
+ for (int i = 0; i < listTemporalMetrics.size(); ++i) {
|
|
|
+ GangliaMetric.TemporalMetric m = listTemporalMetrics.get(i);
|
|
|
+ datapointsArray[i][0] = m.getValue();
|
|
|
+ datapointsArray[i][1] = m.getTime();
|
|
|
+ }
|
|
|
+ metric.setDatapoints(datapointsArray);
|
|
|
+
|
|
|
+ ResourceKey key = new ResourceKey(metric.getHost_name(), metric.getCluster_name());
|
|
|
+ Set<Resource> resourceSet = resources.get(key);
|
|
|
+ if (resourceSet != null) {
|
|
|
+ for (Resource resource : resourceSet) {
|
|
|
+ populateResource(resource, metric);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ dsName = reader.readLine();
|
|
|
+ }
|
|
|
+ int endTime = convertToNumber(reader.readLine()).intValue();
|
|
|
+
|
|
|
+ if (LOG.isInfoEnabled()) {
|
|
|
+ LOG.info("Ganglia resource population time: " + (endTime - startTime));
|
|
|
}
|
|
|
} catch (IOException e) {
|
|
|
if (LOG.isErrorEnabled()) {
|
|
|
LOG.error("Caught exception getting Ganglia metrics : spec=" + spec, e);
|
|
|
}
|
|
|
+ } finally {
|
|
|
+ if (reader != null) {
|
|
|
+ try {
|
|
|
+ reader.close();
|
|
|
+ } catch (IOException e) {
|
|
|
+ if (LOG.isWarnEnabled()) {
|
|
|
+ LOG.warn("Unable to close http input steam : spec=" + spec, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
- return populatedResources;
|
|
|
+ //todo: filter out resources and return keepers
|
|
|
+ return Collections.emptySet();
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -462,6 +507,10 @@ public abstract class GangliaPropertyProvider implements PropertyProvider {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ private Number convertToNumber(String s) {
|
|
|
+ return s.contains(".") ? Double.parseDouble(s) : Long.parseLong(s);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
|