|
@@ -17,16 +17,36 @@
|
|
|
*/
|
|
|
package org.apache.ambari.server.configuration;
|
|
|
|
|
|
-import com.google.common.collect.Multimap;
|
|
|
-import com.google.common.collect.Sets;
|
|
|
-import com.google.gson.Gson;
|
|
|
-import com.google.gson.GsonBuilder;
|
|
|
-import com.google.gson.JsonElement;
|
|
|
-import com.google.gson.JsonObject;
|
|
|
-import com.google.gson.JsonParser;
|
|
|
-import com.google.gson.JsonPrimitive;
|
|
|
-import com.google.inject.Inject;
|
|
|
-import com.google.inject.Singleton;
|
|
|
+import java.io.File;
|
|
|
+import java.io.FileInputStream;
|
|
|
+import java.io.FileNotFoundException;
|
|
|
+import java.io.FileReader;
|
|
|
+import java.io.FileWriter;
|
|
|
+import java.io.IOException;
|
|
|
+import java.io.InputStream;
|
|
|
+import java.io.Writer;
|
|
|
+import java.lang.annotation.ElementType;
|
|
|
+import java.lang.annotation.Retention;
|
|
|
+import java.lang.annotation.RetentionPolicy;
|
|
|
+import java.lang.annotation.Target;
|
|
|
+import java.lang.reflect.Field;
|
|
|
+import java.security.cert.CertificateException;
|
|
|
+import java.security.interfaces.RSAPublicKey;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.EnumSet;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Map.Entry;
|
|
|
+import java.util.Properties;
|
|
|
+import java.util.Set;
|
|
|
+import java.util.SortedMap;
|
|
|
+import java.util.TreeMap;
|
|
|
+import java.util.concurrent.BlockingQueue;
|
|
|
+import java.util.concurrent.Callable;
|
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+
|
|
|
import org.apache.ambari.annotations.Experimental;
|
|
|
import org.apache.ambari.annotations.ExperimentalFeature;
|
|
|
import org.apache.ambari.annotations.Markdown;
|
|
@@ -35,6 +55,7 @@ import org.apache.ambari.server.actionmanager.CommandExecutionType;
|
|
|
import org.apache.ambari.server.actionmanager.HostRoleCommand;
|
|
|
import org.apache.ambari.server.actionmanager.Stage;
|
|
|
import org.apache.ambari.server.controller.spi.PropertyProvider;
|
|
|
+import org.apache.ambari.server.controller.utilities.ScalingThreadPoolExecutor;
|
|
|
import org.apache.ambari.server.events.listeners.alerts.AlertReceivedListener;
|
|
|
import org.apache.ambari.server.orm.JPATableGenerationStrategy;
|
|
|
import org.apache.ambari.server.orm.PersistenceType;
|
|
@@ -70,34 +91,16 @@ import org.apache.commons.lang.math.NumberUtils;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
|
|
|
-import java.io.File;
|
|
|
-import java.io.FileInputStream;
|
|
|
-import java.io.FileNotFoundException;
|
|
|
-import java.io.FileReader;
|
|
|
-import java.io.FileWriter;
|
|
|
-import java.io.IOException;
|
|
|
-import java.io.InputStream;
|
|
|
-import java.io.Writer;
|
|
|
-import java.lang.annotation.ElementType;
|
|
|
-import java.lang.annotation.Retention;
|
|
|
-import java.lang.annotation.RetentionPolicy;
|
|
|
-import java.lang.annotation.Target;
|
|
|
-import java.lang.reflect.Field;
|
|
|
-import java.security.cert.CertificateException;
|
|
|
-import java.security.interfaces.RSAPublicKey;
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.EnumSet;
|
|
|
-import java.util.HashMap;
|
|
|
-import java.util.List;
|
|
|
-import java.util.Map;
|
|
|
-import java.util.Map.Entry;
|
|
|
-import java.util.Properties;
|
|
|
-import java.util.Set;
|
|
|
-import java.util.SortedMap;
|
|
|
-import java.util.TreeMap;
|
|
|
-import java.util.concurrent.Callable;
|
|
|
-import java.util.concurrent.ThreadPoolExecutor;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
+import com.google.common.collect.Multimap;
|
|
|
+import com.google.common.collect.Sets;
|
|
|
+import com.google.gson.Gson;
|
|
|
+import com.google.gson.GsonBuilder;
|
|
|
+import com.google.gson.JsonElement;
|
|
|
+import com.google.gson.JsonObject;
|
|
|
+import com.google.gson.JsonParser;
|
|
|
+import com.google.gson.JsonPrimitive;
|
|
|
+import com.google.inject.Inject;
|
|
|
+import com.google.inject.Singleton;
|
|
|
|
|
|
/**
|
|
|
* The {@link Configuration} class is used to read from the
|
|
@@ -2206,7 +2209,7 @@ public class Configuration {
|
|
|
"alerts.template.file", null);
|
|
|
|
|
|
/**
|
|
|
- * The maximum number of threads which will handle published alert events.
|
|
|
+ * The core number of threads which will handle published alert events.
|
|
|
*/
|
|
|
@ConfigurationMarkdown(
|
|
|
group = ConfigurationGrouping.ALERTS,
|
|
@@ -2215,10 +2218,41 @@ public class Configuration {
|
|
|
@ClusterScale(clusterSize = ClusterSizeType.HOSTS_50, value = "2"),
|
|
|
@ClusterScale(clusterSize = ClusterSizeType.HOSTS_100, value = "4"),
|
|
|
@ClusterScale(clusterSize = ClusterSizeType.HOSTS_500, value = "4") },
|
|
|
+ markdown = @Markdown(
|
|
|
+ description = "The core number of threads used to process incoming alert events. The value should be increased as the size of the cluster increases."))
|
|
|
+ public static final ConfigurationProperty<Integer> ALERTS_EXECUTION_SCHEDULER_THREADS_CORE_SIZE = new ConfigurationProperty<>(
|
|
|
+ "alerts.execution.scheduler.threadpool.size.core", 2);
|
|
|
+
|
|
|
+ /**
|
|
|
+ * The maximum number of threads which will handle published alert events.
|
|
|
+ */
|
|
|
+ @ConfigurationMarkdown(
|
|
|
+ group = ConfigurationGrouping.ALERTS,
|
|
|
+ scaleValues = {
|
|
|
+ @ClusterScale(clusterSize = ClusterSizeType.HOSTS_10, value = "2"),
|
|
|
+ @ClusterScale(clusterSize = ClusterSizeType.HOSTS_50, value = "2"),
|
|
|
+ @ClusterScale(clusterSize = ClusterSizeType.HOSTS_100, value = "8"),
|
|
|
+ @ClusterScale(clusterSize = ClusterSizeType.HOSTS_500, value = "8") },
|
|
|
markdown = @Markdown(
|
|
|
description = "The number of threads used to handle alerts received from the Ambari Agents. The value should be increased as the size of the cluster increases."))
|
|
|
- public static final ConfigurationProperty<Integer> ALERTS_EXECUTION_SCHEDULER_THREADS = new ConfigurationProperty<>(
|
|
|
- "alerts.execution.scheduler.maxThreads", 2);
|
|
|
+ public static final ConfigurationProperty<Integer> ALERTS_EXECUTION_SCHEDULER_THREADS_MAX_SIZE = new ConfigurationProperty<>(
|
|
|
+ "alerts.execution.scheduler.threadpool.size.max", 2);
|
|
|
+
|
|
|
+ /**
|
|
|
+ * The size of the {@link BlockingQueue} used to control the
|
|
|
+ * {@link ScalingThreadPoolExecutor} when handling incoming alert events.
|
|
|
+ */
|
|
|
+ @ConfigurationMarkdown(
|
|
|
+ group = ConfigurationGrouping.ALERTS,
|
|
|
+ scaleValues = {
|
|
|
+ @ClusterScale(clusterSize = ClusterSizeType.HOSTS_10, value = "400"),
|
|
|
+ @ClusterScale(clusterSize = ClusterSizeType.HOSTS_50, value = "2000"),
|
|
|
+ @ClusterScale(clusterSize = ClusterSizeType.HOSTS_100, value = "4000"),
|
|
|
+ @ClusterScale(clusterSize = ClusterSizeType.HOSTS_500, value = "20000") },
|
|
|
+ markdown = @Markdown(
|
|
|
+ description = "The number of queued alerts allowed before discarding old alerts which have not been handled. The value should be increased as the size of the cluster increases."))
|
|
|
+ public static final ConfigurationProperty<Integer> ALERTS_EXECUTION_SCHEDULER_WORKER_QUEUE_SIZE = new ConfigurationProperty<>(
|
|
|
+ "alerts.execution.scheduler.threadpool.worker.size", 2000);
|
|
|
|
|
|
/**
|
|
|
* If {@code true} then alert information is cached and not immediately
|
|
@@ -4510,11 +4544,25 @@ public class Configuration {
|
|
|
return StringUtils.strip(getProperty(ALERT_TEMPLATE_FILE));
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * @return core thread pool size for AlertEventPublisher, default 2
|
|
|
+ */
|
|
|
+ public int getAlertEventPublisherCorePoolSize() {
|
|
|
+ return Integer.parseInt(getProperty(ALERTS_EXECUTION_SCHEDULER_THREADS_CORE_SIZE));
|
|
|
+ }
|
|
|
+
|
|
|
/**
|
|
|
* @return max thread pool size for AlertEventPublisher, default 2
|
|
|
*/
|
|
|
- public int getAlertEventPublisherPoolSize() {
|
|
|
- return Integer.parseInt(getProperty(ALERTS_EXECUTION_SCHEDULER_THREADS));
|
|
|
+ public int getAlertEventPublisherMaxPoolSize() {
|
|
|
+ return Integer.parseInt(getProperty(ALERTS_EXECUTION_SCHEDULER_THREADS_MAX_SIZE));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @return the size of the queue for unhandled alert events
|
|
|
+ */
|
|
|
+ public int getAlertEventPublisherWorkerQueueSize() {
|
|
|
+ return Integer.parseInt(getProperty(ALERTS_EXECUTION_SCHEDULER_WORKER_QUEUE_SIZE));
|
|
|
}
|
|
|
|
|
|
/**
|