
HADOOP-17165. Implement service-user feature in DecayRPCScheduler. (#2240)

Takanobu Asanuma, 4 years ago
parent commit e5fe326270

+ 36 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java

@@ -20,11 +20,14 @@ package org.apache.hadoop.ipc;
 
 import java.lang.ref.WeakReference;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
@@ -108,6 +111,13 @@ public class DecayRpcScheduler implements RpcScheduler,
   public static final String IPC_FCQ_DECAYSCHEDULER_THRESHOLDS_KEY =
       "faircallqueue.decay-scheduler.thresholds";
 
+  /**
+   *  Service users will always be scheduled into the highest-priority queue.
+   *  They are specified as a comma-separated list.
+   */
+  public static final String IPC_DECAYSCHEDULER_SERVICE_USERS_KEY =
+      "decay-scheduler.service-users";
+
   // Specifies the identity to use when the IdentityProvider cannot handle
   // a schedulable.
   public static final String DECAYSCHEDULER_UNKNOWN_IDENTITY =
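For orientation: parseServiceUserNames (added further down in this file) resolves this key under the scheduler's namespace, i.e. ipc.<port>. A minimal sketch of that composition, borrowing the "ipc.19" namespace used by the unit test at the end of this change:

    // Illustration only: how the fully qualified property name is built.
    String ns = "ipc.19";  // scheduler namespace, "ipc." + server port
    String key = ns + "." + DecayRpcScheduler.IPC_DECAYSCHEDULER_SERVICE_USERS_KEY;
    // key is now "ipc.19.decay-scheduler.service-users"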
@@ -178,6 +188,7 @@ public class DecayRpcScheduler implements RpcScheduler,
   private static final double PRECISION = 0.0001;
   private MetricsProxy metricsProxy;
   private final CostProvider costProvider;
+  private Set<String> serviceUserNames;
 
   /**
    * This TimerTask will call decayCurrentCosts until
@@ -229,6 +240,7 @@ public class DecayRpcScheduler implements RpcScheduler,
         conf);
     this.backOffResponseTimeThresholds =
         parseBackOffResponseTimeThreshold(ns, conf, numLevels);
+    this.serviceUserNames = this.parseServiceUserNames(ns, conf);
 
     // Setup response time metrics
     responseTimeTotalInCurrWindow = new AtomicLongArray(numLevels);
@@ -359,6 +371,12 @@ public class DecayRpcScheduler implements RpcScheduler,
     return decimals;
   }
 
+  private Set<String> parseServiceUserNames(String ns, Configuration conf) {
+    Collection<String> collection = conf.getStringCollection(
+        ns + "." + IPC_DECAYSCHEDULER_SERVICE_USERS_KEY);
+    return new HashSet<>(collection);
+  }
+
   /**
    * Generate default thresholds if user did not specify. Strategy is
    * to halve each time, since queue usage tends to be exponential.
@@ -486,7 +504,7 @@ public class DecayRpcScheduler implements RpcScheduler,
       AtomicLong value = entry.getValue().get(0);
 
       long snapshot = value.get();
-      int computedLevel = computePriorityLevel(snapshot);
+      int computedLevel = computePriorityLevel(snapshot, id);
 
       nextCache.put(id, computedLevel);
     }
@@ -534,9 +552,15 @@ public class DecayRpcScheduler implements RpcScheduler,
    * Given the cost for an identity, compute a scheduling decision.
    *
    * @param cost the cost for an identity
+   * @param identity the identity of the user
    * @return scheduling decision from 0 to numLevels - 1
    */
-  private int computePriorityLevel(long cost) {
+  private int computePriorityLevel(long cost, Object identity) {
+    // The priority for service users is always 0
+    if (isServiceUser((String)identity)) {
+      return 0;
+    }
+
     long totalCallSnapshot = totalDecayedCallCost.get();
 
     double proportion = 0;
@@ -576,7 +600,7 @@ public class DecayRpcScheduler implements RpcScheduler,
     // Cache was no good, compute it
     List<AtomicLong> costList = callCosts.get(identity);
     long currentCost = costList == null ? 0 : costList.get(0).get();
-    int priority = computePriorityLevel(currentCost);
+    int priority = computePriorityLevel(currentCost, identity);
     LOG.debug("compute priority for {} priority {}", identity, priority);
     return priority;
   }
@@ -598,6 +622,10 @@ public class DecayRpcScheduler implements RpcScheduler,
     return cachedOrComputedPriorityLevel(identity);
   }
 
+  private boolean isServiceUser(String userName) {
+    return this.serviceUserNames.contains(userName);
+  }
+
   @Override
   public boolean shouldBackOff(Schedulable obj) {
     Boolean backOff = false;
@@ -698,6 +726,11 @@ public class DecayRpcScheduler implements RpcScheduler,
     return thresholds;
   }
 
+  @VisibleForTesting
+  Set<String> getServiceUserNames() {
+    return serviceUserNames;
+  }
+
   @VisibleForTesting
   void forceDecay() {
     decayCurrentCosts();

+ 8 - 0
hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

@@ -2584,6 +2584,14 @@
   </description>
 </property>
 
+<property>
+  <name>ipc.[port_number].decay-scheduler.service-users</name>
+  <value></value>
+  <description>Service users will always be scheduled into the highest-priority
+    queue. They are specified as a comma-separated list.
+  </description>
+</property>
+
 <property>
   <name>ipc.[port_number].weighted-cost.lockshared</name>
   <value>10</value>
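A rough sketch of supplying this property from code instead of core-site.xml; the port (8020) and the user names (hdfs, hbase) are placeholders chosen for illustration, not values from this change:

    Configuration conf = new Configuration();
    // ipc.<port>.decay-scheduler.service-users takes a comma-separated list of users
    // that should always land in the highest-priority queue.
    conf.set("ipc.8020.decay-scheduler.service-users", "hdfs,hbase");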

+ 4 - 0
hadoop-common-project/hadoop-common/src/site/markdown/FairCallQueue.md

@@ -91,6 +91,9 @@ This is configurable via the **identity provider**, which defaults to the **User
 provider simply uses the username of the client submitting the request. However, a custom identity provider can be used
 to perform throttling based on other groupings, or to use an external identity provider.
 
+If particular users submit important requests and you do not want to limit them, you can set them up as
+**service-users**. They are always scheduled into the highest-priority queue.
+
 ### Cost-based Fair Call Queue
 
 Though the fair call queue itself does a good job of mitigating the impact from users who submit a very high _number_
@@ -138,6 +141,7 @@ omitted.
 | decay-scheduler.backoff.responsetime.enable | DecayRpcScheduler | Whether or not to enable the backoff by response time feature. | false |
 | decay-scheduler.backoff.responsetime.thresholds | DecayRpcScheduler | The response time thresholds, as time durations, for each priority queue. If the average response time for a queue is above this threshold, backoff will occur in lower priority queues. This should be a comma-separated list of length equal to the number of priority levels. | Threshold increases by 10s per level (e.g., for 4 levels: `10s,20s,30s,40s`) |
 | decay-scheduler.metrics.top.user.count | DecayRpcScheduler | The number of top (i.e., heaviest) users to emit metric information about. | 10 |
+| decay-scheduler.service-users | DecayRpcScheduler | Service users will always be scheduled into the highest-priority queue. They are specified as a comma-separated list. |  |
 | weighted-cost.lockshared | WeightedTimeCostProvider | The weight multiplier to apply to the time spent in the processing phase which holds a shared (read) lock. | 10 |
 | weighted-cost.lockexclusive | WeightedTimeCostProvider | The weight multiplier to apply to the time spent in the processing phase which holds an exclusive (write) lock. | 100 |
 | weighted-cost.{handler,lockfree,response} | WeightedTimeCostProvider | The weight multiplier to apply to the time spent in the processing phases which do not involve holding a lock. See `org.apache.hadoop.ipc.ProcessingDetails.Timing` for more details on each phase. | 1 |
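To verify that a configured service user really stays pinned to priority 0, the scheduler's scheduling-decision summary can be inspected, as the new unit test below does. A condensed, hypothetical sketch (scheduler construction omitted; "hbase" is a placeholder user name):

    scheduler.forceDecay();  // the scheduler caches priorities when costs decay
    String summary = scheduler.getSchedulingDecisionSummary();
    Map<String, Object> decisions = (Map<String, Object>) JSON.parse(summary);
    // A service user's cached priority should be 0.
    assertEquals(0L, decisions.get("hbase"));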

+ 2 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java

@@ -172,6 +172,8 @@ public class TestCommonConfigurationFields extends TestConfigurationFieldsBase {
         "ipc.[port_number].decay-scheduler.backoff.responsetime.thresholds");
     xmlPropsToSkipCompare.add(
         "ipc.[port_number].decay-scheduler.metrics.top.user.count");
+    xmlPropsToSkipCompare.add(
+        "ipc.[port_number].decay-scheduler.service-users");
     xmlPropsToSkipCompare.add("ipc.[port_number].weighted-cost.lockshared");
     xmlPropsToSkipCompare.add("ipc.[port_number].weighted-cost.lockexclusive");
     xmlPropsToSkipCompare.add("ipc.[port_number].weighted-cost.handler");

+ 39 - 1
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestDecayRpcScheduler.java

@@ -20,9 +20,14 @@ package org.apache.hadoop.ipc;
 
 import static java.lang.Thread.sleep;
 
+import java.util.Map;
+import org.eclipse.jetty.util.ajax.JSON;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -383,4 +388,37 @@ public class TestDecayRpcScheduler {
     scheduler.addResponseTime("ignored", mockCall, emptyProcessingDetails);
     return priority;
   }
+
+  @Test
+  public void testServiceUsers() {
+    Configuration conf = new Configuration();
+    conf.setLong("ipc.19."
+        + DecayRpcScheduler.IPC_SCHEDULER_DECAYSCHEDULER_PERIOD_KEY, 999999);
+    conf.set("ipc.19." + DecayRpcScheduler.IPC_DECAYSCHEDULER_SERVICE_USERS_KEY,
+        "service1,service2");
+    scheduler = new DecayRpcScheduler(4, "ipc.19", conf);
+
+    assertTrue(scheduler.getServiceUserNames().contains("service1"));
+    assertTrue(scheduler.getServiceUserNames().contains("service2"));
+
+    for (int i = 0; i < 10; i++) {
+      getPriorityIncrementCallCount("user1");
+      getPriorityIncrementCallCount("service1");
+      getPriorityIncrementCallCount("service2");
+    }
+
+    assertNotEquals(0, scheduler.getPriorityLevel(mockCall("user1")));
+    // The priorities of service users should be always 0.
+    assertEquals(0, scheduler.getPriorityLevel(mockCall("service1")));
+    assertEquals(0, scheduler.getPriorityLevel(mockCall("service2")));
+
+    // DecayRpcScheduler caches priorities after decay
+    scheduler.forceDecay();
+    // Check priorities on cache
+    String summary = scheduler.getSchedulingDecisionSummary();
+    Map<String, Object> summaryMap = (Map<String, Object>) JSON.parse(summary);
+    assertNotEquals(0L, summaryMap.get("user1"));
+    assertEquals(0L, summaryMap.get("service1"));
+    assertEquals(0L, summaryMap.get("service2"));
+  }
 }