|
@@ -58,8 +58,8 @@ import org.slf4j.LoggerFactory;
|
|
import static org.apache.hadoop.ipc.ProcessingDetails.Timing;
|
|
import static org.apache.hadoop.ipc.ProcessingDetails.Timing;
|
|
|
|
|
|
/**
|
|
/**
|
|
- * The decay RPC scheduler counts incoming requests in a map, then
|
|
|
|
- * decays the counts at a fixed time interval. The scheduler is optimized
|
|
|
|
|
|
+ * The decay RPC scheduler tracks the cost of incoming requests in a map, then
|
|
|
|
+ * decays the costs at a fixed time interval. The scheduler is optimized
|
|
* for large periods (on the order of seconds), as it offloads work to the
|
|
* for large periods (on the order of seconds), as it offloads work to the
|
|
* decay sweep.
|
|
* decay sweep.
|
|
*/
|
|
*/
|
|
@@ -134,15 +134,15 @@ public class DecayRpcScheduler implements RpcScheduler,
|
|
private static final ObjectWriter WRITER = new ObjectMapper().writer();
|
|
private static final ObjectWriter WRITER = new ObjectMapper().writer();
|
|
|
|
|
|
// Track the decayed and raw (no decay) number of calls for each schedulable
|
|
// Track the decayed and raw (no decay) number of calls for each schedulable
|
|
- // identity from all previous decay windows: idx 0 for decayed call count and
|
|
|
|
- // idx 1 for the raw call count
|
|
|
|
- private final ConcurrentHashMap<Object, List<AtomicLong>> callCounts =
|
|
|
|
|
|
+ // identity from all previous decay windows: idx 0 for decayed call cost and
|
|
|
|
+ // idx 1 for the raw call cost
|
|
|
|
+ private final ConcurrentHashMap<Object, List<AtomicLong>> callCosts =
|
|
new ConcurrentHashMap<Object, List<AtomicLong>>();
|
|
new ConcurrentHashMap<Object, List<AtomicLong>>();
|
|
|
|
|
|
- // Should be the sum of all AtomicLongs in decayed callCounts
|
|
|
|
- private final AtomicLong totalDecayedCallCount = new AtomicLong();
|
|
|
|
- // The sum of all AtomicLongs in raw callCounts
|
|
|
|
- private final AtomicLong totalRawCallCount = new AtomicLong();
|
|
|
|
|
|
+ // Should be the sum of all AtomicLongs in decayed callCosts
|
|
|
|
+ private final AtomicLong totalDecayedCallCost = new AtomicLong();
|
|
|
|
+ // The sum of all AtomicLongs in raw callCosts
|
|
|
|
+ private final AtomicLong totalRawCallCost = new AtomicLong();
|
|
|
|
|
|
|
|
|
|
// Track total call count and response time in current decay window
|
|
// Track total call count and response time in current decay window
|
|
@@ -160,7 +160,7 @@ public class DecayRpcScheduler implements RpcScheduler,
|
|
|
|
|
|
// Tune the behavior of the scheduler
|
|
// Tune the behavior of the scheduler
|
|
private final long decayPeriodMillis; // How long between each tick
|
|
private final long decayPeriodMillis; // How long between each tick
|
|
- private final double decayFactor; // nextCount = currentCount * decayFactor
|
|
|
|
|
|
+ private final double decayFactor; // nextCost = currentCost * decayFactor
|
|
private final int numLevels;
|
|
private final int numLevels;
|
|
private final double[] thresholds;
|
|
private final double[] thresholds;
|
|
private final IdentityProvider identityProvider;
|
|
private final IdentityProvider identityProvider;
|
|
@@ -170,9 +170,10 @@ public class DecayRpcScheduler implements RpcScheduler,
|
|
private final int topUsersCount; // e.g., report top 10 users' metrics
|
|
private final int topUsersCount; // e.g., report top 10 users' metrics
|
|
private static final double PRECISION = 0.0001;
|
|
private static final double PRECISION = 0.0001;
|
|
private MetricsProxy metricsProxy;
|
|
private MetricsProxy metricsProxy;
|
|
|
|
+ private final CostProvider costProvider;
|
|
|
|
|
|
/**
|
|
/**
|
|
- * This TimerTask will call decayCurrentCounts until
|
|
|
|
|
|
+ * This TimerTask will call decayCurrentCosts until
|
|
* the scheduler has been garbage collected.
|
|
* the scheduler has been garbage collected.
|
|
*/
|
|
*/
|
|
public static class DecayTask extends TimerTask {
|
|
public static class DecayTask extends TimerTask {
|
|
@@ -188,7 +189,7 @@ public class DecayRpcScheduler implements RpcScheduler,
|
|
public void run() {
|
|
public void run() {
|
|
DecayRpcScheduler sched = schedulerRef.get();
|
|
DecayRpcScheduler sched = schedulerRef.get();
|
|
if (sched != null) {
|
|
if (sched != null) {
|
|
- sched.decayCurrentCounts();
|
|
|
|
|
|
+ sched.decayCurrentCosts();
|
|
} else {
|
|
} else {
|
|
// Our scheduler was garbage collected since it is no longer in use,
|
|
// Our scheduler was garbage collected since it is no longer in use,
|
|
// so we should terminate the timer as well
|
|
// so we should terminate the timer as well
|
|
@@ -215,6 +216,7 @@ public class DecayRpcScheduler implements RpcScheduler,
|
|
this.decayFactor = parseDecayFactor(ns, conf);
|
|
this.decayFactor = parseDecayFactor(ns, conf);
|
|
this.decayPeriodMillis = parseDecayPeriodMillis(ns, conf);
|
|
this.decayPeriodMillis = parseDecayPeriodMillis(ns, conf);
|
|
this.identityProvider = this.parseIdentityProvider(ns, conf);
|
|
this.identityProvider = this.parseIdentityProvider(ns, conf);
|
|
|
|
+ this.costProvider = this.parseCostProvider(ns, conf);
|
|
this.thresholds = parseThresholds(ns, conf, numLevels);
|
|
this.thresholds = parseThresholds(ns, conf, numLevels);
|
|
this.backOffByResponseTimeEnabled = parseBackOffByResponseTimeEnabled(ns,
|
|
this.backOffByResponseTimeEnabled = parseBackOffByResponseTimeEnabled(ns,
|
|
conf);
|
|
conf);
|
|
@@ -242,6 +244,24 @@ public class DecayRpcScheduler implements RpcScheduler,
|
|
recomputeScheduleCache();
|
|
recomputeScheduleCache();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private CostProvider parseCostProvider(String ns, Configuration conf) {
|
|
|
|
+ List<CostProvider> providers = conf.getInstances(
|
|
|
|
+ ns + "." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY,
|
|
|
|
+ CostProvider.class);
|
|
|
|
+
|
|
|
|
+ if (providers.size() < 1) {
|
|
|
|
+ LOG.info("CostProvider not specified, defaulting to DefaultCostProvider");
|
|
|
|
+ return new DefaultCostProvider();
|
|
|
|
+ } else if (providers.size() > 1) {
|
|
|
|
+ LOG.warn("Found multiple CostProviders; using: {}",
|
|
|
|
+ providers.get(0).getClass());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ CostProvider provider = providers.get(0); // use the first
|
|
|
|
+ provider.init(ns, conf);
|
|
|
|
+ return provider;
|
|
|
|
+ }
|
|
|
|
+
|
|
// Load configs
|
|
// Load configs
|
|
private IdentityProvider parseIdentityProvider(String ns,
|
|
private IdentityProvider parseIdentityProvider(String ns,
|
|
Configuration conf) {
|
|
Configuration conf) {
|
|
@@ -388,41 +408,41 @@ public class DecayRpcScheduler implements RpcScheduler,
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Decay the stored counts for each user and clean as necessary.
|
|
|
|
|
|
+ * Decay the stored costs for each user and clean as necessary.
|
|
* This method should be called periodically in order to keep
|
|
* This method should be called periodically in order to keep
|
|
- * counts current.
|
|
|
|
|
|
+ * costs current.
|
|
*/
|
|
*/
|
|
- private void decayCurrentCounts() {
|
|
|
|
|
|
+ private void decayCurrentCosts() {
|
|
try {
|
|
try {
|
|
- long totalDecayedCount = 0;
|
|
|
|
- long totalRawCount = 0;
|
|
|
|
|
|
+ long totalDecayedCost = 0;
|
|
|
|
+ long totalRawCost = 0;
|
|
Iterator<Map.Entry<Object, List<AtomicLong>>> it =
|
|
Iterator<Map.Entry<Object, List<AtomicLong>>> it =
|
|
- callCounts.entrySet().iterator();
|
|
|
|
|
|
+ callCosts.entrySet().iterator();
|
|
|
|
|
|
while (it.hasNext()) {
|
|
while (it.hasNext()) {
|
|
Map.Entry<Object, List<AtomicLong>> entry = it.next();
|
|
Map.Entry<Object, List<AtomicLong>> entry = it.next();
|
|
- AtomicLong decayedCount = entry.getValue().get(0);
|
|
|
|
- AtomicLong rawCount = entry.getValue().get(1);
|
|
|
|
|
|
+ AtomicLong decayedCost = entry.getValue().get(0);
|
|
|
|
+ AtomicLong rawCost = entry.getValue().get(1);
|
|
|
|
|
|
|
|
|
|
// Compute the next value by reducing it by the decayFactor
|
|
// Compute the next value by reducing it by the decayFactor
|
|
- totalRawCount += rawCount.get();
|
|
|
|
- long currentValue = decayedCount.get();
|
|
|
|
|
|
+ totalRawCost += rawCost.get();
|
|
|
|
+ long currentValue = decayedCost.get();
|
|
long nextValue = (long) (currentValue * decayFactor);
|
|
long nextValue = (long) (currentValue * decayFactor);
|
|
- totalDecayedCount += nextValue;
|
|
|
|
- decayedCount.set(nextValue);
|
|
|
|
|
|
+ totalDecayedCost += nextValue;
|
|
|
|
+ decayedCost.set(nextValue);
|
|
|
|
|
|
if (nextValue == 0) {
|
|
if (nextValue == 0) {
|
|
// We will clean up unused keys here. An interesting optimization
|
|
// We will clean up unused keys here. An interesting optimization
|
|
- // might be to have an upper bound on keyspace in callCounts and only
|
|
|
|
|
|
+ // might be to have an upper bound on keyspace in callCosts and only
|
|
// clean once we pass it.
|
|
// clean once we pass it.
|
|
it.remove();
|
|
it.remove();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
// Update the total so that we remain in sync
|
|
// Update the total so that we remain in sync
|
|
- totalDecayedCallCount.set(totalDecayedCount);
|
|
|
|
- totalRawCallCount.set(totalRawCount);
|
|
|
|
|
|
+ totalDecayedCallCost.set(totalDecayedCost);
|
|
|
|
+ totalRawCallCost.set(totalRawCost);
|
|
|
|
|
|
// Now refresh the cache of scheduling decisions
|
|
// Now refresh the cache of scheduling decisions
|
|
recomputeScheduleCache();
|
|
recomputeScheduleCache();
|
|
@@ -430,19 +450,19 @@ public class DecayRpcScheduler implements RpcScheduler,
|
|
// Update average response time with decay
|
|
// Update average response time with decay
|
|
updateAverageResponseTime(true);
|
|
updateAverageResponseTime(true);
|
|
} catch (Exception ex) {
|
|
} catch (Exception ex) {
|
|
- LOG.error("decayCurrentCounts exception: " +
|
|
|
|
- ExceptionUtils.getFullStackTrace(ex));
|
|
|
|
|
|
+ LOG.error("decayCurrentCosts exception: " +
|
|
|
|
+ ExceptionUtils.getStackTrace(ex));
|
|
throw ex;
|
|
throw ex;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Update the scheduleCache to match current conditions in callCounts.
|
|
|
|
|
|
+ * Update the scheduleCache to match current conditions in callCosts.
|
|
*/
|
|
*/
|
|
private void recomputeScheduleCache() {
|
|
private void recomputeScheduleCache() {
|
|
Map<Object, Integer> nextCache = new HashMap<Object, Integer>();
|
|
Map<Object, Integer> nextCache = new HashMap<Object, Integer>();
|
|
|
|
|
|
- for (Map.Entry<Object, List<AtomicLong>> entry : callCounts.entrySet()) {
|
|
|
|
|
|
+ for (Map.Entry<Object, List<AtomicLong>> entry : callCosts.entrySet()) {
|
|
Object id = entry.getKey();
|
|
Object id = entry.getKey();
|
|
AtomicLong value = entry.getValue().get(0);
|
|
AtomicLong value = entry.getValue().get(0);
|
|
|
|
|
|
@@ -457,51 +477,52 @@ public class DecayRpcScheduler implements RpcScheduler,
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Get the number of occurrences and increment atomically.
|
|
|
|
- * @param identity the identity of the user to increment
|
|
|
|
- * @return the value before incrementation
|
|
|
|
|
|
+ * Adjust the stored cost for a given identity.
|
|
|
|
+ *
|
|
|
|
+ * @param identity the identity of the user whose cost should be adjusted
|
|
|
|
+ * @param costDelta the cost to add for the given identity
|
|
*/
|
|
*/
|
|
- private long getAndIncrementCallCounts(Object identity)
|
|
|
|
- throws InterruptedException {
|
|
|
|
- // We will increment the count, or create it if no such count exists
|
|
|
|
- List<AtomicLong> count = this.callCounts.get(identity);
|
|
|
|
- if (count == null) {
|
|
|
|
- // Create the counts since no such count exists.
|
|
|
|
- // idx 0 for decayed call count
|
|
|
|
- // idx 1 for the raw call count
|
|
|
|
- count = new ArrayList<AtomicLong>(2);
|
|
|
|
- count.add(new AtomicLong(0));
|
|
|
|
- count.add(new AtomicLong(0));
|
|
|
|
|
|
+ private void addCost(Object identity, long costDelta) {
|
|
|
|
+ // We will increment the cost, or create it if no such cost exists
|
|
|
|
+ List<AtomicLong> cost = this.callCosts.get(identity);
|
|
|
|
+ if (cost == null) {
|
|
|
|
+ // Create the costs since no such cost exists.
|
|
|
|
+ // idx 0 for decayed call cost
|
|
|
|
+ // idx 1 for the raw call cost
|
|
|
|
+ cost = new ArrayList<AtomicLong>(2);
|
|
|
|
+ cost.add(new AtomicLong(0));
|
|
|
|
+ cost.add(new AtomicLong(0));
|
|
|
|
|
|
// Put it in, or get the AtomicInteger that was put in by another thread
|
|
// Put it in, or get the AtomicInteger that was put in by another thread
|
|
- List<AtomicLong> otherCount = callCounts.putIfAbsent(identity, count);
|
|
|
|
- if (otherCount != null) {
|
|
|
|
- count = otherCount;
|
|
|
|
|
|
+ List<AtomicLong> otherCost = callCosts.putIfAbsent(identity, cost);
|
|
|
|
+ if (otherCost != null) {
|
|
|
|
+ cost = otherCost;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
// Update the total
|
|
// Update the total
|
|
- totalDecayedCallCount.getAndIncrement();
|
|
|
|
- totalRawCallCount.getAndIncrement();
|
|
|
|
|
|
+ totalDecayedCallCost.getAndAdd(costDelta);
|
|
|
|
+ totalRawCallCost.getAndAdd(costDelta);
|
|
|
|
|
|
// At this point value is guaranteed to be not null. It may however have
|
|
// At this point value is guaranteed to be not null. It may however have
|
|
- // been clobbered from callCounts. Nonetheless, we return what
|
|
|
|
|
|
+ // been clobbered from callCosts. Nonetheless, we return what
|
|
// we have.
|
|
// we have.
|
|
- count.get(1).getAndIncrement();
|
|
|
|
- return count.get(0).getAndIncrement();
|
|
|
|
|
|
+ cost.get(1).getAndAdd(costDelta);
|
|
|
|
+ cost.get(0).getAndAdd(costDelta);
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
- * Given the number of occurrences, compute a scheduling decision.
|
|
|
|
- * @param occurrences how many occurrences
|
|
|
|
|
|
+ * Given the cost for an identity, compute a scheduling decision.
|
|
|
|
+ *
|
|
|
|
+ * @param cost the cost for an identity
|
|
* @return scheduling decision from 0 to numLevels - 1
|
|
* @return scheduling decision from 0 to numLevels - 1
|
|
*/
|
|
*/
|
|
- private int computePriorityLevel(long occurrences) {
|
|
|
|
- long totalCallSnapshot = totalDecayedCallCount.get();
|
|
|
|
|
|
+ private int computePriorityLevel(long cost) {
|
|
|
|
+ long totalCallSnapshot = totalDecayedCallCost.get();
|
|
|
|
|
|
double proportion = 0;
|
|
double proportion = 0;
|
|
if (totalCallSnapshot > 0) {
|
|
if (totalCallSnapshot > 0) {
|
|
- proportion = (double) occurrences / totalCallSnapshot;
|
|
|
|
|
|
+ proportion = (double) cost / totalCallSnapshot;
|
|
}
|
|
}
|
|
|
|
|
|
// Start with low priority levels, since they will be most common
|
|
// Start with low priority levels, since they will be most common
|
|
@@ -522,31 +543,23 @@ public class DecayRpcScheduler implements RpcScheduler,
|
|
* @return integer scheduling decision from 0 to numLevels - 1
|
|
* @return integer scheduling decision from 0 to numLevels - 1
|
|
*/
|
|
*/
|
|
private int cachedOrComputedPriorityLevel(Object identity) {
|
|
private int cachedOrComputedPriorityLevel(Object identity) {
|
|
- try {
|
|
|
|
- long occurrences = this.getAndIncrementCallCounts(identity);
|
|
|
|
-
|
|
|
|
- // Try the cache
|
|
|
|
- Map<Object, Integer> scheduleCache = scheduleCacheRef.get();
|
|
|
|
- if (scheduleCache != null) {
|
|
|
|
- Integer priority = scheduleCache.get(identity);
|
|
|
|
- if (priority != null) {
|
|
|
|
- LOG.debug("Cache priority for: {} with priority: {}", identity,
|
|
|
|
- priority);
|
|
|
|
- return priority;
|
|
|
|
- }
|
|
|
|
|
|
+ // Try the cache
|
|
|
|
+ Map<Object, Integer> scheduleCache = scheduleCacheRef.get();
|
|
|
|
+ if (scheduleCache != null) {
|
|
|
|
+ Integer priority = scheduleCache.get(identity);
|
|
|
|
+ if (priority != null) {
|
|
|
|
+ LOG.debug("Cache priority for: {} with priority: {}", identity,
|
|
|
|
+ priority);
|
|
|
|
+ return priority;
|
|
}
|
|
}
|
|
-
|
|
|
|
- // Cache was no good, compute it
|
|
|
|
- int priority = computePriorityLevel(occurrences);
|
|
|
|
- LOG.debug("compute priority for " + identity + " priority " + priority);
|
|
|
|
- return priority;
|
|
|
|
-
|
|
|
|
- } catch (InterruptedException ie) {
|
|
|
|
- LOG.warn("Caught InterruptedException, returning low priority level");
|
|
|
|
- LOG.debug("Fallback priority for: {} with priority: {}", identity,
|
|
|
|
- numLevels - 1);
|
|
|
|
- return numLevels - 1;
|
|
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ // Cache was no good, compute it
|
|
|
|
+ List<AtomicLong> costList = callCosts.get(identity);
|
|
|
|
+ long currentCost = costList == null ? 0 : costList.get(0).get();
|
|
|
|
+ int priority = computePriorityLevel(currentCost);
|
|
|
|
+ LOG.debug("compute priority for {} priority {}", identity, priority);
|
|
|
|
+ return priority;
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -596,6 +609,10 @@ public class DecayRpcScheduler implements RpcScheduler,
|
|
@Override
|
|
@Override
|
|
public void addResponseTime(String callName, Schedulable schedulable,
|
|
public void addResponseTime(String callName, Schedulable schedulable,
|
|
ProcessingDetails details) {
|
|
ProcessingDetails details) {
|
|
|
|
+ String user = identityProvider.makeIdentity(schedulable);
|
|
|
|
+ long processingCost = costProvider.getCost(details);
|
|
|
|
+ addCost(user, processingCost);
|
|
|
|
+
|
|
int priorityLevel = schedulable.getPriorityLevel();
|
|
int priorityLevel = schedulable.getPriorityLevel();
|
|
long queueTime = details.get(Timing.QUEUE, TimeUnit.MILLISECONDS);
|
|
long queueTime = details.get(Timing.QUEUE, TimeUnit.MILLISECONDS);
|
|
long processingTime = details.get(Timing.PROCESSING, TimeUnit.MILLISECONDS);
|
|
long processingTime = details.get(Timing.PROCESSING, TimeUnit.MILLISECONDS);
|
|
@@ -643,22 +660,30 @@ public class DecayRpcScheduler implements RpcScheduler,
|
|
|
|
|
|
// For testing
|
|
// For testing
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
- public double getDecayFactor() { return decayFactor; }
|
|
|
|
|
|
+ double getDecayFactor() {
|
|
|
|
+ return decayFactor;
|
|
|
|
+ }
|
|
|
|
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
- public long getDecayPeriodMillis() { return decayPeriodMillis; }
|
|
|
|
|
|
+ long getDecayPeriodMillis() {
|
|
|
|
+ return decayPeriodMillis;
|
|
|
|
+ }
|
|
|
|
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
- public double[] getThresholds() { return thresholds; }
|
|
|
|
|
|
+ double[] getThresholds() {
|
|
|
|
+ return thresholds;
|
|
|
|
+ }
|
|
|
|
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
- public void forceDecay() { decayCurrentCounts(); }
|
|
|
|
|
|
+ void forceDecay() {
|
|
|
|
+ decayCurrentCosts();
|
|
|
|
+ }
|
|
|
|
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
- public Map<Object, Long> getCallCountSnapshot() {
|
|
|
|
|
|
+ Map<Object, Long> getCallCostSnapshot() {
|
|
HashMap<Object, Long> snapshot = new HashMap<Object, Long>();
|
|
HashMap<Object, Long> snapshot = new HashMap<Object, Long>();
|
|
|
|
|
|
- for (Map.Entry<Object, List<AtomicLong>> entry : callCounts.entrySet()) {
|
|
|
|
|
|
+ for (Map.Entry<Object, List<AtomicLong>> entry : callCosts.entrySet()) {
|
|
snapshot.put(entry.getKey(), entry.getValue().get(0).get());
|
|
snapshot.put(entry.getKey(), entry.getValue().get(0).get());
|
|
}
|
|
}
|
|
|
|
|
|
@@ -666,8 +691,8 @@ public class DecayRpcScheduler implements RpcScheduler,
|
|
}
|
|
}
|
|
|
|
|
|
@VisibleForTesting
|
|
@VisibleForTesting
|
|
- public long getTotalCallSnapshot() {
|
|
|
|
- return totalDecayedCallCount.get();
|
|
|
|
|
|
+ long getTotalCallSnapshot() {
|
|
|
|
+ return totalDecayedCallCost.get();
|
|
}
|
|
}
|
|
|
|
|
|
/**
|
|
/**
|
|
@@ -800,15 +825,15 @@ public class DecayRpcScheduler implements RpcScheduler,
|
|
}
|
|
}
|
|
|
|
|
|
public int getUniqueIdentityCount() {
|
|
public int getUniqueIdentityCount() {
|
|
- return callCounts.size();
|
|
|
|
|
|
+ return callCosts.size();
|
|
}
|
|
}
|
|
|
|
|
|
public long getTotalCallVolume() {
|
|
public long getTotalCallVolume() {
|
|
- return totalDecayedCallCount.get();
|
|
|
|
|
|
+ return totalDecayedCallCost.get();
|
|
}
|
|
}
|
|
|
|
|
|
public long getTotalRawCallVolume() {
|
|
public long getTotalRawCallVolume() {
|
|
- return totalRawCallCount.get();
|
|
|
|
|
|
+ return totalRawCallCost.get();
|
|
}
|
|
}
|
|
|
|
|
|
public long[] getResponseTimeCountInLastWindow() {
|
|
public long[] getResponseTimeCountInLastWindow() {
|
|
@@ -901,17 +926,17 @@ public class DecayRpcScheduler implements RpcScheduler,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- // Get the top N callers' raw call count and scheduler decision
|
|
|
|
|
|
+ // Get the top N callers' raw call cost and scheduler decision
|
|
private TopN getTopCallers(int n) {
|
|
private TopN getTopCallers(int n) {
|
|
TopN topNCallers = new TopN(n);
|
|
TopN topNCallers = new TopN(n);
|
|
Iterator<Map.Entry<Object, List<AtomicLong>>> it =
|
|
Iterator<Map.Entry<Object, List<AtomicLong>>> it =
|
|
- callCounts.entrySet().iterator();
|
|
|
|
|
|
+ callCosts.entrySet().iterator();
|
|
while (it.hasNext()) {
|
|
while (it.hasNext()) {
|
|
Map.Entry<Object, List<AtomicLong>> entry = it.next();
|
|
Map.Entry<Object, List<AtomicLong>> entry = it.next();
|
|
String caller = entry.getKey().toString();
|
|
String caller = entry.getKey().toString();
|
|
- Long count = entry.getValue().get(1).get();
|
|
|
|
- if (count > 0) {
|
|
|
|
- topNCallers.offer(new NameValuePair(caller, count));
|
|
|
|
|
|
+ Long cost = entry.getValue().get(1).get();
|
|
|
|
+ if (cost > 0) {
|
|
|
|
+ topNCallers.offer(new NameValuePair(caller, cost));
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return topNCallers;
|
|
return topNCallers;
|
|
@@ -932,25 +957,25 @@ public class DecayRpcScheduler implements RpcScheduler,
|
|
|
|
|
|
public String getCallVolumeSummary() {
|
|
public String getCallVolumeSummary() {
|
|
try {
|
|
try {
|
|
- return WRITER.writeValueAsString(getDecayedCallCounts());
|
|
|
|
|
|
+ return WRITER.writeValueAsString(getDecayedCallCosts());
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
return "Error: " + e.getMessage();
|
|
return "Error: " + e.getMessage();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- private Map<Object, Long> getDecayedCallCounts() {
|
|
|
|
- Map<Object, Long> decayedCallCounts = new HashMap<>(callCounts.size());
|
|
|
|
|
|
+ private Map<Object, Long> getDecayedCallCosts() {
|
|
|
|
+ Map<Object, Long> decayedCallCosts = new HashMap<>(callCosts.size());
|
|
Iterator<Map.Entry<Object, List<AtomicLong>>> it =
|
|
Iterator<Map.Entry<Object, List<AtomicLong>>> it =
|
|
- callCounts.entrySet().iterator();
|
|
|
|
|
|
+ callCosts.entrySet().iterator();
|
|
while (it.hasNext()) {
|
|
while (it.hasNext()) {
|
|
Map.Entry<Object, List<AtomicLong>> entry = it.next();
|
|
Map.Entry<Object, List<AtomicLong>> entry = it.next();
|
|
Object user = entry.getKey();
|
|
Object user = entry.getKey();
|
|
- Long decayedCount = entry.getValue().get(0).get();
|
|
|
|
- if (decayedCount > 0) {
|
|
|
|
- decayedCallCounts.put(user, decayedCount);
|
|
|
|
|
|
+ Long decayedCost = entry.getValue().get(0).get();
|
|
|
|
+ if (decayedCost > 0) {
|
|
|
|
+ decayedCallCosts.put(user, decayedCost);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- return decayedCallCounts;
|
|
|
|
|
|
+ return decayedCallCosts;
|
|
}
|
|
}
|
|
|
|
|
|
@Override
|
|
@Override
|