|
@@ -36,6 +36,15 @@ import java.util.LinkedList;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
import java.util.Set;
|
|
|
+import java.util.concurrent.Callable;
|
|
|
+import java.util.concurrent.CompletionService;
|
|
|
+import java.util.concurrent.ExecutionException;
|
|
|
+import java.util.concurrent.ExecutorCompletionService;
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
+import java.util.concurrent.Future;
|
|
|
+import java.util.concurrent.LinkedBlockingQueue;
|
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
/**
|
|
|
* Property provider implementation for JMX sources.
|
|
@@ -45,24 +54,37 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
|
|
|
private static final String NAME_KEY = "name";
|
|
|
private static final String PORT_KEY = "tag.port";
|
|
|
|
|
|
- private final StreamProvider streamProvider;
|
|
|
+ private static final long DEFAULT_POPULATE_TIMEOUT_MILLIS = 10000L;
|
|
|
|
|
|
- private final JMXHostProvider jmxHostProvider;
|
|
|
+ public static final String TIMED_OUT_MSG = "Timed out waiting for JMX metrics.";
|
|
|
|
|
|
- private static final Map<String, String> DEFAULT_JMX_PORTS = new HashMap<String, String>();
|
|
|
-
|
|
|
- private final String clusterNamePropertyId;
|
|
|
+ /**
|
|
|
+ * Thread pool
|
|
|
+ */
|
|
|
+ private static final ExecutorService EXECUTOR_SERVICE;
|
|
|
+ private static final int THREAD_POOL_CORE_SIZE = 20;
|
|
|
+ private static final int THREAD_POOL_MAX_SIZE = 100;
|
|
|
+ private static final long THREAD_POOL_TIMEOUT_MILLIS = 30000L;
|
|
|
|
|
|
- private final String hostNamePropertyId;
|
|
|
+ static {
|
|
|
+ LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(); // unlimited Queue
|
|
|
|
|
|
- private final String componentNamePropertyId;
|
|
|
+ ThreadPoolExecutor threadPoolExecutor =
|
|
|
+ new ThreadPoolExecutor(
|
|
|
+ THREAD_POOL_CORE_SIZE,
|
|
|
+ THREAD_POOL_MAX_SIZE,
|
|
|
+ THREAD_POOL_TIMEOUT_MILLIS,
|
|
|
+ TimeUnit.MILLISECONDS,
|
|
|
+ queue);
|
|
|
|
|
|
- private final String statePropertyId;
|
|
|
+ threadPoolExecutor.allowCoreThreadTimeOut(true);
|
|
|
|
|
|
- private final Set<String> healthyStates;
|
|
|
+ EXECUTOR_SERVICE = threadPoolExecutor;
|
|
|
+ }
|
|
|
|
|
|
private final static ObjectReader objectReader;
|
|
|
|
|
|
+ private static final Map<String, String> DEFAULT_JMX_PORTS = new HashMap<String, String>();
|
|
|
|
|
|
static {
|
|
|
DEFAULT_JMX_PORTS.put("NAMENODE", "50070");
|
|
@@ -80,6 +102,27 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
|
|
|
protected final static Logger LOG =
|
|
|
LoggerFactory.getLogger(JMXPropertyProvider.class);
|
|
|
|
|
|
+ private final StreamProvider streamProvider;
|
|
|
+
|
|
|
+ private final JMXHostProvider jmxHostProvider;
|
|
|
+
|
|
|
+ private final String clusterNamePropertyId;
|
|
|
+
|
|
|
+ private final String hostNamePropertyId;
|
|
|
+
|
|
|
+ private final String componentNamePropertyId;
|
|
|
+
|
|
|
+ private final String statePropertyId;
|
|
|
+
|
|
|
+ private final Set<String> healthyStates;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * The amount of time that this provider will wait for JMX metric values to be
|
|
|
+ * returned from the JMX sources. If no results are returned for this amount of
|
|
|
+ * time then the request to populate the resources will fail.
|
|
|
+ */
|
|
|
+ protected long populateTimeout = DEFAULT_POPULATE_TIMEOUT_MILLIS;
|
|
|
+
|
|
|
|
|
|
// ----- Constructors ------------------------------------------------------
|
|
|
|
|
@@ -122,12 +165,38 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
|
|
|
public Set<Resource> populateResources(Set<Resource> resources, Request request, Predicate predicate)
|
|
|
throws SystemException {
|
|
|
|
|
|
+ CompletionService<Resource> completionService =
|
|
|
+ new ExecutorCompletionService<Resource>(EXECUTOR_SERVICE);
|
|
|
|
|
|
- Set<Resource> keepers = new HashSet<Resource>();
|
|
|
+ // In a large cluster we could have thousands of resources to populate here.
|
|
|
+ // Distribute the work across multiple threads.
|
|
|
for (Resource resource : resources) {
|
|
|
- if (populateResource(resource, request, predicate)) {
|
|
|
- keepers.add(resource);
|
|
|
+ completionService.submit(getPopulateResourceCallable(resource, request, predicate));
|
|
|
+ }
|
|
|
+
|
|
|
+ Set<Resource> keepers = new HashSet<Resource>();
|
|
|
+ try {
|
|
|
+ for (int i = 0; i < resources.size(); ++ i) {
|
|
|
+ Future<Resource> resourceFuture =
|
|
|
+ completionService.poll(populateTimeout, TimeUnit.MILLISECONDS);
|
|
|
+
|
|
|
+ if (resourceFuture == null) {
|
|
|
+ // its been more than the populateTimeout since the last callable completed ...
|
|
|
+ // don't wait any longer
|
|
|
+ LOG.error(TIMED_OUT_MSG);
|
|
|
+ break;
|
|
|
+ } else {
|
|
|
+ // future should already be completed... no need to wait on get
|
|
|
+ Resource resource = resourceFuture.get();
|
|
|
+ if (resource != null) {
|
|
|
+ keepers.add(resource);
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ logException(e);
|
|
|
+ } catch (ExecutionException e) {
|
|
|
+ rethrowSystemException(e.getCause());
|
|
|
}
|
|
|
return keepers;
|
|
|
}
|
|
@@ -135,6 +204,15 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
|
|
|
|
|
|
// ----- helper methods ----------------------------------------------------
|
|
|
|
|
|
+ /**
|
|
|
+ * Set the populate timeout value for this provider.
|
|
|
+ *
|
|
|
+ * @param populateTimeout the populate timeout value
|
|
|
+ */
|
|
|
+ protected void setPopulateTimeout(long populateTimeout) {
|
|
|
+ this.populateTimeout = populateTimeout;
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Get the spec to locate the JMX stream from the given host and port
|
|
|
*
|
|
@@ -147,6 +225,24 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
|
|
|
return "http://" + hostName + ":" + port + "/jmx";
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * Get a callable that can be used to populate the given resource.
|
|
|
+ *
|
|
|
+ * @param resource the resource to be populated
|
|
|
+ * @param request the request
|
|
|
+ * @param predicate the predicate
|
|
|
+ *
|
|
|
+ * @return a callable that can be used to populate the given resource
|
|
|
+ */
|
|
|
+ private Callable<Resource> getPopulateResourceCallable(
|
|
|
+ final Resource resource, final Request request, final Predicate predicate) {
|
|
|
+ return new Callable<Resource>() {
|
|
|
+ public Resource call() throws SystemException {
|
|
|
+ return populateResource(resource, request, predicate);
|
|
|
+ }
|
|
|
+ };
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* Populate a resource by obtaining the requested JMX properties.
|
|
|
*
|
|
@@ -154,22 +250,22 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
|
|
|
* @param request the request
|
|
|
* @param predicate the predicate
|
|
|
*
|
|
|
- * @return true if the resource should be part of the result set for the given predicate
|
|
|
+ * @return the populated resource; null if the resource should NOT be part of the result set for the given predicate
|
|
|
*/
|
|
|
- private boolean populateResource(Resource resource, Request request, Predicate predicate)
|
|
|
+ private Resource populateResource(Resource resource, Request request, Predicate predicate)
|
|
|
throws SystemException {
|
|
|
|
|
|
Set<String> ids = getRequestPropertyIds(request, predicate);
|
|
|
if (ids.isEmpty()) {
|
|
|
// no properties requested
|
|
|
- return true;
|
|
|
+ return resource;
|
|
|
}
|
|
|
|
|
|
// Don't attempt to get the JMX properties if the resource is in an unhealthy state
|
|
|
if (statePropertyId != null) {
|
|
|
String state = (String) resource.getPropertyValue(statePropertyId);
|
|
|
if (state != null && !healthyStates.contains(state)) {
|
|
|
- return true;
|
|
|
+ return resource;
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -177,106 +273,98 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
|
|
|
|
|
|
if (getComponentMetrics().get(componentName) == null) {
|
|
|
// If there are no metrics defined for the given component then there is nothing to do.
|
|
|
- return true;
|
|
|
+ return resource;
|
|
|
}
|
|
|
|
|
|
String clusterName = (String) resource.getPropertyValue(clusterNamePropertyId);
|
|
|
|
|
|
String port = getPort(clusterName, componentName);
|
|
|
if (port == null) {
|
|
|
- String error = "Unable to get JMX metrics. No port value for " + componentName;
|
|
|
- logError(error, null);
|
|
|
- throw new SystemException(error, null);
|
|
|
+ throw new SystemException(
|
|
|
+ "Unable to get JMX metrics. No port value for " + componentName, null);
|
|
|
}
|
|
|
|
|
|
String hostName = getHost(resource, clusterName, componentName);
|
|
|
if (hostName == null) {
|
|
|
- String error = "Unable to get JMX metrics. No host name for " + componentName;
|
|
|
- logError(error, null);
|
|
|
- throw new SystemException(error, null);
|
|
|
+ throw new SystemException(
|
|
|
+ "Unable to get JMX metrics. No host name for " + componentName, null);
|
|
|
}
|
|
|
|
|
|
- String spec = getSpec(hostName, port);
|
|
|
- InputStream in = null;
|
|
|
try {
|
|
|
- in = streamProvider.readFrom(spec);
|
|
|
- JMXMetricHolder metricHolder = objectReader.readValue(in);
|
|
|
+ InputStream in = streamProvider.readFrom(getSpec(hostName, port));
|
|
|
|
|
|
- Map<String, Map<String, Object>> categories = new HashMap<String, Map<String, Object>>();
|
|
|
+ try {
|
|
|
+ JMXMetricHolder metricHolder = objectReader.readValue(in);
|
|
|
|
|
|
- for (Map<String, Object> bean : metricHolder.getBeans()) {
|
|
|
- String category = getCategory(bean);
|
|
|
- if (category != null) {
|
|
|
- categories.put(category, bean);
|
|
|
+ Map<String, Map<String, Object>> categories = new HashMap<String, Map<String, Object>>();
|
|
|
+
|
|
|
+ for (Map<String, Object> bean : metricHolder.getBeans()) {
|
|
|
+ String category = getCategory(bean);
|
|
|
+ if (category != null) {
|
|
|
+ categories.put(category, bean);
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- for (String propertyId : ids) {
|
|
|
- Map<String, PropertyInfo> propertyInfoMap = getPropertyInfoMap(componentName, propertyId);
|
|
|
+ for (String propertyId : ids) {
|
|
|
+ Map<String, PropertyInfo> propertyInfoMap = getPropertyInfoMap(componentName, propertyId);
|
|
|
|
|
|
- for (Map.Entry<String, PropertyInfo> entry : propertyInfoMap.entrySet()) {
|
|
|
+ for (Map.Entry<String, PropertyInfo> entry : propertyInfoMap.entrySet()) {
|
|
|
|
|
|
- PropertyInfo propertyInfo = entry.getValue();
|
|
|
- propertyId = entry.getKey();
|
|
|
+ PropertyInfo propertyInfo = entry.getValue();
|
|
|
+ propertyId = entry.getKey();
|
|
|
|
|
|
- if (propertyInfo.isPointInTime()) {
|
|
|
+ if (propertyInfo.isPointInTime()) {
|
|
|
|
|
|
- String property = propertyInfo.getPropertyId();
|
|
|
- String category = "";
|
|
|
+ String property = propertyInfo.getPropertyId();
|
|
|
+ String category = "";
|
|
|
|
|
|
- List<String> keyList = new LinkedList<String>();
|
|
|
- int keyStartIndex = property.indexOf('[', 0);
|
|
|
- int firstKeyIndex = keyStartIndex > -1 ? keyStartIndex : property.length();
|
|
|
- while (keyStartIndex > -1) {
|
|
|
- int keyEndIndex = property.indexOf(']', keyStartIndex);
|
|
|
- if (keyEndIndex > -1 & keyEndIndex > keyStartIndex) {
|
|
|
- keyList.add(property.substring(keyStartIndex + 1, keyEndIndex));
|
|
|
- keyStartIndex = property.indexOf('[', keyEndIndex);
|
|
|
- }
|
|
|
- else {
|
|
|
- keyStartIndex = -1;
|
|
|
+ List<String> keyList = new LinkedList<String>();
|
|
|
+ int keyStartIndex = property.indexOf('[', 0);
|
|
|
+ int firstKeyIndex = keyStartIndex > -1 ? keyStartIndex : property.length();
|
|
|
+ while (keyStartIndex > -1) {
|
|
|
+ int keyEndIndex = property.indexOf(']', keyStartIndex);
|
|
|
+ if (keyEndIndex > -1 & keyEndIndex > keyStartIndex) {
|
|
|
+ keyList.add(property.substring(keyStartIndex + 1, keyEndIndex));
|
|
|
+ keyStartIndex = property.indexOf('[', keyEndIndex);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ keyStartIndex = -1;
|
|
|
+ }
|
|
|
}
|
|
|
- }
|
|
|
|
|
|
- int dotIndex = property.lastIndexOf('.', firstKeyIndex - 1);
|
|
|
- if (dotIndex != -1){
|
|
|
- category = property.substring(0, dotIndex);
|
|
|
- property = property.substring(dotIndex + 1, firstKeyIndex);
|
|
|
- }
|
|
|
+ int dotIndex = property.lastIndexOf('.', firstKeyIndex - 1);
|
|
|
+ if (dotIndex != -1){
|
|
|
+ category = property.substring(0, dotIndex);
|
|
|
+ property = property.substring(dotIndex + 1, firstKeyIndex);
|
|
|
+ }
|
|
|
|
|
|
- Map<String, Object> properties = categories.get(category);
|
|
|
- if (properties != null && properties.containsKey(property)) {
|
|
|
- Object value = properties.get(property);
|
|
|
- if (keyList.size() > 0 && value instanceof Map) {
|
|
|
- Map map = (Map) value;
|
|
|
- for (String key : keyList) {
|
|
|
- value = map.get(key);
|
|
|
- if (value instanceof Map) {
|
|
|
- map = (Map) value;
|
|
|
- }
|
|
|
- else {
|
|
|
- break;
|
|
|
+ Map<String, Object> properties = categories.get(category);
|
|
|
+ if (properties != null && properties.containsKey(property)) {
|
|
|
+ Object value = properties.get(property);
|
|
|
+ if (keyList.size() > 0 && value instanceof Map) {
|
|
|
+ Map map = (Map) value;
|
|
|
+ for (String key : keyList) {
|
|
|
+ value = map.get(key);
|
|
|
+ if (value instanceof Map) {
|
|
|
+ map = (Map) value;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ break;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
+ resource.setProperty(propertyId, value);
|
|
|
}
|
|
|
- resource.setProperty(propertyId, value);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ } finally {
|
|
|
+ in.close();
|
|
|
}
|
|
|
} catch (IOException e) {
|
|
|
- logError(spec, e);
|
|
|
- } finally {
|
|
|
- if (in != null) {
|
|
|
- try {
|
|
|
- in.close();
|
|
|
- } catch (IOException e) {
|
|
|
- logError("Unable to close http input steam : spec=" + spec, e);
|
|
|
- }
|
|
|
- }
|
|
|
+ logException(e);
|
|
|
}
|
|
|
-
|
|
|
- return true;
|
|
|
+ return resource;
|
|
|
}
|
|
|
|
|
|
private String getPort(String clusterName, String componentName) throws SystemException {
|
|
@@ -303,14 +391,35 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
|
|
|
return null;
|
|
|
}
|
|
|
|
|
|
- private static void logError(String error, IOException e) {
|
|
|
- if (LOG.isErrorEnabled()) {
|
|
|
- if (e == null) {
|
|
|
- LOG.error("Caught exception getting JMX metrics : spec=" + error);
|
|
|
- } else {
|
|
|
- LOG.error("Caught exception getting JMX metrics : spec=" + error);
|
|
|
- LOG.debug("" + e);
|
|
|
- }
|
|
|
+ /**
|
|
|
+ * Log an error for the given exception.
|
|
|
+ *
|
|
|
+ * @param throwable the caught exception
|
|
|
+ *
|
|
|
+ * @return the error message that was logged
|
|
|
+ */
|
|
|
+ private static String logException(Throwable throwable) {
|
|
|
+ String msg = "Caught exception getting JMX metrics : " + throwable.getLocalizedMessage();
|
|
|
+
|
|
|
+ LOG.error(msg);
|
|
|
+ LOG.debug(msg, throwable);
|
|
|
+
|
|
|
+ return msg;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Rethrow the given exception as a System exception and log the message.
|
|
|
+ *
|
|
|
+ * @param throwable the caught exception
|
|
|
+ *
|
|
|
+ * @throws SystemException always around the given exception
|
|
|
+ */
|
|
|
+ private static void rethrowSystemException(Throwable throwable) throws SystemException {
|
|
|
+ String msg = logException(throwable);
|
|
|
+
|
|
|
+ if (throwable instanceof SystemException) {
|
|
|
+ throw (SystemException) throwable;
|
|
|
}
|
|
|
+ throw new SystemException (msg, throwable);
|
|
|
}
|
|
|
}
|