Browse Source

HADOOP-16268. Allow StandbyException to be thrown as CallQueueOverflowException when RPC call queue is filled. Contributed by CR Hota.

Erik Krogen 5 years ago
parent
commit
337e9b794d

+ 5 - 0
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java

@@ -109,6 +109,11 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
   public static final String IPC_COST_PROVIDER_KEY = "cost-provider.impl";
   public static final String IPC_BACKOFF_ENABLE = "backoff.enable";
   public static final boolean IPC_BACKOFF_ENABLE_DEFAULT = false;
+  // Trigger client failover on call queue overflow (for stateless servers).
+  public static final String IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE =
+      "callqueue.overflow.trigger.failover";
+  public static final boolean IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT =
+      false;
 
   /**
    * IPC scheduler priority levels.

+ 14 - 3
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/CallQueueManager.java

@@ -61,6 +61,7 @@ public class CallQueueManager<E extends Schedulable>
   }
 
   private volatile boolean clientBackOffEnabled;
+  private boolean serverFailOverEnabled;
 
   // Atomic refs point to active callQueue
   // We have two so we can better control swapping
@@ -79,6 +80,10 @@ public class CallQueueManager<E extends Schedulable>
     BlockingQueue<E> bq = createCallQueueInstance(backingClass,
         priorityLevels, maxQueueSize, namespace, conf);
     this.clientBackOffEnabled = clientBackOffEnabled;
+    this.serverFailOverEnabled = conf.getBoolean(
+        namespace + "." +
+        CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE,
+        CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT);
     this.putRef = new AtomicReference<BlockingQueue<E>>(bq);
     this.takeRef = new AtomicReference<BlockingQueue<E>>(bq);
     LOG.info("Using callQueue: {}, queueCapacity: {}, " +
@@ -88,11 +93,12 @@ public class CallQueueManager<E extends Schedulable>
 
   @VisibleForTesting // only!
   CallQueueManager(BlockingQueue<E> queue, RpcScheduler scheduler,
-      boolean clientBackOffEnabled) {
+      boolean clientBackOffEnabled, boolean serverFailOverEnabled) {
     this.putRef = new AtomicReference<BlockingQueue<E>>(queue);
     this.takeRef = new AtomicReference<BlockingQueue<E>>(queue);
     this.scheduler = scheduler;
     this.clientBackOffEnabled = clientBackOffEnabled;
+    this.serverFailOverEnabled = serverFailOverEnabled; // no conf lookup here
   }
 
   private static <T extends RpcScheduler> T createScheduler(
@@ -249,7 +255,9 @@ public class CallQueueManager<E extends Schedulable>
 
   // ideally this behavior should be controllable too.
   private void throwBackoff() throws IllegalStateException {
-    throw CallQueueOverflowException.DISCONNECT;
+    throw serverFailOverEnabled ?
+        CallQueueOverflowException.FAILOVER : // wraps a StandbyException
+        CallQueueOverflowException.DISCONNECT;
   }
 
   /**
@@ -421,7 +429,10 @@ public class CallQueueManager<E extends Schedulable>
         new CallQueueOverflowException(
             new RetriableException(TOO_BUSY + " - disconnecting"),
             RpcStatusProto.FATAL);
-
+    static final CallQueueOverflowException FAILOVER =
+        new CallQueueOverflowException(
+            new StandbyException(TOO_BUSY + " - disconnect and failover"),
+            RpcStatusProto.FATAL);
     CallQueueOverflowException(final IOException ioe,
         final RpcStatusProto status) {
       super("Queue full", new RpcServerException(ioe.getMessage(), ioe){

+ 19 - 4
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java

@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException;
 import org.apache.hadoop.metrics2.MetricsCollector;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
@@ -77,6 +78,8 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
   /* Statistic tracking */
   private final ArrayList<AtomicLong> overflowedCalls;
 
