
HADOOP-10597. RPC Server signals backoff to clients when all request queues are full. (Contributed by Ming Ma)

(cherry picked from commit edbeefdb052bffd0a3ba2f00c8a5a376017fbf33)
Arpit Agarwal, 10 years ago
parent commit: c60cbb3396

+ 3 - 0
hadoop-common-project/hadoop-common/CHANGES.txt

@@ -9,6 +9,9 @@ Release 2.7.4 - UNRELEASED
   IMPROVEMENTS
     HADOOP-12259. Utility to Dynamic port allocation (brahmareddy via rkanter)
 
+    HADOOP-10597. RPC Server signals backoff to clients when all request
+    queues are full. (Ming Ma via Arpit Agarwal)
+
   OPTIMIZATIONS
 
   BUG FIXES

+ 2 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java

@@ -95,6 +95,8 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   public static final String IPC_CALLQUEUE_NAMESPACE = "ipc";
   public static final String IPC_CALLQUEUE_IMPL_KEY = "callqueue.impl";
   public static final String IPC_CALLQUEUE_IDENTITY_PROVIDER_KEY = "identity-provider.impl";
+  public static final String IPC_BACKOFF_ENABLE = "backoff.enable";
+  public static final boolean IPC_BACKOFF_ENABLE_DEFAULT = false;
 
   /** This is for specifying the implementation for the mappings from
    * hostnames to the racks they belong to
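
The new key is read per server port: the call-queue prefix is "ipc.<port>", and Server.getClientBackoffEnable() (further down in this change) appends "." + IPC_BACKOFF_ENABLE to it, so the effective key is "ipc.<port>.backoff.enable" (the test below uses port 0). A minimal sketch of enabling backoff for a server on port 8020 -- the port number is illustrative, not part of this patch:

    Configuration conf = new Configuration();
    // "ipc.<port>.backoff.enable"; defaults to IPC_BACKOFF_ENABLE_DEFAULT (false)
    conf.setBoolean("ipc.8020.backoff.enable", true);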

+ 18 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java

@@ -39,16 +39,19 @@ public class CallQueueManager<E> {
       Class<?> queneClass, Class<E> elementClass) {
     return (Class<? extends BlockingQueue<E>>)queneClass;
   }
-  
+  private final boolean clientBackOffEnabled;
+
   // Atomic refs point to active callQueue
   // We have two so we can better control swapping
   private final AtomicReference<BlockingQueue<E>> putRef;
   private final AtomicReference<BlockingQueue<E>> takeRef;
 
   public CallQueueManager(Class<? extends BlockingQueue<E>> backingClass,
-      int maxQueueSize, String namespace, Configuration conf) {
+      boolean clientBackOffEnabled, int maxQueueSize, String namespace,
+      Configuration conf) {
     BlockingQueue<E> bq = createCallQueueInstance(backingClass,
       maxQueueSize, namespace, conf);
+    this.clientBackOffEnabled = clientBackOffEnabled;
     this.putRef = new AtomicReference<BlockingQueue<E>>(bq);
     this.takeRef = new AtomicReference<BlockingQueue<E>>(bq);
     LOG.info("Using callQueue: " + backingClass + " queueCapacity: " +
@@ -100,6 +103,10 @@ public class CallQueueManager<E> {
       " could not be constructed.");
   }
 
+  boolean isClientBackoffEnabled() {
+    return clientBackOffEnabled;
+  }
+
   /**
    * Insert e into the backing queue or block until we can.
    * If we block and the queue changes on us, we will insert while the
@@ -109,6 +116,15 @@ public class CallQueueManager<E> {
     putRef.get().put(e);
   }
 
+  /**
+   * Insert e into the backing queue.
+   * Return true if e is queued.
+   * Return false if the queue is full.
+   */
+  public boolean offer(E e) throws InterruptedException {
+    return putRef.get().offer(e);
+  }
+
   /**
    * Retrieve an E from the backing queue or block until we can.
    * Guaranteed to return an element from the current queue.
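
The new offer() delegates to BlockingQueue.offer(), which returns immediately instead of blocking the way put() does when the queue is full. A standalone sketch of that contract using plain java.util.concurrent (not the Hadoop classes):

    import java.util.concurrent.LinkedBlockingQueue;

    public class OfferVsPut {
      public static void main(String[] args) throws InterruptedException {
        // A capacity-1 queue stands in for a full call queue.
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>(1);
        q.put("first");                     // fits; the queue is now full
        boolean queued = q.offer("second"); // returns false rather than blocking
        System.out.println("second queued? " + queued); // prints: second queued? false
      }
    }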

+ 34 - 2
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

@@ -499,6 +499,17 @@ public abstract class Server {
     callQueue.swapQueue(getQueueClass(prefix, conf), maxQueueSize, prefix, conf);
   }
 
+  /**
+   * Get from config if client backoff is enabled on that port.
+   */
+  static boolean getClientBackoffEnable(
+      String prefix, Configuration conf) {
+    String name = prefix + "." +
+        CommonConfigurationKeys.IPC_BACKOFF_ENABLE;
+    return conf.getBoolean(name,
+        CommonConfigurationKeys.IPC_BACKOFF_ENABLE_DEFAULT);
+  }
+
   /** A call queued for handling. */
   public static class Call implements Schedulable {
     private final int callId;             // the client's call id
@@ -1889,10 +1900,31 @@ public abstract class Server {
           rpcRequest, this, ProtoUtil.convert(header.getRpcKind()),
           header.getClientId().toByteArray(), traceSpan);
 
-      callQueue.put(call);              // queue the call; maybe blocked here
+      if (callQueue.isClientBackoffEnabled()) {
+        // if RPC queue is full, we will ask the RPC client to back off by
+        // throwing RetriableException. Whether RPC client will honor
+        // RetriableException and retry depends on client ipc retry policy.
+        // For example, FailoverOnNetworkExceptionRetry handles
+        // RetriableException.
+        queueRequestOrAskClientToBackOff(call);
+      } else {
+        callQueue.put(call);              // queue the call; maybe blocked here
+      }
       incRpcCount();  // Increment the rpc count
     }
 
+    private void queueRequestOrAskClientToBackOff(Call call)
+        throws WrappedRpcServerException, InterruptedException {
+      // If rpc queue is full, we will ask the client to back off.
+      boolean isCallQueued = callQueue.offer(call);
+      if (!isCallQueued) {
+        rpcMetrics.incrClientBackoff();
+        RetriableException retriableException =
+            new RetriableException("Server is too busy.");
+        throw new WrappedRpcServerException(
+            RpcErrorCodeProto.ERROR_RPC_SERVER, retriableException);
+      }
+    }
 
     /**
      * Establish RPC connection setup by negotiating SASL if required, then
@@ -2219,7 +2251,7 @@ public abstract class Server {
     // Setup appropriate callqueue
     final String prefix = getQueueClassPrefix();
     this.callQueue = new CallQueueManager<Call>(getQueueClass(prefix, conf),
-        maxQueueSize, prefix, conf);
+        getClientBackoffEnable(prefix, conf), maxQueueSize, prefix, conf);
 
     this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
     this.authorize = 
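
On the client side, the RetriableException arrives wrapped in a RemoteException unless the proxy's retry policy (e.g. FailoverOnNetworkExceptionRetry) already handles it. A hedged sketch of handling it manually, where "proxy" is any RPC proxy (such as the TestProtocol proxy in the test below) and the retry limits are illustrative; the snippet assumes the enclosing method declares IOException and InterruptedException:

    int attempts = 0;
    while (true) {
      try {
        proxy.sleep(100);                 // any RPC call on the proxy
        break;
      } catch (RemoteException re) {
        IOException cause = re.unwrapRemoteException();
        if (cause instanceof RetriableException && ++attempts < 5) {
          Thread.sleep(100L * attempts);  // simple linear backoff before retrying
        } else {
          throw re;                       // not a backoff signal, or retries exhausted
        }
      }
    }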

+ 10 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java

@@ -95,6 +95,8 @@ public class RpcMetrics {
   MutableCounterLong rpcAuthorizationFailures;
   @Metric("Number of authorization sucesses")
   MutableCounterLong rpcAuthorizationSuccesses;
+  @Metric("Number of client backoff requests")
+  MutableCounterLong rpcClientBackoff;
 
   @Metric("Number of open connections") public int numOpenConnections() {
     return server.getNumOpenConnections();
@@ -192,4 +194,12 @@ public class RpcMetrics {
       }
     }
   }
+
+  /**
+   * One client backoff event
+   */
+  //@Override
+  public void incrClientBackoff() {
+    rpcClientBackoff.incr();
+  }
 }
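
The new counter surfaces as "RpcClientBackoff" on the server's RPC metrics record. A sketch of asserting it in a test, assuming the usual MetricsAsserts helpers and a running Server instance named "server":

    MetricsRecordBuilder rb = MetricsAsserts.getMetrics(server.getRpcMetrics().name());
    // One backoff rejection should have bumped the counter.
    MetricsAsserts.assertCounter("RpcClientBackoff", 1L, rb);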

+ 5 - 4
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java

@@ -145,21 +145,21 @@ public class TestCallQueueManager {
 
   @Test
   public void testCallQueueCapacity() throws InterruptedException {
-    manager = new CallQueueManager<FakeCall>(queueClass, 10, "", null);
+    manager = new CallQueueManager<FakeCall>(queueClass, false, 10, "", null);
 
     assertCanPut(manager, 10, 20); // Will stop at 10 due to capacity
   }
 
   @Test
   public void testEmptyConsume() throws InterruptedException {
-    manager = new CallQueueManager<FakeCall>(queueClass, 10, "", null);
+    manager = new CallQueueManager<FakeCall>(queueClass, false, 10, "", null);
 
     assertCanTake(manager, 0, 1); // Fails since it's empty
   }
 
   @Test(timeout=60000)
   public void testSwapUnderContention() throws InterruptedException {
-    manager = new CallQueueManager<FakeCall>(queueClass, 5000, "", null);
+    manager = new CallQueueManager<FakeCall>(queueClass, false, 5000, "", null);
 
     ArrayList<Putter> producers = new ArrayList<Putter>();
     ArrayList<Taker> consumers = new ArrayList<Taker>();
@@ -235,7 +235,8 @@ public class TestCallQueueManager {
   @Test
   public void testInvocationException() throws InterruptedException {
     try {
-      new CallQueueManager<ExceptionFakeCall>(exceptionQueueClass, 10, "", null);
+      new CallQueueManager<ExceptionFakeCall>(
+          exceptionQueueClass, false, 10, "", null);
       fail();
     } catch (RuntimeException re) {
       assertTrue(re.getCause() instanceof IllegalArgumentException);

+ 58 - 0
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java

@@ -1081,6 +1081,64 @@ public class TestRPC {
     }
   }
 
+  /**
+   *  Test RPC backoff.
+   */
+  @Test (timeout=30000)
+  public void testClientBackOff() throws Exception {
+    boolean succeeded = false;
+    final int numClients = 2;
+    final List<Future<Void>> res = new ArrayList<Future<Void>>();
+    final ExecutorService executorService =
+        Executors.newFixedThreadPool(numClients);
+    final Configuration conf = new Configuration();
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+    conf.setBoolean(CommonConfigurationKeys.IPC_CALLQUEUE_NAMESPACE +
+        ".0." + CommonConfigurationKeys.IPC_BACKOFF_ENABLE, true);
+    final Server server = new RPC.Builder(conf)
+        .setProtocol(TestProtocol.class).setInstance(new TestImpl())
+        .setBindAddress(ADDRESS).setPort(0)
+        .setQueueSizePerHandler(1).setNumHandlers(1).setVerbose(true)
+        .build();
+    server.start();
+
+    final TestProtocol proxy =
+        RPC.getProxy(TestProtocol.class, TestProtocol.versionID,
+            NetUtils.getConnectAddress(server), conf);
+    try {
+      // start a sleep RPC call to consume the only handler thread.
+      // Start another sleep RPC call to make callQueue full.
+      // Start another sleep RPC call to make reader thread block on CallQueue.
+      for (int i = 0; i < numClients; i++) {
+        res.add(executorService.submit(
+            new Callable<Void>() {
+              @Override
+              public Void call() throws IOException, InterruptedException {
+                proxy.sleep(100000);
+                return null;
+              }
+            }));
+      }
+      while (server.getCallQueueLen() != 1
+          && countThreads(CallQueueManager.class.getName()) != 1) {
+        Thread.sleep(100);
+      }
+      try {
+        proxy.sleep(100);
+      } catch (RemoteException e) {
+        IOException unwrapExeption = e.unwrapRemoteException();
+        if (unwrapExeption instanceof RetriableException) {
+          succeeded = true;
+        }
+      }
+    } finally {
+      server.stop();
+      RPC.stopProxy(proxy);
+      executorService.shutdown();
+    }
+    assertTrue("RetriableException not received", succeeded);
+  }
+
   /**
    *  Test RPC timeout.
    */