|
@@ -27,17 +27,21 @@ import java.util.Map;
|
|
import java.util.Timer;
|
|
import java.util.Timer;
|
|
import java.util.TimerTask;
|
|
import java.util.TimerTask;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
import java.util.concurrent.atomic.AtomicLong;
|
|
|
|
+import java.util.concurrent.atomic.AtomicLongArray;
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
import java.util.concurrent.atomic.AtomicReference;
|
|
|
|
|
|
-import org.apache.commons.logging.Log;
|
|
|
|
-import org.apache.commons.logging.LogFactory;
|
|
|
|
|
|
+import com.google.common.util.concurrent.AtomicDoubleArray;
|
|
|
|
+import org.apache.commons.lang.exception.ExceptionUtils;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.conf.Configuration;
|
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
import org.apache.hadoop.metrics2.util.MBeans;
|
|
import org.apache.hadoop.metrics2.util.MBeans;
|
|
|
|
|
|
import org.codehaus.jackson.map.ObjectMapper;
|
|
import org.codehaus.jackson.map.ObjectMapper;
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
|
+import org.slf4j.Logger;
|
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
|
|
|
/**
|
|
/**
|
|
* The decay RPC scheduler counts incoming requests in a map, then
|
|
* The decay RPC scheduler counts incoming requests in a map, then
|
|
@@ -49,22 +53,28 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
|
|
/**
|
|
/**
|
|
* Period controls how many milliseconds between each decay sweep.
|
|
* Period controls how many milliseconds between each decay sweep.
|
|
*/
|
|
*/
|
|
- public static final String IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY =
|
|
|
|
|
|
+ public static final String IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY =
|
|
|
|
+ "decay-scheduler.period-ms";
|
|
|
|
+ public static final long IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT =
|
|
|
|
+ 5000;
|
|
|
|
+ @Deprecated
|
|
|
|
+ public static final String IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY =
|
|
"faircallqueue.decay-scheduler.period-ms";
|
|
"faircallqueue.decay-scheduler.period-ms";
|
|
- public static final long IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_DEFAULT =
|
|
|
|
- 5000L;
|
|
|
|
|
|
|
|
/**
|
|
/**
|
|
* Decay factor controls how much each count is suppressed by on each sweep.
|
|
* Decay factor controls how much each count is suppressed by on each sweep.
|
|
* Valid numbers are > 0 and < 1. Decay factor works in tandem with period
|
|
* Valid numbers are > 0 and < 1. Decay factor works in tandem with period
|
|
* to control how long the scheduler remembers an identity.
|
|
* to control how long the scheduler remembers an identity.
|
|
*/
|
|
*/
|
|
- public static final String IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_KEY =
|
|
|
|
|
|
+ public static final String IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_KEY =
|
|
|
|
+ "decay-scheduler.decay-factor";
|
|
|
|
+ public static final double IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_DEFAULT =
|
|
|
|
+ 0.5;
|
|
|
|
+ @Deprecated
|
|
|
|
+ public static final String IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY =
|
|
"faircallqueue.decay-scheduler.decay-factor";
|
|
"faircallqueue.decay-scheduler.decay-factor";
|
|
- public static final double IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_DEFAULT =
|
|
|
|
- 0.5;
|
|
|
|
|
|
|
|
- /**
|
|
|
|
|
|
+ /**
|
|
* Thresholds are specified as integer percentages, and specify which usage
|
|
* Thresholds are specified as integer percentages, and specify which usage
|
|
* range each queue will be allocated to. For instance, specifying the list
|
|
* range each queue will be allocated to. For instance, specifying the list
|
|
* 10, 40, 80
|
|
* 10, 40, 80
|
|
@@ -74,15 +84,31 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
|
|
* - q1 from 10 up to 40
|
|
* - q1 from 10 up to 40
|
|
* - q0 otherwise.
|
|
* - q0 otherwise.
|
|
*/
|
|
*/
|
|
- public static final String IPC_CALLQUEUE_DECAYSCHEDULER_THRESHOLDS_KEY =
|
|
|
|
- "faircallqueue.decay-scheduler.thresholds";
|
|
|
|
|
|
+ public static final String IPC_DECAYSCHEDULER_THRESHOLDS_KEY =
|
|
|
|
+ "decay-scheduler.thresholds";
|
|
|
|
+ @Deprecated
|
|
|
|
+ public static final String IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY =
|
|
|
|
+ "faircallqueue.decay-scheduler.thresholds";
|
|
|
|
|
|
// Specifies the identity to use when the IdentityProvider cannot handle
|
|
// Specifies the identity to use when the IdentityProvider cannot handle
|
|
// a schedulable.
|
|
// a schedulable.
|
|
public static final String DECAYSCHEDULER_UNKNOWN_IDENTITY =
|
|
public static final String DECAYSCHEDULER_UNKNOWN_IDENTITY =
|
|
- "IdentityProvider.Unknown";
|
|
|
|
|
|
+ "IdentityProvider.Unknown";
|
|
|
|
+
|
|
|
|
+ public static final String
|
|
|
|
+ IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_KEY =
|
|
|
|
+ "decay-scheduler.backoff.responsetime.enable";
|
|
|
|
+ public static final Boolean
|
|
|
|
+ IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_DEFAULT = false;
|
|
|
|
+
|
|
|
|
+ // Specifies the average response time (ms) thresholds of each
|
|
|
|
+ // level to trigger backoff
|
|
|
|
+ public static final String
|
|
|
|
+ IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_THRESHOLDS_KEY =
|
|
|
|
+ "decay-scheduler.backoff.responsetime.thresholds";
|
|
|
|
|
|
- public static final Log LOG = LogFactory.getLog(DecayRpcScheduler.class);
|
|
|
|
|
|
+ public static final Logger LOG =
|
|
|
|
+ LoggerFactory.getLogger(DecayRpcScheduler.class);
|
|
|
|
|
|
// Track the number of calls for each schedulable identity
|
|
// Track the number of calls for each schedulable identity
|
|
private final ConcurrentHashMap<Object, AtomicLong> callCounts =
|
|
private final ConcurrentHashMap<Object, AtomicLong> callCounts =
|
|
@@ -91,6 +117,14 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
|
|
// Should be the sum of all AtomicLongs in callCounts
|
|
// Should be the sum of all AtomicLongs in callCounts
|
|
private final AtomicLong totalCalls = new AtomicLong();
|
|
private final AtomicLong totalCalls = new AtomicLong();
|
|
|
|
|
|
|
|
+ // Track total call count and response time in current decay window
|
|
|
|
+ private final AtomicLongArray responseTimeCountInCurrWindow;
|
|
|
|
+ private final AtomicLongArray responseTimeTotalInCurrWindow;
|
|
|
|
+
|
|
|
|
+ // Track average response time in previous decay window
|
|
|
|
+ private final AtomicDoubleArray responseTimeAvgInLastWindow;
|
|
|
|
+ private final AtomicLongArray responseTimeCountInLastWindow;
|
|
|
|
+
|
|
// Pre-computed scheduling decisions during the decay sweep are
|
|
// Pre-computed scheduling decisions during the decay sweep are
|
|
// atomically swapped in as a read-only map
|
|
// atomically swapped in as a read-only map
|
|
private final AtomicReference<Map<Object, Integer>> scheduleCacheRef =
|
|
private final AtomicReference<Map<Object, Integer>> scheduleCacheRef =
|
|
@@ -98,10 +132,12 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
|
|
|
|
|
|
// Tune the behavior of the scheduler
|
|
// Tune the behavior of the scheduler
|
|
private final long decayPeriodMillis; // How long between each tick
|
|
private final long decayPeriodMillis; // How long between each tick
|
|
- private final double decayFactor; // nextCount = currentCount / decayFactor
|
|
|
|
- private final int numQueues; // affects scheduling decisions, from 0 to numQueues - 1
|
|
|
|
|
|
+ private final double decayFactor; // nextCount = currentCount * decayFactor
|
|
|
|
+ private final int numLevels;
|
|
private final double[] thresholds;
|
|
private final double[] thresholds;
|
|
private final IdentityProvider identityProvider;
|
|
private final IdentityProvider identityProvider;
|
|
|
|
+ private final boolean backOffByResponseTimeEnabled;
|
|
|
|
+ private final long[] backOffResponseTimeThresholds;
|
|
|
|
|
|
/**
|
|
/**
|
|
* This TimerTask will call decayCurrentCounts until
|
|
* This TimerTask will call decayCurrentCounts until
|
|
@@ -132,35 +168,46 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
|
|
|
|
|
|
/**
|
|
/**
|
|
* Create a decay scheduler.
|
|
* Create a decay scheduler.
|
|
- * @param numQueues number of queues to schedule for
|
|
|
|
|
|
+ * @param numLevels number of priority levels
|
|
* @param ns config prefix, so that we can configure multiple schedulers
|
|
* @param ns config prefix, so that we can configure multiple schedulers
|
|
* in a single instance.
|
|
* in a single instance.
|
|
* @param conf configuration to use.
|
|
* @param conf configuration to use.
|
|
*/
|
|
*/
|
|
- public DecayRpcScheduler(int numQueues, String ns, Configuration conf) {
|
|
|
|
- if (numQueues < 1) {
|
|
|
|
- throw new IllegalArgumentException("number of queues must be > 0");
|
|
|
|
|
|
+ public DecayRpcScheduler(int numLevels, String ns, Configuration conf) {
|
|
|
|
+ if(numLevels < 1) {
|
|
|
|
+ throw new IllegalArgumentException("Number of Priority Levels must be " +
|
|
|
|
+ "at least 1");
|
|
}
|
|
}
|
|
-
|
|
|
|
- this.numQueues = numQueues;
|
|
|
|
|
|
+ this.numLevels = numLevels;
|
|
this.decayFactor = parseDecayFactor(ns, conf);
|
|
this.decayFactor = parseDecayFactor(ns, conf);
|
|
this.decayPeriodMillis = parseDecayPeriodMillis(ns, conf);
|
|
this.decayPeriodMillis = parseDecayPeriodMillis(ns, conf);
|
|
this.identityProvider = this.parseIdentityProvider(ns, conf);
|
|
this.identityProvider = this.parseIdentityProvider(ns, conf);
|
|
- this.thresholds = parseThresholds(ns, conf, numQueues);
|
|
|
|
|
|
+ this.thresholds = parseThresholds(ns, conf, numLevels);
|
|
|
|
+ this.backOffByResponseTimeEnabled = parseBackOffByResponseTimeEnabled(ns,
|
|
|
|
+ conf);
|
|
|
|
+ this.backOffResponseTimeThresholds =
|
|
|
|
+ parseBackOffResponseTimeThreshold(ns, conf, numLevels);
|
|
|
|
|
|
// Setup delay timer
|
|
// Setup delay timer
|
|
Timer timer = new Timer();
|
|
Timer timer = new Timer();
|
|
DecayTask task = new DecayTask(this, timer);
|
|
DecayTask task = new DecayTask(this, timer);
|
|
timer.scheduleAtFixedRate(task, decayPeriodMillis, decayPeriodMillis);
|
|
timer.scheduleAtFixedRate(task, decayPeriodMillis, decayPeriodMillis);
|
|
|
|
|
|
- MetricsProxy prox = MetricsProxy.getInstance(ns);
|
|
|
|
|
|
+ // Setup response time metrics
|
|
|
|
+ responseTimeTotalInCurrWindow = new AtomicLongArray(numLevels);
|
|
|
|
+ responseTimeCountInCurrWindow = new AtomicLongArray(numLevels);
|
|
|
|
+ responseTimeAvgInLastWindow = new AtomicDoubleArray(numLevels);
|
|
|
|
+ responseTimeCountInLastWindow = new AtomicLongArray(numLevels);
|
|
|
|
+
|
|
|
|
+ MetricsProxy prox = MetricsProxy.getInstance(ns, numLevels);
|
|
prox.setDelegate(this);
|
|
prox.setDelegate(this);
|
|
}
|
|
}
|
|
|
|
|
|
// Load configs
|
|
// Load configs
|
|
- private IdentityProvider parseIdentityProvider(String ns, Configuration conf) {
|
|
|
|
|
|
+ private IdentityProvider parseIdentityProvider(String ns,
|
|
|
|
+ Configuration conf) {
|
|
List<IdentityProvider> providers = conf.getInstances(
|
|
List<IdentityProvider> providers = conf.getInstances(
|
|
- ns + "." + CommonConfigurationKeys.IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY,
|
|
|
|
|
|
+ ns + "." + CommonConfigurationKeys.IPC_IDENTITY_PROVIDER_KEY,
|
|
IdentityProvider.class);
|
|
IdentityProvider.class);
|
|
|
|
|
|
if (providers.size() < 1) {
|
|
if (providers.size() < 1) {
|
|
@@ -174,10 +221,16 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
|
|
|
|
|
|
private static double parseDecayFactor(String ns, Configuration conf) {
|
|
private static double parseDecayFactor(String ns, Configuration conf) {
|
|
double factor = conf.getDouble(ns + "." +
|
|
double factor = conf.getDouble(ns + "." +
|
|
- IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_KEY,
|
|
|
|
- IPC_CALLQUEUE_DECAYSCHEDULER_FACTOR_DEFAULT
|
|
|
|
- );
|
|
|
|
-
|
|
|
|
|
|
+ IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY, 0.0);
|
|
|
|
+ if (factor == 0.0) {
|
|
|
|
+ factor = conf.getDouble(ns + "." +
|
|
|
|
+ IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_KEY,
|
|
|
|
+ IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_DEFAULT);
|
|
|
|
+ } else if ((factor > 0.0) && (factor < 1)) {
|
|
|
|
+ LOG.warn(IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY +
|
|
|
|
+ " is deprecated. Please use " +
|
|
|
|
+ IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_KEY + ".");
|
|
|
|
+ }
|
|
if (factor <= 0 || factor >= 1) {
|
|
if (factor <= 0 || factor >= 1) {
|
|
throw new IllegalArgumentException("Decay Factor " +
|
|
throw new IllegalArgumentException("Decay Factor " +
|
|
"must be between 0 and 1");
|
|
"must be between 0 and 1");
|
|
@@ -188,10 +241,17 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
|
|
|
|
|
|
private static long parseDecayPeriodMillis(String ns, Configuration conf) {
|
|
private static long parseDecayPeriodMillis(String ns, Configuration conf) {
|
|
long period = conf.getLong(ns + "." +
|
|
long period = conf.getLong(ns + "." +
|
|
- IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_KEY,
|
|
|
|
- IPC_CALLQUEUE_DECAYSCHEDULER_PERIOD_DEFAULT
|
|
|
|
- );
|
|
|
|
-
|
|
|
|
|
|
+ IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY,
|
|
|
|
+ 0);
|
|
|
|
+ if (period == 0) {
|
|
|
|
+ period = conf.getLong(ns + "." +
|
|
|
|
+ IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY,
|
|
|
|
+ IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_DEFAULT);
|
|
|
|
+ } else if (period > 0) {
|
|
|
|
+ LOG.warn((IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY +
|
|
|
|
+ " is deprecated. Please use " +
|
|
|
|
+ IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY));
|
|
|
|
+ }
|
|
if (period <= 0) {
|
|
if (period <= 0) {
|
|
throw new IllegalArgumentException("Period millis must be >= 0");
|
|
throw new IllegalArgumentException("Period millis must be >= 0");
|
|
}
|
|
}
|
|
@@ -200,15 +260,24 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
|
|
}
|
|
}
|
|
|
|
|
|
private static double[] parseThresholds(String ns, Configuration conf,
|
|
private static double[] parseThresholds(String ns, Configuration conf,
|
|
- int numQueues) {
|
|
|
|
|
|
+ int numLevels) {
|
|
int[] percentages = conf.getInts(ns + "." +
|
|
int[] percentages = conf.getInts(ns + "." +
|
|
- IPC_CALLQUEUE_DECAYSCHEDULER_THRESHOLDS_KEY);
|
|
|
|
|
|
+ IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY);
|
|
|
|
|
|
if (percentages.length == 0) {
|
|
if (percentages.length == 0) {
|
|
- return getDefaultThresholds(numQueues);
|
|
|
|
- } else if (percentages.length != numQueues-1) {
|
|
|
|
|
|
+ percentages = conf.getInts(ns + "." + IPC_DECAYSCHEDULER_THRESHOLDS_KEY);
|
|
|
|
+ if (percentages.length == 0) {
|
|
|
|
+ return getDefaultThresholds(numLevels);
|
|
|
|
+ }
|
|
|
|
+ } else {
|
|
|
|
+ LOG.warn(IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY +
|
|
|
|
+ " is deprecated. Please use " +
|
|
|
|
+ IPC_DECAYSCHEDULER_THRESHOLDS_KEY);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if (percentages.length != numLevels-1) {
|
|
throw new IllegalArgumentException("Number of thresholds should be " +
|
|
throw new IllegalArgumentException("Number of thresholds should be " +
|
|
- (numQueues-1) + ". Was: " + percentages.length);
|
|
|
|
|
|
+ (numLevels-1) + ". Was: " + percentages.length);
|
|
}
|
|
}
|
|
|
|
|
|
// Convert integer percentages to decimals
|
|
// Convert integer percentages to decimals
|
|
@@ -223,14 +292,14 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
|
|
/**
|
|
/**
|
|
* Generate default thresholds if user did not specify. Strategy is
|
|
* Generate default thresholds if user did not specify. Strategy is
|
|
* to halve each time, since queue usage tends to be exponential.
|
|
* to halve each time, since queue usage tends to be exponential.
|
|
- * So if numQueues is 4, we would generate: double[]{0.125, 0.25, 0.5}
|
|
|
|
|
|
+ * So if numLevels is 4, we would generate: double[]{0.125, 0.25, 0.5}
|
|
* which specifies the boundaries between each queue's usage.
|
|
* which specifies the boundaries between each queue's usage.
|
|
- * @param numQueues number of queues to compute for
|
|
|
|
- * @return array of boundaries of length numQueues - 1
|
|
|
|
|
|
+ * @param numLevels number of levels to compute for
|
|
|
|
+ * @return array of boundaries of length numLevels - 1
|
|
*/
|
|
*/
|
|
- private static double[] getDefaultThresholds(int numQueues) {
|
|
|
|
- double[] ret = new double[numQueues - 1];
|
|
|
|
- double div = Math.pow(2, numQueues - 1);
|
|
|
|
|
|
+ private static double[] getDefaultThresholds(int numLevels) {
|
|
|
|
+ double[] ret = new double[numLevels - 1];
|
|
|
|
+ double div = Math.pow(2, numLevels - 1);
|
|
|
|
|
|
for (int i = 0; i < ret.length; i++) {
|
|
for (int i = 0; i < ret.length; i++) {
|
|
ret[i] = Math.pow(2, i)/div;
|
|
ret[i] = Math.pow(2, i)/div;
|
|
@@ -238,39 +307,89 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
|
|
return ret;
|
|
return ret;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private static long[] parseBackOffResponseTimeThreshold(String ns,
|
|
|
|
+ Configuration conf, int numLevels) {
|
|
|
|
+ long[] responseTimeThresholds = conf.getTimeDurations(ns + "." +
|
|
|
|
+ IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_THRESHOLDS_KEY,
|
|
|
|
+ TimeUnit.MILLISECONDS);
|
|
|
|
+ // backoff thresholds not specified
|
|
|
|
+ if (responseTimeThresholds.length == 0) {
|
|
|
|
+ return getDefaultBackOffResponseTimeThresholds(numLevels);
|
|
|
|
+ }
|
|
|
|
+ // backoff thresholds specified but not match with the levels
|
|
|
|
+ if (responseTimeThresholds.length != numLevels) {
|
|
|
|
+ throw new IllegalArgumentException(
|
|
|
|
+ "responseTimeThresholds must match with the number of priority " +
|
|
|
|
+ "levels");
|
|
|
|
+ }
|
|
|
|
+ // invalid thresholds
|
|
|
|
+ for (long responseTimeThreshold: responseTimeThresholds) {
|
|
|
|
+ if (responseTimeThreshold <= 0) {
|
|
|
|
+ throw new IllegalArgumentException(
|
|
|
|
+ "responseTimeThreshold millis must be >= 0");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return responseTimeThresholds;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // 10s for level 0, 20s for level 1, 30s for level 2, ...
|
|
|
|
+ private static long[] getDefaultBackOffResponseTimeThresholds(int numLevels) {
|
|
|
|
+ long[] ret = new long[numLevels];
|
|
|
|
+ for (int i = 0; i < ret.length; i++) {
|
|
|
|
+ ret[i] = 10000*(i+1);
|
|
|
|
+ }
|
|
|
|
+ return ret;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private static Boolean parseBackOffByResponseTimeEnabled(String ns,
|
|
|
|
+ Configuration conf) {
|
|
|
|
+ return conf.getBoolean(ns + "." +
|
|
|
|
+ IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_KEY,
|
|
|
|
+ IPC_DECAYSCHEDULER_BACKOFF_RESPONSETIME_ENABLE_DEFAULT);
|
|
|
|
+ }
|
|
|
|
+
|
|
/**
|
|
/**
|
|
* Decay the stored counts for each user and clean as necessary.
|
|
* Decay the stored counts for each user and clean as necessary.
|
|
* This method should be called periodically in order to keep
|
|
* This method should be called periodically in order to keep
|
|
* counts current.
|
|
* counts current.
|
|
*/
|
|
*/
|
|
private void decayCurrentCounts() {
|
|
private void decayCurrentCounts() {
|
|
- long total = 0;
|
|
|
|
- Iterator<Map.Entry<Object, AtomicLong>> it =
|
|
|
|
- callCounts.entrySet().iterator();
|
|
|
|
-
|
|
|
|
- while (it.hasNext()) {
|
|
|
|
- Map.Entry<Object, AtomicLong> entry = it.next();
|
|
|
|
- AtomicLong count = entry.getValue();
|
|
|
|
-
|
|
|
|
- // Compute the next value by reducing it by the decayFactor
|
|
|
|
- long currentValue = count.get();
|
|
|
|
- long nextValue = (long)(currentValue * decayFactor);
|
|
|
|
- total += nextValue;
|
|
|
|
- count.set(nextValue);
|
|
|
|
-
|
|
|
|
- if (nextValue == 0) {
|
|
|
|
- // We will clean up unused keys here. An interesting optimization might
|
|
|
|
- // be to have an upper bound on keyspace in callCounts and only
|
|
|
|
- // clean once we pass it.
|
|
|
|
- it.remove();
|
|
|
|
|
|
+ try {
|
|
|
|
+ long total = 0;
|
|
|
|
+ Iterator<Map.Entry<Object, AtomicLong>> it =
|
|
|
|
+ callCounts.entrySet().iterator();
|
|
|
|
+
|
|
|
|
+ while (it.hasNext()) {
|
|
|
|
+ Map.Entry<Object, AtomicLong> entry = it.next();
|
|
|
|
+ AtomicLong count = entry.getValue();
|
|
|
|
+
|
|
|
|
+ // Compute the next value by reducing it by the decayFactor
|
|
|
|
+ long currentValue = count.get();
|
|
|
|
+ long nextValue = (long) (currentValue * decayFactor);
|
|
|
|
+ total += nextValue;
|
|
|
|
+ count.set(nextValue);
|
|
|
|
+
|
|
|
|
+ if (nextValue == 0) {
|
|
|
|
+ // We will clean up unused keys here. An interesting optimization
|
|
|
|
+ // might be to have an upper bound on keyspace in callCounts and only
|
|
|
|
+ // clean once we pass it.
|
|
|
|
+ it.remove();
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- }
|
|
|
|
|
|
|
|
- // Update the total so that we remain in sync
|
|
|
|
- totalCalls.set(total);
|
|
|
|
|
|
+ // Update the total so that we remain in sync
|
|
|
|
+ totalCalls.set(total);
|
|
|
|
+
|
|
|
|
+ // Now refresh the cache of scheduling decisions
|
|
|
|
+ recomputeScheduleCache();
|
|
|
|
|
|
- // Now refresh the cache of scheduling decisions
|
|
|
|
- recomputeScheduleCache();
|
|
|
|
|
|
+ // Update average response time with decay
|
|
|
|
+ updateAverageResponseTime(true);
|
|
|
|
+ } catch (Exception ex) {
|
|
|
|
+ LOG.error("decayCurrentCounts exception: " +
|
|
|
|
+ ExceptionUtils.getFullStackTrace(ex));
|
|
|
|
+ throw ex;
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -324,7 +443,7 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
|
|
/**
|
|
/**
|
|
* Given the number of occurrences, compute a scheduling decision.
|
|
* Given the number of occurrences, compute a scheduling decision.
|
|
* @param occurrences how many occurrences
|
|
* @param occurrences how many occurrences
|
|
- * @return scheduling decision from 0 to numQueues - 1
|
|
|
|
|
|
+ * @return scheduling decision from 0 to numLevels - 1
|
|
*/
|
|
*/
|
|
private int computePriorityLevel(long occurrences) {
|
|
private int computePriorityLevel(long occurrences) {
|
|
long totalCallSnapshot = totalCalls.get();
|
|
long totalCallSnapshot = totalCalls.get();
|
|
@@ -334,14 +453,14 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
|
|
proportion = (double) occurrences / totalCallSnapshot;
|
|
proportion = (double) occurrences / totalCallSnapshot;
|
|
}
|
|
}
|
|
|
|
|
|
- // Start with low priority queues, since they will be most common
|
|
|
|
- for(int i = (numQueues - 1); i > 0; i--) {
|
|
|
|
|
|
+ // Start with low priority levels, since they will be most common
|
|
|
|
+ for(int i = (numLevels - 1); i > 0; i--) {
|
|
if (proportion >= this.thresholds[i - 1]) {
|
|
if (proportion >= this.thresholds[i - 1]) {
|
|
- return i; // We've found our queue number
|
|
|
|
|
|
+ return i; // We've found our level number
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- // If we get this far, we're at queue 0
|
|
|
|
|
|
+ // If we get this far, we're at level 0
|
|
return 0;
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -349,7 +468,7 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
|
|
* Returns the priority level for a given identity by first trying the cache,
|
|
* Returns the priority level for a given identity by first trying the cache,
|
|
* then computing it.
|
|
* then computing it.
|
|
* @param identity an object responding to toString and hashCode
|
|
* @param identity an object responding to toString and hashCode
|
|
- * @return integer scheduling decision from 0 to numQueues - 1
|
|
|
|
|
|
+ * @return integer scheduling decision from 0 to numLevels - 1
|
|
*/
|
|
*/
|
|
private int cachedOrComputedPriorityLevel(Object identity) {
|
|
private int cachedOrComputedPriorityLevel(Object identity) {
|
|
try {
|
|
try {
|
|
@@ -360,22 +479,29 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
|
|
if (scheduleCache != null) {
|
|
if (scheduleCache != null) {
|
|
Integer priority = scheduleCache.get(identity);
|
|
Integer priority = scheduleCache.get(identity);
|
|
if (priority != null) {
|
|
if (priority != null) {
|
|
|
|
+ LOG.debug("Cache priority for: {} with priority: {}", identity,
|
|
|
|
+ priority);
|
|
return priority;
|
|
return priority;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
// Cache was no good, compute it
|
|
// Cache was no good, compute it
|
|
- return computePriorityLevel(occurrences);
|
|
|
|
|
|
+ int priority = computePriorityLevel(occurrences);
|
|
|
|
+ LOG.debug("compute priority for " + identity + " priority " + priority);
|
|
|
|
+ return priority;
|
|
|
|
+
|
|
} catch (InterruptedException ie) {
|
|
} catch (InterruptedException ie) {
|
|
- LOG.warn("Caught InterruptedException, returning low priority queue");
|
|
|
|
- return numQueues - 1;
|
|
|
|
|
|
+ LOG.warn("Caught InterruptedException, returning low priority level");
|
|
|
|
+ LOG.debug("Fallback priority for: {} with priority: {}", identity,
|
|
|
|
+ numLevels - 1);
|
|
|
|
+ return numLevels - 1;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
* Compute the appropriate priority for a schedulable based on past requests.
|
|
* Compute the appropriate priority for a schedulable based on past requests.
|
|
* @param obj the schedulable obj to query and remember
|
|
* @param obj the schedulable obj to query and remember
|
|
- * @return the queue index which we recommend scheduling in
|
|
|
|
|
|
+ * @return the level index which we recommend scheduling in
|
|
*/
|
|
*/
|
|
@Override
|
|
@Override
|
|
public int getPriorityLevel(Schedulable obj) {
|
|
public int getPriorityLevel(Schedulable obj) {
|
|
@@ -389,6 +515,73 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
|
|
return cachedOrComputedPriorityLevel(identity);
|
|
return cachedOrComputedPriorityLevel(identity);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ @Override
|
|
|
|
+ public boolean shouldBackOff(Schedulable obj) {
|
|
|
|
+ Boolean backOff = false;
|
|
|
|
+ if (backOffByResponseTimeEnabled) {
|
|
|
|
+ int priorityLevel = obj.getPriorityLevel();
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ double[] responseTimes = getAverageResponseTime();
|
|
|
|
+ LOG.debug("Current Caller: {} Priority: {} ",
|
|
|
|
+ obj.getUserGroupInformation().getUserName(),
|
|
|
|
+ obj.getPriorityLevel());
|
|
|
|
+ for (int i = 0; i < numLevels; i++) {
|
|
|
|
+ LOG.debug("Queue: {} responseTime: {} backoffThreshold: {}", i,
|
|
|
|
+ responseTimes[i], backOffResponseTimeThresholds[i]);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ // High priority rpc over threshold triggers back off of low priority rpc
|
|
|
|
+ for (int i = 0; i < priorityLevel + 1; i++) {
|
|
|
|
+ if (responseTimeAvgInLastWindow.get(i) >
|
|
|
|
+ backOffResponseTimeThresholds[i]) {
|
|
|
|
+ backOff = true;
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ return backOff;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void addResponseTime(String name, int priorityLevel, int queueTime,
|
|
|
|
+ int processingTime) {
|
|
|
|
+ responseTimeCountInCurrWindow.getAndIncrement(priorityLevel);
|
|
|
|
+ responseTimeTotalInCurrWindow.getAndAdd(priorityLevel,
|
|
|
|
+ queueTime+processingTime);
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debug("addResponseTime for call: {} priority: {} queueTime: {} " +
|
|
|
|
+ "processingTime: {} ", name, priorityLevel, queueTime,
|
|
|
|
+ processingTime);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ // Update the cached average response time at the end of decay window
|
|
|
|
+ void updateAverageResponseTime(boolean enableDecay) {
|
|
|
|
+ for (int i = 0; i < numLevels; i++) {
|
|
|
|
+ double averageResponseTime = 0;
|
|
|
|
+ long totalResponseTime = responseTimeTotalInCurrWindow.get(i);
|
|
|
|
+ long responseTimeCount = responseTimeCountInCurrWindow.get(i);
|
|
|
|
+ if (responseTimeCount > 0) {
|
|
|
|
+ averageResponseTime = (double) totalResponseTime / responseTimeCount;
|
|
|
|
+ }
|
|
|
|
+ final double lastAvg = responseTimeAvgInLastWindow.get(i);
|
|
|
|
+ if (enableDecay && lastAvg > 0.0) {
|
|
|
|
+ final double decayed = decayFactor * lastAvg + averageResponseTime;
|
|
|
|
+ responseTimeAvgInLastWindow.set(i, decayed);
|
|
|
|
+ } else {
|
|
|
|
+ responseTimeAvgInLastWindow.set(i, averageResponseTime);
|
|
|
|
+ }
|
|
|
|
+ responseTimeCountInLastWindow.set(i, responseTimeCount);
|
|
|
|
+ if (LOG.isDebugEnabled()) {
|
|
|
|
+ LOG.debug("updateAverageResponseTime queue: {} Average: {} Count: {}",
|
|
|
|
+ i, averageResponseTime, responseTimeCount);
|
|
|
|
+ }
|
|
|
|
+ // Reset for next decay window
|
|
|
|
+ responseTimeTotalInCurrWindow.set(i, 0);
|
|
|
|
+ responseTimeCountInCurrWindow.set(i, 0);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
// For testing
|
|
// For testing
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
public double getDecayFactor() { return decayFactor; }
|
|
public double getDecayFactor() { return decayFactor; }
|
|
@@ -429,16 +622,21 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
|
|
|
|
|
|
// Weakref for delegate, so we don't retain it forever if it can be GC'd
|
|
// Weakref for delegate, so we don't retain it forever if it can be GC'd
|
|
private WeakReference<DecayRpcScheduler> delegate;
|
|
private WeakReference<DecayRpcScheduler> delegate;
|
|
|
|
+ private double[] averageResponseTimeDefault;
|
|
|
|
+ private long[] callCountInLastWindowDefault;
|
|
|
|
|
|
- private MetricsProxy(String namespace) {
|
|
|
|
|
|
/**
 * Create the proxy and register it as the "DecayRpcScheduler" MBean of the
 * given namespace.
 * @param namespace name passed to MBeans.register
 * @param numLevels length of the zero-filled arrays reported while no
 *                  delegate scheduler is available
 */
private MetricsProxy(String namespace, int numLevels) {
  this.averageResponseTimeDefault = new double[numLevels];
  this.callCountInLastWindowDefault = new long[numLevels];
  MBeans.register(namespace, "DecayRpcScheduler", this);
}
|
|
|
|
|
|
- public static synchronized MetricsProxy getInstance(String namespace) {
|
|
|
|
|
|
+ public static synchronized MetricsProxy getInstance(String namespace,
|
|
|
|
+ int numLevels) {
|
|
MetricsProxy mp = INSTANCES.get(namespace);
|
|
MetricsProxy mp = INSTANCES.get(namespace);
|
|
if (mp == null) {
|
|
if (mp == null) {
|
|
// We must create one
|
|
// We must create one
|
|
- mp = new MetricsProxy(namespace);
|
|
|
|
|
|
+ mp = new MetricsProxy(namespace, numLevels);
|
|
INSTANCES.put(namespace, mp);
|
|
INSTANCES.put(namespace, mp);
|
|
}
|
|
}
|
|
return mp;
|
|
return mp;
|
|
@@ -487,6 +685,25 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
|
|
return scheduler.getTotalCallVolume();
|
|
return scheduler.getTotalCallVolume();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public double[] getAverageResponseTime() {
|
|
|
|
+ DecayRpcScheduler scheduler = delegate.get();
|
|
|
|
+ if (scheduler == null) {
|
|
|
|
+ return averageResponseTimeDefault;
|
|
|
|
+ } else {
|
|
|
|
+ return scheduler.getAverageResponseTime();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public long[] getResponseTimeCountInLastWindow() {
|
|
|
|
+ DecayRpcScheduler scheduler = delegate.get();
|
|
|
|
+ if (scheduler == null) {
|
|
|
|
+ return callCountInLastWindowDefault;
|
|
|
|
+ } else {
|
|
|
|
+ return scheduler.getResponseTimeCountInLastWindow();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
public int getUniqueIdentityCount() {
|
|
public int getUniqueIdentityCount() {
|
|
@@ -497,6 +714,23 @@ public class DecayRpcScheduler implements RpcScheduler, DecayRpcSchedulerMXBean
|
|
return totalCalls.get();
|
|
return totalCalls.get();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ public long[] getResponseTimeCountInLastWindow() {
|
|
|
|
+ long[] ret = new long[responseTimeCountInLastWindow.length()];
|
|
|
|
+ for (int i = 0; i < responseTimeCountInLastWindow.length(); i++) {
|
|
|
|
+ ret[i] = responseTimeCountInLastWindow.get(i);
|
|
|
|
+ }
|
|
|
|
+ return ret;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public double[] getAverageResponseTime() {
|
|
|
|
+ double[] ret = new double[responseTimeAvgInLastWindow.length()];
|
|
|
|
+ for (int i = 0; i < responseTimeAvgInLastWindow.length(); i++) {
|
|
|
|
+ ret[i] = responseTimeAvgInLastWindow.get(i);
|
|
|
|
+ }
|
|
|
|
+ return ret;
|
|
|
|
+ }
|
|
|
|
+
|
|
public String getSchedulingDecisionSummary() {
|
|
public String getSchedulingDecisionSummary() {
|
|
Map<Object, Integer> decisions = scheduleCacheRef.get();
|
|
Map<Object, Integer> decisions = scheduleCacheRef.get();
|
|
if (decisions == null) {
|
|
if (decisions == null) {
|