|
@@ -40,16 +40,17 @@ import java.util.regex.Pattern;
|
|
|
|
|
|
import org.apache.ambari.server.controller.internal.AbstractPropertyProvider;
|
|
|
import org.apache.ambari.server.controller.internal.PropertyInfo;
|
|
|
-import org.apache.ambari.server.controller.spi.NoSuchParentResourceException;
|
|
|
import org.apache.ambari.server.controller.spi.Predicate;
|
|
|
import org.apache.ambari.server.controller.spi.Request;
|
|
|
import org.apache.ambari.server.controller.spi.Resource;
|
|
|
import org.apache.ambari.server.controller.spi.SystemException;
|
|
|
-import org.apache.ambari.server.controller.spi.UnsupportedPropertyException;
|
|
|
+import org.apache.ambari.server.controller.utilities.PropertyHelper;
|
|
|
import org.apache.ambari.server.controller.utilities.StreamProvider;
|
|
|
+import org.codehaus.jackson.JsonFactory;
|
|
|
import org.codehaus.jackson.map.DeserializationConfig;
|
|
|
import org.codehaus.jackson.map.ObjectMapper;
|
|
|
import org.codehaus.jackson.map.ObjectReader;
|
|
|
+import org.codehaus.jackson.type.TypeReference;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
@@ -58,12 +59,17 @@ import org.slf4j.LoggerFactory;
|
|
|
*/
|
|
|
public class JMXPropertyProvider extends AbstractPropertyProvider {
|
|
|
|
|
|
+
|
|
|
+ // TODO: HACK, value is set at populateResource()
|
|
|
+ private String componentName = null;
|
|
|
+
|
|
|
private static final String NAME_KEY = "name";
|
|
|
private static final String PORT_KEY = "tag.port";
|
|
|
private static final String DOT_REPLACEMENT_CHAR = "#";
|
|
|
private static final long DEFAULT_POPULATE_TIMEOUT_MILLIS = 10000L;
|
|
|
|
|
|
public static final String TIMED_OUT_MSG = "Timed out waiting for JMX metrics.";
|
|
|
+ public static final String STORM_REST_API = "STORM_REST_API";
|
|
|
|
|
|
/**
|
|
|
* Thread pool
|
|
@@ -89,7 +95,8 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
|
|
|
EXECUTOR_SERVICE = threadPoolExecutor;
|
|
|
}
|
|
|
|
|
|
- private final static ObjectReader objectReader;
|
|
|
+ private final static ObjectReader jmxObjectReader;
|
|
|
+ private final static ObjectReader stormObjectReader;
|
|
|
|
|
|
private static final Map<String, String> DEFAULT_JMX_PORTS = new HashMap<String, String>();
|
|
|
|
|
@@ -104,10 +111,19 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
|
|
|
DEFAULT_JMX_PORTS.put("HISTORYSERVER", "19888");
|
|
|
DEFAULT_JMX_PORTS.put("NODEMANAGER", "8042");
|
|
|
DEFAULT_JMX_PORTS.put("JOURNALNODE", "8480");
|
|
|
-
|
|
|
- ObjectMapper objectMapper = new ObjectMapper();
|
|
|
- objectMapper.configure(DeserializationConfig.Feature.USE_ANNOTATIONS, false);
|
|
|
- objectReader = objectMapper.reader(JMXMetricHolder.class);
|
|
|
+ DEFAULT_JMX_PORTS.put("STORM_REST_API", "8745");
|
|
|
+
|
|
|
+ ObjectMapper jmxObjectMapper = new ObjectMapper();
|
|
|
+ jmxObjectMapper.configure(DeserializationConfig.Feature.USE_ANNOTATIONS, false);
|
|
|
+ jmxObjectReader = jmxObjectMapper.reader(JMXMetricHolder.class);
|
|
|
+
|
|
|
+ JsonFactory factory = new JsonFactory();
|
|
|
+ ObjectMapper stormObjectMapper = new ObjectMapper(factory);
|
|
|
+ TypeReference<HashMap<String,Object>> typeRef
|
|
|
+ = new TypeReference<
|
|
|
+ HashMap<String,Object>
|
|
|
+ >() {};
|
|
|
+ stormObjectReader = jmxObjectMapper.reader(typeRef);
|
|
|
}
|
|
|
|
|
|
protected final static Logger LOG =
|
|
@@ -233,7 +249,10 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
|
|
|
* @return the spec
|
|
|
*/
|
|
|
protected String getSpec(String protocol, String hostName, String port) {
|
|
|
- return protocol + "://" + hostName + ":" + port + "/jmx";
|
|
|
+ if (null == componentName || !componentName.equals(STORM_REST_API))
|
|
|
+ return protocol + "://" + hostName + ":" + port + "/jmx";
|
|
|
+ else
|
|
|
+ return protocol + "://" + hostName + ":" + port + "/api/cluster/summary";
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -301,7 +320,7 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- String componentName = (String) resource.getPropertyValue(componentNamePropertyId);
|
|
|
+ componentName = (String) resource.getPropertyValue(componentNamePropertyId);
|
|
|
|
|
|
if (getComponentMetrics().get(componentName) == null) {
|
|
|
// If there are no metrics defined for the given component then there is nothing to do.
|
|
@@ -327,91 +346,127 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
|
|
|
InputStream in = streamProvider.readFrom(getSpec(protocol, hostName, port));
|
|
|
|
|
|
try {
|
|
|
- JMXMetricHolder metricHolder = objectReader.readValue(in);
|
|
|
-
|
|
|
- Map<String, Map<String, Object>> categories = new HashMap<String, Map<String, Object>>();
|
|
|
|
|
|
- for (Map<String, Object> bean : metricHolder.getBeans()) {
|
|
|
- String category = getCategory(bean);
|
|
|
- if (category != null) {
|
|
|
- categories.put(category, bean);
|
|
|
- }
|
|
|
+ if (null == componentName || !componentName.equals(STORM_REST_API)) {
|
|
|
+ getHadoopMetricValue(in, ids, resource, request);
|
|
|
+ } else {
|
|
|
+ getStormMetricValue(in, ids, resource, request);
|
|
|
}
|
|
|
|
|
|
- for (String propertyId : ids) {
|
|
|
- Map<String, PropertyInfo> propertyInfoMap = getPropertyInfoMap(componentName, propertyId);
|
|
|
+ } finally {
|
|
|
+ in.close();
|
|
|
+ }
|
|
|
+ } catch (IOException e) {
|
|
|
+ logException(e);
|
|
|
+ }
|
|
|
+ return resource;
|
|
|
+ }
|
|
|
|
|
|
- String requestedPropertyId = propertyId;
|
|
|
+ /**
|
|
|
+ * Hadoop-specific metrics fetching
|
|
|
+ */
|
|
|
+ private void getHadoopMetricValue(InputStream in, Set<String> ids,
|
|
|
+ Resource resource, Request request) throws IOException {
|
|
|
+ JMXMetricHolder metricHolder = jmxObjectReader.readValue(in);
|
|
|
|
|
|
- for (Map.Entry<String, PropertyInfo> entry : propertyInfoMap.entrySet()) {
|
|
|
+ Map<String, Map<String, Object>> categories = new HashMap<String, Map<String, Object>>();
|
|
|
|
|
|
- PropertyInfo propertyInfo = entry.getValue();
|
|
|
- propertyId = entry.getKey();
|
|
|
+ for (Map<String, Object> bean : metricHolder.getBeans()) {
|
|
|
+ String category = getCategory(bean);
|
|
|
+ if (category != null) {
|
|
|
+ categories.put(category, bean);
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- if (propertyInfo.isPointInTime()) {
|
|
|
+ for (String propertyId : ids) {
|
|
|
+ Map<String, PropertyInfo> propertyInfoMap = getPropertyInfoMap(componentName, propertyId);
|
|
|
|
|
|
- String property = propertyInfo.getPropertyId();
|
|
|
- String category = "";
|
|
|
+ String requestedPropertyId = propertyId;
|
|
|
|
|
|
-
|
|
|
- List<String> keyList = new LinkedList<String>();
|
|
|
-
|
|
|
- int keyStartIndex = property.indexOf('[');
|
|
|
- if (-1 != keyStartIndex) {
|
|
|
- int keyEndIndex = property.indexOf(']', keyStartIndex);
|
|
|
- if (-1 != keyEndIndex && keyEndIndex > keyStartIndex) {
|
|
|
- keyList.add(property.substring(keyStartIndex+1, keyEndIndex));
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- if (!containsArguments(propertyId)) {
|
|
|
- int dotIndex = property.indexOf('.', property.indexOf('='));
|
|
|
- if (-1 != dotIndex) {
|
|
|
- category = property.substring(0, dotIndex);
|
|
|
- property = (-1 == keyStartIndex) ?
|
|
|
+ for (Map.Entry<String, PropertyInfo> entry : propertyInfoMap.entrySet()) {
|
|
|
+
|
|
|
+ PropertyInfo propertyInfo = entry.getValue();
|
|
|
+ propertyId = entry.getKey();
|
|
|
+
|
|
|
+ if (propertyInfo.isPointInTime()) {
|
|
|
+
|
|
|
+ String property = propertyInfo.getPropertyId();
|
|
|
+ String category = "";
|
|
|
+
|
|
|
+
|
|
|
+ List<String> keyList = new LinkedList<String>();
|
|
|
+
|
|
|
+ int keyStartIndex = property.indexOf('[');
|
|
|
+ if (-1 != keyStartIndex) {
|
|
|
+ int keyEndIndex = property.indexOf(']', keyStartIndex);
|
|
|
+ if (-1 != keyEndIndex && keyEndIndex > keyStartIndex) {
|
|
|
+ keyList.add(property.substring(keyStartIndex+1, keyEndIndex));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!containsArguments(propertyId)) {
|
|
|
+ int dotIndex = property.indexOf('.', property.indexOf('='));
|
|
|
+ if (-1 != dotIndex) {
|
|
|
+ category = property.substring(0, dotIndex);
|
|
|
+ property = (-1 == keyStartIndex) ?
|
|
|
property.substring(dotIndex+1) :
|
|
|
- property.substring(dotIndex+1, keyStartIndex);
|
|
|
- }
|
|
|
- } else {
|
|
|
- int firstKeyIndex = keyStartIndex > -1 ? keyStartIndex : property.length();
|
|
|
- int dotIndex = property.lastIndexOf('.', firstKeyIndex);
|
|
|
+ property.substring(dotIndex+1, keyStartIndex);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ int firstKeyIndex = keyStartIndex > -1 ? keyStartIndex : property.length();
|
|
|
+ int dotIndex = property.lastIndexOf('.', firstKeyIndex);
|
|
|
|
|
|
- if (dotIndex != -1) {
|
|
|
- category = property.substring(0, dotIndex);
|
|
|
- property = property.substring(dotIndex + 1, firstKeyIndex);
|
|
|
- }
|
|
|
- }
|
|
|
+ if (dotIndex != -1) {
|
|
|
+ category = property.substring(0, dotIndex);
|
|
|
+ property = property.substring(dotIndex + 1, firstKeyIndex);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ if (containsArguments(propertyId)) {
|
|
|
+ Pattern pattern = Pattern.compile(category);
|
|
|
|
|
|
- if (containsArguments(propertyId)) {
|
|
|
- Pattern pattern = Pattern.compile(category);
|
|
|
-
|
|
|
- // find all jmx categories that match the regex
|
|
|
- for (String jmxCat : categories.keySet()) {
|
|
|
- Matcher matcher = pattern.matcher(jmxCat);
|
|
|
- if (matcher.matches()) {
|
|
|
- String newPropertyId = propertyId;
|
|
|
- for (int i = 0; i < matcher.groupCount(); i++) {
|
|
|
- newPropertyId = substituteArgument(newPropertyId, "$" + (i + 1), matcher.group(i + 1));
|
|
|
- }
|
|
|
- // We need to do the final filtering here, after the argument substitution
|
|
|
- if (isRequestedPropertyId(newPropertyId, requestedPropertyId, request)) {
|
|
|
- setResourceValue(resource, categories, newPropertyId, jmxCat, property, keyList);
|
|
|
- }
|
|
|
- }
|
|
|
+ // find all jmx categories that match the regex
|
|
|
+ for (String jmxCat : categories.keySet()) {
|
|
|
+ Matcher matcher = pattern.matcher(jmxCat);
|
|
|
+ if (matcher.matches()) {
|
|
|
+ String newPropertyId = propertyId;
|
|
|
+ for (int i = 0; i < matcher.groupCount(); i++) {
|
|
|
+ newPropertyId = substituteArgument(newPropertyId, "$" + (i + 1), matcher.group(i + 1));
|
|
|
+ }
|
|
|
+ // We need to do the final filtering here, after the argument substitution
|
|
|
+ if (isRequestedPropertyId(newPropertyId, requestedPropertyId, request)) {
|
|
|
+ setResourceValue(resource, categories, newPropertyId, jmxCat, property, keyList);
|
|
|
}
|
|
|
- } else {
|
|
|
- setResourceValue(resource, categories, propertyId, category, property, keyList);
|
|
|
}
|
|
|
}
|
|
|
+ } else {
|
|
|
+ setResourceValue(resource, categories, propertyId, category, property, keyList);
|
|
|
}
|
|
|
}
|
|
|
- } finally {
|
|
|
- in.close();
|
|
|
}
|
|
|
- } catch (IOException e) {
|
|
|
- logException(e);
|
|
|
}
|
|
|
- return resource;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * TODO: Refactor
|
|
|
+ * Storm-specific metrics fetching
|
|
|
+ */
|
|
|
+ private void getStormMetricValue(InputStream in, Set<String> ids,
|
|
|
+ Resource resource, Request request) throws IOException {
|
|
|
+ HashMap<String, Object> metricHolder = stormObjectReader.readValue(in);
|
|
|
+ for (String category : ids) {
|
|
|
+ Map<String, PropertyInfo> defProps = getComponentMetrics().get(STORM_REST_API);
|
|
|
+ for (String depProp : defProps.keySet()) {
|
|
|
+ if (depProp.startsWith(category)) {
|
|
|
+ PropertyInfo propInfo = defProps.get(depProp);
|
|
|
+ String propName = propInfo.getPropertyId();
|
|
|
+ Object propertyValue = metricHolder.get(propName);
|
|
|
+ String absId = PropertyHelper.getPropertyId(category, propName);
|
|
|
+ // TODO: Maybe cast to int
|
|
|
+ resource.setProperty(absId, propertyValue);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void setResourceValue(Resource resource, Map<String, Map<String, Object>> categories, String propertyId,
|