|
@@ -45,15 +45,14 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
|
|
|
|
|
|
public static final String COLLECTOR_HOST = "host";
|
|
public static final String COLLECTOR_HOST = "host";
|
|
public static final String COLLECTOR_PORT = "port";
|
|
public static final String COLLECTOR_PORT = "port";
|
|
-
|
|
|
|
public static final String METRICS_COLLECTOR = "metrics_collector";
|
|
public static final String METRICS_COLLECTOR = "metrics_collector";
|
|
-
|
|
|
|
- public static final String APP_ID = "nimbus";
|
|
|
|
|
|
+ public static final String APP_ID = "appId";
|
|
|
|
|
|
private String hostname;
|
|
private String hostname;
|
|
private SocketAddress socketAddress;
|
|
private SocketAddress socketAddress;
|
|
private String collectorUri;
|
|
private String collectorUri;
|
|
private NimbusClient nimbusClient;
|
|
private NimbusClient nimbusClient;
|
|
|
|
+ private String applicationId;
|
|
|
|
|
|
public StormTimelineMetricsReporter() {
|
|
public StormTimelineMetricsReporter() {
|
|
|
|
|
|
@@ -85,6 +84,7 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
|
|
this.nimbusClient = NimbusClient.getConfiguredClient(stormConf);
|
|
this.nimbusClient = NimbusClient.getConfiguredClient(stormConf);
|
|
String collectorHostname = cf.get(COLLECTOR_HOST).toString();
|
|
String collectorHostname = cf.get(COLLECTOR_HOST).toString();
|
|
String port = cf.get(COLLECTOR_PORT).toString();
|
|
String port = cf.get(COLLECTOR_PORT).toString();
|
|
|
|
+ applicationId = cf.get(APP_ID).toString();
|
|
collectorUri = "http://" + collectorHostname + ":" + port + "/ws/v1/timeline/metrics";
|
|
collectorUri = "http://" + collectorHostname + ":" + port + "/ws/v1/timeline/metrics";
|
|
List<InetSocketAddress> socketAddresses =
|
|
List<InetSocketAddress> socketAddresses =
|
|
Servers.parse(collectorHostname, Integer.valueOf(port));
|
|
Servers.parse(collectorHostname, Integer.valueOf(port));
|
|
@@ -92,7 +92,8 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
|
|
socketAddress = socketAddresses.get(0);
|
|
socketAddress = socketAddresses.get(0);
|
|
}
|
|
}
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
- LOG.warn("could not initialize metrics collector, please specify host, port under $STORM_HOME/conf/config.yaml ", e);
|
|
|
|
|
|
+ LOG.warn("Could not initialize metrics collector, please specify host, " +
|
|
|
|
+ "port under $STORM_HOME/conf/config.yaml ", e);
|
|
}
|
|
}
|
|
|
|
|
|
}
|
|
}
|
|
@@ -103,9 +104,9 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
|
|
ClusterSummary cs = this.nimbusClient.getClient().getClusterInfo();
|
|
ClusterSummary cs = this.nimbusClient.getClient().getClusterInfo();
|
|
long currentTimeMillis = System.currentTimeMillis();
|
|
long currentTimeMillis = System.currentTimeMillis();
|
|
totalMetrics.add(createTimelineMetric(currentTimeMillis,
|
|
totalMetrics.add(createTimelineMetric(currentTimeMillis,
|
|
- APP_ID, "Supervisors", String.valueOf(cs.get_supervisors_size())));
|
|
|
|
|
|
+ applicationId, "Supervisors", String.valueOf(cs.get_supervisors_size())));
|
|
totalMetrics.add(createTimelineMetric(currentTimeMillis,
|
|
totalMetrics.add(createTimelineMetric(currentTimeMillis,
|
|
- APP_ID, "Topologies", String.valueOf(cs.get_topologies_size())));
|
|
|
|
|
|
+ applicationId, "Topologies", String.valueOf(cs.get_topologies_size())));
|
|
|
|
|
|
List<SupervisorSummary> sups = cs.get_supervisors();
|
|
List<SupervisorSummary> sups = cs.get_supervisors();
|
|
int totalSlots = 0;
|
|
int totalSlots = 0;
|
|
@@ -117,11 +118,11 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
|
|
int freeSlots = totalSlots - usedSlots;
|
|
int freeSlots = totalSlots - usedSlots;
|
|
|
|
|
|
totalMetrics.add(createTimelineMetric(currentTimeMillis,
|
|
totalMetrics.add(createTimelineMetric(currentTimeMillis,
|
|
- APP_ID, "Total Slots", String.valueOf(totalSlots)));
|
|
|
|
|
|
+ applicationId, "Total Slots", String.valueOf(totalSlots)));
|
|
totalMetrics.add(createTimelineMetric(currentTimeMillis,
|
|
totalMetrics.add(createTimelineMetric(currentTimeMillis,
|
|
- APP_ID, "Used Slots", String.valueOf(usedSlots)));
|
|
|
|
|
|
+ applicationId, "Used Slots", String.valueOf(usedSlots)));
|
|
totalMetrics.add(createTimelineMetric(currentTimeMillis,
|
|
totalMetrics.add(createTimelineMetric(currentTimeMillis,
|
|
- APP_ID, "Free Slots", String.valueOf(freeSlots)));
|
|
|
|
|
|
+ applicationId, "Free Slots", String.valueOf(freeSlots)));
|
|
|
|
|
|
List<TopologySummary> topos = cs.get_topologies();
|
|
List<TopologySummary> topos = cs.get_topologies();
|
|
int totalExecutors = 0;
|
|
int totalExecutors = 0;
|
|
@@ -132,9 +133,9 @@ public class StormTimelineMetricsReporter extends AbstractTimelineMetricsSink
|
|
}
|
|
}
|
|
|
|
|
|
totalMetrics.add(createTimelineMetric(currentTimeMillis,
|
|
totalMetrics.add(createTimelineMetric(currentTimeMillis,
|
|
- APP_ID, "Total Executors", String.valueOf(totalExecutors)));
|
|
|
|
|
|
+ applicationId, "Total Executors", String.valueOf(totalExecutors)));
|
|
totalMetrics.add(createTimelineMetric(currentTimeMillis,
|
|
totalMetrics.add(createTimelineMetric(currentTimeMillis,
|
|
- APP_ID, "Total Tasks", String.valueOf(totalTasks)));
|
|
|
|
|
|
+ applicationId, "Total Tasks", String.valueOf(totalTasks)));
|
|
|
|
|
|
TimelineMetrics timelineMetrics = new TimelineMetrics();
|
|
TimelineMetrics timelineMetrics = new TimelineMetrics();
|
|
timelineMetrics.setMetrics(totalMetrics);
|
|
timelineMetrics.setMetrics(totalMetrics);
|