+  /* Failover if queue is filled up */
+  private boolean serverFailOverEnabled;
   /**
    * Create a FairCallQueue.
    * @param capacity the total size of all sub-queues
@@ -108,6 +111,10 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
       }
       this.overflowedCalls.add(new AtomicLong(0));
     }
+    this.serverFailOverEnabled = conf.getBoolean(
+        ns + "." +
+        CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE,
+        CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE_DEFAULT);
 
     this.multiplexer = new WeightedRoundRobinMultiplexer(numQueues, ns, conf);
     // Make this the active source of metrics
@@ -158,10 +165,18 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
     final int priorityLevel = e.getPriorityLevel();
     // try offering to all queues.
     if (!offerQueues(priorityLevel, e, true)) {
-      // only disconnect the lowest priority users that overflow the queue.
-      throw (priorityLevel == queues.size() - 1)
-          ? CallQueueOverflowException.DISCONNECT
-          : CallQueueOverflowException.KEEPALIVE;
+
+      CallQueueOverflowException ex;
+      if (serverFailOverEnabled) {
+        // Signal clients to failover and try a separate server.
+        ex = CallQueueOverflowException.FAILOVER;
+      } else if (priorityLevel == queues.size() - 1){
+        // only disconnect the lowest priority users that overflow the queue.
+        ex = CallQueueOverflowException.DISCONNECT;
+      } else {
+        ex = CallQueueOverflowException.KEEPALIVE;
+      }
+      throw ex;
     }
     return true;
   }

+ 14 - 1
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestCallQueueManager.java

@@ -384,9 +384,22 @@ public class TestCallQueueManager {
     RpcScheduler scheduler = Mockito.mock(RpcScheduler.class);
     BlockingQueue<Schedulable> queue = Mockito.mock(BlockingQueue.class);
     CallQueueManager<Schedulable> cqm =
-        Mockito.spy(new CallQueueManager<>(queue, scheduler, false));
+        Mockito.spy(new CallQueueManager<>(queue, scheduler, false, false));
+    CallQueueManager<Schedulable> cqmTriggerFailover =
+            Mockito.spy(new CallQueueManager<>(queue, scheduler, false, true));
     Schedulable call = new FakeCall(0);
 
+    // call queue exceptions that trigger failover
+    cqmTriggerFailover.setClientBackoffEnabled(true);
+    doReturn(Boolean.TRUE).when(cqmTriggerFailover).shouldBackOff(call);
+    try {
+      cqmTriggerFailover.put(call);
+      fail("didn't fail");
+    } catch (Exception ex) {
+      assertEquals(CallQueueOverflowException.FAILOVER.getCause().getMessage(),
+          ex.getCause().getMessage());
+    }
+
     // call queue exceptions passed threw as-is
     doThrow(CallQueueOverflowException.KEEPALIVE).when(queue).add(call);
     try {

+ 73 - 7
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java

@@ -28,6 +28,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.times;
 
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.junit.Before;
 import org.junit.Test;
 import static org.junit.Assert.*;
@@ -148,6 +149,65 @@ public class TestFairCallQueue {
     assertNull(fcq.poll());
   }
 
+  @SuppressWarnings("unchecked") // for mock reset.
+  @Test
+  public void testInsertionWithFailover() {
+    Configuration conf = new Configuration();
+    // Enable failover-on-overflow for namespace "ns" so that a full call
+    // queue raises a StandbyException instead of a RetriableException.
+    conf.setBoolean(
+        "ns." + CommonConfigurationKeys.IPC_CALLQUEUE_SERVER_FAILOVER_ENABLE,
+        true);
+    // 3 queues, 2 slots each.
+    fcq = Mockito.spy(new FairCallQueue<>(3, 6, "ns", conf));
+
+    Schedulable p0 = mockCall("a", 0);
+    Schedulable p1 = mockCall("b", 1);
+
+    // add to first queue.
+    addToQueueAndVerify(p0, 1, 0, 0);
+    // 0:x- 1:-- 2:--
+
+    // add to second queue.
+    addToQueueAndVerify(p1, 0, 1, 0);
+    // 0:x- 1:x- 2:--
+
+    // add to first queue.
+    addToQueueAndVerify(p0, 1, 0, 0);
+    // 0:xx 1:x- 2:--
+
+    // add to first full queue spills over to second.
+    addToQueueAndVerify(p0, 1, 1, 0);
+    // 0:xx 1:xx 2:--
+
+    // add to second full queue spills over to third.
+    addToQueueAndVerify(p1, 0, 1, 1);
+    // 0:xx 1:xx 2:x-
+
+    // add to first and second full queue spills over to third.
+    addToQueueAndVerify(p0, 1, 1, 1);
+    // 0:xx 1:xx 2:xx
+
+    // all queues are full: even a non-lowest priority call (p0) must be
+    // rejected with a FATAL StandbyException so the client tries elsewhere.
+    Mockito.reset(fcq);
+    try {
+      fcq.add(p0);
+      fail("didn't fail");
+    } catch (IllegalStateException ise) {
+      checkOverflowException(ise, RpcStatusProto.FATAL, true);
+    }
+  }
+
+  private void addToQueueAndVerify(Schedulable call, int expectedQueue0,
+      int expectedQueue1, int expectedQueue2) {
+    Mockito.reset(fcq); // forget earlier interactions before counting
+    fcq.add(call);
+    Mockito.verify(fcq, times(expectedQueue0)).offerQueue(0, call);
+    Mockito.verify(fcq, times(expectedQueue1)).offerQueue(1, call);
+    Mockito.verify(fcq, times(expectedQueue2)).offerQueue(2, call);
+  }
+
   @SuppressWarnings("unchecked") // for mock reset.
   @Test
   public void testInsertion() throws Exception {
@@ -215,7 +275,7 @@ public class TestFairCallQueue {
       fcq.add(p0);
       fail("didn't fail");
     } catch (IllegalStateException ise) {
-      checkOverflowException(ise, RpcStatusProto.ERROR);
+      checkOverflowException(ise, RpcStatusProto.ERROR, false);
     }
     Mockito.verify(fcq, times(1)).offerQueue(0, p0);
     Mockito.verify(fcq, times(1)).offerQueue(1, p0);
@@ -228,7 +288,7 @@ public class TestFairCallQueue {
       fcq.add(p1);
       fail("didn't fail");
     } catch (IllegalStateException ise) {
-      checkOverflowException(ise, RpcStatusProto.ERROR);
+      checkOverflowException(ise, RpcStatusProto.ERROR, false);
     }
     Mockito.verify(fcq, times(0)).offerQueue(0, p1);
     Mockito.verify(fcq, times(1)).offerQueue(1, p1);
@@ -241,7 +301,7 @@ public class TestFairCallQueue {
       fcq.add(p2);
       fail("didn't fail");
     } catch (IllegalStateException ise) {
-      checkOverflowException(ise, RpcStatusProto.FATAL);
+      checkOverflowException(ise, RpcStatusProto.FATAL, false);
     }
     Mockito.verify(fcq, times(0)).offerQueue(0, p2);
     Mockito.verify(fcq, times(0)).offerQueue(1, p2);
@@ -280,7 +340,8 @@ public class TestFairCallQueue {
     Mockito.verify(fcq, times(1)).putQueue(2, p2);
   }
 
-  private void checkOverflowException(Exception ex, RpcStatusProto status) {
+  private void checkOverflowException(Exception ex, RpcStatusProto status,
+      boolean failOverTriggered) {
     // should be an overflow exception
     assertTrue(ex.getClass().getName() + " != CallQueueOverflowException",
         ex instanceof CallQueueOverflowException);
@@ -289,10 +350,15 @@ public class TestFairCallQueue {
     assertTrue(ioe.getClass().getName() + " != RpcServerException",
         ioe instanceof RpcServerException);
     RpcServerException rse = (RpcServerException)ioe;
-    // check error/fatal status and if it embeds a retriable ex.
+    // check error/fatal status and if it embeds a retriable ex or standby ex.
     assertEquals(status, rse.getRpcStatusProto());
-    assertTrue(rse.getClass().getName() + " != RetriableException",
-        rse.getCause() instanceof RetriableException);
+    if (failOverTriggered) { // failover mode must embed a StandbyException
+      assertTrue(rse.getCause() + " != StandbyException",
+          rse.getCause() instanceof StandbyException);
+    } else { // default mode must embed a RetriableException
+      assertTrue(rse.getCause() + " != RetriableException",
+          rse.getCause() instanceof RetriableException);
+    }
   }
 
   //