
HDFS-14403. Cost-based extension to the RPC Fair Call Queue. Contributed by Christopher Gregorian.

Christopher Gregorian committed 6 years ago
Parent
Current commit
129576f628

+ 1 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java

@@ -106,6 +106,7 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   public static final String IPC_CALLQUEUE_IMPL_KEY = "callqueue.impl";
   public static final String IPC_SCHEDULER_IMPL_KEY = "scheduler.impl";
   public static final String IPC_IDENTITY_PROVIDER_KEY = "identity-provider.impl";
+  public static final String IPC_COST_PROVIDER_KEY = "cost-provider.impl";
   public static final String IPC_BACKOFF_ENABLE = "backoff.enable";
   public static final boolean IPC_BACKOFF_ENABLE_DEFAULT = false;
 

+ 0 - 1
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java

@@ -198,7 +198,6 @@ public class CallQueueManager<E extends Schedulable>
   }
 
   // This should be only called once per call and cached in the call object
-  // each getPriorityLevel call will increment the counter for the caller
   int getPriorityLevel(Schedulable e) {
     return scheduler.getPriorityLevel(e);
   }

+ 46 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CostProvider.java

@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Used by {@link DecayRpcScheduler} to get the cost of users' operations. This
+ * is configurable using
+ * {@link org.apache.hadoop.fs.CommonConfigurationKeys#IPC_COST_PROVIDER_KEY}.
+ */
+public interface CostProvider {
+
+  /**
+   * Initialize this provider using the given configuration, examining only
+   * those which fall within the provided namespace.
+   *
+   * @param namespace The namespace to use when looking up configurations.
+   * @param conf The configuration
+   */
+  void init(String namespace, Configuration conf);
+
+  /**
+   * Get cost from {@link ProcessingDetails} which will be used in scheduler.
+   *
+   * @param details Process details
+   * @return The cost of the call
+   */
+  long getCost(ProcessingDetails details);
+}

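The CostProvider interface above is the extension point introduced by this change: the scheduler asks it for a per-call cost instead of assuming every call costs 1. As a rough illustration only (not part of this commit), a custom implementation might bill a flat base cost plus the handler time; the class name and the `.example.base-cost` key below are hypothetical:

    package org.apache.hadoop.ipc;

    import org.apache.hadoop.conf.Configuration;

    import static org.apache.hadoop.ipc.ProcessingDetails.Timing;

    /** Hypothetical sketch: charges a flat base cost plus handler time. */
    public class ExampleHandlerTimeCostProvider implements CostProvider {

      // Hypothetical key suffix, resolved under the scheduler's namespace.
      private static final String BASE_COST_KEY = ".example.base-cost";

      private long baseCost;

      @Override
      public void init(String namespace, Configuration conf) {
        // Only consult keys under this scheduler's namespace, e.g. "ipc.8020".
        baseCost = conf.getLong(namespace + BASE_COST_KEY, 1);
      }

      @Override
      public long getCost(ProcessingDetails details) {
        // Flat charge per call plus time spent in the RPC handler.
        return baseCost + details.get(Timing.HANDLER);
      }
    }

Such a provider would then be selected per IPC namespace via the cost-provider.impl key, exactly as DecayRpcScheduler.parseCostProvider does in the next file.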
+ 137 - 112
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java

@@ -58,8 +58,8 @@ import org.slf4j.LoggerFactory;
 import static org.apache.hadoop.ipc.ProcessingDetails.Timing;
 
 /**
- * The decay RPC scheduler counts incoming requests in a map, then
- * decays the counts at a fixed time interval. The scheduler is optimized
+ * The decay RPC scheduler tracks the cost of incoming requests in a map, then
+ * decays the costs at a fixed time interval. The scheduler is optimized
  * for large periods (on the order of seconds), as it offloads work to the
  * decay sweep.
  */
@@ -77,7 +77,7 @@ public class DecayRpcScheduler implements RpcScheduler,
     "faircallqueue.decay-scheduler.period-ms";
     "faircallqueue.decay-scheduler.period-ms";
 
 
   /**
   /**
-   * Decay factor controls how much each count is suppressed by on each sweep.
+   * Decay factor controls how much each cost is suppressed by on each sweep.
    * Valid numbers are &gt; 0 and &lt; 1. Decay factor works in tandem with
    * Valid numbers are &gt; 0 and &lt; 1. Decay factor works in tandem with
    * period
    * period
    * to control how long the scheduler remembers an identity.
    * to control how long the scheduler remembers an identity.
@@ -135,15 +135,15 @@ public class DecayRpcScheduler implements RpcScheduler,
   private static final ObjectWriter WRITER = new ObjectMapper().writer();
 
   // Track the decayed and raw (no decay) number of calls for each schedulable
-  // identity from all previous decay windows: idx 0 for decayed call count and
-  // idx 1 for the raw call count
-  private final ConcurrentHashMap<Object, List<AtomicLong>> callCounts =
+  // identity from all previous decay windows: idx 0 for decayed call cost and
+  // idx 1 for the raw call cost
+  private final ConcurrentHashMap<Object, List<AtomicLong>> callCosts =
       new ConcurrentHashMap<Object, List<AtomicLong>>();
 
-  // Should be the sum of all AtomicLongs in decayed callCounts
-  private final AtomicLong totalDecayedCallCount = new AtomicLong();
-  // The sum of all AtomicLongs in raw callCounts
-  private final AtomicLong totalRawCallCount = new AtomicLong();
+  // Should be the sum of all AtomicLongs in decayed callCosts
+  private final AtomicLong totalDecayedCallCost = new AtomicLong();
+  // The sum of all AtomicLongs in raw callCosts
+  private final AtomicLong totalRawCallCost = new AtomicLong();
 
   // Track total call count and response time in current decay window
@@ -161,7 +161,7 @@ public class DecayRpcScheduler implements RpcScheduler,
 
   // Tune the behavior of the scheduler
   private final long decayPeriodMillis; // How long between each tick
-  private final double decayFactor; // nextCount = currentCount * decayFactor
+  private final double decayFactor; // nextCost = currentCost * decayFactor
   private final int numLevels;
   private final double[] thresholds;
   private final IdentityProvider identityProvider;
@@ -171,9 +171,10 @@ public class DecayRpcScheduler implements RpcScheduler,
   private final int topUsersCount; // e.g., report top 10 users' metrics
   private static final double PRECISION = 0.0001;
   private MetricsProxy metricsProxy;
+  private final CostProvider costProvider;
 
   /**
-   * This TimerTask will call decayCurrentCounts until
+   * This TimerTask will call decayCurrentCosts until
    * the scheduler has been garbage collected.
    */
   public static class DecayTask extends TimerTask {
@@ -189,7 +190,7 @@ public class DecayRpcScheduler implements RpcScheduler,
     public void run() {
       DecayRpcScheduler sched = schedulerRef.get();
       if (sched != null) {
-        sched.decayCurrentCounts();
+        sched.decayCurrentCosts();
       } else {
         // Our scheduler was garbage collected since it is no longer in use,
         // so we should terminate the timer as well
@@ -216,6 +217,7 @@ public class DecayRpcScheduler implements RpcScheduler,
     this.decayFactor = parseDecayFactor(ns, conf);
     this.decayPeriodMillis = parseDecayPeriodMillis(ns, conf);
     this.identityProvider = this.parseIdentityProvider(ns, conf);
+    this.costProvider = this.parseCostProvider(ns, conf);
     this.thresholds = parseThresholds(ns, conf, numLevels);
     this.backOffByResponseTimeEnabled = parseBackOffByResponseTimeEnabled(ns,
         conf);
@@ -243,6 +245,24 @@ public class DecayRpcScheduler implements RpcScheduler,
     recomputeScheduleCache();
   }
 
+  private CostProvider parseCostProvider(String ns, Configuration conf) {
+    List<CostProvider> providers = conf.getInstances(
+        ns + "." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY,
+        CostProvider.class);
+
+    if (providers.size() < 1) {
+      LOG.info("CostProvider not specified, defaulting to DefaultCostProvider");
+      return new DefaultCostProvider();
+    } else if (providers.size() > 1) {
+      LOG.warn("Found multiple CostProviders; using: {}",
+          providers.get(0).getClass());
+    }
+
+    CostProvider provider = providers.get(0); // use the first
+    provider.init(ns, conf);
+    return provider;
+  }
+
   // Load configs
   private IdentityProvider parseIdentityProvider(String ns,
       Configuration conf) {
@@ -389,69 +409,69 @@ public class DecayRpcScheduler implements RpcScheduler,
   }
 
   /**
-   * Decay the stored counts for each user and clean as necessary.
+   * Decay the stored costs for each user and clean as necessary.
    * This method should be called periodically in order to keep
-   * counts current.
+   * costs current.
    */
-  private void decayCurrentCounts() {
-    LOG.debug("Start to decay current counts.");
+  private void decayCurrentCosts() {
+    LOG.debug("Start to decay current costs.");
     try {
-      long totalDecayedCount = 0;
-      long totalRawCount = 0;
+      long totalDecayedCost = 0;
+      long totalRawCost = 0;
       Iterator<Map.Entry<Object, List<AtomicLong>>> it =
-          callCounts.entrySet().iterator();
+          callCosts.entrySet().iterator();
 
       while (it.hasNext()) {
         Map.Entry<Object, List<AtomicLong>> entry = it.next();
-        AtomicLong decayedCount = entry.getValue().get(0);
-        AtomicLong rawCount = entry.getValue().get(1);
+        AtomicLong decayedCost = entry.getValue().get(0);
+        AtomicLong rawCost = entry.getValue().get(1);
 
         // Compute the next value by reducing it by the decayFactor
-        totalRawCount += rawCount.get();
-        long currentValue = decayedCount.get();
+        totalRawCost += rawCost.get();
+        long currentValue = decayedCost.get();
         long nextValue = (long) (currentValue * decayFactor);
-        totalDecayedCount += nextValue;
-        decayedCount.set(nextValue);
+        totalDecayedCost += nextValue;
+        decayedCost.set(nextValue);
 
-        LOG.debug("Decaying counts for the user: {}, " +
-            "its decayedCount: {}, rawCount: {}", entry.getKey(),
-            nextValue, rawCount.get());
+        LOG.debug(
+            "Decaying costs for the user: {}, its decayedCost: {}, rawCost: {}",
+            entry.getKey(), nextValue, rawCost.get());
         if (nextValue == 0) {
-          LOG.debug("The decayed count for the user {} is zero " +
+          LOG.debug("The decayed cost for the user {} is zero " +
               "and being cleaned.", entry.getKey());
           // We will clean up unused keys here. An interesting optimization
-          // might be to have an upper bound on keyspace in callCounts and only
+          // might be to have an upper bound on keyspace in callCosts and only
           // clean once we pass it.
           it.remove();
         }
       }
 
       // Update the total so that we remain in sync
-      totalDecayedCallCount.set(totalDecayedCount);
-      totalRawCallCount.set(totalRawCount);
+      totalDecayedCallCost.set(totalDecayedCost);
+      totalRawCallCost.set(totalRawCost);
 
-      LOG.debug("After decaying the stored counts, totalDecayedCount: {}, " +
-          "totalRawCallCount: {}.", totalDecayedCount, totalRawCount);
+      LOG.debug("After decaying the stored costs, totalDecayedCost: {}, " +
+          "totalRawCallCost: {}.", totalDecayedCost, totalRawCost);
       // Now refresh the cache of scheduling decisions
       recomputeScheduleCache();
 
       // Update average response time with decay
       updateAverageResponseTime(true);
     } catch (Exception ex) {
-      LOG.error("decayCurrentCounts exception: " +
+      LOG.error("decayCurrentCosts exception: " +
           ExceptionUtils.getStackTrace(ex));
       throw ex;
     }
   }
 
   /**
-   * Update the scheduleCache to match current conditions in callCounts.
+   * Update the scheduleCache to match current conditions in callCosts.
    */
   private void recomputeScheduleCache() {
     Map<Object, Integer> nextCache = new HashMap<Object, Integer>();
 
-    for (Map.Entry<Object, List<AtomicLong>> entry : callCounts.entrySet()) {
+    for (Map.Entry<Object, List<AtomicLong>> entry : callCosts.entrySet()) {
       Object id = entry.getKey();
       AtomicLong value = entry.getValue().get(0);
 
@@ -466,51 +486,52 @@ public class DecayRpcScheduler implements RpcScheduler,
   }
 
   /**
-   * Get the number of occurrences and increment atomically.
-   * @param identity the identity of the user to increment
-   * @return the value before incrementation
+   * Adjust the stored cost for a given identity.
+   *
+   * @param identity the identity of the user whose cost should be adjusted
+   * @param costDelta the cost to add for the given identity
    */
-  private long getAndIncrementCallCounts(Object identity)
-      throws InterruptedException {
-    // We will increment the count, or create it if no such count exists
-    List<AtomicLong> count = this.callCounts.get(identity);
-    if (count == null) {
-      // Create the counts since no such count exists.
-      // idx 0 for decayed call count
-      // idx 1 for the raw call count
-      count = new ArrayList<AtomicLong>(2);
-      count.add(new AtomicLong(0));
-      count.add(new AtomicLong(0));
+  private void addCost(Object identity, long costDelta) {
+    // We will increment the cost, or create it if no such cost exists
+    List<AtomicLong> cost = this.callCosts.get(identity);
+    if (cost == null) {
+      // Create the costs since no such cost exists.
+      // idx 0 for decayed call cost
+      // idx 1 for the raw call cost
+      cost = new ArrayList<AtomicLong>(2);
+      cost.add(new AtomicLong(0));
+      cost.add(new AtomicLong(0));
 
       // Put it in, or get the AtomicInteger that was put in by another thread
-      List<AtomicLong> otherCount = callCounts.putIfAbsent(identity, count);
-      if (otherCount != null) {
-        count = otherCount;
+      List<AtomicLong> otherCost = callCosts.putIfAbsent(identity, cost);
+      if (otherCost != null) {
+        cost = otherCost;
       }
     }
 
     // Update the total
-    totalDecayedCallCount.getAndIncrement();
-    totalRawCallCount.getAndIncrement();
+    totalDecayedCallCost.getAndAdd(costDelta);
+    totalRawCallCost.getAndAdd(costDelta);
 
     // At this point value is guaranteed to be not null. It may however have
-    // been clobbered from callCounts. Nonetheless, we return what
+    // been clobbered from callCosts. Nonetheless, we return what
     // we have.
-    count.get(1).getAndIncrement();
-    return count.get(0).getAndIncrement();
+    cost.get(1).getAndAdd(costDelta);
+    cost.get(0).getAndAdd(costDelta);
   }
 
   /**
-   * Given the number of occurrences, compute a scheduling decision.
-   * @param occurrences how many occurrences
+   * Given the cost for an identity, compute a scheduling decision.
+   *
+   * @param cost the cost for an identity
    * @return scheduling decision from 0 to numLevels - 1
    */
-  private int computePriorityLevel(long occurrences) {
-    long totalCallSnapshot = totalDecayedCallCount.get();
+  private int computePriorityLevel(long cost) {
+    long totalCallSnapshot = totalDecayedCallCost.get();
 
     double proportion = 0;
     if (totalCallSnapshot > 0) {
-      proportion = (double) occurrences / totalCallSnapshot;
+      proportion = (double) cost / totalCallSnapshot;
     }
 
     // Start with low priority levels, since they will be most common
@@ -531,31 +552,23 @@ public class DecayRpcScheduler implements RpcScheduler,
    * @return integer scheduling decision from 0 to numLevels - 1
    */
   private int cachedOrComputedPriorityLevel(Object identity) {
-    try {
-      long occurrences = this.getAndIncrementCallCounts(identity);
-
-      // Try the cache
-      Map<Object, Integer> scheduleCache = scheduleCacheRef.get();
-      if (scheduleCache != null) {
-        Integer priority = scheduleCache.get(identity);
-        if (priority != null) {
-          LOG.debug("Cache priority for: {} with priority: {}", identity,
-              priority);
-          return priority;
-        }
+    // Try the cache
+    Map<Object, Integer> scheduleCache = scheduleCacheRef.get();
+    if (scheduleCache != null) {
+      Integer priority = scheduleCache.get(identity);
+      if (priority != null) {
+        LOG.debug("Cache priority for: {} with priority: {}", identity,
+            priority);
+        return priority;
       }
-
-      // Cache was no good, compute it
-      int priority = computePriorityLevel(occurrences);
-      LOG.debug("compute priority for " + identity + " priority " + priority);
-      return priority;
-
-    } catch (InterruptedException ie) {
-      LOG.warn("Caught InterruptedException, returning low priority level");
-      LOG.debug("Fallback priority for: {} with priority: {}", identity,
-          numLevels - 1);
-      return numLevels - 1;
     }
+
+    // Cache was no good, compute it
+    List<AtomicLong> costList = callCosts.get(identity);
+    long currentCost = costList == null ? 0 : costList.get(0).get();
+    int priority = computePriorityLevel(currentCost);
+    LOG.debug("compute priority for {} priority {}", identity, priority);
+    return priority;
   }
 
   /**
@@ -605,6 +618,10 @@ public class DecayRpcScheduler implements RpcScheduler,
   @Override
   public void addResponseTime(String callName, Schedulable schedulable,
       ProcessingDetails details) {
+    String user = identityProvider.makeIdentity(schedulable);
+    long processingCost = costProvider.getCost(details);
+    addCost(user, processingCost);
+
     int priorityLevel = schedulable.getPriorityLevel();
     long queueTime = details.get(Timing.QUEUE, TimeUnit.MILLISECONDS);
     long processingTime = details.get(Timing.PROCESSING, TimeUnit.MILLISECONDS);
@@ -652,22 +669,30 @@ public class DecayRpcScheduler implements RpcScheduler,
 
   // For testing
   @VisibleForTesting
-  public double getDecayFactor() { return decayFactor; }
+  double getDecayFactor() {
+    return decayFactor;
+  }
 
   @VisibleForTesting
-  public long getDecayPeriodMillis() { return decayPeriodMillis; }
+  long getDecayPeriodMillis() {
+    return decayPeriodMillis;
+  }
 
   @VisibleForTesting
-  public double[] getThresholds() { return thresholds; }
+  double[] getThresholds() {
+    return thresholds;
+  }
 
   @VisibleForTesting
-  public void forceDecay() { decayCurrentCounts(); }
+  void forceDecay() {
+    decayCurrentCosts();
+  }
 
   @VisibleForTesting
-  public Map<Object, Long> getCallCountSnapshot() {
+  Map<Object, Long> getCallCostSnapshot() {
     HashMap<Object, Long> snapshot = new HashMap<Object, Long>();
 
-    for (Map.Entry<Object, List<AtomicLong>> entry : callCounts.entrySet()) {
+    for (Map.Entry<Object, List<AtomicLong>> entry : callCosts.entrySet()) {
       snapshot.put(entry.getKey(), entry.getValue().get(0).get());
     }
 
@@ -675,8 +700,8 @@ public class DecayRpcScheduler implements RpcScheduler,
   }
 
   @VisibleForTesting
-  public long getTotalCallSnapshot() {
-    return totalDecayedCallCount.get();
+  long getTotalCallSnapshot() {
+    return totalDecayedCallCost.get();
   }
 
   /**
@@ -809,15 +834,15 @@ public class DecayRpcScheduler implements RpcScheduler,
   }
 
   public int getUniqueIdentityCount() {
-    return callCounts.size();
+    return callCosts.size();
   }
 
   public long getTotalCallVolume() {
-    return totalDecayedCallCount.get();
+    return totalDecayedCallCost.get();
   }
 
   public long getTotalRawCallVolume() {
-    return totalRawCallCount.get();
+    return totalRawCallCost.get();
   }
 
   public long[] getResponseTimeCountInLastWindow() {
@@ -910,17 +935,17 @@ public class DecayRpcScheduler implements RpcScheduler,
     }
   }
 
-  // Get the top N callers' raw call count and scheduler decision
+  // Get the top N callers' raw call cost and scheduler decision
   private TopN getTopCallers(int n) {
     TopN topNCallers = new TopN(n);
     Iterator<Map.Entry<Object, List<AtomicLong>>> it =
-        callCounts.entrySet().iterator();
+        callCosts.entrySet().iterator();
     while (it.hasNext()) {
       Map.Entry<Object, List<AtomicLong>> entry = it.next();
       String caller = entry.getKey().toString();
-      Long count = entry.getValue().get(1).get();
-      if (count > 0) {
-        topNCallers.offer(new NameValuePair(caller, count));
+      Long cost = entry.getValue().get(1).get();
+      if (cost > 0) {
+        topNCallers.offer(new NameValuePair(caller, cost));
       }
     }
     return topNCallers;
@@ -941,25 +966,25 @@ public class DecayRpcScheduler implements RpcScheduler,
 
   public String getCallVolumeSummary() {
     try {
-      return WRITER.writeValueAsString(getDecayedCallCounts());
+      return WRITER.writeValueAsString(getDecayedCallCosts());
     } catch (Exception e) {
       return "Error: " + e.getMessage();
     }
   }
 
-  private Map<Object, Long> getDecayedCallCounts() {
-    Map<Object, Long> decayedCallCounts = new HashMap<>(callCounts.size());
+  private Map<Object, Long> getDecayedCallCosts() {
+    Map<Object, Long> decayedCallCosts = new HashMap<>(callCosts.size());
     Iterator<Map.Entry<Object, List<AtomicLong>>> it =
-        callCounts.entrySet().iterator();
+        callCosts.entrySet().iterator();
     while (it.hasNext()) {
       Map.Entry<Object, List<AtomicLong>> entry = it.next();
       Object user = entry.getKey();
-      Long decayedCount = entry.getValue().get(0).get();
-      if (decayedCount > 0) {
-        decayedCallCounts.put(user, decayedCount);
+      Long decayedCost = entry.getValue().get(0).get();
+      if (decayedCost > 0) {
+        decayedCallCosts.put(user, decayedCost);
       }
     }
-    return decayedCallCounts;
+    return decayedCallCosts;
   }
 
   @Override

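Taken together, the DecayRpcScheduler changes above shift the scheduler from counting calls to summing per-call costs: addResponseTime() now asks the CostProvider for a cost and feeds it to addCost(), and computePriorityLevel() compares a caller's share of the total decayed cost against the configured thresholds. The following is a standalone sketch of that threshold check, illustrative only and not the committed code; the class name and numbers are assumptions:

    // Standalone sketch of the threshold logic; lower levels mean higher priority.
    public final class PriorityLevelSketch {

      static int computePriorityLevel(long cost, long totalCost,
          double[] thresholds) {
        double proportion = totalCost > 0 ? (double) cost / totalCost : 0;
        // Thresholds ascend, e.g. {0.125, 0.25, 0.5} for 4 levels: a caller whose
        // share of the total decayed cost reaches threshold i-1 lands in level i.
        for (int level = thresholds.length; level > 0; level--) {
          if (proportion >= thresholds[level - 1]) {
            return level; // heavy users get the low-priority, high-numbered levels
          }
        }
        return 0; // light users keep the highest priority
      }

      public static void main(String[] args) {
        double[] thresholds = {0.125, 0.25, 0.5};
        System.out.println(computePriorityLevel(600, 1000, thresholds)); // prints 3
        System.out.println(computePriorityLevel(100, 1000, thresholds)); // prints 0
      }
    }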
+ 43 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DefaultCostProvider.java

@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Ignores process details and returns a constant value for each call.
+ */
+public class DefaultCostProvider implements CostProvider {
+
+  @Override
+  public void init(String namespace, Configuration conf) {
+    // No-op
+  }
+
+  /**
+   * Returns 1, regardless of the processing details.
+   *
+   * @param details Process details (ignored)
+   * @return 1
+   */
+  @Override
+  public long getCost(ProcessingDetails details) {
+    return 1;
+  }
+}

+ 110 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WeightedTimeCostProvider.java

@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.util.Locale;
+import org.apache.hadoop.conf.Configuration;
+
+import static org.apache.hadoop.ipc.ProcessingDetails.Timing;
+
+/**
+ * A {@link CostProvider} that calculates the cost for an operation
+ * as a weighted sum of its processing time values (see
+ * {@link ProcessingDetails}). This can be used by specifying the
+ * {@link org.apache.hadoop.fs.CommonConfigurationKeys#IPC_COST_PROVIDER_KEY}
+ * configuration key.
+ *
+ * <p/>This allows for configuration of how heavily each of the operations
+ * within {@link ProcessingDetails} is weighted. By default,
+ * {@link ProcessingDetails.Timing#LOCKFREE},
+ * {@link ProcessingDetails.Timing#RESPONSE}, and
+ * {@link ProcessingDetails.Timing#HANDLER} times have a weight of
+ * {@value #DEFAULT_LOCKFREE_WEIGHT},
+ * {@link ProcessingDetails.Timing#LOCKSHARED} has a weight of
+ * {@value #DEFAULT_LOCKSHARED_WEIGHT},
+ * {@link ProcessingDetails.Timing#LOCKEXCLUSIVE} has a weight of
+ * {@value #DEFAULT_LOCKEXCLUSIVE_WEIGHT}, and others are ignored.
+ * These values can all be configured using the {@link #WEIGHT_CONFIG_PREFIX}
+ * key, prefixed with the IPC namespace, and suffixed with the name of the
+ * timing measurement from {@link ProcessingDetails} (all lowercase).
+ * For example, to set the lock exclusive weight to be 1000, set:
+ * <pre>
+ *   ipc.8020.cost-provider.impl=org.apache.hadoop.ipc.WeightedTimeCostProvider
+ *   ipc.8020.weighted-cost.lockexclusive=1000
+ * </pre>
+ */
+public class WeightedTimeCostProvider implements CostProvider {
+
+  /**
+   * The prefix used in configuration values specifying the weight to use when
+   * determining the cost of an operation. See the class Javadoc for more info.
+   */
+  public static final String WEIGHT_CONFIG_PREFIX = ".weighted-cost.";
+  static final int DEFAULT_LOCKFREE_WEIGHT = 1;
+  static final int DEFAULT_LOCKSHARED_WEIGHT = 10;
+  static final int DEFAULT_LOCKEXCLUSIVE_WEIGHT = 100;
+
+  private long[] weights;
+
+  @Override
+  public void init(String namespace, Configuration conf) {
+    weights = new long[Timing.values().length];
+    for (Timing timing : ProcessingDetails.Timing.values()) {
+      final int defaultValue;
+      switch (timing) {
+      case LOCKFREE:
+      case RESPONSE:
+      case HANDLER:
+        defaultValue = DEFAULT_LOCKFREE_WEIGHT;
+        break;
+      case LOCKSHARED:
+        defaultValue = DEFAULT_LOCKSHARED_WEIGHT;
+        break;
+      case LOCKEXCLUSIVE:
+        defaultValue = DEFAULT_LOCKEXCLUSIVE_WEIGHT;
+        break;
+      default:
+        // by default don't bill for queueing or lock wait time
+        defaultValue = 0;
+      }
+      String key = namespace + WEIGHT_CONFIG_PREFIX
+          + timing.name().toLowerCase(Locale.ENGLISH);
+      weights[timing.ordinal()] = conf.getInt(key, defaultValue);
+    }
+  }
+
+  /**
+   * Calculates a weighted sum of the times stored on the provided processing
+   * details to be used as the cost in {@link DecayRpcScheduler}.
+   *
+   * @param details Processing details
+   * @return The weighted sum of the times. The returned unit is the same
+   *         as the default unit used by the provided processing details.
+   */
+  @Override
+  public long getCost(ProcessingDetails details) {
+    assert weights != null : "Cost provider must be initialized before use";
+    long cost = 0;
+    // weights was initialized to the same length as Timing.values()
+    for (int i = 0; i < Timing.values().length; i++) {
+      cost += details.get(Timing.values()[i]) * weights[i];
+    }
+    return cost;
+  }
+}

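To make the weighted sum concrete, the hedged snippet below exercises WeightedTimeCostProvider with its default weights. The "ipc.8020" namespace and the millisecond values are assumptions chosen for the example; the ProcessingDetails calls mirror the ones used in the tests later in this patch:

    package org.apache.hadoop.ipc;

    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.conf.Configuration;

    public class WeightedCostExample {
      public static void main(String[] args) {
        WeightedTimeCostProvider provider = new WeightedTimeCostProvider();
        provider.init("ipc.8020", new Configuration()); // default weights

        ProcessingDetails details = new ProcessingDetails(TimeUnit.MILLISECONDS);
        details.set(ProcessingDetails.Timing.LOCKFREE, 1);      // weight 1
        details.set(ProcessingDetails.Timing.LOCKSHARED, 2);    // weight 10
        details.set(ProcessingDetails.Timing.LOCKEXCLUSIVE, 3); // weight 100

        // 1*1 + 2*10 + 3*100 = 321; queue and lock-wait time default to weight 0
        System.out.println(provider.getCost(details));
      }
    }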
+ 19 - 0
hadoop-common-project/hadoop-common/src/site/markdown/FairCallQueue.md

@@ -91,6 +91,21 @@ This is configurable via the **identity provider**, which defaults to the **User
 provider simply uses the username of the client submitting the request. However, a custom identity provider can be used
 to perform throttling based on other groupings, or to use an external identity provider.
 
+### Cost-based Fair Call Queue
+
+Though the fair call queue itself does a good job of mitigating the impact from users who submit a very high _number_
+of requests, it does not take into account how expensive each request is to process. Thus, when considering the
+HDFS NameNode, a user who submits 1000 "getFileInfo" requests would be prioritized the same as a user who submits 1000
+"listStatus" requests on some very large directory, or a user who submits 1000 "mkdir" requests, which are more
+expensive as they require an exclusive lock on the namesystem. To account for the _cost_ of an operation when
+considering the prioritization of user requests, there is a "cost-based" extension to the Fair Call Queue which uses
+the aggregate processing time of a user's operations to determine how that user should be prioritized. By default,
+queue time (time spent waiting to be processed) and lock wait time (time spent waiting to acquire a lock) are not
+considered in the cost; time spent processing without a lock is neutrally (1x) weighted, time spent processing with a
+shared lock is weighted 10x higher, and time spent processing with an exclusive lock is weighted 100x higher.
+This attempts to prioritize users based on the actual load they place on the server. To enable this feature, set the
+`cost-provider.impl` configuration to `org.apache.hadoop.ipc.WeightedTimeCostProvider` as described below.
+
 Configuration
 -------------
 
@@ -115,12 +130,16 @@ omitted.
 | scheduler.priority.levels | RpcScheduler, CallQueue | How many priority levels to use within the scheduler and call queue. | 4 |
 | faircallqueue.multiplexer.weights | WeightedRoundRobinMultiplexer | How much weight to give to each priority queue. This should be a comma-separated list of length equal to the number of priority levels. | Weights descend by a factor of 2 (e.g., for 4 levels: `8,4,2,1`) |
 | identity-provider.impl | DecayRpcScheduler | The identity provider mapping user requests to their identity. | org.apache.hadoop.ipc.UserIdentityProvider |
+| cost-provider.impl | DecayRpcScheduler | The cost provider mapping user requests to their cost. To enable determination of cost based on processing time, use `org.apache.hadoop.ipc.WeightedTimeCostProvider`. | org.apache.hadoop.ipc.DefaultCostProvider |
 | decay-scheduler.period-ms | DecayRpcScheduler | How frequently the decay factor should be applied to the operation counts of users. Higher values have less overhead, but respond less quickly to changes in client behavior. | 5000 |
 | decay-scheduler.decay-factor | DecayRpcScheduler | When decaying the operation counts of users, the multiplicative decay factor to apply. Higher values will weight older operations more strongly, essentially giving the scheduler a longer memory, and penalizing heavy clients for a longer period of time. | 0.5 |
 | decay-scheduler.thresholds | DecayRpcScheduler | The client load threshold, as an integer percentage, for each priority queue. Clients producing less load, as a percent of total operations, than specified at position _i_ will be given priority _i_. This should be a comma-separated list of length equal to the number of priority levels minus 1 (the last is implicitly 100). | Thresholds ascend by a factor of 2 (e.g., for 4 levels: `13,25,50`) |
 | decay-scheduler.backoff.responsetime.enable | DecayRpcScheduler | Whether or not to enable the backoff by response time feature. | false |
 | decay-scheduler.backoff.responsetime.thresholds | DecayRpcScheduler | The response time thresholds, as time durations, for each priority queue. If the average response time for a queue is above this threshold, backoff will occur in lower priority queues. This should be a comma-separated list of length equal to the number of priority levels. | Threshold increases by 10s per level (e.g., for 4 levels: `10s,20s,30s,40s`) |
 | decay-scheduler.metrics.top.user.count | DecayRpcScheduler | The number of top (i.e., heaviest) users to emit metric information about. | 10 |
+| weighted-cost.lockshared | WeightedTimeCostProvider | The weight multiplier to apply to the time spent in the processing phase which holds a shared (read) lock. | 10 |
+| weighted-cost.lockexclusive | WeightedTimeCostProvider | The weight multiplier to apply to the time spent in the processing phase which holds an exclusive (write) lock. | 100 |
+| weighted-cost.{handler,lockfree,response} | WeightedTimeCostProvider | The weight multiplier to apply to the time spent in the processing phases which do not involve holding a lock. See `org.apache.hadoop.ipc.ProcessingDetails.Timing` for more details on each phase. | 1 |
 
 ### Example Configuration
 

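The table above adds the `cost-provider.impl` and `weighted-cost.*` keys. As a hedged sketch of how they compose with an IPC namespace, the snippet below builds a scheduler programmatically, much like the test helper later in this patch; the "ipc.8020" namespace and the 1000x override are illustrative values, not recommendations:

    package org.apache.hadoop.ipc;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonConfigurationKeys;

    public class CostBasedSchedulerWiring {
      public static DecayRpcScheduler build() {
        Configuration conf = new Configuration();
        String ns = "ipc.8020";
        // Select the cost provider (DefaultCostProvider is used when unset).
        conf.setClass(ns + "." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY,
            WeightedTimeCostProvider.class, CostProvider.class);
        // Optionally re-weight exclusive-lock time (the default weight is 100).
        conf.setInt(ns + WeightedTimeCostProvider.WEIGHT_CONFIG_PREFIX
            + "lockexclusive", 1000);
        return new DecayRpcScheduler(4, ns, conf);
      }
    }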
+ 140 - 34
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java

@@ -26,6 +26,7 @@ import static org.junit.Assert.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.conf.Configuration;
@@ -36,6 +37,7 @@ import javax.management.ObjectName;
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
 import java.lang.management.ManagementFactory;
+import java.util.concurrent.TimeUnit;
 
 public class TestDecayRpcScheduler {
   private Schedulable mockCall(String id) {
@@ -131,67 +133,69 @@ public class TestDecayRpcScheduler {
     conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush
     conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "99999999"); // Never flush
     scheduler = new DecayRpcScheduler(1, "ns", conf);
     scheduler = new DecayRpcScheduler(1, "ns", conf);
 
 
-    assertEquals(0, scheduler.getCallCountSnapshot().size()); // empty first
+    assertEquals(0, scheduler.getCallCostSnapshot().size()); // empty first
 
 
-    scheduler.getPriorityLevel(mockCall("A"));
-    assertEquals(1, scheduler.getCallCountSnapshot().get("A").longValue());
-    assertEquals(1, scheduler.getCallCountSnapshot().get("A").longValue());
+    getPriorityIncrementCallCount("A");
+    assertEquals(1, scheduler.getCallCostSnapshot().get("A").longValue());
+    assertEquals(1, scheduler.getCallCostSnapshot().get("A").longValue());
 
-    scheduler.getPriorityLevel(mockCall("A"));
-    scheduler.getPriorityLevel(mockCall("B"));
-    scheduler.getPriorityLevel(mockCall("A"));
+    getPriorityIncrementCallCount("A");
+    getPriorityIncrementCallCount("B");
+    getPriorityIncrementCallCount("A");
 
-    assertEquals(3, scheduler.getCallCountSnapshot().get("A").longValue());
-    assertEquals(1, scheduler.getCallCountSnapshot().get("B").longValue());
+    assertEquals(3, scheduler.getCallCostSnapshot().get("A").longValue());
+    assertEquals(1, scheduler.getCallCostSnapshot().get("B").longValue());
   }
 
   @Test
   @SuppressWarnings("deprecation")
   public void testDecay() throws Exception {
     Configuration conf = new Configuration();
-    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_PERIOD_KEY, "999999999"); // Never
-    conf.set("ns." + DecayRpcScheduler.IPC_FCQ_DECAYSCHEDULER_FACTOR_KEY, "0.5");
+    conf.setLong("ns." // Never decay
+        + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY, 999999999);
+    conf.setDouble("ns."
+        + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_FACTOR_KEY, 0.5);
     scheduler = new DecayRpcScheduler(1, "ns", conf);
 
     assertEquals(0, scheduler.getTotalCallSnapshot());
 
     for (int i = 0; i < 4; i++) {
-      scheduler.getPriorityLevel(mockCall("A"));
+      getPriorityIncrementCallCount("A");
     }
 
     sleep(1000);
 
     for (int i = 0; i < 8; i++) {
-      scheduler.getPriorityLevel(mockCall("B"));
+      getPriorityIncrementCallCount("B");
     }
 
     assertEquals(12, scheduler.getTotalCallSnapshot());
-    assertEquals(4, scheduler.getCallCountSnapshot().get("A").longValue());
-    assertEquals(8, scheduler.getCallCountSnapshot().get("B").longValue());
+    assertEquals(4, scheduler.getCallCostSnapshot().get("A").longValue());
+    assertEquals(8, scheduler.getCallCostSnapshot().get("B").longValue());
 
     scheduler.forceDecay();
 
     assertEquals(6, scheduler.getTotalCallSnapshot());
-    assertEquals(2, scheduler.getCallCountSnapshot().get("A").longValue());
-    assertEquals(4, scheduler.getCallCountSnapshot().get("B").longValue());
+    assertEquals(2, scheduler.getCallCostSnapshot().get("A").longValue());
+    assertEquals(4, scheduler.getCallCostSnapshot().get("B").longValue());
 
     scheduler.forceDecay();
 
     assertEquals(3, scheduler.getTotalCallSnapshot());
-    assertEquals(1, scheduler.getCallCountSnapshot().get("A").longValue());
-    assertEquals(2, scheduler.getCallCountSnapshot().get("B").longValue());
+    assertEquals(1, scheduler.getCallCostSnapshot().get("A").longValue());
+    assertEquals(2, scheduler.getCallCostSnapshot().get("B").longValue());
 
     scheduler.forceDecay();
 
     assertEquals(1, scheduler.getTotalCallSnapshot());
-    assertEquals(null, scheduler.getCallCountSnapshot().get("A"));
-    assertEquals(1, scheduler.getCallCountSnapshot().get("B").longValue());
+    assertEquals(null, scheduler.getCallCostSnapshot().get("A"));
+    assertEquals(1, scheduler.getCallCostSnapshot().get("B").longValue());
 
     scheduler.forceDecay();
 
     assertEquals(0, scheduler.getTotalCallSnapshot());
-    assertEquals(null, scheduler.getCallCountSnapshot().get("A"));
-    assertEquals(null, scheduler.getCallCountSnapshot().get("B"));
+    assertEquals(null, scheduler.getCallCostSnapshot().get("A"));
+    assertEquals(null, scheduler.getCallCostSnapshot().get("B"));
   }
 
   @Test
@@ -205,16 +209,16 @@ public class TestDecayRpcScheduler {
         .IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY, "25, 50, 75");
     scheduler = new DecayRpcScheduler(4, namespace, conf);
 
-    assertEquals(0, scheduler.getPriorityLevel(mockCall("A")));
-    assertEquals(2, scheduler.getPriorityLevel(mockCall("A")));
-    assertEquals(0, scheduler.getPriorityLevel(mockCall("B")));
-    assertEquals(1, scheduler.getPriorityLevel(mockCall("B")));
-    assertEquals(0, scheduler.getPriorityLevel(mockCall("C")));
-    assertEquals(0, scheduler.getPriorityLevel(mockCall("C")));
-    assertEquals(1, scheduler.getPriorityLevel(mockCall("A")));
-    assertEquals(1, scheduler.getPriorityLevel(mockCall("A")));
-    assertEquals(1, scheduler.getPriorityLevel(mockCall("A")));
-    assertEquals(2, scheduler.getPriorityLevel(mockCall("A")));
+    assertEquals(0, getPriorityIncrementCallCount("A")); // 0 out of 0 calls
+    assertEquals(3, getPriorityIncrementCallCount("A")); // 1 out of 1 calls
+    assertEquals(0, getPriorityIncrementCallCount("B")); // 0 out of 2 calls
+    assertEquals(1, getPriorityIncrementCallCount("B")); // 1 out of 3 calls
+    assertEquals(0, getPriorityIncrementCallCount("C")); // 0 out of 4 calls
+    assertEquals(0, getPriorityIncrementCallCount("C")); // 1 out of 5 calls
+    assertEquals(1, getPriorityIncrementCallCount("A")); // 2 out of 6 calls
+    assertEquals(1, getPriorityIncrementCallCount("A")); // 3 out of 7 calls
+    assertEquals(2, getPriorityIncrementCallCount("A")); // 4 out of 8 calls
+    assertEquals(2, getPriorityIncrementCallCount("A")); // 5 out of 9 calls
 
     MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
     ObjectName mxbeanName = new ObjectName(
@@ -243,7 +247,7 @@ public class TestDecayRpcScheduler {
     assertEquals(0, scheduler.getTotalCallSnapshot());
 
     for (int i = 0; i < 64; i++) {
-      scheduler.getPriorityLevel(mockCall("A"));
+      getPriorityIncrementCallCount("A");
     }
 
     // It should eventually decay to zero
@@ -272,6 +276,108 @@ public class TestDecayRpcScheduler {
       //set systout back
       System.setOut(output);
     }
+  }
+
+  @Test
+  public void testUsingWeightedTimeCostProvider() {
+    scheduler = getSchedulerWithWeightedTimeCostProvider(3);
+
+    // 3 details in increasing order of cost. Although medium has a longer
+    // duration, the shared lock is weighted less than the exclusive lock
+    ProcessingDetails callDetailsLow =
+        new ProcessingDetails(TimeUnit.MILLISECONDS);
+    callDetailsLow.set(ProcessingDetails.Timing.LOCKFREE, 1);
+    ProcessingDetails callDetailsMedium =
+        new ProcessingDetails(TimeUnit.MILLISECONDS);
+    callDetailsMedium.set(ProcessingDetails.Timing.LOCKSHARED, 500);
+    ProcessingDetails callDetailsHigh =
+        new ProcessingDetails(TimeUnit.MILLISECONDS);
+    callDetailsHigh.set(ProcessingDetails.Timing.LOCKEXCLUSIVE, 100);
+
+    for (int i = 0; i < 10; i++) {
+      scheduler.addResponseTime("ignored", mockCall("LOW"), callDetailsLow);
+    }
+    scheduler.addResponseTime("ignored", mockCall("MED"), callDetailsMedium);
+    scheduler.addResponseTime("ignored", mockCall("HIGH"), callDetailsHigh);
+
+    assertEquals(0, scheduler.getPriorityLevel(mockCall("LOW")));
+    assertEquals(1, scheduler.getPriorityLevel(mockCall("MED")));
+    assertEquals(2, scheduler.getPriorityLevel(mockCall("HIGH")));
+
+    assertEquals(3, scheduler.getUniqueIdentityCount());
+    long totalCallInitial = scheduler.getTotalRawCallVolume();
+    assertEquals(totalCallInitial, scheduler.getTotalCallVolume());
+
+    scheduler.forceDecay();
+
+    // Relative priorities should stay the same after a single decay
+    assertEquals(0, scheduler.getPriorityLevel(mockCall("LOW")));
+    assertEquals(1, scheduler.getPriorityLevel(mockCall("MED")));
+    assertEquals(2, scheduler.getPriorityLevel(mockCall("HIGH")));
+
+    assertEquals(3, scheduler.getUniqueIdentityCount());
+    assertEquals(totalCallInitial, scheduler.getTotalRawCallVolume());
+    assertTrue(scheduler.getTotalCallVolume() < totalCallInitial);
+
+    for (int i = 0; i < 100; i++) {
+      scheduler.forceDecay();
+    }
+    // After enough decay cycles, all callers should be high priority again
+    assertEquals(0, scheduler.getPriorityLevel(mockCall("LOW")));
+    assertEquals(0, scheduler.getPriorityLevel(mockCall("MED")));
+    assertEquals(0, scheduler.getPriorityLevel(mockCall("HIGH")));
+  }
+
+  @Test
+  public void testUsingWeightedTimeCostProviderWithZeroCostCalls() {
+    scheduler = getSchedulerWithWeightedTimeCostProvider(2);
+
+    ProcessingDetails emptyDetails =
+        new ProcessingDetails(TimeUnit.MILLISECONDS);
+
+    for (int i = 0; i < 1000; i++) {
+      scheduler.addResponseTime("ignored", mockCall("MANY"), emptyDetails);
+    }
+    scheduler.addResponseTime("ignored", mockCall("FEW"), emptyDetails);
+
+    // Since the calls are all "free", they should have the same priority
+    assertEquals(0, scheduler.getPriorityLevel(mockCall("MANY")));
+    assertEquals(0, scheduler.getPriorityLevel(mockCall("FEW")));
+  }
+
+  @Test
+  public void testUsingWeightedTimeCostProviderNoRequests() {
+    scheduler = getSchedulerWithWeightedTimeCostProvider(2);
+
+    assertEquals(0, scheduler.getPriorityLevel(mockCall("A")));
+  }
+
+  /**
+   * Get a scheduler that uses {@link WeightedTimeCostProvider} and has
+   * normal decaying disabled.
+   */
+  private static DecayRpcScheduler getSchedulerWithWeightedTimeCostProvider(
+      int priorityLevels) {
+    Configuration conf = new Configuration();
+    conf.setClass("ns." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY,
+        WeightedTimeCostProvider.class, CostProvider.class);
+    conf.setLong("ns."
+        + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY, 999999);
+    return new DecayRpcScheduler(priorityLevels, "ns", conf);
+  }
 
+  /**
+   * Get the priority and increment the call count, assuming that
+   * {@link DefaultCostProvider} is in use.
+   */
+  private int getPriorityIncrementCallCount(String callId) {
+    Schedulable mockCall = mockCall(callId);
+    int priority = scheduler.getPriorityLevel(mockCall);
+    // The DefaultCostProvider uses a cost of 1 for all calls, ignoring
+    // the processing details, so an empty one is fine
+    ProcessingDetails emptyProcessingDetails =
+        new ProcessingDetails(TimeUnit.MILLISECONDS);
+    scheduler.addResponseTime("ignored", mockCall, emptyProcessingDetails);
+    return priority;
   }
   }
 }
+ 57 - 45
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.ipc;
 
-import com.google.common.base.Supplier;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
@@ -1195,15 +1194,6 @@ public class TestRPC extends TestRpcBase {
     Exception lastException = null;
     proxy = getClient(addr, conf);
 
-    MetricsRecordBuilder rb1 =
-        getMetrics("DecayRpcSchedulerMetrics2." + ns);
-    final long beginDecayedCallVolume = MetricsAsserts.getLongCounter(
-        "DecayedCallVolume", rb1);
-    final long beginRawCallVolume = MetricsAsserts.getLongCounter(
-        "CallVolume", rb1);
-    final int beginUniqueCaller = MetricsAsserts.getIntCounter("UniqueCallers",
-        rb1);
-
     try {
     try {
       // start a sleep RPC call that sleeps 3s.
       for (int i = 0; i < numClients; i++) {
         } else {
         } else {
           lastException = unwrapExeption;
         }
-        // Lets Metric system update latest metrics
-        GenericTestUtils.waitFor(new Supplier<Boolean>() {
-          @Override
-          public Boolean get() {
-            MetricsRecordBuilder rb2 =
-              getMetrics("DecayRpcSchedulerMetrics2." + ns);
-            long decayedCallVolume1 = MetricsAsserts.getLongCounter(
-                "DecayedCallVolume", rb2);
-            long rawCallVolume1 = MetricsAsserts.getLongCounter(
-                "CallVolume", rb2);
-            int uniqueCaller1 = MetricsAsserts.getIntCounter(
-                "UniqueCallers", rb2);
-            long callVolumePriority0 = MetricsAsserts.getLongGauge(
-                "Priority.0.CompletedCallVolume", rb2);
-            long callVolumePriority1 = MetricsAsserts.getLongGauge(
-                "Priority.1.CompletedCallVolume", rb2);
-            double avgRespTimePriority0 = MetricsAsserts.getDoubleGauge(
-                "Priority.0.AvgResponseTime", rb2);
-            double avgRespTimePriority1 = MetricsAsserts.getDoubleGauge(
-                "Priority.1.AvgResponseTime", rb2);
-
-            LOG.info("DecayedCallVolume: " + decayedCallVolume1);
-            LOG.info("CallVolume: " + rawCallVolume1);
-            LOG.info("UniqueCaller: " + uniqueCaller1);
-            LOG.info("Priority.0.CompletedCallVolume: " + callVolumePriority0);
-            LOG.info("Priority.1.CompletedCallVolume: " + callVolumePriority1);
-            LOG.info("Priority.0.AvgResponseTime: " + avgRespTimePriority0);
-            LOG.info("Priority.1.AvgResponseTime: " + avgRespTimePriority1);
-
-            return decayedCallVolume1 > beginDecayedCallVolume &&
-                rawCallVolume1 > beginRawCallVolume &&
-                uniqueCaller1 > beginUniqueCaller;
-          }
-        }, 30, 60000);
       }
     } finally {
       executorService.shutdown();
@@ -1277,6 +1232,63 @@ public class TestRPC extends TestRpcBase {
     assertTrue("RetriableException not received", succeeded);
   }

+  /** Test that the metrics for DecayRpcScheduler are updated. */
+  @Test (timeout=30000)
+  public void testDecayRpcSchedulerMetrics() throws Exception {
+    final String ns = CommonConfigurationKeys.IPC_NAMESPACE + ".0";
+    Server server = setupDecayRpcSchedulerandTestServer(ns + ".");
+
+    MetricsRecordBuilder rb1 =
+        getMetrics("DecayRpcSchedulerMetrics2." + ns);
+    final long beginDecayedCallVolume = MetricsAsserts.getLongCounter(
+        "DecayedCallVolume", rb1);
+    final long beginRawCallVolume = MetricsAsserts.getLongCounter(
+        "CallVolume", rb1);
+    final int beginUniqueCaller = MetricsAsserts.getIntCounter("UniqueCallers",
+        rb1);
+
+    TestRpcService proxy = getClient(addr, conf);
+    try {
+      for (int i = 0; i < 2; i++) {
+        proxy.sleep(null, newSleepRequest(100));
+      }
+
+      // Let the metrics system update to the latest values
+      GenericTestUtils.waitFor(() -> {
+        MetricsRecordBuilder rb2 =
+            getMetrics("DecayRpcSchedulerMetrics2." + ns);
+        long decayedCallVolume1 = MetricsAsserts.getLongCounter(
+            "DecayedCallVolume", rb2);
+        long rawCallVolume1 = MetricsAsserts.getLongCounter(
+            "CallVolume", rb2);
+        int uniqueCaller1 = MetricsAsserts.getIntCounter(
+            "UniqueCallers", rb2);
+        long callVolumePriority0 = MetricsAsserts.getLongGauge(
+            "Priority.0.CompletedCallVolume", rb2);
+        long callVolumePriority1 = MetricsAsserts.getLongGauge(
+            "Priority.1.CompletedCallVolume", rb2);
+        double avgRespTimePriority0 = MetricsAsserts.getDoubleGauge(
+            "Priority.0.AvgResponseTime", rb2);
+        double avgRespTimePriority1 = MetricsAsserts.getDoubleGauge(
+            "Priority.1.AvgResponseTime", rb2);
+
+        LOG.info("DecayedCallVolume: {}", decayedCallVolume1);
+        LOG.info("CallVolume: {}", rawCallVolume1);
+        LOG.info("UniqueCaller: {}", uniqueCaller1);
+        LOG.info("Priority.0.CompletedCallVolume: {}", callVolumePriority0);
+        LOG.info("Priority.1.CompletedCallVolume: {}", callVolumePriority1);
+        LOG.info("Priority.0.AvgResponseTime: {}", avgRespTimePriority0);
+        LOG.info("Priority.1.AvgResponseTime: {}", avgRespTimePriority1);
+
+        return decayedCallVolume1 > beginDecayedCallVolume &&
+            rawCallVolume1 > beginRawCallVolume &&
+            uniqueCaller1 > beginUniqueCaller;
+      }, 30, 60000);
+    } finally {
+      stop(server, proxy);
+    }
+  }
+
   private Server setupDecayRpcSchedulerandTestServer(String ns)
       throws Exception {
     final int queueSizePerHandler = 3;

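The rewritten testDecayRpcSchedulerMetrics above captures a metrics baseline before issuing the sleep calls and then polls until the DecayRpcScheduler counters move past it. A stripped-down sketch of that polling pattern, reusing the MetricsAsserts and GenericTestUtils helpers the test already imports (the "ns" and metrics-source names are taken from the test; this is illustration only, not part of the commit):

// Sketch: wait until one DecayRpcScheduler counter grows past its baseline.
// Must run inside a test method declared "throws Exception", because
// GenericTestUtils.waitFor can throw TimeoutException/InterruptedException.
final String source = "DecayRpcSchedulerMetrics2." + ns;
final long baseline =
    MetricsAsserts.getLongCounter("CallVolume", getMetrics(source));
GenericTestUtils.waitFor(
    () -> MetricsAsserts.getLongCounter("CallVolume", getMetrics(source))
        > baseline,
    30, 60000); // re-check every 30 ms, give up after 60 seconds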
+ 86 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestWeightedTimeCostProvider.java

@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ipc;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.ProcessingDetails.Timing;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.hadoop.ipc.WeightedTimeCostProvider.DEFAULT_LOCKEXCLUSIVE_WEIGHT;
+import static org.apache.hadoop.ipc.WeightedTimeCostProvider.DEFAULT_LOCKFREE_WEIGHT;
+import static org.apache.hadoop.ipc.WeightedTimeCostProvider.DEFAULT_LOCKSHARED_WEIGHT;
+import static org.junit.Assert.assertEquals;
+
+/** Tests for {@link WeightedTimeCostProvider}. */
+public class TestWeightedTimeCostProvider {
+
+  private static final int QUEUE_TIME = 3;
+  private static final int LOCKFREE_TIME = 5;
+  private static final int LOCKSHARED_TIME = 7;
+  private static final int LOCKEXCLUSIVE_TIME = 11;
+
+  private WeightedTimeCostProvider costProvider;
+  private ProcessingDetails processingDetails;
+
+  @Before
+  public void setup() {
+    costProvider = new WeightedTimeCostProvider();
+    processingDetails = new ProcessingDetails(TimeUnit.MILLISECONDS);
+    processingDetails.set(Timing.QUEUE, QUEUE_TIME);
+    processingDetails.set(Timing.LOCKFREE, LOCKFREE_TIME);
+    processingDetails.set(Timing.LOCKSHARED, LOCKSHARED_TIME);
+    processingDetails.set(Timing.LOCKEXCLUSIVE, LOCKEXCLUSIVE_TIME);
+  }
+
+  @Test(expected = AssertionError.class)
+  public void testGetCostBeforeInit() {
+    costProvider.getCost(null);
+  }
+
+  @Test
+  public void testGetCostDefaultWeights() {
+    costProvider.init("foo", new Configuration());
+    long actualCost = costProvider.getCost(processingDetails);
+    long expectedCost = DEFAULT_LOCKFREE_WEIGHT * LOCKFREE_TIME
+        + DEFAULT_LOCKSHARED_WEIGHT * LOCKSHARED_TIME
+        + DEFAULT_LOCKEXCLUSIVE_WEIGHT * LOCKEXCLUSIVE_TIME;
+    assertEquals(expectedCost, actualCost);
+  }
+
+  @Test
+  public void testGetCostConfiguredWeights() {
+    Configuration conf = new Configuration();
+    int queueWeight = 1000;
+    int lockfreeWeight = 10000;
+    int locksharedWeight = 100000;
+    conf.setInt("foo.weighted-cost.queue", queueWeight);
+    conf.setInt("foo.weighted-cost.lockfree", lockfreeWeight);
+    conf.setInt("foo.weighted-cost.lockshared", locksharedWeight);
+    conf.setInt("bar.weighted-cost.lockexclusive", 0); // should not apply
+    costProvider.init("foo", conf);
+    long actualCost = costProvider.getCost(processingDetails);
+    long expectedCost = queueWeight * QUEUE_TIME
+        + lockfreeWeight * LOCKFREE_TIME
+        + locksharedWeight * LOCKSHARED_TIME
+        + DEFAULT_LOCKEXCLUSIVE_WEIGHT * LOCKEXCLUSIVE_TIME;
+    assertEquals(expectedCost, actualCost);
+  }
+}
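TestWeightedTimeCostProvider exercises the weighted-sum cost (each timing multiplied by its weight), with per-namespace overrides read from weighted-cost.* keys. As a hedged configuration sketch, not part of this commit: the "ipc.8020" namespace and the weight values below are assumptions for illustration, while the key suffixes mirror the ones exercised by the test above.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.CostProvider;
import org.apache.hadoop.ipc.DecayRpcScheduler;
import org.apache.hadoop.ipc.RpcScheduler;
import org.apache.hadoop.ipc.WeightedTimeCostProvider;

/** Sketch of a configuration that charges exclusive-lock time most heavily. */
public class WeightedCostConfigSketch {
  public static Configuration build() {
    // "ipc.8020" is an assumed namespace for a server listening on port 8020.
    String ns = "ipc.8020";
    Configuration conf = new Configuration();
    // Use the decay scheduler together with the weighted-time cost provider.
    conf.setClass(ns + "." + CommonConfigurationKeys.IPC_SCHEDULER_IMPL_KEY,
        DecayRpcScheduler.class, RpcScheduler.class);
    conf.setClass(ns + "." + CommonConfigurationKeys.IPC_COST_PROVIDER_KEY,
        WeightedTimeCostProvider.class, CostProvider.class);
    // Illustrative weights: exclusive-lock time costs far more than
    // shared-lock or lock-free time.
    conf.setInt(ns + ".weighted-cost.lockexclusive", 1000);
    conf.setInt(ns + ".weighted-cost.lockshared", 10);
    conf.setInt(ns + ".weighted-cost.lockfree", 1);
    return conf;
  }
}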