|
@@ -26,9 +26,18 @@ import java.io.IOException;
|
|
|
import java.net.InetSocketAddress;
|
|
|
import java.util.Enumeration;
|
|
|
import java.util.Objects;
|
|
|
+import java.util.Optional;
|
|
|
import java.util.Properties;
|
|
|
+import java.util.concurrent.BlockingQueue;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
import java.util.concurrent.ConcurrentMap;
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
+import java.util.concurrent.LinkedBlockingQueue;
|
|
|
+import java.util.concurrent.RejectedExecutionException;
|
|
|
+import java.util.concurrent.ThreadFactory;
|
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicInteger;
|
|
|
import java.util.function.BiConsumer;
|
|
|
import javax.servlet.ServletException;
|
|
|
import javax.servlet.http.HttpServletRequest;
|
|
@@ -40,6 +49,7 @@ import org.apache.zookeeper.metrics.MetricsProvider;
|
|
|
import org.apache.zookeeper.metrics.MetricsProviderLifeCycleException;
|
|
|
import org.apache.zookeeper.metrics.Summary;
|
|
|
import org.apache.zookeeper.metrics.SummarySet;
|
|
|
+import org.apache.zookeeper.server.RateLogger;
|
|
|
import org.eclipse.jetty.server.Server;
|
|
|
import org.eclipse.jetty.servlet.ServletContextHandler;
|
|
|
import org.eclipse.jetty.servlet.ServletHolder;
|
|
@@ -56,6 +66,26 @@ public class PrometheusMetricsProvider implements MetricsProvider {
|
|
|
private static final Logger LOG = LoggerFactory.getLogger(PrometheusMetricsProvider.class);
|
|
|
private static final String LABEL = "key";
|
|
|
private static final String[] LABELS = {LABEL};
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Number of worker threads for reporting Prometheus summary metrics.
|
|
|
+ * Default value is 1.
|
|
|
+ * If the number is less than 1, no worker pool is created and metrics are reported synchronously on the calling thread.
|
|
|
+ */
|
|
|
+ static final String NUM_WORKER_THREADS = "numWorkerThreads";
|
|
|
+
|
|
|
+ /**
|
|
|
+ * The max queue size for Prometheus summary metrics reporting task.
|
|
|
+ * Default value is 1000000.
|
|
|
+ */
|
|
|
+ static final String MAX_QUEUE_SIZE = "maxQueueSize";
|
|
|
+
|
|
|
+ /**
|
|
|
+ * The timeout in ms for Prometheus worker threads shutdown.
|
|
|
+ * Default value is 1000ms.
|
|
|
+ */
|
|
|
+ static final String WORKER_SHUTDOWN_TIMEOUT_MS = "workerShutdownTimeoutMs";
|
|
|
+
|
|
|
/**
|
|
|
* We are using the 'defaultRegistry'.
|
|
|
* <p>
|
|
@@ -64,12 +94,17 @@ public class PrometheusMetricsProvider implements MetricsProvider {
|
|
|
* </p>
|
|
|
*/
|
|
|
private final CollectorRegistry collectorRegistry = CollectorRegistry.defaultRegistry;
|
|
|
+ private final RateLogger rateLogger = new RateLogger(LOG, 60 * 1000);
|
|
|
private String host = "0.0.0.0";
|
|
|
private int port = 7000;
|
|
|
private boolean exportJvmInfo = true;
|
|
|
private Server server;
|
|
|
private final MetricsServletImpl servlet = new MetricsServletImpl();
|
|
|
private final Context rootContext = new Context();
|
|
|
+ private int numWorkerThreads = 1;
|
|
|
+ private int maxQueueSize = 1000000;
|
|
|
+ private long workerShutdownTimeoutMs = 1000;
|
|
|
+ private Optional<ExecutorService> executorOptional = Optional.empty();
|
|
|
|
|
|
@Override
|
|
|
public void configure(Properties configuration) throws MetricsProviderLifeCycleException {
|
|
@@ -77,10 +112,17 @@ public class PrometheusMetricsProvider implements MetricsProvider {
|
|
|
this.host = configuration.getProperty("httpHost", "0.0.0.0");
|
|
|
this.port = Integer.parseInt(configuration.getProperty("httpPort", "7000"));
|
|
|
this.exportJvmInfo = Boolean.parseBoolean(configuration.getProperty("exportJvmInfo", "true"));
|
|
|
+ this.numWorkerThreads = Integer.parseInt(
|
|
|
+ configuration.getProperty(NUM_WORKER_THREADS, "1"));
|
|
|
+ this.maxQueueSize = Integer.parseInt(
|
|
|
+ configuration.getProperty(MAX_QUEUE_SIZE, "1000000"));
|
|
|
+ this.workerShutdownTimeoutMs = Long.parseLong(
|
|
|
+ configuration.getProperty(WORKER_SHUTDOWN_TIMEOUT_MS, "1000"));
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void start() throws MetricsProviderLifeCycleException {
|
|
|
+ this.executorOptional = createExecutor();
|
|
|
try {
|
|
|
LOG.info("Starting /metrics HTTP endpoint at host: {}, port: {}, exportJvmInfo: {}",
|
|
|
host, port, exportJvmInfo);
|
|
@@ -120,6 +162,7 @@ public class PrometheusMetricsProvider implements MetricsProvider {
|
|
|
|
|
|
@Override
|
|
|
public void stop() {
|
|
|
+ shutdownExecutor();
|
|
|
if (server != null) {
|
|
|
try {
|
|
|
server.stop();
|
|
@@ -331,7 +374,7 @@ public class PrometheusMetricsProvider implements MetricsProvider {
|
|
|
|
|
|
}
|
|
|
|
|
|
- private class PrometheusSummary implements Summary {
|
|
|
+ class PrometheusSummary implements Summary {
|
|
|
|
|
|
private final io.prometheus.client.Summary inner;
|
|
|
private final String name;
|
|
@@ -355,16 +398,19 @@ public class PrometheusMetricsProvider implements MetricsProvider {
|
|
|
|
|
|
@Override
|
|
|
public void add(long delta) {
|
|
|
+ reportMetrics(() -> observe(delta));
|
|
|
+ }
|
|
|
+
|
|
|
+ void observe(final long delta) {
|
|
|
try {
|
|
|
inner.observe(delta);
|
|
|
- } catch (IllegalArgumentException err) {
|
|
|
+ } catch (final IllegalArgumentException err) {
|
|
|
LOG.error("invalid delta {} for metric {}", delta, name, err);
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
|
|
|
- private class PrometheusLabelledSummary implements SummarySet {
|
|
|
+ class PrometheusLabelledSummary implements SummarySet {
|
|
|
|
|
|
private final io.prometheus.client.Summary inner;
|
|
|
private final String name;
|
|
@@ -390,9 +436,13 @@ public class PrometheusMetricsProvider implements MetricsProvider {
|
|
|
|
|
|
@Override
|
|
|
public void add(String key, long value) {
|
|
|
+ reportMetrics(() -> observe(key, value));
|
|
|
+ }
|
|
|
+
|
|
|
+ void observe(final String key, final long value) {
|
|
|
try {
|
|
|
inner.labels(key).observe(value);
|
|
|
- } catch (IllegalArgumentException err) {
|
|
|
+ } catch (final IllegalArgumentException err) {
|
|
|
LOG.error("invalid value {} for metric {} with key {}", value, name, key, err);
|
|
|
}
|
|
|
}
|
|
@@ -410,4 +460,65 @@ public class PrometheusMetricsProvider implements MetricsProvider {
|
|
|
super.doGet(req, resp);
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+    /**
+     * Builds the worker pool used to report summary metrics asynchronously.
+     *
+     * <p>The pool has exactly {@code numWorkerThreads} threads and a queue
+     * bounded to {@code maxQueueSize} pending tasks.</p>
+     *
+     * @return the new executor, or an empty Optional when
+     *         {@code numWorkerThreads} is less than 1
+     */
+    private Optional<ExecutorService> createExecutor() {
+        if (numWorkerThreads >= 1) {
+            // Bounded queue: once it is full, further submissions are rejected.
+            final BlockingQueue<Runnable> pendingTasks = new LinkedBlockingQueue<>(maxQueueSize);
+            final ThreadFactory workerFactory = new PrometheusWorkerThreadFactory();
+            // Core size equals max size, so the pool never grows or shrinks.
+            final ExecutorService workers = new ThreadPoolExecutor(
+                    numWorkerThreads,
+                    numWorkerThreads,
+                    0L,
+                    TimeUnit.MILLISECONDS,
+                    pendingTasks,
+                    workerFactory);
+            LOG.info("Executor service was created with numWorkerThreads {} and maxQueueSize {}",
+                    numWorkerThreads,
+                    maxQueueSize);
+            return Optional.of(workers);
+        }
+
+        LOG.info("Executor service was not created as numWorkerThreads {} is less than 1", numWorkerThreads);
+        return Optional.empty();
+    }
|
|
|
+
|
|
|
+    /**
+     * Stops the metrics worker pool, if one was created.
+     *
+     * <p>Requests an orderly shutdown and waits up to
+     * {@code workerShutdownTimeoutMs} milliseconds for the workers to finish.
+     * If they do not finish in time, or the wait fails, the pool is shut down
+     * forcibly with {@code shutdownNow()}.</p>
+     *
+     * <p>If the calling thread is interrupted while waiting, its interrupt
+     * status is restored before returning so that callers can still observe
+     * the interruption.</p>
+     */
+    private void shutdownExecutor() {
+        if (!executorOptional.isPresent()) {
+            return;
+        }
+
+        LOG.info("Shutdown executor service with timeout {}", workerShutdownTimeoutMs);
+        final ExecutorService executor = executorOptional.get();
+        executor.shutdown();
+        try {
+            if (!executor.awaitTermination(workerShutdownTimeoutMs, TimeUnit.MILLISECONDS)) {
+                LOG.error("Not all the Prometheus worker threads terminated properly after {} timeout",
+                        workerShutdownTimeoutMs);
+                executor.shutdownNow();
+            }
+        } catch (final InterruptedException e) {
+            LOG.error("Error occurred while terminating Prometheus worker threads", e);
+            executor.shutdownNow();
+            // Catching InterruptedException clears the flag; set it again so the
+            // interruption is not silently lost.
+            Thread.currentThread().interrupt();
+        } catch (final Exception e) {
+            // Best effort: any other failure still forces the pool down.
+            LOG.error("Error occurred while terminating Prometheus worker threads", e);
+            executor.shutdownNow();
+        }
+    }
|
|
|
+
|
|
|
+ private static class PrometheusWorkerThreadFactory implements ThreadFactory {
|
|
|
+ private static final AtomicInteger workerCounter = new AtomicInteger(1);
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Thread newThread(final Runnable runnable) {
|
|
|
+ final String threadName = "PrometheusMetricsProviderWorker-" + workerCounter.getAndIncrement();
|
|
|
+ final Thread thread = new Thread(runnable, threadName);
|
|
|
+ thread.setDaemon(true);
|
|
|
+ return thread;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void reportMetrics(final Runnable task) {
|
|
|
+ if (executorOptional.isPresent()) {
|
|
|
+ try {
|
|
|
+ executorOptional.get().submit(task);
|
|
|
+ } catch (final RejectedExecutionException e) {
|
|
|
+ rateLogger.rateLimitLog("Prometheus metrics reporting task queue size exceeded the max",
|
|
|
+ String.valueOf(maxQueueSize));
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ task.run();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|