|
@@ -62,7 +62,7 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
|
|
|
private static final String NAME_KEY = "name";
|
|
|
private static final String PORT_KEY = "tag.port";
|
|
|
private static final String DOT_REPLACEMENT_CHAR = "#";
|
|
|
- private static final long DEFAULT_POPULATE_TIMEOUT_MILLIS = 10000L;
|
|
|
+ private static final long DEFAULT_POPULATE_TIMEOUT_MILLIS = 12000L;
|
|
|
|
|
|
public static final String TIMED_OUT_MSG = "Timed out waiting for JMX metrics.";
|
|
|
public static final String STORM_REST_API = "STORM_REST_API";
|
|
@@ -188,13 +188,16 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
|
|
|
public Set<Resource> populateResources(Set<Resource> resources, Request request, Predicate predicate)
|
|
|
throws SystemException {
|
|
|
|
|
|
+ // Get a valid ticket for the request.
|
|
|
+ Ticket ticket = new Ticket();
|
|
|
+
|
|
|
CompletionService<Resource> completionService =
|
|
|
new ExecutorCompletionService<Resource>(EXECUTOR_SERVICE);
|
|
|
|
|
|
// In a large cluster we could have thousands of resources to populate here.
|
|
|
// Distribute the work across multiple threads.
|
|
|
for (Resource resource : resources) {
|
|
|
- completionService.submit(getPopulateResourceCallable(resource, request, predicate));
|
|
|
+ completionService.submit(getPopulateResourceCallable(resource, request, predicate, ticket));
|
|
|
}
|
|
|
|
|
|
Set<Resource> keepers = new HashSet<Resource>();
|
|
@@ -205,7 +208,8 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
|
|
|
|
|
|
if (resourceFuture == null) {
|
|
|
// its been more than the populateTimeout since the last callable completed ...
|
|
|
- // don't wait any longer
|
|
|
+ // invalidate the ticket to abort the threads and don't wait any longer
|
|
|
+ ticket.invalidate();
|
|
|
LOG.error(TIMED_OUT_MSG);
|
|
|
break;
|
|
|
} else {
|
|
@@ -239,7 +243,7 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
|
|
|
/**
|
|
|
* Get the spec to locate the JMX stream from the given host and port
|
|
|
*
|
|
|
- ** @param protocol the protocol, one of http or https
|
|
|
+ * @param protocol the protocol, one of http or https
|
|
|
* @param hostName the host name
|
|
|
* @param port the port
|
|
|
*
|
|
@@ -271,14 +275,15 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
|
|
|
* @param resource the resource to be populated
|
|
|
* @param request the request
|
|
|
* @param predicate the predicate
|
|
|
+ * @param ticket a valid ticket
|
|
|
*
|
|
|
* @return a callable that can be used to populate the given resource
|
|
|
*/
|
|
|
private Callable<Resource> getPopulateResourceCallable(
|
|
|
- final Resource resource, final Request request, final Predicate predicate) {
|
|
|
+ final Resource resource, final Request request, final Predicate predicate, final Ticket ticket) {
|
|
|
return new Callable<Resource>() {
|
|
|
public Resource call() throws SystemException {
|
|
|
- return populateResource(resource, request, predicate);
|
|
|
+ return populateResource(resource, request, predicate, ticket);
|
|
|
}
|
|
|
};
|
|
|
}
|
|
@@ -289,10 +294,11 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
|
|
|
* @param resource the resource to be populated
|
|
|
* @param request the request
|
|
|
* @param predicate the predicate
|
|
|
+ * @param ticket a valid ticket
|
|
|
*
|
|
|
* @return the populated resource; null if the resource should NOT be part of the result set for the given predicate
|
|
|
*/
|
|
|
- private Resource populateResource(Resource resource, Request request, Predicate predicate)
|
|
|
+ private Resource populateResource(Resource resource, Request request, Predicate predicate, Ticket ticket)
|
|
|
throws SystemException {
|
|
|
|
|
|
Set<String> ids = getRequestPropertyIds(request, predicate);
|
|
@@ -349,10 +355,14 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
|
|
|
for (String hostName : hostNames) {
|
|
|
try {
|
|
|
in = streamProvider.readFrom(getSpec(protocol, hostName, port, componentName));
|
|
|
+ // if the ticket becomes invalid (timeout) then bail out
|
|
|
+ if (!ticket.isValid()) {
|
|
|
+ return resource;
|
|
|
+ }
|
|
|
if (null == componentName || !componentName.equals(STORM_REST_API)) {
|
|
|
- getHadoopMetricValue(in, ids, resource, request);
|
|
|
+ getHadoopMetricValue(in, ids, resource, request, ticket);
|
|
|
} else {
|
|
|
- getStormMetricValue(in, ids, resource);
|
|
|
+ getStormMetricValue(in, ids, resource, ticket);
|
|
|
}
|
|
|
} catch (IOException e) {
|
|
|
logException(e);
|
|
@@ -373,7 +383,7 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
|
|
|
* Hadoop-specific metrics fetching
|
|
|
*/
|
|
|
private void getHadoopMetricValue(InputStream in, Set<String> ids,
|
|
|
- Resource resource, Request request) throws IOException {
|
|
|
+ Resource resource, Request request, Ticket ticket) throws IOException {
|
|
|
JMXMetricHolder metricHolder = jmxObjectReader.readValue(in);
|
|
|
|
|
|
Map<String, Map<String, Object>> categories = new HashMap<String, Map<String, Object>>();
|
|
@@ -442,11 +452,17 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
|
|
|
}
|
|
|
// We need to do the final filtering here, after the argument substitution
|
|
|
if (isRequestedPropertyId(newPropertyId, requestedPropertyId, request)) {
|
|
|
+ if (!ticket.isValid()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
setResourceValue(resource, categories, newPropertyId, jmxCat, property, keyList);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
} else {
|
|
|
+ if (!ticket.isValid()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
setResourceValue(resource, categories, propertyId, category, property, keyList);
|
|
|
}
|
|
|
}
|
|
@@ -459,7 +475,7 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
|
|
|
* Storm-specific metrics fetching
|
|
|
*/
|
|
|
private void getStormMetricValue(InputStream in, Set<String> ids,
|
|
|
- Resource resource) throws IOException {
|
|
|
+ Resource resource, Ticket ticket) throws IOException {
|
|
|
HashMap<String, Object> metricHolder = stormObjectReader.readValue(in);
|
|
|
for (String category : ids) {
|
|
|
Map<String, PropertyInfo> defProps = getComponentMetrics().get(STORM_REST_API);
|
|
@@ -469,6 +485,9 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
|
|
|
String propName = propInfo.getPropertyId();
|
|
|
Object propertyValue = metricHolder.get(propName);
|
|
|
String absId = PropertyHelper.getPropertyId(category, propName);
|
|
|
+ if (!ticket.isValid()) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
// TODO: Maybe cast to int
|
|
|
resource.setProperty(absId, propertyValue);
|
|
|
}
|
|
@@ -509,12 +528,6 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
|
|
|
return jmxHostProvider.getJMXProtocol(clusterName, componentName);
|
|
|
}
|
|
|
|
|
|
- private String getHost(Resource resource, String clusterName, String componentName) throws SystemException {
|
|
|
- return hostNamePropertyId == null ?
|
|
|
- jmxHostProvider.getHostName(clusterName, componentName) :
|
|
|
- (String) resource.getPropertyValue(hostNamePropertyId);
|
|
|
- }
|
|
|
-
|
|
|
private Set<String> getHosts(Resource resource, String clusterName, String componentName) {
|
|
|
return hostNamePropertyId == null ?
|
|
|
jmxHostProvider.getHostNames(clusterName, componentName) :
|
|
@@ -570,6 +583,36 @@ public class JMXPropertyProvider extends AbstractPropertyProvider {
|
|
|
throw (SystemException) throwable;
|
|
|
}
|
|
|
throw new SystemException (msg, throwable);
|
|
|
- }
|
|
|
-
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ // ----- inner class : Ticket ----------------------------------------------
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Ticket used to cancel provider threads. The provider threads should
|
|
|
+ * monitor the validity of the passed in ticket and bail out if it becomes
|
|
|
+ * invalid (as in a timeout).
|
|
|
+ */
|
|
|
+ private static class Ticket {
|
|
|
+ /**
|
|
|
+ * Indicate whether or not the ticket is valid.
|
|
|
+ */
|
|
|
+ private volatile boolean valid = true;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Invalidate the ticket.
|
|
|
+ */
|
|
|
+ public void invalidate() {
|
|
|
+ valid = false;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Determine whether or not this ticket is valid.
|
|
|
+ *
|
|
|
+ * @return true if the ticket is valid
|
|
|
+ */
|
|
|
+ public boolean isValid() {
|
|
|
+ return valid;
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|